From ad4d3b5d622d90e634d5dcf5700f73143fa8c44f Mon Sep 17 00:00:00 2001 From: Chrysostomos Nanakos Date: Thu, 23 Oct 2025 21:44:49 +0300 Subject: [PATCH] feat(node): implement sliding window for block prefetching Now fetchBatched maintains a sliding window of batchSize blocks in-flight. When 75% complete, adds next chunk to maintain constant window size. This ensures blocks are already pending or have been fetched when StoreStream needs them. Part of https://github.com/codex-storage/nim-codex/issues/974 Signed-off-by: Chrysostomos Nanakos --- codex/blockexchange/engine/engine.nim | 6 +- codex/blockexchange/engine/pendingblocks.nim | 2 +- codex/node.nim | 96 +++++++++++--------- 3 files changed, 59 insertions(+), 45 deletions(-) diff --git a/codex/blockexchange/engine/engine.nim b/codex/blockexchange/engine/engine.nim index 79eb9c44..bd3afc98 100644 --- a/codex/blockexchange/engine/engine.nim +++ b/codex/blockexchange/engine/engine.nim @@ -82,7 +82,6 @@ declareCounter( ) const - DefaultMaxPeersPerRequest* = 10 # The default max message length of nim-libp2p is 100 megabytes, meaning we can # in principle fit up to 1600 64k blocks per message, so 20 is well under # that number. @@ -90,9 +89,10 @@ const DefaultTaskQueueSize = 100 DefaultConcurrentTasks = 10 # Don't do more than one discovery request per `DiscoveryRateLimit` seconds. - DiscoveryRateLimit = 1.seconds + DiscoveryRateLimit = 3.seconds DefaultPeerActivityTimeout = 1.minutes - PresenceBatchSize = 1024 + # Match MaxWantListBatchSize to efficiently respond to incoming WantLists + PresenceBatchSize = MaxWantListBatchSize type TaskHandler* = proc(task: BlockExcPeerCtx): Future[void] {.gcsafe.} diff --git a/codex/blockexchange/engine/pendingblocks.nim b/codex/blockexchange/engine/pendingblocks.nim index 44bb26c1..2ff1062c 100644 --- a/codex/blockexchange/engine/pendingblocks.nim +++ b/codex/blockexchange/engine/pendingblocks.nim @@ -34,7 +34,7 @@ declareGauge( const DefaultBlockRetries* = 3000 - DefaultRetryInterval* = 1.seconds + DefaultRetryInterval* = 2.seconds type RetriesExhaustedError* = object of CatchableError diff --git a/codex/node.nim b/codex/node.nim index 18fc4609..67145d22 100644 --- a/codex/node.nim +++ b/codex/node.nim @@ -53,8 +53,9 @@ logScope: topics = "codex node" const - DefaultFetchBatch = 2048 + DefaultFetchBatch = 1024 MaxOnBatchBlocks = 128 + BatchRefillThreshold = 0.75 # Refill when 75% of window completes type Contracts* = @@ -188,48 +189,61 @@ proc fetchBatched*( # (i: int) => self.networkStore.getBlock(BlockAddress.init(cid, i)) # ) + # Sliding window: maintain batchSize blocks in-flight + let + refillThreshold = int(float(batchSize) * BatchRefillThreshold) + refillSize = max(refillThreshold, 1) + maxCallbackBlocks = min(batchSize, MaxOnBatchBlocks) + + var + blockData: seq[bt.Block] + failedBlocks = 0 + successfulBlocks = 0 + completedInWindow = 0 + var addresses = newSeqOfCap[BlockAddress](batchSize) - while not iter.finished: - addresses.setLen(0) - for i in 0 ..< batchSize: - if not iter.finished: - let address = BlockAddress.init(cid, iter.next()) - if fetchLocal or not (await address in self.networkStore): - addresses.add(address) - - let blockResults = await self.networkStore.getBlocks(addresses) - - var - successfulBlocks = 0 - failedBlocks = 0 - blockData: seq[bt.Block] - - for res in blockResults: - without blk =? await res: - inc(failedBlocks) - continue - - inc(successfulBlocks) - - # Only retains block data in memory if there's - # a callback. - if not onBatch.isNil: - blockData.add(blk) - - if blockData.len >= MaxOnBatchBlocks: - if batchErr =? (await onBatch(blockData)).errorOption: - return failure(batchErr) - blockData = @[] - - if failedBlocks > 0: - return failure("Some blocks failed (Result) to fetch (" & $failedBlocks & ")") - - if not onBatch.isNil and blockData.len > 0: - if batchErr =? (await onBatch(blockData)).errorOption: - return failure(batchErr) - + for i in 0 ..< batchSize: if not iter.finished: - await idleAsync() + let address = BlockAddress.init(cid, iter.next()) + if fetchLocal or not (await address in self.networkStore): + addresses.add(address) + + var blockResults = await self.networkStore.getBlocks(addresses) + + while not blockResults.finished: + without blk =? await blockResults.next(), err: + inc(failedBlocks) + continue + + inc(successfulBlocks) + inc(completedInWindow) + + if not onBatch.isNil: + blockData.add(blk) + if blockData.len >= maxCallbackBlocks: + if batchErr =? (await onBatch(blockData)).errorOption: + return failure(batchErr) + blockData = @[] + + if completedInWindow >= refillThreshold and not iter.finished: + var refillAddresses = newSeqOfCap[BlockAddress](refillSize) + for i in 0 ..< refillSize: + if not iter.finished: + let address = BlockAddress.init(cid, iter.next()) + if fetchLocal or not (await address in self.networkStore): + refillAddresses.add(address) + + if refillAddresses.len > 0: + blockResults = + chain(blockResults, await self.networkStore.getBlocks(refillAddresses)) + completedInWindow = 0 + + if failedBlocks > 0: + return failure("Some blocks failed (Result) to fetch (" & $failedBlocks & ")") + + if not onBatch.isNil and blockData.len > 0: + if batchErr =? (await onBatch(blockData)).errorOption: + return failure(batchErr) success()