From 2aa91e5f93bde4a97868859d2434e03db82e0527 Mon Sep 17 00:00:00 2001 From: Marcin Czenko Date: Wed, 26 Mar 2025 18:19:18 +0100 Subject: [PATCH] adds more tests and ability to retrieve the downloaded blocks --- codex/bittorrent/torrentdownloader.nim | 85 +++++++++++- .../bittorrent/testtorrentdownloader.nim | 124 ++++++++++++------ 2 files changed, 161 insertions(+), 48 deletions(-) diff --git a/codex/bittorrent/torrentdownloader.nim b/codex/bittorrent/torrentdownloader.nim index 1167ae22..93244ab4 100644 --- a/codex/bittorrent/torrentdownloader.nim +++ b/codex/bittorrent/torrentdownloader.nim @@ -1,14 +1,16 @@ {.push raises: [].} +# import std/asyncstreams import std/sequtils import std/sugar import pkg/chronos import pkg/libp2p/multihash import pkg/questionable/results -import ../rng +# import ../rng import ../logutils import ../utils/iter +import ../utils/trackedfutures import ../errors import ../manifest import ../blocktype @@ -35,7 +37,10 @@ type numberOfBlocksPerPiece: int pieces: seq[TorrentPiece] waitIter: Iter[int] + blockIter: Iter[int] + pieceIndex: int queue: AsyncQueue[TorrentPiece] + trackedFutures: TrackedFutures proc newTorrentPiece*( pieceIndex: int, pieceHash: MultiHash, blockIndexStart: int, blockIndexEnd: int @@ -99,7 +104,10 @@ proc newTorrentDownloader*( numberOfBlocksPerPiece: numOfBlocksPerPiece, pieces: pieces, waitIter: Iter[int].new(0 ..< numOfPieces), + blockIter: Iter[int].empty(), + pieceIndex: 0, queue: queue, + trackedFutures: TrackedFutures(), ).success proc getNewBlockIterator(piece: TorrentPiece): Iter[int] = @@ -215,10 +223,11 @@ proc fetchPiece( success() -proc downloadPieces*(self: TorrentDownloader): Future[?!void] {.async: (raises: []).} = +proc downloadPieces*(self: TorrentDownloader): Future[void] {.async: (raises: []).} = try: while not self.queue.empty: let piece = self.queue.popFirstNoWait() + trace "Downloading piece", pieceIndex = piece.pieceIndex if err =? (await self.fetchPiece(piece)).errorOption: error "Could not fetch piece", err = err.msg # add the piece to the end of the queue @@ -228,19 +237,85 @@ proc downloadPieces*(self: TorrentDownloader): Future[?!void] {.async: (raises: else: # piece fetched and validated successfully # mark it as ready + trace "Piece fetched and validated", pieceIndex = piece.pieceIndex piece.handle.complete() await sleepAsync(1.millis) except CancelledError: trace "Downloading pieces cancelled" except AsyncQueueFullError as e: error "Queue is full", error = e.msg - return failure e except AsyncQueueEmptyError as e: error "Trying to pop from empty queue", error = e.msg - return failure e finally: await noCancel self.cancel() - success() + +# proc downloadPieces*(self: TorrentDownloader): Future[?!void] {.async: (raises: []).} = +# try: +# while not self.queue.empty: +# let piece = self.queue.popFirstNoWait() +# if err =? (await self.fetchPiece(piece)).errorOption: +# error "Could not fetch piece", err = err.msg +# # add the piece to the end of the queue +# # to try to fetch the piece again +# self.queue.addLastNoWait(piece) +# continue +# else: +# # piece fetched and validated successfully +# # mark it as ready +# piece.handle.complete() +# await sleepAsync(1.millis) +# except CancelledError: +# trace "Downloading pieces cancelled" +# except AsyncQueueFullError as e: +# error "Queue is full", error = e.msg +# return failure e +# except AsyncQueueEmptyError as e: +# error "Trying to pop from empty queue", error = e.msg +# return failure e +# finally: +# await noCancel self.cancel() +# success() + +proc getNext*( + self: TorrentDownloader +): Future[?!(int, seq[byte])] {.async: (raises: []).} = + try: + if self.pieceIndex == -1: + return success((-1, newSeq[byte]())) + if self.blockIter.finished: + trace "Waiting for piece", pieceIndex = self.pieceIndex + self.pieceIndex = await self.waitForNextPiece() + trace "Got piece", pieceIndex = self.pieceIndex + if self.pieceIndex == -1: + return success((-1, newSeq[byte]())) + else: + let piece = self.pieces[self.pieceIndex] + self.blockIter = piece.getNewBlockIterator() + let blockIndex = self.blockIter.next() + if blockIndex == self.codexManifest.blocksCount - 1: + self.pieceIndex = -1 + let address = BlockAddress.init(self.codexManifest.treeCid, blockIndex) + without blk =? (await self.networkStore.localStore.getBlock(address)), err: + error "Could not get block from local store", error = err.msg + return failure("Could not get block from local store: " & err.msg) + success((blockIndex, blk.data)) + except CancelledError: + trace "Getting next block from downloader cancelled" + return success((-1, newSeq[byte]())) + except CatchableError as e: + warn "Could not get block from local store", error = e.msg + return failure("Could not get block from local store: " & e.msg) + +proc finished*(self: TorrentDownloader): bool = + self.pieceIndex == -1 + +proc start*(self: TorrentDownloader) = + self.trackedFutures.track(self.downloadPieces()) + +proc stop*(self: TorrentDownloader) {.async.} = + self.pieceIndex = -1 + await noCancel self.cancel() + await noCancel self.trackedFutures.cancelTracked() ################################################################# # Previous API, keeping it for now, probably will not be needed diff --git a/tests/codex/bittorrent/testtorrentdownloader.nim b/tests/codex/bittorrent/testtorrentdownloader.nim index 47bd0669..9cec01e6 100644 --- a/tests/codex/bittorrent/testtorrentdownloader.nim +++ b/tests/codex/bittorrent/testtorrentdownloader.nim @@ -103,32 +103,45 @@ asyncchecksuite "Torrent Downloader": torrentManifestBlock = (await storeTorrentManifest(torrentManifest, localStore)).tryGet() - # setup: - # await createTestData(datasetSize = 40.KiBs.int) + proc validatePiece(torrentDownloader: TorrentDownloader, pieceIndex: int) {.async.} = + let treeCid = codexManifest.treeCid + var pieceHashCtx: sha1 + pieceHashCtx.init() + let blockIter = torrentDownloader.getNewBlocksInPieceIterator(pieceIndex).tryGet + let blks = newSeq[Block]() + while not blockIter.finished: + let blockIndex = blockIter.next() + let address = BlockAddress.init(treeCid, blockIndex) + let blk = (await localStore.getBlock(address)).tryGet() + trace "got block from local store", treeCid, blockIndex, cid = blk.cid + pieceHashCtx.update(blk.data) + let computedPieceHash = pieceHashCtx.finish() + let expectedPieceHash = torrentDownloader.pieces[pieceIndex].pieceHash + trace "comparing piece hashes", expectedPieceHash, computedPieceHash + check expectedPieceHash == computedPieceHash + trace "piece validated", treeCid, pieceIndex - # torrentDownloader = - # newTorrentDownloader(torrentManifest, codexManifest, networkStore).tryGet() - - test "correctly sets up the pieces": + setup: await createTestData(datasetSize = 72.KiBs.int) + torrentDownloader = + newTorrentDownloader(torrentManifest, codexManifest, networkStore).tryGet() + assert torrentInfo.pieces.len == 2 assert codexManifest.blocksCount == 5 + test "correctly sets up the pieces": let blocksCount = codexManifest.blocksCount let numOfPieces = torrentInfo.pieces.len # last piece can have less blocks than numOfBlocksPerPiece # we know how many blocks we have: let numOfBlocksInLastPiece = blocksCount - (numOfBlocksPerPiece * (numOfPieces - 1)) - torrentDownloader = - newTorrentDownloader(torrentManifest, codexManifest, networkStore).tryGet() - - echo "codeManifest: ", $codexManifest - echo "torrentInfo: ", $torrentInfo - echo "torrentManifest: ", $torrentManifest - echo "codexManifestBlockCid: ", $(codexManifestBlock.cid) - echo "torrentManifestBlockCid: ", $(torrentManifestBlock.cid) + # echo "codeManifest: ", $codexManifest + # echo "torrentInfo: ", $torrentInfo + # echo "torrentManifest: ", $torrentManifest + # echo "codexManifestBlockCid: ", $(codexManifestBlock.cid) + # echo "torrentManifestBlockCid: ", $(torrentManifestBlock.cid) check torrentDownloader.pieces.len == torrentInfo.pieces.len check torrentDownloader.numberOfBlocksPerPiece == numOfBlocksPerPiece @@ -155,35 +168,8 @@ asyncchecksuite "Torrent Downloader": check blockIterator.finished == true check piece.handle.finished == false - proc validatePiece(torrentDownloader: TorrentDownloader, pieceIndex: int) {.async.} = - let treeCid = codexManifest.treeCid - var pieceHashCtx: sha1 - pieceHashCtx.init() - let blockIter = torrentDownloader.getNewBlocksInPieceIterator(pieceIndex).tryGet - let blks = newSeq[Block]() - while not blockIter.finished: - let blockIndex = blockIter.next() - let address = BlockAddress.init(treeCid, blockIndex) - let blk = (await localStore.getBlock(address)).tryGet() - trace "got block from local store", treeCid, blockIndex, cid = blk.cid - pieceHashCtx.update(blk.data) - let computedPieceHash = pieceHashCtx.finish() - let expectedPieceHash = torrentDownloader.pieces[pieceIndex].pieceHash - trace "comparing piece hashes", expectedPieceHash, computedPieceHash - check expectedPieceHash == computedPieceHash - trace "piece validated", treeCid, pieceIndex - - test "downloading pieces": - await createTestData(datasetSize = 72.KiBs.int) - - assert torrentInfo.pieces.len == 2 - assert codexManifest.blocksCount == 5 - - torrentDownloader = - newTorrentDownloader(torrentManifest, codexManifest, networkStore).tryGet() - - # start background task - let downloadFut = torrentDownloader.downloadPieces() + test "pieces are validated": + torrentDownloader.start() let pieceIter = torrentDownloader.getNewPieceIterator() @@ -200,3 +186,55 @@ asyncchecksuite "Torrent Downloader": check (await torrentDownloader.waitForNextPiece()) == -1 check torrentDownloader.queue.empty + + test "get downloaded blocks": + torrentDownloader.start() + + let blockIter = Iter.new(0 ..< codexManifest.blocksCount) + + while not torrentDownloader.finished: + let dataFut = torrentDownloader.getNext() + let status = await dataFut.withTimeout(1.seconds) + assert status == true + let (blockIndex, data) = (await dataFut).tryGet() + trace "got data", blockIndex, len = data.len + let expectedBlockIndex = blockIter.next() + check blockIndex == expectedBlockIndex + let treeCid = codexManifest.treeCid + let address = BlockAddress.init(treeCid, expectedBlockIndex) + let blk = (await localStore.getBlock(address)).tryGet() + check blk.data == data + + check blockIter.finished + await torrentDownloader.stop() + + test "canceling download": + torrentDownloader.start() + + let blockIter = Iter.new(0 ..< codexManifest.blocksCount) + + var (blockIndex, data) = (await torrentDownloader.getNext()).tryGet() + + check blockIndex == 0 + check data.len > 0 + + await torrentDownloader.stop() + + (blockIndex, data) = (await torrentDownloader.getNext()).tryGet() + check blockIndex == -1 + check data.len == 0 + + test "stoping before starting (simulate cancellation)": + let blockIter = Iter.new(0 ..< codexManifest.blocksCount) + + # download did not even start, thus this one will not complete + let dataFut = torrentDownloader.getNext() + + # calling stop will cancel awaiting for the next block + await torrentDownloader.stop() + + assert dataFut.finished + + let (blockIndex, data) = dataFut.read.tryGet() + check blockIndex == -1 + check data.len == 0