From bd360322515addd487c8f97fc4447bf25eda6c99 Mon Sep 17 00:00:00 2001 From: Arnaud Date: Thu, 13 Nov 2025 11:34:09 +0400 Subject: [PATCH] feat: add c binding (#1322) Co-authored-by: Eric <5089238+emizzle@users.noreply.github.com> --- Makefile | 29 + README.md | 50 + build.nims | 44 + codex.nim | 12 +- codex/clock.nim | 6 +- codex/codex.nim | 61 +- codex/conf.nim | 122 ++- codex/contracts/clock.nim | 6 +- codex/discovery.nim | 6 + codex/node.nim | 6 + codex/stores/repostore/store.nim | 2 - codex/validation.nim | 10 +- examples/golang/README.md | 24 + examples/golang/codex.go | 885 ++++++++++++++++++ examples/golang/hello.txt | 1 + library/README.md | 37 + library/alloc.nim | 42 + library/codex_context.nim | 225 +++++ .../codex_thread_request.nim | 126 +++ .../requests/node_debug_request.nim | 126 +++ .../requests/node_download_request.nim | 336 +++++++ .../requests/node_info_request.nim | 76 ++ .../requests/node_lifecycle_request.nim | 188 ++++ .../requests/node_p2p_request.nim | 95 ++ .../requests/node_storage_request.nim | 180 ++++ .../requests/node_upload_request.nim | 372 ++++++++ library/events/json_base_event.nim | 14 + library/ffi_types.nim | 62 ++ library/libcodex.h | 206 ++++ library/libcodex.nim | 565 +++++++++++ tests/codex/helpers/mockclock.nim | 17 +- tests/integration/codexprocess.nim | 3 +- 32 files changed, 3862 insertions(+), 72 deletions(-) create mode 100644 examples/golang/README.md create mode 100644 examples/golang/codex.go create mode 100644 examples/golang/hello.txt create mode 100644 library/README.md create mode 100644 library/alloc.nim create mode 100644 library/codex_context.nim create mode 100644 library/codex_thread_requests/codex_thread_request.nim create mode 100644 library/codex_thread_requests/requests/node_debug_request.nim create mode 100644 library/codex_thread_requests/requests/node_download_request.nim create mode 100644 library/codex_thread_requests/requests/node_info_request.nim create mode 100644 library/codex_thread_requests/requests/node_lifecycle_request.nim create mode 100644 library/codex_thread_requests/requests/node_p2p_request.nim create mode 100644 library/codex_thread_requests/requests/node_storage_request.nim create mode 100644 library/codex_thread_requests/requests/node_upload_request.nim create mode 100644 library/events/json_base_event.nim create mode 100644 library/ffi_types.nim create mode 100644 library/libcodex.h create mode 100644 library/libcodex.nim diff --git a/Makefile b/Makefile index f39a3394..4ec12fb5 100644 --- a/Makefile +++ b/Makefile @@ -232,6 +232,7 @@ format: $(NPH) *.nim $(NPH) codex/ $(NPH) tests/ + $(NPH) library/ clean-nph: rm -f $(NPH) @@ -242,4 +243,32 @@ print-nph-path: clean: | clean-nph +################ +## C Bindings ## +################ +.PHONY: libcodex + +STATIC ?= 0 + +ifneq ($(strip $(CODEX_LIB_PARAMS)),) +NIM_PARAMS := $(NIM_PARAMS) $(CODEX_LIB_PARAMS) +endif + +libcodex: + $(MAKE) deps + rm -f build/libcodex* + +ifeq ($(STATIC), 1) + echo -e $(BUILD_MSG) "build/$@.a" && \ + $(ENV_SCRIPT) nim libcodexStatic $(NIM_PARAMS) -d:LeopardCmakeFlags="\"-DCMAKE_POSITION_INDEPENDENT_CODE=ON -DCMAKE_BUILD_TYPE=Release\"" codex.nims +else ifeq ($(detected_OS),Windows) + echo -e $(BUILD_MSG) "build/$@.dll" && \ + $(ENV_SCRIPT) nim libcodexDynamic $(NIM_PARAMS) -d:LeopardCmakeFlags="\"-G \\\"MSYS Makefiles\\\" -DCMAKE_BUILD_TYPE=Release\"" codex.nims +else ifeq ($(detected_OS),macOS) + echo -e $(BUILD_MSG) "build/$@.dylib" && \ + $(ENV_SCRIPT) nim libcodexDynamic $(NIM_PARAMS) -d:LeopardCmakeFlags="\"-DCMAKE_POSITION_INDEPENDENT_CODE=ON -DCMAKE_BUILD_TYPE=Release\"" codex.nims +else + echo -e $(BUILD_MSG) "build/$@.so" && \ + $(ENV_SCRIPT) nim libcodexDynamic $(NIM_PARAMS) -d:LeopardCmakeFlags="\"-DCMAKE_POSITION_INDEPENDENT_CODE=ON -DCMAKE_BUILD_TYPE=Release\"" codex.nims +endif endif # "variables.mk" was not included diff --git a/README.md b/README.md index 2a15051f..78b4e6f5 100644 --- a/README.md +++ b/README.md @@ -53,6 +53,56 @@ To get acquainted with Codex, consider: The client exposes a REST API that can be used to interact with the clients. Overview of the API can be found on [api.codex.storage](https://api.codex.storage). +## Bindings + +Codex provides a C API that can be wrapped by other languages. The bindings is located in the `library` folder. +Currently, only a Go binding is included. + +### Build the C library + +```bash +make libcodex +``` + +This produces the shared library under `build/`. + +### Run the Go example + +Build the Go example: + +```bash +go build -o codex-go examples/golang/codex.go +``` + +Export the library path: + +```bash +export LD_LIBRARY_PATH=build +``` + +Run the example: + +```bash +./codex-go +``` + +### Static vs Dynamic build + +By default, Codex builds a dynamic library (`libcodex.so`), which you can load at runtime. +If you prefer a static library (`libcodex.a`), set the `STATIC` flag: + +```bash +# Build dynamic (default) +make libcodex + +# Build static +make STATIC=1 libcodex +``` + +### Limitation + +Callbacks must be fast and non-blocking; otherwise, the working thread will hang and prevent other requests from being processed. + ## Contributing and development Feel free to dive in, contributions are welcomed! Open an issue or submit PRs. diff --git a/build.nims b/build.nims index 88660321..dcfe5c13 100644 --- a/build.nims +++ b/build.nims @@ -25,6 +25,30 @@ proc buildBinary(name: string, srcDir = "./", params = "", lang = "c") = exec(cmd) +proc buildLibrary(name: string, srcDir = "./", params = "", `type` = "dynamic") = + if not dirExists "build": + mkDir "build" + + if `type` == "dynamic": + let lib_name = ( + when defined(windows): name & ".dll" + elif defined(macosx): name & ".dylib" + else: name & ".so" + ) + exec "nim c" & " --out:build/" & lib_name & + " --threads:on --app:lib --opt:size --noMain --mm:refc --header --d:metrics " & + "--nimMainPrefix:libcodex -d:noSignalHandler " & + "-d:LeopardExtraCompilerFlags=-fPIC " & "-d:chronicles_runtime_filtering " & + "-d:chronicles_log_level=TRACE " & params & " " & srcDir & name & ".nim" + else: + exec "nim c" & " --out:build/" & name & + ".a --threads:on --app:staticlib --opt:size --noMain --mm:refc --header --d:metrics " & + "--nimMainPrefix:libcodex -d:noSignalHandler " & + "-d:LeopardExtraCompilerFlags=-fPIC " & + "-d:chronicles_runtime_filtering " & + "-d:chronicles_log_level=TRACE " & + params & " " & srcDir & name & ".nim" + proc test(name: string, srcDir = "tests/", params = "", lang = "c") = buildBinary name, srcDir, params exec "build/" & name @@ -121,3 +145,23 @@ task showCoverage, "open coverage html": echo " ======== Opening HTML coverage report in browser... ======== " if findExe("open") != "": exec("open coverage/report/index.html") + +task libcodexDynamic, "Generate bindings": + var params = "" + when compiles(commandLineParams): + for param in commandLineParams(): + if param.len > 0 and param.startsWith("-"): + params.add " " & param + + let name = "libcodex" + buildLibrary name, "library/", params, "dynamic" + +task libcodexStatic, "Generate bindings": + var params = "" + when compiles(commandLineParams): + for param in commandLineParams(): + if param.len > 0 and param.startsWith("-"): + params.add " " & param + + let name = "libcodex" + buildLibrary name, "library/", params, "static" diff --git a/codex.nim b/codex.nim index 7749bdee..b3e40608 100644 --- a/codex.nim +++ b/codex.nim @@ -54,6 +54,16 @@ when isMainModule: , ) config.setupLogging() + + try: + updateLogLevel(config.logLevel) + except ValueError as err: + try: + stderr.write "Invalid value for --log-level. " & err.msg & "\n" + except IOError: + echo "Invalid value for --log-level. " & err.msg + quit QuitFailure + config.setupMetrics() if not (checkAndCreateDataDir((config.dataDir).string)): @@ -94,7 +104,7 @@ when isMainModule: ## Ctrl+C handling proc doShutdown() = - shutdown = server.stop() + shutdown = server.shutdown() state = CodexStatus.Stopping notice "Stopping Codex" diff --git a/codex/clock.nim b/codex/clock.nim index c02e04aa..5c63d15e 100644 --- a/codex/clock.nim +++ b/codex/clock.nim @@ -1,3 +1,5 @@ +{.push raises: [].} + import pkg/chronos import pkg/stew/endians2 import pkg/upraises @@ -11,7 +13,9 @@ type method now*(clock: Clock): SecondsSince1970 {.base, gcsafe, upraises: [].} = raiseAssert "not implemented" -method waitUntil*(clock: Clock, time: SecondsSince1970) {.base, async.} = +method waitUntil*( + clock: Clock, time: SecondsSince1970 +) {.base, async: (raises: [CancelledError]).} = raiseAssert "not implemented" method start*(clock: Clock) {.base, async.} = diff --git a/codex/codex.nim b/codex/codex.nim index 4cb6cfb8..682fe9b5 100644 --- a/codex/codex.nim +++ b/codex/codex.nim @@ -57,10 +57,20 @@ type repoStore: RepoStore maintenance: BlockMaintainer taskpool: Taskpool + isStarted: bool CodexPrivateKey* = libp2p.PrivateKey # alias EthWallet = ethers.Wallet +func config*(self: CodexServer): CodexConf = + return self.config + +func node*(self: CodexServer): CodexNodeRef = + return self.codexNode + +func repoStore*(self: CodexServer): RepoStore = + return self.repoStore + proc waitForSync(provider: Provider): Future[void] {.async.} = var sleepTime = 1 trace "Checking sync state of Ethereum provider..." @@ -159,9 +169,13 @@ proc bootstrapInteractions(s: CodexServer): Future[void] {.async.} = s.codexNode.contracts = (client, host, validator) proc start*(s: CodexServer) {.async.} = - trace "Starting codex node", config = $s.config + if s.isStarted: + warn "Codex server already started, skipping" + return + trace "Starting codex node", config = $s.config await s.repoStore.start() + s.maintenance.start() await s.codexNode.switch.start() @@ -175,27 +189,55 @@ proc start*(s: CodexServer) {.async.} = await s.bootstrapInteractions() await s.codexNode.start() - s.restServer.start() + + if s.restServer != nil: + s.restServer.start() + + s.isStarted = true proc stop*(s: CodexServer) {.async.} = + if not s.isStarted: + warn "Codex is not started" + return + notice "Stopping codex node" - let res = await noCancel allFinishedFailed[void]( + var futures = @[ - s.restServer.stop(), s.codexNode.switch.stop(), s.codexNode.stop(), s.repoStore.stop(), s.maintenance.stop(), ] - ) + + if s.restServer != nil: + futures.add(s.restServer.stop()) + + let res = await noCancel allFinishedFailed[void](futures) if res.failure.len > 0: error "Failed to stop codex node", failures = res.failure.len raiseAssert "Failed to stop codex node" +proc close*(s: CodexServer) {.async.} = + var futures = @[s.codexNode.close(), s.repoStore.close()] + + let res = await noCancel allFinishedFailed[void](futures) + if not s.taskpool.isNil: - s.taskpool.shutdown() + try: + s.taskpool.shutdown() + except Exception as exc: + error "Failed to stop the taskpool", failures = res.failure.len + raiseAssert("Failure in taskpool shutdown:" & exc.msg) + + if res.failure.len > 0: + error "Failed to close codex node", failures = res.failure.len + raiseAssert "Failed to close codex node" + +proc shutdown*(server: CodexServer) {.async.} = + await server.stop() + await server.close() proc new*( T: type CodexServer, config: CodexConf, privateKey: CodexPrivateKey @@ -295,7 +337,7 @@ proc new*( ) peerStore = PeerCtxStore.new() - pendingBlocks = PendingBlocksManager.new() + pendingBlocks = PendingBlocksManager.new(retries = config.blockRetries) advertiser = Advertiser.new(repoStore, discovery) blockDiscovery = DiscoveryEngine.new(repoStore, peerStore, network, discovery, pendingBlocks) @@ -320,10 +362,13 @@ proc new*( taskPool = taskpool, ) + var restServer: RestServerRef = nil + + if config.apiBindAddress.isSome: restServer = RestServerRef .new( codexNode.initRestApi(config, repoStore, config.apiCorsAllowedOrigin), - initTAddress(config.apiBindAddress, config.apiPort), + initTAddress(config.apiBindAddress.get(), config.apiPort), bufferSize = (1024 * 64), maxRequestBodySize = int.high, ) diff --git a/codex/conf.nim b/codex/conf.nim index 77ef96ca..206a3e04 100644 --- a/codex/conf.nim +++ b/codex/conf.nim @@ -34,6 +34,7 @@ import pkg/libp2p import pkg/ethers import pkg/questionable import pkg/questionable/results +import pkg/stew/base64 import ./codextypes import ./discovery @@ -46,13 +47,14 @@ import ./utils/natutils from ./contracts/config import DefaultRequestCacheSize, DefaultMaxPriorityFeePerGas from ./validationconfig import MaxSlots, ValidationGroups +from ./blockexchange/engine/pendingblocks import DefaultBlockRetries export units, net, codextypes, logutils, completeCmdArg, parseCmdArg, NatConfig export ValidationGroups, MaxSlots export DefaultQuotaBytes, DefaultBlockTtl, DefaultBlockInterval, DefaultNumBlocksPerInterval, - DefaultRequestCacheSize, DefaultMaxPriorityFeePerGas + DefaultRequestCacheSize, DefaultMaxPriorityFeePerGas, DefaultBlockRetries type ThreadCount* = distinct Natural @@ -202,8 +204,10 @@ type .}: string apiBindAddress* {. - desc: "The REST API bind address", defaultValue: "127.0.0.1", name: "api-bindaddr" - .}: string + desc: "The REST API bind address", + defaultValue: "127.0.0.1".some, + name: "api-bindaddr" + .}: Option[string] apiPort* {. desc: "The REST Api port", @@ -261,6 +265,13 @@ type name: "block-mn" .}: int + blockRetries* {. + desc: "Number of times to retry fetching a block before giving up", + defaultValue: DefaultBlockRetries, + defaultValueDesc: $DefaultBlockRetries, + name: "block-retries" + .}: int + cacheSize* {. desc: "The size of the block cache, 0 disables the cache - " & @@ -474,7 +485,7 @@ func prover*(self: CodexConf): bool = self.persistence and self.persistenceCmd == PersistenceCmd.prover proc getCodexVersion(): string = - let tag = strip(staticExec("git tag")) + let tag = strip(staticExec("git describe --tags --abbrev=0")) if tag.isEmptyOrWhitespace: return "untagged build" return tag @@ -510,55 +521,73 @@ proc parseCmdArg*( if res.isOk: ma = res.get() else: - warn "Invalid MultiAddress", input = input, error = res.error() + fatal "Invalid MultiAddress", input = input, error = res.error() quit QuitFailure except LPError as exc: - warn "Invalid MultiAddress uri", uri = input, error = exc.msg + fatal "Invalid MultiAddress uri", uri = input, error = exc.msg quit QuitFailure ma -proc parseCmdArg*(T: type ThreadCount, input: string): T {.upraises: [ValueError].} = - let count = parseInt(input) - if count != 0 and count < 2: - warn "Invalid number of threads", input = input - quit QuitFailure - ThreadCount(count) +proc parse*(T: type ThreadCount, p: string): Result[ThreadCount, string] = + try: + let count = parseInt(p) + if count != 0 and count < 2: + return err("Invalid number of threads: " & p) + return ok(ThreadCount(count)) + except ValueError as e: + return err("Invalid number of threads: " & p & ", error=" & e.msg) -proc parseCmdArg*(T: type SignedPeerRecord, uri: string): T = +proc parseCmdArg*(T: type ThreadCount, input: string): T = + let val = ThreadCount.parse(input) + if val.isErr: + fatal "Cannot parse the thread count.", input = input, error = val.error() + quit QuitFailure + return val.get() + +proc parse*(T: type SignedPeerRecord, p: string): Result[SignedPeerRecord, string] = var res: SignedPeerRecord try: - if not res.fromURI(uri): - warn "Invalid SignedPeerRecord uri", uri = uri - quit QuitFailure - except LPError as exc: - warn "Invalid SignedPeerRecord uri", uri = uri, error = exc.msg - quit QuitFailure - except CatchableError as exc: - warn "Invalid SignedPeerRecord uri", uri = uri, error = exc.msg - quit QuitFailure - res + if not res.fromURI(p): + return err("The uri is not a valid SignedPeerRecord: " & p) + return ok(res) + except LPError, Base64Error: + let e = getCurrentException() + return err(e.msg) -func parseCmdArg*(T: type NatConfig, p: string): T {.raises: [ValueError].} = +proc parseCmdArg*(T: type SignedPeerRecord, uri: string): T = + let res = SignedPeerRecord.parse(uri) + if res.isErr: + fatal "Cannot parse the signed peer.", error = res.error(), input = uri + quit QuitFailure + return res.get() + +func parse*(T: type NatConfig, p: string): Result[NatConfig, string] = case p.toLowerAscii of "any": - NatConfig(hasExtIp: false, nat: NatStrategy.NatAny) + return ok(NatConfig(hasExtIp: false, nat: NatStrategy.NatAny)) of "none": - NatConfig(hasExtIp: false, nat: NatStrategy.NatNone) + return ok(NatConfig(hasExtIp: false, nat: NatStrategy.NatNone)) of "upnp": - NatConfig(hasExtIp: false, nat: NatStrategy.NatUpnp) + return ok(NatConfig(hasExtIp: false, nat: NatStrategy.NatUpnp)) of "pmp": - NatConfig(hasExtIp: false, nat: NatStrategy.NatPmp) + return ok(NatConfig(hasExtIp: false, nat: NatStrategy.NatPmp)) else: if p.startsWith("extip:"): try: let ip = parseIpAddress(p[6 ..^ 1]) - NatConfig(hasExtIp: true, extIp: ip) + return ok(NatConfig(hasExtIp: true, extIp: ip)) except ValueError: let error = "Not a valid IP address: " & p[6 ..^ 1] - raise newException(ValueError, error) + return err(error) else: - let error = "Not a valid NAT option: " & p - raise newException(ValueError, error) + return err("Not a valid NAT option: " & p) + +proc parseCmdArg*(T: type NatConfig, p: string): T = + let res = NatConfig.parse(p) + if res.isErr: + fatal "Cannot parse the NAT config.", error = res.error(), input = p + quit QuitFailure + return res.get() proc completeCmdArg*(T: type NatConfig, val: string): seq[string] = return @[] @@ -566,19 +595,25 @@ proc completeCmdArg*(T: type NatConfig, val: string): seq[string] = proc parseCmdArg*(T: type EthAddress, address: string): T = EthAddress.init($address).get() -proc parseCmdArg*(T: type NBytes, val: string): T = +func parse*(T: type NBytes, p: string): Result[NBytes, string] = var num = 0'i64 - let count = parseSize(val, num, alwaysBin = true) + let count = parseSize(p, num, alwaysBin = true) if count == 0: - warn "Invalid number of bytes", nbytes = val + return err("Invalid number of bytes: " & p) + return ok(NBytes(num)) + +proc parseCmdArg*(T: type NBytes, val: string): T = + let res = NBytes.parse(val) + if res.isErr: + fatal "Cannot parse NBytes.", error = res.error(), input = val quit QuitFailure - NBytes(num) + return res.get() proc parseCmdArg*(T: type Duration, val: string): T = var dur: Duration let count = parseDuration(val, dur) if count == 0: - warn "Cannot parse duration", dur = dur + fatal "Cannot parse duration", dur = dur quit QuitFailure dur @@ -595,7 +630,7 @@ proc readValue*(r: var TomlReader, val: var SignedPeerRecord) = try: val = SignedPeerRecord.parseCmdArg(uri) except LPError as err: - warn "Invalid SignedPeerRecord uri", uri = uri, error = err.msg + fatal "Invalid SignedPeerRecord uri", uri = uri, error = err.msg quit QuitFailure proc readValue*(r: var TomlReader, val: var MultiAddress) = @@ -607,7 +642,7 @@ proc readValue*(r: var TomlReader, val: var MultiAddress) = if res.isOk: val = res.get() else: - warn "Invalid MultiAddress", input = input, error = res.error() + fatal "Invalid MultiAddress", input = input, error = res.error() quit QuitFailure proc readValue*( @@ -779,15 +814,6 @@ proc setupLogging*(conf: CodexConf) = else: defaultChroniclesStream.outputs[0].writer = writer - try: - updateLogLevel(conf.logLevel) - except ValueError as err: - try: - stderr.write "Invalid value for --log-level. " & err.msg & "\n" - except IOError: - echo "Invalid value for --log-level. " & err.msg - quit QuitFailure - proc setupMetrics*(config: CodexConf) = if config.metricsEnabled: let metricsAddress = config.metricsAddress diff --git a/codex/contracts/clock.nim b/codex/contracts/clock.nim index b7863539..1d4f57ba 100644 --- a/codex/contracts/clock.nim +++ b/codex/contracts/clock.nim @@ -1,3 +1,5 @@ +{.push raises: [].} + import std/times import pkg/ethers import pkg/questionable @@ -72,7 +74,9 @@ method now*(clock: OnChainClock): SecondsSince1970 = doAssert clock.started, "clock should be started before calling now()" return toUnix(getTime() + clock.offset) -method waitUntil*(clock: OnChainClock, time: SecondsSince1970) {.async.} = +method waitUntil*( + clock: OnChainClock, time: SecondsSince1970 +) {.async: (raises: [CancelledError]).} = while (let difference = time - clock.now(); difference > 0): clock.newBlock.clear() discard await clock.newBlock.wait().withTimeout(chronos.seconds(difference)) diff --git a/codex/discovery.nim b/codex/discovery.nim index 4a211c20..9ff89caf 100644 --- a/codex/discovery.nim +++ b/codex/discovery.nim @@ -43,6 +43,7 @@ type Discovery* = ref object of RootObj # record to advertice node connection information, this carry any # address that the node can be connected on dhtRecord*: ?SignedPeerRecord # record to advertice DHT connection information + isStarted: bool proc toNodeId*(cid: Cid): NodeId = ## Cid to discovery id @@ -203,10 +204,15 @@ proc start*(d: Discovery) {.async: (raises: []).} = try: d.protocol.open() await d.protocol.start() + d.isStarted = true except CatchableError as exc: error "Error starting discovery", exc = exc.msg proc stop*(d: Discovery) {.async: (raises: []).} = + if not d.isStarted: + warn "Discovery not started, skipping stop" + return + try: await noCancel d.protocol.closeWait() except CatchableError as exc: diff --git a/codex/node.nim b/codex/node.nim index 67145d22..ccae080c 100644 --- a/codex/node.nim +++ b/codex/node.nim @@ -84,6 +84,7 @@ type BatchProc* = proc(blocks: seq[bt.Block]): Future[?!void] {. gcsafe, async: (raises: [CancelledError]) .} + OnBlockStoredProc = proc(chunk: seq[byte]): void {.gcsafe, raises: [].} func switch*(self: CodexNodeRef): Switch = return self.switch @@ -434,6 +435,7 @@ proc store*( filename: ?string = string.none, mimetype: ?string = string.none, blockSize = DefaultBlockSize, + onBlockStored: OnBlockStoredProc = nil, ): Future[?!Cid] {.async.} = ## Save stream contents as dataset with given blockSize ## to nodes's BlockStore, and return Cid of its manifest @@ -463,6 +465,9 @@ proc store*( if err =? (await self.networkStore.putBlock(blk)).errorOption: error "Unable to store block", cid = blk.cid, err = err.msg return failure(&"Unable to store block {blk.cid}") + + if not onBlockStored.isNil: + onBlockStored(chunk) except CancelledError as exc: raise exc except CatchableError as exc: @@ -902,6 +907,7 @@ proc stop*(self: CodexNodeRef) {.async.} = if not self.clock.isNil: await self.clock.stop() +proc close*(self: CodexNodeRef) {.async.} = if not self.networkStore.isNil: await self.networkStore.close diff --git a/codex/stores/repostore/store.nim b/codex/stores/repostore/store.nim index ad6f03fc..16813a16 100644 --- a/codex/stores/repostore/store.nim +++ b/codex/stores/repostore/store.nim @@ -443,7 +443,6 @@ proc start*( ): Future[void] {.async: (raises: [CancelledError, CodexError]).} = ## Start repo ## - if self.started: trace "Repo already started" return @@ -465,6 +464,5 @@ proc stop*(self: RepoStore): Future[void] {.async: (raises: []).} = return trace "Stopping repo" - await self.close() self.started = false diff --git a/codex/validation.nim b/codex/validation.nim index 58a0e6b7..d9f8fb5e 100644 --- a/codex/validation.nim +++ b/codex/validation.nim @@ -80,7 +80,7 @@ proc removeSlotsThatHaveEnded(validation: Validation) {.async.} = proc markProofAsMissing( validation: Validation, slotId: SlotId, period: Period -) {.async.} = +) {.async: (raises: [CancelledError]).} = logScope: currentPeriod = validation.getCurrentPeriod() @@ -91,18 +91,18 @@ proc markProofAsMissing( else: let inDowntime {.used.} = await validation.market.inDowntime(slotId) trace "Proof not missing", checkedPeriod = period, inDowntime - except CancelledError: - raise + except CancelledError as e: + raise e except CatchableError as e: error "Marking proof as missing failed", msg = e.msg -proc markProofsAsMissing(validation: Validation) {.async.} = +proc markProofsAsMissing(validation: Validation) {.async: (raises: [CancelledError]).} = let slots = validation.slots for slotId in slots: let previousPeriod = validation.getCurrentPeriod() - 1 await validation.markProofAsMissing(slotId, previousPeriod) -proc run(validation: Validation) {.async: (raises: []).} = +proc run(validation: Validation) {.async: (raises: [CancelledError]).} = trace "Validation started" try: while true: diff --git a/examples/golang/README.md b/examples/golang/README.md new file mode 100644 index 00000000..30a12932 --- /dev/null +++ b/examples/golang/README.md @@ -0,0 +1,24 @@ + +## Pre-requisite + +libcodex.so is needed to be compiled and present in build folder. + +## Compilation + +From the codex root folder: + +```code +go build -o codex-go examples/golang/codex.go +``` + +## Run +From the codex root folder: + + +```code +export LD_LIBRARY_PATH=build +``` + +```code +./codex-go +``` diff --git a/examples/golang/codex.go b/examples/golang/codex.go new file mode 100644 index 00000000..cf6f9aa3 --- /dev/null +++ b/examples/golang/codex.go @@ -0,0 +1,885 @@ +package main + +/* + #cgo LDFLAGS: -L../../build/ -lcodex + #cgo LDFLAGS: -L../../ -Wl,-rpath,../../ + + #include + #include + #include "../../library/libcodex.h" + + typedef struct { + int ret; + char* msg; + size_t len; + uintptr_t h; + } Resp; + + static void* allocResp(uintptr_t h) { + Resp* r = (Resp*)calloc(1, sizeof(Resp)); + r->h = h; + return r; + } + + static void freeResp(void* resp) { + if (resp != NULL) { + free(resp); + } + } + + static int getRet(void* resp) { + if (resp == NULL) { + return 0; + } + Resp* m = (Resp*) resp; + return m->ret; + } + + void libcodexNimMain(void); + + static void codex_host_init_once(void){ + static int done; + if (!__atomic_exchange_n(&done, 1, __ATOMIC_SEQ_CST)) libcodexNimMain(); + } + + // resp must be set != NULL in case interest on retrieving data from the callback + void callback(int ret, char* msg, size_t len, void* resp); + + static void* cGoCodexNew(const char* configJson, void* resp) { + void* ret = codex_new(configJson, (CodexCallback) callback, resp); + return ret; + } + + static int cGoCodexStart(void* codexCtx, void* resp) { + return codex_start(codexCtx, (CodexCallback) callback, resp); + } + + static int cGoCodexStop(void* codexCtx, void* resp) { + return codex_stop(codexCtx, (CodexCallback) callback, resp); + } + + static int cGoCodexClose(void* codexCtx, void* resp) { + return codex_close(codexCtx, (CodexCallback) callback, resp); + } + + static int cGoCodexDestroy(void* codexCtx, void* resp) { + return codex_destroy(codexCtx, (CodexCallback) callback, resp); + } + + static int cGoCodexVersion(void* codexCtx, void* resp) { + return codex_version(codexCtx, (CodexCallback) callback, resp); + } + + static int cGoCodexRevision(void* codexCtx, void* resp) { + return codex_revision(codexCtx, (CodexCallback) callback, resp); + } + + static int cGoCodexRepo(void* codexCtx, void* resp) { + return codex_repo(codexCtx, (CodexCallback) callback, resp); + } + + static int cGoCodexSpr(void* codexCtx, void* resp) { + return codex_spr(codexCtx, (CodexCallback) callback, resp); + } + + static int cGoCodexPeerId(void* codexCtx, void* resp) { + return codex_peer_id(codexCtx, (CodexCallback) callback, resp); + } + + static int cGoCodexUploadInit(void* codexCtx, char* filepath, size_t chunkSize, void* resp) { + return codex_upload_init(codexCtx, filepath, chunkSize, (CodexCallback) callback, resp); + } + + static int cGoCodexUploadChunk(void* codexCtx, char* sessionId, const uint8_t* chunk, size_t len, void* resp) { + return codex_upload_chunk(codexCtx, sessionId, chunk, len, (CodexCallback) callback, resp); + } + + static int cGoCodexUploadFinalize(void* codexCtx, char* sessionId, void* resp) { + return codex_upload_finalize(codexCtx, sessionId, (CodexCallback) callback, resp); + } + + static int cGoCodexUploadCancel(void* codexCtx, char* sessionId, void* resp) { + return codex_upload_cancel(codexCtx, sessionId, (CodexCallback) callback, resp); + } + + static int cGoCodexUploadFile(void* codexCtx, char* sessionId, void* resp) { + return codex_upload_file(codexCtx, sessionId, (CodexCallback) callback, resp); + } + + static int cGoCodexLogLevel(void* codexCtx, char* logLevel, void* resp) { + return codex_log_level(codexCtx, logLevel, (CodexCallback) callback, resp); + } + + static int cGoCodexExists(void* codexCtx, char* cid, void* resp) { + return codex_storage_exists(codexCtx, cid, (CodexCallback) callback, resp); + } +*/ +import "C" +import ( + "bytes" + "encoding/json" + "errors" + "fmt" + "io" + "log" + "os" + "os/signal" + "runtime/cgo" + "sync" + "syscall" + "unsafe" +) + +type LogFormat string + +const ( + LogFormatAuto LogFormat = "auto" + LogFormatColors LogFormat = "colors" + LogFormatNoColors LogFormat = "nocolors" + LogFormatJSON LogFormat = "json" +) + +type RepoKind string + +const ( + FS RepoKind = "fs" + SQLite RepoKind = "sqlite" + LevelDb RepoKind = "leveldb" +) + +const defaultBlockSize = 1024 * 64 + +type Config struct { + // Default: INFO + LogLevel string `json:"log-level,omitempty"` + + // Specifies what kind of logs should be written to stdout + // Default: auto + LogFormat LogFormat `json:"log-format,omitempty"` + + // Enable the metrics server + // Default: false + MetricsEnabled bool `json:"metrics,omitempty"` + + // Listening address of the metrics server + // Default: 127.0.0.1 + MetricsAddress string `json:"metrics-address,omitempty"` + + // Listening HTTP port of the metrics server + // Default: 8008 + MetricsPort int `json:"metrics-port,omitempty"` + + // The directory where codex will store configuration and data + // Default: + // $HOME\AppData\Roaming\Codex on Windows + // $HOME/Library/Application Support/Codex on macOS + // $HOME/.cache/codex on Linux + DataDir string `json:"data-dir,omitempty"` + + // Multi Addresses to listen on + // Default: ["/ip4/0.0.0.0/tcp/0"] + ListenAddrs []string `json:"listen-addrs,omitempty"` + + // Specify method to use for determining public address. + // Must be one of: any, none, upnp, pmp, extip: + // Default: any + Nat string `json:"nat,omitempty"` + + // Discovery (UDP) port + // Default: 8090 + DiscoveryPort int `json:"disc-port,omitempty"` + + // Source of network (secp256k1) private key file path or name + // Default: "key" + NetPrivKeyFile string `json:"net-privkey,omitempty"` + + // Specifies one or more bootstrap nodes to use when connecting to the network. + BootstrapNodes []string `json:"bootstrap-node,omitempty"` + + // The maximum number of peers to connect to. + // Default: 160 + MaxPeers int `json:"max-peers,omitempty"` + + // Number of worker threads (\"0\" = use as many threads as there are CPU cores available) + // Default: 0 + NumThreads int `json:"num-threads,omitempty"` + + // Node agent string which is used as identifier in network + // Default: "Codex" + AgentString string `json:"agent-string,omitempty"` + + // Backend for main repo store (fs, sqlite, leveldb) + // Default: fs + RepoKind RepoKind `json:"repo-kind,omitempty"` + + // The size of the total storage quota dedicated to the node + // Default: 20 GiBs + StorageQuota int `json:"storage-quota,omitempty"` + + // Default block timeout in seconds - 0 disables the ttl + // Default: 30 days + BlockTtl int `json:"block-ttl,omitempty"` + + // Time interval in seconds - determines frequency of block + // maintenance cycle: how often blocks are checked for expiration and cleanup + // Default: 10 minutes + BlockMaintenanceInterval int `json:"block-mi,omitempty"` + + // Number of blocks to check every maintenance cycle + // Default: 1000 + BlockMaintenanceNumberOfBlocks int `json:"block-mn,omitempty"` + + // Number of times to retry fetching a block before giving up + // Default: 3000 + BlockRetries int `json:"block-retries,omitempty"` + + // The size of the block cache, 0 disables the cache - + // might help on slow hardrives + // Default: 0 + CacheSize int `json:"cache-size,omitempty"` + + // Default: "" (no log file) + LogFile string `json:"log-file,omitempty"` +} + +type CodexNode struct { + ctx unsafe.Pointer +} + +type ChunkSize int + +func (c ChunkSize) valOrDefault() int { + if c == 0 { + return defaultBlockSize + } + + return int(c) +} + +func (c ChunkSize) toSizeT() C.size_t { + return C.size_t(c.valOrDefault()) +} + +// bridgeCtx is used for managing the C-Go bridge calls. +// It contains a wait group for synchronizing the calls, +// a cgo.Handle for passing context to the C code, +// a response pointer for receiving data from the C code, +// and fields for storing the result and error of the call. +type bridgeCtx struct { + wg *sync.WaitGroup + h cgo.Handle + resp unsafe.Pointer + result string + err error + + // Callback used for receiving progress updates during upload/download. + // + // For the upload, the bytes parameter indicates the number of bytes uploaded. + // If the chunk size is superior or equal to the blocksize (passed in init function), + // the callback will be called when a block is put in the store. + // Otherwise, it will be called when a chunk is pushed into the stream. + // + // For the download, the bytes is the size of the chunk received, and the chunk + // is the actual chunk of data received. + onProgress func(bytes int, chunk []byte) +} + +// newBridgeCtx creates a new bridge context for managing C-Go calls. +// The bridge context is initialized with a wait group and a cgo.Handle. +func newBridgeCtx() *bridgeCtx { + bridge := &bridgeCtx{} + bridge.wg = &sync.WaitGroup{} + bridge.wg.Add(1) + bridge.h = cgo.NewHandle(bridge) + bridge.resp = C.allocResp(C.uintptr_t(uintptr(bridge.h))) + return bridge +} + +// callError creates an error message for a failed C-Go call. +func (b *bridgeCtx) callError(name string) error { + return fmt.Errorf("failed the call to %s returned code %d", name, C.getRet(b.resp)) +} + +// free releases the resources associated with the bridge context, +// including the cgo.Handle and the response pointer. +func (b *bridgeCtx) free() { + if b.h > 0 { + b.h.Delete() + b.h = 0 + } + + if b.resp != nil { + C.freeResp(b.resp) + b.resp = nil + } +} + +// callback is the function called by the C code to communicate back to Go. +// It handles progress updates, successful completions, and errors. +// The function uses the response pointer to retrieve the bridge context +// and update its state accordingly. +// +//export callback +func callback(ret C.int, msg *C.char, len C.size_t, resp unsafe.Pointer) { + if resp == nil { + return + } + + m := (*C.Resp)(resp) + m.ret = ret + m.msg = msg + m.len = len + + if m.h == 0 { + return + } + + h := cgo.Handle(m.h) + if h == 0 { + return + } + + if v, ok := h.Value().(*bridgeCtx); ok { + switch ret { + case C.RET_PROGRESS: + if v.onProgress == nil { + return + } + if msg != nil { + chunk := C.GoBytes(unsafe.Pointer(msg), C.int(len)) + v.onProgress(int(C.int(len)), chunk) + } else { + v.onProgress(int(C.int(len)), nil) + } + case C.RET_OK: + retMsg := C.GoStringN(msg, C.int(len)) + v.result = retMsg + v.err = nil + if v.wg != nil { + v.wg.Done() + } + case C.RET_ERR: + retMsg := C.GoStringN(msg, C.int(len)) + v.err = errors.New(retMsg) + if v.wg != nil { + v.wg.Done() + } + } + } +} + +// wait waits for the bridge context to complete its operation. +// It returns the result and error of the operation. +func (b *bridgeCtx) wait() (string, error) { + b.wg.Wait() + return b.result, b.err +} + +type OnUploadProgressFunc func(read, total int, percent float64, err error) + +type UploadOptions struct { + // Filepath can be the full path when using UploadFile + // otherwise the file name. + // It is used to detect the mimetype. + Filepath string + + // ChunkSize is the size of each upload chunk, passed as `blockSize` to the Codex node + // store. Default is to 64 KB. + ChunkSize ChunkSize + + // OnProgress is a callback function that is called after each chunk is uploaded with: + // - read: the number of bytes read in the last chunk. + // - total: the total number of bytes read so far. + // - percent: the percentage of the total file size that has been uploaded. It is + // determined from a `stat` call if it is a file and from the length of the buffer + // if it is a buffer. Otherwise, it is 0. + // - err: an error, if one occurred. + // + // If the chunk size is more than the `chunkSize` parameter, the callback is called + // after the block is actually stored in the block store. Otherwise, it is called + // after the chunk is sent to the stream. + OnProgress OnUploadProgressFunc +} + +func getReaderSize(r io.Reader) int64 { + switch v := r.(type) { + case *os.File: + stat, err := v.Stat() + if err != nil { + return 0 + } + return stat.Size() + case *bytes.Buffer: + return int64(v.Len()) + default: + return 0 + } +} + +// New creates a new Codex node with the provided configuration. +// The node is not started automatically; you need to call CodexStart +// to start it. +// It returns a Codex node that can be used to interact +// with the Codex network. +func New(config Config) (*CodexNode, error) { + bridge := newBridgeCtx() + defer bridge.free() + + jsonConfig, err := json.Marshal(config) + if err != nil { + return nil, err + } + + cJsonConfig := C.CString(string(jsonConfig)) + defer C.free(unsafe.Pointer(cJsonConfig)) + + ctx := C.cGoCodexNew(cJsonConfig, bridge.resp) + + if _, err := bridge.wait(); err != nil { + return nil, bridge.err + } + + return &CodexNode{ctx: ctx}, bridge.err +} + +// Start starts the Codex node. +func (node CodexNode) Start() error { + bridge := newBridgeCtx() + defer bridge.free() + + if C.cGoCodexStart(node.ctx, bridge.resp) != C.RET_OK { + return bridge.callError("cGoCodexStart") + } + + _, err := bridge.wait() + return err +} + +// StartAsync is the asynchronous version of Start. +func (node CodexNode) StartAsync(onDone func(error)) { + go func() { + err := node.Start() + onDone(err) + }() +} + +// Stop stops the Codex node. +func (node CodexNode) Stop() error { + bridge := newBridgeCtx() + defer bridge.free() + + if C.cGoCodexStop(node.ctx, bridge.resp) != C.RET_OK { + return bridge.callError("cGoCodexStop") + } + + _, err := bridge.wait() + return err +} + +// Destroy destroys the Codex node, freeing all resources. +// The node must be stopped before calling this method. +func (node CodexNode) Destroy() error { + bridge := newBridgeCtx() + defer bridge.free() + + if C.cGoCodexClose(node.ctx, bridge.resp) != C.RET_OK { + return bridge.callError("cGoCodexClose") + } + + _, err := bridge.wait() + if err != nil { + return err + } + + if C.cGoCodexDestroy(node.ctx, bridge.resp) != C.RET_OK { + return errors.New("Failed to destroy the codex node.") + } + + return err +} + +// Version returns the version of the Codex node. +func (node CodexNode) Version() (string, error) { + bridge := newBridgeCtx() + defer bridge.free() + + if C.cGoCodexVersion(node.ctx, bridge.resp) != C.RET_OK { + return "", bridge.callError("cGoCodexVersion") + } + + return bridge.wait() +} + +func (node CodexNode) Revision() (string, error) { + bridge := newBridgeCtx() + defer bridge.free() + + if C.cGoCodexRevision(node.ctx, bridge.resp) != C.RET_OK { + return "", bridge.callError("cGoCodexRevision") + } + + return bridge.wait() +} + +// Repo returns the path of the data dir folder. +func (node CodexNode) Repo() (string, error) { + bridge := newBridgeCtx() + defer bridge.free() + + if C.cGoCodexRepo(node.ctx, bridge.resp) != C.RET_OK { + return "", bridge.callError("cGoCodexRepo") + } + + return bridge.wait() +} + +func (node CodexNode) Spr() (string, error) { + bridge := newBridgeCtx() + defer bridge.free() + + if C.cGoCodexSpr(node.ctx, bridge.resp) != C.RET_OK { + return "", bridge.callError("cGoCodexSpr") + } + + return bridge.wait() +} + +func (node CodexNode) PeerId() (string, error) { + bridge := newBridgeCtx() + defer bridge.free() + + if C.cGoCodexPeerId(node.ctx, bridge.resp) != C.RET_OK { + return "", bridge.callError("cGoCodexPeerId") + } + + return bridge.wait() +} + +// UploadInit initializes a new upload session. +// It returns a session ID that can be used for subsequent upload operations. +// This function is called by UploadReader and UploadFile internally. +// You should use this function only if you need to manage the upload session manually. +func (node CodexNode) UploadInit(options *UploadOptions) (string, error) { + bridge := newBridgeCtx() + defer bridge.free() + + var cFilename = C.CString(options.Filepath) + defer C.free(unsafe.Pointer(cFilename)) + + if C.cGoCodexUploadInit(node.ctx, cFilename, options.ChunkSize.toSizeT(), bridge.resp) != C.RET_OK { + return "", bridge.callError("cGoCodexUploadInit") + } + + return bridge.wait() +} + +// UploadChunk uploads a chunk of data to the Codex node. +// It takes the session ID returned by UploadInit +// and a byte slice containing the chunk data. +// This function is called by UploadReader internally. +// You should use this function only if you need to manage the upload session manually. +func (node CodexNode) UploadChunk(sessionId string, chunk []byte) error { + bridge := newBridgeCtx() + defer bridge.free() + + var cSessionId = C.CString(sessionId) + defer C.free(unsafe.Pointer(cSessionId)) + + var cChunkPtr *C.uint8_t + if len(chunk) > 0 { + cChunkPtr = (*C.uint8_t)(unsafe.Pointer(&chunk[0])) + } + + if C.cGoCodexUploadChunk(node.ctx, cSessionId, cChunkPtr, C.size_t(len(chunk)), bridge.resp) != C.RET_OK { + return bridge.callError("cGoCodexUploadChunk") + } + + _, err := bridge.wait() + return err +} + +// UploadFinalize finalizes the upload session and returns the CID of the uploaded file. +// It takes the session ID returned by UploadInit. +// This function is called by UploadReader and UploadFile internally. +// You should use this function only if you need to manage the upload session manually. +func (node CodexNode) UploadFinalize(sessionId string) (string, error) { + bridge := newBridgeCtx() + defer bridge.free() + + var cSessionId = C.CString(sessionId) + defer C.free(unsafe.Pointer(cSessionId)) + + if C.cGoCodexUploadFinalize(node.ctx, cSessionId, bridge.resp) != C.RET_OK { + return "", bridge.callError("cGoCodexUploadFinalize") + } + + return bridge.wait() +} + +// UploadCancel cancels an ongoing upload session. +// It can be only if the upload session is managed manually. +// It doesn't work with UploadFile. +func (node CodexNode) UploadCancel(sessionId string) error { + bridge := newBridgeCtx() + defer bridge.free() + + var cSessionId = C.CString(sessionId) + defer C.free(unsafe.Pointer(cSessionId)) + + if C.cGoCodexUploadCancel(node.ctx, cSessionId, bridge.resp) != C.RET_OK { + return bridge.callError("cGoCodexUploadCancel") + } + + _, err := bridge.wait() + return err +} + +// UploadReader uploads data from an io.Reader to the Codex node. +// It takes the upload options and the reader as parameters. +// It returns the CID of the uploaded file or an error. +// +// Internally, it calls: +// - UploadInit to create the upload session. +// - UploadChunk to upload a chunk to codex. +// - UploadFinalize to finalize the upload session. +// - UploadCancel if an error occurs. +func (node CodexNode) UploadReader(options UploadOptions, r io.Reader) (string, error) { + sessionId, err := node.UploadInit(&options) + if err != nil { + return "", err + } + + buf := make([]byte, options.ChunkSize.valOrDefault()) + total := 0 + + var size int64 + if options.OnProgress != nil { + size = getReaderSize(r) + } + + for { + n, err := r.Read(buf) + if err == io.EOF { + break + } + + if err != nil { + if cancelErr := node.UploadCancel(sessionId); cancelErr != nil { + return "", fmt.Errorf("failed to upload chunk %v and failed to cancel upload session %v", err, cancelErr) + } + + return "", err + } + + if n == 0 { + break + } + + if err := node.UploadChunk(sessionId, buf[:n]); err != nil { + if cancelErr := node.UploadCancel(sessionId); cancelErr != nil { + return "", fmt.Errorf("failed to upload chunk %v and failed to cancel upload session %v", err, cancelErr) + } + + return "", err + } + + total += n + if options.OnProgress != nil && size > 0 { + percent := float64(total) / float64(size) * 100.0 + // The last block could be a bit over the size due to padding + // on the chunk size. + if percent > 100.0 { + percent = 100.0 + } + options.OnProgress(n, total, percent, nil) + } else if options.OnProgress != nil { + options.OnProgress(n, total, 0, nil) + } + } + + return node.UploadFinalize(sessionId) +} + +// UploadReaderAsync is the asynchronous version of UploadReader using a goroutine. +func (node CodexNode) UploadReaderAsync(options UploadOptions, r io.Reader, onDone func(cid string, err error)) { + go func() { + cid, err := node.UploadReader(options, r) + onDone(cid, err) + }() +} + +// UploadFile uploads a file to the Codex node. +// It takes the upload options as parameter. +// It returns the CID of the uploaded file or an error. +// +// The options parameter contains the following fields: +// - filepath: the full path of the file to upload. +// - chunkSize: the size of each upload chunk, passed as `blockSize` to the Codex node +// store. Default is to 64 KB. +// - onProgress: a callback function that is called after each chunk is uploaded with: +// - read: the number of bytes read in the last chunk. +// - total: the total number of bytes read so far. +// - percent: the percentage of the total file size that has been uploaded. It is +// determined from a `stat` call. +// - err: an error, if one occurred. +// +// If the chunk size is more than the `chunkSize` parameter, the callback is called after +// the block is actually stored in the block store. Otherwise, it is called after the chunk +// is sent to the stream. +// +// Internally, it calls UploadInit to create the upload session. +func (node CodexNode) UploadFile(options UploadOptions) (string, error) { + bridge := newBridgeCtx() + defer bridge.free() + + if options.OnProgress != nil { + stat, err := os.Stat(options.Filepath) + if err != nil { + return "", err + } + + size := stat.Size() + total := 0 + + if size > 0 { + bridge.onProgress = func(read int, _ []byte) { + if read == 0 { + return + } + + total += read + percent := float64(total) / float64(size) * 100.0 + // The last block could be a bit over the size due to padding + // on the chunk size. + if percent > 100.0 { + percent = 100.0 + } + + options.OnProgress(read, int(size), percent, nil) + } + } + } + + sessionId, err := node.UploadInit(&options) + if err != nil { + return "", err + } + + var cSessionId = C.CString(sessionId) + defer C.free(unsafe.Pointer(cSessionId)) + + if C.cGoCodexUploadFile(node.ctx, cSessionId, bridge.resp) != C.RET_OK { + return "", bridge.callError("cGoCodexUploadFile") + } + + return bridge.wait() +} + +// UploadFileAsync is the asynchronous version of UploadFile using a goroutine. +func (node CodexNode) UploadFileAsync(options UploadOptions, onDone func(cid string, err error)) { + go func() { + cid, err := node.UploadFile(options) + onDone(cid, err) + }() +} + +func (node CodexNode) UpdateLogLevel(logLevel string) error { + bridge := newBridgeCtx() + defer bridge.free() + + var cLogLevel = C.CString(string(logLevel)) + defer C.free(unsafe.Pointer(cLogLevel)) + + if C.cGoCodexLogLevel(node.ctx, cLogLevel, bridge.resp) != C.RET_OK { + return bridge.callError("cGoCodexLogLevel") + } + + _, err := bridge.wait() + return err +} + +func (node CodexNode) Exists(cid string) (bool, error) { + bridge := newBridgeCtx() + defer bridge.free() + + var cCid = C.CString(cid) + defer C.free(unsafe.Pointer(cCid)) + + if C.cGoCodexExists(node.ctx, cCid, bridge.resp) != C.RET_OK { + return false, bridge.callError("cGoCodexUploadCancel") + } + + result, err := bridge.wait() + return result == "true", err +} + +func main() { + dataDir := os.TempDir() + "/data-dir" + + node, err := New(Config{ + BlockRetries: 5, + LogLevel: "WARN", + DataDir: dataDir, + }) + if err != nil { + log.Fatalf("Failed to create Codex node: %v", err) + } + defer os.RemoveAll(dataDir) + + if err := node.Start(); err != nil { + log.Fatalf("Failed to start Codex node: %v", err) + } + log.Println("Codex node started") + + version, err := node.Version() + if err != nil { + log.Fatalf("Failed to get Codex version: %v", err) + } + log.Printf("Codex version: %s", version) + + err = node.UpdateLogLevel("ERROR") + if err != nil { + log.Fatalf("Failed to update log level: %v", err) + } + + cid := "zDvZRwzmAkhzDRPH5EW242gJBNZ2T7aoH2v1fVH66FxXL4kSbvyM" + exists, err := node.Exists(cid) + if err != nil { + log.Fatalf("Failed to check data existence: %v", err) + } + + if exists { + log.Fatalf("The data should not exist") + } + + buf := bytes.NewBuffer([]byte("Hello World!")) + len := buf.Len() + cid, err = node.UploadReader(UploadOptions{Filepath: "hello.txt"}, buf) + if err != nil { + log.Fatalf("Failed to upload data: %v", err) + } + log.Printf("Uploaded data with CID: %s (size: %d bytes)", cid, len) + + exists, err = node.Exists(cid) + if err != nil { + log.Fatalf("Failed to check data existence: %v", err) + } + + if !exists { + log.Fatalf("The data should exist") + } + + // Wait for a SIGINT or SIGTERM signal + ch := make(chan os.Signal, 1) + signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM) + <-ch + + if err := node.Stop(); err != nil { + log.Fatalf("Failed to stop Codex node: %v", err) + } + log.Println("Codex node stopped") + + if err := node.Destroy(); err != nil { + log.Fatalf("Failed to destroy Codex node: %v", err) + } +} diff --git a/examples/golang/hello.txt b/examples/golang/hello.txt new file mode 100644 index 00000000..c57eff55 --- /dev/null +++ b/examples/golang/hello.txt @@ -0,0 +1 @@ +Hello World! \ No newline at end of file diff --git a/library/README.md b/library/README.md new file mode 100644 index 00000000..db6423fe --- /dev/null +++ b/library/README.md @@ -0,0 +1,37 @@ +# Codex Library + +Codex exposes a C binding that serves as a stable contract, making it straightforward to integrate Codex into other languages such as Go. + +The implementation was inspired by [nim-library-template](https://github.com/logos-co/nim-library-template) +and by the [nwaku](https://github.com/waku-org/nwaku/tree/master/library) library. + +The source code contains detailed comments to explain the threading and callback flow. +The diagram below summarizes the lifecycle: context creation, request execution, and shutdown. + +```mermaid +sequenceDiagram + autonumber + actor App as App/User + participant Go as Go Wrapper + participant C as C API (libcodex.h) + participant Ctx as CodexContext + participant Thr as Worker Thread + participant Eng as CodexServer + + App->>Go: Start + Go->>C: codex_start_node + C->>Ctx: enqueue request + C->>Ctx: fire signal + Ctx->>Thr: wake worker + Thr->>Ctx: dequeue request + Thr-->>Ctx: ACK + Ctx-->>C: forward ACK + C-->>Go: RET OK + Go->>App: Unblock + Thr->>Eng: execute (async) + Eng-->>Thr: result ready + Thr-->>Ctx: callback + Ctx-->>C: forward callback + C-->>Go: forward callback + Go-->>App: done +``` \ No newline at end of file diff --git a/library/alloc.nim b/library/alloc.nim new file mode 100644 index 00000000..1a6f118b --- /dev/null +++ b/library/alloc.nim @@ -0,0 +1,42 @@ +## Can be shared safely between threads +type SharedSeq*[T] = tuple[data: ptr UncheckedArray[T], len: int] + +proc alloc*(str: cstring): cstring = + # Byte allocation from the given address. + # There should be the corresponding manual deallocation with deallocShared ! + if str.isNil(): + var ret = cast[cstring](allocShared(1)) # Allocate memory for the null terminator + ret[0] = '\0' # Set the null terminator + return ret + + let ret = cast[cstring](allocShared(len(str) + 1)) + copyMem(ret, str, len(str) + 1) + return ret + +proc alloc*(str: string): cstring = + ## Byte allocation from the given address. + ## There should be the corresponding manual deallocation with deallocShared ! + var ret = cast[cstring](allocShared(str.len + 1)) + let s = cast[seq[char]](str) + for i in 0 ..< str.len: + ret[i] = s[i] + ret[str.len] = '\0' + return ret + +proc allocSharedSeq*[T](s: seq[T]): SharedSeq[T] = + let data = allocShared(sizeof(T) * s.len) + if s.len != 0: + copyMem(data, unsafeAddr s[0], s.len) + return (cast[ptr UncheckedArray[T]](data), s.len) + +proc deallocSharedSeq*[T](s: var SharedSeq[T]) = + deallocShared(s.data) + s.len = 0 + +proc toSeq*[T](s: SharedSeq[T]): seq[T] = + ## Creates a seq[T] from a SharedSeq[T]. No explicit dealloc is required + ## as req[T] is a GC managed type. + var ret = newSeq[T]() + for i in 0 ..< s.len: + ret.add(s.data[i]) + return ret diff --git a/library/codex_context.nim b/library/codex_context.nim new file mode 100644 index 00000000..f1b6b1a5 --- /dev/null +++ b/library/codex_context.nim @@ -0,0 +1,225 @@ +## This file defines the Codex context and its thread flow: +## 1. Client enqueues a request and signals the Codex thread. +## 2. The Codex thread dequeues the request and sends an ack (reqReceivedSignal). +## 3. The Codex thread executes the request asynchronously. +## 4. On completion, the Codex thread invokes the client callback with the result and userData. + +{.pragma: exported, exportc, cdecl, raises: [].} +{.pragma: callback, cdecl, raises: [], gcsafe.} +{.passc: "-fPIC".} + +import std/[options, locks, atomics] +import chronicles +import chronos +import chronos/threadsync +import taskpools/channels_spsc_single +import ./ffi_types +import ./codex_thread_requests/[codex_thread_request] + +from ../codex/codex import CodexServer + +logScope: + topics = "codexlib" + +type CodexContext* = object + thread: Thread[(ptr CodexContext)] + + # This lock is only necessary while we use a SP Channel and while the signalling + # between threads assumes that there aren't concurrent requests. + # Rearchitecting the signaling + migrating to a MP Channel will allow us to receive + # requests concurrently and spare us the need of locks + lock: Lock + + # Channel to send requests to the Codex thread. + # Requests will be popped from this channel. + reqChannel: ChannelSPSCSingle[ptr CodexThreadRequest] + + # To notify the Codex thread that a request is ready + reqSignal: ThreadSignalPtr + + # To notify the client thread that the request was received. + # It is acknowledgment signal (handshake). + reqReceivedSignal: ThreadSignalPtr + + # Custom state attached by the client to a request, + # returned when its callback is invoked + userData*: pointer + + # Function called by the library to notify the client of global events + eventCallback*: pointer + + # Custom state attached by the client to the context, + # returned with every event callback + eventUserData*: pointer + + # Set to false to stop the Codex thread (during codex_destroy) + running: Atomic[bool] + +template callEventCallback(ctx: ptr CodexContext, eventName: string, body: untyped) = + ## Template used to notify the client of global events + ## Example: onConnectionChanged, onProofMissing, etc. + if isNil(ctx[].eventCallback): + error eventName & " - eventCallback is nil" + return + + foreignThreadGc: + try: + let event = body + cast[CodexCallback](ctx[].eventCallback)( + RET_OK, unsafeAddr event[0], cast[csize_t](len(event)), ctx[].eventUserData + ) + except CatchableError: + let msg = + "Exception " & eventName & " when calling 'eventCallBack': " & + getCurrentExceptionMsg() + cast[CodexCallback](ctx[].eventCallback)( + RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), ctx[].eventUserData + ) + +proc sendRequestToCodexThread*( + ctx: ptr CodexContext, + reqType: RequestType, + reqContent: pointer, + callback: CodexCallback, + userData: pointer, + timeout = InfiniteDuration, +): Result[void, string] = + ctx.lock.acquire() + + defer: + ctx.lock.release() + + let req = CodexThreadRequest.createShared(reqType, reqContent, callback, userData) + + # Send the request to the Codex thread + let sentOk = ctx.reqChannel.trySend(req) + if not sentOk: + deallocShared(req) + return err("Failed to send request to the codex thread: " & $req[]) + + # Notify the Codex thread that a request is available + let fireSyncRes = ctx.reqSignal.fireSync() + if fireSyncRes.isErr(): + deallocShared(req) + return err( + "Failed to send request to the codex thread: unable to fireSync: " & + $fireSyncRes.error + ) + + if fireSyncRes.get() == false: + deallocShared(req) + return err("Failed to send request to the codex thread: fireSync timed out.") + + # Wait until the Codex Thread properly received the request + let res = ctx.reqReceivedSignal.waitSync(timeout) + if res.isErr(): + deallocShared(req) + return err( + "Failed to send request to the codex thread: unable to receive reqReceivedSignal signal." + ) + + ## Notice that in case of "ok", the deallocShared(req) is performed by the Codex Thread in the + ## process proc. See the 'codex_thread_request.nim' module for more details. + ok() + +proc runCodex(ctx: ptr CodexContext) {.async: (raises: []).} = + var codex: CodexServer + + while true: + try: + # Wait until a request is available + await ctx.reqSignal.wait() + except Exception as e: + error "Failure in run codex thread while waiting for reqSignal.", error = e.msg + continue + + # If codex_destroy was called, exit the loop + if ctx.running.load == false: + break + + var request: ptr CodexThreadRequest + + # Pop a request from the channel + let recvOk = ctx.reqChannel.tryRecv(request) + if not recvOk: + error "Failure in run codex: unable to receive request in codex thread." + continue + + # yield immediately to the event loop + # with asyncSpawn only, the code will be executed + # synchronously until the first await + asyncSpawn ( + proc() {.async.} = + await sleepAsync(0) + await CodexThreadRequest.process(request, addr codex) + )() + + # Notify the main thread that we picked up the request + let fireRes = ctx.reqReceivedSignal.fireSync() + if fireRes.isErr(): + error "Failure in run codex: unable to fire back to requester thread.", + error = fireRes.error + +proc run(ctx: ptr CodexContext) {.thread.} = + waitFor runCodex(ctx) + +proc createCodexContext*(): Result[ptr CodexContext, string] = + ## This proc is called from the main thread and it creates + ## the Codex working thread. + + # Allocates a CodexContext in shared memory (for the main thread) + var ctx = createShared(CodexContext, 1) + + # This signal is used by the main side to wake the Codex thread + # when a new request is enqueued. + ctx.reqSignal = ThreadSignalPtr.new().valueOr: + return + err("Failed to create a context: unable to create reqSignal ThreadSignalPtr.") + + # Used to let the caller know that the Codex thread has + # acknowledged / picked up a request (like a handshake). + ctx.reqReceivedSignal = ThreadSignalPtr.new().valueOr: + return err( + "Failed to create codex context: unable to create reqReceivedSignal ThreadSignalPtr." + ) + + # Protects shared state inside CodexContext + ctx.lock.initLock() + + # Codex thread will loop until codex_destroy is called + ctx.running.store(true) + + try: + createThread(ctx.thread, run, ctx) + except ValueError, ResourceExhaustedError: + freeShared(ctx) + return err( + "Failed to create codex context: unable to create thread: " & + getCurrentExceptionMsg() + ) + + return ok(ctx) + +proc destroyCodexContext*(ctx: ptr CodexContext): Result[void, string] = + # Signal the Codex thread to stop + ctx.running.store(false) + + # Wake the worker up if it's waiting + let signaledOnTime = ctx.reqSignal.fireSync().valueOr: + return err("Failed to destroy codex context: " & $error) + + if not signaledOnTime: + return err( + "Failed to destroy codex context: unable to get signal reqSignal on time in destroyCodexContext." + ) + + # Wait for the thread to finish + joinThread(ctx.thread) + + # Clean up + ctx.lock.deinitLock() + ?ctx.reqSignal.close() + ?ctx.reqReceivedSignal.close() + freeShared(ctx) + + return ok() diff --git a/library/codex_thread_requests/codex_thread_request.nim b/library/codex_thread_requests/codex_thread_request.nim new file mode 100644 index 00000000..2229cddf --- /dev/null +++ b/library/codex_thread_requests/codex_thread_request.nim @@ -0,0 +1,126 @@ +## This file contains the base message request type that will be handled. +## The requests are created by the main thread and processed by +## the Codex Thread. + +import std/json +import results +import chronos +import ../ffi_types +import ./requests/node_lifecycle_request +import ./requests/node_info_request +import ./requests/node_debug_request +import ./requests/node_p2p_request +import ./requests/node_upload_request +import ./requests/node_download_request +import ./requests/node_storage_request + +from ../../codex/codex import CodexServer + +type RequestType* {.pure.} = enum + LIFECYCLE + INFO + DEBUG + P2P + UPLOAD + DOWNLOAD + STORAGE + +type CodexThreadRequest* = object + reqType: RequestType + + # Request payloed + reqContent: pointer + + # Callback to notify the client thread of the result + callback: CodexCallback + + # Custom state attached by the client to the request, + # returned when its callback is invoked. + userData: pointer + +proc createShared*( + T: type CodexThreadRequest, + reqType: RequestType, + reqContent: pointer, + callback: CodexCallback, + userData: pointer, +): ptr type T = + var ret = createShared(T) + ret[].reqType = reqType + ret[].reqContent = reqContent + ret[].callback = callback + ret[].userData = userData + return ret + +# NOTE: User callbacks are executed on the working thread. +# They must be fast and non-blocking; otherwise this thread will be blocked +# and no further requests can be processed. +# We can improve this by dispatching the callbacks to a thread pool or +# moving to a MP channel. +# See: https://github.com/codex-storage/nim-codex/pull/1322#discussion_r2340708316 +proc handleRes[T: string | void | seq[byte]]( + res: Result[T, string], request: ptr CodexThreadRequest +) = + ## Handles the Result responses, which can either be Result[string, string] or + ## Result[void, string]. + defer: + deallocShared(request) + + if res.isErr(): + foreignThreadGc: + let msg = $res.error + if msg == "": + request[].callback(RET_ERR, nil, cast[csize_t](0), request[].userData) + else: + request[].callback( + RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), request[].userData + ) + return + + foreignThreadGc: + var msg: cstring = "" + when T is string: + msg = res.get().cstring() + request[].callback( + RET_OK, unsafeAddr msg[0], cast[csize_t](len(msg)), request[].userData + ) + return + +proc process*( + T: type CodexThreadRequest, request: ptr CodexThreadRequest, codex: ptr CodexServer +) {.async: (raises: []).} = + ## Processes the request in the Codex thread. + ## Dispatch to the appropriate request handler based on reqType. + let retFut = + case request[].reqType + of LIFECYCLE: + cast[ptr NodeLifecycleRequest](request[].reqContent).process(codex) + of INFO: + cast[ptr NodeInfoRequest](request[].reqContent).process(codex) + of RequestType.DEBUG: + cast[ptr NodeDebugRequest](request[].reqContent).process(codex) + of P2P: + cast[ptr NodeP2PRequest](request[].reqContent).process(codex) + of STORAGE: + cast[ptr NodeStorageRequest](request[].reqContent).process(codex) + of DOWNLOAD: + let onChunk = proc(bytes: seq[byte]) = + if bytes.len > 0: + request[].callback( + RET_PROGRESS, + cast[ptr cchar](unsafeAddr bytes[0]), + cast[csize_t](bytes.len), + request[].userData, + ) + + cast[ptr NodeDownloadRequest](request[].reqContent).process(codex, onChunk) + of UPLOAD: + let onBlockReceived = proc(bytes: int) = + request[].callback(RET_PROGRESS, nil, cast[csize_t](bytes), request[].userData) + + cast[ptr NodeUploadRequest](request[].reqContent).process(codex, onBlockReceived) + + handleRes(await retFut, request) + +proc `$`*(self: CodexThreadRequest): string = + return $self.reqType diff --git a/library/codex_thread_requests/requests/node_debug_request.nim b/library/codex_thread_requests/requests/node_debug_request.nim new file mode 100644 index 00000000..d051ea16 --- /dev/null +++ b/library/codex_thread_requests/requests/node_debug_request.nim @@ -0,0 +1,126 @@ +{.push raises: [].} + +## This file contains the debug info available with Codex. +## The DEBUG type will return info about the P2P node. +## The PEER type is available only with codex_enable_api_debug_peers flag. +## It will return info about a specific peer if available. + +import std/[options] +import chronos +import chronicles +import codexdht/discv5/spr +import ../../alloc +import ../../../codex/conf +import ../../../codex/rest/json +import ../../../codex/node + +from ../../../codex/codex import CodexServer, node + +logScope: + topics = "codexlib codexlibdebug" + +type NodeDebugMsgType* = enum + DEBUG + PEER + LOG_LEVEL + +type NodeDebugRequest* = object + operation: NodeDebugMsgType + peerId: cstring + logLevel: cstring + +proc createShared*( + T: type NodeDebugRequest, + op: NodeDebugMsgType, + peerId: cstring = "", + logLevel: cstring = "", +): ptr type T = + var ret = createShared(T) + ret[].operation = op + ret[].peerId = peerId.alloc() + ret[].logLevel = logLevel.alloc() + return ret + +proc destroyShared(self: ptr NodeDebugRequest) = + deallocShared(self[].peerId) + deallocShared(self[].logLevel) + deallocShared(self) + +proc getDebug( + codex: ptr CodexServer +): Future[Result[string, string]] {.async: (raises: []).} = + let node = codex[].node + let table = RestRoutingTable.init(node.discovery.protocol.routingTable) + + let json = + %*{ + "id": $node.switch.peerInfo.peerId, + "addrs": node.switch.peerInfo.addrs.mapIt($it), + "spr": + if node.discovery.dhtRecord.isSome: node.discovery.dhtRecord.get.toURI else: "", + "announceAddresses": node.discovery.announceAddrs, + "table": table, + } + + return ok($json) + +proc getPeer( + codex: ptr CodexServer, peerId: cstring +): Future[Result[string, string]] {.async: (raises: []).} = + when codex_enable_api_debug_peers: + let node = codex[].node + let res = PeerId.init($peerId) + if res.isErr: + return err("Failed to get peer: invalid peer ID " & $peerId & ": " & $res.error()) + + let id = res.get() + + try: + let peerRecord = await node.findPeer(id) + if peerRecord.isNone: + return err("Failed to get peer: peer not found") + + return ok($ %RestPeerRecord.init(peerRecord.get())) + except CancelledError: + return err("Failed to get peer: operation cancelled") + except CatchableError as e: + return err("Failed to get peer: " & e.msg) + else: + return err("Failed to get peer: peer debug API is disabled") + +proc updateLogLevel( + codex: ptr CodexServer, logLevel: cstring +): Future[Result[string, string]] {.async: (raises: []).} = + try: + {.gcsafe.}: + updateLogLevel($logLevel) + except ValueError as err: + return err("Failed to update log level: invalid value for log level: " & err.msg) + + return ok("") + +proc process*( + self: ptr NodeDebugRequest, codex: ptr CodexServer +): Future[Result[string, string]] {.async: (raises: []).} = + defer: + destroyShared(self) + + case self.operation + of NodeDebugMsgType.DEBUG: + let res = (await getDebug(codex)) + if res.isErr: + error "Failed to get DEBUG.", error = res.error + return err($res.error) + return res + of NodeDebugMsgType.PEER: + let res = (await getPeer(codex, self.peerId)) + if res.isErr: + error "Failed to get PEER.", error = res.error + return err($res.error) + return res + of NodeDebugMsgType.LOG_LEVEL: + let res = (await updateLogLevel(codex, self.logLevel)) + if res.isErr: + error "Failed to update LOG_LEVEL.", error = res.error + return err($res.error) + return res diff --git a/library/codex_thread_requests/requests/node_download_request.nim b/library/codex_thread_requests/requests/node_download_request.nim new file mode 100644 index 00000000..394b667e --- /dev/null +++ b/library/codex_thread_requests/requests/node_download_request.nim @@ -0,0 +1,336 @@ +{.push raises: [].} + +## This file contains the download request. +## A session is created for each download identified by the CID, +## allowing to resume, pause and cancel the download (using chunks). +## +## There are two ways to download a file: +## 1. Via chunks: the cid parameter is the CID of the file to download. Steps are: +## - INIT: initializes the download session +## - CHUNK: downloads the next chunk of the file +## - CANCEL: cancels the download session +## 2. Via stream. +## - INIT: initializes the download session +## - STREAM: downloads the file in a streaming manner, calling +## the onChunk handler for each chunk and / or writing to a file if filepath is set. +## - CANCEL: cancels the download session + +import std/[options, streams] +import chronos +import chronicles +import libp2p/stream/[lpstream] +import serde/json as serde +import ../../alloc +import ../../../codex/units +import ../../../codex/codextypes + +from ../../../codex/codex import CodexServer, node +from ../../../codex/node import retrieve, fetchManifest +from ../../../codex/rest/json import `%`, RestContent +from libp2p import Cid, init, `$` + +logScope: + topics = "codexlib codexlibdownload" + +type NodeDownloadMsgType* = enum + INIT + CHUNK + STREAM + CANCEL + MANIFEST + +type OnChunkHandler = proc(bytes: seq[byte]): void {.gcsafe, raises: [].} + +type NodeDownloadRequest* = object + operation: NodeDownloadMsgType + cid: cstring + chunkSize: csize_t + local: bool + filepath: cstring + +type + DownloadSessionId* = string + DownloadSessionCount* = int + DownloadSession* = object + stream: LPStream + chunkSize: int + +var downloadSessions {.threadvar.}: Table[DownloadSessionId, DownloadSession] + +proc createShared*( + T: type NodeDownloadRequest, + op: NodeDownloadMsgType, + cid: cstring = "", + chunkSize: csize_t = 0, + local: bool = false, + filepath: cstring = "", +): ptr type T = + var ret = createShared(T) + ret[].operation = op + ret[].cid = cid.alloc() + ret[].chunkSize = chunkSize + ret[].local = local + ret[].filepath = filepath.alloc() + + return ret + +proc destroyShared(self: ptr NodeDownloadRequest) = + deallocShared(self[].cid) + deallocShared(self[].filepath) + deallocShared(self) + +proc init( + codex: ptr CodexServer, cCid: cstring = "", chunkSize: csize_t = 0, local: bool +): Future[Result[string, string]] {.async: (raises: []).} = + ## Init a new session to download the file identified by cid. + ## + ## If the session already exists, do nothing and return ok. + ## Meaning that a cid can only have one active download session. + ## If the chunkSize is 0, the default block size will be used. + ## If local is true, the file will be retrived from the local store. + + let cid = Cid.init($cCid) + if cid.isErr: + return err("Failed to download locally: cannot parse cid: " & $cCid) + + if downloadSessions.contains($cid): + return ok("Download session already exists.") + + let node = codex[].node + var stream: LPStream + + try: + let res = await node.retrieve(cid.get(), local) + if res.isErr(): + return err("Failed to init the download: " & res.error.msg) + stream = res.get() + except CancelledError: + downloadSessions.del($cid) + return err("Failed to init the download: download cancelled.") + + let blockSize = if chunkSize.int > 0: chunkSize.int else: DefaultBlockSize.int + downloadSessions[$cid] = DownloadSession(stream: stream, chunkSize: blockSize) + + return ok("") + +proc chunk( + codex: ptr CodexServer, cCid: cstring = "", onChunk: OnChunkHandler +): Future[Result[string, string]] {.async: (raises: []).} = + ## Download the next chunk of the file identified by cid. + ## The chunk is passed to the onChunk handler. + ## + ## If the stream is at EOF, return ok with empty string. + ## + ## If an error is raised while reading the stream, the session is deleted + ## and an error is returned. + + let cid = Cid.init($cCid) + if cid.isErr: + return err("Failed to download locally: cannot parse cid: " & $cCid) + + if not downloadSessions.contains($cid): + return err("Failed to download chunk: no session for cid " & $cid) + + var session: DownloadSession + try: + session = downloadSessions[$cid] + except KeyError: + return err("Failed to download chunk: no session for cid " & $cid) + + let stream = session.stream + if stream.atEof: + return ok("") + + let chunkSize = session.chunkSize + var buf = newSeq[byte](chunkSize) + + try: + let read = await stream.readOnce(addr buf[0], buf.len) + buf.setLen(read) + except LPStreamError as e: + await stream.close() + downloadSessions.del($cid) + return err("Failed to download chunk: " & $e.msg) + except CancelledError: + await stream.close() + downloadSessions.del($cid) + return err("Failed to download chunk: download cancelled.") + + if buf.len <= 0: + return err("Failed to download chunk: no data") + + onChunk(buf) + + return ok("") + +proc streamData( + codex: ptr CodexServer, + stream: LPStream, + onChunk: OnChunkHandler, + chunkSize: csize_t, + filepath: cstring, +): Future[Result[string, string]] {. + async: (raises: [CancelledError, LPStreamError, IOError]) +.} = + let blockSize = if chunkSize.int > 0: chunkSize.int else: DefaultBlockSize.int + var buf = newSeq[byte](blockSize) + var read = 0 + var outputStream: OutputStreamHandle + var filedest: string = $filepath + + try: + if filepath != "": + outputStream = filedest.fileOutput() + + while not stream.atEof: + ## Yield immediately to the event loop + ## It gives a chance to cancel request to be processed + await sleepAsync(0) + + let read = await stream.readOnce(addr buf[0], buf.len) + buf.setLen(read) + + if buf.len <= 0: + break + + onChunk(buf) + + if outputStream != nil: + outputStream.write(buf) + + if outputStream != nil: + outputStream.close() + finally: + if outputStream != nil: + outputStream.close() + + return ok("") + +proc stream( + codex: ptr CodexServer, + cCid: cstring, + chunkSize: csize_t, + local: bool, + filepath: cstring, + onChunk: OnChunkHandler, +): Future[Result[string, string]] {.raises: [], async: (raises: []).} = + ## Stream the file identified by cid, calling the onChunk handler for each chunk + ## and / or writing to a file if filepath is set. + ## + ## If local is true, the file will be retrieved from the local store. + + let cid = Cid.init($cCid) + if cid.isErr: + return err("Failed to stream: cannot parse cid: " & $cCid) + + if not downloadSessions.contains($cid): + return err("Failed to stream: no session for cid " & $cid) + + var session: DownloadSession + try: + session = downloadSessions[$cid] + except KeyError: + return err("Failed to stream: no session for cid " & $cid) + + let node = codex[].node + + try: + let res = + await noCancel codex.streamData(session.stream, onChunk, chunkSize, filepath) + if res.isErr: + return err($res.error) + except LPStreamError as e: + return err("Failed to stream file: " & $e.msg) + except IOError as e: + return err("Failed to stream file: " & $e.msg) + finally: + if session.stream != nil: + await session.stream.close() + downloadSessions.del($cid) + + return ok("") + +proc cancel( + codex: ptr CodexServer, cCid: cstring +): Future[Result[string, string]] {.raises: [], async: (raises: []).} = + ## Cancel the download session identified by cid. + ## This operation is not supported when using the stream mode, + ## because the worker will be busy downloading the file. + + let cid = Cid.init($cCid) + if cid.isErr: + return err("Failed to cancel : cannot parse cid: " & $cCid) + + if not downloadSessions.contains($cid): + # The session is already cancelled + return ok("") + + var session: DownloadSession + try: + session = downloadSessions[$cid] + except KeyError: + # The session is already cancelled + return ok("") + + let stream = session.stream + await stream.close() + downloadSessions.del($cCid) + + return ok("") + +proc manifest( + codex: ptr CodexServer, cCid: cstring +): Future[Result[string, string]] {.raises: [], async: (raises: []).} = + let cid = Cid.init($cCid) + if cid.isErr: + return err("Failed to fetch manifest: cannot parse cid: " & $cCid) + + try: + let node = codex[].node + let manifest = await node.fetchManifest(cid.get()) + if manifest.isErr: + return err("Failed to fetch manifest: " & manifest.error.msg) + + return ok(serde.toJson(manifest.get())) + except CancelledError: + return err("Failed to fetch manifest: download cancelled.") + +proc process*( + self: ptr NodeDownloadRequest, codex: ptr CodexServer, onChunk: OnChunkHandler +): Future[Result[string, string]] {.async: (raises: []).} = + defer: + destroyShared(self) + + case self.operation + of NodeDownloadMsgType.INIT: + let res = (await init(codex, self.cid, self.chunkSize, self.local)) + if res.isErr: + error "Failed to INIT.", error = res.error + return err($res.error) + return res + of NodeDownloadMsgType.CHUNK: + let res = (await chunk(codex, self.cid, onChunk)) + if res.isErr: + error "Failed to CHUNK.", error = res.error + return err($res.error) + return res + of NodeDownloadMsgType.STREAM: + let res = ( + await stream(codex, self.cid, self.chunkSize, self.local, self.filepath, onChunk) + ) + if res.isErr: + error "Failed to STREAM.", error = res.error + return err($res.error) + return res + of NodeDownloadMsgType.CANCEL: + let res = (await cancel(codex, self.cid)) + if res.isErr: + error "Failed to CANCEL.", error = res.error + return err($res.error) + return res + of NodeDownloadMsgType.MANIFEST: + let res = (await manifest(codex, self.cid)) + if res.isErr: + error "Failed to MANIFEST.", error = res.error + return err($res.error) + return res diff --git a/library/codex_thread_requests/requests/node_info_request.nim b/library/codex_thread_requests/requests/node_info_request.nim new file mode 100644 index 00000000..2e397fcf --- /dev/null +++ b/library/codex_thread_requests/requests/node_info_request.nim @@ -0,0 +1,76 @@ +## This file contains the lifecycle request type that will be handled. + +import std/[options] +import chronos +import chronicles +import confutils +import codexdht/discv5/spr +import ../../../codex/conf +import ../../../codex/rest/json +import ../../../codex/node + +from ../../../codex/codex import CodexServer, config, node + +logScope: + topics = "codexlib codexlibinfo" + +type NodeInfoMsgType* = enum + REPO + SPR + PEERID + +type NodeInfoRequest* = object + operation: NodeInfoMsgType + +proc createShared*(T: type NodeInfoRequest, op: NodeInfoMsgType): ptr type T = + var ret = createShared(T) + ret[].operation = op + return ret + +proc destroyShared(self: ptr NodeInfoRequest) = + deallocShared(self) + +proc getRepo( + codex: ptr CodexServer +): Future[Result[string, string]] {.async: (raises: []).} = + return ok($(codex[].config.dataDir)) + +proc getSpr( + codex: ptr CodexServer +): Future[Result[string, string]] {.async: (raises: []).} = + let spr = codex[].node.discovery.dhtRecord + if spr.isNone: + return err("Failed to get SPR: no SPR record found.") + + return ok(spr.get.toURI) + +proc getPeerId( + codex: ptr CodexServer +): Future[Result[string, string]] {.async: (raises: []).} = + return ok($codex[].node.switch.peerInfo.peerId) + +proc process*( + self: ptr NodeInfoRequest, codex: ptr CodexServer +): Future[Result[string, string]] {.async: (raises: []).} = + defer: + destroyShared(self) + + case self.operation + of REPO: + let res = (await getRepo(codex)) + if res.isErr: + error "Failed to get REPO.", error = res.error + return err($res.error) + return res + of SPR: + let res = (await getSpr(codex)) + if res.isErr: + error "Failed to get SPR.", error = res.error + return err($res.error) + return res + of PEERID: + let res = (await getPeerId(codex)) + if res.isErr: + error "Failed to get PEERID.", error = res.error + return err($res.error) + return res diff --git a/library/codex_thread_requests/requests/node_lifecycle_request.nim b/library/codex_thread_requests/requests/node_lifecycle_request.nim new file mode 100644 index 00000000..532facf3 --- /dev/null +++ b/library/codex_thread_requests/requests/node_lifecycle_request.nim @@ -0,0 +1,188 @@ +## This file contains the lifecycle request type that will be handled. +## CREATE_NODE: create a new Codex node with the provided config.json. +## START_NODE: start the provided Codex node. +## STOP_NODE: stop the provided Codex node. + +import std/[options, json, strutils, net, os] +import codexdht/discv5/spr +import stew/shims/parseutils +import contractabi/address +import chronos +import chronicles +import results +import confutils +import confutils/std/net +import confutils/defs +import libp2p +import json_serialization +import json_serialization/std/[options, net] +import ../../alloc +import ../../../codex/conf +import ../../../codex/utils +import ../../../codex/utils/[keyutils, fileutils] +import ../../../codex/units + +from ../../../codex/codex import CodexServer, new, start, stop, close + +logScope: + topics = "codexlib codexliblifecycle" + +type NodeLifecycleMsgType* = enum + CREATE_NODE + START_NODE + STOP_NODE + CLOSE_NODE + +proc readValue*[T: InputFile | InputDir | OutPath | OutDir | OutFile]( + r: var JsonReader, val: var T +) = + val = T(r.readValue(string)) + +proc readValue*(r: var JsonReader, val: var MultiAddress) = + val = MultiAddress.init(r.readValue(string)).get() + +proc readValue*(r: var JsonReader, val: var NatConfig) = + let res = NatConfig.parse(r.readValue(string)) + if res.isErr: + raise + newException(SerializationError, "Cannot parse the NAT config: " & res.error()) + val = res.get() + +proc readValue*(r: var JsonReader, val: var SignedPeerRecord) = + let res = SignedPeerRecord.parse(r.readValue(string)) + if res.isErr: + raise + newException(SerializationError, "Cannot parse the signed peer: " & res.error()) + val = res.get() + +proc readValue*(r: var JsonReader, val: var ThreadCount) = + val = ThreadCount(r.readValue(int)) + +proc readValue*(r: var JsonReader, val: var NBytes) = + val = NBytes(r.readValue(int)) + +proc readValue*(r: var JsonReader, val: var Duration) = + var dur: Duration + let input = r.readValue(string) + let count = parseDuration(input, dur) + if count == 0: + raise newException(SerializationError, "Cannot parse the duration: " & input) + val = dur + +proc readValue*(r: var JsonReader, val: var EthAddress) = + val = EthAddress.init(r.readValue(string)).get() + +type NodeLifecycleRequest* = object + operation: NodeLifecycleMsgType + configJson: cstring + +proc createShared*( + T: type NodeLifecycleRequest, op: NodeLifecycleMsgType, configJson: cstring = "" +): ptr type T = + var ret = createShared(T) + ret[].operation = op + ret[].configJson = configJson.alloc() + return ret + +proc destroyShared(self: ptr NodeLifecycleRequest) = + deallocShared(self[].configJson) + deallocShared(self) + +proc createCodex( + configJson: cstring +): Future[Result[CodexServer, string]] {.async: (raises: []).} = + var conf: CodexConf + + try: + conf = CodexConf.load( + version = codexFullVersion, + envVarsPrefix = "codex", + cmdLine = @[], + secondarySources = proc( + config: CodexConf, sources: auto + ) {.gcsafe, raises: [ConfigurationError].} = + if configJson.len > 0: + sources.addConfigFileContent(Json, $(configJson)) + , + ) + except ConfigurationError as e: + return err("Failed to create codex: unable to load configuration: " & e.msg) + + conf.setupLogging() + + try: + {.gcsafe.}: + updateLogLevel(conf.logLevel) + except ValueError as err: + return err("Failed to create codex: invalid value for log level: " & err.msg) + + conf.setupMetrics() + + if not (checkAndCreateDataDir((conf.dataDir).string)): + # We are unable to access/create data folder or data folder's + # permissions are insecure. + return err( + "Failed to create codex: unable to access/create data folder or data folder's permissions are insecure." + ) + + if not (checkAndCreateDataDir((conf.dataDir / "repo"))): + # We are unable to access/create data folder or data folder's + # permissions are insecure. + return err( + "Failed to create codex: unable to access/create data folder or data folder's permissions are insecure." + ) + + let keyPath = + if isAbsolute(conf.netPrivKeyFile): + conf.netPrivKeyFile + else: + conf.dataDir / conf.netPrivKeyFile + let privateKey = setupKey(keyPath) + if privateKey.isErr: + return err("Failed to create codex: unable to get the private key.") + let pk = privateKey.get() + + conf.apiBindAddress = string.none + + let server = + try: + CodexServer.new(conf, pk) + except Exception as exc: + return err("Failed to create codex: " & exc.msg) + + return ok(server) + +proc process*( + self: ptr NodeLifecycleRequest, codex: ptr CodexServer +): Future[Result[string, string]] {.async: (raises: []).} = + defer: + destroyShared(self) + + case self.operation + of CREATE_NODE: + codex[] = ( + await createCodex( + self.configJson # , self.appCallbacks + ) + ).valueOr: + error "Failed to CREATE_NODE.", error = error + return err($error) + of START_NODE: + try: + await codex[].start() + except Exception as e: + error "Failed to START_NODE.", error = e.msg + return err(e.msg) + of STOP_NODE: + try: + await codex[].stop() + except Exception as e: + error "Failed to STOP_NODE.", error = e.msg + return err(e.msg) + of CLOSE_NODE: + try: + await codex[].close() + except Exception as e: + error "Failed to STOP_NODE.", error = e.msg + return err(e.msg) + return ok("") diff --git a/library/codex_thread_requests/requests/node_p2p_request.nim b/library/codex_thread_requests/requests/node_p2p_request.nim new file mode 100644 index 00000000..3bdbbf97 --- /dev/null +++ b/library/codex_thread_requests/requests/node_p2p_request.nim @@ -0,0 +1,95 @@ +{.push raises: [].} + +## This file contains the P2p request type that will be handled. +## CONNECT: connect to a peer with the provided peer ID and optional addresses. + +import std/[options] +import chronos +import chronicles +import libp2p +import ../../alloc +import ../../../codex/node + +from ../../../codex/codex import CodexServer, node + +logScope: + topics = "codexlib codexlibp2p" + +type NodeP2PMsgType* = enum + CONNECT + +type NodeP2PRequest* = object + operation: NodeP2PMsgType + peerId: cstring + peerAddresses: seq[cstring] + +proc createShared*( + T: type NodeP2PRequest, + op: NodeP2PMsgType, + peerId: cstring = "", + peerAddresses: seq[cstring] = @[], +): ptr type T = + var ret = createShared(T) + ret[].operation = op + ret[].peerId = peerId.alloc() + ret[].peerAddresses = peerAddresses + return ret + +proc destroyShared(self: ptr NodeP2PRequest) = + deallocShared(self[].peerId) + deallocShared(self) + +proc connect( + codex: ptr CodexServer, peerId: cstring, peerAddresses: seq[cstring] = @[] +): Future[Result[string, string]] {.async: (raises: []).} = + let node = codex[].node + let res = PeerId.init($peerId) + if res.isErr: + return err("Failed to connect to peer: invalid peer ID: " & $res.error()) + + let id = res.get() + + let addresses = + if peerAddresses.len > 0: + var addrs: seq[MultiAddress] + for addrStr in peerAddresses: + let res = MultiAddress.init($addrStr) + if res.isOk: + addrs.add(res[]) + else: + return err("Failed to connect to peer: invalid address: " & $addrStr) + addrs + else: + try: + let peerRecord = await node.findPeer(id) + if peerRecord.isNone: + return err("Failed to connect to peer: peer not found.") + + peerRecord.get().addresses.mapIt(it.address) + except CancelledError: + return err("Failed to connect to peer: operation cancelled.") + except CatchableError as e: + return err("Failed to connect to peer: " & $e.msg) + + try: + await node.connect(id, addresses) + except CancelledError: + return err("Failed to connect to peer: operation cancelled.") + except CatchableError as e: + return err("Failed to connect to peer: " & $e.msg) + + return ok("") + +proc process*( + self: ptr NodeP2PRequest, codex: ptr CodexServer +): Future[Result[string, string]] {.async: (raises: []).} = + defer: + destroyShared(self) + + case self.operation + of NodeP2PMsgType.CONNECT: + let res = (await connect(codex, self.peerId, self.peerAddresses)) + if res.isErr: + error "Failed to CONNECT.", error = res.error + return err($res.error) + return res diff --git a/library/codex_thread_requests/requests/node_storage_request.nim b/library/codex_thread_requests/requests/node_storage_request.nim new file mode 100644 index 00000000..21eceb17 --- /dev/null +++ b/library/codex_thread_requests/requests/node_storage_request.nim @@ -0,0 +1,180 @@ +{.push raises: [].} + +## This file contains the node storage request. +## 4 operations are available: +## - LIST: list all manifests stored in the node. +## - DELETE: Deletes either a single block or an entire dataset from the local node. +## - FETCH: download a file from the network to the local node. +## - SPACE: get the amount of space used by the local node. +## - EXISTS: check the existence of a cid in a node (local store). + +import std/[options] +import chronos +import chronicles +import libp2p/stream/[lpstream] +import serde/json as serde +import ../../alloc +import ../../../codex/units +import ../../../codex/manifest +import ../../../codex/stores/repostore + +from ../../../codex/codex import CodexServer, node, repoStore +from ../../../codex/node import + iterateManifests, fetchManifest, fetchDatasetAsyncTask, delete, hasLocalBlock +from libp2p import Cid, init, `$` + +logScope: + topics = "codexlib codexlibstorage" + +type NodeStorageMsgType* = enum + LIST + DELETE + FETCH + SPACE + EXISTS + +type NodeStorageRequest* = object + operation: NodeStorageMsgType + cid: cstring + +type StorageSpace = object + totalBlocks* {.serialize.}: Natural + quotaMaxBytes* {.serialize.}: NBytes + quotaUsedBytes* {.serialize.}: NBytes + quotaReservedBytes* {.serialize.}: NBytes + +proc createShared*( + T: type NodeStorageRequest, op: NodeStorageMsgType, cid: cstring = "" +): ptr type T = + var ret = createShared(T) + ret[].operation = op + ret[].cid = cid.alloc() + + return ret + +proc destroyShared(self: ptr NodeStorageRequest) = + deallocShared(self[].cid) + deallocShared(self) + +type ManifestWithCid = object + cid {.serialize.}: string + manifest {.serialize.}: Manifest + +proc list( + codex: ptr CodexServer +): Future[Result[string, string]] {.async: (raises: []).} = + var manifests = newSeq[ManifestWithCid]() + proc onManifest(cid: Cid, manifest: Manifest) {.raises: [], gcsafe.} = + manifests.add(ManifestWithCid(cid: $cid, manifest: manifest)) + + try: + let node = codex[].node + await node.iterateManifests(onManifest) + except CancelledError: + return err("Failed to list manifests: cancelled operation.") + except CatchableError as err: + return err("Failed to list manifest: : " & err.msg) + + return ok(serde.toJson(manifests)) + +proc delete( + codex: ptr CodexServer, cCid: cstring +): Future[Result[string, string]] {.async: (raises: []).} = + let cid = Cid.init($cCid) + if cid.isErr: + return err("Failed to delete the data: cannot parse cid: " & $cCid) + + let node = codex[].node + try: + let res = await node.delete(cid.get()) + if res.isErr: + return err("Failed to delete the data: " & res.error.msg) + except CancelledError: + return err("Failed to delete the data: cancelled operation.") + except CatchableError as err: + return err("Failed to delete the data: " & err.msg) + + return ok("") + +proc fetch( + codex: ptr CodexServer, cCid: cstring +): Future[Result[string, string]] {.async: (raises: []).} = + let cid = Cid.init($cCid) + if cid.isErr: + return err("Failed to fetch the data: cannot parse cid: " & $cCid) + + try: + let node = codex[].node + let manifest = await node.fetchManifest(cid.get()) + if manifest.isErr: + return err("Failed to fetch the data: " & manifest.error.msg) + + node.fetchDatasetAsyncTask(manifest.get()) + + return ok(serde.toJson(manifest.get())) + except CancelledError: + return err("Failed to fetch the data: download cancelled.") + +proc space( + codex: ptr CodexServer +): Future[Result[string, string]] {.async: (raises: []).} = + let repoStore = codex[].repoStore + let space = StorageSpace( + totalBlocks: repoStore.totalBlocks, + quotaMaxBytes: repoStore.quotaMaxBytes, + quotaUsedBytes: repoStore.quotaUsedBytes, + quotaReservedBytes: repoStore.quotaReservedBytes, + ) + return ok(serde.toJson(space)) + +proc exists( + codex: ptr CodexServer, cCid: cstring +): Future[Result[string, string]] {.async: (raises: []).} = + let cid = Cid.init($cCid) + if cid.isErr: + return err("Failed to check the data existence: cannot parse cid: " & $cCid) + + try: + let node = codex[].node + let exists = await node.hasLocalBlock(cid.get()) + return ok($exists) + except CancelledError: + return err("Failed to check the data existence: operation cancelled.") + +proc process*( + self: ptr NodeStorageRequest, codex: ptr CodexServer +): Future[Result[string, string]] {.async: (raises: []).} = + defer: + destroyShared(self) + + case self.operation + of NodeStorageMsgType.LIST: + let res = (await list(codex)) + if res.isErr: + error "Failed to LIST.", error = res.error + return err($res.error) + return res + of NodeStorageMsgType.DELETE: + let res = (await delete(codex, self.cid)) + if res.isErr: + error "Failed to DELETE.", error = res.error + return err($res.error) + return res + of NodeStorageMsgType.FETCH: + let res = (await fetch(codex, self.cid)) + if res.isErr: + error "Failed to FETCH.", error = res.error + return err($res.error) + return res + of NodeStorageMsgType.SPACE: + let res = (await space(codex)) + if res.isErr: + error "Failed to SPACE.", error = res.error + return err($res.error) + return res + of NodeStorageMsgType.EXISTS: + let res = (await exists(codex, self.cid)) + if res.isErr: + error "Failed to EXISTS.", error = res.error + return err($res.error) + return res diff --git a/library/codex_thread_requests/requests/node_upload_request.nim b/library/codex_thread_requests/requests/node_upload_request.nim new file mode 100644 index 00000000..a850abb7 --- /dev/null +++ b/library/codex_thread_requests/requests/node_upload_request.nim @@ -0,0 +1,372 @@ +{.push raises: [].} + +## This file contains the upload request. +## A session is created for each upload allowing to resume, +## pause and cancel uploads (using chunks). +## +## There are two ways to upload a file: +## 1. Via chunks: the filepath parameter is the data filename. Steps are: +## - INIT: creates a new upload session and returns its ID. +## - CHUNK: sends a chunk of data to the upload session. +## - FINALIZE: finalizes the upload and returns the CID of the uploaded file. +## - CANCEL: cancels the upload session. +## +## 2. Directly from a file path: the filepath has to be absolute. +## - INIT: creates a new upload session and returns its ID +## - FILE: starts the upload and returns the CID of the uploaded file +## - CANCEL: cancels the upload session. + +import std/[options, os, mimetypes] +import chronos +import chronicles +import questionable +import questionable/results +import faststreams/inputs +import libp2p/stream/[bufferstream, lpstream] +import ../../alloc +import ../../../codex/units +import ../../../codex/codextypes + +from ../../../codex/codex import CodexServer, node +from ../../../codex/node import store +from libp2p import Cid, `$` + +logScope: + topics = "codexlib codexlibupload" + +type NodeUploadMsgType* = enum + INIT + CHUNK + FINALIZE + CANCEL + FILE + +type OnProgressHandler = proc(bytes: int): void {.gcsafe, raises: [].} + +type NodeUploadRequest* = object + operation: NodeUploadMsgType + sessionId: cstring + filepath: cstring + chunk: seq[byte] + chunkSize: csize_t + +type + UploadSessionId* = string + UploadSessionCount* = int + UploadSession* = object + stream: BufferStream + fut: Future[?!Cid] + filepath: string + chunkSize: int + onProgress: OnProgressHandler + +var uploadSessions {.threadvar.}: Table[UploadSessionId, UploadSession] +var nexUploadSessionCount {.threadvar.}: UploadSessionCount + +proc createShared*( + T: type NodeUploadRequest, + op: NodeUploadMsgType, + sessionId: cstring = "", + filepath: cstring = "", + chunk: seq[byte] = @[], + chunkSize: csize_t = 0, +): ptr type T = + var ret = createShared(T) + ret[].operation = op + ret[].sessionId = sessionId.alloc() + ret[].filepath = filepath.alloc() + ret[].chunk = chunk + ret[].chunkSize = chunkSize + + return ret + +proc destroyShared(self: ptr NodeUploadRequest) = + deallocShared(self[].filepath) + deallocShared(self[].sessionId) + deallocShared(self) + +proc init( + codex: ptr CodexServer, filepath: cstring = "", chunkSize: csize_t = 0 +): Future[Result[string, string]] {.async: (raises: []).} = + ## Init a new session upload and return its ID. + ## The session contains the future corresponding to the + ## `node.store` call. + ## The filepath can be: + ## - the filename when uploading via chunks + ## - the absolute path to a file when uploading directly. + ## The mimetype is deduced from the filename extension. + ## + ## The chunkSize matches by default the block size used to store the file. + ## + ## A callback `onBlockStore` is provided to `node.store` to + ## report the progress of the upload. This callback will check + ## that an `onProgress` handler is set in the session + ## and call it with the number of bytes stored each time a block + ## is stored. + + var filenameOpt, mimetypeOpt = string.none + + if isAbsolute($filepath): + if not fileExists($filepath): + return err( + "Failed to create an upload session, the filepath does not exist: " & $filepath + ) + + if filepath != "": + let (_, name, ext) = splitFile($filepath) + + filenameOpt = (name & ext).some + + if ext != "": + let extNoDot = + if ext.len > 0: + ext[1 ..^ 1] + else: + "" + let mime = newMimetypes() + let mimetypeStr = mime.getMimetype(extNoDot, "") + + mimetypeOpt = if mimetypeStr == "": string.none else: mimetypeStr.some + + let sessionId = $nexUploadSessionCount + nexUploadSessionCount.inc() + + let stream = BufferStream.new() + let lpStream = LPStream(stream) + let node = codex[].node + + let onBlockStored = proc(chunk: seq[byte]): void {.gcsafe, raises: [].} = + try: + if uploadSessions.contains($sessionId): + let session = uploadSessions[$sessionId] + if session.onProgress != nil: + session.onProgress(chunk.len) + except KeyError: + error "Failed to push progress update, session is not found: ", + sessionId = $sessionId + + let blockSize = + if chunkSize.NBytes > 0.NBytes: chunkSize.NBytes else: DefaultBlockSize + let fut = node.store(lpStream, filenameOpt, mimetypeOpt, blockSize, onBlockStored) + + uploadSessions[sessionId] = UploadSession( + stream: stream, fut: fut, filepath: $filepath, chunkSize: blockSize.int + ) + + return ok(sessionId) + +proc chunk( + codex: ptr CodexServer, sessionId: cstring, chunk: seq[byte] +): Future[Result[string, string]] {.async: (raises: []).} = + ## Upload a chunk of data to the session identified by sessionId. + ## The chunk is pushed to the BufferStream of the session. + ## If the chunk size is equal or greater than the session chunkSize, + ## the `onProgress` callback is temporarily set to receive the progress + ## from `onBlockStored` callback. This provide a way to report progress + ## precisely when a block is stored. + ## If the chunk size is smaller than the session chunkSize, + ## the `onProgress` callback is not set because the LPStream will + ## wait until enough data is received to form a block before storing it. + ## The wrapper may then report the progress because the data is in the stream + ## but not yet stored. + + if not uploadSessions.contains($sessionId): + return err("Failed to upload the chunk, the session is not found: " & $sessionId) + + var fut = newFuture[void]() + + try: + let session = uploadSessions[$sessionId] + + if chunk.len >= session.chunkSize: + uploadSessions[$sessionId].onProgress = proc( + bytes: int + ): void {.gcsafe, raises: [].} = + fut.complete() + await session.stream.pushData(chunk) + else: + fut = session.stream.pushData(chunk) + + await fut + + uploadSessions[$sessionId].onProgress = nil + except KeyError: + return err("Failed to upload the chunk, the session is not found: " & $sessionId) + except LPError as e: + return err("Failed to upload the chunk, stream error: " & $e.msg) + except CancelledError: + return err("Failed to upload the chunk, operation cancelled.") + except CatchableError as e: + return err("Failed to upload the chunk: " & $e.msg) + finally: + if not fut.finished(): + fut.cancelSoon() + + return ok("") + +proc finalize( + codex: ptr CodexServer, sessionId: cstring +): Future[Result[string, string]] {.async: (raises: []).} = + ## Finalize the upload session identified by sessionId. + ## This closes the BufferStream and waits for the `node.store` future + ## to complete. It returns the CID of the uploaded file. + + if not uploadSessions.contains($sessionId): + return + err("Failed to finalize the upload session, session not found: " & $sessionId) + + var session: UploadSession + try: + session = uploadSessions[$sessionId] + await session.stream.pushEof() + + let res = await session.fut + if res.isErr: + return err("Failed to finalize the upload session: " & res.error().msg) + + return ok($res.get()) + except KeyError: + return + err("Failed to finalize the upload session, invalid session ID: " & $sessionId) + except LPStreamError as e: + return err("Failed to finalize the upload session, stream error: " & $e.msg) + except CancelledError: + return err("Failed to finalize the upload session, operation cancelled") + except CatchableError as e: + return err("Failed to finalize the upload session: " & $e.msg) + finally: + if uploadSessions.contains($sessionId): + uploadSessions.del($sessionId) + + if session.fut != nil and not session.fut.finished(): + session.fut.cancelSoon() + +proc cancel( + codex: ptr CodexServer, sessionId: cstring +): Future[Result[string, string]] {.async: (raises: []).} = + ## Cancel the upload session identified by sessionId. + ## This cancels the `node.store` future and removes the session + ## from the table. + + if not uploadSessions.contains($sessionId): + # Session not found, nothing to cancel + return ok("") + + try: + let session = uploadSessions[$sessionId] + session.fut.cancelSoon() + except KeyError: + # Session not found, nothing to cancel + return ok("") + + uploadSessions.del($sessionId) + + return ok("") + +proc streamFile( + filepath: string, stream: BufferStream, chunkSize: int +): Future[Result[void, string]] {.async: (raises: [CancelledError]).} = + ## Streams a file from the given filepath using faststream. + ## fsMultiSync cannot be used with chronos because of this warning: + ## Warning: chronos backend uses nested calls to `waitFor` which + ## is not supported by chronos - it is not recommended to use it until + ## this has been resolved. + ## + ## Ideally when it is solved, we should use fsMultiSync or find a way to use async + ## file I/O with chronos, see https://github.com/status-im/nim-chronos/issues/501. + + try: + let inputStreamHandle = filepath.fileInput() + let inputStream = inputStreamHandle.implicitDeref + + var buf = newSeq[byte](chunkSize) + while inputStream.readable: + let read = inputStream.readIntoEx(buf) + if read == 0: + break + await stream.pushData(buf[0 ..< read]) + # let byt = inputStream.read + # await stream.pushData(@[byt]) + return ok() + except IOError, OSError, LPStreamError: + let e = getCurrentException() + return err("Failed to stream the file: " & $e.msg) + +proc file( + codex: ptr CodexServer, sessionId: cstring, onProgress: OnProgressHandler +): Future[Result[string, string]] {.async: (raises: []).} = + ## Starts the file upload for the session identified by sessionId. + ## Will call finalize when done and return the CID of the uploaded file. + ## + ## The onProgress callback is called with the number of bytes + ## to report the progress of the upload. + + if not uploadSessions.contains($sessionId): + return err("Failed to upload the file, invalid session ID: " & $sessionId) + + var session: UploadSession + + try: + uploadSessions[$sessionId].onProgress = onProgress + session = uploadSessions[$sessionId] + + let res = await streamFile(session.filepath, session.stream, session.chunkSize) + if res.isErr: + return err("Failed to upload the file: " & res.error) + + return await codex.finalize(sessionId) + except KeyError: + return err("Failed to upload the file, the session is not found: " & $sessionId) + except LPStreamError, IOError: + let e = getCurrentException() + return err("Failed to upload the file: " & $e.msg) + except CancelledError: + return err("Failed to upload the file, the operation is cancelled.") + except CatchableError as e: + return err("Failed to upload the file: " & $e.msg) + finally: + if uploadSessions.contains($sessionId): + uploadSessions.del($sessionId) + + if session.fut != nil and not session.fut.finished(): + session.fut.cancelSoon() + +proc process*( + self: ptr NodeUploadRequest, + codex: ptr CodexServer, + onUploadProgress: OnProgressHandler = nil, +): Future[Result[string, string]] {.async: (raises: []).} = + defer: + destroyShared(self) + + case self.operation + of NodeUploadMsgType.INIT: + let res = (await init(codex, self.filepath, self.chunkSize)) + if res.isErr: + error "Failed to INIT.", error = res.error + return err($res.error) + return res + of NodeUploadMsgType.CHUNK: + let res = (await chunk(codex, self.sessionId, self.chunk)) + if res.isErr: + error "Failed to CHUNK.", error = res.error + return err($res.error) + return res + of NodeUploadMsgType.FINALIZE: + let res = (await finalize(codex, self.sessionId)) + if res.isErr: + error "Failed to FINALIZE.", error = res.error + return err($res.error) + return res + of NodeUploadMsgType.CANCEL: + let res = (await cancel(codex, self.sessionId)) + if res.isErr: + error "Failed to CANCEL.", error = res.error + return err($res.error) + return res + of NodeUploadMsgType.FILE: + let res = (await file(codex, self.sessionId, onUploadProgress)) + if res.isErr: + error "Failed to FILE.", error = res.error + return err($res.error) + return res diff --git a/library/events/json_base_event.nim b/library/events/json_base_event.nim new file mode 100644 index 00000000..743444ed --- /dev/null +++ b/library/events/json_base_event.nim @@ -0,0 +1,14 @@ +# JSON Event definition +# +# This file defines de JsonEvent type, which serves as the base +# for all event types in the library +# +# Reference specification: +# https://github.com/vacp2p/rfc/blob/master/content/docs/rfcs/36/README.md#jsonsignal-type + +type JsonEvent* = ref object of RootObj + eventType* {.requiresInit.}: string + +method `$`*(jsonEvent: JsonEvent): string {.base.} = + discard + # All events should implement this diff --git a/library/ffi_types.nim b/library/ffi_types.nim new file mode 100644 index 00000000..1a865eaf --- /dev/null +++ b/library/ffi_types.nim @@ -0,0 +1,62 @@ +# FFI Types and Utilities +# +# This file defines the core types and utilities for the library's foreign +# function interface (FFI), enabling interoperability with external code. + +################################################################################ +### Exported types +import results + +type CodexCallback* = proc( + callerRet: cint, msg: ptr cchar, len: csize_t, userData: pointer +) {.cdecl, gcsafe, raises: [].} + +const RET_OK*: cint = 0 +const RET_ERR*: cint = 1 +const RET_MISSING_CALLBACK*: cint = 2 +const RET_PROGRESS*: cint = 3 + +## Returns RET_OK as acknowledgment and call the callback +## with RET_OK code and the provided message. +proc success*(callback: CodexCallback, msg: string, userData: pointer): cint = + callback(RET_OK, cast[ptr cchar](msg), cast[csize_t](len(msg)), userData) + + return RET_OK + +## Returns RET_ERR as acknowledgment and call the callback +## with RET_ERR code and the provided message. +proc error*(callback: CodexCallback, msg: string, userData: pointer): cint = + let msg = "libcodex error: " & msg + callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) + + return RET_ERR + +## Returns RET_OK as acknowledgment if the result is ok. +## If not, return RET_ERR and call the callback with the error message. +proc okOrError*[T]( + callback: CodexCallback, res: Result[T, string], userData: pointer +): cint = + if res.isOk: + return RET_OK + + return callback.error($res.error, userData) + +### End of exported types +################################################################################ + +################################################################################ +### FFI utils + +template foreignThreadGc*(body: untyped) = + when declared(setupForeignThreadGc): + setupForeignThreadGc() + + body + + when declared(tearDownForeignThreadGc): + tearDownForeignThreadGc() + +type onDone* = proc() + +### End of FFI utils +################################################################################ diff --git a/library/libcodex.h b/library/libcodex.h new file mode 100644 index 00000000..bade4c95 --- /dev/null +++ b/library/libcodex.h @@ -0,0 +1,206 @@ +/** +* libcodex.h - C Interface for Example Library +* +* This header provides the public API for libcodex +* +* To see the auto-generated header by Nim, run `make libcodex` from the +* repository root. The generated file will be created at: +* nimcache/release/libcodex/libcodex.h +*/ + +#ifndef __libcodex__ +#define __libcodex__ + +#include +#include + +// The possible returned values for the functions that return int +#define RET_OK 0 +#define RET_ERR 1 +#define RET_MISSING_CALLBACK 2 +#define RET_PROGRESS 3 + +#ifdef __cplusplus +extern "C" { +#endif + +typedef void (*CodexCallback) (int callerRet, const char* msg, size_t len, void* userData); + +void* codex_new( + const char* configJson, + CodexCallback callback, + void* userData); + +int codex_version( + void* ctx, + CodexCallback callback, + void* userData); + +int codex_revision( + void* ctx, + CodexCallback callback, + void* userData); + +int codex_repo( + void* ctx, + CodexCallback callback, + void* userData); + +int codex_debug( + void* ctx, + CodexCallback callback, + void* userData); + +int codex_spr( + void* ctx, + CodexCallback callback, + void* userData); + +int codex_peer_id( + void* ctx, + CodexCallback callback, + void* userData); + +int codex_log_level( + void* ctx, + const char* logLevel, + CodexCallback callback, + void* userData); + +int codex_connect( + void* ctx, + const char* peerId, + const char** peerAddresses, + size_t peerAddressesSize, + CodexCallback callback, + void* userData); + +int codex_peer_debug( + void* ctx, + const char* peerId, + CodexCallback callback, + void* userData); + + +int codex_upload_init( + void* ctx, + const char* filepath, + size_t chunkSize, + CodexCallback callback, + void* userData); + +int codex_upload_chunk( + void* ctx, + const char* sessionId, + const uint8_t* chunk, + size_t len, + CodexCallback callback, + void* userData); + +int codex_upload_finalize( + void* ctx, + const char* sessionId, + CodexCallback callback, + void* userData); + +int codex_upload_cancel( + void* ctx, + const char* sessionId, + CodexCallback callback, + void* userData); + +int codex_upload_file( + void* ctx, + const char* sessionId, + CodexCallback callback, + void* userData); + +int codex_download_stream( + void* ctx, + const char* cid, + size_t chunkSize, + bool local, + const char* filepath, + CodexCallback callback, + void* userData); + +int codex_download_init( + void* ctx, + const char* cid, + size_t chunkSize, + bool local, + CodexCallback callback, + void* userData); + +int codex_download_chunk( + void* ctx, + const char* cid, + CodexCallback callback, + void* userData); + +int codex_download_cancel( + void* ctx, + const char* cid, + CodexCallback callback, + void* userData); + +int codex_download_manifest( + void* ctx, + const char* cid, + CodexCallback callback, + void* userData); + +int codex_storage_list( + void* ctx, + CodexCallback callback, + void* userData); + +int codex_storage_space( + void* ctx, + CodexCallback callback, + void* userData); + +int codex_storage_delete( + void* ctx, + const char* cid, + CodexCallback callback, + void* userData); + +int codex_storage_fetch( + void* ctx, + const char* cid, + CodexCallback callback, + void* userData); + +int codex_storage_exists( + void* ctx, + const char* cid, + CodexCallback callback, + void* userData); + +int codex_start(void* ctx, + CodexCallback callback, + void* userData); + +int codex_stop(void* ctx, + CodexCallback callback, + void* userData); + +int codex_close(void* ctx, + CodexCallback callback, + void* userData); + +// Destroys an instance of a codex node created with codex_new +int codex_destroy(void* ctx, + CodexCallback callback, + void* userData); + +void codex_set_event_callback(void* ctx, + CodexCallback callback, + void* userData); + +#ifdef __cplusplus +} +#endif + +#endif /* __libcodex__ */ \ No newline at end of file diff --git a/library/libcodex.nim b/library/libcodex.nim new file mode 100644 index 00000000..d1c7e052 --- /dev/null +++ b/library/libcodex.nim @@ -0,0 +1,565 @@ +# libcodex.nim - C-exported interface for the Codex shared library +# +# This file implements the public C API for libcodex. +# It acts as the bridge between C programs and the internal Nim implementation. +# +# This file defines: +# - Initialization logic for the Nim runtime (once per process) +# - Thread-safe exported procs callable from C +# - Callback registration and invocation for asynchronous communication + +# cdecl is C declaration calling convention. +# It’s the standard way C compilers expect functions to behave: +# 1- Caller cleans up the stack after the call +# 2- Symbol names are exported in a predictable way +# In other termes, it is a glue that makes Nim functions callable as normal C functions. +{.pragma: exported, exportc, cdecl, raises: [].} +{.pragma: callback, cdecl, raises: [], gcsafe.} + +# Ensure code is position-independent so it can be built into a shared library (.so). +# In other terms, the code that can run no matter where it’s placed in memory. +{.passc: "-fPIC".} + +when defined(linux): + # Define the canonical name for this library + {.passl: "-Wl,-soname,libcodex.so".} + +import std/[atomics] +import chronicles +import chronos +import chronos/threadsync +import ./codex_context +import ./codex_thread_requests/codex_thread_request +import ./codex_thread_requests/requests/node_lifecycle_request +import ./codex_thread_requests/requests/node_info_request +import ./codex_thread_requests/requests/node_debug_request +import ./codex_thread_requests/requests/node_p2p_request +import ./codex_thread_requests/requests/node_upload_request +import ./codex_thread_requests/requests/node_download_request +import ./codex_thread_requests/requests/node_storage_request +import ./ffi_types + +from ../codex/conf import codexVersion + +logScope: + topics = "codexlib" + +template checkLibcodexParams*( + ctx: ptr CodexContext, callback: CodexCallback, userData: pointer +) = + if not isNil(ctx): + ctx[].userData = userData + + if isNil(callback): + return RET_MISSING_CALLBACK + +# From Nim doc: +# "the C targets require you to initialize Nim's internals, which is done calling a NimMain function." +# "The name NimMain can be influenced via the --nimMainPrefix:prefix switch." +# "Use --nimMainPrefix:MyLib and the function to call is named MyLibNimMain." +proc libcodexNimMain() {.importc.} + +# Atomic flag to prevent multiple initializations +var initialized: Atomic[bool] + +if defined(android): + # Redirect chronicles to Android System logs + when compiles(defaultChroniclesStream.outputs[0].writer): + defaultChroniclesStream.outputs[0].writer = proc( + logLevel: LogLevel, msg: LogOutputStr + ) {.raises: [].} = + echo logLevel, msg + +# Initializes the Nim runtime and foreign-thread GC +proc initializeLibrary() {.exported.} = + if not initialized.exchange(true): + ## Every Nim library must call `NimMain()` once + libcodexNimMain() + when declared(setupForeignThreadGc): + setupForeignThreadGc() + when declared(nimGC_setStackBottom): + var locals {.volatile, noinit.}: pointer + locals = addr(locals) + nimGC_setStackBottom(locals) + +proc codex_new( + configJson: cstring, callback: CodexCallback, userData: pointer +): pointer {.dynlib, exported.} = + initializeLibrary() + + if isNil(callback): + error "Failed to create codex instance: the callback is missing." + return nil + + var ctx = codex_context.createCodexContext().valueOr: + let msg = $error + callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) + return nil + + ctx.userData = userData + + let reqContent = + NodeLifecycleRequest.createShared(NodeLifecycleMsgType.CREATE_NODE, configJson) + + codex_context.sendRequestToCodexThread( + ctx, RequestType.LIFECYCLE, reqContent, callback, userData + ).isOkOr: + let msg = $error + callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) + return nil + + return ctx + +proc codex_version( + ctx: ptr CodexContext, callback: CodexCallback, userData: pointer +): cint {.dynlib, exportc.} = + initializeLibrary() + checkLibcodexParams(ctx, callback, userData) + + callback( + RET_OK, + cast[ptr cchar](conf.codexVersion), + cast[csize_t](len(conf.codexVersion)), + userData, + ) + + return RET_OK + +proc codex_revision( + ctx: ptr CodexContext, callback: CodexCallback, userData: pointer +): cint {.dynlib, exportc.} = + initializeLibrary() + checkLibcodexParams(ctx, callback, userData) + + callback( + RET_OK, + cast[ptr cchar](conf.codexRevision), + cast[csize_t](len(conf.codexRevision)), + userData, + ) + + return RET_OK + +proc codex_repo( + ctx: ptr CodexContext, callback: CodexCallback, userData: pointer +): cint {.dynlib, exportc.} = + initializeLibrary() + checkLibcodexParams(ctx, callback, userData) + + let reqContent = NodeInfoRequest.createShared(NodeInfoMsgType.REPO) + let res = codex_context.sendRequestToCodexThread( + ctx, RequestType.INFO, reqContent, callback, userData + ) + + return callback.okOrError(res, userData) + +proc codex_debug( + ctx: ptr CodexContext, callback: CodexCallback, userData: pointer +): cint {.dynlib, exportc.} = + initializeLibrary() + checkLibcodexParams(ctx, callback, userData) + + let reqContent = NodeDebugRequest.createShared(NodeDebugMsgType.DEBUG) + let res = codex_context.sendRequestToCodexThread( + ctx, RequestType.DEBUG, reqContent, callback, userData + ) + + return callback.okOrError(res, userData) + +proc codex_spr( + ctx: ptr CodexContext, callback: CodexCallback, userData: pointer +): cint {.dynlib, exportc.} = + initializeLibrary() + checkLibcodexParams(ctx, callback, userData) + + let reqContent = NodeInfoRequest.createShared(NodeInfoMsgType.SPR) + let res = codex_context.sendRequestToCodexThread( + ctx, RequestType.INFO, reqContent, callback, userData + ) + + return callback.okOrError(res, userData) + +proc codex_peer_id( + ctx: ptr CodexContext, callback: CodexCallback, userData: pointer +): cint {.dynlib, exportc.} = + initializeLibrary() + checkLibcodexParams(ctx, callback, userData) + + let reqContent = NodeInfoRequest.createShared(NodeInfoMsgType.PEERID) + let res = codex_context.sendRequestToCodexThread( + ctx, RequestType.INFO, reqContent, callback, userData + ) + + return callback.okOrError(res, userData) + +## Set the log level of the library at runtime. +## It uses updateLogLevel which is a synchronous proc and +## cannot be used inside an async context because of gcsafe issue. +proc codex_log_level( + ctx: ptr CodexContext, logLevel: cstring, callback: CodexCallback, userData: pointer +): cint {.dynlib, exportc.} = + initializeLibrary() + checkLibcodexParams(ctx, callback, userData) + + let reqContent = + NodeDebugRequest.createShared(NodeDebugMsgType.LOG_LEVEL, logLevel = logLevel) + let res = codex_context.sendRequestToCodexThread( + ctx, RequestType.DEBUG, reqContent, callback, userData + ) + + return callback.okOrError(res, userData) + +proc codex_connect( + ctx: ptr CodexContext, + peerId: cstring, + peerAddressesPtr: ptr cstring, + peerAddressesLength: csize_t, + callback: CodexCallback, + userData: pointer, +): cint {.dynlib, exportc.} = + initializeLibrary() + checkLibcodexParams(ctx, callback, userData) + + var peerAddresses = newSeq[cstring](peerAddressesLength) + let peers = cast[ptr UncheckedArray[cstring]](peerAddressesPtr) + for i in 0 ..< peerAddressesLength: + peerAddresses[i] = peers[i] + + let reqContent = NodeP2PRequest.createShared( + NodeP2PMsgType.CONNECT, peerId = peerId, peerAddresses = peerAddresses + ) + let res = codex_context.sendRequestToCodexThread( + ctx, RequestType.P2P, reqContent, callback, userData + ) + + return callback.okOrError(res, userData) + +proc codex_peer_debug( + ctx: ptr CodexContext, peerId: cstring, callback: CodexCallback, userData: pointer +): cint {.dynlib, exportc.} = + initializeLibrary() + checkLibcodexParams(ctx, callback, userData) + + let reqContent = NodeDebugRequest.createShared(NodeDebugMsgType.PEER, peerId = peerId) + let res = codex_context.sendRequestToCodexThread( + ctx, RequestType.DEBUG, reqContent, callback, userData + ) + + return callback.okOrError(res, userData) + +proc codex_close( + ctx: ptr CodexContext, callback: CodexCallback, userData: pointer +): cint {.dynlib, exportc.} = + initializeLibrary() + checkLibcodexParams(ctx, callback, userData) + + let reqContent = NodeLifecycleRequest.createShared(NodeLifecycleMsgType.CLOSE_NODE) + var res = codex_context.sendRequestToCodexThread( + ctx, RequestType.LIFECYCLE, reqContent, callback, userData + ) + if res.isErr: + return callback.error(res.error, userData) + + return callback.okOrError(res, userData) + +proc codex_destroy( + ctx: ptr CodexContext, callback: CodexCallback, userData: pointer +): cint {.dynlib, exportc.} = + initializeLibrary() + checkLibcodexParams(ctx, callback, userData) + + let res = codex_context.destroyCodexContext(ctx) + if res.isErr: + return RET_ERR + + return RET_OK + +proc codex_upload_init( + ctx: ptr CodexContext, + filepath: cstring, + chunkSize: csize_t, + callback: CodexCallback, + userData: pointer, +): cint {.dynlib, exportc.} = + initializeLibrary() + checkLibcodexParams(ctx, callback, userData) + + let reqContent = NodeUploadRequest.createShared( + NodeUploadMsgType.INIT, filepath = filepath, chunkSize = chunkSize + ) + + let res = codex_context.sendRequestToCodexThread( + ctx, RequestType.UPLOAD, reqContent, callback, userData + ) + + return callback.okOrError(res, userData) + +proc codex_upload_chunk( + ctx: ptr CodexContext, + sessionId: cstring, + data: ptr byte, + len: csize_t, + callback: CodexCallback, + userData: pointer, +): cint {.dynlib, exportc.} = + initializeLibrary() + checkLibcodexParams(ctx, callback, userData) + + let chunk = newSeq[byte](len) + copyMem(addr chunk[0], data, len) + + let reqContent = NodeUploadRequest.createShared( + NodeUploadMsgType.CHUNK, sessionId = sessionId, chunk = chunk + ) + let res = codex_context.sendRequestToCodexThread( + ctx, RequestType.UPLOAD, reqContent, callback, userData + ) + + return callback.okOrError(res, userData) + +proc codex_upload_finalize( + ctx: ptr CodexContext, + sessionId: cstring, + callback: CodexCallback, + userData: pointer, +): cint {.dynlib, exportc.} = + initializeLibrary() + checkLibcodexParams(ctx, callback, userData) + + let reqContent = + NodeUploadRequest.createShared(NodeUploadMsgType.FINALIZE, sessionId = sessionId) + let res = codex_context.sendRequestToCodexThread( + ctx, RequestType.UPLOAD, reqContent, callback, userData + ) + + return callback.okOrError(res, userData) + +proc codex_upload_cancel( + ctx: ptr CodexContext, + sessionId: cstring, + callback: CodexCallback, + userData: pointer, +): cint {.dynlib, exportc.} = + initializeLibrary() + checkLibcodexParams(ctx, callback, userData) + + let reqContent = + NodeUploadRequest.createShared(NodeUploadMsgType.CANCEL, sessionId = sessionId) + + let res = codex_context.sendRequestToCodexThread( + ctx, RequestType.UPLOAD, reqContent, callback, userData + ) + + return callback.okOrError(res, userData) + +proc codex_upload_file( + ctx: ptr CodexContext, + sessionId: cstring, + callback: CodexCallback, + userData: pointer, +): cint {.dynlib, exportc.} = + initializeLibrary() + checkLibcodexParams(ctx, callback, userData) + + let reqContent = + NodeUploadRequest.createShared(NodeUploadMsgType.FILE, sessionId = sessionId) + + let res = codex_context.sendRequestToCodexThread( + ctx, RequestType.UPLOAD, reqContent, callback, userData + ) + + return callback.okOrError(res, userData) + +proc codex_download_init( + ctx: ptr CodexContext, + cid: cstring, + chunkSize: csize_t, + local: bool, + callback: CodexCallback, + userData: pointer, +): cint {.dynlib, exportc.} = + initializeLibrary() + checkLibcodexParams(ctx, callback, userData) + + let req = NodeDownloadRequest.createShared( + NodeDownloadMsgType.INIT, cid = cid, chunkSize = chunkSize, local = local + ) + + let res = codex_context.sendRequestToCodexThread( + ctx, RequestType.DOWNLOAD, req, callback, userData + ) + + return callback.okOrError(res, userData) + +proc codex_download_chunk( + ctx: ptr CodexContext, cid: cstring, callback: CodexCallback, userData: pointer +): cint {.dynlib, exportc.} = + initializeLibrary() + checkLibcodexParams(ctx, callback, userData) + + let req = NodeDownloadRequest.createShared(NodeDownloadMsgType.CHUNK, cid = cid) + + let res = codex_context.sendRequestToCodexThread( + ctx, RequestType.DOWNLOAD, req, callback, userData + ) + + return callback.okOrError(res, userData) + +proc codex_download_stream( + ctx: ptr CodexContext, + cid: cstring, + chunkSize: csize_t, + local: bool, + filepath: cstring, + callback: CodexCallback, + userData: pointer, +): cint {.dynlib, exportc.} = + initializeLibrary() + checkLibcodexParams(ctx, callback, userData) + + let req = NodeDownloadRequest.createShared( + NodeDownloadMsgType.STREAM, + cid = cid, + chunkSize = chunkSize, + local = local, + filepath = filepath, + ) + + let res = codex_context.sendRequestToCodexThread( + ctx, RequestType.DOWNLOAD, req, callback, userData + ) + + return callback.okOrError(res, userData) + +proc codex_download_cancel( + ctx: ptr CodexContext, cid: cstring, callback: CodexCallback, userData: pointer +): cint {.dynlib, exportc.} = + initializeLibrary() + checkLibcodexParams(ctx, callback, userData) + + let req = NodeDownloadRequest.createShared(NodeDownloadMsgType.CANCEL, cid = cid) + + let res = codex_context.sendRequestToCodexThread( + ctx, RequestType.DOWNLOAD, req, callback, userData + ) + + return callback.okOrError(res, userData) + +proc codex_download_manifest( + ctx: ptr CodexContext, cid: cstring, callback: CodexCallback, userData: pointer +): cint {.dynlib, exportc.} = + initializeLibrary() + checkLibcodexParams(ctx, callback, userData) + + let req = NodeDownloadRequest.createShared(NodeDownloadMsgType.MANIFEST, cid = cid) + + let res = codex_context.sendRequestToCodexThread( + ctx, RequestType.DOWNLOAD, req, callback, userData + ) + + return callback.okOrError(res, userData) + +proc codex_storage_list( + ctx: ptr CodexContext, callback: CodexCallback, userData: pointer +): cint {.dynlib, exportc.} = + initializeLibrary() + checkLibcodexParams(ctx, callback, userData) + + let req = NodeStorageRequest.createShared(NodeStorageMsgType.LIST) + + let res = codex_context.sendRequestToCodexThread( + ctx, RequestType.STORAGE, req, callback, userData + ) + + return callback.okOrError(res, userData) + +proc codex_storage_space( + ctx: ptr CodexContext, callback: CodexCallback, userData: pointer +): cint {.dynlib, exportc.} = + initializeLibrary() + checkLibcodexParams(ctx, callback, userData) + + let req = NodeStorageRequest.createShared(NodeStorageMsgType.SPACE) + + let res = codex_context.sendRequestToCodexThread( + ctx, RequestType.STORAGE, req, callback, userData + ) + + return callback.okOrError(res, userData) + +proc codex_storage_delete( + ctx: ptr CodexContext, cid: cstring, callback: CodexCallback, userData: pointer +): cint {.dynlib, exportc.} = + initializeLibrary() + checkLibcodexParams(ctx, callback, userData) + + let req = NodeStorageRequest.createShared(NodeStorageMsgType.DELETE, cid = cid) + + let res = codex_context.sendRequestToCodexThread( + ctx, RequestType.STORAGE, req, callback, userData + ) + + return callback.okOrError(res, userData) + +proc codex_storage_fetch( + ctx: ptr CodexContext, cid: cstring, callback: CodexCallback, userData: pointer +): cint {.dynlib, exportc.} = + initializeLibrary() + checkLibcodexParams(ctx, callback, userData) + + let req = NodeStorageRequest.createShared(NodeStorageMsgType.FETCH, cid = cid) + + let res = codex_context.sendRequestToCodexThread( + ctx, RequestType.STORAGE, req, callback, userData + ) + + return callback.okOrError(res, userData) + +proc codex_storage_exists( + ctx: ptr CodexContext, cid: cstring, callback: CodexCallback, userData: pointer +): cint {.dynlib, exportc.} = + initializeLibrary() + checkLibcodexParams(ctx, callback, userData) + + let req = NodeStorageRequest.createShared(NodeStorageMsgType.EXISTS, cid = cid) + + let res = codex_context.sendRequestToCodexThread( + ctx, RequestType.STORAGE, req, callback, userData + ) + + return callback.okOrError(res, userData) + +proc codex_start( + ctx: ptr CodexContext, callback: CodexCallback, userData: pointer +): cint {.dynlib, exportc.} = + initializeLibrary() + checkLibcodexParams(ctx, callback, userData) + + let reqContent: ptr NodeLifecycleRequest = + NodeLifecycleRequest.createShared(NodeLifecycleMsgType.START_NODE) + let res = codex_context.sendRequestToCodexThread( + ctx, RequestType.LIFECYCLE, reqContent, callback, userData + ) + + return callback.okOrError(res, userData) + +proc codex_stop( + ctx: ptr CodexContext, callback: CodexCallback, userData: pointer +): cint {.dynlib, exportc.} = + initializeLibrary() + checkLibcodexParams(ctx, callback, userData) + + let reqContent: ptr NodeLifecycleRequest = + NodeLifecycleRequest.createShared(NodeLifecycleMsgType.STOP_NODE) + let res = codex_context.sendRequestToCodexThread( + ctx, RequestType.LIFECYCLE, reqContent, callback, userData + ) + + return callback.okOrError(res, userData) + +proc codex_set_event_callback( + ctx: ptr CodexContext, callback: CodexCallback, userData: pointer +) {.dynlib, exportc.} = + initializeLibrary() + ctx[].eventCallback = cast[pointer](callback) + ctx[].eventUserData = userData diff --git a/tests/codex/helpers/mockclock.nim b/tests/codex/helpers/mockclock.nim index be1eb4d2..76446041 100644 --- a/tests/codex/helpers/mockclock.nim +++ b/tests/codex/helpers/mockclock.nim @@ -33,11 +33,18 @@ proc advance*(clock: MockClock, seconds: int64) = method now*(clock: MockClock): SecondsSince1970 = clock.time -method waitUntil*(clock: MockClock, time: SecondsSince1970) {.async.} = - if time > clock.now(): - let future = newFuture[void]() - clock.waiting.add(Waiting(until: time, future: future)) - await future +method waitUntil*( + clock: MockClock, time: SecondsSince1970 +) {.async: (raises: [CancelledError]).} = + try: + if time > clock.now(): + let future = newFuture[void]() + clock.waiting.add(Waiting(until: time, future: future)) + await future + except CancelledError as e: + raise e + except Exception as e: + discard proc isWaiting*(clock: MockClock): bool = clock.waiting.len > 0 diff --git a/tests/integration/codexprocess.nim b/tests/integration/codexprocess.nim index 3eca5b04..351a78e2 100644 --- a/tests/integration/codexprocess.nim +++ b/tests/integration/codexprocess.nim @@ -51,7 +51,8 @@ proc ethAccount*(node: CodexProcess): Address = proc apiUrl*(node: CodexProcess): string = let config = CodexConf.load(cmdLine = node.arguments, quitOnFailure = false) - return "http://" & config.apiBindAddress & ":" & $config.apiPort & "/api/codex/v1" + return + "http://" & config.apiBindAddress.get() & ":" & $config.apiPort & "/api/codex/v1" proc client*(node: CodexProcess): CodexClient = if client =? node.client: