From f529a70bd42e05c8a29d57d904deefaf628ac044 Mon Sep 17 00:00:00 2001 From: gmega Date: Mon, 21 Apr 2025 17:54:03 -0300 Subject: [PATCH] fill request schedules after: _i)_ new block knowledge is received; _ii)_ new blocks are received. --- codex/blockexchange/swarm.nim | 50 +++++++++++++++-------------------- 1 file changed, 22 insertions(+), 28 deletions(-) diff --git a/codex/blockexchange/swarm.nim b/codex/blockexchange/swarm.nim index 529b4b5b..e2a8680f 100644 --- a/codex/blockexchange/swarm.nim +++ b/codex/blockexchange/swarm.nim @@ -39,7 +39,7 @@ type # Last time we refreshed block knowledge from peer lastRefresh: Moment # Pending knowledge update request - blockKnowledgeRequest: Future[void].Raising([CancelledError]) + pendingBlockKnowledgeRequest: bool Swarm* = ref object of RootObj # Dataset Manifest manifest: Manifest @@ -139,15 +139,13 @@ proc refreshBlockKnowledge( trace "Asking for block knowledge to peer", peer = peer try: - if not self.peers[peer].blockKnowledgeRequest.isNil: + if self.peers[peer].pendingBlockKnowledgeRequest: trace "Pending knowledge update already in progress", peer = peer return trace "Setup reply future for block knowledge request" - self.peers[peer].blockKnowledgeRequest = Future[void].Raising([CancelledError]).init( - "codex.blockexchange.swarm.refreshBlockKnowledge" - ) + self.peers[peer].pendingBlockKnowledgeRequest = true # We abuse the want list message to ask for block knowledge. await self.network.request.sendWantList( @@ -243,13 +241,16 @@ proc fillRequests*(self: Swarm, peer: PeerId) {.async: (raises: [CancelledError] for blockIndex in peerCtx.blockIndices: if peerCtx.pendingRequests >= self.maxPendingRequests: + trace "Max pending requests reached for peer, not sending new ones", peer = peer break # Already have the block. if self.blocks[blockIndex].completed: continue - # Skip busy blocks. + # Skip busy blocks. This is not very robust - we should + # allow busy blocks once we're done with the idle ones. We + # also need to return failed block requests to idle state. if self.blocks[blockIndex].requests > 0: continue @@ -336,23 +337,9 @@ proc setupPeer(self: Swarm, peer: PeerId) {.async: (raises: [CancelledError]).} trace "Setting up peer", peer = peer self.addNeighbor(peer) - + # Starts by asking peer for block knowledge. await self.refreshBlockKnowledge(peer) - # Don't fill the request schedule before we get a - # reply to the block knowledge request. - try: - trace "Await for reply to block knowledge request" - await self.peers[peer].blockKnowledgeRequest - except KeyError: - error "Cannot fill request schedule for unknown peer", peer = peer - return - - trace "Got initial block knowledge. Setup initial block request schedule", peer = peer - - # Fill initial request schedule to peer. - await self.fillRequests(peer) - proc handleBlockKnowledgeRequest(self: Swarm, peer: PeerId) {.async: (raises: []).} = trace "Handling block knowledge request from peer", peer = peer var presenceInfo: seq[BlockPresence] @@ -386,16 +373,23 @@ proc handleBlockKnowledgeResponse( trace "Peer has blocks", peer = peer, count = i - var fut = self.peers[peer].blockKnowledgeRequest - if fut.isNil: - error "Invalid state for knowledge update" - return - - fut.complete() - self.peers[peer].blockKnowledgeRequest = nil + # Learned some potentially new blocks for peer, maybe we can push + # some more requests. + await self.fillRequests(peer) except KeyError: error "Cannot update block presence for peer", peer = peer return + except CancelledError: + trace "Sending of block requests cancelled", peer = peer + finally: + try: + if not self.peers[peer].pendingBlockKnowledgeRequest: + trace "Illegal state for pending block knowledge request (already cleared)", + peer = peer + self.peers[peer].pendingBlockKnowledgeRequest = false + except KeyError: + trace "Error updating pending block knowledge request state (peer dropped)", + peer = peer proc handleBlockRequest( self: Swarm, peer: PeerId, addresses: seq[BlockAddress]