adds torrent streaming and piece validation

This commit is contained in:
Marcin Czenko 2025-03-02 03:21:21 +01:00
parent cd8d2b130a
commit 0d9bad674a
No known key found for this signature in database
GPG Key ID: 33DEA0C8E30937C0

View File

@ -85,6 +85,8 @@ type
BatchProc* = proc(blocks: seq[bt.Block]): Future[?!void] {.
gcsafe, async: (raises: [CancelledError])
.}
PieceProc* =
proc(blocks: seq[bt.Block], pieceIndex: int): Future[?!void] {.gcsafe, raises: [].}
func switch*(self: CodexNodeRef): Switch =
return self.switch
@ -388,18 +390,100 @@ proc retrieve*(
await self.streamEntireDataset(manifest, cid)
proc fetchPieces*(
self: CodexNodeRef,
cid: Cid,
blockIter: Iter[int],
pieceIter: Iter[int],
numOfBlocksPerPiece: int,
onPiece: PieceProc,
): Future[?!void] {.async, gcsafe.} =
while not blockIter.finished:
let blocks = collect:
for i in 0 ..< numOfBlocksPerPiece:
if not blockIter.finished:
let address = BlockAddress.init(cid, blockIter.next())
self.networkStore.getBlock(address)
if blocksErr =? (await allFutureResult(blocks)).errorOption:
return failure(blocksErr)
if pieceErr =?
(await onPiece(blocks.mapIt(it.read.get), pieceIter.next())).errorOption:
return failure(pieceErr)
await sleepAsync(1.millis)
success()
proc fetchPieces*(
self: CodexNodeRef,
torrentManifest: BitTorrentManifest,
codexManifest: Manifest,
onPiece: PieceProc,
): Future[?!void] =
trace "Fetching torrent pieces"
let numOfPieces = torrentManifest.info.pieces.len
let numOfBlocksPerPiece =
torrentManifest.info.pieceLength.int div codexManifest.blockSize.int
let blockIter = Iter[int].new(0 ..< codexManifest.blocksCount)
let pieceIter = Iter[int].new(0 ..< numOfPieces)
self.fetchPieces(
codexManifest.treeCid, blockIter, pieceIter, numOfBlocksPerPiece, onPiece
)
proc streamTorrent(
self: CodexNodeRef, torrentManifest: BitTorrentManifest, codexManifest: Manifest
): Future[?!LPStream] {.async.} =
trace "Retrieving pieces from torrent"
# Fetch torrent pieces and validate that each complete piece matches hashes
# in the torrent manifest
# WIP...
let stream = LPStream(StoreStream.new(self.networkStore, codexManifest, pad = false))
var jobs: seq[Future[void]]
proc onPieceReceived(
blocks: seq[bt.Block], pieceIndex: int
): Future[?!void] {.async.} =
trace "Fetched torrent piece - verifying..."
var pieceHashCtx: sha1
pieceHashCtx.init()
for blk in blocks:
pieceHashCtx.update(blk.data)
let pieceHash = pieceHashCtx.finish()
if (pieceHash != torrentManifest.info.pieces[pieceIndex]):
error "Piece verification failed", pieceIndex = pieceIndex
return failure("Piece verification failed")
# great success
success()
proc prefetch(): Future[void] {.async.} =
try:
if err =? (
await self.fetchPieces(torrentManifest, codexManifest, onPieceReceived)
).errorOption:
error "Unable to fetch blocks", err = err.msg
await stream.close()
except CancelledError:
trace "Prefetch job cancelled"
except CatchableError as exc:
error "Error fetching blocks", exc = exc.msg
jobs.add(prefetch())
# Monitor stream completion and cancel background jobs when done
proc monitorStream() {.async.} =
try:
await stream.join()
finally:
await allFutures(jobs.mapIt(it.cancelAndWait))
self.trackedFutures.track(monitorStream())
trace "Creating store stream for torrent manifest"
stream.success
proc retrieveInfoHash*(
@ -574,7 +658,9 @@ proc storeBitTorrent*(
info "Storing BitTorrent data"
without codexManifestCid =?
await self.store(stream, filename = info.name, mimetype = mimetype):
await self.store(
stream, filename = info.name, mimetype = mimetype, blockSize = NBytes 1024 * 16
):
return failure("Unable to store BitTorrent data")
let bitTorrentManifest = newBitTorrentManifest(info, codexManifestCid)