Adds logging of node status to base codexdistTest
This commit is contained in:
parent
f4d1dae478
commit
62b56e198b
|
@ -60,24 +60,18 @@ namespace CodexPlugin
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
public string UploadFile(FileStream fileStream)
|
public string UploadFile(FileStream fileStream, Action<Failure> onFailure)
|
||||||
{
|
{
|
||||||
LogSpaceStatistics("Before upload");
|
return OnCodex(
|
||||||
|
|
||||||
var str = OnCodex(
|
|
||||||
api => api.UploadAsync(fileStream),
|
api => api.UploadAsync(fileStream),
|
||||||
CreateRetryConfig(nameof(UploadFile)));
|
CreateRetryConfig(nameof(UploadFile), onFailure));
|
||||||
|
|
||||||
LogSpaceStatistics("After upload");
|
|
||||||
|
|
||||||
return str;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public Stream DownloadFile(string contentId)
|
public Stream DownloadFile(string contentId, Action<Failure> onFailure)
|
||||||
{
|
{
|
||||||
var fileResponse = OnCodex(
|
var fileResponse = OnCodex(
|
||||||
api => api.DownloadNetworkAsync(contentId),
|
api => api.DownloadNetworkAsync(contentId),
|
||||||
CreateRetryConfig(nameof(DownloadFile)));
|
CreateRetryConfig(nameof(DownloadFile), onFailure));
|
||||||
|
|
||||||
if (fileResponse.StatusCode != 200) throw new Exception("Download failed with StatusCode: " + fileResponse.StatusCode);
|
if (fileResponse.StatusCode != 200) throw new Exception("Download failed with StatusCode: " + fileResponse.StatusCode);
|
||||||
return fileResponse.Stream;
|
return fileResponse.Stream;
|
||||||
|
@ -188,13 +182,14 @@ namespace CodexPlugin
|
||||||
hasContainerCrashed = true;
|
hasContainerCrashed = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
private Retry CreateRetryConfig(string description)
|
private Retry CreateRetryConfig(string description, Action<Failure> onFailure)
|
||||||
{
|
{
|
||||||
var timeSet = tools.TimeSet;
|
var timeSet = tools.TimeSet;
|
||||||
var log = tools.GetLog();
|
var log = tools.GetLog();
|
||||||
|
|
||||||
return new Retry(description, timeSet.HttpRetryTimeout(), timeSet.HttpCallRetryDelay(), failure =>
|
return new Retry(description, timeSet.HttpRetryTimeout(), timeSet.HttpCallRetryDelay(), failure =>
|
||||||
{
|
{
|
||||||
|
onFailure(failure);
|
||||||
if (failure.Duration.TotalSeconds < timeSet.HttpCallTimeout().TotalSeconds)
|
if (failure.Duration.TotalSeconds < timeSet.HttpCallTimeout().TotalSeconds)
|
||||||
{
|
{
|
||||||
Investigate(log, failure, timeSet);
|
Investigate(log, failure, timeSet);
|
||||||
|
@ -243,9 +238,9 @@ namespace CodexPlugin
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void LogSpaceStatistics(string prefix = "")
|
private void LogSpaceStatistics()
|
||||||
{
|
{
|
||||||
tools.GetLog().Log($"{prefix} Space statistics: {Space()}");
|
tools.GetLog().Log($"Space statistics: {Space()}");
|
||||||
}
|
}
|
||||||
|
|
||||||
private void Throw(Failure failure)
|
private void Throw(Failure failure)
|
||||||
|
|
|
@ -15,7 +15,9 @@ namespace CodexPlugin
|
||||||
DebugInfo GetDebugInfo();
|
DebugInfo GetDebugInfo();
|
||||||
DebugPeer GetDebugPeer(string peerId);
|
DebugPeer GetDebugPeer(string peerId);
|
||||||
ContentId UploadFile(TrackedFile file);
|
ContentId UploadFile(TrackedFile file);
|
||||||
|
ContentId UploadFile(TrackedFile file, Action<Failure> onFailure);
|
||||||
TrackedFile? DownloadContent(ContentId contentId, string fileLabel = "");
|
TrackedFile? DownloadContent(ContentId contentId, string fileLabel = "");
|
||||||
|
TrackedFile? DownloadContent(ContentId contentId, Action<Failure> onFailure, string fileLabel = "");
|
||||||
LocalDatasetList LocalFiles();
|
LocalDatasetList LocalFiles();
|
||||||
CodexSpace Space();
|
CodexSpace Space();
|
||||||
void ConnectToPeer(ICodexNode node);
|
void ConnectToPeer(ICodexNode node);
|
||||||
|
@ -91,6 +93,11 @@ namespace CodexPlugin
|
||||||
}
|
}
|
||||||
|
|
||||||
public ContentId UploadFile(TrackedFile file)
|
public ContentId UploadFile(TrackedFile file)
|
||||||
|
{
|
||||||
|
return UploadFile(file, DoNothing);
|
||||||
|
}
|
||||||
|
|
||||||
|
public ContentId UploadFile(TrackedFile file, Action<Failure> onFailure)
|
||||||
{
|
{
|
||||||
using var fileStream = File.OpenRead(file.Filename);
|
using var fileStream = File.OpenRead(file.Filename);
|
||||||
|
|
||||||
|
@ -98,7 +105,7 @@ namespace CodexPlugin
|
||||||
Log(logMessage);
|
Log(logMessage);
|
||||||
var measurement = Stopwatch.Measure(tools.GetLog(), logMessage, () =>
|
var measurement = Stopwatch.Measure(tools.GetLog(), logMessage, () =>
|
||||||
{
|
{
|
||||||
return CodexAccess.UploadFile(fileStream);
|
return CodexAccess.UploadFile(fileStream, onFailure);
|
||||||
});
|
});
|
||||||
|
|
||||||
var response = measurement.Value;
|
var response = measurement.Value;
|
||||||
|
@ -112,11 +119,16 @@ namespace CodexPlugin
|
||||||
}
|
}
|
||||||
|
|
||||||
public TrackedFile? DownloadContent(ContentId contentId, string fileLabel = "")
|
public TrackedFile? DownloadContent(ContentId contentId, string fileLabel = "")
|
||||||
|
{
|
||||||
|
return DownloadContent(contentId, DoNothing, fileLabel);
|
||||||
|
}
|
||||||
|
|
||||||
|
public TrackedFile? DownloadContent(ContentId contentId, Action<Failure> onFailure, string fileLabel = "")
|
||||||
{
|
{
|
||||||
var logMessage = $"Downloading for contentId: '{contentId.Id}'...";
|
var logMessage = $"Downloading for contentId: '{contentId.Id}'...";
|
||||||
Log(logMessage);
|
Log(logMessage);
|
||||||
var file = tools.GetFileManager().CreateEmptyFile(fileLabel);
|
var file = tools.GetFileManager().CreateEmptyFile(fileLabel);
|
||||||
var measurement = Stopwatch.Measure(tools.GetLog(), logMessage, () => DownloadToFile(contentId.Id, file));
|
var measurement = Stopwatch.Measure(tools.GetLog(), logMessage, () => DownloadToFile(contentId.Id, file, onFailure));
|
||||||
transferSpeeds.AddDownloadSample(file.GetFilesize(), measurement);
|
transferSpeeds.AddDownloadSample(file.GetFilesize(), measurement);
|
||||||
Log($"Downloaded file {file.Describe()} to '{file.Filename}'.");
|
Log($"Downloaded file {file.Describe()} to '{file.Filename}'.");
|
||||||
return file;
|
return file;
|
||||||
|
@ -188,12 +200,12 @@ namespace CodexPlugin
|
||||||
.ToArray();
|
.ToArray();
|
||||||
}
|
}
|
||||||
|
|
||||||
private void DownloadToFile(string contentId, TrackedFile file)
|
private void DownloadToFile(string contentId, TrackedFile file, Action<Failure> onFailure)
|
||||||
{
|
{
|
||||||
using var fileStream = File.OpenWrite(file.Filename);
|
using var fileStream = File.OpenWrite(file.Filename);
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
using var downloadStream = CodexAccess.DownloadFile(contentId);
|
using var downloadStream = CodexAccess.DownloadFile(contentId, onFailure);
|
||||||
downloadStream.CopyTo(fileStream);
|
downloadStream.CopyTo(fileStream);
|
||||||
}
|
}
|
||||||
catch
|
catch
|
||||||
|
@ -207,5 +219,9 @@ namespace CodexPlugin
|
||||||
{
|
{
|
||||||
tools.GetLog().Log($"{GetName()}: {msg}");
|
tools.GetLog().Log($"{GetName()}: {msg}");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void DoNothing(Failure failure)
|
||||||
|
{
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,4 +1,5 @@
|
||||||
using Core;
|
using Core;
|
||||||
|
using IdentityModel;
|
||||||
using KubernetesWorkflow.Types;
|
using KubernetesWorkflow.Types;
|
||||||
using Logging;
|
using Logging;
|
||||||
using System.Globalization;
|
using System.Globalization;
|
||||||
|
@ -177,6 +178,41 @@ namespace MetricsPlugin
|
||||||
{
|
{
|
||||||
return "[" + string.Join(',', Sets.Select(s => s.ToString())) + "]";
|
return "[" + string.Join(',', Sets.Select(s => s.ToString())) + "]";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public string AsCsv()
|
||||||
|
{
|
||||||
|
var allTimestamps = Sets.SelectMany(s => s.Values.Select(v => v.Timestamp)).Distinct().OrderDescending().ToArray();
|
||||||
|
|
||||||
|
var lines = new List<string>();
|
||||||
|
MakeLine(lines, e =>
|
||||||
|
{
|
||||||
|
e.Add("Metrics");
|
||||||
|
foreach (var ts in allTimestamps) e.Add(ts.ToEpochTime().ToString());
|
||||||
|
});
|
||||||
|
|
||||||
|
foreach (var set in Sets)
|
||||||
|
{
|
||||||
|
MakeLine(lines, e =>
|
||||||
|
{
|
||||||
|
e.Add(set.Name);
|
||||||
|
foreach (var ts in allTimestamps)
|
||||||
|
{
|
||||||
|
var value = set.Values.SingleOrDefault(v => v.Timestamp == ts);
|
||||||
|
if (value == null) e.Add(" ");
|
||||||
|
else e.Add(value.Value.ToString());
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
return string.Join(Environment.NewLine, lines.ToArray());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void MakeLine(List<string> lines, Action<List<string>> values)
|
||||||
|
{
|
||||||
|
var list = new List<string>();
|
||||||
|
values(list);
|
||||||
|
lines.Add(string.Join(",", list));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public class MetricsSet
|
public class MetricsSet
|
||||||
|
|
|
@ -1,4 +1,5 @@
|
||||||
using CodexPlugin;
|
using CodexPlugin;
|
||||||
|
using MetricsPlugin;
|
||||||
using DistTestCore;
|
using DistTestCore;
|
||||||
using FileUtils;
|
using FileUtils;
|
||||||
using NUnit.Framework;
|
using NUnit.Framework;
|
||||||
|
@ -26,6 +27,7 @@ public class ScalabilityTests : CodexDistTest
|
||||||
|
|
||||||
var bootstrap = StartCodex(s => s.WithLogLevel(logLevel));
|
var bootstrap = StartCodex(s => s.WithLogLevel(logLevel));
|
||||||
var nodes = StartCodex(numberOfNodes - 1, s => s
|
var nodes = StartCodex(numberOfNodes - 1, s => s
|
||||||
|
.EnableMetrics()
|
||||||
.WithBootstrapNode(bootstrap)
|
.WithBootstrapNode(bootstrap)
|
||||||
.WithLogLevel(logLevel)
|
.WithLogLevel(logLevel)
|
||||||
.WithStorageQuota((fileSizeInMb + 50).MB())
|
.WithStorageQuota((fileSizeInMb + 50).MB())
|
||||||
|
@ -33,10 +35,15 @@ public class ScalabilityTests : CodexDistTest
|
||||||
|
|
||||||
var uploader = nodes.PickOneRandom();
|
var uploader = nodes.PickOneRandom();
|
||||||
var downloader = nodes.PickOneRandom();
|
var downloader = nodes.PickOneRandom();
|
||||||
|
var metrics = Ci.GetMetricsFor(uploader, downloader);
|
||||||
|
|
||||||
var testFile = GenerateTestFile(fileSizeInMb.MB());
|
var testFile = GenerateTestFile(fileSizeInMb.MB());
|
||||||
var contentId = uploader.UploadFile(testFile);
|
|
||||||
var downloadedFile = downloader.DownloadContent(contentId);
|
LogNodeStatus(uploader, metrics[0]);
|
||||||
|
var contentId = uploader.UploadFile(testFile, f => LogNodeStatus(uploader, metrics[0]));
|
||||||
|
|
||||||
|
LogNodeStatus(downloader, metrics[1]);
|
||||||
|
var downloadedFile = downloader.DownloadContent(contentId, f => LogNodeStatus(downloader, metrics[1]));
|
||||||
|
|
||||||
downloadedFile!.AssertIsEqual(testFile);
|
downloadedFile!.AssertIsEqual(testFile);
|
||||||
|
|
||||||
|
|
|
@ -6,6 +6,8 @@ using Core;
|
||||||
using DistTestCore;
|
using DistTestCore;
|
||||||
using DistTestCore.Helpers;
|
using DistTestCore.Helpers;
|
||||||
using DistTestCore.Logs;
|
using DistTestCore.Logs;
|
||||||
|
using MetricsPlugin;
|
||||||
|
using Newtonsoft.Json;
|
||||||
using NUnit.Framework.Constraints;
|
using NUnit.Framework.Constraints;
|
||||||
|
|
||||||
namespace CodexTests
|
namespace CodexTests
|
||||||
|
@ -99,6 +101,27 @@ namespace CodexTests
|
||||||
log.AssertLogDoesNotContain("ERR ");
|
log.AssertLogDoesNotContain("ERR ");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void LogNodeStatus(ICodexNode node, IMetricsAccess? metrics = null)
|
||||||
|
{
|
||||||
|
Log("Status for " + node.GetName() + Environment.NewLine +
|
||||||
|
GetBasicNodeStatus(node) +
|
||||||
|
GetNodeMetrics(metrics));
|
||||||
|
}
|
||||||
|
|
||||||
|
private string GetBasicNodeStatus(ICodexNode node)
|
||||||
|
{
|
||||||
|
return JsonConvert.SerializeObject(node.GetDebugInfo(), Formatting.Indented) + Environment.NewLine +
|
||||||
|
node.Space().ToString() + Environment.NewLine;
|
||||||
|
}
|
||||||
|
|
||||||
|
private string GetNodeMetrics(IMetricsAccess? metrics)
|
||||||
|
{
|
||||||
|
if (metrics == null) return "No metrics enabled";
|
||||||
|
var m = metrics.GetAllMetrics();
|
||||||
|
if (m == null) return "No metrics received";
|
||||||
|
return m.AsCsv();
|
||||||
|
}
|
||||||
|
|
||||||
protected virtual void OnCodexSetup(ICodexSetup setup)
|
protected virtual void OnCodexSetup(ICodexSetup setup)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue