perf: optimize further fetchBatched

- Reduce DefaultFetchBatch to prevent blockData explosion
- Add chunked onBatch processing to prevent memory accumulation
- Implement memory buffer reuse to reduce GC allocation churn
- Fix short-circuit evaluation
This commit is contained in:
Chrysostomos Nanakos 2025-09-15 18:39:25 +03:00
parent 524e93090d
commit 3a36e38b1f
No known key found for this signature in database

View File

@ -52,7 +52,9 @@ export logutils
logScope:
topics = "codex node"
const DefaultFetchBatch = 1_000_000
const
DefaultFetchBatch = 8192
MaxOnBatchBlocks = 128
type
Contracts* =
@ -186,18 +188,14 @@ proc fetchBatched*(
# (i: int) => self.networkStore.getBlock(BlockAddress.init(cid, i))
# )
var addresses = newSeqOfCap[BlockAddress](batchSize)
while not iter.finished:
let addresses = collect:
for i in 0 ..< batchSize:
if not iter.finished:
let address = BlockAddress.init(cid, iter.next())
if not (await address in self.networkStore) or fetchLocal:
address
proc successful(
blk: ?!bt.Block
): Future[bool] {.async: (raises: [CancelledError]).} =
return blk.isSuccess()
addresses.setLen(0)
for i in 0 ..< batchSize:
if not iter.finished:
let address = BlockAddress.init(cid, iter.next())
if fetchLocal or not (await address in self.networkStore):
addresses.add(address)
let blockResults = await self.networkStore.getBlocks(addresses)
@ -218,11 +216,17 @@ proc fetchBatched*(
if not onBatch.isNil:
blockData.add(blk)
if blockData.len >= MaxOnBatchBlocks:
if batchErr =? (await onBatch(blockData)).errorOption:
return failure(batchErr)
blockData = @[]
if failedBlocks > 0:
return failure("Some blocks failed (Result) to fetch (" & $failedBlocks & ")")
if not onBatch.isNil and batchErr =? (await onBatch(blockData)).errorOption:
return failure(batchErr)
if not onBatch.isNil and blockData.len > 0:
if batchErr =? (await onBatch(blockData)).errorOption:
return failure(batchErr)
if not iter.finished:
await idleAsync()