Fix block retransmit (#651)

* Applies peer-scoped lock to peer task handler.

* Replace async lock with delete-first approach.

* Cleanup some logging

* Adds inFlight flag to WantListEntry

* Clears inflight flag when local retrieval fails.

* Adds test for setting of in-flight

* Adds test for clearing in-flight when lookup fails

* Review comments by Tomasz

---------

Co-authored-by: gmega <giuliano.mega@gmail.com>
This commit is contained in:
Ben Bierens 2024-02-29 08:37:12 +01:00 committed by GitHub
parent 1c66cb1e83
commit 5e7ce52fbe
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 51 additions and 13 deletions

View File

@ -532,13 +532,19 @@ proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} =
var
wantsBlocks = task.peerWants.filterIt(
it.wantType == WantType.WantBlock
it.wantType == WantType.WantBlock and not it.inFlight
)
proc updateInFlight(addresses: seq[BlockAddress], inFlight: bool) =
for peerWant in task.peerWants.mitems:
if peerWant.address in addresses:
peerWant.inFlight = inFlight
trace "wantsBlocks", peer = task.id, n = wantsBlocks.len
if wantsBlocks.len > 0:
trace "Got peer want blocks list", items = wantsBlocks.len
# Mark wants as in-flight.
let wantAddresses = wantsBlocks.mapIt(it.address)
updateInFlight(wantAddresses, true)
wantsBlocks.sort(SortOrder.Descending)
proc localLookup(e: WantListEntry): Future[?!BlockDelivery] {.async.} =
@ -555,13 +561,16 @@ proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} =
let
blocksDeliveryFut = await allFinished(wantsBlocks.map(localLookup))
# Extract successfully received blocks
let
blocksDelivery = blocksDeliveryFut
.filterIt(it.completed and it.read.isOk)
.mapIt(it.read.get)
# All the wants that failed local lookup must be set to not-in-flight again.
let
successAddresses = blocksDelivery.mapIt(it.address)
failedAddresses = wantAddresses.filterIt(it notin successAddresses)
updateInFlight(failedAddresses, false)
if blocksDelivery.len > 0:
trace "Sending blocks to peer", peer = task.id, blocks = blocksDelivery.len
await b.network.request.sendBlocksDelivery(
@ -571,13 +580,8 @@ proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} =
codex_block_exchange_blocks_sent.inc(blocksDelivery.len.int64)
trace "About to remove entries from peerWants", blocks = blocksDelivery.len, items = task.peerWants.len
# Remove successfully sent blocks
task.peerWants.keepIf(
proc(e: WantListEntry): bool =
not blocksDelivery.anyIt( it.address == e.address )
)
trace "Removed entries from peerWants", items = task.peerWants.len
task.peerWants.keepItIf(it.address notin successAddresses)
trace "Removed entries from peerWants", peerWants = task.peerWants.len
proc blockexcTaskRunner(b: BlockExcEngine) {.async.} =
## process tasks

View File

@ -29,6 +29,7 @@ type
cancel*: bool # Whether this revokes an entry
wantType*: WantType # Note: defaults to enum 0, ie Block
sendDontHave*: bool # Note: defaults to false
inFlight*: bool # Whether block sending is in progress. Not serialized.
WantList* = object
entries*: seq[WantListEntry] # A list of wantList entries

View File

@ -490,6 +490,39 @@ asyncchecksuite "Task Handler":
await engine.taskHandler(peersCtx[0])
test "Should set in-flight for outgoing blocks":
proc sendBlocksDelivery(
id: PeerId,
blocksDelivery: seq[BlockDelivery]) {.gcsafe, async.} =
check peersCtx[0].peerWants[0].inFlight
for blk in blocks:
(await engine.localStore.putBlock(blk)).tryGet()
engine.network.request.sendBlocksDelivery = sendBlocksDelivery
peersCtx[0].peerWants.add(WantListEntry(
address: blocks[0].address,
priority: 50,
cancel: false,
wantType: WantType.WantBlock,
sendDontHave: false,
inFlight: false)
)
await engine.taskHandler(peersCtx[0])
test "Should clear in-flight when local lookup fails":
peersCtx[0].peerWants.add(WantListEntry(
address: blocks[0].address,
priority: 50,
cancel: false,
wantType: WantType.WantBlock,
sendDontHave: false,
inFlight: false)
)
await engine.taskHandler(peersCtx[0])
check not peersCtx[0].peerWants[0].inFlight
test "Should send presence":
let present = blocks
let missing = @[Block.new("missing".toBytes).tryGet()]