mirror of
https://github.com/libretro/RetroArch
synced 2025-03-03 04:14:00 +00:00
Use nonblocking sockets for Netplay to avoid some stalls.
This commit is contained in:
parent
89820220b7
commit
900e5a79ec
@ -1126,6 +1126,7 @@ ifeq ($(HAVE_NETWORKING), 1)
|
||||
network/netplay/netplay_spectate.o \
|
||||
network/netplay/netplay_common.o \
|
||||
network/netplay/netplay_discovery.o \
|
||||
network/netplay/netplay_buf.o \
|
||||
network/netplay/netplay.o
|
||||
|
||||
# Retro Achievements (also depends on threads)
|
||||
|
@ -69,6 +69,9 @@ int socket_select(int nfds, fd_set *readfs, fd_set *writefds,
|
||||
|
||||
int socket_send_all_blocking(int fd, const void *data_, size_t size, bool no_signal);
|
||||
|
||||
ssize_t socket_send_all_nonblocking(int fd, const void *data_, size_t size,
|
||||
bool no_signal);
|
||||
|
||||
int socket_receive_all_blocking(int fd, void *data_, size_t size);
|
||||
|
||||
ssize_t socket_receive_all_nonblocking(int fd, bool *error,
|
||||
|
@ -74,12 +74,9 @@ ssize_t socket_receive_all_nonblocking(int fd, bool *error,
|
||||
const uint8_t *data = (const uint8_t*)data_;
|
||||
ssize_t ret = recv(fd, (char*)data, size, 0);
|
||||
|
||||
if (ret > 0)
|
||||
if (ret >= 0)
|
||||
return ret;
|
||||
|
||||
if (ret == 0)
|
||||
return -1;
|
||||
|
||||
if (isagain(ret))
|
||||
return 0;
|
||||
|
||||
@ -179,6 +176,36 @@ int socket_send_all_blocking(int fd, const void *data_, size_t size,
|
||||
return true;
|
||||
}
|
||||
|
||||
ssize_t socket_send_all_nonblocking(int fd, const void *data_, size_t size,
|
||||
bool no_signal)
|
||||
{
|
||||
const uint8_t *data = (const uint8_t*)data_;
|
||||
ssize_t sent = 0;
|
||||
|
||||
while (size)
|
||||
{
|
||||
ssize_t ret = send(fd, (const char*)data, size,
|
||||
no_signal ? MSG_NOSIGNAL : 0);
|
||||
if (ret < 0)
|
||||
{
|
||||
if (isagain(ret))
|
||||
break;
|
||||
|
||||
return -1;
|
||||
}
|
||||
else if (ret == 0)
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
||||
data += ret;
|
||||
size -= ret;
|
||||
sent += ret;
|
||||
}
|
||||
|
||||
return sent;
|
||||
}
|
||||
|
||||
bool socket_bind(int fd, void *data)
|
||||
{
|
||||
int yes = 1;
|
||||
|
@ -263,6 +263,9 @@ static bool init_socket(netplay_t *netplay, void *direct_host, const char *serve
|
||||
if (!init_tcp_socket(netplay, direct_host, server, port, netplay->spectate.enabled))
|
||||
return false;
|
||||
|
||||
netplay_clear_socket_buffer(&netplay->send_packet_buffer);
|
||||
netplay_clear_socket_buffer(&netplay->recv_packet_buffer);
|
||||
|
||||
if (netplay->is_server && netplay->nat_traversal)
|
||||
init_nat_traversal(netplay);
|
||||
|
||||
@ -399,17 +402,17 @@ static bool get_self_input_state(netplay_t *netplay)
|
||||
* frame
|
||||
* }
|
||||
*/
|
||||
netplay->packet_buffer[0] = htonl(NETPLAY_CMD_INPUT);
|
||||
netplay->packet_buffer[1] = htonl(WORDS_PER_FRAME * sizeof(uint32_t));
|
||||
netplay->packet_buffer[2] = htonl(netplay->self_frame_count);
|
||||
netplay->packet_buffer[3] = htonl(state[0]);
|
||||
netplay->packet_buffer[4] = htonl(state[1]);
|
||||
netplay->packet_buffer[5] = htonl(state[2]);
|
||||
netplay->input_packet_buffer[0] = htonl(NETPLAY_CMD_INPUT);
|
||||
netplay->input_packet_buffer[1] = htonl(WORDS_PER_FRAME * sizeof(uint32_t));
|
||||
netplay->input_packet_buffer[2] = htonl(netplay->self_frame_count);
|
||||
netplay->input_packet_buffer[3] = htonl(state[0]);
|
||||
netplay->input_packet_buffer[4] = htonl(state[1]);
|
||||
netplay->input_packet_buffer[5] = htonl(state[2]);
|
||||
|
||||
if (!netplay->spectate.enabled) /* Spectate sends in its own way */
|
||||
{
|
||||
if (!socket_send_all_blocking(netplay->fd,
|
||||
netplay->packet_buffer, sizeof(netplay->packet_buffer), false))
|
||||
if (!netplay_send(&netplay->send_packet_buffer, netplay->fd, cmdbuf,
|
||||
sizeof(cmdbuf)))
|
||||
{
|
||||
hangup(netplay);
|
||||
return false;
|
||||
@ -429,11 +432,12 @@ static bool netplay_send_raw_cmd(netplay_t *netplay, uint32_t cmd,
|
||||
cmdbuf[0] = htonl(cmd);
|
||||
cmdbuf[1] = htonl(size);
|
||||
|
||||
if (!socket_send_all_blocking(netplay->fd, cmdbuf, sizeof(cmdbuf), false))
|
||||
if (!netplay_send(&netplay->send_packet_buffer, netplay->fd, cmdbuf,
|
||||
sizeof(cmdbuf)))
|
||||
return false;
|
||||
|
||||
if (size > 0)
|
||||
if (!socket_send_all_blocking(netplay->fd, data, size, false))
|
||||
if (!netplay_send(&netplay->send_packet_buffer, netplay->fd, data, size))
|
||||
return false;
|
||||
|
||||
return true;
|
||||
@ -460,31 +464,46 @@ bool netplay_cmd_request_savestate(netplay_t *netplay)
|
||||
return netplay_send_raw_cmd(netplay, NETPLAY_CMD_REQUEST_SAVESTATE, NULL, 0);
|
||||
}
|
||||
|
||||
static ssize_t netplay_recva(netplay_t *netplay, void *buf, size_t len)
|
||||
{
|
||||
return netplay_recv(&netplay->read_packet_buffer, netplay->fd, buf, len, false);
|
||||
}
|
||||
|
||||
static bool netplay_get_cmd(netplay_t *netplay)
|
||||
{
|
||||
uint32_t cmd;
|
||||
uint32_t flip_frame;
|
||||
uint32_t cmd_size;
|
||||
ssize_t recvd;
|
||||
|
||||
/* FIXME: This depends on delta_frame_ready */
|
||||
|
||||
netplay->timeout_cnt = 0;
|
||||
#define RECV(buf, sz) \
|
||||
recvd = netplay_recva(netplay, (buf), (sz)); \
|
||||
if (recvd >= 0 && recvd < (sz)) goto shrt; \
|
||||
else if (recvd < 0)
|
||||
|
||||
if (!socket_receive_all_blocking(netplay->fd, &cmd, sizeof(cmd)))
|
||||
/* Keep receiving commands until there's no input left */
|
||||
while (true)
|
||||
{
|
||||
|
||||
RECV(&cmd, sizeof(cmd))
|
||||
return false;
|
||||
|
||||
cmd = ntohl(cmd);
|
||||
|
||||
if (!socket_receive_all_blocking(netplay->fd, &cmd_size, sizeof(cmd)))
|
||||
RECV(&cmd_size, sizeof(cmd_size));
|
||||
return false;
|
||||
|
||||
cmd_size = ntohl(cmd_size);
|
||||
|
||||
netplay->timeout_cnt = 0;
|
||||
|
||||
switch (cmd)
|
||||
{
|
||||
case NETPLAY_CMD_ACK:
|
||||
/* Why are we even bothering? */
|
||||
return true;
|
||||
break;
|
||||
|
||||
case NETPLAY_CMD_NAK:
|
||||
/* Disconnect now! */
|
||||
@ -501,7 +520,7 @@ static bool netplay_get_cmd(netplay_t *netplay)
|
||||
return netplay_cmd_nak(netplay);
|
||||
}
|
||||
|
||||
if (!socket_receive_all_blocking(netplay->fd, buffer, sizeof(buffer)))
|
||||
RECV(buffer, sizeof(buffer))
|
||||
{
|
||||
RARCH_ERR("Failed to receive NETPLAY_CMD_INPUT input.\n");
|
||||
return netplay_cmd_nak(netplay);
|
||||
@ -513,7 +532,7 @@ static bool netplay_get_cmd(netplay_t *netplay)
|
||||
if (buffer[0] < netplay->read_frame_count)
|
||||
{
|
||||
/* We already had this, so ignore the new transmission */
|
||||
return true;
|
||||
break;
|
||||
}
|
||||
else if (buffer[0] > netplay->read_frame_count)
|
||||
{
|
||||
@ -527,7 +546,7 @@ static bool netplay_get_cmd(netplay_t *netplay)
|
||||
buffer + 1, sizeof(buffer) - sizeof(uint32_t));
|
||||
netplay->read_ptr = NEXT_PTR(netplay->read_ptr);
|
||||
netplay->read_frame_count++;
|
||||
return true;
|
||||
break;
|
||||
}
|
||||
|
||||
case NETPLAY_CMD_FLIP_PLAYERS:
|
||||
@ -537,8 +556,7 @@ static bool netplay_get_cmd(netplay_t *netplay)
|
||||
return netplay_cmd_nak(netplay);
|
||||
}
|
||||
|
||||
if (!socket_receive_all_blocking(
|
||||
netplay->fd, &flip_frame, sizeof(flip_frame)))
|
||||
RECV(&flip_frame, sizeof(flip_frame))
|
||||
{
|
||||
RARCH_ERR("Failed to receive CMD_FLIP_PLAYERS argument.\n");
|
||||
return netplay_cmd_nak(netplay);
|
||||
@ -565,7 +583,7 @@ static bool netplay_get_cmd(netplay_t *netplay)
|
||||
runloop_msg_queue_push(
|
||||
msg_hash_to_str(MSG_NETPLAY_USERS_HAS_FLIPPED), 1, 180, false);
|
||||
|
||||
return true;
|
||||
break;
|
||||
|
||||
case NETPLAY_CMD_SPECTATE:
|
||||
RARCH_ERR("NETPLAY_CMD_SPECTATE unimplemented.\n");
|
||||
@ -587,7 +605,7 @@ static bool netplay_get_cmd(netplay_t *netplay)
|
||||
return netplay_cmd_nak(netplay);
|
||||
}
|
||||
|
||||
if (!socket_receive_all_blocking(netplay->fd, buffer, sizeof(buffer)))
|
||||
RECV(buffer, sizeof(buffer))
|
||||
{
|
||||
RARCH_ERR("NETPLAY_CMD_CRC failed to receive payload.\n");
|
||||
return netplay_cmd_nak(netplay);
|
||||
@ -614,7 +632,7 @@ static bool netplay_get_cmd(netplay_t *netplay)
|
||||
if (!found)
|
||||
{
|
||||
/* Oh well, we got rid of it! */
|
||||
return true;
|
||||
break;
|
||||
}
|
||||
|
||||
if (buffer[0] <= netplay->other_frame_count)
|
||||
@ -636,14 +654,14 @@ static bool netplay_get_cmd(netplay_t *netplay)
|
||||
netplay->buffer[tmp_ptr].crc = buffer[1];
|
||||
}
|
||||
|
||||
return true;
|
||||
break;
|
||||
}
|
||||
|
||||
case NETPLAY_CMD_REQUEST_SAVESTATE:
|
||||
/* Delay until next frame so we don't send the savestate after the
|
||||
* input */
|
||||
netplay->force_send_savestate = true;
|
||||
return true;
|
||||
break;
|
||||
|
||||
case NETPLAY_CMD_LOAD_SAVESTATE:
|
||||
{
|
||||
@ -684,7 +702,7 @@ static bool netplay_get_cmd(netplay_t *netplay)
|
||||
return netplay_cmd_nak(netplay);
|
||||
}
|
||||
|
||||
if (!socket_receive_all_blocking(netplay->fd, &frame, sizeof(frame)))
|
||||
RECV(&frame, sizeof(frame))
|
||||
{
|
||||
RARCH_ERR("CMD_LOAD_SAVESTATE failed to receive savestate frame.\n");
|
||||
return netplay_cmd_nak(netplay);
|
||||
@ -697,7 +715,7 @@ static bool netplay_get_cmd(netplay_t *netplay)
|
||||
return netplay_cmd_nak(netplay);
|
||||
}
|
||||
|
||||
if (!socket_receive_all_blocking(netplay->fd, &isize, sizeof(isize)))
|
||||
RECV(&isize, sizeof(isize))
|
||||
{
|
||||
RARCH_ERR("CMD_LOAD_SAVESTATE failed to receive inflated size.\n");
|
||||
return netplay_cmd_nak(netplay);
|
||||
@ -710,8 +728,7 @@ static bool netplay_get_cmd(netplay_t *netplay)
|
||||
return netplay_cmd_nak(netplay);
|
||||
}
|
||||
|
||||
if (!socket_receive_all_blocking(netplay->fd,
|
||||
netplay->zbuffer, cmd_size - 2*sizeof(uint32_t)))
|
||||
RECV(netplay->zbuffer, cmd_size - 2*sizeof(uint32_t))
|
||||
{
|
||||
RARCH_ERR("CMD_LOAD_SAVESTATE failed to receive savestate.\n");
|
||||
return netplay_cmd_nak(netplay);
|
||||
@ -742,23 +759,30 @@ static bool netplay_get_cmd(netplay_t *netplay)
|
||||
netplay->savestate_request_outstanding = false;
|
||||
netplay->other_ptr = netplay->read_ptr;
|
||||
netplay->other_frame_count = frame;
|
||||
return true;
|
||||
break;
|
||||
}
|
||||
|
||||
case NETPLAY_CMD_PAUSE:
|
||||
netplay->remote_paused = true;
|
||||
return true;
|
||||
break;
|
||||
|
||||
case NETPLAY_CMD_RESUME:
|
||||
netplay->remote_paused = false;
|
||||
return true;
|
||||
break;
|
||||
|
||||
default:
|
||||
break;
|
||||
RARCH_ERR("%s.\n", msg_hash_to_str(MSG_UNKNOWN_NETPLAY_COMMAND_RECEIVED));
|
||||
return netplay_cmd_nak(netplay);
|
||||
}
|
||||
|
||||
RARCH_ERR("%s.\n", msg_hash_to_str(MSG_UNKNOWN_NETPLAY_COMMAND_RECEIVED));
|
||||
return netplay_cmd_nak(netplay);
|
||||
netplay_recv_flush(&netplay->recv_packet_buffer);
|
||||
|
||||
}
|
||||
|
||||
shrt:
|
||||
/* No more data, reset and try again */
|
||||
netplay_recv_reset(&netplay->recv_packet_buffer);
|
||||
return true;
|
||||
}
|
||||
|
||||
static int poll_input(netplay_t *netplay, bool block)
|
||||
@ -1262,6 +1286,8 @@ bool netplay_init_serialization(netplay_t *netplay)
|
||||
|
||||
static bool netplay_init_buffers(netplay_t *netplay, unsigned frames)
|
||||
{
|
||||
size_t packet_buffer_size;
|
||||
|
||||
if (!netplay)
|
||||
return false;
|
||||
|
||||
@ -1280,6 +1306,15 @@ static bool netplay_init_buffers(netplay_t *netplay, unsigned frames)
|
||||
if (!(netplay->quirks & NETPLAY_QUIRK_INITIALIZATION))
|
||||
netplay_init_serialization(netplay);
|
||||
|
||||
/* Make our packet buffer big enough for a save state and frames-many frames
|
||||
* of input data, plus the headers for each of them */
|
||||
packet_buffer_size = netplay->state_size + frames * WORDS_PER_FRAME + (frames+1)*3;
|
||||
|
||||
if (!netplay_init_socket_buffer(&netplay->send_packet_buffer, packet_buffer_size))
|
||||
return false;
|
||||
if (!netplay_init_socket_buffer(&netplay->recv_packet_buffer, packet_buffer_size))
|
||||
return false;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
@ -1343,6 +1378,10 @@ netplay_t *netplay_new(void *direct_host, const char *server, uint16_t port,
|
||||
if(!netplay_info_cb(netplay, delay_frames))
|
||||
goto error;
|
||||
|
||||
/* FIXME: Our initial connection should also be nonblocking */
|
||||
if (!socket_nonblock(netplay->fd))
|
||||
goto error;
|
||||
|
||||
return netplay;
|
||||
|
||||
error:
|
||||
@ -1464,6 +1503,9 @@ void netplay_free(netplay_t *netplay)
|
||||
free(netplay->buffer);
|
||||
}
|
||||
|
||||
netplay_deinit_socket_buffer(&netplay->send_packet_buffer);
|
||||
netplay_deinit_socket_buffer(&netplay->recv_packet_buffer);
|
||||
|
||||
if (netplay->zbuffer)
|
||||
free(netplay->zbuffer);
|
||||
|
||||
@ -1542,6 +1584,8 @@ void netplay_post_frame(netplay_t *netplay)
|
||||
{
|
||||
retro_assert(netplay && netplay->net_cbs->post_frame);
|
||||
netplay->net_cbs->post_frame(netplay);
|
||||
if (!netplay_send_flush(&netplay->send_packet_buffer, netplay->fd, false))
|
||||
hangup(netplay);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -1559,8 +1603,13 @@ void netplay_frontend_paused(netplay_t *netplay, bool paused)
|
||||
|
||||
netplay->local_paused = paused;
|
||||
if (netplay->has_connection && !netplay->spectate.enabled)
|
||||
{
|
||||
netplay_send_raw_cmd(netplay, paused
|
||||
? NETPLAY_CMD_PAUSE : NETPLAY_CMD_RESUME, NULL, 0);
|
||||
|
||||
/* We're not going to be polled, so we need to flush this command now */
|
||||
netplay_send_flush(&netplay->send_packet_buffer, netplay->fd, true);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@ -1651,14 +1700,15 @@ void netplay_load_savestate(netplay_t *netplay,
|
||||
header[2] = htonl(netplay->self_frame_count);
|
||||
header[3] = htonl(serial_info->size);
|
||||
|
||||
if (!socket_send_all_blocking(netplay->fd, header, sizeof(header), false))
|
||||
if (!netplay_send(&netplay->send_packet_buffer, netplay->fd, header,
|
||||
sizeof(header)))
|
||||
{
|
||||
hangup(netplay);
|
||||
return;
|
||||
}
|
||||
|
||||
if (!socket_send_all_blocking(netplay->fd,
|
||||
netplay->zbuffer, wn, false))
|
||||
if (!netplay_send(&netplay->send_packet_buffer, netplay->fd,
|
||||
netplay->zbuffer, wn))
|
||||
{
|
||||
hangup(netplay);
|
||||
return;
|
||||
|
272
network/netplay/netplay_buf.c
Normal file
272
network/netplay/netplay_buf.c
Normal file
@ -0,0 +1,272 @@
|
||||
/* RetroArch - A frontend for libretro.
|
||||
* Copyright (C) 2016 - Gregor Richards
|
||||
*
|
||||
* RetroArch is free software: you can redistribute it and/or modify it under the terms
|
||||
* of the GNU General Public License as published by the Free Software Found-
|
||||
* ation, either version 3 of the License, or (at your option) any later version.
|
||||
*
|
||||
* RetroArch is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
|
||||
* without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
|
||||
* PURPOSE. See the GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License along with RetroArch.
|
||||
* If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include <net/net_compat.h>
|
||||
#include <net/net_socket.h>
|
||||
|
||||
#include "netplay_private.h"
|
||||
|
||||
static size_t buf_used(struct socket_buffer *sbuf)
|
||||
{
|
||||
if (sbuf->end < sbuf->start)
|
||||
{
|
||||
size_t newend = sbuf->end;
|
||||
while (newend < sbuf->start) newend += sbuf->bufsz;
|
||||
return newend - sbuf->start;
|
||||
}
|
||||
|
||||
return sbuf->end - sbuf->start;
|
||||
}
|
||||
|
||||
static size_t buf_unread(struct socket_buffer *sbuf)
|
||||
{
|
||||
if (sbuf->end < sbuf->read)
|
||||
{
|
||||
size_t newend = sbuf->end;
|
||||
while (newend < sbuf->read) newend += sbuf->bufsz;
|
||||
return newend - sbuf->read;
|
||||
}
|
||||
|
||||
return sbuf->end - sbuf->read;
|
||||
}
|
||||
|
||||
static size_t buf_remaining(struct socket_buffer *sbuf)
|
||||
{
|
||||
return sbuf->bufsz - buf_used(sbuf) - 1;
|
||||
}
|
||||
|
||||
bool netplay_init_socket_buffer(struct socket_buffer *sbuf, size_t size)
|
||||
{
|
||||
sbuf->data = malloc(size);
|
||||
if (sbuf->data == NULL)
|
||||
return false;
|
||||
sbuf->bufsz = size;
|
||||
sbuf->start = sbuf->read = sbuf->end = 0;
|
||||
return true;
|
||||
}
|
||||
|
||||
void netplay_deinit_socket_buffer(struct socket_buffer *sbuf)
|
||||
{
|
||||
if (sbuf->data)
|
||||
free(sbuf->data);
|
||||
}
|
||||
|
||||
void netplay_clear_socket_buffer(struct socket_buffer *sbuf)
|
||||
{
|
||||
sbuf->start = sbuf->read = sbuf->end = 0;
|
||||
}
|
||||
|
||||
bool netplay_send(struct socket_buffer *sbuf, int sockfd, const void *buf, size_t len)
|
||||
{
|
||||
if (buf_remaining(sbuf) < len)
|
||||
{
|
||||
/* Need to force a blocking send */
|
||||
if (!netplay_send_flush(sbuf, sockfd, true))
|
||||
return false;
|
||||
}
|
||||
|
||||
if (buf_remaining(sbuf) < len)
|
||||
{
|
||||
/* Can only be that this is simply too big for our buffer, in which case
|
||||
* we just need to do a blocking send */
|
||||
if (!socket_send_all_blocking(sockfd, buf, len, false))
|
||||
return false;
|
||||
return true;
|
||||
}
|
||||
|
||||
/* Copy it into our buffer */
|
||||
if (sbuf->bufsz - sbuf->end < len)
|
||||
{
|
||||
/* Half at a time */
|
||||
size_t chunka = sbuf->bufsz - sbuf->end,
|
||||
chunkb = len - chunka;
|
||||
memcpy(sbuf->data + sbuf->end, buf, chunka);
|
||||
memcpy(sbuf->data, (const unsigned char *) buf + chunka, chunkb);
|
||||
sbuf->end = chunkb;
|
||||
|
||||
}
|
||||
else
|
||||
{
|
||||
/* Straight in */
|
||||
memcpy(sbuf->data + sbuf->end, buf, len);
|
||||
sbuf->end += len;
|
||||
|
||||
}
|
||||
|
||||
/* Flush what we can immediately */
|
||||
return netplay_send_flush(sbuf, sockfd, false);
|
||||
}
|
||||
|
||||
bool netplay_send_flush(struct socket_buffer *sbuf, int sockfd, bool block)
|
||||
{
|
||||
ssize_t sent;
|
||||
|
||||
if (buf_used(sbuf) == 0)
|
||||
return true;
|
||||
|
||||
if (sbuf->end > sbuf->start)
|
||||
{
|
||||
/* Usual case: Everything's in order */
|
||||
if (block)
|
||||
{
|
||||
if (!socket_send_all_blocking(sockfd, sbuf->data + sbuf->start, buf_used(sbuf), false))
|
||||
return false;
|
||||
sbuf->start = sbuf->end = 0;
|
||||
|
||||
}
|
||||
else
|
||||
{
|
||||
sent = socket_send_all_nonblocking(sockfd, sbuf->data + sbuf->start, buf_used(sbuf), false);
|
||||
if (sent < 0)
|
||||
return false;
|
||||
sbuf->start += sent;
|
||||
|
||||
if (sbuf->start == sbuf->end)
|
||||
sbuf->start = sbuf->end = 0;
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
else
|
||||
{
|
||||
/* Unusual case: Buffer overlaps break */
|
||||
if (block)
|
||||
{
|
||||
if (!socket_send_all_blocking(sockfd, sbuf->data + sbuf->start, sbuf->bufsz - sbuf->start, false))
|
||||
return false;
|
||||
sbuf->start = 0;
|
||||
return netplay_send_flush(sbuf, sockfd, true);
|
||||
|
||||
}
|
||||
else
|
||||
{
|
||||
sent = socket_send_all_nonblocking(sockfd, sbuf->data + sbuf->start, sbuf->bufsz - sbuf->start, false);
|
||||
if (sent < 0)
|
||||
return false;
|
||||
sbuf->start += sent;
|
||||
|
||||
if (sbuf->start >= sbuf->bufsz)
|
||||
{
|
||||
sbuf->start = 0;
|
||||
return netplay_send_flush(sbuf, sockfd, false);
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
ssize_t netplay_recv(struct socket_buffer *sbuf, int sockfd, void *buf, size_t len, bool block)
|
||||
{
|
||||
bool error;
|
||||
ssize_t recvd;
|
||||
|
||||
/* Receive whatever we can into the buffer */
|
||||
if (sbuf->end > sbuf->start)
|
||||
{
|
||||
error = false;
|
||||
recvd = socket_receive_all_nonblocking(sockfd, &error,
|
||||
sbuf->data + sbuf->end, sbuf->bufsz - sbuf->end -
|
||||
((sbuf->start == 0) ? 1 : 0));
|
||||
if (recvd < 0 || error)
|
||||
return -1;
|
||||
sbuf->end += recvd;
|
||||
if (sbuf->end >= sbuf->bufsz)
|
||||
{
|
||||
sbuf->end = 0;
|
||||
error = false;
|
||||
recvd = socket_receive_all_nonblocking(sockfd, &error, sbuf->data, sbuf->start - 1);
|
||||
if (recvd < 0 || error)
|
||||
return -1;
|
||||
sbuf->end += recvd;
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
else
|
||||
{
|
||||
error = false;
|
||||
recvd = socket_receive_all_nonblocking(sockfd, &error, sbuf->data + sbuf->end, sbuf->start - sbuf->end - 1);
|
||||
if (recvd < 0 || error)
|
||||
return -1;
|
||||
sbuf->end += recvd;
|
||||
|
||||
}
|
||||
|
||||
/* Now copy it into the reader */
|
||||
if (sbuf->end > sbuf->read || (sbuf->bufsz - sbuf->read) >= len)
|
||||
{
|
||||
size_t unread = buf_unread(sbuf);
|
||||
if (len <= unread)
|
||||
{
|
||||
memcpy(buf, sbuf->data + sbuf->read, len);
|
||||
sbuf->read += len;
|
||||
if (sbuf->read >= sbuf->bufsz)
|
||||
sbuf->read = 0;
|
||||
recvd = len;
|
||||
|
||||
}
|
||||
else
|
||||
{
|
||||
memcpy(buf, sbuf->data + sbuf->read, unread);
|
||||
sbuf->read += unread;
|
||||
if (sbuf->read >= sbuf->bufsz)
|
||||
sbuf->read = 0;
|
||||
recvd = unread;
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
else
|
||||
{
|
||||
/* Our read goes around the edge */
|
||||
size_t chunka = sbuf->bufsz - sbuf->read,
|
||||
pchunklen = len - chunka,
|
||||
chunkb = (pchunklen >= sbuf->end) ? sbuf->end : pchunklen;
|
||||
memcpy(buf, sbuf->data + sbuf->read, chunka);
|
||||
memcpy((unsigned char *) buf + chunka, sbuf->data, chunkb);
|
||||
sbuf->read = chunkb;
|
||||
recvd = chunka + chunkb;
|
||||
|
||||
}
|
||||
|
||||
/* Perhaps block for more data */
|
||||
if (block)
|
||||
{
|
||||
sbuf->start = sbuf->read;
|
||||
if (recvd < len)
|
||||
{
|
||||
if (!socket_receive_all_blocking(sockfd, (unsigned char *) buf + recvd, len - recvd))
|
||||
return -1;
|
||||
recvd = len;
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
return recvd;
|
||||
}
|
||||
|
||||
void netplay_recv_reset(struct socket_buffer *sbuf)
|
||||
{
|
||||
sbuf->read = sbuf->start;
|
||||
}
|
||||
|
||||
void netplay_recv_flush(struct socket_buffer *sbuf)
|
||||
{
|
||||
sbuf->start = sbuf->read;
|
||||
}
|
@ -157,11 +157,22 @@ static bool netplay_net_pre_frame(netplay_t *netplay)
|
||||
RARCH_WARN("Cannot set Netplay port to close-on-exec. It may fail to reopen if the client disconnects.\n");
|
||||
#endif
|
||||
|
||||
netplay_clear_socket_buffer(&netplay->send_packet_buffer);
|
||||
netplay_clear_socket_buffer(&netplay->recv_packet_buffer);
|
||||
|
||||
/* Establish the connection */
|
||||
if (netplay_handshake(netplay))
|
||||
{
|
||||
netplay->has_connection = true;
|
||||
|
||||
/* FIXME: Not the best place for this, needs to happen after initial
|
||||
* connection in get_info */
|
||||
if (!socket_nonblock(netplay->fd))
|
||||
{
|
||||
free(netplay);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/* Send them the savestate */
|
||||
if (!(netplay->quirks & (NETPLAY_QUIRK_NO_SAVESTATES|NETPLAY_QUIRK_NO_TRANSMISSION)))
|
||||
{
|
||||
|
@ -103,6 +103,14 @@ struct delta_frame
|
||||
bool used_real;
|
||||
};
|
||||
|
||||
struct socket_buffer
|
||||
{
|
||||
unsigned char *data;
|
||||
size_t bufsz;
|
||||
size_t start, end;
|
||||
size_t read;
|
||||
};
|
||||
|
||||
struct netplay_callbacks {
|
||||
bool (*pre_frame) (netplay_t *netplay);
|
||||
void (*post_frame)(netplay_t *netplay);
|
||||
@ -180,11 +188,18 @@ struct netplay
|
||||
bool savestate_request_outstanding;
|
||||
|
||||
/* A buffer for outgoing input packets. */
|
||||
uint32_t packet_buffer[2 + WORDS_PER_FRAME];
|
||||
uint32_t input_packet_buffer[2 + WORDS_PER_FRAME];
|
||||
|
||||
/* And buffers for sending and receiving our actual data */
|
||||
struct socket_buffer send_packet_buffer, recv_packet_buffer;
|
||||
|
||||
/* All of our frame counts */
|
||||
uint32_t self_frame_count;
|
||||
uint32_t read_frame_count;
|
||||
uint32_t other_frame_count;
|
||||
uint32_t replay_frame_count;
|
||||
|
||||
/* And socket info */
|
||||
struct addrinfo *addr;
|
||||
struct sockaddr_storage their_addr;
|
||||
bool has_client_addr;
|
||||
@ -263,4 +278,20 @@ bool netplay_cmd_request_savestate(netplay_t *netplay);
|
||||
|
||||
bool netplay_lan_ad_server(netplay_t *netplay);
|
||||
|
||||
bool netplay_init_socket_buffer(struct socket_buffer *sbuf, size_t size);
|
||||
|
||||
void netplay_deinit_socket_buffer(struct socket_buffer *sbuf);
|
||||
|
||||
void netplay_clear_socket_buffer(struct socket_buffer *sbuf);
|
||||
|
||||
bool netplay_send(struct socket_buffer *sbuf, int sockfd, const void *buf, size_t len);
|
||||
|
||||
bool netplay_send_flush(struct socket_buffer *sbuf, int sockfd, bool block);
|
||||
|
||||
ssize_t netplay_recv(struct socket_buffer *sbuf, int sockfd, void *buf, size_t len, bool block);
|
||||
|
||||
void netplay_recv_reset(struct socket_buffer *sbuf);
|
||||
|
||||
void netplay_recv_flush(struct socket_buffer *sbuf);
|
||||
|
||||
#endif
|
||||
|
@ -54,8 +54,8 @@ static bool netplay_spectate_pre_frame(netplay_t *netplay)
|
||||
{
|
||||
if (netplay->spectate.fds[i] >= 0)
|
||||
{
|
||||
netplay->packet_buffer[2] = htonl(netplay->self_frame_count - netplay->spectate.frames[i]);
|
||||
if (!socket_send_all_blocking(netplay->spectate.fds[i], netplay->packet_buffer, sizeof(netplay->packet_buffer), false))
|
||||
netplay->input_packet_buffer[2] = htonl(netplay->self_frame_count - netplay->spectate.frames[i]);
|
||||
if (!socket_send_all_blocking(netplay->spectate.fds[i], netplay->input_packet_buffer, sizeof(netplay->input_packet_buffer), false))
|
||||
{
|
||||
socket_close(netplay->spectate.fds[i]);
|
||||
netplay->spectate.fds[i] = -1;
|
||||
@ -146,8 +146,8 @@ static bool netplay_spectate_pre_frame(netplay_t *netplay)
|
||||
}
|
||||
|
||||
/* And send them this frame's input */
|
||||
netplay->packet_buffer[2] = htonl(0);
|
||||
if (!socket_send_all_blocking(new_fd, netplay->packet_buffer, sizeof(netplay->packet_buffer), false))
|
||||
netplay->input_packet_buffer[2] = htonl(0);
|
||||
if (!socket_send_all_blocking(new_fd, netplay->input_packet_buffer, sizeof(netplay->input_packet_buffer), false))
|
||||
{
|
||||
socket_close(new_fd);
|
||||
return true;
|
||||
|
Loading…
x
Reference in New Issue
Block a user