This commit is contained in:
benbierens 2024-08-01 16:25:28 +02:00
parent 53aad5cb37
commit 7a3a2b558b
No known key found for this signature in database
GPG Key ID: 877D2C2E09A22F3A
7 changed files with 418 additions and 134 deletions

View File

@ -0,0 +1,111 @@
namespace OverwatchTranscript
{
public class BucketSet
{
private const int numberOfActiveBuckets = 5;
private readonly object _counterLock = new object();
private int pendingAdds = 0;
private readonly object _bucketLock = new object();
private readonly List<EventBucket> fullBuckets = new List<EventBucket>();
private readonly List<EventBucket> activeBuckets = new List<EventBucket>();
private int activeBucketIndex = 0;
private bool closed = false;
private readonly string workingDir;
public BucketSet(string workingDir)
{
this.workingDir = workingDir;
for (var i = 0; i < numberOfActiveBuckets;i++)
{
AddNewBucket();
}
}
public string Error { get; private set; } = string.Empty;
public void Add(DateTime utc, object payload)
{
if (closed) throw new Exception("Buckets already closed!");
AddPending();
Task.Run(() => AddInternal(utc, payload));
}
public bool IsEmpty()
{
return fullBuckets.All(b => b.Count == 0) && activeBuckets.All(b => b.Count == 0);
}
public IFinalizedBucket[] FinalizeBuckets()
{
closed = true;
WaitForZeroPending();
var buckets = fullBuckets.Concat(activeBuckets).ToArray();
return buckets.Select(b => b.FinalizeBucket()).ToArray();
}
private void AddInternal(DateTime utc, object payload)
{
try
{
lock (_bucketLock)
{
var current = activeBuckets[activeBucketIndex];
current.Add(utc, payload);
activeBucketIndex = (activeBucketIndex + 1) % numberOfActiveBuckets;
if (current.IsFull)
{
fullBuckets.Add(current);
activeBuckets.Remove(current);
AddNewBucket();
}
RemovePending();
}
}
catch (Exception ex)
{
Error += ex.ToString();
}
}
private void AddNewBucket()
{
lock (_bucketLock)
{
activeBuckets.Add(new EventBucket(Path.Combine(workingDir, Guid.NewGuid().ToString())));
}
}
private void AddPending()
{
lock (_counterLock)
{
pendingAdds++;
}
}
private void RemovePending()
{
lock (_counterLock)
{
pendingAdds--;
if (pendingAdds < 0) Error += "Pending less than zero";
}
}
private void WaitForZeroPending()
{
while (true)
{
lock (_counterLock)
{
if (pendingAdds == 0) return;
}
Thread.Sleep(10);
}
}
}
}

View File

@ -2,15 +2,14 @@
namespace OverwatchTranscript
{
public class EventBucket
public class EventBucket : IFinalizedBucket
{
private const int MaxCount = 10000;
private const int MaxBuffer = 100;
private readonly object _lock = new object();
private readonly object _counterLock = new object();
private bool closed = false;
private int pendingAdds = 0;
private readonly string bucketFile;
private readonly List<EventBucketEntry> buffer = new List<EventBucketEntry>();
private EventBucketEntry? topEntry;
@ -28,24 +27,26 @@ namespace OverwatchTranscript
public bool IsFull { get; private set; }
public DateTime EarliestUtc { get; private set; }
public DateTime LatestUtc { get; private set; }
public string Error { get; private set; } = string.Empty;
public void Add(DateTime utc, object payload)
{
if (closed) throw new Exception("Already closed");
AddPending();
Task.Run(() => InternalAdd(utc, payload));
}
public void FinalizeBucket()
{
closed = true;
lock (_lock)
{
WaitForZeroPending();
if (closed) throw new Exception("Already closed");
AddToBuffer(utc, payload);
BufferToFile();
}
}
public IFinalizedBucket FinalizeBucket()
{
lock (_lock)
{
closed = true;
BufferToFile();
SortFileByTimestamps();
}
return this;
}
public EventBucketEntry? ViewTopEntry()
@ -70,43 +71,11 @@ namespace OverwatchTranscript
}
}
private void InternalAdd(DateTime utc, object payload)
{
lock (_lock)
{
AddToBuffer(utc, payload);
BufferToFile();
RemovePending();
}
}
private void BufferToFile()
{
if (buffer.Count > MaxBuffer)
{
using var file = File.Open(bucketFile, FileMode.Append);
using var writer = new StreamWriter(file);
foreach (var entry in buffer)
{
writer.WriteLine(JsonConvert.SerializeObject(entry));
}
buffer.Clear();
}
}
private void AddToBuffer(DateTime utc, object payload)
{
var typeName = payload.GetType().FullName;
if (string.IsNullOrEmpty(typeName))
{
Error += "Empty typename for payload";
return;
}
if (utc == default)
{
Error += "DateTimeUtc not set";
return;
}
if (string.IsNullOrEmpty(typeName)) throw new Exception("Empty typename for payload");
if (utc == default) throw new Exception("DateTimeUtc not set");
var entry = new EventBucketEntry
{
@ -126,6 +95,20 @@ namespace OverwatchTranscript
buffer.Add(entry);
}
private void BufferToFile()
{
if (buffer.Count > MaxBuffer)
{
using var file = File.Open(bucketFile, FileMode.Append);
using var writer = new StreamWriter(file);
foreach (var entry in buffer)
{
writer.WriteLine(JsonConvert.SerializeObject(entry));
}
buffer.Clear();
}
}
private void SortFileByTimestamps()
{
var lines = File.ReadAllLines(bucketFile);
@ -139,35 +122,16 @@ namespace OverwatchTranscript
topEntry = entries.First();
}
}
private void AddPending()
{
lock (_counterLock)
{
pendingAdds++;
}
}
private void RemovePending()
{
lock (_counterLock)
{
pendingAdds--;
if (pendingAdds < 0) Error += "Pending less than zero";
}
}
private void WaitForZeroPending()
{
while (true)
{
lock (_counterLock)
{
if (pendingAdds == 0) return;
}
Thread.Sleep(10);
}
}
public interface IFinalizedBucket
{
int Count { get; }
bool IsFull { get; }
DateTime EarliestUtc { get; }
DateTime LatestUtc { get; }
EventBucketEntry? ViewTopEntry();
void PopTopEntry();
}
[Serializable]

View File

@ -4,7 +4,17 @@
public class OverwatchTranscript
{
public OverwatchHeader Header { get; set; } = new();
public OverwatchMoment[] Moments { get; set; } = Array.Empty<OverwatchMoment>();
public OverwatchMomentReference[] MomentReferences { get; set; } = Array.Empty<OverwatchMomentReference>();
}
[Serializable]
public class OverwatchMomentReference
{
public string MomentsFile { get; set; } = string.Empty;
public int NumberOfMoments { get; set; }
public int NumberOfEvents { get; set; }
public DateTime EarliestUtc { get; set; }
public DateTime LatestUtc { get; set; }
}
[Serializable]

View File

@ -0,0 +1,90 @@
using Newtonsoft.Json;
namespace OverwatchTranscript
{
public class MomentReader
{
private readonly OverwatchTranscript model;
private readonly string workingDir;
private int referenceIndex = 0;
private int momentsRead = 0;
private OpenReference currentRef;
public MomentReader(OverwatchTranscript model, string workingDir)
{
this.model = model;
this.workingDir = workingDir;
currentRef = CreateOpenReference();
}
public OverwatchMoment? Next()
{
if (referenceIndex >= model.MomentReferences.Length) return null;
var moment = currentRef.ReadNext();
if (moment == null)
{
currentRef.Close();
currentRef = null!;
// This reference file ran out.
// The number of moments read should match exactly the number of moments
// describe in the reference. If not, error:
var expected = model.MomentReferences[referenceIndex].NumberOfMoments;
if (momentsRead != expected)
{
throw new Exception("Number of moments read from referenced file does not match number of moments value in model. " +
$"Reads: { momentsRead} - model.MomentReferences[{referenceIndex}].NumberOfMoment: {expected}");
}
referenceIndex++;
if (referenceIndex < model.MomentReferences.Length)
{
currentRef = CreateOpenReference();
}
momentsRead = 0;
return Next();
}
else
{
momentsRead++;
return moment;
}
}
private OpenReference CreateOpenReference()
{
var filepath = Path.Combine(workingDir, model.MomentReferences[referenceIndex].MomentsFile);
return new OpenReference(filepath);
}
private class OpenReference
{
private readonly FileStream file;
private readonly StreamReader reader;
public OpenReference(string filePath)
{
file = File.OpenRead(filePath);
reader = new StreamReader(file);
}
public OverwatchMoment? ReadNext()
{
var line = reader.ReadLine();
if (string.IsNullOrEmpty(line)) return null;
return JsonConvert.DeserializeObject<OverwatchMoment>(line);
}
public void Close()
{
reader.Close();
file.Close();
reader.Dispose();
file.Dispose();
}
}
}
}

View File

@ -0,0 +1,122 @@
using Newtonsoft.Json;
namespace OverwatchTranscript
{
public class MomentReferenceBuilder
{
private const int MaxMomentsPerReference = 100;
private readonly string workingDir;
public MomentReferenceBuilder(string workingDir)
{
this.workingDir = workingDir;
}
public OverwatchMomentReference[] Build(IFinalizedBucket[] buckets)
{
var result = new List<OverwatchMomentReference>();
var currentBuilder = new Builder(workingDir);
while (EntriesRemaining(buckets))
{
var earliestUtc = GetEarliestUtc(buckets);
var entries = CollectAllEntriesForUtc(earliestUtc, buckets);
var moment = ConvertEntriesToMoment(entries);
currentBuilder.Add(moment);
if (currentBuilder.NumberOfMoments == MaxMomentsPerReference)
{
result.Add(currentBuilder.Build());
currentBuilder = new Builder(workingDir);
}
}
return result.ToArray();
}
private OverwatchMoment ConvertEntriesToMoment(List<EventBucketEntry> entries)
{
var discintUtc = entries.Select(e => e.Utc).Distinct().ToArray();
if (discintUtc.Length != 1) throw new Exception("UTC mixing in moment construction.");
return new OverwatchMoment
{
Utc = entries[0].Utc,
Events = entries.Select(e => e.Event).ToArray()
};
}
private List<EventBucketEntry> CollectAllEntriesForUtc(DateTime earliestUtc, IFinalizedBucket[] buckets)
{
var result = new List<EventBucketEntry>();
foreach (var bucket in buckets)
{
var top = bucket.ViewTopEntry();
while (top != null && top.Utc == earliestUtc)
{
result.Add(top);
bucket.PopTopEntry();
top = bucket.ViewTopEntry();
}
}
return result;
}
private DateTime GetEarliestUtc(IFinalizedBucket[] buckets)
{
var earliest = DateTime.MaxValue;
foreach (var bucket in buckets)
{
var top = bucket.ViewTopEntry();
if (top != null && top.Utc < earliest) earliest = top.Utc;
}
return earliest;
}
private bool EntriesRemaining(IFinalizedBucket[] buckets)
{
return buckets.Any(b => b.ViewTopEntry() != null);
}
public class Builder
{
private OverwatchMomentReference reference;
public Builder(string workingDir)
{
reference = new OverwatchMomentReference
{
MomentsFile = Path.Combine(workingDir, Guid.NewGuid().ToString()),
EarliestUtc = DateTime.MaxValue,
LatestUtc = DateTime.MinValue,
NumberOfEvents = 0,
NumberOfMoments = 0,
};
}
public int NumberOfMoments => reference.NumberOfMoments;
public void Add(OverwatchMoment moment)
{
File.AppendAllLines(reference.MomentsFile, new[]
{
JsonConvert.SerializeObject(moment)
});
if (moment.Utc < reference.EarliestUtc) reference.EarliestUtc = moment.Utc;
if (moment.Utc > reference.LatestUtc) reference.LatestUtc = moment.Utc;
reference.NumberOfMoments++;
reference.NumberOfEvents += moment.Events.Length;
}
public OverwatchMomentReference Build()
{
var result = reference;
reference = null!;
return result;
}
}
}
}

View File

@ -25,9 +25,12 @@ namespace OverwatchTranscript
private readonly List<Action<ActivateMoment>> momentHandlers = new List<Action<ActivateMoment>>();
private readonly Dictionary<string, List<Action<ActivateMoment, string>>> eventHandlers = new Dictionary<string, List<Action<ActivateMoment, string>>>();
private readonly string workingDir;
private OverwatchTranscript model = null!;
private long momentIndex = 0;
private readonly OverwatchTranscript model;
private readonly MomentReader reader;
private bool closed;
private long momentCounter;
private readonly object queueLock = new object();
private readonly List<OverwatchMoment> queue = new List<OverwatchMoment>();
public TranscriptReader(string workingDir, string inputFilename)
{
@ -39,7 +42,8 @@ namespace OverwatchTranscript
if (!Directory.Exists(workingDir)) Directory.CreateDirectory(workingDir);
if (File.Exists(transcriptFile) || Directory.Exists(artifactsFolder)) throw new Exception("workingdir not clean");
LoadModel(inputFilename);
model = LoadModel(inputFilename);
reader = new MomentReader(model, workingDir);
}
public OverwatchCommonHeader Header
@ -93,14 +97,15 @@ namespace OverwatchTranscript
public void Next()
{
CheckClosed();
if (momentIndex >= model.Moments.Length) return;
OverwatchMoment moment = null!;
lock (queueLock)
{
if (queue.Count == 0) return;
moment = queue[0];
queue.RemoveAt(0);
}
var moment = model.Moments[momentIndex];
var momentDuration = GetMomentDuration();
ActivateMoment(moment, momentDuration, momentIndex);
momentIndex++;
ActivateMoment(moment);
}
public void Close()
@ -120,12 +125,10 @@ namespace OverwatchTranscript
private TimeSpan? GetMomentDuration()
{
if (momentIndex < 0) throw new Exception("Index < 0");
if (momentIndex + 1 >= model.Moments.Length) return null;
if (current == null) return null;
if (next == null) return null;
return
model.Moments[momentIndex + 1].Utc -
model.Moments[momentIndex].Utc;
return next.Utc - current.Utc;
}
private void ActivateMoment(OverwatchMoment moment, TimeSpan? duration, long momentIndex)
@ -162,7 +165,7 @@ namespace OverwatchTranscript
}
}
private void LoadModel(string inputFilename)
private OverwatchTranscript LoadModel(string inputFilename)
{
ZipFile.ExtractToDirectory(inputFilename, workingDir);
@ -172,7 +175,7 @@ namespace OverwatchTranscript
throw new Exception("Is not a transcript file. Unzipped to: " + workingDir);
}
model = JsonConvert.DeserializeObject<OverwatchTranscript>(File.ReadAllText(transcriptFile))!;
return JsonConvert.DeserializeObject<OverwatchTranscript>(File.ReadAllText(transcriptFile))!;
}
private void CheckClosed()

View File

@ -14,10 +14,11 @@ namespace OverwatchTranscript
public class TranscriptWriter : ITranscriptWriter
{
private readonly object _lock = new object();
private readonly MomentReferenceBuilder builder;
private readonly string transcriptFile;
private readonly string artifactsFolder;
private readonly Dictionary<string, string> header = new Dictionary<string, string>();
private readonly SortedList<DateTime, List<OverwatchEvent>> buffer = new SortedList<DateTime, List<OverwatchEvent>>();
private readonly BucketSet bucketSet;
private readonly string workingDir;
private bool closed;
@ -25,6 +26,8 @@ namespace OverwatchTranscript
{
closed = false;
this.workingDir = workingDir;
bucketSet = new BucketSet(workingDir);
builder = new MomentReferenceBuilder(workingDir);
transcriptFile = Path.Combine(workingDir, TranscriptConstants.TranscriptFilename);
artifactsFolder = Path.Combine(workingDir, TranscriptConstants.ArtifactFolderName);
@ -35,27 +38,7 @@ namespace OverwatchTranscript
public void Add(DateTime utc, object payload)
{
CheckClosed();
var typeName = payload.GetType().FullName;
if (string.IsNullOrEmpty(typeName)) throw new Exception("Empty typename for payload");
if (utc == default) throw new Exception("DateTimeUtc not set");
var newEvent = new OverwatchEvent
{
Type = typeName,
Payload = JsonConvert.SerializeObject(payload)
};
lock (_lock)
{
if (buffer.ContainsKey(utc))
{
buffer[utc].Add(newEvent);
}
else
{
buffer.Add(utc, new List<OverwatchEvent> { newEvent });
}
}
bucketSet.Add(utc, payload);
}
public void AddHeader(string key, object value)
@ -78,12 +61,17 @@ namespace OverwatchTranscript
public void Write(string outputFilename)
{
if (!buffer.Any()) throw new Exception("No entries added.");
if (bucketSet.IsEmpty()) throw new Exception("No entries added.");
if (!string.IsNullOrEmpty(bucketSet.Error))
{
throw new Exception("Exceptions in BucketSet: " + bucketSet.Error);
}
CheckClosed();
closed = true;
var model = CreateModel();
var momentReferences = builder.Build(bucketSet.FinalizeBuckets());
var model = CreateModel(momentReferences);
File.WriteAllText(transcriptFile, JsonConvert.SerializeObject(model, Formatting.Indented));
@ -92,7 +80,7 @@ namespace OverwatchTranscript
Directory.Delete(workingDir, true);
}
private OverwatchTranscript CreateModel()
private OverwatchTranscript CreateModel(OverwatchMomentReference[] momentReferences)
{
lock (_lock)
{
@ -100,7 +88,7 @@ namespace OverwatchTranscript
{
Header = new OverwatchHeader
{
Common = CreateCommonHeader(),
Common = CreateCommonHeader(momentReferences),
Entries = header.Select(h =>
{
return new OverwatchHeaderEntry
@ -110,31 +98,27 @@ namespace OverwatchTranscript
};
}).ToArray()
},
Moments = buffer.Select(p =>
{
return new OverwatchMoment
{
Utc = p.Key,
Events = p.Value.ToArray()
};
}).ToArray()
MomentReferences = momentReferences
};
header.Clear();
buffer.Clear();
return model;
}
}
private OverwatchCommonHeader CreateCommonHeader()
private OverwatchCommonHeader CreateCommonHeader(OverwatchMomentReference[] momentReferences)
{
var moments = momentReferences.Sum(m => m.NumberOfMoments);
var events = momentReferences.Sum(m => m.NumberOfEvents);
var earliest = momentReferences.Min(m => m.EarliestUtc);
var latest = momentReferences.Max(m => m.LatestUtc);
return new OverwatchCommonHeader
{
NumberOfMoments = buffer.Count,
NumberOfEvents = buffer.Sum(e => e.Value.Count),
EarliestUtc = buffer.Min(e => e.Key),
LatestUtc = buffer.Max(e => e.Key)
NumberOfMoments = moments,
NumberOfEvents = events,
EarliestUtc = earliest,
LatestUtc = latest
};
}