diff --git a/examples/timer/rust_bindings/examples/main.rs b/examples/timer/rust_bindings/examples/main.rs index b951369..132b809 100644 --- a/examples/timer/rust_bindings/examples/main.rs +++ b/examples/timer/rust_bindings/examples/main.rs @@ -1,10 +1,8 @@ -//! Synchronous example: exercises the library-event listener API -//! (typed + wildcard + remove). +//! Synchronous example: exercises the typed per-event listener API. //! //! Run with: `cargo run --example main` -use my_timer::{decode_event_payload, EchoEvent, EchoRequest, MyTimerCtx, TimerConfig}; -use std::os::raw::c_int; +use my_timer::{EchoEvent, EchoRequest, MyTimerCtx, TimerConfig}; use std::sync::mpsc; use std::time::Duration; @@ -14,32 +12,12 @@ fn main() -> Result<(), String> { Duration::from_secs(5), )?; - // Typed listener: the closure is invoked on the lib's dispatch - // thread, so forward the payload to `main` via std mpsc and block - // on `recv_timeout` below. `add_on_echo_fired_listener` is generated - // per `{.ffiEvent.}`-declared proc and takes a typed `&EchoEvent`. + // Closure runs on the lib's dispatch thread; forward to `main` via mpsc and recv_timeout below. let (tx, rx) = mpsc::channel::(); let typed_handle = ctx.add_on_echo_fired_listener(move |evt: &EchoEvent| { let _ = tx.send(evt.clone()); }); - // Wildcard listener: receives every event with the FFI return code, - // the wire `event_id` pre-extracted from the CBOR envelope, and the - // raw envelope bytes. Lift to a typed payload via - // `decode_event_payload::` when the event_id matches one you - // care about — this avoids hand-rolling ciborium calls per branch. - let wildcard_handle = ctx.add_event_listener(|ret: c_int, event_id: &str, envelope: &[u8]| { - println!("wildcard: ret={}, event_id={}, bytes={}", ret, event_id, envelope.len()); - if ret == 0 && event_id == "on_echo_fired" { - match decode_event_payload::(envelope) { - Ok(evt) => println!(" decoded: message={}, echo_count={}", evt.message, evt.echo_count), - Err(e) => println!(" decode failed: {}", e), - } - } - }); - - // Trigger the event — fires `on_echo_fired` once, which the - // dispatch thread delivers to both listeners above. ctx.echo(EchoRequest { message: "sync-event-demo".into(), delay_ms: 1 })?; match rx.recv_timeout(Duration::from_secs(2)) { @@ -48,6 +26,5 @@ fn main() -> Result<(), String> { } ctx.remove_event_listener(typed_handle); - ctx.remove_event_listener(wildcard_handle); Ok(()) } diff --git a/examples/timer/rust_bindings/examples/tokio_main.rs b/examples/timer/rust_bindings/examples/tokio_main.rs index 8ab1d16..feff8ea 100644 --- a/examples/timer/rust_bindings/examples/tokio_main.rs +++ b/examples/timer/rust_bindings/examples/tokio_main.rs @@ -1,11 +1,9 @@ -//! Tokio (async) example: same shape as `main.rs` but exercises the -//! async `_async` API and bridges library events into a tokio-aware -//! channel for async consumption. +//! Tokio (async) example: same shape as `main.rs` but exercises the async `_async` API +//! and bridges library events into a tokio mpsc for async consumption. //! //! Run with: `cargo run --example tokio_main` -use my_timer::{decode_event_payload, EchoEvent, EchoRequest, MyTimerCtx, TimerConfig}; -use std::os::raw::c_int; +use my_timer::{EchoEvent, EchoRequest, MyTimerCtx, TimerConfig}; use std::time::Duration; use tokio::sync::mpsc; @@ -17,37 +15,15 @@ async fn main() -> Result<(), String> { ) .await?; - // Typed listener: the handler fires on the lib's dispatch thread, - // which is *outside* the tokio runtime. Forwarding through a tokio - // `unbounded_channel` (Sender is Send + Sync, non-blocking) hands - // the event over to the runtime so we can `.await` it below. + // Handler fires on the lib's dispatch thread (outside the tokio runtime); forward via tokio mpsc to await it below. let (typed_tx, mut typed_rx) = mpsc::unbounded_channel::(); let typed_handle = ctx.add_on_echo_fired_listener(move |evt: &EchoEvent| { let _ = typed_tx.send(evt.clone()); }); - // Wildcard listener: receives every event with the FFI return code, - // the wire `event_id` pre-extracted from the CBOR envelope, and the - // raw envelope bytes. Lift to a typed payload via - // `decode_event_payload::` when the event_id matches one you - // care about — this avoids hand-rolling ciborium calls per branch. - let wildcard_handle = ctx.add_event_listener(|ret: c_int, event_id: &str, envelope: &[u8]| { - println!("wildcard: ret={}, event_id={}, bytes={}", ret, event_id, envelope.len()); - if ret == 0 && event_id == "on_echo_fired" { - match decode_event_payload::(envelope) { - Ok(evt) => println!(" decoded: message={}, echo_count={}", evt.message, evt.echo_count), - Err(e) => println!(" decode failed: {}", e), - } - } - }); - - // Trigger an echo via the async API — fires `on_echo_fired` once, - // which the dispatch thread delivers to both listeners above. ctx.echo_async(EchoRequest { message: "async-event-demo".into(), delay_ms: 1 }) .await?; - // Await the typed event with a bounded timeout so a missing event - // surfaces as an error instead of hanging the example forever. let evt = tokio::time::timeout(Duration::from_secs(2), typed_rx.recv()) .await .map_err(|_| "event never arrived".to_string())? @@ -55,6 +31,5 @@ async fn main() -> Result<(), String> { println!("typed onEchoFired: message={}, echo_count={}", evt.message, evt.echo_count); ctx.remove_event_listener(typed_handle); - ctx.remove_event_listener(wildcard_handle); Ok(()) } diff --git a/ffi/cbor_serial.nim b/ffi/cbor_serial.nim index 5c1ae8f..0fc87ce 100644 --- a/ffi/cbor_serial.nim +++ b/ffi/cbor_serial.nim @@ -35,9 +35,6 @@ export cbor_serialization, options, results const CborNullByte*: byte = 0xf6'u8 ## CBOR encoding of `null` — used as the wire sentinel for empty OK payloads. -# --------------------------------------------------------------------------- -# Public API -# --------------------------------------------------------------------------- proc cborEncode*[T](x: T): seq[byte] = ## CBOR-encode any cbor_serialization-supported type (plus `pointer` / `ptr T` diff --git a/ffi/codegen/rust.nim b/ffi/codegen/rust.nim index 6972954..c8d278e 100644 --- a/ffi/codegen/rust.nim +++ b/ffi/codegen/rust.nim @@ -63,9 +63,6 @@ proc reqStructName(p: FFIProcMeta): string = else: camel & "Req" -# --------------------------------------------------------------------------- -# File generators -# --------------------------------------------------------------------------- proc generateCargoToml*(libName: string): string = # `flume` is the unified callback channel (PR #23 Rust review, item 8): one diff --git a/ffi/event_thread.nim b/ffi/event_thread.nim new file mode 100644 index 0000000..81d9c77 --- /dev/null +++ b/ffi/event_thread.nim @@ -0,0 +1,138 @@ +## Event-thread body and FFI-thread liveness monitoring. +## +## Included from `ffi_context.nim` — inherits its imports, FFIContext type, +## and the heartbeat-timing constants. Lives alongside `ffi_thread.nim` +## so each thread's machinery is readable on its own. +## +## Responsibilities: +## - Drain queued events into listener callbacks (queue producer lands in PR #69). +## - Watch `ctx.ffiHeartbeat` and emit `NotRespondingEvent` / `RespondingEvent` +## on FFI-thread stall and recovery transitions. + +type + NotRespondingEvent* = object + RespondingEvent* = object + +const + NotRespondingEventName* = "not_responding" + RespondingEventName* = "responding" + +proc dispatchToListeners[T]( + ctx: ptr FFIContext[T], eventName: string, data: pointer, dataLen: int +) = + ## Holds reg.lock for the entire snapshot + invocation so concurrent + ## add/remove on this registry blocks until dispatch returns. + withLock ctx[].eventRegistry.lock: + let listeners = ctx[].eventRegistry.byEvent.getOrDefault(eventName) + if listeners.len == 0: + chronicles.debug "no listener registered", event = eventName + return + foreignThreadGc: + try: + notifyListeners(listeners, RET_OK, data, dataLen) + except Exception, CatchableError: + notifyListenersErr( + listeners, + "Exception dispatching " & eventName & ": " & getCurrentExceptionMsg(), + ) + +proc emitLivenessEvent[T, P](ctx: ptr FFIContext[T], name: string, payload: P) = + ## Encodes a zero-field liveness event (`NotRespondingEvent`, + ## `RespondingEvent`) and dispatches it directly to listeners, bypassing + ## the event queue (which may itself be wedged). Runs on the event thread. + let event = + try: + EventEnvelope[P](eventType: name, payload: payload).cborEncode() + except CatchableError as exc: + chronicles.error "liveness event encode failed", name = name, err = exc.msg + return + let dataPtr: pointer = + if event.len > 0: cast[pointer](unsafeAddr event[0]) + else: cast[pointer](emptyListenerPayload) + ctx.dispatchToListeners(name, dataPtr, event.len) + +proc onNotResponding*(ctx: ptr FFIContext) = + emitLivenessEvent(ctx, NotRespondingEventName, NotRespondingEvent()) + +proc onResponding*(ctx: ptr FFIContext) = + ## Fired once when the FFI thread's heartbeat starts advancing again + ## after a `NotRespondingEvent`. Lets consumers clear any "library + ## hung" UI state without polling. + emitLivenessEvent(ctx, RespondingEventName, RespondingEvent()) + +proc dispatchQueuedEvent[T](ctx: ptr FFIContext[T], qe: QueuedEvent) = + ## Frees `qe`'s c_malloc buffers on exit. + defer: + if not qe.name.isNil(): + c_free(cast[pointer](qe.name)) + if not qe.data.isNil(): + c_free(qe.data) + ctx.dispatchToListeners($qe.name, qe.data, qe.dataLen) + +proc drainEventQueue[T](ctx: ptr FFIContext[T]) = + while true: + let opt = ctx.eventQueue.tryDequeueEvent() + if opt.isNone(): + break + ctx.dispatchQueuedEvent(opt.get()) + +type HeartbeatMonitor = object + startedAt: Moment + lastChange: Moment + lastValue: int64 + notifiedStale: bool + +proc init(T: type HeartbeatMonitor, ctx: ptr FFIContext): T = + let now = Moment.now() + T( + startedAt: now, + lastChange: now, + lastValue: ctx.ffiHeartbeat.load(), + notifiedStale: false, + ) + +proc check[T](hb: var HeartbeatMonitor, ctx: ptr FFIContext[T]) = + ## Fires `onNotResponding` once the FFI thread's heartbeat counter stops + ## advancing past the stale threshold, and fires `onResponding` once it + ## starts advancing again. Both transitions latch so each is emitted at + ## most once per stall episode. + if Moment.now() - hb.startedAt <= FFIHeartbeatStartDelay: + return + let cur = ctx.ffiHeartbeat.load() + if cur != hb.lastValue: + if hb.notifiedStale: + onResponding(ctx) + hb.lastValue = cur + hb.lastChange = Moment.now() + hb.notifiedStale = false + elif not hb.notifiedStale and + Moment.now() - hb.lastChange > FFIHeartbeatStaleThreshold: + onNotResponding(ctx) + hb.notifiedStale = true + +proc eventRun[T](ctx: ptr FFIContext[T]) {.async.} = + var hb = HeartbeatMonitor.init(ctx) + + while ctx.running.load(): + # Wake on enqueue or tick — whichever first. The enqueue path lands in PR #69; + # until then the wait always times out and we fall through to the heartbeat check. + discard await ctx.eventQueueSignal.wait().withTimeout(EventThreadTickInterval) + + ctx.drainEventQueue() + + if not ctx.running.load(): + break + hb.check(ctx) + +proc eventThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} = + ## Drains the event queue and runs the FFI-thread heartbeat check. + ## Owns the queued `c_malloc` payloads until dispatch returns. + defer: + let fireRes = ctx.eventThreadExitSignal.fireSync() + if fireRes.isErr(): + error "failed to fire eventThreadExitSignal", err = fireRes.error + + try: + waitFor eventRun(ctx) + except CatchableError as exc: + error "event thread exited with exception", error = exc.msg diff --git a/ffi/ffi_context.nim b/ffi/ffi_context.nim index 531347c..705ad23 100644 --- a/ffi/ffi_context.nim +++ b/ffi/ffi_context.nim @@ -1,9 +1,20 @@ +## FFIContext type plus lifecycle (init / signal-stop / join / destroy). +## +## The per-thread bodies live in `ffi_thread.nim` and `event_thread.nim`, +## included below so the thread code can access the private FFIContext +## fields without forcing them through a public surface. + {.passc: "-fPIC".} -import std/[atomics, locks, json, tables] +import system/ansi_c +import std/[atomics, locks, options, tables] import chronicles, chronos, chronos/threadsync, taskpools/channels_spsc_single, results import - ./ffi_types, ./ffi_events, ./ffi_thread_request, ./internal/ffi_macro, ./logging, + ./ffi_types, + ./ffi_events, + ./ffi_thread_request, + ./internal/ffi_macro, + ./logging, ./cbor_serial export ffi_events @@ -13,21 +24,21 @@ type FFIContext*[T] = object # main library object (e.g., Waku, LibP2P, SDS, the one to be exposed as a library) ffiThread: Thread[(ptr FFIContext[T])] # represents the main FFI thread in charge of attending API consumer actions - watchdogThread: Thread[(ptr FFIContext[T])] - # monitors the FFI thread and notifies the FFI API consumer if it hangs + eventThread: Thread[(ptr FFIContext[T])] + # drains the event queue and runs the FFI-thread heartbeat check lock: Lock reqChannel: ChannelSPSCSingle[ptr FFIThreadRequest] reqSignal: ThreadSignalPtr # to notify the FFI Thread that a new request is sent reqReceivedSignal: ThreadSignalPtr # to signal main thread, interfacing with the FFI thread, that FFI thread received the request stopSignal: ThreadSignalPtr - # fired by destroyFFIContext so both ffiThread and watchdogThread can exit promptly - threadExitSignal: ThreadSignalPtr - # fired by ffiThread just before it exits; destroyFFIContext waits on - # this with a bounded timeout instead of joining unconditionally, so a - # blocked event loop cannot hang the caller forever + threadExitSignal: ThreadSignalPtr # bounds destroyFFIContext's wait so a blocked loop cannot hang the caller + eventQueueSignal: ThreadSignalPtr # wakes the event thread on enqueue (used once dispatch is rewired in PR #69) + eventThreadExitSignal: ThreadSignalPtr # mirrors threadExitSignal for the event thread userData*: pointer eventRegistry*: FFIEventRegistry + eventQueue*: EventQueue + ffiHeartbeat*: Atomic[int64] # advanced each FFI-thread loop; event thread reads for liveness running: Atomic[bool] # To control when the threads are running registeredRequests: ptr Table[cstring, FFIRequestProc] # Pointer to with the registered requests at compile time @@ -39,246 +50,22 @@ var onFFIThread* {.threadvar.}: bool const git_version* {.strdefine.} = "n/a" -proc sendRequestToFFIThread*( - ctx: ptr FFIContext, ffiRequest: ptr FFIThreadRequest, timeout = InfiniteDuration -): Result[void, string] = - # Reentrancy guard (PR #23 review, item 6): if a handler running on the FFI - # thread tries to dispatch back through this proc, it would wait forever on - # `reqReceivedSignal` — which only this thread can fire — and self-deadlock. - # Return an error instead so the caller can surface it. - if onFFIThread: - deleteRequest(ffiRequest) - return err( - "reentrant ffi call: a handler invoked sendRequestToFFIThread on its own context" - ) +const + EventThreadTickInterval* = 1.seconds # bounds idle heartbeat check latency + FFIHeartbeatStartDelay* = 10.seconds # grace window for library startup + FFIHeartbeatStaleThreshold* = 1.seconds - # All async submissions serialise on `ctx.lock` for the full - # trySend + fireSync + waitSync sequence because `reqChannel` is - # single-producer and `reqReceivedSignal` is shared across callers. - # Multi-producer redesign is tracked as PR #23 review item 7. - ctx.lock.acquire() - defer: - ctx.lock.release() +include ./event_thread +include ./ffi_thread - ## Sending the request - let sentOk = ctx.reqChannel.trySend(ffiRequest) - if not sentOk: - deleteRequest(ffiRequest) - return err("Couldn't send a request to the ffi thread") - - let fireSyncRes = ctx.reqSignal.fireSync() - if fireSyncRes.isErr(): - deleteRequest(ffiRequest) - return err("failed fireSync: " & $fireSyncRes.error) - - if fireSyncRes.get() == false: - deleteRequest(ffiRequest) - return err("Couldn't fireSync in time") - - ## wait until the FFI working thread properly received the request - let res = ctx.reqReceivedSignal.waitSync(timeout) - if res.isErr(): - ## Do not free ffiRequest here: the FFI thread was already signaled and - ## will process (and free) it. - return err("Couldn't receive reqReceivedSignal signal") - - ## Notice that in case of "ok", the deallocShared(req) is performed by the FFI Thread in the - ## process proc. - return ok() - -type Foo = object -registerReqFFI(WatchdogReq, foo: ptr Foo): - proc(): Future[Result[string, string]] {.async.} = - return ok("FFI thread is not blocked") - -type JsonNotRespondingEvent = object - eventType: string - -proc init(T: type JsonNotRespondingEvent): T = - return JsonNotRespondingEvent(eventType: "not_responding") - -proc `$`(event: JsonNotRespondingEvent): string = - $(%*event) - -proc onNotResponding*(ctx: ptr FFIContext) = - ## Shim: still emits the legacy JSON payload through the registry, so - ## existing foreign consumers see no wire-shape change. A follow-up - ## PR replaces this with a CBOR `NotRespondingEvent`. - ## Mirrors the dispatch templates' lock-during-invocation contract - ## (see `ffi_events.nim`). - withLock ctx[].eventRegistry.lock: - let snap = ctx[].eventRegistry.byEvent.getOrDefault("onNotResponding") - if snap.len == 0: - chronicles.debug "onNotResponding - no listener registered" - return - foreignThreadGc: - let event = $JsonNotRespondingEvent.init() - for listener in snap: - listener.callback( - RET_OK, - cast[ptr cchar](unsafeAddr event[0]), - cast[csize_t](len(event)), - listener.userData, - ) - -proc watchdogThreadBody(ctx: ptr FFIContext) {.thread.} = - ## Watchdog thread that monitors the FFI thread and notifies the library user if it hangs. - ## This thread never blocks. - - let watchdogRun = proc(ctx: ptr FFIContext) {.async.} = - const WatchdogStartDelay = 10.seconds - const WatchdogTimeinterval = 1.seconds - const WatchdogTimeout = 20.seconds - - # Give time for the node to be created and up before sending watchdog requests - let initialStop = await ctx.stopSignal.wait().withTimeout(WatchdogStartDelay) - if initialStop or ctx.running.load == false: - return - - while true: - let intervalStop = await ctx.stopSignal.wait().withTimeout(WatchdogTimeinterval) - - if intervalStop or ctx.running.load == false: - debug "Watchdog thread exiting because FFIContext is not running" - break - - let callback = proc( - callerRet: cint, msg: ptr cchar, len: csize_t, userData: pointer - ) {.cdecl, gcsafe, raises: [].} = - discard ## Don't do anything. Just respecting the callback signature. - const nilUserData = nil - - trace "Sending watchdog request to FFI thread" - - try: - sendRequestToFFIThread( - ctx, WatchdogReq.ffiNewReq(callback, nilUserData), WatchdogTimeout - ).isOkOr: - error "Failed to send watchdog request to FFI thread", error = $error - onNotResponding(ctx) - except Exception as exc: - error "Exception sending watchdog request", exc = exc.msg - onNotResponding(ctx) - - waitFor watchdogRun(ctx) - -proc processRequest[T]( - request: ptr FFIThreadRequest, ctx: ptr FFIContext[T] -) {.async.} = - ## Invoked within the FFI thread to process a request coming from the FFI API consumer thread. - - let reqId = $request[].reqId - ## The reqId determines which proc will handle the request. - ## The registeredRequests represents a table defined at compile time. - ## Then, registeredRequests == Table[reqId, proc-handling-the-request-asynchronously] - - ## Explicit conversion keeps `reqId` alive as the backing string, - ## avoiding the implicit string→cstring warning that will become an error. - let reqIdCs = reqId.cstring - - let retFut = - if not ctx[].registeredRequests[].contains(reqIdCs): - ## That shouldn't happen because only registered requests should be sent to the FFI thread. - nilProcess(request[].reqId) - else: - ctx[].registeredRequests[][reqIdCs](cast[pointer](request), ctx) - - ## Catch every catchable exception (including CancelledError raised by - ## the shutdown drain in ffiRun) so handleRes — and its `deleteRequest` - ## defer — always runs. Otherwise an abandoned in-flight handler would - ## leak its request envelope, reqId copy, and CBOR payload. - let res = - try: - await retFut - except CatchableError as exc: - Result[seq[byte], string].err( - "Error in processRequest for " & reqId & ": " & exc.msg - ) - - ## handleRes may raise (OOM, GC setup) even though it is rare. Catching here - ## keeps the async proc raises:[] compatible. The defer inside handleRes - ## guarantees request is freed before the exception propagates. - try: - handleRes(res, request) - except Exception as exc: - error "Unexpected exception in handleRes", error = exc.msg - -proc ffiThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} = - ## FFI thread body that attends library user API requests - ffiCurrentEventRegistry = addr ctx[].eventRegistry - onFFIThread = true - - logging.setupLog(logging.LogLevel.DEBUG, logging.LogFormat.TEXT) - - defer: - onFFIThread = false - # Signal destroyFFIContext that this thread has exited, so its bounded - # wait can unblock and proceed with cleanup. - let fireRes = ctx.threadExitSignal.fireSync() - if fireRes.isErr(): - error "failed to fire threadExitSignal on FFI thread exit", err = fireRes.error - - let ffiRun = proc(ctx: ptr FFIContext[T]) {.async.} = - var ffiReqHandler: T - ## Holds the main library object, i.e., in charge of handling the ffi requests. - ## e.g., Waku, LibP2P, SDS, etc. - - ## In-flight processRequest futures. Tracked so they can be drained on - ## shutdown — otherwise destroying the context while a handler is - ## awaiting (e.g. sleepAsync) abandons the future and leaks the - ## request's envelope/reqId/payload allocations. - var pending: seq[Future[void]] = @[] - - proc reapCompleted() = - var i = 0 - while i < pending.len: - if pending[i].finished(): - pending.del(i) - else: - inc i - - while ctx.running.load(): - reapCompleted() - - let gotSignal = await ctx.reqSignal.wait().withTimeout(100.milliseconds) - if not gotSignal: - continue - - ## Wait for a request from the ffi consumer thread - var request: ptr FFIThreadRequest - if not ctx.reqChannel.tryRecv(request): - continue - - if ctx.myLib.isNil(): - ctx.myLib = addr ffiReqHandler - - ## Handle the request - pending.add processRequest(request, ctx) - - let fireRes = ctx.reqReceivedSignal.fireSync() - if fireRes.isErr(): - error "could not fireSync back to requester thread", error = fireRes.error - - ## Drain in-flight handlers so each request's `deleteRequest` runs - ## before we exit. Without this, abandoning a future mid-await would - ## leak the request allocations (visible to LSan; previously hidden - ## because Nim's pool allocator kept the chunks alive in the process). - reapCompleted() - if pending.len > 0: - try: - await allFutures(pending) - except CatchableError as exc: - error "draining pending FFI requests on shutdown raised", - error = exc.msg - - waitFor ffiRun(ctx) - -proc cleanUpResources[T](ctx: ptr FFIContext[T]): Result[void, string] = - ## Full cleanup for heap-allocated contexts: closes all resources and frees memory. - defer: - freeShared(ctx) +proc deinitContextResources*[T](ctx: ptr FFIContext[T]): Result[void, string] = + ## Mirror of `initContextResources`: tears down lock, registry, queue, + ## and signal fds in place. Threads MUST already be joined. Caller owns + ## the memory holding `ctx`. Fields are nil'd after close so a re-init + ## on the same slot doesn't double-close. ctx.lock.deinitLock() deinitEventRegistry(ctx[].eventRegistry) + deinitEventQueue(ctx[].eventQueue) when defined(gcRefc): ## ThreadSignalPtr.close() is intentionally skipped under --mm:refc. ## @@ -303,20 +90,45 @@ proc cleanUpResources[T](ctx: ptr FFIContext[T]): Result[void, string] = else: if not ctx.reqSignal.isNil(): ?ctx.reqSignal.close() + ctx.reqSignal = nil if not ctx.reqReceivedSignal.isNil(): ?ctx.reqReceivedSignal.close() + ctx.reqReceivedSignal = nil if not ctx.stopSignal.isNil(): ?ctx.stopSignal.close() + ctx.stopSignal = nil if not ctx.threadExitSignal.isNil(): ?ctx.threadExitSignal.close() + ctx.threadExitSignal = nil + if not ctx.eventQueueSignal.isNil(): + ?ctx.eventQueueSignal.close() + ctx.eventQueueSignal = nil + if not ctx.eventThreadExitSignal.isNil(): + ?ctx.eventThreadExitSignal.close() + ctx.eventThreadExitSignal = nil return ok() +proc cleanUpResources[T](ctx: ptr FFIContext[T]): Result[void, string] = + ## Full cleanup for heap-allocated contexts: closes all resources and frees memory. + defer: + freeShared(ctx) + ctx.deinitContextResources() + proc initContextResources*[T](ctx: ptr FFIContext[T]): Result[void, string] = ## Initialises all resources inside an already-allocated FFIContext slot. ## On failure every partially-initialised resource is closed; the caller ## is responsible for releasing the slot (freeShared or pool.releaseSlot). + # Defensive nil: deferred cleanup must never double-close stale pointers on a reused pool slot. + ctx.reqSignal = nil + ctx.reqReceivedSignal = nil + ctx.stopSignal = nil + ctx.threadExitSignal = nil + ctx.eventQueueSignal = nil + ctx.eventThreadExitSignal = nil ctx.lock.initLock() initEventRegistry(ctx[].eventRegistry) + initEventQueue(ctx[].eventQueue) + ctx.ffiHeartbeat.store(0) var success = false defer: @@ -337,6 +149,12 @@ proc initContextResources*[T](ctx: ptr FFIContext[T]): Result[void, string] = ctx.threadExitSignal = ThreadSignalPtr.new().valueOr: return err("couldn't create threadExitSignal ThreadSignalPtr: " & $error) + ctx.eventQueueSignal = ThreadSignalPtr.new().valueOr: + return err("couldn't create eventQueueSignal ThreadSignalPtr: " & $error) + + ctx.eventThreadExitSignal = ThreadSignalPtr.new().valueOr: + return err("couldn't create eventThreadExitSignal ThreadSignalPtr: " & $error) + ctx.registeredRequests = addr ffi_types.registeredRequests ctx.running.store(true) @@ -347,32 +165,40 @@ proc initContextResources*[T](ctx: ptr FFIContext[T]): Result[void, string] = return err("failed to create the FFI thread: " & getCurrentExceptionMsg()) try: - createThread(ctx.watchdogThread, watchdogThreadBody, ctx) + createThread(ctx.eventThread, eventThreadBody[T], ctx) except ValueError, ResourceExhaustedError: ## ffiThread is already running; signal it to exit and join before the ## deferred cleanUpResources closes the signals it's waiting on. ctx.running.store(false) let fireRes = ctx.reqSignal.fireSync() if fireRes.isErr(): - error "failed to signal ffiThread during watchdog cleanup", error = fireRes.error + error "failed to signal ffiThread during event-thread cleanup", + error = fireRes.error joinThread(ctx.ffiThread) - return err("failed to create the watchdog thread: " & getCurrentExceptionMsg()) + return err("failed to create the event thread: " & getCurrentExceptionMsg()) success = true return ok() proc signalStop*[T](ctx: ptr FFIContext[T]): Result[void, string] = + # Error paths intentionally skip onNotResponding: a back-pressuring + # listener may hold reg.lock, and onNotResponding takes it — would + # amplify the stuck state into a deadlock instead of escaping it. ctx.running.store(false) let reqSignaled = ctx.reqSignal.fireSync().valueOr: - ctx.onNotResponding() return err("error signaling reqSignal in signalStop: " & $error) if not reqSignaled: - ctx.onNotResponding() return err("failed to signal reqSignal on time in signalStop") let stopSignaled = ctx.stopSignal.fireSync().valueOr: return err("error signaling stopSignal in signalStop: " & $error) if not stopSignaled: return err("failed to signal stopSignal on time in signalStop") + # Non-fatal: event thread will see running==false on the next tick. + let evtSignaled = ctx.eventQueueSignal.fireSync() + if evtSignaled.isErr(): + error "failed to signal eventQueueSignal in signalStop", error = evtSignaled.error + elif evtSignaled.get() == false: + error "failed to signal eventQueueSignal on time in signalStop" return ok() ## If the FFI thread's event loop is blocked by a synchronous handler @@ -383,23 +209,30 @@ proc signalStop*[T](ctx: ptr FFIContext[T]): Result[void, string] = const ThreadExitTimeout* = 1500.milliseconds proc stopAndJoinThreads*[T](ctx: ptr FFIContext[T]): Result[void, string] = - ## Signals the FFI and watchdog threads to stop, waits up to ThreadExitTimeout - ## for the FFI thread to exit, and joins both. On timeout returns err and - ## skips joinThread (leaving the threads live) rather than hanging the caller. - ## Resource cleanup (signal fds, lock) is the caller's responsibility. + ## Signals both threads to stop, waits up to ThreadExitTimeout per thread, + ## and joins them. On timeout returns err and skips remaining joins + ## (leaving the threads live) rather than hanging the caller. Resource + ## cleanup is the caller's responsibility. + ## + ## Timeout paths skip onNotResponding for the same reason signalStop does. ctx.signalStop().isOkOr: return err("signalStop failed: " & $error) - let exitedOnTime = ctx.threadExitSignal.waitSync(ThreadExitTimeout).valueOr: - ctx.onNotResponding() + let ffiExitedOnTime = ctx.threadExitSignal.waitSync(ThreadExitTimeout).valueOr: return err("error waiting for FFI thread exit: " & $error) - if not exitedOnTime: - ctx.onNotResponding() + if not ffiExitedOnTime: return err("FFI thread did not exit in time; leaking ctx to avoid hang") joinThread(ctx.ffiThread) - joinThread(ctx.watchdogThread) + + let evtExitedOnTime = ctx.eventThreadExitSignal.waitSync(ThreadExitTimeout).valueOr: + return err("error waiting for event thread exit: " & $error) + + if not evtExitedOnTime: + return err("event thread did not exit in time; leaking ctx to avoid hang") + + joinThread(ctx.eventThread) return ok() proc clearContext[T](ctx: ptr FFIContext[T]): Result[void, string] = diff --git a/ffi/ffi_context_pool.nim b/ffi/ffi_context_pool.nim index 547a1ff..a92ef6e 100644 --- a/ffi/ffi_context_pool.nim +++ b/ffi/ffi_context_pool.nim @@ -48,13 +48,12 @@ proc destroyFFIContext*[T]( ## unsafe. ctx.stopAndJoinThreads().isOkOr: return err("destroyFFIContext(pool): " & $error) - # Tear down the event registry on the *owning* thread so its - # GC-managed Table / seq storage is freed on the same heap that - # allocated it. Without this, the next thread to grab this slot - # would crash inside `initEventRegistry`'s assignment-dtor when - # `initTable` tries to dealloc the previous thread's data. - deinitEventRegistry(ctx[].eventRegistry) + # Without this, the next acquisition would re-init an already-initialised + # lock (UB) and leak the previous signal fds. + let deinitRes = ctx.deinitContextResources() pool.releaseSlot(ctx) + deinitRes.isOkOr: + return err("destroyFFIContext(pool): " & $error) return ok() proc isValidCtx*[T](pool: var FFIContextPool[T], ctx: pointer): bool = diff --git a/ffi/ffi_events.nim b/ffi/ffi_events.nim index 0a33308..29f8c89 100644 --- a/ffi/ffi_events.nim +++ b/ffi/ffi_events.nim @@ -1,29 +1,15 @@ -## Event registry and dispatch primitives for FFI library-initiated events. -## -## This module owns two concerns so they can evolve together without dragging -## in the rest of `FFIContext`: -## -## 1. A multi-listener registry. Each event name maps to a `seq` of -## listeners; a dispatched event reaches exactly the listeners -## subscribed to its name. Callers subscribe to each event separately. -## 2. The dispatch templates (`dispatchFFIEvent`, `dispatchFFIEventCbor`) used -## by `{.ffiEvent.}`-generated procs. They snapshot the registry under its -## lock, then invoke each listener *outside* the lock so re-entrant -## add/remove from within a handler cannot self-deadlock. -## -## Phase 1 keeps dispatch synchronous on the FFI thread. A later phase will -## route events through a bounded queue to a dedicated event thread; the -## registry API does not change. +## Event registry, bounded SPSC event queue, and dispatch templates for +## FFI library-initiated events. Listeners receive only the event name +## they subscribed to. Queue payloads travel via `c_malloc` so transfer +## across Nim heaps is safe under both `--mm:orc` and `--mm:refc`. {.pragma: callback, cdecl, raises: [], gcsafe.} -import std/[locks, sequtils, tables] +import system/ansi_c +import std/[locks, sequtils, options, tables] import chronicles import ./ffi_types, ./cbor_serial -# --------------------------------------------------------------------------- -# Wire envelope -# --------------------------------------------------------------------------- type EventEnvelope*[T] = object ## Standard wire shape for CBOR-encoded FFI events: @@ -32,9 +18,6 @@ type EventEnvelope*[T] = object eventType*: string payload*: T -# --------------------------------------------------------------------------- -# Registry types -# --------------------------------------------------------------------------- type FFIEventListener* = object @@ -50,9 +33,6 @@ type nextId*: uint64 ## Monotonic id source. 0 is reserved as "invalid"; ids start at 1. byEvent*: Table[string, seq[FFIEventListener]] -# --------------------------------------------------------------------------- -# Registry lifecycle and mutation -# --------------------------------------------------------------------------- proc initEventRegistry*(reg: var FFIEventRegistry) = ## Must be called exactly once on the owning thread before the registry @@ -99,7 +79,7 @@ proc addEventListener*( let listener = FFIEventListener(id: assigned, callback: callback, userData: userData) reg.byEvent.mgetOrPut(eventName, @[]).add(listener) - return assigned + assigned proc removeEventListener*(reg: var FFIEventRegistry, id: uint64): bool {.raises: [].} = ## Removes the listener with `id`. Returns true on success, false if no @@ -126,7 +106,7 @@ proc removeEventListener*(reg: var FFIEventRegistry, id: uint64): bool {.raises: break if prune: reg.byEvent.del(pruneKey) - return removed + removed proc removeAllEventListeners*(reg: var FFIEventRegistry) {.raises: [].} = ## Drops every registered listener. Does not reset the listener-id @@ -142,29 +122,120 @@ proc snapshotListeners*( ## makes re-entrant add/remove from inside a handler deadlock-free: ## dispatch holds the lock only for the duration of the copy, then ## iterates the copy outside the lock. - var snap: seq[FFIEventListener] = @[] + var listeners: seq[FFIEventListener] = @[] withLock reg.lock: - # `getOrDefault` returns an empty seq when the key is absent — - # avoids the raising `[]` operator path. + # `getOrDefault` avoids the raising `[]` path; returns empty when absent. for l in reg.byEvent.getOrDefault(eventName): - snap.add(l) - return snap + listeners.add(l) + listeners -# --------------------------------------------------------------------------- -# Dispatch templates (used by {.ffiEvent.}-generated procs) -# --------------------------------------------------------------------------- + +const EventQueueCapacity* = 1024 + ## ~24 KiB per context. Sustained backlog at this depth means a + ## listener is wedged — what the stuck flag exists to surface. + +type + QueuedEvent* = object + ## All fields are raw `c_malloc` pointers so the buffer survives + ## pool-slot reuse across thread heaps without an assignment dtor. + name*: cstring + data*: ptr UncheckedArray[byte] + dataLen*: int + + EventQueue* = object + ## SPSC ring: FFI thread enqueues, event thread dequeues. Plain lock + ## (no atomic indices) — operations are short and uncontended. + lock*: Lock + head*: int + tail*: int + count*: int + buf*: array[EventQueueCapacity, QueuedEvent] + +proc initEventQueue*(q: var EventQueue) {.raises: [].} = + ## Same single-owning-thread constraint as `initEventRegistry`. + q.lock.initLock() + q.head = 0 + q.tail = 0 + q.count = 0 + for i in 0 ..< EventQueueCapacity: + q.buf[i] = QueuedEvent(name: nil, data: nil, dataLen: 0) + +proc deinitEventQueue*(q: var EventQueue) {.raises: [].} = + ## Both producer and consumer must have stopped before calling. + for i in 0 ..< EventQueueCapacity: + let e = q.buf[i] + if not e.name.isNil: + c_free(cast[pointer](e.name)) + if not e.data.isNil: + c_free(e.data) + q.buf[i] = QueuedEvent(name: nil, data: nil, dataLen: 0) + q.head = 0 + q.tail = 0 + q.count = 0 + q.lock.deinitLock() + +proc tryEnqueueEvent*( + q: var EventQueue, name: cstring, data: ptr UncheckedArray[byte], dataLen: int +): bool {.raises: [], gcsafe.} = + ## Both `name` and `data` must be `c_malloc`'d; on success the queue + ## takes ownership. On false the caller still owns and must free them. + withLock q.lock: + if q.count >= EventQueueCapacity: + return false + q.buf[q.tail] = QueuedEvent(name: name, data: data, dataLen: dataLen) + q.tail = (q.tail + 1) mod EventQueueCapacity + q.count.inc() + true + +proc tryDequeueEvent*(q: var EventQueue): Option[QueuedEvent] {.raises: [], gcsafe.} = + ## Transfers buffer ownership to the caller, who must `c_free` both. + withLock q.lock: + if q.count == 0: + return none(QueuedEvent) + let e = q.buf[q.head] + q.buf[q.head] = QueuedEvent(name: nil, data: nil, dataLen: 0) + q.head = (q.head + 1) mod EventQueueCapacity + q.count.dec() + return some(e) + +proc eventQueueLen*(q: var EventQueue): int {.raises: [], gcsafe.} = + withLock q.lock: + return q.count + + +const emptyListenerPayload*: cstring = "" + ## Non-nil zero-length buffer handed to listeners when a payload is + ## empty, so a consumer doing `std::string(data, len)` / `memcpy` never + ## receives a nil pointer (which is UB even at len 0). + +proc notifyListeners*( + listeners: seq[FFIEventListener], retCode: cint, data: pointer, dataLen: int +) = + ## Fans out a payload to every listener in the snapshot. Empty payloads + ## are delivered as the non-nil `emptyListenerPayload` sentinel so a + ## consumer doing `std::string(data, len)` / `memcpy` never receives nil. + let n = max(dataLen, 0) + let dataPtr = + if n > 0 and not data.isNil(): cast[ptr cchar](data) + else: cast[ptr cchar](emptyListenerPayload) + for listener in listeners: + listener.callback(retCode, dataPtr, cast[csize_t](n), listener.userData) + +proc notifyListenersErr*(listeners: seq[FFIEventListener], msg: string) = + ## Error fan-out: adapts the message string to `notifyListeners`, which + ## supplies the non-nil pointer for the empty-message case. + let p = + if msg.len > 0: cast[pointer](unsafeAddr msg[0]) + else: cast[pointer](emptyListenerPayload) + notifyListeners(listeners, RET_ERR, p, msg.len) var ffiCurrentEventRegistry* {.threadvar.}: ptr FFIEventRegistry ## Set by the FFI thread at startup so dispatchFFIEvent / dispatchFFIEventCbor ## can find their registry without taking a context pointer per call site. -template withFFIEventDispatch( - eventName: string, listeners, body: untyped -) = - ## Shared scaffold for `dispatchFFIEvent` / `dispatchFFIEventCbor`: - ## resolves the thread-local registry, snapshots listeners under - ## `reg.lock` into the caller-named `listeners` binding, then runs - ## `body` inside `foreignThreadGc` + try/except. +template withFFIEventDispatch(eventName: string, listeners, body: untyped) = + ## Resolves the thread-local registry, snapshots listeners under + ## `reg.lock`, then runs `body` inside `foreignThreadGc` + try/except. let regPtr = ffiCurrentEventRegistry if regPtr.isNil(): chronicles.error eventName & " - event registry not set on this thread" @@ -179,53 +250,39 @@ template withFFIEventDispatch( try: body except Exception, CatchableError: - let msg = - "Exception dispatching " & eventName & ": " & getCurrentExceptionMsg() - for listener in listeners: - listener.callback( - RET_ERR, - cast[ptr cchar](unsafeAddr msg[0]), - cast[csize_t](len(msg)), - listener.userData, - ) + notifyListenersErr( + listeners, + "Exception dispatching " & eventName & ": " & getCurrentExceptionMsg(), + ) template dispatchFFIEvent*(eventName: string, body: untyped) = ## Dispatches an FFI event to every listener subscribed to `eventName`. ## `body` must yield a `string` or `seq[byte]`. ## - ## Valid only on the FFI thread (where `ffiCurrentEventRegistry` is - ## set). Holds `reg.lock` for the entire snapshot + invocation so a - ## concurrent `removeEventListener` from a foreign thread blocks until - ## dispatch returns — closes the UAF window in #40 / PR #39 review - ## #4356915554. Handlers must not call addEventListener / - ## removeEventListener on the same registry (would self-deadlock). + ## Valid only on the FFI thread (where `ffiCurrentEventRegistry` is set). + ## Holds `reg.lock` for the entire snapshot + invocation so a concurrent + ## `removeEventListener` from a foreign thread blocks until dispatch + ## returns. Handlers must not call addEventListener / removeEventListener + ## on the same registry (would self-deadlock). withFFIEventDispatch(eventName, listeners): let event = body - for listener in listeners: - listener.callback( - RET_OK, - cast[ptr cchar](unsafeAddr event[0]), - cast[csize_t](len(event)), - listener.userData, - ) + let dataPtr: pointer = + if event.len > 0: cast[pointer](unsafeAddr event[0]) + else: cast[pointer](emptyListenerPayload) + notifyListeners(listeners, RET_OK, dataPtr, event.len) template dispatchFFIEventCbor*(eventName: string, eventPayload: typed) = ## Typed CBOR variant of `dispatchFFIEvent`. Wraps `eventPayload` in an ## `EventEnvelope`, CBOR-encodes it into a `c_malloc` buffer once, and ## fans the same buffer out to every registered listener. ## - ## NB: the template parameter is intentionally named `eventPayload` - ## rather than `payload` — Nim's template substitution would otherwise - ## also replace the `payload:` field name inside `EventEnvelope`. + ## NB: parameter is `eventPayload`, not `payload` — Nim's template + ## substitution would otherwise also rewrite the `payload:` field inside + ## `EventEnvelope`. withFFIEventDispatch(eventName, listeners): var (data, dataLen) = cborEncodeShared( - EventEnvelope[typeof(eventPayload)]( - eventType: eventName, payload: eventPayload - ) + EventEnvelope[typeof(eventPayload)](eventType: eventName, payload: eventPayload) ) defer: cborFreeShared(data) - for listener in listeners: - listener.callback( - RET_OK, cast[ptr cchar](data), cast[csize_t](dataLen), listener.userData - ) + notifyListeners(listeners, RET_OK, data, dataLen) diff --git a/ffi/ffi_thread.nim b/ffi/ffi_thread.nim new file mode 100644 index 0000000..1589c4a --- /dev/null +++ b/ffi/ffi_thread.nim @@ -0,0 +1,166 @@ +## FFI-thread body and request submission API. +## +## Included from `ffi_context.nim` — inherits its imports, FFIContext type, +## and the `onFFIThread` threadvar. Companion to `event_thread.nim`. +## +## Responsibilities: +## - Receive `FFIThreadRequest`s from foreign threads via `reqChannel` and +## dispatch them through the user-registered handler table. +## - Advance `ctx.ffiHeartbeat` each loop iteration so the event thread can +## detect a wedged FFI thread. + +proc sendRequestToFFIThread*( + ctx: ptr FFIContext, ffiRequest: ptr FFIThreadRequest, timeout = InfiniteDuration +): Result[void, string] = + # Reentrancy guard: only this thread can fire `reqReceivedSignal`, so a handler dispatching back would self-deadlock. + if onFFIThread: + deleteRequest(ffiRequest) + return err( + "reentrant ffi call: a handler invoked sendRequestToFFIThread on its own context" + ) + + # All async submissions serialise on `ctx.lock` for the full + # trySend + fireSync + waitSync sequence because `reqChannel` is + # single-producer and `reqReceivedSignal` is shared across callers. + # Multi-producer redesign is tracked as PR #23 review item 7. + ctx.lock.acquire() + defer: + ctx.lock.release() + + ## Sending the request + let sentOk = ctx.reqChannel.trySend(ffiRequest) + if not sentOk: + deleteRequest(ffiRequest) + return err("Couldn't send a request to the ffi thread") + + let fireSyncRes = ctx.reqSignal.fireSync() + if fireSyncRes.isErr(): + deleteRequest(ffiRequest) + return err("failed fireSync: " & $fireSyncRes.error) + + if fireSyncRes.get() == false: + deleteRequest(ffiRequest) + return err("Couldn't fireSync in time") + + ## wait until the FFI working thread properly received the request + let res = ctx.reqReceivedSignal.waitSync(timeout) + if res.isErr(): + ## Do not free ffiRequest here: the FFI thread was already signaled and + ## will process (and free) it. + return err("Couldn't receive reqReceivedSignal signal") + + ## Notice that in case of "ok", the deallocShared(req) is performed by the FFI Thread in the + ## process proc. + ok() + +proc processRequest[T]( + request: ptr FFIThreadRequest, ctx: ptr FFIContext[T] +) {.async.} = + ## Invoked within the FFI thread to process a request coming from the FFI API consumer thread. + + let reqId = $request[].reqId + ## The reqId determines which proc will handle the request. + ## The registeredRequests represents a table defined at compile time. + ## Then, registeredRequests == Table[reqId, proc-handling-the-request-asynchronously] + + ## Explicit conversion keeps `reqId` alive as the backing string, + ## avoiding the implicit string→cstring warning that will become an error. + let reqIdCs = reqId.cstring + + let retFut = + if not ctx[].registeredRequests[].contains(reqIdCs): + ## That shouldn't happen because only registered requests should be sent to the FFI thread. + nilProcess(request[].reqId) + else: + ctx[].registeredRequests[][reqIdCs](cast[pointer](request), ctx) + + ## Catch every catchable exception (including CancelledError raised by + ## the shutdown drain in ffiRun) so handleRes — and its `deleteRequest` + ## defer — always runs. Otherwise an abandoned in-flight handler would + ## leak its request envelope, reqId copy, and CBOR payload. + let res = + try: + await retFut + except CatchableError as exc: + Result[seq[byte], string].err( + "Error in processRequest for " & reqId & ": " & exc.msg + ) + + ## handleRes may raise (OOM, GC setup) even though it is rare. Catching here + ## keeps the async proc raises:[] compatible. The defer inside handleRes + ## guarantees request is freed before the exception propagates. + try: + handleRes(res, request) + except Exception as exc: + error "Unexpected exception in handleRes", error = exc.msg + +proc ffiThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} = + ## FFI thread body that attends library user API requests + ffiCurrentEventRegistry = addr ctx[].eventRegistry + onFFIThread = true + + logging.setupLog(logging.LogLevel.DEBUG, logging.LogFormat.TEXT) + + defer: + onFFIThread = false + # Unblocks destroyFFIContext's bounded wait so cleanup can proceed. + let fireRes = ctx.threadExitSignal.fireSync() + if fireRes.isErr(): + error "failed to fire threadExitSignal on FFI thread exit", err = fireRes.error + + let ffiRun = proc(ctx: ptr FFIContext[T]) {.async.} = + var ffiReqHandler: T + ## Holds the main library object, i.e., in charge of handling the ffi requests. + ## e.g., Waku, LibP2P, SDS, etc. + + ## In-flight processRequest futures. Tracked so they can be drained on + ## shutdown — otherwise destroying the context while a handler is + ## awaiting (e.g. sleepAsync) abandons the future and leaks the + ## request's envelope/reqId/payload allocations. + var pending: seq[Future[void]] = @[] + + proc reapCompleted() = + var i = 0 + while i < pending.len: + if pending[i].finished(): + pending.del(i) + else: + inc i + + while ctx.running.load(): + # Freezes if a sync handler blocks the dispatcher; event thread reads to detect wedged FFI thread. + discard ctx.ffiHeartbeat.fetchAdd(1) + + reapCompleted() + + let gotSignal = await ctx.reqSignal.wait().withTimeout(100.milliseconds) + if not gotSignal: + continue + + ## Wait for a request from the ffi consumer thread + var request: ptr FFIThreadRequest + if not ctx.reqChannel.tryRecv(request): + continue + + if ctx.myLib.isNil(): + ctx.myLib = addr ffiReqHandler + + ## Handle the request + pending.add processRequest(request, ctx) + + let fireRes = ctx.reqReceivedSignal.fireSync() + if fireRes.isErr(): + error "could not fireSync back to requester thread", error = fireRes.error + + ## Drain in-flight handlers so each request's `deleteRequest` runs + ## before we exit. Without this, abandoning a future mid-await would + ## leak the request allocations (visible to LSan; previously hidden + ## because Nim's pool allocator kept the chunks alive in the process). + reapCompleted() + if pending.len > 0: + try: + await allFutures(pending) + except CatchableError as exc: + error "draining pending FFI requests on shutdown raised", error = exc.msg + + waitFor ffiRun(ctx) diff --git a/ffi/internal/ffi_macro.nim b/ffi/internal/ffi_macro.nim index 4f9ffc5..5d3b605 100644 --- a/ffi/internal/ffi_macro.nim +++ b/ffi/internal/ffi_macro.nim @@ -7,9 +7,6 @@ when defined(ffiGenBindings): import ../codegen/cpp import ../codegen/cddl -# --------------------------------------------------------------------------- -# String helpers used by multiple macros -# --------------------------------------------------------------------------- proc isPtr(typ: NimNode): bool = ## True iff `typ` is a `ptr T` type expression — i.e. an `nnkPtrTy` AST node. @@ -600,9 +597,6 @@ macro ffiRaw*(prc: untyped): untyped = echo stmts.repr return stmts -# --------------------------------------------------------------------------- -# ffi macro — primary FFI proc / FFI type registration -# --------------------------------------------------------------------------- macro ffi*(prc: untyped): untyped = ## Simplified FFI macro — applies to procs or types. @@ -843,9 +837,6 @@ macro ffi*(prc: untyped): untyped = echo stmts.repr return stmts -# --------------------------------------------------------------------------- -# ffiCtor — constructor macro -# --------------------------------------------------------------------------- proc buildCtorRequestType( reqTypeName: NimNode, paramNames: seq[string], paramTypes: seq[NimNode] @@ -1257,9 +1248,6 @@ macro ffiCtor*(prc: untyped): untyped = echo stmts.repr return stmts -# --------------------------------------------------------------------------- -# ffiDtor — destructor macro -# --------------------------------------------------------------------------- macro ffiDtor*(prc: untyped): untyped = ## Defines a C-exported destructor that tears down the FFIContext after the @@ -1373,9 +1361,6 @@ macro ffiDtor*(prc: untyped): untyped = echo stmts.repr return stmts -# --------------------------------------------------------------------------- -# ffiEvent — library-initiated typed event -# --------------------------------------------------------------------------- macro ffiEvent*(wireName: static[string], prc: untyped): untyped = ## Declares a library-initiated event. The annotated proc has an empty @@ -1467,9 +1452,6 @@ macro ffiEvent*(wireName: static[string], prc: untyped): untyped = echo generated.repr return generated -# --------------------------------------------------------------------------- -# genBindings — codegen entry point -# --------------------------------------------------------------------------- macro genBindings*( outputDir: static[string] = ffiOutputDir, nimSrcRelPath: static[string] = ffiSrcPath diff --git a/tests/unit/test_event_dispatch.nim b/tests/unit/test_event_dispatch.nim index c5bd198..e896e11 100644 --- a/tests/unit/test_event_dispatch.nim +++ b/tests/unit/test_event_dispatch.nim @@ -64,7 +64,7 @@ proc callbackBytes(d: var CallbackData): seq[byte] = var bytes = newSeq[byte](d.msgLen) if d.msgLen > 0: copyMem(addr bytes[0], addr d.msg[0], d.msgLen) - return bytes + bytes ## A request that dispatches a typed CBOR event from inside the FFI ## thread and then returns ok — so the response callback can be used to @@ -243,9 +243,6 @@ when not defined(gcRefc): # actually landed so a silently-broken dispatch loop is caught. check evt.called -# --------------------------------------------------------------------------- -# Lock-during-invocation regression (issue #40 second concern) -# --------------------------------------------------------------------------- ## A foreign-thread mutation must not be able to invalidate the ## listener's `userData` while an in-flight dispatch is mid-invocation. @@ -310,3 +307,107 @@ suite "registry lock held during invocation": check st.exited.load() joinThread(thr) check done.load() + +suite "liveness events": + ## `onNotResponding` / `onResponding` bypass the event queue and dispatch + ## directly to listeners — the queue itself may be wedged behind the same + ## stall they're signalling. These tests pin down the wire shape (event + ## name + CBOR-encoded `EventEnvelope[…]`) so a future refactor can't + ## silently break consumers polling for the "library hung" signal. + test "onNotResponding delivers EventEnvelope[NotRespondingEvent] to subscribers": + var pool: FFIContextPool[TestEvtLib] + let ctx = pool.createFFIContext().valueOr: + check false + return + defer: + discard pool.destroyFFIContext(ctx) + + var evt: CallbackData + initCallbackData(evt) + defer: + deinitCallbackData(evt) + + discard addEventListener( + ctx[].eventRegistry, NotRespondingEventName, captureCb, addr evt + ) + + onNotResponding(ctx) + + waitCallback(evt) + check evt.retCode == RET_OK + let decoded = + cborDecode(callbackBytes(evt), EventEnvelope[NotRespondingEvent]) + check decoded.isOk() + check decoded.value.eventType == NotRespondingEventName + + test "onResponding delivers EventEnvelope[RespondingEvent] to subscribers": + var pool: FFIContextPool[TestEvtLib] + let ctx = pool.createFFIContext().valueOr: + check false + return + defer: + discard pool.destroyFFIContext(ctx) + + var evt: CallbackData + initCallbackData(evt) + defer: + deinitCallbackData(evt) + + discard addEventListener( + ctx[].eventRegistry, RespondingEventName, captureCb, addr evt + ) + + onResponding(ctx) + + waitCallback(evt) + check evt.retCode == RET_OK + let decoded = cborDecode(callbackBytes(evt), EventEnvelope[RespondingEvent]) + check decoded.isOk() + check decoded.value.eventType == RespondingEventName + + test "liveness events with no subscriber are a no-op": + var pool: FFIContextPool[TestEvtLib] + let ctx = pool.createFFIContext().valueOr: + check false + return + defer: + discard pool.destroyFFIContext(ctx) + # No listener registered — must not crash, must not block. + onNotResponding(ctx) + onResponding(ctx) + +suite "event thread drains queued events": + ## The event thread wakes every `EventThreadTickInterval` (or on + ## `eventQueueSignal`, not exported) and drains `eventQueue` into the + ## registered listeners. This test pushes a c_malloc'd payload onto the + ## queue from the test thread and waits for the tick-driven drain to + ## deliver it — exercises the `tryEnqueueEvent` → `drainEventQueue` → + ## `dispatchQueuedEvent` → listener path end-to-end. + test "enqueued event is delivered to subscriber within a tick": + var pool: FFIContextPool[TestEvtLib] + let ctx = pool.createFFIContext().valueOr: + check false + return + defer: + discard pool.destroyFFIContext(ctx) + + var evt: CallbackData + initCallbackData(evt) + defer: + deinitCallbackData(evt) + + const QueuedEvtName = "queued_evt" + discard addEventListener( + ctx[].eventRegistry, QueuedEvtName, captureCb, addr evt + ) + + # `tryEnqueueEvent` takes ownership of both buffers on success; the + # event thread c_frees them after dispatch returns. + let nameBuf = alloc(QueuedEvtName) + let payload = @[byte 0xDE, 0xAD, 0xBE, 0xEF] + var shared = allocSharedSeq(payload) + check tryEnqueueEvent(ctx[].eventQueue, nameBuf, shared.data, shared.len) + + waitCallback(evt) + check evt.retCode == RET_OK + check callbackBytes(evt) == payload diff --git a/tests/unit/test_event_listener.nim b/tests/unit/test_event_listener.nim index 620c385..4cf15aa 100644 --- a/tests/unit/test_event_listener.nim +++ b/tests/unit/test_event_listener.nim @@ -10,12 +10,10 @@ import std/locks import unittest2 import ffi -# --------------------------------------------------------------------------- # Tiny helpers — a thread-safe sink each listener writes into so we can # assert which callbacks would fire and in what order once dispatch lands. # Today only `tagCb`'s presence is exercised; the recorder is also used to # make sure listener bookkeeping doesn't accidentally invoke callbacks. -# --------------------------------------------------------------------------- type Recorder = object lock: Lock @@ -50,9 +48,6 @@ proc tagCb( copyMem(addr payload[0], msg, int(len)) record(t[].rec[], t[].name, retCode, payload) -# --------------------------------------------------------------------------- -# Tests -# --------------------------------------------------------------------------- suite "FFIEventRegistry mutation": test "addEventListener assigns monotonically increasing non-zero ids": diff --git a/tests/unit/test_ffi_context.nim b/tests/unit/test_ffi_context.nim index f33f760..ec28ccb 100644 --- a/tests/unit/test_ffi_context.nim +++ b/tests/unit/test_ffi_context.nim @@ -324,9 +324,6 @@ suite "sendRequestToFFIThread": check d.retCode == RET_OK check cborDecode(callbackBytes(d), string).value == "pong:" & msg -# --------------------------------------------------------------------------- -# ffiCtor / .ffi. macros — exercise the full CBOR transport -# --------------------------------------------------------------------------- type SimpleLib = object value: int @@ -375,9 +372,6 @@ suite "ffiCtor macro": check SimpleLibFFIPool.destroyFFIContext(ctx).isOk() -# --------------------------------------------------------------------------- -# Simplified .ffi. macro integration test -# --------------------------------------------------------------------------- type SendConfig {.ffi.} = object message: string @@ -468,10 +462,8 @@ suite "sync-body .ffi. is dispatched on FFI thread": check d2.retCode == RET_OK check cborDecode(callbackBytes(d2), string).value == "v3" -# --------------------------------------------------------------------------- # Nim-native API (no callbacks, no CBOR buffers): the original proc name # resolves to the user's declared async signature and is callable directly. -# --------------------------------------------------------------------------- suite "Nim-native .ffi. / .ffiCtor. API": test "user proc names retain their declared Future[Result[T,string]] shape": @@ -492,14 +484,12 @@ suite "Nim-native .ffi. / .ffiCtor. API": check ctorRes.isOk check ctorRes.value.value == 21 -# --------------------------------------------------------------------------- # Regression for PR #23 review items 1–5: a `.ffi.` body without `await` # used to be emitted as an inline-on-foreign-thread fast path, which bypassed # `foreignThreadGc`, `ctx.lock`, and chronos's single-thread invariant. The # sync fast-path was deleted; this test records `getThreadId()` inside a # sync body and asserts the handler runs on the FFI thread, not on the # caller's thread. -# --------------------------------------------------------------------------- var gRecordedHandlerTid: Atomic[int] @@ -558,13 +548,11 @@ suite "sync-body .ffi. runs on FFI thread (PR #23 regression)": # And the callback payload (the recorded tid) matches what the handler stored. check cborDecode(callbackBytes(d), int).value == handlerTid -# --------------------------------------------------------------------------- # Regression for PR #23 review item 6: reentrancy guard on # sendRequestToFFIThread. A handler running on the FFI thread that tries to # dispatch back through sendRequestToFFIThread used to self-deadlock waiting # on `reqReceivedSignal` (which only the FFI thread can fire). The guard now # returns an Err immediately. -# --------------------------------------------------------------------------- var gReentrantNestedRes: Channel[string] gReentrantNestedRes.open() diff --git a/tests/unit/test_serial.nim b/tests/unit/test_serial.nim index 6061028..883df75 100644 --- a/tests/unit/test_serial.nim +++ b/tests/unit/test_serial.nim @@ -201,12 +201,10 @@ suite "CBOR error handling": let res = cborDecode(truncated, string) check res.isErr -# --------------------------------------------------------------------------- # Regression for PR #23 review item 9: cborEncodeShared writes directly into # a c_malloc buffer, letting the FFI thread request take ownership without # an intermediate seq[byte] copy. The shared-encoder must produce # byte-for-byte the same output as the seq-encoder. -# --------------------------------------------------------------------------- import system/ansi_c