wires crashWatcher into CodexStarter

This commit is contained in:
benbierens 2023-08-15 11:01:18 +02:00
parent 72fa368357
commit 08ee09af1e
No known key found for this signature in database
GPG Key ID: FE44815D96D0A1AA
7 changed files with 171 additions and 68 deletions

View File

@ -4,10 +4,11 @@ using Utils;
namespace DistTestCore.Codex namespace DistTestCore.Codex
{ {
public class CodexAccess public class CodexAccess : ILogHandler
{ {
private readonly BaseLog log; private readonly BaseLog log;
private readonly ITimeSet timeSet; private readonly ITimeSet timeSet;
private bool hasContainerCrashed;
public CodexAccess(BaseLog log, RunningContainer container, ITimeSet timeSet, Address address) public CodexAccess(BaseLog log, RunningContainer container, ITimeSet timeSet, Address address)
{ {
@ -15,6 +16,9 @@ namespace DistTestCore.Codex
Container = container; Container = container;
this.timeSet = timeSet; this.timeSet = timeSet;
Address = address; Address = address;
hasContainerCrashed = false;
if (container.CrashWatcher != null) container.CrashWatcher.Start(this);
} }
public RunningContainer Container { get; } public RunningContainer Container { get; }
@ -86,7 +90,30 @@ namespace DistTestCore.Codex
private Http Http() private Http Http()
{ {
CheckContainerCrashed();
return new Http(log, timeSet, Address, baseUrl: "/api/codex/v1", Container.Name); return new Http(log, timeSet, Address, baseUrl: "/api/codex/v1", Container.Name);
} }
private void CheckContainerCrashed()
{
if (hasContainerCrashed) throw new Exception("Container has crashed.");
}
public void Log(Stream crashLog)
{
var file = log.CreateSubfile();
log.Log($"Container {Container.Name} has crashed. Downloading crash log to '{file.FullFilename}'...");
using var reader = new StreamReader(crashLog);
var line = reader.ReadLine();
while (line != null)
{
file.Write(line);
line = reader.ReadLine();
}
log.Log("Crash log successfully downloaded.");
hasContainerCrashed = true;
}
} }
} }

View File

@ -47,7 +47,11 @@ namespace DistTestCore
{ {
LogStart($"Stopping {group.Describe()}..."); LogStart($"Stopping {group.Describe()}...");
var workflow = CreateWorkflow(); var workflow = CreateWorkflow();
foreach (var c in group.Containers) workflow.Stop(c); foreach (var c in group.Containers)
{
StopCrashWatcher(c);
workflow.Stop(c);
}
RunningGroups.Remove(group); RunningGroups.Remove(group);
LogEnd("Stopped."); LogEnd("Stopped.");
} }
@ -96,7 +100,9 @@ namespace DistTestCore
for (var i = 0; i < numberOfNodes; i++) for (var i = 0; i < numberOfNodes; i++)
{ {
var workflow = CreateWorkflow(); var workflow = CreateWorkflow();
result.Add(workflow.Start(1, location, recipe, startupConfig)); var rc = workflow.Start(1, location, recipe, startupConfig);
CreateCrashWatcher(workflow, rc);
result.Add(rc);
} }
return result.ToArray(); return result.ToArray();
} }
@ -134,5 +140,19 @@ namespace DistTestCore
{ {
Log("----------------------------------------------------------------------------"); Log("----------------------------------------------------------------------------");
} }
private void CreateCrashWatcher(StartupWorkflow workflow, RunningContainers rc)
{
var c = rc.Containers.Single();
c.CrashWatcher = workflow.CreateCrashWatcher(c);
}
private void StopCrashWatcher(RunningContainers containers)
{
foreach (var c in containers.Containers)
{
c.CrashWatcher?.Stop();
}
}
} }
} }

View File

@ -0,0 +1,85 @@
using k8s;
using Logging;
namespace KubernetesWorkflow
{
public class CrashWatcher
{
private readonly BaseLog log;
private readonly KubernetesClientConfiguration config;
private readonly string k8sNamespace;
private readonly RunningContainer container;
private ILogHandler? logHandler;
private CancellationTokenSource cts;
private Task? worker;
private Exception? workerException;
public CrashWatcher(BaseLog log, KubernetesClientConfiguration config, string k8sNamespace, RunningContainer container)
{
this.log = log;
this.config = config;
this.k8sNamespace = k8sNamespace;
this.container = container;
cts = new CancellationTokenSource();
}
public void Start(ILogHandler logHandler)
{
if (worker != null) throw new InvalidOperationException();
this.logHandler = logHandler;
cts = new CancellationTokenSource();
worker = Task.Run(Worker);
}
public void Stop()
{
if (worker == null) throw new InvalidOperationException();
cts.Cancel();
worker.Wait();
worker = null;
if (workerException != null) throw new Exception("Exception occurred in CrashWatcher worker thread.", workerException);
}
private void Worker()
{
try
{
MonitorContainer(cts.Token);
}
catch (Exception ex)
{
workerException = ex;
}
}
private void MonitorContainer(CancellationToken token)
{
var client = new Kubernetes(config);
while (!token.IsCancellationRequested)
{
token.WaitHandle.WaitOne(TimeSpan.FromSeconds(10));
var pod = container.Pod;
var recipe = container.Recipe;
var podName = pod.PodInfo.Name;
var podInfo = client.ReadNamespacedPod(podName, k8sNamespace);
if (podInfo.Status.ContainerStatuses.Any(c => c.RestartCount > 0))
{
DownloadCrashedContainerLogs(client, podName, recipe);
return;
}
}
}
private void DownloadCrashedContainerLogs(Kubernetes client, string podName, ContainerRecipe recipe)
{
log.Log("Pod crash detected for " + container.Name);
using var stream = client.ReadNamespacedPodLog(podName, k8sNamespace, recipe.Name, previous: true);
logHandler!.Log(stream);
}
}
}

View File

@ -604,29 +604,9 @@ namespace KubernetesWorkflow
#endregion #endregion
public Task WatchForCrashLogs(RunningContainer container, CancellationToken token, ILogHandler logHandler) public CrashWatcher CreateCrashWatcher(RunningContainer container)
{ {
return Task.Run(() => return new CrashWatcher(log, cluster.GetK8sClientConfig(), K8sTestNamespace, container);
{
var myOwnClient = new Kubernetes(cluster.GetK8sClientConfig());
while (!token.IsCancellationRequested)
{
token.WaitHandle.WaitOne(TimeSpan.FromSeconds(3));
var pod = container.Pod;
var recipe = container.Recipe;
var podName = pod.PodInfo.Name;
var podInfo = myOwnClient.ReadNamespacedPod(podName, K8sTestNamespace);
if (podInfo.Status.ContainerStatuses.Any(c => c.RestartCount > 0))
{
log.Log("Pod crash detected for " + container.Name);
using var stream = myOwnClient.ReadNamespacedPodLog(podName, K8sTestNamespace, recipe.Name, previous: true);
logHandler.Log(stream);
return;
}
}
});
} }
private PodInfo FetchNewPod() private PodInfo FetchNewPod()

View File

@ -1,4 +1,5 @@
using Utils; using Newtonsoft.Json;
using Utils;
namespace KubernetesWorkflow namespace KubernetesWorkflow
{ {
@ -39,6 +40,9 @@ namespace KubernetesWorkflow
public Port[] ServicePorts { get; } public Port[] ServicePorts { get; }
public Address ClusterExternalAddress { get; } public Address ClusterExternalAddress { get; }
public Address ClusterInternalAddress { get; } public Address ClusterInternalAddress { get; }
[JsonIgnore]
public CrashWatcher? CrashWatcher { get; set; }
} }
public static class RunningContainersExtensions public static class RunningContainersExtensions

View File

@ -37,9 +37,9 @@ namespace KubernetesWorkflow
}, pl); }, pl);
} }
public Task WatchForCrashLogs(RunningContainer container, CancellationToken token, ILogHandler logHandler) public CrashWatcher CreateCrashWatcher(RunningContainer container)
{ {
return K8s(controller => controller.WatchForCrashLogs(container, token, logHandler)); return K8s(controller => controller.CreateCrashWatcher(container));
} }
public void Stop(RunningContainers runningContainers) public void Stop(RunningContainers runningContainers)

View File

@ -1,7 +1,6 @@
using DistTestCore; using DistTestCore;
using KubernetesWorkflow; using KubernetesWorkflow;
using NUnit.Framework; using NUnit.Framework;
using System.ComponentModel;
using Utils; using Utils;
namespace Tests.BasicTests namespace Tests.BasicTests
@ -73,53 +72,41 @@ namespace Tests.BasicTests
var nodes = group.Cast<OnlineCodexNode>().ToArray(); var nodes = group.Cast<OnlineCodexNode>().ToArray();
var flow = Get().WorkflowCreator.CreateWorkflow(); //foreach (var node in nodes)
var cst = new CancellationTokenSource(); //{
var tasks = nodes.Select(n => flow.WatchForCrashLogs(n.CodexAccess.Container, cst.Token, this)).ToArray(); // node.Marketplace.MakeStorageAvailable(
// size: 1.GB(),
// minPricePerBytePerSecond: 1.TestTokens(),
// maxCollateral: 1024.TestTokens(),
// maxDuration: TimeSpan.FromMinutes(5));
//}
try Thread.Sleep(2000);
{
//foreach (var node in nodes)
//{
// node.Marketplace.MakeStorageAvailable(
// size: 1.GB(),
// minPricePerBytePerSecond: 1.TestTokens(),
// maxCollateral: 1024.TestTokens(),
// maxDuration: TimeSpan.FromMinutes(5));
//}
Thread.Sleep(2000); Log("calling crash...");
var http = new Http(Get().Log, Get().TimeSet, nodes.First().CodexAccess.Address, baseUrl: "/api/codex/v1", nodes.First().CodexAccess.Container.Name);
var str = http.HttpGetString("debug/crash");
Log("calling crash..."); Log("crash called.");
var http = new Http(Get().Log, Get().TimeSet, nodes.First().CodexAccess.Address, baseUrl: "/api/codex/v1", nodes.First().CodexAccess.Container.Name);
var str = http.HttpGetString("debug/crash");
Log("crash called."); Thread.Sleep(TimeSpan.FromSeconds(60));
Thread.Sleep(TimeSpan.FromSeconds(60)); Log("test done.");
Log("test done."); //var endTime = DateTime.UtcNow + TimeSpan.FromHours(2);
//while (DateTime.UtcNow < endTime)
//{
// foreach (var node in nodes)
// {
// var file = GenerateTestFile(80.MB());
// var cid = node.UploadFile(file);
//var endTime = DateTime.UtcNow + TimeSpan.FromHours(2); // var dl = node.DownloadContent(cid);
//while (DateTime.UtcNow < endTime) // file.AssertIsEqual(dl);
//{ // }
// foreach (var node in nodes)
// {
// var file = GenerateTestFile(80.MB());
// var cid = node.UploadFile(file);
// var dl = node.DownloadContent(cid); // Thread.Sleep(TimeSpan.FromSeconds(30));
// file.AssertIsEqual(dl); //}
// }
// Thread.Sleep(TimeSpan.FromSeconds(30));
//}
}
finally
{
cst.Cancel();
foreach (var t in tasks) t.Wait();
}
} }
public void Log(Stream log) public void Log(Stream log)