adds more tests and ability to retrieve the downloaded blocks

This commit is contained in:
Marcin Czenko 2025-03-26 18:19:18 +01:00
parent c859250c4a
commit 2aa91e5f93
No known key found for this signature in database
GPG Key ID: 33DEA0C8E30937C0
2 changed files with 161 additions and 48 deletions

View File

@ -1,14 +1,16 @@
{.push raises: [].}
# import std/asyncstreams
import std/sequtils
import std/sugar
import pkg/chronos
import pkg/libp2p/multihash
import pkg/questionable/results
import ../rng
# import ../rng
import ../logutils
import ../utils/iter
import ../utils/trackedfutures
import ../errors
import ../manifest
import ../blocktype
@ -35,7 +37,10 @@ type
numberOfBlocksPerPiece: int
pieces: seq[TorrentPiece]
waitIter: Iter[int]
blockIter: Iter[int]
pieceIndex: int
queue: AsyncQueue[TorrentPiece]
trackedFutures: TrackedFutures
proc newTorrentPiece*(
pieceIndex: int, pieceHash: MultiHash, blockIndexStart: int, blockIndexEnd: int
@ -99,7 +104,10 @@ proc newTorrentDownloader*(
numberOfBlocksPerPiece: numOfBlocksPerPiece,
pieces: pieces,
waitIter: Iter[int].new(0 ..< numOfPieces),
blockIter: Iter[int].empty(),
pieceIndex: 0,
queue: queue,
trackedFutures: TrackedFutures(),
).success
proc getNewBlockIterator(piece: TorrentPiece): Iter[int] =
@ -215,10 +223,11 @@ proc fetchPiece(
success()
proc downloadPieces*(self: TorrentDownloader): Future[?!void] {.async: (raises: []).} =
proc downloadPieces*(self: TorrentDownloader): Future[void] {.async: (raises: []).} =
try:
while not self.queue.empty:
let piece = self.queue.popFirstNoWait()
trace "Downloading piece", pieceIndex = piece.pieceIndex
if err =? (await self.fetchPiece(piece)).errorOption:
error "Could not fetch piece", err = err.msg
# add the piece to the end of the queue
@ -228,19 +237,85 @@ proc downloadPieces*(self: TorrentDownloader): Future[?!void] {.async: (raises:
else:
# piece fetched and validated successfully
# mark it as ready
trace "Piece fetched and validated", pieceIndex = piece.pieceIndex
piece.handle.complete()
await sleepAsync(1.millis)
except CancelledError:
trace "Downloading pieces cancelled"
except AsyncQueueFullError as e:
error "Queue is full", error = e.msg
return failure e
except AsyncQueueEmptyError as e:
error "Trying to pop from empty queue", error = e.msg
return failure e
finally:
await noCancel self.cancel()
success()
# proc downloadPieces*(self: TorrentDownloader): Future[?!void] {.async: (raises: []).} =
# try:
# while not self.queue.empty:
# let piece = self.queue.popFirstNoWait()
# if err =? (await self.fetchPiece(piece)).errorOption:
# error "Could not fetch piece", err = err.msg
# # add the piece to the end of the queue
# # to try to fetch the piece again
# self.queue.addLastNoWait(piece)
# continue
# else:
# # piece fetched and validated successfully
# # mark it as ready
# piece.handle.complete()
# await sleepAsync(1.millis)
# except CancelledError:
# trace "Downloading pieces cancelled"
# except AsyncQueueFullError as e:
# error "Queue is full", error = e.msg
# return failure e
# except AsyncQueueEmptyError as e:
# error "Trying to pop from empty queue", error = e.msg
# return failure e
# finally:
# await noCancel self.cancel()
# success()
proc getNext*(
self: TorrentDownloader
): Future[?!(int, seq[byte])] {.async: (raises: []).} =
try:
if self.pieceIndex == -1:
return success((-1, newSeq[byte]()))
if self.blockIter.finished:
trace "Waiting for piece", pieceIndex = self.pieceIndex
self.pieceIndex = await self.waitForNextPiece()
trace "Got piece", pieceIndex = self.pieceIndex
if self.pieceIndex == -1:
return success((-1, newSeq[byte]()))
else:
let piece = self.pieces[self.pieceIndex]
self.blockIter = piece.getNewBlockIterator()
let blockIndex = self.blockIter.next()
if blockIndex == self.codexManifest.blocksCount - 1:
self.pieceIndex = -1
let address = BlockAddress.init(self.codexManifest.treeCid, blockIndex)
without blk =? (await self.networkStore.localStore.getBlock(address)), err:
error "Could not get block from local store", error = err.msg
return failure("Could not get block from local store: " & err.msg)
success((blockIndex, blk.data))
except CancelledError:
trace "Getting next block from downloader cancelled"
return success((-1, newSeq[byte]()))
except CatchableError as e:
warn "Could not get block from local store", error = e.msg
return failure("Could not get block from local store: " & e.msg)
proc finished*(self: TorrentDownloader): bool =
self.pieceIndex == -1
proc start*(self: TorrentDownloader) =
self.trackedFutures.track(self.downloadPieces())
proc stop*(self: TorrentDownloader) {.async.} =
self.pieceIndex = -1
await noCancel self.cancel()
await noCancel self.trackedFutures.cancelTracked()
#################################################################
# Previous API, keeping it for now, probably will not be needed

View File

@ -103,32 +103,45 @@ asyncchecksuite "Torrent Downloader":
torrentManifestBlock =
(await storeTorrentManifest(torrentManifest, localStore)).tryGet()
# setup:
# await createTestData(datasetSize = 40.KiBs.int)
proc validatePiece(torrentDownloader: TorrentDownloader, pieceIndex: int) {.async.} =
let treeCid = codexManifest.treeCid
var pieceHashCtx: sha1
pieceHashCtx.init()
let blockIter = torrentDownloader.getNewBlocksInPieceIterator(pieceIndex).tryGet
let blks = newSeq[Block]()
while not blockIter.finished:
let blockIndex = blockIter.next()
let address = BlockAddress.init(treeCid, blockIndex)
let blk = (await localStore.getBlock(address)).tryGet()
trace "got block from local store", treeCid, blockIndex, cid = blk.cid
pieceHashCtx.update(blk.data)
let computedPieceHash = pieceHashCtx.finish()
let expectedPieceHash = torrentDownloader.pieces[pieceIndex].pieceHash
trace "comparing piece hashes", expectedPieceHash, computedPieceHash
check expectedPieceHash == computedPieceHash
trace "piece validated", treeCid, pieceIndex
# torrentDownloader =
# newTorrentDownloader(torrentManifest, codexManifest, networkStore).tryGet()
test "correctly sets up the pieces":
setup:
await createTestData(datasetSize = 72.KiBs.int)
torrentDownloader =
newTorrentDownloader(torrentManifest, codexManifest, networkStore).tryGet()
assert torrentInfo.pieces.len == 2
assert codexManifest.blocksCount == 5
test "correctly sets up the pieces":
let blocksCount = codexManifest.blocksCount
let numOfPieces = torrentInfo.pieces.len
# last piece can have less blocks than numOfBlocksPerPiece
# we know how many blocks we have:
let numOfBlocksInLastPiece = blocksCount - (numOfBlocksPerPiece * (numOfPieces - 1))
torrentDownloader =
newTorrentDownloader(torrentManifest, codexManifest, networkStore).tryGet()
echo "codeManifest: ", $codexManifest
echo "torrentInfo: ", $torrentInfo
echo "torrentManifest: ", $torrentManifest
echo "codexManifestBlockCid: ", $(codexManifestBlock.cid)
echo "torrentManifestBlockCid: ", $(torrentManifestBlock.cid)
# echo "codeManifest: ", $codexManifest
# echo "torrentInfo: ", $torrentInfo
# echo "torrentManifest: ", $torrentManifest
# echo "codexManifestBlockCid: ", $(codexManifestBlock.cid)
# echo "torrentManifestBlockCid: ", $(torrentManifestBlock.cid)
check torrentDownloader.pieces.len == torrentInfo.pieces.len
check torrentDownloader.numberOfBlocksPerPiece == numOfBlocksPerPiece
@ -155,35 +168,8 @@ asyncchecksuite "Torrent Downloader":
check blockIterator.finished == true
check piece.handle.finished == false
proc validatePiece(torrentDownloader: TorrentDownloader, pieceIndex: int) {.async.} =
let treeCid = codexManifest.treeCid
var pieceHashCtx: sha1
pieceHashCtx.init()
let blockIter = torrentDownloader.getNewBlocksInPieceIterator(pieceIndex).tryGet
let blks = newSeq[Block]()
while not blockIter.finished:
let blockIndex = blockIter.next()
let address = BlockAddress.init(treeCid, blockIndex)
let blk = (await localStore.getBlock(address)).tryGet()
trace "got block from local store", treeCid, blockIndex, cid = blk.cid
pieceHashCtx.update(blk.data)
let computedPieceHash = pieceHashCtx.finish()
let expectedPieceHash = torrentDownloader.pieces[pieceIndex].pieceHash
trace "comparing piece hashes", expectedPieceHash, computedPieceHash
check expectedPieceHash == computedPieceHash
trace "piece validated", treeCid, pieceIndex
test "downloading pieces":
await createTestData(datasetSize = 72.KiBs.int)
assert torrentInfo.pieces.len == 2
assert codexManifest.blocksCount == 5
torrentDownloader =
newTorrentDownloader(torrentManifest, codexManifest, networkStore).tryGet()
# start background task
let downloadFut = torrentDownloader.downloadPieces()
test "pieces are validated":
torrentDownloader.start()
let pieceIter = torrentDownloader.getNewPieceIterator()
@ -200,3 +186,55 @@ asyncchecksuite "Torrent Downloader":
check (await torrentDownloader.waitForNextPiece()) == -1
check torrentDownloader.queue.empty
test "get downloaded blocks":
torrentDownloader.start()
let blockIter = Iter.new(0 ..< codexManifest.blocksCount)
while not torrentDownloader.finished:
let dataFut = torrentDownloader.getNext()
let status = await dataFut.withTimeout(1.seconds)
assert status == true
let (blockIndex, data) = (await dataFut).tryGet()
trace "got data", blockIndex, len = data.len
let expectedBlockIndex = blockIter.next()
check blockIndex == expectedBlockIndex
let treeCid = codexManifest.treeCid
let address = BlockAddress.init(treeCid, expectedBlockIndex)
let blk = (await localStore.getBlock(address)).tryGet()
check blk.data == data
check blockIter.finished
await torrentDownloader.stop()
test "canceling download":
torrentDownloader.start()
let blockIter = Iter.new(0 ..< codexManifest.blocksCount)
var (blockIndex, data) = (await torrentDownloader.getNext()).tryGet()
check blockIndex == 0
check data.len > 0
await torrentDownloader.stop()
(blockIndex, data) = (await torrentDownloader.getNext()).tryGet()
check blockIndex == -1
check data.len == 0
test "stoping before starting (simulate cancellation)":
let blockIter = Iter.new(0 ..< codexManifest.blocksCount)
# download did not even start, thus this one will not complete
let dataFut = torrentDownloader.getNext()
# calling stop will cancel awaiting for the next block
await torrentDownloader.stop()
assert dataFut.finished
let (blockIndex, data) = dataFut.read.tryGet()
check blockIndex == -1
check data.len == 0