updates pieceValidator with better internal state management

This commit is contained in:
Marcin Czenko 2025-03-20 02:48:04 +01:00
parent 45ab5eee92
commit 9903473809
No known key found for this signature in database
GPG Key ID: 33DEA0C8E30937C0
2 changed files with 24 additions and 21 deletions

View File

@ -53,24 +53,26 @@ proc getNewBlocksPerPieceIterator*(self: TorrentPieceValidator): Iter[int] =
proc waitForNextPiece*(
self: TorrentPieceValidator
): Future[bool] {.async: (raises: [CancelledError]).} =
): Future[int] {.async: (raises: [CancelledError]).} =
if self.waitIter.finished:
return false
await self.pieces[self.waitIter.next()]
true
return -1
let pieceIndex = self.waitIter.next()
await self.pieces[pieceIndex]
pieceIndex
proc confirmCurrentPiece*(self: TorrentPieceValidator): bool {.raises: [].} =
proc confirmCurrentPiece*(self: TorrentPieceValidator): int {.raises: [].} =
if self.confirmIter.finished:
return false
self.pieces[self.confirmIter.next()].complete()
true
return -1
let pieceIndex = self.confirmIter.next()
self.pieces[pieceIndex].complete()
pieceIndex
proc cancel*(self: TorrentPieceValidator): Future[void] {.async: (raises: []).} =
await noCancel allFutures(self.pieces.mapIt(it.cancelAndWait))
proc validatePiece*(
self: TorrentPieceValidator, blocks: seq[Block]
): bool {.raises: [].} =
): int {.raises: [].} =
var pieceHashCtx: sha1
pieceHashCtx.init()
@ -79,10 +81,11 @@ proc validatePiece*(
let computedPieceHash = pieceHashCtx.finish()
if (computedPieceHash != self.torrentManifest.info.pieces[self.validationIter.next()]):
return false
let pieceIndex = self.validationIter.next()
if (computedPieceHash != self.torrentManifest.info.pieces[pieceIndex]):
return -1
true
pieceIndex
#################################################################
# Previous API, keeping it for now, probably will not be needed

View File

@ -85,18 +85,18 @@ suite "Torrent PieceValidator":
let numberOfPieces = exampleTorrentManifest.info.pieces.len
for i in 0 ..< numberOfPieces:
let fut = pieceValidator.waitForNextPiece()
check pieceValidator.confirmCurrentPiece()
check await fut
check pieceValidator.confirmCurrentPiece() == i
check (await fut) == i
test "awaiting is independent from confirming":
let numberOfPieces = exampleTorrentManifest.info.pieces.len
var futs = newSeq[Future[bool]](numberOfPieces)
var futs = newSeq[Future[int]](numberOfPieces)
for i in 0 ..< numberOfPieces:
futs[i] = pieceValidator.waitForNextPiece()
for i in 0 ..< numberOfPieces:
check pieceValidator.confirmCurrentPiece()
check pieceValidator.confirmCurrentPiece() == i
for i in 0 ..< numberOfPieces:
check await futs[i]
check (await futs[i]) == i
test "sequential validation of blocks":
let blocksInPieces = newSeqWith(
@ -133,19 +133,19 @@ suite "Torrent PieceValidator":
pieceValidator = newTorrentPieceValidator(torrentManifest, codexManifest)
for blks in blocksInPieces:
for pieceIndex, blks in blocksInPieces:
# streaming client will wait on the piece validator to validate the piece
let fut = pieceValidator.waitForNextPiece()
# during prefetch we will validate each piece sequentially
# piece validator maintains internal iterators in its object
# to keep track of the validation order
check pieceValidator.validatePiece(blks)
check pieceValidator.validatePiece(blks) == pieceIndex
# after piece is validated, the prefetch task will confirm the piece
# again, using internal state, the validator knows which piece to confirm
check pieceValidator.confirmCurrentPiece()
check pieceValidator.confirmCurrentPiece() == pieceIndex
# the fut will be resolved after the piece is confirmed
# and the streaming client can continue
check await fut
check (await fut) == pieceIndex