From a50683d6ca4ec9cf420846c6216bccf4116cfa27 Mon Sep 17 00:00:00 2001 From: RipleyTom Date: Mon, 13 May 2024 04:35:08 +0200 Subject: [PATCH] sys_net improvements (#15584) Also remove redundant ensures --- rpcs3/Emu/Audio/AudioBackend.cpp | 2 - rpcs3/Emu/Cell/Modules/cellAudioOut.cpp | 1 - rpcs3/Emu/Cell/Modules/cellVoice.cpp | 3 +- rpcs3/Emu/Cell/lv2/sys_net/lv2_socket_p2p.cpp | 1 - .../Emu/Cell/lv2/sys_net/lv2_socket_p2ps.cpp | 135 ++++++++++++++---- rpcs3/Emu/Cell/lv2/sys_net/lv2_socket_p2ps.h | 10 +- .../Emu/Cell/lv2/sys_net/network_context.cpp | 2 +- rpcs3/Emu/Cell/lv2/sys_net/nt_p2p_port.cpp | 40 +++++- rpcs3/Emu/Cell/lv2/sys_net/nt_p2p_port.h | 4 + rpcs3/Emu/NP/signaling_handler.cpp | 5 +- rpcs3/util/media_utils.cpp | 1 - 11 files changed, 161 insertions(+), 43 deletions(-) diff --git a/rpcs3/Emu/Audio/AudioBackend.cpp b/rpcs3/Emu/Audio/AudioBackend.cpp index 83c0046717..73c017eef9 100644 --- a/rpcs3/Emu/Audio/AudioBackend.cpp +++ b/rpcs3/Emu/Audio/AudioBackend.cpp @@ -115,7 +115,6 @@ std::pair AudioBackend::get_channel_count_and_ { audio_out_configuration& audio_out_cfg = g_fxo->get(); std::lock_guard lock(audio_out_cfg.mtx); - ensure(device_index < audio_out_cfg.out.size()); const audio_out_configuration::audio_out& out = ::at32(audio_out_cfg.out, device_index); return out.get_channel_count_and_downmixer(); } @@ -124,7 +123,6 @@ AudioChannelCnt AudioBackend::get_max_channel_count(u32 device_index) { audio_out_configuration& audio_out_cfg = g_fxo->get(); std::lock_guard lock(audio_out_cfg.mtx); - ensure(device_index < audio_out_cfg.out.size()); const audio_out_configuration::audio_out& out = ::at32(audio_out_cfg.out, device_index); AudioChannelCnt count = AudioChannelCnt::STEREO; diff --git a/rpcs3/Emu/Cell/Modules/cellAudioOut.cpp b/rpcs3/Emu/Cell/Modules/cellAudioOut.cpp index ff7e630022..28888fcd0d 100644 --- a/rpcs3/Emu/Cell/Modules/cellAudioOut.cpp +++ b/rpcs3/Emu/Cell/Modules/cellAudioOut.cpp @@ -516,7 +516,6 @@ error_code cellAudioOutGetDeviceInfo(u32 audioOut, u32 deviceIndex, vm::ptrget(); std::lock_guard lock(cfg.mtx); - ensure(audioOut < cfg.out.size()); const audio_out_configuration::audio_out& out = ::at32(cfg.out, audioOut); ensure(out.sound_modes.size() <= 16); diff --git a/rpcs3/Emu/Cell/Modules/cellVoice.cpp b/rpcs3/Emu/Cell/Modules/cellVoice.cpp index e577a3aa42..7c507a26a3 100644 --- a/rpcs3/Emu/Cell/Modules/cellVoice.cpp +++ b/rpcs3/Emu/Cell/Modules/cellVoice.cpp @@ -852,7 +852,8 @@ error_code cellVoiceWriteToIPortEx2(u32 ips, vm::cptr data, vm::ptr s error_code cellVoiceReadFromOPort(u32 ops, vm::ptr data, vm::ptr size) { - cellVoice.todo("cellVoiceReadFromOPort(ops=%d, data=*0x%x, size=*0x%x)", ops, data, size); + // Spammy TODO + cellVoice.trace("cellVoiceReadFromOPort(ops=%d, data=*0x%x, size=*0x%x)", ops, data, size); auto& manager = g_fxo->get(); diff --git a/rpcs3/Emu/Cell/lv2/sys_net/lv2_socket_p2p.cpp b/rpcs3/Emu/Cell/lv2/sys_net/lv2_socket_p2p.cpp index e10d30e1a3..ecda6ded37 100644 --- a/rpcs3/Emu/Cell/lv2/sys_net/lv2_socket_p2p.cpp +++ b/rpcs3/Emu/Cell/lv2/sys_net/lv2_socket_p2p.cpp @@ -333,7 +333,6 @@ void lv2_socket_p2p::close() auto& nc = g_fxo->get(); { std::lock_guard lock(nc.list_p2p_ports_mutex); - ensure(nc.list_p2p_ports.contains(port)); auto& p2p_port = ::at32(nc.list_p2p_ports, port); { std::lock_guard lock(p2p_port.bound_p2p_vports_mutex); diff --git a/rpcs3/Emu/Cell/lv2/sys_net/lv2_socket_p2ps.cpp b/rpcs3/Emu/Cell/lv2/sys_net/lv2_socket_p2ps.cpp index 9d8b8df618..e692dea8dd 100644 --- a/rpcs3/Emu/Cell/lv2/sys_net/lv2_socket_p2ps.cpp +++ b/rpcs3/Emu/Cell/lv2/sys_net/lv2_socket_p2ps.cpp @@ -68,6 +68,23 @@ public: } } + void clear_all_messages(s32 sock_id) + { + std::lock_guard lock(data_mutex); + + for (auto it = msgs.begin(); it != msgs.end();) + { + auto& msg = it->second; + + if (msg.sock_id == sock_id) + { + it = msgs.erase(it); + continue; + } + it++; + } + } + void operator()() { atomic_wait_timeout timeout = atomic_wait_timeout::inf; @@ -114,10 +131,10 @@ public: // Too many retries, need to notify the socket that the connection is dead idm::check(msg.sock_id, [&](lv2_socket& sock) { - sys_net.error("[P2PS] Too many retries, closing the socket"); + sys_net.error("[P2PS] Too many retries, closing the stream"); ensure(sock.get_type() == SYS_NET_SOCK_STREAM_P2P); auto& sock_p2ps = reinterpret_cast(sock); - sock_p2ps.set_status(p2ps_stream_status::stream_closed); + sock_p2ps.close_stream(); }); it = msgs.erase(it); continue; @@ -129,11 +146,17 @@ public: ensure(sock.get_type() == SYS_NET_SOCK_STREAM_P2P); auto& sock_p2ps = reinterpret_cast(sock); - if (::sendto(sock_p2ps.get_socket(), reinterpret_cast(msg.data.data()), ::size32(msg.data), 0, reinterpret_cast(&msg.dst_addr), sizeof(msg.dst_addr)) == -1) + while (::sendto(sock_p2ps.get_socket(), reinterpret_cast(msg.data.data()), ::size32(msg.data), 0, reinterpret_cast(&msg.dst_addr), sizeof(msg.dst_addr)) == -1) { - sys_net.error("[P2PS] Resending the packet failed(%s), closing the socket", get_last_error(false)); + const sys_net_error err = get_last_error(false); + // concurrency on the socket(from a sendto for example) can result in EAGAIN error in which case we try again + if (err == SYS_NET_EAGAIN) + { + continue; + } - sock_p2ps.set_status(p2ps_stream_status::stream_closed); + sys_net.error("[P2PS] Resending the packet failed(%s), closing the stream", err); + sock_p2ps.close_stream(); return false; } return true; @@ -279,7 +302,7 @@ void lv2_socket_p2ps::save(utils::serial& ar) ar(status, max_backlog, backlog, op_port, op_vport, op_addr, data_beg_seq, received_data, cur_seq); } -bool lv2_socket_p2ps::handle_connected(p2ps_encapsulated_tcp* tcp_header, u8* data, ::sockaddr_storage* op_addr) +bool lv2_socket_p2ps::handle_connected(p2ps_encapsulated_tcp* tcp_header, u8* data, ::sockaddr_storage* op_addr, nt_p2p_port* p2p_port) { std::lock_guard lock(mutex); @@ -289,9 +312,7 @@ bool lv2_socket_p2ps::handle_connected(p2ps_encapsulated_tcp* tcp_header, u8* da return false; } - nt_p2p_port::dump_packet(tcp_header); - - if (tcp_header->flags == static_cast(p2ps_tcp_flags::ACK)) + if (tcp_header->flags & static_cast(p2ps_tcp_flags::ACK)) { auto& tcpm = g_fxo->get>(); tcpm.confirm_data_received(lv2_id, tcp_header->ack); @@ -346,25 +367,32 @@ bool lv2_socket_p2ps::handle_connected(p2ps_encapsulated_tcp* tcp_header, u8* da status = p2ps_stream_status::stream_connected; send_ack(); } + else + { + sys_net.error("[P2PS] Unexpected U2S TCP flag received with handshaking state: 0x%02X", tcp_header->flags); + } return true; } else if (status == p2ps_stream_status::stream_connected) { - if (tcp_header->seq < data_beg_seq) - { - // Data has already been processed - sys_net.trace("[P2PS] Data has already been processed"); - if (tcp_header->flags != p2ps_tcp_flags::ACK && tcp_header->flags != p2ps_tcp_flags::RST) - send_ack(); - return true; - } - switch (tcp_header->flags) { - case p2ps_tcp_flags::PSH: case 0: + case p2ps_tcp_flags::PSH: + case p2ps_tcp_flags::ACK: + case p2ps_tcp_flags::SYN: + case p2ps_tcp_flags::SYN | p2ps_tcp_flags::ACK: { + if (tcp_header->seq < data_beg_seq) + { + // Data has already been processed + sys_net.trace("[P2PS] Data has already been processed"); + if (tcp_header->flags != p2ps_tcp_flags::ACK) + send_ack(); + return true; + } + if (!received_data.count(tcp_header->seq)) { // New data @@ -381,13 +409,13 @@ bool lv2_socket_p2ps::handle_connected(p2ps_encapsulated_tcp* tcp_header, u8* da case p2ps_tcp_flags::RST: case p2ps_tcp_flags::FIN: { - sys_net.error("[P2PS] Received RST/FIN packet(%d), closing the socket", tcp_header->flags); - status = p2ps_stream_status::stream_closed; + sys_net.error("[P2PS] Received RST/FIN packet(%d), closing the stream", tcp_header->flags); + close_stream_nl(p2p_port); return false; } default: { - sys_net.error("[P2PS] Unknown U2S TCP flag received"); + sys_net.error("[P2PS] Unexpected U2S TCP flag received with connected state: 0x%02X", tcp_header->flags); return true; } } @@ -453,7 +481,7 @@ bool lv2_socket_p2ps::handle_listening(p2ps_encapsulated_tcp* tcp_header, [[mayb auto packet = generate_u2s_packet(send_hdr, nullptr, 0); { std::lock_guard lock(sock_lv2->mutex); - send_u2s_packet(std::move(packet), reinterpret_cast<::sockaddr_in*>(op_addr), send_hdr.seq, true); + sock_lv2->send_u2s_packet(std::move(packet), reinterpret_cast<::sockaddr_in*>(op_addr), send_hdr.seq, true); } backlog.push_back(new_sock_id); @@ -476,6 +504,10 @@ bool lv2_socket_p2ps::handle_listening(p2ps_encapsulated_tcp* tcp_header, [[mayb } } } + else + { + sys_net.error("[P2PS] Unexpected U2S TCP flag received on listening socket: 0x%02X", tcp_header->flags); + } // Ignore other packets? @@ -487,10 +519,16 @@ void lv2_socket_p2ps::send_u2s_packet(std::vector data, const ::sockaddr_in* char ip_str[16]; inet_ntop(AF_INET, &dst->sin_addr, ip_str, sizeof(ip_str)); sys_net.trace("[P2PS] Sending U2S packet on socket %d(id:%d): data(%d, seq %d, require_ack %d) to %s:%d", socket, lv2_id, data.size(), seq, require_ack, ip_str, std::bit_cast>(dst->sin_port)); - if (::sendto(socket, reinterpret_cast(data.data()), ::size32(data), 0, reinterpret_cast(dst), sizeof(sockaddr_in)) == -1) + while (::sendto(socket, reinterpret_cast(data.data()), ::size32(data), 0, reinterpret_cast(dst), sizeof(sockaddr_in)) == -1) { - sys_net.error("[P2PS] Attempting to send a u2s packet failed(%s), closing socket!", get_last_error(false)); - status = p2ps_stream_status::stream_closed; + const sys_net_error err = get_last_error(false); + // concurrency on the socket can result in EAGAIN error in which case we try again + if (err == SYS_NET_EAGAIN) + { + continue; + } + + sys_net.error("[P2PS] Attempting to send a u2s packet failed(%s)!", err); return; } @@ -502,6 +540,35 @@ void lv2_socket_p2ps::send_u2s_packet(std::vector data, const ::sockaddr_in* } } +void lv2_socket_p2ps::close_stream_nl(nt_p2p_port* p2p_port) +{ + status = p2ps_stream_status::stream_closed; + + for (auto it = p2p_port->bound_p2p_streams.begin(); it != p2p_port->bound_p2p_streams.end();) + { + if (it->second == lv2_id) + { + it = p2p_port->bound_p2p_streams.erase(it); + continue; + } + it++; + } + + auto& tcpm = g_fxo->get>(); + tcpm.clear_all_messages(lv2_id); +} + +void lv2_socket_p2ps::close_stream() +{ + auto& nc = g_fxo->get(); + + std::lock_guard lock(nc.list_p2p_ports_mutex); + auto& p2p_port = ::at32(nc.list_p2p_ports, port); + + std::scoped_lock more_lock(p2p_port.bound_p2p_vports_mutex, mutex); + close_stream_nl(&p2p_port); +} + p2ps_stream_status lv2_socket_p2ps::get_status() const { return status; @@ -608,11 +675,14 @@ s32 lv2_socket_p2ps::bind(const sys_net_sockaddr& addr) if (p2p_vport == 0) { - p2p_vport = 30000; + sys_net.warning("[P2PS] vport was unassigned in bind!"); + p2p_vport = pport.get_port(); + while (pport.bound_p2ps_vports.contains(p2p_vport)) { - p2p_vport++; + p2p_vport = pport.get_port(); } + std::set bound_ports{lv2_id}; pport.bound_p2ps_vports.insert(std::make_pair(p2p_vport, std::move(bound_ports))); } @@ -701,10 +771,11 @@ std::optional lv2_socket_p2ps::connect(const sys_net_sockaddr& addr) { // Unassigned vport, assigns one sys_net.warning("[P2PS] vport was unassigned before connect!"); - vport = 30000; + vport = pport.get_port(); + while (pport.bound_p2p_vports.count(vport) || pport.bound_p2p_streams.count(static_cast(vport) << 32)) { - vport++; + vport = pport.get_port(); } } const u64 key = name.sin_addr.s_addr | (static_cast(vport) << 32) | (static_cast(dst_vport) << 48); @@ -876,7 +947,6 @@ void lv2_socket_p2ps::close() auto& nc = g_fxo->get(); { std::lock_guard lock(nc.list_p2p_ports_mutex); - ensure(nc.list_p2p_ports.contains(port)); auto& p2p_port = ::at32(nc.list_p2p_ports, port); { std::lock_guard lock(p2p_port.bound_p2p_vports_mutex); @@ -902,6 +972,9 @@ void lv2_socket_p2ps::close() } } } + + auto& tcpm = g_fxo->get>(); + tcpm.clear_all_messages(lv2_id); } s32 lv2_socket_p2ps::shutdown([[maybe_unused]] s32 how) diff --git a/rpcs3/Emu/Cell/lv2/sys_net/lv2_socket_p2ps.h b/rpcs3/Emu/Cell/lv2/sys_net/lv2_socket_p2ps.h index 05129df9a8..6d1333eb58 100644 --- a/rpcs3/Emu/Cell/lv2/sys_net/lv2_socket_p2ps.h +++ b/rpcs3/Emu/Cell/lv2/sys_net/lv2_socket_p2ps.h @@ -16,6 +16,8 @@ #include "lv2_socket_p2p.h" +struct nt_p2p_port; + constexpr be_t P2PS_U2S_SIG = (static_cast('U') << 24 | static_cast('2') << 16 | static_cast('S') << 8 | static_cast('0')); struct p2ps_encapsulated_tcp @@ -63,9 +65,10 @@ public: p2ps_stream_status get_status() const; void set_status(p2ps_stream_status new_status); - bool handle_connected(p2ps_encapsulated_tcp* tcp_header, u8* data, ::sockaddr_storage* op_addr); + bool handle_connected(p2ps_encapsulated_tcp* tcp_header, u8* data, ::sockaddr_storage* op_addr, nt_p2p_port* p2p_port); bool handle_listening(p2ps_encapsulated_tcp* tcp_header, u8* data, ::sockaddr_storage* op_addr); void send_u2s_packet(std::vector data, const ::sockaddr_in* dst, u64 seq, bool require_ack); + void close_stream(); std::tuple, sys_net_sockaddr> accept(bool is_lock = true) override; s32 bind(const sys_net_sockaddr& addr) override; @@ -87,7 +90,10 @@ public: s32 poll(sys_net_pollfd& sn_pfd, pollfd& native_pfd) override; std::tuple select(bs_t selected, pollfd& native_pfd) override; -protected: +private: + void close_stream_nl(nt_p2p_port* p2p_port); + +private: static constexpr usz MAX_RECEIVED_BUFFER = (1024 * 1024 * 10); p2ps_stream_status status = p2ps_stream_status::stream_closed; diff --git a/rpcs3/Emu/Cell/lv2/sys_net/network_context.cpp b/rpcs3/Emu/Cell/lv2/sys_net/network_context.cpp index e8f4b5197a..c4c79680b3 100644 --- a/rpcs3/Emu/Cell/lv2/sys_net/network_context.cpp +++ b/rpcs3/Emu/Cell/lv2/sys_net/network_context.cpp @@ -212,9 +212,9 @@ void p2p_thread::operator()() // Check P2P sockets for incoming packets auto num_p2p_sockets = 0; + std::memset(p2p_fd.data(), 0, p2p_fd.size() * sizeof(::pollfd)); { std::lock_guard lock(list_p2p_ports_mutex); - std::memset(p2p_fd.data(), 0, p2p_fd.size() * sizeof(::pollfd)); for (const auto& p2p_port : list_p2p_ports) { p2p_fd[num_p2p_sockets].events = POLLIN; diff --git a/rpcs3/Emu/Cell/lv2/sys_net/nt_p2p_port.cpp b/rpcs3/Emu/Cell/lv2/sys_net/nt_p2p_port.cpp index e216435c7b..78970bee2c 100644 --- a/rpcs3/Emu/Cell/lv2/sys_net/nt_p2p_port.cpp +++ b/rpcs3/Emu/Cell/lv2/sys_net/nt_p2p_port.cpp @@ -97,6 +97,17 @@ void nt_p2p_port::dump_packet(p2ps_encapsulated_tcp* tcph) sys_net.trace("PACKET DUMP:\nsrc_port: %d\ndst_port: %d\nflags: %d\nseq: %d\nack: %d\nlen: %d", tcph->src_port, tcph->dst_port, tcph->flags, tcph->seq, tcph->ack, tcph->length); } +// Must be used under bound_p2p_vports_mutex lock +u16 nt_p2p_port::get_port() +{ + if (binding_port == 0) + { + binding_port = 30000; + } + + return binding_port++; +} + bool nt_p2p_port::handle_connected(s32 sock_id, p2ps_encapsulated_tcp* tcp_header, u8* data, ::sockaddr_storage* op_addr) { const auto sock = idm::check(sock_id, [&](lv2_socket& sock) -> bool @@ -104,7 +115,7 @@ bool nt_p2p_port::handle_connected(s32 sock_id, p2ps_encapsulated_tcp* tcp_heade ensure(sock.get_type() == SYS_NET_SOCK_STREAM_P2P); auto& sock_p2ps = reinterpret_cast(sock); - return sock_p2ps.handle_connected(tcp_header, data, op_addr); + return sock_p2ps.handle_connected(tcp_header, data, op_addr, this); }); if (!sock) @@ -287,7 +298,10 @@ bool nt_p2p_port::recv_data() return true; } - // The packet is valid, check if it's bound + // The packet is valid + dump_packet(tcp_header); + + // Check if it's bound const u64 key_connected = (reinterpret_cast(&native_addr)->sin_addr.s_addr) | (static_cast(tcp_header->src_port) << 48) | (static_cast(tcp_header->dst_port) << 32); { @@ -311,6 +325,28 @@ bool nt_p2p_port::recv_data() } return true; } + + if (tcp_header->flags == p2ps_tcp_flags::RST) + { + sys_net.trace("[P2PS] Received RST on unbound P2PS"); + return true; + } + + // The P2PS packet was sent to an unbound vport, send a RST packet + p2ps_encapsulated_tcp send_hdr; + send_hdr.src_port = tcp_header->dst_port; + send_hdr.dst_port = tcp_header->src_port; + send_hdr.flags = p2ps_tcp_flags::RST; + auto packet = generate_u2s_packet(send_hdr, nullptr, 0); + + if (::sendto(p2p_socket, reinterpret_cast(packet.data()), ::size32(packet), 0, reinterpret_cast(&native_addr), sizeof(sockaddr_in)) == -1) + { + sys_net.error("[P2PS] Error sending RST to sender to unbound P2PS: %s", get_last_error(false)); + return true; + } + + sys_net.trace("[P2PS] Sent RST to sender to unbound P2PS"); + return true; } } diff --git a/rpcs3/Emu/Cell/lv2/sys_net/nt_p2p_port.h b/rpcs3/Emu/Cell/lv2/sys_net/nt_p2p_port.h index 9d80b63337..2d57bc8461 100644 --- a/rpcs3/Emu/Cell/lv2/sys_net/nt_p2p_port.h +++ b/rpcs3/Emu/Cell/lv2/sys_net/nt_p2p_port.h @@ -56,6 +56,8 @@ struct nt_p2p_port // List of active(either from a connect or an accept) P2PS sockets (key, sock_id) // key is ( (src_vport) << 48 | (dst_vport) << 32 | addr ) with src_vport and addr being 0 for listening sockets std::map bound_p2p_streams{}; + // Current free port index + u16 binding_port = 30000; // Queued messages from RPCN shared_mutex s_rpcn_mutex; @@ -71,6 +73,8 @@ struct nt_p2p_port static void dump_packet(p2ps_encapsulated_tcp* tcph); + u16 get_port(); + bool handle_connected(s32 sock_id, p2ps_encapsulated_tcp* tcp_header, u8* data, ::sockaddr_storage* op_addr); bool handle_listening(s32 sock_id, p2ps_encapsulated_tcp* tcp_header, u8* data, ::sockaddr_storage* op_addr); bool recv_data(); diff --git a/rpcs3/Emu/NP/signaling_handler.cpp b/rpcs3/Emu/NP/signaling_handler.cpp index 9381e9505f..d463b15676 100644 --- a/rpcs3/Emu/NP/signaling_handler.cpp +++ b/rpcs3/Emu/NP/signaling_handler.cpp @@ -371,6 +371,7 @@ void signaling_handler::process_incoming_messages() schedule_repeat = false; sent_packet.command = signal_finished_ack; update_ext_si_status(si, false); + update_si_status(si, SCE_NP_SIGNALING_CONN_STATUS_INACTIVE, SCE_NP_SIGNALING_ERROR_TERMINATED_BY_PEER); break; case signal_finished_ack: reply = false; @@ -593,6 +594,9 @@ void signaling_handler::update_si_status(std::shared_ptr& si, s3 void signaling_handler::update_ext_si_status(std::shared_ptr& si, bool op_activated) { + if (!si) + return; + if (op_activated && !si->op_activated) { si->op_activated = true; @@ -679,7 +683,6 @@ void signaling_handler::start_sig(u32 conn_id, u32 addr, u16 port) sent_packet.command = signal_connect; sent_packet.timestamp_sender = get_micro_timestamp(steady_clock::now()); - ensure(sig_peers.contains(conn_id)); std::shared_ptr si = ::at32(sig_peers, conn_id); const auto now = steady_clock::now(); diff --git a/rpcs3/util/media_utils.cpp b/rpcs3/util/media_utils.cpp index a7f6810b1e..73369a03a6 100644 --- a/rpcs3/util/media_utils.cpp +++ b/rpcs3/util/media_utils.cpp @@ -690,7 +690,6 @@ namespace utils while (thread_ctrl::state() != thread_state::aborting) { - ensure(m_context.current_track < m_context.playlist.size()); media_log.notice("audio_decoder: about to decode: %s (index=%d)", ::at32(m_context.playlist, m_context.current_track), m_context.current_track); decode_track(::at32(m_context.playlist, m_context.current_track));