Eric ec5cf2ad96
terminate process before waiting for exit with timeout
solved an issue on windows where the processes were not exiting cleanly and causing an attempted write to the dht db after the data dir had already been deleted
2023-12-06 15:41:18 +11:00

166 lines
4.4 KiB
Nim

import pkg/questionable
import pkg/questionable/results
import pkg/confutils
import pkg/chronicles
import pkg/libp2p
import std/os
import std/strutils
import codex/conf
import codex/utils/exceptions
import codex/utils/trackedfutures
import ./codexclient
export codexclient
export chronicles
logScope:
topics = "integration testing node process"
type
NodeProcess* = ref object of RootObj
process*: AsyncProcessRef
arguments*: seq[string]
debug: bool
trackedFutures*: TrackedFutures
name*: string
method workingDir(node: NodeProcess): string {.base.} =
raiseAssert "[workingDir] not implemented"
method executable(node: NodeProcess): string {.base.} =
raiseAssert "[executable] not implemented"
method startedOutput(node: NodeProcess): string {.base.} =
raiseAssert "[startedOutput] not implemented"
method processOptions(node: NodeProcess): set[AsyncProcessOption] {.base.} =
raiseAssert "[processOptions] not implemented"
method outputLineEndings(node: NodeProcess): string {.base.} =
raiseAssert "[outputLineEndings] not implemented"
method onOutputLineCaptured(node: NodeProcess, line: string) {.base.} =
raiseAssert "[onOutputLineCaptured] not implemented"
method start*(node: NodeProcess) {.base, async.} =
logScope:
nodeName = node.name
let poptions = node.processOptions + {AsyncProcessOption.StdErrToStdOut}
trace "starting node",
args = node.arguments,
executable = node.executable,
workingDir = node.workingDir,
processOptions = poptions
try:
node.process = await startProcess(
node.executable,
node.workingDir,
node.arguments,
options = poptions,
stdoutHandle = AsyncProcess.Pipe
)
except CatchableError as e:
error "failed to start node process", error = e.msg
proc captureOutput(
node: NodeProcess,
output: string,
started: Future[void]
) {.async.} =
logScope:
nodeName = node.name
trace "waiting for output", output
let stream = node.process.stdoutStream
try:
while node.process.running.option == some true:
while(let line = await stream.readLine(0, node.outputLineEndings); line != ""):
if node.debug:
# would be nice if chronicles could parse and display with colors
echo line
if not started.isNil and not started.finished and line.contains(output):
started.complete()
node.onOutputLineCaptured(line)
await sleepAsync(1.millis)
await sleepAsync(1.millis)
except AsyncStreamReadError as e:
error "error reading output stream", error = e.msgDetail
proc startNode*[T: NodeProcess](
_: type T,
args: seq[string],
debug: string | bool = false,
name: string
): Future[T] {.async.} =
## Starts a Codex Node with the specified arguments.
## Set debug to 'true' to see output of the node.
let node = T(
arguments: @args,
debug: ($debug != "false"),
trackedFutures: TrackedFutures.new(),
name: name
)
await node.start()
return node
method stop*(node: NodeProcess) {.base, async.} =
logScope:
nodeName = node.name
await node.trackedFutures.cancelTracked()
if node.process != nil:
try:
trace "terminating node process..."
if errCode =? node.process.terminate().errorOption:
error "failed to terminate process", errCode
trace "waiting for node process to exit"
let exitCode = await node.process.waitForExit(3.seconds)
if exitCode > 0:
error "failed to exit process, check for zombies", exitCode
trace "closing node process' streams"
await node.process.closeWait()
except CatchableError as e:
error "error stopping node process", error = e.msg
finally:
node.process = nil
trace "node stopped"
proc waitUntilStarted*(node: NodeProcess) {.async.} =
logScope:
nodeName = node.name
trace "waiting until node started"
let started = newFuture[void]()
try:
discard node.captureOutput(node.startedOutput, started).track(node)
await started.wait(5.seconds)
except AsyncTimeoutError as e:
# attempt graceful shutdown in case node was partially started, prevent
# zombies
await node.stop()
raiseAssert "node did not output '" & node.startedOutput & "'"
proc restart*(node: NodeProcess) {.async.} =
await node.stop()
await node.start()
await node.waitUntilStarted()
method removeDataDir*(node: NodeProcess) {.base.} =
raiseAssert "[removeDataDir] not implemented"