From c1aed9cfa596abb44a9bb6f4e478d175e8f060e7 Mon Sep 17 00:00:00 2001 From: Ivan FB <128452529+Ivansete-status@users.noreply.github.com> Date: Mon, 8 Jun 2026 13:36:14 +0200 Subject: [PATCH] fix(pool): recycle contexts to stop per-cycle fd leak (#74) --- .github/workflows/ci.yml | 4 +- ffi/ffi_context.nim | 178 +++++++++++++++++++++----- ffi/ffi_context_pool.nim | 75 ++++++----- ffi/internal/ffi_macro.nim | 30 ++--- tests/test_ffi_context.nim | 254 ++++++++++++++++++++++++++++++++++++- 5 files changed, 443 insertions(+), 98 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 2874557..ac5f716 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -2,9 +2,9 @@ name: CI on: push: - branches: [master, main] + branches: [master, main, 'release/*'] pull_request: - branches: [master, main] + branches: [master, main, 'release/*'] env: NIM_VERSION: '2.2.4' diff --git a/ffi/ffi_context.nim b/ffi/ffi_context.nim index f0a2cde..729d9a4 100644 --- a/ffi/ffi_context.nim +++ b/ffi/ffi_context.nim @@ -2,7 +2,7 @@ {.pragma: callback, cdecl, raises: [], gcsafe.} {.passc: "-fPIC".} -import std/[atomics, locks, json, tables] +import std/[atomics, locks, json, tables, sequtils] import chronicles, chronos, chronos/threadsync, taskpools/channels_spsc_single, results import ./ffi_types, ./ffi_thread_request, ./internal/ffi_macro, ./logging @@ -12,6 +12,16 @@ type FFICallbackState* = object callback*: pointer userData*: pointer +type CtxLifecycle {.pure.} = enum + ## State machine guarding a pooled FFI context, held as an Atomic on FFIContext. + ## Transitions: + ## Active -> RecyclePending when ffiDtor is invoked + ## RecyclePending -> Recycling The process completed the in-flight processes and is ready for lib cleanup and release + ## Recycling -> Active When the FFI thread is ready again to attend to requests + Active ## accepting and serving requests + RecyclePending ## recycle requested; FFI thread loop hasn't claimed it yet + Recycling ## FFI loop draining handlers, then frees lib + returns to pool + type FFIContext*[T] = object myLib*: ptr T # main library object (e.g., Waku, LibP2P, SDS, the one to be exposed as a library) @@ -33,6 +43,15 @@ type FFIContext*[T] = object userData*: pointer callbackState*: FFICallbackState running: Atomic[bool] # To control when the threads are running + lifecycle: Atomic[CtxLifecycle] + recycleCallback: FFICallBack + # The destructor's callback, fired by the recycle handler with the outcome: + # RET_OK once drained, RET_ERR if it timed out. Set by requestRecycle. + recycleUserData: pointer + inUse: Atomic[bool] + # Whether the context is claimed. createFFIContext claims it (false -> true); the + # recycle handler clears it once drained. On the context so the owning thread can + # release it without reaching into the pool. registeredRequests: ptr Table[cstring, FFIRequestProc] # Pointer to with the registered requests at compile time @@ -90,14 +109,14 @@ proc sendRequestToFFIThread*( ctx: ptr FFIContext, ffiRequest: ptr FFIThreadRequest, timeout = InfiniteDuration ): Result[void, string] = ctx.lock.acquire() - # This lock is only necessary while we use a SP Channel and while the signalling - # between threads assumes that there aren't concurrent requests. - # Rearchitecting the signaling + migrating to a MP Channel will allow us to receive - # requests concurrently and spare us the need of locks defer: ctx.lock.release() - ## Sending the request + if ctx.lifecycle.load() != CtxLifecycle.Active: + deleteRequest(ffiRequest) + return err("FFI context is not accepting requests (being recycled)") + + ## Sending the request to the FFI thread let sentOk = ctx.reqChannel.trySend(ffiRequest) if not sentOk: deleteRequest(ffiRequest) @@ -115,12 +134,8 @@ proc sendRequestToFFIThread*( ## wait until the FFI working thread properly received the request let res = ctx.reqReceivedSignal.waitSync(timeout) if res.isErr(): - ## Do not free ffiRequest here: the FFI thread was already signaled and - ## will process (and free) it. return err("Couldn't receive reqReceivedSignal signal") - ## Notice that in case of "ok", the deallocShared(req) is performed by the FFI Thread in the - ## process proc. return ok() type Foo = object @@ -162,6 +177,9 @@ proc watchdogThreadBody(ctx: ptr FFIContext) {.thread.} = debug "Watchdog thread exiting because FFIContext is not running" break + if ctx.lifecycle.load() != CtxLifecycle.Active: + continue + let callback = proc( callerRet: cint, msg: ptr cchar, len: csize_t, userData: pointer ) {.cdecl, gcsafe, raises: [].} = @@ -192,9 +210,8 @@ proc processRequest[T]( ## The registeredRequests represents a table defined at compile time. ## Then, registeredRequests == Table[reqId, proc-handling-the-request-asynchronously] - ## Explicit conversion keeps `reqId` alive as the backing string, - ## avoiding the implicit string→cstring warning that will become an error. let reqIdCs = reqId.cstring + # keep `reqId` alive and avoid the implicit string→cstring warning. let retFut = if not ctx[].registeredRequests[].contains(reqIdCs): @@ -206,19 +223,79 @@ proc processRequest[T]( let res = try: await retFut + except CancelledError as exc: + Result[string, string].err("Request cancelled during destroy: " & exc.msg) except AsyncError as exc: Result[string, string].err( "Async error in processRequest for " & reqId & ": " & exc.msg ) - ## handleRes may raise (OOM, GC setup) even though it is rare. Catching here - ## keeps the async proc raises:[] compatible. The defer inside handleRes - ## guarantees request is freed before the exception propagates. + ## handleRes may raise (OOM, GC setup) even though it is rare. try: handleRes(res, request) except Exception as exc: error "Unexpected exception in handleRes", error = exc.msg +proc freeLib[T](ctx: ptr FFIContext[T]) {.gcsafe.} = + if ctx.myLib.isNil(): + return + + when not defined(gcRefc): + {.cast(gcsafe).}: + `=destroy`(ctx.myLib[]) + else: + discard + freeShared(ctx.myLib) + ctx.myLib = nil + +var RecycleTimeout* = 1500.milliseconds + ## Upper bound the recycle handler waits for in-flight handlers before it + ## cancels them and reports the ctx as stuck. The drain returns as soon as they + ## finish, so this only bounds a *stuck* handler. A `var` so tests can shorten it. + +proc recycleContext[T]( + ctx: ptr FFIContext[T], ongoingProcessReq: ptr seq[Future[void]] +) {.async.} = + ## Drain the in-flight handlers, free the lib object, release the context for reuse, + ## and fire the callback with the outcome. Never blocks the caller. + + ongoingProcessReq[].keepItIf(not it.finished()) + + ## 1. Let the in-flight handlers finish on their own, bounded by RecycleTimeout. + var naturallyDrained = ongoingProcessReq[].len == 0 + if not naturallyDrained: + naturallyDrained = await allFutures(ongoingProcessReq[]).withTimeout(RecycleTimeout) + + ## 2. If any are wedged, cancel them and give the cancellations a bounded moment + ## to unwind, so the context can be reclaimed rather than leaked. + var safeToRecycle = naturallyDrained + if not naturallyDrained: + for fut in ongoingProcessReq[]: + if not fut.finished(): + fut.cancelSoon() + safeToRecycle = await allFutures(ongoingProcessReq[]).withTimeout(RecycleTimeout) + + let cb = ctx.recycleCallback + let ud = ctx.recycleUserData + ctx.recycleCallback = nil + + if safeToRecycle: + freeLib(ctx) + ctx.callbackState = default(FFICallbackState) + ongoingProcessReq[].setLen(0) + ctx.release() + + if not cb.isNil(): + foreignThreadGc: + let msg = + if naturallyDrained: + "" + else: + "recycle: in-flight requests did not finish in time" + let cmsg = msg.cstring + let retCode = if naturallyDrained: RET_OK else: RET_ERR + cb(retCode, unsafeAddr cmsg[0], cast[csize_t](msg.len), ud) + proc ffiThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} = ## FFI thread body that attends library user API requests ffiCurrentCallbackState = addr ctx[].callbackState @@ -226,18 +303,20 @@ proc ffiThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} = logging.setupLog(logging.LogLevel.DEBUG, logging.LogFormat.TEXT) defer: - # Signal destroyFFIContext that this thread has exited, so its bounded - # wait can unblock and proceed with cleanup. let fireRes = ctx.threadExitSignal.fireSync() if fireRes.isErr(): error "failed to fire threadExitSignal on FFI thread exit", err = fireRes.error let ffiRun = proc(ctx: ptr FFIContext[T]) {.async.} = - var ffiReqHandler: T - ## Holds the main library object, i.e., in charge of handling the ffi requests. - ## e.g., Waku, LibP2P, SDS, etc. + + var ongoingProcessReq: seq[Future[void]] while ctx.running.load(): + var expected = CtxLifecycle.RecyclePending + if ctx.lifecycle.compareExchange(expected, CtxLifecycle.Recycling): + await recycleContext(ctx, addr ongoingProcessReq) + continue + let gotSignal = await ctx.reqSignal.wait().withTimeout(100.milliseconds) if not gotSignal: continue @@ -247,11 +326,8 @@ proc ffiThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} = if not ctx.reqChannel.tryRecv(request): continue - if ctx.myLib.isNil(): - ctx.myLib = addr ffiReqHandler - - ## Handle the request - asyncSpawn processRequest(request, ctx) + ongoingProcessReq.keepItIf(not it.finished()) + ongoingProcessReq.add(processRequest(request, ctx)) let fireRes = ctx.reqReceivedSignal.fireSync() if fireRes.isErr(): @@ -297,9 +373,9 @@ proc cleanUpResources[T](ctx: ptr FFIContext[T]): Result[void, string] = return ok() proc initContextResources*[T](ctx: ptr FFIContext[T]): Result[void, string] = - ## Initialises all resources inside an already-allocated FFIContext slot. + ## Initialises all resources inside an already-allocated FFIContext. ## On failure every partially-initialised resource is closed; the caller - ## is responsible for releasing the slot (freeShared or pool.releaseSlot). + ## is responsible for releasing the context. ctx.lock.initLock() var success = false @@ -323,6 +399,7 @@ proc initContextResources*[T](ctx: ptr FFIContext[T]): Result[void, string] = ctx.registeredRequests = addr ffi_types.registeredRequests + ctx.lifecycle.store(CtxLifecycle.Active) ctx.running.store(true) try: @@ -361,8 +438,8 @@ proc signalStop*[T](ctx: ptr FFIContext[T]): Result[void, string] = ## If the FFI thread's event loop is blocked by a synchronous handler ## (e.g. blocking I/O), it cannot process reqSignal in time to exit. -## clearContext waits on threadExitSignal up to this bound; on timeout it -## returns err and skips joinThread/cleanup (leaking the thread + ctx slot) +## stopAndJoinThreads waits on threadExitSignal up to this bound; on timeout it +## returns err and skips joinThread/cleanup (leaking the thread + ctx) ## rather than hanging the caller forever. const ThreadExitTimeout* = 1500.milliseconds @@ -386,10 +463,41 @@ proc stopAndJoinThreads*[T](ctx: ptr FFIContext[T]): Result[void, string] = joinThread(ctx.watchdogThread) return ok() -proc clearContext[T](ctx: ptr FFIContext[T]): Result[void, string] = - ## Stops the FFI context that was created via createFFIContext[T]() (heap). - ctx.stopAndJoinThreads().isOkOr: - return err("clearContext: " & $error) - ctx.cleanUpResources().isOkOr: - return err("cleanUpResources failed: " & $error) +proc requestRecycle*[T]( + ctx: ptr FFIContext[T], callback: FFICallBack, userData: pointer +): Result[void, string] = + ## Starts ctx recycle process without stopping its worker, so the next + ## createFFIContext reuses the same threads and fds. + ## + ## During recycling, the FFI thread drains the handlers, frees the lib and releases + ## the context, then fires `callback` (RET_OK drained, RET_ERR stuck). + + ctx.lock.acquire() + if ctx.lifecycle.load() != CtxLifecycle.Active: + ctx.lock.release() + return err("requestRecycle: context is not Active (already recycling)") + + ctx.recycleCallback = callback + ctx.recycleUserData = userData + ctx.lifecycle.store(CtxLifecycle.RecyclePending) + ctx.lock.release() + + let fired = ctx.reqSignal.fireSync().valueOr: + return err("requestRecycle: failed to signal the FFI thread: " & $error) + if not fired: + return err("requestRecycle: failed to signal the FFI thread in time") return ok() + +proc markAsActive*[T](ctx: ptr FFIContext[T]) = + ctx.lifecycle.store(CtxLifecycle.Active) + +proc tryClaim*[T](ctx: ptr FFIContext[T]): bool = + ## Returns true if acquired the contex, false if it was already claimed. + var expected = false + ctx.inUse.compareExchange(expected, true) + +proc release*[T](ctx: ptr FFIContext[T]) = + ctx.inUse.store(false) + +proc isInUse*[T](ctx: ptr FFIContext[T]): bool = + ctx.inUse.load() diff --git a/ffi/ffi_context_pool.nim b/ffi/ffi_context_pool.nim index dc1e154..421a636 100644 --- a/ffi/ffi_context_pool.nim +++ b/ffi/ffi_context_pool.nim @@ -1,63 +1,60 @@ import std/atomics import results -import ./ffi_context +import ./ffi_context, ./ffi_types const MaxFFIContexts* = 32 ## Maximum number of concurrently live FFI contexts when using FFIContextPool. - ## Fds and threads are only consumed for slots that are actually acquired, - ## so this value only affects the upfront memory of the pool array. type FFIContextPool*[T] = object - ## Fixed-size pool of FFI contexts. Avoids dynamic heap allocation per context - ## and bounds the total number of file descriptors consumed by ThreadSignalPtrs - ## to at most MaxFFIContexts * 2. - slots: array[MaxFFIContexts, FFIContext[T]] - inUse: array[MaxFFIContexts, Atomic[bool]] - -proc acquireSlot[T](pool: var FFIContextPool[T]): Result[ptr FFIContext[T], string] = - for i in 0 ..< MaxFFIContexts: - var expected = false - if pool.inUse[i].compareExchange(expected, true): - return ok(pool.slots[i].addr) - return err("FFI context pool exhausted (max " & $MaxFFIContexts & " contexts)") - -proc releaseSlot[T](pool: var FFIContextPool[T], ctx: ptr FFIContext[T]) = - for i in 0 ..< MaxFFIContexts: - if pool.slots[i].addr == ctx: - pool.inUse[i].store(false) - return + contexts: array[MaxFFIContexts, FFIContext[T]] + initialized: array[MaxFFIContexts, Atomic[bool]] proc createFFIContext*[T]( pool: var FFIContextPool[T] ): Result[ptr FFIContext[T], string] = - ## Acquires a slot from the fixed pool and initialises it as an FFI context. - ## Bounded fd usage: at most MaxFFIContexts * 2 ThreadSignalPtr fds are ever open. - let ctx = pool.acquireSlot().valueOr: - return err("createFFIContext: acquireSlot failed: " & $error) - initContextResources(ctx).isOkOr: - pool.releaseSlot(ctx) - return err("createFFIContext: initContextResources failed: " & $error) - return ok(ctx) + ## Acquires a context from the fixed pool. The context's worker is built once on + ## first use and reused on every later acquisition. + + for i in 0 ..< MaxFFIContexts: + let ctx = pool.contexts[i].addr + if not ctx.tryClaim(): + continue + if pool.initialized[i].load(): + ## Reused context: a prior destroy drained and released it, worker still alive. + ctx.markAsActive() + return ok(ctx) + initContextResources(ctx).isOkOr: + ctx.release() + return err("createFFIContext: initContextResources failed: " & $error) + pool.initialized[i].store(true) + return ok(ctx) + return err("FFI context pool exhausted (max " & $MaxFFIContexts & " contexts)") + +proc releaseFFIContext*[T]( + ctx: ptr FFIContext[T], callback: FFICallBack, userData: pointer +): Result[void, string] = + return ctx.requestRecycle(callback, userData) proc destroyFFIContext*[T]( pool: var FFIContextPool[T], ctx: ptr FFIContext[T] ): Result[void, string] = - ## Stops the FFI context and returns its slot to the pool. If the FFI thread - ## is blocked and does not exit in time, the slot is leaked rather than - ## reclaimed — closing its resources while the thread is still live would be - ## unsafe. + ## Full teardown: stops/joins the worker threads and returns the context to the + ## pool, marking it uninitialised so a later createFFIContext rebuilds it. ctx.stopAndJoinThreads().isOkOr: return err("destroyFFIContext(pool): " & $error) - pool.releaseSlot(ctx) + for i in 0 ..< MaxFFIContexts: + if pool.contexts[i].addr == ctx: + pool.initialized[i].store(false) + break + ctx.release() return ok() proc isValidCtx*[T](pool: var FFIContextPool[T], ctx: pointer): bool = - ## Returns true only if ctx points to one of the pool's slots that is - ## currently in use. Rejects nil, offset-invalid, and dangling pointers - ## at the API boundary, preventing use-after-free dereferences. + ## Returns true only if ctx points to one of the pool's contexts that is + ## currently in use. if ctx.isNil(): return false for i in 0 ..< MaxFFIContexts: - if cast[pointer](pool.slots[i].addr) == ctx: - return pool.inUse[i].load() + if cast[pointer](pool.contexts[i].addr) == ctx: + return cast[ptr FFIContext[T]](ctx).isInUse() return false diff --git a/ffi/internal/ffi_macro.nim b/ffi/internal/ffi_macro.nim index 45077bd..be23618 100644 --- a/ffi/internal/ffi_macro.nim +++ b/ffi/internal/ffi_macro.nim @@ -1511,14 +1511,15 @@ macro ffiDtor*(prc: untyped): untyped = ## The body contains any library-level cleanup to run before context teardown. ## ## Example: - ## proc waku_destroy*(w: Waku) {.ffiDtor.} = - ## w.cleanup() + ## proc mylibobj_destroy*(obj: MyLibObj) {.ffiDtor.} = + ## obj.cleanup() ## ## The generated C-exported proc has the signature: - ## cint waku_destroy(void* ctx, FfiCallback callback, void* userData) + ## cint mylibobj_destroy(void* ctx, FfiCallback callback, void* userData) ## - ## It extracts the library value from ctx, runs the body, then calls - ## destroyFFIContext to tear down the FFI thread and free the context. + ## Recycle the context for reuse to keep fd usage bounded. + ## NON-BLOCKING: returns RET_OK once accepted; + ## the real outcome arrives via `callback`. let procName = prc[0] let formalParams = prc[3] @@ -1528,7 +1529,7 @@ macro ffiDtor*(prc: untyped): untyped = error("ffiDtor: proc must have exactly one parameter (w: LibType)") let libParamName = formalParams[1][0] # e.g. w - let libTypeName = formalParams[1][1] # e.g. Waku + let libTypeName = formalParams[1][1] # e.g. MyLibObj let procNameStr = block: let raw = $procName @@ -1537,7 +1538,7 @@ macro ffiDtor*(prc: untyped): untyped = let exportedProcName = if procName.kind == nnkPostfix: procName[1] else: procName - let destroyResIdent = genSym(nskLet, "destroyRes") + let releaseResIdent = genSym(nskLet, "destroyRes") let ffiBody = newStmtList() @@ -1566,17 +1567,14 @@ macro ffiDtor*(prc: untyped): untyped = let poolIdent = ident($libTypeName & "FFIPool") ffiBody.add quote do: - let `destroyResIdent` = - `poolIdent`.destroyFFIContext(cast[ptr FFIContext[`libTypeName`]](ctx)) - if `destroyResIdent`.isErr(): - if not callback.isNil: - let errStr = "destroy failed: " & $`destroyResIdent`.error + let `releaseResIdent` = releaseFFIContext( + cast[ptr FFIContext[`libTypeName`]](ctx), callback, userData + ) + if `releaseResIdent`.isErr(): + if not callback.isNil(): + let errStr = "release failed: " & $`releaseResIdent`.error callback(RET_ERR, unsafeAddr errStr[0], cast[csize_t](errStr.len), userData) return RET_ERR - - ffiBody.add quote do: - if not callback.isNil: - callback(RET_OK, nil, 0, userData) return RET_OK let ffiProc = newProc( diff --git a/tests/test_ffi_context.nim b/tests/test_ffi_context.nim index dafc4e3..bf6989c 100644 --- a/tests/test_ffi_context.nim +++ b/tests/test_ffi_context.nim @@ -1,4 +1,4 @@ -import std/[locks, strutils, os] +import std/[locks, strutils, os, osproc, sequtils] import unittest2 import results import ../ffi @@ -129,18 +129,18 @@ suite "FFIContextPool": return check pool.destroyFFIContext(ctx).isOk() - test "slot is reused after destroy": + test "context is reused after destroy": var pool: FFIContextPool[TestLib] let ctx1 = pool.createFFIContext().valueOr: assert false, "createFFIContext(pool) failed: " & $error return check pool.destroyFFIContext(ctx1).isOk() - # After destroying, the same slot must be available again + # After destroying, the same context must be available again let ctx2 = pool.createFFIContext().valueOr: - assert false, "createFFIContext(pool) failed after slot release: " & $error + assert false, "createFFIContext(pool) failed after context release: " & $error return check pool.destroyFFIContext(ctx2).isOk() - check ctx1 == ctx2 # same array slot reused + check ctx1 == ctx2 # same context reused test "pool exhaustion returns error": var pool: FFIContextPool[TestLib] @@ -149,7 +149,7 @@ suite "FFIContextPool": ctxs[i] = pool.createFFIContext().valueOr: for j in 0 ..< i: discard pool.destroyFFIContext(ctxs[j]) - assert false, "createFFIContext(pool) failed at slot " & $i & ": " & $error + assert false, "createFFIContext(pool) failed at context " & $i & ": " & $error return # Pool is now full — next create must fail check pool.createFFIContext().isErr() @@ -421,6 +421,12 @@ proc testlib_create*( ): Future[Result[SimpleLib, string]] {.ffiCtor.} = return ok(SimpleLib(value: config.initialValue)) +# Records the value of the library object the destructor body saw, so a test can +# confirm the user cleanup body ran with the right lib state before teardown. +var gDestroyedValue {.threadvar.}: int +proc testlib_destroy*(lib: SimpleLib) {.ffiDtor.} = + gDestroyedValue = lib.value + suite "ffiCtor macro": test "creates context and returns pointer via callback": var d: CallbackData @@ -450,6 +456,123 @@ suite "ffiCtor macro": check SimpleLibFFIPool.destroyFFIContext(ctx).isOk() +proc createSimpleLib(initialValue: int): ptr FFIContext[SimpleLib] = + ## Helper: run the generated async ctor and return the live ctx. + var d: CallbackData + initCallbackData(d) + defer: + deinitCallbackData(d) + let ret = + testlib_create(ffiSerialize(SimpleConfig(initialValue: initialValue)).cstring, + testCallback, addr d) + doAssert not ret.isNil() + waitCallback(d) + doAssert d.retCode == RET_OK + return cast[ptr FFIContext[SimpleLib]](cast[uint](parseBiggestUInt(callbackMsg(d)))) + +suite "ffiDtor macro (async destroy + reuse)": + test "destroy fires RET_OK after teardown, frees myLib, and frees the context": + let ctx = createSimpleLib(5) + check not ctx[].myLib.isNil + check ctx[].myLib[].value == 5 + + var dD: CallbackData + initCallbackData(dD) + defer: + deinitCallbackData(dD) + + # Async destroy: the C return is just "accepted"; the real outcome arrives + # via the callback once the FFI thread has finished tearing the lib down. + check testlib_destroy(cast[pointer](ctx), testCallback, addr dD) == RET_OK + waitCallback(dD) + check dD.retCode == RET_OK + check gDestroyedValue == 5 # the user cleanup body saw the live lib + check ctx[].myLib.isNil() # freed on the FFI thread + + # The context was freed from the FFI thread, so a fresh create reclaims it. + let ctx2 = createSimpleLib(9) + check ctx2 == ctx # same context, reused worker + fds + check ctx2[].myLib[].value == 9 + check SimpleLibFFIPool.destroyFFIContext(ctx2).isOk() + + test "destroy waits for an in-flight request before reporting RET_OK": + let ctx = createSimpleLib(1) + + # Dispatch a 500 ms handler and do NOT wait — it is in flight at destroy time. + var slow: CallbackData + initCallbackData(slow) + defer: + deinitCallbackData(slow) + check sendRequestToFFIThread(ctx, SlowRequest.ffiNewReq(testCallback, addr slow)).isOk() + + var dD: CallbackData + initCallbackData(dD) + defer: + deinitCallbackData(dD) + check testlib_destroy(cast[pointer](ctx), testCallback, addr dD) == RET_OK + waitCallback(dD) + check dD.retCode == RET_OK + # Drained: the in-flight handler ran to completion before destroy reported OK. + check slow.called + check callbackMsg(slow) == "slow-done" + + let ctx2 = createSimpleLib(2) + check ctx2 == ctx + check SimpleLibFFIPool.destroyFFIContext(ctx2).isOk() + + test "requests are rejected once a destroy closes the gate": + let ctx = createSimpleLib(3) + + var dD: CallbackData + initCallbackData(dD) + defer: + deinitCallbackData(dD) + check testlib_destroy(cast[pointer](ctx), testCallback, addr dD) == RET_OK + waitCallback(dD) + check dD.retCode == RET_OK + + # Gate stays closed until the context is reacquired: a late request must not + # dispatch onto a context about to be (or already) reused. + var d: CallbackData + initCallbackData(d) + defer: + deinitCallbackData(d) + check sendRequestToFFIThread( + ctx, SlowRequest.ffiNewReq(testCallback, addr d) + ).isErr() + + let ctx2 = createSimpleLib(4) + check ctx2 == ctx + check SimpleLibFFIPool.destroyFFIContext(ctx2).isOk() + + test "a stuck context is reported as RET_ERR rather than hanging": + let ctx = createSimpleLib(8) + + let savedTimeout = RecycleTimeout + RecycleTimeout = 150.milliseconds + defer: + RecycleTimeout = savedTimeout + + # In-flight handler outlasts the (shortened) drain timeout. + var slow: CallbackData + initCallbackData(slow) + defer: + deinitCallbackData(slow) + check sendRequestToFFIThread(ctx, SlowRequest.ffiNewReq(testCallback, addr slow)).isOk() + + var dD: CallbackData + initCallbackData(dD) + defer: + deinitCallbackData(dD) + check testlib_destroy(cast[pointer](ctx), testCallback, addr dD) == RET_OK + waitCallback(dD) + check dD.retCode == RET_ERR # drain timed out -> ctx reported stuck + + # The stuck context is leaked (not reused); the handler still finishes on its + # own. Wait for it, then fully tear the leaked context down. + waitCallback(slow) + check SimpleLibFFIPool.destroyFFIContext(ctx).isOk() + # --------------------------------------------------------------------------- # Simplified .ffi. macro integration test # --------------------------------------------------------------------------- @@ -648,3 +771,122 @@ suite "ptr return type in .ffi.": waitCallback(freeD) check freeD.retCode == RET_OK + +# --------------------------------------------------------------------------- +# releaseFFIContext: park & reuse (fd-leak regression) +# --------------------------------------------------------------------------- + +proc countOpenFds(): int = + ## Number of open fds for this process, or -1 if not determinable on this + ## platform. On Linux we count /proc/self/fd; elsewhere we shell out to lsof + ## (skipped if lsof is unavailable, e.g. Windows). + when defined(linux): + var n = 0 + for _ in walkDir("/proc/self/fd"): + inc n + return n + else: + if findExe("lsof").len == 0: + return -1 + try: + let output = + execProcess("lsof", args = ["-p", $getCurrentProcessId()], options = {poUsePath}) + return output.splitLines().countIt(it.len > 0) + except CatchableError: + return -1 + +proc releaseAndWait[T](ctx: ptr FFIContext[T]): cint = + ## Test helper mirroring how a C consumer destroys a context: kick off the + ## (non-blocking) teardown and block on the callback, returning its retCode. + ## RET_OK means the lib's in-flight tasks finished and the context was parked. + var d: CallbackData + initCallbackData(d) + defer: + deinitCallbackData(d) + if ctx.releaseFFIContext(testCallback, addr d).isErr(): + return RET_ERR + waitCallback(d) + return d.retCode + +suite "releaseFFIContext (park & reuse)": + test "park returns the context and reuses the same live worker": + var pool: FFIContextPool[TestLib] + let ctx1 = pool.createFFIContext().valueOr: + check false + return + check ctx1.releaseAndWait() == RET_OK + + # Reacquire: must be the same context, with its worker still running. + let ctx2 = pool.createFFIContext().valueOr: + check false + return + check ctx1 == ctx2 + + var d: CallbackData + initCallbackData(d) + defer: + deinitCallbackData(d) + check sendRequestToFFIThread( + ctx2, PingRequest.ffiNewReq(testCallback, addr d, "reuse".cstring) + ).isOk() + waitCallback(d) + check d.retCode == RET_OK + check callbackMsg(d) == "pong:reuse" # reused worker still processes requests + + check pool.destroyFFIContext(ctx2).isOk() + + test "park drops the stale event callback and library pointer": + var pool: FFIContextPool[TestLib] + let ctx = pool.createFFIContext().valueOr: + check false + return + ctx.callbackState.callback = cast[pointer](testCallback) + ctx.callbackState.userData = cast[pointer](0xDEAD) + + check ctx.releaseAndWait() == RET_OK + check ctx.callbackState.callback.isNil() # a watchdog tick can't call a freed cb + check ctx.callbackState.userData.isNil() + check ctx.myLib.isNil() + + check pool.destroyFFIContext(ctx).isOk() + + test "fd usage stays bounded across many park/reuse cycles": + if countOpenFds() < 0: + skip() # no fd-counting facility on this platform + else: + var pool: FFIContextPool[TestLib] + + # Warm up: the first create builds the context's worker (its fds are allocated + # once here); parking keeps them open for reuse. + block: + let ctx = pool.createFFIContext().valueOr: + check false + return + check ctx.releaseAndWait() == RET_OK + + let baseline = countOpenFds() + + for _ in 0 ..< 20: + let ctx = pool.createFFIContext().valueOr: + check false + return + var d: CallbackData + initCallbackData(d) + check sendRequestToFFIThread( + ctx, PingRequest.ffiNewReq(testCallback, addr d, "x".cstring) + ).isOk() + waitCallback(d) + deinitCallbackData(d) + check ctx.releaseAndWait() == RET_OK + + let afterCycles = countOpenFds() + # Reuse must not grow fds. Before the fix each cycle leaked ~10 fds (4 + # ThreadSignalPtr socketpairs + 2 dispatcher kqueues); the small slack + # only tolerates unrelated runtime fd noise, not a per-cycle leak. + check afterCycles <= baseline + 5 + + # Tear the (still parked) context's worker down so the test leaves no threads. + let last = pool.createFFIContext().valueOr: + check false + return + check pool.destroyFFIContext(last).isOk()