From 6a0d2830d6561ad1638ca64ce891be13101172a6 Mon Sep 17 00:00:00 2001 From: benbierens Date: Fri, 2 Aug 2024 14:25:38 +0200 Subject: [PATCH] Speeds up transcript building but not by much --- Framework/OverwatchTranscript/ActionQueue.cs | 60 +++++++++ Framework/OverwatchTranscript/BucketSet.cs | 61 ++------- .../OverwatchTranscript/EventBucketReader.cs | 127 ++++++++++++++++++ .../{EventBucket.cs => EventBucketWriter.cs} | 41 +----- .../MomentReferenceBuilder.cs | 74 ++++++---- 5 files changed, 246 insertions(+), 117 deletions(-) create mode 100644 Framework/OverwatchTranscript/ActionQueue.cs create mode 100644 Framework/OverwatchTranscript/EventBucketReader.cs rename Framework/OverwatchTranscript/{EventBucket.cs => EventBucketWriter.cs} (75%) diff --git a/Framework/OverwatchTranscript/ActionQueue.cs b/Framework/OverwatchTranscript/ActionQueue.cs new file mode 100644 index 0000000..83e9865 --- /dev/null +++ b/Framework/OverwatchTranscript/ActionQueue.cs @@ -0,0 +1,60 @@ +namespace OverwatchTranscript +{ + public class ActionQueue + { + private readonly object queueLock = new object(); + private readonly AutoResetEvent signal = new AutoResetEvent(false); + private List queue = new List(); + private Task queueWorker = null!; + private bool stopping = false; + + public void Start() + { + queueWorker = Task.Run(QueueWorker); + } + + public int Count { get; private set; } + + public void StopAndJoin() + { + stopping = true; + queueWorker.Wait(); + if (queue.Count > 0) throw new Exception("not all acions handled"); + queueWorker.Dispose(); + } + + public void Add(Action action) + { + if (stopping) throw new Exception("queue stopping"); + + lock (queueLock) + { + queue.Add(action); + Count = queue.Count; + } + signal.Set(); + } + + private void QueueWorker() + { + while (true) + { + signal.WaitOne(10); + + List work = null!; + lock (queueLock) + { + work = queue; + queue = new List(); + Count = 0; + } + if (stopping && !work.Any()) return; + + foreach (var action in work) + { + action(); + } + } + } + } +} diff --git a/Framework/OverwatchTranscript/BucketSet.cs b/Framework/OverwatchTranscript/BucketSet.cs index c1755da..eb256ba 100644 --- a/Framework/OverwatchTranscript/BucketSet.cs +++ b/Framework/OverwatchTranscript/BucketSet.cs @@ -5,14 +5,12 @@ namespace OverwatchTranscript public class BucketSet { private const int numberOfActiveBuckets = 10; - private readonly object queueLock = new object(); - private List queue = new List(); - private readonly Task queueWorker; private readonly ILog log; private readonly string workingDir; private readonly object _bucketLock = new object(); - private readonly List fullBuckets = new List(); - private readonly List activeBuckets = new List(); + private readonly List fullBuckets = new List(); + private readonly List activeBuckets = new List(); + private readonly ActionQueue queue = new ActionQueue(); private int activeBucketIndex = 0; private bool closed = false; private string internalErrors = string.Empty; @@ -27,20 +25,15 @@ namespace OverwatchTranscript AddNewBucket(); } - queueWorker = Task.Run(QueueWorker); + queue.Start(); } public void Add(DateTime utc, object payload) { if (closed) throw new Exception("Buckets already closed!"); - int count = 0; - lock (queueLock) - { - queue.Add(() => AddInternal(utc, payload)); - count = queue.Count; - } - - if (count > 1000) + queue.Add(() => AddInternal(utc, payload)); + + if (queue.Count > 1000) { Thread.Sleep(1); } @@ -49,8 +42,7 @@ namespace OverwatchTranscript public IFinalizedBucket[] FinalizeBuckets() { closed = true; - WaitForZeroQueue(); - queueWorker.Wait(); + queue.StopAndJoin(); if (IsEmpty()) throw new Exception("No entries have been added."); if (!string.IsNullOrEmpty(internalErrors)) throw new Exception(internalErrors); @@ -105,42 +97,7 @@ namespace OverwatchTranscript { var size = bucketSizes[bucketSizeIndex]; bucketSizeIndex = (bucketSizeIndex + 1) % bucketSizes.Length; - activeBuckets.Add(new EventBucket(log, Path.Combine(workingDir, Guid.NewGuid().ToString()), size)); - } - } - - private void QueueWorker() - { - while (true) - { - List work = null!; - lock (queueLock) - { - work = queue; - queue = new List(); - } - - if (closed && !work.Any()) return; - foreach (var action in work) - { - action(); - } - - Thread.Sleep(0); - } - } - - private void WaitForZeroQueue() - { - log.Debug("Wait for zero pending."); - while (true) - { - lock (queueLock) - { - log.Debug("(wait) Pending: " + queue.Count); - if (queue.Count == 0) return; - } - Thread.Sleep(10); + activeBuckets.Add(new EventBucketWriter(log, Path.Combine(workingDir, Guid.NewGuid().ToString()), size)); } } } diff --git a/Framework/OverwatchTranscript/EventBucketReader.cs b/Framework/OverwatchTranscript/EventBucketReader.cs new file mode 100644 index 0000000..e44ba44 --- /dev/null +++ b/Framework/OverwatchTranscript/EventBucketReader.cs @@ -0,0 +1,127 @@ +using Logging; +using Newtonsoft.Json; + +namespace OverwatchTranscript +{ + public interface IFinalizedBucket + { + bool IsEmpty { get; } + DateTime? SeeTopUtc(); + BucketTop? TakeTop(); + } + + public class BucketTop + { + public BucketTop(DateTime utc, OverwatchEvent[] events) + { + Utc = utc; + Events = events; + } + + public DateTime Utc { get; } + public OverwatchEvent[] Events { get; } + } + + public class EventBucketReader : IFinalizedBucket + { + private readonly object topLock = new object(); + private readonly string bucketFile; + private readonly AutoResetEvent topReadySignal = new AutoResetEvent(false); + private readonly AutoResetEvent topTakenSignal = new AutoResetEvent(true); + private BucketTop? top; + private DateTime? topUtc; + + public EventBucketReader(ILog log, string bucketFile) + { + this.bucketFile = bucketFile; + if (!File.Exists(bucketFile)) throw new Exception("Doesn't exist: " + bucketFile); + + log.Debug("Read Bucket open: " + bucketFile); + + Task.Run(ReadBucket); + } + + public bool IsEmpty { get; private set; } + + public DateTime? SeeTopUtc() + { + if (IsEmpty) return null; + return topUtc; + } + + public BucketTop? TakeTop() + { + if (IsEmpty) return null; + topReadySignal.WaitOne(); + + lock (topLock) + { + var t = top; + top = null; + topUtc = null; + topTakenSignal.Set(); + return t; + } + } + + private void ReadBucket() + { + using var file = File.OpenRead(bucketFile); + using var reader = new StreamReader(file); + + while (true) + { + topTakenSignal.WaitOne(10); + lock (topLock) + { + if (top == null) + { + top = CreateNewTop(reader); + if (top != null) + { + topUtc = top.Utc; + } + topReadySignal.Set(); + } + if (top == null) + { + IsEmpty = true; + return; + } + } + } + } + + private EventBucketEntry? nextEntry = null; + private BucketTop? CreateNewTop(StreamReader reader) + { + if (nextEntry == null) + { + nextEntry = ReadEntry(reader); + if (nextEntry == null) return null; + } + + var topEntry = nextEntry; + var entries = new List + { + topEntry + }; + + nextEntry = ReadEntry(reader); + while (nextEntry != null && nextEntry.Utc == topEntry.Utc) + { + entries.Add(nextEntry); + nextEntry = ReadEntry(reader); + } + + return new BucketTop(topEntry.Utc, entries.Select(e => e.Event).ToArray()); + } + + private EventBucketEntry? ReadEntry(StreamReader reader) + { + var line = reader.ReadLine(); + if (string.IsNullOrEmpty(line)) return null; + return JsonConvert.DeserializeObject(line); + } + } +} diff --git a/Framework/OverwatchTranscript/EventBucket.cs b/Framework/OverwatchTranscript/EventBucketWriter.cs similarity index 75% rename from Framework/OverwatchTranscript/EventBucket.cs rename to Framework/OverwatchTranscript/EventBucketWriter.cs index f5b8722..179b7f4 100644 --- a/Framework/OverwatchTranscript/EventBucket.cs +++ b/Framework/OverwatchTranscript/EventBucketWriter.cs @@ -3,7 +3,7 @@ using Newtonsoft.Json; namespace OverwatchTranscript { - public class EventBucket : IFinalizedBucket + public class EventBucketWriter { private const int MaxBuffer = 1000; @@ -13,16 +13,15 @@ namespace OverwatchTranscript private readonly string bucketFile; private readonly int maxCount; private readonly List buffer = new List(); - private EventBucketEntry? topEntry; - public EventBucket(ILog log, string bucketFile, int maxCount) + public EventBucketWriter(ILog log, string bucketFile, int maxCount) { this.log = log; this.bucketFile = bucketFile; this.maxCount = maxCount; if (File.Exists(bucketFile)) throw new Exception("Already exists"); - log.Debug("Bucket open: " + bucketFile); + log.Debug("Write Bucket open: " + bucketFile); } public int Count { get; private set; } @@ -47,29 +46,7 @@ namespace OverwatchTranscript SortFileByTimestamps(); } log.Debug($"Finalized bucket with {Count} entries"); - return this; - } - - public EventBucketEntry? ViewTopEntry() - { - if (!closed) throw new Exception("Bucket not closed yet. FinalizeBucket first."); - return topEntry; - } - - public void PopTopEntry() - { - var lines = File.ReadAllLines(bucketFile).ToList(); - lines.RemoveAt(0); - File.WriteAllLines(bucketFile, lines); - - if (lines.Any()) - { - topEntry = JsonConvert.DeserializeObject(lines[0]); - } - else - { - topEntry = null; - } + return new EventBucketReader(log, bucketFile); } public override string ToString() @@ -124,19 +101,9 @@ namespace OverwatchTranscript File.Delete(bucketFile); File.WriteAllLines(bucketFile, entries.Select(JsonConvert.SerializeObject)); - - topEntry = entries.FirstOrDefault(); } } - public interface IFinalizedBucket - { - int Count { get; } - bool IsFull { get; } - EventBucketEntry? ViewTopEntry(); - void PopTopEntry(); - } - [Serializable] public class EventBucketEntry { diff --git a/Framework/OverwatchTranscript/MomentReferenceBuilder.cs b/Framework/OverwatchTranscript/MomentReferenceBuilder.cs index 997ebaa..7211137 100644 --- a/Framework/OverwatchTranscript/MomentReferenceBuilder.cs +++ b/Framework/OverwatchTranscript/MomentReferenceBuilder.cs @@ -15,17 +15,26 @@ namespace OverwatchTranscript this.workingDir = workingDir; } - public OverwatchMomentReference[] Build(IFinalizedBucket[] buckets) + public OverwatchMomentReference[] Build(IFinalizedBucket[] finalizedBuckets) { var result = new List(); var currentBuilder = new Builder(log, workingDir); - log.Debug($"Building references for {buckets.Length} buckets."); - while (EntriesRemaining(buckets)) + var buckets = finalizedBuckets.ToList(); + log.Debug($"Building references for {buckets.Count} buckets."); + while (buckets.Any()) { + buckets.RemoveAll(b => b.IsEmpty); + if (!buckets.Any()) break; + var earliestUtc = GetEarliestUtc(buckets); - var entries = CollectAllEntriesForUtc(earliestUtc, buckets); - var moment = ConvertEntriesToMoment(entries); + if (earliestUtc == null) + { + Thread.Sleep(1); continue; + } + + var tops = CollectAllTopsForUtc(earliestUtc.Value, buckets); + var moment = ConvertTopsToMoment(tops); currentBuilder.Add(moment); if (currentBuilder.NumberOfMoments == MaxMomentsPerReference) { @@ -42,56 +51,58 @@ namespace OverwatchTranscript return result.ToArray(); } - private OverwatchMoment ConvertEntriesToMoment(List entries) + private OverwatchMoment ConvertTopsToMoment(List tops) { - var discintUtc = entries.Select(e => e.Utc).Distinct().ToArray(); + var discintUtc = tops.Select(e => e.Utc).Distinct().ToArray(); if (discintUtc.Length != 1) throw new Exception("UTC mixing in moment construction."); return new OverwatchMoment { - Utc = entries[0].Utc, - Events = entries.Select(e => e.Event).ToArray() + Utc = tops[0].Utc, + Events = tops.SelectMany(e => e.Events).ToArray() }; } - private List CollectAllEntriesForUtc(DateTime earliestUtc, IFinalizedBucket[] buckets) + private List CollectAllTopsForUtc(DateTime earliestUtc, List buckets) { - var result = new List(); + var result = new List(); foreach (var bucket in buckets) { - var top = bucket.ViewTopEntry(); - while (top != null && top.Utc == earliestUtc) + if (bucket.IsEmpty) continue; + + var utc = bucket.SeeTopUtc(); + if (utc == null) continue; + + if (utc.Value == earliestUtc) { + var top = bucket.TakeTop(); + if (top == null) throw new Exception("top was null after top utc was not"); result.Add(top); - bucket.PopTopEntry(); - top = bucket.ViewTopEntry(); } } return result; } - private DateTime GetEarliestUtc(IFinalizedBucket[] buckets) + private DateTime? GetEarliestUtc(List buckets) { var earliest = DateTime.MaxValue; foreach (var bucket in buckets) { - var top = bucket.ViewTopEntry(); - if (top != null && top.Utc < earliest) earliest = top.Utc; + var utc = bucket.SeeTopUtc(); + if (utc == null) return null; + + if (utc.Value < earliest) earliest = utc.Value; } return earliest; } - private bool EntriesRemaining(IFinalizedBucket[] buckets) - { - return buckets.Any(b => b.ViewTopEntry() != null); - } - public class Builder { private readonly ILog log; private OverwatchMomentReference reference; + private readonly ActionQueue queue = new ActionQueue(); public Builder(ILog log, string workingDir) { @@ -104,25 +115,32 @@ namespace OverwatchTranscript NumberOfMoments = 0, }; this.log = log; + + queue.Start(); } public int NumberOfMoments => reference.NumberOfMoments; public void Add(OverwatchMoment moment) { - File.AppendAllLines(reference.MomentsFile, new[] - { - JsonConvert.SerializeObject(moment) - }); - if (moment.Utc < reference.EarliestUtc) reference.EarliestUtc = moment.Utc; if (moment.Utc > reference.LatestUtc) reference.LatestUtc = moment.Utc; reference.NumberOfMoments++; reference.NumberOfEvents += moment.Events.Length; + + queue.Add(() => + { + File.AppendAllLines(reference.MomentsFile, new[] + { + JsonConvert.SerializeObject(moment) + }); + }); } public OverwatchMomentReference Build() { + queue.StopAndJoin(); + log.Debug($"Created reference with {reference.NumberOfMoments} moments and {reference.NumberOfEvents} events..."); var result = reference; reference = null!;