fixes multipeer download test

This commit is contained in:
benbierens 2024-07-25 10:10:11 +02:00
parent ae4d782566
commit 485b2387e4
No known key found for this signature in database
GPG Key ID: 877D2C2E09A22F3A
9 changed files with 123 additions and 98 deletions

View File

@ -5,7 +5,7 @@ namespace Core
{
public interface IDownloadedLog
{
void IterateLines(Action<string> action);
void IterateLines(Action<string> action, params string[] thatContain);
string[] GetLinesContaining(string expectedString);
string[] FindLinesThatContain(params string[] tags);
void DeleteFile();
@ -20,7 +20,7 @@ namespace Core
logFile = logHandler.LogFile;
}
public void IterateLines(Action<string> action)
public void IterateLines(Action<string> action, params string[] thatContain)
{
using var file = File.OpenRead(logFile.FullFilename);
using var streamReader = new StreamReader(file);
@ -28,7 +28,10 @@ namespace Core
var line = streamReader.ReadLine();
while (line != null)
{
action(line);
if (thatContain.All(line.Contains))
{
action(line);
}
line = streamReader.ReadLine();
}
}

View File

@ -13,18 +13,20 @@ namespace Core
internal class Http : IHttp
{
private static readonly object httpLock = new object();
private static readonly Dictionary<string, object> httpLocks = new Dictionary<string, object>();
private readonly ILog log;
private readonly ITimeSet timeSet;
private readonly Action<HttpClient> onClientCreated;
private readonly string id;
internal Http(ILog log, ITimeSet timeSet)
: this(log, timeSet, DoNothing)
internal Http(string id, ILog log, ITimeSet timeSet)
: this(id, log, timeSet, DoNothing)
{
}
internal Http(ILog log, ITimeSet timeSet, Action<HttpClient> onClientCreated)
internal Http(string id, ILog log, ITimeSet timeSet, Action<HttpClient> onClientCreated)
{
this.id = id;
this.log = log;
this.timeSet = timeSet;
this.onClientCreated = onClientCreated;
@ -63,12 +65,19 @@ namespace Core
private T LockRetry<T>(Func<T> operation, Retry retry)
{
var httpLock = GetLock();
lock (httpLock)
{
return retry.Run(operation);
}
}
private object GetLock()
{
if (!httpLocks.ContainsKey(id)) httpLocks.Add(id, new object());
return httpLocks[id];
}
private HttpClient GetClient()
{
var client = new HttpClient();

View File

@ -27,9 +27,9 @@ namespace Core
public interface IHttpFactoryTool
{
IHttp CreateHttp(Action<HttpClient> onClientCreated);
IHttp CreateHttp(Action<HttpClient> onClientCreated, ITimeSet timeSet);
IHttp CreateHttp();
IHttp CreateHttp(string id, Action<HttpClient> onClientCreated);
IHttp CreateHttp(string id, Action<HttpClient> onClientCreated, ITimeSet timeSet);
IHttp CreateHttp(string id);
}
public interface IFileTool
@ -58,19 +58,19 @@ namespace Core
log.Prefix = prefix;
}
public IHttp CreateHttp(Action<HttpClient> onClientCreated)
public IHttp CreateHttp(string id, Action<HttpClient> onClientCreated)
{
return CreateHttp(onClientCreated, TimeSet);
return CreateHttp(id, onClientCreated, TimeSet);
}
public IHttp CreateHttp(Action<HttpClient> onClientCreated, ITimeSet ts)
public IHttp CreateHttp(string id, Action<HttpClient> onClientCreated, ITimeSet ts)
{
return new Http(log, ts, onClientCreated);
return new Http(id, log, ts, onClientCreated);
}
public IHttp CreateHttp()
public IHttp CreateHttp(string id)
{
return new Http(log, TimeSet);
return new Http(id, log, TimeSet);
}
public IStartupWorkflow CreateWorkflow(string? namespaceOverride = null)

View File

@ -132,20 +132,6 @@ namespace CodexPlugin
return workflow.GetPodInfo(Container);
}
public void LogDiskSpace(string msg)
{
what to do with this?
//try
//{
// var diskInfo = tools.CreateWorkflow().ExecuteCommand(Container.Containers.Single(), "df", "--sync");
// Log($"{msg} - Disk info: {diskInfo}");
//}
//catch (Exception e)
//{
// Log("Failed to get disk info: " + e);
//}
}
public void DeleteRepoFolder()
{
try
@ -164,13 +150,13 @@ namespace CodexPlugin
private T OnCodex<T>(Func<CodexApi, Task<T>> action)
{
var result = tools.CreateHttp(CheckContainerCrashed).OnClient(client => CallCodex(client, action));
var result = tools.CreateHttp(GetHttpId(), CheckContainerCrashed).OnClient(client => CallCodex(client, action));
return result;
}
private T OnCodex<T>(Func<CodexApi, Task<T>> action, Retry retry)
{
var result = tools.CreateHttp(CheckContainerCrashed).OnClient(client => CallCodex(client, action), retry);
var result = tools.CreateHttp(GetHttpId(), CheckContainerCrashed).OnClient(client => CallCodex(client, action), retry);
return result;
}
@ -197,7 +183,7 @@ namespace CodexPlugin
private IEndpoint GetEndpoint()
{
return tools
.CreateHttp(CheckContainerCrashed)
.CreateHttp(GetHttpId(), CheckContainerCrashed)
.CreateEndpoint(GetAddress(), "/api/codex/v1/", Container.Name);
}
@ -206,6 +192,11 @@ namespace CodexPlugin
return Container.Containers.Single().GetAddress(log, CodexContainerRecipe.ApiPortTag);
}
private string GetHttpId()
{
return GetAddress().ToString();
}
private void CheckContainerCrashed(HttpClient client)
{
if (CrashWatcher.HasContainerCrashed()) throw new Exception($"Container {GetName()} has crashed.");
@ -228,8 +219,6 @@ namespace CodexPlugin
$"(HTTP timeout = {Time.FormatDuration(timeSet.HttpCallTimeout())}) " +
$"Checking if node responds to debug/info...");
LogDiskSpace("After retry failure");
try
{
var debugInfo = GetDebugInfo();

View File

@ -115,8 +115,6 @@ namespace CodexPlugin
public ContentId UploadFile(TrackedFile file, Action<Failure> onFailure)
{
CodexAccess.LogDiskSpace("Before upload");
using var fileStream = File.OpenRead(file.Filename);
var logMessage = $"Uploading file {file.Describe()}...";
@ -133,7 +131,6 @@ namespace CodexPlugin
if (response.StartsWith(UploadFailedMessage)) FrameworkAssert.Fail("Node failed to store block.");
Log($"Uploaded file. Received contentId: '{response}'.");
CodexAccess.LogDiskSpace("After upload");
return new ContentId(response);
}
@ -205,10 +202,21 @@ namespace CodexPlugin
var log = tools.GetLog();
log.AddStringReplace(nodePeerId, nodeName);
log.AddStringReplace(ToShortIdString(nodePeerId), nodeName);
log.AddStringReplace(debugInfo.Table.LocalNode.NodeId, nodeName);
log.AddStringReplace(ToShortIdString(debugInfo.Table.LocalNode.NodeId), nodeName);
Version = debugInfo.Version;
}
private string ToShortIdString(string id)
{
if (id.Length > 10)
{
return $"{id[..3]}*{id[^6..]}";
}
return id;
}
private string[] GetPeerMultiAddresses(CodexNode peer, DebugInfo peerInfo)
{
// The peer we want to connect is in a different pod.
@ -223,8 +231,6 @@ namespace CodexPlugin
private void DownloadToFile(string contentId, TrackedFile file, Action<Failure> onFailure)
{
CodexAccess.LogDiskSpace("Before download");
using var fileStream = File.OpenWrite(file.Filename);
try
{
@ -236,8 +242,6 @@ namespace CodexPlugin
Log($"Failed to download file '{contentId}'.");
throw;
}
CodexAccess.LogDiskSpace("After download");
}
private void EnsureMarketplace()

View File

@ -34,7 +34,7 @@ namespace CodexPlugin
{
var podInfo = GetPodInfo(rc);
var podInfos = string.Join(", ", rc.Containers.Select(c => $"Container: '{c.Name}' PodLabel: '{c.RunningPod.StartResult.Deployment.PodLabel}' runs at '{podInfo.K8SNodeName}'={podInfo.Ip}"));
Log($"Started {codexSetup.NumberOfNodes} nodes of image '{containers.First().Containers.First().Recipe.Image}'. ({podInfos})");
Log($"Started node with image '{containers.First().Containers.First().Recipe.Image}'. ({podInfos})");
}
LogSeparator();

View File

@ -15,9 +15,10 @@ namespace MetricsPlugin
{
RunningContainer = runningContainer;
log = tools.GetLog();
var address = RunningContainer.GetAddress(log, PrometheusContainerRecipe.PortTag);
endpoint = tools
.CreateHttp()
.CreateEndpoint(RunningContainer.GetAddress(log, PrometheusContainerRecipe.PortTag), "/api/v1/");
.CreateHttp(address.ToString())
.CreateEndpoint(address, "/api/v1/");
}
public RunningContainer RunningContainer { get; }

View File

@ -71,7 +71,7 @@ namespace ContinuousTests
var address = new Address($"http://{serviceName}.{k8sNamespace}.svc.cluster.local", 9200);
var baseUrl = "";
var http = tools.CreateHttp(client =>
var http = tools.CreateHttp(address.ToString(), client =>
{
client.DefaultRequestHeaders.Add("kbn-xsrf", "reporting");
});

View File

@ -1,4 +1,5 @@
using DistTestCore;
using CodexPlugin;
using DistTestCore;
using NUnit.Framework;
using Utils;
@ -16,10 +17,19 @@ namespace CodexTests.ScalabilityTests
[Values(100, 1000)] int fileSize
)
{
var hosts = StartCodex(numberOfHosts, s => s.WithLogLevel(CodexPlugin.CodexLogLevel.Trace));
var hosts = StartCodex(numberOfHosts, s => s.WithName("host").WithLogLevel(CodexLogLevel.Trace));
var file = GenerateTestFile(fileSize.MB());
var cid = hosts[0].UploadFile(file);
var tailOfManifestCid = cid.Id.Substring(cid.Id.Length - 6);
var uploadTasks = hosts.Select(h =>
{
return Task.Run(() =>
{
return h.UploadFile(file);
});
}).ToArray();
Task.WaitAll(uploadTasks);
var cid = new ContentId(uploadTasks.Select(t => t.Result.Id).Distinct().Single());
var uploadLog = Ci.DownloadLog(hosts[0]);
var expectedNumberOfBlocks = RoundUp(fileSize.MB().SizeInBytes, 64.KB().SizeInBytes) + 1; // +1 for manifest block.
@ -27,55 +37,36 @@ namespace CodexTests.ScalabilityTests
.FindLinesThatContain("Block Stored")
.Select(s =>
{
var start = s.IndexOf("cid=") + 4;
var end = s.IndexOf(" count=");
var len = end - start;
return s.Substring(start, len);
var line = CodexLogLine.Parse(s)!;
return line.Attributes["cid"];
})
.ToArray();
Assert.That(blockCids.Length, Is.EqualTo(expectedNumberOfBlocks));
foreach (var h in hosts) h.DownloadContent(cid);
var client = StartCodex(s => s.WithLogLevel(CodexPlugin.CodexLogLevel.Trace));
var client = StartCodex(s => s.WithName("client").WithLogLevel(CodexLogLevel.Trace));
var resultFile = client.DownloadContent(cid);
resultFile!.AssertIsEqual(file);
var downloadLog = Ci.DownloadLog(client);
var host = string.Empty;
var blockIndexHostMap = new Dictionary<int, string>();
downloadLog.IterateLines(line =>
var blockAddressHostMap = new Dictionary<string, List<string>>();
downloadLog
.IterateLines(s =>
{
// Received blocks from peer
// topics="codex blockexcengine"
// tid=1
// peer=16U*5ULEov
// blocks="treeCid: zDzSvJTfBgds9wsRV6iB8ZVf4fL6Nynxh2hkJSyTH4j8A9QPucyU, index: 1597"
// count=28138
var line = CodexLogLine.Parse(s)!;
var peer = line.Attributes["peer"];
var blockAddresses = line.Attributes["blocks"];
if (line.Contains("peer=") && line.Contains(" len="))
{
var start = line.IndexOf("peer=") + 5;
var end = line.IndexOf(" len=");
var len = end - start;
host = line.Substring(start, len);
}
else if (!string.IsNullOrEmpty(host) && line.Contains("Storing block with key"))
{
var start = line.IndexOf("cid=") + 4;
var end = line.IndexOf(" count=");
var len = end - start;
var blockCid = line.Substring(start, len);
AddBlockAddresses(peer, blockAddresses, blockAddressHostMap);
blockCidHostMap.Add(blockCid, host);
host = string.Empty;
}
});
}, thatContain: "Received blocks from peer");
var totalFetched = blockCidHostMap.Count(p => !string.IsNullOrEmpty(p.Value));
//PrintFullMap(blockCidHostMap);
PrintOverview(blockCidHostMap);
var totalFetched = blockAddressHostMap.Count(p => p.Value.Any());
PrintFullMap(blockAddressHostMap);
//PrintOverview(blockCidHostMap);
Log("Expected number of blocks: " + expectedNumberOfBlocks);
Log("Total number of block CIDs found in dataset + manifest block: " + blockCids.Length);
@ -83,13 +74,47 @@ namespace CodexTests.ScalabilityTests
Assert.That(totalFetched, Is.EqualTo(expectedNumberOfBlocks));
}
private void PrintOverview(Dictionary<string, string> blockCidHostMap)
private void AddBlockAddresses(string peer, string blockAddresses, Dictionary<string, List<string>> blockAddressHostMap)
{
// Single line can contain multiple block addresses.
var tokens = blockAddresses.Split(",", StringSplitOptions.RemoveEmptyEntries).ToList();
while (tokens.Count > 0)
{
if (tokens.Count == 1)
{
AddBlockAddress(peer, tokens[0], blockAddressHostMap);
return;
}
var blockAddress = $"{tokens[0]}, {tokens[1]}";
tokens.RemoveRange(0, 2);
AddBlockAddress(peer, blockAddress, blockAddressHostMap);
}
}
private void AddBlockAddress(string peer, string blockAddress, Dictionary<string, List<string>> blockAddressHostMap)
{
if (blockAddressHostMap.ContainsKey(blockAddress))
{
blockAddressHostMap[blockAddress].Add(peer);
}
else
{
blockAddressHostMap[blockAddress] = new List<string> { peer };
}
}
private void PrintOverview(Dictionary<string, List<string>> blockAddressHostMap)
{
var overview = new Dictionary<string, int>();
foreach (var pair in blockCidHostMap)
foreach (var pair in blockAddressHostMap)
{
if (!overview.ContainsKey(pair.Value)) overview.Add(pair.Value, 1);
else overview[pair.Value]++;
foreach (var host in pair.Value)
{
if (!overview.ContainsKey(host)) overview.Add(host, 1);
else overview[host]++;
}
}
Log("Blocks fetched per host:");
@ -99,19 +124,13 @@ namespace CodexTests.ScalabilityTests
}
}
private void PrintFullMap(Dictionary<string, string> blockCidHostMap)
private void PrintFullMap(Dictionary<string, List<string>> blockAddressHostMap)
{
Log("Per block, host it was fetched from:");
foreach (var pair in blockCidHostMap)
foreach (var pair in blockAddressHostMap)
{
if (string.IsNullOrEmpty(pair.Value))
{
Log($"block: {pair.Key} = Not seen");
}
else
{
Log($"block: {pair.Key} = '{pair.Value}'");
}
var hostStr = $"[{string.Join(",", pair.Value)}]";
Log($"blockAddress: {pair.Key} = {hostStr}");
}
}