diff --git a/codex/node.nim b/codex/node.nim index 51485663..f5773a24 100644 --- a/codex/node.nim +++ b/codex/node.nim @@ -85,6 +85,8 @@ type BatchProc* = proc(blocks: seq[bt.Block]): Future[?!void] {. gcsafe, async: (raises: [CancelledError]) .} + PieceProc* = + proc(blocks: seq[bt.Block], pieceIndex: int): Future[?!void] {.gcsafe, raises: [].} func switch*(self: CodexNodeRef): Switch = return self.switch @@ -388,18 +390,100 @@ proc retrieve*( await self.streamEntireDataset(manifest, cid) +proc fetchPieces*( + self: CodexNodeRef, + cid: Cid, + blockIter: Iter[int], + pieceIter: Iter[int], + numOfBlocksPerPiece: int, + onPiece: PieceProc, +): Future[?!void] {.async, gcsafe.} = + while not blockIter.finished: + let blocks = collect: + for i in 0 ..< numOfBlocksPerPiece: + if not blockIter.finished: + let address = BlockAddress.init(cid, blockIter.next()) + self.networkStore.getBlock(address) + + if blocksErr =? (await allFutureResult(blocks)).errorOption: + return failure(blocksErr) + + if pieceErr =? + (await onPiece(blocks.mapIt(it.read.get), pieceIter.next())).errorOption: + return failure(pieceErr) + + await sleepAsync(1.millis) + + success() + +proc fetchPieces*( + self: CodexNodeRef, + torrentManifest: BitTorrentManifest, + codexManifest: Manifest, + onPiece: PieceProc, +): Future[?!void] = + trace "Fetching torrent pieces" + + let numOfPieces = torrentManifest.info.pieces.len + let numOfBlocksPerPiece = + torrentManifest.info.pieceLength.int div codexManifest.blockSize.int + let blockIter = Iter[int].new(0 ..< codexManifest.blocksCount) + let pieceIter = Iter[int].new(0 ..< numOfPieces) + self.fetchPieces( + codexManifest.treeCid, blockIter, pieceIter, numOfBlocksPerPiece, onPiece + ) + proc streamTorrent( self: CodexNodeRef, torrentManifest: BitTorrentManifest, codexManifest: Manifest ): Future[?!LPStream] {.async.} = trace "Retrieving pieces from torrent" - - # Fetch torrent pieces and validate that each complete piece matches hashes - # in the torrent manifest - - # WIP... - let stream = LPStream(StoreStream.new(self.networkStore, codexManifest, pad = false)) + var jobs: seq[Future[void]] + proc onPieceReceived( + blocks: seq[bt.Block], pieceIndex: int + ): Future[?!void] {.async.} = + trace "Fetched torrent piece - verifying..." + + var pieceHashCtx: sha1 + pieceHashCtx.init() + + for blk in blocks: + pieceHashCtx.update(blk.data) + + let pieceHash = pieceHashCtx.finish() + + if (pieceHash != torrentManifest.info.pieces[pieceIndex]): + error "Piece verification failed", pieceIndex = pieceIndex + return failure("Piece verification failed") + + # great success + success() + + proc prefetch(): Future[void] {.async.} = + try: + if err =? ( + await self.fetchPieces(torrentManifest, codexManifest, onPieceReceived) + ).errorOption: + error "Unable to fetch blocks", err = err.msg + await stream.close() + except CancelledError: + trace "Prefetch job cancelled" + except CatchableError as exc: + error "Error fetching blocks", exc = exc.msg + + jobs.add(prefetch()) + + # Monitor stream completion and cancel background jobs when done + proc monitorStream() {.async.} = + try: + await stream.join() + finally: + await allFutures(jobs.mapIt(it.cancelAndWait)) + + self.trackedFutures.track(monitorStream()) + + trace "Creating store stream for torrent manifest" stream.success proc retrieveInfoHash*( @@ -574,7 +658,9 @@ proc storeBitTorrent*( info "Storing BitTorrent data" without codexManifestCid =? - await self.store(stream, filename = info.name, mimetype = mimetype): + await self.store( + stream, filename = info.name, mimetype = mimetype, blockSize = NBytes 1024 * 16 + ): return failure("Unable to store BitTorrent data") let bitTorrentManifest = newBitTorrentManifest(info, codexManifestCid)