From 6000ae5e0737e734423a87b51fe51dee78685adb Mon Sep 17 00:00:00 2001 From: Marcin Czenko Date: Mon, 31 Mar 2025 06:02:29 +0200 Subject: [PATCH] uses SafeAsyncIter to stream the blocks in api --- codex/bittorrent/torrentdownloader.nim | 25 ++++++++++++++++++++----- codex/rest/api.nim | 8 +++++--- 2 files changed, 25 insertions(+), 8 deletions(-) diff --git a/codex/bittorrent/torrentdownloader.nim b/codex/bittorrent/torrentdownloader.nim index b3c42292..d1b02dd4 100644 --- a/codex/bittorrent/torrentdownloader.nim +++ b/codex/bittorrent/torrentdownloader.nim @@ -17,6 +17,7 @@ import pkg/questionable/results import ../logutils import ../utils/iter +import ../utils/safeasynciter import ../utils/trackedfutures import ../errors import ../manifest @@ -188,9 +189,12 @@ proc downloadPieces*(self: TorrentDownloader): Future[void] {.async: (raises: [] finally: await noCancel self.cancel() +proc finished*(self: TorrentDownloader): bool = + self.pieceIndex == -1 + proc getNext*( self: TorrentDownloader -): Future[?!(int, seq[byte])] {.async: (raises: []).} = +): Future[?!(int, seq[byte])] {.async: (raises: [CancelledError]).} = try: if self.pieceIndex == -1: return success((-1, newSeq[byte]())) @@ -211,15 +215,26 @@ proc getNext*( error "Could not get block from local store", error = err.msg return failure("Could not get block from local store: " & err.msg) success((blockIndex, blk.data)) - except CancelledError: + except CancelledError as e: trace "Getting next block from downloader cancelled" - return success((-1, newSeq[byte]())) + raise e + # return success((-1, newSeq[byte]())) except CatchableError as e: warn "Could not get block from local store", error = e.msg return failure("Could not get block from local store: " & e.msg) -proc finished*(self: TorrentDownloader): bool = - self.pieceIndex == -1 +proc getAsyncBlockIterator*(self: TorrentDownloader): SafeAsyncIter[(int, seq[byte])] = + proc genNext(): Future[?!(int, seq[byte])] {. + async: (raw: true, raises: [CancelledError]) + .} = + self.getNext() + + proc isFinished(): bool = + self.finished() + + SafeAsyncIter[(int, seq[byte])].new( + genNext = genNext, isFinished = isFinished, finishOnErr = true + ) proc start*(self: TorrentDownloader) = self.trackedFutures.track(self.downloadPieces()) diff --git a/codex/rest/api.nim b/codex/rest/api.nim index 01c21f9b..c891b8d5 100644 --- a/codex/rest/api.nim +++ b/codex/rest/api.nim @@ -38,7 +38,7 @@ import ../erasure/erasure import ../manifest import ../streams/asyncstreamwrapper import ../stores -import ../utils/iter +import ../utils/safeasynciter import ../utils/options import ../bittorrent/manifest import ../bittorrent/torrentdownloader @@ -203,8 +203,10 @@ proc retrieveInfoHash( torrentDownloader = downloader torrentDownloader.start() - while not torrentDownloader.finished: - without (blockIndex, data) =? (await torrentDownloader.getNext()), err: + for blockFut in torrentDownloader.getAsyncBlockIterator(): + let blockRes = + await cast[Future[?!(int, seq[byte])].Raising([CancelledError])](blockFut) + without (blockIndex, data) =? (blockRes), err: error "Error streaming blocks", err = err.msg resp.status = Http500 if resp.isPending():