From 7725beb5e5f7fab54a50cdadf28526d6d490e85d Mon Sep 17 00:00:00 2001 From: Ivan FB <128452529+Ivansete-status@users.noreply.github.com> Date: Wed, 3 Jun 2026 09:46:52 +0200 Subject: [PATCH] start using nim-ffi 0.1.4 (#74) --- library/alloc.nim | 73 --- library/ffi_types.nim | 34 - library/libsds.nim | 597 ++++++++++-------- .../requests/sds_dependencies_request.nim | 52 -- .../requests/sds_lifecycle_request.nim | 70 -- .../requests/sds_message_request.nim | 89 --- .../sds_thread_request.nim | 78 --- library/sds_thread/sds_thread.nim | 136 ---- nimble.lock | 14 + nix/deps.nix | 7 + sds.nimble | 2 +- 11 files changed, 359 insertions(+), 793 deletions(-) delete mode 100644 library/alloc.nim delete mode 100644 library/ffi_types.nim delete mode 100644 library/sds_thread/inter_thread_communication/requests/sds_dependencies_request.nim delete mode 100644 library/sds_thread/inter_thread_communication/requests/sds_lifecycle_request.nim delete mode 100644 library/sds_thread/inter_thread_communication/requests/sds_message_request.nim delete mode 100644 library/sds_thread/inter_thread_communication/sds_thread_request.nim delete mode 100644 library/sds_thread/sds_thread.nim diff --git a/library/alloc.nim b/library/alloc.nim deleted file mode 100644 index 746d4ef..0000000 --- a/library/alloc.nim +++ /dev/null @@ -1,73 +0,0 @@ -## Can be shared safely between threads -type SharedSeq*[T] = tuple[data: ptr UncheckedArray[T], len: int] - -proc alloc*(str: cstring): cstring = - # Byte allocation from the given address. - # There should be the corresponding manual deallocation with deallocShared ! - if str.isNil(): - var ret = cast[cstring](allocShared(1)) # Allocate memory for the null terminator - ret[0] = '\0' # Set the null terminator - return ret - - let ret = cast[cstring](allocShared(len(str) + 1)) - copyMem(ret, str, len(str) + 1) - return ret - -proc alloc*(str: string): cstring = - ## Byte allocation from the given address. - ## There should be the corresponding manual deallocation with deallocShared ! - var ret = cast[cstring](allocShared(str.len + 1)) - let s = cast[seq[char]](str) - for i in 0 ..< str.len: - ret[i] = s[i] - ret[str.len] = '\0' - return ret - -proc allocSharedSeq*[T](s: seq[T]): SharedSeq[T] = - let data = allocShared(sizeof(T) * s.len) - if s.len != 0: - copyMem(data, unsafeAddr s[0], s.len) - return (cast[ptr UncheckedArray[T]](data), s.len) - -proc deallocSharedSeq*[T](s: var SharedSeq[T]) = - if not s.data.isNil: - when T is cstring: - # For array of cstrings, deallocate each string first - for i in 0 ..< s.len: - if not s.data[i].isNil: - # Deallocate each cstring - deallocShared(s.data[i]) - - deallocShared(s.data) - s.len = 0 - -proc toSeq*[T](s: SharedSeq[T]): seq[T] = - ## Creates a seq[T] from a SharedSeq[T]. No explicit dealloc is required - ## as req[T] is a GC managed type. - var ret = newSeq[T]() - for i in 0 ..< s.len: - ret.add(s.data[i]) - return ret - -proc allocSharedSeqFromCArray*[T](arr: ptr T, len: int): SharedSeq[T] = - ## Creates a SharedSeq[T] from a C array pointer and length. - ## The data is copied to shared memory. - ## There should be a corresponding manual deallocation with deallocSharedSeq! - if arr.isNil or len <= 0: - return (nil, 0) - - when T is cstring: - # Special handling for arrays of cstrings - let data = cast[ptr UncheckedArray[cstring]](allocShared(sizeof(cstring) * len)) - let cstrArr = cast[ptr UncheckedArray[cstring]](arr) - - for i in 0 ..< len: - # Use the existing alloc proc to properly allocate each cstring - data[i] = cstrArr[i].alloc() - - return (data, len) - else: - # Original handling for non-cstring types - let data = allocShared(sizeof(T) * len) - copyMem(data, arr, sizeof(T) * len) - return (cast[ptr UncheckedArray[T]](data), len) diff --git a/library/ffi_types.nim b/library/ffi_types.nim deleted file mode 100644 index dbb531d..0000000 --- a/library/ffi_types.nim +++ /dev/null @@ -1,34 +0,0 @@ -################################################################################ -### Exported types - -type SdsCallBack* = proc( - callerRet: cint, msg: ptr cchar, len: csize_t, userData: pointer -) {.cdecl, gcsafe, raises: [].} - -type SdsRetrievalHintProvider* = proc( - messageId: cstring, hint: ptr cstring, hintLen: ptr csize_t, userData: pointer -) {.cdecl, gcsafe, raises: [].} - -const RET_OK*: cint = 0 -const RET_ERR*: cint = 1 -const RET_MISSING_CALLBACK*: cint = 2 - -### End of exported types -################################################################################ - -################################################################################ -### FFI utils - -template foreignThreadGc*(body: untyped) = - when declared(setupForeignThreadGc): - setupForeignThreadGc() - - body - - when declared(tearDownForeignThreadGc): - tearDownForeignThreadGc() - -type onDone* = proc() - -### End of FFI utils -################################################################################ diff --git a/library/libsds.nim b/library/libsds.nim index c5054bd..c7f41e9 100644 --- a/library/libsds.nim +++ b/library/libsds.nim @@ -1,140 +1,163 @@ -{.pragma: exported, exportc, cdecl, raises: [].} -{.pragma: callback, cdecl, raises: [], gcsafe.} -{.passc: "-fPIC".} +import std/[strutils, sequtils, json, base64, locks] +import ffi +import sds +import ./events/[ + json_message_ready_event, json_message_sent_event, json_missing_dependencies_event, + json_periodic_sync_event, json_repair_ready_event, +] -when defined(linux): - {.passl: "-Wl,-soname,libsds.so".} +# Emit the library bootstrap: the {.exported.}/{.callback.} pragmas, the +# `-fPIC`/soname linker flags, the `libsdsNimMain` import and the +# `initializeLibrary()` proc the exported entry points call on every hop. +declareLibraryBase("sds") -import std/[typetraits, tables, atomics, locks], chronos, chronicles -import - ./sds_thread/sds_thread, - ./alloc, - ./ffi_types, - ./sds_thread/inter_thread_communication/sds_thread_request, - ./sds_thread/inter_thread_communication/requests/ - [sds_lifecycle_request, sds_message_request, sds_dependencies_request], - sds, - ./events/[ - json_message_ready_event, json_message_sent_event, json_missing_dependencies_event, - json_periodic_sync_event, json_repair_ready_event, - ] +# C callback typedefs (mirrors libsds.h). `SdsCallBack` is structurally the +# nim-ffi `FFICallBack`; the alias keeps the exported signatures readable. +type SdsCallBack* = FFICallBack + +type SdsRetrievalHintProvider* = proc( + messageId: cstring, hint: ptr cstring, hintLen: ptr csize_t, userData: pointer +) {.cdecl, gcsafe, raises: [].} + +# One pool per library type; the macros that would normally declare it +# (ffiCtor/ffiDtor) are not used here because we hand-write the entry points +# to preserve the exact C ABI, so we declare it explicitly. +var ReliabilityManagerFFIPool: FFIContextPool[ReliabilityManager] + +# registerReqFFI inspects each request field's type via `$node`, which only +# handles plain identifiers — a bracketed `SharedSeq[byte]` makes it choke. The +# aliases give the generated request structs non-bracketed field types. +type + SdsSharedBytes = SharedSeq[byte] + SdsSharedCstrs = SharedSeq[cstring] ################################################################################ -### Wrapper around the reliability manager -################################################################################ +### Retrieval-hint provider registry +### +### The retrieval-hint provider is a synchronous request/response callback +### (the C side returns bytes inline), so it does not fit the fire-and-forget +### event model. nim-ffi's FFIContext has no slot for it, so we keep a small +### per-context registry here. A fixed array of plain (non-GC) records keeps +### the lookup callable from the {.gcsafe.} hint closure running on the FFI +### thread. + +type RetrievalHintSlot = object + ctx: pointer + cb: pointer + userData: pointer + +var retrievalHintSlots: array[MaxFFIContexts, RetrievalHintSlot] +var retrievalHintsLock: Lock +retrievalHintsLock.initLock() + +proc setRetrievalHint(ctx: pointer, cb: pointer, userData: pointer) = + withLock retrievalHintsLock: + var free = -1 + for i in 0 ..< MaxFFIContexts: + if retrievalHintSlots[i].ctx == ctx: + retrievalHintSlots[i] = RetrievalHintSlot(ctx: ctx, cb: cb, userData: userData) + return + if free < 0 and retrievalHintSlots[i].ctx.isNil: + free = i + if free >= 0: + retrievalHintSlots[free] = RetrievalHintSlot(ctx: ctx, cb: cb, userData: userData) + +proc getRetrievalHint(ctx: pointer): tuple[cb: pointer, userData: pointer] {.gcsafe.} = + withLock retrievalHintsLock: + for i in 0 ..< MaxFFIContexts: + if retrievalHintSlots[i].ctx == ctx: + return (retrievalHintSlots[i].cb, retrievalHintSlots[i].userData) + return (nil, nil) + +proc clearRetrievalHint(ctx: pointer) = + withLock retrievalHintsLock: + for i in 0 ..< MaxFFIContexts: + if retrievalHintSlots[i].ctx == ctx: + retrievalHintSlots[i] = RetrievalHintSlot() + return ################################################################################ -### Not-exported components +### Shared-memory copy helpers +### +### Request payloads carrying binary/pointer data must be deep-copied into +### shared memory on the caller thread, because the FFI thread acks receipt +### before it reads the payload — the caller may free its buffer in between. +### cstring fields are deep-copied by the generated `ffiNewReq`; raw byte and +### `char**` arrays are not, so we copy them here. -template checkLibsdsParams*( - ctx: ptr SdsContext, callback: SdsCallBack, userData: pointer -) = - ctx[].userData = userData +proc copyToSharedSeqByte(p: pointer, len: int): SharedSeq[byte] = + if p.isNil or len <= 0: + return (cast[ptr UncheckedArray[byte]](nil), 0) + let data = allocShared(len) + copyMem(data, p, len) + return (cast[ptr UncheckedArray[byte]](data), len) - if isNil(callback): - return RET_MISSING_CALLBACK +proc copyToSharedSeqCstr(p: pointer, count: int): SharedSeq[cstring] = + if p.isNil or count <= 0: + return (cast[ptr UncheckedArray[cstring]](nil), 0) + let data = cast[ptr UncheckedArray[cstring]](allocShared(sizeof(cstring) * count)) + let src = cast[ptr UncheckedArray[cstring]](p) + for i in 0 ..< count: + data[i] = src[i].alloc() + return (data, count) -template callEventCallback(ctx: ptr SdsContext, eventName: string, body: untyped) = - if isNil(ctx[].eventCallback): - error eventName & " - eventCallback is nil" - return +proc freeSharedSeqCstr(s: var SharedSeq[cstring]) = + if not s.data.isNil(): + for i in 0 ..< s.len: + if not s.data[i].isNil: + deallocShared(s.data[i]) + deallocShared(s.data) + s.len = 0 - if isNil(ctx[].eventUserData): - error eventName & " - eventUserData is nil" - return +################################################################################ +### Event callbacks +### +### These build the AppCallbacks closures handed to the ReliabilityManager. +### They run on the FFI worker thread and forward JSON event payloads to the +### C callback registered via SdsSetEventCallback (stored on the context). - foreignThreadGc: - try: - let event = body - cast[SdsCallBack](ctx[].eventCallback)( - RET_OK, unsafeAddr event[0], cast[csize_t](len(event)), ctx[].eventUserData - ) - except Exception, CatchableError: - let msg = - "Exception " & eventName & " when calling 'eventCallBack': " & - getCurrentExceptionMsg() - cast[SdsCallBack](ctx[].eventCallback)( - RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), ctx[].eventUserData - ) - -var - ctxPool: seq[ptr SdsContext] - ctxPoolLock: Lock - -proc acquireCtx(callback: SdsCallBack, userData: pointer): ptr SdsContext = - ctxPoolLock.acquire() - defer: - ctxPoolLock.release() - if ctxPool.len > 0: - result = ctxPool.pop() - else: - result = sds_thread.createSdsThread().valueOr: - let msg = "Error in createSdsThread: " & $error - callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) - return nil - -proc releaseCtx(ctx: ptr SdsContext) = - ctxPoolLock.acquire() - defer: - ctxPoolLock.release() - ctx.userData = nil - ctx.eventCallback = nil - ctx.eventUserData = nil - ctxPool.add(ctx) - -proc handleRequest( - ctx: ptr SdsContext, - requestType: RequestType, - content: pointer, - callback: SdsCallBack, - userData: pointer, -): cint = - sds_thread.sendRequestToSdsThread(ctx, requestType, content, callback, userData).isOkOr: - let msg = "libsds error: " & $error - callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) - return RET_ERR - - return RET_OK - -proc onMessageReady(ctx: ptr SdsContext): MessageReadyCallback = +proc onMessageReady(ctx: ptr FFIContext[ReliabilityManager]): MessageReadyCallback = return proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} = callEventCallback(ctx, "onMessageReady"): $JsonMessageReadyEvent.new(messageId, channelId) -proc onMessageSent(ctx: ptr SdsContext): MessageSentCallback = +proc onMessageSent(ctx: ptr FFIContext[ReliabilityManager]): MessageSentCallback = return proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} = callEventCallback(ctx, "onMessageSent"): $JsonMessageSentEvent.new(messageId, channelId) -proc onMissingDependencies(ctx: ptr SdsContext): MissingDependenciesCallback = +proc onMissingDependencies( + ctx: ptr FFIContext[ReliabilityManager] +): MissingDependenciesCallback = return proc( messageId: SdsMessageID, missingDeps: seq[HistoryEntry], channelId: SdsChannelID ) {.gcsafe.} = callEventCallback(ctx, "onMissingDependencies"): $JsonMissingDependenciesEvent.new(messageId, missingDeps, channelId) -proc onPeriodicSync(ctx: ptr SdsContext): PeriodicSyncCallback = +proc onPeriodicSync(ctx: ptr FFIContext[ReliabilityManager]): PeriodicSyncCallback = return proc() {.gcsafe.} = callEventCallback(ctx, "onPeriodicSync"): $JsonPeriodicSyncEvent.new() -proc onRepairReady(ctx: ptr SdsContext): RepairReadyCallback = +proc onRepairReady(ctx: ptr FFIContext[ReliabilityManager]): RepairReadyCallback = return proc(message: seq[byte], channelId: SdsChannelID) {.gcsafe.} = callEventCallback(ctx, "onRepairReady"): $JsonRepairReadyEvent.new(message, channelId) -proc onRetrievalHint(ctx: ptr SdsContext): RetrievalHintProvider = +proc onRetrievalHint(ctx: ptr FFIContext[ReliabilityManager]): RetrievalHintProvider = return proc(messageId: SdsMessageID): seq[byte] {.gcsafe.} = - if isNil(ctx.retrievalHintProvider): + let (cb, userData) = getRetrievalHint(cast[pointer](ctx)) + if cb.isNil(): return @[] var hint: cstring var hintLen: csize_t - cast[SdsRetrievalHintProvider](ctx.retrievalHintProvider)( - messageId.cstring, addr hint, addr hintLen, ctx.retrievalHintUserData + cast[SdsRetrievalHintProvider](cb)( + messageId.cstring, addr hint, addr hintLen, userData ) - if not isNil(hint) and hintLen > 0: + if not hint.isNil() and hintLen > 0: var hintBytes = newSeq[byte](hintLen) copyMem(addr hintBytes[0], hint, hintLen) deallocShared(hint) @@ -142,251 +165,305 @@ proc onRetrievalHint(ctx: ptr SdsContext): RetrievalHintProvider = return @[] -### End of not-exported components ################################################################################ +### Request handlers (executed on the FFI worker thread) + +registerReqFFI(SdsCreateRmReq, ctx: ptr FFIContext[ReliabilityManager]): + proc(): Future[Result[string, string]] {.async.} = + # TODO: thread `participantId` through SdsNewReliabilityManager FFI input + # and remove this hardcoded "". Empty id silently disables SDS-R; this is + # acceptable as a temporary FFI-only fallback until sds-go-bindings and + # logos-delivery's C-side caller are updated to supply the identity. + let rm = newReliabilityManager(participantId = "".SdsParticipantID).valueOr: + error "Failed creating reliability manager", error = error + return err("Failed creating reliability manager: " & $error) + + await rm.setCallbacks( + onMessageReady(ctx), onMessageSent(ctx), onMissingDependencies(ctx), + onPeriodicSync(ctx), onRetrievalHint(ctx), onRepairReady(ctx), + ) + + ctx.myLib[] = rm + return ok("") + +registerReqFFI(SdsResetRmReq, ctx: ptr FFIContext[ReliabilityManager]): + proc(): Future[Result[string, string]] {.async.} = + (await resetReliabilityManager(ctx.myLib[])).isOkOr: + error "RESET_RELIABILITY_MANAGER failed", error = error + return err("error processing RESET_RELIABILITY_MANAGER request: " & $error) + return ok("") + +registerReqFFI(SdsStartPeriodicTasksReq, ctx: ptr FFIContext[ReliabilityManager]): + proc(): Future[Result[string, string]] {.async.} = + ctx.myLib[].startPeriodicTasks() + return ok("") + +registerReqFFI(SdsWrapMessageReq, ctx: ptr FFIContext[ReliabilityManager]): + proc( + message: SdsSharedBytes, messageId: cstring, channelId: cstring + ): Future[Result[string, string]] {.async.} = + var msg = message + defer: + deallocSharedSeq(msg) + + let wrappedMessage = ( + await wrapOutgoingMessage(ctx.myLib[], message.toSeq(), $messageId, $channelId) + ).valueOr: + error "WRAP_MESSAGE failed", error = error + return err("error processing WRAP_MESSAGE request: " & $error) + + # returns a comma-separated string of bytes + return ok(wrappedMessage.mapIt($it).join(",")) + +registerReqFFI(SdsUnwrapMessageReq, ctx: ptr FFIContext[ReliabilityManager]): + proc(message: SdsSharedBytes): Future[Result[string, string]] {.async.} = + var msg = message + defer: + deallocSharedSeq(msg) + + let (unwrappedMessage, missingDeps, extractedChannelId) = ( + await unwrapReceivedMessage(ctx.myLib[], message.toSeq()) + ).valueOr: + return err("error processing UNWRAP_MESSAGE request: " & $error) + + # return the result as a json string + var node = newJObject() + node["message"] = %*unwrappedMessage + node["channelId"] = %*extractedChannelId + var missingDepsNode = newJArray() + for dep in missingDeps: + var depNode = newJObject() + depNode["messageId"] = %*dep.messageId + depNode["retrievalHint"] = %*encode(dep.retrievalHint) + missingDepsNode.add(depNode) + node["missingDeps"] = missingDepsNode + return ok($node) + +registerReqFFI(SdsMarkDepsReq, ctx: ptr FFIContext[ReliabilityManager]): + proc( + messageIds: SdsSharedCstrs, channelId: cstring + ): Future[Result[string, string]] {.async.} = + var ids = messageIds + defer: + freeSharedSeqCstr(ids) + + let messageIdSeq = ids.toSeq().mapIt($it) + (await markDependenciesMet(ctx.myLib[], messageIdSeq, $channelId)).isOkOr: + error "MARK_DEPENDENCIES_MET failed", error = error + return err("error processing MARK_DEPENDENCIES_MET request: " & $error) + return ok("") ################################################################################ -### Library setup +### Dispatch helper +### +### Sends a request to the FFI worker thread and returns RET_OK/RET_ERR, +### reporting any failure through the callback. The try/except keeps the +### exported entry points `raises: []` (sendRequestToFFIThread can raise), +### which `processReq` alone would not guarantee. -# Every Nim library must have this function called - the name is derived from -# the `--nimMainPrefix` command line option -proc libsdsNimMain() {.importc.} - -# To control when the library has been initialized -var initialized: Atomic[bool] - -if defined(android): - # Redirect chronicles to Android System logs - when compiles(defaultChroniclesStream.outputs[0].writer): - defaultChroniclesStream.outputs[0].writer = proc( - logLevel: LogLevel, msg: LogOutputStr - ) {.raises: [].} = - echo logLevel, msg - -proc initializeLibrary() {.exported.} = - if not initialized.exchange(true): - ## Every Nim library needs to call `NimMain` once exactly, to initialize the Nim runtime. - ## Being `` the value given in the optional compilation flag --nimMainPrefix:yourprefix - libsdsNimMain() - ctxPoolLock.initLock() # ensure the lock is initialized once (fix Windows crash) - when declared(setupForeignThreadGc): - setupForeignThreadGc() - when declared(nimGC_setStackBottom): - var locals {.volatile, noinit.}: pointer - locals = addr(locals) - nimGC_setStackBottom(locals) - -### End of library setup -################################################################################ +template dispatchReq( + ctx: untyped, callback: FFICallBack, userData: pointer, reqExpr: untyped +) = + let sendRes = + try: + ffi_context.sendRequestToFFIThread(ctx, reqExpr) + except Exception as exc: + Result[void, string].err("sendRequestToFFIThread exception: " & exc.msg) + if sendRes.isErr(): + let m = "libsds error: " & sendRes.error + callback(RET_ERR, unsafeAddr m[0], cast[csize_t](m.len), userData) + return RET_ERR + return RET_OK ################################################################################ -### Exported procs +### Exported C entry points (called from the application thread) +### +### Signatures must match library/libsds.h exactly. Each one validates the +### context against the pool (rejecting nil/dangling pointers at the boundary), +### checks the callback, deep-copies any pointer payloads into shared memory, +### then dispatches a request to the FFI worker thread. proc SdsNewReliabilityManager( - callback: SdsCallBack, userData: pointer -): pointer {.dynlib, exportc, cdecl.} = + callback: FFICallBack, userData: pointer +): pointer {.dynlib, exportc, cdecl, raises: [].} = initializeLibrary() - ## Creates a new instance of the Reliability Manager. if isNil(callback): - echo "error: missing callback in NewReliabilityManager" + echo "error: missing callback in SdsNewReliabilityManager" return nil - ## Create or reuse the SDS thread that will keep waiting for req from the main thread. - var ctx = acquireCtx(callback, userData) - if ctx.isNil(): + let ctx = ReliabilityManagerFFIPool.createFFIContext().valueOr: + let msg = "Error creating SDS FFI context: " & $error + callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](msg.len), userData) return nil - ctx.userData = userData - - let appCallbacks = AppCallbacks( - messageReadyCb: onMessageReady(ctx), - messageSentCb: onMessageSent(ctx), - missingDependenciesCb: onMissingDependencies(ctx), - periodicSyncCb: onPeriodicSync(ctx), - retrievalHintProvider: onRetrievalHint(ctx), - repairReadyCb: onRepairReady(ctx), - ) - - let retCode = handleRequest( - ctx, - RequestType.LIFECYCLE, - SdsLifecycleRequest.createShared( - SdsLifecycleMsgType.CREATE_RELIABILITY_MANAGER, nil, appCallbacks - ), - callback, - userData, - ) - - if retCode == RET_ERR: + let sendRes = + try: + ffi_context.sendRequestToFFIThread(ctx, SdsCreateRmReq.ffiNewReq(callback, userData)) + except Exception as exc: + Result[void, string].err("sendRequestToFFIThread exception: " & exc.msg) + if sendRes.isErr(): + let msg = "error creating reliability manager: " & sendRes.error + callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](msg.len), userData) + discard ReliabilityManagerFFIPool.destroyFFIContext(ctx) return nil - return ctx + return cast[pointer](ctx) proc SdsSetEventCallback( - ctx: ptr SdsContext, callback: SdsCallBack, userData: pointer -) {.dynlib, exportc.} = + ctx: ptr FFIContext[ReliabilityManager], callback: FFICallBack, userData: pointer +) {.dynlib, exportc, cdecl, raises: [].} = initializeLibrary() - ctx[].eventCallback = cast[pointer](callback) - ctx[].eventUserData = userData + if not ReliabilityManagerFFIPool.isValidCtx(cast[pointer](ctx)): + echo "error: invalid context in SdsSetEventCallback" + return + ctx[].callbackState.callback = cast[pointer](callback) + ctx[].callbackState.userData = userData proc SdsSetRetrievalHintProvider( - ctx: ptr SdsContext, callback: SdsRetrievalHintProvider, userData: pointer -) {.dynlib, exportc.} = + ctx: ptr FFIContext[ReliabilityManager], + callback: SdsRetrievalHintProvider, + userData: pointer, +) {.dynlib, exportc, cdecl, raises: [].} = initializeLibrary() - ctx[].retrievalHintProvider = cast[pointer](callback) - ctx[].retrievalHintUserData = userData + if not ReliabilityManagerFFIPool.isValidCtx(cast[pointer](ctx)): + echo "error: invalid context in SdsSetRetrievalHintProvider" + return + setRetrievalHint(cast[pointer](ctx), cast[pointer](callback), userData) proc SdsCleanupReliabilityManager( - ctx: ptr SdsContext, callback: SdsCallBack, userData: pointer -): cint {.dynlib, exportc.} = + ctx: ptr FFIContext[ReliabilityManager], callback: FFICallBack, userData: pointer +): cint {.dynlib, exportc, cdecl, raises: [].} = initializeLibrary() - checkLibsdsParams(ctx, callback, userData) + if not ReliabilityManagerFFIPool.isValidCtx(cast[pointer](ctx)): + return RET_ERR + if isNil(callback): + return RET_MISSING_CALLBACK - let resetRes = handleRequest( - ctx, - RequestType.LIFECYCLE, - SdsLifecycleRequest.createShared(SdsLifecycleMsgType.RESET_RELIABILITY_MANAGER), - callback, - userData, - ) + clearRetrievalHint(cast[pointer](ctx)) - if resetRes == RET_ERR: + let res = ReliabilityManagerFFIPool.destroyFFIContext(ctx) + if res.isErr(): + let msg = "error cleaning up reliability manager: " & res.error + callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](msg.len), userData) return RET_ERR - releaseCtx(ctx) - - # handleRequest already invoked the callback; nothing else to signal here. + callback(RET_OK, nil, 0, userData) return RET_OK proc SdsResetReliabilityManager( - ctx: ptr SdsContext, callback: SdsCallBack, userData: pointer -): cint {.dynlib, exportc.} = + ctx: ptr FFIContext[ReliabilityManager], callback: FFICallBack, userData: pointer +): cint {.dynlib, exportc, cdecl, raises: [].} = initializeLibrary() - checkLibsdsParams(ctx, callback, userData) - handleRequest( - ctx, - RequestType.LIFECYCLE, - SdsLifecycleRequest.createShared(SdsLifecycleMsgType.RESET_RELIABILITY_MANAGER), - callback, - userData, - ) + if not ReliabilityManagerFFIPool.isValidCtx(cast[pointer](ctx)): + return RET_ERR + if isNil(callback): + return RET_MISSING_CALLBACK + dispatchReq(ctx, callback, userData, SdsResetRmReq.ffiNewReq(callback, userData)) proc SdsWrapOutgoingMessage( - ctx: ptr SdsContext, + ctx: ptr FFIContext[ReliabilityManager], message: pointer, messageLen: csize_t, messageId: cstring, channelId: cstring, - callback: SdsCallBack, + callback: FFICallBack, userData: pointer, -): cint {.dynlib, exportc.} = +): cint {.dynlib, exportc, cdecl, raises: [].} = initializeLibrary() - checkLibsdsParams(ctx, callback, userData) + if not ReliabilityManagerFFIPool.isValidCtx(cast[pointer](ctx)): + return RET_ERR + if isNil(callback): + return RET_MISSING_CALLBACK if message == nil and messageLen > 0: - let msg = "libsds error: " & "message pointer is NULL but length > 0" - callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) + let msg = "libsds error: message pointer is NULL but length > 0" + callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](msg.len), userData) return RET_ERR if messageId == nil: - let msg = "libsds error: " & "message ID pointer is NULL" - callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) + let msg = "libsds error: message ID pointer is NULL" + callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](msg.len), userData) return RET_ERR if channelId == nil: - let msg = "libsds error: " & "channel ID pointer is NULL" - callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) + let msg = "libsds error: channel ID pointer is NULL" + callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](msg.len), userData) return RET_ERR - if channelId != nil and $channelId == "": - let msg = "libsds error: " & "channel ID is empty string" - callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) + if $channelId == "": + let msg = "libsds error: channel ID is empty string" + callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](msg.len), userData) return RET_ERR - handleRequest( - ctx, - RequestType.MESSAGE, - SdsMessageRequest.createShared( - SdsMessageMsgType.WRAP_MESSAGE, message, messageLen, messageId, channelId - ), - callback, - userData, + let sharedMsg = copyToSharedSeqByte(message, messageLen.int) + dispatchReq( + ctx, callback, userData, + SdsWrapMessageReq.ffiNewReq(callback, userData, sharedMsg, messageId, channelId), ) proc SdsUnwrapReceivedMessage( - ctx: ptr SdsContext, + ctx: ptr FFIContext[ReliabilityManager], message: pointer, messageLen: csize_t, - callback: SdsCallBack, + callback: FFICallBack, userData: pointer, -): cint {.dynlib, exportc.} = +): cint {.dynlib, exportc, cdecl, raises: [].} = initializeLibrary() - checkLibsdsParams(ctx, callback, userData) + if not ReliabilityManagerFFIPool.isValidCtx(cast[pointer](ctx)): + return RET_ERR + if isNil(callback): + return RET_MISSING_CALLBACK if message == nil and messageLen > 0: - let msg = "libsds error: " & "message pointer is NULL but length > 0" - callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) + let msg = "libsds error: message pointer is NULL but length > 0" + callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](msg.len), userData) return RET_ERR - handleRequest( - ctx, - RequestType.MESSAGE, - SdsMessageRequest.createShared( - SdsMessageMsgType.UNWRAP_MESSAGE, message, messageLen - ), - callback, - userData, - ) + let sharedMsg = copyToSharedSeqByte(message, messageLen.int) + dispatchReq(ctx, callback, userData, SdsUnwrapMessageReq.ffiNewReq(callback, userData, sharedMsg)) proc SdsMarkDependenciesMet( - ctx: ptr SdsContext, + ctx: ptr FFIContext[ReliabilityManager], messageIds: pointer, count: csize_t, channelId: cstring, - callback: SdsCallBack, + callback: FFICallBack, userData: pointer, -): cint {.dynlib, exportc.} = +): cint {.dynlib, exportc, cdecl, raises: [].} = initializeLibrary() - checkLibsdsParams(ctx, callback, userData) + if not ReliabilityManagerFFIPool.isValidCtx(cast[pointer](ctx)): + return RET_ERR + if isNil(callback): + return RET_MISSING_CALLBACK if messageIds == nil and count > 0: - let msg = "libsds error: " & "MessageIDs pointer is NULL but count > 0" - callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) + let msg = "libsds error: MessageIDs pointer is NULL but count > 0" + callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](msg.len), userData) return RET_ERR if channelId == nil: - let msg = "libsds error: " & "channel ID pointer is NULL" - callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) + let msg = "libsds error: channel ID pointer is NULL" + callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](msg.len), userData) return RET_ERR - if channelId != nil and $channelId == "": - let msg = "libsds error: " & "channel ID is empty string" - callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) + if $channelId == "": + let msg = "libsds error: channel ID is empty string" + callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](msg.len), userData) return RET_ERR - handleRequest( - ctx, - RequestType.DEPENDENCIES, - SdsDependenciesRequest.createShared( - SdsDependenciesMsgType.MARK_DEPENDENCIES_MET, messageIds, count, channelId - ), - callback, - userData, + let sharedIds = copyToSharedSeqCstr(messageIds, count.int) + dispatchReq( + ctx, callback, userData, + SdsMarkDepsReq.ffiNewReq(callback, userData, sharedIds, channelId), ) proc SdsStartPeriodicTasks( - ctx: ptr SdsContext, callback: SdsCallBack, userData: pointer -): cint {.dynlib, exportc.} = + ctx: ptr FFIContext[ReliabilityManager], callback: FFICallBack, userData: pointer +): cint {.dynlib, exportc, cdecl, raises: [].} = initializeLibrary() - checkLibsdsParams(ctx, callback, userData) - handleRequest( - ctx, - RequestType.LIFECYCLE, - SdsLifecycleRequest.createShared(SdsLifecycleMsgType.START_PERIODIC_TASKS), - callback, - userData, - ) - -### End of exported procs -################################################################################ + if not ReliabilityManagerFFIPool.isValidCtx(cast[pointer](ctx)): + return RET_ERR + if isNil(callback): + return RET_MISSING_CALLBACK + dispatchReq(ctx, callback, userData, SdsStartPeriodicTasksReq.ffiNewReq(callback, userData)) diff --git a/library/sds_thread/inter_thread_communication/requests/sds_dependencies_request.nim b/library/sds_thread/inter_thread_communication/requests/sds_dependencies_request.nim deleted file mode 100644 index d3f73ff..0000000 --- a/library/sds_thread/inter_thread_communication/requests/sds_dependencies_request.nim +++ /dev/null @@ -1,52 +0,0 @@ -import std/[json, strutils, net, sequtils] -import chronos, chronicles, results - -import library/alloc -import sds - -type SdsDependenciesMsgType* = enum - MARK_DEPENDENCIES_MET - -type SdsDependenciesRequest* = object - operation: SdsDependenciesMsgType - messageIds: SharedSeq[cstring] - count: csize_t - channelId: cstring - -proc createShared*( - T: type SdsDependenciesRequest, - op: SdsDependenciesMsgType, - messageIds: pointer, - count: csize_t = 0, - channelId: cstring = "", -): ptr type T = - var ret = createShared(T) - ret[].operation = op - ret[].count = count - ret[].channelId = channelId.alloc() - ret[].messageIds = allocSharedSeqFromCArray(cast[ptr cstring](messageIds), count.int) - return ret - -proc destroyShared(self: ptr SdsDependenciesRequest) = - deallocSharedSeq(self[].messageIds) - deallocShared(self[].channelId) - deallocShared(self) - -proc process*( - self: ptr SdsDependenciesRequest, rm: ptr ReliabilityManager -): Future[Result[string, string]] {.async.} = - defer: - destroyShared(self) - - case self.operation - of MARK_DEPENDENCIES_MET: - let messageIdsC = self.messageIds.toSeq() - let messageIds = messageIdsC.mapIt($it) - - (await markDependenciesMet(rm[], messageIds, $self.channelId)).isOkOr: - error "MARK_DEPENDENCIES_MET failed", error = error - return err("error processing MARK_DEPENDENCIES_MET request: " & $error) - - return ok("") - - return ok("") diff --git a/library/sds_thread/inter_thread_communication/requests/sds_lifecycle_request.nim b/library/sds_thread/inter_thread_communication/requests/sds_lifecycle_request.nim deleted file mode 100644 index a5befc5..0000000 --- a/library/sds_thread/inter_thread_communication/requests/sds_lifecycle_request.nim +++ /dev/null @@ -1,70 +0,0 @@ -import std/json -import chronos, chronicles, results - -import library/alloc -import sds - -type SdsLifecycleMsgType* = enum - CREATE_RELIABILITY_MANAGER - RESET_RELIABILITY_MANAGER - START_PERIODIC_TASKS - -type SdsLifecycleRequest* = object - operation: SdsLifecycleMsgType - channelId: cstring - appCallbacks: AppCallbacks - -proc createShared*( - T: type SdsLifecycleRequest, - op: SdsLifecycleMsgType, - channelId: cstring = "", - appCallbacks: AppCallbacks = nil, -): ptr type T = - var ret = createShared(T) - ret[].operation = op - ret[].appCallbacks = appCallbacks - ret[].channelId = channelId.alloc() - return ret - -proc destroyShared(self: ptr SdsLifecycleRequest) = - deallocShared(self[].channelId) - deallocShared(self) - -proc createReliabilityManager( - appCallbacks: AppCallbacks = nil -): Future[Result[ReliabilityManager, string]] {.async.} = - # TODO: thread `participantId` through SdsNewReliabilityManager FFI input - # and remove this hardcoded "". Empty id silently disables SDS-R; this is - # acceptable as a temporary FFI-only fallback until sds-go-bindings and - # logos-delivery's C-side caller are updated to supply the identity. - let rm = newReliabilityManager(participantId = "".SdsParticipantID).valueOr: - error "Failed creating reliability manager", error = error - return err("Failed creating reliability manager: " & $error) - - await rm.setCallbacks( - appCallbacks.messageReadyCb, appCallbacks.messageSentCb, - appCallbacks.missingDependenciesCb, appCallbacks.periodicSyncCb, - appCallbacks.retrievalHintProvider, appCallbacks.repairReadyCb, - ) - - return ok(rm) - -proc process*( - self: ptr SdsLifecycleRequest, rm: ptr ReliabilityManager -): Future[Result[string, string]] {.async.} = - defer: - destroyShared(self) - - case self.operation - of CREATE_RELIABILITY_MANAGER: - rm[] = (await createReliabilityManager(self.appCallbacks)).valueOr: - error "CREATE_RELIABILITY_MANAGER failed", error = error - return err("error processing CREATE_RELIABILITY_MANAGER request: " & $error) - of RESET_RELIABILITY_MANAGER: - (await resetReliabilityManager(rm[])).isOkOr: - error "RESET_RELIABILITY_MANAGER failed", error = error - return err("error processing RESET_RELIABILITY_MANAGER request: " & $error) - of START_PERIODIC_TASKS: - rm[].startPeriodicTasks() - - return ok("") diff --git a/library/sds_thread/inter_thread_communication/requests/sds_message_request.nim b/library/sds_thread/inter_thread_communication/requests/sds_message_request.nim deleted file mode 100644 index 380e2db..0000000 --- a/library/sds_thread/inter_thread_communication/requests/sds_message_request.nim +++ /dev/null @@ -1,89 +0,0 @@ -import std/[json, strutils, net, sequtils, base64] -import chronos, chronicles, results - -import library/alloc -import sds - -type SdsMessageMsgType* = enum - WRAP_MESSAGE - UNWRAP_MESSAGE - -type SdsMessageRequest* = object - operation: SdsMessageMsgType - message: SharedSeq[byte] - messageLen: csize_t - messageId: cstring - channelId: cstring - -type SdsUnwrapResponse* = object - message*: seq[byte] - missingDeps*: seq[HistoryEntry] - channelId*: string - -proc createShared*( - T: type SdsMessageRequest, - op: SdsMessageMsgType, - message: pointer, - messageLen: csize_t = 0, - messageId: cstring = "", - channelId: cstring = "", -): ptr type T = - var ret = createShared(T) - ret[].operation = op - ret[].messageLen = messageLen - ret[].messageId = messageId.alloc() - ret[].channelId = channelId.alloc() - ret[].message = allocSharedSeqFromCArray(cast[ptr byte](message), messageLen.int) - - return ret - -proc destroyShared(self: ptr SdsMessageRequest) = - deallocSharedSeq(self[].message) - deallocShared(self[].messageId) - deallocShared(self[].channelId) - deallocShared(self) - -proc process*( - self: ptr SdsMessageRequest, rm: ptr ReliabilityManager -): Future[Result[string, string]] {.async.} = - defer: - destroyShared(self) - - case self.operation - of WRAP_MESSAGE: - let messageBytes = self.message.toSeq() - - let wrappedMessage = ( - await wrapOutgoingMessage(rm[], messageBytes, $self.messageId, $self.channelId) - ).valueOr: - error "WRAP_MESSAGE failed", error = error - return err("error processing WRAP_MESSAGE request: " & $error) - - # returns a comma-separates string of bytes - return ok(wrappedMessage.mapIt($it).join(",")) - of UNWRAP_MESSAGE: - let messageBytes = self.message.toSeq() - - let (unwrappedMessage, missingDeps, extractedChannelId) = ( - await unwrapReceivedMessage(rm[], messageBytes) - ).valueOr: - return err("error processing UNWRAP_MESSAGE request: " & $error) - - let res = SdsUnwrapResponse( - message: unwrappedMessage, missingDeps: missingDeps, channelId: extractedChannelId - ) - - # return the result as a json string - var node = newJObject() - node["message"] = %*res.message - node["channelId"] = %*extractedChannelId - var missingDepsNode = newJArray() - for dep in res.missingDeps: - var depNode = newJObject() - depNode["messageId"] = %*dep.messageId - depNode["retrievalHint"] = %*encode(dep.retrievalHint) - missingDepsNode.add(depNode) - node["missingDeps"] = missingDepsNode - return ok($node) - - return ok("") diff --git a/library/sds_thread/inter_thread_communication/sds_thread_request.nim b/library/sds_thread/inter_thread_communication/sds_thread_request.nim deleted file mode 100644 index 56229a7..0000000 --- a/library/sds_thread/inter_thread_communication/sds_thread_request.nim +++ /dev/null @@ -1,78 +0,0 @@ -## This file contains the base message request type that will be handled. -## The requests are created by the main thread and processed by -## the SDS Thread. - -import std/json, results -import chronos, chronos/threadsync -import - ../../ffi_types, - ./requests/[sds_lifecycle_request, sds_message_request, sds_dependencies_request], - sds/sds_utils - -type RequestType* {.pure.} = enum - LIFECYCLE - MESSAGE - DEPENDENCIES - -type SdsThreadRequest* = object - reqType: RequestType - reqContent: pointer - callback: SdsCallBack - userData: pointer - -proc createShared*( - T: type SdsThreadRequest, - reqType: RequestType, - reqContent: pointer, - callback: SdsCallBack, - userData: pointer, -): ptr type T = - var ret = createShared(T) - ret[].reqType = reqType - ret[].reqContent = reqContent - ret[].callback = callback - ret[].userData = userData - return ret - -proc handleRes[T: string | void]( - res: Result[T, string], request: ptr SdsThreadRequest -) = - ## Handles the Result responses, which can either be Result[string, string] or - ## Result[void, string]. - - defer: - deallocShared(request) - - if res.isErr(): - foreignThreadGc: - let msg = "libsds error: handleRes fireSyncRes error: " & $res.error - request[].callback( - RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), request[].userData - ) - return - - foreignThreadGc: - var msg: cstring = "" - when T is string: - msg = res.get().cstring() - request[].callback( - RET_OK, unsafeAddr msg[0], cast[csize_t](len(msg)), request[].userData - ) - return - -proc process*( - T: type SdsThreadRequest, request: ptr SdsThreadRequest, rm: ptr ReliabilityManager -) {.async.} = - let retFut = - case request[].reqType - of LIFECYCLE: - cast[ptr SdsLifecycleRequest](request[].reqContent).process(rm) - of MESSAGE: - cast[ptr SdsMessageRequest](request[].reqContent).process(rm) - of DEPENDENCIES: - cast[ptr SdsDependenciesRequest](request[].reqContent).process(rm) - - handleRes(await retFut, request) - -proc `$`*(self: SdsThreadRequest): string = - return $self.reqType diff --git a/library/sds_thread/sds_thread.nim b/library/sds_thread/sds_thread.nim deleted file mode 100644 index efcc42d..0000000 --- a/library/sds_thread/sds_thread.nim +++ /dev/null @@ -1,136 +0,0 @@ -{.pragma: exported, exportc, cdecl, raises: [].} -{.pragma: callback, cdecl, raises: [], gcsafe.} -{.passc: "-fPIC".} - -import std/[options, atomics, os, net, locks] -import chronicles, chronos, chronos/threadsync, taskpools/channels_spsc_single, results -import ../ffi_types, ./inter_thread_communication/sds_thread_request, sds/sds_utils - -type SdsContext* = object - thread: Thread[(ptr SdsContext)] - lock: Lock - reqChannel: ChannelSPSCSingle[ptr SdsThreadRequest] - reqSignal: ThreadSignalPtr - # to inform The SDS Thread (a.k.a TST) that a new request is sent - reqReceivedSignal: ThreadSignalPtr - # to inform the main thread that the request is rx by TST - userData*: pointer - eventCallback*: pointer - eventUserdata*: pointer - retrievalHintProvider*: pointer - retrievalHintUserData*: pointer - running: Atomic[bool] # To control when the thread is running - -proc runSds(ctx: ptr SdsContext) {.async.} = - ## This is the worker body. This runs the SDS instance - ## and attends library user requests (stop, connect_to, etc.) - - var rm: ReliabilityManager - - while true: - await ctx.reqSignal.wait() - - if ctx.running.load == false: - break - - ## Trying to get a request from the libsds requestor thread - var request: ptr SdsThreadRequest - let recvOk = ctx.reqChannel.tryRecv(request) - if not recvOk: - error "sds thread could not receive a request" - continue - - ## Ack receipt to the requester thread BEFORE processing — it only - ## waits for "received", not "processed", so the caller's throughput - ## doesn't change. Processing is then awaited (was: asyncSpawn'd), - ## which serializes requests on this worker. The SP channel + lock - ## above already assume no concurrent requests, so awaiting here - ## aligns the processing side with that assumption. - let fireRes = ctx.reqReceivedSignal.fireSync() - if fireRes.isErr(): - error "could not fireSync back to requester thread", error = fireRes.error - - await SdsThreadRequest.process(request, addr rm) - -proc run(ctx: ptr SdsContext) {.thread.} = - ## Launch sds worker - waitFor runSds(ctx) - -proc createSdsThread*(): Result[ptr SdsContext, string] = - ## This proc is called from the main thread and it creates - ## the SDS working thread. - var ctx = createShared(SdsContext, 1) - ctx.reqSignal = ThreadSignalPtr.new().valueOr: - return err("couldn't create reqSignal ThreadSignalPtr") - ctx.reqReceivedSignal = ThreadSignalPtr.new().valueOr: - return err("couldn't create reqReceivedSignal ThreadSignalPtr") - ctx.lock.initLock() - - ctx.running.store(true) - - try: - createThread(ctx.thread, run, ctx) - except ValueError, ResourceExhaustedError: - # and freeShared for typed allocations! - freeShared(ctx) - - return err("failed to create the SDS thread: " & getCurrentExceptionMsg()) - - return ok(ctx) - -proc destroySdsThread*(ctx: ptr SdsContext): Result[void, string] = - ctx.running.store(false) - - let signaledOnTime = ctx.reqSignal.fireSync().valueOr: - return err("error in destroySdsThread: " & $error) - if not signaledOnTime: - return err("failed to signal reqSignal on time in destroySdsThread") - - joinThread(ctx.thread) - ctx.lock.deinitLock() - ?ctx.reqSignal.close() - ?ctx.reqReceivedSignal.close() - freeShared(ctx) - - return ok() - -proc sendRequestToSdsThread*( - ctx: ptr SdsContext, - reqType: RequestType, - reqContent: pointer, - callback: SdsCallBack, - userData: pointer, -): Result[void, string] = - let req = SdsThreadRequest.createShared(reqType, reqContent, callback, userData) - - # This lock is only necessary while we use a SP Channel and while the signalling - # between threads assumes that there aren't concurrent requests. - # Rearchitecting the signaling + migrating to a MP Channel will allow us to receive - # requests concurrently and spare us the need of locks - ctx.lock.acquire() - defer: - ctx.lock.release() - ## Sending the request - let sentOk = ctx.reqChannel.trySend(req) - if not sentOk: - deallocShared(req) - return err("Couldn't send a request to the sds thread: " & $req[]) - - let fireSyncRes = ctx.reqSignal.fireSync() - if fireSyncRes.isErr(): - deallocShared(req) - return err("failed fireSync: " & $fireSyncRes.error) - - if fireSyncRes.get() == false: - deallocShared(req) - return err("Couldn't fireSync in time") - - ## wait until the SDS Thread properly received the request - let res = ctx.reqReceivedSignal.waitSync() - if res.isErr(): - deallocShared(req) - return err("Couldn't receive reqReceivedSignal signal") - - ## Notice that in case of "ok", the deallocShared(req) is performed by the SDS Thread in the - ## process proc. - ok() diff --git a/nimble.lock b/nimble.lock index 5b84e8b..ad7d9e3 100644 --- a/nimble.lock +++ b/nimble.lock @@ -313,6 +313,20 @@ "checksums": { "sha1": "09e1b2fdad55b973724d61227971afc0df0b7a81" } + }, + "ffi": { + "version": "0.1.4", + "vcsRevision": "fb25f069d2dfae2b543d79d2c1a81f197de22a2b", + "url": "https://github.com/logos-messaging/nim-ffi", + "downloadMethod": "git", + "dependencies": [ + "chronos", + "chronicles", + "taskpools" + ], + "checksums": { + "sha1": "4a5d4020a40106fa2a698d5fe975b9a8ba961f91" + } } }, "tasks": {} diff --git a/nix/deps.nix b/nix/deps.nix index 062e9a9..a73c829 100644 --- a/nix/deps.nix +++ b/nix/deps.nix @@ -164,4 +164,11 @@ fetchSubmodules = true; }; + ffi = pkgs.fetchgit { + url = "https://github.com/logos-messaging/nim-ffi"; + rev = "fb25f069d2dfae2b543d79d2c1a81f197de22a2b"; + sha256 = "0zkjnrm2yjlw27q99kv2x8ll61mbz4nr0cvmyq0csydh43c08k0p"; + fetchSubmodules = true; + }; + } diff --git a/sds.nimble b/sds.nimble index 7c5727a..25f65b4 100644 --- a/sds.nimble +++ b/sds.nimble @@ -16,7 +16,7 @@ requires "stew" requires "stint" requires "metrics" requires "results" -requires "taskpools >= 0.1.0" ## This should be removed when using nim-ffi dependency +requires "https://github.com/logos-messaging/nim-ffi >= 0.1.4" proc buildLibrary( outLibNameAndExt: string,