From c7bc28d7232de827cd5414c99e1fb654e14a8ff4 Mon Sep 17 00:00:00 2001 From: Ben Bierens <39762930+benbierens@users.noreply.github.com> Date: Tue, 30 Apr 2024 11:31:06 +0200 Subject: [PATCH 1/6] Reduce logging during file upload (#792) * Removes warning * Updates logging for file upload * Restores trace for placing block and proof in repo store --- codex/blockexchange/engine/discovery.nim | 20 +++++++------------- codex/blockexchange/engine/engine.nim | 10 +++------- codex/blockexchange/engine/pendingblocks.nim | 2 -- codex/blockexchange/network/network.nim | 3 --- codex/discovery.nim | 5 +---- codex/node.nim | 9 +++------ codex/stores/networkstore.nim | 3 --- codex/stores/repostore.nim | 17 +++++++---------- 8 files changed, 21 insertions(+), 48 deletions(-) diff --git a/codex/blockexchange/engine/discovery.nim b/codex/blockexchange/engine/discovery.nim index 267d8f12..5434a5e6 100644 --- a/codex/blockexchange/engine/discovery.nim +++ b/codex/blockexchange/engine/discovery.nim @@ -68,13 +68,12 @@ proc discoveryQueueLoop(b: DiscoveryEngine) {.async.} = try: await b.discoveryQueue.put(cid) except CatchableError as exc: - trace "Exception in discovery loop", exc = exc.msg + warn "Exception in discovery loop", exc = exc.msg logScope: sleep = b.discoveryLoopSleep wanted = b.pendingBlocks.len - trace "About to sleep discovery loop" await sleepAsync(b.discoveryLoopSleep) proc advertiseQueueLoop(b: DiscoveryEngine) {.async.} = @@ -87,10 +86,9 @@ proc advertiseQueueLoop(b: DiscoveryEngine) {.async.} = await sleepAsync(50.millis) trace "Iterating blocks finished." - trace "About to sleep advertise loop", sleep = b.advertiseLoopSleep await sleepAsync(b.advertiseLoopSleep) - trace "Exiting advertise task loop" + info "Exiting advertise task loop" proc advertiseTaskLoop(b: DiscoveryEngine) {.async.} = ## Run advertise tasks @@ -102,7 +100,6 @@ proc advertiseTaskLoop(b: DiscoveryEngine) {.async.} = cid = await b.advertiseQueue.get() if cid in b.inFlightAdvReqs: - trace "Advertise request already in progress", cid continue try: @@ -111,17 +108,15 @@ proc advertiseTaskLoop(b: DiscoveryEngine) {.async.} = b.inFlightAdvReqs[cid] = request codexInflightDiscovery.set(b.inFlightAdvReqs.len.int64) - trace "Advertising block", cid, inflight = b.inFlightAdvReqs.len await request finally: b.inFlightAdvReqs.del(cid) codexInflightDiscovery.set(b.inFlightAdvReqs.len.int64) - trace "Advertised block", cid, inflight = b.inFlightAdvReqs.len except CatchableError as exc: - trace "Exception in advertise task runner", exc = exc.msg + warn "Exception in advertise task runner", exc = exc.msg - trace "Exiting advertise task runner" + info "Exiting advertise task runner" proc discoveryTaskLoop(b: DiscoveryEngine) {.async.} = ## Run discovery tasks @@ -166,9 +161,9 @@ proc discoveryTaskLoop(b: DiscoveryEngine) {.async.} = b.inFlightDiscReqs.del(cid) codexInflightDiscovery.set(b.inFlightAdvReqs.len.int64) except CatchableError as exc: - trace "Exception in discovery task runner", exc = exc.msg + warn "Exception in discovery task runner", exc = exc.msg - trace "Exiting discovery task runner" + info "Exiting discovery task runner" proc queueFindBlocksReq*(b: DiscoveryEngine, cids: seq[Cid]) {.inline.} = for cid in cids: @@ -183,10 +178,9 @@ proc queueProvideBlocksReq*(b: DiscoveryEngine, cids: seq[Cid]) {.inline.} = for cid in cids: if cid notin b.advertiseQueue: try: - trace "Queueing provide block", cid, queue = b.discoveryQueue.len b.advertiseQueue.putNoWait(cid) except CatchableError as exc: - trace "Exception queueing discovery request", exc = exc.msg + warn "Exception queueing discovery request", exc = exc.msg proc start*(b: DiscoveryEngine) {.async.} = ## Start the discengine task diff --git a/codex/blockexchange/engine/engine.nim b/codex/blockexchange/engine/engine.nim index 291eea14..a4565b10 100644 --- a/codex/blockexchange/engine/engine.nim +++ b/codex/blockexchange/engine/engine.nim @@ -262,8 +262,6 @@ proc blockPresenceHandler*( not b.peers.anyIt( cid in it.peerHaveCids )) proc scheduleTasks(b: BlockExcEngine, blocksDelivery: seq[BlockDelivery]) {.async.} = - trace "Schedule a task for new blocks", items = blocksDelivery.len - let cids = blocksDelivery.mapIt( it.blk.cid ) @@ -277,7 +275,7 @@ proc scheduleTasks(b: BlockExcEngine, blocksDelivery: seq[BlockDelivery]) {.asyn if b.scheduleTask(p): trace "Task scheduled for peer", peer = p.id else: - trace "Unable to schedule task for peer", peer = p.id + warn "Unable to schedule task for peer", peer = p.id break # do next peer @@ -293,7 +291,7 @@ proc cancelBlocks(b: BlockExcEngine, addrs: seq[BlockAddress]) {.async.} = .filterIt(it.failed) if failed.len > 0: - trace "Failed to send block request cancellations to peers", peers = failed.len + warn "Failed to send block request cancellations to peers", peers = failed.len proc getAnnouceCids(blocksDelivery: seq[BlockDelivery]): seq[Cid] = var cids = initHashSet[Cid]() @@ -309,8 +307,6 @@ proc getAnnouceCids(blocksDelivery: seq[BlockDelivery]): seq[Cid] = return cids.toSeq proc resolveBlocks*(b: BlockExcEngine, blocksDelivery: seq[BlockDelivery]) {.async.} = - trace "Resolving blocks", blocks = blocksDelivery.len - b.pendingBlocks.resolve(blocksDelivery) await b.scheduleTasks(blocksDelivery) let announceCids = getAnnouceCids(blocksDelivery) @@ -618,7 +614,7 @@ proc blockexcTaskRunner(b: BlockExcEngine) {.async.} = trace "Got new task from queue", peerId = peerCtx.id await b.taskHandler(peerCtx) - trace "Exiting blockexc task runner" + info "Exiting blockexc task runner" proc new*( T: type BlockExcEngine, diff --git a/codex/blockexchange/engine/pendingblocks.nim b/codex/blockexchange/engine/pendingblocks.nim index 55a9da4c..0f8cd439 100644 --- a/codex/blockexchange/engine/pendingblocks.nim +++ b/codex/blockexchange/engine/pendingblocks.nim @@ -102,8 +102,6 @@ proc resolve*( trace "Block retrieval time", retrievalDurationUs, address = bd.address else: trace "Block handle already finished", address = bd.address - do: - warn "Attempting to resolve block that's not currently a pending block", address = bd.address proc setInFlight*( p: PendingBlocksManager, diff --git a/codex/blockexchange/network/network.nim b/codex/blockexchange/network/network.nim index 336e627a..d193779d 100644 --- a/codex/blockexchange/network/network.nim +++ b/codex/blockexchange/network/network.nim @@ -147,9 +147,6 @@ proc sendWantCancellations*( addresses: seq[BlockAddress]): Future[void] {.async.} = ## Informs a remote peer that we're no longer interested in a set of blocks ## - - trace "Sending block request cancellation to peer", addrs = addresses.len, peer = id - await b.sendWantList(id = id, addresses = addresses, cancel = true) proc handleBlocksDelivery( diff --git a/codex/discovery.nim b/codex/discovery.nim index af269324..d3cefe05 100644 --- a/codex/discovery.nim +++ b/codex/discovery.nim @@ -84,16 +84,13 @@ method find*( method provide*(d: Discovery, cid: Cid) {.async, base.} = ## Provide a bock Cid ## - - trace "Providing block", cid let nodes = await d.protocol.addProvider( cid.toNodeId(), d.providerRecord.get) if nodes.len <= 0: - trace "Couldn't provide to any nodes!" + warn "Couldn't provide to any nodes!" - trace "Provided to nodes", nodes = nodes.len method find*( d: Discovery, diff --git a/codex/node.nim b/codex/node.nim index effde9cf..f7dffb00 100644 --- a/codex/node.nim +++ b/codex/node.nim @@ -294,7 +294,7 @@ proc store*( ## Save stream contents as dataset with given blockSize ## to nodes's BlockStore, and return Cid of its manifest ## - trace "Storing data" + info "Storing data" let hcodec = Sha256HashCodec @@ -308,8 +308,6 @@ proc store*( let chunk = await chunker.getBytes(); chunk.len > 0): - trace "Got data from stream", len = chunk.len - without mhash =? MultiHash.digest($hcodec, chunk).mapFailure, err: return failure(err) @@ -322,7 +320,7 @@ proc store*( cids.add(cid) if err =? (await self.networkStore.putBlock(blk)).errorOption: - trace "Unable to store block", cid = blk.cid, err = err.msg + error "Unable to store block", cid = blk.cid, err = err.msg return failure(&"Unable to store block {blk.cid}") except CancelledError as exc: raise exc @@ -353,7 +351,7 @@ proc store*( codec = dataCodec) without manifestBlk =? await self.storeManifest(manifest), err: - trace "Unable to store manifest" + error "Unable to store manifest" return failure(err) info "Stored data", manifestCid = manifestBlk.cid, @@ -361,7 +359,6 @@ proc store*( blocks = manifest.blocksCount, datasetSize = manifest.datasetSize - # Announce manifest await self.discovery.provide(manifestBlk.cid) await self.discovery.provide(treeCid) diff --git a/codex/stores/networkstore.nim b/codex/stores/networkstore.nim index 39835e23..ce0761d6 100644 --- a/codex/stores/networkstore.nim +++ b/codex/stores/networkstore.nim @@ -69,9 +69,6 @@ method putBlock*( ttl = Duration.none): Future[?!void] {.async.} = ## Store block locally and notify the network ## - - trace "Putting block into network store", cid = blk.cid - let res = await self.localStore.putBlock(blk, ttl) if res.isErr: return res diff --git a/codex/stores/repostore.nim b/codex/stores/repostore.nim index dc63f4cd..5cabc726 100644 --- a/codex/stores/repostore.nim +++ b/codex/stores/repostore.nim @@ -119,7 +119,7 @@ method putCidAndProof*( without key =? createBlockCidAndProofMetadataKey(treeCid, index), err: return failure(err) - trace "Storing block cid and proof with key", key + trace "Storing block cid and proof", blockCid, key let value = (blockCid, proof).encode() @@ -313,7 +313,7 @@ method putBlock*( return success() without key =? makePrefixKey(self.postFixLen, blk.cid), err: - trace "Error getting key from provider", err = err.msg + warn "Error getting key from provider", err = err.msg return failure(err) if await key in self.repoDs: @@ -325,8 +325,6 @@ method putBlock*( return failure( newException(QuotaUsedError, "Cannot store block, quota used!")) - trace "Storing block with key", key - var batch: seq[BatchEntry] @@ -334,22 +332,21 @@ method putBlock*( used = self.quotaUsedBytes + blk.data.len.uint if err =? (await self.repoDs.put(key, blk.data)).errorOption: - trace "Error storing block", err = err.msg + error "Error storing block", err = err.msg return failure(err) - trace "Updating quota", used batch.add((QuotaUsedKey, @(used.uint64.toBytesBE))) without blockExpEntry =? self.getBlockExpirationEntry(blk.cid, ttl), err: - trace "Unable to create block expiration metadata key", err = err.msg + warn "Unable to create block expiration metadata key", err = err.msg return failure(err) batch.add(blockExpEntry) if err =? (await self.metaDs.put(batch)).errorOption: - trace "Error updating quota bytes", err = err.msg + error "Error updating quota bytes", err = err.msg if err =? (await self.repoDs.delete(key)).errorOption: - trace "Error deleting block after failed quota update", err = err.msg + error "Error deleting block after failed quota update", err = err.msg return failure(err) return failure(err) @@ -357,7 +354,7 @@ method putBlock*( self.quotaUsedBytes = used inc self.totalBlocks if isErr (await self.persistTotalBlocksCount()): - trace "Unable to update block total metadata" + warn "Unable to update block total metadata" return failure("Unable to update block total metadata") self.updateMetrics() From c58d4d7dbe9dd4663391acbfbdc2985735067bee Mon Sep 17 00:00:00 2001 From: Ben Bierens <39762930+benbierens@users.noreply.github.com> Date: Tue, 30 Apr 2024 17:27:17 +0200 Subject: [PATCH 2/6] Bug: Large manifests encoding (#800) * Adds test for encoding/decoding manifests with large datasets * switches datasetSize to uint64 --- codex/manifest/coders.nim | 8 ++++---- tests/codex/testmanifest.nim | 10 ++++++++++ 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/codex/manifest/coders.nim b/codex/manifest/coders.nim index 2e824e4d..e36039c7 100644 --- a/codex/manifest/coders.nim +++ b/codex/manifest/coders.nim @@ -66,7 +66,7 @@ proc encode*(manifest: Manifest): ?!seq[byte] = var header = initProtoBuffer() header.write(1, manifest.treeCid.data.buffer) header.write(2, manifest.blockSize.uint32) - header.write(3, manifest.datasetSize.uint32) + header.write(3, manifest.datasetSize.uint64) header.write(4, manifest.codec.uint32) header.write(5, manifest.hcodec.uint32) header.write(6, manifest.version.uint32) @@ -75,7 +75,7 @@ proc encode*(manifest: Manifest): ?!seq[byte] = erasureInfo.write(1, manifest.ecK.uint32) erasureInfo.write(2, manifest.ecM.uint32) erasureInfo.write(3, manifest.originalTreeCid.data.buffer) - erasureInfo.write(4, manifest.originalDatasetSize.uint32) + erasureInfo.write(4, manifest.originalDatasetSize.uint64) erasureInfo.write(5, manifest.protectedStrategy.uint32) if manifest.verifiable: @@ -106,12 +106,12 @@ proc decode*(_: type Manifest, data: openArray[byte]): ?!Manifest = pbVerificationInfo: ProtoBuffer treeCidBuf: seq[byte] originalTreeCid: seq[byte] - datasetSize: uint32 + datasetSize: uint64 codec: uint32 hcodec: uint32 version: uint32 blockSize: uint32 - originalDatasetSize: uint32 + originalDatasetSize: uint64 ecK, ecM: uint32 protectedStrategy: uint32 verifyRoot: seq[byte] diff --git a/tests/codex/testmanifest.nim b/tests/codex/testmanifest.nim index 10fb7772..188cc957 100644 --- a/tests/codex/testmanifest.nim +++ b/tests/codex/testmanifest.nim @@ -57,6 +57,16 @@ checksuite "Manifest": check: encodeDecode(manifest) == manifest + test "Should encode/decode large manifest": + let large = Manifest.new( + treeCid = Cid.example, + blockSize = (64 * 1024).NBytes, + datasetSize = (5 * 1024).MiBs + ) + + check: + encodeDecode(large) == large + test "Should encode/decode to/from protected manifest": check: encodeDecode(protectedManifest) == protectedManifest From 4312e5ca361c91b99508b4d4968e641a57e33a02 Mon Sep 17 00:00:00 2001 From: Ben Bierens <39762930+benbierens@users.noreply.github.com> Date: Mon, 6 May 2024 10:12:49 +0200 Subject: [PATCH 3/6] Use a copy of slots hashset for iteration. (#801) --- codex/validation.nim | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/codex/validation.nim b/codex/validation.nim index b549997e..7b1cdd1a 100644 --- a/codex/validation.nim +++ b/codex/validation.nim @@ -34,6 +34,12 @@ proc new*( proc slots*(validation: Validation): seq[SlotId] = validation.slots.toSeq +proc iterateSlots(validation: Validation, action: proc(s: SlotId): Future[void] {.async.}) {.async.} = + # Copy of hashSet, for iteration. + let slots = validation.slots + for slotId in slots: + await action(slotId) + proc getCurrentPeriod(validation: Validation): UInt256 = return validation.periodicity.periodOf(validation.clock.now().u256) @@ -55,11 +61,12 @@ proc subscribeSlotFilled(validation: Validation) {.async.} = proc removeSlotsThatHaveEnded(validation: Validation) {.async.} = var ended: HashSet[SlotId] - for slotId in validation.slots: + proc onSlot(slotId: SlotId) {.async.} = let state = await validation.market.slotState(slotId) if state != SlotState.Filled: trace "Removing slot", slotId ended.incl(slotId) + await validation.iterateSlots(onSlot) validation.slots.excl(ended) proc markProofAsMissing(validation: Validation, @@ -81,9 +88,10 @@ proc markProofAsMissing(validation: Validation, error "Marking proof as missing failed", msg = e.msg proc markProofsAsMissing(validation: Validation) {.async.} = - for slotId in validation.slots: + proc onSlot(slotId: SlotId) {.async.} = let previousPeriod = validation.getCurrentPeriod() - 1 await validation.markProofAsMissing(slotId, previousPeriod) + await validation.iterateSlots(onSlot) proc run(validation: Validation) {.async.} = trace "Validation started" From 1a0d2d424e45dc2c6277fd9f19c841cf946bde6f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adam=20Uhl=C3=AD=C5=99?= Date: Mon, 6 May 2024 17:35:46 +0200 Subject: [PATCH 4/6] feat: expiry specified with number of seconds (#793) --- codex/contracts/market.nim | 5 +++ codex/contracts/marketplace.nim | 1 + codex/market.nim | 4 +++ codex/purchasing.nim | 5 --- codex/purchasing/states/submitted.nim | 2 +- codex/rest/api.nim | 15 +++------ codex/sales/salesagent.nim | 5 ++- openapi.yaml | 3 +- tests/codex/helpers/mockmarket.nim | 5 +++ tests/codex/sales/testsales.nim | 10 ++++-- tests/codex/sales/testsalesagent.nim | 21 +++---------- tests/codex/testpurchasing.nim | 43 ++++++++++++-------------- tests/contracts/testContracts.nim | 3 +- tests/contracts/testMarket.nim | 26 ++++++++-------- tests/examples.nim | 2 +- tests/integration/codexclient.nim | 6 ++-- tests/integration/marketplacesuite.nim | 4 +-- tests/integration/testIntegration.nim | 39 ++++++++--------------- vendor/codex-contracts-eth | 2 +- 19 files changed, 92 insertions(+), 109 deletions(-) diff --git a/codex/contracts/market.nim b/codex/contracts/market.nim index dba46163..4e228670 100644 --- a/codex/contracts/market.nim +++ b/codex/contracts/market.nim @@ -126,6 +126,11 @@ method getRequestEnd*(market: OnChainMarket, convertEthersError: return await market.contract.requestEnd(id) +method requestExpiresAt*(market: OnChainMarket, + id: RequestId): Future[SecondsSince1970] {.async.} = + convertEthersError: + return await market.contract.requestExpiry(id) + method getHost(market: OnChainMarket, requestId: RequestId, slotIndex: UInt256): Future[?Address] {.async.} = diff --git a/codex/contracts/marketplace.nim b/codex/contracts/marketplace.nim index 805681fe..2326ed3f 100644 --- a/codex/contracts/marketplace.nim +++ b/codex/contracts/marketplace.nim @@ -55,6 +55,7 @@ proc mySlots*(marketplace: Marketplace): seq[SlotId] {.contract, view.} proc requestState*(marketplace: Marketplace, requestId: RequestId): RequestState {.contract, view.} proc slotState*(marketplace: Marketplace, slotId: SlotId): SlotState {.contract, view.} proc requestEnd*(marketplace: Marketplace, requestId: RequestId): SecondsSince1970 {.contract, view.} +proc requestExpiry*(marketplace: Marketplace, requestId: RequestId): SecondsSince1970 {.contract, view.} proc proofTimeout*(marketplace: Marketplace): UInt256 {.contract, view.} diff --git a/codex/market.nim b/codex/market.nim index 76befc75..b521c395 100644 --- a/codex/market.nim +++ b/codex/market.nim @@ -84,6 +84,10 @@ method getRequestEnd*(market: Market, id: RequestId): Future[SecondsSince1970] {.base, async.} = raiseAssert("not implemented") +method requestExpiresAt*(market: Market, + id: RequestId): Future[SecondsSince1970] {.base, async.} = + raiseAssert("not implemented") + method getHost*(market: Market, requestId: RequestId, slotIndex: UInt256): Future[?Address] {.base, async.} = diff --git a/codex/purchasing.nim b/codex/purchasing.nim index a53a5150..ca92ece9 100644 --- a/codex/purchasing.nim +++ b/codex/purchasing.nim @@ -18,18 +18,15 @@ type clock: Clock purchases: Table[PurchaseId, Purchase] proofProbability*: UInt256 - requestExpiryInterval*: UInt256 PurchaseTimeout* = Timeout const DefaultProofProbability = 100.u256 -const DefaultRequestExpiryInterval = (10 * 60).u256 proc new*(_: type Purchasing, market: Market, clock: Clock): Purchasing = Purchasing( market: market, clock: clock, proofProbability: DefaultProofProbability, - requestExpiryInterval: DefaultRequestExpiryInterval, ) proc load*(purchasing: Purchasing) {.async.} = @@ -52,8 +49,6 @@ proc populate*(purchasing: Purchasing, result = request if result.ask.proofProbability == 0.u256: result.ask.proofProbability = purchasing.proofProbability - if result.expiry == 0.u256: - result.expiry = (purchasing.clock.now().u256 + purchasing.requestExpiryInterval) if result.nonce == Nonce.default: var id = result.nonce.toArray doAssert randomBytes(id) == 32 diff --git a/codex/purchasing/states/submitted.nim b/codex/purchasing/states/submitted.nim index 48666c46..5532c850 100644 --- a/codex/purchasing/states/submitted.nim +++ b/codex/purchasing/states/submitted.nim @@ -34,7 +34,7 @@ method run*(state: PurchaseSubmitted, machine: Machine): Future[?State] {.async. await subscription.unsubscribe() proc withTimeout(future: Future[void]) {.async.} = - let expiry = request.expiry.truncate(int64) + 1 + let expiry = (await market.requestExpiresAt(request.id)) + 1 trace "waiting for request fulfillment or expiry", expiry await future.withTimeout(clock, expiry) diff --git a/codex/rest/api.nim b/codex/rest/api.nim index 1174886a..5478516d 100644 --- a/codex/rest/api.nim +++ b/codex/rest/api.nim @@ -399,7 +399,7 @@ proc initPurchasingApi(node: CodexNodeRef, router: var RestRouter) = ## duration - the duration of the request in seconds ## proofProbability - how often storage proofs are required ## reward - the maximum amount of tokens paid per second per slot to hosts the client is willing to pay - ## expiry - timestamp, in seconds, when the request expires if the Request does not find requested amount of nodes to host the data + ## expiry - specifies threshold in seconds from now when the request expires if the Request does not find requested amount of nodes to host the data ## nodes - number of nodes the content should be stored on ## tolerance - allowed number of nodes that can be lost before content is lost ## colateral - requested collateral from hosts when they fill slot @@ -425,15 +425,8 @@ proc initPurchasingApi(node: CodexNodeRef, router: var RestRouter) = without expiry =? params.expiry: return RestApiResponse.error(Http400, "Expiry required") - if node.clock.isNil: - return RestApiResponse.error(Http500) - - if expiry <= node.clock.now.u256: - return RestApiResponse.error(Http400, "Expiry needs to be in future. Now: " & $node.clock.now) - - let expiryLimit = node.clock.now.u256 + params.duration - if expiry > expiryLimit: - return RestApiResponse.error(Http400, "Expiry has to be before the request's end (now + duration). Limit: " & $expiryLimit) + if expiry <= 0 or expiry >= params.duration: + return RestApiResponse.error(Http400, "Expiry needs value bigger then zero and smaller then the request's duration") without purchaseId =? await node.requestStorage( cid, @@ -494,7 +487,7 @@ proc initPurchasingApi(node: CodexNodeRef, router: var RestRouter) = proc initNodeApi(node: CodexNodeRef, conf: CodexConf, router: var RestRouter) = ## various node management api's - ## + ## router.api( MethodGet, "/api/codex/v1/spr") do () -> RestApiResponse: diff --git a/codex/sales/salesagent.nim b/codex/sales/salesagent.nim index 337c643c..5bb0e9fb 100644 --- a/codex/sales/salesagent.nim +++ b/codex/sales/salesagent.nim @@ -72,8 +72,11 @@ proc subscribeCancellation(agent: SalesAgent) {.async.} = without request =? data.request: return + let market = agent.context.market + let expiry = await market.requestExpiresAt(data.requestId) + while true: - let deadline = max(clock.now, request.expiry.truncate(int64)) + 1 + let deadline = max(clock.now, expiry) + 1 trace "Waiting for request to be cancelled", now=clock.now, expiry=deadline await clock.waitUntil(deadline) diff --git a/openapi.yaml b/openapi.yaml index cf9b6b6c..49c75e64 100644 --- a/openapi.yaml +++ b/openapi.yaml @@ -213,8 +213,7 @@ components: description: Number as decimal string that represents how much collateral is asked from hosts that wants to fill a slots expiry: type: string - description: Number as decimal string that represents expiry time of the request (in unix timestamp) - + description: Number as decimal string that represents expiry threshold in seconds from when the Request is submitted. When the threshold is reached and the Request does not find requested amount of nodes to host the data, the Request is voided. The number of seconds can not be higher then the Request's duration itself. StorageAsk: type: object required: diff --git a/tests/codex/helpers/mockmarket.nim b/tests/codex/helpers/mockmarket.nim index 5b8dd894..7c36bee7 100644 --- a/tests/codex/helpers/mockmarket.nim +++ b/tests/codex/helpers/mockmarket.nim @@ -20,6 +20,7 @@ type activeSlots*: Table[Address, seq[SlotId]] requested*: seq[StorageRequest] requestEnds*: Table[RequestId, SecondsSince1970] + requestExpiry*: Table[RequestId, SecondsSince1970] requestState*: Table[RequestId, RequestState] slotState*: Table[SlotId, SlotState] fulfilled*: seq[Fulfillment] @@ -165,6 +166,10 @@ method getRequestEnd*(market: MockMarket, id: RequestId): Future[SecondsSince1970] {.async.} = return market.requestEnds[id] +method requestExpiresAt*(market: MockMarket, + id: RequestId): Future[SecondsSince1970] {.async.} = + return market.requestExpiry[id] + method getHost*(market: MockMarket, requestId: RequestId, slotIndex: UInt256): Future[?Address] {.async.} = diff --git a/tests/codex/sales/testsales.nim b/tests/codex/sales/testsales.nim index c25a0572..222ba0ff 100644 --- a/tests/codex/sales/testsales.nim +++ b/tests/codex/sales/testsales.nim @@ -473,6 +473,9 @@ asyncchecksuite "Sales": check eventually (await reservations.all(Availability)).get == @[availability] test "makes storage available again when request expires": + let expiry = getTime().toUnix() + 10 + market.requestExpiry[request.id] = expiry + let origSize = availability.freeSize sales.onStore = proc(request: StorageRequest, slot: UInt256, @@ -486,11 +489,14 @@ asyncchecksuite "Sales": # would otherwise not set the timeout early enough as it uses `clock.now` in the deadline calculation. await sleepAsync(chronos.milliseconds(100)) market.requestState[request.id]=RequestState.Cancelled - clock.set(request.expiry.truncate(int64)+1) + clock.set(expiry + 1) check eventually (await reservations.all(Availability)).get == @[availability] check getAvailability().freeSize == origSize test "verifies that request is indeed expired from onchain before firing onCancelled": + let expiry = getTime().toUnix() + 10 + market.requestExpiry[request.id] = expiry + let origSize = availability.freeSize sales.onStore = proc(request: StorageRequest, slot: UInt256, @@ -504,7 +510,7 @@ asyncchecksuite "Sales": # If we would not await, then the `clock.set` would run "too fast" as the `subscribeCancellation()` # would otherwise not set the timeout early enough as it uses `clock.now` in the deadline calculation. await sleepAsync(chronos.milliseconds(100)) - clock.set(request.expiry.truncate(int64)+1) + clock.set(expiry + 1) check getAvailability().freeSize == 0 market.requestState[request.id]=RequestState.Cancelled # Now "on-chain" is also expired diff --git a/tests/codex/sales/testsalesagent.nim b/tests/codex/sales/testsalesagent.nim index a71ac652..215f8bb4 100644 --- a/tests/codex/sales/testsalesagent.nim +++ b/tests/codex/sales/testsalesagent.nim @@ -41,19 +41,7 @@ method run*(state: MockErrorState, machine: Machine): Future[?State] {.async.} = raise newException(ValueError, "failure") asyncchecksuite "Sales agent": - var request = StorageRequest( - ask: StorageAsk( - slots: 4, - slotSize: 100.u256, - duration: 60.u256, - reward: 10.u256, - ), - content: StorageContent( - cid: "some cid" - ), - expiry: (getTime() + initDuration(hours=1)).toUnix.u256 - ) - + let request = StorageRequest.example var agent: SalesAgent var context: SalesContext var slotIndex: UInt256 @@ -62,6 +50,7 @@ asyncchecksuite "Sales agent": setup: market = MockMarket.new() + market.requestExpiry[request.id] = getTime().toUnix() + request.expiry.truncate(int64) clock = MockClock.new() context = SalesContext(market: market, clock: clock) slotIndex = 0.u256 @@ -109,7 +98,7 @@ asyncchecksuite "Sales agent": agent.start(MockState.new()) await agent.subscribe() market.requestState[request.id] = RequestState.Cancelled - clock.set(request.expiry.truncate(int64) + 1) + clock.set(market.requestExpiry[request.id] + 1) check eventually onCancelCalled for requestState in {RequestState.New, Started, Finished, Failed}: @@ -117,7 +106,7 @@ asyncchecksuite "Sales agent": agent.start(MockState.new()) await agent.subscribe() market.requestState[request.id] = requestState - clock.set(request.expiry.truncate(int64) + 1) + clock.set(market.requestExpiry[request.id] + 1) await sleepAsync(100.millis) check not onCancelCalled @@ -126,7 +115,7 @@ asyncchecksuite "Sales agent": agent.start(MockState.new()) await agent.subscribe() market.requestState[request.id] = requestState - clock.set(request.expiry.truncate(int64) + 1) + clock.set(market.requestExpiry[request.id] + 1) check eventually agent.data.cancelled.finished test "cancelled future is finished (cancelled) when onFulfilled called": diff --git a/tests/codex/testpurchasing.nim b/tests/codex/testpurchasing.nim index 3294e4e5..25504732 100644 --- a/tests/codex/testpurchasing.nim +++ b/tests/codex/testpurchasing.nim @@ -19,7 +19,7 @@ asyncchecksuite "Purchasing": var purchasing: Purchasing var market: MockMarket var clock: MockClock - var request: StorageRequest + var request, populatedRequest: StorageRequest setup: market = MockMarket.new() @@ -34,6 +34,12 @@ asyncchecksuite "Purchasing": ) ) + # We need request which has stable ID during the whole Purchasing pipeline + # for some tests (related to expiry). Because of Purchasing.populate() we need + # to do the steps bellow. + populatedRequest = StorageRequest.example + populatedRequest.client = await market.getSigner() + test "submits a storage request when asked": discard await purchasing.purchase(request) check eventually market.requested.len > 0 @@ -63,23 +69,6 @@ asyncchecksuite "Purchasing": check eventually market.requested.len > 0 check market.requested[0].ask.proofProbability == 42.u256 - test "has a default value for request expiration interval": - check purchasing.requestExpiryInterval != 0.u256 - - test "can change default value for request expiration interval": - purchasing.requestExpiryInterval = 42.u256 - let start = getTime().toUnix() - discard await purchasing.purchase(request) - check eventually market.requested.len > 0 - check market.requested[0].expiry == (start + 42).u256 - - test "can override expiry time per request": - let expiry = (getTime().toUnix() + 42).u256 - request.expiry = expiry - discard await purchasing.purchase(request) - check eventually market.requested.len > 0 - check market.requested[0].expiry == expiry - test "includes a random nonce in every storage request": discard await purchasing.purchase(request) discard await purchasing.purchase(request) @@ -92,29 +81,37 @@ asyncchecksuite "Purchasing": check market.requested[0].client == await market.getSigner() test "succeeds when request is finished": - let purchase = await purchasing.purchase(request) + market.requestExpiry[populatedRequest.id] = getTime().toUnix() + 10 + let purchase = await purchasing.purchase(populatedRequest) + check eventually market.requested.len > 0 let request = market.requested[0] let requestEnd = getTime().toUnix() + 42 market.requestEnds[request.id] = requestEnd + market.emitRequestFulfilled(request.id) clock.set(requestEnd + 1) await purchase.wait() check purchase.error.isNone test "fails when request times out": - let purchase = await purchasing.purchase(request) + let expiry = getTime().toUnix() + 10 + market.requestExpiry[populatedRequest.id] = expiry + let purchase = await purchasing.purchase(populatedRequest) check eventually market.requested.len > 0 let request = market.requested[0] - clock.set(request.expiry.truncate(int64) + 1) + + clock.set(expiry + 1) expect PurchaseTimeout: await purchase.wait() test "checks that funds were withdrawn when purchase times out": - let purchase = await purchasing.purchase(request) + let expiry = getTime().toUnix() + 10 + market.requestExpiry[populatedRequest.id] = expiry + let purchase = await purchasing.purchase(populatedRequest) check eventually market.requested.len > 0 let request = market.requested[0] - clock.set(request.expiry.truncate(int64) + 1) + clock.set(expiry + 1) expect PurchaseTimeout: await purchase.wait() check market.withdrawn == @[request.id] diff --git a/tests/contracts/testContracts.nim b/tests/contracts/testContracts.nim index 556efbd1..019de7aa 100644 --- a/tests/contracts/testContracts.nim +++ b/tests/contracts/testContracts.nim @@ -85,7 +85,8 @@ ethersuite "Marketplace contracts": check endBalance == (startBalance + request.ask.duration * request.ask.reward + request.ask.collateral) test "cannot mark proofs missing for cancelled request": - await ethProvider.advanceTimeTo(request.expiry + 1) + let expiry = await marketplace.requestExpiry(request.id) + await ethProvider.advanceTimeTo((expiry + 1).u256) switchAccount(client) let missingPeriod = periodicity.periodOf(await ethProvider.currentTime()) await ethProvider.advanceTime(periodicity.seconds) diff --git a/tests/contracts/testMarket.nim b/tests/contracts/testMarket.nim index e0a287a3..eb1f1134 100644 --- a/tests/contracts/testMarket.nim +++ b/tests/contracts/testMarket.nim @@ -32,6 +32,10 @@ ethersuite "On-Chain Market": let currentPeriod = periodicity.periodOf(await ethProvider.currentTime()) await ethProvider.advanceTimeTo(periodicity.periodEnd(currentPeriod) + 1) + proc advanceToCancelledRequest(request: StorageRequest) {.async.} = + let expiry = (await market.requestExpiresAt(request.id)) + 1 + await ethProvider.advanceTimeTo(expiry.u256) + proc waitUntilProofRequired(slotId: SlotId) {.async.} = await advanceToNextPeriod() while not ( @@ -70,22 +74,19 @@ ethersuite "On-Chain Market": test "supports withdrawing of funds": await market.requestStorage(request) - await ethProvider.advanceTimeTo(request.expiry + 1) + await advanceToCancelledRequest(request) await market.withdrawFunds(request.id) test "supports request subscriptions": var receivedIds: seq[RequestId] var receivedAsks: seq[StorageAsk] - var receivedExpirys: seq[UInt256] proc onRequest(id: RequestId, ask: StorageAsk, expiry: UInt256) = receivedIds.add(id) receivedAsks.add(ask) - receivedExpirys.add(expiry) let subscription = await market.subscribeRequests(onRequest) await market.requestStorage(request) check receivedIds == @[request.id] check receivedAsks == @[request.ask] - check receivedExpirys == @[request.expiry] await subscription.unsubscribe() test "supports filling of slots": @@ -216,7 +217,7 @@ ethersuite "On-Chain Market": receivedIds.add(id) let subscription = await market.subscribeRequestCancelled(request.id, onRequestCancelled) - await ethProvider.advanceTimeTo(request.expiry + 1) + await advanceToCancelledRequest(request) await market.withdrawFunds(request.id) check receivedIds == @[request.id] await subscription.unsubscribe() @@ -255,7 +256,7 @@ ethersuite "On-Chain Market": receivedIds.add(requestId) let subscription = await market.subscribeRequestCancelled(request.id, onRequestCancelled) - await ethProvider.advanceTimeTo(request.expiry + 1) # shares expiry with otherRequest + advanceToCancelledRequest(otherRequest) # shares expiry with otherRequest await market.withdrawFunds(otherRequest.id) check receivedIds.len == 0 await market.withdrawFunds(request.id) @@ -338,13 +339,12 @@ ethersuite "On-Chain Market": # 6 blocks, we only need to check 5 "blocks ago". We don't need to check the # `approve` for the first `requestStorage` call, so that's 1 less again = 4 # "blocks ago". - check eventually ( - (await market.queryPastStorageRequests(5)) == - @[ - PastStorageRequest(requestId: request.id, ask: request.ask, expiry: request.expiry), - PastStorageRequest(requestId: request1.id, ask: request1.ask, expiry: request1.expiry), - PastStorageRequest(requestId: request2.id, ask: request2.ask, expiry: request2.expiry) - ]) + + proc getsPastRequest(): Future[bool] {.async.} = + let reqs = await market.queryPastStorageRequests(5) + reqs.mapIt(it.requestId) == @[request.id, request1.id, request2.id] + + check eventually await getsPastRequest() test "past event query can specify negative `blocksAgo` parameter": await market.requestStorage(request) diff --git a/tests/examples.nim b/tests/examples.nim index bb506438..17391e1a 100644 --- a/tests/examples.nim +++ b/tests/examples.nim @@ -59,7 +59,7 @@ proc example*(_: type StorageRequest): StorageRequest = cid: "zb2rhheVmk3bLks5MgzTqyznLu1zqGH5jrfTA1eAZXrjx7Vob", merkleRoot: array[32, byte].example ), - expiry: (getTime() + 1.hours).toUnix.u256, + expiry:(60 * 60).u256, # 1 hour , nonce: Nonce.example ) diff --git a/tests/integration/codexclient.nim b/tests/integration/codexclient.nim index b4230195..4da96a5d 100644 --- a/tests/integration/codexclient.nim +++ b/tests/integration/codexclient.nim @@ -70,7 +70,7 @@ proc requestStorageRaw*( reward: UInt256, proofProbability: UInt256, collateral: UInt256, - expiry: UInt256 = 0.u256, + expiry: uint = 0, nodes: uint = 1, tolerance: uint = 0 ): Response = @@ -88,7 +88,7 @@ proc requestStorageRaw*( } if expiry != 0: - json["expiry"] = %expiry + json["expiry"] = %($expiry) return client.http.post(url, $json) @@ -98,7 +98,7 @@ proc requestStorage*( duration: UInt256, reward: UInt256, proofProbability: UInt256, - expiry: UInt256, + expiry: uint, collateral: UInt256, nodes: uint = 1, tolerance: uint = 0 diff --git a/tests/integration/marketplacesuite.nim b/tests/integration/marketplacesuite.nim index 82a82619..2b81bdd8 100644 --- a/tests/integration/marketplacesuite.nim +++ b/tests/integration/marketplacesuite.nim @@ -68,11 +68,9 @@ template marketplacesuite*(name: string, body: untyped) = expiry: uint64 = 4.periods, nodes = providers().len, tolerance = 0): Future[PurchaseId] {.async.} = - let expiry = (await ethProvider.currentTime()) + expiry.u256 - let id = client.requestStorage( cid, - expiry=expiry, + expiry=expiry.uint, duration=duration.u256, proofProbability=proofProbability.u256, collateral=collateral, diff --git a/tests/integration/testIntegration.nim b/tests/integration/testIntegration.nim index a18259aa..38a16b3a 100644 --- a/tests/integration/testIntegration.nim +++ b/tests/integration/testIntegration.nim @@ -112,10 +112,9 @@ twonodessuite "Integration tests", debug1 = false, debug2 = false: check availability in client1.getAvailabilities().get test "node handles storage request": - let expiry = (await ethProvider.currentTime()) + 10 let cid = client1.upload("some file contents").get - let id1 = client1.requestStorage(cid, duration=100.u256, reward=2.u256, proofProbability=3.u256, expiry=expiry, collateral=200.u256).get - let id2 = client1.requestStorage(cid, duration=400.u256, reward=5.u256, proofProbability=6.u256, expiry=expiry, collateral=201.u256).get + let id1 = client1.requestStorage(cid, duration=100.u256, reward=2.u256, proofProbability=3.u256, expiry=10, collateral=200.u256).get + let id2 = client1.requestStorage(cid, duration=400.u256, reward=5.u256, proofProbability=6.u256, expiry=10, collateral=201.u256).get check id1 != id2 test "node retrieves purchase status": @@ -124,13 +123,12 @@ twonodessuite "Integration tests", debug1 = false, debug2 = false: let chunker = RandomChunker.new(rng, size = DefaultBlockSize * 2, chunkSize = DefaultBlockSize * 2) let data = await chunker.getBytes() let cid = client1.upload(byteutils.toHex(data)).get - let expiry = (await ethProvider.currentTime()) + 30 let id = client1.requestStorage( cid, duration=100.u256, reward=2.u256, proofProbability=3.u256, - expiry=expiry, + expiry=30, collateral=200.u256, nodes=2, tolerance=1).get @@ -139,16 +137,15 @@ twonodessuite "Integration tests", debug1 = false, debug2 = false: check request.ask.duration == 100.u256 check request.ask.reward == 2.u256 check request.ask.proofProbability == 3.u256 - check request.expiry == expiry + check request.expiry == 30 check request.ask.collateral == 200.u256 check request.ask.slots == 2'u64 check request.ask.maxSlotLoss == 1'u64 # TODO: We currently do not support encoding single chunks # test "node retrieves purchase status with 1 chunk": - # let expiry = (await ethProvider.currentTime()) + 30 # let cid = client1.upload("some file contents").get - # let id = client1.requestStorage(cid, duration=1.u256, reward=2.u256, proofProbability=3.u256, expiry=expiry, collateral=200.u256, nodes=2, tolerance=1).get + # let id = client1.requestStorage(cid, duration=1.u256, reward=2.u256, proofProbability=3.u256, expiry=30, collateral=200.u256, nodes=2, tolerance=1).get # let request = client1.getPurchase(id).get.request.get # check request.ask.duration == 1.u256 # check request.ask.reward == 2.u256 @@ -159,13 +156,12 @@ twonodessuite "Integration tests", debug1 = false, debug2 = false: # check request.ask.maxSlotLoss == 1'u64 test "node remembers purchase status after restart": - let expiry = (await ethProvider.currentTime()) + 30 let cid = client1.upload("some file contents").get let id = client1.requestStorage(cid, duration=100.u256, reward=2.u256, proofProbability=3.u256, - expiry=expiry, + expiry=30, collateral=200.u256).get check eventually client1.purchaseStateIs(id, "submitted") @@ -177,7 +173,7 @@ twonodessuite "Integration tests", debug1 = false, debug2 = false: check request.ask.duration == 100.u256 check request.ask.reward == 2.u256 check request.ask.proofProbability == 3.u256 - check request.expiry == expiry + check request.expiry == 30 check request.ask.collateral == 200.u256 check request.ask.slots == 1'u64 check request.ask.maxSlotLoss == 0'u64 @@ -189,14 +185,13 @@ twonodessuite "Integration tests", debug1 = false, debug2 = false: let availability = client2.postAvailability(totalSize=size, duration=20*60.u256, minPrice=300.u256, maxCollateral=300.u256).get # client 1 requests storage - let expiry = (await ethProvider.currentTime()) + 5*60 let cid = client1.upload(data).get let id = client1.requestStorage( cid, duration=10*60.u256, reward=400.u256, proofProbability=3.u256, - expiry=expiry, + expiry=5*60, collateral=200.u256, nodes = 5, tolerance = 2).get @@ -228,14 +223,13 @@ twonodessuite "Integration tests", debug1 = false, debug2 = false: discard client2.postAvailability(totalSize=size, duration=20*60.u256, minPrice=300.u256, maxCollateral=300.u256).get # client 1 requests storage - let expiry = (await ethProvider.currentTime()) + 5*60 let cid = client1.upload(data).get let id = client1.requestStorage( cid, duration=duration, reward=reward, proofProbability=3.u256, - expiry=expiry, + expiry=5*60, collateral=200.u256, nodes = nodes, tolerance = 2).get @@ -253,12 +247,11 @@ twonodessuite "Integration tests", debug1 = false, debug2 = false: test "request storage fails if nodes and tolerance aren't correct": let cid = client1.upload("some file contents").get - let expiry = (await ethProvider.currentTime()) + 30 let responseBefore = client1.requestStorageRaw(cid, duration=100.u256, reward=2.u256, proofProbability=3.u256, - expiry=expiry, + expiry=30, collateral=200.u256, nodes=1, tolerance=1) @@ -267,20 +260,15 @@ twonodessuite "Integration tests", debug1 = false, debug2 = false: check responseBefore.body == "Tolerance cannot be greater or equal than nodes (nodes - tolerance)" test "node requires expiry and its value to be in future": - let currentTime = await ethProvider.currentTime() let cid = client1.upload("some file contents").get let responseMissing = client1.requestStorageRaw(cid, duration=1.u256, reward=2.u256, proofProbability=3.u256, collateral=200.u256) check responseMissing.status == "400 Bad Request" check responseMissing.body == "Expiry required" - let responsePast = client1.requestStorageRaw(cid, duration=1.u256, reward=2.u256, proofProbability=3.u256, collateral=200.u256, expiry=currentTime-10) - check responsePast.status == "400 Bad Request" - check "Expiry needs to be in future" in responsePast.body - - let responseBefore = client1.requestStorageRaw(cid, duration=1.u256, reward=2.u256, proofProbability=3.u256, collateral=200.u256, expiry=currentTime+10) + let responseBefore = client1.requestStorageRaw(cid, duration=10.u256, reward=2.u256, proofProbability=3.u256, collateral=200.u256, expiry=10) check responseBefore.status == "400 Bad Request" - check "Expiry has to be before the request's end (now + duration)" in responseBefore.body + check "Expiry needs value bigger then zero and smaller then the request's duration" in responseBefore.body test "updating non-existing availability": let nonExistingResponse = client1.patchAvailabilityRaw(AvailabilityId.example, duration=100.u256.some, minPrice=200.u256.some, maxCollateral=200.u256.some) @@ -317,14 +305,13 @@ twonodessuite "Integration tests", debug1 = false, debug2 = false: let availability = client1.postAvailability(totalSize=originalSize, duration=20*60.u256, minPrice=300.u256, maxCollateral=300.u256).get # Lets create storage request that will utilize some of the availability's space - let expiry = (await ethProvider.currentTime()) + 5*60 let cid = client2.upload(data).get let id = client2.requestStorage( cid, duration=10*60.u256, reward=400.u256, proofProbability=3.u256, - expiry=expiry, + expiry=5*60, collateral=200.u256, nodes = 5, tolerance = 2).get diff --git a/vendor/codex-contracts-eth b/vendor/codex-contracts-eth index a58427e4..57e8cd50 160000 --- a/vendor/codex-contracts-eth +++ b/vendor/codex-contracts-eth @@ -1 +1 @@ -Subproject commit a58427e496088b904aed070e92f1c479c45fd852 +Subproject commit 57e8cd5013325f05e16833a5320b575d32a403f3 From 9379c7c66260ed98cf5f2493b97e18a80061f7c4 Mon Sep 17 00:00:00 2001 From: Ben Bierens <39762930+benbierens@users.noreply.github.com> Date: Thu, 9 May 2024 10:03:35 +0200 Subject: [PATCH 5/6] Advertise manifests and trees (#797) * wip update advertise loop to announce only manifests and trees * fixes e2e tests * fixes tests for discovery engine --------- Co-authored-by: Dmitriy Ryajov --- codex/blockexchange/engine/discovery.nim | 26 +++- .../blockexchange/discovery/testdiscovery.nim | 126 +++++++++--------- .../discovery/testdiscoveryengine.nim | 45 +++---- 3 files changed, 102 insertions(+), 95 deletions(-) diff --git a/codex/blockexchange/engine/discovery.nim b/codex/blockexchange/engine/discovery.nim index 5434a5e6..eb68bce8 100644 --- a/codex/blockexchange/engine/discovery.nim +++ b/codex/blockexchange/engine/discovery.nim @@ -11,6 +11,7 @@ import std/sequtils import pkg/chronos import pkg/libp2p/cid +import pkg/libp2p/multicodec import pkg/metrics import pkg/questionable import pkg/questionable/results @@ -25,6 +26,7 @@ import ../../utils import ../../discovery import ../../stores/blockstore import ../../logutils +import ../../manifest logScope: topics = "codex discoveryengine" @@ -76,14 +78,32 @@ proc discoveryQueueLoop(b: DiscoveryEngine) {.async.} = await sleepAsync(b.discoveryLoopSleep) +proc advertiseBlock(b: DiscoveryEngine, cid: Cid) {.async.} = + without isM =? cid.isManifest, err: + warn "Unable to determine if cid is manifest" + return + + if isM: + without blk =? await b.localStore.getBlock(cid), err: + error "Error retrieving manifest block", cid, err = err.msg + return + + without manifest =? Manifest.decode(blk), err: + error "Unable to decode as manifest", err = err.msg + return + + # announce manifest cid and tree cid + await b.advertiseQueue.put(cid) + await b.advertiseQueue.put(manifest.treeCid) + proc advertiseQueueLoop(b: DiscoveryEngine) {.async.} = while b.discEngineRunning: if cids =? await b.localStore.listBlocks(blockType = b.advertiseType): trace "Begin iterating blocks..." for c in cids: if cid =? await c: - await b.advertiseQueue.put(cid) - await sleepAsync(50.millis) + b.advertiseBlock(cid) + await sleepAsync(100.millis) trace "Iterating blocks finished." await sleepAsync(b.advertiseLoopSleep) @@ -248,7 +268,7 @@ proc new*( discoveryLoopSleep = DefaultDiscoveryLoopSleep, advertiseLoopSleep = DefaultAdvertiseLoopSleep, minPeersPerBlock = DefaultMinPeersPerBlock, - advertiseType = BlockType.Both + advertiseType = BlockType.Manifest ): DiscoveryEngine = ## Create a discovery engine instance for advertising services ## diff --git a/tests/codex/blockexchange/discovery/testdiscovery.nim b/tests/codex/blockexchange/discovery/testdiscovery.nim index 28d315f4..9ba29e5d 100644 --- a/tests/codex/blockexchange/discovery/testdiscovery.nim +++ b/tests/codex/blockexchange/discovery/testdiscovery.nim @@ -50,7 +50,7 @@ asyncchecksuite "Block Advertising and Discovery": blockDiscovery = MockDiscovery.new() wallet = WalletRef.example network = BlockExcNetwork.new(switch) - localStore = CacheStore.new(blocks.mapIt( it )) + localStore = CacheStore.new(blocks.mapIt(it)) peerStore = PeerCtxStore.new() pendingBlocks = PendingBlocksManager.new() @@ -92,57 +92,40 @@ asyncchecksuite "Block Advertising and Discovery": blockDiscovery.findBlockProvidersHandler = proc(d: MockDiscovery, cid: Cid): Future[seq[SignedPeerRecord]] {.async.} = - await engine.resolveBlocks(blocks.filterIt( it.cid == cid )) + await engine.resolveBlocks(blocks.filterIt(it.cid == cid)) await allFuturesThrowing( allFinished(pendingBlocks)) await engine.stop() - test "Should advertise both manifests and blocks": + test "Should advertise both manifests and trees": let + cids = @[manifest.cid.tryGet, manifest.treeCid] advertised = initTable.collect: - for b in (blocks & manifestBlock): {b.cid: newFuture[void]()} + for cid in cids: {cid: newFuture[void]()} blockDiscovery .publishBlockProvideHandler = proc(d: MockDiscovery, cid: Cid) {.async.} = if cid in advertised and not advertised[cid].finished(): advertised[cid].complete() - discovery.advertiseType = BlockType.Both - await engine.start() # fire up advertise loop + await engine.start() await allFuturesThrowing( allFinished(toSeq(advertised.values))) await engine.stop() - test "Should advertise local manifests": + test "Should not advertise local blocks": let - advertised = newFuture[Cid]() + blockCids = blocks.mapIt(it.cid) blockDiscovery .publishBlockProvideHandler = proc(d: MockDiscovery, cid: Cid) {.async.} = - check manifestBlock.cid == cid - advertised.complete(cid) + check: + cid notin blockCids - discovery.advertiseType = BlockType.Manifest - await engine.start() # fire up advertise loop - check (await advertised.wait(10.millis)) == manifestBlock.cid - await engine.stop() - - test "Should advertise local blocks": - let - advertised = initTable.collect: - for b in blocks: {b.cid: newFuture[void]()} - - blockDiscovery - .publishBlockProvideHandler = proc(d: MockDiscovery, cid: Cid) {.async.} = - if cid in advertised and not advertised[cid].finished(): - advertised[cid].complete() - - discovery.advertiseType = BlockType.Block - await engine.start() # fire up advertise loop - await allFuturesThrowing( - allFinished(toSeq(advertised.values))) + await engine.start() + await sleepAsync(3.seconds) await engine.stop() test "Should not launch discovery if remote peer has block": @@ -165,7 +148,7 @@ asyncchecksuite "Block Advertising and Discovery": proc(d: MockDiscovery, cid: Cid): Future[seq[SignedPeerRecord]] = check false - await engine.start() # fire up discovery loop + await engine.start() engine.pendingBlocks.resolve(blocks.mapIt(BlockDelivery(blk: it, address: it.address))) await allFuturesThrowing( @@ -173,23 +156,33 @@ asyncchecksuite "Block Advertising and Discovery": await engine.stop() -asyncchecksuite "E2E - Multiple Nodes Discovery": - let chunker = RandomChunker.new(Rng.instance(), size = 4096, chunkSize = 256) +proc asBlock(m: Manifest): bt.Block = + let mdata = m.encode().tryGet() + bt.Block.new(data = mdata, codec = ManifestCodec).tryGet() +asyncchecksuite "E2E - Multiple Nodes Discovery": var switch: seq[Switch] blockexc: seq[NetworkStore] - blocks: seq[bt.Block] + manifests: seq[Manifest] + mBlocks: seq[bt.Block] + trees: seq[CodexTree] setup: - while true: - let chunk = await chunker.getBytes() - if chunk.len <= 0: - break - - blocks.add(bt.Block.new(chunk).tryGet()) - for _ in 0..<4: + let chunker = RandomChunker.new(Rng.instance(), size = 4096, chunkSize = 256) + var blocks = newSeq[bt.Block]() + while true: + let chunk = await chunker.getBytes() + if chunk.len <= 0: + break + + blocks.add(bt.Block.new(chunk).tryGet()) + let (manifest, tree) = makeManifestAndTree(blocks).tryGet() + manifests.add(manifest) + mBlocks.add(manifest.asBlock()) + trees.add(tree) + let s = newStandardSwitch(transportFlags = {ServerFlags.ReuseAddr}) blockDiscovery = MockDiscovery.new() @@ -223,9 +216,12 @@ asyncchecksuite "E2E - Multiple Nodes Discovery": teardown: switch = @[] blockexc = @[] + manifests = @[] + mBlocks = @[] + trees = @[] test "E2E - Should advertise and discover blocks": - # Distribute the blocks amongst 1..3 + # Distribute the manifests and trees amongst 1..3 # Ask 0 to download everything without connecting him beforehand var advertised: Table[Cid, SignedPeerRecord] @@ -242,14 +238,14 @@ asyncchecksuite "E2E - Multiple Nodes Discovery": .publishBlockProvideHandler = proc(d: MockDiscovery, cid: Cid): Future[void] {.async.} = advertised[cid] = switch[3].peerInfo.signedPeerRecord - discard blocks[0..5].mapIt(blockexc[1].engine.pendingBlocks.getWantHandle(it.address)) - await blockexc[1].engine.blocksDeliveryHandler(switch[0].peerInfo.peerId, blocks[0..5].mapIt(BlockDelivery(blk: it, address: it.address))) + discard blockexc[1].engine.pendingBlocks.getWantHandle(mBlocks[0].cid) + await blockexc[1].engine.blocksDeliveryHandler(switch[0].peerInfo.peerId, @[BlockDelivery(blk: mBlocks[0], address: BlockAddress(leaf: false, cid: mBlocks[0].cid))]) - discard blocks[4..10].mapIt(blockexc[2].engine.pendingBlocks.getWantHandle(it.address)) - await blockexc[2].engine.blocksDeliveryHandler(switch[0].peerInfo.peerId, blocks[4..10].mapIt(BlockDelivery(blk: it, address: it.address))) + discard blockexc[2].engine.pendingBlocks.getWantHandle(mBlocks[1].cid) + await blockexc[2].engine.blocksDeliveryHandler(switch[0].peerInfo.peerId, @[BlockDelivery(blk: mBlocks[1], address: BlockAddress(leaf: false, cid: mBlocks[1].cid))]) - discard blocks[10..15].mapIt(blockexc[3].engine.pendingBlocks.getWantHandle(it.address)) - await blockexc[3].engine.blocksDeliveryHandler(switch[0].peerInfo.peerId, blocks[10..15].mapIt(BlockDelivery(blk: it, address: it.address))) + discard blockexc[3].engine.pendingBlocks.getWantHandle(mBlocks[2].cid) + await blockexc[3].engine.blocksDeliveryHandler(switch[0].peerInfo.peerId, @[BlockDelivery(blk: mBlocks[2], address: BlockAddress(leaf: false, cid: mBlocks[2].cid))]) MockDiscovery(blockexc[0].engine.discovery.discovery) .findBlockProvidersHandler = proc(d: MockDiscovery, cid: Cid): @@ -258,22 +254,22 @@ asyncchecksuite "E2E - Multiple Nodes Discovery": result.add(advertised[cid]) let futs = collect(newSeq): - for b in blocks: - blockexc[0].engine.requestBlock(b.cid) + for m in mBlocks[0..2]: + blockexc[0].engine.requestBlock(m.cid) await allFuturesThrowing( - switch.mapIt( it.start() ) & - blockexc.mapIt( it.engine.start() )).wait(10.seconds) + switch.mapIt(it.start()) & + blockexc.mapIt(it.engine.start())).wait(10.seconds) await allFutures(futs).wait(10.seconds) await allFuturesThrowing( - blockexc.mapIt( it.engine.stop() ) & - switch.mapIt( it.stop() )).wait(10.seconds) + blockexc.mapIt(it.engine.stop()) & + switch.mapIt(it.stop())).wait(10.seconds) test "E2E - Should advertise and discover blocks with peers already connected": # Distribute the blocks amongst 1..3 - # Ask 0 to download everything without connecting him beforehand + # Ask 0 to download everything *WITH* connecting him beforehand var advertised: Table[Cid, SignedPeerRecord] @@ -289,14 +285,14 @@ asyncchecksuite "E2E - Multiple Nodes Discovery": .publishBlockProvideHandler = proc(d: MockDiscovery, cid: Cid): Future[void] {.async.} = advertised[cid] = switch[3].peerInfo.signedPeerRecord - discard blocks[0..5].mapIt(blockexc[1].engine.pendingBlocks.getWantHandle(it.address)) - await blockexc[1].engine.blocksDeliveryHandler(switch[0].peerInfo.peerId, blocks[0..5].mapIt(BlockDelivery(blk: it, address: it.address))) + discard blockexc[1].engine.pendingBlocks.getWantHandle(mBlocks[0].cid) + await blockexc[1].engine.blocksDeliveryHandler(switch[0].peerInfo.peerId, @[BlockDelivery(blk: mBlocks[0], address: BlockAddress(leaf: false, cid: mBlocks[0].cid))]) - discard blocks[4..10].mapIt(blockexc[2].engine.pendingBlocks.getWantHandle(it.address)) - await blockexc[2].engine.blocksDeliveryHandler(switch[0].peerInfo.peerId, blocks[4..10].mapIt(BlockDelivery(blk: it, address: it.address))) + discard blockexc[2].engine.pendingBlocks.getWantHandle(mBlocks[1].cid) + await blockexc[2].engine.blocksDeliveryHandler(switch[0].peerInfo.peerId, @[BlockDelivery(blk: mBlocks[1], address: BlockAddress(leaf: false, cid: mBlocks[1].cid))]) - discard blocks[10..15].mapIt(blockexc[3].engine.pendingBlocks.getWantHandle(it.address)) - await blockexc[3].engine.blocksDeliveryHandler(switch[0].peerInfo.peerId, blocks[10..15].mapIt(BlockDelivery(blk: it, address: it.address))) + discard blockexc[3].engine.pendingBlocks.getWantHandle(mBlocks[2].cid) + await blockexc[3].engine.blocksDeliveryHandler(switch[0].peerInfo.peerId, @[BlockDelivery(blk: mBlocks[2], address: BlockAddress(leaf: false, cid: mBlocks[2].cid))]) MockDiscovery(blockexc[0].engine.discovery.discovery) .findBlockProvidersHandler = proc(d: MockDiscovery, cid: Cid): @@ -305,14 +301,14 @@ asyncchecksuite "E2E - Multiple Nodes Discovery": return @[advertised[cid]] let - futs = blocks.mapIt( blockexc[0].engine.requestBlock( it.cid ) ) + futs = mBlocks[0..2].mapIt(blockexc[0].engine.requestBlock(it.cid)) await allFuturesThrowing( - switch.mapIt( it.start() ) & - blockexc.mapIt( it.engine.start() )).wait(10.seconds) + switch.mapIt(it.start()) & + blockexc.mapIt(it.engine.start())).wait(10.seconds) await allFutures(futs).wait(10.seconds) await allFuturesThrowing( - blockexc.mapIt( it.engine.stop() ) & - switch.mapIt( it.stop() )).wait(10.seconds) + blockexc.mapIt(it.engine.stop()) & + switch.mapIt(it.stop())).wait(10.seconds) diff --git a/tests/codex/blockexchange/discovery/testdiscoveryengine.nim b/tests/codex/blockexchange/discovery/testdiscoveryengine.nim index 3984fe6a..ff6b60d2 100644 --- a/tests/codex/blockexchange/discovery/testdiscoveryengine.nim +++ b/tests/codex/blockexchange/discovery/testdiscoveryengine.nim @@ -10,17 +10,26 @@ import pkg/codex/blockexchange import pkg/codex/chunker import pkg/codex/blocktype as bt import pkg/codex/blockexchange/engine +import pkg/codex/manifest +import pkg/codex/merkletree import ../../../asynctest import ../../helpers import ../../helpers/mockdiscovery import ../../examples +proc asBlock(m: Manifest): bt.Block = + let mdata = m.encode().tryGet() + bt.Block.new(data = mdata, codec = ManifestCodec).tryGet() + asyncchecksuite "Test Discovery Engine": let chunker = RandomChunker.new(Rng.instance(), size = 4096, chunkSize = 256) var blocks: seq[bt.Block] + manifest: Manifest + tree: CodexTree + manifestBlock: bt.Block switch: Switch peerStore: PeerCtxStore blockDiscovery: MockDiscovery @@ -35,6 +44,10 @@ asyncchecksuite "Test Discovery Engine": blocks.add(bt.Block.new(chunk).tryGet()) + (manifest, tree) = makeManifestAndTree(blocks).tryGet() + manifestBlock = manifest.asBlock() + blocks.add(manifestBlock) + switch = newStandardSwitch(transportFlags = {ServerFlags.ReuseAddr}) network = BlockExcNetwork.new(switch) peerStore = PeerCtxStore.new() @@ -51,11 +64,11 @@ asyncchecksuite "Test Discovery Engine": blockDiscovery, pendingBlocks, discoveryLoopSleep = 100.millis) - wants = blocks.mapIt( pendingBlocks.getWantHandle(it.cid) ) + wants = blocks.mapIt(pendingBlocks.getWantHandle(it.cid) ) blockDiscovery.findBlockProvidersHandler = proc(d: MockDiscovery, cid: Cid): Future[seq[SignedPeerRecord]] {.async, gcsafe.} = - pendingBlocks.resolve(blocks.filterIt( it.cid == cid).mapIt(BlockDelivery(blk: it, address: it.address))) + pendingBlocks.resolve(blocks.filterIt(it.cid == cid).mapIt(BlockDelivery(blk: it, address: it.address))) await discoveryEngine.start() await allFuturesThrowing(allFinished(wants)).wait(1.seconds) @@ -63,7 +76,7 @@ asyncchecksuite "Test Discovery Engine": test "Should Advertise Haves": var - localStore = CacheStore.new(blocks.mapIt( it )) + localStore = CacheStore.new(blocks.mapIt(it)) discoveryEngine = DiscoveryEngine.new( localStore, peerStore, @@ -72,8 +85,8 @@ asyncchecksuite "Test Discovery Engine": pendingBlocks, discoveryLoopSleep = 100.millis) haves = collect(initTable): - for b in blocks: - { b.cid: newFuture[void]() } + for cid in @[manifestBlock.cid, manifest.treeCid]: + { cid: newFuture[void]() } blockDiscovery.publishBlockProvideHandler = proc(d: MockDiscovery, cid: Cid) {.async, gcsafe.} = @@ -108,28 +121,6 @@ asyncchecksuite "Test Discovery Engine": await want.wait(1.seconds) await discoveryEngine.stop() - test "Should queue advertise request": - var - localStore = CacheStore.new(@[blocks[0]]) - discoveryEngine = DiscoveryEngine.new( - localStore, - peerStore, - network, - blockDiscovery, - pendingBlocks, - discoveryLoopSleep = 100.millis) - have = newFuture[void]() - - blockDiscovery.publishBlockProvideHandler = - proc(d: MockDiscovery, cid: Cid) {.async, gcsafe.} = - check cid == blocks[0].cid - if not have.finished: - have.complete() - - await discoveryEngine.start() - await have.wait(1.seconds) - await discoveryEngine.stop() - test "Should not request more than minPeersPerBlock": var localStore = CacheStore.new() From e208d0b13ae9742fbfd1b9c026d0a6a2df984d63 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adam=20Uhl=C3=AD=C5=99?= Date: Tue, 14 May 2024 17:40:29 +0200 Subject: [PATCH 6/6] feat: one confirmation for all transactions (#802) * fix: make sure requestStorage is mined * fix: correct place to plug confirm * test: fixing contracts tests * feat: one confirmation for all transactions * fix: don't wait for confirmations only mined block --- codex/contracts/market.nim | 16 ++++++++-------- codex/contracts/marketplace.nim | 12 ++++++------ tests/contracts/testContracts.nim | 12 ++++++------ tests/contracts/testMarket.nim | 2 +- 4 files changed, 21 insertions(+), 21 deletions(-) diff --git a/codex/contracts/market.nim b/codex/contracts/market.nim index 4e228670..c874d5db 100644 --- a/codex/contracts/market.nim +++ b/codex/contracts/market.nim @@ -46,7 +46,7 @@ proc approveFunds(market: OnChainMarket, amount: UInt256) {.async.} = convertEthersError: let tokenAddress = await market.contract.token() let token = Erc20Token.new(tokenAddress, market.signer) - discard await token.increaseAllowance(market.contract.address(), amount).confirm(1) + discard await token.increaseAllowance(market.contract.address(), amount).confirm(0) method getZkeyHash*(market: OnChainMarket): Future[?string] {.async.} = let config = await market.contract.config() @@ -92,7 +92,7 @@ method requestStorage(market: OnChainMarket, request: StorageRequest){.async.} = convertEthersError: debug "Requesting storage" await market.approveFunds(request.price()) - await market.contract.requestStorage(request) + discard await market.contract.requestStorage(request).confirm(0) method getRequest(market: OnChainMarket, id: RequestId): Future[?StorageRequest] {.async.} = @@ -159,16 +159,16 @@ method fillSlot(market: OnChainMarket, collateral: UInt256) {.async.} = convertEthersError: await market.approveFunds(collateral) - await market.contract.fillSlot(requestId, slotIndex, proof) + discard await market.contract.fillSlot(requestId, slotIndex, proof).confirm(0) method freeSlot*(market: OnChainMarket, slotId: SlotId) {.async.} = convertEthersError: - await market.contract.freeSlot(slotId) + discard await market.contract.freeSlot(slotId).confirm(0) method withdrawFunds(market: OnChainMarket, requestId: RequestId) {.async.} = convertEthersError: - await market.contract.withdrawFunds(requestId) + discard await market.contract.withdrawFunds(requestId).confirm(0) method isProofRequired*(market: OnChainMarket, id: SlotId): Future[bool] {.async.} = @@ -201,13 +201,13 @@ method submitProof*(market: OnChainMarket, id: SlotId, proof: Groth16Proof) {.async.} = convertEthersError: - await market.contract.submitProof(id, proof) + discard await market.contract.submitProof(id, proof).confirm(0) method markProofAsMissing*(market: OnChainMarket, id: SlotId, period: Period) {.async.} = convertEthersError: - await market.contract.markProofAsMissing(id, period) + discard await market.contract.markProofAsMissing(id, period).confirm(0) method canProofBeMarkedAsMissing*( market: OnChainMarket, @@ -218,7 +218,7 @@ method canProofBeMarkedAsMissing*( let contractWithoutSigner = market.contract.connect(provider) let overrides = CallOverrides(blockTag: some BlockTag.pending) try: - await contractWithoutSigner.markProofAsMissing(id, period, overrides) + discard await contractWithoutSigner.markProofAsMissing(id, period, overrides) return true except EthersError as e: trace "Proof cannot be marked as missing", msg = e.msg diff --git a/codex/contracts/marketplace.nim b/codex/contracts/marketplace.nim index 2326ed3f..301f8c25 100644 --- a/codex/contracts/marketplace.nim +++ b/codex/contracts/marketplace.nim @@ -42,10 +42,10 @@ proc slashMisses*(marketplace: Marketplace): UInt256 {.contract, view.} proc slashPercentage*(marketplace: Marketplace): UInt256 {.contract, view.} proc minCollateralThreshold*(marketplace: Marketplace): UInt256 {.contract, view.} -proc requestStorage*(marketplace: Marketplace, request: StorageRequest) {.contract.} -proc fillSlot*(marketplace: Marketplace, requestId: RequestId, slotIndex: UInt256, proof: Groth16Proof) {.contract.} -proc withdrawFunds*(marketplace: Marketplace, requestId: RequestId) {.contract.} -proc freeSlot*(marketplace: Marketplace, id: SlotId) {.contract.} +proc requestStorage*(marketplace: Marketplace, request: StorageRequest): ?TransactionResponse {.contract.} +proc fillSlot*(marketplace: Marketplace, requestId: RequestId, slotIndex: UInt256, proof: Groth16Proof): ?TransactionResponse {.contract.} +proc withdrawFunds*(marketplace: Marketplace, requestId: RequestId): ?TransactionResponse {.contract.} +proc freeSlot*(marketplace: Marketplace, id: SlotId): ?TransactionResponse {.contract.} proc getRequest*(marketplace: Marketplace, id: RequestId): StorageRequest {.contract, view.} proc getHost*(marketplace: Marketplace, id: SlotId): Address {.contract, view.} proc getActiveSlot*(marketplace: Marketplace, id: SlotId): Slot {.contract, view.} @@ -66,5 +66,5 @@ proc willProofBeRequired*(marketplace: Marketplace, id: SlotId): bool {.contract proc getChallenge*(marketplace: Marketplace, id: SlotId): array[32, byte] {.contract, view.} proc getPointer*(marketplace: Marketplace, id: SlotId): uint8 {.contract, view.} -proc submitProof*(marketplace: Marketplace, id: SlotId, proof: Groth16Proof) {.contract.} -proc markProofAsMissing*(marketplace: Marketplace, id: SlotId, period: UInt256) {.contract.} +proc submitProof*(marketplace: Marketplace, id: SlotId, proof: Groth16Proof): ?TransactionResponse {.contract.} +proc markProofAsMissing*(marketplace: Marketplace, id: SlotId, period: UInt256): ?TransactionResponse {.contract.} diff --git a/tests/contracts/testContracts.nim b/tests/contracts/testContracts.nim index 019de7aa..4432160d 100644 --- a/tests/contracts/testContracts.nim +++ b/tests/contracts/testContracts.nim @@ -39,10 +39,10 @@ ethersuite "Marketplace contracts": switchAccount(client) discard await token.approve(marketplace.address, request.price) - await marketplace.requestStorage(request) + discard await marketplace.requestStorage(request) switchAccount(host) discard await token.approve(marketplace.address, request.ask.collateral) - await marketplace.fillSlot(request.id, 0.u256, proof) + discard await marketplace.fillSlot(request.id, 0.u256, proof) slotId = request.slotId(0.u256) proc waitUntilProofRequired(slotId: SlotId) {.async.} = @@ -57,12 +57,12 @@ ethersuite "Marketplace contracts": proc startContract() {.async.} = for slotIndex in 1..