Fix sending of WantBlocks messages and tracking of peerWants (#1019)

* Sends wantBlock to peers that have the block, and wantHave to everyone else (sketched below, just before the file diffs)

* Cleans up cheapestPeer. Fixes test for peers lists

* Fixes issue where peerWants are only stored for type wantBlock.

* Review comments by Dmitriy

* consistent logging of addresses

* prevents duplicate scheduling. Fixes cancellation

* fast

* Marks cancel-presence situation with todo comment.

* fixtest: enable logging in testsales

* Review by Dmitriy: Remember peerWants only if we don't have them.

* rework `wantListHandler` handling

---------

Co-authored-by: Dmitriy Ryajov <dryajov@gmail.com>
Ben Bierens 2025-01-09 23:44:02 +01:00 committed by GitHub
parent 74c46b3651
commit caed3c07a3
4 changed files with 77 additions and 97 deletions
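
At a high level, requestBlock now asks the peer store which connected peers already advertise the block and which do not: one pseudo-randomly chosen peer from the first group gets a wantBlock message, while only wantHave goes to the rest. The following minimal Nim sketch illustrates that dispatch with simplified stand-in types; PeerCtx, hasBlock, getPeersForBlock and pickPseudoRandom here are illustrative assumptions, not the Codex API.

# Sketch only: stand-in types, not the Codex implementation.
import std/[hashes, sequtils]

type
  PeerCtx = object
    id: int
    hasBlock: bool          # does this peer advertise the block?
  PeersForBlock = object
    with: seq[PeerCtx]      # peers known to have the block
    without: seq[PeerCtx]   # all remaining peers

proc getPeersForBlock(peers: seq[PeerCtx]): PeersForBlock =
  # Split the connected peers into "have it" and "don't have it" groups.
  for p in peers:
    if p.hasBlock:
      result.with.add(p)
    else:
      result.without.add(p)

proc pickPseudoRandom(address: string, peers: seq[PeerCtx]): PeerCtx =
  # Deterministic choice, same idea as `hash(address) mod peers.len` in the
  # diff below (abs() just keeps this toy index non-negative).
  peers[abs(hash(address)) mod peers.len]

when isMainModule:
  let all = @[
    PeerCtx(id: 1, hasBlock: true),
    PeerCtx(id: 2, hasBlock: false),
    PeerCtx(id: 3, hasBlock: true)]
  let split = getPeersForBlock(all)
  if split.with.len > 0:
    # wantBlock goes to one pseudo-randomly selected peer that has the block...
    echo "wantBlock -> peer ", pickPseudoRandom("some-block", split.with).id
  # ...and wantHave goes to everyone else.
  echo "wantHave  -> peers ", split.without.mapIt(it.id)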

View File

@ -130,16 +130,15 @@ proc stop*(b: BlockExcEngine) {.async.} =
 proc sendWantHave(
     b: BlockExcEngine,
     addresses: seq[BlockAddress],
-    excluded: seq[BlockExcPeerCtx],
     peers: seq[BlockExcPeerCtx]): Future[void] {.async.} =
   for p in peers:
-    if p notin excluded:
-      let toAsk = addresses.filterIt(it notin p.peerHave)
-      trace "Sending wantHave request", toAsk, peer = p.id
-      await b.network.request.sendWantList(
-        p.id,
-        toAsk,
-        wantType = WantType.WantHave)
+    let toAsk = addresses.filterIt(it notin p.peerHave)
+    trace "Sending wantHave request", toAsk, peer = p.id
+    await b.network.request.sendWantList(
+      p.id,
+      toAsk,
+      wantType = WantType.WantHave)
+    codex_block_exchange_want_have_lists_sent.inc()
proc sendWantBlock(
b: BlockExcEngine,
@ -150,6 +149,7 @@ proc sendWantBlock(
     blockPeer.id,
     addresses,
     wantType = WantType.WantBlock) # we want this remote to send us a block
+  codex_block_exchange_want_block_lists_sent.inc()
 
 proc monitorBlockHandle(
     b: BlockExcEngine,
@ -175,6 +175,9 @@ proc monitorBlockHandle(
     await b.network.switch.disconnect(peerId)
     b.discovery.queueFindBlocksReq(@[address.cidOrTreeCid])
 
+proc pickPseudoRandom(address: BlockAddress, peers: seq[BlockExcPeerCtx]): BlockExcPeerCtx =
+  return peers[hash(address) mod peers.len]
proc requestBlock*(
b: BlockExcEngine,
address: BlockAddress,
@ -182,26 +185,17 @@ proc requestBlock*(
   let blockFuture = b.pendingBlocks.getWantHandle(address, b.blockFetchTimeout)
 
   if not b.pendingBlocks.isInFlight(address):
-    let peers = b.peers.selectCheapest(address)
-    if peers.len == 0:
+    let peers = b.peers.getPeersForBlock(address)
+    if peers.with.len == 0:
       b.discovery.queueFindBlocksReq(@[address.cidOrTreeCid])
-    let maybePeer =
-      if peers.len > 0:
-        peers[hash(address) mod peers.len].some
-      elif b.peers.len > 0:
-        toSeq(b.peers)[hash(address) mod b.peers.len].some
-      else:
-        BlockExcPeerCtx.none
-    if peer =? maybePeer:
-      asyncSpawn b.monitorBlockHandle(blockFuture, address, peer.id)
+    else:
+      let selected = pickPseudoRandom(address, peers.with)
+      asyncSpawn b.monitorBlockHandle(blockFuture, address, selected.id)
       b.pendingBlocks.setInFlight(address)
-      await b.sendWantBlock(@[address], peer)
-      codex_block_exchange_want_block_lists_sent.inc()
-      await b.sendWantHave(@[address], @[peer], toSeq(b.peers))
-      codex_block_exchange_want_have_lists_sent.inc()
+      # TODO: Send more block addresses if at all sensible.
+      await b.sendWantBlock(@[address], selected)
+
+    await b.sendWantHave(@[address], peers.without)
# Don't let timeouts bubble up. We can't be too broad here or we break
# cancellations.
@ -246,7 +240,7 @@ proc blockPresenceHandler*(
)
   if wantCids.len > 0:
-    trace "Peer has blocks in our wantList", peer, wantCount = wantCids.len
+    trace "Peer has blocks in our wantList", peer, wants = wantCids
     await b.sendWantBlock(wantCids, peerCtx)
# if none of the connected peers report our wants in their have list,
@ -276,7 +270,7 @@ proc scheduleTasks(b: BlockExcEngine, blocksDelivery: seq[BlockDelivery]) {.asyn
proc cancelBlocks(b: BlockExcEngine, addrs: seq[BlockAddress]) {.async.} =
## Tells neighboring peers that we're no longer interested in a block.
trace "Sending block request cancellations to peers", addrs = addrs.len
trace "Sending block request cancellations to peers", addrs, peers = b.peers.mapIt($it.id)
let failed = (await allFinished(
b.peers.mapIt(
@ -342,13 +336,13 @@ proc blocksDeliveryHandler*(
b: BlockExcEngine,
peer: PeerId,
blocksDelivery: seq[BlockDelivery]) {.async.} =
trace "Received blocks from peer", peer, blocks = (blocksDelivery.mapIt($it.address)).join(",")
trace "Received blocks from peer", peer, blocks = (blocksDelivery.mapIt(it.address))
var validatedBlocksDelivery: seq[BlockDelivery]
for bd in blocksDelivery:
logScope:
peer = peer
address = bd.address
peer = peer
address = bd.address
if err =? b.validateBlockDelivery(bd).errorOption:
warn "Block validation failed", msg = err.msg
@ -390,11 +384,13 @@ proc wantListHandler*(
     wantList: WantList) {.async.} =
   let
     peerCtx = b.peers.get(peer)
-  if isNil(peerCtx):
+  if peerCtx.isNil:
     return
 
   var
     presence: seq[BlockPresence]
+    schedulePeer = false
 
   for e in wantList.entries:
     let
@ -405,7 +401,7 @@ proc wantListHandler*(
       address = e.address
       wantType = $e.wantType
 
-    if idx < 0: # updating entry
+    if idx < 0: # Adding new entry to peer wants
       let
         have = await e.address in b.localStore
         price = @(
@ -413,24 +409,27 @@ proc wantListHandler*(
           .price.toBytesBE)
 
-      if e.wantType == WantType.WantHave:
-        codex_block_exchange_want_have_lists_received.inc()
-        if have:
-          presence.add(
-            BlockPresence(
-              address: e.address,
-              `type`: BlockPresenceType.Have,
-              price: price))
-        else:
-          if e.sendDontHave:
-            presence.add(
-              BlockPresence(
-                address: e.address,
-                `type`: BlockPresenceType.DontHave,
-                price: price))
-          peerCtx.peerWants.add(e)
+      if not have and e.sendDontHave:
+        presence.add(
+          BlockPresence(
+            address: e.address,
+            `type`: BlockPresenceType.DontHave,
+            price: price))
+      elif have and e.wantType == WantType.WantHave:
+        presence.add(
+          BlockPresence(
+            address: e.address,
+            `type`: BlockPresenceType.Have,
+            price: price))
+        codex_block_exchange_want_have_lists_received.inc()
       elif e.wantType == WantType.WantBlock:
         peerCtx.peerWants.add(e)
+        schedulePeer = true
         codex_block_exchange_want_block_lists_received.inc()
-    else:
+    else: # Updating existing entry in peer wants
       # peer doesn't want this block anymore
       if e.cancel:
         peerCtx.peerWants.del(idx)
@ -443,8 +442,9 @@ proc wantListHandler*(
trace "Sending presence to remote", items = presence.mapIt($it).join(",")
await b.network.request.sendPresence(peer, presence)
if not b.scheduleTask(peerCtx):
warn "Unable to schedule task for peer", peer
if schedulePeer:
if not b.scheduleTask(peerCtx):
warn "Unable to schedule task for peer", peer
proc accountHandler*(
engine: BlockExcEngine,
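
To summarize the reworked wantListHandler above: for a new entry, a DontHave presence is sent when we miss the block and the peer asked for it, a Have presence only answers wantHave entries, and only wantBlock entries are remembered in peerWants and cause the peer task to be scheduled. The self-contained sketch below restates that decision with simplified stand-in types; Entry, handleNewEntry and the tuple-based presence list are illustrative, not the Codex types.

# Sketch only: simplified stand-ins mirroring the new wantList handling above.
type
  WantType = enum WantHave, WantBlock
  PresenceType = enum Have, DontHave
  Entry = object
    address: string
    wantType: WantType
    sendDontHave: bool

proc handleNewEntry(e: Entry, have: bool,
                    peerWants: var seq[Entry],
                    presence: var seq[(string, PresenceType)],
                    schedulePeer: var bool) =
  # Presence replies: DontHave when the block is missing and the peer asked,
  # Have only in answer to a wantHave entry.
  if not have and e.sendDontHave:
    presence.add((e.address, DontHave))
  elif have and e.wantType == WantHave:
    presence.add((e.address, Have))
  # Only real block requests are remembered and trigger task scheduling.
  if e.wantType == WantBlock:
    peerWants.add(e)
    schedulePeer = true

when isMainModule:
  var
    wants: seq[Entry]
    presence: seq[(string, PresenceType)]
    schedule = false
  handleNewEntry(
    Entry(address: "blk-1", wantType: WantBlock, sendDontHave: true),
    false, wants, presence, schedule)
  assert wants.len == 1 and schedule
  assert presence == @[("blk-1", DontHave)]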
@ -555,7 +555,7 @@ proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} =
   updateInFlight(failedAddresses, false)
 
   if blocksDelivery.len > 0:
-    trace "Sending blocks to peer", peer = task.id, blocks = (blocksDelivery.mapIt($it.address)).join(",")
+    trace "Sending blocks to peer", peer = task.id, blocks = (blocksDelivery.mapIt(it.address))
     await b.network.request.sendBlocksDelivery(
       task.id,
       blocksDelivery

View File

@ -32,6 +32,9 @@ logScope:
 type
   PeerCtxStore* = ref object of RootObj
     peers*: OrderedTable[PeerId, BlockExcPeerCtx]
 
+  PeersForBlock* = object of RootObj
+    with*: seq[BlockExcPeerCtx]
+    without*: seq[BlockExcPeerCtx]
iterator items*(self: PeerCtxStore): BlockExcPeerCtx =
for p in self.peers.values:
@ -70,32 +73,14 @@ func peersWant*(self: PeerCtxStore, address: BlockAddress): seq[BlockExcPeerCtx]
 func peersWant*(self: PeerCtxStore, cid: Cid): seq[BlockExcPeerCtx] =
   toSeq(self.peers.values).filterIt( it.peerWants.anyIt( it.address.cidOrTreeCid == cid ) )
 
-func selectCheapest*(self: PeerCtxStore, address: BlockAddress): seq[BlockExcPeerCtx] =
-  # assume that the price for all leaves in a tree is the same
-  let rootAddress = BlockAddress(leaf: false, cid: address.cidOrTreeCid)
-  var peers = self.peersHave(rootAddress)
-
-  func cmp(a, b: BlockExcPeerCtx): int =
-    var
-      priceA = 0.u256
-      priceB = 0.u256
-    a.blocks.withValue(rootAddress, precense):
-      priceA = precense[].price
-    b.blocks.withValue(rootAddress, precense):
-      priceB = precense[].price
-    if priceA == priceB:
-      0
-    elif priceA > priceB:
-      1
+proc getPeersForBlock*(self: PeerCtxStore, address: BlockAddress): PeersForBlock =
+  var res = PeersForBlock()
+  for peer in self:
+    if peer.peerHave.anyIt( it == address ):
+      res.with.add(peer)
     else:
-      -1
-
-  peers.sort(cmp)
-  trace "Selected cheapest peers", peers = peers.len
-  return peers
+      res.without.add(peer)
+  res
proc new*(T: type PeerCtxStore): PeerCtxStore =
## create new instance of a peer context store

View File

@ -69,27 +69,6 @@ checksuite "Peer Context Store Peer Selection":
     check peerCtxs[0] in peers
     check peerCtxs[5] in peers
 
-  test "Should select cheapest peers for Cid":
-    peerCtxs[0].blocks = collect(initTable):
-      for i, a in addresses:
-        { a: Presence(address: a, price: (5 + i).u256) }
-    peerCtxs[5].blocks = collect(initTable):
-      for i, a in addresses:
-        { a: Presence(address: a, price: (2 + i).u256) }
-    peerCtxs[9].blocks = collect(initTable):
-      for i, a in addresses:
-        { a: Presence(address: a, price: i.u256) }
-
-    let
-      peers = store.selectCheapest(addresses[0])
-
-    check peers.len == 3
-    check peers[0] == peerCtxs[9]
-    check peers[1] == peerCtxs[5]
-    check peers[2] == peerCtxs[0]
-
   test "Should select peers that want Cid":
     let
       entries = addresses.mapIt(
@ -109,3 +88,19 @@ checksuite "Peer Context Store Peer Selection":
     check peers.len == 2
     check peerCtxs[0] in peers
     check peerCtxs[5] in peers
+
+  test "Should return peers with and without block":
+    let address = addresses[2]
+    peerCtxs[1].blocks[address] = Presence(address: address, price: 0.u256)
+    peerCtxs[2].blocks[address] = Presence(address: address, price: 0.u256)
+
+    let peers = store.getPeersForBlock(address)
+    for i, pc in peerCtxs:
+      if i == 1 or i == 2:
+        check pc in peers.with
+        check pc notin peers.without
+      else:
+        check pc notin peers.with
+        check pc in peers.without

View File

@ -19,7 +19,7 @@ multinodesuite "Sales":
     clients: CodexConfigs.init(nodes=1).some,
     providers: CodexConfigs.init(nodes=1).some,
   )
 
-  var host: CodexClient
+  var client: CodexClient