From 2fdf234c9da058494fadb3904f13795466b74226 Mon Sep 17 00:00:00 2001 From: Chrysostomos Nanakos Date: Mon, 15 Sep 2025 18:39:25 +0300 Subject: [PATCH] perf: optimize further fetchBatched - Reduce DefaultFetchBatch to prevent blockData explosion - Add chunked onBatch processing to prevent memory accumulation - Implement memory buffer reuse to reduce GC allocation churn - Fix short-circuit evaluation Part of https://github.com/codex-storage/nim-codex/issues/974 --- codex/node.nim | 32 ++++++++++++++++++-------------- 1 file changed, 18 insertions(+), 14 deletions(-) diff --git a/codex/node.nim b/codex/node.nim index 71466170..06eb81c0 100644 --- a/codex/node.nim +++ b/codex/node.nim @@ -52,7 +52,9 @@ export logutils logScope: topics = "codex node" -const DefaultFetchBatch = 1_000_000 +const + DefaultFetchBatch = 8192 + MaxOnBatchBlocks = 128 type Contracts* = @@ -186,18 +188,14 @@ proc fetchBatched*( # (i: int) => self.networkStore.getBlock(BlockAddress.init(cid, i)) # ) + var addresses = newSeqOfCap[BlockAddress](batchSize) while not iter.finished: - let addresses = collect: - for i in 0 ..< batchSize: - if not iter.finished: - let address = BlockAddress.init(cid, iter.next()) - if not (await address in self.networkStore) or fetchLocal: - address - - proc successful( - blk: ?!bt.Block - ): Future[bool] {.async: (raises: [CancelledError]).} = - return blk.isSuccess() + addresses.setLen(0) + for i in 0 ..< batchSize: + if not iter.finished: + let address = BlockAddress.init(cid, iter.next()) + if fetchLocal or not (await address in self.networkStore): + addresses.add(address) let blockResults = await self.networkStore.getBlocks(addresses) @@ -218,11 +216,17 @@ proc fetchBatched*( if not onBatch.isNil: blockData.add(blk) + if blockData.len >= MaxOnBatchBlocks: + if batchErr =? (await onBatch(blockData)).errorOption: + return failure(batchErr) + blockData = @[] + if failedBlocks > 0: return failure("Some blocks failed (Result) to fetch (" & $failedBlocks & ")") - if not onBatch.isNil and batchErr =? (await onBatch(blockData)).errorOption: - return failure(batchErr) + if not onBatch.isNil and blockData.len > 0: + if batchErr =? (await onBatch(blockData)).errorOption: + return failure(batchErr) if not iter.finished: await idleAsync()