2024-08-02 08:56:49 +02:00
|
|
|
|
using Logging;
|
2024-08-02 17:52:44 +02:00
|
|
|
|
using System.Collections.Concurrent;
|
2024-08-02 08:56:49 +02:00
|
|
|
|
|
|
|
|
|
namespace OverwatchTranscript
|
2024-08-01 16:25:28 +02:00
|
|
|
|
{
|
|
|
|
|
public class BucketSet
|
|
|
|
|
{
|
2024-08-02 10:44:15 +02:00
|
|
|
|
private const int numberOfActiveBuckets = 10;
|
2024-08-02 08:56:49 +02:00
|
|
|
|
private readonly ILog log;
|
|
|
|
|
private readonly string workingDir;
|
2024-08-01 16:25:28 +02:00
|
|
|
|
private readonly object _bucketLock = new object();
|
2024-08-02 14:25:38 +02:00
|
|
|
|
private readonly List<EventBucketWriter> fullBuckets = new List<EventBucketWriter>();
|
|
|
|
|
private readonly List<EventBucketWriter> activeBuckets = new List<EventBucketWriter>();
|
|
|
|
|
private readonly ActionQueue queue = new ActionQueue();
|
2024-08-01 16:25:28 +02:00
|
|
|
|
private int activeBucketIndex = 0;
|
|
|
|
|
private bool closed = false;
|
2024-08-02 08:56:49 +02:00
|
|
|
|
private string internalErrors = string.Empty;
|
|
|
|
|
|
|
|
|
|
public BucketSet(ILog log, string workingDir)
|
2024-08-01 16:25:28 +02:00
|
|
|
|
{
|
2024-08-02 08:56:49 +02:00
|
|
|
|
this.log = log;
|
2024-08-01 16:25:28 +02:00
|
|
|
|
this.workingDir = workingDir;
|
|
|
|
|
|
|
|
|
|
for (var i = 0; i < numberOfActiveBuckets;i++)
|
|
|
|
|
{
|
|
|
|
|
AddNewBucket();
|
|
|
|
|
}
|
2024-08-02 10:44:15 +02:00
|
|
|
|
|
2024-08-02 14:25:38 +02:00
|
|
|
|
queue.Start();
|
2024-08-01 16:25:28 +02:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public void Add(DateTime utc, object payload)
|
|
|
|
|
{
|
|
|
|
|
if (closed) throw new Exception("Buckets already closed!");
|
2024-08-02 14:25:38 +02:00
|
|
|
|
queue.Add(() => AddInternal(utc, payload));
|
|
|
|
|
|
|
|
|
|
if (queue.Count > 1000)
|
2024-08-02 10:44:15 +02:00
|
|
|
|
{
|
|
|
|
|
Thread.Sleep(1);
|
|
|
|
|
}
|
2024-08-01 16:25:28 +02:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public IFinalizedBucket[] FinalizeBuckets()
|
|
|
|
|
{
|
|
|
|
|
closed = true;
|
2024-08-02 14:25:38 +02:00
|
|
|
|
queue.StopAndJoin();
|
2024-08-01 16:25:28 +02:00
|
|
|
|
|
2024-08-02 08:56:49 +02:00
|
|
|
|
if (IsEmpty()) throw new Exception("No entries have been added.");
|
|
|
|
|
if (!string.IsNullOrEmpty(internalErrors)) throw new Exception(internalErrors);
|
|
|
|
|
|
2024-08-01 16:25:28 +02:00
|
|
|
|
var buckets = fullBuckets.Concat(activeBuckets).ToArray();
|
2024-08-02 08:56:49 +02:00
|
|
|
|
log.Debug($"Finalizing {buckets.Length} buckets...");
|
2024-08-02 17:52:44 +02:00
|
|
|
|
|
|
|
|
|
var finalized = new ConcurrentBag<IFinalizedBucket>();
|
|
|
|
|
var tasks = Parallel.ForEach(buckets, b => finalized.Add(b.FinalizeBucket()));
|
|
|
|
|
if (!tasks.IsCompleted) throw new Exception("Failed to finalize buckets: " + tasks);
|
|
|
|
|
|
|
|
|
|
return finalized.ToArray();
|
2024-08-01 16:25:28 +02:00
|
|
|
|
}
|
|
|
|
|
|
2024-08-02 08:56:49 +02:00
|
|
|
|
private bool IsEmpty()
|
|
|
|
|
{
|
|
|
|
|
return fullBuckets.All(b => b.Count == 0) && activeBuckets.All(b => b.Count == 0);
|
|
|
|
|
}
|
|
|
|
|
|
2024-08-01 16:25:28 +02:00
|
|
|
|
private void AddInternal(DateTime utc, object payload)
|
|
|
|
|
{
|
|
|
|
|
try
|
|
|
|
|
{
|
|
|
|
|
lock (_bucketLock)
|
|
|
|
|
{
|
|
|
|
|
var current = activeBuckets[activeBucketIndex];
|
|
|
|
|
current.Add(utc, payload);
|
|
|
|
|
activeBucketIndex = (activeBucketIndex + 1) % numberOfActiveBuckets;
|
|
|
|
|
|
|
|
|
|
if (current.IsFull)
|
|
|
|
|
{
|
2024-08-02 10:44:15 +02:00
|
|
|
|
log.Debug("Bucket is full. New bucket...");
|
2024-08-01 16:25:28 +02:00
|
|
|
|
fullBuckets.Add(current);
|
|
|
|
|
activeBuckets.Remove(current);
|
|
|
|
|
AddNewBucket();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
catch (Exception ex)
|
|
|
|
|
{
|
2024-08-02 08:56:49 +02:00
|
|
|
|
internalErrors += ex.ToString();
|
2024-08-02 10:44:15 +02:00
|
|
|
|
log.Error(ex.ToString());
|
2024-08-01 16:25:28 +02:00
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2024-08-02 10:44:15 +02:00
|
|
|
|
private static int bucketSizeIndex = 0;
|
|
|
|
|
private static int[] bucketSizes = new[]
|
|
|
|
|
{
|
|
|
|
|
10000,
|
|
|
|
|
15000,
|
|
|
|
|
20000,
|
|
|
|
|
};
|
|
|
|
|
|
2024-08-01 16:25:28 +02:00
|
|
|
|
private void AddNewBucket()
|
|
|
|
|
{
|
|
|
|
|
lock (_bucketLock)
|
|
|
|
|
{
|
2024-08-02 10:44:15 +02:00
|
|
|
|
var size = bucketSizes[bucketSizeIndex];
|
|
|
|
|
bucketSizeIndex = (bucketSizeIndex + 1) % bucketSizes.Length;
|
2024-08-02 14:25:38 +02:00
|
|
|
|
activeBuckets.Add(new EventBucketWriter(log, Path.Combine(workingDir, Guid.NewGuid().ToString()), size));
|
2024-08-01 16:25:28 +02:00
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|