mirror of
https://github.com/logos-messaging/nim-ffi.git
synced 2026-06-20 16:29:31 +00:00
294 lines
9.3 KiB
Nim
294 lines
9.3 KiB
Nim
## End-to-end tests for `dispatchFFIEvent` / `dispatchFFIEventCbor`,
|
|
## driven through a real `FFIContext` so the threadvar wiring is exercised.
|
|
|
|
import std/[locks, os]
|
|
import unittest2
|
|
import results
|
|
import ffi
|
|
|
|
type TestEvtLib = object
|
|
|
|
type MessageSentBody* {.ffi.} = object
|
|
requestId*: string
|
|
messageHash*: string
|
|
|
|
type CallbackData = object
|
|
lock: Lock
|
|
cond: Cond
|
|
called: bool
|
|
retCode: cint
|
|
msg: array[1024, byte]
|
|
msgLen: int
|
|
|
|
proc initCallbackData(d: var CallbackData) =
|
|
d.lock.initLock()
|
|
d.cond.initCond()
|
|
|
|
proc deinitCallbackData(d: var CallbackData) =
|
|
d.cond.deinitCond()
|
|
d.lock.deinitLock()
|
|
|
|
template setupCallbackData(name: untyped) =
|
|
## Declares `name`, inits it, and defers its deinit in the caller's scope.
|
|
var name: CallbackData
|
|
initCallbackData(name)
|
|
defer:
|
|
deinitCallbackData(name)
|
|
|
|
proc captureCb(
|
|
retCode: cint, msg: ptr cchar, len: csize_t, userData: pointer
|
|
) {.cdecl, gcsafe, raises: [].} =
|
|
let d = cast[ptr CallbackData](userData)
|
|
acquire(d[].lock)
|
|
d[].retCode = retCode
|
|
let n = min(int(len), d[].msg.len)
|
|
if n > 0 and not msg.isNil:
|
|
copyMem(addr d[].msg[0], msg, n)
|
|
d[].msgLen = n
|
|
d[].called = true
|
|
signal(d[].cond)
|
|
release(d[].lock)
|
|
|
|
proc waitCallback(d: var CallbackData) =
|
|
acquire(d.lock)
|
|
while not d.called:
|
|
wait(d.cond, d.lock)
|
|
release(d.lock)
|
|
|
|
proc resetCalled(d: var CallbackData) =
|
|
acquire(d.lock)
|
|
d.called = false
|
|
release(d.lock)
|
|
|
|
proc callbackBytes(d: var CallbackData): seq[byte] =
|
|
var bytes = newSeq[byte](d.msgLen)
|
|
if d.msgLen > 0:
|
|
copyMem(addr bytes[0], addr d.msg[0], d.msgLen)
|
|
bytes
|
|
|
|
template withPool(ctxIdent: untyped, body: untyped) =
|
|
## Sets up pool + ctx, runs body, destroys on exit.
|
|
var pool: FFIContextPool[TestEvtLib]
|
|
let ctxIdent = pool.createFFIContext().valueOr:
|
|
check false
|
|
return
|
|
defer:
|
|
discard pool.destroyFFIContext(ctxIdent)
|
|
body
|
|
|
|
registerReqFFI(EmitCborEventRequest, lib: ptr TestEvtLib):
|
|
proc(): Future[Result[string, string]] {.async.} =
|
|
dispatchFFIEventCbor(
|
|
"message_sent", MessageSentBody(requestId: "req-1", messageHash: "0xdeadbeef")
|
|
)
|
|
return ok("emitted")
|
|
|
|
registerReqFFI(EmitRawBytesEventRequest, lib: ptr TestEvtLib):
|
|
proc(): Future[Result[string, string]] {.async.} =
|
|
dispatchFFIEvent("raw_bytes"):
|
|
@[byte 0x01, 0x02, 0x03]
|
|
return ok("emitted")
|
|
|
|
# Add/remove worker for the registry-race regression test.
|
|
type SetterArgs =
|
|
tuple[
|
|
ctx: ptr FFIContext[TestEvtLib], stop: ptr Atomic[bool], target: ptr CallbackData
|
|
]
|
|
|
|
proc setterThreadBody(args: SetterArgs) {.thread.} =
|
|
while not args.stop[].load():
|
|
let id =
|
|
addEventListener(args.ctx[].eventRegistry, "message_sent", captureCb, args.target)
|
|
discard removeEventListener(args.ctx[].eventRegistry, id)
|
|
|
|
suite "dispatchFFIEventCbor":
|
|
test "delivers EventEnvelope-shaped CBOR payload to event callback":
|
|
# CallbackData defers declared first run last (LIFO), AFTER pool destroy
|
|
# joins the event thread — otherwise TSan flags captureCb on a destroyed mutex.
|
|
setupCallbackData(evt)
|
|
setupCallbackData(rsp)
|
|
|
|
withPool(ctx):
|
|
discard addEventListener(ctx[].eventRegistry, "message_sent", captureCb, addr evt)
|
|
|
|
check sendRequestToFFIThread(
|
|
ctx, EmitCborEventRequest.ffiNewReq(captureCb, addr rsp)
|
|
)
|
|
.isOk()
|
|
waitCallback(rsp)
|
|
waitCallback(evt)
|
|
|
|
check evt.retCode == RET_OK
|
|
let decoded = cborDecode(callbackBytes(evt), EventEnvelope[MessageSentBody])
|
|
check decoded.isOk()
|
|
check decoded.value.eventType == "message_sent"
|
|
check decoded.value.payload.requestId == "req-1"
|
|
check decoded.value.payload.messageHash == "0xdeadbeef"
|
|
|
|
suite "dispatchFFIEvent with seq[byte]":
|
|
test "accepts a raw seq[byte] body":
|
|
setupCallbackData(evt)
|
|
setupCallbackData(rsp)
|
|
|
|
withPool(ctx):
|
|
discard addEventListener(ctx[].eventRegistry, "raw_bytes", captureCb, addr evt)
|
|
|
|
check sendRequestToFFIThread(
|
|
ctx, EmitRawBytesEventRequest.ffiNewReq(captureCb, addr rsp)
|
|
)
|
|
.isOk()
|
|
waitCallback(rsp)
|
|
waitCallback(evt)
|
|
|
|
check evt.retCode == RET_OK
|
|
check callbackBytes(evt) == @[byte 0x01, 0x02, 0x03]
|
|
|
|
when not defined(gcRefc):
|
|
## Skipped under refc: setter threads grow/shrink the per-event listener
|
|
## seq, and refc's per-thread GC heap makes that unsafe cross-thread.
|
|
suite "FFIEventRegistry concurrent access":
|
|
## Regression for PR #39 (r3288220895 / r3289285387).
|
|
## Validate with: NIM_FFI_SAN=tsan NIM_FFI_MM=orc nimble test_sanitized
|
|
test "concurrent add/remove writers vs dispatch reads stay race-free":
|
|
setupCallbackData(evt)
|
|
setupCallbackData(rsp)
|
|
|
|
withPool(ctx):
|
|
# Seed an initial listener so the first dispatch has a target.
|
|
discard
|
|
addEventListener(ctx[].eventRegistry, "message_sent", captureCb, addr evt)
|
|
|
|
const NumSetterThreads = 4
|
|
const NumDispatchIters = 200
|
|
|
|
var stop: Atomic[bool]
|
|
stop.store(false)
|
|
var setters: array[NumSetterThreads, Thread[SetterArgs]]
|
|
for i in 0 ..< NumSetterThreads:
|
|
createThread(setters[i], setterThreadBody, (ctx, addr stop, addr evt))
|
|
|
|
for _ in 0 ..< NumDispatchIters:
|
|
resetCalled(rsp)
|
|
check sendRequestToFFIThread(
|
|
ctx, EmitCborEventRequest.ffiNewReq(captureCb, addr rsp)
|
|
)
|
|
.isOk()
|
|
waitCallback(rsp)
|
|
|
|
stop.store(true)
|
|
for i in 0 ..< NumSetterThreads:
|
|
joinThread(setters[i])
|
|
|
|
check evt.called
|
|
|
|
type SlowState = object
|
|
entered: Atomic[bool]
|
|
exited: Atomic[bool]
|
|
|
|
proc slowEventCb(
|
|
retCode: cint, msg: ptr cchar, len: csize_t, userData: pointer
|
|
) {.cdecl, gcsafe, raises: [].} =
|
|
let st = cast[ptr SlowState](userData)
|
|
st[].entered.store(true)
|
|
os.sleep(60)
|
|
st[].exited.store(true)
|
|
|
|
suite "registry lock held during invocation":
|
|
test "removeEventListener blocks until in-flight dispatch finishes":
|
|
setupCallbackData(rsp)
|
|
|
|
withPool(ctx):
|
|
var st: SlowState
|
|
st.entered.store(false)
|
|
st.exited.store(false)
|
|
|
|
let id =
|
|
addEventListener(ctx[].eventRegistry, "message_sent", slowEventCb, addr st)
|
|
check id != 0'u64
|
|
|
|
check sendRequestToFFIThread(
|
|
ctx, EmitCborEventRequest.ffiNewReq(captureCb, addr rsp)
|
|
)
|
|
.isOk()
|
|
waitCallback(rsp)
|
|
|
|
for _ in 0 ..< 500:
|
|
if st.entered.load():
|
|
break
|
|
os.sleep(1)
|
|
check st.entered.load()
|
|
check not st.exited.load()
|
|
|
|
# Lock-during-invocation: remove blocks until dispatch finishes,
|
|
# by which time slowEventCb has set exited=true.
|
|
check removeEventListener(ctx[].eventRegistry, id)
|
|
check st.exited.load()
|
|
|
|
suite "liveness events":
|
|
## `onNotResponding` / `onResponding` bypass the event queue and dispatch
|
|
## directly to listeners — the queue itself may be wedged behind the same
|
|
## stall they're signalling. These tests pin down the wire shape (event
|
|
## name + CBOR-encoded `EventEnvelope[…]`) so a future refactor can't
|
|
## silently break consumers polling for the "library hung" signal.
|
|
test "onNotResponding delivers EventEnvelope[NotRespondingEvent] to subscribers":
|
|
setupCallbackData(evt)
|
|
|
|
withPool(ctx):
|
|
discard addEventListener(
|
|
ctx[].eventRegistry, NotRespondingEventName, captureCb, addr evt
|
|
)
|
|
|
|
onNotResponding(ctx)
|
|
|
|
waitCallback(evt)
|
|
check evt.retCode == RET_OK
|
|
let decoded = cborDecode(callbackBytes(evt), EventEnvelope[NotRespondingEvent])
|
|
check decoded.isOk()
|
|
check decoded.value.eventType == NotRespondingEventName
|
|
|
|
test "onResponding delivers EventEnvelope[RespondingEvent] to subscribers":
|
|
setupCallbackData(evt)
|
|
|
|
withPool(ctx):
|
|
discard
|
|
addEventListener(ctx[].eventRegistry, RespondingEventName, captureCb, addr evt)
|
|
|
|
onResponding(ctx)
|
|
|
|
waitCallback(evt)
|
|
check evt.retCode == RET_OK
|
|
let decoded = cborDecode(callbackBytes(evt), EventEnvelope[RespondingEvent])
|
|
check decoded.isOk()
|
|
check decoded.value.eventType == RespondingEventName
|
|
|
|
test "liveness events with no subscriber are a no-op":
|
|
withPool(ctx):
|
|
# No listener registered — must not crash, must not block.
|
|
onNotResponding(ctx)
|
|
onResponding(ctx)
|
|
|
|
suite "event thread drains queued events":
|
|
## The event thread wakes every `EventThreadTickInterval` (or on
|
|
## `eventQueueSignal`, not exported) and drains `eventQueue` into the
|
|
## registered listeners. This test pushes a c_malloc'd payload onto the
|
|
## queue from the test thread and waits for the tick-driven drain to
|
|
## deliver it — exercises the `tryEnqueueEvent` → `drainEventQueue` →
|
|
## `dispatchQueuedEvent` → listener path end-to-end.
|
|
test "enqueued event is delivered to subscriber within a tick":
|
|
setupCallbackData(evt)
|
|
|
|
withPool(ctx):
|
|
const QueuedEvtName = "queued_evt"
|
|
discard addEventListener(ctx[].eventRegistry, QueuedEvtName, captureCb, addr evt)
|
|
|
|
# `tryEnqueueEvent` takes ownership of both buffers on success; the
|
|
# event thread c_frees them after dispatch returns.
|
|
let nameBuf = alloc(QueuedEvtName)
|
|
let payload = @[byte 0xDE, 0xAD, 0xBE, 0xEF]
|
|
var shared = allocSharedSeq(payload)
|
|
check tryEnqueueEvent(ctx[].eventQueue, nameBuf, shared.data, shared.len)
|
|
|
|
waitCallback(evt)
|
|
check evt.retCode == RET_OK
|
|
check callbackBytes(evt) == payload
|