From 5e7ce52fbe650f459eb8923fd07386131aef4067 Mon Sep 17 00:00:00 2001 From: Ben Bierens <39762930+benbierens@users.noreply.github.com> Date: Thu, 29 Feb 2024 08:37:12 +0100 Subject: [PATCH] Fix block retransmit (#651) * Applies peer-scoped lock to peer task handler. * Replace async lock with delete-first approach. * Cleanup some logging * Adds inFlight flag to WantListEntry * Clears inflight flag when local retrieval fails. * Adds test for setting of in-flight * Adds test for clearing in-flight when lookup fails * Review comments by Tomasz --------- Co-authored-by: gmega --- codex/blockexchange/engine/engine.nim | 30 +++++++++-------- codex/blockexchange/protobuf/message.nim | 1 + .../codex/blockexchange/engine/testengine.nim | 33 +++++++++++++++++++ 3 files changed, 51 insertions(+), 13 deletions(-) diff --git a/codex/blockexchange/engine/engine.nim b/codex/blockexchange/engine/engine.nim index 921e9550..97e7cb99 100644 --- a/codex/blockexchange/engine/engine.nim +++ b/codex/blockexchange/engine/engine.nim @@ -532,13 +532,19 @@ proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} = var wantsBlocks = task.peerWants.filterIt( - it.wantType == WantType.WantBlock + it.wantType == WantType.WantBlock and not it.inFlight ) + proc updateInFlight(addresses: seq[BlockAddress], inFlight: bool) = + for peerWant in task.peerWants.mitems: + if peerWant.address in addresses: + peerWant.inFlight = inFlight + trace "wantsBlocks", peer = task.id, n = wantsBlocks.len if wantsBlocks.len > 0: - trace "Got peer want blocks list", items = wantsBlocks.len - + # Mark wants as in-flight. + let wantAddresses = wantsBlocks.mapIt(it.address) + updateInFlight(wantAddresses, true) wantsBlocks.sort(SortOrder.Descending) proc localLookup(e: WantListEntry): Future[?!BlockDelivery] {.async.} = @@ -555,13 +561,16 @@ proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} = let blocksDeliveryFut = await allFinished(wantsBlocks.map(localLookup)) - - # Extract successfully received blocks - let blocksDelivery = blocksDeliveryFut .filterIt(it.completed and it.read.isOk) .mapIt(it.read.get) + # All the wants that failed local lookup must be set to not-in-flight again. + let + successAddresses = blocksDelivery.mapIt(it.address) + failedAddresses = wantAddresses.filterIt(it notin successAddresses) + updateInFlight(failedAddresses, false) + if blocksDelivery.len > 0: trace "Sending blocks to peer", peer = task.id, blocks = blocksDelivery.len await b.network.request.sendBlocksDelivery( @@ -571,13 +580,8 @@ proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} = codex_block_exchange_blocks_sent.inc(blocksDelivery.len.int64) - trace "About to remove entries from peerWants", blocks = blocksDelivery.len, items = task.peerWants.len - # Remove successfully sent blocks - task.peerWants.keepIf( - proc(e: WantListEntry): bool = - not blocksDelivery.anyIt( it.address == e.address ) - ) - trace "Removed entries from peerWants", items = task.peerWants.len + task.peerWants.keepItIf(it.address notin successAddresses) + trace "Removed entries from peerWants", peerWants = task.peerWants.len proc blockexcTaskRunner(b: BlockExcEngine) {.async.} = ## process tasks diff --git a/codex/blockexchange/protobuf/message.nim b/codex/blockexchange/protobuf/message.nim index 7624bd9c..61488b40 100644 --- a/codex/blockexchange/protobuf/message.nim +++ b/codex/blockexchange/protobuf/message.nim @@ -29,6 +29,7 @@ type cancel*: bool # Whether this revokes an entry wantType*: WantType # Note: defaults to enum 0, ie Block sendDontHave*: bool # Note: defaults to false + inFlight*: bool # Whether block sending is in progress. Not serialized. WantList* = object entries*: seq[WantListEntry] # A list of wantList entries diff --git a/tests/codex/blockexchange/engine/testengine.nim b/tests/codex/blockexchange/engine/testengine.nim index 4cf84724..96571cf3 100644 --- a/tests/codex/blockexchange/engine/testengine.nim +++ b/tests/codex/blockexchange/engine/testengine.nim @@ -490,6 +490,39 @@ asyncchecksuite "Task Handler": await engine.taskHandler(peersCtx[0]) + test "Should set in-flight for outgoing blocks": + proc sendBlocksDelivery( + id: PeerId, + blocksDelivery: seq[BlockDelivery]) {.gcsafe, async.} = + check peersCtx[0].peerWants[0].inFlight + + for blk in blocks: + (await engine.localStore.putBlock(blk)).tryGet() + engine.network.request.sendBlocksDelivery = sendBlocksDelivery + + peersCtx[0].peerWants.add(WantListEntry( + address: blocks[0].address, + priority: 50, + cancel: false, + wantType: WantType.WantBlock, + sendDontHave: false, + inFlight: false) + ) + await engine.taskHandler(peersCtx[0]) + + test "Should clear in-flight when local lookup fails": + peersCtx[0].peerWants.add(WantListEntry( + address: blocks[0].address, + priority: 50, + cancel: false, + wantType: WantType.WantBlock, + sendDontHave: false, + inFlight: false) + ) + await engine.taskHandler(peersCtx[0]) + + check not peersCtx[0].peerWants[0].inFlight + test "Should send presence": let present = blocks let missing = @[Block.new("missing".toBytes).tryGet()]