diff --git a/library/libsds.h b/library/libsds.h index 0d9840e..3c5d982 100644 --- a/library/libsds.h +++ b/library/libsds.h @@ -1,8 +1,11 @@ -// Generated manually and inspired by the one generated by the Nim Compiler. -// In order to see the header file generated by Nim just run `make libsds` -// from the root repo folder and the header should be created in -// nimcache/release/libsds/libsds.h +// C API for libsds, built on the nim-ffi framework (v0.2.0+). +// +// Requests, responses and events are marshalled as CBOR. Request payloads are +// passed as a (reqCbor, reqCborLen) byte buffer; results and events are +// delivered to the callback as a CBOR buffer (msg, len). Each request/response +// struct and event payload is defined in library/libsds.nim. Events are +// wrapped in a CBOR envelope { eventType: , payload: }. #ifndef __libsds__ #define __libsds__ @@ -18,51 +21,71 @@ extern "C" { #endif +// Result/event callback. `msg` is the CBOR payload of length `len`. +// callerRet is one of the RET_* codes above. typedef void (*SdsCallBack) (int callerRet, const char* msg, size_t len, void* userData); +// Synchronous provider invoked by SDS-R to fetch a retrieval hint for a +// message id. The implementation allocates `*hint` (and sets `*hintLen`); the +// library takes ownership and frees it with deallocShared. Registered via +// sds_set_retrieval_hint_provider (see below). typedef void (*SdsRetrievalHintProvider) (const char* messageId, char** hint, size_t* hintLen, void* userData); -// --- Core API Functions --- +// --- Lifecycle ------------------------------------------------------------- + +// Create a context + ReliabilityManager. reqCbor encodes SdsConfig +// { participantId: tstr } (empty participantId disables SDS-R). Returns the +// context handle, or NULL on failure; the callback also fires on completion. +void* sds_create(const uint8_t* reqCbor, size_t reqCborLen, SdsCallBack callback, void* userData); + +// Tear down the context created by sds_create. Blocks until the worker and +// watchdog threads have joined. +int sds_destroy(void* ctx); -void* SdsNewReliabilityManager(SdsCallBack callback, void* userData); +// --- Events ---------------------------------------------------------------- +// Subscribe `callback` to an event by wire name and receive a stable listener +// id (non-zero). Event wire names: "message_ready", "message_sent", +// "missing_dependencies", "periodic_sync", "repair_ready". Subscribe to each +// event separately. Payloads arrive as CBOR { eventType, payload }. +uint64_t sds_add_event_listener(void* ctx, const char* eventName, SdsCallBack callback, void* userData); -void SdsSetEventCallback(void* ctx, SdsCallBack callback, void* userData); +// Remove a listener by id. Returns 0 on success, non-zero if not found. +int sds_remove_event_listener(void* ctx, uint64_t listenerId); -void SdsSetRetrievalHintProvider(void* ctx, SdsRetrievalHintProvider callback, void* userData); +// Register the SDS-R retrieval-hint provider. reqCbor encodes +// SdsHintProviderRequest { callbackAddr: uint, userDataAddr: uint } — the +// SdsRetrievalHintProvider function pointer and its user-data as integer +// addresses. +int sds_set_retrieval_hint_provider(void* ctx, SdsCallBack callback, void* userData, const uint8_t* reqCbor, size_t reqCborLen); -int SdsCleanupReliabilityManager(void* ctx, SdsCallBack callback, void* userData); -int SdsResetReliabilityManager(void* ctx, SdsCallBack callback, void* userData); +// --- Core API Functions ---------------------------------------------------- +// Each takes a CBOR-encoded request buffer; the result is delivered to +// `callback` as CBOR. -int SdsWrapOutgoingMessage(void* ctx, - void* message, - size_t messageLen, - const char* messageId, - const char* channelId, - SdsCallBack callback, - void* userData); +// reqCbor: SdsWrapRequest { message: bytes, messageId: tstr, channelId: tstr } +// result: SdsWrapResponse { message: bytes } +int sds_wrap_outgoing_message(void* ctx, SdsCallBack callback, void* userData, const uint8_t* reqCbor, size_t reqCborLen); -int SdsUnwrapReceivedMessage(void* ctx, - void* message, - size_t messageLen, - SdsCallBack callback, - void* userData); +// reqCbor: SdsUnwrapRequest { message: bytes } +// result: SdsUnwrapResponse { message: bytes, channelId: tstr, +// missingDeps: [{ messageId: tstr, retrievalHint: bytes }] } +int sds_unwrap_received_message(void* ctx, SdsCallBack callback, void* userData, const uint8_t* reqCbor, size_t reqCborLen); -int SdsMarkDependenciesMet(void* ctx, - char** messageIDs, - size_t count, - const char* channelId, - SdsCallBack callback, - void* userData); +// reqCbor: SdsMarkDependenciesRequest { messageIds: [tstr], channelId: tstr } +int sds_mark_dependencies_met(void* ctx, SdsCallBack callback, void* userData, const uint8_t* reqCbor, size_t reqCborLen); -int SdsStartPeriodicTasks(void* ctx, SdsCallBack callback, void* userData); +// reqCbor: empty/unit payload (no fields). +int sds_reset(void* ctx, SdsCallBack callback, void* userData, const uint8_t* reqCbor, size_t reqCborLen); +// reqCbor: empty/unit payload (no fields). +int sds_start_periodic_tasks(void* ctx, SdsCallBack callback, void* userData, const uint8_t* reqCbor, size_t reqCborLen); #ifdef __cplusplus } #endif -#endif /* __libsds__ */ \ No newline at end of file +#endif /* __libsds__ */ diff --git a/library/libsds.nim b/library/libsds.nim index c7f41e9..5e614e9 100644 --- a/library/libsds.nim +++ b/library/libsds.nim @@ -1,469 +1,252 @@ -import std/[strutils, sequtils, json, base64, locks] +## C-compatible FFI wrapper around the SDS ReliabilityManager. +## +## Built on nim-ffi (v0.2.0+): `declareLibrary` emits the bootstrap plus the +## event-listener ABI (`sds_add_event_listener` / `sds_remove_event_listener`); +## `{.ffiCtor.}`/`{.ffi.}`/`{.ffiDtor.}` generate the C entry points; and +## `{.ffiEvent.}` declares library-initiated events. Requests, responses and +## events are marshalled as CBOR (see library/libsds.h). Exported C names are +## snake_case. The Go bindings (sds-go-bindings) must match this API. +## +## The one hand-written export is `sds_set_retrieval_hint_provider`: it takes a +## C function pointer (no CBOR representation), so it dispatches a request that +## stores the provider in a worker-thread thread-local. + +import std/[sequtils] import ffi import sds -import ./events/[ - json_message_ready_event, json_message_sent_event, json_missing_dependencies_event, - json_periodic_sync_event, json_repair_ready_event, -] -# Emit the library bootstrap: the {.exported.}/{.callback.} pragmas, the -# `-fPIC`/soname linker flags, the `libsdsNimMain` import and the -# `initializeLibrary()` proc the exported entry points call on every hop. -declareLibraryBase("sds") - -# C callback typedefs (mirrors libsds.h). `SdsCallBack` is structurally the -# nim-ffi `FFICallBack`; the alias keeps the exported signatures readable. -type SdsCallBack* = FFICallBack +# Bootstrap + sds_add_event_listener / sds_remove_event_listener. +declareLibrary("sds", ReliabilityManager) type SdsRetrievalHintProvider* = proc( messageId: cstring, hint: ptr cstring, hintLen: ptr csize_t, userData: pointer ) {.cdecl, gcsafe, raises: [].} -# One pool per library type; the macros that would normally declare it -# (ffiCtor/ffiDtor) are not used here because we hand-write the entry points -# to preserve the exact C ABI, so we declare it explicitly. -var ReliabilityManagerFFIPool: FFIContextPool[ReliabilityManager] - -# registerReqFFI inspects each request field's type via `$node`, which only -# handles plain identifiers — a bracketed `SharedSeq[byte]` makes it choke. The -# aliases give the generated request structs non-bracketed field types. -type - SdsSharedBytes = SharedSeq[byte] - SdsSharedCstrs = SharedSeq[cstring] +# Active retrieval-hint provider, per worker thread (one thread per context). +# Set by sds_set_retrieval_hint_provider through a dispatched request so the +# write lands on the worker thread, where the manager's hint closure reads it. +var sdsRetrievalHintCb {.threadvar.}: pointer +var sdsRetrievalHintUserData {.threadvar.}: pointer ################################################################################ -### Retrieval-hint provider registry -### -### The retrieval-hint provider is a synchronous request/response callback -### (the C side returns bytes inline), so it does not fit the fire-and-forget -### event model. nim-ffi's FFIContext has no slot for it, so we keep a small -### per-context registry here. A fixed array of plain (non-GC) records keeps -### the lookup callable from the {.gcsafe.} hint closure running on the FFI -### thread. +### CBOR-marshalled request/response types -type RetrievalHintSlot = object - ctx: pointer - cb: pointer - userData: pointer +type SdsConfig* {.ffi.} = object + participantId: string ## empty disables SDS-R (see newReliabilityManager) -var retrievalHintSlots: array[MaxFFIContexts, RetrievalHintSlot] -var retrievalHintsLock: Lock -retrievalHintsLock.initLock() +type SdsWrapRequest* {.ffi.} = object + message: seq[byte] + messageId: string + channelId: string -proc setRetrievalHint(ctx: pointer, cb: pointer, userData: pointer) = - withLock retrievalHintsLock: - var free = -1 - for i in 0 ..< MaxFFIContexts: - if retrievalHintSlots[i].ctx == ctx: - retrievalHintSlots[i] = RetrievalHintSlot(ctx: ctx, cb: cb, userData: userData) - return - if free < 0 and retrievalHintSlots[i].ctx.isNil: - free = i - if free >= 0: - retrievalHintSlots[free] = RetrievalHintSlot(ctx: ctx, cb: cb, userData: userData) +type SdsWrapResponse* {.ffi.} = object + message: seq[byte] -proc getRetrievalHint(ctx: pointer): tuple[cb: pointer, userData: pointer] {.gcsafe.} = - withLock retrievalHintsLock: - for i in 0 ..< MaxFFIContexts: - if retrievalHintSlots[i].ctx == ctx: - return (retrievalHintSlots[i].cb, retrievalHintSlots[i].userData) - return (nil, nil) +type SdsUnwrapRequest* {.ffi.} = object + message: seq[byte] -proc clearRetrievalHint(ctx: pointer) = - withLock retrievalHintsLock: - for i in 0 ..< MaxFFIContexts: - if retrievalHintSlots[i].ctx == ctx: - retrievalHintSlots[i] = RetrievalHintSlot() - return +type SdsMissingDep* {.ffi.} = object + messageId: string + retrievalHint: seq[byte] + +type SdsUnwrapResponse* {.ffi.} = object + message: seq[byte] + channelId: string + missingDeps: seq[SdsMissingDep] + +type SdsMarkDependenciesRequest* {.ffi.} = object + messageIds: seq[string] + channelId: string ################################################################################ -### Shared-memory copy helpers +### Library-initiated events ### -### Request payloads carrying binary/pointer data must be deep-copied into -### shared memory on the caller thread, because the FFI thread acks receipt -### before it reads the payload — the caller may free its buffer in between. -### cstring fields are deep-copied by the generated `ffiNewReq`; raw byte and -### `char**` arrays are not, so we copy them here. +### Each {.ffiEvent.} proc is an emitter: calling it from a worker-thread +### handler dispatches a CBOR EventEnvelope to every listener subscribed (via +### sds_add_event_listener) to the matching wire name. -proc copyToSharedSeqByte(p: pointer, len: int): SharedSeq[byte] = - if p.isNil or len <= 0: - return (cast[ptr UncheckedArray[byte]](nil), 0) - let data = allocShared(len) - copyMem(data, p, len) - return (cast[ptr UncheckedArray[byte]](data), len) +type SdsMessageReadyPayload* {.ffi.} = object + messageId: string + channelId: string -proc copyToSharedSeqCstr(p: pointer, count: int): SharedSeq[cstring] = - if p.isNil or count <= 0: - return (cast[ptr UncheckedArray[cstring]](nil), 0) - let data = cast[ptr UncheckedArray[cstring]](allocShared(sizeof(cstring) * count)) - let src = cast[ptr UncheckedArray[cstring]](p) - for i in 0 ..< count: - data[i] = src[i].alloc() - return (data, count) +type SdsMessageSentPayload* {.ffi.} = object + messageId: string + channelId: string -proc freeSharedSeqCstr(s: var SharedSeq[cstring]) = - if not s.data.isNil(): - for i in 0 ..< s.len: - if not s.data[i].isNil: - deallocShared(s.data[i]) - deallocShared(s.data) - s.len = 0 +type SdsMissingDependenciesPayload* {.ffi.} = object + messageId: string + channelId: string + missingDeps: seq[SdsMissingDep] + +type SdsPeriodicSyncPayload* {.ffi.} = object + placeholder: bool ## events need a payload type; periodic sync carries no data + +type SdsRepairReadyPayload* {.ffi.} = object + message: seq[byte] + channelId: string + +proc emitMessageReady*(p: SdsMessageReadyPayload) {.ffiEvent: "message_ready".} +proc emitMessageSent*(p: SdsMessageSentPayload) {.ffiEvent: "message_sent".} +proc emitMissingDependencies*( + p: SdsMissingDependenciesPayload +) {.ffiEvent: "missing_dependencies".} +proc emitPeriodicSync*(p: SdsPeriodicSyncPayload) {.ffiEvent: "periodic_sync".} +proc emitRepairReady*(p: SdsRepairReadyPayload) {.ffiEvent: "repair_ready".} ################################################################################ -### Event callbacks +### Constructor — creates the FFI context and the ReliabilityManager. ### -### These build the AppCallbacks closures handed to the ReliabilityManager. -### They run on the FFI worker thread and forward JSON event payloads to the -### C callback registered via SdsSetEventCallback (stored on the context). +### The AppCallbacks closures run on the worker thread; they build typed +### payloads and fire the {.ffiEvent.} emitters, which reach the C listeners. -proc onMessageReady(ctx: ptr FFIContext[ReliabilityManager]): MessageReadyCallback = - return proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} = - callEventCallback(ctx, "onMessageReady"): - $JsonMessageReadyEvent.new(messageId, channelId) +proc sdsCreate*( + config: SdsConfig +): Future[Result[ReliabilityManager, string]] {.ffiCtor.} = + let rm = newReliabilityManager(participantId = config.participantId.SdsParticipantID).valueOr: + error "Failed creating reliability manager", error = error + return err("Failed creating reliability manager: " & $error) -proc onMessageSent(ctx: ptr FFIContext[ReliabilityManager]): MessageSentCallback = - return proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} = - callEventCallback(ctx, "onMessageSent"): - $JsonMessageSentEvent.new(messageId, channelId) + let messageReadyCb = proc( + messageId: SdsMessageID, channelId: SdsChannelID + ) {.gcsafe.} = + {.cast(gcsafe).}: + emitMessageReady( + SdsMessageReadyPayload(messageId: $messageId, channelId: $channelId) + ) -proc onMissingDependencies( - ctx: ptr FFIContext[ReliabilityManager] -): MissingDependenciesCallback = - return proc( + let messageSentCb = proc( + messageId: SdsMessageID, channelId: SdsChannelID + ) {.gcsafe.} = + {.cast(gcsafe).}: + emitMessageSent( + SdsMessageSentPayload(messageId: $messageId, channelId: $channelId) + ) + + let missingDependenciesCb = proc( messageId: SdsMessageID, missingDeps: seq[HistoryEntry], channelId: SdsChannelID ) {.gcsafe.} = - callEventCallback(ctx, "onMissingDependencies"): - $JsonMissingDependenciesEvent.new(messageId, missingDeps, channelId) + {.cast(gcsafe).}: + let deps = missingDeps.mapIt( + SdsMissingDep(messageId: $it.messageId, retrievalHint: it.retrievalHint) + ) + emitMissingDependencies( + SdsMissingDependenciesPayload( + messageId: $messageId, channelId: $channelId, missingDeps: deps + ) + ) -proc onPeriodicSync(ctx: ptr FFIContext[ReliabilityManager]): PeriodicSyncCallback = - return proc() {.gcsafe.} = - callEventCallback(ctx, "onPeriodicSync"): - $JsonPeriodicSyncEvent.new() + let periodicSyncCb = proc() {.gcsafe.} = + {.cast(gcsafe).}: + emitPeriodicSync(SdsPeriodicSyncPayload(placeholder: false)) -proc onRepairReady(ctx: ptr FFIContext[ReliabilityManager]): RepairReadyCallback = - return proc(message: seq[byte], channelId: SdsChannelID) {.gcsafe.} = - callEventCallback(ctx, "onRepairReady"): - $JsonRepairReadyEvent.new(message, channelId) + let repairReadyCb = proc(message: seq[byte], channelId: SdsChannelID) {.gcsafe.} = + {.cast(gcsafe).}: + emitRepairReady(SdsRepairReadyPayload(message: message, channelId: $channelId)) -proc onRetrievalHint(ctx: ptr FFIContext[ReliabilityManager]): RetrievalHintProvider = - return proc(messageId: SdsMessageID): seq[byte] {.gcsafe.} = - let (cb, userData) = getRetrievalHint(cast[pointer](ctx)) - if cb.isNil(): + let retrievalHintProvider = proc(messageId: SdsMessageID): seq[byte] {.gcsafe.} = + if sdsRetrievalHintCb.isNil(): return @[] - var hint: cstring var hintLen: csize_t - cast[SdsRetrievalHintProvider](cb)( - messageId.cstring, addr hint, addr hintLen, userData + cast[SdsRetrievalHintProvider](sdsRetrievalHintCb)( + messageId.cstring, addr hint, addr hintLen, sdsRetrievalHintUserData ) - if not hint.isNil() and hintLen > 0: var hintBytes = newSeq[byte](hintLen) copyMem(addr hintBytes[0], hint, hintLen) deallocShared(hint) return hintBytes - return @[] + await rm.setCallbacks( + messageReadyCb, messageSentCb, missingDependenciesCb, periodicSyncCb, + retrievalHintProvider, repairReadyCb, + ) + + return ok(rm) + ################################################################################ -### Request handlers (executed on the FFI worker thread) +### Async methods — each runs its body on the worker thread. -registerReqFFI(SdsCreateRmReq, ctx: ptr FFIContext[ReliabilityManager]): - proc(): Future[Result[string, string]] {.async.} = - # TODO: thread `participantId` through SdsNewReliabilityManager FFI input - # and remove this hardcoded "". Empty id silently disables SDS-R; this is - # acceptable as a temporary FFI-only fallback until sds-go-bindings and - # logos-delivery's C-side caller are updated to supply the identity. - let rm = newReliabilityManager(participantId = "".SdsParticipantID).valueOr: - error "Failed creating reliability manager", error = error - return err("Failed creating reliability manager: " & $error) - - await rm.setCallbacks( - onMessageReady(ctx), onMessageSent(ctx), onMissingDependencies(ctx), - onPeriodicSync(ctx), onRetrievalHint(ctx), onRepairReady(ctx), +proc sdsWrapOutgoingMessage*( + rm: ReliabilityManager, req: SdsWrapRequest +): Future[Result[SdsWrapResponse, string]] {.ffi.} = + let wrapped = ( + await wrapOutgoingMessage( + rm, req.message, req.messageId.SdsMessageID, req.channelId.SdsChannelID ) + ).valueOr: + error "WRAP_MESSAGE failed", error = error + return err("error processing wrap request: " & $error) + return ok(SdsWrapResponse(message: wrapped)) - ctx.myLib[] = rm - return ok("") +proc sdsUnwrapReceivedMessage*( + rm: ReliabilityManager, req: SdsUnwrapRequest +): Future[Result[SdsUnwrapResponse, string]] {.ffi.} = + let (unwrapped, missingDeps, channelId) = ( + await unwrapReceivedMessage(rm, req.message) + ).valueOr: + return err("error processing unwrap request: " & $error) -registerReqFFI(SdsResetRmReq, ctx: ptr FFIContext[ReliabilityManager]): - proc(): Future[Result[string, string]] {.async.} = - (await resetReliabilityManager(ctx.myLib[])).isOkOr: - error "RESET_RELIABILITY_MANAGER failed", error = error - return err("error processing RESET_RELIABILITY_MANAGER request: " & $error) - return ok("") - -registerReqFFI(SdsStartPeriodicTasksReq, ctx: ptr FFIContext[ReliabilityManager]): - proc(): Future[Result[string, string]] {.async.} = - ctx.myLib[].startPeriodicTasks() - return ok("") - -registerReqFFI(SdsWrapMessageReq, ctx: ptr FFIContext[ReliabilityManager]): - proc( - message: SdsSharedBytes, messageId: cstring, channelId: cstring - ): Future[Result[string, string]] {.async.} = - var msg = message - defer: - deallocSharedSeq(msg) - - let wrappedMessage = ( - await wrapOutgoingMessage(ctx.myLib[], message.toSeq(), $messageId, $channelId) - ).valueOr: - error "WRAP_MESSAGE failed", error = error - return err("error processing WRAP_MESSAGE request: " & $error) - - # returns a comma-separated string of bytes - return ok(wrappedMessage.mapIt($it).join(",")) - -registerReqFFI(SdsUnwrapMessageReq, ctx: ptr FFIContext[ReliabilityManager]): - proc(message: SdsSharedBytes): Future[Result[string, string]] {.async.} = - var msg = message - defer: - deallocSharedSeq(msg) - - let (unwrappedMessage, missingDeps, extractedChannelId) = ( - await unwrapReceivedMessage(ctx.myLib[], message.toSeq()) - ).valueOr: - return err("error processing UNWRAP_MESSAGE request: " & $error) - - # return the result as a json string - var node = newJObject() - node["message"] = %*unwrappedMessage - node["channelId"] = %*extractedChannelId - var missingDepsNode = newJArray() - for dep in missingDeps: - var depNode = newJObject() - depNode["messageId"] = %*dep.messageId - depNode["retrievalHint"] = %*encode(dep.retrievalHint) - missingDepsNode.add(depNode) - node["missingDeps"] = missingDepsNode - return ok($node) - -registerReqFFI(SdsMarkDepsReq, ctx: ptr FFIContext[ReliabilityManager]): - proc( - messageIds: SdsSharedCstrs, channelId: cstring - ): Future[Result[string, string]] {.async.} = - var ids = messageIds - defer: - freeSharedSeqCstr(ids) - - let messageIdSeq = ids.toSeq().mapIt($it) - (await markDependenciesMet(ctx.myLib[], messageIdSeq, $channelId)).isOkOr: - error "MARK_DEPENDENCIES_MET failed", error = error - return err("error processing MARK_DEPENDENCIES_MET request: " & $error) - return ok("") - -################################################################################ -### Dispatch helper -### -### Sends a request to the FFI worker thread and returns RET_OK/RET_ERR, -### reporting any failure through the callback. The try/except keeps the -### exported entry points `raises: []` (sendRequestToFFIThread can raise), -### which `processReq` alone would not guarantee. - -template dispatchReq( - ctx: untyped, callback: FFICallBack, userData: pointer, reqExpr: untyped -) = - let sendRes = - try: - ffi_context.sendRequestToFFIThread(ctx, reqExpr) - except Exception as exc: - Result[void, string].err("sendRequestToFFIThread exception: " & exc.msg) - if sendRes.isErr(): - let m = "libsds error: " & sendRes.error - callback(RET_ERR, unsafeAddr m[0], cast[csize_t](m.len), userData) - return RET_ERR - return RET_OK - -################################################################################ -### Exported C entry points (called from the application thread) -### -### Signatures must match library/libsds.h exactly. Each one validates the -### context against the pool (rejecting nil/dangling pointers at the boundary), -### checks the callback, deep-copies any pointer payloads into shared memory, -### then dispatches a request to the FFI worker thread. - -proc SdsNewReliabilityManager( - callback: FFICallBack, userData: pointer -): pointer {.dynlib, exportc, cdecl, raises: [].} = - initializeLibrary() - - if isNil(callback): - echo "error: missing callback in SdsNewReliabilityManager" - return nil - - let ctx = ReliabilityManagerFFIPool.createFFIContext().valueOr: - let msg = "Error creating SDS FFI context: " & $error - callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](msg.len), userData) - return nil - - let sendRes = - try: - ffi_context.sendRequestToFFIThread(ctx, SdsCreateRmReq.ffiNewReq(callback, userData)) - except Exception as exc: - Result[void, string].err("sendRequestToFFIThread exception: " & exc.msg) - if sendRes.isErr(): - let msg = "error creating reliability manager: " & sendRes.error - callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](msg.len), userData) - discard ReliabilityManagerFFIPool.destroyFFIContext(ctx) - return nil - - return cast[pointer](ctx) - -proc SdsSetEventCallback( - ctx: ptr FFIContext[ReliabilityManager], callback: FFICallBack, userData: pointer -) {.dynlib, exportc, cdecl, raises: [].} = - initializeLibrary() - if not ReliabilityManagerFFIPool.isValidCtx(cast[pointer](ctx)): - echo "error: invalid context in SdsSetEventCallback" - return - ctx[].callbackState.callback = cast[pointer](callback) - ctx[].callbackState.userData = userData - -proc SdsSetRetrievalHintProvider( - ctx: ptr FFIContext[ReliabilityManager], - callback: SdsRetrievalHintProvider, - userData: pointer, -) {.dynlib, exportc, cdecl, raises: [].} = - initializeLibrary() - if not ReliabilityManagerFFIPool.isValidCtx(cast[pointer](ctx)): - echo "error: invalid context in SdsSetRetrievalHintProvider" - return - setRetrievalHint(cast[pointer](ctx), cast[pointer](callback), userData) - -proc SdsCleanupReliabilityManager( - ctx: ptr FFIContext[ReliabilityManager], callback: FFICallBack, userData: pointer -): cint {.dynlib, exportc, cdecl, raises: [].} = - initializeLibrary() - if not ReliabilityManagerFFIPool.isValidCtx(cast[pointer](ctx)): - return RET_ERR - if isNil(callback): - return RET_MISSING_CALLBACK - - clearRetrievalHint(cast[pointer](ctx)) - - let res = ReliabilityManagerFFIPool.destroyFFIContext(ctx) - if res.isErr(): - let msg = "error cleaning up reliability manager: " & res.error - callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](msg.len), userData) - return RET_ERR - - callback(RET_OK, nil, 0, userData) - return RET_OK - -proc SdsResetReliabilityManager( - ctx: ptr FFIContext[ReliabilityManager], callback: FFICallBack, userData: pointer -): cint {.dynlib, exportc, cdecl, raises: [].} = - initializeLibrary() - if not ReliabilityManagerFFIPool.isValidCtx(cast[pointer](ctx)): - return RET_ERR - if isNil(callback): - return RET_MISSING_CALLBACK - dispatchReq(ctx, callback, userData, SdsResetRmReq.ffiNewReq(callback, userData)) - -proc SdsWrapOutgoingMessage( - ctx: ptr FFIContext[ReliabilityManager], - message: pointer, - messageLen: csize_t, - messageId: cstring, - channelId: cstring, - callback: FFICallBack, - userData: pointer, -): cint {.dynlib, exportc, cdecl, raises: [].} = - initializeLibrary() - if not ReliabilityManagerFFIPool.isValidCtx(cast[pointer](ctx)): - return RET_ERR - if isNil(callback): - return RET_MISSING_CALLBACK - - if message == nil and messageLen > 0: - let msg = "libsds error: message pointer is NULL but length > 0" - callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](msg.len), userData) - return RET_ERR - - if messageId == nil: - let msg = "libsds error: message ID pointer is NULL" - callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](msg.len), userData) - return RET_ERR - - if channelId == nil: - let msg = "libsds error: channel ID pointer is NULL" - callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](msg.len), userData) - return RET_ERR - - if $channelId == "": - let msg = "libsds error: channel ID is empty string" - callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](msg.len), userData) - return RET_ERR - - let sharedMsg = copyToSharedSeqByte(message, messageLen.int) - dispatchReq( - ctx, callback, userData, - SdsWrapMessageReq.ffiNewReq(callback, userData, sharedMsg, messageId, channelId), + let deps = missingDeps.mapIt( + SdsMissingDep(messageId: $it.messageId, retrievalHint: it.retrievalHint) + ) + return ok( + SdsUnwrapResponse(message: unwrapped, channelId: $channelId, missingDeps: deps) ) -proc SdsUnwrapReceivedMessage( - ctx: ptr FFIContext[ReliabilityManager], - message: pointer, - messageLen: csize_t, - callback: FFICallBack, - userData: pointer, -): cint {.dynlib, exportc, cdecl, raises: [].} = - initializeLibrary() - if not ReliabilityManagerFFIPool.isValidCtx(cast[pointer](ctx)): - return RET_ERR - if isNil(callback): - return RET_MISSING_CALLBACK +proc sdsMarkDependenciesMet*( + rm: ReliabilityManager, req: SdsMarkDependenciesRequest +): Future[Result[string, string]] {.ffi.} = + let messageIds = req.messageIds.mapIt(it.SdsMessageID) + (await markDependenciesMet(rm, messageIds, req.channelId.SdsChannelID)).isOkOr: + error "MARK_DEPENDENCIES_MET failed", error = error + return err("error processing mark-dependencies request: " & $error) + return ok("") - if message == nil and messageLen > 0: - let msg = "libsds error: message pointer is NULL but length > 0" - callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](msg.len), userData) - return RET_ERR +proc sdsReset*(rm: ReliabilityManager): Future[Result[string, string]] {.ffi.} = + (await resetReliabilityManager(rm)).isOkOr: + error "RESET failed", error = error + return err("error processing reset request: " & $error) + return ok("") - let sharedMsg = copyToSharedSeqByte(message, messageLen.int) - dispatchReq(ctx, callback, userData, SdsUnwrapMessageReq.ffiNewReq(callback, userData, sharedMsg)) +proc sdsStartPeriodicTasks*( + rm: ReliabilityManager +): Future[Result[string, string]] {.ffi.} = + # The empty await forces the macro down its async path so the body runs on the + # worker thread — startPeriodicTasks schedules futures on that thread's loop. + await sleepAsync(chronos.milliseconds(0)) + rm.startPeriodicTasks() + return ok("") -proc SdsMarkDependenciesMet( - ctx: ptr FFIContext[ReliabilityManager], - messageIds: pointer, - count: csize_t, - channelId: cstring, - callback: FFICallBack, - userData: pointer, -): cint {.dynlib, exportc, cdecl, raises: [].} = - initializeLibrary() - if not ReliabilityManagerFFIPool.isValidCtx(cast[pointer](ctx)): - return RET_ERR - if isNil(callback): - return RET_MISSING_CALLBACK +################################################################################ +### Destructor — runs library cleanup then tears down the FFI context. - if messageIds == nil and count > 0: - let msg = "libsds error: MessageIDs pointer is NULL but count > 0" - callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](msg.len), userData) - return RET_ERR +proc sdsDestroy*(rm: ReliabilityManager) {.ffiDtor.} = + discard - if channelId == nil: - let msg = "libsds error: channel ID pointer is NULL" - callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](msg.len), userData) - return RET_ERR +################################################################################ +### Retrieval-hint provider. +### +### The provider is a C function pointer, which has no CBOR representation, so +### it is passed as integer addresses. The body runs on the worker thread (the +### empty await forces the async path) and stores the pointers in the +### thread-local that sdsCreate's hint closure reads. The caller passes the +### function pointer and user-data as uint64 addresses. - if $channelId == "": - let msg = "libsds error: channel ID is empty string" - callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](msg.len), userData) - return RET_ERR +type SdsHintProviderRequest* {.ffi.} = object + callbackAddr: uint64 + userDataAddr: uint64 - let sharedIds = copyToSharedSeqCstr(messageIds, count.int) - dispatchReq( - ctx, callback, userData, - SdsMarkDepsReq.ffiNewReq(callback, userData, sharedIds, channelId), - ) +proc sdsSetRetrievalHintProvider*( + rm: ReliabilityManager, req: SdsHintProviderRequest +): Future[Result[string, string]] {.ffi.} = + discard rm + await sleepAsync(chronos.milliseconds(0)) + sdsRetrievalHintCb = cast[pointer](req.callbackAddr) + sdsRetrievalHintUserData = cast[pointer](req.userDataAddr) + return ok("") -proc SdsStartPeriodicTasks( - ctx: ptr FFIContext[ReliabilityManager], callback: FFICallBack, userData: pointer -): cint {.dynlib, exportc, cdecl, raises: [].} = - initializeLibrary() - if not ReliabilityManagerFFIPool.isValidCtx(cast[pointer](ctx)): - return RET_ERR - if isNil(callback): - return RET_MISSING_CALLBACK - dispatchReq(ctx, callback, userData, SdsStartPeriodicTasksReq.ffiNewReq(callback, userData)) +# Emit binding metadata (no-op unless -d:ffiGenBindings). Must follow every +# {.ffi.}/{.ffiCtor.}/{.ffiDtor.}/{.ffiEvent.} annotation. +genBindings() diff --git a/nimble.lock b/nimble.lock index ad7d9e3..a014fc3 100644 --- a/nimble.lock +++ b/nimble.lock @@ -315,17 +315,32 @@ } }, "ffi": { - "version": "0.1.4", - "vcsRevision": "fb25f069d2dfae2b543d79d2c1a81f197de22a2b", + "version": "0.2.0", + "vcsRevision": "f96a5b158add9c321e33ff804f2275a5314a501b", "url": "https://github.com/logos-messaging/nim-ffi", "downloadMethod": "git", "dependencies": [ "chronos", "chronicles", - "taskpools" + "taskpools", + "cbor_serialization" ], "checksums": { - "sha1": "4a5d4020a40106fa2a698d5fe975b9a8ba961f91" + "sha1": "46fad75cc79b7ae6eabfadb8fc3e68764dde89a5" + } + }, + "cbor_serialization": { + "version": "0.3.0", + "vcsRevision": "1664160e04d153573373afddc552b9cbf6fbe4dc", + "url": "https://github.com/vacp2p/nim-cbor-serialization", + "downloadMethod": "git", + "dependencies": [ + "serialization", + "stew", + "results" + ], + "checksums": { + "sha1": "ab126eae09a6e39c72972a6a0b83cb06a2ffe8f0" } } }, diff --git a/nix/deps.nix b/nix/deps.nix index a73c829..f312dd6 100644 --- a/nix/deps.nix +++ b/nix/deps.nix @@ -166,8 +166,15 @@ ffi = pkgs.fetchgit { url = "https://github.com/logos-messaging/nim-ffi"; - rev = "fb25f069d2dfae2b543d79d2c1a81f197de22a2b"; - sha256 = "0zkjnrm2yjlw27q99kv2x8ll61mbz4nr0cvmyq0csydh43c08k0p"; + rev = "f96a5b158add9c321e33ff804f2275a5314a501b"; + sha256 = "1hcv1k3c18rhg2nrndld2b6xx23nfnlcfkm0bidqha4by4hzn9lr"; + fetchSubmodules = true; + }; + + cbor_serialization = pkgs.fetchgit { + url = "https://github.com/vacp2p/nim-cbor-serialization"; + rev = "1664160e04d153573373afddc552b9cbf6fbe4dc"; + sha256 = "0c1rj4fk0fcqvsf0yqhxvm8h10aww75gi4yfsjhlczh88ypywii2"; fetchSubmodules = true; }; diff --git a/sds.nimble b/sds.nimble index 25f65b4..d0a7b9a 100644 --- a/sds.nimble +++ b/sds.nimble @@ -16,7 +16,7 @@ requires "stew" requires "stint" requires "metrics" requires "results" -requires "https://github.com/logos-messaging/nim-ffi >= 0.1.4" +requires "https://github.com/logos-messaging/nim-ffi#f96a5b158add9c321e33ff804f2275a5314a501b" proc buildLibrary( outLibNameAndExt: string,