mirror of
https://github.com/logos-storage/logos-storage-nim.git
synced 2026-01-19 22:03:08 +00:00
switch to using chronos process handling
- chronos startProcess starts a process and uses an AsyncStream for output which allows us to consume when needed and not needed
This commit is contained in:
parent
8997573d60
commit
428f6d68fb
@ -28,7 +28,7 @@ type
|
||||
HardhatProcess* = ref object of NodeProcess
|
||||
logFile: ?IoHandle
|
||||
started: Future[void]
|
||||
trackedFutures: TrackedFutures
|
||||
# trackedFutures: TrackedFutures
|
||||
|
||||
proc captureOutput*(node: HardhatProcess, logFilePath: string) {.async.} =
|
||||
let logFileHandle = openFile(
|
||||
@ -44,27 +44,42 @@ proc captureOutput*(node: HardhatProcess, logFilePath: string) {.async.} =
|
||||
node.logFile = some fileHandle
|
||||
node.started = newFuture[void]("hardhat.started")
|
||||
try:
|
||||
for line in node.process.outputStream.lines:
|
||||
while true:
|
||||
while(let line = await node.process.stdOutStream.readLine(); line != ""):
|
||||
echo "got line: ", line
|
||||
if line.contains(startedOutput):
|
||||
node.started.complete()
|
||||
|
||||
if line.contains(startedOutput):
|
||||
node.started.complete()
|
||||
|
||||
if error =? fileHandle.writeFile(line & "\n").errorOption:
|
||||
error "failed to write to hardhat file", errorCode = error
|
||||
discard fileHandle.closeFile()
|
||||
return
|
||||
if error =? fileHandle.writeFile(line & "\n").errorOption:
|
||||
error "failed to write to hardhat file", errorCode = error
|
||||
discard fileHandle.closeFile()
|
||||
return
|
||||
|
||||
await sleepAsync(1.millis)
|
||||
await sleepAsync(1.millis)
|
||||
|
||||
# for line in node.process.outputStream.lines:
|
||||
|
||||
# if line.contains(startedOutput):
|
||||
# node.started.complete()
|
||||
|
||||
# if error =? fileHandle.writeFile(line & "\n").errorOption:
|
||||
# error "failed to write to hardhat file", errorCode = error
|
||||
# discard fileHandle.closeFile()
|
||||
# return
|
||||
|
||||
# await sleepAsync(1.millis)
|
||||
|
||||
except CancelledError:
|
||||
discard
|
||||
|
||||
proc start(node: HardhatProcess) =
|
||||
node.process = osproc.startProcess(
|
||||
proc start(node: HardhatProcess) {.async.} =
|
||||
node.process = await startProcess(
|
||||
"npm start",
|
||||
workingDir,
|
||||
# node.arguments,
|
||||
options={poEvalCommand})
|
||||
options={AsyncProcessOption.EvalCommand}
|
||||
)
|
||||
|
||||
for arg in node.arguments:
|
||||
if arg.contains "--log-file=":
|
||||
@ -72,41 +87,41 @@ proc start(node: HardhatProcess) =
|
||||
discard node.captureOutput(logFilePath).track(node)
|
||||
break
|
||||
|
||||
proc waitUntilOutput*(node: HardhatProcess, output: string) =
|
||||
proc waitUntilOutput*(node: HardhatProcess, output: string) {.async.} =
|
||||
if not node.started.isNil:
|
||||
try:
|
||||
waitFor node.started.wait(5000.milliseconds)
|
||||
await node.started.wait(5000.milliseconds)
|
||||
return
|
||||
except AsyncTimeoutError:
|
||||
discard # should raiseAssert below
|
||||
else:
|
||||
for line in node.process.outputStream.lines:
|
||||
if line.contains(output):
|
||||
return
|
||||
# else:
|
||||
# for line in node.process.outputStream.lines:
|
||||
# if line.contains(output):
|
||||
# return
|
||||
raiseAssert "node did not output '" & output & "'"
|
||||
|
||||
proc waitUntilStarted*(node: HardhatProcess) =
|
||||
node.waitUntilOutput(startedOutput)
|
||||
proc waitUntilStarted*(node: HardhatProcess) {.async.} =
|
||||
await node.waitUntilOutput(startedOutput)
|
||||
|
||||
proc startHardhatProcess*(args: openArray[string]): HardhatProcess =
|
||||
proc startHardhatProcess*(args: seq[string]): Future[HardhatProcess] {.async.} =
|
||||
## Starts a Hardhat Node with the specified arguments.
|
||||
let node = HardhatProcess(arguments: @args, trackedFutures: TrackedFutures.new())
|
||||
node.start()
|
||||
await node.start()
|
||||
node
|
||||
|
||||
method stop*(node: HardhatProcess) =
|
||||
method stop*(node: HardhatProcess) {.async.} =
|
||||
# terminate the process
|
||||
procCall NodeProcess(node).stop()
|
||||
|
||||
waitFor node.trackedFutures.cancelTracked()
|
||||
await node.trackedFutures.cancelTracked()
|
||||
|
||||
if logFile =? node.logFile:
|
||||
discard logFile.closeFile()
|
||||
|
||||
proc restart*(node: HardhatProcess) =
|
||||
node.stop()
|
||||
node.start()
|
||||
node.waitUntilStarted()
|
||||
proc restart*(node: HardhatProcess) {.async.} =
|
||||
await node.stop()
|
||||
await node.start()
|
||||
await node.waitUntilStarted()
|
||||
|
||||
proc removeDataDir*(node: HardhatProcess) =
|
||||
discard
|
||||
|
||||
@ -144,14 +144,18 @@ template multinodesuite*(name: string, body: untyped) =
|
||||
let fileName = logDir / fn
|
||||
return fileName
|
||||
|
||||
proc newHardhatProcess(config: HardhatConfig, role: Role): NodeProcess =
|
||||
proc newHardhatProcess(
|
||||
config: HardhatConfig,
|
||||
role: Role
|
||||
): Future[NodeProcess] {.async.} =
|
||||
|
||||
var options: seq[string] = @[]
|
||||
if config.logFile:
|
||||
let updatedLogFile = getLogFile(role, none int)
|
||||
options.add "--log-file=" & updatedLogFile
|
||||
|
||||
let node = startHardhatProcess(options)
|
||||
node.waitUntilStarted()
|
||||
let node = await startHardhatProcess(options)
|
||||
await node.waitUntilStarted()
|
||||
|
||||
debug "started new hardhat node"
|
||||
return node
|
||||
@ -159,7 +163,7 @@ template multinodesuite*(name: string, body: untyped) =
|
||||
proc newNodeProcess(roleIdx: int,
|
||||
config1: NodeConfig,
|
||||
role: Role
|
||||
): NodeProcess =
|
||||
): Future[NodeProcess] {.async.} =
|
||||
|
||||
let nodeIdx = running.len
|
||||
var config = config1
|
||||
@ -189,8 +193,10 @@ template multinodesuite*(name: string, body: untyped) =
|
||||
"--disc-port=" & $(8090 + nodeIdx),
|
||||
"--eth-account=" & $accounts[nodeIdx]])
|
||||
|
||||
let node = startNode(options, config.debugEnabled)
|
||||
node.waitUntilStarted()
|
||||
let node = await startNode(options, config.debugEnabled)
|
||||
echo "[multinodes.newNodeProcess] waiting until ", role, " node started"
|
||||
await node.waitUntilStarted()
|
||||
echo "[multinodes.newNodeProcess] ", role, " NODE STARTED"
|
||||
|
||||
return node
|
||||
|
||||
@ -203,17 +209,17 @@ template multinodesuite*(name: string, body: untyped) =
|
||||
proc validators(): seq[RunningNode] {.used.} =
|
||||
running.filter(proc(r: RunningNode): bool = r.role == Role.Validator)
|
||||
|
||||
proc startHardhatNode(): NodeProcess =
|
||||
proc startHardhatNode(): Future[NodeProcess] {.async.} =
|
||||
var config = nodeConfigs.hardhat
|
||||
return newHardhatProcess(config, Role.Hardhat)
|
||||
return await newHardhatProcess(config, Role.Hardhat)
|
||||
|
||||
proc startClientNode(): NodeProcess =
|
||||
proc startClientNode(): Future[NodeProcess] {.async.} =
|
||||
let clientIdx = clients().len
|
||||
var config = nodeConfigs.clients
|
||||
config.cliOptions.add CliOption(key: "--persistence")
|
||||
return newNodeProcess(clientIdx, config, Role.Client)
|
||||
return await newNodeProcess(clientIdx, config, Role.Client)
|
||||
|
||||
proc startProviderNode(): NodeProcess =
|
||||
proc startProviderNode(): Future[NodeProcess] {.async.} =
|
||||
let providerIdx = providers().len
|
||||
var config = nodeConfigs.providers
|
||||
config.cliOptions.add CliOption(key: "--bootstrap-node", value: bootstrap)
|
||||
@ -224,50 +230,57 @@ template multinodesuite*(name: string, body: untyped) =
|
||||
o => (let idx = o.nodeIdx |? providerIdx; idx == providerIdx)
|
||||
)
|
||||
|
||||
return newNodeProcess(providerIdx, config, Role.Provider)
|
||||
return await newNodeProcess(providerIdx, config, Role.Provider)
|
||||
|
||||
proc startValidatorNode(): NodeProcess =
|
||||
proc startValidatorNode(): Future[NodeProcess] {.async.} =
|
||||
let validatorIdx = validators().len
|
||||
var config = nodeConfigs.validators
|
||||
config.cliOptions.add CliOption(key: "--bootstrap-node", value: bootstrap)
|
||||
config.cliOptions.add CliOption(key: "--validator")
|
||||
|
||||
return newNodeProcess(validatorIdx, config, Role.Validator)
|
||||
return await newNodeProcess(validatorIdx, config, Role.Validator)
|
||||
|
||||
setup:
|
||||
if not nodeConfigs.hardhat.isNil:
|
||||
let node = startHardhatNode()
|
||||
let node = await startHardhatNode()
|
||||
running.add RunningNode(role: Role.Hardhat, node: node)
|
||||
|
||||
for i in 0..<nodeConfigs.clients.numNodes:
|
||||
let node = startClientNode()
|
||||
echo "[multinodes.setup] starting client node ", i
|
||||
let node = await startClientNode()
|
||||
running.add RunningNode(
|
||||
role: Role.Client,
|
||||
node: node,
|
||||
address: some accounts[running.len]
|
||||
)
|
||||
echo "[multinodes.setup] added running client node ", i
|
||||
if i == 0:
|
||||
echo "[multinodes.setup] getting client 0 bootstrap spr"
|
||||
bootstrap = node.client.info()["spr"].getStr()
|
||||
echo "[multinodes.setup] got client 0 bootstrap spr: ", bootstrap
|
||||
|
||||
for i in 0..<nodeConfigs.providers.numNodes:
|
||||
let node = startProviderNode()
|
||||
echo "[multinodes.setup] starting provider node ", i
|
||||
let node = await startProviderNode()
|
||||
running.add RunningNode(
|
||||
role: Role.Provider,
|
||||
node: node,
|
||||
address: some accounts[running.len]
|
||||
)
|
||||
echo "[multinodes.setup] added running provider node ", i
|
||||
|
||||
for i in 0..<nodeConfigs.validators.numNodes:
|
||||
let node = startValidatorNode()
|
||||
let node = await startValidatorNode()
|
||||
running.add RunningNode(
|
||||
role: Role.Validator,
|
||||
node: node,
|
||||
address: some accounts[running.len]
|
||||
)
|
||||
echo "[multinodes.setup] added running validator node ", i
|
||||
|
||||
teardown:
|
||||
for r in running:
|
||||
r.node.stop() # also stops rest client
|
||||
await r.node.stop() # also stops rest client
|
||||
r.node.removeDataDir()
|
||||
running = @[]
|
||||
|
||||
|
||||
@ -1,13 +1,17 @@
|
||||
import pkg/questionable
|
||||
import pkg/questionable/results
|
||||
import pkg/confutils
|
||||
import pkg/chronicles
|
||||
import pkg/libp2p
|
||||
import pkg/stew/byteutils
|
||||
import std/osproc
|
||||
import std/os
|
||||
import std/sequtils
|
||||
import std/streams
|
||||
import std/strutils
|
||||
import codex/conf
|
||||
import codex/utils/exceptions
|
||||
import codex/utils/trackedfutures
|
||||
import ./codexclient
|
||||
|
||||
export codexclient
|
||||
@ -23,40 +27,48 @@ const executable = "build" / "codex"
|
||||
|
||||
type
|
||||
NodeProcess* = ref object of RootObj
|
||||
process*: Process
|
||||
process*: AsyncProcessRef
|
||||
arguments*: seq[string]
|
||||
debug: bool
|
||||
client: ?CodexClient
|
||||
trackedFutures*: TrackedFutures
|
||||
|
||||
proc start(node: NodeProcess) =
|
||||
if node.debug or defined(windows):
|
||||
node.process = osproc.startProcess(
|
||||
executable,
|
||||
workingDir,
|
||||
node.arguments,
|
||||
options={poParentStreams}
|
||||
)
|
||||
else:
|
||||
node.process = osproc.startProcess(
|
||||
executable,
|
||||
workingDir,
|
||||
node.arguments
|
||||
)
|
||||
proc start(node: NodeProcess) {.async.} =
|
||||
node.process = await startProcess(
|
||||
executable,
|
||||
workingDir,
|
||||
node.arguments,
|
||||
options = {AsyncProcessOption.StdErrToStdOut},
|
||||
stdoutHandle = AsyncProcess.Pipe
|
||||
)
|
||||
|
||||
proc waitUntilOutput*(node: NodeProcess, output: string) =
|
||||
if node.debug:
|
||||
raiseAssert "cannot read node output when in debug mode"
|
||||
proc waitUntilOutput*(node: NodeProcess, output: string, started: Future[void]) {.async.} =
|
||||
let stream = node.process.stdOutStream
|
||||
|
||||
for line in node.process.outputStream.lines:
|
||||
if line.contains(output):
|
||||
return
|
||||
raiseAssert "node did not output '" & output & "'"
|
||||
try:
|
||||
let lineEnding = when defined(windows): "\r\n"
|
||||
else: "\n"
|
||||
|
||||
proc startNode*(args: openArray[string], debug: string | bool = false): NodeProcess =
|
||||
while(let line = await stream.readLine(0, lineEnding); line != ""):
|
||||
if node.debug:
|
||||
echo line
|
||||
|
||||
if line.contains(output):
|
||||
started.complete()
|
||||
|
||||
await sleepAsync(1.millis)
|
||||
except AsyncStreamReadError as e:
|
||||
echo "error reading node output stream: ", e.msgDetail
|
||||
|
||||
proc startNode*(args: seq[string], debug: string | bool = false): Future[NodeProcess] {.async.} =
|
||||
## Starts a Codex Node with the specified arguments.
|
||||
## Set debug to 'true' to see output of the node.
|
||||
let node = NodeProcess(arguments: @args, debug: ($debug != "false"))
|
||||
node.start()
|
||||
let node = NodeProcess(
|
||||
arguments: @args,
|
||||
debug: ($debug != "false"),
|
||||
trackedFutures: TrackedFutures.new()
|
||||
)
|
||||
await node.start()
|
||||
node
|
||||
|
||||
proc dataDir(node: NodeProcess): string =
|
||||
@ -67,6 +79,10 @@ proc apiUrl*(node: NodeProcess): string =
|
||||
let config = CodexConf.load(cmdLine = node.arguments)
|
||||
"http://" & config.apiBindAddress & ":" & $config.apiPort & "/api/codex/v1"
|
||||
|
||||
proc apiPort(node: NodeProcess): string =
|
||||
let config = CodexConf.load(cmdLine = node.arguments)
|
||||
$config.apiPort
|
||||
|
||||
proc discoveryAddress*(node: NodeProcess): string =
|
||||
let config = CodexConf.load(cmdLine = node.arguments)
|
||||
$config.discoveryIp & ":" & $config.discoveryPort
|
||||
@ -78,26 +94,52 @@ proc client*(node: NodeProcess): CodexClient =
|
||||
node.client = some client
|
||||
client
|
||||
|
||||
proc waitUntilStarted*(node: NodeProcess) =
|
||||
while true:
|
||||
if client(node).isAvailable:
|
||||
break
|
||||
sleep(1)
|
||||
proc closeAndWaitClient(node: NodeProcess) {.async.} =
|
||||
without client =? node.client:
|
||||
return
|
||||
|
||||
method stop*(node: NodeProcess) {.base.} =
|
||||
if node.process != nil:
|
||||
node.process.terminate()
|
||||
discard node.process.waitForExit(timeout=5_000)
|
||||
node.process.close()
|
||||
node.process = nil
|
||||
if client =? node.client:
|
||||
node.client = none CodexClient
|
||||
try:
|
||||
client.close()
|
||||
echo "waiting for port ", node.apiPort, " to be closed..."
|
||||
let cmd = when defined(windows):
|
||||
"netstat -ano | findstr "
|
||||
else:
|
||||
"lsof -ti:"
|
||||
while true:
|
||||
let portInUse = await execCommandEx(cmd & node.apiPort)
|
||||
if portInUse.stdOutput == "":
|
||||
echo "port ", node.apiPort, " is no longer in use, continuing..."
|
||||
break
|
||||
node.client = none CodexClient
|
||||
except CatchableError as e:
|
||||
echo "Failed to close codex client: ", e.msg
|
||||
|
||||
proc restart*(node: NodeProcess) =
|
||||
node.stop()
|
||||
node.start()
|
||||
node.waitUntilStarted()
|
||||
method stop*(node: NodeProcess) {.base, async.} =
|
||||
await node.trackedFutures.cancelTracked()
|
||||
if node.process != nil:
|
||||
if err =? node.process.terminate().errorOption:
|
||||
echo "ERROR terminating node process, error code: ", err
|
||||
echo "stopping codex client"
|
||||
await node.closeAndWaitClient().wait(5.seconds)
|
||||
discard await node.process.waitForExit(timeout=5.seconds)
|
||||
await node.process.closeWait()
|
||||
node.process = nil
|
||||
echo "code node and client stopped"
|
||||
|
||||
proc waitUntilStarted*(node: NodeProcess) {.async.} =
|
||||
let started = newFuture[void]()
|
||||
let output = "REST service started"
|
||||
try:
|
||||
discard node.waitUntilOutput(output, started).track(node)
|
||||
await started.wait(5.seconds)
|
||||
except AsyncTimeoutError as e:
|
||||
await node.stop() # allows subsequent tests to continue
|
||||
raiseAssert "node did not output '" & output & "'"
|
||||
|
||||
proc restart*(node: NodeProcess) {.async.} =
|
||||
await node.stop()
|
||||
await node.start()
|
||||
await node.waitUntilStarted()
|
||||
|
||||
proc removeDataDir*(node: NodeProcess) =
|
||||
removeDir(node.dataDir)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user