diff --git a/Framework/OverwatchTranscript/EventBucket.cs b/Framework/OverwatchTranscript/EventBucket.cs new file mode 100644 index 0000000..09ee121 --- /dev/null +++ b/Framework/OverwatchTranscript/EventBucket.cs @@ -0,0 +1,179 @@ +using Newtonsoft.Json; + +namespace OverwatchTranscript +{ + public class EventBucket + { + private const int MaxCount = 10000; + private const int MaxBuffer = 100; + + private readonly object _lock = new object(); + private readonly object _counterLock = new object(); + private bool closed = false; + private int pendingAdds = 0; + private readonly string bucketFile; + private readonly List buffer = new List(); + private EventBucketEntry? topEntry; + + public EventBucket(string bucketFile) + { + this.bucketFile = bucketFile; + if (File.Exists(bucketFile)) throw new Exception("Already exists"); + + EarliestUtc = DateTime.MaxValue; + LatestUtc = DateTime.MinValue; + } + + public int Count { get; private set; } + public bool IsFull { get; private set; } + public DateTime EarliestUtc { get; private set; } + public DateTime LatestUtc { get; private set; } + public string Error { get; private set; } = string.Empty; + + public void Add(DateTime utc, object payload) + { + if (closed) throw new Exception("Already closed"); + AddPending(); + Task.Run(() => InternalAdd(utc, payload)); + } + + public void FinalizeBucket() + { + closed = true; + lock (_lock) + { + WaitForZeroPending(); + BufferToFile(); + SortFileByTimestamps(); + } + } + + public EventBucketEntry? ViewTopEntry() + { + if (!closed) throw new Exception("Bucket not closed yet. FinalizeBucket first."); + return topEntry; + } + + public void PopTopEntry() + { + var lines = File.ReadAllLines(bucketFile).ToList(); + lines.RemoveAt(0); + File.WriteAllLines(bucketFile, lines); + + if (lines.Any()) + { + topEntry = JsonConvert.DeserializeObject(lines[0]); + } + else + { + topEntry = null; + } + } + + private void InternalAdd(DateTime utc, object payload) + { + lock (_lock) + { + AddToBuffer(utc, payload); + BufferToFile(); + RemovePending(); + } + } + + private void BufferToFile() + { + if (buffer.Count > MaxBuffer) + { + using var file = File.Open(bucketFile, FileMode.Append); + using var writer = new StreamWriter(file); + foreach (var entry in buffer) + { + writer.WriteLine(JsonConvert.SerializeObject(entry)); + } + buffer.Clear(); + } + } + + private void AddToBuffer(DateTime utc, object payload) + { + var typeName = payload.GetType().FullName; + if (string.IsNullOrEmpty(typeName)) + { + Error += "Empty typename for payload"; + return; + } + if (utc == default) + { + Error += "DateTimeUtc not set"; + return; + } + + var entry = new EventBucketEntry + { + Utc = utc, + Event = new OverwatchEvent + { + Type = typeName, + Payload = JsonConvert.SerializeObject(payload) + } + }; + + if (utc < EarliestUtc) EarliestUtc = utc; + if (utc > LatestUtc) LatestUtc = utc; + Count++; + IsFull = Count > MaxCount; + + buffer.Add(entry); + } + + private void SortFileByTimestamps() + { + var lines = File.ReadAllLines(bucketFile); + var entries = lines.Select(JsonConvert.DeserializeObject) + .Cast() + .OrderBy(e => e.Utc) + .ToArray(); + + File.Delete(bucketFile); + File.WriteAllLines(bucketFile, entries.Select(JsonConvert.SerializeObject)); + + topEntry = entries.First(); + } + + private void AddPending() + { + lock (_counterLock) + { + pendingAdds++; + } + } + + private void RemovePending() + { + lock (_counterLock) + { + pendingAdds--; + if (pendingAdds < 0) Error += "Pending less than zero"; + } + } + + private void WaitForZeroPending() + { + while (true) + { + lock (_counterLock) + { + if (pendingAdds == 0) return; + } + Thread.Sleep(10); + } + } + } + + [Serializable] + public class EventBucketEntry + { + public DateTime Utc { get; set; } + public OverwatchEvent Event { get; set; } = new(); + } +} diff --git a/ProjectPlugins/CodexPlugin/CodexNode.cs b/ProjectPlugins/CodexPlugin/CodexNode.cs index 22e6b99..98b3653 100644 --- a/ProjectPlugins/CodexPlugin/CodexNode.cs +++ b/ProjectPlugins/CodexPlugin/CodexNode.cs @@ -144,7 +144,7 @@ namespace CodexPlugin hooks.OnFileUploading(uniqueId, size); - var logMessage = $"Uploading file '{file.Describe()}'..."; + var logMessage = $"Uploading file {file.Describe()}..."; var measurement = Stopwatch.Measure(log, logMessage, () => { return CodexAccess.UploadFile(fileStream, onFailure); @@ -156,7 +156,7 @@ namespace CodexPlugin if (string.IsNullOrEmpty(response)) FrameworkAssert.Fail("Received empty response."); if (response.StartsWith(UploadFailedMessage)) FrameworkAssert.Fail("Node failed to store block."); - Log($"Uploaded file '{file.Describe()}'. Received contentId: '{response}'."); + Log($"Uploaded file {file.Describe()}. Received contentId: '{response}'."); var cid = new ContentId(response); hooks.OnFileUploaded(uniqueId, size, cid); diff --git a/ProjectPlugins/CodexPlugin/OverwatchSupport/CodexLogConverter.cs b/ProjectPlugins/CodexPlugin/OverwatchSupport/CodexLogConverter.cs index 0f62c53..23fa642 100644 --- a/ProjectPlugins/CodexPlugin/OverwatchSupport/CodexLogConverter.cs +++ b/ProjectPlugins/CodexPlugin/OverwatchSupport/CodexLogConverter.cs @@ -79,7 +79,7 @@ namespace CodexPlugin.OverwatchSupport }); } - public void AddEvent(DateTime utc, Action action) + private void AddEvent(DateTime utc, Action action) { var e = new OverwatchCodexEvent { diff --git a/ProjectPlugins/CodexPlugin/OverwatchSupport/CodexNodeTranscriptWriter.cs b/ProjectPlugins/CodexPlugin/OverwatchSupport/CodexNodeTranscriptWriter.cs new file mode 100644 index 0000000..f0e5de3 --- /dev/null +++ b/ProjectPlugins/CodexPlugin/OverwatchSupport/CodexNodeTranscriptWriter.cs @@ -0,0 +1,150 @@ +using CodexPlugin.Hooks; +using OverwatchTranscript; +using Utils; + +namespace CodexPlugin.OverwatchSupport +{ + public class CodexNodeTranscriptWriter : ICodexNodeHooks + { + private readonly ITranscriptWriter writer; + private readonly NameIdMap nameIdMap; + private readonly string name; + private string peerId = string.Empty; + private readonly List<(DateTime, OverwatchCodexEvent)> pendingEvents = new List<(DateTime, OverwatchCodexEvent)>(); + + public CodexNodeTranscriptWriter(ITranscriptWriter writer, NameIdMap nameIdMap, string name) + { + this.writer = writer; + this.nameIdMap = nameIdMap; + this.name = name; + } + + public void OnNodeStarting(DateTime startUtc, string image) + { + WriteCodexEvent(startUtc, e => + { + e.NodeStarting = new NodeStartingEvent + { + Image = image + }; + }); + } + + public void OnNodeStarted(string peerId) + { + this.peerId = peerId; + nameIdMap.Add(name, peerId); + WriteCodexEvent(e => + { + e.NodeStarted = new NodeStartedEvent + { + }; + }); + } + + public void OnNodeStopping() + { + WriteCodexEvent(e => + { + e.NodeStopping = new NodeStoppingEvent + { + }; + }); + } + + public void OnFileDownloading(ContentId cid) + { + WriteCodexEvent(e => + { + e.FileDownloading = new FileDownloadingEvent + { + Cid = cid.Id + }; + }); + } + + public void OnFileDownloaded(ByteSize size, ContentId cid) + { + WriteCodexEvent(e => + { + e.FileDownloaded = new FileDownloadedEvent + { + Cid = cid.Id, + ByteSize = size.SizeInBytes + }; + }); + } + + public void OnFileUploading(string uid, ByteSize size) + { + WriteCodexEvent(e => + { + e.FileUploading = new FileUploadingEvent + { + UniqueId = uid, + ByteSize = size.SizeInBytes + }; + }); + } + + public void OnFileUploaded(string uid, ByteSize size, ContentId cid) + { + WriteCodexEvent(e => + { + e.FileUploaded = new FileUploadedEvent + { + UniqueId = uid, + Cid = cid.Id, + ByteSize = size.SizeInBytes + }; + }); + } + + private void WriteCodexEvent(Action action) + { + WriteCodexEvent(DateTime.UtcNow, action); + } + + private void WriteCodexEvent(DateTime utc, Action action) + { + var e = new OverwatchCodexEvent + { + Name = name, + PeerId = peerId + }; + + action(e); + + if (string.IsNullOrEmpty(peerId)) + { + // If we don't know our peerId, don't write the events yet. + AddToCache(utc, e); + } + else + { + e.Write(utc, writer); + + // Write any events that we cached when we didn't have our peerId yet. + WriteAndClearCache(); + } + } + + private void AddToCache(DateTime utc, OverwatchCodexEvent e) + { + pendingEvents.Add((utc, e)); + } + + private void WriteAndClearCache() + { + if (pendingEvents.Any()) + { + foreach (var pair in pendingEvents) + { + pair.Item2.PeerId = peerId; + pair.Item2.Write(pair.Item1, writer); + } + pendingEvents.Clear(); + } + } + } +} diff --git a/ProjectPlugins/CodexPlugin/OverwatchSupport/CodexTranscriptWriter.cs b/ProjectPlugins/CodexPlugin/OverwatchSupport/CodexTranscriptWriter.cs index 8bfbb02..436f1f7 100644 --- a/ProjectPlugins/CodexPlugin/OverwatchSupport/CodexTranscriptWriter.cs +++ b/ProjectPlugins/CodexPlugin/OverwatchSupport/CodexTranscriptWriter.cs @@ -86,148 +86,4 @@ namespace CodexPlugin.OverwatchSupport return log.GetLinesContaining("Run Codex node").Any(); } } - - public class CodexNodeTranscriptWriter : ICodexNodeHooks - { - private readonly ITranscriptWriter writer; - private readonly NameIdMap nameIdMap; - private readonly string name; - private string peerId = string.Empty; - private readonly List<(DateTime, OverwatchCodexEvent)> pendingEvents = new List<(DateTime, OverwatchCodexEvent)>(); - - public CodexNodeTranscriptWriter(ITranscriptWriter writer, NameIdMap nameIdMap, string name) - { - this.writer = writer; - this.nameIdMap = nameIdMap; - this.name = name; - } - - public void OnNodeStarting(DateTime startUtc, string image) - { - WriteCodexEvent(startUtc, e => - { - e.NodeStarting = new NodeStartingEvent - { - Image = image - }; - }); - } - - public void OnNodeStarted(string peerId) - { - this.peerId = peerId; - nameIdMap.Add(name, peerId); - WriteCodexEvent(e => - { - e.NodeStarted = new NodeStartedEvent - { - }; - }); - } - - public void OnNodeStopping() - { - WriteCodexEvent(e => - { - e.NodeStopping = new NodeStoppingEvent - { - }; - }); - } - - public void OnFileDownloading(ContentId cid) - { - WriteCodexEvent(e => - { - e.FileDownloading = new FileDownloadingEvent - { - Cid = cid.Id - }; - }); - } - - public void OnFileDownloaded(ByteSize size, ContentId cid) - { - WriteCodexEvent(e => - { - e.FileDownloaded = new FileDownloadedEvent - { - Cid = cid.Id, - ByteSize = size.SizeInBytes - }; - }); - } - - public void OnFileUploading(string uid, ByteSize size) - { - WriteCodexEvent(e => - { - e.FileUploading = new FileUploadingEvent - { - UniqueId = uid, - ByteSize = size.SizeInBytes - }; - }); - } - - public void OnFileUploaded(string uid, ByteSize size, ContentId cid) - { - WriteCodexEvent(e => - { - e.FileUploaded = new FileUploadedEvent - { - UniqueId = uid, - Cid = cid.Id, - ByteSize = size.SizeInBytes - }; - }); - } - - private void WriteCodexEvent(Action action) - { - WriteCodexEvent(DateTime.UtcNow, action); - } - - private void WriteCodexEvent(DateTime utc, Action action) - { - var e = new OverwatchCodexEvent - { - Name = name, - PeerId = peerId - }; - - action(e); - - if (string.IsNullOrEmpty(peerId)) - { - // If we don't know our peerId, don't write the events yet. - AddToCache(utc, e); - } - else - { - e.Write(utc, writer); - - // Write any events that we cached when we didn't have our peerId yet. - WriteAndClearCache(); - } - } - - private void AddToCache(DateTime utc, OverwatchCodexEvent e) - { - pendingEvents.Add((utc, e)); - } - - private void WriteAndClearCache() - { - if (pendingEvents.Any()) - { - foreach (var pair in pendingEvents) - { - pair.Item2.PeerId = peerId; - pair.Item2.Write(pair.Item1, writer); - } - pendingEvents.Clear(); - } - } - } }