sends wantBlock to peers with block. wantHave to everyone else

This commit is contained in:
Ben 2024-12-04 11:36:17 +01:00
parent 8e29939cf8
commit 3607b88e9d
No known key found for this signature in database
GPG Key ID: 0F16E812E736C24B
2 changed files with 32 additions and 26 deletions

View File

@ -130,16 +130,15 @@ proc stop*(b: BlockExcEngine) {.async.} =
proc sendWantHave(
b: BlockExcEngine,
addresses: seq[BlockAddress],
excluded: seq[BlockExcPeerCtx],
peers: seq[BlockExcPeerCtx]): Future[void] {.async.} =
for p in peers:
if p notin excluded:
let toAsk = addresses.filterIt(it notin p.peerHave)
trace "Sending wantHave request", toAsk, peer = p.id
await b.network.request.sendWantList(
p.id,
toAsk,
wantType = WantType.WantHave)
codex_block_exchange_want_have_lists_sent.inc()
proc sendWantBlock(
b: BlockExcEngine,
@ -150,6 +149,7 @@ proc sendWantBlock(
blockPeer.id,
addresses,
wantType = WantType.WantBlock) # we want this remote to send us a block
codex_block_exchange_want_block_lists_sent.inc()
proc monitorBlockHandle(
b: BlockExcEngine,
@ -175,6 +175,9 @@ proc monitorBlockHandle(
await b.network.switch.disconnect(peerId)
b.discovery.queueFindBlocksReq(@[address.cidOrTreeCid])
proc pickPseudoRandom(address: BlockAddress, peers: seq[BlockExcPeerCtx]): BlockExcPeerCtx =
return peers[hash(address) mod peers.len]
proc requestBlock*(
b: BlockExcEngine,
address: BlockAddress,
@ -182,26 +185,17 @@ proc requestBlock*(
let blockFuture = b.pendingBlocks.getWantHandle(address, b.blockFetchTimeout)
if not b.pendingBlocks.isInFlight(address):
let peers = b.peers.selectCheapest(address)
if peers.len == 0:
let peers = b.peers.getPeersForBlock(address)
if peers.with.len == 0:
b.discovery.queueFindBlocksReq(@[address.cidOrTreeCid])
let maybePeer =
if peers.len > 0:
peers[hash(address) mod peers.len].some
elif b.peers.len > 0:
toSeq(b.peers)[hash(address) mod b.peers.len].some
else:
BlockExcPeerCtx.none
if peer =? maybePeer:
asyncSpawn b.monitorBlockHandle(blockFuture, address, peer.id)
let selected = pickPseudoRandom(address, peers.with)
asyncSpawn b.monitorBlockHandle(blockFuture, address, selected.id)
b.pendingBlocks.setInFlight(address)
# TODO: Send more block addresses if at all sensible.
await b.sendWantBlock(@[address], peer)
codex_block_exchange_want_block_lists_sent.inc()
await b.sendWantHave(@[address], @[peer], toSeq(b.peers))
codex_block_exchange_want_have_lists_sent.inc()
await b.sendWantBlock(@[address], selected)
await b.sendWantHave(@[address], peers.without)
# Don't let timeouts bubble up. We can't be too broad here or we break
# cancellations.

View File

@ -32,6 +32,9 @@ logScope:
type
PeerCtxStore* = ref object of RootObj
peers*: OrderedTable[PeerId, BlockExcPeerCtx]
PeersForBlock* = ref object of RootObj
with*: seq[BlockExcPeerCtx]
without*: seq[BlockExcPeerCtx]
iterator items*(self: PeerCtxStore): BlockExcPeerCtx =
for p in self.peers.values:
@ -70,6 +73,15 @@ func peersWant*(self: PeerCtxStore, address: BlockAddress): seq[BlockExcPeerCtx]
func peersWant*(self: PeerCtxStore, cid: Cid): seq[BlockExcPeerCtx] =
toSeq(self.peers.values).filterIt( it.peerWants.anyIt( it.address.cidOrTreeCid == cid ) )
proc getPeersForBlock*(self: PeerCtxStore, address: BlockAddress): PeersForBlock =
var res = PeersForBlock()
for peer in self:
if peer.peerHave.anyIt( it == address ):
res.with.add(peer)
else:
res.without.add(peer)
res
func selectCheapest*(self: PeerCtxStore, address: BlockAddress): seq[BlockExcPeerCtx] =
# assume that the price for all leaves in a tree is the same
let rootAddress = BlockAddress(leaf: false, cid: address.cidOrTreeCid)