2
0
mirror of synced 2025-01-11 09:06:56 +00:00

finishes: fast millions of events support

This commit is contained in:
Ben 2024-08-05 12:24:13 +02:00
parent 72865e7f96
commit 2ec7f26387
No known key found for this signature in database
GPG Key ID: 0F16E812E736C24B
3 changed files with 56 additions and 28 deletions

View File

@ -2,6 +2,7 @@
{
public class ActionQueue
{
// Using ConcurrentQueue<> here would make this process slower.
private readonly object queueLock = new object();
private readonly AutoResetEvent signal = new AutoResetEvent(false);
private List<Action> queue = new List<Action>();

View File

@ -4,6 +4,7 @@ using System;
using System.IO.Compression;
using System.Linq;
using System.Collections.Generic;
using System.Collections.Concurrent;
namespace OverwatchTranscript
{
@ -28,8 +29,7 @@ namespace OverwatchTranscript
private readonly OverwatchTranscript model;
private bool closed;
private long momentCounter;
private readonly object queueLock = new object();
private readonly List<OverwatchMoment> queue = new List<OverwatchMoment>();
private readonly ConcurrentQueue<OverwatchMoment> queue = new ConcurrentQueue<OverwatchMoment>();
private readonly Task queueFiller;
public TranscriptReader(string workingDir, string inputFilename)
@ -95,22 +95,36 @@ namespace OverwatchTranscript
}
}
private readonly object nextLock = new object();
private OverwatchMoment? moment = null;
private OverwatchMoment? next = null;
public bool Next()
{
CheckClosed();
OverwatchMoment moment = null!;
OverwatchMoment? next = null;
lock (queueLock)
{
if (queue.Count == 0) return false;
moment = queue[0];
if (queue.Count > 1) next = queue[1];
queue.RemoveAt(0);
OverwatchMoment? m = null;
TimeSpan? duration = null;
lock (nextLock)
{
if (next == null)
{
if (!queue.TryDequeue(out moment)) return false;
queue.TryDequeue(out next);
}
else
{
moment = next;
next = null;
queue.TryDequeue(out next);
}
m = moment;
duration = GetMomentDuration();
}
var duration = GetMomentDuration(moment, next);
ActivateMoment(moment, duration);
return true;
}
@ -144,25 +158,22 @@ namespace OverwatchTranscript
return;
}
lock (queueLock)
while (queue.Count < 10)
{
while (queue.Count < 100)
var moment = reader.Next();
if (moment == null)
{
var moment = reader.Next();
if (moment == null)
{
reader.Close();
return;
}
queue.Add(moment);
reader.Close();
return;
}
queue.Enqueue(moment);
}
Thread.Sleep(1);
}
}
private TimeSpan? GetMomentDuration(OverwatchMoment moment, OverwatchMoment? next)
private TimeSpan? GetMomentDuration()
{
if (moment == null) return null;
if (next == null) return null;

View File

@ -13,6 +13,7 @@ namespace FrameworkTests.OverwatchTranscript
private TranscriptWriter writer = null!;
[Test]
[Ignore("Takes about 25 minutes to run.")]
public void MillionsOfEvents()
{
var workdir = Path.Combine(Path.GetTempPath(), Guid.NewGuid().ToString());
@ -20,17 +21,26 @@ namespace FrameworkTests.OverwatchTranscript
var log = new FileLog(nameof(MillionsOfEvents));
writer = new TranscriptWriter(log, workdir);
var tasks = new List<Task>();
for (var i = 0; i < NumberOfThreads; i++)
Stopwatch.Measure(log, "Generate", () =>
{
tasks.Add(RunGeneratorThread());
}
var tasks = new List<Task>();
for (var i = 0; i < NumberOfThreads; i++)
{
tasks.Add(RunGeneratorThread());
}
Task.WaitAll(tasks.ToArray());
Task.WaitAll(tasks.ToArray());
});
writer.Write(TranscriptFilename);
Stopwatch.Measure(log, "Write", () =>
{
writer.Write(TranscriptFilename);
});
ReadTranscript(workdir);
Stopwatch.Measure(log, "Read", () =>
{
ReadTranscript(workdir);
});
File.Delete(TranscriptFilename);
}
@ -66,9 +76,15 @@ namespace FrameworkTests.OverwatchTranscript
Assert.That(reader.Header.NumberOfEvents, Is.EqualTo(expectedNumberOfEvents));
var counter = 0;
var current = DateTime.MinValue;
reader.AddEventHandler<MyEvent>(e =>
{
counter++;
if (e.Moment.Utc < current)
{
Assert.Fail("Event has moment BEFORE previous one.");
}
current = e.Moment.Utc;
});
var run = true;