working but very slow moment aggregation

This commit is contained in:
benbierens 2024-08-02 10:44:15 +02:00
parent f6cd9db408
commit b3710f26ae
No known key found for this signature in database
GPG Key ID: 877D2C2E09A22F3A
8 changed files with 191 additions and 51 deletions

View File

@ -4,9 +4,10 @@ namespace OverwatchTranscript
{
public class BucketSet
{
private const int numberOfActiveBuckets = 5;
private readonly object _counterLock = new object();
private int pendingAdds = 0;
private const int numberOfActiveBuckets = 10;
private readonly object queueLock = new object();
private List<Action> queue = new List<Action>();
private readonly Task queueWorker;
private readonly ILog log;
private readonly string workingDir;
private readonly object _bucketLock = new object();
@ -25,19 +26,31 @@ namespace OverwatchTranscript
{
AddNewBucket();
}
queueWorker = Task.Run(QueueWorker);
}
public void Add(DateTime utc, object payload)
{
if (closed) throw new Exception("Buckets already closed!");
AddPending();
Task.Run(() => AddInternal(utc, payload));
int count = 0;
lock (queueLock)
{
queue.Add(() => AddInternal(utc, payload));
count = queue.Count;
}
if (count > 1000)
{
Thread.Sleep(1);
}
}
public IFinalizedBucket[] FinalizeBuckets()
{
closed = true;
WaitForZeroPending();
WaitForZeroQueue();
queueWorker.Wait();
if (IsEmpty()) throw new Exception("No entries have been added.");
if (!string.IsNullOrEmpty(internalErrors)) throw new Exception(internalErrors);
@ -64,55 +77,68 @@ namespace OverwatchTranscript
if (current.IsFull)
{
log.Debug("Bucket is full. New bucket...");
fullBuckets.Add(current);
activeBuckets.Remove(current);
AddNewBucket();
}
RemovePending();
}
}
catch (Exception ex)
{
internalErrors += ex.ToString();
log.Error(ex.ToString());
}
}
private static int bucketSizeIndex = 0;
private static int[] bucketSizes = new[]
{
10000,
15000,
20000,
};
private void AddNewBucket()
{
lock (_bucketLock)
{
activeBuckets.Add(new EventBucket(log, Path.Combine(workingDir, Guid.NewGuid().ToString())));
var size = bucketSizes[bucketSizeIndex];
bucketSizeIndex = (bucketSizeIndex + 1) % bucketSizes.Length;
activeBuckets.Add(new EventBucket(log, Path.Combine(workingDir, Guid.NewGuid().ToString()), size));
}
}
private void AddPending()
private void QueueWorker()
{
lock (_counterLock)
while (true)
{
pendingAdds++;
log.Debug("(+) Pending: " + pendingAdds);
List<Action> work = null!;
lock (queueLock)
{
work = queue;
queue = new List<Action>();
}
if (closed && !work.Any()) return;
foreach (var action in work)
{
action();
}
Thread.Sleep(0);
}
}
private void RemovePending()
{
lock (_counterLock)
{
pendingAdds--;
if (pendingAdds < 0) internalErrors += "Pending less than zero";
log.Debug("(-) Pending: " + pendingAdds);
}
}
private void WaitForZeroPending()
private void WaitForZeroQueue()
{
log.Debug("Wait for zero pending.");
while (true)
{
lock (_counterLock)
lock (queueLock)
{
log.Debug("(wait) Pending: " + pendingAdds);
if (pendingAdds == 0) return;
log.Debug("(wait) Pending: " + queue.Count);
if (queue.Count == 0) return;
}
Thread.Sleep(10);
}

View File

@ -5,20 +5,21 @@ namespace OverwatchTranscript
{
public class EventBucket : IFinalizedBucket
{
private const int MaxCount = 10000;
private const int MaxBuffer = 100;
private const int MaxBuffer = 1000;
private readonly object _lock = new object();
private bool closed = false;
private readonly ILog log;
private readonly string bucketFile;
private readonly int maxCount;
private readonly List<EventBucketEntry> buffer = new List<EventBucketEntry>();
private EventBucketEntry? topEntry;
public EventBucket(ILog log, string bucketFile)
public EventBucket(ILog log, string bucketFile, int maxCount)
{
this.log = log;
this.bucketFile = bucketFile;
this.maxCount = maxCount;
if (File.Exists(bucketFile)) throw new Exception("Already exists");
log.Debug("Bucket open: " + bucketFile);
@ -93,7 +94,7 @@ namespace OverwatchTranscript
};
Count++;
IsFull = Count > MaxCount;
IsFull = Count > maxCount;
buffer.Add(entry);
}

View File

@ -25,8 +25,7 @@ namespace OverwatchTranscript
var moment = currentRef.ReadNext();
if (moment == null)
{
currentRef.Close();
currentRef = null!;
Close();
// This reference file ran out.
// The number of moments read should match exactly the number of moments
@ -41,10 +40,16 @@ namespace OverwatchTranscript
referenceIndex++;
if (referenceIndex < model.MomentReferences.Length)
{
// Proceed to next reference file.
currentRef = CreateOpenReference();
momentsRead = 0;
return Next();
}
else
{
// That was the last one.
return null;
}
momentsRead = 0;
return Next();
}
else
{
@ -53,6 +58,15 @@ namespace OverwatchTranscript
}
}
public void Close()
{
if (currentRef != null)
{
currentRef.Close();
currentRef = null!;
}
}
private OpenReference CreateOpenReference()
{
var filepath = Path.Combine(workingDir, model.MomentReferences[referenceIndex].MomentsFile);

View File

@ -1,22 +1,26 @@
using Newtonsoft.Json;
using Logging;
using Newtonsoft.Json;
namespace OverwatchTranscript
{
public class MomentReferenceBuilder
{
private const int MaxMomentsPerReference = 100;
private const int MaxMomentsPerReference = 1000;
private readonly ILog log;
private readonly string workingDir;
public MomentReferenceBuilder(string workingDir)
public MomentReferenceBuilder(ILog log, string workingDir)
{
this.log = log;
this.workingDir = workingDir;
}
public OverwatchMomentReference[] Build(IFinalizedBucket[] buckets)
{
var result = new List<OverwatchMomentReference>();
var currentBuilder = new Builder(workingDir);
var currentBuilder = new Builder(log, workingDir);
log.Debug($"Building references for {buckets.Length} buckets.");
while (EntriesRemaining(buckets))
{
var earliestUtc = GetEarliestUtc(buckets);
@ -26,7 +30,7 @@ namespace OverwatchTranscript
if (currentBuilder.NumberOfMoments == MaxMomentsPerReference)
{
result.Add(currentBuilder.Build());
currentBuilder = new Builder(workingDir);
currentBuilder = new Builder(log, workingDir);
}
}
@ -86,9 +90,10 @@ namespace OverwatchTranscript
public class Builder
{
private readonly ILog log;
private OverwatchMomentReference reference;
public Builder(string workingDir)
public Builder(ILog log, string workingDir)
{
reference = new OverwatchMomentReference
{
@ -98,6 +103,7 @@ namespace OverwatchTranscript
NumberOfEvents = 0,
NumberOfMoments = 0,
};
this.log = log;
}
public int NumberOfMoments => reference.NumberOfMoments;
@ -117,6 +123,7 @@ namespace OverwatchTranscript
public OverwatchMomentReference Build()
{
log.Debug($"Created reference with {reference.NumberOfMoments} moments and {reference.NumberOfEvents} events...");
var result = reference;
reference = null!;
return result;

View File

@ -13,7 +13,7 @@ namespace OverwatchTranscript
T GetHeader<T>(string key);
void AddMomentHandler(Action<ActivateMoment> handler);
void AddEventHandler<T>(Action<ActivateEvent<T>> handler);
void Next();
bool Next();
void Close();
}
@ -26,7 +26,6 @@ namespace OverwatchTranscript
private readonly Dictionary<string, List<Action<ActivateMoment, string>>> eventHandlers = new Dictionary<string, List<Action<ActivateMoment, string>>>();
private readonly string workingDir;
private readonly OverwatchTranscript model;
private readonly MomentReader reader;
private bool closed;
private long momentCounter;
private readonly object queueLock = new object();
@ -44,9 +43,8 @@ namespace OverwatchTranscript
if (File.Exists(transcriptFile) || Directory.Exists(artifactsFolder)) throw new Exception("workingdir not clean");
model = LoadModel(inputFilename);
reader = new MomentReader(model, workingDir);
queueFiller = Task.Run(FillQueue);
queueFiller = Task.Run(() => FillQueue(model, workingDir));
}
public OverwatchCommonHeader Header
@ -97,14 +95,14 @@ namespace OverwatchTranscript
}
}
public void Next()
public bool Next()
{
CheckClosed();
OverwatchMoment moment = null!;
OverwatchMoment? next = null;
lock (queueLock)
{
if (queue.Count == 0) return;
if (queue.Count == 0) return false;
moment = queue[0];
if (queue.Count > 1) next = queue[1];
@ -113,6 +111,7 @@ namespace OverwatchTranscript
var duration = GetMomentDuration(moment, next);
ActivateMoment(moment, duration);
return true;
}
public void Close()
@ -133,18 +132,28 @@ namespace OverwatchTranscript
};
}
private void FillQueue()
private void FillQueue(OverwatchTranscript model, string workingDir)
{
var reader = new MomentReader(model, workingDir);
while (true)
{
if (closed) return;
if (closed)
{
reader.Close();
return;
}
lock (queueLock)
{
while (queue.Count < 100)
{
var moment = reader.Next();
if (moment == null) return;
if (moment == null)
{
reader.Close();
return;
}
queue.Add(moment);
}
}

View File

@ -30,7 +30,7 @@ namespace OverwatchTranscript
this.log = log;
this.workingDir = workingDir;
bucketSet = new BucketSet(log, workingDir);
builder = new MomentReferenceBuilder(workingDir);
builder = new MomentReferenceBuilder(log, workingDir);
transcriptFile = Path.Combine(workingDir, TranscriptConstants.TranscriptFilename);
artifactsFolder = Path.Combine(workingDir, TranscriptConstants.ArtifactFolderName);
@ -74,6 +74,7 @@ namespace OverwatchTranscript
ZipFile.CreateFromDirectory(workingDir, outputFilename);
log.Debug($"Transcript written to {outputFilename}");
log.Debug($"Common header: {JsonConvert.SerializeObject(model.Header.Common, Formatting.Indented)}");
Directory.Delete(workingDir, true);
log.Debug($"Workdir {workingDir} deleted");

View File

@ -0,0 +1,83 @@
using Logging;
using NUnit.Framework;
using OverwatchTranscript;
namespace FrameworkTests.OverwatchTranscript
{
[TestFixture]
public class TranscriptLargeTests
{
private const int NumberOfThreads = 10;
private const int NumberOfEventsPerThread = 1000000;
private const string TranscriptFilename = "testtranscriptlarge.owts";
private TranscriptWriter writer = null!;
[Test]
public void MillionsOfEvents()
{
var workdir = Path.Combine(Path.GetTempPath(), Guid.NewGuid().ToString());
var log = new FileLog(nameof(MillionsOfEvents));
writer = new TranscriptWriter(log, workdir);
var tasks = new List<Task>();
for (var i = 0; i < NumberOfThreads; i++)
{
tasks.Add(RunGeneratorThread());
}
Task.WaitAll(tasks.ToArray());
writer.Write(TranscriptFilename);
ReadTranscript(workdir);
File.Delete(TranscriptFilename);
}
private Task RunGeneratorThread()
{
return Task.Run(() =>
{
try
{
var remaining = NumberOfEventsPerThread;
while (remaining > 0)
{
writer.Add(DateTime.UtcNow, new MyEvent
{
EventData = Guid.NewGuid().ToString()
});
remaining--;
}
}
catch (Exception ex)
{
Assert.Fail("exception in thread: " + ex);
}
});
}
private void ReadTranscript(string workdir)
{
var reader = new TranscriptReader(workdir, TranscriptFilename);
var expectedNumberOfEvents = NumberOfThreads * NumberOfEventsPerThread;
Assert.That(reader.Header.NumberOfEvents, Is.EqualTo(expectedNumberOfEvents));
var counter = 0;
reader.AddEventHandler<MyEvent>(e =>
{
counter++;
});
var run = true;
while (run)
{
run = reader.Next();
}
reader.Close();
}
}
}

View File

@ -1,5 +1,4 @@
using Logging;
using Newtonsoft.Json;
using NUnit.Framework;
using OverwatchTranscript;