From 53c6223dc62afc7012a69efda34a6f0e93cb917c Mon Sep 17 00:00:00 2001 From: Gregor Richards Date: Sun, 30 Oct 2016 14:27:43 -0400 Subject: [PATCH] Use zlib compression for savestates sent over netplay. --- network/netplay/README | 19 +++++-- network/netplay/netplay.c | 87 +++++++++++++++++++++++++++---- network/netplay/netplay_private.h | 4 ++ 3 files changed, 96 insertions(+), 14 deletions(-) diff --git a/network/netplay/README b/network/netplay/README index 8f0ca71057..34b71609a0 100644 --- a/network/netplay/README +++ b/network/netplay/README @@ -60,10 +60,10 @@ During the frame of execution, when the core requests input, it receives the input from the state buffer, both local and real or simulated remote. Post-frame, it checks whether it's read more than it's actioned, i.e. if read > -other self > other. If so, it first checks whether its simulated remote data -was correct. If it was, it simply moves other up. If not, it rewinds to other -(by loading the serialized state there) and runs the core in replay mode with -the real data up to the least of self and read, then sets other to that. +other and self > other. If so, it first checks whether its simulated remote +data was correct. If it was, it simply moves other up. If not, it rewinds to +other (by loading the serialized state there) and runs the core in replay mode +with the real data up to the least of self and read, then sets other to that. When in Netplay mode, the callback for receiving input is replaced by input_state_net. It is the role of input_state_net to combine the true local @@ -146,8 +146,17 @@ Command: LOAD_SAVESTATE Payload: { frame number: uint32 + uncompressed size: uint32 serialized save state: blob (variable size) } Description: Cause the other side to load a savestate, notionally one which the sending - side has also loaded. + side has also loaded. The serialized savestate is zlib compressed. + +Command: PAUSE +Payload: None + Indicates that the core is paused. The receiving peer should also pause. + +Command: RESUME +Payload: None + Indicates that the core is no longer paused. diff --git a/network/netplay/netplay.c b/network/netplay/netplay.c index 14fcd618a9..8fce10d916 100644 --- a/network/netplay/netplay.c +++ b/network/netplay/netplay.c @@ -23,6 +23,7 @@ #include #include +#include #include #include #include @@ -584,6 +585,8 @@ static bool netplay_get_cmd(netplay_t *netplay) case NETPLAY_CMD_LOAD_SAVESTATE: { uint32_t frame; + uint32_t isize; + z_stream stream; /* Make sure we're ready for it */ if (netplay->quirks & NETPLAY_QUIRK_INITIALIZATION) @@ -612,9 +615,9 @@ static bool netplay_get_cmd(netplay_t *netplay) * (strangely) force a rewind to the frame we're already on, so it * gets loaded. This is just to avoid having reloading implemented in * too many places. */ - if (cmd_size > netplay->state_size + sizeof(uint32_t)) + if (cmd_size > netplay->zbuffer_size + 2*sizeof(uint32_t)) { - RARCH_ERR("CMD_LOAD_SAVESTATE received an unexpected save state size.\n"); + RARCH_ERR("CMD_LOAD_SAVESTATE received an unexpected payload size.\n"); return netplay_cmd_nak(netplay); } @@ -631,14 +634,45 @@ static bool netplay_get_cmd(netplay_t *netplay) return netplay_cmd_nak(netplay); } + if (!socket_receive_all_blocking(netplay->fd, &isize, sizeof(isize))) + { + RARCH_ERR("CMD_LOAD_SAVESTATE failed to receive inflated size.\n"); + return netplay_cmd_nak(netplay); + } + isize = ntohl(isize); + + if (isize != netplay->state_size) + { + RARCH_ERR("CMD_LOAD_SAVESTATE received an unexpected save state size.\n"); + return netplay_cmd_nak(netplay); + } + if (!socket_receive_all_blocking(netplay->fd, - netplay->buffer[netplay->read_ptr].state, - cmd_size - sizeof(uint32_t))) + netplay->zbuffer, cmd_size - 2*sizeof(uint32_t))) { RARCH_ERR("CMD_LOAD_SAVESTATE failed to receive savestate.\n"); return netplay_cmd_nak(netplay); } + /* And uncompress it */ + memset(&stream, 0, sizeof(stream)); + inflateInit(&stream); + stream.next_in = netplay->zbuffer; + stream.avail_in = cmd_size - 2*sizeof(uint32_t); + stream.next_out = netplay->buffer[netplay->read_ptr].state; + stream.avail_out = netplay->state_size; + if (inflate(&stream, 1) == Z_STREAM_ERROR) + { + RARCH_ERR("CMD_LOAD_SAVESTATE failed to uncompress savestate.\n"); + return netplay_cmd_nak(netplay); + } + + if (stream.total_out != netplay->state_size) + { + RARCH_ERR("CMD_LOAD_SAVESTATE received too-short savestate.\n"); + return netplay_cmd_nak(netplay); + } + /* Skip ahead if it's past where we are */ if (frame > netplay->self_frame_count) { @@ -1101,6 +1135,15 @@ bool netplay_init_serialization(netplay_t *netplay) } } + netplay->zbuffer_size = netplay->state_size * 2; + netplay->zbuffer = calloc(netplay->zbuffer_size, 1); + if (!netplay->zbuffer) + { + netplay->quirks |= NETPLAY_QUIRK_NO_TRANSMISSION; + netplay->zbuffer_size = 0; + return false; + } + return true; } @@ -1292,7 +1335,8 @@ void netplay_free(netplay_t *netplay) free(netplay->spectate.input); } - else + + if (netplay->buffer) { for (i = 0; i < netplay->buffer_size; i++) if (netplay->buffer[i].state) @@ -1301,6 +1345,9 @@ void netplay_free(netplay_t *netplay) free(netplay->buffer); } + if (netplay->zbuffer) + free(netplay->zbuffer); + if (netplay->addr) freeaddrinfo_retro(netplay->addr); @@ -1391,8 +1438,9 @@ void netplay_frontend_paused(netplay_t *netplay, bool paused) void netplay_load_savestate(netplay_t *netplay, retro_ctx_serialize_info_t *serial_info, bool save) { - uint32_t header[3]; + uint32_t header[4]; retro_ctx_serialize_info_t tmp_serial_info; + z_stream stream; if (!netplay->has_connection) return; @@ -1442,10 +1490,31 @@ void netplay_load_savestate(netplay_t *netplay, | NETPLAY_QUIRK_NO_TRANSMISSION)) return; - /* And send it to the peer (FIXME: this is an ugly way to do this) */ + /* Compress it */ + memset(&stream, 0, sizeof(stream)); + deflateInit(&stream, Z_DEFAULT_COMPRESSION); + stream.next_in = (z_const Bytef *) serial_info->data_const; + stream.avail_in = serial_info->size; + stream.next_out = netplay->zbuffer; + stream.avail_out = netplay->zbuffer_size; + if (deflate(&stream, 1) == Z_STREAM_ERROR) + { + fprintf(stderr, "OH NO! %s\n", stream.msg); + hangup(netplay); + return; + } + if (stream.total_in != serial_info->size) + { + fprintf(stderr, "OH NO 2! %u %u\n", (unsigned) stream.total_in, (unsigned) serial_info->size); + hangup(netplay); + return; + } + + /* And send it to the peer */ header[0] = htonl(NETPLAY_CMD_LOAD_SAVESTATE); - header[1] = htonl(serial_info->size + sizeof(uint32_t)); + header[1] = htonl(stream.total_out + 2*sizeof(uint32_t)); header[2] = htonl(netplay->self_frame_count); + header[3] = htonl(serial_info->size); if (!socket_send_all_blocking(netplay->fd, header, sizeof(header), false)) { @@ -1454,7 +1523,7 @@ void netplay_load_savestate(netplay_t *netplay, } if (!socket_send_all_blocking(netplay->fd, - serial_info->data_const, serial_info->size, false)) + netplay->zbuffer, stream.total_out, false)) { hangup(netplay); return; diff --git a/network/netplay/netplay_private.h b/network/netplay/netplay_private.h index 4cfbe9273e..09ffff2b18 100644 --- a/network/netplay/netplay_private.h +++ b/network/netplay/netplay_private.h @@ -122,6 +122,10 @@ struct netplay struct delta_frame *buffer; size_t buffer_size; + /* A buffer into which to compress frames for transfer */ + uint8_t *zbuffer; + size_t zbuffer_size; + /* Pointer where we are now. */ size_t self_ptr; /* Points to the last reliable state that self ever had. */