2
0
mirror of synced 2025-01-11 09:06:56 +00:00

Sets up event bucket

This commit is contained in:
benbierens 2024-08-01 14:50:25 +02:00
parent fd65e1f022
commit 53aad5cb37
No known key found for this signature in database
GPG Key ID: 877D2C2E09A22F3A
5 changed files with 332 additions and 147 deletions

View File

@ -0,0 +1,179 @@
using Newtonsoft.Json;
namespace OverwatchTranscript
{
public class EventBucket
{
private const int MaxCount = 10000;
private const int MaxBuffer = 100;
private readonly object _lock = new object();
private readonly object _counterLock = new object();
private bool closed = false;
private int pendingAdds = 0;
private readonly string bucketFile;
private readonly List<EventBucketEntry> buffer = new List<EventBucketEntry>();
private EventBucketEntry? topEntry;
public EventBucket(string bucketFile)
{
this.bucketFile = bucketFile;
if (File.Exists(bucketFile)) throw new Exception("Already exists");
EarliestUtc = DateTime.MaxValue;
LatestUtc = DateTime.MinValue;
}
public int Count { get; private set; }
public bool IsFull { get; private set; }
public DateTime EarliestUtc { get; private set; }
public DateTime LatestUtc { get; private set; }
public string Error { get; private set; } = string.Empty;
public void Add(DateTime utc, object payload)
{
if (closed) throw new Exception("Already closed");
AddPending();
Task.Run(() => InternalAdd(utc, payload));
}
public void FinalizeBucket()
{
closed = true;
lock (_lock)
{
WaitForZeroPending();
BufferToFile();
SortFileByTimestamps();
}
}
public EventBucketEntry? ViewTopEntry()
{
if (!closed) throw new Exception("Bucket not closed yet. FinalizeBucket first.");
return topEntry;
}
public void PopTopEntry()
{
var lines = File.ReadAllLines(bucketFile).ToList();
lines.RemoveAt(0);
File.WriteAllLines(bucketFile, lines);
if (lines.Any())
{
topEntry = JsonConvert.DeserializeObject<EventBucketEntry>(lines[0]);
}
else
{
topEntry = null;
}
}
private void InternalAdd(DateTime utc, object payload)
{
lock (_lock)
{
AddToBuffer(utc, payload);
BufferToFile();
RemovePending();
}
}
private void BufferToFile()
{
if (buffer.Count > MaxBuffer)
{
using var file = File.Open(bucketFile, FileMode.Append);
using var writer = new StreamWriter(file);
foreach (var entry in buffer)
{
writer.WriteLine(JsonConvert.SerializeObject(entry));
}
buffer.Clear();
}
}
private void AddToBuffer(DateTime utc, object payload)
{
var typeName = payload.GetType().FullName;
if (string.IsNullOrEmpty(typeName))
{
Error += "Empty typename for payload";
return;
}
if (utc == default)
{
Error += "DateTimeUtc not set";
return;
}
var entry = new EventBucketEntry
{
Utc = utc,
Event = new OverwatchEvent
{
Type = typeName,
Payload = JsonConvert.SerializeObject(payload)
}
};
if (utc < EarliestUtc) EarliestUtc = utc;
if (utc > LatestUtc) LatestUtc = utc;
Count++;
IsFull = Count > MaxCount;
buffer.Add(entry);
}
private void SortFileByTimestamps()
{
var lines = File.ReadAllLines(bucketFile);
var entries = lines.Select(JsonConvert.DeserializeObject<EventBucketEntry>)
.Cast<EventBucketEntry>()
.OrderBy(e => e.Utc)
.ToArray();
File.Delete(bucketFile);
File.WriteAllLines(bucketFile, entries.Select(JsonConvert.SerializeObject));
topEntry = entries.First();
}
private void AddPending()
{
lock (_counterLock)
{
pendingAdds++;
}
}
private void RemovePending()
{
lock (_counterLock)
{
pendingAdds--;
if (pendingAdds < 0) Error += "Pending less than zero";
}
}
private void WaitForZeroPending()
{
while (true)
{
lock (_counterLock)
{
if (pendingAdds == 0) return;
}
Thread.Sleep(10);
}
}
}
[Serializable]
public class EventBucketEntry
{
public DateTime Utc { get; set; }
public OverwatchEvent Event { get; set; } = new();
}
}

View File

@ -144,7 +144,7 @@ namespace CodexPlugin
hooks.OnFileUploading(uniqueId, size);
var logMessage = $"Uploading file '{file.Describe()}'...";
var logMessage = $"Uploading file {file.Describe()}...";
var measurement = Stopwatch.Measure(log, logMessage, () =>
{
return CodexAccess.UploadFile(fileStream, onFailure);
@ -156,7 +156,7 @@ namespace CodexPlugin
if (string.IsNullOrEmpty(response)) FrameworkAssert.Fail("Received empty response.");
if (response.StartsWith(UploadFailedMessage)) FrameworkAssert.Fail("Node failed to store block.");
Log($"Uploaded file '{file.Describe()}'. Received contentId: '{response}'.");
Log($"Uploaded file {file.Describe()}. Received contentId: '{response}'.");
var cid = new ContentId(response);
hooks.OnFileUploaded(uniqueId, size, cid);

View File

@ -79,7 +79,7 @@ namespace CodexPlugin.OverwatchSupport
});
}
public void AddEvent(DateTime utc, Action<OverwatchCodexEvent> action)
private void AddEvent(DateTime utc, Action<OverwatchCodexEvent> action)
{
var e = new OverwatchCodexEvent
{

View File

@ -0,0 +1,150 @@
using CodexPlugin.Hooks;
using OverwatchTranscript;
using Utils;
namespace CodexPlugin.OverwatchSupport
{
public class CodexNodeTranscriptWriter : ICodexNodeHooks
{
private readonly ITranscriptWriter writer;
private readonly NameIdMap nameIdMap;
private readonly string name;
private string peerId = string.Empty;
private readonly List<(DateTime, OverwatchCodexEvent)> pendingEvents = new List<(DateTime, OverwatchCodexEvent)>();
public CodexNodeTranscriptWriter(ITranscriptWriter writer, NameIdMap nameIdMap, string name)
{
this.writer = writer;
this.nameIdMap = nameIdMap;
this.name = name;
}
public void OnNodeStarting(DateTime startUtc, string image)
{
WriteCodexEvent(startUtc, e =>
{
e.NodeStarting = new NodeStartingEvent
{
Image = image
};
});
}
public void OnNodeStarted(string peerId)
{
this.peerId = peerId;
nameIdMap.Add(name, peerId);
WriteCodexEvent(e =>
{
e.NodeStarted = new NodeStartedEvent
{
};
});
}
public void OnNodeStopping()
{
WriteCodexEvent(e =>
{
e.NodeStopping = new NodeStoppingEvent
{
};
});
}
public void OnFileDownloading(ContentId cid)
{
WriteCodexEvent(e =>
{
e.FileDownloading = new FileDownloadingEvent
{
Cid = cid.Id
};
});
}
public void OnFileDownloaded(ByteSize size, ContentId cid)
{
WriteCodexEvent(e =>
{
e.FileDownloaded = new FileDownloadedEvent
{
Cid = cid.Id,
ByteSize = size.SizeInBytes
};
});
}
public void OnFileUploading(string uid, ByteSize size)
{
WriteCodexEvent(e =>
{
e.FileUploading = new FileUploadingEvent
{
UniqueId = uid,
ByteSize = size.SizeInBytes
};
});
}
public void OnFileUploaded(string uid, ByteSize size, ContentId cid)
{
WriteCodexEvent(e =>
{
e.FileUploaded = new FileUploadedEvent
{
UniqueId = uid,
Cid = cid.Id,
ByteSize = size.SizeInBytes
};
});
}
private void WriteCodexEvent(Action<OverwatchCodexEvent> action)
{
WriteCodexEvent(DateTime.UtcNow, action);
}
private void WriteCodexEvent(DateTime utc, Action<OverwatchCodexEvent> action)
{
var e = new OverwatchCodexEvent
{
Name = name,
PeerId = peerId
};
action(e);
if (string.IsNullOrEmpty(peerId))
{
// If we don't know our peerId, don't write the events yet.
AddToCache(utc, e);
}
else
{
e.Write(utc, writer);
// Write any events that we cached when we didn't have our peerId yet.
WriteAndClearCache();
}
}
private void AddToCache(DateTime utc, OverwatchCodexEvent e)
{
pendingEvents.Add((utc, e));
}
private void WriteAndClearCache()
{
if (pendingEvents.Any())
{
foreach (var pair in pendingEvents)
{
pair.Item2.PeerId = peerId;
pair.Item2.Write(pair.Item1, writer);
}
pendingEvents.Clear();
}
}
}
}

View File

@ -86,148 +86,4 @@ namespace CodexPlugin.OverwatchSupport
return log.GetLinesContaining("Run Codex node").Any();
}
}
public class CodexNodeTranscriptWriter : ICodexNodeHooks
{
private readonly ITranscriptWriter writer;
private readonly NameIdMap nameIdMap;
private readonly string name;
private string peerId = string.Empty;
private readonly List<(DateTime, OverwatchCodexEvent)> pendingEvents = new List<(DateTime, OverwatchCodexEvent)>();
public CodexNodeTranscriptWriter(ITranscriptWriter writer, NameIdMap nameIdMap, string name)
{
this.writer = writer;
this.nameIdMap = nameIdMap;
this.name = name;
}
public void OnNodeStarting(DateTime startUtc, string image)
{
WriteCodexEvent(startUtc, e =>
{
e.NodeStarting = new NodeStartingEvent
{
Image = image
};
});
}
public void OnNodeStarted(string peerId)
{
this.peerId = peerId;
nameIdMap.Add(name, peerId);
WriteCodexEvent(e =>
{
e.NodeStarted = new NodeStartedEvent
{
};
});
}
public void OnNodeStopping()
{
WriteCodexEvent(e =>
{
e.NodeStopping = new NodeStoppingEvent
{
};
});
}
public void OnFileDownloading(ContentId cid)
{
WriteCodexEvent(e =>
{
e.FileDownloading = new FileDownloadingEvent
{
Cid = cid.Id
};
});
}
public void OnFileDownloaded(ByteSize size, ContentId cid)
{
WriteCodexEvent(e =>
{
e.FileDownloaded = new FileDownloadedEvent
{
Cid = cid.Id,
ByteSize = size.SizeInBytes
};
});
}
public void OnFileUploading(string uid, ByteSize size)
{
WriteCodexEvent(e =>
{
e.FileUploading = new FileUploadingEvent
{
UniqueId = uid,
ByteSize = size.SizeInBytes
};
});
}
public void OnFileUploaded(string uid, ByteSize size, ContentId cid)
{
WriteCodexEvent(e =>
{
e.FileUploaded = new FileUploadedEvent
{
UniqueId = uid,
Cid = cid.Id,
ByteSize = size.SizeInBytes
};
});
}
private void WriteCodexEvent(Action<OverwatchCodexEvent> action)
{
WriteCodexEvent(DateTime.UtcNow, action);
}
private void WriteCodexEvent(DateTime utc, Action<OverwatchCodexEvent> action)
{
var e = new OverwatchCodexEvent
{
Name = name,
PeerId = peerId
};
action(e);
if (string.IsNullOrEmpty(peerId))
{
// If we don't know our peerId, don't write the events yet.
AddToCache(utc, e);
}
else
{
e.Write(utc, writer);
// Write any events that we cached when we didn't have our peerId yet.
WriteAndClearCache();
}
}
private void AddToCache(DateTime utc, OverwatchCodexEvent e)
{
pendingEvents.Add((utc, e));
}
private void WriteAndClearCache()
{
if (pendingEvents.Any())
{
foreach (var pair in pendingEvents)
{
pair.Item2.PeerId = peerId;
pair.Item2.Write(pair.Item1, writer);
}
pendingEvents.Clear();
}
}
}
}