371 lines
8.8 KiB
C#
Raw Normal View History

2020-12-07 21:19:16 +00:00
using QuantumUNET.Messages;
using System;
2020-12-02 18:40:38 +00:00
using System.Collections.Generic;
using UnityEngine;
namespace QuantumUNET.Transport
2020-12-02 18:40:38 +00:00
{
2020-12-23 12:58:45 +00:00
internal class QChannelBuffer : IDisposable
2020-12-02 18:40:38 +00:00
{
2020-12-03 11:56:32 +00:00
// ---- Statistics -------------------------------------------------------------
// Counters exposed for diagnostics. In this file only NumMsgsOut (Send) and the
// NumBufferedPerSecond / LastBufferedPerSecond pair (CheckInternalBuffer) are written.
public int NumMsgsOut { get; private set; }
public int NumBufferedMsgsOut { get; private set; }
public int NumBytesOut { get; private set; }
public int NumMsgsIn { get; private set; }
public int NumBytesIn { get; private set; }
public int NumBufferedPerSecond { get; private set; }
// Snapshot of NumBufferedPerSecond taken by CheckInternalBuffer roughly once per second.
public int LastBufferedPerSecond { get; private set; }
// Same value as the initial per-channel pending-packet limit set in the constructor.
public const int MaxPendingPacketCount = 16;
// Same value as the exclusive upper bound SetOption accepts for MaxPendingBuffers.
public const int MaxBufferedPackets = 512;
// Maximum time the current packet may sit unflushed; compared against differences of
// Time.realtimeSinceStartup in CheckInternalBuffer. When zero, SendBytes flushes
// right after writing.
public float MaxDelay = 0.01f;
2020-12-23 12:58:45 +00:00
// Connection handed to QChannelPacket.SendToTransport for every send.
private readonly QNetworkConnection _connection;
// Packet that SendBytes is currently writing into.
private QChannelPacket _currentPacket;
2020-12-03 11:56:32 +00:00
// Time.realtimeSinceStartup value recorded after the last timed flush.
private float _lastFlushTime;
2020-12-18 20:32:16 +00:00
// Channel id handed to QChannelPacket.SendToTransport and used in log messages.
private readonly byte _channelId;
2020-12-03 11:56:32 +00:00
// Largest payload accepted without fragmentation; set in the constructor and
// optionally lowered through SetOption.
private int _maxPacketSize;
2020-12-18 20:32:16 +00:00
// Reliable channels queue packets that could not be sent; unreliable ones do not.
private readonly bool _isReliable;
2020-12-03 11:56:32 +00:00
// Whether payloads larger than _maxPacketSize are split by SendFragmentBytes.
private bool _allowFragmentation;
// Set by SendBytes when the pending queue hits its limit; cleared by
// SendInternalBuffer once the queue drains below half of that limit.
private bool _isBroken;
// Current limit on _pendingPackets.Count before SendBytes starts rejecting data.
private int _maxPendingPacketCount;
// Same value as the cap applied to the shared free-packet pool.
private const int _maxFreePacketCount = 512;
2020-12-23 12:58:45 +00:00
// Packets waiting to be sent. Only allocated for reliable channels (null otherwise).
private readonly Queue<QChannelPacket> _pendingPackets;
// Static pool of reusable packets shared by every QChannelBuffer instance; created
// by the first reliable channel that is constructed.
private static List<QChannelPacket> _freePackets;
2020-12-03 11:56:32 +00:00
// Static count of packets currently queued across all QChannelBuffer instances.
internal static int _pendingPacketCount;
// Time.realtimeSinceStartup value at the start of the current statistics window.
private float _lastBufferedMessageCountTimer = Time.realtimeSinceStartup;
2021-11-20 19:55:54 +00:00
// Static scratch writers shared by all instances: _sendWriter is used by Send,
// _fragmentWriter by SendFragmentBytes.
private static readonly QNetworkWriter _sendWriter = new();
private static readonly QNetworkWriter _fragmentWriter = new();
2020-12-03 11:56:32 +00:00
// Same value as the number of bytes the constructor subtracts from bufferSize.
private const int _packetHeaderReserveSize = 100;
// Set once Dispose(bool) has run.
private bool _disposed;
2021-11-20 19:55:54 +00:00
// Accumulates fragment payloads received through HandleFragment.
internal QNetBuffer _fragmentBuffer = new();
// True while HandleFragment is in the middle of collecting a fragmented message.
private bool _readingFragment;
2020-12-03 11:56:32 +00:00
2020-12-23 12:58:45 +00:00
/// <summary>
/// Creates the outgoing buffer for a single channel of a connection.
/// </summary>
/// <param name="conn">Connection that packets from this channel are sent through.</param>
/// <param name="bufferSize">
/// Total buffer size; <c>_packetHeaderReserveSize</c> bytes are held back and the
/// remainder becomes the maximum packet payload.
/// </param>
/// <param name="cid">Channel id passed to the transport on every send.</param>
/// <param name="isReliable">Whether unsent packets are queued for retry.</param>
/// <param name="isSequenced">
/// Combined with <paramref name="isReliable"/> to decide whether fragmentation is
/// enabled by default.
/// </param>
public QChannelBuffer(QNetworkConnection conn, int bufferSize, byte cid, bool isReliable, bool isSequenced)
{
    _connection = conn;
    _channelId = cid;
    _isReliable = isReliable;

    // Use the declared constants rather than repeating their literal values.
    _maxPacketSize = bufferSize - _packetHeaderReserveSize;
    _maxPendingPacketCount = MaxPendingPacketCount;

    _currentPacket = new QChannelPacket(_maxPacketSize, isReliable);

    // Fragmentation is only enabled by default on reliable, sequenced channels.
    _allowFragmentation = isReliable && isSequenced;

    if (isReliable)
    {
        _pendingPackets = new Queue<QChannelPacket>();

        // The free-packet pool is static and shared; create it on first use only.
        _freePackets ??= new List<QChannelPacket>();
    }
}
/// <summary>
/// Releases this channel buffer, returning any queued packets to the shared pool.
/// </summary>
public void Dispose()
{
    Dispose(true);
    GC.SuppressFinalize(this);
}

/// <summary>
/// Drains the pending queue into the shared free-packet pool the first time it is
/// called with <paramref name="disposing"/> set, then marks the buffer as disposed.
/// </summary>
/// <param name="disposing">True when called from <see cref="Dispose()"/>.</param>
protected virtual void Dispose(bool disposing)
{
    if (_disposed)
    {
        return;
    }

    _disposed = true;

    // _pendingPackets is only allocated for reliable channels.
    if (!disposing || _pendingPackets == null)
    {
        return;
    }

    while (_pendingPackets.Count > 0)
    {
        // Keep the static cross-channel counter in step with the queue.
        _pendingPacketCount--;

        // FreePacket applies the pool-size cap; the queue is empty once this loop
        // finishes, so no additional Clear() is needed.
        FreePacket(_pendingPackets.Dequeue());
    }
}
2021-12-02 19:58:50 -08:00
/// <summary>
/// Changes one of the channel's tunable options.
/// </summary>
/// <param name="option">Option to change.</param>
/// <param name="value">
/// New value. For <c>AllowFragmentation</c> any non-zero value enables fragmentation.
/// </param>
/// <returns>
/// True when the option was applied; false for an unknown option, an invalid value,
/// <c>MaxPendingBuffers</c> on an unreliable channel, or <c>MaxPacketSize</c> after
/// data has already been buffered.
/// </returns>
public bool SetOption(QChannelOption option, int value)
{
    if (option == QChannelOption.MaxPendingBuffers)
    {
        // Only reliable channels keep a pending queue.
        if (!_isReliable)
        {
            return false;
        }

        if (value is < 0 or >= MaxBufferedPackets)
        {
            // Message matches the check: zero is accepted, MaxBufferedPackets is not.
            Debug.LogError(
                $"Invalid MaxPendingBuffers for channel {_channelId}. Must be zero or greater and less than {MaxBufferedPackets}");
            return false;
        }

        _maxPendingPacketCount = value;
        return true;
    }

    if (option == QChannelOption.AllowFragmentation)
    {
        _allowFragmentation = value != 0;
        return true;
    }

    if (option != QChannelOption.MaxPacketSize)
    {
        return false;
    }

    // _pendingPackets is null on unreliable channels, so use a null-safe check
    // instead of dereferencing it directly.
    if (!_currentPacket.IsEmpty() || _pendingPackets?.Count > 0)
    {
        Debug.LogError("Cannot set MaxPacketSize after sending data.");
        return false;
    }

    if (value <= 0)
    {
        Debug.LogError("Cannot set MaxPacketSize less than one.");
        return false;
    }

    if (value > _maxPacketSize)
    {
        Debug.LogError(
            $"Cannot set MaxPacketSize to greater than the existing maximum ({_maxPacketSize}).");
        return false;
    }

    // The current packet is known to be empty here, so it can be replaced outright.
    _currentPacket = new QChannelPacket(value, _isReliable);
    _maxPacketSize = value;
    return true;
}
/// <summary>
/// Periodic housekeeping: flushes the current packet once it has waited longer than
/// <see cref="MaxDelay"/>, and rolls the buffered-per-second statistic over once more
/// than one second has elapsed since the last rollover.
/// </summary>
public void CheckInternalBuffer()
{
    var flushOverdue = Time.realtimeSinceStartup - _lastFlushTime > MaxDelay;
    if (flushOverdue && !_currentPacket.IsEmpty())
    {
        SendInternalBuffer();
        _lastFlushTime = Time.realtimeSinceStartup;
    }

    var statsWindowElapsed = Time.realtimeSinceStartup - _lastBufferedMessageCountTimer > 1f;
    if (!statsWindowElapsed)
    {
        return;
    }

    // Publish the finished window and start counting a new one.
    LastBufferedPerSecond = NumBufferedPerSecond;
    NumBufferedPerSecond = 0;
    _lastBufferedMessageCountTimer = Time.realtimeSinceStartup;
}
2020-12-23 12:58:45 +00:00
/// <summary>
/// Sends the content of <paramref name="writer"/> through <see cref="SendBytes"/>.
/// </summary>
/// <param name="writer">Writer holding the already serialized message.</param>
/// <returns>The result of <see cref="SendBytes"/>.</returns>
public bool SendWriter(QNetworkWriter writer)
{
    // NOTE(review): only the segment's array and count are forwarded, so this
    // presumably relies on the segment starting at offset 0 - confirm.
    var segment = writer.AsArraySegment();
    var payload = segment.Array;
    var length = segment.Count;
    return SendBytes(payload, length);
}
2020-12-23 12:58:45 +00:00
/// <summary>
/// Serializes <paramref name="msg"/> as a message of type <paramref name="msgType"/>
/// into the shared send writer and hands it to <see cref="SendWriter"/>.
/// </summary>
/// <param name="msgType">Message type id written at the start of the message.</param>
/// <param name="msg">Message to serialize.</param>
/// <returns>The result of <see cref="SendWriter"/>.</returns>
public bool Send(short msgType, QMessageBase msg)
{
    // _sendWriter is static, i.e. shared by every channel buffer.
    var writer = _sendWriter;
    writer.StartMessage(msgType);
    msg.Serialize(writer);
    writer.FinishMessage();

    NumMsgsOut += 1;
    return SendWriter(writer);
}
2020-12-23 12:58:45 +00:00
/// <summary>
/// Processes one incoming fragment message. A leading byte of 0 marks a data
/// fragment whose payload is appended to <c>_fragmentBuffer</c>; any other value
/// marks the end of the fragmented message.
/// </summary>
/// <param name="reader">Reader positioned at the fragment's leading flag byte.</param>
/// <returns>True when the end marker was read, false after a data fragment.</returns>
internal bool HandleFragment(QNetworkReader reader)
{
    var isEndMarker = reader.ReadByte() != 0;
    if (isEndMarker)
    {
        _readingFragment = false;
        return true;
    }

    // First data fragment of a new message: rewind the reassembly buffer.
    if (!_readingFragment)
    {
        _fragmentBuffer.SeekZero();
        _readingFragment = true;
    }

    var chunk = reader.ReadBytesAndSize();
    _fragmentBuffer.WriteBytes(chunk, (ushort)chunk.Length);
    return false;
}
/// <summary>
/// Splits a payload that exceeds the packet size into fragment messages
/// (message type 17), each carrying a flag byte of 0 followed by a chunk of the
/// payload, and finishes with a fragment message whose flag byte is 1.
/// </summary>
/// <param name="bytes">Buffer holding the payload, starting at index 0.</param>
/// <param name="bytesToSend">Number of bytes of <paramref name="bytes"/> to send.</param>
/// <returns>
/// True when every fragment and the end marker were accepted by
/// <see cref="SendWriter"/>; false when any of them was rejected or when the packet
/// size leaves no room for fragment payload.
/// </returns>
internal bool SendFragmentBytes(byte[] bytes, int bytesToSend)
{
    // 32 bytes of each packet are kept free for the fragment message's own overhead.
    var fragmentCapacity = _maxPacketSize - 32;
    if (fragmentCapacity <= 0)
    {
        // Without this guard a capacity of zero loops forever and a negative one
        // throws when allocating the chunk array.
        Debug.LogError(
            $"Cannot fragment message of {bytesToSend} bytes: maximum packet size {_maxPacketSize} on channel:{_channelId} leaves no room for fragment data");
        return false;
    }

    var allSent = true;
    var offset = 0;
    while (bytesToSend > 0)
    {
        var chunkSize = Math.Min(bytesToSend, fragmentCapacity);
        var chunk = new byte[chunkSize];
        Array.Copy(bytes, offset, chunk, 0, chunkSize);

        _fragmentWriter.StartMessage(17);
        // HandleFragment reads this flag with ReadByte(), so it must be written as
        // exactly one byte. The explicit cast keeps the literal from binding to a
        // wider integer overload, should the writer declare one.
        _fragmentWriter.Write((byte)0);
        _fragmentWriter.WriteBytesFull(chunk);
        _fragmentWriter.FinishMessage();
        allSent &= SendWriter(_fragmentWriter);

        offset += chunkSize;
        bytesToSend -= chunkSize;
    }

    // End marker: flag byte 1 and no payload.
    _fragmentWriter.StartMessage(17);
    _fragmentWriter.Write((byte)1);
    _fragmentWriter.FinishMessage();
    allSent &= SendWriter(_fragmentWriter);

    return allSent;
}
/// <summary>
/// Buffers <paramref name="bytesToSend"/> bytes for sending on this channel.
/// Oversized payloads are fragmented when allowed; when the current packet is full
/// it is sent (or, on reliable channels, queued) before the new data is written.
/// </summary>
/// <param name="bytes">Buffer holding the data to send.</param>
/// <param name="bytesToSend">Number of bytes to send; must be between 1 and 65534.</param>
/// <returns>
/// True when the data was buffered or sent; false when the size is invalid, the
/// payload is too large and fragmentation is off, the pending queue is full, or an
/// unreliable send failed.
/// </returns>
internal bool SendBytes(byte[] bytes, int bytesToSend)
{
    if (bytesToSend >= ushort.MaxValue)
    {
        Debug.LogError($"ChannelBuffer:SendBytes cannot send packet larger than {ushort.MaxValue} bytes");
        return false;
    }

    if (bytesToSend <= 0)
    {
        Debug.LogError("ChannelBuffer:SendBytes cannot send zero bytes");
        return false;
    }

    if (bytesToSend > _maxPacketSize)
    {
        if (_allowFragmentation)
        {
            return SendFragmentBytes(bytes, bytesToSend);
        }

        Debug.LogError(
            $"Failed to send big message of {bytesToSend} bytes. The maximum is {_maxPacketSize} bytes on channel:{_channelId}");
        return false;
    }

    // Common case: the data fits into the packet being filled.
    if (_currentPacket.HasSpace(bytesToSend))
    {
        _currentPacket.Write(bytes, bytesToSend);

        // With a non-zero MaxDelay the flush is left to CheckInternalBuffer.
        return MaxDelay != 0f || SendInternalBuffer();
    }

    // From here on the current packet is full and has to make room first.
    if (!_isReliable)
    {
        if (!_currentPacket.SendToTransport(_connection, _channelId))
        {
            Debug.Log($"ChannelBuffer SendBytes no space on unreliable channel {_channelId}");
            return false;
        }

        _currentPacket.Write(bytes, bytesToSend);
        return true;
    }

    if (_pendingPackets.Count == 0)
    {
        // Nothing queued yet: try to send directly, queue only if that fails.
        if (!_currentPacket.SendToTransport(_connection, _channelId))
        {
            QueuePacket();
        }

        _currentPacket.Write(bytes, bytesToSend);
        return true;
    }

    if (_pendingPackets.Count >= _maxPendingPacketCount)
    {
        // Log the overflow once per broken period, not on every rejected send.
        if (!_isBroken)
        {
            Debug.LogError($"ChannelBuffer buffer limit of {_pendingPackets.Count} packets reached.");
        }

        _isBroken = true;
        return false;
    }

    // Packets are already waiting, so this one goes to the back of the queue.
    QueuePacket();
    _currentPacket.Write(bytes, bytesToSend);
    return true;
}
/// <summary>
/// Moves the current packet onto the pending queue and replaces it with a packet
/// obtained from <see cref="AllocPacket"/>.
/// </summary>
private void QueuePacket()
{
    var filledPacket = _currentPacket;
    _pendingPackets.Enqueue(filledPacket);

    // Static counter covering every channel buffer, not just this one.
    _pendingPacketCount += 1;

    _currentPacket = AllocPacket();
}
2020-12-23 12:58:45 +00:00
/// <summary>
/// Returns a packet to write into: the most recently pooled packet after a
/// <c>Reset()</c>, or a newly constructed one when the shared pool is empty.
/// </summary>
/// <returns>A packet taken from the pool or freshly created.</returns>
private QChannelPacket AllocPacket()
{
    var pooledCount = _freePackets.Count;
    if (pooledCount == 0)
    {
        return new QChannelPacket(_maxPacketSize, _isReliable);
    }

    // Take from the end of the list so removal does not shift other entries.
    // NOTE(review): the pool is static, so a recycled packet may have been created
    // by a channel with a different packet size - confirm Reset() accounts for that.
    var lastIndex = pooledCount - 1;
    var recycled = _freePackets[lastIndex];
    _freePackets.RemoveAt(lastIndex);
    recycled.Reset();
    return recycled;
}
2020-12-23 12:58:45 +00:00
/// <summary>
/// Returns <paramref name="packet"/> to the shared free-packet pool, unless the pool
/// already holds <c>_maxFreePacketCount</c> packets, in which case it is dropped.
/// </summary>
/// <param name="packet">Packet that is no longer in use.</param>
private static void FreePacket(QChannelPacket packet)
{
    // Use the declared cap instead of repeating its literal value.
    if (_freePackets.Count < _maxFreePacketCount)
    {
        _freePackets.Add(packet);
    }
}
/// <summary>
/// Flushes buffered data to the transport. On a reliable channel with queued
/// packets, the queue is sent in order until it is empty or a send fails; otherwise
/// the current packet is sent directly.
/// </summary>
/// <returns>
/// True whenever the pending queue was processed (even if it could not be fully
/// drained); otherwise the result of sending the current packet.
/// </returns>
public bool SendInternalBuffer()
{
    if (!_isReliable || _pendingPackets.Count == 0)
    {
        return _currentPacket.SendToTransport(_connection, _channelId);
    }

    while (_pendingPackets.Count > 0)
    {
        // Peek rather than dequeue: a packet that fails to send must stay at the
        // head of the queue. Dequeuing and re-enqueuing it would move it behind
        // later packets and change the send order.
        var packet = _pendingPackets.Peek();
        if (!packet.SendToTransport(_connection, _channelId))
        {
            break;
        }

        _pendingPackets.Dequeue();
        _pendingPacketCount--;
        FreePacket(packet);

        // Leave the overflow state once the backlog is under half of the limit.
        if (_isBroken && _pendingPackets.Count < _maxPendingPacketCount / 2)
        {
            Debug.LogWarning("ChannelBuffer recovered from overflow but data was lost.");
            _isBroken = false;
        }
    }

    return true;
}
}
2020-12-03 08:28:05 +00:00
}