From 45d5414323e69a6c7f731c8169af354010977f2b Mon Sep 17 00:00:00 2001 From: Gabriel Cruz Date: Fri, 5 Jun 2026 14:20:01 -0300 Subject: [PATCH] chore: reduce comments, reuse code --- ffi/ffi_context.nim | 223 ++++++------------ ffi/ffi_context_pool.nim | 30 +-- ffi/ffi_events.nim | 154 +++++-------- tests/unit/test_event_dispatch.nim | 319 ++++++++++---------------- tests/unit/test_event_listener.nim | 101 +++----- tests/unit/test_event_thread.nim | 355 ++++++++++------------------- 6 files changed, 401 insertions(+), 781 deletions(-) diff --git a/ffi/ffi_context.nim b/ffi/ffi_context.nim index 126111b..7bb701a 100644 --- a/ffi/ffi_context.nim +++ b/ffi/ffi_context.nim @@ -1,48 +1,38 @@ {.passc: "-fPIC".} -import system/ansi_c import std/[atomics, locks, options, tables] import chronicles, chronos, chronos/threadsync, taskpools/channels_spsc_single, results -import - ./ffi_types, ./ffi_events, ./ffi_thread_request, ./internal/ffi_macro, ./logging, - ./cbor_serial +import ./ffi_types, ./ffi_events, ./ffi_thread_request, ./logging, ./cbor_serial export ffi_events type FFIContext*[T] = object - myLib*: ptr T - # main library object (e.g., Waku, LibP2P, SDS, the one to be exposed as a library) + myLib*: ptr T # main library object (Waku, LibP2P, SDS, …) ffiThread: Thread[(ptr FFIContext[T])] - # represents the main FFI thread in charge of attending API consumer actions eventThread: Thread[(ptr FFIContext[T])] - # drains the event queue and runs the FFI-thread heartbeat check lock: Lock reqChannel: ChannelSPSCSingle[ptr FFIThreadRequest] - reqSignal: ThreadSignalPtr # to notify the FFI Thread that a new request is sent + reqSignal: ThreadSignalPtr reqReceivedSignal: ThreadSignalPtr - # to signal main thread, interfacing with the FFI thread, that FFI thread received the request stopSignal: ThreadSignalPtr - threadExitSignal: ThreadSignalPtr # bounds destroyFFIContext's wait so a blocked loop cannot hang the caller - eventQueueSignal: ThreadSignalPtr # wakes the event
thread on enqueue - eventThreadExitSignal: ThreadSignalPtr # mirrors threadExitSignal for the event thread + threadExitSignal: ThreadSignalPtr + eventQueueSignal: ThreadSignalPtr + eventThreadExitSignal: ThreadSignalPtr userData*: pointer eventRegistry*: FFIEventRegistry eventQueue*: EventQueue ffiHeartbeat*: Atomic[int64] # advanced each FFI-thread loop; event thread reads for liveness - eventQueueStuck*: Atomic[bool] # sticky overflow flag; recovery is destroy+recreate - running: Atomic[bool] # To control when the threads are running + eventQueueStuck*: Atomic[bool] # sticky overflow flag + running: Atomic[bool] registeredRequests: ptr Table[cstring, FFIRequestProc] - # Pointer to with the registered requests at compile time var onFFIThread* {.threadvar.}: bool - ## True while executing inside `ffiThreadBody`. Used by - ## `sendRequestToFFIThread` to detect re-entrant dispatch from a handler - ## (which would self-deadlock on `reqReceivedSignal`). + # Re-entrant dispatch guard for `sendRequestToFFIThread`. const git_version* {.strdefine.} = "n/a" const - EventThreadTickInterval* = 1.seconds # bounds idle heartbeat check latency + EventThreadTickInterval* = 1.seconds FFIHeartbeatStartDelay* = 10.seconds # grace window for library startup FFIHeartbeatStaleThreshold* = 1.seconds @@ -58,8 +48,8 @@ proc encodeNotRespondingEvent(): seq[byte] = proc dispatchToListeners[T]( ctx: ptr FFIContext[T], eventName: string, data: pointer, dataLen: int ) = - ## Holds reg.lock for the entire snapshot + invocation so concurrent - ## add/remove on this registry blocks until dispatch returns. + ## Lock held across the whole fan-out so foreign add/remove blocks + ## until dispatch returns (UAF-close contract from PR #39). 
withLock ctx[].eventRegistry.lock: let listeners = ctx[].eventRegistry.byEvent.getOrDefault(eventName) if listeners.len == 0: @@ -75,8 +65,7 @@ proc dispatchToListeners[T]( ) proc onNotResponding*(ctx: ptr FFIContext) = - ## Bypasses the event queue (which may itself be wedged) and dispatches - ## directly to listeners. Runs on the event thread. + ## Bypasses the (possibly wedged) event queue; runs on the event thread. let event = try: encodeNotRespondingEvent() @@ -91,28 +80,23 @@ proc onNotResponding*(ctx: ptr FFIContext) = proc sendRequestToFFIThread*( ctx: ptr FFIContext, ffiRequest: ptr FFIThreadRequest, timeout = InfiniteDuration ): Result[void, string] = - # Event-queue overflow refuses further requests; the event thread fires onNotResponding to avoid deadlocking on reg.lock here. if ctx.eventQueueStuck.load(): deleteRequest(ffiRequest) - return - err("event queue stuck - library cannot accept new requests") + return err("event queue stuck - library cannot accept new requests") - # Reentrancy guard: only this thread can fire `reqReceivedSignal`, so a handler dispatching back would self-deadlock. if onFFIThread: + # Re-entrant dispatch from a handler would self-deadlock on `reqReceivedSignal`. deleteRequest(ffiRequest) return err( "reentrant ffi call: a handler invoked sendRequestToFFIThread on its own context" ) - # All async submissions serialise on `ctx.lock` for the full - # trySend + fireSync + waitSync sequence because `reqChannel` is - # single-producer and `reqReceivedSignal` is shared across callers. - # Multi-producer redesign is tracked as PR #23 review item 7. + # `reqChannel` is single-producer and `reqReceivedSignal` shared; serialise + # the full trySend + fireSync + waitSync. PR #23 review item 7 tracks SP→MP. 
ctx.lock.acquire() defer: ctx.lock.release() - ## Sending the request let sentOk = ctx.reqChannel.trySend(ffiRequest) if not sentOk: deleteRequest(ffiRequest) @@ -127,42 +111,28 @@ proc sendRequestToFFIThread*( deleteRequest(ffiRequest) return err("Couldn't fireSync in time") - ## wait until the FFI working thread properly received the request let res = ctx.reqReceivedSignal.waitSync(timeout) if res.isErr(): - ## Do not free ffiRequest here: the FFI thread was already signaled and - ## will process (and free) it. + # FFI thread was already signaled; it owns ffiRequest now. return err("Couldn't receive reqReceivedSignal signal") - ## Notice that in case of "ok", the deallocShared(req) is performed by the FFI Thread in the - ## process proc. ok() proc processRequest[T]( request: ptr FFIThreadRequest, ctx: ptr FFIContext[T] ) {.async.} = - ## Invoked within the FFI thread to process a request coming from the FFI API consumer thread. - let reqId = $request[].reqId - ## The reqId determines which proc will handle the request. - ## The registeredRequests represents a table defined at compile time. - ## Then, registeredRequests == Table[reqId, proc-handling-the-request-asynchronously] - - ## Explicit conversion keeps `reqId` alive as the backing string, - ## avoiding the implicit string→cstring warning that will become an error. + # Keep `reqId` alive as backing for the cstring view. let reqIdCs = reqId.cstring let retFut = if not ctx[].registeredRequests[].contains(reqIdCs): - ## That shouldn't happen because only registered requests should be sent to the FFI thread. nilProcess(request[].reqId) else: ctx[].registeredRequests[][reqIdCs](cast[pointer](request), ctx) - ## Catch every catchable exception (including CancelledError raised by - ## the shutdown drain in ffiRun) so handleRes — and its `deleteRequest` - ## defer — always runs. Otherwise an abandoned in-flight handler would - ## leak its request envelope, reqId copy, and CBOR payload. + # Catch all (incl. 
CancelledError from the shutdown drain) so handleRes — + # and its `deleteRequest` defer — always runs. let res = try: await retFut @@ -171,16 +141,14 @@ proc processRequest[T]( "Error in processRequest for " & reqId & ": " & e.msg ) - ## handleRes may raise (OOM, GC setup) even though it is rare. Catching here - ## keeps the async proc raises:[] compatible. The defer inside handleRes - ## guarantees request is freed before the exception propagates. + # handleRes may raise (rare: OOM, GC setup); keep `raises: []`. try: handleRes(res, request) except Exception as e: error "Unexpected exception in handleRes", error = e.msg var ffiEventQueueSignalPtr {.threadvar.}: ThreadSignalPtr - ## Stashed so the hook below has no closure environment. + # Stashed so the hook has no closure env. proc ffiNotifyEventEnqueuedHook() {.gcsafe, raises: [].} = if not ffiEventQueueSignalPtr.isNil(): @@ -189,7 +157,6 @@ proc ffiNotifyEventEnqueuedHook() {.gcsafe, raises: [].} = error "failed to fire eventQueueSignal after enqueue", err = res.error proc ffiThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} = - ## FFI thread body that attends library user API requests ffiCurrentEventRegistry = addr ctx[].eventRegistry ffiCurrentEventQueue = addr ctx[].eventQueue ffiCurrentEventQueueStuck = addr ctx[].eventQueueStuck @@ -201,20 +168,16 @@ proc ffiThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} = defer: onFFIThread = false - # Unblocks destroyFFIContext's bounded wait so cleanup can proceed. + # Unblocks destroyFFIContext's bounded wait. let fireRes = ctx.threadExitSignal.fireSync() if fireRes.isErr(): error "failed to fire threadExitSignal on FFI thread exit", err = fireRes.error let ffiRun = proc(ctx: ptr FFIContext[T]) {.async.} = var ffiReqHandler: T - ## Holds the main library object, i.e., in charge of handling the ffi requests. - ## e.g., Waku, LibP2P, SDS, etc. - ## In-flight processRequest futures. 
Tracked so they can be drained on - ## shutdown — otherwise destroying the context while a handler is - ## awaiting (e.g. sleepAsync) abandons the future and leaks the - ## request's envelope/reqId/payload allocations. + # Track in-flight handlers so shutdown can drain them — otherwise + # abandoned futures leak request envelope/reqId/payload. var pending: seq[Future[void]] = @[] proc reapCompleted() = @@ -226,7 +189,7 @@ proc ffiThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} = inc i while ctx.running.load(): - # Freezes if a sync handler blocks the dispatcher; event thread reads to detect wedged FFI thread. + # Freezes if a sync handler blocks; event thread reads for liveness. discard ctx.ffiHeartbeat.fetchAdd(1) reapCompleted() @@ -235,7 +198,6 @@ proc ffiThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} = if not gotSignal: continue - ## Wait for a request from the ffi consumer thread var request: ptr FFIThreadRequest if not ctx.reqChannel.tryRecv(request): continue @@ -243,17 +205,13 @@ proc ffiThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} = if ctx.myLib.isNil(): ctx.myLib = addr ffiReqHandler - ## Handle the request pending.add processRequest(request, ctx) let fireRes = ctx.reqReceivedSignal.fireSync() if fireRes.isErr(): error "could not fireSync back to requester thread", error = fireRes.error - ## Drain in-flight handlers so each request's `deleteRequest` runs - ## before we exit. Without this, abandoning a future mid-await would - ## leak the request allocations (visible to LSan; previously hidden - ## because Nim's pool allocator kept the chunks alive in the process). + # Drain in-flight handlers so each request's `deleteRequest` runs. reapCompleted() if pending.len > 0: try: @@ -264,7 +222,6 @@ proc ffiThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} = waitFor ffiRun(ctx) proc dispatchQueuedEvent[T](ctx: ptr FFIContext[T], qe: QueuedEvent) = - ## Frees `qe`'s c_malloc buffers on exit. 
defer: freeEventBuffers(qe.name, qe.data) ctx.dispatchToListeners($qe.name, qe.data, qe.dataLen) @@ -292,31 +249,35 @@ proc initHeartbeatMonitor[T](ctx: ptr FFIContext[T]): HeartbeatMonitor = ) proc check[T](hb: var HeartbeatMonitor, ctx: ptr FFIContext[T]) = - ## Fires onNotResponding once the FFI thread's heartbeat counter stops - ## advancing past the stale threshold. Latches until it moves again. + ## Fires onNotResponding once the heartbeat stalls past the threshold; + ## latches until it moves again. if Moment.now() - hb.startedAt <= FFIHeartbeatStartDelay: return + let cur = ctx.ffiHeartbeat.load() if cur != hb.lastValue: hb.lastValue = cur hb.lastChange = Moment.now() hb.notifiedStale = false - elif not hb.notifiedStale and - Moment.now() - hb.lastChange > FFIHeartbeatStaleThreshold: - onNotResponding(ctx) - hb.notifiedStale = true + return + + if hb.notifiedStale: + return + if Moment.now() - hb.lastChange <= FFIHeartbeatStaleThreshold: + return + onNotResponding(ctx) + hb.notifiedStale = true proc eventRun[T](ctx: ptr FFIContext[T]) {.async.} = var hb = initHeartbeatMonitor(ctx) var notifiedStuck = false while ctx.running.load(): - # Wake on enqueue or tick — whichever first. discard await ctx.eventQueueSignal.wait().withTimeout(EventThreadTickInterval) ctx.drainEventQueue() - # Fires here (after drain releases reg.lock) — from the FFI thread it'd deadlock on a back-pressuring listener. + # Fire after drain so reg.lock is free — FFI-thread would deadlock here. if not notifiedStuck and ctx.eventQueueStuck.load(): onNotResponding(ctx) notifiedStuck = true @@ -326,8 +287,7 @@ proc eventRun[T](ctx: ptr FFIContext[T]) {.async.} = hb.check(ctx) proc eventThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} = - ## Drains the event queue and runs the FFI-thread heartbeat check. - ## Owns the queued `c_malloc` payloads until dispatch returns. + ## Owns queued `c_malloc` payloads until dispatch returns. 
defer: let fireRes = ctx.eventThreadExitSignal.fireSync() if fireRes.isErr(): @@ -338,40 +298,24 @@ proc eventThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} = except CatchableError as e: error "event thread exited with exception", error = e.msg +template closeAndNil(field: untyped) = + if not field.isNil(): + ?field.close() + field = nil + proc deinitContextResources*[T](ctx: ptr FFIContext[T]): Result[void, string] = - ## Mirror of `initContextResources`: tears down lock, registry, queue, - ## and signal fds in place. Threads MUST already be joined. Caller owns - ## the memory holding `ctx`. Fields are nil'd after close so a re-init - ## on the same slot doesn't double-close. + ## Mirror of `initContextResources`. Threads MUST be joined first; + ## fields are nil'd after close so re-init on the same slot is safe. ctx.lock.deinitLock() deinitEventRegistry(ctx[].eventRegistry) deinitEventQueue(ctx[].eventQueue) when defined(gcRefc): - ## ThreadSignalPtr.close() is intentionally skipped under --mm:refc. - ## - ## close() goes through chronos's safeUnregisterAndCloseFd, which calls - ## getThreadDispatcher() and lazily allocates a new Selector for the - ## main thread. With refc and a heavy ref-object graph torn down by the - ## FFI thread (libwaku/libp2p), that allocation traps inside rawNewObj - ## and the refc signal handler re-enters the same allocator — the - ## process never returns. Captured stack from a hung process: - ## close → safeUnregisterAndCloseFd → getThreadDispatcher → - ## newDispatcher → Selector.new → newObj (gc.nim:488) → - ## rawNewObj (gc.nim:470) → rawNewObj → _sigtramp → signalHandler → - ## newObjNoInit → addNewObjToZCT (infinite re-entry) - ## - ## --mm:orc does NOT exhibit this bug; see the - ## "destroyFFIContext refc workaround" suite in tests/test_ffi_context.nim - ## (test "destroy after heavy ref-allocation workload returns promptly"). 
- ## The signal fds (a few per ctx) are reclaimed by the OS at process - ## exit; destroyFFIContext is called once per process lifetime, so the - ## leak is bounded. + # ThreadSignalPtr.close() under refc traps in safeUnregisterAndCloseFd + # → newDispatcher → rawNewObj → signal-handler re-entry (process hangs). + # See tests/test_ffi_context.nim "destroyFFIContext refc workaround". + # Fd leak is bounded — destroy runs once per process lifetime. discard else: - template closeAndNil(field: untyped) = - if not field.isNil(): - ?field.close() - field = nil closeAndNil(ctx.reqSignal) closeAndNil(ctx.reqReceivedSignal) closeAndNil(ctx.stopSignal) @@ -381,16 +325,19 @@ proc deinitContextResources*[T](ctx: ptr FFIContext[T]): Result[void, string] = ok() proc cleanUpResources[T](ctx: ptr FFIContext[T]): Result[void, string] = - ## Full cleanup for heap-allocated contexts: closes all resources and frees memory. + ## Deinit + free for heap-allocated contexts. defer: freeShared(ctx) ctx.deinitContextResources() +template newSignalOrErr(field: untyped, name: string) = + field = ThreadSignalPtr.new().valueOr: + return err("couldn't create " & name & " ThreadSignalPtr: " & $error) + proc initContextResources*[T](ctx: ptr FFIContext[T]): Result[void, string] = - ## Initialises all resources inside an already-allocated FFIContext slot. - ## On failure every partially-initialised resource is closed; the caller - ## is responsible for releasing the slot (freeShared or pool.releaseSlot). - # Defensive nil: deferred cleanup must never double-close stale pointers on a reused pool slot. + ## On failure, the deferred cleanup closes partial state; caller releases + ## the slot (freeShared or pool.releaseSlot). + # Nil first so deferred cleanup can't double-close a reused pool slot. 
ctx.reqSignal = nil ctx.reqReceivedSignal = nil ctx.stopSignal = nil @@ -410,23 +357,12 @@ proc initContextResources*[T](ctx: ptr FFIContext[T]): Result[void, string] = error "failed to clean up resources after createFFIContext failure", error = error - ctx.reqSignal = ThreadSignalPtr.new().valueOr: - return err("couldn't create reqSignal ThreadSignalPtr: " & $error) - - ctx.reqReceivedSignal = ThreadSignalPtr.new().valueOr: - return err("couldn't create reqReceivedSignal ThreadSignalPtr: " & $error) - - ctx.stopSignal = ThreadSignalPtr.new().valueOr: - return err("couldn't create stopSignal ThreadSignalPtr: " & $error) - - ctx.threadExitSignal = ThreadSignalPtr.new().valueOr: - return err("couldn't create threadExitSignal ThreadSignalPtr: " & $error) - - ctx.eventQueueSignal = ThreadSignalPtr.new().valueOr: - return err("couldn't create eventQueueSignal ThreadSignalPtr: " & $error) - - ctx.eventThreadExitSignal = ThreadSignalPtr.new().valueOr: - return err("couldn't create eventThreadExitSignal ThreadSignalPtr: " & $error) + newSignalOrErr(ctx.reqSignal, "reqSignal") + newSignalOrErr(ctx.reqReceivedSignal, "reqReceivedSignal") + newSignalOrErr(ctx.stopSignal, "stopSignal") + newSignalOrErr(ctx.threadExitSignal, "threadExitSignal") + newSignalOrErr(ctx.eventQueueSignal, "eventQueueSignal") + newSignalOrErr(ctx.eventThreadExitSignal, "eventThreadExitSignal") ctx.registeredRequests = addr ffi_types.registeredRequests @@ -440,8 +376,7 @@ proc initContextResources*[T](ctx: ptr FFIContext[T]): Result[void, string] = try: createThread(ctx.eventThread, eventThreadBody[T], ctx) except ValueError, ResourceExhaustedError: - ## ffiThread is already running; signal it to exit and join before the - ## deferred cleanUpResources closes the signals it's waiting on. + # Join ffiThread before deferred cleanup closes signals it's waiting on. 
ctx.running.store(false) let fireRes = ctx.reqSignal.fireSync() if fireRes.isErr(): @@ -470,31 +405,23 @@ proc waitExitOrErr( ok() proc signalStop*[T](ctx: ptr FFIContext[T]): Result[void, string] = - # Error paths intentionally skip onNotResponding: a back-pressuring - # listener may hold reg.lock, and onNotResponding takes it — would - # amplify the stuck state into a deadlock instead of escaping it. + # Skip onNotResponding on error: it takes reg.lock, which a back-pressuring + # listener may hold — would deepen the stuck state into a deadlock. ctx.running.store(false) ?ctx.reqSignal.fireOrErr("reqSignal") ?ctx.stopSignal.fireOrErr("stopSignal") - # Non-fatal: event thread will see running==false on the next tick. + # Non-fatal: event thread sees running==false on the next tick anyway. ctx.eventQueueSignal.fireOrErr("eventQueueSignal").isOkOr: error "failed to signal eventQueueSignal in signalStop", error = error ok() -## If the FFI thread's event loop is blocked by a synchronous handler -## (e.g. blocking I/O), it cannot process reqSignal in time to exit. -## clearContext waits on threadExitSignal up to this bound; on timeout it -## returns err and skips joinThread/cleanup (leaking the thread + ctx slot) -## rather than hanging the caller forever. +## Bound on how long clearContext waits for the FFI thread to exit before +## leaking ctx rather than hanging the caller. const ThreadExitTimeout* = 1500.milliseconds proc stopAndJoinThreads*[T](ctx: ptr FFIContext[T]): Result[void, string] = - ## Signals both threads to stop, waits up to ThreadExitTimeout per thread, - ## and joins them. On timeout returns err and skips remaining joins - ## (leaving the threads live) rather than hanging the caller. Resource - ## cleanup is the caller's responsibility. - ## - ## Timeout paths skip onNotResponding for the same reason signalStop does. + ## On timeout, returns err and skips remaining joins (leaves threads live). + ## Caller owns resource cleanup. 
Skips onNotResponding (same reason as signalStop). ctx.signalStop().isOkOr: return err("signalStop failed: " & $error) @@ -505,7 +432,7 @@ proc stopAndJoinThreads*[T](ctx: ptr FFIContext[T]): Result[void, string] = ok() proc clearContext[T](ctx: ptr FFIContext[T]): Result[void, string] = - ## Stops the FFI context that was created via createFFIContext[T]() (heap). + ## Stops a heap-allocated FFI context. ctx.stopAndJoinThreads().isOkOr: return err("clearContext: " & $error) ctx.cleanUpResources().isOkOr: diff --git a/ffi/ffi_context_pool.nim b/ffi/ffi_context_pool.nim index a92ef6e..5e2d2cb 100644 --- a/ffi/ffi_context_pool.nim +++ b/ffi/ffi_context_pool.nim @@ -3,14 +3,10 @@ import results import ./ffi_context const MaxFFIContexts* = 32 - ## Maximum number of concurrently live FFI contexts when using FFIContextPool. - ## Fds and threads are only consumed for slots that are actually acquired, - ## so this value only affects the upfront memory of the pool array. + # Only affects upfront pool memory; fds/threads consumed per acquired slot. type FFIContextPool*[T] = object - ## Fixed-size pool of FFI contexts. Avoids dynamic heap allocation per context - ## and bounds the total number of file descriptors consumed by ThreadSignalPtrs - ## to at most MaxFFIContexts * 2. + ## Fixed pool. Bounds ThreadSignalPtr fds at MaxFFIContexts * 2. 
slots: array[MaxFFIContexts, FFIContext[T]] inUse: array[MaxFFIContexts, Atomic[bool]] @@ -19,7 +15,7 @@ proc acquireSlot[T](pool: var FFIContextPool[T]): Result[ptr FFIContext[T], stri var expected = false if pool.inUse[i].compareExchange(expected, true): return ok(pool.slots[i].addr) - return err("FFI context pool exhausted (max " & $MaxFFIContexts & " contexts)") + err("FFI context pool exhausted (max " & $MaxFFIContexts & " contexts)") proc releaseSlot[T](pool: var FFIContextPool[T], ctx: ptr FFIContext[T]) = for i in 0 ..< MaxFFIContexts: @@ -30,39 +26,31 @@ proc releaseSlot[T](pool: var FFIContextPool[T], ctx: ptr FFIContext[T]) = proc createFFIContext*[T]( pool: var FFIContextPool[T] ): Result[ptr FFIContext[T], string] = - ## Acquires a slot from the fixed pool and initialises it as an FFI context. - ## Bounded fd usage: at most MaxFFIContexts * 2 ThreadSignalPtr fds are ever open. let ctx = pool.acquireSlot().valueOr: return err("createFFIContext: acquireSlot failed: " & $error) initContextResources(ctx).isOkOr: pool.releaseSlot(ctx) return err("createFFIContext: initContextResources failed: " & $error) - return ok(ctx) + ok(ctx) proc destroyFFIContext*[T]( pool: var FFIContextPool[T], ctx: ptr FFIContext[T] ): Result[void, string] = - ## Stops the FFI context and returns its slot to the pool. If the FFI thread - ## is blocked and does not exit in time, the slot is leaked rather than - ## reclaimed — closing its resources while the thread is still live would be - ## unsafe. + ## On thread-exit timeout the slot is leaked — closing live-thread resources is unsafe. ctx.stopAndJoinThreads().isOkOr: return err("destroyFFIContext(pool): " & $error) - # Without this, the next acquisition would re-init an already-initialised - # lock (UB) and leak the previous signal fds. + # Required: next acquisition would otherwise re-init a live lock (UB). 
let deinitRes = ctx.deinitContextResources() pool.releaseSlot(ctx) deinitRes.isOkOr: return err("destroyFFIContext(pool): " & $error) - return ok() + ok() proc isValidCtx*[T](pool: var FFIContextPool[T], ctx: pointer): bool = - ## Returns true only if ctx points to one of the pool's slots that is - ## currently in use. Rejects nil, offset-invalid, and dangling pointers - ## at the API boundary, preventing use-after-free dereferences. + ## Rejects nil / offset-invalid / dangling pointers at the API boundary. if ctx.isNil(): return false for i in 0 ..< MaxFFIContexts: if cast[pointer](pool.slots[i].addr) == ctx: return pool.inUse[i].load() - return false + false diff --git a/ffi/ffi_events.nim b/ffi/ffi_events.nim index ade364b..d13c24d 100644 --- a/ffi/ffi_events.nim +++ b/ffi/ffi_events.nim @@ -1,15 +1,6 @@ -## Event registry, bounded SPSC event queue, and dispatch templates for -## FFI library-initiated events. Listeners receive only the event name -## they subscribed to. Queue payloads travel via `c_malloc` so transfer -## across Nim heaps is safe under both `--mm:orc` and `--mm:refc`. -## -## Dispatch templates enqueue and return on the FFI thread; a dedicated -## event thread (owned by `FFIContext`) drains the queue and invokes -## listeners, so a slow handler can never block the FFI event loop. On -## queue overflow the templates log, set a sticky stuck flag, and wake -## the event thread — which fires the global "not responding" -## notification from its own loop (firing it from the FFI thread would -## risk deadlocking against a listener back-pressuring under `reg.lock`). +## Per-context event registry + bounded SPSC queue. FFI thread enqueues, +## event thread drains; payloads travel via `c_malloc` so they survive +## pool-slot reuse across thread heaps. 
{.pragma: callback, cdecl, raises: [], gcsafe.} @@ -20,9 +11,7 @@ import ./ffi_types, ./cbor_serial, ./alloc type EventEnvelope*[T] = object - ## Standard wire shape for CBOR-encoded FFI events: - ## { eventType: tstr, payload: } - ## Pair with `dispatchFFIEventCbor` (or call `cborEncode` directly). + ## CBOR wire shape: { eventType: tstr, payload: }. eventType*: string payload*: T @@ -34,33 +23,22 @@ type userData*: pointer FFIEventRegistry* = object - ## Per-context multi-listener registry. `lock` guards every mutation; - ## readers (dispatch path) acquire it only long enough to copy out the - ## listener slice for the event being dispatched. lock*: Lock - nextId*: uint64 ## Monotonic id source. 0 is reserved as "invalid"; ids start at 1. + nextId*: uint64 # 0 is reserved as "invalid"; ids start at 1. byEvent*: Table[string, seq[FFIEventListener]] proc initEventRegistry*(reg: var FFIEventRegistry) = - ## Must be called exactly once on the owning thread before the registry - ## is shared. The embedded `Lock` wraps a platform primitive that cannot - ## be safely double-initialised, so concurrent callers would hit UB at - ## the OS layer — the lock itself can't defend against its own init. + ## Must run once on the owning thread before sharing — `initLock` on a + ## live primitive is UB at the OS layer. reg.lock.initLock() reg.nextId = 0'u64 reg.byEvent = initTable[string, seq[FFIEventListener]]() proc deinitEventRegistry*(reg: var FFIEventRegistry) = - ## Mirror of `initEventRegistry`: must be called exactly once, by the - ## same thread that owns the registry, after all other threads have - ## stopped using it. `deinitLock` on a platform primitive that any - ## thread might still be holding or about to acquire is UB at the OS - ## layer. - ## - ## Resets the GC-managed fields to default so `FFIContextPool`'s - ## slot reuse on a *different* thread doesn't trigger Nim's hidden - ## assignment destructor against this thread's heap allocations. 
+ ## Mirror of `initEventRegistry`; same single-thread constraint. + ## Resets GC fields so pool-slot reuse on another thread doesn't fire + ## Nim's hidden assignment dtor against this thread's heap. reg.lock.deinitLock() reg.byEvent = default(Table[string, seq[FFIEventListener]]) reg.nextId = 0'u64 @@ -71,11 +49,7 @@ proc addEventListener*( callback: FFICallBack, userData: pointer, ): uint64 {.raises: [].} = - ## Registers `callback` for `eventName` and returns the listener's stable - ## id (always non-zero on success). A listener only receives events - ## dispatched under its own `eventName` — subscribe to each event - ## separately. Returns 0 if `callback` is nil — the only documented - ## failure mode. + ## Returns the listener id (>0), or 0 if `callback` is nil. if callback.isNil(): return 0 @@ -90,10 +64,8 @@ proc addEventListener*( assigned proc removeEventListener*(reg: var FFIEventRegistry, id: uint64): bool {.raises: [].} = - ## Removes the listener with `id`. Returns true on success, false if no - ## listener with that id exists. Safe to call from inside a dispatch: - ## the in-flight snapshot still delivers exactly once to the listener - ## being removed. + ## Safe to call from inside a dispatch — the in-flight snapshot still + ## delivers exactly once to the removed listener. if id == 0'u64: return false @@ -117,42 +89,35 @@ proc removeEventListener*(reg: var FFIEventRegistry, id: uint64): bool {.raises: removed proc removeAllEventListeners*(reg: var FFIEventRegistry) {.raises: [].} = - ## Drops every registered listener. Does not reset the listener-id - ## counter — subsequent `addEventListener` calls still return strictly - ## increasing ids. + ## Does not reset the id counter. withLock reg.lock: reg.byEvent.clear() proc snapshotListeners*( reg: var FFIEventRegistry, eventName: string ): seq[FFIEventListener] {.raises: [].} = - ## Returns a copy of the listener slice for `eventName`. 
The copy is what - ## makes re-entrant add/remove from inside a handler deadlock-free: - ## dispatch holds the lock only for the duration of the copy, then - ## iterates the copy outside the lock. + ## Lock held only across the copy — keeps re-entrant add/remove + ## from a handler deadlock-free. var listeners: seq[FFIEventListener] = @[] withLock reg.lock: - # `getOrDefault` avoids the raising `[]` path; returns empty when absent. for l in reg.byEvent.getOrDefault(eventName): listeners.add(l) listeners const EventQueueCapacity* = 1024 - ## ~24 KiB per context. Sustained backlog at this depth means a - ## listener is wedged — what the stuck flag exists to surface. + # Sustained backlog at this depth means a listener is wedged. type QueuedEvent* = object - ## All fields are raw `c_malloc` pointers so the buffer survives - ## pool-slot reuse across thread heaps without an assignment dtor. + # Raw `c_malloc` pointers so the buffer survives pool-slot reuse + # across thread heaps without an assignment dtor. name*: cstring data*: ptr UncheckedArray[byte] dataLen*: int EventQueue* = object - ## SPSC ring: FFI thread enqueues, event thread dequeues. Plain lock - ## (no atomic indices) — operations are short and uncontended. + # SPSC ring; plain lock since ops are short and uncontended. lock*: Lock head*: int tail*: int @@ -160,7 +125,6 @@ type buf*: array[EventQueueCapacity, QueuedEvent] proc initEventQueue*(q: var EventQueue) {.raises: [].} = - ## Same single-owning-thread constraint as `initEventRegistry`. q.lock.initLock() q.head = 0 q.tail = 0 @@ -177,7 +141,7 @@ proc freeEventBuffers*( c_free(data) proc deinitEventQueue*(q: var EventQueue) {.raises: [].} = - ## Both producer and consumer must have stopped before calling. + ## Both producer and consumer must have stopped. 
for i in 0 ..< EventQueueCapacity: freeEventBuffers(q.buf[i].name, q.buf[i].data) q.buf[i] = QueuedEvent(name: nil, data: nil, dataLen: 0) @@ -189,8 +153,7 @@ proc deinitEventQueue*(q: var EventQueue) {.raises: [].} = proc tryEnqueueEvent*( q: var EventQueue, name: cstring, data: ptr UncheckedArray[byte], dataLen: int ): bool {.raises: [], gcsafe.} = - ## Both `name` and `data` must be `c_malloc`'d; on success the queue - ## takes ownership. On false the caller still owns and must free them. + ## On true the queue owns `name`/`data`; on false the caller still does. withLock q.lock: if q.count >= EventQueueCapacity: return false @@ -200,7 +163,7 @@ proc tryEnqueueEvent*( true proc tryDequeueEvent*(q: var EventQueue): Option[QueuedEvent] {.raises: [], gcsafe.} = - ## Transfers buffer ownership to the caller, who must `c_free` both. + ## Caller takes ownership and must `c_free` both buffers. withLock q.lock: if q.count == 0: return none(QueuedEvent) @@ -218,7 +181,6 @@ proc eventQueueLen*(q: var EventQueue): int {.raises: [], gcsafe.} = proc notifyListenersOk*( listeners: seq[FFIEventListener], data: pointer, dataLen: int ) = - ## Fans out a successful payload to every listener in the snapshot. let dataPtr = if dataLen > 0: cast[ptr cchar](data) else: nil @@ -228,7 +190,6 @@ proc notifyListenersOk*( ) proc notifyListenersErr*(listeners: seq[FFIEventListener], msg: string) = - ## Fans out an error message to every listener in the snapshot. let dataPtr = if msg.len > 0: cast[ptr cchar](unsafeAddr msg[0]) else: nil @@ -238,22 +199,17 @@ proc notifyListenersErr*(listeners: seq[FFIEventListener], msg: string) = ) var ffiCurrentEventRegistry* {.threadvar.}: ptr FFIEventRegistry - ## Kept for tests that drive the registry directly. Dispatch no longer - ## reads it — invocation has moved to the event thread. + # Kept for tests that drive the registry directly. 
var ffiCurrentEventQueue* {.threadvar.}: ptr EventQueue - ## Installed by the FFI thread so dispatch templates enqueue without - ## threading a `ctx` parameter through every call site. + # Installed by the FFI thread so dispatch templates need no `ctx`. var ffiCurrentEventQueueStuck* {.threadvar.}: ptr Atomic[bool] - ## Sticky overflow flag on the owning `FFIContext`. Set by dispatch - ## templates when enqueue fails; read by the FFI request entry point - ## to reject further calls. + # Sticky overflow flag; FFI request entry point reads it to reject. var ffiCurrentNotifyEventEnqueued* {.threadvar.}: proc() {.gcsafe, raises: [].} - ## Hook (not a queue field) so this module doesn't depend on chronos's - ## ThreadSignalPtr. Nil-safe — tests that drive the queue directly leave - ## it unset and the event thread picks up enqueued events on the next tick. + # Hook so this module doesn't depend on chronos's ThreadSignalPtr. + # Nil-safe; tick-driven tests leave it unset. template enqueueOrMarkStuck( eventName: string, @@ -261,34 +217,31 @@ template enqueueOrMarkStuck( dataPtr: ptr UncheckedArray[byte], dataLen: int, ) = - ## Common tail for both dispatch templates. Takes ownership of `namePtr` - ## and `dataPtr` (both `c_malloc`'d). On queue-full, frees the buffers, - ## sets the sticky stuck flag, and wakes the event thread — which fires - ## onNotResponding from its loop (firing it here would risk deadlocking - ## against a listener back-pressuring under `reg.lock`). 
- let q = ffiCurrentEventQueue - if q.isNil(): - chronicles.error "event queue not set on this thread", event = eventName - freeEventBuffers(namePtr, dataPtr) - elif not q[].tryEnqueueEvent(namePtr, dataPtr, dataLen): - chronicles.error "event queue full; library marked stuck", - event = eventName, capacity = EventQueueCapacity - freeEventBuffers(namePtr, dataPtr) - if not ffiCurrentEventQueueStuck.isNil(): - ffiCurrentEventQueueStuck[].store(true) - if not ffiCurrentNotifyEventEnqueued.isNil(): - ffiCurrentNotifyEventEnqueued() - else: + ## Takes ownership of `namePtr`/`dataPtr`. On queue-full sets the sticky + ## stuck flag and wakes the event thread (firing onNotResponding here + ## would risk deadlock against a back-pressuring listener). + block enqueueBlock: + let q = ffiCurrentEventQueue + if q.isNil(): + chronicles.error "event queue not set on this thread", event = eventName + freeEventBuffers(namePtr, dataPtr) + break enqueueBlock + if not q[].tryEnqueueEvent(namePtr, dataPtr, dataLen): + chronicles.error "event queue full; library marked stuck", + event = eventName, capacity = EventQueueCapacity + freeEventBuffers(namePtr, dataPtr) + if not ffiCurrentEventQueueStuck.isNil(): + ffiCurrentEventQueueStuck[].store(true) + if not ffiCurrentNotifyEventEnqueued.isNil(): + ffiCurrentNotifyEventEnqueued() + break enqueueBlock if not ffiCurrentNotifyEventEnqueued.isNil(): ffiCurrentNotifyEventEnqueued() template dispatchFFIEvent*(eventName: string, body: untyped) = - ## Dispatches an FFI event to every listener subscribed to `eventName`. - ## `body` must yield a `string` or `seq[byte]`. - ## - ## Runs on the FFI thread: encodes the body into a fresh `c_malloc` - ## buffer and enqueues it. Listener invocation happens later on the - ## dedicated event thread, so user code can never block the FFI loop. + ## `body` must yield `string` or `seq[byte]`. 
Runs on the FFI thread — + ## encodes into a `c_malloc` buffer and enqueues; the event thread + ## fans out to listeners. block: let evtName: string = eventName let bodyVal = body @@ -301,12 +254,9 @@ template dispatchFFIEvent*(eventName: string, body: untyped) = enqueueOrMarkStuck(evtName, namePtr, dataPtr, dataLen) template dispatchFFIEventCbor*(eventName: string, eventPayload: typed) = - ## Typed CBOR variant of `dispatchFFIEvent`. Wraps `eventPayload` in an - ## `EventEnvelope`, CBOR-encodes it into a `c_malloc` buffer once, and - ## queues it for the event thread to fan out to listeners. - ## - ## NB: parameter is `eventPayload`, not `payload` — Nim's template - ## substitution would otherwise also rewrite the `payload:` field inside + ## Typed CBOR variant of `dispatchFFIEvent`. + ## Parameter is `eventPayload`, not `payload` — Nim's template + ## substitution would otherwise rewrite the `payload:` field inside ## `EventEnvelope`. block: let evtName: string = eventName diff --git a/tests/unit/test_event_dispatch.nim b/tests/unit/test_event_dispatch.nim index deb6724..313db29 100644 --- a/tests/unit/test_event_dispatch.nim +++ b/tests/unit/test_event_dispatch.nim @@ -1,11 +1,5 @@ -## Tests for the CBOR-style FFI event dispatch path: -## - `dispatchFFIEvent` accepts both `string` and `seq[byte]` bodies -## - `dispatchFFIEventCbor` wraps a typed payload in `EventEnvelope[T]`, -## CBOR-encodes it, and dispatches via the event callback -## -## Tests run end-to-end against a real FFI thread (via FFIContextPool + -## sendRequestToFFIThread) so we exercise the threadvar-backed -## ffiCurrentEventRegistry wiring, not just the templates in isolation. +## End-to-end tests for `dispatchFFIEvent` / `dispatchFFIEventCbor`, +## driven through a real `FFIContext` so the threadvar wiring is exercised. 
import std/[locks, os] import unittest2 @@ -14,16 +8,10 @@ import ffi type TestEvtLib = object -## Event payload type (would be `{.ffi.}` in production so the codec gen -## emits a matching struct on the foreign side; the test only needs CBOR -## round-trip, which `cborEncode`/`cborDecode` provide via cbor_serial's -## generic overloads). type MessageSentBody* {.ffi.} = object requestId*: string messageHash*: string -## Same callback-state helper as test_ffi_context.nim, duplicated here so -## this file stays a self-contained test binary. type CallbackData = object lock: Lock cond: Cond @@ -40,6 +28,13 @@ proc deinitCallbackData(d: var CallbackData) = d.cond.deinitCond() d.lock.deinitLock() +template setupCallbackData(name: untyped) = + ## Declares `name`, inits it, and defers its deinit in the caller's scope. + var name: CallbackData + initCallbackData(name) + defer: + deinitCallbackData(name) + proc captureCb( retCode: cint, msg: ptr cchar, len: csize_t, userData: pointer ) {.cdecl, gcsafe, raises: [].} = @@ -60,15 +55,27 @@ proc waitCallback(d: var CallbackData) = wait(d.cond, d.lock) release(d.lock) +proc resetCalled(d: var CallbackData) = + acquire(d.lock) + d.called = false + release(d.lock) + proc callbackBytes(d: var CallbackData): seq[byte] = var bytes = newSeq[byte](d.msgLen) if d.msgLen > 0: copyMem(addr bytes[0], addr d.msg[0], d.msgLen) bytes -## A request that dispatches a typed CBOR event from inside the FFI -## thread and then returns ok — so the response callback can be used to -## synchronize the test. +template withPool(ctxIdent: untyped, body: untyped) = + ## Sets up pool + ctx, runs body, destroys on exit. 
+ var pool: FFIContextPool[TestEvtLib] + let ctxIdent = pool.createFFIContext().valueOr: + check false + return + defer: + discard pool.destroyFFIContext(ctxIdent) + body + registerReqFFI(EmitCborEventRequest, lib: ptr TestEvtLib): proc(): Future[Result[string, string]] {.async.} = dispatchFFIEventCbor( @@ -77,19 +84,13 @@ registerReqFFI(EmitCborEventRequest, lib: ptr TestEvtLib): ) return ok("emitted") -## A request that uses the lower-level `dispatchFFIEvent` with a raw -## `seq[byte]` body — the path that previously rejected non-string bodies. registerReqFFI(EmitRawBytesEventRequest, lib: ptr TestEvtLib): proc(): Future[Result[string, string]] {.async.} = dispatchFFIEvent("raw_bytes"): @[byte 0x01, 0x02, 0x03] return ok("emitted") -## Setter-thread worker for the registry race regression test. Each -## iteration adds then immediately removes a listener for the dispatched -## event so a TSan-instrumented build can confirm `FFIEventRegistry.lock` -## serialises the cross-thread mutation against dispatch-time -## `snapshotListeners` reads from the FFI thread. +# Add/remove worker for the registry-race regression test. type SetterArgs = tuple ctx: ptr FFIContext[TestEvtLib] stop: ptr Atomic[bool] @@ -104,153 +105,88 @@ proc setterThreadBody(args: SetterArgs) {.thread.} = suite "dispatchFFIEventCbor": test "delivers EventEnvelope-shaped CBOR payload to event callback": - # CallbackData defers declared first so they run LAST (LIFO), - # AFTER pool.destroyFFIContext joins the event thread. Otherwise - # TSan flags `captureCb` accessing an already-destroyed mutex - # whose memory got reused by the next test's stack frame. - var evt: CallbackData - initCallbackData(evt) - defer: - deinitCallbackData(evt) + # CallbackData defers declared first run last (LIFO), AFTER pool destroy + # joins the event thread — otherwise TSan flags captureCb on a destroyed mutex. 
+ setupCallbackData(evt) + setupCallbackData(rsp) - var rsp: CallbackData - initCallbackData(rsp) - defer: - deinitCallbackData(rsp) - - var pool: FFIContextPool[TestEvtLib] - let ctx = pool.createFFIContext().valueOr: - check false - return - defer: - discard pool.destroyFFIContext(ctx) - - # Subscribe to the specific event the request below dispatches. - discard addEventListener( - ctx[].eventRegistry, "message_sent", captureCb, addr evt - ) - - check sendRequestToFFIThread( - ctx, EmitCborEventRequest.ffiNewReq(captureCb, addr rsp) - ) - .isOk() - waitCallback(rsp) - waitCallback(evt) - - check evt.retCode == RET_OK - let decoded = cborDecode(callbackBytes(evt), EventEnvelope[MessageSentBody]) - check decoded.isOk() - check decoded.value.eventType == "message_sent" - check decoded.value.payload.requestId == "req-1" - check decoded.value.payload.messageHash == "0xdeadbeef" - -suite "dispatchFFIEvent with seq[byte]": - test "accepts a raw seq[byte] body": - var evt: CallbackData - initCallbackData(evt) - defer: - deinitCallbackData(evt) - - var rsp: CallbackData - initCallbackData(rsp) - defer: - deinitCallbackData(rsp) - - var pool: FFIContextPool[TestEvtLib] - let ctx = pool.createFFIContext().valueOr: - check false - return - defer: - discard pool.destroyFFIContext(ctx) - - discard addEventListener( - ctx[].eventRegistry, "raw_bytes", captureCb, addr evt - ) - - check sendRequestToFFIThread( - ctx, EmitRawBytesEventRequest.ffiNewReq(captureCb, addr rsp) - ) - .isOk() - waitCallback(rsp) - waitCallback(evt) - - check evt.retCode == RET_OK - check callbackBytes(evt) == @[byte 0x01, 0x02, 0x03] - -when not defined(gcRefc): - ## Skipped under `--mm:refc`: each setter thread grows / shrinks the - ## per-event listener `seq[FFIEventListener]` via `addEventListener`, - ## and refc's per-thread GC heap ownership makes cross-thread seq - ## buffer reallocation unsafe even when the surrounding lock is held. 
- ## ORC + the FFI thread + tsan (the combo this test was written for) - ## does not have that limitation. - suite "FFIEventRegistry concurrent access": - ## Regression for PR #39 review comments r3288220895 / r3289285387. - ## Run under tsan to actually validate the fix: - ## NIM_FFI_SAN=tsan NIM_FFI_MM=orc nimble test_sanitized - test "concurrent add/remove writers vs dispatch reads stay race-free": - var evt: CallbackData - initCallbackData(evt) - defer: - deinitCallbackData(evt) - - var rsp: CallbackData - initCallbackData(rsp) - defer: - deinitCallbackData(rsp) - - var pool: FFIContextPool[TestEvtLib] - let ctx = pool.createFFIContext().valueOr: - check false - return - defer: - discard pool.destroyFFIContext(ctx) - - # Seed an initial callback so the FFI thread's first dispatch has a - # target. The setter threads will then repeatedly re-install the same - # (callback, userData) pair — what matters is the cross-thread write - # racing the FFI thread's read, not which pair "wins". + withPool(ctx): discard addEventListener( ctx[].eventRegistry, "message_sent", captureCb, addr evt ) - const NumSetterThreads = 4 - const NumDispatchIters = 200 + check sendRequestToFFIThread( + ctx, EmitCborEventRequest.ffiNewReq(captureCb, addr rsp) + ) + .isOk() + waitCallback(rsp) + waitCallback(evt) - var stop: Atomic[bool] - stop.store(false) - var setters: array[NumSetterThreads, Thread[SetterArgs]] - for i in 0 ..< NumSetterThreads: - createThread(setters[i], setterThreadBody, (ctx, addr stop, addr evt)) + check evt.retCode == RET_OK + let decoded = cborDecode(callbackBytes(evt), EventEnvelope[MessageSentBody]) + check decoded.isOk() + check decoded.value.eventType == "message_sent" + check decoded.value.payload.requestId == "req-1" + check decoded.value.payload.messageHash == "0xdeadbeef" - for _ in 0 ..< NumDispatchIters: - # Reset rsp so each iteration's `waitCallback` blocks until the - # FFI thread fires the response — keeps the loop synchronous. 
- acquire(rsp.lock) - rsp.called = false - release(rsp.lock) +suite "dispatchFFIEvent with seq[byte]": + test "accepts a raw seq[byte] body": + setupCallbackData(evt) + setupCallbackData(rsp) - check sendRequestToFFIThread( - ctx, EmitCborEventRequest.ffiNewReq(captureCb, addr rsp) + withPool(ctx): + discard addEventListener( + ctx[].eventRegistry, "raw_bytes", captureCb, addr evt + ) + + check sendRequestToFFIThread( + ctx, EmitRawBytesEventRequest.ffiNewReq(captureCb, addr rsp) + ) + .isOk() + waitCallback(rsp) + waitCallback(evt) + + check evt.retCode == RET_OK + check callbackBytes(evt) == @[byte 0x01, 0x02, 0x03] + +when not defined(gcRefc): + ## Skipped under refc: setter threads grow/shrink the per-event listener + ## seq, and refc's per-thread GC heap makes that unsafe cross-thread. + suite "FFIEventRegistry concurrent access": + ## Regression for PR #39 (r3288220895 / r3289285387). + ## Validate with: NIM_FFI_SAN=tsan NIM_FFI_MM=orc nimble test_sanitized + test "concurrent add/remove writers vs dispatch reads stay race-free": + setupCallbackData(evt) + setupCallbackData(rsp) + + withPool(ctx): + # Seed an initial listener so the first dispatch has a target. + discard addEventListener( + ctx[].eventRegistry, "message_sent", captureCb, addr evt ) - .isOk() - waitCallback(rsp) - stop.store(true) - for i in 0 ..< NumSetterThreads: - joinThread(setters[i]) + const NumSetterThreads = 4 + const NumDispatchIters = 200 - # `evt` got hit by every dispatch above; just confirm at least one - # actually landed so a silently-broken dispatch loop is caught. - check evt.called + var stop: Atomic[bool] + stop.store(false) + var setters: array[NumSetterThreads, Thread[SetterArgs]] + for i in 0 ..< NumSetterThreads: + createThread(setters[i], setterThreadBody, (ctx, addr stop, addr evt)) -## A foreign-thread mutation must not be able to invalidate a listener's -## `userData` while an in-flight dispatch is mid-invocation. 
Dispatch -## now lives on the dedicated event thread, which still holds -## `reg.lock` across the listener fan-out — so foreign -## `removeEventListener` blocks until dispatch returns. This test -## drives a real `FFIContext` (FFI thread enqueues, event thread -## dispatches) and asserts the same contract end-to-end. + for _ in 0 ..< NumDispatchIters: + resetCalled(rsp) + check sendRequestToFFIThread( + ctx, EmitCborEventRequest.ffiNewReq(captureCb, addr rsp) + ) + .isOk() + waitCallback(rsp) + + stop.store(true) + for i in 0 ..< NumSetterThreads: + joinThread(setters[i]) + + check evt.called type SlowState = object entered: Atomic[bool] @@ -259,8 +195,6 @@ type SlowState = object proc slowEventCb( retCode: cint, msg: ptr cchar, len: csize_t, userData: pointer ) {.cdecl, gcsafe, raises: [].} = - ## Signal entry, sleep long enough that the test thread can race in - ## with removeEventListener and observe the block, then signal exit. let st = cast[ptr SlowState](userData) st[].entered.store(true) os.sleep(60) @@ -268,47 +202,32 @@ proc slowEventCb( suite "registry lock held during invocation": test "removeEventListener blocks until in-flight dispatch finishes": - var rsp: CallbackData - initCallbackData(rsp) - defer: - deinitCallbackData(rsp) + setupCallbackData(rsp) - var pool: FFIContextPool[TestEvtLib] - let ctx = pool.createFFIContext().valueOr: - check false - return - defer: - discard pool.destroyFFIContext(ctx) + withPool(ctx): + var st: SlowState + st.entered.store(false) + st.exited.store(false) - var st: SlowState - st.entered.store(false) - st.exited.store(false) + let id = addEventListener( + ctx[].eventRegistry, "message_sent", slowEventCb, addr st + ) + check id != 0'u64 - # Register the slow callback under the same event the dispatch fires. 
- let id = addEventListener( - ctx[].eventRegistry, "message_sent", slowEventCb, addr st - ) - check id != 0'u64 + check sendRequestToFFIThread( + ctx, EmitCborEventRequest.ffiNewReq(captureCb, addr rsp) + ) + .isOk() + waitCallback(rsp) - # Fire an event from the FFI thread; the request-response callback - # only confirms the request completed — the slow listener is invoked - # asynchronously by the event thread. + for _ in 0 ..< 500: + if st.entered.load(): + break + os.sleep(1) + check st.entered.load() + check not st.exited.load() - check sendRequestToFFIThread( - ctx, EmitCborEventRequest.ffiNewReq(captureCb, addr rsp) - ) - .isOk() - waitCallback(rsp) - - # Wait until the event thread is inside slowEventCb. - for _ in 0 ..< 500: - if st.entered.load(): - break - os.sleep(1) - check st.entered.load() - check not st.exited.load() - - # Lock-during-invocation contract: remove blocks until dispatch - # finishes; by the time it returns, slowEventCb has set exited=true. - check removeEventListener(ctx[].eventRegistry, id) - check st.exited.load() + # Lock-during-invocation: remove blocks until dispatch finishes, + # by which time slowEventCb has set exited=true. + check removeEventListener(ctx[].eventRegistry, id) + check st.exited.load() diff --git a/tests/unit/test_event_listener.nim b/tests/unit/test_event_listener.nim index 4cf15aa..f63e860 100644 --- a/tests/unit/test_event_listener.nim +++ b/tests/unit/test_event_listener.nim @@ -1,23 +1,12 @@ -## Unit tests for the `FFIEventRegistry` primitive — the multi-listener -## data structure that will back `_add_event_listener` / -## `_remove_event_listener` once the dispatch wiring lands. -## -## These tests exercise the registry directly (no FFI thread, no dispatch -## templates) so they stay fast and pin down the registry's mutation and -## snapshot semantics in isolation. +## Unit tests for the `FFIEventRegistry` primitive (no FFI thread, no dispatch). 
import std/locks import unittest2 import ffi -# Tiny helpers — a thread-safe sink each listener writes into so we can -# assert which callbacks would fire and in what order once dispatch lands. -# Today only `tagCb`'s presence is exercised; the recorder is also used to -# make sure listener bookkeeping doesn't accidentally invoke callbacks. - type Recorder = object lock: Lock - hits: seq[string] # tag captured from `userData` per invocation + hits: seq[string] retCodes: seq[cint] payloads: seq[string] @@ -34,7 +23,6 @@ proc record(r: var Recorder, tag: string, retCode: cint, payload: string) = r.payloads.add(payload) release(r.lock) -# Each listener is identified by a `Tag` passed through `userData`. type Tag = object name: string rec: ptr Recorder @@ -48,17 +36,22 @@ proc tagCb( copyMem(addr payload[0], msg, int(len)) record(t[].rec[], t[].name, retCode, payload) +template setupRegistry(regIdent: untyped) = + var regIdent: FFIEventRegistry + initEventRegistry(regIdent) + defer: + deinitEventRegistry(regIdent) + +template setupRecorder(recIdent: untyped) = + var recIdent: Recorder + initRecorder(recIdent) + defer: + deinitRecorder(recIdent) suite "FFIEventRegistry mutation": test "addEventListener assigns monotonically increasing non-zero ids": - var reg: FFIEventRegistry - initEventRegistry(reg) - defer: - deinitEventRegistry(reg) - var rec: Recorder - initRecorder(rec) - defer: - deinitRecorder(rec) + setupRegistry(reg) + setupRecorder(rec) var t = Tag(name: "a", rec: addr rec) let id1 = addEventListener(reg, "evt", tagCb, addr t) @@ -69,30 +62,18 @@ suite "FFIEventRegistry mutation": check id3 == 3'u64 test "addEventListener returns 0 when callback is nil": - var reg: FFIEventRegistry - initEventRegistry(reg) - defer: - deinitEventRegistry(reg) + setupRegistry(reg) let id = addEventListener(reg, "evt", nil, nil) check id == 0'u64 test "removeEventListener returns false for unknown ids": - var reg: FFIEventRegistry - initEventRegistry(reg) - defer: - 
deinitEventRegistry(reg) + setupRegistry(reg) check not removeEventListener(reg, 0'u64) check not removeEventListener(reg, 99'u64) test "removeEventListener removes listeners across distinct events": - var reg: FFIEventRegistry - initEventRegistry(reg) - defer: - deinitEventRegistry(reg) - var rec: Recorder - initRecorder(rec) - defer: - deinitRecorder(rec) + setupRegistry(reg) + setupRecorder(rec) var t = Tag(name: "a", rec: addr rec) let id1 = addEventListener(reg, "evt", tagCb, addr t) @@ -100,7 +81,6 @@ suite "FFIEventRegistry mutation": check removeEventListener(reg, id1) check removeEventListener(reg, id2) - # Second remove of the same id is a no-op. check not removeEventListener(reg, id1) check snapshotListeners(reg, "evt").len == 0 @@ -108,14 +88,8 @@ suite "FFIEventRegistry mutation": suite "FFIEventRegistry snapshot semantics": test "snapshot returns only the listeners for the requested event": - var reg: FFIEventRegistry - initEventRegistry(reg) - defer: - deinitEventRegistry(reg) - var rec: Recorder - initRecorder(rec) - defer: - deinitRecorder(rec) + setupRegistry(reg) + setupRecorder(rec) var a = Tag(name: "a", rec: addr rec) var b = Tag(name: "b", rec: addr rec) var c = Tag(name: "c", rec: addr rec) @@ -124,32 +98,19 @@ suite "FFIEventRegistry snapshot semantics": discard addEventListener(reg, "evt", tagCb, addr b) discard addEventListener(reg, "other", tagCb, addr c) - let snapEvt = snapshotListeners(reg, "evt") - check snapEvt.len == 2 # both listeners for "evt" - - let snapOther = snapshotListeners(reg, "other") - check snapOther.len == 1 # only the listener for "other" - - let snapUnknown = snapshotListeners(reg, "no-subscriber") - check snapUnknown.len == 0 # no listener for this event + check snapshotListeners(reg, "evt").len == 2 + check snapshotListeners(reg, "other").len == 1 + check snapshotListeners(reg, "no-subscriber").len == 0 test "snapshot is a copy: post-snapshot mutation does not affect it": - var reg: FFIEventRegistry - 
initEventRegistry(reg) - defer: - deinitEventRegistry(reg) - var rec: Recorder - initRecorder(rec) - defer: - deinitRecorder(rec) + setupRegistry(reg) + setupRecorder(rec) var t = Tag(name: "a", rec: addr rec) let id1 = addEventListener(reg, "evt", tagCb, addr t) let snap = snapshotListeners(reg, "evt") check snap.len == 1 - # Mutating the registry after the snapshot must not retroactively - # shrink or grow the snapshot we already captured. check removeEventListener(reg, id1) discard addEventListener(reg, "evt", tagCb, addr t) check snap.len == 1 @@ -157,14 +118,8 @@ suite "FFIEventRegistry snapshot semantics": suite "removeAllEventListeners": test "drops every registered listener": - var reg: FFIEventRegistry - initEventRegistry(reg) - defer: - deinitEventRegistry(reg) - var rec: Recorder - initRecorder(rec) - defer: - deinitRecorder(rec) + setupRegistry(reg) + setupRecorder(rec) var a = Tag(name: "a", rec: addr rec) var b = Tag(name: "b", rec: addr rec) diff --git a/tests/unit/test_event_thread.nim b/tests/unit/test_event_thread.nim index a459a24..5960a5d 100644 --- a/tests/unit/test_event_thread.nim +++ b/tests/unit/test_event_thread.nim @@ -1,7 +1,4 @@ -## Integration tests for the dedicated event thread introduced by -## issue #6. Each test stands up a real `FFIContext` (via -## `FFIContextPool`) and exercises the FFI thread → bounded queue → -## event thread → listener pipeline end to end. +## Integration tests for the dedicated event thread (issue #6). import std/[atomics, locks, os, strutils] import unittest2 @@ -13,9 +10,6 @@ type TestEvtLib = object type LatchPayload* {.ffi.} = object iter*: int -## Captured-state callback identical to the helpers in -## `test_event_dispatch.nim`, repeated here so this file stays a -## self-contained test binary. 
type CallbackData = object lock: Lock cond: Cond @@ -32,6 +26,13 @@ proc deinitCallbackData(d: var CallbackData) = d.cond.deinitCond() d.lock.deinitLock() +template setupCallbackData(name: untyped) = + ## Declares `name`, inits it, and defers its deinit in the caller's scope. + var name: CallbackData + initCallbackData(name) + defer: + deinitCallbackData(name) + proc captureCb( retCode: cint, msg: ptr cchar, len: csize_t, userData: pointer ) {.cdecl, gcsafe, raises: [].} = @@ -52,11 +53,13 @@ proc waitCallback(d: var CallbackData) = wait(d.cond, d.lock) release(d.lock) +proc resetCalled(d: var CallbackData) = + acquire(d.lock) + d.called = false + release(d.lock) + proc waitCallbackTimeout(d: var CallbackData, timeoutMs: int): bool = - ## Polling variant for tests where the callback may legitimately - ## never fire — returns true if observed `called` within the budget, - ## false on timeout. Polls in 10 ms increments under `d.lock` so the - ## load is synchronised with the `captureCb` writer. + ## Polls under `d.lock` so the load syncs with the `captureCb` writer. let deadline = Moment.now() + timeoutMs.milliseconds while true: acquire(d.lock) @@ -68,26 +71,26 @@ proc waitCallbackTimeout(d: var CallbackData, timeoutMs: int): bool = return false os.sleep(10) -# --------------------------------------------------------------------------- -# Request helpers -# --------------------------------------------------------------------------- +template withPool(ctxIdent: untyped, body: untyped) = + ## Sets up a pool + ctx, runs body, destroys on exit. 
+ var pool: FFIContextPool[TestEvtLib] + let ctxIdent = pool.createFFIContext().valueOr: + check false + return + defer: + discard pool.destroyFFIContext(ctxIdent) + body registerReqFFI(EmitLatchEvent, lib: ptr TestEvtLib): proc(iter: int): Future[Result[string, string]] {.async.} = dispatchFFIEventCbor("latch", LatchPayload(iter: iter)) return ok("emitted") -## A request whose async body completes immediately — useful for -## probing FFI-thread round-trip latency under load. registerReqFFI(PingEvent, lib: ptr TestEvtLib): proc(): Future[Result[string, string]] {.async.} = return ok("pong") -## A request whose async body blocks the FFI thread synchronously -## (no await) so the heartbeat freezes. Used to exercise the new -## heartbeat-staleness path. Guarded by an Atomic switch so we can -## enable it deterministically per test rather than turning every -## test of this request type into a watchdog probe. +# Atomic switch so the wedge fires deterministically per test. var gBlockingEnabled: Atomic[bool] gBlockingEnabled.store(false) @@ -97,18 +100,12 @@ registerReqFFI(BlockingRequest, lib: ptr TestEvtLib): os.sleep(milliseconds) return ok("done") -# --------------------------------------------------------------------------- -# Thread-id capture -# --------------------------------------------------------------------------- - var gListenerThreadId: Atomic[int] gListenerThreadId.store(-1) proc captureThreadIdCb( retCode: cint, msg: ptr cchar, len: csize_t, userData: pointer ) {.cdecl, gcsafe, raises: [].} = - ## Records the OS thread id of the listener invocation, so the test - ## can assert it differs from the FFI thread's id. gListenerThreadId.store(getThreadId()) let d = cast[ptr CallbackData](userData) acquire(d[].lock) @@ -116,9 +113,6 @@ proc captureThreadIdCb( signal(d[].cond) release(d[].lock) -## Returns the FFI thread's id by running a no-op request and reading -## `getThreadId()` from inside the handler. 
Used to compare against -## the listener's thread id below. var gFfiThreadId: Atomic[int] gFfiThreadId.store(-1) @@ -129,158 +123,93 @@ registerReqFFI(CaptureFfiTidRequest, lib: ptr TestEvtLib): suite "event delivery is asynchronous": test "listener runs on the event thread, not the FFI thread": - # CallbackData defers come BEFORE the pool-destroy defer so they run - # AFTER it (LIFO): the event thread is joined before any lock the - # event thread might still be holding is torn down — otherwise TSan - # flags `captureCb` accessing an already-destroyed mutex. - var evt: CallbackData - initCallbackData(evt) - defer: - deinitCallbackData(evt) + # CallbackData defers declared first run last (LIFO): pool-destroy joins + # the event thread before any still-held mutex is torn down. TSan otherwise + # flags `captureCb` on a destroyed mutex. + setupCallbackData(evt) + setupCallbackData(rsp) - var rsp: CallbackData - initCallbackData(rsp) - defer: - deinitCallbackData(rsp) + withPool(ctx): + discard addEventListener( + ctx[].eventRegistry, "latch", captureThreadIdCb, addr evt + ) - var pool: FFIContextPool[TestEvtLib] - let ctx = pool.createFFIContext().valueOr: - check false - return - defer: - discard pool.destroyFFIContext(ctx) + check sendRequestToFFIThread( + ctx, CaptureFfiTidRequest.ffiNewReq(captureCb, addr rsp) + ) + .isOk() + waitCallback(rsp) - discard addEventListener( - ctx[].eventRegistry, "latch", captureThreadIdCb, addr evt - ) + resetCalled(rsp) + check sendRequestToFFIThread( + ctx, EmitLatchEvent.ffiNewReq(captureCb, addr rsp, 0) + ) + .isOk() + waitCallback(rsp) + waitCallback(evt) - # Capture the FFI thread id. - check sendRequestToFFIThread( - ctx, CaptureFfiTidRequest.ffiNewReq(captureCb, addr rsp) - ) - .isOk() - waitCallback(rsp) - - # Now emit an event from the FFI thread and wait for the listener. 
- acquire(rsp.lock) - rsp.called = false - release(rsp.lock) - check sendRequestToFFIThread( - ctx, EmitLatchEvent.ffiNewReq(captureCb, addr rsp, 0) - ) - .isOk() - waitCallback(rsp) - waitCallback(evt) - - let ffiTid = gFfiThreadId.load() - let listenerTid = gListenerThreadId.load() - check ffiTid >= 0 - check listenerTid >= 0 - check ffiTid != listenerTid - -# --------------------------------------------------------------------------- -# Slow listener does not block the FFI thread -# --------------------------------------------------------------------------- + let ffiTid = gFfiThreadId.load() + let listenerTid = gListenerThreadId.load() + check ffiTid >= 0 + check listenerTid >= 0 + check ffiTid != listenerTid proc slowSleepCb( retCode: cint, msg: ptr cchar, len: csize_t, userData: pointer ) {.cdecl, gcsafe, raises: [].} = - ## Sleeps long enough that we'd notice if the FFI thread were waiting - ## on us before accepting the next request. os.sleep(150) suite "FFI thread independence": test "slow listener does not block FFI thread request round-trip": - var rsp: CallbackData - initCallbackData(rsp) - defer: - deinitCallbackData(rsp) + setupCallbackData(rsp) - var pool: FFIContextPool[TestEvtLib] - let ctx = pool.createFFIContext().valueOr: - check false - return - defer: - discard pool.destroyFFIContext(ctx) + withPool(ctx): + discard addEventListener( + ctx[].eventRegistry, "latch", slowSleepCb, nil + ) - discard addEventListener( - ctx[].eventRegistry, "latch", slowSleepCb, nil - ) + check sendRequestToFFIThread( + ctx, EmitLatchEvent.ffiNewReq(captureCb, addr rsp, 0) + ) + .isOk() + waitCallback(rsp) + resetCalled(rsp) - # Fire an event, then immediately fire a ping request. With dispatch - # off the FFI thread, the ping must complete in well under 150 ms - # even though the prior event is still in flight on the event thread. 
- check sendRequestToFFIThread( - ctx, EmitLatchEvent.ffiNewReq(captureCb, addr rsp, 0) - ) - .isOk() - waitCallback(rsp) + # chronos's `Moment` — std/times exports a `milliseconds` that + # shadows chronos's at this generic-instantiation site. + let started = Moment.now() + check sendRequestToFFIThread(ctx, PingEvent.ffiNewReq(captureCb, addr rsp)) + .isOk() + waitCallback(rsp) + let elapsed = Moment.now() - started - acquire(rsp.lock) - rsp.called = false - release(rsp.lock) - - # Use chronos's Moment for timing so we don't pull std/times into - # scope — std/times exports a `milliseconds` proc that shadows the - # chronos one used inside the FFI thread body, breaking compilation - # of generic instantiations at this call site. - let started = Moment.now() - check sendRequestToFFIThread(ctx, PingEvent.ffiNewReq(captureCb, addr rsp)) - .isOk() - waitCallback(rsp) - let elapsed = Moment.now() - started - - check elapsed < 100.milliseconds # ample margin under the 150 ms slow-listener sleep - -# --------------------------------------------------------------------------- -# Heartbeat staleness fires onNotResponding -# --------------------------------------------------------------------------- + check elapsed < 100.milliseconds # under the 150 ms slow-listener sleep when not defined(gcRefc): - ## Skipped under `--mm:refc`: this test relies on `os.sleep`ing the - ## FFI thread for several seconds inside a synchronous handler. - ## refc plus the existing destroy-on-time policies make that - ## combination flaky in CI; the orc path is the contract we care - ## about here. + ## Skipped under refc: sleeping the FFI thread inside a sync handler + ## interacts badly with refc + existing destroy-on-time policies. 
suite "FFI heartbeat staleness": test "wedged FFI thread triggers onNotResponding via heartbeat": - # Lock-bearing CallbackData defers are declared FIRST so they run - # LAST (LIFO); pool-destroy runs FIRST and joins the event thread - # before any mutex it might still be holding is destroyed. - var notif: CallbackData - initCallbackData(notif) - defer: - deinitCallbackData(notif) - - var rsp: CallbackData - initCallbackData(rsp) - defer: - deinitCallbackData(rsp) + setupCallbackData(notif) + setupCallbackData(rsp) var pool: FFIContextPool[TestEvtLib] let ctx = pool.createFFIContext().valueOr: check false return defer: - # Disable the wedge before tearing down so destroy isn't blocked - # by the still-sleeping handler. + # Disable wedge first so destroy isn't blocked by the still-sleeping handler. gBlockingEnabled.store(false) discard pool.destroyFFIContext(ctx) - # Subscribe to the exact event name `onNotResponding` looks up so - # the captured signal can't be ambiguously satisfied by an unrelated - # event the test happens to dispatch. discard addEventListener( ctx[].eventRegistry, NotRespondingEventName, captureCb, addr notif ) - # Wait out the start-delay grace window first so the heartbeat - # check is actually armed. + # Wait out the start-delay so the heartbeat check is armed. os.sleep(FFIHeartbeatStartDelay.milliseconds.int + 200) - # Wedge the FFI thread for longer than EventThreadTickInterval + - # FFIHeartbeatStaleThreshold so the event thread observes a - # frozen heartbeat across at least one tick boundary. + # Wedge long enough to cross at least one tick boundary. gBlockingEnabled.store(true) let wedgeMs = (EventThreadTickInterval + FFIHeartbeatStaleThreshold).milliseconds.int + @@ -292,16 +221,8 @@ when not defined(gcRefc): waitCallback(rsp) gBlockingEnabled.store(false) - # The not_responding event should have been delivered while the - # FFI thread was wedged. 
captureCb writes `called` under - # `notif.lock`, so wait through the cond rather than reading it - # raw — avoids both the data race and the just-missed-it window. check waitCallbackTimeout(notif, 1500) -# --------------------------------------------------------------------------- -# Queue overflow sets the stuck flag and rejects further requests -# --------------------------------------------------------------------------- - type BackpressureState = object enteredLock: Lock enteredCond: Cond @@ -325,10 +246,8 @@ proc deinitBackpressure(b: var BackpressureState) = proc backpressureCb( retCode: cint, msg: ptr cchar, len: csize_t, userData: pointer ) {.cdecl, gcsafe, raises: [].} = - ## First invocation signals entered and then blocks until released — - ## holds the event thread inside `withLock reg.lock`, which back-pressures - ## subsequent dispatches and gives us a deterministic way to fill the - ## queue. + ## First call signals entered then blocks under reg.lock to back-pressure + ## subsequent dispatches — gives a deterministic way to fill the queue. let b = cast[ptr BackpressureState](userData) if not b[].entered.exchange(true): acquire(b[].enteredLock) @@ -340,9 +259,6 @@ proc backpressureCb( wait(b[].releaseCond, b[].releaseLock) release(b[].releaseLock) -## A request that does N back-to-back dispatches from the FFI thread. -## Used to push enough events at once that the queue saturates while -## the event thread is stuck inside the backpressure listener above. registerReqFFI(BurstEmit, lib: ptr TestEvtLib): proc(count: int): Future[Result[string, string]] {.async.} = for i in 0 ..< count: @@ -351,91 +267,56 @@ registerReqFFI(BurstEmit, lib: ptr TestEvtLib): suite "queue overflow": test "overflow sets stuck flag, fires onNotResponding, rejects new requests": - # Lock-bearing state defers come FIRST so they run LAST (LIFO); - # pool destroy joins the event thread before any mutex still - # referenced from a listener is torn down. 
var bp: BackpressureState initBackpressure(bp) defer: deinitBackpressure(bp) - var notif: CallbackData - initCallbackData(notif) - defer: - deinitCallbackData(notif) + setupCallbackData(notif) + setupCallbackData(rsp) + setupCallbackData(rejected) - var rsp: CallbackData - initCallbackData(rsp) - defer: - deinitCallbackData(rsp) + withPool(ctx): + discard addEventListener( + ctx[].eventRegistry, "latch", backpressureCb, addr bp + ) + discard addEventListener( + ctx[].eventRegistry, NotRespondingEventName, captureCb, addr notif + ) - var rejected: CallbackData - initCallbackData(rejected) - defer: - deinitCallbackData(rejected) + # Kick one event so the listener holds reg.lock; subsequent enqueues + # pile up undrained. + check sendRequestToFFIThread( + ctx, EmitLatchEvent.ffiNewReq(captureCb, addr rsp, -1) + ) + .isOk() + waitCallback(rsp) - var pool: FFIContextPool[TestEvtLib] - let ctx = pool.createFFIContext().valueOr: - check false - return - defer: - discard pool.destroyFFIContext(ctx) + acquire(bp.enteredLock) + while not bp.entered.load(): + wait(bp.enteredCond, bp.enteredLock) + release(bp.enteredLock) - discard addEventListener( - ctx[].eventRegistry, "latch", backpressureCb, addr bp - ) - # Subscribe to the exact not_responding event name so the wait below - # can't be falsely satisfied by a "latch" payload from the burst. - discard addEventListener( - ctx[].eventRegistry, NotRespondingEventName, captureCb, addr notif - ) + # Burst > capacity in one request; tail enqueues flip the stuck flag. + resetCalled(rsp) + check sendRequestToFFIThread( + ctx, BurstEmit.ffiNewReq(captureCb, addr rsp, EventQueueCapacity + 8) + ) + .isOk() + waitCallback(rsp) - # Kick off one event so the event thread enters backpressureCb and - # holds reg.lock. Once that's confirmed, any subsequent enqueues - # pile up in the queue without being drained. 
- check sendRequestToFFIThread( - ctx, EmitLatchEvent.ffiNewReq(captureCb, addr rsp, -1) - ) - .isOk() - waitCallback(rsp) + check ctx.eventQueueStuck.load() - acquire(bp.enteredLock) - while not bp.entered.load(): - wait(bp.enteredCond, bp.enteredLock) - release(bp.enteredLock) + let res = + sendRequestToFFIThread(ctx, PingEvent.ffiNewReq(captureCb, addr rejected)) + check res.isErr() + check res.error.contains("stuck") - # Now flood the queue. EventQueueCapacity+8 dispatches in one - # FFI-thread request, atomically from the queue's perspective: the - # event thread can't drain anything because it's stuck inside the - # first callback. The last several enqueues must hit the - # queue-full path and flip the stuck flag. - acquire(rsp.lock) - rsp.called = false - release(rsp.lock) - check sendRequestToFFIThread( - ctx, BurstEmit.ffiNewReq(captureCb, addr rsp, EventQueueCapacity + 8) - ) - .isOk() - waitCallback(rsp) + # Release backpressure so drain advances and the stuck flag fires + # not_responding. + acquire(bp.releaseLock) + bp.release.store(true) + signal(bp.releaseCond) + release(bp.releaseLock) - # The stuck flag is set as soon as the first overflow happens; - # subsequent sendRequestToFFIThread calls must short-circuit. - check ctx.eventQueueStuck.load() - - let res = - sendRequestToFFIThread(ctx, PingEvent.ffiNewReq(captureCb, addr rejected)) - check res.isErr() - check res.error.contains("stuck") - - # Release backpressure so the event thread can drain, advance past - # the backpressure listener, observe the stuck flag, and fire the - # not_responding notification. - acquire(bp.releaseLock) - bp.release.store(true) - signal(bp.releaseCond) - release(bp.releaseLock) - - # The whole point of this test is that overflow surfaces the - # not_responding signal — assert it actually fires within a - # bounded window rather than letting the test silently pass. - check waitCallbackTimeout(notif, 2000) + check waitCallbackTimeout(notif, 2000)