From 313d6bac1f884e6037dc8afa7a56d45f0dc36bb4 Mon Sep 17 00:00:00 2001
From: gmega
Date: Tue, 10 Jun 2025 19:55:51 -0300
Subject: [PATCH] fix: refresh timestamp before issuing request to prevent flood of knowledge updates

---
 codex/blockexchange/engine/engine.nim         |  19 ++-
 tests/codex/blockexchange/engine/fakepeer.nim | 155 ++++++++++++++++++
 .../codex/blockexchange/engine/testengine.nim |   5 +
 3 files changed, 175 insertions(+), 4 deletions(-)
 create mode 100644 tests/codex/blockexchange/engine/fakepeer.nim

diff --git a/codex/blockexchange/engine/engine.nim b/codex/blockexchange/engine/engine.nim
index 25eabb71..2550c364 100644
--- a/codex/blockexchange/engine/engine.nim
+++ b/codex/blockexchange/engine/engine.nim
@@ -164,17 +164,16 @@ proc sendWantBlock(
 proc refreshBlockKnowledge(
     self: BlockExcEngine, peer: BlockExcPeerCtx
 ) {.async: (raises: [CancelledError]).} =
-  # broadcast our want list, the other peer will do the same
   if self.pendingBlocks.wantListLen > 0:
     let cids = toSeq(self.pendingBlocks.wantList)
     trace "Sending our want list to a peer", peer = peer.id, length = cids.len
     await self.network.request.sendWantList(peer.id, cids, full = true)
 
 proc refreshBlockKnowledge(self: BlockExcEngine) {.async: (raises: [CancelledError]).} =
-  for peer in self.peers.peers.values:
+  for peer in self.peers.peers.values.toSeq:
     # We refresh block knowledge if:
     # 1. the peer hasn't been refreshed in a while;
-    # 2. the list of blocks we care about has actually changed.
+    # 2. the list of blocks we care about has changed.
     #
     # Note that because of (2), it is important that we update our
     # want list in the coarsest way possible instead of over many
@@ -182,8 +181,17 @@ proc refreshBlockKnowledge(self: BlockExcEngine) {.async: (raises: [CancelledErr
     #
     # In dynamic swarms, staleness will dominate latency.
     if peer.lastRefresh < self.pendingBlocks.lastInclusion or peer.isKnowledgeStale:
-      await self.refreshBlockKnowledge(peer)
+      # FIXME: we update lastRefresh before actually refreshing because otherwise a
+      # slow peer will be bombarded with requests. If the request fails or the peer
+      # does not reply, a retrying block will eventually issue this again. This is a
+      # complex and convoluted flow - ideally we should simply track this request,
+      # retry it when no response arrives, and eventually disconnect the peer if it
+      # consistently fails to respond.
       peer.refreshed()
+      # TODO: optimize this by keeping track of what was sent and sending deltas.
+      # This should allow us to run much more frequent refreshes, and be way more
+      # efficient about it.
+      await self.refreshBlockKnowledge(peer)
 
 proc randomPeer(peers: seq[BlockExcPeerCtx]): BlockExcPeerCtx =
   Rng.instance.sample(peers)
@@ -220,6 +228,9 @@ proc downloadInternal(
       else:
         self.pendingBlocks.setInFlight(address, false)
         if peers.without.len > 0:
+          # We have peers connected, but none of them have the block. This could
+          # be because our knowledge of what they have has gone stale, so try
+          # refreshing it.
           await self.refreshBlockKnowledge()
           self.discovery.queueFindBlocksReq(@[address.cidOrTreeCid])
 
diff --git a/tests/codex/blockexchange/engine/fakepeer.nim b/tests/codex/blockexchange/engine/fakepeer.nim
new file mode 100644
index 00000000..b3fcb48b
--- /dev/null
+++ b/tests/codex/blockexchange/engine/fakepeer.nim
@@ -0,0 +1,155 @@
+import std/assertions
+import std/enumerate
+import std/sugar
+
+import pkg/chronos
+import pkg/libp2p
+
+import pkg/codex/manifest
+import pkg/codex/merkletree
+import pkg/codex/blockexchange
+import pkg/codex/blockexchange/network/network {.all.}
+import pkg/codex/blockexchange/protobuf/[message, blockexc]
+import pkg/codex/blocktype
+import pkg/codex/rng
+
+import ../../helpers
+
+type
+  ## Fake network in which one real peer can talk to
+  ## k fake peers.
+  FakeNetwork* = ref object
+    fakePeers*: Table[PeerId, FakePeer]
+    sender*: BlockExcNetwork
+
+  FakePeer* = ref object
+    id*: PeerId
+    fakeNetwork*: FakeNetwork
+    pendingRequests*: seq[BlockAddress]
+    blocks*: Table[BlockAddress, Block]
+    proofs*: Table[BlockAddress, CodexProof]
+
+  Dataset* = object
+    blocks*: seq[Block]
+    proofs*: seq[CodexProof]
+    manifest*: Manifest
+
+proc makePeerId(): PeerId =
+  let
+    gen = Rng.instance()
+    secKey = PrivateKey.random(gen[]).tryGet()
+
+  return PeerId.init(secKey.getPublicKey().tryGet()).tryGet()
+
+proc newDataset*(
+    nBlocks: int = 5, blockSize: NBytes = 1024.NBytes
+): Future[Dataset] {.async.} =
+  let
+    blocks = await makeRandomBlocks(blockSize.int * nBlocks, blockSize)
+    (manifest, tree) = makeManifestAndTree(blocks).tryGet()
+    treeCid = tree.rootCid.tryGet()
+
+  return Dataset(
+    blocks: blocks,
+    proofs: (0 ..< blocks.len).mapIt(tree.getProof(it).tryGet()).toSeq,
+    manifest: manifest,
+  )
+
+proc storeDataset*(self: FakePeer, dataset: Dataset, slice: HSlice[int, int] = 1 .. 0) =
+  let actualSlice =
+    if slice.len == 0:
+      0 ..< dataset.blocks.len
+    else:
+      slice
+
+  for index in actualSlice:
+    let address = BlockAddress.init(dataset.manifest.treeCid, index.Natural)
+    self.proofs[address] = dataset.proofs[index]
+    self.blocks[address] = dataset.blocks[index]
+
+proc blockPresences(self: FakePeer, addresses: seq[BlockAddress]): seq[BlockPresence] =
+  collect:
+    for address in addresses:
+      if self.blocks.hasKey(address):
+        BlockPresence(address: address, `type`: BlockPresenceType.Have)
+
+proc getPeer(self: FakeNetwork, id: PeerId): FakePeer =
+  try:
+    return self.fakePeers[id]
+  except KeyError as exc:
+    raise newException(Defect, "peer not found")
+
+proc newInstrumentedNetwork(self: FakeNetwork): BlockExcNetwork =
+  var sender = BlockExcNetwork()
+
+  proc sendWantList(
+      id: PeerId,
+      addresses: seq[BlockAddress],
+      priority: int32 = 0,
+      cancel: bool = false,
+      wantType: WantType = WantType.WantHave,
+      full: bool = false,
+      sendDontHave: bool = false,
+  ) {.async: (raises: [CancelledError]).} =
+    var peer = self.getPeer(id)
+    case wantType
+    # WantHaves are replied to immediately.
+    of WantType.WantHave:
+      let haves = peer.blockPresences(addresses)
+      if haves.len > 0:
+        await sender.handlers.onPresence(id, haves)
+
+    # WantBlocks are deferred till `sendPendingBlocks` is called.
+    of WantType.WantBlock:
+      let blockAddresses = addresses.filterIt(peer.blocks.hasKey(it)).toSeq
+      if blockAddresses.len > 0:
+        for blockAddress in blockAddresses:
+          peer.pendingRequests.add(blockAddress)
+
+  proc sendBlocksDelivery(
+      id: PeerId, blocksDelivery: seq[BlockDelivery]
+  ) {.async: (raises: [CancelledError]).} =
+    var peer = self.getPeer(id)
+    for delivery in blocksDelivery:
+      peer.blocks[delivery.address] = delivery.blk
+      if delivery.proof.isSome:
+        peer.proofs[delivery.address] = delivery.proof.get
+
+  sender.request = BlockExcRequest(
+    sendWantList: sendWantList,
+    sendBlocksDelivery: sendBlocksDelivery,
+    sendWantCancellations: proc(
+        id: PeerId, addresses: seq[BlockAddress]
+    ) {.async: (raises: [CancelledError]).} =
+      discard,
+  )
+
+  return sender
+
+proc sendPendingBlocks*(self: FakePeer) {.async.} =
+  ## Replies to any pending block requests.
+  let blocks = collect:
+    for blockAddress in self.pendingRequests:
+      if not self.blocks.hasKey(blockAddress):
+        continue
+
+      let proof =
+        if blockAddress in self.proofs:
+          self.proofs[blockAddress].some
+        else:
+          CodexProof.none
+
+      BlockDelivery(address: blockAddress, blk: self.blocks[blockAddress], proof: proof)
+
+  await self.fakeNetwork.sender.handlers.onBlocksDelivery(self.id, blocks)
+
+proc newPeer*(self: FakeNetwork): FakePeer =
+  ## Adds a new `FakePeer` to a `FakeNetwork`.
+  let peer = FakePeer(id: makePeerId(), fakeNetwork: self)
+  self.fakePeers[peer.id] = peer
+  return peer
+
+proc new*(_: type FakeNetwork): FakeNetwork =
+  let fakeNetwork = FakeNetwork()
+  fakeNetwork.sender = fakeNetwork.newInstrumentedNetwork()
+  return fakeNetwork
diff --git a/tests/codex/blockexchange/engine/testengine.nim b/tests/codex/blockexchange/engine/testengine.nim
index c0360fed..6688b417 100644
--- a/tests/codex/blockexchange/engine/testengine.nim
+++ b/tests/codex/blockexchange/engine/testengine.nim
@@ -520,6 +520,11 @@ asyncchecksuite "Block Download":
     expect CancelledError:
       discard (await pending).tryGet()
 
+  # test "Should not keep looking up providers for the same dataset repeatedly":
+  #   let
+  #     blocks = await makeRandomBlocks(datasetSize = 4096, blockSize = 128'nb)
+  #     manifest = await storeDataGetManifest(store, blocks)
+
 asyncchecksuite "Task Handler":
   var rng: Rng