Refactor to use CodexProcess/CodexConfig and HardhatProcess/HardhatConfig

Test work as long as hardhat is already running. Hardhat config needs to be moved back to suite-level
This commit is contained in:
Eric 2023-11-29 20:03:55 +11:00
parent 47e4783b14
commit fa8f5148b2
No known key found for this signature in database
13 changed files with 581 additions and 395 deletions

View File

@ -0,0 +1,13 @@
import pkg/questionable
type
CliOption* = object of RootObj
nodeIdx*: ?int
key*: string
value*: string
proc `$`*(option: CliOption): string =
var res = option.key
if option.value.len > 0:
res &= "=" & option.value
return res

View File

@ -24,15 +24,6 @@ proc info*(client: CodexClient): JsonNode =
let url = client.baseurl & "/debug/info"
client.http.getContent(url).parseJson()
proc isAvailable*(client: CodexClient): bool =
try:
discard client.info
return true
except OSError as e:
if e.msg == "Connection refused":
return false
raise e
proc setLogLevel*(client: CodexClient, level: string) =
let url = client.baseurl & "/debug/chronicles/loglevel?level=" & level
let headers = newHttpHeaders({"Content-Type": "text/plain"})

View File

@ -0,0 +1,49 @@
import std/options
import std/sequtils
import ./clioption
import ./nodeconfig
export nodeconfig
export clioption
type
CodexConfig* = ref object of NodeConfig
numNodes*: int
cliOptions*: seq[CliOption]
logTopics*: seq[string]
proc nodes*(config: CodexConfig, numNodes: int): CodexConfig =
if numNodes < 0:
raise newException(ValueError, "numNodes must be >= 0")
var startConfig = config
startConfig.numNodes = numNodes
return startConfig
proc simulateProofFailuresFor*(
config: CodexConfig,
providerIdx: int,
failEveryNProofs: int
): CodexConfig =
if providerIdx > config.numNodes - 1:
raise newException(ValueError, "provider index out of bounds")
var startConfig = config
startConfig.cliOptions.add(
CliOption(
nodeIdx: some providerIdx,
key: "--simulate-proof-failures",
value: $failEveryNProofs
)
)
return startConfig
proc withLogTopics*(
config: CodexConfig,
topics: varargs[string]
): CodexConfig =
var startConfig = config
startConfig.logTopics = startConfig.logTopics.concat(@topics)
return startConfig

View File

@ -0,0 +1,73 @@
import pkg/questionable
import pkg/questionable/results
import pkg/confutils
import pkg/chronicles
import pkg/ethers
import pkg/libp2p
import std/os
import std/strutils
import codex/conf
import ./codexclient
import ./nodeprocess
export codexclient
export chronicles
export nodeprocess
logScope:
topics = "integration testing codex process"
type
CodexProcess* = ref object of NodeProcess
client: ?CodexClient
method workingDir(node: CodexProcess): string =
return currentSourcePath() / ".." / ".." / ".."
method executable(node: CodexProcess): string =
return "build" / "codex"
method startedOutput(node: CodexProcess): string =
return "REST service started"
method processOptions(node: CodexProcess): set[AsyncProcessOption] =
return {AsyncProcessOption.StdErrToStdOut}
method onOutputLineCaptured(node: CodexProcess, line: string) =
discard
proc dataDir(node: CodexProcess): string =
let config = CodexConf.load(cmdLine = node.arguments)
return config.dataDir.string
proc ethAccount*(node: CodexProcess): Address =
let config = CodexConf.load(cmdLine = node.arguments)
without ethAccount =? config.ethAccount:
raiseAssert "eth account not set"
return Address(ethAccount)
proc apiUrl*(node: CodexProcess): string =
let config = CodexConf.load(cmdLine = node.arguments)
return "http://" & config.apiBindAddress & ":" & $config.apiPort & "/api/codex/v1"
proc client*(node: CodexProcess): CodexClient =
if client =? node.client:
return client
let client = CodexClient.new(node.apiUrl)
node.client = some client
return client
method stop*(node: CodexProcess) {.async.} =
logScope:
nodeName = node.name
await procCall NodeProcess(node).stop()
trace "stopping codex client"
if client =? node.client:
client.close()
node.client = none CodexClient
method removeDataDir*(node: CodexProcess) =
removeDir(node.dataDir)

View File

@ -1,127 +0,0 @@
import pkg/questionable
import pkg/questionable/results
import pkg/confutils
import pkg/chronicles
import pkg/chronos
import pkg/stew/io2
import std/osproc
import std/os
import std/streams
import std/strutils
import pkg/codex/conf
import pkg/codex/utils/trackedfutures
import ./codexclient
import ./nodes
export codexclient
export codexclient
export chronicles
logScope:
topics = "integration testing nodes"
const workingDir = currentSourcePath() / ".." / ".." / ".." / "vendor" / "codex-contracts-eth"
const startedOutput = "Started HTTP and WebSocket JSON-RPC server at"
type
HardhatProcess* = ref object of NodeProcess
logFile: ?IoHandle
started: Future[void]
# trackedFutures: TrackedFutures
proc captureOutput*(node: HardhatProcess, logFilePath: string) {.async.} =
let logFileHandle = openFile(
logFilePath,
{OpenFlags.Write, OpenFlags.Create, OpenFlags.Truncate}
)
without fileHandle =? logFileHandle:
error "failed to open log file",
path = logFilePath,
errorCode = $logFileHandle.error
node.logFile = some fileHandle
node.started = newFuture[void]("hardhat.started")
try:
while true:
while(let line = await node.process.stdOutStream.readLine(); line != ""):
echo "got line: ", line
if line.contains(startedOutput):
node.started.complete()
if error =? fileHandle.writeFile(line & "\n").errorOption:
error "failed to write to hardhat file", errorCode = error
discard fileHandle.closeFile()
return
await sleepAsync(1.millis)
await sleepAsync(1.millis)
# for line in node.process.outputStream.lines:
# if line.contains(startedOutput):
# node.started.complete()
# if error =? fileHandle.writeFile(line & "\n").errorOption:
# error "failed to write to hardhat file", errorCode = error
# discard fileHandle.closeFile()
# return
# await sleepAsync(1.millis)
except CancelledError:
discard
proc start(node: HardhatProcess) {.async.} =
node.process = await startProcess(
"npm start",
workingDir,
# node.arguments,
options={AsyncProcessOption.EvalCommand}
)
for arg in node.arguments:
if arg.contains "--log-file=":
let logFilePath = arg.split("=")[1]
discard node.captureOutput(logFilePath).track(node)
break
proc waitUntilOutput*(node: HardhatProcess, output: string) {.async.} =
if not node.started.isNil:
try:
await node.started.wait(5000.milliseconds)
return
except AsyncTimeoutError:
discard # should raiseAssert below
# else:
# for line in node.process.outputStream.lines:
# if line.contains(output):
# return
raiseAssert "node did not output '" & output & "'"
proc waitUntilStarted*(node: HardhatProcess) {.async.} =
await node.waitUntilOutput(startedOutput)
proc startHardhatProcess*(args: seq[string]): Future[HardhatProcess] {.async.} =
## Starts a Hardhat Node with the specified arguments.
let node = HardhatProcess(arguments: @args, trackedFutures: TrackedFutures.new())
await node.start()
node
method stop*(node: HardhatProcess) {.async.} =
# terminate the process
procCall NodeProcess(node).stop()
await node.trackedFutures.cancelTracked()
if logFile =? node.logFile:
discard logFile.closeFile()
proc restart*(node: HardhatProcess) {.async.} =
await node.stop()
await node.start()
await node.waitUntilStarted()
proc removeDataDir*(node: HardhatProcess) =
discard

View File

@ -0,0 +1,6 @@
import ./nodeconfig
export nodeconfig
type
HardhatConfig* = ref object of NodeConfig

View File

@ -0,0 +1,100 @@
import pkg/questionable
import pkg/questionable/results
import pkg/confutils
import pkg/chronicles
import pkg/chronos
import pkg/stew/io2
import std/osproc
import std/os
import std/sets
import std/streams
import std/strutils
import std/sugar
import pkg/codex/conf
import pkg/codex/utils/trackedfutures
import ./codexclient
import ./nodeprocess
export codexclient
export chronicles
logScope:
topics = "integration testing hardhat process"
nodeName = "hardhat"
type
HardhatProcess* = ref object of NodeProcess
logFile: ?IoHandle
method workingDir(node: HardhatProcess): string =
return currentSourcePath() / ".." / ".." / ".." / "vendor" / "codex-contracts-eth"
method executable(node: HardhatProcess): string =
return "npm start"
method startedOutput(node: HardhatProcess): string =
return "Started HTTP and WebSocket JSON-RPC server at"
method processOptions(node: HardhatProcess): set[AsyncProcessOption] =
return {AsyncProcessOption.EvalCommand, AsyncProcessOption.StdErrToStdOut}
proc openLogFile(node: HardhatProcess, logFilePath: string): IoHandle =
let logFileHandle = openFile(
logFilePath,
{OpenFlags.Write, OpenFlags.Create, OpenFlags.Truncate}
)
without fileHandle =? logFileHandle:
fatal "failed to open log file",
path = logFilePath,
errorCode = $logFileHandle.error
raiseAssert "failed to open log file, aborting"
return fileHandle
proc startNode*(
_: type HardhatProcess,
args: seq[string] = @[],
debug: string | bool = false,
name: string = "hardhat"
): Future[HardhatProcess] {.async.} =
var logFilePath = ""
var arguments = newSeq[string]()
for arg in args:
if arg.contains "--log-file=":
logFilePath = arg.split("=")[1]
else:
arguments.add arg
trace "starting hardhat node", arguments
echo ">>> starting hardhat node with args: ", arguments
let node = await NodeProcess.startNode(arguments, debug, "hardhat")
let hardhat = HardhatProcess(node)
if logFilePath != "":
hardhat.logFile = some hardhat.openLogFile(logFilePath)
# let hardhat = HardhatProcess()
return hardhat
method onOutputLineCaptured(node: HardhatProcess, line: string) =
without logFile =? node.logFile:
return
if error =? logFile.writeFile(line & "\n").errorOption:
error "failed to write to hardhat file", errorCode = error
discard logFile.closeFile()
node.logFile = none IoHandle
method stop*(node: HardhatProcess) {.async.} =
# terminate the process
procCall NodeProcess(node).stop()
if logFile =? node.logFile:
discard logFile.closeFile()
method removeDataDir*(node: HardhatProcess) =
discard

View File

@ -47,7 +47,7 @@ template marketplacesuite*(name: string, body: untyped) =
proc createAvailabilities(datasetSize: int, duration: uint64) =
# post availability to each provider
for i in 0..<providers().len:
let provider = providers()[i].node.client
let provider = providers()[i].client
discard provider.postAvailability(
size=datasetSize.u256, # should match 1 slot only
@ -90,6 +90,7 @@ template marketplacesuite*(name: string, body: untyped) =
discard
setup:
echo "[marketplacesuite.setup] setup start"
marketplace = Marketplace.new(Marketplace.address, ethProvider.getSigner())
let tokenAddress = await marketplace.token()
token = Erc20Token.new(tokenAddress, ethProvider.getSigner())

View File

@ -5,47 +5,31 @@ import std/sugar
import std/times
import pkg/chronicles
import ../ethertest
import ./codexclient
import ./hardhat
import ./nodes
import ./hardhatprocess
import ./codexprocess
import ./hardhatconfig
import ./codexconfig
export ethertest
export codexclient
export nodes
export hardhatprocess
export codexprocess
export hardhatconfig
export codexconfig
type
RunningNode* = ref object
role*: Role
node*: NodeProcess
address*: ?Address
NodeConfigs* = object
clients*: NodeConfig
providers*: NodeConfig
validators*: NodeConfig
clients*: CodexConfig
providers*: CodexConfig
validators*: CodexConfig
hardhat*: HardhatConfig
Config* = object of RootObj
logFile*: bool
NodeConfig* = object of Config
numNodes*: int
cliOptions*: seq[CliOption]
logTopics*: seq[string]
debugEnabled*: bool
HardhatConfig* = ref object of Config
Role* {.pure.} = enum
Client,
Provider,
Validator,
Hardhat
CliOption* = object of RootObj
nodeIdx*: ?int
key*: string
value*: string
proc `$`*(option: CliOption): string =
var res = option.key
if option.value.len > 0:
res &= "=" & option.value
return res
proc new*(_: type RunningNode,
role: Role,
@ -53,57 +37,6 @@ proc new*(_: type RunningNode,
RunningNode(role: role,
node: node)
proc nodes*(config: NodeConfig, numNodes: int): NodeConfig =
if numNodes < 0:
raise newException(ValueError, "numNodes must be >= 0")
var startConfig = config
startConfig.numNodes = numNodes
return startConfig
proc simulateProofFailuresFor*(
config: NodeConfig,
providerIdx: int,
failEveryNProofs: int
): NodeConfig =
if providerIdx > config.numNodes - 1:
raise newException(ValueError, "provider index out of bounds")
var startConfig = config
startConfig.cliOptions.add(
CliOption(
nodeIdx: some providerIdx,
key: "--simulate-proof-failures",
value: $failEveryNProofs
)
)
return startConfig
proc debug*(config: NodeConfig, enabled = true): NodeConfig =
## output log in stdout
var startConfig = config
startConfig.debugEnabled = enabled
return startConfig
proc withLogTopics*(
config: NodeConfig,
topics: varargs[string]
): NodeConfig =
var startConfig = config
startConfig.logTopics = startConfig.logTopics.concat(@topics)
return startConfig
proc withLogFile*[T: Config](
config: T,
logToFile: bool = true
): T =
var startConfig = config
startConfig.logFile = logToFile
return startConfig
proc nextFreePort(startPort: int): Future[int] {.async.} =
let cmd = when defined(windows):
"netstat -ano | findstr :"
@ -129,6 +62,7 @@ template multinodesuite*(name: string, body: untyped) =
var nodeConfigs: NodeConfigs
template test(tname, startNodeConfigs, tbody) =
echo "[multinodes] inside test template, tname: ", tname, ", startNodeConfigs: ", startNodeConfigs
currentTestName = tname
nodeConfigs = startNodeConfigs
test tname:
@ -163,24 +97,24 @@ template multinodesuite*(name: string, body: untyped) =
role: Role
): Future[NodeProcess] {.async.} =
var options: seq[string] = @[]
var args: seq[string] = @[]
if config.logFile:
let updatedLogFile = getLogFile(role, none int)
options.add "--log-file=" & updatedLogFile
let node = await startHardhatProcess(options)
args.add "--log-file=" & updatedLogFile
echo ">>> [multinodes] starting hardhat node with args: ", args
let node = await HardhatProcess.startNode(args, config.debugEnabled, "hardhat")
await node.waitUntilStarted()
debug "started new hardhat node"
return node
proc newNodeProcess(roleIdx: int,
config1: NodeConfig,
proc newCodexProcess(roleIdx: int,
config: CodexConfig,
role: Role
): Future[NodeProcess] {.async.} =
let nodeIdx = running.len
var config = config1
var conf = config
if nodeIdx > accounts.len - 1:
raiseAssert("Cannot start node at nodeIdx " & $nodeIdx &
@ -190,14 +124,20 @@ template multinodesuite*(name: string, body: untyped) =
sanitize($starttime) /
sanitize($role & "_" & $roleIdx)
if config.logFile:
if conf.logFile:
let updatedLogFile = getLogFile(role, some roleIdx)
config.cliOptions.add CliOption(key: "--log-file", value: updatedLogFile)
conf.cliOptions.add CliOption(key: "--log-file", value: updatedLogFile)
if config.logTopics.len > 0:
config.cliOptions.add CliOption(key: "--log-level", value: "INFO;TRACE: " & config.logTopics.join(","))
let logLevel = conf.logLevel |? LogLevel.INFO
if conf.logTopics.len > 0:
conf.cliOptions.add CliOption(
key: "--log-level",
value: $logLevel & ";TRACE: " & conf.logTopics.join(",")
)
else:
conf.cliOptions.add CliOption(key: "--log-level", value: $logLevel)
var options = config.cliOptions.map(o => $o)
var args = conf.cliOptions.map(o => $o)
.concat(@[
"--api-port=" & $ await nextFreePort(8080 + nodeIdx),
"--data-dir=" & datadir,
@ -207,21 +147,30 @@ template multinodesuite*(name: string, body: untyped) =
"--disc-port=" & $ await nextFreePort(8090 + nodeIdx),
"--eth-account=" & $accounts[nodeIdx]])
let node = await startNode(options, config.debugEnabled)
echo "[multinodes.newNodeProcess] waiting until ", role, " node started"
let node = await CodexProcess.startNode(args, conf.debugEnabled, $role & $roleIdx)
echo "[multinodes.newCodexProcess] waiting until ", role, " node started"
await node.waitUntilStarted()
echo "[multinodes.newNodeProcess] ", role, " NODE STARTED"
echo "[multinodes.newCodexProcess] ", role, " NODE STARTED"
return node
proc clients(): seq[RunningNode] {.used.} =
running.filter(proc(r: RunningNode): bool = r.role == Role.Client)
proc clients(): seq[CodexProcess] {.used.} =
return collect:
for r in running:
if r.role == Role.Client:
CodexProcess(r.node)
proc providers(): seq[RunningNode] {.used.} =
running.filter(proc(r: RunningNode): bool = r.role == Role.Provider)
proc providers(): seq[CodexProcess] {.used.} =
return collect:
for r in running:
if r.role == Role.Provider:
CodexProcess(r.node)
proc validators(): seq[RunningNode] {.used.} =
running.filter(proc(r: RunningNode): bool = r.role == Role.Validator)
proc validators(): seq[CodexProcess] {.used.} =
return collect:
for r in running:
if r.role == Role.Validator:
CodexProcess(r.node)
proc startHardhatNode(): Future[NodeProcess] {.async.} =
var config = nodeConfigs.hardhat
@ -231,7 +180,7 @@ template multinodesuite*(name: string, body: untyped) =
let clientIdx = clients().len
var config = nodeConfigs.clients
config.cliOptions.add CliOption(key: "--persistence")
return await newNodeProcess(clientIdx, config, Role.Client)
return await newCodexProcess(clientIdx, config, Role.Client)
proc startProviderNode(): Future[NodeProcess] {.async.} =
let providerIdx = providers().len
@ -244,7 +193,7 @@ template multinodesuite*(name: string, body: untyped) =
o => (let idx = o.nodeIdx |? providerIdx; idx == providerIdx)
)
return await newNodeProcess(providerIdx, config, Role.Provider)
return await newCodexProcess(providerIdx, config, Role.Provider)
proc startValidatorNode(): Future[NodeProcess] {.async.} =
let validatorIdx = validators().len
@ -252,45 +201,47 @@ template multinodesuite*(name: string, body: untyped) =
config.cliOptions.add CliOption(key: "--bootstrap-node", value: bootstrap)
config.cliOptions.add CliOption(key: "--validator")
return await newNodeProcess(validatorIdx, config, Role.Validator)
return await newCodexProcess(validatorIdx, config, Role.Validator)
setup:
echo "[multinodes.setup] setup start"
if not nodeConfigs.hardhat.isNil:
echo "[multinodes.setup] starting hardhat node "
let node = await startHardhatNode()
running.add RunningNode(role: Role.Hardhat, node: node)
for i in 0..<nodeConfigs.clients.numNodes:
echo "[multinodes.setup] starting client node ", i
let node = await startClientNode()
running.add RunningNode(
role: Role.Client,
node: node,
address: some accounts[running.len]
)
echo "[multinodes.setup] added running client node ", i
if i == 0:
echo "[multinodes.setup] getting client 0 bootstrap spr"
bootstrap = node.client.info()["spr"].getStr()
echo "[multinodes.setup] got client 0 bootstrap spr: ", bootstrap
if not nodeConfigs.clients.isNil:
for i in 0..<nodeConfigs.clients.numNodes:
echo "[multinodes.setup] starting client node ", i
let node = await startClientNode()
running.add RunningNode(
role: Role.Client,
node: node
)
echo "[multinodes.setup] added running client node ", i
if i == 0:
echo "[multinodes.setup] getting client 0 bootstrap spr"
bootstrap = CodexProcess(node).client.info()["spr"].getStr()
echo "[multinodes.setup] got client 0 bootstrap spr: ", bootstrap
for i in 0..<nodeConfigs.providers.numNodes:
echo "[multinodes.setup] starting provider node ", i
let node = await startProviderNode()
running.add RunningNode(
role: Role.Provider,
node: node,
address: some accounts[running.len]
)
echo "[multinodes.setup] added running provider node ", i
if not nodeConfigs.providers.isNil:
for i in 0..<nodeConfigs.providers.numNodes:
echo "[multinodes.setup] starting provider node ", i
let node = await startProviderNode()
running.add RunningNode(
role: Role.Provider,
node: node
)
echo "[multinodes.setup] added running provider node ", i
for i in 0..<nodeConfigs.validators.numNodes:
let node = await startValidatorNode()
running.add RunningNode(
role: Role.Validator,
node: node,
address: some accounts[running.len]
)
echo "[multinodes.setup] added running validator node ", i
if not nodeConfigs.validators.isNil:
for i in 0..<nodeConfigs.validators.numNodes:
let node = await startValidatorNode()
running.add RunningNode(
role: Role.Validator,
node: node
)
echo "[multinodes.setup] added running validator node ", i
teardown:
for r in running:

View File

@ -0,0 +1,34 @@
import pkg/chronicles
import pkg/questionable
export chronicles
type
NodeConfig* = ref object of RootObj
logFile*: bool
logLevel*: ?LogLevel
debugEnabled*: bool
proc debug*[T: NodeConfig](config: T, enabled = true): T =
## output log in stdout
var startConfig = config
startConfig.debugEnabled = enabled
return startConfig
proc withLogFile*[T: NodeConfig](
config: T,
logToFile: bool = true
): T =
var startConfig = config
startConfig.logFile = logToFile
return startConfig
proc withLogLevel*[T: NodeConfig](
config: NodeConfig,
level: LogLevel
): T =
var startConfig = config
startConfig.logLevel = some level
return startConfig

View File

@ -0,0 +1,149 @@
import pkg/questionable
import pkg/questionable/results
import pkg/confutils
import pkg/chronicles
import pkg/libp2p
import pkg/stew/byteutils
import std/osproc
import std/os
import std/sequtils
import std/streams
import std/strutils
import codex/conf
import codex/utils/exceptions
import codex/utils/trackedfutures
import ./codexclient
export codexclient
export chronicles
logScope:
topics = "integration testing node process"
type
NodeProcess* = ref object of RootObj
process*: AsyncProcessRef
arguments*: seq[string]
debug: bool
trackedFutures*: TrackedFutures
name*: string
method workingDir(node: NodeProcess): string {.base.} =
raiseAssert "[workingDir] not implemented"
method executable(node: NodeProcess): string {.base.} =
raiseAssert "[executable] not implemented"
method startedOutput(node: NodeProcess): string {.base.} =
raiseAssert "[startedOutput] not implemented"
method processOptions(node: NodeProcess): set[AsyncProcessOption] {.base.} =
raiseAssert "[processOptions] not implemented"
method onOutputLineCaptured(node: NodeProcess, line: string) {.base.} =
raiseAssert "[onOutputLineCaptured] not implemented"
method start(node: NodeProcess) {.base, async.} =
logScope:
nodeName = node.name
trace "starting node", args = node.arguments
node.process = await startProcess(
node.executable,
node.workingDir,
node.arguments,
options = node.processOptions,
stdoutHandle = AsyncProcess.Pipe
)
proc captureOutput*(
node: NodeProcess,
output: string,
started: Future[void]
) {.async.} =
logScope:
nodeName = node.name
trace "waiting for output", output
let stream = node.process.stdOutStream
try:
while(let line = await stream.readLine(0, "\n"); line != ""):
if node.debug:
# would be nice if chronicles could parse and display with colors
echo line
if not started.isNil and not started.finished and line.contains(output):
started.complete()
node.onOutputLineCaptured(line)
await sleepAsync(1.millis)
except AsyncStreamReadError as e:
error "error reading output stream", error = e.msgDetail
proc startNode*[T: NodeProcess](
_: type T,
args: seq[string],
debug: string | bool = false,
name: string
): Future[T] {.async.} =
## Starts a Codex Node with the specified arguments.
## Set debug to 'true' to see output of the node.
let node = T(
arguments: @args,
debug: ($debug != "false"),
trackedFutures: TrackedFutures.new(),
name: name
)
await node.start()
return node
method stop*(node: NodeProcess) {.base, async.} =
logScope:
nodeName = node.name
await node.trackedFutures.cancelTracked()
if node.process != nil:
try:
if err =? node.process.terminate().errorOption:
error "failed to terminate node process", errorCode = err
discard await node.process.waitForExit(timeout=5.seconds)
# close process' streams
await node.process.closeWait()
except AsyncTimeoutError as e:
error "waiting for process exit timed out", error = e.msgDetail
except CatchableError as e:
error "error stopping node process", error = e.msg
finally:
node.process = nil
trace "node stopped"
proc waitUntilStarted*(node: NodeProcess) {.async.} =
logScope:
nodeName = node.name
trace "waiting until node started"
let started = newFuture[void]()
try:
discard node.captureOutput(node.startedOutput, started).track(node)
await started.wait(5.seconds)
except AsyncTimeoutError as e:
# attempt graceful shutdown in case node was partially started, prevent
# zombies
await node.stop()
raiseAssert "node did not output '" & node.startedOutput & "'"
proc restart*(node: NodeProcess) {.async.} =
await node.stop()
await node.start()
await node.waitUntilStarted()
method removeDataDir*(node: NodeProcess) {.base.} =
raiseAssert "[removeDataDir] not implemented"

View File

@ -1,89 +1,70 @@
import pkg/questionable
import pkg/questionable/results
import pkg/confutils
import pkg/chronicles
import pkg/libp2p
import pkg/stew/byteutils
import std/osproc
import std/os
import std/sequtils
import std/streams
import std/strutils
import codex/conf
import codex/utils/exceptions
import codex/utils/trackedfutures
import ./codexclient
export codexclient
export codexclient
export chronicles
logScope:
topics = "integration testing nodes"
const workingDir = currentSourcePath() / ".." / ".." / ".."
const executable = "build" / "codex"
type
NodeProcess* = ref object of RootObj
process*: AsyncProcessRef
arguments*: seq[string]
NodeProcess* = ref object
process: Process
arguments: seq[string]
debug: bool
client: ?CodexClient
trackedFutures*: TrackedFutures
proc start(node: NodeProcess) {.async.} =
node.process = await startProcess(
executable,
workingDir,
node.arguments,
options = {AsyncProcessOption.StdErrToStdOut},
stdoutHandle = AsyncProcess.Pipe
)
proc start(node: NodeProcess) =
if node.debug:
node.process = osproc.startProcess(
executable,
workingDir,
node.arguments,
options={poParentStreams}
)
else:
node.process = osproc.startProcess(
executable,
workingDir,
node.arguments
)
proc waitUntilOutput*(node: NodeProcess, output: string, started: Future[void]) {.async.} =
let stream = node.process.stdOutStream
proc waitUntilOutput*(node: NodeProcess, output: string) =
if node.debug:
raiseAssert "cannot read node output when in debug mode"
for line in node.process.outputStream.lines:
if line.contains(output):
return
raiseAssert "node did not output '" & output & "'"
try:
while(let line = await stream.readLine(0, "\n"); line != ""):
if node.debug:
echo line
proc waitUntilStarted*(node: NodeProcess) =
if node.debug:
sleep(5_000)
else:
node.waitUntilOutput("Started codex node")
if line.contains(output):
started.complete()
await sleepAsync(1.millis)
except AsyncStreamReadError as e:
echo "error reading node output stream: ", e.msgDetail
proc startNode*(args: seq[string], debug: string | bool = false): Future[NodeProcess] {.async.} =
proc startNode*(args: openArray[string], debug: string | bool = false): NodeProcess =
## Starts a Codex Node with the specified arguments.
## Set debug to 'true' to see output of the node.
let node = NodeProcess(
arguments: @args,
debug: ($debug != "false"),
trackedFutures: TrackedFutures.new()
)
await node.start()
let node = NodeProcess(arguments: @args, debug: ($debug != "false"))
node.start()
node
proc dataDir(node: NodeProcess): string =
let config = CodexConf.load(cmdLine = node.arguments)
config.dataDir.string
proc apiUrl*(node: NodeProcess): string =
proc apiUrl(node: NodeProcess): string =
let config = CodexConf.load(cmdLine = node.arguments)
"http://" & config.apiBindAddress & ":" & $config.apiPort & "/api/codex/v1"
proc apiPort(node: NodeProcess): string =
let config = CodexConf.load(cmdLine = node.arguments)
$config.apiPort
proc discoveryAddress*(node: NodeProcess): string =
let config = CodexConf.load(cmdLine = node.arguments)
$config.discoveryIp & ":" & $config.discoveryPort
proc client*(node: NodeProcess): CodexClient =
if client =? node.client:
return client
@ -91,55 +72,20 @@ proc client*(node: NodeProcess): CodexClient =
node.client = some client
client
proc closeAndWaitClient(node: NodeProcess) {.async.} =
without client =? node.client:
return
try:
client.close()
echo "waiting for port ", node.apiPort, " to be closed..."
let cmd = when defined(windows):
"netstat -ano | findstr "
else:
"lsof -ti:"
while true:
let portInUse = await execCommandEx(cmd & node.apiPort)
if portInUse.stdOutput == "":
echo "port ", node.apiPort, " is no longer in use, continuing..."
break
node.client = none CodexClient
except CatchableError as e:
echo "Failed to close codex client: ", e.msg
method stop*(node: NodeProcess) {.base, async.} =
await node.trackedFutures.cancelTracked()
proc stop*(node: NodeProcess) =
if node.process != nil:
if err =? node.process.terminate().errorOption:
echo "ERROR terminating node process, error code: ", err
echo "stopping codex client"
discard await node.process.waitForExit(timeout=5.seconds)
await node.process.closeWait()
if client =? node.client:
client.close()
node.client = none CodexClient
# await node.closeAndWaitClient().wait(5.seconds)
node.process.terminate()
discard node.process.waitForExit(timeout=5_000)
node.process.close()
node.process = nil
echo "code node and client stopped"
if client =? node.client:
node.client = none CodexClient
client.close()
proc waitUntilStarted*(node: NodeProcess) {.async.} =
let started = newFuture[void]()
let output = "REST service started"
try:
discard node.waitUntilOutput(output, started).track(node)
await started.wait(5.seconds)
except AsyncTimeoutError as e:
await node.stop() # allows subsequent tests to continue
raiseAssert "node did not output '" & output & "'"
proc restart*(node: NodeProcess) {.async.} =
await node.stop()
await node.start()
await node.waitUntilStarted()
proc restart*(node: NodeProcess) =
node.stop()
node.start()
node.waitUntilStarted()
proc removeDataDir*(node: NodeProcess) =
removeDir(node.dataDir)

View File

@ -17,23 +17,23 @@ marketplacesuite "Hosts submit regular proofs":
test "hosts submit periodic proofs for slots they fill", NodeConfigs(
# Uncomment to start Hardhat automatically, mainly so logs can be inspected locally
# hardhat: HardhatConfig().withLogFile()
# hardhat: HardhatConfig().debug().withLogFile(),
clients:
NodeConfig()
CodexConfig()
.nodes(1)
# .debug() # uncomment to enable console log output
.debug() # uncomment to enable console log output
.withLogFile() # uncomment to output log file to tests/integration/logs/<start_datetime> <suite_name>/<test_name>/<node_role>_<node_idx>.log
.withLogTopics("node"),
providers:
NodeConfig()
CodexConfig()
.nodes(1)
# .debug() # uncomment to enable console log output
.debug() # uncomment to enable console log output
.withLogFile() # uncomment to output log file to tests/integration/logs/<start_datetime> <suite_name>/<test_name>/<node_role>_<node_idx>.log
.withLogTopics("marketplace", "sales", "reservations", "node"),
):
let client0 = clients()[0].node.client
let client0 = clients()[0].client
let totalPeriods = 50
let data = byteutils.toHex(await exampleData())
@ -66,17 +66,17 @@ marketplacesuite "Simulate invalid proofs":
test "slot is freed after too many invalid proofs submitted", NodeConfigs(
# Uncomment to start Hardhat automatically, mainly so logs can be inspected locally
# hardhat: HardhatConfig().withLogFile()
# hardhat: HardhatConfig().debug().withLogFile(),
clients:
NodeConfig()
CodexConfig()
.nodes(1)
# .debug() # uncomment to enable console log output
.withLogFile() # uncomment to output log file to tests/integration/logs/<start_datetime> <suite_name>/<test_name>/<node_role>_<node_idx>.log
.withLogTopics("node"),
providers:
NodeConfig()
CodexConfig()
.nodes(1)
.simulateProofFailuresFor(providerIdx=0, failEveryNProofs=1)
# .debug() # uncomment to enable console log output
@ -84,13 +84,13 @@ marketplacesuite "Simulate invalid proofs":
.withLogTopics("marketplace", "sales", "reservations", "node"),
validators:
NodeConfig()
CodexConfig()
.nodes(1)
.withLogFile() # uncomment to output log file to tests/integration/logs/<start_datetime> <suite_name>/<test_name>/<node_role>_<node_idx>.log
# .debug() # uncomment to enable console log output
.withLogTopics("validator", "onchain", "ethers")
):
let client0 = clients()[0].node.client
let client0 = clients()[0].client
let totalPeriods = 50
let data = byteutils.toHex(await exampleData())
@ -118,17 +118,17 @@ marketplacesuite "Simulate invalid proofs":
test "slot is not freed when not enough invalid proofs submitted", NodeConfigs(
# Uncomment to start Hardhat automatically, mainly so logs can be inspected locally
# hardhat: HardhatConfig().withLogFile()
# hardhat: HardhatConfig().debug().withLogFile(),
clients:
NodeConfig()
CodexConfig()
.nodes(1)
# .debug() # uncomment to enable console log output
.withLogFile() # uncomment to output log file to tests/integration/logs/<start_datetime> <suite_name>/<test_name>/<node_role>_<node_idx>.log
.withLogTopics("node"),
providers:
NodeConfig()
CodexConfig()
.nodes(1)
.simulateProofFailuresFor(providerIdx=0, failEveryNProofs=3)
# .debug() # uncomment to enable console log output
@ -136,13 +136,13 @@ marketplacesuite "Simulate invalid proofs":
.withLogTopics("marketplace", "sales", "reservations", "node"),
validators:
NodeConfig()
CodexConfig()
.nodes(1)
# .debug()
.withLogFile() # uncomment to output log file to tests/integration/logs/<start_datetime> <suite_name>/<test_name>/<node_role>_<node_idx>.log
.withLogTopics("validator", "onchain", "ethers")
):
let client0 = clients()[0].node.client
let client0 = clients()[0].client
let totalPeriods = 25
let data = byteutils.toHex(await exampleData())
@ -171,17 +171,17 @@ marketplacesuite "Simulate invalid proofs":
test "host that submits invalid proofs is paid out less", NodeConfigs(
# Uncomment to start Hardhat automatically, mainly so logs can be inspected locally
# hardhat: HardhatConfig().withLogFile()
# hardhat: HardhatConfig().debug().withLogFile(),
clients:
NodeConfig()
CodexConfig()
.nodes(1)
# .debug() # uncomment to enable console log output.debug()
.withLogFile() # uncomment to output log file to tests/integration/logs/<start_datetime> <suite_name>/<test_name>/<node_role>_<node_idx>.log
.withLogTopics("node", "erasure", "clock", "purchases"),
providers:
NodeConfig()
CodexConfig()
.nodes(2)
.simulateProofFailuresFor(providerIdx=0, failEveryNProofs=2)
# .debug() # uncomment to enable console log output
@ -189,20 +189,20 @@ marketplacesuite "Simulate invalid proofs":
.withLogTopics("marketplace", "sales", "reservations", "node"),
validators:
NodeConfig()
CodexConfig()
.nodes(1)
# .debug()
.withLogFile() # uncomment to output log file to tests/integration/logs/<start_datetime> <suite_name>/<test_name>/<node_role>_<node_idx>.log
.withLogTopics("validator")
):
let client0 = clients()[0].node.client
let client0 = clients()[0].client
let provider0 = providers()[0]
let provider1 = providers()[1]
let totalPeriods = 25
let data = byteutils.toHex(await exampleData())
discard provider0.node.client.postAvailability(
discard provider0.client.postAvailability(
size=data.len.u256, # should match 1 slot only
duration=totalPeriods.periods.u256,
minPrice=300.u256,
@ -232,7 +232,7 @@ marketplacesuite "Simulate invalid proofs":
# now add availability for provider1, which should allow provider1 to put
# the remaining slot in its queue
discard provider1.node.client.postAvailability(
discard provider1.client.postAvailability(
size=data.len.u256, # should match 1 slot only
duration=totalPeriods.periods.u256,
minPrice=300.u256,
@ -243,24 +243,24 @@ marketplacesuite "Simulate invalid proofs":
let provider1slotId = slotId(requestId, provider1slotIndex)
# Wait til second slot is filled. SaleFilled happens too quickly, check SaleProving instead.
check eventually provider1.node.client.saleStateIs(provider1slotId, "SaleProving")
check eventually provider1.client.saleStateIs(provider1slotId, "SaleProving")
check eventually client0.purchaseStateIs(purchaseId, "started")
let currentPeriod = await getCurrentPeriod()
check eventuallyP(
# SaleFinished happens too quickly, check SalePayout instead
provider0.node.client.saleStateIs(provider0slotId, "SalePayout"),
provider0.client.saleStateIs(provider0slotId, "SalePayout"),
currentPeriod + totalPeriods.u256 + 1)
check eventuallyP(
# SaleFinished happens too quickly, check SalePayout instead
provider1.node.client.saleStateIs(provider1slotId, "SalePayout"),
provider1.client.saleStateIs(provider1slotId, "SalePayout"),
currentPeriod + totalPeriods.u256 + 1)
check eventually(
(await token.balanceOf(!provider1.address)) >
(await token.balanceOf(!provider0.address))
(await token.balanceOf(provider1.ethAccount)) >
(await token.balanceOf(provider0.ethAccount))
)
await subscription.unsubscribe()