diff --git a/Framework/OverwatchTranscript/EventBucketReader.cs b/Framework/OverwatchTranscript/EventBucketReader.cs index 3fec41d8..ebc915cc 100644 --- a/Framework/OverwatchTranscript/EventBucketReader.cs +++ b/Framework/OverwatchTranscript/EventBucketReader.cs @@ -7,6 +7,7 @@ namespace OverwatchTranscript public interface IFinalizedBucket { bool IsEmpty { get; } + void Update(); DateTime? SeeTopUtc(); BucketTop? TakeTop(); } @@ -28,7 +29,8 @@ namespace OverwatchTranscript private readonly string bucketFile; private readonly ConcurrentQueue topQueue = new ConcurrentQueue(); private readonly AutoResetEvent itemDequeued = new AutoResetEvent(false); - private bool stopping; + private readonly AutoResetEvent itemEnqueued = new AutoResetEvent(false); + private bool sourceIsEmpty; public EventBucketReader(ILog log, string bucketFile) { @@ -42,34 +44,38 @@ namespace OverwatchTranscript public bool IsEmpty { get; private set; } + public void Update() + { + if (IsEmpty) return; + while (topQueue.Count == 0) + { + UpdateIsEmpty(); + if (IsEmpty) return; + + itemDequeued.Set(); + itemEnqueued.WaitOne(200); + } + } + public DateTime? SeeTopUtc() { if (IsEmpty) return null; - while (true) + if (topQueue.TryPeek(out BucketTop? top)) { - UpdateIsEmpty(); - if (IsEmpty) return null; - if (topQueue.TryPeek(out BucketTop? top)) - { - return top.Utc; - } + return top.Utc; } + return null; } public BucketTop? TakeTop() { if (IsEmpty) return null; - - while (true) + if (topQueue.TryDequeue(out BucketTop? top)) { - UpdateIsEmpty(); - if (IsEmpty) return null; - if (topQueue.TryDequeue(out BucketTop? top)) - { - itemDequeued.Set(); - return top; - } + itemDequeued.Set(); + return top; } + return null; } private void ReadBucket() @@ -85,23 +91,25 @@ namespace OverwatchTranscript if (top != null) { topQueue.Enqueue(top); + itemEnqueued.Set(); } else { - stopping = true; + sourceIsEmpty = true; + UpdateIsEmpty(); return; } } itemDequeued.Reset(); - itemDequeued.WaitOne(); + itemDequeued.WaitOne(5000); } } private void UpdateIsEmpty() { - var empty = stopping && topQueue.IsEmpty; - if (!IsEmpty && empty) + var allEmpty = sourceIsEmpty && topQueue.IsEmpty; + if (!IsEmpty && allEmpty) { File.Delete(bucketFile); IsEmpty = true; diff --git a/Framework/OverwatchTranscript/MomentReferenceBuilder.cs b/Framework/OverwatchTranscript/MomentReferenceBuilder.cs index 616ebab6..e3093e18 100644 --- a/Framework/OverwatchTranscript/MomentReferenceBuilder.cs +++ b/Framework/OverwatchTranscript/MomentReferenceBuilder.cs @@ -24,6 +24,8 @@ namespace OverwatchTranscript log.Debug($"Building references for {buckets.Count} buckets."); while (buckets.Any()) { + foreach (var b in buckets) b.Update(); + buckets.RemoveAll(b => b.IsEmpty); if (!buckets.Any()) break; diff --git a/Tests/CodexTests/DownloadConnectivityTests/SwarmTests.cs b/Tests/CodexTests/DownloadConnectivityTests/SwarmTests.cs index e531d7f9..ee4ad46f 100644 --- a/Tests/CodexTests/DownloadConnectivityTests/SwarmTests.cs +++ b/Tests/CodexTests/DownloadConnectivityTests/SwarmTests.cs @@ -7,11 +7,15 @@ namespace CodexTests.DownloadConnectivityTests public class SwarmTests : AutoBootstrapDistTest { [Test] + [Combinatorial] [CreateTranscript("swarm_retransmit")] - public void DetectBlockRetransmits() + public void DetectBlockRetransmits( + [Values(1, 5, 10, 20)] int fileSize, + [Values(3, 5, 10, 20)] int numNodes + ) { - var nodes = StartCodex(10); - var file = GenerateTestFile(10.MB()); + var nodes = StartCodex(numNodes); + var file = GenerateTestFile(fileSize.MB()); var cid = nodes[0].UploadFile(file); var tasks = nodes.Select(n => Task.Run(() => n.DownloadContent(cid))).ToArray(); diff --git a/Tests/FrameworkTests/OverwatchTranscriptTests/TranscriptTests.cs b/Tests/FrameworkTests/OverwatchTranscriptTests/TranscriptTests.cs index c5b81bae..255ea178 100644 --- a/Tests/FrameworkTests/OverwatchTranscriptTests/TranscriptTests.cs +++ b/Tests/FrameworkTests/OverwatchTranscriptTests/TranscriptTests.cs @@ -117,10 +117,10 @@ namespace FrameworkTests.OverwatchTranscriptTests var e = events.SingleOrDefault(e => e.Moment.Utc == utc && e.Payload.EventData == data); if (e == null) Assert.Fail("Event not found"); - Assert.That(e.Moment.Utc, Is.EqualTo(utc)); - Assert.That(e.Moment.Duration, Is.EqualTo(duration)); - Assert.That(e.Moment.Index, Is.EqualTo(index)); - Assert.That(e.Payload.EventData, Is.EqualTo(data)); + Assert.That(e!.Moment.Utc, Is.EqualTo(utc)); + Assert.That(e!.Moment.Duration, Is.EqualTo(duration)); + Assert.That(e!.Moment.Index, Is.EqualTo(index)); + Assert.That(e!.Payload.EventData, Is.EqualTo(data)); } private void AssertFileContent() diff --git a/Tools/CsvCombiner/CsvCombiner.csproj b/Tools/CsvCombiner/CsvCombiner.csproj new file mode 100644 index 00000000..b88f5505 --- /dev/null +++ b/Tools/CsvCombiner/CsvCombiner.csproj @@ -0,0 +1,14 @@ + + + + Exe + net8.0 + enable + enable + + + + + + + diff --git a/Tools/CsvCombiner/Program.cs b/Tools/CsvCombiner/Program.cs new file mode 100644 index 00000000..6e71cd07 --- /dev/null +++ b/Tools/CsvCombiner/Program.cs @@ -0,0 +1,71 @@ +using Logging; + +public class Program +{ + public static void Main(string[] args) + { + args = ["d:\\CodexTestLogs\\BlockExchange\\experiment2-fetchbatched"]; + var p = new Program(args[0]); + p.Run(); + } + + private static readonly ILog log = new ConsoleLog(); + private string path; + + private readonly Dictionary> combine = new Dictionary>(); + + public Program(string path) + { + this.path = path; + } + + private void Run() + { + Log("Starting in " + path); + + var files = Directory.GetFiles(path) + .Where(f => f.ToLowerInvariant().EndsWith(".csv")).ToArray(); + + foreach (var file in files) + { + AddToMap(file); + } + + var i = 0; + foreach (var pair in combine) + { + var list = pair.Value; + list.Insert(0, pair.Key); + + File.WriteAllLines(Path.Combine(path, "combine_" + i + ".csv"), list.ToArray()); + i++; + } + + Log("done"); + } + + private void AddToMap(string file) + { + var lines = File.ReadAllLines(file); + if (lines.Length > 1) + { + var header = lines[0]; + var list = GetList(header); + list.AddRange(lines.Skip(1)); + } + } + + private List GetList(string header) + { + if (!combine.ContainsKey(header)) + { + combine.Add(header, new List()); + } + return combine[header]; + } + + private void Log(string msg) + { + log.Log(msg); + } +} \ No newline at end of file diff --git a/Tools/TranscriptAnalysis/CsvWriter.cs b/Tools/TranscriptAnalysis/CsvWriter.cs new file mode 100644 index 00000000..2925f0f3 --- /dev/null +++ b/Tools/TranscriptAnalysis/CsvWriter.cs @@ -0,0 +1,136 @@ +using Logging; + +namespace TranscriptAnalysis +{ + public class CsvWriter + { + private readonly ILog log; + + public CsvWriter(ILog log) + { + this.log = log; + } + + public ICsv CreateNew() + { + return new Csv(); + } + + public void Write(ICsv csv, string filename) + { + var c = (Csv)csv; + + using var file = File.OpenWrite(filename); + using var writer = new StreamWriter(file); + c.CreateLines(writer.WriteLine); + + log.Log($"CSV written to: '{filename}'"); + } + } + + public interface ICsv + { + ICsvColumn GetColumn(string title, float defaultValue); + ICsvColumn GetColumn(string title, string defaultValue); + void AddRow(params CsvCell[] cells); + } + + public class Csv : ICsv + { + private readonly string Sep = ","; + private readonly List columns = new List(); + private readonly List rows = new List(); + + public ICsvColumn GetColumn(string title, float defaultValue) + { + return GetColumn(title, defaultValue.ToString()); + } + + public ICsvColumn GetColumn(string title, string defaultValue) + { + var column = columns.SingleOrDefault(c => c.Title == title); + if (column == null) + { + column = new CsvColumn(title, defaultValue); + columns.Add(column); + } + return column; + } + + public void AddRow(params CsvCell[] cells) + { + rows.Add(new CsvRow(cells)); + } + + public void CreateLines(Action onLine) + { + CreateHeaderLine(onLine); + foreach (var row in rows) + { + CreateRowLine(row, onLine); + } + } + + private void CreateHeaderLine(Action onLine) + { + onLine(string.Join(Sep, columns.Select(c => c.Title).ToArray())); + } + + private void CreateRowLine(CsvRow row, Action onLine) + { + onLine(string.Join(Sep, columns.Select(c => GetRowCellValue(row, c)).ToArray())); + } + + private string GetRowCellValue(CsvRow row, CsvColumn column) + { + var cell = row.Cells.SingleOrDefault(c => c.Column == column); + if (cell == null) return column.DefaultValue; + return cell.Value; + } + } + + public class CsvCell + { + public CsvCell(ICsvColumn column, float value) + : this(column, value.ToString()) + { + } + + public CsvCell(ICsvColumn column, string value) + { + Column = column; + Value = value; + } + + public ICsvColumn Column { get; } + public string Value { get; } + } + + public interface ICsvColumn + { + string Title { get; } + string DefaultValue { get; } + } + + public class CsvColumn : ICsvColumn + { + public CsvColumn(string title, string defaultValue) + { + Title = title; + DefaultValue = defaultValue; + } + + public string Title { get; } + public string DefaultValue { get; } + } + + public class CsvRow + { + public CsvRow(CsvCell[] cells) + { + Cells = cells; + } + + public CsvCell[] Cells { get; } + } +} diff --git a/Tools/TranscriptAnalysis/Program.cs b/Tools/TranscriptAnalysis/Program.cs index 289d70bc..47868c41 100644 --- a/Tools/TranscriptAnalysis/Program.cs +++ b/Tools/TranscriptAnalysis/Program.cs @@ -31,7 +31,7 @@ public static class Program }; var header = reader.GetHeader("cdx_h"); - var receivers = new ReceiverSet(log, reader, header); + var receivers = new ReceiverSet(args[0], log, reader, header); receivers.InitAll(); var processor = new Processor(log, reader); diff --git a/Tools/TranscriptAnalysis/ReceiverSet.cs b/Tools/TranscriptAnalysis/ReceiverSet.cs index bdc6c3f6..35c2a700 100644 --- a/Tools/TranscriptAnalysis/ReceiverSet.cs +++ b/Tools/TranscriptAnalysis/ReceiverSet.cs @@ -7,7 +7,7 @@ namespace TranscriptAnalysis { public interface IEventReceiver { - void Init(ILog log, OverwatchCodexHeader header); + void Init(string sourceFilename, ILog log, OverwatchCodexHeader header); void Finish(); } @@ -18,13 +18,15 @@ namespace TranscriptAnalysis public class ReceiverSet { + private readonly string sourceFilename; private readonly ILog log; private readonly ITranscriptReader reader; private readonly OverwatchCodexHeader header; private readonly List receivers = new List(); - public ReceiverSet(ILog log, ITranscriptReader reader, OverwatchCodexHeader header) + public ReceiverSet(string sourceFilename, ILog log, ITranscriptReader reader, OverwatchCodexHeader header) { + this.sourceFilename = sourceFilename; this.log = log; this.reader = reader; this.header = header; @@ -53,7 +55,7 @@ namespace TranscriptAnalysis mux.Add(receiver); receivers.Add(receiver); - receiver.Init(log, header); + receiver.Init(sourceFilename, log, header); } // We use a mux here because, for each time we call reader.AddEventHandler, diff --git a/Tools/TranscriptAnalysis/Receivers/BaseReceiver.cs b/Tools/TranscriptAnalysis/Receivers/BaseReceiver.cs index 17e02d7d..c8ed20db 100644 --- a/Tools/TranscriptAnalysis/Receivers/BaseReceiver.cs +++ b/Tools/TranscriptAnalysis/Receivers/BaseReceiver.cs @@ -8,29 +8,38 @@ namespace TranscriptAnalysis.Receivers { protected ILog log { get; private set; } = new NullLog(); protected OverwatchCodexHeader Header { get; private set; } = null!; + protected CsvWriter CsvWriter { get; private set; } + protected string SourceFilename { get; private set; } = string.Empty; public abstract string Name { get; } public abstract void Receive(ActivateEvent @event); public abstract void Finish(); - public void Init(ILog log, OverwatchCodexHeader header) + protected BaseReceiver() + { + CsvWriter = new CsvWriter(log); + } + + public void Init(string sourceFilename, ILog log, OverwatchCodexHeader header) { this.log = new LogPrefixer(log, $"({Name}) "); Header = header; + SourceFilename = sourceFilename; } - protected string GetPeerId(int nodeIndex) + protected string? GetPeerId(int nodeIndex) { - return GetIdentity(nodeIndex).PeerId; + return GetIdentity(nodeIndex)?.PeerId; } - protected string GetName(int nodeIndex) + protected string? GetName(int nodeIndex) { - return GetIdentity(nodeIndex).Name; + return GetIdentity(nodeIndex)?.Name; } - protected CodexNodeIdentity GetIdentity(int nodeIndex) + protected CodexNodeIdentity? GetIdentity(int nodeIndex) { + if (nodeIndex < 0) return null; return Header.Nodes[nodeIndex]; } diff --git a/Tools/TranscriptAnalysis/Receivers/DuplicateBlocksReceived.cs b/Tools/TranscriptAnalysis/Receivers/DuplicateBlocksReceived.cs index 6eeb38ef..cbdd71cc 100644 --- a/Tools/TranscriptAnalysis/Receivers/DuplicateBlocksReceived.cs +++ b/Tools/TranscriptAnalysis/Receivers/DuplicateBlocksReceived.cs @@ -5,6 +5,9 @@ namespace TranscriptAnalysis.Receivers { public class DuplicateBlocksReceived : BaseReceiver { + public static List Counts = new List(); + private long uploadSize; + public override string Name => "BlocksReceived"; public override void Receive(ActivateEvent @event) @@ -13,11 +16,17 @@ namespace TranscriptAnalysis.Receivers { Handle(@event.Payload, @event.Payload.BlockReceived); } + if (@event.Payload.FileUploaded != null) + { + var uploadEvent = @event.Payload.FileUploaded; + uploadSize = uploadEvent.ByteSize; + } } public override void Finish() { Log("Number of BlockReceived events seen: " + seen); + var csv = CsvWriter.CreateNew(); var totalReceived = peerIdBlockAddrCount.Sum(a => a.Value.Sum(p => p.Value)); var maxRepeats = peerIdBlockAddrCount.Max(a => a.Value.Max(p => p.Value)); @@ -31,13 +40,26 @@ namespace TranscriptAnalysis.Receivers } } + if (Counts.Any()) throw new Exception("Should be empty"); + float t = totalReceived; + csv.GetColumn("numNodes", Header.Nodes.Length); + csv.GetColumn("filesize", uploadSize.ToString()); + var receiveCountColumn = csv.GetColumn("receiveCount", 0.0f); + var occuranceColumn = csv.GetColumn("occurance", 0.0f); occurances.PrintContinous((i, count) => { float n = count; float p = 100.0f * (n / t); Log($"Block received {i} times = {count}x ({p}%)"); + Counts.Add(count); + csv.AddRow( + new CsvCell(receiveCountColumn, i), + new CsvCell(occuranceColumn, count) + ); }); + + CsvWriter.Write(csv, SourceFilename + "_blockduplicates.csv"); } private int seen = 0; @@ -46,6 +68,7 @@ namespace TranscriptAnalysis.Receivers private void Handle(OverwatchCodexEvent payload, BlockReceivedEvent blockReceived) { var receiverPeerId = GetPeerId(payload.NodeIdentity); + if (receiverPeerId == null) return; var blockAddress = blockReceived.BlockAddress; seen++; diff --git a/Tools/TranscriptAnalysis/Receivers/LogReplaceReceiver.cs b/Tools/TranscriptAnalysis/Receivers/LogReplaceReceiver.cs index caddc606..d4963eef 100644 --- a/Tools/TranscriptAnalysis/Receivers/LogReplaceReceiver.cs +++ b/Tools/TranscriptAnalysis/Receivers/LogReplaceReceiver.cs @@ -14,6 +14,8 @@ namespace TranscriptAnalysis.Receivers { var peerId = GetPeerId(@event.Payload.NodeIdentity); var name = GetName(@event.Payload.NodeIdentity); + if (peerId == null) return; + if (name == null) return; if (!seen.Contains(peerId)) { diff --git a/Tools/TranscriptAnalysis/Receivers/NodesDegree.cs b/Tools/TranscriptAnalysis/Receivers/NodesDegree.cs index 1677fdf1..d3ab71a0 100644 --- a/Tools/TranscriptAnalysis/Receivers/NodesDegree.cs +++ b/Tools/TranscriptAnalysis/Receivers/NodesDegree.cs @@ -46,6 +46,7 @@ namespace TranscriptAnalysis.Receivers private readonly Dictionary dialingNodes = new Dictionary(); private readonly Dictionary dials = new Dictionary(); + private long uploadSize; public override string Name => "NodesDegree"; @@ -54,12 +55,20 @@ namespace TranscriptAnalysis.Receivers if (@event.Payload.DialSuccessful != null) { var peerId = GetPeerId(@event.Payload.NodeIdentity); + if (peerId == null) return; AddDial(peerId, @event.Payload.DialSuccessful.TargetPeerId); } + if (@event.Payload.FileUploaded != null) + { + var uploadEvent = @event.Payload.FileUploaded; + uploadSize = uploadEvent.ByteSize; + } } public override void Finish() { + var csv = CsvWriter.CreateNew(); + var numNodes = dialingNodes.Count; var redialOccurances = new OccuranceMap(); foreach (var dial in dials.Values) @@ -80,12 +89,22 @@ namespace TranscriptAnalysis.Receivers }); float tot = numNodes; + csv.GetColumn("numNodes", Header.Nodes.Length); + csv.GetColumn("filesize", uploadSize.ToString()); + var degreeColumn = csv.GetColumn("degree", 0.0f); + var occuranceColumn = csv.GetColumn("occurance", 0.0f); degreeOccurances.Print((i, count) => { float n = count; float p = 100.0f * (n / tot); Log($"Degree: {i} = {count}x ({p}%)"); + csv.AddRow( + new CsvCell(degreeColumn, i), + new CsvCell(occuranceColumn, n) + ); }); + + CsvWriter.Write(csv, SourceFilename + "_nodeDegrees.csv"); } private void AddDial(string peerId, string targetPeerId) diff --git a/cs-codex-dist-testing.sln b/cs-codex-dist-testing.sln index 6041a7c1..67f63100 100644 --- a/cs-codex-dist-testing.sln +++ b/cs-codex-dist-testing.sln @@ -76,6 +76,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "TranscriptAnalysis", "Tools EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MarketInsights", "Tools\MarketInsights\MarketInsights.csproj", "{004614DF-1C65-45E3-882D-59AE44282573}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "CsvCombiner", "Tools\CsvCombiner\CsvCombiner.csproj", "{6230347F-5045-4E25-8E7A-13D7221B7444}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -202,6 +204,10 @@ Global {004614DF-1C65-45E3-882D-59AE44282573}.Debug|Any CPU.Build.0 = Debug|Any CPU {004614DF-1C65-45E3-882D-59AE44282573}.Release|Any CPU.ActiveCfg = Release|Any CPU {004614DF-1C65-45E3-882D-59AE44282573}.Release|Any CPU.Build.0 = Release|Any CPU + {6230347F-5045-4E25-8E7A-13D7221B7444}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {6230347F-5045-4E25-8E7A-13D7221B7444}.Debug|Any CPU.Build.0 = Debug|Any CPU + {6230347F-5045-4E25-8E7A-13D7221B7444}.Release|Any CPU.ActiveCfg = Release|Any CPU + {6230347F-5045-4E25-8E7A-13D7221B7444}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -237,6 +243,7 @@ Global {870DDFBE-D7ED-4196-9681-13CA947BDEA6} = {81AE04BC-CBFA-4E6F-B039-8208E9AFAAE7} {C0EEBD32-23CB-45EC-A863-79FB948508C8} = {7591C5B3-D86E-4AE4-8ED2-B272D17FE7E3} {004614DF-1C65-45E3-882D-59AE44282573} = {7591C5B3-D86E-4AE4-8ED2-B272D17FE7E3} + {6230347F-5045-4E25-8E7A-13D7221B7444} = {7591C5B3-D86E-4AE4-8ED2-B272D17FE7E3} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {237BF0AA-9EC4-4659-AD9A-65DEB974250C}