diff --git a/ffi/event_thread.nim b/ffi/event_thread.nim new file mode 100644 index 0000000..cab1f2c --- /dev/null +++ b/ffi/event_thread.nim @@ -0,0 +1,139 @@ +## Event-thread body and FFI-thread liveness monitoring. +## +## Included from `ffi_context.nim` — inherits its imports, FFIContext type, +## and the heartbeat-timing constants. Lives alongside `ffi_thread.nim` +## so each thread's machinery is readable on its own. +## +## Responsibilities: +## - Drain queued events into listener callbacks. +## - Watch `ctx.ffiHeartbeat` and emit `NotRespondingEvent` / `RespondingEvent` +## on FFI-thread stall and recovery transitions. + +type + NotRespondingEvent* = object + RespondingEvent* = object + +const + NotRespondingEventName* = "not_responding" + RespondingEventName* = "responding" + +proc dispatchToListeners[T]( + ctx: ptr FFIContext[T], eventName: string, data: pointer, dataLen: int +) = + ## Holds reg.lock for the entire snapshot + invocation so concurrent + ## add/remove on this registry blocks until dispatch returns. + withLock ctx[].eventRegistry.lock: + let listeners = ctx[].eventRegistry.byEvent.getOrDefault(eventName) + if listeners.len == 0: + chronicles.debug "no listener registered", event = eventName + return + foreignThreadGc: + try: + notifyListeners(listeners, RET_OK, data, dataLen) + except Exception, CatchableError: + notifyListenersErr( + listeners, + "Exception dispatching " & eventName & ": " & getCurrentExceptionMsg(), + ) + +proc emitLivenessEvent[T, P](ctx: ptr FFIContext[T], name: string, payload: P) = + ## Encodes a zero-field liveness event (`NotRespondingEvent`, + ## `RespondingEvent`) and dispatches it directly to listeners, bypassing + ## the event queue (which may itself be wedged). Runs on the event thread. + let event = + try: + EventEnvelope[P](eventType: name, payload: payload).cborEncode() + except CatchableError as exc: + chronicles.error "liveness event encode failed", name = name, err = exc.msg + return + let dataPtr: pointer = + if event.len > 0: unsafeAddr event[0] else: nil + ctx.dispatchToListeners(name, dataPtr, event.len) + +proc onNotResponding*(ctx: ptr FFIContext) = + emitLivenessEvent(ctx, NotRespondingEventName, NotRespondingEvent()) + +proc onResponding*(ctx: ptr FFIContext) = + ## Fired once when the FFI thread's heartbeat starts advancing again + ## after a `NotRespondingEvent`. Lets consumers clear any "library + ## hung" UI state without polling. + emitLivenessEvent(ctx, RespondingEventName, RespondingEvent()) + +proc dispatchQueuedEvent[T](ctx: ptr FFIContext[T], qe: QueuedEvent) = + ## Frees `qe`'s c_malloc buffers on exit. + defer: + freeEventBuffers(qe.name, qe.data) + ctx.dispatchToListeners($qe.name, qe.data, qe.dataLen) + +proc drainEventQueue[T](ctx: ptr FFIContext[T]) = + while true: + let opt = ctx.eventQueue.tryDequeueEvent() + if opt.isNone(): + break + ctx.dispatchQueuedEvent(opt.get()) + +type HeartbeatMonitor = object + startedAt: Moment + lastChange: Moment + lastValue: int64 + notifiedStale: bool + +proc init(T: type HeartbeatMonitor, ctx: ptr FFIContext): T = + let now = Moment.now() + T( + startedAt: now, + lastChange: now, + lastValue: ctx.ffiHeartbeat.load(), + notifiedStale: false, + ) + +proc check[T](hb: var HeartbeatMonitor, ctx: ptr FFIContext[T]) = + ## Fires `onNotResponding` once the FFI thread's heartbeat counter stops + ## advancing past the stale threshold, and fires `onResponding` once it + ## starts advancing again. Both transitions latch so each is emitted at + ## most once per stall episode. + if Moment.now() - hb.startedAt <= FFIHeartbeatStartDelay: + return + let cur = ctx.ffiHeartbeat.load() + if cur != hb.lastValue: + if hb.notifiedStale: + onResponding(ctx) + hb.lastValue = cur + hb.lastChange = Moment.now() + hb.notifiedStale = false + elif not hb.notifiedStale and + Moment.now() - hb.lastChange > FFIHeartbeatStaleThreshold: + onNotResponding(ctx) + hb.notifiedStale = true + +proc eventRun[T](ctx: ptr FFIContext[T]) {.async.} = + var hb = HeartbeatMonitor.init(ctx) + var notifiedStuck = false + + while ctx.running.load(): + # Wake on enqueue or tick — whichever first. + discard await ctx.eventQueueSignal.wait().withTimeout(EventThreadTickInterval) + + ctx.drainEventQueue() + + # Fire after drain so reg.lock is free — FFI-thread would deadlock here. + if not notifiedStuck and ctx.eventQueueStuck.load(): + onNotResponding(ctx) + notifiedStuck = true + + if not ctx.running.load(): + break + hb.check(ctx) + +proc eventThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} = + ## Drains the event queue and runs the FFI-thread heartbeat check. + ## Owns the queued `c_malloc` payloads until dispatch returns. + defer: + let fireRes = ctx.eventThreadExitSignal.fireSync() + if fireRes.isErr(): + error "failed to fire eventThreadExitSignal", err = fireRes.error + + try: + waitFor eventRun(ctx) + except CatchableError as exc: + error "event thread exited with exception", error = exc.msg diff --git a/ffi/ffi_context.nim b/ffi/ffi_context.nim index 0944af2..bbc512b 100644 --- a/ffi/ffi_context.nim +++ b/ffi/ffi_context.nim @@ -1,3 +1,9 @@ +## FFIContext type plus lifecycle (init / signal-stop / join / destroy). +## +## The per-thread bodies live in `ffi_thread.nim` and `event_thread.nim`, +## included below so the thread code can access the private FFIContext +## fields without forcing them through a public surface. + {.passc: "-fPIC".} import std/[atomics, locks, options, tables] @@ -36,267 +42,8 @@ const FFIHeartbeatStartDelay* = 10.seconds # grace window for library startup FFIHeartbeatStaleThreshold* = 1.seconds -type NotRespondingEvent* = object - -const NotRespondingEventName* = "not_responding" - -proc encodeNotRespondingEvent(): seq[byte] = - EventEnvelope[NotRespondingEvent]( - eventType: NotRespondingEventName, payload: NotRespondingEvent() - ).cborEncode() - -proc dispatchToListeners[T]( - ctx: ptr FFIContext[T], eventName: string, data: pointer, dataLen: int -) = - ## Lock held across the whole fan-out so foreign add/remove blocks - ## until dispatch returns (UAF-close contract from PR #39). - withLock ctx[].eventRegistry.lock: - let listeners = ctx[].eventRegistry.byEvent.getOrDefault(eventName) - if listeners.len == 0: - chronicles.debug "no listener registered", event = eventName - return - foreignThreadGc: - try: - notifyListeners(listeners, RET_OK, data, dataLen) - except Exception, CatchableError: - notifyListenersErr( - listeners, - "Exception dispatching " & eventName & ": " & getCurrentExceptionMsg(), - ) - -proc onNotResponding*(ctx: ptr FFIContext) = - ## Bypasses the (possibly wedged) event queue; runs on the event thread. - let event = - try: - encodeNotRespondingEvent() - except CatchableError as e: - chronicles.error "onNotResponding - encode failed", err = e.msg - return - let dataPtr: pointer = - if event.len > 0: unsafeAddr event[0] else: nil - ctx.dispatchToListeners(NotRespondingEventName, dataPtr, event.len) - -proc sendRequestToFFIThread*( - ctx: ptr FFIContext, ffiRequest: ptr FFIThreadRequest, timeout = InfiniteDuration -): Result[void, string] = - if ctx.eventQueueStuck.load(): - deleteRequest(ffiRequest) - return err("event queue stuck - library cannot accept new requests") - - if onFFIThread: - # Re-entrant dispatch from a handler would self-deadlock on `reqReceivedSignal`. - deleteRequest(ffiRequest) - return err( - "reentrant ffi call: a handler invoked sendRequestToFFIThread on its own context" - ) - - # `reqChannel` is single-producer and `reqReceivedSignal` shared; serialise - # the full trySend + fireSync + waitSync. PR #23 review item 7 tracks SP→MP. - ctx.lock.acquire() - defer: - ctx.lock.release() - - let sentOk = ctx.reqChannel.trySend(ffiRequest) - if not sentOk: - deleteRequest(ffiRequest) - return err("Couldn't send a request to the ffi thread") - - let fireSyncRes = ctx.reqSignal.fireSync() - if fireSyncRes.isErr(): - deleteRequest(ffiRequest) - return err("failed fireSync: " & $fireSyncRes.error) - - if fireSyncRes.get() == false: - deleteRequest(ffiRequest) - return err("Couldn't fireSync in time") - - let res = ctx.reqReceivedSignal.waitSync(timeout) - if res.isErr(): - # FFI thread was already signaled; it owns ffiRequest now. - return err("Couldn't receive reqReceivedSignal signal") - - ok() - -proc processRequest[T]( - request: ptr FFIThreadRequest, ctx: ptr FFIContext[T] -) {.async.} = - let reqId = $request[].reqId - # Keep `reqId` alive as backing for the cstring view. - let reqIdCs = reqId.cstring - - let retFut = - if not ctx[].registeredRequests[].contains(reqIdCs): - nilProcess(request[].reqId) - else: - ctx[].registeredRequests[][reqIdCs](cast[pointer](request), ctx) - - # Catch all (incl. CancelledError from the shutdown drain) so handleRes — - # and its `deleteRequest` defer — always runs. - let res = - try: - await retFut - except CatchableError as e: - Result[seq[byte], string].err( - "Error in processRequest for " & reqId & ": " & e.msg - ) - - # handleRes may raise (rare: OOM, GC setup); keep `raises: []`. - try: - handleRes(res, request) - except Exception as e: - error "Unexpected exception in handleRes", error = e.msg - -var ffiEventQueueSignalPtr {.threadvar.}: ThreadSignalPtr - # Stashed so the hook has no closure env. - -proc ffiNotifyEventEnqueuedHook() {.gcsafe, raises: [].} = - if not ffiEventQueueSignalPtr.isNil(): - let res = ffiEventQueueSignalPtr.fireSync() - if res.isErr(): - error "failed to fire eventQueueSignal after enqueue", err = res.error - -proc ffiThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} = - ffiCurrentEventRegistry = addr ctx[].eventRegistry - ffiCurrentEventQueue = addr ctx[].eventQueue - ffiCurrentEventQueueStuck = addr ctx[].eventQueueStuck - ffiEventQueueSignalPtr = ctx.eventQueueSignal - ffiCurrentNotifyEventEnqueued = ffiNotifyEventEnqueuedHook - onFFIThread = true - - logging.setupLog(logging.LogLevel.DEBUG, logging.LogFormat.TEXT) - - defer: - onFFIThread = false - # Unblocks destroyFFIContext's bounded wait. - let fireRes = ctx.threadExitSignal.fireSync() - if fireRes.isErr(): - error "failed to fire threadExitSignal on FFI thread exit", err = fireRes.error - - let ffiRun = proc(ctx: ptr FFIContext[T]) {.async.} = - var ffiReqHandler: T - - # Track in-flight handlers so shutdown can drain them — otherwise - # abandoned futures leak request envelope/reqId/payload. - var pending: seq[Future[void]] = @[] - - proc reapCompleted() = - var i = 0 - while i < pending.len: - if pending[i].finished(): - pending.del(i) - else: - inc i - - while ctx.running.load(): - # Freezes if a sync handler blocks; event thread reads for liveness. - discard ctx.ffiHeartbeat.fetchAdd(1) - - reapCompleted() - - let gotSignal = await ctx.reqSignal.wait().withTimeout(100.milliseconds) - if not gotSignal: - continue - - var request: ptr FFIThreadRequest - if not ctx.reqChannel.tryRecv(request): - continue - - if ctx.myLib.isNil(): - ctx.myLib = addr ffiReqHandler - - pending.add processRequest(request, ctx) - - let fireRes = ctx.reqReceivedSignal.fireSync() - if fireRes.isErr(): - error "could not fireSync back to requester thread", error = fireRes.error - - # Drain in-flight handlers so each request's `deleteRequest` runs. - reapCompleted() - if pending.len > 0: - try: - await allFutures(pending) - except CatchableError as e: - error "draining pending FFI requests on shutdown raised", error = e.msg - - waitFor ffiRun(ctx) - -proc dispatchQueuedEvent[T](ctx: ptr FFIContext[T], qe: QueuedEvent) = - defer: - freeEventBuffers(qe.name, qe.data) - ctx.dispatchToListeners($qe.name, qe.data, qe.dataLen) - -proc drainEventQueue[T](ctx: ptr FFIContext[T]) = - while true: - let opt = ctx.eventQueue.tryDequeueEvent() - if opt.isNone(): - break - ctx.dispatchQueuedEvent(opt.get()) - -type HeartbeatMonitor = object - startedAt: Moment - lastChange: Moment - lastValue: int64 - notifiedStale: bool - -proc init(T: type HeartbeatMonitor, ctx: ptr FFIContext): T = - let now = Moment.now() - T( - startedAt: now, - lastChange: now, - lastValue: ctx.ffiHeartbeat.load(), - notifiedStale: false, - ) - -proc check[T](hb: var HeartbeatMonitor, ctx: ptr FFIContext[T]) = - ## Fires onNotResponding once the heartbeat stalls past the threshold; - ## latches until it moves again. - if Moment.now() - hb.startedAt <= FFIHeartbeatStartDelay: - return - - let cur = ctx.ffiHeartbeat.load() - if cur != hb.lastValue: - hb.lastValue = cur - hb.lastChange = Moment.now() - hb.notifiedStale = false - return - - if hb.notifiedStale: - return - if Moment.now() - hb.lastChange <= FFIHeartbeatStaleThreshold: - return - onNotResponding(ctx) - hb.notifiedStale = true - -proc eventRun[T](ctx: ptr FFIContext[T]) {.async.} = - var hb = HeartbeatMonitor.init(ctx) - var notifiedStuck = false - - while ctx.running.load(): - # Wake on enqueue or tick — whichever first. - discard await ctx.eventQueueSignal.wait().withTimeout(EventThreadTickInterval) - - ctx.drainEventQueue() - - # Fire after drain so reg.lock is free — FFI-thread would deadlock here. - if not notifiedStuck and ctx.eventQueueStuck.load(): - onNotResponding(ctx) - notifiedStuck = true - - if not ctx.running.load(): - break - hb.check(ctx) - -proc eventThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} = - ## Owns queued `c_malloc` payloads until dispatch returns. - defer: - let fireRes = ctx.eventThreadExitSignal.fireSync() - if fireRes.isErr(): - error "failed to fire eventThreadExitSignal", err = fireRes.error - - try: - waitFor eventRun(ctx) - except CatchableError as e: - error "event thread exited with exception", error = e.msg +include ./event_thread +include ./ffi_thread template closeAndNil(field: untyped) = if not field.isNil(): diff --git a/ffi/ffi_thread.nim b/ffi/ffi_thread.nim new file mode 100644 index 0000000..1de1643 --- /dev/null +++ b/ffi/ffi_thread.nim @@ -0,0 +1,183 @@ +## FFI-thread body and request submission API. +## +## Included from `ffi_context.nim` — inherits its imports, FFIContext type, +## and the `onFFIThread` threadvar. Companion to `event_thread.nim`. +## +## Responsibilities: +## - Receive `FFIThreadRequest`s from foreign threads via `reqChannel` and +## dispatch them through the user-registered handler table. +## - Advance `ctx.ffiHeartbeat` each loop iteration so the event thread can +## detect a wedged FFI thread. + +proc sendRequestToFFIThread*( + ctx: ptr FFIContext, ffiRequest: ptr FFIThreadRequest, timeout = InfiniteDuration +): Result[void, string] = + if ctx.eventQueueStuck.load(): + deleteRequest(ffiRequest) + return err("event queue stuck - library cannot accept new requests") + + if onFFIThread: + # Re-entrant dispatch from a handler would self-deadlock on `reqReceivedSignal`. + deleteRequest(ffiRequest) + return err( + "reentrant ffi call: a handler invoked sendRequestToFFIThread on its own context" + ) + + # All async submissions serialise on `ctx.lock` for the full + # trySend + fireSync + waitSync sequence because `reqChannel` is + # single-producer and `reqReceivedSignal` is shared across callers. + # Multi-producer redesign is tracked as PR #23 review item 7. + ctx.lock.acquire() + defer: + ctx.lock.release() + + ## Sending the request + let sentOk = ctx.reqChannel.trySend(ffiRequest) + if not sentOk: + deleteRequest(ffiRequest) + return err("Couldn't send a request to the ffi thread") + + let fireSyncRes = ctx.reqSignal.fireSync() + if fireSyncRes.isErr(): + deleteRequest(ffiRequest) + return err("failed fireSync: " & $fireSyncRes.error) + + if fireSyncRes.get() == false: + deleteRequest(ffiRequest) + return err("Couldn't fireSync in time") + + ## wait until the FFI working thread properly received the request + let res = ctx.reqReceivedSignal.waitSync(timeout) + if res.isErr(): + ## Do not free ffiRequest here: the FFI thread was already signaled and + ## will process (and free) it. + return err("Couldn't receive reqReceivedSignal signal") + + ## Notice that in case of "ok", the deallocShared(req) is performed by the FFI Thread in the + ## process proc. + ok() + +proc processRequest[T]( + request: ptr FFIThreadRequest, ctx: ptr FFIContext[T] +) {.async.} = + ## Invoked within the FFI thread to process a request coming from the FFI API consumer thread. + + let reqId = $request[].reqId + ## The reqId determines which proc will handle the request. + ## The registeredRequests represents a table defined at compile time. + ## Then, registeredRequests == Table[reqId, proc-handling-the-request-asynchronously] + + ## Explicit conversion keeps `reqId` alive as the backing string, + ## avoiding the implicit string→cstring warning that will become an error. + let reqIdCs = reqId.cstring + + let retFut = + if not ctx[].registeredRequests[].contains(reqIdCs): + ## That shouldn't happen because only registered requests should be sent to the FFI thread. + nilProcess(request[].reqId) + else: + ctx[].registeredRequests[][reqIdCs](cast[pointer](request), ctx) + + ## Catch every catchable exception (including CancelledError raised by + ## the shutdown drain in ffiRun) so handleRes — and its `deleteRequest` + ## defer — always runs. Otherwise an abandoned in-flight handler would + ## leak its request envelope, reqId copy, and CBOR payload. + let res = + try: + await retFut + except CatchableError as exc: + Result[seq[byte], string].err( + "Error in processRequest for " & reqId & ": " & exc.msg + ) + + ## handleRes may raise (OOM, GC setup) even though it is rare. Catching here + ## keeps the async proc raises:[] compatible. The defer inside handleRes + ## guarantees request is freed before the exception propagates. + try: + handleRes(res, request) + except Exception as exc: + error "Unexpected exception in handleRes", error = exc.msg + +var ffiEventQueueSignalPtr {.threadvar.}: ThreadSignalPtr + # Stashed so the hook has no closure env. + +proc ffiNotifyEventEnqueuedHook() {.gcsafe, raises: [].} = + if not ffiEventQueueSignalPtr.isNil(): + let res = ffiEventQueueSignalPtr.fireSync() + if res.isErr(): + error "failed to fire eventQueueSignal after enqueue", err = res.error + +proc ffiThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} = + ## FFI thread body that attends library user API requests + ffiCurrentEventRegistry = addr ctx[].eventRegistry + ffiCurrentEventQueue = addr ctx[].eventQueue + ffiCurrentEventQueueStuck = addr ctx[].eventQueueStuck + ffiEventQueueSignalPtr = ctx.eventQueueSignal + ffiCurrentNotifyEventEnqueued = ffiNotifyEventEnqueuedHook + onFFIThread = true + + logging.setupLog(logging.LogLevel.DEBUG, logging.LogFormat.TEXT) + + defer: + onFFIThread = false + # Unblocks destroyFFIContext's bounded wait so cleanup can proceed. + let fireRes = ctx.threadExitSignal.fireSync() + if fireRes.isErr(): + error "failed to fire threadExitSignal on FFI thread exit", err = fireRes.error + + let ffiRun = proc(ctx: ptr FFIContext[T]) {.async.} = + var ffiReqHandler: T + ## Holds the main library object, i.e., in charge of handling the ffi requests. + ## e.g., Waku, LibP2P, SDS, etc. + + ## In-flight processRequest futures. Tracked so they can be drained on + ## shutdown — otherwise destroying the context while a handler is + ## awaiting (e.g. sleepAsync) abandons the future and leaks the + ## request's envelope/reqId/payload allocations. + var pending: seq[Future[void]] = @[] + + proc reapCompleted() = + var i = 0 + while i < pending.len: + if pending[i].finished(): + pending.del(i) + else: + inc i + + while ctx.running.load(): + # Freezes if a sync handler blocks the dispatcher; event thread reads to detect wedged FFI thread. + discard ctx.ffiHeartbeat.fetchAdd(1) + + reapCompleted() + + let gotSignal = await ctx.reqSignal.wait().withTimeout(100.milliseconds) + if not gotSignal: + continue + + ## Wait for a request from the ffi consumer thread + var request: ptr FFIThreadRequest + if not ctx.reqChannel.tryRecv(request): + continue + + if ctx.myLib.isNil(): + ctx.myLib = addr ffiReqHandler + + ## Handle the request + pending.add processRequest(request, ctx) + + let fireRes = ctx.reqReceivedSignal.fireSync() + if fireRes.isErr(): + error "could not fireSync back to requester thread", error = fireRes.error + + ## Drain in-flight handlers so each request's `deleteRequest` runs + ## before we exit. Without this, abandoning a future mid-await would + ## leak the request allocations (visible to LSan; previously hidden + ## because Nim's pool allocator kept the chunks alive in the process). + reapCompleted() + if pending.len > 0: + try: + await allFutures(pending) + except CatchableError as exc: + error "draining pending FFI requests on shutdown raised", error = exc.msg + + waitFor ffiRun(ctx)