parallel tests running

- need to test with longer tests to ensure the parallelisation is truly happening
- is the +10 hardhat port needed?
- try with more integration tests
This commit is contained in:
Eric 2024-12-20 16:23:40 +11:00
parent 20bb5e5a38
commit 2ee0146d42
No known key found for this signature in database
7 changed files with 394 additions and 39 deletions

View File

@ -16,10 +16,10 @@ import ./nodeprocess
export codexclient export codexclient
export chronicles export chronicles
export nodeprocess
logScope: logScope:
topics = "integration testing hardhat process" topics = "integration testing hardhat process"
nodeName = "hardhat"
type type
HardhatProcess* = ref object of NodeProcess HardhatProcess* = ref object of NodeProcess
@ -41,6 +41,9 @@ method outputLineEndings(node: HardhatProcess): string {.raises: [].} =
return "\n" return "\n"
proc openLogFile(node: HardhatProcess, logFilePath: string): IoHandle = proc openLogFile(node: HardhatProcess, logFilePath: string): IoHandle =
logScope:
nodeName = node.name
let logFileHandle = openFile( let logFileHandle = openFile(
logFilePath, logFilePath,
{OpenFlags.Write, OpenFlags.Create, OpenFlags.Truncate} {OpenFlags.Write, OpenFlags.Create, OpenFlags.Truncate}
@ -57,6 +60,9 @@ proc openLogFile(node: HardhatProcess, logFilePath: string): IoHandle =
method start*(node: HardhatProcess) {.async.} = method start*(node: HardhatProcess) {.async.} =
logScope:
nodeName = node.name
let poptions = node.processOptions + {AsyncProcessOption.StdErrToStdOut} let poptions = node.processOptions + {AsyncProcessOption.StdErrToStdOut}
trace "starting node", trace "starting node",
args = node.arguments, args = node.arguments,
@ -84,6 +90,9 @@ proc startNode*(
name: string name: string
): Future[HardhatProcess] {.async.} = ): Future[HardhatProcess] {.async.} =
logScope:
nodeName = name
var logFilePath = "" var logFilePath = ""
var arguments = newSeq[string]() var arguments = newSeq[string]()
@ -100,7 +109,7 @@ proc startNode*(
arguments: arguments, arguments: arguments,
debug: ($debug != "false"), debug: ($debug != "false"),
trackedFutures: TrackedFutures.new(), trackedFutures: TrackedFutures.new(),
name: "hardhat" name: name
) )
await hardhat.start() await hardhat.start()
@ -111,6 +120,10 @@ proc startNode*(
return hardhat return hardhat
method onOutputLineCaptured(node: HardhatProcess, line: string) = method onOutputLineCaptured(node: HardhatProcess, line: string) =
logScope:
nodeName = node.name
without logFile =? node.logFile: without logFile =? node.logFile:
return return

View File

@ -13,6 +13,7 @@ import ./codexprocess
import ./hardhatconfig import ./hardhatconfig
import ./hardhatprocess import ./hardhatprocess
import ./nodeconfigs import ./nodeconfigs
import ./utils
import ../asynctest import ../asynctest
import ../checktest import ../checktest
@ -34,30 +35,14 @@ type
Hardhat Hardhat
MultiNodeSuiteError = object of CatchableError MultiNodeSuiteError = object of CatchableError
const HardhatPort {.intdefine.}: int = 8545
const CodexApiPort {.intdefine.}: int = 8080
const CodexDiscPort {.intdefine.}: int = 8090
const TestId {.strdefine.}: string = "TestId"
proc raiseMultiNodeSuiteError(msg: string) = proc raiseMultiNodeSuiteError(msg: string) =
raise newException(MultiNodeSuiteError, msg) raise newException(MultiNodeSuiteError, msg)
proc nextFreePort(startPort: int): Future[int] {.async.} =
proc client(server: StreamServer, transp: StreamTransport) {.async.} =
await transp.closeWait()
var port = startPort
while true:
trace "checking if port is free", port
try:
let host = initTAddress("127.0.0.1", port)
# We use ReuseAddr here only to be able to reuse the same IP/Port when
# there's a TIME_WAIT socket. It's useful when running the test multiple
# times or if a test ran previously using the same port.
var server = createStreamServer(host, client, {ReuseAddr})
trace "port is free", port
await server.closeWait()
return port
except TransportOsError:
trace "port is not free", port
inc port
template multinodesuite*(name: string, body: untyped) = template multinodesuite*(name: string, body: untyped) =
asyncchecksuite name: asyncchecksuite name:
@ -81,7 +66,7 @@ template multinodesuite*(name: string, body: untyped) =
# .withEthProvider("ws://localhost:8545") # .withEthProvider("ws://localhost:8545")
# .some, # .some,
# ... # ...
let jsonRpcProviderUrl = "http://127.0.0.1:8545" var jsonRpcProviderUrl = "http://127.0.0.1:" & $HardhatPort
var running {.inject, used.}: seq[RunningNode] var running {.inject, used.}: seq[RunningNode]
var bootstrap: string var bootstrap: string
let starttime = now().format("yyyy-MM-dd'_'HH:mm:ss") let starttime = now().format("yyyy-MM-dd'_'HH:mm:ss")
@ -122,6 +107,10 @@ template multinodesuite*(name: string, body: untyped) =
let fileName = logDir / fn let fileName = logDir / fn
return fileName return fileName
proc updatePort(url: var string, port: int) =
let parts = url.split(':')
url = @[parts[0], parts[1], $port].join(":")
proc newHardhatProcess( proc newHardhatProcess(
config: HardhatConfig, config: HardhatConfig,
role: Role role: Role
@ -132,6 +121,12 @@ template multinodesuite*(name: string, body: untyped) =
let updatedLogFile = getLogFile(role, none int) let updatedLogFile = getLogFile(role, none int)
args.add "--log-file=" & updatedLogFile args.add "--log-file=" & updatedLogFile
let port = await nextFreePort(HardhatPort)
jsonRpcProviderUrl.updatePort(port)
trace "updated jsonRpcProviderUrl", jsonRpcProviderUrl
args.add("--port")
args.add($port)
let node = await HardhatProcess.startNode(args, config.debugEnabled, "hardhat") let node = await HardhatProcess.startNode(args, config.debugEnabled, "hardhat")
try: try:
await node.waitUntilStarted() await node.waitUntilStarted()
@ -154,6 +149,7 @@ template multinodesuite*(name: string, body: untyped) =
", not enough eth accounts." ", not enough eth accounts."
let datadir = getTempDir() / "Codex" / let datadir = getTempDir() / "Codex" /
sanitize(TestId) /
sanitize($starttime) / sanitize($starttime) /
sanitize($role & "_" & $roleIdx) sanitize($role & "_" & $roleIdx)
@ -164,12 +160,12 @@ template multinodesuite*(name: string, body: untyped) =
if bootstrap.len > 0: if bootstrap.len > 0:
config.addCliOption("--bootstrap-node", bootstrap) config.addCliOption("--bootstrap-node", bootstrap)
config.addCliOption("--api-port", $ await nextFreePort(8080 + nodeIdx)) config.addCliOption("--api-port", $ await nextFreePort(CodexApiPort + nodeIdx))
config.addCliOption("--data-dir", datadir) config.addCliOption("--data-dir", datadir)
config.addCliOption("--nat", "127.0.0.1") config.addCliOption("--nat", "127.0.0.1")
config.addCliOption("--listen-addrs", "/ip4/127.0.0.1/tcp/0") config.addCliOption("--listen-addrs", "/ip4/127.0.0.1/tcp/0")
config.addCliOption("--disc-ip", "127.0.0.1") config.addCliOption("--disc-ip", "127.0.0.1")
config.addCliOption("--disc-port", $ await nextFreePort(8090 + nodeIdx)) config.addCliOption("--disc-port", $ await nextFreePort(CodexDiscPort + nodeIdx))
except CodexConfigError as e: except CodexConfigError as e:
raiseMultiNodeSuiteError "invalid cli option, error: " & e.msg raiseMultiNodeSuiteError "invalid cli option, error: " & e.msg

View File

@ -162,6 +162,9 @@ proc waitUntilOutput*(node: NodeProcess, output: string) {.async.} =
await started.wait(60.seconds) # allow enough time for proof generation await started.wait(60.seconds) # allow enough time for proof generation
proc waitUntilStarted*(node: NodeProcess) {.async.} = proc waitUntilStarted*(node: NodeProcess) {.async.} =
logScope:
nodeName = node.name
try: try:
await node.waitUntilOutput(node.startedOutput) await node.waitUntilOutput(node.startedOutput)
trace "node started" trace "node started"

View File

@ -5,16 +5,22 @@ import ../asynctest
import ../checktest import ../checktest
import ./codexprocess import ./codexprocess
import ./nodeprocess import ./nodeprocess
import ./utils
import ../examples import ../examples
asyncchecksuite "Command line interface": asyncchecksuite "Command line interface":
let key = "4242424242424242424242424242424242424242424242424242424242424242" let key = "4242424242424242424242424242424242424242424242424242424242424242"
var nodeCount = -1
proc startCodex(args: seq[string]): Future[CodexProcess] {.async.} = proc startCodex(args: seq[string]): Future[CodexProcess] {.async.} =
inc nodeCount
return await CodexProcess.startNode( return await CodexProcess.startNode(
args, args.concat(@[
false, "--api-port=" & $(await nextFreePort(8080 + nodeCount)),
"--disc-port=" & $(await nextFreePort(8090 + nodeCount))
]),
true,
"cli-test-node" "cli-test-node"
) )

View File

@ -0,0 +1,275 @@
import std/os
import std/strformat
import pkg/chronos
import pkg/chronos/asyncproc
import pkg/codex/utils/exceptions
import pkg/codex/logutils
import pkg/questionable
import pkg/questionable/results
import ./hardhatprocess
import ./utils
import ../examples
type
TestManager* = ref object
configs: seq[IntegrationTestConfig]
tests: seq[IntegrationTest]
hardhats: seq[HardhatProcess]
lastHardhatPort: int
lastCodexApiPort: int
lastCodexDiscPort: int
debugTestHarness: bool # output chronicles logs for the manager and multinodes harness
debugHardhat: bool
timeStart: Moment
timeEnd: Moment
codexPortLock: AsyncLock
hardhatPortLock: AsyncLock
IntegrationTestConfig* = object
startHardhat*: bool
testFile*: string
name*: string
IntegrationTest = ref object
config: IntegrationTestConfig
process: Future[CommandExResponse].Raising([AsyncProcessError, AsyncProcessTimeoutError, CancelledError])
timeStart: Moment
timeEnd: Moment
output: ?!CommandExResponse
testId: string # when used in datadir path, prevents data dir clashes
TestManagerError = object of CatchableError
{.push raises: [].}
logScope:
topics = "testing integration testmanager"
func new*(
_: type TestManager,
configs: seq[IntegrationTestConfig],
debugTestHarness = false,
debugHardhat = false): TestManager =
TestManager(
configs: configs,
lastHardhatPort: 8545,
lastCodexApiPort: 8000,
lastCodexDiscPort: 9000,
debugTestHarness: debugTestHarness,
debugHardhat: debugHardhat
)
proc raiseTestManagerError(msg: string, parent: ref CatchableError = nil) {.raises: [TestManagerError].} =
raise newException(TestManagerError, msg, parent)
template withLock*(lock: AsyncLock, body: untyped) =
if lock.isNil:
lock = newAsyncLock()
await lock.acquire()
try:
body
await sleepAsync(1.millis)
finally:
try:
lock.release()
except AsyncLockError as e:
raiseAssert "failed to release lock, error: " & e.msg
proc startHardhat(
manager: TestManager,
config: IntegrationTestConfig): Future[int] {.async: (raises: [CancelledError, TestManagerError]).} =
var args: seq[string] = @[]
var port: int
withLock(manager.hardhatPortLock):
port = await nextFreePort(manager.lastHardhatPort + 10)
manager.lastHardhatPort = port
args.add("--port")
args.add($port)
trace "starting hardhat process on port ", port
try:
let node = await HardhatProcess.startNode(
args,
manager.debugHardhat,
"hardhat for '" & config.name & "'")
await node.waitUntilStarted()
manager.hardhats.add node
return port
except CancelledError as e:
raise e
except CatchableError as e:
raiseTestManagerError "hardhat node failed to start: " & e.msg, e
proc printOutput(manager: TestManager, test: IntegrationTest) =
without output =? test.output, error:
echo "[FATAL] Test '", test.config.name, "' failed to run to completion"
echo " Error: ", error.msg
echo " Stacktrace: ", error.getStackTrace()
return
if output.status != 0:
if manager.debugTestHarness:
echo output.stdError
echo output.stdOutput
echo "[FAILED] Test '", test.config.name, "' failed"
else:
echo output.stdOutput
echo "[OK] Test '", test.config.name, "' succeeded"
proc runTest(manager: TestManager, config: IntegrationTestConfig) {.async: (raises: [CancelledError]).} =
logScope:
config
trace "Running test"
var test = IntegrationTest(
config: config,
testId: $ uint16.example
)
var hardhatPort = 0
if config.startHardhat:
try:
hardhatPort = await manager.startHardhat(config)
except TestManagerError as e:
e.msg = "Failed to start hardhat: " & e.msg
test.output = CommandExResponse.failure(e)
var apiPort, discPort: int
withLock(manager.codexPortLock):
# inc by 20 to allow each test to run 20 codex nodes (clients, SPs,
# validators) giving a good chance the port will be free
apiPort = await nextFreePort(manager.lastCodexApiPort + 20)
manager.lastCodexApiPort = apiPort
discPort = await nextFreePort(manager.lastCodexDiscPort + 20)
manager.lastCodexDiscPort = discPort
var logging = ""
if manager.debugTestHarness:
logging = "-d:chronicles_log_level=TRACE " &
"-d:chronicles_disabled_topics=websock " &
"-d:chronicles_default_output_device=stdout " &
"-d:chronicles_sinks=textlines"
var testFile: string
try:
testFile = absolutePath(
config.testFile,
root = currentSourcePath().parentDir().parentDir())
except ValueError as e:
raiseAssert "bad file name, testFile: " & config.testFile & ", error: " & e.msg
var command: string
try:
withLock(manager.hardhatPortLock):
command = "nim c " &
&"-d:CodexApiPort={apiPort} " &
&"-d:CodexDiscPort={discPort} " &
(if config.startHardhat:
&"-d:HardhatPort={hardhatPort} "
else: "") &
&"-d:TestId={test.testId} " &
&"{logging} " &
"--verbosity:0 " &
"--hints:off " &
"-d:release " &
"-r " &
&"{testFile}"
except ValueError as e:
raiseAssert "bad command" &
", apiPort: " & $apiPort &
", discPort: " & $discPort &
", logging: " & logging &
", testFile: " & testFile &
", error: " & e.msg
trace "Starting parallel integration test", command
test.timeStart = Moment.now()
test.process = execCommandEx(
command = command,
# options = {AsyncProcessOption.StdErrToStdOut, AsyncProcessOption.EvalCommand},
timeout = 60.minutes
)
manager.tests.add test
try:
test.output = success(await test.process) # waits on waitForExit
test.timeEnd = Moment.now()
# echo "[OK] Test '" & config.name & "' completed in ", test.timeEnd - test.timeStart
info "Test completed", name = config.name, duration = test.timeEnd - test.timeStart
manager.printOutput(test)
except CancelledError as e:
raise e
except AsyncProcessTimeoutError as e:
test.timeEnd = Moment.now()
# echo "[TIMEOUT] Test '" & config.name & "' timed out in ", test.timeEnd - test.timeStart
error "Test timed out", name = config.name, duration = test.timeEnd - test.timeStart
test.output = CommandExResponse.failure(e)
manager.printOutput(test)
except AsyncProcessError as e:
test.timeEnd = Moment.now()
# echo "[FAILED] Test '" & config.name & "' failed in ", test.timeEnd - test.timeStart
error "Test failed to complete", name = config.name,duration = test.timeEnd - test.timeStart
test.output = CommandExResponse.failure(e)
manager.printOutput(test)
proc runTests(manager: TestManager) {.async: (raises: [CancelledError]).} =
var testFutures: seq[Future[void].Raising([CancelledError])]
manager.timeStart = Moment.now()
for config in manager.configs:
testFutures.add manager.runTest(config)
await allFutures testFutures
manager.timeEnd = Moment.now()
proc printOutput(manager: TestManager) =
var successes = 0
echo "▢=====================================================================▢"
echo "| TEST SUMMARY |"
echo "="
for test in manager.tests:
without output =? test.output:
echo "| [FATAL] Test '", test.config.name, "' failed to run to completion"
continue
if output.status != 0:
echo "| [FAILED] Test '", test.config.name, "' failed"
else:
echo "| [OK] Test '", test.config.name, "' succeeded"
inc successes
echo "| ------------------------------------------------------------------- |"
echo "| PASSING : ", successes, " / ", manager.tests.len
let totalDuration = manager.timeEnd - manager.timeStart
echo "| TOTAL TIME : ", totalDuration
var totalDurationSerial: Duration
for test in manager.tests:
totalDurationSerial += (test.timeEnd - test.timeStart)
# estimated time saved as serial execution with a single hardhat instance
# incurs less overhead
echo "| EST TOTAL TIME IF RUN SERIALLY: ", totalDurationSerial
echo "| EST TIME SAVED (ROUGH) : ", totalDurationSerial - totalDuration
echo "▢=====================================================================▢"
proc start*(manager: TestManager) {.async: (raises: [CancelledError]).} =
await manager.runTests()
manager.printOutput()
proc stop*(manager: TestManager) {.async: (raises: [CancelledError]).} =
for test in manager.tests:
if not test.process.isNil and not test.process.finished:
await test.process.cancelAndWait()
for hardhat in manager.hardhats:
try:
await hardhat.stop()
except CatchableError as e:
trace "failed to stop hardhat node", error = e.msg

View File

@ -0,0 +1,25 @@
import pkg/chronos
import pkg/codex/logutils
proc nextFreePort*(startPort: int): Future[int] {.async: (raises:[CancelledError]).} =
proc client(server: StreamServer, transp: StreamTransport) {.async.} =
await transp.closeWait()
var port = startPort
while true:
trace "checking if port is free", port
try:
let host = initTAddress("127.0.0.1", port)
# We use ReuseAddr here only to be able to reuse the same IP/Port when
# there's a TIME_WAIT socket. It's useful when running the test multiple
# times or if a test ran previously using the same port.
var server = createStreamServer(host, client, {ReuseAddr})
trace "port is free", port
await server.closeWait()
return port
except TransportOsError:
trace "port is not free", port
inc port
except TransportAddressError:
raiseAssert "bad address"

View File

@ -1,12 +1,49 @@
import ./integration/testcli # import ./integration/testcli
import ./integration/testrestapi # import ./integration/testrestapi
import ./integration/testupdownload # import ./integration/testupdownload
import ./integration/testsales # import ./integration/testsales
import ./integration/testpurchasing # import ./integration/testpurchasing
import ./integration/testblockexpiration # import ./integration/testblockexpiration
import ./integration/testmarketplace # import ./integration/testmarketplace
import ./integration/testproofs # import ./integration/testproofs
import ./integration/testvalidator # import ./integration/testvalidator
import ./integration/testecbug # import ./integration/testecbug
import pkg/chronos
import pkg/codex/logutils
import ./integration/testmanager
{.warning[UnusedImport]:off.} {.warning[UnusedImport]:off.}
const TestConfigs = @[
# IntegrationTestConfig(testFile: "./integration/testcli", startHardhat: true),
# IntegrationTestConfig(testFile: "./integration/testrestapi", startHardhat: true),
# IntegrationTestConfig(testFile: "./integration/testupdownload", startHardhat: true),
# IntegrationTestConfig(testFile: "./integration/testsales", startHardhat: true),
# IntegrationTestConfig(testFile: "./integration/testpurchasing", startHardhat: true),
# IntegrationTestConfig(testFile: "./integration/testblockexpiration", startHardhat: true),
IntegrationTestConfig(
name: "Basic Marketplace and payout tests",
testFile: "./integration/testmarketplace",
startHardhat: true),
# IntegrationTestConfig(testFile: "./integration/testproofs", startHardhat: true),
# IntegrationTestConfig(testFile: "./integration/testvalidator", startHardhat: true),
IntegrationTestConfig(
name: "Erasure Coding Bug",
testFile: "./integration/testecbug",
startHardhat: true)
]
proc run() {.async.} =
let manager = TestManager.new(
configs = TestConfigs,
debugTestHarness = true,
debugHardhat = false)
try:
trace "starting test manager"
await manager.start()
finally:
trace "stopping test manager"
await manager.stop()
waitFor run()