mirror of
https://github.com/RPCS3/rpcs3.git
synced 2024-11-17 08:11:51 +00:00
Split normal sockets and p2p sockets handling v2
This commit is contained in:
parent
04d6ff274b
commit
b7a882f42b
@ -1250,7 +1250,7 @@ error_code sys_net_bnet_close(ppu_thread& ppu, s32 s)
|
||||
|
||||
{
|
||||
// Ensures the socket has no lingering copy from the network thread
|
||||
std::lock_guard nw_lock(g_fxo->get<network_context>().s_nw_mutex);
|
||||
std::lock_guard nw_lock(g_fxo->get<network_context>().mutex_thread_loop);
|
||||
sock.reset();
|
||||
}
|
||||
|
||||
@ -1279,7 +1279,7 @@ error_code sys_net_bnet_poll(ppu_thread& ppu, vm::ptr<sys_net_pollfd> fds, s32 n
|
||||
|
||||
lv2_obj::prepare_for_sleep(ppu);
|
||||
|
||||
std::unique_lock nw_lock(g_fxo->get<network_context>().s_nw_mutex);
|
||||
std::unique_lock nw_lock(g_fxo->get<network_context>().mutex_thread_loop);
|
||||
std::shared_lock lock(id_manager::g_mutex);
|
||||
|
||||
std::vector<::pollfd> _fds(nfds);
|
||||
@ -1375,7 +1375,7 @@ error_code sys_net_bnet_poll(ppu_thread& ppu, vm::ptr<sys_net_pollfd> fds, s32 n
|
||||
fds_buf[i].revents |= SYS_NET_POLLERR;
|
||||
|
||||
signaled++;
|
||||
g_fxo->get<network_context>().s_to_awake.emplace_back(&ppu);
|
||||
sock->queue_wake(&ppu);
|
||||
return true;
|
||||
}
|
||||
|
||||
@ -1412,7 +1412,7 @@ error_code sys_net_bnet_poll(ppu_thread& ppu, vm::ptr<sys_net_pollfd> fds, s32 n
|
||||
return {};
|
||||
}
|
||||
|
||||
std::lock_guard nw_lock(g_fxo->get<network_context>().s_nw_mutex);
|
||||
std::lock_guard nw_lock(g_fxo->get<network_context>().mutex_thread_loop);
|
||||
|
||||
if (signaled)
|
||||
{
|
||||
@ -1443,7 +1443,7 @@ error_code sys_net_bnet_select(ppu_thread& ppu, s32 nfds, vm::ptr<sys_net_fd_set
|
||||
{
|
||||
ppu.state += cpu_flag::wait;
|
||||
|
||||
sys_net.trace("sys_net_bnet_select(nfds=%d, readfds=*0x%x, writefds=*0x%x, exceptfds=*0x%x, timeout=*0x%x)", nfds, readfds, writefds, exceptfds, _timeout);
|
||||
sys_net.trace("sys_net_bnet_select(nfds=%d, readfds=*0x%x, writefds=*0x%x, exceptfds=*0x%x, timeout=*0x%x(%d:%d))", nfds, readfds, writefds, exceptfds, _timeout, _timeout ? _timeout->tv_sec.value() : 0, _timeout ? _timeout->tv_usec.value() : 0);
|
||||
|
||||
atomic_t<s32> signaled{0};
|
||||
|
||||
@ -1474,8 +1474,7 @@ error_code sys_net_bnet_select(ppu_thread& ppu, s32 nfds, vm::ptr<sys_net_fd_set
|
||||
if (exceptfds)
|
||||
_exceptfds = *exceptfds;
|
||||
|
||||
std::lock_guard nw_lock(g_fxo->get<network_context>().s_nw_mutex);
|
||||
|
||||
std::lock_guard nw_lock(g_fxo->get<network_context>().mutex_thread_loop);
|
||||
reader_lock lock(id_manager::g_mutex);
|
||||
|
||||
std::vector<::pollfd> _fds(nfds);
|
||||
@ -1607,7 +1606,7 @@ error_code sys_net_bnet_select(ppu_thread& ppu, s32 nfds, vm::ptr<sys_net_fd_set
|
||||
// rexcept.set(i);
|
||||
|
||||
signaled++;
|
||||
g_fxo->get<network_context>().s_to_awake.emplace_back(&ppu);
|
||||
sock->queue_wake(&ppu);
|
||||
return true;
|
||||
}
|
||||
|
||||
@ -1652,7 +1651,7 @@ error_code sys_net_bnet_select(ppu_thread& ppu, s32 nfds, vm::ptr<sys_net_fd_set
|
||||
return {};
|
||||
}
|
||||
|
||||
std::lock_guard nw_lock(g_fxo->get<network_context>().s_nw_mutex);
|
||||
std::lock_guard nw_lock(g_fxo->get<network_context>().mutex_thread_loop);
|
||||
|
||||
if (signaled)
|
||||
{
|
||||
@ -1749,6 +1748,14 @@ error_code lv2_socket::abort_socket(s32 flags)
|
||||
lv2_obj::append(ppu.get());
|
||||
}
|
||||
|
||||
const u32 num_waiters = qcopy.size();
|
||||
if (num_waiters && (type == SYS_NET_SOCK_STREAM || type == SYS_NET_SOCK_DGRAM))
|
||||
{
|
||||
auto& nc = g_fxo->get<network_context>();
|
||||
const u32 prev_value = nc.num_polls.fetch_sub(num_waiters);
|
||||
ensure(prev_value >= num_waiters);
|
||||
}
|
||||
|
||||
lv2_obj::awake_all();
|
||||
return CELL_OK;
|
||||
}
|
||||
@ -1772,7 +1779,7 @@ error_code sys_net_abort(ppu_thread& ppu, s32 type, u64 arg, s32 flags)
|
||||
{
|
||||
case _socket:
|
||||
{
|
||||
std::lock_guard nw_lock(g_fxo->get<network_context>().s_nw_mutex);
|
||||
std::lock_guard nw_lock(g_fxo->get<network_context>().mutex_thread_loop);
|
||||
|
||||
const auto sock = idm::get<lv2_socket>(static_cast<u32>(arg));
|
||||
|
||||
@ -1813,7 +1820,7 @@ error_code sys_net_abort(ppu_thread& ppu, s32 type, u64 arg, s32 flags)
|
||||
}
|
||||
|
||||
// Ensures the socket has no lingering copy from the network thread
|
||||
g_fxo->get<network_context>().s_nw_mutex.lock_unlock();
|
||||
g_fxo->get<network_context>().mutex_thread_loop.lock_unlock();
|
||||
|
||||
return not_an_error(::narrow<s32>(sockets.size()) - failed);
|
||||
}
|
||||
|
@ -1,5 +1,6 @@
|
||||
#include "stdafx.h"
|
||||
#include "lv2_socket.h"
|
||||
#include "network_context.h"
|
||||
|
||||
LOG_CHANNEL(sys_net);
|
||||
|
||||
@ -67,13 +68,24 @@ void lv2_socket::poll_queue(std::shared_ptr<ppu_thread> ppu, bs_t<lv2_socket::po
|
||||
{
|
||||
set_poll_event(event);
|
||||
queue.emplace_back(std::move(ppu), poll_cb);
|
||||
|
||||
// Makes sure network_context thread is awaken
|
||||
if (type == SYS_NET_SOCK_STREAM || type == SYS_NET_SOCK_DGRAM)
|
||||
{
|
||||
auto& nc = g_fxo->get<network_context>();
|
||||
const u32 prev_value = nc.num_polls.fetch_add(1);
|
||||
if (!prev_value)
|
||||
{
|
||||
nc.num_polls.notify_one();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
s32 lv2_socket::clear_queue(ppu_thread* ppu)
|
||||
u32 lv2_socket::clear_queue(ppu_thread* ppu)
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
|
||||
s32 cleared = 0;
|
||||
u32 cleared = 0;
|
||||
|
||||
for (auto it = queue.begin(); it != queue.end();)
|
||||
{
|
||||
@ -92,6 +104,13 @@ s32 lv2_socket::clear_queue(ppu_thread* ppu)
|
||||
events.store({});
|
||||
}
|
||||
|
||||
if (cleared && (type == SYS_NET_SOCK_STREAM || type == SYS_NET_SOCK_DGRAM))
|
||||
{
|
||||
// Makes sure network_context thread can go back to sleep if there is no active polling
|
||||
const u32 prev_value = g_fxo->get<network_context>().num_polls.fetch_sub(cleared);
|
||||
ensure(prev_value >= cleared);
|
||||
}
|
||||
|
||||
return cleared;
|
||||
}
|
||||
|
||||
@ -113,21 +132,46 @@ void lv2_socket::handle_events(const pollfd& native_pfd, [[maybe_unused]] bool u
|
||||
if (unset_connecting)
|
||||
set_connecting(false);
|
||||
#endif
|
||||
u32 handled = 0;
|
||||
|
||||
for (auto it = queue.begin(); it != queue.end();)
|
||||
{
|
||||
if (it->second(events_happening))
|
||||
{
|
||||
it = queue.erase(it);
|
||||
handled++;
|
||||
continue;
|
||||
}
|
||||
|
||||
it++;
|
||||
}
|
||||
|
||||
if (handled && (type == SYS_NET_SOCK_STREAM || type == SYS_NET_SOCK_DGRAM))
|
||||
{
|
||||
const u32 prev_value = g_fxo->get<network_context>().num_polls.fetch_sub(handled);
|
||||
ensure(prev_value >= handled);
|
||||
}
|
||||
|
||||
if (queue.empty())
|
||||
{
|
||||
events.store({});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void lv2_socket::queue_wake(ppu_thread* ppu)
|
||||
{
|
||||
switch (type)
|
||||
{
|
||||
case SYS_NET_SOCK_STREAM:
|
||||
case SYS_NET_SOCK_DGRAM:
|
||||
g_fxo->get<network_context>().ppu_to_awake.emplace_back(ppu);
|
||||
break;
|
||||
case SYS_NET_SOCK_DGRAM_P2P:
|
||||
case SYS_NET_SOCK_STREAM_P2P:
|
||||
g_fxo->get<p2p_context>().ppu_to_awake.emplace_back(ppu);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -70,8 +70,9 @@ public:
|
||||
bs_t<poll_t> get_events() const;
|
||||
void set_poll_event(bs_t<poll_t> event);
|
||||
void poll_queue(std::shared_ptr<ppu_thread> ppu, bs_t<poll_t> event, std::function<bool(bs_t<poll_t>)> poll_cb);
|
||||
s32 clear_queue(ppu_thread*);
|
||||
u32 clear_queue(ppu_thread*);
|
||||
void handle_events(const pollfd& native_fd, bool unset_connecting = false);
|
||||
void queue_wake(ppu_thread* ppu);
|
||||
|
||||
lv2_socket_family get_family() const;
|
||||
lv2_socket_type get_type() const;
|
||||
|
@ -121,16 +121,14 @@ s32 lv2_socket_p2p::bind(const sys_net_sockaddr& addr)
|
||||
|
||||
socket_type real_socket{};
|
||||
|
||||
auto& nc = g_fxo->get<network_context>();
|
||||
auto& nc = g_fxo->get<p2p_context>();
|
||||
{
|
||||
std::lock_guard list_lock(nc.list_p2p_ports_mutex);
|
||||
if (!nc.list_p2p_ports.contains(p2p_port))
|
||||
{
|
||||
nc.list_p2p_ports.emplace(std::piecewise_construct, std::forward_as_tuple(p2p_port), std::forward_as_tuple(p2p_port));
|
||||
}
|
||||
|
||||
nc.create_p2p_port(p2p_port);
|
||||
auto& pport = ::at32(nc.list_p2p_ports, p2p_port);
|
||||
real_socket = pport.p2p_socket;
|
||||
|
||||
{
|
||||
std::lock_guard lock(pport.bound_p2p_vports_mutex);
|
||||
|
||||
@ -332,7 +330,7 @@ void lv2_socket_p2p::close()
|
||||
return;
|
||||
}
|
||||
|
||||
auto& nc = g_fxo->get<network_context>();
|
||||
auto& nc = g_fxo->get<p2p_context>();
|
||||
{
|
||||
std::lock_guard lock(nc.list_p2p_ports_mutex);
|
||||
ensure(nc.list_p2p_ports.contains(port));
|
||||
|
@ -445,7 +445,7 @@ bool lv2_socket_p2ps::handle_listening(p2ps_encapsulated_tcp* tcp_header, [[mayb
|
||||
const u64 key_connected = (reinterpret_cast<struct sockaddr_in*>(op_addr)->sin_addr.s_addr) | (static_cast<u64>(tcp_header->src_port) << 48) | (static_cast<u64>(tcp_header->dst_port) << 32);
|
||||
|
||||
{
|
||||
auto& nc = g_fxo->get<network_context>();
|
||||
auto& nc = g_fxo->get<p2p_context>();
|
||||
auto& pport = ::at32(nc.list_p2p_ports, port);
|
||||
pport.bound_p2p_streams.emplace(key_connected, new_sock_id);
|
||||
}
|
||||
@ -593,16 +593,14 @@ s32 lv2_socket_p2ps::bind(const sys_net_sockaddr& addr)
|
||||
|
||||
socket_type real_socket{};
|
||||
|
||||
auto& nc = g_fxo->get<network_context>();
|
||||
auto& nc = g_fxo->get<p2p_context>();
|
||||
{
|
||||
std::lock_guard list_lock(nc.list_p2p_ports_mutex);
|
||||
if (!nc.list_p2p_ports.contains(p2p_port))
|
||||
{
|
||||
nc.list_p2p_ports.emplace(std::piecewise_construct, std::forward_as_tuple(p2p_port), std::forward_as_tuple(p2p_port));
|
||||
}
|
||||
|
||||
nc.create_p2p_port(p2p_port);
|
||||
auto& pport = ::at32(nc.list_p2p_ports, p2p_port);
|
||||
real_socket = pport.p2p_socket;
|
||||
|
||||
{
|
||||
// Ensures the socket & the bound list are updated at the same time to avoid races
|
||||
std::lock_guard vport_lock(pport.bound_p2p_vports_mutex);
|
||||
@ -673,6 +671,12 @@ std::optional<s32> lv2_socket_p2ps::connect(const sys_net_sockaddr& addr)
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
|
||||
if (status != p2ps_stream_status::stream_closed)
|
||||
{
|
||||
sys_net.error("[P2PS] Called connect on a socket that is not closed!");
|
||||
return -SYS_NET_EALREADY;
|
||||
}
|
||||
|
||||
p2ps_encapsulated_tcp send_hdr;
|
||||
const auto psa_in_p2p = reinterpret_cast<const sys_net_sockaddr_in_p2p*>(&addr);
|
||||
auto name = sys_net_addr_to_native_addr(addr);
|
||||
@ -683,14 +687,14 @@ std::optional<s32> lv2_socket_p2ps::connect(const sys_net_sockaddr& addr)
|
||||
|
||||
socket_type real_socket{};
|
||||
|
||||
auto& nc = g_fxo->get<network_context>();
|
||||
auto& nc = g_fxo->get<p2p_context>();
|
||||
{
|
||||
std::lock_guard list_lock(nc.list_p2p_ports_mutex);
|
||||
if (!nc.list_p2p_ports.contains(port))
|
||||
nc.list_p2p_ports.emplace(std::piecewise_construct, std::forward_as_tuple(port), std::forward_as_tuple(port));
|
||||
|
||||
nc.create_p2p_port(port);
|
||||
auto& pport = ::at32(nc.list_p2p_ports, port);
|
||||
real_socket = pport.p2p_socket;
|
||||
|
||||
{
|
||||
std::lock_guard lock(pport.bound_p2p_vports_mutex);
|
||||
if (vport == 0)
|
||||
@ -752,6 +756,12 @@ std::optional<std::tuple<s32, std::vector<u8>, sys_net_sockaddr>> lv2_socket_p2p
|
||||
|
||||
if (!data_available)
|
||||
{
|
||||
if (status == p2ps_stream_status::stream_closed)
|
||||
{
|
||||
sys_net.error("[P2PS] Called recvfrom on closed socket!");
|
||||
return {{0, {}, {}}};
|
||||
}
|
||||
|
||||
if (so_nbio || (flags & SYS_NET_MSG_DONTWAIT))
|
||||
{
|
||||
return {{-SYS_NET_EWOULDBLOCK, {}, {}}};
|
||||
@ -808,6 +818,12 @@ std::optional<s32> lv2_socket_p2ps::sendto([[maybe_unused]] s32 flags, const std
|
||||
lock.lock();
|
||||
}
|
||||
|
||||
if (status == p2ps_stream_status::stream_closed)
|
||||
{
|
||||
sys_net.error("[P2PS] Called sendto on a closed socket!");
|
||||
return -SYS_NET_ECONNRESET;
|
||||
}
|
||||
|
||||
constexpr u32 max_data_len = (65535 - (VPORT_P2P_HEADER_SIZE + sizeof(p2ps_encapsulated_tcp)));
|
||||
|
||||
::sockaddr_in name{};
|
||||
@ -857,7 +873,7 @@ void lv2_socket_p2ps::close()
|
||||
return;
|
||||
}
|
||||
|
||||
auto& nc = g_fxo->get<network_context>();
|
||||
auto& nc = g_fxo->get<p2p_context>();
|
||||
{
|
||||
std::lock_guard lock(nc.list_p2p_ports_mutex);
|
||||
ensure(nc.list_p2p_ports.contains(port));
|
||||
|
@ -12,13 +12,13 @@ LOG_CHANNEL(sys_net);
|
||||
s32 send_packet_from_p2p_port(const std::vector<u8>& data, const sockaddr_in& addr)
|
||||
{
|
||||
s32 res{};
|
||||
auto& nc = g_fxo->get<network_context>();
|
||||
auto& nc = g_fxo->get<p2p_context>();
|
||||
{
|
||||
std::lock_guard list_lock(nc.list_p2p_ports_mutex);
|
||||
if (nc.list_p2p_ports.contains(SCE_NP_PORT))
|
||||
{
|
||||
auto& def_port = ::at32(nc.list_p2p_ports, SCE_NP_PORT);
|
||||
res = ::sendto(def_port.p2p_socket, reinterpret_cast<const char*>(data.data()), ::size32(data), 0, reinterpret_cast<const sockaddr*>(&addr), sizeof(sockaddr_in));
|
||||
res = ::sendto(def_port.p2p_socket, reinterpret_cast<const char*>(data.data()), ::size32(data), 0, reinterpret_cast<const sockaddr*>(&addr), sizeof(sockaddr_in));
|
||||
|
||||
if (res == -1)
|
||||
sys_net.error("Failed to send signaling packet: %s", get_last_error(false, false));
|
||||
@ -35,7 +35,7 @@ s32 send_packet_from_p2p_port(const std::vector<u8>& data, const sockaddr_in& ad
|
||||
std::vector<std::vector<u8>> get_rpcn_msgs()
|
||||
{
|
||||
std::vector<std::vector<u8>> msgs;
|
||||
auto& nc = g_fxo->get<network_context>();
|
||||
auto& nc = g_fxo->get<p2p_context>();
|
||||
{
|
||||
std::lock_guard list_lock(nc.list_p2p_ports_mutex);
|
||||
if (nc.list_p2p_ports.contains(SCE_NP_PORT))
|
||||
@ -59,7 +59,7 @@ std::vector<std::vector<u8>> get_rpcn_msgs()
|
||||
std::vector<signaling_message> get_sign_msgs()
|
||||
{
|
||||
std::vector<signaling_message> msgs;
|
||||
auto& nc = g_fxo->get<network_context>();
|
||||
auto& nc = g_fxo->get<p2p_context>();
|
||||
{
|
||||
std::lock_guard list_lock(nc.list_p2p_ports_mutex);
|
||||
if (nc.list_p2p_ports.contains(SCE_NP_PORT))
|
||||
@ -85,15 +85,31 @@ namespace np
|
||||
void init_np_handler_dependencies();
|
||||
}
|
||||
|
||||
network_thread::network_thread()
|
||||
void base_network_thread::wake_threads()
|
||||
{
|
||||
ppu_to_awake.erase(std::unique(ppu_to_awake.begin(), ppu_to_awake.end()), ppu_to_awake.end());
|
||||
for (ppu_thread* ppu : ppu_to_awake)
|
||||
{
|
||||
network_clear_queue(*ppu);
|
||||
lv2_obj::append(ppu);
|
||||
}
|
||||
|
||||
if (!ppu_to_awake.empty())
|
||||
{
|
||||
lv2_obj::awake_all();
|
||||
}
|
||||
ppu_to_awake.clear();
|
||||
}
|
||||
|
||||
p2p_thread::p2p_thread()
|
||||
{
|
||||
np::init_np_handler_dependencies();
|
||||
}
|
||||
|
||||
void network_thread::bind_sce_np_port()
|
||||
void p2p_thread::bind_sce_np_port()
|
||||
{
|
||||
std::lock_guard list_lock(list_p2p_ports_mutex);
|
||||
list_p2p_ports.emplace(std::piecewise_construct, std::forward_as_tuple(SCE_NP_PORT), std::forward_as_tuple(SCE_NP_PORT));
|
||||
create_p2p_port(SCE_NP_PORT);
|
||||
}
|
||||
|
||||
void network_thread::operator()()
|
||||
@ -101,7 +117,7 @@ void network_thread::operator()()
|
||||
std::vector<std::shared_ptr<lv2_socket>> socklist;
|
||||
socklist.reserve(lv2_socket::id_count);
|
||||
|
||||
s_to_awake.clear();
|
||||
ppu_to_awake.clear();
|
||||
|
||||
std::vector<::pollfd> fds(lv2_socket::id_count);
|
||||
#ifdef _WIN32
|
||||
@ -109,10 +125,14 @@ void network_thread::operator()()
|
||||
std::vector<bool> was_connecting(lv2_socket::id_count);
|
||||
#endif
|
||||
|
||||
std::vector<::pollfd> p2p_fd(lv2_socket::id_count);
|
||||
|
||||
while (thread_ctrl::state() != thread_state::aborting)
|
||||
{
|
||||
if (!num_polls)
|
||||
{
|
||||
thread_ctrl::wait_on(num_polls, 0);
|
||||
continue;
|
||||
}
|
||||
|
||||
ensure(socklist.size() <= lv2_socket::id_count);
|
||||
|
||||
// Wait with 1ms timeout
|
||||
@ -122,47 +142,7 @@ void network_thread::operator()()
|
||||
::poll(fds.data(), socklist.size(), 1);
|
||||
#endif
|
||||
|
||||
// Check P2P sockets for incoming packets(timeout could probably be set at 0)
|
||||
{
|
||||
std::lock_guard lock(list_p2p_ports_mutex);
|
||||
std::memset(p2p_fd.data(), 0, p2p_fd.size() * sizeof(::pollfd));
|
||||
auto num_p2p_sockets = 0;
|
||||
for (const auto& p2p_port : list_p2p_ports)
|
||||
{
|
||||
p2p_fd[num_p2p_sockets].events = POLLIN;
|
||||
p2p_fd[num_p2p_sockets].revents = 0;
|
||||
p2p_fd[num_p2p_sockets].fd = p2p_port.second.p2p_socket;
|
||||
num_p2p_sockets++;
|
||||
}
|
||||
|
||||
if (num_p2p_sockets)
|
||||
{
|
||||
#ifdef _WIN32
|
||||
const auto ret_p2p = WSAPoll(p2p_fd.data(), num_p2p_sockets, 1);
|
||||
#else
|
||||
const auto ret_p2p = ::poll(p2p_fd.data(), num_p2p_sockets, 1);
|
||||
#endif
|
||||
if (ret_p2p > 0)
|
||||
{
|
||||
auto fd_index = 0;
|
||||
for (auto& p2p_port : list_p2p_ports)
|
||||
{
|
||||
if ((p2p_fd[fd_index].revents & POLLIN) == POLLIN || (p2p_fd[fd_index].revents & POLLRDNORM) == POLLRDNORM)
|
||||
{
|
||||
while (p2p_port.second.recv_data())
|
||||
;
|
||||
}
|
||||
fd_index++;
|
||||
}
|
||||
}
|
||||
else if (ret_p2p < 0)
|
||||
{
|
||||
sys_net.error("[P2P] Error poll on master P2P socket: %d", get_last_error(false));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
std::lock_guard lock(s_nw_mutex);
|
||||
std::lock_guard lock(mutex_thread_loop);
|
||||
|
||||
for (usz i = 0; i < socklist.size(); i++)
|
||||
{
|
||||
@ -173,20 +153,7 @@ void network_thread::operator()()
|
||||
#endif
|
||||
}
|
||||
|
||||
s_to_awake.erase(std::unique(s_to_awake.begin(), s_to_awake.end()), s_to_awake.end());
|
||||
|
||||
for (ppu_thread* ppu : s_to_awake)
|
||||
{
|
||||
network_clear_queue(*ppu);
|
||||
lv2_obj::append(ppu);
|
||||
}
|
||||
|
||||
if (!s_to_awake.empty())
|
||||
{
|
||||
lv2_obj::awake_all();
|
||||
}
|
||||
|
||||
s_to_awake.clear();
|
||||
wake_threads();
|
||||
socklist.clear();
|
||||
|
||||
// Obtain all native active sockets
|
||||
@ -216,3 +183,71 @@ void network_thread::operator()()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Must be used under list_p2p_ports_mutex lock!
|
||||
void p2p_thread::create_p2p_port(u16 p2p_port)
|
||||
{
|
||||
if (!list_p2p_ports.contains(p2p_port))
|
||||
{
|
||||
list_p2p_ports.emplace(std::piecewise_construct, std::forward_as_tuple(p2p_port), std::forward_as_tuple(p2p_port));
|
||||
const u32 prev_value = num_p2p_ports.fetch_add(1);
|
||||
if (!prev_value)
|
||||
{
|
||||
num_p2p_ports.notify_one();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void p2p_thread::operator()()
|
||||
{
|
||||
std::vector<::pollfd> p2p_fd(lv2_socket::id_count);
|
||||
|
||||
while (thread_ctrl::state() != thread_state::aborting)
|
||||
{
|
||||
if (!num_p2p_ports)
|
||||
{
|
||||
thread_ctrl::wait_on(num_p2p_ports, 0);
|
||||
continue;
|
||||
}
|
||||
|
||||
// Check P2P sockets for incoming packets
|
||||
auto num_p2p_sockets = 0;
|
||||
{
|
||||
std::lock_guard lock(list_p2p_ports_mutex);
|
||||
std::memset(p2p_fd.data(), 0, p2p_fd.size() * sizeof(::pollfd));
|
||||
for (const auto& p2p_port : list_p2p_ports)
|
||||
{
|
||||
p2p_fd[num_p2p_sockets].events = POLLIN;
|
||||
p2p_fd[num_p2p_sockets].revents = 0;
|
||||
p2p_fd[num_p2p_sockets].fd = p2p_port.second.p2p_socket;
|
||||
num_p2p_sockets++;
|
||||
}
|
||||
}
|
||||
|
||||
#ifdef _WIN32
|
||||
const auto ret_p2p = WSAPoll(p2p_fd.data(), num_p2p_sockets, 1);
|
||||
#else
|
||||
const auto ret_p2p = ::poll(p2p_fd.data(), num_p2p_sockets, 1);
|
||||
#endif
|
||||
if (ret_p2p > 0)
|
||||
{
|
||||
std::lock_guard lock(list_p2p_ports_mutex);
|
||||
auto fd_index = 0;
|
||||
for (auto& p2p_port : list_p2p_ports)
|
||||
{
|
||||
if ((p2p_fd[fd_index].revents & POLLIN) == POLLIN || (p2p_fd[fd_index].revents & POLLRDNORM) == POLLRDNORM)
|
||||
{
|
||||
while (p2p_port.second.recv_data())
|
||||
;
|
||||
}
|
||||
fd_index++;
|
||||
}
|
||||
|
||||
wake_threads();
|
||||
}
|
||||
else if (ret_p2p < 0)
|
||||
{
|
||||
sys_net.error("[P2P] Error poll on master P2P socket: %d", get_last_error(false));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -7,19 +7,38 @@
|
||||
|
||||
#include "nt_p2p_port.h"
|
||||
|
||||
struct network_thread
|
||||
struct base_network_thread
|
||||
{
|
||||
std::vector<ppu_thread*> s_to_awake;
|
||||
shared_mutex s_nw_mutex;
|
||||
std::vector<ppu_thread*> ppu_to_awake;
|
||||
|
||||
shared_mutex list_p2p_ports_mutex;
|
||||
std::map<u16, nt_p2p_port> list_p2p_ports{};
|
||||
void wake_threads();
|
||||
};
|
||||
|
||||
struct network_thread : base_network_thread
|
||||
{
|
||||
shared_mutex mutex_thread_loop;
|
||||
atomic_t<u32> num_polls = 0;
|
||||
|
||||
static constexpr auto thread_name = "Network Thread";
|
||||
|
||||
network_thread();
|
||||
void operator()();
|
||||
};
|
||||
|
||||
struct p2p_thread : base_network_thread
|
||||
{
|
||||
shared_mutex list_p2p_ports_mutex;
|
||||
std::map<u16, nt_p2p_port> list_p2p_ports;
|
||||
atomic_t<u32> num_p2p_ports = 0;
|
||||
|
||||
static constexpr auto thread_name = "Network P2P Thread";
|
||||
|
||||
p2p_thread();
|
||||
|
||||
void create_p2p_port(u16 p2p_port);
|
||||
|
||||
void bind_sce_np_port();
|
||||
void operator()();
|
||||
};
|
||||
|
||||
using network_context = named_thread<network_thread>;
|
||||
using p2p_context = named_thread<p2p_thread>;
|
||||
|
@ -175,9 +175,9 @@ bool is_ip_public_address(const ::sockaddr_in& addr)
|
||||
return true;
|
||||
}
|
||||
|
||||
s32 network_clear_queue(ppu_thread& ppu)
|
||||
u32 network_clear_queue(ppu_thread& ppu)
|
||||
{
|
||||
s32 cleared = 0;
|
||||
u32 cleared = 0;
|
||||
|
||||
idm::select<lv2_socket>([&](u32, lv2_socket& sock)
|
||||
{
|
||||
|
@ -23,7 +23,7 @@ sys_net_error get_last_error(bool is_blocking, bool is_connecting = false);
|
||||
sys_net_sockaddr native_addr_to_sys_net_addr(const ::sockaddr_storage& native_addr);
|
||||
::sockaddr_in sys_net_addr_to_native_addr(const sys_net_sockaddr& sn_addr);
|
||||
bool is_ip_public_address(const ::sockaddr_in& addr);
|
||||
s32 network_clear_queue(ppu_thread& ppu);
|
||||
u32 network_clear_queue(ppu_thread& ppu);
|
||||
|
||||
#ifdef _WIN32
|
||||
void windows_poll(std::vector<pollfd>& fds, unsigned long nfds, int timeout, std::vector<bool>& connecting);
|
||||
|
@ -408,11 +408,11 @@ namespace np
|
||||
|
||||
void np_handler::init_np_handler_dependencies()
|
||||
{
|
||||
if (is_psn_active && g_cfg.net.psn_status == np_psn_status::psn_rpcn && g_fxo->is_init<network_context>() && !m_inited_np_handler_dependencies)
|
||||
if (is_psn_active && g_cfg.net.psn_status == np_psn_status::psn_rpcn && g_fxo->is_init<p2p_context>() && !m_inited_np_handler_dependencies)
|
||||
{
|
||||
m_inited_np_handler_dependencies = true;
|
||||
|
||||
auto& nc = g_fxo->get<network_context>();
|
||||
auto& nc = g_fxo->get<p2p_context>();
|
||||
nc.bind_sce_np_port();
|
||||
|
||||
std::lock_guard lock(mutex_rpcn);
|
||||
@ -817,6 +817,16 @@ namespace np
|
||||
string_to_online_name(rpcn->get_online_name(), online_name);
|
||||
string_to_avatar_url(rpcn->get_avatar_url(), avatar_url);
|
||||
public_ip_addr = rpcn->get_addr_sig();
|
||||
|
||||
if (!public_ip_addr)
|
||||
{
|
||||
rsx::overlays::queue_message(rpcn::rpcn_state_to_localized_string_id(rpcn::rpcn_state::failure_other));
|
||||
rpcn_log.error("Failed to get a reply from RPCN signaling!");
|
||||
is_psn_active = false;
|
||||
rpcn->terminate_connection();
|
||||
return;
|
||||
}
|
||||
|
||||
local_ip_addr = std::bit_cast<u32, be_t<u32>>(rpcn->get_addr_local());
|
||||
|
||||
break;
|
||||
|
@ -2594,4 +2594,58 @@ namespace rpcn
|
||||
return it == friend_infos.friends.end() ? std::nullopt : std::optional(*it);
|
||||
}
|
||||
|
||||
bool rpcn_client::is_connected() const
|
||||
{
|
||||
return connected;
|
||||
}
|
||||
|
||||
bool rpcn_client::is_authentified() const
|
||||
{
|
||||
return authentified;
|
||||
}
|
||||
rpcn_state rpcn_client::get_rpcn_state() const
|
||||
{
|
||||
return state;
|
||||
}
|
||||
|
||||
const std::string& rpcn_client::get_online_name() const
|
||||
{
|
||||
return online_name;
|
||||
}
|
||||
|
||||
const std::string& rpcn_client::get_avatar_url() const
|
||||
{
|
||||
return avatar_url;
|
||||
}
|
||||
|
||||
u32 rpcn_client::get_addr_sig() const
|
||||
{
|
||||
if (!addr_sig)
|
||||
{
|
||||
addr_sig.wait(0, static_cast<atomic_wait_timeout>(10'000'000'000));
|
||||
}
|
||||
|
||||
return addr_sig.load();
|
||||
}
|
||||
|
||||
u16 rpcn_client::get_port_sig() const
|
||||
{
|
||||
if (!port_sig)
|
||||
{
|
||||
port_sig.wait(0, static_cast<atomic_wait_timeout>(10'000'000'000));
|
||||
}
|
||||
|
||||
return port_sig.load();
|
||||
}
|
||||
|
||||
u32 rpcn_client::get_addr_local() const
|
||||
{
|
||||
return local_addr_sig.load();
|
||||
}
|
||||
|
||||
void rpcn_client::update_local_addr(u32 addr)
|
||||
{
|
||||
local_addr_sig = std::bit_cast<u32, be_t<u32>>(addr);
|
||||
}
|
||||
|
||||
} // namespace rpcn
|
||||
|
@ -437,18 +437,9 @@ namespace rpcn
|
||||
void remove_message_cb(message_cb_func cb_func, void* cb_param);
|
||||
void mark_message_used(u64 id);
|
||||
|
||||
bool is_connected() const
|
||||
{
|
||||
return connected;
|
||||
}
|
||||
bool is_authentified() const
|
||||
{
|
||||
return authentified;
|
||||
}
|
||||
rpcn_state get_rpcn_state() const
|
||||
{
|
||||
return state;
|
||||
}
|
||||
bool is_connected() const;
|
||||
bool is_authentified() const;
|
||||
rpcn_state get_rpcn_state() const;
|
||||
|
||||
void server_infos_updated();
|
||||
|
||||
@ -495,44 +486,13 @@ namespace rpcn
|
||||
bool tus_delete_multislot_data(u32 req_id, SceNpCommunicationId& communication_id, const SceNpOnlineId& targetNpId, vm::cptr<SceNpTusSlotId> slotIdArray, s32 arrayNum, bool vuser);
|
||||
bool send_presence(const SceNpCommunicationId& pr_com_id, const std::string& pr_title, const std::string& pr_status, const std::string& pr_comment, const std::vector<u8>& pr_data);
|
||||
|
||||
const std::string& get_online_name() const
|
||||
{
|
||||
return online_name;
|
||||
}
|
||||
const std::string& get_avatar_url() const
|
||||
{
|
||||
return avatar_url;
|
||||
}
|
||||
const std::string& get_online_name() const;
|
||||
const std::string& get_avatar_url() const;
|
||||
|
||||
u32 get_addr_sig() const
|
||||
{
|
||||
if (!addr_sig)
|
||||
{
|
||||
addr_sig.wait(0, static_cast<atomic_wait_timeout>(10'000'000'000));
|
||||
}
|
||||
|
||||
return addr_sig.load();
|
||||
}
|
||||
|
||||
u16 get_port_sig() const
|
||||
{
|
||||
if (!port_sig)
|
||||
{
|
||||
port_sig.wait(0, static_cast<atomic_wait_timeout>(10'000'000'000));
|
||||
}
|
||||
|
||||
return port_sig.load();
|
||||
}
|
||||
|
||||
u32 get_addr_local() const
|
||||
{
|
||||
return local_addr_sig.load();
|
||||
}
|
||||
|
||||
void update_local_addr(u32 addr)
|
||||
{
|
||||
local_addr_sig = std::bit_cast<u32, be_t<u32>>(addr);
|
||||
}
|
||||
u32 get_addr_sig() const;
|
||||
u16 get_port_sig() const;
|
||||
u32 get_addr_local() const;
|
||||
void update_local_addr(u32 addr);
|
||||
|
||||
private:
|
||||
bool get_reply(u64 expected_id, std::vector<u8>& data);
|
||||
|
Loading…
Reference in New Issue
Block a user