mirror of
https://github.com/logos-storage/logos-storage-nim.git
synced 2026-02-27 00:43:34 +00:00
general idea of BitTorrent integration
This commit is contained in:
parent
d85b146602
commit
494765ba74
258
codex/node.nim
258
codex/node.nim
@ -47,6 +47,8 @@ import ./logutils
|
||||
import ./utils/asynciter
|
||||
import ./utils/trackedfutures
|
||||
|
||||
from ./codextypes import InfoHashV1Codec
|
||||
|
||||
export logutils
|
||||
|
||||
logScope:
|
||||
@ -94,6 +96,150 @@ func engine*(self: CodexNodeRef): BlockExcEngine =
|
||||
func discovery*(self: CodexNodeRef): Discovery =
|
||||
return self.discovery
|
||||
|
||||
type
|
||||
BitTorrentPiece* = MultiHash
|
||||
BitTorrentInfo* = ref object
|
||||
length: uint64
|
||||
pieceLength: uint32
|
||||
pieces: seq[BitTorrentPiece]
|
||||
name: ?string
|
||||
|
||||
BitTorrentInfoHash* = MultiHash
|
||||
|
||||
BitTorrentManifest* = ref object
|
||||
info: BitTorrentInfo
|
||||
codexManifestCid: Cid
|
||||
|
||||
# const InfoHashV1Codec* = multiCodec("torrent-info-hash-v1")
|
||||
|
||||
proc newBitTorrentManifest(
|
||||
info: BitTorrentInfo, codexManifestCid: Cid
|
||||
): BitTorrentManifest =
|
||||
BitTorrentManifest(info: info, codexManifestCid: codexManifestCid)
|
||||
|
||||
proc write*(pb: var ProtoBuffer, field: int, value: BitTorrentPiece) =
|
||||
var ipb = initProtoBuffer()
|
||||
ipb.write(1, value.data.buffer)
|
||||
ipb.finish()
|
||||
pb.write(field, ipb)
|
||||
|
||||
proc write*(pb: var ProtoBuffer, field: int, value: BitTorrentInfo) =
|
||||
var ipb = initProtoBuffer()
|
||||
ipb.write(1, value.length)
|
||||
ipb.write(2, value.pieceLength)
|
||||
for piece in value.pieces:
|
||||
ipb.write(3, piece)
|
||||
if name =? value.name:
|
||||
ipb.write(4, name)
|
||||
ipb.finish()
|
||||
pb.write(field, ipb)
|
||||
|
||||
proc encode*(manifest: BitTorrentManifest): seq[byte] =
|
||||
# ```protobuf
|
||||
# Message BitTorrentManifest {
|
||||
# Message Piece {
|
||||
# bytes data = 1;
|
||||
# }
|
||||
#
|
||||
# Message BitTorrentInfo {
|
||||
# uint32 length = 1;
|
||||
# uint32 pieceLength = 2;
|
||||
# repeated Piece pieces = 3;
|
||||
# optional string name = 4;
|
||||
# }
|
||||
#
|
||||
# BitTorrentInfo info = 1;
|
||||
# bytes codexManifestCid = 2;
|
||||
# ```
|
||||
|
||||
var ipb = initProtoBuffer()
|
||||
ipb.write(1, manifest.info)
|
||||
ipb.write(2, manifest.codexManifestCid.data.buffer)
|
||||
ipb.finish()
|
||||
ipb.buffer
|
||||
|
||||
proc decode*(_: type BitTorrentManifest, data: openArray[byte]): ?!BitTorrentManifest =
|
||||
# ```protobuf
|
||||
# Message BitTorrentManifest {
|
||||
# Message Piece {
|
||||
# bytes data = 1;
|
||||
# }
|
||||
#
|
||||
# Message BitTorrentInfo {
|
||||
# uint32 length = 1;
|
||||
# uint32 pieceLength = 2;
|
||||
# repeated Piece pieces = 3;
|
||||
# optional string name = 4;
|
||||
# }
|
||||
#
|
||||
# BitTorrentInfo info = 1;
|
||||
# bytes codexManifestCid = 2;
|
||||
# ```
|
||||
|
||||
var
|
||||
pbNode = initProtoBuffer()
|
||||
pbInfo: ProtoBuffer
|
||||
length: uint64
|
||||
pieceLength: uint32
|
||||
pieces: seq[BitTorrentPiece]
|
||||
piecesBytes: seq[seq[byte]]
|
||||
name: string
|
||||
cidBuf = newSeq[byte]()
|
||||
codexManifestCid: Cid
|
||||
|
||||
if pbNode.getField(1, pbInfo).isErr:
|
||||
return failure("Unable to decode `info` from BitTorrentManifest")
|
||||
|
||||
if pbInfo.getField(1, length).isErr:
|
||||
return failure("Unable to decode `length` from BitTorrentInfo")
|
||||
|
||||
if pbInfo.getField(2, pieceLength).isErr:
|
||||
return failure("Unable to decode `pieceLength` from BitTorrentInfo")
|
||||
|
||||
if ?pbInfo.getRepeatedField(3, piecesBytes).mapFailure:
|
||||
for piece in piecesBytes:
|
||||
var pbPiece = initProtoBuffer(piece)
|
||||
var dataBuf = newSeq[byte]()
|
||||
if pbPiece.getField(1, dataBuf).isErr:
|
||||
return failure("Unable to decode `data` from BitTorrentPiece")
|
||||
without mhash =? MultiHash.init("sha1", dataBuf).mapFailure, err:
|
||||
return failure(err.msg)
|
||||
pieces.add(mhash)
|
||||
discard ?pbInfo.getField(4, name).mapFailure
|
||||
|
||||
if ?pbNode.getField(2, cidBuf).mapFailure:
|
||||
without cid =? Cid.init(cidBuf).mapFailure, err:
|
||||
return failure(err.msg)
|
||||
codexManifestCid = cid
|
||||
|
||||
let info = BitTorrentInfo(
|
||||
length: length,
|
||||
pieceLength: pieceLength,
|
||||
pieces: pieces,
|
||||
name: if name.len > 0: name.some else: string.none,
|
||||
)
|
||||
BitTorrentManifest(info: info, codexManifestCid: codexManifestCid).success
|
||||
|
||||
proc storeBitTorrentManifest*(
|
||||
self: CodexNodeRef, manifest: BitTorrentManifest, infoHash: BitTorrentInfoHash
|
||||
): Future[?!bt.Block] {.async.} =
|
||||
let encodedManifest = manifest.encode()
|
||||
|
||||
without infoHashCid =? Cid.init(CIDv1, InfoHashV1Codec, infoHash).mapFailure, error:
|
||||
trace "Unable to create CID for BitTorrent info hash"
|
||||
return failure(error)
|
||||
|
||||
without blk =? bt.Block.new(data = encodedManifest, cid = infoHashCid, verify = false),
|
||||
error:
|
||||
trace "Unable to create block from manifest"
|
||||
return failure(error)
|
||||
|
||||
if err =? (await self.networkStore.putBlock(blk)).errorOption:
|
||||
trace "Unable to store BitTorrent manifest block", cid = blk.cid, err = err.msg
|
||||
return failure(err)
|
||||
|
||||
success blk
|
||||
|
||||
proc storeManifest*(
|
||||
self: CodexNodeRef, manifest: Manifest
|
||||
): Future[?!bt.Block] {.async.} =
|
||||
@ -134,7 +280,55 @@ proc fetchManifest*(
|
||||
|
||||
trace "Decoded manifest", cid
|
||||
|
||||
return manifest.success
|
||||
manifest.success
|
||||
|
||||
func decode*(_: type BitTorrentManifest, blk: bt.Block): ?!BitTorrentManifest =
|
||||
## Decode a manifest using `decoder`
|
||||
##
|
||||
|
||||
if not ?blk.cid.isTorrentInfoHash:
|
||||
return failure "Cid not a torrent info hash codec"
|
||||
|
||||
BitTorrentManifest.decode(blk.data)
|
||||
|
||||
func validate*(self: BitTorrentManifest, cid: Cid): ?!bool =
|
||||
# First stage of validation:
|
||||
# (1) bencode the info dictionary from the torrent manifest
|
||||
# (2) hash the bencoded info dictionary
|
||||
# (3) compare the hash with the info hash in the cid
|
||||
#
|
||||
# This will prove that our info metadata is correct.
|
||||
# It still does not proof that the "codexManifestCid" from the torrent manifest
|
||||
# points to genuine content. This validation will be done while fetching blocks
|
||||
# where we will be able to detect that the aggregated pieces do not match
|
||||
# the hashes in the info dictionary from the torrent manifest.
|
||||
return success true
|
||||
|
||||
proc fetchTorrentManifest*(
|
||||
self: CodexNodeRef, cid: Cid
|
||||
): Future[?!BitTorrentManifest] {.async.} =
|
||||
if err =? cid.isTorrentInfoHash.errorOption:
|
||||
return failure "CID has invalid content type for torrent info hash {$cid}"
|
||||
|
||||
trace "Retrieving torrent manifest for cid", cid
|
||||
|
||||
without blk =? await self.networkStore.getBlock(BlockAddress.init(cid)), err:
|
||||
trace "Error retrieve manifest block", cid, err = err.msg
|
||||
return failure err
|
||||
|
||||
trace "Decoding torrent manifest for cid", cid
|
||||
|
||||
without torrentManifest =? BitTorrentManifest.decode(blk), err:
|
||||
trace "Unable to decode torrent manifest", err = err.msg
|
||||
return failure("Unable to decode torrent manifest")
|
||||
|
||||
trace "Decoded torrent manifest", cid
|
||||
|
||||
if err =? torrentManifest.validate(cid).errorOption:
|
||||
trace "Torrent manifest does not match torrent info hash", cid
|
||||
return failure "Torrent manifest does not match torrent info hash {$cid}"
|
||||
|
||||
return torrentManifest.success
|
||||
|
||||
proc findPeer*(self: CodexNodeRef, peerId: PeerId): Future[?PeerRecord] {.async.} =
|
||||
## Find peer using the discovery service from the given CodexNode
|
||||
@ -478,6 +672,68 @@ proc store*(
|
||||
|
||||
return manifestBlk.cid.success
|
||||
|
||||
proc storeBitTorrent*(
|
||||
self: CodexNodeRef,
|
||||
stream: LPStream,
|
||||
info: BitTorrentInfo,
|
||||
infoHash: BitTorrentInfoHash,
|
||||
mimetype: ?string = string.none,
|
||||
): Future[?!Cid] {.async.} =
|
||||
info "Storing BitTorrent data"
|
||||
|
||||
without codexManifestCid =?
|
||||
await self.store(stream, filename = info.name, mimetype = mimetype):
|
||||
return failure("Unable to store BitTorrent data")
|
||||
|
||||
let bitTorrentManifest = newBitTorrentManifest(info, codexManifestCid)
|
||||
|
||||
without manifestBlk =? await self.storeBitTorrentManifest(
|
||||
bitTorrentManifest, infoHash
|
||||
), err:
|
||||
error "Unable to store manifest"
|
||||
return failure(err)
|
||||
|
||||
info "Stored BitTorrent data",
|
||||
manifestCid = manifestBlk.cid, codeManifestCid = codexManifestCid
|
||||
|
||||
success manifestBlk.cid
|
||||
|
||||
proc streamTorrent(
|
||||
self: CodexNodeRef, torrentManifest: BitTorrentManifest, codexManifest: Manifest
|
||||
): Future[?!LPStream] {.async.} =
|
||||
trace "Retrieving pieces from torrent"
|
||||
|
||||
# Fetch torrent pieces and validate that each complete piece matches hashes
|
||||
# in the torrent manifest
|
||||
|
||||
# WIP...
|
||||
|
||||
let stream = LPStream(StoreStream.new(self.networkStore, codexManifest, pad = false))
|
||||
|
||||
stream.success
|
||||
|
||||
proc retrieveInfoHash*(
|
||||
self: CodexNodeRef, infoHashString: string
|
||||
): Future[?!LPStream] {.async.} =
|
||||
without infoHash =? MultiHash.init("sha1", infoHashString.hexToSeqByte).mapFailure,
|
||||
err:
|
||||
return failure(err)
|
||||
|
||||
without infoHashCid =? Cid.init(CIDv1, InfoHashV1Codec, infoHash).mapFailure, error:
|
||||
trace "Unable to create CID for BitTorrent info hash"
|
||||
return failure(error)
|
||||
|
||||
without torrentManifest =? (await self.fetchTorrentManifest(infoHashCid)), err:
|
||||
trace "Unable to fetch Torrent Manifest"
|
||||
return failure(err)
|
||||
|
||||
without codexManifest =? (await self.fetchManifest(torrentManifest.codexManifestCid)),
|
||||
err:
|
||||
trace "Unable to fetch Codex Manifest for torrent info hash"
|
||||
return failure(err)
|
||||
|
||||
await self.streamTorrent(torrentManifest, codexManifest)
|
||||
|
||||
proc iterateManifests*(self: CodexNodeRef, onManifest: OnManifest) {.async.} =
|
||||
without cidsIter =? await self.networkStore.listBlocks(BlockType.Manifest):
|
||||
warn "Failed to listBlocks"
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user