From 5846fbce709f09707e4fe9e62e0ff5df6c1de63c Mon Sep 17 00:00:00 2001 From: Marcin Czenko Date: Sun, 16 Mar 2025 20:53:54 +0100 Subject: [PATCH] improves exception handling --- codex/node.nim | 33 +++++++++++++++------------------ 1 file changed, 15 insertions(+), 18 deletions(-) diff --git a/codex/node.nim b/codex/node.nim index 19122781..fec2c6b1 100644 --- a/codex/node.nim +++ b/codex/node.nim @@ -88,7 +88,7 @@ type gcsafe, async: (raises: [CancelledError]) .} PieceProc* = - proc(blocks: seq[bt.Block], pieceIndex: int): Future[?!void] {.gcsafe, raises: [].} + proc(blocks: seq[bt.Block], pieceIndex: int): ?!void {.gcsafe, raises: [].} func switch*(self: CodexNodeRef): Switch = return self.switch @@ -405,19 +405,18 @@ proc fetchPieces*( pieceIter: Iter[int], numOfBlocksPerPiece: int, onPiece: PieceProc, -): Future[?!void] {.async, gcsafe.} = +): Future[?!void] {.async: (raises: [CancelledError]).} = while not blockIter.finished: - let blocks = collect: + let blockFutures = collect: for i in 0 ..< numOfBlocksPerPiece: if not blockIter.finished: let address = BlockAddress.init(cid, blockIter.next()) self.networkStore.getBlock(address) - if blocksErr =? (await allFutureResult(blocks)).errorOption: - return failure(blocksErr) + without blocks =? await allFinishedValues(blockFutures), err: + return failure(err) - if pieceErr =? - (await onPiece(blocks.mapIt(it.read.get), pieceIter.next())).errorOption: + if pieceErr =? (onPiece(blocks, pieceIter.next())).errorOption: return failure(pieceErr) await sleepAsync(1.millis) @@ -429,7 +428,7 @@ proc fetchPieces*( torrentManifest: BitTorrentManifest, codexManifest: Manifest, onPiece: PieceProc, -): Future[?!void] = +): Future[?!void] {.async: (raw: true, raises: [CancelledError]).} = trace "Fetching torrent pieces" let numOfPieces = torrentManifest.info.pieces.len @@ -443,14 +442,12 @@ proc fetchPieces*( proc streamTorrent*( self: CodexNodeRef, torrentManifest: BitTorrentManifest, codexManifest: Manifest -): Future[?!LPStream] {.async.} = +): Future[?!LPStream] {.async: (raises: []).} = trace "Retrieving pieces from torrent" let stream = LPStream(StoreStream.new(self.networkStore, codexManifest, pad = false)) var jobs: seq[Future[void]] - proc onPieceReceived( - blocks: seq[bt.Block], pieceIndex: int - ): Future[?!void] {.async.} = + proc onPieceReceived(blocks: seq[bt.Block], pieceIndex: int): ?!void {.raises: [].} = trace "Fetched torrent piece - verifying..." var pieceHashCtx: sha1 @@ -469,7 +466,7 @@ proc streamTorrent*( # great success success() - proc prefetch(): Future[void] {.async.} = + proc prefetch(): Future[void] {.async: (raises: []).} = try: if err =? ( await self.fetchPieces(torrentManifest, codexManifest, onPieceReceived) @@ -477,18 +474,18 @@ proc streamTorrent*( error "Unable to fetch blocks", err = err.msg await stream.close() except CancelledError: - trace "Prefetch job cancelled" - except CatchableError as exc: - error "Error fetching blocks", exc = exc.msg + trace "Prefetch cancelled" jobs.add(prefetch()) # Monitor stream completion and cancel background jobs when done - proc monitorStream() {.async.} = + proc monitorStream() {.async: (raises: []).} = try: await stream.join() + except CancelledError: + trace "Stream cancelled" finally: - await allFutures(jobs.mapIt(it.cancelAndWait)) + await noCancel allFutures(jobs.mapIt(it.cancelAndWait)) self.trackedFutures.track(monitorStream())