Upload/download failure now automatically investigates node

This commit is contained in:
Ben 2024-06-05 09:20:00 +02:00
parent 11b866986d
commit e9555cf99e
No known key found for this signature in database
GPG Key ID: 541B9D8C9F1426A1
9 changed files with 188 additions and 115 deletions

View File

@ -7,6 +7,7 @@ namespace Core
{
T OnClient<T>(Func<HttpClient, T> action);
T OnClient<T>(Func<HttpClient, T> action, string description);
T OnClient<T>(Func<HttpClient, T> action, Retry retry);
IEndpoint CreateEndpoint(Address address, string baseUrl, string? logAlias = null);
}
@ -35,13 +36,19 @@ namespace Core
}
public T OnClient<T>(Func<HttpClient, T> action, string description)
{
var retry = new Retry(description, timeSet.HttpRetryTimeout(), timeSet.HttpCallRetryDelay(), f => { });
return OnClient(action, retry);
}
public T OnClient<T>(Func<HttpClient, T> action, Retry retry)
{
var client = GetClient();
return LockRetry(() =>
{
return action(client);
}, description);
}, retry);
}
public IEndpoint CreateEndpoint(Address address, string baseUrl, string? logAlias = null)
@ -54,11 +61,11 @@ namespace Core
return DebugStack.GetCallerName(skipFrames: 2);
}
private T LockRetry<T>(Func<T> operation, string description)
private T LockRetry<T>(Func<T> operation, Retry retry)
{
lock (httpLock)
{
return Time.Retry(operation, timeSet.HttpRetryTimeout(), timeSet.HttpCallRetryDelay(), description);
return retry.Run(operation);
}
}

View File

@ -6,6 +6,7 @@ namespace Core
{
public interface IPluginTools : IWorkflowTool, ILogTool, IHttpFactoryTool, IFileTool
{
ITimeSet TimeSet { get; }
void Decommission(bool deleteKubernetesResources, bool deleteTrackedFiles);
}
@ -33,7 +34,6 @@ namespace Core
internal class PluginTools : IPluginTools
{
private readonly ITimeSet timeSet;
private readonly WorkflowCreator workflowCreator;
private readonly IFileManager fileManager;
private readonly LogPrefixer log;
@ -42,10 +42,12 @@ namespace Core
{
this.log = new LogPrefixer(log);
this.workflowCreator = workflowCreator;
this.timeSet = timeSet;
TimeSet = timeSet;
fileManager = new FileManager(log, fileManagerRootFolder);
}
public ITimeSet TimeSet { get; }
public void ApplyLogPrefix(string prefix)
{
log.Prefix = prefix;
@ -53,7 +55,7 @@ namespace Core
public IHttp CreateHttp(Action<HttpClient> onClientCreated)
{
return CreateHttp(onClientCreated, timeSet);
return CreateHttp(onClientCreated, TimeSet);
}
public IHttp CreateHttp(Action<HttpClient> onClientCreated, ITimeSet ts)
@ -63,7 +65,7 @@ namespace Core
public IHttp CreateHttp()
{
return new Http(log, timeSet);
return new Http(log, TimeSet);
}
public IStartupWorkflow CreateWorkflow(string? namespaceOverride = null)

View File

@ -1,37 +1,44 @@

namespace Utils
namespace Utils
{
public class Retry<T>
public class Retry
{
private readonly string description;
private readonly Func<T> task;
private readonly TimeSpan maxTimeout;
private readonly int maxRetries;
private readonly TimeSpan sleepAfterFail;
private readonly Action<Failure> onFail;
public Retry(string description, Func<T> task, TimeSpan maxTimeout, int maxRetries, TimeSpan sleepAfterFail, Action<Failure> onFail)
public Retry(string description, TimeSpan maxTimeout, TimeSpan sleepAfterFail, Action<Failure> onFail)
{
this.description = description;
this.task = task;
this.maxTimeout = maxTimeout;
this.maxRetries = maxRetries;
this.sleepAfterFail = sleepAfterFail;
this.onFail = onFail;
}
public T Run()
public void Run(Action task)
{
var run = new RetryRun(description, task, maxTimeout, maxRetries, sleepAfterFail, onFail);
return run.Run();
var run = new RetryRun(description, task, maxTimeout, sleepAfterFail, onFail);
run.Run();
}
public T Run<T>(Func<T> task)
{
T? result = default;
var run = new RetryRun(description, () =>
{
result = task();
}, maxTimeout, sleepAfterFail, onFail);
run.Run();
return result!;
}
private class RetryRun
{
private readonly string description;
private readonly Func<T> task;
private readonly Action task;
private readonly TimeSpan maxTimeout;
private readonly int maxRetries;
private readonly TimeSpan sleepAfterFail;
private readonly Action<Failure> onFail;
private readonly DateTime start = DateTime.UtcNow;
@ -39,12 +46,11 @@ namespace Utils
private int tryNumber;
private DateTime tryStart;
public RetryRun(string description, Func<T> task, TimeSpan maxTimeout, int maxRetries, TimeSpan sleepAfterFail, Action<Failure> onFail)
public RetryRun(string description, Action task, TimeSpan maxTimeout, TimeSpan sleepAfterFail, Action<Failure> onFail)
{
this.description = description;
this.task = task;
this.maxTimeout = maxTimeout;
this.maxRetries = maxRetries;
this.sleepAfterFail = sleepAfterFail;
this.onFail = onFail;
@ -52,7 +58,7 @@ namespace Utils
tryStart = DateTime.UtcNow;
}
public T Run()
public void Run()
{
while (true)
{
@ -62,7 +68,8 @@ namespace Utils
tryStart = DateTime.UtcNow;
try
{
return task();
task();
return;
}
catch (Exception ex)
{
@ -83,7 +90,6 @@ namespace Utils
private void CheckMaximums()
{
if (Duration() > maxTimeout) Fail();
if (tryNumber > maxRetries) Fail();
}
private void Fail()

View File

@ -111,78 +111,24 @@ namespace Utils
public static void Retry(Action action, TimeSpan maxTimeout, TimeSpan retryTime, string description)
{
var start = DateTime.UtcNow;
var tries = 1;
var tryInfo = new List<(Exception, TimeSpan)>();
while (true)
{
var duration = DateTime.UtcNow - start;
if (duration > maxTimeout)
{
var info = FormatTryInfos(tryInfo);
throw new TimeoutException($"Retry '{description}' timed out after {tries} tries over {FormatDuration(duration)}.{Environment.NewLine}{info}");
}
var sw = Stopwatch.StartNew();
try
{
action();
return;
}
catch (Exception ex)
{
tryInfo.Add((ex, sw.Elapsed));
tries++;
}
Sleep(retryTime);
}
}
private static string FormatTryInfos(List<(Exception, TimeSpan)> tryInfo)
{
return string.Join(Environment.NewLine, tryInfo.Select(FormatTryInfo).ToArray());
}
private static string FormatTryInfo((Exception, TimeSpan) info, int index)
{
return $"Attempt {index} took {FormatDuration(info.Item2)} and failed with exception {info.Item1}.";
}
private static Action<int> failedCallback = i => { };
public static void SetRetryFailedCallback(Action<int> onRetryFailed)
{
failedCallback = onRetryFailed;
Retry(action, maxTimeout, retryTime, description, f => { });
}
public static T Retry<T>(Func<T> action, TimeSpan maxTimeout, TimeSpan retryTime, string description)
{
var start = DateTime.UtcNow;
var tries = 1;
var exceptions = new List<Exception>();
return Retry(action, maxTimeout, retryTime, description, f => { });
}
while (true)
{
var duration = DateTime.UtcNow - start;
if (duration > maxTimeout)
{
throw new TimeoutException($"Retry '{description}' timed out after {tries} tries over {FormatDuration(duration)}.", new AggregateException(exceptions));
}
public static void Retry(Action action, TimeSpan maxTimeout, TimeSpan retryTime, string description, Action<Failure> onFail)
{
var r = new Retry(description, maxTimeout, retryTime, onFail);
r.Run(action);
}
try
{
return action();
}
catch (Exception ex)
{
exceptions.Add(ex);
failedCallback(tries);
tries++;
}
Sleep(retryTime);
}
public static T Retry<T>(Func<T> action, TimeSpan maxTimeout, TimeSpan retryTime, string description, Action<Failure> onFail)
{
var r = new Retry(description, maxTimeout, retryTime, onFail);
return r.Run(action);
}
}
}

View File

@ -2,6 +2,7 @@
using Core;
using KubernetesWorkflow;
using KubernetesWorkflow.Types;
using Logging;
using Newtonsoft.Json;
using Utils;
@ -61,12 +62,17 @@ namespace CodexPlugin
public string UploadFile(FileStream fileStream)
{
return OnCodex(api => api.UploadAsync(fileStream));
return OnCodex(
api => api.UploadAsync(fileStream),
CreateRetryConfig(nameof(UploadFile)));
}
public Stream DownloadFile(string contentId)
{
var fileResponse = OnCodex(api => api.DownloadNetworkAsync(contentId));
var fileResponse = OnCodex(
api => api.DownloadNetworkAsync(contentId),
CreateRetryConfig(nameof(DownloadFile)));
if (fileResponse.StatusCode != 200) throw new Exception("Download failed with StatusCode: " + fileResponse.StatusCode);
return fileResponse.Stream;
}
@ -89,6 +95,12 @@ namespace CodexPlugin
return OnCodex<string>(api => api.CreateStorageRequestAsync(request.ContentId.Id, body));
}
public CodexSpace Space()
{
var space = OnCodex<Space>(api => api.SpaceAsync());
return mapper.Map(space);
}
public StoragePurchase GetPurchaseStatus(string purchaseId)
{
var endpoint = GetEndpoint();
@ -116,17 +128,24 @@ namespace CodexPlugin
private T OnCodex<T>(Func<CodexApi, Task<T>> action)
{
var address = GetAddress();
var result = tools.CreateHttp(CheckContainerCrashed)
.OnClient(client =>
{
var api = new CodexApi(client);
api.BaseUrl = $"{address.Host}:{address.Port}/api/codex/v1";
return Time.Wait(action(api));
});
var result = tools.CreateHttp(CheckContainerCrashed).OnClient(client => CallCodex(client, action));
return result;
}
private T OnCodex<T>(Func<CodexApi, Task<T>> action, Retry retry)
{
var result = tools.CreateHttp(CheckContainerCrashed).OnClient(client => CallCodex(client, action), retry);
return result;
}
private T CallCodex<T>(HttpClient client, Func<CodexApi, Task<T>> action)
{
var address = GetAddress();
var api = new CodexApi(client);
api.BaseUrl = $"{address.Host}:{address.Port}/api/codex/v1";
return Time.Wait(action(api));
}
private IEndpoint GetEndpoint()
{
return tools
@ -144,7 +163,7 @@ namespace CodexPlugin
if (hasContainerCrashed) throw new Exception($"Container {GetName()} has crashed.");
}
public void Log(Stream crashLog)
void ILogHandler.Log(Stream crashLog)
{
var log = tools.GetLog();
var file = log.CreateSubfile();
@ -162,5 +181,80 @@ namespace CodexPlugin
log.Log("Crash log successfully downloaded.");
hasContainerCrashed = true;
}
private Retry CreateRetryConfig(string description)
{
var timeSet = tools.TimeSet;
var log = tools.GetLog();
return new Retry(description, timeSet.HttpRetryTimeout(), timeSet.HttpCallRetryDelay(), failure =>
{
if (failure.TryNumber < 3) return;
if (failure.Duration.TotalSeconds < timeSet.HttpCallTimeout().TotalSeconds)
{
Investigate(log, failure, timeSet);
}
});
}
private void Investigate(ILog log, Failure failure, ITimeSet timeSet)
{
log.Log($"Retry {failure.TryNumber} took {Time.FormatDuration(failure.Duration)}. (HTTP timeout = {Time.FormatDuration(timeSet.HttpCallTimeout())}) " +
$"Checking if node responds to debug/info...");
try
{
var debugInfo = GetDebugInfo();
if (string.IsNullOrEmpty(debugInfo.Spr))
{
log.Log("Did not get value debug/info response.");
DownloadLog();
Throw(failure);
}
else
{
log.Log("Got valid response from debug/info. Checking storage statistics...");
CheckSpaceStatistics(log, failure);
}
}
catch (Exception ex)
{
log.Log("Got exception from debug/info call: " + ex);
DownloadLog();
Throw(failure);
}
}
private void CheckSpaceStatistics(ILog log, Failure failure)
{
try
{
var space = Space();
log.Log($"Got space statistics: {space}");
var freeSpace = space.QuotaMaxBytes - (space.QuotaUsedBytes + space.QuotaReservedBytes);
log.Log($"Free space: {freeSpace}");
if (freeSpace < 1.MB().SizeInBytes)
{
log.Log("There's less than 1MB free. Stopping...");
Throw(failure);
}
}
catch (Exception e)
{
log.Log("Failed to get space statistics: " + e);
DownloadLog();
Throw(failure);
}
}
private void Throw(Failure failure)
{
throw failure.Exception;
}
private void DownloadLog()
{
tools.CreateWorkflow().DownloadContainerLog(Container.Containers.Single(), this);
}
}
}

View File

@ -17,6 +17,7 @@ namespace CodexPlugin
ContentId UploadFile(TrackedFile file);
TrackedFile? DownloadContent(ContentId contentId, string fileLabel = "");
LocalDatasetList LocalFiles();
CodexSpace Space();
void ConnectToPeer(ICodexNode node);
DebugInfoVersion Version { get; }
IMarketplaceAccess Marketplace { get; }
@ -126,6 +127,11 @@ namespace CodexPlugin
return CodexAccess.LocalFiles();
}
public CodexSpace Space()
{
return CodexAccess.Space();
}
public void ConnectToPeer(ICodexNode node)
{
var peer = (CodexNode)node;

View File

@ -105,4 +105,17 @@ namespace CodexPlugin
return HashCode.Combine(Id);
}
}
public class CodexSpace
{
public int TotalBlocks { get; set; }
public int QuotaMaxBytes { get; set; }
public int QuotaUsedBytes { get; set; }
public int QuotaReservedBytes { get; set; }
public override string ToString()
{
return JsonConvert.SerializeObject(this);
}
}
}

View File

@ -1,4 +1,5 @@
using CodexContractsPlugin;
using CodexOpenApi;
using Newtonsoft.Json.Linq;
using System.Numerics;
using Utils;
@ -84,6 +85,17 @@ namespace CodexPlugin
};
}
public CodexSpace Map(Space space)
{
return new CodexSpace
{
QuotaMaxBytes = space.QuotaMaxBytes,
QuotaReservedBytes = space.QuotaReservedBytes,
QuotaUsedBytes = space.QuotaUsedBytes,
TotalBlocks = space.TotalBlocks
};
}
private DebugInfoVersion MapDebugInfoVersion(JObject obj)
{
return new DebugInfoVersion

View File

@ -50,8 +50,6 @@ namespace CodexLongTests.BasicTests
var node = StartCodex(s => s.WithStorageQuota((size + 10).MB()));
Time.SetRetryFailedCallback(i => OnFailed(i, node));
var uploadStart = DateTime.UtcNow;
var cid = node.UploadFile(expectedFile);
var downloadStart = DateTime.UtcNow;
@ -62,17 +60,6 @@ namespace CodexLongTests.BasicTests
AssertTimeConstraint(uploadStart, downloadStart, downloadFinished, size);
}
private void OnFailed(int tries, ICodexNode node)
{
if (tries < 5) return;
if (tries % 10 == 0)
{
Log($"After try {tries}, downloading node log.");
Ci.DownloadLog(node);
}
}
private void AssertTimeConstraint(DateTime uploadStart, DateTime downloadStart, DateTime downloadFinished, long size)
{
float sizeInMB = size;