Merge branch 'feature/block-retransmit'

This commit is contained in:
Ben 2024-10-22 12:54:33 +02:00
commit ddbe5b111a
No known key found for this signature in database
GPG Key ID: 0F16E812E736C24B
14 changed files with 335 additions and 38 deletions

View File

@ -7,6 +7,7 @@ namespace OverwatchTranscript
public interface IFinalizedBucket
{
bool IsEmpty { get; }
void Update();
DateTime? SeeTopUtc();
BucketTop? TakeTop();
}
@ -28,7 +29,8 @@ namespace OverwatchTranscript
private readonly string bucketFile;
private readonly ConcurrentQueue<BucketTop> topQueue = new ConcurrentQueue<BucketTop>();
private readonly AutoResetEvent itemDequeued = new AutoResetEvent(false);
private bool stopping;
private readonly AutoResetEvent itemEnqueued = new AutoResetEvent(false);
private bool sourceIsEmpty;
public EventBucketReader(ILog log, string bucketFile)
{
@ -42,34 +44,38 @@ namespace OverwatchTranscript
public bool IsEmpty { get; private set; }
public DateTime? SeeTopUtc()
public void Update()
{
if (IsEmpty) return null;
while (true)
if (IsEmpty) return;
while (topQueue.Count == 0)
{
UpdateIsEmpty();
if (IsEmpty) return;
itemDequeued.Set();
itemEnqueued.WaitOne(200);
}
}
public DateTime? SeeTopUtc()
{
if (IsEmpty) return null;
if (topQueue.TryPeek(out BucketTop? top))
{
return top.Utc;
}
}
return null;
}
public BucketTop? TakeTop()
{
if (IsEmpty) return null;
while (true)
{
UpdateIsEmpty();
if (IsEmpty) return null;
if (topQueue.TryDequeue(out BucketTop? top))
{
itemDequeued.Set();
return top;
}
}
return null;
}
private void ReadBucket()
@ -85,23 +91,25 @@ namespace OverwatchTranscript
if (top != null)
{
topQueue.Enqueue(top);
itemEnqueued.Set();
}
else
{
stopping = true;
sourceIsEmpty = true;
UpdateIsEmpty();
return;
}
}
itemDequeued.Reset();
itemDequeued.WaitOne();
itemDequeued.WaitOne(5000);
}
}
private void UpdateIsEmpty()
{
var empty = stopping && topQueue.IsEmpty;
if (!IsEmpty && empty)
var allEmpty = sourceIsEmpty && topQueue.IsEmpty;
if (!IsEmpty && allEmpty)
{
File.Delete(bucketFile);
IsEmpty = true;

View File

@ -24,6 +24,8 @@ namespace OverwatchTranscript
log.Debug($"Building references for {buckets.Count} buckets.");
while (buckets.Any())
{
foreach (var b in buckets) b.Update();
buckets.RemoveAll(b => b.IsEmpty);
if (!buckets.Any()) break;

View File

@ -7,11 +7,15 @@ namespace CodexTests.DownloadConnectivityTests
public class SwarmTests : AutoBootstrapDistTest
{
[Test]
[Combinatorial]
[CreateTranscript("swarm_retransmit")]
public void DetectBlockRetransmits()
public void DetectBlockRetransmits(
[Values(1, 5, 10, 20)] int fileSize,
[Values(3, 5, 10, 20)] int numNodes
)
{
var nodes = StartCodex(10);
var file = GenerateTestFile(10.MB());
var nodes = StartCodex(numNodes);
var file = GenerateTestFile(fileSize.MB());
var cid = nodes[0].UploadFile(file);
var tasks = nodes.Select(n => Task.Run(() => n.DownloadContent(cid))).ToArray();

View File

@ -117,10 +117,10 @@ namespace FrameworkTests.OverwatchTranscriptTests
var e = events.SingleOrDefault(e => e.Moment.Utc == utc && e.Payload.EventData == data);
if (e == null) Assert.Fail("Event not found");
Assert.That(e.Moment.Utc, Is.EqualTo(utc));
Assert.That(e.Moment.Duration, Is.EqualTo(duration));
Assert.That(e.Moment.Index, Is.EqualTo(index));
Assert.That(e.Payload.EventData, Is.EqualTo(data));
Assert.That(e!.Moment.Utc, Is.EqualTo(utc));
Assert.That(e!.Moment.Duration, Is.EqualTo(duration));
Assert.That(e!.Moment.Index, Is.EqualTo(index));
Assert.That(e!.Payload.EventData, Is.EqualTo(data));
}
private void AssertFileContent()

View File

@ -0,0 +1,14 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>
<ProjectReference Include="..\..\Framework\Logging\Logging.csproj" />
</ItemGroup>
</Project>

View File

@ -0,0 +1,71 @@
using Logging;
public class Program
{
public static void Main(string[] args)
{
args = ["d:\\CodexTestLogs\\BlockExchange\\experiment2-fetchbatched"];
var p = new Program(args[0]);
p.Run();
}
private static readonly ILog log = new ConsoleLog();
private string path;
private readonly Dictionary<string, List<string>> combine = new Dictionary<string, List<string>>();
public Program(string path)
{
this.path = path;
}
private void Run()
{
Log("Starting in " + path);
var files = Directory.GetFiles(path)
.Where(f => f.ToLowerInvariant().EndsWith(".csv")).ToArray();
foreach (var file in files)
{
AddToMap(file);
}
var i = 0;
foreach (var pair in combine)
{
var list = pair.Value;
list.Insert(0, pair.Key);
File.WriteAllLines(Path.Combine(path, "combine_" + i + ".csv"), list.ToArray());
i++;
}
Log("done");
}
private void AddToMap(string file)
{
var lines = File.ReadAllLines(file);
if (lines.Length > 1)
{
var header = lines[0];
var list = GetList(header);
list.AddRange(lines.Skip(1));
}
}
private List<string> GetList(string header)
{
if (!combine.ContainsKey(header))
{
combine.Add(header, new List<string>());
}
return combine[header];
}
private void Log(string msg)
{
log.Log(msg);
}
}

View File

@ -0,0 +1,136 @@
using Logging;
namespace TranscriptAnalysis
{
public class CsvWriter
{
private readonly ILog log;
public CsvWriter(ILog log)
{
this.log = log;
}
public ICsv CreateNew()
{
return new Csv();
}
public void Write(ICsv csv, string filename)
{
var c = (Csv)csv;
using var file = File.OpenWrite(filename);
using var writer = new StreamWriter(file);
c.CreateLines(writer.WriteLine);
log.Log($"CSV written to: '{filename}'");
}
}
public interface ICsv
{
ICsvColumn GetColumn(string title, float defaultValue);
ICsvColumn GetColumn(string title, string defaultValue);
void AddRow(params CsvCell[] cells);
}
public class Csv : ICsv
{
private readonly string Sep = ",";
private readonly List<CsvColumn> columns = new List<CsvColumn>();
private readonly List<CsvRow> rows = new List<CsvRow>();
public ICsvColumn GetColumn(string title, float defaultValue)
{
return GetColumn(title, defaultValue.ToString());
}
public ICsvColumn GetColumn(string title, string defaultValue)
{
var column = columns.SingleOrDefault(c => c.Title == title);
if (column == null)
{
column = new CsvColumn(title, defaultValue);
columns.Add(column);
}
return column;
}
public void AddRow(params CsvCell[] cells)
{
rows.Add(new CsvRow(cells));
}
public void CreateLines(Action<string> onLine)
{
CreateHeaderLine(onLine);
foreach (var row in rows)
{
CreateRowLine(row, onLine);
}
}
private void CreateHeaderLine(Action<string> onLine)
{
onLine(string.Join(Sep, columns.Select(c => c.Title).ToArray()));
}
private void CreateRowLine(CsvRow row, Action<string> onLine)
{
onLine(string.Join(Sep, columns.Select(c => GetRowCellValue(row, c)).ToArray()));
}
private string GetRowCellValue(CsvRow row, CsvColumn column)
{
var cell = row.Cells.SingleOrDefault(c => c.Column == column);
if (cell == null) return column.DefaultValue;
return cell.Value;
}
}
public class CsvCell
{
public CsvCell(ICsvColumn column, float value)
: this(column, value.ToString())
{
}
public CsvCell(ICsvColumn column, string value)
{
Column = column;
Value = value;
}
public ICsvColumn Column { get; }
public string Value { get; }
}
public interface ICsvColumn
{
string Title { get; }
string DefaultValue { get; }
}
public class CsvColumn : ICsvColumn
{
public CsvColumn(string title, string defaultValue)
{
Title = title;
DefaultValue = defaultValue;
}
public string Title { get; }
public string DefaultValue { get; }
}
public class CsvRow
{
public CsvRow(CsvCell[] cells)
{
Cells = cells;
}
public CsvCell[] Cells { get; }
}
}

View File

@ -31,7 +31,7 @@ public static class Program
};
var header = reader.GetHeader<OverwatchCodexHeader>("cdx_h");
var receivers = new ReceiverSet(log, reader, header);
var receivers = new ReceiverSet(args[0], log, reader, header);
receivers.InitAll();
var processor = new Processor(log, reader);

View File

@ -7,7 +7,7 @@ namespace TranscriptAnalysis
{
public interface IEventReceiver
{
void Init(ILog log, OverwatchCodexHeader header);
void Init(string sourceFilename, ILog log, OverwatchCodexHeader header);
void Finish();
}
@ -18,13 +18,15 @@ namespace TranscriptAnalysis
public class ReceiverSet
{
private readonly string sourceFilename;
private readonly ILog log;
private readonly ITranscriptReader reader;
private readonly OverwatchCodexHeader header;
private readonly List<IEventReceiver> receivers = new List<IEventReceiver>();
public ReceiverSet(ILog log, ITranscriptReader reader, OverwatchCodexHeader header)
public ReceiverSet(string sourceFilename, ILog log, ITranscriptReader reader, OverwatchCodexHeader header)
{
this.sourceFilename = sourceFilename;
this.log = log;
this.reader = reader;
this.header = header;
@ -53,7 +55,7 @@ namespace TranscriptAnalysis
mux.Add(receiver);
receivers.Add(receiver);
receiver.Init(log, header);
receiver.Init(sourceFilename, log, header);
}
// We use a mux here because, for each time we call reader.AddEventHandler,

View File

@ -8,29 +8,38 @@ namespace TranscriptAnalysis.Receivers
{
protected ILog log { get; private set; } = new NullLog();
protected OverwatchCodexHeader Header { get; private set; } = null!;
protected CsvWriter CsvWriter { get; private set; }
protected string SourceFilename { get; private set; } = string.Empty;
public abstract string Name { get; }
public abstract void Receive(ActivateEvent<T> @event);
public abstract void Finish();
public void Init(ILog log, OverwatchCodexHeader header)
protected BaseReceiver()
{
CsvWriter = new CsvWriter(log);
}
public void Init(string sourceFilename, ILog log, OverwatchCodexHeader header)
{
this.log = new LogPrefixer(log, $"({Name}) ");
Header = header;
SourceFilename = sourceFilename;
}
protected string GetPeerId(int nodeIndex)
protected string? GetPeerId(int nodeIndex)
{
return GetIdentity(nodeIndex).PeerId;
return GetIdentity(nodeIndex)?.PeerId;
}
protected string GetName(int nodeIndex)
protected string? GetName(int nodeIndex)
{
return GetIdentity(nodeIndex).Name;
return GetIdentity(nodeIndex)?.Name;
}
protected CodexNodeIdentity GetIdentity(int nodeIndex)
protected CodexNodeIdentity? GetIdentity(int nodeIndex)
{
if (nodeIndex < 0) return null;
return Header.Nodes[nodeIndex];
}

View File

@ -5,6 +5,9 @@ namespace TranscriptAnalysis.Receivers
{
public class DuplicateBlocksReceived : BaseReceiver<OverwatchCodexEvent>
{
public static List<int> Counts = new List<int>();
private long uploadSize;
public override string Name => "BlocksReceived";
public override void Receive(ActivateEvent<OverwatchCodexEvent> @event)
@ -13,11 +16,17 @@ namespace TranscriptAnalysis.Receivers
{
Handle(@event.Payload, @event.Payload.BlockReceived);
}
if (@event.Payload.FileUploaded != null)
{
var uploadEvent = @event.Payload.FileUploaded;
uploadSize = uploadEvent.ByteSize;
}
}
public override void Finish()
{
Log("Number of BlockReceived events seen: " + seen);
var csv = CsvWriter.CreateNew();
var totalReceived = peerIdBlockAddrCount.Sum(a => a.Value.Sum(p => p.Value));
var maxRepeats = peerIdBlockAddrCount.Max(a => a.Value.Max(p => p.Value));
@ -31,13 +40,26 @@ namespace TranscriptAnalysis.Receivers
}
}
if (Counts.Any()) throw new Exception("Should be empty");
float t = totalReceived;
csv.GetColumn("numNodes", Header.Nodes.Length);
csv.GetColumn("filesize", uploadSize.ToString());
var receiveCountColumn = csv.GetColumn("receiveCount", 0.0f);
var occuranceColumn = csv.GetColumn("occurance", 0.0f);
occurances.PrintContinous((i, count) =>
{
float n = count;
float p = 100.0f * (n / t);
Log($"Block received {i} times = {count}x ({p}%)");
Counts.Add(count);
csv.AddRow(
new CsvCell(receiveCountColumn, i),
new CsvCell(occuranceColumn, count)
);
});
CsvWriter.Write(csv, SourceFilename + "_blockduplicates.csv");
}
private int seen = 0;
@ -46,6 +68,7 @@ namespace TranscriptAnalysis.Receivers
private void Handle(OverwatchCodexEvent payload, BlockReceivedEvent blockReceived)
{
var receiverPeerId = GetPeerId(payload.NodeIdentity);
if (receiverPeerId == null) return;
var blockAddress = blockReceived.BlockAddress;
seen++;

View File

@ -14,6 +14,8 @@ namespace TranscriptAnalysis.Receivers
{
var peerId = GetPeerId(@event.Payload.NodeIdentity);
var name = GetName(@event.Payload.NodeIdentity);
if (peerId == null) return;
if (name == null) return;
if (!seen.Contains(peerId))
{

View File

@ -46,6 +46,7 @@ namespace TranscriptAnalysis.Receivers
private readonly Dictionary<string, Node> dialingNodes = new Dictionary<string, Node>();
private readonly Dictionary<string, Dial> dials = new Dictionary<string, Dial>();
private long uploadSize;
public override string Name => "NodesDegree";
@ -54,12 +55,20 @@ namespace TranscriptAnalysis.Receivers
if (@event.Payload.DialSuccessful != null)
{
var peerId = GetPeerId(@event.Payload.NodeIdentity);
if (peerId == null) return;
AddDial(peerId, @event.Payload.DialSuccessful.TargetPeerId);
}
if (@event.Payload.FileUploaded != null)
{
var uploadEvent = @event.Payload.FileUploaded;
uploadSize = uploadEvent.ByteSize;
}
}
public override void Finish()
{
var csv = CsvWriter.CreateNew();
var numNodes = dialingNodes.Count;
var redialOccurances = new OccuranceMap();
foreach (var dial in dials.Values)
@ -80,12 +89,22 @@ namespace TranscriptAnalysis.Receivers
});
float tot = numNodes;
csv.GetColumn("numNodes", Header.Nodes.Length);
csv.GetColumn("filesize", uploadSize.ToString());
var degreeColumn = csv.GetColumn("degree", 0.0f);
var occuranceColumn = csv.GetColumn("occurance", 0.0f);
degreeOccurances.Print((i, count) =>
{
float n = count;
float p = 100.0f * (n / tot);
Log($"Degree: {i} = {count}x ({p}%)");
csv.AddRow(
new CsvCell(degreeColumn, i),
new CsvCell(occuranceColumn, n)
);
});
CsvWriter.Write(csv, SourceFilename + "_nodeDegrees.csv");
}
private void AddDial(string peerId, string targetPeerId)

View File

@ -76,6 +76,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "TranscriptAnalysis", "Tools
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MarketInsights", "Tools\MarketInsights\MarketInsights.csproj", "{004614DF-1C65-45E3-882D-59AE44282573}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "CsvCombiner", "Tools\CsvCombiner\CsvCombiner.csproj", "{6230347F-5045-4E25-8E7A-13D7221B7444}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@ -202,6 +204,10 @@ Global
{004614DF-1C65-45E3-882D-59AE44282573}.Debug|Any CPU.Build.0 = Debug|Any CPU
{004614DF-1C65-45E3-882D-59AE44282573}.Release|Any CPU.ActiveCfg = Release|Any CPU
{004614DF-1C65-45E3-882D-59AE44282573}.Release|Any CPU.Build.0 = Release|Any CPU
{6230347F-5045-4E25-8E7A-13D7221B7444}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{6230347F-5045-4E25-8E7A-13D7221B7444}.Debug|Any CPU.Build.0 = Debug|Any CPU
{6230347F-5045-4E25-8E7A-13D7221B7444}.Release|Any CPU.ActiveCfg = Release|Any CPU
{6230347F-5045-4E25-8E7A-13D7221B7444}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@ -237,6 +243,7 @@ Global
{870DDFBE-D7ED-4196-9681-13CA947BDEA6} = {81AE04BC-CBFA-4E6F-B039-8208E9AFAAE7}
{C0EEBD32-23CB-45EC-A863-79FB948508C8} = {7591C5B3-D86E-4AE4-8ED2-B272D17FE7E3}
{004614DF-1C65-45E3-882D-59AE44282573} = {7591C5B3-D86E-4AE4-8ED2-B272D17FE7E3}
{6230347F-5045-4E25-8E7A-13D7221B7444} = {7591C5B3-D86E-4AE4-8ED2-B272D17FE7E3}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {237BF0AA-9EC4-4659-AD9A-65DEB974250C}