updates and integrates torrentdownloader into api and node

This commit is contained in:
Marcin Czenko 2025-03-27 16:10:12 +01:00
parent 2aa91e5f93
commit 534cb7193e
No known key found for this signature in database
GPG Key ID: 33DEA0C8E30937C0
4 changed files with 146 additions and 272 deletions

View File

@ -1,13 +1,20 @@
## Nim-Codex
## Copyright (c) 2025 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
{.push raises: [].}
# import std/asyncstreams
import std/sequtils
import std/sugar
import pkg/chronos
import pkg/libp2p/multihash
import pkg/questionable/results
# import ../rng
import ../logutils
import ../utils/iter
import ../utils/trackedfutures
@ -42,111 +49,11 @@ type
queue: AsyncQueue[TorrentPiece]
trackedFutures: TrackedFutures
proc newTorrentPiece*(
pieceIndex: int, pieceHash: MultiHash, blockIndexStart: int, blockIndexEnd: int
): TorrentPiece =
TorrentPiece(
pieceIndex: pieceIndex,
pieceHash: pieceHash,
blockIndexStart: blockIndexStart,
blockIndexEnd: blockIndexEnd,
handle: cast[PieceHandle](newFuture[void]("PieceValidator.newTorrentPiece")),
)
proc newTorrentDownloader*(
torrentManifest: BitTorrentManifest,
codexManifest: Manifest,
networkStore: NetworkStore,
): ?!TorrentDownloader =
let
blocksCount = codexManifest.blocksCount
numOfPieces = torrentManifest.info.pieces.len
numOfBlocksPerPiece =
torrentManifest.info.pieceLength.int div codexManifest.blockSize.int
numOfBlocksInLastPiece = blocksCount - (numOfBlocksPerPiece * (numOfPieces - 1))
let pieces = collect:
for i in 0 ..< numOfPieces:
var blockIndexEnd = ((i + 1) * numOfBlocksPerPiece) - 1
if i == numOfPieces - 1:
# last piece can have less blocks than numOfBlocksPerPiece
blockIndexEnd = i * numOfBlocksPerPiece + numOfBlocksInLastPiece - 1
let piece = newTorrentPiece(
pieceIndex = i,
pieceHash = torrentManifest.info.pieces[i],
blockIndexStart = i * numOfBlocksPerPiece,
blockIndexEnd = blockIndexEnd,
)
piece
let queue = newAsyncQueue[TorrentPiece](maxsize = numOfPieces)
let iter = Iter.new(0 ..< numOfPieces)
var pieceDownloadSequence = newSeqWith(numOfPieces, iter.next())
# optional: randomize the order of pieces
# not sure if this is such a great idea when streaming content
# Rng.instance.shuffle(pieceDownloadSequence)
trace "Piece download sequence", pieceDownloadSequence
for i in pieceDownloadSequence:
try:
queue.addLastNoWait(pieces[i])
except AsyncQueueFullError:
raiseAssert "Fatal: could not add pieces to queue"
TorrentDownloader(
torrentManifest: torrentManifest,
codexManifest: codexManifest,
networkStore: networkStore,
numberOfPieces: numOfPieces,
numberOfBlocksPerPiece: numOfBlocksPerPiece,
pieces: pieces,
waitIter: Iter[int].new(0 ..< numOfPieces),
blockIter: Iter[int].empty(),
pieceIndex: 0,
queue: queue,
trackedFutures: TrackedFutures(),
).success
proc getNewBlockIterator(piece: TorrentPiece): Iter[int] =
Iter[int].new(piece.blockIndexStart .. piece.blockIndexEnd)
func numberOfBlocks(piece: TorrentPiece): int =
piece.blockIndexEnd - piece.blockIndexStart + 1
func numberOfBlocksInPiece*(self: TorrentDownloader, pieceIndex: int): ?!int =
if pieceIndex < 0 or pieceIndex >= self.numberOfPieces:
return failure("Invalid piece index")
let piece = self.pieces[pieceIndex]
success(piece.numberOfBlocks)
proc getNewBlocksInPieceIterator*(
self: TorrentDownloader, pieceIndex: int
): ?!Iter[int] =
if pieceIndex < 0 or pieceIndex >= self.numberOfPieces:
return failure("Invalid piece index")
let piece = self.pieces[pieceIndex]
success(piece.getNewBlockIterator())
proc getNewPieceIterator*(self: TorrentDownloader): Iter[int] =
Iter[int].new(0 ..< self.numberOfPieces)
# proc getNewBlocksPerPieceIterator*(self: TorrentDownloader): Iter[int] =
# Iter[int].new(0 ..< self.numberOfBlocksPerPiece)
proc waitForNextPiece*(
self: TorrentDownloader
): Future[int] {.async: (raises: [CancelledError]).} =
if self.waitIter.finished:
return -1
let pieceIndex = self.waitIter.next()
await self.pieces[pieceIndex].handle
pieceIndex
proc cancel*(self: TorrentDownloader): Future[void] {.async: (raises: []).} =
await noCancel allFutures(self.pieces.mapIt(it.handle.cancelAndWait))
proc getNewBlockIterator(piece: TorrentPiece): Iter[int] =
Iter[int].new(piece.blockIndexStart .. piece.blockIndexEnd)
proc validate(piece: TorrentPiece, blocks: seq[Block]): ?!void {.raises: [].} =
var pieceHashCtx: sha1
@ -223,6 +130,37 @@ proc fetchPiece(
success()
func numberOfBlocksInPiece*(self: TorrentDownloader, pieceIndex: int): ?!int =
if pieceIndex < 0 or pieceIndex >= self.numberOfPieces:
return failure("Invalid piece index")
let piece = self.pieces[pieceIndex]
success(piece.numberOfBlocks)
###########################################################################
# Public API
###########################################################################
proc getNewBlockIterator*(self: TorrentDownloader, pieceIndex: int): ?!Iter[int] =
if pieceIndex < 0 or pieceIndex >= self.numberOfPieces:
return failure("Invalid piece index")
let piece = self.pieces[pieceIndex]
success(piece.getNewBlockIterator())
proc getNewPieceIterator*(self: TorrentDownloader): Iter[int] =
Iter[int].new(0 ..< self.numberOfPieces)
proc waitForNextPiece*(
self: TorrentDownloader
): Future[int] {.async: (raises: [CancelledError]).} =
if self.waitIter.finished:
return -1
let pieceIndex = self.waitIter.next()
await self.pieces[pieceIndex].handle
pieceIndex
proc cancel*(self: TorrentDownloader): Future[void] {.async: (raises: []).} =
await noCancel allFutures(self.pieces.mapIt(it.handle.cancelAndWait))
proc downloadPieces*(self: TorrentDownloader): Future[void] {.async: (raises: []).} =
try:
while not self.queue.empty:
@ -249,33 +187,6 @@ proc downloadPieces*(self: TorrentDownloader): Future[void] {.async: (raises: []
finally:
await noCancel self.cancel()
# proc downloadPieces*(self: TorrentDownloader): Future[?!void] {.async: (raises: []).} =
# try:
# while not self.queue.empty:
# let piece = self.queue.popFirstNoWait()
# if err =? (await self.fetchPiece(piece)).errorOption:
# error "Could not fetch piece", err = err.msg
# # add the piece to the end of the queue
# # to try to fetch the piece again
# self.queue.addLastNoWait(piece)
# continue
# else:
# # piece fetched and validated successfully
# # mark it as ready
# piece.handle.complete()
# await sleepAsync(1.millis)
# except CancelledError:
# trace "Downloading pieces cancelled"
# except AsyncQueueFullError as e:
# error "Queue is full", error = e.msg
# return failure e
# except AsyncQueueEmptyError as e:
# error "Trying to pop from empty queue", error = e.msg
# return failure e
# finally:
# await noCancel self.cancel()
# success()
proc getNext*(
self: TorrentDownloader
): Future[?!(int, seq[byte])] {.async: (raises: []).} =
@ -283,7 +194,7 @@ proc getNext*(
if self.pieceIndex == -1:
return success((-1, newSeq[byte]()))
if self.blockIter.finished:
trace "Waiting for piece", pieceIndex = self.pieceIndex
trace "Waiting for piece to be ready"
self.pieceIndex = await self.waitForNextPiece()
trace "Got piece", pieceIndex = self.pieceIndex
if self.pieceIndex == -1:
@ -317,29 +228,70 @@ proc stop*(self: TorrentDownloader) {.async.} =
await noCancel self.cancel()
await noCancel self.trackedFutures.cancelTracked()
#################################################################
# Previous API, keeping it for now, probably will not be needed
#
#################################################################
proc newTorrentPiece*(
pieceIndex: int, pieceHash: MultiHash, blockIndexStart: int, blockIndexEnd: int
): TorrentPiece =
TorrentPiece(
pieceIndex: pieceIndex,
pieceHash: pieceHash,
blockIndexStart: blockIndexStart,
blockIndexEnd: blockIndexEnd,
handle: cast[PieceHandle](newFuture[void]("PieceValidator.newTorrentPiece")),
)
proc waitForPiece*(
self: TorrentDownloader, index: int
): Future[?!void] {.async: (raises: [CancelledError]).} =
if index < 0 or index >= self.pieces.len:
return failure("Invalid piece index")
await self.pieces[index].handle
success()
proc newTorrentDownloader*(
torrentManifest: BitTorrentManifest,
codexManifest: Manifest,
networkStore: NetworkStore,
): ?!TorrentDownloader =
let
blocksCount = codexManifest.blocksCount
numOfPieces = torrentManifest.info.pieces.len
numOfBlocksPerPiece =
torrentManifest.info.pieceLength.int div codexManifest.blockSize.int
numOfBlocksInLastPiece = blocksCount - (numOfBlocksPerPiece * (numOfPieces - 1))
proc cancelPiece*(
self: TorrentDownloader, index: int
): Future[?!void] {.async: (raises: [CancelledError]).} =
if index < 0 or index >= self.pieces.len:
return failure("Invalid piece index")
await noCancel self.pieces[index].handle.cancelAndWait()
success()
let pieces = collect:
for i in 0 ..< numOfPieces:
var blockIndexEnd = ((i + 1) * numOfBlocksPerPiece) - 1
if i == numOfPieces - 1:
# last piece can have less blocks than numOfBlocksPerPiece
blockIndexEnd = i * numOfBlocksPerPiece + numOfBlocksInLastPiece - 1
proc confirmPiece*(self: TorrentDownloader, index: int): ?!void {.raises: [].} =
if index < 0 or index >= self.pieces.len:
return failure("Invalid piece index")
self.pieces[index].handle.complete()
success()
let piece = newTorrentPiece(
pieceIndex = i,
pieceHash = torrentManifest.info.pieces[i],
blockIndexStart = i * numOfBlocksPerPiece,
blockIndexEnd = blockIndexEnd,
)
piece
let queue = newAsyncQueue[TorrentPiece](maxsize = numOfPieces)
let iter = Iter.new(0 ..< numOfPieces)
var pieceDownloadSequence = newSeqWith(numOfPieces, iter.next())
# optional: randomize the order of pieces
# not sure if this is such a great idea when streaming content
# Rng.instance.shuffle(pieceDownloadSequence)
trace "Piece download sequence", pieceDownloadSequence
for i in pieceDownloadSequence:
try:
queue.addLastNoWait(pieces[i])
except AsyncQueueFullError:
raiseAssert "Fatal: could not add pieces to queue"
TorrentDownloader(
torrentManifest: torrentManifest,
codexManifest: codexManifest,
networkStore: networkStore,
numberOfPieces: numOfPieces,
numberOfBlocksPerPiece: numOfBlocksPerPiece,
pieces: pieces,
waitIter: Iter[int].new(0 ..< numOfPieces),
blockIter: Iter[int].empty(),
pieceIndex: 0,
queue: queue,
trackedFutures: TrackedFutures(),
).success

View File

@ -50,7 +50,7 @@ import ./utils/trackedfutures
# bittorrent
from ./codextypes import InfoHashV1Codec
import ./bittorrent/manifest
import ./bittorrent/piecevalidator
import ./bittorrent/torrentdownloader
export logutils
@ -399,90 +399,10 @@ proc retrieve*(
await self.streamEntireDataset(manifest, cid)
proc validatePiece(
self: CodexNodeRef, pieceValidator: TorrentPieceValidator, blocks: seq[bt.Block]
): ?!void {.raises: [].} =
trace "Fetched complete torrent piece - verifying..."
let pieceIndex = pieceValidator.validatePiece(blocks)
if pieceIndex < 0:
error "Piece verification failed", pieceIndex = pieceIndex
return failure(fmt"Piece verification failed for {pieceIndex=}")
trace "Piece successfully verified", pieceIndex
let confirmedPieceIndex = pieceValidator.confirmCurrentPiece()
if pieceIndex != confirmedPieceIndex:
error "Piece confirmation failed",
pieceIndex = pieceIndex, confirmedPieceIndex = confirmedPieceIndex
return
failure(fmt"Piece confirmation failed for {pieceIndex=}, {confirmedPieceIndex=}")
success()
proc fetchPieces*(
self: CodexNodeRef,
torrentManifest: BitTorrentManifest,
codexManifest: Manifest,
pieceValidator: TorrentPieceValidator,
): Future[?!void] {.async: (raises: [CancelledError]).} =
let cid = codexManifest.treeCid
let numOfBlocksPerPiece = pieceValidator.numberOfBlocksPerPiece
let blockIter = Iter[int].new(0 ..< codexManifest.blocksCount)
while not blockIter.finished:
let blockFutures = collect:
for i in 0 ..< numOfBlocksPerPiece:
if not blockIter.finished:
let address = BlockAddress.init(cid, blockIter.next())
self.networkStore.getBlock(address)
without blockResults =? await allFinishedValues(blockFutures), err:
return failure(err)
let numOfFailedBlocks = blockResults.countIt(it.isFailure)
if numOfFailedBlocks > 0:
return failure("Some blocks failed to fetch")
if err =? self.validatePiece(pieceValidator, blockResults.mapIt(it.get)).errorOption:
return failure(err)
await sleepAsync(1.millis)
success()
proc streamTorrent*(
self: CodexNodeRef,
torrentManifest: BitTorrentManifest,
codexManifest: Manifest,
pieceValidator: TorrentPieceValidator,
): Future[LPStream] {.async: (raises: []).} =
trace "Retrieving pieces from torrent"
let stream = LPStream(StoreStream.new(self.networkStore, codexManifest, pad = false))
proc prefetch(): Future[void] {.async: (raises: []).} =
try:
if err =? (await self.fetchPieces(torrentManifest, codexManifest, pieceValidator)).errorOption:
error "Unable to fetch blocks", err = err.msg
await noCancel pieceValidator.cancel()
await noCancel stream.close()
except CancelledError:
trace "Prefetch cancelled"
let prefetchTask = prefetch()
# Monitor stream completion and cancel background jobs when done
proc monitorStream() {.async: (raises: []).} =
try:
await stream.join()
except CancelledError:
trace "Stream cancelled"
finally:
await noCancel prefetchTask.cancelAndWait
self.trackedFutures.track(monitorStream())
trace "Creating store stream for torrent manifest"
stream
proc getTorrentDownloader*(
self: CodexNodeRef, torrentManifest: BitTorrentManifest, codexManifest: Manifest
): ?!TorrentDownloader =
newTorrentDownloader(torrentManifest, codexManifest, self.networkStore)
proc retrieveTorrent*(
self: CodexNodeRef, infoHash: MultiHash

View File

@ -41,7 +41,7 @@ import ../stores
import ../utils/iter
import ../utils/options
import ../bittorrent/manifest
import ../bittorrent/piecevalidator
import ../bittorrent/torrentdownloader
import ./coders
import ./json
@ -165,7 +165,7 @@ proc retrieveInfoHash(
## Download torrent from the node in a streaming
## manner
##
var stream: LPStream
var torrentDownloader: TorrentDownloader
var bytes = 0
try:
@ -189,37 +189,31 @@ proc retrieveInfoHash(
else:
resp.setHeader("Content-Disposition", "attachment")
await resp.prepareChunked()
let contentLength = codexManifest.datasetSize
resp.setHeader("Content-Length", $(contentLength.int))
let torrentPieceValidator = newTorrentPieceValidator(torrentManifest, codexManifest)
await resp.prepare(HttpResponseStreamType.Plain)
let stream =
await node.streamTorrent(torrentManifest, codexManifest, torrentPieceValidator)
without torrentDownloader =?
node.getTorrentDownloader(torrentManifest, codexManifest), err:
error "Unable to stream torrent", err = err.msg
resp.status = Http500
await resp.sendBody(err.msg)
return
while not stream.atEof:
trace "Waiting for piece..."
let pieceIndex = await torrentPieceValidator.waitForNextPiece()
torrentDownloader.start()
if -1 == pieceIndex:
warn "No more torrent pieces expected. TorrentPieceValidator might be out of sync!"
break
while not torrentDownloader.finished:
without (blockIndex, data) =? (await torrentDownloader.getNext()), err:
error "Error streaming blocks", err = err.msg
resp.status = Http500
if resp.isPending():
await resp.sendBody(err.msg)
return
trace "streaming block", blockIndex, len = data.len
bytes += data.len
await resp.sendChunk(addr data[0], data.len)
trace "Got piece", pieceIndex
let blocksPerPieceIter = torrentPieceValidator.getNewBlocksPerPieceIterator()
while not blocksPerPieceIter.finished and not stream.atEof:
var buff = newSeqUninitialized[byte](BitTorrentBlockSize.int)
# wait for the next the piece to prefetch
let len = await stream.readOnce(addr buff[0], buff.len)
buff.setLen(len)
if buff.len <= 0:
break
bytes += buff.len
await resp.sendChunk(addr buff[0], buff.len)
discard blocksPerPieceIter.next()
await resp.finish()
codex_api_downloads.inc()
except CancelledError as exc:
@ -232,8 +226,7 @@ proc retrieveInfoHash(
await resp.sendBody(exc.msg)
finally:
info "Sent bytes for torrent", infoHash = $infoHash, bytes
if not stream.isNil:
await stream.close()
await torrentDownloader.stop()
proc buildCorsHeaders(
httpMethod: string, allowedOrigin: Option[string]

View File

@ -1,3 +1,12 @@
## Nim-Codex
## Copyright (c) 2025 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
import std/importutils # private access
import pkg/libp2p/[cid, multicodec, multihash]
@ -107,7 +116,7 @@ asyncchecksuite "Torrent Downloader":
let treeCid = codexManifest.treeCid
var pieceHashCtx: sha1
pieceHashCtx.init()
let blockIter = torrentDownloader.getNewBlocksInPieceIterator(pieceIndex).tryGet
let blockIter = torrentDownloader.getNewBlockIterator(pieceIndex).tryGet
let blks = newSeq[Block]()
while not blockIter.finished:
let blockIndex = blockIter.next()
@ -162,7 +171,7 @@ asyncchecksuite "Torrent Downloader":
check piece.blockIndexEnd == expectedBlockIndexEnd
check torrentDownloader.numberOfBlocksInPiece(index).tryGet ==
expectedNumOfBlocksInPiece
let blockIterator = torrentDownloader.getNewBlocksInPieceIterator(index).tryGet
let blockIterator = torrentDownloader.getNewBlockIterator(index).tryGet
for blkIndex in expectedBlockIndexStart .. expectedBlockIndexEnd:
check blkIndex == blockIterator.next()
check blockIterator.finished == true