diff --git a/codex/blockexchange/engine/engine.nim b/codex/blockexchange/engine/engine.nim index 33b0e6a1..b4ed734b 100644 --- a/codex/blockexchange/engine/engine.nim +++ b/codex/blockexchange/engine/engine.nim @@ -130,16 +130,15 @@ proc stop*(b: BlockExcEngine) {.async.} = proc sendWantHave( b: BlockExcEngine, addresses: seq[BlockAddress], - excluded: seq[BlockExcPeerCtx], peers: seq[BlockExcPeerCtx]): Future[void] {.async.} = for p in peers: - if p notin excluded: - let toAsk = addresses.filterIt(it notin p.peerHave) - trace "Sending wantHave request", toAsk, peer = p.id - await b.network.request.sendWantList( - p.id, - toAsk, - wantType = WantType.WantHave) + let toAsk = addresses.filterIt(it notin p.peerHave) + trace "Sending wantHave request", toAsk, peer = p.id + await b.network.request.sendWantList( + p.id, + toAsk, + wantType = WantType.WantHave) + codex_block_exchange_want_have_lists_sent.inc() proc sendWantBlock( b: BlockExcEngine, @@ -150,6 +149,7 @@ proc sendWantBlock( blockPeer.id, addresses, wantType = WantType.WantBlock) # we want this remote to send us a block + codex_block_exchange_want_block_lists_sent.inc() proc monitorBlockHandle( b: BlockExcEngine, @@ -175,6 +175,9 @@ proc monitorBlockHandle( await b.network.switch.disconnect(peerId) b.discovery.queueFindBlocksReq(@[address.cidOrTreeCid]) +proc pickPseudoRandom(address: BlockAddress, peers: seq[BlockExcPeerCtx]): BlockExcPeerCtx = + return peers[hash(address) mod peers.len] + proc requestBlock*( b: BlockExcEngine, address: BlockAddress, @@ -182,26 +185,17 @@ proc requestBlock*( let blockFuture = b.pendingBlocks.getWantHandle(address, b.blockFetchTimeout) if not b.pendingBlocks.isInFlight(address): - let peers = b.peers.selectCheapest(address) - if peers.len == 0: + let peers = b.peers.getPeersForBlock(address) + + if peers.with.len == 0: b.discovery.queueFindBlocksReq(@[address.cidOrTreeCid]) - - let maybePeer = - if peers.len > 0: - peers[hash(address) mod peers.len].some - elif b.peers.len > 0: - toSeq(b.peers)[hash(address) mod b.peers.len].some - else: - BlockExcPeerCtx.none - - if peer =? maybePeer: - asyncSpawn b.monitorBlockHandle(blockFuture, address, peer.id) + else: + let selected = pickPseudoRandom(address, peers.with) + asyncSpawn b.monitorBlockHandle(blockFuture, address, selected.id) b.pendingBlocks.setInFlight(address) - # TODO: Send more block addresses if at all sensible. - await b.sendWantBlock(@[address], peer) - codex_block_exchange_want_block_lists_sent.inc() - await b.sendWantHave(@[address], @[peer], toSeq(b.peers)) - codex_block_exchange_want_have_lists_sent.inc() + await b.sendWantBlock(@[address], selected) + + await b.sendWantHave(@[address], peers.without) # Don't let timeouts bubble up. We can't be too broad here or we break # cancellations. diff --git a/codex/blockexchange/peers/peerctxstore.nim b/codex/blockexchange/peers/peerctxstore.nim index a64ecd22..575b658b 100644 --- a/codex/blockexchange/peers/peerctxstore.nim +++ b/codex/blockexchange/peers/peerctxstore.nim @@ -32,6 +32,9 @@ logScope: type PeerCtxStore* = ref object of RootObj peers*: OrderedTable[PeerId, BlockExcPeerCtx] + PeersForBlock* = ref object of RootObj + with*: seq[BlockExcPeerCtx] + without*: seq[BlockExcPeerCtx] iterator items*(self: PeerCtxStore): BlockExcPeerCtx = for p in self.peers.values: @@ -70,6 +73,15 @@ func peersWant*(self: PeerCtxStore, address: BlockAddress): seq[BlockExcPeerCtx] func peersWant*(self: PeerCtxStore, cid: Cid): seq[BlockExcPeerCtx] = toSeq(self.peers.values).filterIt( it.peerWants.anyIt( it.address.cidOrTreeCid == cid ) ) +proc getPeersForBlock*(self: PeerCtxStore, address: BlockAddress): PeersForBlock = + var res = PeersForBlock() + for peer in self: + if peer.peerHave.anyIt( it == address ): + res.with.add(peer) + else: + res.without.add(peer) + res + func selectCheapest*(self: PeerCtxStore, address: BlockAddress): seq[BlockExcPeerCtx] = # assume that the price for all leaves in a tree is the same let rootAddress = BlockAddress(leaf: false, cid: address.cidOrTreeCid)