diff --git a/Framework/OverwatchTranscript/BucketSet.cs b/Framework/OverwatchTranscript/BucketSet.cs new file mode 100644 index 00000000..544dc5b7 --- /dev/null +++ b/Framework/OverwatchTranscript/BucketSet.cs @@ -0,0 +1,111 @@ +namespace OverwatchTranscript +{ + public class BucketSet + { + private const int numberOfActiveBuckets = 5; + private readonly object _counterLock = new object(); + private int pendingAdds = 0; + + private readonly object _bucketLock = new object(); + private readonly List fullBuckets = new List(); + private readonly List activeBuckets = new List(); + private int activeBucketIndex = 0; + private bool closed = false; + private readonly string workingDir; + + public BucketSet(string workingDir) + { + this.workingDir = workingDir; + + for (var i = 0; i < numberOfActiveBuckets;i++) + { + AddNewBucket(); + } + } + + public string Error { get; private set; } = string.Empty; + + public void Add(DateTime utc, object payload) + { + if (closed) throw new Exception("Buckets already closed!"); + AddPending(); + Task.Run(() => AddInternal(utc, payload)); + } + + public bool IsEmpty() + { + return fullBuckets.All(b => b.Count == 0) && activeBuckets.All(b => b.Count == 0); + } + + public IFinalizedBucket[] FinalizeBuckets() + { + closed = true; + WaitForZeroPending(); + + var buckets = fullBuckets.Concat(activeBuckets).ToArray(); + return buckets.Select(b => b.FinalizeBucket()).ToArray(); + } + + private void AddInternal(DateTime utc, object payload) + { + try + { + lock (_bucketLock) + { + var current = activeBuckets[activeBucketIndex]; + current.Add(utc, payload); + activeBucketIndex = (activeBucketIndex + 1) % numberOfActiveBuckets; + + if (current.IsFull) + { + fullBuckets.Add(current); + activeBuckets.Remove(current); + AddNewBucket(); + } + RemovePending(); + } + } + catch (Exception ex) + { + Error += ex.ToString(); + } + } + + private void AddNewBucket() + { + lock (_bucketLock) + { + activeBuckets.Add(new EventBucket(Path.Combine(workingDir, Guid.NewGuid().ToString()))); + } + } + + private void AddPending() + { + lock (_counterLock) + { + pendingAdds++; + } + } + + private void RemovePending() + { + lock (_counterLock) + { + pendingAdds--; + if (pendingAdds < 0) Error += "Pending less than zero"; + } + } + + private void WaitForZeroPending() + { + while (true) + { + lock (_counterLock) + { + if (pendingAdds == 0) return; + } + Thread.Sleep(10); + } + } + } +} diff --git a/Framework/OverwatchTranscript/EventBucket.cs b/Framework/OverwatchTranscript/EventBucket.cs index 09ee121c..a71fa4b9 100644 --- a/Framework/OverwatchTranscript/EventBucket.cs +++ b/Framework/OverwatchTranscript/EventBucket.cs @@ -2,15 +2,14 @@ namespace OverwatchTranscript { - public class EventBucket + public class EventBucket : IFinalizedBucket { private const int MaxCount = 10000; private const int MaxBuffer = 100; private readonly object _lock = new object(); - private readonly object _counterLock = new object(); private bool closed = false; - private int pendingAdds = 0; + private readonly string bucketFile; private readonly List buffer = new List(); private EventBucketEntry? topEntry; @@ -28,24 +27,26 @@ namespace OverwatchTranscript public bool IsFull { get; private set; } public DateTime EarliestUtc { get; private set; } public DateTime LatestUtc { get; private set; } - public string Error { get; private set; } = string.Empty; public void Add(DateTime utc, object payload) { - if (closed) throw new Exception("Already closed"); - AddPending(); - Task.Run(() => InternalAdd(utc, payload)); - } - - public void FinalizeBucket() - { - closed = true; lock (_lock) { - WaitForZeroPending(); + if (closed) throw new Exception("Already closed"); + AddToBuffer(utc, payload); + BufferToFile(); + } + } + + public IFinalizedBucket FinalizeBucket() + { + lock (_lock) + { + closed = true; BufferToFile(); SortFileByTimestamps(); } + return this; } public EventBucketEntry? ViewTopEntry() @@ -70,43 +71,11 @@ namespace OverwatchTranscript } } - private void InternalAdd(DateTime utc, object payload) - { - lock (_lock) - { - AddToBuffer(utc, payload); - BufferToFile(); - RemovePending(); - } - } - - private void BufferToFile() - { - if (buffer.Count > MaxBuffer) - { - using var file = File.Open(bucketFile, FileMode.Append); - using var writer = new StreamWriter(file); - foreach (var entry in buffer) - { - writer.WriteLine(JsonConvert.SerializeObject(entry)); - } - buffer.Clear(); - } - } - private void AddToBuffer(DateTime utc, object payload) { var typeName = payload.GetType().FullName; - if (string.IsNullOrEmpty(typeName)) - { - Error += "Empty typename for payload"; - return; - } - if (utc == default) - { - Error += "DateTimeUtc not set"; - return; - } + if (string.IsNullOrEmpty(typeName)) throw new Exception("Empty typename for payload"); + if (utc == default) throw new Exception("DateTimeUtc not set"); var entry = new EventBucketEntry { @@ -126,6 +95,20 @@ namespace OverwatchTranscript buffer.Add(entry); } + private void BufferToFile() + { + if (buffer.Count > MaxBuffer) + { + using var file = File.Open(bucketFile, FileMode.Append); + using var writer = new StreamWriter(file); + foreach (var entry in buffer) + { + writer.WriteLine(JsonConvert.SerializeObject(entry)); + } + buffer.Clear(); + } + } + private void SortFileByTimestamps() { var lines = File.ReadAllLines(bucketFile); @@ -139,35 +122,16 @@ namespace OverwatchTranscript topEntry = entries.First(); } + } - private void AddPending() - { - lock (_counterLock) - { - pendingAdds++; - } - } - - private void RemovePending() - { - lock (_counterLock) - { - pendingAdds--; - if (pendingAdds < 0) Error += "Pending less than zero"; - } - } - - private void WaitForZeroPending() - { - while (true) - { - lock (_counterLock) - { - if (pendingAdds == 0) return; - } - Thread.Sleep(10); - } - } + public interface IFinalizedBucket + { + int Count { get; } + bool IsFull { get; } + DateTime EarliestUtc { get; } + DateTime LatestUtc { get; } + EventBucketEntry? ViewTopEntry(); + void PopTopEntry(); } [Serializable] diff --git a/Framework/OverwatchTranscript/Model.cs b/Framework/OverwatchTranscript/Model.cs index 08beda85..3854b997 100644 --- a/Framework/OverwatchTranscript/Model.cs +++ b/Framework/OverwatchTranscript/Model.cs @@ -4,7 +4,17 @@ public class OverwatchTranscript { public OverwatchHeader Header { get; set; } = new(); - public OverwatchMoment[] Moments { get; set; } = Array.Empty(); + public OverwatchMomentReference[] MomentReferences { get; set; } = Array.Empty(); + } + + [Serializable] + public class OverwatchMomentReference + { + public string MomentsFile { get; set; } = string.Empty; + public int NumberOfMoments { get; set; } + public int NumberOfEvents { get; set; } + public DateTime EarliestUtc { get; set; } + public DateTime LatestUtc { get; set; } } [Serializable] diff --git a/Framework/OverwatchTranscript/MomentReader.cs b/Framework/OverwatchTranscript/MomentReader.cs new file mode 100644 index 00000000..e30d4f66 --- /dev/null +++ b/Framework/OverwatchTranscript/MomentReader.cs @@ -0,0 +1,90 @@ +using Newtonsoft.Json; + +namespace OverwatchTranscript +{ + public class MomentReader + { + private readonly OverwatchTranscript model; + private readonly string workingDir; + private int referenceIndex = 0; + private int momentsRead = 0; + private OpenReference currentRef; + + public MomentReader(OverwatchTranscript model, string workingDir) + { + this.model = model; + this.workingDir = workingDir; + + currentRef = CreateOpenReference(); + } + + public OverwatchMoment? Next() + { + if (referenceIndex >= model.MomentReferences.Length) return null; + + var moment = currentRef.ReadNext(); + if (moment == null) + { + currentRef.Close(); + currentRef = null!; + + // This reference file ran out. + // The number of moments read should match exactly the number of moments + // describe in the reference. If not, error: + var expected = model.MomentReferences[referenceIndex].NumberOfMoments; + if (momentsRead != expected) + { + throw new Exception("Number of moments read from referenced file does not match number of moments value in model. " + + $"Reads: { momentsRead} - model.MomentReferences[{referenceIndex}].NumberOfMoment: {expected}"); + } + + referenceIndex++; + if (referenceIndex < model.MomentReferences.Length) + { + currentRef = CreateOpenReference(); + } + momentsRead = 0; + return Next(); + } + else + { + momentsRead++; + return moment; + } + } + + private OpenReference CreateOpenReference() + { + var filepath = Path.Combine(workingDir, model.MomentReferences[referenceIndex].MomentsFile); + return new OpenReference(filepath); + } + + private class OpenReference + { + private readonly FileStream file; + private readonly StreamReader reader; + + public OpenReference(string filePath) + { + file = File.OpenRead(filePath); + reader = new StreamReader(file); + } + + public OverwatchMoment? ReadNext() + { + var line = reader.ReadLine(); + if (string.IsNullOrEmpty(line)) return null; + return JsonConvert.DeserializeObject(line); + } + + public void Close() + { + reader.Close(); + file.Close(); + + reader.Dispose(); + file.Dispose(); + } + } + } +} diff --git a/Framework/OverwatchTranscript/MomentReferenceBuilder.cs b/Framework/OverwatchTranscript/MomentReferenceBuilder.cs new file mode 100644 index 00000000..d94a7dac --- /dev/null +++ b/Framework/OverwatchTranscript/MomentReferenceBuilder.cs @@ -0,0 +1,122 @@ +using Newtonsoft.Json; + +namespace OverwatchTranscript +{ + public class MomentReferenceBuilder + { + private const int MaxMomentsPerReference = 100; + private readonly string workingDir; + + public MomentReferenceBuilder(string workingDir) + { + this.workingDir = workingDir; + } + + public OverwatchMomentReference[] Build(IFinalizedBucket[] buckets) + { + var result = new List(); + + var currentBuilder = new Builder(workingDir); + + while (EntriesRemaining(buckets)) + { + var earliestUtc = GetEarliestUtc(buckets); + var entries = CollectAllEntriesForUtc(earliestUtc, buckets); + var moment = ConvertEntriesToMoment(entries); + currentBuilder.Add(moment); + if (currentBuilder.NumberOfMoments == MaxMomentsPerReference) + { + result.Add(currentBuilder.Build()); + currentBuilder = new Builder(workingDir); + } + } + + return result.ToArray(); + } + + private OverwatchMoment ConvertEntriesToMoment(List entries) + { + var discintUtc = entries.Select(e => e.Utc).Distinct().ToArray(); + if (discintUtc.Length != 1) throw new Exception("UTC mixing in moment construction."); + + return new OverwatchMoment + { + Utc = entries[0].Utc, + Events = entries.Select(e => e.Event).ToArray() + }; + } + + private List CollectAllEntriesForUtc(DateTime earliestUtc, IFinalizedBucket[] buckets) + { + var result = new List(); + + foreach (var bucket in buckets) + { + var top = bucket.ViewTopEntry(); + while (top != null && top.Utc == earliestUtc) + { + result.Add(top); + bucket.PopTopEntry(); + top = bucket.ViewTopEntry(); + } + } + + return result; + } + + private DateTime GetEarliestUtc(IFinalizedBucket[] buckets) + { + var earliest = DateTime.MaxValue; + foreach (var bucket in buckets) + { + var top = bucket.ViewTopEntry(); + if (top != null && top.Utc < earliest) earliest = top.Utc; + } + return earliest; + } + + private bool EntriesRemaining(IFinalizedBucket[] buckets) + { + return buckets.Any(b => b.ViewTopEntry() != null); + } + + public class Builder + { + private OverwatchMomentReference reference; + + public Builder(string workingDir) + { + reference = new OverwatchMomentReference + { + MomentsFile = Path.Combine(workingDir, Guid.NewGuid().ToString()), + EarliestUtc = DateTime.MaxValue, + LatestUtc = DateTime.MinValue, + NumberOfEvents = 0, + NumberOfMoments = 0, + }; + } + + public int NumberOfMoments => reference.NumberOfMoments; + + public void Add(OverwatchMoment moment) + { + File.AppendAllLines(reference.MomentsFile, new[] + { + JsonConvert.SerializeObject(moment) + }); + + if (moment.Utc < reference.EarliestUtc) reference.EarliestUtc = moment.Utc; + if (moment.Utc > reference.LatestUtc) reference.LatestUtc = moment.Utc; + reference.NumberOfMoments++; + reference.NumberOfEvents += moment.Events.Length; + } + + public OverwatchMomentReference Build() + { + var result = reference; + reference = null!; + return result; + } + } + } +} diff --git a/Framework/OverwatchTranscript/TranscriptReader.cs b/Framework/OverwatchTranscript/TranscriptReader.cs index ddfaec35..70fa00e2 100644 --- a/Framework/OverwatchTranscript/TranscriptReader.cs +++ b/Framework/OverwatchTranscript/TranscriptReader.cs @@ -25,9 +25,12 @@ namespace OverwatchTranscript private readonly List> momentHandlers = new List>(); private readonly Dictionary>> eventHandlers = new Dictionary>>(); private readonly string workingDir; - private OverwatchTranscript model = null!; - private long momentIndex = 0; + private readonly OverwatchTranscript model; + private readonly MomentReader reader; private bool closed; + private long momentCounter; + private readonly object queueLock = new object(); + private readonly List queue = new List(); public TranscriptReader(string workingDir, string inputFilename) { @@ -39,7 +42,8 @@ namespace OverwatchTranscript if (!Directory.Exists(workingDir)) Directory.CreateDirectory(workingDir); if (File.Exists(transcriptFile) || Directory.Exists(artifactsFolder)) throw new Exception("workingdir not clean"); - LoadModel(inputFilename); + model = LoadModel(inputFilename); + reader = new MomentReader(model, workingDir); } public OverwatchCommonHeader Header @@ -93,14 +97,15 @@ namespace OverwatchTranscript public void Next() { CheckClosed(); - if (momentIndex >= model.Moments.Length) return; + OverwatchMoment moment = null!; + lock (queueLock) + { + if (queue.Count == 0) return; + moment = queue[0]; + queue.RemoveAt(0); + } - var moment = model.Moments[momentIndex]; - var momentDuration = GetMomentDuration(); - - ActivateMoment(moment, momentDuration, momentIndex); - - momentIndex++; + ActivateMoment(moment); } public void Close() @@ -120,12 +125,10 @@ namespace OverwatchTranscript private TimeSpan? GetMomentDuration() { - if (momentIndex < 0) throw new Exception("Index < 0"); - if (momentIndex + 1 >= model.Moments.Length) return null; + if (current == null) return null; + if (next == null) return null; - return - model.Moments[momentIndex + 1].Utc - - model.Moments[momentIndex].Utc; + return next.Utc - current.Utc; } private void ActivateMoment(OverwatchMoment moment, TimeSpan? duration, long momentIndex) @@ -162,7 +165,7 @@ namespace OverwatchTranscript } } - private void LoadModel(string inputFilename) + private OverwatchTranscript LoadModel(string inputFilename) { ZipFile.ExtractToDirectory(inputFilename, workingDir); @@ -172,7 +175,7 @@ namespace OverwatchTranscript throw new Exception("Is not a transcript file. Unzipped to: " + workingDir); } - model = JsonConvert.DeserializeObject(File.ReadAllText(transcriptFile))!; + return JsonConvert.DeserializeObject(File.ReadAllText(transcriptFile))!; } private void CheckClosed() diff --git a/Framework/OverwatchTranscript/TranscriptWriter.cs b/Framework/OverwatchTranscript/TranscriptWriter.cs index 4fdc260d..f5defa3b 100644 --- a/Framework/OverwatchTranscript/TranscriptWriter.cs +++ b/Framework/OverwatchTranscript/TranscriptWriter.cs @@ -14,10 +14,11 @@ namespace OverwatchTranscript public class TranscriptWriter : ITranscriptWriter { private readonly object _lock = new object(); + private readonly MomentReferenceBuilder builder; private readonly string transcriptFile; private readonly string artifactsFolder; private readonly Dictionary header = new Dictionary(); - private readonly SortedList> buffer = new SortedList>(); + private readonly BucketSet bucketSet; private readonly string workingDir; private bool closed; @@ -25,6 +26,8 @@ namespace OverwatchTranscript { closed = false; this.workingDir = workingDir; + bucketSet = new BucketSet(workingDir); + builder = new MomentReferenceBuilder(workingDir); transcriptFile = Path.Combine(workingDir, TranscriptConstants.TranscriptFilename); artifactsFolder = Path.Combine(workingDir, TranscriptConstants.ArtifactFolderName); @@ -35,27 +38,7 @@ namespace OverwatchTranscript public void Add(DateTime utc, object payload) { CheckClosed(); - var typeName = payload.GetType().FullName; - if (string.IsNullOrEmpty(typeName)) throw new Exception("Empty typename for payload"); - if (utc == default) throw new Exception("DateTimeUtc not set"); - - var newEvent = new OverwatchEvent - { - Type = typeName, - Payload = JsonConvert.SerializeObject(payload) - }; - - lock (_lock) - { - if (buffer.ContainsKey(utc)) - { - buffer[utc].Add(newEvent); - } - else - { - buffer.Add(utc, new List { newEvent }); - } - } + bucketSet.Add(utc, payload); } public void AddHeader(string key, object value) @@ -78,12 +61,17 @@ namespace OverwatchTranscript public void Write(string outputFilename) { - if (!buffer.Any()) throw new Exception("No entries added."); + if (bucketSet.IsEmpty()) throw new Exception("No entries added."); + if (!string.IsNullOrEmpty(bucketSet.Error)) + { + throw new Exception("Exceptions in BucketSet: " + bucketSet.Error); + } CheckClosed(); closed = true; - var model = CreateModel(); + var momentReferences = builder.Build(bucketSet.FinalizeBuckets()); + var model = CreateModel(momentReferences); File.WriteAllText(transcriptFile, JsonConvert.SerializeObject(model, Formatting.Indented)); @@ -92,7 +80,7 @@ namespace OverwatchTranscript Directory.Delete(workingDir, true); } - private OverwatchTranscript CreateModel() + private OverwatchTranscript CreateModel(OverwatchMomentReference[] momentReferences) { lock (_lock) { @@ -100,7 +88,7 @@ namespace OverwatchTranscript { Header = new OverwatchHeader { - Common = CreateCommonHeader(), + Common = CreateCommonHeader(momentReferences), Entries = header.Select(h => { return new OverwatchHeaderEntry @@ -110,31 +98,27 @@ namespace OverwatchTranscript }; }).ToArray() }, - Moments = buffer.Select(p => - { - return new OverwatchMoment - { - Utc = p.Key, - Events = p.Value.ToArray() - }; - }).ToArray() + MomentReferences = momentReferences }; header.Clear(); - buffer.Clear(); - return model; } } - private OverwatchCommonHeader CreateCommonHeader() + private OverwatchCommonHeader CreateCommonHeader(OverwatchMomentReference[] momentReferences) { + var moments = momentReferences.Sum(m => m.NumberOfMoments); + var events = momentReferences.Sum(m => m.NumberOfEvents); + var earliest = momentReferences.Min(m => m.EarliestUtc); + var latest = momentReferences.Max(m => m.LatestUtc); + return new OverwatchCommonHeader { - NumberOfMoments = buffer.Count, - NumberOfEvents = buffer.Sum(e => e.Value.Count), - EarliestUtc = buffer.Min(e => e.Key), - LatestUtc = buffer.Max(e => e.Key) + NumberOfMoments = moments, + NumberOfEvents = events, + EarliestUtc = earliest, + LatestUtc = latest }; }