Merge branch 'feature/log-mon'

# Conflicts:
#	Tests/CodexContinuousTests/ContinuousTestRunner.cs
This commit is contained in:
benbierens 2023-10-04 09:48:21 +02:00
commit 5db8be3252
No known key found for this signature in database
GPG Key ID: FE44815D96D0A1AA
5 changed files with 296 additions and 32 deletions

View File

@ -14,6 +14,7 @@ namespace Core
TResponse HttpPostJson<TRequest, TResponse>(string route, TRequest body); TResponse HttpPostJson<TRequest, TResponse>(string route, TRequest body);
string HttpPostJson<TRequest>(string route, TRequest body); string HttpPostJson<TRequest>(string route, TRequest body);
string HttpPostString(string route, string body); string HttpPostString(string route, string body);
TResponse HttpPostString<TResponse>(string route, string body);
string HttpPostStream(string route, Stream stream); string HttpPostStream(string route, Stream stream);
Stream HttpGetStream(string route); Stream HttpGetStream(string route);
T TryJsonDeserialize<T>(string json); T TryJsonDeserialize<T>(string json);
@ -79,7 +80,7 @@ namespace Core
public string HttpPostJson<TRequest>(string route, TRequest body) public string HttpPostJson<TRequest>(string route, TRequest body)
{ {
var response = PostJson<TRequest>(route, body); var response = PostJson(route, body);
return Time.Wait(response.Content.ReadAsStringAsync()); return Time.Wait(response.Content.ReadAsStringAsync());
} }
@ -99,6 +100,15 @@ namespace Core
}, $"HTTP-POST-STRING: {route}"); }, $"HTTP-POST-STRING: {route}");
} }
public TResponse HttpPostString<TResponse>(string route, string body)
{
var response = HttpPostString(route, body);
if (response == null) throw new Exception("Received no response.");
var result = JsonConvert.DeserializeObject<TResponse>(response);
if (result == null) throw new Exception("Failed to deserialize response");
return result;
}
public string HttpPostStream(string route, Stream stream) public string HttpPostStream(string route, Stream stream)
{ {
return Retry(() => return Retry(() =>

View File

@ -65,7 +65,7 @@ namespace ContinuousTests
overviewLog.Log("Finished launching test-loops."); overviewLog.Log("Finished launching test-loops.");
WaitUntilFinished(overviewLog, statusLog, startTime, testLoops); WaitUntilFinished(overviewLog, statusLog, startTime, testLoops);
overviewLog.Log("Cancelling all test-loops..."); overviewLog.Log("Stopping all test-loops...");
taskFactory.WaitAll(); taskFactory.WaitAll();
overviewLog.Log("All tasks cancelled."); overviewLog.Log("All tasks cancelled.");
@ -96,15 +96,20 @@ namespace ContinuousTests
if (config.TargetDurationSeconds > 0) if (config.TargetDurationSeconds > 0)
{ {
var targetDuration = TimeSpan.FromSeconds(config.TargetDurationSeconds); var targetDuration = TimeSpan.FromSeconds(config.TargetDurationSeconds);
cancelToken.WaitHandle.WaitOne(targetDuration); var wasCancelled = cancelToken.WaitHandle.WaitOne(targetDuration);
overviewLog.Log($"Congratulations! The targer duration has been reached! ({Time.FormatDuration(targetDuration)})"); if (!wasCancelled)
statusLog.ConcludeTest("Passed", testDuration, testData); {
Cancellation.Cts.Cancel();
overviewLog.Log($"Congratulations! The targer duration has been reached! ({Time.FormatDuration(targetDuration)})");
statusLog.ConcludeTest("Passed", testDuration, testData);
return;
}
} }
else else
{ {
cancelToken.WaitHandle.WaitOne(); cancelToken.WaitHandle.WaitOne();
statusLog.ConcludeTest("Failed", testDuration, testData);
} }
statusLog.ConcludeTest("Failed", testDuration, testData);
} }
private Dictionary<string, string> FormatTestRuns(TestLoop[] testLoops) private Dictionary<string, string> FormatTestRuns(TestLoop[] testLoops)

View File

@ -0,0 +1,234 @@
using Core;
using KubernetesWorkflow;
using Logging;
using Utils;
namespace ContinuousTests
{
public class ElasticSearchLogDownloader
{
private readonly IPluginTools tools;
private readonly ILog log;
public ElasticSearchLogDownloader(IPluginTools tools, ILog log)
{
this.tools = tools;
this.log = log;
}
public void Download(LogFile targetFile, RunningContainer container, DateTime startUtc, DateTime endUtc)
{
try
{
DownloadLog(targetFile, container, startUtc, endUtc);
}
catch (Exception ex)
{
log.Error("Failed to download log: " + ex);
}
}
private void DownloadLog(LogFile targetFile, RunningContainer container, DateTime startUtc, DateTime endUtc)
{
log.Log($"Downloading log (from ElasticSearch) for container '{container.Name}' within time range: " +
$"{startUtc.ToString("o")} - {endUtc.ToString("o")}");
var http = CreateElasticSearchHttp();
var queryTemplate = CreateQueryTemplate(container, startUtc, endUtc);
targetFile.Write($"Downloading '{container.Name}' to '{targetFile.FullFilename}'.");
var reconstructor = new LogReconstructor(targetFile, http, queryTemplate);
reconstructor.DownloadFullLog();
log.Log("Log download finished.");
}
private string CreateQueryTemplate(RunningContainer container, DateTime startUtc, DateTime endUtc)
{
var podName = container.Pod.PodInfo.Name;
var start = startUtc.ToString("o");
var end = endUtc.ToString("o");
var source = "{ \"sort\": [ { \"@timestamp\": { \"order\": \"asc\" } } ], \"fields\": [ { \"field\": \"@timestamp\", \"format\": \"strict_date_optional_time\" }, { \"field\": \"pod_name\" }, { \"field\": \"message\" } ], \"size\": <SIZE>, <SEARCHAFTER> \"_source\": false, \"query\": { \"bool\": { \"must\": [], \"filter\": [ { \"range\": { \"@timestamp\": { \"format\": \"strict_date_optional_time\", \"gte\": \"<STARTTIME>\", \"lte\": \"<ENDTIME>\" } } }, { \"match_phrase\": { \"pod_name\": \"<PODNAME>\" } } ] } } }";
return source
.Replace("<STARTTIME>", start)
.Replace("<ENDTIME>", end)
.Replace("<PODNAME>", podName);
}
private IHttp CreateElasticSearchHttp()
{
var serviceName = "elasticsearch";
var k8sNamespace = "monitoring";
var address = new Address($"http://{serviceName}.{k8sNamespace}.svc.cluster.local", 9200);
var baseUrl = "";
return tools.CreateHttp(address, baseUrl, client =>
{
client.DefaultRequestHeaders.Add("kbn-xsrf", "reporting");
});
}
public class LogReconstructor
{
private readonly List<LogQueueEntry> queue = new List<LogQueueEntry>();
private readonly LogFile targetFile;
private readonly IHttp http;
private readonly string queryTemplate;
private const int sizeOfPage = 2000;
private string searchAfter = "";
private int lastHits = 1;
private ulong? lastLogLine;
public LogReconstructor(LogFile targetFile, IHttp http, string queryTemplate)
{
this.targetFile = targetFile;
this.http = http;
this.queryTemplate = queryTemplate;
}
public void DownloadFullLog()
{
while (lastHits > 0)
{
QueryElasticSearch();
ProcessQueue();
}
}
private void QueryElasticSearch()
{
var query = queryTemplate
.Replace("<SIZE>", sizeOfPage.ToString())
.Replace("<SEARCHAFTER>", searchAfter);
var response = http.HttpPostString<SearchResponse>("_search", query);
lastHits = response.hits.hits.Length;
if (lastHits > 0)
{
UpdateSearchAfter(response);
foreach (var hit in response.hits.hits)
{
AddHitToQueue(hit);
}
}
}
private void AddHitToQueue(SearchHitEntry hit)
{
var message = hit.fields.message.Single();
var number = ParseCountNumber(message);
if (number != null)
{
queue.Add(new LogQueueEntry(message, number.Value));
}
}
private ulong? ParseCountNumber(string message)
{
if (string.IsNullOrEmpty(message)) return null;
var tokens = message.Split(' ', StringSplitOptions.RemoveEmptyEntries);
if (!tokens.Any()) return null;
var countToken = tokens.SingleOrDefault(t => t.StartsWith("count="));
if (countToken == null) return null;
var number = countToken.Substring(6);
if (ulong.TryParse(number, out ulong value))
{
return value;
}
return null;
}
private void UpdateSearchAfter(SearchResponse response)
{
var uniqueSearchNumbers = response.hits.hits.Select(h => h.sort.Single()).Distinct().ToList();
uniqueSearchNumbers.Reverse();
var searchNumber = GetSearchNumber(uniqueSearchNumbers);
searchAfter = $"\"search_after\": [{searchNumber}],";
}
private long GetSearchNumber(List<long> uniqueSearchNumbers)
{
if (uniqueSearchNumbers.Count == 1) return uniqueSearchNumbers.First();
return uniqueSearchNumbers.Skip(1).First();
}
private void ProcessQueue()
{
if (lastLogLine == null)
{
lastLogLine = queue.Min(q => q.Number) - 1;
}
while (queue.Any())
{
ulong wantedNumber = lastLogLine.Value + 1;
DeleteOldEntries(wantedNumber);
var currentEntry = queue.FirstOrDefault(e => e.Number == wantedNumber);
if (currentEntry != null)
{
WriteEntryToFile(currentEntry);
queue.Remove(currentEntry);
lastLogLine = currentEntry.Number;
}
else
{
// The line number we want is not in the queue.
// It will be returned by the elastic search query, some time in the future.
// Stop processing the queue for now.
return;
}
}
}
private void WriteEntryToFile(LogQueueEntry currentEntry)
{
targetFile.WriteRaw(currentEntry.Message);
}
private void DeleteOldEntries(ulong wantedNumber)
{
queue.RemoveAll(e => e.Number < wantedNumber);
}
public class LogQueueEntry
{
public LogQueueEntry(string message, ulong number)
{
Message = message;
Number = number;
}
public string Message { get; }
public ulong Number { get; }
}
public class SearchResponse
{
public SearchHits hits { get; set; } = new SearchHits();
}
public class SearchHits
{
public SearchHitEntry[] hits { get; set; } = Array.Empty<SearchHitEntry>();
}
public class SearchHitEntry
{
public SearchHitFields fields { get; set; } = new SearchHitFields();
public long[] sort { get; set; } = Array.Empty<long>();
}
public class SearchHitFields
{
public string[] @timestamp { get; set; } = Array.Empty<string>();
public string[] message { get; set; } = Array.Empty<string>();
}
}
}
}

View File

@ -62,11 +62,19 @@ namespace ContinuousTests
private void RunTest(Action<bool> resultHandler) private void RunTest(Action<bool> resultHandler)
{ {
var testStart = DateTime.UtcNow;
try try
{ {
RunTestMoments(); RunTestMoments();
if (!config.KeepPassedTestLogs) fixtureLog.Delete(); var duration = DateTime.UtcNow - testStart;
OverviewLog($" > Test passed. ({Time.FormatDuration(duration)})");
if (!config.KeepPassedTestLogs)
{
fixtureLog.Delete();
}
resultHandler(true); resultHandler(true);
} }
catch (Exception ex) catch (Exception ex)
@ -74,6 +82,8 @@ namespace ContinuousTests
fixtureLog.Error("Test run failed with exception: " + ex); fixtureLog.Error("Test run failed with exception: " + ex);
fixtureLog.MarkAsFailed(); fixtureLog.MarkAsFailed();
DownloadContainerLogs(testStart);
failureCount++; failureCount++;
resultHandler(false); resultHandler(false);
if (config.StopOnFailure > 0) if (config.StopOnFailure > 0)
@ -81,15 +91,28 @@ namespace ContinuousTests
OverviewLog($"Failures: {failureCount} / {config.StopOnFailure}"); OverviewLog($"Failures: {failureCount} / {config.StopOnFailure}");
if (failureCount >= config.StopOnFailure) if (failureCount >= config.StopOnFailure)
{ {
OverviewLog($"Configured to stop after {config.StopOnFailure} failures. Downloading cluster logs..."); OverviewLog($"Configured to stop after {config.StopOnFailure} failures.");
DownloadClusterLogs();
OverviewLog("Log download finished. Cancelling test runner...");
Cancellation.Cts.Cancel(); Cancellation.Cts.Cancel();
} }
} }
} }
} }
private void DownloadContainerLogs(DateTime testStart)
{
// The test failed just now. We can't expect the logs to be available in elastic-search immediately:
Thread.Sleep(TimeSpan.FromMinutes(1));
var effectiveStart = testStart.Subtract(TimeSpan.FromSeconds(30));
var effectiveEnd = DateTime.UtcNow;
var elasticSearchLogDownloader = new ElasticSearchLogDownloader(entryPoint.Tools, fixtureLog);
foreach (var node in nodes)
{
elasticSearchLogDownloader.Download(fixtureLog.CreateSubfile(), node.Container, effectiveStart, effectiveEnd);
}
}
private void ApplyLogReplacements(FixtureLog fixtureLog, StartupChecker startupChecker) private void ApplyLogReplacements(FixtureLog fixtureLog, StartupChecker startupChecker)
{ {
foreach (var replacement in startupChecker.LogReplacements) fixtureLog.AddStringReplace(replacement.From, replacement.To); foreach (var replacement in startupChecker.LogReplacements) fixtureLog.AddStringReplace(replacement.From, replacement.To);
@ -100,10 +123,8 @@ namespace ContinuousTests
var earliestMoment = handle.GetEarliestMoment(); var earliestMoment = handle.GetEarliestMoment();
var t = earliestMoment; var t = earliestMoment;
while (true) while (!cancelToken.IsCancellationRequested)
{ {
cancelToken.ThrowIfCancellationRequested();
RunMoment(t); RunMoment(t);
if (handle.Test.TestFailMode == TestFailMode.StopAfterFirstFailure && exceptions.Any()) if (handle.Test.TestFailMode == TestFailMode.StopAfterFirstFailure && exceptions.Any())
@ -126,10 +147,10 @@ namespace ContinuousTests
{ {
ThrowFailTest(); ThrowFailTest();
} }
OverviewLog(" > Test passed.");
return; return;
} }
} }
fixtureLog.Log("Test run has been cancelled.");
} }
private void ThrowFailTest() private void ThrowFailTest()
@ -141,19 +162,6 @@ namespace ContinuousTests
throw new Exception(exceptionsMessage); throw new Exception(exceptionsMessage);
} }
private void DownloadClusterLogs()
{
var entryPointFactory = new EntryPointFactory();
var log = new NullLog();
log.FullFilename = Path.Combine(config.LogPath, "NODE");
var entryPoint = entryPointFactory.CreateEntryPoint(config.KubeConfigFile, config.DataPath, config.CodexDeployment.Metadata.KubeNamespace, log);
foreach (var container in config.CodexDeployment.CodexContainers)
{
entryPoint.CreateInterface().DownloadLog(container);
}
}
private string GetCombinedExceptionsMessage(Exception[] exceptions) private string GetCombinedExceptionsMessage(Exception[] exceptions)
{ {
return string.Join(Environment.NewLine, exceptions.Select(ex => ex.ToString())); return string.Join(Environment.NewLine, exceptions.Select(ex => ex.ToString()));

View File

@ -13,6 +13,7 @@ namespace ContinuousTests
private readonly StartupChecker startupChecker; private readonly StartupChecker startupChecker;
private readonly CancellationToken cancelToken; private readonly CancellationToken cancelToken;
private readonly EventWaitHandle runFinishedHandle = new EventWaitHandle(true, EventResetMode.ManualReset); private readonly EventWaitHandle runFinishedHandle = new EventWaitHandle(true, EventResetMode.ManualReset);
private static object testLock = new object();
public TestLoop(EntryPointFactory entryPointFactory, TaskFactory taskFactory, Configuration config, ILog overviewLog, Type testType, TimeSpan runsEvery, StartupChecker startupChecker, CancellationToken cancelToken) public TestLoop(EntryPointFactory entryPointFactory, TaskFactory taskFactory, Configuration config, ILog overviewLog, Type testType, TimeSpan runsEvery, StartupChecker startupChecker, CancellationToken cancelToken)
{ {
@ -39,15 +40,21 @@ namespace ContinuousTests
{ {
NumberOfPasses = 0; NumberOfPasses = 0;
NumberOfFailures = 0; NumberOfFailures = 0;
while (true) while (!cancelToken.IsCancellationRequested)
{ {
WaitHandle.WaitAny(new[] { runFinishedHandle, cancelToken.WaitHandle }); lock (testLock)
// In the original design, multiple tests are allowed to interleave their test-moments, increasing test through-put.
// Since we're still stabilizing some of the basics, this lock limits us to 1 test run at a time.
{
WaitHandle.WaitAny(new[] { runFinishedHandle, cancelToken.WaitHandle });
cancelToken.ThrowIfCancellationRequested(); cancelToken.ThrowIfCancellationRequested();
StartTest(); StartTest();
cancelToken.WaitHandle.WaitOne(runsEvery); cancelToken.WaitHandle.WaitOne(runsEvery);
}
Thread.Sleep(100);
} }
} }
catch (OperationCanceledException) catch (OperationCanceledException)