mirror of
https://github.com/misternebula/quantum-space-buddies.git
synced 2025-01-18 13:23:05 +00:00
371 lines
8.8 KiB
C#
371 lines
8.8 KiB
C#
using QuantumUNET.Messages;
|
|
using System;
|
|
using System.Collections.Generic;
|
|
using UnityEngine;
|
|
|
|
namespace QuantumUNET.Transport
|
|
{
|
|
internal class QChannelBuffer : IDisposable
|
|
{
|
|
// ---- Traffic statistics -------------------------------------------------
// Within this class only NumMsgsOut, NumBufferedPerSecond and
// LastBufferedPerSecond are ever assigned; the other counters keep their
// default value as far as this file is concerned.

/// <summary>Messages submitted through <c>Send</c>; bumped once per call.</summary>
public int NumMsgsOut { get; private set; }

/// <summary>Buffered outgoing message counter. Never assigned in this class.</summary>
public int NumBufferedMsgsOut { get; private set; }

/// <summary>Outgoing byte counter. Never assigned in this class.</summary>
public int NumBytesOut { get; private set; }

/// <summary>Incoming message counter. Never assigned in this class.</summary>
public int NumMsgsIn { get; private set; }

/// <summary>Incoming byte counter. Never assigned in this class.</summary>
public int NumBytesIn { get; private set; }

/// <summary>
/// Counter for the current one-second window. <c>CheckInternalBuffer</c> resets it to zero
/// once more than a second has elapsed; nothing in this class increments it.
/// </summary>
public int NumBufferedPerSecond { get; private set; }

/// <summary>Value of <see cref="NumBufferedPerSecond"/> captured when the previous window closed.</summary>
public int LastBufferedPerSecond { get; private set; }

// ---- Limits -------------------------------------------------------------

/// <summary>Same value the constructor uses as the initial pending-packet limit.</summary>
public const int MaxPendingPacketCount = 16;

/// <summary>Same value <c>SetOption</c> uses as the exclusive upper bound for MaxPendingBuffers.</summary>
public const int MaxBufferedPackets = 512;

/// <summary>
/// Longest time (in <c>Time.realtimeSinceStartup</c> units) a non-empty packet is held before
/// <c>CheckInternalBuffer</c> flushes it. A value of exactly 0 makes <c>SendBytes</c> flush immediately.
/// </summary>
public float MaxDelay = 0.01f;

// ---- Per-channel state --------------------------------------------------

// Connection and channel id handed to every QChannelPacket.SendToTransport call.
private readonly QNetworkConnection _connection;

// Packet currently being filled by SendBytes.
private QChannelPacket _currentPacket;

// Time.realtimeSinceStartup at the last flush performed by CheckInternalBuffer.
private float _lastFlushTime;

private readonly byte _channelId;

// Largest payload accepted without fragmentation; also the size used for new packets.
private int _maxPacketSize;

private readonly bool _isReliable;

// When true, SendBytes splits oversized payloads via SendFragmentBytes instead of failing.
private bool _allowFragmentation;

// Set when the pending queue hit its limit; cleared by SendInternalBuffer once the queue drains below half.
private bool _isBroken;

// Upper bound on _pendingPackets.Count before SendBytes starts rejecting data.
private int _maxPendingPacketCount;

// Same value as the literal cap applied to the free-packet pool.
private const int _maxFreePacketCount = 512;

// Packets waiting for the transport. Only created for reliable channels; null otherwise.
private readonly Queue<QChannelPacket> _pendingPackets;

// Pool of recycled packets, shared by every channel buffer. Created lazily by the first reliable channel.
private static List<QChannelPacket> _freePackets;

// Total number of queued packets across all channel buffers.
internal static int _pendingPacketCount;

// Start of the current one-second statistics window.
private float _lastBufferedMessageCountTimer = Time.realtimeSinceStartup;

// Scratch writers shared by every channel buffer instance (Send / SendFragmentBytes).
private static readonly QNetworkWriter _sendWriter = new QNetworkWriter();

private static readonly QNetworkWriter _fragmentWriter = new QNetworkWriter();

// Same value the constructor subtracts from the requested buffer size.
private const int _packetHeaderReserveSize = 100;

private bool _disposed;

// Reassembly buffer filled by HandleFragment.
internal QNetBuffer _fragmentBuffer = new QNetBuffer();

// True while HandleFragment is in the middle of a fragmented message.
private bool _readingFragment;
|
|
|
|
/// <summary>
/// Creates the outgoing buffer for a single channel of a connection.
/// </summary>
/// <param name="conn">Connection passed to the packets when they are sent to the transport.</param>
/// <param name="bufferSize">Raw buffer size; 100 bytes are held back, so packets hold <c>bufferSize - 100</c> bytes.</param>
/// <param name="cid">Channel id passed along with every transport send.</param>
/// <param name="isReliable">Reliable channels additionally get a pending-packet queue.</param>
/// <param name="isSequenced">Fragmentation starts enabled only when the channel is both reliable and sequenced.</param>
public QChannelBuffer(QNetworkConnection conn, int bufferSize, byte cid, bool isReliable, bool isSequenced)
{
	_connection = conn;
	_channelId = cid;
	_isReliable = isReliable;
	_allowFragmentation = isReliable && isSequenced;
	_maxPendingPacketCount = MaxPendingPacketCount;

	_maxPacketSize = bufferSize - _packetHeaderReserveSize;
	_currentPacket = new QChannelPacket(_maxPacketSize, isReliable);

	if (!isReliable)
	{
		// Unreliable channels never queue, so _pendingPackets stays null for them.
		return;
	}

	_pendingPackets = new Queue<QChannelPacket>();

	// The free-packet pool is static: the first reliable channel creates it, later ones reuse it.
	_freePackets ??= new List<QChannelPacket>();
}
|
|
|
|
/// <summary>
/// Returns all still-queued packets to the shared pool and marks the buffer as disposed.
/// </summary>
public void Dispose()
{
	Dispose(disposing: true);
	GC.SuppressFinalize(this);
}

/// <summary>
/// Core of the dispose pattern.
/// </summary>
/// <param name="disposing">
/// True when called from <see cref="Dispose()"/>; only then are the queued packets recycled.
/// </param>
protected virtual void Dispose(bool disposing)
{
	// _pendingPackets is only allocated for reliable channels.
	var mustRecycle = disposing && !_disposed && _pendingPackets != null;
	if (mustRecycle)
	{
		while (_pendingPackets.Count > 0)
		{
			// Keep the global pending counter in step with the queue.
			_pendingPacketCount--;

			// FreePacket applies the same 512-entry cap on the pool.
			FreePacket(_pendingPackets.Dequeue());
		}

		_pendingPackets.Clear();
	}

	_disposed = true;
}
|
|
|
|
/// <summary>
/// Changes one tunable of this channel buffer.
/// </summary>
/// <param name="option">Which setting to change.</param>
/// <param name="value">
/// New value. For <c>AllowFragmentation</c> any non-zero value enables it; for
/// <c>MaxPacketSize</c> it must be in <c>1.._maxPacketSize</c>; for <c>MaxPendingBuffers</c>
/// it must be in <c>0..MaxBufferedPackets - 1</c>.
/// </param>
/// <returns>True if the option was applied; false if it was rejected or unknown.</returns>
public bool SetOption(QChannelOption option, int value)
{
	if (option == QChannelOption.MaxPendingBuffers)
	{
		// Only reliable channels own a pending queue, so the limit is meaningless elsewhere.
		if (!_isReliable)
		{
			return false;
		}

		// Note: zero passes this check even though the message says "greater than zero".
		if (value is < 0 or >= MaxBufferedPackets)
		{
			Debug.LogError(
				$"Invalid MaxPendingBuffers for channel {_channelId}. Must be greater than zero and less than {MaxBufferedPackets}");
			return false;
		}

		_maxPendingPacketCount = value;
		return true;
	}

	if (option == QChannelOption.AllowFragmentation)
	{
		_allowFragmentation = value != 0;
		return true;
	}

	if (option != QChannelOption.MaxPacketSize)
	{
		return false;
	}

	// _pendingPackets is null on unreliable channels; treat that as "nothing pending"
	// instead of dereferencing it.
	var hasPendingPackets = _pendingPackets is { Count: > 0 };
	if (!_currentPacket.IsEmpty() || hasPendingPackets)
	{
		Debug.LogError("Cannot set MaxPacketSize after sending data.");
		return false;
	}

	if (value <= 0)
	{
		Debug.LogError("Cannot set MaxPacketSize less than one.");
		return false;
	}

	if (value > _maxPacketSize)
	{
		Debug.LogError(
			$"Cannot set MaxPacketSize to greater than the existing maximum ({_maxPacketSize}).");
		return false;
	}

	// The packet size can only shrink; swap in a packet of the new size.
	_currentPacket = new QChannelPacket(value, _isReliable);
	_maxPacketSize = value;
	return true;
}
|
|
|
|
/// <summary>
/// Periodic housekeeping: flushes the current packet once it has been held longer than
/// <c>MaxDelay</c>, and rolls the per-second statistics window over.
/// </summary>
public void CheckInternalBuffer()
{
	var sinceLastFlush = Time.realtimeSinceStartup - _lastFlushTime;
	if (sinceLastFlush > MaxDelay && !_currentPacket.IsEmpty())
	{
		SendInternalBuffer();

		// Re-read the clock so the timestamp reflects the moment after the send.
		_lastFlushTime = Time.realtimeSinceStartup;
	}

	var sinceWindowStart = Time.realtimeSinceStartup - _lastBufferedMessageCountTimer;
	if (sinceWindowStart > 1f)
	{
		// Publish the finished window and start a new one.
		LastBufferedPerSecond = NumBufferedPerSecond;
		NumBufferedPerSecond = 0;
		_lastBufferedMessageCountTimer = Time.realtimeSinceStartup;
	}
}
|
|
|
|
/// <summary>
/// Sends whatever <paramref name="writer"/> currently holds through <c>SendBytes</c>.
/// </summary>
/// <param name="writer">Writer containing a finished message.</param>
/// <returns>The result of <c>SendBytes</c>.</returns>
public bool SendWriter(QNetworkWriter writer)
{
	var segment = writer.AsArraySegment();

	// Only the backing array and the byte count are forwarded.
	var payload = segment.Array;
	var length = segment.Count;
	return SendBytes(payload, length);
}
|
|
|
|
/// <summary>
/// Serializes <paramref name="msg"/> into the shared send writer and hands it to <c>SendWriter</c>.
/// </summary>
/// <param name="msgType">Message type id passed to <c>StartMessage</c>.</param>
/// <param name="msg">Message to serialize.</param>
/// <returns>The result of <c>SendWriter</c>.</returns>
public bool Send(short msgType, QMessageBase msg)
{
	// The writer is static, i.e. reused for every message on every channel buffer.
	var writer = _sendWriter;

	writer.StartMessage(msgType);
	msg.Serialize(writer);
	writer.FinishMessage();

	// Counted once serialization succeeded, regardless of whether the send itself does.
	NumMsgsOut += 1;
	return SendWriter(writer);
}
|
|
|
|
/// <summary>
/// Consumes one fragment message. The first byte is a flag: 0 means "data follows",
/// anything else means "the fragmented message is complete".
/// </summary>
/// <param name="reader">Reader positioned at the fragment flag byte.</param>
/// <returns>
/// True when the terminating fragment was read (the assembled bytes are in <c>_fragmentBuffer</c>);
/// false when a data fragment was appended and more are expected.
/// </returns>
internal bool HandleFragment(QNetworkReader reader)
{
	var flag = reader.ReadByte();
	if (flag != 0)
	{
		// Terminator: the next data fragment will start a fresh message.
		_readingFragment = false;
		return true;
	}

	if (!_readingFragment)
	{
		// First fragment of a new message: rewind the reassembly buffer.
		_fragmentBuffer.SeekZero();
		_readingFragment = true;
	}

	var chunk = reader.ReadBytesAndSize();
	_fragmentBuffer.WriteBytes(chunk, (ushort)chunk.Length);
	return false;
}
|
|
|
|
/// <summary>
/// Splits a payload into fragment messages (message type 17) and sends them, followed by a
/// terminating fragment. Each message starts with a one-byte flag: 0 for a data fragment,
/// 1 for the terminator. This matches <c>HandleFragment</c>, which reads exactly one byte.
/// </summary>
/// <param name="bytes">Buffer holding the payload, starting at index 0.</param>
/// <param name="bytesToSend">Number of bytes of <paramref name="bytes"/> to send.</param>
/// <returns>
/// True when every fragment and the terminator were accepted by <c>SendWriter</c>; false if any
/// of them was rejected or if the packet size leaves no room for fragment data.
/// </returns>
internal bool SendFragmentBytes(byte[] bytes, int bytesToSend)
{
	const short fragmentMsgType = 17;
	const int fragmentOverhead = 32; // room kept free in each packet for the fragment's own framing
	const byte dataFlag = 0;
	const byte endFlag = 1;

	var maxChunkSize = _maxPacketSize - fragmentOverhead;
	if (bytesToSend > 0 && maxChunkSize <= 0)
	{
		// A zero chunk size would never make progress and a negative one cannot be allocated.
		Debug.LogError(
			$"ChannelBuffer:SendFragmentBytes cannot fragment with a maximum packet size of {_maxPacketSize} bytes on channel:{_channelId}");
		return false;
	}

	var allSent = true;
	var offset = 0;
	while (bytesToSend > 0)
	{
		var chunkSize = Math.Min(bytesToSend, maxChunkSize);
		var chunk = new byte[chunkSize];
		Array.Copy(bytes, offset, chunk, 0, chunkSize);

		_fragmentWriter.StartMessage(fragmentMsgType);
		// Typed as byte on purpose: a bare literal 0 is an int and would select a wider overload
		// if the writer has one, while the receiving side consumes a single byte.
		_fragmentWriter.Write(dataFlag);
		_fragmentWriter.WriteBytesFull(chunk);
		_fragmentWriter.FinishMessage();

		// Keep sending the remaining fragments even after a failure, but remember it.
		allSent &= SendWriter(_fragmentWriter);

		offset += chunkSize;
		bytesToSend -= chunkSize;
	}

	_fragmentWriter.StartMessage(fragmentMsgType);
	_fragmentWriter.Write(endFlag);
	_fragmentWriter.FinishMessage();
	allSent &= SendWriter(_fragmentWriter);

	return allSent;
}
|
|
|
|
/// <summary>
/// Appends a serialized message to the current packet, flushing, queueing or fragmenting
/// as needed.
/// </summary>
/// <param name="bytes">Buffer holding the message, starting at index 0.</param>
/// <param name="bytesToSend">Number of bytes of <paramref name="bytes"/> to send.</param>
/// <returns>
/// True if the bytes were buffered or sent; false if they were rejected (bad size, oversized
/// without fragmentation, pending queue full, or an unreliable send that failed).
/// </returns>
internal bool SendBytes(byte[] bytes, int bytesToSend)
{
	if (bytesToSend >= ushort.MaxValue)
	{
		Debug.LogError($"ChannelBuffer:SendBytes cannot send packet larger than {ushort.MaxValue} bytes");
		return false;
	}

	if (bytesToSend <= 0)
	{
		Debug.LogError("ChannelBuffer:SendBytes cannot send zero bytes");
		return false;
	}

	// Too big for a single packet: fragment if allowed, otherwise give up.
	if (bytesToSend > _maxPacketSize)
	{
		if (_allowFragmentation)
		{
			return SendFragmentBytes(bytes, bytesToSend);
		}

		Debug.LogError(
			$"Failed to send big message of {bytesToSend} bytes. The maximum is {_maxPacketSize} bytes on channel:{_channelId}");
		return false;
	}

	// Common case: the message fits into the packet being filled.
	if (_currentPacket.HasSpace(bytesToSend))
	{
		_currentPacket.Write(bytes, bytesToSend);

		// With a zero MaxDelay there is no batching, so flush right away.
		return MaxDelay != 0f || SendInternalBuffer();
	}

	// From here on the current packet is full and has to go somewhere first.
	if (!_isReliable)
	{
		if (!_currentPacket.SendToTransport(_connection, _channelId))
		{
			Debug.Log($"ChannelBuffer SendBytes no space on unreliable channel {_channelId}");
			return false;
		}

		_currentPacket.Write(bytes, bytesToSend);
		return true;
	}

	if (_pendingPackets.Count == 0)
	{
		// Nothing queued ahead of us: try the transport directly, queue only if it refuses.
		if (!_currentPacket.SendToTransport(_connection, _channelId))
		{
			QueuePacket();
		}

		_currentPacket.Write(bytes, bytesToSend);
		return true;
	}

	if (_pendingPackets.Count >= _maxPendingPacketCount)
	{
		// Log only on the transition into the broken state to avoid flooding the console.
		if (!_isBroken)
		{
			Debug.LogError($"ChannelBuffer buffer limit of {_pendingPackets.Count} packets reached.");
		}

		_isBroken = true;
		return false;
	}

	// Other packets are already waiting: queue behind them to keep the order.
	QueuePacket();
	_currentPacket.Write(bytes, bytesToSend);
	return true;
}
|
|
|
|
/// <summary>
/// Moves the current packet onto the pending queue and replaces it with a fresh
/// (or recycled) packet from <c>AllocPacket</c>.
/// </summary>
private void QueuePacket()
{
	// The static counter tracks queued packets across all channel buffers.
	_pendingPacketCount += 1;

	var filledPacket = _currentPacket;
	_pendingPackets.Enqueue(filledPacket);

	_currentPacket = AllocPacket();
}
|
|
|
|
/// <summary>
/// Gets a packet to write into: the most recently freed one from the shared pool after a
/// <c>Reset()</c>, or a newly constructed packet when the pool is empty.
/// </summary>
/// <returns>A packet ready to be used as the current packet.</returns>
private QChannelPacket AllocPacket()
{
	var lastIndex = _freePackets.Count - 1;
	if (lastIndex < 0)
	{
		return new QChannelPacket(_maxPacketSize, _isReliable);
	}

	// Take from the end of the list so removal does not shift the other entries.
	// NOTE(review): the pool is static, so this packet may have been created by a channel
	// with a different max packet size - confirm Reset()/Write cope with that.
	var recycled = _freePackets[lastIndex];
	_freePackets.RemoveAt(lastIndex);
	recycled.Reset();
	return recycled;
}
|
|
|
|
/// <summary>
/// Returns a packet to the shared pool unless the pool already holds 512 entries,
/// in which case the packet is simply dropped.
/// </summary>
/// <param name="packet">Packet that is no longer in use.</param>
private static void FreePacket(QChannelPacket packet)
{
	if (_freePackets.Count >= _maxFreePacketCount)
	{
		// Pool is full; do not keep a reference to the packet.
		return;
	}

	_freePackets.Add(packet);
}
|
|
|
|
/// <summary>
/// Pushes buffered data to the transport. On a reliable channel with queued packets, the queue
/// is drained in order until it is empty or the transport refuses a packet; the current packet
/// is not sent in that case. Otherwise the current packet is sent directly.
/// </summary>
/// <returns>
/// True whenever the pending queue was processed (even if it could not be fully drained);
/// otherwise the result of sending the current packet.
/// </returns>
public bool SendInternalBuffer()
{
	if (!_isReliable || _pendingPackets.Count == 0)
	{
		return _currentPacket.SendToTransport(_connection, _channelId);
	}

	while (_pendingPackets.Count > 0)
	{
		var packet = _pendingPackets.Dequeue();
		if (!packet.SendToTransport(_connection, _channelId))
		{
			// Put the refused packet back at the HEAD of the queue. Re-enqueueing it at the tail
			// would let newer packets overtake it. Queue<T> has no push-front, so append the
			// packet and rotate the older entries behind it. The dequeued instance is re-queued
			// so that whatever state SendToTransport left on it is kept.
			_pendingPackets.Enqueue(packet);
			for (var toRotate = _pendingPackets.Count - 1; toRotate > 0; toRotate--)
			{
				_pendingPackets.Enqueue(_pendingPackets.Dequeue());
			}

			break;
		}

		_pendingPacketCount--;
		FreePacket(packet);

		// Leave the overflow state once the backlog has shrunk below half the limit.
		if (_isBroken && _pendingPackets.Count < _maxPendingPacketCount / 2)
		{
			Debug.LogWarning("ChannelBuffer recovered from overflow but data was lost.");
			_isBroken = false;
		}
	}

	return true;
}
|
|
}
|
|
} |