Reimplemented Netplay spectate mode

Spectate mode is now far more similar to net (normal) mode, and, more
importantly, it works. In addition, spectate mode will not fast-forward
to catch up with the server if it lags too far behind.
This commit is contained in:
Gregor Richards 2016-09-15 23:04:48 -04:00
parent a42be48638
commit ad336df709
6 changed files with 292 additions and 236 deletions

View File

@ -109,20 +109,10 @@ static bool core_init_libretro_cbs(void *data)
/* Force normal poll type for netplay. */
core_poll_type = POLL_TYPE_NORMAL;
if (global->netplay.is_spectate)
{
core.retro_set_input_state(
(global->netplay.is_client ?
input_state_spectate_client : input_state_spectate)
);
}
else
{
core.retro_set_video_refresh(video_frame_net);
core.retro_set_audio_sample(audio_sample_net);
core.retro_set_audio_sample_batch(audio_sample_batch_net);
core.retro_set_input_state(input_state_net);
}
core.retro_set_video_refresh(video_frame_net);
core.retro_set_audio_sample(audio_sample_net);
core.retro_set_audio_sample_batch(audio_sample_batch_net);
core.retro_set_input_state(input_state_net);
#endif
return true;

View File

@ -157,11 +157,14 @@ static bool get_self_input_state(netplay_t *netplay)
netplay->packet_buffer[4] = htonl(state[1]);
netplay->packet_buffer[5] = htonl(state[2]);
if (!socket_send_all_blocking(netplay->fd, netplay->packet_buffer, sizeof(netplay->packet_buffer), false))
if (!netplay->spectate.enabled) /* Spectate sends in its own way */
{
warn_hangup();
netplay->has_connection = false;
return false;
if (!socket_send_all_blocking(netplay->fd, netplay->packet_buffer, sizeof(netplay->packet_buffer), false))
{
warn_hangup();
netplay->has_connection = false;
return false;
}
}
memcpy(ptr->self_state, state, sizeof(state));
@ -550,6 +553,10 @@ static bool netplay_poll(netplay_t *netplay)
get_self_input_state(netplay);
/* No network side in spectate mode */
if (netplay_is_server(netplay) && netplay->spectate.enabled)
return true;
/* Read Netplay input, block if we're configured to stall for input every
* frame */
res = poll_input(netplay, (netplay->stall_frames == 0) && (netplay->read_frame_count <= netplay->self_frame_count));
@ -899,6 +906,41 @@ static bool init_socket(netplay_t *netplay, const char *server, uint16_t port)
return true;
}
static bool netplay_init_buffers(netplay_t *netplay, unsigned frames)
{
unsigned i;
retro_ctx_size_info_t info;
if (!netplay)
return false;
/* * 2 + 1 because:
* Self sits in the middle,
* Other is allowed to drift as much as 'frames' frames behind
* Read is allowed to drift as much as 'frames' frames ahead */
netplay->buffer_size = frames * 2 + 1;
netplay->buffer = (struct delta_frame*)calloc(netplay->buffer_size,
sizeof(*netplay->buffer));
if (!netplay->buffer)
return false;
core_serialize_size(&info);
netplay->state_size = info.size;
for (i = 0; i < netplay->buffer_size; i++)
{
netplay->buffer[i].state = calloc(netplay->state_size, 1);
if (!netplay->buffer[i].state)
return false;
}
return true;
}
/**
* netplay_new:
* @server : IP address of server.
@ -931,6 +973,12 @@ netplay_t *netplay_new(const char *server, uint16_t port,
netplay->stall_frames = frames;
netplay->check_frames = check_frames;
if (!netplay_init_buffers(netplay, frames))
{
free(netplay);
return NULL;
}
if(spectate)
netplay->net_cbs = netplay_get_cbs_spectate();
else
@ -1064,56 +1112,6 @@ void netplay_free(netplay_t *netplay)
free(netplay);
}
static void netplay_set_spectate_input(netplay_t *netplay, int16_t input)
{
if (netplay->spectate.input_ptr >= netplay->spectate.input_sz)
{
netplay->spectate.input_sz++;
netplay->spectate.input_sz *= 2;
netplay->spectate.input = (uint16_t*)realloc(netplay->spectate.input,
netplay->spectate.input_sz * sizeof(uint16_t));
}
netplay->spectate.input[netplay->spectate.input_ptr++] = swap_if_big16(input);
}
int16_t input_state_spectate(unsigned port, unsigned device,
unsigned idx, unsigned id)
{
netplay_t *netplay = (netplay_t*)netplay_data;
int16_t res = netplay->cbs.state_cb(port, device, idx, id);
netplay_set_spectate_input(netplay, res);
return res;
}
static int16_t netplay_get_spectate_input(netplay_t *netplay, bool port,
unsigned device, unsigned idx, unsigned id)
{
int16_t inp;
retro_ctx_input_state_info_t input_info;
if (socket_receive_all_blocking(netplay->fd, (char*)&inp, sizeof(inp)))
return swap_if_big16(inp);
RARCH_ERR("Connection with host was cut.\n");
runloop_msg_queue_push("Connection with host was cut.", 1, 180, true);
input_info.cb = netplay->cbs.state_cb;
core_set_input_state(&input_info);
return netplay->cbs.state_cb(port, device, idx, id);
}
int16_t input_state_spectate_client(unsigned port, unsigned device,
unsigned idx, unsigned id)
{
return netplay_get_spectate_input((netplay_t*)netplay_data, port,
device, idx, id);
}
/**
* netplay_pre_frame:
* @netplay : pointer to netplay object
@ -1132,7 +1130,8 @@ bool netplay_pre_frame(netplay_t *netplay)
/* FIXME: This is an ugly way to learn we're not paused anymore */
netplay_frontend_paused(netplay, false);
}
netplay->net_cbs->pre_frame(netplay);
if (!netplay->net_cbs->pre_frame(netplay))
return false;
return (!netplay->has_connection || (!netplay->stall && !netplay->remote_paused));
}
@ -1164,7 +1163,7 @@ void netplay_frontend_paused(netplay_t *netplay, bool paused)
return;
netplay->local_paused = paused;
if (netplay->has_connection)
if (netplay->has_connection && !netplay->spectate.enabled)
netplay_send_raw_cmd(netplay, paused ? NETPLAY_CMD_PAUSE : NETPLAY_CMD_RESUME, NULL, 0);
}

View File

@ -125,12 +125,6 @@ void audio_sample_net(int16_t left, int16_t right);
size_t audio_sample_batch_net(const int16_t *data, size_t frames);
int16_t input_state_spectate(unsigned port, unsigned device,
unsigned idx, unsigned id);
int16_t input_state_spectate_client(unsigned port, unsigned device,
unsigned idx, unsigned id);
/**
* netplay_new:
* @server : IP address of server.

View File

@ -52,7 +52,7 @@ static void netplay_handle_frame_hash(netplay_t *netplay, struct delta_frame *de
*
* Pre-frame for Netplay (normal version).
**/
static void netplay_net_pre_frame(netplay_t *netplay)
static bool netplay_net_pre_frame(netplay_t *netplay)
{
retro_ctx_serialize_info_t serial_info;
@ -83,6 +83,8 @@ static void netplay_net_pre_frame(netplay_t *netplay)
netplay->can_poll = true;
input_poll_net();
return true;
}
/**
@ -190,34 +192,6 @@ static void netplay_net_post_frame(netplay_t *netplay)
core_unserialize(&serial_info);
}
}
static bool netplay_net_init_buffers(netplay_t *netplay)
{
unsigned i;
retro_ctx_size_info_t info;
if (!netplay)
return false;
netplay->buffer = (struct delta_frame*)calloc(netplay->buffer_size,
sizeof(*netplay->buffer));
if (!netplay->buffer)
return false;
core_serialize_size(&info);
netplay->state_size = info.size;
for (i = 0; i < netplay->buffer_size; i++)
{
netplay->buffer[i].state = calloc(netplay->state_size, 1);
if (!netplay->buffer[i].state)
return false;
}
return true;
}
static bool netplay_net_info_cb(netplay_t* netplay, unsigned frames)
{
@ -232,15 +206,6 @@ static bool netplay_net_info_cb(netplay_t* netplay, unsigned frames)
return false;
}
/* * 2 + 1 because:
* Self sits in the middle,
* Other is allowed to drift as much as 'frames' frames behind
* Read is allowed to drift as much as 'frames' frames ahead */
netplay->buffer_size = frames * 2 + 1;
if (!netplay_net_init_buffers(netplay))
return false;
netplay->has_connection = true;
return true;

View File

@ -66,7 +66,7 @@ struct delta_frame
};
struct netplay_callbacks {
void (*pre_frame) (netplay_t *netplay);
bool (*pre_frame) (netplay_t *netplay);
void (*post_frame)(netplay_t *netplay);
bool (*info_cb) (netplay_t *netplay, unsigned frames);
};
@ -138,6 +138,7 @@ struct netplay
struct {
bool enabled;
int fds[MAX_SPECTATORS];
uint32_t frames[MAX_SPECTATORS];
uint16_t *input;
size_t input_ptr;
size_t input_sz;

View File

@ -1,6 +1,7 @@
/* RetroArch - A frontend for libretro.
* Copyright (C) 2010-2014 - Hans-Kristian Arntzen
* Copyright (C) 2011-2016 - Daniel De Matteis
* Copyright (C) 2016 - Gregor Richards
*
* RetroArch is free software: you can redistribute it and/or modify it under the terms
* of the GNU General Public License as published by the Free Software Found-
@ -14,10 +15,8 @@
* If not, see <http://www.gnu.org/licenses/>.
*/
#include <compat/strl.h>
#include <stdio.h>
#include <assert.h>
#include <stdlib.h>
#include <string.h>
#include <net/net_compat.h>
#include <net/net_socket.h>
@ -25,157 +24,266 @@
#include "netplay_private.h"
#include "../../runloop.h"
#include "retro_assert.h"
#include "../../autosave.h"
/**
* netplay_pre_frame_spectate:
* netplay_spectate_pre_frame:
* @netplay : pointer to netplay object
*
* Pre-frame for Netplay (spectate mode version).
* Pre-frame for Netplay (spectator version).
**/
static void netplay_spectate_pre_frame(netplay_t *netplay)
static bool netplay_spectate_pre_frame(netplay_t *netplay)
{
unsigned i;
uint32_t *header;
int new_fd, idx, bufsize;
size_t header_size;
struct sockaddr_storage their_addr;
socklen_t addr_size;
fd_set fds;
struct timeval tmp_tv = {0};
if (!netplay_is_server(netplay))
return;
FD_ZERO(&fds);
FD_SET(netplay->fd, &fds);
if (socket_select(netplay->fd + 1, &fds, NULL, NULL, &tmp_tv) <= 0)
return;
if (!FD_ISSET(netplay->fd, &fds))
return;
addr_size = sizeof(their_addr);
new_fd = accept(netplay->fd, (struct sockaddr*)&their_addr, &addr_size);
if (new_fd < 0)
if (netplay_is_server(netplay))
{
RARCH_ERR("%s\n", msg_hash_to_str(MSG_FAILED_TO_ACCEPT_INCOMING_SPECTATOR));
return;
}
fd_set fds;
struct timeval tmp_tv = {0};
int new_fd, idx, i;
struct sockaddr_storage their_addr;
socklen_t addr_size;
retro_ctx_serialize_info_t serial_info;
uint32_t header[3];
idx = -1;
for (i = 0; i < MAX_SPECTATORS; i++)
{
if (netplay->spectate.fds[i] == -1)
netplay->can_poll = true;
input_poll_net();
/* Send our input to any connected spectators */
for (i = 0; i < MAX_SPECTATORS; i++)
{
idx = i;
break;
if (netplay->spectate.fds[i] >= 0)
{
netplay->packet_buffer[2] = htonl(netplay->self_frame_count - netplay->spectate.frames[i]);
if (!socket_send_all_blocking(netplay->spectate.fds[i], netplay->packet_buffer, sizeof(netplay->packet_buffer), false))
{
socket_close(netplay->spectate.fds[i]);
netplay->spectate.fds[i] = -1;
}
}
}
}
/* No vacant client streams :( */
if (idx == -1)
/* Check for connections */
FD_ZERO(&fds);
FD_SET(netplay->fd, &fds);
if (socket_select(netplay->fd + 1, &fds, NULL, NULL, &tmp_tv) <= 0)
return true;
if (!FD_ISSET(netplay->fd, &fds))
return true;
addr_size = sizeof(their_addr);
new_fd = accept(netplay->fd, (struct sockaddr*)&their_addr, &addr_size);
if (new_fd < 0)
{
RARCH_ERR("%s\n", msg_hash_to_str(MSG_FAILED_TO_ACCEPT_INCOMING_SPECTATOR));
return true;
}
idx = -1;
for (i = 0; i < MAX_SPECTATORS; i++)
{
if (netplay->spectate.fds[i] == -1)
{
idx = i;
break;
}
}
/* No vacant client streams :( */
if (idx == -1)
{
socket_close(new_fd);
return true;
}
if (!netplay_get_nickname(netplay, new_fd))
{
RARCH_ERR("%s\n", msg_hash_to_str(MSG_FAILED_TO_GET_NICKNAME_FROM_CLIENT));
socket_close(new_fd);
return true;
}
if (!netplay_send_nickname(netplay, new_fd))
{
RARCH_ERR("%s\n", msg_hash_to_str(MSG_FAILED_TO_SEND_NICKNAME_TO_CLIENT));
socket_close(new_fd);
return true;
}
/* Start them at the current frame */
netplay->spectate.frames[idx] = netplay->self_frame_count;
serial_info.data_const = NULL;
serial_info.data = netplay->buffer[netplay->self_ptr].state;
serial_info.size = netplay->state_size;
if (core_serialize(&serial_info))
{
/* Send them the savestate */
header[0] = htonl(NETPLAY_CMD_LOAD_SAVESTATE);
header[1] = htonl(serial_info.size + sizeof(uint32_t));
header[2] = htonl(0);
if (!socket_send_all_blocking(new_fd, header, sizeof(header), false))
{
socket_close(new_fd);
return true;
}
if (!socket_send_all_blocking(new_fd, serial_info.data, serial_info.size, false))
{
socket_close(new_fd);
return true;
}
}
/* And send them this frame's input */
netplay->packet_buffer[2] = htonl(0);
if (!socket_send_all_blocking(new_fd, netplay->packet_buffer, sizeof(netplay->packet_buffer), false))
{
socket_close(new_fd);
return true;
}
netplay->spectate.fds[idx] = new_fd;
}
else
{
socket_close(new_fd);
return;
if (netplay_delta_frame_ready(netplay, &netplay->buffer[netplay->self_ptr], netplay->self_frame_count))
{
/* Mark our own data as already read, so we ignore local input */
netplay->buffer[netplay->self_ptr].have_local = true;
}
netplay->can_poll = true;
input_poll_net();
/* Only proceed if we have data */
if (netplay->read_frame_count <= netplay->self_frame_count)
return false;
}
if (!netplay_get_nickname(netplay, new_fd))
{
RARCH_ERR("%s\n", msg_hash_to_str(MSG_FAILED_TO_GET_NICKNAME_FROM_CLIENT));
socket_close(new_fd);
return;
}
if (!netplay_send_nickname(netplay, new_fd))
{
RARCH_ERR("%s\n", msg_hash_to_str(MSG_FAILED_TO_SEND_NICKNAME_TO_CLIENT));
socket_close(new_fd);
return;
}
header = netplay_bsv_header_generate(&header_size,
netplay_impl_magic());
if (!header)
{
RARCH_ERR("%s\n", msg_hash_to_str(MSG_FAILED_TO_GENERATE_BSV_HEADER));
socket_close(new_fd);
return;
}
bufsize = header_size;
setsockopt(new_fd, SOL_SOCKET, SO_SNDBUF, (const char*)&bufsize,
sizeof(int));
if (!socket_send_all_blocking(new_fd, header, header_size, false))
{
RARCH_ERR("%s\n", msg_hash_to_str(MSG_FAILED_TO_SEND_HEADER_TO_CLIENT));
socket_close(new_fd);
free(header);
return;
}
free(header);
netplay->spectate.fds[idx] = new_fd;
#ifndef HAVE_SOCKET_LEGACY
netplay_log_connection(&their_addr, idx, netplay->other_nick);
#endif
return true;
}
/**
* netplay_post_frame_spectate:
* netplay_spectate_post_frame:
* @netplay : pointer to netplay object
*
* Post-frame for Netplay (spectate mode version).
* We check if we have new input and replay from recorded input.
* Post-frame for Netplay (spectator version).
* Not much here, just fast forward if we're behind the server.
**/
static void netplay_spectate_post_frame(netplay_t *netplay)
{
unsigned i;
netplay->self_ptr = NEXT_PTR(netplay->self_ptr);
netplay->self_frame_count++;
if (!netplay_is_server(netplay))
return;
for (i = 0; i < MAX_SPECTATORS; i++)
if (netplay_is_server(netplay))
{
char msg[128];
/* Not expecting any client data */
netplay->read_ptr = netplay->other_ptr = netplay->self_ptr;
netplay->read_frame_count = netplay->other_frame_count = netplay->self_frame_count;
if (netplay->spectate.fds[i] == -1)
continue;
if (socket_send_all_blocking(netplay->spectate.fds[i],
netplay->spectate.input,
netplay->spectate.input_ptr * sizeof(int16_t),
false))
continue;
RARCH_LOG("Client (#%u) disconnected ...\n", i);
snprintf(msg, sizeof(msg), "Client (#%u) disconnected.", i);
runloop_msg_queue_push(msg, 1, 180, false);
socket_close(netplay->spectate.fds[i]);
netplay->spectate.fds[i] = -1;
break;
}
else
{
/* If we must rewind, it's because we got a save state */
if (netplay->force_rewind)
{
retro_ctx_serialize_info_t serial_info;
netplay->spectate.input_ptr = 0;
/* Replay frames. */
netplay->is_replay = true;
netplay->replay_ptr = netplay->other_ptr;
netplay->replay_frame_count = netplay->other_frame_count;
serial_info.data = NULL;
serial_info.data_const = netplay->buffer[netplay->replay_ptr].state;
serial_info.size = netplay->state_size;
core_unserialize(&serial_info);
while (netplay->replay_frame_count < netplay->self_frame_count)
{
#if defined(HAVE_THREADS)
autosave_lock();
#endif
core_run();
#if defined(HAVE_THREADS)
autosave_unlock();
#endif
netplay->replay_ptr = NEXT_PTR(netplay->replay_ptr);
netplay->replay_frame_count++;
}
netplay->is_replay = false;
netplay->force_rewind = false;
}
/* We're in sync by definition */
if (netplay->read_frame_count < netplay->self_frame_count)
{
netplay->other_ptr = netplay->read_ptr;
netplay->other_frame_count = netplay->read_frame_count;
}
else
{
netplay->other_ptr = netplay->self_ptr;
netplay->other_frame_count = netplay->self_frame_count;
}
/* If the server gets significantly ahead, skip to catch up */
if (netplay->self_frame_count + netplay->stall_frames <= netplay->read_frame_count)
{
/* "Replay" into the future */
netplay->is_replay = true;
netplay->replay_ptr = netplay->self_ptr;
netplay->replay_frame_count = netplay->self_frame_count;
while (netplay->replay_frame_count < netplay->read_frame_count - 1)
{
#if defined(HAVE_THREADS)
autosave_lock();
#endif
core_run();
#if defined(HAVE_THREADS)
autosave_unlock();
#endif
netplay->replay_ptr = NEXT_PTR(netplay->replay_ptr);
netplay->replay_frame_count++;
netplay->self_ptr = netplay->replay_ptr;
netplay->self_frame_count = netplay->replay_frame_count;
}
netplay->is_replay = false;
}
}
}
static bool netplay_spectate_info_cb(netplay_t *netplay, unsigned frames)
static bool netplay_spectate_info_cb(netplay_t* netplay, unsigned frames)
{
unsigned i;
if(netplay_is_server(netplay))
if (netplay_is_server(netplay))
{
if(!netplay_get_info(netplay))
int i;
for (i = 0; i < MAX_SPECTATORS; i++)
{
netplay->spectate.fds[i] = -1;
}
}
else
{
if (!netplay_send_nickname(netplay, netplay->fd))
return false;
if (!netplay_get_nickname(netplay, netplay->fd))
return false;
}
for (i = 0; i < MAX_SPECTATORS; i++)
netplay->spectate.fds[i] = -1;
netplay->has_connection = true;
return true;
}
@ -186,6 +294,5 @@ struct netplay_callbacks* netplay_get_cbs_spectate(void)
&netplay_spectate_post_frame,
&netplay_spectate_info_cb
};
return &cbs;
}