Files
LuaCsForBarotraumaEP/Barotrauma/BarotraumaShared/SharedSource/Networking/ChildServerRelay.cs
NotAlwaysTrue fff157d5ca Added a Performence Monitor for debug
Many multi-threading work in ServerSource
2025-12-19 13:43:12 +08:00

371 lines
13 KiB
C#

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
#if SERVER
using PipeType = System.IO.Pipes.AnonymousPipeClientStream;
#else
using PipeType = System.IO.Pipes.AnonymousPipeServerStream;
#endif
namespace Barotrauma.Networking
{
static partial class ChildServerRelay
{
private static PipeType writeStream;
private static PipeType readStream;
private enum WriteStatus : byte
{
Success = 0x00,
Heartbeat = 0x01,
RequestShutdown = 0xCC,
Crash = 0xFF
}
private static ManualResetEvent writeManualResetEvent;
private enum StatusEnum
{
NeverStarted,
Active,
RequestedShutDown,
ShutDown
}
private static volatile StatusEnum status = StatusEnum.NeverStarted;
public static bool HasShutDown => status is StatusEnum.ShutDown;
private const int ReadBufferSize = MsgConstants.MTU * 2;
private static byte[] readTempBytes;
private static int readIncOffset;
private static int readIncTotal;
private static ConcurrentQueue<byte[]> msgsToWrite;
private static ConcurrentQueue<string> errorsToWrite;
private static ConcurrentQueue<byte[]> msgsToRead;
private static Thread readThread;
private static Thread writeThread;
private static CancellationTokenSource readCancellationToken;
private static void PrivateStart()
{
status = StatusEnum.Active;
readIncOffset = 0;
readIncTotal = 0;
readTempBytes = new byte[ReadBufferSize];
msgsToWrite = new ConcurrentQueue<byte[]>();
errorsToWrite = new ConcurrentQueue<string>();
msgsToRead = new ConcurrentQueue<byte[]>();
readCancellationToken = new CancellationTokenSource();
writeManualResetEvent = new ManualResetEvent(false);
readThread = new Thread(UpdateRead)
{
Name = "ChildServerRelay.ReadThread",
IsBackground = true
};
writeThread = new Thread(UpdateWrite)
{
Name = "ChildServerRelay.WriteThread",
IsBackground = true
};
readThread.Start();
writeThread.Start();
}
private static void PrivateShutDown()
{
if (Thread.CurrentThread != GameMain.MainThread)
{
throw new InvalidOperationException(
$"Cannot call {nameof(ChildServerRelay)}.{nameof(PrivateShutDown)} from a thread other than the main one");
}
if (status is StatusEnum.NeverStarted) { return; }
status = StatusEnum.ShutDown;
writeManualResetEvent?.Set();
readCancellationToken?.Cancel();
readThread?.Join(); readThread = null;
writeThread?.Join(); writeThread = null;
readCancellationToken?.Dispose();
readCancellationToken = null;
readStream?.Dispose(); readStream = null;
writeStream?.Dispose(); writeStream = null;
msgsToRead?.Clear(); msgsToWrite?.Clear();
}
private static Option<int> ReadIncomingMsgs()
{
Task<int> readTask = readStream?.ReadAsync(readTempBytes, 0, readTempBytes.Length, readCancellationToken.Token);
if (readTask is null) { return Option<int>.None(); }
int timeOutMilliseconds = 150;
for (int i = 0; i < 150; i++)
{
if (status is StatusEnum.ShutDown)
{
readCancellationToken?.Cancel();
return Option<int>.None();
}
try
{
if (readTask.IsCompleted || readTask.Wait(timeOutMilliseconds, readCancellationToken.Token))
{
break;
}
}
catch (AggregateException aggregateException)
{
if (aggregateException.InnerException is OperationCanceledException) { return Option<int>.None(); }
CheckPipeConnected(nameof(readStream), readStream);
throw;
}
catch (OperationCanceledException)
{
return Option<int>.None();
}
}
if (readTask.Status == TaskStatus.RanToCompletion)
{
return Option<int>.Some(readTask.Result);
}
bool swallowException =
status is not StatusEnum.Active
&& readTask.Exception?.InnerException is ObjectDisposedException or System.IO.IOException;
if (swallowException)
{
readCancellationToken?.Cancel();
return Option<int>.None();
}
throw new Exception(
$"ChildServerRelay readTask did not run to completion: status was {readTask.Status}.",
readTask.Exception);
}
private static void CheckPipeConnected(string name, PipeType pipe)
{
if (status is StatusEnum.Active && pipe is not { IsConnected: true })
{
string exceptionMsg = $"{name} was disconnected unexpectedly.";
#if CLIENT
if (Process is { HasExited: true, ExitCode: var exitCode })
{
exceptionMsg += $" Child process exit code was {(uint)exitCode:X8}.";
}
else if (Process is { HasExited: false })
{
exceptionMsg += " Child process has not exited.";
}
#endif
#if !DEBUG
throw new Exception(exceptionMsg);
#endif
}
}
static partial void HandleCrashString(string str);
private static void UpdateRead()
{
Span<byte> msgLengthSpan = stackalloc byte[4 + 1];
while (!HasShutDown)
{
CheckPipeConnected(nameof(readStream), readStream);
bool readBytes(Span<byte> readTo)
{
for (int i = 0; i < readTo.Length; i++)
{
if (readIncOffset >= readIncTotal)
{
if (!ReadIncomingMsgs().TryUnwrap(out readIncTotal)) { return false; }
readIncOffset = 0;
if (readIncTotal == 0) { Thread.Yield(); continue; }
}
readTo[i] = readTempBytes[readIncOffset];
readIncOffset++;
}
return true;
}
if (!readBytes(msgLengthSpan)) { status = StatusEnum.ShutDown; break; }
int msgLength = msgLengthSpan[0]
| (msgLengthSpan[1] << 8)
| (msgLengthSpan[2] << 16)
| (msgLengthSpan[3] << 24);
WriteStatus writeStatus = (WriteStatus)msgLengthSpan[4];
byte[] msg = msgLength > 0 ? new byte[msgLength] : Array.Empty<byte>();
if (msg.Length > 0 && !readBytes(msg.AsSpan())) { status = StatusEnum.ShutDown; break; }
switch (writeStatus)
{
case WriteStatus.Success:
msgsToRead.Enqueue(msg);
break;
case WriteStatus.Heartbeat:
//do nothing
break;
case WriteStatus.RequestShutdown:
status = StatusEnum.ShutDown;
break;
case WriteStatus.Crash:
HandleCrashString(Encoding.UTF8.GetString(msg));
status = StatusEnum.ShutDown;
break;
}
Thread.Yield();
}
}
private static void UpdateWrite()
{
while (!HasShutDown)
{
CheckPipeConnected(nameof(writeStream), writeStream);
void writeMsg(WriteStatus writeStatus, byte[] msg)
{
// It's SUPER IMPORTANT that this stack allocation
// remains in this local function and is never inlined,
// because C# is stupid and only calls for deallocation
// when the function returns; placing it in the loop
// this method is based around would lead to a stack
// overflow real quick!
Span<byte> headerBytes = stackalloc byte[4 + 1];
headerBytes[0] = (byte)(msg.Length & 0xFF);
headerBytes[1] = (byte)((msg.Length >> 8) & 0xFF);
headerBytes[2] = (byte)((msg.Length >> 16) & 0xFF);
headerBytes[3] = (byte)((msg.Length >> 24) & 0xFF);
headerBytes[4] = (byte)writeStatus;
try
{
writeStream?.Write(headerBytes);
writeStream?.Write(msg);
}
catch (Exception exception)
{
switch (exception)
{
case ObjectDisposedException _:
case System.IO.IOException _:
if (!HasShutDown)
{
CheckPipeConnected(nameof(writeStream), writeStream);
throw;
}
break;
default:
throw;
};
}
}
if (status is StatusEnum.RequestedShutDown)
{
writeMsg(WriteStatus.RequestShutdown, Array.Empty<byte>());
status = StatusEnum.ShutDown;
}
while (errorsToWrite.TryDequeue(out var error))
{
writeMsg(WriteStatus.Crash, Encoding.UTF8.GetBytes(error));
status = StatusEnum.ShutDown;
}
while (msgsToWrite.TryDequeue(out var msg))
{
writeMsg(WriteStatus.Success, msg);
if (HasShutDown) { break; }
}
if (!HasShutDown)
{
writeManualResetEvent.Reset();
if (!writeManualResetEvent.WaitOne(1000))
{
if (HasShutDown) { return; }
//heartbeat to keep the other end alive
writeMsg(WriteStatus.Heartbeat, Array.Empty<byte>());
}
}
}
}
public static void Write(byte[] msg)
{
if (HasShutDown) { return; }
if (msg.Length > 0x1fff_ffff)
{
//This message is extremely long and is close to breaking
//ChildServerRelay, so let's not allow this to go through!
return;
}
msgsToWrite.Enqueue(msg);
writeManualResetEvent.Set();
}
private static readonly Stopwatch stopwatch = new Stopwatch();
private const int MaxMilliseconds = 8;
public static IEnumerable<byte[]> Read()
{
stopwatch.Restart();
// To avoid the stopwatch somehow experiencing magical overhead that makes it not even
// start the loop within 8ms, use this bool to force at least one iteration.
bool hasIteratedAtLeastOnce = false;
// If it's taken more than 8 milliseconds to read dequeued messages, take
// a break from reading and allow all of the other logic to run for a tick.
// Otherwise the server may overwhelm the host client with redundant messages
// that are being read too slowly.
while (!hasIteratedAtLeastOnce || stopwatch.ElapsedMilliseconds < MaxMilliseconds)
{
hasIteratedAtLeastOnce = true;
if (!ReadSingleMessage(out var msg))
{
// No more messages available to dequeue, we don't need
// to reach 8 milliseconds to know we're done here
break;
}
yield return msg;
}
stopwatch.Stop();
}
private static bool ReadSingleMessage(out byte[] msg)
{
if (HasShutDown) { msg = null; return false; }
return msgsToRead.TryDequeue(out msg);
}
}
}