Checks and sets inflight flag when sending wantBlock messages.

This commit is contained in:
thatben 2025-01-31 10:25:14 +01:00
parent 8403d2738e
commit 88522d9093
No known key found for this signature in database
GPG Key ID: 62C543548433D43E
2 changed files with 19 additions and 4 deletions

View File

@ -152,10 +152,15 @@ proc sendWantHave(
proc sendWantBlock(
b: BlockExcEngine, addresses: seq[BlockAddress], blockPeer: BlockExcPeerCtx
): Future[void] {.async.} =
trace "Sending wantBlock request to", addresses, peer = blockPeer.id
let notInFlight = b.pendingBlocks.getNotInFlight(addresses)
if notInFlight.len == 0:
return
b.pendingBlocks.setInFlight(notInFlight)
trace "Sending wantBlock request to", addrs = notInFlight, peer = blockPeer.id
await b.network.request.sendWantList(
blockPeer.id, addresses, wantType = WantType.WantBlock
) # we want this remote to send us a block
blockPeer.id, notInFlight, wantType = WantType.WantBlock
)
codex_block_exchange_want_block_lists_sent.inc()
proc monitorBlockHandle(
@ -197,7 +202,6 @@ proc requestBlock*(
else:
let selected = pickPseudoRandom(address, peers.with)
asyncSpawn b.monitorBlockHandle(blockFuture, address, selected.id)
b.pendingBlocks.setInFlight(address)
await b.sendWantBlock(@[address], selected)
await b.sendWantHave(@[address], peers.without)

View File

@ -113,6 +113,12 @@ proc setInFlight*(p: PendingBlocksManager, address: BlockAddress, inFlight = tru
p.blocks.withValue(address, pending):
pending[].inFlight = inFlight
proc setInFlight*(
p: PendingBlocksManager, addresses: seq[BlockAddress], inFlight = true
) =
for addrs in addresses:
p.setInFlight(addrs, inFlight)
proc isInFlight*(p: PendingBlocksManager, address: BlockAddress): bool =
## Check if a block is in flight
##
@ -120,6 +126,11 @@ proc isInFlight*(p: PendingBlocksManager, address: BlockAddress): bool =
p.blocks.withValue(address, pending):
result = pending[].inFlight
proc getNotInFlight*(
p: PendingBlocksManager, addresses: seq[BlockAddress]
): seq[BlockAddress] =
addresses.filterIt(not p.isInFlight(it))
proc contains*(p: PendingBlocksManager, cid: Cid): bool =
BlockAddress.init(cid) in p.blocks