diff --git a/rpcs3/Emu/Cell/lv2/sys_net/lv2_socket.cpp b/rpcs3/Emu/Cell/lv2/sys_net/lv2_socket.cpp index 27a66d4666..fd2b1295db 100644 --- a/rpcs3/Emu/Cell/lv2/sys_net/lv2_socket.cpp +++ b/rpcs3/Emu/Cell/lv2/sys_net/lv2_socket.cpp @@ -1,5 +1,6 @@ #include "stdafx.h" #include "lv2_socket.h" +#include "network_context.h" LOG_CHANNEL(sys_net); @@ -67,6 +68,17 @@ void lv2_socket::poll_queue(std::shared_ptr ppu, bs_tget(); + const u32 prev_value = nc.num_polls.fetch_add(1); + if (!prev_value) + { + nc.num_polls.notify_one(); + } + } } s32 lv2_socket::clear_queue(ppu_thread* ppu) @@ -81,6 +93,14 @@ s32 lv2_socket::clear_queue(ppu_thread* ppu) { it = queue.erase(it); cleared++; + + // Makes sure network_context thread can go back to sleep if there is no active polling + if (type == SYS_NET_SOCK_STREAM || type == SYS_NET_SOCK_DGRAM) + { + const u32 prev_value = g_fxo->get().num_polls.fetch_sub(1); + ensure(prev_value); + } + continue; } diff --git a/rpcs3/Emu/Cell/lv2/sys_net/lv2_socket_p2p.cpp b/rpcs3/Emu/Cell/lv2/sys_net/lv2_socket_p2p.cpp index d061996abe..c76cc198ca 100644 --- a/rpcs3/Emu/Cell/lv2/sys_net/lv2_socket_p2p.cpp +++ b/rpcs3/Emu/Cell/lv2/sys_net/lv2_socket_p2p.cpp @@ -121,16 +121,14 @@ s32 lv2_socket_p2p::bind(const sys_net_sockaddr& addr) socket_type real_socket{}; - auto& nc = g_fxo->get(); + auto& nc = g_fxo->get(); { std::lock_guard list_lock(nc.list_p2p_ports_mutex); - if (!nc.list_p2p_ports.contains(p2p_port)) - { - nc.list_p2p_ports.emplace(std::piecewise_construct, std::forward_as_tuple(p2p_port), std::forward_as_tuple(p2p_port)); - } + nc.create_p2p_port(p2p_port); auto& pport = ::at32(nc.list_p2p_ports, p2p_port); real_socket = pport.p2p_socket; + { std::lock_guard lock(pport.bound_p2p_vports_mutex); @@ -332,7 +330,7 @@ void lv2_socket_p2p::close() return; } - auto& nc = g_fxo->get(); + auto& nc = g_fxo->get(); { std::lock_guard lock(nc.list_p2p_ports_mutex); ensure(nc.list_p2p_ports.contains(port)); diff --git a/rpcs3/Emu/Cell/lv2/sys_net/lv2_socket_p2ps.cpp b/rpcs3/Emu/Cell/lv2/sys_net/lv2_socket_p2ps.cpp index f1b341f494..3aaca99c4e 100644 --- a/rpcs3/Emu/Cell/lv2/sys_net/lv2_socket_p2ps.cpp +++ b/rpcs3/Emu/Cell/lv2/sys_net/lv2_socket_p2ps.cpp @@ -445,7 +445,7 @@ bool lv2_socket_p2ps::handle_listening(p2ps_encapsulated_tcp* tcp_header, [[mayb const u64 key_connected = (reinterpret_cast(op_addr)->sin_addr.s_addr) | (static_cast(tcp_header->src_port) << 48) | (static_cast(tcp_header->dst_port) << 32); { - auto& nc = g_fxo->get(); + auto& nc = g_fxo->get(); auto& pport = ::at32(nc.list_p2p_ports, port); pport.bound_p2p_streams.emplace(key_connected, new_sock_id); } @@ -593,16 +593,14 @@ s32 lv2_socket_p2ps::bind(const sys_net_sockaddr& addr) socket_type real_socket{}; - auto& nc = g_fxo->get(); + auto& nc = g_fxo->get(); { std::lock_guard list_lock(nc.list_p2p_ports_mutex); - if (!nc.list_p2p_ports.contains(p2p_port)) - { - nc.list_p2p_ports.emplace(std::piecewise_construct, std::forward_as_tuple(p2p_port), std::forward_as_tuple(p2p_port)); - } + nc.create_p2p_port(p2p_port); auto& pport = ::at32(nc.list_p2p_ports, p2p_port); real_socket = pport.p2p_socket; + { // Ensures the socket & the bound list are updated at the same time to avoid races std::lock_guard vport_lock(pport.bound_p2p_vports_mutex); @@ -683,14 +681,14 @@ std::optional lv2_socket_p2ps::connect(const sys_net_sockaddr& addr) socket_type real_socket{}; - auto& nc = g_fxo->get(); + auto& nc = g_fxo->get(); { std::lock_guard list_lock(nc.list_p2p_ports_mutex); - if (!nc.list_p2p_ports.contains(port)) - nc.list_p2p_ports.emplace(std::piecewise_construct, std::forward_as_tuple(port), std::forward_as_tuple(port)); + nc.create_p2p_port(port); auto& pport = ::at32(nc.list_p2p_ports, port); real_socket = pport.p2p_socket; + { std::lock_guard lock(pport.bound_p2p_vports_mutex); if (vport == 0) @@ -857,7 +855,7 @@ void lv2_socket_p2ps::close() return; } - auto& nc = g_fxo->get(); + auto& nc = g_fxo->get(); { std::lock_guard lock(nc.list_p2p_ports_mutex); ensure(nc.list_p2p_ports.contains(port)); diff --git a/rpcs3/Emu/Cell/lv2/sys_net/network_context.cpp b/rpcs3/Emu/Cell/lv2/sys_net/network_context.cpp index 6c284a14e6..ab120a1beb 100644 --- a/rpcs3/Emu/Cell/lv2/sys_net/network_context.cpp +++ b/rpcs3/Emu/Cell/lv2/sys_net/network_context.cpp @@ -12,13 +12,13 @@ LOG_CHANNEL(sys_net); s32 send_packet_from_p2p_port(const std::vector& data, const sockaddr_in& addr) { s32 res{}; - auto& nc = g_fxo->get(); + auto& nc = g_fxo->get(); { std::lock_guard list_lock(nc.list_p2p_ports_mutex); if (nc.list_p2p_ports.contains(SCE_NP_PORT)) { auto& def_port = ::at32(nc.list_p2p_ports, SCE_NP_PORT); - res = ::sendto(def_port.p2p_socket, reinterpret_cast(data.data()), ::size32(data), 0, reinterpret_cast(&addr), sizeof(sockaddr_in)); + res = ::sendto(def_port.p2p_socket, reinterpret_cast(data.data()), ::size32(data), 0, reinterpret_cast(&addr), sizeof(sockaddr_in)); if (res == -1) sys_net.error("Failed to send signaling packet: %s", get_last_error(false, false)); @@ -35,7 +35,7 @@ s32 send_packet_from_p2p_port(const std::vector& data, const sockaddr_in& ad std::vector> get_rpcn_msgs() { std::vector> msgs; - auto& nc = g_fxo->get(); + auto& nc = g_fxo->get(); { std::lock_guard list_lock(nc.list_p2p_ports_mutex); if (nc.list_p2p_ports.contains(SCE_NP_PORT)) @@ -59,7 +59,7 @@ std::vector> get_rpcn_msgs() std::vector get_sign_msgs() { std::vector msgs; - auto& nc = g_fxo->get(); + auto& nc = g_fxo->get(); { std::lock_guard list_lock(nc.list_p2p_ports_mutex); if (nc.list_p2p_ports.contains(SCE_NP_PORT)) @@ -85,15 +85,15 @@ namespace np void init_np_handler_dependencies(); } -network_thread::network_thread() +p2p_thread::p2p_thread() { np::init_np_handler_dependencies(); } -void network_thread::bind_sce_np_port() +void p2p_thread::bind_sce_np_port() { std::lock_guard list_lock(list_p2p_ports_mutex); - list_p2p_ports.emplace(std::piecewise_construct, std::forward_as_tuple(SCE_NP_PORT), std::forward_as_tuple(SCE_NP_PORT)); + create_p2p_port(SCE_NP_PORT); } void network_thread::operator()() @@ -109,10 +109,14 @@ void network_thread::operator()() std::vector was_connecting(lv2_socket::id_count); #endif - std::vector<::pollfd> p2p_fd(lv2_socket::id_count); - while (thread_ctrl::state() != thread_state::aborting) { + if (!num_polls) + { + thread_ctrl::wait_on(num_polls, 0); + continue; + } + ensure(socklist.size() <= lv2_socket::id_count); // Wait with 1ms timeout @@ -122,46 +126,6 @@ void network_thread::operator()() ::poll(fds.data(), socklist.size(), 1); #endif - // Check P2P sockets for incoming packets(timeout could probably be set at 0) - { - std::lock_guard lock(list_p2p_ports_mutex); - std::memset(p2p_fd.data(), 0, p2p_fd.size() * sizeof(::pollfd)); - auto num_p2p_sockets = 0; - for (const auto& p2p_port : list_p2p_ports) - { - p2p_fd[num_p2p_sockets].events = POLLIN; - p2p_fd[num_p2p_sockets].revents = 0; - p2p_fd[num_p2p_sockets].fd = p2p_port.second.p2p_socket; - num_p2p_sockets++; - } - - if (num_p2p_sockets) - { -#ifdef _WIN32 - const auto ret_p2p = WSAPoll(p2p_fd.data(), num_p2p_sockets, 1); -#else - const auto ret_p2p = ::poll(p2p_fd.data(), num_p2p_sockets, 1); -#endif - if (ret_p2p > 0) - { - auto fd_index = 0; - for (auto& p2p_port : list_p2p_ports) - { - if ((p2p_fd[fd_index].revents & POLLIN) == POLLIN || (p2p_fd[fd_index].revents & POLLRDNORM) == POLLRDNORM) - { - while (p2p_port.second.recv_data()) - ; - } - fd_index++; - } - } - else if (ret_p2p < 0) - { - sys_net.error("[P2P] Error poll on master P2P socket: %d", get_last_error(false)); - } - } - } - std::lock_guard lock(s_nw_mutex); for (usz i = 0; i < socklist.size(); i++) @@ -216,3 +180,69 @@ void network_thread::operator()() } } } + +// Must be used under list_p2p_ports_mutex lock! +void p2p_thread::create_p2p_port(u16 p2p_port) +{ + if (!list_p2p_ports.contains(p2p_port)) + { + list_p2p_ports.emplace(std::piecewise_construct, std::forward_as_tuple(p2p_port), std::forward_as_tuple(p2p_port)); + const u32 prev_value = num_p2p_ports.fetch_add(1); + if (!prev_value) + { + num_p2p_ports.notify_one(); + } + } +} + +void p2p_thread::operator()() +{ + std::vector<::pollfd> p2p_fd(lv2_socket::id_count); + + while (thread_ctrl::state() != thread_state::aborting) + { + if (!num_p2p_ports) + { + thread_ctrl::wait_on(num_p2p_ports, 0); + continue; + } + + // Check P2P sockets for incoming packets + auto num_p2p_sockets = 0; + { + std::lock_guard lock(list_p2p_ports_mutex); + std::memset(p2p_fd.data(), 0, p2p_fd.size() * sizeof(::pollfd)); + for (const auto& p2p_port : list_p2p_ports) + { + p2p_fd[num_p2p_sockets].events = POLLIN; + p2p_fd[num_p2p_sockets].revents = 0; + p2p_fd[num_p2p_sockets].fd = p2p_port.second.p2p_socket; + num_p2p_sockets++; + } + } + +#ifdef _WIN32 + const auto ret_p2p = WSAPoll(p2p_fd.data(), num_p2p_sockets, 1); +#else + const auto ret_p2p = ::poll(p2p_fd.data(), num_p2p_sockets, 1); +#endif + if (ret_p2p > 0) + { + std::lock_guard lock(list_p2p_ports_mutex); + auto fd_index = 0; + for (auto& p2p_port : list_p2p_ports) + { + if ((p2p_fd[fd_index].revents & POLLIN) == POLLIN || (p2p_fd[fd_index].revents & POLLRDNORM) == POLLRDNORM) + { + while (p2p_port.second.recv_data()) + ; + } + fd_index++; + } + } + else if (ret_p2p < 0) + { + sys_net.error("[P2P] Error poll on master P2P socket: %d", get_last_error(false)); + } + } +} diff --git a/rpcs3/Emu/Cell/lv2/sys_net/network_context.h b/rpcs3/Emu/Cell/lv2/sys_net/network_context.h index 3814a4776a..ad3c51a1a8 100644 --- a/rpcs3/Emu/Cell/lv2/sys_net/network_context.h +++ b/rpcs3/Emu/Cell/lv2/sys_net/network_context.h @@ -11,15 +11,28 @@ struct network_thread { std::vector s_to_awake; shared_mutex s_nw_mutex; - - shared_mutex list_p2p_ports_mutex; - std::map list_p2p_ports{}; + atomic_t num_polls = 0; static constexpr auto thread_name = "Network Thread"; - network_thread(); + void operator()(); +}; + +struct p2p_thread +{ + shared_mutex list_p2p_ports_mutex; + std::map list_p2p_ports; + atomic_t num_p2p_ports = 0; + + static constexpr auto thread_name = "Network P2P Thread"; + + p2p_thread(); + + void create_p2p_port(u16 p2p_port); + void bind_sce_np_port(); void operator()(); }; using network_context = named_thread; +using p2p_context = named_thread; diff --git a/rpcs3/Emu/NP/np_handler.cpp b/rpcs3/Emu/NP/np_handler.cpp index 74a484658e..d3780baf87 100644 --- a/rpcs3/Emu/NP/np_handler.cpp +++ b/rpcs3/Emu/NP/np_handler.cpp @@ -408,11 +408,11 @@ namespace np void np_handler::init_np_handler_dependencies() { - if (is_psn_active && g_cfg.net.psn_status == np_psn_status::psn_rpcn && g_fxo->is_init() && !m_inited_np_handler_dependencies) + if (is_psn_active && g_cfg.net.psn_status == np_psn_status::psn_rpcn && g_fxo->is_init() && !m_inited_np_handler_dependencies) { m_inited_np_handler_dependencies = true; - auto& nc = g_fxo->get(); + auto& nc = g_fxo->get(); nc.bind_sce_np_port(); std::lock_guard lock(mutex_rpcn); @@ -817,6 +817,16 @@ namespace np string_to_online_name(rpcn->get_online_name(), online_name); string_to_avatar_url(rpcn->get_avatar_url(), avatar_url); public_ip_addr = rpcn->get_addr_sig(); + + if (!public_ip_addr) + { + rsx::overlays::queue_message(rpcn::rpcn_state_to_localized_string_id(rpcn::rpcn_state::failure_other)); + rpcn_log.error("Failed to get a reply from RPCN signaling!"); + is_psn_active = false; + rpcn->terminate_connection(); + return; + } + local_ip_addr = std::bit_cast>(rpcn->get_addr_local()); break; diff --git a/rpcs3/Emu/NP/rpcn_client.cpp b/rpcs3/Emu/NP/rpcn_client.cpp index fcbddee27d..44ce18be61 100644 --- a/rpcs3/Emu/NP/rpcn_client.cpp +++ b/rpcs3/Emu/NP/rpcn_client.cpp @@ -2594,4 +2594,58 @@ namespace rpcn return it == friend_infos.friends.end() ? std::nullopt : std::optional(*it); } + bool rpcn_client::is_connected() const + { + return connected; + } + + bool rpcn_client::is_authentified() const + { + return authentified; + } + rpcn_state rpcn_client::get_rpcn_state() const + { + return state; + } + + const std::string& rpcn_client::get_online_name() const + { + return online_name; + } + + const std::string& rpcn_client::get_avatar_url() const + { + return avatar_url; + } + + u32 rpcn_client::get_addr_sig() const + { + if (!addr_sig) + { + addr_sig.wait(0, static_cast(10'000'000'000)); + } + + return addr_sig.load(); + } + + u16 rpcn_client::get_port_sig() const + { + if (!port_sig) + { + port_sig.wait(0, static_cast(10'000'000'000)); + } + + return port_sig.load(); + } + + u32 rpcn_client::get_addr_local() const + { + return local_addr_sig.load(); + } + + void rpcn_client::update_local_addr(u32 addr) + { + local_addr_sig = std::bit_cast>(addr); + } + } // namespace rpcn diff --git a/rpcs3/Emu/NP/rpcn_client.h b/rpcs3/Emu/NP/rpcn_client.h index af432d787a..c1f9f1e11a 100644 --- a/rpcs3/Emu/NP/rpcn_client.h +++ b/rpcs3/Emu/NP/rpcn_client.h @@ -437,18 +437,9 @@ namespace rpcn void remove_message_cb(message_cb_func cb_func, void* cb_param); void mark_message_used(u64 id); - bool is_connected() const - { - return connected; - } - bool is_authentified() const - { - return authentified; - } - rpcn_state get_rpcn_state() const - { - return state; - } + bool is_connected() const; + bool is_authentified() const; + rpcn_state get_rpcn_state() const; void server_infos_updated(); @@ -495,44 +486,13 @@ namespace rpcn bool tus_delete_multislot_data(u32 req_id, SceNpCommunicationId& communication_id, const SceNpOnlineId& targetNpId, vm::cptr slotIdArray, s32 arrayNum, bool vuser); bool send_presence(const SceNpCommunicationId& pr_com_id, const std::string& pr_title, const std::string& pr_status, const std::string& pr_comment, const std::vector& pr_data); - const std::string& get_online_name() const - { - return online_name; - } - const std::string& get_avatar_url() const - { - return avatar_url; - } + const std::string& get_online_name() const; + const std::string& get_avatar_url() const; - u32 get_addr_sig() const - { - if (!addr_sig) - { - addr_sig.wait(0, static_cast(10'000'000'000)); - } - - return addr_sig.load(); - } - - u16 get_port_sig() const - { - if (!port_sig) - { - port_sig.wait(0, static_cast(10'000'000'000)); - } - - return port_sig.load(); - } - - u32 get_addr_local() const - { - return local_addr_sig.load(); - } - - void update_local_addr(u32 addr) - { - local_addr_sig = std::bit_cast>(addr); - } + u32 get_addr_sig() const; + u16 get_port_sig() const; + u32 get_addr_local() const; + void update_local_addr(u32 addr); private: bool get_reply(u64 expected_id, std::vector& data);