diff --git a/ffi/ffi_context_pool.nim b/ffi/ffi_context_pool.nim index dc1e154..27659b0 100644 --- a/ffi/ffi_context_pool.nim +++ b/ffi/ffi_context_pool.nim @@ -13,13 +13,11 @@ type FFIContextPool*[T] = object ## to at most MaxFFIContexts * 2. slots: array[MaxFFIContexts, FFIContext[T]] inUse: array[MaxFFIContexts, Atomic[bool]] - -proc acquireSlot[T](pool: var FFIContextPool[T]): Result[ptr FFIContext[T], string] = - for i in 0 ..< MaxFFIContexts: - var expected = false - if pool.inUse[i].compareExchange(expected, true): - return ok(pool.slots[i].addr) - return err("FFI context pool exhausted (max " & $MaxFFIContexts & " contexts)") + initialized: array[MaxFFIContexts, Atomic[bool]] + ## Whether a slot's worker (threads, chronos dispatcher and ThreadSignalPtrs) + ## has been built. Set on first acquisition and kept set across park/reuse, + ## so a reacquired slot reuses the same fds instead of allocating a fresh set + ## every create/destroy cycle. Cleared only by full teardown. proc releaseSlot[T](pool: var FFIContextPool[T], ctx: ptr FFIContext[T]) = for i in 0 ..< MaxFFIContexts: @@ -30,24 +28,55 @@ proc releaseSlot[T](pool: var FFIContextPool[T], ctx: ptr FFIContext[T]) = proc createFFIContext*[T]( pool: var FFIContextPool[T] ): Result[ptr FFIContext[T], string] = - ## Acquires a slot from the fixed pool and initialises it as an FFI context. - ## Bounded fd usage: at most MaxFFIContexts * 2 ThreadSignalPtr fds are ever open. - let ctx = pool.acquireSlot().valueOr: - return err("createFFIContext: acquireSlot failed: " & $error) - initContextResources(ctx).isOkOr: - pool.releaseSlot(ctx) - return err("createFFIContext: initContextResources failed: " & $error) - return ok(ctx) + ## Acquires a slot from the fixed pool. The slot's worker is built once on + ## first use and REUSED on every later acquisition of the same slot (a slot is + ## made reacquirable by releaseFFIContext, which parks it without tearing the + ## worker down). This is what keeps fd usage bounded: repeated create/destroy + ## cycles no longer leak a fresh set of ThreadSignalPtr/dispatcher fds. + for i in 0 ..< MaxFFIContexts: + var expected = false + if not pool.inUse[i].compareExchange(expected, true): + continue + let ctx = pool.slots[i].addr + if pool.initialized[i].load(): + ## Reused slot: worker threads, dispatcher and signals are already alive. + return ok(ctx) + initContextResources(ctx).isOkOr: + pool.inUse[i].store(false) + return err("createFFIContext: initContextResources failed: " & $error) + pool.initialized[i].store(true) + return ok(ctx) + return err("FFI context pool exhausted (max " & $MaxFFIContexts & " contexts)") + +proc releaseFFIContext*[T]( + pool: var FFIContextPool[T], ctx: ptr FFIContext[T] +): Result[void, string] = + ## Parks a context for reuse: returns its slot to the pool WITHOUT stopping the + ## worker threads, so the next createFFIContext reuses the same fds. The caller + ## MUST have already quiesced its library object (e.g. cancelled the object's + ## async tasks) on the FFI thread before calling this — the worker keeps + ## running. The stale C event callback is dropped so a watchdog tick on the + ## parked slot cannot invoke a callback whose user-data may already be freed. + ctx.callbackState = default(FFICallbackState) + ctx.myLib = nil + pool.releaseSlot(ctx) + return ok() proc destroyFFIContext*[T]( pool: var FFIContextPool[T], ctx: ptr FFIContext[T] ): Result[void, string] = - ## Stops the FFI context and returns its slot to the pool. If the FFI thread - ## is blocked and does not exit in time, the slot is leaked rather than - ## reclaimed — closing its resources while the thread is still live would be - ## unsafe. + ## Full teardown: stops/joins the worker threads and returns the slot to the + ## pool, marking it uninitialised so a later createFFIContext rebuilds it. Used + ## on creation failure and by non-pooling callers; steady-state cleanup should + ## use releaseFFIContext to keep fd usage bounded. If the FFI thread is blocked + ## and does not exit in time, the slot is leaked rather than reclaimed — + ## closing its resources while the thread is still live would be unsafe. ctx.stopAndJoinThreads().isOkOr: return err("destroyFFIContext(pool): " & $error) + for i in 0 ..< MaxFFIContexts: + if pool.slots[i].addr == ctx: + pool.initialized[i].store(false) + break pool.releaseSlot(ctx) return ok() diff --git a/tests/test_ffi_context.nim b/tests/test_ffi_context.nim index dafc4e3..11f71a8 100644 --- a/tests/test_ffi_context.nim +++ b/tests/test_ffi_context.nim @@ -1,4 +1,4 @@ -import std/[locks, strutils, os] +import std/[locks, strutils, os, osproc, sequtils] import unittest2 import results import ../ffi @@ -648,3 +648,109 @@ suite "ptr return type in .ffi.": waitCallback(freeD) check freeD.retCode == RET_OK + +# --------------------------------------------------------------------------- +# releaseFFIContext: park & reuse (fd-leak regression) +# --------------------------------------------------------------------------- + +proc countOpenFds(): int = + ## Number of open fds for this process, or -1 if not determinable on this + ## platform. On Linux we count /proc/self/fd; elsewhere we shell out to lsof + ## (skipped if lsof is unavailable, e.g. Windows). + when defined(linux): + var n = 0 + for _ in walkDir("/proc/self/fd"): + inc n + return n + else: + if findExe("lsof").len == 0: + return -1 + try: + let output = + execProcess("lsof", args = ["-p", $getCurrentProcessId()], options = {poUsePath}) + return output.splitLines().countIt(it.len > 0) + except CatchableError: + return -1 + +suite "releaseFFIContext (park & reuse)": + test "park returns the slot and reuses the same live worker": + var pool: FFIContextPool[TestLib] + let ctx1 = pool.createFFIContext().valueOr: + check false + return + check pool.releaseFFIContext(ctx1).isOk() + + # Reacquire: must be the same array slot, with its worker still running. + let ctx2 = pool.createFFIContext().valueOr: + check false + return + check ctx1 == ctx2 + + var d: CallbackData + initCallbackData(d) + defer: + deinitCallbackData(d) + check sendRequestToFFIThread( + ctx2, PingRequest.ffiNewReq(testCallback, addr d, "reuse".cstring) + ).isOk() + waitCallback(d) + check d.retCode == RET_OK + check callbackMsg(d) == "pong:reuse" # reused worker still processes requests + + check pool.destroyFFIContext(ctx2).isOk() + + test "park drops the stale event callback and library pointer": + var pool: FFIContextPool[TestLib] + let ctx = pool.createFFIContext().valueOr: + check false + return + ctx.callbackState.callback = cast[pointer](testCallback) + ctx.callbackState.userData = cast[pointer](0xDEAD) + + check pool.releaseFFIContext(ctx).isOk() + check ctx.callbackState.callback.isNil() # a watchdog tick can't call a freed cb + check ctx.callbackState.userData.isNil() + check ctx.myLib.isNil() + + check pool.destroyFFIContext(ctx).isOk() + + test "fd usage stays bounded across many park/reuse cycles": + if countOpenFds() < 0: + skip() # no fd-counting facility on this platform + else: + var pool: FFIContextPool[TestLib] + + # Warm up: the first create builds the slot's worker (its fds are allocated + # once here); parking keeps them open for reuse. + block: + let ctx = pool.createFFIContext().valueOr: + check false + return + check pool.releaseFFIContext(ctx).isOk() + + let baseline = countOpenFds() + + for _ in 0 ..< 20: + let ctx = pool.createFFIContext().valueOr: + check false + return + var d: CallbackData + initCallbackData(d) + check sendRequestToFFIThread( + ctx, PingRequest.ffiNewReq(testCallback, addr d, "x".cstring) + ).isOk() + waitCallback(d) + deinitCallbackData(d) + check pool.releaseFFIContext(ctx).isOk() + + let afterCycles = countOpenFds() + # Reuse must not grow fds. Before the fix each cycle leaked ~10 fds (4 + # ThreadSignalPtr socketpairs + 2 dispatcher kqueues); the small slack + # only tolerates unrelated runtime fd noise, not a per-cycle leak. + check afterCycles <= baseline + 5 + + # Tear the (still parked) slot's worker down so the test leaves no threads. + let last = pool.createFFIContext().valueOr: + check false + return + check pool.destroyFFIContext(last).isOk()