From c77581b694a9db5f0f43495c5490e32be49cf699 Mon Sep 17 00:00:00 2001
From: Ivan FB
Date: Thu, 4 Jun 2026 12:07:17 +0200
Subject: [PATCH] fix(pool): reuse parked contexts to stop per-cycle fd leak
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

destroyFFIContext stopped and joined the worker threads on every release,
and createFFIContext rebuilt them on the next acquire. Each cycle
therefore allocated a fresh worker — 4 ThreadSignalPtr socketpairs + the
ffi/watchdog thread chronos dispatcher kqueues — and never reclaimed the
old ones (the pool deliberately skips closing them, relying on slot reuse
that never actually reused the resources). A consumer that creates and
destroys contexts repeatedly (e.g. nim-sds ReliabilityManager) leaked ~10
fds per cycle, unbounded.

Make the pool genuinely reuse a slot's worker:

- Track per-slot `initialized`; createFFIContext builds the worker once
  and reuses it on every later acquisition of the same slot.
- Add releaseFFIContext: parks a context (returns its slot) WITHOUT
  stopping the threads, so the next acquire reuses the same fds. It also
  drops the stale C event callback so a watchdog tick on a parked slot
  cannot invoke a callback whose user-data the consumer may already have
  freed. The caller is responsible for quiescing its library object (on
  the FFI thread) first.
- destroyFFIContext keeps full-teardown semantics for error/non-pooling
  paths and now marks the slot uninitialised so a later acquire rebuilds
  it.

Tests: add a park & reuse suite (same-slot live-worker reuse,
callback/lib pointer dropped on park, and fd usage bounded across 20
park/reuse cycles). The fd test fails by ~10 fds/cycle against the
pre-fix behaviour. Green under both --mm:refc and --mm:orc.
Co-Authored-By: Claude Opus 4.8 --- ffi/ffi_context_pool.nim | 67 ++++++++++++++++------- tests/test_ffi_context.nim | 108 ++++++++++++++++++++++++++++++++++++- 2 files changed, 155 insertions(+), 20 deletions(-) diff --git a/ffi/ffi_context_pool.nim b/ffi/ffi_context_pool.nim index dc1e154..27659b0 100644 --- a/ffi/ffi_context_pool.nim +++ b/ffi/ffi_context_pool.nim @@ -13,13 +13,11 @@ type FFIContextPool*[T] = object ## to at most MaxFFIContexts * 2. slots: array[MaxFFIContexts, FFIContext[T]] inUse: array[MaxFFIContexts, Atomic[bool]] - -proc acquireSlot[T](pool: var FFIContextPool[T]): Result[ptr FFIContext[T], string] = - for i in 0 ..< MaxFFIContexts: - var expected = false - if pool.inUse[i].compareExchange(expected, true): - return ok(pool.slots[i].addr) - return err("FFI context pool exhausted (max " & $MaxFFIContexts & " contexts)") + initialized: array[MaxFFIContexts, Atomic[bool]] + ## Whether a slot's worker (threads, chronos dispatcher and ThreadSignalPtrs) + ## has been built. Set on first acquisition and kept set across park/reuse, + ## so a reacquired slot reuses the same fds instead of allocating a fresh set + ## every create/destroy cycle. Cleared only by full teardown. proc releaseSlot[T](pool: var FFIContextPool[T], ctx: ptr FFIContext[T]) = for i in 0 ..< MaxFFIContexts: @@ -30,24 +28,55 @@ proc releaseSlot[T](pool: var FFIContextPool[T], ctx: ptr FFIContext[T]) = proc createFFIContext*[T]( pool: var FFIContextPool[T] ): Result[ptr FFIContext[T], string] = - ## Acquires a slot from the fixed pool and initialises it as an FFI context. - ## Bounded fd usage: at most MaxFFIContexts * 2 ThreadSignalPtr fds are ever open. - let ctx = pool.acquireSlot().valueOr: - return err("createFFIContext: acquireSlot failed: " & $error) - initContextResources(ctx).isOkOr: - pool.releaseSlot(ctx) - return err("createFFIContext: initContextResources failed: " & $error) - return ok(ctx) + ## Acquires a slot from the fixed pool. 
The slot's worker is built once on + ## first use and REUSED on every later acquisition of the same slot (a slot is + ## made reacquirable by releaseFFIContext, which parks it without tearing the + ## worker down). This is what keeps fd usage bounded: repeated create/destroy + ## cycles no longer leak a fresh set of ThreadSignalPtr/dispatcher fds. + for i in 0 ..< MaxFFIContexts: + var expected = false + if not pool.inUse[i].compareExchange(expected, true): + continue + let ctx = pool.slots[i].addr + if pool.initialized[i].load(): + ## Reused slot: worker threads, dispatcher and signals are already alive. + return ok(ctx) + initContextResources(ctx).isOkOr: + pool.inUse[i].store(false) + return err("createFFIContext: initContextResources failed: " & $error) + pool.initialized[i].store(true) + return ok(ctx) + return err("FFI context pool exhausted (max " & $MaxFFIContexts & " contexts)") + +proc releaseFFIContext*[T]( + pool: var FFIContextPool[T], ctx: ptr FFIContext[T] +): Result[void, string] = + ## Parks a context for reuse: returns its slot to the pool WITHOUT stopping the + ## worker threads, so the next createFFIContext reuses the same fds. The caller + ## MUST have already quiesced its library object (e.g. cancelled the object's + ## async tasks) on the FFI thread before calling this — the worker keeps + ## running. The stale C event callback is dropped so a watchdog tick on the + ## parked slot cannot invoke a callback whose user-data may already be freed. + ctx.callbackState = default(FFICallbackState) + ctx.myLib = nil + pool.releaseSlot(ctx) + return ok() proc destroyFFIContext*[T]( pool: var FFIContextPool[T], ctx: ptr FFIContext[T] ): Result[void, string] = - ## Stops the FFI context and returns its slot to the pool. If the FFI thread - ## is blocked and does not exit in time, the slot is leaked rather than - ## reclaimed — closing its resources while the thread is still live would be - ## unsafe. 
+ ## Full teardown: stops/joins the worker threads and returns the slot to the + ## pool, marking it uninitialised so a later createFFIContext rebuilds it. Used + ## on creation failure and by non-pooling callers; steady-state cleanup should + ## use releaseFFIContext to keep fd usage bounded. If the FFI thread is blocked + ## and does not exit in time, the slot is leaked rather than reclaimed — + ## closing its resources while the thread is still live would be unsafe. ctx.stopAndJoinThreads().isOkOr: return err("destroyFFIContext(pool): " & $error) + for i in 0 ..< MaxFFIContexts: + if pool.slots[i].addr == ctx: + pool.initialized[i].store(false) + break pool.releaseSlot(ctx) return ok() diff --git a/tests/test_ffi_context.nim b/tests/test_ffi_context.nim index dafc4e3..11f71a8 100644 --- a/tests/test_ffi_context.nim +++ b/tests/test_ffi_context.nim @@ -1,4 +1,4 @@ -import std/[locks, strutils, os] +import std/[locks, strutils, os, osproc, sequtils] import unittest2 import results import ../ffi @@ -648,3 +648,109 @@ suite "ptr return type in .ffi.": waitCallback(freeD) check freeD.retCode == RET_OK + +# --------------------------------------------------------------------------- +# releaseFFIContext: park & reuse (fd-leak regression) +# --------------------------------------------------------------------------- + +proc countOpenFds(): int = + ## Number of open fds for this process, or -1 if not determinable on this + ## platform. On Linux we count /proc/self/fd; elsewhere we shell out to lsof + ## (skipped if lsof is unavailable, e.g. Windows). 
+ when defined(linux): + var n = 0 + for _ in walkDir("/proc/self/fd"): + inc n + return n + else: + if findExe("lsof").len == 0: + return -1 + try: + let output = + execProcess("lsof", args = ["-p", $getCurrentProcessId()], options = {poUsePath}) + return output.splitLines().countIt(it.len > 0) + except CatchableError: + return -1 + +suite "releaseFFIContext (park & reuse)": + test "park returns the slot and reuses the same live worker": + var pool: FFIContextPool[TestLib] + let ctx1 = pool.createFFIContext().valueOr: + check false + return + check pool.releaseFFIContext(ctx1).isOk() + + # Reacquire: must be the same array slot, with its worker still running. + let ctx2 = pool.createFFIContext().valueOr: + check false + return + check ctx1 == ctx2 + + var d: CallbackData + initCallbackData(d) + defer: + deinitCallbackData(d) + check sendRequestToFFIThread( + ctx2, PingRequest.ffiNewReq(testCallback, addr d, "reuse".cstring) + ).isOk() + waitCallback(d) + check d.retCode == RET_OK + check callbackMsg(d) == "pong:reuse" # reused worker still processes requests + + check pool.destroyFFIContext(ctx2).isOk() + + test "park drops the stale event callback and library pointer": + var pool: FFIContextPool[TestLib] + let ctx = pool.createFFIContext().valueOr: + check false + return + ctx.callbackState.callback = cast[pointer](testCallback) + ctx.callbackState.userData = cast[pointer](0xDEAD) + + check pool.releaseFFIContext(ctx).isOk() + check ctx.callbackState.callback.isNil() # a watchdog tick can't call a freed cb + check ctx.callbackState.userData.isNil() + check ctx.myLib.isNil() + + check pool.destroyFFIContext(ctx).isOk() + + test "fd usage stays bounded across many park/reuse cycles": + if countOpenFds() < 0: + skip() # no fd-counting facility on this platform + else: + var pool: FFIContextPool[TestLib] + + # Warm up: the first create builds the slot's worker (its fds are allocated + # once here); parking keeps them open for reuse. 
+ block: + let ctx = pool.createFFIContext().valueOr: + check false + return + check pool.releaseFFIContext(ctx).isOk() + + let baseline = countOpenFds() + + for _ in 0 ..< 20: + let ctx = pool.createFFIContext().valueOr: + check false + return + var d: CallbackData + initCallbackData(d) + check sendRequestToFFIThread( + ctx, PingRequest.ffiNewReq(testCallback, addr d, "x".cstring) + ).isOk() + waitCallback(d) + deinitCallbackData(d) + check pool.releaseFFIContext(ctx).isOk() + + let afterCycles = countOpenFds() + # Reuse must not grow fds. Before the fix each cycle leaked ~10 fds (4 + # ThreadSignalPtr socketpairs + 2 dispatcher kqueues); the small slack + # only tolerates unrelated runtime fd noise, not a per-cycle leak. + check afterCycles <= baseline + 5 + + # Tear the (still parked) slot's worker down so the test leaves no threads. + let last = pool.createFFIContext().valueOr: + check false + return + check pool.destroyFFIContext(last).isOk()