makes torrent API ready for torrents v2, closes torrent streaming loop

This commit is contained in:
Marcin Czenko 2025-03-05 03:22:56 +01:00
parent 6c5816581a
commit 2ab59f616d
No known key found for this signature in database
GPG Key ID: 33DEA0C8E30937C0
5 changed files with 144 additions and 18 deletions

View File

@ -1,7 +1,10 @@
import pkg/libp2p import pkg/libp2p
import pkg/stew/byteutils
import pkg/questionable import pkg/questionable
import pkg/questionable/results import pkg/questionable/results
import ../../merkletree/codex/codex
import ../../errors import ../../errors
import ../../codextypes import ../../codextypes
import ../bencoding import ../bencoding
@ -14,8 +17,6 @@ type
pieces*: seq[BitTorrentPiece] pieces*: seq[BitTorrentPiece]
name*: ?string name*: ?string
BitTorrentInfoHash* = MultiHash
BitTorrentManifest* = ref object BitTorrentManifest* = ref object
info*: BitTorrentInfo info*: BitTorrentInfo
codexManifestCid*: Cid codexManifestCid*: Cid
@ -55,3 +56,22 @@ func validate*(self: BitTorrentManifest, cid: Cid): ?!bool =
without cidInfoHash =? cid.mhash.mapFailure, err: without cidInfoHash =? cid.mhash.mapFailure, err:
return failure(err.msg) return failure(err.msg)
return success(infoHash == cidInfoHash) return success(infoHash == cidInfoHash)
func buildMultiHash*(_: type BitTorrentInfo, input: string): ?!MultiHash =
without bytes =? input.hexToSeqByte.catch, err:
return failure err.msg
without hash =? MultiHash.init(bytes):
without mhashMetaSha1 =? Sha1HashCodec.mhash, err:
return failure err.msg
if bytes.len == mhashMetaSha1.size:
without hash =? MultiHash.init($Sha1HashCodec, bytes).mapFailure, err:
return failure err.msg
return success hash
without mhashMetaSha256 =? Sha256HashCodec.mhash, err:
return failure err.msg
if bytes.len == mhashMetaSha256.size:
without hash =? MultiHash.init($Sha256HashCodec, bytes).mapFailure, err:
return failure err.msg
return success hash
return failure "given bytes is not a correct multihash"
return success hash

View File

@ -101,7 +101,7 @@ func discovery*(self: CodexNodeRef): Discovery =
return self.discovery return self.discovery
proc storeBitTorrentManifest*( proc storeBitTorrentManifest*(
self: CodexNodeRef, manifest: BitTorrentManifest, infoHash: BitTorrentInfoHash self: CodexNodeRef, manifest: BitTorrentManifest, infoHash: MultiHash
): Future[?!bt.Block] {.async.} = ): Future[?!bt.Block] {.async.} =
let encodedManifest = manifest.encode() let encodedManifest = manifest.encode()
@ -490,13 +490,9 @@ proc streamTorrent(
trace "Creating store stream for torrent manifest" trace "Creating store stream for torrent manifest"
stream.success stream.success
proc retrieveInfoHash*( proc retrieveTorrent*(
self: CodexNodeRef, infoHashString: string self: CodexNodeRef, infoHash: MultiHash
): Future[?!LPStream] {.async.} = ): Future[?!LPStream] {.async.} =
without infoHash =? MultiHash.init("sha1", infoHashString.hexToSeqByte).mapFailure,
err:
return failure(err)
without infoHashCid =? Cid.init(CIDv1, InfoHashV1Codec, infoHash).mapFailure, error: without infoHashCid =? Cid.init(CIDv1, InfoHashV1Codec, infoHash).mapFailure, error:
trace "Unable to create CID for BitTorrent info hash" trace "Unable to create CID for BitTorrent info hash"
return failure(error) return failure(error)
@ -652,11 +648,11 @@ proc store*(
return manifestBlk.cid.success return manifestBlk.cid.success
proc storeBitTorrent*( proc storeTorrent*(
self: CodexNodeRef, self: CodexNodeRef,
stream: LPStream, stream: LPStream,
info: BitTorrentInfo, info: BitTorrentInfo,
infoHash: BitTorrentInfoHash, infoHash: MultiHash,
mimetype: ?string = string.none, mimetype: ?string = string.none,
): Future[?!Cid] {.async.} = ): Future[?!Cid] {.async.} =
info "Storing BitTorrent data" info "Storing BitTorrent data"

View File

@ -152,6 +152,80 @@ proc retrieveCid(
if not lpStream.isNil: if not lpStream.isNil:
await lpStream.close() await lpStream.close()
proc retrieveInfoHash(
node: CodexNodeRef, infoHash: MultiHash, resp: HttpResponseRef
): Future[RestApiResponse] {.async.} =
## Download torrent from the node in a streaming
## manner
##
var stream: LPStream
var bytes = 0
try:
without stream =? (await node.retrieveTorrent(infoHash)), error:
if error of BlockNotFoundError:
resp.status = Http404
return await resp.sendBody("")
else:
resp.status = Http500
return await resp.sendBody(error.msg)
# It is ok to fetch again the manifest because it will hit the cache
without infoHashCid =? Cid.init(CIDv1, InfoHashV1Codec, infoHash).mapFailure, err:
error "Unable to create CID for BitTorrent info hash", err = err.msg
resp.status = Http404
return await resp.sendBody(err.msg)
without torrentManifest =? (await node.fetchTorrentManifest(infoHashCid)), err:
error "Unable to fetch Torrent Manifest", err = err.msg
resp.status = Http404
return await resp.sendBody(err.msg)
without codexManifest =? (
await node.fetchManifest(torrentManifest.codexManifestCid)
), err:
error "Unable to fetch Codex Manifest for torrent info hash", err = err.msg
resp.status = Http404
return await resp.sendBody(err.msg)
if codexManifest.mimetype.isSome:
resp.setHeader("Content-Type", codexManifest.mimetype.get())
else:
resp.addHeader("Content-Type", "application/octet-stream")
if codexManifest.filename.isSome:
resp.setHeader(
"Content-Disposition",
"attachment; filename=\"" & codexManifest.filename.get() & "\"",
)
else:
resp.setHeader("Content-Disposition", "attachment")
await resp.prepareChunked()
while not stream.atEof:
var
buff = newSeqUninitialized[byte](int(NBytes 1024 * 16))
len = await stream.readOnce(addr buff[0], buff.len)
buff.setLen(len)
if buff.len <= 0:
break
bytes += buff.len
await resp.sendChunk(addr buff[0], buff.len)
await resp.finish()
codex_api_downloads.inc()
except CatchableError as exc:
warn "Error streaming blocks", exc = exc.msg
resp.status = Http500
return await resp.sendBody("")
finally:
info "Sent bytes for torrent", infoHash = $infoHash, bytes
if not stream.isNil:
await stream.close()
proc buildCorsHeaders( proc buildCorsHeaders(
httpMethod: string, allowedOrigin: Option[string] httpMethod: string, allowedOrigin: Option[string]
): seq[(string, string)] = ): seq[(string, string)] =
@ -356,13 +430,20 @@ proc initDataApi(node: CodexNodeRef, repoStore: RepoStore, router: var RestRoute
without infoHash =? infoHash.mapFailure, error: without infoHash =? infoHash.mapFailure, error:
return RestApiResponse.error(Http400, error.msg, headers = headers) return RestApiResponse.error(Http400, error.msg, headers = headers)
if infoHash.mcodec != Sha1HashCodec:
return RestApiResponse.error(
Http400, "Only torrents version 1 are currently supported!", headers = headers
)
if corsOrigin =? allowedOrigin: if corsOrigin =? allowedOrigin:
resp.setCorsHeaders("GET", corsOrigin) resp.setCorsHeaders("GET", corsOrigin)
resp.setHeader("Access-Control-Headers", "X-Requested-With") resp.setHeader("Access-Control-Headers", "X-Requested-With")
trace "torrent requested: ", multihash = $infoHash trace "torrent requested: ", multihash = $infoHash
return RestApiResponse.response(Http200) await node.retrieveInfoHash(infoHash, resp = resp)
# return RestApiResponse.response(Http200)
router.api(MethodGet, "/api/codex/v1/data/{cid}/network/manifest") do( router.api(MethodGet, "/api/codex/v1/data/{cid}/network/manifest") do(
cid: Cid, resp: HttpResponseRef cid: Cid, resp: HttpResponseRef

View File

@ -22,6 +22,7 @@ import ../purchasing
import ../utils/stintutils import ../utils/stintutils
from ../codextypes import Sha1HashCodec from ../codextypes import Sha1HashCodec
import ../bittorrent/manifest
proc encodeString*(cid: type Cid): Result[string, cstring] = proc encodeString*(cid: type Cid): Result[string, cstring] =
ok($cid) ok($cid)
@ -85,11 +86,14 @@ proc decodeString*(
err e.msg.cstring err e.msg.cstring
proc decodeString*(_: type MultiHash, value: string): Result[MultiHash, cstring] = proc decodeString*(_: type MultiHash, value: string): Result[MultiHash, cstring] =
try: without mhash =? BitTorrentInfo.buildMultiHash(value), e:
let bytes = value.hexToSeqByte return err e.msg.cstring
MultiHash.init($Sha1HashCodec, bytes) ok mhash
except ValueError as e: # try:
err e.msg.cstring # let bytes = value.hexToSeqByte
# MultiHash.init($Sha1HashCodec, bytes)
# except ValueError as e:
# err e.msg.cstring
proc decodeString*[T: PurchaseId | RequestId | Nonce | SlotId | AvailabilityId]( proc decodeString*[T: PurchaseId | RequestId | Nonce | SlotId | AvailabilityId](
_: type T, value: string _: type T, value: string

View File

@ -1,11 +1,13 @@
import std/unittest import std/unittest
import std/strformat
import pkg/libp2p/[cid, multicodec, multihash] import pkg/libp2p/[cid, multicodec, multihash]
import pkg/stew/byteutils import pkg/stew/byteutils
import pkg/questionable import pkg/questionable
import ../../examples import ../../examples
import ../../../codex/bittorrent/manifest
import pkg/codex/bittorrent/manifest
suite "BitTorrent manifest": suite "BitTorrent manifest":
# In the tests below, we use an example info dictionary # In the tests below, we use an example info dictionary
@ -49,3 +51,26 @@ suite "BitTorrent manifest":
) )
check bitTorrentManifest.validate(cid = infoHashCid).tryGet == true check bitTorrentManifest.validate(cid = infoHashCid).tryGet == true
for testData in [
(
"1902d602db8c350f4f6d809ed01eff32f030da95",
"11141902D602DB8C350F4F6D809ED01EFF32F030DA95",
),
(
"499B3A24C2C653C9600D0C22B33EC504ECCA1999AAF56E559505F342A2062497",
"1220499B3A24C2C653C9600D0C22B33EC504ECCA1999AAF56E559505F342A2062497",
),
(
"1220499B3A24C2C653C9600D0C22B33EC504ECCA1999AAF56E559505F342A2062497",
"1220499B3A24C2C653C9600D0C22B33EC504ECCA1999AAF56E559505F342A2062497",
),
(
"11141902D602DB8C350F4F6D809ED01EFF32F030DA95",
"11141902D602DB8C350F4F6D809ED01EFF32F030DA95",
),
]:
let (input, expectedOutput) = testData
test fmt"Build MultiHash from '{input}'":
let hash = BitTorrentInfo.buildMultiHash(input).tryGet
check hash.hex == expectedOutput