From 7610b4e8bfcdfa249851e94588f99c5d8b198f19 Mon Sep 17 00:00:00 2001 From: Eric <5089238+emizzle@users.noreply.github.com> Date: Wed, 19 Mar 2025 12:59:06 +1100 Subject: [PATCH] Continuously read from stdout/stderr to prevent buffers from filling --- tests/integration/nodeprocess.nim | 4 +- tests/integration/testmanager.nim | 95 ++++++++++++++++++------------- 2 files changed, 58 insertions(+), 41 deletions(-) diff --git a/tests/integration/nodeprocess.nim b/tests/integration/nodeprocess.nim index 2852ea67..0dabff77 100644 --- a/tests/integration/nodeprocess.nim +++ b/tests/integration/nodeprocess.nim @@ -92,8 +92,8 @@ proc captureOutput( node.onOutputLineCaptured(line) - await sleepAsync(1.millis) - await sleepAsync(1.millis) + await sleepAsync(1.nanos) + await sleepAsync(1.nanos) except CancelledError: discard # do not propagate as captureOutput was asyncSpawned except AsyncStreamError as e: diff --git a/tests/integration/testmanager.nim b/tests/integration/testmanager.nim index c587ec31..f2711b62 100644 --- a/tests/integration/testmanager.nim +++ b/tests/integration/testmanager.nim @@ -75,8 +75,8 @@ type logsDir: string TestOutput = ref object - stdOut*: string - stdErr*: string + stdOut*: seq[string] + stdErr*: seq[string] exitCode*: ?int TestManagerError* = object of CatchableError @@ -135,7 +135,7 @@ func isErrorLike(output: ?!TestOutput): bool = # "LevelDB already build" line and blank line that is output to stdout. This # typically means that the exitCode == 1 (test failed) and if stdout is short, # we're dealing with an error - o =? output and o.stdOut.countLines() < 3 + o =? output and o.stdOut.len < 3 proc new*( _: type TestManager, @@ -276,29 +276,29 @@ proc printResult(test: IntegrationTest, printStdOut, printStdErr: bool) = if output =? test.output: if printStdErr: test.printOutputMarker(MarkerPosition.Start, "test file errors (stderr)") - echo output.stdErr + echo output.stdErr.join("\n") test.printOutputMarker(MarkerPosition.Finish, "test file errors (stderr)") of IntegrationTestStatus.Failed: if output =? test.output: if printStdErr: test.printOutputMarker(MarkerPosition.Start, "test file errors (stderr)") - echo output.stdErr + echo output.stdErr.join("\n") test.printOutputMarker(MarkerPosition.Finish, "test file errors (stderr)") if printStdOut: test.printOutputMarker(MarkerPosition.Start, "codex node output (stdout)") - echo output.stdOut.colorise + echo output.stdOut.join("\n").colorise test.printOutputMarker(MarkerPosition.Finish, "codex node output (stdout)") test.printResult(fgRed) of IntegrationTestStatus.Timeout: if printStdOut and output =? test.output: test.printOutputMarker(MarkerPosition.Start, "codex node output (stdout)") - echo output.stdOut.colorise + echo output.stdOut.join("\n").colorise test.printOutputMarker(MarkerPosition.Finish, "codex node output (stdout)") test.printResult(fgYellow) of IntegrationTestStatus.Ok: if printStdOut and output =? test.output: test.printOutputMarker(MarkerPosition.Start, "codex node output (stdout)") - echo output.stdOut.colorise + echo output.stdOut.join("\n").colorise test.printOutputMarker(MarkerPosition.Finish, "codex node output (stdout)") test.printResult(fgGreen) @@ -404,11 +404,7 @@ proc teardownHardhat(test: IntegrationTest, hardhat: Hardhat) {.async: (raises: test.manager.hardhats.keepItIf(it != hardhat) -proc teardownTest( - test: IntegrationTest, - stdOutStream, stdErrStream: - Future[seq[byte]].Raising([CancelledError, AsyncStreamError]), -) {.async: (raises: []).} = +proc teardownTest(test: IntegrationTest) {.async: (raises: []).} = logScope: test = test.config.name @@ -428,20 +424,15 @@ proc teardownTest( warn "Test process failed to terminate, check for zombies", error = e.msg try: - trace "Reading stdout and stderr streams" - output.stdOut = string.fromBytes(await noCancel stdOutStream.wait(1.seconds)) - output.stdErr = string.fromBytes(await noCancel stdErrStream.wait(1.seconds)) - test.output = success output - trace "Writing stdout and/or stderr streams to file", stdoutFile = test.logFile("stdout.log"), - stdoutBytes = output.stdOut.len, + stdoutLines = output.stdOut.len, stderrFile = test.logFile("stderr.log"), - stderrBytes = output.stdErr.len - test.logFile("stdout.log").appendFile(output.stdOut.stripAnsi) + stderrLines = output.stdErr.len + test.logFile("stdout.log").appendFile(output.stdOut.join("\n").stripAnsi) if test.status == IntegrationTestStatus.Error or (test.status == IntegrationTestStatus.Failed and test.output.isErrorLike): - test.logFile("stderr.log").appendFile(output.stdErr.stripAnsi) + test.logFile("stderr.log").appendFile(output.stdErr.join("\n").stripAnsi) except AsyncTimeoutError: error "Timeout waiting for stdout or stderr stream contents, nothing will be written to file" except IOError as e: @@ -450,23 +441,15 @@ proc teardownTest( error "Failed to read test process output stream", error = e.msg test.output = TestOutput.failure(e) test.status = IntegrationTestStatus.Error - finally: - await stdOutStream.cancelAndWait() - await stdErrStream.cancelAndWait() await test.process.closeWait() trace "Test process output streams closed" -proc teardown( - test: IntegrationTest, - hardhat: ?Hardhat, - stdOutStream, stdErrStream: - Future[seq[byte]].Raising([CancelledError, AsyncStreamError]), -) {.async: (raises: []).} = +proc teardown(test: IntegrationTest, hardhat: ?Hardhat) {.async: (raises: []).} = if test.config.startHardhat and hardhat =? hardhat and not hardhat.process.isNil: await test.teardownHardhat(hardhat) - await test.teardownTest(stdOutStream, stdErrStream) + await test.teardownTest() proc untilTimeout( fut: InternalRaisesFuture, timeout: Duration @@ -495,6 +478,35 @@ proc untilTimeout( else: # timeout raise newException(AsyncTimeoutError, "Timed out") +proc captureOutput( + process: AsyncProcessRef, stream: AsyncStreamReader +): Future[seq[string]] {.async: (raises: [CancelledError]).} = + var output: seq[string] = @[] + try: + while process.running.option == some true: + while (let line = await stream.readLine(0, "\n"); line != ""): + output.add line + await sleepAsync(1.nanos) + await sleepAsync(1.nanos) + return output + except CancelledError as e: + raise e + except AsyncStreamError as e: + error "Error reading output stream", error = e.msg + +proc captureProcessOutput( + test: IntegrationTest +): Future[(seq[string], seq[string])] {.async: (raises: [CancelledError]).} = + logScope: + name = test.config.name + + trace "Reading stdout and stderr streams from test process" + + let futStdOut = test.process.captureOutput(test.process.stdoutStream) + let futStdErr = test.process.captureOutput(test.process.stderrStream) + await allFutures(futStdOut, futStdErr) + return (await futStdOut, await futStdErr) + proc start(test: IntegrationTest) {.async: (raises: []).} = logScope: name = test.config.name @@ -531,6 +543,7 @@ proc start(test: IntegrationTest) {.async: (raises: []).} = trace "Starting parallel integration test", command = test.command, timeout = test.manager.config.testTimeout test.printStart() + try: test.process = await startProcess( command = test.command, @@ -545,17 +558,17 @@ proc start(test: IntegrationTest) {.async: (raises: []).} = test.status = IntegrationTestStatus.Error return - var output = TestOutput.new() - test.output = success(output) - let outputReader = test.process.stdoutStream.read() - let errorReader = test.process.stderrStream.read() - + var futCaptureOutput: Future[(seq[string], seq[string])].Raising([]) defer: # called at the end of successful runs but also when `start` is cancelled # (from `untilTimeout`) due to a timeout. This defer runs first before # `untilTimeout` exceptions are handled in `run` - await test.teardown(hardhat, outputReader, errorReader) - # doesn't raise CancelledError, so noCancel not needed + await test.teardown(hardhat) # doesn't raise CancelledError, so noCancel not needed + await futCaptureOutput.cancelAndWait() + + var output = TestOutput.new() + test.output = success(output) + futCaptureOutput = noCancel test.captureProcessOutput() output.exitCode = try: @@ -573,6 +586,10 @@ proc start(test: IntegrationTest) {.async: (raises: []).} = error "Test failed to complete", error = e.msg return + let (stdOut, stdErr) = await futCaptureOutput + output.stdOut = stdOut + output.stdErr = stdErr + test.status = if output.exitCode == some QuitSuccess: IntegrationTestStatus.Ok