feat(ffi): dispatch events on the event thread, not the FFI thread

PR #71 stood up the event thread + bounded queue but left dispatch
inline, so a blocking consumer callback stalled request processing on
the FFI thread. Wire the producer side onto the existing queue: the FFI
thread now enqueues (copyToShared/dupSharedCString/enqueueFFIEvent) and
returns immediately; the event thread drains and invokes callbacks under
reg.lock. This keeps the event thread as watchdog *and* dispatcher.

Because callbacks no longer run on the FFI thread, a callback may issue a
synchronous sendRequestToFFIThread without tripping the reentrancy guard
(the FFI thread is separate) — a callback still must not add/remove
listeners on its own registry. A wedged callback fills the queue and
latches eventQueueStuck, which sendRequestToFFIThread surfaces as backpressure.

Reworked the lock-during-invocation test to drive through the real event
thread, and added a test for the now-safe sync-request-from-callback path.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Ivan FB 2026-06-04 00:21:23 +02:00
parent 20e29fd454
commit 1c2ea72336
No known key found for this signature in database
GPG Key ID: DF0C67A04C543270
2 changed files with 178 additions and 71 deletions

View File

@ -232,56 +232,100 @@ var ffiCurrentNotifyEventEnqueued* {.threadvar.}: proc() {.gcsafe, raises: [].}
## Hook (not a queue field) so this module doesn't depend on chronos's
## ThreadSignalPtr. Nil-safe.
template withFFIEventDispatch(eventName: string, listeners, body: untyped) =
## Resolves the thread-local registry, snapshots listeners under
## `reg.lock`, then runs `body` inside `foreignThreadGc` + try/except.
let regPtr = ffiCurrentEventRegistry
if regPtr.isNil():
chronicles.error eventName & " - event registry not set on this thread"
proc copyToShared(
src: pointer, n: int
): tuple[data: ptr UncheckedArray[byte], len: int] {.raises: [].} =
## Raw `c_malloc` copy of `n` bytes from `src` — the `string` / `seq[byte]`
## dispatch path. Mirrors `cborEncodeShared`'s ownership contract: the
## returned buffer is the caller's to hand to the queue. Empty -> (nil, 0).
if n <= 0 or src.isNil:
return (nil, 0)
let buf = cast[ptr UncheckedArray[byte]](c_malloc(csize_t(n)))
copyMem(buf, src, n)
(buf, n)
proc dupSharedCString(name: string): cstring {.raises: [].} =
## NUL-terminated `c_malloc` copy of `name`. The queue takes ownership;
## `freeQueuedEventPayload` frees it once dispatch on the event thread
## returns.
let n = name.len
let buf = cast[cstring](c_malloc(csize_t(n + 1)))
if n > 0:
copyMem(buf, unsafeAddr name[0], n)
cast[ptr char](cast[uint](buf) + uint(n))[] = '\0'
buf
proc enqueueFFIEvent*(
name: string, data: ptr UncheckedArray[byte], dataLen: int
) {.gcsafe, raises: [].} =
## Producer side of the event path. Copies `name` into a `c_malloc`
## cstring, enqueues `(name, data, dataLen)` onto the thread-local event
## queue, and wakes the event thread so it drains and invokes the
## listener callbacks. The FFI thread therefore never runs a listener
## callback itself — a blocked consumer can't stall request processing.
##
## `data` must be a `c_malloc` buffer the queue can adopt (or nil for an
## empty payload). On any path that does not enqueue, this proc frees the
## buffers it was handed. A full queue latches `eventQueueStuck`; recovery
## is destroy+recreate.
let qPtr = ffiCurrentEventQueue
if qPtr.isNil():
chronicles.error "event queue not set on this thread", event = name
if not data.isNil:
c_free(data)
return
withLock regPtr[].lock:
let listeners = regPtr[].byEvent.getOrDefault(eventName)
if listeners.len == 0:
chronicles.debug eventName & " - no listener registered"
else:
foreignThreadGc:
try:
body
except Exception, CatchableError:
notifyListenersErr(
listeners,
"Exception dispatching " & eventName & ": " & getCurrentExceptionMsg(),
)
let cname = dupSharedCString(name)
if not qPtr[].tryEnqueueEvent(cname, data, dataLen):
chronicles.error "event queue full; dropping event, marking stuck", event = name
c_free(cast[pointer](cname))
if not data.isNil:
c_free(data)
if not ffiCurrentEventQueueStuck.isNil():
ffiCurrentEventQueueStuck[].store(true)
return
if not ffiCurrentNotifyEventEnqueued.isNil():
ffiCurrentNotifyEventEnqueued()
template dispatchFFIEvent*(eventName: string, body: untyped) =
## Dispatches an FFI event to every listener subscribed to `eventName`.
## Emits an FFI event to every listener subscribed to `eventName`.
## `body` must yield a `string` or `seq[byte]`.
##
## Valid only on the FFI thread (where `ffiCurrentEventRegistry` is set).
## Holds `reg.lock` for the entire snapshot + invocation so a concurrent
## `removeEventListener` from a foreign thread blocks until dispatch
## returns. Handlers must not call addEventListener / removeEventListener
## on the same registry (would self-deadlock).
withFFIEventDispatch(eventName, listeners):
let event = body
let dataPtr: pointer =
if event.len > 0: unsafeAddr event[0]
else: nil
notifyListenersOk(listeners, dataPtr, event.len)
## Copies the payload, enqueues it on the thread-local event queue, and
## returns immediately; the event thread drains the queue and invokes the
## callbacks (holding `reg.lock` across snapshot + invocation, so a
## concurrent `removeEventListener` blocks until dispatch returns). Valid
## only where `ffiCurrentEventQueue` is set (the FFI thread). A callback
## must not call addEventListener / removeEventListener on its own
## registry (would self-deadlock against the held `reg.lock`).
block:
try:
let event = body
let dataPtr =
if event.len > 0: unsafeAddr event[0]
else: nil
let (data, dataLen) = copyToShared(dataPtr, event.len)
enqueueFFIEvent(eventName, data, dataLen)
except CatchableError, Exception:
chronicles.error eventName & " - failed to enqueue event",
err = getCurrentExceptionMsg()
template dispatchFFIEventCbor*(eventName: string, eventPayload: typed) =
## Typed CBOR variant of `dispatchFFIEvent`. Wraps `eventPayload` in an
## `EventEnvelope`, CBOR-encodes it into a `c_malloc` buffer once, and
## fans the same buffer out to every registered listener.
## `EventEnvelope`, CBOR-encodes it into a `c_malloc` buffer, and enqueues
## it for the event thread to deliver. Same threading / locking contract
## as `dispatchFFIEvent`.
##
## NB: parameter is `eventPayload`, not `payload` — Nim's template
## substitution would otherwise also rewrite the `payload:` field inside
## `EventEnvelope`.
withFFIEventDispatch(eventName, listeners):
var (data, dataLen) = cborEncodeShared(
EventEnvelope[typeof(eventPayload)](eventType: eventName, payload: eventPayload)
)
defer:
cborFreeShared(data)
notifyListenersOk(listeners, data, dataLen)
block:
try:
let (data, dataLen) = cborEncodeShared(
EventEnvelope[typeof(eventPayload)](eventType: eventName, payload: eventPayload)
)
enqueueFFIEvent(eventName, data, dataLen)
except CatchableError, Exception:
chronicles.error eventName & " - failed to encode/enqueue event",
err = getCurrentExceptionMsg()

View File

@ -246,9 +246,11 @@ when not defined(gcRefc):
## A foreign-thread mutation must not be able to invalidate the
## listener's `userData` while an in-flight dispatch is mid-invocation.
## The dispatch templates hold `reg.lock` for the entire snapshot +
## invocation, so foreign `removeEventListener` blocks until dispatch
## returns.
## Dispatch now runs on the event thread, where `dispatchToListeners`
## holds `reg.lock` across the snapshot + callback invocation, so a
## foreign `removeEventListener` blocks until dispatch returns. Driven
## end-to-end through the pool so the callback runs on the real event
## thread (the only place dispatch now happens).
type SlowState = object
entered: Atomic[bool]
@ -257,53 +259,114 @@ type SlowState = object
proc slowEventCb(
retCode: cint, msg: ptr cchar, len: csize_t, userData: pointer
) {.cdecl, gcsafe, raises: [].} =
## Signal entry, sleep briefly (the window during which the main
## thread must call removeEventListener and block), signal exit.
## Signal entry, sleep (the window during which the main thread must
## call removeEventListener and block), signal exit.
let st = cast[ptr SlowState](userData)
st[].entered.store(true)
os.sleep(15)
os.sleep(100)
st[].exited.store(true)
type DispatcherArgs = tuple
reg: ptr FFIEventRegistry
done: ptr Atomic[bool]
proc dispatcherBody(args: DispatcherArgs) {.thread.} =
ffiCurrentEventRegistry = args.reg
dispatchFFIEvent("evt"):
"payload"
args.done[].store(true)
suite "registry lock held during invocation":
test "removeEventListener blocks until in-flight dispatch finishes":
var reg: FFIEventRegistry
initEventRegistry(reg)
var pool: FFIContextPool[TestEvtLib]
let ctx = pool.createFFIContext().valueOr:
check false
return
defer:
deinitEventRegistry(reg)
discard pool.destroyFFIContext(ctx)
var st: SlowState
st.entered.store(false)
st.exited.store(false)
let id = addEventListener(reg, "evt", slowEventCb, addr st)
let id =
addEventListener(ctx[].eventRegistry, "message_sent", slowEventCb, addr st)
check id != 0'u64
var done: Atomic[bool]
done.store(false)
var thr: Thread[DispatcherArgs]
createThread(thr, dispatcherBody, (addr reg, addr done))
# Emit from the FFI thread; the event thread invokes slowEventCb while
# holding reg.lock.
var rsp: CallbackData
initCallbackData(rsp)
defer:
deinitCallbackData(rsp)
check sendRequestToFFIThread(
ctx, EmitCborEventRequest.ffiNewReq(captureCb, addr rsp)
)
.isOk()
# Wait until the worker thread is inside slowEventCb.
for _ in 0 ..< 200:
# Wait until the event thread is inside slowEventCb (mid-sleep).
for _ in 0 ..< 500:
if st.entered.load():
break
os.sleep(1)
check st.entered.load()
check not st.exited.load()
# Lock-during-invocation contract: remove blocks until dispatch
# finishes; by the time it returns, slowEventCb has set exited=true.
check removeEventListener(reg, id)
# remove blocks on reg.lock until dispatch finishes; by the time it
# returns, slowEventCb has set exited=true.
check removeEventListener(ctx[].eventRegistry, id)
check st.exited.load()
joinThread(thr)
check done.load()
## Because dispatch now runs on the event thread (not the FFI thread), an
## event callback may itself issue a *synchronous* request without
## tripping the FFI-thread reentrancy guard or self-deadlocking — the FFI
## thread is a separate thread that services the nested request.
registerReqFFI(SyncPingRequest, lib: ptr TestEvtLib):
proc(): Future[Result[string, string]] {.async.} =
return ok("pong")
proc noopCb(
retCode: cint, msg: ptr cchar, len: csize_t, userData: pointer
) {.cdecl, gcsafe, raises: [].} =
discard
type SyncReqState = object
ctx: ptr FFIContext[TestEvtLib]
sendOk: Atomic[bool]
done: Atomic[bool]
proc syncRequestCb(
retCode: cint, msg: ptr cchar, len: csize_t, userData: pointer
) {.cdecl, gcsafe, raises: [].} =
## Runs on the event thread; issues a synchronous request back to the
## (separate) FFI thread and records whether it was accepted.
let st = cast[ptr SyncReqState](userData)
let res =
sendRequestToFFIThread(st[].ctx, SyncPingRequest.ffiNewReq(noopCb, nil))
st[].sendOk.store(res.isOk())
st[].done.store(true)
suite "event callback may issue a synchronous request":
test "sync sendRequestToFFIThread from an event callback succeeds":
var pool: FFIContextPool[TestEvtLib]
let ctx = pool.createFFIContext().valueOr:
check false
return
defer:
discard pool.destroyFFIContext(ctx)
var st: SyncReqState
st.ctx = ctx
st.sendOk.store(false)
st.done.store(false)
discard
addEventListener(ctx[].eventRegistry, "message_sent", syncRequestCb, addr st)
var rsp: CallbackData
initCallbackData(rsp)
defer:
deinitCallbackData(rsp)
check sendRequestToFFIThread(
ctx, EmitCborEventRequest.ffiNewReq(captureCb, addr rsp)
)
.isOk()
for _ in 0 ..< 500:
if st.done.load():
break
os.sleep(1)
check st.done.load()
check st.sendOk.load()