uses SafeAsyncIter to stream the blocks in api

This commit is contained in:
Marcin Czenko 2025-03-31 06:02:29 +02:00
parent 4d5dfb15d0
commit 6000ae5e07
No known key found for this signature in database
GPG Key ID: 33DEA0C8E30937C0
2 changed files with 25 additions and 8 deletions

View File

@ -17,6 +17,7 @@ import pkg/questionable/results
import ../logutils
import ../utils/iter
import ../utils/safeasynciter
import ../utils/trackedfutures
import ../errors
import ../manifest
@ -188,9 +189,12 @@ proc downloadPieces*(self: TorrentDownloader): Future[void] {.async: (raises: []
finally:
await noCancel self.cancel()
proc finished*(self: TorrentDownloader): bool =
self.pieceIndex == -1
proc getNext*(
self: TorrentDownloader
): Future[?!(int, seq[byte])] {.async: (raises: []).} =
): Future[?!(int, seq[byte])] {.async: (raises: [CancelledError]).} =
try:
if self.pieceIndex == -1:
return success((-1, newSeq[byte]()))
@ -211,15 +215,26 @@ proc getNext*(
error "Could not get block from local store", error = err.msg
return failure("Could not get block from local store: " & err.msg)
success((blockIndex, blk.data))
except CancelledError:
except CancelledError as e:
trace "Getting next block from downloader cancelled"
return success((-1, newSeq[byte]()))
raise e
# return success((-1, newSeq[byte]()))
except CatchableError as e:
warn "Could not get block from local store", error = e.msg
return failure("Could not get block from local store: " & e.msg)
proc finished*(self: TorrentDownloader): bool =
self.pieceIndex == -1
proc getAsyncBlockIterator*(self: TorrentDownloader): SafeAsyncIter[(int, seq[byte])] =
proc genNext(): Future[?!(int, seq[byte])] {.
async: (raw: true, raises: [CancelledError])
.} =
self.getNext()
proc isFinished(): bool =
self.finished()
SafeAsyncIter[(int, seq[byte])].new(
genNext = genNext, isFinished = isFinished, finishOnErr = true
)
proc start*(self: TorrentDownloader) =
self.trackedFutures.track(self.downloadPieces())

View File

@ -38,7 +38,7 @@ import ../erasure/erasure
import ../manifest
import ../streams/asyncstreamwrapper
import ../stores
import ../utils/iter
import ../utils/safeasynciter
import ../utils/options
import ../bittorrent/manifest
import ../bittorrent/torrentdownloader
@ -203,8 +203,10 @@ proc retrieveInfoHash(
torrentDownloader = downloader
torrentDownloader.start()
while not torrentDownloader.finished:
without (blockIndex, data) =? (await torrentDownloader.getNext()), err:
for blockFut in torrentDownloader.getAsyncBlockIterator():
let blockRes =
await cast[Future[?!(int, seq[byte])].Raising([CancelledError])](blockFut)
without (blockIndex, data) =? (blockRes), err:
error "Error streaming blocks", err = err.msg
resp.status = Http500
if resp.isPending():