diff --git a/codex/bittorrent/torrentdownloader.nim b/codex/bittorrent/torrentdownloader.nim index 93244ab4..ccff3572 100644 --- a/codex/bittorrent/torrentdownloader.nim +++ b/codex/bittorrent/torrentdownloader.nim @@ -1,13 +1,20 @@ +## Nim-Codex +## Copyright (c) 2025 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + {.push raises: [].} -# import std/asyncstreams import std/sequtils import std/sugar import pkg/chronos import pkg/libp2p/multihash import pkg/questionable/results -# import ../rng import ../logutils import ../utils/iter import ../utils/trackedfutures @@ -42,111 +49,11 @@ type queue: AsyncQueue[TorrentPiece] trackedFutures: TrackedFutures -proc newTorrentPiece*( - pieceIndex: int, pieceHash: MultiHash, blockIndexStart: int, blockIndexEnd: int -): TorrentPiece = - TorrentPiece( - pieceIndex: pieceIndex, - pieceHash: pieceHash, - blockIndexStart: blockIndexStart, - blockIndexEnd: blockIndexEnd, - handle: cast[PieceHandle](newFuture[void]("PieceValidator.newTorrentPiece")), - ) - -proc newTorrentDownloader*( - torrentManifest: BitTorrentManifest, - codexManifest: Manifest, - networkStore: NetworkStore, -): ?!TorrentDownloader = - let - blocksCount = codexManifest.blocksCount - numOfPieces = torrentManifest.info.pieces.len - numOfBlocksPerPiece = - torrentManifest.info.pieceLength.int div codexManifest.blockSize.int - numOfBlocksInLastPiece = blocksCount - (numOfBlocksPerPiece * (numOfPieces - 1)) - - let pieces = collect: - for i in 0 ..< numOfPieces: - var blockIndexEnd = ((i + 1) * numOfBlocksPerPiece) - 1 - if i == numOfPieces - 1: - # last piece can have less blocks than numOfBlocksPerPiece - blockIndexEnd = i * numOfBlocksPerPiece + numOfBlocksInLastPiece - 1 - - let piece = newTorrentPiece( - pieceIndex = i, - pieceHash = torrentManifest.info.pieces[i], - blockIndexStart = i * numOfBlocksPerPiece, - blockIndexEnd = blockIndexEnd, - ) - piece - - let queue = newAsyncQueue[TorrentPiece](maxsize = numOfPieces) - - let iter = Iter.new(0 ..< numOfPieces) - var pieceDownloadSequence = newSeqWith(numOfPieces, iter.next()) - # optional: randomize the order of pieces - # not sure if this is such a great idea when streaming content - # Rng.instance.shuffle(pieceDownloadSequence) - - trace "Piece download sequence", pieceDownloadSequence - - for i in pieceDownloadSequence: - try: - queue.addLastNoWait(pieces[i]) - except AsyncQueueFullError: - raiseAssert "Fatal: could not add pieces to queue" - - TorrentDownloader( - torrentManifest: torrentManifest, - codexManifest: codexManifest, - networkStore: networkStore, - numberOfPieces: numOfPieces, - numberOfBlocksPerPiece: numOfBlocksPerPiece, - pieces: pieces, - waitIter: Iter[int].new(0 ..< numOfPieces), - blockIter: Iter[int].empty(), - pieceIndex: 0, - queue: queue, - trackedFutures: TrackedFutures(), - ).success - -proc getNewBlockIterator(piece: TorrentPiece): Iter[int] = - Iter[int].new(piece.blockIndexStart .. piece.blockIndexEnd) - func numberOfBlocks(piece: TorrentPiece): int = piece.blockIndexEnd - piece.blockIndexStart + 1 -func numberOfBlocksInPiece*(self: TorrentDownloader, pieceIndex: int): ?!int = - if pieceIndex < 0 or pieceIndex >= self.numberOfPieces: - return failure("Invalid piece index") - let piece = self.pieces[pieceIndex] - success(piece.numberOfBlocks) - -proc getNewBlocksInPieceIterator*( - self: TorrentDownloader, pieceIndex: int -): ?!Iter[int] = - if pieceIndex < 0 or pieceIndex >= self.numberOfPieces: - return failure("Invalid piece index") - let piece = self.pieces[pieceIndex] - success(piece.getNewBlockIterator()) - -proc getNewPieceIterator*(self: TorrentDownloader): Iter[int] = - Iter[int].new(0 ..< self.numberOfPieces) - -# proc getNewBlocksPerPieceIterator*(self: TorrentDownloader): Iter[int] = -# Iter[int].new(0 ..< self.numberOfBlocksPerPiece) - -proc waitForNextPiece*( - self: TorrentDownloader -): Future[int] {.async: (raises: [CancelledError]).} = - if self.waitIter.finished: - return -1 - let pieceIndex = self.waitIter.next() - await self.pieces[pieceIndex].handle - pieceIndex - -proc cancel*(self: TorrentDownloader): Future[void] {.async: (raises: []).} = - await noCancel allFutures(self.pieces.mapIt(it.handle.cancelAndWait)) +proc getNewBlockIterator(piece: TorrentPiece): Iter[int] = + Iter[int].new(piece.blockIndexStart .. piece.blockIndexEnd) proc validate(piece: TorrentPiece, blocks: seq[Block]): ?!void {.raises: [].} = var pieceHashCtx: sha1 @@ -223,6 +130,37 @@ proc fetchPiece( success() +func numberOfBlocksInPiece*(self: TorrentDownloader, pieceIndex: int): ?!int = + if pieceIndex < 0 or pieceIndex >= self.numberOfPieces: + return failure("Invalid piece index") + let piece = self.pieces[pieceIndex] + success(piece.numberOfBlocks) + +########################################################################### +# Public API +########################################################################### + +proc getNewBlockIterator*(self: TorrentDownloader, pieceIndex: int): ?!Iter[int] = + if pieceIndex < 0 or pieceIndex >= self.numberOfPieces: + return failure("Invalid piece index") + let piece = self.pieces[pieceIndex] + success(piece.getNewBlockIterator()) + +proc getNewPieceIterator*(self: TorrentDownloader): Iter[int] = + Iter[int].new(0 ..< self.numberOfPieces) + +proc waitForNextPiece*( + self: TorrentDownloader +): Future[int] {.async: (raises: [CancelledError]).} = + if self.waitIter.finished: + return -1 + let pieceIndex = self.waitIter.next() + await self.pieces[pieceIndex].handle + pieceIndex + +proc cancel*(self: TorrentDownloader): Future[void] {.async: (raises: []).} = + await noCancel allFutures(self.pieces.mapIt(it.handle.cancelAndWait)) + proc downloadPieces*(self: TorrentDownloader): Future[void] {.async: (raises: []).} = try: while not self.queue.empty: @@ -249,33 +187,6 @@ proc downloadPieces*(self: TorrentDownloader): Future[void] {.async: (raises: [] finally: await noCancel self.cancel() -# proc downloadPieces*(self: TorrentDownloader): Future[?!void] {.async: (raises: []).} = -# try: -# while not self.queue.empty: -# let piece = self.queue.popFirstNoWait() -# if err =? (await self.fetchPiece(piece)).errorOption: -# error "Could not fetch piece", err = err.msg -# # add the piece to the end of the queue -# # to try to fetch the piece again -# self.queue.addLastNoWait(piece) -# continue -# else: -# # piece fetched and validated successfully -# # mark it as ready -# piece.handle.complete() -# await sleepAsync(1.millis) -# except CancelledError: -# trace "Downloading pieces cancelled" -# except AsyncQueueFullError as e: -# error "Queue is full", error = e.msg -# return failure e -# except AsyncQueueEmptyError as e: -# error "Trying to pop from empty queue", error = e.msg -# return failure e -# finally: -# await noCancel self.cancel() -# success() - proc getNext*( self: TorrentDownloader ): Future[?!(int, seq[byte])] {.async: (raises: []).} = @@ -283,7 +194,7 @@ proc getNext*( if self.pieceIndex == -1: return success((-1, newSeq[byte]())) if self.blockIter.finished: - trace "Waiting for piece", pieceIndex = self.pieceIndex + trace "Waiting for piece to be ready" self.pieceIndex = await self.waitForNextPiece() trace "Got piece", pieceIndex = self.pieceIndex if self.pieceIndex == -1: @@ -317,29 +228,70 @@ proc stop*(self: TorrentDownloader) {.async.} = await noCancel self.cancel() await noCancel self.trackedFutures.cancelTracked() -################################################################# -# Previous API, keeping it for now, probably will not be needed -# -################################################################# +proc newTorrentPiece*( + pieceIndex: int, pieceHash: MultiHash, blockIndexStart: int, blockIndexEnd: int +): TorrentPiece = + TorrentPiece( + pieceIndex: pieceIndex, + pieceHash: pieceHash, + blockIndexStart: blockIndexStart, + blockIndexEnd: blockIndexEnd, + handle: cast[PieceHandle](newFuture[void]("PieceValidator.newTorrentPiece")), + ) -proc waitForPiece*( - self: TorrentDownloader, index: int -): Future[?!void] {.async: (raises: [CancelledError]).} = - if index < 0 or index >= self.pieces.len: - return failure("Invalid piece index") - await self.pieces[index].handle - success() +proc newTorrentDownloader*( + torrentManifest: BitTorrentManifest, + codexManifest: Manifest, + networkStore: NetworkStore, +): ?!TorrentDownloader = + let + blocksCount = codexManifest.blocksCount + numOfPieces = torrentManifest.info.pieces.len + numOfBlocksPerPiece = + torrentManifest.info.pieceLength.int div codexManifest.blockSize.int + numOfBlocksInLastPiece = blocksCount - (numOfBlocksPerPiece * (numOfPieces - 1)) -proc cancelPiece*( - self: TorrentDownloader, index: int -): Future[?!void] {.async: (raises: [CancelledError]).} = - if index < 0 or index >= self.pieces.len: - return failure("Invalid piece index") - await noCancel self.pieces[index].handle.cancelAndWait() - success() + let pieces = collect: + for i in 0 ..< numOfPieces: + var blockIndexEnd = ((i + 1) * numOfBlocksPerPiece) - 1 + if i == numOfPieces - 1: + # last piece can have less blocks than numOfBlocksPerPiece + blockIndexEnd = i * numOfBlocksPerPiece + numOfBlocksInLastPiece - 1 -proc confirmPiece*(self: TorrentDownloader, index: int): ?!void {.raises: [].} = - if index < 0 or index >= self.pieces.len: - return failure("Invalid piece index") - self.pieces[index].handle.complete() - success() + let piece = newTorrentPiece( + pieceIndex = i, + pieceHash = torrentManifest.info.pieces[i], + blockIndexStart = i * numOfBlocksPerPiece, + blockIndexEnd = blockIndexEnd, + ) + piece + + let queue = newAsyncQueue[TorrentPiece](maxsize = numOfPieces) + + let iter = Iter.new(0 ..< numOfPieces) + var pieceDownloadSequence = newSeqWith(numOfPieces, iter.next()) + # optional: randomize the order of pieces + # not sure if this is such a great idea when streaming content + # Rng.instance.shuffle(pieceDownloadSequence) + + trace "Piece download sequence", pieceDownloadSequence + + for i in pieceDownloadSequence: + try: + queue.addLastNoWait(pieces[i]) + except AsyncQueueFullError: + raiseAssert "Fatal: could not add pieces to queue" + + TorrentDownloader( + torrentManifest: torrentManifest, + codexManifest: codexManifest, + networkStore: networkStore, + numberOfPieces: numOfPieces, + numberOfBlocksPerPiece: numOfBlocksPerPiece, + pieces: pieces, + waitIter: Iter[int].new(0 ..< numOfPieces), + blockIter: Iter[int].empty(), + pieceIndex: 0, + queue: queue, + trackedFutures: TrackedFutures(), + ).success diff --git a/codex/node.nim b/codex/node.nim index 164d6ca0..2ffd0495 100644 --- a/codex/node.nim +++ b/codex/node.nim @@ -50,7 +50,7 @@ import ./utils/trackedfutures # bittorrent from ./codextypes import InfoHashV1Codec import ./bittorrent/manifest -import ./bittorrent/piecevalidator +import ./bittorrent/torrentdownloader export logutils @@ -399,90 +399,10 @@ proc retrieve*( await self.streamEntireDataset(manifest, cid) -proc validatePiece( - self: CodexNodeRef, pieceValidator: TorrentPieceValidator, blocks: seq[bt.Block] -): ?!void {.raises: [].} = - trace "Fetched complete torrent piece - verifying..." - let pieceIndex = pieceValidator.validatePiece(blocks) - - if pieceIndex < 0: - error "Piece verification failed", pieceIndex = pieceIndex - return failure(fmt"Piece verification failed for {pieceIndex=}") - - trace "Piece successfully verified", pieceIndex - - let confirmedPieceIndex = pieceValidator.confirmCurrentPiece() - - if pieceIndex != confirmedPieceIndex: - error "Piece confirmation failed", - pieceIndex = pieceIndex, confirmedPieceIndex = confirmedPieceIndex - return - failure(fmt"Piece confirmation failed for {pieceIndex=}, {confirmedPieceIndex=}") - success() - -proc fetchPieces*( - self: CodexNodeRef, - torrentManifest: BitTorrentManifest, - codexManifest: Manifest, - pieceValidator: TorrentPieceValidator, -): Future[?!void] {.async: (raises: [CancelledError]).} = - let cid = codexManifest.treeCid - let numOfBlocksPerPiece = pieceValidator.numberOfBlocksPerPiece - let blockIter = Iter[int].new(0 ..< codexManifest.blocksCount) - while not blockIter.finished: - let blockFutures = collect: - for i in 0 ..< numOfBlocksPerPiece: - if not blockIter.finished: - let address = BlockAddress.init(cid, blockIter.next()) - self.networkStore.getBlock(address) - - without blockResults =? await allFinishedValues(blockFutures), err: - return failure(err) - - let numOfFailedBlocks = blockResults.countIt(it.isFailure) - if numOfFailedBlocks > 0: - return failure("Some blocks failed to fetch") - - if err =? self.validatePiece(pieceValidator, blockResults.mapIt(it.get)).errorOption: - return failure(err) - - await sleepAsync(1.millis) - - success() - -proc streamTorrent*( - self: CodexNodeRef, - torrentManifest: BitTorrentManifest, - codexManifest: Manifest, - pieceValidator: TorrentPieceValidator, -): Future[LPStream] {.async: (raises: []).} = - trace "Retrieving pieces from torrent" - let stream = LPStream(StoreStream.new(self.networkStore, codexManifest, pad = false)) - - proc prefetch(): Future[void] {.async: (raises: []).} = - try: - if err =? (await self.fetchPieces(torrentManifest, codexManifest, pieceValidator)).errorOption: - error "Unable to fetch blocks", err = err.msg - await noCancel pieceValidator.cancel() - await noCancel stream.close() - except CancelledError: - trace "Prefetch cancelled" - - let prefetchTask = prefetch() - - # Monitor stream completion and cancel background jobs when done - proc monitorStream() {.async: (raises: []).} = - try: - await stream.join() - except CancelledError: - trace "Stream cancelled" - finally: - await noCancel prefetchTask.cancelAndWait - - self.trackedFutures.track(monitorStream()) - - trace "Creating store stream for torrent manifest" - stream +proc getTorrentDownloader*( + self: CodexNodeRef, torrentManifest: BitTorrentManifest, codexManifest: Manifest +): ?!TorrentDownloader = + newTorrentDownloader(torrentManifest, codexManifest, self.networkStore) proc retrieveTorrent*( self: CodexNodeRef, infoHash: MultiHash diff --git a/codex/rest/api.nim b/codex/rest/api.nim index cb1403d0..0be3a455 100644 --- a/codex/rest/api.nim +++ b/codex/rest/api.nim @@ -41,7 +41,7 @@ import ../stores import ../utils/iter import ../utils/options import ../bittorrent/manifest -import ../bittorrent/piecevalidator +import ../bittorrent/torrentdownloader import ./coders import ./json @@ -165,7 +165,7 @@ proc retrieveInfoHash( ## Download torrent from the node in a streaming ## manner ## - var stream: LPStream + var torrentDownloader: TorrentDownloader var bytes = 0 try: @@ -189,37 +189,31 @@ proc retrieveInfoHash( else: resp.setHeader("Content-Disposition", "attachment") - await resp.prepareChunked() + let contentLength = codexManifest.datasetSize + resp.setHeader("Content-Length", $(contentLength.int)) - let torrentPieceValidator = newTorrentPieceValidator(torrentManifest, codexManifest) + await resp.prepare(HttpResponseStreamType.Plain) - let stream = - await node.streamTorrent(torrentManifest, codexManifest, torrentPieceValidator) + without torrentDownloader =? + node.getTorrentDownloader(torrentManifest, codexManifest), err: + error "Unable to stream torrent", err = err.msg + resp.status = Http500 + await resp.sendBody(err.msg) + return - while not stream.atEof: - trace "Waiting for piece..." - let pieceIndex = await torrentPieceValidator.waitForNextPiece() + torrentDownloader.start() - if -1 == pieceIndex: - warn "No more torrent pieces expected. TorrentPieceValidator might be out of sync!" - break + while not torrentDownloader.finished: + without (blockIndex, data) =? (await torrentDownloader.getNext()), err: + error "Error streaming blocks", err = err.msg + resp.status = Http500 + if resp.isPending(): + await resp.sendBody(err.msg) + return + trace "streaming block", blockIndex, len = data.len + bytes += data.len + await resp.sendChunk(addr data[0], data.len) - trace "Got piece", pieceIndex - - let blocksPerPieceIter = torrentPieceValidator.getNewBlocksPerPieceIterator() - while not blocksPerPieceIter.finished and not stream.atEof: - var buff = newSeqUninitialized[byte](BitTorrentBlockSize.int) - # wait for the next the piece to prefetch - let len = await stream.readOnce(addr buff[0], buff.len) - - buff.setLen(len) - if buff.len <= 0: - break - - bytes += buff.len - - await resp.sendChunk(addr buff[0], buff.len) - discard blocksPerPieceIter.next() await resp.finish() codex_api_downloads.inc() except CancelledError as exc: @@ -232,8 +226,7 @@ proc retrieveInfoHash( await resp.sendBody(exc.msg) finally: info "Sent bytes for torrent", infoHash = $infoHash, bytes - if not stream.isNil: - await stream.close() + await torrentDownloader.stop() proc buildCorsHeaders( httpMethod: string, allowedOrigin: Option[string] diff --git a/tests/codex/bittorrent/testtorrentdownloader.nim b/tests/codex/bittorrent/testtorrentdownloader.nim index 9cec01e6..29a7ad26 100644 --- a/tests/codex/bittorrent/testtorrentdownloader.nim +++ b/tests/codex/bittorrent/testtorrentdownloader.nim @@ -1,3 +1,12 @@ +## Nim-Codex +## Copyright (c) 2025 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + import std/importutils # private access import pkg/libp2p/[cid, multicodec, multihash] @@ -107,7 +116,7 @@ asyncchecksuite "Torrent Downloader": let treeCid = codexManifest.treeCid var pieceHashCtx: sha1 pieceHashCtx.init() - let blockIter = torrentDownloader.getNewBlocksInPieceIterator(pieceIndex).tryGet + let blockIter = torrentDownloader.getNewBlockIterator(pieceIndex).tryGet let blks = newSeq[Block]() while not blockIter.finished: let blockIndex = blockIter.next() @@ -162,7 +171,7 @@ asyncchecksuite "Torrent Downloader": check piece.blockIndexEnd == expectedBlockIndexEnd check torrentDownloader.numberOfBlocksInPiece(index).tryGet == expectedNumOfBlocksInPiece - let blockIterator = torrentDownloader.getNewBlocksInPieceIterator(index).tryGet + let blockIterator = torrentDownloader.getNewBlockIterator(index).tryGet for blkIndex in expectedBlockIndexStart .. expectedBlockIndexEnd: check blkIndex == blockIterator.next() check blockIterator.finished == true