diff --git a/Tools/AutoClient/App.cs b/Tools/AutoClient/App.cs index a581b6db..1a7e33a4 100644 --- a/Tools/AutoClient/App.cs +++ b/Tools/AutoClient/App.cs @@ -20,10 +20,6 @@ namespace AutoClient new FileLog(Path.Combine(config.LogPath, "performance")), new ConsoleLog() )); - - var httpFactory = new HttpFactory(Log, new AutoClientWebTimeSet()); - - CodexNodeFactory = new CodexNodeFactory(log: Log, httpFactory: httpFactory, dataDir: Config.DataPath); } public Configuration Config { get; } @@ -31,7 +27,6 @@ namespace AutoClient public IFileGenerator Generator { get; } public CancellationTokenSource Cts { get; } = new CancellationTokenSource(); public Performance Performance { get; } - public CodexNodeFactory CodexNodeFactory { get; } private IFileGenerator CreateGenerator() { diff --git a/Tools/AutoClient/AutomaticPurchaser.cs b/Tools/AutoClient/AutomaticPurchaser.cs deleted file mode 100644 index 1886f321..00000000 --- a/Tools/AutoClient/AutomaticPurchaser.cs +++ /dev/null @@ -1,145 +0,0 @@ -using Logging; - -namespace AutoClient -{ - public class AutomaticPurchaser - { - private readonly App app; - private readonly ILog log; - private readonly CodexWrapper node; - private Task workerTask = Task.CompletedTask; - - public AutomaticPurchaser(App app, ILog log, CodexWrapper node) - { - this.app = app; - this.log = log; - this.node = node; - } - - public void Start() - { - workerTask = Task.Run(Worker); - } - - public void Stop() - { - workerTask.Wait(); - } - - private async Task Worker() - { - log.Log("Worker started."); - while (!app.Cts.Token.IsCancellationRequested) - { - try - { - var pid = await StartNewPurchase(); - await WaitTillFinished(pid); - } - catch (Exception ex) - { - log.Error("Worker failed with: " + ex); - await Task.Delay(TimeSpan.FromHours(6)); - } - } - } - - private async Task StartNewPurchase() - { - var file = await CreateFile(); - try - { - var cid = node.UploadFile(file); - var response = node.RequestStorage(cid); - return response.PurchaseId; - } - finally - { - DeleteFile(file); - } - } - - private async Task CreateFile() - { - return await app.Generator.Generate(); - } - - private void DeleteFile(string file) - { - try - { - File.Delete(file); - } - catch (Exception exc) - { - log.Error($"Failed to delete file '{file}': {exc}"); - } - } - - private async Task WaitTillFinished(string pid) - { - try - { - var emptyResponseTolerance = 10; - while (!app.Cts.Token.IsCancellationRequested) - { - var purchase = node.GetStoragePurchase(pid); - if (purchase == null) - { - await FixedShortDelay(); - emptyResponseTolerance--; - if (emptyResponseTolerance == 0) - { - log.Log("Received 10 empty responses. Stop tracking this purchase."); - await ExpiryTimeDelay(); - return; - } - continue; - } - if (purchase.IsCancelled) - { - app.Performance.StorageContractCancelled(); - return; - } - if (purchase.IsError) - { - app.Performance.StorageContractErrored(purchase.Error); - return; - } - if (purchase.IsFinished) - { - app.Performance.StorageContractFinished(); - return; - } - if (purchase.IsStarted) - { - app.Performance.StorageContractStarted(); - await FixedDurationDelay(); - } - - await FixedShortDelay(); - } - } - catch (Exception ex) - { - log.Log($"Wait failed with exception: {ex}. Assume contract will expire: Wait expiry time."); - await ExpiryTimeDelay(); - } - } - - private async Task FixedDurationDelay() - { - await Task.Delay(app.Config.ContractDurationMinutes * 60 * 1000, app.Cts.Token); - } - - private async Task ExpiryTimeDelay() - { - await Task.Delay(app.Config.ContractExpiryMinutes * 60 * 1000, app.Cts.Token); - } - - private async Task FixedShortDelay() - { - await Task.Delay(15 * 1000, app.Cts.Token); - } - } -} diff --git a/Tools/AutoClient/LoadBalancer.cs b/Tools/AutoClient/LoadBalancer.cs new file mode 100644 index 00000000..64627adc --- /dev/null +++ b/Tools/AutoClient/LoadBalancer.cs @@ -0,0 +1,112 @@ +namespace AutoClient +{ + public class LoadBalancer + { + private readonly App app; + private readonly List instances; + private readonly object instanceLock = new object(); + private readonly List tasks = new List(); + private readonly object taskLock = new object(); + + private class Cdx + { + public Cdx(CodexWrapper instance) + { + Instance = instance; + } + + public CodexWrapper Instance { get; } + public bool IsBusy { get; set; } = false; + } + + public LoadBalancer(App app, CodexWrapper[] instances) + { + this.app = app; + this.instances = instances.Select(i => new Cdx(i)).ToList(); + } + + public void DispatchOnCodex(Action action) + { + lock (taskLock) + { + WaitUntilNotAllBusy(); + + tasks.Add(Task.Run(() => RunTask(action))); + } + } + + public void CleanUpTasks() + { + lock (taskLock) + { + foreach (var task in tasks) + { + if (task.IsFaulted) throw task.Exception; + } + + tasks.RemoveAll(t => t.IsCompleted); + } + } + + private void RunTask(Action action) + { + var instance = GetAndSetFreeInstance(); + try + { + action(instance.Instance); + } + finally + { + ReleaseInstance(instance); + } + } + + private Cdx GetAndSetFreeInstance() + { + lock (instanceLock) + { + return GetSetInstance(); + } + } + + private Cdx GetSetInstance() + { + var i = instances.First(); + instances.RemoveAt(0); + instances.Add(i); + + if (i.IsBusy) return GetSetInstance(); + + i.IsBusy = true; + return i; + } + + private void ReleaseInstance(Cdx instance) + { + lock (instanceLock) + { + instance.IsBusy = false; + } + } + + private void WaitUntilNotAllBusy() + { + if (AllBusy()) + { + app.Log.Log("[LoadBalancer] All instances are busy. Waiting..."); + while (AllBusy()) + { + Thread.Sleep(TimeSpan.FromSeconds(5.0)); + } + } + } + + private bool AllBusy() + { + lock (instanceLock) + { + return instances.All(i => i.IsBusy); + } + } + } +} diff --git a/Tools/AutoClient/Modes/FolderStore/BalanceChecker.cs b/Tools/AutoClient/Modes/FolderStore/BalanceChecker.cs index 895651ae..502d48cd 100644 --- a/Tools/AutoClient/Modes/FolderStore/BalanceChecker.cs +++ b/Tools/AutoClient/Modes/FolderStore/BalanceChecker.cs @@ -1,5 +1,4 @@ -using GethConnector; -using Logging; +using Logging; using Utils; namespace AutoClient.Modes.FolderStore diff --git a/Tools/AutoClient/Modes/FolderStore/FileSaver.cs b/Tools/AutoClient/Modes/FolderStore/FileSaver.cs index ec5299f1..35630307 100644 --- a/Tools/AutoClient/Modes/FolderStore/FileSaver.cs +++ b/Tools/AutoClient/Modes/FolderStore/FileSaver.cs @@ -4,32 +4,64 @@ using Utils; namespace AutoClient.Modes.FolderStore { + public interface IFileSaverEventHandler + { + void SaveChanges(); + void OnFailure(); + } + public class FileSaver + { + private readonly ILog log; + private readonly LoadBalancer loadBalancer; + private readonly Stats stats; + private readonly string folderFile; + private readonly FileStatus entry; + private readonly IFileSaverEventHandler handler; + + public FileSaver(ILog log, LoadBalancer loadBalancer, Stats stats, string folderFile, FileStatus entry, IFileSaverEventHandler handler) + { + this.log = log; + this.loadBalancer = loadBalancer; + this.stats = stats; + this.folderFile = folderFile; + this.entry = entry; + this.handler = handler; + } + + public void Process() + { + loadBalancer.DispatchOnCodex(instance => + { + var run = new FileSaverRun(log, instance, stats, folderFile, entry, handler); + run.Process(); + }); + } + } + + public class FileSaverRun { private readonly ILog log; private readonly CodexWrapper instance; private readonly Stats stats; private readonly string folderFile; private readonly FileStatus entry; - private readonly Action saveChanges; + private readonly IFileSaverEventHandler handler; private readonly QuotaCheck quotaCheck; - public FileSaver(ILog log, CodexWrapper instance, Stats stats, string folderFile, FileStatus entry, Action saveChanges) + public FileSaverRun(ILog log, CodexWrapper instance, Stats stats, string folderFile, FileStatus entry, IFileSaverEventHandler handler) { this.log = log; this.instance = instance; this.stats = stats; this.folderFile = folderFile; this.entry = entry; - this.saveChanges = saveChanges; + this.handler = handler; quotaCheck = new QuotaCheck(log, folderFile, instance); } - public bool HasFailed { get; private set; } - public void Process() { - HasFailed = false; if (HasRecentPurchase()) { Log($"Purchase running: '{entry.PurchaseId}'"); @@ -71,7 +103,7 @@ namespace AutoClient.Modes.FolderStore Thread.Sleep(TimeSpan.FromMinutes(1.0)); } Log("Could not upload: Insufficient local storage quota."); - HasFailed = true; + handler.OnFailure(); return false; } @@ -108,7 +140,7 @@ namespace AutoClient.Modes.FolderStore var result = instance.Node.LocalFiles(); if (result == null) return false; if (result.Content == null) return false; - + var localCids = result.Content.Where(c => !string.IsNullOrEmpty(c.Cid.Id)).Select(c => c.Cid.Id).ToArray(); var isFound = localCids.Any(c => c.ToLowerInvariant() == entry.BasicCid.ToLowerInvariant()); if (isFound) @@ -150,9 +182,9 @@ namespace AutoClient.Modes.FolderStore entry.BasicCid = string.Empty; stats.FailedUploads++; log.Error("Failed to upload: " + exc); - HasFailed = true; + handler.OnFailure(); } - saveChanges(); + handler.SaveChanges(); } private void CreateNewPurchase() @@ -168,7 +200,7 @@ namespace AutoClient.Modes.FolderStore WaitForStarted(request); stats.StorageRequestStats.SuccessfullyStarted++; - saveChanges(); + handler.SaveChanges(); Log($"Successfully started new purchase: '{entry.PurchaseId}' for {Time.FormatDuration(request.Purchase.Duration)}"); } @@ -176,9 +208,9 @@ namespace AutoClient.Modes.FolderStore { entry.EncodedCid = string.Empty; entry.PurchaseId = string.Empty; - saveChanges(); + handler.SaveChanges(); log.Error("Failed to start new purchase: " + exc); - HasFailed = true; + handler.OnFailure(); } } @@ -197,7 +229,7 @@ namespace AutoClient.Modes.FolderStore throw new Exception("CID received from storage request was not protected."); } - saveChanges(); + handler.SaveChanges(); Log("Saved new purchaseId: " + entry.PurchaseId); return request; } @@ -233,7 +265,7 @@ namespace AutoClient.Modes.FolderStore Log("Request failed to start. State: " + update.State); entry.EncodedCid = string.Empty; entry.PurchaseId = string.Empty; - saveChanges(); + handler.SaveChanges(); return; } } @@ -241,10 +273,10 @@ namespace AutoClient.Modes.FolderStore } catch (Exception exc) { - HasFailed = true; + handler.OnFailure(); Log($"Exception in {nameof(WaitForSubmittedToStarted)}: {exc}"); throw; - } + } } private void WaitForSubmitted(IStoragePurchaseContract request) diff --git a/Tools/AutoClient/Modes/FolderStore/FolderSaver.cs b/Tools/AutoClient/Modes/FolderStore/FolderSaver.cs index ed358ccc..0605c150 100644 --- a/Tools/AutoClient/Modes/FolderStore/FolderSaver.cs +++ b/Tools/AutoClient/Modes/FolderStore/FolderSaver.cs @@ -2,21 +2,21 @@ namespace AutoClient.Modes.FolderStore { - public class FolderSaver + public class FolderSaver : IFileSaverEventHandler { private const string FolderSaverFilename = "foldersaver.json"; private readonly App app; - private readonly CodexWrapper instance; + private readonly LoadBalancer loadBalancer; private readonly JsonFile statusFile; private readonly FolderStatus status; private readonly BalanceChecker balanceChecker; private int changeCounter = 0; private int failureCount = 0; - public FolderSaver(App app, CodexWrapper instance) + public FolderSaver(App app, LoadBalancer loadBalancer) { this.app = app; - this.instance = instance; + this.loadBalancer = loadBalancer; balanceChecker = new BalanceChecker(app); statusFile = new JsonFile(app, Path.Combine(app.Config.FolderToStore, FolderSaverFilename)); @@ -87,7 +87,6 @@ namespace AutoClient.Modes.FolderStore { var fileSaver = CreateFileSaver(folderFile, entry); fileSaver.Process(); - if (fileSaver.HasFailed) failureCount++; } private void SaveFolderSaverJsonFile() @@ -101,7 +100,6 @@ namespace AutoClient.Modes.FolderStore ApplyPadding(folderFile); var fileSaver = CreateFileSaver(folderFile, entry); fileSaver.Process(); - if (fileSaver.HasFailed) failureCount++; if (!string.IsNullOrEmpty(entry.EncodedCid)) { @@ -148,11 +146,18 @@ namespace AutoClient.Modes.FolderStore { var fixedLength = entry.Filename.PadRight(35); var prefix = $"[{fixedLength}] "; - return new FileSaver(new LogPrefixer(app.Log, prefix), instance, status.Stats, folderFile, entry, saveChanges: () => - { - statusFile.Save(status); - changeCounter++; - }); + return new FileSaver(new LogPrefixer(app.Log, prefix), loadBalancer, status.Stats, folderFile, entry, this); + } + + public void SaveChanges() + { + statusFile.Save(status); + changeCounter++; + } + + public void OnFailure() + { + failureCount++; } } } diff --git a/Tools/AutoClient/Modes/FolderStoreMode.cs b/Tools/AutoClient/Modes/FolderStoreMode.cs index 477ea9bf..64a29065 100644 --- a/Tools/AutoClient/Modes/FolderStoreMode.cs +++ b/Tools/AutoClient/Modes/FolderStoreMode.cs @@ -2,28 +2,26 @@ namespace AutoClient.Modes { - public class FolderStoreMode : IMode + public class FolderStoreMode { private readonly App app; - private readonly string folder; - private readonly PurchaseInfo purchaseInfo; private readonly CancellationTokenSource cts = new CancellationTokenSource(); private Task checkTask = Task.CompletedTask; + private readonly LoadBalancer loadBalancer; - public FolderStoreMode(App app, string folder, PurchaseInfo purchaseInfo) + public FolderStoreMode(App app, LoadBalancer loadBalancer) { this.app = app; - this.folder = folder; - this.purchaseInfo = purchaseInfo; + this.loadBalancer = loadBalancer; } - public void Start(CodexWrapper instance, int index) + public void Start() { checkTask = Task.Run(() => { try { - var saver = new FolderSaver(app, instance); + var saver = new FolderSaver(app, loadBalancer); while (!cts.IsCancellationRequested) { saver.Run(cts); diff --git a/Tools/AutoClient/Modes/Mode.cs b/Tools/AutoClient/Modes/Mode.cs deleted file mode 100644 index afc76f7a..00000000 --- a/Tools/AutoClient/Modes/Mode.cs +++ /dev/null @@ -1,8 +0,0 @@ -namespace AutoClient.Modes -{ - public interface IMode - { - void Start(CodexWrapper node, int index); - void Stop(); - } -} diff --git a/Tools/AutoClient/Modes/PurchasingMode.cs b/Tools/AutoClient/Modes/PurchasingMode.cs deleted file mode 100644 index 206ab365..00000000 --- a/Tools/AutoClient/Modes/PurchasingMode.cs +++ /dev/null @@ -1,48 +0,0 @@ -using Logging; - -namespace AutoClient.Modes -{ - public class PurchasingMode : IMode - { - private readonly List purchasers = new List(); - private readonly App app; - private Task starterTask = Task.CompletedTask; - - public PurchasingMode(App app) - { - this.app = app; - } - - public void Start(CodexWrapper node, int index) - { - for (var i = 0; i < app.Config.NumConcurrentPurchases; i++) - { - purchasers.Add(new AutomaticPurchaser(app, new LogPrefixer(app.Log, $"({i}) "), node)); - } - - var delayPerPurchaser = - TimeSpan.FromSeconds(10 * index) + - TimeSpan.FromMinutes(app.Config.ContractDurationMinutes) / app.Config.NumConcurrentPurchases; - - starterTask = Task.Run(() => StartPurchasers(delayPerPurchaser)); - } - - private async Task StartPurchasers(TimeSpan delayPerPurchaser) - { - foreach (var purchaser in purchasers) - { - purchaser.Start(); - await Task.Delay(delayPerPurchaser); - } - } - - public void Stop() - { - starterTask.Wait(); - foreach (var purchaser in purchasers) - { - purchaser.Stop(); - } - } - } -} diff --git a/Tools/AutoClient/Program.cs b/Tools/AutoClient/Program.cs index 6a35ccf5..7bda2d3e 100644 --- a/Tools/AutoClient/Program.cs +++ b/Tools/AutoClient/Program.cs @@ -1,22 +1,22 @@ using ArgsUniform; using AutoClient; using AutoClient.Modes; -using AutoClient.Modes.FolderStore; using CodexClient; using GethPlugin; using Utils; +using WebUtils; +using Logging; public class Program { private readonly App app; - private readonly List modes = new List(); public Program(Configuration config) { app = new App(config); } - public static async Task Main(string[] args) + public static void Main(string[] args) { var cts = new CancellationTokenSource(); Console.CancelKeyPress += (sender, args) => cts.Cancel(); @@ -30,60 +30,35 @@ public class Program } var p = new Program(config); - await p.Run(); + p.Run(); } - public async Task Run() + public void Run() { - await Task.CompletedTask; + if (app.Config.ContractDurationMinutes - 1 < 5) throw new Exception("Contract duration config option not long enough!"); var codexNodes = CreateCodexWrappers(); + var loadBalancer = new LoadBalancer(app, codexNodes); - var i = 0; - foreach (var cdx in codexNodes) - { - var mode = CreateMode(); - modes.Add(mode); - - mode.Start(cdx, i); - i++; - } + var folderStore = new FolderStoreMode(app, loadBalancer); + folderStore.Start(); app.Cts.Token.WaitHandle.WaitOne(); - foreach (var mode in modes) mode.Stop(); - modes.Clear(); + folderStore.Stop(); app.Log.Log("Done"); } - private IMode CreateMode() - { - if (!string.IsNullOrEmpty(app.Config.FolderToStore)) - { - return CreateFolderStoreMode(); - } - - return new PurchasingMode(app); - } - - private IMode CreateFolderStoreMode() - { - if (app.Config.ContractDurationMinutes - 1 < 5) throw new Exception("Contract duration config option not long enough!"); - - return new FolderStoreMode(app, app.Config.FolderToStore, new PurchaseInfo( - purchaseDurationTotal: TimeSpan.FromMinutes(app.Config.ContractDurationMinutes), - purchaseDurationSafe: TimeSpan.FromMinutes(app.Config.ContractDurationMinutes - 120) - )); - } - private CodexWrapper[] CreateCodexWrappers() { var endpointStrs = app.Config.CodexEndpoints.Split(";", StringSplitOptions.RemoveEmptyEntries); var result = new List(); + var i = 1; foreach (var e in endpointStrs) { - result.Add(CreateCodexWrapper(e)); + result.Add(CreateCodexWrapper(e, i)); + i++; } return result.ToArray(); @@ -91,7 +66,7 @@ public class Program private readonly string LogLevel = "TRACE;info:discv5,providers,routingtable,manager,cache;warn:libp2p,multistream,switch,transport,tcptransport,semaphore,asyncstreamwrapper,lpstream,mplex,mplexchannel,noise,bufferstream,mplexcoder,secure,chronosstream,connection,websock,ws-session,muxedupgrade,upgrade,identify,contracts,clock,serde,json,serialization,JSONRPC-WS-CLIENT,JSONRPC-HTTP-CLIENT,repostore"; - private CodexWrapper CreateCodexWrapper(string endpoint) + private CodexWrapper CreateCodexWrapper(string endpoint, int number) { var splitIndex = endpoint.LastIndexOf(':'); var host = endpoint.Substring(0, splitIndex); @@ -103,11 +78,13 @@ public class Program port: port ); + var log = new LogPrefixer(app.Log, $"[{number.ToString().PadLeft(3, '0')}] "); + var httpFactory = new HttpFactory(log, new AutoClientWebTimeSet()); + var codexNodeFactory = new CodexNodeFactory(log: log, httpFactory: httpFactory, dataDir: app.Config.DataPath); var instance = CodexInstance.CreateFromApiEndpoint("[AutoClient]", address, EthAccountGenerator.GenerateNew()); - var node = app.CodexNodeFactory.CreateCodexNode(instance); + var node = codexNodeFactory.CreateCodexNode(instance); node.SetLogLevel(LogLevel); - return new CodexWrapper(app, node); }