implements loadbalancer

This commit is contained in:
ThatBen 2025-04-03 13:10:01 +02:00
parent 582c7326a1
commit 1240575f93
No known key found for this signature in database
GPG Key ID: 62C543548433D43E
10 changed files with 202 additions and 285 deletions

View File

@ -20,10 +20,6 @@ namespace AutoClient
new FileLog(Path.Combine(config.LogPath, "performance")),
new ConsoleLog()
));
var httpFactory = new HttpFactory(Log, new AutoClientWebTimeSet());
CodexNodeFactory = new CodexNodeFactory(log: Log, httpFactory: httpFactory, dataDir: Config.DataPath);
}
public Configuration Config { get; }
@ -31,7 +27,6 @@ namespace AutoClient
public IFileGenerator Generator { get; }
public CancellationTokenSource Cts { get; } = new CancellationTokenSource();
public Performance Performance { get; }
public CodexNodeFactory CodexNodeFactory { get; }
private IFileGenerator CreateGenerator()
{

View File

@ -1,145 +0,0 @@
using Logging;
namespace AutoClient
{
public class AutomaticPurchaser
{
private readonly App app;
private readonly ILog log;
private readonly CodexWrapper node;
private Task workerTask = Task.CompletedTask;
public AutomaticPurchaser(App app, ILog log, CodexWrapper node)
{
this.app = app;
this.log = log;
this.node = node;
}
public void Start()
{
workerTask = Task.Run(Worker);
}
public void Stop()
{
workerTask.Wait();
}
private async Task Worker()
{
log.Log("Worker started.");
while (!app.Cts.Token.IsCancellationRequested)
{
try
{
var pid = await StartNewPurchase();
await WaitTillFinished(pid);
}
catch (Exception ex)
{
log.Error("Worker failed with: " + ex);
await Task.Delay(TimeSpan.FromHours(6));
}
}
}
private async Task<string> StartNewPurchase()
{
var file = await CreateFile();
try
{
var cid = node.UploadFile(file);
var response = node.RequestStorage(cid);
return response.PurchaseId;
}
finally
{
DeleteFile(file);
}
}
private async Task<string> CreateFile()
{
return await app.Generator.Generate();
}
private void DeleteFile(string file)
{
try
{
File.Delete(file);
}
catch (Exception exc)
{
log.Error($"Failed to delete file '{file}': {exc}");
}
}
private async Task WaitTillFinished(string pid)
{
try
{
var emptyResponseTolerance = 10;
while (!app.Cts.Token.IsCancellationRequested)
{
var purchase = node.GetStoragePurchase(pid);
if (purchase == null)
{
await FixedShortDelay();
emptyResponseTolerance--;
if (emptyResponseTolerance == 0)
{
log.Log("Received 10 empty responses. Stop tracking this purchase.");
await ExpiryTimeDelay();
return;
}
continue;
}
if (purchase.IsCancelled)
{
app.Performance.StorageContractCancelled();
return;
}
if (purchase.IsError)
{
app.Performance.StorageContractErrored(purchase.Error);
return;
}
if (purchase.IsFinished)
{
app.Performance.StorageContractFinished();
return;
}
if (purchase.IsStarted)
{
app.Performance.StorageContractStarted();
await FixedDurationDelay();
}
await FixedShortDelay();
}
}
catch (Exception ex)
{
log.Log($"Wait failed with exception: {ex}. Assume contract will expire: Wait expiry time.");
await ExpiryTimeDelay();
}
}
private async Task FixedDurationDelay()
{
await Task.Delay(app.Config.ContractDurationMinutes * 60 * 1000, app.Cts.Token);
}
private async Task ExpiryTimeDelay()
{
await Task.Delay(app.Config.ContractExpiryMinutes * 60 * 1000, app.Cts.Token);
}
private async Task FixedShortDelay()
{
await Task.Delay(15 * 1000, app.Cts.Token);
}
}
}

View File

@ -0,0 +1,112 @@
namespace AutoClient
{
public class LoadBalancer
{
private readonly App app;
private readonly List<Cdx> instances;
private readonly object instanceLock = new object();
private readonly List<Task> tasks = new List<Task>();
private readonly object taskLock = new object();
private class Cdx
{
public Cdx(CodexWrapper instance)
{
Instance = instance;
}
public CodexWrapper Instance { get; }
public bool IsBusy { get; set; } = false;
}
public LoadBalancer(App app, CodexWrapper[] instances)
{
this.app = app;
this.instances = instances.Select(i => new Cdx(i)).ToList();
}
public void DispatchOnCodex(Action<CodexWrapper> action)
{
lock (taskLock)
{
WaitUntilNotAllBusy();
tasks.Add(Task.Run(() => RunTask(action)));
}
}
public void CleanUpTasks()
{
lock (taskLock)
{
foreach (var task in tasks)
{
if (task.IsFaulted) throw task.Exception;
}
tasks.RemoveAll(t => t.IsCompleted);
}
}
private void RunTask(Action<CodexWrapper> action)
{
var instance = GetAndSetFreeInstance();
try
{
action(instance.Instance);
}
finally
{
ReleaseInstance(instance);
}
}
private Cdx GetAndSetFreeInstance()
{
lock (instanceLock)
{
return GetSetInstance();
}
}
private Cdx GetSetInstance()
{
var i = instances.First();
instances.RemoveAt(0);
instances.Add(i);
if (i.IsBusy) return GetSetInstance();
i.IsBusy = true;
return i;
}
private void ReleaseInstance(Cdx instance)
{
lock (instanceLock)
{
instance.IsBusy = false;
}
}
private void WaitUntilNotAllBusy()
{
if (AllBusy())
{
app.Log.Log("[LoadBalancer] All instances are busy. Waiting...");
while (AllBusy())
{
Thread.Sleep(TimeSpan.FromSeconds(5.0));
}
}
}
private bool AllBusy()
{
lock (instanceLock)
{
return instances.All(i => i.IsBusy);
}
}
}
}

View File

@ -1,5 +1,4 @@
using GethConnector;
using Logging;
using Logging;
using Utils;
namespace AutoClient.Modes.FolderStore

View File

@ -4,32 +4,64 @@ using Utils;
namespace AutoClient.Modes.FolderStore
{
public interface IFileSaverEventHandler
{
void SaveChanges();
void OnFailure();
}
public class FileSaver
{
private readonly ILog log;
private readonly LoadBalancer loadBalancer;
private readonly Stats stats;
private readonly string folderFile;
private readonly FileStatus entry;
private readonly IFileSaverEventHandler handler;
public FileSaver(ILog log, LoadBalancer loadBalancer, Stats stats, string folderFile, FileStatus entry, IFileSaverEventHandler handler)
{
this.log = log;
this.loadBalancer = loadBalancer;
this.stats = stats;
this.folderFile = folderFile;
this.entry = entry;
this.handler = handler;
}
public void Process()
{
loadBalancer.DispatchOnCodex(instance =>
{
var run = new FileSaverRun(log, instance, stats, folderFile, entry, handler);
run.Process();
});
}
}
public class FileSaverRun
{
private readonly ILog log;
private readonly CodexWrapper instance;
private readonly Stats stats;
private readonly string folderFile;
private readonly FileStatus entry;
private readonly Action saveChanges;
private readonly IFileSaverEventHandler handler;
private readonly QuotaCheck quotaCheck;
public FileSaver(ILog log, CodexWrapper instance, Stats stats, string folderFile, FileStatus entry, Action saveChanges)
public FileSaverRun(ILog log, CodexWrapper instance, Stats stats, string folderFile, FileStatus entry, IFileSaverEventHandler handler)
{
this.log = log;
this.instance = instance;
this.stats = stats;
this.folderFile = folderFile;
this.entry = entry;
this.saveChanges = saveChanges;
this.handler = handler;
quotaCheck = new QuotaCheck(log, folderFile, instance);
}
public bool HasFailed { get; private set; }
public void Process()
{
HasFailed = false;
if (HasRecentPurchase())
{
Log($"Purchase running: '{entry.PurchaseId}'");
@ -71,7 +103,7 @@ namespace AutoClient.Modes.FolderStore
Thread.Sleep(TimeSpan.FromMinutes(1.0));
}
Log("Could not upload: Insufficient local storage quota.");
HasFailed = true;
handler.OnFailure();
return false;
}
@ -108,7 +140,7 @@ namespace AutoClient.Modes.FolderStore
var result = instance.Node.LocalFiles();
if (result == null) return false;
if (result.Content == null) return false;
var localCids = result.Content.Where(c => !string.IsNullOrEmpty(c.Cid.Id)).Select(c => c.Cid.Id).ToArray();
var isFound = localCids.Any(c => c.ToLowerInvariant() == entry.BasicCid.ToLowerInvariant());
if (isFound)
@ -150,9 +182,9 @@ namespace AutoClient.Modes.FolderStore
entry.BasicCid = string.Empty;
stats.FailedUploads++;
log.Error("Failed to upload: " + exc);
HasFailed = true;
handler.OnFailure();
}
saveChanges();
handler.SaveChanges();
}
private void CreateNewPurchase()
@ -168,7 +200,7 @@ namespace AutoClient.Modes.FolderStore
WaitForStarted(request);
stats.StorageRequestStats.SuccessfullyStarted++;
saveChanges();
handler.SaveChanges();
Log($"Successfully started new purchase: '{entry.PurchaseId}' for {Time.FormatDuration(request.Purchase.Duration)}");
}
@ -176,9 +208,9 @@ namespace AutoClient.Modes.FolderStore
{
entry.EncodedCid = string.Empty;
entry.PurchaseId = string.Empty;
saveChanges();
handler.SaveChanges();
log.Error("Failed to start new purchase: " + exc);
HasFailed = true;
handler.OnFailure();
}
}
@ -197,7 +229,7 @@ namespace AutoClient.Modes.FolderStore
throw new Exception("CID received from storage request was not protected.");
}
saveChanges();
handler.SaveChanges();
Log("Saved new purchaseId: " + entry.PurchaseId);
return request;
}
@ -233,7 +265,7 @@ namespace AutoClient.Modes.FolderStore
Log("Request failed to start. State: " + update.State);
entry.EncodedCid = string.Empty;
entry.PurchaseId = string.Empty;
saveChanges();
handler.SaveChanges();
return;
}
}
@ -241,10 +273,10 @@ namespace AutoClient.Modes.FolderStore
}
catch (Exception exc)
{
HasFailed = true;
handler.OnFailure();
Log($"Exception in {nameof(WaitForSubmittedToStarted)}: {exc}");
throw;
}
}
}
private void WaitForSubmitted(IStoragePurchaseContract request)

View File

@ -2,21 +2,21 @@
namespace AutoClient.Modes.FolderStore
{
public class FolderSaver
public class FolderSaver : IFileSaverEventHandler
{
private const string FolderSaverFilename = "foldersaver.json";
private readonly App app;
private readonly CodexWrapper instance;
private readonly LoadBalancer loadBalancer;
private readonly JsonFile<FolderStatus> statusFile;
private readonly FolderStatus status;
private readonly BalanceChecker balanceChecker;
private int changeCounter = 0;
private int failureCount = 0;
public FolderSaver(App app, CodexWrapper instance)
public FolderSaver(App app, LoadBalancer loadBalancer)
{
this.app = app;
this.instance = instance;
this.loadBalancer = loadBalancer;
balanceChecker = new BalanceChecker(app);
statusFile = new JsonFile<FolderStatus>(app, Path.Combine(app.Config.FolderToStore, FolderSaverFilename));
@ -87,7 +87,6 @@ namespace AutoClient.Modes.FolderStore
{
var fileSaver = CreateFileSaver(folderFile, entry);
fileSaver.Process();
if (fileSaver.HasFailed) failureCount++;
}
private void SaveFolderSaverJsonFile()
@ -101,7 +100,6 @@ namespace AutoClient.Modes.FolderStore
ApplyPadding(folderFile);
var fileSaver = CreateFileSaver(folderFile, entry);
fileSaver.Process();
if (fileSaver.HasFailed) failureCount++;
if (!string.IsNullOrEmpty(entry.EncodedCid))
{
@ -148,11 +146,18 @@ namespace AutoClient.Modes.FolderStore
{
var fixedLength = entry.Filename.PadRight(35);
var prefix = $"[{fixedLength}] ";
return new FileSaver(new LogPrefixer(app.Log, prefix), instance, status.Stats, folderFile, entry, saveChanges: () =>
{
statusFile.Save(status);
changeCounter++;
});
return new FileSaver(new LogPrefixer(app.Log, prefix), loadBalancer, status.Stats, folderFile, entry, this);
}
public void SaveChanges()
{
statusFile.Save(status);
changeCounter++;
}
public void OnFailure()
{
failureCount++;
}
}
}

View File

@ -2,28 +2,26 @@
namespace AutoClient.Modes
{
public class FolderStoreMode : IMode
public class FolderStoreMode
{
private readonly App app;
private readonly string folder;
private readonly PurchaseInfo purchaseInfo;
private readonly CancellationTokenSource cts = new CancellationTokenSource();
private Task checkTask = Task.CompletedTask;
private readonly LoadBalancer loadBalancer;
public FolderStoreMode(App app, string folder, PurchaseInfo purchaseInfo)
public FolderStoreMode(App app, LoadBalancer loadBalancer)
{
this.app = app;
this.folder = folder;
this.purchaseInfo = purchaseInfo;
this.loadBalancer = loadBalancer;
}
public void Start(CodexWrapper instance, int index)
public void Start()
{
checkTask = Task.Run(() =>
{
try
{
var saver = new FolderSaver(app, instance);
var saver = new FolderSaver(app, loadBalancer);
while (!cts.IsCancellationRequested)
{
saver.Run(cts);

View File

@ -1,8 +0,0 @@
namespace AutoClient.Modes
{
public interface IMode
{
void Start(CodexWrapper node, int index);
void Stop();
}
}

View File

@ -1,48 +0,0 @@
using Logging;
namespace AutoClient.Modes
{
public class PurchasingMode : IMode
{
private readonly List<AutomaticPurchaser> purchasers = new List<AutomaticPurchaser>();
private readonly App app;
private Task starterTask = Task.CompletedTask;
public PurchasingMode(App app)
{
this.app = app;
}
public void Start(CodexWrapper node, int index)
{
for (var i = 0; i < app.Config.NumConcurrentPurchases; i++)
{
purchasers.Add(new AutomaticPurchaser(app, new LogPrefixer(app.Log, $"({i}) "), node));
}
var delayPerPurchaser =
TimeSpan.FromSeconds(10 * index) +
TimeSpan.FromMinutes(app.Config.ContractDurationMinutes) / app.Config.NumConcurrentPurchases;
starterTask = Task.Run(() => StartPurchasers(delayPerPurchaser));
}
private async Task StartPurchasers(TimeSpan delayPerPurchaser)
{
foreach (var purchaser in purchasers)
{
purchaser.Start();
await Task.Delay(delayPerPurchaser);
}
}
public void Stop()
{
starterTask.Wait();
foreach (var purchaser in purchasers)
{
purchaser.Stop();
}
}
}
}

View File

@ -1,22 +1,22 @@
using ArgsUniform;
using AutoClient;
using AutoClient.Modes;
using AutoClient.Modes.FolderStore;
using CodexClient;
using GethPlugin;
using Utils;
using WebUtils;
using Logging;
public class Program
{
private readonly App app;
private readonly List<IMode> modes = new List<IMode>();
public Program(Configuration config)
{
app = new App(config);
}
public static async Task Main(string[] args)
public static void Main(string[] args)
{
var cts = new CancellationTokenSource();
Console.CancelKeyPress += (sender, args) => cts.Cancel();
@ -30,60 +30,35 @@ public class Program
}
var p = new Program(config);
await p.Run();
p.Run();
}
public async Task Run()
public void Run()
{
await Task.CompletedTask;
if (app.Config.ContractDurationMinutes - 1 < 5) throw new Exception("Contract duration config option not long enough!");
var codexNodes = CreateCodexWrappers();
var loadBalancer = new LoadBalancer(app, codexNodes);
var i = 0;
foreach (var cdx in codexNodes)
{
var mode = CreateMode();
modes.Add(mode);
mode.Start(cdx, i);
i++;
}
var folderStore = new FolderStoreMode(app, loadBalancer);
folderStore.Start();
app.Cts.Token.WaitHandle.WaitOne();
foreach (var mode in modes) mode.Stop();
modes.Clear();
folderStore.Stop();
app.Log.Log("Done");
}
private IMode CreateMode()
{
if (!string.IsNullOrEmpty(app.Config.FolderToStore))
{
return CreateFolderStoreMode();
}
return new PurchasingMode(app);
}
private IMode CreateFolderStoreMode()
{
if (app.Config.ContractDurationMinutes - 1 < 5) throw new Exception("Contract duration config option not long enough!");
return new FolderStoreMode(app, app.Config.FolderToStore, new PurchaseInfo(
purchaseDurationTotal: TimeSpan.FromMinutes(app.Config.ContractDurationMinutes),
purchaseDurationSafe: TimeSpan.FromMinutes(app.Config.ContractDurationMinutes - 120)
));
}
private CodexWrapper[] CreateCodexWrappers()
{
var endpointStrs = app.Config.CodexEndpoints.Split(";", StringSplitOptions.RemoveEmptyEntries);
var result = new List<CodexWrapper>();
var i = 1;
foreach (var e in endpointStrs)
{
result.Add(CreateCodexWrapper(e));
result.Add(CreateCodexWrapper(e, i));
i++;
}
return result.ToArray();
@ -91,7 +66,7 @@ public class Program
private readonly string LogLevel = "TRACE;info:discv5,providers,routingtable,manager,cache;warn:libp2p,multistream,switch,transport,tcptransport,semaphore,asyncstreamwrapper,lpstream,mplex,mplexchannel,noise,bufferstream,mplexcoder,secure,chronosstream,connection,websock,ws-session,muxedupgrade,upgrade,identify,contracts,clock,serde,json,serialization,JSONRPC-WS-CLIENT,JSONRPC-HTTP-CLIENT,repostore";
private CodexWrapper CreateCodexWrapper(string endpoint)
private CodexWrapper CreateCodexWrapper(string endpoint, int number)
{
var splitIndex = endpoint.LastIndexOf(':');
var host = endpoint.Substring(0, splitIndex);
@ -103,11 +78,13 @@ public class Program
port: port
);
var log = new LogPrefixer(app.Log, $"[{number.ToString().PadLeft(3, '0')}] ");
var httpFactory = new HttpFactory(log, new AutoClientWebTimeSet());
var codexNodeFactory = new CodexNodeFactory(log: log, httpFactory: httpFactory, dataDir: app.Config.DataPath);
var instance = CodexInstance.CreateFromApiEndpoint("[AutoClient]", address, EthAccountGenerator.GenerateNew());
var node = app.CodexNodeFactory.CreateCodexNode(instance);
var node = codexNodeFactory.CreateCodexNode(instance);
node.SetLogLevel(LogLevel);
return new CodexWrapper(app, node);
}