adds some good weather tests for torrentdownloader

This commit is contained in:
Marcin Czenko 2025-03-26 03:08:51 +01:00
parent ee2f403e52
commit c859250c4a
No known key found for this signature in database
GPG Key ID: 33DEA0C8E30937C0
3 changed files with 152 additions and 31 deletions

View File

@ -16,7 +16,7 @@ import ../stores/networkstore
import ./manifest
logScope:
topics = "codex piecedownloader"
topics = "codex node torrentdownloader"
type
PieceHandle* = Future[void].Raising([CancelledError])
@ -26,7 +26,6 @@ type
blockIndexStart: int
blockIndexEnd: int
handle: PieceHandle
randomIndex: int
TorrentDownloader* = ref object
torrentManifest: BitTorrentManifest
@ -49,25 +48,30 @@ proc newTorrentPiece*(
handle: cast[PieceHandle](newFuture[void]("PieceValidator.newTorrentPiece")),
)
proc randomize(self: TorrentPiece, numberOfPieces: int): void =
self.randomIndex = Rng.instance.rand(max = numberOfPieces * 10)
proc newTorrentDownloader*(
torrentManifest: BitTorrentManifest,
codexManifest: Manifest,
networkStore: NetworkStore,
): ?!TorrentDownloader =
let numOfPieces = torrentManifest.info.pieces.len
let numOfBlocksPerPiece =
torrentManifest.info.pieceLength.int div codexManifest.blockSize.int
let
blocksCount = codexManifest.blocksCount
numOfPieces = torrentManifest.info.pieces.len
numOfBlocksPerPiece =
torrentManifest.info.pieceLength.int div codexManifest.blockSize.int
numOfBlocksInLastPiece = blocksCount - (numOfBlocksPerPiece * (numOfPieces - 1))
let pieces = collect:
for i in 0 ..< numOfPieces:
var blockIndexEnd = ((i + 1) * numOfBlocksPerPiece) - 1
if i == numOfPieces - 1:
# last piece can have less blocks than numOfBlocksPerPiece
blockIndexEnd = i * numOfBlocksPerPiece + numOfBlocksInLastPiece - 1
let piece = newTorrentPiece(
pieceIndex = i,
pieceHash = torrentManifest.info.pieces[i],
blockIndexStart = i * numOfBlocksPerPiece,
blockIndexEnd = ((i + 1) * numOfBlocksPerPiece) - 1,
blockIndexEnd = blockIndexEnd,
)
piece
@ -98,17 +102,31 @@ proc newTorrentDownloader*(
queue: queue,
).success
func numberOfBlocksPerPiece*(self: TorrentDownloader): int =
self.numberOfBlocksPerPiece
proc getNewBlockIterator(piece: TorrentPiece): Iter[int] =
Iter[int].new(piece.blockIndexStart .. piece.blockIndexEnd)
func numberOfBlocks(piece: TorrentPiece): int =
piece.blockIndexEnd - piece.blockIndexStart + 1
func numberOfBlocksInPiece*(self: TorrentDownloader, pieceIndex: int): ?!int =
if pieceIndex < 0 or pieceIndex >= self.numberOfPieces:
return failure("Invalid piece index")
let piece = self.pieces[pieceIndex]
success(piece.numberOfBlocks)
proc getNewBlocksInPieceIterator*(
self: TorrentDownloader, pieceIndex: int
): ?!Iter[int] =
if pieceIndex < 0 or pieceIndex >= self.numberOfPieces:
return failure("Invalid piece index")
let piece = self.pieces[pieceIndex]
success(piece.getNewBlockIterator())
proc getNewPieceIterator*(self: TorrentDownloader): Iter[int] =
Iter[int].new(0 ..< self.numberOfPieces)
proc getNewBlocksPerPieceIterator*(self: TorrentDownloader): Iter[int] =
Iter[int].new(0 ..< self.numberOfBlocksPerPiece)
proc getBlockIterator(self: TorrentPiece): Iter[int] =
Iter[int].new(self.blockIndexStart .. self.blockIndexEnd)
# proc getNewBlocksPerPieceIterator*(self: TorrentDownloader): Iter[int] =
# Iter[int].new(0 ..< self.numberOfBlocksPerPiece)
proc waitForNextPiece*(
self: TorrentDownloader
@ -122,7 +140,7 @@ proc waitForNextPiece*(
proc cancel*(self: TorrentDownloader): Future[void] {.async: (raises: []).} =
await noCancel allFutures(self.pieces.mapIt(it.handle.cancelAndWait))
proc validate*(piece: TorrentPiece, blocks: seq[Block]): ?!void {.raises: [].} =
proc validate(piece: TorrentPiece, blocks: seq[Block]): ?!void {.raises: [].} =
var pieceHashCtx: sha1
pieceHashCtx.init()
@ -136,7 +154,7 @@ proc validate*(piece: TorrentPiece, blocks: seq[Block]): ?!void {.raises: [].} =
success()
proc allBlocksFinished*(futs: seq[Future[?!Block]]): seq[?!Block] {.raises: [].} =
proc allBlocksFinished(futs: seq[Future[?!Block]]): seq[?!Block] {.raises: [].} =
## If all futures have finished, return corresponding values,
## otherwise return failure
##
@ -154,7 +172,7 @@ proc deleteBlocks(
self: TorrentDownloader, piece: TorrentPiece
): Future[void] {.async: (raises: [CancelledError]).} =
let treeCid = self.codexManifest.treeCid
let blockIter = piece.getBlockIterator()
let blockIter = piece.getNewBlockIterator()
while not blockIter.finished:
# deleting a block that is not in localStore is harmless
# blocks that are in localStore and in use will not be deleted
@ -172,11 +190,11 @@ proc getSuccessfulBlocks(futs: seq[Future[?!Block]]): ?!seq[Block] {.raises: [].
return failure("Some blocks failed to fetch")
success blockResults.mapIt(it.get)
proc fetchPiece*(
proc fetchPiece(
self: TorrentDownloader, piece: TorrentPiece
): Future[?!void] {.async: (raises: [CancelledError]).} =
let treeCid = self.codexManifest.treeCid
let blockIter = piece.getBlockIterator()
let blockIter = piece.getNewBlockIterator()
var blockFutures = newSeq[Future[?!Block]]()
for blockIndex in blockIter:
let address = BlockAddress.init(treeCid, blockIndex)

View File

@ -1,3 +1,5 @@
import std/importutils # private access
import pkg/libp2p/[cid, multicodec, multihash]
import pkg/questionable/results
@ -11,11 +13,20 @@ import pkg/codex/manifest
import pkg/codex/bittorrent/manifest
import pkg/codex/bittorrent/torrentdownloader
import pkg/codex/utils/iter
import pkg/codex/logutils
import ../../asynctest
import ./helpers
import ../helpers
import ../examples
logScope:
topics = "testtorrentdownloader"
privateAccess(TorrentPiece)
privateAccess(TorrentDownloader)
template setupDependencies() {.dirty.} =
var
rng: Rng
@ -55,6 +66,14 @@ template setupDependencies() {.dirty.} =
asyncchecksuite "Torrent Downloader":
setupDependencies()
const
pieceLength = 64.KiBs.int
blockSize = BitTorrentBlockSize.NBytes
# this is an invariant that pieceLength is always power of two
# and multiple of blockSize
let numOfBlocksPerPiece = pieceLength div blockSize.int
var
codexManifest: Manifest
torrentInfo: BitTorrentInfo
@ -67,17 +86,15 @@ asyncchecksuite "Torrent Downloader":
torrentDownloader: TorrentDownloader
proc createTestData(datasetSize: int) {.async.} =
echo "datasetSize: ", datasetSize
blocks = await makeRandomBlocks(
datasetSize = datasetSize, blockSize = BitTorrentBlockSize.NBytes
)
for blk in blocks:
echo "block: ", blk.data.len
trace "requested dataset", datasetSize
blocks = await makeRandomBlocks(datasetSize = datasetSize, blockSize = blockSize)
for index, blk in blocks:
trace "block created ", index, len = blk.data.len
codexManifest = await storeDataGetManifest(localStore, blocks)
codexManifestBlock = (await storeCodexManifest(codexManifest, localStore)).tryGet()
torrentInfo = (
await torrentInfoForCodexManifest(
localStore, codexManifest, pieceLength = 64.KiBs.int, name = "data.bin".some
localStore, codexManifest, pieceLength = pieceLength, name = "data.bin".some
)
).tryGet()
torrentManifest = newBitTorrentManifest(
@ -86,15 +103,100 @@ asyncchecksuite "Torrent Downloader":
torrentManifestBlock =
(await storeTorrentManifest(torrentManifest, localStore)).tryGet()
setup:
await createTestData(datasetSize = 40.KiBs.int)
# setup:
# await createTestData(datasetSize = 40.KiBs.int)
# torrentDownloader =
# newTorrentDownloader(torrentManifest, codexManifest, networkStore).tryGet()
test "correctly sets up the pieces":
await createTestData(datasetSize = 72.KiBs.int)
assert torrentInfo.pieces.len == 2
assert codexManifest.blocksCount == 5
let blocksCount = codexManifest.blocksCount
let numOfPieces = torrentInfo.pieces.len
# last piece can have less blocks than numOfBlocksPerPiece
# we know how many blocks we have:
let numOfBlocksInLastPiece = blocksCount - (numOfBlocksPerPiece * (numOfPieces - 1))
torrentDownloader =
newTorrentDownloader(torrentManifest, codexManifest, networkStore).tryGet()
test "correctly sets up the download queue":
echo "codeManifest: ", $codexManifest
echo "torrentInfo: ", $torrentInfo
echo "torrentManifest: ", $torrentManifest
echo "codexManifestBlockCid: ", $(codexManifestBlock.cid)
echo "torrentManifestBlockCid: ", $(torrentManifestBlock.cid)
check torrentDownloader.pieces.len == torrentInfo.pieces.len
check torrentDownloader.numberOfBlocksPerPiece == numOfBlocksPerPiece
for index, piece in torrentDownloader.pieces:
assert index < numOfPieces
let
expectedBlockIndexStart = index * numOfBlocksPerPiece
expectedBlockIndexEnd =
if index < numOfPieces - 1:
(index + 1) * numOfBlocksPerPiece - 1
else:
index * numOfBlocksPerPiece + numOfBlocksInLastPiece - 1
expectedNumOfBlocksInPiece = expectedBlockIndexEnd - expectedBlockIndexStart + 1
check piece.pieceIndex == index
check piece.pieceHash == torrentInfo.pieces[index]
check piece.blockIndexStart == expectedBlockIndexStart
check piece.blockIndexEnd == expectedBlockIndexEnd
check torrentDownloader.numberOfBlocksInPiece(index).tryGet ==
expectedNumOfBlocksInPiece
let blockIterator = torrentDownloader.getNewBlocksInPieceIterator(index).tryGet
for blkIndex in expectedBlockIndexStart .. expectedBlockIndexEnd:
check blkIndex == blockIterator.next()
check blockIterator.finished == true
check piece.handle.finished == false
proc validatePiece(torrentDownloader: TorrentDownloader, pieceIndex: int) {.async.} =
let treeCid = codexManifest.treeCid
var pieceHashCtx: sha1
pieceHashCtx.init()
let blockIter = torrentDownloader.getNewBlocksInPieceIterator(pieceIndex).tryGet
let blks = newSeq[Block]()
while not blockIter.finished:
let blockIndex = blockIter.next()
let address = BlockAddress.init(treeCid, blockIndex)
let blk = (await localStore.getBlock(address)).tryGet()
trace "got block from local store", treeCid, blockIndex, cid = blk.cid
pieceHashCtx.update(blk.data)
let computedPieceHash = pieceHashCtx.finish()
let expectedPieceHash = torrentDownloader.pieces[pieceIndex].pieceHash
trace "comparing piece hashes", expectedPieceHash, computedPieceHash
check expectedPieceHash == computedPieceHash
trace "piece validated", treeCid, pieceIndex
test "downloading pieces":
await createTestData(datasetSize = 72.KiBs.int)
assert torrentInfo.pieces.len == 2
assert codexManifest.blocksCount == 5
torrentDownloader =
newTorrentDownloader(torrentManifest, codexManifest, networkStore).tryGet()
# start background task
let downloadFut = torrentDownloader.downloadPieces()
let pieceIter = torrentDownloader.getNewPieceIterator()
while not pieceIter.finished:
let expectedPieceIndex = pieceIter.next()
trace "waiting for piece", expectedPieceIndex
let waitFut = torrentDownloader.waitForNextPiece()
let status = await waitFut.withTimeout(1.seconds)
assert status == true
let pieceIndex = await waitFut
trace "got piece", pieceIndex
check pieceIndex == expectedPieceIndex
await torrentDownloader.validatePiece(pieceIndex)
check (await torrentDownloader.waitForNextPiece()) == -1
check torrentDownloader.queue.empty

View File

@ -1,5 +1,6 @@
import ./bittorrent/testbencoding
import ./bittorrent/testmanifest
import ./bittorrent/testpiecevalidator
import ./bittorrent/testtorrentdownloader
{.warning[UnusedImport]: off.}