feat(node): implement sliding window for block prefetching

Now fetchBatched maintains a sliding window of batchSize blocks in-flight.
When 75% complete, adds next chunk to maintain constant window size.
This ensures blocks are already pending or have been fetched when
StoreStream needs them.

Part of https://github.com/codex-storage/nim-codex/issues/974

Signed-off-by: Chrysostomos Nanakos <chris@include.gr>
This commit is contained in:
Chrysostomos Nanakos 2025-10-23 21:44:49 +03:00
parent aea9337ddc
commit ad4d3b5d62
No known key found for this signature in database
3 changed files with 59 additions and 45 deletions

View File

@ -82,7 +82,6 @@ declareCounter(
)
const
DefaultMaxPeersPerRequest* = 10
# The default max message length of nim-libp2p is 100 megabytes, meaning we can
# in principle fit up to 1600 64k blocks per message, so 20 is well under
# that number.
@ -90,9 +89,10 @@ const
DefaultTaskQueueSize = 100
DefaultConcurrentTasks = 10
# Don't do more than one discovery request per `DiscoveryRateLimit` seconds.
DiscoveryRateLimit = 1.seconds
DiscoveryRateLimit = 3.seconds
DefaultPeerActivityTimeout = 1.minutes
PresenceBatchSize = 1024
# Match MaxWantListBatchSize to efficiently respond to incoming WantLists
PresenceBatchSize = MaxWantListBatchSize
type
TaskHandler* = proc(task: BlockExcPeerCtx): Future[void] {.gcsafe.}

View File

@ -34,7 +34,7 @@ declareGauge(
const
DefaultBlockRetries* = 3000
DefaultRetryInterval* = 1.seconds
DefaultRetryInterval* = 2.seconds
type
RetriesExhaustedError* = object of CatchableError

View File

@ -53,8 +53,9 @@ logScope:
topics = "codex node"
const
DefaultFetchBatch = 2048
DefaultFetchBatch = 1024
MaxOnBatchBlocks = 128
BatchRefillThreshold = 0.75 # Refill when 75% of window completes
type
Contracts* =
@ -188,48 +189,61 @@ proc fetchBatched*(
# (i: int) => self.networkStore.getBlock(BlockAddress.init(cid, i))
# )
# Sliding window: maintain batchSize blocks in-flight
let
refillThreshold = int(float(batchSize) * BatchRefillThreshold)
refillSize = max(refillThreshold, 1)
maxCallbackBlocks = min(batchSize, MaxOnBatchBlocks)
var
blockData: seq[bt.Block]
failedBlocks = 0
successfulBlocks = 0
completedInWindow = 0
var addresses = newSeqOfCap[BlockAddress](batchSize)
while not iter.finished:
addresses.setLen(0)
for i in 0 ..< batchSize:
if not iter.finished:
let address = BlockAddress.init(cid, iter.next())
if fetchLocal or not (await address in self.networkStore):
addresses.add(address)
let blockResults = await self.networkStore.getBlocks(addresses)
var
successfulBlocks = 0
failedBlocks = 0
blockData: seq[bt.Block]
for res in blockResults:
without blk =? await res:
inc(failedBlocks)
continue
inc(successfulBlocks)
# Only retains block data in memory if there's
# a callback.
if not onBatch.isNil:
blockData.add(blk)
if blockData.len >= MaxOnBatchBlocks:
if batchErr =? (await onBatch(blockData)).errorOption:
return failure(batchErr)
blockData = @[]
if failedBlocks > 0:
return failure("Some blocks failed (Result) to fetch (" & $failedBlocks & ")")
if not onBatch.isNil and blockData.len > 0:
if batchErr =? (await onBatch(blockData)).errorOption:
return failure(batchErr)
for i in 0 ..< batchSize:
if not iter.finished:
await idleAsync()
let address = BlockAddress.init(cid, iter.next())
if fetchLocal or not (await address in self.networkStore):
addresses.add(address)
var blockResults = await self.networkStore.getBlocks(addresses)
while not blockResults.finished:
without blk =? await blockResults.next(), err:
inc(failedBlocks)
continue
inc(successfulBlocks)
inc(completedInWindow)
if not onBatch.isNil:
blockData.add(blk)
if blockData.len >= maxCallbackBlocks:
if batchErr =? (await onBatch(blockData)).errorOption:
return failure(batchErr)
blockData = @[]
if completedInWindow >= refillThreshold and not iter.finished:
var refillAddresses = newSeqOfCap[BlockAddress](refillSize)
for i in 0 ..< refillSize:
if not iter.finished:
let address = BlockAddress.init(cid, iter.next())
if fetchLocal or not (await address in self.networkStore):
refillAddresses.add(address)
if refillAddresses.len > 0:
blockResults =
chain(blockResults, await self.networkStore.getBlocks(refillAddresses))
completedInWindow = 0
if failedBlocks > 0:
return failure("Some blocks failed (Result) to fetch (" & $failedBlocks & ")")
if not onBatch.isNil and blockData.len > 0:
if batchErr =? (await onBatch(blockData)).errorOption:
return failure(batchErr)
success()