From cec6d787de2591fe9af963899455bd4a2962ecdf Mon Sep 17 00:00:00 2001 From: benbierens Date: Sun, 1 Oct 2023 08:56:21 +0200 Subject: [PATCH 01/13] Adds methods for streaming container log --- Framework/Core/CoreInterface.cs | 6 ++++++ Framework/KubernetesWorkflow/K8sController.cs | 6 ++++++ Framework/KubernetesWorkflow/StartupWorkflow.cs | 9 +++++++++ 3 files changed, 21 insertions(+) diff --git a/Framework/Core/CoreInterface.cs b/Framework/Core/CoreInterface.cs index 21b9491..6e8e6ef 100644 --- a/Framework/Core/CoreInterface.cs +++ b/Framework/Core/CoreInterface.cs @@ -36,6 +36,12 @@ namespace Core return logHandler.DownloadLog(); } + public Stream MonitorLog(RunningContainer container) + { + var workflow = entryPoint.Tools.CreateWorkflow(); + return workflow.MonitorContainerLog(container); + } + public string ExecuteContainerCommand(IHasContainer containerSource, string command, params string[] args) { return ExecuteContainerCommand(containerSource.Container, command, args); diff --git a/Framework/KubernetesWorkflow/K8sController.cs b/Framework/KubernetesWorkflow/K8sController.cs index 3f5504f..5168f27 100644 --- a/Framework/KubernetesWorkflow/K8sController.cs +++ b/Framework/KubernetesWorkflow/K8sController.cs @@ -57,6 +57,12 @@ namespace KubernetesWorkflow logHandler.Log(stream); } + public Stream MonitorContainerLog(RunningContainer container) + { + log.Debug(); + return client.Run(c => c.ReadNamespacedPodLog(container.Pod.PodInfo.Name, K8sNamespace, container.Recipe.Name, follow: true)); + } + public string ExecuteCommand(RunningPod pod, string containerName, string command, params string[] args) { var cmdAndArgs = $"{containerName}: {command} ({string.Join(",", args)})"; diff --git a/Framework/KubernetesWorkflow/StartupWorkflow.cs b/Framework/KubernetesWorkflow/StartupWorkflow.cs index bda8a9b..d5e40d9 100644 --- a/Framework/KubernetesWorkflow/StartupWorkflow.cs +++ b/Framework/KubernetesWorkflow/StartupWorkflow.cs @@ -11,6 +11,7 @@ namespace KubernetesWorkflow CrashWatcher CreateCrashWatcher(RunningContainer container); void Stop(RunningContainers runningContainers); void DownloadContainerLog(RunningContainer container, ILogHandler logHandler, int? tailLines = null); + Stream MonitorContainerLog(RunningContainer container); string ExecuteCommand(RunningContainer container, string command, params string[] args); void DeleteNamespace(); void DeleteNamespacesStartingWith(string namespacePrefix); @@ -83,6 +84,14 @@ namespace KubernetesWorkflow }); } + public Stream MonitorContainerLog(RunningContainer container) + { + return K8s(controller => + { + return controller.MonitorContainerLog(container); + }); + } + public string ExecuteCommand(RunningContainer container, string command, params string[] args) { return K8s(controller => From b23c66c86ec9f531f2c8e291f195fb03229f2861 Mon Sep 17 00:00:00 2001 From: benbierens Date: Sun, 1 Oct 2023 09:57:32 +0200 Subject: [PATCH 02/13] Working timerange limited log download --- Framework/Core/CoreInterface.cs | 5 ++ Framework/KubernetesWorkflow/K8sController.cs | 2 +- .../ContainerLogStream.cs | 85 +++++++++++++++++++ Tests/CodexContinuousTests/SingleTestRun.cs | 40 ++++----- 4 files changed, 111 insertions(+), 21 deletions(-) create mode 100644 Tests/CodexContinuousTests/ContainerLogStream.cs diff --git a/Framework/Core/CoreInterface.cs b/Framework/Core/CoreInterface.cs index 6e8e6ef..3b137d8 100644 --- a/Framework/Core/CoreInterface.cs +++ b/Framework/Core/CoreInterface.cs @@ -36,6 +36,11 @@ namespace Core return logHandler.DownloadLog(); } + public Stream MonitorLog(IHasContainer container) + { + return MonitorLog(container.Container); + } + public Stream MonitorLog(RunningContainer container) { var workflow = entryPoint.Tools.CreateWorkflow(); diff --git a/Framework/KubernetesWorkflow/K8sController.cs b/Framework/KubernetesWorkflow/K8sController.cs index 5168f27..ee3c5e8 100644 --- a/Framework/KubernetesWorkflow/K8sController.cs +++ b/Framework/KubernetesWorkflow/K8sController.cs @@ -60,7 +60,7 @@ namespace KubernetesWorkflow public Stream MonitorContainerLog(RunningContainer container) { log.Debug(); - return client.Run(c => c.ReadNamespacedPodLog(container.Pod.PodInfo.Name, K8sNamespace, container.Recipe.Name, follow: true)); + return client.Run(c => c.ReadNamespacedPodLog(container.Pod.PodInfo.Name, K8sNamespace, container.Recipe.Name, follow: true, sinceSeconds: 1)); } public string ExecuteCommand(RunningPod pod, string containerName, string command, params string[] args) diff --git a/Tests/CodexContinuousTests/ContainerLogStream.cs b/Tests/CodexContinuousTests/ContainerLogStream.cs new file mode 100644 index 0000000..7b7a340 --- /dev/null +++ b/Tests/CodexContinuousTests/ContainerLogStream.cs @@ -0,0 +1,85 @@ +using Logging; + +namespace ContinuousTests +{ + public class ContainerLogStream + { + private readonly StreamReader reader; + private readonly Stream stream; + private readonly LogFile targetFile; + private readonly CancellationToken token; + private readonly TaskFactory taskFactory; + private int lastNumber = -1; + public bool Fault { get; private set; } + private bool run; + + public ContainerLogStream(Stream stream, string name, LogFile targetFile, CancellationToken token, TaskFactory taskFactory) + { + this.stream = stream; + this.targetFile = targetFile; + this.token = token; + this.taskFactory = taskFactory; + Fault = false; + reader = new StreamReader(stream); + + targetFile.Write(name); + } + + public void Run() + { + run = true; + taskFactory.Run(() => + { + while (run && !token.IsCancellationRequested) + { + Monitor(); + } + }); + } + + public void Stop() + { + run = false; + stream.Close(); + } + + public void DeleteFile() + { + if (run) throw new Exception("Cannot delete file while stream is still running."); + File.Delete(targetFile.FullFilename); + } + + private void Monitor() + { + var line = reader.ReadLine(); + while (run && !string.IsNullOrEmpty(line) && !token.IsCancellationRequested) + { + ProcessLine(line); + line = reader.ReadLine(); + } + } + + private void ProcessLine(string s) + { + targetFile.WriteRaw(s); + + // 000000004298 + var sub = s.Substring(0, 12); + if (!int.TryParse(sub, out int number)) return; + + if (lastNumber == -1) + { + lastNumber = number; + } + else + { + var expectedNumber = lastNumber + 1; + if (number != expectedNumber) + { + Fault = true; + } + lastNumber = number; + } + } + } +} diff --git a/Tests/CodexContinuousTests/SingleTestRun.cs b/Tests/CodexContinuousTests/SingleTestRun.cs index adfa05f..a046bfc 100644 --- a/Tests/CodexContinuousTests/SingleTestRun.cs +++ b/Tests/CodexContinuousTests/SingleTestRun.cs @@ -62,11 +62,28 @@ namespace ContinuousTests private void RunTest(Action resultHandler) { + var ci = entryPoint.CreateInterface(); + var monitors = nodes.Select(n => new ContainerLogStream( + stream: ci.MonitorLog(n), + name: n.GetName(), + targetFile: fixtureLog.CreateSubfile(), + token: cancelToken, + taskFactory: taskFactory)).ToArray(); + try { + foreach (var m in monitors) m.Run(); + RunTestMoments(); - if (!config.KeepPassedTestLogs) fixtureLog.Delete(); + foreach (var m in monitors) m.Stop(); + if (monitors.Any(m => m.Fault)) throw new Exception("Any faulted"); + + if (!config.KeepPassedTestLogs) + { + fixtureLog.Delete(); + foreach (var m in monitors) m.DeleteFile(); + } resultHandler(true); } catch (Exception ex) @@ -81,9 +98,7 @@ namespace ContinuousTests OverviewLog($"Failures: {failureCount} / {config.StopOnFailure}"); if (failureCount >= config.StopOnFailure) { - OverviewLog($"Configured to stop after {config.StopOnFailure} failures. Downloading cluster logs..."); - DownloadClusterLogs(); - OverviewLog("Log download finished. Cancelling test runner..."); + OverviewLog($"Configured to stop after {config.StopOnFailure} failures."); Cancellation.Cts.Cancel(); } } @@ -100,10 +115,8 @@ namespace ContinuousTests var earliestMoment = handle.GetEarliestMoment(); var t = earliestMoment; - while (true) + while (!cancelToken.IsCancellationRequested) { - cancelToken.ThrowIfCancellationRequested(); - RunMoment(t); if (handle.Test.TestFailMode == TestFailMode.StopAfterFirstFailure && exceptions.Any()) @@ -141,19 +154,6 @@ namespace ContinuousTests throw new Exception(exceptionsMessage); } - private void DownloadClusterLogs() - { - var entryPointFactory = new EntryPointFactory(); - var log = new NullLog(); - log.FullFilename = Path.Combine(config.LogPath, "NODE"); - var entryPoint = entryPointFactory.CreateEntryPoint(config.KubeConfigFile, config.DataPath, config.CodexDeployment.Metadata.KubeNamespace, log); - - foreach (var container in config.CodexDeployment.CodexContainers) - { - entryPoint.CreateInterface().DownloadLog(container); - } - } - private string GetCombinedExceptionsMessage(Exception[] exceptions) { return string.Join(Environment.NewLine, exceptions.Select(ex => ex.ToString())); From da855b8d0d502402a2b0962120ce2c09d8134ae4 Mon Sep 17 00:00:00 2001 From: benbierens Date: Sun, 1 Oct 2023 10:52:05 +0200 Subject: [PATCH 03/13] Proper log monitoring shutdown --- Tests/CodexContinuousTests/ContinuousTestRunner.cs | 1 + Tests/CodexContinuousTests/SingleTestRun.cs | 12 +++++++++--- Tests/CodexContinuousTests/TestLoop.cs | 2 +- 3 files changed, 11 insertions(+), 4 deletions(-) diff --git a/Tests/CodexContinuousTests/ContinuousTestRunner.cs b/Tests/CodexContinuousTests/ContinuousTestRunner.cs index d3982a8..1132106 100644 --- a/Tests/CodexContinuousTests/ContinuousTestRunner.cs +++ b/Tests/CodexContinuousTests/ContinuousTestRunner.cs @@ -73,6 +73,7 @@ namespace ContinuousTests { var targetDuration = TimeSpan.FromSeconds(config.TargetDurationSeconds); cancelToken.WaitHandle.WaitOne(targetDuration); + Cancellation.Cts.Cancel(); overviewLog.Log($"Congratulations! The targer duration has been reached! ({Time.FormatDuration(targetDuration)})"); statusLog.ConcludeTest("Passed", testDuration, testData); } diff --git a/Tests/CodexContinuousTests/SingleTestRun.cs b/Tests/CodexContinuousTests/SingleTestRun.cs index a046bfc..8cb384f 100644 --- a/Tests/CodexContinuousTests/SingleTestRun.cs +++ b/Tests/CodexContinuousTests/SingleTestRun.cs @@ -73,12 +73,10 @@ namespace ContinuousTests try { foreach (var m in monitors) m.Run(); + fixtureLog.Log("Monitor start"); RunTestMoments(); - foreach (var m in monitors) m.Stop(); - if (monitors.Any(m => m.Fault)) throw new Exception("Any faulted"); - if (!config.KeepPassedTestLogs) { fixtureLog.Delete(); @@ -103,6 +101,13 @@ namespace ContinuousTests } } } + finally + { + Thread.Sleep(1000); + fixtureLog.Log("Monitor stop"); + foreach (var m in monitors) m.Stop(); + if (monitors.Any(m => m.Fault)) throw new Exception("One or more downloaded container log is missing lines!"); + } } private void ApplyLogReplacements(FixtureLog fixtureLog, StartupChecker startupChecker) @@ -143,6 +148,7 @@ namespace ContinuousTests return; } } + fixtureLog.Log("Test run has been cancelled."); } private void ThrowFailTest() diff --git a/Tests/CodexContinuousTests/TestLoop.cs b/Tests/CodexContinuousTests/TestLoop.cs index 002e4ad..c1002b8 100644 --- a/Tests/CodexContinuousTests/TestLoop.cs +++ b/Tests/CodexContinuousTests/TestLoop.cs @@ -39,7 +39,7 @@ namespace ContinuousTests { NumberOfPasses = 0; NumberOfFailures = 0; - while (true) + while (!cancelToken.IsCancellationRequested) { WaitHandle.WaitAny(new[] { runFinishedHandle, cancelToken.WaitHandle }); From 73c49b42c6a0c8e171d7c4b258b56ea2ac7da90b Mon Sep 17 00:00:00 2001 From: benbierens Date: Mon, 2 Oct 2023 09:24:01 +0200 Subject: [PATCH 04/13] Fixes incorrect log message when test run gets cancelled. --- .../ContinuousTestRunner.cs | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/Tests/CodexContinuousTests/ContinuousTestRunner.cs b/Tests/CodexContinuousTests/ContinuousTestRunner.cs index 1132106..be7c640 100644 --- a/Tests/CodexContinuousTests/ContinuousTestRunner.cs +++ b/Tests/CodexContinuousTests/ContinuousTestRunner.cs @@ -59,9 +59,9 @@ namespace ContinuousTests overviewLog.Log("Finished launching test-loops."); WaitUntilFinished(overviewLog, statusLog, startTime, testLoops); - overviewLog.Log("Cancelling all test-loops..."); + overviewLog.Log("Stopping all test-loops..."); taskFactory.WaitAll(); - overviewLog.Log("All tasks cancelled."); + overviewLog.Log("All tasks finished."); } private void WaitUntilFinished(LogSplitter overviewLog, StatusLog statusLog, DateTime startTime, TestLoop[] testLoops) @@ -72,16 +72,20 @@ namespace ContinuousTests if (config.TargetDurationSeconds > 0) { var targetDuration = TimeSpan.FromSeconds(config.TargetDurationSeconds); - cancelToken.WaitHandle.WaitOne(targetDuration); - Cancellation.Cts.Cancel(); - overviewLog.Log($"Congratulations! The targer duration has been reached! ({Time.FormatDuration(targetDuration)})"); - statusLog.ConcludeTest("Passed", testDuration, testData); + var wasCancelled = cancelToken.WaitHandle.WaitOne(targetDuration); + if (!wasCancelled) + { + Cancellation.Cts.Cancel(); + overviewLog.Log($"Congratulations! The targer duration has been reached! ({Time.FormatDuration(targetDuration)})"); + statusLog.ConcludeTest("Passed", testDuration, testData); + return; + } } else { cancelToken.WaitHandle.WaitOne(); - statusLog.ConcludeTest("Failed", testDuration, testData); } + statusLog.ConcludeTest("Failed", testDuration, testData); } private Dictionary FormatTestRuns(TestLoop[] testLoops) From 3dcbb78204754e7a1ee45dae674a714bebde654d Mon Sep 17 00:00:00 2001 From: benbierens Date: Mon, 2 Oct 2023 11:18:27 +0200 Subject: [PATCH 05/13] Prevents multiple continuous tests from running interleaved. --- Tests/CodexContinuousTests/TestLoop.cs | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/Tests/CodexContinuousTests/TestLoop.cs b/Tests/CodexContinuousTests/TestLoop.cs index c1002b8..6c1157a 100644 --- a/Tests/CodexContinuousTests/TestLoop.cs +++ b/Tests/CodexContinuousTests/TestLoop.cs @@ -13,6 +13,7 @@ namespace ContinuousTests private readonly StartupChecker startupChecker; private readonly CancellationToken cancelToken; private readonly EventWaitHandle runFinishedHandle = new EventWaitHandle(true, EventResetMode.ManualReset); + private static object testLock = new object(); public TestLoop(EntryPointFactory entryPointFactory, TaskFactory taskFactory, Configuration config, ILog overviewLog, Type testType, TimeSpan runsEvery, StartupChecker startupChecker, CancellationToken cancelToken) { @@ -41,13 +42,19 @@ namespace ContinuousTests NumberOfFailures = 0; while (!cancelToken.IsCancellationRequested) { - WaitHandle.WaitAny(new[] { runFinishedHandle, cancelToken.WaitHandle }); + lock (testLock) + // In the original design, multiple tests are allowed to interleave their test-moments, increasing test through-put. + // Since we're still stabilizing some of the basics, this lock limits us to 1 test run at a time. + { + WaitHandle.WaitAny(new[] { runFinishedHandle, cancelToken.WaitHandle }); - cancelToken.ThrowIfCancellationRequested(); + cancelToken.ThrowIfCancellationRequested(); - StartTest(); + StartTest(); - cancelToken.WaitHandle.WaitOne(runsEvery); + cancelToken.WaitHandle.WaitOne(runsEvery); + } + Thread.Sleep(100); } } catch (OperationCanceledException) From c3ec64f25e7691f1cca0efd515ccd890ddb00d5d Mon Sep 17 00:00:00 2001 From: benbierens Date: Mon, 2 Oct 2023 12:01:49 +0200 Subject: [PATCH 06/13] very very WIP for how to download complete log from elastic search --- Framework/Core/CoreInterface.cs | 12 ++ Framework/Core/Http.cs | 2 +- Tests/CodexTests/BasicTests/ExampleTests.cs | 128 ++++++++++++++++++++ 3 files changed, 141 insertions(+), 1 deletion(-) diff --git a/Framework/Core/CoreInterface.cs b/Framework/Core/CoreInterface.cs index 3b137d8..217641d 100644 --- a/Framework/Core/CoreInterface.cs +++ b/Framework/Core/CoreInterface.cs @@ -1,4 +1,5 @@ using KubernetesWorkflow; +using Utils; namespace Core { @@ -57,6 +58,17 @@ namespace Core var workflow = entryPoint.Tools.CreateWorkflow(); return workflow.ExecuteCommand(container, command, args); } + + public IHttp CreateHttp() + { + var address = new Address("http://localhost", 9200); + var baseUrl = ""; + return entryPoint.Tools.CreateHttp(address, baseUrl, client => + { + // -H "kbn-xsrf: reporting" -H "Content-Type: application/json" + client.DefaultRequestHeaders.Add("kbn-xsrf", "reporting"); + }); + } } public interface IHasContainer diff --git a/Framework/Core/Http.cs b/Framework/Core/Http.cs index 5bbd132..aef61a9 100644 --- a/Framework/Core/Http.cs +++ b/Framework/Core/Http.cs @@ -79,7 +79,7 @@ namespace Core public string HttpPostJson(string route, TRequest body) { - var response = PostJson(route, body); + var response = PostJson(route, body); return Time.Wait(response.Content.ReadAsStringAsync()); } diff --git a/Tests/CodexTests/BasicTests/ExampleTests.cs b/Tests/CodexTests/BasicTests/ExampleTests.cs index 4049b9a..cac9935 100644 --- a/Tests/CodexTests/BasicTests/ExampleTests.cs +++ b/Tests/CodexTests/BasicTests/ExampleTests.cs @@ -2,6 +2,9 @@ using DistTestCore; using GethPlugin; using MetricsPlugin; +using Microsoft.IdentityModel.Abstractions; +using Microsoft.VisualStudio.TestPlatform.CommunicationUtilities; +using Newtonsoft.Json; using NUnit.Framework; using Utils; @@ -10,6 +13,131 @@ namespace Tests.BasicTests [TestFixture] public class ExampleTests : CodexDistTest { + [Test] + public void AAA() + { + var http = this.Ci.CreateHttp(); + + //var query = "{\r\n \"query\": {\r\n \"bool\": {\r\n \"filter\": [\r\n {\r\n \"term\": {\r\n \"container_image.keyword\": \"docker.io/codexstorage/nim-codex:sha-9d735f9-dist-tests\"\r\n }\r\n },\r\n {\r\n \"term\": {\r\n \"pod_namespace.keyword\": \"codex-continuous-tests\"\r\n }\r\n },\r\n {\r\n \"term\": {\r\n \"pod_name.keyword\": \"codex3-workflow3-ff476767d-98zx4\"\r\n }\r\n },\r\n {\r\n \"range\": {\r\n \"@timestamp\": {\r\n \"lte\": \"2023-09-25T13:02:23.559Z\",\r\n \"gt\": \"2023-09-25T20:00:00.000Z\"\r\n }\r\n }\r\n }\r\n ]\r\n }\r\n }\r\n}"; + //var query = "{ \"query\": {\"bool\": { \"filter\": [{ \"term\": {\"container_image.keyword\": \"docker.io/codexstorage/nim-codex:sha-9d735f9-dist-tests\" }},{ \"term\": {\"pod_namespace.keyword\": \"codex-continuous-tests\" }},{ \"term\": {\"pod_name.keyword\": \"codex3-workflow3-ff476767d-98zx4\" }},{ \"range\": {\"@timestamp\": { \"lte\": \"2023-09-29T13:02:23.559Z\", \"gt\": \"2023-09-20T20:00:00.000Z\"} }} ]} }}"; + + + + //var queryTemplate = "{ \"query\": {\"bool\": { \"filter\": [{ \"term\": {\"pod_namespace.keyword\": \"codex-continuous-tests\" }},{ \"term\": {\"pod_name.keyword\": \"bootstrap-2-599dfb4d65-v4v2b\" }},{ \"range\": {\"@timestamp\": { \"lte\": \"2023-10-03T13:02:23.559Z\", \"gt\": \"2023-09-01T20:00:00.000Z\"} }} ]} },\t\"_source\": \"message\",\t\"from\": ,\t\"size\": }"; + var queryTemplate = "{ \"sort\": [ { \"@timestamp\": { \"order\": \"asc\" } } ], \"fields\": [ { \"field\": \"@timestamp\", \"format\": \"strict_date_optional_time\" }, { \"field\": \"pod_name\" }, { \"field\": \"message\" } ], \"size\": , \"_source\": false, \"query\": { \"bool\": { \"must\": [], \"filter\": [ { \"range\": { \"@timestamp\": { \"format\": \"strict_date_optional_time\", \"gte\": \"2023-10-01T00:00:00.000Z\", \"lte\": \"2023-10-03T09:00:00.000Z\" } } }, { \"match_phrase\": { \"pod_name\": \"bootstrap-2-599dfb4d65-v4v2b\" } } ] } } }"; + + var outputFile = "c:\\Users\\Ben\\Desktop\\Cluster\\reconstructed.log"; + + var sizeOfPage = 2000; + var searchAfter = ""; + var lastHits = 1; + var totalHits = 0; + var lastLogLine = -1; + + var queue = new List(); + //var jumpback = 0; + + while (lastHits > 0) + { + //if (queue.Any()) jumpback++; + //else jumpback = 0; + + var query = queryTemplate + .Replace("", sizeOfPage.ToString()) + .Replace("", searchAfter); + + var output = http.HttpPostString("_search", query); + + var response = JsonConvert.DeserializeObject(output)!; + + lastHits = response.hits.hits.Length; + totalHits += response.hits.hits.Length; + if (lastHits > 0) + { + var uniqueSearchNumbers = response.hits.hits.Select(h => h.sort.Single()).Distinct().ToList(); + uniqueSearchNumbers.Reverse(); + var searchNumber = uniqueSearchNumbers.Skip(1).First().ToString(); + searchAfter = $"\"search_after\": [{searchNumber}],"; + + foreach (var hit in response.hits.hits) + { + var message = hit.fields.message.Single(); + var sub = message.Substring(0, 12); + if (int.TryParse(sub, out int number)) + { + queue.Add(new QueryLogEntry(message, number)); + } + } + } + + // unload queue + var runQueue = 1; + while (runQueue > 0) + { + var wantedNumber = lastLogLine + 1; + var oldOnes = queue.Where(e => e.Number < wantedNumber).ToList(); + foreach (var old in oldOnes) queue.Remove(old); + + var entry = queue.FirstOrDefault(e => e.Number == wantedNumber); + if (entry != null) + { + File.AppendAllLines(outputFile, new[] { entry.Message }); + queue.Remove(entry); + lastLogLine = entry.Number; + if (!queue.Any()) runQueue = 0; + } + else + { + runQueue = 0; + } + } + } + + + //var ouput = http.HttpGetString(""); + + var aaa = 0; + } + + public class QueryLogEntry + { + public QueryLogEntry(string message, int number) + { + Message = message; + Number = number; + } + + public string Message { get; } + public int Number { get; } + + public override string ToString() + { + return Number.ToString(); + } + } + + public class SearchResponse + { + public SearchHits hits { get; set; } + } + + public class SearchHits + { + public SearchHitEntry[] hits { get; set; } + } + + public class SearchHitEntry + { + public SearchHitFields fields { get; set; } + public long[] sort { get; set; } + } + + public class SearchHitFields + { + public string[] @timestamp { get; set; } + public string[] message { get; set; } + } + [Test] public void CodexLogExample() { From 6a96bd7639f1a9a92dafd7240e362edff833311d Mon Sep 17 00:00:00 2001 From: benbierens Date: Mon, 2 Oct 2023 14:42:36 +0200 Subject: [PATCH 07/13] Implements elastic search log downloader. Requires log-counter codex update! --- Framework/Core/CoreInterface.cs | 11 - Framework/Core/Http.cs | 10 + .../ContainerLogStream.cs | 85 ------- .../ElasticSearchLogDownloader.cs | 210 ++++++++++++++++++ Tests/CodexContinuousTests/SingleTestRun.cs | 39 ++-- Tests/CodexTests/BasicTests/ExampleTests.cs | 128 ----------- 6 files changed, 242 insertions(+), 241 deletions(-) delete mode 100644 Tests/CodexContinuousTests/ContainerLogStream.cs create mode 100644 Tests/CodexContinuousTests/ElasticSearchLogDownloader.cs diff --git a/Framework/Core/CoreInterface.cs b/Framework/Core/CoreInterface.cs index 217641d..28788aa 100644 --- a/Framework/Core/CoreInterface.cs +++ b/Framework/Core/CoreInterface.cs @@ -58,17 +58,6 @@ namespace Core var workflow = entryPoint.Tools.CreateWorkflow(); return workflow.ExecuteCommand(container, command, args); } - - public IHttp CreateHttp() - { - var address = new Address("http://localhost", 9200); - var baseUrl = ""; - return entryPoint.Tools.CreateHttp(address, baseUrl, client => - { - // -H "kbn-xsrf: reporting" -H "Content-Type: application/json" - client.DefaultRequestHeaders.Add("kbn-xsrf", "reporting"); - }); - } } public interface IHasContainer diff --git a/Framework/Core/Http.cs b/Framework/Core/Http.cs index aef61a9..60e52db 100644 --- a/Framework/Core/Http.cs +++ b/Framework/Core/Http.cs @@ -14,6 +14,7 @@ namespace Core TResponse HttpPostJson(string route, TRequest body); string HttpPostJson(string route, TRequest body); string HttpPostString(string route, string body); + TResponse HttpPostString(string route, string body); string HttpPostStream(string route, Stream stream); Stream HttpGetStream(string route); T TryJsonDeserialize(string json); @@ -99,6 +100,15 @@ namespace Core }, $"HTTP-POST-STRING: {route}"); } + public TResponse HttpPostString(string route, string body) + { + var response = HttpPostString(route, body); + if (response == null) throw new Exception("Received no response."); + var result = JsonConvert.DeserializeObject(response); + if (result == null) throw new Exception("Failed to deserialize response"); + return result; + } + public string HttpPostStream(string route, Stream stream) { return Retry(() => diff --git a/Tests/CodexContinuousTests/ContainerLogStream.cs b/Tests/CodexContinuousTests/ContainerLogStream.cs deleted file mode 100644 index 7b7a340..0000000 --- a/Tests/CodexContinuousTests/ContainerLogStream.cs +++ /dev/null @@ -1,85 +0,0 @@ -using Logging; - -namespace ContinuousTests -{ - public class ContainerLogStream - { - private readonly StreamReader reader; - private readonly Stream stream; - private readonly LogFile targetFile; - private readonly CancellationToken token; - private readonly TaskFactory taskFactory; - private int lastNumber = -1; - public bool Fault { get; private set; } - private bool run; - - public ContainerLogStream(Stream stream, string name, LogFile targetFile, CancellationToken token, TaskFactory taskFactory) - { - this.stream = stream; - this.targetFile = targetFile; - this.token = token; - this.taskFactory = taskFactory; - Fault = false; - reader = new StreamReader(stream); - - targetFile.Write(name); - } - - public void Run() - { - run = true; - taskFactory.Run(() => - { - while (run && !token.IsCancellationRequested) - { - Monitor(); - } - }); - } - - public void Stop() - { - run = false; - stream.Close(); - } - - public void DeleteFile() - { - if (run) throw new Exception("Cannot delete file while stream is still running."); - File.Delete(targetFile.FullFilename); - } - - private void Monitor() - { - var line = reader.ReadLine(); - while (run && !string.IsNullOrEmpty(line) && !token.IsCancellationRequested) - { - ProcessLine(line); - line = reader.ReadLine(); - } - } - - private void ProcessLine(string s) - { - targetFile.WriteRaw(s); - - // 000000004298 - var sub = s.Substring(0, 12); - if (!int.TryParse(sub, out int number)) return; - - if (lastNumber == -1) - { - lastNumber = number; - } - else - { - var expectedNumber = lastNumber + 1; - if (number != expectedNumber) - { - Fault = true; - } - lastNumber = number; - } - } - } -} diff --git a/Tests/CodexContinuousTests/ElasticSearchLogDownloader.cs b/Tests/CodexContinuousTests/ElasticSearchLogDownloader.cs new file mode 100644 index 0000000..84f4b38 --- /dev/null +++ b/Tests/CodexContinuousTests/ElasticSearchLogDownloader.cs @@ -0,0 +1,210 @@ +using Core; +using KubernetesWorkflow; +using Logging; +using Utils; + +namespace ContinuousTests +{ + public class ElasticSearchLogDownloader + { + private readonly IPluginTools tools; + private readonly ILog log; + + public ElasticSearchLogDownloader(IPluginTools tools, ILog log) + { + this.tools = tools; + this.log = log; + } + + public void Download(LogFile targetFile, RunningContainer container, DateTime startUtc, DateTime endUtc) + { + try + { + DownloadLog(targetFile, container, startUtc, endUtc); + } + catch (Exception ex) + { + log.Error("Failed to download log: " + ex); + } + } + + private void DownloadLog(LogFile targetFile, RunningContainer container, DateTime startUtc, DateTime endUtc) + { + log.Log($"Downloading log (from ElasticSearch) for container '{container.Name}' within time range: " + + $"{startUtc.ToString("o")} - {endUtc.ToString("o")}"); + + var http = CreateElasticSearchHttp(); + var queryTemplate = CreateQueryTemplate(container, startUtc, endUtc); + + targetFile.Write($"Downloading '{container.Name}' to '{targetFile.FullFilename}'."); + var reconstructor = new LogReconstructor(targetFile, http, queryTemplate); + reconstructor.DownloadFullLog(); + + log.Log("Log download finished."); + } + + private string CreateQueryTemplate(RunningContainer container, DateTime startUtc, DateTime endUtc) + { + var podName = container.Pod.PodInfo.Name; + var start = startUtc.ToString("o"); + var end = endUtc.ToString("o"); + + var source = "{ \"sort\": [ { \"@timestamp\": { \"order\": \"asc\" } } ], \"fields\": [ { \"field\": \"@timestamp\", \"format\": \"strict_date_optional_time\" }, { \"field\": \"pod_name\" }, { \"field\": \"message\" } ], \"size\": , \"_source\": false, \"query\": { \"bool\": { \"must\": [], \"filter\": [ { \"range\": { \"@timestamp\": { \"format\": \"strict_date_optional_time\", \"gte\": \"\", \"lte\": \"\" } } }, { \"match_phrase\": { \"pod_name\": \"\" } } ] } } }"; + return source + .Replace("", start) + .Replace("", end) + .Replace("", podName); + } + + private IHttp CreateElasticSearchHttp() + { + var address = new Address("http://localhost", 9200); + var baseUrl = ""; + return tools.CreateHttp(address, baseUrl, client => + { + client.DefaultRequestHeaders.Add("kbn-xsrf", "reporting"); + }); + } + + public class LogReconstructor + { + private readonly List queue = new List(); + private readonly LogFile targetFile; + private readonly IHttp http; + private readonly string queryTemplate; + private const int sizeOfPage = 2000; + private string searchAfter = ""; + private int lastHits = 1; + private int lastLogLine = -1; + + public LogReconstructor(LogFile targetFile, IHttp http, string queryTemplate) + { + this.targetFile = targetFile; + this.http = http; + this.queryTemplate = queryTemplate; + } + + public void DownloadFullLog() + { + while (lastHits > 0) + { + QueryElasticSearch(); + ProcessQueue(); + } + } + + private void QueryElasticSearch() + { + var query = queryTemplate + .Replace("", sizeOfPage.ToString()) + .Replace("", searchAfter); + + var response = http.HttpPostString("_search", query); + + lastHits = response.hits.hits.Length; + if (lastHits > 0) + { + UpdateSearchAfter(response); + foreach (var hit in response.hits.hits) + { + AddHitToQueue(hit); + } + } + } + + private void AddHitToQueue(SearchHitEntry hit) + { + var message = hit.fields.message.Single(); + var sub = message.Substring(0, 12); + if (int.TryParse(sub, out int number)) + { + queue.Add(new LogQueueEntry(message, number)); + } + } + + private void UpdateSearchAfter(SearchResponse response) + { + var uniqueSearchNumbers = response.hits.hits.Select(h => h.sort.Single()).Distinct().ToList(); + uniqueSearchNumbers.Reverse(); + + var searchNumber = GetSearchNumber(uniqueSearchNumbers); + searchAfter = $"\"search_after\": [{searchNumber}],"; + } + + private long GetSearchNumber(List uniqueSearchNumbers) + { + if (uniqueSearchNumbers.Count == 1) return uniqueSearchNumbers.First(); + return uniqueSearchNumbers.Skip(1).First(); + } + + private void ProcessQueue() + { + while (queue.Any()) + { + var wantedNumber = lastLogLine + 1; + DeleteOldEntries(wantedNumber); + + var currentEntry = queue.FirstOrDefault(e => e.Number == wantedNumber); + + if (currentEntry != null) + { + WriteEntryToFile(currentEntry); + queue.Remove(currentEntry); + lastLogLine = currentEntry.Number; + } + else + { + // The line number we want is not in the queue. + // It will be returned by the elastic search query, some time in the future. + // Stop processing the queue for now. + return; + } + } + } + + private void WriteEntryToFile(LogQueueEntry currentEntry) + { + targetFile.WriteRaw(currentEntry.Message); + } + + private void DeleteOldEntries(int wantedNumber) + { + queue.RemoveAll(e => e.Number < wantedNumber); + } + + public class LogQueueEntry + { + public LogQueueEntry(string message, int number) + { + Message = message; + Number = number; + } + + public string Message { get; } + public int Number { get; } + } + + public class SearchResponse + { + public SearchHits hits { get; set; } + } + + public class SearchHits + { + public SearchHitEntry[] hits { get; set; } + } + + public class SearchHitEntry + { + public SearchHitFields fields { get; set; } + public long[] sort { get; set; } + } + + public class SearchHitFields + { + public string[] @timestamp { get; set; } + public string[] message { get; set; } + } + } + } +} diff --git a/Tests/CodexContinuousTests/SingleTestRun.cs b/Tests/CodexContinuousTests/SingleTestRun.cs index 8cb384f..e3043e8 100644 --- a/Tests/CodexContinuousTests/SingleTestRun.cs +++ b/Tests/CodexContinuousTests/SingleTestRun.cs @@ -6,6 +6,7 @@ using System.Reflection; using CodexPlugin; using DistTestCore.Logs; using Core; +using System.ComponentModel; namespace ContinuousTests { @@ -63,24 +64,18 @@ namespace ContinuousTests private void RunTest(Action resultHandler) { var ci = entryPoint.CreateInterface(); - var monitors = nodes.Select(n => new ContainerLogStream( - stream: ci.MonitorLog(n), - name: n.GetName(), - targetFile: fixtureLog.CreateSubfile(), - token: cancelToken, - taskFactory: taskFactory)).ToArray(); - + var testStart = DateTime.UtcNow; + try { - foreach (var m in monitors) m.Run(); - fixtureLog.Log("Monitor start"); - RunTestMoments(); + var duration = DateTime.UtcNow - testStart; + OverviewLog($" > Test passed. ({Time.FormatDuration(duration)})"); + if (!config.KeepPassedTestLogs) { fixtureLog.Delete(); - foreach (var m in monitors) m.DeleteFile(); } resultHandler(true); } @@ -89,6 +84,8 @@ namespace ContinuousTests fixtureLog.Error("Test run failed with exception: " + ex); fixtureLog.MarkAsFailed(); + DownloadContainerLogs(testStart); + failureCount++; resultHandler(false); if (config.StopOnFailure > 0) @@ -101,12 +98,21 @@ namespace ContinuousTests } } } - finally + } + + private void DownloadContainerLogs(DateTime testStart) + { + // The test failed just now. We can't expect the logs to be available in elastic-search immediately: + Thread.Sleep(TimeSpan.FromMinutes(1)); + + var effectiveStart = testStart.Subtract(TimeSpan.FromSeconds(10)); + var effectiveEnd = DateTime.UtcNow.AddSeconds(30); + var elasticSearchLogDownloader = new ElasticSearchLogDownloader(entryPoint.Tools, fixtureLog); + + foreach (var node in nodes) { - Thread.Sleep(1000); - fixtureLog.Log("Monitor stop"); - foreach (var m in monitors) m.Stop(); - if (monitors.Any(m => m.Fault)) throw new Exception("One or more downloaded container log is missing lines!"); + var container = node.Container; + elasticSearchLogDownloader.Download(fixtureLog.CreateSubfile(), container, effectiveStart, effectiveEnd); } } @@ -144,7 +150,6 @@ namespace ContinuousTests { ThrowFailTest(); } - OverviewLog(" > Test passed."); return; } } diff --git a/Tests/CodexTests/BasicTests/ExampleTests.cs b/Tests/CodexTests/BasicTests/ExampleTests.cs index cac9935..4049b9a 100644 --- a/Tests/CodexTests/BasicTests/ExampleTests.cs +++ b/Tests/CodexTests/BasicTests/ExampleTests.cs @@ -2,9 +2,6 @@ using DistTestCore; using GethPlugin; using MetricsPlugin; -using Microsoft.IdentityModel.Abstractions; -using Microsoft.VisualStudio.TestPlatform.CommunicationUtilities; -using Newtonsoft.Json; using NUnit.Framework; using Utils; @@ -13,131 +10,6 @@ namespace Tests.BasicTests [TestFixture] public class ExampleTests : CodexDistTest { - [Test] - public void AAA() - { - var http = this.Ci.CreateHttp(); - - //var query = "{\r\n \"query\": {\r\n \"bool\": {\r\n \"filter\": [\r\n {\r\n \"term\": {\r\n \"container_image.keyword\": \"docker.io/codexstorage/nim-codex:sha-9d735f9-dist-tests\"\r\n }\r\n },\r\n {\r\n \"term\": {\r\n \"pod_namespace.keyword\": \"codex-continuous-tests\"\r\n }\r\n },\r\n {\r\n \"term\": {\r\n \"pod_name.keyword\": \"codex3-workflow3-ff476767d-98zx4\"\r\n }\r\n },\r\n {\r\n \"range\": {\r\n \"@timestamp\": {\r\n \"lte\": \"2023-09-25T13:02:23.559Z\",\r\n \"gt\": \"2023-09-25T20:00:00.000Z\"\r\n }\r\n }\r\n }\r\n ]\r\n }\r\n }\r\n}"; - //var query = "{ \"query\": {\"bool\": { \"filter\": [{ \"term\": {\"container_image.keyword\": \"docker.io/codexstorage/nim-codex:sha-9d735f9-dist-tests\" }},{ \"term\": {\"pod_namespace.keyword\": \"codex-continuous-tests\" }},{ \"term\": {\"pod_name.keyword\": \"codex3-workflow3-ff476767d-98zx4\" }},{ \"range\": {\"@timestamp\": { \"lte\": \"2023-09-29T13:02:23.559Z\", \"gt\": \"2023-09-20T20:00:00.000Z\"} }} ]} }}"; - - - - //var queryTemplate = "{ \"query\": {\"bool\": { \"filter\": [{ \"term\": {\"pod_namespace.keyword\": \"codex-continuous-tests\" }},{ \"term\": {\"pod_name.keyword\": \"bootstrap-2-599dfb4d65-v4v2b\" }},{ \"range\": {\"@timestamp\": { \"lte\": \"2023-10-03T13:02:23.559Z\", \"gt\": \"2023-09-01T20:00:00.000Z\"} }} ]} },\t\"_source\": \"message\",\t\"from\": ,\t\"size\": }"; - var queryTemplate = "{ \"sort\": [ { \"@timestamp\": { \"order\": \"asc\" } } ], \"fields\": [ { \"field\": \"@timestamp\", \"format\": \"strict_date_optional_time\" }, { \"field\": \"pod_name\" }, { \"field\": \"message\" } ], \"size\": , \"_source\": false, \"query\": { \"bool\": { \"must\": [], \"filter\": [ { \"range\": { \"@timestamp\": { \"format\": \"strict_date_optional_time\", \"gte\": \"2023-10-01T00:00:00.000Z\", \"lte\": \"2023-10-03T09:00:00.000Z\" } } }, { \"match_phrase\": { \"pod_name\": \"bootstrap-2-599dfb4d65-v4v2b\" } } ] } } }"; - - var outputFile = "c:\\Users\\Ben\\Desktop\\Cluster\\reconstructed.log"; - - var sizeOfPage = 2000; - var searchAfter = ""; - var lastHits = 1; - var totalHits = 0; - var lastLogLine = -1; - - var queue = new List(); - //var jumpback = 0; - - while (lastHits > 0) - { - //if (queue.Any()) jumpback++; - //else jumpback = 0; - - var query = queryTemplate - .Replace("", sizeOfPage.ToString()) - .Replace("", searchAfter); - - var output = http.HttpPostString("_search", query); - - var response = JsonConvert.DeserializeObject(output)!; - - lastHits = response.hits.hits.Length; - totalHits += response.hits.hits.Length; - if (lastHits > 0) - { - var uniqueSearchNumbers = response.hits.hits.Select(h => h.sort.Single()).Distinct().ToList(); - uniqueSearchNumbers.Reverse(); - var searchNumber = uniqueSearchNumbers.Skip(1).First().ToString(); - searchAfter = $"\"search_after\": [{searchNumber}],"; - - foreach (var hit in response.hits.hits) - { - var message = hit.fields.message.Single(); - var sub = message.Substring(0, 12); - if (int.TryParse(sub, out int number)) - { - queue.Add(new QueryLogEntry(message, number)); - } - } - } - - // unload queue - var runQueue = 1; - while (runQueue > 0) - { - var wantedNumber = lastLogLine + 1; - var oldOnes = queue.Where(e => e.Number < wantedNumber).ToList(); - foreach (var old in oldOnes) queue.Remove(old); - - var entry = queue.FirstOrDefault(e => e.Number == wantedNumber); - if (entry != null) - { - File.AppendAllLines(outputFile, new[] { entry.Message }); - queue.Remove(entry); - lastLogLine = entry.Number; - if (!queue.Any()) runQueue = 0; - } - else - { - runQueue = 0; - } - } - } - - - //var ouput = http.HttpGetString(""); - - var aaa = 0; - } - - public class QueryLogEntry - { - public QueryLogEntry(string message, int number) - { - Message = message; - Number = number; - } - - public string Message { get; } - public int Number { get; } - - public override string ToString() - { - return Number.ToString(); - } - } - - public class SearchResponse - { - public SearchHits hits { get; set; } - } - - public class SearchHits - { - public SearchHitEntry[] hits { get; set; } - } - - public class SearchHitEntry - { - public SearchHitFields fields { get; set; } - public long[] sort { get; set; } - } - - public class SearchHitFields - { - public string[] @timestamp { get; set; } - public string[] message { get; set; } - } - [Test] public void CodexLogExample() { From 1bca2bb92804f28abd9e5ca8e0d8ab103343057a Mon Sep 17 00:00:00 2001 From: benbierens Date: Tue, 3 Oct 2023 13:34:39 +0200 Subject: [PATCH 08/13] Updates parser for new counter format. --- Framework/Core/CoreInterface.cs | 11 ------- Framework/KubernetesWorkflow/K8sController.cs | 6 ---- .../KubernetesWorkflow/StartupWorkflow.cs | 9 ------ .../ElasticSearchLogDownloader.cs | 31 ++++++++++++++----- 4 files changed, 23 insertions(+), 34 deletions(-) diff --git a/Framework/Core/CoreInterface.cs b/Framework/Core/CoreInterface.cs index 28788aa..0358f93 100644 --- a/Framework/Core/CoreInterface.cs +++ b/Framework/Core/CoreInterface.cs @@ -37,17 +37,6 @@ namespace Core return logHandler.DownloadLog(); } - public Stream MonitorLog(IHasContainer container) - { - return MonitorLog(container.Container); - } - - public Stream MonitorLog(RunningContainer container) - { - var workflow = entryPoint.Tools.CreateWorkflow(); - return workflow.MonitorContainerLog(container); - } - public string ExecuteContainerCommand(IHasContainer containerSource, string command, params string[] args) { return ExecuteContainerCommand(containerSource.Container, command, args); diff --git a/Framework/KubernetesWorkflow/K8sController.cs b/Framework/KubernetesWorkflow/K8sController.cs index ee3c5e8..3f5504f 100644 --- a/Framework/KubernetesWorkflow/K8sController.cs +++ b/Framework/KubernetesWorkflow/K8sController.cs @@ -57,12 +57,6 @@ namespace KubernetesWorkflow logHandler.Log(stream); } - public Stream MonitorContainerLog(RunningContainer container) - { - log.Debug(); - return client.Run(c => c.ReadNamespacedPodLog(container.Pod.PodInfo.Name, K8sNamespace, container.Recipe.Name, follow: true, sinceSeconds: 1)); - } - public string ExecuteCommand(RunningPod pod, string containerName, string command, params string[] args) { var cmdAndArgs = $"{containerName}: {command} ({string.Join(",", args)})"; diff --git a/Framework/KubernetesWorkflow/StartupWorkflow.cs b/Framework/KubernetesWorkflow/StartupWorkflow.cs index d5e40d9..bda8a9b 100644 --- a/Framework/KubernetesWorkflow/StartupWorkflow.cs +++ b/Framework/KubernetesWorkflow/StartupWorkflow.cs @@ -11,7 +11,6 @@ namespace KubernetesWorkflow CrashWatcher CreateCrashWatcher(RunningContainer container); void Stop(RunningContainers runningContainers); void DownloadContainerLog(RunningContainer container, ILogHandler logHandler, int? tailLines = null); - Stream MonitorContainerLog(RunningContainer container); string ExecuteCommand(RunningContainer container, string command, params string[] args); void DeleteNamespace(); void DeleteNamespacesStartingWith(string namespacePrefix); @@ -84,14 +83,6 @@ namespace KubernetesWorkflow }); } - public Stream MonitorContainerLog(RunningContainer container) - { - return K8s(controller => - { - return controller.MonitorContainerLog(container); - }); - } - public string ExecuteCommand(RunningContainer container, string command, params string[] args) { return K8s(controller => diff --git a/Tests/CodexContinuousTests/ElasticSearchLogDownloader.cs b/Tests/CodexContinuousTests/ElasticSearchLogDownloader.cs index 84f4b38..f66381d 100644 --- a/Tests/CodexContinuousTests/ElasticSearchLogDownloader.cs +++ b/Tests/CodexContinuousTests/ElasticSearchLogDownloader.cs @@ -75,7 +75,7 @@ namespace ContinuousTests private const int sizeOfPage = 2000; private string searchAfter = ""; private int lastHits = 1; - private int lastLogLine = -1; + private ulong lastLogLine = 0; public LogReconstructor(LogFile targetFile, IHttp http, string queryTemplate) { @@ -115,13 +115,28 @@ namespace ContinuousTests private void AddHitToQueue(SearchHitEntry hit) { var message = hit.fields.message.Single(); - var sub = message.Substring(0, 12); - if (int.TryParse(sub, out int number)) + var number = ParseCountNumber(message); + if (number != null) { - queue.Add(new LogQueueEntry(message, number)); + queue.Add(new LogQueueEntry(message, number.Value)); } } + private ulong? ParseCountNumber(string message) + { + if (string.IsNullOrEmpty(message)) return null; + var tokens = message.Split(' ', StringSplitOptions.RemoveEmptyEntries); + if (!tokens.Any()) return null; + var countToken = tokens.SingleOrDefault(t => t.StartsWith("count=")); + if (countToken == null) return null; + var number = countToken.Substring(6); + if (ulong.TryParse(number, out ulong value)) + { + return value; + } + return null; + } + private void UpdateSearchAfter(SearchResponse response) { var uniqueSearchNumbers = response.hits.hits.Select(h => h.sort.Single()).Distinct().ToList(); @@ -141,7 +156,7 @@ namespace ContinuousTests { while (queue.Any()) { - var wantedNumber = lastLogLine + 1; + ulong wantedNumber = lastLogLine + 1; DeleteOldEntries(wantedNumber); var currentEntry = queue.FirstOrDefault(e => e.Number == wantedNumber); @@ -167,21 +182,21 @@ namespace ContinuousTests targetFile.WriteRaw(currentEntry.Message); } - private void DeleteOldEntries(int wantedNumber) + private void DeleteOldEntries(ulong wantedNumber) { queue.RemoveAll(e => e.Number < wantedNumber); } public class LogQueueEntry { - public LogQueueEntry(string message, int number) + public LogQueueEntry(string message, ulong number) { Message = message; Number = number; } public string Message { get; } - public int Number { get; } + public ulong Number { get; } } public class SearchResponse From adbd481c7cafba9c98bc992fe2c296b0d5eb0bed Mon Sep 17 00:00:00 2001 From: benbierens Date: Tue, 3 Oct 2023 14:32:31 +0200 Subject: [PATCH 09/13] Sets elasticsearch downloader for in-cluster running. --- .../ElasticSearchLogDownloader.cs | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/Tests/CodexContinuousTests/ElasticSearchLogDownloader.cs b/Tests/CodexContinuousTests/ElasticSearchLogDownloader.cs index f66381d..a703d18 100644 --- a/Tests/CodexContinuousTests/ElasticSearchLogDownloader.cs +++ b/Tests/CodexContinuousTests/ElasticSearchLogDownloader.cs @@ -58,7 +58,9 @@ namespace ContinuousTests private IHttp CreateElasticSearchHttp() { - var address = new Address("http://localhost", 9200); + var serviceName = "elasticsearch"; + var k8sNamespace = "monitoring"; + var address = new Address($"http://{serviceName}.{k8sNamespace}.svc.cluster.local", 9200); var baseUrl = ""; return tools.CreateHttp(address, baseUrl, client => { @@ -201,24 +203,24 @@ namespace ContinuousTests public class SearchResponse { - public SearchHits hits { get; set; } + public SearchHits hits { get; set; } = new SearchHits(); } public class SearchHits { - public SearchHitEntry[] hits { get; set; } + public SearchHitEntry[] hits { get; set; } = Array.Empty(); } public class SearchHitEntry { - public SearchHitFields fields { get; set; } - public long[] sort { get; set; } + public SearchHitFields fields { get; set; } = new SearchHitFields(); + public long[] sort { get; set; } = Array.Empty(); } public class SearchHitFields { - public string[] @timestamp { get; set; } - public string[] message { get; set; } + public string[] @timestamp { get; set; } = Array.Empty(); + public string[] message { get; set; } = Array.Empty(); } } } From a69bea47af634fa96028a825145b614cf7e0f6ac Mon Sep 17 00:00:00 2001 From: benbierens Date: Tue, 3 Oct 2023 15:11:45 +0200 Subject: [PATCH 10/13] Debugging downloader in cluster --- Tests/CodexContinuousTests/ContinuousTest.cs | 2 ++ .../ElasticSearchLogDownloader.cs | 18 +++++++++++++++--- Tests/CodexContinuousTests/SingleTestRun.cs | 1 + .../Tests/HoldMyBeerTest.cs | 4 ++++ 4 files changed, 22 insertions(+), 3 deletions(-) diff --git a/Tests/CodexContinuousTests/ContinuousTest.cs b/Tests/CodexContinuousTests/ContinuousTest.cs index 5b74668..a8657e1 100644 --- a/Tests/CodexContinuousTests/ContinuousTest.cs +++ b/Tests/CodexContinuousTests/ContinuousTest.cs @@ -74,6 +74,8 @@ namespace ContinuousTests return GetType().Name; } } + + public IPluginTools Tools { get; internal set; } } public enum TestFailMode diff --git a/Tests/CodexContinuousTests/ElasticSearchLogDownloader.cs b/Tests/CodexContinuousTests/ElasticSearchLogDownloader.cs index a703d18..686e4d5 100644 --- a/Tests/CodexContinuousTests/ElasticSearchLogDownloader.cs +++ b/Tests/CodexContinuousTests/ElasticSearchLogDownloader.cs @@ -37,7 +37,7 @@ namespace ContinuousTests var queryTemplate = CreateQueryTemplate(container, startUtc, endUtc); targetFile.Write($"Downloading '{container.Name}' to '{targetFile.FullFilename}'."); - var reconstructor = new LogReconstructor(targetFile, http, queryTemplate); + var reconstructor = new LogReconstructor(log, targetFile, http, queryTemplate); reconstructor.DownloadFullLog(); log.Log("Log download finished."); @@ -50,10 +50,13 @@ namespace ContinuousTests var end = endUtc.ToString("o"); var source = "{ \"sort\": [ { \"@timestamp\": { \"order\": \"asc\" } } ], \"fields\": [ { \"field\": \"@timestamp\", \"format\": \"strict_date_optional_time\" }, { \"field\": \"pod_name\" }, { \"field\": \"message\" } ], \"size\": , \"_source\": false, \"query\": { \"bool\": { \"must\": [], \"filter\": [ { \"range\": { \"@timestamp\": { \"format\": \"strict_date_optional_time\", \"gte\": \"\", \"lte\": \"\" } } }, { \"match_phrase\": { \"pod_name\": \"\" } } ] } } }"; - return source + var result = source .Replace("", start) .Replace("", end) .Replace("", podName); + + log.Log($"query template: '{result}'"); + return result; } private IHttp CreateElasticSearchHttp() @@ -62,6 +65,9 @@ namespace ContinuousTests var k8sNamespace = "monitoring"; var address = new Address($"http://{serviceName}.{k8sNamespace}.svc.cluster.local", 9200); var baseUrl = ""; + + log.Log("elastic search: " + address.Host + ":" + address.Port); + return tools.CreateHttp(address, baseUrl, client => { client.DefaultRequestHeaders.Add("kbn-xsrf", "reporting"); @@ -71,6 +77,7 @@ namespace ContinuousTests public class LogReconstructor { private readonly List queue = new List(); + private readonly ILog log; private readonly LogFile targetFile; private readonly IHttp http; private readonly string queryTemplate; @@ -79,8 +86,9 @@ namespace ContinuousTests private int lastHits = 1; private ulong lastLogLine = 0; - public LogReconstructor(LogFile targetFile, IHttp http, string queryTemplate) + public LogReconstructor(ILog log, LogFile targetFile, IHttp http, string queryTemplate) { + this.log = log; this.targetFile = targetFile; this.http = http; this.queryTemplate = queryTemplate; @@ -101,8 +109,12 @@ namespace ContinuousTests .Replace("", sizeOfPage.ToString()) .Replace("", searchAfter); + log.Log($"query with size {sizeOfPage} and searchAfter '{searchAfter}'."); + var response = http.HttpPostString("_search", query); + log.Log("number of hits: " + response.hits.hits.Length); + lastHits = response.hits.hits.Length; if (lastHits > 0) { diff --git a/Tests/CodexContinuousTests/SingleTestRun.cs b/Tests/CodexContinuousTests/SingleTestRun.cs index e3043e8..7321d1e 100644 --- a/Tests/CodexContinuousTests/SingleTestRun.cs +++ b/Tests/CodexContinuousTests/SingleTestRun.cs @@ -210,6 +210,7 @@ namespace ContinuousTests { Log($" > Running TestMoment '{name}'"); handle.Test.Initialize(nodes, fixtureLog, entryPoint.Tools.GetFileManager(), config, cancelToken); + handle.Test.Tools = entryPoint.Tools; } private void DecommissionTest() diff --git a/Tests/CodexContinuousTests/Tests/HoldMyBeerTest.cs b/Tests/CodexContinuousTests/Tests/HoldMyBeerTest.cs index 76457e2..0b0b779 100644 --- a/Tests/CodexContinuousTests/Tests/HoldMyBeerTest.cs +++ b/Tests/CodexContinuousTests/Tests/HoldMyBeerTest.cs @@ -26,6 +26,10 @@ namespace ContinuousTests.Tests var dl = Nodes[0].DownloadContent(cid); file.AssertIsEqual(dl); + + var targetFile = Log.CreateSubfile(); + var downloader = new ElasticSearchLogDownloader(Tools, Log); + downloader.Download(targetFile, Nodes[0].Container, DateTime.UtcNow.Subtract(TimeSpan.FromMinutes(1)), DateTime.UtcNow.AddMinutes(1)); } } } From 696fb103861d1c6f533fefac93a35be6745c6c64 Mon Sep 17 00:00:00 2001 From: benbierens Date: Tue, 3 Oct 2023 15:36:44 +0200 Subject: [PATCH 11/13] debugging the downloader in the cluster --- Tests/CodexContinuousTests/ElasticSearchLogDownloader.cs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/Tests/CodexContinuousTests/ElasticSearchLogDownloader.cs b/Tests/CodexContinuousTests/ElasticSearchLogDownloader.cs index 686e4d5..2f9890a 100644 --- a/Tests/CodexContinuousTests/ElasticSearchLogDownloader.cs +++ b/Tests/CodexContinuousTests/ElasticSearchLogDownloader.cs @@ -168,9 +168,15 @@ namespace ContinuousTests private void ProcessQueue() { + log.Log($"queue length: {queue.Count}"); + log.Log("lowest number: " + queue.Min(q => q.Number)); + log.Log("highest number: " + queue.Max(q => q.Number)); + while (queue.Any()) { ulong wantedNumber = lastLogLine + 1; + log.Log("looking for number: " + wantedNumber); + DeleteOldEntries(wantedNumber); var currentEntry = queue.FirstOrDefault(e => e.Number == wantedNumber); From 917d715497c7f5f9b82e26bc17e83b5ea1d53eb9 Mon Sep 17 00:00:00 2001 From: benbierens Date: Tue, 3 Oct 2023 16:11:14 +0200 Subject: [PATCH 12/13] Fixes startup of log reconstruction --- .../ElasticSearchLogDownloader.cs | 29 ++++++------------- 1 file changed, 9 insertions(+), 20 deletions(-) diff --git a/Tests/CodexContinuousTests/ElasticSearchLogDownloader.cs b/Tests/CodexContinuousTests/ElasticSearchLogDownloader.cs index 2f9890a..c78ed63 100644 --- a/Tests/CodexContinuousTests/ElasticSearchLogDownloader.cs +++ b/Tests/CodexContinuousTests/ElasticSearchLogDownloader.cs @@ -37,7 +37,7 @@ namespace ContinuousTests var queryTemplate = CreateQueryTemplate(container, startUtc, endUtc); targetFile.Write($"Downloading '{container.Name}' to '{targetFile.FullFilename}'."); - var reconstructor = new LogReconstructor(log, targetFile, http, queryTemplate); + var reconstructor = new LogReconstructor(targetFile, http, queryTemplate); reconstructor.DownloadFullLog(); log.Log("Log download finished."); @@ -50,13 +50,10 @@ namespace ContinuousTests var end = endUtc.ToString("o"); var source = "{ \"sort\": [ { \"@timestamp\": { \"order\": \"asc\" } } ], \"fields\": [ { \"field\": \"@timestamp\", \"format\": \"strict_date_optional_time\" }, { \"field\": \"pod_name\" }, { \"field\": \"message\" } ], \"size\": , \"_source\": false, \"query\": { \"bool\": { \"must\": [], \"filter\": [ { \"range\": { \"@timestamp\": { \"format\": \"strict_date_optional_time\", \"gte\": \"\", \"lte\": \"\" } } }, { \"match_phrase\": { \"pod_name\": \"\" } } ] } } }"; - var result = source + return source .Replace("", start) .Replace("", end) .Replace("", podName); - - log.Log($"query template: '{result}'"); - return result; } private IHttp CreateElasticSearchHttp() @@ -66,8 +63,6 @@ namespace ContinuousTests var address = new Address($"http://{serviceName}.{k8sNamespace}.svc.cluster.local", 9200); var baseUrl = ""; - log.Log("elastic search: " + address.Host + ":" + address.Port); - return tools.CreateHttp(address, baseUrl, client => { client.DefaultRequestHeaders.Add("kbn-xsrf", "reporting"); @@ -77,18 +72,16 @@ namespace ContinuousTests public class LogReconstructor { private readonly List queue = new List(); - private readonly ILog log; private readonly LogFile targetFile; private readonly IHttp http; private readonly string queryTemplate; private const int sizeOfPage = 2000; private string searchAfter = ""; private int lastHits = 1; - private ulong lastLogLine = 0; + private ulong? lastLogLine; - public LogReconstructor(ILog log, LogFile targetFile, IHttp http, string queryTemplate) + public LogReconstructor(LogFile targetFile, IHttp http, string queryTemplate) { - this.log = log; this.targetFile = targetFile; this.http = http; this.queryTemplate = queryTemplate; @@ -109,12 +102,8 @@ namespace ContinuousTests .Replace("", sizeOfPage.ToString()) .Replace("", searchAfter); - log.Log($"query with size {sizeOfPage} and searchAfter '{searchAfter}'."); - var response = http.HttpPostString("_search", query); - log.Log("number of hits: " + response.hits.hits.Length); - lastHits = response.hits.hits.Length; if (lastHits > 0) { @@ -168,14 +157,14 @@ namespace ContinuousTests private void ProcessQueue() { - log.Log($"queue length: {queue.Count}"); - log.Log("lowest number: " + queue.Min(q => q.Number)); - log.Log("highest number: " + queue.Max(q => q.Number)); + if (lastLogLine == null) + { + lastLogLine = queue.Min(q => q.Number) - 1; + } while (queue.Any()) { - ulong wantedNumber = lastLogLine + 1; - log.Log("looking for number: " + wantedNumber); + ulong wantedNumber = lastLogLine.Value + 1; DeleteOldEntries(wantedNumber); From ccc47327524daf721043ece3657c9b383e9c1670 Mon Sep 17 00:00:00 2001 From: benbierens Date: Wed, 4 Oct 2023 09:36:59 +0200 Subject: [PATCH 13/13] Cleanup --- Framework/Core/CoreInterface.cs | 1 - Tests/CodexContinuousTests/ContinuousTest.cs | 2 -- Tests/CodexContinuousTests/SingleTestRun.cs | 10 +++------- Tests/CodexContinuousTests/Tests/HoldMyBeerTest.cs | 4 ---- 4 files changed, 3 insertions(+), 14 deletions(-) diff --git a/Framework/Core/CoreInterface.cs b/Framework/Core/CoreInterface.cs index 0358f93..21b9491 100644 --- a/Framework/Core/CoreInterface.cs +++ b/Framework/Core/CoreInterface.cs @@ -1,5 +1,4 @@ using KubernetesWorkflow; -using Utils; namespace Core { diff --git a/Tests/CodexContinuousTests/ContinuousTest.cs b/Tests/CodexContinuousTests/ContinuousTest.cs index a8657e1..5b74668 100644 --- a/Tests/CodexContinuousTests/ContinuousTest.cs +++ b/Tests/CodexContinuousTests/ContinuousTest.cs @@ -74,8 +74,6 @@ namespace ContinuousTests return GetType().Name; } } - - public IPluginTools Tools { get; internal set; } } public enum TestFailMode diff --git a/Tests/CodexContinuousTests/SingleTestRun.cs b/Tests/CodexContinuousTests/SingleTestRun.cs index 7321d1e..85283fa 100644 --- a/Tests/CodexContinuousTests/SingleTestRun.cs +++ b/Tests/CodexContinuousTests/SingleTestRun.cs @@ -6,7 +6,6 @@ using System.Reflection; using CodexPlugin; using DistTestCore.Logs; using Core; -using System.ComponentModel; namespace ContinuousTests { @@ -63,7 +62,6 @@ namespace ContinuousTests private void RunTest(Action resultHandler) { - var ci = entryPoint.CreateInterface(); var testStart = DateTime.UtcNow; try @@ -105,14 +103,13 @@ namespace ContinuousTests // The test failed just now. We can't expect the logs to be available in elastic-search immediately: Thread.Sleep(TimeSpan.FromMinutes(1)); - var effectiveStart = testStart.Subtract(TimeSpan.FromSeconds(10)); - var effectiveEnd = DateTime.UtcNow.AddSeconds(30); + var effectiveStart = testStart.Subtract(TimeSpan.FromSeconds(30)); + var effectiveEnd = DateTime.UtcNow; var elasticSearchLogDownloader = new ElasticSearchLogDownloader(entryPoint.Tools, fixtureLog); foreach (var node in nodes) { - var container = node.Container; - elasticSearchLogDownloader.Download(fixtureLog.CreateSubfile(), container, effectiveStart, effectiveEnd); + elasticSearchLogDownloader.Download(fixtureLog.CreateSubfile(), node.Container, effectiveStart, effectiveEnd); } } @@ -210,7 +207,6 @@ namespace ContinuousTests { Log($" > Running TestMoment '{name}'"); handle.Test.Initialize(nodes, fixtureLog, entryPoint.Tools.GetFileManager(), config, cancelToken); - handle.Test.Tools = entryPoint.Tools; } private void DecommissionTest() diff --git a/Tests/CodexContinuousTests/Tests/HoldMyBeerTest.cs b/Tests/CodexContinuousTests/Tests/HoldMyBeerTest.cs index 0b0b779..76457e2 100644 --- a/Tests/CodexContinuousTests/Tests/HoldMyBeerTest.cs +++ b/Tests/CodexContinuousTests/Tests/HoldMyBeerTest.cs @@ -26,10 +26,6 @@ namespace ContinuousTests.Tests var dl = Nodes[0].DownloadContent(cid); file.AssertIsEqual(dl); - - var targetFile = Log.CreateSubfile(); - var downloader = new ElasticSearchLogDownloader(Tools, Log); - downloader.Download(targetFile, Nodes[0].Container, DateTime.UtcNow.Subtract(TimeSpan.FromMinutes(1)), DateTime.UtcNow.AddMinutes(1)); } } }