diff --git a/ProjectPlugins/CodexPlugin/CodexAccess.cs b/ProjectPlugins/CodexPlugin/CodexAccess.cs index 537dee0..5d97bec 100644 --- a/ProjectPlugins/CodexPlugin/CodexAccess.cs +++ b/ProjectPlugins/CodexPlugin/CodexAccess.cs @@ -60,24 +60,18 @@ namespace CodexPlugin }); } - public string UploadFile(FileStream fileStream) + public string UploadFile(FileStream fileStream, Action onFailure) { - LogSpaceStatistics("Before upload"); - - var str = OnCodex( + return OnCodex( api => api.UploadAsync(fileStream), - CreateRetryConfig(nameof(UploadFile))); - - LogSpaceStatistics("After upload"); - - return str; + CreateRetryConfig(nameof(UploadFile), onFailure)); } - public Stream DownloadFile(string contentId) + public Stream DownloadFile(string contentId, Action onFailure) { var fileResponse = OnCodex( api => api.DownloadNetworkAsync(contentId), - CreateRetryConfig(nameof(DownloadFile))); + CreateRetryConfig(nameof(DownloadFile), onFailure)); if (fileResponse.StatusCode != 200) throw new Exception("Download failed with StatusCode: " + fileResponse.StatusCode); return fileResponse.Stream; @@ -188,13 +182,14 @@ namespace CodexPlugin hasContainerCrashed = true; } - private Retry CreateRetryConfig(string description) + private Retry CreateRetryConfig(string description, Action onFailure) { var timeSet = tools.TimeSet; var log = tools.GetLog(); return new Retry(description, timeSet.HttpRetryTimeout(), timeSet.HttpCallRetryDelay(), failure => { + onFailure(failure); if (failure.Duration.TotalSeconds < timeSet.HttpCallTimeout().TotalSeconds) { Investigate(log, failure, timeSet); @@ -243,9 +238,9 @@ namespace CodexPlugin } } - private void LogSpaceStatistics(string prefix = "") + private void LogSpaceStatistics() { - tools.GetLog().Log($"{prefix} Space statistics: {Space()}"); + tools.GetLog().Log($"Space statistics: {Space()}"); } private void Throw(Failure failure) diff --git a/ProjectPlugins/CodexPlugin/CodexNode.cs b/ProjectPlugins/CodexPlugin/CodexNode.cs index dc8531c..5179aae 100644 --- a/ProjectPlugins/CodexPlugin/CodexNode.cs +++ b/ProjectPlugins/CodexPlugin/CodexNode.cs @@ -15,7 +15,9 @@ namespace CodexPlugin DebugInfo GetDebugInfo(); DebugPeer GetDebugPeer(string peerId); ContentId UploadFile(TrackedFile file); + ContentId UploadFile(TrackedFile file, Action onFailure); TrackedFile? DownloadContent(ContentId contentId, string fileLabel = ""); + TrackedFile? DownloadContent(ContentId contentId, Action onFailure, string fileLabel = ""); LocalDatasetList LocalFiles(); CodexSpace Space(); void ConnectToPeer(ICodexNode node); @@ -91,6 +93,11 @@ namespace CodexPlugin } public ContentId UploadFile(TrackedFile file) + { + return UploadFile(file, DoNothing); + } + + public ContentId UploadFile(TrackedFile file, Action onFailure) { using var fileStream = File.OpenRead(file.Filename); @@ -98,7 +105,7 @@ namespace CodexPlugin Log(logMessage); var measurement = Stopwatch.Measure(tools.GetLog(), logMessage, () => { - return CodexAccess.UploadFile(fileStream); + return CodexAccess.UploadFile(fileStream, onFailure); }); var response = measurement.Value; @@ -112,11 +119,16 @@ namespace CodexPlugin } public TrackedFile? DownloadContent(ContentId contentId, string fileLabel = "") + { + return DownloadContent(contentId, DoNothing, fileLabel); + } + + public TrackedFile? DownloadContent(ContentId contentId, Action onFailure, string fileLabel = "") { var logMessage = $"Downloading for contentId: '{contentId.Id}'..."; Log(logMessage); var file = tools.GetFileManager().CreateEmptyFile(fileLabel); - var measurement = Stopwatch.Measure(tools.GetLog(), logMessage, () => DownloadToFile(contentId.Id, file)); + var measurement = Stopwatch.Measure(tools.GetLog(), logMessage, () => DownloadToFile(contentId.Id, file, onFailure)); transferSpeeds.AddDownloadSample(file.GetFilesize(), measurement); Log($"Downloaded file {file.Describe()} to '{file.Filename}'."); return file; @@ -188,12 +200,12 @@ namespace CodexPlugin .ToArray(); } - private void DownloadToFile(string contentId, TrackedFile file) + private void DownloadToFile(string contentId, TrackedFile file, Action onFailure) { using var fileStream = File.OpenWrite(file.Filename); try { - using var downloadStream = CodexAccess.DownloadFile(contentId); + using var downloadStream = CodexAccess.DownloadFile(contentId, onFailure); downloadStream.CopyTo(fileStream); } catch @@ -207,5 +219,9 @@ namespace CodexPlugin { tools.GetLog().Log($"{GetName()}: {msg}"); } + + private void DoNothing(Failure failure) + { + } } } diff --git a/ProjectPlugins/MetricsPlugin/MetricsQuery.cs b/ProjectPlugins/MetricsPlugin/MetricsQuery.cs index 199015b..cdab697 100644 --- a/ProjectPlugins/MetricsPlugin/MetricsQuery.cs +++ b/ProjectPlugins/MetricsPlugin/MetricsQuery.cs @@ -1,4 +1,5 @@ using Core; +using IdentityModel; using KubernetesWorkflow.Types; using Logging; using System.Globalization; @@ -177,6 +178,41 @@ namespace MetricsPlugin { return "[" + string.Join(',', Sets.Select(s => s.ToString())) + "]"; } + + public string AsCsv() + { + var allTimestamps = Sets.SelectMany(s => s.Values.Select(v => v.Timestamp)).Distinct().OrderDescending().ToArray(); + + var lines = new List(); + MakeLine(lines, e => + { + e.Add("Metrics"); + foreach (var ts in allTimestamps) e.Add(ts.ToEpochTime().ToString()); + }); + + foreach (var set in Sets) + { + MakeLine(lines, e => + { + e.Add(set.Name); + foreach (var ts in allTimestamps) + { + var value = set.Values.SingleOrDefault(v => v.Timestamp == ts); + if (value == null) e.Add(" "); + else e.Add(value.Value.ToString()); + } + }); + } + + return string.Join(Environment.NewLine, lines.ToArray()); + } + + private void MakeLine(List lines, Action> values) + { + var list = new List(); + values(list); + lines.Add(string.Join(",", list)); + } } public class MetricsSet diff --git a/Tests/CodexLongTests/ScalabilityTests/ScalabilityTests.cs b/Tests/CodexLongTests/ScalabilityTests/ScalabilityTests.cs index b4c1b39..e75d189 100644 --- a/Tests/CodexLongTests/ScalabilityTests/ScalabilityTests.cs +++ b/Tests/CodexLongTests/ScalabilityTests/ScalabilityTests.cs @@ -1,4 +1,5 @@ using CodexPlugin; +using MetricsPlugin; using DistTestCore; using FileUtils; using NUnit.Framework; @@ -26,6 +27,7 @@ public class ScalabilityTests : CodexDistTest var bootstrap = StartCodex(s => s.WithLogLevel(logLevel)); var nodes = StartCodex(numberOfNodes - 1, s => s + .EnableMetrics() .WithBootstrapNode(bootstrap) .WithLogLevel(logLevel) .WithStorageQuota((fileSizeInMb + 50).MB()) @@ -33,10 +35,15 @@ public class ScalabilityTests : CodexDistTest var uploader = nodes.PickOneRandom(); var downloader = nodes.PickOneRandom(); + var metrics = Ci.GetMetricsFor(uploader, downloader); var testFile = GenerateTestFile(fileSizeInMb.MB()); - var contentId = uploader.UploadFile(testFile); - var downloadedFile = downloader.DownloadContent(contentId); + + LogNodeStatus(uploader, metrics[0]); + var contentId = uploader.UploadFile(testFile, f => LogNodeStatus(uploader, metrics[0])); + + LogNodeStatus(downloader, metrics[1]); + var downloadedFile = downloader.DownloadContent(contentId, f => LogNodeStatus(downloader, metrics[1])); downloadedFile!.AssertIsEqual(testFile); diff --git a/Tests/CodexTests/CodexDistTest.cs b/Tests/CodexTests/CodexDistTest.cs index 7058b8e..10ca6a4 100644 --- a/Tests/CodexTests/CodexDistTest.cs +++ b/Tests/CodexTests/CodexDistTest.cs @@ -6,6 +6,8 @@ using Core; using DistTestCore; using DistTestCore.Helpers; using DistTestCore.Logs; +using MetricsPlugin; +using Newtonsoft.Json; using NUnit.Framework.Constraints; namespace CodexTests @@ -99,6 +101,27 @@ namespace CodexTests log.AssertLogDoesNotContain("ERR "); } + public void LogNodeStatus(ICodexNode node, IMetricsAccess? metrics = null) + { + Log("Status for " + node.GetName() + Environment.NewLine + + GetBasicNodeStatus(node) + + GetNodeMetrics(metrics)); + } + + private string GetBasicNodeStatus(ICodexNode node) + { + return JsonConvert.SerializeObject(node.GetDebugInfo(), Formatting.Indented) + Environment.NewLine + + node.Space().ToString() + Environment.NewLine; + } + + private string GetNodeMetrics(IMetricsAccess? metrics) + { + if (metrics == null) return "No metrics enabled"; + var m = metrics.GetAllMetrics(); + if (m == null) return "No metrics received"; + return m.AsCsv(); + } + protected virtual void OnCodexSetup(ICodexSetup setup) { }