From 1135a513d479a6f939e106e0a1342100ed7a0863 Mon Sep 17 00:00:00 2001 From: gmega Date: Fri, 6 Jun 2025 12:56:56 -0300 Subject: [PATCH] feat: cap how many blocks we can pack in a single message --- codex/blockexchange/engine/engine.nim | 128 +++++++++++-------- codex/blockexchange/engine/pendingblocks.nim | 5 +- codex/blockexchange/peers/peercontext.nim | 13 ++ codex/blockexchange/protobuf/message.nim | 1 - 4 files changed, 90 insertions(+), 57 deletions(-) diff --git a/codex/blockexchange/engine/engine.nim b/codex/blockexchange/engine/engine.nim index 54844f3c..c11ad0d1 100644 --- a/codex/blockexchange/engine/engine.nim +++ b/codex/blockexchange/engine/engine.nim @@ -66,6 +66,10 @@ declareCounter( const DefaultMaxPeersPerRequest* = 10 + # The default max message length of nim-libp2p is 100 megabytes, meaning we can + # in principle fit up to 1600 64k blocks per message, so 500 is well under + # that number. + DefaultMaxBlocksPerMessage = 500 DefaultTaskQueueSize = 100 DefaultConcurrentTasks = 10 @@ -82,6 +86,8 @@ type concurrentTasks: int # Number of concurrent peers we're serving at any given time trackedFutures: TrackedFutures # Tracks futures of blockexc tasks blockexcRunning: bool # Indicates if the blockexc task is running + maxBlocksPerMessage: int + # Maximum number of blocks we can squeeze in a single message pendingBlocks*: PendingBlocksManager # Blocks we're awaiting to be resolved wallet*: WalletRef # Nitro wallet for micropayments pricing*: ?Pricing # Optional bandwidth pricing @@ -154,7 +160,9 @@ proc sendWantBlock( ) # we want this remote to send us a block codex_block_exchange_want_block_lists_sent.inc() -proc refreshBlockKnowledge(self: BlockExcEngine, peer: BlockExcPeerCtx) {.async: (raises: [CancelledError]).} = +proc refreshBlockKnowledge( + self: BlockExcEngine, peer: BlockExcPeerCtx +) {.async: (raises: [CancelledError]).} = # broadcast our want list, the other peer will do the same if self.pendingBlocks.wantListLen > 0: let cids = toSeq(self.pendingBlocks.wantList) @@ -214,6 +222,9 @@ proc downloadInternal( await self.refreshBlockKnowledge() self.discovery.queueFindBlocksReq(@[address.cidOrTreeCid]) + # FIXME: blocks should not blindly reschedule themselves. Instead, + # we should only reschedule a block if the peer drops, or we are + # in endgame mode. await (handle or sleepAsync(self.pendingBlocks.retryInterval)) self.pendingBlocks.decRetries(address) @@ -231,7 +242,9 @@ proc downloadInternal( finally: self.pendingBlocks.setInFlight(address, false) -proc requestBlocks*(self: BlockExcEngine, addresses: seq[BlockAddress]): Future[seq[?!Block]] {.async: (raises: [CancelledError]).} = +proc requestBlocks*( + self: BlockExcEngine, addresses: seq[BlockAddress] +): Future[seq[?!Block]] {.async: (raises: [CancelledError]).} = var handles: seq[BlockHandle] # Adds all blocks to pendingBlocks before calling the first downloadInternal. This will # ensure that we don't send incomplete want lists. @@ -317,6 +330,7 @@ proc blockPresenceHandler*( if ourWantCids.len > 0: trace "Peer has blocks in our wantList", peer, wants = ourWantCids + # FIXME: this will result in duplicate requests for blocks if err =? 
catch(await self.sendWantBlock(ourWantCids, peerCtx)).errorOption: warn "Failed to send wantBlock to peer", peer, err = err.msg @@ -629,65 +643,73 @@ proc dropPeer*(self: BlockExcEngine, peer: PeerId) {.raises: [].} = # drop the peer from the peers table self.peers.remove(peer) +proc localLookup( + self: BlockExcEngine, e: WantListEntry +): Future[?!BlockDelivery] {.async: (raises: [CancelledError]).} = + if e.address.leaf: + (await self.localStore.getBlockAndProof(e.address.treeCid, e.address.index)).map( + (blkAndProof: (Block, CodexProof)) => + BlockDelivery( + address: e.address, blk: blkAndProof[0], proof: blkAndProof[1].some + ) + ) + else: + (await self.localStore.getBlock(e.address)).map( + (blk: Block) => BlockDelivery( + address: e.address, blk: blk, proof: CodexProof.none + ) + ) + +iterator splitBatches[T](sequence: seq[T], batchSize: int): seq[T] = + var batch: seq[T] + for element in sequence: + if batch.len == batchSize: + yield batch + batch = @[] + batch.add(element) + + if batch.len > 0: + yield batch + proc taskHandler*( - self: BlockExcEngine, task: BlockExcPeerCtx + self: BlockExcEngine, peerCtx: BlockExcPeerCtx ) {.gcsafe, async: (raises: [CancelledError, RetriesExhaustedError]).} = # Send to the peer blocks he wants to get, # if they present in our local store - # TODO: There should be all sorts of accounting of - # bytes sent/received here + # Blocks that are in flight have already been picked up by other tasks and + # should not be re-sent. + var wantedBlocks = peerCtx.peerWants.filterIt( + it.wantType == WantType.WantBlock and not peerCtx.isInFlight(it.address) + ) - var wantsBlocks = - task.peerWants.filterIt(it.wantType == WantType.WantBlock and not it.inFlight) + wantedBlocks.sort(SortOrder.Descending) - proc updateInFlight(addresses: seq[BlockAddress], inFlight: bool) = - for peerWant in task.peerWants.mitems: - if peerWant.address in addresses: - peerWant.inFlight = inFlight + for wantedBlock in wantedBlocks: + peerCtx.addInFlight(wantedBlock.address) - if wantsBlocks.len > 0: - # Mark wants as in-flight. - let wantAddresses = wantsBlocks.mapIt(it.address) - updateInFlight(wantAddresses, true) - wantsBlocks.sort(SortOrder.Descending) + try: + for batch in wantedBlocks.splitBatches(self.maxBlocksPerMessage): + var blockDeliveries: seq[BlockDelivery] + for wantedBlock in batch: + # I/O is blocking so looking up blocks sequentially is fine. + without blockDelivery =? await self.localLookup(wantedBlock), err: + error "Error getting block from local store", + err = err.msg, address = wantedBlock.address + peerCtx.removeInFlight(wantedBlock.address) + continue + blockDeliveries.add(blockDelivery) - proc localLookup(e: WantListEntry): Future[?!BlockDelivery] {.async.} = - if e.address.leaf: - (await self.localStore.getBlockAndProof(e.address.treeCid, e.address.index)).map( - (blkAndProof: (Block, CodexProof)) => - BlockDelivery( - address: e.address, blk: blkAndProof[0], proof: blkAndProof[1].some - ) - ) - else: - (await self.localStore.getBlock(e.address)).map( - (blk: Block) => - BlockDelivery(address: e.address, blk: blk, proof: CodexProof.none) - ) - - let - blocksDeliveryFut = await allFinished(wantsBlocks.map(localLookup)) - blocksDelivery = blocksDeliveryFut.filterIt(it.completed and it.value.isOk).mapIt: - if bd =? it.value: - bd - else: - raiseAssert "Unexpected error in local lookup" - - # All the wants that failed local lookup must be set to not-in-flight again. 
- let - successAddresses = blocksDelivery.mapIt(it.address) - failedAddresses = wantAddresses.filterIt(it notin successAddresses) - updateInFlight(failedAddresses, false) - - if blocksDelivery.len > 0: - trace "Sending blocks to peer", - peer = task.id, blocks = (blocksDelivery.mapIt(it.address)) - await self.network.request.sendBlocksDelivery(task.id, blocksDelivery) - - codex_block_exchange_blocks_sent.inc(blocksDelivery.len.int64) - - task.peerWants.keepItIf(it.address notin successAddresses) + await self.network.request.sendBlocksDelivery(peerCtx.id, blockDeliveries) + codex_block_exchange_blocks_sent.inc(blockDeliveries.len.int64) + # Drops the batch from want list. Note that the send might still fail down the line + # and we will have removed them anyway, at which point we rely on the requester + # performing a retry for the request to succeed. + peerCtx.peerWants.keepItIf(it.address notin blockDeliveries.mapIt(it.address)) + finally: + # Better safe than sorry: if an exception does happen, we don't want to keep + # those in flight as it'll effectively prevent the blocks from ever being sent. + peerCtx.blocksInFlight.keepItIf(it notin wantedBlocks.mapIt(it.address)) proc blockexcTaskRunner(self: BlockExcEngine) {.async: (raises: []).} = ## process tasks @@ -712,6 +734,7 @@ proc new*( advertiser: Advertiser, peerStore: PeerCtxStore, pendingBlocks: PendingBlocksManager, + maxBlocksPerMessage = DefaultMaxBlocksPerMessage, concurrentTasks = DefaultConcurrentTasks, ): BlockExcEngine = ## Create new block exchange engine instance @@ -725,6 +748,7 @@ proc new*( wallet: wallet, concurrentTasks: concurrentTasks, trackedFutures: TrackedFutures(), + maxBlocksPerMessage: maxBlocksPerMessage, taskQueue: newAsyncHeapQueue[BlockExcPeerCtx](DefaultTaskQueueSize), discovery: discovery, advertiser: advertiser, diff --git a/codex/blockexchange/engine/pendingblocks.nim b/codex/blockexchange/engine/pendingblocks.nim index 4ee9feaf..eee66bc2 100644 --- a/codex/blockexchange/engine/pendingblocks.nim +++ b/codex/blockexchange/engine/pendingblocks.nim @@ -34,7 +34,7 @@ declareGauge( const DefaultBlockRetries* = 3000 - DefaultRetryInterval* = 10.seconds + DefaultRetryInterval* = 180.seconds type RetriesExhaustedError* = object of CatchableError @@ -124,9 +124,6 @@ proc resolve*( blockReq.handle.complete(bd.blk) codex_block_exchange_retrieval_time_us.set(retrievalDurationUs) - - if retrievalDurationUs > 500000: - warn "High block retrieval time", retrievalDurationUs, address = bd.address else: trace "Block handle already finished", address = bd.address diff --git a/codex/blockexchange/peers/peercontext.nim b/codex/blockexchange/peers/peercontext.nim index aeb83de7..a4b0329a 100644 --- a/codex/blockexchange/peers/peercontext.nim +++ b/codex/blockexchange/peers/peercontext.nim @@ -34,10 +34,23 @@ type BlockExcPeerCtx* = ref object of RootObj lastRefresh*: Moment # last time we refreshed our knowledge of the blocks this peer has account*: ?Account # ethereum account of this peer paymentChannel*: ?ChannelId # payment channel id + blocksInFlight*: seq[BlockAddress] # blocks in flight towards peer proc isKnowledgeStale*(self: BlockExcPeerCtx): bool = self.lastRefresh + 15.seconds < Moment.now() +proc isInFlight*(self: BlockExcPeerCtx, address: BlockAddress): bool = + address in self.blocksInFlight + +proc addInFlight*(self: BlockExcPeerCtx, address: BlockAddress) = + if not self.isInFlight(address): + self.blocksInFlight.add(address) + +proc removeInFlight*(self: BlockExcPeerCtx, address: BlockAddress) = + let index 
= self.blocksInFlight.find(address) + if index != -1: + self.blocksInFlight.delete(index) + proc refreshed*(self: BlockExcPeerCtx) = self.lastRefresh = Moment.now() diff --git a/codex/blockexchange/protobuf/message.nim b/codex/blockexchange/protobuf/message.nim index 4db89729..a3384416 100644 --- a/codex/blockexchange/protobuf/message.nim +++ b/codex/blockexchange/protobuf/message.nim @@ -29,7 +29,6 @@ type cancel*: bool # Whether this revokes an entry wantType*: WantType # Note: defaults to enum 0, ie Block sendDontHave*: bool # Note: defaults to false - inFlight*: bool # Whether block sending is in progress. Not serialized. WantList* = object entries*: seq[WantListEntry] # A list of wantList entries
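
For reference, below is a minimal, standalone sketch (not part of the patch) of the batching behaviour this change introduces. The splitBatches iterator mirrors the one added to engine.nim, and the numbers are taken from the comments in the patch: 500 blocks per message (DefaultMaxBlocksPerMessage), 64 KiB blocks, and a nim-libp2p message limit of "100 megabytes", which is assumed here to mean 100 MiB so that the 1600-blocks-per-message figure in the comment works out. The wantList stand-in and the echo output are illustrative only.

# sketch.nim -- compile and run with: nim r sketch.nim
iterator splitBatches[T](sequence: seq[T], batchSize: int): seq[T] =
  ## Yields consecutive slices of `sequence` with at most `batchSize` elements each,
  ## matching the iterator added to engine.nim in this patch.
  var batch: seq[T]
  for element in sequence:
    if batch.len == batchSize:
      yield batch
      batch = @[]
    batch.add(element)
  if batch.len > 0:
    yield batch

when isMainModule:
  const
    maxBlocksPerMessage = 500          # mirrors DefaultMaxBlocksPerMessage
    blockSize = 64 * 1024              # 64 KiB blocks, per the patch comment
    maxMessageLen = 100 * 1024 * 1024  # assumed reading of the "100 megabytes" libp2p limit

  # 100 MiB / 64 KiB = 1600 blocks would fit in principle; 500 stays well under that.
  doAssert maxMessageLen div blockSize == 1600

  # Stand-in for a want list of 1600 wanted block addresses.
  let wantList = newSeq[int](1600)
  for batch in wantList.splitBatches(maxBlocksPerMessage):
    # taskHandler would build one BlocksDelivery message per batch here.
    echo "would send one message carrying ", batch.len, " blocks"
    # prints 500, 500, 500 and finally 100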