Cleanup of full connectivity helpers

This commit is contained in:
benbierens 2023-07-18 16:07:52 +02:00
parent bfefba9633
commit bbf66bb0f0
No known key found for this signature in database
GPG Key ID: FE44815D96D0A1AA
3 changed files with 273 additions and 258 deletions

View File

@ -0,0 +1,211 @@
using DistTestCore.Codex;
using NUnit.Framework;
using Utils;
namespace DistTestCore.Helpers
{
public interface IFullConnectivityImplementation
{
string Description();
string ValidateEntry(FullConnectivityHelper.Entry entry, FullConnectivityHelper.Entry[] allEntries);
FullConnectivityHelper.PeerConnectionState Check(FullConnectivityHelper.Entry from, FullConnectivityHelper.Entry to);
}
public class FullConnectivityHelper
{
private static string Nl = Environment.NewLine;
private readonly DistTest test;
private readonly IFullConnectivityImplementation implementation;
public FullConnectivityHelper(DistTest test, IFullConnectivityImplementation implementation)
{
this.test = test;
this.implementation = implementation;
}
public void AssertFullyConnected(IEnumerable<IOnlineCodexNode> nodes)
{
AssertFullyConnected(nodes.ToArray());
}
private void AssertFullyConnected(IOnlineCodexNode[] nodes)
{
test.Log($"Asserting '{implementation.Description()}' for nodes: '{string.Join(",", nodes.Select(n => n.GetName()))}'...");
var entries = CreateEntries(nodes);
var pairs = CreatePairs(entries);
RetryWhilePairs(pairs, () =>
{
CheckAndRemoveSuccessful(pairs);
});
if (pairs.Any())
{
var pairDetails = string.Join(Nl, pairs.SelectMany(p => p.GetResultMessages()));
test.Log($"Connections failed:{Nl}{pairDetails}");
Assert.Fail(string.Join(Nl, pairs.SelectMany(p => p.GetResultMessages())));
}
else
{
test.Log($"'{implementation.Description()}' = Success! for nodes: {string.Join(",", nodes.Select(n => n.GetName()))}");
}
}
private static void RetryWhilePairs(List<Pair> pairs, Action action)
{
var timeout = DateTime.UtcNow + TimeSpan.FromMinutes(5);
while (pairs.Any(p => p.Inconclusive) && timeout > DateTime.UtcNow)
{
action();
Time.Sleep(TimeSpan.FromSeconds(2));
}
}
private void CheckAndRemoveSuccessful(List<Pair> pairs)
{
// For large sets, don't try and do all of them at once.
var selectedPair = pairs.Take(20).ToArray();
var pairDetails = new List<string>();
foreach (var pair in selectedPair)
{
test.ScopedTestFiles(pair.Check);
if (pair.Success)
{
pairDetails.AddRange(pair.GetResultMessages());
pairs.Remove(pair);
}
}
test.Log($"Connections successful:{Nl}{string.Join(Nl, pairDetails)}");
}
private Entry[] CreateEntries(IOnlineCodexNode[] nodes)
{
var entries = nodes.Select(n => new Entry(n)).ToArray();
var errors = entries
.Select(e => implementation.ValidateEntry(e, entries))
.Where(s => !string.IsNullOrEmpty(s))
.ToArray();
if (errors.Any())
{
Assert.Fail("Some node entries failed to validate: " + string.Join(Nl, errors));
}
return entries;
}
private List<Pair> CreatePairs(Entry[] entries)
{
return CreatePairsIterator(entries).ToList();
}
private IEnumerable<Pair> CreatePairsIterator(Entry[] entries)
{
for (var x = 0; x < entries.Length; x++)
{
for (var y = x + 1; y < entries.Length; y++)
{
yield return new Pair(implementation, entries[x], entries[y]);
}
}
}
public class Entry
{
public Entry(IOnlineCodexNode node)
{
Node = node;
Response = node.GetDebugInfo();
}
public IOnlineCodexNode Node { get; }
public CodexDebugResponse Response { get; }
public override string ToString()
{
if (Response == null || string.IsNullOrEmpty(Response.id)) return "UNKNOWN";
return Response.id;
}
}
public enum PeerConnectionState
{
Unknown,
Connection,
NoConnection,
}
public class Pair
{
private TimeSpan aToBTime = TimeSpan.FromSeconds(0);
private TimeSpan bToATime = TimeSpan.FromSeconds(0);
private readonly IFullConnectivityImplementation implementation;
public Pair(IFullConnectivityImplementation implementation, Entry a, Entry b)
{
this.implementation = implementation;
A = a;
B = b;
}
public Entry A { get; }
public Entry B { get; }
public PeerConnectionState AKnowsB { get; private set; }
public PeerConnectionState BKnowsA { get; private set; }
public bool Success { get { return AKnowsB == PeerConnectionState.Connection && BKnowsA == PeerConnectionState.Connection; } }
public bool Inconclusive { get { return AKnowsB == PeerConnectionState.Unknown || BKnowsA == PeerConnectionState.Unknown; } }
public void Check()
{
aToBTime = Measure(() => AKnowsB = Check(A, B));
bToATime = Measure(() => BKnowsA = Check(B, A));
}
public override string ToString()
{
return $"[{string.Join(",", GetResultMessages())}]";
}
public string[] GetResultMessages()
{
var aName = A.ToString();
var bName = B.ToString();
return new[]
{
$"[{aName} --> {bName}] = {AKnowsB} ({aToBTime.TotalSeconds} seconds)",
$"[{aName} <-- {bName}] = {BKnowsA} ({bToATime.TotalSeconds} seconds)"
};
}
private static TimeSpan Measure(Action action)
{
var start = DateTime.UtcNow;
action();
return DateTime.UtcNow - start;
}
private PeerConnectionState Check(Entry from, Entry to)
{
Thread.Sleep(10);
try
{
return implementation.Check(from, to);
}
catch
{
// Didn't get a conclusive answer. Try again later.
return PeerConnectionState.Unknown;
}
}
}
}
}

View File

@ -1,251 +1,66 @@
using DistTestCore.Codex;
using NUnit.Framework;
using Utils;
using static DistTestCore.Helpers.FullConnectivityHelper;
namespace DistTestCore.Helpers
{
public class PeerConnectionTestHelpers
public class PeerConnectionTestHelpers : IFullConnectivityImplementation
{
private static string Nl = Environment.NewLine;
private readonly Random random = new Random();
private readonly DistTest test;
private readonly FullConnectivityHelper helper;
public PeerConnectionTestHelpers(DistTest test)
{
this.test = test;
helper = new FullConnectivityHelper(test, this);
}
public void AssertFullyConnected(IEnumerable<IOnlineCodexNode> nodes)
{
var n = nodes.ToArray();
AssertFullyConnected(n);
for (int i = 0; i < 5; i++)
{
Time.Sleep(TimeSpan.FromSeconds(30));
AssertFullyConnected(n);
}
helper.AssertFullyConnected(nodes);
}
private void AssertFullyConnected(IOnlineCodexNode[] nodes)
public string Description()
{
test.Log($"Asserting peers are fully-connected for nodes: '{string.Join(",", nodes.Select(n => n.GetName()))}'...");
var entries = CreateEntries(nodes);
var pairs = CreatePairs(entries);
RetryWhilePairs(pairs, () =>
{
CheckAndRemoveSuccessful(pairs);
});
if (pairs.Any())
{
var pairDetails = string.Join(Nl, pairs.SelectMany(p => p.GetResultMessages()));
test.Log($"Connections failed:{Nl}{pairDetails}");
Assert.Fail(string.Join(Nl, pairs.SelectMany(p => p.GetResultMessages())));
}
else
{
test.Log($"Success! Peers are fully-connected: {string.Join(",", nodes.Select(n => n.GetName()))}");
}
return "Peer Discovery";
}
private static void RetryWhilePairs(List<Pair> pairs, Action action)
public string ValidateEntry(Entry entry, Entry[] allEntries)
{
var timeout = DateTime.UtcNow + TimeSpan.FromMinutes(5);
while (pairs.Any(p => p.Inconclusive) && timeout > DateTime.UtcNow)
var result = string.Empty;
foreach (var peer in entry.Response.table.nodes)
{
action();
Time.Sleep(TimeSpan.FromSeconds(2));
}
}
private void CheckAndRemoveSuccessful(List<Pair> pairs)
{
// For large sets, don't try and do all of them at once.
var checkTasks = pairs.Take(20).Select(p => Task.Run(() =>
{
ApplyRandomDelay();
p.Check();
})).ToArray();
Task.WaitAll(checkTasks);
var pairDetails = new List<string>();
foreach (var pair in pairs.ToArray())
{
if (pair.Success)
var expected = GetExpectedDiscoveryEndpoint(allEntries, peer);
if (expected != peer.address)
{
pairDetails.AddRange(pair.GetResultMessages());
pairs.Remove(pair);
result += $"Node:{entry.Node.GetName()} has incorrect peer table entry. Was: '{peer.address}', expected: '{expected}'. ";
}
}
test.Log($"Connections successful:{Nl}{string.Join(Nl, pairDetails)}");
return result;
}
private static Entry[] CreateEntries(IOnlineCodexNode[] nodes)
public PeerConnectionState Check(Entry from, Entry to)
{
var entries = nodes.Select(n => new Entry(n)).ToArray();
var incorrectDiscoveryEndpoints = entries.SelectMany(e => e.GetInCorrectDiscoveryEndpoints(entries)).ToArray();
var peerId = to.Response.id;
if (incorrectDiscoveryEndpoints.Any())
var response = from.Node.GetDebugPeer(peerId);
if (!response.IsPeerFound)
{
Assert.Fail("Some nodes contain peer records with incorrect discovery ip/port information: " +
string.Join(Nl, incorrectDiscoveryEndpoints));
return PeerConnectionState.NoConnection;
}
return entries;
if (!string.IsNullOrEmpty(response.peerId) && response.addresses.Any())
{
return PeerConnectionState.Connection;
}
return PeerConnectionState.Unknown;
}
private static List<Pair> CreatePairs(Entry[] entries)
private static string GetExpectedDiscoveryEndpoint(Entry[] allEntries, CodexDebugTableNodeResponse node)
{
return CreatePairsIterator(entries).ToList();
}
var peer = allEntries.SingleOrDefault(e => e.Response.table.localNode.peerId == node.peerId);
if (peer == null) return $"peerId: {node.peerId} is not known.";
private static IEnumerable<Pair> CreatePairsIterator(Entry[] entries)
{
for (var x = 0; x < entries.Length; x++)
{
for (var y = x + 1; y < entries.Length; y++)
{
yield return new Pair(entries[x], entries[y]);
}
}
}
private void ApplyRandomDelay()
{
// Calling all the nodes all at the same time is not exactly nice.
Time.Sleep(TimeSpan.FromMicroseconds(random.Next(10, 1000)));
}
public class Entry
{
public Entry(IOnlineCodexNode node)
{
Node = node;
Response = node.GetDebugInfo();
}
public IOnlineCodexNode Node { get; }
public CodexDebugResponse Response { get; }
public IEnumerable<string> GetInCorrectDiscoveryEndpoints(Entry[] allEntries)
{
foreach (var peer in Response.table.nodes)
{
var expected = GetExpectedDiscoveryEndpoint(allEntries, peer);
if (expected != peer.address)
{
yield return $"Node:{Node.GetName()} has incorrect peer table entry. Was: '{peer.address}', expected: '{expected}'";
}
}
}
public override string ToString()
{
if (Response == null || string.IsNullOrEmpty(Response.id)) return "UNKNOWN";
return Response.id;
}
private static string GetExpectedDiscoveryEndpoint(Entry[] allEntries, CodexDebugTableNodeResponse node)
{
var peer = allEntries.SingleOrDefault(e => e.Response.table.localNode.peerId == node.peerId);
if (peer == null) return $"peerId: {node.peerId} is not known.";
var n = (OnlineCodexNode)peer.Node;
var ip = n.CodexAccess.Container.Pod.PodInfo.Ip;
var discPort = n.CodexAccess.Container.Recipe.GetPortByTag(CodexContainerRecipe.DiscoveryPortTag);
return $"{ip}:{discPort.Number}";
}
}
public enum PeerConnectionState
{
Unknown,
Connection,
NoConnection,
}
public class Pair
{
private TimeSpan aToBTime = TimeSpan.FromSeconds(0);
private TimeSpan bToATime = TimeSpan.FromSeconds(0);
public Pair(Entry a, Entry b)
{
A = a;
B = b;
}
public Entry A { get; }
public Entry B { get; }
public PeerConnectionState AKnowsB { get; private set; }
public PeerConnectionState BKnowsA { get; private set; }
public bool Success { get { return AKnowsB == PeerConnectionState.Connection && BKnowsA == PeerConnectionState.Connection; } }
public bool Inconclusive { get { return AKnowsB == PeerConnectionState.Unknown || BKnowsA == PeerConnectionState.Unknown; } }
public void Check()
{
aToBTime = Measure(() => AKnowsB = Knows(A, B));
bToATime = Measure(() => BKnowsA = Knows(B, A));
}
public override string ToString()
{
return $"[{string.Join(",", GetResultMessages())}]";
}
public string[] GetResultMessages()
{
var aName = A.ToString();
var bName = B.ToString();
return new[]
{
$"[{aName} --> {bName}] = {AKnowsB} ({aToBTime.TotalSeconds} seconds)",
$"[{aName} <-- {bName}] = {BKnowsA} ({bToATime.TotalSeconds} seconds)"
};
}
private static TimeSpan Measure(Action action)
{
var start = DateTime.UtcNow;
action();
return DateTime.UtcNow - start;
}
private PeerConnectionState Knows(Entry a, Entry b)
{
lock (a)
{
Thread.Sleep(10);
var peerId = b.Response.id;
try
{
var response = a.Node.GetDebugPeer(peerId);
if (!response.IsPeerFound)
{
return PeerConnectionState.NoConnection;
}
if (!string.IsNullOrEmpty(response.peerId) && response.addresses.Any())
{
return PeerConnectionState.Connection;
}
}
catch
{
}
// Didn't get a conclusive answer. Try again later.
return PeerConnectionState.Unknown;
}
}
var n = (OnlineCodexNode)peer.Node;
var ip = n.CodexAccess.Container.Pod.PodInfo.Ip;
var discPort = n.CodexAccess.Container.Recipe.GetPortByTag(CodexContainerRecipe.DiscoveryPortTag);
return $"{ip}:{discPort.Number}";
}
}
}

View File

@ -1,74 +1,63 @@
using DistTestCore.Codex;
using NUnit.Framework;
using static DistTestCore.Helpers.FullConnectivityHelper;
namespace DistTestCore.Helpers
{
public class PeerDownloadTestHelpers
public class PeerDownloadTestHelpers : IFullConnectivityImplementation
{
private readonly FullConnectivityHelper helper;
private readonly DistTest test;
private ByteSize testFileSize;
public PeerDownloadTestHelpers(DistTest test)
{
helper = new FullConnectivityHelper(test, this);
testFileSize = 1.MB();
this.test = test;
}
public void AssertFullDownloadInterconnectivity(IEnumerable<IOnlineCodexNode> nodes, ByteSize testFileSize)
{
test.Log($"Asserting full download interconnectivity for nodes: '{string.Join(",", nodes.Select(n => n.GetName()))}'...");
var start = DateTime.UtcNow;
foreach (var node in nodes)
{
var uploader = node;
var downloaders = nodes.Where(n => n != uploader).ToArray();
test.ScopedTestFiles(() =>
{
PerformTest(uploader, downloaders, testFileSize);
});
}
test.Log($"Success! Full download interconnectivity for nodes: {string.Join(",", nodes.Select(n => n.GetName()))}");
var timeTaken = DateTime.UtcNow - start;
AssertTimePerMB(timeTaken, nodes.Count(), testFileSize);
this.testFileSize = testFileSize;
helper.AssertFullyConnected(nodes);
}
private void AssertTimePerMB(TimeSpan timeTaken, int numberOfNodes, ByteSize size)
public string Description()
{
var numberOfDownloads = numberOfNodes * (numberOfNodes - 1);
var timePerDownload = timeTaken / numberOfDownloads;
float sizeInMB = size.ToMB();
var timePerMB = timePerDownload / sizeInMB;
test.Log($"Performed {numberOfDownloads} downloads of {size} in {timeTaken.TotalSeconds} seconds, for an average of {timePerMB.TotalSeconds} seconds per MB.");
Assert.That(timePerMB, Is.LessThan(CodexContainerRecipe.MaxDownloadTimePerMegabyte), "MaxDownloadTimePerMegabyte performance threshold breached.");
return "Download Connectivity";
}
private void PerformTest(IOnlineCodexNode uploader, IOnlineCodexNode[] downloaders, ByteSize testFileSize)
public string ValidateEntry(Entry entry, Entry[] allEntries)
{
// Generate 1 test file per downloader.
var files = downloaders.Select(d => GenerateTestFile(uploader, d, testFileSize)).ToArray();
return string.Empty;
}
// Upload all the test files to the uploader.
var contentIds = files.Select(uploader.UploadFile).ToArray();
public PeerConnectionState Check(Entry from, Entry to)
{
var expectedFile = GenerateTestFile(from.Node, to.Node);
// Each downloader should retrieve its own test file.
for (var i = 0; i < downloaders.Length; i++)
var contentId = from.Node.UploadFile(expectedFile);
try
{
var expectedFile = files[i];
var downloadedFile = downloaders[i].DownloadContent(contentIds[i], $"{expectedFile.Label}DOWNLOADED");
var downloadedFile = to.Node.DownloadContent(contentId, expectedFile.Label + "_downloaded");
expectedFile.AssertIsEqual(downloadedFile);
return PeerConnectionState.Connection;
}
catch
{
// Should an exception occur during the download or file-content assertion,
// We consider that as no-connection for the purpose of this test.
return PeerConnectionState.NoConnection;
}
// Should an exception occur during upload, then this try is inconclusive and we try again next loop.
}
private TestFile GenerateTestFile(IOnlineCodexNode uploader, IOnlineCodexNode downloader, ByteSize testFileSize)
private TestFile GenerateTestFile(IOnlineCodexNode uploader, IOnlineCodexNode downloader)
{
var up = uploader.GetName().Replace("<", "").Replace(">", "");
var down = downloader.GetName().Replace("<", "").Replace(">", "");
var label = $"FROM{up}TO{down}";
var label = $"~from:{up}-to:{down}~";
return test.GenerateTestFile(testFileSize, label);
}
}