From c859250c4aa03bf52d9114efa70df58480fc7f81 Mon Sep 17 00:00:00 2001 From: Marcin Czenko Date: Wed, 26 Mar 2025 03:08:51 +0100 Subject: [PATCH] adds some good weather tests for torrentdownloader --- codex/bittorrent/torrentdownloader.nim | 60 ++++++--- .../bittorrent/testtorrentdownloader.nim | 122 ++++++++++++++++-- tests/codex/testbittorrent.nim | 1 + 3 files changed, 152 insertions(+), 31 deletions(-) diff --git a/codex/bittorrent/torrentdownloader.nim b/codex/bittorrent/torrentdownloader.nim index f3170f66..1167ae22 100644 --- a/codex/bittorrent/torrentdownloader.nim +++ b/codex/bittorrent/torrentdownloader.nim @@ -16,7 +16,7 @@ import ../stores/networkstore import ./manifest logScope: - topics = "codex piecedownloader" + topics = "codex node torrentdownloader" type PieceHandle* = Future[void].Raising([CancelledError]) @@ -26,7 +26,6 @@ type blockIndexStart: int blockIndexEnd: int handle: PieceHandle - randomIndex: int TorrentDownloader* = ref object torrentManifest: BitTorrentManifest @@ -49,25 +48,30 @@ proc newTorrentPiece*( handle: cast[PieceHandle](newFuture[void]("PieceValidator.newTorrentPiece")), ) -proc randomize(self: TorrentPiece, numberOfPieces: int): void = - self.randomIndex = Rng.instance.rand(max = numberOfPieces * 10) - proc newTorrentDownloader*( torrentManifest: BitTorrentManifest, codexManifest: Manifest, networkStore: NetworkStore, ): ?!TorrentDownloader = - let numOfPieces = torrentManifest.info.pieces.len - let numOfBlocksPerPiece = - torrentManifest.info.pieceLength.int div codexManifest.blockSize.int + let + blocksCount = codexManifest.blocksCount + numOfPieces = torrentManifest.info.pieces.len + numOfBlocksPerPiece = + torrentManifest.info.pieceLength.int div codexManifest.blockSize.int + numOfBlocksInLastPiece = blocksCount - (numOfBlocksPerPiece * (numOfPieces - 1)) let pieces = collect: for i in 0 ..< numOfPieces: + var blockIndexEnd = ((i + 1) * numOfBlocksPerPiece) - 1 + if i == numOfPieces - 1: + # last piece can have less blocks than numOfBlocksPerPiece + blockIndexEnd = i * numOfBlocksPerPiece + numOfBlocksInLastPiece - 1 + let piece = newTorrentPiece( pieceIndex = i, pieceHash = torrentManifest.info.pieces[i], blockIndexStart = i * numOfBlocksPerPiece, - blockIndexEnd = ((i + 1) * numOfBlocksPerPiece) - 1, + blockIndexEnd = blockIndexEnd, ) piece @@ -98,17 +102,31 @@ proc newTorrentDownloader*( queue: queue, ).success -func numberOfBlocksPerPiece*(self: TorrentDownloader): int = - self.numberOfBlocksPerPiece +proc getNewBlockIterator(piece: TorrentPiece): Iter[int] = + Iter[int].new(piece.blockIndexStart .. piece.blockIndexEnd) + +func numberOfBlocks(piece: TorrentPiece): int = + piece.blockIndexEnd - piece.blockIndexStart + 1 + +func numberOfBlocksInPiece*(self: TorrentDownloader, pieceIndex: int): ?!int = + if pieceIndex < 0 or pieceIndex >= self.numberOfPieces: + return failure("Invalid piece index") + let piece = self.pieces[pieceIndex] + success(piece.numberOfBlocks) + +proc getNewBlocksInPieceIterator*( + self: TorrentDownloader, pieceIndex: int +): ?!Iter[int] = + if pieceIndex < 0 or pieceIndex >= self.numberOfPieces: + return failure("Invalid piece index") + let piece = self.pieces[pieceIndex] + success(piece.getNewBlockIterator()) proc getNewPieceIterator*(self: TorrentDownloader): Iter[int] = Iter[int].new(0 ..< self.numberOfPieces) -proc getNewBlocksPerPieceIterator*(self: TorrentDownloader): Iter[int] = - Iter[int].new(0 ..< self.numberOfBlocksPerPiece) - -proc getBlockIterator(self: TorrentPiece): Iter[int] = - Iter[int].new(self.blockIndexStart .. self.blockIndexEnd) +# proc getNewBlocksPerPieceIterator*(self: TorrentDownloader): Iter[int] = +# Iter[int].new(0 ..< self.numberOfBlocksPerPiece) proc waitForNextPiece*( self: TorrentDownloader @@ -122,7 +140,7 @@ proc waitForNextPiece*( proc cancel*(self: TorrentDownloader): Future[void] {.async: (raises: []).} = await noCancel allFutures(self.pieces.mapIt(it.handle.cancelAndWait)) -proc validate*(piece: TorrentPiece, blocks: seq[Block]): ?!void {.raises: [].} = +proc validate(piece: TorrentPiece, blocks: seq[Block]): ?!void {.raises: [].} = var pieceHashCtx: sha1 pieceHashCtx.init() @@ -136,7 +154,7 @@ proc validate*(piece: TorrentPiece, blocks: seq[Block]): ?!void {.raises: [].} = success() -proc allBlocksFinished*(futs: seq[Future[?!Block]]): seq[?!Block] {.raises: [].} = +proc allBlocksFinished(futs: seq[Future[?!Block]]): seq[?!Block] {.raises: [].} = ## If all futures have finished, return corresponding values, ## otherwise return failure ## @@ -154,7 +172,7 @@ proc deleteBlocks( self: TorrentDownloader, piece: TorrentPiece ): Future[void] {.async: (raises: [CancelledError]).} = let treeCid = self.codexManifest.treeCid - let blockIter = piece.getBlockIterator() + let blockIter = piece.getNewBlockIterator() while not blockIter.finished: # deleting a block that is not in localStore is harmless # blocks that are in localStore and in use will not be deleted @@ -172,11 +190,11 @@ proc getSuccessfulBlocks(futs: seq[Future[?!Block]]): ?!seq[Block] {.raises: []. return failure("Some blocks failed to fetch") success blockResults.mapIt(it.get) -proc fetchPiece*( +proc fetchPiece( self: TorrentDownloader, piece: TorrentPiece ): Future[?!void] {.async: (raises: [CancelledError]).} = let treeCid = self.codexManifest.treeCid - let blockIter = piece.getBlockIterator() + let blockIter = piece.getNewBlockIterator() var blockFutures = newSeq[Future[?!Block]]() for blockIndex in blockIter: let address = BlockAddress.init(treeCid, blockIndex) diff --git a/tests/codex/bittorrent/testtorrentdownloader.nim b/tests/codex/bittorrent/testtorrentdownloader.nim index 379a3798..47bd0669 100644 --- a/tests/codex/bittorrent/testtorrentdownloader.nim +++ b/tests/codex/bittorrent/testtorrentdownloader.nim @@ -1,3 +1,5 @@ +import std/importutils # private access + import pkg/libp2p/[cid, multicodec, multihash] import pkg/questionable/results @@ -11,11 +13,20 @@ import pkg/codex/manifest import pkg/codex/bittorrent/manifest import pkg/codex/bittorrent/torrentdownloader +import pkg/codex/utils/iter +import pkg/codex/logutils + import ../../asynctest import ./helpers import ../helpers import ../examples +logScope: + topics = "testtorrentdownloader" + +privateAccess(TorrentPiece) +privateAccess(TorrentDownloader) + template setupDependencies() {.dirty.} = var rng: Rng @@ -55,6 +66,14 @@ template setupDependencies() {.dirty.} = asyncchecksuite "Torrent Downloader": setupDependencies() + const + pieceLength = 64.KiBs.int + blockSize = BitTorrentBlockSize.NBytes + + # this is an invariant that pieceLength is always power of two + # and multiple of blockSize + let numOfBlocksPerPiece = pieceLength div blockSize.int + var codexManifest: Manifest torrentInfo: BitTorrentInfo @@ -67,17 +86,15 @@ asyncchecksuite "Torrent Downloader": torrentDownloader: TorrentDownloader proc createTestData(datasetSize: int) {.async.} = - echo "datasetSize: ", datasetSize - blocks = await makeRandomBlocks( - datasetSize = datasetSize, blockSize = BitTorrentBlockSize.NBytes - ) - for blk in blocks: - echo "block: ", blk.data.len + trace "requested dataset", datasetSize + blocks = await makeRandomBlocks(datasetSize = datasetSize, blockSize = blockSize) + for index, blk in blocks: + trace "block created ", index, len = blk.data.len codexManifest = await storeDataGetManifest(localStore, blocks) codexManifestBlock = (await storeCodexManifest(codexManifest, localStore)).tryGet() torrentInfo = ( await torrentInfoForCodexManifest( - localStore, codexManifest, pieceLength = 64.KiBs.int, name = "data.bin".some + localStore, codexManifest, pieceLength = pieceLength, name = "data.bin".some ) ).tryGet() torrentManifest = newBitTorrentManifest( @@ -86,15 +103,100 @@ asyncchecksuite "Torrent Downloader": torrentManifestBlock = (await storeTorrentManifest(torrentManifest, localStore)).tryGet() - setup: - await createTestData(datasetSize = 40.KiBs.int) + # setup: + # await createTestData(datasetSize = 40.KiBs.int) + + # torrentDownloader = + # newTorrentDownloader(torrentManifest, codexManifest, networkStore).tryGet() + + test "correctly sets up the pieces": + await createTestData(datasetSize = 72.KiBs.int) + + assert torrentInfo.pieces.len == 2 + assert codexManifest.blocksCount == 5 + + let blocksCount = codexManifest.blocksCount + let numOfPieces = torrentInfo.pieces.len + # last piece can have less blocks than numOfBlocksPerPiece + # we know how many blocks we have: + let numOfBlocksInLastPiece = blocksCount - (numOfBlocksPerPiece * (numOfPieces - 1)) torrentDownloader = newTorrentDownloader(torrentManifest, codexManifest, networkStore).tryGet() - test "correctly sets up the download queue": echo "codeManifest: ", $codexManifest echo "torrentInfo: ", $torrentInfo echo "torrentManifest: ", $torrentManifest echo "codexManifestBlockCid: ", $(codexManifestBlock.cid) echo "torrentManifestBlockCid: ", $(torrentManifestBlock.cid) + + check torrentDownloader.pieces.len == torrentInfo.pieces.len + check torrentDownloader.numberOfBlocksPerPiece == numOfBlocksPerPiece + + for index, piece in torrentDownloader.pieces: + assert index < numOfPieces + let + expectedBlockIndexStart = index * numOfBlocksPerPiece + expectedBlockIndexEnd = + if index < numOfPieces - 1: + (index + 1) * numOfBlocksPerPiece - 1 + else: + index * numOfBlocksPerPiece + numOfBlocksInLastPiece - 1 + expectedNumOfBlocksInPiece = expectedBlockIndexEnd - expectedBlockIndexStart + 1 + check piece.pieceIndex == index + check piece.pieceHash == torrentInfo.pieces[index] + check piece.blockIndexStart == expectedBlockIndexStart + check piece.blockIndexEnd == expectedBlockIndexEnd + check torrentDownloader.numberOfBlocksInPiece(index).tryGet == + expectedNumOfBlocksInPiece + let blockIterator = torrentDownloader.getNewBlocksInPieceIterator(index).tryGet + for blkIndex in expectedBlockIndexStart .. expectedBlockIndexEnd: + check blkIndex == blockIterator.next() + check blockIterator.finished == true + check piece.handle.finished == false + + proc validatePiece(torrentDownloader: TorrentDownloader, pieceIndex: int) {.async.} = + let treeCid = codexManifest.treeCid + var pieceHashCtx: sha1 + pieceHashCtx.init() + let blockIter = torrentDownloader.getNewBlocksInPieceIterator(pieceIndex).tryGet + let blks = newSeq[Block]() + while not blockIter.finished: + let blockIndex = blockIter.next() + let address = BlockAddress.init(treeCid, blockIndex) + let blk = (await localStore.getBlock(address)).tryGet() + trace "got block from local store", treeCid, blockIndex, cid = blk.cid + pieceHashCtx.update(blk.data) + let computedPieceHash = pieceHashCtx.finish() + let expectedPieceHash = torrentDownloader.pieces[pieceIndex].pieceHash + trace "comparing piece hashes", expectedPieceHash, computedPieceHash + check expectedPieceHash == computedPieceHash + trace "piece validated", treeCid, pieceIndex + + test "downloading pieces": + await createTestData(datasetSize = 72.KiBs.int) + + assert torrentInfo.pieces.len == 2 + assert codexManifest.blocksCount == 5 + + torrentDownloader = + newTorrentDownloader(torrentManifest, codexManifest, networkStore).tryGet() + + # start background task + let downloadFut = torrentDownloader.downloadPieces() + + let pieceIter = torrentDownloader.getNewPieceIterator() + + while not pieceIter.finished: + let expectedPieceIndex = pieceIter.next() + trace "waiting for piece", expectedPieceIndex + let waitFut = torrentDownloader.waitForNextPiece() + let status = await waitFut.withTimeout(1.seconds) + assert status == true + let pieceIndex = await waitFut + trace "got piece", pieceIndex + check pieceIndex == expectedPieceIndex + await torrentDownloader.validatePiece(pieceIndex) + + check (await torrentDownloader.waitForNextPiece()) == -1 + check torrentDownloader.queue.empty diff --git a/tests/codex/testbittorrent.nim b/tests/codex/testbittorrent.nim index 0d735bb6..481490f2 100644 --- a/tests/codex/testbittorrent.nim +++ b/tests/codex/testbittorrent.nim @@ -1,5 +1,6 @@ import ./bittorrent/testbencoding import ./bittorrent/testmanifest import ./bittorrent/testpiecevalidator +import ./bittorrent/testtorrentdownloader {.warning[UnusedImport]: off.}