From 475d31bef203a4fc910269731d270588736d2af7 Mon Sep 17 00:00:00 2001 From: gmega Date: Thu, 5 Jun 2025 15:30:08 -0300 Subject: [PATCH] feat: add dataset request batching --- codex/blockexchange/engine/discovery.nim | 2 + codex/blockexchange/engine/engine.nim | 64 +++++++++++++++++--- codex/blockexchange/engine/pendingblocks.nim | 5 +- codex/blockexchange/peers/peercontext.nim | 7 +++ codex/node.nim | 19 +++--- codex/stores/networkstore.nim | 20 ++++++ 6 files changed, 95 insertions(+), 22 deletions(-) diff --git a/codex/blockexchange/engine/discovery.nim b/codex/blockexchange/engine/discovery.nim index b32b8555..6288ceae 100644 --- a/codex/blockexchange/engine/discovery.nim +++ b/codex/blockexchange/engine/discovery.nim @@ -78,6 +78,8 @@ proc discoveryTaskLoop(b: DiscoveryEngine) {.async: (raises: []).} = trace "Discovery request already in progress", cid continue + trace "Running discovery task for cid", cid + let haves = b.peers.peersHave(cid) if haves.len < b.minPeersPerBlock: diff --git a/codex/blockexchange/engine/engine.nim b/codex/blockexchange/engine/engine.nim index 0d04fd7f..54844f3c 100644 --- a/codex/blockexchange/engine/engine.nim +++ b/codex/blockexchange/engine/engine.nim @@ -154,6 +154,28 @@ proc sendWantBlock( ) # we want this remote to send us a block codex_block_exchange_want_block_lists_sent.inc() +proc refreshBlockKnowledge(self: BlockExcEngine, peer: BlockExcPeerCtx) {.async: (raises: [CancelledError]).} = + # broadcast our want list, the other peer will do the same + if self.pendingBlocks.wantListLen > 0: + let cids = toSeq(self.pendingBlocks.wantList) + trace "Sending our want list to a peer", peer = peer.id, length = cids.len + await self.network.request.sendWantList(peer.id, cids, full = true) + +proc refreshBlockKnowledge(self: BlockExcEngine) {.async: (raises: [CancelledError]).} = + for peer in self.peers.peers.values: + # We refresh block knowledge if: + # 1. the peer hasn't been refreshed in a while; + # 2. the list of blocks we care about has actually changed. + # + # Note that because of (2), it is important that we update our + # want list in the coarsest way possible instead of over many + # small updates. + # + # In dynamic swarms, staleness will dominate latency. + if peer.lastRefresh < self.pendingBlocks.lastInclusion or peer.isKnowledgeStale: + await self.refreshBlockKnowledge(peer) + peer.refreshed() + proc randomPeer(peers: seq[BlockExcPeerCtx]): BlockExcPeerCtx = Rng.instance.sample(peers) @@ -189,7 +211,7 @@ proc downloadInternal( else: self.pendingBlocks.setInFlight(address, false) if peers.without.len > 0: - await self.sendWantHave(@[address], peers.without) + await self.refreshBlockKnowledge() self.discovery.queueFindBlocksReq(@[address.cidOrTreeCid]) await (handle or sleepAsync(self.pendingBlocks.retryInterval)) @@ -209,6 +231,32 @@ proc downloadInternal( finally: self.pendingBlocks.setInFlight(address, false) +proc requestBlocks*(self: BlockExcEngine, addresses: seq[BlockAddress]): Future[seq[?!Block]] {.async: (raises: [CancelledError]).} = + var handles: seq[BlockHandle] + # Adds all blocks to pendingBlocks before calling the first downloadInternal. This will + # ensure that we don't send incomplete want lists. + for address in addresses: + if address notin self.pendingBlocks: + handles.add(self.pendingBlocks.getWantHandle(address)) + + for address in addresses: + self.trackedFutures.track(self.downloadInternal(address)) + + # TODO: we can reduce latency and improve download times + # by returning blocks out of order as futures complete. + var blocks: seq[?!Block] + for handle in handles: + try: + blocks.add(success await handle) + except CancelledError as err: + warn "Block request cancelled", addresses, err = err.msg + raise err + except CatchableError as err: + error "Error getting blocks from exchange engine", addresses, err = err.msg + blocks.add(Block.failure err) + + return blocks + proc requestBlock*( self: BlockExcEngine, address: BlockAddress ): Future[?!Block] {.async: (raises: [CancelledError]).} = @@ -239,7 +287,7 @@ proc completeBlock*(self: BlockExcEngine, address: BlockAddress, blk: Block) = proc blockPresenceHandler*( self: BlockExcEngine, peer: PeerId, blocks: seq[BlockPresence] ) {.async: (raises: []).} = - trace "Received block presence from peer", peer, blocks = blocks.mapIt($it) + trace "Received block presence from peer", peer, len = blocks.len let peerCtx = self.peers.get(peer) ourWantList = toSeq(self.pendingBlocks.wantList) @@ -482,12 +530,14 @@ proc wantListHandler*( case e.wantType of WantType.WantHave: if have: + trace "We HAVE the block", address = e.address presence.add( BlockPresence( address: e.address, `type`: BlockPresenceType.Have, price: price ) ) else: + trace "We DON'T HAVE the block", address = e.address if e.sendDontHave: presence.add( BlockPresence( @@ -560,15 +610,11 @@ proc setupPeer*( trace "Setting up peer", peer if peer notin self.peers: + let peerCtx = BlockExcPeerCtx(id: peer) trace "Setting up new peer", peer - self.peers.add(BlockExcPeerCtx(id: peer)) + self.peers.add(peerCtx) trace "Added peer", peers = self.peers.len - - # broadcast our want list, the other peer will do the same - if self.pendingBlocks.wantListLen > 0: - trace "Sending our want list to a peer", peer - let cids = toSeq(self.pendingBlocks.wantList) - await self.network.request.sendWantList(peer, cids, full = true) + await self.refreshBlockKnowledge(peerCtx) if address =? self.pricing .? address: trace "Sending account to peer", peer diff --git a/codex/blockexchange/engine/pendingblocks.nim b/codex/blockexchange/engine/pendingblocks.nim index 80c88527..4ee9feaf 100644 --- a/codex/blockexchange/engine/pendingblocks.nim +++ b/codex/blockexchange/engine/pendingblocks.nim @@ -34,7 +34,7 @@ declareGauge( const DefaultBlockRetries* = 3000 - DefaultRetryInterval* = 500.millis + DefaultRetryInterval* = 10.seconds type RetriesExhaustedError* = object of CatchableError @@ -50,6 +50,7 @@ type blockRetries*: int = DefaultBlockRetries retryInterval*: Duration = DefaultRetryInterval blocks*: Table[BlockAddress, BlockReq] # pending Block requests + lastInclusion*: Moment # time at which we last included a block into our wantlist proc updatePendingBlockGauge(p: PendingBlocksManager) = codex_block_exchange_pending_block_requests.set(p.blocks.len.int64) @@ -70,6 +71,8 @@ proc getWantHandle*( startTime: getMonoTime().ticks, ) self.blocks[address] = blk + self.lastInclusion = Moment.now() + let handle = blk.handle proc cleanUpBlock(data: pointer) {.raises: [].} = diff --git a/codex/blockexchange/peers/peercontext.nim b/codex/blockexchange/peers/peercontext.nim index 7a299b6b..aeb83de7 100644 --- a/codex/blockexchange/peers/peercontext.nim +++ b/codex/blockexchange/peers/peercontext.nim @@ -31,9 +31,16 @@ type BlockExcPeerCtx* = ref object of RootObj peerWants*: seq[WantListEntry] # remote peers want lists exchanged*: int # times peer has exchanged with us lastExchange*: Moment # last time peer has exchanged with us + lastRefresh*: Moment # last time we refreshed our knowledge of the blocks this peer has account*: ?Account # ethereum account of this peer paymentChannel*: ?ChannelId # payment channel id +proc isKnowledgeStale*(self: BlockExcPeerCtx): bool = + self.lastRefresh + 15.seconds < Moment.now() + +proc refreshed*(self: BlockExcPeerCtx) = + self.lastRefresh = Moment.now() + proc peerHave*(self: BlockExcPeerCtx): seq[BlockAddress] = toSeq(self.blocks.keys) diff --git a/codex/node.nim b/codex/node.nim index 1ca471d5..115272c3 100644 --- a/codex/node.nim +++ b/codex/node.nim @@ -52,7 +52,7 @@ export logutils logScope: topics = "codex node" -const DefaultFetchBatch = 10 +const DefaultFetchBatch = 1_000_000 type Contracts* = @@ -187,23 +187,18 @@ proc fetchBatched*( # ) while not iter.finished: - let blockFutures = collect: + let addresses = collect: for i in 0 ..< batchSize: if not iter.finished: let address = BlockAddress.init(cid, iter.next()) if not (await address in self.networkStore) or fetchLocal: - self.networkStore.getBlock(address) + address - if blockFutures.len == 0: - continue + let + blockResults = await self.networkStore.getBlocks(addresses) + blocks = blockResults.filterIt(it.isSuccess()).mapIt(it.value) + numOfFailedBlocks = blockResults.len - blocks.len - without blockResults =? await allFinishedValues[?!bt.Block](blockFutures), err: - trace "Some blocks failed to fetch", err = err.msg - return failure(err) - - let blocks = blockResults.filterIt(it.isSuccess()).mapIt(it.value) - - let numOfFailedBlocks = blockResults.len - blocks.len if numOfFailedBlocks > 0: return failure("Some blocks failed (Result) to fetch (" & $numOfFailedBlocks & ")") diff --git a/codex/stores/networkstore.nim b/codex/stores/networkstore.nim index 06b96b77..f1e7a294 100644 --- a/codex/stores/networkstore.nim +++ b/codex/stores/networkstore.nim @@ -31,6 +31,26 @@ type NetworkStore* = ref object of BlockStore engine*: BlockExcEngine # blockexc decision engine localStore*: BlockStore # local block store +proc getBlocks*( + self: NetworkStore, + addresses: seq[BlockAddress] +): Future[seq[?!Block]] {.async: (raises: [CancelledError]).} = + var + localBlocks: seq[?!Block] + remoteAddresses: seq[BlockAddress] + + # We can resolve local blocks sequentially as for now those are blocking anyway. Still: + # TODO: implement getBlocks for local store so we can delegate it here. + for address in addresses: + if not (await address in self.localStore): + remoteAddresses.add(address) + else: + localBlocks.add(await self.localStore.getBlock(address)) + + let remoteBlocks = await self.engine.requestBlocks(remoteAddresses) + + return localBlocks.concat(remoteBlocks) + method getBlock*( self: NetworkStore, address: BlockAddress ): Future[?!Block] {.async: (raises: [CancelledError]).} =