From 927e67e0c38cdbb35ab8e49fa7f143e96537a0b4 Mon Sep 17 00:00:00 2001 From: Marcin Czenko Date: Thu, 20 Mar 2025 03:14:33 +0100 Subject: [PATCH] updates torrent streaming to take advantage of the new interface of the piece validator --- codex/node.nim | 75 +++++++++++++++++++++----------------------------- 1 file changed, 32 insertions(+), 43 deletions(-) diff --git a/codex/node.nim b/codex/node.nim index 888ce154..b64baf96 100644 --- a/codex/node.nim +++ b/codex/node.nim @@ -399,14 +399,36 @@ proc retrieve*( await self.streamEntireDataset(manifest, cid) +proc validatePiece( + self: CodexNodeRef, pieceValidator: TorrentPieceValidator, blocks: seq[bt.Block] +): ?!void {.raises: [].} = + trace "Fetched complete torrent piece - verifying..." + let pieceIndex = pieceValidator.validatePiece(blocks) + + if pieceIndex < 0: + error "Piece verification failed", pieceIndex = pieceIndex + return failure(fmt"Piece verification failed for {pieceIndex=}") + + trace "Piece successfully verified", pieceIndex + + let confirmedPieceIndex = pieceValidator.confirmCurrentPiece() + + if pieceIndex != confirmedPieceIndex: + error "Piece confirmation failed", + pieceIndex = pieceIndex, confirmedPieceIndex = confirmedPieceIndex + return + failure(fmt"Piece confirmation failed for {pieceIndex=}, {confirmedPieceIndex=}") + success() + proc fetchPieces*( self: CodexNodeRef, - cid: Cid, - blockIter: Iter[int], - pieceIter: Iter[int], - numOfBlocksPerPiece: int, - onPiece: PieceProc, + torrentManifest: BitTorrentManifest, + codexManifest: Manifest, + pieceValidator: TorrentPieceValidator, ): Future[?!void] {.async: (raises: [CancelledError]).} = + let cid = codexManifest.treeCid + let numOfBlocksPerPiece = pieceValidator.numberOfBlocksPerPiece + let blockIter = Iter[int].new(0 ..< codexManifest.blocksCount) while not blockIter.finished: let blockFutures = collect: for i in 0 ..< numOfBlocksPerPiece: @@ -417,30 +439,13 @@ proc fetchPieces*( without blocks =? await allFinishedValues(blockFutures), err: return failure(err) - if pieceErr =? (onPiece(blocks, pieceIter.next())).errorOption: - return failure(pieceErr) + if err =? self.validatePiece(pieceValidator, blocks).errorOption: + return failure(err) await sleepAsync(1.millis) success() -proc fetchPieces*( - self: CodexNodeRef, - torrentManifest: BitTorrentManifest, - codexManifest: Manifest, - onPiece: PieceProc, -): Future[?!void] {.async: (raw: true, raises: [CancelledError]).} = - trace "Fetching torrent pieces" - - let numOfPieces = torrentManifest.info.pieces.len - let numOfBlocksPerPiece = - torrentManifest.info.pieceLength.int div codexManifest.blockSize.int - let blockIter = Iter[int].new(0 ..< codexManifest.blocksCount) - let pieceIter = Iter[int].new(0 ..< numOfPieces) - self.fetchPieces( - codexManifest.treeCid, blockIter, pieceIter, numOfBlocksPerPiece, onPiece - ) - proc streamTorrent*( self: CodexNodeRef, torrentManifest: BitTorrentManifest, @@ -449,33 +454,17 @@ proc streamTorrent*( ): Future[LPStream] {.async: (raises: []).} = trace "Retrieving pieces from torrent" let stream = LPStream(StoreStream.new(self.networkStore, codexManifest, pad = false)) - var jobs: seq[Future[void]] - - proc onPieceReceived(blocks: seq[bt.Block], pieceIndex: int): ?!void {.raises: [].} = - trace "Fetched torrent piece - verifying..." - - if err =? pieceValidator.validatePiece(blocks, pieceIndex).errorOption: - error "Piece verification failed", pieceIndex = pieceIndex, err = err.msg - return failure(err) - - if err =? pieceValidator.markPieceAsValid(pieceIndex).errorOption: - error "Unable to mark piece as valid", pieceIndex = pieceIndex - return failure("Unable to mark piece as valid") - trace "Piece verified", pieceIndex - success() proc prefetch(): Future[void] {.async: (raises: []).} = try: - if err =? ( - await self.fetchPieces(torrentManifest, codexManifest, onPieceReceived) - ).errorOption: + if err =? (await self.fetchPieces(torrentManifest, codexManifest, pieceValidator)).errorOption: error "Unable to fetch blocks", err = err.msg await noCancel pieceValidator.cancel() await noCancel stream.close() except CancelledError: trace "Prefetch cancelled" - jobs.add(prefetch()) + let prefetchTask = prefetch() # Monitor stream completion and cancel background jobs when done proc monitorStream() {.async: (raises: []).} = @@ -484,7 +473,7 @@ proc streamTorrent*( except CancelledError: trace "Stream cancelled" finally: - await noCancel allFutures(jobs.mapIt(it.cancelAndWait)) + await noCancel prefetchTask.cancelAndWait self.trackedFutures.track(monitorStream())