diff --git a/Framework/Core/EntryPoint.cs b/Framework/Core/EntryPoint.cs index 7977eb3..0db40f3 100644 --- a/Framework/Core/EntryPoint.cs +++ b/Framework/Core/EntryPoint.cs @@ -38,10 +38,14 @@ namespace Core return new CoreInterface(this); } - public void Decommission(bool deleteKubernetesResources, bool deleteTrackedFiles) + /// + /// Deletes kubernetes and tracked file resources. + /// when `waitTillDone` is true, this function will block until resources are deleted. + /// + public void Decommission(bool deleteKubernetesResources, bool deleteTrackedFiles, bool waitTillDone) { - manager.DecommissionPlugins(deleteKubernetesResources, deleteTrackedFiles); - Tools.Decommission(deleteKubernetesResources, deleteTrackedFiles); + manager.DecommissionPlugins(deleteKubernetesResources, deleteTrackedFiles, waitTillDone); + Tools.Decommission(deleteKubernetesResources, deleteTrackedFiles, waitTillDone); } internal T GetPlugin() where T : IProjectPlugin diff --git a/Framework/Core/Http.cs b/Framework/Core/Http.cs index 49b3b09..44cb3c6 100644 --- a/Framework/Core/Http.cs +++ b/Framework/Core/Http.cs @@ -7,6 +7,7 @@ namespace Core { T OnClient(Func action); T OnClient(Func action, string description); + T OnClient(Func action, Retry retry); IEndpoint CreateEndpoint(Address address, string baseUrl, string? logAlias = null); } @@ -35,13 +36,19 @@ namespace Core } public T OnClient(Func action, string description) + { + var retry = new Retry(description, timeSet.HttpRetryTimeout(), timeSet.HttpCallRetryDelay(), f => { }); + return OnClient(action, retry); + } + + public T OnClient(Func action, Retry retry) { var client = GetClient(); return LockRetry(() => { return action(client); - }, description); + }, retry); } public IEndpoint CreateEndpoint(Address address, string baseUrl, string? 
logAlias = null) @@ -54,11 +61,11 @@ namespace Core return DebugStack.GetCallerName(skipFrames: 2); } - private T LockRetry(Func operation, string description) + private T LockRetry(Func operation, Retry retry) { lock (httpLock) { - return Time.Retry(operation, timeSet.HttpRetryTimeout(), timeSet.HttpCallRetryDelay(), description); + return retry.Run(operation); } } diff --git a/Framework/Core/PluginManager.cs b/Framework/Core/PluginManager.cs index e2b2a5c..27b08fe 100644 --- a/Framework/Core/PluginManager.cs +++ b/Framework/Core/PluginManager.cs @@ -34,12 +34,12 @@ return metadata; } - internal void DecommissionPlugins(bool deleteKubernetesResources, bool deleteTrackedFiles) + internal void DecommissionPlugins(bool deleteKubernetesResources, bool deleteTrackedFiles, bool waitTillDone) { foreach (var pair in pairs) { pair.Plugin.Decommission(); - pair.Tools.Decommission(deleteKubernetesResources, deleteTrackedFiles); + pair.Tools.Decommission(deleteKubernetesResources, deleteTrackedFiles, waitTillDone); } } diff --git a/Framework/Core/PluginTools.cs b/Framework/Core/PluginTools.cs index 5e1faee..aa8e0a6 100644 --- a/Framework/Core/PluginTools.cs +++ b/Framework/Core/PluginTools.cs @@ -6,7 +6,13 @@ namespace Core { public interface IPluginTools : IWorkflowTool, ILogTool, IHttpFactoryTool, IFileTool { - void Decommission(bool deleteKubernetesResources, bool deleteTrackedFiles); + ITimeSet TimeSet { get; } + + /// + /// Deletes kubernetes and tracked file resources. + /// when `waitTillDone` is true, this function will block until resources are deleted. 
+ /// + void Decommission(bool deleteKubernetesResources, bool deleteTrackedFiles, bool waitTillDone); } public interface IWorkflowTool @@ -33,7 +39,6 @@ namespace Core internal class PluginTools : IPluginTools { - private readonly ITimeSet timeSet; private readonly WorkflowCreator workflowCreator; private readonly IFileManager fileManager; private readonly LogPrefixer log; @@ -42,10 +47,12 @@ namespace Core { this.log = new LogPrefixer(log); this.workflowCreator = workflowCreator; - this.timeSet = timeSet; + TimeSet = timeSet; fileManager = new FileManager(log, fileManagerRootFolder); } + public ITimeSet TimeSet { get; } + public void ApplyLogPrefix(string prefix) { log.Prefix = prefix; @@ -53,7 +60,7 @@ namespace Core public IHttp CreateHttp(Action onClientCreated) { - return CreateHttp(onClientCreated, timeSet); + return CreateHttp(onClientCreated, TimeSet); } public IHttp CreateHttp(Action onClientCreated, ITimeSet ts) @@ -63,7 +70,7 @@ namespace Core public IHttp CreateHttp() { - return new Http(log, timeSet); + return new Http(log, TimeSet); } public IStartupWorkflow CreateWorkflow(string? 
namespaceOverride = null) @@ -71,9 +78,9 @@ namespace Core return workflowCreator.CreateWorkflow(namespaceOverride); } - public void Decommission(bool deleteKubernetesResources, bool deleteTrackedFiles) + public void Decommission(bool deleteKubernetesResources, bool deleteTrackedFiles, bool waitTillDone) { - if (deleteKubernetesResources) CreateWorkflow().DeleteNamespace(); + if (deleteKubernetesResources) CreateWorkflow().DeleteNamespace(waitTillDone); if (deleteTrackedFiles) fileManager.DeleteAllFiles(); } diff --git a/Framework/KubernetesWorkflow/ByteSizeExtensions.cs b/Framework/KubernetesWorkflow/ByteSizeExtensions.cs deleted file mode 100644 index c3cca24..0000000 --- a/Framework/KubernetesWorkflow/ByteSizeExtensions.cs +++ /dev/null @@ -1,41 +0,0 @@ -using Utils; - -namespace KubernetesWorkflow -{ - public static class ByteSizeExtensions - { - public static string ToSuffixNotation(this ByteSize b) - { - long x = 1024; - var map = new Dictionary - { - { Pow(x, 4), "Ti" }, - { Pow(x, 3), "Gi" }, - { Pow(x, 2), "Mi" }, - { (x), "Ki" }, - }; - - var bytes = b.SizeInBytes; - foreach (var pair in map) - { - if (bytes > pair.Key) - { - double bytesD = bytes; - double divD = pair.Key; - double numD = Math.Ceiling(bytesD / divD); - var v = Convert.ToInt64(numD); - return $"{v}{pair.Value}"; - } - } - - return $"{bytes}"; - } - - private static long Pow(long x, int v) - { - long result = 1; - for (var i = 0; i < v; i++) result *= x; - return result; - } - } -} diff --git a/Framework/KubernetesWorkflow/CrashWatcher.cs b/Framework/KubernetesWorkflow/CrashWatcher.cs index 5cb2e03..57e30eb 100644 --- a/Framework/KubernetesWorkflow/CrashWatcher.cs +++ b/Framework/KubernetesWorkflow/CrashWatcher.cs @@ -50,7 +50,9 @@ namespace KubernetesWorkflow public bool HasContainerCrashed() { using var client = new Kubernetes(config); - return HasContainerBeenRestarted(client); + var result = HasContainerBeenRestarted(client); + if (result) DownloadCrashedContainerLogs(client); + return 
result; } private void Worker() @@ -83,12 +85,13 @@ namespace KubernetesWorkflow private bool HasContainerBeenRestarted(Kubernetes client) { var podInfo = client.ReadNamespacedPod(podName, k8sNamespace); - return podInfo.Status.ContainerStatuses.Any(c => c.RestartCount > 0); + var result = podInfo.Status.ContainerStatuses.Any(c => c.RestartCount > 0); + if (result) log.Log("Pod crash detected for " + containerName); + return result; } private void DownloadCrashedContainerLogs(Kubernetes client) { - log.Log("Pod crash detected for " + containerName); using var stream = client.ReadNamespacedPodLog(podName, k8sNamespace, recipeName, previous: true); logHandler!.Log(stream); } diff --git a/Framework/KubernetesWorkflow/K8sCluster.cs b/Framework/KubernetesWorkflow/K8sCluster.cs index 366855f..be87e1f 100644 --- a/Framework/KubernetesWorkflow/K8sCluster.cs +++ b/Framework/KubernetesWorkflow/K8sCluster.cs @@ -16,6 +16,7 @@ namespace KubernetesWorkflow { var config = GetConfig(); UpdateHostAddress(config); + config.SkipTlsVerify = true; // Required for operation on Wings cluster. 
return config; } diff --git a/Framework/KubernetesWorkflow/K8sController.cs b/Framework/KubernetesWorkflow/K8sController.cs index 0f50d87..00c7563 100644 --- a/Framework/KubernetesWorkflow/K8sController.cs +++ b/Framework/KubernetesWorkflow/K8sController.cs @@ -115,7 +115,7 @@ namespace KubernetesWorkflow }); } - public void DeleteAllNamespacesStartingWith(string prefix) + public void DeleteAllNamespacesStartingWith(string prefix, bool wait) { log.Debug(); @@ -124,25 +124,28 @@ namespace KubernetesWorkflow foreach (var ns in namespaces) { - DeleteNamespace(ns); + DeleteNamespace(ns, wait); } } - public void DeleteNamespace() + public void DeleteNamespace(bool wait) { log.Debug(); if (IsNamespaceOnline(K8sNamespace)) { client.Run(c => c.DeleteNamespace(K8sNamespace, null, null, gracePeriodSeconds: 0)); + + if (wait) WaitUntilNamespaceDeleted(K8sNamespace); } } - public void DeleteNamespace(string ns) + public void DeleteNamespace(string ns, bool wait) { log.Debug(); if (IsNamespaceOnline(ns)) { client.Run(c => c.DeleteNamespace(ns, null, null, gracePeriodSeconds: 0)); + if (wait) WaitUntilNamespaceDeleted(ns); } } @@ -532,7 +535,7 @@ namespace KubernetesWorkflow } if (set.Memory.SizeInBytes != 0) { - result.Add("memory", new ResourceQuantity(set.Memory.ToSuffixNotation())); + result.Add("memory", new ResourceQuantity(set.Memory.SizeInBytes.ToString())); } return result; } @@ -871,6 +874,11 @@ namespace KubernetesWorkflow WaitUntil(() => IsNamespaceOnline(K8sNamespace), nameof(WaitUntilNamespaceCreated)); } + private void WaitUntilNamespaceDeleted(string @namespace) + { + WaitUntil(() => !IsNamespaceOnline(@namespace), nameof(WaitUntilNamespaceDeleted)); + } + private void WaitUntilDeploymentOnline(string deploymentName) { WaitUntil(() => diff --git a/Framework/KubernetesWorkflow/Recipe/ContainerRecipeFactory.cs b/Framework/KubernetesWorkflow/Recipe/ContainerRecipeFactory.cs index 6b6ae2d..2c42143 100644 --- 
a/Framework/KubernetesWorkflow/Recipe/ContainerRecipeFactory.cs +++ b/Framework/KubernetesWorkflow/Recipe/ContainerRecipeFactory.cs @@ -105,7 +105,7 @@ namespace KubernetesWorkflow.Recipe protected void AddVolume(string name, string mountPath, string? subPath = null, string? secret = null, string? hostPath = null) { - var size = 10.MB().ToSuffixNotation(); + var size = 10.MB().SizeInBytes.ToString(); volumeMounts.Add(new VolumeMount(name, mountPath, subPath, size, secret, hostPath)); } @@ -114,7 +114,7 @@ namespace KubernetesWorkflow.Recipe volumeMounts.Add(new VolumeMount( $"autovolume-{Guid.NewGuid().ToString().ToLowerInvariant()}", mountPath, - resourceQuantity: volumeSize.ToSuffixNotation())); + resourceQuantity: volumeSize.SizeInBytes.ToString())); } protected void Additional(object userData) diff --git a/Framework/KubernetesWorkflow/RunningPod.cs b/Framework/KubernetesWorkflow/RunningPod.cs index e86b2aa..43a05cc 100644 --- a/Framework/KubernetesWorkflow/RunningPod.cs +++ b/Framework/KubernetesWorkflow/RunningPod.cs @@ -1,8 +1,5 @@ -using k8s; -using k8s.Models; -using KubernetesWorkflow.Recipe; +using KubernetesWorkflow.Recipe; using KubernetesWorkflow.Types; -using Newtonsoft.Json; namespace KubernetesWorkflow { diff --git a/Framework/KubernetesWorkflow/StartupWorkflow.cs b/Framework/KubernetesWorkflow/StartupWorkflow.cs index cda6148..3a7326d 100644 --- a/Framework/KubernetesWorkflow/StartupWorkflow.cs +++ b/Framework/KubernetesWorkflow/StartupWorkflow.cs @@ -17,8 +17,8 @@ namespace KubernetesWorkflow void Stop(RunningPod pod, bool waitTillStopped); void DownloadContainerLog(RunningContainer container, ILogHandler logHandler, int? 
tailLines = null); string ExecuteCommand(RunningContainer container, string command, params string[] args); - void DeleteNamespace(); - void DeleteNamespacesStartingWith(string namespacePrefix); + void DeleteNamespace(bool wait); + void DeleteNamespacesStartingWith(string namespacePrefix, bool wait); } public class StartupWorkflow : IStartupWorkflow @@ -122,19 +122,19 @@ namespace KubernetesWorkflow }); } - public void DeleteNamespace() + public void DeleteNamespace(bool wait) { K8s(controller => { - controller.DeleteNamespace(); + controller.DeleteNamespace(wait); }); } - public void DeleteNamespacesStartingWith(string namespacePrefix) + public void DeleteNamespacesStartingWith(string namespacePrefix, bool wait) { K8s(controller => { - controller.DeleteAllNamespacesStartingWith(namespacePrefix); + controller.DeleteAllNamespacesStartingWith(namespacePrefix, wait); }); } diff --git a/Framework/Utils/Retry.cs b/Framework/Utils/Retry.cs new file mode 100644 index 0000000..ec05ea6 --- /dev/null +++ b/Framework/Utils/Retry.cs @@ -0,0 +1,131 @@ +namespace Utils +{ + public class Retry + { + private readonly string description; + private readonly TimeSpan maxTimeout; + private readonly TimeSpan sleepAfterFail; + private readonly Action onFail; + + public Retry(string description, TimeSpan maxTimeout, TimeSpan sleepAfterFail, Action onFail) + { + this.description = description; + this.maxTimeout = maxTimeout; + this.sleepAfterFail = sleepAfterFail; + this.onFail = onFail; + } + + public void Run(Action task) + { + var run = new RetryRun(description, task, maxTimeout, sleepAfterFail, onFail); + run.Run(); + } + + public T Run(Func task) + { + T? 
result = default; + + var run = new RetryRun(description, () => + { + result = task(); + }, maxTimeout, sleepAfterFail, onFail); + run.Run(); + + return result!; + } + + private class RetryRun + { + private readonly string description; + private readonly Action task; + private readonly TimeSpan maxTimeout; + private readonly TimeSpan sleepAfterFail; + private readonly Action onFail; + private readonly DateTime start = DateTime.UtcNow; + private readonly List failures = new List(); + private int tryNumber; + private DateTime tryStart; + + public RetryRun(string description, Action task, TimeSpan maxTimeout, TimeSpan sleepAfterFail, Action onFail) + { + this.description = description; + this.task = task; + this.maxTimeout = maxTimeout; + this.sleepAfterFail = sleepAfterFail; + this.onFail = onFail; + + tryNumber = 0; + tryStart = DateTime.UtcNow; + } + + public void Run() + { + while (true) + { + CheckMaximums(); + + tryNumber++; + tryStart = DateTime.UtcNow; + try + { + task(); + return; + } + catch (Exception ex) + { + var failure = CaptureFailure(ex); + onFail(failure); + Time.Sleep(sleepAfterFail); + } + } + } + + private Failure CaptureFailure(Exception ex) + { + var f = new Failure(ex, DateTime.UtcNow - tryStart, tryNumber); + failures.Add(f); + return f; + } + + private void CheckMaximums() + { + if (Duration() > maxTimeout) Fail(); + } + + private void Fail() + { + throw new TimeoutException($"Retry '{description}' timed out after {tryNumber} tries over {Time.FormatDuration(Duration())}: {GetFailureReport()}", + new AggregateException(failures.Select(f => f.Exception))); + } + + private string GetFailureReport() + { + return Environment.NewLine + string.Join(Environment.NewLine, failures.Select(f => f.Describe())); + } + + private TimeSpan Duration() + { + return DateTime.UtcNow - start; + } + } + } + + public class Failure + { + public Failure(Exception exception, TimeSpan duration, int tryNumber) + { + Exception = exception; + Duration = duration; + TryNumber 
= tryNumber; + } + + public Exception Exception { get; } + public TimeSpan Duration { get; } + public int TryNumber { get; } + + public string Describe() + { + return $"Try {TryNumber} failed after {Time.FormatDuration(Duration)} with exception '{Exception}'"; + } + } +} diff --git a/Framework/Utils/Time.cs b/Framework/Utils/Time.cs index 10caaf4..42051af 100644 --- a/Framework/Utils/Time.cs +++ b/Framework/Utils/Time.cs @@ -1,6 +1,4 @@ -using System.Diagnostics; - -namespace Utils +namespace Utils { public static class Time { @@ -111,78 +109,24 @@ namespace Utils public static void Retry(Action action, TimeSpan maxTimeout, TimeSpan retryTime, string description) { - var start = DateTime.UtcNow; - var tries = 1; - var tryInfo = new List<(Exception, TimeSpan)>(); - - while (true) - { - var duration = DateTime.UtcNow - start; - if (duration > maxTimeout) - { - var info = FormatTryInfos(tryInfo); - throw new TimeoutException($"Retry '{description}' timed out after {tries} tries over {FormatDuration(duration)}.{Environment.NewLine}{info}"); - } - - var sw = Stopwatch.StartNew(); - try - { - action(); - return; - } - catch (Exception ex) - { - tryInfo.Add((ex, sw.Elapsed)); - tries++; - } - - Sleep(retryTime); - } - } - - private static string FormatTryInfos(List<(Exception, TimeSpan)> tryInfo) - { - return string.Join(Environment.NewLine, tryInfo.Select(FormatTryInfo).ToArray()); - } - - private static string FormatTryInfo((Exception, TimeSpan) info, int index) - { - return $"Attempt {index} took {FormatDuration(info.Item2)} and failed with exception {info.Item1}."; - } - - private static Action failedCallback = i => { }; - public static void SetRetryFailedCallback(Action onRetryFailed) - { - failedCallback = onRetryFailed; + Retry(action, maxTimeout, retryTime, description, f => { }); } public static T Retry(Func action, TimeSpan maxTimeout, TimeSpan retryTime, string description) { - var start = DateTime.UtcNow; - var tries = 1; - var exceptions = new List(); + 
return Retry(action, maxTimeout, retryTime, description, f => { }); + } - while (true) - { - var duration = DateTime.UtcNow - start; - if (duration > maxTimeout) - { - throw new TimeoutException($"Retry '{description}' timed out after {tries} tries over {FormatDuration(duration)}.", new AggregateException(exceptions)); - } + public static void Retry(Action action, TimeSpan maxTimeout, TimeSpan retryTime, string description, Action onFail) + { + var r = new Retry(description, maxTimeout, retryTime, onFail); + r.Run(action); + } - try - { - return action(); - } - catch (Exception ex) - { - exceptions.Add(ex); - failedCallback(tries); - tries++; - } - - Sleep(retryTime); - } + public static T Retry(Func action, TimeSpan maxTimeout, TimeSpan retryTime, string description, Action onFail) + { + var r = new Retry(description, maxTimeout, retryTime, onFail); + return r.Run(action); } } } diff --git a/ProjectPlugins/CodexPlugin/ApiChecker.cs b/ProjectPlugins/CodexPlugin/ApiChecker.cs index f3e4e10..d394405 100644 --- a/ProjectPlugins/CodexPlugin/ApiChecker.cs +++ b/ProjectPlugins/CodexPlugin/ApiChecker.cs @@ -9,7 +9,7 @@ namespace CodexPlugin public class ApiChecker { // - private const string OpenApiYamlHash = "0F-C8-02-1E-2C-2C-15-F6-91-6A-01-31-11-49-95-06-79-26-25-BF-27-3C-A8-2E-5F-7F-34-FD-C0-57-A0-9A"; + private const string OpenApiYamlHash = "67-76-AB-FC-54-4F-EB-81-F5-E4-F8-27-DF-82-92-41-63-A5-EA-1B-17-14-0C-BE-20-9C-B3-DF-CE-E4-AA-38"; private const string OpenApiFilePath = "/codex/openapi.yaml"; private const string DisableEnvironmentVariable = "CODEXPLUGIN_DISABLE_APICHECK"; diff --git a/ProjectPlugins/CodexPlugin/CodexAccess.cs b/ProjectPlugins/CodexPlugin/CodexAccess.cs index e35a9f9..80f1061 100644 --- a/ProjectPlugins/CodexPlugin/CodexAccess.cs +++ b/ProjectPlugins/CodexPlugin/CodexAccess.cs @@ -2,6 +2,7 @@ using Core; using KubernetesWorkflow; using KubernetesWorkflow.Types; +using Logging; using Newtonsoft.Json; using Utils; @@ -9,6 +10,7 @@ namespace 
CodexPlugin { public class CodexAccess : ILogHandler { + private readonly ILog log; private readonly IPluginTools tools; private readonly Mapper mapper = new Mapper(); private bool hasContainerCrashed; @@ -16,6 +18,7 @@ namespace CodexPlugin public CodexAccess(IPluginTools tools, RunningPod container, CrashWatcher crashWatcher) { this.tools = tools; + log = tools.GetLog(); Container = container; CrashWatcher = crashWatcher; hasContainerCrashed = false; @@ -34,20 +37,23 @@ namespace CodexPlugin public DebugPeer GetDebugPeer(string peerId) { // Cannot use openAPI: debug/peer endpoint is not specified there. - var endpoint = GetEndpoint(); - var str = endpoint.HttpGetString($"debug/peer/{peerId}"); - - if (str.ToLowerInvariant() == "unable to find peer!") + return CrashCheck(() => { - return new DebugPeer - { - IsPeerFound = false - }; - } + var endpoint = GetEndpoint(); + var str = endpoint.HttpGetString($"debug/peer/{peerId}"); - var result = endpoint.Deserialize(str); - result.IsPeerFound = true; - return result; + if (str.ToLowerInvariant() == "unable to find peer!") + { + return new DebugPeer + { + IsPeerFound = false + }; + } + + var result = endpoint.Deserialize(str); + result.IsPeerFound = true; + return result; + }); } public void ConnectToPeer(string peerId, string[] peerMultiAddresses) @@ -59,14 +65,19 @@ namespace CodexPlugin }); } - public string UploadFile(FileStream fileStream) + public string UploadFile(FileStream fileStream, Action onFailure) { - return OnCodex(api => api.UploadAsync(fileStream)); + return OnCodex( + api => api.UploadAsync(fileStream), + CreateRetryConfig(nameof(UploadFile), onFailure)); } - public Stream DownloadFile(string contentId) + public Stream DownloadFile(string contentId, Action onFailure) { - var fileResponse = OnCodex(api => api.DownloadNetworkAsync(contentId)); + var fileResponse = OnCodex( + api => api.DownloadNetworkAsync(contentId), + CreateRetryConfig(nameof(DownloadFile), onFailure)); + if (fileResponse.StatusCode != 
200) throw new Exception("Download failed with StatusCode: " + fileResponse.StatusCode); return fileResponse.Stream; } @@ -89,15 +100,24 @@ namespace CodexPlugin return OnCodex(api => api.CreateStorageRequestAsync(request.ContentId.Id, body)); } + public CodexSpace Space() + { + var space = OnCodex(api => api.SpaceAsync()); + return mapper.Map(space); + } + public StoragePurchase GetPurchaseStatus(string purchaseId) { - var endpoint = GetEndpoint(); - return Time.Retry(() => + return CrashCheck(() => { - var str = endpoint.HttpGetString($"storage/purchases/{purchaseId}"); - if (string.IsNullOrEmpty(str)) throw new Exception("Empty response."); - return JsonConvert.DeserializeObject(str)!; - }, nameof(GetPurchaseStatus)); + var endpoint = GetEndpoint(); + return Time.Retry(() => + { + var str = endpoint.HttpGetString($"storage/purchases/{purchaseId}"); + if (string.IsNullOrEmpty(str)) throw new Exception("Empty response."); + return JsonConvert.DeserializeObject(str)!; + }, nameof(GetPurchaseStatus)); + }); // TODO: current getpurchase api does not line up with its openapi spec. 
// return mapper.Map(OnCodex(api => api.GetPurchaseAsync(purchaseId))); @@ -114,19 +134,67 @@ namespace CodexPlugin return workflow.GetPodInfo(Container); } + public void LogDiskSpace(string msg) + { + try + { + var diskInfo = tools.CreateWorkflow().ExecuteCommand(Container.Containers.Single(), "df", "--sync"); + Log($"{msg} - Disk info: {diskInfo}"); + } + catch (Exception e) + { + Log("Failed to get disk info: " + e); + } + } + + public void DeleteRepoFolder() + { + try + { + var containerNumber = Container.Containers.First().Recipe.Number; + var dataDir = $"datadir{containerNumber}"; + var workflow = tools.CreateWorkflow(); + workflow.ExecuteCommand(Container.Containers.First(), "rm", "-Rfv", $"/codex/{dataDir}/repo"); + Log("Deleted repo folder."); + } + catch (Exception e) + { + Log("Unable to delete repo folder: " + e); + } + } + private T OnCodex(Func> action) { - var address = GetAddress(); - var result = tools.CreateHttp(CheckContainerCrashed) - .OnClient(client => - { - var api = new CodexApi(client); - api.BaseUrl = $"{address.Host}:{address.Port}/api/codex/v1"; - return Time.Wait(action(api)); - }); + var result = tools.CreateHttp(CheckContainerCrashed).OnClient(client => CallCodex(client, action)); return result; } + private T OnCodex(Func> action, Retry retry) + { + var result = tools.CreateHttp(CheckContainerCrashed).OnClient(client => CallCodex(client, action), retry); + return result; + } + + private T CallCodex(HttpClient client, Func> action) + { + var address = GetAddress(); + var api = new CodexApi(client); + api.BaseUrl = $"{address.Host}:{address.Port}/api/codex/v1"; + return CrashCheck(() => Time.Wait(action(api))); + } + + private T CrashCheck(Func action) + { + try + { + return action(); + } + finally + { + CrashWatcher.HasContainerCrashed(); + } + } + private IEndpoint GetEndpoint() { return tools @@ -136,20 +204,19 @@ namespace CodexPlugin private Address GetAddress() { - return Container.Containers.Single().GetAddress(tools.GetLog(), 
CodexContainerRecipe.ApiPortTag); + return Container.Containers.Single().GetAddress(log, CodexContainerRecipe.ApiPortTag); } private void CheckContainerCrashed(HttpClient client) { - if (hasContainerCrashed) throw new Exception("Container has crashed."); + if (hasContainerCrashed) throw new Exception($"Container {GetName()} has crashed."); } - public void Log(Stream crashLog) + void ILogHandler.Log(Stream crashLog) { - var log = tools.GetLog(); var file = log.CreateSubfile(); - log.Log($"Container {Container.Name} has crashed. Downloading crash log to '{file.FullFilename}'..."); - file.Write($"Container Crash Log for {Container.Name}."); + Log($"Downloading log to '{file.FullFilename}'..."); + file.Write($"Container log for {Container.Name}."); using var reader = new StreamReader(crashLog); var line = reader.ReadLine(); @@ -159,8 +226,63 @@ namespace CodexPlugin line = reader.ReadLine(); } - log.Log("Crash log successfully downloaded."); + Log("Container log successfully downloaded."); hasContainerCrashed = true; } + + private Retry CreateRetryConfig(string description, Action onFailure) + { + var timeSet = tools.TimeSet; + + return new Retry(description, timeSet.HttpRetryTimeout(), timeSet.HttpCallRetryDelay(), failure => + { + onFailure(failure); + Investigate(failure, timeSet); + }); + } + + private void Investigate(Failure failure, ITimeSet timeSet) + { + Log($"Retry {failure.TryNumber} took {Time.FormatDuration(failure.Duration)} and failed with '{failure.Exception}'. 
" + + $"(HTTP timeout = {Time.FormatDuration(timeSet.HttpCallTimeout())}) " + + $"Checking if node responds to debug/info..."); + + LogDiskSpace("After retry failure"); + + try + { + var debugInfo = GetDebugInfo(); + if (string.IsNullOrEmpty(debugInfo.Spr)) + { + Log("Did not get value debug/info response."); + Throw(failure); + } + else + { + Log("Got valid response from debug/info."); + } + } + catch (Exception ex) + { + Log("Got exception from debug/info call: " + ex); + Throw(failure); + } + + if (failure.Duration < timeSet.HttpCallTimeout()) + { + Log("Retry failed within HTTP timeout duration."); + Throw(failure); + } + } + + private void Throw(Failure failure) + { + throw failure.Exception; + } + + private void Log(string msg) + { + log.Log($"{GetName()} {msg}"); + } } } diff --git a/ProjectPlugins/CodexPlugin/CodexContainerRecipe.cs b/ProjectPlugins/CodexPlugin/CodexContainerRecipe.cs index 07b94ac..564e81a 100644 --- a/ProjectPlugins/CodexPlugin/CodexContainerRecipe.cs +++ b/ProjectPlugins/CodexPlugin/CodexContainerRecipe.cs @@ -7,7 +7,7 @@ namespace CodexPlugin { public class CodexContainerRecipe : ContainerRecipeFactory { - private const string DefaultDockerImage = "codexstorage/nim-codex:sha-267266a-dist-tests"; + private const string DefaultDockerImage = "codexstorage/nim-codex:sha-b89493e-dist-tests"; public const string ApiPortTag = "codex_api_port"; public const string ListenPortTag = "codex_listen_port"; @@ -120,7 +120,7 @@ namespace CodexPlugin } } - if(!string.IsNullOrEmpty(config.NameOverride)) + if (!string.IsNullOrEmpty(config.NameOverride)) { AddEnvVar("CODEX_NODENAME", config.NameOverride); } @@ -160,7 +160,7 @@ namespace CodexPlugin private ByteSize GetVolumeCapacity(CodexStartupConfig config) { - if (config.StorageQuota != null) return config.StorageQuota; + if (config.StorageQuota != null) return config.StorageQuota.Multiply(1.2); // Default Codex quota: 8 Gb, using +20% to be safe. 
return 8.GB().Multiply(1.2); } diff --git a/ProjectPlugins/CodexPlugin/CodexNode.cs b/ProjectPlugins/CodexPlugin/CodexNode.cs index 33c249a..c5372f6 100644 --- a/ProjectPlugins/CodexPlugin/CodexNode.cs +++ b/ProjectPlugins/CodexPlugin/CodexNode.cs @@ -15,8 +15,11 @@ namespace CodexPlugin DebugInfo GetDebugInfo(); DebugPeer GetDebugPeer(string peerId); ContentId UploadFile(TrackedFile file); + ContentId UploadFile(TrackedFile file, Action onFailure); TrackedFile? DownloadContent(ContentId contentId, string fileLabel = ""); + TrackedFile? DownloadContent(ContentId contentId, Action onFailure, string fileLabel = ""); LocalDatasetList LocalFiles(); + CodexSpace Space(); void ConnectToPeer(ICodexNode node); DebugInfoVersion Version { get; } IMarketplaceAccess Marketplace { get; } @@ -24,6 +27,12 @@ namespace CodexPlugin PodInfo GetPodInfo(); ITransferSpeeds TransferSpeeds { get; } EthAccount EthAccount { get; } + + /// + /// Warning! The node is not usable after this. + /// TODO: Replace with delete-blocks debug call once available in Codex. + /// + void DeleteRepoFolder(); void Stop(bool waitTillStopped); } @@ -90,7 +99,7 @@ namespace CodexPlugin { var debugInfo = CodexAccess.GetDebugInfo(); var known = string.Join(",", debugInfo.Table.Nodes.Select(n => n.PeerId)); - Log($"Got DebugInfo with id: '{debugInfo.Id}'. This node knows: {known}"); + Log($"Got DebugInfo with id: {debugInfo.Id}. 
This node knows: [{known}]"); return debugInfo; } @@ -101,13 +110,20 @@ namespace CodexPlugin public ContentId UploadFile(TrackedFile file) { + return UploadFile(file, DoNothing); + } + + public ContentId UploadFile(TrackedFile file, Action onFailure) + { + CodexAccess.LogDiskSpace("Before upload"); + using var fileStream = File.OpenRead(file.Filename); var logMessage = $"Uploading file {file.Describe()}..."; Log(logMessage); var measurement = Stopwatch.Measure(tools.GetLog(), logMessage, () => { - return CodexAccess.UploadFile(fileStream); + return CodexAccess.UploadFile(fileStream, onFailure); }); var response = measurement.Value; @@ -117,15 +133,22 @@ namespace CodexPlugin if (response.StartsWith(UploadFailedMessage)) FrameworkAssert.Fail("Node failed to store block."); Log($"Uploaded file. Received contentId: '{response}'."); + CodexAccess.LogDiskSpace("After upload"); + return new ContentId(response); } public TrackedFile? DownloadContent(ContentId contentId, string fileLabel = "") + { + return DownloadContent(contentId, DoNothing, fileLabel); + } + + public TrackedFile? 
DownloadContent(ContentId contentId, Action onFailure, string fileLabel = "") { var logMessage = $"Downloading for contentId: '{contentId.Id}'..."; Log(logMessage); var file = tools.GetFileManager().CreateEmptyFile(fileLabel); - var measurement = Stopwatch.Measure(tools.GetLog(), logMessage, () => DownloadToFile(contentId.Id, file)); + var measurement = Stopwatch.Measure(tools.GetLog(), logMessage, () => DownloadToFile(contentId.Id, file, onFailure)); transferSpeeds.AddDownloadSample(file.GetFilesize(), measurement); Log($"Downloaded file {file.Describe()} to '{file.Filename}'."); return file; @@ -136,6 +159,11 @@ namespace CodexPlugin return CodexAccess.LocalFiles(); } + public CodexSpace Space() + { + return CodexAccess.Space(); + } + public void ConnectToPeer(ICodexNode node) { var peer = (CodexNode)node; @@ -152,15 +180,16 @@ namespace CodexPlugin return CodexAccess.GetPodInfo(); } + public void DeleteRepoFolder() + { + CodexAccess.DeleteRepoFolder(); + } + public void Stop(bool waitTillStopped) { + Log("Stopping..."); CrashWatcher.Stop(); Group.Stop(this, waitTillStopped); - // if (Group.Count() > 1) throw new InvalidOperationException("Codex-nodes that are part of a group cannot be " + - // "individually shut down. Use 'BringOffline()' on the group object to stop the group. 
This method is only " + - // "available for codex-nodes in groups of 1."); - // - // Group.BringOffline(waitTillStopped); } public void EnsureOnlineGetVersionResponse() @@ -192,12 +221,14 @@ namespace CodexPlugin .ToArray(); } - private void DownloadToFile(string contentId, TrackedFile file) + private void DownloadToFile(string contentId, TrackedFile file, Action onFailure) { + CodexAccess.LogDiskSpace("Before download"); + using var fileStream = File.OpenWrite(file.Filename); try { - using var downloadStream = CodexAccess.DownloadFile(contentId); + using var downloadStream = CodexAccess.DownloadFile(contentId, onFailure); downloadStream.CopyTo(fileStream); } catch @@ -205,6 +236,8 @@ namespace CodexPlugin Log($"Failed to download file '{contentId}'."); throw; } + + CodexAccess.LogDiskSpace("After download"); } private void EnsureMarketplace() @@ -216,5 +249,9 @@ namespace CodexPlugin { tools.GetLog().Log($"{GetName()}: {msg}"); } + + private void DoNothing(Failure failure) + { + } } } diff --git a/ProjectPlugins/CodexPlugin/CodexStarter.cs b/ProjectPlugins/CodexPlugin/CodexStarter.cs index dec13d3..77fd89a 100644 --- a/ProjectPlugins/CodexPlugin/CodexStarter.cs +++ b/ProjectPlugins/CodexPlugin/CodexStarter.cs @@ -33,7 +33,7 @@ namespace CodexPlugin foreach (var rc in containers) { var podInfo = GetPodInfo(rc); - var podInfos = string.Join(", ", rc.Containers.Select(c => $"Container: '{c.Name}' runs at '{podInfo.K8SNodeName}'={podInfo.Ip}")); + var podInfos = string.Join(", ", rc.Containers.Select(c => $"Container: '{c.Name}' PodLabel: '{c.RunningPod.StartResult.Deployment.PodLabel}' runs at '{podInfo.K8SNodeName}'={podInfo.Ip}")); Log($"Started {codexSetup.NumberOfNodes} nodes of image '{containers.First().Containers.First().Recipe.Image}'. 
({podInfos})"); } LogSeparator(); diff --git a/ProjectPlugins/CodexPlugin/CodexTypes.cs b/ProjectPlugins/CodexPlugin/CodexTypes.cs index 945251e..d40fb13 100644 --- a/ProjectPlugins/CodexPlugin/CodexTypes.cs +++ b/ProjectPlugins/CodexPlugin/CodexTypes.cs @@ -105,4 +105,18 @@ namespace CodexPlugin return HashCode.Combine(Id); } } + + public class CodexSpace + { + public long TotalBlocks { get; set; } + public long QuotaMaxBytes { get; set; } + public long QuotaUsedBytes { get; set; } + public long QuotaReservedBytes { get; set; } + public long FreeBytes => QuotaMaxBytes - (QuotaUsedBytes + QuotaReservedBytes); + + public override string ToString() + { + return JsonConvert.SerializeObject(this); + } + } } diff --git a/ProjectPlugins/CodexPlugin/Mapper.cs b/ProjectPlugins/CodexPlugin/Mapper.cs index 61c95b4..c175ae1 100644 --- a/ProjectPlugins/CodexPlugin/Mapper.cs +++ b/ProjectPlugins/CodexPlugin/Mapper.cs @@ -1,4 +1,5 @@ using CodexContractsPlugin; +using CodexOpenApi; using Newtonsoft.Json.Linq; using System.Numerics; using Utils; @@ -62,14 +63,47 @@ namespace CodexPlugin }; } - public StoragePurchase Map(CodexOpenApi.Purchase purchase) - { - return new StoragePurchase - { - State = purchase.State, - Error = purchase.Error - }; - } + // TODO: Fix openapi spec for this call. 
+ //public StoragePurchase Map(CodexOpenApi.Purchase purchase) + //{ + // return new StoragePurchase(Map(purchase.Request)) + // { + // State = purchase.State, + // Error = purchase.Error + // }; + //} + + //public StorageRequest Map(CodexOpenApi.StorageRequest request) + //{ + // return new StorageRequest(Map(request.Ask), Map(request.Content)) + // { + // Id = request.Id, + // Client = request.Client, + // Expiry = TimeSpan.FromSeconds(Convert.ToInt64(request.Expiry)), + // Nonce = request.Nonce + // }; + //} + + //public StorageAsk Map(CodexOpenApi.StorageAsk ask) + //{ + // return new StorageAsk + // { + // Duration = TimeSpan.FromSeconds(Convert.ToInt64(ask.Duration)), + // MaxSlotLoss = ask.MaxSlotLoss, + // ProofProbability = ask.ProofProbability, + // Reward = Convert.ToDecimal(ask.Reward).TstWei(), + // Slots = ask.Slots, + // SlotSize = new ByteSize(Convert.ToInt64(ask.SlotSize)) + // }; + //} + + //public StorageContent Map(CodexOpenApi.Content content) + //{ + // return new StorageContent + // { + // Cid = content.Cid + // }; + //} public StorageAvailability Map(CodexOpenApi.SalesAvailabilityREAD read) { @@ -84,6 +118,17 @@ namespace CodexPlugin }; } + public CodexSpace Map(Space space) + { + return new CodexSpace + { + QuotaMaxBytes = space.QuotaMaxBytes, + QuotaReservedBytes = space.QuotaReservedBytes, + QuotaUsedBytes = space.QuotaUsedBytes, + TotalBlocks = space.TotalBlocks + }; + } + private DebugInfoVersion MapDebugInfoVersion(JObject obj) { return new DebugInfoVersion @@ -98,7 +143,7 @@ namespace CodexPlugin return new DebugInfoTable { LocalNode = MapDebugInfoTableNode(obj.GetValue("localNode")), - Nodes = new DebugInfoTableNode[0] + Nodes = MapDebugInfoTableNodeArray(obj.GetValue("nodes") as JArray) }; } @@ -117,6 +162,16 @@ namespace CodexPlugin }; } + private DebugInfoTableNode[] MapDebugInfoTableNodeArray(JArray? 
nodes) + { + if (nodes == null || nodes.Count == 0) + { + return new DebugInfoTableNode[0]; + } + + return nodes.Select(MapDebugInfoTableNode).ToArray(); + } + private Manifest MapManifest(CodexOpenApi.ManifestItem manifest) { return new Manifest diff --git a/ProjectPlugins/CodexPlugin/MarketplaceTypes.cs b/ProjectPlugins/CodexPlugin/MarketplaceTypes.cs index caeda25..21aa86a 100644 --- a/ProjectPlugins/CodexPlugin/MarketplaceTypes.cs +++ b/ProjectPlugins/CodexPlugin/MarketplaceTypes.cs @@ -1,5 +1,7 @@ using CodexContractsPlugin; +using CodexOpenApi; using Logging; +using System.Data; using Utils; namespace CodexPlugin @@ -37,6 +39,34 @@ namespace CodexPlugin { public string State { get; set; } = string.Empty; public string Error { get; set; } = string.Empty; + public StorageRequest Request { get; set; } = null!; + } + + public class StorageRequest + { + public string Id { get; set; } = string.Empty; + public string Client { get; set; } = string.Empty; + public StorageAsk Ask { get; set; } = null!; + public StorageContent Content { get; set; } = null!; + public string Expiry { get; set; } = string.Empty; + public string Nonce { get; set; } = string.Empty; + } + + public class StorageAsk + { + public int Slots { get; set; } + public string SlotSize { get; set; } = string.Empty; + public string Duration { get; set; } = string.Empty; + public string ProofProbability { get; set; } = string.Empty; + public string Reward { get; set; } = string.Empty; + public int MaxSlotLoss { get; set; } + } + + public class StorageContent + { + public string Cid { get; set; } = string.Empty; + //public ErasureParameters Erasure { get; set; } + //public PoRParameters Por { get; set; } } public class StorageAvailability diff --git a/ProjectPlugins/CodexPlugin/StoragePurchaseContract.cs b/ProjectPlugins/CodexPlugin/StoragePurchaseContract.cs index d812f51..3446231 100644 --- a/ProjectPlugins/CodexPlugin/StoragePurchaseContract.cs +++ b/ProjectPlugins/CodexPlugin/StoragePurchaseContract.cs 
@@ -7,6 +7,8 @@ namespace CodexPlugin public interface IStoragePurchaseContract { string PurchaseId { get; } + StoragePurchaseRequest Purchase { get; } + ContentId ContentId { get; } void WaitForStorageContractSubmitted(); void WaitForStorageContractStarted(); void WaitForStorageContractFinished(); @@ -28,10 +30,13 @@ namespace CodexPlugin this.codexAccess = codexAccess; PurchaseId = purchaseId; Purchase = purchase; + + ContentId = new ContentId(codexAccess.GetPurchaseStatus(purchaseId).Request.Content.Cid); } public string PurchaseId { get; } public StoragePurchaseRequest Purchase { get; } + public ContentId ContentId { get; } public TimeSpan? PendingToSubmitted => contractSubmittedUtc - contractPendingUtc; public TimeSpan? SubmittedToStarted => contractStartedUtc - contractSubmittedUtc; diff --git a/ProjectPlugins/CodexPlugin/openapi.yaml b/ProjectPlugins/CodexPlugin/openapi.yaml index 49c75e6..94450bf 100644 --- a/ProjectPlugins/CodexPlugin/openapi.yaml +++ b/ProjectPlugins/CodexPlugin/openapi.yaml @@ -289,6 +289,7 @@ components: description: "Root hash of the content" originalBytes: type: integer + format: int64 description: "Length of original content in bytes" blockSize: type: integer @@ -303,14 +304,18 @@ components: totalBlocks: description: "Number of blocks stored by the node" type: integer + format: int64 quotaMaxBytes: type: integer + format: int64 description: "Maximum storage space used by the node" quotaUsedBytes: type: integer + format: int64 description: "Amount of storage space currently in use" quotaReservedBytes: type: integer + format: int64 description: "Amount of storage space reserved" servers: diff --git a/ProjectPlugins/MetricsPlugin/MetricsQuery.cs b/ProjectPlugins/MetricsPlugin/MetricsQuery.cs index 199015b..cdab697 100644 --- a/ProjectPlugins/MetricsPlugin/MetricsQuery.cs +++ b/ProjectPlugins/MetricsPlugin/MetricsQuery.cs @@ -1,4 +1,5 @@ using Core; +using IdentityModel; using KubernetesWorkflow.Types; using Logging; using 
System.Globalization; @@ -177,6 +178,41 @@ namespace MetricsPlugin { return "[" + string.Join(',', Sets.Select(s => s.ToString())) + "]"; } + + public string AsCsv() + { + var allTimestamps = Sets.SelectMany(s => s.Values.Select(v => v.Timestamp)).Distinct().OrderDescending().ToArray(); + + var lines = new List(); + MakeLine(lines, e => + { + e.Add("Metrics"); + foreach (var ts in allTimestamps) e.Add(ts.ToEpochTime().ToString()); + }); + + foreach (var set in Sets) + { + MakeLine(lines, e => + { + e.Add(set.Name); + foreach (var ts in allTimestamps) + { + var value = set.Values.SingleOrDefault(v => v.Timestamp == ts); + if (value == null) e.Add(" "); + else e.Add(value.Value.ToString()); + } + }); + } + + return string.Join(Environment.NewLine, lines.ToArray()); + } + + private void MakeLine(List lines, Action> values) + { + var list = new List(); + values(list); + lines.Add(string.Join(",", list)); + } } public class MetricsSet diff --git a/Tests/CodexContinuousTests/ContinuousTestRunner.cs b/Tests/CodexContinuousTests/ContinuousTestRunner.cs index b860c86..fde65aa 100644 --- a/Tests/CodexContinuousTests/ContinuousTestRunner.cs +++ b/Tests/CodexContinuousTests/ContinuousTestRunner.cs @@ -148,7 +148,7 @@ namespace ContinuousTests log.Log($"Clearing namespace '{test.CustomK8sNamespace}'..."); var entryPoint = entryPointFactory.CreateEntryPoint(config.KubeConfigFile, config.DataPath, test.CustomK8sNamespace, log); - entryPoint.Tools.CreateWorkflow().DeleteNamespacesStartingWith(test.CustomK8sNamespace); + entryPoint.Tools.CreateWorkflow().DeleteNamespacesStartingWith(test.CustomK8sNamespace, wait: true); } private void PerformCleanup(ILog log) @@ -157,7 +157,7 @@ namespace ContinuousTests log.Log("Cleaning up test namespace..."); var entryPoint = entryPointFactory.CreateEntryPoint(config.KubeConfigFile, config.DataPath, config.CodexDeployment.Metadata.KubeNamespace, log); - entryPoint.Decommission(deleteKubernetesResources: true, deleteTrackedFiles: true); + 
entryPoint.Decommission(deleteKubernetesResources: true, deleteTrackedFiles: true, waitTillDone: true); log.Log("Cleanup finished."); } } diff --git a/Tests/CodexContinuousTests/NodeRunner.cs b/Tests/CodexContinuousTests/NodeRunner.cs index 31f1f2e..e58facc 100644 --- a/Tests/CodexContinuousTests/NodeRunner.cs +++ b/Tests/CodexContinuousTests/NodeRunner.cs @@ -64,7 +64,7 @@ namespace ContinuousTests } finally { - entryPoint.Tools.CreateWorkflow().DeleteNamespace(); + entryPoint.Tools.CreateWorkflow().DeleteNamespace(wait: false); } } diff --git a/Tests/CodexContinuousTests/SingleTestRun.cs b/Tests/CodexContinuousTests/SingleTestRun.cs index 38bc502..95d0466 100644 --- a/Tests/CodexContinuousTests/SingleTestRun.cs +++ b/Tests/CodexContinuousTests/SingleTestRun.cs @@ -54,7 +54,8 @@ namespace ContinuousTests entryPoint.Decommission( deleteKubernetesResources: false, // This would delete the continuous test net. - deleteTrackedFiles: true + deleteTrackedFiles: true, + waitTillDone: false ); runFinishedHandle.Set(); } diff --git a/Tests/CodexLongTests/BasicTests/LargeFileTests.cs b/Tests/CodexLongTests/BasicTests/LargeFileTests.cs index d182ed8..0470e35 100644 --- a/Tests/CodexLongTests/BasicTests/LargeFileTests.cs +++ b/Tests/CodexLongTests/BasicTests/LargeFileTests.cs @@ -50,8 +50,6 @@ namespace CodexLongTests.BasicTests var node = StartCodex(s => s.WithStorageQuota((size + 10).MB())); - Time.SetRetryFailedCallback(i => OnFailed(i, node)); - var uploadStart = DateTime.UtcNow; var cid = node.UploadFile(expectedFile); var downloadStart = DateTime.UtcNow; @@ -62,17 +60,6 @@ namespace CodexLongTests.BasicTests AssertTimeConstraint(uploadStart, downloadStart, downloadFinished, size); } - private void OnFailed(int tries, ICodexNode node) - { - if (tries < 5) return; - - if (tries % 10 == 0) - { - Log($"After try {tries}, downloading node log."); - Ci.DownloadLog(node); - } - } - private void AssertTimeConstraint(DateTime uploadStart, DateTime downloadStart, DateTime 
downloadFinished, long size) { float sizeInMB = size; diff --git a/Tests/CodexLongTests/DownloadConnectivityTests/LongFullyConnectedDownloadTests.cs b/Tests/CodexLongTests/DownloadConnectivityTests/LongFullyConnectedDownloadTests.cs index 15fc9f3..eaa999e 100644 --- a/Tests/CodexLongTests/DownloadConnectivityTests/LongFullyConnectedDownloadTests.cs +++ b/Tests/CodexLongTests/DownloadConnectivityTests/LongFullyConnectedDownloadTests.cs @@ -15,9 +15,9 @@ namespace CodexLongTests.DownloadConnectivityTests [Values(10, 15, 20)] int numberOfNodes, [Values(10, 100)] int sizeMBs) { - for (var i = 0; i < numberOfNodes; i++) StartCodex(); + var nodes = StartCodex(numberOfNodes); - CreatePeerDownloadTestHelpers().AssertFullDownloadInterconnectivity(GetAllOnlineCodexNodes(), sizeMBs.MB()); + CreatePeerDownloadTestHelpers().AssertFullDownloadInterconnectivity(nodes, sizeMBs.MB()); } } } diff --git a/Tests/CodexLongTests/ScalabilityTests/ScalabilityTests.cs b/Tests/CodexLongTests/ScalabilityTests/ScalabilityTests.cs index b87be97..c3b05f7 100644 --- a/Tests/CodexLongTests/ScalabilityTests/ScalabilityTests.cs +++ b/Tests/CodexLongTests/ScalabilityTests/ScalabilityTests.cs @@ -17,12 +17,13 @@ public class ScalabilityTests : CodexDistTest [Combinatorial] [UseLongTimeouts] [DontDownloadLogs] + [WaitForCleanup] public void ShouldMaintainFileInNetwork( - [Values(10, 40, 80, 100)] int numberOfNodes, - [Values(100, 1000, 5000, 10000)] int fileSizeInMb + [Values(4, 5, 6)] int numberOfNodes, // TODO: include 10, 40, 80 and 100, not 5 + [Values(4000, 5000, 6000, 7000, 8000, 9000, 10000)] int fileSizeInMb ) { - var logLevel = CodexLogLevel.Info; + var logLevel = CodexLogLevel.Trace; var bootstrap = StartCodex(s => s.WithLogLevel(logLevel)); var nodes = StartCodex(numberOfNodes - 1, s => s @@ -35,17 +36,27 @@ public class ScalabilityTests : CodexDistTest var downloader = nodes.PickOneRandom(); var testFile = GenerateTestFile(fileSizeInMb.MB()); - var contentId = 
uploader.UploadFile(testFile); - var downloadedFile = downloader.DownloadContent(contentId); + + LogNodeStatus(uploader); + var contentId = uploader.UploadFile(testFile, f => LogNodeStatus(uploader)); + LogNodeStatus(uploader); + + LogNodeStatus(downloader); + var downloadedFile = downloader.DownloadContent(contentId, f => LogNodeStatus(downloader)); + LogNodeStatus(downloader); downloadedFile!.AssertIsEqual(testFile); + uploader.DeleteRepoFolder(); uploader.Stop(true); var otherDownloader = nodes.PickOneRandom(); downloadedFile = otherDownloader.DownloadContent(contentId); downloadedFile!.AssertIsEqual(testFile); + + downloader.DeleteRepoFolder(); + otherDownloader.DeleteRepoFolder(); } /// @@ -57,6 +68,7 @@ public class ScalabilityTests : CodexDistTest [Combinatorial] [UseLongTimeouts] [DontDownloadLogs] + [WaitForCleanup] public void EveryoneGetsAFile( [Values(10, 40, 80, 100)] int numberOfNodes, [Values(100, 1000, 5000, 10000)] int fileSizeInMb diff --git a/Tests/CodexTests/BasicTests/ExampleTests.cs b/Tests/CodexTests/BasicTests/ExampleTests.cs index 0356298..f9d5b89 100644 --- a/Tests/CodexTests/BasicTests/ExampleTests.cs +++ b/Tests/CodexTests/BasicTests/ExampleTests.cs @@ -45,6 +45,9 @@ namespace CodexTests.BasicTests metrics[0].AssertThat("libp2p_peers", Is.EqualTo(1)); metrics[1].AssertThat("libp2p_peers", Is.EqualTo(1)); + + LogNodeStatus(primary, metrics[0]); + LogNodeStatus(primary2, metrics[1]); } [Test] diff --git a/Tests/CodexTests/BasicTests/MarketplaceTests.cs b/Tests/CodexTests/BasicTests/MarketplaceTests.cs index bd3ce41..3f45469 100644 --- a/Tests/CodexTests/BasicTests/MarketplaceTests.cs +++ b/Tests/CodexTests/BasicTests/MarketplaceTests.cs @@ -85,6 +85,46 @@ namespace CodexTests.BasicTests Assert.That(contracts.GetRequestState(request), Is.EqualTo(RequestState.Finished)); } + [Test] + public void CanDownloadContentFromContractCid() + { + var fileSize = 10.MB(); + var geth = Ci.StartGethNode(s => s.IsMiner().WithName("disttest-geth")); + var 
contracts = Ci.StartCodexContracts(geth); + var testFile = GenerateTestFile(fileSize); + + var client = StartCodex(s => s + .WithName("Client") + .EnableMarketplace(geth, contracts, m => m + .WithInitial(10.Eth(), 10.Tst()))); + + var uploadCid = client.UploadFile(testFile); + + var purchase = new StoragePurchaseRequest(uploadCid) + { + PricePerSlotPerSecond = 2.TstWei(), + RequiredCollateral = 10.TstWei(), + MinRequiredNumberOfNodes = 5, + NodeFailureTolerance = 2, + ProofProbability = 5, + Duration = TimeSpan.FromMinutes(5), + Expiry = TimeSpan.FromMinutes(4) + }; + + var purchaseContract = client.Marketplace.RequestStorage(purchase); + var contractCid = purchaseContract.ContentId; + Assert.That(uploadCid.Id, Is.Not.EqualTo(contractCid.Id)); + + // Download both from client. + testFile.AssertIsEqual(client.DownloadContent(uploadCid)); + testFile.AssertIsEqual(client.DownloadContent(contractCid)); + + // Download both from another node. + var downloader = StartCodex(s => s.WithName("Downloader")); + testFile.AssertIsEqual(downloader.DownloadContent(uploadCid)); + testFile.AssertIsEqual(downloader.DownloadContent(contractCid)); + } + private void WaitForAllSlotFilledEvents(ICodexContracts contracts, StoragePurchaseRequest purchase, IGethNode geth) { Time.Retry(() => @@ -92,9 +132,9 @@ namespace CodexTests.BasicTests var events = contracts.GetEvents(GetTestRunTimeRange()); var slotFilledEvents = events.GetSlotFilledEvents(); - Debug($"SlotFilledEvents: {slotFilledEvents.Length} - NumSlots: {purchase.MinRequiredNumberOfNodes}"); - - if (slotFilledEvents.Length != purchase.MinRequiredNumberOfNodes) throw new Exception(); + var msg = $"SlotFilledEvents: {slotFilledEvents.Length} - NumSlots: {purchase.MinRequiredNumberOfNodes}"; + Debug(msg); + if (slotFilledEvents.Length != purchase.MinRequiredNumberOfNodes) throw new Exception(msg); }, purchase.Expiry + TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(5), "Checking SlotFilled events"); } diff --git 
a/Tests/CodexTests/BasicTests/OneClientTests.cs b/Tests/CodexTests/BasicTests/OneClientTests.cs index 11e8125..451b1cd 100644 --- a/Tests/CodexTests/BasicTests/OneClientTests.cs +++ b/Tests/CodexTests/BasicTests/OneClientTests.cs @@ -13,6 +13,8 @@ namespace CodexTests.BasicTests var primary = StartCodex(); PerformOneClientTest(primary); + + LogNodeStatus(primary); } [Test] diff --git a/Tests/CodexTests/CodexDistTest.cs b/Tests/CodexTests/CodexDistTest.cs index 7058b8e..e62627f 100644 --- a/Tests/CodexTests/CodexDistTest.cs +++ b/Tests/CodexTests/CodexDistTest.cs @@ -6,14 +6,14 @@ using Core; using DistTestCore; using DistTestCore.Helpers; using DistTestCore.Logs; +using MetricsPlugin; +using Newtonsoft.Json; using NUnit.Framework.Constraints; namespace CodexTests { public class CodexDistTest : DistTest { - private readonly Dictionary> onlineCodexNodes = new Dictionary>(); - public CodexDistTest() { ProjectPlugin.Load(); @@ -29,16 +29,6 @@ namespace CodexTests localBuilder.Build(); } - protected override void LifecycleStart(TestLifecycle lifecycle) - { - onlineCodexNodes.Add(lifecycle, new List()); - } - - protected override void LifecycleStop(TestLifecycle lifecycle) - { - onlineCodexNodes.Remove(lifecycle); - } - public ICodexNode StartCodex() { return StartCodex(s => { }); @@ -61,7 +51,7 @@ namespace CodexTests setup(s); OnCodexSetup(s); }); - onlineCodexNodes[Get()].AddRange(group); + return group; } @@ -75,11 +65,6 @@ namespace CodexTests return new PeerDownloadTestHelpers(GetTestLog(), Get().GetFileManager()); } - public IEnumerable GetAllOnlineCodexNodes() - { - return onlineCodexNodes[Get()]; - } - public void AssertBalance(ICodexContracts contracts, ICodexNode codexNode, Constraint constraint, string msg = "") { AssertHelpers.RetryAssert(constraint, () => contracts.GetTestTokenBalance(codexNode), nameof(AssertBalance) + msg); @@ -99,17 +84,29 @@ namespace CodexTests log.AssertLogDoesNotContain("ERR "); } + public void LogNodeStatus(ICodexNode node, 
IMetricsAccess? metrics = null) + { + Log("Status for " + node.GetName() + Environment.NewLine + + GetBasicNodeStatus(node)); + } + + private string GetBasicNodeStatus(ICodexNode node) + { + return JsonConvert.SerializeObject(node.GetDebugInfo(), Formatting.Indented) + Environment.NewLine + + node.Space().ToString() + Environment.NewLine; + } + + // Disabled for now: Makes huge log files! + //private string GetNodeMetrics(IMetricsAccess? metrics) + //{ + // if (metrics == null) return "No metrics enabled"; + // var m = metrics.GetAllMetrics(); + // if (m == null) return "No metrics received"; + // return m.AsCsv(); + //} + protected virtual void OnCodexSetup(ICodexSetup setup) { } - - protected override void CollectStatusLogData(TestLifecycle lifecycle, Dictionary data) - { - var nodes = onlineCodexNodes[lifecycle]; - var upload = nodes.Select(n => n.TransferSpeeds.GetUploadSpeed()).ToList()!.OptionalAverage(); - var download = nodes.Select(n => n.TransferSpeeds.GetDownloadSpeed()).ToList()!.OptionalAverage(); - if (upload != null) data.Add("avgupload", upload.ToString()); - if (download != null) data.Add("avgdownload", download.ToString()); - } } } diff --git a/Tests/CodexTests/DownloadConnectivityTests/FullyConnectedDownloadTests.cs b/Tests/CodexTests/DownloadConnectivityTests/FullyConnectedDownloadTests.cs index e40d9fb..e6a42c5 100644 --- a/Tests/CodexTests/DownloadConnectivityTests/FullyConnectedDownloadTests.cs +++ b/Tests/CodexTests/DownloadConnectivityTests/FullyConnectedDownloadTests.cs @@ -1,4 +1,5 @@ using CodexContractsPlugin; +using CodexPlugin; using GethPlugin; using NUnit.Framework; using Utils; @@ -11,9 +12,9 @@ namespace CodexTests.DownloadConnectivityTests [Test] public void MetricsDoesNotInterfereWithPeerDownload() { - StartCodex(2, s => s.EnableMetrics()); + var nodes = StartCodex(2, s => s.EnableMetrics()); - AssertAllNodesConnected(); + AssertAllNodesConnected(nodes); } [Test] @@ -21,10 +22,10 @@ namespace CodexTests.DownloadConnectivityTests 
{ var geth = Ci.StartGethNode(s => s.IsMiner()); var contracts = Ci.StartCodexContracts(geth); - StartCodex(2, s => s.EnableMarketplace(geth, contracts, m => m + var nodes = StartCodex(2, s => s.EnableMarketplace(geth, contracts, m => m .WithInitial(10.Eth(), 1000.TstWei()))); - AssertAllNodesConnected(); + AssertAllNodesConnected(nodes); } [Test] @@ -33,14 +34,14 @@ namespace CodexTests.DownloadConnectivityTests [Values(2, 5)] int numberOfNodes, [Values(1, 10)] int sizeMBs) { - StartCodex(numberOfNodes); + var nodes = StartCodex(numberOfNodes); - AssertAllNodesConnected(sizeMBs); + AssertAllNodesConnected(nodes, sizeMBs); } - private void AssertAllNodesConnected(int sizeMBs = 10) + private void AssertAllNodesConnected(IEnumerable nodes, int sizeMBs = 10) { - CreatePeerDownloadTestHelpers().AssertFullDownloadInterconnectivity(GetAllOnlineCodexNodes(), sizeMBs.MB()); + CreatePeerDownloadTestHelpers().AssertFullDownloadInterconnectivity(nodes, sizeMBs.MB()); } } } diff --git a/Tests/CodexTests/PeerDiscoveryTests/LayeredDiscoveryTests.cs b/Tests/CodexTests/PeerDiscoveryTests/LayeredDiscoveryTests.cs index 9c884fe..33225d6 100644 --- a/Tests/CodexTests/PeerDiscoveryTests/LayeredDiscoveryTests.cs +++ b/Tests/CodexTests/PeerDiscoveryTests/LayeredDiscoveryTests.cs @@ -1,4 +1,5 @@ -using NUnit.Framework; +using CodexPlugin; +using NUnit.Framework; namespace CodexTests.PeerDiscoveryTests { @@ -13,7 +14,7 @@ namespace CodexTests.PeerDiscoveryTests var l1Node = StartCodex(s => s.WithBootstrapNode(root)); var l2Target = StartCodex(s => s.WithBootstrapNode(l1Node)); - AssertAllNodesConnected(); + AssertAllNodesConnected(root, l1Source, l1Node, l2Target); } [Test] @@ -25,7 +26,7 @@ namespace CodexTests.PeerDiscoveryTests var l2Node = StartCodex(s => s.WithBootstrapNode(l1Node)); var l3Target = StartCodex(s => s.WithBootstrapNode(l2Node)); - AssertAllNodesConnected(); + AssertAllNodesConnected(root, l1Source, l1Node, l2Node, l3Target); } [TestCase(3)] @@ -33,18 +34,22 @@ 
namespace CodexTests.PeerDiscoveryTests [TestCase(10)] public void NodeChainTest(int chainLength) { + var nodes = new List(); var node = StartCodex(); + nodes.Add(node); + for (var i = 1; i < chainLength; i++) { node = StartCodex(s => s.WithBootstrapNode(node)); + nodes.Add(node); } - AssertAllNodesConnected(); + AssertAllNodesConnected(nodes.ToArray()); } - private void AssertAllNodesConnected() + private void AssertAllNodesConnected(params ICodexNode[] nodes) { - CreatePeerConnectionTestHelpers().AssertFullyConnected(GetAllOnlineCodexNodes()); + CreatePeerConnectionTestHelpers().AssertFullyConnected(nodes); } } } diff --git a/Tests/CodexTests/PeerDiscoveryTests/PeerDiscoveryTests.cs b/Tests/CodexTests/PeerDiscoveryTests/PeerDiscoveryTests.cs index 34da8d4..52619dd 100644 --- a/Tests/CodexTests/PeerDiscoveryTests/PeerDiscoveryTests.cs +++ b/Tests/CodexTests/PeerDiscoveryTests/PeerDiscoveryTests.cs @@ -21,9 +21,9 @@ namespace CodexTests.PeerDiscoveryTests [Test] public void MetricsDoesNotInterfereWithPeerDiscovery() { - StartCodex(2, s => s.EnableMetrics()); + var nodes = StartCodex(2, s => s.EnableMetrics()); - AssertAllNodesConnected(); + AssertAllNodesConnected(nodes); } [Test] @@ -31,10 +31,10 @@ namespace CodexTests.PeerDiscoveryTests { var geth = Ci.StartGethNode(s => s.IsMiner()); var contracts = Ci.StartCodexContracts(geth); - StartCodex(2, s => s.EnableMarketplace(geth, contracts, m => m + var nodes = StartCodex(2, s => s.EnableMarketplace(geth, contracts, m => m .WithInitial(10.Eth(), 1000.TstWei()))); - AssertAllNodesConnected(); + AssertAllNodesConnected(nodes); } [TestCase(2)] @@ -42,16 +42,15 @@ namespace CodexTests.PeerDiscoveryTests [TestCase(10)] public void VariableNodes(int number) { - StartCodex(number); + var nodes = StartCodex(number); - AssertAllNodesConnected(); + AssertAllNodesConnected(nodes); } - private void AssertAllNodesConnected() + private void AssertAllNodesConnected(IEnumerable nodes) { - var allNodes = GetAllOnlineCodexNodes(); - 
CreatePeerConnectionTestHelpers().AssertFullyConnected(allNodes); - CheckRoutingTable(allNodes); + CreatePeerConnectionTestHelpers().AssertFullyConnected(nodes); + CheckRoutingTable(nodes); } private void CheckRoutingTable(IEnumerable allNodes) diff --git a/Tests/DistTestCore/DistTest.cs b/Tests/DistTestCore/DistTest.cs index ed99fe9..7b1d09e 100644 --- a/Tests/DistTestCore/DistTest.cs +++ b/Tests/DistTestCore/DistTest.cs @@ -52,7 +52,7 @@ namespace DistTestCore { Stopwatch.Measure(fixtureLog, "Global setup", () => { - globalEntryPoint.Tools.CreateWorkflow().DeleteNamespacesStartingWith(TestNamespacePrefix); + globalEntryPoint.Tools.CreateWorkflow().DeleteNamespacesStartingWith(TestNamespacePrefix, wait: true); }); } catch (Exception ex) @@ -72,7 +72,8 @@ namespace DistTestCore globalEntryPoint.Decommission( // There shouldn't be any of either, but clean everything up regardless. deleteKubernetesResources: true, - deleteTrackedFiles: true + deleteTrackedFiles: true, + waitTillDone: true ); } @@ -185,7 +186,13 @@ namespace DistTestCore lock (lifecycleLock) { var testNamespace = TestNamespacePrefix + Guid.NewGuid().ToString(); - var lifecycle = new TestLifecycle(fixtureLog.CreateTestLog(), configuration, GetTimeSet(), testNamespace, deployId); + var lifecycle = new TestLifecycle( + fixtureLog.CreateTestLog(), + configuration, + GetTimeSet(), + testNamespace, + deployId, + ShouldWaitForCleanup()); lifecycles.Add(testName, lifecycle); LifecycleStart(lifecycle); } @@ -208,7 +215,7 @@ namespace DistTestCore IncludeLogsOnTestFailure(lifecycle); LifecycleStop(lifecycle); lifecycle.DeleteAllResources(); - lifecycle = null!; + lifecycles.Remove(GetCurrentTestName()); }); } @@ -235,6 +242,11 @@ namespace DistTestCore return new DefaultTimeSet(); } + private bool ShouldWaitForCleanup() + { + return CurrentTestMethodHasAttribute(); + } + private bool ShouldUseLongTimeouts() { return CurrentTestMethodHasAttribute(); diff --git a/Tests/DistTestCore/TestLifecycle.cs 
b/Tests/DistTestCore/TestLifecycle.cs index 542ca27..4191cd8 100644 --- a/Tests/DistTestCore/TestLifecycle.cs +++ b/Tests/DistTestCore/TestLifecycle.cs @@ -16,7 +16,7 @@ namespace DistTestCore private readonly List runningContainers = new(); private readonly string deployId; - public TestLifecycle(TestLog log, Configuration configuration, ITimeSet timeSet, string testNamespace, string deployId) + public TestLifecycle(TestLog log, Configuration configuration, ITimeSet timeSet, string testNamespace, string deployId, bool waitForCleanup) { Log = log; Configuration = configuration; @@ -27,7 +27,7 @@ namespace DistTestCore metadata = entryPoint.GetPluginMetadata(); CoreInterface = entryPoint.CreateInterface(); this.deployId = deployId; - + WaitForCleanup = waitForCleanup; log.WriteLogTag(); } @@ -35,13 +35,15 @@ namespace DistTestCore public TestLog Log { get; } public Configuration Configuration { get; } public ITimeSet TimeSet { get; } + public bool WaitForCleanup { get; } public CoreInterface CoreInterface { get; } public void DeleteAllResources() { entryPoint.Decommission( deleteKubernetesResources: true, - deleteTrackedFiles: true + deleteTrackedFiles: true, + waitTillDone: WaitForCleanup ); } diff --git a/Tests/DistTestCore/WaitForCleanupAttribute.cs b/Tests/DistTestCore/WaitForCleanupAttribute.cs new file mode 100644 index 0000000..928e3c2 --- /dev/null +++ b/Tests/DistTestCore/WaitForCleanupAttribute.cs @@ -0,0 +1,15 @@ +using NUnit.Framework; + +namespace DistTestCore +{ + /// + /// By default, test system does not wait until all resources are destroyed before starting the + /// next test. This saves a lot of time but it's not always what you want. + /// If you want to be sure the resources of your test are destroyed before the next test starts, + /// add this attribute to your test method. + /// + [AttributeUsage(AttributeTargets.Method, AllowMultiple = false)] + public class WaitForCleanupAttribute : PropertyAttribute + { + } +}