Refactor server event processing and entity updates

This commit is contained in:
Eero
2026-05-02 00:28:12 +08:00
parent 8b6da6b033
commit 9d6cb5225a
6 changed files with 173 additions and 130 deletions

View File

@@ -1133,6 +1133,9 @@ namespace Barotrauma.Networking
Log(ClientLogName(c) + " has reported an error: " + errorStr, ServerLog.MessageType.Error); Log(ClientLogName(c) + " has reported an error: " + errorStr, ServerLog.MessageType.Error);
GameAnalyticsManager.AddErrorEventOnce("GameServer.HandleClientError:" + errorStrNoName, GameAnalyticsManager.ErrorSeverity.Error, errorStr); GameAnalyticsManager.AddErrorEventOnce("GameServer.HandleClientError:" + errorStrNoName, GameAnalyticsManager.ErrorSeverity.Error, errorStr);
Log(
$"Entity event state at client error: pending={EntityEventManager.PendingCreateEventCount}, queued={EntityEventManager.EventCount}, unique={EntityEventManager.UniqueEventCount}, buffered={EntityEventManager.BufferedEventCount}, lastCreated={EntityEventManager.LastCreatedEventID}",
ServerLog.MessageType.Error);
try try
{ {

View File

@@ -5,12 +5,8 @@ using System.Collections.Concurrent;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq; using System.Linq;
using System.Threading; using System.Threading;
using System.Threading.Tasks;
using static Barotrauma.EosInterface.Ownership;
// DO NOT TOUCH ANYTHING HERE
// OR EVERYTHING WILL FAIL
namespace Barotrauma.Networking namespace Barotrauma.Networking
{ {
class ServerEntityEvent : NetEntityEvent class ServerEntityEvent : NetEntityEvent
@@ -66,14 +62,42 @@ namespace Barotrauma.Networking
public List<ServerEntityEvent> Events public List<ServerEntityEvent> Events
{ {
get { return events; } get
{
FlushPendingCreates();
return events;
}
} }
public List<ServerEntityEvent> UniqueEvents public List<ServerEntityEvent> UniqueEvents
{ {
get { return uniqueEvents; } get
{
FlushPendingCreates();
return uniqueEvents;
}
} }
public int PendingCreateEventCount => pendingCreateQueue.Count;
public int EventCount
{
get
{
FlushPendingCreates();
return events.Count;
}
}
public int UniqueEventCount
{
get
{
FlushPendingCreates();
return uniqueEvents.Count;
}
}
public int BufferedEventCount => bufferedEvents.Count;
public UInt16 LastCreatedEventID => ID;
private class BufferedEvent private class BufferedEvent
{ {
public readonly Client Sender; public readonly Client Sender;
@@ -124,11 +148,6 @@ namespace Barotrauma.Networking
private readonly ConcurrentQueue<PendingCreateEvent> pendingCreateQueue; private readonly ConcurrentQueue<PendingCreateEvent> pendingCreateQueue;
private readonly Task createEventTask;
private readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
private readonly SemaphoreSlim eventSignal = new SemaphoreSlim(0);
public ServerEntityEventManager(GameServer server) public ServerEntityEventManager(GameServer server)
{ {
events = new List<ServerEntityEvent>(); events = new List<ServerEntityEvent>();
@@ -138,33 +157,24 @@ namespace Barotrauma.Networking
pendingCreateQueue = new ConcurrentQueue<PendingCreateEvent>(); pendingCreateQueue = new ConcurrentQueue<PendingCreateEvent>();
lastWarningTime = -10.0; lastWarningTime = -10.0;
SEM = this; SEM = this;
createEventTask = Task.Run(() => CreateEventProcessorLoop(cancellationTokenSource.Token));
} }
private async Task CreateEventProcessorLoop(CancellationToken token) public void FlushPendingCreates()
{ {
while (!token.IsCancellationRequested) if (GameMain.MainThread != null && Thread.CurrentThread != GameMain.MainThread)
{ {
try throw new InvalidOperationException($"{nameof(ServerEntityEventManager)} pending events must be flushed on the main thread.");
{
await eventSignal.WaitAsync(100, token);
ProcessPendingCreateEvents();
}
catch (OperationCanceledException)
{
break;
}
} }
ProcessPendingCreateEvents();
} }
private void ProcessPendingCreateEvents() private void ProcessPendingCreateEvents()
{ {
// Dequeue and process all pending events currently in the queue. // CreateEntityEvent can be called from parallel update code. The queue keeps
// Use a lock to synchronize modifications to shared lists / ID. // that enqueue path safe, while this method is only called from the main tick
// before reading or writing entity-event state.
while (pendingCreateQueue.TryDequeue(out PendingCreateEvent pending)) while (pendingCreateQueue.TryDequeue(out PendingCreateEvent pending))
{ {
// The original CreateEvent logic (mostly unchanged) but executed under a lock
if (pending == null || pending.Entity == null) { continue; } if (pending == null || pending.Entity == null) { continue; }
var entity = pending.Entity; var entity = pending.Entity;
@@ -216,34 +226,18 @@ namespace Barotrauma.Networking
{ {
if (!ValidateEntity(entity)) { return; } if (!ValidateEntity(entity)) { return; }
// enqueue and let background task handle the rest
pendingCreateQueue.Enqueue(new PendingCreateEvent(entity, extraData)); pendingCreateQueue.Enqueue(new PendingCreateEvent(entity, extraData));
if (eventSignal.CurrentCount == 0)
{
eventSignal.Release();
}
} }
public void Dispose() public void Dispose()
{ {
cancellationTokenSource.Cancel(); ClearPendingCreates();
eventSignal.Release();
try
{
createEventTask?.Wait(2000);
}
catch (AggregateException) { }
finally
{
cancellationTokenSource.Dispose();
eventSignal.Dispose();
}
} }
// Due to intensive access demend and time it takes to refactor, we use try-catch when facing thread-safety issue to skip to next update :(
public void Update(List<Client> clients) public void Update(List<Client> clients)
{ {
FlushPendingCreates();
foreach (BufferedEvent bufferedEvent in bufferedEvents) foreach (BufferedEvent bufferedEvent in bufferedEvents)
{ {
if (bufferedEvent.Character == null || bufferedEvent.Character.IsDead) if (bufferedEvent.Character == null || bufferedEvent.Character.IsDead)
@@ -329,14 +323,7 @@ namespace Barotrauma.Networking
} }
} }
try lastSentToAnyoneTime = events.Find(e => e.ID == lastSentToAnyone)?.CreateTime ?? Timing.TotalTime;
{
lastSentToAnyoneTime = events.ToList().Find(e => e.ID == lastSentToAnyone)?.CreateTime ?? Timing.TotalTime;
}
catch
{
lastSentToAnyoneTime = Timing.TotalTime;
}
if (Timing.TotalTime - lastWarningTime > 5.0 && if (Timing.TotalTime - lastWarningTime > 5.0 &&
@@ -353,15 +340,7 @@ namespace Barotrauma.Networking
clients.Where(c => c.NeedsMidRoundSync).ForEach(c => { if (NetIdUtils.IdMoreRecent(lastSentToAll, c.FirstNewEventID)) lastSentToAll = (ushort)(c.FirstNewEventID - 1); }); clients.Where(c => c.NeedsMidRoundSync).ForEach(c => { if (NetIdUtils.IdMoreRecent(lastSentToAll, c.FirstNewEventID)) lastSentToAll = (ushort)(c.FirstNewEventID - 1); });
ServerEntityEvent firstEventToResend; ServerEntityEvent firstEventToResend = events.Find(e => e.ID == (ushort)(lastSentToAll + 1));
try
{
firstEventToResend = events.Find(e => e.ID == (ushort)(lastSentToAll + 1));
}
catch
{
firstEventToResend = null;
}
if (firstEventToResend != null && if (firstEventToResend != null &&
GameMain.GameSession.RoundDuration > server.ServerSettings.RoundStartSyncDuration && GameMain.GameSession.RoundDuration > server.ServerSettings.RoundStartSyncDuration &&
@@ -450,6 +429,8 @@ namespace Barotrauma.Networking
/// </summary> /// </summary>
public void Write(in SegmentTableWriter<ServerNetSegment> segmentTable, Client client, IWriteMessage msg, out List<NetEntityEvent> sentEvents) public void Write(in SegmentTableWriter<ServerNetSegment> segmentTable, Client client, IWriteMessage msg, out List<NetEntityEvent> sentEvents)
{ {
FlushPendingCreates();
List<NetEntityEvent> eventsToSync = GetEventsToSync(client); List<NetEntityEvent> eventsToSync = GetEventsToSync(client);
if (eventsToSync.Count == 0) if (eventsToSync.Count == 0)
@@ -576,6 +557,8 @@ namespace Barotrauma.Networking
public void InitClientMidRoundSync(Client client) public void InitClientMidRoundSync(Client client)
{ {
FlushPendingCreates();
//no need for midround syncing if no events have been created, //no need for midround syncing if no events have been created,
//or if the first created unique event is still in the event list //or if the first created unique event is still in the event list
if (uniqueEvents.Count == 0 || (events.Count > 0 && events[0].ID == uniqueEvents[0].ID)) if (uniqueEvents.Count == 0 || (events.Count > 0 && events[0].ID == uniqueEvents[0].ID))
@@ -693,6 +676,8 @@ namespace Barotrauma.Networking
public void Clear() public void Clear()
{ {
ClearPendingCreates();
ID = 0; ID = 0;
events.Clear(); events.Clear();
@@ -709,5 +694,10 @@ namespace Barotrauma.Networking
c.LastSentEntityEventID = 0; c.LastSentEntityEventID = 0;
} }
} }
private void ClearPendingCreates()
{
while (pendingCreateQueue.TryDequeue(out _)) { }
}
} }
} }

View File

@@ -2,11 +2,7 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Diagnostics; using System.Diagnostics;
using System.IO;
using System.Linq; using System.Linq;
using System.Security.Cryptography;
using System.Text;
using System.Threading.Tasks;
namespace Barotrauma namespace Barotrauma
@@ -39,7 +35,27 @@ namespace Barotrauma
} }
public int ConnectClients public int ConnectClients
{ {
get { return GameMain.Server.ConnectedClients.Count; } get { return GameMain.Server?.ConnectedClients.Count ?? 0; }
}
public int PendingEntityEvents
{
get { return GameMain.Server?.EntityEventManager?.PendingCreateEventCount ?? 0; }
}
public int EntityEvents
{
get { return GameMain.Server?.EntityEventManager?.EventCount ?? 0; }
}
public int UniqueEntityEvents
{
get { return GameMain.Server?.EntityEventManager?.UniqueEventCount ?? 0; }
}
public int BufferedEntityEvents
{
get { return GameMain.Server?.EntityEventManager?.BufferedEventCount ?? 0; }
} }
public double RealTickRate public double RealTickRate
@@ -166,6 +182,10 @@ namespace Barotrauma
$"Character Count: {CharacterCount}\n" + $"Character Count: {CharacterCount}\n" +
$"Clients Count {ConnectClients}\n " + $"Clients Count {ConnectClients}\n " +
$"PhysicsBody Count: {PhysicsBodyCount}\n" + $"PhysicsBody Count: {PhysicsBodyCount}\n" +
$"Entity Events: {EntityEvents}\n" +
$"Unique Entity Events: {UniqueEntityEvents}\n" +
$"Pending Entity Events: {PendingEntityEvents}\n" +
$"Buffered Entity Events: {BufferedEntityEvents}\n" +
$"Tick Rate: {RealTickRate}\n" + $"Tick Rate: {RealTickRate}\n" +
$"Min Tick Rate: {TickRateLow}\n" + $"Min Tick Rate: {TickRateLow}\n" +
$"Max Tick Rate: {TickRateHigh}\n" + $"Max Tick Rate: {TickRateHigh}\n" +

View File

@@ -8,7 +8,6 @@ using System.Linq;
using System.Reflection; using System.Reflection;
using System.Threading.Tasks; using System.Threading.Tasks;
using System.Xml.Linq; using System.Xml.Linq;
using static OneOf.Types.TrueFalseOrNull;
namespace Barotrauma namespace Barotrauma
{ {
@@ -643,7 +642,7 @@ namespace Barotrauma
/// </summary> /// </summary>
public static void UpdateAll(float deltaTime, Camera cam, ParallelOptions parallelOptions) public static void UpdateAll(float deltaTime, Camera cam, ParallelOptions parallelOptions)
{ {
Random rand = new Random(); mapEntityUpdateTick++;
#if CLIENT #if CLIENT
var sw = new System.Diagnostics.Stopwatch(); var sw = new System.Diagnostics.Stopwatch();
sw.Start(); sw.Start();
@@ -665,46 +664,50 @@ namespace Barotrauma
while (n > 1) while (n > 1)
{ {
n--; n--;
int k = rand.Next(n + 1); int k = Rand.Int(n + 1);
(gapList[n], gapList[k]) = (gapList[k], gapList[n]); (gapList[n], gapList[k]) = (gapList[k], gapList[n]);
} }
var itemList = Item.ItemList.ToList(); var itemList = Item.ItemList.ToList();
// First phase: parallel updates that have no order dependencies int mapEntityUpdateInterval = Math.Max(MapEntityUpdateInterval, 1);
Parallel.Invoke(parallelOptions, int poweredUpdateInterval = Math.Max(PoweredUpdateInterval, 1);
() =>
{ if (mapEntityUpdateTick % mapEntityUpdateInterval == 0)
Parallel.ForEach(hullList, parallelOptions, hull => {
float mapEntityDeltaTime = deltaTime * mapEntityUpdateInterval;
Parallel.Invoke(parallelOptions,
() =>
{ {
hull.Update(deltaTime, cam); Parallel.ForEach(hullList, parallelOptions, hull =>
}); {
hull.Update(mapEntityDeltaTime, cam);
}, });
// Structure parallel update },
() => () =>
{
Parallel.ForEach(structureList, parallelOptions, structure =>
{ {
structure.Update(deltaTime, cam); Parallel.ForEach(structureList, parallelOptions, structure =>
{
structure.Update(mapEntityDeltaTime, cam);
});
}); });
}, }
() =>
// moved waterflow reset here to see if we can reduce at least some time foreach (Gap gap in gapList)
{ {
// PLEASE WORK gap.ResetWaterFlowThisFrame();
Parallel.ForEach(gapList, parallelOptions, gap => }
{
gap.ResetWaterFlowThisFrame(); foreach (Gap gap in gapList)
gap.Update(deltaTime, cam); {
}); gap.Update(deltaTime, cam);
}, }
// Powered components update
() => if (mapEntityUpdateTick % poweredUpdateInterval == 0)
{ {
Powered.UpdatePower(deltaTime); Powered.UpdatePower(deltaTime * poweredUpdateInterval);
} }
);
#if CLIENT #if CLIENT
// Hull Cheats need to be executed after Hull update // Hull Cheats need to be executed after Hull update
@@ -720,27 +723,42 @@ namespace Barotrauma
// Item update (Item.Update() is not thread-safe and must be executed on the main thread) // Item update (Item.Update() is not thread-safe and must be executed on the main thread)
Item.UpdatePendingConditionUpdates(deltaTime); Item.UpdatePendingConditionUpdates(deltaTime);
Item lastUpdatedItem = null; if (mapEntityUpdateTick % mapEntityUpdateInterval == 0)
try
{ {
foreach (Item item in itemList) float itemDeltaTime = deltaTime * mapEntityUpdateInterval;
Item lastUpdatedItem = null;
try
{ {
lastUpdatedItem = item; foreach (Item item in itemList)
item.Update(deltaTime, cam); {
if (LuaCsSetup.Instance.Game.UpdatePriorityItems.Contains(item)) { continue; }
lastUpdatedItem = item;
item.Update(itemDeltaTime, cam);
}
}
catch (InvalidOperationException e)
{
GameAnalyticsManager.AddErrorEventOnce(
"MapEntity.UpdateAll:ItemUpdateInvalidOperation",
GameAnalyticsManager.ErrorSeverity.Critical,
$"Error while updating item {lastUpdatedItem?.Name ?? "null"}: {e.Message}");
throw new InvalidOperationException($"Error while updating item {lastUpdatedItem?.Name ?? "null"}", innerException: e);
} }
} }
catch (InvalidOperationException e)
foreach (var item in LuaCsSetup.Instance.Game.UpdatePriorityItems)
{ {
GameAnalyticsManager.AddErrorEventOnce( if (item.Removed) { continue; }
"MapEntity.UpdateAll:ItemUpdateInvalidOperation",
GameAnalyticsManager.ErrorSeverity.Critical, item.Update(deltaTime, cam);
$"Error while updating item {lastUpdatedItem?.Name ?? "null"}: {e.Message}");
throw new InvalidOperationException($"Error while updating item {lastUpdatedItem?.Name ?? "null"}", innerException: e);
} }
UpdateAllProjSpecific(deltaTime); if (mapEntityUpdateTick % mapEntityUpdateInterval == 0)
Spawner?.Update(); {
UpdateAllProjSpecific(deltaTime * mapEntityUpdateInterval);
Spawner?.Update();
}
#if CLIENT #if CLIENT
sw.Stop(); sw.Stop();

View File

@@ -281,7 +281,6 @@ namespace Barotrauma
#endif #endif
SingleThreadActionStandbySignal.Wait(); SingleThreadActionStandbySignal.Wait();
try try
{ {
GameMain.World.Step((float)Timing.Step); GameMain.World.Step((float)Timing.Step);
@@ -292,8 +291,10 @@ namespace Barotrauma
DebugConsole.ThrowError(errorMsg, e); DebugConsole.ThrowError(errorMsg, e);
GameAnalyticsManager.AddErrorEventOnce("GameScreen.Update:WorldLockedException" + e.Message, GameAnalyticsManager.ErrorSeverity.Critical, errorMsg); GameAnalyticsManager.AddErrorEventOnce("GameScreen.Update:WorldLockedException" + e.Message, GameAnalyticsManager.ErrorSeverity.Critical, errorMsg);
} }
finally
SingleThreadActionStandbySignal.Release(); {
SingleThreadActionStandbySignal.Release();
}
#if CLIENT #if CLIENT
sw.Stop(); sw.Stop();

View File

@@ -1,9 +1,7 @@
using Barotrauma.Networking; using System;
using System;
using System.Collections.Concurrent; using System.Collections.Concurrent;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using static Barotrauma.EosInterface.Ownership;
namespace Barotrauma namespace Barotrauma
{ {
@@ -15,7 +13,8 @@ namespace Barotrauma
private readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource(); private readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
private readonly SemaphoreSlim actionSignal = new SemaphoreSlim(0); private readonly SemaphoreSlim actionSignal = new SemaphoreSlim(0);
private static Task WorkerTask; private readonly Task workerTask;
private bool disposed;
public static readonly SemaphoreSlim SingleThreadActionStandbySignal = new SemaphoreSlim(1); public static readonly SemaphoreSlim SingleThreadActionStandbySignal = new SemaphoreSlim(1);
@@ -26,28 +25,35 @@ namespace Barotrauma
public SingleThreadWorker() public SingleThreadWorker()
{ {
ActionQueue = new ConcurrentQueue<Action>(); ActionQueue = new ConcurrentQueue<Action>();
WorkerTask = CreateProcessTask(cancellationTokenSource.Token); workerTask = CreateProcessTask(cancellationTokenSource.Token);
} }
public void Dispose() public void Dispose()
{ {
if (disposed) { return; }
disposed = true;
cancellationTokenSource.Cancel(); cancellationTokenSource.Cancel();
WorkerTask.Wait(); try
WorkerTask.Dispose(); {
Instance = null; actionSignal.Release();
workerTask.Wait(2);
}
catch (AggregateException) { }
catch (ObjectDisposedException) { }
cancellationTokenSource.Dispose(); cancellationTokenSource.Dispose();
actionSignal.Dispose(); actionSignal.Dispose();
SingleThreadActionStandbySignal.Dispose();
} }
private async Task CreateProcessTask(CancellationToken token) private async Task CreateProcessTask(CancellationToken token)
{ {
while (!token.IsCancellationRequested) while (!token.IsCancellationRequested)
{ {
bool lockTaken = false;
try try
{ {
await actionSignal.WaitAsync(100, token); await actionSignal.WaitAsync(100, token);
SingleThreadActionStandbySignal.Wait(CancellationToken.None); SingleThreadActionStandbySignal.Wait(CancellationToken.None);
lockTaken = true;
RunActions(); RunActions();
} }
catch (OperationCanceledException) catch (OperationCanceledException)
@@ -56,7 +62,10 @@ namespace Barotrauma
} }
finally finally
{ {
SingleThreadActionStandbySignal.Release(); if (lockTaken)
{
SingleThreadActionStandbySignal.Release();
}
} }
} }
} }
@@ -68,6 +77,8 @@ namespace Barotrauma
/// <param name="action"></param> /// <param name="action"></param>
public void AddAction(Action action) public void AddAction(Action action)
{ {
if (disposed || action == null) { return; }
// enqueue and let background task handle the rest // enqueue and let background task handle the rest
ActionQueue.Enqueue(action); ActionQueue.Enqueue(action);
@@ -96,7 +107,7 @@ namespace Barotrauma
Console.ForegroundColor = ConsoleColor.Yellow; Console.ForegroundColor = ConsoleColor.Yellow;
Console.WriteLine($"WARNING: Error occurred when running Single Thread Actions." + Console.WriteLine($"WARNING: Error occurred when running Single Thread Actions." +
$"If the server didn't crash or stop responding then this should be fine \n{e}"); $"If the server didn't crash or stop responding then this should be fine \n{e}");
Console.ForegroundColor = Console.ForegroundColor; Console.ForegroundColor = originalForeground;
} }
} }
} }