diff --git a/codex/blockexchange/engine/engine.nim b/codex/blockexchange/engine/engine.nim index 453dea4f..ee706e3c 100644 --- a/codex/blockexchange/engine/engine.nim +++ b/codex/blockexchange/engine/engine.nim @@ -279,7 +279,7 @@ proc downloadInternal( if not self.pendingBlocks.isRequested(address): let peer = self.selectPeer(peers.with) self.pendingBlocks.markRequested(address, peer.id) - peer.blockRequested(address) + peer.blockRequestScheduled(address) trace "Request block from block retry loop" await self.sendWantBlock(@[address], peer) peer @@ -412,7 +412,7 @@ proc blockPresenceHandler*( for address in ourWantCids: self.pendingBlocks.decRetries(address) self.pendingBlocks.markRequested(address, peer) - peerCtx.blockRequested(address) + peerCtx.blockRequestScheduled(address) if ourWantCids.len > 0: trace "Peer has blocks in our wantList", peer, wants = ourWantCids @@ -773,6 +773,8 @@ proc taskHandler*( wantedBlocks = peerCtx.wantedBlocks.filterIt(not peerCtx.isBlockSent(it)) sent: HashSet[BlockAddress] + trace "Running task for peer", peer = peerCtx.id + for wantedBlock in wantedBlocks: peerCtx.markBlockAsSent(wantedBlock) diff --git a/codex/blockexchange/peers/peercontext.nim b/codex/blockexchange/peers/peercontext.nim index 5d805023..c917b7ee 100644 --- a/codex/blockexchange/peers/peercontext.nim +++ b/codex/blockexchange/peers/peercontext.nim @@ -80,13 +80,16 @@ func price*(self: BlockExcPeerCtx, addresses: seq[BlockAddress]): UInt256 = price -proc blockRequested*(self: BlockExcPeerCtx, address: BlockAddress) = - # We start counting the timeout from the first block requested. +proc blockRequestScheduled*(self: BlockExcPeerCtx, address: BlockAddress) = + ## Adds a block the set of blocks that have been requested to this peer + ## (its request schedule). if self.blocksRequested.len == 0: self.lastExchange = Moment.now() self.blocksRequested.incl(address) proc blockRequestCancelled*(self: BlockExcPeerCtx, address: BlockAddress) = + ## Removes a block from the set of blocks that have been requested to this peer + ## (its request schedule). self.blocksRequested.excl(address) proc blockReceived*(self: BlockExcPeerCtx, address: BlockAddress): bool = diff --git a/tests/codex/blockexchange/engine/testblockexc.nim b/tests/codex/blockexchange/engine/testblockexc.nim index 108ca539..e4bcf764 100644 --- a/tests/codex/blockexchange/engine/testblockexc.nim +++ b/tests/codex/blockexchange/engine/testblockexc.nim @@ -96,6 +96,8 @@ asyncchecksuite "NetworkStore engine - 2 nodes": test "Should send want-have for block": let blk = bt.Block.new("Block 1".toBytes).tryGet() let blkFut = nodeCmps1.pendingBlocks.getWantHandle(blk.cid) + peerCtx2.blockRequestScheduled(blk.address) + (await nodeCmps2.localStore.putBlock(blk)).tryGet() peerCtx1.wantedBlocks.incl(blk.address) diff --git a/tests/codex/blockexchange/engine/testengine.nim b/tests/codex/blockexchange/engine/testengine.nim index 7c3933c5..1afe2147 100644 --- a/tests/codex/blockexchange/engine/testengine.nim +++ b/tests/codex/blockexchange/engine/testengine.nim @@ -242,7 +242,7 @@ asyncchecksuite "NetworkStore engine handlers": let pending = blocks.mapIt(engine.pendingBlocks.getWantHandle(it.cid)) for blk in blocks: - peerCtx.blockRequested(blk.address) + peerCtx.blockRequestScheduled(blk.address) let blocksDelivery = blocks.mapIt(BlockDelivery(blk: it, address: it.address)) @@ -270,7 +270,7 @@ asyncchecksuite "NetworkStore engine handlers": ).toTable for blk in blocks: - peerContext.blockRequested(blk.address) + peerContext.blockRequestScheduled(blk.address) engine.network = BlockExcNetwork( request: BlockExcRequest( @@ -349,10 +349,10 @@ asyncchecksuite "NetworkStore engine handlers": engine.peers.add(senderPeerCtx) for address in reqBlockAddrs: - pendingPeerCtx.blockRequested(address) + pendingPeerCtx.blockRequestScheduled(address) for address in blocks.mapIt(it.address): - senderPeerCtx.blockRequested(address) + senderPeerCtx.blockRequestScheduled(address) proc sendWantCancellations( id: PeerId, addresses: seq[BlockAddress] diff --git a/tests/codex/blockexchange/testnetwork.nim b/tests/codex/blockexchange/testnetwork.nim index b9a51c9d..ab19a6ae 100644 --- a/tests/codex/blockexchange/testnetwork.nim +++ b/tests/codex/blockexchange/testnetwork.nim @@ -40,7 +40,7 @@ asyncchecksuite "Network - Handlers": done = newFuture[void]() buffer = BufferStream.new() network = BlockExcNetwork.new(switch = newStandardSwitch(), connProvider = getConn) - network.setupPeer(peerId) + await network.handlePeerJoined(peerId) networkPeer = network.peers[peerId] discard await networkPeer.connect()