From b3e57a37e2bffd9522ab3ea8c13b33a0e1164abf Mon Sep 17 00:00:00 2001 From: markspanbroek Date: Tue, 12 Mar 2024 13:10:14 +0100 Subject: [PATCH] Wire up prover (#736) * wire prover into node * stricter case object checks * return correct proof * misc renames * adding useful traces * fix nodes and tolerance to match expected params * format challenges in logs * add circom compat to solidity groth16 conversion * update * bump time to give nodes time to load with all circom artifacts * misc * misc * use correct dataset geometry in erasure * make errors more searchable * use parens around `=? (await...)` calls * styling * styling * use push raises * fix to match constructor arguments * merge master * merge master * integration: fix proof parameters for a test Increased times due to ZK proof generation. Increased storage requirement because we're now hosting 5 slots instead of 1. * sales: calculate initial proof at start of period reason: this ensures that the period (and therefore the challenge) doesn't change while we're calculating the proof * integration: fix proof parameters for tests Increased times due to waiting on next period. Fixed data to be of right size. Updated expected payout due to hosting 5 slots. * sales: wait for stable proof challenge When the block pointer is nearing the wrap-around point, we wait another period before calculating a proof. 
* fix merge conflict --------- Co-authored-by: Dmitriy Ryajov Co-authored-by: Eric <5089238+emizzle@users.noreply.github.com> --- codex/blockexchange/engine/engine.nim | 62 +++++++++++-------- codex/blockexchange/engine/pendingblocks.nim | 41 ++++++------ codex/erasure/backends/leopard.nim | 38 +++++------- codex/erasure/erasure.nim | 2 +- codex/logutils.nim | 2 + codex/node.nim | 39 +++++++----- codex/sales/reservations.nim | 13 ++-- codex/sales/states/initialproving.nim | 24 ++++++- codex/stores/blockstore.nim | 26 ++++---- codex/stores/networkstore.nim | 22 +++---- codex/utils/poseidon2digest.nim | 4 -- config.nims | 2 + tests/circuits/fixtures/proof_main.circom | 4 -- .../blockexchange/engine/testblockexc.nim | 2 +- tests/codex/helpers.nim | 2 +- tests/codex/helpers/mockmarket.nim | 3 +- tests/codex/node/helpers.nim | 8 ++- tests/codex/node/testnode.nim | 2 +- tests/codex/sales/helpers/periods.nim | 8 +++ .../codex/sales/states/testinitialproving.nim | 51 ++++++++++++--- tests/codex/sales/testsales.nim | 20 +++++- tests/integration/nodes.nim | 2 +- tests/integration/testIntegration.nim | 46 ++++++++++---- 23 files changed, 270 insertions(+), 153 deletions(-) delete mode 100644 tests/circuits/fixtures/proof_main.circom create mode 100644 tests/codex/sales/helpers/periods.nim diff --git a/codex/blockexchange/engine/engine.nim b/codex/blockexchange/engine/engine.nim index 97e7cb99..60a7a689 100644 --- a/codex/blockexchange/engine/engine.nim +++ b/codex/blockexchange/engine/engine.nim @@ -129,7 +129,6 @@ proc sendWantHave( for p in peers: if p notin excluded: if address notin p.peerHave: - trace " wantHave > ", peer = p.id await b.network.request.sendWantList( p.id, @[address], @@ -145,11 +144,18 @@ proc sendWantBlock( @[address], wantType = WantType.WantBlock) # we want this remote to send us a block -proc monitorBlockHandle(b: BlockExcEngine, handle: Future[Block], address: BlockAddress, peerId: PeerId) {.async.} = +proc monitorBlockHandle( + b: BlockExcEngine, + 
handle: Future[Block], + address: BlockAddress, + peerId: PeerId) {.async.} = + try: trace "Monitoring block handle", address, peerId discard await handle trace "Block handle success", address, peerId + except CancelledError as exc: + trace "Block handle cancelled", address, peerId except CatchableError as exc: trace "Error block handle, disconnecting peer", address, exc = exc.msg, peerId @@ -167,38 +173,38 @@ proc monitorBlockHandle(b: BlockExcEngine, handle: Future[Block], address: Block proc requestBlock*( b: BlockExcEngine, address: BlockAddress, - timeout = DefaultBlockTimeout): Future[Block] {.async.} = + timeout = DefaultBlockTimeout): Future[?!Block] {.async.} = let blockFuture = b.pendingBlocks.getWantHandle(address, timeout) if b.pendingBlocks.isInFlight(address): - return await blockFuture + success await blockFuture + else: + let peers = b.peers.selectCheapest(address) + if peers.len == 0: + b.discovery.queueFindBlocksReq(@[address.cidOrTreeCid]) - let peers = b.peers.selectCheapest(address) - if peers.len == 0: - b.discovery.queueFindBlocksReq(@[address.cidOrTreeCid]) + let maybePeer = + if peers.len > 0: + peers[hash(address) mod peers.len].some + elif b.peers.len > 0: + toSeq(b.peers)[hash(address) mod b.peers.len].some + else: + BlockExcPeerCtx.none - let maybePeer = - if peers.len > 0: - peers[hash(address) mod peers.len].some - elif b.peers.len > 0: - toSeq(b.peers)[hash(address) mod b.peers.len].some - else: - BlockExcPeerCtx.none + if peer =? maybePeer: + asyncSpawn b.monitorBlockHandle(blockFuture, address, peer.id) + b.pendingBlocks.setInFlight(address) + await b.sendWantBlock(address, peer) + codex_block_exchange_want_block_lists_sent.inc() + await b.sendWantHave(address, @[peer], toSeq(b.peers)) + codex_block_exchange_want_have_lists_sent.inc() - if peer =? 
maybePeer: - asyncSpawn b.monitorBlockHandle(blockFuture, address, peer.id) - b.pendingBlocks.setInFlight(address) - await b.sendWantBlock(address, peer) - codex_block_exchange_want_block_lists_sent.inc() - await b.sendWantHave(address, @[peer], toSeq(b.peers)) - codex_block_exchange_want_have_lists_sent.inc() - - return await blockFuture + success await blockFuture proc requestBlock*( b: BlockExcEngine, cid: Cid, - timeout = DefaultBlockTimeout): Future[Block] = + timeout = DefaultBlockTimeout): Future[?!Block] = b.requestBlock(BlockAddress.init(cid)) proc blockPresenceHandler*( @@ -298,7 +304,10 @@ proc resolveBlocks*(b: BlockExcEngine, blocksDelivery: seq[BlockDelivery]) {.asy b.discovery.queueProvideBlocksReq(cids.toSeq) proc resolveBlocks*(b: BlockExcEngine, blocks: seq[Block]) {.async.} = - await b.resolveBlocks(blocks.mapIt(BlockDelivery(blk: it, address: BlockAddress(leaf: false, cid: it.cid)))) + await b.resolveBlocks( + blocks.mapIt( + BlockDelivery(blk: it, address: BlockAddress(leaf: false, cid: it.cid) + ))) proc payForBlocks(engine: BlockExcEngine, peer: BlockExcPeerCtx, @@ -315,8 +324,7 @@ proc payForBlocks(engine: BlockExcEngine, proc validateBlockDelivery( b: BlockExcEngine, - bd: BlockDelivery -): ?!void = + bd: BlockDelivery): ?!void = if bd.address notin b.pendingBlocks: return failure("Received block is not currently a pending block") diff --git a/codex/blockexchange/engine/pendingblocks.nim b/codex/blockexchange/engine/pendingblocks.nim index 2d517dfc..55a9da4c 100644 --- a/codex/blockexchange/engine/pendingblocks.nim +++ b/codex/blockexchange/engine/pendingblocks.nim @@ -44,11 +44,10 @@ proc updatePendingBlockGauge(p: PendingBlocksManager) = codex_block_exchange_pending_block_requests.set(p.blocks.len.int64) proc getWantHandle*( - p: PendingBlocksManager, - address: BlockAddress, - timeout = DefaultBlockTimeout, - inFlight = false -): Future[Block] {.async.} = + p: PendingBlocksManager, + address: BlockAddress, + timeout = 
DefaultBlockTimeout, + inFlight = false): Future[Block] {.async.} = ## Add an event for a block ## @@ -75,17 +74,15 @@ proc getWantHandle*( p.updatePendingBlockGauge() proc getWantHandle*( - p: PendingBlocksManager, - cid: Cid, - timeout = DefaultBlockTimeout, - inFlight = false -): Future[Block] = + p: PendingBlocksManager, + cid: Cid, + timeout = DefaultBlockTimeout, + inFlight = false): Future[Block] = p.getWantHandle(BlockAddress.init(cid), timeout, inFlight) proc resolve*( p: PendingBlocksManager, - blocksDelivery: seq[BlockDelivery] - ) {.gcsafe, raises: [].} = + blocksDelivery: seq[BlockDelivery]) {.gcsafe, raises: [].} = ## Resolve pending blocks ## @@ -108,16 +105,23 @@ proc resolve*( do: warn "Attempting to resolve block that's not currently a pending block", address = bd.address -proc setInFlight*(p: PendingBlocksManager, - address: BlockAddress, - inFlight = true) = +proc setInFlight*( + p: PendingBlocksManager, + address: BlockAddress, + inFlight = true) = + ## Set inflight status for a block + ## + p.blocks.withValue(address, pending): pending[].inFlight = inFlight trace "Setting inflight", address, inFlight = pending[].inFlight -proc isInFlight*(p: PendingBlocksManager, - address: BlockAddress, - ): bool = +proc isInFlight*( + p: PendingBlocksManager, + address: BlockAddress): bool = + ## Check if a block is in flight + ## + p.blocks.withValue(address, pending): result = pending[].inFlight trace "Getting inflight", address, inFlight = result @@ -145,7 +149,6 @@ iterator wantListCids*(p: PendingBlocksManager): Cid = yieldedCids.incl(cid) yield cid - iterator wantHandles*(p: PendingBlocksManager): Future[Block] = for v in p.blocks.values: yield v.handle diff --git a/codex/erasure/backends/leopard.nim b/codex/erasure/backends/leopard.nim index 2051fc97..992e34da 100644 --- a/codex/erasure/backends/leopard.nim +++ b/codex/erasure/backends/leopard.nim @@ -22,10 +22,9 @@ type decoder*: Option[LeoDecoder] method encode*( - self: LeoEncoderBackend, - data, - 
parity: var openArray[seq[byte]] -): Result[void, cstring] = + self: LeoEncoderBackend, + data, + parity: var openArray[seq[byte]]): Result[void, cstring] = ## Encode data using Leopard backend if parity.len == 0: @@ -43,11 +42,10 @@ method encode*( encoder.encode(data, parity) method decode*( - self: LeoDecoderBackend, - data, - parity, - recovered: var openArray[seq[byte]] -): Result[void, cstring] = + self: LeoDecoderBackend, + data, + parity, + recovered: var openArray[seq[byte]]): Result[void, cstring] = ## Decode data using given Leopard backend var decoder = @@ -71,26 +69,24 @@ method release*(self: LeoDecoderBackend) = self.decoder.get().free() proc new*( - T: type LeoEncoderBackend, - blockSize, - buffers, - parity: int -): LeoEncoderBackend = + T: type LeoEncoderBackend, + blockSize, + buffers, + parity: int): LeoEncoderBackend = ## Create an instance of an Leopard Encoder backend - ## + ## LeoEncoderBackend( blockSize: blockSize, buffers: buffers, parity: parity) proc new*( - T: type LeoDecoderBackend, - blockSize, - buffers, - parity: int -): LeoDecoderBackend = + T: type LeoDecoderBackend, + blockSize, + buffers, + parity: int): LeoDecoderBackend = ## Create an instance of an Leopard Decoder backend - ## + ## LeoDecoderBackend( blockSize: blockSize, buffers: buffers, diff --git a/codex/erasure/erasure.nim b/codex/erasure/erasure.nim index 92b227aa..b03cfedd 100644 --- a/codex/erasure/erasure.nim +++ b/codex/erasure/erasure.nim @@ -241,7 +241,7 @@ proc init*( let rounded = roundUp(manifest.blocksCount, ecK) - steps = divUp(manifest.blocksCount, ecK) + steps = divUp(rounded, ecK) blocksCount = rounded + (steps * ecM) success EncodingParams( diff --git a/codex/logutils.nim b/codex/logutils.nim index 1c002b70..6abc21a5 100644 --- a/codex/logutils.nim +++ b/codex/logutils.nim @@ -239,3 +239,5 @@ formatIt(LogFormat.textLines, Cid): shortLog($it) formatIt(LogFormat.json, Cid): $it formatIt(UInt256): $it formatIt(MultiAddress): $it 
+formatIt(LogFormat.textLines, array[32, byte]): it.short0xHexLog +formatIt(LogFormat.json, array[32, byte]): it.to0xHex diff --git a/codex/node.nim b/codex/node.nim index ea66b55a..b8466943 100644 --- a/codex/node.nim +++ b/codex/node.nim @@ -556,6 +556,8 @@ proc onStore( trace "Unable to build slot", err = err.msg return failure(err) + trace "Slot successfully retrieved and reconstructed" + if cid =? slotRoot.toSlotCid() and cid != manifest.slotRoots[slotIdx.int]: trace "Slot root mismatch", manifest = manifest.slotRoots[slotIdx.int], recovered = slotRoot.toSlotCid() return failure(newException(CodexError, "Slot root mismatch")) @@ -593,25 +595,32 @@ proc onProve( error "Unable to fetch manifest for cid", err = err.msg return failure(err) - without builder =? Poseidon2Builder.new(self.networkStore.localStore, manifest), err: - error "Unable to create slots builder", err = err.msg - return failure(err) + when defined(verify_circuit): + without (inputs, proof) =? await prover.prove(slotIdx, manifest, challenge), err: + error "Unable to generate proof", err = err.msg + return failure(err) - without sampler =? DataSampler.new(slotIdx, self.networkStore.localStore, builder), err: - error "Unable to create data sampler", err = err.msg - return failure(err) + without checked =? await prover.verify(proof, inputs), err: + error "Unable to verify proof", err = err.msg + return failure(err) - without proofInput =? await sampler.getProofInput(challenge, nSamples = 3), err: - error "Unable to get proof input for slot", err = err.msg - return failure(err) + if not checked: + error "Proof verification failed" + return failure("Proof verification failed") - # Todo: send proofInput to circuit. Get proof. (Profit, repeat.) + trace "Proof verified successfully" + else: + without (_, proof) =? 
await prover.prove(slotIdx, manifest, challenge), err: + error "Unable to generate proof", err = err.msg + return failure(err) - # For now: dummy proof that is not all zero's, so that it is accepted by the - # dummy verifier: - var proof = Groth16Proof.default - proof.a.x = 42.u256 - success(proof) + let groth16Proof = proof.toGroth16Proof() + trace "Proof generated successfully", groth16Proof + + success groth16Proof + else: + warn "Prover not enabled" + failure "Prover not enabled" proc onExpiryUpdate( self: CodexNodeRef, diff --git a/codex/sales/reservations.nim b/codex/sales/reservations.nim index 6c68853e..997a085f 100644 --- a/codex/sales/reservations.nim +++ b/codex/sales/reservations.nim @@ -325,8 +325,9 @@ proc createReservation*( return failure(error) if availability.size < slotSize: - let error = newException(BytesOutOfBoundsError, "trying to reserve an " & - "amount of bytes that is greater than the total size of the Availability") + let error = newException( + BytesOutOfBoundsError, + "trying to reserve an amount of bytes that is greater than the total size of the Availability") return failure(error) if createResErr =? (await self.update(reservation)).errorOption: @@ -427,9 +428,9 @@ proc release*( return failure(error) if reservation.size < bytes.u256: - let error = newException(BytesOutOfBoundsError, - "trying to release an amount of bytes that is greater than the total " & - "size of the Reservation") + let error = newException( + BytesOutOfBoundsError, + "trying to release an amount of bytes that is greater than the total size of the Reservation") return failure(error) if releaseErr =? 
(await self.repo.release(bytes)).errorOption: @@ -540,7 +541,7 @@ proc findAvailability*( return some availability - trace "availiability did not match", + trace "availability did not match", size, availsize = availability.size, duration, availDuration = availability.duration, minPrice, availMinPrice = availability.minPrice, diff --git a/codex/sales/states/initialproving.nim b/codex/sales/states/initialproving.nim index 8f3a7287..4a5b8515 100644 --- a/codex/sales/states/initialproving.nim +++ b/codex/sales/states/initialproving.nim @@ -1,4 +1,5 @@ import pkg/questionable/results +import ../../clock import ../../logutils import ../statemachine import ../salesagent @@ -22,9 +23,24 @@ method onCancelled*(state: SaleInitialProving, request: StorageRequest): ?State method onFailed*(state: SaleInitialProving, request: StorageRequest): ?State = return some State(SaleFailed()) +proc waitUntilNextPeriod(clock: Clock, periodicity: Periodicity) {.async.} = + trace "Waiting until next period" + let period = periodicity.periodOf(clock.now().u256) + let periodEnd = periodicity.periodEnd(period).truncate(int64) + await clock.waitUntil(periodEnd + 1) + +proc waitForStableChallenge(market: Market, clock: Clock, slotId: SlotId) {.async.} = + let periodicity = await market.periodicity() + let downtime = await market.proofDowntime() + await clock.waitUntilNextPeriod(periodicity) + while (await market.getPointer(slotId)) > (256 - downtime): + await clock.waitUntilNextPeriod(periodicity) + method run*(state: SaleInitialProving, machine: Machine): Future[?State] {.async.} = let data = SalesAgent(machine).data let context = SalesAgent(machine).context + let market = context.market + let clock = context.clock without request =? data.request: raiseAssert "no sale request" @@ -32,10 +48,12 @@ method run*(state: SaleInitialProving, machine: Machine): Future[?State] {.async without onProve =? 
context.onProve: raiseAssert "onProve callback not set" + debug "Waiting for a proof challenge that is valid for the entire period" + let slot = Slot(request: request, slotIndex: data.slotIndex) + await waitForStableChallenge(market, clock, slot.id) + debug "Generating initial proof", requestId = data.requestId - let - slot = Slot(request: request, slotIndex: data.slotIndex) - challenge = await context.market.getChallenge(slot.id) + let challenge = await context.market.getChallenge(slot.id) without proof =? (await onProve(slot, challenge)), err: error "Failed to generate initial proof", error = err.msg return some State(SaleErrored(error: err)) diff --git a/codex/stores/blockstore.nim b/codex/stores/blockstore.nim index d97d6948..4921bebb 100644 --- a/codex/stores/blockstore.nim +++ b/codex/stores/blockstore.nim @@ -61,10 +61,9 @@ method getBlockAndProof*(self: BlockStore, treeCid: Cid, index: Natural): Future raiseAssert("getBlockAndProof not implemented!") method putBlock*( - self: BlockStore, - blk: Block, - ttl = Duration.none -): Future[?!void] {.base.} = + self: BlockStore, + blk: Block, + ttl = Duration.none): Future[?!void] {.base.} = ## Put a block to the blockstore ## @@ -75,8 +74,7 @@ method putCidAndProof*( treeCid: Cid, index: Natural, blockCid: Cid, - proof: CodexProof -): Future[?!void] {.base.} = + proof: CodexProof): Future[?!void] {.base.} = ## Put a block proof to the blockstore ## @@ -92,10 +90,9 @@ method getCidAndProof*( raiseAssert("getCidAndProof not implemented!") method ensureExpiry*( - self: BlockStore, - cid: Cid, - expiry: SecondsSince1970 -): Future[?!void] {.base.} = + self: BlockStore, + cid: Cid, + expiry: SecondsSince1970): Future[?!void] {.base.} = ## Ensure that block's assosicated expiry is at least given timestamp ## If the current expiry is lower then it is updated to the given one, otherwise it is left intact ## @@ -103,11 +100,10 @@ method ensureExpiry*( raiseAssert("Not implemented!") method ensureExpiry*( - self: 
BlockStore, - treeCid: Cid, - index: Natural, - expiry: SecondsSince1970 -): Future[?!void] {.base.} = + self: BlockStore, + treeCid: Cid, + index: Natural, + expiry: SecondsSince1970): Future[?!void] {.base.} = ## Ensure that block's associated expiry is at least given timestamp ## If the current expiry is lower then it is updated to the given one, otherwise it is left intact ## diff --git a/codex/stores/networkstore.nim b/codex/stores/networkstore.nim index 82a1428d..39835e23 100644 --- a/codex/stores/networkstore.nim +++ b/codex/stores/networkstore.nim @@ -8,8 +8,7 @@ ## those terms. -import pkg/upraises -push: {.upraises: [].} +{.push raises: [].} import pkg/chronos import pkg/libp2p @@ -37,13 +36,16 @@ type method getBlock*(self: NetworkStore, address: BlockAddress): Future[?!Block] {.async.} = trace "Getting block from local store or network", address - without blk =? await self.localStore.getBlock(address), error: - if not (error of BlockNotFoundError): return failure error - trace "Block not in local store", address + without blk =? (await self.localStore.getBlock(address)), err: + if not (err of BlockNotFoundError): + trace "Error getting block from local store", address, err = err.msg + return failure err - without newBlock =? (await self.engine.requestBlock(address)).catch, error: - trace "Unable to get block from exchange engine", address - return failure error + trace "Block not in local store", address, err = err.msg + + without newBlock =? 
(await self.engine.requestBlock(address)), err: + trace "Unable to get block from exchange engine", address, err = err.msg + return failure err return success newBlock @@ -166,6 +168,4 @@ proc new*( ): NetworkStore = ## Create new instance of a NetworkStore ## - NetworkStore( - localStore: localStore, - engine: engine) + NetworkStore(localStore: localStore, engine: engine) diff --git a/codex/utils/poseidon2digest.nim b/codex/utils/poseidon2digest.nim index 56f072cd..efdb3c6a 100644 --- a/codex/utils/poseidon2digest.nim +++ b/codex/utils/poseidon2digest.nim @@ -12,10 +12,6 @@ import pkg/questionable/results import pkg/libp2p/multihash import pkg/stew/byteutils -import pkg/constantine/math/arithmetic -import pkg/constantine/math/io/io_bigints -import pkg/constantine/math/io/io_fields - import ../merkletree func spongeDigest*( diff --git a/config.nims b/config.nims index f04255ba..45830fad 100644 --- a/config.nims +++ b/config.nims @@ -74,6 +74,8 @@ else: --styleCheck:usages --styleCheck:error --maxLoopIterationsVM:1000000000 +--fieldChecks:on +--warningAsError:"ProveField:on" when (NimMajor, NimMinor) >= (1, 4): --warning:"ObservableStores:off" diff --git a/tests/circuits/fixtures/proof_main.circom b/tests/circuits/fixtures/proof_main.circom deleted file mode 100644 index 1018d4bc..00000000 --- a/tests/circuits/fixtures/proof_main.circom +++ /dev/null @@ -1,4 +0,0 @@ -pragma circom 2.0.0; -include "sample_cells.circom"; -// SampleAndProven( maxDepth, maxLog2NSlots, blockTreeDepth, nFieldElemsPerCell, nSamples ) -component main {public [entropy,dataSetRoot,slotIndex]} = SampleAndProve(32, 8, 5, 67, 5); diff --git a/tests/codex/blockexchange/engine/testblockexc.nim b/tests/codex/blockexchange/engine/testblockexc.nim index 4f1baf75..105f3a8e 100644 --- a/tests/codex/blockexchange/engine/testblockexc.nim +++ b/tests/codex/blockexchange/engine/testblockexc.nim @@ -278,7 +278,7 @@ asyncchecksuite "NetworkStore - multiple nodes": # As soon as we connect the downloader to 
the blockHolder, the block should # propagate to the downloader... await connectNodes(@[downloader, blockHolder]) - check (await blockRequest).cid == aBlock.cid + check (await blockRequest).tryGet().cid == aBlock.cid check (await downloader.engine.localStore.hasBlock(aBlock.cid)).tryGet() # ... and the bystander should have cancelled the want-have diff --git a/tests/codex/helpers.nim b/tests/codex/helpers.nim index 9192968c..89aeafd1 100644 --- a/tests/codex/helpers.nim +++ b/tests/codex/helpers.nim @@ -31,7 +31,7 @@ func `==`*(a, b: Block): bool = proc calcEcBlocksCount*(blocksCount: int, ecK, ecM: int): int = let rounded = roundUp(blocksCount, ecK) - steps = divUp(blocksCount, ecK) + steps = divUp(rounded, ecK) rounded + (steps * ecM) diff --git a/tests/codex/helpers/mockmarket.nim b/tests/codex/helpers/mockmarket.nim index db84b208..5b8dd894 100644 --- a/tests/codex/helpers/mockmarket.nim +++ b/tests/codex/helpers/mockmarket.nim @@ -29,6 +29,7 @@ type markedAsMissingProofs*: seq[SlotId] canBeMarkedAsMissing: HashSet[SlotId] withdrawn*: seq[RequestId] + proofPointer*: uint8 proofsRequired: HashSet[SlotId] proofsToBeRequired: HashSet[SlotId] proofChallenge*: ProofChallenge @@ -117,7 +118,7 @@ method proofDowntime*(market: MockMarket): Future[uint8] {.async.} = return market.config.proofs.downtime method getPointer*(market: MockMarket, slotId: SlotId): Future[uint8] {.async.} = - return 0 # TODO + return market.proofPointer method requestStorage*(market: MockMarket, request: StorageRequest) {.async.} = market.requested.add(request) diff --git a/tests/codex/node/helpers.nim b/tests/codex/node/helpers.nim index e86c9613..5dd020ab 100644 --- a/tests/codex/node/helpers.nim +++ b/tests/codex/node/helpers.nim @@ -6,6 +6,7 @@ import pkg/chronos import pkg/codex/codextypes import pkg/codex/chunker +import pkg/codex/slots import ../../asynctest @@ -91,7 +92,12 @@ template setupAndTearDown*() {.dirty.} = discovery = DiscoveryEngine.new(localStore, peerStore, network, 
blockDiscovery, pendingBlocks) engine = BlockExcEngine.new(localStore, wallet, network, discovery, peerStore, pendingBlocks) store = NetworkStore.new(engine, localStore) - node = CodexNodeRef.new(switch, store, engine, blockDiscovery) + node = CodexNodeRef.new( + switch = switch, + networkStore = store, + engine = engine, + prover = Prover.none, + discovery = blockDiscovery) await node.start() diff --git a/tests/codex/node/testnode.nim b/tests/codex/node/testnode.nim index bf444a48..2338085c 100644 --- a/tests/codex/node/testnode.nim +++ b/tests/codex/node/testnode.nim @@ -120,11 +120,11 @@ asyncchecksuite "Test Node - Basic": test "Setup purchase request": let + erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider) manifest = await storeDataGetManifest(localStore, chunker) manifestBlock = bt.Block.new( manifest.encode().tryGet(), codec = ManifestCodec).tryGet() - erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider) protected = (await erasure.encode(manifest, 3, 2)).tryGet() builder = Poseidon2Builder.new(localStore, protected).tryGet() verifiable = (await builder.buildManifest()).tryGet() diff --git a/tests/codex/sales/helpers/periods.nim b/tests/codex/sales/helpers/periods.nim new file mode 100644 index 00000000..ba1793c2 --- /dev/null +++ b/tests/codex/sales/helpers/periods.nim @@ -0,0 +1,8 @@ +import pkg/codex/market +import ../../helpers/mockclock + +proc advanceToNextPeriod*(clock: MockClock, market: Market) {.async.} = + let periodicity = await market.periodicity() + let period = periodicity.periodOf(clock.now().u256) + let periodEnd = periodicity.periodEnd(period) + clock.set((periodEnd + 1).truncate(int)) diff --git a/tests/codex/sales/states/testinitialproving.nim b/tests/codex/sales/states/testinitialproving.nim index 0659edd3..69355567 100644 --- a/tests/codex/sales/states/testinitialproving.nim +++ b/tests/codex/sales/states/testinitialproving.nim @@ -14,12 +14,15 @@ import ../../../asynctest import ../../examples import 
../../helpers import ../../helpers/mockmarket +import ../../helpers/mockclock +import ../helpers/periods asyncchecksuite "sales state 'initialproving'": let proof = Groth16Proof.example let request = StorageRequest.example let slotIndex = (request.ask.slots div 2).u256 let market = MockMarket.new() + let clock = MockClock.new() var state: SaleInitialProving var agent: SalesAgent @@ -31,7 +34,8 @@ asyncchecksuite "sales state 'initialproving'": return success(proof) let context = SalesContext( onProve: onProve.some, - market: market + market: market, + clock: clock ) agent = newSalesAgent(context, request.id, @@ -39,6 +43,12 @@ asyncchecksuite "sales state 'initialproving'": request.some) state = SaleInitialProving.new() + proc allowProofToStart {.async.} = + # wait until we're in initialproving state + await sleepAsync(10.millis) + # it won't start proving until the next period + await clock.advanceToNextPeriod(market) + test "switches to cancelled state when request expires": let next = state.onCancelled(request) check !next of SaleCancelled @@ -47,30 +57,57 @@ asyncchecksuite "sales state 'initialproving'": let next = state.onFailed(request) check !next of SaleFailed - test "switches to filling state when initial proving is complete": - let next = await state.run(agent) - check !next of SaleFilling - check SaleFilling(!next).proof == proof + test "waits for the beginning of the period to get the challenge": + let future = state.run(agent) + await sleepAsync(10.millis) + check not future.finished + await allowProofToStart() + discard await future + + test "waits another period when the proof pointer is about to wrap around": + market.proofPointer = 250 + let future = state.run(agent) + await allowProofToStart() + await sleepAsync(10.millis) + check not future.finished + market.proofPointer = 100 + await allowProofToStart() + discard await future test "onProve callback provides proof challenge": market.proofChallenge = ProofChallenge.example let future = 
state.run(agent) + await allowProofToStart() + + discard await future check receivedChallenge == market.proofChallenge + test "switches to filling state when initial proving is complete": + let future = state.run(agent) + await allowProofToStart() + let next = await future + + check !next of SaleFilling + check SaleFilling(!next).proof == proof + test "switches to errored state when onProve callback fails": let onProveFailed: OnProve = proc(slot: Slot, challenge: ProofChallenge): Future[?!Groth16Proof] {.async.} = return failure("oh no!") let proofFailedContext = SalesContext( onProve: onProveFailed.some, - market: market + market: market, + clock: clock ) agent = newSalesAgent(proofFailedContext, request.id, slotIndex, request.some) - let next = await state.run(agent) + let future = state.run(agent) + await allowProofToStart() + let next = await future + check !next of SaleErrored diff --git a/tests/codex/sales/testsales.nim b/tests/codex/sales/testsales.nim index 921ece83..b5fe4821 100644 --- a/tests/codex/sales/testsales.nim +++ b/tests/codex/sales/testsales.nim @@ -19,6 +19,7 @@ import ../helpers/mockmarket import ../helpers/mockclock import ../helpers/always import ../examples +import ./helpers/periods asyncchecksuite "Sales - start": let proof = Groth16Proof.example @@ -176,6 +177,12 @@ asyncchecksuite "Sales": await sales.stop() await repo.stop() + proc allowRequestToStart {.async.} = + # wait until we're in initialproving state + await sleepAsync(10.millis) + # it won't start proving until the next period + await clock.advanceToNextPeriod(market) + proc getAvailability: Availability = let key = availability.id.key.get (waitFor reservations.get(key, Availability)).get @@ -311,7 +318,8 @@ asyncchecksuite "Sales": createAvailability() let origSize = availability.size await market.requestStorage(request) - await sold # allow proving to start + await allowRequestToStart() + await sold # complete request market.slotState[request.slotId(slotIndex)] = 
SlotState.Finished @@ -379,6 +387,8 @@ asyncchecksuite "Sales": saleFailed = true createAvailability() await market.requestStorage(request) + await allowRequestToStart() + check eventually saleFailed test "makes storage available again when data retrieval fails": @@ -400,12 +410,16 @@ asyncchecksuite "Sales": return success(Groth16Proof.example) createAvailability() await market.requestStorage(request) + await allowRequestToStart() + check eventually provingRequest == request check provingSlot < request.ask.slots.u256 test "fills a slot": createAvailability() await market.requestStorage(request) + await allowRequestToStart() + check eventually market.filled.len > 0 check market.filled[0].requestId == request.id check market.filled[0].slotIndex < request.ask.slots.u256 @@ -421,6 +435,8 @@ asyncchecksuite "Sales": soldSlotIndex = slotIndex createAvailability() await market.requestStorage(request) + await allowRequestToStart() + check eventually soldRequest == request check soldSlotIndex < request.ask.slots.u256 @@ -437,6 +453,8 @@ asyncchecksuite "Sales": clearedSlotIndex = slotIndex createAvailability() await market.requestStorage(request) + await allowRequestToStart() + check eventually clearedRequest == request check clearedSlotIndex < request.ask.slots.u256 diff --git a/tests/integration/nodes.nim b/tests/integration/nodes.nim index 3a9e93cc..a60a55c7 100644 --- a/tests/integration/nodes.nim +++ b/tests/integration/nodes.nim @@ -46,7 +46,7 @@ proc waitUntilOutput*(node: NodeProcess, output: string) = proc waitUntilStarted*(node: NodeProcess) = if node.debug: - sleep(5_000) + sleep(10_000) else: node.waitUntilOutput("Started codex node") diff --git a/tests/integration/testIntegration.nim b/tests/integration/testIntegration.nim index 79d36513..88f27ffd 100644 --- a/tests/integration/testIntegration.nim +++ b/tests/integration/testIntegration.nim @@ -174,16 +174,25 @@ twonodessuite "Integration tests", debug1 = false, debug2 = false: check request.ask.maxSlotLoss == 
0'u64 test "nodes negotiate contracts on the marketplace": - let size = 0xFFFFF.u256 + let size = 0xFFFFFF.u256 + let data = await RandomChunker.example(blocks=8) # client 2 makes storage available - discard client2.postAvailability(size=size, duration=200.u256, minPrice=300.u256, maxCollateral=300.u256) + discard client2.postAvailability(size=size, duration=20*60.u256, minPrice=300.u256, maxCollateral=300.u256) # client 1 requests storage - let expiry = (await ethProvider.currentTime()) + 30 - let cid = client1.upload("some file contents").get - let id = client1.requestStorage(cid, duration=100.u256, reward=400.u256, proofProbability=3.u256, expiry=expiry, collateral=200.u256).get + let expiry = (await ethProvider.currentTime()) + 5*60 + let cid = client1.upload(data).get + let id = client1.requestStorage( + cid, + duration=10*60.u256, + reward=400.u256, + proofProbability=3.u256, + expiry=expiry, + collateral=200.u256, + nodes = 5, + tolerance = 2).get - check eventually client1.purchaseStateIs(id, "started") + check eventually(client1.purchaseStateIs(id, "started"), timeout=5*60*1000) let purchase = client1.getPurchase(id).get check purchase.error == none string let availabilities = client2.getAvailabilities().get @@ -192,22 +201,33 @@ twonodessuite "Integration tests", debug1 = false, debug2 = false: check newSize > 0 and newSize < size test "node slots gets paid out": + let size = 0xFFFFFF.u256 + let data = await RandomChunker.example(blocks = 8) let marketplace = Marketplace.new(Marketplace.address, ethProvider.getSigner()) let tokenAddress = await marketplace.token() let token = Erc20Token.new(tokenAddress, ethProvider.getSigner()) let reward = 400.u256 - let duration = 100.u256 + let duration = 10*60.u256 + let nodes = 5'u # client 2 makes storage available let startBalance = await token.balanceOf(account2) - discard client2.postAvailability(size=0xFFFFF.u256, duration=200.u256, minPrice=300.u256, maxCollateral=300.u256).get + discard 
client2.postAvailability(size=size, duration=20*60.u256, minPrice=300.u256, maxCollateral=300.u256).get # client 1 requests storage - let expiry = (await ethProvider.currentTime()) + 30 - let cid = client1.upload("some file contents").get - let id = client1.requestStorage(cid, duration=duration, reward=reward, proofProbability=3.u256, expiry=expiry, collateral=200.u256).get + let expiry = (await ethProvider.currentTime()) + 5*60 + let cid = client1.upload(data).get + let id = client1.requestStorage( + cid, + duration=duration, + reward=reward, + proofProbability=3.u256, + expiry=expiry, + collateral=200.u256, + nodes = nodes, + tolerance = 2).get - check eventually client1.purchaseStateIs(id, "started") + check eventually(client1.purchaseStateIs(id, "started"), timeout=5*60*1000) let purchase = client1.getPurchase(id).get check purchase.error == none string @@ -216,7 +236,7 @@ twonodessuite "Integration tests", debug1 = false, debug2 = false: # only with new transaction await ethProvider.advanceTime(duration) - check eventually (await token.balanceOf(account2)) - startBalance == duration*reward + check eventually (await token.balanceOf(account2)) - startBalance == duration*reward*nodes.u256 test "request storage fails if nodes and tolerance aren't correct": let cid = client1.upload("some file contents").get