From 18946d0593356cd6df657261ac2aac219d35425c Mon Sep 17 00:00:00 2001 From: Ivan FB Date: Thu, 4 Jun 2026 21:49:36 +0200 Subject: [PATCH] better ctx lifecycle management --- ffi/ffi_context.nim | 189 +++++++++++++++++++++++++++++++++++-- ffi/ffi_context_pool.nim | 56 ++++------- ffi/internal/ffi_macro.nim | 19 ++-- tests/test_ffi_context.nim | 144 +++++++++++++++++++++++++++- 4 files changed, 347 insertions(+), 61 deletions(-) diff --git a/ffi/ffi_context.nim b/ffi/ffi_context.nim index 2c7636b..f7a8a4a 100644 --- a/ffi/ffi_context.nim +++ b/ffi/ffi_context.nim @@ -2,7 +2,7 @@ {.pragma: callback, cdecl, raises: [], gcsafe.} {.passc: "-fPIC".} -import std/[atomics, locks, json, tables] +import std/[atomics, locks, json, tables, sequtils] import chronicles, chronos, chronos/threadsync, taskpools/channels_spsc_single, results import ./ffi_types, ./ffi_thread_request, ./internal/ffi_macro, ./logging @@ -12,6 +12,26 @@ type FFICallbackState* = object callback*: pointer userData*: pointer +type CtxLifecycle {.pure.} = enum + ## Request-acceptance + recycle handshake for a pooled context, held as an + ## Atomic on FFIContext. ("Recycle" = drain the context and return its slot to + ## the pool for reuse, keeping the worker alive — unlike destroyFFIContext, which + ## fully tears the threads down.) Invariants: + ## * Requests are accepted ONLY in `Active`; the gate in sendRequestToFFIThread + ## and the watchdog ping both test `== Active`. + ## * Legal transitions, and who performs them: + ## Active -> RecyclePending requestRecycle (caller, under `lock`) + ## RecyclePending -> Recycling the FFI loop (one-shot compareExchange) + ## Recycling -> Active markReacquired (caller, on reuse) + ## (initContextResources starts a slot in `Active`.) + ## * The gate stays closed across BOTH RecyclePending and Recycling, so no + ## request can dispatch onto a context being recycled or about to be reused. + ## * Only the FFI loop makes the RecyclePending -> Recycling move, so the + ## recycle runs exactly once per request. + Active ## serving requests + RecyclePending ## recycle requested; FFI loop hasn't claimed it yet + Recycling ## FFI loop claimed it: draining handlers, then freeing + type FFIContext*[T] = object myLib*: ptr T # main library object (e.g., Waku, LibP2P, SDS, the one to be exposed as a library) @@ -33,6 +53,17 @@ type FFIContext*[T] = object userData*: pointer callbackState*: FFICallbackState running: Atomic[bool] # To control when the threads are running + lifecycle: Atomic[CtxLifecycle] + # Request gate + recycle handshake in one. See CtxLifecycle for the states, + # transitions and invariants. + recycleCallback: FFICallBack + # The destructor's callback, fired by the recycle handler with the outcome: + # RET_OK once drained, RET_ERR if it timed out. Set by requestRecycle. + recycleUserData: pointer + inUse: Atomic[bool] + # Whether the slot is claimed. createFFIContext claims it (false -> true); the + # recycle handler clears it once drained. On the slot so the owning thread can + # release it without reaching into the pool. registeredRequests: ptr Table[cstring, FFIRequestProc] # Pointer to with the registered requests at compile time @@ -97,6 +128,12 @@ proc sendRequestToFFIThread*( defer: ctx.lock.release() + ## A recycle closes this gate (under the same lock), so a queued or late sender + ## bails here instead of dispatching onto a slot about to be reused. + if ctx.lifecycle.load() != CtxLifecycle.Active: + deleteRequest(ffiRequest) + return err("FFI context is not accepting requests (being recycled)") + ## Sending the request let sentOk = ctx.reqChannel.trySend(ffiRequest) if not sentOk: @@ -162,6 +199,11 @@ proc watchdogThreadBody(ctx: ptr FFIContext) {.thread.} = debug "Watchdog thread exiting because FFIContext is not running" break + if ctx.lifecycle.load() != CtxLifecycle.Active: + ## Gate closed (being recycled, not yet reused): a ping would just fail + ## and spuriously trip onNotResponding. Skip until reused. + continue + let callback = proc( callerRet: cint, msg: ptr cchar, len: csize_t, userData: pointer ) {.cdecl, gcsafe, raises: [].} = @@ -186,6 +228,8 @@ proc processRequest[T]( request: ptr FFIThreadRequest, ctx: ptr FFIContext[T] ) {.async.} = ## Invoked within the FFI thread to process a request coming from the FFI API consumer thread. + ## ffiThreadBody keeps this proc's Future in the run loop's `pending` seq so a + ## destroy can await (or cancel) it before freeing myLib. let reqId = $request[].reqId ## The reqId determines which proc will handle the request. @@ -206,6 +250,10 @@ proc processRequest[T]( let res = try: await retFut + except CancelledError as exc: + ## Destroy timed out and cancelled us: turn it into an error so handleRes + ## still fires the callback and frees the request. + Result[string, string].err("Request cancelled during destroy: " & exc.msg) except AsyncError as exc: Result[string, string].err( "Async error in processRequest for " & reqId & ": " & exc.msg @@ -219,6 +267,78 @@ proc processRequest[T]( except Exception as exc: error "Unexpected exception in handleRes", error = exc.msg +proc freeLib[T](ctx: ptr FFIContext[T]) {.gcsafe.} = + ## Frees the createShared'd library object in ctx.myLib. Runs on the FFI + ## thread, which is what makes destroying its GC fields safe. + if ctx.myLib.isNil(): + return + when not defined(gcRefc): + ## orc: shared heap, so run the destructor here. The hook isn't inferred + ## gcsafe but only touches this (owning-thread) object, so assert it. + {.cast(gcsafe).}: + `=destroy`(ctx.myLib[]) + else: + ## refc: `=destroy` here hits the unsafe Selector path (see cleanUpResources). + ## Reclaim only the wrapper; leak the inner fields, like the signal fds. + discard + freeShared(ctx.myLib) + ctx.myLib = nil + +var RecycleTimeout* = 1500.milliseconds + ## Upper bound the recycle handler waits for in-flight handlers before it + ## cancels them and reports the ctx as stuck. The drain returns as soon as they + ## finish, so this only bounds a *stuck* handler. A `var` so tests can shorten it. + +proc recycleContext[T]( + ctx: ptr FFIContext[T], pending: ptr seq[Future[void]] +) {.async.} = + ## Recycle handler, on the FFI thread (requestRecycle already closed the gate): + ## drain the in-flight handlers, free the lib object, release the slot for reuse, + ## and fire the callback with the outcome. Never blocks the caller. + ## + ## `pending` is the run loop's seq of handler Futures, by ptr (async procs can't + ## take `var`); holding the instances lets us await and cancel them. + pending[].keepItIf(not it.finished()) + + ## 1. Let the in-flight handlers finish on their own, bounded by RecycleTimeout. + var naturallyDrained = pending[].len == 0 + if not naturallyDrained: + naturallyDrained = await allFutures(pending[]).withTimeout(RecycleTimeout) + + ## 2. If any are wedged, cancel them and give the cancellations a bounded moment + ## to unwind, so the slot can be reclaimed rather than leaked. + var safeToRecycle = naturallyDrained + if not naturallyDrained: + for fut in pending[]: + if not fut.finished(): + fut.cancelSoon() + safeToRecycle = await allFutures(pending[]).withTimeout(RecycleTimeout) + + let cb = ctx.recycleCallback + let ud = ctx.recycleUserData + ctx.recycleCallback = nil + + if safeToRecycle: + ## Nothing can touch the context now. Free the lib here, then release the slot + ## BEFORE the callback (the atomic store publishes these writes to whoever + ## reclaims it) so a caller reacquiring on the callback finds it already free. + freeLib(ctx) + ctx.callbackState = default(FFICallbackState) + pending[].setLen(0) + ctx.unclaim() + + if not cb.isNil(): + foreignThreadGc: + ## Never hand the callback nil: empty string on success, reason on timeout. + ## An empty string's cstring still points at a '\0', so msg[0] is a safe + ## address with len 0. + let msg = + if naturallyDrained: "" + else: "recycle: in-flight requests did not finish in time" + let cmsg = msg.cstring + let retCode = if naturallyDrained: RET_OK else: RET_ERR + cb(retCode, unsafeAddr cmsg[0], cast[csize_t](msg.len), ud) + proc ffiThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} = ## FFI thread body that attends library user API requests ffiCurrentCallbackState = addr ctx[].callbackState @@ -233,11 +353,20 @@ proc ffiThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} = error "failed to fire threadExitSignal on FFI thread exit", err = fireRes.error let ffiRun = proc(ctx: ptr FFIContext[T]) {.async.} = - var ffiReqHandler: T - ## Holds the main library object, i.e., in charge of handling the ffi requests. - ## e.g., Waku, LibP2P, SDS, etc. + ## Handler Futures live here on the FFI thread's heap, NOT on FFIContext + ## (shared pool memory, must stay GC-free); holding them lets destroy await + ## and cancel them. + var pending: seq[Future[void]] while ctx.running.load(): + ## Recycle requested: claim it (RecyclePending -> Recycling, one-shot) and run + ## the recycle on this owning thread, then keep looping so the worker stays + ## alive for the slot's next reuse. + var expected = CtxLifecycle.RecyclePending + if ctx.lifecycle.compareExchange(expected, CtxLifecycle.Recycling): + await recycleContext(ctx, addr pending) + continue + let gotSignal = await ctx.reqSignal.wait().withTimeout(100.milliseconds) if not gotSignal: continue @@ -247,11 +376,10 @@ proc ffiThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} = if not ctx.reqChannel.tryRecv(request): continue - if ctx.myLib.isNil(): - ctx.myLib = addr ffiReqHandler - - ## Handle the request - asyncSpawn processRequest(request, ctx) + ## Dispatch and remember the handler's Future (pruning finished ones) so a + ## later recycle can await or cancel it. + pending.keepItIf(not it.finished()) + pending.add(processRequest(request, ctx)) let fireRes = ctx.reqReceivedSignal.fireSync() if fireRes.isErr(): @@ -323,6 +451,7 @@ proc initContextResources*[T](ctx: ptr FFIContext[T]): Result[void, string] = ctx.registeredRequests = addr ffi_types.registeredRequests + ctx.lifecycle.store(CtxLifecycle.Active) ctx.running.store(true) try: @@ -385,3 +514,45 @@ proc stopAndJoinThreads*[T](ctx: ptr FFIContext[T]): Result[void, string] = joinThread(ctx.ffiThread) joinThread(ctx.watchdogThread) return ok() + +proc requestRecycle*[T]( + ctx: ptr FFIContext[T], callback: FFICallBack, userData: pointer +): Result[void, string] = + ## Asks the FFI thread to recycle the context and fire `callback` with the + ## outcome (RET_OK drained, RET_ERR stuck). NON-BLOCKING. + ## + ## Order matters: set the callback before flipping to RecyclePending (the flip is + ## the trigger), under `lock` to serialise the gate with sendRequestToFFIThread. + ctx.recycleCallback = callback + ctx.recycleUserData = userData + + ctx.lock.acquire() + ctx.lifecycle.store(CtxLifecycle.RecyclePending) + ctx.lock.release() + + let fired = ctx.reqSignal.fireSync().valueOr: + return err("requestRecycle: failed to signal the FFI thread: " & $error) + if not fired: + return err("requestRecycle: failed to signal the FFI thread in time") + return ok() + +proc markReacquired*[T](ctx: ptr FFIContext[T]) = + ## Re-arms a recycled context when its slot is reacquired by createFFIContext: + ## moves Recycling -> Active (re-opening the gate). The FFI thread's `pending` + ## seq was already drained and myLib freed by the recycle handler. + ctx.lifecycle.store(CtxLifecycle.Active) + +proc tryClaim*[T](ctx: ptr FFIContext[T]): bool = + ## Atomically claim this slot (false -> true). Returns true if we won it, false + ## if it was already claimed. Used by createFFIContext to hand out a free slot. + var expected = false + ctx.inUse.compareExchange(expected, true) + +proc unclaim*[T](ctx: ptr FFIContext[T]) = + ## Mark the slot free for reuse. Called by the recycle handler on the FFI thread + ## once teardown is done, and on creation failure / full teardown. + ctx.inUse.store(false) + +proc isClaimed*[T](ctx: ptr FFIContext[T]): bool = + ## Whether the slot is currently claimed by a consumer. + ctx.inUse.load() diff --git a/ffi/ffi_context_pool.nim b/ffi/ffi_context_pool.nim index 5644121..720807e 100644 --- a/ffi/ffi_context_pool.nim +++ b/ffi/ffi_context_pool.nim @@ -1,6 +1,6 @@ import std/atomics import results -import ./ffi_context +import ./ffi_context, ./ffi_types const MaxFFIContexts* = 32 ## Maximum number of concurrently live FFI contexts when using FFIContextPool. @@ -12,19 +12,12 @@ type FFIContextPool*[T] = object ## and bounds the total number of file descriptors consumed by ThreadSignalPtrs ## to at most MaxFFIContexts * 2. slots: array[MaxFFIContexts, FFIContext[T]] - inUse: array[MaxFFIContexts, Atomic[bool]] initialized: array[MaxFFIContexts, Atomic[bool]] ## Whether a slot's worker (threads, chronos dispatcher and ThreadSignalPtrs) ## has been built. Set on first acquisition and kept set across park/reuse, ## so a reacquired slot reuses the same fds instead of allocating a fresh set ## every create/destroy cycle. Cleared only by full teardown. -proc releaseSlot[T](pool: var FFIContextPool[T], ctx: ptr FFIContext[T]) = - for i in 0 ..< MaxFFIContexts: - if pool.slots[i].addr == ctx: - pool.inUse[i].store(false) - return - proc createFFIContext*[T]( pool: var FFIContextPool[T] ): Result[ptr FFIContext[T], string] = @@ -34,44 +27,35 @@ proc createFFIContext*[T]( ## worker down). This is what keeps fd usage bounded: repeated create/destroy ## cycles no longer leak a fresh set of ThreadSignalPtr/dispatcher fds. for i in 0 ..< MaxFFIContexts: - var expected = false - if not pool.inUse[i].compareExchange(expected, true): - continue let ctx = pool.slots[i].addr + if not ctx.tryClaim(): + continue if pool.initialized[i].load(): - ## Reused slot: worker threads, dispatcher and signals are already alive. + ## Reused slot: a prior destroy drained and released it, worker still alive. + ## Re-arm the gate and hand it back. + ctx.markReacquired() return ok(ctx) initContextResources(ctx).isOkOr: - pool.inUse[i].store(false) + ctx.unclaim() return err("createFFIContext: initContextResources failed: " & $error) pool.initialized[i].store(true) return ok(ctx) return err("FFI context pool exhausted (max " & $MaxFFIContexts & " contexts)") proc releaseFFIContext*[T]( - pool: var FFIContextPool[T], ctx: ptr FFIContext[T] + pool: var FFIContextPool[T], + ctx: ptr FFIContext[T], + callback: FFICallBack, + userData: pointer, ): Result[void, string] = - ## Parks a context for reuse: returns its slot to the pool WITHOUT stopping the - ## worker threads, so the next createFFIContext reuses the same fds. This is the - ## steady-state cleanup path, called by the generated destructor (ffiDtor); - ## destroyFFIContext is only for creation failure and non-pooling callers. + ## Parks a context for reuse without stopping its worker, so the next + ## createFFIContext reuses the same threads and fds. Steady-state cleanup path + ## for the generated destructor; destroyFFIContext is for failure/non-pool use. ## - ## Runs on the CALLER's thread, not the FFI worker thread, and does NOT itself - ## wait for in-flight work to finish. It is safe to park here because the - ## framework processes one request at a time (see sendRequestToFFIThread): by - ## the time the destructor calls this, the worker has finished the previous - ## request and is idle (looping on reqSignal), so there is no handler still - ## touching the slot when it is reused. - ## - ## Clearing callbackState removes the stored C event callback. The - ## worker/watchdog threads stay alive after parking, so an event could still - ## fire on this slot; with no callback set that event does nothing, instead of - ## calling back into a consumer that has already released the context (whose - ## user-data pointer may now be freed). - ctx.callbackState = default(FFICallbackState) - ctx.myLib = nil - pool.releaseSlot(ctx) - return ok() + ## NON-BLOCKING: the FFI thread drains the handlers, frees the lib and releases + ## the slot, then fires `callback` (RET_OK drained, RET_ERR stuck). The slot + ## returns to the pool from that thread, so a reused slot never carries a straggler. + return ctx.requestRecycle(callback, userData) proc destroyFFIContext*[T]( pool: var FFIContextPool[T], ctx: ptr FFIContext[T] @@ -88,7 +72,7 @@ proc destroyFFIContext*[T]( if pool.slots[i].addr == ctx: pool.initialized[i].store(false) break - pool.releaseSlot(ctx) + ctx.unclaim() return ok() proc isValidCtx*[T](pool: var FFIContextPool[T], ctx: pointer): bool = @@ -99,5 +83,5 @@ proc isValidCtx*[T](pool: var FFIContextPool[T], ctx: pointer): bool = return false for i in 0 ..< MaxFFIContexts: if cast[pointer](pool.slots[i].addr) == ctx: - return pool.inUse[i].load() + return cast[ptr FFIContext[T]](ctx).isClaimed() return false diff --git a/ffi/internal/ffi_macro.nim b/ffi/internal/ffi_macro.nim index 9e62757..b796185 100644 --- a/ffi/internal/ffi_macro.nim +++ b/ffi/internal/ffi_macro.nim @@ -1565,24 +1565,19 @@ macro ffiDtor*(prc: untyped): untyped = ffiBody.add(bodyNode) let poolIdent = ident($libTypeName & "FFIPool") - # Park the slot (releaseFFIContext) instead of tearing it down - # (destroyFFIContext) so the next createFFIContext reuses the same worker and - # fds — this is what keeps fd usage bounded across create/destroy cycles. - # Safe because the framework processes one request at a time - # (see sendRequestToFFIThread): by the time this destructor runs the worker is - # idle, not mid-request, so parking cannot race an in-flight handler. + # Hand teardown to the FFI thread (releaseFFIContext -> requestRecycle), recycling + # the slot for reuse to keep fd usage bounded. NON-BLOCKING: returns RET_OK once + # accepted; the real outcome arrives via `callback`, so callers must wait on it, + # not the return code. ffiBody.add quote do: - let `destroyResIdent` = - `poolIdent`.releaseFFIContext(cast[ptr FFIContext[`libTypeName`]](ctx)) + let `destroyResIdent` = `poolIdent`.releaseFFIContext( + cast[ptr FFIContext[`libTypeName`]](ctx), callback, userData + ) if `destroyResIdent`.isErr(): if not callback.isNil: let errStr = "destroy failed: " & $`destroyResIdent`.error callback(RET_ERR, unsafeAddr errStr[0], cast[csize_t](errStr.len), userData) return RET_ERR - - ffiBody.add quote do: - if not callback.isNil: - callback(RET_OK, nil, 0, userData) return RET_OK let ffiProc = newProc( diff --git a/tests/test_ffi_context.nim b/tests/test_ffi_context.nim index 11f71a8..b6a2c95 100644 --- a/tests/test_ffi_context.nim +++ b/tests/test_ffi_context.nim @@ -421,6 +421,12 @@ proc testlib_create*( ): Future[Result[SimpleLib, string]] {.ffiCtor.} = return ok(SimpleLib(value: config.initialValue)) +# Records the value of the library object the destructor body saw, so a test can +# confirm the user cleanup body ran with the right lib state before teardown. +var gDestroyedValue {.threadvar.}: int +proc testlib_destroy*(lib: SimpleLib) {.ffiDtor.} = + gDestroyedValue = lib.value + suite "ffiCtor macro": test "creates context and returns pointer via callback": var d: CallbackData @@ -450,6 +456,123 @@ suite "ffiCtor macro": check SimpleLibFFIPool.destroyFFIContext(ctx).isOk() +proc createSimpleLib(initialValue: int): ptr FFIContext[SimpleLib] = + ## Helper: run the generated async ctor and return the live ctx. + var d: CallbackData + initCallbackData(d) + defer: + deinitCallbackData(d) + let ret = + testlib_create(ffiSerialize(SimpleConfig(initialValue: initialValue)).cstring, + testCallback, addr d) + doAssert not ret.isNil() + waitCallback(d) + doAssert d.retCode == RET_OK + return cast[ptr FFIContext[SimpleLib]](cast[uint](parseBiggestUInt(callbackMsg(d)))) + +suite "ffiDtor macro (async destroy + reuse)": + test "destroy fires RET_OK after teardown, frees myLib, and frees the slot": + let ctx = createSimpleLib(5) + check not ctx[].myLib.isNil + check ctx[].myLib[].value == 5 + + var dD: CallbackData + initCallbackData(dD) + defer: + deinitCallbackData(dD) + + # Async destroy: the C return is just "accepted"; the real outcome arrives + # via the callback once the FFI thread has finished tearing the lib down. + check testlib_destroy(cast[pointer](ctx), testCallback, addr dD) == RET_OK + waitCallback(dD) + check dD.retCode == RET_OK + check gDestroyedValue == 5 # the user cleanup body saw the live lib + check ctx[].myLib.isNil() # freed on the FFI thread + + # The slot was freed from the FFI thread, so a fresh create reclaims it. + let ctx2 = createSimpleLib(9) + check ctx2 == ctx # same slot, reused worker + fds + check ctx2[].myLib[].value == 9 + check SimpleLibFFIPool.destroyFFIContext(ctx2).isOk() + + test "destroy waits for an in-flight request before reporting RET_OK": + let ctx = createSimpleLib(1) + + # Dispatch a 500 ms handler and do NOT wait — it is in flight at destroy time. + var slow: CallbackData + initCallbackData(slow) + defer: + deinitCallbackData(slow) + check sendRequestToFFIThread(ctx, SlowRequest.ffiNewReq(testCallback, addr slow)).isOk() + + var dD: CallbackData + initCallbackData(dD) + defer: + deinitCallbackData(dD) + check testlib_destroy(cast[pointer](ctx), testCallback, addr dD) == RET_OK + waitCallback(dD) + check dD.retCode == RET_OK + # Drained: the in-flight handler ran to completion before destroy reported OK. + check slow.called + check callbackMsg(slow) == "slow-done" + + let ctx2 = createSimpleLib(2) + check ctx2 == ctx + check SimpleLibFFIPool.destroyFFIContext(ctx2).isOk() + + test "requests are rejected once a destroy closes the gate": + let ctx = createSimpleLib(3) + + var dD: CallbackData + initCallbackData(dD) + defer: + deinitCallbackData(dD) + check testlib_destroy(cast[pointer](ctx), testCallback, addr dD) == RET_OK + waitCallback(dD) + check dD.retCode == RET_OK + + # Gate stays closed until the slot is reacquired: a late request must not + # dispatch onto a context about to be (or already) reused. + var d: CallbackData + initCallbackData(d) + defer: + deinitCallbackData(d) + check sendRequestToFFIThread( + ctx, SlowRequest.ffiNewReq(testCallback, addr d) + ).isErr() + + let ctx2 = createSimpleLib(4) + check ctx2 == ctx + check SimpleLibFFIPool.destroyFFIContext(ctx2).isOk() + + test "a stuck context is reported as RET_ERR rather than hanging": + let ctx = createSimpleLib(8) + + let savedTimeout = RecycleTimeout + RecycleTimeout = 150.milliseconds + defer: + RecycleTimeout = savedTimeout + + # In-flight handler outlasts the (shortened) drain timeout. + var slow: CallbackData + initCallbackData(slow) + defer: + deinitCallbackData(slow) + check sendRequestToFFIThread(ctx, SlowRequest.ffiNewReq(testCallback, addr slow)).isOk() + + var dD: CallbackData + initCallbackData(dD) + defer: + deinitCallbackData(dD) + check testlib_destroy(cast[pointer](ctx), testCallback, addr dD) == RET_OK + waitCallback(dD) + check dD.retCode == RET_ERR # drain timed out -> ctx reported stuck + + # The stuck slot is leaked (not reused); the handler still finishes on its + # own. Wait for it, then fully tear the leaked slot down. + waitCallback(slow) + check SimpleLibFFIPool.destroyFFIContext(ctx).isOk() + # --------------------------------------------------------------------------- # Simplified .ffi. macro integration test # --------------------------------------------------------------------------- @@ -672,13 +795,26 @@ proc countOpenFds(): int = except CatchableError: return -1 +proc releaseAndWait[T](pool: var FFIContextPool[T], ctx: ptr FFIContext[T]): cint = + ## Test helper mirroring how a C consumer destroys a context: kick off the + ## (non-blocking) teardown and block on the callback, returning its retCode. + ## RET_OK means the lib's in-flight tasks finished and the slot was parked. + var d: CallbackData + initCallbackData(d) + defer: + deinitCallbackData(d) + if pool.releaseFFIContext(ctx, testCallback, addr d).isErr(): + return RET_ERR + waitCallback(d) + return d.retCode + suite "releaseFFIContext (park & reuse)": test "park returns the slot and reuses the same live worker": var pool: FFIContextPool[TestLib] let ctx1 = pool.createFFIContext().valueOr: check false return - check pool.releaseFFIContext(ctx1).isOk() + check pool.releaseAndWait(ctx1) == RET_OK # Reacquire: must be the same array slot, with its worker still running. let ctx2 = pool.createFFIContext().valueOr: @@ -707,7 +843,7 @@ suite "releaseFFIContext (park & reuse)": ctx.callbackState.callback = cast[pointer](testCallback) ctx.callbackState.userData = cast[pointer](0xDEAD) - check pool.releaseFFIContext(ctx).isOk() + check pool.releaseAndWait(ctx) == RET_OK check ctx.callbackState.callback.isNil() # a watchdog tick can't call a freed cb check ctx.callbackState.userData.isNil() check ctx.myLib.isNil() @@ -726,7 +862,7 @@ suite "releaseFFIContext (park & reuse)": let ctx = pool.createFFIContext().valueOr: check false return - check pool.releaseFFIContext(ctx).isOk() + check pool.releaseAndWait(ctx) == RET_OK let baseline = countOpenFds() @@ -741,7 +877,7 @@ suite "releaseFFIContext (park & reuse)": ).isOk() waitCallback(d) deinitCallbackData(d) - check pool.releaseFFIContext(ctx).isOk() + check pool.releaseAndWait(ctx) == RET_OK let afterCycles = countOpenFds() # Reuse must not grow fds. Before the fix each cycle leaked ~10 fds (4