concurrent purchases

This commit is contained in:
benbierens 2024-06-28 08:47:09 +02:00
parent 4c75cebcd6
commit c57dc4daa1
No known key found for this signature in database
GPG Key ID: 877D2C2E09A22F3A
3 changed files with 51 additions and 66 deletions

View File

@ -13,6 +13,9 @@ namespace AutoClient
[Uniform("datapath", "dp", "DATAPATH", false, "Root path where all data files will be saved.")]
public string DataPath { get; set; } = "datapath";
[Uniform("purchases", "np", "PURCHASES", false, "Number of concurrent purchases.")]
public int NumConcurrentPurchases { get; set; } = 1;
[Uniform("contract-duration", "cd", "CONTRACTDURATION", false, "contract duration in minutes. (default 30)")]
public int ContractDurationMinutes { get; set; } = 30;

View File

@ -8,7 +8,6 @@ public static class Program
{
public static async Task Main(string[] args)
{
var cts = new CancellationTokenSource();
var cancellationToken = cts.Token;
Console.CancelKeyPress += (sender, args) => cts.Cancel();
@ -16,6 +15,11 @@ public static class Program
var uniformArgs = new ArgsUniform<Configuration>(PrintHelp, args);
var config = uniformArgs.Parse(true);
if (config.NumConcurrentPurchases < 1)
{
throw new Exception("Number of concurrent purchases must be > 0");
}
var log = new LogSplitter(
new FileLog(Path.Combine(config.LogPath, "autoclient")),
new ConsoleLog()
@ -36,8 +40,20 @@ public static class Program
await CheckCodex(codex, log);
var runner = new Runner(log, client, address, codex, cancellationToken, config, imgGenerator);
await runner.Run();
var purchasers = new List<Purchaser>();
for (var i = 0; i < config.NumConcurrentPurchases; i++)
{
purchasers.Add(
new Purchaser(new LogPrefixer(log, $"({i}) "), client, address, codex, cancellationToken, config, imgGenerator)
);
}
var delayPerPurchaser = TimeSpan.FromMinutes(config.ContractDurationMinutes) / config.NumConcurrentPurchases;
foreach (var purchaser in purchasers)
{
purchaser.Start();
await Task.Delay(delayPerPurchaser);
}
log.Log("Done.");
}
@ -61,16 +77,4 @@ public static class Program
{
Console.WriteLine("Generates fake data and creates Codex storage contracts for it.");
}
private static IPluginTools CreateTools(ILog log, Configuration config)
{
var configuration = new KubernetesWorkflow.Configuration(
null,
operationTimeout: TimeSpan.FromMinutes(10),
retryDelay: TimeSpan.FromSeconds(10),
kubernetesNamespace: "notUsed!#");
var result = new EntryPoint(log, configuration, config.DataPath, new DefaultTimeSet());
return result.Tools;
}
}

View File

@ -6,7 +6,7 @@ using Utils;
namespace AutoClient
{
public class Runner
public class Purchaser
{
private readonly ILog log;
private readonly HttpClient client;
@ -16,7 +16,7 @@ namespace AutoClient
private readonly Configuration config;
private readonly ImageGenerator generator;
public Runner(ILog log, HttpClient client, Address address, CodexApi codex, CancellationToken ct, Configuration config, ImageGenerator generator)
public Purchaser(ILog log, HttpClient client, Address address, CodexApi codex, CancellationToken ct, Configuration config, ImageGenerator generator)
{
this.log = log;
this.client = client;
@ -27,33 +27,25 @@ namespace AutoClient
this.generator = generator;
}
public async Task Run()
public void Start()
{
Task.Run(Worker);
}
private async Task Worker()
{
while (!ct.IsCancellationRequested)
{
log.Log("New run!");
try
{
await DoRun();
log.Log("Run succcessful.");
}
catch (Exception ex)
{
log.Error("Exception during run: " + ex);
}
await FixedShortDelay();
var pid = await StartNewPurchase();
await WaitTillFinished(pid);
}
}
private async Task DoRun()
private async Task<string> StartNewPurchase()
{
var file = await CreateFile();
var cid = await UploadFile(file);
var pid = await RequestStorage(cid);
await WaitUntilStarted(pid);
return await RequestStorage(cid);
}
private async Task<string> CreateFile()
@ -66,8 +58,7 @@ namespace AutoClient
// Copied from CodexNode :/
using var fileStream = File.OpenRead(filename);
var logMessage = $"Uploading file {filename}...";
log.Log(logMessage);
log.Log($"Uploading file {filename}...");
var response = await codex.UploadAsync(fileStream, ct);
if (string.IsNullOrEmpty(response)) FrameworkAssert.Fail("Received empty response.");
@ -91,7 +82,7 @@ namespace AutoClient
Tolerance = config.HostTolerance
}, ct);
log.Log("Response: " + result);
log.Log("Purchase ID: " + result);
return result;
}
@ -108,59 +99,46 @@ namespace AutoClient
if (!string.IsNullOrEmpty(sp.Error)) log.Log($"Purchase {pid} error is {sp.Error}");
return sp.State;
}
catch (Exception ex)
catch
{
return null;
}
}
private async Task WaitUntilStarted(string pid)
private async Task WaitTillFinished(string pid)
{
log.Log("Waiting till contract is started, or expired...");
log.Log("Waiting...");
try
{
var emptyResponseTolerance = 10;
while (true)
{
await FixedShortDelay();
var status = await GetPurchaseState(pid);
var status = (await GetPurchaseState(pid))?.ToLowerInvariant();
if (string.IsNullOrEmpty(status))
{
emptyResponseTolerance--;
if (emptyResponseTolerance == 0)
{
log.Log("Received 10 empty responses. Applying expiry delay, then carrying on.");
log.Log("Received 10 empty responses. Stop tracking this purchase.");
await ExpiryTimeDelay();
return;
}
await FixedShortDelay();
}
else
{
if (status.Contains("pending") || status.Contains("submitted"))
if (status.Contains("cancel") ||
status.Contains("error") ||
status.Contains("finished"))
{
await FixedShortDelay();
return;
}
else if (status.Contains("started"))
if (status.Contains("started"))
{
log.Log("Started.");
await FixedDurationDelay();
}
else if (status.Contains("finished"))
{
log.Log("Purchase finished.");
return;
}
else if (status.Contains("error"))
{
await FixedShortDelay();
return;
}
else
{
await FixedShortDelay();
}
}
}
}
catch (Exception ex)
@ -172,17 +150,17 @@ namespace AutoClient
private async Task FixedDurationDelay()
{
await Task.Delay(config.ContractDurationMinutes * 60 * 1000);
await Task.Delay(config.ContractDurationMinutes * 60 * 1000, ct);
}
private async Task ExpiryTimeDelay()
{
await Task.Delay(config.ContractExpiryMinutes * 60 * 1000);
await Task.Delay(config.ContractExpiryMinutes * 60 * 1000, ct);
}
private async Task FixedShortDelay()
{
await Task.Delay(15 * 1000);
await Task.Delay(15 * 1000, ct);
}
}
}