using CodexOpenApi; using CodexPlugin; using Logging; using Newtonsoft.Json; using Utils; namespace AutoClient { public class Purchaser { private readonly App app; private readonly string nodeId; private readonly ILog log; private readonly HttpClient client; private readonly Address address; private readonly CodexApi codex; private Task workerTask = Task.CompletedTask; public Purchaser(App app, string nodeId, ILog log, HttpClient client, Address address, CodexApi codex) { this.app = app; this.nodeId = nodeId; this.log = log; this.client = client; this.address = address; this.codex = codex; } public void Start() { workerTask = Task.Run(Worker); } public void Stop() { workerTask.Wait(); } private async Task Worker() { log.Log("Worker started."); while (!app.Cts.Token.IsCancellationRequested) { try { var pid = await StartNewPurchase(); await WaitTillFinished(pid); await DownloadForeignCid(); } catch (Exception ex) { log.Error("Worker failed with: " + ex); await Task.Delay(TimeSpan.FromHours(6)); } } } private async Task DownloadForeignCid() { var cid = app.CidRepo.GetForeignCid(nodeId); if (cid == null) return; var size = app.CidRepo.GetSizeForCid(cid); if (size == null) return; try { var sw = System.Diagnostics.Stopwatch.StartNew(); var filename = Guid.NewGuid().ToString().ToLowerInvariant(); { using var fileStream = File.OpenWrite(filename); var fileResponse = await codex.DownloadNetworkStreamAsync(cid); fileResponse.Stream.CopyTo(fileStream); } var time = sw.Elapsed; File.Delete(filename); app.Performance.DownloadSuccessful(size.Value, time); } catch (Exception ex) { app.Performance.DownloadFailed(ex); } } private async Task StartNewPurchase() { var file = await CreateFile(); try { var cid = await UploadFile(file); return await RequestStorage(cid); } finally { DeleteFile(file); } } private async Task CreateFile() { return await app.Generator.Generate(); } private void DeleteFile(string file) { try { File.Delete(file); } catch (Exception exc) { app.Log.Error($"Failed to delete file '{file}': {exc}"); } } private async Task UploadFile(string filename) { using var fileStream = File.OpenRead(filename); try { var info = new FileInfo(filename); var sw = System.Diagnostics.Stopwatch.StartNew(); var cid = await UploadStream(fileStream); var time = sw.Elapsed; app.Performance.UploadSuccessful(info.Length, time); app.CidRepo.Add(nodeId, cid.Id, info.Length); return cid; } catch (Exception exc) { app.Performance.UploadFailed(exc); throw; } } private async Task UploadStream(FileStream fileStream) { log.Debug($"Uploading file..."); var response = await codex.UploadAsync(fileStream, app.Cts.Token); if (string.IsNullOrEmpty(response)) FrameworkAssert.Fail("Received empty response."); if (response.StartsWith("Unable to store block")) FrameworkAssert.Fail("Node failed to store block."); log.Debug($"Uploaded file. Received contentId: '{response}'."); return new ContentId(response); } private async Task RequestStorage(ContentId cid) { log.Debug("Requesting storage for " + cid.Id); var result = await codex.CreateStorageRequestAsync(cid.Id, new StorageRequestCreation() { Collateral = app.Config.RequiredCollateral.ToString(), Duration = (app.Config.ContractDurationMinutes * 60).ToString(), Expiry = (app.Config.ContractExpiryMinutes * 60).ToString(), Nodes = app.Config.NumHosts, Reward = app.Config.Price.ToString(), ProofProbability = "15", Tolerance = app.Config.HostTolerance }, app.Cts.Token); log.Debug("Purchase ID: " + result); var encoded = await GetEncodedCid(result); app.CidRepo.AddEncoded(cid.Id, encoded); return result; } private async Task GetEncodedCid(string pid) { try { var sp = (await GetStoragePurchase(pid))!; return sp.Request.Content.Cid; } catch (Exception ex) { log.Error(ex.ToString()); throw; } } private async Task GetStoragePurchase(string pid) { // openapi still don't match code. var str = await client.GetStringAsync($"{address.Host}:{address.Port}/api/codex/v1/storage/purchases/{pid}"); if (string.IsNullOrEmpty(str)) return null; return JsonConvert.DeserializeObject(str); } private async Task WaitTillFinished(string pid) { try { var emptyResponseTolerance = 10; while (!app.Cts.Token.IsCancellationRequested) { var purchase = await GetStoragePurchase(pid); if (purchase == null) { await FixedShortDelay(); emptyResponseTolerance--; if (emptyResponseTolerance == 0) { log.Log("Received 10 empty responses. Stop tracking this purchase."); await ExpiryTimeDelay(); return; } continue; } var status = purchase.State.ToLowerInvariant(); if (status.Contains("cancel")) { app.Performance.StorageContractCancelled(); return; } if (status.Contains("error")) { app.Performance.StorageContractErrored(purchase.Error); return; } if (status.Contains("finished")) { app.Performance.StorageContractFinished(); return; } if (status.Contains("started")) { app.Performance.StorageContractStarted(); await FixedDurationDelay(); } await FixedShortDelay(); } } catch (Exception ex) { log.Log($"Wait failed with exception: {ex}. Assume contract will expire: Wait expiry time."); await ExpiryTimeDelay(); } } private async Task FixedDurationDelay() { await Task.Delay(app.Config.ContractDurationMinutes * 60 * 1000, app.Cts.Token); } private async Task ExpiryTimeDelay() { await Task.Delay(app.Config.ContractExpiryMinutes * 60 * 1000, app.Cts.Token); } private async Task FixedShortDelay() { await Task.Delay(15 * 1000, app.Cts.Token); } } }