From b7a882f42b82d7f4c6ff0926e25060ac8d98f005 Mon Sep 17 00:00:00 2001 From: RipleyTom Date: Wed, 24 Apr 2024 19:27:50 +0200 Subject: [PATCH] Split normal sockets and p2p sockets handling v2 --- rpcs3/Emu/Cell/lv2/sys_net.cpp | 29 +-- rpcs3/Emu/Cell/lv2/sys_net/lv2_socket.cpp | 48 ++++- rpcs3/Emu/Cell/lv2/sys_net/lv2_socket.h | 3 +- rpcs3/Emu/Cell/lv2/sys_net/lv2_socket_p2p.cpp | 10 +- .../Emu/Cell/lv2/sys_net/lv2_socket_p2ps.cpp | 36 ++-- .../Emu/Cell/lv2/sys_net/network_context.cpp | 165 +++++++++++------- rpcs3/Emu/Cell/lv2/sys_net/network_context.h | 31 +++- .../Emu/Cell/lv2/sys_net/sys_net_helpers.cpp | 4 +- rpcs3/Emu/Cell/lv2/sys_net/sys_net_helpers.h | 2 +- rpcs3/Emu/NP/np_handler.cpp | 14 +- rpcs3/Emu/NP/rpcn_client.cpp | 54 ++++++ rpcs3/Emu/NP/rpcn_client.h | 58 +----- 12 files changed, 299 insertions(+), 155 deletions(-) diff --git a/rpcs3/Emu/Cell/lv2/sys_net.cpp b/rpcs3/Emu/Cell/lv2/sys_net.cpp index 79b5067c87..abc4296033 100644 --- a/rpcs3/Emu/Cell/lv2/sys_net.cpp +++ b/rpcs3/Emu/Cell/lv2/sys_net.cpp @@ -1250,7 +1250,7 @@ error_code sys_net_bnet_close(ppu_thread& ppu, s32 s) { // Ensures the socket has no lingering copy from the network thread - std::lock_guard nw_lock(g_fxo->get().s_nw_mutex); + std::lock_guard nw_lock(g_fxo->get().mutex_thread_loop); sock.reset(); } @@ -1279,7 +1279,7 @@ error_code sys_net_bnet_poll(ppu_thread& ppu, vm::ptr fds, s32 n lv2_obj::prepare_for_sleep(ppu); - std::unique_lock nw_lock(g_fxo->get().s_nw_mutex); + std::unique_lock nw_lock(g_fxo->get().mutex_thread_loop); std::shared_lock lock(id_manager::g_mutex); std::vector<::pollfd> _fds(nfds); @@ -1375,7 +1375,7 @@ error_code sys_net_bnet_poll(ppu_thread& ppu, vm::ptr fds, s32 n fds_buf[i].revents |= SYS_NET_POLLERR; signaled++; - g_fxo->get().s_to_awake.emplace_back(&ppu); + sock->queue_wake(&ppu); return true; } @@ -1412,7 +1412,7 @@ error_code sys_net_bnet_poll(ppu_thread& ppu, vm::ptr fds, s32 n return {}; } - std::lock_guard nw_lock(g_fxo->get().s_nw_mutex); + std::lock_guard nw_lock(g_fxo->get().mutex_thread_loop); if (signaled) { @@ -1443,7 +1443,7 @@ error_code sys_net_bnet_select(ppu_thread& ppu, s32 nfds, vm::ptrtv_sec.value() : 0, _timeout ? _timeout->tv_usec.value() : 0); atomic_t signaled{0}; @@ -1474,8 +1474,7 @@ error_code sys_net_bnet_select(ppu_thread& ppu, s32 nfds, vm::ptrget().s_nw_mutex); - + std::lock_guard nw_lock(g_fxo->get().mutex_thread_loop); reader_lock lock(id_manager::g_mutex); std::vector<::pollfd> _fds(nfds); @@ -1607,7 +1606,7 @@ error_code sys_net_bnet_select(ppu_thread& ppu, s32 nfds, vm::ptrget().s_to_awake.emplace_back(&ppu); + sock->queue_wake(&ppu); return true; } @@ -1652,7 +1651,7 @@ error_code sys_net_bnet_select(ppu_thread& ppu, s32 nfds, vm::ptrget().s_nw_mutex); + std::lock_guard nw_lock(g_fxo->get().mutex_thread_loop); if (signaled) { @@ -1749,6 +1748,14 @@ error_code lv2_socket::abort_socket(s32 flags) lv2_obj::append(ppu.get()); } + const u32 num_waiters = qcopy.size(); + if (num_waiters && (type == SYS_NET_SOCK_STREAM || type == SYS_NET_SOCK_DGRAM)) + { + auto& nc = g_fxo->get(); + const u32 prev_value = nc.num_polls.fetch_sub(num_waiters); + ensure(prev_value >= num_waiters); + } + lv2_obj::awake_all(); return CELL_OK; } @@ -1772,7 +1779,7 @@ error_code sys_net_abort(ppu_thread& ppu, s32 type, u64 arg, s32 flags) { case _socket: { - std::lock_guard nw_lock(g_fxo->get().s_nw_mutex); + std::lock_guard nw_lock(g_fxo->get().mutex_thread_loop); const auto sock = idm::get(static_cast(arg)); @@ -1813,7 +1820,7 @@ error_code sys_net_abort(ppu_thread& ppu, s32 type, u64 arg, s32 flags) } // Ensures the socket has no lingering copy from the network thread - g_fxo->get().s_nw_mutex.lock_unlock(); + g_fxo->get().mutex_thread_loop.lock_unlock(); return not_an_error(::narrow(sockets.size()) - failed); } diff --git a/rpcs3/Emu/Cell/lv2/sys_net/lv2_socket.cpp b/rpcs3/Emu/Cell/lv2/sys_net/lv2_socket.cpp index 27a66d4666..d5d53b6c70 100644 --- a/rpcs3/Emu/Cell/lv2/sys_net/lv2_socket.cpp +++ b/rpcs3/Emu/Cell/lv2/sys_net/lv2_socket.cpp @@ -1,5 +1,6 @@ #include "stdafx.h" #include "lv2_socket.h" +#include "network_context.h" LOG_CHANNEL(sys_net); @@ -67,13 +68,24 @@ void lv2_socket::poll_queue(std::shared_ptr ppu, bs_tget(); + const u32 prev_value = nc.num_polls.fetch_add(1); + if (!prev_value) + { + nc.num_polls.notify_one(); + } + } } -s32 lv2_socket::clear_queue(ppu_thread* ppu) +u32 lv2_socket::clear_queue(ppu_thread* ppu) { std::lock_guard lock(mutex); - s32 cleared = 0; + u32 cleared = 0; for (auto it = queue.begin(); it != queue.end();) { @@ -92,6 +104,13 @@ s32 lv2_socket::clear_queue(ppu_thread* ppu) events.store({}); } + if (cleared && (type == SYS_NET_SOCK_STREAM || type == SYS_NET_SOCK_DGRAM)) + { + // Makes sure network_context thread can go back to sleep if there is no active polling + const u32 prev_value = g_fxo->get().num_polls.fetch_sub(cleared); + ensure(prev_value >= cleared); + } + return cleared; } @@ -113,21 +132,46 @@ void lv2_socket::handle_events(const pollfd& native_pfd, [[maybe_unused]] bool u if (unset_connecting) set_connecting(false); #endif + u32 handled = 0; for (auto it = queue.begin(); it != queue.end();) { if (it->second(events_happening)) { it = queue.erase(it); + handled++; continue; } it++; } + if (handled && (type == SYS_NET_SOCK_STREAM || type == SYS_NET_SOCK_DGRAM)) + { + const u32 prev_value = g_fxo->get().num_polls.fetch_sub(handled); + ensure(prev_value >= handled); + } + if (queue.empty()) { events.store({}); } } } + +void lv2_socket::queue_wake(ppu_thread* ppu) +{ + switch (type) + { + case SYS_NET_SOCK_STREAM: + case SYS_NET_SOCK_DGRAM: + g_fxo->get().ppu_to_awake.emplace_back(ppu); + break; + case SYS_NET_SOCK_DGRAM_P2P: + case SYS_NET_SOCK_STREAM_P2P: + g_fxo->get().ppu_to_awake.emplace_back(ppu); + break; + default: + break; + } +} diff --git a/rpcs3/Emu/Cell/lv2/sys_net/lv2_socket.h b/rpcs3/Emu/Cell/lv2/sys_net/lv2_socket.h index 7bf5985dd8..8cc84b729a 100644 --- a/rpcs3/Emu/Cell/lv2/sys_net/lv2_socket.h +++ b/rpcs3/Emu/Cell/lv2/sys_net/lv2_socket.h @@ -70,8 +70,9 @@ public: bs_t get_events() const; void set_poll_event(bs_t event); void poll_queue(std::shared_ptr ppu, bs_t event, std::function)> poll_cb); - s32 clear_queue(ppu_thread*); + u32 clear_queue(ppu_thread*); void handle_events(const pollfd& native_fd, bool unset_connecting = false); + void queue_wake(ppu_thread* ppu); lv2_socket_family get_family() const; lv2_socket_type get_type() const; diff --git a/rpcs3/Emu/Cell/lv2/sys_net/lv2_socket_p2p.cpp b/rpcs3/Emu/Cell/lv2/sys_net/lv2_socket_p2p.cpp index d061996abe..e10d30e1a3 100644 --- a/rpcs3/Emu/Cell/lv2/sys_net/lv2_socket_p2p.cpp +++ b/rpcs3/Emu/Cell/lv2/sys_net/lv2_socket_p2p.cpp @@ -121,16 +121,14 @@ s32 lv2_socket_p2p::bind(const sys_net_sockaddr& addr) socket_type real_socket{}; - auto& nc = g_fxo->get(); + auto& nc = g_fxo->get(); { std::lock_guard list_lock(nc.list_p2p_ports_mutex); - if (!nc.list_p2p_ports.contains(p2p_port)) - { - nc.list_p2p_ports.emplace(std::piecewise_construct, std::forward_as_tuple(p2p_port), std::forward_as_tuple(p2p_port)); - } + nc.create_p2p_port(p2p_port); auto& pport = ::at32(nc.list_p2p_ports, p2p_port); real_socket = pport.p2p_socket; + { std::lock_guard lock(pport.bound_p2p_vports_mutex); @@ -332,7 +330,7 @@ void lv2_socket_p2p::close() return; } - auto& nc = g_fxo->get(); + auto& nc = g_fxo->get(); { std::lock_guard lock(nc.list_p2p_ports_mutex); ensure(nc.list_p2p_ports.contains(port)); diff --git a/rpcs3/Emu/Cell/lv2/sys_net/lv2_socket_p2ps.cpp b/rpcs3/Emu/Cell/lv2/sys_net/lv2_socket_p2ps.cpp index f1b341f494..9d8b8df618 100644 --- a/rpcs3/Emu/Cell/lv2/sys_net/lv2_socket_p2ps.cpp +++ b/rpcs3/Emu/Cell/lv2/sys_net/lv2_socket_p2ps.cpp @@ -445,7 +445,7 @@ bool lv2_socket_p2ps::handle_listening(p2ps_encapsulated_tcp* tcp_header, [[mayb const u64 key_connected = (reinterpret_cast(op_addr)->sin_addr.s_addr) | (static_cast(tcp_header->src_port) << 48) | (static_cast(tcp_header->dst_port) << 32); { - auto& nc = g_fxo->get(); + auto& nc = g_fxo->get(); auto& pport = ::at32(nc.list_p2p_ports, port); pport.bound_p2p_streams.emplace(key_connected, new_sock_id); } @@ -593,16 +593,14 @@ s32 lv2_socket_p2ps::bind(const sys_net_sockaddr& addr) socket_type real_socket{}; - auto& nc = g_fxo->get(); + auto& nc = g_fxo->get(); { std::lock_guard list_lock(nc.list_p2p_ports_mutex); - if (!nc.list_p2p_ports.contains(p2p_port)) - { - nc.list_p2p_ports.emplace(std::piecewise_construct, std::forward_as_tuple(p2p_port), std::forward_as_tuple(p2p_port)); - } + nc.create_p2p_port(p2p_port); auto& pport = ::at32(nc.list_p2p_ports, p2p_port); real_socket = pport.p2p_socket; + { // Ensures the socket & the bound list are updated at the same time to avoid races std::lock_guard vport_lock(pport.bound_p2p_vports_mutex); @@ -673,6 +671,12 @@ std::optional lv2_socket_p2ps::connect(const sys_net_sockaddr& addr) { std::lock_guard lock(mutex); + if (status != p2ps_stream_status::stream_closed) + { + sys_net.error("[P2PS] Called connect on a socket that is not closed!"); + return -SYS_NET_EALREADY; + } + p2ps_encapsulated_tcp send_hdr; const auto psa_in_p2p = reinterpret_cast(&addr); auto name = sys_net_addr_to_native_addr(addr); @@ -683,14 +687,14 @@ std::optional lv2_socket_p2ps::connect(const sys_net_sockaddr& addr) socket_type real_socket{}; - auto& nc = g_fxo->get(); + auto& nc = g_fxo->get(); { std::lock_guard list_lock(nc.list_p2p_ports_mutex); - if (!nc.list_p2p_ports.contains(port)) - nc.list_p2p_ports.emplace(std::piecewise_construct, std::forward_as_tuple(port), std::forward_as_tuple(port)); + nc.create_p2p_port(port); auto& pport = ::at32(nc.list_p2p_ports, port); real_socket = pport.p2p_socket; + { std::lock_guard lock(pport.bound_p2p_vports_mutex); if (vport == 0) @@ -752,6 +756,12 @@ std::optional, sys_net_sockaddr>> lv2_socket_p2p if (!data_available) { + if (status == p2ps_stream_status::stream_closed) + { + sys_net.error("[P2PS] Called recvfrom on closed socket!"); + return {{0, {}, {}}}; + } + if (so_nbio || (flags & SYS_NET_MSG_DONTWAIT)) { return {{-SYS_NET_EWOULDBLOCK, {}, {}}}; @@ -808,6 +818,12 @@ std::optional lv2_socket_p2ps::sendto([[maybe_unused]] s32 flags, const std lock.lock(); } + if (status == p2ps_stream_status::stream_closed) + { + sys_net.error("[P2PS] Called sendto on a closed socket!"); + return -SYS_NET_ECONNRESET; + } + constexpr u32 max_data_len = (65535 - (VPORT_P2P_HEADER_SIZE + sizeof(p2ps_encapsulated_tcp))); ::sockaddr_in name{}; @@ -857,7 +873,7 @@ void lv2_socket_p2ps::close() return; } - auto& nc = g_fxo->get(); + auto& nc = g_fxo->get(); { std::lock_guard lock(nc.list_p2p_ports_mutex); ensure(nc.list_p2p_ports.contains(port)); diff --git a/rpcs3/Emu/Cell/lv2/sys_net/network_context.cpp b/rpcs3/Emu/Cell/lv2/sys_net/network_context.cpp index 6c284a14e6..e8f4b5197a 100644 --- a/rpcs3/Emu/Cell/lv2/sys_net/network_context.cpp +++ b/rpcs3/Emu/Cell/lv2/sys_net/network_context.cpp @@ -12,13 +12,13 @@ LOG_CHANNEL(sys_net); s32 send_packet_from_p2p_port(const std::vector& data, const sockaddr_in& addr) { s32 res{}; - auto& nc = g_fxo->get(); + auto& nc = g_fxo->get(); { std::lock_guard list_lock(nc.list_p2p_ports_mutex); if (nc.list_p2p_ports.contains(SCE_NP_PORT)) { auto& def_port = ::at32(nc.list_p2p_ports, SCE_NP_PORT); - res = ::sendto(def_port.p2p_socket, reinterpret_cast(data.data()), ::size32(data), 0, reinterpret_cast(&addr), sizeof(sockaddr_in)); + res = ::sendto(def_port.p2p_socket, reinterpret_cast(data.data()), ::size32(data), 0, reinterpret_cast(&addr), sizeof(sockaddr_in)); if (res == -1) sys_net.error("Failed to send signaling packet: %s", get_last_error(false, false)); @@ -35,7 +35,7 @@ s32 send_packet_from_p2p_port(const std::vector& data, const sockaddr_in& ad std::vector> get_rpcn_msgs() { std::vector> msgs; - auto& nc = g_fxo->get(); + auto& nc = g_fxo->get(); { std::lock_guard list_lock(nc.list_p2p_ports_mutex); if (nc.list_p2p_ports.contains(SCE_NP_PORT)) @@ -59,7 +59,7 @@ std::vector> get_rpcn_msgs() std::vector get_sign_msgs() { std::vector msgs; - auto& nc = g_fxo->get(); + auto& nc = g_fxo->get(); { std::lock_guard list_lock(nc.list_p2p_ports_mutex); if (nc.list_p2p_ports.contains(SCE_NP_PORT)) @@ -85,15 +85,31 @@ namespace np void init_np_handler_dependencies(); } -network_thread::network_thread() +void base_network_thread::wake_threads() +{ + ppu_to_awake.erase(std::unique(ppu_to_awake.begin(), ppu_to_awake.end()), ppu_to_awake.end()); + for (ppu_thread* ppu : ppu_to_awake) + { + network_clear_queue(*ppu); + lv2_obj::append(ppu); + } + + if (!ppu_to_awake.empty()) + { + lv2_obj::awake_all(); + } + ppu_to_awake.clear(); +} + +p2p_thread::p2p_thread() { np::init_np_handler_dependencies(); } -void network_thread::bind_sce_np_port() +void p2p_thread::bind_sce_np_port() { std::lock_guard list_lock(list_p2p_ports_mutex); - list_p2p_ports.emplace(std::piecewise_construct, std::forward_as_tuple(SCE_NP_PORT), std::forward_as_tuple(SCE_NP_PORT)); + create_p2p_port(SCE_NP_PORT); } void network_thread::operator()() @@ -101,7 +117,7 @@ void network_thread::operator()() std::vector> socklist; socklist.reserve(lv2_socket::id_count); - s_to_awake.clear(); + ppu_to_awake.clear(); std::vector<::pollfd> fds(lv2_socket::id_count); #ifdef _WIN32 @@ -109,10 +125,14 @@ void network_thread::operator()() std::vector was_connecting(lv2_socket::id_count); #endif - std::vector<::pollfd> p2p_fd(lv2_socket::id_count); - while (thread_ctrl::state() != thread_state::aborting) { + if (!num_polls) + { + thread_ctrl::wait_on(num_polls, 0); + continue; + } + ensure(socklist.size() <= lv2_socket::id_count); // Wait with 1ms timeout @@ -122,47 +142,7 @@ void network_thread::operator()() ::poll(fds.data(), socklist.size(), 1); #endif - // Check P2P sockets for incoming packets(timeout could probably be set at 0) - { - std::lock_guard lock(list_p2p_ports_mutex); - std::memset(p2p_fd.data(), 0, p2p_fd.size() * sizeof(::pollfd)); - auto num_p2p_sockets = 0; - for (const auto& p2p_port : list_p2p_ports) - { - p2p_fd[num_p2p_sockets].events = POLLIN; - p2p_fd[num_p2p_sockets].revents = 0; - p2p_fd[num_p2p_sockets].fd = p2p_port.second.p2p_socket; - num_p2p_sockets++; - } - - if (num_p2p_sockets) - { -#ifdef _WIN32 - const auto ret_p2p = WSAPoll(p2p_fd.data(), num_p2p_sockets, 1); -#else - const auto ret_p2p = ::poll(p2p_fd.data(), num_p2p_sockets, 1); -#endif - if (ret_p2p > 0) - { - auto fd_index = 0; - for (auto& p2p_port : list_p2p_ports) - { - if ((p2p_fd[fd_index].revents & POLLIN) == POLLIN || (p2p_fd[fd_index].revents & POLLRDNORM) == POLLRDNORM) - { - while (p2p_port.second.recv_data()) - ; - } - fd_index++; - } - } - else if (ret_p2p < 0) - { - sys_net.error("[P2P] Error poll on master P2P socket: %d", get_last_error(false)); - } - } - } - - std::lock_guard lock(s_nw_mutex); + std::lock_guard lock(mutex_thread_loop); for (usz i = 0; i < socklist.size(); i++) { @@ -173,20 +153,7 @@ void network_thread::operator()() #endif } - s_to_awake.erase(std::unique(s_to_awake.begin(), s_to_awake.end()), s_to_awake.end()); - - for (ppu_thread* ppu : s_to_awake) - { - network_clear_queue(*ppu); - lv2_obj::append(ppu); - } - - if (!s_to_awake.empty()) - { - lv2_obj::awake_all(); - } - - s_to_awake.clear(); + wake_threads(); socklist.clear(); // Obtain all native active sockets @@ -216,3 +183,71 @@ void network_thread::operator()() } } } + +// Must be used under list_p2p_ports_mutex lock! +void p2p_thread::create_p2p_port(u16 p2p_port) +{ + if (!list_p2p_ports.contains(p2p_port)) + { + list_p2p_ports.emplace(std::piecewise_construct, std::forward_as_tuple(p2p_port), std::forward_as_tuple(p2p_port)); + const u32 prev_value = num_p2p_ports.fetch_add(1); + if (!prev_value) + { + num_p2p_ports.notify_one(); + } + } +} + +void p2p_thread::operator()() +{ + std::vector<::pollfd> p2p_fd(lv2_socket::id_count); + + while (thread_ctrl::state() != thread_state::aborting) + { + if (!num_p2p_ports) + { + thread_ctrl::wait_on(num_p2p_ports, 0); + continue; + } + + // Check P2P sockets for incoming packets + auto num_p2p_sockets = 0; + { + std::lock_guard lock(list_p2p_ports_mutex); + std::memset(p2p_fd.data(), 0, p2p_fd.size() * sizeof(::pollfd)); + for (const auto& p2p_port : list_p2p_ports) + { + p2p_fd[num_p2p_sockets].events = POLLIN; + p2p_fd[num_p2p_sockets].revents = 0; + p2p_fd[num_p2p_sockets].fd = p2p_port.second.p2p_socket; + num_p2p_sockets++; + } + } + +#ifdef _WIN32 + const auto ret_p2p = WSAPoll(p2p_fd.data(), num_p2p_sockets, 1); +#else + const auto ret_p2p = ::poll(p2p_fd.data(), num_p2p_sockets, 1); +#endif + if (ret_p2p > 0) + { + std::lock_guard lock(list_p2p_ports_mutex); + auto fd_index = 0; + for (auto& p2p_port : list_p2p_ports) + { + if ((p2p_fd[fd_index].revents & POLLIN) == POLLIN || (p2p_fd[fd_index].revents & POLLRDNORM) == POLLRDNORM) + { + while (p2p_port.second.recv_data()) + ; + } + fd_index++; + } + + wake_threads(); + } + else if (ret_p2p < 0) + { + sys_net.error("[P2P] Error poll on master P2P socket: %d", get_last_error(false)); + } + } +} diff --git a/rpcs3/Emu/Cell/lv2/sys_net/network_context.h b/rpcs3/Emu/Cell/lv2/sys_net/network_context.h index 3814a4776a..5cb846bd88 100644 --- a/rpcs3/Emu/Cell/lv2/sys_net/network_context.h +++ b/rpcs3/Emu/Cell/lv2/sys_net/network_context.h @@ -7,19 +7,38 @@ #include "nt_p2p_port.h" -struct network_thread +struct base_network_thread { - std::vector s_to_awake; - shared_mutex s_nw_mutex; + std::vector ppu_to_awake; - shared_mutex list_p2p_ports_mutex; - std::map list_p2p_ports{}; + void wake_threads(); +}; + +struct network_thread : base_network_thread +{ + shared_mutex mutex_thread_loop; + atomic_t num_polls = 0; static constexpr auto thread_name = "Network Thread"; - network_thread(); + void operator()(); +}; + +struct p2p_thread : base_network_thread +{ + shared_mutex list_p2p_ports_mutex; + std::map list_p2p_ports; + atomic_t num_p2p_ports = 0; + + static constexpr auto thread_name = "Network P2P Thread"; + + p2p_thread(); + + void create_p2p_port(u16 p2p_port); + void bind_sce_np_port(); void operator()(); }; using network_context = named_thread; +using p2p_context = named_thread; diff --git a/rpcs3/Emu/Cell/lv2/sys_net/sys_net_helpers.cpp b/rpcs3/Emu/Cell/lv2/sys_net/sys_net_helpers.cpp index 1386e4a7e4..66494df084 100644 --- a/rpcs3/Emu/Cell/lv2/sys_net/sys_net_helpers.cpp +++ b/rpcs3/Emu/Cell/lv2/sys_net/sys_net_helpers.cpp @@ -175,9 +175,9 @@ bool is_ip_public_address(const ::sockaddr_in& addr) return true; } -s32 network_clear_queue(ppu_thread& ppu) +u32 network_clear_queue(ppu_thread& ppu) { - s32 cleared = 0; + u32 cleared = 0; idm::select([&](u32, lv2_socket& sock) { diff --git a/rpcs3/Emu/Cell/lv2/sys_net/sys_net_helpers.h b/rpcs3/Emu/Cell/lv2/sys_net/sys_net_helpers.h index d82f1e057a..1243ec1933 100644 --- a/rpcs3/Emu/Cell/lv2/sys_net/sys_net_helpers.h +++ b/rpcs3/Emu/Cell/lv2/sys_net/sys_net_helpers.h @@ -23,7 +23,7 @@ sys_net_error get_last_error(bool is_blocking, bool is_connecting = false); sys_net_sockaddr native_addr_to_sys_net_addr(const ::sockaddr_storage& native_addr); ::sockaddr_in sys_net_addr_to_native_addr(const sys_net_sockaddr& sn_addr); bool is_ip_public_address(const ::sockaddr_in& addr); -s32 network_clear_queue(ppu_thread& ppu); +u32 network_clear_queue(ppu_thread& ppu); #ifdef _WIN32 void windows_poll(std::vector& fds, unsigned long nfds, int timeout, std::vector& connecting); diff --git a/rpcs3/Emu/NP/np_handler.cpp b/rpcs3/Emu/NP/np_handler.cpp index 74a484658e..d3780baf87 100644 --- a/rpcs3/Emu/NP/np_handler.cpp +++ b/rpcs3/Emu/NP/np_handler.cpp @@ -408,11 +408,11 @@ namespace np void np_handler::init_np_handler_dependencies() { - if (is_psn_active && g_cfg.net.psn_status == np_psn_status::psn_rpcn && g_fxo->is_init() && !m_inited_np_handler_dependencies) + if (is_psn_active && g_cfg.net.psn_status == np_psn_status::psn_rpcn && g_fxo->is_init() && !m_inited_np_handler_dependencies) { m_inited_np_handler_dependencies = true; - auto& nc = g_fxo->get(); + auto& nc = g_fxo->get(); nc.bind_sce_np_port(); std::lock_guard lock(mutex_rpcn); @@ -817,6 +817,16 @@ namespace np string_to_online_name(rpcn->get_online_name(), online_name); string_to_avatar_url(rpcn->get_avatar_url(), avatar_url); public_ip_addr = rpcn->get_addr_sig(); + + if (!public_ip_addr) + { + rsx::overlays::queue_message(rpcn::rpcn_state_to_localized_string_id(rpcn::rpcn_state::failure_other)); + rpcn_log.error("Failed to get a reply from RPCN signaling!"); + is_psn_active = false; + rpcn->terminate_connection(); + return; + } + local_ip_addr = std::bit_cast>(rpcn->get_addr_local()); break; diff --git a/rpcs3/Emu/NP/rpcn_client.cpp b/rpcs3/Emu/NP/rpcn_client.cpp index fcbddee27d..44ce18be61 100644 --- a/rpcs3/Emu/NP/rpcn_client.cpp +++ b/rpcs3/Emu/NP/rpcn_client.cpp @@ -2594,4 +2594,58 @@ namespace rpcn return it == friend_infos.friends.end() ? std::nullopt : std::optional(*it); } + bool rpcn_client::is_connected() const + { + return connected; + } + + bool rpcn_client::is_authentified() const + { + return authentified; + } + rpcn_state rpcn_client::get_rpcn_state() const + { + return state; + } + + const std::string& rpcn_client::get_online_name() const + { + return online_name; + } + + const std::string& rpcn_client::get_avatar_url() const + { + return avatar_url; + } + + u32 rpcn_client::get_addr_sig() const + { + if (!addr_sig) + { + addr_sig.wait(0, static_cast(10'000'000'000)); + } + + return addr_sig.load(); + } + + u16 rpcn_client::get_port_sig() const + { + if (!port_sig) + { + port_sig.wait(0, static_cast(10'000'000'000)); + } + + return port_sig.load(); + } + + u32 rpcn_client::get_addr_local() const + { + return local_addr_sig.load(); + } + + void rpcn_client::update_local_addr(u32 addr) + { + local_addr_sig = std::bit_cast>(addr); + } + } // namespace rpcn diff --git a/rpcs3/Emu/NP/rpcn_client.h b/rpcs3/Emu/NP/rpcn_client.h index af432d787a..c1f9f1e11a 100644 --- a/rpcs3/Emu/NP/rpcn_client.h +++ b/rpcs3/Emu/NP/rpcn_client.h @@ -437,18 +437,9 @@ namespace rpcn void remove_message_cb(message_cb_func cb_func, void* cb_param); void mark_message_used(u64 id); - bool is_connected() const - { - return connected; - } - bool is_authentified() const - { - return authentified; - } - rpcn_state get_rpcn_state() const - { - return state; - } + bool is_connected() const; + bool is_authentified() const; + rpcn_state get_rpcn_state() const; void server_infos_updated(); @@ -495,44 +486,13 @@ namespace rpcn bool tus_delete_multislot_data(u32 req_id, SceNpCommunicationId& communication_id, const SceNpOnlineId& targetNpId, vm::cptr slotIdArray, s32 arrayNum, bool vuser); bool send_presence(const SceNpCommunicationId& pr_com_id, const std::string& pr_title, const std::string& pr_status, const std::string& pr_comment, const std::vector& pr_data); - const std::string& get_online_name() const - { - return online_name; - } - const std::string& get_avatar_url() const - { - return avatar_url; - } + const std::string& get_online_name() const; + const std::string& get_avatar_url() const; - u32 get_addr_sig() const - { - if (!addr_sig) - { - addr_sig.wait(0, static_cast(10'000'000'000)); - } - - return addr_sig.load(); - } - - u16 get_port_sig() const - { - if (!port_sig) - { - port_sig.wait(0, static_cast(10'000'000'000)); - } - - return port_sig.load(); - } - - u32 get_addr_local() const - { - return local_addr_sig.load(); - } - - void update_local_addr(u32 addr) - { - local_addr_sig = std::bit_cast>(addr); - } + u32 get_addr_sig() const; + u16 get_port_sig() const; + u32 get_addr_local() const; + void update_local_addr(u32 addr); private: bool get_reply(u64 expected_id, std::vector& data);