From 9903473809cc6df5079d8bddcb2d72955204db58 Mon Sep 17 00:00:00 2001 From: Marcin Czenko Date: Thu, 20 Mar 2025 02:48:04 +0100 Subject: [PATCH] updates pieceValidator with better internal state management --- codex/bittorrent/piecevalidator.nim | 27 ++++++++++--------- tests/codex/bittorrent/testpiecevalidator.nim | 18 ++++++------- 2 files changed, 24 insertions(+), 21 deletions(-) diff --git a/codex/bittorrent/piecevalidator.nim b/codex/bittorrent/piecevalidator.nim index e982865f..f7533a5b 100644 --- a/codex/bittorrent/piecevalidator.nim +++ b/codex/bittorrent/piecevalidator.nim @@ -53,24 +53,26 @@ proc getNewBlocksPerPieceIterator*(self: TorrentPieceValidator): Iter[int] = proc waitForNextPiece*( self: TorrentPieceValidator -): Future[bool] {.async: (raises: [CancelledError]).} = +): Future[int] {.async: (raises: [CancelledError]).} = if self.waitIter.finished: - return false - await self.pieces[self.waitIter.next()] - true + return -1 + let pieceIndex = self.waitIter.next() + await self.pieces[pieceIndex] + pieceIndex -proc confirmCurrentPiece*(self: TorrentPieceValidator): bool {.raises: [].} = +proc confirmCurrentPiece*(self: TorrentPieceValidator): int {.raises: [].} = if self.confirmIter.finished: - return false - self.pieces[self.confirmIter.next()].complete() - true + return -1 + let pieceIndex = self.confirmIter.next() + self.pieces[pieceIndex].complete() + pieceIndex proc cancel*(self: TorrentPieceValidator): Future[void] {.async: (raises: []).} = await noCancel allFutures(self.pieces.mapIt(it.cancelAndWait)) proc validatePiece*( self: TorrentPieceValidator, blocks: seq[Block] -): bool {.raises: [].} = +): int {.raises: [].} = var pieceHashCtx: sha1 pieceHashCtx.init() @@ -79,10 +81,11 @@ proc validatePiece*( let computedPieceHash = pieceHashCtx.finish() - if (computedPieceHash != self.torrentManifest.info.pieces[self.validationIter.next()]): - return false + let pieceIndex = self.validationIter.next() + if (computedPieceHash != self.torrentManifest.info.pieces[pieceIndex]): + return -1 - true + pieceIndex ################################################################# # Previous API, keeping it for now, probably will not be needed diff --git a/tests/codex/bittorrent/testpiecevalidator.nim b/tests/codex/bittorrent/testpiecevalidator.nim index f28f1df5..dd8cce24 100644 --- a/tests/codex/bittorrent/testpiecevalidator.nim +++ b/tests/codex/bittorrent/testpiecevalidator.nim @@ -85,18 +85,18 @@ suite "Torrent PieceValidator": let numberOfPieces = exampleTorrentManifest.info.pieces.len for i in 0 ..< numberOfPieces: let fut = pieceValidator.waitForNextPiece() - check pieceValidator.confirmCurrentPiece() - check await fut + check pieceValidator.confirmCurrentPiece() == i + check (await fut) == i test "awaiting is independent from confirming": let numberOfPieces = exampleTorrentManifest.info.pieces.len - var futs = newSeq[Future[bool]](numberOfPieces) + var futs = newSeq[Future[int]](numberOfPieces) for i in 0 ..< numberOfPieces: futs[i] = pieceValidator.waitForNextPiece() for i in 0 ..< numberOfPieces: - check pieceValidator.confirmCurrentPiece() + check pieceValidator.confirmCurrentPiece() == i for i in 0 ..< numberOfPieces: - check await futs[i] + check (await futs[i]) == i test "sequential validation of blocks": let blocksInPieces = newSeqWith( @@ -133,19 +133,19 @@ suite "Torrent PieceValidator": pieceValidator = newTorrentPieceValidator(torrentManifest, codexManifest) - for blks in blocksInPieces: + for pieceIndex, blks in blocksInPieces: # streaming client will wait on the piece validator to validate the piece let fut = pieceValidator.waitForNextPiece() # during prefetch we will validate each piece sequentially # piece validator maintains internal iterators in its object # to keep track of the validation order - check pieceValidator.validatePiece(blks) + check pieceValidator.validatePiece(blks) == pieceIndex # after piece is validated, the prefetch task will confirm the piece # again, using internal state, the validator knows which piece to confirm - check pieceValidator.confirmCurrentPiece() + check pieceValidator.confirmCurrentPiece() == pieceIndex # the fut will be resolved after the piece is confirmed # and the streaming client can continue - check await fut + check (await fut) == pieceIndex