mirror of
https://github.com/logos-messaging/nim-ffi.git
synced 2026-06-21 00:40:16 +00:00
PR #71 stood up the event thread + bounded queue but left dispatch inline, so a blocking consumer callback stalled request processing on the FFI thread. Wire the producer side onto the existing queue: the FFI thread now enqueues (copyToShared/dupSharedCString/enqueueFFIEvent) and returns immediately; the event thread drains and invokes callbacks under reg.lock. This keeps the event thread as watchdog *and* dispatcher. Because callbacks no longer run on the FFI thread, a callback may issue a synchronous sendRequestToFFIThread without tripping the reentrancy guard (the FFI thread is separate) — a callback still must not add/remove listeners on its own registry. A wedged callback fills the queue and latches eventQueueStuck, which sendRequestToFFIThread surfaces as backpressure. Reworked the lock-during-invocation test to drive through the real event thread, and added a test for the now-safe sync-request-from-callback path. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
373 lines
12 KiB
Nim
373 lines
12 KiB
Nim
## Tests for the CBOR-style FFI event dispatch path:
|
|
## - `dispatchFFIEvent` accepts both `string` and `seq[byte]` bodies
|
|
## - `dispatchFFIEventCbor` wraps a typed payload in `EventEnvelope[T]`,
|
|
## CBOR-encodes it, and dispatches via the event callback
|
|
##
|
|
## Tests run end-to-end against a real FFI thread (via FFIContextPool +
|
|
## sendRequestToFFIThread) so we exercise the threadvar-backed
|
|
## ffiCurrentEventRegistry wiring, not just the templates in isolation.
|
|
|
|
import std/[locks, os]
|
|
import unittest2
|
|
import results
|
|
import ffi
|
|
|
|
## Field-less library type. In this file it only serves as the type
## parameter of `FFIContextPool`/`FFIContext` and as the `lib` type of the
## `registerReqFFI` handlers.
type
  TestEvtLib = object
|
|
|
|
## Typed payload of the `message_sent` test event. It is annotated
## `{.ffi.}`; per the original note this is what lets the codec generator
## emit a matching struct on the foreign side. The tests in this file only
## depend on the CBOR round-trip, which `cborEncode`/`cborDecode` provide
## via cbor_serial's generic overloads.
type
  MessageSentBody* {.ffi.} = object
    requestId*: string ## request identifier carried by the event
    messageHash*: string ## message hash carried by the event
|
|
|
|
## State shared between `captureCb` and the test thread that waits for it.
## This is the same helper as in test_ffi_context.nim, copied here so this
## file stays a self-contained test binary.
type
  CallbackData = object
    lock: Lock ## held by `captureCb` while it writes the fields below
    cond: Cond ## signalled by `captureCb` after `called` is set
    called: bool ## true once `captureCb` has stored a result
    retCode: cint ## return code handed to the callback
    msg: array[1024, byte] ## captured message bytes (at most 1024)
    msgLen: int ## number of bytes stored in `msg`
|
|
|
|
proc initCallbackData(d: var CallbackData) =
  ## Initialises the lock and the condition variable embedded in `d`.
  ## The remaining fields are left untouched.
  initLock(d.lock)
  initCond(d.cond)
|
|
|
|
proc deinitCallbackData(d: var CallbackData) =
  ## Releases the synchronisation primitives of `d`: the condition variable
  ## first, then the lock (reverse of `initCallbackData`).
  deinitCond(d.cond)
  deinitLock(d.lock)
|
|
|
|
proc captureCb(
    retCode: cint, msg: ptr cchar, len: csize_t, userData: pointer
) {.cdecl, gcsafe, raises: [].} =
  ## C-compatible callback that records its arguments into the
  ## `CallbackData` that `userData` points to and wakes up a waiter.
  ##
  ## - `retCode`: stored verbatim in `retCode`.
  ## - `msg`/`len`: up to `msg.len` (1024) bytes are copied; longer messages
  ##   are truncated. A nil `msg` is recorded as an empty message.
  ## - `userData`: must point to an initialised `CallbackData`.
  let d = cast[ptr CallbackData](userData)
  acquire(d[].lock)
  d[].retCode = retCode
  # Clamp while still in `csize_t` so an oversized `len` cannot overflow the
  # conversion to `int`, and report zero bytes when there is nothing to copy
  # so `msgLen` never claims bytes that were not written.
  let n =
    if msg.isNil:
      0
    else:
      int(min(len, csize_t(d[].msg.len)))
  if n > 0:
    copyMem(addr d[].msg[0], msg, n)
  d[].msgLen = n
  d[].called = true
  signal(d[].cond)
  release(d[].lock)
|
|
|
|
proc waitCallback(d: var CallbackData) =
  ## Blocks the calling thread until `d.called` is true, sleeping on
  ## `d.cond` in between. The flag is re-checked after every wake-up.
  withLock d.lock:
    while not d.called:
      wait(d.cond, d.lock)
|
|
|
|
proc callbackBytes(d: var CallbackData): seq[byte] =
  ## Returns a copy of the first `d.msgLen` bytes of `d.msg`.
  ## The lock is not taken here; the tests call this only after
  ## `waitCallback` has returned.
  result = newSeq[byte](d.msgLen)
  if result.len > 0:
    copyMem(addr result[0], addr d.msg[0], result.len)
|
|
|
|
## Request handler that emits one typed CBOR event (`message_sent`) while
## it is being processed and then answers `ok("emitted")`. The tests use
## the response callback of this request as a synchronisation point.
registerReqFFI(EmitCborEventRequest, lib: ptr TestEvtLib):
  proc(): Future[Result[string, string]] {.async.} =
    dispatchFFIEventCbor(
      "message_sent", MessageSentBody(requestId: "req-1", messageHash: "0xdeadbeef")
    )
    return ok("emitted")
|
|
|
|
## Request handler that goes through the lower-level `dispatchFFIEvent`
## with a raw `seq[byte]` body (the path that previously rejected
## non-string bodies) and then answers `ok("emitted")`.
registerReqFFI(EmitRawBytesEventRequest, lib: ptr TestEvtLib):
  proc(): Future[Result[string, string]] {.async.} =
    dispatchFFIEvent("raw_bytes"):
      @[0x01'u8, 0x02'u8, 0x03'u8]
    return ok("emitted")
|
|
|
|
## Setter-thread worker for the registry race regression test. Each
|
|
## iteration adds then immediately removes a listener for the dispatched
|
|
## event so a TSan-instrumented build can confirm `FFIEventRegistry.lock`
|
|
## serialises the cross-thread mutation against dispatch-time
|
|
## `snapshotListeners` reads on the dispatching thread (now the event thread).
|
|
## Arguments handed to each setter thread (`setterThreadBody`).
type
  SetterArgs = tuple[
    ctx: ptr FFIContext[TestEvtLib], ## context whose registry is mutated
    stop: ptr Atomic[bool], ## set to true to make the thread exit
    target: ptr CallbackData ## userData registered with each listener
  ]
|
|
|
|
proc setterThreadBody(args: SetterArgs) {.thread.} =
  ## Thread entry point: until `stop` is set, keeps registering a
  ## `message_sent` listener (`captureCb` + `target`) and removing it again
  ## right away. The result of the removal is ignored.
  let (ctx, stop, target) = args
  while true:
    if stop[].load():
      break
    let listenerId =
      addEventListener(ctx[].eventRegistry, "message_sent", captureCb, target)
    discard removeEventListener(ctx[].eventRegistry, listenerId)
|
|
|
|
suite "dispatchFFIEventCbor":
  test "delivers EventEnvelope-shaped CBOR payload to event callback":
    var pool: FFIContextPool[TestEvtLib]
    let ctx = pool.createFFIContext().valueOr:
      check false
      return
    defer:
      discard pool.destroyFFIContext(ctx)

    # Capture slot for the event callback.
    var eventData: CallbackData
    initCallbackData(eventData)
    defer:
      deinitCallbackData(eventData)

    # Capture slot for the response callback. Its content is not inspected;
    # it only tells us that the request has completed.
    var responseData: CallbackData
    initCallbackData(responseData)
    defer:
      deinitCallbackData(responseData)

    # Listen for exactly the event that `EmitCborEventRequest` dispatches.
    discard addEventListener(
      ctx[].eventRegistry, "message_sent", captureCb, addr eventData
    )

    # Ask the FFI thread to run the handler, then wait for both the
    # response and the event to arrive.
    let sent = sendRequestToFFIThread(
      ctx, EmitCborEventRequest.ffiNewReq(captureCb, addr responseData)
    )
    check sent.isOk()
    waitCallback(responseData)
    waitCallback(eventData)

    check eventData.retCode == RET_OK
    let decoded =
      cborDecode(callbackBytes(eventData), EventEnvelope[MessageSentBody])
    check decoded.isOk()
    check decoded.value.eventType == "message_sent"
    check decoded.value.payload.requestId == "req-1"
    check decoded.value.payload.messageHash == "0xdeadbeef"
|
|
|
|
suite "dispatchFFIEvent with seq[byte]":
  test "accepts a raw seq[byte] body":
    var pool: FFIContextPool[TestEvtLib]
    let ctx = pool.createFFIContext().valueOr:
      check false
      return
    defer:
      discard pool.destroyFFIContext(ctx)

    # Capture slot for the event callback.
    var eventData: CallbackData
    initCallbackData(eventData)
    defer:
      deinitCallbackData(eventData)

    # Capture slot for the response callback (used for synchronisation only).
    var responseData: CallbackData
    initCallbackData(responseData)
    defer:
      deinitCallbackData(responseData)

    discard addEventListener(
      ctx[].eventRegistry, "raw_bytes", captureCb, addr eventData
    )

    let sent = sendRequestToFFIThread(
      ctx, EmitRawBytesEventRequest.ffiNewReq(captureCb, addr responseData)
    )
    check sent.isOk()
    waitCallback(responseData)
    waitCallback(eventData)

    # The listener must receive the body bytes exactly as they were emitted.
    check eventData.retCode == RET_OK
    check callbackBytes(eventData) == @[byte 0x01, 0x02, 0x03]
|
|
|
|
when not defined(gcRefc):
  ## Skipped under `--mm:refc`: each setter thread grows / shrinks the
  ## per-event listener `seq[FFIEventListener]` via `addEventListener`,
  ## and refc's per-thread GC heap ownership makes cross-thread seq
  ## buffer reallocation unsafe even when the surrounding lock is held.
  ## ORC + the FFI thread + tsan (the combo this test was written for)
  ## does not have that limitation.
  suite "FFIEventRegistry concurrent access":
    ## Regression for PR #39 review comments r3288220895 / r3289285387.
    ## Run under tsan to actually validate the fix:
    ##   NIM_FFI_SAN=tsan NIM_FFI_MM=orc nimble test_sanitized
    test "concurrent add/remove writers vs dispatch reads stay race-free":
      var pool: FFIContextPool[TestEvtLib]
      let ctx = pool.createFFIContext().valueOr:
        check false
        return
      defer:
        discard pool.destroyFFIContext(ctx)

      var evt: CallbackData
      initCallbackData(evt)
      defer:
        deinitCallbackData(evt)

      # Seed an initial listener so the first dispatch has a target. The
      # setter threads will then repeatedly re-install the same
      # (callback, userData) pair — what matters is the cross-thread write
      # racing the dispatch-time read, not which pair "wins".
      let seedId = addEventListener(
        ctx[].eventRegistry, "message_sent", captureCb, addr evt
      )
      # Defers run in reverse order, so the seeded listener is removed
      # before `evt` is deinitialised: an event that is still queued when the
      # test body ends can then no longer reach `captureCb` with a dead lock.
      defer:
        discard removeEventListener(ctx[].eventRegistry, seedId)

      const NumSetterThreads = 4
      const NumDispatchIters = 200

      var stop: Atomic[bool]
      stop.store(false)
      var setters: array[NumSetterThreads, Thread[SetterArgs]]
      for i in 0 ..< NumSetterThreads:
        createThread(setters[i], setterThreadBody, (ctx, addr stop, addr evt))

      var rsp: CallbackData
      initCallbackData(rsp)
      defer:
        deinitCallbackData(rsp)

      for _ in 0 ..< NumDispatchIters:
        # Reset rsp so each iteration's `waitCallback` blocks until the
        # FFI thread fires the response — keeps the loop synchronous.
        acquire(rsp.lock)
        rsp.called = false
        release(rsp.lock)

        check sendRequestToFFIThread(
          ctx, EmitCborEventRequest.ffiNewReq(captureCb, addr rsp)
        )
        .isOk()
        waitCallback(rsp)

      stop.store(true)
      for i in 0 ..< NumSetterThreads:
        joinThread(setters[i])

      # Confirm at least one event actually landed so a silently-broken
      # dispatch loop is caught. Events are delivered asynchronously and
      # `captureCb` writes `called` under `evt.lock`, so read it under the
      # same lock and allow a bounded grace period for the delivery.
      var landed = false
      for _ in 0 ..< 500:
        acquire(evt.lock)
        landed = evt.called
        release(evt.lock)
        if landed:
          break
        os.sleep(1)
      check landed
|
|
|
|
|
|
## A foreign-thread mutation must not be able to invalidate the
|
|
## listener's `userData` while an in-flight dispatch is mid-invocation.
|
|
## Dispatch now runs on the event thread, where `dispatchToListeners`
|
|
## holds `reg.lock` across the snapshot + callback invocation, so a
|
|
## foreign `removeEventListener` blocks until dispatch returns. Driven
|
|
## end-to-end through the pool so the callback runs on the real event
|
|
## thread (the only place dispatch now happens).
|
|
|
|
## Progress flags that `slowEventCb` publishes to the test thread.
type
  SlowState = object
    entered: Atomic[bool] ## set as soon as the callback starts running
    exited: Atomic[bool] ## set right before the callback returns
|
|
|
|
proc slowEventCb(
    retCode: cint, msg: ptr cchar, len: csize_t, userData: pointer
) {.cdecl, gcsafe, raises: [].} =
  ## Deliberately slow event callback. `userData` must point to a
  ## `SlowState`; the callback flags entry, sleeps for 100 ms (the window in
  ## which the test thread calls `removeEventListener` and is expected to
  ## block) and then flags exit. The other arguments are ignored.
  let state = cast[ptr SlowState](userData)
  state[].entered.store(true)
  os.sleep(100)
  state[].exited.store(true)
|
|
|
|
suite "registry lock held during invocation":
  test "removeEventListener blocks until in-flight dispatch finishes":
    var pool: FFIContextPool[TestEvtLib]
    let ctx = pool.createFFIContext().valueOr:
      check false
      return
    defer:
      discard pool.destroyFFIContext(ctx)

    var st: SlowState
    st.entered.store(false)
    st.exited.store(false)

    let id =
      addEventListener(ctx[].eventRegistry, "message_sent", slowEventCb, addr st)
    check id != 0'u64

    # Emit from the FFI thread; the event thread invokes slowEventCb while
    # holding reg.lock.
    var rsp: CallbackData
    initCallbackData(rsp)
    defer:
      deinitCallbackData(rsp)
    let sent = sendRequestToFFIThread(
      ctx, EmitCborEventRequest.ffiNewReq(captureCb, addr rsp)
    ).isOk()
    check sent

    # Wait until the event thread is inside slowEventCb (mid-sleep).
    for _ in 0 ..< 500:
      if st.entered.load():
        break
      os.sleep(1)
    check st.entered.load()
    check not st.exited.load()

    # remove blocks on reg.lock until dispatch finishes; by the time it
    # returns, slowEventCb has set exited=true.
    check removeEventListener(ctx[].eventRegistry, id)
    check st.exited.load()

    # `rsp` lives on this stack frame and its lock is deinitialised on scope
    # exit, so make sure the response callback has fired before leaving.
    # Only wait when the request was accepted; otherwise this could block
    # forever on a response that may never arrive.
    if sent:
      waitCallback(rsp)
|
|
|
|
|
|
## Because dispatch now runs on the event thread (not the FFI thread), an
|
|
## event callback may itself issue a *synchronous* request without
|
|
## tripping the FFI-thread reentrancy guard or self-deadlocking — the FFI
|
|
## thread is a separate thread that services the nested request.
|
|
|
|
## Trivial request handler that always answers `ok("pong")`. It is the
## request that `syncRequestCb` sends from inside an event callback.
registerReqFFI(SyncPingRequest, lib: ptr TestEvtLib):
  proc(): Future[Result[string, string]] {.async.} =
    return ok("pong")
|
|
|
|
proc noopCb(
    retCode: cint, msg: ptr cchar, len: csize_t, userData: pointer
) {.cdecl, gcsafe, raises: [].} =
  ## Callback that ignores all of its arguments and does nothing. Used
  ## where a callback is required but its result is irrelevant.
  discard
|
|
|
|
## State shared between `syncRequestCb` and the test thread.
type
  SyncReqState = object
    ctx: ptr FFIContext[TestEvtLib] ## context the nested request is sent to
    sendOk: Atomic[bool] ## result of `isOk()` on the nested send
    done: Atomic[bool] ## set once the callback has recorded `sendOk`
|
|
|
|
proc syncRequestCb(
    retCode: cint, msg: ptr cchar, len: csize_t, userData: pointer
) {.cdecl, gcsafe, raises: [].} =
  ## Event callback that sends a `SyncPingRequest` back to the context
  ## stored in the `SyncReqState` behind `userData`, records whether the
  ## send was accepted and finally raises the `done` flag. Per the test
  ## description it runs on the event thread, separate from the FFI thread.
  let state = cast[ptr SyncReqState](userData)
  let accepted =
    sendRequestToFFIThread(state[].ctx, SyncPingRequest.ffiNewReq(noopCb, nil))
    .isOk()
  state[].sendOk.store(accepted)
  state[].done.store(true)
|
|
|
|
suite "event callback may issue a synchronous request":
  test "sync sendRequestToFFIThread from an event callback succeeds":
    var pool: FFIContextPool[TestEvtLib]
    let ctx = pool.createFFIContext().valueOr:
      check false
      return
    defer:
      discard pool.destroyFFIContext(ctx)

    var st: SyncReqState
    st.ctx = ctx
    st.sendOk.store(false)
    st.done.store(false)

    discard
      addEventListener(ctx[].eventRegistry, "message_sent", syncRequestCb, addr st)

    var rsp: CallbackData
    initCallbackData(rsp)
    defer:
      deinitCallbackData(rsp)
    let sent = sendRequestToFFIThread(
      ctx, EmitCborEventRequest.ffiNewReq(captureCb, addr rsp)
    ).isOk()
    check sent

    # Give the event callback up to ~500 ms to issue its nested request.
    for _ in 0 ..< 500:
      if st.done.load():
        break
      os.sleep(1)
    check st.done.load()
    check st.sendOk.load()

    # `rsp` lives on this stack frame and its lock is deinitialised on scope
    # exit, so make sure the response callback of the outer request has
    # fired before leaving. Only wait when the request was accepted;
    # otherwise this could block forever on a response that may never arrive.
    if sent:
      waitCallback(rsp)
|