Continuously read from stdout/stderr to prevent buffers from filling

This commit is contained in:
Eric 2025-03-19 12:59:06 +11:00
parent a9b094ae86
commit 114a46afa2
No known key found for this signature in database
2 changed files with 58 additions and 41 deletions

View File

@ -92,8 +92,8 @@ proc captureOutput(
node.onOutputLineCaptured(line)
await sleepAsync(1.millis)
await sleepAsync(1.millis)
await sleepAsync(1.nanos)
await sleepAsync(1.nanos)
except CancelledError:
discard # do not propagate as captureOutput was asyncSpawned
except AsyncStreamError as e:

View File

@ -75,8 +75,8 @@ type
logsDir: string
TestOutput = ref object
stdOut*: string
stdErr*: string
stdOut*: seq[string]
stdErr*: seq[string]
exitCode*: ?int
TestManagerError* = object of CatchableError
@ -135,7 +135,7 @@ func isErrorLike(output: ?!TestOutput): bool =
# "LevelDB already build" line and blank line that is output to stdout. This
# typically means that the exitCode == 1 (test failed) and if stdout is short,
# we're dealing with an error
o =? output and o.stdOut.countLines() < 3
o =? output and o.stdOut.len < 3
proc new*(
_: type TestManager,
@ -276,29 +276,29 @@ proc printResult(test: IntegrationTest, printStdOut, printStdErr: bool) =
if output =? test.output:
if printStdErr:
test.printOutputMarker(MarkerPosition.Start, "test file errors (stderr)")
echo output.stdErr
echo output.stdErr.join("\n")
test.printOutputMarker(MarkerPosition.Finish, "test file errors (stderr)")
of IntegrationTestStatus.Failed:
if output =? test.output:
if printStdErr:
test.printOutputMarker(MarkerPosition.Start, "test file errors (stderr)")
echo output.stdErr
echo output.stdErr.join("\n")
test.printOutputMarker(MarkerPosition.Finish, "test file errors (stderr)")
if printStdOut:
test.printOutputMarker(MarkerPosition.Start, "codex node output (stdout)")
echo output.stdOut.colorise
echo output.stdOut.join("\n").colorise
test.printOutputMarker(MarkerPosition.Finish, "codex node output (stdout)")
test.printResult(fgRed)
of IntegrationTestStatus.Timeout:
if printStdOut and output =? test.output:
test.printOutputMarker(MarkerPosition.Start, "codex node output (stdout)")
echo output.stdOut.colorise
echo output.stdOut.join("\n").colorise
test.printOutputMarker(MarkerPosition.Finish, "codex node output (stdout)")
test.printResult(fgYellow)
of IntegrationTestStatus.Ok:
if printStdOut and output =? test.output:
test.printOutputMarker(MarkerPosition.Start, "codex node output (stdout)")
echo output.stdOut.colorise
echo output.stdOut.join("\n").colorise
test.printOutputMarker(MarkerPosition.Finish, "codex node output (stdout)")
test.printResult(fgGreen)
@ -404,11 +404,7 @@ proc teardownHardhat(test: IntegrationTest, hardhat: Hardhat) {.async: (raises:
test.manager.hardhats.keepItIf(it != hardhat)
proc teardownTest(
test: IntegrationTest,
stdOutStream, stdErrStream:
Future[seq[byte]].Raising([CancelledError, AsyncStreamError]),
) {.async: (raises: []).} =
proc teardownTest(test: IntegrationTest) {.async: (raises: []).} =
logScope:
test = test.config.name
@ -428,20 +424,15 @@ proc teardownTest(
warn "Test process failed to terminate, check for zombies", error = e.msg
try:
trace "Reading stdout and stderr streams"
output.stdOut = string.fromBytes(await noCancel stdOutStream.wait(1.seconds))
output.stdErr = string.fromBytes(await noCancel stdErrStream.wait(1.seconds))
test.output = success output
trace "Writing stdout and/or stderr streams to file",
stdoutFile = test.logFile("stdout.log"),
stdoutBytes = output.stdOut.len,
stdoutLines = output.stdOut.len,
stderrFile = test.logFile("stderr.log"),
stderrBytes = output.stdErr.len
test.logFile("stdout.log").appendFile(output.stdOut.stripAnsi)
stderrLines = output.stdErr.len
test.logFile("stdout.log").appendFile(output.stdOut.join("\n").stripAnsi)
if test.status == IntegrationTestStatus.Error or
(test.status == IntegrationTestStatus.Failed and test.output.isErrorLike):
test.logFile("stderr.log").appendFile(output.stdErr.stripAnsi)
test.logFile("stderr.log").appendFile(output.stdErr.join("\n").stripAnsi)
except AsyncTimeoutError:
error "Timeout waiting for stdout or stderr stream contents, nothing will be written to file"
except IOError as e:
@ -450,23 +441,15 @@ proc teardownTest(
error "Failed to read test process output stream", error = e.msg
test.output = TestOutput.failure(e)
test.status = IntegrationTestStatus.Error
finally:
await stdOutStream.cancelAndWait()
await stdErrStream.cancelAndWait()
await test.process.closeWait()
trace "Test process output streams closed"
proc teardown(
test: IntegrationTest,
hardhat: ?Hardhat,
stdOutStream, stdErrStream:
Future[seq[byte]].Raising([CancelledError, AsyncStreamError]),
) {.async: (raises: []).} =
proc teardown(test: IntegrationTest, hardhat: ?Hardhat) {.async: (raises: []).} =
if test.config.startHardhat and hardhat =? hardhat and not hardhat.process.isNil:
await test.teardownHardhat(hardhat)
await test.teardownTest(stdOutStream, stdErrStream)
await test.teardownTest()
proc untilTimeout(
fut: InternalRaisesFuture, timeout: Duration
@ -495,6 +478,35 @@ proc untilTimeout(
else: # timeout
raise newException(AsyncTimeoutError, "Timed out")
proc captureOutput(
process: AsyncProcessRef, stream: AsyncStreamReader
): Future[seq[string]] {.async: (raises: [CancelledError]).} =
var output: seq[string] = @[]
try:
while process.running.option == some true:
while (let line = await stream.readLine(0, "\n"); line != ""):
output.add line
await sleepAsync(1.nanos)
await sleepAsync(1.nanos)
return output
except CancelledError as e:
raise e
except AsyncStreamError as e:
error "Error reading output stream", error = e.msg
proc captureProcessOutput(
test: IntegrationTest
): Future[(seq[string], seq[string])] {.async: (raises: [CancelledError]).} =
logScope:
name = test.config.name
trace "Reading stdout and stderr streams from test process"
let futStdOut = test.process.captureOutput(test.process.stdoutStream)
let futStdErr = test.process.captureOutput(test.process.stderrStream)
await allFutures(futStdOut, futStdErr)
return (await futStdOut, await futStdErr)
proc start(test: IntegrationTest) {.async: (raises: []).} =
logScope:
name = test.config.name
@ -531,6 +543,7 @@ proc start(test: IntegrationTest) {.async: (raises: []).} =
trace "Starting parallel integration test",
command = test.command, timeout = test.manager.config.testTimeout
test.printStart()
try:
test.process = await startProcess(
command = test.command,
@ -545,17 +558,17 @@ proc start(test: IntegrationTest) {.async: (raises: []).} =
test.status = IntegrationTestStatus.Error
return
var output = TestOutput.new()
test.output = success(output)
let outputReader = test.process.stdoutStream.read()
let errorReader = test.process.stderrStream.read()
var futCaptureOutput: Future[(seq[string], seq[string])].Raising([])
defer:
# called at the end of successful runs but also when `start` is cancelled
# (from `untilTimeout`) due to a timeout. This defer runs first before
# `untilTimeout` exceptions are handled in `run`
await test.teardown(hardhat, outputReader, errorReader)
# doesn't raise CancelledError, so noCancel not needed
await test.teardown(hardhat) # doesn't raise CancelledError, so noCancel not needed
await futCaptureOutput.cancelAndWait()
var output = TestOutput.new()
test.output = success(output)
futCaptureOutput = noCancel test.captureProcessOutput()
output.exitCode =
try:
@ -573,6 +586,10 @@ proc start(test: IntegrationTest) {.async: (raises: []).} =
error "Test failed to complete", error = e.msg
return
let (stdOut, stdErr) = await futCaptureOutput
output.stdOut = stdOut
output.stdErr = stdErr
test.status =
if output.exitCode == some QuitSuccess:
IntegrationTestStatus.Ok