From d05f7f6441b745efc68e9880f2cae3cdd0a2602f Mon Sep 17 00:00:00 2001 From: Ivan FB Date: Fri, 5 Jun 2026 21:02:31 +0200 Subject: [PATCH] simplify comments and procs --- ffi/ffi_context.nim | 131 +++++++++++---------------------------- ffi/ffi_context_pool.nim | 33 +++------- 2 files changed, 44 insertions(+), 120 deletions(-) diff --git a/ffi/ffi_context.nim b/ffi/ffi_context.nim index d11e295..cc9130f 100644 --- a/ffi/ffi_context.nim +++ b/ffi/ffi_context.nim @@ -13,24 +13,14 @@ type FFICallbackState* = object userData*: pointer type CtxLifecycle {.pure.} = enum - ## Request-acceptance + recycle handshake for a pooled context, held as an - ## Atomic on FFIContext. ("Recycle" = drain the context and return its context to - ## the pool for reuse, keeping the worker alive — unlike destroyFFIContext, which - ## fully tears the threads down.) Invariants: - ## * Requests are accepted ONLY in `Active`; the gate in sendRequestToFFIThread - ## and the watchdog ping both test `== Active`. - ## * Legal transitions, and who performs them: - ## Active -> RecyclePending requestRecycle (caller, under `lock`) - ## RecyclePending -> Recycling the FFI loop (one-shot compareExchange) - ## Recycling -> Active markReacquired (caller, on reuse) - ## (initContextResources starts a context in `Active`.) - ## * The gate stays closed across BOTH RecyclePending and Recycling, so no - ## request can dispatch onto a context being recycled or about to be reused. - ## * Only the FFI loop makes the RecyclePending -> Recycling move, so the - ## recycle runs exactly once per request. - Active ## serving requests - RecyclePending ## recycle requested; FFI loop hasn't claimed it yet - Recycling ## FFI loop claimed it: draining handlers, then freeing + ## State machine guarding a pooled FFI context, held as an Atomic on FFIContext. 
+ ## Transitions: + ## Active -> RecyclePending when requestRecycle is called (e.g. ffiDtor is invoked) + ## RecyclePending -> Recycling when the FFI thread loop claims the recycle, before draining the in-flight requests + ## Recycling -> Active when createFFIContext reacquires the context (markAsActive) + Active ## accepting and serving requests + RecyclePending ## recycle requested; FFI thread loop hasn't claimed it yet + Recycling ## FFI loop draining handlers, then frees lib + returns to pool type FFIContext*[T] = object myLib*: ptr T @@ -54,8 +44,6 @@ type FFIContext*[T] = object callbackState*: FFICallbackState running: Atomic[bool] # To control when the threads are running lifecycle: Atomic[CtxLifecycle] - # Request gate + recycle handshake in one. See CtxLifecycle for the states, - # transitions and invariants. recycleCallback: FFICallBack # The destructor's callback, fired by the recycle handler with the outcome: # RET_OK once drained, RET_ERR if it timed out. Set by requestRecycle. @@ -121,20 +109,14 @@ proc sendRequestToFFIThread*( ctx: ptr FFIContext, ffiRequest: ptr FFIThreadRequest, timeout = InfiniteDuration ): Result[void, string] = ctx.lock.acquire() - # This lock is only necessary while we use a SP Channel and while the signalling - # between threads assumes that there aren't concurrent requests. - # Rearchitecting the signaling + migrating to a MP Channel will allow us to receive - # requests concurrently and spare us the need of locks defer: ctx.lock.release() - ## A recycle closes this gate (under the same lock), so a queued or late sender - ## bails here instead of dispatching onto a context about to be reused. 
if ctx.lifecycle.load() != CtxLifecycle.Active: deleteRequest(ffiRequest) return err("FFI context is not accepting requests (being recycled)") - ## Sending the request + ## Sending the request to the FFI thread let sentOk = ctx.reqChannel.trySend(ffiRequest) if not sentOk: deleteRequest(ffiRequest) @@ -152,12 +134,8 @@ proc sendRequestToFFIThread*( ## wait until the FFI working thread properly received the request let res = ctx.reqReceivedSignal.waitSync(timeout) if res.isErr(): - ## Do not free ffiRequest here: the FFI thread was already signaled and - ## will process (and free) it. return err("Couldn't receive reqReceivedSignal signal") - ## Notice that in case of "ok", the deallocShared(req) is performed by the FFI Thread in the - ## process proc. return ok() type Foo = object @@ -200,8 +178,6 @@ proc watchdogThreadBody(ctx: ptr FFIContext) {.thread.} = break if ctx.lifecycle.load() != CtxLifecycle.Active: - ## Gate closed (being recycled, not yet reused): a ping would just fail - ## and spuriously trip onNotResponding. Skip until reused. continue let callback = proc( @@ -228,17 +204,14 @@ proc processRequest[T]( request: ptr FFIThreadRequest, ctx: ptr FFIContext[T] ) {.async.} = ## Invoked within the FFI thread to process a request coming from the FFI API consumer thread. - ## ffiThreadBody keeps this proc's Future in the run loop's `pending` seq so a - ## destroy can await (or cancel) it before freeing myLib. let reqId = $request[].reqId ## The reqId determines which proc will handle the request. ## The registeredRequests represents a table defined at compile time. ## Then, registeredRequests == Table[reqId, proc-handling-the-request-asynchronously] - ## Explicit conversion keeps `reqId` alive as the backing string, - ## avoiding the implicit string→cstring warning that will become an error. let reqIdCs = reqId.cstring + # keep `reqId` alive and avoid the implicit string→cstring warning. 
let retFut = if not ctx[].registeredRequests[].contains(reqIdCs): @@ -251,35 +224,26 @@ proc processRequest[T]( try: await retFut except CancelledError as exc: - ## Destroy timed out and cancelled us: turn it into an error so handleRes - ## still fires the callback and frees the request. Result[string, string].err("Request cancelled during destroy: " & exc.msg) except AsyncError as exc: Result[string, string].err( "Async error in processRequest for " & reqId & ": " & exc.msg ) - ## handleRes may raise (OOM, GC setup) even though it is rare. Catching here - ## keeps the async proc raises:[] compatible. The defer inside handleRes - ## guarantees request is freed before the exception propagates. + ## handleRes may raise (OOM, GC setup) even though it is rare. try: handleRes(res, request) except Exception as exc: error "Unexpected exception in handleRes", error = exc.msg proc freeLib[T](ctx: ptr FFIContext[T]) {.gcsafe.} = - ## Frees the createShared'd library object in ctx.myLib. Runs on the FFI - ## thread, which is what makes destroying its GC fields safe. if ctx.myLib.isNil(): return + when not defined(gcRefc): - ## orc: shared heap, so run the destructor here. The hook isn't inferred - ## gcsafe but only touches this (owning-thread) object, so assert it. {.cast(gcsafe).}: `=destroy`(ctx.myLib[]) else: - ## refc: `=destroy` here hits the unsafe Selector path (see cleanUpResources). - ## Reclaim only the wrapper; leak the inner fields, like the signal fds. discard freeShared(ctx.myLib) ctx.myLib = nil @@ -290,51 +254,44 @@ var RecycleTimeout* = 1500.milliseconds ## finish, so this only bounds a *stuck* handler. A `var` so tests can shorten it. 
proc recycleContext[T]( - ctx: ptr FFIContext[T], pending: ptr seq[Future[void]] + ctx: ptr FFIContext[T], ongoingProcessReq: ptr seq[Future[void]] ) {.async.} = - ## Recycle handler, on the FFI thread (requestRecycle already closed the gate): - ## drain the in-flight handlers, free the lib object, release the context for reuse, + ## Drain the in-flight handlers, free the lib object, release the context for reuse, ## and fire the callback with the outcome. Never blocks the caller. - ## - ## `pending` is the run loop's seq of handler Futures, by ptr (async procs can't - ## take `var`); holding the instances lets us await and cancel them. - pending[].keepItIf(not it.finished()) + + ongoingProcessReq[].keepItIf(not it.finished()) ## 1. Let the in-flight handlers finish on their own, bounded by RecycleTimeout. - var naturallyDrained = pending[].len == 0 + var naturallyDrained = ongoingProcessReq[].len == 0 if not naturallyDrained: - naturallyDrained = await allFutures(pending[]).withTimeout(RecycleTimeout) + naturallyDrained = await allFutures(ongoingProcessReq[]).withTimeout(RecycleTimeout) ## 2. If any are wedged, cancel them and give the cancellations a bounded moment ## to unwind, so the context can be reclaimed rather than leaked. var safeToRecycle = naturallyDrained if not naturallyDrained: - for fut in pending[]: + for fut in ongoingProcessReq[]: if not fut.finished(): fut.cancelSoon() - safeToRecycle = await allFutures(pending[]).withTimeout(RecycleTimeout) + safeToRecycle = await allFutures(ongoingProcessReq[]).withTimeout(RecycleTimeout) let cb = ctx.recycleCallback let ud = ctx.recycleUserData ctx.recycleCallback = nil if safeToRecycle: - ## Nothing can touch the context now. Free the lib here, then release the context - ## BEFORE the callback (the atomic store publishes these writes to whoever - ## reclaims it) so a caller reacquiring on the callback finds it already free. 
freeLib(ctx) ctx.callbackState = default(FFICallbackState) - pending[].setLen(0) - ctx.unclaim() + ongoingProcessReq[].setLen(0) + ctx.release() if not cb.isNil(): foreignThreadGc: - ## Never hand the callback nil: empty string on success, reason on timeout. - ## An empty string's cstring still points at a '\0', so msg[0] is a safe - ## address with len 0. let msg = - if naturallyDrained: "" - else: "recycle: in-flight requests did not finish in time" + if naturallyDrained: + "" + else: + "recycle: in-flight requests did not finish in time" let cmsg = msg.cstring let retCode = if naturallyDrained: RET_OK else: RET_ERR cb(retCode, unsafeAddr cmsg[0], cast[csize_t](msg.len), ud) @@ -346,25 +303,18 @@ proc ffiThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} = logging.setupLog(logging.LogLevel.DEBUG, logging.LogFormat.TEXT) defer: - # Signal destroyFFIContext that this thread has exited, so its bounded - # wait can unblock and proceed with cleanup. let fireRes = ctx.threadExitSignal.fireSync() if fireRes.isErr(): error "failed to fire threadExitSignal on FFI thread exit", err = fireRes.error let ffiRun = proc(ctx: ptr FFIContext[T]) {.async.} = - ## Handler Futures live here on the FFI thread's heap, NOT on FFIContext - ## (shared pool memory, must stay GC-free); holding them lets destroy await - ## and cancel them. - var pending: seq[Future[void]] + + var ongoingProcessReq: seq[Future[void]] while ctx.running.load(): - ## Recycle requested: claim it (RecyclePending -> Recycling, one-shot) and run - ## the recycle on this owning thread, then keep looping so the worker stays - ## alive for the context's next reuse. 
var expected = CtxLifecycle.RecyclePending if ctx.lifecycle.compareExchange(expected, CtxLifecycle.Recycling): - await recycleContext(ctx, addr pending) + await recycleContext(ctx, addr ongoingProcessReq) continue let gotSignal = await ctx.reqSignal.wait().withTimeout(100.milliseconds) @@ -376,10 +326,8 @@ proc ffiThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} = if not ctx.reqChannel.tryRecv(request): continue - ## Dispatch and remember the handler's Future (pruning finished ones) so a - ## later recycle can await or cancel it. - pending.keepItIf(not it.finished()) - pending.add(processRequest(request, ctx)) + ongoingProcessReq.keepItIf(not it.finished()) + ongoingProcessReq.add(processRequest(request, ctx)) let fireRes = ctx.reqReceivedSignal.fireSync() if fireRes.isErr(): @@ -427,7 +375,7 @@ proc cleanUpResources[T](ctx: ptr FFIContext[T]): Result[void, string] = proc initContextResources*[T](ctx: ptr FFIContext[T]): Result[void, string] = ## Initialises all resources inside an already-allocated FFIContext. ## On failure every partially-initialised resource is closed; the caller - ## is responsible for releasing the context (freeShared or ctx.unclaim()). + ## is responsible for releasing the context. ctx.lock.initLock() var success = false @@ -536,23 +484,16 @@ proc requestRecycle*[T]( return err("requestRecycle: failed to signal the FFI thread in time") return ok() -proc markReacquired*[T](ctx: ptr FFIContext[T]) = - ## Re-arms a recycled context when its context is reacquired by createFFIContext: - ## moves Recycling -> Active (re-opening the gate). The FFI thread's `pending` - ## seq was already drained and myLib freed by the recycle handler. +proc markAsActive*[T](ctx: ptr FFIContext[T]) = ctx.lifecycle.store(CtxLifecycle.Active) proc tryClaim*[T](ctx: ptr FFIContext[T]): bool = - ## Atomically claim this context (false -> true). Returns true if we won it, false - ## if it was already claimed. Used by createFFIContext to hand out a free context. 
+ ## Returns true if this call claimed the context, false if it was already claimed. var expected = false ctx.inUse.compareExchange(expected, true) -proc unclaim*[T](ctx: ptr FFIContext[T]) = - ## Mark the context free for reuse. Called by the recycle handler on the FFI thread - ## once teardown is done, and on creation failure / full teardown. +proc release*[T](ctx: ptr FFIContext[T]) = ctx.inUse.store(false) -proc isClaimed*[T](ctx: ptr FFIContext[T]): bool = - ## Whether the context is currently claimed by a consumer. +proc isInUse*[T](ctx: ptr FFIContext[T]): bool = ctx.inUse.load() diff --git a/ffi/ffi_context_pool.nim b/ffi/ffi_context_pool.nim index 112f75d..421a636 100644 --- a/ffi/ffi_context_pool.nim +++ b/ffi/ffi_context_pool.nim @@ -4,39 +4,27 @@ import ./ffi_context, ./ffi_types const MaxFFIContexts* = 32 ## Maximum number of concurrently live FFI contexts when using FFIContextPool. - ## Fds and threads are only consumed for contexts that are actually acquired, - ## so this value only affects the upfront memory of the pool array. type FFIContextPool*[T] = object - ## Fixed-size pool of FFI contexts. Avoids dynamic heap allocation per context - ## and bounds the total number of file descriptors consumed by ThreadSignalPtrs - ## to at most MaxFFIContexts * 2. contexts: array[MaxFFIContexts, FFIContext[T]] initialized: array[MaxFFIContexts, Atomic[bool]] - ## Whether a context's worker (threads, chronos dispatcher and ThreadSignalPtrs) - ## has been built. Set on first acquisition and kept set across park/reuse, - ## so a reacquired context reuses the same fds instead of allocating a fresh set - ## every create/destroy cycle. Cleared only by full teardown. proc createFFIContext*[T]( pool: var FFIContextPool[T] ): Result[ptr FFIContext[T], string] = ## Acquires a context from the fixed pool. 
The context's worker is built once on - ## first use and REUSED on every later acquisition of the same context (a context is - ## made reacquirable by releaseFFIContext, which parks it without tearing the - ## worker down). This is what keeps fd usage bounded: repeated create/destroy - ## cycles no longer leak a fresh set of ThreadSignalPtr/dispatcher fds. + ## first use and reused on every later acquisition. + for i in 0 ..< MaxFFIContexts: let ctx = pool.contexts[i].addr if not ctx.tryClaim(): continue if pool.initialized[i].load(): ## Reused context: a prior destroy drained and released it, worker still alive. - ## Re-arm the gate and hand it back. - ctx.markReacquired() + ctx.markAsActive() return ok(ctx) initContextResources(ctx).isOkOr: - ctx.unclaim() + ctx.release() return err("createFFIContext: initContextResources failed: " & $error) pool.initialized[i].store(true) return ok(ctx) @@ -51,27 +39,22 @@ proc destroyFFIContext*[T]( pool: var FFIContextPool[T], ctx: ptr FFIContext[T] ): Result[void, string] = ## Full teardown: stops/joins the worker threads and returns the context to the - ## pool, marking it uninitialised so a later createFFIContext rebuilds it. Used - ## on creation failure and by non-pooling callers; steady-state cleanup should - ## use releaseFFIContext to keep fd usage bounded. If the FFI thread is blocked - ## and does not exit in time, the context is leaked rather than reclaimed — - ## closing its resources while the thread is still live would be unsafe. + ## pool, marking it uninitialised so a later createFFIContext rebuilds it. ctx.stopAndJoinThreads().isOkOr: return err("destroyFFIContext(pool): " & $error) for i in 0 ..< MaxFFIContexts: if pool.contexts[i].addr == ctx: pool.initialized[i].store(false) break - ctx.unclaim() + ctx.release() return ok() proc isValidCtx*[T](pool: var FFIContextPool[T], ctx: pointer): bool = ## Returns true only if ctx points to one of the pool's contexts that is - ## currently in use. 
Rejects nil, offset-invalid, and dangling pointers - ## at the API boundary, preventing use-after-free dereferences. + ## currently in use. if ctx.isNil(): return false for i in 0 ..< MaxFFIContexts: if cast[pointer](pool.contexts[i].addr) == ctx: - return cast[ptr FFIContext[T]](ctx).isClaimed() + return cast[ptr FFIContext[T]](ctx).isInUse() return false