2
0
mirror of synced 2025-02-19 19:48:17 +00:00
This commit is contained in:
Ben 2024-10-01 13:38:08 +02:00
parent f6aa122245
commit fbf71e9fe8
No known key found for this signature in database
GPG Key ID: 0F16E812E736C24B
16 changed files with 277 additions and 25 deletions

View File

@ -21,5 +21,8 @@ namespace KubernetesWorkflow
[JsonIgnore]
public IK8sHooks Hooks { get; set; } = new DoNothingK8sHooks();
[JsonIgnore]
public Func<string?, string?> Replacer { get; set; } = s => s;
}
}

View File

@ -11,11 +11,13 @@ namespace KubernetesWorkflow
private readonly string podName;
private readonly string recipeName;
private readonly string k8sNamespace;
private readonly Func<string?, string?> replacer;
private CancellationTokenSource cts;
private Task? worker;
private Exception? workerException;
public CrashWatcher(ILog log, KubernetesClientConfiguration config, string containerName, string podName, string recipeName, string k8sNamespace)
public CrashWatcher(ILog log, KubernetesClientConfiguration config, string containerName, string podName, string recipeName, string k8sNamespace,
Func<string?, string?> replacer)
{
this.log = log;
this.config = config;
@ -23,6 +25,7 @@ namespace KubernetesWorkflow
this.podName = podName;
this.recipeName = recipeName;
this.k8sNamespace = k8sNamespace;
this.replacer = replacer;
cts = new CancellationTokenSource();
}
@ -92,7 +95,7 @@ namespace KubernetesWorkflow
{
using var stream = client.ReadNamespacedPodLog(podName, k8sNamespace, recipeName, previous: true);
var handler = new WriteToFileLogHandler(log, "Crash detected for " + containerName);
handler.Log(stream);
handler.Log(stream, replacer);
}
}
}

View File

@ -12,10 +12,11 @@ namespace KubernetesWorkflow
private readonly ILog log;
private readonly K8sCluster cluster;
private readonly WorkflowNumberSource workflowNumberSource;
private readonly Func<string?, string?> replacer;
private readonly K8sClient client;
public const string PodLabelKey = "pod-uuid";
public K8sController(ILog log, K8sCluster cluster, WorkflowNumberSource workflowNumberSource, string k8sNamespace)
public K8sController(ILog log, K8sCluster cluster, WorkflowNumberSource workflowNumberSource, string k8sNamespace, Func<string?, string?> replacer)
{
this.log = log;
this.cluster = cluster;
@ -23,6 +24,7 @@ namespace KubernetesWorkflow
client = new K8sClient(cluster.GetK8sClientConfig());
K8sNamespace = k8sNamespace;
this.replacer = replacer;
}
public void Dispose()
@ -64,7 +66,7 @@ namespace KubernetesWorkflow
if (waitTillStopped) WaitUntilPodsForDeploymentAreOffline(startResult.Deployment);
}
public void DownloadPodLog(RunningContainer container, ILogHandler logHandler, int? tailLines, bool? previous)
public void DownloadPodLog(RunningContainer container, ILogHandler logHandler, int? tailLines, bool? previous, Func<string?, string?> replacer)
{
log.Debug();
@ -72,7 +74,7 @@ namespace KubernetesWorkflow
var recipeName = container.Recipe.Name;
using var stream = client.Run(c => c.ReadNamespacedPodLog(podName, K8sNamespace, recipeName, tailLines: tailLines, previous: previous));
logHandler.Log(stream);
logHandler.Log(stream, replacer);
}
public string ExecuteCommand(RunningContainer container, string command, params string[] args)
@ -906,7 +908,7 @@ namespace KubernetesWorkflow
var msg = $"Pod crash detected for deployment {deploymentName} (pod:{podName})";
log.Error(msg);
DownloadPodLog(container, new WriteToFileLogHandler(log, msg), tailLines: null, previous: true);
DownloadPodLog(container, new WriteToFileLogHandler(log, msg), tailLines: null, previous: true, replacer);
throw new Exception(msg);
}
@ -952,7 +954,7 @@ namespace KubernetesWorkflow
var podName = GetPodName(container);
var recipeName = container.Recipe.Name;
return new CrashWatcher(log, cluster.GetK8sClientConfig(), containerName, podName, recipeName, K8sNamespace);
return new CrashWatcher(log, cluster.GetK8sClientConfig(), containerName, podName, recipeName, K8sNamespace, replacer);
}
private V1Pod[] FindPodsByLabel(string podLabel)

View File

@ -4,19 +4,19 @@ namespace KubernetesWorkflow
{
public interface ILogHandler
{
void Log(Stream log);
void Log(Stream log, Func<string?, string?> replacer);
}
public abstract class LogHandler : ILogHandler
{
public void Log(Stream log)
public void Log(Stream log, Func<string?, string?> replacer)
{
using var reader = new StreamReader(log);
var line = reader.ReadLine();
while (line != null)
{
ProcessLine(line);
line = reader.ReadLine();
line = replacer(reader.ReadLine());
if (line != null) ProcessLine(line);
}
}

View File

@ -28,16 +28,17 @@ namespace KubernetesWorkflow
private readonly WorkflowNumberSource numberSource;
private readonly K8sCluster cluster;
private readonly string k8sNamespace;
private readonly Func<string?, string?> replacer;
private readonly RecipeComponentFactory componentFactory = new RecipeComponentFactory();
private readonly LocationProvider locationProvider;
internal StartupWorkflow(ILog log, WorkflowNumberSource numberSource, K8sCluster cluster, string k8sNamespace)
internal StartupWorkflow(ILog log, WorkflowNumberSource numberSource, K8sCluster cluster, string k8sNamespace, Func<string?, string?> replacer)
{
this.log = log;
this.numberSource = numberSource;
this.cluster = cluster;
this.k8sNamespace = k8sNamespace;
this.replacer = replacer;
locationProvider = new LocationProvider(log, K8s);
}
@ -119,7 +120,7 @@ namespace KubernetesWorkflow
{
K8s(controller =>
{
controller.DownloadPodLog(container, logHandler, tailLines, previous);
controller.DownloadPodLog(container, logHandler, tailLines, previous, replacer);
});
}
@ -131,7 +132,7 @@ namespace KubernetesWorkflow
K8s(controller =>
{
controller.DownloadPodLog(container, logHandler, tailLines, previous);
controller.DownloadPodLog(container, logHandler, tailLines, previous, replacer);
});
return new DownloadedLog(logHandler, container.Name);
@ -257,7 +258,7 @@ namespace KubernetesWorkflow
{
try
{
var controller = new K8sController(log, cluster, numberSource, k8sNamespace);
var controller = new K8sController(log, cluster, numberSource, k8sNamespace, replacer);
action(controller);
controller.Dispose();
}
@ -272,7 +273,7 @@ namespace KubernetesWorkflow
{
try
{
var controller = new K8sController(log, cluster, numberSource, k8sNamespace);
var controller = new K8sController(log, cluster, numberSource, k8sNamespace, replacer);
var result = action(controller);
controller.Dispose();
return result;

View File

@ -25,7 +25,7 @@ namespace KubernetesWorkflow
var workflowNumberSource = new WorkflowNumberSource(numberSource.GetNextNumber(),
containerNumberSource);
return new StartupWorkflow(log, workflowNumberSource, cluster, GetNamespace(namespaceOverride));
return new StartupWorkflow(log, workflowNumberSource, cluster, GetNamespace(namespaceOverride), configuration.Replacer);
}
private string GetNamespace(string? namespaceOverride)

View File

@ -7,7 +7,7 @@ namespace CodexDiscordBotPlugin
public class RewarderBotContainerRecipe : ContainerRecipeFactory
{
public override string AppName => "discordbot-rewarder";
public override string Image => "codexstorage/codex-rewarderbot:sha-8033da1";
public override string Image => "codexstorage/codex-rewarderbot:sha-fb25372";
protected override void Initialize(StartupConfig startupConfig)
{

View File

@ -7,7 +7,8 @@ namespace CodexPlugin
{
public class CodexContainerRecipe : ContainerRecipeFactory
{
private const string DefaultDockerImage = "codexstorage/nim-codex:0.1.4";
private const string DefaultDockerImage = "thatbenbierens/nim-codex:netpeerdebug6";
//"codexstorage/nim-codex:0.1.4";
public const string ApiPortTag = "codex_api_port";
public const string ListenPortTag = "codex_listen_port";
public const string MetricsPortTag = "codex_metrics_port";

View File

@ -0,0 +1,57 @@
using CodexContractsPlugin.ChainMonitor;
using GethPlugin;
using Logging;
using System.Numerics;
namespace CodexTests.BasicTests
{
public class EventLogginHandler : IChainStateChangeHandler
{
private readonly ILog log;
public EventLogginHandler(ILog log)
{
this.log = log;
}
public void OnNewRequest(RequestEvent requestEvent)
{
Log(nameof(OnNewRequest), requestEvent);
}
public void OnRequestCancelled(RequestEvent requestEvent)
{
Log(nameof(OnRequestCancelled), requestEvent);
}
public void OnRequestFailed(RequestEvent requestEvent)
{
Log(nameof(OnRequestFailed), requestEvent);
}
public void OnRequestFinished(RequestEvent requestEvent)
{
Log(nameof(OnRequestFinished), requestEvent);
}
public void OnRequestFulfilled(RequestEvent requestEvent)
{
Log(nameof(OnRequestFulfilled), requestEvent);
}
public void OnSlotFilled(RequestEvent requestEvent, EthAddress host, BigInteger slotIndex)
{
Log(nameof(OnSlotFilled), requestEvent, host.ToString(), slotIndex.ToString());
}
public void OnSlotFreed(RequestEvent requestEvent, BigInteger slotIndex)
{
Log(nameof(OnNewRequest), requestEvent, slotIndex.ToString());
}
private void Log(string name, object o, params string[] str)
{
log.Log(name + ": " + o.ToString() + " - " + string.Join(",", str));
}
}
}

View File

@ -2,6 +2,7 @@
using DistTestCore;
using GethPlugin;
using MetricsPlugin;
using Nethereum.JsonRpc.Client;
using NUnit.Framework;
using Utils;
@ -10,6 +11,18 @@ namespace CodexTests.BasicTests
[TestFixture]
public class ExampleTests : CodexDistTest
{
[Test]
public void A()
{
var oneMb = GenerateTestFile(1.MB(), "oneMB");
var fiveMb = GenerateTestFile(5.MB(), "fiveMb");
var tenMb = GenerateTestFile(10.MB(), "tenMb");
var hundredMb = GenerateTestFile(100.MB(), "hundredMb");
var oneGb = GenerateTestFile(1.GB(), "oneGb");
var a = 0;
}
[Test]
public void CodexLogExample()
{
@ -20,11 +33,41 @@ namespace CodexTests.BasicTests
var localDatasets = primary.LocalFiles();
CollectionAssert.Contains(localDatasets.Content.Select(c => c.Cid), cid);
var nameMap = new Dictionary<string, string>();
AddNameMapping(nameMap, primary);
Get().Replacer = line =>
{
if (line == null) return null;
foreach (var pair in nameMap)
{
line = line.Replace(pair.Key, pair.Value);
}
return line;
};
var log = Ci.DownloadLog(primary);
log.AssertLogContains("Uploaded file");
}
private void AddNameMapping(Dictionary<string, string> nameMap, ICodexNode node)
{
var name = node.GetName();
var info = node.GetDebugInfo();
var nodeId = info.Table.LocalNode.NodeId;
var peerId = info.Table.LocalNode.PeerId;
nameMap.Add(nodeId, name);
nameMap.Add(peerId, name);
nameMap.Add(CodexUtils.ToShortId(nodeId), name);
nameMap.Add(CodexUtils.ToShortId(peerId), name);
nameMap.Add(CodexUtils.ToNodeIdShortId(nodeId), name);
nameMap.Add(CodexUtils.ToNodeIdShortId(peerId), name);
}
[Test]
public void TwoMetricsExample()
{

View File

@ -1,4 +1,5 @@
using CodexContractsPlugin;
using CodexContractsPlugin.ChainMonitor;
using CodexContractsPlugin.Marketplace;
using CodexPlugin;
using FileUtils;
@ -113,6 +114,119 @@ namespace CodexTests.BasicTests
Assert.That(contracts.GetRequestState(request), Is.EqualTo(RequestState.Finished));
}
[Test]
[Combinatorial]
public void FindBug(
[Values(64)] int numBlocks,
[Values(0)] int plusSizeKb,
[Values(0)] int plusSizeBytes
)
{
var numberOfHosts = 15;
var hostInitialBalance = 234.Tst();
var clientInitialBalance = 100000.Tst();
var fileSize = new ByteSize(
numBlocks * (64 * 1024) +
plusSizeKb * 1024 +
plusSizeBytes
);
var geth = Ci.StartGethNode(s => s.IsMiner().WithName("disttest-geth"));
var contracts = Ci.StartCodexContracts(geth);
var hosts = StartCodex(numberOfHosts, s => s
.WithName("Host")
.WithLogLevel(CodexLogLevel.Trace, new CodexLogCustomTopics(CodexLogLevel.Error, CodexLogLevel.Error, CodexLogLevel.Warn)
{
ContractClock = CodexLogLevel.Trace,
})
.WithStorageQuota(11.GB())
.EnableMarketplace(geth, contracts, m => m
.WithInitial(10.Eth(), hostInitialBalance)
.AsStorageNode()
.AsValidator()));
foreach (var host in hosts)
{
AssertBalance(contracts, host, Is.EqualTo(hostInitialBalance));
var availability = new StorageAvailability(
totalSpace: 10.GB(),
maxDuration: TimeSpan.FromMinutes(30),
minPriceForTotalSpace: 1.TstWei(),
maxCollateral: 20.TstWei()
);
host.Marketplace.MakeStorageAvailable(availability);
}
var client = StartCodex(s => s
.WithName("Client")
.EnableMarketplace(geth, contracts, m => m
.WithInitial(10.Eth(), clientInitialBalance)));
var nameMap = new Dictionary<string, string>();
AddNameMapping(nameMap, client);
foreach (var host in hosts) AddNameMapping(nameMap, host);
Get().Replacer = line =>
{
if (line == null) return null;
foreach (var pair in nameMap)
{
line = line.Replace(pair.Key, pair.Value);
}
return line;
};
var handler = new EventLogginHandler(GetTestLog());
var chainState = new ChainState(GetTestLog(), contracts, handler, GetTestRunTimeRange().From);
Task.Run(() =>
{
while (true)
{
chainState.Update();
Thread.Sleep(2000);
}
});
while (true)
{
var testFile = CreateFile(fileSize);
var uploadCid = client.UploadFile(testFile);
var purchase = new StoragePurchaseRequest(uploadCid)
{
PricePerSlotPerSecond = 2.TstWei(),
RequiredCollateral = 10.TstWei(),
MinRequiredNumberOfNodes = 5,
NodeFailureTolerance = 2,
ProofProbability = 5,
Duration = TimeSpan.FromMinutes(20),
Expiry = TimeSpan.FromMinutes(10)
};
var purchaseContract = client.Marketplace.RequestStorage(purchase);
purchaseContract.WaitForStorageContractStarted();
}
}
private void AddNameMapping(Dictionary<string, string> nameMap, ICodexNode node)
{
var name = node.GetName();
var info = node.GetDebugInfo();
var nodeId = info.Table.LocalNode.NodeId;
var peerId = info.Table.LocalNode.PeerId;
nameMap.Add(nodeId, name);
nameMap.Add(peerId, name);
nameMap.Add(CodexUtils.ToShortId(nodeId), name);
nameMap.Add(CodexUtils.ToShortId(peerId), name);
nameMap.Add(CodexUtils.ToNodeIdShortId(nodeId), name);
nameMap.Add(CodexUtils.ToNodeIdShortId(peerId), name);
}
private TrackedFile CreateFile(ByteSize fileSize)
{
var segmentSize = new ByteSize(fileSize.SizeInBytes / 4);

View File

@ -22,6 +22,26 @@ namespace CodexTests.BasicTests
testFile.AssertIsEqual(downloadedFile);
}
[Test]
public void FindBug()
{
var uploader = StartCodex();
var downloaders = StartCodex(10);
var start = DateTime.UtcNow;
while ((DateTime.UtcNow - start) < TimeSpan.FromMinutes(15))
{
var cid = uploader.UploadFile(GenerateTestFile(5.MB()));
var loop = Parallel.ForEach(downloaders, d =>
{
d.DownloadContent(cid);
});
Assert.That(loop.IsCompleted);
}
}
[Test]
public void DownloadingUnknownCidDoesNotCauseCrash()
{

View File

@ -29,12 +29,12 @@ namespace DistTestCore
/// </summary>
public bool AlwaysDownloadContainerLogs { get; set; }
public KubernetesWorkflow.Configuration GetK8sConfiguration(ITimeSet timeSet, string k8sNamespace)
public KubernetesWorkflow.Configuration GetK8sConfiguration(ITimeSet timeSet, string k8sNamespace, Func<string?, string?> replacer)
{
return GetK8sConfiguration(timeSet, new DoNothingK8sHooks(), k8sNamespace);
return GetK8sConfiguration(timeSet, new DoNothingK8sHooks(), k8sNamespace, replacer);
}
public KubernetesWorkflow.Configuration GetK8sConfiguration(ITimeSet timeSet, IK8sHooks hooks, string k8sNamespace)
public KubernetesWorkflow.Configuration GetK8sConfiguration(ITimeSet timeSet, IK8sHooks hooks, string k8sNamespace, Func<string?, string?> replacer)
{
var config = new KubernetesWorkflow.Configuration(
kubeConfigFile: kubeConfigFile,
@ -45,6 +45,7 @@ namespace DistTestCore
config.AllowNamespaceOverride = false;
config.Hooks = hooks;
config.Replacer = replacer;
return config;
}

View File

@ -35,7 +35,7 @@ namespace DistTestCore
fixtureLog = new FixtureLog(logConfig, startTime, deployId);
statusLog = new StatusLog(logConfig, startTime, "dist-tests", deployId);
globalEntryPoint = new EntryPoint(fixtureLog, configuration.GetK8sConfiguration(new DefaultTimeSet(), TestNamespacePrefix), configuration.GetFileManagerFolder());
globalEntryPoint = new EntryPoint(fixtureLog, configuration.GetK8sConfiguration(new DefaultTimeSet(), TestNamespacePrefix, s => s), configuration.GetFileManagerFolder());
Initialize(fixtureLog);
}

View File

@ -25,7 +25,7 @@ namespace DistTestCore
TestNamespace = testNamespace;
TestStart = DateTime.UtcNow;
entryPoint = new EntryPoint(log, configuration.GetK8sConfiguration(timeSet, this, testNamespace), configuration.GetFileManagerFolder(), timeSet);
entryPoint = new EntryPoint(log, configuration.GetK8sConfiguration(timeSet, this, testNamespace, InternalReplacer), configuration.GetFileManagerFolder(), timeSet);
metadata = entryPoint.GetPluginMetadata();
CoreInterface = entryPoint.CreateInterface();
this.deployId = deployId;
@ -33,6 +33,11 @@ namespace DistTestCore
log.WriteLogTag();
}
private string? InternalReplacer(string? arg)
{
return Replacer(arg);
}
public DateTime TestStart { get; }
public TestLog Log { get; }
public Configuration Configuration { get; }
@ -40,6 +45,7 @@ namespace DistTestCore
public string TestNamespace { get; }
public bool WaitForCleanup { get; }
public CoreInterface CoreInterface { get; }
public Func<string?, string?> Replacer { get; set; } = s => s;
public void DeleteAllResources()
{

View File

@ -21,6 +21,7 @@ namespace FrameworkTests.OverwatchTranscript
[Test]
public void WriteAndRun()
{
// unstable.
WriteTranscript();
ReadTranscript();