From 05595e2d92c6618d5d75ed8dc0fa5ef3ebda9376 Mon Sep 17 00:00:00 2001 From: Ivan FB Date: Wed, 3 Jun 2026 13:22:11 +0200 Subject: [PATCH] =?UTF-8?q?feat(ffi):=20target=20nim-ffi=20master=20(v0.2.?= =?UTF-8?q?0)=20=E2=80=94=20CBOR=20+=20event=20registry?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Port the {.ffi.} wrapper to nim-ffi 0.2.0 (master). 0.2.0 is a breaking redesign over 0.1.4: events move to a per-context multi-listener registry (sds_add_event_listener / sds_remove_event_listener) fired via {.ffiEvent.} emitters, and request/response/event marshalling switches from JSON to CBOR. - libsds.nim: typed {.ffiEvent.} payloads replace the JSON event modules; CBOR handles nesting, so unwrap returns a typed response again. The retrieval-hint provider (a C function pointer, not CBOR-encodable) passes its address as a uint64 via a {.ffi.} method that stores it in a worker-thread threadvar. - pin nim-ffi to master HEAD by commit. The lock version is kept a clean semver ("0.2.0") on purpose: nimble's `#`-prefixed special version in the lock breaks `nimble setup -l` (an unquoted `#` truncates the git path), so only vcsRevision carries the commit. - add the new transitive dep cbor_serialization to the lock and nix/deps.nix. - regenerate libsds.h for the CBOR/registry ABI. Co-Authored-By: Claude Opus 4.8 --- library/libsds.h | 78 +++++++++++------- library/libsds.nim | 192 +++++++++++++++++++++++++-------------------- nimble.lock | 23 +++++- nix/deps.nix | 11 ++- sds.nimble | 2 +- 5 files changed, 186 insertions(+), 120 deletions(-) diff --git a/library/libsds.h b/library/libsds.h index 01ce4fa..3c5d982 100644 --- a/library/libsds.h +++ b/library/libsds.h @@ -1,10 +1,11 @@ -// C API for libsds, built on the nim-ffi framework. +// C API for libsds, built on the nim-ffi framework (v0.2.0+). // -// Parameters and results are marshalled as JSON: each request/response struct -// in library/libsds.nim is a JSON object, passed in via the `*Json` cstring -// argument and returned to the callback as a JSON string. Binary fields -// (message bytes) are JSON arrays of byte values. +// Requests, responses and events are marshalled as CBOR. Request payloads are +// passed as a (reqCbor, reqCborLen) byte buffer; results and events are +// delivered to the callback as a CBOR buffer (msg, len). Each request/response +// struct and event payload is defined in library/libsds.nim. Events are +// wrapped in a CBOR envelope { eventType: , payload: }. #ifndef __libsds__ #define __libsds__ @@ -20,48 +21,67 @@ extern "C" { #endif -// Result/event callback. `msg` is the (JSON) payload of length `len`. +// Result/event callback. `msg` is the CBOR payload of length `len`. // callerRet is one of the RET_* codes above. typedef void (*SdsCallBack) (int callerRet, const char* msg, size_t len, void* userData); // Synchronous provider invoked by SDS-R to fetch a retrieval hint for a // message id. The implementation allocates `*hint` (and sets `*hintLen`); the -// library takes ownership and frees it with deallocShared. +// library takes ownership and frees it with deallocShared. Registered via +// sds_set_retrieval_hint_provider (see below). typedef void (*SdsRetrievalHintProvider) (const char* messageId, char** hint, size_t* hintLen, void* userData); -// --- Core API Functions --- +// --- Lifecycle ------------------------------------------------------------- + +// Create a context + ReliabilityManager. reqCbor encodes SdsConfig +// { participantId: tstr } (empty participantId disables SDS-R). Returns the +// context handle, or NULL on failure; the callback also fires on completion. +void* sds_create(const uint8_t* reqCbor, size_t reqCborLen, SdsCallBack callback, void* userData); + +// Tear down the context created by sds_create. Blocks until the worker and +// watchdog threads have joined. +int sds_destroy(void* ctx); -// Create a context + ReliabilityManager. configJson: {"participantId":"..."} -// (empty participantId disables SDS-R). Returns the context handle, or NULL on -// failure. The callback also fires on async completion. -void* sds_create(const char* configJson, SdsCallBack callback, void* userData); +// --- Events ---------------------------------------------------------------- +// Subscribe `callback` to an event by wire name and receive a stable listener +// id (non-zero). Event wire names: "message_ready", "message_sent", +// "missing_dependencies", "periodic_sync", "repair_ready". Subscribe to each +// event separately. Payloads arrive as CBOR { eventType, payload }. +uint64_t sds_add_event_listener(void* ctx, const char* eventName, SdsCallBack callback, void* userData); -// Register the event callback (message_ready, message_sent, -// missing_dependencies, periodic_sync, repair_ready). Payloads are JSON. -void sds_set_event_callback(void* ctx, SdsCallBack callback, void* userData); +// Remove a listener by id. Returns 0 on success, non-zero if not found. +int sds_remove_event_listener(void* ctx, uint64_t listenerId); -// Register the retrieval-hint provider used by SDS-R. -int sds_set_retrieval_hint_provider(void* ctx, SdsRetrievalHintProvider callback, void* userData); +// Register the SDS-R retrieval-hint provider. reqCbor encodes +// SdsHintProviderRequest { callbackAddr: uint, userDataAddr: uint } — the +// SdsRetrievalHintProvider function pointer and its user-data as integer +// addresses. +int sds_set_retrieval_hint_provider(void* ctx, SdsCallBack callback, void* userData, const uint8_t* reqCbor, size_t reqCborLen); -// reqJson: {"message":[..bytes..],"messageId":"..","channelId":".."} -// Result JSON: {"message":[..bytes..]} -int sds_wrap_outgoing_message(void* ctx, SdsCallBack callback, void* userData, const char* reqJson); -// reqJson: {"message":[..bytes..]} -// Result JSON: {"message":[..],"channelId":"..","missingDeps":[{"messageId":"..","retrievalHint":""}]} -int sds_unwrap_received_message(void* ctx, SdsCallBack callback, void* userData, const char* reqJson); +// --- Core API Functions ---------------------------------------------------- +// Each takes a CBOR-encoded request buffer; the result is delivered to +// `callback` as CBOR. -// reqJson: {"messageIds":["..",".."],"channelId":".."} -int sds_mark_dependencies_met(void* ctx, SdsCallBack callback, void* userData, const char* reqJson); +// reqCbor: SdsWrapRequest { message: bytes, messageId: tstr, channelId: tstr } +// result: SdsWrapResponse { message: bytes } +int sds_wrap_outgoing_message(void* ctx, SdsCallBack callback, void* userData, const uint8_t* reqCbor, size_t reqCborLen); -int sds_reset(void* ctx, SdsCallBack callback, void* userData); +// reqCbor: SdsUnwrapRequest { message: bytes } +// result: SdsUnwrapResponse { message: bytes, channelId: tstr, +// missingDeps: [{ messageId: tstr, retrievalHint: bytes }] } +int sds_unwrap_received_message(void* ctx, SdsCallBack callback, void* userData, const uint8_t* reqCbor, size_t reqCborLen); -int sds_start_periodic_tasks(void* ctx, SdsCallBack callback, void* userData); +// reqCbor: SdsMarkDependenciesRequest { messageIds: [tstr], channelId: tstr } +int sds_mark_dependencies_met(void* ctx, SdsCallBack callback, void* userData, const uint8_t* reqCbor, size_t reqCborLen); -// Tear down the context created by sds_create. -int sds_destroy(void* ctx, SdsCallBack callback, void* userData); +// reqCbor: empty/unit payload (no fields). +int sds_reset(void* ctx, SdsCallBack callback, void* userData, const uint8_t* reqCbor, size_t reqCborLen); + +// reqCbor: empty/unit payload (no fields). +int sds_start_periodic_tasks(void* ctx, SdsCallBack callback, void* userData, const uint8_t* reqCbor, size_t reqCborLen); #ifdef __cplusplus diff --git a/library/libsds.nim b/library/libsds.nim index a0bead0..5e614e9 100644 --- a/library/libsds.nim +++ b/library/libsds.nim @@ -1,40 +1,35 @@ ## C-compatible FFI wrapper around the SDS ReliabilityManager. ## -## Built on the `nim-ffi` package's high-level macros: `declareLibrary` emits the -## bootstrap + `sds_set_event_callback`; `{.ffiCtor.}`/`{.ffi.}`/`{.ffiDtor.}` -## generate the C entry points, marshalling parameters and return values as JSON. -## Exported C names are snake_case (`sds_wrap_outgoing_message`, …); see -## `library/libsds.h`. The Go bindings (sds-go-bindings) must match this API. +## Built on nim-ffi (v0.2.0+): `declareLibrary` emits the bootstrap plus the +## event-listener ABI (`sds_add_event_listener` / `sds_remove_event_listener`); +## `{.ffiCtor.}`/`{.ffi.}`/`{.ffiDtor.}` generate the C entry points; and +## `{.ffiEvent.}` declares library-initiated events. Requests, responses and +## events are marshalled as CBOR (see library/libsds.h). Exported C names are +## snake_case. The Go bindings (sds-go-bindings) must match this API. ## -## The one exception is `sds_set_retrieval_hint_provider`: it takes a C function -## pointer, which has no sensible JSON representation, so it is hand-written and -## dispatched to the worker thread to store the provider in a thread-local. +## The one hand-written export is `sds_set_retrieval_hint_provider`: it takes a +## C function pointer (no CBOR representation), so it dispatches a request that +## stores the provider in a worker-thread thread-local. -import std/[base64, json, sequtils] +import std/[sequtils] import ffi import sds -import ./events/[ - json_message_ready_event, json_message_sent_event, json_missing_dependencies_event, - json_periodic_sync_event, json_repair_ready_event, -] -# Bootstrap (pragmas, linker flags, libsdsNimMain, initializeLibrary) plus the -# `sds_set_event_callback(ctx, callback, userData)` C export. +# Bootstrap + sds_add_event_listener / sds_remove_event_listener. declareLibrary("sds", ReliabilityManager) type SdsRetrievalHintProvider* = proc( messageId: cstring, hint: ptr cstring, hintLen: ptr csize_t, userData: pointer ) {.cdecl, gcsafe, raises: [].} -# The active retrieval-hint provider, stored per worker thread (one thread per -# context). Set by sds_set_retrieval_hint_provider via a dispatched request so -# the write lands on the worker thread, where the manager's hint closure reads -# it during message processing. +# Active retrieval-hint provider, per worker thread (one thread per context). +# Set by sds_set_retrieval_hint_provider through a dispatched request so the +# write lands on the worker thread, where the manager's hint closure reads it. var sdsRetrievalHintCb {.threadvar.}: pointer var sdsRetrievalHintUserData {.threadvar.}: pointer ################################################################################ -### JSON-marshalled request/response types +### CBOR-marshalled request/response types type SdsConfig* {.ffi.} = object participantId: string ## empty disables SDS-R (see newReliabilityManager) @@ -50,16 +45,59 @@ type SdsWrapResponse* {.ffi.} = object type SdsUnwrapRequest* {.ffi.} = object message: seq[byte] +type SdsMissingDep* {.ffi.} = object + messageId: string + retrievalHint: seq[byte] + +type SdsUnwrapResponse* {.ffi.} = object + message: seq[byte] + channelId: string + missingDeps: seq[SdsMissingDep] + type SdsMarkDependenciesRequest* {.ffi.} = object messageIds: seq[string] channelId: string +################################################################################ +### Library-initiated events +### +### Each {.ffiEvent.} proc is an emitter: calling it from a worker-thread +### handler dispatches a CBOR EventEnvelope to every listener subscribed (via +### sds_add_event_listener) to the matching wire name. + +type SdsMessageReadyPayload* {.ffi.} = object + messageId: string + channelId: string + +type SdsMessageSentPayload* {.ffi.} = object + messageId: string + channelId: string + +type SdsMissingDependenciesPayload* {.ffi.} = object + messageId: string + channelId: string + missingDeps: seq[SdsMissingDep] + +type SdsPeriodicSyncPayload* {.ffi.} = object + placeholder: bool ## events need a payload type; periodic sync carries no data + +type SdsRepairReadyPayload* {.ffi.} = object + message: seq[byte] + channelId: string + +proc emitMessageReady*(p: SdsMessageReadyPayload) {.ffiEvent: "message_ready".} +proc emitMessageSent*(p: SdsMessageSentPayload) {.ffiEvent: "message_sent".} +proc emitMissingDependencies*( + p: SdsMissingDependenciesPayload +) {.ffiEvent: "missing_dependencies".} +proc emitPeriodicSync*(p: SdsPeriodicSyncPayload) {.ffiEvent: "periodic_sync".} +proc emitRepairReady*(p: SdsRepairReadyPayload) {.ffiEvent: "repair_ready".} + ################################################################################ ### Constructor — creates the FFI context and the ReliabilityManager. ### -### The AppCallbacks closures run on the worker thread and forward events to the -### C callback registered via sds_set_event_callback (dispatchFfiEvent reads the -### per-thread callback state, so no context handle is needed here). +### The AppCallbacks closures run on the worker thread; they build typed +### payloads and fire the {.ffiEvent.} emitters, which reach the C listeners. proc sdsCreate*( config: SdsConfig @@ -71,28 +109,39 @@ proc sdsCreate*( let messageReadyCb = proc( messageId: SdsMessageID, channelId: SdsChannelID ) {.gcsafe.} = - dispatchFfiEvent("message_ready"): - $JsonMessageReadyEvent.new(messageId, channelId) + {.cast(gcsafe).}: + emitMessageReady( + SdsMessageReadyPayload(messageId: $messageId, channelId: $channelId) + ) let messageSentCb = proc( messageId: SdsMessageID, channelId: SdsChannelID ) {.gcsafe.} = - dispatchFfiEvent("message_sent"): - $JsonMessageSentEvent.new(messageId, channelId) + {.cast(gcsafe).}: + emitMessageSent( + SdsMessageSentPayload(messageId: $messageId, channelId: $channelId) + ) let missingDependenciesCb = proc( messageId: SdsMessageID, missingDeps: seq[HistoryEntry], channelId: SdsChannelID ) {.gcsafe.} = - dispatchFfiEvent("missing_dependencies"): - $JsonMissingDependenciesEvent.new(messageId, missingDeps, channelId) + {.cast(gcsafe).}: + let deps = missingDeps.mapIt( + SdsMissingDep(messageId: $it.messageId, retrievalHint: it.retrievalHint) + ) + emitMissingDependencies( + SdsMissingDependenciesPayload( + messageId: $messageId, channelId: $channelId, missingDeps: deps + ) + ) let periodicSyncCb = proc() {.gcsafe.} = - dispatchFfiEvent("periodic_sync"): - $JsonPeriodicSyncEvent.new() + {.cast(gcsafe).}: + emitPeriodicSync(SdsPeriodicSyncPayload(placeholder: false)) let repairReadyCb = proc(message: seq[byte], channelId: SdsChannelID) {.gcsafe.} = - dispatchFfiEvent("repair_ready"): - $JsonRepairReadyEvent.new(message, channelId) + {.cast(gcsafe).}: + emitRepairReady(SdsRepairReadyPayload(message: message, channelId: $channelId)) let retrievalHintProvider = proc(messageId: SdsMessageID): seq[byte] {.gcsafe.} = if sdsRetrievalHintCb.isNil(): @@ -133,26 +182,18 @@ proc sdsWrapOutgoingMessage*( proc sdsUnwrapReceivedMessage*( rm: ReliabilityManager, req: SdsUnwrapRequest -): Future[Result[string, string]] {.ffi.} = - # The response carries nested objects (missingDeps) which the framework's - # object serializer cannot emit, so the JSON is built by hand and returned as - # a string. Shape matches the legacy unwrap response. +): Future[Result[SdsUnwrapResponse, string]] {.ffi.} = let (unwrapped, missingDeps, channelId) = ( await unwrapReceivedMessage(rm, req.message) ).valueOr: return err("error processing unwrap request: " & $error) - var node = newJObject() - node["message"] = %*unwrapped - node["channelId"] = %*channelId - var missingDepsNode = newJArray() - for dep in missingDeps: - var depNode = newJObject() - depNode["messageId"] = %*dep.messageId - depNode["retrievalHint"] = %*encode(dep.retrievalHint) - missingDepsNode.add(depNode) - node["missingDeps"] = missingDepsNode - return ok($node) + let deps = missingDeps.mapIt( + SdsMissingDep(messageId: $it.messageId, retrievalHint: it.retrievalHint) + ) + return ok( + SdsUnwrapResponse(message: unwrapped, channelId: $channelId, missingDeps: deps) + ) proc sdsMarkDependenciesMet*( rm: ReliabilityManager, req: SdsMarkDependenciesRequest @@ -185,44 +226,27 @@ proc sdsDestroy*(rm: ReliabilityManager) {.ffiDtor.} = discard ################################################################################ -### Retrieval-hint provider (hand-written: a C function pointer cannot be passed -### as JSON). The setter dispatches a request so the provider is stored in the -### worker thread's thread-local, where sdsCreate's hint closure reads it. +### Retrieval-hint provider. +### +### The provider is a C function pointer, which has no CBOR representation, so +### it is passed as integer addresses. The body runs on the worker thread (the +### empty await forces the async path) and stores the pointers in the +### thread-local that sdsCreate's hint closure reads. The caller passes the +### function pointer and user-data as uint64 addresses. -proc sdsNoopCallback( - callerRet: cint, msg: ptr cchar, len: csize_t, userData: pointer -) {.cdecl, gcsafe, raises: [].} = - discard +type SdsHintProviderRequest* {.ffi.} = object + callbackAddr: uint64 + userDataAddr: uint64 -registerReqFFI(SdsSetHintReq, ctx: ptr FFIContext[ReliabilityManager]): - proc(cbPtr: pointer, udPtr: pointer): Future[Result[string, string]] {.async.} = - sdsRetrievalHintCb = cbPtr - sdsRetrievalHintUserData = udPtr - return ok("") - -proc sds_set_retrieval_hint_provider( - ctx: ptr FFIContext[ReliabilityManager], - callback: SdsRetrievalHintProvider, - userData: pointer, -): cint {.dynlib, exportc, cdecl, raises: [].} = - initializeLibrary() - if not ReliabilityManagerFFIPool.isValidCtx(cast[pointer](ctx)): - return RET_ERR - - let sendRes = - try: - ffi_context.sendRequestToFFIThread( - ctx, - SdsSetHintReq.ffiNewReq( - sdsNoopCallback, nil, cast[pointer](callback), userData - ), - ) - except Exception as exc: - Result[void, string].err("sendRequestToFFIThread exception: " & exc.msg) - if sendRes.isErr(): - return RET_ERR - return RET_OK +proc sdsSetRetrievalHintProvider*( + rm: ReliabilityManager, req: SdsHintProviderRequest +): Future[Result[string, string]] {.ffi.} = + discard rm + await sleepAsync(chronos.milliseconds(0)) + sdsRetrievalHintCb = cast[pointer](req.callbackAddr) + sdsRetrievalHintUserData = cast[pointer](req.userDataAddr) + return ok("") # Emit binding metadata (no-op unless -d:ffiGenBindings). Must follow every -# {.ffi.}/{.ffiCtor.}/{.ffiDtor.} annotation. +# {.ffi.}/{.ffiCtor.}/{.ffiDtor.}/{.ffiEvent.} annotation. genBindings() diff --git a/nimble.lock b/nimble.lock index ad7d9e3..a014fc3 100644 --- a/nimble.lock +++ b/nimble.lock @@ -315,17 +315,32 @@ } }, "ffi": { - "version": "0.1.4", - "vcsRevision": "fb25f069d2dfae2b543d79d2c1a81f197de22a2b", + "version": "0.2.0", + "vcsRevision": "f96a5b158add9c321e33ff804f2275a5314a501b", "url": "https://github.com/logos-messaging/nim-ffi", "downloadMethod": "git", "dependencies": [ "chronos", "chronicles", - "taskpools" + "taskpools", + "cbor_serialization" ], "checksums": { - "sha1": "4a5d4020a40106fa2a698d5fe975b9a8ba961f91" + "sha1": "46fad75cc79b7ae6eabfadb8fc3e68764dde89a5" + } + }, + "cbor_serialization": { + "version": "0.3.0", + "vcsRevision": "1664160e04d153573373afddc552b9cbf6fbe4dc", + "url": "https://github.com/vacp2p/nim-cbor-serialization", + "downloadMethod": "git", + "dependencies": [ + "serialization", + "stew", + "results" + ], + "checksums": { + "sha1": "ab126eae09a6e39c72972a6a0b83cb06a2ffe8f0" } } }, diff --git a/nix/deps.nix b/nix/deps.nix index a73c829..f312dd6 100644 --- a/nix/deps.nix +++ b/nix/deps.nix @@ -166,8 +166,15 @@ ffi = pkgs.fetchgit { url = "https://github.com/logos-messaging/nim-ffi"; - rev = "fb25f069d2dfae2b543d79d2c1a81f197de22a2b"; - sha256 = "0zkjnrm2yjlw27q99kv2x8ll61mbz4nr0cvmyq0csydh43c08k0p"; + rev = "f96a5b158add9c321e33ff804f2275a5314a501b"; + sha256 = "1hcv1k3c18rhg2nrndld2b6xx23nfnlcfkm0bidqha4by4hzn9lr"; + fetchSubmodules = true; + }; + + cbor_serialization = pkgs.fetchgit { + url = "https://github.com/vacp2p/nim-cbor-serialization"; + rev = "1664160e04d153573373afddc552b9cbf6fbe4dc"; + sha256 = "0c1rj4fk0fcqvsf0yqhxvm8h10aww75gi4yfsjhlczh88ypywii2"; fetchSubmodules = true; }; diff --git a/sds.nimble b/sds.nimble index 25f65b4..d0a7b9a 100644 --- a/sds.nimble +++ b/sds.nimble @@ -16,7 +16,7 @@ requires "stew" requires "stint" requires "metrics" requires "results" -requires "https://github.com/logos-messaging/nim-ffi >= 0.1.4" +requires "https://github.com/logos-messaging/nim-ffi#f96a5b158add9c321e33ff804f2275a5314a501b" proc buildLibrary( outLibNameAndExt: string,