From 6a96bd7639f1a9a92dafd7240e362edff833311d Mon Sep 17 00:00:00 2001 From: benbierens Date: Mon, 2 Oct 2023 14:42:36 +0200 Subject: [PATCH] Implements elastic search log downloader. Requires log-counter codex update! --- Framework/Core/CoreInterface.cs | 11 - Framework/Core/Http.cs | 10 + .../ContainerLogStream.cs | 85 ------- .../ElasticSearchLogDownloader.cs | 210 ++++++++++++++++++ Tests/CodexContinuousTests/SingleTestRun.cs | 39 ++-- Tests/CodexTests/BasicTests/ExampleTests.cs | 128 ----------- 6 files changed, 242 insertions(+), 241 deletions(-) delete mode 100644 Tests/CodexContinuousTests/ContainerLogStream.cs create mode 100644 Tests/CodexContinuousTests/ElasticSearchLogDownloader.cs diff --git a/Framework/Core/CoreInterface.cs b/Framework/Core/CoreInterface.cs index 217641d..28788aa 100644 --- a/Framework/Core/CoreInterface.cs +++ b/Framework/Core/CoreInterface.cs @@ -58,17 +58,6 @@ namespace Core var workflow = entryPoint.Tools.CreateWorkflow(); return workflow.ExecuteCommand(container, command, args); } - - public IHttp CreateHttp() - { - var address = new Address("http://localhost", 9200); - var baseUrl = ""; - return entryPoint.Tools.CreateHttp(address, baseUrl, client => - { - // -H "kbn-xsrf: reporting" -H "Content-Type: application/json" - client.DefaultRequestHeaders.Add("kbn-xsrf", "reporting"); - }); - } } public interface IHasContainer diff --git a/Framework/Core/Http.cs b/Framework/Core/Http.cs index aef61a9..60e52db 100644 --- a/Framework/Core/Http.cs +++ b/Framework/Core/Http.cs @@ -14,6 +14,7 @@ namespace Core TResponse HttpPostJson(string route, TRequest body); string HttpPostJson(string route, TRequest body); string HttpPostString(string route, string body); + TResponse HttpPostString(string route, string body); string HttpPostStream(string route, Stream stream); Stream HttpGetStream(string route); T TryJsonDeserialize(string json); @@ -99,6 +100,15 @@ namespace Core }, $"HTTP-POST-STRING: {route}"); } + public TResponse HttpPostString(string route, string body) + { + var response = HttpPostString(route, body); + if (response == null) throw new Exception("Received no response."); + var result = JsonConvert.DeserializeObject(response); + if (result == null) throw new Exception("Failed to deserialize response"); + return result; + } + public string HttpPostStream(string route, Stream stream) { return Retry(() => diff --git a/Tests/CodexContinuousTests/ContainerLogStream.cs b/Tests/CodexContinuousTests/ContainerLogStream.cs deleted file mode 100644 index 7b7a340..0000000 --- a/Tests/CodexContinuousTests/ContainerLogStream.cs +++ /dev/null @@ -1,85 +0,0 @@ -using Logging; - -namespace ContinuousTests -{ - public class ContainerLogStream - { - private readonly StreamReader reader; - private readonly Stream stream; - private readonly LogFile targetFile; - private readonly CancellationToken token; - private readonly TaskFactory taskFactory; - private int lastNumber = -1; - public bool Fault { get; private set; } - private bool run; - - public ContainerLogStream(Stream stream, string name, LogFile targetFile, CancellationToken token, TaskFactory taskFactory) - { - this.stream = stream; - this.targetFile = targetFile; - this.token = token; - this.taskFactory = taskFactory; - Fault = false; - reader = new StreamReader(stream); - - targetFile.Write(name); - } - - public void Run() - { - run = true; - taskFactory.Run(() => - { - while (run && !token.IsCancellationRequested) - { - Monitor(); - } - }); - } - - public void Stop() - { - run = false; - stream.Close(); - } - - public void DeleteFile() - { - if (run) throw new Exception("Cannot delete file while stream is still running."); - File.Delete(targetFile.FullFilename); - } - - private void Monitor() - { - var line = reader.ReadLine(); - while (run && !string.IsNullOrEmpty(line) && !token.IsCancellationRequested) - { - ProcessLine(line); - line = reader.ReadLine(); - } - } - - private void ProcessLine(string s) - { - targetFile.WriteRaw(s); - - // 000000004298 - var sub = s.Substring(0, 12); - if (!int.TryParse(sub, out int number)) return; - - if (lastNumber == -1) - { - lastNumber = number; - } - else - { - var expectedNumber = lastNumber + 1; - if (number != expectedNumber) - { - Fault = true; - } - lastNumber = number; - } - } - } -} diff --git a/Tests/CodexContinuousTests/ElasticSearchLogDownloader.cs b/Tests/CodexContinuousTests/ElasticSearchLogDownloader.cs new file mode 100644 index 0000000..84f4b38 --- /dev/null +++ b/Tests/CodexContinuousTests/ElasticSearchLogDownloader.cs @@ -0,0 +1,210 @@ +using Core; +using KubernetesWorkflow; +using Logging; +using Utils; + +namespace ContinuousTests +{ + public class ElasticSearchLogDownloader + { + private readonly IPluginTools tools; + private readonly ILog log; + + public ElasticSearchLogDownloader(IPluginTools tools, ILog log) + { + this.tools = tools; + this.log = log; + } + + public void Download(LogFile targetFile, RunningContainer container, DateTime startUtc, DateTime endUtc) + { + try + { + DownloadLog(targetFile, container, startUtc, endUtc); + } + catch (Exception ex) + { + log.Error("Failed to download log: " + ex); + } + } + + private void DownloadLog(LogFile targetFile, RunningContainer container, DateTime startUtc, DateTime endUtc) + { + log.Log($"Downloading log (from ElasticSearch) for container '{container.Name}' within time range: " + + $"{startUtc.ToString("o")} - {endUtc.ToString("o")}"); + + var http = CreateElasticSearchHttp(); + var queryTemplate = CreateQueryTemplate(container, startUtc, endUtc); + + targetFile.Write($"Downloading '{container.Name}' to '{targetFile.FullFilename}'."); + var reconstructor = new LogReconstructor(targetFile, http, queryTemplate); + reconstructor.DownloadFullLog(); + + log.Log("Log download finished."); + } + + private string CreateQueryTemplate(RunningContainer container, DateTime startUtc, DateTime endUtc) + { + var podName = container.Pod.PodInfo.Name; + var start = startUtc.ToString("o"); + var end = endUtc.ToString("o"); + + var source = "{ \"sort\": [ { \"@timestamp\": { \"order\": \"asc\" } } ], \"fields\": [ { \"field\": \"@timestamp\", \"format\": \"strict_date_optional_time\" }, { \"field\": \"pod_name\" }, { \"field\": \"message\" } ], \"size\": , \"_source\": false, \"query\": { \"bool\": { \"must\": [], \"filter\": [ { \"range\": { \"@timestamp\": { \"format\": \"strict_date_optional_time\", \"gte\": \"\", \"lte\": \"\" } } }, { \"match_phrase\": { \"pod_name\": \"\" } } ] } } }"; + return source + .Replace("", start) + .Replace("", end) + .Replace("", podName); + } + + private IHttp CreateElasticSearchHttp() + { + var address = new Address("http://localhost", 9200); + var baseUrl = ""; + return tools.CreateHttp(address, baseUrl, client => + { + client.DefaultRequestHeaders.Add("kbn-xsrf", "reporting"); + }); + } + + public class LogReconstructor + { + private readonly List queue = new List(); + private readonly LogFile targetFile; + private readonly IHttp http; + private readonly string queryTemplate; + private const int sizeOfPage = 2000; + private string searchAfter = ""; + private int lastHits = 1; + private int lastLogLine = -1; + + public LogReconstructor(LogFile targetFile, IHttp http, string queryTemplate) + { + this.targetFile = targetFile; + this.http = http; + this.queryTemplate = queryTemplate; + } + + public void DownloadFullLog() + { + while (lastHits > 0) + { + QueryElasticSearch(); + ProcessQueue(); + } + } + + private void QueryElasticSearch() + { + var query = queryTemplate + .Replace("", sizeOfPage.ToString()) + .Replace("", searchAfter); + + var response = http.HttpPostString("_search", query); + + lastHits = response.hits.hits.Length; + if (lastHits > 0) + { + UpdateSearchAfter(response); + foreach (var hit in response.hits.hits) + { + AddHitToQueue(hit); + } + } + } + + private void AddHitToQueue(SearchHitEntry hit) + { + var message = hit.fields.message.Single(); + var sub = message.Substring(0, 12); + if (int.TryParse(sub, out int number)) + { + queue.Add(new LogQueueEntry(message, number)); + } + } + + private void UpdateSearchAfter(SearchResponse response) + { + var uniqueSearchNumbers = response.hits.hits.Select(h => h.sort.Single()).Distinct().ToList(); + uniqueSearchNumbers.Reverse(); + + var searchNumber = GetSearchNumber(uniqueSearchNumbers); + searchAfter = $"\"search_after\": [{searchNumber}],"; + } + + private long GetSearchNumber(List uniqueSearchNumbers) + { + if (uniqueSearchNumbers.Count == 1) return uniqueSearchNumbers.First(); + return uniqueSearchNumbers.Skip(1).First(); + } + + private void ProcessQueue() + { + while (queue.Any()) + { + var wantedNumber = lastLogLine + 1; + DeleteOldEntries(wantedNumber); + + var currentEntry = queue.FirstOrDefault(e => e.Number == wantedNumber); + + if (currentEntry != null) + { + WriteEntryToFile(currentEntry); + queue.Remove(currentEntry); + lastLogLine = currentEntry.Number; + } + else + { + // The line number we want is not in the queue. + // It will be returned by the elastic search query, some time in the future. + // Stop processing the queue for now. + return; + } + } + } + + private void WriteEntryToFile(LogQueueEntry currentEntry) + { + targetFile.WriteRaw(currentEntry.Message); + } + + private void DeleteOldEntries(int wantedNumber) + { + queue.RemoveAll(e => e.Number < wantedNumber); + } + + public class LogQueueEntry + { + public LogQueueEntry(string message, int number) + { + Message = message; + Number = number; + } + + public string Message { get; } + public int Number { get; } + } + + public class SearchResponse + { + public SearchHits hits { get; set; } + } + + public class SearchHits + { + public SearchHitEntry[] hits { get; set; } + } + + public class SearchHitEntry + { + public SearchHitFields fields { get; set; } + public long[] sort { get; set; } + } + + public class SearchHitFields + { + public string[] @timestamp { get; set; } + public string[] message { get; set; } + } + } + } +} diff --git a/Tests/CodexContinuousTests/SingleTestRun.cs b/Tests/CodexContinuousTests/SingleTestRun.cs index 8cb384f..e3043e8 100644 --- a/Tests/CodexContinuousTests/SingleTestRun.cs +++ b/Tests/CodexContinuousTests/SingleTestRun.cs @@ -6,6 +6,7 @@ using System.Reflection; using CodexPlugin; using DistTestCore.Logs; using Core; +using System.ComponentModel; namespace ContinuousTests { @@ -63,24 +64,18 @@ namespace ContinuousTests private void RunTest(Action resultHandler) { var ci = entryPoint.CreateInterface(); - var monitors = nodes.Select(n => new ContainerLogStream( - stream: ci.MonitorLog(n), - name: n.GetName(), - targetFile: fixtureLog.CreateSubfile(), - token: cancelToken, - taskFactory: taskFactory)).ToArray(); - + var testStart = DateTime.UtcNow; + try { - foreach (var m in monitors) m.Run(); - fixtureLog.Log("Monitor start"); - RunTestMoments(); + var duration = DateTime.UtcNow - testStart; + OverviewLog($" > Test passed. ({Time.FormatDuration(duration)})"); + if (!config.KeepPassedTestLogs) { fixtureLog.Delete(); - foreach (var m in monitors) m.DeleteFile(); } resultHandler(true); } @@ -89,6 +84,8 @@ namespace ContinuousTests fixtureLog.Error("Test run failed with exception: " + ex); fixtureLog.MarkAsFailed(); + DownloadContainerLogs(testStart); + failureCount++; resultHandler(false); if (config.StopOnFailure > 0) @@ -101,12 +98,21 @@ namespace ContinuousTests } } } - finally + } + + private void DownloadContainerLogs(DateTime testStart) + { + // The test failed just now. We can't expect the logs to be available in elastic-search immediately: + Thread.Sleep(TimeSpan.FromMinutes(1)); + + var effectiveStart = testStart.Subtract(TimeSpan.FromSeconds(10)); + var effectiveEnd = DateTime.UtcNow.AddSeconds(30); + var elasticSearchLogDownloader = new ElasticSearchLogDownloader(entryPoint.Tools, fixtureLog); + + foreach (var node in nodes) { - Thread.Sleep(1000); - fixtureLog.Log("Monitor stop"); - foreach (var m in monitors) m.Stop(); - if (monitors.Any(m => m.Fault)) throw new Exception("One or more downloaded container log is missing lines!"); + var container = node.Container; + elasticSearchLogDownloader.Download(fixtureLog.CreateSubfile(), container, effectiveStart, effectiveEnd); } } @@ -144,7 +150,6 @@ namespace ContinuousTests { ThrowFailTest(); } - OverviewLog(" > Test passed."); return; } } diff --git a/Tests/CodexTests/BasicTests/ExampleTests.cs b/Tests/CodexTests/BasicTests/ExampleTests.cs index cac9935..4049b9a 100644 --- a/Tests/CodexTests/BasicTests/ExampleTests.cs +++ b/Tests/CodexTests/BasicTests/ExampleTests.cs @@ -2,9 +2,6 @@ using DistTestCore; using GethPlugin; using MetricsPlugin; -using Microsoft.IdentityModel.Abstractions; -using Microsoft.VisualStudio.TestPlatform.CommunicationUtilities; -using Newtonsoft.Json; using NUnit.Framework; using Utils; @@ -13,131 +10,6 @@ namespace Tests.BasicTests [TestFixture] public class ExampleTests : CodexDistTest { - [Test] - public void AAA() - { - var http = this.Ci.CreateHttp(); - - //var query = "{\r\n \"query\": {\r\n \"bool\": {\r\n \"filter\": [\r\n {\r\n \"term\": {\r\n \"container_image.keyword\": \"docker.io/codexstorage/nim-codex:sha-9d735f9-dist-tests\"\r\n }\r\n },\r\n {\r\n \"term\": {\r\n \"pod_namespace.keyword\": \"codex-continuous-tests\"\r\n }\r\n },\r\n {\r\n \"term\": {\r\n \"pod_name.keyword\": \"codex3-workflow3-ff476767d-98zx4\"\r\n }\r\n },\r\n {\r\n \"range\": {\r\n \"@timestamp\": {\r\n \"lte\": \"2023-09-25T13:02:23.559Z\",\r\n \"gt\": \"2023-09-25T20:00:00.000Z\"\r\n }\r\n }\r\n }\r\n ]\r\n }\r\n }\r\n}"; - //var query = "{ \"query\": {\"bool\": { \"filter\": [{ \"term\": {\"container_image.keyword\": \"docker.io/codexstorage/nim-codex:sha-9d735f9-dist-tests\" }},{ \"term\": {\"pod_namespace.keyword\": \"codex-continuous-tests\" }},{ \"term\": {\"pod_name.keyword\": \"codex3-workflow3-ff476767d-98zx4\" }},{ \"range\": {\"@timestamp\": { \"lte\": \"2023-09-29T13:02:23.559Z\", \"gt\": \"2023-09-20T20:00:00.000Z\"} }} ]} }}"; - - - - //var queryTemplate = "{ \"query\": {\"bool\": { \"filter\": [{ \"term\": {\"pod_namespace.keyword\": \"codex-continuous-tests\" }},{ \"term\": {\"pod_name.keyword\": \"bootstrap-2-599dfb4d65-v4v2b\" }},{ \"range\": {\"@timestamp\": { \"lte\": \"2023-10-03T13:02:23.559Z\", \"gt\": \"2023-09-01T20:00:00.000Z\"} }} ]} },\t\"_source\": \"message\",\t\"from\": ,\t\"size\": }"; - var queryTemplate = "{ \"sort\": [ { \"@timestamp\": { \"order\": \"asc\" } } ], \"fields\": [ { \"field\": \"@timestamp\", \"format\": \"strict_date_optional_time\" }, { \"field\": \"pod_name\" }, { \"field\": \"message\" } ], \"size\": , \"_source\": false, \"query\": { \"bool\": { \"must\": [], \"filter\": [ { \"range\": { \"@timestamp\": { \"format\": \"strict_date_optional_time\", \"gte\": \"2023-10-01T00:00:00.000Z\", \"lte\": \"2023-10-03T09:00:00.000Z\" } } }, { \"match_phrase\": { \"pod_name\": \"bootstrap-2-599dfb4d65-v4v2b\" } } ] } } }"; - - var outputFile = "c:\\Users\\Ben\\Desktop\\Cluster\\reconstructed.log"; - - var sizeOfPage = 2000; - var searchAfter = ""; - var lastHits = 1; - var totalHits = 0; - var lastLogLine = -1; - - var queue = new List(); - //var jumpback = 0; - - while (lastHits > 0) - { - //if (queue.Any()) jumpback++; - //else jumpback = 0; - - var query = queryTemplate - .Replace("", sizeOfPage.ToString()) - .Replace("", searchAfter); - - var output = http.HttpPostString("_search", query); - - var response = JsonConvert.DeserializeObject(output)!; - - lastHits = response.hits.hits.Length; - totalHits += response.hits.hits.Length; - if (lastHits > 0) - { - var uniqueSearchNumbers = response.hits.hits.Select(h => h.sort.Single()).Distinct().ToList(); - uniqueSearchNumbers.Reverse(); - var searchNumber = uniqueSearchNumbers.Skip(1).First().ToString(); - searchAfter = $"\"search_after\": [{searchNumber}],"; - - foreach (var hit in response.hits.hits) - { - var message = hit.fields.message.Single(); - var sub = message.Substring(0, 12); - if (int.TryParse(sub, out int number)) - { - queue.Add(new QueryLogEntry(message, number)); - } - } - } - - // unload queue - var runQueue = 1; - while (runQueue > 0) - { - var wantedNumber = lastLogLine + 1; - var oldOnes = queue.Where(e => e.Number < wantedNumber).ToList(); - foreach (var old in oldOnes) queue.Remove(old); - - var entry = queue.FirstOrDefault(e => e.Number == wantedNumber); - if (entry != null) - { - File.AppendAllLines(outputFile, new[] { entry.Message }); - queue.Remove(entry); - lastLogLine = entry.Number; - if (!queue.Any()) runQueue = 0; - } - else - { - runQueue = 0; - } - } - } - - - //var ouput = http.HttpGetString(""); - - var aaa = 0; - } - - public class QueryLogEntry - { - public QueryLogEntry(string message, int number) - { - Message = message; - Number = number; - } - - public string Message { get; } - public int Number { get; } - - public override string ToString() - { - return Number.ToString(); - } - } - - public class SearchResponse - { - public SearchHits hits { get; set; } - } - - public class SearchHits - { - public SearchHitEntry[] hits { get; set; } - } - - public class SearchHitEntry - { - public SearchHitFields fields { get; set; } - public long[] sort { get; set; } - } - - public class SearchHitFields - { - public string[] @timestamp { get; set; } - public string[] message { get; set; } - } - [Test] public void CodexLogExample() {