Fix REST endpoints semantics (#612)
* Fix REST endpoints semantics * update endpoint description * update, operation id * Adding enum support * make enum descerializer public * add support for listing manifests * test `/data` endpoint to list local manifests * debug leftovers * remove commented out line
This commit is contained in:
parent
70efd13c73
commit
ec8d0c98b2
|
@ -138,10 +138,14 @@ proc fetchBatched*(
|
||||||
|
|
||||||
proc retrieve*(
|
proc retrieve*(
|
||||||
node: CodexNodeRef,
|
node: CodexNodeRef,
|
||||||
cid: Cid): Future[?!LPStream] {.async.} =
|
cid: Cid,
|
||||||
|
local: bool = true): Future[?!LPStream] {.async.} =
|
||||||
## Retrieve by Cid a single block or an entire dataset described by manifest
|
## Retrieve by Cid a single block or an entire dataset described by manifest
|
||||||
##
|
##
|
||||||
|
|
||||||
|
if local and not await (cid in node.blockStore):
|
||||||
|
return failure((ref BlockNotFoundError)(msg: "Block not found in local store"))
|
||||||
|
|
||||||
if manifest =? (await node.fetchManifest(cid)):
|
if manifest =? (await node.fetchManifest(cid)):
|
||||||
trace "Retrieving blocks from manifest", cid
|
trace "Retrieving blocks from manifest", cid
|
||||||
if manifest.protected:
|
if manifest.protected:
|
||||||
|
|
|
@ -51,7 +51,7 @@ proc validate(
|
||||||
0
|
0
|
||||||
|
|
||||||
proc formatManifestBlocks(node: CodexNodeRef): Future[JsonNode] {.async.} =
|
proc formatManifestBlocks(node: CodexNodeRef): Future[JsonNode] {.async.} =
|
||||||
var content: seq[RestContent] = @[]
|
var content: seq[RestContent]
|
||||||
|
|
||||||
proc formatManifest(cid: Cid, manifest: Manifest) =
|
proc formatManifest(cid: Cid, manifest: Manifest) =
|
||||||
let restContent = RestContent.init(cid, manifest)
|
let restContent = RestContent.init(cid, manifest)
|
||||||
|
@ -60,6 +60,51 @@ proc formatManifestBlocks(node: CodexNodeRef): Future[JsonNode] {.async.} =
|
||||||
await node.iterateManifests(formatManifest)
|
await node.iterateManifests(formatManifest)
|
||||||
return %content
|
return %content
|
||||||
|
|
||||||
|
proc retrieveCid(
|
||||||
|
node: CodexNodeRef,
|
||||||
|
cid: Cid,
|
||||||
|
local: bool = true,
|
||||||
|
resp: HttpResponseRef): Future[RestApiResponse] {.async.} =
|
||||||
|
## Download a file from the node in a streaming
|
||||||
|
## manner
|
||||||
|
##
|
||||||
|
|
||||||
|
var
|
||||||
|
stream: LPStream
|
||||||
|
|
||||||
|
var bytes = 0
|
||||||
|
try:
|
||||||
|
without stream =? (await node.retrieve(cid, local)), error:
|
||||||
|
if error of BlockNotFoundError:
|
||||||
|
return RestApiResponse.error(Http404, error.msg)
|
||||||
|
else:
|
||||||
|
return RestApiResponse.error(Http500, error.msg)
|
||||||
|
|
||||||
|
resp.addHeader("Content-Type", "application/octet-stream")
|
||||||
|
await resp.prepareChunked()
|
||||||
|
|
||||||
|
while not stream.atEof:
|
||||||
|
var
|
||||||
|
buff = newSeqUninitialized[byte](DefaultBlockSize.int)
|
||||||
|
len = await stream.readOnce(addr buff[0], buff.len)
|
||||||
|
|
||||||
|
buff.setLen(len)
|
||||||
|
if buff.len <= 0:
|
||||||
|
break
|
||||||
|
|
||||||
|
bytes += buff.len
|
||||||
|
trace "Sending chunk", size = buff.len
|
||||||
|
await resp.sendChunk(addr buff[0], buff.len)
|
||||||
|
await resp.finish()
|
||||||
|
codex_api_downloads.inc()
|
||||||
|
except CatchableError as exc:
|
||||||
|
trace "Excepting streaming blocks", exc = exc.msg
|
||||||
|
return RestApiResponse.error(Http500)
|
||||||
|
finally:
|
||||||
|
trace "Sent bytes", cid = cid, bytes
|
||||||
|
if not stream.isNil:
|
||||||
|
await stream.close()
|
||||||
|
|
||||||
proc initDataApi(node: CodexNodeRef, router: var RestRouter) =
|
proc initDataApi(node: CodexNodeRef, router: var RestRouter) =
|
||||||
router.rawApi(
|
router.rawApi(
|
||||||
MethodPost,
|
MethodPost,
|
||||||
|
@ -105,7 +150,7 @@ proc initDataApi(node: CodexNodeRef, router: var RestRouter) =
|
||||||
|
|
||||||
router.api(
|
router.api(
|
||||||
MethodGet,
|
MethodGet,
|
||||||
"/api/codex/v1/local") do () -> RestApiResponse:
|
"/api/codex/v1/data") do () -> RestApiResponse:
|
||||||
let json = await formatManifestBlocks(node)
|
let json = await formatManifestBlocks(node)
|
||||||
return RestApiResponse.response($json, contentType="application/json")
|
return RestApiResponse.response($json, contentType="application/json")
|
||||||
|
|
||||||
|
@ -113,7 +158,20 @@ proc initDataApi(node: CodexNodeRef, router: var RestRouter) =
|
||||||
MethodGet,
|
MethodGet,
|
||||||
"/api/codex/v1/data/{cid}") do (
|
"/api/codex/v1/data/{cid}") do (
|
||||||
cid: Cid, resp: HttpResponseRef) -> RestApiResponse:
|
cid: Cid, resp: HttpResponseRef) -> RestApiResponse:
|
||||||
## Download a file from the node in a streaming
|
## Download a file from the local node in a streaming
|
||||||
|
## manner
|
||||||
|
if cid.isErr:
|
||||||
|
return RestApiResponse.error(
|
||||||
|
Http400,
|
||||||
|
$cid.error())
|
||||||
|
|
||||||
|
await node.retrieveCid(cid.get(), local = true, resp=resp)
|
||||||
|
|
||||||
|
router.api(
|
||||||
|
MethodGet,
|
||||||
|
"/api/codex/v1/data/{cid}/network") do (
|
||||||
|
cid: Cid, resp: HttpResponseRef) -> RestApiResponse:
|
||||||
|
## Download a file from the network in a streaming
|
||||||
## manner
|
## manner
|
||||||
##
|
##
|
||||||
|
|
||||||
|
@ -122,38 +180,7 @@ proc initDataApi(node: CodexNodeRef, router: var RestRouter) =
|
||||||
Http400,
|
Http400,
|
||||||
$cid.error())
|
$cid.error())
|
||||||
|
|
||||||
var
|
await node.retrieveCid(cid.get(), local = false, resp=resp)
|
||||||
stream: LPStream
|
|
||||||
|
|
||||||
var bytes = 0
|
|
||||||
try:
|
|
||||||
without stream =? (await node.retrieve(cid.get())), error:
|
|
||||||
return RestApiResponse.error(Http404, error.msg)
|
|
||||||
|
|
||||||
resp.addHeader("Content-Type", "application/octet-stream")
|
|
||||||
await resp.prepareChunked()
|
|
||||||
|
|
||||||
while not stream.atEof:
|
|
||||||
var
|
|
||||||
buff = newSeqUninitialized[byte](DefaultBlockSize.int)
|
|
||||||
len = await stream.readOnce(addr buff[0], buff.len)
|
|
||||||
|
|
||||||
buff.setLen(len)
|
|
||||||
if buff.len <= 0:
|
|
||||||
break
|
|
||||||
|
|
||||||
bytes += buff.len
|
|
||||||
trace "Sending chunk", size = buff.len
|
|
||||||
await resp.sendChunk(addr buff[0], buff.len)
|
|
||||||
await resp.finish()
|
|
||||||
codex_api_downloads.inc()
|
|
||||||
except CatchableError as exc:
|
|
||||||
trace "Excepting streaming blocks", exc = exc.msg
|
|
||||||
return RestApiResponse.error(Http500)
|
|
||||||
finally:
|
|
||||||
trace "Sent bytes", cid = cid.get(), bytes
|
|
||||||
if not stream.isNil:
|
|
||||||
await stream.close()
|
|
||||||
|
|
||||||
proc initSalesApi(node: CodexNodeRef, router: var RestRouter) =
|
proc initSalesApi(node: CodexNodeRef, router: var RestRouter) =
|
||||||
router.api(
|
router.api(
|
||||||
|
|
36
openapi.yaml
36
openapi.yaml
|
@ -287,7 +287,7 @@ paths:
|
||||||
"400":
|
"400":
|
||||||
description: Peer either not found or was not possible to dial
|
description: Peer either not found or was not possible to dial
|
||||||
|
|
||||||
"/local":
|
"/data":
|
||||||
get:
|
get:
|
||||||
summary: "Lists manifest CIDs stored locally in node."
|
summary: "Lists manifest CIDs stored locally in node."
|
||||||
tags: [ Data ]
|
tags: [ Data ]
|
||||||
|
@ -307,8 +307,6 @@ paths:
|
||||||
description: Content specified by the CID is not found
|
description: Content specified by the CID is not found
|
||||||
"500":
|
"500":
|
||||||
description: Well it was bad-bad
|
description: Well it was bad-bad
|
||||||
|
|
||||||
"/data":
|
|
||||||
post:
|
post:
|
||||||
summary: "Upload a file in a streaming manner. Once finished, the file is stored in the node and can be retrieved by any node in the network using the returned CID."
|
summary: "Upload a file in a streaming manner. Once finished, the file is stored in the node and can be retrieved by any node in the network using the returned CID."
|
||||||
tags: [ Data ]
|
tags: [ Data ]
|
||||||
|
@ -331,9 +329,37 @@ paths:
|
||||||
|
|
||||||
"/data/{cid}":
|
"/data/{cid}":
|
||||||
get:
|
get:
|
||||||
summary: "Download a file from the node in a streaming manner. If the file is not available locally, it will be retrieved from other nodes in the network if able."
|
summary: "Download a file from the local node in a streaming manner. If the file is not available locally, a 404 is returned."
|
||||||
tags: [ Data ]
|
tags: [ Data ]
|
||||||
operationId: download
|
operationId: downloadLocal
|
||||||
|
parameters:
|
||||||
|
- in: path
|
||||||
|
name: cid
|
||||||
|
required: true
|
||||||
|
schema:
|
||||||
|
$ref: "#/components/schemas/Cid"
|
||||||
|
description: File to be downloaded.
|
||||||
|
|
||||||
|
responses:
|
||||||
|
"200":
|
||||||
|
description: Retrieved content specified by CID
|
||||||
|
content:
|
||||||
|
application/octet-stream:
|
||||||
|
schema:
|
||||||
|
type: string
|
||||||
|
format: binary
|
||||||
|
"400":
|
||||||
|
description: Invalid CID is specified
|
||||||
|
"404":
|
||||||
|
description: Content specified by the CID is unavailable locally
|
||||||
|
"500":
|
||||||
|
description: Well it was bad-bad
|
||||||
|
|
||||||
|
"/data/{cid}/network":
|
||||||
|
get:
|
||||||
|
summary: "Download a file from the network in a streaming manner. If the file is not available locally, it will be retrieved from other nodes in the network if able."
|
||||||
|
tags: [ Data ]
|
||||||
|
operationId: downloadNetwork
|
||||||
parameters:
|
parameters:
|
||||||
- in: path
|
- in: path
|
||||||
name: cid
|
name: cid
|
||||||
|
|
|
@ -1,5 +1,7 @@
|
||||||
import std/httpclient
|
import std/httpclient
|
||||||
import std/strutils
|
import std/strutils
|
||||||
|
import std/sequtils
|
||||||
|
|
||||||
from pkg/libp2p import Cid, `$`, init
|
from pkg/libp2p import Cid, `$`, init
|
||||||
import pkg/chronicles
|
import pkg/chronicles
|
||||||
import pkg/stint
|
import pkg/stint
|
||||||
|
@ -31,6 +33,27 @@ proc upload*(client: CodexClient, contents: string): ?!Cid =
|
||||||
assert response.status == "200 OK"
|
assert response.status == "200 OK"
|
||||||
Cid.init(response.body).mapFailure
|
Cid.init(response.body).mapFailure
|
||||||
|
|
||||||
|
proc download*(client: CodexClient, cid: Cid, local = false): ?!string =
|
||||||
|
let
|
||||||
|
response = client.http.get(
|
||||||
|
client.baseurl & "/data/" & $cid &
|
||||||
|
(if local: "" else: "/network"))
|
||||||
|
|
||||||
|
if response.status != "200 OK":
|
||||||
|
return failure(response.status)
|
||||||
|
|
||||||
|
success response.body
|
||||||
|
|
||||||
|
proc list*(client: CodexClient): ?!seq[RestContent] =
|
||||||
|
let url = client.baseurl & "/data"
|
||||||
|
let response = client.http.get(url)
|
||||||
|
|
||||||
|
if response.status != "200 OK":
|
||||||
|
return failure(response.status)
|
||||||
|
|
||||||
|
let json = ? parseJson(response.body).catch
|
||||||
|
seq[RestContent].fromJson(json)
|
||||||
|
|
||||||
proc requestStorage*(
|
proc requestStorage*(
|
||||||
client: CodexClient,
|
client: CodexClient,
|
||||||
cid: Cid,
|
cid: Cid,
|
||||||
|
|
|
@ -43,6 +43,53 @@ twonodessuite "Integration tests", debug1 = false, debug2 = false:
|
||||||
let cid2 = client1.upload("some other contents").get
|
let cid2 = client1.upload("some other contents").get
|
||||||
check cid1 != cid2
|
check cid1 != cid2
|
||||||
|
|
||||||
|
test "node allows local file downloads":
|
||||||
|
let content1 = "some file contents"
|
||||||
|
let content2 = "some other contents"
|
||||||
|
|
||||||
|
let cid1 = client1.upload(content1).get
|
||||||
|
let cid2 = client2.upload(content2).get
|
||||||
|
|
||||||
|
let resp1 = client1.download(cid1, local = true).get
|
||||||
|
let resp2 = client2.download(cid2, local = true).get
|
||||||
|
|
||||||
|
check:
|
||||||
|
content1 == resp1
|
||||||
|
content2 == resp2
|
||||||
|
|
||||||
|
test "node allows remote file downloads":
|
||||||
|
let content1 = "some file contents"
|
||||||
|
let content2 = "some other contents"
|
||||||
|
|
||||||
|
let cid1 = client1.upload(content1).get
|
||||||
|
let cid2 = client2.upload(content2).get
|
||||||
|
|
||||||
|
let resp2 = client1.download(cid2, local = false).get
|
||||||
|
let resp1 = client2.download(cid1, local = false).get
|
||||||
|
|
||||||
|
check:
|
||||||
|
content1 == resp1
|
||||||
|
content2 == resp2
|
||||||
|
|
||||||
|
test "node fails retrieving non-existing local file":
|
||||||
|
let content1 = "some file contents"
|
||||||
|
let cid1 = client1.upload(content1).get # upload to first node
|
||||||
|
let resp2 = client2.download(cid1, local = true) # try retrieving from second node
|
||||||
|
|
||||||
|
check:
|
||||||
|
resp2.error.msg == "404 Not Found"
|
||||||
|
|
||||||
|
test "node lists local files":
|
||||||
|
let content1 = "some file contents"
|
||||||
|
let content2 = "some other contents"
|
||||||
|
|
||||||
|
let cid1 = client1.upload(content1).get
|
||||||
|
let cid2 = client1.upload(content2).get
|
||||||
|
let list = client1.list().get
|
||||||
|
|
||||||
|
check:
|
||||||
|
[cid1, cid2].allIt(it in list.mapIt(it.cid))
|
||||||
|
|
||||||
test "node handles new storage availability":
|
test "node handles new storage availability":
|
||||||
let availability1 = client1.postAvailability(size=1.u256, duration=2.u256, minPrice=3.u256, maxCollateral=4.u256).get
|
let availability1 = client1.postAvailability(size=1.u256, duration=2.u256, minPrice=3.u256, maxCollateral=4.u256).get
|
||||||
let availability2 = client1.postAvailability(size=4.u256, duration=5.u256, minPrice=6.u256, maxCollateral=7.u256).get
|
let availability2 = client1.postAvailability(size=4.u256, duration=5.u256, minPrice=6.u256, maxCollateral=7.u256).get
|
||||||
|
|
|
@ -44,19 +44,21 @@ ethersuite "Node block expiration tests":
|
||||||
client.close()
|
client.close()
|
||||||
uploadResponse.body
|
uploadResponse.body
|
||||||
|
|
||||||
proc downloadTestFile(contentId: string): Response =
|
proc downloadTestFile(contentId: string, local = false): Response =
|
||||||
let client = newHttpClient(timeout=3000)
|
let client = newHttpClient(timeout=3000)
|
||||||
let downloadUrl = baseurl & "/data/" & contentId
|
let downloadUrl = baseurl & "/data/" &
|
||||||
|
contentId & (if local: "" else: "/network")
|
||||||
|
|
||||||
let content = client.get(downloadUrl)
|
let content = client.get(downloadUrl)
|
||||||
client.close()
|
client.close()
|
||||||
content
|
content
|
||||||
|
|
||||||
proc hasFile(contentId: string): bool =
|
proc hasFile(contentId: string): bool =
|
||||||
let client = newHttpClient(timeout=3000)
|
let client = newHttpClient(timeout=3000)
|
||||||
let dataLocalUrl = baseurl & "/local"
|
let dataLocalUrl = baseurl & "/data/" & contentId
|
||||||
let content = client.get(dataLocalUrl)
|
let content = client.get(dataLocalUrl)
|
||||||
client.close()
|
client.close()
|
||||||
return content.body.contains(contentId)
|
content.code == Http200
|
||||||
|
|
||||||
test "node retains not-expired file":
|
test "node retains not-expired file":
|
||||||
startTestNode(blockTtlSeconds = 10)
|
startTestNode(blockTtlSeconds = 10)
|
||||||
|
@ -65,7 +67,7 @@ ethersuite "Node block expiration tests":
|
||||||
|
|
||||||
await sleepAsync(2.seconds)
|
await sleepAsync(2.seconds)
|
||||||
|
|
||||||
let response = downloadTestFile(contentId)
|
let response = downloadTestFile(contentId, local = true)
|
||||||
check:
|
check:
|
||||||
hasFile(contentId)
|
hasFile(contentId)
|
||||||
response.status == "200 OK"
|
response.status == "200 OK"
|
||||||
|
@ -80,6 +82,4 @@ ethersuite "Node block expiration tests":
|
||||||
|
|
||||||
check:
|
check:
|
||||||
not hasFile(contentId)
|
not hasFile(contentId)
|
||||||
|
downloadTestFile(contentId, local = true).code == Http404
|
||||||
expect TimeoutError:
|
|
||||||
discard downloadTestFile(contentId)
|
|
||||||
|
|
Loading…
Reference in New Issue