From 485b2387e482ffc2474afe76a33ae8032bc01315 Mon Sep 17 00:00:00 2001 From: benbierens Date: Thu, 25 Jul 2024 10:10:11 +0200 Subject: [PATCH] fixes multipeer download test --- Framework/Core/DownloadedLog.cs | 9 +- Framework/Core/Http.cs | 17 ++- Framework/Core/PluginTools.cs | 18 +-- ProjectPlugins/CodexPlugin/CodexAccess.cs | 27 ++-- ProjectPlugins/CodexPlugin/CodexNode.cs | 18 ++- ProjectPlugins/CodexPlugin/CodexStarter.cs | 2 +- ProjectPlugins/MetricsPlugin/MetricsQuery.cs | 5 +- .../ElasticSearchLogDownloader.cs | 2 +- .../MultiPeerDownloadTests.cs | 123 ++++++++++-------- 9 files changed, 123 insertions(+), 98 deletions(-) diff --git a/Framework/Core/DownloadedLog.cs b/Framework/Core/DownloadedLog.cs index 3979f3ee..4a4d8e9e 100644 --- a/Framework/Core/DownloadedLog.cs +++ b/Framework/Core/DownloadedLog.cs @@ -5,7 +5,7 @@ namespace Core { public interface IDownloadedLog { - void IterateLines(Action action); + void IterateLines(Action action, params string[] thatContain); string[] GetLinesContaining(string expectedString); string[] FindLinesThatContain(params string[] tags); void DeleteFile(); @@ -20,7 +20,7 @@ namespace Core logFile = logHandler.LogFile; } - public void IterateLines(Action action) + public void IterateLines(Action action, params string[] thatContain) { using var file = File.OpenRead(logFile.FullFilename); using var streamReader = new StreamReader(file); @@ -28,7 +28,10 @@ namespace Core var line = streamReader.ReadLine(); while (line != null) { - action(line); + if (thatContain.All(line.Contains)) + { + action(line); + } line = streamReader.ReadLine(); } } diff --git a/Framework/Core/Http.cs b/Framework/Core/Http.cs index 44cb3c66..1cb468ad 100644 --- a/Framework/Core/Http.cs +++ b/Framework/Core/Http.cs @@ -13,18 +13,20 @@ namespace Core internal class Http : IHttp { - private static readonly object httpLock = new object(); + private static readonly Dictionary httpLocks = new Dictionary(); private readonly ILog log; private readonly ITimeSet timeSet; private readonly Action onClientCreated; + private readonly string id; - internal Http(ILog log, ITimeSet timeSet) - : this(log, timeSet, DoNothing) + internal Http(string id, ILog log, ITimeSet timeSet) + : this(id, log, timeSet, DoNothing) { } - internal Http(ILog log, ITimeSet timeSet, Action onClientCreated) + internal Http(string id, ILog log, ITimeSet timeSet, Action onClientCreated) { + this.id = id; this.log = log; this.timeSet = timeSet; this.onClientCreated = onClientCreated; @@ -63,12 +65,19 @@ namespace Core private T LockRetry(Func operation, Retry retry) { + var httpLock = GetLock(); lock (httpLock) { return retry.Run(operation); } } + private object GetLock() + { + if (!httpLocks.ContainsKey(id)) httpLocks.Add(id, new object()); + return httpLocks[id]; + } + private HttpClient GetClient() { var client = new HttpClient(); diff --git a/Framework/Core/PluginTools.cs b/Framework/Core/PluginTools.cs index aa8e0a6e..3db27449 100644 --- a/Framework/Core/PluginTools.cs +++ b/Framework/Core/PluginTools.cs @@ -27,9 +27,9 @@ namespace Core public interface IHttpFactoryTool { - IHttp CreateHttp(Action onClientCreated); - IHttp CreateHttp(Action onClientCreated, ITimeSet timeSet); - IHttp CreateHttp(); + IHttp CreateHttp(string id, Action onClientCreated); + IHttp CreateHttp(string id, Action onClientCreated, ITimeSet timeSet); + IHttp CreateHttp(string id); } public interface IFileTool @@ -58,19 +58,19 @@ namespace Core log.Prefix = prefix; } - public IHttp CreateHttp(Action onClientCreated) + public IHttp CreateHttp(string id, Action onClientCreated) { - return CreateHttp(onClientCreated, TimeSet); + return CreateHttp(id, onClientCreated, TimeSet); } - public IHttp CreateHttp(Action onClientCreated, ITimeSet ts) + public IHttp CreateHttp(string id, Action onClientCreated, ITimeSet ts) { - return new Http(log, ts, onClientCreated); + return new Http(id, log, ts, onClientCreated); } - public IHttp CreateHttp() + public IHttp CreateHttp(string id) { - return new Http(log, TimeSet); + return new Http(id, log, TimeSet); } public IStartupWorkflow CreateWorkflow(string? namespaceOverride = null) diff --git a/ProjectPlugins/CodexPlugin/CodexAccess.cs b/ProjectPlugins/CodexPlugin/CodexAccess.cs index 72051f4a..704c74ba 100644 --- a/ProjectPlugins/CodexPlugin/CodexAccess.cs +++ b/ProjectPlugins/CodexPlugin/CodexAccess.cs @@ -132,20 +132,6 @@ namespace CodexPlugin return workflow.GetPodInfo(Container); } - public void LogDiskSpace(string msg) - { - what to do with this? - //try - //{ - // var diskInfo = tools.CreateWorkflow().ExecuteCommand(Container.Containers.Single(), "df", "--sync"); - // Log($"{msg} - Disk info: {diskInfo}"); - //} - //catch (Exception e) - //{ - // Log("Failed to get disk info: " + e); - //} - } - public void DeleteRepoFolder() { try @@ -164,13 +150,13 @@ namespace CodexPlugin private T OnCodex(Func> action) { - var result = tools.CreateHttp(CheckContainerCrashed).OnClient(client => CallCodex(client, action)); + var result = tools.CreateHttp(GetHttpId(), CheckContainerCrashed).OnClient(client => CallCodex(client, action)); return result; } private T OnCodex(Func> action, Retry retry) { - var result = tools.CreateHttp(CheckContainerCrashed).OnClient(client => CallCodex(client, action), retry); + var result = tools.CreateHttp(GetHttpId(), CheckContainerCrashed).OnClient(client => CallCodex(client, action), retry); return result; } @@ -197,7 +183,7 @@ namespace CodexPlugin private IEndpoint GetEndpoint() { return tools - .CreateHttp(CheckContainerCrashed) + .CreateHttp(GetHttpId(), CheckContainerCrashed) .CreateEndpoint(GetAddress(), "/api/codex/v1/", Container.Name); } @@ -206,6 +192,11 @@ namespace CodexPlugin return Container.Containers.Single().GetAddress(log, CodexContainerRecipe.ApiPortTag); } + private string GetHttpId() + { + return GetAddress().ToString(); + } + private void CheckContainerCrashed(HttpClient client) { if (CrashWatcher.HasContainerCrashed()) throw new Exception($"Container {GetName()} has crashed."); @@ -228,8 +219,6 @@ namespace CodexPlugin $"(HTTP timeout = {Time.FormatDuration(timeSet.HttpCallTimeout())}) " + $"Checking if node responds to debug/info..."); - LogDiskSpace("After retry failure"); - try { var debugInfo = GetDebugInfo(); diff --git a/ProjectPlugins/CodexPlugin/CodexNode.cs b/ProjectPlugins/CodexPlugin/CodexNode.cs index c5372f65..47b5961c 100644 --- a/ProjectPlugins/CodexPlugin/CodexNode.cs +++ b/ProjectPlugins/CodexPlugin/CodexNode.cs @@ -115,8 +115,6 @@ namespace CodexPlugin public ContentId UploadFile(TrackedFile file, Action onFailure) { - CodexAccess.LogDiskSpace("Before upload"); - using var fileStream = File.OpenRead(file.Filename); var logMessage = $"Uploading file {file.Describe()}..."; @@ -133,7 +131,6 @@ namespace CodexPlugin if (response.StartsWith(UploadFailedMessage)) FrameworkAssert.Fail("Node failed to store block."); Log($"Uploaded file. Received contentId: '{response}'."); - CodexAccess.LogDiskSpace("After upload"); return new ContentId(response); } @@ -205,10 +202,21 @@ namespace CodexPlugin var log = tools.GetLog(); log.AddStringReplace(nodePeerId, nodeName); + log.AddStringReplace(ToShortIdString(nodePeerId), nodeName); log.AddStringReplace(debugInfo.Table.LocalNode.NodeId, nodeName); + log.AddStringReplace(ToShortIdString(debugInfo.Table.LocalNode.NodeId), nodeName); Version = debugInfo.Version; } + private string ToShortIdString(string id) + { + if (id.Length > 10) + { + return $"{id[..3]}*{id[^6..]}"; + } + return id; + } + private string[] GetPeerMultiAddresses(CodexNode peer, DebugInfo peerInfo) { // The peer we want to connect is in a different pod. @@ -223,8 +231,6 @@ namespace CodexPlugin private void DownloadToFile(string contentId, TrackedFile file, Action onFailure) { - CodexAccess.LogDiskSpace("Before download"); - using var fileStream = File.OpenWrite(file.Filename); try { @@ -236,8 +242,6 @@ namespace CodexPlugin Log($"Failed to download file '{contentId}'."); throw; } - - CodexAccess.LogDiskSpace("After download"); } private void EnsureMarketplace() diff --git a/ProjectPlugins/CodexPlugin/CodexStarter.cs b/ProjectPlugins/CodexPlugin/CodexStarter.cs index 77fd89a6..eee5ef33 100644 --- a/ProjectPlugins/CodexPlugin/CodexStarter.cs +++ b/ProjectPlugins/CodexPlugin/CodexStarter.cs @@ -34,7 +34,7 @@ namespace CodexPlugin { var podInfo = GetPodInfo(rc); var podInfos = string.Join(", ", rc.Containers.Select(c => $"Container: '{c.Name}' PodLabel: '{c.RunningPod.StartResult.Deployment.PodLabel}' runs at '{podInfo.K8SNodeName}'={podInfo.Ip}")); - Log($"Started {codexSetup.NumberOfNodes} nodes of image '{containers.First().Containers.First().Recipe.Image}'. ({podInfos})"); + Log($"Started node with image '{containers.First().Containers.First().Recipe.Image}'. ({podInfos})"); } LogSeparator(); diff --git a/ProjectPlugins/MetricsPlugin/MetricsQuery.cs b/ProjectPlugins/MetricsPlugin/MetricsQuery.cs index cdab6971..a66351c0 100644 --- a/ProjectPlugins/MetricsPlugin/MetricsQuery.cs +++ b/ProjectPlugins/MetricsPlugin/MetricsQuery.cs @@ -15,9 +15,10 @@ namespace MetricsPlugin { RunningContainer = runningContainer; log = tools.GetLog(); + var address = RunningContainer.GetAddress(log, PrometheusContainerRecipe.PortTag); endpoint = tools - .CreateHttp() - .CreateEndpoint(RunningContainer.GetAddress(log, PrometheusContainerRecipe.PortTag), "/api/v1/"); + .CreateHttp(address.ToString()) + .CreateEndpoint(address, "/api/v1/"); } public RunningContainer RunningContainer { get; } diff --git a/Tests/CodexContinuousTests/ElasticSearchLogDownloader.cs b/Tests/CodexContinuousTests/ElasticSearchLogDownloader.cs index 10c909ca..397b5db1 100644 --- a/Tests/CodexContinuousTests/ElasticSearchLogDownloader.cs +++ b/Tests/CodexContinuousTests/ElasticSearchLogDownloader.cs @@ -71,7 +71,7 @@ namespace ContinuousTests var address = new Address($"http://{serviceName}.{k8sNamespace}.svc.cluster.local", 9200); var baseUrl = ""; - var http = tools.CreateHttp(client => + var http = tools.CreateHttp(address.ToString(), client => { client.DefaultRequestHeaders.Add("kbn-xsrf", "reporting"); }); diff --git a/Tests/CodexLongTests/ScalabilityTests/MultiPeerDownloadTests.cs b/Tests/CodexLongTests/ScalabilityTests/MultiPeerDownloadTests.cs index bc59a909..3d37c635 100644 --- a/Tests/CodexLongTests/ScalabilityTests/MultiPeerDownloadTests.cs +++ b/Tests/CodexLongTests/ScalabilityTests/MultiPeerDownloadTests.cs @@ -1,4 +1,5 @@ -using DistTestCore; +using CodexPlugin; +using DistTestCore; using NUnit.Framework; using Utils; @@ -16,10 +17,19 @@ namespace CodexTests.ScalabilityTests [Values(100, 1000)] int fileSize ) { - var hosts = StartCodex(numberOfHosts, s => s.WithLogLevel(CodexPlugin.CodexLogLevel.Trace)); + var hosts = StartCodex(numberOfHosts, s => s.WithName("host").WithLogLevel(CodexLogLevel.Trace)); var file = GenerateTestFile(fileSize.MB()); - var cid = hosts[0].UploadFile(file); - var tailOfManifestCid = cid.Id.Substring(cid.Id.Length - 6); + + var uploadTasks = hosts.Select(h => + { + return Task.Run(() => + { + return h.UploadFile(file); + }); + }).ToArray(); + + Task.WaitAll(uploadTasks); + var cid = new ContentId(uploadTasks.Select(t => t.Result.Id).Distinct().Single()); var uploadLog = Ci.DownloadLog(hosts[0]); var expectedNumberOfBlocks = RoundUp(fileSize.MB().SizeInBytes, 64.KB().SizeInBytes) + 1; // +1 for manifest block. @@ -27,55 +37,36 @@ namespace CodexTests.ScalabilityTests .FindLinesThatContain("Block Stored") .Select(s => { - var start = s.IndexOf("cid=") + 4; - var end = s.IndexOf(" count="); - var len = end - start; - return s.Substring(start, len); + var line = CodexLogLine.Parse(s)!; + return line.Attributes["cid"]; }) .ToArray(); Assert.That(blockCids.Length, Is.EqualTo(expectedNumberOfBlocks)); - foreach (var h in hosts) h.DownloadContent(cid); - var client = StartCodex(s => s.WithLogLevel(CodexPlugin.CodexLogLevel.Trace)); + + var client = StartCodex(s => s.WithName("client").WithLogLevel(CodexLogLevel.Trace)); var resultFile = client.DownloadContent(cid); resultFile!.AssertIsEqual(file); var downloadLog = Ci.DownloadLog(client); var host = string.Empty; - var blockIndexHostMap = new Dictionary(); - downloadLog.IterateLines(line => + var blockAddressHostMap = new Dictionary>(); + downloadLog + .IterateLines(s => { - // Received blocks from peer - // topics="codex blockexcengine" - // tid=1 - // peer=16U*5ULEov - // blocks="treeCid: zDzSvJTfBgds9wsRV6iB8ZVf4fL6Nynxh2hkJSyTH4j8A9QPucyU, index: 1597" - // count=28138 + var line = CodexLogLine.Parse(s)!; + var peer = line.Attributes["peer"]; + var blockAddresses = line.Attributes["blocks"]; - if (line.Contains("peer=") && line.Contains(" len=")) - { - var start = line.IndexOf("peer=") + 5; - var end = line.IndexOf(" len="); - var len = end - start; - host = line.Substring(start, len); - } - else if (!string.IsNullOrEmpty(host) && line.Contains("Storing block with key")) - { - var start = line.IndexOf("cid=") + 4; - var end = line.IndexOf(" count="); - var len = end - start; - var blockCid = line.Substring(start, len); + AddBlockAddresses(peer, blockAddresses, blockAddressHostMap); - blockCidHostMap.Add(blockCid, host); - host = string.Empty; - } - }); + }, thatContain: "Received blocks from peer"); - var totalFetched = blockCidHostMap.Count(p => !string.IsNullOrEmpty(p.Value)); - //PrintFullMap(blockCidHostMap); - PrintOverview(blockCidHostMap); + var totalFetched = blockAddressHostMap.Count(p => p.Value.Any()); + PrintFullMap(blockAddressHostMap); + //PrintOverview(blockCidHostMap); Log("Expected number of blocks: " + expectedNumberOfBlocks); Log("Total number of block CIDs found in dataset + manifest block: " + blockCids.Length); @@ -83,13 +74,47 @@ namespace CodexTests.ScalabilityTests Assert.That(totalFetched, Is.EqualTo(expectedNumberOfBlocks)); } - private void PrintOverview(Dictionary blockCidHostMap) + private void AddBlockAddresses(string peer, string blockAddresses, Dictionary> blockAddressHostMap) + { + // Single line can contain multiple block addresses. + var tokens = blockAddresses.Split(",", StringSplitOptions.RemoveEmptyEntries).ToList(); + while (tokens.Count > 0) + { + if (tokens.Count == 1) + { + AddBlockAddress(peer, tokens[0], blockAddressHostMap); + return; + } + + var blockAddress = $"{tokens[0]}, {tokens[1]}"; + tokens.RemoveRange(0, 2); + + AddBlockAddress(peer, blockAddress, blockAddressHostMap); + } + } + + private void AddBlockAddress(string peer, string blockAddress, Dictionary> blockAddressHostMap) + { + if (blockAddressHostMap.ContainsKey(blockAddress)) + { + blockAddressHostMap[blockAddress].Add(peer); + } + else + { + blockAddressHostMap[blockAddress] = new List { peer }; + } + } + + private void PrintOverview(Dictionary> blockAddressHostMap) { var overview = new Dictionary(); - foreach (var pair in blockCidHostMap) + foreach (var pair in blockAddressHostMap) { - if (!overview.ContainsKey(pair.Value)) overview.Add(pair.Value, 1); - else overview[pair.Value]++; + foreach (var host in pair.Value) + { + if (!overview.ContainsKey(host)) overview.Add(host, 1); + else overview[host]++; + } } Log("Blocks fetched per host:"); @@ -99,19 +124,13 @@ namespace CodexTests.ScalabilityTests } } - private void PrintFullMap(Dictionary blockCidHostMap) + private void PrintFullMap(Dictionary> blockAddressHostMap) { Log("Per block, host it was fetched from:"); - foreach (var pair in blockCidHostMap) + foreach (var pair in blockAddressHostMap) { - if (string.IsNullOrEmpty(pair.Value)) - { - Log($"block: {pair.Key} = Not seen"); - } - else - { - Log($"block: {pair.Key} = '{pair.Value}'"); - } + var hostStr = $"[{string.Join(",", pair.Value)}]"; + Log($"blockAddress: {pair.Key} = {hostStr}"); } }