diff --git a/Framework/Core/Http.cs b/Framework/Core/Http.cs index 49b3b09..44cb3c6 100644 --- a/Framework/Core/Http.cs +++ b/Framework/Core/Http.cs @@ -7,6 +7,7 @@ namespace Core { T OnClient(Func action); T OnClient(Func action, string description); + T OnClient(Func action, Retry retry); IEndpoint CreateEndpoint(Address address, string baseUrl, string? logAlias = null); } @@ -35,13 +36,19 @@ namespace Core } public T OnClient(Func action, string description) + { + var retry = new Retry(description, timeSet.HttpRetryTimeout(), timeSet.HttpCallRetryDelay(), f => { }); + return OnClient(action, retry); + } + + public T OnClient(Func action, Retry retry) { var client = GetClient(); return LockRetry(() => { return action(client); - }, description); + }, retry); } public IEndpoint CreateEndpoint(Address address, string baseUrl, string? logAlias = null) @@ -54,11 +61,11 @@ namespace Core return DebugStack.GetCallerName(skipFrames: 2); } - private T LockRetry(Func operation, string description) + private T LockRetry(Func operation, Retry retry) { lock (httpLock) { - return Time.Retry(operation, timeSet.HttpRetryTimeout(), timeSet.HttpCallRetryDelay(), description); + return retry.Run(operation); } } diff --git a/Framework/Core/PluginTools.cs b/Framework/Core/PluginTools.cs index 5e1faee..78f7814 100644 --- a/Framework/Core/PluginTools.cs +++ b/Framework/Core/PluginTools.cs @@ -6,6 +6,7 @@ namespace Core { public interface IPluginTools : IWorkflowTool, ILogTool, IHttpFactoryTool, IFileTool { + ITimeSet TimeSet { get; } void Decommission(bool deleteKubernetesResources, bool deleteTrackedFiles); } @@ -33,7 +34,6 @@ namespace Core internal class PluginTools : IPluginTools { - private readonly ITimeSet timeSet; private readonly WorkflowCreator workflowCreator; private readonly IFileManager fileManager; private readonly LogPrefixer log; @@ -42,10 +42,12 @@ namespace Core { this.log = new LogPrefixer(log); this.workflowCreator = workflowCreator; - this.timeSet = timeSet; + TimeSet = timeSet; fileManager = new FileManager(log, fileManagerRootFolder); } + public ITimeSet TimeSet { get; } + public void ApplyLogPrefix(string prefix) { log.Prefix = prefix; @@ -53,7 +55,7 @@ namespace Core public IHttp CreateHttp(Action onClientCreated) { - return CreateHttp(onClientCreated, timeSet); + return CreateHttp(onClientCreated, TimeSet); } public IHttp CreateHttp(Action onClientCreated, ITimeSet ts) @@ -63,7 +65,7 @@ namespace Core public IHttp CreateHttp() { - return new Http(log, timeSet); + return new Http(log, TimeSet); } public IStartupWorkflow CreateWorkflow(string? namespaceOverride = null) diff --git a/Framework/Utils/Retry.cs b/Framework/Utils/Retry.cs index 45790e8..ec05ea6 100644 --- a/Framework/Utils/Retry.cs +++ b/Framework/Utils/Retry.cs @@ -1,37 +1,44 @@ - -namespace Utils +namespace Utils { - public class Retry + public class Retry { private readonly string description; - private readonly Func task; private readonly TimeSpan maxTimeout; - private readonly int maxRetries; private readonly TimeSpan sleepAfterFail; private readonly Action onFail; - public Retry(string description, Func task, TimeSpan maxTimeout, int maxRetries, TimeSpan sleepAfterFail, Action onFail) + public Retry(string description, TimeSpan maxTimeout, TimeSpan sleepAfterFail, Action onFail) { this.description = description; - this.task = task; this.maxTimeout = maxTimeout; - this.maxRetries = maxRetries; this.sleepAfterFail = sleepAfterFail; this.onFail = onFail; } - public T Run() + public void Run(Action task) { - var run = new RetryRun(description, task, maxTimeout, maxRetries, sleepAfterFail, onFail); - return run.Run(); + var run = new RetryRun(description, task, maxTimeout, sleepAfterFail, onFail); + run.Run(); + } + + public T Run(Func task) + { + T? result = default; + + var run = new RetryRun(description, () => + { + result = task(); + }, maxTimeout, sleepAfterFail, onFail); + run.Run(); + + return result!; } private class RetryRun { private readonly string description; - private readonly Func task; + private readonly Action task; private readonly TimeSpan maxTimeout; - private readonly int maxRetries; private readonly TimeSpan sleepAfterFail; private readonly Action onFail; private readonly DateTime start = DateTime.UtcNow; @@ -39,12 +46,11 @@ namespace Utils private int tryNumber; private DateTime tryStart; - public RetryRun(string description, Func task, TimeSpan maxTimeout, int maxRetries, TimeSpan sleepAfterFail, Action onFail) + public RetryRun(string description, Action task, TimeSpan maxTimeout, TimeSpan sleepAfterFail, Action onFail) { this.description = description; this.task = task; this.maxTimeout = maxTimeout; - this.maxRetries = maxRetries; this.sleepAfterFail = sleepAfterFail; this.onFail = onFail; @@ -52,7 +58,7 @@ namespace Utils tryStart = DateTime.UtcNow; } - public T Run() + public void Run() { while (true) { @@ -62,7 +68,8 @@ namespace Utils tryStart = DateTime.UtcNow; try { - return task(); + task(); + return; } catch (Exception ex) { @@ -83,7 +90,6 @@ namespace Utils private void CheckMaximums() { if (Duration() > maxTimeout) Fail(); - if (tryNumber > maxRetries) Fail(); } private void Fail() diff --git a/Framework/Utils/Time.cs b/Framework/Utils/Time.cs index 10caaf4..f22242f 100644 --- a/Framework/Utils/Time.cs +++ b/Framework/Utils/Time.cs @@ -111,78 +111,24 @@ namespace Utils public static void Retry(Action action, TimeSpan maxTimeout, TimeSpan retryTime, string description) { - var start = DateTime.UtcNow; - var tries = 1; - var tryInfo = new List<(Exception, TimeSpan)>(); - - while (true) - { - var duration = DateTime.UtcNow - start; - if (duration > maxTimeout) - { - var info = FormatTryInfos(tryInfo); - throw new TimeoutException($"Retry '{description}' timed out after {tries} tries over {FormatDuration(duration)}.{Environment.NewLine}{info}"); - } - - var sw = Stopwatch.StartNew(); - try - { - action(); - return; - } - catch (Exception ex) - { - tryInfo.Add((ex, sw.Elapsed)); - tries++; - } - - Sleep(retryTime); - } - } - - private static string FormatTryInfos(List<(Exception, TimeSpan)> tryInfo) - { - return string.Join(Environment.NewLine, tryInfo.Select(FormatTryInfo).ToArray()); - } - - private static string FormatTryInfo((Exception, TimeSpan) info, int index) - { - return $"Attempt {index} took {FormatDuration(info.Item2)} and failed with exception {info.Item1}."; - } - - private static Action failedCallback = i => { }; - public static void SetRetryFailedCallback(Action onRetryFailed) - { - failedCallback = onRetryFailed; + Retry(action, maxTimeout, retryTime, description, f => { }); } public static T Retry(Func action, TimeSpan maxTimeout, TimeSpan retryTime, string description) { - var start = DateTime.UtcNow; - var tries = 1; - var exceptions = new List(); + return Retry(action, maxTimeout, retryTime, description, f => { }); + } - while (true) - { - var duration = DateTime.UtcNow - start; - if (duration > maxTimeout) - { - throw new TimeoutException($"Retry '{description}' timed out after {tries} tries over {FormatDuration(duration)}.", new AggregateException(exceptions)); - } + public static void Retry(Action action, TimeSpan maxTimeout, TimeSpan retryTime, string description, Action onFail) + { + var r = new Retry(description, maxTimeout, retryTime, onFail); + r.Run(action); + } - try - { - return action(); - } - catch (Exception ex) - { - exceptions.Add(ex); - failedCallback(tries); - tries++; - } - - Sleep(retryTime); - } + public static T Retry(Func action, TimeSpan maxTimeout, TimeSpan retryTime, string description, Action onFail) + { + var r = new Retry(description, maxTimeout, retryTime, onFail); + return r.Run(action); } } } diff --git a/ProjectPlugins/CodexPlugin/CodexAccess.cs b/ProjectPlugins/CodexPlugin/CodexAccess.cs index 121ccf9..78a3894 100644 --- a/ProjectPlugins/CodexPlugin/CodexAccess.cs +++ b/ProjectPlugins/CodexPlugin/CodexAccess.cs @@ -2,6 +2,7 @@ using Core; using KubernetesWorkflow; using KubernetesWorkflow.Types; +using Logging; using Newtonsoft.Json; using Utils; @@ -61,12 +62,17 @@ namespace CodexPlugin public string UploadFile(FileStream fileStream) { - return OnCodex(api => api.UploadAsync(fileStream)); + return OnCodex( + api => api.UploadAsync(fileStream), + CreateRetryConfig(nameof(UploadFile))); } public Stream DownloadFile(string contentId) { - var fileResponse = OnCodex(api => api.DownloadNetworkAsync(contentId)); + var fileResponse = OnCodex( + api => api.DownloadNetworkAsync(contentId), + CreateRetryConfig(nameof(DownloadFile))); + if (fileResponse.StatusCode != 200) throw new Exception("Download failed with StatusCode: " + fileResponse.StatusCode); return fileResponse.Stream; } @@ -89,6 +95,12 @@ namespace CodexPlugin return OnCodex(api => api.CreateStorageRequestAsync(request.ContentId.Id, body)); } + public CodexSpace Space() + { + var space = OnCodex(api => api.SpaceAsync()); + return mapper.Map(space); + } + public StoragePurchase GetPurchaseStatus(string purchaseId) { var endpoint = GetEndpoint(); @@ -116,17 +128,24 @@ namespace CodexPlugin private T OnCodex(Func> action) { - var address = GetAddress(); - var result = tools.CreateHttp(CheckContainerCrashed) - .OnClient(client => - { - var api = new CodexApi(client); - api.BaseUrl = $"{address.Host}:{address.Port}/api/codex/v1"; - return Time.Wait(action(api)); - }); + var result = tools.CreateHttp(CheckContainerCrashed).OnClient(client => CallCodex(client, action)); return result; } + private T OnCodex(Func> action, Retry retry) + { + var result = tools.CreateHttp(CheckContainerCrashed).OnClient(client => CallCodex(client, action), retry); + return result; + } + + private T CallCodex(HttpClient client, Func> action) + { + var address = GetAddress(); + var api = new CodexApi(client); + api.BaseUrl = $"{address.Host}:{address.Port}/api/codex/v1"; + return Time.Wait(action(api)); + } + private IEndpoint GetEndpoint() { return tools @@ -144,7 +163,7 @@ namespace CodexPlugin if (hasContainerCrashed) throw new Exception($"Container {GetName()} has crashed."); } - public void Log(Stream crashLog) + void ILogHandler.Log(Stream crashLog) { var log = tools.GetLog(); var file = log.CreateSubfile(); @@ -162,5 +181,80 @@ namespace CodexPlugin log.Log("Crash log successfully downloaded."); hasContainerCrashed = true; } + + private Retry CreateRetryConfig(string description) + { + var timeSet = tools.TimeSet; + var log = tools.GetLog(); + + return new Retry(description, timeSet.HttpRetryTimeout(), timeSet.HttpCallRetryDelay(), failure => + { + if (failure.TryNumber < 3) return; + if (failure.Duration.TotalSeconds < timeSet.HttpCallTimeout().TotalSeconds) + { + Investigate(log, failure, timeSet); + } + }); + } + + private void Investigate(ILog log, Failure failure, ITimeSet timeSet) + { + log.Log($"Retry {failure.TryNumber} took {Time.FormatDuration(failure.Duration)}. (HTTP timeout = {Time.FormatDuration(timeSet.HttpCallTimeout())}) " + + $"Checking if node responds to debug/info..."); + try + { + var debugInfo = GetDebugInfo(); + if (string.IsNullOrEmpty(debugInfo.Spr)) + { + log.Log("Did not get value debug/info response."); + DownloadLog(); + Throw(failure); + } + else + { + log.Log("Got valid response from debug/info. Checking storage statistics..."); + CheckSpaceStatistics(log, failure); + } + } + catch (Exception ex) + { + log.Log("Got exception from debug/info call: " + ex); + DownloadLog(); + Throw(failure); + } + } + + private void CheckSpaceStatistics(ILog log, Failure failure) + { + try + { + var space = Space(); + log.Log($"Got space statistics: {space}"); + var freeSpace = space.QuotaMaxBytes - (space.QuotaUsedBytes + space.QuotaReservedBytes); + log.Log($"Free space: {freeSpace}"); + + if (freeSpace < 1.MB().SizeInBytes) + { + log.Log("There's less than 1MB free. Stopping..."); + Throw(failure); + } + } + catch (Exception e) + { + log.Log("Failed to get space statistics: " + e); + DownloadLog(); + Throw(failure); + } + } + + private void Throw(Failure failure) + { + throw failure.Exception; + } + + private void DownloadLog() + { + tools.CreateWorkflow().DownloadContainerLog(Container.Containers.Single(), this); + } } } diff --git a/ProjectPlugins/CodexPlugin/CodexNode.cs b/ProjectPlugins/CodexPlugin/CodexNode.cs index 034fe63..dc8531c 100644 --- a/ProjectPlugins/CodexPlugin/CodexNode.cs +++ b/ProjectPlugins/CodexPlugin/CodexNode.cs @@ -17,6 +17,7 @@ namespace CodexPlugin ContentId UploadFile(TrackedFile file); TrackedFile? DownloadContent(ContentId contentId, string fileLabel = ""); LocalDatasetList LocalFiles(); + CodexSpace Space(); void ConnectToPeer(ICodexNode node); DebugInfoVersion Version { get; } IMarketplaceAccess Marketplace { get; } @@ -126,6 +127,11 @@ namespace CodexPlugin return CodexAccess.LocalFiles(); } + public CodexSpace Space() + { + return CodexAccess.Space(); + } + public void ConnectToPeer(ICodexNode node) { var peer = (CodexNode)node; diff --git a/ProjectPlugins/CodexPlugin/CodexTypes.cs b/ProjectPlugins/CodexPlugin/CodexTypes.cs index 945251e..b82aa78 100644 --- a/ProjectPlugins/CodexPlugin/CodexTypes.cs +++ b/ProjectPlugins/CodexPlugin/CodexTypes.cs @@ -105,4 +105,17 @@ namespace CodexPlugin return HashCode.Combine(Id); } } + + public class CodexSpace + { + public int TotalBlocks { get; set; } + public int QuotaMaxBytes { get; set; } + public int QuotaUsedBytes { get; set; } + public int QuotaReservedBytes { get; set; } + + public override string ToString() + { + return JsonConvert.SerializeObject(this); + } + } } diff --git a/ProjectPlugins/CodexPlugin/Mapper.cs b/ProjectPlugins/CodexPlugin/Mapper.cs index 61c95b4..0c695e0 100644 --- a/ProjectPlugins/CodexPlugin/Mapper.cs +++ b/ProjectPlugins/CodexPlugin/Mapper.cs @@ -1,4 +1,5 @@ using CodexContractsPlugin; +using CodexOpenApi; using Newtonsoft.Json.Linq; using System.Numerics; using Utils; @@ -84,6 +85,17 @@ namespace CodexPlugin }; } + public CodexSpace Map(Space space) + { + return new CodexSpace + { + QuotaMaxBytes = space.QuotaMaxBytes, + QuotaReservedBytes = space.QuotaReservedBytes, + QuotaUsedBytes = space.QuotaUsedBytes, + TotalBlocks = space.TotalBlocks + }; + } + private DebugInfoVersion MapDebugInfoVersion(JObject obj) { return new DebugInfoVersion diff --git a/Tests/CodexLongTests/BasicTests/LargeFileTests.cs b/Tests/CodexLongTests/BasicTests/LargeFileTests.cs index d182ed8..0470e35 100644 --- a/Tests/CodexLongTests/BasicTests/LargeFileTests.cs +++ b/Tests/CodexLongTests/BasicTests/LargeFileTests.cs @@ -50,8 +50,6 @@ namespace CodexLongTests.BasicTests var node = StartCodex(s => s.WithStorageQuota((size + 10).MB())); - Time.SetRetryFailedCallback(i => OnFailed(i, node)); - var uploadStart = DateTime.UtcNow; var cid = node.UploadFile(expectedFile); var downloadStart = DateTime.UtcNow; @@ -62,17 +60,6 @@ namespace CodexLongTests.BasicTests AssertTimeConstraint(uploadStart, downloadStart, downloadFinished, size); } - private void OnFailed(int tries, ICodexNode node) - { - if (tries < 5) return; - - if (tries % 10 == 0) - { - Log($"After try {tries}, downloading node log."); - Ci.DownloadLog(node); - } - } - private void AssertTimeConstraint(DateTime uploadStart, DateTime downloadStart, DateTime downloadFinished, long size) { float sizeInMB = size;