Run synchronization even when stalled

Previously, we could be stalled by one player but still reading data
from another, which would wedge the client because we would never act
upon the newly-read data. Now we act upon data even if we're stalled.
Fixes bugs in initial connection with high latency.
This commit is contained in:
Gregor Richards 2016-12-17 16:50:06 -05:00
parent d6d96704fd
commit 04266cf4f7
4 changed files with 118 additions and 44 deletions

View File

@ -521,6 +521,8 @@ static void netplay_frontend_paused(netplay_t *netplay, bool paused)
**/
bool netplay_pre_frame(netplay_t *netplay)
{
bool sync_stalled;
retro_assert(netplay);
/* FIXME: This is an ugly way to learn we're not paused anymore */
@ -556,11 +558,18 @@ bool netplay_pre_frame(netplay_t *netplay)
}
}
if (!netplay_sync_pre_frame(netplay))
return false;
sync_stalled = !netplay_sync_pre_frame(netplay);
return (!netplay->connected_players ||
(!netplay->stall && !netplay->remote_paused));
if (sync_stalled ||
(netplay->connected_players &&
(netplay->stall || netplay->remote_paused)))
{
/* We may have received data even if we're stalled, so run post-frame
* sync */
netplay_sync_post_frame(netplay, true);
return false;
}
return true;
}
/**
@ -576,7 +585,7 @@ void netplay_post_frame(netplay_t *netplay)
size_t i;
retro_assert(netplay);
netplay_update_unread_ptr(netplay);
netplay_sync_post_frame(netplay);
netplay_sync_post_frame(netplay, false);
for (i = 0; i < netplay->connections_size; i++)
{

View File

@ -26,6 +26,33 @@
#include "../../runloop.h"
#if 1
#define DEBUG_NETPLAY_STEPS 1
static void print_state(netplay_t *netplay)
{
char msg[512];
size_t cur = 0;
uint32_t player;
#define APPEND(out) cur += snprintf out
#define M msg + cur, sizeof(msg) - cur
APPEND((M, "NETPLAY: S:%u U:%u O:%u", netplay->self_frame_count, netplay->unread_frame_count, netplay->other_frame_count));
if (!netplay->is_server)
APPEND((M, " H:%u", netplay->server_frame_count));
for (player = 0; player < MAX_USERS; player++)
{
if ((netplay->connected_players & (1<<player)))
APPEND((M, " %u:%u", player, netplay->read_frame_count[player]));
}
APPEND((M, "\n"));
msg[sizeof(msg)-1] = '\0';
RARCH_LOG("%s\n", msg);
}
#endif
/**
* remote_unpaused
*
@ -362,8 +389,6 @@ static bool netplay_get_cmd(netplay_t *netplay,
if (connection->mode < NETPLAY_CONNECTION_CONNECTED)
return netplay_handshake(netplay, connection, had_input);
/* FIXME: This depends on delta_frame_ready */
#define RECV(buf, sz) \
recvd = netplay_recv(&connection->recv_packet_buffer, connection->fd, (buf), \
(sz), false); \
@ -452,7 +477,7 @@ static bool netplay_get_cmd(netplay_t *netplay,
if (!netplay_delta_frame_ready(netplay, dframe, netplay->read_frame_count[player]))
{
/* FIXME: Catastrophe! */
RARCH_ERR("Netplay input without a ready delta frame!\n");
RARCH_ERR("Netplay input from %u without a ready delta frame!\n", player);
return netplay_cmd_nak(netplay, connection);
}
memcpy(dframe->real_input_state[player], buffer + 2,
@ -475,6 +500,11 @@ static bool netplay_get_cmd(netplay_t *netplay,
netplay->server_ptr = netplay->read_ptr[player];
netplay->server_frame_count = netplay->read_frame_count[player];
}
#ifdef DEBUG_NETPLAY_STEPS
RARCH_LOG("Received input from %u\n", player);
print_state(netplay);
#endif
break;
}
@ -737,14 +767,33 @@ static bool netplay_get_cmd(netplay_t *netplay,
}
else
{
uint32_t frame_count;
/* It wants future frames, make sure we don't capture or send intermediate ones */
START(netplay->self_ptr);
while (dframe->used && dframe->frame < frame)
frame_count = netplay->self_frame_count;
while (true)
{
if (!dframe->used)
{
/* Make sure it's ready */
if (!netplay_delta_frame_ready(netplay, dframe, frame_count))
{
RARCH_ERR("Received mode change but delta frame isn't ready!\n");
return netplay_cmd_nak(netplay, connection);
}
}
memset(dframe->self_state, 0, sizeof(dframe->self_state));
memset(dframe->real_input_state[player], 0, sizeof(dframe->self_state));
dframe->have_local = true;
/* Go on to the next delta frame */
NEXT();
frame_count++;
if (frame_count >= frame)
break;
}
}
@ -755,6 +804,11 @@ static bool netplay_get_cmd(netplay_t *netplay,
RARCH_LOG("%s\n", msg);
runloop_msg_queue_push(msg, 1, 180, false);
#ifdef DEBUG_NETPLAY_STEPS
RARCH_LOG("Received mode change self->%u\n", player);
print_state(netplay);
#endif
}
else /* YOU && !PLAYING */
{
@ -770,6 +824,11 @@ static bool netplay_get_cmd(netplay_t *netplay,
RARCH_LOG("%s\n", msg);
runloop_msg_queue_push(msg, 1, 180, false);
#ifdef DEBUG_NETPLAY_STEPS
RARCH_LOG("Received mode change %u self->spectating\n", netplay->self_player);
print_state(netplay);
#endif
}
}
@ -795,6 +854,11 @@ static bool netplay_get_cmd(netplay_t *netplay,
RARCH_LOG("%s\n", msg);
runloop_msg_queue_push(msg, 1, 180, false);
#ifdef DEBUG_NETPLAY_STEPS
RARCH_LOG("Received mode change spectator->%u\n", player);
print_state(netplay);
#endif
}
else
{
@ -805,6 +869,12 @@ static bool netplay_get_cmd(netplay_t *netplay,
snprintf(msg, sizeof(msg)-1, "Player %d has left", player+1);
RARCH_LOG("%s\n", msg);
runloop_msg_queue_push(msg, 1, 180, false);
#ifdef DEBUG_NETPLAY_STEPS
RARCH_LOG("Received mode change %u->spectator\n", player);
print_state(netplay);
#endif
}
}
@ -1003,6 +1073,12 @@ static bool netplay_get_cmd(netplay_t *netplay,
return netplay_cmd_nak(netplay, connection);
}
if (!netplay_delta_frame_ready(netplay, &netplay->buffer[netplay->read_ptr[connection->player]], frame))
{
RARCH_ERR("CMD_LOAD_SAVESTATE with unready delta frame.\n");
return netplay_cmd_nak(netplay, connection);
}
RECV(&isize, sizeof(isize))
{
RARCH_ERR("CMD_LOAD_SAVESTATE failed to receive inflated size.\n");
@ -1059,6 +1135,12 @@ static bool netplay_get_cmd(netplay_t *netplay,
netplay->savestate_request_outstanding = false;
netplay->other_ptr = netplay->read_ptr[connection->player];
netplay->other_frame_count = frame;
#ifdef DEBUG_NETPLAY_STEPS
RARCH_LOG("Loading state at %u\n", frame);
print_state(netplay);
#endif
break;
}
@ -1154,21 +1236,15 @@ int netplay_poll_net_input(netplay_t *netplay, bool block)
/* Make sure we're actually ready for data */
if (netplay->self_mode >= NETPLAY_CONNECTION_CONNECTED)
{
netplay_update_unread_ptr(netplay);
if (!netplay_delta_frame_ready(netplay,
&netplay->buffer[netplay->unread_ptr], netplay->unread_frame_count))
{
fprintf(stderr, "CATASTROPHE: Cannot load %u (%lu) while at %u (%lu)\n", netplay->self_frame_count, netplay->self_ptr, netplay->unread_frame_count, netplay->unread_ptr);
break;
}
if (!netplay->is_server &&
!netplay_delta_frame_ready(netplay,
&netplay->buffer[netplay->server_ptr],
netplay->server_frame_count))
{
fprintf(stderr, "CATASTROPHE DEUX\n");
break;
}
netplay_update_unread_ptr(netplay);
if (!netplay_delta_frame_ready(netplay,
&netplay->buffer[netplay->unread_ptr], netplay->unread_frame_count))
{
fprintf(stderr, "CATASTROPHE: Cannot load %u (%lu=%u) while at %u (%lu)\n",
netplay->unread_frame_count, netplay->unread_ptr, netplay->buffer[netplay->unread_ptr].frame,
netplay->self_frame_count, netplay->self_ptr);
break;
}
}
/* Read input from each connection */

View File

@ -777,10 +777,11 @@ bool netplay_sync_pre_frame(netplay_t *netplay);
/**
* netplay_sync_post_frame
* @netplay : pointer to netplay object
* @stalled : true if we're currently stalled
*
* Post-frame for Netplay synchronization.
* We check if we have new input and replay from recorded input.
*/
void netplay_sync_post_frame(netplay_t *netplay);
void netplay_sync_post_frame(netplay_t *netplay, bool stalled);
#endif

View File

@ -342,10 +342,14 @@ process:
* Post-frame for Netplay synchronization.
* We check if we have new input and replay from recorded input.
*/
void netplay_sync_post_frame(netplay_t *netplay)
void netplay_sync_post_frame(netplay_t *netplay, bool stalled)
{
netplay->self_ptr = NEXT_PTR(netplay->self_ptr);
netplay->self_frame_count++;
/* Unless we're stalling, we've just finished running a frame */
if (!stalled)
{
netplay->self_ptr = NEXT_PTR(netplay->self_ptr);
netplay->self_frame_count++;
}
/* Only relevant if we're connected */
if ((netplay->is_server && !netplay->connected_players) ||
@ -505,20 +509,4 @@ void netplay_sync_post_frame(netplay_t *netplay)
driver_ctl(RARCH_DRIVER_CTL_SET_NONBLOCK_STATE, NULL);
}
}
/* If we're supposed to stall, rewind (we shouldn't get this far if we're
* stalled, so this is a last resort) */
if (netplay->stall)
{
retro_ctx_serialize_info_t serial_info;
netplay->self_ptr = PREV_PTR(netplay->self_ptr);
netplay->self_frame_count--;
serial_info.data = NULL;
serial_info.data_const = netplay->buffer[netplay->self_ptr].state;
serial_info.size = netplay->state_size;
core_unserialize(&serial_info);
}
}