From be832c5175f2734e5c72ce88ef695da1e98ccdad Mon Sep 17 00:00:00 2001 From: Gabriel Cruz Date: Fri, 5 Jun 2026 14:26:33 -0300 Subject: [PATCH] chore: implement optional improvements --- ffi/event_thread.nim | 137 +++++++++++++++++++++ ffi/ffi_context.nim | 277 ++----------------------------------------- ffi/ffi_thread.nim | 166 ++++++++++++++++++++++++++ 3 files changed, 311 insertions(+), 269 deletions(-) create mode 100644 ffi/event_thread.nim create mode 100644 ffi/ffi_thread.nim diff --git a/ffi/event_thread.nim b/ffi/event_thread.nim new file mode 100644 index 0000000..92caa56 --- /dev/null +++ b/ffi/event_thread.nim @@ -0,0 +1,137 @@ +## Event-thread body and FFI-thread liveness monitoring. +## +## Included from `ffi_context.nim` — inherits its imports, FFIContext type, +## and the heartbeat-timing constants. Lives alongside `ffi_thread.nim` +## so each thread's machinery is readable on its own. +## +## Responsibilities: +## - Drain queued events into listener callbacks (queue producer lands in PR #69). +## - Watch `ctx.ffiHeartbeat` and emit `NotRespondingEvent` / `RespondingEvent` +## on FFI-thread stall and recovery transitions. + +type + NotRespondingEvent* = object + RespondingEvent* = object + +const + NotRespondingEventName* = "not_responding" + RespondingEventName* = "responding" + +proc dispatchToListeners[T]( + ctx: ptr FFIContext[T], eventName: string, data: pointer, dataLen: int +) = + ## Holds reg.lock for the entire snapshot + invocation so concurrent + ## add/remove on this registry blocks until dispatch returns. + withLock ctx[].eventRegistry.lock: + let listeners = ctx[].eventRegistry.byEvent.getOrDefault(eventName) + if listeners.len == 0: + chronicles.debug "no listener registered", event = eventName + return + foreignThreadGc: + try: + notifyListeners(listeners, RET_OK, data, dataLen) + except Exception, CatchableError: + notifyListenersErr( + listeners, + "Exception dispatching " & eventName & ": " & getCurrentExceptionMsg(), + ) + +proc emitLivenessEvent[T, P](ctx: ptr FFIContext[T], name: string, payload: P) = + ## Encodes a zero-field liveness event (`NotRespondingEvent`, + ## `RespondingEvent`) and dispatches it directly to listeners, bypassing + ## the event queue (which may itself be wedged). Runs on the event thread. + let event = + try: + EventEnvelope[P](eventType: name, payload: payload).cborEncode() + except CatchableError as exc: + chronicles.error "liveness event encode failed", name = name, err = exc.msg + return + let dataPtr: pointer = + if event.len > 0: unsafeAddr event[0] else: nil + ctx.dispatchToListeners(name, dataPtr, event.len) + +proc onNotResponding*(ctx: ptr FFIContext) = + emitLivenessEvent(ctx, NotRespondingEventName, NotRespondingEvent()) + +proc onResponding*(ctx: ptr FFIContext) = + ## Fired once when the FFI thread's heartbeat starts advancing again + ## after a `NotRespondingEvent`. Lets consumers clear any "library + ## hung" UI state without polling. + emitLivenessEvent(ctx, RespondingEventName, RespondingEvent()) + +proc dispatchQueuedEvent[T](ctx: ptr FFIContext[T], qe: QueuedEvent) = + ## Frees `qe`'s c_malloc buffers on exit. + defer: + if not qe.name.isNil(): + c_free(cast[pointer](qe.name)) + if not qe.data.isNil(): + c_free(qe.data) + ctx.dispatchToListeners($qe.name, qe.data, qe.dataLen) + +proc drainEventQueue[T](ctx: ptr FFIContext[T]) = + while true: + let opt = ctx.eventQueue.tryDequeueEvent() + if opt.isNone(): + break + ctx.dispatchQueuedEvent(opt.get()) + +type HeartbeatMonitor = object + startedAt: Moment + lastChange: Moment + lastValue: int64 + notifiedStale: bool + +proc init(T: type HeartbeatMonitor, ctx: ptr FFIContext): T = + let now = Moment.now() + T( + startedAt: now, + lastChange: now, + lastValue: ctx.ffiHeartbeat.load(), + notifiedStale: false, + ) + +proc check[T](hb: var HeartbeatMonitor, ctx: ptr FFIContext[T]) = + ## Fires `onNotResponding` once the FFI thread's heartbeat counter stops + ## advancing past the stale threshold, and fires `onResponding` once it + ## starts advancing again. Both transitions latch so each is emitted at + ## most once per stall episode. + if Moment.now() - hb.startedAt <= FFIHeartbeatStartDelay: + return + let cur = ctx.ffiHeartbeat.load() + if cur != hb.lastValue: + if hb.notifiedStale: + onResponding(ctx) + hb.lastValue = cur + hb.lastChange = Moment.now() + hb.notifiedStale = false + elif not hb.notifiedStale and + Moment.now() - hb.lastChange > FFIHeartbeatStaleThreshold: + onNotResponding(ctx) + hb.notifiedStale = true + +proc eventRun[T](ctx: ptr FFIContext[T]) {.async.} = + var hb = HeartbeatMonitor.init(ctx) + + while ctx.running.load(): + # Wake on enqueue or tick — whichever first. The enqueue path lands in PR #69; + # until then the wait always times out and we fall through to the heartbeat check. + discard await ctx.eventQueueSignal.wait().withTimeout(EventThreadTickInterval) + + ctx.drainEventQueue() + + if not ctx.running.load(): + break + hb.check(ctx) + +proc eventThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} = + ## Drains the event queue and runs the FFI-thread heartbeat check. + ## Owns the queued `c_malloc` payloads until dispatch returns. + defer: + let fireRes = ctx.eventThreadExitSignal.fireSync() + if fireRes.isErr(): + error "failed to fire eventThreadExitSignal", err = fireRes.error + + try: + waitFor eventRun(ctx) + except CatchableError as exc: + error "event thread exited with exception", error = exc.msg diff --git a/ffi/ffi_context.nim b/ffi/ffi_context.nim index c630be3..5d7afe6 100644 --- a/ffi/ffi_context.nim +++ b/ffi/ffi_context.nim @@ -1,3 +1,9 @@ +## FFIContext type plus lifecycle (init / signal-stop / join / destroy). +## +## The per-thread bodies live in `ffi_thread.nim` and `event_thread.nim`, +## included below so the thread code can access the private FFIContext +## fields without forcing them through a public surface. + {.passc: "-fPIC".} import system/ansi_c @@ -49,275 +55,8 @@ const FFIHeartbeatStartDelay* = 10.seconds # grace window for library startup FFIHeartbeatStaleThreshold* = 1.seconds -type NotRespondingEvent* = object - -const NotRespondingEventName* = "not_responding" - -proc encodeNotRespondingEvent(): seq[byte] = - EventEnvelope[NotRespondingEvent]( - eventType: NotRespondingEventName, payload: NotRespondingEvent() - ).cborEncode() - -proc dispatchToListeners[T]( - ctx: ptr FFIContext[T], eventName: string, data: pointer, dataLen: int -) = - ## Holds reg.lock for the entire snapshot + invocation so concurrent - ## add/remove on this registry blocks until dispatch returns. - withLock ctx[].eventRegistry.lock: - let listeners = ctx[].eventRegistry.byEvent.getOrDefault(eventName) - if listeners.len == 0: - chronicles.debug "no listener registered", event = eventName - return - foreignThreadGc: - try: - notifyListeners(listeners, RET_OK, data, dataLen) - except Exception, CatchableError: - notifyListenersErr( - listeners, - "Exception dispatching " & eventName & ": " & getCurrentExceptionMsg(), - ) - -proc onNotResponding*(ctx: ptr FFIContext) = - ## Bypasses the event queue (which may itself be wedged) and dispatches - ## directly to listeners. Runs on the event thread. - let event = - try: - encodeNotRespondingEvent() - except CatchableError as exc: - chronicles.error "onNotResponding - encode failed", err = exc.msg - return - let dataPtr: pointer = - if event.len > 0: unsafeAddr event[0] else: nil - ctx.dispatchToListeners(NotRespondingEventName, dataPtr, event.len) - -proc sendRequestToFFIThread*( - ctx: ptr FFIContext, ffiRequest: ptr FFIThreadRequest, timeout = InfiniteDuration -): Result[void, string] = - # Reentrancy guard: only this thread can fire `reqReceivedSignal`, so a handler dispatching back would self-deadlock. - if onFFIThread: - deleteRequest(ffiRequest) - return err( - "reentrant ffi call: a handler invoked sendRequestToFFIThread on its own context" - ) - - # All async submissions serialise on `ctx.lock` for the full - # trySend + fireSync + waitSync sequence because `reqChannel` is - # single-producer and `reqReceivedSignal` is shared across callers. - # Multi-producer redesign is tracked as PR #23 review item 7. - ctx.lock.acquire() - defer: - ctx.lock.release() - - ## Sending the request - let sentOk = ctx.reqChannel.trySend(ffiRequest) - if not sentOk: - deleteRequest(ffiRequest) - return err("Couldn't send a request to the ffi thread") - - let fireSyncRes = ctx.reqSignal.fireSync() - if fireSyncRes.isErr(): - deleteRequest(ffiRequest) - return err("failed fireSync: " & $fireSyncRes.error) - - if fireSyncRes.get() == false: - deleteRequest(ffiRequest) - return err("Couldn't fireSync in time") - - ## wait until the FFI working thread properly received the request - let res = ctx.reqReceivedSignal.waitSync(timeout) - if res.isErr(): - ## Do not free ffiRequest here: the FFI thread was already signaled and - ## will process (and free) it. - return err("Couldn't receive reqReceivedSignal signal") - - ## Notice that in case of "ok", the deallocShared(req) is performed by the FFI Thread in the - ## process proc. - ok() - -proc processRequest[T]( - request: ptr FFIThreadRequest, ctx: ptr FFIContext[T] -) {.async.} = - ## Invoked within the FFI thread to process a request coming from the FFI API consumer thread. - - let reqId = $request[].reqId - ## The reqId determines which proc will handle the request. - ## The registeredRequests represents a table defined at compile time. - ## Then, registeredRequests == Table[reqId, proc-handling-the-request-asynchronously] - - ## Explicit conversion keeps `reqId` alive as the backing string, - ## avoiding the implicit string→cstring warning that will become an error. - let reqIdCs = reqId.cstring - - let retFut = - if not ctx[].registeredRequests[].contains(reqIdCs): - ## That shouldn't happen because only registered requests should be sent to the FFI thread. - nilProcess(request[].reqId) - else: - ctx[].registeredRequests[][reqIdCs](cast[pointer](request), ctx) - - ## Catch every catchable exception (including CancelledError raised by - ## the shutdown drain in ffiRun) so handleRes — and its `deleteRequest` - ## defer — always runs. Otherwise an abandoned in-flight handler would - ## leak its request envelope, reqId copy, and CBOR payload. - let res = - try: - await retFut - except CatchableError as exc: - Result[seq[byte], string].err( - "Error in processRequest for " & reqId & ": " & exc.msg - ) - - ## handleRes may raise (OOM, GC setup) even though it is rare. Catching here - ## keeps the async proc raises:[] compatible. The defer inside handleRes - ## guarantees request is freed before the exception propagates. - try: - handleRes(res, request) - except Exception as exc: - error "Unexpected exception in handleRes", error = exc.msg - -proc ffiThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} = - ## FFI thread body that attends library user API requests - ffiCurrentEventRegistry = addr ctx[].eventRegistry - onFFIThread = true - - logging.setupLog(logging.LogLevel.DEBUG, logging.LogFormat.TEXT) - - defer: - onFFIThread = false - # Unblocks destroyFFIContext's bounded wait so cleanup can proceed. - let fireRes = ctx.threadExitSignal.fireSync() - if fireRes.isErr(): - error "failed to fire threadExitSignal on FFI thread exit", err = fireRes.error - - let ffiRun = proc(ctx: ptr FFIContext[T]) {.async.} = - var ffiReqHandler: T - ## Holds the main library object, i.e., in charge of handling the ffi requests. - ## e.g., Waku, LibP2P, SDS, etc. - - ## In-flight processRequest futures. Tracked so they can be drained on - ## shutdown — otherwise destroying the context while a handler is - ## awaiting (e.g. sleepAsync) abandons the future and leaks the - ## request's envelope/reqId/payload allocations. - var pending: seq[Future[void]] = @[] - - proc reapCompleted() = - var i = 0 - while i < pending.len: - if pending[i].finished(): - pending.del(i) - else: - inc i - - while ctx.running.load(): - # Freezes if a sync handler blocks the dispatcher; event thread reads to detect wedged FFI thread. - discard ctx.ffiHeartbeat.fetchAdd(1) - - reapCompleted() - - let gotSignal = await ctx.reqSignal.wait().withTimeout(100.milliseconds) - if not gotSignal: - continue - - ## Wait for a request from the ffi consumer thread - var request: ptr FFIThreadRequest - if not ctx.reqChannel.tryRecv(request): - continue - - if ctx.myLib.isNil(): - ctx.myLib = addr ffiReqHandler - - ## Handle the request - pending.add processRequest(request, ctx) - - let fireRes = ctx.reqReceivedSignal.fireSync() - if fireRes.isErr(): - error "could not fireSync back to requester thread", error = fireRes.error - - ## Drain in-flight handlers so each request's `deleteRequest` runs - ## before we exit. Without this, abandoning a future mid-await would - ## leak the request allocations (visible to LSan; previously hidden - ## because Nim's pool allocator kept the chunks alive in the process). - reapCompleted() - if pending.len > 0: - try: - await allFutures(pending) - except CatchableError as exc: - error "draining pending FFI requests on shutdown raised", error = exc.msg - - waitFor ffiRun(ctx) - -proc dispatchQueuedEvent[T](ctx: ptr FFIContext[T], qe: QueuedEvent) = - ## Frees `qe`'s c_malloc buffers on exit. - defer: - if not qe.name.isNil(): - c_free(cast[pointer](qe.name)) - if not qe.data.isNil(): - c_free(qe.data) - ctx.dispatchToListeners($qe.name, qe.data, qe.dataLen) - -proc drainEventQueue[T](ctx: ptr FFIContext[T]) = - while true: - let opt = ctx.eventQueue.tryDequeueEvent() - if opt.isNone(): - break - ctx.dispatchQueuedEvent(opt.get()) - -type HeartbeatMonitor = object - startedAt: Moment - lastChange: Moment - lastValue: int64 - notifiedStale: bool - -proc init(T: type HeartbeatMonitor, ctx: ptr FFIContext): T = - let now = Moment.now() - T( - startedAt: now, - lastChange: now, - lastValue: ctx.ffiHeartbeat.load(), - notifiedStale: false, - ) - -proc check[T](hb: var HeartbeatMonitor, ctx: ptr FFIContext[T]) = - ## Fires onNotResponding once the FFI thread's heartbeat counter stops - ## advancing past the stale threshold. Latches until it moves again. - if Moment.now() - hb.startedAt <= FFIHeartbeatStartDelay: - return - let cur = ctx.ffiHeartbeat.load() - if cur != hb.lastValue: - hb.lastValue = cur - hb.lastChange = Moment.now() - hb.notifiedStale = false - elif not hb.notifiedStale and - Moment.now() - hb.lastChange > FFIHeartbeatStaleThreshold: - onNotResponding(ctx) - hb.notifiedStale = true - -proc eventRun[T](ctx: ptr FFIContext[T]) {.async.} = - var hb = HeartbeatMonitor.init(ctx) - - while ctx.running.load(): - # Wake on enqueue or tick — whichever first. The enqueue path lands in PR #69; - # until then the wait always times out and we fall through to the heartbeat check. - discard await ctx.eventQueueSignal.wait().withTimeout(EventThreadTickInterval) - - ctx.drainEventQueue() - - if not ctx.running.load(): - break - hb.check(ctx) - -proc eventThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} = - ## Drains the event queue and runs the FFI-thread heartbeat check. - ## Owns the queued `c_malloc` payloads until dispatch returns. - defer: - let fireRes = ctx.eventThreadExitSignal.fireSync() - if fireRes.isErr(): - error "failed to fire eventThreadExitSignal", err = fireRes.error - - try: - waitFor eventRun(ctx) - except CatchableError as exc: - error "event thread exited with exception", error = exc.msg +include ./event_thread +include ./ffi_thread proc deinitContextResources*[T](ctx: ptr FFIContext[T]): Result[void, string] = ## Mirror of `initContextResources`: tears down lock, registry, queue, diff --git a/ffi/ffi_thread.nim b/ffi/ffi_thread.nim new file mode 100644 index 0000000..1589c4a --- /dev/null +++ b/ffi/ffi_thread.nim @@ -0,0 +1,166 @@ +## FFI-thread body and request submission API. +## +## Included from `ffi_context.nim` — inherits its imports, FFIContext type, +## and the `onFFIThread` threadvar. Companion to `event_thread.nim`. +## +## Responsibilities: +## - Receive `FFIThreadRequest`s from foreign threads via `reqChannel` and +## dispatch them through the user-registered handler table. +## - Advance `ctx.ffiHeartbeat` each loop iteration so the event thread can +## detect a wedged FFI thread. + +proc sendRequestToFFIThread*( + ctx: ptr FFIContext, ffiRequest: ptr FFIThreadRequest, timeout = InfiniteDuration +): Result[void, string] = + # Reentrancy guard: only this thread can fire `reqReceivedSignal`, so a handler dispatching back would self-deadlock. + if onFFIThread: + deleteRequest(ffiRequest) + return err( + "reentrant ffi call: a handler invoked sendRequestToFFIThread on its own context" + ) + + # All async submissions serialise on `ctx.lock` for the full + # trySend + fireSync + waitSync sequence because `reqChannel` is + # single-producer and `reqReceivedSignal` is shared across callers. + # Multi-producer redesign is tracked as PR #23 review item 7. + ctx.lock.acquire() + defer: + ctx.lock.release() + + ## Sending the request + let sentOk = ctx.reqChannel.trySend(ffiRequest) + if not sentOk: + deleteRequest(ffiRequest) + return err("Couldn't send a request to the ffi thread") + + let fireSyncRes = ctx.reqSignal.fireSync() + if fireSyncRes.isErr(): + deleteRequest(ffiRequest) + return err("failed fireSync: " & $fireSyncRes.error) + + if fireSyncRes.get() == false: + deleteRequest(ffiRequest) + return err("Couldn't fireSync in time") + + ## wait until the FFI working thread properly received the request + let res = ctx.reqReceivedSignal.waitSync(timeout) + if res.isErr(): + ## Do not free ffiRequest here: the FFI thread was already signaled and + ## will process (and free) it. + return err("Couldn't receive reqReceivedSignal signal") + + ## Notice that in case of "ok", the deallocShared(req) is performed by the FFI Thread in the + ## process proc. + ok() + +proc processRequest[T]( + request: ptr FFIThreadRequest, ctx: ptr FFIContext[T] +) {.async.} = + ## Invoked within the FFI thread to process a request coming from the FFI API consumer thread. + + let reqId = $request[].reqId + ## The reqId determines which proc will handle the request. + ## The registeredRequests represents a table defined at compile time. + ## Then, registeredRequests == Table[reqId, proc-handling-the-request-asynchronously] + + ## Explicit conversion keeps `reqId` alive as the backing string, + ## avoiding the implicit string→cstring warning that will become an error. + let reqIdCs = reqId.cstring + + let retFut = + if not ctx[].registeredRequests[].contains(reqIdCs): + ## That shouldn't happen because only registered requests should be sent to the FFI thread. + nilProcess(request[].reqId) + else: + ctx[].registeredRequests[][reqIdCs](cast[pointer](request), ctx) + + ## Catch every catchable exception (including CancelledError raised by + ## the shutdown drain in ffiRun) so handleRes — and its `deleteRequest` + ## defer — always runs. Otherwise an abandoned in-flight handler would + ## leak its request envelope, reqId copy, and CBOR payload. + let res = + try: + await retFut + except CatchableError as exc: + Result[seq[byte], string].err( + "Error in processRequest for " & reqId & ": " & exc.msg + ) + + ## handleRes may raise (OOM, GC setup) even though it is rare. Catching here + ## keeps the async proc raises:[] compatible. The defer inside handleRes + ## guarantees request is freed before the exception propagates. + try: + handleRes(res, request) + except Exception as exc: + error "Unexpected exception in handleRes", error = exc.msg + +proc ffiThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} = + ## FFI thread body that attends library user API requests + ffiCurrentEventRegistry = addr ctx[].eventRegistry + onFFIThread = true + + logging.setupLog(logging.LogLevel.DEBUG, logging.LogFormat.TEXT) + + defer: + onFFIThread = false + # Unblocks destroyFFIContext's bounded wait so cleanup can proceed. + let fireRes = ctx.threadExitSignal.fireSync() + if fireRes.isErr(): + error "failed to fire threadExitSignal on FFI thread exit", err = fireRes.error + + let ffiRun = proc(ctx: ptr FFIContext[T]) {.async.} = + var ffiReqHandler: T + ## Holds the main library object, i.e., in charge of handling the ffi requests. + ## e.g., Waku, LibP2P, SDS, etc. + + ## In-flight processRequest futures. Tracked so they can be drained on + ## shutdown — otherwise destroying the context while a handler is + ## awaiting (e.g. sleepAsync) abandons the future and leaks the + ## request's envelope/reqId/payload allocations. + var pending: seq[Future[void]] = @[] + + proc reapCompleted() = + var i = 0 + while i < pending.len: + if pending[i].finished(): + pending.del(i) + else: + inc i + + while ctx.running.load(): + # Freezes if a sync handler blocks the dispatcher; event thread reads to detect wedged FFI thread. + discard ctx.ffiHeartbeat.fetchAdd(1) + + reapCompleted() + + let gotSignal = await ctx.reqSignal.wait().withTimeout(100.milliseconds) + if not gotSignal: + continue + + ## Wait for a request from the ffi consumer thread + var request: ptr FFIThreadRequest + if not ctx.reqChannel.tryRecv(request): + continue + + if ctx.myLib.isNil(): + ctx.myLib = addr ffiReqHandler + + ## Handle the request + pending.add processRequest(request, ctx) + + let fireRes = ctx.reqReceivedSignal.fireSync() + if fireRes.isErr(): + error "could not fireSync back to requester thread", error = fireRes.error + + ## Drain in-flight handlers so each request's `deleteRequest` runs + ## before we exit. Without this, abandoning a future mid-await would + ## leak the request allocations (visible to LSan; previously hidden + ## because Nim's pool allocator kept the chunks alive in the process). + reapCompleted() + if pending.len > 0: + try: + await allFutures(pending) + except CatchableError as exc: + error "draining pending FFI requests on shutdown raised", error = exc.msg + + waitFor ffiRun(ctx)