improves exception handling

This commit is contained in:
Marcin Czenko 2025-03-16 20:53:54 +01:00
parent 42f4aa21b9
commit 5846fbce70
No known key found for this signature in database
GPG Key ID: 33DEA0C8E30937C0

View File

@ -88,7 +88,7 @@ type
gcsafe, async: (raises: [CancelledError])
.}
PieceProc* =
proc(blocks: seq[bt.Block], pieceIndex: int): Future[?!void] {.gcsafe, raises: [].}
proc(blocks: seq[bt.Block], pieceIndex: int): ?!void {.gcsafe, raises: [].}
func switch*(self: CodexNodeRef): Switch =
return self.switch
@ -405,19 +405,18 @@ proc fetchPieces*(
pieceIter: Iter[int],
numOfBlocksPerPiece: int,
onPiece: PieceProc,
): Future[?!void] {.async, gcsafe.} =
): Future[?!void] {.async: (raises: [CancelledError]).} =
while not blockIter.finished:
let blocks = collect:
let blockFutures = collect:
for i in 0 ..< numOfBlocksPerPiece:
if not blockIter.finished:
let address = BlockAddress.init(cid, blockIter.next())
self.networkStore.getBlock(address)
if blocksErr =? (await allFutureResult(blocks)).errorOption:
return failure(blocksErr)
without blocks =? await allFinishedValues(blockFutures), err:
return failure(err)
if pieceErr =?
(await onPiece(blocks.mapIt(it.read.get), pieceIter.next())).errorOption:
if pieceErr =? (onPiece(blocks, pieceIter.next())).errorOption:
return failure(pieceErr)
await sleepAsync(1.millis)
@ -429,7 +428,7 @@ proc fetchPieces*(
torrentManifest: BitTorrentManifest,
codexManifest: Manifest,
onPiece: PieceProc,
): Future[?!void] =
): Future[?!void] {.async: (raw: true, raises: [CancelledError]).} =
trace "Fetching torrent pieces"
let numOfPieces = torrentManifest.info.pieces.len
@ -443,14 +442,12 @@ proc fetchPieces*(
proc streamTorrent*(
self: CodexNodeRef, torrentManifest: BitTorrentManifest, codexManifest: Manifest
): Future[?!LPStream] {.async.} =
): Future[?!LPStream] {.async: (raises: []).} =
trace "Retrieving pieces from torrent"
let stream = LPStream(StoreStream.new(self.networkStore, codexManifest, pad = false))
var jobs: seq[Future[void]]
proc onPieceReceived(
blocks: seq[bt.Block], pieceIndex: int
): Future[?!void] {.async.} =
proc onPieceReceived(blocks: seq[bt.Block], pieceIndex: int): ?!void {.raises: [].} =
trace "Fetched torrent piece - verifying..."
var pieceHashCtx: sha1
@ -469,7 +466,7 @@ proc streamTorrent*(
# great success
success()
proc prefetch(): Future[void] {.async.} =
proc prefetch(): Future[void] {.async: (raises: []).} =
try:
if err =? (
await self.fetchPieces(torrentManifest, codexManifest, onPieceReceived)
@ -477,18 +474,18 @@ proc streamTorrent*(
error "Unable to fetch blocks", err = err.msg
await stream.close()
except CancelledError:
trace "Prefetch job cancelled"
except CatchableError as exc:
error "Error fetching blocks", exc = exc.msg
trace "Prefetch cancelled"
jobs.add(prefetch())
# Monitor stream completion and cancel background jobs when done
proc monitorStream() {.async.} =
proc monitorStream() {.async: (raises: []).} =
try:
await stream.join()
except CancelledError:
trace "Stream cancelled"
finally:
await allFutures(jobs.mapIt(it.cancelAndWait))
await noCancel allFutures(jobs.mapIt(it.cancelAndWait))
self.trackedFutures.track(monitorStream())