nim-codex/tests/integration/nodeprocess.nim
Marcin Czenko 92a0eda79a
Validator historical state restoration (#922)
* adds a new overload of queryPastEvents allowing to query past events based on timestamp in the past

* adds state restoration to validator

* refactors a bit to get the tests back to work

* replaces deprecated generic methods from Market with methods for specific event types

* Refactors binary search

* adds market tests for querying past SlotFilled events and binary search

* Takes into account that the `earliest` block available is not necessarily the genesis block

* Adds more logging and makes testing earliest block boundary more reliable

* adds validation tests for historical state restoration

* adds mockprovider to simplify and improve testing of the edge conditions

* adds slot reservation to the new tests after rebasing

* adds validation groups and group index in logs of validator

* adds integration test with two validators

* adds comment on how to enable logging in integration test executable itself

* testIntegration: makes the list of running nodes injected and available in the body of the test

* validation: adds integration test for historical state

* adds more logging to validator

* integration test: validator only looks 30 days back for historical state

* adds logging of the slotState when removing slots during validation

* review and refactor validator integration tests

* adds validation to the set of integration tests

* Fixes mistyped name of the mock provider module in testMarket

* Fixes a typo in the name of the validation suite in integration tests

* Makes validation unit test a bit easier to follow

* better use of logScopes to reduce duplication

* improves timing and clarifies the test conditions

* uses http as default RPC provider for nodes running in integration tests as a workaround for dropped subscriptions

* simplifies the validation integration tests by waiting for failed request instead of tracking slots

* adds config option allowing selectively to set different provider url

* Brings back the default settings for RPC provider in integration tests

* use http RPC provider for clients in validation integration tests

* fine-tune the tests

* Makes validator integration test more robust - adds extra tracking

* brings tracking of marketplace event back to validator integration test

* refactors integration tests

* deletes tmp file

* adds `return` after forcing integration test to fail preliminarily

* re-enables all integration tests and matrix

* stops debug output in CI

* allows to choose a different RPC provider for a given integration test suite

* fixes signature of `getBlock` method in mockProvider

* adds missing import which seems to be breaking integration tests on windows

* makes sure that clients, SPs, and validators use the same provider url

* makes validator integration tests using http at 127.0.0.1:8545

* testvalidator: stop resubscribing as we are now using http polling as rpc provider

* applying review comments

* groups queryPastStorage overrides together (review comment)

* groups the historical validation tests into a sub suite

* removes the temporary extensions in marketplacesuite and multinodesuite allowing to specify provider url

* simplifies validation integration tests

* Removes debug logs when waiting for request to fail

* Renaming waitForRequestFailed => waitForRequestToFail

* renames blockNumberForBlocksAgo to pastBlockTag and makes it private

* removes redundant debugging logs

* refines logging in validation

* removes dev logging from mockmarket

* improves exception handling in provider helper procs and prepares for extraction to a separate module

* Uses chronos instead of std/times for Duration

* extracts provider and binary search helpers to a separate module

* removes redundant log entry params from validator

* unifies the notation to consistently use method call syntax

* reuses ProviderError from nim-ethers in the provider extension

* clarifies the comment in multinodesuite

* uses == operator to check the predefined tags and raises exception when `BlockTag.pending` is requested.

* when waiting for request to fail, we break on any request state that is not Started

* removes tests that were moved to testProvider from testMarket

* extracts tests that use MockProvider to a separate async suite

* improves performance of the historical state restoration

* removing redundant log messages in validator (groupIndex and groups)

* adds testProvider to testContracts group

* removes unused import in testMarket
2024-12-14 05:07:55 +00:00

177 lines
4.7 KiB
Nim

import pkg/questionable
import pkg/questionable/results
import pkg/confutils
import pkg/chronicles
import pkg/chronos/asyncproc
import pkg/libp2p
import std/os
import std/strutils
import codex/conf
import codex/utils/exceptions
import codex/utils/trackedfutures
import ./codexclient
export codexclient
export chronicles
# Module-level chronicles scope: log statements in this file are tagged with
# the topic below.
logScope:
  topics = "integration testing node process"
type
  NodeProcess* = ref object of RootObj
    ## Base type for a child process driven through chronos' `AsyncProcessRef`.
    ## The base methods declared in this file (`workingDir`, `executable`,
    ## `startedOutput`, ...) only raise an assertion, so concrete node types
    ## are expected to override them.
    process*: AsyncProcessRef ## handle set by `start`; reset to nil by `stop`
    arguments*: seq[string] ## arguments handed to `startProcess`
    debug: bool ## when true, the start command and captured lines are echoed
    trackedFutures*: TrackedFutures ## futures cancelled by `stop`
    name*: string ## reported as `nodeName` in the log scopes of this file

  NodeProcessError* = object of CatchableError
    ## Raised by `waitUntilStarted` when the expected start-up output is not
    ## seen in time.
method workingDir(node: NodeProcess): string {.base.} =
  ## Working directory handed to `startProcess` by `start`.
  ##
  ## Base stub: always fails with an assertion; concrete node types must
  ## override it. The message names the method (same convention as
  ## `removeDataDir`) so a missing override is easy to identify.
  raiseAssert "[workingDir] not implemented"
method executable(node: NodeProcess): string {.base.} =
  ## Command handed to `startProcess` by `start`.
  ##
  ## Base stub: always fails with an assertion; concrete node types must
  ## override it. The message names the method (same convention as
  ## `removeDataDir`) so a missing override is easy to identify.
  raiseAssert "[executable] not implemented"
method startedOutput(node: NodeProcess): string {.base.} =
  ## Text that `waitUntilStarted` looks for in the process output before it
  ## considers the node started.
  ##
  ## Base stub: always fails with an assertion; concrete node types must
  ## override it. The message names the method (same convention as
  ## `removeDataDir`) so a missing override is easy to identify.
  raiseAssert "[startedOutput] not implemented"
method processOptions(node: NodeProcess): set[AsyncProcessOption] {.base.} =
  ## Process options that `start` combines with `StdErrToStdOut` when it
  ## launches the process.
  ##
  ## Base stub: always fails with an assertion; concrete node types must
  ## override it. The message names the method (same convention as
  ## `removeDataDir`) so a missing override is easy to identify.
  raiseAssert "[processOptions] not implemented"
method outputLineEndings(node: NodeProcess): string {.base.} =
  ## Line separator passed to `readLine` while the process output is being
  ## captured.
  ##
  ## Base stub: always fails with an assertion; concrete node types must
  ## override it. The message names the method (same convention as
  ## `removeDataDir`) so a missing override is easy to identify.
  raiseAssert "[outputLineEndings] not implemented"
method onOutputLineCaptured(node: NodeProcess, line: string) {.base.} =
  ## Hook invoked with every non-empty line read from the process output.
  ##
  ## Base stub: always fails with an assertion; concrete node types must
  ## override it. The message names the method (same convention as
  ## `removeDataDir`) so a missing override is easy to identify.
  raiseAssert "[onOutputLineCaptured] not implemented"
method start*(node: NodeProcess) {.base, async.} =
  ## Launches the node's executable as a child process and stores the
  ## resulting handle in `node.process`.
  ##
  ## `StdErrToStdOut` is always added to the options returned by
  ## `processOptions`, and stdout is requested as a pipe. Cancellation is
  ## re-raised; any other catchable error is only logged, in which case
  ## `node.process` is not assigned.
  logScope:
    nodeName = node.name

  let launchOptions =
    node.processOptions + {AsyncProcessOption.StdErrToStdOut}

  trace "starting node",
    args = node.arguments,
    executable = node.executable,
    workingDir = node.workingDir,
    processOptions = launchOptions

  try:
    if node.debug:
      echo "starting codex node with args: ", node.arguments.join(" ")

    let spawned = await startProcess(
      node.executable,
      node.workingDir,
      node.arguments,
      options = launchOptions,
      stdoutHandle = AsyncProcess.Pipe
    )
    node.process = spawned
  except CancelledError as cancelled:
    raise cancelled
  except CatchableError as failure:
    # Deliberately not re-raised: a failed launch is only reported in the log.
    error "failed to start node process", error = failure.msg
proc captureOutput(
    node: NodeProcess,
    output: string,
    started: Future[void]
) {.async.} =
  ## Reads the process' stdout stream line by line for as long as the process
  ## reports that it is running.
  ##
  ## Every non-empty line is echoed when `node.debug` is set and then passed
  ## to `onOutputLineCaptured`. The first line containing `output` completes
  ## `started`, provided that future is non-nil and not yet finished. Stream
  ## read errors are logged and end the capture.
  logScope:
    nodeName = node.name

  trace "waiting for output", output

  let stdoutStream = node.process.stdoutStream

  try:
    while node.process.running.option == some true:
      # Drain lines until an empty one is returned.
      while true:
        let line = await stdoutStream.readLine(0, node.outputLineEndings)
        if line == "":
          break

        if node.debug:
          # would be nice if chronicles could parse and display with colors
          echo line

        if not (started.isNil or started.finished) and line.contains(output):
          started.complete()

        node.onOutputLineCaptured(line)

        await sleepAsync(1.millis)

      await sleepAsync(1.millis)
  except AsyncStreamReadError as readError:
    error "error reading output stream", error = readError.msgDetail
proc startNode*[T: NodeProcess](
    _: type T,
    args: seq[string],
    debug: string | bool = false,
    name: string
): Future[T] {.async.} =
  ## Creates a node of type `T` with the given arguments and name, starts it
  ## and returns it.
  ##
  ## `debug` may be a bool or a string; output of the node is shown unless its
  ## string form is exactly "false".
  let showOutput = $debug != "false"

  let node = T(
    arguments: @args,
    debug: showOutput,
    trackedFutures: TrackedFutures.new(),
    name: name
  )

  await node.start()
  return node
method stop*(node: NodeProcess) {.base, async.} =
  ## Cancels the node's tracked futures and, if a process handle is present,
  ## terminates the process, waits up to 3 seconds for it to exit and closes
  ## its streams.
  ##
  ## Cancellation is re-raised; other catchable errors are logged. The process
  ## handle is reset to nil in every case.
  logScope:
    nodeName = node.name

  await node.trackedFutures.cancelTracked()

  if not node.process.isNil:
    try:
      trace "terminating node process..."
      let terminated = node.process.terminate()
      if errCode =? terminated.errorOption:
        error "failed to terminate process", errCode = $errCode

      trace "waiting for node process to exit"
      let exitCode = await node.process.waitForExit(3.seconds)
      if exitCode > 0:
        error "failed to exit process, check for zombies", exitCode

      trace "closing node process' streams"
      await node.process.closeWait()
    except CancelledError as cancelled:
      raise cancelled
    except CatchableError as failure:
      error "error stopping node process", error = failure.msg
    finally:
      # Drop the handle whether or not shutdown succeeded.
      node.process = nil

    trace "node stopped"
proc waitUntilStarted*(node: NodeProcess) {.async.} =
  ## Waits until the node prints its `startedOutput` text.
  ##
  ## Output capturing runs as a tracked future of the node, so `stop` cancels
  ## it. Raises `NodeProcessError` when there is no process to watch, or when
  ## the expected output does not appear within 60 seconds (the node is
  ## stopped first in that case).
  logScope:
    nodeName = node.name

  trace "waiting until node started"

  # `start` only logs a failed launch, so the process handle can still be nil
  # here. `captureOutput` dereferences it unconditionally; report a regular
  # error instead of crashing on a nil access.
  if node.process.isNil:
    raise newException(NodeProcessError, "node process is not running")

  let started = newFuture[void]()
  try:
    discard node.captureOutput(node.startedOutput, started).track(node)
    await started.wait(60.seconds) # allow enough time for proof generation
    trace "node started"
  except AsyncTimeoutError:
    # attempt graceful shutdown in case node was partially started, prevent
    # zombies
    await node.stop()
    # raise error here so that all nodes (not just this one) can be
    # shutdown gracefully
    raise newException(
      NodeProcessError, "node did not output '" & node.startedOutput & "'"
    )
proc restart*(node: NodeProcess) {.async.} =
  ## Stops the node, starts it again and waits until it reports that it has
  ## started. Errors raised by any of the three steps propagate to the caller.
  await node.stop()
  await node.start()
  await node.waitUntilStarted()
method removeDataDir*(node: NodeProcess) {.base.} =
  ## Base stub: always fails with an assertion, so concrete node types must
  ## override it.
  # NOTE(review): presumably overrides delete the node's data directory --
  # not visible in this file, confirm against the subclasses.
  raiseAssert "[removeDataDir] not implemented"