diff --git a/codex/node.nim b/codex/node.nim index 1f97b2ad..cd77c972 100644 --- a/codex/node.nim +++ b/codex/node.nim @@ -138,10 +138,14 @@ proc fetchBatched*( proc retrieve*( node: CodexNodeRef, - cid: Cid): Future[?!LPStream] {.async.} = + cid: Cid, + local: bool = true): Future[?!LPStream] {.async.} = ## Retrieve by Cid a single block or an entire dataset described by manifest ## + if local and not await (cid in node.blockStore): + return failure((ref BlockNotFoundError)(msg: "Block not found in local store")) + if manifest =? (await node.fetchManifest(cid)): trace "Retrieving blocks from manifest", cid if manifest.protected: diff --git a/codex/rest/api.nim b/codex/rest/api.nim index bb24c83c..2631ed9c 100644 --- a/codex/rest/api.nim +++ b/codex/rest/api.nim @@ -51,7 +51,7 @@ proc validate( 0 proc formatManifestBlocks(node: CodexNodeRef): Future[JsonNode] {.async.} = - var content: seq[RestContent] = @[] + var content: seq[RestContent] proc formatManifest(cid: Cid, manifest: Manifest) = let restContent = RestContent.init(cid, manifest) @@ -60,6 +60,51 @@ proc formatManifestBlocks(node: CodexNodeRef): Future[JsonNode] {.async.} = await node.iterateManifests(formatManifest) return %content +proc retrieveCid( + node: CodexNodeRef, + cid: Cid, + local: bool = true, + resp: HttpResponseRef): Future[RestApiResponse] {.async.} = + ## Download a file from the node in a streaming + ## manner + ## + + var + stream: LPStream + + var bytes = 0 + try: + without stream =? (await node.retrieve(cid, local)), error: + if error of BlockNotFoundError: + return RestApiResponse.error(Http404, error.msg) + else: + return RestApiResponse.error(Http500, error.msg) + + resp.addHeader("Content-Type", "application/octet-stream") + await resp.prepareChunked() + + while not stream.atEof: + var + buff = newSeqUninitialized[byte](DefaultBlockSize.int) + len = await stream.readOnce(addr buff[0], buff.len) + + buff.setLen(len) + if buff.len <= 0: + break + + bytes += buff.len + trace "Sending chunk", size = buff.len + await resp.sendChunk(addr buff[0], buff.len) + await resp.finish() + codex_api_downloads.inc() + except CatchableError as exc: + trace "Excepting streaming blocks", exc = exc.msg + return RestApiResponse.error(Http500) + finally: + trace "Sent bytes", cid = cid, bytes + if not stream.isNil: + await stream.close() + proc initDataApi(node: CodexNodeRef, router: var RestRouter) = router.rawApi( MethodPost, @@ -105,7 +150,7 @@ proc initDataApi(node: CodexNodeRef, router: var RestRouter) = router.api( MethodGet, - "/api/codex/v1/local") do () -> RestApiResponse: + "/api/codex/v1/data") do () -> RestApiResponse: let json = await formatManifestBlocks(node) return RestApiResponse.response($json, contentType="application/json") @@ -113,7 +158,20 @@ proc initDataApi(node: CodexNodeRef, router: var RestRouter) = MethodGet, "/api/codex/v1/data/{cid}") do ( cid: Cid, resp: HttpResponseRef) -> RestApiResponse: - ## Download a file from the node in a streaming + ## Download a file from the local node in a streaming + ## manner + if cid.isErr: + return RestApiResponse.error( + Http400, + $cid.error()) + + await node.retrieveCid(cid.get(), local = true, resp=resp) + + router.api( + MethodGet, + "/api/codex/v1/data/{cid}/network") do ( + cid: Cid, resp: HttpResponseRef) -> RestApiResponse: + ## Download a file from the network in a streaming ## manner ## @@ -122,38 +180,7 @@ proc initDataApi(node: CodexNodeRef, router: var RestRouter) = Http400, $cid.error()) - var - stream: LPStream - - var bytes = 0 - try: - without stream =? (await node.retrieve(cid.get())), error: - return RestApiResponse.error(Http404, error.msg) - - resp.addHeader("Content-Type", "application/octet-stream") - await resp.prepareChunked() - - while not stream.atEof: - var - buff = newSeqUninitialized[byte](DefaultBlockSize.int) - len = await stream.readOnce(addr buff[0], buff.len) - - buff.setLen(len) - if buff.len <= 0: - break - - bytes += buff.len - trace "Sending chunk", size = buff.len - await resp.sendChunk(addr buff[0], buff.len) - await resp.finish() - codex_api_downloads.inc() - except CatchableError as exc: - trace "Excepting streaming blocks", exc = exc.msg - return RestApiResponse.error(Http500) - finally: - trace "Sent bytes", cid = cid.get(), bytes - if not stream.isNil: - await stream.close() + await node.retrieveCid(cid.get(), local = false, resp=resp) proc initSalesApi(node: CodexNodeRef, router: var RestRouter) = router.api( diff --git a/openapi.yaml b/openapi.yaml index a4dbb17a..40eb3288 100644 --- a/openapi.yaml +++ b/openapi.yaml @@ -287,7 +287,7 @@ paths: "400": description: Peer either not found or was not possible to dial - "/local": + "/data": get: summary: "Lists manifest CIDs stored locally in node." tags: [ Data ] @@ -307,8 +307,6 @@ paths: description: Content specified by the CID is not found "500": description: Well it was bad-bad - - "/data": post: summary: "Upload a file in a streaming manner. Once finished, the file is stored in the node and can be retrieved by any node in the network using the returned CID." tags: [ Data ] @@ -331,9 +329,37 @@ paths: "/data/{cid}": get: - summary: "Download a file from the node in a streaming manner. If the file is not available locally, it will be retrieved from other nodes in the network if able." + summary: "Download a file from the local node in a streaming manner. If the file is not available locally, a 404 is returned." tags: [ Data ] - operationId: download + operationId: downloadLocal + parameters: + - in: path + name: cid + required: true + schema: + $ref: "#/components/schemas/Cid" + description: File to be downloaded. + + responses: + "200": + description: Retrieved content specified by CID + content: + application/octet-stream: + schema: + type: string + format: binary + "400": + description: Invalid CID is specified + "404": + description: Content specified by the CID is unavailable locally + "500": + description: Well it was bad-bad + + "/data/{cid}/network": + get: + summary: "Download a file from the network in a streaming manner. If the file is not available locally, it will be retrieved from other nodes in the network if able." + tags: [ Data ] + operationId: downloadNetwork parameters: - in: path name: cid diff --git a/tests/integration/codexclient.nim b/tests/integration/codexclient.nim index 65665039..b0b24bf9 100644 --- a/tests/integration/codexclient.nim +++ b/tests/integration/codexclient.nim @@ -1,5 +1,7 @@ import std/httpclient import std/strutils +import std/sequtils + from pkg/libp2p import Cid, `$`, init import pkg/chronicles import pkg/stint @@ -31,6 +33,27 @@ proc upload*(client: CodexClient, contents: string): ?!Cid = assert response.status == "200 OK" Cid.init(response.body).mapFailure +proc download*(client: CodexClient, cid: Cid, local = false): ?!string = + let + response = client.http.get( + client.baseurl & "/data/" & $cid & + (if local: "" else: "/network")) + + if response.status != "200 OK": + return failure(response.status) + + success response.body + +proc list*(client: CodexClient): ?!seq[RestContent] = + let url = client.baseurl & "/data" + let response = client.http.get(url) + + if response.status != "200 OK": + return failure(response.status) + + let json = ? parseJson(response.body).catch + seq[RestContent].fromJson(json) + proc requestStorage*( client: CodexClient, cid: Cid, diff --git a/tests/integration/testIntegration.nim b/tests/integration/testIntegration.nim index 9b311a05..9d1cd1c4 100644 --- a/tests/integration/testIntegration.nim +++ b/tests/integration/testIntegration.nim @@ -43,6 +43,53 @@ twonodessuite "Integration tests", debug1 = false, debug2 = false: let cid2 = client1.upload("some other contents").get check cid1 != cid2 + test "node allows local file downloads": + let content1 = "some file contents" + let content2 = "some other contents" + + let cid1 = client1.upload(content1).get + let cid2 = client2.upload(content2).get + + let resp1 = client1.download(cid1, local = true).get + let resp2 = client2.download(cid2, local = true).get + + check: + content1 == resp1 + content2 == resp2 + + test "node allows remote file downloads": + let content1 = "some file contents" + let content2 = "some other contents" + + let cid1 = client1.upload(content1).get + let cid2 = client2.upload(content2).get + + let resp2 = client1.download(cid2, local = false).get + let resp1 = client2.download(cid1, local = false).get + + check: + content1 == resp1 + content2 == resp2 + + test "node fails retrieving non-existing local file": + let content1 = "some file contents" + let cid1 = client1.upload(content1).get # upload to first node + let resp2 = client2.download(cid1, local = true) # try retrieving from second node + + check: + resp2.error.msg == "404 Not Found" + + test "node lists local files": + let content1 = "some file contents" + let content2 = "some other contents" + + let cid1 = client1.upload(content1).get + let cid2 = client1.upload(content2).get + let list = client1.list().get + + check: + [cid1, cid2].allIt(it in list.mapIt(it.cid)) + test "node handles new storage availability": let availability1 = client1.postAvailability(size=1.u256, duration=2.u256, minPrice=3.u256, maxCollateral=4.u256).get let availability2 = client1.postAvailability(size=4.u256, duration=5.u256, minPrice=6.u256, maxCollateral=7.u256).get diff --git a/tests/integration/testblockexpiration.nim b/tests/integration/testblockexpiration.nim index 9ffbf1a1..fc2bd0bd 100644 --- a/tests/integration/testblockexpiration.nim +++ b/tests/integration/testblockexpiration.nim @@ -44,19 +44,21 @@ ethersuite "Node block expiration tests": client.close() uploadResponse.body - proc downloadTestFile(contentId: string): Response = + proc downloadTestFile(contentId: string, local = false): Response = let client = newHttpClient(timeout=3000) - let downloadUrl = baseurl & "/data/" & contentId + let downloadUrl = baseurl & "/data/" & + contentId & (if local: "" else: "/network") + let content = client.get(downloadUrl) client.close() content proc hasFile(contentId: string): bool = let client = newHttpClient(timeout=3000) - let dataLocalUrl = baseurl & "/local" + let dataLocalUrl = baseurl & "/data/" & contentId let content = client.get(dataLocalUrl) client.close() - return content.body.contains(contentId) + content.code == Http200 test "node retains not-expired file": startTestNode(blockTtlSeconds = 10) @@ -65,7 +67,7 @@ ethersuite "Node block expiration tests": await sleepAsync(2.seconds) - let response = downloadTestFile(contentId) + let response = downloadTestFile(contentId, local = true) check: hasFile(contentId) response.status == "200 OK" @@ -80,6 +82,4 @@ ethersuite "Node block expiration tests": check: not hasFile(contentId) - - expect TimeoutError: - discard downloadTestFile(contentId) + downloadTestFile(contentId, local = true).code == Http404