Merge pull request #3584 from GregorR/netplay-nouveau

Netplay Nouveau
This commit is contained in:
Twinaphex 2016-09-14 16:40:39 +02:00 committed by GitHub
commit 83eeb15e4a
6 changed files with 400 additions and 317 deletions

81
network/netplay/README Normal file
View File

@ -0,0 +1,81 @@
This is RetroArch's Netplay code. RetroArch Netplay allows a second player to
be connected via the Internet, rather than local at the same computer. Netplay
in RetroArch is guaranteed* to work with perfect synchronization given a few
minor constraints:
(1) The core is deterministic,
(2) The only input devices the core interacts with are the joypad and analog sticks, and
(3) Both the core and the loaded content are identical on host and client.
Furthermore, if the core supports serialization (save states), Netplay allows
for latency and clock drift, providing both host and client with a smooth
experience.
Note that this documentation is all for (the poorly-named) "net" mode, which is
the normal mode, and not "spectator" mode, which has its own whole host of
problems.
Netplay in RetroArch works by expecting input to come delayed from the network,
then rewinding and re-playing with the delayed input to get a consistent state.
So long as both sides agree on which frame is which, it should be impossible
for them to become de-synced, since each input event always happens at the
correct frame.
In terms of the implementation, Netplay is in effect a state buffer
(implemented as a ring of buffers) and some pre- and post-frame behaviors.
Within the state buffers, there are three locations: self, other and read. Each
refers to a frame, and a state buffer corresponding to that frame. The state
buffer contains the savestate for the frame, and the input from both the local
and remote players.
Self is where the emulator believes itself to be, which may be ahead or behind
of what it's read from the peer. Generally speaking, self progresses at 1 frame
per frame, except when the network stalls, described later.
Other is where it was most recently in perfect sync: i.e., other-1 is the last
frame from which both local and remote input have been actioned. As such, other
is always less than or equal to both self and read. Since the state buffer is a
ring, other is the first frame that it's unsafe to overwrite.
Read is where it's read up to, which can be slightly ahead of other since it
can't always immediately act upon new data.
In general, other ≤ read and other ≤ self. In all likelyhood, read ≤ self, but
it is both possible and supported for the remote host to get ahead of the local
host.
Pre-frame, Netplay serializes the core's state, polls for local input, and
polls for input from the other side. If the input from the other side is too
far behind, it stalls to allow the other side to catch up. To assure that this
stalling does not block the UI thread, it is implemented by rewinding the
thread every frame until data is ready.
If input has not been received for the other side up to the current frame (the
usual case), the remote input is simulated in a simplistic manner. Each
frame's local serialized state and simulated or real input goes into the frame
buffers.
During the frame of execution, when the core requests input, it receives the
input from the thread buffer, both local and real or simulated remote.
Post-frame, it checks whether it's read more than it's actioned, i.e. if read >
other. If so, it rewinds to other (by loading the serialized state there) and
runs the core in replay mode with the real data up to read, then sets other =
read.
When in Netplay mode, the callback for receiving input is replaced by
input_state_net. It is the role of input_state_net to combine the true local
input (whether live or replay) with the remote input (whether true or
simulated).
Some thoughts about "frame counts": The frame counters act like indexes into a
0-indexed array; i.e., they refer to the first unactioned frame. So, when
read_frame_count is 23, we've read 23 frames, but the last frame we read is
frame 22. With self_frame_count it's slightly more complicated, since there are
two relevant actions: Reading the data and emulating with the data. The frame
count is only incremented after the latter, so there is a period of time during
which we've actually read self_frame_count+1 frames of local input.
* Guarantee not actually a guarantee.

View File

@ -1,6 +1,7 @@
/* RetroArch - A frontend for libretro.
* Copyright (C) 2010-2014 - Hans-Kristian Arntzen
* Copyright (C) 2011-2016 - Daniel De Matteis
* Copyright (C) 2016 - Gregor Richards
*
* RetroArch is free software: you can redistribute it and/or modify it under the terms
* of the GNU General Public License as published by the Free Software Found-
@ -25,6 +26,7 @@
#include <retro_assert.h>
#include <net/net_compat.h>
#include <net/net_socket.h>
#include <features/features_cpu.h>
#include <retro_endianness.h>
#include "netplay_private.h"
@ -56,18 +58,6 @@ static void warn_hangup(void)
runloop_msg_queue_push("Netplay has disconnected. Will continue without connection.", 0, 480, false);
}
/**
* check_netplay_synched:
* @netplay: pointer to the netplay object.
* Checks to see if the host and client have synchronized states. Returns true
* on success and false on failure.
*/
bool check_netplay_synched(netplay_t* netplay)
{
retro_assert(netplay);
return netplay->frame_count < (netplay->flip_frame + 2 * UDP_FRAME_PACKETS);
}
static bool netplay_info_cb(netplay_t* netplay, unsigned frames) {
return netplay->net_cbs->info_cb(netplay, frames);
}
@ -96,39 +86,6 @@ static bool netplay_can_poll(netplay_t *netplay)
return netplay->can_poll;
}
static bool send_chunk(netplay_t *netplay)
{
const struct sockaddr *addr = NULL;
if (netplay->addr)
addr = netplay->addr->ai_addr;
else if (netplay->has_client_addr)
addr = (const struct sockaddr*)&netplay->their_addr;
if (addr)
{
ssize_t bytes_sent;
#ifdef HAVE_IPV6
bytes_sent = (sendto(netplay->udp_fd, (const char*)netplay->packet_buffer,
sizeof(netplay->packet_buffer), 0, addr,
sizeof(struct sockaddr_in6)));
#else
bytes_sent = (sendto(netplay->udp_fd, (const char*)netplay->packet_buffer,
sizeof(netplay->packet_buffer), 0, addr,
sizeof(struct sockaddr_in)));
#endif
if (bytes_sent != sizeof(netplay->packet_buffer))
{
warn_hangup();
netplay->has_connection = false;
return false;
}
}
return true;
}
/**
* get_self_input_state:
* @netplay : pointer to netplay object
@ -139,10 +96,19 @@ static bool send_chunk(netplay_t *netplay)
**/
static bool get_self_input_state(netplay_t *netplay)
{
uint32_t state[UDP_WORDS_PER_FRAME - 1] = {0};
uint32_t state[WORDS_PER_FRAME - 1] = {0, 0, 0};
struct delta_frame *ptr = &netplay->buffer[netplay->self_ptr];
if (!input_driver_is_libretro_input_blocked() && netplay->frame_count > 0)
if (!netplay_delta_frame_ready(netplay, ptr, netplay->self_frame_count))
return false;
if (ptr->have_local)
{
/* We've already read this frame! */
return true;
}
if (!input_driver_is_libretro_input_blocked() && netplay->self_frame_count > 0)
{
unsigned i;
settings_t *settings = config_get_ptr();
@ -179,18 +145,19 @@ static bool get_self_input_state(netplay_t *netplay)
* }
*
* payload {
* ; To compat packet losses, send input in a sliding window
* frame redundancy_frames[UDP_FRAME_PACKETS];
* cmd (CMD_INPUT)
* cmd_size (4 words)
* frame
* }
*/
memmove(netplay->packet_buffer, netplay->packet_buffer + UDP_WORDS_PER_FRAME,
sizeof (netplay->packet_buffer) - UDP_WORDS_PER_FRAME * sizeof(uint32_t));
netplay->packet_buffer[(UDP_FRAME_PACKETS - 1) * UDP_WORDS_PER_FRAME] = htonl(netplay->frame_count);
netplay->packet_buffer[(UDP_FRAME_PACKETS - 1) * UDP_WORDS_PER_FRAME + 1] = htonl(state[0]);
netplay->packet_buffer[(UDP_FRAME_PACKETS - 1) * UDP_WORDS_PER_FRAME + 2] = htonl(state[1]);
netplay->packet_buffer[(UDP_FRAME_PACKETS - 1) * UDP_WORDS_PER_FRAME + 3] = htonl(state[2]);
netplay->packet_buffer[0] = htonl(NETPLAY_CMD_INPUT);
netplay->packet_buffer[1] = htonl(WORDS_PER_FRAME * sizeof(uint32_t));
netplay->packet_buffer[2] = htonl(netplay->self_frame_count);
netplay->packet_buffer[3] = htonl(state[0]);
netplay->packet_buffer[4] = htonl(state[1]);
netplay->packet_buffer[5] = htonl(state[2]);
if (!send_chunk(netplay))
if (!socket_send_all_blocking(netplay->fd, netplay->packet_buffer, sizeof(netplay->packet_buffer), false))
{
warn_hangup();
netplay->has_connection = false;
@ -198,47 +165,97 @@ static bool get_self_input_state(netplay_t *netplay)
}
memcpy(ptr->self_state, state, sizeof(state));
netplay->self_ptr = NEXT_PTR(netplay->self_ptr);
ptr->have_local = true;
return true;
}
static bool netplay_cmd_ack(netplay_t *netplay)
static bool netplay_send_raw_cmd(netplay_t *netplay, uint32_t cmd,
const void *data, size_t size)
{
uint32_t cmd = htonl(NETPLAY_CMD_ACK);
return socket_send_all_blocking(netplay->fd, &cmd, sizeof(cmd), false);
uint32_t cmdbuf[2];
cmdbuf[0] = htonl(cmd);
cmdbuf[1] = htonl(size);
if (!socket_send_all_blocking(netplay->fd, cmdbuf, sizeof(cmdbuf), false))
return false;
if (size > 0)
if (!socket_send_all_blocking(netplay->fd, data, size, false))
return false;
return true;
}
static bool netplay_cmd_nak(netplay_t *netplay)
{
uint32_t cmd = htonl(NETPLAY_CMD_NAK);
return socket_send_all_blocking(netplay->fd, &cmd, sizeof(cmd), false);
}
static bool netplay_get_response(netplay_t *netplay)
{
uint32_t response;
if (!socket_receive_all_blocking(netplay->fd, &response, sizeof(response)))
return false;
return ntohl(response) == NETPLAY_CMD_ACK;
return netplay_send_raw_cmd(netplay, NETPLAY_CMD_NAK, NULL, 0);
}
static bool netplay_get_cmd(netplay_t *netplay)
{
uint32_t cmd;
uint32_t flip_frame;
size_t cmd_size;
uint32_t cmd_size;
/* FIXME: This depends on delta_frame_ready */
netplay->timeout_cnt = 0;
if (!socket_receive_all_blocking(netplay->fd, &cmd, sizeof(cmd)))
return false;
cmd = ntohl(cmd);
cmd_size = cmd & 0xffff;
cmd = cmd >> 16;
if (!socket_receive_all_blocking(netplay->fd, &cmd_size, sizeof(cmd)))
return false;
cmd_size = ntohl(cmd_size);
switch (cmd)
{
case NETPLAY_CMD_ACK:
/* Why are we even bothering? */
return true;
case NETPLAY_CMD_NAK:
/* Disconnect now! */
return false;
case NETPLAY_CMD_INPUT:
{
uint32_t buffer[WORDS_PER_FRAME];
unsigned i;
if (cmd_size != WORDS_PER_FRAME * sizeof(uint32_t))
{
RARCH_ERR("NETPLAY_CMD_INPUT received an unexpected payload size.\n");
return netplay_cmd_nak(netplay);
}
if (!socket_receive_all_blocking(netplay->fd, buffer, sizeof(buffer)))
{
RARCH_ERR("Failed to receive NETPLAY_CMD_INPUT input.\n");
return netplay_cmd_nak(netplay);
}
for (i = 0; i < WORDS_PER_FRAME; i++)
buffer[i] = ntohl(buffer[i]);
if (buffer[0] != netplay->read_frame_count)
{
/* Out of order = out of luck */
return netplay_cmd_nak(netplay);
}
/* The data's good! */
netplay->buffer[netplay->read_ptr].have_remote = true;
memcpy(netplay->buffer[netplay->read_ptr].real_input_state, buffer + 1, sizeof(buffer) - sizeof(uint32_t));
netplay->read_ptr = NEXT_PTR(netplay->read_ptr);
netplay->read_frame_count++;
return true;
}
case NETPLAY_CMD_FLIP_PLAYERS:
if (cmd_size != sizeof(uint32_t))
{
@ -267,7 +284,7 @@ static bool netplay_get_cmd(netplay_t *netplay)
RARCH_LOG("Netplay users are flipped.\n");
runloop_msg_queue_push("Netplay users are flipped.", 1, 180, false);
return netplay_cmd_ack(netplay);
return true;
case NETPLAY_CMD_SPECTATE:
RARCH_ERR("NETPLAY_CMD_SPECTATE unimplemented.\n");
@ -275,7 +292,7 @@ static bool netplay_get_cmd(netplay_t *netplay)
case NETPLAY_CMD_DISCONNECT:
warn_hangup();
return netplay_cmd_ack(netplay);
return true;
case NETPLAY_CMD_LOAD_SAVESTATE:
RARCH_ERR("NETPLAY_CMD_LOAD_SAVESTATE unimplemented.\n");
@ -283,11 +300,11 @@ static bool netplay_get_cmd(netplay_t *netplay)
case NETPLAY_CMD_PAUSE:
command_event(CMD_EVENT_PAUSE, NULL);
return netplay_cmd_ack(netplay);
return true;
case NETPLAY_CMD_RESUME:
command_event(CMD_EVENT_UNPAUSE, NULL);
return netplay_cmd_ack(netplay);
return true;
default: break;
}
@ -301,8 +318,8 @@ static bool netplay_get_cmd(netplay_t *netplay)
static int poll_input(netplay_t *netplay, bool block)
{
int max_fd = (netplay->fd > netplay->udp_fd ?
netplay->fd : netplay->udp_fd) + 1;
bool had_input = false;
int max_fd = netplay->fd + 1;
struct timeval tv = {0};
tv.tv_sec = 0;
tv.tv_usec = block ? (RETRY_MS * 1000) : 0;
@ -314,11 +331,11 @@ static int poll_input(netplay_t *netplay, bool block)
* Technically possible for select() to modify tmp_tv, so
* we go paranoia mode. */
struct timeval tmp_tv = tv;
had_input = false;
netplay->timeout_cnt++;
FD_ZERO(&fds);
FD_SET(netplay->udp_fd, &fds);
FD_SET(netplay->fd, &fds);
if (socket_select(max_fd, &fds, NULL, NULL, &tmp_tv) < 0)
@ -326,83 +343,43 @@ static int poll_input(netplay_t *netplay, bool block)
/* Somewhat hacky,
* but we aren't using the TCP connection for anything useful atm. */
if (FD_ISSET(netplay->fd, &fds) && !netplay_get_cmd(netplay))
return -1;
if (FD_ISSET(netplay->udp_fd, &fds))
return 1;
if (FD_ISSET(netplay->fd, &fds))
{
/* If we're not ready for input, wait until we are. Could fill the TCP buffer, stalling the other side. */
if (netplay_delta_frame_ready(netplay, &netplay->buffer[netplay->read_ptr], netplay->read_frame_count))
{
had_input = true;
if (!netplay_get_cmd(netplay))
return -1;
}
}
if (!block)
continue;
if (!send_chunk(netplay))
{
warn_hangup();
netplay->has_connection = false;
RARCH_LOG("Network is stalling at frame %u, count %u of %d ...\n",
netplay->self_frame_count, netplay->timeout_cnt, MAX_RETRIES);
if (netplay->timeout_cnt >= MAX_RETRIES)
return -1;
}
} while (had_input || (block && (netplay->read_frame_count <= netplay->self_frame_count)));
RARCH_LOG("Network is stalling, resending packet... Count %u of %d ...\n",
netplay->timeout_cnt, MAX_RETRIES);
} while ((netplay->timeout_cnt < MAX_RETRIES) && block);
if (block)
return -1;
return 0;
}
static bool receive_data(netplay_t *netplay, uint32_t *buffer, size_t size)
{
socklen_t addrlen = sizeof(netplay->their_addr);
if (recvfrom(netplay->udp_fd, (char*)buffer, size, 0,
(struct sockaddr*)&netplay->their_addr, &addrlen) != (ssize_t)size)
return false;
netplay->has_client_addr = true;
return true;
}
static void parse_packet(netplay_t *netplay, uint32_t *buffer, unsigned size)
{
unsigned i;
for (i = 0; i < size * UDP_WORDS_PER_FRAME; i++)
buffer[i] = ntohl(buffer[i]);
for (i = 0; i < size && netplay->read_frame_count <= netplay->frame_count; i++)
{
uint32_t frame = buffer[UDP_WORDS_PER_FRAME * i + 0];
const uint32_t *state = &buffer[UDP_WORDS_PER_FRAME * i + 1];
if (frame != netplay->read_frame_count)
continue;
netplay->buffer[netplay->read_ptr].is_simulated = false;
memcpy(netplay->buffer[netplay->read_ptr].real_input_state, state,
sizeof(netplay->buffer[netplay->read_ptr].real_input_state));
netplay->read_ptr = NEXT_PTR(netplay->read_ptr);
netplay->read_frame_count++;
netplay->timeout_cnt = 0;
}
}
/* TODO: Somewhat better prediction. :P */
static void simulate_input(netplay_t *netplay)
{
size_t ptr = PREV_PTR(netplay->self_ptr);
size_t ptr = netplay->self_ptr;
size_t prev = PREV_PTR(netplay->read_ptr);
memcpy(netplay->buffer[ptr].simulated_input_state,
netplay->buffer[prev].real_input_state,
sizeof(netplay->buffer[prev].real_input_state));
netplay->buffer[ptr].is_simulated = true;
netplay->buffer[ptr].used_real = false;
}
#define MAX_STALL_TIME_USEC (10*1000*1000)
/**
* netplay_poll:
* @netplay : pointer to netplay object
@ -422,27 +399,11 @@ static bool netplay_poll(netplay_t *netplay)
netplay->can_poll = false;
if (!get_self_input_state(netplay))
return false;
get_self_input_state(netplay);
/* We skip reading the first frame so the host has a chance to grab
* our host info so we don't block forever :') */
if (netplay->frame_count == 0)
{
netplay->buffer[0].used_real = true;
netplay->buffer[0].is_simulated = false;
memset(netplay->buffer[0].real_input_state,
0, sizeof(netplay->buffer[0].real_input_state));
netplay->read_ptr = NEXT_PTR(netplay->read_ptr);
netplay->read_frame_count++;
return true;
}
/* We might have reached the end of the buffer, where we
* simply have to block. */
res = poll_input(netplay, netplay->other_ptr == netplay->self_ptr);
/* Read Netplay input, block if we're configured to stall for input every
* frame */
res = poll_input(netplay, (netplay->stall_frames == 0) && (netplay->read_frame_count <= netplay->self_frame_count));
if (res == -1)
{
netplay->has_connection = false;
@ -450,39 +411,38 @@ static bool netplay_poll(netplay_t *netplay)
return false;
}
if (res == 1)
{
uint32_t first_read = netplay->read_frame_count;
do
{
uint32_t buffer[UDP_FRAME_PACKETS * UDP_WORDS_PER_FRAME];
if (!receive_data(netplay, buffer, sizeof(buffer)))
{
warn_hangup();
netplay->has_connection = false;
return false;
}
parse_packet(netplay, buffer, UDP_FRAME_PACKETS);
/* Simulate the input if we don't have real input */
if (!netplay->buffer[netplay->self_ptr].have_remote)
simulate_input(netplay);
} while ((netplay->read_frame_count <= netplay->frame_count) &&
poll_input(netplay, (netplay->other_ptr == netplay->self_ptr) &&
(first_read == netplay->read_frame_count)) == 1);
/* Consider stalling */
switch (netplay->stall) {
case RARCH_NETPLAY_STALL_RUNNING_FAST:
if (netplay->read_frame_count >= netplay->self_frame_count)
netplay->stall = RARCH_NETPLAY_STALL_NONE;
break;
default: /* not stalling */
if (netplay->read_frame_count + netplay->stall_frames <= netplay->self_frame_count)
{
netplay->stall = RARCH_NETPLAY_STALL_RUNNING_FAST;
netplay->stall_time = cpu_features_get_time_usec();
}
}
else
/* If we're stalling, consider disconnection */
if (netplay->stall)
{
/* Cannot allow this. Should not happen though. */
if (netplay->self_ptr == netplay->other_ptr)
retro_time_t now = cpu_features_get_time_usec();
if (now - netplay->stall_time >= MAX_STALL_TIME_USEC)
{
/* Stalled out! */
netplay->has_connection = false;
warn_hangup();
return false;
}
}
if (netplay->read_ptr != netplay->self_ptr)
simulate_input(netplay);
else
netplay->buffer[PREV_PTR(netplay->self_ptr)].used_real = true;
return true;
}
@ -504,14 +464,14 @@ void video_frame_net(const void *data, unsigned width,
void audio_sample_net(int16_t left, int16_t right)
{
netplay_t *netplay = (netplay_t*)netplay_data;
if (!netplay_should_skip(netplay))
if (!netplay_should_skip(netplay) && !netplay->stall)
netplay->cbs.sample_cb(left, right);
}
size_t audio_sample_batch_net(const int16_t *data, size_t frames)
{
netplay_t *netplay = (netplay_t*)netplay_data;
if (!netplay_should_skip(netplay))
if (!netplay_should_skip(netplay) && !netplay->stall)
return netplay->cbs.sample_batch_cb(data, frames);
return frames;
}
@ -533,13 +493,13 @@ static bool netplay_is_alive(netplay_t *netplay)
static bool netplay_flip_port(netplay_t *netplay, bool port)
{
size_t frame = netplay->frame_count;
size_t frame = netplay->self_frame_count;
if (netplay->flip_frame == 0)
return port;
if (netplay->is_replay)
frame = netplay->tmp_frame_count;
frame = netplay->replay_frame_count;
return port ^ netplay->flip ^ (frame < netplay->flip_frame);
}
@ -549,16 +509,21 @@ static int16_t netplay_input_state(netplay_t *netplay,
unsigned idx, unsigned id)
{
size_t ptr = netplay->is_replay ?
netplay->tmp_ptr : PREV_PTR(netplay->self_ptr);
netplay->replay_ptr : netplay->self_ptr;
const uint32_t *curr_input_state = netplay->buffer[ptr].self_state;
if (netplay->port == (netplay_flip_port(netplay, port) ? 1 : 0))
{
if (netplay->buffer[ptr].is_simulated)
curr_input_state = netplay->buffer[ptr].simulated_input_state;
else
if (netplay->buffer[ptr].have_remote)
{
netplay->buffer[ptr].used_real = true;
curr_input_state = netplay->buffer[ptr].real_input_state;
}
else
{
curr_input_state = netplay->buffer[ptr].simulated_input_state;
}
}
switch (device)
@ -663,6 +628,13 @@ static int init_tcp_connection(const struct addrinfo *res,
bool ret = true;
int fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
#if defined(IPPROTO_TCP) && defined(TCP_NODELAY)
{
int flag = 1;
setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(int));
}
#endif
if (fd < 0)
{
ret = false;
@ -762,37 +734,6 @@ static bool init_tcp_socket(netplay_t *netplay, const char *server,
return ret;
}
static bool init_udp_socket(netplay_t *netplay, const char *server,
uint16_t port)
{
int fd = socket_init((void**)&netplay->addr, port, server, SOCKET_TYPE_DATAGRAM);
if (fd < 0)
goto error;
netplay->udp_fd = fd;
if (!server)
{
/* Not sure if we have to do this for UDP, but hey :) */
if (!socket_bind(netplay->udp_fd, (void*)netplay->addr))
{
RARCH_ERR("Failed to bind socket.\n");
socket_close(netplay->udp_fd);
netplay->udp_fd = -1;
}
freeaddrinfo_retro(netplay->addr);
netplay->addr = NULL;
}
return true;
error:
RARCH_ERR("Failed to initialize socket.\n");
return false;
}
static bool init_socket(netplay_t *netplay, const char *server, uint16_t port)
{
if (!network_init())
@ -800,8 +741,6 @@ static bool init_socket(netplay_t *netplay, const char *server, uint16_t port)
if (!init_tcp_socket(netplay, server, port, netplay->spectate.enabled))
return false;
if (!netplay->spectate.enabled && !init_udp_socket(netplay, server, port))
return false;
return true;
}
@ -825,22 +764,20 @@ netplay_t *netplay_new(const char *server, uint16_t port,
bool spectate,
const char *nick)
{
uint32_t buffer_frames;
netplay_t *netplay = NULL;
if (frames > UDP_FRAME_PACKETS)
frames = UDP_FRAME_PACKETS;
netplay = (netplay_t*)calloc(1, sizeof(*netplay));
if (!netplay)
return NULL;
netplay->fd = -1;
netplay->udp_fd = -1;
netplay->cbs = *cb;
netplay->port = server ? 0 : 1;
netplay->spectate.enabled = spectate;
netplay->is_server = server == NULL;
strlcpy(netplay->nick, nick, sizeof(netplay->nick));
netplay->stall_frames = frames;
if(spectate)
netplay->net_cbs = netplay_get_cbs_spectate();
@ -861,28 +798,11 @@ netplay_t *netplay_new(const char *server, uint16_t port,
error:
if (netplay->fd >= 0)
socket_close(netplay->fd);
if (netplay->udp_fd >= 0)
socket_close(netplay->udp_fd);
free(netplay);
return NULL;
}
static bool netplay_send_raw_cmd(netplay_t *netplay, uint32_t cmd,
const void *data, size_t size)
{
cmd = (cmd << 16) | (size & 0xffff);
cmd = htonl(cmd);
if (!socket_send_all_blocking(netplay->fd, &cmd, sizeof(cmd), false))
return false;
if (!socket_send_all_blocking(netplay->fd, data, size, false))
return false;
return true;
}
/**
* netplay_command:
* @netplay : pointer to netplay object
@ -905,7 +825,6 @@ bool netplay_command(netplay_t* netplay, enum netplay_cmd cmd,
const char* msg = NULL;
bool allowed_spectate = !!(flags & CMD_OPT_ALLOWED_IN_SPECTATE_MODE);
bool host_only = !!(flags & CMD_OPT_HOST_ONLY);
bool require_sync = !!(flags & CMD_OPT_REQUIRE_SYNC);
retro_assert(netplay);
@ -921,21 +840,11 @@ bool netplay_command(netplay_t* netplay, enum netplay_cmd cmd,
goto error;
}
if(require_sync && check_netplay_synched(netplay))
{
msg = "Cannot %s while host and client are not in sync.";
if (!netplay_send_raw_cmd(netplay, cmd, data, sz))
goto error;
}
if(netplay_send_raw_cmd(netplay, cmd, data, sz)) {
if(netplay_get_response(netplay))
runloop_msg_queue_push(success_msg, 1, 180, false);
else
{
msg = "Failed to send command \"%s\"";
goto error;
}
}
runloop_msg_queue_push(success_msg, 1, 180, false);
return true;
error:
@ -953,14 +862,14 @@ error:
**/
static void netplay_flip_users(netplay_t *netplay)
{
uint32_t flip_frame = netplay->frame_count + 2 * UDP_FRAME_PACKETS;
uint32_t flip_frame = netplay->self_frame_count + 32; /* FIXME: This value is now arbitrary */
uint32_t flip_frame_net = htonl(flip_frame);
bool command = netplay_command(
netplay, NETPLAY_CMD_FLIP_PLAYERS,
&flip_frame_net, sizeof flip_frame_net,
CMD_OPT_HOST_ONLY | CMD_OPT_REQUIRE_SYNC,
"flip users", "Successfully flipped users.\n");
if(command)
{
netplay->flip ^= true;
@ -990,8 +899,6 @@ void netplay_free(netplay_t *netplay)
}
else
{
socket_close(netplay->udp_fd);
for (i = 0; i < netplay->buffer_size; i++)
free(netplay->buffer[i].state);

View File

@ -40,7 +40,7 @@ enum rarch_netplay_ctl_state
enum netplay_cmd
{
/* Miscellaneous commands */
/* Basic commands */
/* Acknowlegement response */
NETPLAY_CMD_ACK = 0x0000,
@ -48,23 +48,28 @@ enum netplay_cmd
/* Failed acknowlegement response */
NETPLAY_CMD_NAK = 0x0001,
/* Input data */
NETPLAY_CMD_INPUT = 0x0002,
/* Misc. commands */
/* Swap inputs between player 1 and player 2 */
NETPLAY_CMD_FLIP_PLAYERS = 0x0002,
NETPLAY_CMD_FLIP_PLAYERS = 0x0003,
/* Toggle spectate/join mode */
NETPLAY_CMD_SPECTATE = 0x0003,
NETPLAY_CMD_SPECTATE = 0x0004,
/* Gracefully disconnects from host */
NETPLAY_CMD_DISCONNECT = 0x0004,
NETPLAY_CMD_DISCONNECT = 0x0005,
/* Sends multiple config requests over,
* See enum netplay_cmd_cfg */
NETPLAY_CMD_CFG = 0x0005,
NETPLAY_CMD_CFG = 0x0006,
/* CMD_CFG streamlines sending multiple
configurations. This acknowledges
each one individually */
NETPLAY_CMD_CFG_ACK = 0x0006,
NETPLAY_CMD_CFG_ACK = 0x0007,
/* Loading and synchronization */

View File

@ -1,6 +1,7 @@
/* RetroArch - A frontend for libretro.
* Copyright (C) 2010-2014 - Hans-Kristian Arntzen
* Copyright (C) 2011-2016 - Daniel De Matteis
* Copyright (C) 2016 - Gregor Richards
*
* RetroArch is free software: you can redistribute it and/or modify it under the terms
* of the GNU General Public License as published by the Free Software Found-
@ -79,7 +80,7 @@ uint32_t *netplay_bsv_header_generate(size_t *size, uint32_t magic)
uint32_t *header, bsv_header[4] = {0};
core_serialize_size(&info);
serialize_size = info.size;
header_size = sizeof(bsv_header) + serialize_size;
*size = header_size;
@ -177,9 +178,9 @@ uint32_t netplay_impl_magic(void)
core_api_version(&api_info);
api = api_info.version;
runloop_ctl(RUNLOOP_CTL_SYSTEM_INFO_GET, &info);
if (info)
lib = info->info.library_name;
@ -199,6 +200,8 @@ uint32_t netplay_impl_magic(void)
for (i = 0; i < len; i++)
res ^= ver[i] << ((i & 0xf) + 16);
res ^= NETPLAY_PROTOCOL_VERSION << 24;
return res;
}
@ -215,7 +218,7 @@ bool netplay_send_info(netplay_t *netplay)
core_get_memory(&mem_info);
content_get_crc(&content_crc_ptr);
header[0] = htonl(*content_crc_ptr);
header[1] = htonl(netplay_impl_magic());
header[2] = htonl(mem_info.size);
@ -342,3 +345,23 @@ bool netplay_is_spectate(netplay_t* netplay)
return false;
return netplay->spectate.enabled;
}
bool netplay_delta_frame_ready(netplay_t *netplay, struct delta_frame *delta, uint32_t frame)
{
void *remember_state;
if (delta->used)
{
if (delta->frame == frame) return true;
if (netplay->other_frame_count <= delta->frame)
{
/* We haven't even replayed this frame yet, so we can't overwrite it! */
return false;
}
}
remember_state = delta->state;
memset(delta, 0, sizeof(struct delta_frame));
delta->used = true;
delta->frame = frame;
delta->state = remember_state;
return true;
}

View File

@ -1,6 +1,7 @@
/* RetroArch - A frontend for libretro.
* Copyright (C) 2010-2014 - Hans-Kristian Arntzen
* Copyright (C) 2011-2016 - Daniel De Matteis
* Copyright (C) 2016 - Gregor Richards
*
* RetroArch is free software: you can redistribute it and/or modify it under the terms
* of the GNU General Public License as published by the Free Software Found-
@ -15,9 +16,12 @@
*/
#include <compat/strl.h>
#include <stdio.h>
#include "netplay_private.h"
#include "retro_assert.h"
#include "../../autosave.h"
/**
@ -30,10 +34,19 @@ static void netplay_net_pre_frame(netplay_t *netplay)
{
retro_ctx_serialize_info_t serial_info;
serial_info.data = netplay->buffer[netplay->self_ptr].state;
serial_info.size = netplay->state_size;
if (netplay_delta_frame_ready(netplay, &netplay->buffer[netplay->self_ptr], netplay->self_frame_count))
{
serial_info.data_const = NULL;
serial_info.data = netplay->buffer[netplay->self_ptr].state;
serial_info.size = netplay->state_size;
core_serialize(&serial_info);
if (!core_serialize(&serial_info))
{
/* If the core can't serialize properly, we must stall for the
* remote input on EVERY frame, because we can't recover */
netplay->stall_frames = 0;
}
}
netplay->can_poll = true;
@ -49,15 +62,17 @@ static void netplay_net_pre_frame(netplay_t *netplay)
**/
static void netplay_net_post_frame(netplay_t *netplay)
{
netplay->frame_count++;
netplay->self_ptr = NEXT_PTR(netplay->self_ptr);
netplay->self_frame_count++;
/* Nothing to do... */
if (netplay->other_frame_count == netplay->read_frame_count)
/* Only relevant if we're connected */
if (!netplay->has_connection)
return;
/* Skip ahead if we predicted correctly.
* Skip until our simulation failed. */
while (netplay->other_frame_count < netplay->read_frame_count)
while (netplay->other_frame_count < netplay->read_frame_count &&
netplay->other_frame_count < netplay->self_frame_count)
{
const struct delta_frame *ptr = &netplay->buffer[netplay->other_ptr];
@ -69,24 +84,29 @@ static void netplay_net_post_frame(netplay_t *netplay)
netplay->other_frame_count++;
}
if (netplay->other_frame_count < netplay->read_frame_count)
/* Now replay the real input if we've gotten ahead of it */
if (netplay->other_frame_count < netplay->read_frame_count &&
netplay->other_frame_count < netplay->self_frame_count)
{
retro_ctx_serialize_info_t serial_info;
bool first = true;
/* Replay frames. */
netplay->is_replay = true;
netplay->tmp_ptr = netplay->other_ptr;
netplay->tmp_frame_count = netplay->other_frame_count;
netplay->replay_ptr = netplay->other_ptr;
netplay->replay_frame_count = netplay->other_frame_count;
serial_info.data_const = netplay->buffer[netplay->other_ptr].state;
serial_info.size = netplay->state_size;
core_unserialize(&serial_info);
while (first || (netplay->tmp_ptr != netplay->self_ptr))
if (netplay->replay_frame_count < netplay->self_frame_count)
{
serial_info.data = netplay->buffer[netplay->tmp_ptr].state;
serial_info.data = NULL;
serial_info.data_const = netplay->buffer[netplay->replay_ptr].state;
serial_info.size = netplay->state_size;
core_unserialize(&serial_info);
}
while (netplay->replay_frame_count < netplay->self_frame_count)
{
serial_info.data = netplay->buffer[netplay->replay_ptr].state;
serial_info.size = netplay->state_size;
serial_info.data_const = NULL;
@ -99,15 +119,37 @@ static void netplay_net_post_frame(netplay_t *netplay)
#if defined(HAVE_THREADS)
autosave_unlock();
#endif
netplay->tmp_ptr = NEXT_PTR(netplay->tmp_ptr);
netplay->tmp_frame_count++;
first = false;
netplay->replay_ptr = NEXT_PTR(netplay->replay_ptr);
netplay->replay_frame_count++;
}
netplay->other_ptr = netplay->read_ptr;
netplay->other_frame_count = netplay->read_frame_count;
if (netplay->read_frame_count < netplay->self_frame_count)
{
netplay->other_ptr = netplay->read_ptr;
netplay->other_frame_count = netplay->read_frame_count;
}
else
{
netplay->other_ptr = netplay->self_ptr;
netplay->other_frame_count = netplay->self_frame_count;
}
netplay->is_replay = false;
}
/* If we're supposed to stall, rewind */
if (netplay->stall)
{
retro_ctx_serialize_info_t serial_info;
netplay->self_ptr = PREV_PTR(netplay->self_ptr);
netplay->self_frame_count--;
serial_info.data = NULL;
serial_info.data_const = netplay->buffer[netplay->self_ptr].state;
serial_info.size = netplay->state_size;
core_unserialize(&serial_info);
}
}
static bool netplay_net_init_buffers(netplay_t *netplay)
{
@ -119,7 +161,7 @@ static bool netplay_net_init_buffers(netplay_t *netplay)
netplay->buffer = (struct delta_frame*)calloc(netplay->buffer_size,
sizeof(*netplay->buffer));
if (!netplay->buffer)
return false;
@ -133,8 +175,6 @@ static bool netplay_net_init_buffers(netplay_t *netplay)
if (!netplay->buffer[i].state)
return false;
netplay->buffer[i].is_simulated = true;
}
return true;
@ -153,7 +193,11 @@ static bool netplay_net_info_cb(netplay_t* netplay, unsigned frames)
return false;
}
netplay->buffer_size = frames + 1;
/* * 2 + 1 because:
* Self sits in the middle,
* Other is allowed to drift as much as 'frames' frames behind
* Read is allowed to drift as much as 'frames' frames ahead */
netplay->buffer_size = frames * 2 + 1;
if (!netplay_net_init_buffers(netplay))
return false;

View File

@ -20,6 +20,7 @@
#include "netplay.h"
#include <net/net_compat.h>
#include <features/features_cpu.h>
#include <retro_endianness.h>
#include "../../core.h"
@ -30,23 +31,33 @@
#define HAVE_IPV6
#endif
#define UDP_FRAME_PACKETS 16
#define UDP_WORDS_PER_FRAME 4 /* Allows us to send 128 bits worth of state per frame. */
#define MAX_SPECTATORS 16
#define RARCH_DEFAULT_PORT 55435
#define WORDS_PER_FRAME 4 /* Allows us to send 128 bits worth of state per frame. */
#define MAX_SPECTATORS 16
#define RARCH_DEFAULT_PORT 55435
#define NETPLAY_PROTOCOL_VERSION 1
#define PREV_PTR(x) ((x) == 0 ? netplay->buffer_size - 1 : (x) - 1)
#define NEXT_PTR(x) ((x + 1) % netplay->buffer_size)
struct delta_frame
{
bool used; /* a bit derpy, but this is how we know if the delta's been used at all */
uint32_t frame;
void *state;
uint32_t real_input_state[UDP_WORDS_PER_FRAME - 1];
uint32_t simulated_input_state[UDP_WORDS_PER_FRAME - 1];
uint32_t self_state[UDP_WORDS_PER_FRAME - 1];
uint32_t real_input_state[WORDS_PER_FRAME - 1];
uint32_t simulated_input_state[WORDS_PER_FRAME - 1];
uint32_t self_state[WORDS_PER_FRAME - 1];
bool is_simulated;
/* Have we read local input? */
bool have_local;
/* Have we read the real remote input? */
bool have_remote;
/* Is the current state as of self_frame_count using the real remote data? */
bool used_real;
};
@ -56,6 +67,12 @@ struct netplay_callbacks {
bool (*info_cb) (netplay_t *netplay, unsigned frames);
};
enum rarch_netplay_stall_reasons
{
RARCH_NETPLAY_STALL_NONE = 0,
RARCH_NETPLAY_STALL_RUNNING_FAST
};
struct netplay
{
char nick[32];
@ -65,8 +82,6 @@ struct netplay
struct retro_callbacks cbs;
/* TCP connection for state sending, etc. Also used for commands */
int fd;
/* UDP connection for game state updates. */
int udp_fd;
/* Which port is governed by netplay (other user)? */
unsigned port;
bool has_connection;
@ -81,8 +96,8 @@ struct netplay
/* Pointer to where we are reading.
* Generally, other_ptr <= read_ptr <= self_ptr. */
size_t read_ptr;
/* A temporary pointer used on replay. */
size_t tmp_ptr;
/* A pointer used temporarily for replay. */
size_t replay_ptr;
size_t state_size;
@ -90,14 +105,15 @@ struct netplay
bool is_replay;
/* We don't want to poll several times on a frame. */
bool can_poll;
/* If we end up having to drop remote frame data because it's ahead of us, fast-forward is URGENT */
bool must_fast_forward;
/* To compat UDP packet loss we also send
* old data along with the packets. */
uint32_t packet_buffer[UDP_FRAME_PACKETS * UDP_WORDS_PER_FRAME];
uint32_t frame_count;
/* A buffer for outgoing input packets. */
uint32_t packet_buffer[2 + WORDS_PER_FRAME];
uint32_t self_frame_count;
uint32_t read_frame_count;
uint32_t other_frame_count;
uint32_t tmp_frame_count;
uint32_t replay_frame_count;
struct addrinfo *addr;
struct sockaddr_storage their_addr;
bool has_client_addr;
@ -126,6 +142,11 @@ struct netplay
bool pause;
uint32_t pause_frame;
/* And stalling */
uint32_t stall_frames;
int stall;
retro_time_t stall_time;
struct netplay_callbacks* net_cbs;
};
@ -158,4 +179,6 @@ bool netplay_is_server(netplay_t* netplay);
bool netplay_is_spectate(netplay_t* netplay);
bool netplay_delta_frame_ready(netplay_t *netplay, struct delta_frame *delta, uint32_t frame);
#endif