adds torrent piece validator abstraction to keep streaming in sync with piece validation

This commit is contained in:
Marcin Czenko 2025-03-20 02:12:02 +01:00
parent 0748a348e7
commit 45ab5eee92
No known key found for this signature in database
GPG Key ID: 33DEA0C8E30937C0
4 changed files with 334 additions and 35 deletions

View File

@ -0,0 +1,134 @@
{.push raises: [].}
import std/sequtils
import pkg/chronos
import pkg/libp2p/multihash
import pkg/questionable/results
import ../utils/iter
import ../manifest
import ../blocktype
import ./manifest
# Types used to keep torrent streaming in step with piece validation.
type
# Per-piece future; the only error it is declared to raise is CancelledError.
PieceHandle* = Future[void].Raising([CancelledError])
# Holds one PieceHandle per torrent piece plus three independent cursors
# (waiting, confirming, validating) over the piece indices.
TorrentPieceValidator* = ref object
# Torrent manifest whose `info.pieces` holds the expected piece hashes
torrentManifest: BitTorrentManifest
# Number of entries in `torrentManifest.info.pieces`
numberOfPieces: int
# Torrent piece length divided (integer division) by the Codex block size
numberOfBlocksPerPiece: int
# One handle per piece, completed when that piece is confirmed/marked valid
pieces: seq[PieceHandle]
# Index of the next piece to await (advanced by waitForNextPiece)
waitIter: Iter[int]
# Index of the next piece to confirm (advanced by confirmCurrentPiece)
confirmIter: Iter[int]
# Index of the next piece to validate (advanced by the sequential validatePiece)
validationIter: Iter[int]
proc newTorrentPieceValidator*(
    torrentManifest: BitTorrentManifest, codexManifest: Manifest
): TorrentPieceValidator =
  ## Builds a validator for the given torrent.
  ##
  ## The piece count is the length of `torrentManifest.info.pieces`; the
  ## number of blocks per piece is the torrent piece length divided (integer
  ## division) by the block size of `codexManifest`. A separate pending
  ## future is created for every piece, and the three internal cursors all
  ## start at piece 0.
  let
    pieceCount = torrentManifest.info.pieces.len
    blocksInPiece =
      torrentManifest.info.pieceLength.int div codexManifest.blockSize.int

  # Each piece gets its own, not yet completed, handle.
  var handles = newSeq[PieceHandle](pieceCount)
  for slot in 0 ..< pieceCount:
    handles[slot] =
      cast[PieceHandle](newFuture[void]("PieceValidator.newTorrentPieceValidator"))

  let
    waitCursor = Iter[int].new(0 ..< pieceCount)
    confirmCursor = Iter[int].new(0 ..< pieceCount)
    validationCursor = Iter[int].new(0 ..< pieceCount)

  TorrentPieceValidator(
    torrentManifest: torrentManifest,
    numberOfPieces: pieceCount,
    numberOfBlocksPerPiece: blocksInPiece,
    pieces: handles,
    waitIter: waitCursor,
    confirmIter: confirmCursor,
    validationIter: validationCursor,
  )
func numberOfBlocksPerPiece*(self: TorrentPieceValidator): int =
  ## Read-only accessor for the number of blocks that make up one piece,
  ## as computed when the validator was created.
  return self.numberOfBlocksPerPiece
proc getNewPieceIterator*(self: TorrentPieceValidator): Iter[int] =
  ## Returns a fresh iterator over the piece indices `0 ..< numberOfPieces`.
  ## It is independent of the validator's internal cursors.
  let pieceCount = self.numberOfPieces
  Iter[int].new(0 ..< pieceCount)
proc getNewBlocksPerPieceIterator*(self: TorrentPieceValidator): Iter[int] =
  ## Returns a fresh iterator over the block positions inside a single piece,
  ## i.e. `0 ..< numberOfBlocksPerPiece`.
  let blocksInPiece = self.numberOfBlocksPerPiece
  Iter[int].new(0 ..< blocksInPiece)
proc waitForNextPiece*(
    self: TorrentPieceValidator
): Future[bool] {.async: (raises: [CancelledError]).} =
  ## Waits until the next piece (in wait order) has been confirmed.
  ##
  ## Returns `false` immediately when every piece has already been waited
  ## for; otherwise advances the wait cursor, awaits that piece's handle and
  ## returns `true`. Raises `CancelledError` if the awaited handle is
  ## cancelled.
  if not self.waitIter.finished:
    let pieceIdx = self.waitIter.next()
    await self.pieces[pieceIdx]
    return true
  return false
proc confirmCurrentPiece*(self: TorrentPieceValidator): bool {.raises: [].} =
  ## Completes the handle of the next piece in confirmation order.
  ##
  ## Returns `false` when all pieces have already been confirmed, `true`
  ## after the confirm cursor was advanced and the handle completed.
  if not self.confirmIter.finished:
    let pieceIdx = self.confirmIter.next()
    self.pieces[pieceIdx].complete()
    return true
  return false
proc cancel*(self: TorrentPieceValidator): Future[void] {.async: (raises: []).} =
  ## Requests cancellation of every piece handle and waits until all of the
  ## cancellation requests have finished. The wait itself is wrapped in
  ## `noCancel`.
  let cancellations = self.pieces.mapIt(it.cancelAndWait())
  await noCancel allFutures(cancellations)
proc validatePiece*(
    self: TorrentPieceValidator, blocks: seq[Block]
): bool {.raises: [].} =
  ## Validates the next piece in validation order.
  ##
  ## Computes the SHA-1 digest over the data of `blocks` (in the given order)
  ## and compares it with the expected hash stored in the torrent manifest
  ## for the piece the validation cursor currently points at.
  ##
  ## Returns `true` when the hashes match, and `false` when they differ or
  ## when every piece has already been validated. The validation cursor is
  ## advanced whenever a comparison takes place, regardless of its outcome.

  # Same guard as in waitForNextPiece/confirmCurrentPiece: never request an
  # item from an exhausted iterator.
  if self.validationIter.finished:
    return false

  var pieceHashCtx: sha1
  pieceHashCtx.init()
  for blk in blocks:
    pieceHashCtx.update(blk.data)
  let computedPieceHash = pieceHashCtx.finish()

  let pieceIndex = self.validationIter.next()
  if (computedPieceHash != self.torrentManifest.info.pieces[pieceIndex]):
    return false
  true
#################################################################
# Previous API, keeping it for now, probably will not be needed
#
#################################################################
proc waitForPiece*(
    self: TorrentPieceValidator, index: int
): Future[?!void] {.async: (raises: [CancelledError]).} =
  ## Waits until the piece at `index` has been marked as valid.
  ##
  ## Returns a failure with message "Invalid piece index" when `index` is
  ## outside of the tracked pieces; otherwise awaits the piece handle and
  ## returns success. Raises `CancelledError` if the handle is cancelled.
  if index notin 0 ..< self.pieces.len:
    return failure("Invalid piece index")

  let handle = self.pieces[index]
  await handle
  return success()
proc cancelPiece*(
    self: TorrentPieceValidator, index: int
): Future[?!void] {.async: (raises: [CancelledError]).} =
  ## Cancels the handle of the piece at `index` and waits for the
  ## cancellation to finish (the wait is wrapped in `noCancel`).
  ##
  ## Returns a failure with message "Invalid piece index" when `index` is
  ## outside of the tracked pieces, success otherwise.
  let lastIndex = self.pieces.len - 1
  if index < 0 or index > lastIndex:
    return failure("Invalid piece index")

  let handle = self.pieces[index]
  await noCancel handle.cancelAndWait()
  return success()
proc markPieceAsValid*(self: TorrentPieceValidator, index: int): ?!void {.raises: [].} =
  ## Completes the handle of the piece at `index`, releasing anyone awaiting
  ## that piece.
  ##
  ## Returns a failure with message "Invalid piece index" when `index` is
  ## outside of the tracked pieces, success otherwise.
  if index notin 0 ..< self.pieces.len:
    return failure("Invalid piece index")

  let handle = self.pieces[index]
  handle.complete()
  return success()
proc validatePiece*(
    self: TorrentPieceValidator, blocks: seq[Block], index: int
): ?!void {.raises: [].} =
  ## Validates the piece at `index` against the torrent manifest.
  ##
  ## The SHA-1 digest over the data of `blocks` (in the given order) is
  ## compared with the expected hash of piece `index`.
  ##
  ## Returns a failure with message "Invalid piece index" when `index` is
  ## outside of the tracked pieces, a failure with message
  ## "Piece verification failed" when the hashes differ, success otherwise.
  let pieceCount = self.pieces.len
  if index < 0 or index >= pieceCount:
    return failure("Invalid piece index")

  # Feed every block of the piece into a single SHA-1 context.
  var hasher: sha1
  hasher.init()
  for pieceBlock in blocks:
    hasher.update(pieceBlock.data)
  let digest = hasher.finish()

  let expected = self.torrentManifest.info.pieces[index]
  if digest != expected:
    return failure("Piece verification failed")
  return success()

View File

@ -50,6 +50,7 @@ import ./utils/trackedfutures
# bittorrent
from ./codextypes import InfoHashV1Codec
import ./bittorrent/manifest
import ./bittorrent/piecevalidator
export logutils
@ -441,8 +442,11 @@ proc fetchPieces*(
)
proc streamTorrent*(
self: CodexNodeRef, torrentManifest: BitTorrentManifest, codexManifest: Manifest
): Future[?!LPStream] {.async: (raises: []).} =
self: CodexNodeRef,
torrentManifest: BitTorrentManifest,
codexManifest: Manifest,
pieceValidator: TorrentPieceValidator,
): Future[LPStream] {.async: (raises: []).} =
trace "Retrieving pieces from torrent"
let stream = LPStream(StoreStream.new(self.networkStore, codexManifest, pad = false))
var jobs: seq[Future[void]]
@ -450,20 +454,14 @@ proc streamTorrent*(
proc onPieceReceived(blocks: seq[bt.Block], pieceIndex: int): ?!void {.raises: [].} =
trace "Fetched torrent piece - verifying..."
var pieceHashCtx: sha1
pieceHashCtx.init()
if err =? pieceValidator.validatePiece(blocks, pieceIndex).errorOption:
error "Piece verification failed", pieceIndex = pieceIndex, err = err.msg
return failure(err)
for blk in blocks:
pieceHashCtx.update(blk.data)
let pieceHash = pieceHashCtx.finish()
if (pieceHash != torrentManifest.info.pieces[pieceIndex]):
error "Piece verification failed", pieceIndex = pieceIndex
return failure("Piece verification failed")
trace "Piece verified", pieceIndex, pieceHash
# great success
if err =? pieceValidator.markPieceAsValid(pieceIndex).errorOption:
error "Unable to mark piece as valid", pieceIndex = pieceIndex
return failure("Unable to mark piece as valid")
trace "Piece verified", pieceIndex
success()
proc prefetch(): Future[void] {.async: (raises: []).} =
@ -472,7 +470,8 @@ proc streamTorrent*(
await self.fetchPieces(torrentManifest, codexManifest, onPieceReceived)
).errorOption:
error "Unable to fetch blocks", err = err.msg
await stream.close()
await noCancel pieceValidator.cancel()
await noCancel stream.close()
except CancelledError:
trace "Prefetch cancelled"
@ -490,7 +489,7 @@ proc streamTorrent*(
self.trackedFutures.track(monitorStream())
trace "Creating store stream for torrent manifest"
stream.success
stream
proc retrieveTorrent*(
self: CodexNodeRef, infoHash: MultiHash

View File

@ -38,8 +38,10 @@ import ../erasure/erasure
import ../manifest
import ../streams/asyncstreamwrapper
import ../stores
import ../utils/iter
import ../utils/options
import ../bittorrent/manifest
import ../bittorrent/piecevalidator
import ./coders
import ./json
@ -174,16 +176,6 @@ proc retrieveInfoHash(
return
let (torrentManifest, codexManifest) = torrent
without stream =? (await node.streamTorrent(torrentManifest, codexManifest)), err:
if err of BlockNotFoundError:
resp.status = Http404
await resp.sendBody("")
return
else:
resp.status = Http500
await resp.sendBody(err.msg)
return
if codexManifest.mimetype.isSome:
resp.setHeader("Content-Type", codexManifest.mimetype.get())
else:
@ -199,21 +191,44 @@ proc retrieveInfoHash(
await resp.prepareChunked()
while not stream.atEof:
var
buff = newSeqUninitialized[byte](BitTorrentBlockSize.int)
len = await stream.readOnce(addr buff[0], buff.len)
let torrentPieceValidator = newTorrentPieceValidator(torrentManifest, codexManifest)
buff.setLen(len)
if buff.len <= 0:
let stream =
await node.streamTorrent(torrentManifest, codexManifest, torrentPieceValidator)
let pieceIter = torrentPieceValidator.getNewPieceIterator()
var pieceIndex = 0
while not pieceIter.finished and not stream.atEof:
trace "Waiting for piece", pieceIndex
if not (await torrentPieceValidator.waitForNextPiece()):
warn "No more torrent pieces expected. TorrentPieceValidator out of sync"
break
bytes += buff.len
trace "Got piece", pieceIndex
inc pieceIndex
await resp.sendChunk(addr buff[0], buff.len)
let blocksPerPieceIter = torrentPieceValidator.getNewBlocksPerPieceIterator()
while not stream.atEof:
if blocksPerPieceIter.finished:
break
var buff = newSeqUninitialized[byte](BitTorrentBlockSize.int)
# wait for the next piece to be prefetched
let len = await stream.readOnce(addr buff[0], buff.len)
buff.setLen(len)
if buff.len <= 0:
break
bytes += buff.len
await resp.sendChunk(addr buff[0], buff.len)
discard blocksPerPieceIter.next()
await resp.finish()
codex_api_downloads.inc()
except CancelledError as exc:
info "Stream cancelled", exc = exc.msg
raise exc
except CatchableError as exc:
warn "Error streaming blocks", exc = exc.msg

View File

@ -0,0 +1,151 @@
import std/strformat
import pkg/libp2p/[cid, multicodec, multihash]
import pkg/questionable/results
import ../../asynctest
import ../examples
import pkg/codex/manifest
import pkg/codex/bittorrent/manifest
import pkg/codex/bittorrent/piecevalidator
# Tests for TorrentPieceValidator: index-based API (waitForPiece,
# markPieceAsValid, cancelPiece), bulk cancellation, and the sequential
# API (waitForNextPiece, confirmCurrentPiece, validatePiece).
suite "Torrent PieceValidator":
# Fixture: 10 pieces of 64 KiB each
const numOfPieces = 10
const pieceLength = 65536
const contentLength = pieceLength * numOfPieces
# Example piece hashes; not derived from any actual block data
let pieces = newSeqWith(numOfPieces, MultiHash.example(Sha1HashCodec))
let exampleInfo = BitTorrentInfo(
length: contentLength,
pieceLength: pieceLength,
pieces: pieces,
name: "data.bin".some,
)
let dummyCodexManifestCid = Cid.example()
let exampleTorrentManifest =
newBitTorrentManifest(info = exampleInfo, codexManifestCid = dummyCodexManifestCid)
let infoBencoded = exampleInfo.bencode()
let infoHash = MultiHash.digest($Sha1HashCodec, infoBencoded).tryGet
let exampleCodexManifest = Manifest.new(
treeCid = Cid.example,
blockSize = BitTorrentBlockSize.NBytes,
datasetSize = exampleInfo.length.NBytes,
filename = exampleInfo.name,
mimetype = "application/octet-stream".some,
)
var pieceValidator: TorrentPieceValidator
# Every test starts with a fresh validator built from the example manifests
setup:
pieceValidator =
newTorrentPieceValidator(exampleTorrentManifest, exampleCodexManifest)
test "correctly sets numberOfBlocksPerPiece":
check pieceValidator.numberOfBlocksPerPiece ==
exampleInfo.pieceLength.int div exampleCodexManifest.blockSize.int
# `pieces.len` is one past the last valid index, so it must be rejected
test "reports an error when trying to wait for an invalid piece":
let res = await pieceValidator.waitForPiece(exampleTorrentManifest.info.pieces.len)
check isFailure(res)
check res.error.msg == "Invalid piece index"
test "reports an error when trying to mark an invalid piece as valid":
let res = pieceValidator.markPieceAsValid(exampleTorrentManifest.info.pieces.len)
check isFailure(res)
check res.error.msg == "Invalid piece index"
# One generated test per piece index: start waiting, mark valid, then await
for i in 0 ..< exampleTorrentManifest.info.pieces.len:
test fmt"can await piece {i}":
let fut = pieceValidator.waitForPiece(i)
check pieceValidator.markPieceAsValid(i) == success()
check (await fut) == success()
# Cancelling a piece makes a pending waitForPiece fail with CancelledError
test "awaiting for piece can be cancelled":
let pieceIndex = 0
let fut = pieceValidator.waitForPiece(pieceIndex)
check (await pieceValidator.cancelPiece(pieceIndex)) == success()
let res = catch(await fut)
check isFailure(res)
check res.error of CancelledError
# cancel() affects every piece, so both pending waits must be cancelled
test "all pieces can be cancelled":
let fut1 = pieceValidator.waitForPiece(1)
let fut2 = pieceValidator.waitForPiece(2)
await pieceValidator.cancel()
let res1 = catch(await fut1)
check isFailure(res1)
check res1.error of CancelledError
let res2 = catch(await fut2)
check isFailure(res2)
check res2.error of CancelledError
# Sequential API: wait and confirm are interleaved piece by piece
test "awaiting all pieces sequentially":
let numberOfPieces = exampleTorrentManifest.info.pieces.len
for i in 0 ..< numberOfPieces:
let fut = pieceValidator.waitForNextPiece()
check pieceValidator.confirmCurrentPiece()
check await fut
# All waits are started first, then all confirmations happen, then all
# waits are awaited
test "awaiting is independent from confirming":
let numberOfPieces = exampleTorrentManifest.info.pieces.len
var futs = newSeq[Future[bool]](numberOfPieces)
for i in 0 ..< numberOfPieces:
futs[i] = pieceValidator.waitForNextPiece()
for i in 0 ..< numberOfPieces:
check pieceValidator.confirmCurrentPiece()
for i in 0 ..< numberOfPieces:
check await futs[i]
test "sequential validation of blocks":
# Example blocks grouped per piece
let blocksInPieces = newSeqWith(
numOfPieces,
newSeqWith(
pieceLength div BitTorrentBlockSize.int, Block.example(BitTorrentBlockSize.int)
),
)
# Compute the expected SHA-1 piece hashes from the example blocks
var pieces = newSeq[MultiHash](blocksInPieces.len)
for i in 0 ..< blocksInPieces.len:
let blocks = blocksInPieces[i]
var pieceHashCtx: sha1
pieceHashCtx.init()
for blk in blocks:
pieceHashCtx.update(blk.data)
pieces[i] = MultiHash.init($Sha1HashCodec, pieceHashCtx.finish()).tryGet
let info = BitTorrentInfo(
length: contentLength,
pieceLength: pieceLength,
pieces: pieces,
name: "data.bin".some,
)
let manifestCid = Cid.example()
let torrentManifest =
newBitTorrentManifest(info = info, codexManifestCid = manifestCid)
let codexManifest = Manifest.new(
treeCid = Cid.example,
blockSize = BitTorrentBlockSize.NBytes,
datasetSize = info.length.NBytes,
filename = info.name,
mimetype = "application/octet-stream".some,
)
# Replace the validator from `setup` with one whose manifest carries the
# hashes computed above
pieceValidator = newTorrentPieceValidator(torrentManifest, codexManifest)
for blks in blocksInPieces:
# streaming client will wait on the piece validator to validate the piece
let fut = pieceValidator.waitForNextPiece()
# during prefetch we will validate each piece sequentially
# piece validator maintains internal iterators in its object
# to keep track of the validation order
check pieceValidator.validatePiece(blks)
# after piece is validated, the prefetch task will confirm the piece
# again, using internal state, the validator knows which piece to confirm
check pieceValidator.confirmCurrentPiece()
# the fut will be resolved after the piece is confirmed
# and the streaming client can continue
check await fut