Multitudinous fixes and updates to Netplay. Had to be one commit since

they're mostly related:

(1) Renamed frame_count to self_frame_count to be consistent with all
other names.

(2) Previously, it was possible to overwrite data in the ring buffer
that hadn't yet been used. Now that's not possible, but that just
changes one breakage for another: It's now possible to miss the NEW
data. The final resolution for this will probably be requesting stalls.
This is accomplished simply by storing frame numbers in the ring buffer
and checking them against the 'other' head.

(3) In TCP packets, separated cmd_size from cmd. It was beyond pointless
for these to be combined, and restricted cmd_size to 16 bits, which
will probably fail when/if state loading is supported.

(4) Readahead is now allowed. In the past, if the peer got ahead of us,
we would simply ignore their data. Thus, if they got too far ahead of
us, we'd stop reading their data altogether. Fabulous. Now, we're happy
to read future input.

(5) If the peer gets too far ahead of us (currently an unconfigurable 10
frames), fast forward to catch up. This should prevent desync due to
clock drift or stutter.

(6) Used frame_count in a few places where ptr was used. Doing a
comparison of pointers on a ring buffer is a far more dangerous way to
assure we're done with a task than simply using the count, since the
ring buffer is... well, a ring.

(7) Renamed tmp_{ptr,frame_count} to replay_{ptr,frame_count} for
clarity.

(8) Slightly changed the protocol version hash, just to assure that
other clients wouldn't think they were compatible with this one.

(9) There was an off-by-one error which, under some circumstances, could
allow the replay engine to run a complete round through the ring buffer,
replaying stale data. Fixed.
This commit is contained in:
Gregor Richards 2016-09-11 22:01:47 -04:00
parent 9022bf75ad
commit 69b7dc0d08
4 changed files with 114 additions and 31 deletions

View File

@ -65,7 +65,7 @@ static void warn_hangup(void)
bool check_netplay_synched(netplay_t* netplay)
{
retro_assert(netplay);
return netplay->frame_count < (netplay->flip_frame + 2 * UDP_FRAME_PACKETS);
return netplay->self_frame_count < (netplay->flip_frame + 2 * UDP_FRAME_PACKETS);
}
static bool netplay_info_cb(netplay_t* netplay, unsigned frames) {
@ -142,7 +142,9 @@ static bool get_self_input_state(netplay_t *netplay)
uint32_t state[UDP_WORDS_PER_FRAME - 1] = {0};
struct delta_frame *ptr = &netplay->buffer[netplay->self_ptr];
if (!input_driver_is_libretro_input_blocked() && netplay->frame_count > 0)
if (!netplay_delta_frame_ready(netplay, ptr, netplay->self_frame_count)) return false;
if (!input_driver_is_libretro_input_blocked() && netplay->self_frame_count > 0)
{
unsigned i;
settings_t *settings = config_get_ptr();
@ -185,7 +187,7 @@ static bool get_self_input_state(netplay_t *netplay)
*/
memmove(netplay->packet_buffer, netplay->packet_buffer + UDP_WORDS_PER_FRAME,
sizeof (netplay->packet_buffer) - UDP_WORDS_PER_FRAME * sizeof(uint32_t));
netplay->packet_buffer[(UDP_FRAME_PACKETS - 1) * UDP_WORDS_PER_FRAME] = htonl(netplay->frame_count);
netplay->packet_buffer[(UDP_FRAME_PACKETS - 1) * UDP_WORDS_PER_FRAME] = htonl(netplay->self_frame_count);
netplay->packet_buffer[(UDP_FRAME_PACKETS - 1) * UDP_WORDS_PER_FRAME + 1] = htonl(state[0]);
netplay->packet_buffer[(UDP_FRAME_PACKETS - 1) * UDP_WORDS_PER_FRAME + 2] = htonl(state[1]);
netplay->packet_buffer[(UDP_FRAME_PACKETS - 1) * UDP_WORDS_PER_FRAME + 3] = htonl(state[2]);
@ -227,15 +229,17 @@ static bool netplay_get_cmd(netplay_t *netplay)
{
uint32_t cmd;
uint32_t flip_frame;
size_t cmd_size;
uint32_t cmd_size;
if (!socket_receive_all_blocking(netplay->fd, &cmd, sizeof(cmd)))
return false;
cmd = ntohl(cmd);
cmd_size = cmd & 0xffff;
cmd = cmd >> 16;
if (!socket_receive_all_blocking(netplay->fd, &cmd_size, sizeof(cmd)))
return false;
cmd_size = ntohl(cmd_size);
switch (cmd)
{
@ -371,7 +375,7 @@ static void parse_packet(netplay_t *netplay, uint32_t *buffer, unsigned size)
for (i = 0; i < size * UDP_WORDS_PER_FRAME; i++)
buffer[i] = ntohl(buffer[i]);
for (i = 0; i < size && netplay->read_frame_count <= netplay->frame_count; i++)
for (i = 0; i < size; i++)
{
uint32_t frame = buffer[UDP_WORDS_PER_FRAME * i + 0];
const uint32_t *state = &buffer[UDP_WORDS_PER_FRAME * i + 1];
@ -379,6 +383,14 @@ static void parse_packet(netplay_t *netplay, uint32_t *buffer, unsigned size)
if (frame != netplay->read_frame_count)
continue;
if (!netplay_delta_frame_ready(netplay, &netplay->buffer[netplay->read_ptr], netplay->read_frame_count))
{
netplay->must_fast_forward = true;
continue;
}
/* FIXME: acknowledge when we completely drop data on the floor */
netplay->buffer[netplay->read_ptr].is_simulated = false;
memcpy(netplay->buffer[netplay->read_ptr].real_input_state, state,
sizeof(netplay->buffer[netplay->read_ptr].real_input_state));
@ -427,7 +439,7 @@ static bool netplay_poll(netplay_t *netplay)
/* We skip reading the first frame so the host has a chance to grab
* our host info so we don't block forever :') */
if (netplay->frame_count == 0)
if (netplay->self_frame_count == 0)
{
netplay->buffer[0].used_real = true;
netplay->buffer[0].is_simulated = false;
@ -464,7 +476,7 @@ static bool netplay_poll(netplay_t *netplay)
}
parse_packet(netplay, buffer, UDP_FRAME_PACKETS);
} while ((netplay->read_frame_count <= netplay->frame_count) &&
} while ((netplay->read_frame_count <= netplay->self_frame_count) &&
poll_input(netplay, (netplay->other_ptr == netplay->self_ptr) &&
(first_read == netplay->read_frame_count)) == 1);
}
@ -478,7 +490,7 @@ static bool netplay_poll(netplay_t *netplay)
}
}
if (netplay->read_ptr != netplay->self_ptr)
if (netplay->read_frame_count <= netplay->self_frame_count)
simulate_input(netplay);
else
netplay->buffer[PREV_PTR(netplay->self_ptr)].used_real = true;
@ -533,13 +545,13 @@ static bool netplay_is_alive(netplay_t *netplay)
static bool netplay_flip_port(netplay_t *netplay, bool port)
{
size_t frame = netplay->frame_count;
size_t frame = netplay->self_frame_count;
if (netplay->flip_frame == 0)
return port;
if (netplay->is_replay)
frame = netplay->tmp_frame_count;
frame = netplay->replay_frame_count;
return port ^ netplay->flip ^ (frame < netplay->flip_frame);
}
@ -549,7 +561,7 @@ static int16_t netplay_input_state(netplay_t *netplay,
unsigned idx, unsigned id)
{
size_t ptr = netplay->is_replay ?
netplay->tmp_ptr : PREV_PTR(netplay->self_ptr);
netplay->replay_ptr : PREV_PTR(netplay->self_ptr);
const uint32_t *curr_input_state = netplay->buffer[ptr].self_state;
@ -871,12 +883,17 @@ error:
static bool netplay_send_raw_cmd(netplay_t *netplay, uint32_t cmd,
const void *data, size_t size)
{
cmd = (cmd << 16) | (size & 0xffff);
uint32_t cmd_size;
cmd = htonl(cmd);
cmd_size = htonl(size);
if (!socket_send_all_blocking(netplay->fd, &cmd, sizeof(cmd), false))
return false;
if (!socket_send_all_blocking(netplay->fd, &cmd_size, sizeof(cmd_size), false))
return false;
if (!socket_send_all_blocking(netplay->fd, data, size, false))
return false;
@ -953,7 +970,7 @@ error:
**/
static void netplay_flip_users(netplay_t *netplay)
{
uint32_t flip_frame = netplay->frame_count + 2 * UDP_FRAME_PACKETS;
uint32_t flip_frame = netplay->self_frame_count + 2 * UDP_FRAME_PACKETS;
uint32_t flip_frame_net = htonl(flip_frame);
bool command = netplay_command(
netplay, NETPLAY_CMD_FLIP_PLAYERS,

View File

@ -199,6 +199,8 @@ uint32_t netplay_impl_magic(void)
for (i = 0; i < len; i++)
res ^= ver[i] << ((i & 0xf) + 16);
res ^= NETPLAY_PROTOCOL_VERSION << 24;
return res;
}
@ -342,3 +344,16 @@ bool netplay_is_spectate(netplay_t* netplay)
return false;
return netplay->spectate.enabled;
}
bool netplay_delta_frame_ready(netplay_t *netplay, struct delta_frame *delta, uint32_t frame)
{
if (delta->frame == frame) return true;
if (netplay->other_frame_count <= delta->frame)
{
/* We haven't even replayed this frame yet, so we can't overwrite it! */
return false;
}
memset(delta, 0, sizeof(struct delta_frame));
delta->frame = frame;
return true;
}

View File

@ -30,10 +30,13 @@ static void netplay_net_pre_frame(netplay_t *netplay)
{
retro_ctx_serialize_info_t serial_info;
serial_info.data = netplay->buffer[netplay->self_ptr].state;
serial_info.size = netplay->state_size;
if (netplay_delta_frame_ready(netplay, &netplay->buffer[netplay->self_ptr], netplay->self_frame_count))
{
serial_info.data = netplay->buffer[netplay->self_ptr].state;
serial_info.size = netplay->state_size;
core_serialize(&serial_info);
core_serialize(&serial_info);
}
netplay->can_poll = true;
@ -49,7 +52,7 @@ static void netplay_net_pre_frame(netplay_t *netplay)
**/
static void netplay_net_post_frame(netplay_t *netplay)
{
netplay->frame_count++;
netplay->self_frame_count++;
/* Nothing to do... */
if (netplay->other_frame_count == netplay->read_frame_count)
@ -69,24 +72,24 @@ static void netplay_net_post_frame(netplay_t *netplay)
netplay->other_frame_count++;
}
/* Now replay the real input if we've gotten ahead of it */
if (netplay->other_frame_count < netplay->read_frame_count)
{
retro_ctx_serialize_info_t serial_info;
bool first = true;
/* Replay frames. */
netplay->is_replay = true;
netplay->tmp_ptr = netplay->other_ptr;
netplay->tmp_frame_count = netplay->other_frame_count;
netplay->replay_ptr = netplay->other_ptr;
netplay->replay_frame_count = netplay->other_frame_count;
serial_info.data_const = netplay->buffer[netplay->other_ptr].state;
serial_info.size = netplay->state_size;
core_unserialize(&serial_info);
while (first || (netplay->tmp_ptr != netplay->self_ptr))
while (netplay->replay_frame_count < netplay->self_frame_count)
{
serial_info.data = netplay->buffer[netplay->tmp_ptr].state;
serial_info.data = netplay->buffer[netplay->replay_ptr].state;
serial_info.size = netplay->state_size;
serial_info.data_const = NULL;
@ -99,15 +102,54 @@ static void netplay_net_post_frame(netplay_t *netplay)
#if defined(HAVE_THREADS)
autosave_unlock();
#endif
netplay->tmp_ptr = NEXT_PTR(netplay->tmp_ptr);
netplay->tmp_frame_count++;
first = false;
netplay->replay_ptr = NEXT_PTR(netplay->replay_ptr);
netplay->replay_frame_count++;
}
netplay->other_ptr = netplay->read_ptr;
netplay->other_frame_count = netplay->read_frame_count;
netplay->is_replay = false;
}
/* And if the other side has gotten too far ahead of /us/, skip to catch up
* FIXME: Make this configurable */
if (netplay->read_frame_count > netplay->self_frame_count + 10 ||
netplay->must_fast_forward)
{
/* "replay" into the future */
netplay->is_replay = true;
netplay->replay_ptr = netplay->self_ptr;
netplay->replay_frame_count = netplay->self_frame_count;
/* just assume input doesn't change for the intervening frames */
while (netplay->replay_frame_count < netplay->read_frame_count)
{
size_t cur = netplay->replay_ptr;
size_t prev = PREV_PTR(cur);
memcpy(netplay->buffer[cur].self_state, netplay->buffer[prev].self_state,
sizeof(netplay->buffer[prev].self_state));
#if defined(HAVE_THREADS)
autosave_lock();
#endif
core_run();
#if defined(HAVE_THREADS)
autosave_unlock();
#endif
netplay->replay_ptr = NEXT_PTR(cur);
netplay->replay_frame_count++;
}
/* at this point, other = read = self */
netplay->self_ptr = netplay->replay_ptr;
netplay->self_frame_count = netplay->replay_frame_count;
netplay->other_ptr = netplay->read_ptr;
netplay->other_frame_count = netplay->read_frame_count;
netplay->is_replay = false;
}
}
static bool netplay_net_init_buffers(netplay_t *netplay)
{

View File

@ -35,17 +35,22 @@
#define MAX_SPECTATORS 16
#define RARCH_DEFAULT_PORT 55435
#define NETPLAY_PROTOCOL_VERSION 1
#define PREV_PTR(x) ((x) == 0 ? netplay->buffer_size - 1 : (x) - 1)
#define NEXT_PTR(x) ((x + 1) % netplay->buffer_size)
struct delta_frame
{
uint32_t frame;
void *state;
uint32_t real_input_state[UDP_WORDS_PER_FRAME - 1];
uint32_t simulated_input_state[UDP_WORDS_PER_FRAME - 1];
uint32_t self_state[UDP_WORDS_PER_FRAME - 1];
bool have_local;
bool is_simulated;
bool used_real;
};
@ -81,8 +86,8 @@ struct netplay
/* Pointer to where we are reading.
* Generally, other_ptr <= read_ptr <= self_ptr. */
size_t read_ptr;
/* A temporary pointer used on replay. */
size_t tmp_ptr;
/* A pointer used temporarily for replay. */
size_t replay_ptr;
size_t state_size;
@ -90,14 +95,16 @@ struct netplay
bool is_replay;
/* We don't want to poll several times on a frame. */
bool can_poll;
/* If we end up having to drop remote frame data because it's ahead of us, fast-forward is URGENT */
bool must_fast_forward;
/* To compat UDP packet loss we also send
* old data along with the packets. */
uint32_t packet_buffer[UDP_FRAME_PACKETS * UDP_WORDS_PER_FRAME];
uint32_t frame_count;
uint32_t self_frame_count;
uint32_t read_frame_count;
uint32_t other_frame_count;
uint32_t tmp_frame_count;
uint32_t replay_frame_count;
struct addrinfo *addr;
struct sockaddr_storage their_addr;
bool has_client_addr;
@ -158,4 +165,6 @@ bool netplay_is_server(netplay_t* netplay);
bool netplay_is_spectate(netplay_t* netplay);
bool netplay_delta_frame_ready(netplay_t *netplay, struct delta_frame *delta, uint32_t frame);
#endif