fix(pool): recycle contexts to stop per-cycle fd leak (#74)

This commit is contained in:
Ivan FB 2026-06-08 13:36:14 +02:00 committed by Ivan FB
parent c2ea4f3f98
commit 24cf796fcd
No known key found for this signature in database
GPG Key ID: DF0C67A04C543270
5 changed files with 1293 additions and 62 deletions

View File

@ -2,9 +2,9 @@ name: CI
on:
push:
branches: [master, main]
branches: [master, main, 'release/*']
pull_request:
branches: [master, main]
branches: [master, main, 'release/*']
jobs:
# Single source of truth for Nim / Nimble versions used by every job and

View File

@ -6,11 +6,25 @@
{.passc: "-fPIC".}
import std/[atomics, locks, options, tables]
import std/[atomics, locks, options, tables, sequtils]
import chronicles, chronos, chronos/threadsync, taskpools/channels_spsc_single, results
import ./ffi_types, ./ffi_events, ./ffi_thread_request, ./logging, ./cbor_serial
import ./ffi_types, ./ffi_thread_request, ./internal/ffi_macro, ./logging
export ffi_events
type FFICallbackState* = object
## Holds the C event callback and its associated user-data pointer.
## Embedded in FFIContext and referenced from the FFI thread via a thread-local.
callback*: pointer
userData*: pointer
type CtxLifecycle {.pure.} = enum
## State machine guarding a pooled FFI context, held as an Atomic on FFIContext.
## Transitions:
## Active -> RecyclePending when ffiDtor is invoked
## RecyclePending -> Recycling The process completed the in-flight processes and is ready for lib cleanup and release
## Recycling -> Active When the FFI thread is ready again to attend to requests
Active ## accepting and serving requests
RecyclePending ## recycle requested; FFI thread loop hasn't claimed it yet
Recycling ## FFI loop draining handlers, then frees lib + returns to pool
type FFIContext*[T] = object
myLib*: ptr T # main library object (Waku, LibP2P, SDS, …)
@ -32,6 +46,15 @@ type FFIContext*[T] = object
# advanced each FFI-thread loop; event thread reads for liveness
eventQueueStuck*: Atomic[bool] # sticky overflow flag
running: Atomic[bool] # To control when the threads are running
lifecycle: Atomic[CtxLifecycle]
recycleCallback: FFICallBack
# The destructor's callback, fired by the recycle handler with the outcome:
# RET_OK once drained, RET_ERR if it timed out. Set by requestRecycle.
recycleUserData: pointer
inUse: Atomic[bool]
# Whether the context is claimed. createFFIContext claims it (false -> true); the
# recycle handler clears it once drained. On the context so the owning thread can
# release it without reaching into the pool.
registeredRequests: ptr Table[cstring, FFIRequestProc]
var onFFIThread* {.threadvar.}: bool
@ -55,6 +78,285 @@ template closeAndNil(field: untyped) =
proc deinitContextResources*[T](ctx: ptr FFIContext[T]): Result[void, string] =
## Mirror of `initContextResources`. Threads MUST be joined first;
## fields are nil'd after close so re-init on the same slot is safe.
template callEventCallback*(ctx: ptr FFIContext, eventName: string, body: untyped) =
if isNil(ctx[].callbackState.callback):
chronicles.error eventName & " - eventCallback is nil"
return
foreignThreadGc:
try:
let event = body
cast[FFICallBack](ctx[].callbackState.callback)(
RET_OK,
unsafeAddr event[0],
cast[csize_t](len(event)),
ctx[].callbackState.userData,
)
except Exception, CatchableError:
let msg =
"Exception " & eventName & " when calling 'eventCallBack': " &
getCurrentExceptionMsg()
cast[FFICallBack](ctx[].callbackState.callback)(
RET_ERR,
unsafeAddr msg[0],
cast[csize_t](len(msg)),
ctx[].callbackState.userData,
)
template dispatchFfiEvent*(eventName: string, body: untyped) =
## Dispatches an FFI event to the callback registered via `{libName}_set_event_callback`.
## `body` is evaluated lazily — only when a callback is registered.
## Valid only on the FFI thread (i.e., inside {.ffi.} proc bodies and their async closures).
let ffiState = ffiCurrentCallbackState
if isNil(ffiState) or isNil(ffiState[].callback):
chronicles.error eventName & " - event callback not set"
return
foreignThreadGc:
try:
let event = body
cast[FFICallBack](ffiState[].callback)(
RET_OK, unsafeAddr event[0], cast[csize_t](len(event)), ffiState[].userData
)
except Exception, CatchableError:
let msg = "Exception dispatching " & eventName & ": " & getCurrentExceptionMsg()
cast[FFICallBack](ffiState[].callback)(
RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), ffiState[].userData
)
proc sendRequestToFFIThread*(
ctx: ptr FFIContext, ffiRequest: ptr FFIThreadRequest, timeout = InfiniteDuration
): Result[void, string] =
ctx.lock.acquire()
defer:
ctx.lock.release()
if ctx.lifecycle.load() != CtxLifecycle.Active:
deleteRequest(ffiRequest)
return err("FFI context is not accepting requests (being recycled)")
## Sending the request to the FFI thread
let sentOk = ctx.reqChannel.trySend(ffiRequest)
if not sentOk:
deleteRequest(ffiRequest)
return err("Couldn't send a request to the ffi thread")
let fireSyncRes = ctx.reqSignal.fireSync()
if fireSyncRes.isErr():
deleteRequest(ffiRequest)
return err("failed fireSync: " & $fireSyncRes.error)
if fireSyncRes.get() == false:
deleteRequest(ffiRequest)
return err("Couldn't fireSync in time")
## wait until the FFI working thread properly received the request
let res = ctx.reqReceivedSignal.waitSync(timeout)
if res.isErr():
return err("Couldn't receive reqReceivedSignal signal")
return ok()
type Foo = object
registerReqFFI(WatchdogReq, foo: ptr Foo):
proc(): Future[Result[string, string]] {.async.} =
return ok("FFI thread is not blocked")
type JsonNotRespondingEvent = object
eventType: string
proc init(T: type JsonNotRespondingEvent): T =
return JsonNotRespondingEvent(eventType: "not_responding")
proc `$`(event: JsonNotRespondingEvent): string =
$(%*event)
proc onNotResponding*(ctx: ptr FFIContext) =
callEventCallback(ctx, "onNotResponding"):
$JsonNotRespondingEvent.init()
proc watchdogThreadBody(ctx: ptr FFIContext) {.thread.} =
## Watchdog thread that monitors the FFI thread and notifies the library user if it hangs.
## This thread never blocks.
let watchdogRun = proc(ctx: ptr FFIContext) {.async.} =
const WatchdogStartDelay = 10.seconds
const WatchdogTimeinterval = 1.seconds
const WatchdogTimeout = 20.seconds
# Give time for the node to be created and up before sending watchdog requests
let initialStop = await ctx.stopSignal.wait().withTimeout(WatchdogStartDelay)
if initialStop or ctx.running.load == false:
return
while true:
let intervalStop = await ctx.stopSignal.wait().withTimeout(WatchdogTimeinterval)
if intervalStop or ctx.running.load == false:
debug "Watchdog thread exiting because FFIContext is not running"
break
if ctx.lifecycle.load() != CtxLifecycle.Active:
continue
let callback = proc(
callerRet: cint, msg: ptr cchar, len: csize_t, userData: pointer
) {.cdecl, gcsafe, raises: [].} =
discard ## Don't do anything. Just respecting the callback signature.
const nilUserData = nil
trace "Sending watchdog request to FFI thread"
try:
sendRequestToFFIThread(
ctx, WatchdogReq.ffiNewReq(callback, nilUserData), WatchdogTimeout
).isOkOr:
error "Failed to send watchdog request to FFI thread", error = $error
onNotResponding(ctx)
except Exception as exc:
error "Exception sending watchdog request", exc = exc.msg
onNotResponding(ctx)
waitFor watchdogRun(ctx)
proc processRequest[T](
request: ptr FFIThreadRequest, ctx: ptr FFIContext[T]
) {.async.} =
## Invoked within the FFI thread to process a request coming from the FFI API consumer thread.
let reqId = $request[].reqId
## The reqId determines which proc will handle the request.
## The registeredRequests represents a table defined at compile time.
## Then, registeredRequests == Table[reqId, proc-handling-the-request-asynchronously]
let reqIdCs = reqId.cstring
# keep `reqId` alive and avoid the implicit string→cstring warning.
let retFut =
if not ctx[].registeredRequests[].contains(reqIdCs):
## That shouldn't happen because only registered requests should be sent to the FFI thread.
nilProcess(request[].reqId)
else:
ctx[].registeredRequests[][reqIdCs](request[].reqContent, ctx)
let res =
try:
await retFut
except CancelledError as exc:
Result[string, string].err("Request cancelled during destroy: " & exc.msg)
except AsyncError as exc:
Result[string, string].err(
"Async error in processRequest for " & reqId & ": " & exc.msg
)
## handleRes may raise (OOM, GC setup) even though it is rare.
try:
handleRes(res, request)
except Exception as exc:
error "Unexpected exception in handleRes", error = exc.msg
proc freeLib[T](ctx: ptr FFIContext[T]) {.gcsafe.} =
if ctx.myLib.isNil():
return
when not defined(gcRefc):
{.cast(gcsafe).}:
`=destroy`(ctx.myLib[])
else:
discard
freeShared(ctx.myLib)
ctx.myLib = nil
var RecycleTimeout* = 1500.milliseconds
## Upper bound the recycle handler waits for in-flight handlers before it
## cancels them and reports the ctx as stuck. The drain returns as soon as they
## finish, so this only bounds a *stuck* handler. A `var` so tests can shorten it.
proc recycleContext[T](
ctx: ptr FFIContext[T], ongoingProcessReq: ptr seq[Future[void]]
) {.async.} =
## Drain the in-flight handlers, free the lib object, release the context for reuse,
## and fire the callback with the outcome. Never blocks the caller.
ongoingProcessReq[].keepItIf(not it.finished())
## 1. Let the in-flight handlers finish on their own, bounded by RecycleTimeout.
var naturallyDrained = ongoingProcessReq[].len == 0
if not naturallyDrained:
naturallyDrained = await allFutures(ongoingProcessReq[]).withTimeout(RecycleTimeout)
## 2. If any are wedged, cancel them and give the cancellations a bounded moment
## to unwind, so the context can be reclaimed rather than leaked.
var safeToRecycle = naturallyDrained
if not naturallyDrained:
for fut in ongoingProcessReq[]:
if not fut.finished():
fut.cancelSoon()
safeToRecycle = await allFutures(ongoingProcessReq[]).withTimeout(RecycleTimeout)
let cb = ctx.recycleCallback
let ud = ctx.recycleUserData
ctx.recycleCallback = nil
if safeToRecycle:
freeLib(ctx)
ctx.callbackState = default(FFICallbackState)
ongoingProcessReq[].setLen(0)
ctx.release()
if not cb.isNil():
foreignThreadGc:
let msg =
if naturallyDrained:
""
else:
"recycle: in-flight requests did not finish in time"
let cmsg = msg.cstring
let retCode = if naturallyDrained: RET_OK else: RET_ERR
cb(retCode, unsafeAddr cmsg[0], cast[csize_t](msg.len), ud)
proc ffiThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} =
## FFI thread body that attends library user API requests
ffiCurrentCallbackState = addr ctx[].callbackState
logging.setupLog(logging.LogLevel.DEBUG, logging.LogFormat.TEXT)
defer:
let fireRes = ctx.threadExitSignal.fireSync()
if fireRes.isErr():
error "failed to fire threadExitSignal on FFI thread exit", err = fireRes.error
let ffiRun = proc(ctx: ptr FFIContext[T]) {.async.} =
var ongoingProcessReq: seq[Future[void]]
while ctx.running.load():
var expected = CtxLifecycle.RecyclePending
if ctx.lifecycle.compareExchange(expected, CtxLifecycle.Recycling):
await recycleContext(ctx, addr ongoingProcessReq)
continue
let gotSignal = await ctx.reqSignal.wait().withTimeout(100.milliseconds)
if not gotSignal:
continue
## Wait for a request from the ffi consumer thread
var request: ptr FFIThreadRequest
if not ctx.reqChannel.tryRecv(request):
continue
ongoingProcessReq.keepItIf(not it.finished())
ongoingProcessReq.add(processRequest(request, ctx))
let fireRes = ctx.reqReceivedSignal.fireSync()
if fireRes.isErr():
error "could not fireSync back to requester thread", error = fireRes.error
waitFor ffiRun(ctx)
proc cleanUpResources[T](ctx: ptr FFIContext[T]): Result[void, string] =
## Full cleanup for heap-allocated contexts: closes all resources and frees memory.
defer:
freeShared(ctx)
ctx.lock.deinitLock()
deinitEventRegistry(ctx[].eventRegistry)
deinitEventQueue(ctx[].eventQueue)
@ -115,6 +417,7 @@ proc initContextResources*[T](ctx: ptr FFIContext[T]): Result[void, string] =
ctx.registeredRequests = addr ffi_types.registeredRequests
ctx.lifecycle.store(CtxLifecycle.Active)
ctx.running.store(true)
try:
@ -180,10 +483,41 @@ proc stopAndJoinThreads*[T](ctx: ptr FFIContext[T]): Result[void, string] =
joinThread(ctx.eventThread)
ok()
proc clearContext[T](ctx: ptr FFIContext[T]): Result[void, string] =
## Stops a heap-allocated FFI context.
ctx.stopAndJoinThreads().isOkOr:
return err("clearContext: " & $error)
ctx.cleanUpResources().isOkOr:
return err("cleanUpResources failed: " & $error)
ok()
proc requestRecycle*[T](
ctx: ptr FFIContext[T], callback: FFICallBack, userData: pointer
): Result[void, string] =
## Starts ctx recycle process without stopping its worker, so the next
## createFFIContext reuses the same threads and fds.
##
## During recycling, the FFI thread drains the handlers, frees the lib and releases
## the context, then fires `callback` (RET_OK drained, RET_ERR stuck).
ctx.lock.acquire()
if ctx.lifecycle.load() != CtxLifecycle.Active:
ctx.lock.release()
return err("requestRecycle: context is not Active (already recycling)")
ctx.recycleCallback = callback
ctx.recycleUserData = userData
ctx.lifecycle.store(CtxLifecycle.RecyclePending)
ctx.lock.release()
let fired = ctx.reqSignal.fireSync().valueOr:
return err("requestRecycle: failed to signal the FFI thread: " & $error)
if not fired:
return err("requestRecycle: failed to signal the FFI thread in time")
return ok()
proc markAsActive*[T](ctx: ptr FFIContext[T]) =
ctx.lifecycle.store(CtxLifecycle.Active)
proc tryClaim*[T](ctx: ptr FFIContext[T]): bool =
## Returns true if acquired the contex, false if it was already claimed.
var expected = false
ctx.inUse.compareExchange(expected, true)
proc release*[T](ctx: ptr FFIContext[T]) =
ctx.inUse.store(false)
proc isInUse*[T](ctx: ptr FFIContext[T]): bool =
ctx.inUse.load()

View File

@ -1,56 +1,60 @@
import std/atomics
import results
import ./ffi_context
import ./ffi_context, ./ffi_types
const MaxFFIContexts* = 32
# Only affects upfront pool memory; fds/threads consumed per acquired slot.
## Maximum number of concurrently live FFI contexts when using FFIContextPool.
type FFIContextPool*[T] = object
## Fixed pool. Bounds ThreadSignalPtr fds at MaxFFIContexts * 2.
slots: array[MaxFFIContexts, FFIContext[T]]
inUse: array[MaxFFIContexts, Atomic[bool]]
proc acquireSlot[T](pool: var FFIContextPool[T]): Result[ptr FFIContext[T], string] =
for i in 0 ..< MaxFFIContexts:
var expected = false
if pool.inUse[i].compareExchange(expected, true):
return ok(pool.slots[i].addr)
err("FFI context pool exhausted (max " & $MaxFFIContexts & " contexts)")
proc releaseSlot[T](pool: var FFIContextPool[T], ctx: ptr FFIContext[T]) =
for i in 0 ..< MaxFFIContexts:
if pool.slots[i].addr == ctx:
pool.inUse[i].store(false)
return
contexts: array[MaxFFIContexts, FFIContext[T]]
initialized: array[MaxFFIContexts, Atomic[bool]]
proc createFFIContext*[T](
pool: var FFIContextPool[T]
): Result[ptr FFIContext[T], string] =
let ctx = pool.acquireSlot().valueOr:
return err("createFFIContext: acquireSlot failed: " & $error)
initContextResources(ctx).isOkOr:
pool.releaseSlot(ctx)
return err("createFFIContext: initContextResources failed: " & $error)
ok(ctx)
## Acquires a context from the fixed pool. The context's worker is built once on
## first use and reused on every later acquisition.
for i in 0 ..< MaxFFIContexts:
let ctx = pool.contexts[i].addr
if not ctx.tryClaim():
continue
if pool.initialized[i].load():
## Reused context: a prior destroy drained and released it, worker still alive.
ctx.markAsActive()
return ok(ctx)
initContextResources(ctx).isOkOr:
ctx.release()
return err("createFFIContext: initContextResources failed: " & $error)
pool.initialized[i].store(true)
return ok(ctx)
return err("FFI context pool exhausted (max " & $MaxFFIContexts & " contexts)")
proc releaseFFIContext*[T](
ctx: ptr FFIContext[T], callback: FFICallBack, userData: pointer
): Result[void, string] =
return ctx.requestRecycle(callback, userData)
proc destroyFFIContext*[T](
pool: var FFIContextPool[T], ctx: ptr FFIContext[T]
): Result[void, string] =
## On thread-exit timeout the slot is leaked — closing live-thread resources is unsafe.
## Full teardown: stops/joins the worker threads and returns the context to the
## pool, marking it uninitialised so a later createFFIContext rebuilds it.
ctx.stopAndJoinThreads().isOkOr:
return err("destroyFFIContext(pool): " & $error)
# Required: next acquisition would otherwise re-init a live lock (UB).
let deinitRes = ctx.deinitContextResources()
pool.releaseSlot(ctx)
deinitRes.isOkOr:
return err("destroyFFIContext(pool): " & $error)
ok()
for i in 0 ..< MaxFFIContexts:
if pool.contexts[i].addr == ctx:
pool.initialized[i].store(false)
break
ctx.release()
return ok()
proc isValidCtx*[T](pool: var FFIContextPool[T], ctx: pointer): bool =
## Rejects nil / offset-invalid / dangling pointers at the API boundary.
## Returns true only if ctx points to one of the pool's contexts that is
## currently in use.
if ctx.isNil():
return false
for i in 0 ..< MaxFFIContexts:
if cast[pointer](pool.slots[i].addr) == ctx:
return pool.inUse[i].load()
false
if cast[pointer](pool.contexts[i].addr) == ctx:
return cast[ptr FFIContext[T]](ctx).isInUse()
return false

View File

@ -1253,16 +1253,15 @@ macro ffiDtor*(prc: untyped): untyped =
## The body contains any library-level cleanup to run before context teardown.
##
## Example:
## proc waku_destroy*(w: Waku) {.ffiDtor.} =
## w.cleanup()
## proc mylibobj_destroy*(obj: MyLibObj) {.ffiDtor.} =
## obj.cleanup()
##
## The generated C-exported proc has the signature:
## int waku_destroy(void* ctx)
## cint mylibobj_destroy(void* ctx, FfiCallback callback, void* userData)
##
## It extracts the library value from ctx, runs the body, then calls
## destroyFFIContext to tear down the FFI thread and free the context.
## Returns RET_OK on success, RET_ERR on failure (null/invalid ctx, or
## destroyFFIContext failure).
## Recycle the context for reuse to keep fd usage bounded.
## NON-BLOCKING: returns RET_OK once accepted;
## the real outcome arrives via `callback`.
let procName = prc[0]
let formalParams = prc[3]
@ -1271,8 +1270,8 @@ macro ffiDtor*(prc: untyped): untyped =
if formalParams.len < 2:
error("ffiDtor: proc must have exactly one parameter (w: LibType)")
let libParamName = formalParams[1][0]
let libTypeName = formalParams[1][1]
let libParamName = formalParams[1][0] # e.g. w
let libTypeName = formalParams[1][1] # e.g. MyLibObj
let procNameStr = block:
let raw = $procName
@ -1289,7 +1288,7 @@ macro ffiDtor*(prc: untyped): untyped =
if procName.kind == nnkPostfix:
cExportProcName = procName[1]
let destroyResIdent = genSym(nskLet, "destroyRes")
let releaseResIdent = genSym(nskLet, "destroyRes")
let ffiBody = newStmtList()
@ -1314,12 +1313,14 @@ macro ffiDtor*(prc: untyped): untyped =
let poolIdent = ident($libTypeName & "FFIPool")
ffiBody.add quote do:
let `destroyResIdent` =
`poolIdent`.destroyFFIContext(cast[ptr FFIContext[`libTypeName`]](ctx))
if `destroyResIdent`.isErr():
let `releaseResIdent` = releaseFFIContext(
cast[ptr FFIContext[`libTypeName`]](ctx), callback, userData
)
if `releaseResIdent`.isErr():
if not callback.isNil():
let errStr = "release failed: " & $`releaseResIdent`.error
callback(RET_ERR, unsafeAddr errStr[0], cast[csize_t](errStr.len), userData)
return RET_ERR
ffiBody.add quote do:
return RET_OK
let ffiProc = newProc(

892
tests/test_ffi_context.nim Normal file
View File

@ -0,0 +1,892 @@
import std/[locks, strutils, os, osproc, sequtils]
import unittest2
import results
import ../ffi
type TestLib = object
## Per-request callback state. The test thread blocks on `cond` until the
## FFI thread signals it — no polling, no CPU waste.
type CallbackData = object
lock: Lock
cond: Cond
called: bool
retCode: cint
msg: array[512, char]
msgLen: int
proc initCallbackData(d: var CallbackData) =
d.lock.initLock()
d.cond.initCond()
proc deinitCallbackData(d: var CallbackData) =
d.cond.deinitCond()
d.lock.deinitLock()
proc testCallback(
retCode: cint, msg: ptr cchar, len: csize_t, userData: pointer
) {.cdecl, gcsafe, raises: [].} =
let d = cast[ptr CallbackData](userData)
acquire(d[].lock)
d[].retCode = retCode
let n = min(int(len), d[].msg.high)
if n > 0 and not msg.isNil:
copyMem(addr d[].msg[0], msg, n)
d[].msg[n] = '\0'
d[].msgLen = n
d[].called = true
signal(d[].cond)
release(d[].lock)
proc waitCallback(d: var CallbackData) =
acquire(d.lock)
while not d.called:
wait(d.cond, d.lock)
release(d.lock)
proc callbackMsg(d: var CallbackData): string =
result = newString(d.msgLen)
if d.msgLen > 0:
copyMem(addr result[0], addr d.msg[0], d.msgLen)
registerReqFFI(PingRequest, lib: ptr TestLib):
proc(message: cstring): Future[Result[string, string]] {.async.} =
return ok("pong:" & $message)
registerReqFFI(FailRequest, lib: ptr TestLib):
proc(): Future[Result[string, string]] {.async.} =
return err("intentional failure")
registerReqFFI(EmptyOkRequest, lib: ptr TestLib):
proc(): Future[Result[string, string]] {.async.} =
return ok("")
registerReqFFI(SlowRequest, lib: ptr TestLib):
proc(): Future[Result[string, string]] {.async.} =
await sleepAsync(500.milliseconds)
return ok("slow-done")
# Coordination channel: the FFI handler signals the test thread the instant
# it is about to block the event loop, so the test can call destroyFFIContext
# while the event loop is truly frozen.
var gSyncBlockStarted: Channel[bool]
gSyncBlockStarted.open()
registerReqFFI(SyncBlockingRequest, lib: ptr TestLib):
proc(): Future[Result[string, string]] {.async.} =
# Yield first so that reqReceivedSignal fires and sendRequestToFFIThread
# returns on the calling thread before we start the synchronous block.
await sleepAsync(0.milliseconds)
# Signal the test thread: the event loop is about to be frozen.
# Channel.send is annotated as raising under refc, so wrap.
try:
gSyncBlockStarted.send(true)
except Exception as exc:
return err("gSyncBlockStarted.send raised: " & exc.msg)
# Simulates a request that blocks the event-loop thread synchronously
# (e.g. w.stop() -> switch.stop() -> connManager.close() with blocking I/O).
# Unlike sleepAsync, os.sleep holds the OS thread and prevents Chronos from
# processing any callbacks -- including the reqSignal fired by destroyFFIContext.
os.sleep(5_000)
return ok("sync-blocking-done")
# Approximates the heavy ref-object workload that libwaku/libp2p performs on
# the FFI thread. The exact cell count is large enough to force several refc
# GC cycles; under refc this stresses the heap state that, when later combined
# with a chronos Selector allocation on the main thread (via close()), used to
# trip the rawNewObj → signal-handler infinite recursion.
type RefCell = ref object
next: RefCell
payload: array[64, byte]
registerReqFFI(HeavyRefAllocRequest, lib: ptr TestLib):
proc(): Future[Result[string, string]] {.async.} =
var head: RefCell
for i in 0 ..< 50_000:
let n = RefCell(next: head)
head = n
if i mod 1000 == 0:
await sleepAsync(0.milliseconds)
# Break the chain iteratively before releasing head.
# ORC's =destroy for RefCell recurses through .next, so a 50k-node chain
# would produce ~50k nested =destroy calls and overflow the stack.
# Walking the list and unlinking each node first keeps destruction O(n)
# iterative instead of O(n) recursive.
var node = head
head = nil
while not node.isNil():
let nxt = node.next
node.next = nil # unlink before the refcount of `node` can drop to zero
node = nxt
await sleepAsync(10.milliseconds)
return ok("heavy-done")
suite "FFIContextPool":
test "create and destroy via pool succeeds":
var pool: FFIContextPool[TestLib]
let ctx = pool.createFFIContext().valueOr:
assert false, "createFFIContext(pool) failed: " & $error
return
check pool.destroyFFIContext(ctx).isOk()
test "context is reused after destroy":
var pool: FFIContextPool[TestLib]
let ctx1 = pool.createFFIContext().valueOr:
assert false, "createFFIContext(pool) failed: " & $error
return
check pool.destroyFFIContext(ctx1).isOk()
# After destroying, the same context must be available again
let ctx2 = pool.createFFIContext().valueOr:
assert false, "createFFIContext(pool) failed after context release: " & $error
return
check pool.destroyFFIContext(ctx2).isOk()
check ctx1 == ctx2 # same context reused
test "pool exhaustion returns error":
var pool: FFIContextPool[TestLib]
var ctxs: array[MaxFFIContexts, ptr FFIContext[TestLib]]
for i in 0 ..< MaxFFIContexts:
ctxs[i] = pool.createFFIContext().valueOr:
for j in 0 ..< i:
discard pool.destroyFFIContext(ctxs[j])
assert false, "createFFIContext(pool) failed at context " & $i & ": " & $error
return
# Pool is now full — next create must fail
check pool.createFFIContext().isErr()
for i in 0 ..< MaxFFIContexts:
discard pool.destroyFFIContext(ctxs[i])
test "requests are processed via pool context":
var pool: FFIContextPool[TestLib]
var d: CallbackData
initCallbackData(d)
defer:
deinitCallbackData(d)
let ctx = pool.createFFIContext().valueOr:
assert false, "createFFIContext(pool) failed: " & $error
return
defer:
discard pool.destroyFFIContext(ctx)
check sendRequestToFFIThread(
ctx, PingRequest.ffiNewReq(testCallback, addr d, "pool".cstring)
)
.isOk()
waitCallback(d)
check d.retCode == RET_OK
check callbackMsg(d) == "pong:pool"
suite "createFFIContext / destroyFFIContext":
test "create and destroy succeeds":
var pool: FFIContextPool[TestLib]
let ctx = pool.createFFIContext().valueOr:
checkpoint "createFFIContext failed: " & $error
check false
return
check pool.destroyFFIContext(ctx).isOk()
test "double destroy is safe via running flag":
var pool: FFIContextPool[TestLib]
let ctx = pool.createFFIContext().valueOr:
check false
return
check pool.destroyFFIContext(ctx).isOk()
suite "destroyFFIContext does not hang":
test "destroy while a slow async request is still in-flight":
## Reproduces the race where destroyFFIContext was called while a long-
## running async request (e.g. stop_node / w.stop()) was still executing.
## The destroy must return well within 2 seconds; before the fix it would
## block forever on joinThread(ffiThread).
var pool: FFIContextPool[TestLib]
let ctx = pool.createFFIContext().valueOr:
check false
return
var d: CallbackData
initCallbackData(d)
defer: deinitCallbackData(d)
# sendRequestToFFIThread returns as soon as the FFI thread ACKs receipt;
# the 500 ms work continues asynchronously on the FFI thread.
check sendRequestToFFIThread(
ctx, SlowRequest.ffiNewReq(testCallback, addr d)
).isOk()
# Destroy immediately while SlowRequest is still running.
let t0 = Moment.now()
check pool.destroyFFIContext(ctx).isOk()
check (Moment.now() - t0) < 2.seconds
suite "destroyFFIContext does not hang when event loop is blocked":
test "destroy while sync-blocking request is in-flight":
## Reproduces the hang seen in logosdelivery_example.c:
## logosdelivery_stop_node(...) -- triggers w.stop() on the FFI thread
## sleep(1)
## logosdelivery_destroy(...) -- hangs forever
##
## Root cause: w.stop() (and similar tear-down calls) can execute a
## synchronous blocking section that holds the OS thread, preventing
## the Chronos event loop from processing the reqSignal fired by
## destroyFFIContext. The result is joinThread(ffiThread) never returns.
##
## With the fix, destroyFFIContext must complete well within the 5 s that
## SyncBlockingRequest holds the event loop.
var pool: FFIContextPool[TestLib]
let ctx = pool.createFFIContext().valueOr:
check false
return
# CallbackData and ctx are kept alive past destroyFFIContext: the leaked
# FFI thread is still inside os.sleep(5_000) and will eventually wake,
# run handleRes, fire testCallback, and exit normally. We wait for that
# to happen at the end of the test so the leaked thread cannot race with
# subsequent tests' createFFIContext on Linux/Windows. Heap allocation
# ensures the late callback's userData is still valid when it fires.
let d = createShared(CallbackData)
initCallbackData(d[])
check sendRequestToFFIThread(
ctx, SyncBlockingRequest.ffiNewReq(testCallback, d)
).isOk()
# Block until the FFI handler has signalled that os.sleep is about to start.
# This guarantees destroyFFIContext is called while the event loop is frozen.
discard gSyncBlockStarted.recv()
# Destroy must return promptly even though the event loop is frozen for 5s.
# It deliberately returns err and leaks ctx in this scenario rather than
# hanging on joinThread.
let t0 = Moment.now()
check pool.destroyFFIContext(ctx).isErr()
check (Moment.now() - t0) < 3.seconds
# Drain the leaked thread before the test scope ends.
# 1. waitCallback blocks until os.sleep(5_000) returns and handleRes
# invokes testCallback (~3.5s after destroy returned), which proves
# the leaked thread has reached the end of processRequest.
# 2. Yield briefly so the thread can finish iterating its while loop,
# fire threadExitSignal in its defer, and return. Without this, on
# Linux/Windows the still-live thread can race with the next test's
# createFFIContext under --mm:orc and segfault.
# ctx.cleanUpResources is intentionally NOT called: destroyFFIContext
# skipped it for a reason, and the signal fds are reclaimed by the OS
# at process exit.
waitCallback(d[])
os.sleep(200)
deinitCallbackData(d[])
freeShared(d)
suite "destroyFFIContext refc workaround":
## Documents the refc-specific workaround in cleanUpResources.
##
## Background: when the FFI thread does heavy ref-object work (the workload
## that triggered the libwaku hang in production), the refc GC heap reaches
## a state where the very first chronos Selector allocation on the *main*
## thread — which happens lazily inside ThreadSignalPtr.close() through
## getThreadDispatcher() — traps in rawNewObj. The refc signal handler
## itself re-enters the same allocator and the process never returns.
## Captured stack:
## close → safeUnregisterAndCloseFd → getThreadDispatcher →
## newDispatcher → Selector.new → newObj (gc.nim:488) → rawNewObj →
## _sigtramp → signalHandler → newObjNoInit → addNewObjToZCT (loop)
##
## The workaround in cleanUpResources is `when defined(gcRefc): discard`,
## i.e. skip the close() calls under refc only. orc is unaffected and
## still cleans up the signal fds normally.
##
## NOTE: this test is documentation more than regression: a synthetic
## ref-allocation workload of ~50k cells does NOT corrupt the refc heap
## the way the real libwaku/libp2p teardown does, so this test passes
## even when the workaround is disabled. Reproducing the actual hang
## requires the full libwaku workload (logosdelivery_example.c).
## Verification of the workaround was done end-to-end against that
## example: with `--mm:refc` and close() enabled it hangs forever in
## the captured stack above; with `when defined(gcRefc): discard` it
## returns immediately. Under `--mm:orc` it returns immediately either
## way.
test "destroy after heavy ref-allocation workload returns promptly":
var pool: FFIContextPool[TestLib]
let ctx = pool.createFFIContext().valueOr:
check false
return
var d: CallbackData
initCallbackData(d)
defer: deinitCallbackData(d)
check sendRequestToFFIThread(
ctx, HeavyRefAllocRequest.ffiNewReq(testCallback, addr d)
).isOk()
waitCallback(d)
check d.retCode == RET_OK
let t0 = Moment.now()
check pool.destroyFFIContext(ctx).isOk()
check (Moment.now() - t0) < 3.seconds
suite "sendRequestToFFIThread":
test "successful request triggers RET_OK callback":
var d: CallbackData
initCallbackData(d)
defer:
deinitCallbackData(d)
var pool: FFIContextPool[TestLib]
let ctx = pool.createFFIContext().valueOr:
check false
return
defer:
discard pool.destroyFFIContext(ctx)
check sendRequestToFFIThread(
ctx, PingRequest.ffiNewReq(testCallback, addr d, "hello".cstring)
)
.isOk()
waitCallback(d)
check d.retCode == RET_OK
check callbackMsg(d) == "pong:hello"
test "failing request triggers RET_ERR callback":
var d: CallbackData
initCallbackData(d)
defer:
deinitCallbackData(d)
var pool: FFIContextPool[TestLib]
let ctx = pool.createFFIContext().valueOr:
check false
return
defer:
discard pool.destroyFFIContext(ctx)
check sendRequestToFFIThread(ctx, FailRequest.ffiNewReq(testCallback, addr d)).isOk()
waitCallback(d)
check d.retCode == RET_ERR
test "empty ok response delivers empty message":
var d: CallbackData
initCallbackData(d)
defer:
deinitCallbackData(d)
var pool: FFIContextPool[TestLib]
let ctx = pool.createFFIContext().valueOr:
check false
return
defer:
discard pool.destroyFFIContext(ctx)
check sendRequestToFFIThread(ctx, EmptyOkRequest.ffiNewReq(testCallback, addr d))
.isOk()
waitCallback(d)
check d.retCode == RET_OK
check d.msgLen == 0
test "sequential requests are all processed":
var pool: FFIContextPool[TestLib]
let ctx = pool.createFFIContext().valueOr:
check false
return
defer:
discard pool.destroyFFIContext(ctx)
for i in 1 .. 5:
var d: CallbackData
initCallbackData(d)
let msg = "msg" & $i
check sendRequestToFFIThread(
ctx, PingRequest.ffiNewReq(testCallback, addr d, msg.cstring)
)
.isOk()
waitCallback(d)
deinitCallbackData(d)
check d.retCode == RET_OK
check callbackMsg(d) == "pong:" & msg
# ---------------------------------------------------------------------------
# ffiCtor macro integration test
# ---------------------------------------------------------------------------
type SimpleLib = object
value: int
ffiType:
type SimpleConfig = object
initialValue: int
proc testlib_create*(
config: SimpleConfig
): Future[Result[SimpleLib, string]] {.ffiCtor.} =
return ok(SimpleLib(value: config.initialValue))
# Records the value of the library object the destructor body saw, so a test can
# confirm the user cleanup body ran with the right lib state before teardown.
var gDestroyedValue {.threadvar.}: int
proc testlib_destroy*(lib: SimpleLib) {.ffiDtor.} =
gDestroyedValue = lib.value
suite "ffiCtor macro":
test "creates context and returns pointer via callback":
var d: CallbackData
initCallbackData(d)
defer: deinitCallbackData(d)
let configJson = ffiSerialize(SimpleConfig(initialValue: 42))
let ret = testlib_create(configJson.cstring, testCallback, addr d)
check not ret.isNil()
waitCallback(d)
check d.retCode == RET_OK
# The callback message is the ctx address as a decimal string
let addrStr = callbackMsg(d)
check addrStr.len > 0
let ctxAddr = cast[uint](parseBiggestUInt(addrStr))
check ctxAddr != 0
let ctx = cast[ptr FFIContext[SimpleLib]](ctxAddr)
# Verify the library was properly initialized
check not ctx[].myLib.isNil
check ctx[].myLib[].value == 42
check SimpleLibFFIPool.destroyFFIContext(ctx).isOk()
proc createSimpleLib(initialValue: int): ptr FFIContext[SimpleLib] =
## Helper: run the generated async ctor and return the live ctx.
var d: CallbackData
initCallbackData(d)
defer:
deinitCallbackData(d)
let ret =
testlib_create(ffiSerialize(SimpleConfig(initialValue: initialValue)).cstring,
testCallback, addr d)
doAssert not ret.isNil()
waitCallback(d)
doAssert d.retCode == RET_OK
return cast[ptr FFIContext[SimpleLib]](cast[uint](parseBiggestUInt(callbackMsg(d))))
suite "ffiDtor macro (async destroy + reuse)":
test "destroy fires RET_OK after teardown, frees myLib, and frees the context":
let ctx = createSimpleLib(5)
check not ctx[].myLib.isNil
check ctx[].myLib[].value == 5
var dD: CallbackData
initCallbackData(dD)
defer:
deinitCallbackData(dD)
# Async destroy: the C return is just "accepted"; the real outcome arrives
# via the callback once the FFI thread has finished tearing the lib down.
check testlib_destroy(cast[pointer](ctx), testCallback, addr dD) == RET_OK
waitCallback(dD)
check dD.retCode == RET_OK
check gDestroyedValue == 5 # the user cleanup body saw the live lib
check ctx[].myLib.isNil() # freed on the FFI thread
# The context was freed from the FFI thread, so a fresh create reclaims it.
let ctx2 = createSimpleLib(9)
check ctx2 == ctx # same context, reused worker + fds
check ctx2[].myLib[].value == 9
check SimpleLibFFIPool.destroyFFIContext(ctx2).isOk()
test "destroy waits for an in-flight request before reporting RET_OK":
let ctx = createSimpleLib(1)
# Dispatch a 500 ms handler and do NOT wait — it is in flight at destroy time.
var slow: CallbackData
initCallbackData(slow)
defer:
deinitCallbackData(slow)
check sendRequestToFFIThread(ctx, SlowRequest.ffiNewReq(testCallback, addr slow)).isOk()
var dD: CallbackData
initCallbackData(dD)
defer:
deinitCallbackData(dD)
check testlib_destroy(cast[pointer](ctx), testCallback, addr dD) == RET_OK
waitCallback(dD)
check dD.retCode == RET_OK
# Drained: the in-flight handler ran to completion before destroy reported OK.
check slow.called
check callbackMsg(slow) == "slow-done"
let ctx2 = createSimpleLib(2)
check ctx2 == ctx
check SimpleLibFFIPool.destroyFFIContext(ctx2).isOk()
test "requests are rejected once a destroy closes the gate":
let ctx = createSimpleLib(3)
var dD: CallbackData
initCallbackData(dD)
defer:
deinitCallbackData(dD)
check testlib_destroy(cast[pointer](ctx), testCallback, addr dD) == RET_OK
waitCallback(dD)
check dD.retCode == RET_OK
# Gate stays closed until the context is reacquired: a late request must not
# dispatch onto a context about to be (or already) reused.
var d: CallbackData
initCallbackData(d)
defer:
deinitCallbackData(d)
check sendRequestToFFIThread(
ctx, SlowRequest.ffiNewReq(testCallback, addr d)
).isErr()
let ctx2 = createSimpleLib(4)
check ctx2 == ctx
check SimpleLibFFIPool.destroyFFIContext(ctx2).isOk()
test "a stuck context is reported as RET_ERR rather than hanging":
let ctx = createSimpleLib(8)
let savedTimeout = RecycleTimeout
RecycleTimeout = 150.milliseconds
defer:
RecycleTimeout = savedTimeout
# In-flight handler outlasts the (shortened) drain timeout.
var slow: CallbackData
initCallbackData(slow)
defer:
deinitCallbackData(slow)
check sendRequestToFFIThread(ctx, SlowRequest.ffiNewReq(testCallback, addr slow)).isOk()
var dD: CallbackData
initCallbackData(dD)
defer:
deinitCallbackData(dD)
check testlib_destroy(cast[pointer](ctx), testCallback, addr dD) == RET_OK
waitCallback(dD)
check dD.retCode == RET_ERR # drain timed out -> ctx reported stuck
# The stuck context is leaked (not reused); the handler still finishes on its
# own. Wait for it, then fully tear the leaked context down.
waitCallback(slow)
check SimpleLibFFIPool.destroyFFIContext(ctx).isOk()
# ---------------------------------------------------------------------------
# Simplified .ffi. macro integration test
# ---------------------------------------------------------------------------
ffiType:
type SendConfig = object
message: string
proc testlib_send*(
lib: SimpleLib, cfg: SendConfig
): Future[Result[string, string]] {.ffi.} =
return ok("echo:" & cfg.message & ":" & $lib.value)
suite "simplified .ffi. macro":
test "sends request and gets serialized response via callback":
# First create a context using ffiCtor
var ctorD: CallbackData
initCallbackData(ctorD)
defer: deinitCallbackData(ctorD)
let configJson = ffiSerialize(SimpleConfig(initialValue: 7))
let ctorRet = testlib_create(configJson.cstring, testCallback, addr ctorD)
check not ctorRet.isNil()
waitCallback(ctorD)
check ctorD.retCode == RET_OK
let addrStr = callbackMsg(ctorD)
check addrStr.len > 0
let ctxAddr = cast[uint](parseBiggestUInt(addrStr))
check ctxAddr != 0
let ctx = cast[ptr FFIContext[SimpleLib]](ctxAddr)
defer: check SimpleLibFFIPool.destroyFFIContext(ctx).isOk()
# Now call the .ffi. proc
var d: CallbackData
initCallbackData(d)
defer: deinitCallbackData(d)
let cfgJson = ffiSerialize(SendConfig(message: "hello"))
let ret = testlib_send(ctx, testCallback, addr d, cfgJson.cstring)
check ret == RET_OK
waitCallback(d)
check d.retCode == RET_OK
let receivedMsg = callbackMsg(d)
let decoded = ffiDeserialize(receivedMsg.cstring, string).valueOr:
check false
""
check decoded == "echo:hello:7"
# ---------------------------------------------------------------------------
# async/sync detection in .ffi. macro integration test
# ---------------------------------------------------------------------------
# Sync proc (no await in body) — macro detects this and bypasses thread machinery
proc testlib_version*(
lib: SimpleLib
): Future[Result[string, string]] {.ffi.} =
return ok("v" & $lib.value)
suite "async/sync detection in .ffi.":
test "sync proc invokes callback without thread hop":
# Create a context using ffiCtor
var ctorD: CallbackData
initCallbackData(ctorD)
defer: deinitCallbackData(ctorD)
let configJson = ffiSerialize(SimpleConfig(initialValue: 3))
let ctorRet = testlib_create(configJson.cstring, testCallback, addr ctorD)
check not ctorRet.isNil()
waitCallback(ctorD)
check ctorD.retCode == RET_OK
let addrStr = callbackMsg(ctorD)
check addrStr.len > 0
let ctxAddr = cast[uint](parseBiggestUInt(addrStr))
check ctxAddr != 0
let ctx = cast[ptr FFIContext[SimpleLib]](ctxAddr)
defer: check SimpleLibFFIPool.destroyFFIContext(ctx).isOk()
var d2: CallbackData
initCallbackData(d2)
defer: deinitCallbackData(d2)
# Call sync proc — callback should fire before the proc returns (no thread hop)
let ret = testlib_version(ctx, testCallback, addr d2)
# No sleep needed: sync path fires callback inline before returning
check ret == RET_OK
check d2.called # fires synchronously — no waitCallback needed
check d2.retCode == RET_OK
let receivedMsg = callbackMsg(d2)
let decoded = ffiDeserialize(receivedMsg.cstring, string).valueOr:
check false
""
check decoded == "v3"
# ---------------------------------------------------------------------------
# ptr T return type in .ffi. macro integration test
# ---------------------------------------------------------------------------
type Handle = object
data: string
ffiType:
type NameParam = object
name: string
proc testlib_alloc_handle*(
lib: SimpleLib, np: NameParam
): Future[Result[ptr Handle, string]] {.ffi.} =
let h = createShared(Handle)
h[] = Handle(data: np.name & ":" & $lib.value)
return ok(h)
proc testlib_read_handle*(
lib: SimpleLib, handle: pointer
): Future[Result[string, string]] {.ffi.} =
let h = cast[ptr Handle](handle)
return ok(h[].data)
proc testlib_free_handle*(
lib: SimpleLib, handle: pointer
): Future[Result[string, string]] {.ffi.} =
let h = cast[ptr Handle](handle)
deallocShared(h)
return ok("freed")
suite "ptr return type in .ffi.":
test "returns a heap-allocated handle and reads it back":
# Create context via ffiCtor
var ctorD: CallbackData
initCallbackData(ctorD)
defer: deinitCallbackData(ctorD)
let configJson = ffiSerialize(SimpleConfig(initialValue: 5))
let ctorRet = testlib_create(configJson.cstring, testCallback, addr ctorD)
check not ctorRet.isNil()
waitCallback(ctorD)
check ctorD.retCode == RET_OK
let ctxAddrStr = callbackMsg(ctorD)
check ctxAddrStr.len > 0
let ctxAddr = cast[uint](parseBiggestUInt(ctxAddrStr))
check ctxAddr != 0
let ctx = cast[ptr FFIContext[SimpleLib]](ctxAddr)
defer: check SimpleLibFFIPool.destroyFFIContext(ctx).isOk()
# Alloc a handle
var allocD: CallbackData
initCallbackData(allocD)
defer: deinitCallbackData(allocD)
let npJson = ffiSerialize(NameParam(name: "test"))
let allocRet = testlib_alloc_handle(ctx, testCallback, addr allocD, npJson.cstring)
check allocRet == RET_OK
waitCallback(allocD)
check allocD.retCode == RET_OK
let handleAddrStr = callbackMsg(allocD)
check handleAddrStr.len > 0
let handleAddr = parseBiggestUInt(handleAddrStr)
check handleAddr != 0
# Read the handle back
var readD: CallbackData
initCallbackData(readD)
defer: deinitCallbackData(readD)
let handleJson = ffiSerialize(cast[pointer](handleAddr))
let readRet = testlib_read_handle(ctx, testCallback, addr readD, handleJson.cstring)
check readRet == RET_OK
waitCallback(readD)
check readD.retCode == RET_OK
let readMsg = callbackMsg(readD)
let decodedStr = ffiDeserialize(readMsg.cstring, string).valueOr:
check false
""
check decodedStr == "test:5"
# Free the handle
var freeD: CallbackData
initCallbackData(freeD)
defer: deinitCallbackData(freeD)
let freeRet = testlib_free_handle(ctx, testCallback, addr freeD, handleJson.cstring)
check freeRet == RET_OK
waitCallback(freeD)
check freeD.retCode == RET_OK
# ---------------------------------------------------------------------------
# releaseFFIContext: park & reuse (fd-leak regression)
# ---------------------------------------------------------------------------
proc countOpenFds(): int =
## Number of open fds for this process, or -1 if not determinable on this
## platform. On Linux we count /proc/self/fd; elsewhere we shell out to lsof
## (skipped if lsof is unavailable, e.g. Windows).
when defined(linux):
var n = 0
for _ in walkDir("/proc/self/fd"):
inc n
return n
else:
if findExe("lsof").len == 0:
return -1
try:
let output =
execProcess("lsof", args = ["-p", $getCurrentProcessId()], options = {poUsePath})
return output.splitLines().countIt(it.len > 0)
except CatchableError:
return -1
proc releaseAndWait[T](ctx: ptr FFIContext[T]): cint =
## Test helper mirroring how a C consumer destroys a context: kick off the
## (non-blocking) teardown and block on the callback, returning its retCode.
## RET_OK means the lib's in-flight tasks finished and the context was parked.
var d: CallbackData
initCallbackData(d)
defer:
deinitCallbackData(d)
if ctx.releaseFFIContext(testCallback, addr d).isErr():
return RET_ERR
waitCallback(d)
return d.retCode
suite "releaseFFIContext (park & reuse)":
test "park returns the context and reuses the same live worker":
var pool: FFIContextPool[TestLib]
let ctx1 = pool.createFFIContext().valueOr:
check false
return
check ctx1.releaseAndWait() == RET_OK
# Reacquire: must be the same context, with its worker still running.
let ctx2 = pool.createFFIContext().valueOr:
check false
return
check ctx1 == ctx2
var d: CallbackData
initCallbackData(d)
defer:
deinitCallbackData(d)
check sendRequestToFFIThread(
ctx2, PingRequest.ffiNewReq(testCallback, addr d, "reuse".cstring)
).isOk()
waitCallback(d)
check d.retCode == RET_OK
check callbackMsg(d) == "pong:reuse" # reused worker still processes requests
check pool.destroyFFIContext(ctx2).isOk()
test "park drops the stale event callback and library pointer":
var pool: FFIContextPool[TestLib]
let ctx = pool.createFFIContext().valueOr:
check false
return
ctx.callbackState.callback = cast[pointer](testCallback)
ctx.callbackState.userData = cast[pointer](0xDEAD)
check ctx.releaseAndWait() == RET_OK
check ctx.callbackState.callback.isNil() # a watchdog tick can't call a freed cb
check ctx.callbackState.userData.isNil()
check ctx.myLib.isNil()
check pool.destroyFFIContext(ctx).isOk()
test "fd usage stays bounded across many park/reuse cycles":
if countOpenFds() < 0:
skip() # no fd-counting facility on this platform
else:
var pool: FFIContextPool[TestLib]
# Warm up: the first create builds the context's worker (its fds are allocated
# once here); parking keeps them open for reuse.
block:
let ctx = pool.createFFIContext().valueOr:
check false
return
check ctx.releaseAndWait() == RET_OK
let baseline = countOpenFds()
for _ in 0 ..< 20:
let ctx = pool.createFFIContext().valueOr:
check false
return
var d: CallbackData
initCallbackData(d)
check sendRequestToFFIThread(
ctx, PingRequest.ffiNewReq(testCallback, addr d, "x".cstring)
).isOk()
waitCallback(d)
deinitCallbackData(d)
check ctx.releaseAndWait() == RET_OK
let afterCycles = countOpenFds()
# Reuse must not grow fds. Before the fix each cycle leaked ~10 fds (4
# ThreadSignalPtr socketpairs + 2 dispatcher kqueues); the small slack
# only tolerates unrelated runtime fd noise, not a per-cycle leak.
check afterCycles <= baseline + 5
# Tear the (still parked) context's worker down so the test leaves no threads.
let last = pool.createFFIContext().valueOr:
check false
return
check pool.destroyFFIContext(last).isOk()