From 68918242268675cb989b234281053523d8eac631 Mon Sep 17 00:00:00 2001 From: gmega Date: Wed, 9 Jul 2025 14:44:14 -0300 Subject: [PATCH] fix: randomize block refresh time, optimize context store checks --- codex/blockexchange/engine/engine.nim | 10 +++++++--- codex/blockexchange/peers/peerctxstore.nim | 2 +- tests/codex/blockexchange/engine/testblockexc.nim | 2 +- 3 files changed, 9 insertions(+), 5 deletions(-) diff --git a/codex/blockexchange/engine/engine.nim b/codex/blockexchange/engine/engine.nim index 3511f912..4cf84454 100644 --- a/codex/blockexchange/engine/engine.nim +++ b/codex/blockexchange/engine/engine.nim @@ -12,6 +12,7 @@ import std/sets import std/options import std/algorithm import std/sugar +import std/random import pkg/chronos import pkg/libp2p/[cid, switch, multihash, multicodec] @@ -199,7 +200,6 @@ proc refreshBlockKnowledge(self: BlockExcEngine) {.async: (raises: [CancelledErr # In dynamic swarms, staleness will dominate latency. if peer.lastRefresh < self.pendingBlocks.lastInclusion or peer.isKnowledgeStale: - trace "Refreshing block knowledge for peer", peer = peer.id peer.refreshRequested() # TODO: optimize this by keeping track of what was sent and sending deltas. # This should allow us to run much more frequent refreshes, and be way more @@ -269,8 +269,9 @@ proc downloadInternal( # We now wait for a bit and then retry. If the handle gets completed in the # meantime (cause the presence handler might have requested the block and - # received it in the meantime), we are done. - await handle or sleepAsync(self.pendingBlocks.retryInterval) + # received it in the meantime), we are done. Retry delays are randomized + # so we don't get all block loops spinning at the same time. + await handle or sleepAsync(secs(rand(self.pendingBlocks.retryInterval.secs))) if handle.finished: break # If we still don't have the block, we'll go for another cycle. @@ -484,6 +485,9 @@ proc cancelBlocks( # If so, schedules a cancellation. scheduledCancellations[peerCtx.id] = intersection + if scheduledCancellations.len == 0: + return + let (succeededFuts, failedFuts) = await allFinishedFailed[PeerId]( toSeq(scheduledCancellations.pairs).map(dispatchCancellations) ) diff --git a/codex/blockexchange/peers/peerctxstore.nim b/codex/blockexchange/peers/peerctxstore.nim index 171206ba..d2762fc8 100644 --- a/codex/blockexchange/peers/peerctxstore.nim +++ b/codex/blockexchange/peers/peerctxstore.nim @@ -78,7 +78,7 @@ func peersWant*(self: PeerCtxStore, cid: Cid): seq[BlockExcPeerCtx] = proc getPeersForBlock*(self: PeerCtxStore, address: BlockAddress): PeersForBlock = var res: PeersForBlock = (@[], @[]) for peer in self: - if address in peer.peerHave: + if address in peer: res.with.add(peer) else: res.without.add(peer) diff --git a/tests/codex/blockexchange/engine/testblockexc.nim b/tests/codex/blockexchange/engine/testblockexc.nim index 68aec5fe..ca37bc8a 100644 --- a/tests/codex/blockexchange/engine/testblockexc.nim +++ b/tests/codex/blockexchange/engine/testblockexc.nim @@ -213,4 +213,4 @@ asyncchecksuite "NetworkStore - dissemination": await nodes.linearTopology() let downloads = nodes.mapIt(downloadDataset(it, dataset)) - await allFuturesThrowing(downloads).wait(20.seconds) + await allFuturesThrowing(downloads).wait(30.seconds)