diff --git a/.github/workflows/ci-reusable.yml b/.github/workflows/ci-reusable.yml index 401ef161..ec84875b 100644 --- a/.github/workflows/ci-reusable.yml +++ b/.github/workflows/ci-reusable.yml @@ -70,6 +70,14 @@ jobs: if: matrix.tests == 'integration' || matrix.tests == 'all' run: make -j${ncpu} testIntegration + - name: Upload integration tests log files + uses: actions/upload-artifact@v3 + if: always() + with: + name: integration-tests-logs + path: tests/integration/logs/ + retention-days: 1 + status: if: always() needs: [build] diff --git a/.gitignore b/.gitignore index c85aa931..c1866c8c 100644 --- a/.gitignore +++ b/.gitignore @@ -39,3 +39,4 @@ docker/hostdatadir docker/prometheus-data .DS_Store nim.cfg +tests/integration/logs diff --git a/codex/node.nim b/codex/node.nim index 68896bb1..5a55c586 100644 --- a/codex/node.nim +++ b/codex/node.nim @@ -459,7 +459,8 @@ proc requestStorage*( reward = reward proofProbability = proofProbability collateral = collateral - expiry = expiry + expiry = expiry.truncate(int64) + now = self.clock.now trace "Received a request for storage!" diff --git a/codex/purchasing/states/submitted.nim b/codex/purchasing/states/submitted.nim index 391dbf7c..48666c46 100644 --- a/codex/purchasing/states/submitted.nim +++ b/codex/purchasing/states/submitted.nim @@ -35,6 +35,7 @@ method run*(state: PurchaseSubmitted, machine: Machine): Future[?State] {.async. proc withTimeout(future: Future[void]) {.async.} = let expiry = request.expiry.truncate(int64) + 1 + trace "waiting for request fulfillment or expiry", expiry await future.withTimeout(clock, expiry) try: diff --git a/codex/sales/states/downloading.nim b/codex/sales/states/downloading.nim index 118f97cf..c301ab2e 100644 --- a/codex/sales/states/downloading.nim +++ b/codex/sales/states/downloading.nim @@ -57,7 +57,8 @@ method run*(state: SaleDownloading, machine: Machine): Future[?State] {.async.} # update availability size var bytes: uint = 0 for blk in blocks: - bytes += blk.data.len.uint + if not blk.cid.isEmpty: + bytes += blk.data.len.uint trace "Releasing batch of bytes written to disk", bytes return await reservations.release(reservation.id, diff --git a/codex/sales/states/filling.nim b/codex/sales/states/filling.nim index 73db9fcb..c96dd0b9 100644 --- a/codex/sales/states/filling.nim +++ b/codex/sales/states/filling.nim @@ -34,3 +34,4 @@ method run(state: SaleFilling, machine: Machine): Future[?State] {.async.} = debug "Filling slot", requestId = data.requestId, slotIndex = data.slotIndex await market.fillSlot(data.requestId, data.slotIndex, state.proof, collateral) + debug "Waiting for slot filled event...", requestId = $data.requestId, slotIndex = $data.slotIndex diff --git a/codex/sales/states/proving.nim b/codex/sales/states/proving.nim index a56242bd..ca374bd3 100644 --- a/codex/sales/states/proving.nim +++ b/codex/sales/states/proving.nim @@ -81,6 +81,7 @@ proc proveLoop( debug "Proof is required", period = currentPeriod, challenge = challenge await state.prove(slot, challenge, onProve, market, currentPeriod) + debug "waiting until next period" await waitUntilPeriod(currentPeriod + 1) method `$`*(state: SaleProving): string = "SaleProving" @@ -126,12 +127,12 @@ method run*(state: SaleProving, machine: Machine): Future[?State] {.async.} = debug "Stopping proving.", requestId = data.requestId, slotIndex = data.slotIndex if not state.loop.isNil: - if not state.loop.finished: - try: - await state.loop.cancelAndWait() - except CatchableError as e: - error "Error during cancelation of prooving loop", msg = e.msg + if not state.loop.finished: + try: + await state.loop.cancelAndWait() + except CatchableError as e: + error "Error during cancellation of proving loop", msg = e.msg - state.loop = nil + state.loop = nil return some State(SalePayout()) diff --git a/codex/sales/states/provingsimulated.nim b/codex/sales/states/provingsimulated.nim index 9c77e85a..cb3729fa 100644 --- a/codex/sales/states/provingsimulated.nim +++ b/codex/sales/states/provingsimulated.nim @@ -8,6 +8,7 @@ when codex_enable_proof_failures: import ../../contracts/requests import ../../logutils import ../../market + import ../../utils/exceptions import ../salescontext import ./proving @@ -20,7 +21,7 @@ when codex_enable_proof_failures: proofCount: int proc onSubmitProofError(error: ref CatchableError, period: UInt256, slotId: SlotId) = - error "Submitting invalid proof failed", period = period, slotId, msg = error.msg + error "Submitting invalid proof failed", period, slotId, msg = error.msgDetail method prove*(state: SaleProvingSimulated, slot: Slot, challenge: ProofChallenge, onProve: OnProve, market: Market, currentPeriod: Period) {.async.} = trace "Processing proving in simulated mode" @@ -33,7 +34,7 @@ when codex_enable_proof_failures: warn "Submitting INVALID proof", period = currentPeriod, slotId = slot.id await market.submitProof(slot.id, Groth16Proof.default) except ProviderError as e: - if not e.revertReason.contains("Invalid proof"): + if not e.msgDetail.contains("Invalid proof"): onSubmitProofError(e, currentPeriod, slot.id) except CatchableError as e: onSubmitProofError(e, currentPeriod, slot.id) diff --git a/tests/examples.nim b/tests/examples.nim index cf0b7977..c70d0dbb 100644 --- a/tests/examples.nim +++ b/tests/examples.nim @@ -4,11 +4,21 @@ import std/times import std/typetraits import pkg/codex/contracts/requests +import pkg/codex/rng import pkg/codex/contracts/proofs import pkg/codex/sales/slotqueue import pkg/codex/stores +import pkg/codex/units +import pkg/chronos +import pkg/stew/byteutils import pkg/stint + +import ./codex/helpers/randomchunker + +export randomchunker +export units + proc exampleString*(length: int): string = let chars = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" result = newString(length) # Create a new empty string with a given length @@ -78,3 +88,16 @@ proc example*(_: type Groth16Proof): Groth16Proof = b: G2Point.example, c: G1Point.example ) + +proc example*(_: type RandomChunker, blocks: int): Future[string] {.async.} = + # doAssert blocks >= 3, "must be more than 3 blocks" + let rng = Rng.instance() + let chunker = RandomChunker.new( + rng, size = DefaultBlockSize * blocks.NBytes, chunkSize = DefaultBlockSize) + var data: seq[byte] + while (let moar = await chunker.getBytes(); moar != []): + data.add moar + return byteutils.toHex(data) + +proc example*(_: type RandomChunker): Future[string] {.async.} = + await RandomChunker.example(3) diff --git a/tests/integration/clioption.nim b/tests/integration/clioption.nim new file mode 100644 index 00000000..379cebb8 --- /dev/null +++ b/tests/integration/clioption.nim @@ -0,0 +1,13 @@ +import pkg/questionable + +type + CliOption* = object of RootObj + nodeIdx*: ?int + key*: string + value*: string + +proc `$`*(option: CliOption): string = + var res = option.key + if option.value.len > 0: + res &= "=" & option.value + return res diff --git a/tests/integration/codexclient.nim b/tests/integration/codexclient.nim index 7cb0b50e..a0919feb 100644 --- a/tests/integration/codexclient.nim +++ b/tests/integration/codexclient.nim @@ -1,6 +1,5 @@ import std/httpclient import std/strutils -import std/sequtils from pkg/libp2p import Cid, `$`, init import pkg/stint @@ -109,25 +108,26 @@ proc requestStorage*( ## Call request storage REST endpoint ## let response = client.requestStorageRaw(cid, duration, reward, proofProbability, collateral, expiry, nodes, tolerance) - assert response.status == "200 OK" + if response.status != "200 OK": + doAssert(false, response.body) PurchaseId.fromHex(response.body).catch proc getPurchase*(client: CodexClient, purchaseId: PurchaseId): ?!RestPurchase = let url = client.baseurl & "/storage/purchases/" & purchaseId.toHex - let body = client.http.getContent(url) - let json = ? parseJson(body).catch - RestPurchase.fromJson(json) + try: + let body = client.http.getContent(url) + let json = ? parseJson(body).catch + return RestPurchase.fromJson(json) + except CatchableError as e: + return failure e.msg proc getSalesAgent*(client: CodexClient, slotId: SlotId): ?!RestSalesAgent = let url = client.baseurl & "/sales/slots/" & slotId.toHex - echo "getting sales agent for id, ", slotId.toHex try: let body = client.http.getContent(url) - echo "get sales agent body: ", body let json = ? parseJson(body).catch return RestSalesAgent.fromJson(json) except CatchableError as e: - echo "[client.getSalesAgent] error getting agent: ", e.msg return failure e.msg proc getSlots*(client: CodexClient): ?!seq[Slot] = diff --git a/tests/integration/codexconfig.nim b/tests/integration/codexconfig.nim new file mode 100644 index 00000000..eda9c81a --- /dev/null +++ b/tests/integration/codexconfig.nim @@ -0,0 +1,61 @@ +import std/options +import std/sequtils +import pkg/codex/units +import ./clioption +import ./nodeconfig + +export nodeconfig +export clioption + +type + CodexConfig* = ref object of NodeConfig + numNodes*: int + cliOptions*: seq[CliOption] + logTopics*: seq[string] + +proc nodes*(config: CodexConfig, numNodes: int): CodexConfig = + if numNodes < 0: + raise newException(ValueError, "numNodes must be >= 0") + + var startConfig = config + startConfig.numNodes = numNodes + return startConfig + +proc simulateProofFailuresFor*( + config: CodexConfig, + providerIdx: int, + failEveryNProofs: int +): CodexConfig = + + if providerIdx > config.numNodes - 1: + raise newException(ValueError, "provider index out of bounds") + + var startConfig = config + startConfig.cliOptions.add( + CliOption( + nodeIdx: some providerIdx, + key: "--simulate-proof-failures", + value: $failEveryNProofs + ) + ) + return startConfig + +proc withLogTopics*( + config: CodexConfig, + topics: varargs[string] +): CodexConfig = + + var startConfig = config + startConfig.logTopics = startConfig.logTopics.concat(@topics) + return startConfig + +proc withStorageQuota*( + config: CodexConfig, + quota: NBytes +): CodexConfig = + + var startConfig = config + startConfig.cliOptions.add( + CliOption(key: "--storage-quota", value: $quota) + ) + return startConfig diff --git a/tests/integration/codexprocess.nim b/tests/integration/codexprocess.nim new file mode 100644 index 00000000..c7c0c6f1 --- /dev/null +++ b/tests/integration/codexprocess.nim @@ -0,0 +1,75 @@ +import pkg/questionable +import pkg/questionable/results +import pkg/confutils +import pkg/chronicles +import pkg/ethers +import pkg/libp2p +import std/os +import std/strutils +import codex/conf +import ./codexclient +import ./nodeprocess + +export codexclient +export chronicles +export nodeprocess + +logScope: + topics = "integration testing codex process" + +type + CodexProcess* = ref object of NodeProcess + client: ?CodexClient + +method workingDir(node: CodexProcess): string = + return currentSourcePath() / ".." / ".." / ".." + +method executable(node: CodexProcess): string = + return "build" / "codex" + +method startedOutput(node: CodexProcess): string = + return "REST service started" + +method processOptions(node: CodexProcess): set[AsyncProcessOption] = + return {AsyncProcessOption.StdErrToStdOut} + +method outputLineEndings(node: CodexProcess): string = + return "\n" + +method onOutputLineCaptured(node: CodexProcess, line: string) = + discard + +proc dataDir(node: CodexProcess): string = + let config = CodexConf.load(cmdLine = node.arguments) + return config.dataDir.string + +proc ethAccount*(node: CodexProcess): Address = + let config = CodexConf.load(cmdLine = node.arguments) + without ethAccount =? config.ethAccount: + raiseAssert "eth account not set" + return Address(ethAccount) + +proc apiUrl*(node: CodexProcess): string = + let config = CodexConf.load(cmdLine = node.arguments) + return "http://" & config.apiBindAddress & ":" & $config.apiPort & "/api/codex/v1" + +proc client*(node: CodexProcess): CodexClient = + if client =? node.client: + return client + let client = CodexClient.new(node.apiUrl) + node.client = some client + return client + +method stop*(node: CodexProcess) {.async.} = + logScope: + nodeName = node.name + + await procCall NodeProcess(node).stop() + + trace "stopping codex client" + if client =? node.client: + client.close() + node.client = none CodexClient + +method removeDataDir*(node: CodexProcess) = + removeDir(node.dataDir) diff --git a/tests/integration/hardhatconfig.nim b/tests/integration/hardhatconfig.nim new file mode 100644 index 00000000..637b40c0 --- /dev/null +++ b/tests/integration/hardhatconfig.nim @@ -0,0 +1,6 @@ +import ./nodeconfig + +export nodeconfig + +type + HardhatConfig* = ref object of NodeConfig diff --git a/tests/integration/hardhatprocess.nim b/tests/integration/hardhatprocess.nim new file mode 100644 index 00000000..935b4d16 --- /dev/null +++ b/tests/integration/hardhatprocess.nim @@ -0,0 +1,128 @@ +import pkg/questionable +import pkg/questionable/results +import pkg/confutils +import pkg/chronicles +import pkg/chronos +import pkg/stew/io2 +import std/os +import std/sets +import std/sequtils +import std/strutils +import pkg/codex/conf +import pkg/codex/utils/trackedfutures +import ./codexclient +import ./nodeprocess + +export codexclient +export chronicles + +logScope: + topics = "integration testing hardhat process" + nodeName = "hardhat" + +type + HardhatProcess* = ref object of NodeProcess + logFile: ?IoHandle + +method workingDir(node: HardhatProcess): string = + return currentSourcePath() / ".." / ".." / ".." / "vendor" / "codex-contracts-eth" + +method executable(node: HardhatProcess): string = + return "node_modules" / ".bin" / "hardhat" + +method startedOutput(node: HardhatProcess): string = + return "Started HTTP and WebSocket JSON-RPC server at" + +method processOptions(node: HardhatProcess): set[AsyncProcessOption] = + return {} + +method outputLineEndings(node: HardhatProcess): string = + return "\n" + +proc openLogFile(node: HardhatProcess, logFilePath: string): IoHandle = + let logFileHandle = openFile( + logFilePath, + {OpenFlags.Write, OpenFlags.Create, OpenFlags.Truncate} + ) + + without fileHandle =? logFileHandle: + fatal "failed to open log file", + path = logFilePath, + errorCode = $logFileHandle.error + + raiseAssert "failed to open log file, aborting" + + return fileHandle + +method start*(node: HardhatProcess) {.async.} = + + let poptions = node.processOptions + {AsyncProcessOption.StdErrToStdOut} + trace "starting node", + args = node.arguments, + executable = node.executable, + workingDir = node.workingDir, + processOptions = poptions + + try: + node.process = await startProcess( + node.executable, + node.workingDir, + @["node", "--export", "deployment-localhost.json"].concat(node.arguments), + options = poptions, + stdoutHandle = AsyncProcess.Pipe + ) + except CatchableError as e: + error "failed to start hardhat process", error = e.msg + +proc startNode*( + _: type HardhatProcess, + args: seq[string], + debug: string | bool = false, + name: string +): Future[HardhatProcess] {.async.} = + + var logFilePath = "" + + var arguments = newSeq[string]() + for arg in args: + if arg.contains "--log-file=": + logFilePath = arg.split("=")[1] + else: + arguments.add arg + + trace "starting hardhat node", arguments + ## Starts a Hardhat Node with the specified arguments. + ## Set debug to 'true' to see output of the node. + let hardhat = HardhatProcess( + arguments: arguments, + debug: ($debug != "false"), + trackedFutures: TrackedFutures.new(), + name: "hardhat" + ) + + await hardhat.start() + + if logFilePath != "": + hardhat.logFile = some hardhat.openLogFile(logFilePath) + + return hardhat + +method onOutputLineCaptured(node: HardhatProcess, line: string) = + without logFile =? node.logFile: + return + + if error =? logFile.writeFile(line & "\n").errorOption: + error "failed to write to hardhat file", errorCode = error + discard logFile.closeFile() + node.logFile = none IoHandle + +method stop*(node: HardhatProcess) {.async.} = + # terminate the process + await procCall NodeProcess(node).stop() + + if logFile =? node.logFile: + trace "closing hardhat log file" + discard logFile.closeFile() + +method removeDataDir*(node: HardhatProcess) = + discard diff --git a/tests/integration/marketplacesuite.nim b/tests/integration/marketplacesuite.nim new file mode 100644 index 00000000..4ed7fb9f --- /dev/null +++ b/tests/integration/marketplacesuite.nim @@ -0,0 +1,120 @@ +import pkg/chronos +import pkg/ethers/erc20 +from pkg/libp2p import Cid +import pkg/codex/contracts/marketplace as mp +import pkg/codex/periods +import pkg/codex/utils/json +import ./multinodes +import ../contracts/time +import ../contracts/deployment + +export mp +export multinodes + +template marketplacesuite*(name: string, body: untyped) = + + multinodesuite name: + + var marketplace {.inject, used.}: Marketplace + var period: uint64 + var periodicity: Periodicity + var token {.inject, used.}: Erc20Token + var continuousMineFut: Future[void] + + proc getCurrentPeriod(): Future[Period] {.async.} = + return periodicity.periodOf(await ethProvider.currentTime()) + + proc advanceToNextPeriod() {.async.} = + let periodicity = Periodicity(seconds: period.u256) + let currentTime = await ethProvider.currentTime() + let currentPeriod = periodicity.periodOf(currentTime) + let endOfPeriod = periodicity.periodEnd(currentPeriod) + await ethProvider.advanceTimeTo(endOfPeriod + 1) + + template eventuallyP(condition: untyped, finalPeriod: Period): bool = + + proc eventuallyP: Future[bool] {.async.} = + while( + let currentPeriod = await getCurrentPeriod(); + currentPeriod <= finalPeriod + ): + if condition: + return true + await sleepAsync(1.millis) + return condition + + await eventuallyP() + + proc periods(p: int): uint64 = + p.uint64 * period + + proc createAvailabilities(datasetSize: int, duration: uint64) = + # post availability to each provider + for i in 0.. 1: + doAssert(origDatasetSizeInBlocks >= 3, + "dataset size must be greater than or equal to 3 blocks with " & + "more than one node") + + proc requestStorage(client: CodexClient, + cid: Cid, + proofProbability: uint64 = 1, + duration: uint64 = 12.periods, + reward = 400.u256, + collateral = 100.u256, + expiry: uint64 = 4.periods, + nodes = providers().len, + tolerance = 0, + origDatasetSizeInBlocks: int): Future[PurchaseId] {.async.} = + + let expiry = (await ethProvider.currentTime()) + expiry.u256 + + let id = client.requestStorage( + cid, + expiry=expiry, + duration=duration.u256, + proofProbability=proofProbability.u256, + collateral=collateral, + reward=reward, + nodes=nodes.uint, + tolerance=tolerance.uint + ).get + + return id + + proc continuouslyAdvanceEvery(every: chronos.Duration) {.async.} = + try: + while true: + await advanceToNextPeriod() + await sleepAsync(every) + except CancelledError: + discard + + setup: + # TODO: This is currently the address of the marketplace with a dummy + # verifier. Use real marketplace address, `Marketplace.address` once we + # can generate actual Groth16 ZK proofs. + let marketplaceAddress = Marketplace.address(dummyVerifier = true) + marketplace = Marketplace.new(marketplaceAddress, ethProvider.getSigner()) + let tokenAddress = await marketplace.token() + token = Erc20Token.new(tokenAddress, ethProvider.getSigner()) + let config = await mp.config(marketplace) + period = config.proofs.period.truncate(uint64) + periodicity = Periodicity(seconds: period.u256) + + continuousMineFut = continuouslyAdvanceEvery(chronos.millis(500)) + + teardown: + await continuousMineFut.cancelAndWait() + + body diff --git a/tests/integration/multinodes.nim b/tests/integration/multinodes.nim index c9adaa89..58a013a1 100644 --- a/tests/integration/multinodes.nim +++ b/tests/integration/multinodes.nim @@ -1,167 +1,276 @@ import std/os -import std/macros -import std/httpclient +import std/sequtils +import std/strutils +import std/sugar +import std/times import pkg/codex/logutils -import ../ethertest -import ./codexclient -import ./nodes +import pkg/chronos/transports/stream +import pkg/ethers +import ./hardhatprocess +import ./codexprocess +import ./hardhatconfig +import ./codexconfig +import ../asynctest +import ../checktest -export ethertest -export codexclient -export nodes +export asynctest +export ethers except `%` +export hardhatprocess +export codexprocess +export hardhatconfig +export codexconfig type RunningNode* = ref object role*: Role node*: NodeProcess - restClient*: CodexClient - datadir*: string - ethAccount*: Address - StartNodes* = object - clients*: uint - providers*: uint - validators*: uint - DebugNodes* = object - client*: bool - provider*: bool - validator*: bool - topics*: string + NodeConfigs* = object + clients*: CodexConfig + providers*: CodexConfig + validators*: CodexConfig + hardhat*: HardhatConfig Role* {.pure.} = enum Client, Provider, - Validator + Validator, + Hardhat -proc new*(_: type RunningNode, - role: Role, - node: NodeProcess, - restClient: CodexClient, - datadir: string, - ethAccount: Address): RunningNode = - RunningNode(role: role, - node: node, - restClient: restClient, - datadir: datadir, - ethAccount: ethAccount) +proc nextFreePort(startPort: int): Future[int] {.async.} = -proc init*(_: type StartNodes, - clients, providers, validators: uint): StartNodes = - StartNodes(clients: clients, providers: providers, validators: validators) + proc client(server: StreamServer, transp: StreamTransport) {.async.} = + await transp.closeWait() -proc init*(_: type DebugNodes, - client, provider, validator: bool, - topics: string = "validator,proving,market"): DebugNodes = - DebugNodes(client: client, provider: provider, validator: validator, - topics: topics) + var port = startPort + while true: + trace "checking if port is free", port + try: + let host = initTAddress("127.0.0.1", port) + # We use ReuseAddr here only to be able to reuse the same IP/Port when + # there's a TIME_WAIT socket. It's useful when running the test multiple + # times or if a test ran previously using the same port. + var server = createStreamServer(host, client, {ReuseAddr}) + trace "port is free", port + await server.closeWait() + return port + except TransportOsError: + trace "port is not free", port + inc port -template multinodesuite*(name: string, - startNodes: StartNodes, debugNodes: DebugNodes, body: untyped) = +template multinodesuite*(name: string, body: untyped) = - if (debugNodes.client or debugNodes.provider) and - (enabledLogLevel > LogLevel.TRACE or - enabledLogLevel == LogLevel.NONE): - echo "" - echo "More test debug logging is available by running the tests with " & - "'-d:chronicles_log_level=TRACE " & - "-d:chronicles_disabled_topics=websock " & - "-d:chronicles_default_output_device=stdout " & - "-d:chronicles_sinks=textlines'" - echo "" - - ethersuite name: + asyncchecksuite name: var running: seq[RunningNode] var bootstrap: string + let starttime = now().format("yyyy-MM-dd'_'HH:mm:ss") + var currentTestName = "" + var nodeConfigs: NodeConfigs + var ethProvider {.inject, used.}: JsonRpcProvider + var accounts {.inject, used.}: seq[Address] + var snapshot: JsonNode - proc newNodeProcess(index: int, - addlOptions: seq[string], - debug: bool): (NodeProcess, string, Address) = + template test(tname, startNodeConfigs, tbody) = + currentTestName = tname + nodeConfigs = startNodeConfigs + test tname: + tbody - if index > accounts.len - 1: - raiseAssert("Cannot start node at index " & $index & + proc sanitize(pathSegment: string): string = + var sanitized = pathSegment + for invalid in invalidFilenameChars.items: + sanitized = sanitized.replace(invalid, '_') + sanitized + + proc getLogFile(role: Role, index: ?int): string = + # create log file path, format: + # tests/integration/logs/ //_.log + + var logDir = currentSourcePath.parentDir() / + "logs" / + sanitize($starttime & " " & name) / + sanitize($currentTestName) + createDir(logDir) + + var fn = $role + if idx =? index: + fn &= "_" & $idx + fn &= ".log" + + let fileName = logDir / fn + return fileName + + proc newHardhatProcess( + config: HardhatConfig, + role: Role + ): Future[NodeProcess] {.async.} = + + var args: seq[string] = @[] + if config.logFile: + let updatedLogFile = getLogFile(role, none int) + args.add "--log-file=" & updatedLogFile + + let node = await HardhatProcess.startNode(args, config.debugEnabled, "hardhat") + await node.waitUntilStarted() + + trace "hardhat node started" + return node + + proc newCodexProcess(roleIdx: int, + config: CodexConfig, + role: Role + ): Future[NodeProcess] {.async.} = + + let nodeIdx = running.len + var conf = config + + if nodeIdx > accounts.len - 1: + raiseAssert("Cannot start node at nodeIdx " & $nodeIdx & ", not enough eth accounts.") - let datadir = getTempDir() / "Codex" & $index - var options = @[ - "--api-port=" & $(8080 + index), - "--data-dir=" & datadir, - "--nat=127.0.0.1", - "--listen-addrs=/ip4/127.0.0.1/tcp/0", - "--disc-ip=127.0.0.1", - "--disc-port=" & $(8090 + index), - "--eth-account=" & $accounts[index]] - .concat(addlOptions) - if debug: options.add "--log-level=INFO;TRACE: " & debugNodes.topics - let node = startNode(options, debug = debug) - node.waitUntilStarted() - (node, datadir, accounts[index]) + let datadir = getTempDir() / "Codex" / + sanitize($starttime) / + sanitize($role & "_" & $roleIdx) - proc newCodexClient(index: int): CodexClient = - CodexClient.new("http://localhost:" & $(8080 + index) & "/api/codex/v1") + if conf.logFile: + let updatedLogFile = getLogFile(role, some roleIdx) + conf.cliOptions.add CliOption(key: "--log-file", value: updatedLogFile) - proc startClientNode() = - let index = running.len - let (node, datadir, account) = newNodeProcess( - index, @["--persistence"], debugNodes.client) - let restClient = newCodexClient(index) - running.add RunningNode.new(Role.Client, node, restClient, datadir, - account) - if debugNodes.client: - debug "started new client node and codex client", - restApiPort = 8080 + index, discPort = 8090 + index, account + let logLevel = conf.logLevel |? LogLevel.INFO + if conf.logTopics.len > 0: + conf.cliOptions.add CliOption( + key: "--log-level", + value: $logLevel & ";TRACE: " & conf.logTopics.join(",") + ) + else: + conf.cliOptions.add CliOption(key: "--log-level", value: $logLevel) - proc startProviderNode(failEveryNProofs: uint = 0) = - let index = running.len - let (node, datadir, account) = newNodeProcess(index, @[ - "--bootstrap-node=" & bootstrap, - "--persistence", - "--simulate-proof-failures=" & $failEveryNProofs], - debugNodes.provider) - let restClient = newCodexClient(index) - running.add RunningNode.new(Role.Provider, node, restClient, datadir, - account) - if debugNodes.provider: - debug "started new provider node and codex client", - restApiPort = 8080 + index, discPort = 8090 + index, account + var args = conf.cliOptions.map(o => $o) + .concat(@[ + "--api-port=" & $ await nextFreePort(8080 + nodeIdx), + "--data-dir=" & datadir, + "--nat=127.0.0.1", + "--listen-addrs=/ip4/127.0.0.1/tcp/0", + "--disc-ip=127.0.0.1", + "--disc-port=" & $ await nextFreePort(8090 + nodeIdx), + "--eth-account=" & $accounts[nodeIdx]]) - proc startValidatorNode() = - let index = running.len - let (node, datadir, account) = newNodeProcess(index, @[ - "--bootstrap-node=" & bootstrap, - "--validator"], - debugNodes.validator) - let restClient = newCodexClient(index) - running.add RunningNode.new(Role.Validator, node, restClient, datadir, - account) - if debugNodes.validator: - debug "started new validator node and codex client", - restApiPort = 8080 + index, discPort = 8090 + index, account + let node = await CodexProcess.startNode(args, conf.debugEnabled, $role & $roleIdx) + await node.waitUntilStarted() + trace "node started", nodeName = $role & $roleIdx - proc clients(): seq[RunningNode] {.used.} = - running.filter(proc(r: RunningNode): bool = r.role == Role.Client) + return node - proc providers(): seq[RunningNode] {.used.} = - running.filter(proc(r: RunningNode): bool = r.role == Role.Provider) + proc hardhat: HardhatProcess = + for r in running: + if r.role == Role.Hardhat: + return HardhatProcess(r.node) + return nil - proc validators(): seq[RunningNode] {.used.} = - running.filter(proc(r: RunningNode): bool = r.role == Role.Validator) + proc clients: seq[CodexProcess] {.used.} = + return collect: + for r in running: + if r.role == Role.Client: + CodexProcess(r.node) + + proc providers: seq[CodexProcess] {.used.} = + return collect: + for r in running: + if r.role == Role.Provider: + CodexProcess(r.node) + + proc validators: seq[CodexProcess] {.used.} = + return collect: + for r in running: + if r.role == Role.Validator: + CodexProcess(r.node) + + proc startHardhatNode(): Future[NodeProcess] {.async.} = + var config = nodeConfigs.hardhat + return await newHardhatProcess(config, Role.Hardhat) + + proc startClientNode(): Future[NodeProcess] {.async.} = + let clientIdx = clients().len + var config = nodeConfigs.clients + config.cliOptions.add CliOption(key: "--persistence") + return await newCodexProcess(clientIdx, config, Role.Client) + + proc startProviderNode(): Future[NodeProcess] {.async.} = + let providerIdx = providers().len + var config = nodeConfigs.providers + config.cliOptions.add CliOption(key: "--bootstrap-node", value: bootstrap) + config.cliOptions.add CliOption(key: "--persistence") + + # filter out provider options by provided index + config.cliOptions = config.cliOptions.filter( + o => (let idx = o.nodeIdx |? providerIdx; idx == providerIdx) + ) + + return await newCodexProcess(providerIdx, config, Role.Provider) + + proc startValidatorNode(): Future[NodeProcess] {.async.} = + let validatorIdx = validators().len + var config = nodeConfigs.validators + config.cliOptions.add CliOption(key: "--bootstrap-node", value: bootstrap) + config.cliOptions.add CliOption(key: "--validator") + + return await newCodexProcess(validatorIdx, config, Role.Validator) setup: - for i in 0.. 0: + error "failed to exit process, check for zombies", exitCode + + trace "closing node process' streams" + await node.process.closeWait() + + except CatchableError as e: + error "error stopping node process", error = e.msg + + finally: + node.process = nil + + trace "node stopped" + +proc waitUntilStarted*(node: NodeProcess) {.async.} = + logScope: + nodeName = node.name + + trace "waiting until node started" + + let started = newFuture[void]() + try: + discard node.captureOutput(node.startedOutput, started).track(node) + await started.wait(5.seconds) + except AsyncTimeoutError as e: + # attempt graceful shutdown in case node was partially started, prevent + # zombies + await node.stop() + raiseAssert "node did not output '" & node.startedOutput & "'" + +proc restart*(node: NodeProcess) {.async.} = + await node.stop() + await node.start() + await node.waitUntilStarted() + +method removeDataDir*(node: NodeProcess) {.base.} = + raiseAssert "[removeDataDir] not implemented" diff --git a/tests/integration/testIntegration.nim b/tests/integration/testIntegration.nim index 7863cba3..79d36513 100644 --- a/tests/integration/testIntegration.nim +++ b/tests/integration/testIntegration.nim @@ -8,7 +8,6 @@ import pkg/codex/rng import pkg/stew/byteutils import pkg/ethers/erc20 import pkg/codex/contracts -import pkg/codex/utils/stintutils import ../contracts/time import ../contracts/deployment import ../codex/helpers @@ -20,12 +19,6 @@ import ./twonodes # to enable custom logging levels for specific topics like: debug2 = "INFO; TRACE: marketplace" twonodessuite "Integration tests", debug1 = false, debug2 = false: - - proc purchaseStateIs(client: CodexClient, id: PurchaseId, state: string): bool = - without purchase =? client.getPurchase(id): - return false - return purchase.state == state - setup: # Our Hardhat configuration does use automine, which means that time tracked by `ethProvider.currentTime()` is not # advanced until blocks are mined and that happens only when transaction is submitted. @@ -255,34 +248,3 @@ twonodessuite "Integration tests", debug1 = false, debug2 = false: let responseBefore = client1.requestStorageRaw(cid, duration=1.u256, reward=2.u256, proofProbability=3.u256, collateral=200.u256, expiry=currentTime+10) check responseBefore.status == "400 Bad Request" check responseBefore.body == "Expiry has to be before the request's end (now + duration)" - - # TODO: skipping this test for now as is not passing on macos/linux for some - # reason. This test has been completely refactored in - # https://github.com/codex-storage/nim-codex/pull/607 in which it will be - # reintroduced. - # test "expired request partially pays out for stored time": - # let marketplace = Marketplace.new(Marketplace.address, ethProvider.getSigner()) - # let tokenAddress = await marketplace.token() - # let token = Erc20Token.new(tokenAddress, ethProvider.getSigner()) - # let reward = 400.u256 - # let duration = 100.u256 - - # # client 2 makes storage available - # let startBalanceClient2 = await token.balanceOf(account2) - # discard client2.postAvailability(size=140000.u256, duration=200.u256, minPrice=300.u256, maxCollateral=300.u256).get - - # # client 1 requests storage but requires two nodes to host the content - # let startBalanceClient1 = await token.balanceOf(account1) - # let expiry = (await ethProvider.currentTime()) + 10 - # let cid = client1.upload(exampleString(100000)).get - # let id = client1.requestStorage(cid, duration=duration, reward=reward, proofProbability=3.u256, expiry=expiry, collateral=200.u256, nodes=2).get - - # # We have to wait for Client 2 fills the slot, before advancing time. - # # Until https://github.com/codex-storage/nim-codex/issues/594 is implemented nothing better then - # # sleeping some seconds is available. - # await sleepAsync(2.seconds) - # await ethProvider.advanceTimeTo(expiry+1) - # check eventually(client1.purchaseStateIs(id, "cancelled"), 20000) - - # check eventually ((await token.balanceOf(account2)) - startBalanceClient2) > 0 and ((await token.balanceOf(account2)) - startBalanceClient2) < 10*reward - # check eventually (startBalanceClient1 - (await token.balanceOf(account1))) == ((await token.balanceOf(account2)) - startBalanceClient2) diff --git a/tests/integration/testmarketplace.nim b/tests/integration/testmarketplace.nim new file mode 100644 index 00000000..27e0eeab --- /dev/null +++ b/tests/integration/testmarketplace.nim @@ -0,0 +1,93 @@ +import std/math +import pkg/stew/byteutils +import pkg/codex/units +import ./marketplacesuite +import ../examples + +marketplacesuite "Marketplace payouts": + + test "expired request partially pays out for stored time", + NodeConfigs( + # Uncomment to start Hardhat automatically, typically so logs can be inspected locally + # hardhat: HardhatConfig().withLogFile() + + clients: + CodexConfig() + .nodes(1) + # .debug() # uncomment to enable console log output.debug() + .withLogFile() # uncomment to output log file to tests/integration/logs/ //_.log + .withLogTopics("node", "erasure"), + + providers: + CodexConfig() + .nodes(1) + # .debug() # uncomment to enable console log output + .withLogFile() # uncomment to output log file to tests/integration/logs/ //_.log + .withLogTopics("node", "marketplace", "sales", "reservations", "node", "proving", "clock"), + ): + let reward = 400.u256 + let duration = 100.periods + let collateral = 200.u256 + let expiry = 4.periods + let datasetSizeInBlocks = 3 + let data = await RandomChunker.example(blocks=datasetSizeInBlocks) + let client = clients()[0] + let provider = providers()[0] + let clientApi = client.client + let providerApi = provider.client + let startBalanceProvider = await token.balanceOf(provider.ethAccount) + let startBalanceClient = await token.balanceOf(client.ethAccount) + # original data = 3 blocks so slot size will be 4 blocks + let slotSize = (DefaultBlockSize * 4.NBytes).Natural.u256 + + # provider makes storage available + discard providerApi.postAvailability( + # make availability size large enough to only fill 1 slot, thus causing a + # cancellation + size=slotSize, + duration=duration.u256, + minPrice=reward, + maxCollateral=collateral) + + let cid = clientApi.upload(data).get + + var slotIdxFilled = none UInt256 + proc onSlotFilled(event: SlotFilled) = + slotIdxFilled = some event.slotIndex + + let subscription = await marketplace.subscribe(SlotFilled, onSlotFilled) + + # client requests storage but requires two nodes to host the content + let id = await clientApi.requestStorage( + cid, + duration=duration, + reward=reward, + expiry=expiry, + collateral=collateral, + nodes=3, + tolerance=1, + origDatasetSizeInBlocks=datasetSizeInBlocks + ) + + # wait until one slot is filled + check eventually slotIdxFilled.isSome + + # wait until sale is cancelled + without requestId =? clientApi.requestId(id): + fail() + let slotId = slotId(requestId, !slotIdxFilled) + check eventually(providerApi.saleStateIs(slotId, "SaleCancelled")) + + check eventually ( + let endBalanceProvider = (await token.balanceOf(provider.ethAccount)); + let difference = endBalanceProvider - startBalanceProvider; + difference > 0 and + difference < expiry.u256*reward + ) + check eventually ( + let endBalanceClient = (await token.balanceOf(client.ethAccount)); + let endBalanceProvider = (await token.balanceOf(provider.ethAccount)); + (startBalanceClient - endBalanceClient) == (endBalanceProvider - startBalanceProvider) + ) + + await subscription.unsubscribe() diff --git a/tests/integration/testproofs.nim b/tests/integration/testproofs.nim index a3440f99..13fd1605 100644 --- a/tests/integration/testproofs.nim +++ b/tests/integration/testproofs.nim @@ -1,232 +1,299 @@ -import std/sequtils -import std/os -from std/times import getTime, toUnix -import pkg/codex/contracts +import std/math +from std/times import inMilliseconds import pkg/codex/logutils -import pkg/codex/periods +import pkg/stew/byteutils import ../contracts/time import ../contracts/deployment -import ./twonodes -import ./multinodes +import ../codex/helpers +import ../examples +import ./marketplacesuite + +export chronicles logScope: - topics = "test proofs" + topics = "integration test proofs" -# TODO: This is currently the address of the marketplace with a dummy -# verifier. Use real marketplace address once we can generate actual -# Groth16 ZK proofs. -let marketplaceAddress = Marketplace.address(dummyVerifier = true) -twonodessuite "Proving integration test", debug1=false, debug2=false: - let validatorDir = getTempDir() / "CodexValidator" +marketplacesuite "Hosts submit regular proofs": - var marketplace: Marketplace - var period: uint64 + test "hosts submit periodic proofs for slots they fill", NodeConfigs( + # Uncomment to start Hardhat automatically, typically so logs can be inspected locally + # hardhat: HardhatConfig().withLogFile(), - proc purchaseStateIs(client: CodexClient, id: PurchaseId, state: string): bool = - client.getPurchase(id).option.?state == some state + clients: + CodexConfig() + .nodes(1) + # .debug() # uncomment to enable console log output + .withLogFile() # uncomment to output log file to tests/integration/logs/ //_.log + .withLogTopics("node"), - setup: - marketplace = Marketplace.new(marketplaceAddress, ethProvider) - period = (await marketplace.config()).proofs.period.truncate(uint64) + providers: + CodexConfig() + .nodes(1) + # .debug() # uncomment to enable console log output + .withLogFile() # uncomment to output log file to tests/integration/logs/ //_.log + .withLogTopics("marketplace", "sales", "reservations", "node"), + ): + let client0 = clients()[0].client + let totalPeriods = 50 + let datasetSizeInBlocks = 2 - # Our Hardhat configuration does use automine, which means that time tracked by `ethProvider.currentTime()` is not - # advanced until blocks are mined and that happens only when transaction is submitted. - # As we use in tests ethProvider.currentTime() which uses block timestamp this can lead to synchronization issues. - await ethProvider.advanceTime(1.u256) + let data = await RandomChunker.example(blocks=1) + createAvailabilities(data.len, totalPeriods.periods) - proc waitUntilPurchaseIsStarted(proofProbability: uint64 = 3, - duration: uint64 = 100 * period, - expiry: uint64 = 30) {.async.} = - discard client2.postAvailability( - size=0xFFFFF.u256, - duration=duration.u256, - minPrice=300.u256, - maxCollateral=200.u256 - ) - let cid = client1.upload("some file contents").get - let expiry = (await ethProvider.currentTime()) + expiry.u256 - let id = client1.requestStorage( + let cid = client0.upload(data).get + + let purchaseId = await client0.requestStorage( cid, - expiry=expiry, - duration=duration.u256, - proofProbability=proofProbability.u256, - collateral=100.u256, - reward=400.u256 - ).get - check eventually client1.purchaseStateIs(id, "started") + duration=totalPeriods.periods, + origDatasetSizeInBlocks = datasetSizeInBlocks) + check eventually client0.purchaseStateIs(purchaseId, "started") - proc advanceToNextPeriod {.async.} = - let periodicity = Periodicity(seconds: period.u256) - let currentPeriod = periodicity.periodOf(await ethProvider.currentTime()) - let endOfPeriod = periodicity.periodEnd(currentPeriod) - await ethProvider.advanceTimeTo(endOfPeriod + 1) - - proc startValidator: NodeProcess = - let validator = startNode( - [ - "--data-dir=" & validatorDir, - "--api-port=8089", - "--disc-port=8099", - "--listen-addrs=/ip4/127.0.0.1/tcp/0", - "--validator", - "--eth-account=" & $accounts[2] - ], debug = false - ) - validator.waitUntilStarted() - validator - - proc stopValidator(node: NodeProcess) = - node.stop() - removeDir(validatorDir) - - test "hosts submit periodic proofs for slots they fill": - await waitUntilPurchaseIsStarted(proofProbability=1) var proofWasSubmitted = false proc onProofSubmitted(event: ProofSubmitted) = proofWasSubmitted = true + let subscription = await marketplace.subscribe(ProofSubmitted, onProofSubmitted) - await ethProvider.advanceTime(period.u256) - check eventually proofWasSubmitted - await subscription.unsubscribe() - test "validator will mark proofs as missing": - let validator = startValidator() - await waitUntilPurchaseIsStarted(proofProbability=1) - - node2.stop() - - var slotWasFreed = false - proc onSlotFreed(event: SlotFreed) = - slotWasFreed = true - let subscription = await marketplace.subscribe(SlotFreed, onSlotFreed) - - for _ in 0..<100: - if slotWasFreed: - break - else: - await advanceToNextPeriod() - await sleepAsync(1.seconds) - - check slotWasFreed + let currentPeriod = await getCurrentPeriod() + check eventuallyP(proofWasSubmitted, currentPeriod + totalPeriods.u256 + 1) await subscription.unsubscribe() - stopValidator(validator) -multinodesuite "Simulate invalid proofs", - StartNodes.init(clients=1'u, providers=0'u, validators=1'u), - DebugNodes.init(client=false, provider=false, validator=false): - proc purchaseStateIs(client: CodexClient, id: PurchaseId, state: string): bool = - client.getPurchase(id).option.?state == some state - - var marketplace: Marketplace - var period: uint64 - var slotId: SlotId - - setup: - marketplace = Marketplace.new(marketplaceAddress, ethProvider) - let config = await marketplace.config() - period = config.proofs.period.truncate(uint64) - slotId = SlotId(array[32, byte].default) # ensure we aren't reusing from prev test - - # Our Hardhat configuration does use automine, which means that time tracked by `ethProvider.currentTime()` is not - # advanced until blocks are mined and that happens only when transaction is submitted. - # As we use in tests ethProvider.currentTime() which uses block timestamp this can lead to synchronization issues. - await ethProvider.advanceTime(1.u256) - - proc periods(p: Ordinal | uint): uint64 = - when p is uint: - p * period - else: p.uint * period - - proc advanceToNextPeriod {.async.} = - let periodicity = Periodicity(seconds: period.u256) - let currentPeriod = periodicity.periodOf(await ethProvider.currentTime()) - let endOfPeriod = periodicity.periodEnd(currentPeriod) - await ethProvider.advanceTimeTo(endOfPeriod + 1) - - proc waitUntilPurchaseIsStarted(proofProbability: uint64 = 1, - duration: uint64 = 12.periods, - expiry: uint64 = 4.periods) {.async.} = - - if clients().len < 1 or providers().len < 1: - raiseAssert("must start at least one client and one ethProvider") - - let client = clients()[0].restClient - let storageProvider = providers()[0].restClient - - discard storageProvider.postAvailability( - size=0xFFFFF.u256, - duration=duration.u256, - minPrice=300.u256, - maxCollateral=200.u256 - ) - let cid = client.upload("some file contents " & $ getTime().toUnix).get - let expiry = (await ethProvider.currentTime()) + expiry.u256 - # avoid timing issues by filling the slot at the start of the next period - await advanceToNextPeriod() - let id = client.requestStorage( - cid, - expiry=expiry, - duration=duration.u256, - proofProbability=proofProbability.u256, - collateral=100.u256, - reward=400.u256 - ).get - check eventually client.purchaseStateIs(id, "started") - let purchase = client.getPurchase(id).get - slotId = slotId(purchase.requestId, 0.u256) +marketplacesuite "Simulate invalid proofs": # TODO: these are very loose tests in that they are not testing EXACTLY how # proofs were marked as missed by the validator. These tests should be # tightened so that they are showing, as an integration test, that specific # proofs are being marked as missed by the validator. - test "slot is freed after too many invalid proofs submitted": - let failEveryNProofs = 2'u - let totalProofs = 100'u - startProviderNode(failEveryNProofs) + test "slot is freed after too many invalid proofs submitted", NodeConfigs( + # Uncomment to start Hardhat automatically, typically so logs can be inspected locally + # hardhat: HardhatConfig().withLogFile(), - await waitUntilPurchaseIsStarted(duration=totalProofs.periods) + clients: + CodexConfig() + .nodes(1) + # .debug() # uncomment to enable console log output + .withLogFile() # uncomment to output log file to tests/integration/logs/ //_.log + .withLogTopics("node"), + + providers: + CodexConfig() + .nodes(1) + .simulateProofFailuresFor(providerIdx=0, failEveryNProofs=1) + # .debug() # uncomment to enable console log output + .withLogFile() # uncomment to output log file to tests/integration/logs/ //_.log + .withLogTopics("marketplace", "sales", "reservations", "node"), + + validators: + CodexConfig() + .nodes(1) + .withLogFile() # uncomment to output log file to tests/integration/logs/ //_.log + # .debug() # uncomment to enable console log output + .withLogTopics("validator", "onchain", "ethers") + ): + let client0 = clients()[0].client + let totalPeriods = 50 + + let datasetSizeInBlocks = 2 + let data = await RandomChunker.example(blocks=datasetSizeInBlocks) + createAvailabilities(data.len, totalPeriods.periods) + + let cid = client0.upload(data).get + + let purchaseId = await client0.requestStorage( + cid, + duration=totalPeriods.periods, + origDatasetSizeInBlocks=datasetSizeInBlocks) + let requestId = client0.requestId(purchaseId).get + + check eventually client0.purchaseStateIs(purchaseId, "started") var slotWasFreed = false proc onSlotFreed(event: SlotFreed) = - if slotId(event.requestId, event.slotIndex) == slotId: + if event.requestId == requestId and + event.slotIndex == 0.u256: # assume only one slot, so index 0 slotWasFreed = true + let subscription = await marketplace.subscribe(SlotFreed, onSlotFreed) - for _ in 0.. //_.log + .withLogTopics("node"), + + providers: + CodexConfig() + .nodes(1) + .simulateProofFailuresFor(providerIdx=0, failEveryNProofs=3) + # .debug() # uncomment to enable console log output + .withLogFile() # uncomment to output log file to tests/integration/logs/ //_.log + .withLogTopics("marketplace", "sales", "reservations", "node"), + + validators: + CodexConfig() + .nodes(1) + # .debug() + .withLogFile() # uncomment to output log file to tests/integration/logs/ //_.log + .withLogTopics("validator", "onchain", "ethers") + ): + let client0 = clients()[0].client + let totalPeriods = 25 + + let datasetSizeInBlocks = 2 + let data = await RandomChunker.example(blocks=datasetSizeInBlocks) + createAvailabilities(data.len, totalPeriods.periods) + + let cid = client0.upload(data).get + + let purchaseId = await client0.requestStorage( + cid, + duration=totalPeriods.periods, + origDatasetSizeInBlocks=datasetSizeInBlocks) + let requestId = client0.requestId(purchaseId).get + + check eventually client0.purchaseStateIs(purchaseId, "started") var slotWasFreed = false proc onSlotFreed(event: SlotFreed) = - if slotId(event.requestId, event.slotIndex) == slotId: + if event.requestId == requestId and + event.slotIndex == 0.u256: slotWasFreed = true + let subscription = await marketplace.subscribe(SlotFreed, onSlotFreed) - for _ in 0.. //_.log + .withLogTopics("node", "erasure", "clock", "purchases"), + + providers: + CodexConfig() + .nodes(3) + .simulateProofFailuresFor(providerIdx=0, failEveryNProofs=2) + # .debug() # uncomment to enable console log output + .withLogFile() # uncomment to output log file to tests/integration/logs/ //_.log + .withLogTopics("marketplace", "sales", "reservations", "node"), + + validators: + CodexConfig() + .nodes(1) + # .debug() + .withLogFile() # uncomment to output log file to tests/integration/logs/ //_.log + .withLogTopics("validator") + ): + let client0 = clients()[0].client + let provider0 = providers()[0] + let provider1 = providers()[1] + let provider2 = providers()[2] + let totalPeriods = 25 + + let datasetSizeInBlocks = 3 + let data = await RandomChunker.example(blocks=datasetSizeInBlocks) + # original data = 3 blocks so slot size will be 4 blocks + let slotSize = (DefaultBlockSize * 4.NBytes).Natural.u256 + + discard provider0.client.postAvailability( + size=slotSize, # should match 1 slot only + duration=totalPeriods.periods.u256, + minPrice=300.u256, + maxCollateral=200.u256 + ) + + let cid = client0.upload(data).get + + let purchaseId = await client0.requestStorage( + cid, + duration=totalPeriods.periods, + expiry=10.periods, + nodes=3, + tolerance=1, + origDatasetSizeInBlocks=datasetSizeInBlocks + ) + + without requestId =? client0.requestId(purchaseId): + fail() + + var filledSlotIds: seq[SlotId] = @[] + proc onSlotFilled(event: SlotFilled) = + let slotId = slotId(event.requestId, event.slotIndex) + filledSlotIds.add slotId + + let subscription = await marketplace.subscribe(SlotFilled, onSlotFilled) + + # wait til first slot is filled + check eventually filledSlotIds.len > 0 + + # now add availability for providers 1 and 2, which should allow them to to + # put the remaining slots in their queues + discard provider1.client.postAvailability( + size=slotSize, # should match 1 slot only + duration=totalPeriods.periods.u256, + minPrice=300.u256, + maxCollateral=200.u256 + ) + + check eventually filledSlotIds.len > 1 + + discard provider2.client.postAvailability( + size=slotSize, # should match 1 slot only + duration=totalPeriods.periods.u256, + minPrice=300.u256, + maxCollateral=200.u256 + ) + + check eventually filledSlotIds.len > 2 + + # Wait til second slot is filled. SaleFilled happens too quickly, check SaleProving instead. + check eventually provider1.client.saleStateIs(filledSlotIds[1], "SaleProving") + check eventually provider2.client.saleStateIs(filledSlotIds[2], "SaleProving") + + check eventually client0.purchaseStateIs(purchaseId, "started") + + let currentPeriod = await getCurrentPeriod() + check eventuallyP( + # SaleFinished happens too quickly, check SalePayout instead + provider0.client.saleStateIs(filledSlotIds[0], "SalePayout"), + currentPeriod + totalPeriods.u256 + 1) + + check eventuallyP( + # SaleFinished happens too quickly, check SalePayout instead + provider1.client.saleStateIs(filledSlotIds[1], "SalePayout"), + currentPeriod + totalPeriods.u256 + 1) + + check eventuallyP( + # SaleFinished happens too quickly, check SalePayout instead + provider2.client.saleStateIs(filledSlotIds[2], "SalePayout"), + currentPeriod + totalPeriods.u256 + 1) + + check eventually( + (await token.balanceOf(provider1.ethAccount)) > + (await token.balanceOf(provider0.ethAccount)) + ) await subscription.unsubscribe() diff --git a/tests/testIntegration.nim b/tests/testIntegration.nim index 926e992a..eca2e957 100644 --- a/tests/testIntegration.nim +++ b/tests/testIntegration.nim @@ -1,6 +1,7 @@ import ./integration/testcli import ./integration/testIntegration import ./integration/testblockexpiration +import ./integration/testmarketplace import ./integration/testproofs {.warning[UnusedImport]:off.}