mirror of
https://github.com/logos-storage/logos-storage-nim.git
synced 2026-01-08 08:23:07 +00:00
Refactor process launch
Refactor to keep a process handle so that the processes can be stopped on timeout.
This commit is contained in:
parent
0309cfd063
commit
ca62539e9c
@ -129,14 +129,14 @@ method stop*(
|
||||
let exitCode = await noCancel node.process.terminateAndWaitForExit(2.seconds)
|
||||
if exitCode > 0 and exitCode != 143 and # 143 = SIGTERM (initiated above)
|
||||
exitCode != expectedErrCode:
|
||||
error "process exited with a non-zero exit code", exitCode
|
||||
warn "process exited with a non-zero exit code", exitCode
|
||||
trace "node stopped", exitCode
|
||||
except CatchableError:
|
||||
try:
|
||||
let forcedExitCode = await noCancel node.process.killAndWaitForExit(3.seconds)
|
||||
trace "node process forcibly killed with exit code: ", exitCode = forcedExitCode
|
||||
except CatchableError as e:
|
||||
error "failed to kill node process in time, it will be killed when the parent process exits",
|
||||
warn "failed to kill node process in time, it will be killed when the parent process exits",
|
||||
error = e.msg
|
||||
writeStackTrace()
|
||||
finally:
|
||||
|
||||
@ -66,17 +66,23 @@ type
|
||||
IntegrationTest = ref object
|
||||
manager: TestManager
|
||||
config: IntegrationTestConfig
|
||||
process: Future[CommandExResponse].Raising(
|
||||
[AsyncProcessError, AsyncProcessTimeoutError, CancelledError]
|
||||
)
|
||||
# process: Future[CommandExResponse].Raising(
|
||||
# [AsyncProcessError, AsyncProcessTimeoutError, CancelledError]
|
||||
# )
|
||||
process: AsyncProcessRef
|
||||
timeStart: ?Moment
|
||||
timeEnd: ?Moment
|
||||
output: ?!CommandExResponse
|
||||
output: ?!TestOutput
|
||||
testId: string # when used in datadir path, prevents data dir clashes
|
||||
status: IntegrationTestStatus
|
||||
command: string
|
||||
logsDir: string
|
||||
|
||||
TestOutput = ref object
|
||||
stdOut*: string
|
||||
stdErr*: string
|
||||
exitCode*: ?int
|
||||
|
||||
TestManagerError* = object of CatchableError
|
||||
|
||||
Border {.pure.} = enum
|
||||
@ -261,23 +267,23 @@ proc printResult(
|
||||
if output =? test.output:
|
||||
if printStdErr: #manager.debugTestHarness
|
||||
test.printOutputMarker(MarkerPosition.Start, "test file errors (stderr)")
|
||||
echo output.stdError
|
||||
echo output.stdErr
|
||||
test.printOutputMarker(MarkerPosition.Finish, "test file errors (stderr)")
|
||||
if printStdOut:
|
||||
test.printOutputMarker(MarkerPosition.Start, "codex node output (stdout)")
|
||||
echo output.stdOutput
|
||||
test.printOutputMarker(MarkerPosition.Finish, "codex node output (stdout)")
|
||||
# if printStdOut:
|
||||
test.printOutputMarker(MarkerPosition.Start, "codex node output (stdout)")
|
||||
echo output.stdOut
|
||||
test.printOutputMarker(MarkerPosition.Finish, "codex node output (stdout)")
|
||||
test.printResult(fgRed)
|
||||
of IntegrationTestStatus.Timeout:
|
||||
if printStdOut and output =? test.output:
|
||||
test.printOutputMarker(MarkerPosition.Start, "codex node output (stdout)")
|
||||
echo output.stdOutput
|
||||
echo output.stdOut
|
||||
test.printOutputMarker(MarkerPosition.Finish, "codex node output (stdout)")
|
||||
test.printResult(fgYellow)
|
||||
of IntegrationTestStatus.Ok:
|
||||
if printStdOut and output =? test.output:
|
||||
test.printOutputMarker(MarkerPosition.Start, "codex node output (stdout)")
|
||||
echo output.stdOutput
|
||||
echo output.stdOut
|
||||
test.printOutputMarker(MarkerPosition.Finish, "codex node output (stdout)")
|
||||
test.printResult(fgGreen)
|
||||
|
||||
@ -394,26 +400,35 @@ proc teardown(
|
||||
test.manager.hardhats.keepItIf(it != hardhat)
|
||||
|
||||
proc untilTimeout(
|
||||
fut: FutureBase, timeout: Duration
|
||||
): Future[bool] {.async: (raises: [CancelledError]).} =
|
||||
# workaround for withTimeout, which did not work correctly
|
||||
fut: InternalRaisesFuture, timeout: Duration
|
||||
): Future[void] {.async: (raises: [CancelledError, AsyncTimeoutError]).} =
|
||||
## Returns a Future that completes when either fut finishes or timeout elapses,
|
||||
## or if they finish at the same time. If timeout elapses, an AsyncTimeoutError
|
||||
## is raised. If fut fails, its error is raised.
|
||||
|
||||
let timer = sleepAsync(timeout)
|
||||
try:
|
||||
let winner = await race(fut, timer)
|
||||
return winner.id == fut.id
|
||||
except CancelledError as e:
|
||||
defer:
|
||||
# called even when exception raised
|
||||
# race does not cancel its futures when it's cancelled
|
||||
if not fut.isNil and not fut.finished:
|
||||
await fut.cancelAndWait()
|
||||
if not timer.isNil and not timer.finished:
|
||||
await timer.cancelAndWait()
|
||||
raise e
|
||||
await fut.cancelAndWait()
|
||||
await timer.cancelAndWait()
|
||||
|
||||
try:
|
||||
discard await race(fut, timer)
|
||||
except ValueError as e:
|
||||
error "failed to wait for timeout", error = e.msg
|
||||
raiseAssert "should not happen"
|
||||
|
||||
if fut.finished(): # or fut and timer both finished simultaneously
|
||||
if fut.failed():
|
||||
await fut # raise fut error
|
||||
return # unreachable, for readability
|
||||
else: # timeout
|
||||
raise newException(AsyncTimeoutError, "Timed out")
|
||||
|
||||
proc start(test: IntegrationTest) {.async: (raises: []).} =
|
||||
logScope:
|
||||
config = test.config
|
||||
name = test.config.name
|
||||
duration = test.duration
|
||||
|
||||
trace "Running test"
|
||||
|
||||
@ -422,6 +437,9 @@ proc start(test: IntegrationTest) {.async: (raises: []).} =
|
||||
try:
|
||||
createDir(test.logsDir)
|
||||
except CatchableError as e:
|
||||
test.timeEnd = some Moment.now()
|
||||
test.status = IntegrationTestStatus.Error
|
||||
test.output = TestOutput.failure(e)
|
||||
error "failed to create test log dir", logDir = test.logsDir, error = e.msg
|
||||
|
||||
test.timeStart = some Moment.now()
|
||||
@ -433,61 +451,104 @@ proc start(test: IntegrationTest) {.async: (raises: []).} =
|
||||
try:
|
||||
hardhat = await test.setup()
|
||||
except TestManagerError as e:
|
||||
error "Failed to start hardhat and build command", error = e.msg
|
||||
test.timeEnd = some Moment.now()
|
||||
test.status = IntegrationTestStatus.Error
|
||||
test.output = CommandExResponse.failure(e)
|
||||
test.output = TestOutput.failure(e)
|
||||
error "Failed to start hardhat and build command", error = e.msg
|
||||
return
|
||||
|
||||
trace "Starting parallel integration test",
|
||||
command = test.command, timeout = test.manager.testTimeout
|
||||
test.printStart()
|
||||
try:
|
||||
trace "Starting parallel integration test", command = test.command
|
||||
test.printStart()
|
||||
test.process =
|
||||
execCommandEx(command = test.command, timeout = test.manager.testTimeout)
|
||||
|
||||
let completedBeforeTimeout =
|
||||
# untilTimeout will cancel its underlying futures for us so no need to
|
||||
# manually cancel them when cancelled
|
||||
await test.process.untilTimeout(test.manager.testTimeout)
|
||||
|
||||
if completedBeforeTimeout:
|
||||
let output = await test.process # should raise if there's an error
|
||||
test.output = success(output)
|
||||
if output.status != 0:
|
||||
test.status = IntegrationTestStatus.Failed
|
||||
else:
|
||||
test.status = IntegrationTestStatus.Ok
|
||||
else:
|
||||
test.timeEnd = some Moment.now()
|
||||
error "Test timed out, check for zombie codex process",
|
||||
name = test.config.name, duration = test.duration
|
||||
let e = newException(
|
||||
AsyncProcessTimeoutError, "Test did not complete before elapsed timeout"
|
||||
)
|
||||
test.output = CommandExResponse.failure(e)
|
||||
test.status = IntegrationTestStatus.Timeout
|
||||
|
||||
if not test.process.isNil and not test.process.finished:
|
||||
# cancel the process future, but the process itself may still be
|
||||
# running if the procedure was cancelled or the test timed out
|
||||
test.process.cancelSoon()
|
||||
trace "process future will cancel soon"
|
||||
|
||||
await test.teardown(hardhat)
|
||||
test.process = await startProcess(
|
||||
command = test.command,
|
||||
# arguments = test.command.split(" "),
|
||||
options = {AsyncProcessOption.EvalCommand},
|
||||
stdoutHandle = AsyncProcess.Pipe,
|
||||
stderrHandle = AsyncProcess.Pipe,
|
||||
)
|
||||
except AsyncProcessError as e:
|
||||
test.timeEnd = some Moment.now()
|
||||
error "Failed to start test process", error = e.msg
|
||||
test.output = TestOutput.failure(e)
|
||||
test.status = IntegrationTestStatus.Error
|
||||
return
|
||||
|
||||
defer:
|
||||
trace "Tearing down test"
|
||||
await noCancel test.teardown(hardhat)
|
||||
test.timeEnd = some Moment.now()
|
||||
if test.status == IntegrationTestStatus.Ok:
|
||||
info "Test completed", name = test.config.name, duration = test.duration
|
||||
except AsyncProcessTimeoutError as e:
|
||||
|
||||
if not test.process.isNil:
|
||||
if test.process.running |? false:
|
||||
var output = test.output.expect("should have output value")
|
||||
trace "Terminating test process"
|
||||
try:
|
||||
output.exitCode =
|
||||
some (await noCancel test.process.terminateAndWaitForExit(500.millis))
|
||||
test.output = success output
|
||||
except AsyncProcessError, AsyncProcessTimeoutError:
|
||||
warn "Test process failed to terminate, check for zombies"
|
||||
|
||||
await test.process.closeWait()
|
||||
|
||||
let outputReader = test.process.stdoutStream.read()
|
||||
let errorReader = test.process.stderrStream.read()
|
||||
|
||||
var output = TestOutput.new()
|
||||
test.output = success(output)
|
||||
output.exitCode =
|
||||
try:
|
||||
some (await test.process.waitForExit(test.manager.testTimeout))
|
||||
except AsyncProcessTimeoutError as e:
|
||||
test.timeEnd = some Moment.now()
|
||||
test.status = IntegrationTestStatus.Timeout
|
||||
error "Test process failed to exit before timeout",
|
||||
timeout = test.manager.testTimeout
|
||||
return
|
||||
except AsyncProcessError as e:
|
||||
test.timeEnd = some Moment.now()
|
||||
test.status = IntegrationTestStatus.Error
|
||||
test.output = TestOutput.failure(e)
|
||||
error "Test failed to complete", error = e.msg
|
||||
return
|
||||
|
||||
test.status =
|
||||
if output.exitCode == some QuitSuccess:
|
||||
IntegrationTestStatus.Ok
|
||||
else:
|
||||
IntegrationTestStatus.Failed
|
||||
|
||||
try:
|
||||
output.stdOut = string.fromBytes(await outputReader)
|
||||
output.stdErr = string.fromBytes(await errorReader)
|
||||
except AsyncStreamError as e:
|
||||
test.timeEnd = some Moment.now()
|
||||
error "Test timed out", name = test.config.name, duration = test.duration
|
||||
test.output = CommandExResponse.failure(e)
|
||||
test.status = IntegrationTestStatus.Timeout
|
||||
except AsyncProcessError as e:
|
||||
test.timeEnd = some Moment.now()
|
||||
error "Test failed to complete", name = test.config.name, duration = test.duration
|
||||
test.output = CommandExResponse.failure(e)
|
||||
error "Failed to read test process output stream", error = e.msg
|
||||
test.output = TestOutput.failure(e)
|
||||
test.status = IntegrationTestStatus.Error
|
||||
return
|
||||
|
||||
# let processRunning = test.process.waitForExit(test.manager.testTimeout)
|
||||
# trace "Running test until timeout", timeout = test.manager.testTimeout
|
||||
# let completedBeforeTimeout =
|
||||
# await processRunning.withTimeout(test.manager.testTimeout)
|
||||
|
||||
# if completedBeforeTimeout:
|
||||
|
||||
# else: # timed out
|
||||
# test.timeEnd = some Moment.now()
|
||||
# test.status = IntegrationTestStatus.Timeout
|
||||
# error "Test timed out, terminating process"
|
||||
# process will be terminated in defer
|
||||
|
||||
# try:
|
||||
# output.exitCode = some(await test.process.terminateAndWaitForExit(100.millis))
|
||||
# except AsyncProcessError, AsyncProcessTimeoutError:
|
||||
# warn "Test process failed to terminate, check for zombies"
|
||||
|
||||
proc continuallyShowUpdates(manager: TestManager) {.async: (raises: []).} =
|
||||
ignoreCancelled:
|
||||
@ -509,7 +570,18 @@ proc continuallyShowUpdates(manager: TestManager) {.async: (raises: []).} =
|
||||
|
||||
proc run(test: IntegrationTest) {.async: (raises: []).} =
|
||||
ignoreCancelled:
|
||||
await test.start()
|
||||
let futStart = test.start()
|
||||
# await futStart
|
||||
|
||||
try:
|
||||
await futStart.untilTimeout(test.manager.testTimeout)
|
||||
except AsyncTimeoutError:
|
||||
# if output =? test.output and output.exitCode.isNone: # timeout
|
||||
error "Test timed out"
|
||||
test.timeEnd = some Moment.now()
|
||||
test.status = IntegrationTestStatus.Timeout
|
||||
# await futStart.cancelAndWait()
|
||||
|
||||
test.printResult()
|
||||
|
||||
proc runTests(manager: TestManager) {.async: (raises: [CancelledError]).} =
|
||||
@ -527,18 +599,12 @@ proc runTests(manager: TestManager) {.async: (raises: [CancelledError]).} =
|
||||
|
||||
let futRun = test.run()
|
||||
testFutures.add futRun
|
||||
# may be overkill, but ensure no exceptions are missed
|
||||
asyncSpawn futRun
|
||||
|
||||
try:
|
||||
# if runTests is cancelled, await allFutures will be cancelled, but allFutures
|
||||
# does not propagate the cancellation to the futures it's waiting on, so we
|
||||
# need to cancel them here
|
||||
await allFutures testFutures
|
||||
except CancelledError as e:
|
||||
defer:
|
||||
for fut in testFutures:
|
||||
if not fut.isNil and not fut.finished:
|
||||
await fut.cancelAndWait()
|
||||
await fut.cancelAndWait()
|
||||
|
||||
await allFutures testFutures
|
||||
|
||||
manager.timeEnd = some Moment.now()
|
||||
|
||||
@ -649,16 +715,28 @@ proc start*(
|
||||
manager.printResult()
|
||||
|
||||
proc stop*(manager: TestManager) {.async: (raises: [CancelledError]).} =
|
||||
trace "[stop] START canelling tracked"
|
||||
await manager.trackedFutures.cancelTracked()
|
||||
trace "[stop] DONE cancelling tracked"
|
||||
|
||||
trace "[stop] stopping running processes"
|
||||
for test in manager.tests:
|
||||
if not test.process.isNil and not test.process.finished:
|
||||
# windows does not like cancelling processes, so waiting is not an option
|
||||
test.process.cancelSoon()
|
||||
if not test.process.isNil and test.process.running |? false:
|
||||
try:
|
||||
trace "[stop] terminating process", name = test.config.name
|
||||
discard await test.process.terminateAndWaitForExit(100.millis)
|
||||
except AsyncProcessError, AsyncProcessTimeoutError:
|
||||
warn "Test process failed to terminate, ignoring...", name = test.config.name
|
||||
finally:
|
||||
await test.process.closeWait()
|
||||
|
||||
trace "[stop] stopping hardhats"
|
||||
for hardhat in manager.hardhats:
|
||||
try:
|
||||
trace "[stop] stopping hardhat"
|
||||
if not hardhat.process.isNil:
|
||||
await hardhat.process.stop()
|
||||
await noCancel hardhat.process.stop()
|
||||
except CatchableError as e:
|
||||
trace "failed to stop hardhat node", error = e.msg
|
||||
|
||||
trace "[stop] done stopping hardhats"
|
||||
|
||||
@ -39,7 +39,7 @@ const TestTimeout {.intdefine.} = 60
|
||||
|
||||
const EnableParallelTests {.booldefine.} = true
|
||||
|
||||
proc run() {.async.} =
|
||||
proc run(): Future[bool] {.async: (raises: []).} =
|
||||
let manager = TestManager.new(
|
||||
configs = TestConfigs,
|
||||
DebugTestHarness,
|
||||
@ -51,18 +51,27 @@ proc run() {.async.} =
|
||||
try:
|
||||
trace "starting test manager"
|
||||
await manager.start()
|
||||
except TestManagerError as e:
|
||||
error "Failed to run test manager", error = e.msg
|
||||
return false
|
||||
except CancelledError:
|
||||
return
|
||||
finally:
|
||||
trace "stopping test manager"
|
||||
await manager.stop()
|
||||
await noCancel manager.stop()
|
||||
trace "test manager stopped"
|
||||
|
||||
without wasSuccessful =? manager.allTestsPassed, error:
|
||||
raiseAssert "Failed to get test status: " & error.msg
|
||||
|
||||
if not wasSuccessful:
|
||||
quit(1) # indicate with a non-zero exit code that the tests failed
|
||||
return wasSuccessful
|
||||
|
||||
when EnableParallelTests:
|
||||
waitFor run()
|
||||
let wasSuccessful = waitFor run()
|
||||
trace "[testIntegration] wasSuccessful", wasSuccessful
|
||||
trace "[testIntegration] AFTER run"
|
||||
if not wasSuccessful:
|
||||
quit(QuitFailure) # indicate with a non-zero exit code that the tests failed
|
||||
else:
|
||||
# run tests serially
|
||||
import ./integration/testcli
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user