Adds endpoint for listing files (manifests) in node. Useful for demo UI. (#599)
* Adds endpoint for listing files (manifests) in node. Useful for demo UI. * Moves upload/download/files into content API calls. * Cleans up json serialization for manifest * Cleans up some more json serialization * Moves block iteration and decoding to node.nim * Moves api methods into their own init procs. * Applies RestContent api object. * Replaces format methods with Rest objects in json.nim * Unused import * Review comments by Adam * Fixes issue where content/local endpoint clashes with content/cid. * faulty merge resolution * Renames content API to data. * Fixes faulty rebase * Adds test for data/local API * Renames local and download api.
This commit is contained in:
parent
7d4ea878d2
commit
cb02962231
|
@ -21,6 +21,7 @@ import pkg/chronicles
|
||||||
|
|
||||||
import ../errors
|
import ../errors
|
||||||
import ../utils
|
import ../utils
|
||||||
|
import ../utils/json
|
||||||
import ../units
|
import ../units
|
||||||
import ../blocktype
|
import ../blocktype
|
||||||
import ./types
|
import ./types
|
||||||
|
@ -29,14 +30,14 @@ export types
|
||||||
|
|
||||||
type
|
type
|
||||||
Manifest* = ref object of RootObj
|
Manifest* = ref object of RootObj
|
||||||
rootHash: ?Cid # Root (tree) hash of the contained data set
|
rootHash {.serialize.}: ?Cid # Root (tree) hash of the contained data set
|
||||||
originalBytes*: NBytes # Exact size of the original (uploaded) file
|
originalBytes* {.serialize.}: NBytes # Exact size of the original (uploaded) file
|
||||||
blockSize: NBytes # Size of each contained block (might not be needed if blocks are len-prefixed)
|
blockSize {.serialize.}: NBytes # Size of each contained block (might not be needed if blocks are len-prefixed)
|
||||||
blocks: seq[Cid] # Block Cid
|
blocks: seq[Cid] # Block Cid
|
||||||
version: CidVersion # Cid version
|
version: CidVersion # Cid version
|
||||||
hcodec: MultiCodec # Multihash codec
|
hcodec: MultiCodec # Multihash codec
|
||||||
codec: MultiCodec # Data set codec
|
codec: MultiCodec # Data set codec
|
||||||
case protected: bool # Protected datasets have erasure coded info
|
case protected {.serialize.}: bool # Protected datasets have erasure coded info
|
||||||
of true:
|
of true:
|
||||||
ecK: int # Number of blocks to encode
|
ecK: int # Number of blocks to encode
|
||||||
ecM: int # Number of resulting parity blocks
|
ecM: int # Number of resulting parity blocks
|
||||||
|
|
|
@ -60,6 +60,8 @@ type
|
||||||
discovery*: Discovery
|
discovery*: Discovery
|
||||||
contracts*: Contracts
|
contracts*: Contracts
|
||||||
|
|
||||||
|
OnManifest* = proc(cid: Cid, manifest: Manifest): void {.gcsafe, closure.}
|
||||||
|
|
||||||
proc findPeer*(
|
proc findPeer*(
|
||||||
node: CodexNodeRef,
|
node: CodexNodeRef,
|
||||||
peerId: PeerId): Future[?PeerRecord] {.async.} =
|
peerId: PeerId): Future[?PeerRecord] {.async.} =
|
||||||
|
@ -235,6 +237,23 @@ proc store*(
|
||||||
|
|
||||||
return manifest.cid.success
|
return manifest.cid.success
|
||||||
|
|
||||||
|
proc iterateManifests*(node: CodexNodeRef, onManifest: OnManifest) {.async.} =
|
||||||
|
without cids =? await node.blockStore.listBlocks(BlockType.Manifest):
|
||||||
|
warn "Failed to listBlocks"
|
||||||
|
return
|
||||||
|
|
||||||
|
for c in cids:
|
||||||
|
if cid =? await c:
|
||||||
|
without blk =? await node.blockStore.getBlock(cid):
|
||||||
|
warn "Failed to get manifest block by cid", cid
|
||||||
|
return
|
||||||
|
|
||||||
|
without manifest =? Manifest.decode(blk):
|
||||||
|
warn "Failed to decode manifest", cid
|
||||||
|
return
|
||||||
|
|
||||||
|
onManifest(cid, manifest)
|
||||||
|
|
||||||
proc requestStorage*(
|
proc requestStorage*(
|
||||||
self: CodexNodeRef,
|
self: CodexNodeRef,
|
||||||
cid: Cid,
|
cid: Cid,
|
||||||
|
|
|
@ -28,8 +28,6 @@ import pkg/confutils
|
||||||
import pkg/libp2p
|
import pkg/libp2p
|
||||||
import pkg/libp2p/routing_record
|
import pkg/libp2p/routing_record
|
||||||
import pkg/codexdht/discv5/spr as spr
|
import pkg/codexdht/discv5/spr as spr
|
||||||
import pkg/codexdht/discv5/routing_table as rt
|
|
||||||
import pkg/codexdht/discv5/node as dn
|
|
||||||
|
|
||||||
import ../node
|
import ../node
|
||||||
import ../blocktype
|
import ../blocktype
|
||||||
|
@ -52,173 +50,20 @@ proc validate(
|
||||||
{.gcsafe, raises: [Defect].} =
|
{.gcsafe, raises: [Defect].} =
|
||||||
0
|
0
|
||||||
|
|
||||||
proc formatAddress(address: Option[dn.Address]): string =
|
proc formatManifestBlocks(node: CodexNodeRef): Future[JsonNode] {.async.} =
|
||||||
if address.isSome():
|
var content: seq[RestContent] = @[]
|
||||||
return $address.get()
|
|
||||||
return "<none>"
|
|
||||||
|
|
||||||
proc formatNode(node: dn.Node): JsonNode =
|
proc formatManifest(cid: Cid, manifest: Manifest) =
|
||||||
let jobj = %*{
|
let restContent = RestContent.init(cid, manifest)
|
||||||
"nodeId": $node.id,
|
content.add(restContent)
|
||||||
"peerId": $node.record.data.peerId,
|
|
||||||
"record": $node.record,
|
|
||||||
"address": formatAddress(node.address),
|
|
||||||
"seen": $node.seen
|
|
||||||
}
|
|
||||||
return jobj
|
|
||||||
|
|
||||||
proc formatTable(routingTable: rt.RoutingTable): JsonNode =
|
await node.iterateManifests(formatManifest)
|
||||||
let jarray = newJArray()
|
return %content
|
||||||
for bucket in routingTable.buckets:
|
|
||||||
for node in bucket.nodes:
|
|
||||||
jarray.add(formatNode(node))
|
|
||||||
|
|
||||||
let jobj = %*{
|
|
||||||
"localNode": formatNode(routingTable.localNode),
|
|
||||||
"nodes": jarray
|
|
||||||
}
|
|
||||||
return jobj
|
|
||||||
|
|
||||||
proc formatPeerRecord(peerRecord: PeerRecord): JsonNode =
|
|
||||||
let jarray = newJArray()
|
|
||||||
for maddr in peerRecord.addresses:
|
|
||||||
jarray.add(%*{
|
|
||||||
"address": $maddr.address
|
|
||||||
})
|
|
||||||
|
|
||||||
let jobj = %*{
|
|
||||||
"peerId": $peerRecord.peerId,
|
|
||||||
"seqNo": $peerRecord.seqNo,
|
|
||||||
"addresses": jarray
|
|
||||||
}
|
|
||||||
return jobj
|
|
||||||
|
|
||||||
proc initRestApi*(node: CodexNodeRef, conf: CodexConf): RestRouter =
|
|
||||||
var router = RestRouter.init(validate)
|
|
||||||
router.api(
|
|
||||||
MethodGet,
|
|
||||||
"/api/codex/v1/connect/{peerId}") do (
|
|
||||||
peerId: PeerId,
|
|
||||||
addrs: seq[MultiAddress]) -> RestApiResponse:
|
|
||||||
## Connect to a peer
|
|
||||||
##
|
|
||||||
## If `addrs` param is supplied, it will be used to
|
|
||||||
## dial the peer, otherwise the `peerId` is used
|
|
||||||
## to invoke peer discovery, if it succeeds
|
|
||||||
## the returned addresses will be used to dial
|
|
||||||
##
|
|
||||||
## `addrs` the listening addresses of the peers to dial, eg the one specified with `--listen-addrs`
|
|
||||||
##
|
|
||||||
|
|
||||||
if peerId.isErr:
|
|
||||||
return RestApiResponse.error(
|
|
||||||
Http400,
|
|
||||||
$peerId.error())
|
|
||||||
|
|
||||||
let addresses = if addrs.isOk and addrs.get().len > 0:
|
|
||||||
addrs.get()
|
|
||||||
else:
|
|
||||||
without peerRecord =? (await node.findPeer(peerId.get())):
|
|
||||||
return RestApiResponse.error(
|
|
||||||
Http400,
|
|
||||||
"Unable to find Peer!")
|
|
||||||
peerRecord.addresses.mapIt(it.address)
|
|
||||||
try:
|
|
||||||
await node.connect(peerId.get(), addresses)
|
|
||||||
return RestApiResponse.response("Successfully connected to peer")
|
|
||||||
except DialFailedError:
|
|
||||||
return RestApiResponse.error(Http400, "Unable to dial peer")
|
|
||||||
except CatchableError:
|
|
||||||
return RestApiResponse.error(Http400, "Unknown error dialling peer")
|
|
||||||
|
|
||||||
router.api(
|
|
||||||
MethodGet,
|
|
||||||
"/api/codex/v1/download/{id}") do (
|
|
||||||
id: Cid, resp: HttpResponseRef) -> RestApiResponse:
|
|
||||||
## Download a file from the node in a streaming
|
|
||||||
## manner
|
|
||||||
##
|
|
||||||
|
|
||||||
if id.isErr:
|
|
||||||
return RestApiResponse.error(
|
|
||||||
Http400,
|
|
||||||
$id.error())
|
|
||||||
|
|
||||||
var
|
|
||||||
stream: LPStream
|
|
||||||
|
|
||||||
var bytes = 0
|
|
||||||
try:
|
|
||||||
without stream =? (await node.retrieve(id.get())), error:
|
|
||||||
return RestApiResponse.error(Http404, error.msg)
|
|
||||||
|
|
||||||
resp.addHeader("Content-Type", "application/octet-stream")
|
|
||||||
await resp.prepareChunked()
|
|
||||||
|
|
||||||
while not stream.atEof:
|
|
||||||
var
|
|
||||||
buff = newSeqUninitialized[byte](DefaultBlockSize.int)
|
|
||||||
len = await stream.readOnce(addr buff[0], buff.len)
|
|
||||||
|
|
||||||
buff.setLen(len)
|
|
||||||
if buff.len <= 0:
|
|
||||||
break
|
|
||||||
|
|
||||||
bytes += buff.len
|
|
||||||
trace "Sending chunk", size = buff.len
|
|
||||||
await resp.sendChunk(addr buff[0], buff.len)
|
|
||||||
await resp.finish()
|
|
||||||
codex_api_downloads.inc()
|
|
||||||
except CatchableError as exc:
|
|
||||||
trace "Excepting streaming blocks", exc = exc.msg
|
|
||||||
return RestApiResponse.error(Http500)
|
|
||||||
finally:
|
|
||||||
trace "Sent bytes", cid = id.get(), bytes
|
|
||||||
if not stream.isNil:
|
|
||||||
await stream.close()
|
|
||||||
|
|
||||||
|
proc initDataApi(node: CodexNodeRef, router: var RestRouter) =
|
||||||
router.rawApi(
|
router.rawApi(
|
||||||
MethodPost,
|
MethodPost,
|
||||||
"/api/codex/v1/storage/request/{cid}") do (cid: Cid) -> RestApiResponse:
|
"/api/codex/v1/data") do (
|
||||||
## Create a request for storage
|
|
||||||
##
|
|
||||||
## cid - the cid of a previously uploaded dataset
|
|
||||||
## duration - the duration of the request in seconds
|
|
||||||
## proofProbability - how often storage proofs are required
|
|
||||||
## reward - the maximum amount of tokens paid per second per slot to hosts the client is willing to pay
|
|
||||||
## expiry - timestamp, in seconds, when the request expires if the Request does not find requested amount of nodes to host the data
|
|
||||||
## nodes - minimal number of nodes the content should be stored on
|
|
||||||
## tolerance - allowed number of nodes that can be lost before pronouncing the content lost
|
|
||||||
## colateral - requested collateral from hosts when they fill slot
|
|
||||||
|
|
||||||
without cid =? cid.tryGet.catch, error:
|
|
||||||
return RestApiResponse.error(Http400, error.msg)
|
|
||||||
|
|
||||||
let body = await request.getBody()
|
|
||||||
|
|
||||||
without params =? StorageRequestParams.fromJson(body), error:
|
|
||||||
return RestApiResponse.error(Http400, error.msg)
|
|
||||||
|
|
||||||
let nodes = params.nodes |? 1
|
|
||||||
let tolerance = params.tolerance |? 0
|
|
||||||
|
|
||||||
without purchaseId =? await node.requestStorage(
|
|
||||||
cid,
|
|
||||||
params.duration,
|
|
||||||
params.proofProbability,
|
|
||||||
nodes,
|
|
||||||
tolerance,
|
|
||||||
params.reward,
|
|
||||||
params.collateral,
|
|
||||||
params.expiry), error:
|
|
||||||
|
|
||||||
return RestApiResponse.error(Http500, error.msg)
|
|
||||||
|
|
||||||
return RestApiResponse.response(purchaseId.toHex)
|
|
||||||
|
|
||||||
router.rawApi(
|
|
||||||
MethodPost,
|
|
||||||
"/api/codex/v1/upload") do (
|
|
||||||
) -> RestApiResponse:
|
) -> RestApiResponse:
|
||||||
## Upload a file in a streaming manner
|
## Upload a file in a streaming manner
|
||||||
##
|
##
|
||||||
|
@ -258,69 +103,59 @@ proc initRestApi*(node: CodexNodeRef, conf: CodexConf): RestRouter =
|
||||||
trace "Something went wrong error"
|
trace "Something went wrong error"
|
||||||
return RestApiResponse.error(Http500)
|
return RestApiResponse.error(Http500)
|
||||||
|
|
||||||
router.api(
|
|
||||||
MethodPost,
|
|
||||||
"/api/codex/v1/debug/chronicles/loglevel") do (
|
|
||||||
level: Option[string]) -> RestApiResponse:
|
|
||||||
## Set log level at run time
|
|
||||||
##
|
|
||||||
## e.g. `chronicles/loglevel?level=DEBUG`
|
|
||||||
##
|
|
||||||
## `level` - chronicles log level
|
|
||||||
##
|
|
||||||
|
|
||||||
without res =? level and level =? res:
|
|
||||||
return RestApiResponse.error(Http400, "Missing log level")
|
|
||||||
|
|
||||||
try:
|
|
||||||
{.gcsafe.}:
|
|
||||||
updateLogLevel(level)
|
|
||||||
except CatchableError as exc:
|
|
||||||
return RestApiResponse.error(Http500, exc.msg)
|
|
||||||
|
|
||||||
return RestApiResponse.response("")
|
|
||||||
|
|
||||||
router.api(
|
router.api(
|
||||||
MethodGet,
|
MethodGet,
|
||||||
"/api/codex/v1/debug/info") do () -> RestApiResponse:
|
"/api/codex/v1/local") do () -> RestApiResponse:
|
||||||
## Print rudimentary node information
|
let json = await formatManifestBlocks(node)
|
||||||
##
|
|
||||||
|
|
||||||
let
|
|
||||||
json = %*{
|
|
||||||
"id": $node.switch.peerInfo.peerId,
|
|
||||||
"addrs": node.switch.peerInfo.addrs.mapIt( $it ),
|
|
||||||
"repo": $conf.dataDir,
|
|
||||||
"spr":
|
|
||||||
if node.discovery.dhtRecord.isSome:
|
|
||||||
node.discovery.dhtRecord.get.toURI
|
|
||||||
else:
|
|
||||||
"",
|
|
||||||
"table": formatTable(node.discovery.protocol.routingTable),
|
|
||||||
"codex": {
|
|
||||||
"version": $codexVersion,
|
|
||||||
"revision": $codexRevision
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return RestApiResponse.response($json, contentType="application/json")
|
return RestApiResponse.response($json, contentType="application/json")
|
||||||
|
|
||||||
when codex_enable_api_debug_peers:
|
|
||||||
router.api(
|
router.api(
|
||||||
MethodGet,
|
MethodGet,
|
||||||
"/api/codex/v1/debug/peer/{peerId}") do (peerId: PeerId) -> RestApiResponse:
|
"/api/codex/v1/data/{cid}") do (
|
||||||
|
cid: Cid, resp: HttpResponseRef) -> RestApiResponse:
|
||||||
|
## Download a file from the node in a streaming
|
||||||
|
## manner
|
||||||
|
##
|
||||||
|
|
||||||
trace "debug/peer start"
|
if cid.isErr:
|
||||||
without peerRecord =? (await node.findPeer(peerId.get())):
|
|
||||||
trace "debug/peer peer not found!"
|
|
||||||
return RestApiResponse.error(
|
return RestApiResponse.error(
|
||||||
Http400,
|
Http400,
|
||||||
"Unable to find Peer!")
|
$cid.error())
|
||||||
|
|
||||||
let json = formatPeerRecord(peerRecord)
|
var
|
||||||
trace "debug/peer returning peer record"
|
stream: LPStream
|
||||||
return RestApiResponse.response($json)
|
|
||||||
|
|
||||||
|
var bytes = 0
|
||||||
|
try:
|
||||||
|
without stream =? (await node.retrieve(cid.get())), error:
|
||||||
|
return RestApiResponse.error(Http404, error.msg)
|
||||||
|
|
||||||
|
resp.addHeader("Content-Type", "application/octet-stream")
|
||||||
|
await resp.prepareChunked()
|
||||||
|
|
||||||
|
while not stream.atEof:
|
||||||
|
var
|
||||||
|
buff = newSeqUninitialized[byte](DefaultBlockSize.int)
|
||||||
|
len = await stream.readOnce(addr buff[0], buff.len)
|
||||||
|
|
||||||
|
buff.setLen(len)
|
||||||
|
if buff.len <= 0:
|
||||||
|
break
|
||||||
|
|
||||||
|
bytes += buff.len
|
||||||
|
trace "Sending chunk", size = buff.len
|
||||||
|
await resp.sendChunk(addr buff[0], buff.len)
|
||||||
|
await resp.finish()
|
||||||
|
codex_api_downloads.inc()
|
||||||
|
except CatchableError as exc:
|
||||||
|
trace "Excepting streaming blocks", exc = exc.msg
|
||||||
|
return RestApiResponse.error(Http500)
|
||||||
|
finally:
|
||||||
|
trace "Sent bytes", cid = cid.get(), bytes
|
||||||
|
if not stream.isNil:
|
||||||
|
await stream.close()
|
||||||
|
|
||||||
|
proc initSalesApi(node: CodexNodeRef, router: var RestRouter) =
|
||||||
router.api(
|
router.api(
|
||||||
MethodGet,
|
MethodGet,
|
||||||
"/api/codex/v1/sales/slots") do () -> RestApiResponse:
|
"/api/codex/v1/sales/slots") do () -> RestApiResponse:
|
||||||
|
@ -381,6 +216,46 @@ proc initRestApi*(node: CodexNodeRef, conf: CodexConf): RestRouter =
|
||||||
return RestApiResponse.response(availability.toJson,
|
return RestApiResponse.response(availability.toJson,
|
||||||
contentType="application/json")
|
contentType="application/json")
|
||||||
|
|
||||||
|
proc initPurchasingApi(node: CodexNodeRef, router: var RestRouter) =
|
||||||
|
router.rawApi(
|
||||||
|
MethodPost,
|
||||||
|
"/api/codex/v1/storage/request/{cid}") do (cid: Cid) -> RestApiResponse:
|
||||||
|
## Create a request for storage
|
||||||
|
##
|
||||||
|
## cid - the cid of a previously uploaded dataset
|
||||||
|
## duration - the duration of the request in seconds
|
||||||
|
## proofProbability - how often storage proofs are required
|
||||||
|
## reward - the maximum amount of tokens paid per second per slot to hosts the client is willing to pay
|
||||||
|
## expiry - timestamp, in seconds, when the request expires if the Request does not find requested amount of nodes to host the data
|
||||||
|
## nodes - minimal number of nodes the content should be stored on
|
||||||
|
## tolerance - allowed number of nodes that can be lost before pronouncing the content lost
|
||||||
|
## colateral - requested collateral from hosts when they fill slot
|
||||||
|
|
||||||
|
without cid =? cid.tryGet.catch, error:
|
||||||
|
return RestApiResponse.error(Http400, error.msg)
|
||||||
|
|
||||||
|
let body = await request.getBody()
|
||||||
|
|
||||||
|
without params =? StorageRequestParams.fromJson(body), error:
|
||||||
|
return RestApiResponse.error(Http400, error.msg)
|
||||||
|
|
||||||
|
let nodes = params.nodes |? 1
|
||||||
|
let tolerance = params.tolerance |? 0
|
||||||
|
|
||||||
|
without purchaseId =? await node.requestStorage(
|
||||||
|
cid,
|
||||||
|
params.duration,
|
||||||
|
params.proofProbability,
|
||||||
|
nodes,
|
||||||
|
tolerance,
|
||||||
|
params.reward,
|
||||||
|
params.collateral,
|
||||||
|
params.expiry), error:
|
||||||
|
|
||||||
|
return RestApiResponse.error(Http500, error.msg)
|
||||||
|
|
||||||
|
return RestApiResponse.response(purchaseId.toHex)
|
||||||
|
|
||||||
router.api(
|
router.api(
|
||||||
MethodGet,
|
MethodGet,
|
||||||
"/api/codex/v1/storage/purchases/{id}") do (
|
"/api/codex/v1/storage/purchases/{id}") do (
|
||||||
|
@ -404,4 +279,114 @@ proc initRestApi*(node: CodexNodeRef, conf: CodexConf): RestRouter =
|
||||||
|
|
||||||
return RestApiResponse.response($json, contentType="application/json")
|
return RestApiResponse.response($json, contentType="application/json")
|
||||||
|
|
||||||
|
proc initDebugApi(node: CodexNodeRef, conf: CodexConf, router: var RestRouter) =
|
||||||
|
router.api(
|
||||||
|
MethodGet,
|
||||||
|
"/api/codex/v1/debug/info") do () -> RestApiResponse:
|
||||||
|
## Print rudimentary node information
|
||||||
|
##
|
||||||
|
|
||||||
|
let table = RestRoutingTable.init(node.discovery.protocol.routingTable)
|
||||||
|
|
||||||
|
let
|
||||||
|
json = %*{
|
||||||
|
"id": $node.switch.peerInfo.peerId,
|
||||||
|
"addrs": node.switch.peerInfo.addrs.mapIt( $it ),
|
||||||
|
"repo": $conf.dataDir,
|
||||||
|
"spr":
|
||||||
|
if node.discovery.dhtRecord.isSome:
|
||||||
|
node.discovery.dhtRecord.get.toURI
|
||||||
|
else:
|
||||||
|
"",
|
||||||
|
"table": table,
|
||||||
|
"codex": {
|
||||||
|
"version": $codexVersion,
|
||||||
|
"revision": $codexRevision
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return RestApiResponse.response($json, contentType="application/json")
|
||||||
|
|
||||||
|
router.api(
|
||||||
|
MethodPost,
|
||||||
|
"/api/codex/v1/debug/chronicles/loglevel") do (
|
||||||
|
level: Option[string]) -> RestApiResponse:
|
||||||
|
## Set log level at run time
|
||||||
|
##
|
||||||
|
## e.g. `chronicles/loglevel?level=DEBUG`
|
||||||
|
##
|
||||||
|
## `level` - chronicles log level
|
||||||
|
##
|
||||||
|
|
||||||
|
without res =? level and level =? res:
|
||||||
|
return RestApiResponse.error(Http400, "Missing log level")
|
||||||
|
|
||||||
|
try:
|
||||||
|
{.gcsafe.}:
|
||||||
|
updateLogLevel(level)
|
||||||
|
except CatchableError as exc:
|
||||||
|
return RestApiResponse.error(Http500, exc.msg)
|
||||||
|
|
||||||
|
return RestApiResponse.response("")
|
||||||
|
|
||||||
|
when codex_enable_api_debug_peers:
|
||||||
|
router.api(
|
||||||
|
MethodGet,
|
||||||
|
"/api/codex/v1/debug/peer/{peerId}") do (peerId: PeerId) -> RestApiResponse:
|
||||||
|
|
||||||
|
trace "debug/peer start"
|
||||||
|
without peerRecord =? (await node.findPeer(peerId.get())):
|
||||||
|
trace "debug/peer peer not found!"
|
||||||
|
return RestApiResponse.error(
|
||||||
|
Http400,
|
||||||
|
"Unable to find Peer!")
|
||||||
|
|
||||||
|
let json = %RestPeerRecord.init(peerRecord)
|
||||||
|
trace "debug/peer returning peer record"
|
||||||
|
return RestApiResponse.response($json)
|
||||||
|
|
||||||
|
proc initRestApi*(node: CodexNodeRef, conf: CodexConf): RestRouter =
|
||||||
|
var router = RestRouter.init(validate)
|
||||||
|
|
||||||
|
initDataApi(node, router)
|
||||||
|
initSalesApi(node, router)
|
||||||
|
initPurchasingApi(node, router)
|
||||||
|
initDebugApi(node, conf, router)
|
||||||
|
|
||||||
|
router.api(
|
||||||
|
MethodGet,
|
||||||
|
"/api/codex/v1/connect/{peerId}") do (
|
||||||
|
peerId: PeerId,
|
||||||
|
addrs: seq[MultiAddress]) -> RestApiResponse:
|
||||||
|
## Connect to a peer
|
||||||
|
##
|
||||||
|
## If `addrs` param is supplied, it will be used to
|
||||||
|
## dial the peer, otherwise the `peerId` is used
|
||||||
|
## to invoke peer discovery, if it succeeds
|
||||||
|
## the returned addresses will be used to dial
|
||||||
|
##
|
||||||
|
## `addrs` the listening addresses of the peers to dial, eg the one specified with `--listen-addrs`
|
||||||
|
##
|
||||||
|
|
||||||
|
if peerId.isErr:
|
||||||
|
return RestApiResponse.error(
|
||||||
|
Http400,
|
||||||
|
$peerId.error())
|
||||||
|
|
||||||
|
let addresses = if addrs.isOk and addrs.get().len > 0:
|
||||||
|
addrs.get()
|
||||||
|
else:
|
||||||
|
without peerRecord =? (await node.findPeer(peerId.get())):
|
||||||
|
return RestApiResponse.error(
|
||||||
|
Http400,
|
||||||
|
"Unable to find Peer!")
|
||||||
|
peerRecord.addresses.mapIt(it.address)
|
||||||
|
try:
|
||||||
|
await node.connect(peerId.get(), addresses)
|
||||||
|
return RestApiResponse.response("Successfully connected to peer")
|
||||||
|
except DialFailedError:
|
||||||
|
return RestApiResponse.error(Http400, "Unable to dial peer")
|
||||||
|
except CatchableError:
|
||||||
|
return RestApiResponse.error(Http400, "Unknown error dialling peer")
|
||||||
|
|
||||||
return router
|
return router
|
||||||
|
|
|
@ -1,9 +1,14 @@
|
||||||
import pkg/questionable
|
import pkg/questionable
|
||||||
import pkg/questionable/results
|
import pkg/questionable/results
|
||||||
import pkg/stew/byteutils
|
import pkg/stew/byteutils
|
||||||
|
import pkg/libp2p
|
||||||
|
import pkg/codexdht/discv5/node as dn
|
||||||
|
import pkg/codexdht/discv5/routing_table as rt
|
||||||
import ../sales
|
import ../sales
|
||||||
import ../purchasing
|
import ../purchasing
|
||||||
import ../utils/json
|
import ../utils/json
|
||||||
|
import ../units
|
||||||
|
import ../manifest
|
||||||
|
|
||||||
export json
|
export json
|
||||||
|
|
||||||
|
@ -29,9 +34,77 @@ type
|
||||||
minPrice* {.serialize.}: UInt256
|
minPrice* {.serialize.}: UInt256
|
||||||
maxCollateral* {.serialize.}: UInt256
|
maxCollateral* {.serialize.}: UInt256
|
||||||
|
|
||||||
|
RestContent* = object
|
||||||
|
cid* {.serialize.}: Cid
|
||||||
|
manifest* {.serialize.}: Manifest
|
||||||
|
|
||||||
|
RestNode* = object
|
||||||
|
nodeId* {.serialize.}: NodeId
|
||||||
|
peerId* {.serialize.}: PeerId
|
||||||
|
record* {.serialize.}: SignedPeerRecord
|
||||||
|
address* {.serialize.}: Option[dn.Address]
|
||||||
|
seen* {.serialize.}: bool
|
||||||
|
|
||||||
|
RestRoutingTable* = object
|
||||||
|
localNode* {.serialize.}: RestNode
|
||||||
|
nodes* {.serialize.}: seq[RestNode]
|
||||||
|
|
||||||
|
RestPeerRecord* = object
|
||||||
|
peerId* {.serialize.}: PeerId
|
||||||
|
seqNo* {.serialize.}: uint64
|
||||||
|
addresses* {.serialize.}: seq[AddressInfo]
|
||||||
|
|
||||||
|
proc init*(_: type RestContent, cid: Cid, manifest: Manifest): RestContent =
|
||||||
|
RestContent(
|
||||||
|
cid: cid,
|
||||||
|
manifest: manifest
|
||||||
|
)
|
||||||
|
|
||||||
|
proc init*(_: type RestNode, node: dn.Node): RestNode =
|
||||||
|
RestNode(
|
||||||
|
nodeId: node.id,
|
||||||
|
peerId: node.record.data.peerId,
|
||||||
|
record: node.record,
|
||||||
|
address: node.address,
|
||||||
|
seen: node.seen
|
||||||
|
)
|
||||||
|
|
||||||
|
proc init*(_: type RestRoutingTable, routingTable: rt.RoutingTable): RestRoutingTable =
|
||||||
|
var nodes: seq[RestNode] = @[]
|
||||||
|
for bucket in routingTable.buckets:
|
||||||
|
for node in bucket.nodes:
|
||||||
|
nodes.add(RestNode.init(node))
|
||||||
|
|
||||||
|
RestRoutingTable(
|
||||||
|
localNode: RestNode.init(routingTable.localNode),
|
||||||
|
nodes: nodes
|
||||||
|
)
|
||||||
|
|
||||||
|
proc init*(_: type RestPeerRecord, peerRecord: PeerRecord): RestPeerRecord =
|
||||||
|
RestPeerRecord(
|
||||||
|
peerId: peerRecord.peerId,
|
||||||
|
seqNo: peerRecord.seqNo,
|
||||||
|
addresses: peerRecord.addresses
|
||||||
|
)
|
||||||
|
|
||||||
func `%`*(obj: StorageRequest | Slot): JsonNode =
|
func `%`*(obj: StorageRequest | Slot): JsonNode =
|
||||||
let jsonObj = newJObject()
|
let jsonObj = newJObject()
|
||||||
for k, v in obj.fieldPairs: jsonObj[k] = %v
|
for k, v in obj.fieldPairs: jsonObj[k] = %v
|
||||||
jsonObj["id"] = %(obj.id)
|
jsonObj["id"] = %(obj.id)
|
||||||
|
|
||||||
return jsonObj
|
return jsonObj
|
||||||
|
|
||||||
|
func `%`*(obj: Cid): JsonNode =
|
||||||
|
% $obj
|
||||||
|
|
||||||
|
func `%`*(obj: PeerId): JsonNode =
|
||||||
|
% $obj
|
||||||
|
|
||||||
|
func `%`*(obj: SignedPeerRecord): JsonNode =
|
||||||
|
% $obj
|
||||||
|
|
||||||
|
func `%`*(obj: dn.Address): JsonNode =
|
||||||
|
% $obj
|
||||||
|
|
||||||
|
func `%`*(obj: AddressInfo): JsonNode =
|
||||||
|
% $obj.address
|
||||||
|
|
|
@ -80,6 +80,11 @@ method ensureExpiry*(
|
||||||
|
|
||||||
return success()
|
return success()
|
||||||
|
|
||||||
|
method listBlocks*(
|
||||||
|
self: NetworkStore,
|
||||||
|
blockType = BlockType.Manifest): Future[?!BlocksIter] =
|
||||||
|
self.localStore.listBlocks(blockType)
|
||||||
|
|
||||||
method delBlock*(self: NetworkStore, cid: Cid): Future[?!void] =
|
method delBlock*(self: NetworkStore, cid: Cid): Future[?!void] =
|
||||||
## Delete a block from the blockstore
|
## Delete a block from the blockstore
|
||||||
##
|
##
|
||||||
|
|
|
@ -110,7 +110,7 @@ We're now ready to upload a file to the network. In this example we'll use node
|
||||||
Replace `<FILE PATH>` with the path to the file you want to upload in the following command:
|
Replace `<FILE PATH>` with the path to the file you want to upload in the following command:
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
curl -H "content-type: application/octet-stream" -H "Expect: 100-continue" -T "<FILE PATH>" 127.0.0.1:8080/api/codex/v1/upload -X POST
|
curl -H "content-type: application/octet-stream" -H "Expect: 100-continue" -T "<FILE PATH>" 127.0.0.1:8080/api/codex/v1/content -X POST
|
||||||
```
|
```
|
||||||
|
|
||||||
(Hint: if curl is reluctant to show you the response, add `-o <FILENAME>` to write the result to a file.)
|
(Hint: if curl is reluctant to show you the response, add `-o <FILENAME>` to write the result to a file.)
|
||||||
|
@ -122,7 +122,7 @@ Depending on the file size this may take a moment. Codex is processing the file
|
||||||
Replace `<CID>` with the identifier returned in the previous step. Replace `<OUTPUT FILE>` with the filename where you want to store the downloaded file.
|
Replace `<CID>` with the identifier returned in the previous step. Replace `<OUTPUT FILE>` with the filename where you want to store the downloaded file.
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
curl 127.0.0.1:8081/api/codex/v1/download/zdj7Wfm18wewSWL9SPqddhJuu5ii1TJD39rtt3JbVYdKcqM1K --output <OUTPUT FILE>
|
curl 127.0.0.1:8081/api/codex/v1/content/zdj7Wfm18wewSWL9SPqddhJuu5ii1TJD39rtt3JbVYdKcqM1K --output <OUTPUT FILE>
|
||||||
```
|
```
|
||||||
|
|
||||||
Notice we are connecting to the second node in order to download the file. The CID we provide contains the information needed to locate the file within the network.
|
Notice we are connecting to the second node in order to download the file. The CID we provide contains the information needed to locate the file within the network.
|
||||||
|
|
99
openapi.yaml
99
openapi.yaml
|
@ -205,6 +205,38 @@ components:
|
||||||
request:
|
request:
|
||||||
$ref: "#/components/schemas/StorageRequest"
|
$ref: "#/components/schemas/StorageRequest"
|
||||||
|
|
||||||
|
DataList:
|
||||||
|
type: object
|
||||||
|
properties:
|
||||||
|
content:
|
||||||
|
type: array
|
||||||
|
items:
|
||||||
|
$ref: "#/components/schemas/DataItem"
|
||||||
|
|
||||||
|
DataItem:
|
||||||
|
type: object
|
||||||
|
properties:
|
||||||
|
cid:
|
||||||
|
$ref: "#/components/schemas/Cid"
|
||||||
|
manifest:
|
||||||
|
$ref: "#/components/schemas/ManifestItem"
|
||||||
|
|
||||||
|
ManifestItem:
|
||||||
|
type: object
|
||||||
|
properties:
|
||||||
|
rootHash:
|
||||||
|
$ref: "#/components/schemas/Cid"
|
||||||
|
description: "Root hash of the content"
|
||||||
|
originalBytes:
|
||||||
|
type: number
|
||||||
|
description: "Length of original content in bytes"
|
||||||
|
blockSize:
|
||||||
|
type: number
|
||||||
|
description: "Size of blocks"
|
||||||
|
protected:
|
||||||
|
type: boolean
|
||||||
|
description: "Indicates if content is protected by erasure-coding"
|
||||||
|
|
||||||
servers:
|
servers:
|
||||||
- url: "http://localhost:8080/api/codex/v1"
|
- url: "http://localhost:8080/api/codex/v1"
|
||||||
|
|
||||||
|
@ -252,9 +284,51 @@ paths:
|
||||||
"400":
|
"400":
|
||||||
description: Peer either not found or was not possible to dial
|
description: Peer either not found or was not possible to dial
|
||||||
|
|
||||||
"/download/{cid}":
|
"/local":
|
||||||
get:
|
get:
|
||||||
summary: "Download a file from the node in a streaming manner"
|
summary: "Lists manifest CIDs stored locally in node."
|
||||||
|
tags: [ Data ]
|
||||||
|
operationId: listData
|
||||||
|
responses:
|
||||||
|
"200":
|
||||||
|
description: Retrieved list of content CIDs
|
||||||
|
content:
|
||||||
|
application/json:
|
||||||
|
schema:
|
||||||
|
type: array
|
||||||
|
items:
|
||||||
|
$ref: "#/components/schemas/DataList"
|
||||||
|
"400":
|
||||||
|
description: Invalid CID is specified
|
||||||
|
"404":
|
||||||
|
description: Content specified by the CID is not found
|
||||||
|
"500":
|
||||||
|
description: Well it was bad-bad
|
||||||
|
|
||||||
|
"/data":
|
||||||
|
post:
|
||||||
|
summary: "Upload a file in a streaming manner. Once finished, the file is stored in the node and can be retrieved by any node in the network using the returned CID."
|
||||||
|
tags: [ Data ]
|
||||||
|
operationId: upload
|
||||||
|
requestBody:
|
||||||
|
content:
|
||||||
|
application/octet-stream:
|
||||||
|
schema:
|
||||||
|
type: string
|
||||||
|
format: binary
|
||||||
|
responses:
|
||||||
|
"200":
|
||||||
|
description: CID of uploaded file
|
||||||
|
content:
|
||||||
|
text/plain:
|
||||||
|
schema:
|
||||||
|
type: string
|
||||||
|
"500":
|
||||||
|
description: Well it was bad-bad and the upload did not work out
|
||||||
|
|
||||||
|
"/data/{cid}":
|
||||||
|
get:
|
||||||
|
summary: "Download a file from the node in a streaming manner. If the file is not available locally, it will be retrieved from other nodes in the network if able."
|
||||||
tags: [ Data ]
|
tags: [ Data ]
|
||||||
operationId: download
|
operationId: download
|
||||||
parameters:
|
parameters:
|
||||||
|
@ -280,27 +354,6 @@ paths:
|
||||||
"500":
|
"500":
|
||||||
description: Well it was bad-bad
|
description: Well it was bad-bad
|
||||||
|
|
||||||
"/upload":
|
|
||||||
post:
|
|
||||||
summary: "Upload a file in a streaming manner"
|
|
||||||
tags: [ Data ]
|
|
||||||
operationId: upload
|
|
||||||
requestBody:
|
|
||||||
content:
|
|
||||||
application/octet-stream:
|
|
||||||
schema:
|
|
||||||
type: string
|
|
||||||
format: binary
|
|
||||||
responses:
|
|
||||||
"200":
|
|
||||||
description: CID of uploaded file
|
|
||||||
content:
|
|
||||||
text/plain:
|
|
||||||
schema:
|
|
||||||
type: string
|
|
||||||
"500":
|
|
||||||
description: Well it was bad-bad and the upload did not work out
|
|
||||||
|
|
||||||
"/sales/slots":
|
"/sales/slots":
|
||||||
get:
|
get:
|
||||||
summary: "Returns active slots"
|
summary: "Returns active slots"
|
||||||
|
|
|
@ -27,7 +27,7 @@ proc setLogLevel*(client: CodexClient, level: string) =
|
||||||
assert response.status == "200 OK"
|
assert response.status == "200 OK"
|
||||||
|
|
||||||
proc upload*(client: CodexClient, contents: string): ?!Cid =
|
proc upload*(client: CodexClient, contents: string): ?!Cid =
|
||||||
let response = client.http.post(client.baseurl & "/upload", contents)
|
let response = client.http.post(client.baseurl & "/data", contents)
|
||||||
assert response.status == "200 OK"
|
assert response.status == "200 OK"
|
||||||
Cid.init(response.body).mapFailure
|
Cid.init(response.body).mapFailure
|
||||||
|
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
import std/os
|
import std/os
|
||||||
import std/httpclient
|
import std/httpclient
|
||||||
|
import std/strutils
|
||||||
from std/net import TimeoutError
|
from std/net import TimeoutError
|
||||||
|
|
||||||
import pkg/chronos
|
import pkg/chronos
|
||||||
|
@ -37,7 +38,7 @@ ethersuite "Node block expiration tests":
|
||||||
|
|
||||||
proc uploadTestFile(): string =
|
proc uploadTestFile(): string =
|
||||||
let client = newHttpClient()
|
let client = newHttpClient()
|
||||||
let uploadUrl = baseurl & "/upload"
|
let uploadUrl = baseurl & "/data"
|
||||||
let uploadResponse = client.post(uploadUrl, content)
|
let uploadResponse = client.post(uploadUrl, content)
|
||||||
check uploadResponse.status == "200 OK"
|
check uploadResponse.status == "200 OK"
|
||||||
client.close()
|
client.close()
|
||||||
|
@ -45,11 +46,18 @@ ethersuite "Node block expiration tests":
|
||||||
|
|
||||||
proc downloadTestFile(contentId: string): Response =
|
proc downloadTestFile(contentId: string): Response =
|
||||||
let client = newHttpClient(timeout=3000)
|
let client = newHttpClient(timeout=3000)
|
||||||
let downloadUrl = baseurl & "/download/" & contentId
|
let downloadUrl = baseurl & "/data/" & contentId
|
||||||
let content = client.get(downloadUrl)
|
let content = client.get(downloadUrl)
|
||||||
client.close()
|
client.close()
|
||||||
content
|
content
|
||||||
|
|
||||||
|
proc hasFile(contentId: string): bool =
|
||||||
|
let client = newHttpClient(timeout=3000)
|
||||||
|
let dataLocalUrl = baseurl & "/local"
|
||||||
|
let content = client.get(dataLocalUrl)
|
||||||
|
client.close()
|
||||||
|
return content.body.contains(contentId)
|
||||||
|
|
||||||
test "node retains not-expired file":
|
test "node retains not-expired file":
|
||||||
startTestNode(blockTtlSeconds = 10)
|
startTestNode(blockTtlSeconds = 10)
|
||||||
|
|
||||||
|
@ -59,6 +67,7 @@ ethersuite "Node block expiration tests":
|
||||||
|
|
||||||
let response = downloadTestFile(contentId)
|
let response = downloadTestFile(contentId)
|
||||||
check:
|
check:
|
||||||
|
hasFile(contentId)
|
||||||
response.status == "200 OK"
|
response.status == "200 OK"
|
||||||
response.body == content
|
response.body == content
|
||||||
|
|
||||||
|
@ -69,5 +78,8 @@ ethersuite "Node block expiration tests":
|
||||||
|
|
||||||
await sleepAsync(3.seconds)
|
await sleepAsync(3.seconds)
|
||||||
|
|
||||||
|
check:
|
||||||
|
not hasFile(contentId)
|
||||||
|
|
||||||
expect TimeoutError:
|
expect TimeoutError:
|
||||||
discard downloadTestFile(contentId)
|
discard downloadTestFile(contentId)
|
||||||
|
|
Loading…
Reference in New Issue