2
0
mirror of synced 2025-02-03 04:03:39 +00:00

Speeds up transcript building but not by much

This commit is contained in:
benbierens 2024-08-02 14:25:38 +02:00
parent b3710f26ae
commit 6a0d2830d6
No known key found for this signature in database
GPG Key ID: 877D2C2E09A22F3A
5 changed files with 246 additions and 117 deletions

View File

@ -0,0 +1,60 @@
namespace OverwatchTranscript
{
public class ActionQueue
{
private readonly object queueLock = new object();
private readonly AutoResetEvent signal = new AutoResetEvent(false);
private List<Action> queue = new List<Action>();
private Task queueWorker = null!;
private bool stopping = false;
public void Start()
{
queueWorker = Task.Run(QueueWorker);
}
public int Count { get; private set; }
public void StopAndJoin()
{
stopping = true;
queueWorker.Wait();
if (queue.Count > 0) throw new Exception("not all acions handled");
queueWorker.Dispose();
}
public void Add(Action action)
{
if (stopping) throw new Exception("queue stopping");
lock (queueLock)
{
queue.Add(action);
Count = queue.Count;
}
signal.Set();
}
private void QueueWorker()
{
while (true)
{
signal.WaitOne(10);
List<Action> work = null!;
lock (queueLock)
{
work = queue;
queue = new List<Action>();
Count = 0;
}
if (stopping && !work.Any()) return;
foreach (var action in work)
{
action();
}
}
}
}
}

View File

@ -5,14 +5,12 @@ namespace OverwatchTranscript
public class BucketSet
{
private const int numberOfActiveBuckets = 10;
private readonly object queueLock = new object();
private List<Action> queue = new List<Action>();
private readonly Task queueWorker;
private readonly ILog log;
private readonly string workingDir;
private readonly object _bucketLock = new object();
private readonly List<EventBucket> fullBuckets = new List<EventBucket>();
private readonly List<EventBucket> activeBuckets = new List<EventBucket>();
private readonly List<EventBucketWriter> fullBuckets = new List<EventBucketWriter>();
private readonly List<EventBucketWriter> activeBuckets = new List<EventBucketWriter>();
private readonly ActionQueue queue = new ActionQueue();
private int activeBucketIndex = 0;
private bool closed = false;
private string internalErrors = string.Empty;
@ -27,20 +25,15 @@ namespace OverwatchTranscript
AddNewBucket();
}
queueWorker = Task.Run(QueueWorker);
queue.Start();
}
public void Add(DateTime utc, object payload)
{
if (closed) throw new Exception("Buckets already closed!");
int count = 0;
lock (queueLock)
{
queue.Add(() => AddInternal(utc, payload));
count = queue.Count;
}
if (count > 1000)
queue.Add(() => AddInternal(utc, payload));
if (queue.Count > 1000)
{
Thread.Sleep(1);
}
@ -49,8 +42,7 @@ namespace OverwatchTranscript
public IFinalizedBucket[] FinalizeBuckets()
{
closed = true;
WaitForZeroQueue();
queueWorker.Wait();
queue.StopAndJoin();
if (IsEmpty()) throw new Exception("No entries have been added.");
if (!string.IsNullOrEmpty(internalErrors)) throw new Exception(internalErrors);
@ -105,42 +97,7 @@ namespace OverwatchTranscript
{
var size = bucketSizes[bucketSizeIndex];
bucketSizeIndex = (bucketSizeIndex + 1) % bucketSizes.Length;
activeBuckets.Add(new EventBucket(log, Path.Combine(workingDir, Guid.NewGuid().ToString()), size));
}
}
private void QueueWorker()
{
while (true)
{
List<Action> work = null!;
lock (queueLock)
{
work = queue;
queue = new List<Action>();
}
if (closed && !work.Any()) return;
foreach (var action in work)
{
action();
}
Thread.Sleep(0);
}
}
private void WaitForZeroQueue()
{
log.Debug("Wait for zero pending.");
while (true)
{
lock (queueLock)
{
log.Debug("(wait) Pending: " + queue.Count);
if (queue.Count == 0) return;
}
Thread.Sleep(10);
activeBuckets.Add(new EventBucketWriter(log, Path.Combine(workingDir, Guid.NewGuid().ToString()), size));
}
}
}

View File

@ -0,0 +1,127 @@
using Logging;
using Newtonsoft.Json;
namespace OverwatchTranscript
{
public interface IFinalizedBucket
{
bool IsEmpty { get; }
DateTime? SeeTopUtc();
BucketTop? TakeTop();
}
public class BucketTop
{
public BucketTop(DateTime utc, OverwatchEvent[] events)
{
Utc = utc;
Events = events;
}
public DateTime Utc { get; }
public OverwatchEvent[] Events { get; }
}
public class EventBucketReader : IFinalizedBucket
{
private readonly object topLock = new object();
private readonly string bucketFile;
private readonly AutoResetEvent topReadySignal = new AutoResetEvent(false);
private readonly AutoResetEvent topTakenSignal = new AutoResetEvent(true);
private BucketTop? top;
private DateTime? topUtc;
public EventBucketReader(ILog log, string bucketFile)
{
this.bucketFile = bucketFile;
if (!File.Exists(bucketFile)) throw new Exception("Doesn't exist: " + bucketFile);
log.Debug("Read Bucket open: " + bucketFile);
Task.Run(ReadBucket);
}
public bool IsEmpty { get; private set; }
public DateTime? SeeTopUtc()
{
if (IsEmpty) return null;
return topUtc;
}
public BucketTop? TakeTop()
{
if (IsEmpty) return null;
topReadySignal.WaitOne();
lock (topLock)
{
var t = top;
top = null;
topUtc = null;
topTakenSignal.Set();
return t;
}
}
private void ReadBucket()
{
using var file = File.OpenRead(bucketFile);
using var reader = new StreamReader(file);
while (true)
{
topTakenSignal.WaitOne(10);
lock (topLock)
{
if (top == null)
{
top = CreateNewTop(reader);
if (top != null)
{
topUtc = top.Utc;
}
topReadySignal.Set();
}
if (top == null)
{
IsEmpty = true;
return;
}
}
}
}
private EventBucketEntry? nextEntry = null;
private BucketTop? CreateNewTop(StreamReader reader)
{
if (nextEntry == null)
{
nextEntry = ReadEntry(reader);
if (nextEntry == null) return null;
}
var topEntry = nextEntry;
var entries = new List<EventBucketEntry>
{
topEntry
};
nextEntry = ReadEntry(reader);
while (nextEntry != null && nextEntry.Utc == topEntry.Utc)
{
entries.Add(nextEntry);
nextEntry = ReadEntry(reader);
}
return new BucketTop(topEntry.Utc, entries.Select(e => e.Event).ToArray());
}
private EventBucketEntry? ReadEntry(StreamReader reader)
{
var line = reader.ReadLine();
if (string.IsNullOrEmpty(line)) return null;
return JsonConvert.DeserializeObject<EventBucketEntry>(line);
}
}
}

View File

@ -3,7 +3,7 @@ using Newtonsoft.Json;
namespace OverwatchTranscript
{
public class EventBucket : IFinalizedBucket
public class EventBucketWriter
{
private const int MaxBuffer = 1000;
@ -13,16 +13,15 @@ namespace OverwatchTranscript
private readonly string bucketFile;
private readonly int maxCount;
private readonly List<EventBucketEntry> buffer = new List<EventBucketEntry>();
private EventBucketEntry? topEntry;
public EventBucket(ILog log, string bucketFile, int maxCount)
public EventBucketWriter(ILog log, string bucketFile, int maxCount)
{
this.log = log;
this.bucketFile = bucketFile;
this.maxCount = maxCount;
if (File.Exists(bucketFile)) throw new Exception("Already exists");
log.Debug("Bucket open: " + bucketFile);
log.Debug("Write Bucket open: " + bucketFile);
}
public int Count { get; private set; }
@ -47,29 +46,7 @@ namespace OverwatchTranscript
SortFileByTimestamps();
}
log.Debug($"Finalized bucket with {Count} entries");
return this;
}
public EventBucketEntry? ViewTopEntry()
{
if (!closed) throw new Exception("Bucket not closed yet. FinalizeBucket first.");
return topEntry;
}
public void PopTopEntry()
{
var lines = File.ReadAllLines(bucketFile).ToList();
lines.RemoveAt(0);
File.WriteAllLines(bucketFile, lines);
if (lines.Any())
{
topEntry = JsonConvert.DeserializeObject<EventBucketEntry>(lines[0]);
}
else
{
topEntry = null;
}
return new EventBucketReader(log, bucketFile);
}
public override string ToString()
@ -124,19 +101,9 @@ namespace OverwatchTranscript
File.Delete(bucketFile);
File.WriteAllLines(bucketFile, entries.Select(JsonConvert.SerializeObject));
topEntry = entries.FirstOrDefault();
}
}
public interface IFinalizedBucket
{
int Count { get; }
bool IsFull { get; }
EventBucketEntry? ViewTopEntry();
void PopTopEntry();
}
[Serializable]
public class EventBucketEntry
{

View File

@ -15,17 +15,26 @@ namespace OverwatchTranscript
this.workingDir = workingDir;
}
public OverwatchMomentReference[] Build(IFinalizedBucket[] buckets)
public OverwatchMomentReference[] Build(IFinalizedBucket[] finalizedBuckets)
{
var result = new List<OverwatchMomentReference>();
var currentBuilder = new Builder(log, workingDir);
log.Debug($"Building references for {buckets.Length} buckets.");
while (EntriesRemaining(buckets))
var buckets = finalizedBuckets.ToList();
log.Debug($"Building references for {buckets.Count} buckets.");
while (buckets.Any())
{
buckets.RemoveAll(b => b.IsEmpty);
if (!buckets.Any()) break;
var earliestUtc = GetEarliestUtc(buckets);
var entries = CollectAllEntriesForUtc(earliestUtc, buckets);
var moment = ConvertEntriesToMoment(entries);
if (earliestUtc == null)
{
Thread.Sleep(1); continue;
}
var tops = CollectAllTopsForUtc(earliestUtc.Value, buckets);
var moment = ConvertTopsToMoment(tops);
currentBuilder.Add(moment);
if (currentBuilder.NumberOfMoments == MaxMomentsPerReference)
{
@ -42,56 +51,58 @@ namespace OverwatchTranscript
return result.ToArray();
}
private OverwatchMoment ConvertEntriesToMoment(List<EventBucketEntry> entries)
private OverwatchMoment ConvertTopsToMoment(List<BucketTop> tops)
{
var discintUtc = entries.Select(e => e.Utc).Distinct().ToArray();
var discintUtc = tops.Select(e => e.Utc).Distinct().ToArray();
if (discintUtc.Length != 1) throw new Exception("UTC mixing in moment construction.");
return new OverwatchMoment
{
Utc = entries[0].Utc,
Events = entries.Select(e => e.Event).ToArray()
Utc = tops[0].Utc,
Events = tops.SelectMany(e => e.Events).ToArray()
};
}
private List<EventBucketEntry> CollectAllEntriesForUtc(DateTime earliestUtc, IFinalizedBucket[] buckets)
private List<BucketTop> CollectAllTopsForUtc(DateTime earliestUtc, List<IFinalizedBucket> buckets)
{
var result = new List<EventBucketEntry>();
var result = new List<BucketTop>();
foreach (var bucket in buckets)
{
var top = bucket.ViewTopEntry();
while (top != null && top.Utc == earliestUtc)
if (bucket.IsEmpty) continue;
var utc = bucket.SeeTopUtc();
if (utc == null) continue;
if (utc.Value == earliestUtc)
{
var top = bucket.TakeTop();
if (top == null) throw new Exception("top was null after top utc was not");
result.Add(top);
bucket.PopTopEntry();
top = bucket.ViewTopEntry();
}
}
return result;
}
private DateTime GetEarliestUtc(IFinalizedBucket[] buckets)
private DateTime? GetEarliestUtc(List<IFinalizedBucket> buckets)
{
var earliest = DateTime.MaxValue;
foreach (var bucket in buckets)
{
var top = bucket.ViewTopEntry();
if (top != null && top.Utc < earliest) earliest = top.Utc;
var utc = bucket.SeeTopUtc();
if (utc == null) return null;
if (utc.Value < earliest) earliest = utc.Value;
}
return earliest;
}
private bool EntriesRemaining(IFinalizedBucket[] buckets)
{
return buckets.Any(b => b.ViewTopEntry() != null);
}
public class Builder
{
private readonly ILog log;
private OverwatchMomentReference reference;
private readonly ActionQueue queue = new ActionQueue();
public Builder(ILog log, string workingDir)
{
@ -104,25 +115,32 @@ namespace OverwatchTranscript
NumberOfMoments = 0,
};
this.log = log;
queue.Start();
}
public int NumberOfMoments => reference.NumberOfMoments;
public void Add(OverwatchMoment moment)
{
File.AppendAllLines(reference.MomentsFile, new[]
{
JsonConvert.SerializeObject(moment)
});
if (moment.Utc < reference.EarliestUtc) reference.EarliestUtc = moment.Utc;
if (moment.Utc > reference.LatestUtc) reference.LatestUtc = moment.Utc;
reference.NumberOfMoments++;
reference.NumberOfEvents += moment.Events.Length;
queue.Add(() =>
{
File.AppendAllLines(reference.MomentsFile, new[]
{
JsonConvert.SerializeObject(moment)
});
});
}
public OverwatchMomentReference Build()
{
queue.StopAndJoin();
log.Debug($"Created reference with {reference.NumberOfMoments} moments and {reference.NumberOfEvents} events...");
var result = reference;
reference = null!;