Implements folder-storing
This commit is contained in:
parent
b54c9ff9a3
commit
c35784c90f
@ -140,7 +140,7 @@ namespace CodexPlugin
|
|||||||
|
|
||||||
public ContentId UploadFile(TrackedFile file, Action<Failure> onFailure)
|
public ContentId UploadFile(TrackedFile file, Action<Failure> onFailure)
|
||||||
{
|
{
|
||||||
return UploadFile(file, "application/x-binary", $"attachment; filename=\"{file.Filename}\"", onFailure);
|
return UploadFile(file, "application/octet-stream", $"attachment; filename=\"{file.Filename}\"", onFailure);
|
||||||
}
|
}
|
||||||
|
|
||||||
public ContentId UploadFile(TrackedFile file, string contentType, string contentDisposition, Action<Failure> onFailure)
|
public ContentId UploadFile(TrackedFile file, string contentType, string contentDisposition, Action<Failure> onFailure)
|
||||||
|
@ -40,6 +40,12 @@ namespace CodexPlugin
|
|||||||
public string State { get; set; } = string.Empty;
|
public string State { get; set; } = string.Empty;
|
||||||
public string Error { get; set; } = string.Empty;
|
public string Error { get; set; } = string.Empty;
|
||||||
public StorageRequest Request { get; set; } = null!;
|
public StorageRequest Request { get; set; } = null!;
|
||||||
|
|
||||||
|
public bool IsCancelled => State.ToLowerInvariant().Contains("cancel");
|
||||||
|
public bool IsError => State.ToLowerInvariant().Contains("error");
|
||||||
|
public bool IsFinished => State.ToLowerInvariant().Contains("finished");
|
||||||
|
public bool IsStarted => State.ToLowerInvariant().Contains("started");
|
||||||
|
public bool IsSubmitted => State.ToLowerInvariant().Contains("submitted");
|
||||||
}
|
}
|
||||||
|
|
||||||
public class StorageRequest
|
public class StorageRequest
|
||||||
|
@ -1,4 +1,5 @@
|
|||||||
using Logging;
|
using AutoClient.Modes;
|
||||||
|
using Logging;
|
||||||
|
|
||||||
namespace AutoClient
|
namespace AutoClient
|
||||||
{
|
{
|
||||||
@ -19,6 +20,15 @@ namespace AutoClient
|
|||||||
new FileLog(Path.Combine(config.LogPath, "performance")),
|
new FileLog(Path.Combine(config.LogPath, "performance")),
|
||||||
new ConsoleLog()
|
new ConsoleLog()
|
||||||
));
|
));
|
||||||
|
|
||||||
|
if (!string.IsNullOrEmpty(config.FolderToStore))
|
||||||
|
{
|
||||||
|
FolderWorkDispatcher = new FolderWorkDispatcher(config.FolderToStore);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
FolderWorkDispatcher = null!;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public Configuration Config { get; }
|
public Configuration Config { get; }
|
||||||
@ -27,6 +37,7 @@ namespace AutoClient
|
|||||||
public CancellationTokenSource Cts { get; } = new CancellationTokenSource();
|
public CancellationTokenSource Cts { get; } = new CancellationTokenSource();
|
||||||
public CidRepo CidRepo { get; }
|
public CidRepo CidRepo { get; }
|
||||||
public Performance Performance { get; }
|
public Performance Performance { get; }
|
||||||
|
public FolderWorkDispatcher FolderWorkDispatcher { get; }
|
||||||
|
|
||||||
private IFileGenerator CreateGenerator()
|
private IFileGenerator CreateGenerator()
|
||||||
{
|
{
|
||||||
|
@ -9,13 +9,15 @@ namespace AutoClient
|
|||||||
public class AutomaticPurchaser
|
public class AutomaticPurchaser
|
||||||
{
|
{
|
||||||
private readonly ILog log;
|
private readonly ILog log;
|
||||||
private readonly ICodexInstance codex;
|
private readonly ICodexInstance instance;
|
||||||
|
private readonly CodexNode codex;
|
||||||
private Task workerTask = Task.CompletedTask;
|
private Task workerTask = Task.CompletedTask;
|
||||||
private App app => codex.App;
|
private App app => instance.App;
|
||||||
|
|
||||||
public AutomaticPurchaser(ILog log, ICodexInstance codex)
|
public AutomaticPurchaser(ILog log, ICodexInstance instance, CodexNode codex)
|
||||||
{
|
{
|
||||||
this.log = log;
|
this.log = log;
|
||||||
|
this.instance = instance;
|
||||||
this.codex = codex;
|
this.codex = codex;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -50,28 +52,16 @@ namespace AutoClient
|
|||||||
|
|
||||||
private async Task DownloadForeignCid()
|
private async Task DownloadForeignCid()
|
||||||
{
|
{
|
||||||
var cid = app.CidRepo.GetForeignCid(codex.NodeId);
|
var cid = app.CidRepo.GetForeignCid(instance.NodeId);
|
||||||
if (cid == null) return;
|
if (cid == null) return;
|
||||||
|
|
||||||
var size = app.CidRepo.GetSizeForCid(cid);
|
var size = app.CidRepo.GetSizeForCid(cid);
|
||||||
if (size == null) return;
|
if (size == null) return;
|
||||||
|
|
||||||
try
|
|
||||||
{
|
|
||||||
var sw = System.Diagnostics.Stopwatch.StartNew();
|
|
||||||
var filename = Guid.NewGuid().ToString().ToLowerInvariant();
|
var filename = Guid.NewGuid().ToString().ToLowerInvariant();
|
||||||
{
|
await codex.DownloadCid(filename, cid, size);
|
||||||
using var fileStream = File.OpenWrite(filename);
|
|
||||||
var fileResponse = await codex.Codex.DownloadNetworkStreamAsync(cid);
|
DeleteFile(filename);
|
||||||
fileResponse.Stream.CopyTo(fileStream);
|
|
||||||
}
|
|
||||||
var time = sw.Elapsed;
|
|
||||||
File.Delete(filename);
|
|
||||||
app.Performance.DownloadSuccessful(size.Value, time);
|
|
||||||
}
|
|
||||||
catch (Exception ex)
|
|
||||||
{
|
|
||||||
app.Performance.DownloadFailed(ex);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private async Task<string> StartNewPurchase()
|
private async Task<string> StartNewPurchase()
|
||||||
@ -79,8 +69,8 @@ namespace AutoClient
|
|||||||
var file = await CreateFile();
|
var file = await CreateFile();
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
var cid = await UploadFile(file);
|
var cid = await codex.UploadFile(file);
|
||||||
return await RequestStorage(cid);
|
return await codex.RequestStorage(cid);
|
||||||
}
|
}
|
||||||
finally
|
finally
|
||||||
{
|
{
|
||||||
@ -105,85 +95,6 @@ namespace AutoClient
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private async Task<ContentId> UploadFile(string filename)
|
|
||||||
{
|
|
||||||
using var fileStream = File.OpenRead(filename);
|
|
||||||
try
|
|
||||||
{
|
|
||||||
var info = new FileInfo(filename);
|
|
||||||
var sw = System.Diagnostics.Stopwatch.StartNew();
|
|
||||||
var cid = await UploadStream(fileStream, filename);
|
|
||||||
var time = sw.Elapsed;
|
|
||||||
app.Performance.UploadSuccessful(info.Length, time);
|
|
||||||
app.CidRepo.Add(codex.NodeId, cid.Id, info.Length);
|
|
||||||
return cid;
|
|
||||||
}
|
|
||||||
catch (Exception exc)
|
|
||||||
{
|
|
||||||
app.Performance.UploadFailed(exc);
|
|
||||||
throw;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private async Task<ContentId> UploadStream(FileStream fileStream, string filename)
|
|
||||||
{
|
|
||||||
log.Debug($"Uploading file...");
|
|
||||||
var response = await codex.Codex.UploadAsync(
|
|
||||||
content_type: "application/x-binary",
|
|
||||||
content_disposition: $"attachment; filename=\"{filename}\"",
|
|
||||||
fileStream, app.Cts.Token);
|
|
||||||
|
|
||||||
if (string.IsNullOrEmpty(response)) FrameworkAssert.Fail("Received empty response.");
|
|
||||||
if (response.StartsWith("Unable to store block")) FrameworkAssert.Fail("Node failed to store block.");
|
|
||||||
|
|
||||||
log.Debug($"Uploaded file. Received contentId: '{response}'.");
|
|
||||||
return new ContentId(response);
|
|
||||||
}
|
|
||||||
|
|
||||||
private async Task<string> RequestStorage(ContentId cid)
|
|
||||||
{
|
|
||||||
log.Debug("Requesting storage for " + cid.Id);
|
|
||||||
var result = await codex.Codex.CreateStorageRequestAsync(cid.Id, new StorageRequestCreation()
|
|
||||||
{
|
|
||||||
Collateral = app.Config.RequiredCollateral.ToString(),
|
|
||||||
Duration = (app.Config.ContractDurationMinutes * 60).ToString(),
|
|
||||||
Expiry = (app.Config.ContractExpiryMinutes * 60).ToString(),
|
|
||||||
Nodes = app.Config.NumHosts,
|
|
||||||
Reward = app.Config.Price.ToString(),
|
|
||||||
ProofProbability = "15",
|
|
||||||
Tolerance = app.Config.HostTolerance
|
|
||||||
}, app.Cts.Token);
|
|
||||||
|
|
||||||
log.Debug("Purchase ID: " + result);
|
|
||||||
|
|
||||||
var encoded = await GetEncodedCid(result);
|
|
||||||
app.CidRepo.AddEncoded(cid.Id, encoded);
|
|
||||||
|
|
||||||
return result;
|
|
||||||
}
|
|
||||||
|
|
||||||
private async Task<string> GetEncodedCid(string pid)
|
|
||||||
{
|
|
||||||
try
|
|
||||||
{
|
|
||||||
var sp = (await GetStoragePurchase(pid))!;
|
|
||||||
return sp.Request.Content.Cid;
|
|
||||||
}
|
|
||||||
catch (Exception ex)
|
|
||||||
{
|
|
||||||
log.Error(ex.ToString());
|
|
||||||
throw;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private async Task<StoragePurchase?> GetStoragePurchase(string pid)
|
|
||||||
{
|
|
||||||
// openapi still don't match code.
|
|
||||||
var str = await codex.Client.GetStringAsync($"{codex.Address.Host}:{codex.Address.Port}/api/codex/v1/storage/purchases/{pid}");
|
|
||||||
if (string.IsNullOrEmpty(str)) return null;
|
|
||||||
return JsonConvert.DeserializeObject<StoragePurchase>(str);
|
|
||||||
}
|
|
||||||
|
|
||||||
private async Task WaitTillFinished(string pid)
|
private async Task WaitTillFinished(string pid)
|
||||||
{
|
{
|
||||||
try
|
try
|
||||||
@ -191,7 +102,7 @@ namespace AutoClient
|
|||||||
var emptyResponseTolerance = 10;
|
var emptyResponseTolerance = 10;
|
||||||
while (!app.Cts.Token.IsCancellationRequested)
|
while (!app.Cts.Token.IsCancellationRequested)
|
||||||
{
|
{
|
||||||
var purchase = await GetStoragePurchase(pid);
|
var purchase = await codex.GetStoragePurchase(pid);
|
||||||
if (purchase == null)
|
if (purchase == null)
|
||||||
{
|
{
|
||||||
await FixedShortDelay();
|
await FixedShortDelay();
|
||||||
@ -204,23 +115,22 @@ namespace AutoClient
|
|||||||
}
|
}
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
var status = purchase.State.ToLowerInvariant();
|
if (purchase.IsCancelled)
|
||||||
if (status.Contains("cancel"))
|
|
||||||
{
|
{
|
||||||
app.Performance.StorageContractCancelled();
|
app.Performance.StorageContractCancelled();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (status.Contains("error"))
|
if (purchase.IsError)
|
||||||
{
|
{
|
||||||
app.Performance.StorageContractErrored(purchase.Error);
|
app.Performance.StorageContractErrored(purchase.Error);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (status.Contains("finished"))
|
if (purchase.IsFinished)
|
||||||
{
|
{
|
||||||
app.Performance.StorageContractFinished();
|
app.Performance.StorageContractFinished();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (status.Contains("started"))
|
if (purchase.IsStarted)
|
||||||
{
|
{
|
||||||
app.Performance.StorageContractStarted();
|
app.Performance.StorageContractStarted();
|
||||||
await FixedDurationDelay();
|
await FixedDurationDelay();
|
||||||
|
@ -17,6 +17,7 @@
|
|||||||
lock (_lock)
|
lock (_lock)
|
||||||
{
|
{
|
||||||
entries.Add(new CidEntry(nodeId, cid, knownSize));
|
entries.Add(new CidEntry(nodeId, cid, knownSize));
|
||||||
|
if (entries.Count > 1000) entries.Clear();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1,5 +1,8 @@
|
|||||||
using CodexOpenApi;
|
using CodexOpenApi;
|
||||||
|
using CodexPlugin;
|
||||||
using Logging;
|
using Logging;
|
||||||
|
using Nethereum.Model;
|
||||||
|
using Newtonsoft.Json;
|
||||||
using Utils;
|
using Utils;
|
||||||
|
|
||||||
namespace AutoClient
|
namespace AutoClient
|
||||||
@ -30,4 +33,112 @@ namespace AutoClient
|
|||||||
public HttpClient Client { get; }
|
public HttpClient Client { get; }
|
||||||
public Address Address { get; }
|
public Address Address { get; }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public class CodexNode
|
||||||
|
{
|
||||||
|
private readonly App app;
|
||||||
|
private readonly ICodexInstance codex;
|
||||||
|
|
||||||
|
public CodexNode(App app, ICodexInstance instance)
|
||||||
|
{
|
||||||
|
this.app = app;
|
||||||
|
codex = instance;
|
||||||
|
}
|
||||||
|
|
||||||
|
public async Task DownloadCid(string filename, string cid, long? size)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
var sw = System.Diagnostics.Stopwatch.StartNew();
|
||||||
|
using var fileStream = File.OpenWrite(filename);
|
||||||
|
var fileResponse = await codex.Codex.DownloadNetworkStreamAsync(cid);
|
||||||
|
fileResponse.Stream.CopyTo(fileStream);
|
||||||
|
var time = sw.Elapsed;
|
||||||
|
app.Performance.DownloadSuccessful(size, time);
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
app.Performance.DownloadFailed(ex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public async Task<ContentId> UploadFile(string filename)
|
||||||
|
{
|
||||||
|
using var fileStream = File.OpenRead(filename);
|
||||||
|
try
|
||||||
|
{
|
||||||
|
var info = new FileInfo(filename);
|
||||||
|
var sw = System.Diagnostics.Stopwatch.StartNew();
|
||||||
|
var cid = await UploadStream(fileStream, filename);
|
||||||
|
var time = sw.Elapsed;
|
||||||
|
app.Performance.UploadSuccessful(info.Length, time);
|
||||||
|
app.CidRepo.Add(codex.NodeId, cid.Id, info.Length);
|
||||||
|
return cid;
|
||||||
|
}
|
||||||
|
catch (Exception exc)
|
||||||
|
{
|
||||||
|
app.Performance.UploadFailed(exc);
|
||||||
|
throw;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public async Task<string> RequestStorage(ContentId cid)
|
||||||
|
{
|
||||||
|
app.Log.Debug("Requesting storage for " + cid.Id);
|
||||||
|
var result = await codex.Codex.CreateStorageRequestAsync(cid.Id, new StorageRequestCreation()
|
||||||
|
{
|
||||||
|
Collateral = app.Config.RequiredCollateral.ToString(),
|
||||||
|
Duration = (app.Config.ContractDurationMinutes * 60).ToString(),
|
||||||
|
Expiry = (app.Config.ContractExpiryMinutes * 60).ToString(),
|
||||||
|
Nodes = app.Config.NumHosts,
|
||||||
|
Reward = app.Config.Price.ToString(),
|
||||||
|
ProofProbability = "15",
|
||||||
|
Tolerance = app.Config.HostTolerance
|
||||||
|
}, app.Cts.Token);
|
||||||
|
|
||||||
|
app.Log.Debug("Purchase ID: " + result);
|
||||||
|
|
||||||
|
var encoded = await GetEncodedCid(result);
|
||||||
|
app.CidRepo.AddEncoded(cid.Id, encoded);
|
||||||
|
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
public async Task<StoragePurchase?> GetStoragePurchase(string pid)
|
||||||
|
{
|
||||||
|
// openapi still don't match code.
|
||||||
|
var str = await codex.Client.GetStringAsync($"{codex.Address.Host}:{codex.Address.Port}/api/codex/v1/storage/purchases/{pid}");
|
||||||
|
if (string.IsNullOrEmpty(str)) return null;
|
||||||
|
return JsonConvert.DeserializeObject<StoragePurchase>(str);
|
||||||
|
}
|
||||||
|
|
||||||
|
private async Task<ContentId> UploadStream(FileStream fileStream, string filename)
|
||||||
|
{
|
||||||
|
app.Log.Debug($"Uploading file...");
|
||||||
|
var response = await codex.Codex.UploadAsync(
|
||||||
|
content_type: "application/octet-stream",
|
||||||
|
content_disposition: $"attachment; filename=\"{filename}\"",
|
||||||
|
fileStream, app.Cts.Token);
|
||||||
|
|
||||||
|
if (string.IsNullOrEmpty(response)) FrameworkAssert.Fail("Received empty response.");
|
||||||
|
if (response.StartsWith("Unable to store block")) FrameworkAssert.Fail("Node failed to store block.");
|
||||||
|
|
||||||
|
app.Log.Debug($"Uploaded file. Received contentId: '{response}'.");
|
||||||
|
return new ContentId(response);
|
||||||
|
}
|
||||||
|
|
||||||
|
private async Task<string> GetEncodedCid(string pid)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
var sp = (await GetStoragePurchase(pid))!;
|
||||||
|
return sp.Request.Content.Cid;
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
app.Log.Error(ex.ToString());
|
||||||
|
throw;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -34,6 +34,9 @@ namespace AutoClient
|
|||||||
[Uniform("filesizemb", "smb", "FILESIZEMB", false, "When greater than zero, size of file generated and uploaded. When zero, random images are used instead.")]
|
[Uniform("filesizemb", "smb", "FILESIZEMB", false, "When greater than zero, size of file generated and uploaded. When zero, random images are used instead.")]
|
||||||
public int FileSizeMb { get; set; } = 0;
|
public int FileSizeMb { get; set; } = 0;
|
||||||
|
|
||||||
|
[Uniform("folderToStore", "fts", "FOLDERTOSTORE", false, "When set, autoclient will attempt to upload and purchase storage for every non-JSON file in the provided folder.")]
|
||||||
|
public string FolderToStore { get; set; } = string.Empty;
|
||||||
|
|
||||||
public string LogPath
|
public string LogPath
|
||||||
{
|
{
|
||||||
get
|
get
|
||||||
|
@ -1,21 +1,408 @@
|
|||||||
using System;
|
using CodexOpenApi;
|
||||||
|
using Newtonsoft.Json;
|
||||||
|
using System;
|
||||||
using System.Collections.Generic;
|
using System.Collections.Generic;
|
||||||
using System.Linq;
|
using System.Linq;
|
||||||
using System.Text;
|
using System.Text;
|
||||||
|
using System.Text.Json.Serialization;
|
||||||
using System.Threading.Tasks;
|
using System.Threading.Tasks;
|
||||||
|
using static AutoClient.Modes.FileWorker;
|
||||||
|
using static AutoClient.Modes.FolderWorkOverview;
|
||||||
|
|
||||||
namespace AutoClient.Modes
|
namespace AutoClient.Modes
|
||||||
{
|
{
|
||||||
public class FolderStoreMode : IMode
|
public class FolderStoreMode : IMode
|
||||||
{
|
{
|
||||||
|
private readonly App app;
|
||||||
|
private readonly string folder;
|
||||||
|
private readonly PurchaseInfo purchaseInfo;
|
||||||
|
private readonly CancellationTokenSource cts = new CancellationTokenSource();
|
||||||
|
private Task checkTask = Task.CompletedTask;
|
||||||
|
|
||||||
|
public FolderStoreMode(App app, string folder, PurchaseInfo purchaseInfo)
|
||||||
|
{
|
||||||
|
this.app = app;
|
||||||
|
this.folder = folder;
|
||||||
|
this.purchaseInfo = purchaseInfo;
|
||||||
|
}
|
||||||
|
|
||||||
public void Start(ICodexInstance instance, int index)
|
public void Start(ICodexInstance instance, int index)
|
||||||
{
|
{
|
||||||
throw new NotImplementedException();
|
checkTask = Task.Run(async () =>
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
await RunChecker(instance);
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
app.Log.Error("Exception in FolderStoreMode worker: " + ex);
|
||||||
|
Environment.Exit(1);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private async Task RunChecker(ICodexInstance instance)
|
||||||
|
{
|
||||||
|
var i = 0;
|
||||||
|
while (!cts.IsCancellationRequested)
|
||||||
|
{
|
||||||
|
Thread.Sleep(5000);
|
||||||
|
await ProcessWorkItem(instance);
|
||||||
|
i++;
|
||||||
|
|
||||||
|
if (i > 5)
|
||||||
|
{
|
||||||
|
i = 0;
|
||||||
|
var overview = new FolderWorkOverview(app, purchaseInfo, folder);
|
||||||
|
overview.Update();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private async Task ProcessWorkItem(ICodexInstance instance)
|
||||||
|
{
|
||||||
|
var file = app.FolderWorkDispatcher.GetFileToCheck();
|
||||||
|
var worker = new FileWorker(app, purchaseInfo, folder, file);
|
||||||
|
await worker.Update(instance);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void Stop()
|
public void Stop()
|
||||||
{
|
{
|
||||||
throw new NotImplementedException();
|
cts.Cancel();
|
||||||
|
checkTask.Wait();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public class PurchaseInfo
|
||||||
|
{
|
||||||
|
public TimeSpan PurchaseDurationTotal { get; set; }
|
||||||
|
public TimeSpan PurchaseDurationSafe { get; set; }
|
||||||
|
}
|
||||||
|
|
||||||
|
public class FileWorker : JsonBacked<WorkerStatus>
|
||||||
|
{
|
||||||
|
private readonly App app;
|
||||||
|
private readonly PurchaseInfo purchaseInfo;
|
||||||
|
private readonly string sourceFilename;
|
||||||
|
|
||||||
|
public FileWorker(App app, PurchaseInfo purchaseInfo, string folder, string filename)
|
||||||
|
: base(app, folder, filename + ".json")
|
||||||
|
{
|
||||||
|
this.app = app;
|
||||||
|
this.purchaseInfo = purchaseInfo;
|
||||||
|
sourceFilename = filename;
|
||||||
|
}
|
||||||
|
|
||||||
|
public async Task Update(ICodexInstance instance)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
var codex = new CodexNode(app, instance);
|
||||||
|
await EnsureCid(instance, codex);
|
||||||
|
await EnsureRecentPurchase(instance, codex);
|
||||||
|
SaveState();
|
||||||
|
}
|
||||||
|
catch (Exception exc)
|
||||||
|
{
|
||||||
|
app.Log.Error("Exception during fileworker update: " + exc);
|
||||||
|
throw;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private async Task EnsureRecentPurchase(ICodexInstance instance, CodexNode codex)
|
||||||
|
{
|
||||||
|
var recent = GetMostRecent();
|
||||||
|
if (recent == null)
|
||||||
|
{
|
||||||
|
app.Log.Log($"No recent purchase for '{sourceFilename}'.");
|
||||||
|
await MakeNewPurchase(instance, codex);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
await UpdatePurchase(recent, instance, codex);
|
||||||
|
|
||||||
|
if (recent.Expiry.HasValue || recent.Finish.HasValue)
|
||||||
|
{
|
||||||
|
app.Log.Log($"Recent purchase for '{sourceFilename}' has expired or finished.");
|
||||||
|
await MakeNewPurchase(instance, codex);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (recent.Started.HasValue &&
|
||||||
|
(recent.Created + purchaseInfo.PurchaseDurationSafe) > DateTime.UtcNow)
|
||||||
|
{
|
||||||
|
app.Log.Log($"Recent purchase for '{sourceFilename}' is going to expire soon.");
|
||||||
|
await MakeNewPurchase(instance, codex);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
app.Log.Log($"No new purchase needed for '{sourceFilename}'.");
|
||||||
|
}
|
||||||
|
|
||||||
|
private async Task UpdatePurchase(WorkerPurchase recent, ICodexInstance instance, CodexNode codex)
|
||||||
|
{
|
||||||
|
if (string.IsNullOrEmpty(recent.Pid)) throw new Exception("No purchaseID!");
|
||||||
|
var now = DateTime.UtcNow;
|
||||||
|
|
||||||
|
var purchase = await codex.GetStoragePurchase(recent.Pid);
|
||||||
|
if (purchase == null)
|
||||||
|
{
|
||||||
|
app.Log.Log($"No purchase information found for PID '{recent.Pid}' for file '{sourceFilename}'. Consider this one expired.");
|
||||||
|
recent.Expiry = now;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (purchase.IsSubmitted)
|
||||||
|
{
|
||||||
|
if (!recent.Submitted.HasValue) recent.Submitted = now;
|
||||||
|
}
|
||||||
|
if (purchase.IsStarted)
|
||||||
|
{
|
||||||
|
if (!recent.Submitted.HasValue) recent.Submitted = now;
|
||||||
|
if (!recent.Started.HasValue) recent.Started = now;
|
||||||
|
}
|
||||||
|
if (purchase.IsCancelled)
|
||||||
|
{
|
||||||
|
if (!recent.Submitted.HasValue) recent.Submitted = now;
|
||||||
|
if (!recent.Expiry.HasValue) recent.Expiry = now;
|
||||||
|
}
|
||||||
|
if (purchase.IsError)
|
||||||
|
{
|
||||||
|
if (!recent.Submitted.HasValue) recent.Submitted = now;
|
||||||
|
if (!recent.Expiry.HasValue) recent.Expiry = now;
|
||||||
|
}
|
||||||
|
if (purchase.IsFinished)
|
||||||
|
{
|
||||||
|
if (!recent.Submitted.HasValue) recent.Submitted = now;
|
||||||
|
if (!recent.Started.HasValue) recent.Started = now;
|
||||||
|
if (!recent.Finish.HasValue) recent.Finish = now;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private async Task MakeNewPurchase(ICodexInstance instance, CodexNode codex)
|
||||||
|
{
|
||||||
|
if (string.IsNullOrEmpty(State.Cid)) throw new Exception("No cid!");
|
||||||
|
|
||||||
|
var response = await codex.RequestStorage(new CodexPlugin.ContentId(State.Cid));
|
||||||
|
if (string.IsNullOrEmpty(response) ||
|
||||||
|
response == "Unable to encode manifest" ||
|
||||||
|
response == "Purchasing not available" ||
|
||||||
|
response == "Expiry required" ||
|
||||||
|
response == "Expiry needs to be in future" ||
|
||||||
|
response == "Expiry has to be before the request's end (now + duration)")
|
||||||
|
{
|
||||||
|
throw new InvalidOperationException(response);
|
||||||
|
}
|
||||||
|
|
||||||
|
State.Purchases = State.Purchases.Concat([
|
||||||
|
new WorkerPurchase
|
||||||
|
{
|
||||||
|
Created = DateTime.UtcNow,
|
||||||
|
Pid = response
|
||||||
|
}
|
||||||
|
]).ToArray();
|
||||||
|
}
|
||||||
|
|
||||||
|
private async Task EnsureCid(ICodexInstance instance, CodexNode codex)
|
||||||
|
{
|
||||||
|
if (!string.IsNullOrEmpty(State.Cid))
|
||||||
|
{
|
||||||
|
var found = true;
|
||||||
|
try
|
||||||
|
{
|
||||||
|
var manifest = await instance.Codex.DownloadNetworkManifestAsync(State.Cid);
|
||||||
|
if (manifest == null) found = false;
|
||||||
|
}
|
||||||
|
catch
|
||||||
|
{
|
||||||
|
found = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!found)
|
||||||
|
{
|
||||||
|
app.Log.Log($"Existing CID '{State.Cid}' for '{sourceFilename}' could not be found in the network.");
|
||||||
|
State.Cid = "";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (string.IsNullOrEmpty(State.Cid))
|
||||||
|
{
|
||||||
|
app.Log.Log($"Uploading '{sourceFilename}'...");
|
||||||
|
var cid = await codex.UploadFile(sourceFilename);
|
||||||
|
app.Log.Log("Got CID: " + cid);
|
||||||
|
State.Cid = cid.Id;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private WorkerPurchase? GetMostRecent()
|
||||||
|
{
|
||||||
|
var maxSubmitted = State.Purchases.Where(p => p.Submitted.HasValue).Max(p => p.Submitted!.Value);
|
||||||
|
return State.Purchases.SingleOrDefault(p => p.Submitted.HasValue && p.Submitted.Value == maxSubmitted);
|
||||||
|
}
|
||||||
|
|
||||||
|
public bool IsCurrentlyRunning()
|
||||||
|
{
|
||||||
|
if (!State.Purchases.Any()) return false;
|
||||||
|
|
||||||
|
return State.Purchases.Any(p =>
|
||||||
|
p.Submitted.HasValue &&
|
||||||
|
p.Started.HasValue &&
|
||||||
|
!p.Expiry.HasValue &&
|
||||||
|
!p.Finish.HasValue &&
|
||||||
|
p.Started.Value > (DateTime.UtcNow - purchaseInfo.PurchaseDurationTotal)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
public bool IsCurrentlyFailed()
|
||||||
|
{
|
||||||
|
if (!State.Purchases.Any()) return false;
|
||||||
|
|
||||||
|
var mostRecent = GetMostRecent();
|
||||||
|
if (mostRecent == null ) return false;
|
||||||
|
|
||||||
|
return mostRecent.Expiry.HasValue;
|
||||||
|
}
|
||||||
|
|
||||||
|
[Serializable]
|
||||||
|
public class WorkerStatus
|
||||||
|
{
|
||||||
|
public string Cid { get; set; } = string.Empty;
|
||||||
|
public WorkerPurchase[] Purchases { get; set; } = Array.Empty<WorkerPurchase>();
|
||||||
|
}
|
||||||
|
|
||||||
|
[Serializable]
|
||||||
|
public class WorkerPurchase
|
||||||
|
{
|
||||||
|
public string Pid { get; set; } = string.Empty;
|
||||||
|
public DateTime Created { get; set; }
|
||||||
|
public DateTime? Submitted { get; set; }
|
||||||
|
public DateTime? Started { get; set; }
|
||||||
|
public DateTime? Expiry { get; set; }
|
||||||
|
public DateTime? Finish { get; set; }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public class FolderWorkDispatcher
|
||||||
|
{
|
||||||
|
private readonly List<string> files = new List<string>();
|
||||||
|
public FolderWorkDispatcher(string folder)
|
||||||
|
{
|
||||||
|
var fs = Directory.GetFiles(folder);
|
||||||
|
foreach (var f in fs)
|
||||||
|
{
|
||||||
|
if (!f.ToLowerInvariant().EndsWith(".json"))
|
||||||
|
{
|
||||||
|
files.Add(f);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public string GetFileToCheck()
|
||||||
|
{
|
||||||
|
var file = files.First();
|
||||||
|
files.RemoveAt(0);
|
||||||
|
files.Add(file);
|
||||||
|
return file;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public class FolderWorkOverview : JsonBacked<WorkMonitorStatus>
|
||||||
|
{
|
||||||
|
private const string OverviewFilename = "codex_folder_saver_overview.json";
|
||||||
|
private readonly App app;
|
||||||
|
private readonly PurchaseInfo purchaseInfo;
|
||||||
|
|
||||||
|
public FolderWorkOverview(App app, PurchaseInfo purchaseInfo, string folder)
|
||||||
|
: base(app, folder, Path.Combine(folder, OverviewFilename))
|
||||||
|
{
|
||||||
|
this.app = app;
|
||||||
|
this.purchaseInfo = purchaseInfo;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void Update()
|
||||||
|
{
|
||||||
|
var jsonFiles = Directory.GetFiles(Folder).Where(f => f.ToLowerInvariant().EndsWith(".json") && !f.Contains(OverviewFilename)).ToList();
|
||||||
|
|
||||||
|
var total = 0;
|
||||||
|
var successful = 0;
|
||||||
|
var failed = 0;
|
||||||
|
foreach (var file in jsonFiles)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
var worker = new FileWorker(app, purchaseInfo, Folder, file);
|
||||||
|
total++;
|
||||||
|
if (worker.IsCurrentlyRunning()) successful++;
|
||||||
|
if (worker.IsCurrentlyFailed()) failed++;
|
||||||
|
}
|
||||||
|
catch (Exception exc)
|
||||||
|
{
|
||||||
|
app.Log.Error("Exception in workoverview update: " + exc);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
State.TotalFiles = total;
|
||||||
|
State.SuccessfulStored = successful;
|
||||||
|
State.StoreFailed = failed;
|
||||||
|
SaveState();
|
||||||
|
}
|
||||||
|
|
||||||
|
[Serializable]
|
||||||
|
public class WorkMonitorStatus
|
||||||
|
{
|
||||||
|
public int TotalFiles { get; set; }
|
||||||
|
public int SuccessfulStored { get; set; }
|
||||||
|
public int StoreFailed { get; set; }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public abstract class JsonBacked<T> where T : new()
|
||||||
|
{
|
||||||
|
private readonly App app;
|
||||||
|
|
||||||
|
protected JsonBacked(App app, string folder, string filePath)
|
||||||
|
{
|
||||||
|
this.app = app;
|
||||||
|
Folder = folder;
|
||||||
|
FilePath = filePath;
|
||||||
|
LoadState();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void LoadState()
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
if (!File.Exists(FilePath))
|
||||||
|
{
|
||||||
|
State = new T();
|
||||||
|
SaveState();
|
||||||
|
}
|
||||||
|
var text = File.ReadAllText(FilePath);
|
||||||
|
State = JsonConvert.DeserializeObject<T>(text)!;
|
||||||
|
if (State == null) throw new Exception("Didn't deserialize " + FilePath);
|
||||||
|
}
|
||||||
|
catch (Exception exc)
|
||||||
|
{
|
||||||
|
app.Log.Error("Failed to load state: " + exc);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected string Folder { get; }
|
||||||
|
protected string FilePath { get; }
|
||||||
|
protected T State { get; private set; } = default(T)!;
|
||||||
|
|
||||||
|
protected void SaveState()
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
var json = JsonConvert.SerializeObject(State);
|
||||||
|
File.WriteAllText(FilePath, json);
|
||||||
|
}
|
||||||
|
catch (Exception exc)
|
||||||
|
{
|
||||||
|
app.Log.Error("Failed to save state: " + exc);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -17,7 +17,7 @@ namespace AutoClient.Modes
|
|||||||
{
|
{
|
||||||
for (var i = 0; i < app.Config.NumConcurrentPurchases; i++)
|
for (var i = 0; i < app.Config.NumConcurrentPurchases; i++)
|
||||||
{
|
{
|
||||||
purchasers.Add(new AutomaticPurchaser(new LogPrefixer(app.Log, $"({i}) "), instance));
|
purchasers.Add(new AutomaticPurchaser(new LogPrefixer(app.Log, $"({i}) "), instance, new CodexNode(app, instance)));
|
||||||
}
|
}
|
||||||
|
|
||||||
var delayPerPurchaser =
|
var delayPerPurchaser =
|
||||||
|
@ -16,11 +16,13 @@ namespace AutoClient
|
|||||||
Log($"Download failed: {ex}");
|
Log($"Download failed: {ex}");
|
||||||
}
|
}
|
||||||
|
|
||||||
public void DownloadSuccessful(long size, TimeSpan time)
|
public void DownloadSuccessful(long? size, TimeSpan time)
|
||||||
{
|
{
|
||||||
|
if (!size.HasValue) return;
|
||||||
|
|
||||||
long milliseconds = Convert.ToInt64(time.TotalMilliseconds);
|
long milliseconds = Convert.ToInt64(time.TotalMilliseconds);
|
||||||
if (milliseconds < 1) milliseconds = 1;
|
if (milliseconds < 1) milliseconds = 1;
|
||||||
long bytesPerSecond = 1000 * (size / milliseconds);
|
long bytesPerSecond = 1000 * (size.Value / milliseconds);
|
||||||
Log($"Download successful: {bytesPerSecond} bytes per second");
|
Log($"Download successful: {bytesPerSecond} bytes per second");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -55,9 +55,25 @@ public class Program
|
|||||||
|
|
||||||
private IMode CreateMode()
|
private IMode CreateMode()
|
||||||
{
|
{
|
||||||
|
if (!string.IsNullOrEmpty(app.Config.FolderToStore))
|
||||||
|
{
|
||||||
|
return CreateFolderStoreMode();
|
||||||
|
}
|
||||||
|
|
||||||
return new PurchasingMode(app);
|
return new PurchasingMode(app);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private IMode CreateFolderStoreMode()
|
||||||
|
{
|
||||||
|
if (app.Config.ContractDurationMinutes - 1 < 5) throw new Exception("Contract duration config option not long enough!");
|
||||||
|
|
||||||
|
return new FolderStoreMode(app, app.Config.FolderToStore, new PurchaseInfo
|
||||||
|
{
|
||||||
|
PurchaseDurationTotal = TimeSpan.FromMinutes(app.Config.ContractDurationMinutes),
|
||||||
|
PurchaseDurationSafe = TimeSpan.FromMinutes(app.Config.ContractDurationMinutes - 1),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
private async Task<CodexInstance[]> CreateCodexInstances()
|
private async Task<CodexInstance[]> CreateCodexInstances()
|
||||||
{
|
{
|
||||||
var endpointStrs = app.Config.CodexEndpoints.Split(";", StringSplitOptions.RemoveEmptyEntries);
|
var endpointStrs = app.Config.CodexEndpoints.Split(";", StringSplitOptions.RemoveEmptyEntries);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user