2
0
mirror of synced 2025-01-24 15:28:58 +00:00

Merge branch 'master' into chainstate-update

# Conflicts:
#	ProjectPlugins/CodexPlugin/CodexNode.cs
#	ProjectPlugins/CodexPlugin/MarketplaceAccess.cs
This commit is contained in:
benbierens 2024-06-14 09:28:34 +02:00
commit 1fa7787d3b
No known key found for this signature in database
GPG Key ID: 877D2C2E09A22F3A
40 changed files with 732 additions and 291 deletions

View File

@ -38,10 +38,14 @@ namespace Core
return new CoreInterface(this);
}
public void Decommission(bool deleteKubernetesResources, bool deleteTrackedFiles)
/// <summary>
/// Deletes kubernetes and tracked file resources.
/// When `waitTillDone` is true, this method blocks until the resources are deleted.
/// </summary>
public void Decommission(bool deleteKubernetesResources, bool deleteTrackedFiles, bool waitTillDone)
{
manager.DecommissionPlugins(deleteKubernetesResources, deleteTrackedFiles);
Tools.Decommission(deleteKubernetesResources, deleteTrackedFiles);
manager.DecommissionPlugins(deleteKubernetesResources, deleteTrackedFiles, waitTillDone);
Tools.Decommission(deleteKubernetesResources, deleteTrackedFiles, waitTillDone);
}
internal T GetPlugin<T>() where T : IProjectPlugin

View File

@ -7,6 +7,7 @@ namespace Core
{
T OnClient<T>(Func<HttpClient, T> action);
T OnClient<T>(Func<HttpClient, T> action, string description);
T OnClient<T>(Func<HttpClient, T> action, Retry retry);
IEndpoint CreateEndpoint(Address address, string baseUrl, string? logAlias = null);
}
@ -35,13 +36,19 @@ namespace Core
}
public T OnClient<T>(Func<HttpClient, T> action, string description)
{
var retry = new Retry(description, timeSet.HttpRetryTimeout(), timeSet.HttpCallRetryDelay(), f => { });
return OnClient(action, retry);
}
public T OnClient<T>(Func<HttpClient, T> action, Retry retry)
{
var client = GetClient();
return LockRetry(() =>
{
return action(client);
}, description);
}, retry);
}
public IEndpoint CreateEndpoint(Address address, string baseUrl, string? logAlias = null)
@ -54,11 +61,11 @@ namespace Core
return DebugStack.GetCallerName(skipFrames: 2);
}
private T LockRetry<T>(Func<T> operation, string description)
private T LockRetry<T>(Func<T> operation, Retry retry)
{
lock (httpLock)
{
return Time.Retry(operation, timeSet.HttpRetryTimeout(), timeSet.HttpCallRetryDelay(), description);
return retry.Run(operation);
}
}

View File

@ -34,12 +34,12 @@
return metadata;
}
internal void DecommissionPlugins(bool deleteKubernetesResources, bool deleteTrackedFiles)
internal void DecommissionPlugins(bool deleteKubernetesResources, bool deleteTrackedFiles, bool waitTillDone)
{
foreach (var pair in pairs)
{
pair.Plugin.Decommission();
pair.Tools.Decommission(deleteKubernetesResources, deleteTrackedFiles);
pair.Tools.Decommission(deleteKubernetesResources, deleteTrackedFiles, waitTillDone);
}
}

View File

@ -6,7 +6,13 @@ namespace Core
{
public interface IPluginTools : IWorkflowTool, ILogTool, IHttpFactoryTool, IFileTool
{
void Decommission(bool deleteKubernetesResources, bool deleteTrackedFiles);
ITimeSet TimeSet { get; }
/// <summary>
/// Deletes kubernetes and tracked file resources.
/// When `waitTillDone` is true, this method blocks until the resources are deleted.
/// </summary>
void Decommission(bool deleteKubernetesResources, bool deleteTrackedFiles, bool waitTillDone);
}
public interface IWorkflowTool
@ -33,7 +39,6 @@ namespace Core
internal class PluginTools : IPluginTools
{
private readonly ITimeSet timeSet;
private readonly WorkflowCreator workflowCreator;
private readonly IFileManager fileManager;
private readonly LogPrefixer log;
@ -42,10 +47,12 @@ namespace Core
{
this.log = new LogPrefixer(log);
this.workflowCreator = workflowCreator;
this.timeSet = timeSet;
TimeSet = timeSet;
fileManager = new FileManager(log, fileManagerRootFolder);
}
public ITimeSet TimeSet { get; }
public void ApplyLogPrefix(string prefix)
{
log.Prefix = prefix;
@ -53,7 +60,7 @@ namespace Core
public IHttp CreateHttp(Action<HttpClient> onClientCreated)
{
return CreateHttp(onClientCreated, timeSet);
return CreateHttp(onClientCreated, TimeSet);
}
public IHttp CreateHttp(Action<HttpClient> onClientCreated, ITimeSet ts)
@ -63,7 +70,7 @@ namespace Core
public IHttp CreateHttp()
{
return new Http(log, timeSet);
return new Http(log, TimeSet);
}
public IStartupWorkflow CreateWorkflow(string? namespaceOverride = null)
@ -71,9 +78,9 @@ namespace Core
return workflowCreator.CreateWorkflow(namespaceOverride);
}
public void Decommission(bool deleteKubernetesResources, bool deleteTrackedFiles)
public void Decommission(bool deleteKubernetesResources, bool deleteTrackedFiles, bool waitTillDone)
{
if (deleteKubernetesResources) CreateWorkflow().DeleteNamespace();
if (deleteKubernetesResources) CreateWorkflow().DeleteNamespace(waitTillDone);
if (deleteTrackedFiles) fileManager.DeleteAllFiles();
}

View File

@ -1,41 +0,0 @@
using Utils;
namespace KubernetesWorkflow
{
/// <summary>
/// Formats a <c>ByteSize</c> as a quantity string with a binary suffix ("Ki", "Mi", "Gi", "Ti").
/// </summary>
public static class ByteSizeExtensions
{
    // Units ordered from largest to smallest. An array guarantees this order;
    // the enumeration order of a Dictionary is not guaranteed.
    private static readonly (long Divisor, string Suffix)[] Units =
    {
        (Pow(1024, 4), "Ti"),
        (Pow(1024, 3), "Gi"),
        (Pow(1024, 2), "Mi"),
        (1024, "Ki"),
    };

    /// <summary>
    /// Returns the size expressed in the largest unit that is strictly smaller than the size,
    /// rounded UP to a whole number of that unit. Sizes of 1024 bytes or less are returned
    /// as a plain byte count without suffix.
    /// </summary>
    /// <param name="b">The size to format.</param>
    /// <returns>For example "2Gi" for 1.5 GiB, or "512" for 512 bytes.</returns>
    public static string ToSuffixNotation(this ByteSize b)
    {
        var bytes = b.SizeInBytes;
        foreach (var (divisor, suffix) in Units)
        {
            if (bytes > divisor)
            {
                // Round up so the formatted quantity is never smaller than the actual size.
                double bytesD = bytes;
                double divD = divisor;
                var v = Convert.ToInt64(Math.Ceiling(bytesD / divD));
                return $"{v}{suffix}";
            }
        }

        return $"{bytes}";
    }

    /// <summary>Integer power: multiplies <paramref name="x"/> by itself <paramref name="v"/> times.</summary>
    private static long Pow(long x, int v)
    {
        long result = 1;
        for (var i = 0; i < v; i++) result *= x;
        return result;
    }
}
}

View File

@ -50,7 +50,9 @@ namespace KubernetesWorkflow
public bool HasContainerCrashed()
{
using var client = new Kubernetes(config);
return HasContainerBeenRestarted(client);
var result = HasContainerBeenRestarted(client);
if (result) DownloadCrashedContainerLogs(client);
return result;
}
private void Worker()
@ -83,12 +85,13 @@ namespace KubernetesWorkflow
private bool HasContainerBeenRestarted(Kubernetes client)
{
var podInfo = client.ReadNamespacedPod(podName, k8sNamespace);
return podInfo.Status.ContainerStatuses.Any(c => c.RestartCount > 0);
var result = podInfo.Status.ContainerStatuses.Any(c => c.RestartCount > 0);
if (result) log.Log("Pod crash detected for " + containerName);
return result;
}
private void DownloadCrashedContainerLogs(Kubernetes client)
{
log.Log("Pod crash detected for " + containerName);
using var stream = client.ReadNamespacedPodLog(podName, k8sNamespace, recipeName, previous: true);
logHandler!.Log(stream);
}

View File

@ -16,6 +16,7 @@ namespace KubernetesWorkflow
{
var config = GetConfig();
UpdateHostAddress(config);
config.SkipTlsVerify = true; // Required for operation on Wings cluster.
return config;
}

View File

@ -115,7 +115,7 @@ namespace KubernetesWorkflow
});
}
public void DeleteAllNamespacesStartingWith(string prefix)
public void DeleteAllNamespacesStartingWith(string prefix, bool wait)
{
log.Debug();
@ -124,25 +124,28 @@ namespace KubernetesWorkflow
foreach (var ns in namespaces)
{
DeleteNamespace(ns);
DeleteNamespace(ns, wait);
}
}
public void DeleteNamespace()
public void DeleteNamespace(bool wait)
{
log.Debug();
if (IsNamespaceOnline(K8sNamespace))
{
client.Run(c => c.DeleteNamespace(K8sNamespace, null, null, gracePeriodSeconds: 0));
if (wait) WaitUntilNamespaceDeleted(K8sNamespace);
}
}
public void DeleteNamespace(string ns)
public void DeleteNamespace(string ns, bool wait)
{
log.Debug();
if (IsNamespaceOnline(ns))
{
client.Run(c => c.DeleteNamespace(ns, null, null, gracePeriodSeconds: 0));
if (wait) WaitUntilNamespaceDeleted(ns);
}
}
@ -532,7 +535,7 @@ namespace KubernetesWorkflow
}
if (set.Memory.SizeInBytes != 0)
{
result.Add("memory", new ResourceQuantity(set.Memory.ToSuffixNotation()));
result.Add("memory", new ResourceQuantity(set.Memory.SizeInBytes.ToString()));
}
return result;
}
@ -871,6 +874,11 @@ namespace KubernetesWorkflow
WaitUntil(() => IsNamespaceOnline(K8sNamespace), nameof(WaitUntilNamespaceCreated));
}
/// <summary>
/// Waits until <c>IsNamespaceOnline</c> reports false for the given namespace.
/// </summary>
private void WaitUntilNamespaceDeleted(string @namespace)
{
    bool IsGone() => !IsNamespaceOnline(@namespace);

    WaitUntil(IsGone, nameof(WaitUntilNamespaceDeleted));
}
private void WaitUntilDeploymentOnline(string deploymentName)
{
WaitUntil(() =>

View File

@ -105,7 +105,7 @@ namespace KubernetesWorkflow.Recipe
protected void AddVolume(string name, string mountPath, string? subPath = null, string? secret = null, string? hostPath = null)
{
var size = 10.MB().ToSuffixNotation();
var size = 10.MB().SizeInBytes.ToString();
volumeMounts.Add(new VolumeMount(name, mountPath, subPath, size, secret, hostPath));
}
@ -114,7 +114,7 @@ namespace KubernetesWorkflow.Recipe
volumeMounts.Add(new VolumeMount(
$"autovolume-{Guid.NewGuid().ToString().ToLowerInvariant()}",
mountPath,
resourceQuantity: volumeSize.ToSuffixNotation()));
resourceQuantity: volumeSize.SizeInBytes.ToString()));
}
protected void Additional(object userData)

View File

@ -1,8 +1,5 @@
using k8s;
using k8s.Models;
using KubernetesWorkflow.Recipe;
using KubernetesWorkflow.Recipe;
using KubernetesWorkflow.Types;
using Newtonsoft.Json;
namespace KubernetesWorkflow
{

View File

@ -17,8 +17,8 @@ namespace KubernetesWorkflow
void Stop(RunningPod pod, bool waitTillStopped);
void DownloadContainerLog(RunningContainer container, ILogHandler logHandler, int? tailLines = null);
string ExecuteCommand(RunningContainer container, string command, params string[] args);
void DeleteNamespace();
void DeleteNamespacesStartingWith(string namespacePrefix);
void DeleteNamespace(bool wait);
void DeleteNamespacesStartingWith(string namespacePrefix, bool wait);
}
public class StartupWorkflow : IStartupWorkflow
@ -122,19 +122,19 @@ namespace KubernetesWorkflow
});
}
public void DeleteNamespace()
public void DeleteNamespace(bool wait)
{
K8s(controller =>
{
controller.DeleteNamespace();
controller.DeleteNamespace(wait);
});
}
public void DeleteNamespacesStartingWith(string namespacePrefix)
public void DeleteNamespacesStartingWith(string namespacePrefix, bool wait)
{
K8s(controller =>
{
controller.DeleteAllNamespacesStartingWith(namespacePrefix);
controller.DeleteAllNamespacesStartingWith(namespacePrefix, wait);
});
}

131
Framework/Utils/Retry.cs Normal file
View File

@ -0,0 +1,131 @@
namespace Utils
{
/// <summary>
/// Repeats a task until it completes without throwing, or until the maximum timeout is exceeded.
/// Each failed attempt is reported to the failure callback, followed by a sleep before the next attempt.
/// </summary>
public class Retry
{
    private readonly string description;
    private readonly TimeSpan maxTimeout;
    private readonly TimeSpan sleepAfterFail;
    private readonly Action<Failure> onFail;

    /// <param name="description">Name of the operation; appears in the timeout message.</param>
    /// <param name="maxTimeout">Total time budget. It is checked before each attempt, so an attempt that has started is never interrupted.</param>
    /// <param name="sleepAfterFail">Delay between a failed attempt and the next one.</param>
    /// <param name="onFail">
    /// Invoked after every failed attempt. It runs outside the retried try-block, so an
    /// exception thrown by this callback is not retried and propagates out of <c>Run</c>.
    /// </param>
    public Retry(string description, TimeSpan maxTimeout, TimeSpan sleepAfterFail, Action<Failure> onFail)
    {
        this.description = description;
        this.maxTimeout = maxTimeout;
        this.sleepAfterFail = sleepAfterFail;
        this.onFail = onFail;
    }

    /// <summary>Runs <paramref name="task"/> until it succeeds.</summary>
    /// <exception cref="TimeoutException">The time budget was exceeded before an attempt succeeded.</exception>
    public void Run(Action task)
    {
        var run = new RetryRun(description, task, maxTimeout, sleepAfterFail, onFail);
        run.Run();
    }

    /// <summary>Runs <paramref name="task"/> until it succeeds and returns its result.</summary>
    /// <exception cref="TimeoutException">The time budget was exceeded before an attempt succeeded.</exception>
    public T Run<T>(Func<T> task)
    {
        T? result = default;
        var run = new RetryRun(description, () =>
        {
            result = task();
        }, maxTimeout, sleepAfterFail, onFail);
        run.Run();
        return result!;
    }

    /// <summary>State of a single retry run: start time, attempt counter and collected failures.</summary>
    private class RetryRun
    {
        private readonly string description;
        private readonly Action task;
        private readonly TimeSpan maxTimeout;
        private readonly TimeSpan sleepAfterFail;
        private readonly Action<Failure> onFail;
        private readonly DateTime start = DateTime.UtcNow;
        private readonly List<Failure> failures = new List<Failure>();
        private int tryNumber;
        private DateTime tryStart;

        public RetryRun(string description, Action task, TimeSpan maxTimeout, TimeSpan sleepAfterFail, Action<Failure> onFail)
        {
            this.description = description;
            this.task = task;
            this.maxTimeout = maxTimeout;
            this.sleepAfterFail = sleepAfterFail;
            this.onFail = onFail;

            tryNumber = 0;
            tryStart = DateTime.UtcNow;
        }

        public void Run()
        {
            while (true)
            {
                // Throws TimeoutException once the total duration exceeds the budget.
                CheckMaximums();

                tryNumber++;
                tryStart = DateTime.UtcNow;
                try
                {
                    task();
                    return;
                }
                catch (Exception ex)
                {
                    var failure = CaptureFailure(ex);
                    onFail(failure);
                    Time.Sleep(sleepAfterFail);
                }
            }
        }

        private Failure CaptureFailure(Exception ex)
        {
            var f = new Failure(ex, DateTime.UtcNow - tryStart, tryNumber);
            failures.Add(f);
            return f;
        }

        private void CheckMaximums()
        {
            if (Duration() > maxTimeout) Fail();
        }

        private void Fail()
        {
            // GetFailureReport must be invoked: interpolating the bare method group
            // does not include the report in the message.
            throw new TimeoutException($"Retry '{description}' timed out after {tryNumber} tries over {Time.FormatDuration(Duration())}: {GetFailureReport()}",
                new AggregateException(failures.Select(f => f.Exception)));
        }

        private string GetFailureReport()
        {
            return Environment.NewLine + string.Join(Environment.NewLine, failures.Select(f => f.Describe()));
        }

        private TimeSpan Duration()
        {
            return DateTime.UtcNow - start;
        }
    }
}
/// <summary>
/// Immutable record of one failed attempt: the exception, how long the attempt took,
/// and the attempt's number.
/// </summary>
public class Failure
{
    public Failure(Exception exception, TimeSpan duration, int tryNumber)
    {
        TryNumber = tryNumber;
        Duration = duration;
        Exception = exception;
    }

    /// <summary>The exception that ended the attempt.</summary>
    public Exception Exception { get; }

    /// <summary>Duration of the attempt.</summary>
    public TimeSpan Duration { get; }

    /// <summary>Number of the attempt this failure belongs to.</summary>
    public int TryNumber { get; }

    /// <summary>Builds a one-entry report text for this failure.</summary>
    public string Describe()
    {
        var elapsed = Time.FormatDuration(Duration);
        return $"Try {TryNumber} failed after {elapsed} with exception '{Exception}'";
    }
}
}

View File

@ -1,6 +1,4 @@
using System.Diagnostics;
namespace Utils
namespace Utils
{
public static class Time
{
@ -111,78 +109,24 @@ namespace Utils
public static void Retry(Action action, TimeSpan maxTimeout, TimeSpan retryTime, string description)
{
var start = DateTime.UtcNow;
var tries = 1;
var tryInfo = new List<(Exception, TimeSpan)>();
while (true)
{
var duration = DateTime.UtcNow - start;
if (duration > maxTimeout)
{
var info = FormatTryInfos(tryInfo);
throw new TimeoutException($"Retry '{description}' timed out after {tries} tries over {FormatDuration(duration)}.{Environment.NewLine}{info}");
}
var sw = Stopwatch.StartNew();
try
{
action();
return;
}
catch (Exception ex)
{
tryInfo.Add((ex, sw.Elapsed));
tries++;
}
Sleep(retryTime);
}
}
private static string FormatTryInfos(List<(Exception, TimeSpan)> tryInfo)
{
return string.Join(Environment.NewLine, tryInfo.Select(FormatTryInfo).ToArray());
}
private static string FormatTryInfo((Exception, TimeSpan) info, int index)
{
return $"Attempt {index} took {FormatDuration(info.Item2)} and failed with exception {info.Item1}.";
}
private static Action<int> failedCallback = i => { };
public static void SetRetryFailedCallback(Action<int> onRetryFailed)
{
failedCallback = onRetryFailed;
Retry(action, maxTimeout, retryTime, description, f => { });
}
public static T Retry<T>(Func<T> action, TimeSpan maxTimeout, TimeSpan retryTime, string description)
{
var start = DateTime.UtcNow;
var tries = 1;
var exceptions = new List<Exception>();
return Retry(action, maxTimeout, retryTime, description, f => { });
}
while (true)
{
var duration = DateTime.UtcNow - start;
if (duration > maxTimeout)
{
throw new TimeoutException($"Retry '{description}' timed out after {tries} tries over {FormatDuration(duration)}.", new AggregateException(exceptions));
}
public static void Retry(Action action, TimeSpan maxTimeout, TimeSpan retryTime, string description, Action<Failure> onFail)
{
var r = new Retry(description, maxTimeout, retryTime, onFail);
r.Run(action);
}
try
{
return action();
}
catch (Exception ex)
{
exceptions.Add(ex);
failedCallback(tries);
tries++;
}
Sleep(retryTime);
}
public static T Retry<T>(Func<T> action, TimeSpan maxTimeout, TimeSpan retryTime, string description, Action<Failure> onFail)
{
var r = new Retry(description, maxTimeout, retryTime, onFail);
return r.Run(action);
}
}
}

View File

@ -9,7 +9,7 @@ namespace CodexPlugin
public class ApiChecker
{
// <INSERT-OPENAPI-YAML-HASH>
private const string OpenApiYamlHash = "0F-C8-02-1E-2C-2C-15-F6-91-6A-01-31-11-49-95-06-79-26-25-BF-27-3C-A8-2E-5F-7F-34-FD-C0-57-A0-9A";
private const string OpenApiYamlHash = "67-76-AB-FC-54-4F-EB-81-F5-E4-F8-27-DF-82-92-41-63-A5-EA-1B-17-14-0C-BE-20-9C-B3-DF-CE-E4-AA-38";
private const string OpenApiFilePath = "/codex/openapi.yaml";
private const string DisableEnvironmentVariable = "CODEXPLUGIN_DISABLE_APICHECK";

View File

@ -2,6 +2,7 @@
using Core;
using KubernetesWorkflow;
using KubernetesWorkflow.Types;
using Logging;
using Newtonsoft.Json;
using Utils;
@ -9,6 +10,7 @@ namespace CodexPlugin
{
public class CodexAccess : ILogHandler
{
private readonly ILog log;
private readonly IPluginTools tools;
private readonly Mapper mapper = new Mapper();
private bool hasContainerCrashed;
@ -16,6 +18,7 @@ namespace CodexPlugin
public CodexAccess(IPluginTools tools, RunningPod container, CrashWatcher crashWatcher)
{
this.tools = tools;
log = tools.GetLog();
Container = container;
CrashWatcher = crashWatcher;
hasContainerCrashed = false;
@ -34,20 +37,23 @@ namespace CodexPlugin
public DebugPeer GetDebugPeer(string peerId)
{
// Cannot use openAPI: debug/peer endpoint is not specified there.
var endpoint = GetEndpoint();
var str = endpoint.HttpGetString($"debug/peer/{peerId}");
if (str.ToLowerInvariant() == "unable to find peer!")
return CrashCheck(() =>
{
return new DebugPeer
{
IsPeerFound = false
};
}
var endpoint = GetEndpoint();
var str = endpoint.HttpGetString($"debug/peer/{peerId}");
var result = endpoint.Deserialize<DebugPeer>(str);
result.IsPeerFound = true;
return result;
if (str.ToLowerInvariant() == "unable to find peer!")
{
return new DebugPeer
{
IsPeerFound = false
};
}
var result = endpoint.Deserialize<DebugPeer>(str);
result.IsPeerFound = true;
return result;
});
}
public void ConnectToPeer(string peerId, string[] peerMultiAddresses)
@ -59,14 +65,19 @@ namespace CodexPlugin
});
}
public string UploadFile(FileStream fileStream)
public string UploadFile(FileStream fileStream, Action<Failure> onFailure)
{
return OnCodex(api => api.UploadAsync(fileStream));
return OnCodex(
api => api.UploadAsync(fileStream),
CreateRetryConfig(nameof(UploadFile), onFailure));
}
public Stream DownloadFile(string contentId)
public Stream DownloadFile(string contentId, Action<Failure> onFailure)
{
var fileResponse = OnCodex(api => api.DownloadNetworkAsync(contentId));
var fileResponse = OnCodex(
api => api.DownloadNetworkAsync(contentId),
CreateRetryConfig(nameof(DownloadFile), onFailure));
if (fileResponse.StatusCode != 200) throw new Exception("Download failed with StatusCode: " + fileResponse.StatusCode);
return fileResponse.Stream;
}
@ -89,15 +100,24 @@ namespace CodexPlugin
return OnCodex<string>(api => api.CreateStorageRequestAsync(request.ContentId.Id, body));
}
/// <summary>
/// Calls the node's <c>SpaceAsync</c> API and maps the response to a <c>CodexSpace</c>.
/// </summary>
public CodexSpace Space()
{
    return mapper.Map(OnCodex<Space>(api => api.SpaceAsync()));
}
public StoragePurchase GetPurchaseStatus(string purchaseId)
{
var endpoint = GetEndpoint();
return Time.Retry(() =>
return CrashCheck(() =>
{
var str = endpoint.HttpGetString($"storage/purchases/{purchaseId}");
if (string.IsNullOrEmpty(str)) throw new Exception("Empty response.");
return JsonConvert.DeserializeObject<StoragePurchase>(str)!;
}, nameof(GetPurchaseStatus));
var endpoint = GetEndpoint();
return Time.Retry(() =>
{
var str = endpoint.HttpGetString($"storage/purchases/{purchaseId}");
if (string.IsNullOrEmpty(str)) throw new Exception("Empty response.");
return JsonConvert.DeserializeObject<StoragePurchase>(str)!;
}, nameof(GetPurchaseStatus));
});
// TODO: current getpurchase api does not line up with its openapi spec.
// return mapper.Map(OnCodex(api => api.GetPurchaseAsync(purchaseId)));
@ -114,19 +134,67 @@ namespace CodexPlugin
return workflow.GetPodInfo(Container);
}
/// <summary>
/// Runs <c>df --sync</c> in the (single) container and logs its output, prefixed with <paramref name="msg"/>.
/// Best effort: any exception is logged and not rethrown.
/// </summary>
public void LogDiskSpace(string msg)
{
    try
    {
        var workflow = tools.CreateWorkflow();
        var container = Container.Containers.Single();
        var diskInfo = workflow.ExecuteCommand(container, "df", "--sync");
        Log($"{msg} - Disk info: {diskInfo}");
    }
    catch (Exception e)
    {
        Log("Failed to get disk info: " + e);
    }
}
/// <summary>
/// Removes the <c>repo</c> folder from the data directory of the first container by running <c>rm -Rfv</c> in it.
/// Best effort: any exception is logged and not rethrown.
/// </summary>
public void DeleteRepoFolder()
{
    try
    {
        var target = Container.Containers.First();
        // The data directory name carries the recipe number of the container.
        var repoPath = $"/codex/datadir{target.Recipe.Number}/repo";

        var workflow = tools.CreateWorkflow();
        workflow.ExecuteCommand(target, "rm", "-Rfv", repoPath);
        Log("Deleted repo folder.");
    }
    catch (Exception e)
    {
        Log("Unable to delete repo folder: " + e);
    }
}
private T OnCodex<T>(Func<CodexApi, Task<T>> action)
{
var address = GetAddress();
var result = tools.CreateHttp(CheckContainerCrashed)
.OnClient(client =>
{
var api = new CodexApi(client);
api.BaseUrl = $"{address.Host}:{address.Port}/api/codex/v1";
return Time.Wait(action(api));
});
var result = tools.CreateHttp(CheckContainerCrashed).OnClient(client => CallCodex(client, action));
return result;
}
/// <summary>
/// Runs <paramref name="action"/> against the Codex API using a fresh HTTP wrapper,
/// with attempts governed by the supplied <paramref name="retry"/>.
/// </summary>
private T OnCodex<T>(Func<CodexApi, Task<T>> action, Retry retry)
{
    var http = tools.CreateHttp(CheckContainerCrashed);
    return http.OnClient(client => CallCodex(client, action), retry);
}
/// <summary>
/// Creates a <c>CodexApi</c> on the given client, pointed at this container's API address,
/// and waits for <paramref name="action"/> to complete. A crash check runs afterwards in all cases.
/// </summary>
private T CallCodex<T>(HttpClient client, Func<CodexApi, Task<T>> action)
{
    var address = GetAddress();
    var api = new CodexApi(client)
    {
        BaseUrl = $"{address.Host}:{address.Port}/api/codex/v1"
    };

    return CrashCheck(() => Time.Wait(action(api)));
}
/// <summary>
/// Runs <paramref name="action"/> and, whether it returns or throws, asks the crash watcher
/// to check the container afterwards. The result of that check is not used here.
/// </summary>
private T CrashCheck<T>(Func<T> action)
{
    try
    {
        var outcome = action();
        return outcome;
    }
    finally
    {
        // Return value intentionally ignored; the call is made for its side effects.
        CrashWatcher.HasContainerCrashed();
    }
}
private IEndpoint GetEndpoint()
{
return tools
@ -136,20 +204,19 @@ namespace CodexPlugin
private Address GetAddress()
{
return Container.Containers.Single().GetAddress(tools.GetLog(), CodexContainerRecipe.ApiPortTag);
return Container.Containers.Single().GetAddress(log, CodexContainerRecipe.ApiPortTag);
}
private void CheckContainerCrashed(HttpClient client)
{
if (hasContainerCrashed) throw new Exception("Container has crashed.");
if (hasContainerCrashed) throw new Exception($"Container {GetName()} has crashed.");
}
public void Log(Stream crashLog)
void ILogHandler.Log(Stream crashLog)
{
var log = tools.GetLog();
var file = log.CreateSubfile();
log.Log($"Container {Container.Name} has crashed. Downloading crash log to '{file.FullFilename}'...");
file.Write($"Container Crash Log for {Container.Name}.");
Log($"Downloading log to '{file.FullFilename}'...");
file.Write($"Container log for {Container.Name}.");
using var reader = new StreamReader(crashLog);
var line = reader.ReadLine();
@ -159,8 +226,63 @@ namespace CodexPlugin
line = reader.ReadLine();
}
log.Log("Crash log successfully downloaded.");
Log("Container log successfully downloaded.");
hasContainerCrashed = true;
}
/// <summary>
/// Builds a <c>Retry</c> that uses the plugin's HTTP retry timeout and retry delay.
/// On every failed attempt the caller's <paramref name="onFailure"/> runs first,
/// followed by <c>Investigate</c>.
/// </summary>
private Retry CreateRetryConfig(string description, Action<Failure> onFailure)
{
    var times = tools.TimeSet;

    void HandleFailure(Failure failed)
    {
        onFailure(failed);
        Investigate(failed, times);
    }

    return new Retry(description, times.HttpRetryTimeout(), times.HttpCallRetryDelay(), HandleFailure);
}
/// <summary>
/// Examines a failed attempt and decides whether retrying may continue.
/// Logs the failure and the disk usage, then probes the node with <c>GetDebugInfo</c>.
/// The failure's exception is rethrown (via <c>Throw</c>) when the probe throws,
/// when the probe result has an empty <c>Spr</c>, or when the attempt failed in less time than
/// the HTTP call timeout. Otherwise the method returns normally.
/// </summary>
/// <param name="failure">The failed attempt to examine.</param>
/// <param name="timeSet">Source of the HTTP call timeout used for the duration check.</param>
private void Investigate(Failure failure, ITimeSet timeSet)
{
    Log($"Retry {failure.TryNumber} took {Time.FormatDuration(failure.Duration)} and failed with '{failure.Exception}'. " +
        $"(HTTP timeout = {Time.FormatDuration(timeSet.HttpCallTimeout())}) " +
        $"Checking if node responds to debug/info...");

    LogDiskSpace("After retry failure");

    // Only the probe sits inside the try-block. Rethrowing the failure from within it
    // would be caught below and misreported as an exception from the debug/info call.
    var nodeResponds = false;
    try
    {
        var debugInfo = GetDebugInfo();
        nodeResponds = !string.IsNullOrEmpty(debugInfo.Spr);
    }
    catch (Exception ex)
    {
        Log("Got exception from debug/info call: " + ex);
        Throw(failure);
    }

    if (!nodeResponds)
    {
        Log("Did not get valid debug/info response.");
        Throw(failure);
    }
    Log("Got valid response from debug/info.");

    if (failure.Duration < timeSet.HttpCallTimeout())
    {
        Log("Retry failed within HTTP timeout duration.");
        Throw(failure);
    }
}
/// <summary>
/// Rethrows the exception recorded in <paramref name="failure"/>.
/// </summary>
private void Throw(Failure failure)
{
    // ExceptionDispatchInfo keeps the stack trace of the original throw site;
    // a plain `throw failure.Exception;` would overwrite it with this location.
    System.Runtime.ExceptionServices.ExceptionDispatchInfo.Capture(failure.Exception).Throw();
}
/// <summary>
/// Writes <paramref name="msg"/> to the log, prefixed with this node's name.
/// </summary>
private void Log(string msg)
{
    var prefixed = $"{GetName()} {msg}";
    log.Log(prefixed);
}
}
}

View File

@ -7,7 +7,7 @@ namespace CodexPlugin
{
public class CodexContainerRecipe : ContainerRecipeFactory
{
private const string DefaultDockerImage = "codexstorage/nim-codex:sha-267266a-dist-tests";
private const string DefaultDockerImage = "codexstorage/nim-codex:sha-b89493e-dist-tests";
public const string ApiPortTag = "codex_api_port";
public const string ListenPortTag = "codex_listen_port";
@ -120,7 +120,7 @@ namespace CodexPlugin
}
}
if(!string.IsNullOrEmpty(config.NameOverride))
if (!string.IsNullOrEmpty(config.NameOverride))
{
AddEnvVar("CODEX_NODENAME", config.NameOverride);
}
@ -160,7 +160,7 @@ namespace CodexPlugin
private ByteSize GetVolumeCapacity(CodexStartupConfig config)
{
if (config.StorageQuota != null) return config.StorageQuota;
if (config.StorageQuota != null) return config.StorageQuota.Multiply(1.2);
// Default Codex quota: 8 GB, using +20% to be safe.
return 8.GB().Multiply(1.2);
}

View File

@ -15,8 +15,11 @@ namespace CodexPlugin
DebugInfo GetDebugInfo();
DebugPeer GetDebugPeer(string peerId);
ContentId UploadFile(TrackedFile file);
ContentId UploadFile(TrackedFile file, Action<Failure> onFailure);
TrackedFile? DownloadContent(ContentId contentId, string fileLabel = "");
TrackedFile? DownloadContent(ContentId contentId, Action<Failure> onFailure, string fileLabel = "");
LocalDatasetList LocalFiles();
CodexSpace Space();
void ConnectToPeer(ICodexNode node);
DebugInfoVersion Version { get; }
IMarketplaceAccess Marketplace { get; }
@ -24,6 +27,12 @@ namespace CodexPlugin
PodInfo GetPodInfo();
ITransferSpeeds TransferSpeeds { get; }
EthAccount EthAccount { get; }
/// <summary>
/// Warning! The node is not usable after this.
/// TODO: Replace with delete-blocks debug call once available in Codex.
/// </summary>
void DeleteRepoFolder();
void Stop(bool waitTillStopped);
}
@ -90,7 +99,7 @@ namespace CodexPlugin
{
var debugInfo = CodexAccess.GetDebugInfo();
var known = string.Join(",", debugInfo.Table.Nodes.Select(n => n.PeerId));
Log($"Got DebugInfo with id: '{debugInfo.Id}'. This node knows: {known}");
Log($"Got DebugInfo with id: {debugInfo.Id}. This node knows: [{known}]");
return debugInfo;
}
@ -101,13 +110,20 @@ namespace CodexPlugin
public ContentId UploadFile(TrackedFile file)
{
return UploadFile(file, DoNothing);
}
public ContentId UploadFile(TrackedFile file, Action<Failure> onFailure)
{
CodexAccess.LogDiskSpace("Before upload");
using var fileStream = File.OpenRead(file.Filename);
var logMessage = $"Uploading file {file.Describe()}...";
Log(logMessage);
var measurement = Stopwatch.Measure(tools.GetLog(), logMessage, () =>
{
return CodexAccess.UploadFile(fileStream);
return CodexAccess.UploadFile(fileStream, onFailure);
});
var response = measurement.Value;
@ -117,15 +133,22 @@ namespace CodexPlugin
if (response.StartsWith(UploadFailedMessage)) FrameworkAssert.Fail("Node failed to store block.");
Log($"Uploaded file. Received contentId: '{response}'.");
CodexAccess.LogDiskSpace("After upload");
return new ContentId(response);
}
public TrackedFile? DownloadContent(ContentId contentId, string fileLabel = "")
{
return DownloadContent(contentId, DoNothing, fileLabel);
}
public TrackedFile? DownloadContent(ContentId contentId, Action<Failure> onFailure, string fileLabel = "")
{
var logMessage = $"Downloading for contentId: '{contentId.Id}'...";
Log(logMessage);
var file = tools.GetFileManager().CreateEmptyFile(fileLabel);
var measurement = Stopwatch.Measure(tools.GetLog(), logMessage, () => DownloadToFile(contentId.Id, file));
var measurement = Stopwatch.Measure(tools.GetLog(), logMessage, () => DownloadToFile(contentId.Id, file, onFailure));
transferSpeeds.AddDownloadSample(file.GetFilesize(), measurement);
Log($"Downloaded file {file.Describe()} to '{file.Filename}'.");
return file;
@ -136,6 +159,11 @@ namespace CodexPlugin
return CodexAccess.LocalFiles();
}
public CodexSpace Space()
{
return CodexAccess.Space();
}
public void ConnectToPeer(ICodexNode node)
{
var peer = (CodexNode)node;
@ -152,15 +180,16 @@ namespace CodexPlugin
return CodexAccess.GetPodInfo();
}
public void DeleteRepoFolder()
{
CodexAccess.DeleteRepoFolder();
}
public void Stop(bool waitTillStopped)
{
Log("Stopping...");
CrashWatcher.Stop();
Group.Stop(this, waitTillStopped);
// if (Group.Count() > 1) throw new InvalidOperationException("Codex-nodes that are part of a group cannot be " +
// "individually shut down. Use 'BringOffline()' on the group object to stop the group. This method is only " +
// "available for codex-nodes in groups of 1.");
//
// Group.BringOffline(waitTillStopped);
}
public void EnsureOnlineGetVersionResponse()
@ -192,12 +221,14 @@ namespace CodexPlugin
.ToArray();
}
private void DownloadToFile(string contentId, TrackedFile file)
private void DownloadToFile(string contentId, TrackedFile file, Action<Failure> onFailure)
{
CodexAccess.LogDiskSpace("Before download");
using var fileStream = File.OpenWrite(file.Filename);
try
{
using var downloadStream = CodexAccess.DownloadFile(contentId);
using var downloadStream = CodexAccess.DownloadFile(contentId, onFailure);
downloadStream.CopyTo(fileStream);
}
catch
@ -205,6 +236,8 @@ namespace CodexPlugin
Log($"Failed to download file '{contentId}'.");
throw;
}
CodexAccess.LogDiskSpace("After download");
}
private void EnsureMarketplace()
@ -216,5 +249,9 @@ namespace CodexPlugin
{
tools.GetLog().Log($"{GetName()}: {msg}");
}
private void DoNothing(Failure failure)
{
}
}
}

View File

@ -33,7 +33,7 @@ namespace CodexPlugin
foreach (var rc in containers)
{
var podInfo = GetPodInfo(rc);
var podInfos = string.Join(", ", rc.Containers.Select(c => $"Container: '{c.Name}' runs at '{podInfo.K8SNodeName}'={podInfo.Ip}"));
var podInfos = string.Join(", ", rc.Containers.Select(c => $"Container: '{c.Name}' PodLabel: '{c.RunningPod.StartResult.Deployment.PodLabel}' runs at '{podInfo.K8SNodeName}'={podInfo.Ip}"));
Log($"Started {codexSetup.NumberOfNodes} nodes of image '{containers.First().Containers.First().Recipe.Image}'. ({podInfos})");
}
LogSeparator();

View File

@ -105,4 +105,18 @@ namespace CodexPlugin
return HashCode.Combine(Id);
}
}
/// <summary>
/// Storage usage figures of a Codex node: block count and quota maximum/used/reserved, in bytes.
/// </summary>
public class CodexSpace
{
    public long TotalBlocks { get; set; }
    public long QuotaMaxBytes { get; set; }
    public long QuotaUsedBytes { get; set; }
    public long QuotaReservedBytes { get; set; }

    /// <summary>Quota that is neither used nor reserved.</summary>
    public long FreeBytes
    {
        get { return QuotaMaxBytes - (QuotaUsedBytes + QuotaReservedBytes); }
    }

    /// <summary>Returns this object serialized as JSON.</summary>
    public override string ToString() => JsonConvert.SerializeObject(this);
}
}

View File

@ -1,4 +1,5 @@
using CodexContractsPlugin;
using CodexOpenApi;
using Newtonsoft.Json.Linq;
using System.Numerics;
using Utils;
@ -62,14 +63,47 @@ namespace CodexPlugin
};
}
public StoragePurchase Map(CodexOpenApi.Purchase purchase)
{
return new StoragePurchase
{
State = purchase.State,
Error = purchase.Error
};
}
// TODO: Fix openapi spec for this call.
//public StoragePurchase Map(CodexOpenApi.Purchase purchase)
//{
// return new StoragePurchase(Map(purchase.Request))
// {
// State = purchase.State,
// Error = purchase.Error
// };
//}
//public StorageRequest Map(CodexOpenApi.StorageRequest request)
//{
// return new StorageRequest(Map(request.Ask), Map(request.Content))
// {
// Id = request.Id,
// Client = request.Client,
// Expiry = TimeSpan.FromSeconds(Convert.ToInt64(request.Expiry)),
// Nonce = request.Nonce
// };
//}
//public StorageAsk Map(CodexOpenApi.StorageAsk ask)
//{
// return new StorageAsk
// {
// Duration = TimeSpan.FromSeconds(Convert.ToInt64(ask.Duration)),
// MaxSlotLoss = ask.MaxSlotLoss,
// ProofProbability = ask.ProofProbability,
// Reward = Convert.ToDecimal(ask.Reward).TstWei(),
// Slots = ask.Slots,
// SlotSize = new ByteSize(Convert.ToInt64(ask.SlotSize))
// };
//}
//public StorageContent Map(CodexOpenApi.Content content)
//{
// return new StorageContent
// {
// Cid = content.Cid
// };
//}
public StorageAvailability Map(CodexOpenApi.SalesAvailabilityREAD read)
{
@ -84,6 +118,17 @@ namespace CodexPlugin
};
}
/// <summary>
/// Copies the quota and block figures of an OpenAPI <c>Space</c> response into a <c>CodexSpace</c>.
/// </summary>
public CodexSpace Map(Space space)
{
    var mapped = new CodexSpace();
    mapped.QuotaMaxBytes = space.QuotaMaxBytes;
    mapped.QuotaReservedBytes = space.QuotaReservedBytes;
    mapped.QuotaUsedBytes = space.QuotaUsedBytes;
    mapped.TotalBlocks = space.TotalBlocks;
    return mapped;
}
private DebugInfoVersion MapDebugInfoVersion(JObject obj)
{
return new DebugInfoVersion
@ -98,7 +143,7 @@ namespace CodexPlugin
return new DebugInfoTable
{
LocalNode = MapDebugInfoTableNode(obj.GetValue("localNode")),
Nodes = new DebugInfoTableNode[0]
Nodes = MapDebugInfoTableNodeArray(obj.GetValue("nodes") as JArray)
};
}
@ -117,6 +162,16 @@ namespace CodexPlugin
};
}
/// <summary>
/// Maps every entry of <paramref name="nodes"/> with <c>MapDebugInfoTableNode</c>.
/// A null or empty array yields an empty result.
/// </summary>
private DebugInfoTableNode[] MapDebugInfoTableNodeArray(JArray? nodes)
{
    if (nodes is { Count: > 0 })
    {
        return nodes.Select(MapDebugInfoTableNode).ToArray();
    }

    return new DebugInfoTableNode[0];
}
private Manifest MapManifest(CodexOpenApi.ManifestItem manifest)
{
return new Manifest

View File

@ -1,5 +1,7 @@
using CodexContractsPlugin;
using CodexOpenApi;
using Logging;
using System.Data;
using Utils;
namespace CodexPlugin
@ -37,6 +39,34 @@ namespace CodexPlugin
{
public string State { get; set; } = string.Empty;
public string Error { get; set; } = string.Empty;
public StorageRequest Request { get; set; } = null!;
}
/// <summary>
/// Data holder describing a storage request: its identifiers, the ask
/// (terms) and the content it refers to. All members are plain settable properties.
/// </summary>
public class StorageRequest
{
/// <summary>Identifier of the request. Defaults to an empty string.</summary>
public string Id { get; set; } = string.Empty;
/// <summary>Client associated with the request (presumably the requester's id/address; TODO confirm). Defaults to an empty string.</summary>
public string Client { get; set; } = string.Empty;
/// <summary>Terms of the request. Initialized to null (null-forgiving), so it must be assigned before use.</summary>
public StorageAsk Ask { get; set; } = null!;
/// <summary>Content the request refers to. Initialized to null (null-forgiving), so it must be assigned before use.</summary>
public StorageContent Content { get; set; } = null!;
/// <summary>Expiry of the request, kept as text (presumably a numeric value encoded as a string; TODO confirm unit). Defaults to an empty string.</summary>
public string Expiry { get; set; } = string.Empty;
/// <summary>Nonce of the request, kept as text. Defaults to an empty string.</summary>
public string Nonce { get; set; } = string.Empty;
}
/// <summary>
/// Data holder for the terms of a storage request. The two counts are integers;
/// all other values are kept as text and default to an empty string
/// (presumably numeric values encoded as strings; TODO confirm units).
/// </summary>
public class StorageAsk
{
/// <summary>Number of slots.</summary>
public int Slots { get; set; }
/// <summary>Size of a slot, as text.</summary>
public string SlotSize { get; set; } = string.Empty;
/// <summary>Duration, as text.</summary>
public string Duration { get; set; } = string.Empty;
/// <summary>Proof probability, as text.</summary>
public string ProofProbability { get; set; } = string.Empty;
/// <summary>Reward, as text.</summary>
public string Reward { get; set; } = string.Empty;
/// <summary>Maximum slot loss.</summary>
public int MaxSlotLoss { get; set; }
}
/// <summary>
/// Data holder for the content of a storage request. Only the CID is exposed;
/// the erasure and PoR parameter properties below are commented out.
/// </summary>
public class StorageContent
{
/// <summary>CID of the content, as text. Defaults to an empty string.</summary>
public string Cid { get; set; } = string.Empty;
//public ErasureParameters Erasure { get; set; }
//public PoRParameters Por { get; set; }
}
public class StorageAvailability

View File

@ -7,6 +7,8 @@ namespace CodexPlugin
public interface IStoragePurchaseContract
{
string PurchaseId { get; }
StoragePurchaseRequest Purchase { get; }
ContentId ContentId { get; }
void WaitForStorageContractSubmitted();
void WaitForStorageContractStarted();
void WaitForStorageContractFinished();
@ -28,10 +30,13 @@ namespace CodexPlugin
this.codexAccess = codexAccess;
PurchaseId = purchaseId;
Purchase = purchase;
ContentId = new ContentId(codexAccess.GetPurchaseStatus(purchaseId).Request.Content.Cid);
}
public string PurchaseId { get; }
public StoragePurchaseRequest Purchase { get; }
public ContentId ContentId { get; }
public TimeSpan? PendingToSubmitted => contractSubmittedUtc - contractPendingUtc;
public TimeSpan? SubmittedToStarted => contractStartedUtc - contractSubmittedUtc;

View File

@ -289,6 +289,7 @@ components:
description: "Root hash of the content"
originalBytes:
type: integer
format: int64
description: "Length of original content in bytes"
blockSize:
type: integer
@ -303,14 +304,18 @@ components:
totalBlocks:
description: "Number of blocks stored by the node"
type: integer
format: int64
quotaMaxBytes:
type: integer
format: int64
description: "Maximum storage space used by the node"
quotaUsedBytes:
type: integer
format: int64
description: "Amount of storage space currently in use"
quotaReservedBytes:
type: integer
format: int64
description: "Amount of storage space reserved"
servers:

View File

@ -1,4 +1,5 @@
using Core;
using IdentityModel;
using KubernetesWorkflow.Types;
using Logging;
using System.Globalization;
@ -177,6 +178,41 @@ namespace MetricsPlugin
{
return "[" + string.Join(',', Sets.Select(s => s.ToString())) + "]";
}
/// <summary>
/// Renders all metric sets as comma-separated text.
/// The first row is a header: the literal "Metrics" followed by every distinct
/// timestamp found in any set, newest first, written as epoch time.
/// Each following row holds one set: its name, then its value for each header
/// timestamp, or a single space when the set has no value at that timestamp.
/// </summary>
/// <returns>The rows joined with <see cref="Environment.NewLine"/>.</returns>
/// <exception cref="InvalidOperationException">
/// Thrown by <c>SingleOrDefault</c> when a set holds more than one value for the same timestamp.
/// </exception>
public string AsCsv()
{
    var allTimestamps = Sets.SelectMany(s => s.Values.Select(v => v.Timestamp)).Distinct().OrderDescending().ToArray();

    var lines = new List<string>();
    MakeLine(lines, e =>
    {
        e.Add("Metrics");
        // Invariant culture keeps the output machine-readable regardless of the host locale.
        foreach (var ts in allTimestamps) e.Add(ts.ToEpochTime().ToString(CultureInfo.InvariantCulture));
    });

    foreach (var set in Sets)
    {
        MakeLine(lines, e =>
        {
            e.Add(set.Name);
            foreach (var ts in allTimestamps)
            {
                var value = set.Values.SingleOrDefault(v => v.Timestamp == ts);
                if (value == null)
                {
                    // Placeholder cell keeps the columns aligned with the header row.
                    e.Add(" ");
                }
                else
                {
                    // A culture with ',' as decimal separator would otherwise split one value over two CSV cells.
                    e.Add(value.Value.ToString(CultureInfo.InvariantCulture));
                }
            }
        });
    }

    return string.Join(Environment.NewLine, lines);
}
/// <summary>
/// Builds one comma-separated row and appends it to <paramref name="lines"/>.
/// </summary>
/// <param name="lines">Collection that receives the finished row.</param>
/// <param name="values">Callback that fills a fresh list with the row's cells.</param>
private void MakeLine(List<string> lines, Action<List<string>> values)
{
    var cells = new List<string>();
    values.Invoke(cells);

    var row = string.Join(",", cells);
    lines.Add(row);
}
}
public class MetricsSet

View File

@ -148,7 +148,7 @@ namespace ContinuousTests
log.Log($"Clearing namespace '{test.CustomK8sNamespace}'...");
var entryPoint = entryPointFactory.CreateEntryPoint(config.KubeConfigFile, config.DataPath, test.CustomK8sNamespace, log);
entryPoint.Tools.CreateWorkflow().DeleteNamespacesStartingWith(test.CustomK8sNamespace);
entryPoint.Tools.CreateWorkflow().DeleteNamespacesStartingWith(test.CustomK8sNamespace, wait: true);
}
private void PerformCleanup(ILog log)
@ -157,7 +157,7 @@ namespace ContinuousTests
log.Log("Cleaning up test namespace...");
var entryPoint = entryPointFactory.CreateEntryPoint(config.KubeConfigFile, config.DataPath, config.CodexDeployment.Metadata.KubeNamespace, log);
entryPoint.Decommission(deleteKubernetesResources: true, deleteTrackedFiles: true);
entryPoint.Decommission(deleteKubernetesResources: true, deleteTrackedFiles: true, waitTillDone: true);
log.Log("Cleanup finished.");
}
}

View File

@ -64,7 +64,7 @@ namespace ContinuousTests
}
finally
{
entryPoint.Tools.CreateWorkflow().DeleteNamespace();
entryPoint.Tools.CreateWorkflow().DeleteNamespace(wait: false);
}
}

View File

@ -54,7 +54,8 @@ namespace ContinuousTests
entryPoint.Decommission(
deleteKubernetesResources: false, // This would delete the continuous test net.
deleteTrackedFiles: true
deleteTrackedFiles: true,
waitTillDone: false
);
runFinishedHandle.Set();
}

View File

@ -50,8 +50,6 @@ namespace CodexLongTests.BasicTests
var node = StartCodex(s => s.WithStorageQuota((size + 10).MB()));
Time.SetRetryFailedCallback(i => OnFailed(i, node));
var uploadStart = DateTime.UtcNow;
var cid = node.UploadFile(expectedFile);
var downloadStart = DateTime.UtcNow;
@ -62,17 +60,6 @@ namespace CodexLongTests.BasicTests
AssertTimeConstraint(uploadStart, downloadStart, downloadFinished, size);
}
/// <summary>
/// Retry-failure callback: on every tenth try, starting from try 10,
/// logs a message and downloads the log of <paramref name="node"/>.
/// </summary>
/// <param name="tries">Number of tries so far.</param>
/// <param name="node">Node whose log is downloaded.</param>
private void OnFailed(int tries, ICodexNode node)
{
    // The lower bound keeps try 0 (and negative values) from matching the modulo test.
    var isLogDownloadTry = tries >= 5 && tries % 10 == 0;
    if (!isLogDownloadTry) return;

    Log($"After try {tries}, downloading node log.");
    Ci.DownloadLog(node);
}
private void AssertTimeConstraint(DateTime uploadStart, DateTime downloadStart, DateTime downloadFinished, long size)
{
float sizeInMB = size;

View File

@ -15,9 +15,9 @@ namespace CodexLongTests.DownloadConnectivityTests
[Values(10, 15, 20)] int numberOfNodes,
[Values(10, 100)] int sizeMBs)
{
for (var i = 0; i < numberOfNodes; i++) StartCodex();
var nodes = StartCodex(numberOfNodes);
CreatePeerDownloadTestHelpers().AssertFullDownloadInterconnectivity(GetAllOnlineCodexNodes(), sizeMBs.MB());
CreatePeerDownloadTestHelpers().AssertFullDownloadInterconnectivity(nodes, sizeMBs.MB());
}
}
}

View File

@ -17,12 +17,13 @@ public class ScalabilityTests : CodexDistTest
[Combinatorial]
[UseLongTimeouts]
[DontDownloadLogs]
[WaitForCleanup]
public void ShouldMaintainFileInNetwork(
[Values(10, 40, 80, 100)] int numberOfNodes,
[Values(100, 1000, 5000, 10000)] int fileSizeInMb
[Values(4, 5, 6)] int numberOfNodes, // TODO: include 10, 40, 80 and 100, not 5
[Values(4000, 5000, 6000, 7000, 8000, 9000, 10000)] int fileSizeInMb
)
{
var logLevel = CodexLogLevel.Info;
var logLevel = CodexLogLevel.Trace;
var bootstrap = StartCodex(s => s.WithLogLevel(logLevel));
var nodes = StartCodex(numberOfNodes - 1, s => s
@ -35,17 +36,27 @@ public class ScalabilityTests : CodexDistTest
var downloader = nodes.PickOneRandom();
var testFile = GenerateTestFile(fileSizeInMb.MB());
var contentId = uploader.UploadFile(testFile);
var downloadedFile = downloader.DownloadContent(contentId);
LogNodeStatus(uploader);
var contentId = uploader.UploadFile(testFile, f => LogNodeStatus(uploader));
LogNodeStatus(uploader);
LogNodeStatus(downloader);
var downloadedFile = downloader.DownloadContent(contentId, f => LogNodeStatus(downloader));
LogNodeStatus(downloader);
downloadedFile!.AssertIsEqual(testFile);
uploader.DeleteRepoFolder();
uploader.Stop(true);
var otherDownloader = nodes.PickOneRandom();
downloadedFile = otherDownloader.DownloadContent(contentId);
downloadedFile!.AssertIsEqual(testFile);
downloader.DeleteRepoFolder();
otherDownloader.DeleteRepoFolder();
}
/// <summary>
@ -57,6 +68,7 @@ public class ScalabilityTests : CodexDistTest
[Combinatorial]
[UseLongTimeouts]
[DontDownloadLogs]
[WaitForCleanup]
public void EveryoneGetsAFile(
[Values(10, 40, 80, 100)] int numberOfNodes,
[Values(100, 1000, 5000, 10000)] int fileSizeInMb

View File

@ -45,6 +45,9 @@ namespace CodexTests.BasicTests
metrics[0].AssertThat("libp2p_peers", Is.EqualTo(1));
metrics[1].AssertThat("libp2p_peers", Is.EqualTo(1));
LogNodeStatus(primary, metrics[0]);
LogNodeStatus(primary2, metrics[1]);
}
[Test]

View File

@ -85,6 +85,46 @@ namespace CodexTests.BasicTests
Assert.That(contracts.GetRequestState(request), Is.EqualTo(RequestState.Finished));
}
/// <summary>
/// Uploads a file, requests storage for it, and checks that the content id
/// reported by the purchase contract differs from the upload id while both
/// ids download to the original file, from the client and from a second node.
/// </summary>
[Test]
public void CanDownloadContentFromContractCid()
{
    var size = 10.MB();
    var gethNode = Ci.StartGethNode(s => s.IsMiner().WithName("disttest-geth"));
    var codexContracts = Ci.StartCodexContracts(gethNode);
    var original = GenerateTestFile(size);

    var clientNode = StartCodex(setup => setup
        .WithName("Client")
        .EnableMarketplace(gethNode, codexContracts, market => market
            .WithInitial(10.Eth(), 10.Tst())));

    var uploadCid = clientNode.UploadFile(original);

    var request = new StoragePurchaseRequest(uploadCid)
    {
        PricePerSlotPerSecond = 2.TstWei(),
        RequiredCollateral = 10.TstWei(),
        MinRequiredNumberOfNodes = 5,
        NodeFailureTolerance = 2,
        ProofProbability = 5,
        Duration = TimeSpan.FromMinutes(5),
        Expiry = TimeSpan.FromMinutes(4)
    };

    var contract = clientNode.Marketplace.RequestStorage(request);
    var contractCid = contract.ContentId;

    // The contract must report its own content id, not echo the upload id.
    Assert.That(uploadCid.Id, Is.Not.EqualTo(contractCid.Id));

    var bothCids = new[] { uploadCid, contractCid };

    // Download both from client.
    foreach (var cid in bothCids)
    {
        original.AssertIsEqual(clientNode.DownloadContent(cid));
    }

    // Download both from another node.
    var downloaderNode = StartCodex(setup => setup.WithName("Downloader"));
    foreach (var cid in bothCids)
    {
        original.AssertIsEqual(downloaderNode.DownloadContent(cid));
    }
}
private void WaitForAllSlotFilledEvents(ICodexContracts contracts, StoragePurchaseRequest purchase, IGethNode geth)
{
Time.Retry(() =>
@ -92,9 +132,9 @@ namespace CodexTests.BasicTests
var events = contracts.GetEvents(GetTestRunTimeRange());
var slotFilledEvents = events.GetSlotFilledEvents();
Debug($"SlotFilledEvents: {slotFilledEvents.Length} - NumSlots: {purchase.MinRequiredNumberOfNodes}");
if (slotFilledEvents.Length != purchase.MinRequiredNumberOfNodes) throw new Exception();
var msg = $"SlotFilledEvents: {slotFilledEvents.Length} - NumSlots: {purchase.MinRequiredNumberOfNodes}";
Debug(msg);
if (slotFilledEvents.Length != purchase.MinRequiredNumberOfNodes) throw new Exception(msg);
}, purchase.Expiry + TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(5), "Checking SlotFilled events");
}

View File

@ -13,6 +13,8 @@ namespace CodexTests.BasicTests
var primary = StartCodex();
PerformOneClientTest(primary);
LogNodeStatus(primary);
}
[Test]

View File

@ -6,14 +6,14 @@ using Core;
using DistTestCore;
using DistTestCore.Helpers;
using DistTestCore.Logs;
using MetricsPlugin;
using Newtonsoft.Json;
using NUnit.Framework.Constraints;
namespace CodexTests
{
public class CodexDistTest : DistTest
{
private readonly Dictionary<TestLifecycle, List<ICodexNode>> onlineCodexNodes = new Dictionary<TestLifecycle, List<ICodexNode>>();
public CodexDistTest()
{
ProjectPlugin.Load<CodexPlugin.CodexPlugin>();
@ -29,16 +29,6 @@ namespace CodexTests
localBuilder.Build();
}
/// <summary>
/// Registers an empty node list for <paramref name="lifecycle"/> in the online-node map.
/// </summary>
protected override void LifecycleStart(TestLifecycle lifecycle)
    => onlineCodexNodes.Add(lifecycle, new List<ICodexNode>());
/// <summary>
/// Drops the node list kept for <paramref name="lifecycle"/> from the online-node map.
/// </summary>
protected override void LifecycleStop(TestLifecycle lifecycle)
    => onlineCodexNodes.Remove(lifecycle);
public ICodexNode StartCodex()
{
return StartCodex(s => { });
@ -61,7 +51,7 @@ namespace CodexTests
setup(s);
OnCodexSetup(s);
});
onlineCodexNodes[Get()].AddRange(group);
return group;
}
@ -75,11 +65,6 @@ namespace CodexTests
return new PeerDownloadTestHelpers(GetTestLog(), Get().GetFileManager());
}
/// <summary>
/// Returns the nodes recorded for the lifecycle returned by <c>Get()</c>.
/// </summary>
public IEnumerable<ICodexNode> GetAllOnlineCodexNodes()
    => onlineCodexNodes[Get()];
public void AssertBalance(ICodexContracts contracts, ICodexNode codexNode, Constraint constraint, string msg = "")
{
AssertHelpers.RetryAssert(constraint, () => contracts.GetTestTokenBalance(codexNode), nameof(AssertBalance) + msg);
@ -99,17 +84,29 @@ namespace CodexTests
log.AssertLogDoesNotContain("ERR ");
}
/// <summary>
/// Writes a status entry for <paramref name="node"/> to the test log:
/// a header line with the node's name followed by its basic status text.
/// </summary>
/// <param name="node">Node whose status is logged.</param>
/// <param name="metrics">Accepted for callers that have metrics access; not read by this method.</param>
public void LogNodeStatus(ICodexNode node, IMetricsAccess? metrics = null)
{
    var header = "Status for " + node.GetName();
    var status = GetBasicNodeStatus(node);

    Log(header + Environment.NewLine + status);
}
/// <summary>
/// Builds the basic status text for <paramref name="node"/>: its debug info as
/// indented JSON, then the text form of its space report, each followed by a line break.
/// </summary>
private string GetBasicNodeStatus(ICodexNode node)
{
    var debugInfoJson = JsonConvert.SerializeObject(node.GetDebugInfo(), Formatting.Indented);
    var spaceText = node.Space().ToString();

    return debugInfoJson + Environment.NewLine + spaceText + Environment.NewLine;
}
// Disabled for now: Makes huge log files!
//private string GetNodeMetrics(IMetricsAccess? metrics)
//{
// if (metrics == null) return "No metrics enabled";
// var m = metrics.GetAllMetrics();
// if (m == null) return "No metrics received";
// return m.AsCsv();
//}
protected virtual void OnCodexSetup(ICodexSetup setup)
{
}
/// <summary>
/// Adds the average upload and download speed of the lifecycle's nodes to
/// <paramref name="data"/> under the keys "avgupload" and "avgdownload".
/// An average that comes back null is left out.
/// </summary>
/// <param name="lifecycle">Lifecycle whose recorded nodes are inspected.</param>
/// <param name="data">Status-log dictionary that receives the entries.</param>
protected override void CollectStatusLogData(TestLifecycle lifecycle, Dictionary<string, string> data)
{
    var lifecycleNodes = onlineCodexNodes[lifecycle];

    // Both averages are computed before anything is written to the dictionary.
    var avgUpload = lifecycleNodes.Select(n => n.TransferSpeeds.GetUploadSpeed()).ToList()!.OptionalAverage();
    var avgDownload = lifecycleNodes.Select(n => n.TransferSpeeds.GetDownloadSpeed()).ToList()!.OptionalAverage();

    if (avgUpload != null)
    {
        data.Add("avgupload", avgUpload.ToString());
    }

    if (avgDownload != null)
    {
        data.Add("avgdownload", avgDownload.ToString());
    }
}
}
}

View File

@ -1,4 +1,5 @@
using CodexContractsPlugin;
using CodexPlugin;
using GethPlugin;
using NUnit.Framework;
using Utils;
@ -11,9 +12,9 @@ namespace CodexTests.DownloadConnectivityTests
[Test]
public void MetricsDoesNotInterfereWithPeerDownload()
{
StartCodex(2, s => s.EnableMetrics());
var nodes = StartCodex(2, s => s.EnableMetrics());
AssertAllNodesConnected();
AssertAllNodesConnected(nodes);
}
[Test]
@ -21,10 +22,10 @@ namespace CodexTests.DownloadConnectivityTests
{
var geth = Ci.StartGethNode(s => s.IsMiner());
var contracts = Ci.StartCodexContracts(geth);
StartCodex(2, s => s.EnableMarketplace(geth, contracts, m => m
var nodes = StartCodex(2, s => s.EnableMarketplace(geth, contracts, m => m
.WithInitial(10.Eth(), 1000.TstWei())));
AssertAllNodesConnected();
AssertAllNodesConnected(nodes);
}
[Test]
@ -33,14 +34,14 @@ namespace CodexTests.DownloadConnectivityTests
[Values(2, 5)] int numberOfNodes,
[Values(1, 10)] int sizeMBs)
{
StartCodex(numberOfNodes);
var nodes = StartCodex(numberOfNodes);
AssertAllNodesConnected(sizeMBs);
AssertAllNodesConnected(nodes, sizeMBs);
}
private void AssertAllNodesConnected(int sizeMBs = 10)
private void AssertAllNodesConnected(IEnumerable<ICodexNode> nodes, int sizeMBs = 10)
{
CreatePeerDownloadTestHelpers().AssertFullDownloadInterconnectivity(GetAllOnlineCodexNodes(), sizeMBs.MB());
CreatePeerDownloadTestHelpers().AssertFullDownloadInterconnectivity(nodes, sizeMBs.MB());
}
}
}

View File

@ -1,4 +1,5 @@
using NUnit.Framework;
using CodexPlugin;
using NUnit.Framework;
namespace CodexTests.PeerDiscoveryTests
{
@ -13,7 +14,7 @@ namespace CodexTests.PeerDiscoveryTests
var l1Node = StartCodex(s => s.WithBootstrapNode(root));
var l2Target = StartCodex(s => s.WithBootstrapNode(l1Node));
AssertAllNodesConnected();
AssertAllNodesConnected(root, l1Source, l1Node, l2Target);
}
[Test]
@ -25,7 +26,7 @@ namespace CodexTests.PeerDiscoveryTests
var l2Node = StartCodex(s => s.WithBootstrapNode(l1Node));
var l3Target = StartCodex(s => s.WithBootstrapNode(l2Node));
AssertAllNodesConnected();
AssertAllNodesConnected(root, l1Source, l1Node, l2Node, l3Target);
}
[TestCase(3)]
@ -33,18 +34,22 @@ namespace CodexTests.PeerDiscoveryTests
[TestCase(10)]
public void NodeChainTest(int chainLength)
{
var nodes = new List<ICodexNode>();
var node = StartCodex();
nodes.Add(node);
for (var i = 1; i < chainLength; i++)
{
node = StartCodex(s => s.WithBootstrapNode(node));
nodes.Add(node);
}
AssertAllNodesConnected();
AssertAllNodesConnected(nodes.ToArray());
}
private void AssertAllNodesConnected()
private void AssertAllNodesConnected(params ICodexNode[] nodes)
{
CreatePeerConnectionTestHelpers().AssertFullyConnected(GetAllOnlineCodexNodes());
CreatePeerConnectionTestHelpers().AssertFullyConnected(nodes);
}
}
}

View File

@ -21,9 +21,9 @@ namespace CodexTests.PeerDiscoveryTests
[Test]
public void MetricsDoesNotInterfereWithPeerDiscovery()
{
StartCodex(2, s => s.EnableMetrics());
var nodes = StartCodex(2, s => s.EnableMetrics());
AssertAllNodesConnected();
AssertAllNodesConnected(nodes);
}
[Test]
@ -31,10 +31,10 @@ namespace CodexTests.PeerDiscoveryTests
{
var geth = Ci.StartGethNode(s => s.IsMiner());
var contracts = Ci.StartCodexContracts(geth);
StartCodex(2, s => s.EnableMarketplace(geth, contracts, m => m
var nodes = StartCodex(2, s => s.EnableMarketplace(geth, contracts, m => m
.WithInitial(10.Eth(), 1000.TstWei())));
AssertAllNodesConnected();
AssertAllNodesConnected(nodes);
}
[TestCase(2)]
@ -42,16 +42,15 @@ namespace CodexTests.PeerDiscoveryTests
[TestCase(10)]
public void VariableNodes(int number)
{
StartCodex(number);
var nodes = StartCodex(number);
AssertAllNodesConnected();
AssertAllNodesConnected(nodes);
}
private void AssertAllNodesConnected()
private void AssertAllNodesConnected(IEnumerable<ICodexNode> nodes)
{
var allNodes = GetAllOnlineCodexNodes();
CreatePeerConnectionTestHelpers().AssertFullyConnected(allNodes);
CheckRoutingTable(allNodes);
CreatePeerConnectionTestHelpers().AssertFullyConnected(nodes);
CheckRoutingTable(nodes);
}
private void CheckRoutingTable(IEnumerable<ICodexNode> allNodes)

View File

@ -52,7 +52,7 @@ namespace DistTestCore
{
Stopwatch.Measure(fixtureLog, "Global setup", () =>
{
globalEntryPoint.Tools.CreateWorkflow().DeleteNamespacesStartingWith(TestNamespacePrefix);
globalEntryPoint.Tools.CreateWorkflow().DeleteNamespacesStartingWith(TestNamespacePrefix, wait: true);
});
}
catch (Exception ex)
@ -72,7 +72,8 @@ namespace DistTestCore
globalEntryPoint.Decommission(
// There shouldn't be any of either, but clean everything up regardless.
deleteKubernetesResources: true,
deleteTrackedFiles: true
deleteTrackedFiles: true,
waitTillDone: true
);
}
@ -185,7 +186,13 @@ namespace DistTestCore
lock (lifecycleLock)
{
var testNamespace = TestNamespacePrefix + Guid.NewGuid().ToString();
var lifecycle = new TestLifecycle(fixtureLog.CreateTestLog(), configuration, GetTimeSet(), testNamespace, deployId);
var lifecycle = new TestLifecycle(
fixtureLog.CreateTestLog(),
configuration,
GetTimeSet(),
testNamespace,
deployId,
ShouldWaitForCleanup());
lifecycles.Add(testName, lifecycle);
LifecycleStart(lifecycle);
}
@ -208,7 +215,7 @@ namespace DistTestCore
IncludeLogsOnTestFailure(lifecycle);
LifecycleStop(lifecycle);
lifecycle.DeleteAllResources();
lifecycle = null!;
lifecycles.Remove(GetCurrentTestName());
});
}
@ -235,6 +242,11 @@ namespace DistTestCore
return new DefaultTimeSet();
}
/// <summary>
/// Tells whether the current test method is marked with <see cref="WaitForCleanupAttribute"/>,
/// as reported by <c>CurrentTestMethodHasAttribute</c>.
/// </summary>
private bool ShouldWaitForCleanup()
    => CurrentTestMethodHasAttribute<WaitForCleanupAttribute>();
private bool ShouldUseLongTimeouts()
{
return CurrentTestMethodHasAttribute<UseLongTimeoutsAttribute>();

View File

@ -16,7 +16,7 @@ namespace DistTestCore
private readonly List<RunningPod> runningContainers = new();
private readonly string deployId;
public TestLifecycle(TestLog log, Configuration configuration, ITimeSet timeSet, string testNamespace, string deployId)
public TestLifecycle(TestLog log, Configuration configuration, ITimeSet timeSet, string testNamespace, string deployId, bool waitForCleanup)
{
Log = log;
Configuration = configuration;
@ -27,7 +27,7 @@ namespace DistTestCore
metadata = entryPoint.GetPluginMetadata();
CoreInterface = entryPoint.CreateInterface();
this.deployId = deployId;
WaitForCleanup = waitForCleanup;
log.WriteLogTag();
}
@ -35,13 +35,15 @@ namespace DistTestCore
public TestLog Log { get; }
public Configuration Configuration { get; }
public ITimeSet TimeSet { get; }
public bool WaitForCleanup { get; }
public CoreInterface CoreInterface { get; }
public void DeleteAllResources()
{
entryPoint.Decommission(
deleteKubernetesResources: true,
deleteTrackedFiles: true
deleteTrackedFiles: true,
waitTillDone: WaitForCleanup
);
}

View File

@ -0,0 +1,15 @@
using NUnit.Framework;
namespace DistTestCore
{
/// <summary>
/// Marks a test method whose resources must be fully destroyed before the next test starts.
/// By default, the test system does not wait until all resources are destroyed before
/// starting the next test. This saves a lot of time, but it is not always what you want.
/// Add this attribute to a test method when you need to be sure its resources are gone
/// before the next test begins.
/// </summary>
[AttributeUsage(AttributeTargets.Method, AllowMultiple = false)]
public class WaitForCleanupAttribute : PropertyAttribute
{
}
}