From d70ab590044c1bccb762391eb6f20d5f93021065 Mon Sep 17 00:00:00 2001 From: Eric <5089238+emizzle@users.noreply.github.com> Date: Mon, 19 Feb 2024 15:55:39 +1100 Subject: [PATCH] refactor: multinode integration test refactor (#662) * refactor multi node test suite Refactor the multinode test suite into the marketplace test suite. - Arbitrary number of nodes can be started with each test: clients, providers, validators - Hardhat can also be started locally with each test, usually for the purpose of saving and inspecting its log file. - Log files for all nodes can be persisted on disk, with configuration at the test-level - Log files, if persisted (as specified in the test), will be persisted to a CI artifact - Node config is specified at the test-level instead of the suite-level - Node/Hardhat process starting/stopping is now async, and runs much faster - Per-node config includes: - simulating proof failures - logging to file - log level - log topics - storage quota - debug (print logs to stdout) - Tests find next available ports when starting nodes, as closing ports on Windows can lag - Hardhat is no longer required to be running prior to starting the integration tests (as long as Hardhat is configured to run in the tests). - If Hardhat is already running, a snapshot will be taken and reverted before and after each test, respectively. - If Hardhat is not already running and configured to run at the test-level, a Hardhat process will be spawned and torn down before and after each test, respectively. * additional logging for debug purposes * address PR feedback - fix spelling - revert change from catching ProviderError to SignerError -- this should be handled more consistently in the Market abstraction, and will be handled in another PR. - remove method label from raiseAssert - remove unused import * Use API instead of command exec to test for free port Use chronos `createStreamServer` API to test for free port by binding localhost address and port. Use `ServerFlags.ReuseAddr` to enable reuse of same IP/Port on multiple test runs. * clean up * remove upraises annotations from tests * Update tests to work with updated erasure coding slot sizes * update dataset size, nodes, tolerance to match valid ec params Integration tests now have valid dataset sizes (blocks), tolerances, and number of nodes, to work with valid ec params. These values are validated when requested storage. Print the rest api failure message (via doAssert) when a rest api call fails (eg the rest api may validate some ec params). All integration tests pass when the async `clock.now` changes are reverted. * dont use async clock for now * fix workflow * move integration logs uplod to reusable --------- Co-authored-by: Dmitriy Ryajov --- .github/workflows/ci-reusable.yml | 8 + .gitignore | 1 + codex/node.nim | 3 +- codex/purchasing/states/submitted.nim | 1 + codex/sales/states/downloading.nim | 3 +- codex/sales/states/filling.nim | 1 + codex/sales/states/proving.nim | 13 +- codex/sales/states/provingsimulated.nim | 5 +- tests/examples.nim | 23 ++ tests/integration/clioption.nim | 13 + tests/integration/codexclient.nim | 16 +- tests/integration/codexconfig.nim | 61 ++++ tests/integration/codexprocess.nim | 75 ++++ tests/integration/hardhatconfig.nim | 6 + tests/integration/hardhatprocess.nim | 128 +++++++ tests/integration/marketplacesuite.nim | 120 +++++++ tests/integration/multinodes.nim | 365 +++++++++++++------- tests/integration/nodeconfig.nim | 34 ++ tests/integration/nodeprocess.nim | 165 +++++++++ tests/integration/testIntegration.nim | 38 --- tests/integration/testmarketplace.nim | 93 +++++ tests/integration/testproofs.nim | 435 ++++++++++++++---------- tests/testIntegration.nim | 1 + 23 files changed, 1240 insertions(+), 368 deletions(-) create mode 100644 tests/integration/clioption.nim create mode 100644 tests/integration/codexconfig.nim create mode 100644 tests/integration/codexprocess.nim create mode 100644 tests/integration/hardhatconfig.nim create mode 100644 tests/integration/hardhatprocess.nim create mode 100644 tests/integration/marketplacesuite.nim create mode 100644 tests/integration/nodeconfig.nim create mode 100644 tests/integration/nodeprocess.nim create mode 100644 tests/integration/testmarketplace.nim diff --git a/.github/workflows/ci-reusable.yml b/.github/workflows/ci-reusable.yml index 401ef161..ec84875b 100644 --- a/.github/workflows/ci-reusable.yml +++ b/.github/workflows/ci-reusable.yml @@ -70,6 +70,14 @@ jobs: if: matrix.tests == 'integration' || matrix.tests == 'all' run: make -j${ncpu} testIntegration + - name: Upload integration tests log files + uses: actions/upload-artifact@v3 + if: always() + with: + name: integration-tests-logs + path: tests/integration/logs/ + retention-days: 1 + status: if: always() needs: [build] diff --git a/.gitignore b/.gitignore index c85aa931..c1866c8c 100644 --- a/.gitignore +++ b/.gitignore @@ -39,3 +39,4 @@ docker/hostdatadir docker/prometheus-data .DS_Store nim.cfg +tests/integration/logs diff --git a/codex/node.nim b/codex/node.nim index 68896bb1..5a55c586 100644 --- a/codex/node.nim +++ b/codex/node.nim @@ -459,7 +459,8 @@ proc requestStorage*( reward = reward proofProbability = proofProbability collateral = collateral - expiry = expiry + expiry = expiry.truncate(int64) + now = self.clock.now trace "Received a request for storage!" diff --git a/codex/purchasing/states/submitted.nim b/codex/purchasing/states/submitted.nim index 391dbf7c..48666c46 100644 --- a/codex/purchasing/states/submitted.nim +++ b/codex/purchasing/states/submitted.nim @@ -35,6 +35,7 @@ method run*(state: PurchaseSubmitted, machine: Machine): Future[?State] {.async. proc withTimeout(future: Future[void]) {.async.} = let expiry = request.expiry.truncate(int64) + 1 + trace "waiting for request fulfillment or expiry", expiry await future.withTimeout(clock, expiry) try: diff --git a/codex/sales/states/downloading.nim b/codex/sales/states/downloading.nim index 118f97cf..c301ab2e 100644 --- a/codex/sales/states/downloading.nim +++ b/codex/sales/states/downloading.nim @@ -57,7 +57,8 @@ method run*(state: SaleDownloading, machine: Machine): Future[?State] {.async.} # update availability size var bytes: uint = 0 for blk in blocks: - bytes += blk.data.len.uint + if not blk.cid.isEmpty: + bytes += blk.data.len.uint trace "Releasing batch of bytes written to disk", bytes return await reservations.release(reservation.id, diff --git a/codex/sales/states/filling.nim b/codex/sales/states/filling.nim index 73db9fcb..c96dd0b9 100644 --- a/codex/sales/states/filling.nim +++ b/codex/sales/states/filling.nim @@ -34,3 +34,4 @@ method run(state: SaleFilling, machine: Machine): Future[?State] {.async.} = debug "Filling slot", requestId = data.requestId, slotIndex = data.slotIndex await market.fillSlot(data.requestId, data.slotIndex, state.proof, collateral) + debug "Waiting for slot filled event...", requestId = $data.requestId, slotIndex = $data.slotIndex diff --git a/codex/sales/states/proving.nim b/codex/sales/states/proving.nim index a56242bd..ca374bd3 100644 --- a/codex/sales/states/proving.nim +++ b/codex/sales/states/proving.nim @@ -81,6 +81,7 @@ proc proveLoop( debug "Proof is required", period = currentPeriod, challenge = challenge await state.prove(slot, challenge, onProve, market, currentPeriod) + debug "waiting until next period" await waitUntilPeriod(currentPeriod + 1) method `$`*(state: SaleProving): string = "SaleProving" @@ -126,12 +127,12 @@ method run*(state: SaleProving, machine: Machine): Future[?State] {.async.} = debug "Stopping proving.", requestId = data.requestId, slotIndex = data.slotIndex if not state.loop.isNil: - if not state.loop.finished: - try: - await state.loop.cancelAndWait() - except CatchableError as e: - error "Error during cancelation of prooving loop", msg = e.msg + if not state.loop.finished: + try: + await state.loop.cancelAndWait() + except CatchableError as e: + error "Error during cancellation of proving loop", msg = e.msg - state.loop = nil + state.loop = nil return some State(SalePayout()) diff --git a/codex/sales/states/provingsimulated.nim b/codex/sales/states/provingsimulated.nim index 9c77e85a..cb3729fa 100644 --- a/codex/sales/states/provingsimulated.nim +++ b/codex/sales/states/provingsimulated.nim @@ -8,6 +8,7 @@ when codex_enable_proof_failures: import ../../contracts/requests import ../../logutils import ../../market + import ../../utils/exceptions import ../salescontext import ./proving @@ -20,7 +21,7 @@ when codex_enable_proof_failures: proofCount: int proc onSubmitProofError(error: ref CatchableError, period: UInt256, slotId: SlotId) = - error "Submitting invalid proof failed", period = period, slotId, msg = error.msg + error "Submitting invalid proof failed", period, slotId, msg = error.msgDetail method prove*(state: SaleProvingSimulated, slot: Slot, challenge: ProofChallenge, onProve: OnProve, market: Market, currentPeriod: Period) {.async.} = trace "Processing proving in simulated mode" @@ -33,7 +34,7 @@ when codex_enable_proof_failures: warn "Submitting INVALID proof", period = currentPeriod, slotId = slot.id await market.submitProof(slot.id, Groth16Proof.default) except ProviderError as e: - if not e.revertReason.contains("Invalid proof"): + if not e.msgDetail.contains("Invalid proof"): onSubmitProofError(e, currentPeriod, slot.id) except CatchableError as e: onSubmitProofError(e, currentPeriod, slot.id) diff --git a/tests/examples.nim b/tests/examples.nim index cf0b7977..c70d0dbb 100644 --- a/tests/examples.nim +++ b/tests/examples.nim @@ -4,11 +4,21 @@ import std/times import std/typetraits import pkg/codex/contracts/requests +import pkg/codex/rng import pkg/codex/contracts/proofs import pkg/codex/sales/slotqueue import pkg/codex/stores +import pkg/codex/units +import pkg/chronos +import pkg/stew/byteutils import pkg/stint + +import ./codex/helpers/randomchunker + +export randomchunker +export units + proc exampleString*(length: int): string = let chars = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" result = newString(length) # Create a new empty string with a given length @@ -78,3 +88,16 @@ proc example*(_: type Groth16Proof): Groth16Proof = b: G2Point.example, c: G1Point.example ) + +proc example*(_: type RandomChunker, blocks: int): Future[string] {.async.} = + # doAssert blocks >= 3, "must be more than 3 blocks" + let rng = Rng.instance() + let chunker = RandomChunker.new( + rng, size = DefaultBlockSize * blocks.NBytes, chunkSize = DefaultBlockSize) + var data: seq[byte] + while (let moar = await chunker.getBytes(); moar != []): + data.add moar + return byteutils.toHex(data) + +proc example*(_: type RandomChunker): Future[string] {.async.} = + await RandomChunker.example(3) diff --git a/tests/integration/clioption.nim b/tests/integration/clioption.nim new file mode 100644 index 00000000..379cebb8 --- /dev/null +++ b/tests/integration/clioption.nim @@ -0,0 +1,13 @@ +import pkg/questionable + +type + CliOption* = object of RootObj + nodeIdx*: ?int + key*: string + value*: string + +proc `$`*(option: CliOption): string = + var res = option.key + if option.value.len > 0: + res &= "=" & option.value + return res diff --git a/tests/integration/codexclient.nim b/tests/integration/codexclient.nim index 7cb0b50e..a0919feb 100644 --- a/tests/integration/codexclient.nim +++ b/tests/integration/codexclient.nim @@ -1,6 +1,5 @@ import std/httpclient import std/strutils -import std/sequtils from pkg/libp2p import Cid, `$`, init import pkg/stint @@ -109,25 +108,26 @@ proc requestStorage*( ## Call request storage REST endpoint ## let response = client.requestStorageRaw(cid, duration, reward, proofProbability, collateral, expiry, nodes, tolerance) - assert response.status == "200 OK" + if response.status != "200 OK": + doAssert(false, response.body) PurchaseId.fromHex(response.body).catch proc getPurchase*(client: CodexClient, purchaseId: PurchaseId): ?!RestPurchase = let url = client.baseurl & "/storage/purchases/" & purchaseId.toHex - let body = client.http.getContent(url) - let json = ? parseJson(body).catch - RestPurchase.fromJson(json) + try: + let body = client.http.getContent(url) + let json = ? parseJson(body).catch + return RestPurchase.fromJson(json) + except CatchableError as e: + return failure e.msg proc getSalesAgent*(client: CodexClient, slotId: SlotId): ?!RestSalesAgent = let url = client.baseurl & "/sales/slots/" & slotId.toHex - echo "getting sales agent for id, ", slotId.toHex try: let body = client.http.getContent(url) - echo "get sales agent body: ", body let json = ? parseJson(body).catch return RestSalesAgent.fromJson(json) except CatchableError as e: - echo "[client.getSalesAgent] error getting agent: ", e.msg return failure e.msg proc getSlots*(client: CodexClient): ?!seq[Slot] = diff --git a/tests/integration/codexconfig.nim b/tests/integration/codexconfig.nim new file mode 100644 index 00000000..eda9c81a --- /dev/null +++ b/tests/integration/codexconfig.nim @@ -0,0 +1,61 @@ +import std/options +import std/sequtils +import pkg/codex/units +import ./clioption +import ./nodeconfig + +export nodeconfig +export clioption + +type + CodexConfig* = ref object of NodeConfig + numNodes*: int + cliOptions*: seq[CliOption] + logTopics*: seq[string] + +proc nodes*(config: CodexConfig, numNodes: int): CodexConfig = + if numNodes < 0: + raise newException(ValueError, "numNodes must be >= 0") + + var startConfig = config + startConfig.numNodes = numNodes + return startConfig + +proc simulateProofFailuresFor*( + config: CodexConfig, + providerIdx: int, + failEveryNProofs: int +): CodexConfig = + + if providerIdx > config.numNodes - 1: + raise newException(ValueError, "provider index out of bounds") + + var startConfig = config + startConfig.cliOptions.add( + CliOption( + nodeIdx: some providerIdx, + key: "--simulate-proof-failures", + value: $failEveryNProofs + ) + ) + return startConfig + +proc withLogTopics*( + config: CodexConfig, + topics: varargs[string] +): CodexConfig = + + var startConfig = config + startConfig.logTopics = startConfig.logTopics.concat(@topics) + return startConfig + +proc withStorageQuota*( + config: CodexConfig, + quota: NBytes +): CodexConfig = + + var startConfig = config + startConfig.cliOptions.add( + CliOption(key: "--storage-quota", value: $quota) + ) + return startConfig diff --git a/tests/integration/codexprocess.nim b/tests/integration/codexprocess.nim new file mode 100644 index 00000000..c7c0c6f1 --- /dev/null +++ b/tests/integration/codexprocess.nim @@ -0,0 +1,75 @@ +import pkg/questionable +import pkg/questionable/results +import pkg/confutils +import pkg/chronicles +import pkg/ethers +import pkg/libp2p +import std/os +import std/strutils +import codex/conf +import ./codexclient +import ./nodeprocess + +export codexclient +export chronicles +export nodeprocess + +logScope: + topics = "integration testing codex process" + +type + CodexProcess* = ref object of NodeProcess + client: ?CodexClient + +method workingDir(node: CodexProcess): string = + return currentSourcePath() / ".." / ".." / ".." + +method executable(node: CodexProcess): string = + return "build" / "codex" + +method startedOutput(node: CodexProcess): string = + return "REST service started" + +method processOptions(node: CodexProcess): set[AsyncProcessOption] = + return {AsyncProcessOption.StdErrToStdOut} + +method outputLineEndings(node: CodexProcess): string = + return "\n" + +method onOutputLineCaptured(node: CodexProcess, line: string) = + discard + +proc dataDir(node: CodexProcess): string = + let config = CodexConf.load(cmdLine = node.arguments) + return config.dataDir.string + +proc ethAccount*(node: CodexProcess): Address = + let config = CodexConf.load(cmdLine = node.arguments) + without ethAccount =? config.ethAccount: + raiseAssert "eth account not set" + return Address(ethAccount) + +proc apiUrl*(node: CodexProcess): string = + let config = CodexConf.load(cmdLine = node.arguments) + return "http://" & config.apiBindAddress & ":" & $config.apiPort & "/api/codex/v1" + +proc client*(node: CodexProcess): CodexClient = + if client =? node.client: + return client + let client = CodexClient.new(node.apiUrl) + node.client = some client + return client + +method stop*(node: CodexProcess) {.async.} = + logScope: + nodeName = node.name + + await procCall NodeProcess(node).stop() + + trace "stopping codex client" + if client =? node.client: + client.close() + node.client = none CodexClient + +method removeDataDir*(node: CodexProcess) = + removeDir(node.dataDir) diff --git a/tests/integration/hardhatconfig.nim b/tests/integration/hardhatconfig.nim new file mode 100644 index 00000000..637b40c0 --- /dev/null +++ b/tests/integration/hardhatconfig.nim @@ -0,0 +1,6 @@ +import ./nodeconfig + +export nodeconfig + +type + HardhatConfig* = ref object of NodeConfig diff --git a/tests/integration/hardhatprocess.nim b/tests/integration/hardhatprocess.nim new file mode 100644 index 00000000..935b4d16 --- /dev/null +++ b/tests/integration/hardhatprocess.nim @@ -0,0 +1,128 @@ +import pkg/questionable +import pkg/questionable/results +import pkg/confutils +import pkg/chronicles +import pkg/chronos +import pkg/stew/io2 +import std/os +import std/sets +import std/sequtils +import std/strutils +import pkg/codex/conf +import pkg/codex/utils/trackedfutures +import ./codexclient +import ./nodeprocess + +export codexclient +export chronicles + +logScope: + topics = "integration testing hardhat process" + nodeName = "hardhat" + +type + HardhatProcess* = ref object of NodeProcess + logFile: ?IoHandle + +method workingDir(node: HardhatProcess): string = + return currentSourcePath() / ".." / ".." / ".." / "vendor" / "codex-contracts-eth" + +method executable(node: HardhatProcess): string = + return "node_modules" / ".bin" / "hardhat" + +method startedOutput(node: HardhatProcess): string = + return "Started HTTP and WebSocket JSON-RPC server at" + +method processOptions(node: HardhatProcess): set[AsyncProcessOption] = + return {} + +method outputLineEndings(node: HardhatProcess): string = + return "\n" + +proc openLogFile(node: HardhatProcess, logFilePath: string): IoHandle = + let logFileHandle = openFile( + logFilePath, + {OpenFlags.Write, OpenFlags.Create, OpenFlags.Truncate} + ) + + without fileHandle =? logFileHandle: + fatal "failed to open log file", + path = logFilePath, + errorCode = $logFileHandle.error + + raiseAssert "failed to open log file, aborting" + + return fileHandle + +method start*(node: HardhatProcess) {.async.} = + + let poptions = node.processOptions + {AsyncProcessOption.StdErrToStdOut} + trace "starting node", + args = node.arguments, + executable = node.executable, + workingDir = node.workingDir, + processOptions = poptions + + try: + node.process = await startProcess( + node.executable, + node.workingDir, + @["node", "--export", "deployment-localhost.json"].concat(node.arguments), + options = poptions, + stdoutHandle = AsyncProcess.Pipe + ) + except CatchableError as e: + error "failed to start hardhat process", error = e.msg + +proc startNode*( + _: type HardhatProcess, + args: seq[string], + debug: string | bool = false, + name: string +): Future[HardhatProcess] {.async.} = + + var logFilePath = "" + + var arguments = newSeq[string]() + for arg in args: + if arg.contains "--log-file=": + logFilePath = arg.split("=")[1] + else: + arguments.add arg + + trace "starting hardhat node", arguments + ## Starts a Hardhat Node with the specified arguments. + ## Set debug to 'true' to see output of the node. + let hardhat = HardhatProcess( + arguments: arguments, + debug: ($debug != "false"), + trackedFutures: TrackedFutures.new(), + name: "hardhat" + ) + + await hardhat.start() + + if logFilePath != "": + hardhat.logFile = some hardhat.openLogFile(logFilePath) + + return hardhat + +method onOutputLineCaptured(node: HardhatProcess, line: string) = + without logFile =? node.logFile: + return + + if error =? logFile.writeFile(line & "\n").errorOption: + error "failed to write to hardhat file", errorCode = error + discard logFile.closeFile() + node.logFile = none IoHandle + +method stop*(node: HardhatProcess) {.async.} = + # terminate the process + await procCall NodeProcess(node).stop() + + if logFile =? node.logFile: + trace "closing hardhat log file" + discard logFile.closeFile() + +method removeDataDir*(node: HardhatProcess) = + discard diff --git a/tests/integration/marketplacesuite.nim b/tests/integration/marketplacesuite.nim new file mode 100644 index 00000000..4ed7fb9f --- /dev/null +++ b/tests/integration/marketplacesuite.nim @@ -0,0 +1,120 @@ +import pkg/chronos +import pkg/ethers/erc20 +from pkg/libp2p import Cid +import pkg/codex/contracts/marketplace as mp +import pkg/codex/periods +import pkg/codex/utils/json +import ./multinodes +import ../contracts/time +import ../contracts/deployment + +export mp +export multinodes + +template marketplacesuite*(name: string, body: untyped) = + + multinodesuite name: + + var marketplace {.inject, used.}: Marketplace + var period: uint64 + var periodicity: Periodicity + var token {.inject, used.}: Erc20Token + var continuousMineFut: Future[void] + + proc getCurrentPeriod(): Future[Period] {.async.} = + return periodicity.periodOf(await ethProvider.currentTime()) + + proc advanceToNextPeriod() {.async.} = + let periodicity = Periodicity(seconds: period.u256) + let currentTime = await ethProvider.currentTime() + let currentPeriod = periodicity.periodOf(currentTime) + let endOfPeriod = periodicity.periodEnd(currentPeriod) + await ethProvider.advanceTimeTo(endOfPeriod + 1) + + template eventuallyP(condition: untyped, finalPeriod: Period): bool = + + proc eventuallyP: Future[bool] {.async.} = + while( + let currentPeriod = await getCurrentPeriod(); + currentPeriod <= finalPeriod + ): + if condition: + return true + await sleepAsync(1.millis) + return condition + + await eventuallyP() + + proc periods(p: int): uint64 = + p.uint64 * period + + proc createAvailabilities(datasetSize: int, duration: uint64) = + # post availability to each provider + for i in 0.. 1: + doAssert(origDatasetSizeInBlocks >= 3, + "dataset size must be greater than or equal to 3 blocks with " & + "more than one node") + + proc requestStorage(client: CodexClient, + cid: Cid, + proofProbability: uint64 = 1, + duration: uint64 = 12.periods, + reward = 400.u256, + collateral = 100.u256, + expiry: uint64 = 4.periods, + nodes = providers().len, + tolerance = 0, + origDatasetSizeInBlocks: int): Future[PurchaseId] {.async.} = + + let expiry = (await ethProvider.currentTime()) + expiry.u256 + + let id = client.requestStorage( + cid, + expiry=expiry, + duration=duration.u256, + proofProbability=proofProbability.u256, + collateral=collateral, + reward=reward, + nodes=nodes.uint, + tolerance=tolerance.uint + ).get + + return id + + proc continuouslyAdvanceEvery(every: chronos.Duration) {.async.} = + try: + while true: + await advanceToNextPeriod() + await sleepAsync(every) + except CancelledError: + discard + + setup: + # TODO: This is currently the address of the marketplace with a dummy + # verifier. Use real marketplace address, `Marketplace.address` once we + # can generate actual Groth16 ZK proofs. + let marketplaceAddress = Marketplace.address(dummyVerifier = true) + marketplace = Marketplace.new(marketplaceAddress, ethProvider.getSigner()) + let tokenAddress = await marketplace.token() + token = Erc20Token.new(tokenAddress, ethProvider.getSigner()) + let config = await mp.config(marketplace) + period = config.proofs.period.truncate(uint64) + periodicity = Periodicity(seconds: period.u256) + + continuousMineFut = continuouslyAdvanceEvery(chronos.millis(500)) + + teardown: + await continuousMineFut.cancelAndWait() + + body diff --git a/tests/integration/multinodes.nim b/tests/integration/multinodes.nim index c9adaa89..58a013a1 100644 --- a/tests/integration/multinodes.nim +++ b/tests/integration/multinodes.nim @@ -1,167 +1,276 @@ import std/os -import std/macros -import std/httpclient +import std/sequtils +import std/strutils +import std/sugar +import std/times import pkg/codex/logutils -import ../ethertest -import ./codexclient -import ./nodes +import pkg/chronos/transports/stream +import pkg/ethers +import ./hardhatprocess +import ./codexprocess +import ./hardhatconfig +import ./codexconfig +import ../asynctest +import ../checktest -export ethertest -export codexclient -export nodes +export asynctest +export ethers except `%` +export hardhatprocess +export codexprocess +export hardhatconfig +export codexconfig type RunningNode* = ref object role*: Role node*: NodeProcess - restClient*: CodexClient - datadir*: string - ethAccount*: Address - StartNodes* = object - clients*: uint - providers*: uint - validators*: uint - DebugNodes* = object - client*: bool - provider*: bool - validator*: bool - topics*: string + NodeConfigs* = object + clients*: CodexConfig + providers*: CodexConfig + validators*: CodexConfig + hardhat*: HardhatConfig Role* {.pure.} = enum Client, Provider, - Validator + Validator, + Hardhat -proc new*(_: type RunningNode, - role: Role, - node: NodeProcess, - restClient: CodexClient, - datadir: string, - ethAccount: Address): RunningNode = - RunningNode(role: role, - node: node, - restClient: restClient, - datadir: datadir, - ethAccount: ethAccount) +proc nextFreePort(startPort: int): Future[int] {.async.} = -proc init*(_: type StartNodes, - clients, providers, validators: uint): StartNodes = - StartNodes(clients: clients, providers: providers, validators: validators) + proc client(server: StreamServer, transp: StreamTransport) {.async.} = + await transp.closeWait() -proc init*(_: type DebugNodes, - client, provider, validator: bool, - topics: string = "validator,proving,market"): DebugNodes = - DebugNodes(client: client, provider: provider, validator: validator, - topics: topics) + var port = startPort + while true: + trace "checking if port is free", port + try: + let host = initTAddress("127.0.0.1", port) + # We use ReuseAddr here only to be able to reuse the same IP/Port when + # there's a TIME_WAIT socket. It's useful when running the test multiple + # times or if a test ran previously using the same port. + var server = createStreamServer(host, client, {ReuseAddr}) + trace "port is free", port + await server.closeWait() + return port + except TransportOsError: + trace "port is not free", port + inc port -template multinodesuite*(name: string, - startNodes: StartNodes, debugNodes: DebugNodes, body: untyped) = +template multinodesuite*(name: string, body: untyped) = - if (debugNodes.client or debugNodes.provider) and - (enabledLogLevel > LogLevel.TRACE or - enabledLogLevel == LogLevel.NONE): - echo "" - echo "More test debug logging is available by running the tests with " & - "'-d:chronicles_log_level=TRACE " & - "-d:chronicles_disabled_topics=websock " & - "-d:chronicles_default_output_device=stdout " & - "-d:chronicles_sinks=textlines'" - echo "" - - ethersuite name: + asyncchecksuite name: var running: seq[RunningNode] var bootstrap: string + let starttime = now().format("yyyy-MM-dd'_'HH:mm:ss") + var currentTestName = "" + var nodeConfigs: NodeConfigs + var ethProvider {.inject, used.}: JsonRpcProvider + var accounts {.inject, used.}: seq[Address] + var snapshot: JsonNode - proc newNodeProcess(index: int, - addlOptions: seq[string], - debug: bool): (NodeProcess, string, Address) = + template test(tname, startNodeConfigs, tbody) = + currentTestName = tname + nodeConfigs = startNodeConfigs + test tname: + tbody - if index > accounts.len - 1: - raiseAssert("Cannot start node at index " & $index & + proc sanitize(pathSegment: string): string = + var sanitized = pathSegment + for invalid in invalidFilenameChars.items: + sanitized = sanitized.replace(invalid, '_') + sanitized + + proc getLogFile(role: Role, index: ?int): string = + # create log file path, format: + # tests/integration/logs/ //_.log + + var logDir = currentSourcePath.parentDir() / + "logs" / + sanitize($starttime & " " & name) / + sanitize($currentTestName) + createDir(logDir) + + var fn = $role + if idx =? index: + fn &= "_" & $idx + fn &= ".log" + + let fileName = logDir / fn + return fileName + + proc newHardhatProcess( + config: HardhatConfig, + role: Role + ): Future[NodeProcess] {.async.} = + + var args: seq[string] = @[] + if config.logFile: + let updatedLogFile = getLogFile(role, none int) + args.add "--log-file=" & updatedLogFile + + let node = await HardhatProcess.startNode(args, config.debugEnabled, "hardhat") + await node.waitUntilStarted() + + trace "hardhat node started" + return node + + proc newCodexProcess(roleIdx: int, + config: CodexConfig, + role: Role + ): Future[NodeProcess] {.async.} = + + let nodeIdx = running.len + var conf = config + + if nodeIdx > accounts.len - 1: + raiseAssert("Cannot start node at nodeIdx " & $nodeIdx & ", not enough eth accounts.") - let datadir = getTempDir() / "Codex" & $index - var options = @[ - "--api-port=" & $(8080 + index), - "--data-dir=" & datadir, - "--nat=127.0.0.1", - "--listen-addrs=/ip4/127.0.0.1/tcp/0", - "--disc-ip=127.0.0.1", - "--disc-port=" & $(8090 + index), - "--eth-account=" & $accounts[index]] - .concat(addlOptions) - if debug: options.add "--log-level=INFO;TRACE: " & debugNodes.topics - let node = startNode(options, debug = debug) - node.waitUntilStarted() - (node, datadir, accounts[index]) + let datadir = getTempDir() / "Codex" / + sanitize($starttime) / + sanitize($role & "_" & $roleIdx) - proc newCodexClient(index: int): CodexClient = - CodexClient.new("http://localhost:" & $(8080 + index) & "/api/codex/v1") + if conf.logFile: + let updatedLogFile = getLogFile(role, some roleIdx) + conf.cliOptions.add CliOption(key: "--log-file", value: updatedLogFile) - proc startClientNode() = - let index = running.len - let (node, datadir, account) = newNodeProcess( - index, @["--persistence"], debugNodes.client) - let restClient = newCodexClient(index) - running.add RunningNode.new(Role.Client, node, restClient, datadir, - account) - if debugNodes.client: - debug "started new client node and codex client", - restApiPort = 8080 + index, discPort = 8090 + index, account + let logLevel = conf.logLevel |? LogLevel.INFO + if conf.logTopics.len > 0: + conf.cliOptions.add CliOption( + key: "--log-level", + value: $logLevel & ";TRACE: " & conf.logTopics.join(",") + ) + else: + conf.cliOptions.add CliOption(key: "--log-level", value: $logLevel) - proc startProviderNode(failEveryNProofs: uint = 0) = - let index = running.len - let (node, datadir, account) = newNodeProcess(index, @[ - "--bootstrap-node=" & bootstrap, - "--persistence", - "--simulate-proof-failures=" & $failEveryNProofs], - debugNodes.provider) - let restClient = newCodexClient(index) - running.add RunningNode.new(Role.Provider, node, restClient, datadir, - account) - if debugNodes.provider: - debug "started new provider node and codex client", - restApiPort = 8080 + index, discPort = 8090 + index, account + var args = conf.cliOptions.map(o => $o) + .concat(@[ + "--api-port=" & $ await nextFreePort(8080 + nodeIdx), + "--data-dir=" & datadir, + "--nat=127.0.0.1", + "--listen-addrs=/ip4/127.0.0.1/tcp/0", + "--disc-ip=127.0.0.1", + "--disc-port=" & $ await nextFreePort(8090 + nodeIdx), + "--eth-account=" & $accounts[nodeIdx]]) - proc startValidatorNode() = - let index = running.len - let (node, datadir, account) = newNodeProcess(index, @[ - "--bootstrap-node=" & bootstrap, - "--validator"], - debugNodes.validator) - let restClient = newCodexClient(index) - running.add RunningNode.new(Role.Validator, node, restClient, datadir, - account) - if debugNodes.validator: - debug "started new validator node and codex client", - restApiPort = 8080 + index, discPort = 8090 + index, account + let node = await CodexProcess.startNode(args, conf.debugEnabled, $role & $roleIdx) + await node.waitUntilStarted() + trace "node started", nodeName = $role & $roleIdx - proc clients(): seq[RunningNode] {.used.} = - running.filter(proc(r: RunningNode): bool = r.role == Role.Client) + return node - proc providers(): seq[RunningNode] {.used.} = - running.filter(proc(r: RunningNode): bool = r.role == Role.Provider) + proc hardhat: HardhatProcess = + for r in running: + if r.role == Role.Hardhat: + return HardhatProcess(r.node) + return nil - proc validators(): seq[RunningNode] {.used.} = - running.filter(proc(r: RunningNode): bool = r.role == Role.Validator) + proc clients: seq[CodexProcess] {.used.} = + return collect: + for r in running: + if r.role == Role.Client: + CodexProcess(r.node) + + proc providers: seq[CodexProcess] {.used.} = + return collect: + for r in running: + if r.role == Role.Provider: + CodexProcess(r.node) + + proc validators: seq[CodexProcess] {.used.} = + return collect: + for r in running: + if r.role == Role.Validator: + CodexProcess(r.node) + + proc startHardhatNode(): Future[NodeProcess] {.async.} = + var config = nodeConfigs.hardhat + return await newHardhatProcess(config, Role.Hardhat) + + proc startClientNode(): Future[NodeProcess] {.async.} = + let clientIdx = clients().len + var config = nodeConfigs.clients + config.cliOptions.add CliOption(key: "--persistence") + return await newCodexProcess(clientIdx, config, Role.Client) + + proc startProviderNode(): Future[NodeProcess] {.async.} = + let providerIdx = providers().len + var config = nodeConfigs.providers + config.cliOptions.add CliOption(key: "--bootstrap-node", value: bootstrap) + config.cliOptions.add CliOption(key: "--persistence") + + # filter out provider options by provided index + config.cliOptions = config.cliOptions.filter( + o => (let idx = o.nodeIdx |? providerIdx; idx == providerIdx) + ) + + return await newCodexProcess(providerIdx, config, Role.Provider) + + proc startValidatorNode(): Future[NodeProcess] {.async.} = + let validatorIdx = validators().len + var config = nodeConfigs.validators + config.cliOptions.add CliOption(key: "--bootstrap-node", value: bootstrap) + config.cliOptions.add CliOption(key: "--validator") + + return await newCodexProcess(validatorIdx, config, Role.Validator) setup: - for i in 0.. 0: + error "failed to exit process, check for zombies", exitCode + + trace "closing node process' streams" + await node.process.closeWait() + + except CatchableError as e: + error "error stopping node process", error = e.msg + + finally: + node.process = nil + + trace "node stopped" + +proc waitUntilStarted*(node: NodeProcess) {.async.} = + logScope: + nodeName = node.name + + trace "waiting until node started" + + let started = newFuture[void]() + try: + discard node.captureOutput(node.startedOutput, started).track(node) + await started.wait(5.seconds) + except AsyncTimeoutError as e: + # attempt graceful shutdown in case node was partially started, prevent + # zombies + await node.stop() + raiseAssert "node did not output '" & node.startedOutput & "'" + +proc restart*(node: NodeProcess) {.async.} = + await node.stop() + await node.start() + await node.waitUntilStarted() + +method removeDataDir*(node: NodeProcess) {.base.} = + raiseAssert "[removeDataDir] not implemented" diff --git a/tests/integration/testIntegration.nim b/tests/integration/testIntegration.nim index 7863cba3..79d36513 100644 --- a/tests/integration/testIntegration.nim +++ b/tests/integration/testIntegration.nim @@ -8,7 +8,6 @@ import pkg/codex/rng import pkg/stew/byteutils import pkg/ethers/erc20 import pkg/codex/contracts -import pkg/codex/utils/stintutils import ../contracts/time import ../contracts/deployment import ../codex/helpers @@ -20,12 +19,6 @@ import ./twonodes # to enable custom logging levels for specific topics like: debug2 = "INFO; TRACE: marketplace" twonodessuite "Integration tests", debug1 = false, debug2 = false: - - proc purchaseStateIs(client: CodexClient, id: PurchaseId, state: string): bool = - without purchase =? client.getPurchase(id): - return false - return purchase.state == state - setup: # Our Hardhat configuration does use automine, which means that time tracked by `ethProvider.currentTime()` is not # advanced until blocks are mined and that happens only when transaction is submitted. @@ -255,34 +248,3 @@ twonodessuite "Integration tests", debug1 = false, debug2 = false: let responseBefore = client1.requestStorageRaw(cid, duration=1.u256, reward=2.u256, proofProbability=3.u256, collateral=200.u256, expiry=currentTime+10) check responseBefore.status == "400 Bad Request" check responseBefore.body == "Expiry has to be before the request's end (now + duration)" - - # TODO: skipping this test for now as is not passing on macos/linux for some - # reason. This test has been completely refactored in - # https://github.com/codex-storage/nim-codex/pull/607 in which it will be - # reintroduced. - # test "expired request partially pays out for stored time": - # let marketplace = Marketplace.new(Marketplace.address, ethProvider.getSigner()) - # let tokenAddress = await marketplace.token() - # let token = Erc20Token.new(tokenAddress, ethProvider.getSigner()) - # let reward = 400.u256 - # let duration = 100.u256 - - # # client 2 makes storage available - # let startBalanceClient2 = await token.balanceOf(account2) - # discard client2.postAvailability(size=140000.u256, duration=200.u256, minPrice=300.u256, maxCollateral=300.u256).get - - # # client 1 requests storage but requires two nodes to host the content - # let startBalanceClient1 = await token.balanceOf(account1) - # let expiry = (await ethProvider.currentTime()) + 10 - # let cid = client1.upload(exampleString(100000)).get - # let id = client1.requestStorage(cid, duration=duration, reward=reward, proofProbability=3.u256, expiry=expiry, collateral=200.u256, nodes=2).get - - # # We have to wait for Client 2 fills the slot, before advancing time. - # # Until https://github.com/codex-storage/nim-codex/issues/594 is implemented nothing better then - # # sleeping some seconds is available. - # await sleepAsync(2.seconds) - # await ethProvider.advanceTimeTo(expiry+1) - # check eventually(client1.purchaseStateIs(id, "cancelled"), 20000) - - # check eventually ((await token.balanceOf(account2)) - startBalanceClient2) > 0 and ((await token.balanceOf(account2)) - startBalanceClient2) < 10*reward - # check eventually (startBalanceClient1 - (await token.balanceOf(account1))) == ((await token.balanceOf(account2)) - startBalanceClient2) diff --git a/tests/integration/testmarketplace.nim b/tests/integration/testmarketplace.nim new file mode 100644 index 00000000..27e0eeab --- /dev/null +++ b/tests/integration/testmarketplace.nim @@ -0,0 +1,93 @@ +import std/math +import pkg/stew/byteutils +import pkg/codex/units +import ./marketplacesuite +import ../examples + +marketplacesuite "Marketplace payouts": + + test "expired request partially pays out for stored time", + NodeConfigs( + # Uncomment to start Hardhat automatically, typically so logs can be inspected locally + # hardhat: HardhatConfig().withLogFile() + + clients: + CodexConfig() + .nodes(1) + # .debug() # uncomment to enable console log output.debug() + .withLogFile() # uncomment to output log file to tests/integration/logs/ //_.log + .withLogTopics("node", "erasure"), + + providers: + CodexConfig() + .nodes(1) + # .debug() # uncomment to enable console log output + .withLogFile() # uncomment to output log file to tests/integration/logs/ //_.log + .withLogTopics("node", "marketplace", "sales", "reservations", "node", "proving", "clock"), + ): + let reward = 400.u256 + let duration = 100.periods + let collateral = 200.u256 + let expiry = 4.periods + let datasetSizeInBlocks = 3 + let data = await RandomChunker.example(blocks=datasetSizeInBlocks) + let client = clients()[0] + let provider = providers()[0] + let clientApi = client.client + let providerApi = provider.client + let startBalanceProvider = await token.balanceOf(provider.ethAccount) + let startBalanceClient = await token.balanceOf(client.ethAccount) + # original data = 3 blocks so slot size will be 4 blocks + let slotSize = (DefaultBlockSize * 4.NBytes).Natural.u256 + + # provider makes storage available + discard providerApi.postAvailability( + # make availability size large enough to only fill 1 slot, thus causing a + # cancellation + size=slotSize, + duration=duration.u256, + minPrice=reward, + maxCollateral=collateral) + + let cid = clientApi.upload(data).get + + var slotIdxFilled = none UInt256 + proc onSlotFilled(event: SlotFilled) = + slotIdxFilled = some event.slotIndex + + let subscription = await marketplace.subscribe(SlotFilled, onSlotFilled) + + # client requests storage but requires two nodes to host the content + let id = await clientApi.requestStorage( + cid, + duration=duration, + reward=reward, + expiry=expiry, + collateral=collateral, + nodes=3, + tolerance=1, + origDatasetSizeInBlocks=datasetSizeInBlocks + ) + + # wait until one slot is filled + check eventually slotIdxFilled.isSome + + # wait until sale is cancelled + without requestId =? clientApi.requestId(id): + fail() + let slotId = slotId(requestId, !slotIdxFilled) + check eventually(providerApi.saleStateIs(slotId, "SaleCancelled")) + + check eventually ( + let endBalanceProvider = (await token.balanceOf(provider.ethAccount)); + let difference = endBalanceProvider - startBalanceProvider; + difference > 0 and + difference < expiry.u256*reward + ) + check eventually ( + let endBalanceClient = (await token.balanceOf(client.ethAccount)); + let endBalanceProvider = (await token.balanceOf(provider.ethAccount)); + (startBalanceClient - endBalanceClient) == (endBalanceProvider - startBalanceProvider) + ) + + await subscription.unsubscribe() diff --git a/tests/integration/testproofs.nim b/tests/integration/testproofs.nim index a3440f99..13fd1605 100644 --- a/tests/integration/testproofs.nim +++ b/tests/integration/testproofs.nim @@ -1,232 +1,299 @@ -import std/sequtils -import std/os -from std/times import getTime, toUnix -import pkg/codex/contracts +import std/math +from std/times import inMilliseconds import pkg/codex/logutils -import pkg/codex/periods +import pkg/stew/byteutils import ../contracts/time import ../contracts/deployment -import ./twonodes -import ./multinodes +import ../codex/helpers +import ../examples +import ./marketplacesuite + +export chronicles logScope: - topics = "test proofs" + topics = "integration test proofs" -# TODO: This is currently the address of the marketplace with a dummy -# verifier. Use real marketplace address once we can generate actual -# Groth16 ZK proofs. -let marketplaceAddress = Marketplace.address(dummyVerifier = true) -twonodessuite "Proving integration test", debug1=false, debug2=false: - let validatorDir = getTempDir() / "CodexValidator" +marketplacesuite "Hosts submit regular proofs": - var marketplace: Marketplace - var period: uint64 + test "hosts submit periodic proofs for slots they fill", NodeConfigs( + # Uncomment to start Hardhat automatically, typically so logs can be inspected locally + # hardhat: HardhatConfig().withLogFile(), - proc purchaseStateIs(client: CodexClient, id: PurchaseId, state: string): bool = - client.getPurchase(id).option.?state == some state + clients: + CodexConfig() + .nodes(1) + # .debug() # uncomment to enable console log output + .withLogFile() # uncomment to output log file to tests/integration/logs/ //_.log + .withLogTopics("node"), - setup: - marketplace = Marketplace.new(marketplaceAddress, ethProvider) - period = (await marketplace.config()).proofs.period.truncate(uint64) + providers: + CodexConfig() + .nodes(1) + # .debug() # uncomment to enable console log output + .withLogFile() # uncomment to output log file to tests/integration/logs/ //_.log + .withLogTopics("marketplace", "sales", "reservations", "node"), + ): + let client0 = clients()[0].client + let totalPeriods = 50 + let datasetSizeInBlocks = 2 - # Our Hardhat configuration does use automine, which means that time tracked by `ethProvider.currentTime()` is not - # advanced until blocks are mined and that happens only when transaction is submitted. - # As we use in tests ethProvider.currentTime() which uses block timestamp this can lead to synchronization issues. - await ethProvider.advanceTime(1.u256) + let data = await RandomChunker.example(blocks=1) + createAvailabilities(data.len, totalPeriods.periods) - proc waitUntilPurchaseIsStarted(proofProbability: uint64 = 3, - duration: uint64 = 100 * period, - expiry: uint64 = 30) {.async.} = - discard client2.postAvailability( - size=0xFFFFF.u256, - duration=duration.u256, - minPrice=300.u256, - maxCollateral=200.u256 - ) - let cid = client1.upload("some file contents").get - let expiry = (await ethProvider.currentTime()) + expiry.u256 - let id = client1.requestStorage( + let cid = client0.upload(data).get + + let purchaseId = await client0.requestStorage( cid, - expiry=expiry, - duration=duration.u256, - proofProbability=proofProbability.u256, - collateral=100.u256, - reward=400.u256 - ).get - check eventually client1.purchaseStateIs(id, "started") + duration=totalPeriods.periods, + origDatasetSizeInBlocks = datasetSizeInBlocks) + check eventually client0.purchaseStateIs(purchaseId, "started") - proc advanceToNextPeriod {.async.} = - let periodicity = Periodicity(seconds: period.u256) - let currentPeriod = periodicity.periodOf(await ethProvider.currentTime()) - let endOfPeriod = periodicity.periodEnd(currentPeriod) - await ethProvider.advanceTimeTo(endOfPeriod + 1) - - proc startValidator: NodeProcess = - let validator = startNode( - [ - "--data-dir=" & validatorDir, - "--api-port=8089", - "--disc-port=8099", - "--listen-addrs=/ip4/127.0.0.1/tcp/0", - "--validator", - "--eth-account=" & $accounts[2] - ], debug = false - ) - validator.waitUntilStarted() - validator - - proc stopValidator(node: NodeProcess) = - node.stop() - removeDir(validatorDir) - - test "hosts submit periodic proofs for slots they fill": - await waitUntilPurchaseIsStarted(proofProbability=1) var proofWasSubmitted = false proc onProofSubmitted(event: ProofSubmitted) = proofWasSubmitted = true + let subscription = await marketplace.subscribe(ProofSubmitted, onProofSubmitted) - await ethProvider.advanceTime(period.u256) - check eventually proofWasSubmitted - await subscription.unsubscribe() - test "validator will mark proofs as missing": - let validator = startValidator() - await waitUntilPurchaseIsStarted(proofProbability=1) - - node2.stop() - - var slotWasFreed = false - proc onSlotFreed(event: SlotFreed) = - slotWasFreed = true - let subscription = await marketplace.subscribe(SlotFreed, onSlotFreed) - - for _ in 0..<100: - if slotWasFreed: - break - else: - await advanceToNextPeriod() - await sleepAsync(1.seconds) - - check slotWasFreed + let currentPeriod = await getCurrentPeriod() + check eventuallyP(proofWasSubmitted, currentPeriod + totalPeriods.u256 + 1) await subscription.unsubscribe() - stopValidator(validator) -multinodesuite "Simulate invalid proofs", - StartNodes.init(clients=1'u, providers=0'u, validators=1'u), - DebugNodes.init(client=false, provider=false, validator=false): - proc purchaseStateIs(client: CodexClient, id: PurchaseId, state: string): bool = - client.getPurchase(id).option.?state == some state - - var marketplace: Marketplace - var period: uint64 - var slotId: SlotId - - setup: - marketplace = Marketplace.new(marketplaceAddress, ethProvider) - let config = await marketplace.config() - period = config.proofs.period.truncate(uint64) - slotId = SlotId(array[32, byte].default) # ensure we aren't reusing from prev test - - # Our Hardhat configuration does use automine, which means that time tracked by `ethProvider.currentTime()` is not - # advanced until blocks are mined and that happens only when transaction is submitted. - # As we use in tests ethProvider.currentTime() which uses block timestamp this can lead to synchronization issues. - await ethProvider.advanceTime(1.u256) - - proc periods(p: Ordinal | uint): uint64 = - when p is uint: - p * period - else: p.uint * period - - proc advanceToNextPeriod {.async.} = - let periodicity = Periodicity(seconds: period.u256) - let currentPeriod = periodicity.periodOf(await ethProvider.currentTime()) - let endOfPeriod = periodicity.periodEnd(currentPeriod) - await ethProvider.advanceTimeTo(endOfPeriod + 1) - - proc waitUntilPurchaseIsStarted(proofProbability: uint64 = 1, - duration: uint64 = 12.periods, - expiry: uint64 = 4.periods) {.async.} = - - if clients().len < 1 or providers().len < 1: - raiseAssert("must start at least one client and one ethProvider") - - let client = clients()[0].restClient - let storageProvider = providers()[0].restClient - - discard storageProvider.postAvailability( - size=0xFFFFF.u256, - duration=duration.u256, - minPrice=300.u256, - maxCollateral=200.u256 - ) - let cid = client.upload("some file contents " & $ getTime().toUnix).get - let expiry = (await ethProvider.currentTime()) + expiry.u256 - # avoid timing issues by filling the slot at the start of the next period - await advanceToNextPeriod() - let id = client.requestStorage( - cid, - expiry=expiry, - duration=duration.u256, - proofProbability=proofProbability.u256, - collateral=100.u256, - reward=400.u256 - ).get - check eventually client.purchaseStateIs(id, "started") - let purchase = client.getPurchase(id).get - slotId = slotId(purchase.requestId, 0.u256) +marketplacesuite "Simulate invalid proofs": # TODO: these are very loose tests in that they are not testing EXACTLY how # proofs were marked as missed by the validator. These tests should be # tightened so that they are showing, as an integration test, that specific # proofs are being marked as missed by the validator. - test "slot is freed after too many invalid proofs submitted": - let failEveryNProofs = 2'u - let totalProofs = 100'u - startProviderNode(failEveryNProofs) + test "slot is freed after too many invalid proofs submitted", NodeConfigs( + # Uncomment to start Hardhat automatically, typically so logs can be inspected locally + # hardhat: HardhatConfig().withLogFile(), - await waitUntilPurchaseIsStarted(duration=totalProofs.periods) + clients: + CodexConfig() + .nodes(1) + # .debug() # uncomment to enable console log output + .withLogFile() # uncomment to output log file to tests/integration/logs/ //_.log + .withLogTopics("node"), + + providers: + CodexConfig() + .nodes(1) + .simulateProofFailuresFor(providerIdx=0, failEveryNProofs=1) + # .debug() # uncomment to enable console log output + .withLogFile() # uncomment to output log file to tests/integration/logs/ //_.log + .withLogTopics("marketplace", "sales", "reservations", "node"), + + validators: + CodexConfig() + .nodes(1) + .withLogFile() # uncomment to output log file to tests/integration/logs/ //_.log + # .debug() # uncomment to enable console log output + .withLogTopics("validator", "onchain", "ethers") + ): + let client0 = clients()[0].client + let totalPeriods = 50 + + let datasetSizeInBlocks = 2 + let data = await RandomChunker.example(blocks=datasetSizeInBlocks) + createAvailabilities(data.len, totalPeriods.periods) + + let cid = client0.upload(data).get + + let purchaseId = await client0.requestStorage( + cid, + duration=totalPeriods.periods, + origDatasetSizeInBlocks=datasetSizeInBlocks) + let requestId = client0.requestId(purchaseId).get + + check eventually client0.purchaseStateIs(purchaseId, "started") var slotWasFreed = false proc onSlotFreed(event: SlotFreed) = - if slotId(event.requestId, event.slotIndex) == slotId: + if event.requestId == requestId and + event.slotIndex == 0.u256: # assume only one slot, so index 0 slotWasFreed = true + let subscription = await marketplace.subscribe(SlotFreed, onSlotFreed) - for _ in 0.. //_.log + .withLogTopics("node"), + + providers: + CodexConfig() + .nodes(1) + .simulateProofFailuresFor(providerIdx=0, failEveryNProofs=3) + # .debug() # uncomment to enable console log output + .withLogFile() # uncomment to output log file to tests/integration/logs/ //_.log + .withLogTopics("marketplace", "sales", "reservations", "node"), + + validators: + CodexConfig() + .nodes(1) + # .debug() + .withLogFile() # uncomment to output log file to tests/integration/logs/ //_.log + .withLogTopics("validator", "onchain", "ethers") + ): + let client0 = clients()[0].client + let totalPeriods = 25 + + let datasetSizeInBlocks = 2 + let data = await RandomChunker.example(blocks=datasetSizeInBlocks) + createAvailabilities(data.len, totalPeriods.periods) + + let cid = client0.upload(data).get + + let purchaseId = await client0.requestStorage( + cid, + duration=totalPeriods.periods, + origDatasetSizeInBlocks=datasetSizeInBlocks) + let requestId = client0.requestId(purchaseId).get + + check eventually client0.purchaseStateIs(purchaseId, "started") var slotWasFreed = false proc onSlotFreed(event: SlotFreed) = - if slotId(event.requestId, event.slotIndex) == slotId: + if event.requestId == requestId and + event.slotIndex == 0.u256: slotWasFreed = true + let subscription = await marketplace.subscribe(SlotFreed, onSlotFreed) - for _ in 0.. //_.log + .withLogTopics("node", "erasure", "clock", "purchases"), + + providers: + CodexConfig() + .nodes(3) + .simulateProofFailuresFor(providerIdx=0, failEveryNProofs=2) + # .debug() # uncomment to enable console log output + .withLogFile() # uncomment to output log file to tests/integration/logs/ //_.log + .withLogTopics("marketplace", "sales", "reservations", "node"), + + validators: + CodexConfig() + .nodes(1) + # .debug() + .withLogFile() # uncomment to output log file to tests/integration/logs/ //_.log + .withLogTopics("validator") + ): + let client0 = clients()[0].client + let provider0 = providers()[0] + let provider1 = providers()[1] + let provider2 = providers()[2] + let totalPeriods = 25 + + let datasetSizeInBlocks = 3 + let data = await RandomChunker.example(blocks=datasetSizeInBlocks) + # original data = 3 blocks so slot size will be 4 blocks + let slotSize = (DefaultBlockSize * 4.NBytes).Natural.u256 + + discard provider0.client.postAvailability( + size=slotSize, # should match 1 slot only + duration=totalPeriods.periods.u256, + minPrice=300.u256, + maxCollateral=200.u256 + ) + + let cid = client0.upload(data).get + + let purchaseId = await client0.requestStorage( + cid, + duration=totalPeriods.periods, + expiry=10.periods, + nodes=3, + tolerance=1, + origDatasetSizeInBlocks=datasetSizeInBlocks + ) + + without requestId =? client0.requestId(purchaseId): + fail() + + var filledSlotIds: seq[SlotId] = @[] + proc onSlotFilled(event: SlotFilled) = + let slotId = slotId(event.requestId, event.slotIndex) + filledSlotIds.add slotId + + let subscription = await marketplace.subscribe(SlotFilled, onSlotFilled) + + # wait til first slot is filled + check eventually filledSlotIds.len > 0 + + # now add availability for providers 1 and 2, which should allow them to to + # put the remaining slots in their queues + discard provider1.client.postAvailability( + size=slotSize, # should match 1 slot only + duration=totalPeriods.periods.u256, + minPrice=300.u256, + maxCollateral=200.u256 + ) + + check eventually filledSlotIds.len > 1 + + discard provider2.client.postAvailability( + size=slotSize, # should match 1 slot only + duration=totalPeriods.periods.u256, + minPrice=300.u256, + maxCollateral=200.u256 + ) + + check eventually filledSlotIds.len > 2 + + # Wait til second slot is filled. SaleFilled happens too quickly, check SaleProving instead. + check eventually provider1.client.saleStateIs(filledSlotIds[1], "SaleProving") + check eventually provider2.client.saleStateIs(filledSlotIds[2], "SaleProving") + + check eventually client0.purchaseStateIs(purchaseId, "started") + + let currentPeriod = await getCurrentPeriod() + check eventuallyP( + # SaleFinished happens too quickly, check SalePayout instead + provider0.client.saleStateIs(filledSlotIds[0], "SalePayout"), + currentPeriod + totalPeriods.u256 + 1) + + check eventuallyP( + # SaleFinished happens too quickly, check SalePayout instead + provider1.client.saleStateIs(filledSlotIds[1], "SalePayout"), + currentPeriod + totalPeriods.u256 + 1) + + check eventuallyP( + # SaleFinished happens too quickly, check SalePayout instead + provider2.client.saleStateIs(filledSlotIds[2], "SalePayout"), + currentPeriod + totalPeriods.u256 + 1) + + check eventually( + (await token.balanceOf(provider1.ethAccount)) > + (await token.balanceOf(provider0.ethAccount)) + ) await subscription.unsubscribe() diff --git a/tests/testIntegration.nim b/tests/testIntegration.nim index 926e992a..eca2e957 100644 --- a/tests/testIntegration.nim +++ b/tests/testIntegration.nim @@ -1,6 +1,7 @@ import ./integration/testcli import ./integration/testIntegration import ./integration/testblockexpiration +import ./integration/testmarketplace import ./integration/testproofs {.warning[UnusedImport]:off.}