diff --git a/Utilities/Thread.h b/Utilities/Thread.h index e724d35c0b..8da60897bb 100644 --- a/Utilities/Thread.h +++ b/Utilities/Thread.h @@ -40,6 +40,8 @@ enum class thread_state : u32 finished // Final state, always set at the end of thread execution }; +class need_wakeup {}; + template class named_thread; @@ -414,6 +416,11 @@ public: { thread::notify_abort(); } + + if constexpr (std::is_base_of_v) + { + this->wake_up(); + } } return *this; diff --git a/rpcs3/Emu/CMakeLists.txt b/rpcs3/Emu/CMakeLists.txt index 023cd854f8..694e059e7f 100644 --- a/rpcs3/Emu/CMakeLists.txt +++ b/rpcs3/Emu/CMakeLists.txt @@ -364,6 +364,7 @@ target_sources(rpcs3_emu PRIVATE target_sources(rpcs3_emu PRIVATE NP/fb_helpers.cpp NP/np_handler.cpp + NP/signaling_handler.cpp NP/np_structs_extra.cpp NP/rpcn_client.cpp NP/rpcn_config.cpp diff --git a/rpcs3/Emu/Cell/Modules/cellNetCtl.cpp b/rpcs3/Emu/Cell/Modules/cellNetCtl.cpp index e3274c7738..b6216bcaec 100644 --- a/rpcs3/Emu/Cell/Modules/cellNetCtl.cpp +++ b/rpcs3/Emu/Cell/Modules/cellNetCtl.cpp @@ -203,25 +203,21 @@ error_code cellNetCtlGetInfo(s32 code, vm::ptr info) return CELL_NET_CTL_ERROR_NOT_CONNECTED; } - if (code == CELL_NET_CTL_INFO_MTU) + switch (code) { - info->mtu = 1500; - } - else if (code == CELL_NET_CTL_INFO_LINK) - { - info->link = CELL_NET_CTL_LINK_CONNECTED; - } - else if (code == CELL_NET_CTL_INFO_IP_ADDRESS) - { - strcpy_trunc(info->ip_address, np_handler::ip_to_string(nph->get_local_ip_addr())); - } - else if (code == CELL_NET_CTL_INFO_NETMASK) - { - strcpy_trunc(info->netmask, "255.255.255.255"); - } - else if (code == CELL_NET_CTL_INFO_HTTP_PROXY_CONFIG) - { - info->http_proxy_config = 0; + case CELL_NET_CTL_INFO_DEVICE: info->device = CELL_NET_CTL_DEVICE_WIRED; break; + // case CELL_NET_CTL_INFO_ETHER_ADDR: std::memset(info->ether_addr.data, 0xFF, sizeof(info->ether_addr.data)); break; + case CELL_NET_CTL_INFO_MTU: info->mtu = 1500; break; + case CELL_NET_CTL_INFO_LINK: info->link = CELL_NET_CTL_LINK_CONNECTED; break; + case CELL_NET_CTL_INFO_LINK_TYPE: info->link_type = CELL_NET_CTL_LINK_TYPE_10BASE_FULL; break; + case CELL_NET_CTL_INFO_IP_CONFIG: info->ip_config = CELL_NET_CTL_IP_STATIC; break; + case CELL_NET_CTL_INFO_DEFAULT_ROUTE: strcpy_trunc(info->default_route, "192.168.1.1"); break; + case CELL_NET_CTL_INFO_PRIMARY_DNS: strcpy_trunc(info->primary_dns, "8.8.8.8"); break; + case CELL_NET_CTL_INFO_SECONDARY_DNS: strcpy_trunc(info->secondary_dns, "8.8.8.8"); break; + case CELL_NET_CTL_INFO_IP_ADDRESS: strcpy_trunc(info->ip_address, np_handler::ip_to_string(nph->get_local_ip_addr())); break; + case CELL_NET_CTL_INFO_NETMASK: strcpy_trunc(info->netmask, "255.255.255.255"); break; + case CELL_NET_CTL_INFO_HTTP_PROXY_CONFIG: info->http_proxy_config = 0; break; + default: cellNetCtl.error("Unsupported request: %s", InfoCodeToName(code)); break; } return CELL_OK; diff --git a/rpcs3/Emu/Cell/Modules/cellVoice.cpp b/rpcs3/Emu/Cell/Modules/cellVoice.cpp index 61717b34c3..4bee87023b 100644 --- a/rpcs3/Emu/Cell/Modules/cellVoice.cpp +++ b/rpcs3/Emu/Cell/Modules/cellVoice.cpp @@ -858,6 +858,9 @@ error_code cellVoiceReadFromOPort(u32 ops, vm::ptr data, vm::ptr size if (!oport || oport->info.portType <= CELLVOICE_PORTTYPE_IN_VOICE) return CELL_VOICE_ERROR_TOPOLOGY; + + if (size) + *size = 0; return CELL_OK; } diff --git a/rpcs3/Emu/Cell/Modules/sceNp.cpp b/rpcs3/Emu/Cell/Modules/sceNp.cpp index a518d2f4f8..0997a9f4f9 100644 --- a/rpcs3/Emu/Cell/Modules/sceNp.cpp +++ b/rpcs3/Emu/Cell/Modules/sceNp.cpp @@ -22,6 +22,7 @@ void fmt_class_string::format(std::string& out, u64 arg) { switch (error) { + STR_CASE(GAME_ERR_NOT_XMBBUY_CONTENT); STR_CASE(SCE_NP_ERROR_NOT_INITIALIZED); STR_CASE(SCE_NP_ERROR_ALREADY_INITIALIZED); STR_CASE(SCE_NP_ERROR_INVALID_ARGUMENT); @@ -335,6 +336,20 @@ void fmt_class_string::format(std::string& out, u64 arg) STR_CASE(SCE_NP_CORE_SERVER_ERROR_RESOURCE_CONSTRAINT); STR_CASE(SCE_NP_CORE_SERVER_ERROR_SYSTEM_SHUTDOWN); STR_CASE(SCE_NP_CORE_SERVER_ERROR_UNSUPPORTED_CLIENT_VERSION); + STR_CASE(SCE_NP_DRM_INSTALL_ERROR_FORMAT); + STR_CASE(SCE_NP_DRM_INSTALL_ERROR_CHECK); + STR_CASE(SCE_NP_DRM_INSTALL_ERROR_UNSUPPORTED); + STR_CASE(SCE_NP_DRM_SERVER_ERROR_SERVICE_IS_END); + STR_CASE(SCE_NP_DRM_SERVER_ERROR_SERVICE_STOP_TEMPORARILY); + STR_CASE(SCE_NP_DRM_SERVER_ERROR_SERVICE_IS_BUSY); + STR_CASE(SCE_NP_DRM_SERVER_ERROR_INVALID_USER_CREDENTIAL); + STR_CASE(SCE_NP_DRM_SERVER_ERROR_INVALID_PRODUCT_ID); + STR_CASE(SCE_NP_DRM_SERVER_ERROR_ACCOUNT_IS_CLOSED); + STR_CASE(SCE_NP_DRM_SERVER_ERROR_ACCOUNT_IS_SUSPENDED); + STR_CASE(SCE_NP_DRM_SERVER_ERROR_ACTIVATED_CONSOLE_IS_FULL); + STR_CASE(SCE_NP_DRM_SERVER_ERROR_CONSOLE_NOT_ACTIVATED); + STR_CASE(SCE_NP_DRM_SERVER_ERROR_PRIMARY_CONSOLE_CANNOT_CHANGED); + STR_CASE(SCE_NP_DRM_SERVER_ERROR_UNKNOWN); STR_CASE(SCE_NP_SIGNALING_ERROR_NOT_INITIALIZED); STR_CASE(SCE_NP_SIGNALING_ERROR_ALREADY_INITIALIZED); STR_CASE(SCE_NP_SIGNALING_ERROR_OUT_OF_MEMORY); @@ -625,7 +640,7 @@ error_code sceNpDrmProcessExitSpawn2(ppu_thread& ppu, vm::cptr klicensee, vm error_code sceNpBasicRegisterHandler(vm::cptr context, vm::ptr handler, vm::ptr arg) { - sceNp.todo("sceNpBasicRegisterHandler(context=*0x%x, handler=*0x%x, arg=*0x%x)", context, handler, arg); + sceNp.warning("sceNpBasicRegisterHandler(context=*0x%x, handler=*0x%x, arg=*0x%x)", context, handler, arg); const auto nph = g_fxo->get>(); @@ -2501,9 +2516,9 @@ error_code sceNpManagerGetNetworkTime(vm::ptr pTick) return SCE_NP_ERROR_INVALID_STATE; } - // FIXME: Get the network time auto now = std::chrono::system_clock::now(); - pTick->tick = std::chrono::duration_cast(now.time_since_epoch()).count(); + // That's assuming epoch is unix epoch which is not actually standardized, god I hate you C++ std + pTick->tick = std::chrono::duration_cast(now.time_since_epoch()).count() + (62135596800 * 1000 * 1000); return CELL_OK; } @@ -2811,7 +2826,7 @@ error_code sceNpManagerGetPsHandle() error_code sceNpManagerRequestTicket(vm::cptr npId, vm::cptr serviceId, vm::cptr cookie, u32 cookieSize, vm::cptr entitlementId, u32 consumedCount) { - sceNp.todo("sceNpManagerRequestTicket(npId=*0x%x, serviceId=%s, cookie=*0x%x, cookieSize=%d, entitlementId=%s, consumedCount=%d)", npId, serviceId, cookie, cookieSize, entitlementId, consumedCount); + sceNp.error("sceNpManagerRequestTicket(npId=*0x%x, serviceId=%s, cookie=*0x%x, cookieSize=%d, entitlementId=%s, consumedCount=%d)", npId, serviceId, cookie, cookieSize, entitlementId, consumedCount); const auto nph = g_fxo->get>(); @@ -2820,7 +2835,7 @@ error_code sceNpManagerRequestTicket(vm::cptr npId, vm::cptr serv return SCE_NP_ERROR_NOT_INITIALIZED; } - if (!serviceId || !cookie || cookieSize > SCE_NP_COOKIE_MAX_SIZE || !entitlementId) + if (!npId || !serviceId || cookieSize > SCE_NP_COOKIE_MAX_SIZE) { return SCE_NP_AUTH_EINVALID_ARGUMENT; } @@ -2835,6 +2850,8 @@ error_code sceNpManagerRequestTicket(vm::cptr npId, vm::cptr serv return SCE_NP_ERROR_INVALID_STATE; } + nph->req_ticket(0x00020001, npId.get_ptr(), serviceId.get_ptr(), reinterpret_cast(cookie.get_ptr()), cookieSize, entitlementId.get_ptr(), consumedCount); + return CELL_OK; } @@ -2851,7 +2868,7 @@ error_code sceNpManagerRequestTicket2(vm::cptr npId, vm::cptr SCE_NP_COOKIE_MAX_SIZE || !entitlementId) + if (!npId || !serviceId || cookieSize > SCE_NP_COOKIE_MAX_SIZE) { return SCE_NP_AUTH_EINVALID_ARGUMENT; } @@ -2866,12 +2883,14 @@ error_code sceNpManagerRequestTicket2(vm::cptr npId, vm::cptrreq_ticket(0x00020001, npId.get_ptr(), serviceId.get_ptr(), reinterpret_cast(cookie.get_ptr()), cookieSize, entitlementId.get_ptr(), consumedCount); + return CELL_OK; } error_code sceNpManagerGetTicket(vm::ptr buffer, vm::ptr bufferSize) { - sceNp.todo("sceNpManagerGetTicket(buffer=*0x%x, bufferSize=*0x%x)", buffer, bufferSize); + sceNp.error("sceNpManagerGetTicket(buffer=*0x%x, bufferSize=*0x%x)", buffer, bufferSize); const auto nph = g_fxo->get>(); @@ -2885,6 +2904,21 @@ error_code sceNpManagerGetTicket(vm::ptr buffer, vm::ptr bufferSize) return SCE_NP_ERROR_INVALID_ARGUMENT; } + const auto& ticket = nph->get_ticket(); + + if (!buffer) + { + *bufferSize = static_cast(ticket.size()); + return CELL_OK; + } + + if (*bufferSize < ticket.size()) + { + return SCE_NP_ERROR_INVALID_ARGUMENT; + } + + memcpy(buffer.get_ptr(), ticket.data(), ticket.size()); + return CELL_OK; } @@ -3268,7 +3302,7 @@ error_code sceNpScoreCreateTransactionCtx(s32 titleCtxId) return SCE_NP_COMMUNITY_ERROR_INVALID_ONLINE_ID; } - return CELL_OK; + return not_an_error(nph->create_score_transaction_context(titleCtxId)); } error_code sceNpScoreDestroyTransactionCtx(s32 transId) @@ -4346,6 +4380,9 @@ error_code sceNpSignalingCreateCtx(vm::ptr npId, vm::ptrcreate_signaling_context(npId, handler, arg); + const auto sigh = g_fxo->get>(); + sigh->set_sig_cb(*ctx_id, handler, arg); + return CELL_OK; } @@ -4370,7 +4407,7 @@ error_code sceNpSignalingDestroyCtx(u32 ctx_id) error_code sceNpSignalingAddExtendedHandler(u32 ctx_id, vm::ptr handler, vm::ptr arg) { - sceNp.todo("sceNpSignalingAddExtendedHandler(ctx_id=%d, handler=*0x%x, arg=*0x%x)", ctx_id, handler, arg); + sceNp.warning("sceNpSignalingAddExtendedHandler(ctx_id=%d, handler=*0x%x, arg=*0x%x)", ctx_id, handler, arg); const auto nph = g_fxo->get>(); @@ -4379,6 +4416,9 @@ error_code sceNpSignalingAddExtendedHandler(u32 ctx_id, vm::ptrget>(); + sigh->set_ext_sig_cb(ctx_id, handler, arg); + return CELL_OK; } @@ -4420,9 +4460,9 @@ error_code sceNpSignalingGetCtxOpt(u32 ctx_id, s32 optname, vm::ptr optval) return CELL_OK; } -error_code sceNpSignalingActivateConnection(u32 ctx_id, vm::ptr npId, u32 conn_id) +error_code sceNpSignalingActivateConnection(u32 ctx_id, vm::ptr npId, vm::ptr conn_id) { - sceNp.todo("sceNpSignalingActivateConnection(ctx_id=%d, npId=*0x%x, conn_id=%d)", ctx_id, npId, conn_id); + sceNp.warning("sceNpSignalingActivateConnection(ctx_id=%d, npId=*0x%x, conn_id=%d)", ctx_id, npId, conn_id); const auto nph = g_fxo->get>(); @@ -4436,6 +4476,12 @@ error_code sceNpSignalingActivateConnection(u32 ctx_id, vm::ptr npId, u return SCE_NP_SIGNALING_ERROR_INVALID_ARGUMENT; } + if (strncmp(nph->get_npid().handle.data, npId->handle.data, 16) == 0) + return SCE_NP_SIGNALING_ERROR_OWN_NP_ID; + + const auto sigh = g_fxo->get>(); + *conn_id = sigh->init_sig_infos(npId.get_ptr()); + return CELL_OK; } @@ -4469,7 +4515,7 @@ error_code sceNpSignalingTerminateConnection(u32 ctx_id, u32 conn_id) error_code sceNpSignalingGetConnectionStatus(u32 ctx_id, u32 conn_id, vm::ptr conn_status, vm::ptr peer_addr, vm::ptr peer_port) { - sceNp.todo("sceNpSignalingGetConnectionStatus(ctx_id=%d, conn_id=%d, conn_status=*0x%x, peer_addr=*0x%x, peer_port=*0x%x)", ctx_id, conn_id, conn_status, peer_addr, peer_port); + sceNp.warning("sceNpSignalingGetConnectionStatus(ctx_id=%d, conn_id=%d, conn_status=*0x%x, peer_addr=*0x%x, peer_port=*0x%x)", ctx_id, conn_id, conn_status, peer_addr, peer_port); const auto nph = g_fxo->get>(); @@ -4483,6 +4529,15 @@ error_code sceNpSignalingGetConnectionStatus(u32 ctx_id, u32 conn_id, vm::ptrget>(); + const auto si = sigh->get_sig_infos(conn_id); + + *conn_status = si.connStatus; + if (peer_addr) + (*peer_addr).np_s_addr = si.addr; // infos.addr is already BE + if (peer_port) + *peer_port = si.port; + return CELL_OK; } @@ -4545,7 +4600,7 @@ error_code sceNpSignalingGetConnectionFromPeerAddress(u32 ctx_id, vm::ptr info) { - sceNp.todo("sceNpSignalingGetLocalNetInfo(ctx_id=%d, info=*0x%x)", ctx_id, info); + sceNp.warning("sceNpSignalingGetLocalNetInfo(ctx_id=%d, info=*0x%x)", ctx_id, info); const auto nph = g_fxo->get>(); @@ -4554,9 +4609,8 @@ error_code sceNpSignalingGetLocalNetInfo(u32 ctx_id, vm::ptrsize != sizeof(SceNpSignalingNetInfo)) { - // TODO: check info->size return SCE_NP_SIGNALING_ERROR_INVALID_ARGUMENT; } diff --git a/rpcs3/Emu/Cell/Modules/sceNp.h b/rpcs3/Emu/Cell/Modules/sceNp.h index ee61456739..d5917d6ef5 100644 --- a/rpcs3/Emu/Cell/Modules/sceNp.h +++ b/rpcs3/Emu/Cell/Modules/sceNp.h @@ -540,6 +540,8 @@ enum SCE_NP_MANAGER_STATUS_ONLINE = 3, }; +#define SCE_NP_MANAGER_EVENT_GOT_TICKET 255 + // Event types enum { @@ -1042,7 +1044,7 @@ struct SceNpEntitlementId }; // Callback for getting the connection status -using SceNpManagerCallback = void(s32 event, s32 result, u32 arg_addr); +using SceNpManagerCallback = void(s32 event, s32 result, vm::ptr arg); // Score data unique to the application struct SceNpScoreGameInfo @@ -1318,7 +1320,7 @@ struct SceNpScoreRecordOptParam using SceNpCustomMenuEventHandler = s32(s32 retCode, u32 index, vm::cptr npid, SceNpCustomMenuSelectedType type, vm::ptr arg); using SceNpBasicEventHandler = s32(s32 event, s32 retCode, u32 reqId, vm::ptr arg); using SceNpCommerceHandler = void(u32 ctx_id, u32 subject_id, s32 event, s32 error_code, vm::ptr arg); -using SceNpSignalingHandler = void(u32 ctx_id, u32 subject_id, s32 event, s32 error_code, u32 arg_addr); +using SceNpSignalingHandler = void(u32 ctx_id, u32 subject_id, s32 event, s32 error_code, vm::ptr arg); using SceNpFriendlistResultHandler = s32(s32 retCode, vm::ptr arg); using SceNpMatchingHandler = void(u32 ctx_id, u32 req_id, s32 event, s32 error_code, vm::ptr arg); using SceNpMatchingGUIHandler = void(u32 ctx_id, s32 event, s32 error_code, vm::ptr arg); diff --git a/rpcs3/Emu/Cell/Modules/sceNp2.cpp b/rpcs3/Emu/Cell/Modules/sceNp2.cpp index abb4eafa7c..61ae6f1b30 100644 --- a/rpcs3/Emu/Cell/Modules/sceNp2.cpp +++ b/rpcs3/Emu/Cell/Modules/sceNp2.cpp @@ -423,11 +423,25 @@ error_code sceNpMatching2SignalingGetConnectionStatus( return SCE_NP_MATCHING2_ERROR_NOT_INITIALIZED; } - const auto& infos = nph->get_peer_infos(ctxId, roomId, memberId); + if (!connStatus) + { + return SCE_NP_MATCHING2_ERROR_INVALID_ARGUMENT; + } - *connStatus = infos.connStatus; - (*peerAddr).np_s_addr = infos.addr; // infos.addr is already BE - *peerPort = std::bit_cast>(infos.port); // infos.port is already BE + const auto sigh = g_fxo->get>(); + const auto si = sigh->get_sig2_infos(roomId, memberId); + + *connStatus = si.connStatus; + + if (peerAddr) + { + (*peerAddr).np_s_addr = si.addr; // infos.addr is already BE + } + + if (peerPort) + { + *peerPort = si.port; + } return CELL_OK; } @@ -619,9 +633,10 @@ error_code sceNpMatching2SignalingGetConnectionInfo( } case 5: { - const auto& infos = nph->get_peer_infos(ctxId, roomId, memberId); - connInfo->address.port = infos.port; - connInfo->address.addr.np_s_addr = infos.addr; + const auto sigh = g_fxo->get>(); + const auto si = sigh->get_sig2_infos(roomId, memberId); + connInfo->address.port = std::bit_cast>(si.port); + connInfo->address.addr.np_s_addr = si.addr; break; } case 6: @@ -739,7 +754,7 @@ error_code sceNpMatching2GetServerInfo( error_code sceNpMatching2GetEventData(SceNpMatching2ContextId ctxId, SceNpMatching2EventKey eventKey, vm::ptr buf, u64 bufLen) { - sceNp2.todo("sceNpMatching2GetEventData(ctxId=%d, eventKey=%d, buf=*0x%x, bufLen=%d)", ctxId, eventKey, buf, bufLen); + sceNp2.notice("sceNpMatching2GetEventData(ctxId=%d, eventKey=%d, buf=*0x%x, bufLen=%d)", ctxId, eventKey, buf, bufLen); const auto nph = g_fxo->get>(); @@ -1058,9 +1073,12 @@ error_code sceNpMatching2GetServerIdListLocal(SceNpMatching2ContextId ctxId, vm: u32 num_servs = std::min(static_cast(slist.size()), serverIdNum); - for (u32 i = 0; i < num_servs; i++) + if (serverId) { - serverId[i] = slist[i]; + for (u32 i = 0; i < num_servs; i++) + { + serverId[i] = slist[i]; + } } return not_an_error(static_cast(num_servs)); @@ -1142,7 +1160,7 @@ error_code sceNpMatching2GetSignalingOptParamLocal(SceNpMatching2ContextId ctxId error_code sceNpMatching2RegisterSignalingCallback(SceNpMatching2ContextId ctxId, vm::ptr cbFunc, vm::ptr cbFuncArg) { - sceNp2.todo("sceNpMatching2RegisterSignalingCallback(ctxId=%d, cbFunc=*0x%x, cbFuncArg=*0x%x)", ctxId, cbFunc, cbFuncArg); + sceNp2.notice("sceNpMatching2RegisterSignalingCallback(ctxId=%d, cbFunc=*0x%x, cbFuncArg=*0x%x)", ctxId, cbFunc, cbFuncArg); const auto nph = g_fxo->get>(); @@ -1151,9 +1169,8 @@ error_code sceNpMatching2RegisterSignalingCallback(SceNpMatching2ContextId ctxId return SCE_NP_MATCHING2_ERROR_NOT_INITIALIZED; } - nph->signal_event_cb = cbFunc; - nph->signal_event_cb_ctx = ctxId; - nph->signal_event_cb_arg = cbFuncArg; + const auto sigh = g_fxo->get>(); + sigh->set_sig2_cb(ctxId, cbFunc, cbFuncArg); return CELL_OK; } diff --git a/rpcs3/Emu/Cell/lv2/sys_net.cpp b/rpcs3/Emu/Cell/lv2/sys_net.cpp index 0011c33aae..bb6c6cb8c7 100644 --- a/rpcs3/Emu/Cell/lv2/sys_net.cpp +++ b/rpcs3/Emu/Cell/lv2/sys_net.cpp @@ -26,7 +26,6 @@ #include "Emu/NP/np_handler.h" #include -#include #include LOG_CHANNEL(sys_net); @@ -253,7 +252,7 @@ static void network_clear_queue(ppu_thread& ppu) } // Object in charge of retransmiting packets for STREAM_P2P sockets -class tcp_timeout_monitor +class tcp_timeout_monitor : public need_wakeup { public: void add_message(s32 sock_id, const sockaddr_in *dst, std::vector data, u64 seq) @@ -306,7 +305,7 @@ public: else wakey.wait(lock); - if (abort) + if (thread_ctrl::state() == thread_state::aborting) return; const auto now = std::chrono::system_clock::now(); @@ -314,88 +313,88 @@ public: std::set rtt_increased; for (auto it = msgs.begin(); it != msgs.end();) { - if (it->first >= now) - { - // reply is late, increases rtt - auto& msg = it->second; - const auto addr = msg.dst_addr.sin_addr.s_addr; - rtt_info rtt = rtts[msg.sock_id]; - // Only increases rtt once per loop(in case a big number of packets are sent at once) - if (!rtt_increased.count(msg.sock_id)) - { - rtt.num_retries += 1; - // Increases current rtt by 10% - rtt.rtt_time *= 1.1; - rtts[addr] = rtt; - - rtt_increased.emplace(msg.sock_id); - } - - if (rtt.num_retries >= 10) - { - // Too many retries, need to notify the socket that the connection is dead - idm::check(msg.sock_id, [&](lv2_socket& sock) - { - sock.p2ps.status = lv2_socket::p2ps_i::stream_status::stream_closed; - }); - it = msgs.erase(it); - continue; - } - - // resend the message - const auto res = idm::check(msg.sock_id, [&](lv2_socket& sock) -> bool - { - if (sendto(sock.socket, reinterpret_cast(msg.data.data()), msg.data.size(), 0, reinterpret_cast(&msg.dst_addr), sizeof(msg.dst_addr)) == -1) - { - sock.p2ps.status = lv2_socket::p2ps_i::stream_status::stream_closed; - return false; - } - return true; - }); - - if (!res || !res.ret) - { - it = msgs.erase(it); - continue; - } - - // Update key timeout - msgs.insert(std::make_pair(now + rtt.rtt_time, std::move(msg))); - it = msgs.erase(it); - } - else - { + if (it->first > now) break; + + // reply is late, increases rtt + auto& msg = it->second; + const auto addr = msg.dst_addr.sin_addr.s_addr; + rtt_info rtt = rtts[msg.sock_id]; + // Only increases rtt once per loop(in case a big number of packets are sent at once) + if (!rtt_increased.count(msg.sock_id)) + { + rtt.num_retries += 1; + // Increases current rtt by 10% + rtt.rtt_time += (rtt.rtt_time / 10); + rtts[addr] = rtt; + + rtt_increased.emplace(msg.sock_id); } + + if (rtt.num_retries >= 10) + { + // Too many retries, need to notify the socket that the connection is dead + idm::check(msg.sock_id, [&](lv2_socket& sock) + { + sock.p2ps.status = lv2_socket::p2ps_i::stream_status::stream_closed; + }); + it = msgs.erase(it); + continue; + } + + // resend the message + const auto res = idm::check(msg.sock_id, [&](lv2_socket& sock) -> bool + { + if (sendto(sock.socket, reinterpret_cast(msg.data.data()), msg.data.size(), 0, reinterpret_cast(&msg.dst_addr), sizeof(msg.dst_addr)) == -1) + { + sock.p2ps.status = lv2_socket::p2ps_i::stream_status::stream_closed; + return false; + } + return true; + }); + + if (!res || !res.ret) + { + it = msgs.erase(it); + continue; + } + + // Update key timeout + msgs.insert(std::make_pair(now + rtt.rtt_time, std::move(msg))); + it = msgs.erase(it); } } } -public: - std::condition_variable wakey; - static constexpr auto thread_name = "Tcp Over Udp Timeout Manager Thread"sv; - std::atomic abort = false; + void wake_up() + { + wakey.notify_one(); + } -private: - std::mutex data_mutex; - // List of outgoing messages - struct message - { - s32 sock_id; - ::sockaddr_in dst_addr; - std::vector data; - u64 seq; - std::chrono::time_point initial_sendtime; + public: + static constexpr auto thread_name = "Tcp Over Udp Timeout Manager Thread"sv; + + private: + std::condition_variable wakey; + std::mutex data_mutex; + // List of outgoing messages + struct message + { + s32 sock_id; + ::sockaddr_in dst_addr; + std::vector data; + u64 seq; + std::chrono::time_point initial_sendtime; + }; + std::map, message> msgs; // (wakeup time, msg) + // List of rtts + struct rtt_info + { + unsigned long num_retries = 0; + std::chrono::milliseconds rtt_time = 50ms; + }; + std::unordered_map rtts; // (sock_id, rtt) }; - std::map, message> msgs; // (wakeup time, msg) - // List of rtts - struct rtt_info - { - unsigned long num_retries = 0; - std::chrono::milliseconds rtt_time = 50ms; - }; - std::unordered_map rtts; // (sock_id, rtt) -}; struct nt_p2p_port { @@ -412,12 +411,13 @@ struct nt_p2p_port // Queued messages from RPCN shared_mutex s_rpcn_mutex; - std::queue> rpcn_msgs{}; + std::vector> rpcn_msgs{}; + // Queued signaling messages + shared_mutex s_sign_mutex; + std::vector, std::vector>> sign_msgs{}; std::array p2p_recv_data{}; - std::mt19937 randgen; - nt_p2p_port(u16 port) : port(port) { // Creates and bind P2P Socket @@ -447,10 +447,6 @@ struct nt_p2p_port sys_net.fatal("Failed to bind DGRAM socket to %d for P2P!", port); sys_net.notice("P2P port %d was bound!", port); - - // Initializes random generator - std::random_device rd; - randgen.seed(rd()); } ~nt_p2p_port() @@ -492,15 +488,20 @@ struct nt_p2p_port memcpy(packet_data+sizeof(u16), &header, sizeof(lv2_socket::p2ps_i::encapsulated_tcp)); if(datasize) memcpy(packet_data+sizeof(u16)+sizeof(lv2_socket::p2ps_i::encapsulated_tcp), data, datasize); + + auto* hdr_ptr = reinterpret_cast(packet_data+sizeof(u16)); + hdr_ptr->checksum = 0; + hdr_ptr->checksum = tcp_checksum(reinterpret_cast(hdr_ptr), sizeof(lv2_socket::p2ps_i::encapsulated_tcp) + datasize); return packet; } static void send_u2s_packet(lv2_socket &sock, s32 sock_id, std::vector data, const ::sockaddr_in* dst, u32 seq = 0, bool require_ack = true) { - if (sendto(sock.socket, reinterpret_cast(data.data()), data.size(), 0, reinterpret_cast(&dst), sizeof(sockaddr)) == -1) + sys_net.trace("Sending U2S packet on socket %d(id:%d): data(%d, seq %d, require_ack %d) to %s:%d", sock.socket, sock_id, data.size(), seq, require_ack, inet_ntoa(dst->sin_addr), std::bit_cast>(dst->sin_port)); + if (sendto(sock.socket, reinterpret_cast(data.data()), data.size(), 0, reinterpret_cast(dst), sizeof(sockaddr_in)) == -1) { - sys_net.warning("Attempting to send a u2s packet failed, closing socket!"); + sys_net.error("Attempting to send a u2s packet failed(%s), closing socket!", get_last_error(false)); sock.p2ps.status = lv2_socket::p2ps_i::stream_status::stream_closed; return; } @@ -513,6 +514,12 @@ struct nt_p2p_port } } + void dump_packet(lv2_socket::p2ps_i::encapsulated_tcp* tcph) + { + const std::string result = fmt::format("src_port: %d\ndst_port: %d\nflags: %d\nseq: %d\nack: %d\nlen: %d", tcph->src_port, tcph->dst_port, tcph->flags, tcph->seq, tcph->ack, tcph->length); + sys_net.trace("PACKET DUMP:\n%s", result); + } + bool handle_connected(s32 sock_id, lv2_socket::p2ps_i::encapsulated_tcp* tcp_header, u8* data, ::sockaddr_storage* op_addr) { const auto sock = idm::check(sock_id, [&](lv2_socket& sock) -> bool @@ -521,6 +528,8 @@ struct nt_p2p_port if (sock.p2ps.status != lv2_socket::p2ps_i::stream_status::stream_connected && sock.p2ps.status != lv2_socket::p2ps_i::stream_status::stream_handshaking) return false; + + dump_packet(tcp_header); if (tcp_header->flags == lv2_socket::p2ps_i::ACK) { @@ -531,9 +540,9 @@ struct nt_p2p_port auto send_ack = [&]() { auto final_ack = sock.p2ps.data_beg_seq; - while (sock.p2ps.data_mapping.count(final_ack)) + while (sock.p2ps.received_data.count(final_ack)) { - final_ack += sock.p2ps.data_mapping[final_ack]; + final_ack += sock.p2ps.received_data.at(final_ack).size(); } sock.p2ps.data_available = final_ack - sock.p2ps.data_beg_seq; @@ -542,25 +551,17 @@ struct nt_p2p_port send_hdr.dst_port = tcp_header->src_port; send_hdr.flags = lv2_socket::p2ps_i::ACK; send_hdr.ack = final_ack; - send_hdr.length = 0; - send_hdr.checksum = nt_p2p_port::tcp_checksum(reinterpret_cast(&send_hdr), sizeof(lv2_socket::p2ps_i::encapsulated_tcp)); auto packet = generate_u2s_packet(send_hdr, nullptr, 0); + sys_net.trace("Sent ack %d", final_ack); send_u2s_packet(sock, sock_id, std::move(packet), reinterpret_cast<::sockaddr_in*>(op_addr), 0, false); }; - if (tcp_header->seq < sock.p2ps.data_beg_seq) - { - // Data has already been processed - if (tcp_header->flags != lv2_socket::p2ps_i::ACK && tcp_header->flags != lv2_socket::p2ps_i::RST) - send_ack(); - return true; - } - if (sock.p2ps.status == lv2_socket::p2ps_i::stream_status::stream_handshaking) { // Only expect SYN|ACK if (tcp_header->flags == (lv2_socket::p2ps_i::SYN | lv2_socket::p2ps_i::ACK)) { + sys_net.trace("Received SYN|ACK, status is now connected"); sock.p2ps.data_beg_seq = tcp_header->seq + 1; sock.p2ps.status = lv2_socket::p2ps_i::stream_status::stream_connected; send_ack(); @@ -570,25 +571,28 @@ struct nt_p2p_port } else if (sock.p2ps.status == lv2_socket::p2ps_i::stream_status::stream_connected) { + if (tcp_header->seq < sock.p2ps.data_beg_seq) + { + // Data has already been processed + sys_net.trace("Data has already been processed"); + if (tcp_header->flags != lv2_socket::p2ps_i::ACK && tcp_header->flags != lv2_socket::p2ps_i::RST) + send_ack(); + return true; + } + switch (tcp_header->flags) { case lv2_socket::p2ps_i::PSH: case 0: { - const auto offset = tcp_header->seq - sock.p2ps.data_beg_seq; - if ((offset + tcp_header->length) > lv2_socket::p2ps_i::MAX_RECEIVED_BUFFER) - { - // Data is too far ahead(max 10mb of input cache), ignore the packet - return true; - } - if (!sock.p2ps.data_mapping.count(tcp_header->seq)) + if (!sock.p2ps.received_data.count(tcp_header->seq)) { // New data - if ((offset + tcp_header->length) < sock.p2ps.received_data.size()) - sock.p2ps.received_data.resize(offset + tcp_header->length); - - memcpy(sock.p2ps.received_data.data() + offset, data, tcp_header->length); - sock.p2ps.data_mapping.insert(std::make_pair(tcp_header->seq, tcp_header->length)); + sock.p2ps.received_data.emplace(tcp_header->seq, std::vector(data, data + tcp_header->length)); + } + else + { + sys_net.trace("Data was not new!"); } send_ack(); @@ -629,27 +633,25 @@ struct nt_p2p_port return false; // Only valid packet - if (tcp_header->flags != lv2_socket::p2ps_i::SYN && sock->p2ps.backlog.size() < sock->p2ps.max_backlog) + if (tcp_header->flags == lv2_socket::p2ps_i::SYN && sock->p2ps.backlog.size() < sock->p2ps.max_backlog) { // Yes, new connection and a backlog is available, create a new lv2_socket for it and send SYN|ACK // Prepare reply packet + sys_net.notice("Received connection on listening STREAM-P2P socket!"); lv2_socket::p2ps_i::encapsulated_tcp send_hdr; send_hdr.src_port = tcp_header->dst_port; send_hdr.dst_port = tcp_header->src_port; send_hdr.flags = lv2_socket::p2ps_i::SYN | lv2_socket::p2ps_i::ACK; send_hdr.ack = tcp_header->seq + 1; - send_hdr.length = 0; // Generates random starting SEQ - std::uniform_int_distribution seq_gen(std::numeric_limits::min(), std::numeric_limits::max()); - send_hdr.seq = seq_gen(randgen); - send_hdr.checksum = nt_p2p_port::tcp_checksum(reinterpret_cast(&send_hdr), sizeof(lv2_socket::p2ps_i::encapsulated_tcp)); + send_hdr.seq = rand(); // Create new socket auto sock_lv2 = std::make_shared(0, SYS_NET_SOCK_STREAM_P2P, SYS_NET_AF_INET); sock_lv2->socket = sock->socket; sock_lv2->p2p.port = sock->p2p.port; sock_lv2->p2p.vport = sock->p2p.vport; - sock_lv2->p2ps.op_addr = std::bit_cast>((reinterpret_cast(op_addr)->sin_addr.s_addr)); + sock_lv2->p2ps.op_addr = reinterpret_cast(op_addr)->sin_addr.s_addr; sock_lv2->p2ps.op_port = std::bit_cast>((reinterpret_cast(op_addr)->sin_port)); sock_lv2->p2ps.op_vport = tcp_header->src_port; sock_lv2->p2ps.cur_seq = send_hdr.seq + 1; @@ -670,12 +672,11 @@ struct nt_p2p_port else if (tcp_header->flags == lv2_socket::p2ps_i::SYN) { // Send a RST packet on backlog full + sys_net.trace("Backlog was full, sent a RST packet"); lv2_socket::p2ps_i::encapsulated_tcp send_hdr; send_hdr.src_port = tcp_header->dst_port; send_hdr.dst_port = tcp_header->src_port; send_hdr.flags = lv2_socket::p2ps_i::RST; - send_hdr.length = 0; - send_hdr.checksum = nt_p2p_port::tcp_checksum(reinterpret_cast(&send_hdr), sizeof(lv2_socket::p2ps_i::encapsulated_tcp)); auto packet = generate_u2s_packet(send_hdr, nullptr, 0); send_u2s_packet(*sock, sock_id, std::move(packet), reinterpret_cast<::sockaddr_in*>(op_addr), 0, false); } @@ -685,7 +686,7 @@ struct nt_p2p_port return true; } - void recv_data() + bool recv_data() { ::sockaddr_storage native_addr; ::socklen_t native_addrlen = sizeof(native_addr); @@ -693,14 +694,17 @@ struct nt_p2p_port if (recv_res == -1) { - sys_net.error("Error recvfrom on P2P socket: %d", get_last_error(false)); - return; + auto lerr = get_last_error(false); + if (lerr != SYS_NET_EINPROGRESS && lerr != SYS_NET_EWOULDBLOCK) + sys_net.error("Error recvfrom on P2P socket: %d", lerr); + + return false; } if (recv_res < static_cast(sizeof(u16))) { sys_net.error("Received badly formed packet on P2P port(no vport)!"); - return; + return true; } u16 dst_vport = reinterpret_cast&>(p2p_recv_data[0]); @@ -711,8 +715,29 @@ struct nt_p2p_port memcpy(rpcn_msg.data(), p2p_recv_data.data() + sizeof(u16), recv_res - sizeof(u16)); std::lock_guard lock(s_rpcn_mutex); - rpcn_msgs.push(std::move(rpcn_msg)); - return; + rpcn_msgs.push_back(std::move(rpcn_msg)); + return true; + } + + if (dst_vport == 65535) // Reserved for signaling + { + std::vector sign_msg(recv_res - sizeof(u16)); + memcpy(sign_msg.data(), p2p_recv_data.data() + sizeof(u16), recv_res - sizeof(u16)); + + std::pair, std::vector> msg; + msg.first.first = reinterpret_cast(&native_addr)->sin_addr.s_addr; + msg.first.second = std::bit_cast>(reinterpret_cast(&native_addr)->sin_port); + msg.second = std::move(sign_msg); + + { + std::lock_guard lock(s_sign_mutex); + sign_msgs.push_back(std::move(msg)); + } + + const auto sigh = g_fxo->get>(); + sigh->wake_up(); + + return true; } { @@ -743,11 +768,11 @@ struct nt_p2p_port if (!sock) bound_p2p_vports.erase(dst_vport); - return; + return true; } } - // Not directed at a bound DGRAM_P2P vport so check if the packet is a STREAM_P2P packet + // Not directed at a bound DGRAM_P2P vport so check if the packet is a STREAM-P2P packet const auto sp_size = recv_res - sizeof(u16); u8 *sp_data = p2p_recv_data.data() + sizeof(u16); @@ -755,7 +780,7 @@ struct nt_p2p_port if (sp_size < sizeof(lv2_socket::p2ps_i::encapsulated_tcp)) { sys_net.trace("Received P2P packet targeted at unbound vport(likely) or invalid"); - return; + return true; } auto* tcp_header = reinterpret_cast(sp_data); @@ -764,20 +789,20 @@ struct nt_p2p_port if (tcp_header->signature != lv2_socket::p2ps_i::U2S_sig) { sys_net.trace("Received P2P packet targeted at unbound vport"); - return; + return true; } if (tcp_header->length != (sp_size - sizeof(lv2_socket::p2ps_i::encapsulated_tcp))) { - sys_net.error("Received STREAM_P2P packet tcp length didn't match packet length"); - return; + sys_net.error("Received STREAM-P2P packet tcp length didn't match packet length"); + return true; } // Sanity check if (tcp_header->dst_port != dst_vport) { - sys_net.error("Received STREAM_P2P packet with dst_port != vport"); - return; + sys_net.error("Received STREAM-P2P packet with dst_port != vport"); + return true; } // Validate checksum @@ -786,7 +811,7 @@ struct nt_p2p_port if (given_checksum != nt_p2p_port::tcp_checksum(reinterpret_cast(sp_data), sp_size)) { sys_net.error("Checksum is invalid, dropping packet!"); - return; + return true; } // The packet is valid, check if it's bound @@ -798,19 +823,22 @@ struct nt_p2p_port if (bound_p2p_streams.count(key_connected)) { const auto sock_id = bound_p2p_streams.at(key_connected); + sys_net.trace("Received packet for connected STREAM-P2P socket(s=%d)", sock_id); handle_connected(sock_id, tcp_header, sp_data + sizeof(lv2_socket::p2ps_i::encapsulated_tcp), &native_addr); - return; + return true; } if(bound_p2p_streams.count(key_listening)) { const auto sock_id = bound_p2p_streams.at(key_listening); + sys_net.trace("Received packet for listening STREAM-P2P socket(s=%d)", sock_id); handle_listening(sock_id, tcp_header, sp_data + sizeof(lv2_socket::p2ps_i::encapsulated_tcp), &native_addr); - return; + return true; } } - sys_net.trace("Received a P2P_STREAM packet with no bound target"); + sys_net.trace("Received a STREAM-P2P packet with no bound target"); + return true; } }; @@ -838,9 +866,6 @@ struct network_thread #ifdef _WIN32 WSACleanup(); #endif - auto tcpm = g_fxo->get>(); - tcpm->abort = true; - tcpm->wakey.notify_one(); } void operator()() @@ -894,7 +919,7 @@ struct network_thread { if ((p2p_fd[fd_index].revents & POLLIN) == POLLIN || (p2p_fd[fd_index].revents & POLLRDNORM) == POLLRDNORM) { - p2p_port.second.recv_data(); + while(p2p_port.second.recv_data()); } fd_index++; } @@ -1010,9 +1035,9 @@ s32 send_packet_from_p2p_port(const std::vector& data, const sockaddr_in& ad return res; } -std::queue> get_rpcn_msgs() +std::vector> get_rpcn_msgs() { - auto msgs = std::queue>(); + auto msgs = std::vector>(); const auto nc = g_fxo->get(); { std::lock_guard list_lock(nc->list_p2p_ports_mutex); @@ -1020,13 +1045,31 @@ std::queue> get_rpcn_msgs() { std::lock_guard lock(def_port.s_rpcn_mutex); msgs = std::move(def_port.rpcn_msgs); - def_port.rpcn_msgs = std::queue>(); + def_port.rpcn_msgs.clear(); } } return msgs; } +std::vector, std::vector>> get_sign_msgs() +{ + auto msgs = std::vector, std::vector>>(); + const auto nc = g_fxo->get(); + { + std::lock_guard list_lock(nc->list_p2p_ports_mutex); + auto& def_port = nc->list_p2p_ports.at(3658); + { + std::lock_guard lock(def_port.s_sign_mutex); + msgs = std::move(def_port.sign_msgs); + def_port.sign_msgs.clear(); + } + } + + return msgs; +} + + lv2_socket::lv2_socket(lv2_socket::socket_type s, s32 s_type, s32 family) : socket(s), type{s_type}, family{family} { @@ -1044,7 +1087,7 @@ lv2_socket::lv2_socket(lv2_socket::socket_type s, s32 s_type, s32 family) lv2_socket::~lv2_socket() { - if (type != SYS_NET_SOCK_DGRAM_P2P) + if (type != SYS_NET_SOCK_DGRAM_P2P && type != SYS_NET_SOCK_STREAM_P2P) { #ifdef _WIN32 ::closesocket(socket); @@ -1084,6 +1127,8 @@ error_code sys_net_bnet_accept(ppu_thread& ppu, s32 s, vm::ptr return false; } + sys_net.trace("Found a socket in backlog!"); + p2ps = true; result = sock.p2ps.backlog.front(); sock.p2ps.backlog.pop(); @@ -1158,7 +1203,7 @@ error_code sys_net_bnet_accept(ppu_thread& ppu, s32 s, vm::ptr if (p2ps) { - return result; + return not_an_error(result); } if (!sock.ret) @@ -1415,6 +1460,7 @@ error_code sys_net_bnet_connect(ppu_thread& ppu, s32 s, vm::ptr(&send_hdr), sizeof(lv2_socket::p2ps_i::encapsulated_tcp)); // sock.socket = p2p_socket; + sock.p2ps.op_addr = name.sin_addr.s_addr; sock.p2ps.op_port = dst_port; sock.p2ps.op_vport = dst_vport; sock.p2ps.cur_seq = send_hdr.seq + 1; sock.p2ps.data_beg_seq = 0; + sock.p2ps.data_available = 0; + sock.p2ps.received_data.clear(); sock.p2ps.status = lv2_socket::p2ps_i::stream_status::stream_handshaking; packet = nt_p2p_port::generate_u2s_packet(send_hdr, nullptr, 0); @@ -2089,29 +2137,32 @@ error_code sys_net_bnet_recvfrom(ppu_thread& ppu, s32 s, vm::ptr buf, u32 return false; } - native_result = std::min(sock.p2ps.data_available, len); - memcpy(buf.get_ptr(), sock.p2ps.received_data.data(), native_result); + const u32 to_give = std::min(sock.p2ps.data_available, len); + sys_net.trace("STREAM-P2P socket had %d available, given %d", sock.p2ps.data_available, to_give); - sock.p2ps.data_beg_seq += native_result; - for(auto it = sock.p2ps.data_mapping.begin(); it != sock.p2ps.data_mapping.end();) + u32 left_to_give = to_give; + while (left_to_give) { - if ((it->first + it->second) <= sock.p2ps.data_beg_seq) + auto& cur_data = sock.p2ps.received_data.begin()->second; + auto to_give_for_this_packet = std::min(static_cast(cur_data.size()), left_to_give); + memcpy(reinterpret_cast(buf.get_ptr()) + (to_give - left_to_give), cur_data.data(), to_give_for_this_packet); + if (cur_data.size() != to_give_for_this_packet) { - it = sock.p2ps.data_mapping.erase(it); - continue; + auto amount_left = cur_data.size() - to_give_for_this_packet; + std::vector new_vec(amount_left); + memcpy(new_vec.data(), cur_data.data() + to_give_for_this_packet, amount_left); + auto new_key = (sock.p2ps.received_data.begin()->first) + to_give_for_this_packet; + sock.p2ps.received_data.emplace(new_key, std::move(new_vec)); } - if (it->first < sock.p2ps.data_beg_seq) - { - auto new_size = (it->first + it->second) - sock.p2ps.data_beg_seq; - sock.p2ps.data_mapping.erase(it); - sock.p2ps.data_mapping.emplace(sock.p2ps.data_beg_seq, new_size); - } + sock.p2ps.received_data.erase(sock.p2ps.received_data.begin()); - break; + left_to_give -= to_give_for_this_packet; } - sock.p2ps.data_available -= native_result; + sock.p2ps.data_available -= to_give; + sock.p2ps.data_beg_seq += to_give; + native_result = to_give; return true; } @@ -2311,6 +2362,8 @@ error_code sys_net_bnet_sendto(ppu_thread& ppu, s32 s, vm::cptr buf, u32 l name.sin_family = AF_INET; name.sin_port = std::bit_cast(psa_in->sin_port); name.sin_addr.s_addr = std::bit_cast(psa_in->sin_addr); + + sys_net.trace("Sending to %s:%d", inet_ntoa(name.sin_addr), psa_in->sin_port); } ::socklen_t namelen = sizeof(name); @@ -2355,7 +2408,7 @@ error_code sys_net_bnet_sendto(ppu_thread& ppu, s32 s, vm::cptr buf, u32 l // Prepare address name.sin_family = AF_INET; name.sin_port = std::bit_cast>(sock.p2ps.op_port); - name.sin_addr.s_addr = std::bit_cast>(sock.p2ps.op_addr); + name.sin_addr.s_addr = sock.p2ps.op_addr; // Prepares encapsulated tcp lv2_socket::p2ps_i::encapsulated_tcp tcp_header; tcp_header.src_port = sock.p2p.vport; @@ -2366,7 +2419,6 @@ error_code sys_net_bnet_sendto(ppu_thread& ppu, s32 s, vm::cptr buf, u32 l while(cur_total_len > 0) { s64 cur_data_len; - std::vector cur_data; if (cur_total_len >= max_data_len) cur_data_len = max_data_len; else @@ -2374,43 +2426,26 @@ error_code sys_net_bnet_sendto(ppu_thread& ppu, s32 s, vm::cptr buf, u32 l tcp_header.length = cur_data_len; tcp_header.seq = sock.p2ps.cur_seq; - tcp_header.checksum = 0; - cur_data.resize(sizeof(u16) + sizeof(lv2_socket::p2ps_i::encapsulated_tcp) + cur_data_len); - reinterpret_cast&>(cur_data[0]) = sock.p2ps.op_vport; - memcpy(cur_data.data()+sizeof(u16), &tcp_header, sizeof(lv2_socket::p2ps_i::encapsulated_tcp)); - memcpy(cur_data.data()+sizeof(u16)+sizeof(lv2_socket::p2ps_i::encapsulated_tcp), &_buf[len - cur_total_len], cur_data_len); - - auto *tcp_pointer = reinterpret_cast(cur_data.data() + sizeof(u16)); - tcp_pointer->checksum = nt_p2p_port::tcp_checksum(reinterpret_cast(cur_data.data()+sizeof(u16)), cur_data_len+sizeof(lv2_socket::p2ps_i::encapsulated_tcp)); - - stream_packets.emplace_back(std::move(cur_data)); + auto packet = nt_p2p_port::generate_u2s_packet(tcp_header, &_buf[len - cur_total_len], cur_data_len); + nt_p2p_port::send_u2s_packet(sock, s, std::move(packet), &name, tcp_header.seq); cur_total_len -= cur_data_len; sock.p2ps.cur_seq += cur_data_len; } - // Send the packets - for (const auto &packet : stream_packets) - { - native_result = sendto(sock.socket, reinterpret_cast(packet.data()), packet.size(), 0, reinterpret_cast(&name), namelen); - if (native_result < 0) - { - result = get_last_error(!sock.so_nbio && (flags & SYS_NET_MSG_DONTWAIT) == 0); - sys_net.error("P2P Stream sendto error :%s!", result); - - if (result) - { - return false; - } - } - native_result = len; - return true; - } + + native_result = len; + return true; } //if (!(sock.events & lv2_socket::poll::write)) { const auto nph = g_fxo->get>(); + if (addr && type == SYS_NET_SOCK_DGRAM && psa_in->sin_port == 53) + { + nph->add_dns_spy(s); + } + if (nph->is_dns(s)) { const s32 ret_analyzer = nph->analyze_dns_packet(s, reinterpret_cast(_buf.data()), len); @@ -2873,7 +2908,7 @@ error_code sys_net_bnet_close(ppu_thread& ppu, s32 s) p2p_port.bound_p2p_vports.erase(sock->p2p.vport); } } - } + } const auto nph = g_fxo->get>(); nph->remove_dns_spy(s); diff --git a/rpcs3/Emu/Cell/lv2/sys_net.h b/rpcs3/Emu/Cell/lv2/sys_net.h index 7d69049b46..2a563a5b61 100644 --- a/rpcs3/Emu/Cell/lv2/sys_net.h +++ b/rpcs3/Emu/Cell/lv2/sys_net.h @@ -380,7 +380,7 @@ struct lv2_socket final CWR = (1 << 7), }; - static constexpr be_t U2S_sig = static_cast('U') << 24 | static_cast('2') << 16 | static_cast('S') << 8 | static_cast('0'); + static constexpr be_t U2S_sig = (static_cast('U') << 24 | static_cast('2') << 16 | static_cast('S') << 8 | static_cast('0')); static constexpr std::size_t MAX_RECEIVED_BUFFER = (1024*1024*10); // P2P stream socket specific @@ -412,10 +412,9 @@ struct lv2_socket final u16 op_port = 0, op_vport = 0; u32 op_addr = 0; - std::vector received_data; // resized if needed, on recv will give all continuous data available and move data to beginning of vector u64 data_beg_seq = 0; // Seq of first byte of received_data u32 data_available = 0; // Amount of continuous data available(calculated on ACK send) - std::map data_mapping; // holds seq/size of data received + std::map> received_data; // holds seq/data of data received u32 cur_seq = 0; // SEQ of next packet to be sent } p2ps; diff --git a/rpcs3/Emu/NP/np_handler.cpp b/rpcs3/Emu/NP/np_handler.cpp index 557ace30a0..bc4bd005a6 100644 --- a/rpcs3/Emu/NP/np_handler.cpp +++ b/rpcs3/Emu/NP/np_handler.cpp @@ -209,6 +209,8 @@ void np_handler::init_NP(u32 poolsize, vm::ptr poolptr) ASSERT(!s_npid.empty()); // It should have been generated before this np_handler::string_to_npid(s_npid.c_str(), &npid); + const auto sigh = g_fxo->get>(); + sigh->set_self_sig_info(npid); } switch (g_cfg.net.psn_status) @@ -499,6 +501,36 @@ u32 np_handler::send_room_message(SceNpMatching2ContextId ctx_id, vm::cptr& reply_data) RoomDataInternal_to_SceNpMatching2RoomDataInternal(resp, room_info.get_ptr(), npid); - auto& info = p2p_info[room_info->roomId][1]; - info.connStatus = SCE_NP_SIGNALING_CONN_STATUS_ACTIVE; - info.addr = rpcn.get_addr_sig(); - info.port = rpcn.get_port_sig(); + // Establish Matching2 self signaling info + const auto sigh = g_fxo->get>(); + sigh->set_self_sig2_info(room_info->roomId, 1); + sigh->set_sig2_infos(room_info->roomId, 1, SCE_NP_SIGNALING_CONN_STATUS_ACTIVE, rpcn.get_addr_sig(), rpcn.get_port_sig(), true); + // TODO? Should this send a message to Signaling CB? Is this even necessary? extra_nps::print_create_room_resp(room_resp); @@ -680,10 +715,11 @@ bool np_handler::reply_join_room(u32 req_id, std::vector& reply_data) extra_nps::print_room_data_internal(room_resp->roomDataInternal.get_ptr()); - auto& info = p2p_info[room_info->roomId][member_id]; - info.connStatus = SCE_NP_SIGNALING_CONN_STATUS_ACTIVE; - info.addr = rpcn.get_addr_sig(); - info.port = rpcn.get_port_sig(); + // Establish Matching2 self signaling info + const auto sigh = g_fxo->get>(); + sigh->set_self_sig2_info(room_info->roomId, member_id); + sigh->set_sig2_infos(room_info->roomId, member_id, SCE_NP_SIGNALING_CONN_STATUS_ACTIVE, rpcn.get_addr_sig(), rpcn.get_port_sig(), true); + // TODO? Should this send a message to Signaling CB? Is this even necessary? sysutil_register_cb([=](ppu_thread& cb_ppu) -> s32 { cb_info.cb(cb_ppu, cb_info.ctx_id, req_id, SCE_NP_MATCHING2_REQUEST_EVENT_JoinRoom, event_key, 0, sizeof(SceNpMatching2JoinRoomResponse), cb_info.cb_arg); @@ -708,7 +744,9 @@ bool np_handler::reply_leave_room(u32 req_id, std::vector& reply_data) u32 event_key = get_event_key(); // Unsure if necessary if there is no data - p2p_info.erase(room_id); + // Disconnect all users from that room + const auto sigh = g_fxo->get>(); + sigh->disconnect_sig2_users(room_id); sysutil_register_cb([=](ppu_thread& cb_ppu) -> s32 { cb_info.cb(cb_ppu, cb_info.ctx_id, req_id, SCE_NP_MATCHING2_REQUEST_EVENT_LeaveRoom, event_key, 0, 0, cb_info.cb_arg); @@ -858,7 +896,50 @@ bool np_handler::reply_send_room_message(u32 req_id, std::vector& reply_data }); return true; +} +bool np_handler::reply_req_sign_infos(u32 req_id, std::vector& reply_data) +{ + if (!pending_sign_infos_requests.count(req_id)) + return error_and_disconnect("Unexpected reply ID to req RequestSignalingInfos"); + + u32 conn_id = pending_sign_infos_requests.at(req_id); + pending_sign_infos_requests.erase(req_id); + + vec_stream reply(reply_data, 1); + u32 addr = reply.get(); + u16 port = reply.get(); + + if (reply.is_error()) + return error_and_disconnect("Malformed reply to RequestSignalingInfos command"); + + const auto sigh = g_fxo->get>(); + sigh->start_sig(conn_id, addr, port); + + return true; +} + +bool np_handler::reply_req_ticket(u32 req_id, std::vector& reply_data) +{ + vec_stream reply(reply_data, 1); + auto ticket_raw = reply.get_rawdata(); + + if (reply.is_error()) + return error_and_disconnect("Malformed reply to RequestTicket command"); + + current_ticket = std::move(ticket_raw); + auto ticket_size = static_cast(current_ticket.size()); + + if (manager_cb) + { + sysutil_register_cb([manager_cb = this->manager_cb, ticket_size, manager_cb_arg = this->manager_cb_arg](ppu_thread& cb_ppu) -> s32 + { + manager_cb(cb_ppu, SCE_NP_MANAGER_EVENT_GOT_TICKET, ticket_size, manager_cb_arg); + return 0; + }); + } + + return true; } void np_handler::notif_user_joined_room(std::vector& data) @@ -879,6 +960,9 @@ void np_handler::notif_user_joined_room(std::vector& data) SceNpMatching2RoomMemberUpdateInfo* notif_data = reinterpret_cast(allocate_req_result(event_key, sizeof(SceNpMatching2RoomMemberUpdateInfo))); RoomMemberUpdateInfo_to_SceNpMatching2RoomMemberUpdateInfo(update_info, notif_data); + rpcn_log.notice("Received notification that user %s(%d) joined the room", notif_data->roomMemberDataInternal->userInfo.npId.handle.data, notif_data->roomMemberDataInternal->memberId); + extra_nps::print_room_member_data_internal(notif_data->roomMemberDataInternal.get_ptr()); + sysutil_register_cb([room_event_cb = this->room_event_cb, room_id, event_key, room_event_cb_ctx = this->room_event_cb_ctx, room_event_cb_arg = this->room_event_cb_arg](ppu_thread& cb_ppu) -> s32 { room_event_cb(cb_ppu, room_event_cb_ctx, room_id, SCE_NP_MATCHING2_ROOM_EVENT_MemberJoined, event_key, 0, sizeof(SceNpMatching2RoomMemberUpdateInfo), room_event_cb_arg); return 0; @@ -903,6 +987,9 @@ void np_handler::notif_user_left_room(std::vector& data) SceNpMatching2RoomMemberUpdateInfo* notif_data = reinterpret_cast(allocate_req_result(event_key, sizeof(SceNpMatching2RoomMemberUpdateInfo))); RoomMemberUpdateInfo_to_SceNpMatching2RoomMemberUpdateInfo(update_info, notif_data); + rpcn_log.notice("Received notification that user %s(%d) left the room", notif_data->roomMemberDataInternal->userInfo.npId.handle.data, notif_data->roomMemberDataInternal->memberId); + extra_nps::print_room_member_data_internal(notif_data->roomMemberDataInternal.get_ptr()); + sysutil_register_cb([room_event_cb = this->room_event_cb, room_event_cb_ctx = this->room_event_cb_ctx, room_id, event_key, room_event_cb_arg = this->room_event_cb_arg](ppu_thread& cb_ppu) -> s32 { room_event_cb(cb_ppu, room_event_cb_ctx, room_id, SCE_NP_MATCHING2_ROOM_EVENT_MemberLeft, event_key, 0, sizeof(SceNpMatching2RoomMemberUpdateInfo), room_event_cb_arg); return 0; @@ -927,44 +1014,32 @@ void np_handler::notif_room_destroyed(std::vector& data) SceNpMatching2RoomUpdateInfo* notif_data = reinterpret_cast(allocate_req_result(event_key, sizeof(SceNpMatching2RoomUpdateInfo))); RoomUpdateInfo_to_SceNpMatching2RoomUpdateInfo(update_info, notif_data); + const auto sigh = g_fxo->get>(); + sigh->disconnect_sig2_users(room_id); + sysutil_register_cb([room_event_cb = this->room_event_cb, room_event_cb_ctx = this->room_event_cb_ctx, room_id, event_key, room_event_cb_arg = this->room_event_cb_arg](ppu_thread& cb_ppu) -> s32 { room_event_cb(cb_ppu, room_event_cb_ctx, room_id, SCE_NP_MATCHING2_ROOM_EVENT_RoomDestroyed, event_key, 0, sizeof(SceNpMatching2RoomUpdateInfo), room_event_cb_arg); return 0; }); } -void np_handler::notif_p2p_established(std::vector& data) +void np_handler::notif_p2p_connect(std::vector& data) { if (data.size() != 16) { - rpcn_log.error("Notification data for SignalP2PEstablished != 14"); + rpcn_log.error("Notification data for SignalP2PConnect != 14"); return; } const u64 room_id = reinterpret_cast&>(data[0]); const u16 member_id = reinterpret_cast&>(data[8]); - const u16 port_p2p = reinterpret_cast&>(data[10]); + const u16 port_p2p = reinterpret_cast&>(data[10]); const u32 addr_p2p = reinterpret_cast&>(data[12]); - auto& info = p2p_info[room_id][member_id]; - info.connStatus = SCE_NP_SIGNALING_CONN_STATUS_ACTIVE; - info.addr = addr_p2p; - info.port = port_p2p; - - // Signal the callback - if (signal_event_cb) - { - sysutil_register_cb([signal_event_cb = this->signal_event_cb, signal_event_cb_ctx = this->signal_event_cb_ctx, room_id, member_id, signal_event_cb_arg = this->signal_event_cb_arg](ppu_thread& cb_ppu) -> s32 - { - signal_event_cb(cb_ppu, signal_event_cb_ctx, room_id, member_id, SCE_NP_MATCHING2_SIGNALING_EVENT_Established, 0, signal_event_cb_arg); - return 0; - }); - } - - in_addr da_addr; - da_addr.s_addr = addr_p2p; - - rpcn_log.notice("P2P Established(Room Id: %d | Member Id: %d): Address: [%s:%d]", room_id, member_id, inet_ntoa(da_addr), std::bit_cast>(port_p2p)); + // Attempt Signaling + const auto sigh = g_fxo->get>(); + sigh->set_sig2_infos(room_id, member_id, SCE_NP_SIGNALING_CONN_STATUS_PENDING, addr_p2p, port_p2p); + sigh->start_sig2(room_id, member_id); } void np_handler::notif_room_message_received(std::vector& data) @@ -996,11 +1071,6 @@ void np_handler::notif_room_message_received(std::vector& data) } } -const signaling_info& np_handler::get_peer_infos(u16 context_id, u64 room_id, u16 member_id) -{ - return p2p_info[room_id][member_id]; -} - void np_handler::add_dns_spy(u32 sock) { dns_spylist.emplace(std::make_pair(sock, std::queue>())); @@ -1136,6 +1206,15 @@ bool np_handler::destroy_score_context(s32 ctx_id) return idm::remove(static_cast(ctx_id)); } +s32 np_handler::create_score_transaction_context(s32 score_context_id) +{ + return static_cast(idm::make(score_context_id)); +} +bool np_handler::destroy_score_transaction_context(s32 ctx_id) +{ + return idm::remove(static_cast(ctx_id)); +} + u16 np_handler::create_match2_context(vm::cptr communicationId, vm::cptr passphrase) { return static_cast(idm::make(communicationId, passphrase)); @@ -1189,8 +1268,6 @@ bool np_handler::destroy_signaling_context(s32 ctx_id) return idm::remove(static_cast(ctx_id)); } - - bool np_handler::error_and_disconnect(const std::string& error_msg) { rpcn_log.error("%s", error_msg); diff --git a/rpcs3/Emu/NP/np_handler.h b/rpcs3/Emu/NP/np_handler.h index f4ef7910e3..68ceac5a7f 100644 --- a/rpcs3/Emu/NP/np_handler.h +++ b/rpcs3/Emu/NP/np_handler.h @@ -10,13 +10,7 @@ #include "Emu/NP/rpcn_client.h" #include "generated/np2_structs_generated.h" - -struct signaling_info -{ - int connStatus = SCE_NP_SIGNALING_CONN_STATUS_INACTIVE; - u32 addr = 0; - u16 port = 0; -}; +#include "signaling_handler.h" class np_handler { @@ -55,8 +49,8 @@ public: UserJoinedRoom, UserLeftRoom, RoomDestroyed, - SignalP2PEstablished, - _SignalP2PDisconnected, + SignalP2PConnect, + _SignalP2PDisconnect, RoomMessageReceived, }; @@ -87,9 +81,6 @@ public: vm::ptr room_event_cb{}; // Room events u16 room_event_cb_ctx = 0; vm::ptr room_event_cb_arg{}; - vm::ptr signal_event_cb{}; // Room events - u16 signal_event_cb_ctx = 0; - vm::ptr signal_event_cb_arg{}; vm::ptr room_msg_cb{}; u16 room_msg_cb_ctx = 0; vm::ptr room_msg_cb_arg{}; @@ -112,6 +103,21 @@ public: s32 create_score_context(vm::cptr communicationId, vm::cptr passphrase); bool destroy_score_context(s32 ctx_id); + struct score_transaction_ctx + { + score_transaction_ctx(s32 score_context_id) + { + this->score_context_id = score_context_id; + } + + static const u32 id_base = 1; + static const u32 id_step = 1; + static const u32 id_count = 32; + s32 score_context_id = 0; + }; + s32 create_score_transaction_context(s32 score_context_id); + bool destroy_score_transaction_context(s32 ctx_id); + // Match2 related struct match2_ctx { @@ -230,11 +236,15 @@ public: u32 send_room_message(SceNpMatching2ContextId ctx_id, vm::cptr optParam, const SceNpMatching2SendRoomMessageRequest* req); u32 get_match2_event(SceNpMatching2EventKey event_key, u8* dest, u32 size); - const signaling_info& get_peer_infos(u16 context_id, u64 room_id, u16 member_id); // Misc stuff + void req_ticket(u32 version, const SceNpId *npid, const char *service_id, const u8 *cookie, u32 cookie_size, const char *entitlement_id, u32 consumed_count); + const std::vector& get_ticket() { return current_ticket; } u32 add_players_to_history(vm::cptr npids, u32 count); + // For signaling + void req_sign_infos(const std::string& npid, u32 conn_id); + static constexpr std::string_view thread_name = "NP Handler Thread"; protected: @@ -246,7 +256,7 @@ protected: void notif_user_joined_room(std::vector& data); void notif_user_left_room(std::vector& data); void notif_room_destroyed(std::vector& data); - void notif_p2p_established(std::vector& data); + void notif_p2p_connect(std::vector& data); void notif_room_message_received(std::vector& data); // Reply handlers @@ -260,6 +270,8 @@ protected: bool reply_set_roomdata_internal(u32 req_id, std::vector& reply_data); bool reply_get_ping_info(u32 req_id, std::vector& reply_data); bool reply_send_room_message(u32 req_id, std::vector& reply_data); + bool reply_req_sign_infos(u32 req_id, std::vector& reply_data); + bool reply_req_ticket(u32 req_id, std::vector& reply_data); // Helper functions(fb=>np2) void BinAttr_to_SceNpMatching2BinAttr(const flatbuffers::Vector>* fb_attr, vm::ptr binattr_info); @@ -279,12 +291,15 @@ protected: }; u32 generate_callback_info(SceNpMatching2ContextId ctx_id, vm::cptr optParam); - std::map pending_requests; + std::unordered_map pending_requests; + std::unordered_map pending_sign_infos_requests; protected: bool is_connected = false; bool is_psn_active = false; + std::vector current_ticket; + // IP & DNS info be_t local_ip_addr{}; be_t public_ip_addr{}; @@ -306,14 +321,8 @@ protected: std::map mpool_allocs{}; // offset/size vm::addr_t allocate(u32 size); - // Memory pool static objects( room_id , internals ) - std::map> room_infos; - - // Signal P2P infos (room_id / user_id) - std::map> p2p_info{}; - // Requests(reqEventKey : data) - std::map> match2_req_results{}; + std::unordered_map> match2_req_results{}; atomic_t match2_low_reqid_cnt = 1; atomic_t match2_event_cnt = 1; u32 get_req_id(u16 app_req) diff --git a/rpcs3/Emu/NP/np_structs_extra.cpp b/rpcs3/Emu/NP/np_structs_extra.cpp index acadc8cd3f..234677155d 100644 --- a/rpcs3/Emu/NP/np_structs_extra.cpp +++ b/rpcs3/Emu/NP/np_structs_extra.cpp @@ -160,7 +160,12 @@ namespace extra_nps sceNp2.warning("maxSlot: %d", room->maxSlot); sceNp2.warning("members: *0x%x", room->memberList.members); - print_room_member_data_internal(room->memberList.members.get_ptr()); + auto cur_member = room->memberList.members; + while (cur_member) + { + print_room_member_data_internal(cur_member.get_ptr()); + cur_member = cur_member->next; + } sceNp2.warning("membersNum: %d", room->memberList.membersNum); sceNp2.warning("me: *0x%x", room->memberList.me); sceNp2.warning("owner: *0x%x", room->memberList.owner); diff --git a/rpcs3/Emu/NP/rpcn_client.cpp b/rpcs3/Emu/NP/rpcn_client.cpp index b6251d6c8f..94324b5b79 100644 --- a/rpcs3/Emu/NP/rpcn_client.cpp +++ b/rpcs3/Emu/NP/rpcn_client.cpp @@ -33,7 +33,7 @@ LOG_CHANNEL(rpcn_log, "rpcn"); -#define RPCN_PROTOCOL_VERSION 7 +#define RPCN_PROTOCOL_VERSION 9 #define RPCN_HEADER_SIZE 9 rpcn_client::rpcn_client(bool in_config) @@ -398,7 +398,7 @@ bool rpcn_client::create_user(const std::string& npid, const std::string& passwo } s32 send_packet_from_p2p_port(const std::vector& data, const sockaddr_in& addr); -std::queue> get_rpcn_msgs(); +std::vector> get_rpcn_msgs(); bool rpcn_client::manage_connection() { @@ -412,14 +412,12 @@ bool rpcn_client::manage_connection() auto rpcn_msgs = get_rpcn_msgs(); - while (!rpcn_msgs.empty()) + for (const auto& msg : rpcn_msgs) { - const auto& msg = rpcn_msgs.front(); - if (msg.size() == 6) { - addr_sig = reinterpret_cast(msg[0]); - port_sig = reinterpret_cast(msg[4]); + addr_sig = reinterpret_cast&>(msg[0]); + port_sig = reinterpret_cast&>(msg[4]); in_addr orig{}; orig.s_addr = addr_sig; @@ -430,8 +428,6 @@ bool rpcn_client::manage_connection() { rpcn_log.error("Received faulty RPCN UDP message!"); } - - rpcn_msgs.pop(); } // Send a packet every 5 seconds and then every 500 ms until reply is received @@ -440,7 +436,10 @@ bool rpcn_client::manage_connection() std::vector ping(9); ping[0] = 1; *reinterpret_cast*>(&ping[1]) = user_id; - send_packet_from_p2p_port(ping, addr_rpcn_udp); + if (send_packet_from_p2p_port(ping, addr_rpcn_udp) == -1) + { + rpcn_log.error("Failed to send ping to rpcn!"); + } last_ping_time = now; } } @@ -1055,6 +1054,30 @@ bool rpcn_client::send_room_message(u32 req_id, const SceNpMatching2SendRoomMess return true; } +bool rpcn_client::req_sign_infos(u32 req_id, const std::string& npid) +{ + std::vector data{}; + std::copy(npid.begin(), npid.end(), std::back_inserter(data)); + data.push_back(0); + + if (!forge_send(CommandType::RequestSignalingInfos, req_id, data)) + return false; + + return true; +} + +bool rpcn_client::req_ticket(u32 req_id, const std::string& service_id) +{ + std::vector data{}; + std::copy(service_id.begin(), service_id.end(), std::back_inserter(data)); + data.push_back(0); + + if (!forge_send(CommandType::RequestTicket, req_id, data)) + return false; + + return true; +} + std::vector rpcn_client::forge_request(u16 command, u32 packet_id, const std::vector& data) const { u16 packet_size = data.size() + RPCN_HEADER_SIZE; diff --git a/rpcs3/Emu/NP/rpcn_client.h b/rpcs3/Emu/NP/rpcn_client.h index 836bd7e334..4bc972b5ef 100644 --- a/rpcs3/Emu/NP/rpcn_client.h +++ b/rpcs3/Emu/NP/rpcn_client.h @@ -120,6 +120,8 @@ enum CommandType : u16 SetRoomDataInternal, PingRoomOwner, SendRoomMessage, + RequestSignalingInfos, + RequestTicket, }; class rpcn_client @@ -173,6 +175,8 @@ public: bool set_roomdata_internal(u32 req_id, const SceNpMatching2SetRoomDataInternalRequest* req); bool ping_room_owner(u32 req_id, u64 room_id); bool send_room_message(u32 req_id, const SceNpMatching2SendRoomMessageRequest* req); + bool req_sign_infos(u32 req_id, const std::string& npid); + bool req_ticket(u32 req_id, const std::string& service_id); const std::string& get_online_name() const { diff --git a/rpcs3/Emu/NP/signaling_handler.cpp b/rpcs3/Emu/NP/signaling_handler.cpp new file mode 100644 index 0000000000..d06c088ed0 --- /dev/null +++ b/rpcs3/Emu/NP/signaling_handler.cpp @@ -0,0 +1,630 @@ +#include "stdafx.h" +#include "Emu/Cell/PPUModule.h" +#include "signaling_handler.h" +#include "Emu/IdManager.h" +#include "Emu/System.h" +#include "Emu/Cell/Modules/cellSysutil.h" +#include "np_handler.h" +#include + +#ifdef _WIN32 +#include +#else +#include +#include +#endif + +LOG_CHANNEL(sign_log, "Signaling"); + +std::vector, std::vector>> get_sign_msgs(); +s32 send_packet_from_p2p_port(const std::vector& data, const sockaddr_in& addr); + +template <> +void fmt_class_string::format(std::string& out, u64 arg) +{ + format_enum(out, arg, [](auto value) { + switch (value) + { + case signal_ping: return "PING"; + case signal_pong: return "PONG"; + case signal_connect: return "CONNECT"; + case signal_connect_ack: return "CONNECT_ACK"; + case signal_confirm: return "CONFIRM"; + case signal_finished: return "FINISHED"; + case signal_finished_ack: return "FINISHED_ACK"; + } + + return unknown; + }); +} + +///////////////////////////// +//// SIGNALING CALLBACKS //// +///////////////////////////// + +void signaling_handler::set_sig_cb(u32 sig_cb_ctx, vm::ptr sig_cb, vm::ptr sig_cb_arg) +{ + std::lock_guard lock(data_mutex); + this->sig_cb_ctx = sig_cb_ctx; + this->sig_cb = sig_cb; + this->sig_cb_arg = sig_cb_arg; +} + +void signaling_handler::set_ext_sig_cb(u32 sig_cb_ctx, vm::ptr sig_ext_cb, vm::ptr sig_ext_cb_arg) +{ + std::lock_guard lock(data_mutex); + this->sig_ext_cb_ctx = sig_ext_cb_ctx; + this->sig_ext_cb = sig_ext_cb; + this->sig_ext_cb_arg = sig_ext_cb_arg; +} + +void signaling_handler::set_sig2_cb(u16 sig2_cb_ctx, vm::ptr sig2_cb, vm::ptr sig2_cb_arg) +{ + std::lock_guard lock(data_mutex); + this->sig2_cb_ctx = sig2_cb_ctx; + this->sig2_cb = sig2_cb; + this->sig2_cb_arg = sig2_cb_arg; +} + +void signaling_handler::signal_sig_callback(u32 conn_id, int event) +{ + if (sig_cb) + { + sysutil_register_cb([sig_cb = this->sig_cb, sig_cb_ctx = this->sig_cb_ctx, conn_id, event, sig_cb_arg = this->sig_cb_arg](ppu_thread& cb_ppu) -> s32 { + sig_cb(cb_ppu, sig_cb_ctx, conn_id, event, 0, sig_cb_arg); + return 0; + }); + sign_log.notice("Called sig CB: 0x%x (conn_id: %d)", event, conn_id); + } + + // extended callback also receives normal events + signal_ext_sig_callback(conn_id, event); +} + +void signaling_handler::signal_ext_sig_callback(u32 conn_id, int event) +{ + if (sig_ext_cb) + { + sysutil_register_cb([sig_ext_cb = this->sig_ext_cb, sig_ext_cb_ctx = this->sig_ext_cb_ctx, conn_id, event, sig_ext_cb_arg = this->sig_ext_cb_arg](ppu_thread& cb_ppu) -> s32 { + sig_ext_cb(cb_ppu, sig_ext_cb_ctx, conn_id, event, 0, sig_ext_cb_arg); + return 0; + }); + sign_log.notice("Called EXT sig CB: 0x%x (conn_id: %d, member_id: %d)", event, conn_id); + } +} + +void signaling_handler::signal_sig2_callback(u64 room_id, u16 member_id, SceNpMatching2Event event) +{ + // Signal the callback + if (sig2_cb) + { + sysutil_register_cb([sig2_cb = this->sig2_cb, sig2_cb_ctx = this->sig2_cb_ctx, room_id, member_id, event, sig2_cb_arg = this->sig2_cb_arg](ppu_thread& cb_ppu) -> s32 { + sig2_cb(cb_ppu, sig2_cb_ctx, room_id, member_id, event, 0, sig2_cb_arg); + return 0; + }); + } + + sign_log.notice("Called sig2 CB: 0x%x (room_id: %d, member_id: %d)", event, room_id, member_id); +} + +/////////////////////////////////// +//// SIGNALING MSGS PROCESSING //// +/////////////////////////////////// + +void signaling_handler::reschedule_packet(std::shared_ptr& si, SignalingCommand cmd, std::chrono::time_point new_timepoint) +{ + for (auto it = qpackets.begin(); it != qpackets.end(); it++) + { + if (it->second.packet.command == cmd && it->second.sig_info == si) + { + auto new_queue = qpackets.extract(it); + new_queue.key() = new_timepoint; + qpackets.insert(std::move(new_queue)); + return; + } + } +} + +void signaling_handler::retire_packet(std::shared_ptr& si, SignalingCommand cmd) +{ + for (auto it = qpackets.begin(); it != qpackets.end(); it++) + { + if (it->second.packet.command == cmd && it->second.sig_info == si) + { + qpackets.erase(it); + return; + } + } +} + +void signaling_handler::retire_all_packets(std::shared_ptr& si) +{ + for (auto it = qpackets.begin(); it != qpackets.end();) + { + if (it->second.sig_info == si) + it = qpackets.erase(it); + else + it++; + } +} + +bool signaling_handler::validate_signaling_packet(const signaling_packet* sp) +{ + if (sp->signature != SIGNALING_SIGNATURE) + { + sign_log.error("Received a signaling packet with an invalid signature"); + return false; + } + + if (sp->version != 1u && sp->version != 2u) + { + sign_log.error("Invalid version in signaling packet"); + return false; + } + + return true; +} + +void signaling_handler::process_incoming_messages() +{ + auto msgs = get_sign_msgs(); + + for (const auto& msg : msgs) + { + if (msg.second.size() != sizeof(signaling_packet)) + { + sign_log.error("Received an invalid signaling packet"); + continue; + } + + auto op_addr = msg.first.first; + auto op_port = msg.first.second; + auto* sp = reinterpret_cast(msg.second.data()); + + if (!validate_signaling_packet(sp)) + continue; + + if (sign_log.enabled == logs::level::trace) + { + in_addr addr; + addr.s_addr = op_addr; + + if (sp->version == 1u) + { + char npid_buf[17]{}; + memcpy(npid_buf, sp->V1.npid.handle.data, 16); + std::string npid(npid_buf); + sign_log.trace("sig1 %s from %s:%d(%s)", sp->command, inet_ntoa(addr), op_port, npid); + } + else + { + sign_log.trace("sig2 %s from %s:%d(%d:%d)", sp->command, inet_ntoa(addr), op_port, sp->V2.room_id, sp->V2.member_id); + } + } + + bool reply = false, schedule_repeat = false; + auto& sent_packet = sp->version == 1u ? sig1_packet : sig2_packet; + + // Get signaling info for user to know if we should even bother looking further + auto si = get_signaling_ptr(sp); + + if (!si && sp->version == 1u && sp->command == signal_connect) + { + // Connection can be remotely established and not mutual + const u32 conn_id = create_sig_infos(&sp->V1.npid); + si = sig1_peers.at(conn_id); + // Activate the connection without triggering the main CB + si->connStatus = SCE_NP_SIGNALING_CONN_STATUS_ACTIVE; + si->addr = op_addr; + si->port = op_port; + si->ext_status = ext_sign_peer; + // Notify extended callback that peer activated + signal_ext_sig_callback(conn_id, SCE_NP_SIGNALING_EVENT_EXT_PEER_ACTIVATED); + } + + if (!si || (si->connStatus == SCE_NP_SIGNALING_CONN_STATUS_INACTIVE && sp->command != signal_finished)) + { + // User is unknown to us or the connection is inactive + // Ignore packet unless it's a finished packet in case the finished_ack wasn't received by opponent + continue; + } + + const auto now = std::chrono::system_clock::now(); + if (si) + si->time_last_msg_recvd = now; + + const auto setup_ping = [&]() + { + for (auto it = qpackets.begin(); it != qpackets.end(); it++) + { + if (it->second.packet.command == signal_ping && it->second.sig_info == si) + { + return; + } + } + + sent_packet.command = signal_ping; + queue_signaling_packet(sent_packet, si, now + REPEAT_PING_DELAY); + }; + + switch (sp->command) + { + case signal_ping: + reply = true; + schedule_repeat = false; + sent_packet.command = signal_pong; + break; + case signal_pong: + reply = false; + schedule_repeat = false; + reschedule_packet(si, signal_ping, now + std::chrono::seconds(15)); + break; + case signal_connect: + reply = true; + schedule_repeat = true; + sent_packet.command = signal_connect_ack; + // connection is established + // TODO: notify extended callback! + break; + case signal_connect_ack: + reply = true; + schedule_repeat = false; + setup_ping(); + sent_packet.command = signal_confirm; + retire_packet(si, signal_connect); + // connection is active + update_si_addr(si, op_addr, op_port); + update_si_status(si, SCE_NP_SIGNALING_CONN_STATUS_ACTIVE); + break; + case signal_confirm: + reply = false; + schedule_repeat = false; + setup_ping(); + retire_packet(si, signal_connect_ack); + // connection is active + update_si_addr(si, op_addr, op_port); + update_si_status(si, SCE_NP_SIGNALING_CONN_STATUS_ACTIVE, true); + break; + case signal_finished: + reply = true; + schedule_repeat = false; + sent_packet.command = signal_finished_ack; + // terminate connection + update_si_status(si, SCE_NP_SIGNALING_CONN_STATUS_INACTIVE); + break; + case signal_finished_ack: + reply = false; + schedule_repeat = false; + retire_packet(si, signal_finished); + break; + default: sign_log.error("Invalid signaling command received"); continue; + } + + if (reply) + { + send_signaling_packet(sent_packet, op_addr, op_port); + if (schedule_repeat) + queue_signaling_packet(sent_packet, si, now + REPEAT_CONNECT_DELAY); + } + } +} + +void signaling_handler::operator()() +{ + while (thread_ctrl::state() != thread_state::aborting) + { + std::unique_lock lock(data_mutex); + if (qpackets.size()) + wakey.wait_until(lock, qpackets.begin()->first); + else + wakey.wait(lock); + + if (thread_ctrl::state() == thread_state::aborting) + return; + + process_incoming_messages(); + + const auto now = std::chrono::system_clock::now(); + + for (auto it = qpackets.begin(); it != qpackets.end();) + { + if (it->first > now) + break; + + if (it->second.sig_info->time_last_msg_recvd < now - std::chrono::seconds(60)) + { + // We had no connection to opponent for 60 seconds, consider the connection dead + sign_log.trace("Timeout disconnection"); + update_si_status(it->second.sig_info, SCE_NP_SIGNALING_CONN_STATUS_INACTIVE); + break; // qpackets will be emptied of all packets from this user so we're requeuing + } + + // Resend the packet + send_signaling_packet(it->second.packet, it->second.sig_info->addr, it->second.sig_info->port); + + // Reschedule another packet + SignalingCommand cmd = it->second.packet.command; + auto& si = it->second.sig_info; + + std::chrono::milliseconds delay(500); + switch (cmd) + { + case signal_ping: + case signal_pong: + delay = REPEAT_PING_DELAY; + break; + case signal_connect: + case signal_connect_ack: + case signal_confirm: + delay = REPEAT_CONNECT_DELAY; + break; + case signal_finished: + case signal_finished_ack: + delay = REPEAT_FINISHED_DELAY; + break; + } + + it++; + reschedule_packet(si, cmd, now + delay); + } + } +} + +void signaling_handler::wake_up() +{ + wakey.notify_one(); +} + +void signaling_handler::update_si_addr(std::shared_ptr& si, u32 new_addr, u16 new_port) +{ + ASSERT(si); + + if (si->addr != new_addr || si->port != new_port) + { + in_addr addr_old, addr_new; + addr_old.s_addr = si->addr; + addr_new.s_addr = new_addr; + + sign_log.trace("Updated Address from %s:%d to %s:%d", inet_ntoa(addr_old), si->port, inet_ntoa(addr_new), new_port); + si->addr = new_addr; + si->port = new_port; + } +} + +void signaling_handler::update_si_status(std::shared_ptr& si, s32 new_status, bool confirm_packet) +{ + if (!si) + return; + + if (si->connStatus == SCE_NP_SIGNALING_CONN_STATUS_PENDING && new_status == SCE_NP_SIGNALING_CONN_STATUS_ACTIVE) + { + si->connStatus = SCE_NP_SIGNALING_CONN_STATUS_ACTIVE; + ASSERT(si->version == 1u || si->version == 2u); + if (si->version == 1u) + signal_sig_callback(si->conn_id, SCE_NP_SIGNALING_EVENT_ESTABLISHED); + else + signal_sig2_callback(si->room_id, si->member_id, SCE_NP_MATCHING2_SIGNALING_EVENT_Established); + + return; + } + + if ((si->connStatus == SCE_NP_SIGNALING_CONN_STATUS_PENDING || si->connStatus == SCE_NP_SIGNALING_CONN_STATUS_ACTIVE) && new_status == SCE_NP_SIGNALING_CONN_STATUS_INACTIVE) + { + si->connStatus = SCE_NP_SIGNALING_CONN_STATUS_INACTIVE; + ASSERT(si->version == 1u || si->version == 2u); + if (si->version == 1u) + signal_sig_callback(si->conn_id, SCE_NP_SIGNALING_EVENT_DEAD); + else + signal_sig2_callback(si->room_id, si->member_id, SCE_NP_MATCHING2_SIGNALING_EVENT_Dead); + + retire_all_packets(si); + return; + } + + if (confirm_packet && si->version == 1u && si->ext_status == ext_sign_none) + { + si->ext_status = ext_sign_mutual; + signal_ext_sig_callback(si->conn_id, SCE_NP_SIGNALING_EVENT_EXT_MUTUAL_ACTIVATED); + } +} + +void signaling_handler::set_self_sig_info(SceNpId& npid) +{ + std::lock_guard lock(data_mutex); + sig1_packet.V1.npid = npid; +} + +void signaling_handler::set_self_sig2_info(u64 room_id, u16 member_id) +{ + std::lock_guard lock(data_mutex); + sig2_packet.V2.room_id = room_id; + sig2_packet.V2.member_id = member_id; +} + +void signaling_handler::send_signaling_packet(signaling_packet& sp, u32 addr, u16 port) +{ + std::vector packet(sizeof(signaling_packet) + sizeof(u16)); + reinterpret_cast&>(packet.data()[0]) = 65535; + memcpy(packet.data() + sizeof(u16), &sp, sizeof(signaling_packet)); + + sockaddr_in dest; + memset(&dest, 0, sizeof(sockaddr_in)); + dest.sin_family = AF_INET; + dest.sin_addr.s_addr = addr; + dest.sin_port = std::bit_cast>(port); + + sign_log.trace("Sending %s packet to %s:%d", sp.command, inet_ntoa(dest.sin_addr), port); + + if (send_packet_from_p2p_port(packet, dest) == -1) + { + sign_log.error("Failed to send signaling packet to %s:%d", inet_ntoa(dest.sin_addr), port); + } +} + +void signaling_handler::queue_signaling_packet(signaling_packet& sp, std::shared_ptr si, std::chrono::time_point wakeup_time) +{ + queued_packet qp; + qp.sig_info = si; + qp.packet = sp; + qpackets.emplace(wakeup_time, std::move(qp)); +} + +std::shared_ptr signaling_handler::get_signaling_ptr(const signaling_packet* sp) +{ + // V1 + if (sp->version == 1u) + { + char npid_buf[17]{}; + memcpy(npid_buf, sp->V1.npid.handle.data, 16); + std::string npid(npid_buf); + + if (!npid_to_conn_id.count(npid)) + return nullptr; + + auto conn_id = npid_to_conn_id.at(npid); + if (!sig1_peers.count(conn_id)) + { + sign_log.error("Discrepancy in signaling 1 data"); + return nullptr; + } + + return sig1_peers.at(conn_id); + } + + // V2 + auto room_id = sp->V2.room_id; + auto member_id = sp->V2.member_id; + if (!sig2_peers.count(room_id) || !sig2_peers.at(room_id).count(member_id)) + return nullptr; + + return sig2_peers.at(room_id).at(member_id); +} + +void signaling_handler::start_sig(u32 conn_id, u32 addr, u16 port) +{ + std::lock_guard lock(data_mutex); + + auto& sent_packet = sig1_packet; + sent_packet.command = signal_connect; + + auto si = sig1_peers.at(conn_id); + + si->addr = addr; + si->port = port; + + send_signaling_packet(sent_packet, si->addr, si->port); + queue_signaling_packet(sent_packet, si, std::chrono::system_clock::now() + REPEAT_CONNECT_DELAY); +} + +void signaling_handler::start_sig2(u64 room_id, u16 member_id) +{ + std::lock_guard lock(data_mutex); + + auto& sent_packet = sig2_packet; + sent_packet.command = signal_connect; + + auto si = sig2_peers.at(room_id).at(member_id); + + send_signaling_packet(sent_packet, si->addr, si->port); + queue_signaling_packet(sent_packet, si, std::chrono::system_clock::now() + REPEAT_CONNECT_DELAY); +} + +void signaling_handler::disconnect_sig2_users(u64 room_id) +{ + std::lock_guard lock(data_mutex); + + if (!sig2_peers.count(room_id)) + return; + + auto& sent_packet = sig2_packet; + + sent_packet.command = signal_finished; + + for (const auto& member : sig2_peers.at(room_id)) + { + auto& si = member.second; + if (si->connStatus != SCE_NP_SIGNALING_CONN_STATUS_INACTIVE && !si->self) + { + send_signaling_packet(sent_packet, si->addr, si->port); + queue_signaling_packet(sent_packet, si, std::chrono::system_clock::now() + REPEAT_FINISHED_DELAY); + } + } +} + +u32 signaling_handler::create_sig_infos(const SceNpId* npid) +{ + ASSERT(npid->handle.term == 0); + std::string npid_str(reinterpret_cast(npid->handle.data)); + + if (npid_to_conn_id.count(npid_str)) + { + return npid_to_conn_id.at(npid_str); + } + + const u32 conn_id = cur_conn_id++; + npid_to_conn_id.emplace(npid_str, conn_id); + sig1_peers.emplace(conn_id, std::make_shared()); + sig1_peers.at(conn_id)->version = 1; + sig1_peers.at(conn_id)->conn_id = conn_id; + + return conn_id; +} + +u32 signaling_handler::init_sig_infos(const SceNpId* npid) +{ + std::lock_guard lock(data_mutex); + + const u32 conn_id = create_sig_infos(npid); + + if (sig1_peers[conn_id]->connStatus == SCE_NP_SIGNALING_CONN_STATUS_INACTIVE) + { + sign_log.trace("Creating new sig1 connection and requesting infos from RPCN"); + sig1_peers[conn_id]->connStatus = SCE_NP_SIGNALING_CONN_STATUS_PENDING; + + // Request peer infos from RPCN + std::string npid_str(reinterpret_cast(npid->handle.data)); + const auto nph = g_fxo->get>(); + nph->req_sign_infos(npid_str, conn_id); + } + else + { + // Connection already exists(from passive connection) + if (sig1_peers[conn_id]->connStatus == SCE_NP_SIGNALING_CONN_STATUS_ACTIVE && sig1_peers[conn_id]->ext_status == ext_sign_peer) + { + sign_log.trace("Activating already peer activated connection"); + sig1_peers[conn_id]->ext_status = ext_sign_mutual; + start_sig(conn_id, sig1_peers[conn_id]->addr, sig1_peers[conn_id]->port); + signal_sig_callback(conn_id, SCE_NP_SIGNALING_EVENT_ESTABLISHED); + signal_ext_sig_callback(conn_id, SCE_NP_SIGNALING_EVENT_EXT_MUTUAL_ACTIVATED); + } + } + + return conn_id; +} + +signaling_info signaling_handler::get_sig_infos(u32 conn_id) +{ + return *sig1_peers[conn_id]; +} + +void signaling_handler::set_sig2_infos(u64 room_id, u16 member_id, s32 status, u32 addr, u16 port, bool self) +{ + std::lock_guard lock(data_mutex); + if (!sig2_peers[room_id][member_id]) + sig2_peers[room_id][member_id] = std::make_shared(); + + auto& peer = sig2_peers[room_id][member_id]; + peer->connStatus = status; + peer->addr = addr; + peer->port = port; + peer->self = self; + peer->version = 2; + peer->room_id = room_id; + peer->member_id = member_id; +} + +signaling_info signaling_handler::get_sig2_infos(u64 room_id, u16 member_id) +{ + std::lock_guard lock(data_mutex); + return *sig2_peers[room_id][member_id]; +} diff --git a/rpcs3/Emu/NP/signaling_handler.h b/rpcs3/Emu/NP/signaling_handler.h new file mode 100644 index 0000000000..0042e2fd38 --- /dev/null +++ b/rpcs3/Emu/NP/signaling_handler.h @@ -0,0 +1,151 @@ +#pragma once +#include "Utilities/BEType.h" +#include "Emu/Memory/vm.h" +#include "Emu/Memory/vm_ptr.h" +#include "Emu/Cell/Modules/sceNp.h" +#include "Emu/Cell/Modules/sceNp2.h" +#include "Utilities/Thread.h" +#include +#include +#include + +enum ext_signaling_status : u8 +{ + ext_sign_none = 0, + ext_sign_peer = 1, + ext_sign_mutual = 2, +}; + +struct signaling_info +{ + s32 connStatus = SCE_NP_SIGNALING_CONN_STATUS_INACTIVE; + u32 addr = 0; + u16 port = 0; + + // For handler + std::chrono::time_point time_last_msg_recvd = std::chrono::system_clock::now(); + bool self = false; + u32 version = 0; + // Signaling + u32 conn_id = 0; + ext_signaling_status ext_status = ext_sign_none; + // Matching2 + u64 room_id = 0; + u16 member_id = 0; +}; + +enum SignalingCommand : u32 +{ + signal_ping, + signal_pong, + signal_connect, + signal_connect_ack, + signal_confirm, + signal_finished, + signal_finished_ack, +}; + +class signaling_handler : public need_wakeup +{ +public: + void operator()(); + void wake_up(); + + void set_self_sig_info(SceNpId& npid); + void set_self_sig2_info(u64 room_id, u16 member_id); + + u32 init_sig_infos(const SceNpId* npid); + signaling_info get_sig_infos(u32 conn_id); + + void set_sig2_infos(u64 room_id, u16 member_id, s32 status, u32 addr, u16 port, bool self = false); + signaling_info get_sig2_infos(u64 room_id, u16 member_id); + + void set_sig_cb(u32 sig_cb_ctx, vm::ptr sig_cb, vm::ptr sig_cb_arg); + void set_ext_sig_cb(u32 sig_cb_ctx, vm::ptr sig_ext_cb, vm::ptr sig_ext_cb_arg); + void set_sig2_cb(u16 sig2_cb_ctx, vm::ptr sig2_cb, vm::ptr sig2_cb_arg); + + void start_sig(u32 conn_id, u32 addr, u16 port); + + void start_sig2(u64 room_id, u16 member_id); + void disconnect_sig2_users(u64 room_id); + +public: + static constexpr auto thread_name = "Signaling Manager Thread"sv; + +private: + static constexpr auto REPEAT_CONNECT_DELAY = std::chrono::milliseconds(200); + static constexpr auto REPEAT_PING_DELAY = std::chrono::milliseconds(500); + static constexpr auto REPEAT_FINISHED_DELAY = std::chrono::milliseconds(500); + static constexpr be_t SIGNALING_SIGNATURE = (static_cast('S') << 24 | static_cast('I') << 16 | static_cast('G') << 8 | static_cast('N')); + + struct signaling_packet + { + be_t signature = SIGNALING_SIGNATURE; + le_t version; + le_t command; + union { + struct + { + SceNpId npid; + } V1; + struct + { + le_t room_id; + le_t member_id; + } V2; + }; + }; + + struct queued_packet + { + signaling_packet packet; + std::shared_ptr sig_info; + }; + +private: + u32 sig_cb_ctx = 0; + vm::ptr sig_cb{}; + vm::ptr sig_cb_arg{}; + + u32 sig_ext_cb_ctx = 0; + vm::ptr sig_ext_cb{}; + vm::ptr sig_ext_cb_arg{}; + + u16 sig2_cb_ctx = 0; + vm::ptr sig2_cb{}; + vm::ptr sig2_cb_arg{}; + +private: + u32 create_sig_infos(const SceNpId* npid); + void update_si_addr(std::shared_ptr& si, u32 new_addr, u16 new_port); + void update_si_status(std::shared_ptr& si, s32 new_status, bool confirm_packet = false); + void signal_sig_callback(u32 conn_id, int event); + void signal_ext_sig_callback(u32 conn_id, int event); + void signal_sig2_callback(u64 room_id, u16 member_id, SceNpMatching2Event event); + +private: + bool validate_signaling_packet(const signaling_packet* sp); + void reschedule_packet(std::shared_ptr& si, SignalingCommand cmd, std::chrono::time_point new_timepoint); + void retire_packet(std::shared_ptr& si, SignalingCommand cmd); + void retire_all_packets(std::shared_ptr& si); + +private: + std::mutex data_mutex; + std::condition_variable wakey; + + signaling_packet sig1_packet{.version = 1u}; + signaling_packet sig2_packet{.version = 2u}; + + std::map, queued_packet> qpackets; // (wakeup time, packet) + + u32 cur_conn_id = 1; + std::unordered_map npid_to_conn_id; // (npid, conn_id) + std::unordered_map> sig1_peers; // (conn_id, sig_info) + std::unordered_map>> sig2_peers; // (room (member_id, sig_info)) + +private: + void process_incoming_messages(); + std::shared_ptr get_signaling_ptr(const signaling_packet* sp); + void send_signaling_packet(signaling_packet& sp, u32 addr, u16 port); + void queue_signaling_packet(signaling_packet& sp, std::shared_ptr si, std::chrono::time_point wakeup_time); +}; diff --git a/rpcs3/emucore.vcxproj b/rpcs3/emucore.vcxproj index b1889a0990..1ad84bd04d 100644 --- a/rpcs3/emucore.vcxproj +++ b/rpcs3/emucore.vcxproj @@ -110,6 +110,7 @@ + @@ -489,6 +490,7 @@ + diff --git a/rpcs3/emucore.vcxproj.filters b/rpcs3/emucore.vcxproj.filters index fbf0e91898..2013b953f1 100644 --- a/rpcs3/emucore.vcxproj.filters +++ b/rpcs3/emucore.vcxproj.filters @@ -932,6 +932,9 @@ Emu\NP + + Emu\NP + Emu\GPU\RSX\Overlays @@ -1798,6 +1801,9 @@ Emu\NP + + Emu\NP + Emu\GPU\RSX\Overlays diff --git a/rpcs3/rpcs3qt/rpcn_settings_dialog.cpp b/rpcs3/rpcs3qt/rpcn_settings_dialog.cpp index 4b1aac25aa..959b1e0e96 100644 --- a/rpcs3/rpcs3qt/rpcn_settings_dialog.cpp +++ b/rpcs3/rpcs3qt/rpcn_settings_dialog.cpp @@ -37,6 +37,7 @@ rpcn_settings_dialog::rpcn_settings_dialog(QWidget* parent) QPushButton* btn_chg_pass = new QPushButton(tr("Set Password")); QLabel *label_token = new QLabel(tr("Token:")); m_edit_token = new QLineEdit(); + m_edit_token->setMaxLength(16); QPushButton* btn_create = new QPushButton(tr("Create Account"), this); QPushButton* btn_save = new QPushButton(tr("Save"), this); @@ -154,6 +155,12 @@ bool rpcn_settings_dialog::save_config() return false; } + if (!token.empty() && token.size() != 16) + { + QMessageBox::critical(this, tr("Invalid token"), tr("The token you have received should be 16 characters long."), QMessageBox::Ok); + return false; + } + g_cfg_rpcn.set_host(host); g_cfg_rpcn.set_npid(npid); g_cfg_rpcn.set_token(token);