mirror of
https://github.com/logos-messaging/nim-ffi.git
synced 2026-06-21 16:59:30 +00:00
better ctx lifecycle management
This commit is contained in:
parent
e433624645
commit
18946d0593
@ -2,7 +2,7 @@
|
||||
{.pragma: callback, cdecl, raises: [], gcsafe.}
|
||||
{.passc: "-fPIC".}
|
||||
|
||||
import std/[atomics, locks, json, tables]
|
||||
import std/[atomics, locks, json, tables, sequtils]
|
||||
import chronicles, chronos, chronos/threadsync, taskpools/channels_spsc_single, results
|
||||
import ./ffi_types, ./ffi_thread_request, ./internal/ffi_macro, ./logging
|
||||
|
||||
@ -12,6 +12,26 @@ type FFICallbackState* = object
|
||||
callback*: pointer
|
||||
userData*: pointer
|
||||
|
||||
type CtxLifecycle {.pure.} = enum
|
||||
## Request-acceptance + recycle handshake for a pooled context, held as an
|
||||
## Atomic on FFIContext. ("Recycle" = drain the context and return its slot to
|
||||
## the pool for reuse, keeping the worker alive — unlike destroyFFIContext, which
|
||||
## fully tears the threads down.) Invariants:
|
||||
## * Requests are accepted ONLY in `Active`; the gate in sendRequestToFFIThread
|
||||
## and the watchdog ping both test `== Active`.
|
||||
## * Legal transitions, and who performs them:
|
||||
## Active -> RecyclePending requestRecycle (caller, under `lock`)
|
||||
## RecyclePending -> Recycling the FFI loop (one-shot compareExchange)
|
||||
## Recycling -> Active markReacquired (caller, on reuse)
|
||||
## (initContextResources starts a slot in `Active`.)
|
||||
## * The gate stays closed across BOTH RecyclePending and Recycling, so no
|
||||
## request can dispatch onto a context being recycled or about to be reused.
|
||||
## * Only the FFI loop makes the RecyclePending -> Recycling move, so the
|
||||
## recycle runs exactly once per request.
|
||||
Active ## serving requests
|
||||
RecyclePending ## recycle requested; FFI loop hasn't claimed it yet
|
||||
Recycling ## FFI loop claimed it: draining handlers, then freeing
|
||||
|
||||
type FFIContext*[T] = object
|
||||
myLib*: ptr T
|
||||
# main library object (e.g., Waku, LibP2P, SDS, the one to be exposed as a library)
|
||||
@ -33,6 +53,17 @@ type FFIContext*[T] = object
|
||||
userData*: pointer
|
||||
callbackState*: FFICallbackState
|
||||
running: Atomic[bool] # To control when the threads are running
|
||||
lifecycle: Atomic[CtxLifecycle]
|
||||
# Request gate + recycle handshake in one. See CtxLifecycle for the states,
|
||||
# transitions and invariants.
|
||||
recycleCallback: FFICallBack
|
||||
# The destructor's callback, fired by the recycle handler with the outcome:
|
||||
# RET_OK once drained, RET_ERR if it timed out. Set by requestRecycle.
|
||||
recycleUserData: pointer
|
||||
inUse: Atomic[bool]
|
||||
# Whether the slot is claimed. createFFIContext claims it (false -> true); the
|
||||
# recycle handler clears it once drained. On the slot so the owning thread can
|
||||
# release it without reaching into the pool.
|
||||
registeredRequests: ptr Table[cstring, FFIRequestProc]
|
||||
# Pointer to the table of requests registered at compile time
|
||||
|
||||
@ -97,6 +128,12 @@ proc sendRequestToFFIThread*(
|
||||
defer:
|
||||
ctx.lock.release()
|
||||
|
||||
## A recycle closes this gate (under the same lock), so a queued or late sender
|
||||
## bails here instead of dispatching onto a slot about to be reused.
|
||||
if ctx.lifecycle.load() != CtxLifecycle.Active:
|
||||
deleteRequest(ffiRequest)
|
||||
return err("FFI context is not accepting requests (being recycled)")
|
||||
|
||||
## Sending the request
|
||||
let sentOk = ctx.reqChannel.trySend(ffiRequest)
|
||||
if not sentOk:
|
||||
@ -162,6 +199,11 @@ proc watchdogThreadBody(ctx: ptr FFIContext) {.thread.} =
|
||||
debug "Watchdog thread exiting because FFIContext is not running"
|
||||
break
|
||||
|
||||
if ctx.lifecycle.load() != CtxLifecycle.Active:
|
||||
## Gate closed (being recycled, not yet reused): a ping would just fail
|
||||
## and spuriously trip onNotResponding. Skip until reused.
|
||||
continue
|
||||
|
||||
let callback = proc(
|
||||
callerRet: cint, msg: ptr cchar, len: csize_t, userData: pointer
|
||||
) {.cdecl, gcsafe, raises: [].} =
|
||||
@ -186,6 +228,8 @@ proc processRequest[T](
|
||||
request: ptr FFIThreadRequest, ctx: ptr FFIContext[T]
|
||||
) {.async.} =
|
||||
## Invoked within the FFI thread to process a request coming from the FFI API consumer thread.
|
||||
## ffiThreadBody keeps this proc's Future in the run loop's `pending` seq so a
|
||||
## destroy can await (or cancel) it before freeing myLib.
|
||||
|
||||
let reqId = $request[].reqId
|
||||
## The reqId determines which proc will handle the request.
|
||||
@ -206,6 +250,10 @@ proc processRequest[T](
|
||||
let res =
|
||||
try:
|
||||
await retFut
|
||||
except CancelledError as exc:
|
||||
## Destroy timed out and cancelled us: turn it into an error so handleRes
|
||||
## still fires the callback and frees the request.
|
||||
Result[string, string].err("Request cancelled during destroy: " & exc.msg)
|
||||
except AsyncError as exc:
|
||||
Result[string, string].err(
|
||||
"Async error in processRequest for " & reqId & ": " & exc.msg
|
||||
@ -219,6 +267,78 @@ proc processRequest[T](
|
||||
except Exception as exc:
|
||||
error "Unexpected exception in handleRes", error = exc.msg
|
||||
|
||||
proc freeLib[T](ctx: ptr FFIContext[T]) {.gcsafe.} =
  ## Releases the library object that `ctx.myLib` points to and resets the
  ## pointer to nil. A nil `myLib` makes this a no-op. Intended to run on the
  ## FFI thread, the thread that owns the object's GC-managed fields.
  if not ctx.myLib.isNil():
    when defined(gcRefc):
      ## refc: calling `=destroy` here would take the unsafe Selector path (see
      ## cleanUpResources), so only the wrapper allocation is reclaimed below
      ## and the inner fields are deliberately leaked, like the signal fds.
      discard
    else:
      ## orc: the heap is shared, so the destructor can run right here. The
      ## hook is not inferred as gcsafe, but it only touches this object, which
      ## belongs to the calling thread, hence the explicit cast.
      {.cast(gcsafe).}:
        `=destroy`(ctx.myLib[])

    freeShared(ctx.myLib)
    ctx.myLib = nil
|
||||
|
||||
var RecycleTimeout* = 1500.milliseconds
|
||||
## Upper bound the recycle handler waits for in-flight handlers before it
|
||||
## cancels them and reports the ctx as stuck. The drain returns as soon as they
|
||||
## finish, so this only bounds a *stuck* handler. A `var` so tests can shorten it.
|
||||
|
||||
proc recycleContext[T](
    ctx: ptr FFIContext[T], pending: ptr seq[Future[void]]
) {.async.} =
  ## Recycle handler. Runs on the FFI thread once requestRecycle has closed the
  ## request gate. It drains the in-flight handlers, frees the library object,
  ## hands the slot back for reuse and finally reports the outcome through the
  ## recycle callback. The caller of requestRecycle is never blocked by it.
  ##
  ## `pending` points at the run loop's seq of handler Futures (passed by ptr
  ## because async procs cannot take `var` parameters); having the Future
  ## instances is what allows awaiting and cancelling them here.
  pending[].keepItIf(not it.finished())

  ## Phase 1: give the handlers up to RecycleTimeout to complete by themselves.
  var drainedInTime = pending[].len == 0
  if not drainedInTime:
    drainedInTime = await allFutures(pending[]).withTimeout(RecycleTimeout)

  ## Phase 2: anything still running is cancelled, and the cancellations get one
  ## more bounded wait to unwind so the slot can be reclaimed instead of leaked.
  var slotReusable = drainedInTime
  if not drainedInTime:
    for straggler in pending[]:
      if straggler.finished():
        continue
      straggler.cancelSoon()
    slotReusable = await allFutures(pending[]).withTimeout(RecycleTimeout)

  ## Take the callback out of the context before the slot can change hands.
  let onDone = ctx.recycleCallback
  let onDoneData = ctx.recycleUserData
  ctx.recycleCallback = nil

  if slotReusable:
    ## No handler references the context any more. Free the lib and release the
    ## slot BEFORE invoking the callback: the atomic store in unclaim publishes
    ## these writes to whoever claims the slot next, so a caller that reacquires
    ## from inside the callback already finds it free.
    freeLib(ctx)
    ctx.callbackState = default(FFICallbackState)
    pending[].setLen(0)
    ctx.unclaim()

  if onDone.isNil():
    return

  foreignThreadGc:
    ## The callback never receives a nil message: it gets an empty string on
    ## success and the reason on timeout. The cstring of an empty string still
    ## points at a terminating '\0', so taking the address of index 0 is valid
    ## with a length of 0.
    let reason =
      if drainedInTime: ""
      else: "recycle: in-flight requests did not finish in time"
    let reasonPtr = reason.cstring
    let outcome = if drainedInTime: RET_OK else: RET_ERR
    onDone(outcome, unsafeAddr reasonPtr[0], cast[csize_t](reason.len), onDoneData)
|
||||
|
||||
proc ffiThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} =
|
||||
## FFI thread body that attends library user API requests
|
||||
ffiCurrentCallbackState = addr ctx[].callbackState
|
||||
@ -233,11 +353,20 @@ proc ffiThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} =
|
||||
error "failed to fire threadExitSignal on FFI thread exit", err = fireRes.error
|
||||
|
||||
let ffiRun = proc(ctx: ptr FFIContext[T]) {.async.} =
|
||||
var ffiReqHandler: T
|
||||
## Holds the main library object, i.e., in charge of handling the ffi requests.
|
||||
## e.g., Waku, LibP2P, SDS, etc.
|
||||
## Handler Futures live here on the FFI thread's heap, NOT on FFIContext
|
||||
## (shared pool memory, must stay GC-free); holding them lets destroy await
|
||||
## and cancel them.
|
||||
var pending: seq[Future[void]]
|
||||
|
||||
while ctx.running.load():
|
||||
## Recycle requested: claim it (RecyclePending -> Recycling, one-shot) and run
|
||||
## the recycle on this owning thread, then keep looping so the worker stays
|
||||
## alive for the slot's next reuse.
|
||||
var expected = CtxLifecycle.RecyclePending
|
||||
if ctx.lifecycle.compareExchange(expected, CtxLifecycle.Recycling):
|
||||
await recycleContext(ctx, addr pending)
|
||||
continue
|
||||
|
||||
let gotSignal = await ctx.reqSignal.wait().withTimeout(100.milliseconds)
|
||||
if not gotSignal:
|
||||
continue
|
||||
@ -247,11 +376,10 @@ proc ffiThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} =
|
||||
if not ctx.reqChannel.tryRecv(request):
|
||||
continue
|
||||
|
||||
if ctx.myLib.isNil():
|
||||
ctx.myLib = addr ffiReqHandler
|
||||
|
||||
## Handle the request
|
||||
asyncSpawn processRequest(request, ctx)
|
||||
## Dispatch and remember the handler's Future (pruning finished ones) so a
|
||||
## later recycle can await or cancel it.
|
||||
pending.keepItIf(not it.finished())
|
||||
pending.add(processRequest(request, ctx))
|
||||
|
||||
let fireRes = ctx.reqReceivedSignal.fireSync()
|
||||
if fireRes.isErr():
|
||||
@ -323,6 +451,7 @@ proc initContextResources*[T](ctx: ptr FFIContext[T]): Result[void, string] =
|
||||
|
||||
ctx.registeredRequests = addr ffi_types.registeredRequests
|
||||
|
||||
ctx.lifecycle.store(CtxLifecycle.Active)
|
||||
ctx.running.store(true)
|
||||
|
||||
try:
|
||||
@ -385,3 +514,45 @@ proc stopAndJoinThreads*[T](ctx: ptr FFIContext[T]): Result[void, string] =
|
||||
joinThread(ctx.ffiThread)
|
||||
joinThread(ctx.watchdogThread)
|
||||
return ok()
|
||||
|
||||
proc requestRecycle*[T](
    ctx: ptr FFIContext[T], callback: FFICallBack, userData: pointer
): Result[void, string] =
  ## Asks the FFI thread to recycle the context and fire `callback` with the
  ## outcome (RET_OK drained, RET_ERR stuck). NON-BLOCKING: `ok` only means the
  ## request was accepted; the real outcome arrives through `callback`.
  ##
  ## Returns `err` in two cases, and in both the recycle handler will NOT invoke
  ## `callback`, so the caller can report the failure itself exactly once:
  ##   * the context is not `Active` (a recycle is already pending or running);
  ##     the context and any previously stored callback are left untouched;
  ##   * the FFI thread could not be signalled and the request was withdrawn
  ##     again (RecyclePending -> Active), leaving the context usable.
  ## If signalling fails but the FFI loop has already claimed the request, the
  ## recycle proceeds and `ok` is returned, because the callback will fire.

  ## Close the gate under `lock` to serialise with sendRequestToFFIThread. The
  ## callback is stored BEFORE the flip to RecyclePending, because the flip is
  ## the trigger the FFI loop acts on.
  ctx.lock.acquire()
  let accepted = ctx.lifecycle.load() == CtxLifecycle.Active
  if accepted:
    ctx.recycleCallback = callback
    ctx.recycleUserData = userData
    ctx.lifecycle.store(CtxLifecycle.RecyclePending)
  ctx.lock.release()

  if not accepted:
    return err("requestRecycle: FFI context is not active (recycle already requested)")

  let signalRes = ctx.reqSignal.fireSync()
  if signalRes.isOk() and signalRes.get():
    return ok()

  ## Signalling failed. Withdraw the request unless the FFI loop has already
  ## claimed it; otherwise both the caller (reacting to `err`) and the recycle
  ## handler would end up invoking the callback.
  ctx.lock.acquire()
  var stillPending = CtxLifecycle.RecyclePending
  let withdrawn = ctx.lifecycle.compareExchange(stillPending, CtxLifecycle.Active)
  if withdrawn:
    ctx.recycleCallback = nil
    ctx.recycleUserData = nil
  ctx.lock.release()

  if not withdrawn:
    ## The FFI loop picked the request up on its own; it owns the callback now.
    return ok()

  if signalRes.isErr():
    return err("requestRecycle: failed to signal the FFI thread: " & $signalRes.error)
  return err("requestRecycle: failed to signal the FFI thread in time")
|
||||
|
||||
proc markReacquired*[T](ctx: ptr FFIContext[T]) =
  ## Re-opens the request gate of a recycled context by setting its lifecycle
  ## back to `Active`. createFFIContext calls this when it hands out a slot that
  ## was used before; by then the recycle handler has already drained the FFI
  ## thread's `pending` seq and freed myLib.
  store(ctx.lifecycle, CtxLifecycle.Active)
|
||||
|
||||
proc tryClaim*[T](ctx: ptr FFIContext[T]): bool =
  ## Attempts to take ownership of this slot with a single atomic
  ## compare-and-swap of `inUse` from false to true. Returns true when this
  ## caller won the slot and false when it was already claimed. createFFIContext
  ## uses it to pick a free slot.
  var unclaimed = false
  return ctx.inUse.compareExchange(unclaimed, true)
|
||||
|
||||
proc unclaim*[T](ctx: ptr FFIContext[T]) =
  ## Flags the slot as free so it can be claimed again. The recycle handler
  ## calls it on the FFI thread once teardown has finished; it is also used when
  ## context creation fails and on full teardown.
  store(ctx.inUse, false)
|
||||
|
||||
proc isClaimed*[T](ctx: ptr FFIContext[T]): bool =
  ## Reports whether a consumer currently holds this slot (atomic read of
  ## `inUse`).
  return load(ctx.inUse)
|
||||
|
||||
@ -1,6 +1,6 @@
|
||||
import std/atomics
|
||||
import results
|
||||
import ./ffi_context
|
||||
import ./ffi_context, ./ffi_types
|
||||
|
||||
const MaxFFIContexts* = 32
|
||||
## Maximum number of concurrently live FFI contexts when using FFIContextPool.
|
||||
@ -12,19 +12,12 @@ type FFIContextPool*[T] = object
|
||||
## and bounds the total number of file descriptors consumed by ThreadSignalPtrs
|
||||
## to at most MaxFFIContexts * 2.
|
||||
slots: array[MaxFFIContexts, FFIContext[T]]
|
||||
inUse: array[MaxFFIContexts, Atomic[bool]]
|
||||
initialized: array[MaxFFIContexts, Atomic[bool]]
|
||||
## Whether a slot's worker (threads, chronos dispatcher and ThreadSignalPtrs)
|
||||
## has been built. Set on first acquisition and kept set across park/reuse,
|
||||
## so a reacquired slot reuses the same fds instead of allocating a fresh set
|
||||
## every create/destroy cycle. Cleared only by full teardown.
|
||||
|
||||
proc releaseSlot[T](pool: var FFIContextPool[T], ctx: ptr FFIContext[T]) =
|
||||
for i in 0 ..< MaxFFIContexts:
|
||||
if pool.slots[i].addr == ctx:
|
||||
pool.inUse[i].store(false)
|
||||
return
|
||||
|
||||
proc createFFIContext*[T](
|
||||
pool: var FFIContextPool[T]
|
||||
): Result[ptr FFIContext[T], string] =
|
||||
@ -34,44 +27,35 @@ proc createFFIContext*[T](
|
||||
## worker down). This is what keeps fd usage bounded: repeated create/destroy
|
||||
## cycles no longer leak a fresh set of ThreadSignalPtr/dispatcher fds.
|
||||
for i in 0 ..< MaxFFIContexts:
|
||||
var expected = false
|
||||
if not pool.inUse[i].compareExchange(expected, true):
|
||||
continue
|
||||
let ctx = pool.slots[i].addr
|
||||
if not ctx.tryClaim():
|
||||
continue
|
||||
if pool.initialized[i].load():
|
||||
## Reused slot: worker threads, dispatcher and signals are already alive.
|
||||
## Reused slot: a prior destroy drained and released it, worker still alive.
|
||||
## Re-arm the gate and hand it back.
|
||||
ctx.markReacquired()
|
||||
return ok(ctx)
|
||||
initContextResources(ctx).isOkOr:
|
||||
pool.inUse[i].store(false)
|
||||
ctx.unclaim()
|
||||
return err("createFFIContext: initContextResources failed: " & $error)
|
||||
pool.initialized[i].store(true)
|
||||
return ok(ctx)
|
||||
return err("FFI context pool exhausted (max " & $MaxFFIContexts & " contexts)")
|
||||
|
||||
proc releaseFFIContext*[T](
|
||||
pool: var FFIContextPool[T], ctx: ptr FFIContext[T]
|
||||
pool: var FFIContextPool[T],
|
||||
ctx: ptr FFIContext[T],
|
||||
callback: FFICallBack,
|
||||
userData: pointer,
|
||||
): Result[void, string] =
|
||||
## Parks a context for reuse: returns its slot to the pool WITHOUT stopping the
|
||||
## worker threads, so the next createFFIContext reuses the same fds. This is the
|
||||
## steady-state cleanup path, called by the generated destructor (ffiDtor);
|
||||
## destroyFFIContext is only for creation failure and non-pooling callers.
|
||||
## Parks a context for reuse without stopping its worker, so the next
|
||||
## createFFIContext reuses the same threads and fds. Steady-state cleanup path
|
||||
## for the generated destructor; destroyFFIContext is for failure/non-pool use.
|
||||
##
|
||||
## Runs on the CALLER's thread, not the FFI worker thread, and does NOT itself
|
||||
## wait for in-flight work to finish. It is safe to park here because the
|
||||
## framework processes one request at a time (see sendRequestToFFIThread): by
|
||||
## the time the destructor calls this, the worker has finished the previous
|
||||
## request and is idle (looping on reqSignal), so there is no handler still
|
||||
## touching the slot when it is reused.
|
||||
##
|
||||
## Clearing callbackState removes the stored C event callback. The
|
||||
## worker/watchdog threads stay alive after parking, so an event could still
|
||||
## fire on this slot; with no callback set that event does nothing, instead of
|
||||
## calling back into a consumer that has already released the context (whose
|
||||
## user-data pointer may now be freed).
|
||||
ctx.callbackState = default(FFICallbackState)
|
||||
ctx.myLib = nil
|
||||
pool.releaseSlot(ctx)
|
||||
return ok()
|
||||
## NON-BLOCKING: the FFI thread drains the handlers, frees the lib and releases
|
||||
## the slot, then fires `callback` (RET_OK drained, RET_ERR stuck). The slot
|
||||
## returns to the pool from that thread, so a reused slot never carries a straggler.
|
||||
return ctx.requestRecycle(callback, userData)
|
||||
|
||||
proc destroyFFIContext*[T](
|
||||
pool: var FFIContextPool[T], ctx: ptr FFIContext[T]
|
||||
@ -88,7 +72,7 @@ proc destroyFFIContext*[T](
|
||||
if pool.slots[i].addr == ctx:
|
||||
pool.initialized[i].store(false)
|
||||
break
|
||||
pool.releaseSlot(ctx)
|
||||
ctx.unclaim()
|
||||
return ok()
|
||||
|
||||
proc isValidCtx*[T](pool: var FFIContextPool[T], ctx: pointer): bool =
|
||||
@ -99,5 +83,5 @@ proc isValidCtx*[T](pool: var FFIContextPool[T], ctx: pointer): bool =
|
||||
return false
|
||||
for i in 0 ..< MaxFFIContexts:
|
||||
if cast[pointer](pool.slots[i].addr) == ctx:
|
||||
return pool.inUse[i].load()
|
||||
return cast[ptr FFIContext[T]](ctx).isClaimed()
|
||||
return false
|
||||
|
||||
@ -1565,24 +1565,19 @@ macro ffiDtor*(prc: untyped): untyped =
|
||||
ffiBody.add(bodyNode)
|
||||
|
||||
let poolIdent = ident($libTypeName & "FFIPool")
|
||||
# Park the slot (releaseFFIContext) instead of tearing it down
|
||||
# (destroyFFIContext) so the next createFFIContext reuses the same worker and
|
||||
# fds — this is what keeps fd usage bounded across create/destroy cycles.
|
||||
# Safe because the framework processes one request at a time
|
||||
# (see sendRequestToFFIThread): by the time this destructor runs the worker is
|
||||
# idle, not mid-request, so parking cannot race an in-flight handler.
|
||||
# Hand teardown to the FFI thread (releaseFFIContext -> requestRecycle), recycling
|
||||
# the slot for reuse to keep fd usage bounded. NON-BLOCKING: returns RET_OK once
|
||||
# accepted; the real outcome arrives via `callback`, so callers must wait on it,
|
||||
# not the return code.
|
||||
ffiBody.add quote do:
|
||||
let `destroyResIdent` =
|
||||
`poolIdent`.releaseFFIContext(cast[ptr FFIContext[`libTypeName`]](ctx))
|
||||
let `destroyResIdent` = `poolIdent`.releaseFFIContext(
|
||||
cast[ptr FFIContext[`libTypeName`]](ctx), callback, userData
|
||||
)
|
||||
if `destroyResIdent`.isErr():
|
||||
if not callback.isNil:
|
||||
let errStr = "destroy failed: " & $`destroyResIdent`.error
|
||||
callback(RET_ERR, unsafeAddr errStr[0], cast[csize_t](errStr.len), userData)
|
||||
return RET_ERR
|
||||
|
||||
ffiBody.add quote do:
|
||||
if not callback.isNil:
|
||||
callback(RET_OK, nil, 0, userData)
|
||||
return RET_OK
|
||||
|
||||
let ffiProc = newProc(
|
||||
|
||||
@ -421,6 +421,12 @@ proc testlib_create*(
|
||||
): Future[Result[SimpleLib, string]] {.ffiCtor.} =
|
||||
return ok(SimpleLib(value: config.initialValue))
|
||||
|
||||
# Records the value of the library object the destructor body saw, so a test can
|
||||
# confirm the user cleanup body ran with the right lib state before teardown.
|
||||
var gDestroyedValue {.threadvar.}: int
|
||||
proc testlib_destroy*(lib: SimpleLib) {.ffiDtor.} =
|
||||
gDestroyedValue = lib.value
|
||||
|
||||
suite "ffiCtor macro":
|
||||
test "creates context and returns pointer via callback":
|
||||
var d: CallbackData
|
||||
@ -450,6 +456,123 @@ suite "ffiCtor macro":
|
||||
|
||||
check SimpleLibFFIPool.destroyFFIContext(ctx).isOk()
|
||||
|
||||
proc createSimpleLib(initialValue: int): ptr FFIContext[SimpleLib] =
  ## Test helper: invokes the generated constructor with `initialValue`, blocks
  ## until its callback has fired, asserts success and returns the context
  ## pointer parsed from the callback message.
  var created: CallbackData
  initCallbackData(created)
  defer:
    deinitCallbackData(created)

  let configJson = ffiSerialize(SimpleConfig(initialValue: initialValue))
  let ret = testlib_create(configJson.cstring, testCallback, addr created)
  doAssert not ret.isNil()

  waitCallback(created)
  doAssert created.retCode == RET_OK

  # The constructor reports the context address as a decimal string.
  let ctxAddr = cast[uint](parseBiggestUInt(callbackMsg(created)))
  return cast[ptr FFIContext[SimpleLib]](ctxAddr)
|
||||
|
||||
suite "ffiDtor macro (async destroy + reuse)":
  test "destroy fires RET_OK after teardown, frees myLib, and frees the slot":
    let firstCtx = createSimpleLib(5)
    check not firstCtx[].myLib.isNil
    check firstCtx[].myLib[].value == 5

    var destroyed: CallbackData
    initCallbackData(destroyed)
    defer:
      deinitCallbackData(destroyed)

    # The destructor's C return value only says "accepted"; the actual result
    # is delivered through the callback after the FFI thread has torn the lib
    # down.
    check testlib_destroy(cast[pointer](firstCtx), testCallback, addr destroyed) == RET_OK
    waitCallback(destroyed)
    check destroyed.retCode == RET_OK
    check gDestroyedValue == 5 # the user cleanup body observed the live lib
    check firstCtx[].myLib.isNil() # released by the FFI thread

    # The FFI thread gave the slot back, so the next create must land on it.
    let reusedCtx = createSimpleLib(9)
    check reusedCtx == firstCtx # identical slot: same worker and fds
    check reusedCtx[].myLib[].value == 9
    check SimpleLibFFIPool.destroyFFIContext(reusedCtx).isOk()

  test "destroy waits for an in-flight request before reporting RET_OK":
    let firstCtx = createSimpleLib(1)

    # Start a 500 ms handler without waiting for it, so it is still running
    # when the destroy is issued.
    var slowReq: CallbackData
    initCallbackData(slowReq)
    defer:
      deinitCallbackData(slowReq)
    check sendRequestToFFIThread(firstCtx, SlowRequest.ffiNewReq(testCallback, addr slowReq)).isOk()

    var destroyed: CallbackData
    initCallbackData(destroyed)
    defer:
      deinitCallbackData(destroyed)
    check testlib_destroy(cast[pointer](firstCtx), testCallback, addr destroyed) == RET_OK
    waitCallback(destroyed)
    check destroyed.retCode == RET_OK
    # The destroy reported OK only after the in-flight handler had completed.
    check slowReq.called
    check callbackMsg(slowReq) == "slow-done"

    let reusedCtx = createSimpleLib(2)
    check reusedCtx == firstCtx
    check SimpleLibFFIPool.destroyFFIContext(reusedCtx).isOk()

  test "requests are rejected once a destroy closes the gate":
    let firstCtx = createSimpleLib(3)

    var destroyed: CallbackData
    initCallbackData(destroyed)
    defer:
      deinitCallbackData(destroyed)
    check testlib_destroy(cast[pointer](firstCtx), testCallback, addr destroyed) == RET_OK
    waitCallback(destroyed)
    check destroyed.retCode == RET_OK

    # Until the slot is reacquired the gate remains closed, so a late request
    # must be refused rather than dispatched onto a context that is about to be
    # (or already has been) reused.
    var lateReq: CallbackData
    initCallbackData(lateReq)
    defer:
      deinitCallbackData(lateReq)
    check sendRequestToFFIThread(
      firstCtx, SlowRequest.ffiNewReq(testCallback, addr lateReq)
    ).isErr()

    let reusedCtx = createSimpleLib(4)
    check reusedCtx == firstCtx
    check SimpleLibFFIPool.destroyFFIContext(reusedCtx).isOk()

  test "a stuck context is reported as RET_ERR rather than hanging":
    let stuckCtx = createSimpleLib(8)

    let originalTimeout = RecycleTimeout
    RecycleTimeout = 150.milliseconds
    defer:
      RecycleTimeout = originalTimeout

    # The in-flight handler runs longer than the shortened drain timeout.
    var slowReq: CallbackData
    initCallbackData(slowReq)
    defer:
      deinitCallbackData(slowReq)
    check sendRequestToFFIThread(stuckCtx, SlowRequest.ffiNewReq(testCallback, addr slowReq)).isOk()

    var destroyed: CallbackData
    initCallbackData(destroyed)
    defer:
      deinitCallbackData(destroyed)
    check testlib_destroy(cast[pointer](stuckCtx), testCallback, addr destroyed) == RET_OK
    waitCallback(destroyed)
    check destroyed.retCode == RET_ERR # drain timed out, so the ctx is reported stuck

    # The stuck slot is leaked instead of reused, while the handler still
    # completes by itself. Wait for that, then tear the leaked slot down fully.
    waitCallback(slowReq)
    check SimpleLibFFIPool.destroyFFIContext(stuckCtx).isOk()
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Simplified .ffi. macro integration test
|
||||
# ---------------------------------------------------------------------------
|
||||
@ -672,13 +795,26 @@ proc countOpenFds(): int =
|
||||
except CatchableError:
|
||||
return -1
|
||||
|
||||
proc releaseAndWait[T](pool: var FFIContextPool[T], ctx: ptr FFIContext[T]): cint =
  ## Test helper that destroys a context the way a C consumer would: it starts
  ## the non-blocking release and then blocks until the callback fires,
  ## returning the callback's retCode. If the release is not even accepted it
  ## returns RET_ERR without waiting. RET_OK means the in-flight tasks finished
  ## and the slot was parked.
  var outcome: CallbackData
  initCallbackData(outcome)
  defer:
    deinitCallbackData(outcome)

  let accepted = pool.releaseFFIContext(ctx, testCallback, addr outcome)
  if accepted.isOk():
    waitCallback(outcome)
    result = outcome.retCode
  else:
    result = RET_ERR
|
||||
|
||||
suite "releaseFFIContext (park & reuse)":
|
||||
test "park returns the slot and reuses the same live worker":
|
||||
var pool: FFIContextPool[TestLib]
|
||||
let ctx1 = pool.createFFIContext().valueOr:
|
||||
check false
|
||||
return
|
||||
check pool.releaseFFIContext(ctx1).isOk()
|
||||
check pool.releaseAndWait(ctx1) == RET_OK
|
||||
|
||||
# Reacquire: must be the same array slot, with its worker still running.
|
||||
let ctx2 = pool.createFFIContext().valueOr:
|
||||
@ -707,7 +843,7 @@ suite "releaseFFIContext (park & reuse)":
|
||||
ctx.callbackState.callback = cast[pointer](testCallback)
|
||||
ctx.callbackState.userData = cast[pointer](0xDEAD)
|
||||
|
||||
check pool.releaseFFIContext(ctx).isOk()
|
||||
check pool.releaseAndWait(ctx) == RET_OK
|
||||
check ctx.callbackState.callback.isNil() # a watchdog tick can't call a freed cb
|
||||
check ctx.callbackState.userData.isNil()
|
||||
check ctx.myLib.isNil()
|
||||
@ -726,7 +862,7 @@ suite "releaseFFIContext (park & reuse)":
|
||||
let ctx = pool.createFFIContext().valueOr:
|
||||
check false
|
||||
return
|
||||
check pool.releaseFFIContext(ctx).isOk()
|
||||
check pool.releaseAndWait(ctx) == RET_OK
|
||||
|
||||
let baseline = countOpenFds()
|
||||
|
||||
@ -741,7 +877,7 @@ suite "releaseFFIContext (park & reuse)":
|
||||
).isOk()
|
||||
waitCallback(d)
|
||||
deinitCallbackData(d)
|
||||
check pool.releaseFFIContext(ctx).isOk()
|
||||
check pool.releaseAndWait(ctx) == RET_OK
|
||||
|
||||
let afterCycles = countOpenFds()
|
||||
# Reuse must not grow fds. Before the fix each cycle leaked ~10 fds (4
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user