diff --git a/Framework/Core/CoreInterface.cs b/Framework/Core/CoreInterface.cs index 6e8e6ef..3b137d8 100644 --- a/Framework/Core/CoreInterface.cs +++ b/Framework/Core/CoreInterface.cs @@ -36,6 +36,11 @@ namespace Core return logHandler.DownloadLog(); } + public Stream MonitorLog(IHasContainer container) + { + return MonitorLog(container.Container); + } + public Stream MonitorLog(RunningContainer container) { var workflow = entryPoint.Tools.CreateWorkflow(); diff --git a/Framework/KubernetesWorkflow/K8sController.cs b/Framework/KubernetesWorkflow/K8sController.cs index 5168f27..ee3c5e8 100644 --- a/Framework/KubernetesWorkflow/K8sController.cs +++ b/Framework/KubernetesWorkflow/K8sController.cs @@ -60,7 +60,7 @@ namespace KubernetesWorkflow public Stream MonitorContainerLog(RunningContainer container) { log.Debug(); - return client.Run(c => c.ReadNamespacedPodLog(container.Pod.PodInfo.Name, K8sNamespace, container.Recipe.Name, follow: true)); + return client.Run(c => c.ReadNamespacedPodLog(container.Pod.PodInfo.Name, K8sNamespace, container.Recipe.Name, follow: true, sinceSeconds: 1)); } public string ExecuteCommand(RunningPod pod, string containerName, string command, params string[] args) diff --git a/Tests/CodexContinuousTests/ContainerLogStream.cs b/Tests/CodexContinuousTests/ContainerLogStream.cs new file mode 100644 index 0000000..7b7a340 --- /dev/null +++ b/Tests/CodexContinuousTests/ContainerLogStream.cs @@ -0,0 +1,85 @@ +using Logging; + +namespace ContinuousTests +{ + public class ContainerLogStream + { + private readonly StreamReader reader; + private readonly Stream stream; + private readonly LogFile targetFile; + private readonly CancellationToken token; + private readonly TaskFactory taskFactory; + private int lastNumber = -1; + public bool Fault { get; private set; } + private bool run; + + public ContainerLogStream(Stream stream, string name, LogFile targetFile, CancellationToken token, TaskFactory taskFactory) + { + this.stream = stream; + this.targetFile = targetFile; + this.token = token; + this.taskFactory = taskFactory; + Fault = false; + reader = new StreamReader(stream); + + targetFile.Write(name); + } + + public void Run() + { + run = true; + taskFactory.Run(() => + { + while (run && !token.IsCancellationRequested) + { + Monitor(); + } + }); + } + + public void Stop() + { + run = false; + stream.Close(); + } + + public void DeleteFile() + { + if (run) throw new Exception("Cannot delete file while stream is still running."); + File.Delete(targetFile.FullFilename); + } + + private void Monitor() + { + var line = reader.ReadLine(); + while (run && !string.IsNullOrEmpty(line) && !token.IsCancellationRequested) + { + ProcessLine(line); + line = reader.ReadLine(); + } + } + + private void ProcessLine(string s) + { + targetFile.WriteRaw(s); + + // 000000004298 + var sub = s.Substring(0, 12); + if (!int.TryParse(sub, out int number)) return; + + if (lastNumber == -1) + { + lastNumber = number; + } + else + { + var expectedNumber = lastNumber + 1; + if (number != expectedNumber) + { + Fault = true; + } + lastNumber = number; + } + } + } +} diff --git a/Tests/CodexContinuousTests/SingleTestRun.cs b/Tests/CodexContinuousTests/SingleTestRun.cs index adfa05f..a046bfc 100644 --- a/Tests/CodexContinuousTests/SingleTestRun.cs +++ b/Tests/CodexContinuousTests/SingleTestRun.cs @@ -62,11 +62,28 @@ namespace ContinuousTests private void RunTest(Action resultHandler) { + var ci = entryPoint.CreateInterface(); + var monitors = nodes.Select(n => new ContainerLogStream( + stream: ci.MonitorLog(n), + name: n.GetName(), + targetFile: fixtureLog.CreateSubfile(), + token: cancelToken, + taskFactory: taskFactory)).ToArray(); + try { + foreach (var m in monitors) m.Run(); + RunTestMoments(); - if (!config.KeepPassedTestLogs) fixtureLog.Delete(); + foreach (var m in monitors) m.Stop(); + if (monitors.Any(m => m.Fault)) throw new Exception("Any faulted"); + + if (!config.KeepPassedTestLogs) + { + fixtureLog.Delete(); + foreach (var m in monitors) m.DeleteFile(); + } resultHandler(true); } catch (Exception ex) @@ -81,9 +98,7 @@ namespace ContinuousTests OverviewLog($"Failures: {failureCount} / {config.StopOnFailure}"); if (failureCount >= config.StopOnFailure) { - OverviewLog($"Configured to stop after {config.StopOnFailure} failures. Downloading cluster logs..."); - DownloadClusterLogs(); - OverviewLog("Log download finished. Cancelling test runner..."); + OverviewLog($"Configured to stop after {config.StopOnFailure} failures."); Cancellation.Cts.Cancel(); } } @@ -100,10 +115,8 @@ namespace ContinuousTests var earliestMoment = handle.GetEarliestMoment(); var t = earliestMoment; - while (true) + while (!cancelToken.IsCancellationRequested) { - cancelToken.ThrowIfCancellationRequested(); - RunMoment(t); if (handle.Test.TestFailMode == TestFailMode.StopAfterFirstFailure && exceptions.Any()) @@ -141,19 +154,6 @@ namespace ContinuousTests throw new Exception(exceptionsMessage); } - private void DownloadClusterLogs() - { - var entryPointFactory = new EntryPointFactory(); - var log = new NullLog(); - log.FullFilename = Path.Combine(config.LogPath, "NODE"); - var entryPoint = entryPointFactory.CreateEntryPoint(config.KubeConfigFile, config.DataPath, config.CodexDeployment.Metadata.KubeNamespace, log); - - foreach (var container in config.CodexDeployment.CodexContainers) - { - entryPoint.CreateInterface().DownloadLog(container); - } - } - private string GetCombinedExceptionsMessage(Exception[] exceptions) { return string.Join(Environment.NewLine, exceptions.Select(ex => ex.ToString()));