From 45ab5eee9247b7d3f48757dcf2de957c4145381f Mon Sep 17 00:00:00 2001 From: Marcin Czenko Date: Thu, 20 Mar 2025 02:12:02 +0100 Subject: [PATCH] adds torrent piece validator abstraction to keep streaming in sync with piece validation --- codex/bittorrent/piecevalidator.nim | 134 ++++++++++++++++ codex/node.nim | 33 ++-- codex/rest/api.nim | 51 +++--- tests/codex/bittorrent/testpiecevalidator.nim | 151 ++++++++++++++++++ 4 files changed, 334 insertions(+), 35 deletions(-) create mode 100644 codex/bittorrent/piecevalidator.nim create mode 100644 tests/codex/bittorrent/testpiecevalidator.nim diff --git a/codex/bittorrent/piecevalidator.nim b/codex/bittorrent/piecevalidator.nim new file mode 100644 index 00000000..e982865f --- /dev/null +++ b/codex/bittorrent/piecevalidator.nim @@ -0,0 +1,134 @@ +{.push raises: [].} + +import std/sequtils +import pkg/chronos +import pkg/libp2p/multihash +import pkg/questionable/results + +import ../utils/iter +import ../manifest +import ../blocktype +import ./manifest + +type + PieceHandle* = Future[void].Raising([CancelledError]) + TorrentPieceValidator* = ref object + torrentManifest: BitTorrentManifest + numberOfPieces: int + numberOfBlocksPerPiece: int + pieces: seq[PieceHandle] + waitIter: Iter[int] + confirmIter: Iter[int] + validationIter: Iter[int] + +proc newTorrentPieceValidator*( + torrentManifest: BitTorrentManifest, codexManifest: Manifest +): TorrentPieceValidator = + let numOfPieces = torrentManifest.info.pieces.len + let numOfBlocksPerPiece = + torrentManifest.info.pieceLength.int div codexManifest.blockSize.int + let pieces = newSeqWith( + numOfPieces, + cast[PieceHandle](newFuture[void]("PieceValidator.newTorrentPieceValidator")), + ) + + TorrentPieceValidator( + torrentManifest: torrentManifest, + numberOfPieces: numOfPieces, + numberOfBlocksPerPiece: numOfBlocksPerPiece, + pieces: pieces, + waitIter: Iter[int].new(0 ..< numOfPieces), + confirmIter: Iter[int].new(0 ..< numOfPieces), + validationIter: Iter[int].new(0 ..< numOfPieces), + ) + +func numberOfBlocksPerPiece*(self: TorrentPieceValidator): int = + self.numberOfBlocksPerPiece + +proc getNewPieceIterator*(self: TorrentPieceValidator): Iter[int] = + Iter[int].new(0 ..< self.numberOfPieces) + +proc getNewBlocksPerPieceIterator*(self: TorrentPieceValidator): Iter[int] = + Iter[int].new(0 ..< self.numberOfBlocksPerPiece) + +proc waitForNextPiece*( + self: TorrentPieceValidator +): Future[bool] {.async: (raises: [CancelledError]).} = + if self.waitIter.finished: + return false + await self.pieces[self.waitIter.next()] + true + +proc confirmCurrentPiece*(self: TorrentPieceValidator): bool {.raises: [].} = + if self.confirmIter.finished: + return false + self.pieces[self.confirmIter.next()].complete() + true + +proc cancel*(self: TorrentPieceValidator): Future[void] {.async: (raises: []).} = + await noCancel allFutures(self.pieces.mapIt(it.cancelAndWait)) + +proc validatePiece*( + self: TorrentPieceValidator, blocks: seq[Block] +): bool {.raises: [].} = + var pieceHashCtx: sha1 + pieceHashCtx.init() + + for blk in blocks: + pieceHashCtx.update(blk.data) + + let computedPieceHash = pieceHashCtx.finish() + + if (computedPieceHash != self.torrentManifest.info.pieces[self.validationIter.next()]): + return false + + true + +################################################################# +# Previous API, keeping it for now, probably will not be needed +# +################################################################# + +proc waitForPiece*( + self: TorrentPieceValidator, index: int +): Future[?!void] {.async: (raises: [CancelledError]).} = + if index < 0 or index >= self.pieces.len: + return failure("Invalid piece index") + await self.pieces[index] + success() + +proc cancelPiece*( + self: TorrentPieceValidator, index: int +): Future[?!void] {.async: (raises: [CancelledError]).} = + if index < 0 or index >= self.pieces.len: + return failure("Invalid piece index") + await noCancel self.pieces[index].cancelAndWait() + success() + +proc markPieceAsValid*(self: TorrentPieceValidator, index: int): ?!void {.raises: [].} = + if index < 0 or index >= self.pieces.len: + return failure("Invalid piece index") + self.pieces[index].complete() + success() + +proc validatePiece*( + self: TorrentPieceValidator, blocks: seq[Block], index: int +): ?!void {.raises: [].} = + if index < 0 or index >= self.pieces.len: + return failure("Invalid piece index") + + var pieceHashCtx: sha1 + pieceHashCtx.init() + + for blk in blocks: + pieceHashCtx.update(blk.data) + + let computedPieceHash = pieceHashCtx.finish() + + # if index == 1: + # return failure("Piece verification failed (simulated)") + + if (computedPieceHash != self.torrentManifest.info.pieces[index]): + return failure("Piece verification failed") + + success() diff --git a/codex/node.nim b/codex/node.nim index fec2c6b1..888ce154 100644 --- a/codex/node.nim +++ b/codex/node.nim @@ -50,6 +50,7 @@ import ./utils/trackedfutures # bittorrent from ./codextypes import InfoHashV1Codec import ./bittorrent/manifest +import ./bittorrent/piecevalidator export logutils @@ -441,8 +442,11 @@ proc fetchPieces*( ) proc streamTorrent*( - self: CodexNodeRef, torrentManifest: BitTorrentManifest, codexManifest: Manifest -): Future[?!LPStream] {.async: (raises: []).} = + self: CodexNodeRef, + torrentManifest: BitTorrentManifest, + codexManifest: Manifest, + pieceValidator: TorrentPieceValidator, +): Future[LPStream] {.async: (raises: []).} = trace "Retrieving pieces from torrent" let stream = LPStream(StoreStream.new(self.networkStore, codexManifest, pad = false)) var jobs: seq[Future[void]] @@ -450,20 +454,14 @@ proc streamTorrent*( proc onPieceReceived(blocks: seq[bt.Block], pieceIndex: int): ?!void {.raises: [].} = trace "Fetched torrent piece - verifying..." - var pieceHashCtx: sha1 - pieceHashCtx.init() + if err =? pieceValidator.validatePiece(blocks, pieceIndex).errorOption: + error "Piece verification failed", pieceIndex = pieceIndex, err = err.msg + return failure(err) - for blk in blocks: - pieceHashCtx.update(blk.data) - - let pieceHash = pieceHashCtx.finish() - - if (pieceHash != torrentManifest.info.pieces[pieceIndex]): - error "Piece verification failed", pieceIndex = pieceIndex - return failure("Piece verification failed") - - trace "Piece verified", pieceIndex, pieceHash - # great success + if err =? pieceValidator.markPieceAsValid(pieceIndex).errorOption: + error "Unable to mark piece as valid", pieceIndex = pieceIndex + return failure("Unable to mark piece as valid") + trace "Piece verified", pieceIndex success() proc prefetch(): Future[void] {.async: (raises: []).} = @@ -472,7 +470,8 @@ proc streamTorrent*( await self.fetchPieces(torrentManifest, codexManifest, onPieceReceived) ).errorOption: error "Unable to fetch blocks", err = err.msg - await stream.close() + await noCancel pieceValidator.cancel() + await noCancel stream.close() except CancelledError: trace "Prefetch cancelled" @@ -490,7 +489,7 @@ proc streamTorrent*( self.trackedFutures.track(monitorStream()) trace "Creating store stream for torrent manifest" - stream.success + stream proc retrieveTorrent*( self: CodexNodeRef, infoHash: MultiHash diff --git a/codex/rest/api.nim b/codex/rest/api.nim index a63a241d..76e2cc32 100644 --- a/codex/rest/api.nim +++ b/codex/rest/api.nim @@ -38,8 +38,10 @@ import ../erasure/erasure import ../manifest import ../streams/asyncstreamwrapper import ../stores +import ../utils/iter import ../utils/options import ../bittorrent/manifest +import ../bittorrent/piecevalidator import ./coders import ./json @@ -174,16 +176,6 @@ proc retrieveInfoHash( return let (torrentManifest, codexManifest) = torrent - without stream =? (await node.streamTorrent(torrentManifest, codexManifest)), err: - if err of BlockNotFoundError: - resp.status = Http404 - await resp.sendBody("") - return - else: - resp.status = Http500 - await resp.sendBody(err.msg) - return - if codexManifest.mimetype.isSome: resp.setHeader("Content-Type", codexManifest.mimetype.get()) else: @@ -199,21 +191,44 @@ proc retrieveInfoHash( await resp.prepareChunked() - while not stream.atEof: - var - buff = newSeqUninitialized[byte](BitTorrentBlockSize.int) - len = await stream.readOnce(addr buff[0], buff.len) + let torrentPieceValidator = newTorrentPieceValidator(torrentManifest, codexManifest) - buff.setLen(len) - if buff.len <= 0: + let stream = + await node.streamTorrent(torrentManifest, codexManifest, torrentPieceValidator) + + let pieceIter = torrentPieceValidator.getNewPieceIterator() + + var pieceIndex = 0 + + while not pieceIter.finished and not stream.atEof: + trace "Waiting for piece", pieceIndex + if not (await torrentPieceValidator.waitForNextPiece()): + warn "No more torrent pieces expected. TorrentPieceValidator out of sync" break - bytes += buff.len + trace "Got piece", pieceIndex + inc pieceIndex - await resp.sendChunk(addr buff[0], buff.len) + let blocksPerPieceIter = torrentPieceValidator.getNewBlocksPerPieceIterator() + while not stream.atEof: + if blocksPerPieceIter.finished: + break + var buff = newSeqUninitialized[byte](BitTorrentBlockSize.int) + # wait for the next the piece to prefetch + let len = await stream.readOnce(addr buff[0], buff.len) + + buff.setLen(len) + if buff.len <= 0: + break + + bytes += buff.len + + await resp.sendChunk(addr buff[0], buff.len) + discard blocksPerPieceIter.next() await resp.finish() codex_api_downloads.inc() except CancelledError as exc: + info "Stream cancelled", exc = exc.msg raise exc except CatchableError as exc: warn "Error streaming blocks", exc = exc.msg diff --git a/tests/codex/bittorrent/testpiecevalidator.nim b/tests/codex/bittorrent/testpiecevalidator.nim new file mode 100644 index 00000000..f28f1df5 --- /dev/null +++ b/tests/codex/bittorrent/testpiecevalidator.nim @@ -0,0 +1,151 @@ +import std/strformat + +import pkg/libp2p/[cid, multicodec, multihash] +import pkg/questionable/results + +import ../../asynctest +import ../examples + +import pkg/codex/manifest +import pkg/codex/bittorrent/manifest +import pkg/codex/bittorrent/piecevalidator + +suite "Torrent PieceValidator": + const numOfPieces = 10 + const pieceLength = 65536 + const contentLength = pieceLength * numOfPieces + let pieces = newSeqWith(numOfPieces, MultiHash.example(Sha1HashCodec)) + let exampleInfo = BitTorrentInfo( + length: contentLength, + pieceLength: pieceLength, + pieces: pieces, + name: "data.bin".some, + ) + let dummyCodexManifestCid = Cid.example() + let exampleTorrentManifest = + newBitTorrentManifest(info = exampleInfo, codexManifestCid = dummyCodexManifestCid) + let infoBencoded = exampleInfo.bencode() + let infoHash = MultiHash.digest($Sha1HashCodec, infoBencoded).tryGet + let exampleCodexManifest = Manifest.new( + treeCid = Cid.example, + blockSize = BitTorrentBlockSize.NBytes, + datasetSize = exampleInfo.length.NBytes, + filename = exampleInfo.name, + mimetype = "application/octet-stream".some, + ) + + var pieceValidator: TorrentPieceValidator + + setup: + pieceValidator = + newTorrentPieceValidator(exampleTorrentManifest, exampleCodexManifest) + + test "correctly sets numberOfBlocksPerPiece": + check pieceValidator.numberOfBlocksPerPiece == + exampleInfo.pieceLength.int div exampleCodexManifest.blockSize.int + + test "reports an error when trying to wait for an invalid piece": + let res = await pieceValidator.waitForPiece(exampleTorrentManifest.info.pieces.len) + check isFailure(res) + check res.error.msg == "Invalid piece index" + + test "reports an error when trying to mark an invalid piece as valid": + let res = pieceValidator.markPieceAsValid(exampleTorrentManifest.info.pieces.len) + check isFailure(res) + check res.error.msg == "Invalid piece index" + + for i in 0 ..< exampleTorrentManifest.info.pieces.len: + test fmt"can await piece {i}": + let fut = pieceValidator.waitForPiece(i) + check pieceValidator.markPieceAsValid(i) == success() + check (await fut) == success() + + test "awaiting for piece can be cancelled": + let pieceIndex = 0 + let fut = pieceValidator.waitForPiece(pieceIndex) + check (await pieceValidator.cancelPiece(pieceIndex)) == success() + let res = catch(await fut) + check isFailure(res) + check res.error of CancelledError + + test "all pieces can be cancelled": + let fut1 = pieceValidator.waitForPiece(1) + let fut2 = pieceValidator.waitForPiece(2) + + await pieceValidator.cancel() + + let res1 = catch(await fut1) + check isFailure(res1) + check res1.error of CancelledError + let res2 = catch(await fut2) + check isFailure(res2) + check res2.error of CancelledError + + test "awaiting all pieces sequentially": + let numberOfPieces = exampleTorrentManifest.info.pieces.len + for i in 0 ..< numberOfPieces: + let fut = pieceValidator.waitForNextPiece() + check pieceValidator.confirmCurrentPiece() + check await fut + + test "awaiting is independent from confirming": + let numberOfPieces = exampleTorrentManifest.info.pieces.len + var futs = newSeq[Future[bool]](numberOfPieces) + for i in 0 ..< numberOfPieces: + futs[i] = pieceValidator.waitForNextPiece() + for i in 0 ..< numberOfPieces: + check pieceValidator.confirmCurrentPiece() + for i in 0 ..< numberOfPieces: + check await futs[i] + + test "sequential validation of blocks": + let blocksInPieces = newSeqWith( + numOfPieces, + newSeqWith( + pieceLength div BitTorrentBlockSize.int, Block.example(BitTorrentBlockSize.int) + ), + ) + var pieces = newSeq[MultiHash](blocksInPieces.len) + for i in 0 ..< blocksInPieces.len: + let blocks = blocksInPieces[i] + var pieceHashCtx: sha1 + pieceHashCtx.init() + for blk in blocks: + pieceHashCtx.update(blk.data) + pieces[i] = MultiHash.init($Sha1HashCodec, pieceHashCtx.finish()).tryGet + + let info = BitTorrentInfo( + length: contentLength, + pieceLength: pieceLength, + pieces: pieces, + name: "data.bin".some, + ) + let manifestCid = Cid.example() + let torrentManifest = + newBitTorrentManifest(info = info, codexManifestCid = manifestCid) + let codexManifest = Manifest.new( + treeCid = Cid.example, + blockSize = BitTorrentBlockSize.NBytes, + datasetSize = info.length.NBytes, + filename = info.name, + mimetype = "application/octet-stream".some, + ) + + pieceValidator = newTorrentPieceValidator(torrentManifest, codexManifest) + + for blks in blocksInPieces: + # streaming client will wait on the piece validator to validate the piece + let fut = pieceValidator.waitForNextPiece() + + # during prefetch we will validate each piece sequentially + # piece validator maintains internal iterators in its object + # to keep track of the validation order + check pieceValidator.validatePiece(blks) + + # after piece is validated, the prefetch task will confirm the piece + # again, using internal state, the validator knows which piece to confirm + check pieceValidator.confirmCurrentPiece() + + # the fut will be resolved after the piece is confirmed + # and the streaming client can continue + check await fut