feat: add c binding (#1322)

Co-authored-by: Eric <5089238+emizzle@users.noreply.github.com>
Arnaud 2025-11-13 11:34:09 +04:00 committed by GitHub
parent be759baf4d
commit bd36032251
32 changed files with 3862 additions and 72 deletions


@ -232,6 +232,7 @@ format:
$(NPH) *.nim $(NPH) *.nim
$(NPH) codex/ $(NPH) codex/
$(NPH) tests/ $(NPH) tests/
$(NPH) library/
clean-nph: clean-nph:
rm -f $(NPH) rm -f $(NPH)
@ -242,4 +243,32 @@ print-nph-path:
clean: | clean-nph clean: | clean-nph
################
## C Bindings ##
################
.PHONY: libcodex
STATIC ?= 0
ifneq ($(strip $(CODEX_LIB_PARAMS)),)
NIM_PARAMS := $(NIM_PARAMS) $(CODEX_LIB_PARAMS)
endif
libcodex:
$(MAKE) deps
rm -f build/libcodex*
ifeq ($(STATIC), 1)
echo -e $(BUILD_MSG) "build/$@.a" && \
$(ENV_SCRIPT) nim libcodexStatic $(NIM_PARAMS) -d:LeopardCmakeFlags="\"-DCMAKE_POSITION_INDEPENDENT_CODE=ON -DCMAKE_BUILD_TYPE=Release\"" codex.nims
else ifeq ($(detected_OS),Windows)
echo -e $(BUILD_MSG) "build/$@.dll" && \
$(ENV_SCRIPT) nim libcodexDynamic $(NIM_PARAMS) -d:LeopardCmakeFlags="\"-G \\\"MSYS Makefiles\\\" -DCMAKE_BUILD_TYPE=Release\"" codex.nims
else ifeq ($(detected_OS),macOS)
echo -e $(BUILD_MSG) "build/$@.dylib" && \
$(ENV_SCRIPT) nim libcodexDynamic $(NIM_PARAMS) -d:LeopardCmakeFlags="\"-DCMAKE_POSITION_INDEPENDENT_CODE=ON -DCMAKE_BUILD_TYPE=Release\"" codex.nims
else
echo -e $(BUILD_MSG) "build/$@.so" && \
$(ENV_SCRIPT) nim libcodexDynamic $(NIM_PARAMS) -d:LeopardCmakeFlags="\"-DCMAKE_POSITION_INDEPENDENT_CODE=ON -DCMAKE_BUILD_TYPE=Release\"" codex.nims
endif
endif # "variables.mk" was not included endif # "variables.mk" was not included


@ -53,6 +53,56 @@ To get acquainted with Codex, consider:
The client exposes a REST API that can be used to interact with the clients. Overview of the API can be found on [api.codex.storage](https://api.codex.storage). The client exposes a REST API that can be used to interact with the clients. Overview of the API can be found on [api.codex.storage](https://api.codex.storage).
## Bindings
Codex provides a C API that can be wrapped by other languages. The bindings are located in the `library` folder.
Currently, only a Go binding is included.
### Build the C library
```bash
make libcodex
```
This produces the shared library under `build/`.
### Run the Go example
Build the Go example:
```bash
go build -o codex-go examples/golang/codex.go
```
Export the library path:
```bash
export LD_LIBRARY_PATH=build
```
Run the example:
```bash
./codex-go
```
### Static vs Dynamic build
By default, Codex builds a dynamic library (`libcodex.so`), which you can load at runtime.
If you prefer a static library (`libcodex.a`), set the `STATIC` flag:
```bash
# Build dynamic (default)
make libcodex
# Build static
make STATIC=1 libcodex
```
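When linking dynamically, the Go example resolves `libcodex` at build time through its cgo preamble. The directives below are a trimmed excerpt from `examples/golang/codex.go` (paths are relative to that file):

```go
/*
// Locate the library produced by `make libcodex` (relative to examples/golang/).
#cgo LDFLAGS: -L../../build/ -lcodex
#include "../../library/libcodex.h"
*/
import "C"
```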
### Limitation
Callbacks must be fast and non-blocking; otherwise, the working thread will hang and prevent other requests from being processed.
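For example, with the Go wrapper from `examples/golang/codex.go`, an upload progress callback can stay non-blocking by only pushing updates into a buffered channel and letting a separate goroutine do the slow work. This is a minimal sketch that assumes it sits next to that example (same package, which already imports `io` and `log`):

```go
// Minimal sketch: OnProgress only forwards the running total to a buffered
// channel; a separate goroutine does the slow work (logging here), so the
// libcodex worker thread is never blocked.
func uploadWithProgress(node *CodexNode, r io.Reader) (string, error) {
	progress := make(chan int, 64)
	done := make(chan struct{})
	go func() {
		defer close(done)
		for total := range progress {
			log.Printf("uploaded %d bytes so far", total)
		}
	}()

	cid, err := node.UploadReader(UploadOptions{
		Filepath: "hello.txt",
		OnProgress: func(read, total int, percent float64, cbErr error) {
			select {
			case progress <- total: // never block inside the callback
			default: // drop the update rather than stall the worker thread
			}
		},
	}, r)

	close(progress) // no more callbacks after UploadReader returns
	<-done          // wait for the logging goroutine to drain
	return cid, err
}
```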
## Contributing and development ## Contributing and development
Feel free to dive in, contributions are welcomed! Open an issue or submit PRs. Feel free to dive in, contributions are welcomed! Open an issue or submit PRs.


@ -25,6 +25,30 @@ proc buildBinary(name: string, srcDir = "./", params = "", lang = "c") =
exec(cmd) exec(cmd)
proc buildLibrary(name: string, srcDir = "./", params = "", `type` = "dynamic") =
if not dirExists "build":
mkDir "build"
if `type` == "dynamic":
let lib_name = (
when defined(windows): name & ".dll"
elif defined(macosx): name & ".dylib"
else: name & ".so"
)
exec "nim c" & " --out:build/" & lib_name &
" --threads:on --app:lib --opt:size --noMain --mm:refc --header --d:metrics " &
"--nimMainPrefix:libcodex -d:noSignalHandler " &
"-d:LeopardExtraCompilerFlags=-fPIC " & "-d:chronicles_runtime_filtering " &
"-d:chronicles_log_level=TRACE " & params & " " & srcDir & name & ".nim"
else:
exec "nim c" & " --out:build/" & name &
".a --threads:on --app:staticlib --opt:size --noMain --mm:refc --header --d:metrics " &
"--nimMainPrefix:libcodex -d:noSignalHandler " &
"-d:LeopardExtraCompilerFlags=-fPIC " &
"-d:chronicles_runtime_filtering " &
"-d:chronicles_log_level=TRACE " &
params & " " & srcDir & name & ".nim"
proc test(name: string, srcDir = "tests/", params = "", lang = "c") = proc test(name: string, srcDir = "tests/", params = "", lang = "c") =
buildBinary name, srcDir, params buildBinary name, srcDir, params
exec "build/" & name exec "build/" & name
@ -121,3 +145,23 @@ task showCoverage, "open coverage html":
echo " ======== Opening HTML coverage report in browser... ======== " echo " ======== Opening HTML coverage report in browser... ======== "
if findExe("open") != "": if findExe("open") != "":
exec("open coverage/report/index.html") exec("open coverage/report/index.html")
task libcodexDynamic, "Generate bindings":
var params = ""
when compiles(commandLineParams):
for param in commandLineParams():
if param.len > 0 and param.startsWith("-"):
params.add " " & param
let name = "libcodex"
buildLibrary name, "library/", params, "dynamic"
task libcodexStatic, "Generate bindings":
var params = ""
when compiles(commandLineParams):
for param in commandLineParams():
if param.len > 0 and param.startsWith("-"):
params.add " " & param
let name = "libcodex"
buildLibrary name, "library/", params, "static"


@ -54,6 +54,16 @@ when isMainModule:
, ,
) )
config.setupLogging() config.setupLogging()
try:
updateLogLevel(config.logLevel)
except ValueError as err:
try:
stderr.write "Invalid value for --log-level. " & err.msg & "\n"
except IOError:
echo "Invalid value for --log-level. " & err.msg
quit QuitFailure
config.setupMetrics() config.setupMetrics()
if not (checkAndCreateDataDir((config.dataDir).string)): if not (checkAndCreateDataDir((config.dataDir).string)):
@ -94,7 +104,7 @@ when isMainModule:
## Ctrl+C handling ## Ctrl+C handling
proc doShutdown() = proc doShutdown() =
shutdown = server.stop() shutdown = server.shutdown()
state = CodexStatus.Stopping state = CodexStatus.Stopping
notice "Stopping Codex" notice "Stopping Codex"


@ -1,3 +1,5 @@
{.push raises: [].}
import pkg/chronos import pkg/chronos
import pkg/stew/endians2 import pkg/stew/endians2
import pkg/upraises import pkg/upraises
@ -11,7 +13,9 @@ type
method now*(clock: Clock): SecondsSince1970 {.base, gcsafe, upraises: [].} = method now*(clock: Clock): SecondsSince1970 {.base, gcsafe, upraises: [].} =
raiseAssert "not implemented" raiseAssert "not implemented"
method waitUntil*(clock: Clock, time: SecondsSince1970) {.base, async.} = method waitUntil*(
clock: Clock, time: SecondsSince1970
) {.base, async: (raises: [CancelledError]).} =
raiseAssert "not implemented" raiseAssert "not implemented"
method start*(clock: Clock) {.base, async.} = method start*(clock: Clock) {.base, async.} =


@ -57,10 +57,20 @@ type
repoStore: RepoStore repoStore: RepoStore
maintenance: BlockMaintainer maintenance: BlockMaintainer
taskpool: Taskpool taskpool: Taskpool
isStarted: bool
CodexPrivateKey* = libp2p.PrivateKey # alias CodexPrivateKey* = libp2p.PrivateKey # alias
EthWallet = ethers.Wallet EthWallet = ethers.Wallet
func config*(self: CodexServer): CodexConf =
return self.config
func node*(self: CodexServer): CodexNodeRef =
return self.codexNode
func repoStore*(self: CodexServer): RepoStore =
return self.repoStore
proc waitForSync(provider: Provider): Future[void] {.async.} = proc waitForSync(provider: Provider): Future[void] {.async.} =
var sleepTime = 1 var sleepTime = 1
trace "Checking sync state of Ethereum provider..." trace "Checking sync state of Ethereum provider..."
@ -159,9 +169,13 @@ proc bootstrapInteractions(s: CodexServer): Future[void] {.async.} =
s.codexNode.contracts = (client, host, validator) s.codexNode.contracts = (client, host, validator)
proc start*(s: CodexServer) {.async.} = proc start*(s: CodexServer) {.async.} =
trace "Starting codex node", config = $s.config if s.isStarted:
warn "Codex server already started, skipping"
return
trace "Starting codex node", config = $s.config
await s.repoStore.start() await s.repoStore.start()
s.maintenance.start() s.maintenance.start()
await s.codexNode.switch.start() await s.codexNode.switch.start()
@ -175,27 +189,55 @@ proc start*(s: CodexServer) {.async.} =
await s.bootstrapInteractions() await s.bootstrapInteractions()
await s.codexNode.start() await s.codexNode.start()
s.restServer.start()
if s.restServer != nil:
s.restServer.start()
s.isStarted = true
proc stop*(s: CodexServer) {.async.} = proc stop*(s: CodexServer) {.async.} =
if not s.isStarted:
warn "Codex is not started"
return
notice "Stopping codex node" notice "Stopping codex node"
let res = await noCancel allFinishedFailed[void]( var futures =
@[ @[
s.restServer.stop(),
s.codexNode.switch.stop(), s.codexNode.switch.stop(),
s.codexNode.stop(), s.codexNode.stop(),
s.repoStore.stop(), s.repoStore.stop(),
s.maintenance.stop(), s.maintenance.stop(),
] ]
)
if s.restServer != nil:
futures.add(s.restServer.stop())
let res = await noCancel allFinishedFailed[void](futures)
if res.failure.len > 0: if res.failure.len > 0:
error "Failed to stop codex node", failures = res.failure.len error "Failed to stop codex node", failures = res.failure.len
raiseAssert "Failed to stop codex node" raiseAssert "Failed to stop codex node"
proc close*(s: CodexServer) {.async.} =
var futures = @[s.codexNode.close(), s.repoStore.close()]
let res = await noCancel allFinishedFailed[void](futures)
if not s.taskpool.isNil: if not s.taskpool.isNil:
s.taskpool.shutdown() try:
s.taskpool.shutdown()
except Exception as exc:
error "Failed to stop the taskpool", failures = res.failure.len
raiseAssert("Failure in taskpool shutdown:" & exc.msg)
if res.failure.len > 0:
error "Failed to close codex node", failures = res.failure.len
raiseAssert "Failed to close codex node"
proc shutdown*(server: CodexServer) {.async.} =
await server.stop()
await server.close()
proc new*( proc new*(
T: type CodexServer, config: CodexConf, privateKey: CodexPrivateKey T: type CodexServer, config: CodexConf, privateKey: CodexPrivateKey
@ -295,7 +337,7 @@ proc new*(
) )
peerStore = PeerCtxStore.new() peerStore = PeerCtxStore.new()
pendingBlocks = PendingBlocksManager.new() pendingBlocks = PendingBlocksManager.new(retries = config.blockRetries)
advertiser = Advertiser.new(repoStore, discovery) advertiser = Advertiser.new(repoStore, discovery)
blockDiscovery = blockDiscovery =
DiscoveryEngine.new(repoStore, peerStore, network, discovery, pendingBlocks) DiscoveryEngine.new(repoStore, peerStore, network, discovery, pendingBlocks)
@ -320,10 +362,13 @@ proc new*(
taskPool = taskpool, taskPool = taskpool,
) )
var restServer: RestServerRef = nil
if config.apiBindAddress.isSome:
restServer = RestServerRef restServer = RestServerRef
.new( .new(
codexNode.initRestApi(config, repoStore, config.apiCorsAllowedOrigin), codexNode.initRestApi(config, repoStore, config.apiCorsAllowedOrigin),
initTAddress(config.apiBindAddress, config.apiPort), initTAddress(config.apiBindAddress.get(), config.apiPort),
bufferSize = (1024 * 64), bufferSize = (1024 * 64),
maxRequestBodySize = int.high, maxRequestBodySize = int.high,
) )


@ -34,6 +34,7 @@ import pkg/libp2p
import pkg/ethers import pkg/ethers
import pkg/questionable import pkg/questionable
import pkg/questionable/results import pkg/questionable/results
import pkg/stew/base64
import ./codextypes import ./codextypes
import ./discovery import ./discovery
@ -46,13 +47,14 @@ import ./utils/natutils
from ./contracts/config import DefaultRequestCacheSize, DefaultMaxPriorityFeePerGas from ./contracts/config import DefaultRequestCacheSize, DefaultMaxPriorityFeePerGas
from ./validationconfig import MaxSlots, ValidationGroups from ./validationconfig import MaxSlots, ValidationGroups
from ./blockexchange/engine/pendingblocks import DefaultBlockRetries
export units, net, codextypes, logutils, completeCmdArg, parseCmdArg, NatConfig export units, net, codextypes, logutils, completeCmdArg, parseCmdArg, NatConfig
export ValidationGroups, MaxSlots export ValidationGroups, MaxSlots
export export
DefaultQuotaBytes, DefaultBlockTtl, DefaultBlockInterval, DefaultNumBlocksPerInterval, DefaultQuotaBytes, DefaultBlockTtl, DefaultBlockInterval, DefaultNumBlocksPerInterval,
DefaultRequestCacheSize, DefaultMaxPriorityFeePerGas DefaultRequestCacheSize, DefaultMaxPriorityFeePerGas, DefaultBlockRetries
type ThreadCount* = distinct Natural type ThreadCount* = distinct Natural
@ -202,8 +204,10 @@ type
.}: string .}: string
apiBindAddress* {. apiBindAddress* {.
desc: "The REST API bind address", defaultValue: "127.0.0.1", name: "api-bindaddr" desc: "The REST API bind address",
.}: string defaultValue: "127.0.0.1".some,
name: "api-bindaddr"
.}: Option[string]
apiPort* {. apiPort* {.
desc: "The REST Api port", desc: "The REST Api port",
@ -261,6 +265,13 @@ type
name: "block-mn" name: "block-mn"
.}: int .}: int
blockRetries* {.
desc: "Number of times to retry fetching a block before giving up",
defaultValue: DefaultBlockRetries,
defaultValueDesc: $DefaultBlockRetries,
name: "block-retries"
.}: int
cacheSize* {. cacheSize* {.
desc: desc:
"The size of the block cache, 0 disables the cache - " & "The size of the block cache, 0 disables the cache - " &
@ -474,7 +485,7 @@ func prover*(self: CodexConf): bool =
self.persistence and self.persistenceCmd == PersistenceCmd.prover self.persistence and self.persistenceCmd == PersistenceCmd.prover
proc getCodexVersion(): string = proc getCodexVersion(): string =
let tag = strip(staticExec("git tag")) let tag = strip(staticExec("git describe --tags --abbrev=0"))
if tag.isEmptyOrWhitespace: if tag.isEmptyOrWhitespace:
return "untagged build" return "untagged build"
return tag return tag
@ -510,55 +521,73 @@ proc parseCmdArg*(
if res.isOk: if res.isOk:
ma = res.get() ma = res.get()
else: else:
warn "Invalid MultiAddress", input = input, error = res.error() fatal "Invalid MultiAddress", input = input, error = res.error()
quit QuitFailure quit QuitFailure
except LPError as exc: except LPError as exc:
warn "Invalid MultiAddress uri", uri = input, error = exc.msg fatal "Invalid MultiAddress uri", uri = input, error = exc.msg
quit QuitFailure quit QuitFailure
ma ma
proc parseCmdArg*(T: type ThreadCount, input: string): T {.upraises: [ValueError].} = proc parse*(T: type ThreadCount, p: string): Result[ThreadCount, string] =
let count = parseInt(input) try:
if count != 0 and count < 2: let count = parseInt(p)
warn "Invalid number of threads", input = input if count != 0 and count < 2:
quit QuitFailure return err("Invalid number of threads: " & p)
ThreadCount(count) return ok(ThreadCount(count))
except ValueError as e:
return err("Invalid number of threads: " & p & ", error=" & e.msg)
proc parseCmdArg*(T: type SignedPeerRecord, uri: string): T = proc parseCmdArg*(T: type ThreadCount, input: string): T =
let val = ThreadCount.parse(input)
if val.isErr:
fatal "Cannot parse the thread count.", input = input, error = val.error()
quit QuitFailure
return val.get()
proc parse*(T: type SignedPeerRecord, p: string): Result[SignedPeerRecord, string] =
var res: SignedPeerRecord var res: SignedPeerRecord
try: try:
if not res.fromURI(uri): if not res.fromURI(p):
warn "Invalid SignedPeerRecord uri", uri = uri return err("The uri is not a valid SignedPeerRecord: " & p)
quit QuitFailure return ok(res)
except LPError as exc: except LPError, Base64Error:
warn "Invalid SignedPeerRecord uri", uri = uri, error = exc.msg let e = getCurrentException()
quit QuitFailure return err(e.msg)
except CatchableError as exc:
warn "Invalid SignedPeerRecord uri", uri = uri, error = exc.msg
quit QuitFailure
res
func parseCmdArg*(T: type NatConfig, p: string): T {.raises: [ValueError].} = proc parseCmdArg*(T: type SignedPeerRecord, uri: string): T =
let res = SignedPeerRecord.parse(uri)
if res.isErr:
fatal "Cannot parse the signed peer.", error = res.error(), input = uri
quit QuitFailure
return res.get()
func parse*(T: type NatConfig, p: string): Result[NatConfig, string] =
case p.toLowerAscii case p.toLowerAscii
of "any": of "any":
NatConfig(hasExtIp: false, nat: NatStrategy.NatAny) return ok(NatConfig(hasExtIp: false, nat: NatStrategy.NatAny))
of "none": of "none":
NatConfig(hasExtIp: false, nat: NatStrategy.NatNone) return ok(NatConfig(hasExtIp: false, nat: NatStrategy.NatNone))
of "upnp": of "upnp":
NatConfig(hasExtIp: false, nat: NatStrategy.NatUpnp) return ok(NatConfig(hasExtIp: false, nat: NatStrategy.NatUpnp))
of "pmp": of "pmp":
NatConfig(hasExtIp: false, nat: NatStrategy.NatPmp) return ok(NatConfig(hasExtIp: false, nat: NatStrategy.NatPmp))
else: else:
if p.startsWith("extip:"): if p.startsWith("extip:"):
try: try:
let ip = parseIpAddress(p[6 ..^ 1]) let ip = parseIpAddress(p[6 ..^ 1])
NatConfig(hasExtIp: true, extIp: ip) return ok(NatConfig(hasExtIp: true, extIp: ip))
except ValueError: except ValueError:
let error = "Not a valid IP address: " & p[6 ..^ 1] let error = "Not a valid IP address: " & p[6 ..^ 1]
raise newException(ValueError, error) return err(error)
else: else:
let error = "Not a valid NAT option: " & p return err("Not a valid NAT option: " & p)
raise newException(ValueError, error)
proc parseCmdArg*(T: type NatConfig, p: string): T =
let res = NatConfig.parse(p)
if res.isErr:
fatal "Cannot parse the NAT config.", error = res.error(), input = p
quit QuitFailure
return res.get()
proc completeCmdArg*(T: type NatConfig, val: string): seq[string] = proc completeCmdArg*(T: type NatConfig, val: string): seq[string] =
return @[] return @[]
@ -566,19 +595,25 @@ proc completeCmdArg*(T: type NatConfig, val: string): seq[string] =
proc parseCmdArg*(T: type EthAddress, address: string): T = proc parseCmdArg*(T: type EthAddress, address: string): T =
EthAddress.init($address).get() EthAddress.init($address).get()
proc parseCmdArg*(T: type NBytes, val: string): T = func parse*(T: type NBytes, p: string): Result[NBytes, string] =
var num = 0'i64 var num = 0'i64
let count = parseSize(val, num, alwaysBin = true) let count = parseSize(p, num, alwaysBin = true)
if count == 0: if count == 0:
warn "Invalid number of bytes", nbytes = val return err("Invalid number of bytes: " & p)
return ok(NBytes(num))
proc parseCmdArg*(T: type NBytes, val: string): T =
let res = NBytes.parse(val)
if res.isErr:
fatal "Cannot parse NBytes.", error = res.error(), input = val
quit QuitFailure quit QuitFailure
NBytes(num) return res.get()
proc parseCmdArg*(T: type Duration, val: string): T = proc parseCmdArg*(T: type Duration, val: string): T =
var dur: Duration var dur: Duration
let count = parseDuration(val, dur) let count = parseDuration(val, dur)
if count == 0: if count == 0:
warn "Cannot parse duration", dur = dur fatal "Cannot parse duration", dur = dur
quit QuitFailure quit QuitFailure
dur dur
@ -595,7 +630,7 @@ proc readValue*(r: var TomlReader, val: var SignedPeerRecord) =
try: try:
val = SignedPeerRecord.parseCmdArg(uri) val = SignedPeerRecord.parseCmdArg(uri)
except LPError as err: except LPError as err:
warn "Invalid SignedPeerRecord uri", uri = uri, error = err.msg fatal "Invalid SignedPeerRecord uri", uri = uri, error = err.msg
quit QuitFailure quit QuitFailure
proc readValue*(r: var TomlReader, val: var MultiAddress) = proc readValue*(r: var TomlReader, val: var MultiAddress) =
@ -607,7 +642,7 @@ proc readValue*(r: var TomlReader, val: var MultiAddress) =
if res.isOk: if res.isOk:
val = res.get() val = res.get()
else: else:
warn "Invalid MultiAddress", input = input, error = res.error() fatal "Invalid MultiAddress", input = input, error = res.error()
quit QuitFailure quit QuitFailure
proc readValue*( proc readValue*(
@ -779,15 +814,6 @@ proc setupLogging*(conf: CodexConf) =
else: else:
defaultChroniclesStream.outputs[0].writer = writer defaultChroniclesStream.outputs[0].writer = writer
try:
updateLogLevel(conf.logLevel)
except ValueError as err:
try:
stderr.write "Invalid value for --log-level. " & err.msg & "\n"
except IOError:
echo "Invalid value for --log-level. " & err.msg
quit QuitFailure
proc setupMetrics*(config: CodexConf) = proc setupMetrics*(config: CodexConf) =
if config.metricsEnabled: if config.metricsEnabled:
let metricsAddress = config.metricsAddress let metricsAddress = config.metricsAddress


@ -1,3 +1,5 @@
{.push raises: [].}
import std/times import std/times
import pkg/ethers import pkg/ethers
import pkg/questionable import pkg/questionable
@ -72,7 +74,9 @@ method now*(clock: OnChainClock): SecondsSince1970 =
doAssert clock.started, "clock should be started before calling now()" doAssert clock.started, "clock should be started before calling now()"
return toUnix(getTime() + clock.offset) return toUnix(getTime() + clock.offset)
method waitUntil*(clock: OnChainClock, time: SecondsSince1970) {.async.} = method waitUntil*(
clock: OnChainClock, time: SecondsSince1970
) {.async: (raises: [CancelledError]).} =
while (let difference = time - clock.now(); difference > 0): while (let difference = time - clock.now(); difference > 0):
clock.newBlock.clear() clock.newBlock.clear()
discard await clock.newBlock.wait().withTimeout(chronos.seconds(difference)) discard await clock.newBlock.wait().withTimeout(chronos.seconds(difference))


@ -43,6 +43,7 @@ type Discovery* = ref object of RootObj
# record to advertice node connection information, this carry any # record to advertice node connection information, this carry any
# address that the node can be connected on # address that the node can be connected on
dhtRecord*: ?SignedPeerRecord # record to advertice DHT connection information dhtRecord*: ?SignedPeerRecord # record to advertice DHT connection information
isStarted: bool
proc toNodeId*(cid: Cid): NodeId = proc toNodeId*(cid: Cid): NodeId =
## Cid to discovery id ## Cid to discovery id
@ -203,10 +204,15 @@ proc start*(d: Discovery) {.async: (raises: []).} =
try: try:
d.protocol.open() d.protocol.open()
await d.protocol.start() await d.protocol.start()
d.isStarted = true
except CatchableError as exc: except CatchableError as exc:
error "Error starting discovery", exc = exc.msg error "Error starting discovery", exc = exc.msg
proc stop*(d: Discovery) {.async: (raises: []).} = proc stop*(d: Discovery) {.async: (raises: []).} =
if not d.isStarted:
warn "Discovery not started, skipping stop"
return
try: try:
await noCancel d.protocol.closeWait() await noCancel d.protocol.closeWait()
except CatchableError as exc: except CatchableError as exc:


@ -84,6 +84,7 @@ type
BatchProc* = proc(blocks: seq[bt.Block]): Future[?!void] {. BatchProc* = proc(blocks: seq[bt.Block]): Future[?!void] {.
gcsafe, async: (raises: [CancelledError]) gcsafe, async: (raises: [CancelledError])
.} .}
OnBlockStoredProc = proc(chunk: seq[byte]): void {.gcsafe, raises: [].}
func switch*(self: CodexNodeRef): Switch = func switch*(self: CodexNodeRef): Switch =
return self.switch return self.switch
@ -434,6 +435,7 @@ proc store*(
filename: ?string = string.none, filename: ?string = string.none,
mimetype: ?string = string.none, mimetype: ?string = string.none,
blockSize = DefaultBlockSize, blockSize = DefaultBlockSize,
onBlockStored: OnBlockStoredProc = nil,
): Future[?!Cid] {.async.} = ): Future[?!Cid] {.async.} =
## Save stream contents as dataset with given blockSize ## Save stream contents as dataset with given blockSize
## to nodes's BlockStore, and return Cid of its manifest ## to nodes's BlockStore, and return Cid of its manifest
@ -463,6 +465,9 @@ proc store*(
if err =? (await self.networkStore.putBlock(blk)).errorOption: if err =? (await self.networkStore.putBlock(blk)).errorOption:
error "Unable to store block", cid = blk.cid, err = err.msg error "Unable to store block", cid = blk.cid, err = err.msg
return failure(&"Unable to store block {blk.cid}") return failure(&"Unable to store block {blk.cid}")
if not onBlockStored.isNil:
onBlockStored(chunk)
except CancelledError as exc: except CancelledError as exc:
raise exc raise exc
except CatchableError as exc: except CatchableError as exc:
@ -902,6 +907,7 @@ proc stop*(self: CodexNodeRef) {.async.} =
if not self.clock.isNil: if not self.clock.isNil:
await self.clock.stop() await self.clock.stop()
proc close*(self: CodexNodeRef) {.async.} =
if not self.networkStore.isNil: if not self.networkStore.isNil:
await self.networkStore.close await self.networkStore.close


@ -443,7 +443,6 @@ proc start*(
): Future[void] {.async: (raises: [CancelledError, CodexError]).} = ): Future[void] {.async: (raises: [CancelledError, CodexError]).} =
## Start repo ## Start repo
## ##
if self.started: if self.started:
trace "Repo already started" trace "Repo already started"
return return
@ -465,6 +464,5 @@ proc stop*(self: RepoStore): Future[void] {.async: (raises: []).} =
return return
trace "Stopping repo" trace "Stopping repo"
await self.close()
self.started = false self.started = false


@ -80,7 +80,7 @@ proc removeSlotsThatHaveEnded(validation: Validation) {.async.} =
proc markProofAsMissing( proc markProofAsMissing(
validation: Validation, slotId: SlotId, period: Period validation: Validation, slotId: SlotId, period: Period
) {.async.} = ) {.async: (raises: [CancelledError]).} =
logScope: logScope:
currentPeriod = validation.getCurrentPeriod() currentPeriod = validation.getCurrentPeriod()
@ -91,18 +91,18 @@ proc markProofAsMissing(
else: else:
let inDowntime {.used.} = await validation.market.inDowntime(slotId) let inDowntime {.used.} = await validation.market.inDowntime(slotId)
trace "Proof not missing", checkedPeriod = period, inDowntime trace "Proof not missing", checkedPeriod = period, inDowntime
except CancelledError: except CancelledError as e:
raise raise e
except CatchableError as e: except CatchableError as e:
error "Marking proof as missing failed", msg = e.msg error "Marking proof as missing failed", msg = e.msg
proc markProofsAsMissing(validation: Validation) {.async.} = proc markProofsAsMissing(validation: Validation) {.async: (raises: [CancelledError]).} =
let slots = validation.slots let slots = validation.slots
for slotId in slots: for slotId in slots:
let previousPeriod = validation.getCurrentPeriod() - 1 let previousPeriod = validation.getCurrentPeriod() - 1
await validation.markProofAsMissing(slotId, previousPeriod) await validation.markProofAsMissing(slotId, previousPeriod)
proc run(validation: Validation) {.async: (raises: []).} = proc run(validation: Validation) {.async: (raises: [CancelledError]).} =
trace "Validation started" trace "Validation started"
try: try:
while true: while true:

examples/golang/README.md Normal file

@ -0,0 +1,24 @@
## Prerequisite
`libcodex.so` must be compiled and present in the `build` folder.
## Compilation
From the codex root folder:
```bash
go build -o codex-go examples/golang/codex.go
```
## Run
From the codex root folder:
```bash
export LD_LIBRARY_PATH=build
```
```bash
./codex-go
```

examples/golang/codex.go Normal file

@ -0,0 +1,885 @@
package main
/*
#cgo LDFLAGS: -L../../build/ -lcodex
#cgo LDFLAGS: -L../../ -Wl,-rpath,../../
#include <stdbool.h>
#include <stdlib.h>
#include "../../library/libcodex.h"
typedef struct {
int ret;
char* msg;
size_t len;
uintptr_t h;
} Resp;
static void* allocResp(uintptr_t h) {
Resp* r = (Resp*)calloc(1, sizeof(Resp));
r->h = h;
return r;
}
static void freeResp(void* resp) {
if (resp != NULL) {
free(resp);
}
}
static int getRet(void* resp) {
if (resp == NULL) {
return 0;
}
Resp* m = (Resp*) resp;
return m->ret;
}
void libcodexNimMain(void);
static void codex_host_init_once(void){
static int done;
if (!__atomic_exchange_n(&done, 1, __ATOMIC_SEQ_CST)) libcodexNimMain();
}
// resp must be non-NULL if the caller wants to retrieve data from the callback
void callback(int ret, char* msg, size_t len, void* resp);
static void* cGoCodexNew(const char* configJson, void* resp) {
void* ret = codex_new(configJson, (CodexCallback) callback, resp);
return ret;
}
static int cGoCodexStart(void* codexCtx, void* resp) {
return codex_start(codexCtx, (CodexCallback) callback, resp);
}
static int cGoCodexStop(void* codexCtx, void* resp) {
return codex_stop(codexCtx, (CodexCallback) callback, resp);
}
static int cGoCodexClose(void* codexCtx, void* resp) {
return codex_close(codexCtx, (CodexCallback) callback, resp);
}
static int cGoCodexDestroy(void* codexCtx, void* resp) {
return codex_destroy(codexCtx, (CodexCallback) callback, resp);
}
static int cGoCodexVersion(void* codexCtx, void* resp) {
return codex_version(codexCtx, (CodexCallback) callback, resp);
}
static int cGoCodexRevision(void* codexCtx, void* resp) {
return codex_revision(codexCtx, (CodexCallback) callback, resp);
}
static int cGoCodexRepo(void* codexCtx, void* resp) {
return codex_repo(codexCtx, (CodexCallback) callback, resp);
}
static int cGoCodexSpr(void* codexCtx, void* resp) {
return codex_spr(codexCtx, (CodexCallback) callback, resp);
}
static int cGoCodexPeerId(void* codexCtx, void* resp) {
return codex_peer_id(codexCtx, (CodexCallback) callback, resp);
}
static int cGoCodexUploadInit(void* codexCtx, char* filepath, size_t chunkSize, void* resp) {
return codex_upload_init(codexCtx, filepath, chunkSize, (CodexCallback) callback, resp);
}
static int cGoCodexUploadChunk(void* codexCtx, char* sessionId, const uint8_t* chunk, size_t len, void* resp) {
return codex_upload_chunk(codexCtx, sessionId, chunk, len, (CodexCallback) callback, resp);
}
static int cGoCodexUploadFinalize(void* codexCtx, char* sessionId, void* resp) {
return codex_upload_finalize(codexCtx, sessionId, (CodexCallback) callback, resp);
}
static int cGoCodexUploadCancel(void* codexCtx, char* sessionId, void* resp) {
return codex_upload_cancel(codexCtx, sessionId, (CodexCallback) callback, resp);
}
static int cGoCodexUploadFile(void* codexCtx, char* sessionId, void* resp) {
return codex_upload_file(codexCtx, sessionId, (CodexCallback) callback, resp);
}
static int cGoCodexLogLevel(void* codexCtx, char* logLevel, void* resp) {
return codex_log_level(codexCtx, logLevel, (CodexCallback) callback, resp);
}
static int cGoCodexExists(void* codexCtx, char* cid, void* resp) {
return codex_storage_exists(codexCtx, cid, (CodexCallback) callback, resp);
}
*/
import "C"
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io"
"log"
"os"
"os/signal"
"runtime/cgo"
"sync"
"syscall"
"unsafe"
)
type LogFormat string
const (
LogFormatAuto LogFormat = "auto"
LogFormatColors LogFormat = "colors"
LogFormatNoColors LogFormat = "nocolors"
LogFormatJSON LogFormat = "json"
)
type RepoKind string
const (
FS RepoKind = "fs"
SQLite RepoKind = "sqlite"
LevelDb RepoKind = "leveldb"
)
const defaultBlockSize = 1024 * 64
type Config struct {
// Default: INFO
LogLevel string `json:"log-level,omitempty"`
// Specifies what kind of logs should be written to stdout
// Default: auto
LogFormat LogFormat `json:"log-format,omitempty"`
// Enable the metrics server
// Default: false
MetricsEnabled bool `json:"metrics,omitempty"`
// Listening address of the metrics server
// Default: 127.0.0.1
MetricsAddress string `json:"metrics-address,omitempty"`
// Listening HTTP port of the metrics server
// Default: 8008
MetricsPort int `json:"metrics-port,omitempty"`
// The directory where codex will store configuration and data
// Default:
// $HOME\AppData\Roaming\Codex on Windows
// $HOME/Library/Application Support/Codex on macOS
// $HOME/.cache/codex on Linux
DataDir string `json:"data-dir,omitempty"`
// Multi Addresses to listen on
// Default: ["/ip4/0.0.0.0/tcp/0"]
ListenAddrs []string `json:"listen-addrs,omitempty"`
// Specify method to use for determining public address.
// Must be one of: any, none, upnp, pmp, extip:<IP>
// Default: any
Nat string `json:"nat,omitempty"`
// Discovery (UDP) port
// Default: 8090
DiscoveryPort int `json:"disc-port,omitempty"`
// Source of network (secp256k1) private key file path or name
// Default: "key"
NetPrivKeyFile string `json:"net-privkey,omitempty"`
// Specifies one or more bootstrap nodes to use when connecting to the network.
BootstrapNodes []string `json:"bootstrap-node,omitempty"`
// The maximum number of peers to connect to.
// Default: 160
MaxPeers int `json:"max-peers,omitempty"`
// Number of worker threads ("0" = use as many threads as there are CPU cores available)
// Default: 0
NumThreads int `json:"num-threads,omitempty"`
// Node agent string which is used as identifier in network
// Default: "Codex"
AgentString string `json:"agent-string,omitempty"`
// Backend for main repo store (fs, sqlite, leveldb)
// Default: fs
RepoKind RepoKind `json:"repo-kind,omitempty"`
// The size of the total storage quota dedicated to the node
// Default: 20 GiBs
StorageQuota int `json:"storage-quota,omitempty"`
// Default block timeout in seconds - 0 disables the ttl
// Default: 30 days
BlockTtl int `json:"block-ttl,omitempty"`
// Time interval in seconds - determines frequency of block
// maintenance cycle: how often blocks are checked for expiration and cleanup
// Default: 10 minutes
BlockMaintenanceInterval int `json:"block-mi,omitempty"`
// Number of blocks to check every maintenance cycle
// Default: 1000
BlockMaintenanceNumberOfBlocks int `json:"block-mn,omitempty"`
// Number of times to retry fetching a block before giving up
// Default: 3000
BlockRetries int `json:"block-retries,omitempty"`
// The size of the block cache, 0 disables the cache -
// might help on slow hard drives
// Default: 0
CacheSize int `json:"cache-size,omitempty"`
// Default: "" (no log file)
LogFile string `json:"log-file,omitempty"`
}
type CodexNode struct {
ctx unsafe.Pointer
}
type ChunkSize int
func (c ChunkSize) valOrDefault() int {
if c == 0 {
return defaultBlockSize
}
return int(c)
}
func (c ChunkSize) toSizeT() C.size_t {
return C.size_t(c.valOrDefault())
}
// bridgeCtx is used for managing the C-Go bridge calls.
// It contains a wait group for synchronizing the calls,
// a cgo.Handle for passing context to the C code,
// a response pointer for receiving data from the C code,
// and fields for storing the result and error of the call.
type bridgeCtx struct {
wg *sync.WaitGroup
h cgo.Handle
resp unsafe.Pointer
result string
err error
// Callback used for receiving progress updates during upload/download.
//
// For the upload, the bytes parameter indicates the number of bytes uploaded.
// If the chunk size is greater than or equal to the block size (passed in the init function),
// the callback will be called when a block is put in the store.
// Otherwise, it will be called when a chunk is pushed into the stream.
//
// For the download, the bytes is the size of the chunk received, and the chunk
// is the actual chunk of data received.
onProgress func(bytes int, chunk []byte)
}
// newBridgeCtx creates a new bridge context for managing C-Go calls.
// The bridge context is initialized with a wait group and a cgo.Handle.
func newBridgeCtx() *bridgeCtx {
bridge := &bridgeCtx{}
bridge.wg = &sync.WaitGroup{}
bridge.wg.Add(1)
bridge.h = cgo.NewHandle(bridge)
bridge.resp = C.allocResp(C.uintptr_t(uintptr(bridge.h)))
return bridge
}
// callError creates an error message for a failed C-Go call.
func (b *bridgeCtx) callError(name string) error {
return fmt.Errorf("failed the call to %s returned code %d", name, C.getRet(b.resp))
}
// free releases the resources associated with the bridge context,
// including the cgo.Handle and the response pointer.
func (b *bridgeCtx) free() {
if b.h > 0 {
b.h.Delete()
b.h = 0
}
if b.resp != nil {
C.freeResp(b.resp)
b.resp = nil
}
}
// callback is the function called by the C code to communicate back to Go.
// It handles progress updates, successful completions, and errors.
// The function uses the response pointer to retrieve the bridge context
// and update its state accordingly.
//
//export callback
func callback(ret C.int, msg *C.char, len C.size_t, resp unsafe.Pointer) {
if resp == nil {
return
}
m := (*C.Resp)(resp)
m.ret = ret
m.msg = msg
m.len = len
if m.h == 0 {
return
}
h := cgo.Handle(m.h)
if h == 0 {
return
}
if v, ok := h.Value().(*bridgeCtx); ok {
switch ret {
case C.RET_PROGRESS:
if v.onProgress == nil {
return
}
if msg != nil {
chunk := C.GoBytes(unsafe.Pointer(msg), C.int(len))
v.onProgress(int(C.int(len)), chunk)
} else {
v.onProgress(int(C.int(len)), nil)
}
case C.RET_OK:
retMsg := C.GoStringN(msg, C.int(len))
v.result = retMsg
v.err = nil
if v.wg != nil {
v.wg.Done()
}
case C.RET_ERR:
retMsg := C.GoStringN(msg, C.int(len))
v.err = errors.New(retMsg)
if v.wg != nil {
v.wg.Done()
}
}
}
}
// wait waits for the bridge context to complete its operation.
// It returns the result and error of the operation.
func (b *bridgeCtx) wait() (string, error) {
b.wg.Wait()
return b.result, b.err
}
type OnUploadProgressFunc func(read, total int, percent float64, err error)
type UploadOptions struct {
// Filepath can be the full path when using UploadFile
// otherwise the file name.
// It is used to detect the mimetype.
Filepath string
// ChunkSize is the size of each upload chunk, passed as `blockSize` to the Codex node
// store. Defaults to 64 KB.
ChunkSize ChunkSize
// OnProgress is a callback function that is called after each chunk is uploaded with:
// - read: the number of bytes read in the last chunk.
// - total: the total number of bytes read so far.
// - percent: the percentage of the total file size that has been uploaded. It is
// determined from a `stat` call if it is a file and from the length of the buffer
// if it is a buffer. Otherwise, it is 0.
// - err: an error, if one occurred.
//
// If the chunk size is greater than or equal to the block size, the callback is called
// after the block is actually stored in the block store. Otherwise, it is called
// after the chunk is sent to the stream.
OnProgress OnUploadProgressFunc
}
func getReaderSize(r io.Reader) int64 {
switch v := r.(type) {
case *os.File:
stat, err := v.Stat()
if err != nil {
return 0
}
return stat.Size()
case *bytes.Buffer:
return int64(v.Len())
default:
return 0
}
}
// New creates a new Codex node with the provided configuration.
// The node is not started automatically; you need to call Start
// to start it.
// It returns a Codex node that can be used to interact
// with the Codex network.
func New(config Config) (*CodexNode, error) {
bridge := newBridgeCtx()
defer bridge.free()
jsonConfig, err := json.Marshal(config)
if err != nil {
return nil, err
}
cJsonConfig := C.CString(string(jsonConfig))
defer C.free(unsafe.Pointer(cJsonConfig))
ctx := C.cGoCodexNew(cJsonConfig, bridge.resp)
if _, err := bridge.wait(); err != nil {
return nil, bridge.err
}
return &CodexNode{ctx: ctx}, bridge.err
}
// Start starts the Codex node.
func (node CodexNode) Start() error {
bridge := newBridgeCtx()
defer bridge.free()
if C.cGoCodexStart(node.ctx, bridge.resp) != C.RET_OK {
return bridge.callError("cGoCodexStart")
}
_, err := bridge.wait()
return err
}
// StartAsync is the asynchronous version of Start.
func (node CodexNode) StartAsync(onDone func(error)) {
go func() {
err := node.Start()
onDone(err)
}()
}
// Stop stops the Codex node.
func (node CodexNode) Stop() error {
bridge := newBridgeCtx()
defer bridge.free()
if C.cGoCodexStop(node.ctx, bridge.resp) != C.RET_OK {
return bridge.callError("cGoCodexStop")
}
_, err := bridge.wait()
return err
}
// Destroy destroys the Codex node, freeing all resources.
// The node must be stopped before calling this method.
func (node CodexNode) Destroy() error {
bridge := newBridgeCtx()
defer bridge.free()
if C.cGoCodexClose(node.ctx, bridge.resp) != C.RET_OK {
return bridge.callError("cGoCodexClose")
}
_, err := bridge.wait()
if err != nil {
return err
}
if C.cGoCodexDestroy(node.ctx, bridge.resp) != C.RET_OK {
return errors.New("failed to destroy the codex node")
}
return err
}
// Version returns the version of the Codex node.
func (node CodexNode) Version() (string, error) {
bridge := newBridgeCtx()
defer bridge.free()
if C.cGoCodexVersion(node.ctx, bridge.resp) != C.RET_OK {
return "", bridge.callError("cGoCodexVersion")
}
return bridge.wait()
}
func (node CodexNode) Revision() (string, error) {
bridge := newBridgeCtx()
defer bridge.free()
if C.cGoCodexRevision(node.ctx, bridge.resp) != C.RET_OK {
return "", bridge.callError("cGoCodexRevision")
}
return bridge.wait()
}
// Repo returns the path of the data dir folder.
func (node CodexNode) Repo() (string, error) {
bridge := newBridgeCtx()
defer bridge.free()
if C.cGoCodexRepo(node.ctx, bridge.resp) != C.RET_OK {
return "", bridge.callError("cGoCodexRepo")
}
return bridge.wait()
}
func (node CodexNode) Spr() (string, error) {
bridge := newBridgeCtx()
defer bridge.free()
if C.cGoCodexSpr(node.ctx, bridge.resp) != C.RET_OK {
return "", bridge.callError("cGoCodexSpr")
}
return bridge.wait()
}
func (node CodexNode) PeerId() (string, error) {
bridge := newBridgeCtx()
defer bridge.free()
if C.cGoCodexPeerId(node.ctx, bridge.resp) != C.RET_OK {
return "", bridge.callError("cGoCodexPeerId")
}
return bridge.wait()
}
// UploadInit initializes a new upload session.
// It returns a session ID that can be used for subsequent upload operations.
// This function is called by UploadReader and UploadFile internally.
// You should use this function only if you need to manage the upload session manually.
func (node CodexNode) UploadInit(options *UploadOptions) (string, error) {
bridge := newBridgeCtx()
defer bridge.free()
var cFilename = C.CString(options.Filepath)
defer C.free(unsafe.Pointer(cFilename))
if C.cGoCodexUploadInit(node.ctx, cFilename, options.ChunkSize.toSizeT(), bridge.resp) != C.RET_OK {
return "", bridge.callError("cGoCodexUploadInit")
}
return bridge.wait()
}
// UploadChunk uploads a chunk of data to the Codex node.
// It takes the session ID returned by UploadInit
// and a byte slice containing the chunk data.
// This function is called by UploadReader internally.
// You should use this function only if you need to manage the upload session manually.
func (node CodexNode) UploadChunk(sessionId string, chunk []byte) error {
bridge := newBridgeCtx()
defer bridge.free()
var cSessionId = C.CString(sessionId)
defer C.free(unsafe.Pointer(cSessionId))
var cChunkPtr *C.uint8_t
if len(chunk) > 0 {
cChunkPtr = (*C.uint8_t)(unsafe.Pointer(&chunk[0]))
}
if C.cGoCodexUploadChunk(node.ctx, cSessionId, cChunkPtr, C.size_t(len(chunk)), bridge.resp) != C.RET_OK {
return bridge.callError("cGoCodexUploadChunk")
}
_, err := bridge.wait()
return err
}
// UploadFinalize finalizes the upload session and returns the CID of the uploaded file.
// It takes the session ID returned by UploadInit.
// This function is called by UploadReader and UploadFile internally.
// You should use this function only if you need to manage the upload session manually.
func (node CodexNode) UploadFinalize(sessionId string) (string, error) {
bridge := newBridgeCtx()
defer bridge.free()
var cSessionId = C.CString(sessionId)
defer C.free(unsafe.Pointer(cSessionId))
if C.cGoCodexUploadFinalize(node.ctx, cSessionId, bridge.resp) != C.RET_OK {
return "", bridge.callError("cGoCodexUploadFinalize")
}
return bridge.wait()
}
// UploadCancel cancels an ongoing upload session.
// It can be used only if the upload session is managed manually.
// It doesn't work with UploadFile.
func (node CodexNode) UploadCancel(sessionId string) error {
bridge := newBridgeCtx()
defer bridge.free()
var cSessionId = C.CString(sessionId)
defer C.free(unsafe.Pointer(cSessionId))
if C.cGoCodexUploadCancel(node.ctx, cSessionId, bridge.resp) != C.RET_OK {
return bridge.callError("cGoCodexUploadCancel")
}
_, err := bridge.wait()
return err
}
// UploadReader uploads data from an io.Reader to the Codex node.
// It takes the upload options and the reader as parameters.
// It returns the CID of the uploaded file or an error.
//
// Internally, it calls:
// - UploadInit to create the upload session.
// - UploadChunk to upload a chunk to codex.
// - UploadFinalize to finalize the upload session.
// - UploadCancel if an error occurs.
func (node CodexNode) UploadReader(options UploadOptions, r io.Reader) (string, error) {
sessionId, err := node.UploadInit(&options)
if err != nil {
return "", err
}
buf := make([]byte, options.ChunkSize.valOrDefault())
total := 0
var size int64
if options.OnProgress != nil {
size = getReaderSize(r)
}
for {
n, err := r.Read(buf)
if err == io.EOF {
break
}
if err != nil {
if cancelErr := node.UploadCancel(sessionId); cancelErr != nil {
return "", fmt.Errorf("failed to upload chunk %v and failed to cancel upload session %v", err, cancelErr)
}
return "", err
}
if n == 0 {
break
}
if err := node.UploadChunk(sessionId, buf[:n]); err != nil {
if cancelErr := node.UploadCancel(sessionId); cancelErr != nil {
return "", fmt.Errorf("failed to upload chunk %v and failed to cancel upload session %v", err, cancelErr)
}
return "", err
}
total += n
if options.OnProgress != nil && size > 0 {
percent := float64(total) / float64(size) * 100.0
// The last block could be a bit over the size due to padding
// on the chunk size.
if percent > 100.0 {
percent = 100.0
}
options.OnProgress(n, total, percent, nil)
} else if options.OnProgress != nil {
options.OnProgress(n, total, 0, nil)
}
}
return node.UploadFinalize(sessionId)
}
// UploadReaderAsync is the asynchronous version of UploadReader using a goroutine.
func (node CodexNode) UploadReaderAsync(options UploadOptions, r io.Reader, onDone func(cid string, err error)) {
go func() {
cid, err := node.UploadReader(options, r)
onDone(cid, err)
}()
}
// UploadFile uploads a file to the Codex node.
// It takes the upload options as parameter.
// It returns the CID of the uploaded file or an error.
//
// The options parameter contains the following fields:
// - filepath: the full path of the file to upload.
// - chunkSize: the size of each upload chunk, passed as `blockSize` to the Codex node
// store. Defaults to 64 KB.
// - onProgress: a callback function that is called after each chunk is uploaded with:
// - read: the number of bytes read in the last chunk.
// - total: the total number of bytes read so far.
// - percent: the percentage of the total file size that has been uploaded. It is
// determined from a `stat` call.
// - err: an error, if one occurred.
//
// If the chunk size is greater than or equal to the block size, the callback is called after
// the block is actually stored in the block store. Otherwise, it is called after the chunk
// is sent to the stream.
//
// Internally, it calls UploadInit to create the upload session.
func (node CodexNode) UploadFile(options UploadOptions) (string, error) {
bridge := newBridgeCtx()
defer bridge.free()
if options.OnProgress != nil {
stat, err := os.Stat(options.Filepath)
if err != nil {
return "", err
}
size := stat.Size()
total := 0
if size > 0 {
bridge.onProgress = func(read int, _ []byte) {
if read == 0 {
return
}
total += read
percent := float64(total) / float64(size) * 100.0
// The last block could be a bit over the size due to padding
// on the chunk size.
if percent > 100.0 {
percent = 100.0
}
options.OnProgress(read, int(size), percent, nil)
}
}
}
sessionId, err := node.UploadInit(&options)
if err != nil {
return "", err
}
var cSessionId = C.CString(sessionId)
defer C.free(unsafe.Pointer(cSessionId))
if C.cGoCodexUploadFile(node.ctx, cSessionId, bridge.resp) != C.RET_OK {
return "", bridge.callError("cGoCodexUploadFile")
}
return bridge.wait()
}
// UploadFileAsync is the asynchronous version of UploadFile using a goroutine.
func (node CodexNode) UploadFileAsync(options UploadOptions, onDone func(cid string, err error)) {
go func() {
cid, err := node.UploadFile(options)
onDone(cid, err)
}()
}
func (node CodexNode) UpdateLogLevel(logLevel string) error {
bridge := newBridgeCtx()
defer bridge.free()
var cLogLevel = C.CString(string(logLevel))
defer C.free(unsafe.Pointer(cLogLevel))
if C.cGoCodexLogLevel(node.ctx, cLogLevel, bridge.resp) != C.RET_OK {
return bridge.callError("cGoCodexLogLevel")
}
_, err := bridge.wait()
return err
}
func (node CodexNode) Exists(cid string) (bool, error) {
bridge := newBridgeCtx()
defer bridge.free()
var cCid = C.CString(cid)
defer C.free(unsafe.Pointer(cCid))
if C.cGoCodexExists(node.ctx, cCid, bridge.resp) != C.RET_OK {
return false, bridge.callError("cGoCodexExists")
}
result, err := bridge.wait()
return result == "true", err
}
func main() {
dataDir := os.TempDir() + "/data-dir"
node, err := New(Config{
BlockRetries: 5,
LogLevel: "WARN",
DataDir: dataDir,
})
if err != nil {
log.Fatalf("Failed to create Codex node: %v", err)
}
defer os.RemoveAll(dataDir)
if err := node.Start(); err != nil {
log.Fatalf("Failed to start Codex node: %v", err)
}
log.Println("Codex node started")
version, err := node.Version()
if err != nil {
log.Fatalf("Failed to get Codex version: %v", err)
}
log.Printf("Codex version: %s", version)
err = node.UpdateLogLevel("ERROR")
if err != nil {
log.Fatalf("Failed to update log level: %v", err)
}
cid := "zDvZRwzmAkhzDRPH5EW242gJBNZ2T7aoH2v1fVH66FxXL4kSbvyM"
exists, err := node.Exists(cid)
if err != nil {
log.Fatalf("Failed to check data existence: %v", err)
}
if exists {
log.Fatalf("The data should not exist")
}
buf := bytes.NewBuffer([]byte("Hello World!"))
size := buf.Len()
cid, err = node.UploadReader(UploadOptions{Filepath: "hello.txt"}, buf)
if err != nil {
log.Fatalf("Failed to upload data: %v", err)
}
log.Printf("Uploaded data with CID: %s (size: %d bytes)", cid, len)
exists, err = node.Exists(cid)
if err != nil {
log.Fatalf("Failed to check data existence: %v", err)
}
if !exists {
log.Fatalf("The data should exist")
}
// Wait for a SIGINT or SIGTERM signal
ch := make(chan os.Signal, 1)
signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
<-ch
if err := node.Stop(); err != nil {
log.Fatalf("Failed to stop Codex node: %v", err)
}
log.Println("Codex node stopped")
if err := node.Destroy(); err != nil {
log.Fatalf("Failed to destroy Codex node: %v", err)
}
}


@ -0,0 +1 @@
Hello World!

library/README.md Normal file

@ -0,0 +1,37 @@
# Codex Library
Codex exposes a C binding that serves as a stable contract, making it straightforward to integrate Codex into other languages such as Go.
The implementation was inspired by [nim-library-template](https://github.com/logos-co/nim-library-template)
and by the [nwaku](https://github.com/waku-org/nwaku/tree/master/library) library.
The source code contains detailed comments to explain the threading and callback flow.
The diagram below summarizes the lifecycle: context creation, request execution, and shutdown.
```mermaid
sequenceDiagram
autonumber
actor App as App/User
participant Go as Go Wrapper
participant C as C API (libcodex.h)
participant Ctx as CodexContext
participant Thr as Worker Thread
participant Eng as CodexServer
App->>Go: Start
Go->>C: codex_start_node
C->>Ctx: enqueue request
C->>Ctx: fire signal
Ctx->>Thr: wake worker
Thr->>Ctx: dequeue request
Thr-->>Ctx: ACK
Ctx-->>C: forward ACK
C-->>Go: RET OK
Go->>App: Unblock
Thr->>Eng: execute (async)
Eng-->>Thr: result ready
Thr-->>Ctx: callback
Ctx-->>C: forward callback
C-->>Go: forward callback
Go-->>App: done
```
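For illustration, the same lifecycle looks roughly like this from the Go side, using the wrapper in `examples/golang/codex.go`. This is a minimal sketch with trimmed configuration and placeholder values, assumed to live in the same package as that example:

```go
// Sketch of the lifecycle above: create the context, start the node,
// perform one request/callback round trip, then stop and destroy.
func runLifecycle() error {
	node, err := New(Config{
		DataDir:  os.TempDir() + "/codex-demo", // placeholder data dir
		LogLevel: "WARN",
	})
	if err != nil {
		return err // codex_new failed, no context was created
	}
	if err := node.Start(); err != nil { // request is queued and executed by the worker thread
		return err
	}
	version, err := node.Version() // blocks until the worker invokes the callback
	if err != nil {
		return err
	}
	log.Printf("codex version: %s", version)

	if err := node.Stop(); err != nil { // stops the CodexServer
		return err
	}
	return node.Destroy() // frees resources and shuts down the worker thread
}
```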

library/alloc.nim Normal file

@ -0,0 +1,42 @@
## Can be shared safely between threads
type SharedSeq*[T] = tuple[data: ptr UncheckedArray[T], len: int]
proc alloc*(str: cstring): cstring =
# Allocates a copy of the given C string in shared memory.
# The caller must free it with deallocShared.
if str.isNil():
var ret = cast[cstring](allocShared(1)) # Allocate memory for the null terminator
ret[0] = '\0' # Set the null terminator
return ret
let ret = cast[cstring](allocShared(len(str) + 1))
copyMem(ret, str, len(str) + 1)
return ret
proc alloc*(str: string): cstring =
## Allocates a copy of the given string in shared memory.
## The caller must free it with deallocShared.
var ret = cast[cstring](allocShared(str.len + 1))
let s = cast[seq[char]](str)
for i in 0 ..< str.len:
ret[i] = s[i]
ret[str.len] = '\0'
return ret
proc allocSharedSeq*[T](s: seq[T]): SharedSeq[T] =
let data = allocShared(sizeof(T) * s.len)
if s.len != 0:
copyMem(data, unsafeAddr s[0], sizeof(T) * s.len)
return (cast[ptr UncheckedArray[T]](data), s.len)
proc deallocSharedSeq*[T](s: var SharedSeq[T]) =
deallocShared(s.data)
s.len = 0
proc toSeq*[T](s: SharedSeq[T]): seq[T] =
## Creates a seq[T] from a SharedSeq[T]. No explicit dealloc is required
## as seq[T] is a GC-managed type.
var ret = newSeq[T]()
for i in 0 ..< s.len:
ret.add(s.data[i])
return ret

library/codex_context.nim Normal file

@ -0,0 +1,225 @@
## This file defines the Codex context and its thread flow:
## 1. Client enqueues a request and signals the Codex thread.
## 2. The Codex thread dequeues the request and sends an ack (reqReceivedSignal).
## 3. The Codex thread executes the request asynchronously.
## 4. On completion, the Codex thread invokes the client callback with the result and userData.
{.pragma: exported, exportc, cdecl, raises: [].}
{.pragma: callback, cdecl, raises: [], gcsafe.}
{.passc: "-fPIC".}
import std/[options, locks, atomics]
import chronicles
import chronos
import chronos/threadsync
import taskpools/channels_spsc_single
import ./ffi_types
import ./codex_thread_requests/[codex_thread_request]
from ../codex/codex import CodexServer
logScope:
topics = "codexlib"
type CodexContext* = object
thread: Thread[(ptr CodexContext)]
# This lock is only necessary while we use an SPSC channel and while the signalling
# between threads assumes that there are no concurrent requests.
# Rearchitecting the signalling and migrating to an MP channel would allow us to receive
# requests concurrently and spare us the need for locks.
lock: Lock
# Channel to send requests to the Codex thread.
# Requests will be popped from this channel.
reqChannel: ChannelSPSCSingle[ptr CodexThreadRequest]
# To notify the Codex thread that a request is ready
reqSignal: ThreadSignalPtr
# To notify the client thread that the request was received.
# It is an acknowledgment signal (handshake).
reqReceivedSignal: ThreadSignalPtr
# Custom state attached by the client to a request,
# returned when its callback is invoked
userData*: pointer
# Function called by the library to notify the client of global events
eventCallback*: pointer
# Custom state attached by the client to the context,
# returned with every event callback
eventUserData*: pointer
# Set to false to stop the Codex thread (during codex_destroy)
running: Atomic[bool]
template callEventCallback(ctx: ptr CodexContext, eventName: string, body: untyped) =
## Template used to notify the client of global events
## Example: onConnectionChanged, onProofMissing, etc.
if isNil(ctx[].eventCallback):
error eventName & " - eventCallback is nil"
return
foreignThreadGc:
try:
let event = body
cast[CodexCallback](ctx[].eventCallback)(
RET_OK, unsafeAddr event[0], cast[csize_t](len(event)), ctx[].eventUserData
)
except CatchableError:
let msg =
"Exception " & eventName & " when calling 'eventCallBack': " &
getCurrentExceptionMsg()
cast[CodexCallback](ctx[].eventCallback)(
RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), ctx[].eventUserData
)
proc sendRequestToCodexThread*(
ctx: ptr CodexContext,
reqType: RequestType,
reqContent: pointer,
callback: CodexCallback,
userData: pointer,
timeout = InfiniteDuration,
): Result[void, string] =
ctx.lock.acquire()
defer:
ctx.lock.release()
let req = CodexThreadRequest.createShared(reqType, reqContent, callback, userData)
# Send the request to the Codex thread
let sentOk = ctx.reqChannel.trySend(req)
if not sentOk:
deallocShared(req)
return err("Failed to send request to the codex thread: " & $req[])
# Notify the Codex thread that a request is available
let fireSyncRes = ctx.reqSignal.fireSync()
if fireSyncRes.isErr():
deallocShared(req)
return err(
"Failed to send request to the codex thread: unable to fireSync: " &
$fireSyncRes.error
)
if fireSyncRes.get() == false:
deallocShared(req)
return err("Failed to send request to the codex thread: fireSync timed out.")
# Wait until the Codex Thread properly received the request
let res = ctx.reqReceivedSignal.waitSync(timeout)
if res.isErr():
deallocShared(req)
return err(
"Failed to send request to the codex thread: unable to receive reqReceivedSignal signal."
)
## Notice that in case of "ok", the deallocShared(req) is performed by the Codex Thread in the
## process proc. See the 'codex_thread_request.nim' module for more details.
ok()
proc runCodex(ctx: ptr CodexContext) {.async: (raises: []).} =
var codex: CodexServer
while true:
try:
# Wait until a request is available
await ctx.reqSignal.wait()
except Exception as e:
error "Failure in run codex thread while waiting for reqSignal.", error = e.msg
continue
# If codex_destroy was called, exit the loop
if ctx.running.load == false:
break
var request: ptr CodexThreadRequest
# Pop a request from the channel
let recvOk = ctx.reqChannel.tryRecv(request)
if not recvOk:
error "Failure in run codex: unable to receive request in codex thread."
continue
# yield immediately to the event loop
# with asyncSpawn only, the code will be executed
# synchronously until the first await
asyncSpawn (
proc() {.async.} =
await sleepAsync(0)
await CodexThreadRequest.process(request, addr codex)
)()
# Notify the main thread that we picked up the request
let fireRes = ctx.reqReceivedSignal.fireSync()
if fireRes.isErr():
error "Failure in run codex: unable to fire back to requester thread.",
error = fireRes.error
proc run(ctx: ptr CodexContext) {.thread.} =
waitFor runCodex(ctx)
proc createCodexContext*(): Result[ptr CodexContext, string] =
## This proc is called from the main thread and it creates
## the Codex working thread.
# Allocates a CodexContext in shared memory (for the main thread)
var ctx = createShared(CodexContext, 1)
# This signal is used by the main side to wake the Codex thread
# when a new request is enqueued.
ctx.reqSignal = ThreadSignalPtr.new().valueOr:
return
err("Failed to create a context: unable to create reqSignal ThreadSignalPtr.")
# Used to let the caller know that the Codex thread has
# acknowledged / picked up a request (like a handshake).
ctx.reqReceivedSignal = ThreadSignalPtr.new().valueOr:
return err(
"Failed to create codex context: unable to create reqReceivedSignal ThreadSignalPtr."
)
# Protects shared state inside CodexContext
ctx.lock.initLock()
# Codex thread will loop until codex_destroy is called
ctx.running.store(true)
try:
createThread(ctx.thread, run, ctx)
except ValueError, ResourceExhaustedError:
freeShared(ctx)
return err(
"Failed to create codex context: unable to create thread: " &
getCurrentExceptionMsg()
)
return ok(ctx)
proc destroyCodexContext*(ctx: ptr CodexContext): Result[void, string] =
# Signal the Codex thread to stop
ctx.running.store(false)
# Wake the worker up if it's waiting
let signaledOnTime = ctx.reqSignal.fireSync().valueOr:
return err("Failed to destroy codex context: " & $error)
if not signaledOnTime:
return err(
"Failed to destroy codex context: unable to get signal reqSignal on time in destroyCodexContext."
)
# Wait for the thread to finish
joinThread(ctx.thread)
# Clean up
ctx.lock.deinitLock()
?ctx.reqSignal.close()
?ctx.reqReceivedSignal.close()
freeShared(ctx)
return ok()
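
The handshake above only confirms that the worker thread picked up a request; the actual result arrives later through the registered callback, on the Codex worker thread. The following sketch (not part of this PR) shows one way a C client can wait for that result using only functions declared in `library/libcodex.h`; the busy-wait flag, buffer size and use of `codex_repo` are illustrative, and real code would use a condition variable or semaphore instead of spinning.

```c
#include <stdio.h>
#include <string.h>
#include "libcodex.h"

// Illustrative result holder filled by the callback (which runs on the Codex
// worker thread, so it must stay short and non-blocking).
typedef struct {
  volatile int done;
  int ret;
  char msg[1024];
} PendingResult;

static void on_result(int ret, const char* msg, size_t len, void* userData) {
  PendingResult* r = (PendingResult*)userData;
  size_t n = (msg == NULL) ? 0 : len;
  if (n >= sizeof(r->msg)) n = sizeof(r->msg) - 1;
  if (n > 0) memcpy(r->msg, msg, n);
  r->msg[n] = '\0';
  r->ret = ret;
  r->done = 1;
}

// Ask the node for its data directory and block until the callback fires.
// Assumes `ctx` was obtained from codex_new.
static int get_repo_blocking(void* ctx, PendingResult* r) {
  r->done = 0;
  if (codex_repo(ctx, on_result, r) != RET_OK) return RET_ERR;
  while (!r->done) { /* spin; use a condvar or semaphore in real code */ }
  printf("repo: %s\n", r->msg);
  return r->ret;
}
```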

View File

@ -0,0 +1,126 @@
## This file contains the base message request type that will be handled.
## The requests are created by the main thread and processed by
## the Codex Thread.
import std/json
import results
import chronos
import ../ffi_types
import ./requests/node_lifecycle_request
import ./requests/node_info_request
import ./requests/node_debug_request
import ./requests/node_p2p_request
import ./requests/node_upload_request
import ./requests/node_download_request
import ./requests/node_storage_request
from ../../codex/codex import CodexServer
type RequestType* {.pure.} = enum
LIFECYCLE
INFO
DEBUG
P2P
UPLOAD
DOWNLOAD
STORAGE
type CodexThreadRequest* = object
reqType: RequestType
  # Request payload
reqContent: pointer
# Callback to notify the client thread of the result
callback: CodexCallback
# Custom state attached by the client to the request,
# returned when its callback is invoked.
userData: pointer
proc createShared*(
T: type CodexThreadRequest,
reqType: RequestType,
reqContent: pointer,
callback: CodexCallback,
userData: pointer,
): ptr type T =
var ret = createShared(T)
ret[].reqType = reqType
ret[].reqContent = reqContent
ret[].callback = callback
ret[].userData = userData
return ret
# NOTE: User callbacks are executed on the working thread.
# They must be fast and non-blocking; otherwise this thread will be blocked
# and no further requests can be processed.
# We can improve this by dispatching the callbacks to a thread pool or
# moving to a MP channel.
# See: https://github.com/codex-storage/nim-codex/pull/1322#discussion_r2340708316
proc handleRes[T: string | void | seq[byte]](
res: Result[T, string], request: ptr CodexThreadRequest
) =
## Handles the Result responses, which can either be Result[string, string] or
## Result[void, string].
defer:
deallocShared(request)
if res.isErr():
foreignThreadGc:
let msg = $res.error
if msg == "":
request[].callback(RET_ERR, nil, cast[csize_t](0), request[].userData)
else:
request[].callback(
RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), request[].userData
)
return
foreignThreadGc:
var msg: cstring = ""
when T is string:
msg = res.get().cstring()
request[].callback(
RET_OK, unsafeAddr msg[0], cast[csize_t](len(msg)), request[].userData
)
return
proc process*(
T: type CodexThreadRequest, request: ptr CodexThreadRequest, codex: ptr CodexServer
) {.async: (raises: []).} =
## Processes the request in the Codex thread.
## Dispatch to the appropriate request handler based on reqType.
let retFut =
case request[].reqType
of LIFECYCLE:
cast[ptr NodeLifecycleRequest](request[].reqContent).process(codex)
of INFO:
cast[ptr NodeInfoRequest](request[].reqContent).process(codex)
of RequestType.DEBUG:
cast[ptr NodeDebugRequest](request[].reqContent).process(codex)
of P2P:
cast[ptr NodeP2PRequest](request[].reqContent).process(codex)
of STORAGE:
cast[ptr NodeStorageRequest](request[].reqContent).process(codex)
of DOWNLOAD:
let onChunk = proc(bytes: seq[byte]) =
if bytes.len > 0:
request[].callback(
RET_PROGRESS,
cast[ptr cchar](unsafeAddr bytes[0]),
cast[csize_t](bytes.len),
request[].userData,
)
cast[ptr NodeDownloadRequest](request[].reqContent).process(codex, onChunk)
of UPLOAD:
let onBlockReceived = proc(bytes: int) =
request[].callback(RET_PROGRESS, nil, cast[csize_t](bytes), request[].userData)
cast[ptr NodeUploadRequest](request[].reqContent).process(codex, onBlockReceived)
handleRes(await retFut, request)
proc `$`*(self: CodexThreadRequest): string =
return $self.reqType
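
On the client side, the same callback receives progress notifications and the final result for a request, distinguished by the return code. A small C sketch of that dispatch, using only the codes defined in `library/libcodex.h` (the handler name is illustrative):

```c
#include <stdio.h>
#include "libcodex.h"

// One callback handles every notification for a request:
//  - RET_PROGRESS: intermediate progress (for DOWNLOAD, msg/len carry chunk data;
//                  for UPLOAD, len carries the number of bytes stored)
//  - RET_OK:       the request finished; msg holds the result (possibly empty)
//  - RET_ERR:      the request failed; msg holds the error text
static void on_notification(int ret, const char* msg, size_t len, void* userData) {
  switch (ret) {
  case RET_PROGRESS:
    printf("progress: %zu bytes\n", len);
    break;
  case RET_OK:
    printf("done: %.*s\n", (int)len, msg ? msg : "");
    break;
  case RET_ERR:
  default:
    fprintf(stderr, "error: %.*s\n", (int)len, msg ? msg : "");
    break;
  }
}
```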

View File

@ -0,0 +1,126 @@
{.push raises: [].}
## This file contains the debug info available with Codex.
## The DEBUG type will return info about the P2P node.
## The PEER type is available only with the codex_enable_api_debug_peers flag.
## It will return info about a specific peer if available.
import std/[options]
import chronos
import chronicles
import codexdht/discv5/spr
import ../../alloc
import ../../../codex/conf
import ../../../codex/rest/json
import ../../../codex/node
from ../../../codex/codex import CodexServer, node
logScope:
topics = "codexlib codexlibdebug"
type NodeDebugMsgType* = enum
DEBUG
PEER
LOG_LEVEL
type NodeDebugRequest* = object
operation: NodeDebugMsgType
peerId: cstring
logLevel: cstring
proc createShared*(
T: type NodeDebugRequest,
op: NodeDebugMsgType,
peerId: cstring = "",
logLevel: cstring = "",
): ptr type T =
var ret = createShared(T)
ret[].operation = op
ret[].peerId = peerId.alloc()
ret[].logLevel = logLevel.alloc()
return ret
proc destroyShared(self: ptr NodeDebugRequest) =
deallocShared(self[].peerId)
deallocShared(self[].logLevel)
deallocShared(self)
proc getDebug(
codex: ptr CodexServer
): Future[Result[string, string]] {.async: (raises: []).} =
let node = codex[].node
let table = RestRoutingTable.init(node.discovery.protocol.routingTable)
let json =
%*{
"id": $node.switch.peerInfo.peerId,
"addrs": node.switch.peerInfo.addrs.mapIt($it),
"spr":
if node.discovery.dhtRecord.isSome: node.discovery.dhtRecord.get.toURI else: "",
"announceAddresses": node.discovery.announceAddrs,
"table": table,
}
return ok($json)
proc getPeer(
codex: ptr CodexServer, peerId: cstring
): Future[Result[string, string]] {.async: (raises: []).} =
when codex_enable_api_debug_peers:
let node = codex[].node
let res = PeerId.init($peerId)
if res.isErr:
return err("Failed to get peer: invalid peer ID " & $peerId & ": " & $res.error())
let id = res.get()
try:
let peerRecord = await node.findPeer(id)
if peerRecord.isNone:
return err("Failed to get peer: peer not found")
return ok($ %RestPeerRecord.init(peerRecord.get()))
except CancelledError:
return err("Failed to get peer: operation cancelled")
except CatchableError as e:
return err("Failed to get peer: " & e.msg)
else:
return err("Failed to get peer: peer debug API is disabled")
proc updateLogLevel(
codex: ptr CodexServer, logLevel: cstring
): Future[Result[string, string]] {.async: (raises: []).} =
try:
{.gcsafe.}:
updateLogLevel($logLevel)
except ValueError as err:
return err("Failed to update log level: invalid value for log level: " & err.msg)
return ok("")
proc process*(
self: ptr NodeDebugRequest, codex: ptr CodexServer
): Future[Result[string, string]] {.async: (raises: []).} =
defer:
destroyShared(self)
case self.operation
of NodeDebugMsgType.DEBUG:
let res = (await getDebug(codex))
if res.isErr:
error "Failed to get DEBUG.", error = res.error
return err($res.error)
return res
of NodeDebugMsgType.PEER:
let res = (await getPeer(codex, self.peerId))
if res.isErr:
error "Failed to get PEER.", error = res.error
return err($res.error)
return res
of NodeDebugMsgType.LOG_LEVEL:
let res = (await updateLogLevel(codex, self.logLevel))
if res.isErr:
error "Failed to update LOG_LEVEL.", error = res.error
return err($res.error)
return res
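
From C these operations are reached through `codex_debug`, `codex_peer_debug` and `codex_log_level`. A sketch, assuming `ctx` comes from `codex_new` and `cb` is a `CodexCallback` as in the earlier examples; the peer ID is a placeholder:

```c
#include "libcodex.h"

static void query_debug_info(void* ctx, CodexCallback cb) {
  // JSON description of the local node (peer id, addresses, SPR, routing table).
  codex_debug(ctx, cb, NULL);

  // Peer lookup; only available when the library is built with
  // codex_enable_api_debug_peers, otherwise the callback reports an error.
  codex_peer_debug(ctx, "16Uiu2HAm...placeholder", cb, NULL);

  // Change the log level at runtime.
  codex_log_level(ctx, "DEBUG", cb, NULL);
}
```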

View File

@ -0,0 +1,336 @@
{.push raises: [].}
## This file contains the download request.
## A session is created for each download, identified by the CID,
## allowing the download to be resumed, paused and cancelled (using chunks).
##
## There are two ways to download a file:
## 1. Via chunks: the cid parameter is the CID of the file to download. Steps are:
## - INIT: initializes the download session
## - CHUNK: downloads the next chunk of the file
## - CANCEL: cancels the download session
## 2. Via stream.
## - INIT: initializes the download session
## - STREAM: downloads the file in a streaming manner, calling
## the onChunk handler for each chunk and / or writing to a file if filepath is set.
## - CANCEL: cancels the download session
import std/[options, streams]
import chronos
import chronicles
import libp2p/stream/[lpstream]
import serde/json as serde
import ../../alloc
import ../../../codex/units
import ../../../codex/codextypes
from ../../../codex/codex import CodexServer, node
from ../../../codex/node import retrieve, fetchManifest
from ../../../codex/rest/json import `%`, RestContent
from libp2p import Cid, init, `$`
logScope:
topics = "codexlib codexlibdownload"
type NodeDownloadMsgType* = enum
INIT
CHUNK
STREAM
CANCEL
MANIFEST
type OnChunkHandler = proc(bytes: seq[byte]): void {.gcsafe, raises: [].}
type NodeDownloadRequest* = object
operation: NodeDownloadMsgType
cid: cstring
chunkSize: csize_t
local: bool
filepath: cstring
type
DownloadSessionId* = string
DownloadSessionCount* = int
DownloadSession* = object
stream: LPStream
chunkSize: int
var downloadSessions {.threadvar.}: Table[DownloadSessionId, DownloadSession]
proc createShared*(
T: type NodeDownloadRequest,
op: NodeDownloadMsgType,
cid: cstring = "",
chunkSize: csize_t = 0,
local: bool = false,
filepath: cstring = "",
): ptr type T =
var ret = createShared(T)
ret[].operation = op
ret[].cid = cid.alloc()
ret[].chunkSize = chunkSize
ret[].local = local
ret[].filepath = filepath.alloc()
return ret
proc destroyShared(self: ptr NodeDownloadRequest) =
deallocShared(self[].cid)
deallocShared(self[].filepath)
deallocShared(self)
proc init(
codex: ptr CodexServer, cCid: cstring = "", chunkSize: csize_t = 0, local: bool
): Future[Result[string, string]] {.async: (raises: []).} =
## Init a new session to download the file identified by cid.
##
## If the session already exists, do nothing and return ok.
## Meaning that a cid can only have one active download session.
## If the chunkSize is 0, the default block size will be used.
  ## If local is true, the file will be retrieved from the local store.
let cid = Cid.init($cCid)
if cid.isErr:
return err("Failed to download locally: cannot parse cid: " & $cCid)
if downloadSessions.contains($cid):
return ok("Download session already exists.")
let node = codex[].node
var stream: LPStream
try:
let res = await node.retrieve(cid.get(), local)
if res.isErr():
return err("Failed to init the download: " & res.error.msg)
stream = res.get()
except CancelledError:
downloadSessions.del($cid)
return err("Failed to init the download: download cancelled.")
let blockSize = if chunkSize.int > 0: chunkSize.int else: DefaultBlockSize.int
downloadSessions[$cid] = DownloadSession(stream: stream, chunkSize: blockSize)
return ok("")
proc chunk(
codex: ptr CodexServer, cCid: cstring = "", onChunk: OnChunkHandler
): Future[Result[string, string]] {.async: (raises: []).} =
## Download the next chunk of the file identified by cid.
## The chunk is passed to the onChunk handler.
##
## If the stream is at EOF, return ok with empty string.
##
## If an error is raised while reading the stream, the session is deleted
## and an error is returned.
let cid = Cid.init($cCid)
if cid.isErr:
return err("Failed to download locally: cannot parse cid: " & $cCid)
if not downloadSessions.contains($cid):
return err("Failed to download chunk: no session for cid " & $cid)
var session: DownloadSession
try:
session = downloadSessions[$cid]
except KeyError:
return err("Failed to download chunk: no session for cid " & $cid)
let stream = session.stream
if stream.atEof:
return ok("")
let chunkSize = session.chunkSize
var buf = newSeq[byte](chunkSize)
try:
let read = await stream.readOnce(addr buf[0], buf.len)
buf.setLen(read)
except LPStreamError as e:
await stream.close()
downloadSessions.del($cid)
return err("Failed to download chunk: " & $e.msg)
except CancelledError:
await stream.close()
downloadSessions.del($cid)
return err("Failed to download chunk: download cancelled.")
if buf.len <= 0:
return err("Failed to download chunk: no data")
onChunk(buf)
return ok("")
proc streamData(
codex: ptr CodexServer,
stream: LPStream,
onChunk: OnChunkHandler,
chunkSize: csize_t,
filepath: cstring,
): Future[Result[string, string]] {.
async: (raises: [CancelledError, LPStreamError, IOError])
.} =
let blockSize = if chunkSize.int > 0: chunkSize.int else: DefaultBlockSize.int
var buf = newSeq[byte](blockSize)
var read = 0
var outputStream: OutputStreamHandle
var filedest: string = $filepath
try:
if filepath != "":
outputStream = filedest.fileOutput()
while not stream.atEof:
## Yield immediately to the event loop
## It gives a chance to cancel request to be processed
await sleepAsync(0)
let read = await stream.readOnce(addr buf[0], buf.len)
buf.setLen(read)
if buf.len <= 0:
break
onChunk(buf)
if outputStream != nil:
outputStream.write(buf)
if outputStream != nil:
outputStream.close()
finally:
if outputStream != nil:
outputStream.close()
return ok("")
proc stream(
codex: ptr CodexServer,
cCid: cstring,
chunkSize: csize_t,
local: bool,
filepath: cstring,
onChunk: OnChunkHandler,
): Future[Result[string, string]] {.raises: [], async: (raises: []).} =
## Stream the file identified by cid, calling the onChunk handler for each chunk
## and / or writing to a file if filepath is set.
##
## If local is true, the file will be retrieved from the local store.
let cid = Cid.init($cCid)
if cid.isErr:
return err("Failed to stream: cannot parse cid: " & $cCid)
if not downloadSessions.contains($cid):
return err("Failed to stream: no session for cid " & $cid)
var session: DownloadSession
try:
session = downloadSessions[$cid]
except KeyError:
return err("Failed to stream: no session for cid " & $cid)
let node = codex[].node
try:
let res =
await noCancel codex.streamData(session.stream, onChunk, chunkSize, filepath)
if res.isErr:
return err($res.error)
except LPStreamError as e:
return err("Failed to stream file: " & $e.msg)
except IOError as e:
return err("Failed to stream file: " & $e.msg)
finally:
if session.stream != nil:
await session.stream.close()
downloadSessions.del($cid)
return ok("")
proc cancel(
codex: ptr CodexServer, cCid: cstring
): Future[Result[string, string]] {.raises: [], async: (raises: []).} =
## Cancel the download session identified by cid.
## This operation is not supported when using the stream mode,
## because the worker will be busy downloading the file.
let cid = Cid.init($cCid)
if cid.isErr:
return err("Failed to cancel : cannot parse cid: " & $cCid)
if not downloadSessions.contains($cid):
# The session is already cancelled
return ok("")
var session: DownloadSession
try:
session = downloadSessions[$cid]
except KeyError:
# The session is already cancelled
return ok("")
let stream = session.stream
await stream.close()
  downloadSessions.del($cid)
return ok("")
proc manifest(
codex: ptr CodexServer, cCid: cstring
): Future[Result[string, string]] {.raises: [], async: (raises: []).} =
let cid = Cid.init($cCid)
if cid.isErr:
return err("Failed to fetch manifest: cannot parse cid: " & $cCid)
try:
let node = codex[].node
let manifest = await node.fetchManifest(cid.get())
if manifest.isErr:
return err("Failed to fetch manifest: " & manifest.error.msg)
return ok(serde.toJson(manifest.get()))
except CancelledError:
return err("Failed to fetch manifest: download cancelled.")
proc process*(
self: ptr NodeDownloadRequest, codex: ptr CodexServer, onChunk: OnChunkHandler
): Future[Result[string, string]] {.async: (raises: []).} =
defer:
destroyShared(self)
case self.operation
of NodeDownloadMsgType.INIT:
let res = (await init(codex, self.cid, self.chunkSize, self.local))
if res.isErr:
error "Failed to INIT.", error = res.error
return err($res.error)
return res
of NodeDownloadMsgType.CHUNK:
let res = (await chunk(codex, self.cid, onChunk))
if res.isErr:
error "Failed to CHUNK.", error = res.error
return err($res.error)
return res
of NodeDownloadMsgType.STREAM:
let res = (
await stream(codex, self.cid, self.chunkSize, self.local, self.filepath, onChunk)
)
if res.isErr:
error "Failed to STREAM.", error = res.error
return err($res.error)
return res
of NodeDownloadMsgType.CANCEL:
let res = (await cancel(codex, self.cid))
if res.isErr:
error "Failed to CANCEL.", error = res.error
return err($res.error)
return res
of NodeDownloadMsgType.MANIFEST:
let res = (await manifest(codex, self.cid))
if res.isErr:
error "Failed to MANIFEST.", error = res.error
return err($res.error)
return res
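
A possible C flow for the streaming path (INIT then STREAM), assuming `ctx` from `codex_new` and a `CodexCallback` `cb`; the CID and output path are placeholders, and since requests complete asynchronously, real code should wait for each callback to report RET_OK before issuing the next call:

```c
#include <stdbool.h>
#include "libcodex.h"

static int download_to_file(void* ctx, const char* cid, CodexCallback cb) {
  // 1. INIT: open a download session for this CID (0 = default chunk size,
  //    false = fetch from the network rather than only from the local store).
  if (codex_download_init(ctx, cid, 0, false, cb, NULL) != RET_OK) return RET_ERR;
  // ... wait for the INIT callback ...

  // 2. STREAM: read the whole dataset; each chunk is reported to `cb` with
  //    RET_PROGRESS and also written to the given file path.
  if (codex_download_stream(ctx, cid, 0, false, "/tmp/output.bin", cb, NULL) != RET_OK)
    return RET_ERR;

  // The manifest (size, filename, mimetype, ...) can be fetched separately.
  codex_download_manifest(ctx, cid, cb, NULL);
  return RET_OK;
}
```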

View File

@ -0,0 +1,76 @@
## This file contains the node info request type that will be handled.
## REPO: return the node's data directory.
## SPR: return the node's signed peer record (SPR) URI.
## PEERID: return the node's peer ID.
import std/[options]
import chronos
import chronicles
import confutils
import codexdht/discv5/spr
import ../../../codex/conf
import ../../../codex/rest/json
import ../../../codex/node
from ../../../codex/codex import CodexServer, config, node
logScope:
topics = "codexlib codexlibinfo"
type NodeInfoMsgType* = enum
REPO
SPR
PEERID
type NodeInfoRequest* = object
operation: NodeInfoMsgType
proc createShared*(T: type NodeInfoRequest, op: NodeInfoMsgType): ptr type T =
var ret = createShared(T)
ret[].operation = op
return ret
proc destroyShared(self: ptr NodeInfoRequest) =
deallocShared(self)
proc getRepo(
codex: ptr CodexServer
): Future[Result[string, string]] {.async: (raises: []).} =
return ok($(codex[].config.dataDir))
proc getSpr(
codex: ptr CodexServer
): Future[Result[string, string]] {.async: (raises: []).} =
let spr = codex[].node.discovery.dhtRecord
if spr.isNone:
return err("Failed to get SPR: no SPR record found.")
return ok(spr.get.toURI)
proc getPeerId(
codex: ptr CodexServer
): Future[Result[string, string]] {.async: (raises: []).} =
return ok($codex[].node.switch.peerInfo.peerId)
proc process*(
self: ptr NodeInfoRequest, codex: ptr CodexServer
): Future[Result[string, string]] {.async: (raises: []).} =
defer:
destroyShared(self)
case self.operation
of REPO:
let res = (await getRepo(codex))
if res.isErr:
error "Failed to get REPO.", error = res.error
return err($res.error)
return res
of SPR:
let res = (await getSpr(codex))
if res.isErr:
error "Failed to get SPR.", error = res.error
return err($res.error)
return res
of PEERID:
let res = (await getPeerId(codex))
if res.isErr:
error "Failed to get PEERID.", error = res.error
return err($res.error)
return res
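
The corresponding C calls are `codex_repo`, `codex_spr` and `codex_peer_id`; each result arrives asynchronously through the callback as a string. A sketch, assuming `ctx` and `cb` as before:

```c
#include "libcodex.h"

static void query_node_info(void* ctx, CodexCallback cb) {
  codex_repo(ctx, cb, NULL);    // data directory of the node
  codex_spr(ctx, cb, NULL);     // signed peer record as a spr: URI
  codex_peer_id(ctx, cb, NULL); // libp2p peer id
}
```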

View File

@ -0,0 +1,188 @@
## This file contains the lifecycle request type that will be handled.
## CREATE_NODE: create a new Codex node with the provided config.json.
## START_NODE: start the provided Codex node.
## STOP_NODE: stop the provided Codex node.
## CLOSE_NODE: close the provided Codex node.
import std/[options, json, strutils, net, os]
import codexdht/discv5/spr
import stew/shims/parseutils
import contractabi/address
import chronos
import chronicles
import results
import confutils
import confutils/std/net
import confutils/defs
import libp2p
import json_serialization
import json_serialization/std/[options, net]
import ../../alloc
import ../../../codex/conf
import ../../../codex/utils
import ../../../codex/utils/[keyutils, fileutils]
import ../../../codex/units
from ../../../codex/codex import CodexServer, new, start, stop, close
logScope:
topics = "codexlib codexliblifecycle"
type NodeLifecycleMsgType* = enum
CREATE_NODE
START_NODE
STOP_NODE
CLOSE_NODE
proc readValue*[T: InputFile | InputDir | OutPath | OutDir | OutFile](
r: var JsonReader, val: var T
) =
val = T(r.readValue(string))
proc readValue*(r: var JsonReader, val: var MultiAddress) =
val = MultiAddress.init(r.readValue(string)).get()
proc readValue*(r: var JsonReader, val: var NatConfig) =
let res = NatConfig.parse(r.readValue(string))
if res.isErr:
raise
newException(SerializationError, "Cannot parse the NAT config: " & res.error())
val = res.get()
proc readValue*(r: var JsonReader, val: var SignedPeerRecord) =
let res = SignedPeerRecord.parse(r.readValue(string))
if res.isErr:
raise
newException(SerializationError, "Cannot parse the signed peer: " & res.error())
val = res.get()
proc readValue*(r: var JsonReader, val: var ThreadCount) =
val = ThreadCount(r.readValue(int))
proc readValue*(r: var JsonReader, val: var NBytes) =
val = NBytes(r.readValue(int))
proc readValue*(r: var JsonReader, val: var Duration) =
var dur: Duration
let input = r.readValue(string)
let count = parseDuration(input, dur)
if count == 0:
raise newException(SerializationError, "Cannot parse the duration: " & input)
val = dur
proc readValue*(r: var JsonReader, val: var EthAddress) =
val = EthAddress.init(r.readValue(string)).get()
type NodeLifecycleRequest* = object
operation: NodeLifecycleMsgType
configJson: cstring
proc createShared*(
T: type NodeLifecycleRequest, op: NodeLifecycleMsgType, configJson: cstring = ""
): ptr type T =
var ret = createShared(T)
ret[].operation = op
ret[].configJson = configJson.alloc()
return ret
proc destroyShared(self: ptr NodeLifecycleRequest) =
deallocShared(self[].configJson)
deallocShared(self)
proc createCodex(
configJson: cstring
): Future[Result[CodexServer, string]] {.async: (raises: []).} =
var conf: CodexConf
try:
conf = CodexConf.load(
version = codexFullVersion,
envVarsPrefix = "codex",
cmdLine = @[],
secondarySources = proc(
config: CodexConf, sources: auto
) {.gcsafe, raises: [ConfigurationError].} =
if configJson.len > 0:
sources.addConfigFileContent(Json, $(configJson))
,
)
except ConfigurationError as e:
return err("Failed to create codex: unable to load configuration: " & e.msg)
conf.setupLogging()
try:
{.gcsafe.}:
updateLogLevel(conf.logLevel)
except ValueError as err:
return err("Failed to create codex: invalid value for log level: " & err.msg)
conf.setupMetrics()
if not (checkAndCreateDataDir((conf.dataDir).string)):
# We are unable to access/create data folder or data folder's
# permissions are insecure.
return err(
"Failed to create codex: unable to access/create data folder or data folder's permissions are insecure."
)
if not (checkAndCreateDataDir((conf.dataDir / "repo"))):
# We are unable to access/create data folder or data folder's
# permissions are insecure.
return err(
"Failed to create codex: unable to access/create data folder or data folder's permissions are insecure."
)
let keyPath =
if isAbsolute(conf.netPrivKeyFile):
conf.netPrivKeyFile
else:
conf.dataDir / conf.netPrivKeyFile
let privateKey = setupKey(keyPath)
if privateKey.isErr:
return err("Failed to create codex: unable to get the private key.")
let pk = privateKey.get()
conf.apiBindAddress = string.none
let server =
try:
CodexServer.new(conf, pk)
except Exception as exc:
return err("Failed to create codex: " & exc.msg)
return ok(server)
proc process*(
self: ptr NodeLifecycleRequest, codex: ptr CodexServer
): Future[Result[string, string]] {.async: (raises: []).} =
defer:
destroyShared(self)
case self.operation
of CREATE_NODE:
codex[] = (
await createCodex(
self.configJson # , self.appCallbacks
)
).valueOr:
error "Failed to CREATE_NODE.", error = error
return err($error)
of START_NODE:
try:
await codex[].start()
except Exception as e:
error "Failed to START_NODE.", error = e.msg
return err(e.msg)
of STOP_NODE:
try:
await codex[].stop()
except Exception as e:
error "Failed to STOP_NODE.", error = e.msg
return err(e.msg)
of CLOSE_NODE:
try:
await codex[].close()
except Exception as e:
error "Failed to STOP_NODE.", error = e.msg
return err(e.msg)
return ok("")

View File

@ -0,0 +1,95 @@
{.push raises: [].}
## This file contains the P2p request type that will be handled.
## CONNECT: connect to a peer with the provided peer ID and optional addresses.
import std/[options]
import chronos
import chronicles
import libp2p
import ../../alloc
import ../../../codex/node
from ../../../codex/codex import CodexServer, node
logScope:
topics = "codexlib codexlibp2p"
type NodeP2PMsgType* = enum
CONNECT
type NodeP2PRequest* = object
operation: NodeP2PMsgType
peerId: cstring
peerAddresses: seq[cstring]
proc createShared*(
T: type NodeP2PRequest,
op: NodeP2PMsgType,
peerId: cstring = "",
peerAddresses: seq[cstring] = @[],
): ptr type T =
var ret = createShared(T)
ret[].operation = op
ret[].peerId = peerId.alloc()
ret[].peerAddresses = peerAddresses
return ret
proc destroyShared(self: ptr NodeP2PRequest) =
deallocShared(self[].peerId)
deallocShared(self)
proc connect(
codex: ptr CodexServer, peerId: cstring, peerAddresses: seq[cstring] = @[]
): Future[Result[string, string]] {.async: (raises: []).} =
let node = codex[].node
let res = PeerId.init($peerId)
if res.isErr:
return err("Failed to connect to peer: invalid peer ID: " & $res.error())
let id = res.get()
let addresses =
if peerAddresses.len > 0:
var addrs: seq[MultiAddress]
for addrStr in peerAddresses:
let res = MultiAddress.init($addrStr)
if res.isOk:
addrs.add(res[])
else:
return err("Failed to connect to peer: invalid address: " & $addrStr)
addrs
else:
try:
let peerRecord = await node.findPeer(id)
if peerRecord.isNone:
return err("Failed to connect to peer: peer not found.")
peerRecord.get().addresses.mapIt(it.address)
except CancelledError:
return err("Failed to connect to peer: operation cancelled.")
except CatchableError as e:
return err("Failed to connect to peer: " & $e.msg)
try:
await node.connect(id, addresses)
except CancelledError:
return err("Failed to connect to peer: operation cancelled.")
except CatchableError as e:
return err("Failed to connect to peer: " & $e.msg)
return ok("")
proc process*(
self: ptr NodeP2PRequest, codex: ptr CodexServer
): Future[Result[string, string]] {.async: (raises: []).} =
defer:
destroyShared(self)
case self.operation
of NodeP2PMsgType.CONNECT:
let res = (await connect(codex, self.peerId, self.peerAddresses))
if res.isErr:
error "Failed to CONNECT.", error = res.error
return err($res.error)
return res
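
The matching C entry point is `codex_connect`, which takes an optional list of multiaddresses. A sketch with a placeholder peer ID and address, assuming `ctx` and `cb` as before:

```c
#include "libcodex.h"

static void connect_to_peer(void* ctx, CodexCallback cb) {
  const char* addrs[] = { "/ip4/127.0.0.1/tcp/8070" };

  // With explicit addresses: dial those directly.
  codex_connect(ctx, "16Uiu2HAm...peer", addrs, 1, cb, NULL);

  // Without addresses: the node resolves them through the DHT (findPeer).
  codex_connect(ctx, "16Uiu2HAm...peer", NULL, 0, cb, NULL);
}
```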

View File

@ -0,0 +1,180 @@
{.push raises: [].}
## This file contains the node storage request.
## 5 operations are available:
## - LIST: list all manifests stored in the node.
## - DELETE: delete either a single block or an entire dataset from the local node.
## - FETCH: download a file from the network to the local node.
## - SPACE: get the amount of space used by the local node.
## - EXISTS: check the existence of a cid in a node (local store).
import std/[options]
import chronos
import chronicles
import libp2p/stream/[lpstream]
import serde/json as serde
import ../../alloc
import ../../../codex/units
import ../../../codex/manifest
import ../../../codex/stores/repostore
from ../../../codex/codex import CodexServer, node, repoStore
from ../../../codex/node import
iterateManifests, fetchManifest, fetchDatasetAsyncTask, delete, hasLocalBlock
from libp2p import Cid, init, `$`
logScope:
topics = "codexlib codexlibstorage"
type NodeStorageMsgType* = enum
LIST
DELETE
FETCH
SPACE
EXISTS
type NodeStorageRequest* = object
operation: NodeStorageMsgType
cid: cstring
type StorageSpace = object
totalBlocks* {.serialize.}: Natural
quotaMaxBytes* {.serialize.}: NBytes
quotaUsedBytes* {.serialize.}: NBytes
quotaReservedBytes* {.serialize.}: NBytes
proc createShared*(
T: type NodeStorageRequest, op: NodeStorageMsgType, cid: cstring = ""
): ptr type T =
var ret = createShared(T)
ret[].operation = op
ret[].cid = cid.alloc()
return ret
proc destroyShared(self: ptr NodeStorageRequest) =
deallocShared(self[].cid)
deallocShared(self)
type ManifestWithCid = object
cid {.serialize.}: string
manifest {.serialize.}: Manifest
proc list(
codex: ptr CodexServer
): Future[Result[string, string]] {.async: (raises: []).} =
var manifests = newSeq[ManifestWithCid]()
proc onManifest(cid: Cid, manifest: Manifest) {.raises: [], gcsafe.} =
manifests.add(ManifestWithCid(cid: $cid, manifest: manifest))
try:
let node = codex[].node
await node.iterateManifests(onManifest)
except CancelledError:
return err("Failed to list manifests: cancelled operation.")
except CatchableError as err:
return err("Failed to list manifest: : " & err.msg)
return ok(serde.toJson(manifests))
proc delete(
codex: ptr CodexServer, cCid: cstring
): Future[Result[string, string]] {.async: (raises: []).} =
let cid = Cid.init($cCid)
if cid.isErr:
return err("Failed to delete the data: cannot parse cid: " & $cCid)
let node = codex[].node
try:
let res = await node.delete(cid.get())
if res.isErr:
return err("Failed to delete the data: " & res.error.msg)
except CancelledError:
return err("Failed to delete the data: cancelled operation.")
except CatchableError as err:
return err("Failed to delete the data: " & err.msg)
return ok("")
proc fetch(
codex: ptr CodexServer, cCid: cstring
): Future[Result[string, string]] {.async: (raises: []).} =
let cid = Cid.init($cCid)
if cid.isErr:
return err("Failed to fetch the data: cannot parse cid: " & $cCid)
try:
let node = codex[].node
let manifest = await node.fetchManifest(cid.get())
if manifest.isErr:
return err("Failed to fetch the data: " & manifest.error.msg)
node.fetchDatasetAsyncTask(manifest.get())
return ok(serde.toJson(manifest.get()))
except CancelledError:
return err("Failed to fetch the data: download cancelled.")
proc space(
codex: ptr CodexServer
): Future[Result[string, string]] {.async: (raises: []).} =
let repoStore = codex[].repoStore
let space = StorageSpace(
totalBlocks: repoStore.totalBlocks,
quotaMaxBytes: repoStore.quotaMaxBytes,
quotaUsedBytes: repoStore.quotaUsedBytes,
quotaReservedBytes: repoStore.quotaReservedBytes,
)
return ok(serde.toJson(space))
proc exists(
codex: ptr CodexServer, cCid: cstring
): Future[Result[string, string]] {.async: (raises: []).} =
let cid = Cid.init($cCid)
if cid.isErr:
return err("Failed to check the data existence: cannot parse cid: " & $cCid)
try:
let node = codex[].node
let exists = await node.hasLocalBlock(cid.get())
return ok($exists)
except CancelledError:
return err("Failed to check the data existence: operation cancelled.")
proc process*(
self: ptr NodeStorageRequest, codex: ptr CodexServer
): Future[Result[string, string]] {.async: (raises: []).} =
defer:
destroyShared(self)
case self.operation
of NodeStorageMsgType.LIST:
let res = (await list(codex))
if res.isErr:
error "Failed to LIST.", error = res.error
return err($res.error)
return res
of NodeStorageMsgType.DELETE:
let res = (await delete(codex, self.cid))
if res.isErr:
error "Failed to DELETE.", error = res.error
return err($res.error)
return res
of NodeStorageMsgType.FETCH:
let res = (await fetch(codex, self.cid))
if res.isErr:
error "Failed to FETCH.", error = res.error
return err($res.error)
return res
of NodeStorageMsgType.SPACE:
let res = (await space(codex))
if res.isErr:
error "Failed to SPACE.", error = res.error
return err($res.error)
return res
of NodeStorageMsgType.EXISTS:
let res = (await exists(codex, self.cid))
if res.isErr:
error "Failed to EXISTS.", error = res.error
return err($res.error)
return res
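
From C these map onto the `codex_storage_*` functions; results are delivered asynchronously to the callback as JSON or plain strings. A sketch with a placeholder CID, assuming `ctx` and `cb` as before:

```c
#include "libcodex.h"

static void inspect_local_store(void* ctx, const char* cid, CodexCallback cb) {
  codex_storage_space(ctx, cb, NULL);       // quota / usage as JSON
  codex_storage_list(ctx, cb, NULL);        // manifests stored locally, as JSON
  codex_storage_exists(ctx, cid, cb, NULL); // "true" / "false"
  codex_storage_fetch(ctx, cid, cb, NULL);  // fetch the dataset into the local store
  codex_storage_delete(ctx, cid, cb, NULL); // remove a block or dataset locally
}
```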

View File

@ -0,0 +1,372 @@
{.push raises: [].}
## This file contains the upload request.
## A session is created for each upload, allowing uploads to be resumed,
## paused and cancelled (using chunks).
##
## There are two ways to upload a file:
## 1. Via chunks: the filepath parameter is the data filename. Steps are:
## - INIT: creates a new upload session and returns its ID.
## - CHUNK: sends a chunk of data to the upload session.
## - FINALIZE: finalizes the upload and returns the CID of the uploaded file.
## - CANCEL: cancels the upload session.
##
## 2. Directly from a file path: the filepath has to be absolute.
## - INIT: creates a new upload session and returns its ID
## - FILE: starts the upload and returns the CID of the uploaded file
## - CANCEL: cancels the upload session.
import std/[options, os, mimetypes]
import chronos
import chronicles
import questionable
import questionable/results
import faststreams/inputs
import libp2p/stream/[bufferstream, lpstream]
import ../../alloc
import ../../../codex/units
import ../../../codex/codextypes
from ../../../codex/codex import CodexServer, node
from ../../../codex/node import store
from libp2p import Cid, `$`
logScope:
topics = "codexlib codexlibupload"
type NodeUploadMsgType* = enum
INIT
CHUNK
FINALIZE
CANCEL
FILE
type OnProgressHandler = proc(bytes: int): void {.gcsafe, raises: [].}
type NodeUploadRequest* = object
operation: NodeUploadMsgType
sessionId: cstring
filepath: cstring
chunk: seq[byte]
chunkSize: csize_t
type
UploadSessionId* = string
UploadSessionCount* = int
UploadSession* = object
stream: BufferStream
fut: Future[?!Cid]
filepath: string
chunkSize: int
onProgress: OnProgressHandler
var uploadSessions {.threadvar.}: Table[UploadSessionId, UploadSession]
var nexUploadSessionCount {.threadvar.}: UploadSessionCount
proc createShared*(
T: type NodeUploadRequest,
op: NodeUploadMsgType,
sessionId: cstring = "",
filepath: cstring = "",
chunk: seq[byte] = @[],
chunkSize: csize_t = 0,
): ptr type T =
var ret = createShared(T)
ret[].operation = op
ret[].sessionId = sessionId.alloc()
ret[].filepath = filepath.alloc()
ret[].chunk = chunk
ret[].chunkSize = chunkSize
return ret
proc destroyShared(self: ptr NodeUploadRequest) =
deallocShared(self[].filepath)
deallocShared(self[].sessionId)
deallocShared(self)
proc init(
codex: ptr CodexServer, filepath: cstring = "", chunkSize: csize_t = 0
): Future[Result[string, string]] {.async: (raises: []).} =
  ## Init a new upload session and return its ID.
## The session contains the future corresponding to the
## `node.store` call.
## The filepath can be:
## - the filename when uploading via chunks
## - the absolute path to a file when uploading directly.
## The mimetype is deduced from the filename extension.
##
## The chunkSize matches by default the block size used to store the file.
##
## A callback `onBlockStore` is provided to `node.store` to
## report the progress of the upload. This callback will check
## that an `onProgress` handler is set in the session
## and call it with the number of bytes stored each time a block
## is stored.
var filenameOpt, mimetypeOpt = string.none
if isAbsolute($filepath):
if not fileExists($filepath):
return err(
"Failed to create an upload session, the filepath does not exist: " & $filepath
)
if filepath != "":
let (_, name, ext) = splitFile($filepath)
filenameOpt = (name & ext).some
if ext != "":
let extNoDot =
if ext.len > 0:
ext[1 ..^ 1]
else:
""
let mime = newMimetypes()
let mimetypeStr = mime.getMimetype(extNoDot, "")
mimetypeOpt = if mimetypeStr == "": string.none else: mimetypeStr.some
let sessionId = $nexUploadSessionCount
nexUploadSessionCount.inc()
let stream = BufferStream.new()
let lpStream = LPStream(stream)
let node = codex[].node
let onBlockStored = proc(chunk: seq[byte]): void {.gcsafe, raises: [].} =
try:
if uploadSessions.contains($sessionId):
let session = uploadSessions[$sessionId]
if session.onProgress != nil:
session.onProgress(chunk.len)
except KeyError:
error "Failed to push progress update, session is not found: ",
sessionId = $sessionId
let blockSize =
if chunkSize.NBytes > 0.NBytes: chunkSize.NBytes else: DefaultBlockSize
let fut = node.store(lpStream, filenameOpt, mimetypeOpt, blockSize, onBlockStored)
uploadSessions[sessionId] = UploadSession(
stream: stream, fut: fut, filepath: $filepath, chunkSize: blockSize.int
)
return ok(sessionId)
proc chunk(
codex: ptr CodexServer, sessionId: cstring, chunk: seq[byte]
): Future[Result[string, string]] {.async: (raises: []).} =
## Upload a chunk of data to the session identified by sessionId.
## The chunk is pushed to the BufferStream of the session.
## If the chunk size is equal or greater than the session chunkSize,
## the `onProgress` callback is temporarily set to receive the progress
  ## from the `onBlockStored` callback. This provides a way to report progress
## precisely when a block is stored.
## If the chunk size is smaller than the session chunkSize,
## the `onProgress` callback is not set because the LPStream will
## wait until enough data is received to form a block before storing it.
## The wrapper may then report the progress because the data is in the stream
## but not yet stored.
if not uploadSessions.contains($sessionId):
return err("Failed to upload the chunk, the session is not found: " & $sessionId)
var fut = newFuture[void]()
try:
let session = uploadSessions[$sessionId]
if chunk.len >= session.chunkSize:
uploadSessions[$sessionId].onProgress = proc(
bytes: int
): void {.gcsafe, raises: [].} =
fut.complete()
await session.stream.pushData(chunk)
else:
fut = session.stream.pushData(chunk)
await fut
uploadSessions[$sessionId].onProgress = nil
except KeyError:
return err("Failed to upload the chunk, the session is not found: " & $sessionId)
except LPError as e:
return err("Failed to upload the chunk, stream error: " & $e.msg)
except CancelledError:
return err("Failed to upload the chunk, operation cancelled.")
except CatchableError as e:
return err("Failed to upload the chunk: " & $e.msg)
finally:
if not fut.finished():
fut.cancelSoon()
return ok("")
proc finalize(
codex: ptr CodexServer, sessionId: cstring
): Future[Result[string, string]] {.async: (raises: []).} =
## Finalize the upload session identified by sessionId.
## This closes the BufferStream and waits for the `node.store` future
## to complete. It returns the CID of the uploaded file.
if not uploadSessions.contains($sessionId):
return
err("Failed to finalize the upload session, session not found: " & $sessionId)
var session: UploadSession
try:
session = uploadSessions[$sessionId]
await session.stream.pushEof()
let res = await session.fut
if res.isErr:
return err("Failed to finalize the upload session: " & res.error().msg)
return ok($res.get())
except KeyError:
return
err("Failed to finalize the upload session, invalid session ID: " & $sessionId)
except LPStreamError as e:
return err("Failed to finalize the upload session, stream error: " & $e.msg)
except CancelledError:
return err("Failed to finalize the upload session, operation cancelled")
except CatchableError as e:
return err("Failed to finalize the upload session: " & $e.msg)
finally:
if uploadSessions.contains($sessionId):
uploadSessions.del($sessionId)
if session.fut != nil and not session.fut.finished():
session.fut.cancelSoon()
proc cancel(
codex: ptr CodexServer, sessionId: cstring
): Future[Result[string, string]] {.async: (raises: []).} =
## Cancel the upload session identified by sessionId.
## This cancels the `node.store` future and removes the session
## from the table.
if not uploadSessions.contains($sessionId):
# Session not found, nothing to cancel
return ok("")
try:
let session = uploadSessions[$sessionId]
session.fut.cancelSoon()
except KeyError:
# Session not found, nothing to cancel
return ok("")
uploadSessions.del($sessionId)
return ok("")
proc streamFile(
filepath: string, stream: BufferStream, chunkSize: int
): Future[Result[void, string]] {.async: (raises: [CancelledError]).} =
## Streams a file from the given filepath using faststream.
## fsMultiSync cannot be used with chronos because of this warning:
## Warning: chronos backend uses nested calls to `waitFor` which
## is not supported by chronos - it is not recommended to use it until
## this has been resolved.
##
## Ideally when it is solved, we should use fsMultiSync or find a way to use async
## file I/O with chronos, see https://github.com/status-im/nim-chronos/issues/501.
try:
let inputStreamHandle = filepath.fileInput()
let inputStream = inputStreamHandle.implicitDeref
var buf = newSeq[byte](chunkSize)
while inputStream.readable:
let read = inputStream.readIntoEx(buf)
if read == 0:
break
await stream.pushData(buf[0 ..< read])
# let byt = inputStream.read
# await stream.pushData(@[byt])
return ok()
except IOError, OSError, LPStreamError:
let e = getCurrentException()
return err("Failed to stream the file: " & $e.msg)
proc file(
codex: ptr CodexServer, sessionId: cstring, onProgress: OnProgressHandler
): Future[Result[string, string]] {.async: (raises: []).} =
## Starts the file upload for the session identified by sessionId.
## Will call finalize when done and return the CID of the uploaded file.
##
## The onProgress callback is called with the number of bytes
## to report the progress of the upload.
if not uploadSessions.contains($sessionId):
return err("Failed to upload the file, invalid session ID: " & $sessionId)
var session: UploadSession
try:
uploadSessions[$sessionId].onProgress = onProgress
session = uploadSessions[$sessionId]
let res = await streamFile(session.filepath, session.stream, session.chunkSize)
if res.isErr:
return err("Failed to upload the file: " & res.error)
return await codex.finalize(sessionId)
except KeyError:
return err("Failed to upload the file, the session is not found: " & $sessionId)
except LPStreamError, IOError:
let e = getCurrentException()
return err("Failed to upload the file: " & $e.msg)
except CancelledError:
return err("Failed to upload the file, the operation is cancelled.")
except CatchableError as e:
return err("Failed to upload the file: " & $e.msg)
finally:
if uploadSessions.contains($sessionId):
uploadSessions.del($sessionId)
if session.fut != nil and not session.fut.finished():
session.fut.cancelSoon()
proc process*(
self: ptr NodeUploadRequest,
codex: ptr CodexServer,
onUploadProgress: OnProgressHandler = nil,
): Future[Result[string, string]] {.async: (raises: []).} =
defer:
destroyShared(self)
case self.operation
of NodeUploadMsgType.INIT:
let res = (await init(codex, self.filepath, self.chunkSize))
if res.isErr:
error "Failed to INIT.", error = res.error
return err($res.error)
return res
of NodeUploadMsgType.CHUNK:
let res = (await chunk(codex, self.sessionId, self.chunk))
if res.isErr:
error "Failed to CHUNK.", error = res.error
return err($res.error)
return res
of NodeUploadMsgType.FINALIZE:
let res = (await finalize(codex, self.sessionId))
if res.isErr:
error "Failed to FINALIZE.", error = res.error
return err($res.error)
return res
of NodeUploadMsgType.CANCEL:
let res = (await cancel(codex, self.sessionId))
if res.isErr:
error "Failed to CANCEL.", error = res.error
return err($res.error)
return res
of NodeUploadMsgType.FILE:
let res = (await file(codex, self.sessionId, onUploadProgress))
if res.isErr:
error "Failed to FILE.", error = res.error
return err($res.error)
return res
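
A possible chunked upload from C: the session ID comes back through the INIT callback, so real code captures it there and waits for each callback before the next call; the global buffer below is only a sketch of that capture:

```c
#include <string.h>
#include "libcodex.h"

// Illustrative: capture the message reported with RET_OK (session id, then CID).
static char last_msg[256];
static void capture(int ret, const char* msg, size_t len, void* userData) {
  if (ret == RET_OK && msg != NULL && len < sizeof(last_msg)) {
    memcpy(last_msg, msg, len);
    last_msg[len] = '\0';
  }
}

static void upload_buffer(void* ctx, const uint8_t* data, size_t size) {
  // 1. INIT: "report.bin" is only used to derive filename/mimetype metadata;
  //    chunkSize 0 keeps the default block size. The callback returns the session id.
  codex_upload_init(ctx, "report.bin", 0, capture, NULL);
  // ... wait for the INIT callback ...
  char session[256];
  strcpy(session, last_msg);

  // 2. CHUNK: push the data (repeat for large payloads).
  codex_upload_chunk(ctx, session, data, size, capture, NULL);

  // 3. FINALIZE: closes the stream; the callback reports the dataset CID.
  codex_upload_finalize(ctx, session, capture, NULL);
  // codex_upload_cancel(ctx, session, capture, NULL); // abort instead, if needed
}
```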

View File

@ -0,0 +1,14 @@
# JSON Event definition
#
# This file defines the JsonEvent type, which serves as the base
# for all event types in the library
#
# Reference specification:
# https://github.com/vacp2p/rfc/blob/master/content/docs/rfcs/36/README.md#jsonsignal-type
type JsonEvent* = ref object of RootObj
eventType* {.requiresInit.}: string
method `$`*(jsonEvent: JsonEvent): string {.base.} =
discard
# All events should implement this
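
Concrete events are expected to extend this base type and reach the client as JSON strings through the callback registered with `codex_set_event_callback`. A small C sketch, assuming `ctx` comes from `codex_new`:

```c
#include <stdio.h>
#include "libcodex.h"

// Receives every global event; the payload depends on the JsonEvent subtype.
static void on_codex_event(int ret, const char* msg, size_t len, void* userData) {
  printf("event (%d): %.*s\n", ret, (int)len, msg ? msg : "");
}

static void register_events(void* ctx) {
  codex_set_event_callback(ctx, on_codex_event, NULL);
}
```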

62
library/ffi_types.nim Normal file
View File

@ -0,0 +1,62 @@
# FFI Types and Utilities
#
# This file defines the core types and utilities for the library's foreign
# function interface (FFI), enabling interoperability with external code.
################################################################################
### Exported types
import results
type CodexCallback* = proc(
callerRet: cint, msg: ptr cchar, len: csize_t, userData: pointer
) {.cdecl, gcsafe, raises: [].}
const RET_OK*: cint = 0
const RET_ERR*: cint = 1
const RET_MISSING_CALLBACK*: cint = 2
const RET_PROGRESS*: cint = 3
## Returns RET_OK as acknowledgment and calls the callback
## with the RET_OK code and the provided message.
proc success*(callback: CodexCallback, msg: string, userData: pointer): cint =
callback(RET_OK, cast[ptr cchar](msg), cast[csize_t](len(msg)), userData)
return RET_OK
## Returns RET_ERR as acknowledgment and calls the callback
## with the RET_ERR code and the provided message.
proc error*(callback: CodexCallback, msg: string, userData: pointer): cint =
let msg = "libcodex error: " & msg
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
return RET_ERR
## Returns RET_OK as acknowledgment if the result is ok.
## If not, returns RET_ERR and calls the callback with the error message.
proc okOrError*[T](
callback: CodexCallback, res: Result[T, string], userData: pointer
): cint =
if res.isOk:
return RET_OK
return callback.error($res.error, userData)
### End of exported types
################################################################################
################################################################################
### FFI utils
template foreignThreadGc*(body: untyped) =
when declared(setupForeignThreadGc):
setupForeignThreadGc()
body
when declared(tearDownForeignThreadGc):
tearDownForeignThreadGc()
type onDone* = proc()
### End of FFI utils
################################################################################

206
library/libcodex.h Normal file
View File

@ -0,0 +1,206 @@
/**
 * libcodex.h - C interface for the Codex library
*
* This header provides the public API for libcodex
*
* To see the auto-generated header by Nim, run `make libcodex` from the
* repository root. The generated file will be created at:
* nimcache/release/libcodex/libcodex.h
*/
#ifndef __libcodex__
#define __libcodex__
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
// The possible returned values for the functions that return int
#define RET_OK 0
#define RET_ERR 1
#define RET_MISSING_CALLBACK 2
#define RET_PROGRESS 3
#ifdef __cplusplus
extern "C" {
#endif
typedef void (*CodexCallback) (int callerRet, const char* msg, size_t len, void* userData);
void* codex_new(
const char* configJson,
CodexCallback callback,
void* userData);
int codex_version(
void* ctx,
CodexCallback callback,
void* userData);
int codex_revision(
void* ctx,
CodexCallback callback,
void* userData);
int codex_repo(
void* ctx,
CodexCallback callback,
void* userData);
int codex_debug(
void* ctx,
CodexCallback callback,
void* userData);
int codex_spr(
void* ctx,
CodexCallback callback,
void* userData);
int codex_peer_id(
void* ctx,
CodexCallback callback,
void* userData);
int codex_log_level(
void* ctx,
const char* logLevel,
CodexCallback callback,
void* userData);
int codex_connect(
void* ctx,
const char* peerId,
const char** peerAddresses,
size_t peerAddressesSize,
CodexCallback callback,
void* userData);
int codex_peer_debug(
void* ctx,
const char* peerId,
CodexCallback callback,
void* userData);
int codex_upload_init(
void* ctx,
const char* filepath,
size_t chunkSize,
CodexCallback callback,
void* userData);
int codex_upload_chunk(
void* ctx,
const char* sessionId,
const uint8_t* chunk,
size_t len,
CodexCallback callback,
void* userData);
int codex_upload_finalize(
void* ctx,
const char* sessionId,
CodexCallback callback,
void* userData);
int codex_upload_cancel(
void* ctx,
const char* sessionId,
CodexCallback callback,
void* userData);
int codex_upload_file(
void* ctx,
const char* sessionId,
CodexCallback callback,
void* userData);
int codex_download_stream(
void* ctx,
const char* cid,
size_t chunkSize,
bool local,
const char* filepath,
CodexCallback callback,
void* userData);
int codex_download_init(
void* ctx,
const char* cid,
size_t chunkSize,
bool local,
CodexCallback callback,
void* userData);
int codex_download_chunk(
void* ctx,
const char* cid,
CodexCallback callback,
void* userData);
int codex_download_cancel(
void* ctx,
const char* cid,
CodexCallback callback,
void* userData);
int codex_download_manifest(
void* ctx,
const char* cid,
CodexCallback callback,
void* userData);
int codex_storage_list(
void* ctx,
CodexCallback callback,
void* userData);
int codex_storage_space(
void* ctx,
CodexCallback callback,
void* userData);
int codex_storage_delete(
void* ctx,
const char* cid,
CodexCallback callback,
void* userData);
int codex_storage_fetch(
void* ctx,
const char* cid,
CodexCallback callback,
void* userData);
int codex_storage_exists(
void* ctx,
const char* cid,
CodexCallback callback,
void* userData);
int codex_start(void* ctx,
CodexCallback callback,
void* userData);
int codex_stop(void* ctx,
CodexCallback callback,
void* userData);
int codex_close(void* ctx,
CodexCallback callback,
void* userData);
// Destroys an instance of a codex node created with codex_new
int codex_destroy(void* ctx,
CodexCallback callback,
void* userData);
void codex_set_event_callback(void* ctx,
CodexCallback callback,
void* userData);
#ifdef __cplusplus
}
#endif
#endif /* __libcodex__ */
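
A minimal consumer of this header might look as follows; the include and link flags in the comment assume the library was built into `build/` as described in the README and are illustrative:

```c
// Build sketch:  cc main.c -Ilibrary -Lbuild -lcodex -o codex-demo
#include <stdio.h>
#include "libcodex.h"

static void print_result(int ret, const char* msg, size_t len, void* userData) {
  printf("[%d] %.*s\n", ret, (int)len, msg ? msg : "");
}

int main(void) {
  void* ctx = codex_new("", print_result, NULL);
  if (ctx == NULL) return 1;
  codex_version(ctx, print_result, NULL);  // invokes the callback with the version
  codex_revision(ctx, print_result, NULL); // and the git revision
  codex_destroy(ctx, print_result, NULL);
  return 0;
}
```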

565
library/libcodex.nim Normal file
View File

@ -0,0 +1,565 @@
# libcodex.nim - C-exported interface for the Codex shared library
#
# This file implements the public C API for libcodex.
# It acts as the bridge between C programs and the internal Nim implementation.
#
# This file defines:
# - Initialization logic for the Nim runtime (once per process)
# - Thread-safe exported procs callable from C
# - Callback registration and invocation for asynchronous communication
# cdecl is the C declaration calling convention.
# It's the standard way C compilers expect functions to behave:
# 1- The caller cleans up the stack after the call
# 2- Symbol names are exported in a predictable way
# In other terms, it is the glue that makes Nim functions callable as normal C functions.
{.pragma: exported, exportc, cdecl, raises: [].}
{.pragma: callback, cdecl, raises: [], gcsafe.}
# Ensure code is position-independent so it can be built into a shared library (.so).
# In other terms, code that can run no matter where it is placed in memory.
{.passc: "-fPIC".}
when defined(linux):
# Define the canonical name for this library
{.passl: "-Wl,-soname,libcodex.so".}
import std/[atomics]
import chronicles
import chronos
import chronos/threadsync
import ./codex_context
import ./codex_thread_requests/codex_thread_request
import ./codex_thread_requests/requests/node_lifecycle_request
import ./codex_thread_requests/requests/node_info_request
import ./codex_thread_requests/requests/node_debug_request
import ./codex_thread_requests/requests/node_p2p_request
import ./codex_thread_requests/requests/node_upload_request
import ./codex_thread_requests/requests/node_download_request
import ./codex_thread_requests/requests/node_storage_request
import ./ffi_types
from ../codex/conf import codexVersion
logScope:
topics = "codexlib"
template checkLibcodexParams*(
ctx: ptr CodexContext, callback: CodexCallback, userData: pointer
) =
if not isNil(ctx):
ctx[].userData = userData
if isNil(callback):
return RET_MISSING_CALLBACK
# From Nim doc:
# "the C targets require you to initialize Nim's internals, which is done calling a NimMain function."
# "The name NimMain can be influenced via the --nimMainPrefix:prefix switch."
# "Use --nimMainPrefix:MyLib and the function to call is named MyLibNimMain."
proc libcodexNimMain() {.importc.}
# Atomic flag to prevent multiple initializations
var initialized: Atomic[bool]
if defined(android):
# Redirect chronicles to Android System logs
when compiles(defaultChroniclesStream.outputs[0].writer):
defaultChroniclesStream.outputs[0].writer = proc(
logLevel: LogLevel, msg: LogOutputStr
) {.raises: [].} =
echo logLevel, msg
# Initializes the Nim runtime and foreign-thread GC
proc initializeLibrary() {.exported.} =
if not initialized.exchange(true):
## Every Nim library must call `<prefix>NimMain()` once
libcodexNimMain()
when declared(setupForeignThreadGc):
setupForeignThreadGc()
when declared(nimGC_setStackBottom):
var locals {.volatile, noinit.}: pointer
locals = addr(locals)
nimGC_setStackBottom(locals)
proc codex_new(
configJson: cstring, callback: CodexCallback, userData: pointer
): pointer {.dynlib, exported.} =
initializeLibrary()
if isNil(callback):
error "Failed to create codex instance: the callback is missing."
return nil
var ctx = codex_context.createCodexContext().valueOr:
let msg = $error
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
return nil
ctx.userData = userData
let reqContent =
NodeLifecycleRequest.createShared(NodeLifecycleMsgType.CREATE_NODE, configJson)
codex_context.sendRequestToCodexThread(
ctx, RequestType.LIFECYCLE, reqContent, callback, userData
).isOkOr:
let msg = $error
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
return nil
return ctx
proc codex_version(
ctx: ptr CodexContext, callback: CodexCallback, userData: pointer
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibcodexParams(ctx, callback, userData)
callback(
RET_OK,
cast[ptr cchar](conf.codexVersion),
cast[csize_t](len(conf.codexVersion)),
userData,
)
return RET_OK
proc codex_revision(
ctx: ptr CodexContext, callback: CodexCallback, userData: pointer
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibcodexParams(ctx, callback, userData)
callback(
RET_OK,
cast[ptr cchar](conf.codexRevision),
cast[csize_t](len(conf.codexRevision)),
userData,
)
return RET_OK
proc codex_repo(
ctx: ptr CodexContext, callback: CodexCallback, userData: pointer
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibcodexParams(ctx, callback, userData)
let reqContent = NodeInfoRequest.createShared(NodeInfoMsgType.REPO)
let res = codex_context.sendRequestToCodexThread(
ctx, RequestType.INFO, reqContent, callback, userData
)
return callback.okOrError(res, userData)
proc codex_debug(
ctx: ptr CodexContext, callback: CodexCallback, userData: pointer
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibcodexParams(ctx, callback, userData)
let reqContent = NodeDebugRequest.createShared(NodeDebugMsgType.DEBUG)
let res = codex_context.sendRequestToCodexThread(
ctx, RequestType.DEBUG, reqContent, callback, userData
)
return callback.okOrError(res, userData)
proc codex_spr(
ctx: ptr CodexContext, callback: CodexCallback, userData: pointer
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibcodexParams(ctx, callback, userData)
let reqContent = NodeInfoRequest.createShared(NodeInfoMsgType.SPR)
let res = codex_context.sendRequestToCodexThread(
ctx, RequestType.INFO, reqContent, callback, userData
)
return callback.okOrError(res, userData)
proc codex_peer_id(
ctx: ptr CodexContext, callback: CodexCallback, userData: pointer
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibcodexParams(ctx, callback, userData)
let reqContent = NodeInfoRequest.createShared(NodeInfoMsgType.PEERID)
let res = codex_context.sendRequestToCodexThread(
ctx, RequestType.INFO, reqContent, callback, userData
)
return callback.okOrError(res, userData)
## Set the log level of the library at runtime.
## It uses updateLogLevel which is a synchronous proc and
## cannot be used inside an async context because of gcsafe issue.
proc codex_log_level(
ctx: ptr CodexContext, logLevel: cstring, callback: CodexCallback, userData: pointer
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibcodexParams(ctx, callback, userData)
let reqContent =
NodeDebugRequest.createShared(NodeDebugMsgType.LOG_LEVEL, logLevel = logLevel)
let res = codex_context.sendRequestToCodexThread(
ctx, RequestType.DEBUG, reqContent, callback, userData
)
return callback.okOrError(res, userData)
proc codex_connect(
ctx: ptr CodexContext,
peerId: cstring,
peerAddressesPtr: ptr cstring,
peerAddressesLength: csize_t,
callback: CodexCallback,
userData: pointer,
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibcodexParams(ctx, callback, userData)
var peerAddresses = newSeq[cstring](peerAddressesLength)
let peers = cast[ptr UncheckedArray[cstring]](peerAddressesPtr)
for i in 0 ..< peerAddressesLength:
peerAddresses[i] = peers[i]
let reqContent = NodeP2PRequest.createShared(
NodeP2PMsgType.CONNECT, peerId = peerId, peerAddresses = peerAddresses
)
let res = codex_context.sendRequestToCodexThread(
ctx, RequestType.P2P, reqContent, callback, userData
)
return callback.okOrError(res, userData)
proc codex_peer_debug(
ctx: ptr CodexContext, peerId: cstring, callback: CodexCallback, userData: pointer
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibcodexParams(ctx, callback, userData)
let reqContent = NodeDebugRequest.createShared(NodeDebugMsgType.PEER, peerId = peerId)
let res = codex_context.sendRequestToCodexThread(
ctx, RequestType.DEBUG, reqContent, callback, userData
)
return callback.okOrError(res, userData)
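
## Sends a CLOSE_NODE lifecycle request to the Codex working thread.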
proc codex_close(
ctx: ptr CodexContext, callback: CodexCallback, userData: pointer
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibcodexParams(ctx, callback, userData)
let reqContent = NodeLifecycleRequest.createShared(NodeLifecycleMsgType.CLOSE_NODE)
  let res = codex_context.sendRequestToCodexThread(
ctx, RequestType.LIFECYCLE, reqContent, callback, userData
)
if res.isErr:
return callback.error(res.error, userData)
return callback.okOrError(res, userData)
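
## Destroys the Codex context and releases the resources it holds.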
proc codex_destroy(
ctx: ptr CodexContext, callback: CodexCallback, userData: pointer
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibcodexParams(ctx, callback, userData)
let res = codex_context.destroyCodexContext(ctx)
if res.isErr:
return RET_ERR
return RET_OK
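
## Upload API: `codex_upload_init` opens an upload session for `filepath` with
## the given `chunkSize`; data is then pushed with `codex_upload_chunk` (or
## `codex_upload_file`) and the session is completed with
## `codex_upload_finalize` or aborted with `codex_upload_cancel`.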
proc codex_upload_init(
ctx: ptr CodexContext,
filepath: cstring,
chunkSize: csize_t,
callback: CodexCallback,
userData: pointer,
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibcodexParams(ctx, callback, userData)
let reqContent = NodeUploadRequest.createShared(
NodeUploadMsgType.INIT, filepath = filepath, chunkSize = chunkSize
)
let res = codex_context.sendRequestToCodexThread(
ctx, RequestType.UPLOAD, reqContent, callback, userData
)
return callback.okOrError(res, userData)
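
## Pushes `len` bytes from `data` to the upload session `sessionId`.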
proc codex_upload_chunk(
ctx: ptr CodexContext,
sessionId: cstring,
data: ptr byte,
len: csize_t,
callback: CodexCallback,
userData: pointer,
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibcodexParams(ctx, callback, userData)
  # Copy the caller's buffer into a Nim-owned seq; `var` is needed to take the
  # address of chunk[0], and an empty chunk must skip the copy.
  var chunk = newSeq[byte](len)
  if len > 0:
    copyMem(addr chunk[0], data, len)
let reqContent = NodeUploadRequest.createShared(
NodeUploadMsgType.CHUNK, sessionId = sessionId, chunk = chunk
)
let res = codex_context.sendRequestToCodexThread(
ctx, RequestType.UPLOAD, reqContent, callback, userData
)
return callback.okOrError(res, userData)
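
## Finalizes the upload session `sessionId`.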
proc codex_upload_finalize(
ctx: ptr CodexContext,
sessionId: cstring,
callback: CodexCallback,
userData: pointer,
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibcodexParams(ctx, callback, userData)
let reqContent =
NodeUploadRequest.createShared(NodeUploadMsgType.FINALIZE, sessionId = sessionId)
let res = codex_context.sendRequestToCodexThread(
ctx, RequestType.UPLOAD, reqContent, callback, userData
)
return callback.okOrError(res, userData)
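
## Cancels the upload session `sessionId`.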
proc codex_upload_cancel(
ctx: ptr CodexContext,
sessionId: cstring,
callback: CodexCallback,
userData: pointer,
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibcodexParams(ctx, callback, userData)
let reqContent =
NodeUploadRequest.createShared(NodeUploadMsgType.CANCEL, sessionId = sessionId)
let res = codex_context.sendRequestToCodexThread(
ctx, RequestType.UPLOAD, reqContent, callback, userData
)
return callback.okOrError(res, userData)
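
## Uploads the file associated with the session `sessionId` in a single request.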
proc codex_upload_file(
ctx: ptr CodexContext,
sessionId: cstring,
callback: CodexCallback,
userData: pointer,
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibcodexParams(ctx, callback, userData)
let reqContent =
NodeUploadRequest.createShared(NodeUploadMsgType.FILE, sessionId = sessionId)
let res = codex_context.sendRequestToCodexThread(
ctx, RequestType.UPLOAD, reqContent, callback, userData
)
return callback.okOrError(res, userData)
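
## Download API: requests are keyed by the content `cid`. `codex_download_init`
## starts a download of `cid` with the given `chunkSize`; the `local` flag
## selects a local-only download.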
proc codex_download_init(
ctx: ptr CodexContext,
cid: cstring,
chunkSize: csize_t,
local: bool,
callback: CodexCallback,
userData: pointer,
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibcodexParams(ctx, callback, userData)
let req = NodeDownloadRequest.createShared(
NodeDownloadMsgType.INIT, cid = cid, chunkSize = chunkSize, local = local
)
let res = codex_context.sendRequestToCodexThread(
ctx, RequestType.DOWNLOAD, req, callback, userData
)
return callback.okOrError(res, userData)
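
## Requests the next chunk of the download of `cid`.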
proc codex_download_chunk(
ctx: ptr CodexContext, cid: cstring, callback: CodexCallback, userData: pointer
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibcodexParams(ctx, callback, userData)
let req = NodeDownloadRequest.createShared(NodeDownloadMsgType.CHUNK, cid = cid)
let res = codex_context.sendRequestToCodexThread(
ctx, RequestType.DOWNLOAD, req, callback, userData
)
return callback.okOrError(res, userData)
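
## Streams the content of `cid` in chunks of `chunkSize`, writing it to `filepath`.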
proc codex_download_stream(
ctx: ptr CodexContext,
cid: cstring,
chunkSize: csize_t,
local: bool,
filepath: cstring,
callback: CodexCallback,
userData: pointer,
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibcodexParams(ctx, callback, userData)
let req = NodeDownloadRequest.createShared(
NodeDownloadMsgType.STREAM,
cid = cid,
chunkSize = chunkSize,
local = local,
filepath = filepath,
)
let res = codex_context.sendRequestToCodexThread(
ctx, RequestType.DOWNLOAD, req, callback, userData
)
return callback.okOrError(res, userData)
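
## Cancels the download of `cid`.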
proc codex_download_cancel(
ctx: ptr CodexContext, cid: cstring, callback: CodexCallback, userData: pointer
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibcodexParams(ctx, callback, userData)
let req = NodeDownloadRequest.createShared(NodeDownloadMsgType.CANCEL, cid = cid)
let res = codex_context.sendRequestToCodexThread(
ctx, RequestType.DOWNLOAD, req, callback, userData
)
return callback.okOrError(res, userData)
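
## Retrieves the manifest of `cid`.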
proc codex_download_manifest(
ctx: ptr CodexContext, cid: cstring, callback: CodexCallback, userData: pointer
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibcodexParams(ctx, callback, userData)
let req = NodeDownloadRequest.createShared(NodeDownloadMsgType.MANIFEST, cid = cid)
let res = codex_context.sendRequestToCodexThread(
ctx, RequestType.DOWNLOAD, req, callback, userData
)
return callback.okOrError(res, userData)
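
## Sends a STORAGE/LIST request for the locally stored content.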
proc codex_storage_list(
ctx: ptr CodexContext, callback: CodexCallback, userData: pointer
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibcodexParams(ctx, callback, userData)
let req = NodeStorageRequest.createShared(NodeStorageMsgType.LIST)
let res = codex_context.sendRequestToCodexThread(
ctx, RequestType.STORAGE, req, callback, userData
)
return callback.okOrError(res, userData)
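
## Sends a STORAGE/SPACE request for the repository's space usage.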
proc codex_storage_space(
ctx: ptr CodexContext, callback: CodexCallback, userData: pointer
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibcodexParams(ctx, callback, userData)
let req = NodeStorageRequest.createShared(NodeStorageMsgType.SPACE)
let res = codex_context.sendRequestToCodexThread(
ctx, RequestType.STORAGE, req, callback, userData
)
return callback.okOrError(res, userData)
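
## Deletes `cid` from the local store.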
proc codex_storage_delete(
ctx: ptr CodexContext, cid: cstring, callback: CodexCallback, userData: pointer
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibcodexParams(ctx, callback, userData)
let req = NodeStorageRequest.createShared(NodeStorageMsgType.DELETE, cid = cid)
let res = codex_context.sendRequestToCodexThread(
ctx, RequestType.STORAGE, req, callback, userData
)
return callback.okOrError(res, userData)
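
## Fetches `cid` into the local store.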
proc codex_storage_fetch(
ctx: ptr CodexContext, cid: cstring, callback: CodexCallback, userData: pointer
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibcodexParams(ctx, callback, userData)
let req = NodeStorageRequest.createShared(NodeStorageMsgType.FETCH, cid = cid)
let res = codex_context.sendRequestToCodexThread(
ctx, RequestType.STORAGE, req, callback, userData
)
return callback.okOrError(res, userData)
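
## Checks whether `cid` exists in the local store.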
proc codex_storage_exists(
ctx: ptr CodexContext, cid: cstring, callback: CodexCallback, userData: pointer
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibcodexParams(ctx, callback, userData)
let req = NodeStorageRequest.createShared(NodeStorageMsgType.EXISTS, cid = cid)
let res = codex_context.sendRequestToCodexThread(
ctx, RequestType.STORAGE, req, callback, userData
)
return callback.okOrError(res, userData)
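
## Sends a START_NODE lifecycle request to the Codex working thread.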
proc codex_start(
ctx: ptr CodexContext, callback: CodexCallback, userData: pointer
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibcodexParams(ctx, callback, userData)
let reqContent: ptr NodeLifecycleRequest =
NodeLifecycleRequest.createShared(NodeLifecycleMsgType.START_NODE)
let res = codex_context.sendRequestToCodexThread(
ctx, RequestType.LIFECYCLE, reqContent, callback, userData
)
return callback.okOrError(res, userData)
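
## Sends a STOP_NODE lifecycle request to the Codex working thread.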
proc codex_stop(
ctx: ptr CodexContext, callback: CodexCallback, userData: pointer
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibcodexParams(ctx, callback, userData)
let reqContent: ptr NodeLifecycleRequest =
NodeLifecycleRequest.createShared(NodeLifecycleMsgType.STOP_NODE)
let res = codex_context.sendRequestToCodexThread(
ctx, RequestType.LIFECYCLE, reqContent, callback, userData
)
return callback.okOrError(res, userData)
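
## Stores `callback` and `userData` on the context so node events can be
## delivered to them.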
proc codex_set_event_callback(
ctx: ptr CodexContext, callback: CodexCallback, userData: pointer
) {.dynlib, exportc.} =
initializeLibrary()
ctx[].eventCallback = cast[pointer](callback)
ctx[].eventUserData = userData

View File

@@ -33,11 +33,18 @@ proc advance*(clock: MockClock, seconds: int64) =
 method now*(clock: MockClock): SecondsSince1970 =
   clock.time
 
-method waitUntil*(clock: MockClock, time: SecondsSince1970) {.async.} =
-  if time > clock.now():
-    let future = newFuture[void]()
-    clock.waiting.add(Waiting(until: time, future: future))
-    await future
+method waitUntil*(
+    clock: MockClock, time: SecondsSince1970
+) {.async: (raises: [CancelledError]).} =
+  try:
+    if time > clock.now():
+      let future = newFuture[void]()
+      clock.waiting.add(Waiting(until: time, future: future))
+      await future
+  except CancelledError as e:
+    raise e
+  except Exception as e:
+    discard
 
 proc isWaiting*(clock: MockClock): bool =
   clock.waiting.len > 0

View File

@@ -51,7 +51,8 @@ proc ethAccount*(node: CodexProcess): Address =
 
 proc apiUrl*(node: CodexProcess): string =
   let config = CodexConf.load(cmdLine = node.arguments, quitOnFailure = false)
-  return "http://" & config.apiBindAddress & ":" & $config.apiPort & "/api/codex/v1"
+  return
+    "http://" & config.apiBindAddress.get() & ":" & $config.apiPort & "/api/codex/v1"
 
 proc client*(node: CodexProcess): CodexClient =
   if client =? node.client: