diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 8c39cdf..2b55175 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -2,9 +2,9 @@ name: CI on: push: - branches: [master, main] + branches: [master, main, 'release/*'] pull_request: - branches: [master, main] + branches: [master, main, 'release/*'] jobs: # Single source of truth for Nim / Nimble versions used by every job and diff --git a/ffi/ffi_context.nim b/ffi/ffi_context.nim index 3fa50cf..48e5e1b 100644 --- a/ffi/ffi_context.nim +++ b/ffi/ffi_context.nim @@ -6,11 +6,25 @@ {.passc: "-fPIC".} -import std/[atomics, locks, options, tables] +import std/[atomics, locks, options, tables, sequtils] import chronicles, chronos, chronos/threadsync, taskpools/channels_spsc_single, results -import ./ffi_types, ./ffi_events, ./ffi_thread_request, ./logging, ./cbor_serial +import ./ffi_types, ./ffi_thread_request, ./internal/ffi_macro, ./logging -export ffi_events +type FFICallbackState* = object + ## Holds the C event callback and its associated user-data pointer. + ## Embedded in FFIContext and referenced from the FFI thread via a thread-local. + callback*: pointer + userData*: pointer + +type CtxLifecycle {.pure.} = enum + ## State machine guarding a pooled FFI context, held as an Atomic on FFIContext. + ## Transitions: + ## Active -> RecyclePending when ffiDtor is invoked + ## RecyclePending -> Recycling The process completed the in-flight processes and is ready for lib cleanup and release + ## Recycling -> Active When the FFI thread is ready again to attend to requests + Active ## accepting and serving requests + RecyclePending ## recycle requested; FFI thread loop hasn't claimed it yet + Recycling ## FFI loop draining handlers, then frees lib + returns to pool type FFIContext*[T] = object myLib*: ptr T # main library object (Waku, LibP2P, SDS, …) @@ -32,6 +46,15 @@ type FFIContext*[T] = object # advanced each FFI-thread loop; event thread reads for liveness eventQueueStuck*: Atomic[bool] # sticky overflow flag running: Atomic[bool] # To control when the threads are running + lifecycle: Atomic[CtxLifecycle] + recycleCallback: FFICallBack + # The destructor's callback, fired by the recycle handler with the outcome: + # RET_OK once drained, RET_ERR if it timed out. Set by requestRecycle. + recycleUserData: pointer + inUse: Atomic[bool] + # Whether the context is claimed. createFFIContext claims it (false -> true); the + # recycle handler clears it once drained. On the context so the owning thread can + # release it without reaching into the pool. registeredRequests: ptr Table[cstring, FFIRequestProc] var onFFIThread* {.threadvar.}: bool @@ -55,6 +78,285 @@ template closeAndNil(field: untyped) = proc deinitContextResources*[T](ctx: ptr FFIContext[T]): Result[void, string] = ## Mirror of `initContextResources`. Threads MUST be joined first; ## fields are nil'd after close so re-init on the same slot is safe. +template callEventCallback*(ctx: ptr FFIContext, eventName: string, body: untyped) = + if isNil(ctx[].callbackState.callback): + chronicles.error eventName & " - eventCallback is nil" + return + + foreignThreadGc: + try: + let event = body + cast[FFICallBack](ctx[].callbackState.callback)( + RET_OK, + unsafeAddr event[0], + cast[csize_t](len(event)), + ctx[].callbackState.userData, + ) + except Exception, CatchableError: + let msg = + "Exception " & eventName & " when calling 'eventCallBack': " & + getCurrentExceptionMsg() + cast[FFICallBack](ctx[].callbackState.callback)( + RET_ERR, + unsafeAddr msg[0], + cast[csize_t](len(msg)), + ctx[].callbackState.userData, + ) + +template dispatchFfiEvent*(eventName: string, body: untyped) = + ## Dispatches an FFI event to the callback registered via `{libName}_set_event_callback`. + ## `body` is evaluated lazily — only when a callback is registered. + ## Valid only on the FFI thread (i.e., inside {.ffi.} proc bodies and their async closures). + let ffiState = ffiCurrentCallbackState + if isNil(ffiState) or isNil(ffiState[].callback): + chronicles.error eventName & " - event callback not set" + return + foreignThreadGc: + try: + let event = body + cast[FFICallBack](ffiState[].callback)( + RET_OK, unsafeAddr event[0], cast[csize_t](len(event)), ffiState[].userData + ) + except Exception, CatchableError: + let msg = "Exception dispatching " & eventName & ": " & getCurrentExceptionMsg() + cast[FFICallBack](ffiState[].callback)( + RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), ffiState[].userData + ) + +proc sendRequestToFFIThread*( + ctx: ptr FFIContext, ffiRequest: ptr FFIThreadRequest, timeout = InfiniteDuration +): Result[void, string] = + ctx.lock.acquire() + defer: + ctx.lock.release() + + if ctx.lifecycle.load() != CtxLifecycle.Active: + deleteRequest(ffiRequest) + return err("FFI context is not accepting requests (being recycled)") + + ## Sending the request to the FFI thread + let sentOk = ctx.reqChannel.trySend(ffiRequest) + if not sentOk: + deleteRequest(ffiRequest) + return err("Couldn't send a request to the ffi thread") + + let fireSyncRes = ctx.reqSignal.fireSync() + if fireSyncRes.isErr(): + deleteRequest(ffiRequest) + return err("failed fireSync: " & $fireSyncRes.error) + + if fireSyncRes.get() == false: + deleteRequest(ffiRequest) + return err("Couldn't fireSync in time") + + ## wait until the FFI working thread properly received the request + let res = ctx.reqReceivedSignal.waitSync(timeout) + if res.isErr(): + return err("Couldn't receive reqReceivedSignal signal") + + return ok() + +type Foo = object +registerReqFFI(WatchdogReq, foo: ptr Foo): + proc(): Future[Result[string, string]] {.async.} = + return ok("FFI thread is not blocked") + +type JsonNotRespondingEvent = object + eventType: string + +proc init(T: type JsonNotRespondingEvent): T = + return JsonNotRespondingEvent(eventType: "not_responding") + +proc `$`(event: JsonNotRespondingEvent): string = + $(%*event) + +proc onNotResponding*(ctx: ptr FFIContext) = + callEventCallback(ctx, "onNotResponding"): + $JsonNotRespondingEvent.init() + +proc watchdogThreadBody(ctx: ptr FFIContext) {.thread.} = + ## Watchdog thread that monitors the FFI thread and notifies the library user if it hangs. + ## This thread never blocks. + + let watchdogRun = proc(ctx: ptr FFIContext) {.async.} = + const WatchdogStartDelay = 10.seconds + const WatchdogTimeinterval = 1.seconds + const WatchdogTimeout = 20.seconds + + # Give time for the node to be created and up before sending watchdog requests + let initialStop = await ctx.stopSignal.wait().withTimeout(WatchdogStartDelay) + if initialStop or ctx.running.load == false: + return + + while true: + let intervalStop = await ctx.stopSignal.wait().withTimeout(WatchdogTimeinterval) + + if intervalStop or ctx.running.load == false: + debug "Watchdog thread exiting because FFIContext is not running" + break + + if ctx.lifecycle.load() != CtxLifecycle.Active: + continue + + let callback = proc( + callerRet: cint, msg: ptr cchar, len: csize_t, userData: pointer + ) {.cdecl, gcsafe, raises: [].} = + discard ## Don't do anything. Just respecting the callback signature. + const nilUserData = nil + + trace "Sending watchdog request to FFI thread" + + try: + sendRequestToFFIThread( + ctx, WatchdogReq.ffiNewReq(callback, nilUserData), WatchdogTimeout + ).isOkOr: + error "Failed to send watchdog request to FFI thread", error = $error + onNotResponding(ctx) + except Exception as exc: + error "Exception sending watchdog request", exc = exc.msg + onNotResponding(ctx) + + waitFor watchdogRun(ctx) + +proc processRequest[T]( + request: ptr FFIThreadRequest, ctx: ptr FFIContext[T] +) {.async.} = + ## Invoked within the FFI thread to process a request coming from the FFI API consumer thread. + + let reqId = $request[].reqId + ## The reqId determines which proc will handle the request. + ## The registeredRequests represents a table defined at compile time. + ## Then, registeredRequests == Table[reqId, proc-handling-the-request-asynchronously] + + let reqIdCs = reqId.cstring + # keep `reqId` alive and avoid the implicit string→cstring warning. + + let retFut = + if not ctx[].registeredRequests[].contains(reqIdCs): + ## That shouldn't happen because only registered requests should be sent to the FFI thread. + nilProcess(request[].reqId) + else: + ctx[].registeredRequests[][reqIdCs](request[].reqContent, ctx) + + let res = + try: + await retFut + except CancelledError as exc: + Result[string, string].err("Request cancelled during destroy: " & exc.msg) + except AsyncError as exc: + Result[string, string].err( + "Async error in processRequest for " & reqId & ": " & exc.msg + ) + + ## handleRes may raise (OOM, GC setup) even though it is rare. + try: + handleRes(res, request) + except Exception as exc: + error "Unexpected exception in handleRes", error = exc.msg + +proc freeLib[T](ctx: ptr FFIContext[T]) {.gcsafe.} = + if ctx.myLib.isNil(): + return + + when not defined(gcRefc): + {.cast(gcsafe).}: + `=destroy`(ctx.myLib[]) + else: + discard + freeShared(ctx.myLib) + ctx.myLib = nil + +var RecycleTimeout* = 1500.milliseconds + ## Upper bound the recycle handler waits for in-flight handlers before it + ## cancels them and reports the ctx as stuck. The drain returns as soon as they + ## finish, so this only bounds a *stuck* handler. A `var` so tests can shorten it. + +proc recycleContext[T]( + ctx: ptr FFIContext[T], ongoingProcessReq: ptr seq[Future[void]] +) {.async.} = + ## Drain the in-flight handlers, free the lib object, release the context for reuse, + ## and fire the callback with the outcome. Never blocks the caller. + + ongoingProcessReq[].keepItIf(not it.finished()) + + ## 1. Let the in-flight handlers finish on their own, bounded by RecycleTimeout. + var naturallyDrained = ongoingProcessReq[].len == 0 + if not naturallyDrained: + naturallyDrained = await allFutures(ongoingProcessReq[]).withTimeout(RecycleTimeout) + + ## 2. If any are wedged, cancel them and give the cancellations a bounded moment + ## to unwind, so the context can be reclaimed rather than leaked. + var safeToRecycle = naturallyDrained + if not naturallyDrained: + for fut in ongoingProcessReq[]: + if not fut.finished(): + fut.cancelSoon() + safeToRecycle = await allFutures(ongoingProcessReq[]).withTimeout(RecycleTimeout) + + let cb = ctx.recycleCallback + let ud = ctx.recycleUserData + ctx.recycleCallback = nil + + if safeToRecycle: + freeLib(ctx) + ctx.callbackState = default(FFICallbackState) + ongoingProcessReq[].setLen(0) + ctx.release() + + if not cb.isNil(): + foreignThreadGc: + let msg = + if naturallyDrained: + "" + else: + "recycle: in-flight requests did not finish in time" + let cmsg = msg.cstring + let retCode = if naturallyDrained: RET_OK else: RET_ERR + cb(retCode, unsafeAddr cmsg[0], cast[csize_t](msg.len), ud) + +proc ffiThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} = + ## FFI thread body that attends library user API requests + ffiCurrentCallbackState = addr ctx[].callbackState + + logging.setupLog(logging.LogLevel.DEBUG, logging.LogFormat.TEXT) + + defer: + let fireRes = ctx.threadExitSignal.fireSync() + if fireRes.isErr(): + error "failed to fire threadExitSignal on FFI thread exit", err = fireRes.error + + let ffiRun = proc(ctx: ptr FFIContext[T]) {.async.} = + + var ongoingProcessReq: seq[Future[void]] + + while ctx.running.load(): + var expected = CtxLifecycle.RecyclePending + if ctx.lifecycle.compareExchange(expected, CtxLifecycle.Recycling): + await recycleContext(ctx, addr ongoingProcessReq) + continue + + let gotSignal = await ctx.reqSignal.wait().withTimeout(100.milliseconds) + if not gotSignal: + continue + + ## Wait for a request from the ffi consumer thread + var request: ptr FFIThreadRequest + if not ctx.reqChannel.tryRecv(request): + continue + + ongoingProcessReq.keepItIf(not it.finished()) + ongoingProcessReq.add(processRequest(request, ctx)) + + let fireRes = ctx.reqReceivedSignal.fireSync() + if fireRes.isErr(): + error "could not fireSync back to requester thread", error = fireRes.error + + waitFor ffiRun(ctx) + +proc cleanUpResources[T](ctx: ptr FFIContext[T]): Result[void, string] = + ## Full cleanup for heap-allocated contexts: closes all resources and frees memory. + defer: + freeShared(ctx) ctx.lock.deinitLock() deinitEventRegistry(ctx[].eventRegistry) deinitEventQueue(ctx[].eventQueue) @@ -115,6 +417,7 @@ proc initContextResources*[T](ctx: ptr FFIContext[T]): Result[void, string] = ctx.registeredRequests = addr ffi_types.registeredRequests + ctx.lifecycle.store(CtxLifecycle.Active) ctx.running.store(true) try: @@ -180,10 +483,41 @@ proc stopAndJoinThreads*[T](ctx: ptr FFIContext[T]): Result[void, string] = joinThread(ctx.eventThread) ok() -proc clearContext[T](ctx: ptr FFIContext[T]): Result[void, string] = - ## Stops a heap-allocated FFI context. - ctx.stopAndJoinThreads().isOkOr: - return err("clearContext: " & $error) - ctx.cleanUpResources().isOkOr: - return err("cleanUpResources failed: " & $error) - ok() +proc requestRecycle*[T]( + ctx: ptr FFIContext[T], callback: FFICallBack, userData: pointer +): Result[void, string] = + ## Starts ctx recycle process without stopping its worker, so the next + ## createFFIContext reuses the same threads and fds. + ## + ## During recycling, the FFI thread drains the handlers, frees the lib and releases + ## the context, then fires `callback` (RET_OK drained, RET_ERR stuck). + + ctx.lock.acquire() + if ctx.lifecycle.load() != CtxLifecycle.Active: + ctx.lock.release() + return err("requestRecycle: context is not Active (already recycling)") + + ctx.recycleCallback = callback + ctx.recycleUserData = userData + ctx.lifecycle.store(CtxLifecycle.RecyclePending) + ctx.lock.release() + + let fired = ctx.reqSignal.fireSync().valueOr: + return err("requestRecycle: failed to signal the FFI thread: " & $error) + if not fired: + return err("requestRecycle: failed to signal the FFI thread in time") + return ok() + +proc markAsActive*[T](ctx: ptr FFIContext[T]) = + ctx.lifecycle.store(CtxLifecycle.Active) + +proc tryClaim*[T](ctx: ptr FFIContext[T]): bool = + ## Returns true if acquired the contex, false if it was already claimed. + var expected = false + ctx.inUse.compareExchange(expected, true) + +proc release*[T](ctx: ptr FFIContext[T]) = + ctx.inUse.store(false) + +proc isInUse*[T](ctx: ptr FFIContext[T]): bool = + ctx.inUse.load() diff --git a/ffi/ffi_context_pool.nim b/ffi/ffi_context_pool.nim index 5e2d2cb..421a636 100644 --- a/ffi/ffi_context_pool.nim +++ b/ffi/ffi_context_pool.nim @@ -1,56 +1,60 @@ import std/atomics import results -import ./ffi_context +import ./ffi_context, ./ffi_types const MaxFFIContexts* = 32 - # Only affects upfront pool memory; fds/threads consumed per acquired slot. + ## Maximum number of concurrently live FFI contexts when using FFIContextPool. type FFIContextPool*[T] = object - ## Fixed pool. Bounds ThreadSignalPtr fds at MaxFFIContexts * 2. - slots: array[MaxFFIContexts, FFIContext[T]] - inUse: array[MaxFFIContexts, Atomic[bool]] - -proc acquireSlot[T](pool: var FFIContextPool[T]): Result[ptr FFIContext[T], string] = - for i in 0 ..< MaxFFIContexts: - var expected = false - if pool.inUse[i].compareExchange(expected, true): - return ok(pool.slots[i].addr) - err("FFI context pool exhausted (max " & $MaxFFIContexts & " contexts)") - -proc releaseSlot[T](pool: var FFIContextPool[T], ctx: ptr FFIContext[T]) = - for i in 0 ..< MaxFFIContexts: - if pool.slots[i].addr == ctx: - pool.inUse[i].store(false) - return + contexts: array[MaxFFIContexts, FFIContext[T]] + initialized: array[MaxFFIContexts, Atomic[bool]] proc createFFIContext*[T]( pool: var FFIContextPool[T] ): Result[ptr FFIContext[T], string] = - let ctx = pool.acquireSlot().valueOr: - return err("createFFIContext: acquireSlot failed: " & $error) - initContextResources(ctx).isOkOr: - pool.releaseSlot(ctx) - return err("createFFIContext: initContextResources failed: " & $error) - ok(ctx) + ## Acquires a context from the fixed pool. The context's worker is built once on + ## first use and reused on every later acquisition. + + for i in 0 ..< MaxFFIContexts: + let ctx = pool.contexts[i].addr + if not ctx.tryClaim(): + continue + if pool.initialized[i].load(): + ## Reused context: a prior destroy drained and released it, worker still alive. + ctx.markAsActive() + return ok(ctx) + initContextResources(ctx).isOkOr: + ctx.release() + return err("createFFIContext: initContextResources failed: " & $error) + pool.initialized[i].store(true) + return ok(ctx) + return err("FFI context pool exhausted (max " & $MaxFFIContexts & " contexts)") + +proc releaseFFIContext*[T]( + ctx: ptr FFIContext[T], callback: FFICallBack, userData: pointer +): Result[void, string] = + return ctx.requestRecycle(callback, userData) proc destroyFFIContext*[T]( pool: var FFIContextPool[T], ctx: ptr FFIContext[T] ): Result[void, string] = - ## On thread-exit timeout the slot is leaked — closing live-thread resources is unsafe. + ## Full teardown: stops/joins the worker threads and returns the context to the + ## pool, marking it uninitialised so a later createFFIContext rebuilds it. ctx.stopAndJoinThreads().isOkOr: return err("destroyFFIContext(pool): " & $error) - # Required: next acquisition would otherwise re-init a live lock (UB). - let deinitRes = ctx.deinitContextResources() - pool.releaseSlot(ctx) - deinitRes.isOkOr: - return err("destroyFFIContext(pool): " & $error) - ok() + for i in 0 ..< MaxFFIContexts: + if pool.contexts[i].addr == ctx: + pool.initialized[i].store(false) + break + ctx.release() + return ok() proc isValidCtx*[T](pool: var FFIContextPool[T], ctx: pointer): bool = - ## Rejects nil / offset-invalid / dangling pointers at the API boundary. + ## Returns true only if ctx points to one of the pool's contexts that is + ## currently in use. if ctx.isNil(): return false for i in 0 ..< MaxFFIContexts: - if cast[pointer](pool.slots[i].addr) == ctx: - return pool.inUse[i].load() - false + if cast[pointer](pool.contexts[i].addr) == ctx: + return cast[ptr FFIContext[T]](ctx).isInUse() + return false diff --git a/ffi/internal/ffi_macro.nim b/ffi/internal/ffi_macro.nim index b8e7be4..714abd0 100644 --- a/ffi/internal/ffi_macro.nim +++ b/ffi/internal/ffi_macro.nim @@ -1253,16 +1253,15 @@ macro ffiDtor*(prc: untyped): untyped = ## The body contains any library-level cleanup to run before context teardown. ## ## Example: - ## proc waku_destroy*(w: Waku) {.ffiDtor.} = - ## w.cleanup() + ## proc mylibobj_destroy*(obj: MyLibObj) {.ffiDtor.} = + ## obj.cleanup() ## ## The generated C-exported proc has the signature: - ## int waku_destroy(void* ctx) + ## cint mylibobj_destroy(void* ctx, FfiCallback callback, void* userData) ## - ## It extracts the library value from ctx, runs the body, then calls - ## destroyFFIContext to tear down the FFI thread and free the context. - ## Returns RET_OK on success, RET_ERR on failure (null/invalid ctx, or - ## destroyFFIContext failure). + ## Recycle the context for reuse to keep fd usage bounded. + ## NON-BLOCKING: returns RET_OK once accepted; + ## the real outcome arrives via `callback`. let procName = prc[0] let formalParams = prc[3] @@ -1271,8 +1270,8 @@ macro ffiDtor*(prc: untyped): untyped = if formalParams.len < 2: error("ffiDtor: proc must have exactly one parameter (w: LibType)") - let libParamName = formalParams[1][0] - let libTypeName = formalParams[1][1] + let libParamName = formalParams[1][0] # e.g. w + let libTypeName = formalParams[1][1] # e.g. MyLibObj let procNameStr = block: let raw = $procName @@ -1289,7 +1288,7 @@ macro ffiDtor*(prc: untyped): untyped = if procName.kind == nnkPostfix: cExportProcName = procName[1] - let destroyResIdent = genSym(nskLet, "destroyRes") + let releaseResIdent = genSym(nskLet, "destroyRes") let ffiBody = newStmtList() @@ -1314,12 +1313,14 @@ macro ffiDtor*(prc: untyped): untyped = let poolIdent = ident($libTypeName & "FFIPool") ffiBody.add quote do: - let `destroyResIdent` = - `poolIdent`.destroyFFIContext(cast[ptr FFIContext[`libTypeName`]](ctx)) - if `destroyResIdent`.isErr(): + let `releaseResIdent` = releaseFFIContext( + cast[ptr FFIContext[`libTypeName`]](ctx), callback, userData + ) + if `releaseResIdent`.isErr(): + if not callback.isNil(): + let errStr = "release failed: " & $`releaseResIdent`.error + callback(RET_ERR, unsafeAddr errStr[0], cast[csize_t](errStr.len), userData) return RET_ERR - - ffiBody.add quote do: return RET_OK let ffiProc = newProc( diff --git a/tests/test_ffi_context.nim b/tests/test_ffi_context.nim new file mode 100644 index 0000000..bf6989c --- /dev/null +++ b/tests/test_ffi_context.nim @@ -0,0 +1,892 @@ +import std/[locks, strutils, os, osproc, sequtils] +import unittest2 +import results +import ../ffi + +type TestLib = object + +## Per-request callback state. The test thread blocks on `cond` until the +## FFI thread signals it — no polling, no CPU waste. +type CallbackData = object + lock: Lock + cond: Cond + called: bool + retCode: cint + msg: array[512, char] + msgLen: int + +proc initCallbackData(d: var CallbackData) = + d.lock.initLock() + d.cond.initCond() + +proc deinitCallbackData(d: var CallbackData) = + d.cond.deinitCond() + d.lock.deinitLock() + +proc testCallback( + retCode: cint, msg: ptr cchar, len: csize_t, userData: pointer +) {.cdecl, gcsafe, raises: [].} = + let d = cast[ptr CallbackData](userData) + acquire(d[].lock) + d[].retCode = retCode + let n = min(int(len), d[].msg.high) + if n > 0 and not msg.isNil: + copyMem(addr d[].msg[0], msg, n) + d[].msg[n] = '\0' + d[].msgLen = n + d[].called = true + signal(d[].cond) + release(d[].lock) + +proc waitCallback(d: var CallbackData) = + acquire(d.lock) + while not d.called: + wait(d.cond, d.lock) + release(d.lock) + +proc callbackMsg(d: var CallbackData): string = + result = newString(d.msgLen) + if d.msgLen > 0: + copyMem(addr result[0], addr d.msg[0], d.msgLen) + +registerReqFFI(PingRequest, lib: ptr TestLib): + proc(message: cstring): Future[Result[string, string]] {.async.} = + return ok("pong:" & $message) + +registerReqFFI(FailRequest, lib: ptr TestLib): + proc(): Future[Result[string, string]] {.async.} = + return err("intentional failure") + +registerReqFFI(EmptyOkRequest, lib: ptr TestLib): + proc(): Future[Result[string, string]] {.async.} = + return ok("") + +registerReqFFI(SlowRequest, lib: ptr TestLib): + proc(): Future[Result[string, string]] {.async.} = + await sleepAsync(500.milliseconds) + return ok("slow-done") + +# Coordination channel: the FFI handler signals the test thread the instant +# it is about to block the event loop, so the test can call destroyFFIContext +# while the event loop is truly frozen. +var gSyncBlockStarted: Channel[bool] +gSyncBlockStarted.open() + +registerReqFFI(SyncBlockingRequest, lib: ptr TestLib): + proc(): Future[Result[string, string]] {.async.} = + # Yield first so that reqReceivedSignal fires and sendRequestToFFIThread + # returns on the calling thread before we start the synchronous block. + await sleepAsync(0.milliseconds) + # Signal the test thread: the event loop is about to be frozen. + # Channel.send is annotated as raising under refc, so wrap. + try: + gSyncBlockStarted.send(true) + except Exception as exc: + return err("gSyncBlockStarted.send raised: " & exc.msg) + # Simulates a request that blocks the event-loop thread synchronously + # (e.g. w.stop() -> switch.stop() -> connManager.close() with blocking I/O). + # Unlike sleepAsync, os.sleep holds the OS thread and prevents Chronos from + # processing any callbacks -- including the reqSignal fired by destroyFFIContext. + os.sleep(5_000) + return ok("sync-blocking-done") + +# Approximates the heavy ref-object workload that libwaku/libp2p performs on +# the FFI thread. The exact cell count is large enough to force several refc +# GC cycles; under refc this stresses the heap state that, when later combined +# with a chronos Selector allocation on the main thread (via close()), used to +# trip the rawNewObj → signal-handler infinite recursion. +type RefCell = ref object + next: RefCell + payload: array[64, byte] + +registerReqFFI(HeavyRefAllocRequest, lib: ptr TestLib): + proc(): Future[Result[string, string]] {.async.} = + var head: RefCell + for i in 0 ..< 50_000: + let n = RefCell(next: head) + head = n + if i mod 1000 == 0: + await sleepAsync(0.milliseconds) + # Break the chain iteratively before releasing head. + # ORC's =destroy for RefCell recurses through .next, so a 50k-node chain + # would produce ~50k nested =destroy calls and overflow the stack. + # Walking the list and unlinking each node first keeps destruction O(n) + # iterative instead of O(n) recursive. + var node = head + head = nil + while not node.isNil(): + let nxt = node.next + node.next = nil # unlink before the refcount of `node` can drop to zero + node = nxt + await sleepAsync(10.milliseconds) + return ok("heavy-done") + +suite "FFIContextPool": + test "create and destroy via pool succeeds": + var pool: FFIContextPool[TestLib] + let ctx = pool.createFFIContext().valueOr: + assert false, "createFFIContext(pool) failed: " & $error + return + check pool.destroyFFIContext(ctx).isOk() + + test "context is reused after destroy": + var pool: FFIContextPool[TestLib] + let ctx1 = pool.createFFIContext().valueOr: + assert false, "createFFIContext(pool) failed: " & $error + return + check pool.destroyFFIContext(ctx1).isOk() + # After destroying, the same context must be available again + let ctx2 = pool.createFFIContext().valueOr: + assert false, "createFFIContext(pool) failed after context release: " & $error + return + check pool.destroyFFIContext(ctx2).isOk() + check ctx1 == ctx2 # same context reused + + test "pool exhaustion returns error": + var pool: FFIContextPool[TestLib] + var ctxs: array[MaxFFIContexts, ptr FFIContext[TestLib]] + for i in 0 ..< MaxFFIContexts: + ctxs[i] = pool.createFFIContext().valueOr: + for j in 0 ..< i: + discard pool.destroyFFIContext(ctxs[j]) + assert false, "createFFIContext(pool) failed at context " & $i & ": " & $error + return + # Pool is now full — next create must fail + check pool.createFFIContext().isErr() + for i in 0 ..< MaxFFIContexts: + discard pool.destroyFFIContext(ctxs[i]) + + test "requests are processed via pool context": + var pool: FFIContextPool[TestLib] + var d: CallbackData + initCallbackData(d) + defer: + deinitCallbackData(d) + + let ctx = pool.createFFIContext().valueOr: + assert false, "createFFIContext(pool) failed: " & $error + return + defer: + discard pool.destroyFFIContext(ctx) + + check sendRequestToFFIThread( + ctx, PingRequest.ffiNewReq(testCallback, addr d, "pool".cstring) + ) + .isOk() + waitCallback(d) + check d.retCode == RET_OK + check callbackMsg(d) == "pong:pool" + +suite "createFFIContext / destroyFFIContext": + test "create and destroy succeeds": + var pool: FFIContextPool[TestLib] + let ctx = pool.createFFIContext().valueOr: + checkpoint "createFFIContext failed: " & $error + check false + return + check pool.destroyFFIContext(ctx).isOk() + + test "double destroy is safe via running flag": + var pool: FFIContextPool[TestLib] + let ctx = pool.createFFIContext().valueOr: + check false + return + check pool.destroyFFIContext(ctx).isOk() + +suite "destroyFFIContext does not hang": + test "destroy while a slow async request is still in-flight": + ## Reproduces the race where destroyFFIContext was called while a long- + ## running async request (e.g. stop_node / w.stop()) was still executing. + ## The destroy must return well within 2 seconds; before the fix it would + ## block forever on joinThread(ffiThread). + var pool: FFIContextPool[TestLib] + let ctx = pool.createFFIContext().valueOr: + check false + return + + var d: CallbackData + initCallbackData(d) + defer: deinitCallbackData(d) + + # sendRequestToFFIThread returns as soon as the FFI thread ACKs receipt; + # the 500 ms work continues asynchronously on the FFI thread. + check sendRequestToFFIThread( + ctx, SlowRequest.ffiNewReq(testCallback, addr d) + ).isOk() + + # Destroy immediately while SlowRequest is still running. + let t0 = Moment.now() + check pool.destroyFFIContext(ctx).isOk() + check (Moment.now() - t0) < 2.seconds + +suite "destroyFFIContext does not hang when event loop is blocked": + test "destroy while sync-blocking request is in-flight": + ## Reproduces the hang seen in logosdelivery_example.c: + ## logosdelivery_stop_node(...) -- triggers w.stop() on the FFI thread + ## sleep(1) + ## logosdelivery_destroy(...) -- hangs forever + ## + ## Root cause: w.stop() (and similar tear-down calls) can execute a + ## synchronous blocking section that holds the OS thread, preventing + ## the Chronos event loop from processing the reqSignal fired by + ## destroyFFIContext. The result is joinThread(ffiThread) never returns. + ## + ## With the fix, destroyFFIContext must complete well within the 5 s that + ## SyncBlockingRequest holds the event loop. + var pool: FFIContextPool[TestLib] + let ctx = pool.createFFIContext().valueOr: + check false + return + + # CallbackData and ctx are kept alive past destroyFFIContext: the leaked + # FFI thread is still inside os.sleep(5_000) and will eventually wake, + # run handleRes, fire testCallback, and exit normally. We wait for that + # to happen at the end of the test so the leaked thread cannot race with + # subsequent tests' createFFIContext on Linux/Windows. Heap allocation + # ensures the late callback's userData is still valid when it fires. + let d = createShared(CallbackData) + initCallbackData(d[]) + + check sendRequestToFFIThread( + ctx, SyncBlockingRequest.ffiNewReq(testCallback, d) + ).isOk() + + # Block until the FFI handler has signalled that os.sleep is about to start. + # This guarantees destroyFFIContext is called while the event loop is frozen. + discard gSyncBlockStarted.recv() + + # Destroy must return promptly even though the event loop is frozen for 5s. + # It deliberately returns err and leaks ctx in this scenario rather than + # hanging on joinThread. + let t0 = Moment.now() + check pool.destroyFFIContext(ctx).isErr() + check (Moment.now() - t0) < 3.seconds + + # Drain the leaked thread before the test scope ends. + # 1. waitCallback blocks until os.sleep(5_000) returns and handleRes + # invokes testCallback (~3.5s after destroy returned), which proves + # the leaked thread has reached the end of processRequest. + # 2. Yield briefly so the thread can finish iterating its while loop, + # fire threadExitSignal in its defer, and return. Without this, on + # Linux/Windows the still-live thread can race with the next test's + # createFFIContext under --mm:orc and segfault. + # ctx.cleanUpResources is intentionally NOT called: destroyFFIContext + # skipped it for a reason, and the signal fds are reclaimed by the OS + # at process exit. + waitCallback(d[]) + os.sleep(200) + deinitCallbackData(d[]) + freeShared(d) + +suite "destroyFFIContext refc workaround": + ## Documents the refc-specific workaround in cleanUpResources. + ## + ## Background: when the FFI thread does heavy ref-object work (the workload + ## that triggered the libwaku hang in production), the refc GC heap reaches + ## a state where the very first chronos Selector allocation on the *main* + ## thread — which happens lazily inside ThreadSignalPtr.close() through + ## getThreadDispatcher() — traps in rawNewObj. The refc signal handler + ## itself re-enters the same allocator and the process never returns. + ## Captured stack: + ## close → safeUnregisterAndCloseFd → getThreadDispatcher → + ## newDispatcher → Selector.new → newObj (gc.nim:488) → rawNewObj → + ## _sigtramp → signalHandler → newObjNoInit → addNewObjToZCT (loop) + ## + ## The workaround in cleanUpResources is `when defined(gcRefc): discard`, + ## i.e. skip the close() calls under refc only. orc is unaffected and + ## still cleans up the signal fds normally. + ## + ## NOTE: this test is documentation more than regression: a synthetic + ## ref-allocation workload of ~50k cells does NOT corrupt the refc heap + ## the way the real libwaku/libp2p teardown does, so this test passes + ## even when the workaround is disabled. Reproducing the actual hang + ## requires the full libwaku workload (logosdelivery_example.c). + ## Verification of the workaround was done end-to-end against that + ## example: with `--mm:refc` and close() enabled it hangs forever in + ## the captured stack above; with `when defined(gcRefc): discard` it + ## returns immediately. Under `--mm:orc` it returns immediately either + ## way. + test "destroy after heavy ref-allocation workload returns promptly": + var pool: FFIContextPool[TestLib] + let ctx = pool.createFFIContext().valueOr: + check false + return + + var d: CallbackData + initCallbackData(d) + defer: deinitCallbackData(d) + + check sendRequestToFFIThread( + ctx, HeavyRefAllocRequest.ffiNewReq(testCallback, addr d) + ).isOk() + waitCallback(d) + check d.retCode == RET_OK + + let t0 = Moment.now() + check pool.destroyFFIContext(ctx).isOk() + check (Moment.now() - t0) < 3.seconds + +suite "sendRequestToFFIThread": + test "successful request triggers RET_OK callback": + var d: CallbackData + initCallbackData(d) + defer: + deinitCallbackData(d) + + var pool: FFIContextPool[TestLib] + let ctx = pool.createFFIContext().valueOr: + check false + return + defer: + discard pool.destroyFFIContext(ctx) + + check sendRequestToFFIThread( + ctx, PingRequest.ffiNewReq(testCallback, addr d, "hello".cstring) + ) + .isOk() + waitCallback(d) + check d.retCode == RET_OK + check callbackMsg(d) == "pong:hello" + + test "failing request triggers RET_ERR callback": + var d: CallbackData + initCallbackData(d) + defer: + deinitCallbackData(d) + + var pool: FFIContextPool[TestLib] + let ctx = pool.createFFIContext().valueOr: + check false + return + defer: + discard pool.destroyFFIContext(ctx) + + check sendRequestToFFIThread(ctx, FailRequest.ffiNewReq(testCallback, addr d)).isOk() + waitCallback(d) + check d.retCode == RET_ERR + + test "empty ok response delivers empty message": + var d: CallbackData + initCallbackData(d) + defer: + deinitCallbackData(d) + + var pool: FFIContextPool[TestLib] + let ctx = pool.createFFIContext().valueOr: + check false + return + defer: + discard pool.destroyFFIContext(ctx) + + check sendRequestToFFIThread(ctx, EmptyOkRequest.ffiNewReq(testCallback, addr d)) + .isOk() + waitCallback(d) + check d.retCode == RET_OK + check d.msgLen == 0 + + test "sequential requests are all processed": + var pool: FFIContextPool[TestLib] + let ctx = pool.createFFIContext().valueOr: + check false + return + defer: + discard pool.destroyFFIContext(ctx) + + for i in 1 .. 5: + var d: CallbackData + initCallbackData(d) + let msg = "msg" & $i + check sendRequestToFFIThread( + ctx, PingRequest.ffiNewReq(testCallback, addr d, msg.cstring) + ) + .isOk() + waitCallback(d) + deinitCallbackData(d) + check d.retCode == RET_OK + check callbackMsg(d) == "pong:" & msg + +# --------------------------------------------------------------------------- +# ffiCtor macro integration test +# --------------------------------------------------------------------------- + +type SimpleLib = object + value: int + +ffiType: + type SimpleConfig = object + initialValue: int + +proc testlib_create*( + config: SimpleConfig +): Future[Result[SimpleLib, string]] {.ffiCtor.} = + return ok(SimpleLib(value: config.initialValue)) + +# Records the value of the library object the destructor body saw, so a test can +# confirm the user cleanup body ran with the right lib state before teardown. +var gDestroyedValue {.threadvar.}: int +proc testlib_destroy*(lib: SimpleLib) {.ffiDtor.} = + gDestroyedValue = lib.value + +suite "ffiCtor macro": + test "creates context and returns pointer via callback": + var d: CallbackData + initCallbackData(d) + defer: deinitCallbackData(d) + + let configJson = ffiSerialize(SimpleConfig(initialValue: 42)) + let ret = testlib_create(configJson.cstring, testCallback, addr d) + + check not ret.isNil() + + waitCallback(d) + + check d.retCode == RET_OK + + # The callback message is the ctx address as a decimal string + let addrStr = callbackMsg(d) + check addrStr.len > 0 + + let ctxAddr = cast[uint](parseBiggestUInt(addrStr)) + check ctxAddr != 0 + let ctx = cast[ptr FFIContext[SimpleLib]](ctxAddr) + + # Verify the library was properly initialized + check not ctx[].myLib.isNil + check ctx[].myLib[].value == 42 + + check SimpleLibFFIPool.destroyFFIContext(ctx).isOk() + +proc createSimpleLib(initialValue: int): ptr FFIContext[SimpleLib] = + ## Helper: run the generated async ctor and return the live ctx. + var d: CallbackData + initCallbackData(d) + defer: + deinitCallbackData(d) + let ret = + testlib_create(ffiSerialize(SimpleConfig(initialValue: initialValue)).cstring, + testCallback, addr d) + doAssert not ret.isNil() + waitCallback(d) + doAssert d.retCode == RET_OK + return cast[ptr FFIContext[SimpleLib]](cast[uint](parseBiggestUInt(callbackMsg(d)))) + +suite "ffiDtor macro (async destroy + reuse)": + test "destroy fires RET_OK after teardown, frees myLib, and frees the context": + let ctx = createSimpleLib(5) + check not ctx[].myLib.isNil + check ctx[].myLib[].value == 5 + + var dD: CallbackData + initCallbackData(dD) + defer: + deinitCallbackData(dD) + + # Async destroy: the C return is just "accepted"; the real outcome arrives + # via the callback once the FFI thread has finished tearing the lib down. + check testlib_destroy(cast[pointer](ctx), testCallback, addr dD) == RET_OK + waitCallback(dD) + check dD.retCode == RET_OK + check gDestroyedValue == 5 # the user cleanup body saw the live lib + check ctx[].myLib.isNil() # freed on the FFI thread + + # The context was freed from the FFI thread, so a fresh create reclaims it. + let ctx2 = createSimpleLib(9) + check ctx2 == ctx # same context, reused worker + fds + check ctx2[].myLib[].value == 9 + check SimpleLibFFIPool.destroyFFIContext(ctx2).isOk() + + test "destroy waits for an in-flight request before reporting RET_OK": + let ctx = createSimpleLib(1) + + # Dispatch a 500 ms handler and do NOT wait — it is in flight at destroy time. + var slow: CallbackData + initCallbackData(slow) + defer: + deinitCallbackData(slow) + check sendRequestToFFIThread(ctx, SlowRequest.ffiNewReq(testCallback, addr slow)).isOk() + + var dD: CallbackData + initCallbackData(dD) + defer: + deinitCallbackData(dD) + check testlib_destroy(cast[pointer](ctx), testCallback, addr dD) == RET_OK + waitCallback(dD) + check dD.retCode == RET_OK + # Drained: the in-flight handler ran to completion before destroy reported OK. + check slow.called + check callbackMsg(slow) == "slow-done" + + let ctx2 = createSimpleLib(2) + check ctx2 == ctx + check SimpleLibFFIPool.destroyFFIContext(ctx2).isOk() + + test "requests are rejected once a destroy closes the gate": + let ctx = createSimpleLib(3) + + var dD: CallbackData + initCallbackData(dD) + defer: + deinitCallbackData(dD) + check testlib_destroy(cast[pointer](ctx), testCallback, addr dD) == RET_OK + waitCallback(dD) + check dD.retCode == RET_OK + + # Gate stays closed until the context is reacquired: a late request must not + # dispatch onto a context about to be (or already) reused. + var d: CallbackData + initCallbackData(d) + defer: + deinitCallbackData(d) + check sendRequestToFFIThread( + ctx, SlowRequest.ffiNewReq(testCallback, addr d) + ).isErr() + + let ctx2 = createSimpleLib(4) + check ctx2 == ctx + check SimpleLibFFIPool.destroyFFIContext(ctx2).isOk() + + test "a stuck context is reported as RET_ERR rather than hanging": + let ctx = createSimpleLib(8) + + let savedTimeout = RecycleTimeout + RecycleTimeout = 150.milliseconds + defer: + RecycleTimeout = savedTimeout + + # In-flight handler outlasts the (shortened) drain timeout. + var slow: CallbackData + initCallbackData(slow) + defer: + deinitCallbackData(slow) + check sendRequestToFFIThread(ctx, SlowRequest.ffiNewReq(testCallback, addr slow)).isOk() + + var dD: CallbackData + initCallbackData(dD) + defer: + deinitCallbackData(dD) + check testlib_destroy(cast[pointer](ctx), testCallback, addr dD) == RET_OK + waitCallback(dD) + check dD.retCode == RET_ERR # drain timed out -> ctx reported stuck + + # The stuck context is leaked (not reused); the handler still finishes on its + # own. Wait for it, then fully tear the leaked context down. + waitCallback(slow) + check SimpleLibFFIPool.destroyFFIContext(ctx).isOk() + +# --------------------------------------------------------------------------- +# Simplified .ffi. macro integration test +# --------------------------------------------------------------------------- + +ffiType: + type SendConfig = object + message: string + +proc testlib_send*( + lib: SimpleLib, cfg: SendConfig +): Future[Result[string, string]] {.ffi.} = + return ok("echo:" & cfg.message & ":" & $lib.value) + +suite "simplified .ffi. macro": + test "sends request and gets serialized response via callback": + # First create a context using ffiCtor + var ctorD: CallbackData + initCallbackData(ctorD) + defer: deinitCallbackData(ctorD) + + let configJson = ffiSerialize(SimpleConfig(initialValue: 7)) + let ctorRet = testlib_create(configJson.cstring, testCallback, addr ctorD) + check not ctorRet.isNil() + + waitCallback(ctorD) + check ctorD.retCode == RET_OK + + let addrStr = callbackMsg(ctorD) + check addrStr.len > 0 + + let ctxAddr = cast[uint](parseBiggestUInt(addrStr)) + check ctxAddr != 0 + let ctx = cast[ptr FFIContext[SimpleLib]](ctxAddr) + defer: check SimpleLibFFIPool.destroyFFIContext(ctx).isOk() + + # Now call the .ffi. proc + var d: CallbackData + initCallbackData(d) + defer: deinitCallbackData(d) + + let cfgJson = ffiSerialize(SendConfig(message: "hello")) + let ret = testlib_send(ctx, testCallback, addr d, cfgJson.cstring) + check ret == RET_OK + + waitCallback(d) + check d.retCode == RET_OK + + let receivedMsg = callbackMsg(d) + let decoded = ffiDeserialize(receivedMsg.cstring, string).valueOr: + check false + "" + check decoded == "echo:hello:7" + +# --------------------------------------------------------------------------- +# async/sync detection in .ffi. macro integration test +# --------------------------------------------------------------------------- + +# Sync proc (no await in body) — macro detects this and bypasses thread machinery +proc testlib_version*( + lib: SimpleLib +): Future[Result[string, string]] {.ffi.} = + return ok("v" & $lib.value) + +suite "async/sync detection in .ffi.": + test "sync proc invokes callback without thread hop": + # Create a context using ffiCtor + var ctorD: CallbackData + initCallbackData(ctorD) + defer: deinitCallbackData(ctorD) + + let configJson = ffiSerialize(SimpleConfig(initialValue: 3)) + let ctorRet = testlib_create(configJson.cstring, testCallback, addr ctorD) + check not ctorRet.isNil() + + waitCallback(ctorD) + check ctorD.retCode == RET_OK + + let addrStr = callbackMsg(ctorD) + check addrStr.len > 0 + + let ctxAddr = cast[uint](parseBiggestUInt(addrStr)) + check ctxAddr != 0 + let ctx = cast[ptr FFIContext[SimpleLib]](ctxAddr) + defer: check SimpleLibFFIPool.destroyFFIContext(ctx).isOk() + + var d2: CallbackData + initCallbackData(d2) + defer: deinitCallbackData(d2) + + # Call sync proc — callback should fire before the proc returns (no thread hop) + let ret = testlib_version(ctx, testCallback, addr d2) + # No sleep needed: sync path fires callback inline before returning + check ret == RET_OK + check d2.called # fires synchronously — no waitCallback needed + check d2.retCode == RET_OK + let receivedMsg = callbackMsg(d2) + let decoded = ffiDeserialize(receivedMsg.cstring, string).valueOr: + check false + "" + check decoded == "v3" + +# --------------------------------------------------------------------------- +# ptr T return type in .ffi. macro integration test +# --------------------------------------------------------------------------- + +type Handle = object + data: string + +ffiType: + type NameParam = object + name: string + +proc testlib_alloc_handle*( + lib: SimpleLib, np: NameParam +): Future[Result[ptr Handle, string]] {.ffi.} = + let h = createShared(Handle) + h[] = Handle(data: np.name & ":" & $lib.value) + return ok(h) + +proc testlib_read_handle*( + lib: SimpleLib, handle: pointer +): Future[Result[string, string]] {.ffi.} = + let h = cast[ptr Handle](handle) + return ok(h[].data) + +proc testlib_free_handle*( + lib: SimpleLib, handle: pointer +): Future[Result[string, string]] {.ffi.} = + let h = cast[ptr Handle](handle) + deallocShared(h) + return ok("freed") + +suite "ptr return type in .ffi.": + test "returns a heap-allocated handle and reads it back": + # Create context via ffiCtor + var ctorD: CallbackData + initCallbackData(ctorD) + defer: deinitCallbackData(ctorD) + + let configJson = ffiSerialize(SimpleConfig(initialValue: 5)) + let ctorRet = testlib_create(configJson.cstring, testCallback, addr ctorD) + check not ctorRet.isNil() + + waitCallback(ctorD) + check ctorD.retCode == RET_OK + + let ctxAddrStr = callbackMsg(ctorD) + check ctxAddrStr.len > 0 + let ctxAddr = cast[uint](parseBiggestUInt(ctxAddrStr)) + check ctxAddr != 0 + let ctx = cast[ptr FFIContext[SimpleLib]](ctxAddr) + defer: check SimpleLibFFIPool.destroyFFIContext(ctx).isOk() + + # Alloc a handle + var allocD: CallbackData + initCallbackData(allocD) + defer: deinitCallbackData(allocD) + + let npJson = ffiSerialize(NameParam(name: "test")) + let allocRet = testlib_alloc_handle(ctx, testCallback, addr allocD, npJson.cstring) + check allocRet == RET_OK + + waitCallback(allocD) + check allocD.retCode == RET_OK + + let handleAddrStr = callbackMsg(allocD) + check handleAddrStr.len > 0 + let handleAddr = parseBiggestUInt(handleAddrStr) + check handleAddr != 0 + + # Read the handle back + var readD: CallbackData + initCallbackData(readD) + defer: deinitCallbackData(readD) + + let handleJson = ffiSerialize(cast[pointer](handleAddr)) + let readRet = testlib_read_handle(ctx, testCallback, addr readD, handleJson.cstring) + check readRet == RET_OK + + waitCallback(readD) + check readD.retCode == RET_OK + + let readMsg = callbackMsg(readD) + let decodedStr = ffiDeserialize(readMsg.cstring, string).valueOr: + check false + "" + check decodedStr == "test:5" + + # Free the handle + var freeD: CallbackData + initCallbackData(freeD) + defer: deinitCallbackData(freeD) + + let freeRet = testlib_free_handle(ctx, testCallback, addr freeD, handleJson.cstring) + check freeRet == RET_OK + + waitCallback(freeD) + check freeD.retCode == RET_OK + +# --------------------------------------------------------------------------- +# releaseFFIContext: park & reuse (fd-leak regression) +# --------------------------------------------------------------------------- + +proc countOpenFds(): int = + ## Number of open fds for this process, or -1 if not determinable on this + ## platform. On Linux we count /proc/self/fd; elsewhere we shell out to lsof + ## (skipped if lsof is unavailable, e.g. Windows). + when defined(linux): + var n = 0 + for _ in walkDir("/proc/self/fd"): + inc n + return n + else: + if findExe("lsof").len == 0: + return -1 + try: + let output = + execProcess("lsof", args = ["-p", $getCurrentProcessId()], options = {poUsePath}) + return output.splitLines().countIt(it.len > 0) + except CatchableError: + return -1 + +proc releaseAndWait[T](ctx: ptr FFIContext[T]): cint = + ## Test helper mirroring how a C consumer destroys a context: kick off the + ## (non-blocking) teardown and block on the callback, returning its retCode. + ## RET_OK means the lib's in-flight tasks finished and the context was parked. + var d: CallbackData + initCallbackData(d) + defer: + deinitCallbackData(d) + if ctx.releaseFFIContext(testCallback, addr d).isErr(): + return RET_ERR + waitCallback(d) + return d.retCode + +suite "releaseFFIContext (park & reuse)": + test "park returns the context and reuses the same live worker": + var pool: FFIContextPool[TestLib] + let ctx1 = pool.createFFIContext().valueOr: + check false + return + check ctx1.releaseAndWait() == RET_OK + + # Reacquire: must be the same context, with its worker still running. + let ctx2 = pool.createFFIContext().valueOr: + check false + return + check ctx1 == ctx2 + + var d: CallbackData + initCallbackData(d) + defer: + deinitCallbackData(d) + check sendRequestToFFIThread( + ctx2, PingRequest.ffiNewReq(testCallback, addr d, "reuse".cstring) + ).isOk() + waitCallback(d) + check d.retCode == RET_OK + check callbackMsg(d) == "pong:reuse" # reused worker still processes requests + + check pool.destroyFFIContext(ctx2).isOk() + + test "park drops the stale event callback and library pointer": + var pool: FFIContextPool[TestLib] + let ctx = pool.createFFIContext().valueOr: + check false + return + ctx.callbackState.callback = cast[pointer](testCallback) + ctx.callbackState.userData = cast[pointer](0xDEAD) + + check ctx.releaseAndWait() == RET_OK + check ctx.callbackState.callback.isNil() # a watchdog tick can't call a freed cb + check ctx.callbackState.userData.isNil() + check ctx.myLib.isNil() + + check pool.destroyFFIContext(ctx).isOk() + + test "fd usage stays bounded across many park/reuse cycles": + if countOpenFds() < 0: + skip() # no fd-counting facility on this platform + else: + var pool: FFIContextPool[TestLib] + + # Warm up: the first create builds the context's worker (its fds are allocated + # once here); parking keeps them open for reuse. + block: + let ctx = pool.createFFIContext().valueOr: + check false + return + check ctx.releaseAndWait() == RET_OK + + let baseline = countOpenFds() + + for _ in 0 ..< 20: + let ctx = pool.createFFIContext().valueOr: + check false + return + var d: CallbackData + initCallbackData(d) + check sendRequestToFFIThread( + ctx, PingRequest.ffiNewReq(testCallback, addr d, "x".cstring) + ).isOk() + waitCallback(d) + deinitCallbackData(d) + check ctx.releaseAndWait() == RET_OK + + let afterCycles = countOpenFds() + # Reuse must not grow fds. Before the fix each cycle leaked ~10 fds (4 + # ThreadSignalPtr socketpairs + 2 dispatcher kqueues); the small slack + # only tolerates unrelated runtime fd noise, not a per-cycle leak. + check afterCycles <= baseline + 5 + + # Tear the (still parked) context's worker down so the test leaves no threads. + let last = pool.createFFIContext().valueOr: + check false + return + check pool.destroyFFIContext(last).isOk()