mirror of
https://github.com/logos-messaging/nim-ffi.git
synced 2026-06-25 02:39:47 +00:00
fix(pool): reuse parked contexts to stop per-cycle fd leak
destroyFFIContext stopped and joined the worker threads on every release, and createFFIContext rebuilt them on the next acquire. Each cycle therefore allocated a fresh worker — 4 ThreadSignalPtr socketpairs plus the chronos dispatcher kqueues of the ffi and watchdog threads — and never reclaimed the old ones (the pool deliberately skips closing them, relying on slot reuse that never actually reused the resources). A consumer that creates and destroys contexts repeatedly (e.g. nim-sds ReliabilityManager) leaked ~10 fds per cycle, unbounded.

Make the pool genuinely reuse a slot's worker:

- Track per-slot `initialized`; createFFIContext builds the worker once and reuses it on every later acquisition of the same slot.
- Add releaseFFIContext: parks a context (returns its slot) WITHOUT stopping the threads, so the next acquire reuses the same fds. It also drops the stale C event callback so a watchdog tick on a parked slot cannot invoke a callback whose user-data the consumer may already have freed. The caller is responsible for quiescing its library object (on the FFI thread) first.
- destroyFFIContext keeps full-teardown semantics for error/non-pooling paths and now marks the slot uninitialised so a later acquire rebuilds it.

Tests: add a park & reuse suite (same-slot live-worker reuse, callback/lib pointer dropped on park, and fd usage bounded across 20 park/reuse cycles). The fd test fails by ~10 fds/cycle against the pre-fix behaviour. Green under both --mm:refc and --mm:orc.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
parent
fb25f069d2
commit
c77581b694
@ -13,13 +13,11 @@ type FFIContextPool*[T] = object
|
||||
## to at most MaxFFIContexts * 2.
|
||||
slots: array[MaxFFIContexts, FFIContext[T]]
|
||||
inUse: array[MaxFFIContexts, Atomic[bool]]
|
||||
|
||||
proc acquireSlot[T](pool: var FFIContextPool[T]): Result[ptr FFIContext[T], string] =
  ## Claims the first free slot of the fixed pool.
  ##
  ## Walks the slots in index order and atomically flips a slot's `inUse` flag
  ## from false to true; the first slot for which that swap succeeds is
  ## returned by address. Returns an error when every slot is already taken.
  for idx in 0 ..< MaxFFIContexts:
    # compareExchange overwrites its first argument on failure, so the
    # expected value is re-declared for every slot.
    var wantFree = false
    if not pool.inUse[idx].compareExchange(wantFree, true):
      continue
    return ok(addr pool.slots[idx])
  return err("FFI context pool exhausted (max " & $MaxFFIContexts & " contexts)")
|
||||
initialized: array[MaxFFIContexts, Atomic[bool]]
|
||||
## Whether a slot's worker (threads, chronos dispatcher and ThreadSignalPtrs)
|
||||
## has been built. Set on first acquisition and kept set across park/reuse,
|
||||
## so a reacquired slot reuses the same fds instead of allocating a fresh set
|
||||
## every create/destroy cycle. Cleared only by full teardown.
|
||||
|
||||
proc releaseSlot[T](pool: var FFIContextPool[T], ctx: ptr FFIContext[T]) =
|
||||
for i in 0 ..< MaxFFIContexts:
|
||||
@ -30,24 +28,55 @@ proc releaseSlot[T](pool: var FFIContextPool[T], ctx: ptr FFIContext[T]) =
|
||||
proc createFFIContext*[T](
|
||||
pool: var FFIContextPool[T]
|
||||
): Result[ptr FFIContext[T], string] =
|
||||
## Acquires a slot from the fixed pool and initialises it as an FFI context.
|
||||
## Bounded fd usage: at most MaxFFIContexts * 2 ThreadSignalPtr fds are ever open.
|
||||
let ctx = pool.acquireSlot().valueOr:
|
||||
return err("createFFIContext: acquireSlot failed: " & $error)
|
||||
initContextResources(ctx).isOkOr:
|
||||
pool.releaseSlot(ctx)
|
||||
return err("createFFIContext: initContextResources failed: " & $error)
|
||||
return ok(ctx)
|
||||
## Acquires a slot from the fixed pool. The slot's worker is built once on
|
||||
## first use and REUSED on every later acquisition of the same slot (a slot is
|
||||
## made reacquirable by releaseFFIContext, which parks it without tearing the
|
||||
## worker down). This is what keeps fd usage bounded: repeated create/destroy
|
||||
## cycles no longer leak a fresh set of ThreadSignalPtr/dispatcher fds.
|
||||
for i in 0 ..< MaxFFIContexts:
|
||||
var expected = false
|
||||
if not pool.inUse[i].compareExchange(expected, true):
|
||||
continue
|
||||
let ctx = pool.slots[i].addr
|
||||
if pool.initialized[i].load():
|
||||
## Reused slot: worker threads, dispatcher and signals are already alive.
|
||||
return ok(ctx)
|
||||
initContextResources(ctx).isOkOr:
|
||||
pool.inUse[i].store(false)
|
||||
return err("createFFIContext: initContextResources failed: " & $error)
|
||||
pool.initialized[i].store(true)
|
||||
return ok(ctx)
|
||||
return err("FFI context pool exhausted (max " & $MaxFFIContexts & " contexts)")
|
||||
|
||||
proc releaseFFIContext*[T](
    pool: var FFIContextPool[T], ctx: ptr FFIContext[T]
): Result[void, string] =
  ## Parks a context for reuse: returns its slot to the pool WITHOUT stopping the
  ## worker threads, so the next createFFIContext reuses the same fds. The caller
  ## MUST have already quiesced its library object (e.g. cancelled the object's
  ## async tasks) on the FFI thread before calling this — the worker keeps
  ## running. The stale C event callback is dropped so a watchdog tick on the
  ## parked slot cannot invoke a callback whose user-data may already be freed.
  ##
  ## Returns an error, and touches nothing, when `ctx` is nil or is not the
  ## address of one of this pool's slots.
  if ctx.isNil():
    return err("releaseFFIContext: nil context")

  # Only reset memory that is known to be one of this pool's slots; a foreign
  # pointer must not have its fields wiped.
  var ownedByPool = false
  for i in 0 ..< MaxFFIContexts:
    if pool.slots[i].addr == ctx:
      ownedByPool = true
      break
  if not ownedByPool:
    return err("releaseFFIContext: context does not belong to this pool")

  # Drop the callback and library pointer BEFORE handing the slot back, so the
  # next owner never observes the previous owner's state.
  ctx.callbackState = default(FFICallbackState)
  ctx.myLib = nil
  pool.releaseSlot(ctx)
  return ok()
|
||||
|
||||
proc destroyFFIContext*[T](
|
||||
pool: var FFIContextPool[T], ctx: ptr FFIContext[T]
|
||||
): Result[void, string] =
|
||||
## Stops the FFI context and returns its slot to the pool. If the FFI thread
|
||||
## is blocked and does not exit in time, the slot is leaked rather than
|
||||
## reclaimed — closing its resources while the thread is still live would be
|
||||
## unsafe.
|
||||
## Full teardown: stops/joins the worker threads and returns the slot to the
|
||||
## pool, marking it uninitialised so a later createFFIContext rebuilds it. Used
|
||||
## on creation failure and by non-pooling callers; steady-state cleanup should
|
||||
## use releaseFFIContext to keep fd usage bounded. If the FFI thread is blocked
|
||||
## and does not exit in time, the slot is leaked rather than reclaimed —
|
||||
## closing its resources while the thread is still live would be unsafe.
|
||||
ctx.stopAndJoinThreads().isOkOr:
|
||||
return err("destroyFFIContext(pool): " & $error)
|
||||
for i in 0 ..< MaxFFIContexts:
|
||||
if pool.slots[i].addr == ctx:
|
||||
pool.initialized[i].store(false)
|
||||
break
|
||||
pool.releaseSlot(ctx)
|
||||
return ok()
|
||||
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
import std/[locks, strutils, os]
|
||||
import std/[locks, strutils, os, osproc, sequtils]
|
||||
import unittest2
|
||||
import results
|
||||
import ../ffi
|
||||
@ -648,3 +648,109 @@ suite "ptr return type in .ffi.":
|
||||
|
||||
waitCallback(freeD)
|
||||
check freeD.retCode == RET_OK
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# releaseFFIContext: park & reuse (fd-leak regression)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
proc countOpenFds(): int =
  ## Number of open file descriptors for this process, or -1 if it cannot be
  ## determined on this platform.
  ##
  ## On Linux the entries of /proc/self/fd are counted. Elsewhere the proc
  ## shells out to `lsof -p <pid>` and counts only the rows whose FD column
  ## starts with a digit, so the header row and non-descriptor rows (cwd, txt,
  ## mem, ...) do not inflate or perturb the count. Returns -1 when lsof is not
  ## on PATH (e.g. Windows) or cannot be run.
  when defined(linux):
    var n = 0
    for _ in walkDir("/proc/self/fd"):
      inc n
    return n
  else:
    if findExe("lsof").len == 0:
      return -1
    try:
      let output =
        execProcess("lsof", args = ["-p", $getCurrentProcessId()], options = {poUsePath})
      # lsof's default layout is `COMMAND PID USER FD TYPE ...`; a real
      # descriptor shows up in the 4th column as a number plus mode, e.g. `3u`.
      return output
        .splitLines()
        .mapIt(it.splitWhitespace())
        .countIt(it.len > 3 and it[3][0] in Digits)
    except CatchableError:
      return -1
|
||||
|
||||
suite "releaseFFIContext (park & reuse)":
  ## Regression suite for parking a context with releaseFFIContext and getting
  ## the same slot back from createFFIContext.

  test "park returns the slot and reuses the same live worker":
    var pool: FFIContextPool[TestLib]
    let ctx1 = pool.createFFIContext().valueOr:
      check false
      return
    check pool.releaseFFIContext(ctx1).isOk()

    # Reacquire: must be the same array slot, with its worker still running.
    let ctx2 = pool.createFFIContext().valueOr:
      check false
      return
    check ctx1 == ctx2

    var d: CallbackData
    initCallbackData(d)
    defer:
      deinitCallbackData(d)
    check sendRequestToFFIThread(
      ctx2, PingRequest.ffiNewReq(testCallback, addr d, "reuse".cstring)
    ).isOk()
    waitCallback(d)
    check d.retCode == RET_OK
    check callbackMsg(d) == "pong:reuse" # reused worker still processes requests

    check pool.destroyFFIContext(ctx2).isOk()

  test "park drops the stale event callback and library pointer":
    var pool: FFIContextPool[TestLib]
    let ctx = pool.createFFIContext().valueOr:
      check false
      return
    ctx.callbackState.callback = cast[pointer](testCallback)
    ctx.callbackState.userData = cast[pointer](0xDEAD)

    check pool.releaseFFIContext(ctx).isOk()
    check ctx.callbackState.callback.isNil() # a watchdog tick can't call a freed cb
    check ctx.callbackState.userData.isNil()
    check ctx.myLib.isNil()

    # Reacquire the parked slot before tearing it down, so the test only
    # destroys a context it currently owns (same pattern as the fd test below).
    let reacquired = pool.createFFIContext().valueOr:
      check false
      return
    check reacquired == ctx
    check pool.destroyFFIContext(reacquired).isOk()

  test "fd usage stays bounded across many park/reuse cycles":
    if countOpenFds() < 0:
      skip() # no fd-counting facility on this platform
    else:
      var pool: FFIContextPool[TestLib]

      # Warm up: the first create builds the slot's worker (its fds are allocated
      # once here); parking keeps them open for reuse.
      block:
        let ctx = pool.createFFIContext().valueOr:
          check false
          return
        check pool.releaseFFIContext(ctx).isOk()

      let baseline = countOpenFds()

      for _ in 0 ..< 20:
        let ctx = pool.createFFIContext().valueOr:
          check false
          return
        var d: CallbackData
        initCallbackData(d)
        check sendRequestToFFIThread(
          ctx, PingRequest.ffiNewReq(testCallback, addr d, "x".cstring)
        ).isOk()
        waitCallback(d)
        # Each cycle must actually be served by the reused worker, not just
        # complete without growing the fd count.
        check d.retCode == RET_OK
        deinitCallbackData(d)
        check pool.releaseFFIContext(ctx).isOk()

      let afterCycles = countOpenFds()
      # Reuse must not grow fds. Before the fix each cycle leaked ~10 fds (4
      # ThreadSignalPtr socketpairs + 2 dispatcher kqueues); the small slack
      # only tolerates unrelated runtime fd noise, not a per-cycle leak.
      check afterCycles <= baseline + 5

      # Tear the (still parked) slot's worker down so the test leaves no threads.
      let last = pool.createFFIContext().valueOr:
        check false
        return
      check pool.destroyFFIContext(last).isOk()
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user