Adds streamless small-swarm test

Ben 2024-12-18 12:47:54 +01:00
parent 2afcb92d08
commit 5b42b764fb
No known key found for this signature in database
GPG Key ID: 0F16E812E736C24B
5 changed files with 132 additions and 30 deletions

View File

@@ -98,7 +98,7 @@
private void Fail()
{
throw new TimeoutException($"Retry '{description}' timed out after {tryNumber} tries over {Time.FormatDuration(Duration())}: {GetFailureReport}",
throw new TimeoutException($"Retry '{description}' timed out after {tryNumber} tries over {Time.FormatDuration(Duration())}: {GetFailureReport()}",
new AggregateException(failures.Select(f => f.Exception)));
}

View File

@@ -23,6 +23,11 @@ namespace CodexPlugin
TrackedFile? DownloadContent(ContentId contentId, string fileLabel = "");
TrackedFile? DownloadContent(ContentId contentId, Action<Failure> onFailure, string fileLabel = "");
LocalDataset DownloadStreamless(ContentId cid);
/// <summary>
/// TODO: This will monitor the quota-used of the node until 'size' bytes are added. That's a very bad way
/// to track the streamless download progress. Replace it once we have a good API for this.
/// </summary>
LocalDataset DownloadStreamlessWait(ContentId cid, ByteSize size);
LocalDataset DownloadManifestOnly(ContentId cid);
LocalDatasetList LocalFiles();
CodexSpace Space();
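The interface now distinguishes a fire-and-forget streamless download from one that blocks until the data appears to have landed locally. A hedged usage sketch, not part of this commit; 'node' and 'cid' are placeholders and the CodexPlugin/Utils namespaces are assumed to be in scope:

LocalDataset StreamlessExample(ICodexNode node, ContentId cid)
{
    // Blocking variant added by this commit: triggers the streamless download,
    // then polls Space().QuotaUsedBytes until it has grown by the given size
    // (see the TODO above) before returning.
    return node.DownloadStreamlessWait(cid, 10.MB());
}

// The existing no-wait variant remains available for fire-and-forget use:
// var dataset = node.DownloadStreamless(cid);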
@@ -205,11 +210,28 @@ namespace CodexPlugin
public LocalDataset DownloadStreamless(ContentId cid)
{
Log($"Downloading streamless '{cid}' (no-wait)");
return CodexAccess.DownloadStreamless(cid);
}
public LocalDataset DownloadStreamlessWait(ContentId cid, ByteSize size)
{
Log($"Downloading streamless '{cid}' (wait till finished)");
var sw = Stopwatch.Measure(log, nameof(DownloadStreamlessWait), () =>
{
var startSpace = Space();
var result = CodexAccess.DownloadStreamless(cid);
WaitUntilQuotaUsedIncreased(startSpace, size);
return result;
});
return sw.Value;
}
public LocalDataset DownloadManifestOnly(ContentId cid)
{
Log($"Downloading manifest-only '{cid}'");
return CodexAccess.DownloadManifestOnly(cid);
}
@@ -321,6 +343,39 @@ namespace CodexPlugin
}
}
public void WaitUntilQuotaUsedIncreased(CodexSpace startSpace, ByteSize expectedIncreaseOfQuotaUsed)
{
WaitUntilQuotaUsedIncreased(startSpace, expectedIncreaseOfQuotaUsed, TimeSpan.FromMinutes(2));
}
public void WaitUntilQuotaUsedIncreased(
CodexSpace startSpace,
ByteSize expectedIncreaseOfQuotaUsed,
TimeSpan maxTimeout)
{
Log($"Waiting until quotaUsed " +
$"(start: {startSpace.QuotaUsedBytes}) " +
$"increases by {expectedIncreaseOfQuotaUsed} " +
$"to reach {startSpace.QuotaUsedBytes + expectedIncreaseOfQuotaUsed.SizeInBytes}");
var retry = new Retry($"Checking local space for quotaUsed increase of {expectedIncreaseOfQuotaUsed}",
maxTimeout: maxTimeout,
sleepAfterFail: TimeSpan.FromSeconds(3),
onFail: f => { });
retry.Run(() =>
{
var space = Space();
var increase = space.QuotaUsedBytes - startSpace.QuotaUsedBytes;
if (increase < expectedIncreaseOfQuotaUsed.SizeInBytes)
throw new Exception($"Expected quota-used not reached. " +
$"Expected increase: {expectedIncreaseOfQuotaUsed.SizeInBytes} " +
$"Actual increase: {increase} " +
$"Actual used: {space.QuotaUsedBytes}");
});
}
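Because the helper is public, the same wait can also be composed manually around the no-wait call, for instance to trigger several streamless downloads before blocking on any of them. A hedged sketch with placeholder names ('downloader', 'cid', 'expectedSize'), assuming the helper is reachable from the node handle the tests use:

var before = downloader.Space();
var dataset = downloader.DownloadStreamless(cid);              // returns immediately
downloader.WaitUntilQuotaUsedIncreased(before, expectedSize);  // default 2-minute timeout
// or with an explicit timeout:
// downloader.WaitUntilQuotaUsedIncreased(before, expectedSize, TimeSpan.FromMinutes(5));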
private void EnsureMarketplace()
{
if (ethAccount == null) throw new Exception("Marketplace is not enabled for this Codex node. Please start it with the option '.EnableMarketplace(...)' to enable it.");

View File

@@ -1,5 +1,7 @@
using CodexTests;
using CodexPlugin;
using CodexTests;
using NUnit.Framework;
using System.Drawing;
using Utils;
namespace CodexReleaseTests.DataTests
@@ -13,31 +15,17 @@ namespace CodexReleaseTests.DataTests
var uploader = StartCodex();
var downloader = StartCodex(s => s.WithBootstrapNode(uploader));
var file = GenerateTestFile(10.MB());
var size = file.GetFilesize().SizeInBytes;
var size = 10.MB();
var file = GenerateTestFile(size);
var cid = uploader.UploadFile(file);
var startSpace = downloader.Space();
var start = DateTime.UtcNow;
var localDataset = downloader.DownloadStreamless(cid);
var localDataset = downloader.DownloadStreamlessWait(cid, size);
Assert.That(localDataset.Cid, Is.EqualTo(cid));
Assert.That(localDataset.Manifest.OriginalBytes.SizeInBytes, Is.EqualTo(file.GetFilesize().SizeInBytes));
// TODO: We have no way to inspect the status or progress of the download.
// We use local space information to estimate.
var retry = new Retry("Checking local space",
maxTimeout: TimeSpan.FromMinutes(2),
sleepAfterFail: TimeSpan.FromSeconds(3),
onFail: f => { });
retry.Run(() =>
{
var space = downloader.Space();
var expected = startSpace.FreeBytes - size;
if (space.FreeBytes > expected) throw new Exception("Expected free space not reached.");
});
// Stop the uploader node and verify that the downloader has the data.
uploader.Stop(waitTillStopped: true);
var downloaded = downloader.DownloadContent(cid);

View File

@@ -24,11 +24,23 @@ namespace CodexReleaseTests.DataTests
AssertAllFilesDownloadedCorrectly(files);
}
[Test]
public void StreamlessSmallSwarm()
{
var nodes = StartCodex(NumberOfNodes);
var files = nodes.Select(UploadUniqueFilePerNode).ToArray();
var tasks = ParallelStreamlessDownloadEachFile(nodes, files);
Task.WaitAll(tasks);
AssertAllFilesStreamlesslyDownloadedCorrectly(nodes, files);
}
private SwarmTestNetworkFile UploadUniqueFilePerNode(ICodexNode node)
{
var file = GenerateTestFile(FileSizeMb.MB());
var cid = node.UploadFile(file);
return new SwarmTestNetworkFile(file, cid);
return new SwarmTestNetworkFile(node, file, cid);
}
private Task[] ParallelDownloadEachFile(ICodexNodeGroup nodes, SwarmTestNetworkFile[] files)
@@ -43,6 +55,18 @@ namespace CodexReleaseTests.DataTests
return tasks.ToArray();
}
private Task[] ParallelStreamlessDownloadEachFile(ICodexNodeGroup nodes, SwarmTestNetworkFile[] files)
{
var tasks = new List<Task>();
foreach (var node in nodes)
{
tasks.Add(StartStreamlessDownload(node, files));
}
return tasks.ToArray();
}
private Task StartDownload(ICodexNode node, SwarmTestNetworkFile[] files)
{
return Task.Run(() =>
@@ -68,6 +92,31 @@ namespace CodexReleaseTests.DataTests
});
}
private Task StartStreamlessDownload(ICodexNode node, SwarmTestNetworkFile[] files)
{
return Task.Run(() =>
{
var remaining = files.ToList();
while (remaining.Count > 0)
{
var file = remaining.PickOneRandom();
if (file.Uploader.GetName() != node.GetName())
{
try
{
var startSpace = node.Space();
node.DownloadStreamlessWait(file.Cid, FileSizeMb.MB());
}
catch (Exception ex)
{
file.Error = ex;
}
}
}
});
}
private void AssertAllFilesDownloadedCorrectly(SwarmTestNetworkFile[] files)
{
foreach (var file in files)
@@ -83,14 +132,32 @@ namespace CodexReleaseTests.DataTests
}
}
private void AssertAllFilesStreamlesslyDownloadedCorrectly(ICodexNodeGroup nodes, SwarmTestNetworkFile[] files)
{
var totalFilesSpace = 0.Bytes();
foreach (var file in files)
{
if (file.Error != null) throw file.Error;
totalFilesSpace = new ByteSize(totalFilesSpace.SizeInBytes + file.Original.GetFilesize().SizeInBytes);
}
foreach (var node in nodes)
{
var currentSpace = node.Space();
Assert.That(currentSpace.QuotaUsedBytes, Is.GreaterThanOrEqualTo(totalFilesSpace.SizeInBytes));
}
}
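The assertion above relies on a simple accounting argument: every node uploads one file of FileSizeMb and streamlessly downloads the other NumberOfNodes - 1 files, so each node should end up holding NumberOfNodes files worth of data. A hedged sketch of that arithmetic using example values only (the real NumberOfNodes and FileSizeMb are fixture constants not shown in this diff):

// Example values, assumed for illustration.
var exampleNumberOfNodes = 5;
var exampleFileSizeMb = 10;
// Own upload + (exampleNumberOfNodes - 1) streamless downloads
// = exampleNumberOfNodes files of exampleFileSizeMb each.
var expectedMinQuotaUsedPerNode = (exampleNumberOfNodes * exampleFileSizeMb).MB();
// The test asserts QuotaUsedBytes >= this minimum, tolerating metadata overhead
// but failing if any download is missing.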
private class SwarmTestNetworkFile
{
public SwarmTestNetworkFile(TrackedFile original, ContentId cid)
public SwarmTestNetworkFile(ICodexNode uploader, TrackedFile original, ContentId cid)
{
Uploader = uploader;
Original = original;
Cid = cid;
}
public ICodexNode Uploader { get; }
public TrackedFile Original { get; }
public ContentId Cid { get; }
public object Lock { get; } = new object();

View File

@@ -15,6 +15,7 @@ using Newtonsoft.Json;
using NUnit.Framework;
using NUnit.Framework.Constraints;
using OverwatchTranscript;
using Utils;
namespace CodexTests
{
@@ -155,15 +156,6 @@ namespace CodexTests
node.Space().ToString() + Environment.NewLine;
}
// Disabled for now: Makes huge log files!
//private string GetNodeMetrics(IMetricsAccess? metrics)
//{
// if (metrics == null) return "No metrics enabled";
// var m = metrics.GetAllMetrics();
// if (m == null) return "No metrics received";
// return m.AsCsv();
//}
protected virtual void OnCodexSetup(ICodexSetup setup)
{
}