updates torrent streaming to take advantage of the new interface of the piece validator

This commit is contained in:
Marcin Czenko 2025-03-20 03:14:33 +01:00
parent 1d47cf4870
commit 927e67e0c3
No known key found for this signature in database
GPG Key ID: 33DEA0C8E30937C0

View File

@ -399,14 +399,36 @@ proc retrieve*(
await self.streamEntireDataset(manifest, cid)
proc validatePiece(
self: CodexNodeRef, pieceValidator: TorrentPieceValidator, blocks: seq[bt.Block]
): ?!void {.raises: [].} =
trace "Fetched complete torrent piece - verifying..."
let pieceIndex = pieceValidator.validatePiece(blocks)
if pieceIndex < 0:
error "Piece verification failed", pieceIndex = pieceIndex
return failure(fmt"Piece verification failed for {pieceIndex=}")
trace "Piece successfully verified", pieceIndex
let confirmedPieceIndex = pieceValidator.confirmCurrentPiece()
if pieceIndex != confirmedPieceIndex:
error "Piece confirmation failed",
pieceIndex = pieceIndex, confirmedPieceIndex = confirmedPieceIndex
return
failure(fmt"Piece confirmation failed for {pieceIndex=}, {confirmedPieceIndex=}")
success()
proc fetchPieces*(
self: CodexNodeRef,
cid: Cid,
blockIter: Iter[int],
pieceIter: Iter[int],
numOfBlocksPerPiece: int,
onPiece: PieceProc,
torrentManifest: BitTorrentManifest,
codexManifest: Manifest,
pieceValidator: TorrentPieceValidator,
): Future[?!void] {.async: (raises: [CancelledError]).} =
let cid = codexManifest.treeCid
let numOfBlocksPerPiece = pieceValidator.numberOfBlocksPerPiece
let blockIter = Iter[int].new(0 ..< codexManifest.blocksCount)
while not blockIter.finished:
let blockFutures = collect:
for i in 0 ..< numOfBlocksPerPiece:
@ -417,30 +439,13 @@ proc fetchPieces*(
without blocks =? await allFinishedValues(blockFutures), err:
return failure(err)
if pieceErr =? (onPiece(blocks, pieceIter.next())).errorOption:
return failure(pieceErr)
if err =? self.validatePiece(pieceValidator, blocks).errorOption:
return failure(err)
await sleepAsync(1.millis)
success()
proc fetchPieces*(
self: CodexNodeRef,
torrentManifest: BitTorrentManifest,
codexManifest: Manifest,
onPiece: PieceProc,
): Future[?!void] {.async: (raw: true, raises: [CancelledError]).} =
trace "Fetching torrent pieces"
let numOfPieces = torrentManifest.info.pieces.len
let numOfBlocksPerPiece =
torrentManifest.info.pieceLength.int div codexManifest.blockSize.int
let blockIter = Iter[int].new(0 ..< codexManifest.blocksCount)
let pieceIter = Iter[int].new(0 ..< numOfPieces)
self.fetchPieces(
codexManifest.treeCid, blockIter, pieceIter, numOfBlocksPerPiece, onPiece
)
proc streamTorrent*(
self: CodexNodeRef,
torrentManifest: BitTorrentManifest,
@ -449,33 +454,17 @@ proc streamTorrent*(
): Future[LPStream] {.async: (raises: []).} =
trace "Retrieving pieces from torrent"
let stream = LPStream(StoreStream.new(self.networkStore, codexManifest, pad = false))
var jobs: seq[Future[void]]
proc onPieceReceived(blocks: seq[bt.Block], pieceIndex: int): ?!void {.raises: [].} =
trace "Fetched torrent piece - verifying..."
if err =? pieceValidator.validatePiece(blocks, pieceIndex).errorOption:
error "Piece verification failed", pieceIndex = pieceIndex, err = err.msg
return failure(err)
if err =? pieceValidator.markPieceAsValid(pieceIndex).errorOption:
error "Unable to mark piece as valid", pieceIndex = pieceIndex
return failure("Unable to mark piece as valid")
trace "Piece verified", pieceIndex
success()
proc prefetch(): Future[void] {.async: (raises: []).} =
try:
if err =? (
await self.fetchPieces(torrentManifest, codexManifest, onPieceReceived)
).errorOption:
if err =? (await self.fetchPieces(torrentManifest, codexManifest, pieceValidator)).errorOption:
error "Unable to fetch blocks", err = err.msg
await noCancel pieceValidator.cancel()
await noCancel stream.close()
except CancelledError:
trace "Prefetch cancelled"
jobs.add(prefetch())
let prefetchTask = prefetch()
# Monitor stream completion and cancel background jobs when done
proc monitorStream() {.async: (raises: []).} =
@ -484,7 +473,7 @@ proc streamTorrent*(
except CancelledError:
trace "Stream cancelled"
finally:
await noCancel allFutures(jobs.mapIt(it.cancelAndWait))
await noCancel prefetchTask.cancelAndWait
self.trackedFutures.track(monitorStream())