Restores connectivity test helpers

This commit is contained in:
benbierens 2023-09-13 12:24:46 +02:00
parent d1895bab02
commit 84dd514517
No known key found for this signature in database
GPG Key ID: FE44815D96D0A1AA
8 changed files with 348 additions and 360 deletions

View File

@ -1,5 +1,6 @@
using Core; using Core;
using FileUtils; using FileUtils;
using KubernetesWorkflow;
using Logging; using Logging;
using MetricsPlugin; using MetricsPlugin;
using NUnit.Framework; using NUnit.Framework;
@ -10,6 +11,7 @@ namespace CodexPlugin
public interface IOnlineCodexNode public interface IOnlineCodexNode
{ {
string GetName(); string GetName();
RunningContainer Container { get; }
CodexDebugResponse GetDebugInfo(); CodexDebugResponse GetDebugInfo();
CodexDebugPeerResponse GetDebugPeer(string peerId); CodexDebugPeerResponse GetDebugPeer(string peerId);
ContentId UploadFile(TrackedFile file); ContentId UploadFile(TrackedFile file);
@ -35,6 +37,7 @@ namespace CodexPlugin
Version = new CodexDebugVersionResponse(); Version = new CodexDebugVersionResponse();
} }
public RunningContainer Container { get { return CodexAccess.Container; } }
public CodexAccess CodexAccess { get; } public CodexAccess CodexAccess { get; }
public CodexNodeGroup Group { get; } public CodexNodeGroup Group { get; }
public CodexDebugVersionResponse Version { get; private set; } public CodexDebugVersionResponse Version { get; private set; }

View File

@ -1,199 +0,0 @@
//using DistTestCore.Codex;
//using Logging;
//using NUnit.Framework;
//namespace DistTestCore.Helpers
//{
// public interface IFullConnectivityImplementation
// {
// string Description();
// string ValidateEntry(FullConnectivityHelper.Entry entry, FullConnectivityHelper.Entry[] allEntries);
// FullConnectivityHelper.PeerConnectionState Check(FullConnectivityHelper.Entry from, FullConnectivityHelper.Entry to);
// }
// public class FullConnectivityHelper
// {
// private static string Nl = Environment.NewLine;
// private readonly BaseLog log;
// private readonly IFullConnectivityImplementation implementation;
// public FullConnectivityHelper(BaseLog log, IFullConnectivityImplementation implementation)
// {
// this.log = log;
// this.implementation = implementation;
// }
// public void AssertFullyConnected(IEnumerable<CodexAccess> nodes)
// {
// AssertFullyConnected(nodes.ToArray());
// }
// private void AssertFullyConnected(CodexAccess[] nodes)
// {
// Log($"Asserting '{implementation.Description()}' for nodes: '{string.Join(",", nodes.Select(n => n.GetName()))}'...");
// var entries = CreateEntries(nodes);
// var pairs = CreatePairs(entries);
// // Each pair gets two chances.
// CheckAndRemoveSuccessful(pairs);
// CheckAndRemoveSuccessful(pairs);
// if (pairs.Any())
// {
// var pairDetails = string.Join(Nl, pairs.SelectMany(p => p.GetResultMessages()));
// Log($"Connections failed:{Nl}{pairDetails}");
// Assert.Fail(string.Join(Nl, pairs.SelectMany(p => p.GetResultMessages())));
// }
// else
// {
// Log($"'{implementation.Description()}' = Success! for nodes: {string.Join(",", nodes.Select(n => n.GetName()))}");
// }
// }
// private void CheckAndRemoveSuccessful(List<Pair> pairs)
// {
// var results = new List<string>();
// foreach (var pair in pairs.ToArray())
// {
// pair.Check();
// if (pair.Success)
// {
// results.AddRange(pair.GetResultMessages());
// pairs.Remove(pair);
// }
// }
// Log($"Connections successful:{Nl}{string.Join(Nl, results)}");
// }
// private Entry[] CreateEntries(CodexAccess[] nodes)
// {
// var entries = nodes.Select(n => new Entry(n)).ToArray();
// var errors = entries
// .Select(e => implementation.ValidateEntry(e, entries))
// .Where(s => !string.IsNullOrEmpty(s))
// .ToArray();
// if (errors.Any())
// {
// Assert.Fail("Some node entries failed to validate: " + string.Join(Nl, errors));
// }
// return entries;
// }
// private List<Pair> CreatePairs(Entry[] entries)
// {
// return CreatePairsIterator(entries).ToList();
// }
// private IEnumerable<Pair> CreatePairsIterator(Entry[] entries)
// {
// for (var x = 0; x < entries.Length; x++)
// {
// for (var y = x + 1; y < entries.Length; y++)
// {
// yield return new Pair(implementation, entries[x], entries[y]);
// }
// }
// }
// private void Log(string msg)
// {
// log.Log(msg);
// }
// public class Entry
// {
// public Entry(CodexAccess node)
// {
// Node = node;
// Response = node.GetDebugInfo();
// }
// public CodexAccess Node { get; }
// public CodexDebugResponse Response { get; }
// public override string ToString()
// {
// if (Response == null || string.IsNullOrEmpty(Response.id)) return "UNKNOWN";
// return Response.id;
// }
// }
// public enum PeerConnectionState
// {
// Unknown,
// Connection,
// NoConnection,
// }
// public class Pair
// {
// private TimeSpan aToBTime = TimeSpan.FromSeconds(0);
// private TimeSpan bToATime = TimeSpan.FromSeconds(0);
// private readonly IFullConnectivityImplementation implementation;
// public Pair(IFullConnectivityImplementation implementation, Entry a, Entry b)
// {
// this.implementation = implementation;
// A = a;
// B = b;
// }
// public Entry A { get; }
// public Entry B { get; }
// public PeerConnectionState AKnowsB { get; private set; }
// public PeerConnectionState BKnowsA { get; private set; }
// public bool Success { get { return AKnowsB == PeerConnectionState.Connection && BKnowsA == PeerConnectionState.Connection; } }
// public bool Inconclusive { get { return AKnowsB == PeerConnectionState.Unknown || BKnowsA == PeerConnectionState.Unknown; } }
// public void Check()
// {
// aToBTime = Measure(() => AKnowsB = Check(A, B));
// bToATime = Measure(() => BKnowsA = Check(B, A));
// }
// public override string ToString()
// {
// return $"[{string.Join(",", GetResultMessages())}]";
// }
// public string[] GetResultMessages()
// {
// var aName = A.ToString();
// var bName = B.ToString();
// return new[]
// {
// $"[{aName} --> {bName}] = {AKnowsB} ({aToBTime.TotalSeconds} seconds)",
// $"[{aName} <-- {bName}] = {BKnowsA} ({bToATime.TotalSeconds} seconds)"
// };
// }
// private static TimeSpan Measure(Action action)
// {
// var start = DateTime.UtcNow;
// action();
// return DateTime.UtcNow - start;
// }
// private PeerConnectionState Check(Entry from, Entry to)
// {
// Thread.Sleep(10);
// try
// {
// return implementation.Check(from, to);
// }
// catch
// {
// // Didn't get a conclusive answer. Try again later.
// return PeerConnectionState.Unknown;
// }
// }
// }
// }
//}

View File

@ -1,71 +0,0 @@
//using DistTestCore.Codex;
//using Logging;
//using static DistTestCore.Helpers.FullConnectivityHelper;
//namespace DistTestCore.Helpers
//{
// public class PeerConnectionTestHelpers : IFullConnectivityImplementation
// {
// private readonly FullConnectivityHelper helper;
// public PeerConnectionTestHelpers(BaseLog log)
// {
// helper = new FullConnectivityHelper(log, this);
// }
// public void AssertFullyConnected(IEnumerable<IOnlineCodexNode> nodes)
// {
// AssertFullyConnected(nodes.Select(n => ((OnlineCodexNode)n).CodexAccess));
// }
// public void AssertFullyConnected(IEnumerable<CodexAccess> nodes)
// {
// helper.AssertFullyConnected(nodes);
// }
// public string Description()
// {
// return "Peer Discovery";
// }
// public string ValidateEntry(Entry entry, Entry[] allEntries)
// {
// var result = string.Empty;
// foreach (var peer in entry.Response.table.nodes)
// {
// var expected = GetExpectedDiscoveryEndpoint(allEntries, peer);
// if (expected != peer.address)
// {
// result += $"Node:{entry.Node.GetName()} has incorrect peer table entry. Was: '{peer.address}', expected: '{expected}'. ";
// }
// }
// return result;
// }
// public PeerConnectionState Check(Entry from, Entry to)
// {
// var peerId = to.Response.id;
// var response = from.Node.GetDebugPeer(peerId);
// if (!response.IsPeerFound)
// {
// return PeerConnectionState.NoConnection;
// }
// if (!string.IsNullOrEmpty(response.peerId) && response.addresses.Any())
// {
// return PeerConnectionState.Connection;
// }
// return PeerConnectionState.Unknown;
// }
// private static string GetExpectedDiscoveryEndpoint(Entry[] allEntries, CodexDebugTableNodeResponse node)
// {
// var peer = allEntries.SingleOrDefault(e => e.Response.table.localNode.peerId == node.peerId);
// if (peer == null) return $"peerId: {node.peerId} is not known.";
// var ip = peer.Node.Container.Pod.PodInfo.Ip;
// var discPort = peer.Node.Container.Recipe.GetPortByTag(CodexContainerRecipe.DiscoveryPortTag);
// return $"{ip}:{discPort.Number}";
// }
// }
//}

View File

@ -1,89 +0,0 @@
//using DistTestCore.Codex;
//using FileUtils;
//using Logging;
//using Utils;
//using static DistTestCore.Helpers.FullConnectivityHelper;
//namespace DistTestCore.Helpers
//{
// public class PeerDownloadTestHelpers : IFullConnectivityImplementation
// {
// private readonly FullConnectivityHelper helper;
// private readonly BaseLog log;
// private readonly FileManager fileManager;
// private ByteSize testFileSize;
// public PeerDownloadTestHelpers(BaseLog log, FileManager fileManager)
// {
// helper = new FullConnectivityHelper(log, this);
// testFileSize = 1.MB();
// this.log = log;
// this.fileManager = fileManager;
// }
// public void AssertFullDownloadInterconnectivity(IEnumerable<IOnlineCodexNode> nodes, ByteSize testFileSize)
// {
// AssertFullDownloadInterconnectivity(nodes.Select(n => ((OnlineCodexNode)n).CodexAccess), testFileSize);
// }
// public void AssertFullDownloadInterconnectivity(IEnumerable<CodexAccess> nodes, ByteSize testFileSize)
// {
// this.testFileSize = testFileSize;
// helper.AssertFullyConnected(nodes);
// }
// public string Description()
// {
// return "Download Connectivity";
// }
// public string ValidateEntry(Entry entry, Entry[] allEntries)
// {
// return string.Empty;
// }
// public PeerConnectionState Check(Entry from, Entry to)
// {
// return fileManager.ScopedFiles(() => CheckConnectivity(from, to));
// }
// private PeerConnectionState CheckConnectivity(Entry from, Entry to)
// {
// var expectedFile = GenerateTestFile(from.Node, to.Node);
// using var uploadStream = File.OpenRead(expectedFile.Filename);
// var contentId = Stopwatch.Measure(log, "Upload", () => from.Node.UploadFile(uploadStream));
// try
// {
// var downloadedFile = Stopwatch.Measure(log, "Download", () => DownloadFile(to.Node, contentId, expectedFile.Label + "_downloaded"));
// expectedFile.AssertIsEqual(downloadedFile);
// return PeerConnectionState.Connection;
// }
// catch
// {
// // Should an exception occur during the download or file-content assertion,
// // We consider that as no-connection for the purpose of this test.
// return PeerConnectionState.NoConnection;
// }
// // Should an exception occur during upload, then this try is inconclusive and we try again next loop.
// }
// private TestFile DownloadFile(CodexAccess node, string contentId, string label)
// {
// var downloadedFile = fileManager.CreateEmptyTestFile(label);
// using var downloadStream = File.OpenWrite(downloadedFile.Filename);
// using var stream = node.DownloadFile(contentId);
// stream.CopyTo(downloadStream);
// return downloadedFile;
// }
// private TestFile GenerateTestFile(CodexAccess uploader, CodexAccess downloader)
// {
// var up = uploader.GetName().Replace("<", "").Replace(">", "");
// var down = downloader.GetName().Replace("<", "").Replace(">", "");
// var label = $"~from:{up}-to:{down}~";
// return fileManager.GenerateTestFile(testFileSize, label);
// }
// }
//}

View File

@ -0,0 +1,199 @@
using CodexPlugin;
using Logging;
using NUnit.Framework;
namespace DistTestCore.Helpers
{
public interface IFullConnectivityImplementation
{
string Description();
string ValidateEntry(FullConnectivityHelper.Entry entry, FullConnectivityHelper.Entry[] allEntries);
FullConnectivityHelper.PeerConnectionState Check(FullConnectivityHelper.Entry from, FullConnectivityHelper.Entry to);
}
public class FullConnectivityHelper
{
private static string Nl = Environment.NewLine;
private readonly ILog log;
private readonly IFullConnectivityImplementation implementation;
public FullConnectivityHelper(ILog log, IFullConnectivityImplementation implementation)
{
this.log = log;
this.implementation = implementation;
}
public void AssertFullyConnected(IEnumerable<IOnlineCodexNode> nodes)
{
AssertFullyConnected(nodes.ToArray());
}
private void AssertFullyConnected(IOnlineCodexNode[] nodes)
{
Log($"Asserting '{implementation.Description()}' for nodes: '{string.Join(",", nodes.Select(n => n.GetName()))}'...");
var entries = CreateEntries(nodes);
var pairs = CreatePairs(entries);
// Each pair gets two chances.
CheckAndRemoveSuccessful(pairs);
CheckAndRemoveSuccessful(pairs);
if (pairs.Any())
{
var pairDetails = string.Join(Nl, pairs.SelectMany(p => p.GetResultMessages()));
Log($"Connections failed:{Nl}{pairDetails}");
Assert.Fail(string.Join(Nl, pairs.SelectMany(p => p.GetResultMessages())));
}
else
{
Log($"'{implementation.Description()}' = Success! for nodes: {string.Join(",", nodes.Select(n => n.GetName()))}");
}
}
private void CheckAndRemoveSuccessful(List<Pair> pairs)
{
var results = new List<string>();
foreach (var pair in pairs.ToArray())
{
pair.Check();
if (pair.Success)
{
results.AddRange(pair.GetResultMessages());
pairs.Remove(pair);
}
}
Log($"Connections successful:{Nl}{string.Join(Nl, results)}");
}
private Entry[] CreateEntries(IOnlineCodexNode[] nodes)
{
var entries = nodes.Select(n => new Entry(n)).ToArray();
var errors = entries
.Select(e => implementation.ValidateEntry(e, entries))
.Where(s => !string.IsNullOrEmpty(s))
.ToArray();
if (errors.Any())
{
Assert.Fail("Some node entries failed to validate: " + string.Join(Nl, errors));
}
return entries;
}
private List<Pair> CreatePairs(Entry[] entries)
{
return CreatePairsIterator(entries).ToList();
}
private IEnumerable<Pair> CreatePairsIterator(Entry[] entries)
{
for (var x = 0; x < entries.Length; x++)
{
for (var y = x + 1; y < entries.Length; y++)
{
yield return new Pair(implementation, entries[x], entries[y]);
}
}
}
private void Log(string msg)
{
log.Log(msg);
}
public class Entry
{
public Entry(IOnlineCodexNode node)
{
Node = node;
Response = node.GetDebugInfo();
}
public IOnlineCodexNode Node { get; }
public CodexDebugResponse Response { get; }
public override string ToString()
{
if (Response == null || string.IsNullOrEmpty(Response.id)) return "UNKNOWN";
return Response.id;
}
}
public enum PeerConnectionState
{
Unknown,
Connection,
NoConnection,
}
public class Pair
{
private TimeSpan aToBTime = TimeSpan.FromSeconds(0);
private TimeSpan bToATime = TimeSpan.FromSeconds(0);
private readonly IFullConnectivityImplementation implementation;
public Pair(IFullConnectivityImplementation implementation, Entry a, Entry b)
{
this.implementation = implementation;
A = a;
B = b;
}
public Entry A { get; }
public Entry B { get; }
public PeerConnectionState AKnowsB { get; private set; }
public PeerConnectionState BKnowsA { get; private set; }
public bool Success { get { return AKnowsB == PeerConnectionState.Connection && BKnowsA == PeerConnectionState.Connection; } }
public bool Inconclusive { get { return AKnowsB == PeerConnectionState.Unknown || BKnowsA == PeerConnectionState.Unknown; } }
public void Check()
{
aToBTime = Measure(() => AKnowsB = Check(A, B));
bToATime = Measure(() => BKnowsA = Check(B, A));
}
public override string ToString()
{
return $"[{string.Join(",", GetResultMessages())}]";
}
public string[] GetResultMessages()
{
var aName = A.ToString();
var bName = B.ToString();
return new[]
{
$"[{aName} --> {bName}] = {AKnowsB} ({aToBTime.TotalSeconds} seconds)",
$"[{aName} <-- {bName}] = {BKnowsA} ({bToATime.TotalSeconds} seconds)"
};
}
private static TimeSpan Measure(Action action)
{
var start = DateTime.UtcNow;
action();
return DateTime.UtcNow - start;
}
private PeerConnectionState Check(Entry from, Entry to)
{
Thread.Sleep(10);
try
{
return implementation.Check(from, to);
}
catch
{
// Didn't get a conclusive answer. Try again later.
return PeerConnectionState.Unknown;
}
}
}
}
}

View File

@ -0,0 +1,67 @@
using CodexPlugin;
using Logging;
using static DistTestCore.Helpers.FullConnectivityHelper;
namespace DistTestCore.Helpers
{
public class PeerConnectionTestHelpers : IFullConnectivityImplementation
{
private readonly FullConnectivityHelper helper;
public PeerConnectionTestHelpers(ILog log)
{
helper = new FullConnectivityHelper(log, this);
}
public void AssertFullyConnected(IEnumerable<IOnlineCodexNode> nodes)
{
helper.AssertFullyConnected(nodes);
}
public string Description()
{
return "Peer Discovery";
}
public string ValidateEntry(Entry entry, Entry[] allEntries)
{
var result = string.Empty;
foreach (var peer in entry.Response.table.nodes)
{
var expected = GetExpectedDiscoveryEndpoint(allEntries, peer);
if (expected != peer.address)
{
result += $"Node:{entry.Node.GetName()} has incorrect peer table entry. Was: '{peer.address}', expected: '{expected}'. ";
}
}
return result;
}
public PeerConnectionState Check(Entry from, Entry to)
{
var peerId = to.Response.id;
var response = from.Node.GetDebugPeer(peerId);
if (!response.IsPeerFound)
{
return PeerConnectionState.NoConnection;
}
if (!string.IsNullOrEmpty(response.peerId) && response.addresses.Any())
{
return PeerConnectionState.Connection;
}
return PeerConnectionState.Unknown;
}
private static string GetExpectedDiscoveryEndpoint(Entry[] allEntries, CodexDebugTableNodeResponse node)
{
var peer = allEntries.SingleOrDefault(e => e.Response.table.localNode.peerId == node.peerId);
if (peer == null) return $"peerId: {node.peerId} is not known.";
var container = peer.Node.Container;
var ip = container.Pod.PodInfo.Ip;
var discPort = container.Recipe.GetPortByTag(CodexContainerRecipe.DiscoveryPortTag)!;
return $"{ip}:{discPort.Number}";
}
}
}

View File

@ -0,0 +1,78 @@
using CodexPlugin;
using FileUtils;
using Logging;
using Utils;
using static DistTestCore.Helpers.FullConnectivityHelper;
namespace DistTestCore.Helpers
{
public class PeerDownloadTestHelpers : IFullConnectivityImplementation
{
private readonly FullConnectivityHelper helper;
private readonly ILog log;
private readonly FileManager fileManager;
private ByteSize testFileSize;
public PeerDownloadTestHelpers(ILog log, FileManager fileManager)
{
helper = new FullConnectivityHelper(log, this);
testFileSize = 1.MB();
this.log = log;
this.fileManager = fileManager;
}
public void AssertFullDownloadInterconnectivity(IEnumerable<IOnlineCodexNode> nodes, ByteSize testFileSize)
{
this.testFileSize = testFileSize;
helper.AssertFullyConnected(nodes);
}
public string Description()
{
return "Download Connectivity";
}
public string ValidateEntry(Entry entry, Entry[] allEntries)
{
return string.Empty;
}
public PeerConnectionState Check(Entry from, Entry to)
{
return fileManager.ScopedFiles(() => CheckConnectivity(from, to));
}
private PeerConnectionState CheckConnectivity(Entry from, Entry to)
{
var expectedFile = GenerateTestFile(from.Node, to.Node);
var contentId = Stopwatch.Measure(log, "Upload", () => from.Node.UploadFile(expectedFile));
try
{
var downloadedFile = Stopwatch.Measure(log, "Download", () => DownloadFile(to.Node, contentId, expectedFile.Label + "_downloaded"));
expectedFile.AssertIsEqual(downloadedFile);
return PeerConnectionState.Connection;
}
catch
{
// Should an exception occur during the download or file-content assertion,
// We consider that as no-connection for the purpose of this test.
return PeerConnectionState.NoConnection;
}
// Should an exception occur during upload, then this try is inconclusive and we try again next loop.
}
private TrackedFile? DownloadFile(IOnlineCodexNode node, ContentId contentId, string label)
{
return node.DownloadContent(contentId, label);
}
private TrackedFile GenerateTestFile(IOnlineCodexNode uploader, IOnlineCodexNode downloader)
{
var up = uploader.GetName().Replace("<", "").Replace(">", "");
var down = downloader.GetName().Replace("<", "").Replace(">", "");
var label = $"~from:{up}-to:{down}~";
return fileManager.GenerateFile(testFileSize, label);
}
}
}

View File

@ -19,7 +19,7 @@ namespace Tests.PeerDiscoveryTests
[Test] [Test]
public void MetricsDoesNotInterfereWithPeerDiscovery() public void MetricsDoesNotInterfereWithPeerDiscovery()
{ {
//AddCodex(2, s => s.EnableMetrics()); AddCodex(2, s => s.EnableMetrics());
AssertAllNodesConnected(); AssertAllNodesConnected();
} }