sys_net improvements (#15584)

Also remove redundant ensures
This commit is contained in:
RipleyTom 2024-05-13 04:35:08 +02:00 committed by GitHub
parent fc92aef4d1
commit a50683d6ca
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 161 additions and 43 deletions

View File

@ -115,7 +115,6 @@ std::pair<AudioChannelCnt, AudioChannelCnt> AudioBackend::get_channel_count_and_
{
audio_out_configuration& audio_out_cfg = g_fxo->get<audio_out_configuration>();
std::lock_guard lock(audio_out_cfg.mtx);
ensure(device_index < audio_out_cfg.out.size());
const audio_out_configuration::audio_out& out = ::at32(audio_out_cfg.out, device_index);
return out.get_channel_count_and_downmixer();
}
@ -124,7 +123,6 @@ AudioChannelCnt AudioBackend::get_max_channel_count(u32 device_index)
{
audio_out_configuration& audio_out_cfg = g_fxo->get<audio_out_configuration>();
std::lock_guard lock(audio_out_cfg.mtx);
ensure(device_index < audio_out_cfg.out.size());
const audio_out_configuration::audio_out& out = ::at32(audio_out_cfg.out, device_index);
AudioChannelCnt count = AudioChannelCnt::STEREO;

View File

@ -516,7 +516,6 @@ error_code cellAudioOutGetDeviceInfo(u32 audioOut, u32 deviceIndex, vm::ptr<Cell
audio_out_configuration& cfg = g_fxo->get<audio_out_configuration>();
std::lock_guard lock(cfg.mtx);
ensure(audioOut < cfg.out.size());
const audio_out_configuration::audio_out& out = ::at32(cfg.out, audioOut);
ensure(out.sound_modes.size() <= 16);

View File

@ -852,7 +852,8 @@ error_code cellVoiceWriteToIPortEx2(u32 ips, vm::cptr<void> data, vm::ptr<u32> s
error_code cellVoiceReadFromOPort(u32 ops, vm::ptr<void> data, vm::ptr<u32> size)
{
cellVoice.todo("cellVoiceReadFromOPort(ops=%d, data=*0x%x, size=*0x%x)", ops, data, size);
// Spammy TODO
cellVoice.trace("cellVoiceReadFromOPort(ops=%d, data=*0x%x, size=*0x%x)", ops, data, size);
auto& manager = g_fxo->get<voice_manager>();

View File

@ -333,7 +333,6 @@ void lv2_socket_p2p::close()
auto& nc = g_fxo->get<p2p_context>();
{
std::lock_guard lock(nc.list_p2p_ports_mutex);
ensure(nc.list_p2p_ports.contains(port));
auto& p2p_port = ::at32(nc.list_p2p_ports, port);
{
std::lock_guard lock(p2p_port.bound_p2p_vports_mutex);

View File

@ -68,6 +68,23 @@ public:
}
}
void clear_all_messages(s32 sock_id)
{
std::lock_guard lock(data_mutex);
for (auto it = msgs.begin(); it != msgs.end();)
{
auto& msg = it->second;
if (msg.sock_id == sock_id)
{
it = msgs.erase(it);
continue;
}
it++;
}
}
void operator()()
{
atomic_wait_timeout timeout = atomic_wait_timeout::inf;
@ -114,10 +131,10 @@ public:
// Too many retries, need to notify the socket that the connection is dead
idm::check<lv2_socket>(msg.sock_id, [&](lv2_socket& sock)
{
sys_net.error("[P2PS] Too many retries, closing the socket");
sys_net.error("[P2PS] Too many retries, closing the stream");
ensure(sock.get_type() == SYS_NET_SOCK_STREAM_P2P);
auto& sock_p2ps = reinterpret_cast<lv2_socket_p2ps&>(sock);
sock_p2ps.set_status(p2ps_stream_status::stream_closed);
sock_p2ps.close_stream();
});
it = msgs.erase(it);
continue;
@ -129,11 +146,17 @@ public:
ensure(sock.get_type() == SYS_NET_SOCK_STREAM_P2P);
auto& sock_p2ps = reinterpret_cast<lv2_socket_p2ps&>(sock);
if (::sendto(sock_p2ps.get_socket(), reinterpret_cast<const char*>(msg.data.data()), ::size32(msg.data), 0, reinterpret_cast<const sockaddr*>(&msg.dst_addr), sizeof(msg.dst_addr)) == -1)
while (::sendto(sock_p2ps.get_socket(), reinterpret_cast<const char*>(msg.data.data()), ::size32(msg.data), 0, reinterpret_cast<const sockaddr*>(&msg.dst_addr), sizeof(msg.dst_addr)) == -1)
{
sys_net.error("[P2PS] Resending the packet failed(%s), closing the socket", get_last_error(false));
const sys_net_error err = get_last_error(false);
// concurrency on the socket(from a sendto for example) can result in EAGAIN error in which case we try again
if (err == SYS_NET_EAGAIN)
{
continue;
}
sock_p2ps.set_status(p2ps_stream_status::stream_closed);
sys_net.error("[P2PS] Resending the packet failed(%s), closing the stream", err);
sock_p2ps.close_stream();
return false;
}
return true;
@ -279,7 +302,7 @@ void lv2_socket_p2ps::save(utils::serial& ar)
ar(status, max_backlog, backlog, op_port, op_vport, op_addr, data_beg_seq, received_data, cur_seq);
}
bool lv2_socket_p2ps::handle_connected(p2ps_encapsulated_tcp* tcp_header, u8* data, ::sockaddr_storage* op_addr)
bool lv2_socket_p2ps::handle_connected(p2ps_encapsulated_tcp* tcp_header, u8* data, ::sockaddr_storage* op_addr, nt_p2p_port* p2p_port)
{
std::lock_guard lock(mutex);
@ -289,9 +312,7 @@ bool lv2_socket_p2ps::handle_connected(p2ps_encapsulated_tcp* tcp_header, u8* da
return false;
}
nt_p2p_port::dump_packet(tcp_header);
if (tcp_header->flags == static_cast<u8>(p2ps_tcp_flags::ACK))
if (tcp_header->flags & static_cast<u8>(p2ps_tcp_flags::ACK))
{
auto& tcpm = g_fxo->get<named_thread<tcp_timeout_monitor>>();
tcpm.confirm_data_received(lv2_id, tcp_header->ack);
@ -346,25 +367,32 @@ bool lv2_socket_p2ps::handle_connected(p2ps_encapsulated_tcp* tcp_header, u8* da
status = p2ps_stream_status::stream_connected;
send_ack();
}
else
{
sys_net.error("[P2PS] Unexpected U2S TCP flag received with handshaking state: 0x%02X", tcp_header->flags);
}
return true;
}
else if (status == p2ps_stream_status::stream_connected)
{
if (tcp_header->seq < data_beg_seq)
{
// Data has already been processed
sys_net.trace("[P2PS] Data has already been processed");
if (tcp_header->flags != p2ps_tcp_flags::ACK && tcp_header->flags != p2ps_tcp_flags::RST)
send_ack();
return true;
}
switch (tcp_header->flags)
{
case p2ps_tcp_flags::PSH:
case 0:
case p2ps_tcp_flags::PSH:
case p2ps_tcp_flags::ACK:
case p2ps_tcp_flags::SYN:
case p2ps_tcp_flags::SYN | p2ps_tcp_flags::ACK:
{
if (tcp_header->seq < data_beg_seq)
{
// Data has already been processed
sys_net.trace("[P2PS] Data has already been processed");
if (tcp_header->flags != p2ps_tcp_flags::ACK)
send_ack();
return true;
}
if (!received_data.count(tcp_header->seq))
{
// New data
@ -381,13 +409,13 @@ bool lv2_socket_p2ps::handle_connected(p2ps_encapsulated_tcp* tcp_header, u8* da
case p2ps_tcp_flags::RST:
case p2ps_tcp_flags::FIN:
{
sys_net.error("[P2PS] Received RST/FIN packet(%d), closing the socket", tcp_header->flags);
status = p2ps_stream_status::stream_closed;
sys_net.error("[P2PS] Received RST/FIN packet(%d), closing the stream", tcp_header->flags);
close_stream_nl(p2p_port);
return false;
}
default:
{
sys_net.error("[P2PS] Unknown U2S TCP flag received");
sys_net.error("[P2PS] Unexpected U2S TCP flag received with connected state: 0x%02X", tcp_header->flags);
return true;
}
}
@ -453,7 +481,7 @@ bool lv2_socket_p2ps::handle_listening(p2ps_encapsulated_tcp* tcp_header, [[mayb
auto packet = generate_u2s_packet(send_hdr, nullptr, 0);
{
std::lock_guard lock(sock_lv2->mutex);
send_u2s_packet(std::move(packet), reinterpret_cast<::sockaddr_in*>(op_addr), send_hdr.seq, true);
sock_lv2->send_u2s_packet(std::move(packet), reinterpret_cast<::sockaddr_in*>(op_addr), send_hdr.seq, true);
}
backlog.push_back(new_sock_id);
@ -476,6 +504,10 @@ bool lv2_socket_p2ps::handle_listening(p2ps_encapsulated_tcp* tcp_header, [[mayb
}
}
}
else
{
sys_net.error("[P2PS] Unexpected U2S TCP flag received on listening socket: 0x%02X", tcp_header->flags);
}
// Ignore other packets?
@ -487,10 +519,16 @@ void lv2_socket_p2ps::send_u2s_packet(std::vector<u8> data, const ::sockaddr_in*
char ip_str[16];
inet_ntop(AF_INET, &dst->sin_addr, ip_str, sizeof(ip_str));
sys_net.trace("[P2PS] Sending U2S packet on socket %d(id:%d): data(%d, seq %d, require_ack %d) to %s:%d", socket, lv2_id, data.size(), seq, require_ack, ip_str, std::bit_cast<u16, be_t<u16>>(dst->sin_port));
if (::sendto(socket, reinterpret_cast<char*>(data.data()), ::size32(data), 0, reinterpret_cast<const sockaddr*>(dst), sizeof(sockaddr_in)) == -1)
while (::sendto(socket, reinterpret_cast<char*>(data.data()), ::size32(data), 0, reinterpret_cast<const sockaddr*>(dst), sizeof(sockaddr_in)) == -1)
{
sys_net.error("[P2PS] Attempting to send a u2s packet failed(%s), closing socket!", get_last_error(false));
status = p2ps_stream_status::stream_closed;
const sys_net_error err = get_last_error(false);
// concurrency on the socket can result in EAGAIN error in which case we try again
if (err == SYS_NET_EAGAIN)
{
continue;
}
sys_net.error("[P2PS] Attempting to send a u2s packet failed(%s)!", err);
return;
}
@ -502,6 +540,35 @@ void lv2_socket_p2ps::send_u2s_packet(std::vector<u8> data, const ::sockaddr_in*
}
}
void lv2_socket_p2ps::close_stream_nl(nt_p2p_port* p2p_port)
{
status = p2ps_stream_status::stream_closed;
for (auto it = p2p_port->bound_p2p_streams.begin(); it != p2p_port->bound_p2p_streams.end();)
{
if (it->second == lv2_id)
{
it = p2p_port->bound_p2p_streams.erase(it);
continue;
}
it++;
}
auto& tcpm = g_fxo->get<named_thread<tcp_timeout_monitor>>();
tcpm.clear_all_messages(lv2_id);
}
void lv2_socket_p2ps::close_stream()
{
auto& nc = g_fxo->get<p2p_context>();
std::lock_guard lock(nc.list_p2p_ports_mutex);
auto& p2p_port = ::at32(nc.list_p2p_ports, port);
std::scoped_lock more_lock(p2p_port.bound_p2p_vports_mutex, mutex);
close_stream_nl(&p2p_port);
}
p2ps_stream_status lv2_socket_p2ps::get_status() const
{
return status;
@ -608,11 +675,14 @@ s32 lv2_socket_p2ps::bind(const sys_net_sockaddr& addr)
if (p2p_vport == 0)
{
p2p_vport = 30000;
sys_net.warning("[P2PS] vport was unassigned in bind!");
p2p_vport = pport.get_port();
while (pport.bound_p2ps_vports.contains(p2p_vport))
{
p2p_vport++;
p2p_vport = pport.get_port();
}
std::set<s32> bound_ports{lv2_id};
pport.bound_p2ps_vports.insert(std::make_pair(p2p_vport, std::move(bound_ports)));
}
@ -701,10 +771,11 @@ std::optional<s32> lv2_socket_p2ps::connect(const sys_net_sockaddr& addr)
{
// Unassigned vport, assigns one
sys_net.warning("[P2PS] vport was unassigned before connect!");
vport = 30000;
vport = pport.get_port();
while (pport.bound_p2p_vports.count(vport) || pport.bound_p2p_streams.count(static_cast<u64>(vport) << 32))
{
vport++;
vport = pport.get_port();
}
}
const u64 key = name.sin_addr.s_addr | (static_cast<u64>(vport) << 32) | (static_cast<u64>(dst_vport) << 48);
@ -876,7 +947,6 @@ void lv2_socket_p2ps::close()
auto& nc = g_fxo->get<p2p_context>();
{
std::lock_guard lock(nc.list_p2p_ports_mutex);
ensure(nc.list_p2p_ports.contains(port));
auto& p2p_port = ::at32(nc.list_p2p_ports, port);
{
std::lock_guard lock(p2p_port.bound_p2p_vports_mutex);
@ -902,6 +972,9 @@ void lv2_socket_p2ps::close()
}
}
}
auto& tcpm = g_fxo->get<named_thread<tcp_timeout_monitor>>();
tcpm.clear_all_messages(lv2_id);
}
s32 lv2_socket_p2ps::shutdown([[maybe_unused]] s32 how)

View File

@ -16,6 +16,8 @@
#include "lv2_socket_p2p.h"
struct nt_p2p_port;
constexpr be_t<u32> P2PS_U2S_SIG = (static_cast<u32>('U') << 24 | static_cast<u32>('2') << 16 | static_cast<u32>('S') << 8 | static_cast<u32>('0'));
struct p2ps_encapsulated_tcp
@ -63,9 +65,10 @@ public:
p2ps_stream_status get_status() const;
void set_status(p2ps_stream_status new_status);
bool handle_connected(p2ps_encapsulated_tcp* tcp_header, u8* data, ::sockaddr_storage* op_addr);
bool handle_connected(p2ps_encapsulated_tcp* tcp_header, u8* data, ::sockaddr_storage* op_addr, nt_p2p_port* p2p_port);
bool handle_listening(p2ps_encapsulated_tcp* tcp_header, u8* data, ::sockaddr_storage* op_addr);
void send_u2s_packet(std::vector<u8> data, const ::sockaddr_in* dst, u64 seq, bool require_ack);
void close_stream();
std::tuple<bool, s32, std::shared_ptr<lv2_socket>, sys_net_sockaddr> accept(bool is_lock = true) override;
s32 bind(const sys_net_sockaddr& addr) override;
@ -87,7 +90,10 @@ public:
s32 poll(sys_net_pollfd& sn_pfd, pollfd& native_pfd) override;
std::tuple<bool, bool, bool> select(bs_t<poll_t> selected, pollfd& native_pfd) override;
protected:
private:
void close_stream_nl(nt_p2p_port* p2p_port);
private:
static constexpr usz MAX_RECEIVED_BUFFER = (1024 * 1024 * 10);
p2ps_stream_status status = p2ps_stream_status::stream_closed;

View File

@ -212,9 +212,9 @@ void p2p_thread::operator()()
// Check P2P sockets for incoming packets
auto num_p2p_sockets = 0;
std::memset(p2p_fd.data(), 0, p2p_fd.size() * sizeof(::pollfd));
{
std::lock_guard lock(list_p2p_ports_mutex);
std::memset(p2p_fd.data(), 0, p2p_fd.size() * sizeof(::pollfd));
for (const auto& p2p_port : list_p2p_ports)
{
p2p_fd[num_p2p_sockets].events = POLLIN;

View File

@ -97,6 +97,17 @@ void nt_p2p_port::dump_packet(p2ps_encapsulated_tcp* tcph)
sys_net.trace("PACKET DUMP:\nsrc_port: %d\ndst_port: %d\nflags: %d\nseq: %d\nack: %d\nlen: %d", tcph->src_port, tcph->dst_port, tcph->flags, tcph->seq, tcph->ack, tcph->length);
}
// Must be used under bound_p2p_vports_mutex lock
u16 nt_p2p_port::get_port()
{
if (binding_port == 0)
{
binding_port = 30000;
}
return binding_port++;
}
bool nt_p2p_port::handle_connected(s32 sock_id, p2ps_encapsulated_tcp* tcp_header, u8* data, ::sockaddr_storage* op_addr)
{
const auto sock = idm::check<lv2_socket>(sock_id, [&](lv2_socket& sock) -> bool
@ -104,7 +115,7 @@ bool nt_p2p_port::handle_connected(s32 sock_id, p2ps_encapsulated_tcp* tcp_heade
ensure(sock.get_type() == SYS_NET_SOCK_STREAM_P2P);
auto& sock_p2ps = reinterpret_cast<lv2_socket_p2ps&>(sock);
return sock_p2ps.handle_connected(tcp_header, data, op_addr);
return sock_p2ps.handle_connected(tcp_header, data, op_addr, this);
});
if (!sock)
@ -287,7 +298,10 @@ bool nt_p2p_port::recv_data()
return true;
}
// The packet is valid, check if it's bound
// The packet is valid
dump_packet(tcp_header);
// Check if it's bound
const u64 key_connected = (reinterpret_cast<struct sockaddr_in*>(&native_addr)->sin_addr.s_addr) | (static_cast<u64>(tcp_header->src_port) << 48) | (static_cast<u64>(tcp_header->dst_port) << 32);
{
@ -311,6 +325,28 @@ bool nt_p2p_port::recv_data()
}
return true;
}
if (tcp_header->flags == p2ps_tcp_flags::RST)
{
sys_net.trace("[P2PS] Received RST on unbound P2PS");
return true;
}
// The P2PS packet was sent to an unbound vport, send a RST packet
p2ps_encapsulated_tcp send_hdr;
send_hdr.src_port = tcp_header->dst_port;
send_hdr.dst_port = tcp_header->src_port;
send_hdr.flags = p2ps_tcp_flags::RST;
auto packet = generate_u2s_packet(send_hdr, nullptr, 0);
if (::sendto(p2p_socket, reinterpret_cast<char*>(packet.data()), ::size32(packet), 0, reinterpret_cast<const sockaddr*>(&native_addr), sizeof(sockaddr_in)) == -1)
{
sys_net.error("[P2PS] Error sending RST to sender to unbound P2PS: %s", get_last_error(false));
return true;
}
sys_net.trace("[P2PS] Sent RST to sender to unbound P2PS");
return true;
}
}

View File

@ -56,6 +56,8 @@ struct nt_p2p_port
// List of active(either from a connect or an accept) P2PS sockets (key, sock_id)
// key is ( (src_vport) << 48 | (dst_vport) << 32 | addr ) with src_vport and addr being 0 for listening sockets
std::map<u64, s32> bound_p2p_streams{};
// Current free port index
u16 binding_port = 30000;
// Queued messages from RPCN
shared_mutex s_rpcn_mutex;
@ -71,6 +73,8 @@ struct nt_p2p_port
static void dump_packet(p2ps_encapsulated_tcp* tcph);
u16 get_port();
bool handle_connected(s32 sock_id, p2ps_encapsulated_tcp* tcp_header, u8* data, ::sockaddr_storage* op_addr);
bool handle_listening(s32 sock_id, p2ps_encapsulated_tcp* tcp_header, u8* data, ::sockaddr_storage* op_addr);
bool recv_data();

View File

@ -371,6 +371,7 @@ void signaling_handler::process_incoming_messages()
schedule_repeat = false;
sent_packet.command = signal_finished_ack;
update_ext_si_status(si, false);
update_si_status(si, SCE_NP_SIGNALING_CONN_STATUS_INACTIVE, SCE_NP_SIGNALING_ERROR_TERMINATED_BY_PEER);
break;
case signal_finished_ack:
reply = false;
@ -593,6 +594,9 @@ void signaling_handler::update_si_status(std::shared_ptr<signaling_info>& si, s3
void signaling_handler::update_ext_si_status(std::shared_ptr<signaling_info>& si, bool op_activated)
{
if (!si)
return;
if (op_activated && !si->op_activated)
{
si->op_activated = true;
@ -679,7 +683,6 @@ void signaling_handler::start_sig(u32 conn_id, u32 addr, u16 port)
sent_packet.command = signal_connect;
sent_packet.timestamp_sender = get_micro_timestamp(steady_clock::now());
ensure(sig_peers.contains(conn_id));
std::shared_ptr<signaling_info> si = ::at32(sig_peers, conn_id);
const auto now = steady_clock::now();

View File

@ -690,7 +690,6 @@ namespace utils
while (thread_ctrl::state() != thread_state::aborting)
{
ensure(m_context.current_track < m_context.playlist.size());
media_log.notice("audio_decoder: about to decode: %s (index=%d)", ::at32(m_context.playlist, m_context.current_track), m_context.current_track);
decode_track(::at32(m_context.playlist, m_context.current_track));