fix: randomize block refresh time, optimize context store checks

This commit is contained in:
gmega 2025-07-09 14:44:14 -03:00 committed by Chrysostomos Nanakos
parent 1fdb14f092
commit 8e8d9f8e60
No known key found for this signature in database
3 changed files with 9 additions and 5 deletions

View File

@ -12,6 +12,7 @@ import std/sets
import std/options import std/options
import std/algorithm import std/algorithm
import std/sugar import std/sugar
import std/random
import pkg/chronos import pkg/chronos
import pkg/libp2p/[cid, switch, multihash, multicodec] import pkg/libp2p/[cid, switch, multihash, multicodec]
@ -199,7 +200,6 @@ proc refreshBlockKnowledge(self: BlockExcEngine) {.async: (raises: [CancelledErr
# In dynamic swarms, staleness will dominate latency. # In dynamic swarms, staleness will dominate latency.
if peer.lastRefresh < self.pendingBlocks.lastInclusion or peer.isKnowledgeStale: if peer.lastRefresh < self.pendingBlocks.lastInclusion or peer.isKnowledgeStale:
trace "Refreshing block knowledge for peer", peer = peer.id
peer.refreshRequested() peer.refreshRequested()
# TODO: optimize this by keeping track of what was sent and sending deltas. # TODO: optimize this by keeping track of what was sent and sending deltas.
# This should allow us to run much more frequent refreshes, and be way more # This should allow us to run much more frequent refreshes, and be way more
@ -269,8 +269,9 @@ proc downloadInternal(
# We now wait for a bit and then retry. If the handle gets completed in the # We now wait for a bit and then retry. If the handle gets completed in the
# meantime (cause the presence handler might have requested the block and # meantime (cause the presence handler might have requested the block and
# received it in the meantime), we are done. # received it in the meantime), we are done. Retry delays are randomized
await handle or sleepAsync(self.pendingBlocks.retryInterval) # so we don't get all block loops spinning at the same time.
await handle or sleepAsync(secs(rand(self.pendingBlocks.retryInterval.secs)))
if handle.finished: if handle.finished:
break break
# If we still don't have the block, we'll go for another cycle. # If we still don't have the block, we'll go for another cycle.
@ -490,6 +491,9 @@ proc cancelBlocks(
# If so, schedules a cancellation. # If so, schedules a cancellation.
scheduledCancellations[peerCtx.id] = intersection scheduledCancellations[peerCtx.id] = intersection
if scheduledCancellations.len == 0:
return
let (succeededFuts, failedFuts) = await allFinishedFailed[PeerId]( let (succeededFuts, failedFuts) = await allFinishedFailed[PeerId](
toSeq(scheduledCancellations.pairs).map(dispatchCancellations) toSeq(scheduledCancellations.pairs).map(dispatchCancellations)
) )

View File

@ -78,7 +78,7 @@ func peersWant*(self: PeerCtxStore, cid: Cid): seq[BlockExcPeerCtx] =
proc getPeersForBlock*(self: PeerCtxStore, address: BlockAddress): PeersForBlock = proc getPeersForBlock*(self: PeerCtxStore, address: BlockAddress): PeersForBlock =
var res: PeersForBlock = (@[], @[]) var res: PeersForBlock = (@[], @[])
for peer in self: for peer in self:
if address in peer.peerHave: if address in peer:
res.with.add(peer) res.with.add(peer)
else: else:
res.without.add(peer) res.without.add(peer)

View File

@ -223,4 +223,4 @@ asyncchecksuite "NetworkStore - dissemination":
await nodes.linearTopology() await nodes.linearTopology()
let downloads = nodes.mapIt(downloadDataset(it, dataset)) let downloads = nodes.mapIt(downloadDataset(it, dataset))
await allFuturesThrowing(downloads).wait(20.seconds) await allFuturesThrowing(downloads).wait(30.seconds)