moves BitTorrent stuff around a bit to de-clutter node.nim

This commit is contained in:
Marcin Czenko 2025-03-01 18:17:19 +01:00
parent 4981d5b943
commit cd8d2b130a
No known key found for this signature in database
GPG Key ID: 33DEA0C8E30937C0
5 changed files with 204 additions and 182 deletions

View File

@ -0,0 +1,5 @@
import ./manifest/manifest
import ./manifest/encoding
import ./manifest/decoding
export manifest, encoding, decoding

View File

@ -0,0 +1,78 @@
import pkg/libp2p/cid
import pkg/libp2p/protobuf/minprotobuf
import pkg/questionable/results
import ../../blocktype
import ./manifest
proc decode(_: type BitTorrentManifest, data: openArray[byte]): ?!BitTorrentManifest =
# ```protobuf
# Message BitTorrentManifest {
# Message Piece {
# bytes data = 1;
# }
#
# Message BitTorrentInfo {
# uint32 length = 1;
# uint32 pieceLength = 2;
# repeated Piece pieces = 3;
# optional string name = 4;
# }
#
# BitTorrentInfo info = 1;
# bytes codexManifestCid = 2;
# ```
var
pbNode = initProtoBuffer()
pbInfo: ProtoBuffer
length: uint64
pieceLength: uint32
pieces: seq[BitTorrentPiece]
piecesBytes: seq[seq[byte]]
name: string
cidBuf = newSeq[byte]()
codexManifestCid: Cid
if pbNode.getField(1, pbInfo).isErr:
return failure("Unable to decode `info` from BitTorrentManifest")
if pbInfo.getField(1, length).isErr:
return failure("Unable to decode `length` from BitTorrentInfo")
if pbInfo.getField(2, pieceLength).isErr:
return failure("Unable to decode `pieceLength` from BitTorrentInfo")
if ?pbInfo.getRepeatedField(3, piecesBytes).mapFailure:
for piece in piecesBytes:
var pbPiece = initProtoBuffer(piece)
var dataBuf = newSeq[byte]()
if pbPiece.getField(1, dataBuf).isErr:
return failure("Unable to decode `data` from BitTorrentPiece")
without mhash =? MultiHash.init("sha1", dataBuf).mapFailure, err:
return failure(err.msg)
pieces.add(mhash)
discard ?pbInfo.getField(4, name).mapFailure
if ?pbNode.getField(2, cidBuf).mapFailure:
without cid =? Cid.init(cidBuf).mapFailure, err:
return failure(err.msg)
codexManifestCid = cid
let info = BitTorrentInfo(
length: length,
pieceLength: pieceLength,
pieces: pieces,
name: if name.len > 0: name.some else: string.none,
)
BitTorrentManifest(info: info, codexManifestCid: codexManifestCid).success
func decode*(_: type BitTorrentManifest, blk: Block): ?!BitTorrentManifest =
## Decode a manifest using `decoder`
##
if not ?blk.cid.isTorrentInfoHash:
return failure "Cid not a torrent info hash codec"
BitTorrentManifest.decode(blk.data)

View File

@ -0,0 +1,47 @@
import pkg/libp2p/cid
import pkg/libp2p/protobuf/minprotobuf
import pkg/questionable/results
import ./manifest
proc write(pb: var ProtoBuffer, field: int, value: BitTorrentPiece) =
var ipb = initProtoBuffer()
ipb.write(1, value.data.buffer)
ipb.finish()
pb.write(field, ipb)
proc write(pb: var ProtoBuffer, field: int, value: BitTorrentInfo) =
var ipb = initProtoBuffer()
ipb.write(1, value.length)
ipb.write(2, value.pieceLength)
for piece in value.pieces:
ipb.write(3, piece)
if name =? value.name:
ipb.write(4, name)
ipb.finish()
pb.write(field, ipb)
proc encode*(manifest: BitTorrentManifest): seq[byte] =
# ```protobuf
# Message BitTorrentManifest {
# Message Piece {
# bytes data = 1;
# }
#
# Message BitTorrentInfo {
# uint32 length = 1;
# uint32 pieceLength = 2;
# repeated Piece pieces = 3;
# optional string name = 4;
# }
#
# BitTorrentInfo info = 1;
# bytes codexManifestCid = 2;
# ```
var ipb = initProtoBuffer()
ipb.write(1, manifest.info)
ipb.write(2, manifest.codexManifestCid.data.buffer)
ipb.finish()
ipb.buffer

View File

@ -0,0 +1,36 @@
import pkg/libp2p
import pkg/questionable
import pkg/questionable/results
type
BitTorrentPiece* = MultiHash
BitTorrentInfo* = ref object
length*: uint64
pieceLength*: uint32
pieces*: seq[BitTorrentPiece]
name*: ?string
BitTorrentInfoHash* = MultiHash
BitTorrentManifest* = ref object
info*: BitTorrentInfo
codexManifestCid*: Cid
proc newBitTorrentManifest*(
info: BitTorrentInfo, codexManifestCid: Cid
): BitTorrentManifest =
BitTorrentManifest(info: info, codexManifestCid: codexManifestCid)
func validate*(self: BitTorrentManifest, cid: Cid): ?!bool =
# First stage of validation:
# (1) bencode the info dictionary from the torrent manifest
# (2) hash the bencoded info dictionary
# (3) compare the hash with the info hash in the cid
#
# This will prove that our info metadata is correct.
# It still does not proof that the "codexManifestCid" from the torrent manifest
# points to genuine content. This validation will be done while fetching blocks
# where we will be able to detect that the aggregated pieces do not match
# the hashes in the info dictionary from the torrent manifest.
return success true

View File

@ -47,7 +47,9 @@ import ./logutils
import ./utils/asynciter
import ./utils/trackedfutures
# bittorrent
from ./codextypes import InfoHashV1Codec
import ./bittorrent/manifest
export logutils
@ -96,130 +98,6 @@ func engine*(self: CodexNodeRef): BlockExcEngine =
func discovery*(self: CodexNodeRef): Discovery =
return self.discovery
type
BitTorrentPiece* = MultiHash
BitTorrentInfo* = ref object
length: uint64
pieceLength: uint32
pieces: seq[BitTorrentPiece]
name: ?string
BitTorrentInfoHash* = MultiHash
BitTorrentManifest* = ref object
info: BitTorrentInfo
codexManifestCid: Cid
# const InfoHashV1Codec* = multiCodec("torrent-info-hash-v1")
proc newBitTorrentManifest(
info: BitTorrentInfo, codexManifestCid: Cid
): BitTorrentManifest =
BitTorrentManifest(info: info, codexManifestCid: codexManifestCid)
proc write*(pb: var ProtoBuffer, field: int, value: BitTorrentPiece) =
var ipb = initProtoBuffer()
ipb.write(1, value.data.buffer)
ipb.finish()
pb.write(field, ipb)
proc write*(pb: var ProtoBuffer, field: int, value: BitTorrentInfo) =
var ipb = initProtoBuffer()
ipb.write(1, value.length)
ipb.write(2, value.pieceLength)
for piece in value.pieces:
ipb.write(3, piece)
if name =? value.name:
ipb.write(4, name)
ipb.finish()
pb.write(field, ipb)
proc encode*(manifest: BitTorrentManifest): seq[byte] =
# ```protobuf
# Message BitTorrentManifest {
# Message Piece {
# bytes data = 1;
# }
#
# Message BitTorrentInfo {
# uint32 length = 1;
# uint32 pieceLength = 2;
# repeated Piece pieces = 3;
# optional string name = 4;
# }
#
# BitTorrentInfo info = 1;
# bytes codexManifestCid = 2;
# ```
var ipb = initProtoBuffer()
ipb.write(1, manifest.info)
ipb.write(2, manifest.codexManifestCid.data.buffer)
ipb.finish()
ipb.buffer
proc decode*(_: type BitTorrentManifest, data: openArray[byte]): ?!BitTorrentManifest =
# ```protobuf
# Message BitTorrentManifest {
# Message Piece {
# bytes data = 1;
# }
#
# Message BitTorrentInfo {
# uint32 length = 1;
# uint32 pieceLength = 2;
# repeated Piece pieces = 3;
# optional string name = 4;
# }
#
# BitTorrentInfo info = 1;
# bytes codexManifestCid = 2;
# ```
var
pbNode = initProtoBuffer()
pbInfo: ProtoBuffer
length: uint64
pieceLength: uint32
pieces: seq[BitTorrentPiece]
piecesBytes: seq[seq[byte]]
name: string
cidBuf = newSeq[byte]()
codexManifestCid: Cid
if pbNode.getField(1, pbInfo).isErr:
return failure("Unable to decode `info` from BitTorrentManifest")
if pbInfo.getField(1, length).isErr:
return failure("Unable to decode `length` from BitTorrentInfo")
if pbInfo.getField(2, pieceLength).isErr:
return failure("Unable to decode `pieceLength` from BitTorrentInfo")
if ?pbInfo.getRepeatedField(3, piecesBytes).mapFailure:
for piece in piecesBytes:
var pbPiece = initProtoBuffer(piece)
var dataBuf = newSeq[byte]()
if pbPiece.getField(1, dataBuf).isErr:
return failure("Unable to decode `data` from BitTorrentPiece")
without mhash =? MultiHash.init("sha1", dataBuf).mapFailure, err:
return failure(err.msg)
pieces.add(mhash)
discard ?pbInfo.getField(4, name).mapFailure
if ?pbNode.getField(2, cidBuf).mapFailure:
without cid =? Cid.init(cidBuf).mapFailure, err:
return failure(err.msg)
codexManifestCid = cid
let info = BitTorrentInfo(
length: length,
pieceLength: pieceLength,
pieces: pieces,
name: if name.len > 0: name.some else: string.none,
)
BitTorrentManifest(info: info, codexManifestCid: codexManifestCid).success
proc storeBitTorrentManifest*(
self: CodexNodeRef, manifest: BitTorrentManifest, infoHash: BitTorrentInfoHash
): Future[?!bt.Block] {.async.} =
@ -282,28 +160,6 @@ proc fetchManifest*(
manifest.success
func decode*(_: type BitTorrentManifest, blk: bt.Block): ?!BitTorrentManifest =
## Decode a manifest using `decoder`
##
if not ?blk.cid.isTorrentInfoHash:
return failure "Cid not a torrent info hash codec"
BitTorrentManifest.decode(blk.data)
func validate*(self: BitTorrentManifest, cid: Cid): ?!bool =
# First stage of validation:
# (1) bencode the info dictionary from the torrent manifest
# (2) hash the bencoded info dictionary
# (3) compare the hash with the info hash in the cid
#
# This will prove that our info metadata is correct.
# It still does not proof that the "codexManifestCid" from the torrent manifest
# points to genuine content. This validation will be done while fetching blocks
# where we will be able to detect that the aggregated pieces do not match
# the hashes in the info dictionary from the torrent manifest.
return success true
proc fetchTorrentManifest*(
self: CodexNodeRef, cid: Cid
): Future[?!BitTorrentManifest] {.async.} =
@ -532,6 +388,42 @@ proc retrieve*(
await self.streamEntireDataset(manifest, cid)
proc streamTorrent(
self: CodexNodeRef, torrentManifest: BitTorrentManifest, codexManifest: Manifest
): Future[?!LPStream] {.async.} =
trace "Retrieving pieces from torrent"
# Fetch torrent pieces and validate that each complete piece matches hashes
# in the torrent manifest
# WIP...
let stream = LPStream(StoreStream.new(self.networkStore, codexManifest, pad = false))
stream.success
proc retrieveInfoHash*(
self: CodexNodeRef, infoHashString: string
): Future[?!LPStream] {.async.} =
without infoHash =? MultiHash.init("sha1", infoHashString.hexToSeqByte).mapFailure,
err:
return failure(err)
without infoHashCid =? Cid.init(CIDv1, InfoHashV1Codec, infoHash).mapFailure, error:
trace "Unable to create CID for BitTorrent info hash"
return failure(error)
without torrentManifest =? (await self.fetchTorrentManifest(infoHashCid)), err:
trace "Unable to fetch Torrent Manifest"
return failure(err)
without codexManifest =? (await self.fetchManifest(torrentManifest.codexManifestCid)),
err:
trace "Unable to fetch Codex Manifest for torrent info hash"
return failure(err)
await self.streamTorrent(torrentManifest, codexManifest)
proc deleteSingleBlock(self: CodexNodeRef, cid: Cid): Future[?!void] {.async.} =
if err =? (await self.networkStore.delBlock(cid)).errorOption:
error "Error deleting block", cid, err = err.msg
@ -698,42 +590,6 @@ proc storeBitTorrent*(
success manifestBlk.cid
proc streamTorrent(
self: CodexNodeRef, torrentManifest: BitTorrentManifest, codexManifest: Manifest
): Future[?!LPStream] {.async.} =
trace "Retrieving pieces from torrent"
# Fetch torrent pieces and validate that each complete piece matches hashes
# in the torrent manifest
# WIP...
let stream = LPStream(StoreStream.new(self.networkStore, codexManifest, pad = false))
stream.success
proc retrieveInfoHash*(
self: CodexNodeRef, infoHashString: string
): Future[?!LPStream] {.async.} =
without infoHash =? MultiHash.init("sha1", infoHashString.hexToSeqByte).mapFailure,
err:
return failure(err)
without infoHashCid =? Cid.init(CIDv1, InfoHashV1Codec, infoHash).mapFailure, error:
trace "Unable to create CID for BitTorrent info hash"
return failure(error)
without torrentManifest =? (await self.fetchTorrentManifest(infoHashCid)), err:
trace "Unable to fetch Torrent Manifest"
return failure(err)
without codexManifest =? (await self.fetchManifest(torrentManifest.codexManifestCid)),
err:
trace "Unable to fetch Codex Manifest for torrent info hash"
return failure(err)
await self.streamTorrent(torrentManifest, codexManifest)
proc iterateManifests*(self: CodexNodeRef, onManifest: OnManifest) {.async.} =
without cidsIter =? await self.networkStore.listBlocks(BlockType.Manifest):
warn "Failed to listBlocks"