From 2ab59f616d65bb490bb6099a287c2bd21f80711e Mon Sep 17 00:00:00 2001 From: Marcin Czenko Date: Wed, 5 Mar 2025 03:22:56 +0100 Subject: [PATCH] makes torrent API ready for torrents v2, closes torrent streaming loop --- codex/bittorrent/manifest/manifest.nim | 24 ++++++- codex/node.nim | 14 ++--- codex/rest/api.nim | 83 ++++++++++++++++++++++++- codex/rest/coders.nim | 14 +++-- tests/codex/bittorrent/testmanifest.nim | 27 +++++++- 5 files changed, 144 insertions(+), 18 deletions(-) diff --git a/codex/bittorrent/manifest/manifest.nim b/codex/bittorrent/manifest/manifest.nim index 83a31ec2..113ce53f 100644 --- a/codex/bittorrent/manifest/manifest.nim +++ b/codex/bittorrent/manifest/manifest.nim @@ -1,7 +1,10 @@ import pkg/libp2p +import pkg/stew/byteutils import pkg/questionable import pkg/questionable/results +import ../../merkletree/codex/codex + import ../../errors import ../../codextypes import ../bencoding @@ -14,8 +17,6 @@ type pieces*: seq[BitTorrentPiece] name*: ?string - BitTorrentInfoHash* = MultiHash - BitTorrentManifest* = ref object info*: BitTorrentInfo codexManifestCid*: Cid @@ -55,3 +56,22 @@ func validate*(self: BitTorrentManifest, cid: Cid): ?!bool = without cidInfoHash =? cid.mhash.mapFailure, err: return failure(err.msg) return success(infoHash == cidInfoHash) + +func buildMultiHash*(_: type BitTorrentInfo, input: string): ?!MultiHash = + without bytes =? input.hexToSeqByte.catch, err: + return failure err.msg + without hash =? MultiHash.init(bytes): + without mhashMetaSha1 =? Sha1HashCodec.mhash, err: + return failure err.msg + if bytes.len == mhashMetaSha1.size: + without hash =? MultiHash.init($Sha1HashCodec, bytes).mapFailure, err: + return failure err.msg + return success hash + without mhashMetaSha256 =? Sha256HashCodec.mhash, err: + return failure err.msg + if bytes.len == mhashMetaSha256.size: + without hash =? MultiHash.init($Sha256HashCodec, bytes).mapFailure, err: + return failure err.msg + return success hash + return failure "given bytes is not a correct multihash" + return success hash diff --git a/codex/node.nim b/codex/node.nim index 55e6cee0..79bd962c 100644 --- a/codex/node.nim +++ b/codex/node.nim @@ -101,7 +101,7 @@ func discovery*(self: CodexNodeRef): Discovery = return self.discovery proc storeBitTorrentManifest*( - self: CodexNodeRef, manifest: BitTorrentManifest, infoHash: BitTorrentInfoHash + self: CodexNodeRef, manifest: BitTorrentManifest, infoHash: MultiHash ): Future[?!bt.Block] {.async.} = let encodedManifest = manifest.encode() @@ -490,13 +490,9 @@ proc streamTorrent( trace "Creating store stream for torrent manifest" stream.success -proc retrieveInfoHash*( - self: CodexNodeRef, infoHashString: string +proc retrieveTorrent*( + self: CodexNodeRef, infoHash: MultiHash ): Future[?!LPStream] {.async.} = - without infoHash =? MultiHash.init("sha1", infoHashString.hexToSeqByte).mapFailure, - err: - return failure(err) - without infoHashCid =? Cid.init(CIDv1, InfoHashV1Codec, infoHash).mapFailure, error: trace "Unable to create CID for BitTorrent info hash" return failure(error) @@ -652,11 +648,11 @@ proc store*( return manifestBlk.cid.success -proc storeBitTorrent*( +proc storeTorrent*( self: CodexNodeRef, stream: LPStream, info: BitTorrentInfo, - infoHash: BitTorrentInfoHash, + infoHash: MultiHash, mimetype: ?string = string.none, ): Future[?!Cid] {.async.} = info "Storing BitTorrent data" diff --git a/codex/rest/api.nim b/codex/rest/api.nim index dbebf49a..40fc0ae5 100644 --- a/codex/rest/api.nim +++ b/codex/rest/api.nim @@ -152,6 +152,80 @@ proc retrieveCid( if not lpStream.isNil: await lpStream.close() +proc retrieveInfoHash( + node: CodexNodeRef, infoHash: MultiHash, resp: HttpResponseRef +): Future[RestApiResponse] {.async.} = + ## Download torrent from the node in a streaming + ## manner + ## + var stream: LPStream + + var bytes = 0 + try: + without stream =? (await node.retrieveTorrent(infoHash)), error: + if error of BlockNotFoundError: + resp.status = Http404 + return await resp.sendBody("") + else: + resp.status = Http500 + return await resp.sendBody(error.msg) + + # It is ok to fetch again the manifest because it will hit the cache + without infoHashCid =? Cid.init(CIDv1, InfoHashV1Codec, infoHash).mapFailure, err: + error "Unable to create CID for BitTorrent info hash", err = err.msg + resp.status = Http404 + return await resp.sendBody(err.msg) + + without torrentManifest =? (await node.fetchTorrentManifest(infoHashCid)), err: + error "Unable to fetch Torrent Manifest", err = err.msg + resp.status = Http404 + return await resp.sendBody(err.msg) + + without codexManifest =? ( + await node.fetchManifest(torrentManifest.codexManifestCid) + ), err: + error "Unable to fetch Codex Manifest for torrent info hash", err = err.msg + resp.status = Http404 + return await resp.sendBody(err.msg) + + if codexManifest.mimetype.isSome: + resp.setHeader("Content-Type", codexManifest.mimetype.get()) + else: + resp.addHeader("Content-Type", "application/octet-stream") + + if codexManifest.filename.isSome: + resp.setHeader( + "Content-Disposition", + "attachment; filename=\"" & codexManifest.filename.get() & "\"", + ) + else: + resp.setHeader("Content-Disposition", "attachment") + + await resp.prepareChunked() + + while not stream.atEof: + var + buff = newSeqUninitialized[byte](int(NBytes 1024 * 16)) + len = await stream.readOnce(addr buff[0], buff.len) + + buff.setLen(len) + if buff.len <= 0: + break + + bytes += buff.len + + await resp.sendChunk(addr buff[0], buff.len) + await resp.finish() + codex_api_downloads.inc() + except CatchableError as exc: + warn "Error streaming blocks", exc = exc.msg + resp.status = Http500 + return await resp.sendBody("") + finally: + info "Sent bytes for torrent", infoHash = $infoHash, bytes + if not stream.isNil: + await stream.close() + proc buildCorsHeaders( httpMethod: string, allowedOrigin: Option[string] ): seq[(string, string)] = @@ -356,13 +430,20 @@ proc initDataApi(node: CodexNodeRef, repoStore: RepoStore, router: var RestRoute without infoHash =? infoHash.mapFailure, error: return RestApiResponse.error(Http400, error.msg, headers = headers) + if infoHash.mcodec != Sha1HashCodec: + return RestApiResponse.error( + Http400, "Only torrents version 1 are currently supported!", headers = headers + ) + if corsOrigin =? allowedOrigin: resp.setCorsHeaders("GET", corsOrigin) resp.setHeader("Access-Control-Headers", "X-Requested-With") trace "torrent requested: ", multihash = $infoHash - return RestApiResponse.response(Http200) + await node.retrieveInfoHash(infoHash, resp = resp) + + # return RestApiResponse.response(Http200) router.api(MethodGet, "/api/codex/v1/data/{cid}/network/manifest") do( cid: Cid, resp: HttpResponseRef diff --git a/codex/rest/coders.nim b/codex/rest/coders.nim index 8d51251b..88688db6 100644 --- a/codex/rest/coders.nim +++ b/codex/rest/coders.nim @@ -22,6 +22,7 @@ import ../purchasing import ../utils/stintutils from ../codextypes import Sha1HashCodec +import ../bittorrent/manifest proc encodeString*(cid: type Cid): Result[string, cstring] = ok($cid) @@ -85,11 +86,14 @@ proc decodeString*( err e.msg.cstring proc decodeString*(_: type MultiHash, value: string): Result[MultiHash, cstring] = - try: - let bytes = value.hexToSeqByte - MultiHash.init($Sha1HashCodec, bytes) - except ValueError as e: - err e.msg.cstring + without mhash =? BitTorrentInfo.buildMultiHash(value), e: + return err e.msg.cstring + ok mhash + # try: + # let bytes = value.hexToSeqByte + # MultiHash.init($Sha1HashCodec, bytes) + # except ValueError as e: + # err e.msg.cstring proc decodeString*[T: PurchaseId | RequestId | Nonce | SlotId | AvailabilityId]( _: type T, value: string diff --git a/tests/codex/bittorrent/testmanifest.nim b/tests/codex/bittorrent/testmanifest.nim index 50e98e8d..a368d99f 100644 --- a/tests/codex/bittorrent/testmanifest.nim +++ b/tests/codex/bittorrent/testmanifest.nim @@ -1,11 +1,13 @@ import std/unittest +import std/strformat import pkg/libp2p/[cid, multicodec, multihash] import pkg/stew/byteutils import pkg/questionable import ../../examples -import ../../../codex/bittorrent/manifest + +import pkg/codex/bittorrent/manifest suite "BitTorrent manifest": # In the tests below, we use an example info dictionary @@ -49,3 +51,26 @@ suite "BitTorrent manifest": ) check bitTorrentManifest.validate(cid = infoHashCid).tryGet == true + + for testData in [ + ( + "1902d602db8c350f4f6d809ed01eff32f030da95", + "11141902D602DB8C350F4F6D809ED01EFF32F030DA95", + ), + ( + "499B3A24C2C653C9600D0C22B33EC504ECCA1999AAF56E559505F342A2062497", + "1220499B3A24C2C653C9600D0C22B33EC504ECCA1999AAF56E559505F342A2062497", + ), + ( + "1220499B3A24C2C653C9600D0C22B33EC504ECCA1999AAF56E559505F342A2062497", + "1220499B3A24C2C653C9600D0C22B33EC504ECCA1999AAF56E559505F342A2062497", + ), + ( + "11141902D602DB8C350F4F6D809ED01EFF32F030DA95", + "11141902D602DB8C350F4F6D809ED01EFF32F030DA95", + ), + ]: + let (input, expectedOutput) = testData + test fmt"Build MultiHash from '{input}'": + let hash = BitTorrentInfo.buildMultiHash(input).tryGet + check hash.hex == expectedOutput