From 494765ba744e40558b6f93170ed8b77857f87602 Mon Sep 17 00:00:00 2001
From: Marcin Czenko
Date: Fri, 28 Feb 2025 16:32:17 +0100
Subject: [PATCH] general idea of BitTorrent integration

---
 codex/node.nim | 297 ++++++++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 296 insertions(+), 1 deletion(-)

diff --git a/codex/node.nim b/codex/node.nim
index b742df2c..b98595c7 100644
--- a/codex/node.nim
+++ b/codex/node.nim
@@ -47,6 +47,8 @@ import ./logutils
 import ./utils/asynciter
 import ./utils/trackedfutures
 
+from ./codextypes import InfoHashV1Codec
+
 export logutils
 
 logScope:
@@ -94,6 +96,170 @@ func engine*(self: CodexNodeRef): BlockExcEngine =
 func discovery*(self: CodexNodeRef): Discovery =
   return self.discovery
 
+type
+  # Hash of a single torrent piece, stored as a multihash.
+  BitTorrentPiece* = MultiHash
+
+  # Contents of the torrent `info` dictionary kept in the manifest.
+  BitTorrentInfo* = ref object
+    length: uint64
+    pieceLength: uint32
+    pieces: seq[BitTorrentPiece]
+    name: ?string
+
+  # Multihash of the torrent info dictionary (the torrent "info hash").
+  BitTorrentInfoHash* = MultiHash
+
+  # Links torrent metadata to the CID of the Codex manifest for the same data.
+  BitTorrentManifest* = ref object
+    info: BitTorrentInfo
+    codexManifestCid: Cid
+
+# const InfoHashV1Codec* = multiCodec("torrent-info-hash-v1")
+
+proc newBitTorrentManifest(
+    info: BitTorrentInfo, codexManifestCid: Cid
+): BitTorrentManifest =
+  ## Builds a manifest from torrent `info` and the CID of the Codex manifest.
+  BitTorrentManifest(info: info, codexManifestCid: codexManifestCid)
+
+proc write*(pb: var ProtoBuffer, field: int, value: BitTorrentPiece) =
+  ## Writes `value` as a nested `Piece` message; the payload is the complete
+  ## multihash encoding (codec, size and digest), not the bare digest.
+  var ipb = initProtoBuffer()
+  ipb.write(1, value.data.buffer)
+  ipb.finish()
+  pb.write(field, ipb)
+
+proc write*(pb: var ProtoBuffer, field: int, value: BitTorrentInfo) =
+  ## Writes `value` as a nested `BitTorrentInfo` message; `name` is emitted
+  ## only when it is set.
+  var ipb = initProtoBuffer()
+  ipb.write(1, value.length)
+  ipb.write(2, value.pieceLength)
+  for piece in value.pieces:
+    ipb.write(3, piece)
+  if name =? value.name:
+    ipb.write(4, name)
+  ipb.finish()
+  pb.write(field, ipb)
+
+proc encode*(manifest: BitTorrentManifest): seq[byte] =
+  ## Serializes `manifest` into protobuf bytes following the schema below.
+  # ```protobuf
+  #   Message BitTorrentManifest {
+  #     Message Piece {
+  #       bytes data = 1;
+  #     }
+  #
+  #     Message BitTorrentInfo {
+  #       uint64 length = 1;
+  #       uint32 pieceLength = 2;
+  #       repeated Piece pieces = 3;
+  #       optional string name = 4;
+  #     }
+  #
+  #     BitTorrentInfo info = 1;
+  #     bytes codexManifestCid = 2;
+  #   }
+  # ```
+
+  var ipb = initProtoBuffer()
+  ipb.write(1, manifest.info)
+  ipb.write(2, manifest.codexManifestCid.data.buffer)
+  ipb.finish()
+  ipb.buffer
+
+proc decode*(_: type BitTorrentManifest, data: openArray[byte]): ?!BitTorrentManifest =
+  ## Parses protobuf `data` produced by `encode` into a `BitTorrentManifest`.
+  ## Returns a failure when a field cannot be read or when a piece hash or the
+  ## CID is malformed.
+  # ```protobuf
+  #   Message BitTorrentManifest {
+  #     Message Piece {
+  #       bytes data = 1;
+  #     }
+  #
+  #     Message BitTorrentInfo {
+  #       uint64 length = 1;
+  #       uint32 pieceLength = 2;
+  #       repeated Piece pieces = 3;
+  #       optional string name = 4;
+  #     }
+  #
+  #     BitTorrentInfo info = 1;
+  #     bytes codexManifestCid = 2;
+  #   }
+  # ```
+
+  var
+    pbNode = initProtoBuffer(data) # parse the caller's bytes, not an empty buffer
+    pbInfo: ProtoBuffer
+    length: uint64
+    pieceLength: uint32
+    pieces: seq[BitTorrentPiece]
+    piecesBytes: seq[seq[byte]]
+    name: string
+    cidBuf = newSeq[byte]()
+    codexManifestCid: Cid
+
+  if pbNode.getField(1, pbInfo).isErr:
+    return failure("Unable to decode `info` from BitTorrentManifest")
+
+  if pbInfo.getField(1, length).isErr:
+    return failure("Unable to decode `length` from BitTorrentInfo")
+
+  if pbInfo.getField(2, pieceLength).isErr:
+    return failure("Unable to decode `pieceLength` from BitTorrentInfo")
+
+  if ?pbInfo.getRepeatedField(3, piecesBytes).mapFailure:
+    for piece in piecesBytes:
+      var pbPiece = initProtoBuffer(piece)
+      var dataBuf = newSeq[byte]()
+      if pbPiece.getField(1, dataBuf).isErr:
+        return failure("Unable to decode `data` from BitTorrentPiece")
+      # `write` stores the full multihash bytes, so parse them as such instead
+      # of wrapping them again as if they were a bare SHA-1 digest.
+      without mhash =? MultiHash.init(dataBuf).mapFailure, err:
+        return failure(err.msg)
+      pieces.add(mhash)
+  discard ?pbInfo.getField(4, name).mapFailure
+
+  if ?pbNode.getField(2, cidBuf).mapFailure:
+    without cid =? Cid.init(cidBuf).mapFailure, err:
+      return failure(err.msg)
+    codexManifestCid = cid
+
+  let info = BitTorrentInfo(
+    length: length,
+    pieceLength: pieceLength,
+    pieces: pieces,
+    name: if name.len > 0: name.some else: string.none,
+  )
+  BitTorrentManifest(info: info, codexManifestCid: codexManifestCid).success
+
+proc storeBitTorrentManifest*(
+    self: CodexNodeRef, manifest: BitTorrentManifest, infoHash: BitTorrentInfoHash
+): Future[?!bt.Block] {.async.} =
+  ## Encodes `manifest` and stores it as a block addressed by a CIDv1 built
+  ## from `infoHash`. The block is created with `verify = false`.
+  let encodedManifest = manifest.encode()
+
+  without infoHashCid =? Cid.init(CIDv1, InfoHashV1Codec, infoHash).mapFailure, error:
+    trace "Unable to create CID for BitTorrent info hash"
+    return failure(error)
+
+  without blk =? bt.Block.new(data = encodedManifest, cid = infoHashCid, verify = false),
+  error:
+    trace "Unable to create block from manifest"
+    return failure(error)
+
+  if err =? (await self.networkStore.putBlock(blk)).errorOption:
+    trace "Unable to store BitTorrent manifest block", cid = blk.cid, err = err.msg
+    return failure(err)
+
+  success blk
+
 proc storeManifest*(
     self: CodexNodeRef, manifest: Manifest
 ): Future[?!bt.Block] {.async.} =
@@ -134,7 +300,61 @@ proc fetchManifest*(
 
   trace "Decoded manifest", cid
 
-  return manifest.success
+  manifest.success
+
+func decode*(_: type BitTorrentManifest, blk: bt.Block): ?!BitTorrentManifest =
+  ## Decodes a `BitTorrentManifest` from the data of `blk`; fails when the
+  ## block's CID does not use the torrent info hash codec.
+
+  if not ?blk.cid.isTorrentInfoHash:
+    return failure "Cid not a torrent info hash codec"
+
+  BitTorrentManifest.decode(blk.data)
+
+func validate*(self: BitTorrentManifest, cid: Cid): ?!bool =
+  ## Not implemented yet: performs no check and always returns `success true`.
+  ## The intended first stage of validation is described below.
+
+  # First stage of validation:
+  # (1) bencode the info dictionary from the torrent manifest
+  # (2) hash the bencoded info dictionary
+  # (3) compare the hash with the info hash in the cid
+  #
+  # This will prove that our info metadata is correct.
+  # It still does not prove that the "codexManifestCid" from the torrent manifest
+  # points to genuine content. This validation will be done while fetching blocks
+  # where we will be able to detect that the aggregated pieces do not match
+  # the hashes in the info dictionary from the torrent manifest.
+  return success true
+
+proc fetchTorrentManifest*(
+    self: CodexNodeRef, cid: Cid
+): Future[?!BitTorrentManifest] {.async.} =
+  ## Fetches the block addressed by `cid` from the network store, decodes it as
+  ## a `BitTorrentManifest` and runs `validate` on the result.
+  if err =? cid.isTorrentInfoHash.errorOption:
+    # Plain concatenation: Nim does not interpolate `{...}` in ordinary literals.
+    return failure("CID has invalid content type for torrent info hash " & $cid)
+
+  trace "Retrieving torrent manifest for cid", cid
+
+  without blk =? await self.networkStore.getBlock(BlockAddress.init(cid)), err:
+    trace "Error retrieve manifest block", cid, err = err.msg
+    return failure err
+
+  trace "Decoding torrent manifest for cid", cid
+
+  without torrentManifest =? BitTorrentManifest.decode(blk), err:
+    trace "Unable to decode torrent manifest", err = err.msg
+    return failure("Unable to decode torrent manifest")
+
+  trace "Decoded torrent manifest", cid
+
+  if err =? torrentManifest.validate(cid).errorOption:
+    trace "Torrent manifest does not match torrent info hash", cid
+    return failure("Torrent manifest does not match torrent info hash " & $cid)
+
+  return torrentManifest.success
 
 proc findPeer*(self: CodexNodeRef, peerId: PeerId): Future[?PeerRecord] {.async.} =
   ## Find peer using the discovery service from the given CodexNode
@@ -478,6 +698,81 @@ proc store*(
 
   return manifestBlk.cid.success
 
+proc storeBitTorrent*(
+    self: CodexNodeRef,
+    stream: LPStream,
+    info: BitTorrentInfo,
+    infoHash: BitTorrentInfoHash,
+    mimetype: ?string = string.none,
+): Future[?!Cid] {.async.} =
+  ## Stores the data read from `stream` via `store`, then stores a
+  ## `BitTorrentManifest` linking `info` to the resulting Codex manifest CID.
+  ## Returns the CID of the stored BitTorrent manifest block.
+  info "Storing BitTorrent data"
+
+  without codexManifestCid =?
+    await self.store(stream, filename = info.name, mimetype = mimetype):
+    return failure("Unable to store BitTorrent data")
+
+  let bitTorrentManifest = newBitTorrentManifest(info, codexManifestCid)
+
+  without manifestBlk =? await self.storeBitTorrentManifest(
+    bitTorrentManifest, infoHash
+  ), err:
+    error "Unable to store manifest"
+    return failure(err)
+
+  info "Stored BitTorrent data",
+    manifestCid = manifestBlk.cid, codeManifestCid = codexManifestCid
+
+  success manifestBlk.cid
+
+proc streamTorrent(
+    self: CodexNodeRef, torrentManifest: BitTorrentManifest, codexManifest: Manifest
+): Future[?!LPStream] {.async.} =
+  ## Returns a stream over the data described by `codexManifest`. Work in
+  ## progress: `torrentManifest` is not used yet and no piece is validated.
+  trace "Retrieving pieces from torrent"
+
+  # Fetch torrent pieces and validate that each complete piece matches hashes
+  # in the torrent manifest
+
+  # WIP...
+
+  let stream = LPStream(StoreStream.new(self.networkStore, codexManifest, pad = false))
+
+  stream.success
+
+proc retrieveInfoHash*(
+    self: CodexNodeRef, infoHashString: string
+): Future[?!LPStream] {.async.} =
+  ## Resolves the hex-encoded SHA-1 info hash `infoHashString` to its torrent
+  ## manifest and Codex manifest, then returns a stream over the data.
+  var infoHashBytes: seq[byte]
+  try:
+    infoHashBytes = infoHashString.hexToSeqByte
+  except ValueError as exc:
+    # Report malformed hex as a failure instead of raising out of the future.
+    return failure("Invalid BitTorrent info hash: " & exc.msg)
+
+  without infoHash =? MultiHash.init("sha1", infoHashBytes).mapFailure, err:
+    return failure(err)
+
+  without infoHashCid =? Cid.init(CIDv1, InfoHashV1Codec, infoHash).mapFailure, error:
+    trace "Unable to create CID for BitTorrent info hash"
+    return failure(error)
+
+  without torrentManifest =? (await self.fetchTorrentManifest(infoHashCid)), err:
+    trace "Unable to fetch Torrent Manifest"
+    return failure(err)
+
+  without codexManifest =? (await self.fetchManifest(torrentManifest.codexManifestCid)),
+  err:
+    trace "Unable to fetch Codex Manifest for torrent info hash"
+    return failure(err)
+
+  await self.streamTorrent(torrentManifest, codexManifest)
+
 proc iterateManifests*(self: CodexNodeRef, onManifest: OnManifest) {.async.} =
   without cidsIter =? await self.networkStore.listBlocks(BlockType.Manifest):
     warn "Failed to listBlocks"