From db279d8fa9c67297e729189e9d13d8bf70e0a196 Mon Sep 17 00:00:00 2001 From: gmega Date: Mon, 30 Jun 2025 17:30:47 -0300 Subject: [PATCH] feat: remove quadratic joins in cancelBlocks; use SafeAsyncIterator for getBlocks; limit memory usage for fetchBatched when used as prefetcher --- codex/blockexchange/engine/engine.nim | 69 ++++++++++++++++--------- codex/node.nim | 38 ++++++++++---- codex/stores/blockstore.nim | 8 +++ codex/stores/networkstore.nim | 17 +++--- codex/stores/repostore/store.nim | 15 ++++++ tests/codex/utils/testsafeasynciter.nim | 4 +- 6 files changed, 108 insertions(+), 43 deletions(-) diff --git a/codex/blockexchange/engine/engine.nim b/codex/blockexchange/engine/engine.nim index 2550c364..bb46b62f 100644 --- a/codex/blockexchange/engine/engine.nim +++ b/codex/blockexchange/engine/engine.nim @@ -256,7 +256,7 @@ proc downloadInternal( proc requestBlocks*( self: BlockExcEngine, addresses: seq[BlockAddress] -): Future[seq[?!Block]] {.async: (raises: [CancelledError]).} = +): SafeAsyncIter[Block] = var handles: seq[BlockHandle] # Adds all blocks to pendingBlocks before calling the first downloadInternal. This will # ensure that we don't send incomplete want lists. @@ -267,20 +267,27 @@ proc requestBlocks*( for address in addresses: self.trackedFutures.track(self.downloadInternal(address)) - # TODO: we can reduce latency and improve download times - # by returning blocks out of order as futures complete. - var blocks: seq[?!Block] - for handle in handles: - try: - blocks.add(success await handle) - except CancelledError as err: - warn "Block request cancelled", addresses, err = err.msg - raise err - except CatchableError as err: - error "Error getting blocks from exchange engine", addresses, err = err.msg - blocks.add(Block.failure err) + var completed: int = 0 - return blocks + proc isFinished(): bool = + completed == handles.len + + proc genNext(): Future[?!Block] {.async: (raises: [CancelledError]).} = + # Be it success or failure, we're completing this future. + let value = + try: + success await handles[completed] + except CancelledError as err: + warn "Block request cancelled", addresses, err = err.msg + raise err + except CatchableError as err: + error "Error getting blocks from exchange engine", addresses, err = err.msg + failure err + + inc(completed) + return value + + return SafeAsyncIter[Block].new(genNext, isFinished) proc requestBlock*( self: BlockExcEngine, address: BlockAddress @@ -368,28 +375,42 @@ proc cancelBlocks( ## Tells neighboring peers that we're no longer interested in a block. ## + let addrSet = toHashSet(addrs) + var pendingCancellations: Table[PeerId, HashSet[BlockAddress]] + if self.peers.len == 0: return trace "Sending block request cancellations to peers", addrs, peers = self.peers.peerIds - proc processPeer(peerCtx: BlockExcPeerCtx): Future[BlockExcPeerCtx] {.async.} = + proc processPeer( + entry: tuple[peerId: PeerId, addresses: HashSet[BlockAddress]] + ): Future[PeerId] {.async: (raises: [CancelledError]).} = await self.network.request.sendWantCancellations( - peer = peerCtx.id, addresses = addrs.filterIt(it in peerCtx) + peer = entry.peerId, addresses = entry.addresses.toSeq ) - return peerCtx + return entry.peerId try: - let (succeededFuts, failedFuts) = await allFinishedFailed[BlockExcPeerCtx]( - toSeq(self.peers.peers.values).filterIt(it.peerHave.anyIt(it in addrs)).map( - processPeer - ) + # Does the peer have any of the blocks we're canceling? + for peerCtx in self.peers.peers.values: + let intersection = peerCtx.peerHave.intersection(addrSet) + if intersection.len > 0: + pendingCancellations[peerCtx.id] = intersection + + # If so, dispatches cancellations. + # FIXME: we're still spamming peers - the fact that the peer has the block does + # not mean we've requested it. + let (succeededFuts, failedFuts) = await allFinishedFailed[PeerId]( + toSeq(pendingCancellations.pairs).map(processPeer) ) - (await allFinished(succeededFuts)).mapIt(it.read).apply do(peerCtx: BlockExcPeerCtx): - peerCtx.cleanPresence(addrs) + (await allFinished(succeededFuts)).mapIt(it.read).apply do(peerId: PeerId): + let ctx = self.peers.get(peerId) + if not ctx.isNil: + ctx.cleanPresence(addrs) if failedFuts.len > 0: warn "Failed to send block request cancellations to peers", peers = failedFuts.len @@ -539,6 +560,8 @@ proc wantListHandler*( price = @(self.pricing.get(Pricing(price: 0.u256)).price.toBytesBE) if e.cancel: + # This is sort of expected if we sent the block to the peer, as we have removed + # it from the peer's wantlist ourselves. trace "Received cancelation for untracked block, skipping", address = e.address continue diff --git a/codex/node.nim b/codex/node.nim index ea173ad1..76029032 100644 --- a/codex/node.nim +++ b/codex/node.nim @@ -44,7 +44,7 @@ import ./indexingstrategy import ./utils import ./errors import ./logutils -import ./utils/asynciter +import ./utils/safeasynciter import ./utils/trackedfutures export logutils @@ -194,20 +194,38 @@ proc fetchBatched*( if not (await address in self.networkStore) or fetchLocal: address - let - blockResults = await self.networkStore.getBlocks(addresses) - blocks = blockResults.filterIt(it.isSuccess()).mapIt(it.value) - numOfFailedBlocks = blockResults.len - blocks.len + proc successful( + blk: ?!bt.Block + ): Future[bool] {.async: (raises: [CancelledError]).} = + return blk.isSuccess() - if numOfFailedBlocks > 0: - return - failure("Some blocks failed (Result) to fetch (" & $numOfFailedBlocks & ")") + let blockResults = await self.networkStore.getBlocks(addresses) - if not onBatch.isNil and batchErr =? (await onBatch(blocks)).errorOption: + var + successfulBlocks = 0 + failedBlocks = 0 + blockData: seq[bt.Block] + + for res in blockResults: + without blk =? await res: + inc(failedBlocks) + continue + + inc(successfulBlocks) + + # Only retains block data in memory if there's + # a callback. + if not onBatch.isNil: + blockData.add(blk) + + if failedBlocks > 0: + return failure("Some blocks failed (Result) to fetch (" & $failedBlocks & ")") + + if not onBatch.isNil and batchErr =? (await onBatch(blockData)).errorOption: return failure(batchErr) if not iter.finished: - await sleepAsync(1.millis) + await idleAsync() success() diff --git a/codex/stores/blockstore.nim b/codex/stores/blockstore.nim index bbe0bef8..6c49a859 100644 --- a/codex/stores/blockstore.nim +++ b/codex/stores/blockstore.nim @@ -65,6 +65,14 @@ method getBlock*( raiseAssert("getBlock by addr not implemented!") +method getBlocks*( + self: BlockStore, addresses: seq[BlockAddress] +): Future[SafeAsyncIter[Block]] {.async: (raises: [CancelledError]).} = + ## Gets a set of blocks from the blockstore. Blocks might + ## be returned in any order. + + raiseAssert("getBlocks not implemented!") + method getBlockAndProof*( self: BlockStore, treeCid: Cid, index: Natural ): Future[?!(Block, CodexProof)] {.base, async: (raises: [CancelledError]), gcsafe.} = diff --git a/codex/stores/networkstore.nim b/codex/stores/networkstore.nim index b20e878c..976a53cb 100644 --- a/codex/stores/networkstore.nim +++ b/codex/stores/networkstore.nim @@ -31,24 +31,23 @@ type NetworkStore* = ref object of BlockStore engine*: BlockExcEngine # blockexc decision engine localStore*: BlockStore # local block store -proc getBlocks*( +method getBlocks*( self: NetworkStore, addresses: seq[BlockAddress] -): Future[seq[?!Block]] {.async: (raises: [CancelledError]).} = +): Future[SafeAsyncIter[Block]] {.async: (raises: [CancelledError]).} = var - localBlocks: seq[?!Block] + localAddresses: seq[BlockAddress] remoteAddresses: seq[BlockAddress] - # We can resolve local blocks sequentially as for now those are blocking anyway. Still: - # TODO: implement getBlocks for local store so we can delegate it here. for address in addresses: if not (await address in self.localStore): remoteAddresses.add(address) else: - localBlocks.add(await self.localStore.getBlock(address)) + localAddresses.add(address) - let remoteBlocks = await self.engine.requestBlocks(remoteAddresses) - - return localBlocks.concat(remoteBlocks) + return chain( + await self.localStore.getBlocks(localAddresses), + self.engine.requestBlocks(remoteAddresses), + ) method getBlock*( self: NetworkStore, address: BlockAddress diff --git a/codex/stores/repostore/store.nim b/codex/stores/repostore/store.nim index bea2971c..ad6f03fc 100644 --- a/codex/stores/repostore/store.nim +++ b/codex/stores/repostore/store.nim @@ -38,6 +38,21 @@ logScope: # BlockStore API ########################################################### +method getBlocks*( + self: RepoStore, addresses: seq[BlockAddress] +): Future[SafeAsyncIter[Block]] {.async: (raises: [CancelledError]).} = + var i = 0 + + proc isFinished(): bool = + i == addresses.len + + proc genNext(): Future[?!Block] {.async: (raises: [CancelledError]).} = + let value = await self.getBlock(addresses[i]) + inc(i) + return value + + return SafeAsyncIter[Block].new(genNext, isFinished) + method getBlock*( self: RepoStore, cid: Cid ): Future[?!Block] {.async: (raises: [CancelledError]).} = diff --git a/tests/codex/utils/testsafeasynciter.nim b/tests/codex/utils/testsafeasynciter.nim index 7acd3640..87b0d84a 100644 --- a/tests/codex/utils/testsafeasynciter.nim +++ b/tests/codex/utils/testsafeasynciter.nim @@ -404,8 +404,10 @@ asyncchecksuite "Test SafeAsyncIter": expect CancelledError: for fut in iter2: - without i =? (await fut), err: + if i =? (await fut): collected.add(i) + else: + fail() check: # We expect only values "0" and "1" to be collected