diff --git a/Tools/AutoClient/App.cs b/Tools/AutoClient/App.cs new file mode 100644 index 0000000..d88ac66 --- /dev/null +++ b/Tools/AutoClient/App.cs @@ -0,0 +1,36 @@ +using Logging; + +namespace AutoClient +{ + public class App + { + public App(Configuration config) + { + Config = config; + + Log = new LogSplitter( + new FileLog(Path.Combine(config.LogPath, "autoclient")), + new ConsoleLog() + ); + + Generator = CreateGenerator(); + CidRepo = new CidRepo(config); + } + + public Configuration Config { get; } + public ILog Log { get; } + public IFileGenerator Generator { get; } + public CancellationTokenSource Cts { get; } = new CancellationTokenSource(); + public CidRepo CidRepo { get; } + public Performance Performance { get; } = new Performance(); + + private IFileGenerator CreateGenerator() + { + if (Config.FileSizeMb > 0) + { + return new RandomFileGenerator(Config, Log); + } + return new ImageGenerator(Log); + } + } +} diff --git a/Tools/AutoClient/CidRepo.cs b/Tools/AutoClient/CidRepo.cs new file mode 100644 index 0000000..e9038e3 --- /dev/null +++ b/Tools/AutoClient/CidRepo.cs @@ -0,0 +1,85 @@ +namespace AutoClient +{ + public class CidRepo + { + private readonly Random random = new Random(); + private readonly object _lock = new object(); + private readonly List entries = new List(); + private readonly Configuration config; + + public CidRepo(Configuration config) + { + this.config = config; + } + + public void Add(string nodeId, string cid, long knownSize) + { + lock (_lock) + { + entries.Add(new CidEntry(nodeId, cid, knownSize)); + } + } + + public void AddEncoded(string originalCid, string encodedCid) + { + lock (_lock) + { + var entry = entries.SingleOrDefault(e => e.Cid == originalCid); + if (entry == null) return; + + entry.Encoded = encodedCid; + } + } + + public string? GetForeignCid(string myNodeId) + { + lock (_lock) + { + while (true) + { + if (!entries.Any()) return null; + var available = entries.Where(e => e.NodeId != myNodeId).ToArray(); + if (!available.Any()) return null; + + var i = random.Next(0, available.Length); + var entry = available[i]; + + if (entry.CreatedUtc < (DateTime.UtcNow + TimeSpan.FromMinutes(config.ContractDurationMinutes))) + { + entries.Remove(entry); + } + else + { + return entry.Cid; + } + } + } + } + + public long? GetSizeForCid(string cid) + { + lock (_lock) + { + var entry = entries.SingleOrDefault(e => e.Cid == cid); + if (entry == null) return null; + return entry.KnownSize; + } + } + } + + public class CidEntry + { + public CidEntry(string nodeId, string cid, long knownSize) + { + NodeId = nodeId; + Cid = cid; + KnownSize = knownSize; + } + + public string NodeId { get; } + public string Cid { get; } + public string Encoded { get; set; } = string.Empty; + public long KnownSize { get; } + public DateTime CreatedUtc { get; } = DateTime.UtcNow; + } +} diff --git a/Tools/AutoClient/CodexUser.cs b/Tools/AutoClient/CodexUser.cs index bf71026..fd3259c 100644 --- a/Tools/AutoClient/CodexUser.cs +++ b/Tools/AutoClient/CodexUser.cs @@ -1,45 +1,57 @@ using CodexOpenApi; using Logging; -using static Org.BouncyCastle.Math.EC.ECCurve; using Utils; namespace AutoClient { public class CodexUser { - private readonly ILog log; + private readonly App app; private readonly CodexApi codex; private readonly HttpClient client; private readonly Address address; - private readonly IFileGenerator generator; - private readonly Configuration config; - private readonly CancellationToken cancellationToken; + private readonly List purchasers = new List(); + private Task starterTask = Task.CompletedTask; + private readonly string nodeId = Guid.NewGuid().ToString(); - public CodexUser(ILog log, CodexApi codex, HttpClient client, Address address, IFileGenerator generator, Configuration config, CancellationToken cancellationToken) + public CodexUser(App app, CodexApi codex, HttpClient client, Address address) { - this.log = log; + this.app = app; this.codex = codex; this.client = client; this.address = address; - this.generator = generator; - this.config = config; - this.cancellationToken = cancellationToken; } - public async Task Run() + public void Start(int index) { - var purchasers = new List(); - for (var i = 0; i < config.NumConcurrentPurchases; i++) + for (var i = 0; i < app.Config.NumConcurrentPurchases; i++) { - purchasers.Add(new Purchaser(new LogPrefixer(log, $"({i}) "), client, address, codex, config, generator, cancellationToken)); + purchasers.Add(new Purchaser(app, nodeId, new LogPrefixer(app.Log, $"({i}) "), client, address, codex)); } - var delayPerPurchaser = TimeSpan.FromMinutes(config.ContractDurationMinutes) / config.NumConcurrentPurchases; + var delayPerPurchaser = + TimeSpan.FromSeconds(10 * index) + + TimeSpan.FromMinutes(app.Config.ContractDurationMinutes) / app.Config.NumConcurrentPurchases; + + starterTask = Task.Run(() => StartPurchasers(delayPerPurchaser)); + } + + private async Task StartPurchasers(TimeSpan delayPerPurchaser) + { foreach (var purchaser in purchasers) { purchaser.Start(); await Task.Delay(delayPerPurchaser); } } + + public void Stop() + { + starterTask.Wait(); + foreach (var purchaser in purchasers) + { + purchaser.Stop(); + } + } } } diff --git a/Tools/AutoClient/ImageGenerator.cs b/Tools/AutoClient/ImageGenerator.cs index 390381b..bfb95e1 100644 --- a/Tools/AutoClient/ImageGenerator.cs +++ b/Tools/AutoClient/ImageGenerator.cs @@ -11,16 +11,16 @@ namespace AutoClient public class ImageGenerator : IFileGenerator { - private LogSplitter log; + private readonly ILog log; - public ImageGenerator(LogSplitter log) + public ImageGenerator(ILog log) { this.log = log; } public async Task Generate() { - log.Log("Fetching random image from picsum.photos..."); + log.Debug("Fetching random image from picsum.photos..."); var httpClient = new HttpClient(); var thing = await httpClient.GetStreamAsync("https://picsum.photos/3840/2160"); diff --git a/Tools/AutoClient/Performance.cs b/Tools/AutoClient/Performance.cs new file mode 100644 index 0000000..ba3213b --- /dev/null +++ b/Tools/AutoClient/Performance.cs @@ -0,0 +1,45 @@ +namespace AutoClient +{ + public class Performance + { + internal void DownloadFailed(Exception ex) + { + throw new NotImplementedException(); + } + + internal void DownloadSuccessful(long? size, TimeSpan time) + { + throw new NotImplementedException(); + } + + internal void StorageContractCancelled() + { + throw new NotImplementedException(); + } + + internal void StorageContractErrored(string error) + { + throw new NotImplementedException(); + } + + internal void StorageContractFinished() + { + throw new NotImplementedException(); + } + + internal void StorageContractStarted() + { + throw new NotImplementedException(); + } + + internal void UploadFailed(Exception exc) + { + throw new NotImplementedException(); + } + + internal void UploadSuccessful(long length, TimeSpan time) + { + throw new NotImplementedException(); + } + } +} diff --git a/Tools/AutoClient/Program.cs b/Tools/AutoClient/Program.cs index 07c7500..6d1c1fd 100644 --- a/Tools/AutoClient/Program.cs +++ b/Tools/AutoClient/Program.cs @@ -1,24 +1,15 @@ using ArgsUniform; using AutoClient; using CodexOpenApi; -using Core; -using Logging; -using Nethereum.Model; using Utils; public class Program { - private readonly CancellationTokenSource cts; - private readonly Configuration config; - private readonly LogSplitter log; - private readonly IFileGenerator generator; + private readonly App app; - public Program(CancellationTokenSource cts, Configuration config, LogSplitter log, IFileGenerator generator) + public Program(Configuration config) { - this.cts = cts; - this.config = config; - this.log = log; - this.generator = generator; + app = new App(config); } public static async Task Main(string[] args) @@ -34,30 +25,31 @@ public class Program throw new Exception("Number of concurrent purchases must be > 0"); } - var log = new LogSplitter( - new FileLog(Path.Combine(config.LogPath, "autoclient")), - new ConsoleLog() - ); - - var generator = CreateGenerator(config, log); - - var p = new Program(cts, config, log, generator); - await p.Run(args); - cts.Token.WaitHandle.WaitOne(); - log.Log("Done."); + var p = new Program(config); + await p.Run(); } - public async Task Run(string[] args) + public async Task Run() { - var codexUsers = CreateUsers(); + var codexUsers = await CreateUsers(); + var i = 0; + foreach (var user in codexUsers) + { + user.Start(i); + i++; + } - + app.Cts.Token.WaitHandle.WaitOne(); + + foreach (var user in codexUsers) user.Stop(); + + app.Log.Log("Done"); } private async Task CreateUsers() { - var endpointStrs = config.CodexEndpoints.Split(";", StringSplitOptions.RemoveEmptyEntries); + var endpointStrs = app.Config.CodexEndpoints.Split(";", StringSplitOptions.RemoveEmptyEntries); var result = new List(); foreach (var e in endpointStrs) @@ -79,21 +71,24 @@ public class Program port: port ); - log.Log($"Start. Address: {address}"); - - var client = new HttpClient(); var codex = new CodexApi(client); codex.BaseUrl = $"{address.Host}:{address.Port}/api/codex/v1"; + app.Log.Log($"Checking Codex at {address}..."); await CheckCodex(codex); + app.Log.Log("OK"); - return new CodexUser(); + return new CodexUser( + app, + codex, + client, + address + ); } private async Task CheckCodex(CodexApi codex) { - log.Log("Checking Codex..."); try { var info = await codex.GetDebugInfoAsync(); @@ -101,20 +96,11 @@ public class Program } catch (Exception ex) { - log.Log($"Codex not OK: {ex}"); + app.Log.Error($"Codex not OK: {ex}"); throw; } } - private static IFileGenerator CreateGenerator(Configuration config, LogSplitter log) - { - if (config.FileSizeMb > 0) - { - return new RandomFileGenerator(config, log); - } - return new ImageGenerator(log); - } - private static void PrintHelp() { Console.WriteLine("Generates fake data and creates Codex storage contracts for it."); diff --git a/Tools/AutoClient/Purchaser.cs b/Tools/AutoClient/Purchaser.cs index d0d6995..cc5966e 100644 --- a/Tools/AutoClient/Purchaser.cs +++ b/Tools/AutoClient/Purchaser.cs @@ -8,36 +8,76 @@ namespace AutoClient { public class Purchaser { + private readonly App app; + private readonly string nodeId; private readonly ILog log; private readonly HttpClient client; private readonly Address address; private readonly CodexApi codex; - private readonly Configuration config; - private readonly IFileGenerator generator; - private readonly CancellationToken ct; + private Task workerTask = Task.CompletedTask; - public Purchaser(ILog log, HttpClient client, Address address, CodexApi codex, Configuration config, IFileGenerator generator, CancellationToken ct) + public Purchaser(App app, string nodeId, ILog log, HttpClient client, Address address, CodexApi codex) { + this.app = app; + this.nodeId = nodeId; this.log = log; this.client = client; this.address = address; this.codex = codex; - this.config = config; - this.generator = generator; - this.ct = ct; } public void Start() { - Task.Run(Worker); + workerTask = Task.Run(Worker); + } + + public void Stop() + { + workerTask.Wait(); } private async Task Worker() { - while (!ct.IsCancellationRequested) + log.Log("Worker started."); + while (!app.Cts.Token.IsCancellationRequested) { - var pid = await StartNewPurchase(); - await WaitTillFinished(pid); + try + { + var pid = await StartNewPurchase(); + await WaitTillFinished(pid); + await DownloadForeignCid(); + } + catch (Exception ex) + { + log.Error("Worker failed with: " + ex); + await Task.Delay(TimeSpan.FromHours(6)); + } + } + } + + private async Task DownloadForeignCid() + { + var cid = app.CidRepo.GetForeignCid(nodeId); + if (cid == null) return; + var size = app.CidRepo.GetSizeForCid(cid); + if (cid == null) return; + + try + { + var sw = System.Diagnostics.Stopwatch.StartNew(); + var filename = Guid.NewGuid().ToString().ToLowerInvariant(); + { + using var fileStream = File.OpenWrite(filename); + var fileResponse = await codex.DownloadNetworkAsync(cid); + fileResponse.Stream.CopyTo(fileStream); + } + var time = sw.Elapsed; + File.Delete(filename); + app.Performance.DownloadSuccessful(size, time); + } + catch (Exception ex) + { + app.Performance.DownloadFailed(ex); } } @@ -50,72 +90,96 @@ namespace AutoClient private async Task CreateFile() { - return await generator.Generate(); + return await app.Generator.Generate(); } private async Task UploadFile(string filename) { - // Copied from CodexNode :/ using var fileStream = File.OpenRead(filename); + try + { + var info = new FileInfo(filename); + var sw = System.Diagnostics.Stopwatch.StartNew(); + var cid = await UploadStream(fileStream); + var time = sw.Elapsed; + app.Performance.UploadSuccessful(info.Length, time); + app.CidRepo.Add(nodeId, cid.Id, info.Length); + return cid; + } + catch (Exception exc) + { + app.Performance.UploadFailed(exc); + throw; + } + } - log.Log($"Uploading file {filename}..."); - var response = await codex.UploadAsync(fileStream, ct); + private async Task UploadStream(FileStream fileStream) + { + log.Debug($"Uploading file..."); + var response = await codex.UploadAsync(fileStream, app.Cts.Token); if (string.IsNullOrEmpty(response)) FrameworkAssert.Fail("Received empty response."); if (response.StartsWith("Unable to store block")) FrameworkAssert.Fail("Node failed to store block."); - log.Log($"Uploaded file. Received contentId: '{response}'."); + log.Debug($"Uploaded file. Received contentId: '{response}'."); return new ContentId(response); } private async Task RequestStorage(ContentId cid) { - log.Log("Requesting storage for " + cid.Id); + log.Debug("Requesting storage for " + cid.Id); var result = await codex.CreateStorageRequestAsync(cid.Id, new StorageRequestCreation() { - Collateral = config.RequiredCollateral.ToString(), - Duration = (config.ContractDurationMinutes * 60).ToString(), - Expiry = (config.ContractExpiryMinutes * 60).ToString(), - Nodes = config.NumHosts, - Reward = config.Price.ToString(), + Collateral = app.Config.RequiredCollateral.ToString(), + Duration = (app.Config.ContractDurationMinutes * 60).ToString(), + Expiry = (app.Config.ContractExpiryMinutes * 60).ToString(), + Nodes = app.Config.NumHosts, + Reward = app.Config.Price.ToString(), ProofProbability = "15", - Tolerance = config.HostTolerance - }, ct); + Tolerance = app.Config.HostTolerance + }, app.Cts.Token); - log.Log("Purchase ID: " + result); + log.Debug("Purchase ID: " + result); + + var encoded = await GetEncodedCid(result); + app.CidRepo.AddEncoded(cid.Id, encoded); return result; } - private async Task GetPurchaseState(string pid) + private async Task GetEncodedCid(string pid) { try { - // openapi still don't match code. - var str = await client.GetStringAsync($"{address.Host}:{address.Port}/api/codex/v1/storage/purchases/{pid}"); - if (string.IsNullOrEmpty(str)) return null; - var sp = JsonConvert.DeserializeObject(str)!; - log.Log($"Purchase {pid} is {sp.State}"); - if (!string.IsNullOrEmpty(sp.Error)) log.Log($"Purchase {pid} error is {sp.Error}"); - return sp.State; + var sp = await GetStoragePurchase(pid)!; + return sp.Request.Content.Cid; } - catch + catch (Exception ex) { - return null; + log.Error(ex.ToString()); + throw; } } + private async Task GetStoragePurchase(string pid) + { + // openapi still don't match code. + var str = await client.GetStringAsync($"{address.Host}:{address.Port}/api/codex/v1/storage/purchases/{pid}"); + if (string.IsNullOrEmpty(str)) return null; + return JsonConvert.DeserializeObject(str); + } + private async Task WaitTillFinished(string pid) { - log.Log("Waiting..."); try { var emptyResponseTolerance = 10; - while (true) + while (!app.Cts.Token.IsCancellationRequested) { - var status = (await GetPurchaseState(pid))?.ToLowerInvariant(); - if (string.IsNullOrEmpty(status)) + var purchase = await GetStoragePurchase(pid); + if (purchase == null) { + await FixedShortDelay(); emptyResponseTolerance--; if (emptyResponseTolerance == 0) { @@ -123,19 +187,28 @@ namespace AutoClient await ExpiryTimeDelay(); return; } + continue; } - else + var status = purchase.State.ToLowerInvariant(); + if (status.Contains("cancel")) { - if (status.Contains("cancel") || - status.Contains("error") || - status.Contains("finished")) - { - return; - } - if (status.Contains("started")) - { - await FixedDurationDelay(); - } + app.Performance.StorageContractCancelled(); + return; + } + if (status.Contains("error")) + { + app.Performance.StorageContractErrored(purchase.Error); + return; + } + if (status.Contains("finished")) + { + app.Performance.StorageContractFinished(); + return; + } + if (status.Contains("started")) + { + app.Performance.StorageContractStarted(); + await FixedDurationDelay(); } await FixedShortDelay(); @@ -150,17 +223,17 @@ namespace AutoClient private async Task FixedDurationDelay() { - await Task.Delay(config.ContractDurationMinutes * 60 * 1000, ct); + await Task.Delay(app.Config.ContractDurationMinutes * 60 * 1000, app.Cts.Token); } private async Task ExpiryTimeDelay() { - await Task.Delay(config.ContractExpiryMinutes * 60 * 1000, ct); + await Task.Delay(app.Config.ContractExpiryMinutes * 60 * 1000, app.Cts.Token); } private async Task FixedShortDelay() { - await Task.Delay(15 * 1000, ct); + await Task.Delay(15 * 1000, app.Cts.Token); } } }