mirror of
https://github.com/logos-storage/logos-storage-nim.git
synced 2026-01-07 16:03:13 +00:00
perf: optimize further fetchBatched
- Reduce DefaultFetchBatch to prevent blockData explosion - Add chunked onBatch processing to prevent memory accumulation - Implement memory buffer reuse to reduce GC allocation churn - Fix short-circuit evaluation Part of https://github.com/codex-storage/nim-codex/issues/974
This commit is contained in:
parent
0fbca17269
commit
2fdf234c9d
@ -52,7 +52,9 @@ export logutils
|
|||||||
logScope:
|
logScope:
|
||||||
topics = "codex node"
|
topics = "codex node"
|
||||||
|
|
||||||
const DefaultFetchBatch = 1_000_000
|
const
|
||||||
|
DefaultFetchBatch = 8192
|
||||||
|
MaxOnBatchBlocks = 128
|
||||||
|
|
||||||
type
|
type
|
||||||
Contracts* =
|
Contracts* =
|
||||||
@ -186,18 +188,14 @@ proc fetchBatched*(
|
|||||||
# (i: int) => self.networkStore.getBlock(BlockAddress.init(cid, i))
|
# (i: int) => self.networkStore.getBlock(BlockAddress.init(cid, i))
|
||||||
# )
|
# )
|
||||||
|
|
||||||
|
var addresses = newSeqOfCap[BlockAddress](batchSize)
|
||||||
while not iter.finished:
|
while not iter.finished:
|
||||||
let addresses = collect:
|
addresses.setLen(0)
|
||||||
for i in 0 ..< batchSize:
|
for i in 0 ..< batchSize:
|
||||||
if not iter.finished:
|
if not iter.finished:
|
||||||
let address = BlockAddress.init(cid, iter.next())
|
let address = BlockAddress.init(cid, iter.next())
|
||||||
if not (await address in self.networkStore) or fetchLocal:
|
if fetchLocal or not (await address in self.networkStore):
|
||||||
address
|
addresses.add(address)
|
||||||
|
|
||||||
proc successful(
|
|
||||||
blk: ?!bt.Block
|
|
||||||
): Future[bool] {.async: (raises: [CancelledError]).} =
|
|
||||||
return blk.isSuccess()
|
|
||||||
|
|
||||||
let blockResults = await self.networkStore.getBlocks(addresses)
|
let blockResults = await self.networkStore.getBlocks(addresses)
|
||||||
|
|
||||||
@ -218,11 +216,17 @@ proc fetchBatched*(
|
|||||||
if not onBatch.isNil:
|
if not onBatch.isNil:
|
||||||
blockData.add(blk)
|
blockData.add(blk)
|
||||||
|
|
||||||
|
if blockData.len >= MaxOnBatchBlocks:
|
||||||
|
if batchErr =? (await onBatch(blockData)).errorOption:
|
||||||
|
return failure(batchErr)
|
||||||
|
blockData = @[]
|
||||||
|
|
||||||
if failedBlocks > 0:
|
if failedBlocks > 0:
|
||||||
return failure("Some blocks failed (Result) to fetch (" & $failedBlocks & ")")
|
return failure("Some blocks failed (Result) to fetch (" & $failedBlocks & ")")
|
||||||
|
|
||||||
if not onBatch.isNil and batchErr =? (await onBatch(blockData)).errorOption:
|
if not onBatch.isNil and blockData.len > 0:
|
||||||
return failure(batchErr)
|
if batchErr =? (await onBatch(blockData)).errorOption:
|
||||||
|
return failure(batchErr)
|
||||||
|
|
||||||
if not iter.finished:
|
if not iter.finished:
|
||||||
await idleAsync()
|
await idleAsync()
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user