From c3d135f46f809e09602d44f48949c2a4b2688e37 Mon Sep 17 00:00:00 2001 From: Ivan FB Date: Fri, 19 Jun 2026 23:27:08 +0200 Subject: [PATCH] feat: recycle pooled FFI contexts; compile-time request ids MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two foreign-host concurrency fixes for refc, both needed so a Go host can hammer the FFI under load without corrupting Nim's per-thread GC. 1. Compile-time request ids. ffiNewReq / the method + ctor wrappers built the request id with `$T` at runtime, allocating a Nim GC string on the foreign caller's (often transient) thread. Emit a `cstring` literal of the type name instead — no allocation on the caller thread. 2. Recycle pooled contexts instead of destroy/recreate. Restores the release/v0.1 model that v0.2 dropped: a pool slot's worker + event threads and signal fds are built once and reused. The ffiDtor now requests a synchronous recycle (drain in-flight handlers, free the lib, clear listeners, release the slot) on the FFI thread, keeping the threads alive; createFFIContext reuses an initialised slot. Without this every create/destroy churned ~6 signal fds, so fd numbers climbed past FD_SETSIZE (1024) and ThreadSignalPtr.waitSync's select() failed with EINVAL under create/destroy load. Adds CtxLifecycle (Active/RecyclePending/Recycling), ctx-level inUse/tryClaim/release/markAsActive, requestRecycle (waits on a new recycleDoneSignal), freeLib (refc GC_unref / orc =destroy of ctor-owned libs), recycleContext, FFIEventRegistry.clearListeners, and roots the ctor-stored ref lib under refc (GC_ref, balanced in freeLib). 
Co-Authored-By: Claude Opus 4.8 --- ffi/ffi_context.nim | 72 ++++++++++++++++++++++++++++++++++++-- ffi/ffi_context_pool.nim | 67 +++++++++++++++++++++-------------- ffi/ffi_events.nim | 7 ++++ ffi/ffi_thread.nim | 61 ++++++++++++++++++++++++++++++++ ffi/internal/ffi_macro.nim | 29 ++++++++++----- 5 files changed, 198 insertions(+), 38 deletions(-) diff --git a/ffi/ffi_context.nim b/ffi/ffi_context.nim index ef1b6f5..45044a1 100644 --- a/ffi/ffi_context.nim +++ b/ffi/ffi_context.nim @@ -6,7 +6,7 @@ {.passc: "-fPIC".} -import std/[atomics, locks, options, tables] +import std/[atomics, locks, options, sequtils, tables] import chronicles, chronos, chronos/threadsync, taskpools/channels_spsc_single, results import ./ffi_types, @@ -18,8 +18,30 @@ import export ffi_events, ffi_handles +type CtxLifecycle* {.pure.} = enum + ## State machine guarding a pooled FFI context (Atomic on FFIContext). + ## Active -> RecyclePending when the ffiDtor requests recycle + ## RecyclePending -> Recycling FFI loop claimed it, draining handlers + ## Recycling -> Active createFFIContext reuses the slot + Active + RecyclePending + Recycling + type FFIContext*[T] = object myLib*: ptr T # main library object (Waku, LibP2P, SDS, …) + myLibRefd*: bool + # refc only: true once myLib[] (a ref) has been GC_ref'd to root it against + # the cycle collector. Balanced by GC_unref in freeLib. + myLibOwned*: bool + # true once a ctor stored a createShared'd lib into myLib (vs the worker's + # stack fallback). freeLib only frees/destroys owned libs. + inUse*: Atomic[bool] + # Whether this pooled context is claimed. The recycle handler clears it on + # the FFI thread so the slot returns to the pool without recreating threads. + lifecycle*: Atomic[CtxLifecycle] + recycleDoneSignal: ThreadSignalPtr + # fired by the recycle handler once the lib is freed and the slot released; + # the synchronous recycleFFIContext caller waits on it. 
ffiThread: Thread[(ptr FFIContext[T])] eventThread: Thread[(ptr FFIContext[T])] lock: Lock @@ -47,6 +69,9 @@ var onFFIThread* {.threadvar.}: bool const git_version* {.strdefine.} = "n/a" const + RecycleWaitTimeout* = 5.seconds + ## Caller-side bound for synchronous recycle; the FFI-thread drain itself is + ## bounded by RecycleTimeout, so this only guards against a wedged worker. EventThreadTickInterval* = 1.seconds FFIHeartbeatStartDelay* = 10.seconds # grace window for library startup FFIHeartbeatStaleThreshold* = 1.seconds @@ -70,7 +95,7 @@ proc deinitContextResources*[T](ctx: ptr FFIContext[T]): Result[void, string] = # ThreadSignalPtr.close() under refc traps in safeUnregisterAndCloseFd # → newDispatcher → rawNewObj → signal-handler re-entry (process hangs). # See tests/test_ffi_context.nim "destroyFFIContext refc workaround". - # Fd leak is bounded — destroy runs once per process lifetime. + # Fd leak is bounded — with the recycle pool, full destroy is rare. discard else: closeAndNil(ctx.reqSignal) @@ -79,6 +104,7 @@ proc deinitContextResources*[T](ctx: ptr FFIContext[T]): Result[void, string] = closeAndNil(ctx.threadExitSignal) closeAndNil(ctx.eventQueueSignal) closeAndNil(ctx.eventThreadExitSignal) + closeAndNil(ctx.recycleDoneSignal) ok() proc cleanUpResources[T](ctx: ptr FFIContext[T]): Result[void, string] = @@ -101,6 +127,10 @@ proc initContextResources*[T](ctx: ptr FFIContext[T]): Result[void, string] = ctx.threadExitSignal = nil ctx.eventQueueSignal = nil ctx.eventThreadExitSignal = nil + ctx.recycleDoneSignal = nil + ctx.myLibOwned = false + ctx.myLibRefd = false + ctx.lifecycle.store(CtxLifecycle.Active) ctx.lock.initLock() initEventRegistry(ctx[].eventRegistry) initHandleRegistry(ctx[].handles) @@ -121,6 +151,7 @@ proc initContextResources*[T](ctx: ptr FFIContext[T]): Result[void, string] = newSignalOrErr(ctx.threadExitSignal, "threadExitSignal") newSignalOrErr(ctx.eventQueueSignal, "eventQueueSignal") newSignalOrErr(ctx.eventThreadExitSignal, 
"eventThreadExitSignal") + newSignalOrErr(ctx.recycleDoneSignal, "recycleDoneSignal") ctx.registeredRequests = addr ffi_types.registeredRequests @@ -173,6 +204,43 @@ proc signalStop*[T](ctx: ptr FFIContext[T]): Result[void, string] = error "failed to signal eventQueueSignal in signalStop", error = error ok() +proc tryClaim*[T](ctx: ptr FFIContext[T]): bool = + ## Atomically claim a free pooled context (false -> true). + var expected = false + ctx.inUse.compareExchange(expected, true) + +proc release*[T](ctx: ptr FFIContext[T]) = + ctx.inUse.store(false) + +proc isInUse*[T](ctx: ptr FFIContext[T]): bool = + ctx.inUse.load() + +proc markAsActive*[T](ctx: ptr FFIContext[T]) = + ## Reused context: its worker threads are still alive; re-arm for requests. + ctx.lifecycle.store(CtxLifecycle.Active) + +proc requestRecycle*[T](ctx: ptr FFIContext[T]): Result[void, string] = + ## Ask the FFI thread to drain, free the lib and release the slot, WITHOUT + ## stopping its worker/event threads, so the next createFFIContext reuses them. + ## Synchronous: waits on recycleDoneSignal. No fd churn -> no select() limit. + ctx.lock.acquire() + if ctx.lifecycle.load() != CtxLifecycle.Active: + ctx.lock.release() + return err("requestRecycle: context is not Active (already recycling)") + ctx.lifecycle.store(CtxLifecycle.RecyclePending) + ctx.lock.release() + + let fired = ctx.reqSignal.fireSync().valueOr: + return err("requestRecycle: failed to signal the FFI thread: " & $error) + if not fired: + return err("requestRecycle: failed to signal the FFI thread in time") + + let done = ctx.recycleDoneSignal.waitSync(RecycleWaitTimeout).valueOr: + return err("requestRecycle: failed waiting for recycle: " & $error) + if not done: + return err("requestRecycle: recycle did not complete in time") + ok() + ## Bound on how long clearContext waits for the FFI thread to exit before ## leaking ctx rather than hanging the caller. 
const ThreadExitTimeout* = 1500.milliseconds diff --git a/ffi/ffi_context_pool.nim b/ffi/ffi_context_pool.nim index 5e2d2cb..b00c11f 100644 --- a/ffi/ffi_context_pool.nim +++ b/ffi/ffi_context_pool.nim @@ -6,42 +6,55 @@ const MaxFFIContexts* = 32 # Only affects upfront pool memory; fds/threads consumed per acquired slot. type FFIContextPool*[T] = object - ## Fixed pool. Bounds ThreadSignalPtr fds at MaxFFIContexts * 2. - slots: array[MaxFFIContexts, FFIContext[T]] - inUse: array[MaxFFIContexts, Atomic[bool]] - -proc acquireSlot[T](pool: var FFIContextPool[T]): Result[ptr FFIContext[T], string] = - for i in 0 ..< MaxFFIContexts: - var expected = false - if pool.inUse[i].compareExchange(expected, true): - return ok(pool.slots[i].addr) - err("FFI context pool exhausted (max " & $MaxFFIContexts & " contexts)") - -proc releaseSlot[T](pool: var FFIContextPool[T], ctx: ptr FFIContext[T]) = - for i in 0 ..< MaxFFIContexts: - if pool.slots[i].addr == ctx: - pool.inUse[i].store(false) - return + ## Fixed pool. Each slot's worker + event threads and signal fds are built + ## once (on first use) and reused across create/recycle cycles — recycle keeps + ## them alive, so repeated create/destroy does not churn fds. Bounds + ## ThreadSignalPtr fds at MaxFFIContexts * (signals per ctx). + contexts: array[MaxFFIContexts, FFIContext[T]] + initialized: array[MaxFFIContexts, Atomic[bool]] proc createFFIContext*[T]( pool: var FFIContextPool[T] ): Result[ptr FFIContext[T], string] = - let ctx = pool.acquireSlot().valueOr: - return err("createFFIContext: acquireSlot failed: " & $error) - initContextResources(ctx).isOkOr: - pool.releaseSlot(ctx) - return err("createFFIContext: initContextResources failed: " & $error) - ok(ctx) + ## Acquires a context from the fixed pool. A slot's worker is built once on + ## first use and reused (markAsActive) on every later acquisition. 
+ for i in 0 ..< MaxFFIContexts: + let ctx = pool.contexts[i].addr + if not ctx.tryClaim(): + continue + if pool.initialized[i].load(): + # Reused slot: a prior recycle drained and released it; worker still alive. + ctx.markAsActive() + return ok(ctx) + initContextResources(ctx).isOkOr: + ctx.release() + return err("createFFIContext: initContextResources failed: " & $error) + pool.initialized[i].store(true) + return ok(ctx) + err("FFI context pool exhausted (max " & $MaxFFIContexts & " contexts)") + +proc recycleFFIContext*[T]( + pool: var FFIContextPool[T], ctx: ptr FFIContext[T] +): Result[void, string] = + ## Normal teardown: drains in-flight handlers, frees the lib and returns the + ## slot to the pool WITHOUT stopping its threads, so a later createFFIContext + ## reuses them. Synchronous (waits for the FFI thread to finish draining). + ctx.requestRecycle() proc destroyFFIContext*[T]( pool: var FFIContextPool[T], ctx: ptr FFIContext[T] ): Result[void, string] = - ## On thread-exit timeout the slot is leaked — closing live-thread resources is unsafe. + ## Full teardown: stops/joins the worker + event threads and frees resources, + ## marking the slot uninitialised so a later createFFIContext rebuilds it. + ## Used for process-level shutdown; normal cleanup uses recycleFFIContext. ctx.stopAndJoinThreads().isOkOr: return err("destroyFFIContext(pool): " & $error) - # Required: next acquisition would otherwise re-init a live lock (UB). 
let deinitRes = ctx.deinitContextResources() - pool.releaseSlot(ctx) + for i in 0 ..< MaxFFIContexts: + if pool.contexts[i].addr == ctx: + pool.initialized[i].store(false) + break + ctx.release() deinitRes.isOkOr: return err("destroyFFIContext(pool): " & $error) ok() @@ -51,6 +64,6 @@ proc isValidCtx*[T](pool: var FFIContextPool[T], ctx: pointer): bool = if ctx.isNil(): return false for i in 0 ..< MaxFFIContexts: - if cast[pointer](pool.slots[i].addr) == ctx: - return pool.inUse[i].load() + if cast[pointer](pool.contexts[i].addr) == ctx: + return cast[ptr FFIContext[T]](ctx).isInUse() false diff --git a/ffi/ffi_events.nim b/ffi/ffi_events.nim index 898746c..d41420a 100644 --- a/ffi/ffi_events.nim +++ b/ffi/ffi_events.nim @@ -38,6 +38,13 @@ proc deinitEventRegistry*(reg: var FFIEventRegistry) = reg.byEvent = default(Table[string, seq[FFIEventListener]]) reg.nextId = 0'u64 +proc clearListeners*(reg: var FFIEventRegistry) {.raises: [].} = + ## Drops all listeners (used when a context is recycled for reuse). The lock is + ## acquired, not deinitialised — the event thread keeps using it across recycles. + withLock reg.lock: + reg.byEvent.clear() + reg.nextId = 0'u64 + proc addEventListener*( reg: var FFIEventRegistry, eventName: string, diff --git a/ffi/ffi_thread.nim b/ffi/ffi_thread.nim index ae5b6d8..ad89542 100644 --- a/ffi/ffi_thread.nim +++ b/ffi/ffi_thread.nim @@ -28,6 +28,10 @@ proc sendRequestToFFIThread*( defer: ctx.lock.release() + if ctx.lifecycle.load() != CtxLifecycle.Active: + deleteRequest(ffiRequest) + return err("FFI context is not accepting requests (being recycled)") + let sentOk = ctx.reqChannel.trySend(ffiRequest) if not sentOk: deleteRequest(ffiRequest) @@ -79,6 +83,56 @@ proc processRequest[T]( except Exception as e: error "Unexpected exception in handleRes", error = e.msg +const RecycleTimeout = 1500.milliseconds + ## Bounds how long the recycle handler waits for in-flight handlers before it + ## cancels them, so a wedged handler cannot block reuse forever.
+ +proc freeLib[T](ctx: ptr FFIContext[T]) {.gcsafe.} = + ## Releases the library object the ctor stored in ctx.myLib. Only owned libs + ## (createShared'd by a ctor) are freed; the worker's stack fallback is not. + if not ctx.myLibOwned or ctx.myLib.isNil(): + ctx.myLib = nil + return + when not defined(gcRefc): + try: + {.cast(gcsafe).}: + `=destroy`(ctx.myLib[]) + except Exception: + discard + else: + when T is ref: + if ctx.myLibRefd: + GC_unref(ctx.myLib[]) + ctx.myLibRefd = false + freeShared(ctx.myLib) + ctx.myLib = nil + ctx.myLibOwned = false + +proc recycleContext[T]( + ctx: ptr FFIContext[T], ongoing: ptr seq[Future[void]] +) {.async.} = + ## Drain in-flight handlers, free the lib, clear listeners and release the + ## slot — all WITHOUT stopping the worker/event threads, so the next + ## createFFIContext reuses them (no fd churn). Then fire recycleDoneSignal. + ongoing[].keepItIf(not it.finished()) + var drained = ongoing[].len == 0 + if not drained: + drained = await allFutures(ongoing[]).withTimeout(RecycleTimeout) + if not drained: + for fut in ongoing[]: + if not fut.finished(): + fut.cancelSoon() + drained = await allFutures(ongoing[]).withTimeout(RecycleTimeout) + + freeLib(ctx) + clearListeners(ctx[].eventRegistry) + ongoing[].setLen(0) + ctx.release() + + let fireRes = ctx.recycleDoneSignal.fireSync() + if fireRes.isErr(): + error "failed to fire recycleDoneSignal", err = fireRes.error + var ffiEventQueueSignalPtr {.threadvar.}: ThreadSignalPtr # Stashed so the hook has no closure env. @@ -126,6 +180,13 @@ proc ffiThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} = # Freezes if a sync handler blocks the dispatcher; event thread reads to detect wedged FFI thread. discard ctx.ffiHeartbeat.fetchAdd(1) + # Recycle requested by the ffiDtor: drain + free lib + release the slot, + # keeping this thread alive for the next createFFIContext to reuse. 
+ var expected = CtxLifecycle.RecyclePending + if ctx.lifecycle.compareExchange(expected, CtxLifecycle.Recycling): + await recycleContext(ctx, addr pending) + continue + reapCompleted() let gotSignal = await ctx.reqSignal.wait().withTimeout(100.milliseconds) diff --git a/ffi/internal/ffi_macro.nim b/ffi/internal/ffi_macro.nim index 7ec84fe..8aed7be 100644 --- a/ffi/internal/ffi_macro.nim +++ b/ffi/internal/ffi_macro.nim @@ -291,14 +291,13 @@ proc buildFFINewReqProc(reqTypeName, body: NimNode): NimNode = `reqObjIdent`.`fieldName` = `fieldName` ) + let reqNameLit = + newLit(if reqTypeName.kind == nnkPostfix: $reqTypeName[1] else: $reqTypeName) newBody.add( quote do: - let typeStr = $T - # Encode directly into shared memory and hand ownership to the request, - # avoiding the seq[byte] → allocShared+copyMem second copy. let (sharedData, sharedLen) = cborEncodeShared(`reqObjIdent`) return FFIThreadRequest.initFromOwnedShared( - callback, userData, typeStr.cstring, sharedData, sharedLen + callback, userData, cstring(`reqNameLit`), sharedData, sharedLen ) ) @@ -848,10 +847,11 @@ macro ffi*(prc: untyped): untyped = # Build the FFIThreadRequest payload directly from the incoming bytes. 
let reqPtrIdent = genSym(nskLet, "reqPtr") + let reqNameLit = + newLit(if reqTypeName.kind == nnkPostfix: $reqTypeName[1] else: $reqTypeName) ffiBody.add quote do: - let typeStr = $`reqTypeName` let `reqPtrIdent` = FFIThreadRequest.initFromPtr( - callback, userData, typeStr.cstring, reqCbor, int(reqCborLen) + callback, userData, cstring(`reqNameLit`), reqCbor, int(reqCborLen) ) let sendResIdent = genSym(nskLet, "sendRes") @@ -969,11 +969,12 @@ proc buildCtorFFINewReqProc(reqTypeName: NimNode, paramNames: seq[string]): NimN let retType = newTree(nnkPtrTy, ident("FFIThreadRequest")) formalParams = @[retType] & formalParams + let reqNameLit = + newLit(if reqTypeName.kind == nnkPostfix: $reqTypeName[1] else: $reqTypeName) var newBody = newStmtList() newBody.add quote do: - let typeStr = $T return FFIThreadRequest.initFromPtr( - callback, userData, typeStr.cstring, reqCbor, int(reqCborLen) + callback, userData, cstring(`reqNameLit`), reqCbor, int(reqCborLen) ) let newReqProc = newProc( @@ -1068,9 +1069,19 @@ proc buildCtorProcessFFIRequestProc( return err($error) let myLibIdent = newDotExpr(newTree(nnkDerefExpr, ctxIdent), ident("myLib")) + let myLibOwnedIdent = + newDotExpr(newTree(nnkDerefExpr, ctxIdent), ident("myLibOwned")) + let myLibRefdIdent = newDotExpr(newTree(nnkDerefExpr, ctxIdent), ident("myLibRefd")) newBody.add quote do: `myLibIdent` = createShared(`libTypeName`) `myLibIdent`[] = `libValIdent` + `myLibOwnedIdent` = true + # Root the ref lib under refc: it lives only via this ptr in non-GC + # createShared memory, invisible to the cycle collector. freeLib unroots it. 
+ when defined(gcRefc): + when `libTypeName` is ref: + GC_ref(`myLibIdent`[]) + `myLibRefdIdent` = true newBody.add quote do: return ok($cast[uint](`ctxIdent`)) @@ -1405,7 +1416,7 @@ macro ffiDtor*(prc: untyped): untyped = let poolIdent = ident($libTypeName & "FFIPool") ffiBody.add quote do: let `destroyResIdent` = - `poolIdent`.destroyFFIContext(cast[ptr FFIContext[`libTypeName`]](ctx)) + `poolIdent`.recycleFFIContext(cast[ptr FFIContext[`libTypeName`]](ctx)) if `destroyResIdent`.isErr(): return RET_ERR