diff --git a/codex/erasure/erasure.nim b/codex/erasure/erasure.nim index 95516500..8fb667fb 100644 --- a/codex/erasure/erasure.nim +++ b/codex/erasure/erasure.nim @@ -28,7 +28,6 @@ import ../stores import ../clock import ../blocktype as bt import ../utils -import ../utils/asynciter import ../indexingstrategy import ../errors import ../utils/arrayutils @@ -122,14 +121,19 @@ func indexToPos(steps, idx, step: int): int {.inline.} = proc getPendingBlocks( self: Erasure, manifest: Manifest, indices: seq[int] -): AsyncIter[(?!bt.Block, int)] = +): SafeAsyncIter[(?!bt.Block, int)] = ## Get pending blocks iterator ## + + if indicies.len == 0: + trace "No indicies to fetch blocks for", treeCid = manifest.treeCid + return SafeAsyncIter[(?!bt.Block, int)].empty() + var pendingBlocks: seq[Future[(?!bt.Block, int)]] = @[] proc attachIndex( fut: Future[?!bt.Block], i: int - ): Future[(?!bt.Block, int)] {.async.} = + ): Future[(?!bt.Block, int)] {.async: (raises: [CancelledError]).} = ## avoids closure capture issues return (await fut, i) @@ -141,20 +145,25 @@ proc getPendingBlocks( proc isFinished(): bool = pendingBlocks.len == 0 - proc genNext(): Future[(?!bt.Block, int)] {.async.} = - let completedFut = await one(pendingBlocks) - if (let i = pendingBlocks.find(completedFut); i >= 0): - pendingBlocks.del(i) - return await completedFut - else: - let (_, index) = await completedFut - raise newException( - CatchableError, - "Future for block id not found, tree cid: " & $manifest.treeCid & ", index: " & - $index, - ) + proc genNext(): Future[?!(?!bt.Block, int)] {.async: (raises: [CancelledError]).} = + try: + let completedFut = await one(pendingBlocks) + if (let i = pendingBlocks.find(completedFut); i >= 0): + pendingBlocks.del(i) + return success(await completedFut) + else: + let (_, index) = await completedFut + return failure( + "Future for block id not found, tree cid: " & $manifest.treeCid & ", index: " & + $index + ) + except ValueError as err: + # ValueError is raised by `one` when the pendingBlocks is empty - + # but we check for that at the very beginning - + # thus, if this happens, we raise an assert + raiseAssert("fatal: pendingBlocks is empty - this should never happen") - AsyncIter[(?!bt.Block, int)].new(genNext, isFinished) + SafeAsyncIter[(?!bt.Block, int)].new(genNext, isFinished) proc prepareEncodingData( self: Erasure, @@ -164,7 +173,7 @@ proc prepareEncodingData( data: ref seq[seq[byte]], cids: ref seq[Cid], emptyBlock: seq[byte], -): Future[?!Natural] {.async.} = +): Future[?!Natural] {.async: (raises: [CancelledError, IndexingError]).} = ## Prepare data for encoding ## @@ -178,7 +187,9 @@ proc prepareEncodingData( var resolved = 0 for fut in pendingBlocksIter: - let (blkOrErr, idx) = await fut + let pendingBlocksRes = await fut + without (blkOrErr, idx) =? pendingBlocksRes, err: + return failure(err) without blk =? blkOrErr, err: warn "Failed retrieving a block", treeCid = manifest.treeCid, idx, msg = err.msg return failure(err) @@ -208,7 +219,7 @@ proc prepareDecodingData( parityData: ref seq[seq[byte]], cids: ref seq[Cid], emptyBlock: seq[byte], -): Future[?!(Natural, Natural)] {.async.} = +): Future[?!(Natural, Natural)] {.async: (raises: [CancelledError, IndexingError]).} = ## Prepare data for decoding ## `encoded` - the encoded manifest ## `step` - the current step @@ -235,7 +246,9 @@ proc prepareDecodingData( if resolved >= encoded.ecK: break - let (blkOrErr, idx) = await fut + let pendingBlocksRes = await fut + without (blkOrErr, idx) =? pendingBlocksRes, err: + return failure(err) without blk =? blkOrErr, err: trace "Failed retrieving a block", idx, treeCid = encoded.treeCid, msg = err.msg continue @@ -362,7 +375,7 @@ proc asyncEncode*( proc encodeData( self: Erasure, manifest: Manifest, params: EncodingParams -): Future[?!Manifest] {.async.} = +): Future[?!Manifest] {.async: (raises: [CancelledError]).} = ## Encode blocks pointed to by the protected manifest ## ## `manifest` - the manifest to encode @@ -403,15 +416,12 @@ proc encodeData( trace "Erasure coding data", data = data[].len - try: - if err =? ( - await self.asyncEncode( - manifest.blockSize.int, params.ecK, params.ecM, data, parity - ) - ).errorOption: - return failure(err) - except CancelledError as exc: - raise exc + if err =? ( + await self.asyncEncode( + manifest.blockSize.int, params.ecK, params.ecM, data, parity + ) + ).errorOption: + return failure(err) var idx = params.rounded + step for j in 0 ..< params.ecM: @@ -451,9 +461,9 @@ proc encodeData( except CancelledError as exc: trace "Erasure coding encoding cancelled" raise exc # cancellation needs to be propagated - except CatchableError as exc: - trace "Erasure coding encoding error", exc = exc.msg - return failure(exc) + except IndexingError as err: + trace "Erasure coding encoding indexing error", error = err.msg + return failure(err) proc encode*( self: Erasure, @@ -461,7 +471,7 @@ proc encode*( blocks: Natural, parity: Natural, strategy = SteppedStrategy, -): Future[?!Manifest] {.async.} = +): Future[?!Manifest] {.async: (raises: [CancelledError]).} = ## Encode a manifest into one that is erasure protected. ## ## `manifest` - the original manifest to be encoded @@ -554,7 +564,8 @@ proc asyncDecode*( proc decodeInternal( self: Erasure, encoded: Manifest -): Future[?!(ref seq[Cid], seq[Natural])] {.async.} = +): Future[?!(ref seq[Cid], seq[Natural])] {.async: (raises: [CancelledError]).} = + logScope: steps = encoded.steps rounded_blocks = encoded.rounded @@ -597,15 +608,12 @@ proc decodeInternal( continue trace "Erasure decoding data" - try: - if err =? ( - await self.asyncDecode( - encoded.blockSize.int, encoded.ecK, encoded.ecM, data, parityData, recovered - ) - ).errorOption: - return failure(err) - except CancelledError as exc: - raise exc + if err =? ( + await self.asyncDecode( + encoded.blockSize.int, encoded.ecK, encoded.ecM, data, parityData, recovered + ) + ).errorOption: + return failure(err) for i in 0 ..< encoded.ecK: let idx = i * encoded.steps + step @@ -630,9 +638,9 @@ proc decodeInternal( except CancelledError as exc: trace "Erasure coding decoding cancelled" raise exc # cancellation needs to be propagated - except CatchableError as exc: - trace "Erasure coding decoding error", exc = exc.msg - return failure(exc) + except IndexingError as err: + trace "Erasure coding decoding indexing error", error = err.msg + return failure(err) finally: decoder.release() diff --git a/codex/slots/builder/builder.nim b/codex/slots/builder/builder.nim index 5fbb0fe1..9222df15 100644 --- a/codex/slots/builder/builder.nim +++ b/codex/slots/builder/builder.nim @@ -24,12 +24,11 @@ import ../../utils import ../../stores import ../../manifest import ../../merkletree -import ../../utils/asynciter import ../../indexingstrategy import ../converters -export converters, asynciter +export converters, iter logScope: topics = "codex slotsbuilder" diff --git a/codex/stores/treehelper.nim b/codex/stores/treehelper.nim index 9c5506f8..96aab34f 100644 --- a/codex/stores/treehelper.nim +++ b/codex/stores/treehelper.nim @@ -25,7 +25,7 @@ import ../merkletree proc putSomeProofs*( store: BlockStore, tree: CodexTree, iter: Iter[int] -): Future[?!void] {.async.} = +): Future[?!void] {.async: (raises: [CancelledError]).} = without treeCid =? tree.rootCid, err: return failure(err) @@ -51,8 +51,10 @@ proc putSomeProofs*( proc putSomeProofs*( store: BlockStore, tree: CodexTree, iter: Iter[Natural] -): Future[?!void] = +): Future[?!void] {.async: (raw: true, raises: [CancelledError]).} = store.putSomeProofs(tree, iter.map((i: Natural) => i.ord)) -proc putAllProofs*(store: BlockStore, tree: CodexTree): Future[?!void] = +proc putAllProofs*( + store: BlockStore, tree: CodexTree +): Future[?!void] {.async: (raw: true, raises: [CancelledError]).} = store.putSomeProofs(tree, Iter[int].new(0 ..< tree.leavesCount))