From 1cac3e2a117eb9ce89c6ba4f6c324e44371da5fa Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Thu, 13 Mar 2025 08:33:15 -0600 Subject: [PATCH] Fix/rework async exceptions (#1130) * cleanup imports and logs * add BlockHandle type * revert deps * refactor: async error handling and future tracking improvements - Update async procedures to use explicit raises annotation - Modify TrackedFutures to handle futures with no raised exceptions - Replace `asyncSpawn` with explicit future tracking - Update test suites to use `unittest2` - Standardize error handling across network and async components - Remove deprecated error handling patterns This commit introduces a more robust approach to async error handling and future management, improving type safety and reducing potential runtime errors. * bump nim-serde * remove asyncSpawn * rework background downloads and prefetch * imporove logging * refactor: enhance async procedures with error handling and raise annotations * misc cleanup * misc * refactor: implement allFinishedFailed to aggregate future results with success and failure tracking * refactor: update error handling in reader procedures to raise ChunkerError and CancelledError * refactor: improve error handling in wantListHandler and accountHandler procedures * refactor: simplify LPStreamReadError creation by consolidating parameters * refactor: enhance error handling in AsyncStreamWrapper to catch unexpected errors * refactor: enhance error handling in advertiser and discovery loops to improve resilience * misc * refactor: improve code structure and readability * remove cancellation from addSlotToQueue * refactor: add assertion for unexpected errors in local store checks * refactor: prevent tracking of finished futures and improve test assertions * refactor: improve error handling in local store checks * remove usage of msgDetail * feat: add initial implementation of discovery engine and related components * refactor: improve task scheduling logic by removing unnecessary break statement * break after scheduling a task * make taskHandler cancelable * refactor: update async handlers to raise CancelledError * refactor(advertiser): streamline error handling and improve task flow in advertise loops * fix: correct spelling of "divisible" in error messages and comments * refactor(discovery): simplify discovery task loop and improve error handling * refactor(engine): filter peers before processing in cancelBlocks procedure --- codex/blockexchange/engine/advertiser.nim | 88 ++--- codex/blockexchange/engine/discovery.nim | 63 ++-- codex/blockexchange/engine/engine.nim | 320 ++++++++++-------- codex/blockexchange/engine/payments.nim | 5 +- codex/blockexchange/network/network.nim | 98 +++--- codex/blockexchange/network/networkpeer.nim | 68 ++-- codex/blockexchange/peers/peerctxstore.nim | 7 +- codex/blockexchange/protobuf/payments.nim | 6 +- codex/blockexchange/protobuf/presence.nim | 6 +- codex/chunker.nim | 11 +- codex/codex.nim | 18 +- codex/contracts/clock.nim | 14 +- codex/discovery.nim | 138 +++++--- codex/erasure/erasure.nim | 4 +- codex/errors.nim | 45 +-- codex/node.nim | 91 ++--- codex/rest/api.nim | 11 +- codex/sales.nim | 65 ++-- codex/sales/salesagent.nim | 1 - codex/sales/slotqueue.nim | 22 +- codex/slots/builder/builder.nim | 10 +- codex/slots/proofs/prover.nim | 2 + codex/streams/asyncstreamwrapper.nim | 8 +- codex/streams/storestream.nim | 6 +- codex/utils/asyncstatemachine.nim | 2 - codex/utils/timer.nim | 1 - codex/utils/trackedfutures.nim | 26 +- codex/validation.nim | 1 - tests/asynctest.nim | 4 +- .../blockexchange/discovery/testdiscovery.nim | 68 ++-- .../discovery/testdiscoveryengine.nim | 10 +- .../blockexchange/engine/testadvertiser.nim | 2 +- .../codex/blockexchange/engine/testengine.nim | 61 ++-- .../blockexchange/engine/testpayments.nim | 4 +- .../blockexchange/protobuf/testpayments.nim | 4 +- .../blockexchange/protobuf/testpresence.nim | 2 +- tests/codex/blockexchange/testnetwork.nim | 28 +- .../codex/blockexchange/testpeerctxstore.nim | 6 +- .../codex/blockexchange/testpendingblocks.nim | 2 +- tests/codex/helpers/mockchunker.nim | 2 +- tests/codex/helpers/mockdiscovery.nim | 41 ++- tests/codex/helpers/randomchunker.nim | 2 +- tests/codex/merkletree/generictreetests.nim | 2 +- tests/codex/merkletree/testcodexcoders.nim | 4 +- tests/codex/merkletree/testcodextree.nim | 2 +- tests/codex/merkletree/testmerkledigest.nim | 2 +- tests/codex/merkletree/testposeidon2tree.nim | 2 +- tests/codex/sales/states/testdownloading.nim | 4 +- tests/codex/sales/states/testfilled.nim | 2 +- tests/codex/sales/states/testfilling.nim | 4 +- tests/codex/sales/states/testunknown.nim | 2 +- tests/codex/sales/testsales.nim | 29 +- tests/codex/sales/testslotqueue.nim | 42 ++- tests/codex/slots/testslotbuilder.nim | 4 +- tests/codex/stores/repostore/testcoders.nim | 4 +- tests/codex/stores/testcachestore.nim | 2 +- tests/codex/stores/testkeyutils.nim | 2 +- tests/codex/stores/testmaintenance.nim | 2 +- tests/codex/stores/testrepostore.nim | 2 +- tests/codex/testasyncheapqueue.nim | 2 +- tests/codex/testchunking.nim | 13 +- tests/codex/testclock.nim | 4 +- tests/codex/testlogutils.nim | 3 +- tests/codex/testmanifest.nim | 2 +- tests/codex/testpurchasing.nim | 2 +- tests/codex/testsystemclock.nim | 6 +- tests/codex/utils/testiter.nim | 2 +- tests/codex/utils/testkeyutils.nim | 8 +- tests/codex/utils/testoptions.nim | 9 +- tests/codex/utils/testtrackedfutures.nim | 54 ++- tests/codex/utils/testutils.nim | 2 +- tests/helpers.nim | 32 ++ tests/helpers/trackers.nim | 2 +- vendor/nim-serde | 2 +- 74 files changed, 937 insertions(+), 690 deletions(-) diff --git a/codex/blockexchange/engine/advertiser.nim b/codex/blockexchange/engine/advertiser.nim index f5f28bc1..d094c454 100644 --- a/codex/blockexchange/engine/advertiser.nim +++ b/codex/blockexchange/engine/advertiser.nim @@ -41,80 +41,86 @@ type Advertiser* = ref object of RootObj advertiserRunning*: bool # Indicates if discovery is running concurrentAdvReqs: int # Concurrent advertise requests - advertiseLocalStoreLoop*: Future[void] # Advertise loop task handle + advertiseLocalStoreLoop*: Future[void].Raising([]) # Advertise loop task handle advertiseQueue*: AsyncQueue[Cid] # Advertise queue trackedFutures*: TrackedFutures # Advertise tasks futures advertiseLocalStoreLoopSleep: Duration # Advertise loop sleep inFlightAdvReqs*: Table[Cid, Future[void]] # Inflight advertise requests -proc addCidToQueue(b: Advertiser, cid: Cid) {.async.} = +proc addCidToQueue(b: Advertiser, cid: Cid) {.async: (raises: [CancelledError]).} = if cid notin b.advertiseQueue: await b.advertiseQueue.put(cid) + trace "Advertising", cid -proc advertiseBlock(b: Advertiser, cid: Cid) {.async.} = +proc advertiseBlock(b: Advertiser, cid: Cid) {.async: (raises: [CancelledError]).} = without isM =? cid.isManifest, err: warn "Unable to determine if cid is manifest" return - if isM: - without blk =? await b.localStore.getBlock(cid), err: - error "Error retrieving manifest block", cid, err = err.msg - return + try: + if isM: + without blk =? await b.localStore.getBlock(cid), err: + error "Error retrieving manifest block", cid, err = err.msg + return - without manifest =? Manifest.decode(blk), err: - error "Unable to decode as manifest", err = err.msg - return + without manifest =? Manifest.decode(blk), err: + error "Unable to decode as manifest", err = err.msg + return - # announce manifest cid and tree cid - await b.addCidToQueue(cid) - await b.addCidToQueue(manifest.treeCid) + # announce manifest cid and tree cid + await b.addCidToQueue(cid) + await b.addCidToQueue(manifest.treeCid) + except CancelledError as exc: + trace "Cancelled advertise block", cid + raise exc + except CatchableError as e: + error "failed to advertise block", cid, error = e.msgDetail proc advertiseLocalStoreLoop(b: Advertiser) {.async: (raises: []).} = - while b.advertiserRunning: - try: - if cids =? await b.localStore.listBlocks(blockType = BlockType.Manifest): - trace "Advertiser begins iterating blocks..." - for c in cids: - if cid =? await c: - await b.advertiseBlock(cid) - trace "Advertiser iterating blocks finished." + try: + while b.advertiserRunning: + try: + if cids =? await b.localStore.listBlocks(blockType = BlockType.Manifest): + trace "Advertiser begins iterating blocks..." + for c in cids: + if cid =? await c: + await b.advertiseBlock(cid) + trace "Advertiser iterating blocks finished." + except CatchableError as e: + error "Error in advertise local store loop", error = e.msgDetail + raiseAssert("Unexpected exception in advertiseLocalStoreLoop") await sleepAsync(b.advertiseLocalStoreLoopSleep) - except CancelledError: - break # do not propagate as advertiseLocalStoreLoop was asyncSpawned - except CatchableError as e: - error "failed to advertise blocks in local store", error = e.msgDetail + except CancelledError: + warn "Cancelled advertise local store loop" info "Exiting advertise task loop" proc processQueueLoop(b: Advertiser) {.async: (raises: []).} = - while b.advertiserRunning: - try: + try: + while b.advertiserRunning: let cid = await b.advertiseQueue.get() if cid in b.inFlightAdvReqs: continue - try: - let request = b.discovery.provide(cid) + let request = b.discovery.provide(cid) + b.inFlightAdvReqs[cid] = request + codex_inflight_advertise.set(b.inFlightAdvReqs.len.int64) - b.inFlightAdvReqs[cid] = request - codex_inflight_advertise.set(b.inFlightAdvReqs.len.int64) - await request - finally: + defer: b.inFlightAdvReqs.del(cid) codex_inflight_advertise.set(b.inFlightAdvReqs.len.int64) - except CancelledError: - trace "Advertise task cancelled" - return - except CatchableError as exc: - warn "Exception in advertise task runner", exc = exc.msg + + await request + except CancelledError: + warn "Cancelled advertise task runner" info "Exiting advertise task runner" -proc start*(b: Advertiser) {.async.} = +proc start*(b: Advertiser) {.async: (raises: []).} = ## Start the advertiser ## @@ -134,13 +140,11 @@ proc start*(b: Advertiser) {.async.} = for i in 0 ..< b.concurrentAdvReqs: let fut = b.processQueueLoop() b.trackedFutures.track(fut) - asyncSpawn fut b.advertiseLocalStoreLoop = advertiseLocalStoreLoop(b) b.trackedFutures.track(b.advertiseLocalStoreLoop) - asyncSpawn b.advertiseLocalStoreLoop -proc stop*(b: Advertiser) {.async.} = +proc stop*(b: Advertiser) {.async: (raises: []).} = ## Stop the advertiser ## diff --git a/codex/blockexchange/engine/discovery.nim b/codex/blockexchange/engine/discovery.nim index c664f212..b32b8555 100644 --- a/codex/blockexchange/engine/discovery.nim +++ b/codex/blockexchange/engine/discovery.nim @@ -48,7 +48,7 @@ type DiscoveryEngine* = ref object of RootObj pendingBlocks*: PendingBlocksManager # Blocks we're awaiting to be resolved discEngineRunning*: bool # Indicates if discovery is running concurrentDiscReqs: int # Concurrent discovery requests - discoveryLoop*: Future[void] # Discovery loop task handle + discoveryLoop*: Future[void].Raising([]) # Discovery loop task handle discoveryQueue*: AsyncQueue[Cid] # Discovery queue trackedFutures*: TrackedFutures # Tracked Discovery tasks futures minPeersPerBlock*: int # Max number of peers with block @@ -57,30 +57,21 @@ type DiscoveryEngine* = ref object of RootObj # Inflight discovery requests proc discoveryQueueLoop(b: DiscoveryEngine) {.async: (raises: []).} = - while b.discEngineRunning: - for cid in toSeq(b.pendingBlocks.wantListBlockCids): - try: + try: + while b.discEngineRunning: + for cid in toSeq(b.pendingBlocks.wantListBlockCids): await b.discoveryQueue.put(cid) - except CancelledError: - trace "Discovery loop cancelled" - return - except CatchableError as exc: - warn "Exception in discovery loop", exc = exc.msg - try: - logScope: - sleep = b.discoveryLoopSleep - wanted = b.pendingBlocks.len await sleepAsync(b.discoveryLoopSleep) - except CancelledError: - discard # do not propagate as discoveryQueueLoop was asyncSpawned + except CancelledError: + trace "Discovery loop cancelled" proc discoveryTaskLoop(b: DiscoveryEngine) {.async: (raises: []).} = ## Run discovery tasks ## - while b.discEngineRunning: - try: + try: + while b.discEngineRunning: let cid = await b.discoveryQueue.get() if cid in b.inFlightDiscReqs: @@ -90,35 +81,28 @@ proc discoveryTaskLoop(b: DiscoveryEngine) {.async: (raises: []).} = let haves = b.peers.peersHave(cid) if haves.len < b.minPeersPerBlock: - try: - let request = b.discovery.find(cid).wait(DefaultDiscoveryTimeout) + let request = b.discovery.find(cid) + b.inFlightDiscReqs[cid] = request + codex_inflight_discovery.set(b.inFlightDiscReqs.len.int64) - b.inFlightDiscReqs[cid] = request + defer: + b.inFlightDiscReqs.del(cid) codex_inflight_discovery.set(b.inFlightDiscReqs.len.int64) - let peers = await request + if (await request.withTimeout(DefaultDiscoveryTimeout)) and + peers =? (await request).catch: let dialed = await allFinished(peers.mapIt(b.network.dialPeer(it.data))) for i, f in dialed: if f.failed: await b.discovery.removeProvider(peers[i].data.peerId) - finally: - b.inFlightDiscReqs.del(cid) - codex_inflight_discovery.set(b.inFlightDiscReqs.len.int64) - except CancelledError: - trace "Discovery task cancelled" - return - except CatchableError as exc: - warn "Exception in discovery task runner", exc = exc.msg - except Exception as e: - # Raised by b.discovery.removeProvider somehow... - # This should not be catchable, and we should never get here. Therefore, - # raise a Defect. - raiseAssert "Exception when removing provider" + except CancelledError: + trace "Discovery task cancelled" + return info "Exiting discovery task runner" -proc queueFindBlocksReq*(b: DiscoveryEngine, cids: seq[Cid]) {.inline.} = +proc queueFindBlocksReq*(b: DiscoveryEngine, cids: seq[Cid]) = for cid in cids: if cid notin b.discoveryQueue: try: @@ -126,11 +110,11 @@ proc queueFindBlocksReq*(b: DiscoveryEngine, cids: seq[Cid]) {.inline.} = except CatchableError as exc: warn "Exception queueing discovery request", exc = exc.msg -proc start*(b: DiscoveryEngine) {.async.} = +proc start*(b: DiscoveryEngine) {.async: (raises: []).} = ## Start the discengine task ## - trace "Discovery engine start" + trace "Discovery engine starting" if b.discEngineRunning: warn "Starting discovery engine twice" @@ -140,12 +124,13 @@ proc start*(b: DiscoveryEngine) {.async.} = for i in 0 ..< b.concurrentDiscReqs: let fut = b.discoveryTaskLoop() b.trackedFutures.track(fut) - asyncSpawn fut b.discoveryLoop = b.discoveryQueueLoop() b.trackedFutures.track(b.discoveryLoop) -proc stop*(b: DiscoveryEngine) {.async.} = + trace "Discovery engine started" + +proc stop*(b: DiscoveryEngine) {.async: (raises: []).} = ## Stop the discovery engine ## diff --git a/codex/blockexchange/engine/engine.nim b/codex/blockexchange/engine/engine.nim index dafdd520..befb8ae9 100644 --- a/codex/blockexchange/engine/engine.nim +++ b/codex/blockexchange/engine/engine.nim @@ -93,12 +93,15 @@ type price*: UInt256 # attach task scheduler to engine -proc scheduleTask(self: BlockExcEngine, task: BlockExcPeerCtx): bool {.gcsafe.} = - self.taskQueue.pushOrUpdateNoWait(task).isOk() +proc scheduleTask(self: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, raises: [].} = + if self.taskQueue.pushOrUpdateNoWait(task).isOk(): + trace "Task scheduled for peer", peer = task.id + else: + warn "Unable to schedule task for peer", peer = task.id proc blockexcTaskRunner(self: BlockExcEngine) {.async: (raises: []).} -proc start*(self: BlockExcEngine) {.async.} = +proc start*(self: BlockExcEngine) {.async: (raises: []).} = ## Start the blockexc task ## @@ -115,7 +118,7 @@ proc start*(self: BlockExcEngine) {.async.} = let fut = self.blockexcTaskRunner() self.trackedFutures.track(fut) -proc stop*(self: BlockExcEngine) {.async.} = +proc stop*(self: BlockExcEngine) {.async: (raises: []).} = ## Stop the blockexc blockexc ## @@ -135,7 +138,7 @@ proc stop*(self: BlockExcEngine) {.async.} = proc sendWantHave( self: BlockExcEngine, addresses: seq[BlockAddress], peers: seq[BlockExcPeerCtx] -): Future[void] {.async.} = +): Future[void] {.async: (raises: [CancelledError]).} = for p in peers: let toAsk = addresses.filterIt(it notin p.peerHave) trace "Sending wantHave request", toAsk, peer = p.id @@ -144,7 +147,7 @@ proc sendWantHave( proc sendWantBlock( self: BlockExcEngine, addresses: seq[BlockAddress], blockPeer: BlockExcPeerCtx -): Future[void] {.async.} = +): Future[void] {.async: (raises: [CancelledError]).} = trace "Sending wantBlock request to", addresses, peer = blockPeer.id await self.network.request.sendWantList( blockPeer.id, addresses, wantType = WantType.WantBlock @@ -229,7 +232,7 @@ proc requestBlock*( proc blockPresenceHandler*( self: BlockExcEngine, peer: PeerId, blocks: seq[BlockPresence] -) {.async.} = +) {.async: (raises: []).} = trace "Received block presence from peer", peer, blocks = blocks.mapIt($it) let peerCtx = self.peers.get(peer) @@ -249,20 +252,23 @@ proc blockPresenceHandler*( if dontWantCids.len > 0: peerCtx.cleanPresence(dontWantCids) - let ourWantCids = ourWantList.filter do(address: BlockAddress) -> bool: - if address in peerHave and not self.pendingBlocks.retriesExhausted(address) and - not self.pendingBlocks.isInFlight(address): - self.pendingBlocks.setInFlight(address, true) - self.pendingBlocks.decRetries(address) - true - else: - false + let ourWantCids = ourWantList.filterIt( + it in peerHave and not self.pendingBlocks.retriesExhausted(it) and + not self.pendingBlocks.isInFlight(it) + ) + + for address in ourWantCids: + self.pendingBlocks.setInFlight(address, true) + self.pendingBlocks.decRetries(address) if ourWantCids.len > 0: trace "Peer has blocks in our wantList", peer, wants = ourWantCids - await self.sendWantBlock(ourWantCids, peerCtx) + if err =? catch(await self.sendWantBlock(ourWantCids, peerCtx)).errorOption: + warn "Failed to send wantBlock to peer", peer, err = err.msg -proc scheduleTasks(self: BlockExcEngine, blocksDelivery: seq[BlockDelivery]) {.async.} = +proc scheduleTasks( + self: BlockExcEngine, blocksDelivery: seq[BlockDelivery] +) {.async: (raises: [CancelledError]).} = let cids = blocksDelivery.mapIt(it.blk.cid) # schedule any new peers to provide blocks to @@ -271,15 +277,21 @@ proc scheduleTasks(self: BlockExcEngine, blocksDelivery: seq[BlockDelivery]) {.a # schedule a peer if it wants at least one cid # and we have it in our local store if c in p.peerWantsCids: - if await (c in self.localStore): - if self.scheduleTask(p): - trace "Task scheduled for peer", peer = p.id - else: - warn "Unable to schedule task for peer", peer = p.id + try: + if await (c in self.localStore): + # TODO: the try/except should go away once blockstore tracks exceptions + self.scheduleTask(p) + break + except CancelledError as exc: + warn "Checking local store canceled", cid = c, err = exc.msg + return + except CatchableError as exc: + error "Error checking local store for cid", cid = c, err = exc.msg + raiseAssert "Unexpected error checking local store for cid" - break # do next peer - -proc cancelBlocks(self: BlockExcEngine, addrs: seq[BlockAddress]) {.async.} = +proc cancelBlocks( + self: BlockExcEngine, addrs: seq[BlockAddress] +) {.async: (raises: [CancelledError]).} = ## Tells neighboring peers that we're no longer interested in a block. ## @@ -289,35 +301,43 @@ proc cancelBlocks(self: BlockExcEngine, addrs: seq[BlockAddress]) {.async.} = trace "Sending block request cancellations to peers", addrs, peers = self.peers.peerIds - proc mapPeers(peerCtx: BlockExcPeerCtx): Future[BlockExcPeerCtx] {.async.} = - let blocks = addrs.filter do(a: BlockAddress) -> bool: - a in peerCtx.blocks + proc processPeer(peerCtx: BlockExcPeerCtx): Future[BlockExcPeerCtx] {.async.} = + await self.network.request.sendWantCancellations( + peer = peerCtx.id, addresses = addrs.filterIt(it in peerCtx) + ) - if blocks.len > 0: - trace "Sending block request cancellations to peer", peer = peerCtx.id, blocks - await self.network.request.sendWantCancellations( - peer = peerCtx.id, addresses = blocks + return peerCtx + + try: + let (succeededFuts, failedFuts) = await allFinishedFailed( + toSeq(self.peers.peers.values).filterIt(it.peerHave.anyIt(it in addrs)).map( + processPeer ) + ) + + (await allFinished(succeededFuts)).mapIt(it.read).apply do(peerCtx: BlockExcPeerCtx): peerCtx.cleanPresence(addrs) - peerCtx - let failed = (await allFinished(map(toSeq(self.peers.peers.values), mapPeers))).filterIt( - it.failed - ) - - if failed.len > 0: - warn "Failed to send block request cancellations to peers", peers = failed.len - else: - trace "Block request cancellations sent to peers", peers = self.peers.len + if failedFuts.len > 0: + warn "Failed to send block request cancellations to peers", peers = failedFuts.len + else: + trace "Block request cancellations sent to peers", peers = self.peers.len + except CancelledError as exc: + warn "Error sending block request cancellations", error = exc.msg + raise exc + except CatchableError as exc: + warn "Error sending block request cancellations", error = exc.msg proc resolveBlocks*( self: BlockExcEngine, blocksDelivery: seq[BlockDelivery] -) {.async.} = +) {.async: (raises: [CancelledError]).} = self.pendingBlocks.resolve(blocksDelivery) await self.scheduleTasks(blocksDelivery) await self.cancelBlocks(blocksDelivery.mapIt(it.address)) -proc resolveBlocks*(self: BlockExcEngine, blocks: seq[Block]) {.async.} = +proc resolveBlocks*( + self: BlockExcEngine, blocks: seq[Block] +) {.async: (raises: [CancelledError]).} = await self.resolveBlocks( blocks.mapIt( BlockDelivery(blk: it, address: BlockAddress(leaf: false, cid: it.cid)) @@ -326,7 +346,7 @@ proc resolveBlocks*(self: BlockExcEngine, blocks: seq[Block]) {.async.} = proc payForBlocks( self: BlockExcEngine, peer: BlockExcPeerCtx, blocksDelivery: seq[BlockDelivery] -) {.async.} = +) {.async: (raises: [CancelledError]).} = let sendPayment = self.network.request.sendPayment price = peer.price(blocksDelivery.mapIt(it.address)) @@ -367,7 +387,7 @@ proc validateBlockDelivery(self: BlockExcEngine, bd: BlockDelivery): ?!void = proc blocksDeliveryHandler*( self: BlockExcEngine, peer: PeerId, blocksDelivery: seq[BlockDelivery] -) {.async.} = +) {.async: (raises: []).} = trace "Received blocks from peer", peer, blocks = (blocksDelivery.mapIt(it.address)) var validatedBlocksDelivery: seq[BlockDelivery] @@ -376,41 +396,47 @@ proc blocksDeliveryHandler*( peer = peer address = bd.address - if err =? self.validateBlockDelivery(bd).errorOption: - warn "Block validation failed", msg = err.msg - continue - - if err =? (await self.localStore.putBlock(bd.blk)).errorOption: - error "Unable to store block", err = err.msg - continue - - if bd.address.leaf: - without proof =? bd.proof: - error "Proof expected for a leaf block delivery" + try: + if err =? self.validateBlockDelivery(bd).errorOption: + warn "Block validation failed", msg = err.msg continue - if err =? ( - await self.localStore.putCidAndProof( - bd.address.treeCid, bd.address.index, bd.blk.cid, proof - ) - ).errorOption: - error "Unable to store proof and cid for a block" + + if err =? (await self.localStore.putBlock(bd.blk)).errorOption: + error "Unable to store block", err = err.msg continue + if bd.address.leaf: + without proof =? bd.proof: + warn "Proof expected for a leaf block delivery" + continue + if err =? ( + await self.localStore.putCidAndProof( + bd.address.treeCid, bd.address.index, bd.blk.cid, proof + ) + ).errorOption: + warn "Unable to store proof and cid for a block" + continue + except CatchableError as exc: + warn "Error handling block delivery", error = exc.msg + continue + validatedBlocksDelivery.add(bd) - await self.resolveBlocks(validatedBlocksDelivery) codex_block_exchange_blocks_received.inc(validatedBlocksDelivery.len.int64) let peerCtx = self.peers.get(peer) - if peerCtx != nil: - await self.payForBlocks(peerCtx, blocksDelivery) - ## shouldn't we remove them from the want-list instead of this: - peerCtx.cleanPresence(blocksDelivery.mapIt(it.address)) + if err =? catch(await self.payForBlocks(peerCtx, blocksDelivery)).errorOption: + warn "Error paying for blocks", err = err.msg + return + + if err =? catch(await self.resolveBlocks(validatedBlocksDelivery)).errorOption: + warn "Error resolving blocks", err = err.msg + return proc wantListHandler*( self: BlockExcEngine, peer: PeerId, wantList: WantList -) {.async.} = +) {.async: (raises: []).} = trace "Received want list from peer", peer, wantList = wantList.entries.len let peerCtx = self.peers.get(peer) @@ -422,68 +448,81 @@ proc wantListHandler*( presence: seq[BlockPresence] schedulePeer = false - for e in wantList.entries: - let idx = peerCtx.peerWants.findIt(it.address == e.address) + try: + for e in wantList.entries: + let idx = peerCtx.peerWants.findIt(it.address == e.address) - logScope: - peer = peerCtx.id - address = e.address - wantType = $e.wantType + logScope: + peer = peerCtx.id + address = e.address + wantType = $e.wantType - if idx < 0: # Adding new entry to peer wants - let - have = await e.address in self.localStore - price = @(self.pricing.get(Pricing(price: 0.u256)).price.toBytesBE) + if idx < 0: # Adding new entry to peer wants + let + have = + try: + await e.address in self.localStore + except CatchableError as exc: + # TODO: should not be necessary once we have proper exception tracking on the BlockStore interface + false + price = @(self.pricing.get(Pricing(price: 0.u256)).price.toBytesBE) - if e.cancel: - trace "Received cancelation for untracked block, skipping", address = e.address - continue + if e.cancel: + trace "Received cancelation for untracked block, skipping", + address = e.address + continue - trace "Processing want list entry", wantList = $e - case e.wantType - of WantType.WantHave: - if have: - presence.add( - BlockPresence( - address: e.address, `type`: BlockPresenceType.Have, price: price - ) - ) - else: - if e.sendDontHave: + trace "Processing want list entry", wantList = $e + case e.wantType + of WantType.WantHave: + if have: presence.add( BlockPresence( - address: e.address, `type`: BlockPresenceType.DontHave, price: price + address: e.address, `type`: BlockPresenceType.Have, price: price ) ) + else: + if e.sendDontHave: + presence.add( + BlockPresence( + address: e.address, `type`: BlockPresenceType.DontHave, price: price + ) + ) - codex_block_exchange_want_have_lists_received.inc() - of WantType.WantBlock: - peerCtx.peerWants.add(e) - schedulePeer = true - codex_block_exchange_want_block_lists_received.inc() - else: # Updating existing entry in peer wants - # peer doesn't want this block anymore - if e.cancel: - trace "Canceling want for block", address = e.address - peerCtx.peerWants.del(idx) - trace "Canceled block request", address = e.address, len = peerCtx.peerWants.len - else: - if e.wantType == WantType.WantBlock: + codex_block_exchange_want_have_lists_received.inc() + of WantType.WantBlock: + peerCtx.peerWants.add(e) schedulePeer = true - # peer might want to ask for the same cid with - # different want params - trace "Updating want for block", address = e.address - peerCtx.peerWants[idx] = e # update entry - trace "Updated block request", address = e.address, len = peerCtx.peerWants.len + codex_block_exchange_want_block_lists_received.inc() + else: # Updating existing entry in peer wants + # peer doesn't want this block anymore + if e.cancel: + trace "Canceling want for block", address = e.address + peerCtx.peerWants.del(idx) + trace "Canceled block request", + address = e.address, len = peerCtx.peerWants.len + else: + if e.wantType == WantType.WantBlock: + schedulePeer = true + # peer might want to ask for the same cid with + # different want params + trace "Updating want for block", address = e.address + peerCtx.peerWants[idx] = e # update entry + trace "Updated block request", + address = e.address, len = peerCtx.peerWants.len - if presence.len > 0: - trace "Sending presence to remote", items = presence.mapIt($it).join(",") - await self.network.request.sendPresence(peer, presence) + if presence.len > 0: + trace "Sending presence to remote", items = presence.mapIt($it).join(",") + await self.network.request.sendPresence(peer, presence) - if schedulePeer and not self.scheduleTask(peerCtx): - warn "Unable to schedule task for peer", peer + if schedulePeer: + self.scheduleTask(peerCtx) + except CancelledError as exc: #TODO: replace with CancelledError + warn "Error processing want list", error = exc.msg -proc accountHandler*(self: BlockExcEngine, peer: PeerId, account: Account) {.async.} = +proc accountHandler*( + self: BlockExcEngine, peer: PeerId, account: Account +) {.async: (raises: []).} = let context = self.peers.get(peer) if context.isNil: return @@ -492,7 +531,7 @@ proc accountHandler*(self: BlockExcEngine, peer: PeerId, account: Account) {.asy proc paymentHandler*( self: BlockExcEngine, peer: PeerId, payment: SignedState -) {.async.} = +) {.async: (raises: []).} = trace "Handling payments", peer without context =? self.peers.get(peer).option and account =? context.account: @@ -505,7 +544,9 @@ proc paymentHandler*( else: context.paymentChannel = self.wallet.acceptChannel(payment).option -proc setupPeer*(self: BlockExcEngine, peer: PeerId) {.async.} = +proc setupPeer*( + self: BlockExcEngine, peer: PeerId +) {.async: (raises: [CancelledError]).} = ## Perform initial setup, such as want ## list exchange ## @@ -524,9 +565,10 @@ proc setupPeer*(self: BlockExcEngine, peer: PeerId) {.async.} = await self.network.request.sendWantList(peer, cids, full = true) if address =? self.pricing .? address: + trace "Sending account to peer", peer await self.network.request.sendAccount(peer, Account(address: address)) -proc dropPeer*(self: BlockExcEngine, peer: PeerId) = +proc dropPeer*(self: BlockExcEngine, peer: PeerId) {.raises: [].} = ## Cleanup disconnected peer ## @@ -535,7 +577,9 @@ proc dropPeer*(self: BlockExcEngine, peer: PeerId) = # drop the peer from the peers table self.peers.remove(peer) -proc taskHandler*(self: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} = +proc taskHandler*( + self: BlockExcEngine, task: BlockExcPeerCtx +) {.gcsafe, async: (raises: [CancelledError, RetriesExhaustedError]).} = # Send to the peer blocks he wants to get, # if they present in our local store @@ -572,8 +616,11 @@ proc taskHandler*(self: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} let blocksDeliveryFut = await allFinished(wantsBlocks.map(localLookup)) - blocksDelivery = - blocksDeliveryFut.filterIt(it.completed and it.read.isOk).mapIt(it.read.get) + blocksDelivery = blocksDeliveryFut.filterIt(it.completed and it.value.isOk).mapIt: + if bd =? it.value: + bd + else: + raiseAssert "Unexpected error in local lookup" # All the wants that failed local lookup must be set to not-in-flight again. let @@ -595,15 +642,12 @@ proc blockexcTaskRunner(self: BlockExcEngine) {.async: (raises: []).} = ## trace "Starting blockexc task runner" - while self.blockexcRunning: - try: + try: + while self.blockexcRunning: let peerCtx = await self.taskQueue.pop() - await self.taskHandler(peerCtx) - except CancelledError: - break # do not propagate as blockexcTaskRunner was asyncSpawned - except CatchableError as e: - error "error running block exchange task", error = e.msgDetail + except CatchableError as exc: + error "error running block exchange task", error = exc.msg info "Exiting blockexc task runner" @@ -644,23 +688,29 @@ proc new*( network.switch.addPeerEventHandler(peerEventHandler, PeerEventKind.Joined) network.switch.addPeerEventHandler(peerEventHandler, PeerEventKind.Left) - proc blockWantListHandler(peer: PeerId, wantList: WantList): Future[void] {.gcsafe.} = + proc blockWantListHandler( + peer: PeerId, wantList: WantList + ): Future[void] {.async: (raises: []).} = self.wantListHandler(peer, wantList) proc blockPresenceHandler( peer: PeerId, presence: seq[BlockPresence] - ): Future[void] {.gcsafe.} = + ): Future[void] {.async: (raises: []).} = self.blockPresenceHandler(peer, presence) proc blocksDeliveryHandler( peer: PeerId, blocksDelivery: seq[BlockDelivery] - ): Future[void] {.gcsafe.} = + ): Future[void] {.async: (raises: []).} = self.blocksDeliveryHandler(peer, blocksDelivery) - proc accountHandler(peer: PeerId, account: Account): Future[void] {.gcsafe.} = + proc accountHandler( + peer: PeerId, account: Account + ): Future[void] {.async: (raises: []).} = self.accountHandler(peer, account) - proc paymentHandler(peer: PeerId, payment: SignedState): Future[void] {.gcsafe.} = + proc paymentHandler( + peer: PeerId, payment: SignedState + ): Future[void] {.async: (raises: []).} = self.paymentHandler(peer, payment) network.handlers = BlockExcHandlers( diff --git a/codex/blockexchange/engine/payments.nim b/codex/blockexchange/engine/payments.nim index 88953976..260a3005 100644 --- a/codex/blockexchange/engine/payments.nim +++ b/codex/blockexchange/engine/payments.nim @@ -7,6 +7,8 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. +{.push raises: [].} + import std/math import pkg/nitro import pkg/questionable/results @@ -15,9 +17,6 @@ import ../peers export nitro export results -push: - {.upraises: [].} - const ChainId* = 0.u256 # invalid chain id for now const Asset* = EthAddress.zero # invalid ERC20 asset address for now const AmountPerChannel = (10'u64 ^ 18).u256 # 1 asset, ERC20 default is 18 decimals diff --git a/codex/blockexchange/network/network.nim b/codex/blockexchange/network/network.nim index daf358de..26c07445 100644 --- a/codex/blockexchange/network/network.nim +++ b/codex/blockexchange/network/network.nim @@ -35,13 +35,15 @@ const DefaultMaxInflight* = 100 type - WantListHandler* = proc(peer: PeerId, wantList: WantList): Future[void] {.gcsafe.} + WantListHandler* = + proc(peer: PeerId, wantList: WantList) {.gcsafe, async: (raises: []).} BlocksDeliveryHandler* = - proc(peer: PeerId, blocks: seq[BlockDelivery]): Future[void] {.gcsafe.} + proc(peer: PeerId, blocks: seq[BlockDelivery]) {.gcsafe, async: (raises: []).} BlockPresenceHandler* = - proc(peer: PeerId, precense: seq[BlockPresence]): Future[void] {.gcsafe.} - AccountHandler* = proc(peer: PeerId, account: Account): Future[void] {.gcsafe.} - PaymentHandler* = proc(peer: PeerId, payment: SignedState): Future[void] {.gcsafe.} + proc(peer: PeerId, precense: seq[BlockPresence]) {.gcsafe, async: (raises: []).} + AccountHandler* = proc(peer: PeerId, account: Account) {.gcsafe, async: (raises: []).} + PaymentHandler* = + proc(peer: PeerId, payment: SignedState) {.gcsafe, async: (raises: []).} BlockExcHandlers* = object onWantList*: WantListHandler @@ -58,15 +60,20 @@ type wantType: WantType = WantType.WantHave, full: bool = false, sendDontHave: bool = false, - ): Future[void] {.gcsafe.} - WantCancellationSender* = - proc(peer: PeerId, addresses: seq[BlockAddress]): Future[void] {.gcsafe.} - BlocksDeliverySender* = - proc(peer: PeerId, blocksDelivery: seq[BlockDelivery]): Future[void] {.gcsafe.} - PresenceSender* = - proc(peer: PeerId, presence: seq[BlockPresence]): Future[void] {.gcsafe.} - AccountSender* = proc(peer: PeerId, account: Account): Future[void] {.gcsafe.} - PaymentSender* = proc(peer: PeerId, payment: SignedState): Future[void] {.gcsafe.} + ) {.async: (raises: [CancelledError]).} + WantCancellationSender* = proc(peer: PeerId, addresses: seq[BlockAddress]) {. + async: (raises: [CancelledError]) + .} + BlocksDeliverySender* = proc(peer: PeerId, blocksDelivery: seq[BlockDelivery]) {. + async: (raises: [CancelledError]) + .} + PresenceSender* = proc(peer: PeerId, presence: seq[BlockPresence]) {. + async: (raises: [CancelledError]) + .} + AccountSender* = + proc(peer: PeerId, account: Account) {.async: (raises: [CancelledError]).} + PaymentSender* = + proc(peer: PeerId, payment: SignedState) {.async: (raises: [CancelledError]).} BlockExcRequest* = object sendWantList*: WantListSender @@ -98,7 +105,9 @@ proc isSelf*(b: BlockExcNetwork, peer: PeerId): bool = return b.peerId == peer -proc send*(b: BlockExcNetwork, id: PeerId, msg: pb.Message) {.async.} = +proc send*( + b: BlockExcNetwork, id: PeerId, msg: pb.Message +) {.async: (raises: [CancelledError]).} = ## Send message to peer ## @@ -106,8 +115,9 @@ proc send*(b: BlockExcNetwork, id: PeerId, msg: pb.Message) {.async.} = trace "Unable to send, peer not found", peerId = id return - let peer = b.peers[id] try: + let peer = b.peers[id] + await b.inflightSema.acquire() await peer.send(msg) except CancelledError as error: @@ -117,7 +127,9 @@ proc send*(b: BlockExcNetwork, id: PeerId, msg: pb.Message) {.async.} = finally: b.inflightSema.release() -proc handleWantList(b: BlockExcNetwork, peer: NetworkPeer, list: WantList) {.async.} = +proc handleWantList( + b: BlockExcNetwork, peer: NetworkPeer, list: WantList +) {.async: (raises: []).} = ## Handle incoming want list ## @@ -133,7 +145,7 @@ proc sendWantList*( wantType: WantType = WantType.WantHave, full: bool = false, sendDontHave: bool = false, -): Future[void] = +) {.async: (raw: true, raises: [CancelledError]).} = ## Send a want message to peer ## @@ -154,14 +166,14 @@ proc sendWantList*( proc sendWantCancellations*( b: BlockExcNetwork, id: PeerId, addresses: seq[BlockAddress] -): Future[void] {.async.} = +): Future[void] {.async: (raises: [CancelledError]).} = ## Informs a remote peer that we're no longer interested in a set of blocks ## await b.sendWantList(id = id, addresses = addresses, cancel = true) proc handleBlocksDelivery( b: BlockExcNetwork, peer: NetworkPeer, blocksDelivery: seq[BlockDelivery] -) {.async.} = +) {.async: (raises: []).} = ## Handle incoming blocks ## @@ -170,7 +182,7 @@ proc handleBlocksDelivery( proc sendBlocksDelivery*( b: BlockExcNetwork, id: PeerId, blocksDelivery: seq[BlockDelivery] -): Future[void] = +) {.async: (raw: true, raises: [CancelledError]).} = ## Send blocks to remote ## @@ -178,7 +190,7 @@ proc sendBlocksDelivery*( proc handleBlockPresence( b: BlockExcNetwork, peer: NetworkPeer, presence: seq[BlockPresence] -) {.async.} = +) {.async: (raises: []).} = ## Handle block presence ## @@ -187,7 +199,7 @@ proc handleBlockPresence( proc sendBlockPresence*( b: BlockExcNetwork, id: PeerId, presence: seq[BlockPresence] -): Future[void] = +) {.async: (raw: true, raises: [CancelledError]).} = ## Send presence to remote ## @@ -195,20 +207,24 @@ proc sendBlockPresence*( proc handleAccount( network: BlockExcNetwork, peer: NetworkPeer, account: Account -) {.async.} = +) {.async: (raises: []).} = ## Handle account info ## if not network.handlers.onAccount.isNil: await network.handlers.onAccount(peer.id, account) -proc sendAccount*(b: BlockExcNetwork, id: PeerId, account: Account): Future[void] = +proc sendAccount*( + b: BlockExcNetwork, id: PeerId, account: Account +) {.async: (raw: true, raises: [CancelledError]).} = ## Send account info to remote ## b.send(id, Message(account: AccountMessage.init(account))) -proc sendPayment*(b: BlockExcNetwork, id: PeerId, payment: SignedState): Future[void] = +proc sendPayment*( + b: BlockExcNetwork, id: PeerId, payment: SignedState +) {.async: (raw: true, raises: [CancelledError]).} = ## Send payment to remote ## @@ -216,7 +232,7 @@ proc sendPayment*(b: BlockExcNetwork, id: PeerId, payment: SignedState): Future[ proc handlePayment( network: BlockExcNetwork, peer: NetworkPeer, payment: SignedState -) {.async.} = +) {.async: (raises: []).} = ## Handle payment ## @@ -225,7 +241,7 @@ proc handlePayment( proc rpcHandler( b: BlockExcNetwork, peer: NetworkPeer, msg: Message -) {.async: (raises: [CatchableError]).} = +) {.async: (raises: []).} = ## handle rpc messages ## if msg.wantList.entries.len > 0: @@ -250,7 +266,9 @@ proc getOrCreatePeer(b: BlockExcNetwork, peer: PeerId): NetworkPeer = if peer in b.peers: return b.peers.getOrDefault(peer, nil) - var getConn: ConnProvider = proc(): Future[Connection] {.async, gcsafe, closure.} = + var getConn: ConnProvider = proc(): Future[Connection] {. + async: (raises: [CancelledError]) + .} = try: trace "Getting new connection stream", peer return await b.switch.dial(peer, Codec) @@ -262,9 +280,7 @@ proc getOrCreatePeer(b: BlockExcNetwork, peer: PeerId): NetworkPeer = if not isNil(b.getConn): getConn = b.getConn - let rpcHandler = proc( - p: NetworkPeer, msg: Message - ) {.async: (raises: [CatchableError]).} = + let rpcHandler = proc(p: NetworkPeer, msg: Message) {.async: (raises: []).} = await b.rpcHandler(p, msg) # create new pubsub peer @@ -353,26 +369,32 @@ proc new*( wantType: WantType = WantType.WantHave, full: bool = false, sendDontHave: bool = false, - ): Future[void] {.gcsafe.} = + ): Future[void] {.async: (raw: true, raises: [CancelledError]).} = self.sendWantList(id, cids, priority, cancel, wantType, full, sendDontHave) proc sendWantCancellations( id: PeerId, addresses: seq[BlockAddress] - ): Future[void] {.gcsafe.} = + ): Future[void] {.async: (raw: true, raises: [CancelledError]).} = self.sendWantCancellations(id, addresses) proc sendBlocksDelivery( id: PeerId, blocksDelivery: seq[BlockDelivery] - ): Future[void] {.gcsafe.} = + ): Future[void] {.async: (raw: true, raises: [CancelledError]).} = self.sendBlocksDelivery(id, blocksDelivery) - proc sendPresence(id: PeerId, presence: seq[BlockPresence]): Future[void] {.gcsafe.} = + proc sendPresence( + id: PeerId, presence: seq[BlockPresence] + ): Future[void] {.async: (raw: true, raises: [CancelledError]).} = self.sendBlockPresence(id, presence) - proc sendAccount(id: PeerId, account: Account): Future[void] {.gcsafe.} = + proc sendAccount( + id: PeerId, account: Account + ): Future[void] {.async: (raw: true, raises: [CancelledError]).} = self.sendAccount(id, account) - proc sendPayment(id: PeerId, payment: SignedState): Future[void] {.gcsafe.} = + proc sendPayment( + id: PeerId, payment: SignedState + ): Future[void] {.async: (raw: true, raises: [CancelledError]).} = self.sendPayment(id, payment) self.request = BlockExcRequest( diff --git a/codex/blockexchange/network/networkpeer.nim b/codex/blockexchange/network/networkpeer.nim index 4a100340..66c39294 100644 --- a/codex/blockexchange/network/networkpeer.nim +++ b/codex/blockexchange/network/networkpeer.nim @@ -7,9 +7,7 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. -import pkg/upraises -push: - {.upraises: [].} +{.push raises: [].} import pkg/chronos import pkg/libp2p @@ -18,6 +16,7 @@ import ../protobuf/blockexc import ../protobuf/message import ../../errors import ../../logutils +import ../../utils/trackedfutures logScope: topics = "codex blockexcnetworkpeer" @@ -25,11 +24,10 @@ logScope: const DefaultYieldInterval = 50.millis type - ConnProvider* = proc(): Future[Connection] {.gcsafe, closure.} + ConnProvider* = + proc(): Future[Connection] {.gcsafe, async: (raises: [CancelledError]).} - RPCHandler* = proc( - peer: NetworkPeer, msg: Message - ): Future[void].Raising(CatchableError) {.gcsafe.} + RPCHandler* = proc(peer: NetworkPeer, msg: Message) {.gcsafe, async: (raises: []).} NetworkPeer* = ref object of RootObj id*: PeerId @@ -37,55 +35,60 @@ type sendConn: Connection getConn: ConnProvider yieldInterval*: Duration = DefaultYieldInterval + trackedFutures: TrackedFutures -proc connected*(b: NetworkPeer): bool = - not (isNil(b.sendConn)) and not (b.sendConn.closed or b.sendConn.atEof) +proc connected*(self: NetworkPeer): bool = + not (isNil(self.sendConn)) and not (self.sendConn.closed or self.sendConn.atEof) -proc readLoop*(b: NetworkPeer, conn: Connection) {.async.} = +proc readLoop*(self: NetworkPeer, conn: Connection) {.async: (raises: []).} = if isNil(conn): - trace "No connection to read from", peer = b.id + trace "No connection to read from", peer = self.id return - trace "Attaching read loop", peer = b.id, connId = conn.oid + trace "Attaching read loop", peer = self.id, connId = conn.oid try: - var nextYield = Moment.now() + b.yieldInterval + var nextYield = Moment.now() + self.yieldInterval while not conn.atEof or not conn.closed: if Moment.now() > nextYield: - nextYield = Moment.now() + b.yieldInterval + nextYield = Moment.now() + self.yieldInterval trace "Yielding in read loop", - peer = b.id, nextYield = nextYield, interval = b.yieldInterval + peer = self.id, nextYield = nextYield, interval = self.yieldInterval await sleepAsync(10.millis) let data = await conn.readLp(MaxMessageSize.int) msg = Message.protobufDecode(data).mapFailure().tryGet() - trace "Received message", peer = b.id, connId = conn.oid - await b.handler(b, msg) + trace "Received message", peer = self.id, connId = conn.oid + await self.handler(self, msg) except CancelledError: trace "Read loop cancelled" except CatchableError as err: warn "Exception in blockexc read loop", msg = err.msg finally: - trace "Detaching read loop", peer = b.id, connId = conn.oid + trace "Detaching read loop", peer = self.id, connId = conn.oid await conn.close() -proc connect*(b: NetworkPeer): Future[Connection] {.async.} = - if b.connected: - trace "Already connected", peer = b.id, connId = b.sendConn.oid - return b.sendConn +proc connect*( + self: NetworkPeer +): Future[Connection] {.async: (raises: [CancelledError]).} = + if self.connected: + trace "Already connected", peer = self.id, connId = self.sendConn.oid + return self.sendConn - b.sendConn = await b.getConn() - asyncSpawn b.readLoop(b.sendConn) - return b.sendConn + self.sendConn = await self.getConn() + self.trackedFutures.track(self.readLoop(self.sendConn)) + return self.sendConn -proc send*(b: NetworkPeer, msg: Message) {.async.} = - let conn = await b.connect() +proc send*( + self: NetworkPeer, msg: Message +) {.async: (raises: [CancelledError, LPStreamError]).} = + let conn = await self.connect() if isNil(conn): - warn "Unable to get send connection for peer message not sent", peer = b.id + warn "Unable to get send connection for peer message not sent", peer = self.id return - trace "Sending message", peer = b.id, connId = conn.oid + trace "Sending message", peer = self.id, connId = conn.oid await conn.writeLp(protobufEncode(msg)) func new*( @@ -96,4 +99,9 @@ func new*( ): NetworkPeer = doAssert(not isNil(connProvider), "should supply connection provider") - NetworkPeer(id: peer, getConn: connProvider, handler: rpcHandler) + NetworkPeer( + id: peer, + getConn: connProvider, + handler: rpcHandler, + trackedFutures: TrackedFutures(), + ) diff --git a/codex/blockexchange/peers/peerctxstore.nim b/codex/blockexchange/peers/peerctxstore.nim index 739d92b5..ce2506a8 100644 --- a/codex/blockexchange/peers/peerctxstore.nim +++ b/codex/blockexchange/peers/peerctxstore.nim @@ -7,16 +7,13 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. +{.push raises: [].} + import std/sequtils import std/tables import std/algorithm import std/sequtils -import pkg/upraises - -push: - {.upraises: [].} - import pkg/chronos import pkg/libp2p diff --git a/codex/blockexchange/protobuf/payments.nim b/codex/blockexchange/protobuf/payments.nim index 5d010a81..885562c4 100644 --- a/codex/blockexchange/protobuf/payments.nim +++ b/codex/blockexchange/protobuf/payments.nim @@ -1,8 +1,9 @@ +{.push raises: [].} + import pkg/stew/byteutils import pkg/stint import pkg/nitro import pkg/questionable -import pkg/upraises import ./blockexc export AccountMessage @@ -11,9 +12,6 @@ export StateChannelUpdate export stint export nitro -push: - {.upraises: [].} - type Account* = object address*: EthAddress diff --git a/codex/blockexchange/protobuf/presence.nim b/codex/blockexchange/protobuf/presence.nim index d941746d..3b24a570 100644 --- a/codex/blockexchange/protobuf/presence.nim +++ b/codex/blockexchange/protobuf/presence.nim @@ -1,8 +1,9 @@ +{.push raises: [].} + import libp2p import pkg/stint import pkg/questionable import pkg/questionable/results -import pkg/upraises import ./blockexc import ../../blocktype @@ -11,9 +12,6 @@ export questionable export stint export BlockPresenceType -upraises.push: - {.upraises: [].} - type PresenceMessage* = blockexc.BlockPresence Presence* = object diff --git a/codex/chunker.nim b/codex/chunker.nim index f735aa4b..908dd0c0 100644 --- a/codex/chunker.nim +++ b/codex/chunker.nim @@ -28,8 +28,11 @@ const DefaultChunkSize* = DefaultBlockSize type # default reader type + ChunkerError* = object of CatchableError ChunkBuffer* = ptr UncheckedArray[byte] - Reader* = proc(data: ChunkBuffer, len: int): Future[int] {.gcsafe, raises: [Defect].} + Reader* = proc(data: ChunkBuffer, len: int): Future[int] {. + gcsafe, async: (raises: [ChunkerError, CancelledError]) + .} # Reader that splits input data into fixed-size chunks Chunker* = ref object @@ -74,7 +77,7 @@ proc new*( proc reader( data: ChunkBuffer, len: int - ): Future[int] {.gcsafe, async, raises: [Defect].} = + ): Future[int] {.gcsafe, async: (raises: [ChunkerError, CancelledError]).} = var res = 0 try: while res < len: @@ -85,7 +88,7 @@ proc new*( raise error except LPStreamError as error: error "LPStream error", err = error.msg - raise error + raise newException(ChunkerError, "LPStream error", error) except CatchableError as exc: error "CatchableError exception", exc = exc.msg raise newException(Defect, exc.msg) @@ -102,7 +105,7 @@ proc new*( proc reader( data: ChunkBuffer, len: int - ): Future[int] {.gcsafe, async, raises: [Defect].} = + ): Future[int] {.gcsafe, async: (raises: [ChunkerError, CancelledError]).} = var total = 0 try: while total < len: diff --git a/codex/codex.nim b/codex/codex.nim index 8a03510c..391a94fc 100644 --- a/codex/codex.nim +++ b/codex/codex.nim @@ -177,14 +177,20 @@ proc start*(s: CodexServer) {.async.} = proc stop*(s: CodexServer) {.async.} = notice "Stopping codex node" - await allFuturesThrowing( - s.restServer.stop(), - s.codexNode.switch.stop(), - s.codexNode.stop(), - s.repoStore.stop(), - s.maintenance.stop(), + let res = await noCancel allFinishedFailed( + @[ + s.restServer.stop(), + s.codexNode.switch.stop(), + s.codexNode.stop(), + s.repoStore.stop(), + s.maintenance.stop(), + ] ) + if res.failure.len > 0: + error "Failed to stop codex node", failures = res.failure.len + raiseAssert "Failed to stop codex node" + proc new*( T: type CodexServer, config: CodexConf, privateKey: CodexPrivateKey ): CodexServer = diff --git a/codex/contracts/clock.nim b/codex/contracts/clock.nim index b5bf7ebb..b7863539 100644 --- a/codex/contracts/clock.nim +++ b/codex/contracts/clock.nim @@ -5,6 +5,7 @@ import pkg/chronos import pkg/stint import ../clock import ../conf +import ../utils/trackedfutures export clock @@ -18,9 +19,12 @@ type OnChainClock* = ref object of Clock blockNumber: UInt256 started: bool newBlock: AsyncEvent + trackedFutures: TrackedFutures proc new*(_: type OnChainClock, provider: Provider): OnChainClock = - OnChainClock(provider: provider, newBlock: newAsyncEvent()) + OnChainClock( + provider: provider, newBlock: newAsyncEvent(), trackedFutures: TrackedFutures() + ) proc update(clock: OnChainClock, blck: Block) = if number =? blck.number and number > clock.blockNumber: @@ -32,15 +36,12 @@ proc update(clock: OnChainClock, blck: Block) = blockTime = blck.timestamp, blockNumber = number, offset = clock.offset clock.newBlock.fire() -proc update(clock: OnChainClock) {.async.} = +proc update(clock: OnChainClock) {.async: (raises: []).} = try: if latest =? (await clock.provider.getBlock(BlockTag.latest)): clock.update(latest) - except CancelledError as error: - raise error except CatchableError as error: debug "error updating clock: ", error = error.msg - discard method start*(clock: OnChainClock) {.async.} = if clock.started: @@ -52,7 +53,7 @@ method start*(clock: OnChainClock) {.async.} = return # ignore block parameter; hardhat may call this with pending blocks - asyncSpawn clock.update() + clock.trackedFutures.track(clock.update()) await clock.update() @@ -64,6 +65,7 @@ method stop*(clock: OnChainClock) {.async.} = return await clock.subscription.unsubscribe() + await clock.trackedFutures.cancelTracked() clock.started = false method now*(clock: OnChainClock): SecondsSince1970 = diff --git a/codex/discovery.nim b/codex/discovery.nim index 9aa8c7d8..eed1f89b 100644 --- a/codex/discovery.nim +++ b/codex/discovery.nim @@ -7,6 +7,8 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. +{.push raises: [].} + import std/algorithm import std/sequtils @@ -54,70 +56,122 @@ proc toNodeId*(host: ca.Address): NodeId = readUintBE[256](keccak256.digest(host.toArray).data) -proc findPeer*(d: Discovery, peerId: PeerId): Future[?PeerRecord] {.async.} = +proc findPeer*( + d: Discovery, peerId: PeerId +): Future[?PeerRecord] {.async: (raises: [CancelledError]).} = trace "protocol.resolve..." ## Find peer using the given Discovery object ## - let node = await d.protocol.resolve(toNodeId(peerId)) - return - if node.isSome(): - node.get().record.data.some - else: - PeerRecord.none + try: + let node = await d.protocol.resolve(toNodeId(peerId)) -method find*(d: Discovery, cid: Cid): Future[seq[SignedPeerRecord]] {.async, base.} = + return + if node.isSome(): + node.get().record.data.some + else: + PeerRecord.none + except CancelledError as exc: + warn "Error finding peer", peerId = peerId, exc = exc.msg + raise exc + except CatchableError as exc: + warn "Error finding peer", peerId = peerId, exc = exc.msg + + return PeerRecord.none + +method find*( + d: Discovery, cid: Cid +): Future[seq[SignedPeerRecord]] {.async: (raises: [CancelledError]), base.} = ## Find block providers ## - without providers =? (await d.protocol.getProviders(cid.toNodeId())).mapFailure, error: - warn "Error finding providers for block", cid, error = error.msg - return providers.filterIt(not (it.data.peerId == d.peerId)) + try: + without providers =? (await d.protocol.getProviders(cid.toNodeId())).mapFailure, + error: + warn "Error finding providers for block", cid, error = error.msg -method provide*(d: Discovery, cid: Cid) {.async, base.} = + return providers.filterIt(not (it.data.peerId == d.peerId)) + except CancelledError as exc: + warn "Error finding providers for block", cid, exc = exc.msg + raise exc + except CatchableError as exc: + warn "Error finding providers for block", cid, exc = exc.msg + +method provide*(d: Discovery, cid: Cid) {.async: (raises: [CancelledError]), base.} = ## Provide a block Cid ## - let nodes = await d.protocol.addProvider(cid.toNodeId(), d.providerRecord.get) + try: + let nodes = await d.protocol.addProvider(cid.toNodeId(), d.providerRecord.get) - if nodes.len <= 0: - warn "Couldn't provide to any nodes!" + if nodes.len <= 0: + warn "Couldn't provide to any nodes!" + except CancelledError as exc: + warn "Error providing block", cid, exc = exc.msg + raise exc + except CatchableError as exc: + warn "Error providing block", cid, exc = exc.msg method find*( d: Discovery, host: ca.Address -): Future[seq[SignedPeerRecord]] {.async, base.} = +): Future[seq[SignedPeerRecord]] {.async: (raises: [CancelledError]), base.} = ## Find host providers ## - trace "Finding providers for host", host = $host - without var providers =? (await d.protocol.getProviders(host.toNodeId())).mapFailure, - error: - trace "Error finding providers for host", host = $host, exc = error.msg - return + try: + trace "Finding providers for host", host = $host + without var providers =? (await d.protocol.getProviders(host.toNodeId())).mapFailure, + error: + trace "Error finding providers for host", host = $host, exc = error.msg + return - if providers.len <= 0: - trace "No providers found", host = $host - return + if providers.len <= 0: + trace "No providers found", host = $host + return - providers.sort do(a, b: SignedPeerRecord) -> int: - system.cmp[uint64](a.data.seqNo, b.data.seqNo) + providers.sort do(a, b: SignedPeerRecord) -> int: + system.cmp[uint64](a.data.seqNo, b.data.seqNo) - return providers + return providers + except CancelledError as exc: + warn "Error finding providers for host", host = $host, exc = exc.msg + raise exc + except CatchableError as exc: + warn "Error finding providers for host", host = $host, exc = exc.msg -method provide*(d: Discovery, host: ca.Address) {.async, base.} = +method provide*( + d: Discovery, host: ca.Address +) {.async: (raises: [CancelledError]), base.} = ## Provide hosts ## - trace "Providing host", host = $host - let nodes = await d.protocol.addProvider(host.toNodeId(), d.providerRecord.get) - if nodes.len > 0: - trace "Provided to nodes", nodes = nodes.len + try: + trace "Providing host", host = $host + let nodes = await d.protocol.addProvider(host.toNodeId(), d.providerRecord.get) + if nodes.len > 0: + trace "Provided to nodes", nodes = nodes.len + except CancelledError as exc: + warn "Error providing host", host = $host, exc = exc.msg + raise exc + except CatchableError as exc: + warn "Error providing host", host = $host, exc = exc.msg -method removeProvider*(d: Discovery, peerId: PeerId): Future[void] {.base, gcsafe.} = +method removeProvider*( + d: Discovery, peerId: PeerId +): Future[void] {.base, gcsafe, async: (raises: [CancelledError]).} = ## Remove provider from providers table ## trace "Removing provider", peerId - d.protocol.removeProvidersLocal(peerId) + try: + await d.protocol.removeProvidersLocal(peerId) + except CancelledError as exc: + warn "Error removing provider", peerId = peerId, exc = exc.msg + raise exc + except CatchableError as exc: + warn "Error removing provider", peerId = peerId, exc = exc.msg + except Exception as exc: # Something in discv5 is raising Exception + warn "Error removing provider", peerId = peerId, exc = exc.msg + raiseAssert("Unexpected Exception in removeProvider") proc updateAnnounceRecord*(d: Discovery, addrs: openArray[MultiAddress]) = ## Update providers record @@ -145,12 +199,18 @@ proc updateDhtRecord*(d: Discovery, addrs: openArray[MultiAddress]) = if not d.protocol.isNil: d.protocol.updateRecord(d.dhtRecord).expect("Should update SPR") -proc start*(d: Discovery) {.async.} = - d.protocol.open() - await d.protocol.start() +proc start*(d: Discovery) {.async: (raises: []).} = + try: + d.protocol.open() + await d.protocol.start() + except CatchableError as exc: + error "Error starting discovery", exc = exc.msg -proc stop*(d: Discovery) {.async.} = - await d.protocol.closeWait() +proc stop*(d: Discovery) {.async: (raises: []).} = + try: + await noCancel d.protocol.closeWait() + except CatchableError as exc: + error "Error stopping discovery", exc = exc.msg proc new*( T: type Discovery, diff --git a/codex/erasure/erasure.nim b/codex/erasure/erasure.nim index 107f85bc..78ce3971 100644 --- a/codex/erasure/erasure.nim +++ b/codex/erasure/erasure.nim @@ -330,7 +330,7 @@ proc encodeAsync*( defer: freeDoubleArray(blockData, blocksLen) - ## Create an ecode task with block data + ## Create an ecode task with block data var task = EncodeTask( erasure: addr self, blockSize: blockSize, @@ -540,7 +540,7 @@ proc decodeAsync*( freeDoubleArray(blocksData, blocksLen) freeDoubleArray(parityData, parityLen) - ## Create an decode task with block data + ## Create an decode task with block data var task = DecodeTask( erasure: addr self, blockSize: blockSize, diff --git a/codex/errors.nim b/codex/errors.nim index 75cefde4..fadf7299 100644 --- a/codex/errors.nim +++ b/codex/errors.nim @@ -19,6 +19,8 @@ type CodexError* = object of CatchableError # base codex error CodexResult*[T] = Result[T, ref CodexError] + FinishedFailed*[T] = tuple[success: seq[Future[T]], failure: seq[Future[T]]] + template mapFailure*[T, V, E]( exp: Result[T, V], exc: typedesc[E] ): Result[T, ref CatchableError] = @@ -40,35 +42,18 @@ func toFailure*[T](exp: Option[T]): Result[T, ref CatchableError] {.inline.} = else: T.failure("Option is None") -# allFuturesThrowing was moved to the tests in libp2p -proc allFuturesThrowing*[T](args: varargs[Future[T]]): Future[void] = - var futs: seq[Future[T]] - for fut in args: - futs &= fut - proc call() {.async.} = - var first: ref CatchableError = nil - futs = await allFinished(futs) - for fut in futs: - if fut.failed: - let err = fut.readError() - if err of Defect: - raise err - else: - if err of CancelledError: - raise err - if isNil(first): - first = err - if not isNil(first): - raise first +proc allFinishedFailed*[T](futs: seq[Future[T]]): Future[FinishedFailed[T]] {.async.} = + ## Check if all futures have finished or failed + ## + ## TODO: wip, not sure if we want this - at the minimum, + ## we should probably avoid the async transform - return call() + var res: FinishedFailed[T] = (@[], @[]) + await allFutures(futs) + for f in futs: + if f.failed: + res.failure.add f + else: + res.success.add f -proc allFutureResult*[T](fut: seq[Future[T]]): Future[?!void] {.async.} = - try: - await allFuturesThrowing(fut) - except CancelledError as exc: - raise exc - except CatchableError as exc: - return failure(exc.msg) - - return success() + return res diff --git a/codex/node.nim b/codex/node.nim index b248e6df..203e034a 100644 --- a/codex/node.nim +++ b/codex/node.nim @@ -153,7 +153,11 @@ proc updateExpiry*( let ensuringFutures = Iter[int].new(0 ..< manifest.blocksCount).mapIt( self.networkStore.localStore.ensureExpiry(manifest.treeCid, it, expiry) ) - await allFuturesThrowing(ensuringFutures) + + let res = await allFinishedFailed(ensuringFutures) + if res.failure.len > 0: + trace "Some blocks failed to update expiry", len = res.failure.len + return failure("Some blocks failed to update expiry (" & $res.failure.len & " )") except CancelledError as exc: raise exc except CatchableError as exc: @@ -186,8 +190,10 @@ proc fetchBatched*( if not (await address in self.networkStore) or fetchLocal: self.networkStore.getBlock(address) - if blocksErr =? (await allFutureResult(blocks)).errorOption: - return failure(blocksErr) + let res = await allFinishedFailed(blocks) + if res.failure.len > 0: + trace "Some blocks failed to fetch", len = res.failure.len + return failure("Some blocks failed to fetch (" & $res.failure.len & " )") if not onBatch.isNil and batchErr =? (await onBatch(blocks.mapIt(it.read.get))).errorOption: @@ -213,6 +219,30 @@ proc fetchBatched*( let iter = Iter[int].new(0 ..< manifest.blocksCount) self.fetchBatched(manifest.treeCid, iter, batchSize, onBatch, fetchLocal) +proc fetchDatasetAsync*( + self: CodexNodeRef, manifest: Manifest, fetchLocal = true +): Future[void] {.async: (raises: []).} = + ## Asynchronously fetch a dataset in the background. + ## This task will be tracked and cleaned up on node shutdown. + ## + try: + if err =? ( + await self.fetchBatched( + manifest = manifest, batchSize = DefaultFetchBatch, fetchLocal = fetchLocal + ) + ).errorOption: + error "Unable to fetch blocks", err = err.msg + except CancelledError as exc: + trace "Cancelled fetching blocks", exc = exc.msg + except CatchableError as exc: + error "Error fetching blocks", exc = exc.msg + +proc fetchDatasetAsyncTask*(self: CodexNodeRef, manifest: Manifest) = + ## Start fetching a dataset in the background. + ## The task will be tracked and cleaned up on node shutdown. + ## + self.trackedFutures.track(self.fetchDatasetAsync(manifest, fetchLocal = false)) + proc streamSingleBlock(self: CodexNodeRef, cid: Cid): Future[?!LPStream] {.async.} = ## Streams the contents of a single block. ## @@ -223,36 +253,27 @@ proc streamSingleBlock(self: CodexNodeRef, cid: Cid): Future[?!LPStream] {.async without blk =? (await self.networkStore.getBlock(BlockAddress.init(cid))), err: return failure(err) - proc streamOneBlock(): Future[void] {.async.} = + proc streamOneBlock(): Future[void] {.async: (raises: []).} = try: + defer: + await stream.pushEof() await stream.pushData(blk.data) except CatchableError as exc: trace "Unable to send block", cid, exc = exc.msg - discard - finally: - await stream.pushEof() self.trackedFutures.track(streamOneBlock()) LPStream(stream).success proc streamEntireDataset( - self: CodexNodeRef, - manifest: Manifest, - manifestCid: Cid, - prefetchBatch = DefaultFetchBatch, + self: CodexNodeRef, manifest: Manifest, manifestCid: Cid ): Future[?!LPStream] {.async.} = ## Streams the contents of the entire dataset described by the manifest. - ## Background jobs (erasure decoding and prefetching) will be cancelled when - ## the stream is closed. ## trace "Retrieving blocks from manifest", manifestCid - let stream = LPStream(StoreStream.new(self.networkStore, manifest, pad = false)) - var jobs: seq[Future[void]] - if manifest.protected: # Retrieve, decode and save to the local store all EС groups - proc erasureJob(): Future[void] {.async.} = + proc erasureJob(): Future[void] {.async: (raises: []).} = try: # Spawn an erasure decoding job let erasure = Erasure.new( @@ -260,36 +281,17 @@ proc streamEntireDataset( ) without _ =? (await erasure.decode(manifest)), error: error "Unable to erasure decode manifest", manifestCid, exc = error.msg - except CancelledError: - trace "Erasure job cancelled", manifestCid except CatchableError as exc: trace "Error erasure decoding manifest", manifestCid, exc = exc.msg - jobs.add(erasureJob()) + self.trackedFutures.track(erasureJob()) - proc prefetch(): Future[void] {.async.} = - try: - if err =? - (await self.fetchBatched(manifest, prefetchBatch, fetchLocal = false)).errorOption: - error "Unable to fetch blocks", err = err.msg - except CancelledError: - trace "Prefetch job cancelled" - except CatchableError as exc: - error "Error fetching blocks", exc = exc.msg - - jobs.add(prefetch()) - - # Monitor stream completion and cancel background jobs when done - proc monitorStream() {.async.} = - try: - await stream.join() - finally: - await allFutures(jobs.mapIt(it.cancelAndWait)) - - self.trackedFutures.track(monitorStream()) + self.trackedFutures.track(self.fetchDatasetAsync(manifest, fetchLocal = false)) + # prefetch task should not fetch from local store + # Retrieve all blocks of the dataset sequentially from the local store or network trace "Creating store stream for manifest", manifestCid - stream.success + LPStream(StoreStream.new(self.networkStore, manifest, pad = false)).success proc retrieve*( self: CodexNodeRef, cid: Cid, local: bool = true @@ -632,8 +634,11 @@ proc onStore( let ensureExpiryFutures = blocks.mapIt(self.networkStore.ensureExpiry(it.cid, expiry.toSecondsSince1970)) - if updateExpiryErr =? (await allFutureResult(ensureExpiryFutures)).errorOption: - return failure(updateExpiryErr) + + let res = await allFinishedFailed(ensureExpiryFutures) + if res.failure.len > 0: + trace "Some blocks failed to update expiry", len = res.failure.len + return failure("Some blocks failed to update expiry (" & $res.failure.len & " )") if not blocksCb.isNil and err =? (await blocksCb(blocks)).errorOption: trace "Unable to process blocks", err = err.msg diff --git a/codex/rest/api.nim b/codex/rest/api.nim index 7cb0b43f..054e1c2b 100644 --- a/codex/rest/api.nim +++ b/codex/rest/api.nim @@ -315,15 +315,8 @@ proc initDataApi(node: CodexNodeRef, repoStore: RepoStore, router: var RestRoute error "Failed to fetch manifest", err = err.msg return RestApiResponse.error(Http404, err.msg, headers = headers) - proc fetchDatasetAsync(): Future[void] {.async.} = - try: - if err =? (await node.fetchBatched(manifest)).errorOption: - error "Unable to fetch dataset", cid = cid.get(), err = err.msg - except CatchableError as exc: - error "CatchableError when fetching dataset", cid = cid.get(), exc = exc.msg - discard - - asyncSpawn fetchDatasetAsync() + # Start fetching the dataset in the background + node.fetchDatasetAsyncTask(manifest) let json = %formatManifest(cid.get(), manifest) return RestApiResponse.response($json, contentType = "application/json") diff --git a/codex/sales.nim b/codex/sales.nim index e2a884df..998a2967 100644 --- a/codex/sales.nim +++ b/codex/sales.nim @@ -341,48 +341,51 @@ proc onSlotFreed(sales: Sales, requestId: RequestId, slotIndex: uint64) = trace "slot freed, adding to queue" - proc addSlotToQueue() {.async: (raises: [CancelledError]).} = + proc addSlotToQueue() {.async: (raises: []).} = let context = sales.context let market = context.market let queue = context.slotQueue - without request =? (await market.getRequest(requestId)), err: - error "unknown request in contract", error = err.msgDetail - return + try: + without request =? (await market.getRequest(requestId)), err: + error "unknown request in contract", error = err.msgDetail + return - # Take the repairing state into consideration to calculate the collateral. - # This is particularly needed because it will affect the priority in the queue - # and we want to give the user the ability to tweak the parameters. - # Adding the repairing state directly in the queue priority calculation - # would not allow this flexibility. - without collateral =? - market.slotCollateral(request.ask.collateralPerSlot, SlotState.Repair), err: - error "Failed to add freed slot to queue: unable to calculate collateral", - error = err.msg - return + # Take the repairing state into consideration to calculate the collateral. + # This is particularly needed because it will affect the priority in the queue + # and we want to give the user the ability to tweak the parameters. + # Adding the repairing state directly in the queue priority calculation + # would not allow this flexibility. + without collateral =? + market.slotCollateral(request.ask.collateralPerSlot, SlotState.Repair), err: + error "Failed to add freed slot to queue: unable to calculate collateral", + error = err.msg + return - if slotIndex > uint16.high.uint64: - error "Cannot cast slot index to uint16, value = ", slotIndex - return + if slotIndex > uint16.high.uint64: + error "Cannot cast slot index to uint16, value = ", slotIndex + return - without slotQueueItem =? - SlotQueueItem.init(request, slotIndex.uint16, collateral = collateral).catch, err: - warn "Too many slots, cannot add to queue", error = err.msgDetail - return + without slotQueueItem =? + SlotQueueItem.init(request, slotIndex.uint16, collateral = collateral).catch, + err: + warn "Too many slots, cannot add to queue", error = err.msgDetail + return - if err =? queue.push(slotQueueItem).errorOption: - if err of SlotQueueItemExistsError: - error "Failed to push item to queue becaue it already exists", - error = err.msgDetail - elif err of QueueNotRunningError: - warn "Failed to push item to queue becaue queue is not running", - error = err.msgDetail + if err =? queue.push(slotQueueItem).errorOption: + if err of SlotQueueItemExistsError: + error "Failed to push item to queue becaue it already exists", + error = err.msgDetail + elif err of QueueNotRunningError: + warn "Failed to push item to queue becaue queue is not running", + error = err.msgDetail + except CatchableError as e: + warn "Failed to add slot to queue", error = e.msg # We could get rid of this by adding the storage ask in the SlotFreed event, # so we would not need to call getRequest to get the collateralPerSlot. let fut = addSlotToQueue() sales.trackedFutures.track(fut) - asyncSpawn fut proc subscribeRequested(sales: Sales) {.async.} = let context = sales.context @@ -522,7 +525,9 @@ proc startSlotQueue(sales: Sales) = let slotQueue = sales.context.slotQueue let reservations = sales.context.reservations - slotQueue.onProcessSlot = proc(item: SlotQueueItem, done: Future[void]) {.async.} = + slotQueue.onProcessSlot = proc( + item: SlotQueueItem, done: Future[void] + ) {.async: (raises: []).} = trace "processing slot queue item", reqId = item.requestId, slotIdx = item.slotIndex sales.processSlot(item, done) diff --git a/codex/sales/salesagent.nim b/codex/sales/salesagent.nim index e6328a83..f0abf3ee 100644 --- a/codex/sales/salesagent.nim +++ b/codex/sales/salesagent.nim @@ -103,7 +103,6 @@ proc subscribeCancellation(agent: SalesAgent) {.async.} = error "Error while waiting for expiry to lapse", error = e.msgDetail data.cancelled = onCancelled() - asyncSpawn data.cancelled method onFulfilled*( agent: SalesAgent, requestId: RequestId diff --git a/codex/sales/slotqueue.nim b/codex/sales/slotqueue.nim index fa57a983..60700d44 100644 --- a/codex/sales/slotqueue.nim +++ b/codex/sales/slotqueue.nim @@ -3,7 +3,6 @@ import std/tables import pkg/chronos import pkg/questionable import pkg/questionable/results -import pkg/upraises import ../errors import ../clock import ../logutils @@ -17,8 +16,9 @@ logScope: topics = "marketplace slotqueue" type - OnProcessSlot* = - proc(item: SlotQueueItem, done: Future[void]): Future[void] {.gcsafe, upraises: [].} + OnProcessSlot* = proc(item: SlotQueueItem, done: Future[void]): Future[void] {. + gcsafe, async: (raises: []) + .} # Non-ref obj copies value when assigned, preventing accidental modification # of values which could cause an incorrect order (eg @@ -26,7 +26,7 @@ type # but the heap invariant would no longer be honoured. When non-ref, the # compiler can ensure that statement will fail). SlotQueueWorker = object - doneProcessing*: Future[void] + doneProcessing*: Future[void].Raising([]) SlotQueueItem* = object requestId: RequestId @@ -126,7 +126,17 @@ proc new*( # `newAsyncQueue` procedure proc init(_: type SlotQueueWorker): SlotQueueWorker = - SlotQueueWorker(doneProcessing: newFuture[void]("slotqueue.worker.processing")) + let workerFut = Future[void].Raising([]).init( + "slotqueue.worker.processing", {FutureFlag.OwnCancelSchedule} + ) + + workerFut.cancelCallback = proc(data: pointer) {.raises: [].} = + # this is equivalent to try: ... except CatchableError: ... + if not workerFut.finished: + workerFut.complete() + trace "Cancelling `SlotQueue` worker processing future" + + SlotQueueWorker(doneProcessing: workerFut) proc init*( _: type SlotQueueItem, @@ -419,7 +429,6 @@ proc run(self: SlotQueue) {.async: (raises: []).} = let fut = self.dispatch(worker, item) self.trackedFutures.track(fut) - asyncSpawn fut await sleepAsync(1.millis) # poll except CancelledError: @@ -447,7 +456,6 @@ proc start*(self: SlotQueue) = let fut = self.run() self.trackedFutures.track(fut) - asyncSpawn fut proc stop*(self: SlotQueue) {.async.} = if not self.running: diff --git a/codex/slots/builder/builder.nim b/codex/slots/builder/builder.nim index 30332f1c..1ea57a0f 100644 --- a/codex/slots/builder/builder.nim +++ b/codex/slots/builder/builder.nim @@ -315,13 +315,15 @@ proc new*[T, H]( cellSize = cellSize if (manifest.blocksCount mod manifest.numSlots) != 0: - trace "Number of blocks must be divisable by number of slots." - return failure("Number of blocks must be divisable by number of slots.") + const msg = "Number of blocks must be divisible by number of slots." + trace msg + return failure(msg) let cellSize = if manifest.verifiable: manifest.cellSize else: cellSize if (manifest.blockSize mod cellSize) != 0.NBytes: - trace "Block size must be divisable by cell size." - return failure("Block size must be divisable by cell size.") + const msg = "Block size must be divisible by cell size." + trace msg + return failure(msg) let numSlotBlocks = manifest.numSlotBlocks diff --git a/codex/slots/proofs/prover.nim b/codex/slots/proofs/prover.nim index 36fc0a05..b1aa77c0 100644 --- a/codex/slots/proofs/prover.nim +++ b/codex/slots/proofs/prover.nim @@ -38,7 +38,9 @@ type AnyProof* = CircomProof AnySampler* = Poseidon2Sampler + # add any other generic type here, eg. Poseidon2Sampler | ReinforceConcreteSampler AnyBuilder* = Poseidon2Builder + # add any other generic type here, eg. Poseidon2Builder | ReinforceConcreteBuilder AnyProofInputs* = ProofInputs[Poseidon2Hash] Prover* = ref object of RootObj diff --git a/codex/streams/asyncstreamwrapper.nim b/codex/streams/asyncstreamwrapper.nim index 6d5e703a..6708816d 100644 --- a/codex/streams/asyncstreamwrapper.nim +++ b/codex/streams/asyncstreamwrapper.nim @@ -57,6 +57,8 @@ template withExceptions(body: untyped) = raise newLPStreamEOFError() except AsyncStreamError as exc: raise newException(LPStreamError, exc.msg) + except CatchableError as exc: + raise newException(Defect, "Unexpected error in AsyncStreamWrapper", exc) method readOnce*( self: AsyncStreamWrapper, pbytes: pointer, nbytes: int @@ -74,11 +76,13 @@ method readOnce*( proc completeWrite( self: AsyncStreamWrapper, fut: Future[void], msgLen: int -): Future[void] {.async.} = +): Future[void] {.async: (raises: [CancelledError, LPStreamError]).} = withExceptions: await fut -method write*(self: AsyncStreamWrapper, msg: seq[byte]): Future[void] = +method write*( + self: AsyncStreamWrapper, msg: seq[byte] +): Future[void] {.async: (raises: [CancelledError, LPStreamError], raw: true).} = # Avoid a copy of msg being kept in the closure created by `{.async.}` as this # drives up memory usage diff --git a/codex/streams/storestream.nim b/codex/streams/storestream.nim index a68e2ea7..64a356de 100644 --- a/codex/streams/storestream.nim +++ b/codex/streams/storestream.nim @@ -67,13 +67,9 @@ method atEof*(self: StoreStream): bool = self.offset >= self.size type LPStreamReadError* = object of LPStreamError - par*: ref CatchableError proc newLPStreamReadError*(p: ref CatchableError): ref LPStreamReadError = - var w = newException(LPStreamReadError, "Read stream failed") - w.msg = w.msg & ", originated from [" & $p.name & "] " & p.msg - w.par = p - result = w + newException(LPStreamReadError, "Read stream failed", p) method readOnce*( self: StoreStream, pbytes: pointer, nbytes: int diff --git a/codex/utils/asyncstatemachine.nim b/codex/utils/asyncstatemachine.nim index 2d87ebc1..eb84378c 100644 --- a/codex/utils/asyncstatemachine.nim +++ b/codex/utils/asyncstatemachine.nim @@ -74,7 +74,6 @@ proc scheduler(machine: Machine) {.async: (raises: []).} = debug "enter state", state = fromState & " => " & $machine.state running = machine.run(machine.state) machine.trackedFutures.track(running) - asyncSpawn running except CancelledError: break # do not propagate bc it is asyncSpawned @@ -88,7 +87,6 @@ proc start*(machine: Machine, initialState: State) = machine.started = true let fut = machine.scheduler() machine.trackedFutures.track(fut) - asyncSpawn fut machine.schedule(Event.transition(machine.state, initialState)) proc stop*(machine: Machine) {.async.} = diff --git a/codex/utils/timer.nim b/codex/utils/timer.nim index 0a5a940a..5a9537cf 100644 --- a/codex/utils/timer.nim +++ b/codex/utils/timer.nim @@ -50,7 +50,6 @@ method start*( timer.callback = callback timer.interval = interval timer.loopFuture = timerLoop(timer) - asyncSpawn timer.loopFuture method stop*(timer: Timer) {.async, base.} = if timer.loopFuture != nil and not timer.loopFuture.finished: diff --git a/codex/utils/trackedfutures.nim b/codex/utils/trackedfutures.nim index eb3cc219..34007e08 100644 --- a/codex/utils/trackedfutures.nim +++ b/codex/utils/trackedfutures.nim @@ -5,9 +5,11 @@ import ../logutils {.push raises: [].} -type TrackedFutures* = ref object - futures: Table[uint, FutureBase] - cancelling: bool +type + TrackedFuture = Future[void].Raising([]) + TrackedFutures* = ref object + futures: Table[uint, TrackedFuture] + cancelling: bool logScope: topics = "trackable futures" @@ -15,15 +17,18 @@ logScope: proc len*(self: TrackedFutures): int = self.futures.len -proc removeFuture(self: TrackedFutures, future: FutureBase) = +proc removeFuture(self: TrackedFutures, future: TrackedFuture) = if not self.cancelling and not future.isNil: self.futures.del(future.id) -proc track*[T](self: TrackedFutures, fut: Future[T]) = +proc track*(self: TrackedFutures, fut: TrackedFuture) = if self.cancelling: return - self.futures[fut.id] = FutureBase(fut) + if fut.finished: + return + + self.futures[fut.id] = fut proc cb(udata: pointer) = self.removeFuture(fut) @@ -33,13 +38,8 @@ proc track*[T](self: TrackedFutures, fut: Future[T]) = proc cancelTracked*(self: TrackedFutures) {.async: (raises: []).} = self.cancelling = true - trace "cancelling tracked futures" - - var cancellations: seq[FutureBase] - for future in self.futures.values: - if not future.isNil and not future.finished: - cancellations.add future.cancelAndWait() - + trace "cancelling tracked futures", len = self.futures.len + let cancellations = self.futures.values.toSeq.mapIt(it.cancelAndWait()) await noCancel allFutures cancellations self.futures.clear() diff --git a/codex/validation.nim b/codex/validation.nim index 18a444a6..e6d74840 100644 --- a/codex/validation.nim +++ b/codex/validation.nim @@ -142,7 +142,6 @@ proc start*(validation: Validation) {.async.} = await validation.subscribeSlotFilled() await validation.restoreHistoricalState() validation.running = validation.run() - asyncSpawn validation.running proc stop*(validation: Validation) {.async.} = if not validation.running.isNil and not validation.running.finished: diff --git a/tests/asynctest.nim b/tests/asynctest.nim index 7c6a4afd..4db8277f 100644 --- a/tests/asynctest.nim +++ b/tests/asynctest.nim @@ -1,3 +1,3 @@ -import pkg/asynctest/chronos/unittest +import pkg/asynctest/chronos/unittest2 -export unittest +export unittest2 diff --git a/tests/codex/blockexchange/discovery/testdiscovery.nim b/tests/codex/blockexchange/discovery/testdiscovery.nim index 97a455e1..c54a1fff 100644 --- a/tests/codex/blockexchange/discovery/testdiscovery.nim +++ b/tests/codex/blockexchange/discovery/testdiscovery.nim @@ -84,12 +84,12 @@ asyncchecksuite "Block Advertising and Discovery": blockDiscovery.publishBlockProvideHandler = proc( d: MockDiscovery, cid: Cid - ): Future[void] {.async, gcsafe.} = + ): Future[void] {.async: (raises: [CancelledError]).} = return blockDiscovery.findBlockProvidersHandler = proc( d: MockDiscovery, cid: Cid - ): Future[seq[SignedPeerRecord]] {.async.} = + ): Future[seq[SignedPeerRecord]] {.async: (raises: [CancelledError]).} = await engine.resolveBlocks(blocks.filterIt(it.cid == cid)) await allFuturesThrowing(allFinished(pendingBlocks)) @@ -97,17 +97,17 @@ asyncchecksuite "Block Advertising and Discovery": await engine.stop() test "Should advertise trees": - let - cids = @[manifest.treeCid] - advertised = initTable.collect: - for cid in cids: - {cid: newFuture[void]()} + let cids = @[manifest.treeCid] + var advertised = initTable.collect: + for cid in cids: + {cid: newFuture[void]()} blockDiscovery.publishBlockProvideHandler = proc( d: MockDiscovery, cid: Cid - ) {.async.} = - if cid in advertised and not advertised[cid].finished(): - advertised[cid].complete() + ) {.async: (raises: [CancelledError]).} = + advertised.withValue(cid, fut): + if not fut[].finished: + fut[].complete() await engine.start() await allFuturesThrowing(allFinished(toSeq(advertised.values))) @@ -118,7 +118,7 @@ asyncchecksuite "Block Advertising and Discovery": blockDiscovery.publishBlockProvideHandler = proc( d: MockDiscovery, cid: Cid - ) {.async.} = + ) {.async: (raises: [CancelledError]).} = check: cid notin blockCids @@ -138,7 +138,7 @@ asyncchecksuite "Block Advertising and Discovery": blockDiscovery.findBlockProvidersHandler = proc( d: MockDiscovery, cid: Cid - ): Future[seq[SignedPeerRecord]] = + ): Future[seq[SignedPeerRecord]] {.async: (raises: [CancelledError]).} = check false await engine.start() @@ -221,17 +221,17 @@ asyncchecksuite "E2E - Multiple Nodes Discovery": MockDiscovery(blockexc[1].engine.discovery.discovery).publishBlockProvideHandler = proc( d: MockDiscovery, cid: Cid - ): Future[void] {.async.} = + ) {.async: (raises: [CancelledError]).} = advertised[cid] = switch[1].peerInfo.signedPeerRecord MockDiscovery(blockexc[2].engine.discovery.discovery).publishBlockProvideHandler = proc( d: MockDiscovery, cid: Cid - ): Future[void] {.async.} = + ) {.async: (raises: [CancelledError]).} = advertised[cid] = switch[2].peerInfo.signedPeerRecord MockDiscovery(blockexc[3].engine.discovery.discovery).publishBlockProvideHandler = proc( d: MockDiscovery, cid: Cid - ): Future[void] {.async.} = + ) {.async: (raises: [CancelledError]).} = advertised[cid] = switch[3].peerInfo.signedPeerRecord discard blockexc[1].engine.pendingBlocks.getWantHandle(mBlocks[0].cid) @@ -266,23 +266,21 @@ asyncchecksuite "E2E - Multiple Nodes Discovery": MockDiscovery(blockexc[0].engine.discovery.discovery).findBlockProvidersHandler = proc( d: MockDiscovery, cid: Cid - ): Future[seq[SignedPeerRecord]] {.async.} = - if cid in advertised: - result.add(advertised[cid]) + ): Future[seq[SignedPeerRecord]] {.async: (raises: [CancelledError]).} = + advertised.withValue(cid, val): + result.add(val[]) let futs = collect(newSeq): for m in mBlocks[0 .. 2]: blockexc[0].engine.requestBlock(m.cid) - await allFuturesThrowing( - switch.mapIt(it.start()) & blockexc.mapIt(it.engine.start()) - ) - .wait(10.seconds) + await allFuturesThrowing(switch.mapIt(it.start())).wait(10.seconds) + await allFuturesThrowing(blockexc.mapIt(it.engine.start())).wait(10.seconds) await allFutures(futs).wait(10.seconds) - await allFuturesThrowing(blockexc.mapIt(it.engine.stop()) & switch.mapIt(it.stop())) - .wait(10.seconds) + await allFuturesThrowing(blockexc.mapIt(it.engine.stop())).wait(10.seconds) + await allFuturesThrowing(switch.mapIt(it.stop())).wait(10.seconds) test "E2E - Should advertise and discover blocks with peers already connected": # Distribute the blocks amongst 1..3 @@ -292,17 +290,17 @@ asyncchecksuite "E2E - Multiple Nodes Discovery": MockDiscovery(blockexc[1].engine.discovery.discovery).publishBlockProvideHandler = proc( d: MockDiscovery, cid: Cid - ): Future[void] {.async.} = + ) {.async: (raises: [CancelledError]).} = advertised[cid] = switch[1].peerInfo.signedPeerRecord MockDiscovery(blockexc[2].engine.discovery.discovery).publishBlockProvideHandler = proc( d: MockDiscovery, cid: Cid - ): Future[void] {.async.} = + ) {.async: (raises: [CancelledError]).} = advertised[cid] = switch[2].peerInfo.signedPeerRecord MockDiscovery(blockexc[3].engine.discovery.discovery).publishBlockProvideHandler = proc( d: MockDiscovery, cid: Cid - ): Future[void] {.async.} = + ) {.async: (raises: [CancelledError]).} = advertised[cid] = switch[3].peerInfo.signedPeerRecord discard blockexc[1].engine.pendingBlocks.getWantHandle(mBlocks[0].cid) @@ -337,18 +335,16 @@ asyncchecksuite "E2E - Multiple Nodes Discovery": MockDiscovery(blockexc[0].engine.discovery.discovery).findBlockProvidersHandler = proc( d: MockDiscovery, cid: Cid - ): Future[seq[SignedPeerRecord]] {.async.} = - if cid in advertised: - return @[advertised[cid]] + ): Future[seq[SignedPeerRecord]] {.async: (raises: [CancelledError]).} = + advertised.withValue(cid, val): + return @[val[]] let futs = mBlocks[0 .. 2].mapIt(blockexc[0].engine.requestBlock(it.cid)) - await allFuturesThrowing( - switch.mapIt(it.start()) & blockexc.mapIt(it.engine.start()) - ) - .wait(10.seconds) + await allFuturesThrowing(switch.mapIt(it.start())).wait(10.seconds) + await allFuturesThrowing(blockexc.mapIt(it.engine.start())).wait(10.seconds) await allFutures(futs).wait(10.seconds) - await allFuturesThrowing(blockexc.mapIt(it.engine.stop()) & switch.mapIt(it.stop())) - .wait(10.seconds) + await allFuturesThrowing(blockexc.mapIt(it.engine.stop())).wait(10.seconds) + await allFuturesThrowing(switch.mapIt(it.stop())).wait(10.seconds) diff --git a/tests/codex/blockexchange/discovery/testdiscoveryengine.nim b/tests/codex/blockexchange/discovery/testdiscoveryengine.nim index 93704726..9efab1a6 100644 --- a/tests/codex/blockexchange/discovery/testdiscoveryengine.nim +++ b/tests/codex/blockexchange/discovery/testdiscoveryengine.nim @@ -68,7 +68,7 @@ asyncchecksuite "Test Discovery Engine": blockDiscovery.findBlockProvidersHandler = proc( d: MockDiscovery, cid: Cid - ): Future[seq[SignedPeerRecord]] {.async, gcsafe.} = + ): Future[seq[SignedPeerRecord]] {.async: (raises: [CancelledError]).} = pendingBlocks.resolve( blocks.filterIt(it.cid == cid).mapIt( BlockDelivery(blk: it, address: it.address) @@ -94,7 +94,7 @@ asyncchecksuite "Test Discovery Engine": blockDiscovery.findBlockProvidersHandler = proc( d: MockDiscovery, cid: Cid - ): Future[seq[SignedPeerRecord]] {.async, gcsafe.} = + ): Future[seq[SignedPeerRecord]] {.async: (raises: [CancelledError]).} = check cid == blocks[0].cid if not want.finished: want.complete() @@ -122,7 +122,7 @@ asyncchecksuite "Test Discovery Engine": var pendingCids = newSeq[Cid]() blockDiscovery.findBlockProvidersHandler = proc( d: MockDiscovery, cid: Cid - ): Future[seq[SignedPeerRecord]] {.async, gcsafe.} = + ): Future[seq[SignedPeerRecord]] {.async: (raises: [CancelledError]).} = check cid in pendingCids pendingCids.keepItIf(it != cid) check peerStore.len < minPeers @@ -159,12 +159,12 @@ asyncchecksuite "Test Discovery Engine": discoveryLoopSleep = 100.millis, concurrentDiscReqs = 2, ) - reqs = newFuture[void]() + reqs = Future[void].Raising([CancelledError]).init() count = 0 blockDiscovery.findBlockProvidersHandler = proc( d: MockDiscovery, cid: Cid - ): Future[seq[SignedPeerRecord]] {.gcsafe, async.} = + ): Future[seq[SignedPeerRecord]] {.async: (raises: [CancelledError]).} = check cid == blocks[0].cid if count > 0: check false diff --git a/tests/codex/blockexchange/engine/testadvertiser.nim b/tests/codex/blockexchange/engine/testadvertiser.nim index 157564d6..83a70f65 100644 --- a/tests/codex/blockexchange/engine/testadvertiser.nim +++ b/tests/codex/blockexchange/engine/testadvertiser.nim @@ -34,7 +34,7 @@ asyncchecksuite "Advertiser": advertised = newSeq[Cid]() blockDiscovery.publishBlockProvideHandler = proc( d: MockDiscovery, cid: Cid - ) {.async, gcsafe.} = + ) {.async: (raises: [CancelledError]), gcsafe.} = advertised.add(cid) advertiser = Advertiser.new(localStore, blockDiscovery) diff --git a/tests/codex/blockexchange/engine/testengine.nim b/tests/codex/blockexchange/engine/testengine.nim index cc5511e8..0541c119 100644 --- a/tests/codex/blockexchange/engine/testengine.nim +++ b/tests/codex/blockexchange/engine/testengine.nim @@ -22,7 +22,7 @@ import ../../examples const NopSendWantCancellationsProc = proc( id: PeerId, addresses: seq[BlockAddress] -) {.gcsafe, async.} = +) {.async: (raises: [CancelledError]).} = discard asyncchecksuite "NetworkStore engine basic": @@ -66,20 +66,17 @@ asyncchecksuite "NetworkStore engine basic": wantType: WantType = WantType.WantHave, full: bool = false, sendDontHave: bool = false, - ) {.gcsafe, async.} = + ) {.async: (raises: [CancelledError]).} = check addresses.mapIt($it.cidOrTreeCid).sorted == blocks.mapIt($it.cid).sorted done.complete() let network = BlockExcNetwork(request: BlockExcRequest(sendWantList: sendWantList)) - localStore = CacheStore.new(blocks.mapIt(it)) discovery = DiscoveryEngine.new( localStore, peerStore, network, blockDiscovery, pendingBlocks ) - advertiser = Advertiser.new(localStore, blockDiscovery) - engine = BlockExcEngine.new( localStore, wallet, network, discovery, advertiser, peerStore, pendingBlocks ) @@ -93,7 +90,9 @@ asyncchecksuite "NetworkStore engine basic": test "Should send account to new peers": let pricing = Pricing.example - proc sendAccount(peer: PeerId, account: Account) {.gcsafe, async.} = + proc sendAccount( + peer: PeerId, account: Account + ) {.async: (raises: [CancelledError]).} = check account.address == pricing.address done.complete() @@ -186,7 +185,9 @@ asyncchecksuite "NetworkStore engine handlers": done = newFuture[void]() wantList = makeWantList(blocks.mapIt(it.cid)) - proc sendPresence(peerId: PeerId, presence: seq[BlockPresence]) {.gcsafe, async.} = + proc sendPresence( + peerId: PeerId, presence: seq[BlockPresence] + ) {.async: (raises: [CancelledError]).} = check presence.mapIt(it.address) == wantList.entries.mapIt(it.address) done.complete() @@ -203,7 +204,9 @@ asyncchecksuite "NetworkStore engine handlers": done = newFuture[void]() wantList = makeWantList(blocks.mapIt(it.cid), sendDontHave = true) - proc sendPresence(peerId: PeerId, presence: seq[BlockPresence]) {.gcsafe, async.} = + proc sendPresence( + peerId: PeerId, presence: seq[BlockPresence] + ) {.async: (raises: [CancelledError]).} = check presence.mapIt(it.address) == wantList.entries.mapIt(it.address) for p in presence: check: @@ -222,7 +225,9 @@ asyncchecksuite "NetworkStore engine handlers": done = newFuture[void]() wantList = makeWantList(blocks.mapIt(it.cid), sendDontHave = true) - proc sendPresence(peerId: PeerId, presence: seq[BlockPresence]) {.gcsafe, async.} = + proc sendPresence( + peerId: PeerId, presence: seq[BlockPresence] + ) {.async: (raises: [CancelledError]).} = for p in presence: if p.address.cidOrTreeCid != blocks[0].cid and p.address.cidOrTreeCid != blocks[1].cid: @@ -266,19 +271,21 @@ asyncchecksuite "NetworkStore engine handlers": peerContext.account = account.some peerContext.blocks = blocks.mapIt( - (it.address, Presence(address: it.address, price: rand(uint16).u256)) + (it.address, Presence(address: it.address, price: rand(uint16).u256, have: true)) ).toTable engine.network = BlockExcNetwork( request: BlockExcRequest( - sendPayment: proc(receiver: PeerId, payment: SignedState) {.gcsafe, async.} = + sendPayment: proc( + receiver: PeerId, payment: SignedState + ) {.async: (raises: [CancelledError]).} = let - amount = blocks.mapIt(peerContext.blocks[it.address].price).foldl(a + b) - + amount = + blocks.mapIt(peerContext.blocks[it.address].catch.get.price).foldl(a + b) balances = !payment.state.outcome.balances(Asset) check receiver == peerId - check balances[account.address.toDestination] == amount + check balances[account.address.toDestination].catch.get == amount done.complete(), # Install NOP for want list cancellations so they don't cause a crash @@ -286,10 +293,12 @@ asyncchecksuite "NetworkStore engine handlers": ) ) + let requestedBlocks = blocks.mapIt(engine.pendingBlocks.getWantHandle(it.address)) await engine.blocksDeliveryHandler( peerId, blocks.mapIt(BlockDelivery(blk: it, address: it.address)) ) await done.wait(100.millis) + await allFuturesThrowing(requestedBlocks).wait(100.millis) test "Should handle block presence": var handles: @@ -303,7 +312,7 @@ asyncchecksuite "NetworkStore engine handlers": wantType: WantType = WantType.WantHave, full: bool = false, sendDontHave: bool = false, - ) {.gcsafe, async.} = + ) {.async: (raises: [CancelledError]).} = engine.pendingBlocks.resolve( blocks.filterIt(it.address in addresses).mapIt( BlockDelivery(blk: it, address: it.address) @@ -340,9 +349,9 @@ asyncchecksuite "NetworkStore engine handlers": proc sendWantCancellations( id: PeerId, addresses: seq[BlockAddress] - ) {.gcsafe, async.} = + ) {.async: (raises: [CancelledError]).} = for address in addresses: - cancellations[address].complete() + cancellations[address].catch.expect("address should exist").complete() engine.network = BlockExcNetwork( request: BlockExcRequest(sendWantCancellations: sendWantCancellations) @@ -416,7 +425,7 @@ asyncchecksuite "Block Download": wantType: WantType = WantType.WantHave, full: bool = false, sendDontHave: bool = false, - ) {.gcsafe, async.} = + ) {.async: (raises: [CancelledError]).} = check wantType == WantHave check not engine.pendingBlocks.isInFlight(address) check engine.pendingBlocks.retries(address) == retries @@ -433,7 +442,7 @@ asyncchecksuite "Block Download": discard (await pending).tryGet() test "Should retry block request": - let + var address = BlockAddress.init(blocks[0].cid) steps = newAsyncEvent() @@ -445,7 +454,7 @@ asyncchecksuite "Block Download": wantType: WantType = WantType.WantHave, full: bool = false, sendDontHave: bool = false, - ) {.gcsafe, async.} = + ) {.async: (raises: [CancelledError]).} = case wantType of WantHave: check engine.pendingBlocks.isInFlight(address) == false @@ -467,7 +476,7 @@ asyncchecksuite "Block Download": let pending = engine.requestBlock(address) await steps.wait() - # add blocks presence + # add blocks precense peerCtx.blocks = blocks.mapIt( (it.address, Presence(address: it.address, have: true, price: UInt256.example)) ).toTable @@ -493,7 +502,7 @@ asyncchecksuite "Block Download": wantType: WantType = WantType.WantHave, full: bool = false, sendDontHave: bool = false, - ) {.gcsafe, async.} = + ) {.async: (raises: [CancelledError]).} = done.complete() engine.pendingBlocks.blockRetries = 10 @@ -573,7 +582,7 @@ asyncchecksuite "Task Handler": test "Should send want-blocks in priority order": proc sendBlocksDelivery( id: PeerId, blocksDelivery: seq[BlockDelivery] - ) {.gcsafe, async.} = + ) {.async: (raises: [CancelledError]).} = check blocksDelivery.len == 2 check: blocksDelivery[1].address == blocks[0].address @@ -610,7 +619,7 @@ asyncchecksuite "Task Handler": test "Should set in-flight for outgoing blocks": proc sendBlocksDelivery( id: PeerId, blocksDelivery: seq[BlockDelivery] - ) {.gcsafe, async.} = + ) {.async: (raises: [CancelledError]).} = check peersCtx[0].peerWants[0].inFlight for blk in blocks: @@ -649,7 +658,9 @@ asyncchecksuite "Task Handler": let missing = @[Block.new("missing".toBytes).tryGet()] let price = (!engine.pricing).price - proc sendPresence(id: PeerId, presence: seq[BlockPresence]) {.gcsafe, async.} = + proc sendPresence( + id: PeerId, presence: seq[BlockPresence] + ) {.async: (raises: [CancelledError]).} = check presence.mapIt(!Presence.init(it)) == @[ Presence(address: present[0].address, have: true, price: price), diff --git a/tests/codex/blockexchange/engine/testpayments.nim b/tests/codex/blockexchange/engine/testpayments.nim index 24d5dab6..e93cc837 100644 --- a/tests/codex/blockexchange/engine/testpayments.nim +++ b/tests/codex/blockexchange/engine/testpayments.nim @@ -1,10 +1,10 @@ -import std/unittest +import pkg/unittest2 import pkg/codex/stores import ../../examples import ../../helpers -checksuite "engine payments": +suite "Engine payments": let address = EthAddress.example let amount = 42.u256 diff --git a/tests/codex/blockexchange/protobuf/testpayments.nim b/tests/codex/blockexchange/protobuf/testpayments.nim index d0773d70..3ada0105 100644 --- a/tests/codex/blockexchange/protobuf/testpayments.nim +++ b/tests/codex/blockexchange/protobuf/testpayments.nim @@ -6,7 +6,7 @@ import ../../../asynctest import ../../examples import ../../helpers -checksuite "account protobuf messages": +suite "account protobuf messages": let account = Account(address: EthAddress.example) let message = AccountMessage.init(account) @@ -21,7 +21,7 @@ checksuite "account protobuf messages": incorrect.address.del(0) check Account.init(incorrect).isNone -checksuite "channel update messages": +suite "channel update messages": let state = SignedState.example let update = StateChannelUpdate.init(state) diff --git a/tests/codex/blockexchange/protobuf/testpresence.nim b/tests/codex/blockexchange/protobuf/testpresence.nim index 7e3b94e6..dc048c59 100644 --- a/tests/codex/blockexchange/protobuf/testpresence.nim +++ b/tests/codex/blockexchange/protobuf/testpresence.nim @@ -6,7 +6,7 @@ import ../../../asynctest import ../../examples import ../../helpers -checksuite "block presence protobuf messages": +suite "block presence protobuf messages": let cid = Cid.example address = BlockAddress(leaf: false, cid: cid) diff --git a/tests/codex/blockexchange/testnetwork.nim b/tests/codex/blockexchange/testnetwork.nim index 0fae4ffe..b9a51c9d 100644 --- a/tests/codex/blockexchange/testnetwork.nim +++ b/tests/codex/blockexchange/testnetwork.nim @@ -26,7 +26,7 @@ asyncchecksuite "Network - Handlers": blocks: seq[bt.Block] done: Future[void] - proc getConn(): Future[Connection] {.async.} = + proc getConn(): Future[Connection] {.async: (raises: [CancelledError]).} = return Connection(buffer) setup: @@ -45,7 +45,7 @@ asyncchecksuite "Network - Handlers": discard await networkPeer.connect() test "Want List handler": - proc wantListHandler(peer: PeerId, wantList: WantList) {.gcsafe, async.} = + proc wantListHandler(peer: PeerId, wantList: WantList) {.async: (raises: []).} = # check that we got the correct amount of entries check wantList.entries.len == 4 @@ -72,7 +72,7 @@ asyncchecksuite "Network - Handlers": test "Blocks Handler": proc blocksDeliveryHandler( peer: PeerId, blocksDelivery: seq[BlockDelivery] - ) {.gcsafe, async.} = + ) {.async: (raises: []).} = check blocks == blocksDelivery.mapIt(it.blk) done.complete() @@ -85,7 +85,9 @@ asyncchecksuite "Network - Handlers": await done.wait(500.millis) test "Presence Handler": - proc presenceHandler(peer: PeerId, presence: seq[BlockPresence]) {.gcsafe, async.} = + proc presenceHandler( + peer: PeerId, presence: seq[BlockPresence] + ) {.async: (raises: []).} = for b in blocks: check: b.address in presence @@ -105,7 +107,7 @@ asyncchecksuite "Network - Handlers": test "Handles account messages": let account = Account(address: EthAddress.example) - proc handleAccount(peer: PeerId, received: Account) {.gcsafe, async.} = + proc handleAccount(peer: PeerId, received: Account) {.async: (raises: []).} = check received == account done.complete() @@ -119,7 +121,7 @@ asyncchecksuite "Network - Handlers": test "Handles payment messages": let payment = SignedState.example - proc handlePayment(peer: PeerId, received: SignedState) {.gcsafe, async.} = + proc handlePayment(peer: PeerId, received: SignedState) {.async: (raises: []).} = check received == payment done.complete() @@ -165,7 +167,7 @@ asyncchecksuite "Network - Senders": await allFuturesThrowing(switch1.stop(), switch2.stop()) test "Send want list": - proc wantListHandler(peer: PeerId, wantList: WantList) {.gcsafe, async.} = + proc wantListHandler(peer: PeerId, wantList: WantList) {.async: (raises: []).} = # check that we got the correct amount of entries check wantList.entries.len == 4 @@ -195,7 +197,7 @@ asyncchecksuite "Network - Senders": test "send blocks": proc blocksDeliveryHandler( peer: PeerId, blocksDelivery: seq[BlockDelivery] - ) {.gcsafe, async.} = + ) {.async: (raises: []).} = check blocks == blocksDelivery.mapIt(it.blk) done.complete() @@ -207,7 +209,9 @@ asyncchecksuite "Network - Senders": await done.wait(500.millis) test "send presence": - proc presenceHandler(peer: PeerId, precense: seq[BlockPresence]) {.gcsafe, async.} = + proc presenceHandler( + peer: PeerId, precense: seq[BlockPresence] + ) {.async: (raises: []).} = for b in blocks: check: b.address in precense @@ -226,7 +230,7 @@ asyncchecksuite "Network - Senders": test "send account": let account = Account(address: EthAddress.example) - proc handleAccount(peer: PeerId, received: Account) {.gcsafe, async.} = + proc handleAccount(peer: PeerId, received: Account) {.async: (raises: []).} = check received == account done.complete() @@ -238,7 +242,7 @@ asyncchecksuite "Network - Senders": test "send payment": let payment = SignedState.example - proc handlePayment(peer: PeerId, received: SignedState) {.gcsafe, async.} = + proc handlePayment(peer: PeerId, received: SignedState) {.async: (raises: []).} = check received == payment done.complete() @@ -276,7 +280,7 @@ asyncchecksuite "Network - Test Limits": let account = Account(address: EthAddress.example) network2.handlers.onAccount = proc( peer: PeerId, received: Account - ) {.gcsafe, async.} = + ) {.async: (raises: []).} = check false let fut = network1.send( diff --git a/tests/codex/blockexchange/testpeerctxstore.nim b/tests/codex/blockexchange/testpeerctxstore.nim index 6ea601d1..e2983d10 100644 --- a/tests/codex/blockexchange/testpeerctxstore.nim +++ b/tests/codex/blockexchange/testpeerctxstore.nim @@ -1,7 +1,7 @@ import std/sugar import std/sequtils -import std/unittest +import pkg/unittest2 import pkg/libp2p import pkg/codex/blockexchange/peers @@ -11,7 +11,7 @@ import pkg/codex/blockexchange/protobuf/presence import ../helpers import ../examples -checksuite "Peer Context Store": +suite "Peer Context Store": var store: PeerCtxStore peerCtx: BlockExcPeerCtx @@ -31,7 +31,7 @@ checksuite "Peer Context Store": test "Should get peer": check store.get(peerCtx.id) == peerCtx -checksuite "Peer Context Store Peer Selection": +suite "Peer Context Store Peer Selection": var store: PeerCtxStore peerCtxs: seq[BlockExcPeerCtx] diff --git a/tests/codex/blockexchange/testpendingblocks.nim b/tests/codex/blockexchange/testpendingblocks.nim index 29410db7..af1e6728 100644 --- a/tests/codex/blockexchange/testpendingblocks.nim +++ b/tests/codex/blockexchange/testpendingblocks.nim @@ -10,7 +10,7 @@ import pkg/codex/blockexchange import ../helpers import ../../asynctest -checksuite "Pending Blocks": +suite "Pending Blocks": test "Should add want handle": let pendingBlocks = PendingBlocksManager.new() diff --git a/tests/codex/helpers/mockchunker.nim b/tests/codex/helpers/mockchunker.nim index 0d38cf3b..eb51f7ca 100644 --- a/tests/codex/helpers/mockchunker.nim +++ b/tests/codex/helpers/mockchunker.nim @@ -21,7 +21,7 @@ proc new*( var consumed = 0 proc reader( data: ChunkBuffer, len: int - ): Future[int] {.async, gcsafe, raises: [Defect].} = + ): Future[int] {.gcsafe, async: (raises: [ChunkerError, CancelledError]).} = if consumed >= dataset.len: return 0 diff --git a/tests/codex/helpers/mockdiscovery.nim b/tests/codex/helpers/mockdiscovery.nim index 42ad76a9..4110c577 100644 --- a/tests/codex/helpers/mockdiscovery.nim +++ b/tests/codex/helpers/mockdiscovery.nim @@ -14,29 +14,42 @@ import pkg/codex/discovery import pkg/contractabi/address as ca type MockDiscovery* = ref object of Discovery - findBlockProvidersHandler*: - proc(d: MockDiscovery, cid: Cid): Future[seq[SignedPeerRecord]] {.gcsafe.} - publishBlockProvideHandler*: proc(d: MockDiscovery, cid: Cid): Future[void] {.gcsafe.} - findHostProvidersHandler*: - proc(d: MockDiscovery, host: ca.Address): Future[seq[SignedPeerRecord]] {.gcsafe.} - publishHostProvideHandler*: - proc(d: MockDiscovery, host: ca.Address): Future[void] {.gcsafe.} + findBlockProvidersHandler*: proc( + d: MockDiscovery, cid: Cid + ): Future[seq[SignedPeerRecord]] {.async: (raises: [CancelledError]).} + + publishBlockProvideHandler*: + proc(d: MockDiscovery, cid: Cid): Future[void] {.async: (raises: [CancelledError]).} + + findHostProvidersHandler*: proc( + d: MockDiscovery, host: ca.Address + ): Future[seq[SignedPeerRecord]] {.async: (raises: [CancelledError]).} + + publishHostProvideHandler*: proc(d: MockDiscovery, host: ca.Address): Future[void] {. + async: (raises: [CancelledError]) + .} proc new*(T: type MockDiscovery): MockDiscovery = MockDiscovery() -proc findPeer*(d: Discovery, peerId: PeerId): Future[?PeerRecord] {.async.} = +proc findPeer*( + d: Discovery, peerId: PeerId +): Future[?PeerRecord] {.async: (raises: [CancelledError]).} = ## mock find a peer - always return none - ## + ## return none(PeerRecord) -method find*(d: MockDiscovery, cid: Cid): Future[seq[SignedPeerRecord]] {.async.} = +method find*( + d: MockDiscovery, cid: Cid +): Future[seq[SignedPeerRecord]] {.async: (raises: [CancelledError]).} = if isNil(d.findBlockProvidersHandler): return return await d.findBlockProvidersHandler(d, cid) -method provide*(d: MockDiscovery, cid: Cid): Future[void] {.async.} = +method provide*( + d: MockDiscovery, cid: Cid +): Future[void] {.async: (raises: [CancelledError]).} = if isNil(d.publishBlockProvideHandler): return @@ -44,13 +57,15 @@ method provide*(d: MockDiscovery, cid: Cid): Future[void] {.async.} = method find*( d: MockDiscovery, host: ca.Address -): Future[seq[SignedPeerRecord]] {.async.} = +): Future[seq[SignedPeerRecord]] {.async: (raises: [CancelledError]).} = if isNil(d.findHostProvidersHandler): return return await d.findHostProvidersHandler(d, host) -method provide*(d: MockDiscovery, host: ca.Address): Future[void] {.async.} = +method provide*( + d: MockDiscovery, host: ca.Address +): Future[void] {.async: (raises: [CancelledError]).} = if isNil(d.publishHostProvideHandler): return diff --git a/tests/codex/helpers/randomchunker.nim b/tests/codex/helpers/randomchunker.nim index b482f67f..cf857595 100644 --- a/tests/codex/helpers/randomchunker.nim +++ b/tests/codex/helpers/randomchunker.nim @@ -26,7 +26,7 @@ proc new*( var consumed = 0 proc reader( data: ChunkBuffer, len: int - ): Future[int] {.async, gcsafe, raises: [Defect].} = + ): Future[int] {.async: (raises: [ChunkerError, CancelledError]), gcsafe.} = var alpha = toSeq(byte('A') .. byte('z')) if consumed >= size: diff --git a/tests/codex/merkletree/generictreetests.nim b/tests/codex/merkletree/generictreetests.nim index 0e1f7c9f..6244bc1c 100644 --- a/tests/codex/merkletree/generictreetests.nim +++ b/tests/codex/merkletree/generictreetests.nim @@ -1,4 +1,4 @@ -import std/unittest +import pkg/unittest2 import pkg/codex/merkletree diff --git a/tests/codex/merkletree/testcodexcoders.nim b/tests/codex/merkletree/testcodexcoders.nim index d9544083..6da56844 100644 --- a/tests/codex/merkletree/testcodexcoders.nim +++ b/tests/codex/merkletree/testcodexcoders.nim @@ -1,4 +1,4 @@ -import std/unittest +import pkg/unittest2 import pkg/questionable/results import pkg/stew/byteutils @@ -18,7 +18,7 @@ const data = [ "00000000000000000000000000000009".toBytes, "00000000000000000000000000000010".toBytes, ] -checksuite "merkletree - coders": +suite "merkletree - coders": test "encoding and decoding a tree yields the same tree": let tree = CodexTree.init(Sha256HashCodec, data).tryGet() diff --git a/tests/codex/merkletree/testcodextree.nim b/tests/codex/merkletree/testcodextree.nim index c4713d40..29390c16 100644 --- a/tests/codex/merkletree/testcodextree.nim +++ b/tests/codex/merkletree/testcodextree.nim @@ -1,6 +1,6 @@ -import std/unittest import std/sequtils +import pkg/unittest2 import pkg/questionable/results import pkg/stew/byteutils import pkg/libp2p diff --git a/tests/codex/merkletree/testmerkledigest.nim b/tests/codex/merkletree/testmerkledigest.nim index ccb138da..4cc2d197 100644 --- a/tests/codex/merkletree/testmerkledigest.nim +++ b/tests/codex/merkletree/testmerkledigest.nim @@ -1,7 +1,7 @@ -import std/unittest import std/sequtils import std/random +import pkg/unittest2 import pkg/poseidon2 import pkg/poseidon2/sponge diff --git a/tests/codex/merkletree/testposeidon2tree.nim b/tests/codex/merkletree/testposeidon2tree.nim index f60fdb39..e12751b7 100644 --- a/tests/codex/merkletree/testposeidon2tree.nim +++ b/tests/codex/merkletree/testposeidon2tree.nim @@ -1,6 +1,6 @@ -import std/unittest import std/sequtils +import pkg/unittest2 import pkg/poseidon2 import pkg/poseidon2/io import pkg/questionable/results diff --git a/tests/codex/sales/states/testdownloading.nim b/tests/codex/sales/states/testdownloading.nim index 3df45749..71376fc8 100644 --- a/tests/codex/sales/states/testdownloading.nim +++ b/tests/codex/sales/states/testdownloading.nim @@ -1,4 +1,4 @@ -import std/unittest +import pkg/unittest2 import pkg/questionable import pkg/codex/contracts/requests import pkg/codex/sales/states/cancelled @@ -8,7 +8,7 @@ import pkg/codex/sales/states/filled import ../../examples import ../../helpers -checksuite "sales state 'downloading'": +suite "sales state 'downloading'": let request = StorageRequest.example let slotIndex = request.ask.slots div 2 var state: SaleDownloading diff --git a/tests/codex/sales/states/testfilled.nim b/tests/codex/sales/states/testfilled.nim index 04ff26db..f077b780 100644 --- a/tests/codex/sales/states/testfilled.nim +++ b/tests/codex/sales/states/testfilled.nim @@ -14,7 +14,7 @@ import ../../helpers/mockmarket import ../../examples import ../../helpers -checksuite "sales state 'filled'": +suite "sales state 'filled'": let request = StorageRequest.example let slotIndex = request.ask.slots div 2 diff --git a/tests/codex/sales/states/testfilling.nim b/tests/codex/sales/states/testfilling.nim index ce1d32f2..1a26753d 100644 --- a/tests/codex/sales/states/testfilling.nim +++ b/tests/codex/sales/states/testfilling.nim @@ -1,4 +1,4 @@ -import std/unittest +import pkg/unittest2 import pkg/questionable import pkg/codex/contracts/requests import pkg/codex/sales/states/filling @@ -7,7 +7,7 @@ import pkg/codex/sales/states/failed import ../../examples import ../../helpers -checksuite "sales state 'filling'": +suite "sales state 'filling'": let request = StorageRequest.example let slotIndex = request.ask.slots div 2 var state: SaleFilling diff --git a/tests/codex/sales/states/testunknown.nim b/tests/codex/sales/states/testunknown.nim index 5e9f81f9..98b23224 100644 --- a/tests/codex/sales/states/testunknown.nim +++ b/tests/codex/sales/states/testunknown.nim @@ -14,7 +14,7 @@ import ../../helpers/mockmarket import ../../examples import ../../helpers -checksuite "sales state 'unknown'": +suite "sales state 'unknown'": let request = StorageRequest.example let slotIndex = request.ask.slots div 2 let slotId = slotId(request.id, slotIndex) diff --git a/tests/codex/sales/testsales.nim b/tests/codex/sales/testsales.nim index e92f9607..74ea8a2b 100644 --- a/tests/codex/sales/testsales.nim +++ b/tests/codex/sales/testsales.nim @@ -236,10 +236,17 @@ asyncchecksuite "Sales": return true proc addRequestToSaturatedQueue(): Future[StorageRequest] {.async.} = - queue.onProcessSlot = proc(item: SlotQueueItem, done: Future[void]) {.async.} = - await sleepAsync(10.millis) - itemsProcessed.add item - done.complete() + queue.onProcessSlot = proc( + item: SlotQueueItem, done: Future[void] + ) {.async: (raises: []).} = + try: + await sleepAsync(10.millis) + itemsProcessed.add item + except CancelledError as exc: + checkpoint(exc.msg) + finally: + if not done.finished: + done.complete() var request1 = StorageRequest.example request1.ask.collateralPerByte = request.ask.collateralPerByte + 1 @@ -261,9 +268,12 @@ asyncchecksuite "Sales": waitFor run() test "processes all request's slots once StorageRequested emitted": - queue.onProcessSlot = proc(item: SlotQueueItem, done: Future[void]) {.async.} = + queue.onProcessSlot = proc( + item: SlotQueueItem, done: Future[void] + ) {.async: (raises: []).} = itemsProcessed.add item - done.complete() + if not done.finished: + done.complete() createAvailability() await market.requestStorage(request) let items = SlotQueueItem.init(request, collateral = request.ask.collateralPerSlot) @@ -299,9 +309,12 @@ asyncchecksuite "Sales": check always (not itemsProcessed.contains(expected)) test "adds slot index to slot queue once SlotFreed emitted": - queue.onProcessSlot = proc(item: SlotQueueItem, done: Future[void]) {.async.} = + queue.onProcessSlot = proc( + item: SlotQueueItem, done: Future[void] + ) {.async: (raises: []).} = itemsProcessed.add item - done.complete() + if not done.finished: + done.complete() createAvailability() market.requested.add request # "contract" must be able to return request diff --git a/tests/codex/sales/testslotqueue.nim b/tests/codex/sales/testslotqueue.nim index 03c658be..7abad7eb 100644 --- a/tests/codex/sales/testslotqueue.nim +++ b/tests/codex/sales/testslotqueue.nim @@ -50,12 +50,19 @@ suite "Slot queue start/stop": suite "Slot queue workers": var queue: SlotQueue - proc onProcessSlot(item: SlotQueueItem, doneProcessing: Future[void]) {.async.} = - await sleepAsync(1000.millis) + proc onProcessSlot( + item: SlotQueueItem, doneProcessing: Future[void] + ) {.async: (raises: []).} = # this is not illustrative of the realistic scenario as the # `doneProcessing` future would be passed to another context before being # completed and therefore is not as simple as making the callback async - doneProcessing.complete() + try: + await sleepAsync(1000.millis) + except CatchableError as exc: + checkpoint(exc.msg) + finally: + if not doneProcessing.finished: + doneProcessing.complete() setup: let request = StorageRequest.example @@ -89,9 +96,14 @@ suite "Slot queue workers": check eventually queue.activeWorkers == 3 test "discards workers once processing completed": - proc processSlot(item: SlotQueueItem, done: Future[void]) {.async.} = - await sleepAsync(1.millis) - done.complete() + proc processSlot(item: SlotQueueItem, done: Future[void]) {.async: (raises: []).} = + try: + await sleepAsync(1.millis) + except CatchableError as exc: + checkpoint(exc.msg) + finally: + if not done.finished: + done.complete() queue.onProcessSlot = processSlot @@ -114,11 +126,19 @@ suite "Slot queue": proc newSlotQueue(maxSize, maxWorkers: int, processSlotDelay = 1.millis) = queue = SlotQueue.new(maxWorkers, maxSize.uint16) - queue.onProcessSlot = proc(item: SlotQueueItem, done: Future[void]) {.async.} = - await sleepAsync(processSlotDelay) - onProcessSlotCalled = true - onProcessSlotCalledWith.add (item.requestId, item.slotIndex) - done.complete() + queue.onProcessSlot = proc( + item: SlotQueueItem, done: Future[void] + ) {.async: (raises: []).} = + try: + await sleepAsync(processSlotDelay) + except CatchableError as exc: + checkpoint(exc.msg) + finally: + onProcessSlotCalled = true + onProcessSlotCalledWith.add (item.requestId, item.slotIndex) + if not done.finished: + done.complete() + queue.start() setup: diff --git a/tests/codex/slots/testslotbuilder.nim b/tests/codex/slots/testslotbuilder.nim index ef83bdee..9a2043a8 100644 --- a/tests/codex/slots/testslotbuilder.nim +++ b/tests/codex/slots/testslotbuilder.nim @@ -133,7 +133,7 @@ suite "Slot builder": check: Poseidon2Builder.new(localStore, mismatchManifest, cellSize = cellSize).error.msg == - "Number of blocks must be divisable by number of slots." + "Number of blocks must be divisible by number of slots." test "Block size must be divisable by cell size": let mismatchManifest = Manifest.new( @@ -151,7 +151,7 @@ suite "Slot builder": check: Poseidon2Builder.new(localStore, mismatchManifest, cellSize = cellSize).error.msg == - "Block size must be divisable by cell size." + "Block size must be divisible by cell size." test "Should build correct slot builder": builder = diff --git a/tests/codex/stores/repostore/testcoders.nim b/tests/codex/stores/repostore/testcoders.nim index f4d2b5e7..9d341af0 100644 --- a/tests/codex/stores/repostore/testcoders.nim +++ b/tests/codex/stores/repostore/testcoders.nim @@ -1,6 +1,6 @@ -import std/unittest import std/random +import pkg/unittest2 import pkg/stew/objects import pkg/questionable import pkg/questionable/results @@ -11,7 +11,7 @@ import pkg/codex/stores/repostore/coders import ../../helpers -checksuite "Test coders": +suite "Test coders": proc rand(T: type NBytes): T = rand(Natural).NBytes diff --git a/tests/codex/stores/testcachestore.nim b/tests/codex/stores/testcachestore.nim index e7025388..03075e1a 100644 --- a/tests/codex/stores/testcachestore.nim +++ b/tests/codex/stores/testcachestore.nim @@ -11,7 +11,7 @@ import ./commonstoretests import ../../asynctest import ../helpers -checksuite "Cache Store": +suite "Cache Store": var newBlock, newBlock1, newBlock2, newBlock3: Block store: CacheStore diff --git a/tests/codex/stores/testkeyutils.nim b/tests/codex/stores/testkeyutils.nim index 238e2681..86365c5c 100644 --- a/tests/codex/stores/testkeyutils.nim +++ b/tests/codex/stores/testkeyutils.nim @@ -36,7 +36,7 @@ proc createManifestCid(): ?!Cid = let cid = ?Cid.init(version, codec, hash).mapFailure return success cid -checksuite "KeyUtils": +suite "KeyUtils": test "makePrefixKey should create block key": let length = 6 let cid = Cid.example diff --git a/tests/codex/stores/testmaintenance.nim b/tests/codex/stores/testmaintenance.nim index e5ff519e..89e75700 100644 --- a/tests/codex/stores/testmaintenance.nim +++ b/tests/codex/stores/testmaintenance.nim @@ -21,7 +21,7 @@ import ../examples import codex/stores/maintenance -checksuite "BlockMaintainer": +suite "BlockMaintainer": var mockRepoStore: MockRepoStore var interval: Duration var mockTimer: MockTimer diff --git a/tests/codex/stores/testrepostore.nim b/tests/codex/stores/testrepostore.nim index 0279b56f..5274d046 100644 --- a/tests/codex/stores/testrepostore.nim +++ b/tests/codex/stores/testrepostore.nim @@ -24,7 +24,7 @@ import ../helpers/mockclock import ../examples import ./commonstoretests -checksuite "Test RepoStore start/stop": +suite "Test RepoStore start/stop": var repoDs: Datastore metaDs: Datastore diff --git a/tests/codex/testasyncheapqueue.nim b/tests/codex/testasyncheapqueue.nim index a9c6769b..2d2cfb0c 100644 --- a/tests/codex/testasyncheapqueue.nim +++ b/tests/codex/testasyncheapqueue.nim @@ -22,7 +22,7 @@ proc toSortedSeq[T](h: AsyncHeapQueue[T], queueType = QueueType.Min): seq[T] = while tmp.len > 0: result.add(popNoWait(tmp).tryGet()) -checksuite "Synchronous tests": +suite "Synchronous tests": test "Test pushNoWait - Min": var heap = newAsyncHeapQueue[int]() let data = [1, 3, 5, 7, 9, 2, 4, 6, 8, 0] diff --git a/tests/codex/testchunking.nim b/tests/codex/testchunking.nim index 2241a82b..44202c40 100644 --- a/tests/codex/testchunking.nim +++ b/tests/codex/testchunking.nim @@ -27,7 +27,7 @@ asyncchecksuite "Chunking": let contents = [1.byte, 2, 3, 4, 5, 6, 7, 8, 9, 0] proc reader( data: ChunkBuffer, len: int - ): Future[int] {.gcsafe, async, raises: [Defect].} = + ): Future[int] {.gcsafe, async: (raises: [ChunkerError, CancelledError]).} = let read = min(contents.len - offset, len) if read == 0: return 0 @@ -97,8 +97,13 @@ asyncchecksuite "Chunking": discard (await chunker.getBytes()) test "stream should forward LPStreamError": - expect LPStreamError: + try: await raiseStreamException(newException(LPStreamError, "test error")) + except ChunkerError as exc: + check exc.parent of LPStreamError + except CatchableError as exc: + checkpoint("Unexpected error: " & exc.msg) + fail() test "stream should catch LPStreamEOFError": await raiseStreamException(newException(LPStreamEOFError, "test error")) @@ -106,7 +111,3 @@ asyncchecksuite "Chunking": test "stream should forward CancelledError": expect CancelledError: await raiseStreamException(newException(CancelledError, "test error")) - - test "stream should forward LPStreamError": - expect LPStreamError: - await raiseStreamException(newException(LPStreamError, "test error")) diff --git a/tests/codex/testclock.nim b/tests/codex/testclock.nim index 2b0158cf..967de672 100644 --- a/tests/codex/testclock.nim +++ b/tests/codex/testclock.nim @@ -1,9 +1,9 @@ -import std/unittest +import pkg/unittest2 import codex/clock import ./helpers -checksuite "Clock": +suite "Clock": proc testConversion(seconds: SecondsSince1970) = let asBytes = seconds.toBytes diff --git a/tests/codex/testlogutils.nim b/tests/codex/testlogutils.nim index b2694ee9..2077fb81 100644 --- a/tests/codex/testlogutils.nim +++ b/tests/codex/testlogutils.nim @@ -1,6 +1,7 @@ import std/options import std/strutils -import std/unittest + +import pkg/unittest2 import pkg/codex/blocktype import pkg/codex/conf import pkg/codex/contracts/requests diff --git a/tests/codex/testmanifest.nim b/tests/codex/testmanifest.nim index 241bec61..ea9465d5 100644 --- a/tests/codex/testmanifest.nim +++ b/tests/codex/testmanifest.nim @@ -13,7 +13,7 @@ import ../asynctest import ./helpers import ./examples -checksuite "Manifest": +suite "Manifest": let manifest = Manifest.new(treeCid = Cid.example, blockSize = 1.MiBs, datasetSize = 100.MiBs) diff --git a/tests/codex/testpurchasing.nim b/tests/codex/testpurchasing.nim index 5a4e85e9..1834ee03 100644 --- a/tests/codex/testpurchasing.nim +++ b/tests/codex/testpurchasing.nim @@ -116,7 +116,7 @@ asyncchecksuite "Purchasing": await purchase.wait() check market.withdrawn == @[request.id] -checksuite "Purchasing state machine": +suite "Purchasing state machine": var purchasing: Purchasing var market: MockMarket var clock: MockClock diff --git a/tests/codex/testsystemclock.nim b/tests/codex/testsystemclock.nim index 6f743283..3f179260 100644 --- a/tests/codex/testsystemclock.nim +++ b/tests/codex/testsystemclock.nim @@ -1,10 +1,10 @@ import std/times -import std/unittest -import codex/systemclock +import pkg/unittest2 +import pkg/codex/systemclock import ./helpers -checksuite "SystemClock": +suite "SystemClock": test "Should get now": let clock = SystemClock.new() diff --git a/tests/codex/utils/testiter.nim b/tests/codex/utils/testiter.nim index 801e1937..ec19c484 100644 --- a/tests/codex/utils/testiter.nim +++ b/tests/codex/utils/testiter.nim @@ -7,7 +7,7 @@ import pkg/codex/utils/iter import ../../asynctest import ../helpers -checksuite "Test Iter": +suite "Test Iter": test "Should be finished": let iter = Iter[int].empty() diff --git a/tests/codex/utils/testkeyutils.nim b/tests/codex/utils/testkeyutils.nim index 2124e682..104258f3 100644 --- a/tests/codex/utils/testkeyutils.nim +++ b/tests/codex/utils/testkeyutils.nim @@ -1,12 +1,14 @@ -import std/unittest import std/os -import codex/utils/keyutils + +import pkg/unittest2 +import pkg/codex/utils/keyutils + import ../helpers when defined(windows): import stew/windows/acl -checksuite "keyutils": +suite "keyutils": let path = getTempDir() / "CodexTest" setup: diff --git a/tests/codex/utils/testoptions.nim b/tests/codex/utils/testoptions.nim index 05f7509e..650715bc 100644 --- a/tests/codex/utils/testoptions.nim +++ b/tests/codex/utils/testoptions.nim @@ -1,8 +1,9 @@ -import std/unittest -import codex/utils/options +import pkg/unittest2 +import pkg/codex/utils/options + import ../helpers -checksuite "optional casts": +suite "optional casts": test "casting value to same type works": check 42 as int == some 42 @@ -31,7 +32,7 @@ checksuite "optional casts": check 42.some as string == string.none check int.none as int == int.none -checksuite "Optionalize": +suite "Optionalize": test "does not except non-object types": static: doAssert not compiles(Optionalize(int)) diff --git a/tests/codex/utils/testtrackedfutures.nim b/tests/codex/utils/testtrackedfutures.nim index 35074919..993d5b43 100644 --- a/tests/codex/utils/testtrackedfutures.nim +++ b/tests/codex/utils/testtrackedfutures.nim @@ -17,47 +17,71 @@ asyncchecksuite "tracked futures": check module.trackedFutures.len == 0 test "tracks unfinished futures": - let fut = newFuture[void]("test") + let fut = Future[void].Raising([]).init("test", {FutureFlag.OwnCancelSchedule}) module.trackedFutures.track(fut) check module.trackedFutures.len == 1 test "does not track completed futures": - let fut = newFuture[void]("test") + let fut = Future[void].Raising([]).init("test", {FutureFlag.OwnCancelSchedule}) fut.complete() module.trackedFutures.track(fut) - check eventually module.trackedFutures.len == 0 - - test "does not track failed futures": - let fut = newFuture[void]("test") - fut.fail((ref CatchableError)(msg: "some error")) - module.trackedFutures.track(fut) - check eventually module.trackedFutures.len == 0 + check module.trackedFutures.len == 0 test "does not track cancelled futures": - let fut = newFuture[void]("test") + let fut = Future[void].Raising([]).init("test", {FutureFlag.OwnCancelSchedule}) + fut.cancelCallback = proc(data: pointer) = + fut.cancelAndSchedule() # manually schedule the cancel + await fut.cancelAndWait() module.trackedFutures.track(fut) check eventually module.trackedFutures.len == 0 test "removes tracked future when finished": - let fut = newFuture[void]("test") + let fut = Future[void].Raising([]).init("test", {FutureFlag.OwnCancelSchedule}) module.trackedFutures.track(fut) + check module.trackedFutures.len == 1 fut.complete() check eventually module.trackedFutures.len == 0 test "removes tracked future when cancelled": - let fut = newFuture[void]("test") + let fut = Future[void].Raising([]).init("test", {FutureFlag.OwnCancelSchedule}) + fut.cancelCallback = proc(data: pointer) = + fut.cancelAndSchedule() # manually schedule the cancel + module.trackedFutures.track(fut) + check module.trackedFutures.len == 1 + await fut.cancelAndWait() + check eventually module.trackedFutures.len == 0 + + test "completed and removes future on cancel": + let fut = Future[void].Raising([]).init("test", {FutureFlag.OwnCancelSchedule}) + fut.cancelCallback = proc(data: pointer) = + fut.complete() + + module.trackedFutures.track(fut) + check module.trackedFutures.len == 1 await fut.cancelAndWait() check eventually module.trackedFutures.len == 0 test "cancels and removes all tracked futures": - let fut1 = newFuture[void]("test1") - let fut2 = newFuture[void]("test2") - let fut3 = newFuture[void]("test3") + let fut1 = Future[void].Raising([]).init("test1", {FutureFlag.OwnCancelSchedule}) + fut1.cancelCallback = proc(data: pointer) = + fut1.cancelAndSchedule() # manually schedule the cancel + + let fut2 = Future[void].Raising([]).init("test2", {FutureFlag.OwnCancelSchedule}) + fut2.cancelCallback = proc(data: pointer) = + fut2.cancelAndSchedule() # manually schedule the cancel + + let fut3 = Future[void].Raising([]).init("test3", {FutureFlag.OwnCancelSchedule}) + fut3.cancelCallback = proc(data: pointer) = + fut3.cancelAndSchedule() # manually schedule the cancel + module.trackedFutures.track(fut1) + check module.trackedFutures.len == 1 module.trackedFutures.track(fut2) + check module.trackedFutures.len == 2 module.trackedFutures.track(fut3) + check module.trackedFutures.len == 3 await module.trackedFutures.cancelTracked() check eventually fut1.cancelled check eventually fut2.cancelled diff --git a/tests/codex/utils/testutils.nim b/tests/codex/utils/testutils.nim index 92c883be..b0bb20b5 100644 --- a/tests/codex/utils/testutils.nim +++ b/tests/codex/utils/testutils.nim @@ -1,4 +1,4 @@ -import std/unittest +import pkg/unittest2 import pkg/codex/utils diff --git a/tests/helpers.nim b/tests/helpers.nim index a6a6ff44..82b544f1 100644 --- a/tests/helpers.nim +++ b/tests/helpers.nim @@ -2,4 +2,36 @@ import helpers/multisetup import helpers/trackers import helpers/templeveldb +import std/sequtils, chronos + export multisetup, trackers, templeveldb + +### taken from libp2p errorhelpers.nim +proc allFuturesThrowing*(args: varargs[FutureBase]): Future[void] = + # This proc is only meant for use in tests / not suitable for general use. + # - Swallowing errors arbitrarily instead of aggregating them is bad design + # - It raises `CatchableError` instead of the union of the `futs` errors, + # inflating the caller's `raises` list unnecessarily. `macro` could fix it + let futs = @args + ( + proc() {.async: (raises: [CatchableError]).} = + await allFutures(futs) + var firstErr: ref CatchableError + for fut in futs: + if fut.failed: + let err = fut.error() + if err of CancelledError: + raise err + if firstErr == nil: + firstErr = err + if firstErr != nil: + raise firstErr + )() + +proc allFuturesThrowing*[T](futs: varargs[Future[T]]): Future[void] = + allFuturesThrowing(futs.mapIt(FutureBase(it))) + +proc allFuturesThrowing*[T, E]( # https://github.com/nim-lang/Nim/issues/23432 + futs: varargs[InternalRaisesFuture[T, E]] +): Future[void] = + allFuturesThrowing(futs.mapIt(FutureBase(it))) diff --git a/tests/helpers/trackers.nim b/tests/helpers/trackers.nim index ed8c5692..898053c2 100644 --- a/tests/helpers/trackers.nim +++ b/tests/helpers/trackers.nim @@ -1,5 +1,5 @@ import pkg/codex/streams/storestream -import std/unittest +import pkg/unittest2 # From lip2p/tests/helpers const trackerNames = [StoreStreamTrackerName] diff --git a/vendor/nim-serde b/vendor/nim-serde index c82e85c6..5ced7c88 160000 --- a/vendor/nim-serde +++ b/vendor/nim-serde @@ -1 +1 @@ -Subproject commit c82e85c62436218592fbe876df5ac389ef8b964b +Subproject commit 5ced7c88b97d99c582285ce796957fb71fd42434