cs-codex-dist-tests/Framework/OverwatchTranscript/BucketSet.cs

122 lines
3.6 KiB
C#
Raw Normal View History

2024-08-02 06:56:49 +00:00
using Logging;
namespace OverwatchTranscript
2024-08-01 14:25:28 +00:00
{
/// <summary>
/// Collects timestamped payloads into a rotating set of <see cref="EventBucket"/>
/// instances. Each call to <see cref="Add"/> is handed off to a background task;
/// <see cref="FinalizeBuckets"/> closes the set, waits for all outstanding adds
/// and finalizes every bucket.
/// </summary>
public class BucketSet
{
    private const int numberOfActiveBuckets = 5;

    // Guards pendingAdds and closed so that "is it closed?" and "count this add"
    // are decided atomically with respect to FinalizeBuckets.
    private readonly object _counterLock = new object();
    private int pendingAdds = 0;
    private bool closed = false;

    private readonly ILog log;
    private readonly string workingDir;

    // Guards fullBuckets, activeBuckets and activeBucketIndex.
    private readonly object _bucketLock = new object();
    private readonly List<EventBucket> fullBuckets = new List<EventBucket>();
    private readonly List<EventBucket> activeBuckets = new List<EventBucket>();
    private int activeBucketIndex = 0;

    // Guards internalErrors, which is appended to from background tasks.
    private readonly object _errorLock = new object();
    private string internalErrors = string.Empty;

    /// <summary>
    /// Creates the set and pre-populates it with <c>numberOfActiveBuckets</c> active buckets.
    /// </summary>
    /// <param name="log">Logger used for debug output and passed on to each bucket.</param>
    /// <param name="workingDir">Directory under which each bucket gets a GUID-named path.</param>
    public BucketSet(ILog log, string workingDir)
    {
        this.log = log;
        this.workingDir = workingDir;

        for (var i = 0; i < numberOfActiveBuckets; i++)
        {
            AddNewBucket();
        }
    }

    /// <summary>
    /// Queues an entry to be added to one of the active buckets on a background task.
    /// </summary>
    /// <param name="utc">Timestamp forwarded to the bucket.</param>
    /// <param name="payload">Payload forwarded to the bucket.</param>
    /// <exception cref="Exception">Thrown when <see cref="FinalizeBuckets"/> has already been called.</exception>
    public void Add(DateTime utc, object payload)
    {
        // Throws if closed; otherwise counts this add before the task is started so
        // that FinalizeBuckets cannot observe zero pending while work is in flight.
        AddPending();
        Task.Run(() => AddInternal(utc, payload));
    }

    /// <summary>
    /// Closes the set for further adds, waits until every queued add has completed
    /// and finalizes all buckets (full ones first, then the active ones).
    /// </summary>
    /// <returns>One finalized bucket per underlying <see cref="EventBucket"/>.</returns>
    /// <exception cref="Exception">
    /// Thrown when any background add failed (message contains the collected errors),
    /// or when no entries were added at all.
    /// </exception>
    public IFinalizedBucket[] FinalizeBuckets()
    {
        lock (_counterLock)
        {
            closed = true;
        }
        WaitForZeroPending();

        // Report background failures first: if every add failed the buckets are
        // empty too, and "No entries have been added." would hide the real cause.
        var errors = GetErrors();
        if (!string.IsNullOrEmpty(errors))
        {
            throw new Exception(errors);
        }

        EventBucket[] buckets;
        lock (_bucketLock)
        {
            if (IsEmpty())
            {
                throw new Exception("No entries have been added.");
            }
            buckets = fullBuckets.Concat(activeBuckets).ToArray();
        }

        log.Debug($"Finalizing {buckets.Length} buckets...");
        return buckets.Select(b => b.FinalizeBucket()).ToArray();
    }

    /// <summary>
    /// True when every full and active bucket reports a count of zero.
    /// Callers must hold <c>_bucketLock</c>.
    /// </summary>
    private bool IsEmpty()
    {
        return fullBuckets.All(b => b.Count == 0) && activeBuckets.All(b => b.Count == 0);
    }

    /// <summary>
    /// Background-task body: adds the entry to the current active bucket, advances the
    /// round-robin index and retires the bucket to <c>fullBuckets</c> once it reports full.
    /// Any exception is recorded instead of propagated.
    /// </summary>
    private void AddInternal(DateTime utc, object payload)
    {
        try
        {
            lock (_bucketLock)
            {
                var current = activeBuckets[activeBucketIndex];
                current.Add(utc, payload);
                activeBucketIndex = (activeBucketIndex + 1) % numberOfActiveBuckets;

                if (current.IsFull)
                {
                    // Swap the full bucket for a fresh one so the active set
                    // always holds numberOfActiveBuckets entries.
                    fullBuckets.Add(current);
                    activeBuckets.Remove(current);
                    AddNewBucket();
                }
            }
        }
        catch (Exception ex)
        {
            RecordError(ex.ToString());
        }
        finally
        {
            // Must run on the failure path too; otherwise the pending counter never
            // returns to zero and FinalizeBuckets waits forever.
            RemovePending();
        }
    }

    /// <summary>
    /// Appends a new bucket, stored at a fresh GUID-named path under the working directory,
    /// to the active set.
    /// </summary>
    private void AddNewBucket()
    {
        // May be entered while _bucketLock is already held by AddInternal; the lock is re-entrant.
        lock (_bucketLock)
        {
            activeBuckets.Add(new EventBucket(log, Path.Combine(workingDir, Guid.NewGuid().ToString())));
        }
    }

    /// <summary>
    /// Registers one outstanding add, or throws if the set has been closed.
    /// </summary>
    /// <exception cref="Exception">Thrown when the set is already closed.</exception>
    private void AddPending()
    {
        lock (_counterLock)
        {
            if (closed)
            {
                throw new Exception("Buckets already closed!");
            }

            pendingAdds++;
            log.Debug("(+) Pending: " + pendingAdds);
        }
    }

    /// <summary>
    /// Marks one outstanding add as completed. A negative counter is recorded as an internal error.
    /// </summary>
    private void RemovePending()
    {
        lock (_counterLock)
        {
            pendingAdds--;
            if (pendingAdds < 0)
            {
                RecordError("Pending less than zero");
            }
            log.Debug("(-) Pending: " + pendingAdds);
        }
    }

    /// <summary>
    /// Blocks the calling thread, polling every 10 ms, until no adds are outstanding.
    /// </summary>
    private void WaitForZeroPending()
    {
        log.Debug("Wait for zero pending.");
        while (true)
        {
            lock (_counterLock)
            {
                log.Debug("(wait) Pending: " + pendingAdds);
                if (pendingAdds == 0)
                {
                    return;
                }
            }
            Thread.Sleep(10);
        }
    }

    /// <summary>
    /// Appends a message to the collected internal errors in a thread-safe way.
    /// </summary>
    private void RecordError(string message)
    {
        lock (_errorLock)
        {
            internalErrors += message;
        }
    }

    /// <summary>
    /// Returns the internal errors collected so far.
    /// </summary>
    private string GetErrors()
    {
        lock (_errorLock)
        {
            return internalErrors;
        }
    }
}
}