diff --git a/examples/timer/rust_bindings/examples/main.rs b/examples/timer/rust_bindings/examples/main.rs index b951369..132b809 100644 --- a/examples/timer/rust_bindings/examples/main.rs +++ b/examples/timer/rust_bindings/examples/main.rs @@ -1,10 +1,8 @@ -//! Synchronous example: exercises the library-event listener API -//! (typed + wildcard + remove). +//! Synchronous example: exercises the typed per-event listener API. //! //! Run with: `cargo run --example main` -use my_timer::{decode_event_payload, EchoEvent, EchoRequest, MyTimerCtx, TimerConfig}; -use std::os::raw::c_int; +use my_timer::{EchoEvent, EchoRequest, MyTimerCtx, TimerConfig}; use std::sync::mpsc; use std::time::Duration; @@ -14,32 +12,12 @@ fn main() -> Result<(), String> { Duration::from_secs(5), )?; - // Typed listener: the closure is invoked on the lib's dispatch - // thread, so forward the payload to `main` via std mpsc and block - // on `recv_timeout` below. `add_on_echo_fired_listener` is generated - // per `{.ffiEvent.}`-declared proc and takes a typed `&EchoEvent`. + // Closure runs on the lib's dispatch thread; forward to `main` via mpsc and recv_timeout below. let (tx, rx) = mpsc::channel::(); let typed_handle = ctx.add_on_echo_fired_listener(move |evt: &EchoEvent| { let _ = tx.send(evt.clone()); }); - // Wildcard listener: receives every event with the FFI return code, - // the wire `event_id` pre-extracted from the CBOR envelope, and the - // raw envelope bytes. Lift to a typed payload via - // `decode_event_payload::` when the event_id matches one you - // care about — this avoids hand-rolling ciborium calls per branch. - let wildcard_handle = ctx.add_event_listener(|ret: c_int, event_id: &str, envelope: &[u8]| { - println!("wildcard: ret={}, event_id={}, bytes={}", ret, event_id, envelope.len()); - if ret == 0 && event_id == "on_echo_fired" { - match decode_event_payload::(envelope) { - Ok(evt) => println!(" decoded: message={}, echo_count={}", evt.message, evt.echo_count), - Err(e) => println!(" decode failed: {}", e), - } - } - }); - - // Trigger the event — fires `on_echo_fired` once, which the - // dispatch thread delivers to both listeners above. ctx.echo(EchoRequest { message: "sync-event-demo".into(), delay_ms: 1 })?; match rx.recv_timeout(Duration::from_secs(2)) { @@ -48,6 +26,5 @@ fn main() -> Result<(), String> { } ctx.remove_event_listener(typed_handle); - ctx.remove_event_listener(wildcard_handle); Ok(()) } diff --git a/examples/timer/rust_bindings/examples/tokio_main.rs b/examples/timer/rust_bindings/examples/tokio_main.rs index 8ab1d16..feff8ea 100644 --- a/examples/timer/rust_bindings/examples/tokio_main.rs +++ b/examples/timer/rust_bindings/examples/tokio_main.rs @@ -1,11 +1,9 @@ -//! Tokio (async) example: same shape as `main.rs` but exercises the -//! async `_async` API and bridges library events into a tokio-aware -//! channel for async consumption. +//! Tokio (async) example: same shape as `main.rs` but exercises the async `_async` API +//! and bridges library events into a tokio mpsc for async consumption. //! //! Run with: `cargo run --example tokio_main` -use my_timer::{decode_event_payload, EchoEvent, EchoRequest, MyTimerCtx, TimerConfig}; -use std::os::raw::c_int; +use my_timer::{EchoEvent, EchoRequest, MyTimerCtx, TimerConfig}; use std::time::Duration; use tokio::sync::mpsc; @@ -17,37 +15,15 @@ async fn main() -> Result<(), String> { ) .await?; - // Typed listener: the handler fires on the lib's dispatch thread, - // which is *outside* the tokio runtime. Forwarding through a tokio - // `unbounded_channel` (Sender is Send + Sync, non-blocking) hands - // the event over to the runtime so we can `.await` it below. + // Handler fires on the lib's dispatch thread (outside the tokio runtime); forward via tokio mpsc to await it below. let (typed_tx, mut typed_rx) = mpsc::unbounded_channel::(); let typed_handle = ctx.add_on_echo_fired_listener(move |evt: &EchoEvent| { let _ = typed_tx.send(evt.clone()); }); - // Wildcard listener: receives every event with the FFI return code, - // the wire `event_id` pre-extracted from the CBOR envelope, and the - // raw envelope bytes. Lift to a typed payload via - // `decode_event_payload::` when the event_id matches one you - // care about — this avoids hand-rolling ciborium calls per branch. - let wildcard_handle = ctx.add_event_listener(|ret: c_int, event_id: &str, envelope: &[u8]| { - println!("wildcard: ret={}, event_id={}, bytes={}", ret, event_id, envelope.len()); - if ret == 0 && event_id == "on_echo_fired" { - match decode_event_payload::(envelope) { - Ok(evt) => println!(" decoded: message={}, echo_count={}", evt.message, evt.echo_count), - Err(e) => println!(" decode failed: {}", e), - } - } - }); - - // Trigger an echo via the async API — fires `on_echo_fired` once, - // which the dispatch thread delivers to both listeners above. ctx.echo_async(EchoRequest { message: "async-event-demo".into(), delay_ms: 1 }) .await?; - // Await the typed event with a bounded timeout so a missing event - // surfaces as an error instead of hanging the example forever. let evt = tokio::time::timeout(Duration::from_secs(2), typed_rx.recv()) .await .map_err(|_| "event never arrived".to_string())? @@ -55,6 +31,5 @@ async fn main() -> Result<(), String> { println!("typed onEchoFired: message={}, echo_count={}", evt.message, evt.echo_count); ctx.remove_event_listener(typed_handle); - ctx.remove_event_listener(wildcard_handle); Ok(()) } diff --git a/ffi/cbor_serial.nim b/ffi/cbor_serial.nim index 5c1ae8f..0fc87ce 100644 --- a/ffi/cbor_serial.nim +++ b/ffi/cbor_serial.nim @@ -35,9 +35,6 @@ export cbor_serialization, options, results const CborNullByte*: byte = 0xf6'u8 ## CBOR encoding of `null` — used as the wire sentinel for empty OK payloads. -# --------------------------------------------------------------------------- -# Public API -# --------------------------------------------------------------------------- proc cborEncode*[T](x: T): seq[byte] = ## CBOR-encode any cbor_serialization-supported type (plus `pointer` / `ptr T` diff --git a/ffi/codegen/cpp.nim b/ffi/codegen/cpp.nim index 3401a83..f335c13 100644 --- a/ffi/codegen/cpp.nim +++ b/ffi/codegen/cpp.nim @@ -23,10 +23,8 @@ const proc genericInnerType(typeName, prefix: string): string = if typeName.startsWith(prefix) and typeName.endsWith("]"): - let start = prefix.len - let lastIndex = typeName.len - 2 - return typeName[start .. lastIndex] - return "" + return typeName[prefix.len .. typeName.len - 2] + "" proc nimTypeToCpp*(typeName: string): string = let trimmed = typeName.strip() @@ -56,7 +54,7 @@ proc stripLibPrefixCpp(procName, libName: string): string = let prefix = libName & "_" if procName.startsWith(prefix): return procName[prefix.len .. ^1] - return procName + procName proc reqStructName(p: FFIProcMeta): string = let camel = snakeToPascalCase(p.procName) @@ -134,7 +132,7 @@ proc cppBracedInit(structName: string, fieldNames: seq[string]): string = ## ## Empty `fieldNames` collapses cleanly because `join` on an empty seq ## returns "", so the result is the well-formed empty-init `Name{}`. - return structName & "{" & fieldNames.join(", ") & "}" + structName & "{" & fieldNames.join(", ") & "}" proc emitEventDispatcher( lines: var seq[string], ctxTypeName, libName: string, events: seq[FFIEventMeta] @@ -575,11 +573,11 @@ proc generateCppHeader*( lines.add("};") lines.add("") - return lines.join("\n") + lines.join("\n") proc generateCppCMakeLists*(libName: string, nimSrcRelPath: string): string = let src = nimSrcRelPath.replace("\\", "/") - return CMakeListsTpl.multiReplace(("{{LIB}}", libName), ("{{SRC}}", src)) + CMakeListsTpl.multiReplace(("{{LIB}}", libName), ("{{SRC}}", src)) proc generateCppBindings*( procs: seq[FFIProcMeta], diff --git a/ffi/codegen/rust.nim b/ffi/codegen/rust.nim index 6972954..39972b8 100644 --- a/ffi/codegen/rust.nim +++ b/ffi/codegen/rust.nim @@ -45,7 +45,7 @@ proc deriveLibName*(procs: seq[FFIProcMeta]): string = let parts = first.split('_') if parts.len > 0: return parts[0] - return "unknown" + "unknown" proc stripLibPrefix*(procName: string, libName: string): string = ## Strips the library prefix from a proc name. @@ -53,7 +53,7 @@ proc stripLibPrefix*(procName: string, libName: string): string = let prefix = libName & "_" if procName.startsWith(prefix): return procName[prefix.len .. ^1] - return procName + procName proc reqStructName(p: FFIProcMeta): string = ## Mirrors the Nim macro: Req or CtorReq for ctors. @@ -63,9 +63,6 @@ proc reqStructName(p: FFIProcMeta): string = else: camel & "Req" -# --------------------------------------------------------------------------- -# File generators -# --------------------------------------------------------------------------- proc generateCargoToml*(libName: string): string = # `flume` is the unified callback channel (PR #23 Rust review, item 8): one @@ -149,7 +146,7 @@ fn main() { [escapedSrc, libName] proc generateLibRs*(): string = - return """mod ffi; + """mod ffi; mod types; mod api; pub use types::*; @@ -224,7 +221,7 @@ proc generateFFIRs*(procs: seq[FFIProcMeta]): string = ) lines.add("}") - return lines.join("\n") & "\n" + lines.join("\n") & "\n" proc generateTypesRs*(types: seq[FFITypeMeta], procs: seq[FFIProcMeta]): string = ## Generates types.rs with Rust structs for all user-declared FFI types and @@ -270,7 +267,7 @@ proc generateTypesRs*(types: seq[FFITypeMeta], procs: seq[FFIProcMeta]): string lines.add("}") lines.add("") - return lines.join("\n") + lines.join("\n") proc generateApiRs*( procs: seq[FFIProcMeta], libName: string, events: seq[FFIEventMeta] = @[] @@ -783,7 +780,7 @@ proc generateApiRs*( lines.add("") lines.add("}") - return lines.join("\n") & "\n" + lines.join("\n") & "\n" proc generateRustCrate*( procs: seq[FFIProcMeta], diff --git a/ffi/ffi_context.nim b/ffi/ffi_context.nim index b2114dc..f9284d7 100644 --- a/ffi/ffi_context.nim +++ b/ffi/ffi_context.nim @@ -26,17 +26,13 @@ type FFIContext*[T] = object reqReceivedSignal: ThreadSignalPtr # to signal main thread, interfacing with the FFI thread, that FFI thread received the request stopSignal: ThreadSignalPtr - threadExitSignal: ThreadSignalPtr - # fired by ffiThread before exit; bounds destroyFFIContext's wait so - # a blocked event loop cannot hang the caller + threadExitSignal: ThreadSignalPtr # bounds destroyFFIContext's wait so a blocked loop cannot hang the caller eventQueueSignal: ThreadSignalPtr # wakes the event thread on enqueue eventThreadExitSignal: ThreadSignalPtr # mirrors threadExitSignal for the event thread userData*: pointer eventRegistry*: FFIEventRegistry eventQueue*: EventQueue - ffiHeartbeat*: Atomic[int64] - # advanced by the FFI thread each loop iteration; event thread reads it - # for liveness + ffiHeartbeat*: Atomic[int64] # advanced each FFI-thread loop; event thread reads for liveness eventQueueStuck*: Atomic[bool] # sticky overflow flag; recovery is destroy+recreate running: Atomic[bool] # To control when the threads are running registeredRequests: ptr Table[cstring, FFIRequestProc] @@ -58,53 +54,53 @@ type NotRespondingEvent* = object const NotRespondingEventName* = "not_responding" -proc onNotResponding*(ctx: ptr FFIContext) = - ## Bypasses the event queue (which may itself be wedged). Cannot reuse - ## `dispatchFFIEventCbor`: that template reads `ffiCurrentEventRegistry`, - ## a threadvar only set on the FFI thread, but this runs on the event thread. +proc encodeNotRespondingEvent(): seq[byte] = + EventEnvelope[NotRespondingEvent]( + eventType: NotRespondingEventName, payload: NotRespondingEvent() + ).cborEncode() + +proc dispatchToListeners[T]( + ctx: ptr FFIContext[T], eventName: string, data: pointer, dataLen: int +) = + ## Holds reg.lock for the entire snapshot + invocation so concurrent + ## add/remove on this registry blocks until dispatch returns. withLock ctx[].eventRegistry.lock: - let snap = ctx[].eventRegistry.byEvent.getOrDefault(NotRespondingEventName) - if snap.len == 0: - chronicles.debug "onNotResponding - no listener registered" + let listeners = ctx[].eventRegistry.byEvent.getOrDefault(eventName) + if listeners.len == 0: + chronicles.debug "no listener registered", event = eventName return foreignThreadGc: try: - let event = cborEncode( - EventEnvelope[NotRespondingEvent]( - eventType: NotRespondingEventName, payload: NotRespondingEvent() - ) - ) - for listener in snap: - listener.callback( - RET_OK, - cast[ptr cchar](unsafeAddr event[0]), - cast[csize_t](event.len), - listener.userData, - ) + notifyListenersOk(listeners, data, dataLen) except Exception, CatchableError: - let msg = - "Exception dispatching " & NotRespondingEventName & ": " & - getCurrentExceptionMsg() - for listener in snap: - listener.callback( - RET_ERR, - cast[ptr cchar](unsafeAddr msg[0]), - cast[csize_t](msg.len), - listener.userData, - ) + notifyListenersErr( + listeners, + "Exception dispatching " & eventName & ": " & getCurrentExceptionMsg(), + ) + +proc onNotResponding*(ctx: ptr FFIContext) = + ## Bypasses the event queue (which may itself be wedged) and dispatches + ## directly to listeners. Runs on the event thread. + let event = + try: + encodeNotRespondingEvent() + except CatchableError as exc: + chronicles.error "onNotResponding - encode failed", err = exc.msg + return + let dataPtr: pointer = + if event.len > 0: unsafeAddr event[0] + else: nil + ctx.dispatchToListeners(NotRespondingEventName, dataPtr, event.len) proc sendRequestToFFIThread*( ctx: ptr FFIContext, ffiRequest: ptr FFIThreadRequest, timeout = InfiniteDuration ): Result[void, string] = - # Once the event queue overflows, refuse further requests. onNotResponding - # is fired by the event thread (not here) to avoid deadlocking against a - # back-pressuring listener that holds reg.lock. + # Event-queue overflow refuses further requests; the event thread fires onNotResponding to avoid deadlocking on reg.lock here. if ctx.eventQueueStuck.load(): deleteRequest(ffiRequest) return err("event queue stuck - library cannot accept new requests") - # Reentrancy guard: a handler dispatching back through this proc would - # wait forever on `reqReceivedSignal` — only this thread can fire it. + # Reentrancy guard: only this thread can fire `reqReceivedSignal`, so a handler dispatching back would self-deadlock. if onFFIThread: deleteRequest(ffiRequest) return err( @@ -143,7 +139,7 @@ proc sendRequestToFFIThread*( ## Notice that in case of "ok", the deallocShared(req) is performed by the FFI Thread in the ## process proc. - return ok() + ok() proc processRequest[T]( request: ptr FFIThreadRequest, ctx: ptr FFIContext[T] @@ -208,8 +204,7 @@ proc ffiThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} = defer: onFFIThread = false - # Signal destroyFFIContext that this thread has exited, so its bounded - # wait can unblock and proceed with cleanup. + # Unblocks destroyFFIContext's bounded wait so cleanup can proceed. let fireRes = ctx.threadExitSignal.fireSync() if fireRes.isErr(): error "failed to fire threadExitSignal on FFI thread exit", err = fireRes.error @@ -234,8 +229,7 @@ proc ffiThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} = inc i while ctx.running.load(): - # A sync handler blocking the dispatcher freezes this counter; the - # event thread reads it to detect a wedged FFI thread. + # Freezes if a sync handler blocks the dispatcher; event thread reads to detect wedged FFI thread. discard ctx.ffiHeartbeat.fetchAdd(1) reapCompleted() @@ -272,87 +266,82 @@ proc ffiThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} = waitFor ffiRun(ctx) +proc freeQueuedEventPayload(qe: QueuedEvent) = + if not qe.name.isNil: + c_free(cast[pointer](qe.name)) + if not qe.data.isNil: + c_free(qe.data) + +proc dispatchQueuedEvent[T](ctx: ptr FFIContext[T], qe: QueuedEvent) = + ## Frees `qe`'s c_malloc buffers on exit. + defer: + freeQueuedEventPayload(qe) + ctx.dispatchToListeners($qe.name, qe.data, qe.dataLen) + +proc drainEventQueue[T](ctx: ptr FFIContext[T]) = + while true: + let opt = ctx.eventQueue.tryDequeueEvent() + if opt.isNone: + break + ctx.dispatchQueuedEvent(opt.get()) + +type HeartbeatMonitor = object + startedAt: Moment + lastChange: Moment + lastValue: int64 + notifiedStale: bool + +proc initHeartbeatMonitor[T](ctx: ptr FFIContext[T]): HeartbeatMonitor = + let now = Moment.now() + HeartbeatMonitor( + startedAt: now, + lastChange: now, + lastValue: ctx.ffiHeartbeat.load(), + notifiedStale: false, + ) + +proc check[T](hb: var HeartbeatMonitor, ctx: ptr FFIContext[T]) = + ## Fires onNotResponding once the FFI thread's heartbeat counter stops + ## advancing past the stale threshold. Latches until it moves again. + if Moment.now() - hb.startedAt <= FFIHeartbeatStartDelay: + return + let cur = ctx.ffiHeartbeat.load() + if cur != hb.lastValue: + hb.lastValue = cur + hb.lastChange = Moment.now() + hb.notifiedStale = false + elif not hb.notifiedStale and + Moment.now() - hb.lastChange > FFIHeartbeatStaleThreshold: + onNotResponding(ctx) + hb.notifiedStale = true + +proc eventRun[T](ctx: ptr FFIContext[T]) {.async.} = + var hb = initHeartbeatMonitor(ctx) + var notifiedStuck = false + + while ctx.running.load(): + # Wake on enqueue or tick — whichever first. + discard await ctx.eventQueueSignal.wait().withTimeout(EventThreadTickInterval) + + ctx.drainEventQueue() + + # Fires here (after drain releases reg.lock) — from the FFI thread it'd deadlock on a back-pressuring listener. + if not notifiedStuck and ctx.eventQueueStuck.load(): + onNotResponding(ctx) + notifiedStuck = true + + if not ctx.running.load(): + break + hb.check(ctx) + proc eventThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} = ## Drains the event queue and runs the FFI-thread heartbeat check. ## Owns the queued `c_malloc` payloads until dispatch returns. - defer: let fireRes = ctx.eventThreadExitSignal.fireSync() if fireRes.isErr(): error "failed to fire eventThreadExitSignal", err = fireRes.error - let eventRun = proc(ctx: ptr FFIContext[T]) {.async.} = - let startedAt = Moment.now() - var lastHeartbeat = ctx.ffiHeartbeat.load() - var lastHeartbeatChange = Moment.now() - var notifiedStale = false - var notifiedStuck = false - - while ctx.running.load(): - # Wake on enqueue or tick — whichever first. - discard await ctx.eventQueueSignal.wait().withTimeout(EventThreadTickInterval) - - # Listener fan-out runs under reg.lock — preserves the - # lock-during-invocation contract from PR #39 / issue #40. - while true: - let opt = ctx.eventQueue.tryDequeueEvent() - if opt.isNone: - break - let qe = opt.get() - defer: - if not qe.name.isNil: - c_free(cast[pointer](qe.name)) - if not qe.data.isNil: - c_free(qe.data) - - withLock ctx[].eventRegistry.lock: - let snap = ctx[].eventRegistry.byEvent.getOrDefault($qe.name) - if snap.len == 0: - chronicles.debug "event has no listeners", event = $qe.name - else: - foreignThreadGc: - try: - for listener in snap: - listener.callback( - RET_OK, - cast[ptr cchar](qe.data), - cast[csize_t](qe.dataLen), - listener.userData, - ) - except Exception, CatchableError: - let msg = - "Exception dispatching " & $qe.name & ": " & getCurrentExceptionMsg() - for listener in snap: - listener.callback( - RET_ERR, - cast[ptr cchar](unsafeAddr msg[0]), - cast[csize_t](msg.len), - listener.userData, - ) - - # Overflow notification fires here (after drain releases reg.lock) - # rather than from the FFI thread, which would deadlock against an - # in-flight back-pressuring listener. - if not notifiedStuck and ctx.eventQueueStuck.load(): - onNotResponding(ctx) - notifiedStuck = true - - if not ctx.running.load(): - break - if Moment.now() - startedAt <= FFIHeartbeatStartDelay: - continue - - let cur = ctx.ffiHeartbeat.load() - if cur != lastHeartbeat: - lastHeartbeat = cur - lastHeartbeatChange = Moment.now() - notifiedStale = false - elif not notifiedStale and - Moment.now() - lastHeartbeatChange > FFIHeartbeatStaleThreshold: - # Latch until the FFI thread proves it's alive again. - onNotResponding(ctx) - notifiedStale = true - try: waitFor eventRun(ctx) except CatchableError as exc: @@ -406,20 +395,19 @@ proc deinitContextResources*[T](ctx: ptr FFIContext[T]): Result[void, string] = if not ctx.eventThreadExitSignal.isNil(): ?ctx.eventThreadExitSignal.close() ctx.eventThreadExitSignal = nil - return ok() + ok() proc cleanUpResources[T](ctx: ptr FFIContext[T]): Result[void, string] = ## Full cleanup for heap-allocated contexts: closes all resources and frees memory. defer: freeShared(ctx) - return ctx.deinitContextResources() + ctx.deinitContextResources() proc initContextResources*[T](ctx: ptr FFIContext[T]): Result[void, string] = ## Initialises all resources inside an already-allocated FFIContext slot. ## On failure every partially-initialised resource is closed; the caller ## is responsible for releasing the slot (freeShared or pool.releaseSlot). - # Defensive nil so the deferred cleanup never double-closes if a future - # path forgets to clear stale pointers on a reused pool slot. + # Defensive nil: deferred cleanup must never double-close stale pointers on a reused pool slot. ctx.reqSignal = nil ctx.reqReceivedSignal = nil ctx.stopSignal = nil @@ -480,7 +468,7 @@ proc initContextResources*[T](ctx: ptr FFIContext[T]): Result[void, string] = return err("failed to create the event thread: " & getCurrentExceptionMsg()) success = true - return ok() + ok() proc signalStop*[T](ctx: ptr FFIContext[T]): Result[void, string] = # Error paths intentionally skip onNotResponding: a back-pressuring @@ -501,7 +489,7 @@ proc signalStop*[T](ctx: ptr FFIContext[T]): Result[void, string] = error "failed to signal eventQueueSignal in signalStop", error = evtSignaled.error elif evtSignaled.get() == false: error "failed to signal eventQueueSignal on time in signalStop" - return ok() + ok() ## If the FFI thread's event loop is blocked by a synchronous handler ## (e.g. blocking I/O), it cannot process reqSignal in time to exit. @@ -535,7 +523,7 @@ proc stopAndJoinThreads*[T](ctx: ptr FFIContext[T]): Result[void, string] = return err("event thread did not exit in time; leaking ctx to avoid hang") joinThread(ctx.eventThread) - return ok() + ok() proc clearContext[T](ctx: ptr FFIContext[T]): Result[void, string] = ## Stops the FFI context that was created via createFFIContext[T]() (heap). @@ -543,4 +531,4 @@ proc clearContext[T](ctx: ptr FFIContext[T]): Result[void, string] = return err("clearContext: " & $error) ctx.cleanUpResources().isOkOr: return err("cleanUpResources failed: " & $error) - return ok() + ok() diff --git a/ffi/ffi_events.nim b/ffi/ffi_events.nim index 8e54c9c..96cc0cd 100644 --- a/ffi/ffi_events.nim +++ b/ffi/ffi_events.nim @@ -1,8 +1,7 @@ ## Event registry, bounded SPSC event queue, and dispatch templates for -## FFI library-initiated events. Empty event name `""` registers a -## wildcard listener that receives every dispatched event. Queue -## payloads travel via `c_malloc` so transfer across Nim heaps is safe -## under both `--mm:orc` and `--mm:refc`. +## FFI library-initiated events. Listeners receive only the event name +## they subscribed to. Queue payloads travel via `c_malloc` so transfer +## across Nim heaps is safe under both `--mm:orc` and `--mm:refc`. {.pragma: callback, cdecl, raises: [], gcsafe.} @@ -11,9 +10,6 @@ import std/[atomics, locks, sequtils, options, tables] import chronicles import ./ffi_types, ./cbor_serial -# --------------------------------------------------------------------------- -# Wire envelope -# --------------------------------------------------------------------------- type EventEnvelope*[T] = object ## Standard wire shape for CBOR-encoded FFI events: @@ -22,9 +18,6 @@ type EventEnvelope*[T] = object eventType*: string payload*: T -# --------------------------------------------------------------------------- -# Registry types -# --------------------------------------------------------------------------- type FFIEventListener* = object @@ -40,9 +33,6 @@ type nextId*: uint64 ## Monotonic id source. 0 is reserved as "invalid"; ids start at 1. byEvent*: Table[string, seq[FFIEventListener]] -# --------------------------------------------------------------------------- -# Registry lifecycle and mutation -# --------------------------------------------------------------------------- proc initEventRegistry*(reg: var FFIEventRegistry) = ## Must be called exactly once on the owning thread before the registry @@ -89,7 +79,7 @@ proc addEventListener*( let listener = FFIEventListener(id: assigned, callback: callback, userData: userData) reg.byEvent.mgetOrPut(eventName, @[]).add(listener) - return assigned + assigned proc removeEventListener*(reg: var FFIEventRegistry, id: uint64): bool {.raises: [].} = ## Removes the listener with `id`. Returns true on success, false if no @@ -116,7 +106,7 @@ proc removeEventListener*(reg: var FFIEventRegistry, id: uint64): bool {.raises: break if prune: reg.byEvent.del(pruneKey) - return removed + removed proc removeAllEventListeners*(reg: var FFIEventRegistry) {.raises: [].} = ## Drops every registered listener. Does not reset the listener-id @@ -132,17 +122,13 @@ proc snapshotListeners*( ## makes re-entrant add/remove from inside a handler deadlock-free: ## dispatch holds the lock only for the duration of the copy, then ## iterates the copy outside the lock. - var snap: seq[FFIEventListener] = @[] + var listeners: seq[FFIEventListener] = @[] withLock reg.lock: - # `getOrDefault` returns an empty seq when the key is absent — - # avoids the raising `[]` operator path. + # `getOrDefault` avoids the raising `[]` path; returns empty when absent. for l in reg.byEvent.getOrDefault(eventName): - snap.add(l) - return snap + listeners.add(l) + listeners -# --------------------------------------------------------------------------- -# Bounded event queue -# --------------------------------------------------------------------------- const EventQueueCapacity* = 1024 ## ~24 KiB per context. Sustained backlog at this depth means a @@ -199,7 +185,7 @@ proc tryEnqueueEvent*( q.buf[q.tail] = QueuedEvent(name: name, data: data, dataLen: dataLen) q.tail = (q.tail + 1) mod EventQueueCapacity q.count.inc() - return true + true proc tryDequeueEvent*(q: var EventQueue): Option[QueuedEvent] {.raises: [], gcsafe.} = ## Transfers buffer ownership to the caller, who must `c_free` both. @@ -216,9 +202,28 @@ proc eventQueueLen*(q: var EventQueue): int {.raises: [], gcsafe.} = withLock q.lock: return q.count -# --------------------------------------------------------------------------- -# Dispatch templates (used by {.ffiEvent.}-generated procs) -# --------------------------------------------------------------------------- + +proc notifyListenersOk*( + listeners: seq[FFIEventListener], data: pointer, dataLen: int +) = + ## Fans out a successful payload to every listener in the snapshot. + let dataPtr = + if dataLen > 0: cast[ptr cchar](data) + else: nil + for listener in listeners: + listener.callback( + RET_OK, dataPtr, cast[csize_t](dataLen), listener.userData + ) + +proc notifyListenersErr*(listeners: seq[FFIEventListener], msg: string) = + ## Fans out an error message to every listener in the snapshot. + let dataPtr = + if msg.len > 0: cast[ptr cchar](unsafeAddr msg[0]) + else: nil + for listener in listeners: + listener.callback( + RET_ERR, dataPtr, cast[csize_t](msg.len), listener.userData + ) var ffiCurrentEventRegistry* {.threadvar.}: ptr FFIEventRegistry var ffiCurrentEventQueue* {.threadvar.}: ptr EventQueue @@ -244,51 +249,39 @@ template withFFIEventDispatch(eventName: string, listeners, body: untyped) = try: body except Exception, CatchableError: - let msg = - "Exception dispatching " & eventName & ": " & getCurrentExceptionMsg() - for listener in listeners: - listener.callback( - RET_ERR, - cast[ptr cchar](unsafeAddr msg[0]), - cast[csize_t](len(msg)), - listener.userData, - ) + notifyListenersErr( + listeners, + "Exception dispatching " & eventName & ": " & getCurrentExceptionMsg(), + ) template dispatchFFIEvent*(eventName: string, body: untyped) = ## Dispatches an FFI event to every listener subscribed to `eventName`. ## `body` must yield a `string` or `seq[byte]`. ## - ## Valid only on the FFI thread (where `ffiCurrentEventRegistry` is - ## set). Holds `reg.lock` for the entire snapshot + invocation so a - ## concurrent `removeEventListener` from a foreign thread blocks until - ## dispatch returns — closes the UAF window in #40 / PR #39 review - ## #4356915554. Handlers must not call addEventListener / - ## removeEventListener on the same registry (would self-deadlock). + ## Valid only on the FFI thread (where `ffiCurrentEventRegistry` is set). + ## Holds `reg.lock` for the entire snapshot + invocation so a concurrent + ## `removeEventListener` from a foreign thread blocks until dispatch + ## returns. Handlers must not call addEventListener / removeEventListener + ## on the same registry (would self-deadlock). withFFIEventDispatch(eventName, listeners): let event = body - for listener in listeners: - listener.callback( - RET_OK, - cast[ptr cchar](unsafeAddr event[0]), - cast[csize_t](len(event)), - listener.userData, - ) + let dataPtr: pointer = + if event.len > 0: unsafeAddr event[0] + else: nil + notifyListenersOk(listeners, dataPtr, event.len) template dispatchFFIEventCbor*(eventName: string, eventPayload: typed) = ## Typed CBOR variant of `dispatchFFIEvent`. Wraps `eventPayload` in an ## `EventEnvelope`, CBOR-encodes it into a `c_malloc` buffer once, and ## fans the same buffer out to every registered listener. ## - ## NB: the template parameter is intentionally named `eventPayload` - ## rather than `payload` — Nim's template substitution would otherwise - ## also replace the `payload:` field name inside `EventEnvelope`. + ## NB: parameter is `eventPayload`, not `payload` — Nim's template + ## substitution would otherwise also rewrite the `payload:` field inside + ## `EventEnvelope`. withFFIEventDispatch(eventName, listeners): var (data, dataLen) = cborEncodeShared( EventEnvelope[typeof(eventPayload)](eventType: eventName, payload: eventPayload) ) defer: cborFreeShared(data) - for listener in listeners: - listener.callback( - RET_OK, cast[ptr cchar](data), cast[csize_t](dataLen), listener.userData - ) + notifyListenersOk(listeners, data, dataLen) diff --git a/ffi/internal/ffi_macro.nim b/ffi/internal/ffi_macro.nim index 4f9ffc5..5d3b605 100644 --- a/ffi/internal/ffi_macro.nim +++ b/ffi/internal/ffi_macro.nim @@ -7,9 +7,6 @@ when defined(ffiGenBindings): import ../codegen/cpp import ../codegen/cddl -# --------------------------------------------------------------------------- -# String helpers used by multiple macros -# --------------------------------------------------------------------------- proc isPtr(typ: NimNode): bool = ## True iff `typ` is a `ptr T` type expression — i.e. an `nnkPtrTy` AST node. @@ -600,9 +597,6 @@ macro ffiRaw*(prc: untyped): untyped = echo stmts.repr return stmts -# --------------------------------------------------------------------------- -# ffi macro — primary FFI proc / FFI type registration -# --------------------------------------------------------------------------- macro ffi*(prc: untyped): untyped = ## Simplified FFI macro — applies to procs or types. @@ -843,9 +837,6 @@ macro ffi*(prc: untyped): untyped = echo stmts.repr return stmts -# --------------------------------------------------------------------------- -# ffiCtor — constructor macro -# --------------------------------------------------------------------------- proc buildCtorRequestType( reqTypeName: NimNode, paramNames: seq[string], paramTypes: seq[NimNode] @@ -1257,9 +1248,6 @@ macro ffiCtor*(prc: untyped): untyped = echo stmts.repr return stmts -# --------------------------------------------------------------------------- -# ffiDtor — destructor macro -# --------------------------------------------------------------------------- macro ffiDtor*(prc: untyped): untyped = ## Defines a C-exported destructor that tears down the FFIContext after the @@ -1373,9 +1361,6 @@ macro ffiDtor*(prc: untyped): untyped = echo stmts.repr return stmts -# --------------------------------------------------------------------------- -# ffiEvent — library-initiated typed event -# --------------------------------------------------------------------------- macro ffiEvent*(wireName: static[string], prc: untyped): untyped = ## Declares a library-initiated event. The annotated proc has an empty @@ -1467,9 +1452,6 @@ macro ffiEvent*(wireName: static[string], prc: untyped): untyped = echo generated.repr return generated -# --------------------------------------------------------------------------- -# genBindings — codegen entry point -# --------------------------------------------------------------------------- macro genBindings*( outputDir: static[string] = ffiOutputDir, nimSrcRelPath: static[string] = ffiSrcPath diff --git a/tests/unit/test_event_dispatch.nim b/tests/unit/test_event_dispatch.nim index c5bd198..dfc2cac 100644 --- a/tests/unit/test_event_dispatch.nim +++ b/tests/unit/test_event_dispatch.nim @@ -64,7 +64,7 @@ proc callbackBytes(d: var CallbackData): seq[byte] = var bytes = newSeq[byte](d.msgLen) if d.msgLen > 0: copyMem(addr bytes[0], addr d.msg[0], d.msgLen) - return bytes + bytes ## A request that dispatches a typed CBOR event from inside the FFI ## thread and then returns ok — so the response callback can be used to @@ -243,9 +243,6 @@ when not defined(gcRefc): # actually landed so a silently-broken dispatch loop is caught. check evt.called -# --------------------------------------------------------------------------- -# Lock-during-invocation regression (issue #40 second concern) -# --------------------------------------------------------------------------- ## A foreign-thread mutation must not be able to invalidate the ## listener's `userData` while an in-flight dispatch is mid-invocation. diff --git a/tests/unit/test_event_listener.nim b/tests/unit/test_event_listener.nim index 620c385..4cf15aa 100644 --- a/tests/unit/test_event_listener.nim +++ b/tests/unit/test_event_listener.nim @@ -10,12 +10,10 @@ import std/locks import unittest2 import ffi -# --------------------------------------------------------------------------- # Tiny helpers — a thread-safe sink each listener writes into so we can # assert which callbacks would fire and in what order once dispatch lands. # Today only `tagCb`'s presence is exercised; the recorder is also used to # make sure listener bookkeeping doesn't accidentally invoke callbacks. -# --------------------------------------------------------------------------- type Recorder = object lock: Lock @@ -50,9 +48,6 @@ proc tagCb( copyMem(addr payload[0], msg, int(len)) record(t[].rec[], t[].name, retCode, payload) -# --------------------------------------------------------------------------- -# Tests -# --------------------------------------------------------------------------- suite "FFIEventRegistry mutation": test "addEventListener assigns monotonically increasing non-zero ids": diff --git a/tests/unit/test_ffi_context.nim b/tests/unit/test_ffi_context.nim index f33f760..ec28ccb 100644 --- a/tests/unit/test_ffi_context.nim +++ b/tests/unit/test_ffi_context.nim @@ -324,9 +324,6 @@ suite "sendRequestToFFIThread": check d.retCode == RET_OK check cborDecode(callbackBytes(d), string).value == "pong:" & msg -# --------------------------------------------------------------------------- -# ffiCtor / .ffi. macros — exercise the full CBOR transport -# --------------------------------------------------------------------------- type SimpleLib = object value: int @@ -375,9 +372,6 @@ suite "ffiCtor macro": check SimpleLibFFIPool.destroyFFIContext(ctx).isOk() -# --------------------------------------------------------------------------- -# Simplified .ffi. macro integration test -# --------------------------------------------------------------------------- type SendConfig {.ffi.} = object message: string @@ -468,10 +462,8 @@ suite "sync-body .ffi. is dispatched on FFI thread": check d2.retCode == RET_OK check cborDecode(callbackBytes(d2), string).value == "v3" -# --------------------------------------------------------------------------- # Nim-native API (no callbacks, no CBOR buffers): the original proc name # resolves to the user's declared async signature and is callable directly. -# --------------------------------------------------------------------------- suite "Nim-native .ffi. / .ffiCtor. API": test "user proc names retain their declared Future[Result[T,string]] shape": @@ -492,14 +484,12 @@ suite "Nim-native .ffi. / .ffiCtor. API": check ctorRes.isOk check ctorRes.value.value == 21 -# --------------------------------------------------------------------------- # Regression for PR #23 review items 1–5: a `.ffi.` body without `await` # used to be emitted as an inline-on-foreign-thread fast path, which bypassed # `foreignThreadGc`, `ctx.lock`, and chronos's single-thread invariant. The # sync fast-path was deleted; this test records `getThreadId()` inside a # sync body and asserts the handler runs on the FFI thread, not on the # caller's thread. -# --------------------------------------------------------------------------- var gRecordedHandlerTid: Atomic[int] @@ -558,13 +548,11 @@ suite "sync-body .ffi. runs on FFI thread (PR #23 regression)": # And the callback payload (the recorded tid) matches what the handler stored. check cborDecode(callbackBytes(d), int).value == handlerTid -# --------------------------------------------------------------------------- # Regression for PR #23 review item 6: reentrancy guard on # sendRequestToFFIThread. A handler running on the FFI thread that tries to # dispatch back through sendRequestToFFIThread used to self-deadlock waiting # on `reqReceivedSignal` (which only the FFI thread can fire). The guard now # returns an Err immediately. -# --------------------------------------------------------------------------- var gReentrantNestedRes: Channel[string] gReentrantNestedRes.open() diff --git a/tests/unit/test_serial.nim b/tests/unit/test_serial.nim index 6061028..883df75 100644 --- a/tests/unit/test_serial.nim +++ b/tests/unit/test_serial.nim @@ -201,12 +201,10 @@ suite "CBOR error handling": let res = cborDecode(truncated, string) check res.isErr -# --------------------------------------------------------------------------- # Regression for PR #23 review item 9: cborEncodeShared writes directly into # a c_malloc buffer, letting the FFI thread request take ownership without # an intermediate seq[byte] copy. The shared-encoder must produce # byte-for-byte the same output as the seq-encoder. -# --------------------------------------------------------------------------- import system/ansi_c