replaces the old async iter with the new safe async iter in erasure, builder, and treehelper

This commit is contained in:
Marcin Czenko 2025-06-17 02:30:21 +02:00
parent 9fe75510de
commit 88c6a7291d
No known key found for this signature in database
GPG Key ID: 33DEA0C8E30937C0
3 changed files with 61 additions and 52 deletions

View File

@ -28,7 +28,6 @@ import ../stores
import ../clock
import ../blocktype as bt
import ../utils
import ../utils/asynciter
import ../indexingstrategy
import ../errors
import ../utils/arrayutils
@ -122,14 +121,19 @@ func indexToPos(steps, idx, step: int): int {.inline.} =
proc getPendingBlocks(
self: Erasure, manifest: Manifest, indices: seq[int]
): AsyncIter[(?!bt.Block, int)] =
): SafeAsyncIter[(?!bt.Block, int)] =
## Get pending blocks iterator
##
if indicies.len == 0:
trace "No indicies to fetch blocks for", treeCid = manifest.treeCid
return SafeAsyncIter[(?!bt.Block, int)].empty()
var pendingBlocks: seq[Future[(?!bt.Block, int)]] = @[]
proc attachIndex(
fut: Future[?!bt.Block], i: int
): Future[(?!bt.Block, int)] {.async.} =
): Future[(?!bt.Block, int)] {.async: (raises: [CancelledError]).} =
## avoids closure capture issues
return (await fut, i)
@ -141,20 +145,25 @@ proc getPendingBlocks(
proc isFinished(): bool =
pendingBlocks.len == 0
proc genNext(): Future[(?!bt.Block, int)] {.async.} =
let completedFut = await one(pendingBlocks)
if (let i = pendingBlocks.find(completedFut); i >= 0):
pendingBlocks.del(i)
return await completedFut
else:
let (_, index) = await completedFut
raise newException(
CatchableError,
"Future for block id not found, tree cid: " & $manifest.treeCid & ", index: " &
$index,
)
proc genNext(): Future[?!(?!bt.Block, int)] {.async: (raises: [CancelledError]).} =
try:
let completedFut = await one(pendingBlocks)
if (let i = pendingBlocks.find(completedFut); i >= 0):
pendingBlocks.del(i)
return success(await completedFut)
else:
let (_, index) = await completedFut
return failure(
"Future for block id not found, tree cid: " & $manifest.treeCid & ", index: " &
$index
)
except ValueError as err:
# ValueError is raised by `one` when the pendingBlocks is empty -
# but we check for that at the very beginning -
# thus, if this happens, we raise an assert
raiseAssert("fatal: pendingBlocks is empty - this should never happen")
AsyncIter[(?!bt.Block, int)].new(genNext, isFinished)
SafeAsyncIter[(?!bt.Block, int)].new(genNext, isFinished)
proc prepareEncodingData(
self: Erasure,
@ -164,7 +173,7 @@ proc prepareEncodingData(
data: ref seq[seq[byte]],
cids: ref seq[Cid],
emptyBlock: seq[byte],
): Future[?!Natural] {.async.} =
): Future[?!Natural] {.async: (raises: [CancelledError, IndexingError]).} =
## Prepare data for encoding
##
@ -178,7 +187,9 @@ proc prepareEncodingData(
var resolved = 0
for fut in pendingBlocksIter:
let (blkOrErr, idx) = await fut
let pendingBlocksRes = await fut
without (blkOrErr, idx) =? pendingBlocksRes, err:
return failure(err)
without blk =? blkOrErr, err:
warn "Failed retrieving a block", treeCid = manifest.treeCid, idx, msg = err.msg
return failure(err)
@ -208,7 +219,7 @@ proc prepareDecodingData(
parityData: ref seq[seq[byte]],
cids: ref seq[Cid],
emptyBlock: seq[byte],
): Future[?!(Natural, Natural)] {.async.} =
): Future[?!(Natural, Natural)] {.async: (raises: [CancelledError, IndexingError]).} =
## Prepare data for decoding
## `encoded` - the encoded manifest
## `step` - the current step
@ -235,7 +246,9 @@ proc prepareDecodingData(
if resolved >= encoded.ecK:
break
let (blkOrErr, idx) = await fut
let pendingBlocksRes = await fut
without (blkOrErr, idx) =? pendingBlocksRes, err:
return failure(err)
without blk =? blkOrErr, err:
trace "Failed retrieving a block", idx, treeCid = encoded.treeCid, msg = err.msg
continue
@ -362,7 +375,7 @@ proc asyncEncode*(
proc encodeData(
self: Erasure, manifest: Manifest, params: EncodingParams
): Future[?!Manifest] {.async.} =
): Future[?!Manifest] {.async: (raises: [CancelledError]).} =
## Encode blocks pointed to by the protected manifest
##
## `manifest` - the manifest to encode
@ -403,15 +416,12 @@ proc encodeData(
trace "Erasure coding data", data = data[].len
try:
if err =? (
await self.asyncEncode(
manifest.blockSize.int, params.ecK, params.ecM, data, parity
)
).errorOption:
return failure(err)
except CancelledError as exc:
raise exc
if err =? (
await self.asyncEncode(
manifest.blockSize.int, params.ecK, params.ecM, data, parity
)
).errorOption:
return failure(err)
var idx = params.rounded + step
for j in 0 ..< params.ecM:
@ -451,9 +461,9 @@ proc encodeData(
except CancelledError as exc:
trace "Erasure coding encoding cancelled"
raise exc # cancellation needs to be propagated
except CatchableError as exc:
trace "Erasure coding encoding error", exc = exc.msg
return failure(exc)
except IndexingError as err:
trace "Erasure coding encoding indexing error", error = err.msg
return failure(err)
proc encode*(
self: Erasure,
@ -461,7 +471,7 @@ proc encode*(
blocks: Natural,
parity: Natural,
strategy = SteppedStrategy,
): Future[?!Manifest] {.async.} =
): Future[?!Manifest] {.async: (raises: [CancelledError]).} =
## Encode a manifest into one that is erasure protected.
##
## `manifest` - the original manifest to be encoded
@ -554,7 +564,8 @@ proc asyncDecode*(
proc decodeInternal(
self: Erasure, encoded: Manifest
): Future[?!(ref seq[Cid], seq[Natural])] {.async.} =
): Future[?!(ref seq[Cid], seq[Natural])] {.async: (raises: [CancelledError]).} =
logScope:
steps = encoded.steps
rounded_blocks = encoded.rounded
@ -597,15 +608,12 @@ proc decodeInternal(
continue
trace "Erasure decoding data"
try:
if err =? (
await self.asyncDecode(
encoded.blockSize.int, encoded.ecK, encoded.ecM, data, parityData, recovered
)
).errorOption:
return failure(err)
except CancelledError as exc:
raise exc
if err =? (
await self.asyncDecode(
encoded.blockSize.int, encoded.ecK, encoded.ecM, data, parityData, recovered
)
).errorOption:
return failure(err)
for i in 0 ..< encoded.ecK:
let idx = i * encoded.steps + step
@ -630,9 +638,9 @@ proc decodeInternal(
except CancelledError as exc:
trace "Erasure coding decoding cancelled"
raise exc # cancellation needs to be propagated
except CatchableError as exc:
trace "Erasure coding decoding error", exc = exc.msg
return failure(exc)
except IndexingError as err:
trace "Erasure coding decoding indexing error", error = err.msg
return failure(err)
finally:
decoder.release()

View File

@ -24,12 +24,11 @@ import ../../utils
import ../../stores
import ../../manifest
import ../../merkletree
import ../../utils/asynciter
import ../../indexingstrategy
import ../converters
export converters, asynciter
export converters, iter
logScope:
topics = "codex slotsbuilder"

View File

@ -25,7 +25,7 @@ import ../merkletree
proc putSomeProofs*(
store: BlockStore, tree: CodexTree, iter: Iter[int]
): Future[?!void] {.async.} =
): Future[?!void] {.async: (raises: [CancelledError]).} =
without treeCid =? tree.rootCid, err:
return failure(err)
@ -51,8 +51,10 @@ proc putSomeProofs*(
proc putSomeProofs*(
store: BlockStore, tree: CodexTree, iter: Iter[Natural]
): Future[?!void] =
): Future[?!void] {.async: (raw: true, raises: [CancelledError]).} =
store.putSomeProofs(tree, iter.map((i: Natural) => i.ord))
proc putAllProofs*(store: BlockStore, tree: CodexTree): Future[?!void] =
proc putAllProofs*(
store: BlockStore, tree: CodexTree
): Future[?!void] {.async: (raw: true, raises: [CancelledError]).} =
store.putSomeProofs(tree, Iter[int].new(0 ..< tree.leavesCount))