From 1c2ea72336397e3433341536d15415ae40c6344c Mon Sep 17 00:00:00 2001 From: Ivan FB Date: Thu, 4 Jun 2026 00:21:23 +0200 Subject: [PATCH] feat(ffi): dispatch events on the event thread, not the FFI thread MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit PR #71 stood up the event thread + bounded queue but left dispatch inline, so a blocking consumer callback stalled request processing on the FFI thread. Wire the producer side onto the existing queue: the FFI thread now enqueues (copyToShared/dupSharedCString/enqueueFFIEvent) and returns immediately; the event thread drains and invokes callbacks under reg.lock. This keeps the event thread as watchdog *and* dispatcher. Because callbacks no longer run on the FFI thread, a callback may issue a synchronous sendRequestToFFIThread without tripping the reentrancy guard (the FFI thread is separate) — a callback still must not add/remove listeners on its own registry. A wedged callback fills the queue and latches eventQueueStuck, which sendRequestToFFIThread surfaces as backpressure. Reworked the lock-during-invocation test to drive through the real event thread, and added a test for the now-safe sync-request-from-callback path. Co-Authored-By: Claude Opus 4.8 --- ffi/ffi_events.nim | 124 +++++++++++++++++++--------- tests/unit/test_event_dispatch.nim | 125 ++++++++++++++++++++++------- 2 files changed, 178 insertions(+), 71 deletions(-) diff --git a/ffi/ffi_events.nim b/ffi/ffi_events.nim index 96cc0cd..9e86d7d 100644 --- a/ffi/ffi_events.nim +++ b/ffi/ffi_events.nim @@ -232,56 +232,100 @@ var ffiCurrentNotifyEventEnqueued* {.threadvar.}: proc() {.gcsafe, raises: [].} ## Hook (not a queue field) so this module doesn't depend on chronos's ## ThreadSignalPtr. Nil-safe. -template withFFIEventDispatch(eventName: string, listeners, body: untyped) = - ## Resolves the thread-local registry, snapshots listeners under - ## `reg.lock`, then runs `body` inside `foreignThreadGc` + try/except. - let regPtr = ffiCurrentEventRegistry - if regPtr.isNil(): - chronicles.error eventName & " - event registry not set on this thread" +proc copyToShared( + src: pointer, n: int +): tuple[data: ptr UncheckedArray[byte], len: int] {.raises: [].} = + ## Raw `c_malloc` copy of `n` bytes from `src` — the `string` / `seq[byte]` + ## dispatch path. Mirrors `cborEncodeShared`'s ownership contract: the + ## returned buffer is the caller's to hand to the queue. Empty -> (nil, 0). + if n <= 0 or src.isNil: + return (nil, 0) + let buf = cast[ptr UncheckedArray[byte]](c_malloc(csize_t(n))) + copyMem(buf, src, n) + (buf, n) + +proc dupSharedCString(name: string): cstring {.raises: [].} = + ## NUL-terminated `c_malloc` copy of `name`. The queue takes ownership; + ## `freeQueuedEventPayload` frees it once dispatch on the event thread + ## returns. + let n = name.len + let buf = cast[cstring](c_malloc(csize_t(n + 1))) + if n > 0: + copyMem(buf, unsafeAddr name[0], n) + cast[ptr char](cast[uint](buf) + uint(n))[] = '\0' + buf + +proc enqueueFFIEvent*( + name: string, data: ptr UncheckedArray[byte], dataLen: int +) {.gcsafe, raises: [].} = + ## Producer side of the event path. Copies `name` into a `c_malloc` + ## cstring, enqueues `(name, data, dataLen)` onto the thread-local event + ## queue, and wakes the event thread so it drains and invokes the + ## listener callbacks. The FFI thread therefore never runs a listener + ## callback itself — a blocked consumer can't stall request processing. + ## + ## `data` must be a `c_malloc` buffer the queue can adopt (or nil for an + ## empty payload). On any path that does not enqueue, this proc frees the + ## buffers it was handed. A full queue latches `eventQueueStuck`; recovery + ## is destroy+recreate. + let qPtr = ffiCurrentEventQueue + if qPtr.isNil(): + chronicles.error "event queue not set on this thread", event = name + if not data.isNil: + c_free(data) return - withLock regPtr[].lock: - let listeners = regPtr[].byEvent.getOrDefault(eventName) - if listeners.len == 0: - chronicles.debug eventName & " - no listener registered" - else: - foreignThreadGc: - try: - body - except Exception, CatchableError: - notifyListenersErr( - listeners, - "Exception dispatching " & eventName & ": " & getCurrentExceptionMsg(), - ) + let cname = dupSharedCString(name) + if not qPtr[].tryEnqueueEvent(cname, data, dataLen): + chronicles.error "event queue full; dropping event, marking stuck", event = name + c_free(cast[pointer](cname)) + if not data.isNil: + c_free(data) + if not ffiCurrentEventQueueStuck.isNil(): + ffiCurrentEventQueueStuck[].store(true) + return + + if not ffiCurrentNotifyEventEnqueued.isNil(): + ffiCurrentNotifyEventEnqueued() template dispatchFFIEvent*(eventName: string, body: untyped) = - ## Dispatches an FFI event to every listener subscribed to `eventName`. + ## Emits an FFI event to every listener subscribed to `eventName`. ## `body` must yield a `string` or `seq[byte]`. ## - ## Valid only on the FFI thread (where `ffiCurrentEventRegistry` is set). - ## Holds `reg.lock` for the entire snapshot + invocation so a concurrent - ## `removeEventListener` from a foreign thread blocks until dispatch - ## returns. Handlers must not call addEventListener / removeEventListener - ## on the same registry (would self-deadlock). - withFFIEventDispatch(eventName, listeners): - let event = body - let dataPtr: pointer = - if event.len > 0: unsafeAddr event[0] - else: nil - notifyListenersOk(listeners, dataPtr, event.len) + ## Copies the payload, enqueues it on the thread-local event queue, and + ## returns immediately; the event thread drains the queue and invokes the + ## callbacks (holding `reg.lock` across snapshot + invocation, so a + ## concurrent `removeEventListener` blocks until dispatch returns). Valid + ## only where `ffiCurrentEventQueue` is set (the FFI thread). A callback + ## must not call addEventListener / removeEventListener on its own + ## registry (would self-deadlock against the held `reg.lock`). + block: + try: + let event = body + let dataPtr = + if event.len > 0: unsafeAddr event[0] + else: nil + let (data, dataLen) = copyToShared(dataPtr, event.len) + enqueueFFIEvent(eventName, data, dataLen) + except CatchableError, Exception: + chronicles.error eventName & " - failed to enqueue event", + err = getCurrentExceptionMsg() template dispatchFFIEventCbor*(eventName: string, eventPayload: typed) = ## Typed CBOR variant of `dispatchFFIEvent`. Wraps `eventPayload` in an - ## `EventEnvelope`, CBOR-encodes it into a `c_malloc` buffer once, and - ## fans the same buffer out to every registered listener. + ## `EventEnvelope`, CBOR-encodes it into a `c_malloc` buffer, and enqueues + ## it for the event thread to deliver. Same threading / locking contract + ## as `dispatchFFIEvent`. ## ## NB: parameter is `eventPayload`, not `payload` — Nim's template ## substitution would otherwise also rewrite the `payload:` field inside ## `EventEnvelope`. - withFFIEventDispatch(eventName, listeners): - var (data, dataLen) = cborEncodeShared( - EventEnvelope[typeof(eventPayload)](eventType: eventName, payload: eventPayload) - ) - defer: - cborFreeShared(data) - notifyListenersOk(listeners, data, dataLen) + block: + try: + let (data, dataLen) = cborEncodeShared( + EventEnvelope[typeof(eventPayload)](eventType: eventName, payload: eventPayload) + ) + enqueueFFIEvent(eventName, data, dataLen) + except CatchableError, Exception: + chronicles.error eventName & " - failed to encode/enqueue event", + err = getCurrentExceptionMsg() diff --git a/tests/unit/test_event_dispatch.nim b/tests/unit/test_event_dispatch.nim index dfc2cac..3aa8e04 100644 --- a/tests/unit/test_event_dispatch.nim +++ b/tests/unit/test_event_dispatch.nim @@ -246,9 +246,11 @@ when not defined(gcRefc): ## A foreign-thread mutation must not be able to invalidate the ## listener's `userData` while an in-flight dispatch is mid-invocation. -## The dispatch templates hold `reg.lock` for the entire snapshot + -## invocation, so foreign `removeEventListener` blocks until dispatch -## returns. +## Dispatch now runs on the event thread, where `dispatchToListeners` +## holds `reg.lock` across the snapshot + callback invocation, so a +## foreign `removeEventListener` blocks until dispatch returns. Driven +## end-to-end through the pool so the callback runs on the real event +## thread (the only place dispatch now happens). type SlowState = object entered: Atomic[bool] @@ -257,53 +259,114 @@ type SlowState = object proc slowEventCb( retCode: cint, msg: ptr cchar, len: csize_t, userData: pointer ) {.cdecl, gcsafe, raises: [].} = - ## Signal entry, sleep briefly (the window during which the main - ## thread must call removeEventListener and block), signal exit. + ## Signal entry, sleep (the window during which the main thread must + ## call removeEventListener and block), signal exit. let st = cast[ptr SlowState](userData) st[].entered.store(true) - os.sleep(15) + os.sleep(100) st[].exited.store(true) -type DispatcherArgs = tuple - reg: ptr FFIEventRegistry - done: ptr Atomic[bool] - -proc dispatcherBody(args: DispatcherArgs) {.thread.} = - ffiCurrentEventRegistry = args.reg - dispatchFFIEvent("evt"): - "payload" - args.done[].store(true) - suite "registry lock held during invocation": test "removeEventListener blocks until in-flight dispatch finishes": - var reg: FFIEventRegistry - initEventRegistry(reg) + var pool: FFIContextPool[TestEvtLib] + let ctx = pool.createFFIContext().valueOr: + check false + return defer: - deinitEventRegistry(reg) + discard pool.destroyFFIContext(ctx) var st: SlowState st.entered.store(false) st.exited.store(false) - let id = addEventListener(reg, "evt", slowEventCb, addr st) + let id = + addEventListener(ctx[].eventRegistry, "message_sent", slowEventCb, addr st) check id != 0'u64 - var done: Atomic[bool] - done.store(false) - var thr: Thread[DispatcherArgs] - createThread(thr, dispatcherBody, (addr reg, addr done)) + # Emit from the FFI thread; the event thread invokes slowEventCb while + # holding reg.lock. + var rsp: CallbackData + initCallbackData(rsp) + defer: + deinitCallbackData(rsp) + check sendRequestToFFIThread( + ctx, EmitCborEventRequest.ffiNewReq(captureCb, addr rsp) + ) + .isOk() - # Wait until the worker thread is inside slowEventCb. - for _ in 0 ..< 200: + # Wait until the event thread is inside slowEventCb (mid-sleep). + for _ in 0 ..< 500: if st.entered.load(): break os.sleep(1) check st.entered.load() check not st.exited.load() - # Lock-during-invocation contract: remove blocks until dispatch - # finishes; by the time it returns, slowEventCb has set exited=true. - check removeEventListener(reg, id) + # remove blocks on reg.lock until dispatch finishes; by the time it + # returns, slowEventCb has set exited=true. + check removeEventListener(ctx[].eventRegistry, id) check st.exited.load() - joinThread(thr) - check done.load() + + +## Because dispatch now runs on the event thread (not the FFI thread), an +## event callback may itself issue a *synchronous* request without +## tripping the FFI-thread reentrancy guard or self-deadlocking — the FFI +## thread is a separate thread that services the nested request. + +registerReqFFI(SyncPingRequest, lib: ptr TestEvtLib): + proc(): Future[Result[string, string]] {.async.} = + return ok("pong") + +proc noopCb( + retCode: cint, msg: ptr cchar, len: csize_t, userData: pointer +) {.cdecl, gcsafe, raises: [].} = + discard + +type SyncReqState = object + ctx: ptr FFIContext[TestEvtLib] + sendOk: Atomic[bool] + done: Atomic[bool] + +proc syncRequestCb( + retCode: cint, msg: ptr cchar, len: csize_t, userData: pointer +) {.cdecl, gcsafe, raises: [].} = + ## Runs on the event thread; issues a synchronous request back to the + ## (separate) FFI thread and records whether it was accepted. + let st = cast[ptr SyncReqState](userData) + let res = + sendRequestToFFIThread(st[].ctx, SyncPingRequest.ffiNewReq(noopCb, nil)) + st[].sendOk.store(res.isOk()) + st[].done.store(true) + +suite "event callback may issue a synchronous request": + test "sync sendRequestToFFIThread from an event callback succeeds": + var pool: FFIContextPool[TestEvtLib] + let ctx = pool.createFFIContext().valueOr: + check false + return + defer: + discard pool.destroyFFIContext(ctx) + + var st: SyncReqState + st.ctx = ctx + st.sendOk.store(false) + st.done.store(false) + + discard + addEventListener(ctx[].eventRegistry, "message_sent", syncRequestCb, addr st) + + var rsp: CallbackData + initCallbackData(rsp) + defer: + deinitCallbackData(rsp) + check sendRequestToFFIThread( + ctx, EmitCborEventRequest.ffiNewReq(captureCb, addr rsp) + ) + .isOk() + + for _ in 0 ..< 500: + if st.done.load(): + break + os.sleep(1) + check st.done.load() + check st.sendOk.load()