From 457567531fadb263486447e597616f02660044ae Mon Sep 17 00:00:00 2001 From: Giuliano Mega Date: Thu, 22 Feb 2024 11:54:45 -0300 Subject: [PATCH] Fixes active cancellation for pending want requests (#714) * add block cancellation support + tests * tie issueCancellations into resolveBlocks for proper exception tracking, address comments * pull cancellation as separate primitive in BlockExcNetwork * use allFutures, rename issueBlockCancellations -> cancelBlocks * use trc instead of wrn to register send error * do not log peer IDs --- codex/blockexchange/engine/engine.nim | 31 +++--- codex/blockexchange/network/network.nim | 33 ++++-- codex/utils.nim | 36 ++++--- .../blockexchange/engine/testblockexc.nim | 102 ++++++++++-------- .../codex/blockexchange/engine/testengine.nim | 41 ++++++- tests/codex/helpers.nim | 13 +++ tests/codex/helpers/nodeutils.nim | 3 + tests/codex/utils/testutils.nim | 36 +++++++ 8 files changed, 213 insertions(+), 82 deletions(-) create mode 100644 tests/codex/utils/testutils.nim diff --git a/codex/blockexchange/engine/engine.nim b/codex/blockexchange/engine/engine.nim index ec7b7f64..921e9550 100644 --- a/codex/blockexchange/engine/engine.nim +++ b/codex/blockexchange/engine/engine.nim @@ -120,7 +120,6 @@ proc stop*(b: BlockExcEngine) {.async.} = trace "NetworkStore stopped" - proc sendWantHave( b: BlockExcEngine, address: BlockAddress, @@ -146,16 +145,6 @@ proc sendWantBlock( @[address], wantType = WantType.WantBlock) # we want this remote to send us a block -proc findCheapestPeerForBlock(b: BlockExcEngine, cheapestPeers: seq[BlockExcPeerCtx]): ?BlockExcPeerCtx = - if cheapestPeers.len <= 0: - trace "No cheapest peers, selecting first in list" - let - peers = toSeq(b.peers) # Get any peer - if peers.len <= 0: - return none(BlockExcPeerCtx) - return some(peers[0]) - return some(cheapestPeers[0]) # get cheapest - proc monitorBlockHandle(b: BlockExcEngine, handle: Future[Block], address: BlockAddress, peerId: PeerId) {.async.} = try: trace "Monitoring block handle", address, peerId @@ -231,7 +220,7 @@ proc blockPresenceHandler*( have = presence.have price = presence.price - trace "Updating precense" + trace "Updating presence" peerCtx.setPresence(presence) let @@ -280,6 +269,20 @@ proc scheduleTasks(b: BlockExcEngine, blocksDelivery: seq[BlockDelivery]) {.asyn break # do next peer +proc cancelBlocks(b: BlockExcEngine, addrs: seq[BlockAddress]) {.async.} = + ## Tells neighboring peers that we're no longer interested in a block. + trace "Sending block request cancellations to peers", addrs = addrs.len + + let failed = (await allFinished( + b.peers.mapIt( + b.network.request.sendWantCancellations( + peer = it.id, + addresses = addrs)))) + .filterIt(it.failed) + + if failed.len > 0: + trace "Failed to send block request cancellations to peers", peers = failed.len + proc resolveBlocks*(b: BlockExcEngine, blocksDelivery: seq[BlockDelivery]) {.async.} = trace "Resolving blocks", blocks = blocksDelivery.len @@ -290,6 +293,8 @@ proc resolveBlocks*(b: BlockExcEngine, blocksDelivery: seq[BlockDelivery]) {.asy cids.incl(bd.blk.cid) if bd.address.leaf: cids.incl(bd.address.treeCid) + + await b.cancelBlocks(blocksDelivery.mapIt(it.address)) b.discovery.queueProvideBlocksReq(cids.toSeq) proc resolveBlocks*(b: BlockExcEngine, blocks: seq[Block]) {.async.} = @@ -398,7 +403,7 @@ proc wantListHandler*( for e in wantList.entries: let - idx = peerCtx.peerWants.find(e) + idx = peerCtx.peerWants.findIt(it.address == e.address) logScope: peer = peerCtx.id diff --git a/codex/blockexchange/network/network.nim b/codex/blockexchange/network/network.nim index 1a39364b..336e627a 100644 --- a/codex/blockexchange/network/network.nim +++ b/codex/blockexchange/network/network.nim @@ -39,14 +39,6 @@ type BlockPresenceHandler* = proc(peer: PeerId, precense: seq[BlockPresence]): Future[void] {.gcsafe.} AccountHandler* = proc(peer: PeerId, account: Account): Future[void] {.gcsafe.} PaymentHandler* = proc(peer: PeerId, payment: SignedState): Future[void] {.gcsafe.} - WantListSender* = proc( - id: PeerId, - addresses: seq[BlockAddress], - priority: int32 = 0, - cancel: bool = false, - wantType: WantType = WantType.WantHave, - full: bool = false, - sendDontHave: bool = false): Future[void] {.gcsafe.} BlockExcHandlers* = object onWantList*: WantListHandler @@ -55,6 +47,15 @@ type onAccount*: AccountHandler onPayment*: PaymentHandler + WantListSender* = proc( + id: PeerId, + addresses: seq[BlockAddress], + priority: int32 = 0, + cancel: bool = false, + wantType: WantType = WantType.WantHave, + full: bool = false, + sendDontHave: bool = false): Future[void] {.gcsafe.} + WantCancellationSender* = proc(peer: PeerId, addresses: seq[BlockAddress]): Future[void] {.gcsafe.} BlocksDeliverySender* = proc(peer: PeerId, blocksDelivery: seq[BlockDelivery]): Future[void] {.gcsafe.} PresenceSender* = proc(peer: PeerId, presence: seq[BlockPresence]): Future[void] {.gcsafe.} AccountSender* = proc(peer: PeerId, account: Account): Future[void] {.gcsafe.} @@ -62,6 +63,7 @@ type BlockExcRequest* = object sendWantList*: WantListSender + sendWantCancellations*: WantCancellationSender sendBlocksDelivery*: BlocksDeliverySender sendPresence*: PresenceSender sendAccount*: AccountSender @@ -139,6 +141,17 @@ proc sendWantList*( b.send(id, Message(wantlist: msg)) +proc sendWantCancellations*( + b: BlockExcNetwork, + id: PeerId, + addresses: seq[BlockAddress]): Future[void] {.async.} = + ## Informs a remote peer that we're no longer interested in a set of blocks + ## + + trace "Sending block request cancellation to peer", addrs = addresses.len, peer = id + + await b.sendWantList(id = id, addresses = addresses, cancel = true) + proc handleBlocksDelivery( b: BlockExcNetwork, peer: NetworkPeer, @@ -340,6 +353,9 @@ proc new*( id, cids, priority, cancel, wantType, full, sendDontHave) + proc sendWantCancellations(id: PeerId, addresses: seq[BlockAddress]): Future[void] {.gcsafe.} = + self.sendWantCancellations(id, addresses) + proc sendBlocksDelivery(id: PeerId, blocksDelivery: seq[BlockDelivery]): Future[void] {.gcsafe.} = self.sendBlocksDelivery(id, blocksDelivery) @@ -354,6 +370,7 @@ proc new*( self.request = BlockExcRequest( sendWantList: sendWantList, + sendWantCancellations: sendWantCancellations, sendBlocksDelivery: sendBlocksDelivery, sendPresence: sendPresence, sendAccount: sendAccount, diff --git a/codex/utils.nim b/codex/utils.nim index 20ff6af8..17dd924f 100644 --- a/codex/utils.nim +++ b/codex/utils.nim @@ -8,6 +8,7 @@ ## those terms. ## +import std/enumerate import std/parseutils import std/options @@ -17,7 +18,7 @@ import ./utils/asyncheapqueue import ./utils/fileutils import ./utils/asynciter -export asyncheapqueue, fileutils, asynciter +export asyncheapqueue, fileutils, asynciter, chronos func divUp*[T: SomeInteger](a, b : T): T = @@ -35,6 +36,24 @@ proc orElse*[A](a, b: Option[A]): Option[A] = else: b +template findIt*(s, pred: untyped): untyped = + ## Returns the index of the first object matching a predicate, or -1 if no + ## object matches it. + runnableExamples: + type MyType = object + att: int + + var s = @[MyType(att: 1), MyType(att: 2), MyType(att: 3)] + doAssert s.findIt(it.att == 2) == 1 + doAssert s.findIt(it.att == 4) == -1 + + var index = -1 + for i, it {.inject.} in enumerate(items(s)): + if pred: + index = i + break + index + when not declared(parseDuration): # Odd code formatting to minimize diff v. mainLine const Whitespace = {' ', '\t', '\v', '\r', '\l', '\f'} @@ -75,18 +94,3 @@ when not declared(parseDuration): # Odd code formatting to minimize diff v. main result = start #..is no unit to the end of `s`. var sizeF = number * scale + 0.5 # Saturate to int64.high when too big size = seconds(int(sizeF)) - -when isMainModule: - import unittest2 - - suite "time parse": - test "parseDuration": - var res: Duration # caller must still know if 'b' refers to bytes|bits - check parseDuration("10Hr", res) == 3 - check res == hours(10) - check parseDuration("64min", res) == 3 - check res == minutes(64) - check parseDuration("7m/block", res) == 2 # '/' stops parse - check res == minutes(7) # 1 shl 30, forced binary metric - check parseDuration("3d", res) == 2 # '/' stops parse - check res == days(3) # 1 shl 30, forced binary metric diff --git a/tests/codex/blockexchange/engine/testblockexc.nim b/tests/codex/blockexchange/engine/testblockexc.nim index 33ec374e..4f1baf75 100644 --- a/tests/codex/blockexchange/engine/testblockexc.nim +++ b/tests/codex/blockexchange/engine/testblockexc.nim @@ -16,10 +16,6 @@ import ../../examples import ../../helpers asyncchecksuite "NetworkStore engine - 2 nodes": - let - chunker1 = RandomChunker.new(Rng.instance(), size = 2048, chunkSize = 256'nb) - chunker2 = RandomChunker.new(Rng.instance(), size = 2048, chunkSize = 256'nb) - var nodeCmps1, nodeCmps2: NodesComponents peerCtx1, peerCtx2: BlockExcPeerCtx @@ -28,20 +24,8 @@ asyncchecksuite "NetworkStore engine - 2 nodes": pendingBlocks1, pendingBlocks2: seq[Future[bt.Block]] setup: - while true: - let chunk = await chunker1.getBytes() - if chunk.len <= 0: - break - - blocks1.add(bt.Block.new(chunk).tryGet()) - - while true: - let chunk = await chunker2.getBytes() - if chunk.len <= 0: - break - - blocks2.add(bt.Block.new(chunk).tryGet()) - + blocks1 = await makeRandomBlocks(datasetSize = 2048, blockSize = 256'nb) + blocks2 = await makeRandomBlocks(datasetSize = 2048, blockSize = 256'nb) nodeCmps1 = generateNodes(1, blocks1)[0] nodeCmps2 = generateNodes(1, blocks2)[0] @@ -180,42 +164,30 @@ asyncchecksuite "NetworkStore engine - 2 nodes": check eventually wallet.balance(channel, Asset) > 0 asyncchecksuite "NetworkStore - multiple nodes": - let - chunker = RandomChunker.new(Rng.instance(), size = 4096, chunkSize = 256'nb) - var - switch: seq[Switch] - networkStore: seq[NetworkStore] + nodes: seq[NodesComponents] blocks: seq[bt.Block] setup: - while true: - let chunk = await chunker.getBytes() - if chunk.len <= 0: - break - - blocks.add(bt.Block.new(chunk).tryGet()) - - for e in generateNodes(5): - switch.add(e.switch) - networkStore.add(e.networkStore) + blocks = await makeRandomBlocks(datasetSize = 4096, blockSize = 256'nb) + nodes = generateNodes(5) + for e in nodes: await e.engine.start() await allFuturesThrowing( - switch.mapIt( it.start() ) + nodes.mapIt( it.switch.start() ) ) teardown: await allFuturesThrowing( - switch.mapIt( it.stop() ) + nodes.mapIt( it.switch.stop() ) ) - switch = @[] - networkStore = @[] + nodes = @[] test "Should receive blocks for own want list": let - downloader = networkStore[4] + downloader = nodes[4].networkStore engine = downloader.engine # Add blocks from 1st peer to want list @@ -233,9 +205,9 @@ asyncchecksuite "NetworkStore - multiple nodes": ) for i in 0..15: - (await networkStore[i div 4].engine.localStore.putBlock(blocks[i])).tryGet() + (await nodes[i div 4].networkStore.engine.localStore.putBlock(blocks[i])).tryGet() - await connectNodes(switch) + await connectNodes(nodes) await sleepAsync(1.seconds) await allFuturesThrowing( @@ -251,7 +223,7 @@ asyncchecksuite "NetworkStore - multiple nodes": test "Should exchange blocks with multiple nodes": let - downloader = networkStore[4] + downloader = nodes[4].networkStore engine = downloader.engine # Add blocks from 1st peer to want list @@ -264,9 +236,9 @@ asyncchecksuite "NetworkStore - multiple nodes": ) for i in 0..15: - (await networkStore[i div 4].engine.localStore.putBlock(blocks[i])).tryGet() + (await nodes[i div 4].networkStore.engine.localStore.putBlock(blocks[i])).tryGet() - await connectNodes(switch) + await connectNodes(nodes) await sleepAsync(1.seconds) await allFuturesThrowing( @@ -275,3 +247,47 @@ asyncchecksuite "NetworkStore - multiple nodes": check pendingBlocks1.mapIt( it.read ) == blocks[0..3] check pendingBlocks2.mapIt( it.read ) == blocks[12..15] + + test "Should actively cancel want-haves if block received from elsewhere": + let + # Peer wanting to download blocks + downloader = nodes[4] + # Bystander peer - gets block request but can't satisfy them + bystander = nodes[3] + # Holder of actual blocks + blockHolder = nodes[1] + + let aBlock = blocks[0] + (await blockHolder.engine.localStore.putBlock(aBlock)).tryGet() + + await connectNodes(@[downloader, bystander]) + # Downloader asks for block... + let blockRequest = downloader.engine.requestBlock(aBlock.cid) + + # ... and bystander learns that downloader wants it, but can't provide it. + check eventually( + bystander + .engine + .peers + .get(downloader.switch.peerInfo.peerId) + .peerWants + .filterIt( it.address == aBlock.address ) + .len == 1 + ) + + # As soon as we connect the downloader to the blockHolder, the block should + # propagate to the downloader... + await connectNodes(@[downloader, blockHolder]) + check (await blockRequest).cid == aBlock.cid + check (await downloader.engine.localStore.hasBlock(aBlock.cid)).tryGet() + + # ... and the bystander should have cancelled the want-have + check eventually( + bystander + .engine + .peers + .get(downloader.switch.peerInfo.peerId) + .peerWants + .filterIt( it.address == aBlock.address ) + .len == 0 + ) diff --git a/tests/codex/blockexchange/engine/testengine.nim b/tests/codex/blockexchange/engine/testengine.nim index d1a5f242..4cf84724 100644 --- a/tests/codex/blockexchange/engine/testengine.nim +++ b/tests/codex/blockexchange/engine/testengine.nim @@ -142,6 +142,11 @@ asyncchecksuite "NetworkStore engine handlers": localStore: BlockStore blocks: seq[Block] + const NopSendWantCancellationsProc = proc( + id: PeerId, + addresses: seq[BlockAddress] + ) {.gcsafe, async.} = discard + setup: rng = Rng.instance() chunker = RandomChunker.new(rng, size = 1024'nb, chunkSize = 256'nb) @@ -275,6 +280,10 @@ asyncchecksuite "NetworkStore engine handlers": let blocksDelivery = blocks.mapIt(BlockDelivery(blk: it, address: it.address)) + # Install NOP for want list cancellations so they don't cause a crash + engine.network = BlockExcNetwork( + request: BlockExcRequest(sendWantCancellations: NopSendWantCancellationsProc)) + await engine.blocksDeliveryHandler(peerId, blocksDelivery) let resolved = await allFinished(pending) check resolved.mapIt( it.read ) == blocks @@ -306,10 +315,14 @@ asyncchecksuite "NetworkStore engine handlers": check receiver == peerId check balances[account.address.toDestination] == amount - done.complete() + done.complete(), + + # Install NOP for want list cancellations so they don't cause a crash + sendWantCancellations: NopSendWantCancellationsProc )) - await engine.blocksDeliveryHandler(peerId, blocks.mapIt(BlockDelivery(blk: it, address: it.address))) + await engine.blocksDeliveryHandler(peerId, blocks.mapIt( + BlockDelivery(blk: it, address: it.address))) await done.wait(100.millis) test "Should handle block presence": @@ -352,6 +365,30 @@ asyncchecksuite "NetworkStore engine handlers": check a in peerCtx.peerHave check peerCtx.blocks[a].price == price + test "Should send cancellations for received blocks": + let + pending = blocks.mapIt(engine.pendingBlocks.getWantHandle(it.cid)) + blocksDelivery = blocks.mapIt(BlockDelivery(blk: it, address: it.address)) + cancellations = newTable( + blocks.mapIt((it.address, newFuture[void]())).toSeq + ) + + proc sendWantCancellations( + id: PeerId, + addresses: seq[BlockAddress] + ) {.gcsafe, async.} = + for address in addresses: + cancellations[address].complete() + + engine.network = BlockExcNetwork( + request: BlockExcRequest( + sendWantCancellations: sendWantCancellations + )) + + await engine.blocksDeliveryHandler(peerId, blocksDelivery) + discard await allFinished(pending) + await allFuturesThrowing(cancellations.values().toSeq) + asyncchecksuite "Task Handler": var rng: Rng diff --git a/tests/codex/helpers.nim b/tests/codex/helpers.nim index 9ab8f52c..9192968c 100644 --- a/tests/codex/helpers.nim +++ b/tests/codex/helpers.nim @@ -106,6 +106,19 @@ proc storeDataGetManifest*(store: BlockStore, chunker: Chunker): Future[Manifest return manifest +proc makeRandomBlocks*( + datasetSize: int, blockSize: NBytes): Future[seq[Block]] {.async.} = + + var chunker = RandomChunker.new(Rng.instance(), size = datasetSize, + chunkSize = blockSize) + + while true: + let chunk = await chunker.getBytes() + if chunk.len <= 0: + break + + result.add(Block.new(chunk).tryGet()) + proc corruptBlocks*( store: BlockStore, manifest: Manifest, diff --git a/tests/codex/helpers/nodeutils.nim b/tests/codex/helpers/nodeutils.nim index 83e59a69..d8798b3d 100644 --- a/tests/codex/helpers/nodeutils.nim +++ b/tests/codex/helpers/nodeutils.nim @@ -62,3 +62,6 @@ proc connectNodes*(nodes: seq[Switch]) {.async.} = for node in nodes: if dialer.peerInfo.peerId != node.peerInfo.peerId: await dialer.connect(node.peerInfo.peerId, node.peerInfo.addrs) + +proc connectNodes*(nodes: seq[NodesComponents]) {.async.} = + await connectNodes(nodes.mapIt( it.switch )) diff --git a/tests/codex/utils/testutils.nim b/tests/codex/utils/testutils.nim new file mode 100644 index 00000000..b8e386d0 --- /dev/null +++ b/tests/codex/utils/testutils.nim @@ -0,0 +1,36 @@ +import std/unittest + +import pkg/codex/utils + +suite "findIt": + + setup: + type AnObject = object + attribute1*: int + + var objList = @[ + AnObject(attribute1: 1), + AnObject(attribute1: 3), + AnObject(attribute1: 5), + AnObject(attribute1: 3), + ] + + test "should retur index of first object matching predicate": + assert objList.findIt(it.attribute1 == 3) == 1 + + test "should return -1 when no object matches predicate": + assert objList.findIt(it.attribute1 == 15) == -1 + +suite "parseDuration": + + test "should parse durations": + var res: Duration # caller must still know if 'b' refers to bytes|bits + check parseDuration("10Hr", res) == 3 + check res == hours(10) + check parseDuration("64min", res) == 3 + check res == minutes(64) + check parseDuration("7m/block", res) == 2 # '/' stops parse + check res == minutes(7) # 1 shl 30, forced binary metric + check parseDuration("3d", res) == 2 # '/' stops parse + check res == days(3) # 1 shl 30, forced binary metric +