mirror of
https://github.com/logos-messaging/nim-ffi.git
synced 2026-06-21 00:40:16 +00:00
simplify comments and procs
This commit is contained in:
parent
1667398f07
commit
d05f7f6441
@ -13,24 +13,14 @@ type FFICallbackState* = object
|
||||
userData*: pointer
|
||||
|
||||
type CtxLifecycle {.pure.} = enum
|
||||
## Request-acceptance + recycle handshake for a pooled context, held as an
|
||||
## Atomic on FFIContext. ("Recycle" = drain the context and return its context to
|
||||
## the pool for reuse, keeping the worker alive — unlike destroyFFIContext, which
|
||||
## fully tears the threads down.) Invariants:
|
||||
## * Requests are accepted ONLY in `Active`; the gate in sendRequestToFFIThread
|
||||
## and the watchdog ping both test `== Active`.
|
||||
## * Legal transitions, and who performs them:
|
||||
## Active -> RecyclePending requestRecycle (caller, under `lock`)
|
||||
## RecyclePending -> Recycling the FFI loop (one-shot compareExchange)
|
||||
## Recycling -> Active markReacquired (caller, on reuse)
|
||||
## (initContextResources starts a context in `Active`.)
|
||||
## * The gate stays closed across BOTH RecyclePending and Recycling, so no
|
||||
## request can dispatch onto a context being recycled or about to be reused.
|
||||
## * Only the FFI loop makes the RecyclePending -> Recycling move, so the
|
||||
## recycle runs exactly once per request.
|
||||
Active ## serving requests
|
||||
RecyclePending ## recycle requested; FFI loop hasn't claimed it yet
|
||||
Recycling ## FFI loop claimed it: draining handlers, then freeing
|
||||
## State machine guarding a pooled FFI context, held as an Atomic on FFIContext.
|
||||
## Transitions:
|
||||
## Active -> RecyclePending when ffiDtor is invoked
|
||||
## RecyclePending -> Recycling The process completed the in-flight processes and is ready for lib cleanup and release
|
||||
## Recycling -> Active When the FFI thread is ready again to attend to requests
|
||||
Active ## accepting and serving requests
|
||||
RecyclePending ## recycle requested; FFI thread loop hasn't claimed it yet
|
||||
Recycling ## FFI loop draining handlers, then frees lib + returns to pool
|
||||
|
||||
type FFIContext*[T] = object
|
||||
myLib*: ptr T
|
||||
@ -54,8 +44,6 @@ type FFIContext*[T] = object
|
||||
callbackState*: FFICallbackState
|
||||
running: Atomic[bool] # To control when the threads are running
|
||||
lifecycle: Atomic[CtxLifecycle]
|
||||
# Request gate + recycle handshake in one. See CtxLifecycle for the states,
|
||||
# transitions and invariants.
|
||||
recycleCallback: FFICallBack
|
||||
# The destructor's callback, fired by the recycle handler with the outcome:
|
||||
# RET_OK once drained, RET_ERR if it timed out. Set by requestRecycle.
|
||||
@ -121,20 +109,14 @@ proc sendRequestToFFIThread*(
|
||||
ctx: ptr FFIContext, ffiRequest: ptr FFIThreadRequest, timeout = InfiniteDuration
|
||||
): Result[void, string] =
|
||||
ctx.lock.acquire()
|
||||
# This lock is only necessary while we use a SP Channel and while the signalling
|
||||
# between threads assumes that there aren't concurrent requests.
|
||||
# Rearchitecting the signaling + migrating to a MP Channel will allow us to receive
|
||||
# requests concurrently and spare us the need of locks
|
||||
defer:
|
||||
ctx.lock.release()
|
||||
|
||||
## A recycle closes this gate (under the same lock), so a queued or late sender
|
||||
## bails here instead of dispatching onto a context about to be reused.
|
||||
if ctx.lifecycle.load() != CtxLifecycle.Active:
|
||||
deleteRequest(ffiRequest)
|
||||
return err("FFI context is not accepting requests (being recycled)")
|
||||
|
||||
## Sending the request
|
||||
## Sending the request to the FFI thread
|
||||
let sentOk = ctx.reqChannel.trySend(ffiRequest)
|
||||
if not sentOk:
|
||||
deleteRequest(ffiRequest)
|
||||
@ -152,12 +134,8 @@ proc sendRequestToFFIThread*(
|
||||
## wait until the FFI working thread properly received the request
|
||||
let res = ctx.reqReceivedSignal.waitSync(timeout)
|
||||
if res.isErr():
|
||||
## Do not free ffiRequest here: the FFI thread was already signaled and
|
||||
## will process (and free) it.
|
||||
return err("Couldn't receive reqReceivedSignal signal")
|
||||
|
||||
## Notice that in case of "ok", the deallocShared(req) is performed by the FFI Thread in the
|
||||
## process proc.
|
||||
return ok()
|
||||
|
||||
type Foo = object
|
||||
@ -200,8 +178,6 @@ proc watchdogThreadBody(ctx: ptr FFIContext) {.thread.} =
|
||||
break
|
||||
|
||||
if ctx.lifecycle.load() != CtxLifecycle.Active:
|
||||
## Gate closed (being recycled, not yet reused): a ping would just fail
|
||||
## and spuriously trip onNotResponding. Skip until reused.
|
||||
continue
|
||||
|
||||
let callback = proc(
|
||||
@ -228,17 +204,14 @@ proc processRequest[T](
|
||||
request: ptr FFIThreadRequest, ctx: ptr FFIContext[T]
|
||||
) {.async.} =
|
||||
## Invoked within the FFI thread to process a request coming from the FFI API consumer thread.
|
||||
## ffiThreadBody keeps this proc's Future in the run loop's `pending` seq so a
|
||||
## destroy can await (or cancel) it before freeing myLib.
|
||||
|
||||
let reqId = $request[].reqId
|
||||
## The reqId determines which proc will handle the request.
|
||||
## The registeredRequests represents a table defined at compile time.
|
||||
## Then, registeredRequests == Table[reqId, proc-handling-the-request-asynchronously]
|
||||
|
||||
## Explicit conversion keeps `reqId` alive as the backing string,
|
||||
## avoiding the implicit string→cstring warning that will become an error.
|
||||
let reqIdCs = reqId.cstring
|
||||
# keep `reqId` alive and avoid the implicit string→cstring warning.
|
||||
|
||||
let retFut =
|
||||
if not ctx[].registeredRequests[].contains(reqIdCs):
|
||||
@ -251,35 +224,26 @@ proc processRequest[T](
|
||||
try:
|
||||
await retFut
|
||||
except CancelledError as exc:
|
||||
## Destroy timed out and cancelled us: turn it into an error so handleRes
|
||||
## still fires the callback and frees the request.
|
||||
Result[string, string].err("Request cancelled during destroy: " & exc.msg)
|
||||
except AsyncError as exc:
|
||||
Result[string, string].err(
|
||||
"Async error in processRequest for " & reqId & ": " & exc.msg
|
||||
)
|
||||
|
||||
## handleRes may raise (OOM, GC setup) even though it is rare. Catching here
|
||||
## keeps the async proc raises:[] compatible. The defer inside handleRes
|
||||
## guarantees request is freed before the exception propagates.
|
||||
## handleRes may raise (OOM, GC setup) even though it is rare.
|
||||
try:
|
||||
handleRes(res, request)
|
||||
except Exception as exc:
|
||||
error "Unexpected exception in handleRes", error = exc.msg
|
||||
|
||||
proc freeLib[T](ctx: ptr FFIContext[T]) {.gcsafe.} =
|
||||
## Frees the createShared'd library object in ctx.myLib. Runs on the FFI
|
||||
## thread, which is what makes destroying its GC fields safe.
|
||||
if ctx.myLib.isNil():
|
||||
return
|
||||
|
||||
when not defined(gcRefc):
|
||||
## orc: shared heap, so run the destructor here. The hook isn't inferred
|
||||
## gcsafe but only touches this (owning-thread) object, so assert it.
|
||||
{.cast(gcsafe).}:
|
||||
`=destroy`(ctx.myLib[])
|
||||
else:
|
||||
## refc: `=destroy` here hits the unsafe Selector path (see cleanUpResources).
|
||||
## Reclaim only the wrapper; leak the inner fields, like the signal fds.
|
||||
discard
|
||||
freeShared(ctx.myLib)
|
||||
ctx.myLib = nil
|
||||
@ -290,51 +254,44 @@ var RecycleTimeout* = 1500.milliseconds
|
||||
## finish, so this only bounds a *stuck* handler. A `var` so tests can shorten it.
|
||||
|
||||
proc recycleContext[T](
|
||||
ctx: ptr FFIContext[T], pending: ptr seq[Future[void]]
|
||||
ctx: ptr FFIContext[T], ongoingProcessReq: ptr seq[Future[void]]
|
||||
) {.async.} =
|
||||
## Recycle handler, on the FFI thread (requestRecycle already closed the gate):
|
||||
## drain the in-flight handlers, free the lib object, release the context for reuse,
|
||||
## Drain the in-flight handlers, free the lib object, release the context for reuse,
|
||||
## and fire the callback with the outcome. Never blocks the caller.
|
||||
##
|
||||
## `pending` is the run loop's seq of handler Futures, by ptr (async procs can't
|
||||
## take `var`); holding the instances lets us await and cancel them.
|
||||
pending[].keepItIf(not it.finished())
|
||||
|
||||
ongoingProcessReq[].keepItIf(not it.finished())
|
||||
|
||||
## 1. Let the in-flight handlers finish on their own, bounded by RecycleTimeout.
|
||||
var naturallyDrained = pending[].len == 0
|
||||
var naturallyDrained = ongoingProcessReq[].len == 0
|
||||
if not naturallyDrained:
|
||||
naturallyDrained = await allFutures(pending[]).withTimeout(RecycleTimeout)
|
||||
naturallyDrained = await allFutures(ongoingProcessReq[]).withTimeout(RecycleTimeout)
|
||||
|
||||
## 2. If any are wedged, cancel them and give the cancellations a bounded moment
|
||||
## to unwind, so the context can be reclaimed rather than leaked.
|
||||
var safeToRecycle = naturallyDrained
|
||||
if not naturallyDrained:
|
||||
for fut in pending[]:
|
||||
for fut in ongoingProcessReq[]:
|
||||
if not fut.finished():
|
||||
fut.cancelSoon()
|
||||
safeToRecycle = await allFutures(pending[]).withTimeout(RecycleTimeout)
|
||||
safeToRecycle = await allFutures(ongoingProcessReq[]).withTimeout(RecycleTimeout)
|
||||
|
||||
let cb = ctx.recycleCallback
|
||||
let ud = ctx.recycleUserData
|
||||
ctx.recycleCallback = nil
|
||||
|
||||
if safeToRecycle:
|
||||
## Nothing can touch the context now. Free the lib here, then release the context
|
||||
## BEFORE the callback (the atomic store publishes these writes to whoever
|
||||
## reclaims it) so a caller reacquiring on the callback finds it already free.
|
||||
freeLib(ctx)
|
||||
ctx.callbackState = default(FFICallbackState)
|
||||
pending[].setLen(0)
|
||||
ctx.unclaim()
|
||||
ongoingProcessReq[].setLen(0)
|
||||
ctx.release()
|
||||
|
||||
if not cb.isNil():
|
||||
foreignThreadGc:
|
||||
## Never hand the callback nil: empty string on success, reason on timeout.
|
||||
## An empty string's cstring still points at a '\0', so msg[0] is a safe
|
||||
## address with len 0.
|
||||
let msg =
|
||||
if naturallyDrained: ""
|
||||
else: "recycle: in-flight requests did not finish in time"
|
||||
if naturallyDrained:
|
||||
""
|
||||
else:
|
||||
"recycle: in-flight requests did not finish in time"
|
||||
let cmsg = msg.cstring
|
||||
let retCode = if naturallyDrained: RET_OK else: RET_ERR
|
||||
cb(retCode, unsafeAddr cmsg[0], cast[csize_t](msg.len), ud)
|
||||
@ -346,25 +303,18 @@ proc ffiThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} =
|
||||
logging.setupLog(logging.LogLevel.DEBUG, logging.LogFormat.TEXT)
|
||||
|
||||
defer:
|
||||
# Signal destroyFFIContext that this thread has exited, so its bounded
|
||||
# wait can unblock and proceed with cleanup.
|
||||
let fireRes = ctx.threadExitSignal.fireSync()
|
||||
if fireRes.isErr():
|
||||
error "failed to fire threadExitSignal on FFI thread exit", err = fireRes.error
|
||||
|
||||
let ffiRun = proc(ctx: ptr FFIContext[T]) {.async.} =
|
||||
## Handler Futures live here on the FFI thread's heap, NOT on FFIContext
|
||||
## (shared pool memory, must stay GC-free); holding them lets destroy await
|
||||
## and cancel them.
|
||||
var pending: seq[Future[void]]
|
||||
|
||||
var ongoingProcessReq: seq[Future[void]]
|
||||
|
||||
while ctx.running.load():
|
||||
## Recycle requested: claim it (RecyclePending -> Recycling, one-shot) and run
|
||||
## the recycle on this owning thread, then keep looping so the worker stays
|
||||
## alive for the context's next reuse.
|
||||
var expected = CtxLifecycle.RecyclePending
|
||||
if ctx.lifecycle.compareExchange(expected, CtxLifecycle.Recycling):
|
||||
await recycleContext(ctx, addr pending)
|
||||
await recycleContext(ctx, addr ongoingProcessReq)
|
||||
continue
|
||||
|
||||
let gotSignal = await ctx.reqSignal.wait().withTimeout(100.milliseconds)
|
||||
@ -376,10 +326,8 @@ proc ffiThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} =
|
||||
if not ctx.reqChannel.tryRecv(request):
|
||||
continue
|
||||
|
||||
## Dispatch and remember the handler's Future (pruning finished ones) so a
|
||||
## later recycle can await or cancel it.
|
||||
pending.keepItIf(not it.finished())
|
||||
pending.add(processRequest(request, ctx))
|
||||
ongoingProcessReq.keepItIf(not it.finished())
|
||||
ongoingProcessReq.add(processRequest(request, ctx))
|
||||
|
||||
let fireRes = ctx.reqReceivedSignal.fireSync()
|
||||
if fireRes.isErr():
|
||||
@ -427,7 +375,7 @@ proc cleanUpResources[T](ctx: ptr FFIContext[T]): Result[void, string] =
|
||||
proc initContextResources*[T](ctx: ptr FFIContext[T]): Result[void, string] =
|
||||
## Initialises all resources inside an already-allocated FFIContext.
|
||||
## On failure every partially-initialised resource is closed; the caller
|
||||
## is responsible for releasing the context (freeShared or ctx.unclaim()).
|
||||
## is responsible for releasing the context.
|
||||
ctx.lock.initLock()
|
||||
|
||||
var success = false
|
||||
@ -536,23 +484,16 @@ proc requestRecycle*[T](
|
||||
return err("requestRecycle: failed to signal the FFI thread in time")
|
||||
return ok()
|
||||
|
||||
proc markReacquired*[T](ctx: ptr FFIContext[T]) =
|
||||
## Re-arms a recycled context when its context is reacquired by createFFIContext:
|
||||
## moves Recycling -> Active (re-opening the gate). The FFI thread's `pending`
|
||||
## seq was already drained and myLib freed by the recycle handler.
|
||||
proc markAsActive*[T](ctx: ptr FFIContext[T]) =
|
||||
ctx.lifecycle.store(CtxLifecycle.Active)
|
||||
|
||||
proc tryClaim*[T](ctx: ptr FFIContext[T]): bool =
|
||||
## Atomically claim this context (false -> true). Returns true if we won it, false
|
||||
## if it was already claimed. Used by createFFIContext to hand out a free context.
|
||||
## Returns true if acquired the contex, false if it was already claimed.
|
||||
var expected = false
|
||||
ctx.inUse.compareExchange(expected, true)
|
||||
|
||||
proc unclaim*[T](ctx: ptr FFIContext[T]) =
|
||||
## Mark the context free for reuse. Called by the recycle handler on the FFI thread
|
||||
## once teardown is done, and on creation failure / full teardown.
|
||||
proc release*[T](ctx: ptr FFIContext[T]) =
|
||||
ctx.inUse.store(false)
|
||||
|
||||
proc isClaimed*[T](ctx: ptr FFIContext[T]): bool =
|
||||
## Whether the context is currently claimed by a consumer.
|
||||
proc isInUse*[T](ctx: ptr FFIContext[T]): bool =
|
||||
ctx.inUse.load()
|
||||
|
||||
@ -4,39 +4,27 @@ import ./ffi_context, ./ffi_types
|
||||
|
||||
const MaxFFIContexts* = 32
|
||||
## Maximum number of concurrently live FFI contexts when using FFIContextPool.
|
||||
## Fds and threads are only consumed for contexts that are actually acquired,
|
||||
## so this value only affects the upfront memory of the pool array.
|
||||
|
||||
type FFIContextPool*[T] = object
|
||||
## Fixed-size pool of FFI contexts. Avoids dynamic heap allocation per context
|
||||
## and bounds the total number of file descriptors consumed by ThreadSignalPtrs
|
||||
## to at most MaxFFIContexts * 2.
|
||||
contexts: array[MaxFFIContexts, FFIContext[T]]
|
||||
initialized: array[MaxFFIContexts, Atomic[bool]]
|
||||
## Whether a context's worker (threads, chronos dispatcher and ThreadSignalPtrs)
|
||||
## has been built. Set on first acquisition and kept set across park/reuse,
|
||||
## so a reacquired context reuses the same fds instead of allocating a fresh set
|
||||
## every create/destroy cycle. Cleared only by full teardown.
|
||||
|
||||
proc createFFIContext*[T](
|
||||
pool: var FFIContextPool[T]
|
||||
): Result[ptr FFIContext[T], string] =
|
||||
## Acquires a context from the fixed pool. The context's worker is built once on
|
||||
## first use and REUSED on every later acquisition of the same context (a context is
|
||||
## made reacquirable by releaseFFIContext, which parks it without tearing the
|
||||
## worker down). This is what keeps fd usage bounded: repeated create/destroy
|
||||
## cycles no longer leak a fresh set of ThreadSignalPtr/dispatcher fds.
|
||||
## first use and reused on every later acquisition.
|
||||
|
||||
for i in 0 ..< MaxFFIContexts:
|
||||
let ctx = pool.contexts[i].addr
|
||||
if not ctx.tryClaim():
|
||||
continue
|
||||
if pool.initialized[i].load():
|
||||
## Reused context: a prior destroy drained and released it, worker still alive.
|
||||
## Re-arm the gate and hand it back.
|
||||
ctx.markReacquired()
|
||||
ctx.markAsActive()
|
||||
return ok(ctx)
|
||||
initContextResources(ctx).isOkOr:
|
||||
ctx.unclaim()
|
||||
ctx.release()
|
||||
return err("createFFIContext: initContextResources failed: " & $error)
|
||||
pool.initialized[i].store(true)
|
||||
return ok(ctx)
|
||||
@ -51,27 +39,22 @@ proc destroyFFIContext*[T](
|
||||
pool: var FFIContextPool[T], ctx: ptr FFIContext[T]
|
||||
): Result[void, string] =
|
||||
## Full teardown: stops/joins the worker threads and returns the context to the
|
||||
## pool, marking it uninitialised so a later createFFIContext rebuilds it. Used
|
||||
## on creation failure and by non-pooling callers; steady-state cleanup should
|
||||
## use releaseFFIContext to keep fd usage bounded. If the FFI thread is blocked
|
||||
## and does not exit in time, the context is leaked rather than reclaimed —
|
||||
## closing its resources while the thread is still live would be unsafe.
|
||||
## pool, marking it uninitialised so a later createFFIContext rebuilds it.
|
||||
ctx.stopAndJoinThreads().isOkOr:
|
||||
return err("destroyFFIContext(pool): " & $error)
|
||||
for i in 0 ..< MaxFFIContexts:
|
||||
if pool.contexts[i].addr == ctx:
|
||||
pool.initialized[i].store(false)
|
||||
break
|
||||
ctx.unclaim()
|
||||
ctx.release()
|
||||
return ok()
|
||||
|
||||
proc isValidCtx*[T](pool: var FFIContextPool[T], ctx: pointer): bool =
|
||||
## Returns true only if ctx points to one of the pool's contexts that is
|
||||
## currently in use. Rejects nil, offset-invalid, and dangling pointers
|
||||
## at the API boundary, preventing use-after-free dereferences.
|
||||
## currently in use.
|
||||
if ctx.isNil():
|
||||
return false
|
||||
for i in 0 ..< MaxFFIContexts:
|
||||
if cast[pointer](pool.contexts[i].addr) == ctx:
|
||||
return cast[ptr FFIContext[T]](ctx).isClaimed()
|
||||
return cast[ptr FFIContext[T]](ctx).isInUse()
|
||||
return false
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user