mirror of
https://github.com/logos-messaging/nim-ffi.git
synced 2026-06-20 08:19:55 +00:00
feat: recycle pooled FFI contexts; compile-time request ids
Two foreign-host concurrency fixes for refc, both needed so a Go host can hammer the FFI under load without corrupting Nim's per-thread GC. 1. Compile-time request ids. ffiNewReq / the method + ctor wrappers built the request id with `$T` at runtime, allocating a Nim GC string on the foreign caller's (often transient) thread. Emit a `cstring` literal of the type name instead — no allocation on the caller thread. 2. Recycle pooled contexts instead of destroy/recreate. Restores the release/v0.1 model that v0.2 dropped: a pool slot's worker + event threads and signal fds are built once and reused. The ffiDtor now requests a synchronous recycle (drain in-flight handlers, free the lib, clear listeners, release the slot) on the FFI thread, keeping the threads alive; createFFIContext reuses an initialised slot. Without this every create/destroy churned ~6 signal fds, so fd numbers climbed past FD_SETSIZE (1024) and ThreadSignalPtr.waitSync's select() failed with EINVAL under create/destroy load. Adds CtxLifecycle (Active/RecyclePending/Recycling), ctx-level inUse/tryClaim/release/markAsActive, requestRecycle (waits on a new recycleDoneSignal), freeLib (refc GC_unref / orc =destroy of ctor-owned libs), recycleContext, FFIEventRegistry.clearListeners, and roots the ctor-stored ref lib under refc (GC_ref, balanced in freeLib). Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
parent
a6b22fc6df
commit
c3d135f46f
@ -6,7 +6,7 @@
|
||||
|
||||
{.passc: "-fPIC".}
|
||||
|
||||
import std/[atomics, locks, options, tables]
|
||||
import std/[atomics, locks, options, sequtils, tables]
|
||||
import chronicles, chronos, chronos/threadsync, taskpools/channels_spsc_single, results
|
||||
import
|
||||
./ffi_types,
|
||||
@ -18,8 +18,30 @@ import
|
||||
|
||||
export ffi_events, ffi_handles
|
||||
|
||||
type CtxLifecycle* {.pure.} = enum
  ## State machine guarding a pooled FFI context (Atomic on FFIContext).
  ## Active -> RecyclePending when the ffiDtor requests recycle
  ## RecyclePending -> Recycling FFI loop claimed it, draining handlers
  ## Recycling -> Active createFFIContext reuses the slot
  ##
  ## The value is not reset by the recycle handler itself: a recycled slot
  ## stays in Recycling until markAsActive stores Active again.
  Active
    ## Normal state; the only state in which sendRequestToFFIThread enqueues
    ## a request and in which requestRecycle may start a recycle.
  RecyclePending
    ## Stored by requestRecycle under ctx.lock; the FFI thread has not yet
    ## claimed the recycle.
  Recycling
    ## The FFI thread won the RecyclePending -> Recycling compare-exchange and
    ## runs recycleContext.
|
||||
|
||||
type FFIContext*[T] = object
|
||||
myLib*: ptr T # main library object (Waku, LibP2P, SDS, …)
|
||||
myLibRefd*: bool
|
||||
# refc only: true once myLib[] (a ref) has been GC_ref'd to root it against
|
||||
# the cycle collector. Balanced by GC_unref in freeLib.
|
||||
myLibOwned*: bool
|
||||
# true once a ctor stored a createShared'd lib into myLib (vs the worker's
|
||||
# stack fallback). freeLib only frees/destroys owned libs.
|
||||
inUse*: Atomic[bool]
|
||||
# Whether this pooled context is claimed. The recycle handler clears it on
|
||||
# the FFI thread so the slot returns to the pool without recreating threads.
|
||||
lifecycle*: Atomic[CtxLifecycle]
|
||||
recycleDoneSignal: ThreadSignalPtr
|
||||
# fired by the recycle handler once the lib is freed and the slot released;
|
||||
# the synchronous recycleFFIContext caller waits on it.
|
||||
ffiThread: Thread[(ptr FFIContext[T])]
|
||||
eventThread: Thread[(ptr FFIContext[T])]
|
||||
lock: Lock
|
||||
@ -47,6 +69,9 @@ var onFFIThread* {.threadvar.}: bool
|
||||
const git_version* {.strdefine.} = "n/a"
|
||||
|
||||
const
|
||||
RecycleWaitTimeout* = 5.seconds
|
||||
## Caller-side bound for synchronous recycle; the FFI-thread drain itself is
|
||||
## bounded by 2 * RecycleTimeout (one wait for handlers to finish, one more after cancelling them), so this only guards against a wedged worker.
|
||||
EventThreadTickInterval* = 1.seconds
|
||||
FFIHeartbeatStartDelay* = 10.seconds # grace window for library startup
|
||||
FFIHeartbeatStaleThreshold* = 1.seconds
|
||||
@ -70,7 +95,7 @@ proc deinitContextResources*[T](ctx: ptr FFIContext[T]): Result[void, string] =
|
||||
# ThreadSignalPtr.close() under refc traps in safeUnregisterAndCloseFd
|
||||
# → newDispatcher → rawNewObj → signal-handler re-entry (process hangs).
|
||||
# See tests/test_ffi_context.nim "destroyFFIContext refc workaround".
|
||||
# Fd leak is bounded — destroy runs once per process lifetime.
|
||||
# Fd leak is bounded — with the recycle pool, full destroy is rare.
|
||||
discard
|
||||
else:
|
||||
closeAndNil(ctx.reqSignal)
|
||||
@ -79,6 +104,7 @@ proc deinitContextResources*[T](ctx: ptr FFIContext[T]): Result[void, string] =
|
||||
closeAndNil(ctx.threadExitSignal)
|
||||
closeAndNil(ctx.eventQueueSignal)
|
||||
closeAndNil(ctx.eventThreadExitSignal)
|
||||
closeAndNil(ctx.recycleDoneSignal)
|
||||
ok()
|
||||
|
||||
proc cleanUpResources[T](ctx: ptr FFIContext[T]): Result[void, string] =
|
||||
@ -101,6 +127,10 @@ proc initContextResources*[T](ctx: ptr FFIContext[T]): Result[void, string] =
|
||||
ctx.threadExitSignal = nil
|
||||
ctx.eventQueueSignal = nil
|
||||
ctx.eventThreadExitSignal = nil
|
||||
ctx.recycleDoneSignal = nil
|
||||
ctx.myLibOwned = false
|
||||
ctx.myLibRefd = false
|
||||
ctx.lifecycle.store(CtxLifecycle.Active)
|
||||
ctx.lock.initLock()
|
||||
initEventRegistry(ctx[].eventRegistry)
|
||||
initHandleRegistry(ctx[].handles)
|
||||
@ -121,6 +151,7 @@ proc initContextResources*[T](ctx: ptr FFIContext[T]): Result[void, string] =
|
||||
newSignalOrErr(ctx.threadExitSignal, "threadExitSignal")
|
||||
newSignalOrErr(ctx.eventQueueSignal, "eventQueueSignal")
|
||||
newSignalOrErr(ctx.eventThreadExitSignal, "eventThreadExitSignal")
|
||||
newSignalOrErr(ctx.recycleDoneSignal, "recycleDoneSignal")
|
||||
|
||||
ctx.registeredRequests = addr ffi_types.registeredRequests
|
||||
|
||||
@ -173,6 +204,43 @@ proc signalStop*[T](ctx: ptr FFIContext[T]): Result[void, string] =
|
||||
error "failed to signal eventQueueSignal in signalStop", error = error
|
||||
ok()
|
||||
|
||||
proc tryClaim*[T](ctx: ptr FFIContext[T]): bool =
  ## Attempts to take ownership of a pooled context by flipping its `inUse`
  ## flag from false to true in one atomic compare-exchange.
  ##
  ## Returns true when this caller performed the flip, false when the flag
  ## was already set.
  var wantedState = false
  result = compareExchange(ctx.inUse, wantedState, true)
|
||||
|
||||
proc release*[T](ctx: ptr FFIContext[T]) =
  ## Clears the `inUse` flag so that a later tryClaim on this context can
  ## succeed again.
  store(ctx.inUse, false)
|
||||
|
||||
proc isInUse*[T](ctx: ptr FFIContext[T]): bool =
  ## Reports the current value of the context's atomic `inUse` flag.
  result = load(ctx.inUse)
|
||||
|
||||
proc markAsActive*[T](ctx: ptr FFIContext[T]) =
  ## Re-arms a reused context for requests by storing CtxLifecycle.Active.
  ## Intended for slots whose worker threads were kept alive by a previous
  ## recycle; nothing else on the context is touched here.
  store(ctx.lifecycle, CtxLifecycle.Active)
|
||||
|
||||
proc requestRecycle*[T](ctx: ptr FFIContext[T]): Result[void, string] =
  ## Synchronously asks the FFI thread to recycle this context: drain, free
  ## the lib and release the slot while the worker/event threads stay alive
  ## for the next createFFIContext.
  ##
  ## Sequence:
  ##   1. under ctx.lock, move lifecycle Active -> RecyclePending;
  ##   2. wake the FFI thread through reqSignal;
  ##   3. block on recycleDoneSignal for at most RecycleWaitTimeout.
  ##
  ## Returns an error when the context is not Active, when reqSignal could not
  ## be fired, or when completion was not observed in time. On the error paths
  ## after step 1 the lifecycle is left as RecyclePending; this proc does not
  ## roll it back.
  withLock ctx.lock:
    # Single CAS instead of load + store; the lock still serialises this
    # transition against senders that check the lifecycle under the same lock.
    var expectedState = CtxLifecycle.Active
    if not ctx.lifecycle.compareExchange(
      expectedState, CtxLifecycle.RecyclePending
    ):
      return err("requestRecycle: context is not Active (already recycling)")

  let signalled = ctx.reqSignal.fireSync().valueOr:
    return err("requestRecycle: failed to signal the FFI thread: " & $error)
  if not signalled:
    return err("requestRecycle: failed to signal the FFI thread in time")

  let completed = ctx.recycleDoneSignal.waitSync(RecycleWaitTimeout).valueOr:
    return err("requestRecycle: failed waiting for recycle: " & $error)
  if not completed:
    return err("requestRecycle: recycle did not complete in time")

  ok()
|
||||
|
||||
## Bound on how long clearContext waits for the FFI thread to exit before
|
||||
## leaking ctx rather than hanging the caller.
|
||||
const ThreadExitTimeout* = 1500.milliseconds
|
||||
|
||||
@ -6,42 +6,55 @@ const MaxFFIContexts* = 32
|
||||
# Only affects upfront pool memory; fds/threads consumed per acquired slot.
|
||||
|
||||
type FFIContextPool*[T] = object
|
||||
## Fixed pool. Bounds ThreadSignalPtr fds at MaxFFIContexts * 2.
|
||||
slots: array[MaxFFIContexts, FFIContext[T]]
|
||||
inUse: array[MaxFFIContexts, Atomic[bool]]
|
||||
|
||||
proc acquireSlot[T](pool: var FFIContextPool[T]): Result[ptr FFIContext[T], string] =
|
||||
for i in 0 ..< MaxFFIContexts:
|
||||
var expected = false
|
||||
if pool.inUse[i].compareExchange(expected, true):
|
||||
return ok(pool.slots[i].addr)
|
||||
err("FFI context pool exhausted (max " & $MaxFFIContexts & " contexts)")
|
||||
|
||||
proc releaseSlot[T](pool: var FFIContextPool[T], ctx: ptr FFIContext[T]) =
|
||||
for i in 0 ..< MaxFFIContexts:
|
||||
if pool.slots[i].addr == ctx:
|
||||
pool.inUse[i].store(false)
|
||||
return
|
||||
## Fixed pool. Each slot's worker + event threads and signal fds are built
|
||||
## once (on first use) and reused across create/recycle cycles — recycle keeps
|
||||
## them alive, so repeated create/destroy does not churn fds. Bounds
|
||||
## ThreadSignalPtr fds at MaxFFIContexts * (signals per ctx).
|
||||
contexts: array[MaxFFIContexts, FFIContext[T]]
|
||||
initialized: array[MaxFFIContexts, Atomic[bool]]
|
||||
|
||||
proc createFFIContext*[T](
|
||||
pool: var FFIContextPool[T]
|
||||
): Result[ptr FFIContext[T], string] =
|
||||
let ctx = pool.acquireSlot().valueOr:
|
||||
return err("createFFIContext: acquireSlot failed: " & $error)
|
||||
initContextResources(ctx).isOkOr:
|
||||
pool.releaseSlot(ctx)
|
||||
return err("createFFIContext: initContextResources failed: " & $error)
|
||||
ok(ctx)
|
||||
## Acquires a context from the fixed pool. A slot's worker is built once on
|
||||
## first use and reused (markAsActive) on every later acquisition.
|
||||
for i in 0 ..< MaxFFIContexts:
|
||||
let ctx = pool.contexts[i].addr
|
||||
if not ctx.tryClaim():
|
||||
continue
|
||||
if pool.initialized[i].load():
|
||||
# Reused slot: a prior recycle drained and released it; worker still alive.
|
||||
ctx.markAsActive()
|
||||
return ok(ctx)
|
||||
initContextResources(ctx).isOkOr:
|
||||
ctx.release()
|
||||
return err("createFFIContext: initContextResources failed: " & $error)
|
||||
pool.initialized[i].store(true)
|
||||
return ok(ctx)
|
||||
err("FFI context pool exhausted (max " & $MaxFFIContexts & " contexts)")
|
||||
|
||||
proc recycleFFIContext*[T](
    pool: var FFIContextPool[T], ctx: ptr FFIContext[T]
): Result[void, string] =
  ## Normal teardown path for a pooled context. Delegates to requestRecycle,
  ## which drains in-flight handlers, frees the lib and hands the slot back
  ## without stopping its threads, so a later createFFIContext can reuse them.
  ##
  ## Synchronous: the result is whatever requestRecycle returns. `pool` is
  ## not consulted here.
  requestRecycle(ctx)
|
||||
|
||||
proc destroyFFIContext*[T](
|
||||
pool: var FFIContextPool[T], ctx: ptr FFIContext[T]
|
||||
): Result[void, string] =
|
||||
## On thread-exit timeout the slot is leaked — closing live-thread resources is unsafe.
|
||||
## Full teardown: stops/joins the worker + event threads and frees resources,
|
||||
## marking the slot uninitialised so a later createFFIContext rebuilds it.
|
||||
## Used for process-level shutdown; normal cleanup uses recycleFFIContext.
|
||||
ctx.stopAndJoinThreads().isOkOr:
|
||||
return err("destroyFFIContext(pool): " & $error)
|
||||
# Required: next acquisition would otherwise re-init a live lock (UB).
|
||||
let deinitRes = ctx.deinitContextResources()
|
||||
pool.releaseSlot(ctx)
|
||||
for i in 0 ..< MaxFFIContexts:
|
||||
if pool.contexts[i].addr == ctx:
|
||||
pool.initialized[i].store(false)
|
||||
break
|
||||
ctx.release()
|
||||
deinitRes.isOkOr:
|
||||
return err("destroyFFIContext(pool): " & $error)
|
||||
ok()
|
||||
@ -51,6 +64,6 @@ proc isValidCtx*[T](pool: var FFIContextPool[T], ctx: pointer): bool =
|
||||
if ctx.isNil():
|
||||
return false
|
||||
for i in 0 ..< MaxFFIContexts:
|
||||
if cast[pointer](pool.slots[i].addr) == ctx:
|
||||
return pool.inUse[i].load()
|
||||
if cast[pointer](pool.contexts[i].addr) == ctx:
|
||||
return cast[ptr FFIContext[T]](ctx).isInUse()
|
||||
false
|
||||
|
||||
@ -38,6 +38,13 @@ proc deinitEventRegistry*(reg: var FFIEventRegistry) =
|
||||
reg.byEvent = default(Table[string, seq[FFIEventListener]])
|
||||
reg.nextId = 0'u64
|
||||
|
||||
proc clearListeners*(reg: var FFIEventRegistry) {.raises: [].} =
  ## Removes every registered listener and resets the id counter to zero.
  ## Used when a context is recycled for reuse.
  ##
  ## reg.lock is held while the table is cleared but is neither destroyed nor
  ## re-initialised, so it stays usable across recycles.
  acquire(reg.lock)
  try:
    clear(reg.byEvent)
    reg.nextId = 0'u64
  finally:
    release(reg.lock)
|
||||
|
||||
proc addEventListener*(
|
||||
reg: var FFIEventRegistry,
|
||||
eventName: string,
|
||||
|
||||
@ -28,6 +28,10 @@ proc sendRequestToFFIThread*(
|
||||
defer:
|
||||
ctx.lock.release()
|
||||
|
||||
if ctx.lifecycle.load() != CtxLifecycle.Active:
|
||||
deleteRequest(ffiRequest)
|
||||
return err("FFI context is not accepting requests (being recycled)")
|
||||
|
||||
let sentOk = ctx.reqChannel.trySend(ffiRequest)
|
||||
if not sentOk:
|
||||
deleteRequest(ffiRequest)
|
||||
@ -79,6 +83,56 @@ proc processRequest[T](
|
||||
except Exception as e:
|
||||
error "Unexpected exception in handleRes", error = e.msg
|
||||
|
||||
const RecycleTimeout = 1500.milliseconds
|
||||
## Bounds how long the recycle handler waits for in-flight handlers before it
|
||||
## cancels them, so a wedged handler cannot block reuse forever.
|
||||
|
||||
proc freeLib[T](ctx: ptr FFIContext[T]) {.gcsafe.} =
  ## Disposes of the library object referenced by ctx.myLib and clears the
  ## pointer.
  ##
  ## Only libs flagged myLibOwned (stored by a ctor via createShared) are
  ## actually freed; for anything else the pointer is simply reset to nil.
  ##   * refc: a ref lib that was rooted with GC_ref (myLibRefd) is unrooted.
  ##   * otherwise: the pointee's `=destroy` hook is run; any exception it
  ##     raises is swallowed.
  ## In both cases the shared allocation is then released with freeShared.
  let lib = ctx.myLib
  if ctx.myLibOwned and not lib.isNil():
    when defined(gcRefc):
      when T is ref:
        if ctx.myLibRefd:
          GC_unref(lib[])
          ctx.myLibRefd = false
    else:
      try:
        {.cast(gcsafe).}:
          `=destroy`(lib[])
      except Exception:
        discard
    freeShared(lib)
    ctx.myLibOwned = false
  ctx.myLib = nil
|
||||
|
||||
proc recycleContext[T](
    ctx: ptr FFIContext[T], ongoing: ptr seq[Future[void]]
) {.async.} =
  ## Recycle handler: waits for in-flight handlers, frees the lib, clears the
  ## event listeners, releases the slot and finally fires recycleDoneSignal.
  ## Worker/event threads are not stopped here.
  ##
  ## Draining is best effort: unfinished handlers get RecycleTimeout to
  ## complete, are then cancelled and given one more RecycleTimeout. Teardown
  ## proceeds afterwards whether or not they finished.
  ongoing[].keepItIf(not it.finished())

  if ongoing[].len > 0:
    let finishedInTime =
      await allFutures(ongoing[]).withTimeout(RecycleTimeout)
    if not finishedInTime:
      for handlerFut in ongoing[]:
        if not handlerFut.finished():
          handlerFut.cancelSoon()
      # Outcome of the second wait is intentionally not inspected.
      discard await allFutures(ongoing[]).withTimeout(RecycleTimeout)

  freeLib(ctx)
  clearListeners(ctx[].eventRegistry)
  ongoing[].setLen(0)
  # The slot is released before the waiter is signalled.
  ctx.release()

  let signalRes = ctx.recycleDoneSignal.fireSync()
  if signalRes.isErr():
    error "failed to fire recycleDoneSignal", err = signalRes.error
|
||||
|
||||
var ffiEventQueueSignalPtr {.threadvar.}: ThreadSignalPtr
|
||||
# Stashed so the hook has no closure env.
|
||||
|
||||
@ -126,6 +180,13 @@ proc ffiThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} =
|
||||
# Freezes if a sync handler blocks the dispatcher; event thread reads to detect wedged FFI thread.
|
||||
discard ctx.ffiHeartbeat.fetchAdd(1)
|
||||
|
||||
# Recycle requested by the ffiDtor: drain + free lib + release the slot,
|
||||
# keeping this thread alive for the next createFFIContext to reuse.
|
||||
var expected = CtxLifecycle.RecyclePending
|
||||
if ctx.lifecycle.compareExchange(expected, CtxLifecycle.Recycling):
|
||||
await recycleContext(ctx, addr pending)
|
||||
continue
|
||||
|
||||
reapCompleted()
|
||||
|
||||
let gotSignal = await ctx.reqSignal.wait().withTimeout(100.milliseconds)
|
||||
|
||||
@ -291,14 +291,13 @@ proc buildFFINewReqProc(reqTypeName, body: NimNode): NimNode =
|
||||
`reqObjIdent`.`fieldName` = `fieldName`
|
||||
)
|
||||
|
||||
let reqNameLit =
|
||||
newLit(if reqTypeName.kind == nnkPostfix: $reqTypeName[1] else: $reqTypeName)
|
||||
newBody.add(
|
||||
quote do:
|
||||
let typeStr = $T
|
||||
# Encode directly into shared memory and hand ownership to the request,
|
||||
# avoiding the seq[byte] → allocShared+copyMem second copy.
|
||||
let (sharedData, sharedLen) = cborEncodeShared(`reqObjIdent`)
|
||||
return FFIThreadRequest.initFromOwnedShared(
|
||||
callback, userData, typeStr.cstring, sharedData, sharedLen
|
||||
callback, userData, cstring(`reqNameLit`), sharedData, sharedLen
|
||||
)
|
||||
)
|
||||
|
||||
@ -848,10 +847,11 @@ macro ffi*(prc: untyped): untyped =
|
||||
|
||||
# Build the FFIThreadRequest payload directly from the incoming bytes.
|
||||
let reqPtrIdent = genSym(nskLet, "reqPtr")
|
||||
let reqNameLit =
|
||||
newLit(if reqTypeName.kind == nnkPostfix: $reqTypeName[1] else: $reqTypeName)
|
||||
ffiBody.add quote do:
|
||||
let typeStr = $`reqTypeName`
|
||||
let `reqPtrIdent` = FFIThreadRequest.initFromPtr(
|
||||
callback, userData, typeStr.cstring, reqCbor, int(reqCborLen)
|
||||
callback, userData, cstring(`reqNameLit`), reqCbor, int(reqCborLen)
|
||||
)
|
||||
|
||||
let sendResIdent = genSym(nskLet, "sendRes")
|
||||
@ -969,11 +969,12 @@ proc buildCtorFFINewReqProc(reqTypeName: NimNode, paramNames: seq[string]): NimN
|
||||
let retType = newTree(nnkPtrTy, ident("FFIThreadRequest"))
|
||||
formalParams = @[retType] & formalParams
|
||||
|
||||
let reqNameLit =
|
||||
newLit(if reqTypeName.kind == nnkPostfix: $reqTypeName[1] else: $reqTypeName)
|
||||
var newBody = newStmtList()
|
||||
newBody.add quote do:
|
||||
let typeStr = $T
|
||||
return FFIThreadRequest.initFromPtr(
|
||||
callback, userData, typeStr.cstring, reqCbor, int(reqCborLen)
|
||||
callback, userData, cstring(`reqNameLit`), reqCbor, int(reqCborLen)
|
||||
)
|
||||
|
||||
let newReqProc = newProc(
|
||||
@ -1068,9 +1069,19 @@ proc buildCtorProcessFFIRequestProc(
|
||||
return err($error)
|
||||
|
||||
let myLibIdent = newDotExpr(newTree(nnkDerefExpr, ctxIdent), ident("myLib"))
|
||||
let myLibOwnedIdent =
|
||||
newDotExpr(newTree(nnkDerefExpr, ctxIdent), ident("myLibOwned"))
|
||||
let myLibRefdIdent = newDotExpr(newTree(nnkDerefExpr, ctxIdent), ident("myLibRefd"))
|
||||
newBody.add quote do:
|
||||
`myLibIdent` = createShared(`libTypeName`)
|
||||
`myLibIdent`[] = `libValIdent`
|
||||
`myLibOwnedIdent` = true
|
||||
# Root the ref lib under refc: it lives only via this ptr in non-GC
|
||||
# createShared memory, invisible to the cycle collector. freeLib unroots it.
|
||||
when defined(gcRefc):
|
||||
when `libTypeName` is ref:
|
||||
GC_ref(`myLibIdent`[])
|
||||
`myLibRefdIdent` = true
|
||||
|
||||
newBody.add quote do:
|
||||
return ok($cast[uint](`ctxIdent`))
|
||||
@ -1405,7 +1416,7 @@ macro ffiDtor*(prc: untyped): untyped =
|
||||
let poolIdent = ident($libTypeName & "FFIPool")
|
||||
ffiBody.add quote do:
|
||||
let `destroyResIdent` =
|
||||
`poolIdent`.destroyFFIContext(cast[ptr FFIContext[`libTypeName`]](ctx))
|
||||
`poolIdent`.recycleFFIContext(cast[ptr FFIContext[`libTypeName`]](ctx))
|
||||
if `destroyResIdent`.isErr():
|
||||
return RET_ERR
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user