mirror of
https://github.com/logos-storage/logos-storage-nim.git
synced 2026-01-14 03:18:17 +00:00
Merge branch 'master' into blockexchange-uses-merkle-tree
This commit is contained in:
commit
eceebbff8f
@ -129,7 +129,7 @@ proc bootstrapInteractions(
|
||||
return (client, host, validator)
|
||||
|
||||
proc start*(s: CodexServer) {.async.} =
|
||||
notice "Starting codex node"
|
||||
trace "Starting codex node", config = $s.config
|
||||
|
||||
await s.repoStore.start()
|
||||
s.maintenance.start()
|
||||
|
||||
@ -28,8 +28,11 @@ import pkg/metrics
|
||||
import pkg/metrics/chronos_httpserver
|
||||
import pkg/stew/shims/net as stewnet
|
||||
import pkg/stew/shims/parseutils
|
||||
import pkg/stew/byteutils
|
||||
import pkg/libp2p
|
||||
import pkg/ethers
|
||||
import pkg/questionable
|
||||
import pkg/questionable/results
|
||||
|
||||
import ./discovery
|
||||
import ./stores
|
||||
@ -260,6 +263,13 @@ type
|
||||
hidden
|
||||
.}: int
|
||||
|
||||
logFile* {.
|
||||
desc: "Logs to file"
|
||||
defaultValue: string.none
|
||||
name: "log-file"
|
||||
hidden
|
||||
.}: Option[string]
|
||||
|
||||
of initNode:
|
||||
discard
|
||||
|
||||
@ -445,9 +455,10 @@ proc updateLogLevel*(logLevel: string) {.upraises: [ValueError].} =
|
||||
warn "Unrecognized logging topic", topic = topicName
|
||||
|
||||
proc setupLogging*(conf: CodexConf) =
|
||||
when defaultChroniclesStream.outputs.type.arity != 2:
|
||||
when defaultChroniclesStream.outputs.type.arity != 3:
|
||||
warn "Logging configuration options not enabled in the current build"
|
||||
else:
|
||||
var logFile: ?IoHandle
|
||||
proc noOutput(logLevel: LogLevel, msg: LogOutputStr) = discard
|
||||
proc writeAndFlush(f: File, msg: LogOutputStr) =
|
||||
try:
|
||||
@ -462,6 +473,25 @@ proc setupLogging*(conf: CodexConf) =
|
||||
proc noColorsFlush(logLevel: LogLevel, msg: LogOutputStr) =
|
||||
writeAndFlush(stdout, stripAnsi(msg))
|
||||
|
||||
proc fileFlush(logLevel: LogLevel, msg: LogOutputStr) =
|
||||
if file =? logFile:
|
||||
if error =? file.writeFile(stripAnsi(msg).toBytes).errorOption:
|
||||
error "failed to write to log file", errorCode = $error
|
||||
|
||||
defaultChroniclesStream.outputs[2].writer = noOutput
|
||||
if logFilePath =? conf.logFile and logFilePath.len > 0:
|
||||
let logFileHandle = openFile(
|
||||
logFilePath,
|
||||
{OpenFlags.Write, OpenFlags.Create, OpenFlags.Truncate}
|
||||
)
|
||||
if logFileHandle.isErr:
|
||||
error "failed to open log file",
|
||||
path = logFilePath,
|
||||
errorCode = $logFileHandle.error
|
||||
else:
|
||||
logFile = logFileHandle.option
|
||||
defaultChroniclesStream.outputs[2].writer = fileFlush
|
||||
|
||||
defaultChroniclesStream.outputs[1].writer = noOutput
|
||||
|
||||
let writer =
|
||||
|
||||
@ -21,6 +21,7 @@ import pkg/chronicles
|
||||
|
||||
import ../errors
|
||||
import ../utils
|
||||
import ../utils/json
|
||||
import ../units
|
||||
import ../blocktype
|
||||
import ./types
|
||||
@ -29,17 +30,17 @@ export types
|
||||
|
||||
type
|
||||
Manifest* = ref object of RootObj
|
||||
treeCid: Cid # Root of the merkle tree
|
||||
datasetSize: NBytes # Total size of all blocks
|
||||
blockSize: NBytes # Size of each contained block (might not be needed if blocks are len-prefixed)
|
||||
version: CidVersion # Cid version
|
||||
hcodec: MultiCodec # Multihash codec
|
||||
codec: MultiCodec # Data set codec
|
||||
case protected: bool # Protected datasets have erasure coded info
|
||||
treeCid {.serialize.}: Cid # Root of the merkle tree
|
||||
datasetSize {.serialize.}: NBytes # Total size of all blocks
|
||||
blockSize {.serialize.}: NBytes # Size of each contained block (might not be needed if blocks are len-prefixed)
|
||||
version: CidVersion # Cid version
|
||||
hcodec: MultiCodec # Multihash codec
|
||||
codec: MultiCodec # Data set codec
|
||||
case protected {.serialize.}: bool # Protected datasets have erasure coded info
|
||||
of true:
|
||||
ecK: int # Number of blocks to encode
|
||||
ecM: int # Number of resulting parity blocks
|
||||
originalTreeCid: Cid # The original root of the dataset being erasure coded
|
||||
ecK: int # Number of blocks to encode
|
||||
ecM: int # Number of resulting parity blocks
|
||||
originalTreeCid: Cid # The original root of the dataset being erasure coded
|
||||
originalDatasetSize: NBytes
|
||||
else:
|
||||
discard
|
||||
|
||||
@ -63,6 +63,8 @@ type
|
||||
discovery*: Discovery
|
||||
contracts*: Contracts
|
||||
|
||||
OnManifest* = proc(cid: Cid, manifest: Manifest): void {.gcsafe, closure.}
|
||||
|
||||
proc findPeer*(
|
||||
node: CodexNodeRef,
|
||||
peerId: PeerId): Future[?PeerRecord] {.async.} =
|
||||
@ -267,6 +269,23 @@ proc store*(
|
||||
|
||||
return manifestBlk.cid.success
|
||||
|
||||
proc iterateManifests*(node: CodexNodeRef, onManifest: OnManifest) {.async.} =
|
||||
without cids =? await node.blockStore.listBlocks(BlockType.Manifest):
|
||||
warn "Failed to listBlocks"
|
||||
return
|
||||
|
||||
for c in cids:
|
||||
if cid =? await c:
|
||||
without blk =? await node.blockStore.getBlock(cid):
|
||||
warn "Failed to get manifest block by cid", cid
|
||||
return
|
||||
|
||||
without manifest =? Manifest.decode(blk):
|
||||
warn "Failed to decode manifest", cid
|
||||
return
|
||||
|
||||
onManifest(cid, manifest)
|
||||
|
||||
proc requestStorage*(
|
||||
self: CodexNodeRef,
|
||||
cid: Cid,
|
||||
|
||||
@ -28,8 +28,6 @@ import pkg/confutils
|
||||
import pkg/libp2p
|
||||
import pkg/libp2p/routing_record
|
||||
import pkg/codexdht/discv5/spr as spr
|
||||
import pkg/codexdht/discv5/routing_table as rt
|
||||
import pkg/codexdht/discv5/node as dn
|
||||
|
||||
import ../node
|
||||
import ../blocktype
|
||||
@ -52,173 +50,20 @@ proc validate(
|
||||
{.gcsafe, raises: [Defect].} =
|
||||
0
|
||||
|
||||
proc formatAddress(address: Option[dn.Address]): string =
|
||||
if address.isSome():
|
||||
return $address.get()
|
||||
return "<none>"
|
||||
proc formatManifestBlocks(node: CodexNodeRef): Future[JsonNode] {.async.} =
|
||||
var content: seq[RestContent] = @[]
|
||||
|
||||
proc formatNode(node: dn.Node): JsonNode =
|
||||
let jobj = %*{
|
||||
"nodeId": $node.id,
|
||||
"peerId": $node.record.data.peerId,
|
||||
"record": $node.record,
|
||||
"address": formatAddress(node.address),
|
||||
"seen": $node.seen
|
||||
}
|
||||
return jobj
|
||||
proc formatManifest(cid: Cid, manifest: Manifest) =
|
||||
let restContent = RestContent.init(cid, manifest)
|
||||
content.add(restContent)
|
||||
|
||||
proc formatTable(routingTable: rt.RoutingTable): JsonNode =
|
||||
let jarray = newJArray()
|
||||
for bucket in routingTable.buckets:
|
||||
for node in bucket.nodes:
|
||||
jarray.add(formatNode(node))
|
||||
|
||||
let jobj = %*{
|
||||
"localNode": formatNode(routingTable.localNode),
|
||||
"nodes": jarray
|
||||
}
|
||||
return jobj
|
||||
|
||||
proc formatPeerRecord(peerRecord: PeerRecord): JsonNode =
|
||||
let jarray = newJArray()
|
||||
for maddr in peerRecord.addresses:
|
||||
jarray.add(%*{
|
||||
"address": $maddr.address
|
||||
})
|
||||
|
||||
let jobj = %*{
|
||||
"peerId": $peerRecord.peerId,
|
||||
"seqNo": $peerRecord.seqNo,
|
||||
"addresses": jarray
|
||||
}
|
||||
return jobj
|
||||
|
||||
proc initRestApi*(node: CodexNodeRef, conf: CodexConf): RestRouter =
|
||||
var router = RestRouter.init(validate)
|
||||
router.api(
|
||||
MethodGet,
|
||||
"/api/codex/v1/connect/{peerId}") do (
|
||||
peerId: PeerId,
|
||||
addrs: seq[MultiAddress]) -> RestApiResponse:
|
||||
## Connect to a peer
|
||||
##
|
||||
## If `addrs` param is supplied, it will be used to
|
||||
## dial the peer, otherwise the `peerId` is used
|
||||
## to invoke peer discovery, if it succeeds
|
||||
## the returned addresses will be used to dial
|
||||
##
|
||||
## `addrs` the listening addresses of the peers to dial, eg the one specified with `--listen-addrs`
|
||||
##
|
||||
|
||||
if peerId.isErr:
|
||||
return RestApiResponse.error(
|
||||
Http400,
|
||||
$peerId.error())
|
||||
|
||||
let addresses = if addrs.isOk and addrs.get().len > 0:
|
||||
addrs.get()
|
||||
else:
|
||||
without peerRecord =? (await node.findPeer(peerId.get())):
|
||||
return RestApiResponse.error(
|
||||
Http400,
|
||||
"Unable to find Peer!")
|
||||
peerRecord.addresses.mapIt(it.address)
|
||||
try:
|
||||
await node.connect(peerId.get(), addresses)
|
||||
return RestApiResponse.response("Successfully connected to peer")
|
||||
except DialFailedError:
|
||||
return RestApiResponse.error(Http400, "Unable to dial peer")
|
||||
except CatchableError:
|
||||
return RestApiResponse.error(Http400, "Unknown error dialling peer")
|
||||
|
||||
router.api(
|
||||
MethodGet,
|
||||
"/api/codex/v1/download/{id}") do (
|
||||
id: Cid, resp: HttpResponseRef) -> RestApiResponse:
|
||||
## Download a file from the node in a streaming
|
||||
## manner
|
||||
##
|
||||
|
||||
if id.isErr:
|
||||
return RestApiResponse.error(
|
||||
Http400,
|
||||
$id.error())
|
||||
|
||||
var
|
||||
stream: LPStream
|
||||
|
||||
var bytes = 0
|
||||
try:
|
||||
without stream =? (await node.retrieve(id.get())), error:
|
||||
return RestApiResponse.error(Http404, error.msg)
|
||||
|
||||
resp.addHeader("Content-Type", "application/octet-stream")
|
||||
await resp.prepareChunked()
|
||||
|
||||
while not stream.atEof:
|
||||
var
|
||||
buff = newSeqUninitialized[byte](DefaultBlockSize.int)
|
||||
len = await stream.readOnce(addr buff[0], buff.len)
|
||||
|
||||
buff.setLen(len)
|
||||
if buff.len <= 0:
|
||||
break
|
||||
|
||||
bytes += buff.len
|
||||
trace "Sending chunk", size = buff.len
|
||||
await resp.sendChunk(addr buff[0], buff.len)
|
||||
await resp.finish()
|
||||
codex_api_downloads.inc()
|
||||
except CatchableError as exc:
|
||||
trace "Excepting streaming blocks", exc = exc.msg
|
||||
return RestApiResponse.error(Http500)
|
||||
finally:
|
||||
trace "Sent bytes", cid = id.get(), bytes
|
||||
if not stream.isNil:
|
||||
await stream.close()
|
||||
await node.iterateManifests(formatManifest)
|
||||
return %content
|
||||
|
||||
proc initDataApi(node: CodexNodeRef, router: var RestRouter) =
|
||||
router.rawApi(
|
||||
MethodPost,
|
||||
"/api/codex/v1/storage/request/{cid}") do (cid: Cid) -> RestApiResponse:
|
||||
## Create a request for storage
|
||||
##
|
||||
## cid - the cid of a previously uploaded dataset
|
||||
## duration - the duration of the request in seconds
|
||||
## proofProbability - how often storage proofs are required
|
||||
## reward - the maximum amount of tokens paid per second per slot to hosts the client is willing to pay
|
||||
## expiry - timestamp, in seconds, when the request expires if the Request does not find requested amount of nodes to host the data
|
||||
## nodes - minimal number of nodes the content should be stored on
|
||||
## tolerance - allowed number of nodes that can be lost before pronouncing the content lost
|
||||
## colateral - requested collateral from hosts when they fill slot
|
||||
|
||||
without cid =? cid.tryGet.catch, error:
|
||||
return RestApiResponse.error(Http400, error.msg)
|
||||
|
||||
let body = await request.getBody()
|
||||
|
||||
without params =? StorageRequestParams.fromJson(body), error:
|
||||
return RestApiResponse.error(Http400, error.msg)
|
||||
|
||||
let nodes = params.nodes |? 1
|
||||
let tolerance = params.tolerance |? 0
|
||||
|
||||
without purchaseId =? await node.requestStorage(
|
||||
cid,
|
||||
params.duration,
|
||||
params.proofProbability,
|
||||
nodes,
|
||||
tolerance,
|
||||
params.reward,
|
||||
params.collateral,
|
||||
params.expiry), error:
|
||||
|
||||
return RestApiResponse.error(Http500, error.msg)
|
||||
|
||||
return RestApiResponse.response(purchaseId.toHex)
|
||||
|
||||
router.rawApi(
|
||||
MethodPost,
|
||||
"/api/codex/v1/upload") do (
|
||||
"/api/codex/v1/data") do (
|
||||
) -> RestApiResponse:
|
||||
## Upload a file in a streaming manner
|
||||
##
|
||||
@ -259,68 +104,58 @@ proc initRestApi*(node: CodexNodeRef, conf: CodexConf): RestRouter =
|
||||
return RestApiResponse.error(Http500)
|
||||
|
||||
router.api(
|
||||
MethodPost,
|
||||
"/api/codex/v1/debug/chronicles/loglevel") do (
|
||||
level: Option[string]) -> RestApiResponse:
|
||||
## Set log level at run time
|
||||
##
|
||||
## e.g. `chronicles/loglevel?level=DEBUG`
|
||||
##
|
||||
## `level` - chronicles log level
|
||||
##
|
||||
|
||||
without res =? level and level =? res:
|
||||
return RestApiResponse.error(Http400, "Missing log level")
|
||||
|
||||
try:
|
||||
{.gcsafe.}:
|
||||
updateLogLevel(level)
|
||||
except CatchableError as exc:
|
||||
return RestApiResponse.error(Http500, exc.msg)
|
||||
|
||||
return RestApiResponse.response("")
|
||||
MethodGet,
|
||||
"/api/codex/v1/local") do () -> RestApiResponse:
|
||||
let json = await formatManifestBlocks(node)
|
||||
return RestApiResponse.response($json, contentType="application/json")
|
||||
|
||||
router.api(
|
||||
MethodGet,
|
||||
"/api/codex/v1/debug/info") do () -> RestApiResponse:
|
||||
## Print rudimentary node information
|
||||
"/api/codex/v1/data/{cid}") do (
|
||||
cid: Cid, resp: HttpResponseRef) -> RestApiResponse:
|
||||
## Download a file from the node in a streaming
|
||||
## manner
|
||||
##
|
||||
|
||||
let
|
||||
json = %*{
|
||||
"id": $node.switch.peerInfo.peerId,
|
||||
"addrs": node.switch.peerInfo.addrs.mapIt( $it ),
|
||||
"repo": $conf.dataDir,
|
||||
"spr":
|
||||
if node.discovery.dhtRecord.isSome:
|
||||
node.discovery.dhtRecord.get.toURI
|
||||
else:
|
||||
"",
|
||||
"table": formatTable(node.discovery.protocol.routingTable),
|
||||
"codex": {
|
||||
"version": $codexVersion,
|
||||
"revision": $codexRevision
|
||||
}
|
||||
}
|
||||
if cid.isErr:
|
||||
return RestApiResponse.error(
|
||||
Http400,
|
||||
$cid.error())
|
||||
|
||||
return RestApiResponse.response($json, contentType="application/json")
|
||||
var
|
||||
stream: LPStream
|
||||
|
||||
when codex_enable_api_debug_peers:
|
||||
router.api(
|
||||
MethodGet,
|
||||
"/api/codex/v1/debug/peer/{peerId}") do (peerId: PeerId) -> RestApiResponse:
|
||||
var bytes = 0
|
||||
try:
|
||||
without stream =? (await node.retrieve(cid.get())), error:
|
||||
return RestApiResponse.error(Http404, error.msg)
|
||||
|
||||
trace "debug/peer start"
|
||||
without peerRecord =? (await node.findPeer(peerId.get())):
|
||||
trace "debug/peer peer not found!"
|
||||
return RestApiResponse.error(
|
||||
Http400,
|
||||
"Unable to find Peer!")
|
||||
resp.addHeader("Content-Type", "application/octet-stream")
|
||||
await resp.prepareChunked()
|
||||
|
||||
let json = formatPeerRecord(peerRecord)
|
||||
trace "debug/peer returning peer record"
|
||||
return RestApiResponse.response($json)
|
||||
while not stream.atEof:
|
||||
var
|
||||
buff = newSeqUninitialized[byte](DefaultBlockSize.int)
|
||||
len = await stream.readOnce(addr buff[0], buff.len)
|
||||
|
||||
buff.setLen(len)
|
||||
if buff.len <= 0:
|
||||
break
|
||||
|
||||
bytes += buff.len
|
||||
trace "Sending chunk", size = buff.len
|
||||
await resp.sendChunk(addr buff[0], buff.len)
|
||||
await resp.finish()
|
||||
codex_api_downloads.inc()
|
||||
except CatchableError as exc:
|
||||
trace "Excepting streaming blocks", exc = exc.msg
|
||||
return RestApiResponse.error(Http500)
|
||||
finally:
|
||||
trace "Sent bytes", cid = cid.get(), bytes
|
||||
if not stream.isNil:
|
||||
await stream.close()
|
||||
|
||||
proc initSalesApi(node: CodexNodeRef, router: var RestRouter) =
|
||||
router.api(
|
||||
MethodGet,
|
||||
"/api/codex/v1/sales/slots") do () -> RestApiResponse:
|
||||
@ -381,6 +216,46 @@ proc initRestApi*(node: CodexNodeRef, conf: CodexConf): RestRouter =
|
||||
return RestApiResponse.response(availability.toJson,
|
||||
contentType="application/json")
|
||||
|
||||
proc initPurchasingApi(node: CodexNodeRef, router: var RestRouter) =
|
||||
router.rawApi(
|
||||
MethodPost,
|
||||
"/api/codex/v1/storage/request/{cid}") do (cid: Cid) -> RestApiResponse:
|
||||
## Create a request for storage
|
||||
##
|
||||
## cid - the cid of a previously uploaded dataset
|
||||
## duration - the duration of the request in seconds
|
||||
## proofProbability - how often storage proofs are required
|
||||
## reward - the maximum amount of tokens paid per second per slot to hosts the client is willing to pay
|
||||
## expiry - timestamp, in seconds, when the request expires if the Request does not find requested amount of nodes to host the data
|
||||
## nodes - minimal number of nodes the content should be stored on
|
||||
## tolerance - allowed number of nodes that can be lost before pronouncing the content lost
|
||||
## colateral - requested collateral from hosts when they fill slot
|
||||
|
||||
without cid =? cid.tryGet.catch, error:
|
||||
return RestApiResponse.error(Http400, error.msg)
|
||||
|
||||
let body = await request.getBody()
|
||||
|
||||
without params =? StorageRequestParams.fromJson(body), error:
|
||||
return RestApiResponse.error(Http400, error.msg)
|
||||
|
||||
let nodes = params.nodes |? 1
|
||||
let tolerance = params.tolerance |? 0
|
||||
|
||||
without purchaseId =? await node.requestStorage(
|
||||
cid,
|
||||
params.duration,
|
||||
params.proofProbability,
|
||||
nodes,
|
||||
tolerance,
|
||||
params.reward,
|
||||
params.collateral,
|
||||
params.expiry), error:
|
||||
|
||||
return RestApiResponse.error(Http500, error.msg)
|
||||
|
||||
return RestApiResponse.response(purchaseId.toHex)
|
||||
|
||||
router.api(
|
||||
MethodGet,
|
||||
"/api/codex/v1/storage/purchases/{id}") do (
|
||||
@ -404,4 +279,114 @@ proc initRestApi*(node: CodexNodeRef, conf: CodexConf): RestRouter =
|
||||
|
||||
return RestApiResponse.response($json, contentType="application/json")
|
||||
|
||||
proc initDebugApi(node: CodexNodeRef, conf: CodexConf, router: var RestRouter) =
|
||||
router.api(
|
||||
MethodGet,
|
||||
"/api/codex/v1/debug/info") do () -> RestApiResponse:
|
||||
## Print rudimentary node information
|
||||
##
|
||||
|
||||
let table = RestRoutingTable.init(node.discovery.protocol.routingTable)
|
||||
|
||||
let
|
||||
json = %*{
|
||||
"id": $node.switch.peerInfo.peerId,
|
||||
"addrs": node.switch.peerInfo.addrs.mapIt( $it ),
|
||||
"repo": $conf.dataDir,
|
||||
"spr":
|
||||
if node.discovery.dhtRecord.isSome:
|
||||
node.discovery.dhtRecord.get.toURI
|
||||
else:
|
||||
"",
|
||||
"table": table,
|
||||
"codex": {
|
||||
"version": $codexVersion,
|
||||
"revision": $codexRevision
|
||||
}
|
||||
}
|
||||
|
||||
return RestApiResponse.response($json, contentType="application/json")
|
||||
|
||||
router.api(
|
||||
MethodPost,
|
||||
"/api/codex/v1/debug/chronicles/loglevel") do (
|
||||
level: Option[string]) -> RestApiResponse:
|
||||
## Set log level at run time
|
||||
##
|
||||
## e.g. `chronicles/loglevel?level=DEBUG`
|
||||
##
|
||||
## `level` - chronicles log level
|
||||
##
|
||||
|
||||
without res =? level and level =? res:
|
||||
return RestApiResponse.error(Http400, "Missing log level")
|
||||
|
||||
try:
|
||||
{.gcsafe.}:
|
||||
updateLogLevel(level)
|
||||
except CatchableError as exc:
|
||||
return RestApiResponse.error(Http500, exc.msg)
|
||||
|
||||
return RestApiResponse.response("")
|
||||
|
||||
when codex_enable_api_debug_peers:
|
||||
router.api(
|
||||
MethodGet,
|
||||
"/api/codex/v1/debug/peer/{peerId}") do (peerId: PeerId) -> RestApiResponse:
|
||||
|
||||
trace "debug/peer start"
|
||||
without peerRecord =? (await node.findPeer(peerId.get())):
|
||||
trace "debug/peer peer not found!"
|
||||
return RestApiResponse.error(
|
||||
Http400,
|
||||
"Unable to find Peer!")
|
||||
|
||||
let json = %RestPeerRecord.init(peerRecord)
|
||||
trace "debug/peer returning peer record"
|
||||
return RestApiResponse.response($json)
|
||||
|
||||
proc initRestApi*(node: CodexNodeRef, conf: CodexConf): RestRouter =
|
||||
var router = RestRouter.init(validate)
|
||||
|
||||
initDataApi(node, router)
|
||||
initSalesApi(node, router)
|
||||
initPurchasingApi(node, router)
|
||||
initDebugApi(node, conf, router)
|
||||
|
||||
router.api(
|
||||
MethodGet,
|
||||
"/api/codex/v1/connect/{peerId}") do (
|
||||
peerId: PeerId,
|
||||
addrs: seq[MultiAddress]) -> RestApiResponse:
|
||||
## Connect to a peer
|
||||
##
|
||||
## If `addrs` param is supplied, it will be used to
|
||||
## dial the peer, otherwise the `peerId` is used
|
||||
## to invoke peer discovery, if it succeeds
|
||||
## the returned addresses will be used to dial
|
||||
##
|
||||
## `addrs` the listening addresses of the peers to dial, eg the one specified with `--listen-addrs`
|
||||
##
|
||||
|
||||
if peerId.isErr:
|
||||
return RestApiResponse.error(
|
||||
Http400,
|
||||
$peerId.error())
|
||||
|
||||
let addresses = if addrs.isOk and addrs.get().len > 0:
|
||||
addrs.get()
|
||||
else:
|
||||
without peerRecord =? (await node.findPeer(peerId.get())):
|
||||
return RestApiResponse.error(
|
||||
Http400,
|
||||
"Unable to find Peer!")
|
||||
peerRecord.addresses.mapIt(it.address)
|
||||
try:
|
||||
await node.connect(peerId.get(), addresses)
|
||||
return RestApiResponse.response("Successfully connected to peer")
|
||||
except DialFailedError:
|
||||
return RestApiResponse.error(Http400, "Unable to dial peer")
|
||||
except CatchableError:
|
||||
return RestApiResponse.error(Http400, "Unknown error dialling peer")
|
||||
|
||||
return router
|
||||
|
||||
@ -1,9 +1,14 @@
|
||||
import pkg/questionable
|
||||
import pkg/questionable/results
|
||||
import pkg/stew/byteutils
|
||||
import pkg/libp2p
|
||||
import pkg/codexdht/discv5/node as dn
|
||||
import pkg/codexdht/discv5/routing_table as rt
|
||||
import ../sales
|
||||
import ../purchasing
|
||||
import ../utils/json
|
||||
import ../units
|
||||
import ../manifest
|
||||
|
||||
export json
|
||||
|
||||
@ -29,9 +34,77 @@ type
|
||||
minPrice* {.serialize.}: UInt256
|
||||
maxCollateral* {.serialize.}: UInt256
|
||||
|
||||
RestContent* = object
|
||||
cid* {.serialize.}: Cid
|
||||
manifest* {.serialize.}: Manifest
|
||||
|
||||
RestNode* = object
|
||||
nodeId* {.serialize.}: NodeId
|
||||
peerId* {.serialize.}: PeerId
|
||||
record* {.serialize.}: SignedPeerRecord
|
||||
address* {.serialize.}: Option[dn.Address]
|
||||
seen* {.serialize.}: bool
|
||||
|
||||
RestRoutingTable* = object
|
||||
localNode* {.serialize.}: RestNode
|
||||
nodes* {.serialize.}: seq[RestNode]
|
||||
|
||||
RestPeerRecord* = object
|
||||
peerId* {.serialize.}: PeerId
|
||||
seqNo* {.serialize.}: uint64
|
||||
addresses* {.serialize.}: seq[AddressInfo]
|
||||
|
||||
proc init*(_: type RestContent, cid: Cid, manifest: Manifest): RestContent =
|
||||
RestContent(
|
||||
cid: cid,
|
||||
manifest: manifest
|
||||
)
|
||||
|
||||
proc init*(_: type RestNode, node: dn.Node): RestNode =
|
||||
RestNode(
|
||||
nodeId: node.id,
|
||||
peerId: node.record.data.peerId,
|
||||
record: node.record,
|
||||
address: node.address,
|
||||
seen: node.seen
|
||||
)
|
||||
|
||||
proc init*(_: type RestRoutingTable, routingTable: rt.RoutingTable): RestRoutingTable =
|
||||
var nodes: seq[RestNode] = @[]
|
||||
for bucket in routingTable.buckets:
|
||||
for node in bucket.nodes:
|
||||
nodes.add(RestNode.init(node))
|
||||
|
||||
RestRoutingTable(
|
||||
localNode: RestNode.init(routingTable.localNode),
|
||||
nodes: nodes
|
||||
)
|
||||
|
||||
proc init*(_: type RestPeerRecord, peerRecord: PeerRecord): RestPeerRecord =
|
||||
RestPeerRecord(
|
||||
peerId: peerRecord.peerId,
|
||||
seqNo: peerRecord.seqNo,
|
||||
addresses: peerRecord.addresses
|
||||
)
|
||||
|
||||
func `%`*(obj: StorageRequest | Slot): JsonNode =
|
||||
let jsonObj = newJObject()
|
||||
for k, v in obj.fieldPairs: jsonObj[k] = %v
|
||||
jsonObj["id"] = %(obj.id)
|
||||
|
||||
return jsonObj
|
||||
|
||||
func `%`*(obj: Cid): JsonNode =
|
||||
% $obj
|
||||
|
||||
func `%`*(obj: PeerId): JsonNode =
|
||||
% $obj
|
||||
|
||||
func `%`*(obj: SignedPeerRecord): JsonNode =
|
||||
% $obj
|
||||
|
||||
func `%`*(obj: dn.Address): JsonNode =
|
||||
% $obj
|
||||
|
||||
func `%`*(obj: AddressInfo): JsonNode =
|
||||
% $obj.address
|
||||
|
||||
@ -108,6 +108,11 @@ method ensureExpiry*(
|
||||
|
||||
return success()
|
||||
|
||||
method listBlocks*(
|
||||
self: NetworkStore,
|
||||
blockType = BlockType.Manifest): Future[?!AsyncIter[?Cid]] =
|
||||
self.localStore.listBlocks(blockType)
|
||||
|
||||
method delBlock*(self: NetworkStore, cid: Cid): Future[?!void] =
|
||||
## Delete a block from the blockstore
|
||||
##
|
||||
|
||||
@ -111,7 +111,7 @@ switch("define", "libp2p_pki_schemes=secp256k1")
|
||||
#TODO this infects everything in this folder, ideally it would only
|
||||
# apply to codex.nim, but since codex.nims is used for other purpose
|
||||
# we can't use it. And codex.cfg doesn't work
|
||||
switch("define", "chronicles_sinks=textlines[dynamic],json[dynamic]")
|
||||
switch("define", "chronicles_sinks=textlines[dynamic],json[dynamic],textlines[dynamic]")
|
||||
|
||||
# begin Nimble config (version 1)
|
||||
when system.fileExists("nimble.paths"):
|
||||
|
||||
@ -110,7 +110,7 @@ We're now ready to upload a file to the network. In this example we'll use node
|
||||
Replace `<FILE PATH>` with the path to the file you want to upload in the following command:
|
||||
|
||||
```bash
|
||||
curl -H "content-type: application/octet-stream" -H "Expect: 100-continue" -T "<FILE PATH>" 127.0.0.1:8080/api/codex/v1/upload -X POST
|
||||
curl -H "content-type: application/octet-stream" -H "Expect: 100-continue" -T "<FILE PATH>" 127.0.0.1:8080/api/codex/v1/content -X POST
|
||||
```
|
||||
|
||||
(Hint: if curl is reluctant to show you the response, add `-o <FILENAME>` to write the result to a file.)
|
||||
@ -122,7 +122,7 @@ Depending on the file size this may take a moment. Codex is processing the file
|
||||
Replace `<CID>` with the identifier returned in the previous step. Replace `<OUTPUT FILE>` with the filename where you want to store the downloaded file.
|
||||
|
||||
```bash
|
||||
curl 127.0.0.1:8081/api/codex/v1/download/zdj7Wfm18wewSWL9SPqddhJuu5ii1TJD39rtt3JbVYdKcqM1K --output <OUTPUT FILE>
|
||||
curl 127.0.0.1:8081/api/codex/v1/content/zdj7Wfm18wewSWL9SPqddhJuu5ii1TJD39rtt3JbVYdKcqM1K --output <OUTPUT FILE>
|
||||
```
|
||||
|
||||
Notice we are connecting to the second node in order to download the file. The CID we provide contains the information needed to locate the file within the network.
|
||||
|
||||
99
openapi.yaml
99
openapi.yaml
@ -205,6 +205,38 @@ components:
|
||||
request:
|
||||
$ref: "#/components/schemas/StorageRequest"
|
||||
|
||||
DataList:
|
||||
type: object
|
||||
properties:
|
||||
content:
|
||||
type: array
|
||||
items:
|
||||
$ref: "#/components/schemas/DataItem"
|
||||
|
||||
DataItem:
|
||||
type: object
|
||||
properties:
|
||||
cid:
|
||||
$ref: "#/components/schemas/Cid"
|
||||
manifest:
|
||||
$ref: "#/components/schemas/ManifestItem"
|
||||
|
||||
ManifestItem:
|
||||
type: object
|
||||
properties:
|
||||
rootHash:
|
||||
$ref: "#/components/schemas/Cid"
|
||||
description: "Root hash of the content"
|
||||
originalBytes:
|
||||
type: number
|
||||
description: "Length of original content in bytes"
|
||||
blockSize:
|
||||
type: number
|
||||
description: "Size of blocks"
|
||||
protected:
|
||||
type: boolean
|
||||
description: "Indicates if content is protected by erasure-coding"
|
||||
|
||||
servers:
|
||||
- url: "http://localhost:8080/api/codex/v1"
|
||||
|
||||
@ -252,9 +284,51 @@ paths:
|
||||
"400":
|
||||
description: Peer either not found or was not possible to dial
|
||||
|
||||
"/download/{cid}":
|
||||
"/local":
|
||||
get:
|
||||
summary: "Download a file from the node in a streaming manner"
|
||||
summary: "Lists manifest CIDs stored locally in node."
|
||||
tags: [ Data ]
|
||||
operationId: listData
|
||||
responses:
|
||||
"200":
|
||||
description: Retrieved list of content CIDs
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
type: array
|
||||
items:
|
||||
$ref: "#/components/schemas/DataList"
|
||||
"400":
|
||||
description: Invalid CID is specified
|
||||
"404":
|
||||
description: Content specified by the CID is not found
|
||||
"500":
|
||||
description: Well it was bad-bad
|
||||
|
||||
"/data":
|
||||
post:
|
||||
summary: "Upload a file in a streaming manner. Once finished, the file is stored in the node and can be retrieved by any node in the network using the returned CID."
|
||||
tags: [ Data ]
|
||||
operationId: upload
|
||||
requestBody:
|
||||
content:
|
||||
application/octet-stream:
|
||||
schema:
|
||||
type: string
|
||||
format: binary
|
||||
responses:
|
||||
"200":
|
||||
description: CID of uploaded file
|
||||
content:
|
||||
text/plain:
|
||||
schema:
|
||||
type: string
|
||||
"500":
|
||||
description: Well it was bad-bad and the upload did not work out
|
||||
|
||||
"/data/{cid}":
|
||||
get:
|
||||
summary: "Download a file from the node in a streaming manner. If the file is not available locally, it will be retrieved from other nodes in the network if able."
|
||||
tags: [ Data ]
|
||||
operationId: download
|
||||
parameters:
|
||||
@ -280,27 +354,6 @@ paths:
|
||||
"500":
|
||||
description: Well it was bad-bad
|
||||
|
||||
"/upload":
|
||||
post:
|
||||
summary: "Upload a file in a streaming manner"
|
||||
tags: [ Data ]
|
||||
operationId: upload
|
||||
requestBody:
|
||||
content:
|
||||
application/octet-stream:
|
||||
schema:
|
||||
type: string
|
||||
format: binary
|
||||
responses:
|
||||
"200":
|
||||
description: CID of uploaded file
|
||||
content:
|
||||
text/plain:
|
||||
schema:
|
||||
type: string
|
||||
"500":
|
||||
description: Well it was bad-bad and the upload did not work out
|
||||
|
||||
"/sales/slots":
|
||||
get:
|
||||
summary: "Returns active slots"
|
||||
|
||||
@ -68,7 +68,8 @@ ethersuite "Marketplace contracts":
|
||||
switchAccount(host)
|
||||
await waitUntilProofRequired(slotId)
|
||||
let missingPeriod = periodicity.periodOf(await provider.currentTime())
|
||||
await provider.advanceTime(periodicity.seconds)
|
||||
let endOfPeriod = periodicity.periodEnd(missingPeriod)
|
||||
await provider.advanceTimeTo(endOfPeriod + 1)
|
||||
switchAccount(client)
|
||||
await marketplace.markProofAsMissing(slotId, missingPeriod)
|
||||
|
||||
@ -77,7 +78,7 @@ ethersuite "Marketplace contracts":
|
||||
let address = await host.getAddress()
|
||||
await startContract()
|
||||
let requestEnd = await marketplace.requestEnd(request.id)
|
||||
await provider.advanceTimeTo(requestEnd.u256)
|
||||
await provider.advanceTimeTo(requestEnd.u256 + 1)
|
||||
let startBalance = await token.balanceOf(address)
|
||||
await marketplace.freeSlot(slotId)
|
||||
let endBalance = await token.balanceOf(address)
|
||||
|
||||
@ -70,7 +70,7 @@ ethersuite "On-Chain Market":
|
||||
|
||||
test "supports withdrawing of funds":
|
||||
await market.requestStorage(request)
|
||||
await provider.advanceTimeTo(request.expiry)
|
||||
await provider.advanceTimeTo(request.expiry + 1)
|
||||
await market.withdrawFunds(request.id)
|
||||
|
||||
test "supports request subscriptions":
|
||||
@ -213,7 +213,7 @@ ethersuite "On-Chain Market":
|
||||
receivedIds.add(id)
|
||||
let subscription = await market.subscribeRequestCancelled(request.id, onRequestCancelled)
|
||||
|
||||
await provider.advanceTimeTo(request.expiry)
|
||||
await provider.advanceTimeTo(request.expiry + 1)
|
||||
await market.withdrawFunds(request.id)
|
||||
check receivedIds == @[request.id]
|
||||
await subscription.unsubscribe()
|
||||
@ -252,7 +252,7 @@ ethersuite "On-Chain Market":
|
||||
receivedIds.add(requestId)
|
||||
|
||||
let subscription = await market.subscribeRequestCancelled(request.id, onRequestCancelled)
|
||||
await provider.advanceTimeTo(request.expiry) # shares expiry with otherRequest
|
||||
await provider.advanceTimeTo(request.expiry + 1) # shares expiry with otherRequest
|
||||
await market.withdrawFunds(otherRequest.id)
|
||||
check receivedIds.len == 0
|
||||
await market.withdrawFunds(request.id)
|
||||
|
||||
@ -27,7 +27,7 @@ proc setLogLevel*(client: CodexClient, level: string) =
|
||||
assert response.status == "200 OK"
|
||||
|
||||
proc upload*(client: CodexClient, contents: string): ?!Cid =
|
||||
let response = client.http.post(client.baseurl & "/upload", contents)
|
||||
let response = client.http.post(client.baseurl & "/data", contents)
|
||||
assert response.status == "200 OK"
|
||||
Cid.init(response.body).mapFailure
|
||||
|
||||
|
||||
@ -1,5 +1,6 @@
|
||||
import std/os
|
||||
import std/httpclient
|
||||
import std/strutils
|
||||
from std/net import TimeoutError
|
||||
|
||||
import pkg/chronos
|
||||
@ -37,7 +38,7 @@ ethersuite "Node block expiration tests":
|
||||
|
||||
proc uploadTestFile(): string =
|
||||
let client = newHttpClient()
|
||||
let uploadUrl = baseurl & "/upload"
|
||||
let uploadUrl = baseurl & "/data"
|
||||
let uploadResponse = client.post(uploadUrl, content)
|
||||
check uploadResponse.status == "200 OK"
|
||||
client.close()
|
||||
@ -45,11 +46,18 @@ ethersuite "Node block expiration tests":
|
||||
|
||||
proc downloadTestFile(contentId: string): Response =
|
||||
let client = newHttpClient(timeout=3000)
|
||||
let downloadUrl = baseurl & "/download/" & contentId
|
||||
let downloadUrl = baseurl & "/data/" & contentId
|
||||
let content = client.get(downloadUrl)
|
||||
client.close()
|
||||
content
|
||||
|
||||
proc hasFile(contentId: string): bool =
|
||||
let client = newHttpClient(timeout=3000)
|
||||
let dataLocalUrl = baseurl & "/local"
|
||||
let content = client.get(dataLocalUrl)
|
||||
client.close()
|
||||
return content.body.contains(contentId)
|
||||
|
||||
test "node retains not-expired file":
|
||||
startTestNode(blockTtlSeconds = 10)
|
||||
|
||||
@ -59,6 +67,7 @@ ethersuite "Node block expiration tests":
|
||||
|
||||
let response = downloadTestFile(contentId)
|
||||
check:
|
||||
hasFile(contentId)
|
||||
response.status == "200 OK"
|
||||
response.body == content
|
||||
|
||||
@ -69,5 +78,8 @@ ethersuite "Node block expiration tests":
|
||||
|
||||
await sleepAsync(3.seconds)
|
||||
|
||||
check:
|
||||
not hasFile(contentId)
|
||||
|
||||
expect TimeoutError:
|
||||
discard downloadTestFile(contentId)
|
||||
|
||||
2
vendor/codex-contracts-eth
vendored
2
vendor/codex-contracts-eth
vendored
@ -1 +1 @@
|
||||
Subproject commit 14e453ac3150e6c9ca277e605d5df9389ac7eea7
|
||||
Subproject commit 1854dfba9991a25532de5f6a53cf50e66afb3c8b
|
||||
Loading…
x
Reference in New Issue
Block a user