From cd8d2b130a5d5a6e2e502ad0c56a39fff113ab7d Mon Sep 17 00:00:00 2001
From: Marcin Czenko
Date: Sat, 1 Mar 2025 18:17:19 +0100
Subject: [PATCH] moves BitTorrent stuff around a bit to de-clutter node.nim

---
 codex/bittorrent/manifest.nim          |   5 +
 codex/bittorrent/manifest/decoding.nim |  78 +++++++++
 codex/bittorrent/manifest/encoding.nim |  47 ++++++
 codex/bittorrent/manifest/manifest.nim |  36 ++++
 codex/node.nim                         | 220 +++++--------------------
 5 files changed, 204 insertions(+), 182 deletions(-)
 create mode 100644 codex/bittorrent/manifest.nim
 create mode 100644 codex/bittorrent/manifest/decoding.nim
 create mode 100644 codex/bittorrent/manifest/encoding.nim
 create mode 100644 codex/bittorrent/manifest/manifest.nim

diff --git a/codex/bittorrent/manifest.nim b/codex/bittorrent/manifest.nim
new file mode 100644
index 00000000..ac8cdb58
--- /dev/null
+++ b/codex/bittorrent/manifest.nim
@@ -0,0 +1,5 @@
+import ./manifest/manifest
+import ./manifest/encoding
+import ./manifest/decoding
+
+export manifest, encoding, decoding
diff --git a/codex/bittorrent/manifest/decoding.nim b/codex/bittorrent/manifest/decoding.nim
new file mode 100644
index 00000000..be312a1e
--- /dev/null
+++ b/codex/bittorrent/manifest/decoding.nim
@@ -0,0 +1,78 @@
+import pkg/libp2p/cid
+import pkg/libp2p/protobuf/minprotobuf
+
+import pkg/questionable/results
+
+import ../../blocktype
+import ./manifest
+
+proc decode(_: type BitTorrentManifest, data: openArray[byte]): ?!BitTorrentManifest =
+  # ```protobuf
+  #   Message BitTorrentManifest {
+  #     Message Piece {
+  #       bytes data = 1;
+  #     }
+  #
+  #     Message BitTorrentInfo {
+  #       uint64 length = 1;
+  #       uint32 pieceLength = 2;
+  #       repeated Piece pieces = 3;
+  #       optional string name = 4;
+  #     }
+  #
+  #     BitTorrentInfo info = 1;
+  #     bytes codexManifestCid = 2;
+  # ```
+
+  var
+    pbNode = initProtoBuffer(data)
+    pbInfo: ProtoBuffer
+    length: uint64
+    pieceLength: uint32
+    pieces: seq[BitTorrentPiece]
+    piecesBytes: seq[seq[byte]]
+    name: string
+    cidBuf = newSeq[byte]()
+    codexManifestCid: Cid
+
+  if pbNode.getField(1, pbInfo).isErr:
+    return failure("Unable to decode `info` from BitTorrentManifest")
+
+  if pbInfo.getField(1, length).isErr:
+    return failure("Unable to decode `length` from BitTorrentInfo")
+
+  if pbInfo.getField(2, pieceLength).isErr:
+    return failure("Unable to decode `pieceLength` from BitTorrentInfo")
+
+  if ?pbInfo.getRepeatedField(3, piecesBytes).mapFailure:
+    for piece in piecesBytes:
+      var pbPiece = initProtoBuffer(piece)
+      var dataBuf = newSeq[byte]()
+      if pbPiece.getField(1, dataBuf).isErr:
+        return failure("Unable to decode `data` from BitTorrentPiece")
+      without mhash =? MultiHash.init("sha1", dataBuf).mapFailure, err:
+        return failure(err.msg)
+      pieces.add(mhash)
+    discard ?pbInfo.getField(4, name).mapFailure
+
+  if ?pbNode.getField(2, cidBuf).mapFailure:
+    without cid =? Cid.init(cidBuf).mapFailure, err:
+      return failure(err.msg)
+    codexManifestCid = cid
+
+  let info = BitTorrentInfo(
+    length: length,
+    pieceLength: pieceLength,
+    pieces: pieces,
+    name: if name.len > 0: name.some else: string.none,
+  )
+  BitTorrentManifest(info: info, codexManifestCid: codexManifestCid).success
+
+func decode*(_: type BitTorrentManifest, blk: Block): ?!BitTorrentManifest =
+  ## Decode a BitTorrentManifest from `blk.data`; returns a failure when
+  ## `blk.cid` does not pass the `isTorrentInfoHash` check.
+
+  if not ?blk.cid.isTorrentInfoHash:
+    return failure "Cid not a torrent info hash codec"
+
+  BitTorrentManifest.decode(blk.data)
diff --git a/codex/bittorrent/manifest/encoding.nim b/codex/bittorrent/manifest/encoding.nim
new file mode 100644
index 00000000..445aa6f5
--- /dev/null
+++ b/codex/bittorrent/manifest/encoding.nim
@@ -0,0 +1,47 @@
+import pkg/libp2p/cid
+import pkg/libp2p/protobuf/minprotobuf
+
+import pkg/questionable/results
+
+import ./manifest
+
+proc write(pb: var ProtoBuffer, field: int, value: BitTorrentPiece) =
+  var ipb = initProtoBuffer()
+  ipb.write(1, value.data.buffer)
+  ipb.finish()
+  pb.write(field, ipb)
+
+proc write(pb: var ProtoBuffer, field: int, value: BitTorrentInfo) =
+  var ipb = initProtoBuffer()
+  ipb.write(1, value.length)
+  ipb.write(2, value.pieceLength)
+  for piece in value.pieces:
+    ipb.write(3, piece)
+  if name =? value.name:
+    ipb.write(4, name)
+  ipb.finish()
+  pb.write(field, ipb)
+
+proc encode*(manifest: BitTorrentManifest): seq[byte] =
+  # ```protobuf
+  #   Message BitTorrentManifest {
+  #     Message Piece {
+  #       bytes data = 1;
+  #     }
+  #
+  #     Message BitTorrentInfo {
+  #       uint64 length = 1;
+  #       uint32 pieceLength = 2;
+  #       repeated Piece pieces = 3;
+  #       optional string name = 4;
+  #     }
+  #
+  #     BitTorrentInfo info = 1;
+  #     bytes codexManifestCid = 2;
+  # ```
+
+  var ipb = initProtoBuffer()
+  ipb.write(1, manifest.info)
+  ipb.write(2, manifest.codexManifestCid.data.buffer)
+  ipb.finish()
+  ipb.buffer
diff --git a/codex/bittorrent/manifest/manifest.nim b/codex/bittorrent/manifest/manifest.nim
new file mode 100644
index 00000000..54d0306c
--- /dev/null
+++ b/codex/bittorrent/manifest/manifest.nim
@@ -0,0 +1,36 @@
+
+import pkg/libp2p
+import pkg/questionable
+import pkg/questionable/results
+
+type
+  BitTorrentPiece* = MultiHash
+  BitTorrentInfo* = ref object
+    length*: uint64
+    pieceLength*: uint32
+    pieces*: seq[BitTorrentPiece]
+    name*: ?string
+
+  BitTorrentInfoHash* = MultiHash
+
+  BitTorrentManifest* = ref object
+    info*: BitTorrentInfo
+    codexManifestCid*: Cid
+
+proc newBitTorrentManifest*(
+    info: BitTorrentInfo, codexManifestCid: Cid
+): BitTorrentManifest =
+  BitTorrentManifest(info: info, codexManifestCid: codexManifestCid)
+
+func validate*(self: BitTorrentManifest, cid: Cid): ?!bool =
+  # First stage of validation:
+  # (1) bencode the info dictionary from the torrent manifest
+  # (2) hash the bencoded info dictionary
+  # (3) compare the hash with the info hash in the cid
+  #
+  # This will prove that our info metadata is correct.
+  # It still does not prove that the "codexManifestCid" from the torrent manifest
+  # points to genuine content. This validation will be done while fetching blocks
+  # where we will be able to detect that the aggregated pieces do not match
+  # the hashes in the info dictionary from the torrent manifest.
+  return success true
diff --git a/codex/node.nim b/codex/node.nim
index b98595c7..51485663 100644
--- a/codex/node.nim
+++ b/codex/node.nim
@@ -47,7 +47,9 @@ import ./logutils
 import ./utils/asynciter
 import ./utils/trackedfutures
 
+# bittorrent
 from ./codextypes import InfoHashV1Codec
+import ./bittorrent/manifest
 
 export logutils
 
@@ -96,130 +98,6 @@ func engine*(self: CodexNodeRef): BlockExcEngine =
 func discovery*(self: CodexNodeRef): Discovery =
   return self.discovery
 
-type
-  BitTorrentPiece* = MultiHash
-  BitTorrentInfo* = ref object
-    length: uint64
-    pieceLength: uint32
-    pieces: seq[BitTorrentPiece]
-    name: ?string
-
-  BitTorrentInfoHash* = MultiHash
-
-  BitTorrentManifest* = ref object
-    info: BitTorrentInfo
-    codexManifestCid: Cid
-
-# const InfoHashV1Codec* = multiCodec("torrent-info-hash-v1")
-
-proc newBitTorrentManifest(
-    info: BitTorrentInfo, codexManifestCid: Cid
-): BitTorrentManifest =
-  BitTorrentManifest(info: info, codexManifestCid: codexManifestCid)
-
-proc write*(pb: var ProtoBuffer, field: int, value: BitTorrentPiece) =
-  var ipb = initProtoBuffer()
-  ipb.write(1, value.data.buffer)
-  ipb.finish()
-  pb.write(field, ipb)
-
-proc write*(pb: var ProtoBuffer, field: int, value: BitTorrentInfo) =
-  var ipb = initProtoBuffer()
-  ipb.write(1, value.length)
-  ipb.write(2, value.pieceLength)
-  for piece in value.pieces:
-    ipb.write(3, piece)
-  if name =? value.name:
-    ipb.write(4, name)
-  ipb.finish()
-  pb.write(field, ipb)
-
-proc encode*(manifest: BitTorrentManifest): seq[byte] =
-  # ```protobuf
-  #   Message BitTorrentManifest {
-  #     Message Piece {
-  #       bytes data = 1;
-  #     }
-  #
-  #     Message BitTorrentInfo {
-  #       uint32 length = 1;
-  #       uint32 pieceLength = 2;
-  #       repeated Piece pieces = 3;
-  #       optional string name = 4;
-  #     }
-  #
-  #     BitTorrentInfo info = 1;
-  #     bytes codexManifestCid = 2;
-  # ```
-
-  var ipb = initProtoBuffer()
-  ipb.write(1, manifest.info)
-  ipb.write(2, manifest.codexManifestCid.data.buffer)
-  ipb.finish()
-  ipb.buffer
-
-proc decode*(_: type BitTorrentManifest, data: openArray[byte]): ?!BitTorrentManifest =
-  # ```protobuf
-  #   Message BitTorrentManifest {
-  #     Message Piece {
-  #       bytes data = 1;
-  #     }
-  #
-  #     Message BitTorrentInfo {
-  #       uint32 length = 1;
-  #       uint32 pieceLength = 2;
-  #       repeated Piece pieces = 3;
-  #       optional string name = 4;
-  #     }
-  #
-  #     BitTorrentInfo info = 1;
-  #     bytes codexManifestCid = 2;
-  # ```
-
-  var
-    pbNode = initProtoBuffer()
-    pbInfo: ProtoBuffer
-    length: uint64
-    pieceLength: uint32
-    pieces: seq[BitTorrentPiece]
-    piecesBytes: seq[seq[byte]]
-    name: string
-    cidBuf = newSeq[byte]()
-    codexManifestCid: Cid
-
-  if pbNode.getField(1, pbInfo).isErr:
-    return failure("Unable to decode `info` from BitTorrentManifest")
-
-  if pbInfo.getField(1, length).isErr:
-    return failure("Unable to decode `length` from BitTorrentInfo")
-
-  if pbInfo.getField(2, pieceLength).isErr:
-    return failure("Unable to decode `pieceLength` from BitTorrentInfo")
-
-  if ?pbInfo.getRepeatedField(3, piecesBytes).mapFailure:
-    for piece in piecesBytes:
-      var pbPiece = initProtoBuffer(piece)
-      var dataBuf = newSeq[byte]()
-      if pbPiece.getField(1, dataBuf).isErr:
-        return failure("Unable to decode `data` from BitTorrentPiece")
-      without mhash =? MultiHash.init("sha1", dataBuf).mapFailure, err:
-        return failure(err.msg)
-      pieces.add(mhash)
-    discard ?pbInfo.getField(4, name).mapFailure
-
-  if ?pbNode.getField(2, cidBuf).mapFailure:
-    without cid =? Cid.init(cidBuf).mapFailure, err:
-      return failure(err.msg)
-    codexManifestCid = cid
-
-  let info = BitTorrentInfo(
-    length: length,
-    pieceLength: pieceLength,
-    pieces: pieces,
-    name: if name.len > 0: name.some else: string.none,
-  )
-  BitTorrentManifest(info: info, codexManifestCid: codexManifestCid).success
-
 proc storeBitTorrentManifest*(
     self: CodexNodeRef, manifest: BitTorrentManifest, infoHash: BitTorrentInfoHash
 ): Future[?!bt.Block] {.async.} =
@@ -282,28 +160,6 @@ proc fetchManifest*(
 
   manifest.success
 
-func decode*(_: type BitTorrentManifest, blk: bt.Block): ?!BitTorrentManifest =
-  ## Decode a manifest using `decoder`
-  ##
-
-  if not ?blk.cid.isTorrentInfoHash:
-    return failure "Cid not a torrent info hash codec"
-
-  BitTorrentManifest.decode(blk.data)
-
-func validate*(self: BitTorrentManifest, cid: Cid): ?!bool =
-  # First stage of validation:
-  # (1) bencode the info dictionary from the torrent manifest
-  # (2) hash the bencoded info dictionary
-  # (3) compare the hash with the info hash in the cid
-  #
-  # This will prove that our info metadata is correct.
-  # It still does not proof that the "codexManifestCid" from the torrent manifest
-  # points to genuine content. This validation will be done while fetching blocks
-  # where we will be able to detect that the aggregated pieces do not match
-  # the hashes in the info dictionary from the torrent manifest.
-  return success true
-
 proc fetchTorrentManifest*(
     self: CodexNodeRef, cid: Cid
 ): Future[?!BitTorrentManifest] {.async.} =
@@ -532,6 +388,42 @@ proc retrieve*(
 
   await self.streamEntireDataset(manifest, cid)
 
+proc streamTorrent(
+    self: CodexNodeRef, torrentManifest: BitTorrentManifest, codexManifest: Manifest
+): Future[?!LPStream] {.async.} =
+  trace "Retrieving pieces from torrent"
+
+  # Fetch torrent pieces and validate that each complete piece matches hashes
+  # in the torrent manifest
+
+  # WIP...
+
+  let stream = LPStream(StoreStream.new(self.networkStore, codexManifest, pad = false))
+
+  stream.success
+
+proc retrieveInfoHash*(
+    self: CodexNodeRef, infoHashString: string
+): Future[?!LPStream] {.async.} =
+  without infoHash =? MultiHash.init("sha1", infoHashString.hexToSeqByte).mapFailure,
+    err:
+    return failure(err)
+
+  without infoHashCid =? Cid.init(CIDv1, InfoHashV1Codec, infoHash).mapFailure, error:
+    trace "Unable to create CID for BitTorrent info hash"
+    return failure(error)
+
+  without torrentManifest =? (await self.fetchTorrentManifest(infoHashCid)), err:
+    trace "Unable to fetch Torrent Manifest"
+    return failure(err)
+
+  without codexManifest =? (await self.fetchManifest(torrentManifest.codexManifestCid)),
+    err:
+    trace "Unable to fetch Codex Manifest for torrent info hash"
+    return failure(err)
+
+  await self.streamTorrent(torrentManifest, codexManifest)
+
 proc deleteSingleBlock(self: CodexNodeRef, cid: Cid): Future[?!void] {.async.} =
   if err =? (await self.networkStore.delBlock(cid)).errorOption:
     error "Error deleting block", cid, err = err.msg
@@ -698,42 +590,6 @@ proc storeBitTorrent*(
 
   success manifestBlk.cid
 
-proc streamTorrent(
-    self: CodexNodeRef, torrentManifest: BitTorrentManifest, codexManifest: Manifest
-): Future[?!LPStream] {.async.} =
-  trace "Retrieving pieces from torrent"
-
-  # Fetch torrent pieces and validate that each complete piece matches hashes
-  # in the torrent manifest
-
-  # WIP...
-
-  let stream = LPStream(StoreStream.new(self.networkStore, codexManifest, pad = false))
-
-  stream.success
-
-proc retrieveInfoHash*(
-    self: CodexNodeRef, infoHashString: string
-): Future[?!LPStream] {.async.} =
-  without infoHash =? MultiHash.init("sha1", infoHashString.hexToSeqByte).mapFailure,
-    err:
-    return failure(err)
-
-  without infoHashCid =? Cid.init(CIDv1, InfoHashV1Codec, infoHash).mapFailure, error:
-    trace "Unable to create CID for BitTorrent info hash"
-    return failure(error)
-
-  without torrentManifest =? (await self.fetchTorrentManifest(infoHashCid)), err:
-    trace "Unable to fetch Torrent Manifest"
-    return failure(err)
-
-  without codexManifest =? (await self.fetchManifest(torrentManifest.codexManifestCid)),
-    err:
-    trace "Unable to fetch Codex Manifest for torrent info hash"
-    return failure(err)
-
-  await self.streamTorrent(torrentManifest, codexManifest)
-
 proc iterateManifests*(self: CodexNodeRef, onManifest: OnManifest) {.async.} =
   without cidsIter =? await self.networkStore.listBlocks(BlockType.Manifest):
     warn "Failed to listBlocks"