2021-12-02 20:09:07 -08:00

371 lines
8.8 KiB
C#

using QuantumUNET.Messages;
using System;
using System.Collections.Generic;
using UnityEngine;
namespace QuantumUNET.Transport
{
internal class QChannelBuffer : IDisposable
{
public int NumMsgsOut { get; private set; }
public int NumBufferedMsgsOut { get; private set; }
public int NumBytesOut { get; private set; }
public int NumMsgsIn { get; private set; }
public int NumBytesIn { get; private set; }
public int NumBufferedPerSecond { get; private set; }
public int LastBufferedPerSecond { get; private set; }
public const int MaxPendingPacketCount = 16;
public const int MaxBufferedPackets = 512;
public float MaxDelay = 0.01f;
private readonly QNetworkConnection _connection;
private QChannelPacket _currentPacket;
private float _lastFlushTime;
private readonly byte _channelId;
private int _maxPacketSize;
private readonly bool _isReliable;
private bool _allowFragmentation;
private bool _isBroken;
private int _maxPendingPacketCount;
private const int _maxFreePacketCount = 512;
private readonly Queue<QChannelPacket> _pendingPackets;
private static List<QChannelPacket> _freePackets;
internal static int _pendingPacketCount;
private float _lastBufferedMessageCountTimer = Time.realtimeSinceStartup;
private static readonly QNetworkWriter _sendWriter = new();
private static readonly QNetworkWriter _fragmentWriter = new();
private const int _packetHeaderReserveSize = 100;
private bool _disposed;
internal QNetBuffer _fragmentBuffer = new();
private bool _readingFragment;
public QChannelBuffer(QNetworkConnection conn, int bufferSize, byte cid, bool isReliable, bool isSequenced)
{
_connection = conn;
_maxPacketSize = bufferSize - 100;
_currentPacket = new QChannelPacket(_maxPacketSize, isReliable);
_channelId = cid;
_maxPendingPacketCount = 16;
_isReliable = isReliable;
_allowFragmentation = isReliable && isSequenced;
if (isReliable)
{
_pendingPackets = new Queue<QChannelPacket>();
if (_freePackets == null)
{
_freePackets = new List<QChannelPacket>();
}
}
}
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
protected virtual void Dispose(bool disposing)
{
if (!_disposed && disposing)
{
if (_pendingPackets != null)
{
while (_pendingPackets.Count > 0)
{
_pendingPacketCount--;
var item = _pendingPackets.Dequeue();
if (_freePackets.Count < 512)
{
_freePackets.Add(item);
}
}
_pendingPackets.Clear();
}
}
_disposed = true;
}
public bool SetOption(QChannelOption option, int value)
{
bool result;
if (option != QChannelOption.MaxPendingBuffers)
{
if (option != QChannelOption.AllowFragmentation)
{
if (option != QChannelOption.MaxPacketSize)
{
result = false;
}
else if (!_currentPacket.IsEmpty() || _pendingPackets.Count > 0)
{
Debug.LogError("Cannot set MaxPacketSize after sending data.");
result = false;
}
else if (value <= 0)
{
Debug.LogError("Cannot set MaxPacketSize less than one.");
result = false;
}
else if (value > _maxPacketSize)
{
Debug.LogError(
$"Cannot set MaxPacketSize to greater than the existing maximum ({_maxPacketSize}).");
result = false;
}
else
{
_currentPacket = new QChannelPacket(value, _isReliable);
_maxPacketSize = value;
result = true;
}
}
else
{
_allowFragmentation = value != 0;
result = true;
}
}
else if (!_isReliable)
{
result = false;
}
else if (value is < 0 or >= 512)
{
Debug.LogError(
$"Invalid MaxPendingBuffers for channel {_channelId}. Must be greater than zero and less than {512}");
result = false;
}
else
{
_maxPendingPacketCount = value;
result = true;
}
return result;
}
public void CheckInternalBuffer()
{
if (Time.realtimeSinceStartup - _lastFlushTime > MaxDelay && !_currentPacket.IsEmpty())
{
SendInternalBuffer();
_lastFlushTime = Time.realtimeSinceStartup;
}
if (Time.realtimeSinceStartup - _lastBufferedMessageCountTimer > 1f)
{
LastBufferedPerSecond = NumBufferedPerSecond;
NumBufferedPerSecond = 0;
_lastBufferedMessageCountTimer = Time.realtimeSinceStartup;
}
}
public bool SendWriter(QNetworkWriter writer)
{
var arraySegment = writer.AsArraySegment();
return SendBytes(arraySegment.Array, arraySegment.Count);
}
public bool Send(short msgType, QMessageBase msg)
{
_sendWriter.StartMessage(msgType);
msg.Serialize(_sendWriter);
_sendWriter.FinishMessage();
NumMsgsOut++;
return SendWriter(_sendWriter);
}
internal bool HandleFragment(QNetworkReader reader)
{
bool result;
if (reader.ReadByte() == 0)
{
if (!_readingFragment)
{
_fragmentBuffer.SeekZero();
_readingFragment = true;
}
var array = reader.ReadBytesAndSize();
_fragmentBuffer.WriteBytes(array, (ushort)array.Length);
result = false;
}
else
{
_readingFragment = false;
result = true;
}
return result;
}
internal bool SendFragmentBytes(byte[] bytes, int bytesToSend)
{
var num = 0;
while (bytesToSend > 0)
{
var num2 = Math.Min(bytesToSend, _maxPacketSize - 32);
var array = new byte[num2];
Array.Copy(bytes, num, array, 0, num2);
_fragmentWriter.StartMessage(17);
_fragmentWriter.Write(0);
_fragmentWriter.WriteBytesFull(array);
_fragmentWriter.FinishMessage();
SendWriter(_fragmentWriter);
num += num2;
bytesToSend -= num2;
}
_fragmentWriter.StartMessage(17);
_fragmentWriter.Write(1);
_fragmentWriter.FinishMessage();
SendWriter(_fragmentWriter);
return true;
}
internal bool SendBytes(byte[] bytes, int bytesToSend)
{
bool result;
if (bytesToSend >= 65535)
{
Debug.LogError($"ChannelBuffer:SendBytes cannot send packet larger than {ushort.MaxValue} bytes");
result = false;
}
else if (bytesToSend <= 0)
{
Debug.LogError("ChannelBuffer:SendBytes cannot send zero bytes");
result = false;
}
else if (bytesToSend > _maxPacketSize)
{
if (_allowFragmentation)
{
result = SendFragmentBytes(bytes, bytesToSend);
}
else
{
Debug.LogError(
$"Failed to send big message of {bytesToSend} bytes. The maximum is {_maxPacketSize} bytes on channel:{_channelId}");
result = false;
}
}
else if (!_currentPacket.HasSpace(bytesToSend))
{
if (_isReliable)
{
if (_pendingPackets.Count == 0)
{
if (!_currentPacket.SendToTransport(_connection, _channelId))
{
QueuePacket();
}
_currentPacket.Write(bytes, bytesToSend);
result = true;
}
else if (_pendingPackets.Count >= _maxPendingPacketCount)
{
if (!_isBroken)
{
Debug.LogError($"ChannelBuffer buffer limit of {_pendingPackets.Count} packets reached.");
}
_isBroken = true;
result = false;
}
else
{
QueuePacket();
_currentPacket.Write(bytes, bytesToSend);
result = true;
}
}
else if (!_currentPacket.SendToTransport(_connection, _channelId))
{
Debug.Log($"ChannelBuffer SendBytes no space on unreliable channel {_channelId}");
result = false;
}
else
{
_currentPacket.Write(bytes, bytesToSend);
result = true;
}
}
else
{
_currentPacket.Write(bytes, bytesToSend);
result = MaxDelay != 0f || SendInternalBuffer();
}
return result;
}
private void QueuePacket()
{
_pendingPacketCount++;
_pendingPackets.Enqueue(_currentPacket);
_currentPacket = AllocPacket();
}
private QChannelPacket AllocPacket()
{
QChannelPacket result;
if (_freePackets.Count == 0)
{
result = new QChannelPacket(_maxPacketSize, _isReliable);
}
else
{
var channelPacket = _freePackets[_freePackets.Count - 1];
_freePackets.RemoveAt(_freePackets.Count - 1);
channelPacket.Reset();
result = channelPacket;
}
return result;
}
private static void FreePacket(QChannelPacket packet)
{
if (_freePackets.Count < 512)
{
_freePackets.Add(packet);
}
}
public bool SendInternalBuffer()
{
bool result;
if (_isReliable && _pendingPackets.Count > 0)
{
while (_pendingPackets.Count > 0)
{
var channelPacket = _pendingPackets.Dequeue();
if (!channelPacket.SendToTransport(_connection, _channelId))
{
_pendingPackets.Enqueue(channelPacket);
break;
}
_pendingPacketCount--;
FreePacket(channelPacket);
if (_isBroken && _pendingPackets.Count < _maxPendingPacketCount / 2)
{
Debug.LogWarning("ChannelBuffer recovered from overflow but data was lost.");
_isBroken = false;
}
}
result = true;
}
else
{
result = _currentPacket.SendToTransport(_connection, _channelId);
}
return result;
}
}
}