mirror of https://github.com/logos-storage/logos-storage-nim.git, synced 2026-01-08 08:23:07 +00:00
Add proper async exception tracking to multinodesuite

commit bd68388d2a (parent faaddb879b)
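The diff below adopts chronos's compile-time exception tracking: annotating a proc with {.async: (raises: [...]).} makes the compiler verify that only the listed exceptions can escape it, so callers must handle everything else explicitly. A minimal sketch of the pattern, illustrative only and not taken from this diff (proc names are hypothetical; assumes chronos v4):

import chronos

proc mightFail() {.async: (raises: [CancelledError, ValueError]).} =
  # the compiler checks that nothing outside this list can escape
  await sleepAsync(10.milliseconds)
  raise newException(ValueError, "example failure")

proc caller() {.async: (raises: []).} =
  # declaring `raises: []` forces every exception from mightFail()
  # to be handled here; a missing branch is a compile-time error
  try:
    await mightFail()
  except CancelledError:
    discard # cancellation swallowed only for this sketch
  except ValueError as e:
    echo "handled: ", e.msg

waitFor caller()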
@@ -85,7 +85,7 @@ proc client*(node: CodexProcess): CodexClient {.raises: [CodexProcessError].} =
   node.client = some client
   return client

-method stop*(node: CodexProcess) {.async.} =
+method stop*(node: CodexProcess) {.async: (raises: []).} =
   logScope:
     nodeName = node.name

@@ -101,7 +101,7 @@ proc startNode*(
     debug: string | bool = false,
     name: string,
     onOutputLineCaptured: OnOutputLineCaptured = nil,
-): Future[HardhatProcess] {.async.} =
+): Future[HardhatProcess] {.async: (raises: [CancelledError, NodeProcessError]).} =
   logScope:
     nodeName = name

@@ -132,7 +132,7 @@ proc startNode*(

   return hardhat

-method onOutputLineCaptured(node: HardhatProcess, line: string) {.raises: [].} =
+method onOutputLineCaptured(node: HardhatProcess, line: string) =
   logScope:
     nodeName = node.name

@@ -147,7 +147,7 @@ method onOutputLineCaptured(node: HardhatProcess, line: string) {.raises: [].} =
     discard logFile.closeFile()
     node.logFile = none IoHandle

-method stop*(node: HardhatProcess) {.async.} =
+method stop*(node: HardhatProcess) {.async: (raises: []).} =
   # terminate the process
   await procCall NodeProcess(node).stop()

@@ -1,3 +1,4 @@
+import std/httpclient
 import std/os
 import std/sequtils
 import std/strutils
@@ -25,6 +26,8 @@ export hardhatconfig
 export codexconfig
 export nodeconfigs

+{.push raises: [].}
+
 type
   RunningNode* = ref object
     role*: Role
@@ -37,6 +40,7 @@ type
     Hardhat

   MultiNodeSuiteError = object of CatchableError
+  SuiteTimeoutError = object of MultiNodeSuiteError

 const HardhatPort {.intdefine.}: int = 8545
 const CodexApiPort {.intdefine.}: int = 8080
@@ -45,7 +49,9 @@ const TestId {.strdefine.}: string = "TestId"
 const DebugCodexNodes {.booldefine.}: bool = false
 const LogsDir {.strdefine.}: string = ""

-proc raiseMultiNodeSuiteError(msg: string, parent: ref CatchableError = nil) =
+proc raiseMultiNodeSuiteError(
+    msg: string, parent: ref CatchableError = nil
+) {.raises: [MultiNodeSuiteError].} =
   raise newException(MultiNodeSuiteError, msg, parent)

 template withLock(lock: AsyncLock, body: untyped) =
@@ -98,6 +104,7 @@ template multinodesuite*(name: string, body: untyped) =
     var lastUsedCodexApiPort = CodexApiPort
     var lastUsedCodexDiscPort = CodexDiscPort
     var codexPortLock: AsyncLock
+    var futTimeout: Future[void]

     template test(tname, startNodeConfigs, tbody) =
       currentTestName = tname
@@ -111,12 +118,25 @@ template multinodesuite*(name: string, body: untyped) =

     proc newHardhatProcess(
         config: HardhatConfig, role: Role
-    ): Future[NodeProcess] {.async.} =
+    ): Future[NodeProcess] {.async: (raises: [MultiNodeSuiteError, CancelledError]).} =
       var args: seq[string] = @[]
       if config.logFile:
-        let updatedLogFile =
-          getLogFile(LogsDir, starttime, name, currentTestName, $role, none int)
-        args.add "--log-file=" & updatedLogFile
+        try:
+          let updatedLogFile =
+            getLogFile(LogsDir, starttime, name, currentTestName, $role, none int)
+          args.add "--log-file=" & updatedLogFile
+        except IOError as e:
+          raiseMultiNodeSuiteError(
+            "failed to start hardhat because logfile path could not be obtained: " &
+              e.msg,
+            e,
+          )
+        except OSError as e:
+          raiseMultiNodeSuiteError(
+            "failed to start hardhat because logfile path could not be obtained: " &
+              e.msg,
+            e,
+          )

       let port = await nextFreePort(lastUsedHardhatPort)
       jsonRpcProviderUrl.updatePort(port)
@@ -134,7 +154,7 @@ template multinodesuite*(name: string, body: untyped) =

     proc newCodexProcess(
         roleIdx: int, conf: CodexConfig, role: Role
-    ): Future[NodeProcess] {.async.} =
+    ): Future[NodeProcess] {.async: (raises: [MultiNodeSuiteError, CancelledError]).} =
       let nodeIdx = running.len
       var config = conf

@@ -148,9 +168,22 @@ template multinodesuite*(name: string, body: untyped) =

       try:
         if config.logFile.isSome or DebugCodexNodes:
-          let updatedLogFile =
-            getLogFile(LogsDir, starttime, name, currentTestName, $role, some roleIdx)
-          config.withLogFile(updatedLogFile)
+          try:
+            let updatedLogFile =
+              getLogFile(LogsDir, starttime, name, currentTestName, $role, some roleIdx)
+            config.withLogFile(updatedLogFile)
+          except IOError as e:
+            raiseMultiNodeSuiteError(
+              "failed to start " & $role &
+                " because logfile path could not be obtained: " & e.msg,
+              e,
+            )
+          except OSError as e:
+            raiseMultiNodeSuiteError(
+              "failed to start " & $role &
+                " because logfile path could not be obtained: " & e.msg,
+              e,
+            )

         if DebugCodexNodes:
           config.debugEnabled = true
@@ -172,17 +205,17 @@ template multinodesuite*(name: string, body: untyped) =
       except CodexConfigError as e:
         raiseMultiNodeSuiteError "invalid cli option, error: " & e.msg

-      let node = await CodexProcess.startNode(
-        config.cliArgs, config.debugEnabled, $role & $roleIdx
-      )
-
       try:
+        let node = await CodexProcess.startNode(
+          config.cliArgs, config.debugEnabled, $role & $roleIdx
+        )
         await node.waitUntilStarted()
         trace "node started", nodeName = $role & $roleIdx
+        return node
+      except CodexConfigError as e:
+        raiseMultiNodeSuiteError "failed to get cli args from config: " & e.msg, e
       except NodeProcessError as e:
-        raiseMultiNodeSuiteError "node not started, error: " & e.msg
-
-      return node
+        raiseMultiNodeSuiteError "node not started, error: " & e.msg, e

     proc hardhat(): HardhatProcess =
       for r in running:
@@ -208,7 +241,9 @@ template multinodesuite*(name: string, body: untyped) =
         if r.role == Role.Validator:
           CodexProcess(r.node)

-    proc startHardhatNode(config: HardhatConfig): Future[NodeProcess] {.async.} =
+    proc startHardhatNode(
+        config: HardhatConfig
+    ): Future[NodeProcess] {.async: (raises: [MultiNodeSuiteError, CancelledError]).} =
       return await newHardhatProcess(config, Role.Hardhat)

     proc startClientNode(conf: CodexConfig): Future[NodeProcess] {.async.} =
@@ -220,44 +255,63 @@ template multinodesuite*(name: string, body: untyped) =
       )
       return await newCodexProcess(clientIdx, config, Role.Client)

-    proc startProviderNode(conf: CodexConfig): Future[NodeProcess] {.async.} =
-      let providerIdx = providers().len
-      var config = conf
-      config.addCliOption(StartUpCmd.persistence, "--eth-provider", jsonRpcProviderUrl)
-      config.addCliOption(
-        StartUpCmd.persistence, "--eth-account", $accounts[running.len]
-      )
-      config.addCliOption(
-        PersistenceCmd.prover, "--circom-r1cs",
-        "vendor/codex-contracts-eth/verifier/networks/hardhat/proof_main.r1cs",
-      )
-      config.addCliOption(
-        PersistenceCmd.prover, "--circom-wasm",
-        "vendor/codex-contracts-eth/verifier/networks/hardhat/proof_main.wasm",
-      )
-      config.addCliOption(
-        PersistenceCmd.prover, "--circom-zkey",
-        "vendor/codex-contracts-eth/verifier/networks/hardhat/proof_main.zkey",
-      )
+    proc startProviderNode(
+        conf: CodexConfig
+    ): Future[NodeProcess] {.async: (raises: [MultiNodeSuiteError, CancelledError]).} =
+      try:
+        let providerIdx = providers().len
+        var config = conf
+        config.addCliOption(
+          StartUpCmd.persistence, "--eth-provider", jsonRpcProviderUrl
+        )
+        config.addCliOption(
+          StartUpCmd.persistence, "--eth-account", $accounts[running.len]
+        )
+        config.addCliOption(
+          PersistenceCmd.prover, "--circom-r1cs",
+          "vendor/codex-contracts-eth/verifier/networks/hardhat/proof_main.r1cs",
+        )
+        config.addCliOption(
+          PersistenceCmd.prover, "--circom-wasm",
+          "vendor/codex-contracts-eth/verifier/networks/hardhat/proof_main.wasm",
+        )
+        config.addCliOption(
+          PersistenceCmd.prover, "--circom-zkey",
+          "vendor/codex-contracts-eth/verifier/networks/hardhat/proof_main.zkey",
+        )

-      return await newCodexProcess(providerIdx, config, Role.Provider)
+        return await newCodexProcess(providerIdx, config, Role.Provider)
+      except CodexConfigError as e:
+        raiseMultiNodeSuiteError "Failed to start codex node, error adding cli options: " &
+          e.msg, e

-    proc startValidatorNode(conf: CodexConfig): Future[NodeProcess] {.async.} =
-      let validatorIdx = validators().len
-      var config = conf
-      config.addCliOption(StartUpCmd.persistence, "--eth-provider", jsonRpcProviderUrl)
-      config.addCliOption(
-        StartUpCmd.persistence, "--eth-account", $accounts[running.len]
-      )
-      config.addCliOption(StartUpCmd.persistence, "--validator")
+    proc startValidatorNode(
+        conf: CodexConfig
+    ): Future[NodeProcess] {.async: (raises: [MultiNodeSuiteError, CancelledError]).} =
+      try:
+        let validatorIdx = validators().len
+        var config = conf
+        config.addCliOption(
+          StartUpCmd.persistence, "--eth-provider", jsonRpcProviderUrl
+        )
+        config.addCliOption(
+          StartUpCmd.persistence, "--eth-account", $accounts[running.len]
+        )
+        config.addCliOption(StartUpCmd.persistence, "--validator")

-      return await newCodexProcess(validatorIdx, config, Role.Validator)
+        return await newCodexProcess(validatorIdx, config, Role.Validator)
+      except CodexConfigError as e:
+        raiseMultiNodeSuiteError "Failed to start validator node, error adding cli options: " &
+          e.msg, e

-    proc teardownImpl() {.async.} =
+    proc teardownImpl() {.async: (raises: []).} =
       for nodes in @[validators(), clients(), providers()]:
         for node in nodes:
           await node.stop() # also stops rest client
-          node.removeDataDir()
+          try:
+            node.removeDataDir()
+          except CodexProcessError as e:
+            error "Failed to remove data dir during teardown", error = e.msg

       # if hardhat was started in the test, kill the node
       # otherwise revert the snapshot taken in the test setup
@@ -265,7 +319,10 @@ template multinodesuite*(name: string, body: untyped) =
       if not hardhat.isNil:
         await hardhat.stop()
       else:
-        discard await send(ethProvider, "evm_revert", @[snapshot])
+        try:
+          discard await noCancel send(ethProvider, "evm_revert", @[snapshot])
+        except ProviderError as e:
+          error "Failed to revert hardhat state during teardown", error = e.msg

       await ethProvider.close()

@@ -274,6 +331,8 @@ template multinodesuite*(name: string, body: untyped) =
     template failAndTeardownOnError(message: string, tryBody: untyped) =
       try:
         tryBody
+      except CancelledError as e:
+        raise e
       except CatchableError as er:
         fatal message, error = er.msg
         echo "[FATAL] ", message, ": ", er.msg
@@ -285,18 +344,35 @@ template multinodesuite*(name: string, body: untyped) =

     proc updateBootstrapNodes(
         node: CodexProcess
-    ): Future[void] {.async: (raises: [CatchableError]).} =
-      without ninfo =? await node.client.info():
-        # raise CatchableError instead of Defect (with .get or !) so we
-        # can gracefully shutdown and prevent zombies
-        raiseMultiNodeSuiteError "Failed to get node info"
-      bootstrapNodes.add ninfo["spr"].getStr()
+    ): Future[void] {.async: (raises: [MultiNodeSuiteError]).} =
+      try:
+        without ninfo =? await node.client.info():
+          # raise CatchableError instead of Defect (with .get or !) so we
+          # can gracefully shutdown and prevent zombies
+          raiseMultiNodeSuiteError "Failed to get node info"
+        bootstrapNodes.add ninfo["spr"].getStr()
+      except CatchableError as e:
+        raiseMultiNodeSuiteError "Failed to get node info: " & e.msg, e

+    setupAll:
+      proc raiseOnTimeout() {.async: (raises: [CancelledError, SuiteTimeoutError]).} =
+        await sleepAsync(chronos.seconds(10))
+        raise newException(SuiteTimeoutError, "suite timed out")
+
+      failAndTeardownOnError "suite timed out":
+        futTimeout = raiseOnTimeout()
+
+    teardownAll:
+      await futTimeout.cancelAndWait()
+
     setup:
       if var conf =? nodeConfigs.hardhat:
         try:
-          let node = await startHardhatNode(conf)
+          let node = await noCancel startHardhatNode(conf)
           running.add RunningNode(role: Role.Hardhat, node: node)
+        except CancelledError as e:
+          # should not happen because of noCancel, but added for clarity
+          raise e
         except CatchableError as e:
           echo "failed to start hardhat node"
           fail()
@@ -312,6 +388,8 @@ template multinodesuite*(name: string, body: untyped) =
         if nodeConfigs.hardhat.isNone:
           snapshot = await send(ethProvider, "evm_snapshot")
         accounts = await ethProvider.listAccounts()
+      except CancelledError as e:
+        raise e
       except CatchableError as e:
         echo "Hardhat not running. Run hardhat manually " &
           "before executing tests, or include a " & "HardhatConfig in the test setup."
@@ -321,21 +399,21 @@ template multinodesuite*(name: string, body: untyped) =
       if var clients =? nodeConfigs.clients:
         failAndTeardownOnError "failed to start client nodes":
           for config in clients.configs:
-            let node = await startClientNode(config)
+            let node = await noCancel startClientNode(config)
             running.add RunningNode(role: Role.Client, node: node)
             await CodexProcess(node).updateBootstrapNodes()

       if var providers =? nodeConfigs.providers:
         failAndTeardownOnError "failed to start provider nodes":
           for config in providers.configs.mitems:
-            let node = await startProviderNode(config)
+            let node = await noCancel startProviderNode(config)
             running.add RunningNode(role: Role.Provider, node: node)
             await CodexProcess(node).updateBootstrapNodes()

       if var validators =? nodeConfigs.validators:
         failAndTeardownOnError "failed to start validator nodes":
           for config in validators.configs.mitems:
-            let node = await startValidatorNode(config)
+            let node = await noCancel startValidatorNode(config)
             running.add RunningNode(role: Role.Validator, node: node)

       # ensure that we have a recent block with a fresh timestamp
@@ -47,7 +47,7 @@ method outputLineEndings(node: NodeProcess): string {.base, gcsafe.} =
 method onOutputLineCaptured(node: NodeProcess, line: string) {.base, gcsafe.} =
   raiseAssert "not implemented"

-method start*(node: NodeProcess) {.base, async.} =
+method start*(node: NodeProcess) {.base, async: (raises: [CancelledError]).} =
   logScope:
     nodeName = node.name

@@ -104,7 +104,7 @@ proc captureOutput(

 proc startNode*[T: NodeProcess](
     _: type T, args: seq[string], debug: string | bool = false, name: string
-): Future[T] {.async.} =
+): Future[T] {.async: (raises: [CancelledError]).} =
   ## Starts a Codex Node with the specified arguments.
   ## Set debug to 'true' to see output of the node.
   let node = T(
@@ -116,7 +116,9 @@ proc startNode*[T: NodeProcess](
   await node.start()
   return node

-method stop*(node: NodeProcess, expectedErrCode: int = -1) {.base, async.} =
+method stop*(
+    node: NodeProcess, expectedErrCode: int = -1
+) {.base, async: (raises: []).} =
   logScope:
     nodeName = node.name

@@ -124,16 +126,14 @@ method stop*(node: NodeProcess, expectedErrCode: int = -1) {.base, async.} =
   if not node.process.isNil:
     trace "terminating node process..."
     try:
-      let exitCode = await node.process.terminateAndWaitForExit(2.seconds)
+      let exitCode = await noCancel node.process.terminateAndWaitForExit(2.seconds)
       if exitCode > 0 and exitCode != 143 and # 143 = SIGTERM (initiated above)
           exitCode != expectedErrCode:
         error "process exited with a non-zero exit code", exitCode
       trace "node stopped", exitCode
-    except CancelledError as error:
-      raise error
     except CatchableError:
       try:
-        let forcedExitCode = await node.process.killAndWaitForExit(3.seconds)
+        let forcedExitCode = await noCancel node.process.killAndWaitForExit(3.seconds)
         trace "node process forcibly killed with exit code: ", exitCode = forcedExitCode
       except CatchableError as e:
         error "failed to kill node process in time, it will be killed when the parent process exits",
@@ -148,7 +148,9 @@ method stop*(node: NodeProcess, expectedErrCode: int = -1) {.base, async.} =

   asyncSpawn closeProcessStreams()

-proc waitUntilOutput*(node: NodeProcess, output: string) {.async.} =
+proc waitUntilOutput*(
+    node: NodeProcess, output: string
+) {.async: (raises: [CancelledError, AsyncTimeoutError]).} =
   logScope:
     nodeName = node.name

@@ -158,9 +160,18 @@ proc waitUntilOutput*(node: NodeProcess, output: string) {.async.} =
   let fut = node.captureOutput(output, started)
   node.trackedFutures.track(fut)
   asyncSpawn fut
-  await started.wait(60.seconds) # allow enough time for proof generation
+  try:
+    await started.wait(60.seconds) # allow enough time for proof generation
+  except AsyncTimeoutError as e:
+    raise e
+  except CancelledError as e:
+    raise e
+  except CatchableError as e: # unsure where this originates from
+    error "unexpected error occurred waiting for node output", error = e.msg

-proc waitUntilStarted*(node: NodeProcess) {.async.} =
+proc waitUntilStarted*(
+    node: NodeProcess
+) {.async: (raises: [CancelledError, NodeProcessError]).} =
   logScope:
     nodeName = node.name

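Several teardown paths above wrap their awaits in noCancel; a short note on why: if the test future is cancelled mid-teardown, a plain await would abandon the cleanup half-done, whereas chronos's noCancel shields the awaited future so it runs to completion and cancellation is delivered only afterwards. A minimal sketch under the same chronos v4 assumption (proc names hypothetical):

import chronos

proc releaseResources() {.async: (raises: []).} =
  # stands in for stopping a child process or closing a log file
  await sleepAsync(50.milliseconds)

proc shutdown() {.async: (raises: []).} =
  # without noCancel, cancelling shutdown() could abort the cleanup;
  # with it, the cleanup always finishes before cancellation propagates
  await noCancel releaseResources()

waitFor shutdown()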