mirror of
https://github.com/logos-messaging/nim-ffi.git
synced 2026-06-20 16:29:31 +00:00
refactor(ffi): event thread scaffolding + FFIContext lifecycle split (#71)
This commit is contained in:
parent
721f244312
commit
54c41a3e62
@ -1,10 +1,8 @@
|
||||
//! Synchronous example: exercises the library-event listener API
|
||||
//! (typed + wildcard + remove).
|
||||
//! Synchronous example: exercises the typed per-event listener API.
|
||||
//!
|
||||
//! Run with: `cargo run --example main`
|
||||
|
||||
use my_timer::{decode_event_payload, EchoEvent, EchoRequest, MyTimerCtx, TimerConfig};
|
||||
use std::os::raw::c_int;
|
||||
use my_timer::{EchoEvent, EchoRequest, MyTimerCtx, TimerConfig};
|
||||
use std::sync::mpsc;
|
||||
use std::time::Duration;
|
||||
|
||||
@ -14,32 +12,12 @@ fn main() -> Result<(), String> {
|
||||
Duration::from_secs(5),
|
||||
)?;
|
||||
|
||||
// Typed listener: the closure is invoked on the lib's dispatch
|
||||
// thread, so forward the payload to `main` via std mpsc and block
|
||||
// on `recv_timeout` below. `add_on_echo_fired_listener` is generated
|
||||
// per `{.ffiEvent.}`-declared proc and takes a typed `&EchoEvent`.
|
||||
// Closure runs on the lib's dispatch thread; forward to `main` via mpsc and recv_timeout below.
|
||||
let (tx, rx) = mpsc::channel::<EchoEvent>();
|
||||
let typed_handle = ctx.add_on_echo_fired_listener(move |evt: &EchoEvent| {
|
||||
let _ = tx.send(evt.clone());
|
||||
});
|
||||
|
||||
// Wildcard listener: receives every event with the FFI return code,
|
||||
// the wire `event_id` pre-extracted from the CBOR envelope, and the
|
||||
// raw envelope bytes. Lift to a typed payload via
|
||||
// `decode_event_payload::<T>` when the event_id matches one you
|
||||
// care about — this avoids hand-rolling ciborium calls per branch.
|
||||
let wildcard_handle = ctx.add_event_listener(|ret: c_int, event_id: &str, envelope: &[u8]| {
|
||||
println!("wildcard: ret={}, event_id={}, bytes={}", ret, event_id, envelope.len());
|
||||
if ret == 0 && event_id == "on_echo_fired" {
|
||||
match decode_event_payload::<EchoEvent>(envelope) {
|
||||
Ok(evt) => println!(" decoded: message={}, echo_count={}", evt.message, evt.echo_count),
|
||||
Err(e) => println!(" decode failed: {}", e),
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// Trigger the event — fires `on_echo_fired` once, which the
|
||||
// dispatch thread delivers to both listeners above.
|
||||
ctx.echo(EchoRequest { message: "sync-event-demo".into(), delay_ms: 1 })?;
|
||||
|
||||
match rx.recv_timeout(Duration::from_secs(2)) {
|
||||
@ -48,6 +26,5 @@ fn main() -> Result<(), String> {
|
||||
}
|
||||
|
||||
ctx.remove_event_listener(typed_handle);
|
||||
ctx.remove_event_listener(wildcard_handle);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@ -1,11 +1,9 @@
|
||||
//! Tokio (async) example: same shape as `main.rs` but exercises the
|
||||
//! async `_async` API and bridges library events into a tokio-aware
|
||||
//! channel for async consumption.
|
||||
//! Tokio (async) example: same shape as `main.rs` but exercises the async `_async` API
|
||||
//! and bridges library events into a tokio mpsc for async consumption.
|
||||
//!
|
||||
//! Run with: `cargo run --example tokio_main`
|
||||
|
||||
use my_timer::{decode_event_payload, EchoEvent, EchoRequest, MyTimerCtx, TimerConfig};
|
||||
use std::os::raw::c_int;
|
||||
use my_timer::{EchoEvent, EchoRequest, MyTimerCtx, TimerConfig};
|
||||
use std::time::Duration;
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
@ -17,37 +15,15 @@ async fn main() -> Result<(), String> {
|
||||
)
|
||||
.await?;
|
||||
|
||||
// Typed listener: the handler fires on the lib's dispatch thread,
|
||||
// which is *outside* the tokio runtime. Forwarding through a tokio
|
||||
// `unbounded_channel` (Sender is Send + Sync, non-blocking) hands
|
||||
// the event over to the runtime so we can `.await` it below.
|
||||
// Handler fires on the lib's dispatch thread (outside the tokio runtime); forward via tokio mpsc to await it below.
|
||||
let (typed_tx, mut typed_rx) = mpsc::unbounded_channel::<EchoEvent>();
|
||||
let typed_handle = ctx.add_on_echo_fired_listener(move |evt: &EchoEvent| {
|
||||
let _ = typed_tx.send(evt.clone());
|
||||
});
|
||||
|
||||
// Wildcard listener: receives every event with the FFI return code,
|
||||
// the wire `event_id` pre-extracted from the CBOR envelope, and the
|
||||
// raw envelope bytes. Lift to a typed payload via
|
||||
// `decode_event_payload::<T>` when the event_id matches one you
|
||||
// care about — this avoids hand-rolling ciborium calls per branch.
|
||||
let wildcard_handle = ctx.add_event_listener(|ret: c_int, event_id: &str, envelope: &[u8]| {
|
||||
println!("wildcard: ret={}, event_id={}, bytes={}", ret, event_id, envelope.len());
|
||||
if ret == 0 && event_id == "on_echo_fired" {
|
||||
match decode_event_payload::<EchoEvent>(envelope) {
|
||||
Ok(evt) => println!(" decoded: message={}, echo_count={}", evt.message, evt.echo_count),
|
||||
Err(e) => println!(" decode failed: {}", e),
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// Trigger an echo via the async API — fires `on_echo_fired` once,
|
||||
// which the dispatch thread delivers to both listeners above.
|
||||
ctx.echo_async(EchoRequest { message: "async-event-demo".into(), delay_ms: 1 })
|
||||
.await?;
|
||||
|
||||
// Await the typed event with a bounded timeout so a missing event
|
||||
// surfaces as an error instead of hanging the example forever.
|
||||
let evt = tokio::time::timeout(Duration::from_secs(2), typed_rx.recv())
|
||||
.await
|
||||
.map_err(|_| "event never arrived".to_string())?
|
||||
@ -55,6 +31,5 @@ async fn main() -> Result<(), String> {
|
||||
println!("typed onEchoFired: message={}, echo_count={}", evt.message, evt.echo_count);
|
||||
|
||||
ctx.remove_event_listener(typed_handle);
|
||||
ctx.remove_event_listener(wildcard_handle);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@ -35,9 +35,6 @@ export cbor_serialization, options, results
|
||||
const CborNullByte*: byte = 0xf6'u8
|
||||
## CBOR encoding of `null` — used as the wire sentinel for empty OK payloads.
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Public API
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
proc cborEncode*[T](x: T): seq[byte] =
|
||||
## CBOR-encode any cbor_serialization-supported type (plus `pointer` / `ptr T`
|
||||
|
||||
@ -63,9 +63,6 @@ proc reqStructName(p: FFIProcMeta): string =
|
||||
else:
|
||||
camel & "Req"
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# File generators
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
proc generateCargoToml*(libName: string): string =
|
||||
# `flume` is the unified callback channel (PR #23 Rust review, item 8): one
|
||||
|
||||
138
ffi/event_thread.nim
Normal file
138
ffi/event_thread.nim
Normal file
@ -0,0 +1,138 @@
|
||||
## Event-thread body and FFI-thread liveness monitoring.
|
||||
##
|
||||
## Included from `ffi_context.nim` — inherits its imports, FFIContext type,
|
||||
## and the heartbeat-timing constants. Lives alongside `ffi_thread.nim`
|
||||
## so each thread's machinery is readable on its own.
|
||||
##
|
||||
## Responsibilities:
|
||||
## - Drain queued events into listener callbacks (queue producer lands in PR #69).
|
||||
## - Watch `ctx.ffiHeartbeat` and emit `NotRespondingEvent` / `RespondingEvent`
|
||||
## on FFI-thread stall and recovery transitions.
|
||||
|
||||
type
|
||||
NotRespondingEvent* = object
|
||||
RespondingEvent* = object
|
||||
|
||||
const
|
||||
NotRespondingEventName* = "not_responding"
|
||||
RespondingEventName* = "responding"
|
||||
|
||||
proc dispatchToListeners[T](
|
||||
ctx: ptr FFIContext[T], eventName: string, data: pointer, dataLen: int
|
||||
) =
|
||||
## Holds reg.lock for the entire snapshot + invocation so concurrent
|
||||
## add/remove on this registry blocks until dispatch returns.
|
||||
withLock ctx[].eventRegistry.lock:
|
||||
let listeners = ctx[].eventRegistry.byEvent.getOrDefault(eventName)
|
||||
if listeners.len == 0:
|
||||
chronicles.debug "no listener registered", event = eventName
|
||||
return
|
||||
foreignThreadGc:
|
||||
try:
|
||||
notifyListeners(listeners, RET_OK, data, dataLen)
|
||||
except Exception, CatchableError:
|
||||
notifyListenersErr(
|
||||
listeners,
|
||||
"Exception dispatching " & eventName & ": " & getCurrentExceptionMsg(),
|
||||
)
|
||||
|
||||
proc emitLivenessEvent[T, P](ctx: ptr FFIContext[T], name: string, payload: P) =
|
||||
## Encodes a zero-field liveness event (`NotRespondingEvent`,
|
||||
## `RespondingEvent`) and dispatches it directly to listeners, bypassing
|
||||
## the event queue (which may itself be wedged). Runs on the event thread.
|
||||
let event =
|
||||
try:
|
||||
EventEnvelope[P](eventType: name, payload: payload).cborEncode()
|
||||
except CatchableError as exc:
|
||||
chronicles.error "liveness event encode failed", name = name, err = exc.msg
|
||||
return
|
||||
let dataPtr: pointer =
|
||||
if event.len > 0: cast[pointer](unsafeAddr event[0])
|
||||
else: cast[pointer](emptyListenerPayload)
|
||||
ctx.dispatchToListeners(name, dataPtr, event.len)
|
||||
|
||||
proc onNotResponding*(ctx: ptr FFIContext) =
|
||||
emitLivenessEvent(ctx, NotRespondingEventName, NotRespondingEvent())
|
||||
|
||||
proc onResponding*(ctx: ptr FFIContext) =
|
||||
## Fired once when the FFI thread's heartbeat starts advancing again
|
||||
## after a `NotRespondingEvent`. Lets consumers clear any "library
|
||||
## hung" UI state without polling.
|
||||
emitLivenessEvent(ctx, RespondingEventName, RespondingEvent())
|
||||
|
||||
proc dispatchQueuedEvent[T](ctx: ptr FFIContext[T], qe: QueuedEvent) =
|
||||
## Frees `qe`'s c_malloc buffers on exit.
|
||||
defer:
|
||||
if not qe.name.isNil():
|
||||
c_free(cast[pointer](qe.name))
|
||||
if not qe.data.isNil():
|
||||
c_free(qe.data)
|
||||
ctx.dispatchToListeners($qe.name, qe.data, qe.dataLen)
|
||||
|
||||
proc drainEventQueue[T](ctx: ptr FFIContext[T]) =
|
||||
while true:
|
||||
let opt = ctx.eventQueue.tryDequeueEvent()
|
||||
if opt.isNone():
|
||||
break
|
||||
ctx.dispatchQueuedEvent(opt.get())
|
||||
|
||||
type HeartbeatMonitor = object
|
||||
startedAt: Moment
|
||||
lastChange: Moment
|
||||
lastValue: int64
|
||||
notifiedStale: bool
|
||||
|
||||
proc init(T: type HeartbeatMonitor, ctx: ptr FFIContext): T =
|
||||
let now = Moment.now()
|
||||
T(
|
||||
startedAt: now,
|
||||
lastChange: now,
|
||||
lastValue: ctx.ffiHeartbeat.load(),
|
||||
notifiedStale: false,
|
||||
)
|
||||
|
||||
proc check[T](hb: var HeartbeatMonitor, ctx: ptr FFIContext[T]) =
|
||||
## Fires `onNotResponding` once the FFI thread's heartbeat counter stops
|
||||
## advancing past the stale threshold, and fires `onResponding` once it
|
||||
## starts advancing again. Both transitions latch so each is emitted at
|
||||
## most once per stall episode.
|
||||
if Moment.now() - hb.startedAt <= FFIHeartbeatStartDelay:
|
||||
return
|
||||
let cur = ctx.ffiHeartbeat.load()
|
||||
if cur != hb.lastValue:
|
||||
if hb.notifiedStale:
|
||||
onResponding(ctx)
|
||||
hb.lastValue = cur
|
||||
hb.lastChange = Moment.now()
|
||||
hb.notifiedStale = false
|
||||
elif not hb.notifiedStale and
|
||||
Moment.now() - hb.lastChange > FFIHeartbeatStaleThreshold:
|
||||
onNotResponding(ctx)
|
||||
hb.notifiedStale = true
|
||||
|
||||
proc eventRun[T](ctx: ptr FFIContext[T]) {.async.} =
|
||||
var hb = HeartbeatMonitor.init(ctx)
|
||||
|
||||
while ctx.running.load():
|
||||
# Wake on enqueue or tick — whichever first. The enqueue path lands in PR #69;
|
||||
# until then the wait always times out and we fall through to the heartbeat check.
|
||||
discard await ctx.eventQueueSignal.wait().withTimeout(EventThreadTickInterval)
|
||||
|
||||
ctx.drainEventQueue()
|
||||
|
||||
if not ctx.running.load():
|
||||
break
|
||||
hb.check(ctx)
|
||||
|
||||
proc eventThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} =
|
||||
## Drains the event queue and runs the FFI-thread heartbeat check.
|
||||
## Owns the queued `c_malloc` payloads until dispatch returns.
|
||||
defer:
|
||||
let fireRes = ctx.eventThreadExitSignal.fireSync()
|
||||
if fireRes.isErr():
|
||||
error "failed to fire eventThreadExitSignal", err = fireRes.error
|
||||
|
||||
try:
|
||||
waitFor eventRun(ctx)
|
||||
except CatchableError as exc:
|
||||
error "event thread exited with exception", error = exc.msg
|
||||
@ -1,9 +1,20 @@
|
||||
## FFIContext type plus lifecycle (init / signal-stop / join / destroy).
|
||||
##
|
||||
## The per-thread bodies live in `ffi_thread.nim` and `event_thread.nim`,
|
||||
## included below so the thread code can access the private FFIContext
|
||||
## fields without forcing them through a public surface.
|
||||
|
||||
{.passc: "-fPIC".}
|
||||
|
||||
import std/[atomics, locks, json, tables]
|
||||
import system/ansi_c
|
||||
import std/[atomics, locks, options, tables]
|
||||
import chronicles, chronos, chronos/threadsync, taskpools/channels_spsc_single, results
|
||||
import
|
||||
./ffi_types, ./ffi_events, ./ffi_thread_request, ./internal/ffi_macro, ./logging,
|
||||
./ffi_types,
|
||||
./ffi_events,
|
||||
./ffi_thread_request,
|
||||
./internal/ffi_macro,
|
||||
./logging,
|
||||
./cbor_serial
|
||||
|
||||
export ffi_events
|
||||
@ -13,21 +24,21 @@ type FFIContext*[T] = object
|
||||
# main library object (e.g., Waku, LibP2P, SDS, the one to be exposed as a library)
|
||||
ffiThread: Thread[(ptr FFIContext[T])]
|
||||
# represents the main FFI thread in charge of attending API consumer actions
|
||||
watchdogThread: Thread[(ptr FFIContext[T])]
|
||||
# monitors the FFI thread and notifies the FFI API consumer if it hangs
|
||||
eventThread: Thread[(ptr FFIContext[T])]
|
||||
# drains the event queue and runs the FFI-thread heartbeat check
|
||||
lock: Lock
|
||||
reqChannel: ChannelSPSCSingle[ptr FFIThreadRequest]
|
||||
reqSignal: ThreadSignalPtr # to notify the FFI Thread that a new request is sent
|
||||
reqReceivedSignal: ThreadSignalPtr
|
||||
# to signal main thread, interfacing with the FFI thread, that FFI thread received the request
|
||||
stopSignal: ThreadSignalPtr
|
||||
# fired by destroyFFIContext so both ffiThread and watchdogThread can exit promptly
|
||||
threadExitSignal: ThreadSignalPtr
|
||||
# fired by ffiThread just before it exits; destroyFFIContext waits on
|
||||
# this with a bounded timeout instead of joining unconditionally, so a
|
||||
# blocked event loop cannot hang the caller forever
|
||||
threadExitSignal: ThreadSignalPtr # bounds destroyFFIContext's wait so a blocked loop cannot hang the caller
|
||||
eventQueueSignal: ThreadSignalPtr # wakes the event thread on enqueue (used once dispatch is rewired in PR #69)
|
||||
eventThreadExitSignal: ThreadSignalPtr # mirrors threadExitSignal for the event thread
|
||||
userData*: pointer
|
||||
eventRegistry*: FFIEventRegistry
|
||||
eventQueue*: EventQueue
|
||||
ffiHeartbeat*: Atomic[int64] # advanced each FFI-thread loop; event thread reads for liveness
|
||||
running: Atomic[bool] # To control when the threads are running
|
||||
registeredRequests: ptr Table[cstring, FFIRequestProc]
|
||||
# Pointer to with the registered requests at compile time
|
||||
@ -39,246 +50,22 @@ var onFFIThread* {.threadvar.}: bool
|
||||
|
||||
const git_version* {.strdefine.} = "n/a"
|
||||
|
||||
proc sendRequestToFFIThread*(
|
||||
ctx: ptr FFIContext, ffiRequest: ptr FFIThreadRequest, timeout = InfiniteDuration
|
||||
): Result[void, string] =
|
||||
# Reentrancy guard (PR #23 review, item 6): if a handler running on the FFI
|
||||
# thread tries to dispatch back through this proc, it would wait forever on
|
||||
# `reqReceivedSignal` — which only this thread can fire — and self-deadlock.
|
||||
# Return an error instead so the caller can surface it.
|
||||
if onFFIThread:
|
||||
deleteRequest(ffiRequest)
|
||||
return err(
|
||||
"reentrant ffi call: a handler invoked sendRequestToFFIThread on its own context"
|
||||
)
|
||||
const
|
||||
EventThreadTickInterval* = 1.seconds # bounds idle heartbeat check latency
|
||||
FFIHeartbeatStartDelay* = 10.seconds # grace window for library startup
|
||||
FFIHeartbeatStaleThreshold* = 1.seconds
|
||||
|
||||
# All async submissions serialise on `ctx.lock` for the full
|
||||
# trySend + fireSync + waitSync sequence because `reqChannel` is
|
||||
# single-producer and `reqReceivedSignal` is shared across callers.
|
||||
# Multi-producer redesign is tracked as PR #23 review item 7.
|
||||
ctx.lock.acquire()
|
||||
defer:
|
||||
ctx.lock.release()
|
||||
include ./event_thread
|
||||
include ./ffi_thread
|
||||
|
||||
## Sending the request
|
||||
let sentOk = ctx.reqChannel.trySend(ffiRequest)
|
||||
if not sentOk:
|
||||
deleteRequest(ffiRequest)
|
||||
return err("Couldn't send a request to the ffi thread")
|
||||
|
||||
let fireSyncRes = ctx.reqSignal.fireSync()
|
||||
if fireSyncRes.isErr():
|
||||
deleteRequest(ffiRequest)
|
||||
return err("failed fireSync: " & $fireSyncRes.error)
|
||||
|
||||
if fireSyncRes.get() == false:
|
||||
deleteRequest(ffiRequest)
|
||||
return err("Couldn't fireSync in time")
|
||||
|
||||
## wait until the FFI working thread properly received the request
|
||||
let res = ctx.reqReceivedSignal.waitSync(timeout)
|
||||
if res.isErr():
|
||||
## Do not free ffiRequest here: the FFI thread was already signaled and
|
||||
## will process (and free) it.
|
||||
return err("Couldn't receive reqReceivedSignal signal")
|
||||
|
||||
## Notice that in case of "ok", the deallocShared(req) is performed by the FFI Thread in the
|
||||
## process proc.
|
||||
return ok()
|
||||
|
||||
type Foo = object
|
||||
registerReqFFI(WatchdogReq, foo: ptr Foo):
|
||||
proc(): Future[Result[string, string]] {.async.} =
|
||||
return ok("FFI thread is not blocked")
|
||||
|
||||
type JsonNotRespondingEvent = object
|
||||
eventType: string
|
||||
|
||||
proc init(T: type JsonNotRespondingEvent): T =
|
||||
return JsonNotRespondingEvent(eventType: "not_responding")
|
||||
|
||||
proc `$`(event: JsonNotRespondingEvent): string =
|
||||
$(%*event)
|
||||
|
||||
proc onNotResponding*(ctx: ptr FFIContext) =
|
||||
## Shim: still emits the legacy JSON payload through the registry, so
|
||||
## existing foreign consumers see no wire-shape change. A follow-up
|
||||
## PR replaces this with a CBOR `NotRespondingEvent`.
|
||||
## Mirrors the dispatch templates' lock-during-invocation contract
|
||||
## (see `ffi_events.nim`).
|
||||
withLock ctx[].eventRegistry.lock:
|
||||
let snap = ctx[].eventRegistry.byEvent.getOrDefault("onNotResponding")
|
||||
if snap.len == 0:
|
||||
chronicles.debug "onNotResponding - no listener registered"
|
||||
return
|
||||
foreignThreadGc:
|
||||
let event = $JsonNotRespondingEvent.init()
|
||||
for listener in snap:
|
||||
listener.callback(
|
||||
RET_OK,
|
||||
cast[ptr cchar](unsafeAddr event[0]),
|
||||
cast[csize_t](len(event)),
|
||||
listener.userData,
|
||||
)
|
||||
|
||||
proc watchdogThreadBody(ctx: ptr FFIContext) {.thread.} =
|
||||
## Watchdog thread that monitors the FFI thread and notifies the library user if it hangs.
|
||||
## This thread never blocks.
|
||||
|
||||
let watchdogRun = proc(ctx: ptr FFIContext) {.async.} =
|
||||
const WatchdogStartDelay = 10.seconds
|
||||
const WatchdogTimeinterval = 1.seconds
|
||||
const WatchdogTimeout = 20.seconds
|
||||
|
||||
# Give time for the node to be created and up before sending watchdog requests
|
||||
let initialStop = await ctx.stopSignal.wait().withTimeout(WatchdogStartDelay)
|
||||
if initialStop or ctx.running.load == false:
|
||||
return
|
||||
|
||||
while true:
|
||||
let intervalStop = await ctx.stopSignal.wait().withTimeout(WatchdogTimeinterval)
|
||||
|
||||
if intervalStop or ctx.running.load == false:
|
||||
debug "Watchdog thread exiting because FFIContext is not running"
|
||||
break
|
||||
|
||||
let callback = proc(
|
||||
callerRet: cint, msg: ptr cchar, len: csize_t, userData: pointer
|
||||
) {.cdecl, gcsafe, raises: [].} =
|
||||
discard ## Don't do anything. Just respecting the callback signature.
|
||||
const nilUserData = nil
|
||||
|
||||
trace "Sending watchdog request to FFI thread"
|
||||
|
||||
try:
|
||||
sendRequestToFFIThread(
|
||||
ctx, WatchdogReq.ffiNewReq(callback, nilUserData), WatchdogTimeout
|
||||
).isOkOr:
|
||||
error "Failed to send watchdog request to FFI thread", error = $error
|
||||
onNotResponding(ctx)
|
||||
except Exception as exc:
|
||||
error "Exception sending watchdog request", exc = exc.msg
|
||||
onNotResponding(ctx)
|
||||
|
||||
waitFor watchdogRun(ctx)
|
||||
|
||||
proc processRequest[T](
|
||||
request: ptr FFIThreadRequest, ctx: ptr FFIContext[T]
|
||||
) {.async.} =
|
||||
## Invoked within the FFI thread to process a request coming from the FFI API consumer thread.
|
||||
|
||||
let reqId = $request[].reqId
|
||||
## The reqId determines which proc will handle the request.
|
||||
## The registeredRequests represents a table defined at compile time.
|
||||
## Then, registeredRequests == Table[reqId, proc-handling-the-request-asynchronously]
|
||||
|
||||
## Explicit conversion keeps `reqId` alive as the backing string,
|
||||
## avoiding the implicit string→cstring warning that will become an error.
|
||||
let reqIdCs = reqId.cstring
|
||||
|
||||
let retFut =
|
||||
if not ctx[].registeredRequests[].contains(reqIdCs):
|
||||
## That shouldn't happen because only registered requests should be sent to the FFI thread.
|
||||
nilProcess(request[].reqId)
|
||||
else:
|
||||
ctx[].registeredRequests[][reqIdCs](cast[pointer](request), ctx)
|
||||
|
||||
## Catch every catchable exception (including CancelledError raised by
|
||||
## the shutdown drain in ffiRun) so handleRes — and its `deleteRequest`
|
||||
## defer — always runs. Otherwise an abandoned in-flight handler would
|
||||
## leak its request envelope, reqId copy, and CBOR payload.
|
||||
let res =
|
||||
try:
|
||||
await retFut
|
||||
except CatchableError as exc:
|
||||
Result[seq[byte], string].err(
|
||||
"Error in processRequest for " & reqId & ": " & exc.msg
|
||||
)
|
||||
|
||||
## handleRes may raise (OOM, GC setup) even though it is rare. Catching here
|
||||
## keeps the async proc raises:[] compatible. The defer inside handleRes
|
||||
## guarantees request is freed before the exception propagates.
|
||||
try:
|
||||
handleRes(res, request)
|
||||
except Exception as exc:
|
||||
error "Unexpected exception in handleRes", error = exc.msg
|
||||
|
||||
proc ffiThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} =
|
||||
## FFI thread body that attends library user API requests
|
||||
ffiCurrentEventRegistry = addr ctx[].eventRegistry
|
||||
onFFIThread = true
|
||||
|
||||
logging.setupLog(logging.LogLevel.DEBUG, logging.LogFormat.TEXT)
|
||||
|
||||
defer:
|
||||
onFFIThread = false
|
||||
# Signal destroyFFIContext that this thread has exited, so its bounded
|
||||
# wait can unblock and proceed with cleanup.
|
||||
let fireRes = ctx.threadExitSignal.fireSync()
|
||||
if fireRes.isErr():
|
||||
error "failed to fire threadExitSignal on FFI thread exit", err = fireRes.error
|
||||
|
||||
let ffiRun = proc(ctx: ptr FFIContext[T]) {.async.} =
|
||||
var ffiReqHandler: T
|
||||
## Holds the main library object, i.e., in charge of handling the ffi requests.
|
||||
## e.g., Waku, LibP2P, SDS, etc.
|
||||
|
||||
## In-flight processRequest futures. Tracked so they can be drained on
|
||||
## shutdown — otherwise destroying the context while a handler is
|
||||
## awaiting (e.g. sleepAsync) abandons the future and leaks the
|
||||
## request's envelope/reqId/payload allocations.
|
||||
var pending: seq[Future[void]] = @[]
|
||||
|
||||
proc reapCompleted() =
|
||||
var i = 0
|
||||
while i < pending.len:
|
||||
if pending[i].finished():
|
||||
pending.del(i)
|
||||
else:
|
||||
inc i
|
||||
|
||||
while ctx.running.load():
|
||||
reapCompleted()
|
||||
|
||||
let gotSignal = await ctx.reqSignal.wait().withTimeout(100.milliseconds)
|
||||
if not gotSignal:
|
||||
continue
|
||||
|
||||
## Wait for a request from the ffi consumer thread
|
||||
var request: ptr FFIThreadRequest
|
||||
if not ctx.reqChannel.tryRecv(request):
|
||||
continue
|
||||
|
||||
if ctx.myLib.isNil():
|
||||
ctx.myLib = addr ffiReqHandler
|
||||
|
||||
## Handle the request
|
||||
pending.add processRequest(request, ctx)
|
||||
|
||||
let fireRes = ctx.reqReceivedSignal.fireSync()
|
||||
if fireRes.isErr():
|
||||
error "could not fireSync back to requester thread", error = fireRes.error
|
||||
|
||||
## Drain in-flight handlers so each request's `deleteRequest` runs
|
||||
## before we exit. Without this, abandoning a future mid-await would
|
||||
## leak the request allocations (visible to LSan; previously hidden
|
||||
## because Nim's pool allocator kept the chunks alive in the process).
|
||||
reapCompleted()
|
||||
if pending.len > 0:
|
||||
try:
|
||||
await allFutures(pending)
|
||||
except CatchableError as exc:
|
||||
error "draining pending FFI requests on shutdown raised",
|
||||
error = exc.msg
|
||||
|
||||
waitFor ffiRun(ctx)
|
||||
|
||||
proc cleanUpResources[T](ctx: ptr FFIContext[T]): Result[void, string] =
|
||||
## Full cleanup for heap-allocated contexts: closes all resources and frees memory.
|
||||
defer:
|
||||
freeShared(ctx)
|
||||
proc deinitContextResources*[T](ctx: ptr FFIContext[T]): Result[void, string] =
|
||||
## Mirror of `initContextResources`: tears down lock, registry, queue,
|
||||
## and signal fds in place. Threads MUST already be joined. Caller owns
|
||||
## the memory holding `ctx`. Fields are nil'd after close so a re-init
|
||||
## on the same slot doesn't double-close.
|
||||
ctx.lock.deinitLock()
|
||||
deinitEventRegistry(ctx[].eventRegistry)
|
||||
deinitEventQueue(ctx[].eventQueue)
|
||||
when defined(gcRefc):
|
||||
## ThreadSignalPtr.close() is intentionally skipped under --mm:refc.
|
||||
##
|
||||
@ -303,20 +90,45 @@ proc cleanUpResources[T](ctx: ptr FFIContext[T]): Result[void, string] =
|
||||
else:
|
||||
if not ctx.reqSignal.isNil():
|
||||
?ctx.reqSignal.close()
|
||||
ctx.reqSignal = nil
|
||||
if not ctx.reqReceivedSignal.isNil():
|
||||
?ctx.reqReceivedSignal.close()
|
||||
ctx.reqReceivedSignal = nil
|
||||
if not ctx.stopSignal.isNil():
|
||||
?ctx.stopSignal.close()
|
||||
ctx.stopSignal = nil
|
||||
if not ctx.threadExitSignal.isNil():
|
||||
?ctx.threadExitSignal.close()
|
||||
ctx.threadExitSignal = nil
|
||||
if not ctx.eventQueueSignal.isNil():
|
||||
?ctx.eventQueueSignal.close()
|
||||
ctx.eventQueueSignal = nil
|
||||
if not ctx.eventThreadExitSignal.isNil():
|
||||
?ctx.eventThreadExitSignal.close()
|
||||
ctx.eventThreadExitSignal = nil
|
||||
return ok()
|
||||
|
||||
proc cleanUpResources[T](ctx: ptr FFIContext[T]): Result[void, string] =
|
||||
## Full cleanup for heap-allocated contexts: closes all resources and frees memory.
|
||||
defer:
|
||||
freeShared(ctx)
|
||||
ctx.deinitContextResources()
|
||||
|
||||
proc initContextResources*[T](ctx: ptr FFIContext[T]): Result[void, string] =
|
||||
## Initialises all resources inside an already-allocated FFIContext slot.
|
||||
## On failure every partially-initialised resource is closed; the caller
|
||||
## is responsible for releasing the slot (freeShared or pool.releaseSlot).
|
||||
# Defensive nil: deferred cleanup must never double-close stale pointers on a reused pool slot.
|
||||
ctx.reqSignal = nil
|
||||
ctx.reqReceivedSignal = nil
|
||||
ctx.stopSignal = nil
|
||||
ctx.threadExitSignal = nil
|
||||
ctx.eventQueueSignal = nil
|
||||
ctx.eventThreadExitSignal = nil
|
||||
ctx.lock.initLock()
|
||||
initEventRegistry(ctx[].eventRegistry)
|
||||
initEventQueue(ctx[].eventQueue)
|
||||
ctx.ffiHeartbeat.store(0)
|
||||
|
||||
var success = false
|
||||
defer:
|
||||
@ -337,6 +149,12 @@ proc initContextResources*[T](ctx: ptr FFIContext[T]): Result[void, string] =
|
||||
ctx.threadExitSignal = ThreadSignalPtr.new().valueOr:
|
||||
return err("couldn't create threadExitSignal ThreadSignalPtr: " & $error)
|
||||
|
||||
ctx.eventQueueSignal = ThreadSignalPtr.new().valueOr:
|
||||
return err("couldn't create eventQueueSignal ThreadSignalPtr: " & $error)
|
||||
|
||||
ctx.eventThreadExitSignal = ThreadSignalPtr.new().valueOr:
|
||||
return err("couldn't create eventThreadExitSignal ThreadSignalPtr: " & $error)
|
||||
|
||||
ctx.registeredRequests = addr ffi_types.registeredRequests
|
||||
|
||||
ctx.running.store(true)
|
||||
@ -347,32 +165,40 @@ proc initContextResources*[T](ctx: ptr FFIContext[T]): Result[void, string] =
|
||||
return err("failed to create the FFI thread: " & getCurrentExceptionMsg())
|
||||
|
||||
try:
|
||||
createThread(ctx.watchdogThread, watchdogThreadBody, ctx)
|
||||
createThread(ctx.eventThread, eventThreadBody[T], ctx)
|
||||
except ValueError, ResourceExhaustedError:
|
||||
## ffiThread is already running; signal it to exit and join before the
|
||||
## deferred cleanUpResources closes the signals it's waiting on.
|
||||
ctx.running.store(false)
|
||||
let fireRes = ctx.reqSignal.fireSync()
|
||||
if fireRes.isErr():
|
||||
error "failed to signal ffiThread during watchdog cleanup", error = fireRes.error
|
||||
error "failed to signal ffiThread during event-thread cleanup",
|
||||
error = fireRes.error
|
||||
joinThread(ctx.ffiThread)
|
||||
return err("failed to create the watchdog thread: " & getCurrentExceptionMsg())
|
||||
return err("failed to create the event thread: " & getCurrentExceptionMsg())
|
||||
|
||||
success = true
|
||||
return ok()
|
||||
|
||||
proc signalStop*[T](ctx: ptr FFIContext[T]): Result[void, string] =
|
||||
# Error paths intentionally skip onNotResponding: a back-pressuring
|
||||
# listener may hold reg.lock, and onNotResponding takes it — would
|
||||
# amplify the stuck state into a deadlock instead of escaping it.
|
||||
ctx.running.store(false)
|
||||
let reqSignaled = ctx.reqSignal.fireSync().valueOr:
|
||||
ctx.onNotResponding()
|
||||
return err("error signaling reqSignal in signalStop: " & $error)
|
||||
if not reqSignaled:
|
||||
ctx.onNotResponding()
|
||||
return err("failed to signal reqSignal on time in signalStop")
|
||||
let stopSignaled = ctx.stopSignal.fireSync().valueOr:
|
||||
return err("error signaling stopSignal in signalStop: " & $error)
|
||||
if not stopSignaled:
|
||||
return err("failed to signal stopSignal on time in signalStop")
|
||||
# Non-fatal: event thread will see running==false on the next tick.
|
||||
let evtSignaled = ctx.eventQueueSignal.fireSync()
|
||||
if evtSignaled.isErr():
|
||||
error "failed to signal eventQueueSignal in signalStop", error = evtSignaled.error
|
||||
elif evtSignaled.get() == false:
|
||||
error "failed to signal eventQueueSignal on time in signalStop"
|
||||
return ok()
|
||||
|
||||
## If the FFI thread's event loop is blocked by a synchronous handler
|
||||
@ -383,23 +209,30 @@ proc signalStop*[T](ctx: ptr FFIContext[T]): Result[void, string] =
|
||||
const ThreadExitTimeout* = 1500.milliseconds
|
||||
|
||||
proc stopAndJoinThreads*[T](ctx: ptr FFIContext[T]): Result[void, string] =
|
||||
## Signals the FFI and watchdog threads to stop, waits up to ThreadExitTimeout
|
||||
## for the FFI thread to exit, and joins both. On timeout returns err and
|
||||
## skips joinThread (leaving the threads live) rather than hanging the caller.
|
||||
## Resource cleanup (signal fds, lock) is the caller's responsibility.
|
||||
## Signals both threads to stop, waits up to ThreadExitTimeout per thread,
|
||||
## and joins them. On timeout returns err and skips remaining joins
|
||||
## (leaving the threads live) rather than hanging the caller. Resource
|
||||
## cleanup is the caller's responsibility.
|
||||
##
|
||||
## Timeout paths skip onNotResponding for the same reason signalStop does.
|
||||
ctx.signalStop().isOkOr:
|
||||
return err("signalStop failed: " & $error)
|
||||
|
||||
let exitedOnTime = ctx.threadExitSignal.waitSync(ThreadExitTimeout).valueOr:
|
||||
ctx.onNotResponding()
|
||||
let ffiExitedOnTime = ctx.threadExitSignal.waitSync(ThreadExitTimeout).valueOr:
|
||||
return err("error waiting for FFI thread exit: " & $error)
|
||||
|
||||
if not exitedOnTime:
|
||||
ctx.onNotResponding()
|
||||
if not ffiExitedOnTime:
|
||||
return err("FFI thread did not exit in time; leaking ctx to avoid hang")
|
||||
|
||||
joinThread(ctx.ffiThread)
|
||||
joinThread(ctx.watchdogThread)
|
||||
|
||||
let evtExitedOnTime = ctx.eventThreadExitSignal.waitSync(ThreadExitTimeout).valueOr:
|
||||
return err("error waiting for event thread exit: " & $error)
|
||||
|
||||
if not evtExitedOnTime:
|
||||
return err("event thread did not exit in time; leaking ctx to avoid hang")
|
||||
|
||||
joinThread(ctx.eventThread)
|
||||
return ok()
|
||||
|
||||
proc clearContext[T](ctx: ptr FFIContext[T]): Result[void, string] =
|
||||
|
||||
@ -48,13 +48,12 @@ proc destroyFFIContext*[T](
|
||||
## unsafe.
|
||||
ctx.stopAndJoinThreads().isOkOr:
|
||||
return err("destroyFFIContext(pool): " & $error)
|
||||
# Tear down the event registry on the *owning* thread so its
|
||||
# GC-managed Table / seq storage is freed on the same heap that
|
||||
# allocated it. Without this, the next thread to grab this slot
|
||||
# would crash inside `initEventRegistry`'s assignment-dtor when
|
||||
# `initTable` tries to dealloc the previous thread's data.
|
||||
deinitEventRegistry(ctx[].eventRegistry)
|
||||
# Without this, the next acquisition would re-init an already-initialised
|
||||
# lock (UB) and leak the previous signal fds.
|
||||
let deinitRes = ctx.deinitContextResources()
|
||||
pool.releaseSlot(ctx)
|
||||
deinitRes.isOkOr:
|
||||
return err("destroyFFIContext(pool): " & $error)
|
||||
return ok()
|
||||
|
||||
proc isValidCtx*[T](pool: var FFIContextPool[T], ctx: pointer): bool =
|
||||
|
||||
@ -1,29 +1,15 @@
|
||||
## Event registry and dispatch primitives for FFI library-initiated events.
|
||||
##
|
||||
## This module owns two concerns so they can evolve together without dragging
|
||||
## in the rest of `FFIContext`:
|
||||
##
|
||||
## 1. A multi-listener registry. Each event name maps to a `seq` of
|
||||
## listeners; a dispatched event reaches exactly the listeners
|
||||
## subscribed to its name. Callers subscribe to each event separately.
|
||||
## 2. The dispatch templates (`dispatchFFIEvent`, `dispatchFFIEventCbor`) used
|
||||
## by `{.ffiEvent.}`-generated procs. They snapshot the registry under its
|
||||
## lock, then invoke each listener *outside* the lock so re-entrant
|
||||
## add/remove from within a handler cannot self-deadlock.
|
||||
##
|
||||
## Phase 1 keeps dispatch synchronous on the FFI thread. A later phase will
|
||||
## route events through a bounded queue to a dedicated event thread; the
|
||||
## registry API does not change.
|
||||
## Event registry, bounded SPSC event queue, and dispatch templates for
|
||||
## FFI library-initiated events. Listeners receive only the event name
|
||||
## they subscribed to. Queue payloads travel via `c_malloc` so transfer
|
||||
## across Nim heaps is safe under both `--mm:orc` and `--mm:refc`.
|
||||
|
||||
{.pragma: callback, cdecl, raises: [], gcsafe.}
|
||||
|
||||
import std/[locks, sequtils, tables]
|
||||
import system/ansi_c
|
||||
import std/[locks, sequtils, options, tables]
|
||||
import chronicles
|
||||
import ./ffi_types, ./cbor_serial
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Wire envelope
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
type EventEnvelope*[T] = object
|
||||
## Standard wire shape for CBOR-encoded FFI events:
|
||||
@ -32,9 +18,6 @@ type EventEnvelope*[T] = object
|
||||
eventType*: string
|
||||
payload*: T
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Registry types
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
type
|
||||
FFIEventListener* = object
|
||||
@ -50,9 +33,6 @@ type
|
||||
nextId*: uint64 ## Monotonic id source. 0 is reserved as "invalid"; ids start at 1.
|
||||
byEvent*: Table[string, seq[FFIEventListener]]
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Registry lifecycle and mutation
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
proc initEventRegistry*(reg: var FFIEventRegistry) =
|
||||
## Must be called exactly once on the owning thread before the registry
|
||||
@ -99,7 +79,7 @@ proc addEventListener*(
|
||||
let listener =
|
||||
FFIEventListener(id: assigned, callback: callback, userData: userData)
|
||||
reg.byEvent.mgetOrPut(eventName, @[]).add(listener)
|
||||
return assigned
|
||||
assigned
|
||||
|
||||
proc removeEventListener*(reg: var FFIEventRegistry, id: uint64): bool {.raises: [].} =
|
||||
## Removes the listener with `id`. Returns true on success, false if no
|
||||
@ -126,7 +106,7 @@ proc removeEventListener*(reg: var FFIEventRegistry, id: uint64): bool {.raises:
|
||||
break
|
||||
if prune:
|
||||
reg.byEvent.del(pruneKey)
|
||||
return removed
|
||||
removed
|
||||
|
||||
proc removeAllEventListeners*(reg: var FFIEventRegistry) {.raises: [].} =
|
||||
## Drops every registered listener. Does not reset the listener-id
|
||||
@ -142,29 +122,120 @@ proc snapshotListeners*(
|
||||
## makes re-entrant add/remove from inside a handler deadlock-free:
|
||||
## dispatch holds the lock only for the duration of the copy, then
|
||||
## iterates the copy outside the lock.
|
||||
var snap: seq[FFIEventListener] = @[]
|
||||
var listeners: seq[FFIEventListener] = @[]
|
||||
withLock reg.lock:
|
||||
# `getOrDefault` returns an empty seq when the key is absent —
|
||||
# avoids the raising `[]` operator path.
|
||||
# `getOrDefault` avoids the raising `[]` path; returns empty when absent.
|
||||
for l in reg.byEvent.getOrDefault(eventName):
|
||||
snap.add(l)
|
||||
return snap
|
||||
listeners.add(l)
|
||||
listeners
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Dispatch templates (used by {.ffiEvent.}-generated procs)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
const EventQueueCapacity* = 1024
|
||||
## ~24 KiB per context. Sustained backlog at this depth means a
|
||||
## listener is wedged — what the stuck flag exists to surface.
|
||||
|
||||
type
|
||||
QueuedEvent* = object
|
||||
## All fields are raw `c_malloc` pointers so the buffer survives
|
||||
## pool-slot reuse across thread heaps without an assignment dtor.
|
||||
name*: cstring
|
||||
data*: ptr UncheckedArray[byte]
|
||||
dataLen*: int
|
||||
|
||||
EventQueue* = object
|
||||
## SPSC ring: FFI thread enqueues, event thread dequeues. Plain lock
|
||||
## (no atomic indices) — operations are short and uncontended.
|
||||
lock*: Lock
|
||||
head*: int
|
||||
tail*: int
|
||||
count*: int
|
||||
buf*: array[EventQueueCapacity, QueuedEvent]
|
||||
|
||||
proc initEventQueue*(q: var EventQueue) {.raises: [].} =
|
||||
## Same single-owning-thread constraint as `initEventRegistry`.
|
||||
q.lock.initLock()
|
||||
q.head = 0
|
||||
q.tail = 0
|
||||
q.count = 0
|
||||
for i in 0 ..< EventQueueCapacity:
|
||||
q.buf[i] = QueuedEvent(name: nil, data: nil, dataLen: 0)
|
||||
|
||||
proc deinitEventQueue*(q: var EventQueue) {.raises: [].} =
|
||||
## Both producer and consumer must have stopped before calling.
|
||||
for i in 0 ..< EventQueueCapacity:
|
||||
let e = q.buf[i]
|
||||
if not e.name.isNil:
|
||||
c_free(cast[pointer](e.name))
|
||||
if not e.data.isNil:
|
||||
c_free(e.data)
|
||||
q.buf[i] = QueuedEvent(name: nil, data: nil, dataLen: 0)
|
||||
q.head = 0
|
||||
q.tail = 0
|
||||
q.count = 0
|
||||
q.lock.deinitLock()
|
||||
|
||||
proc tryEnqueueEvent*(
|
||||
q: var EventQueue, name: cstring, data: ptr UncheckedArray[byte], dataLen: int
|
||||
): bool {.raises: [], gcsafe.} =
|
||||
## Both `name` and `data` must be `c_malloc`'d; on success the queue
|
||||
## takes ownership. On false the caller still owns and must free them.
|
||||
withLock q.lock:
|
||||
if q.count >= EventQueueCapacity:
|
||||
return false
|
||||
q.buf[q.tail] = QueuedEvent(name: name, data: data, dataLen: dataLen)
|
||||
q.tail = (q.tail + 1) mod EventQueueCapacity
|
||||
q.count.inc()
|
||||
true
|
||||
|
||||
proc tryDequeueEvent*(q: var EventQueue): Option[QueuedEvent] {.raises: [], gcsafe.} =
|
||||
## Transfers buffer ownership to the caller, who must `c_free` both.
|
||||
withLock q.lock:
|
||||
if q.count == 0:
|
||||
return none(QueuedEvent)
|
||||
let e = q.buf[q.head]
|
||||
q.buf[q.head] = QueuedEvent(name: nil, data: nil, dataLen: 0)
|
||||
q.head = (q.head + 1) mod EventQueueCapacity
|
||||
q.count.dec()
|
||||
return some(e)
|
||||
|
||||
proc eventQueueLen*(q: var EventQueue): int {.raises: [], gcsafe.} =
|
||||
withLock q.lock:
|
||||
return q.count
|
||||
|
||||
|
||||
const emptyListenerPayload*: cstring = ""
|
||||
## Non-nil zero-length buffer handed to listeners when a payload is
|
||||
## empty, so a consumer doing `std::string(data, len)` / `memcpy` never
|
||||
## receives a nil pointer (which is UB even at len 0).
|
||||
|
||||
proc notifyListeners*(
|
||||
listeners: seq[FFIEventListener], retCode: cint, data: pointer, dataLen: int
|
||||
) =
|
||||
## Fans out a payload to every listener in the snapshot. Empty payloads
|
||||
## are delivered as the non-nil `emptyListenerPayload` sentinel so a
|
||||
## consumer doing `std::string(data, len)` / `memcpy` never receives nil.
|
||||
let n = max(dataLen, 0)
|
||||
let dataPtr =
|
||||
if n > 0 and not data.isNil(): cast[ptr cchar](data)
|
||||
else: cast[ptr cchar](emptyListenerPayload)
|
||||
for listener in listeners:
|
||||
listener.callback(retCode, dataPtr, cast[csize_t](n), listener.userData)
|
||||
|
||||
proc notifyListenersErr*(listeners: seq[FFIEventListener], msg: string) =
|
||||
## Error fan-out: adapts the message string to `notifyListeners`, which
|
||||
## supplies the non-nil pointer for the empty-message case.
|
||||
let p =
|
||||
if msg.len > 0: cast[pointer](unsafeAddr msg[0])
|
||||
else: cast[pointer](emptyListenerPayload)
|
||||
notifyListeners(listeners, RET_ERR, p, msg.len)
|
||||
|
||||
var ffiCurrentEventRegistry* {.threadvar.}: ptr FFIEventRegistry
|
||||
## Set by the FFI thread at startup so dispatchFFIEvent / dispatchFFIEventCbor
|
||||
## can find their registry without taking a context pointer per call site.
|
||||
|
||||
template withFFIEventDispatch(
|
||||
eventName: string, listeners, body: untyped
|
||||
) =
|
||||
## Shared scaffold for `dispatchFFIEvent` / `dispatchFFIEventCbor`:
|
||||
## resolves the thread-local registry, snapshots listeners under
|
||||
## `reg.lock` into the caller-named `listeners` binding, then runs
|
||||
## `body` inside `foreignThreadGc` + try/except.
|
||||
template withFFIEventDispatch(eventName: string, listeners, body: untyped) =
|
||||
## Resolves the thread-local registry, snapshots listeners under
|
||||
## `reg.lock`, then runs `body` inside `foreignThreadGc` + try/except.
|
||||
let regPtr = ffiCurrentEventRegistry
|
||||
if regPtr.isNil():
|
||||
chronicles.error eventName & " - event registry not set on this thread"
|
||||
@ -179,53 +250,39 @@ template withFFIEventDispatch(
|
||||
try:
|
||||
body
|
||||
except Exception, CatchableError:
|
||||
let msg =
|
||||
"Exception dispatching " & eventName & ": " & getCurrentExceptionMsg()
|
||||
for listener in listeners:
|
||||
listener.callback(
|
||||
RET_ERR,
|
||||
cast[ptr cchar](unsafeAddr msg[0]),
|
||||
cast[csize_t](len(msg)),
|
||||
listener.userData,
|
||||
)
|
||||
notifyListenersErr(
|
||||
listeners,
|
||||
"Exception dispatching " & eventName & ": " & getCurrentExceptionMsg(),
|
||||
)
|
||||
|
||||
template dispatchFFIEvent*(eventName: string, body: untyped) =
|
||||
## Dispatches an FFI event to every listener subscribed to `eventName`.
|
||||
## `body` must yield a `string` or `seq[byte]`.
|
||||
##
|
||||
## Valid only on the FFI thread (where `ffiCurrentEventRegistry` is
|
||||
## set). Holds `reg.lock` for the entire snapshot + invocation so a
|
||||
## concurrent `removeEventListener` from a foreign thread blocks until
|
||||
## dispatch returns — closes the UAF window in #40 / PR #39 review
|
||||
## #4356915554. Handlers must not call addEventListener /
|
||||
## removeEventListener on the same registry (would self-deadlock).
|
||||
## Valid only on the FFI thread (where `ffiCurrentEventRegistry` is set).
|
||||
## Holds `reg.lock` for the entire snapshot + invocation so a concurrent
|
||||
## `removeEventListener` from a foreign thread blocks until dispatch
|
||||
## returns. Handlers must not call addEventListener / removeEventListener
|
||||
## on the same registry (would self-deadlock).
|
||||
withFFIEventDispatch(eventName, listeners):
|
||||
let event = body
|
||||
for listener in listeners:
|
||||
listener.callback(
|
||||
RET_OK,
|
||||
cast[ptr cchar](unsafeAddr event[0]),
|
||||
cast[csize_t](len(event)),
|
||||
listener.userData,
|
||||
)
|
||||
let dataPtr: pointer =
|
||||
if event.len > 0: cast[pointer](unsafeAddr event[0])
|
||||
else: cast[pointer](emptyListenerPayload)
|
||||
notifyListeners(listeners, RET_OK, dataPtr, event.len)
|
||||
|
||||
template dispatchFFIEventCbor*(eventName: string, eventPayload: typed) =
|
||||
## Typed CBOR variant of `dispatchFFIEvent`. Wraps `eventPayload` in an
|
||||
## `EventEnvelope`, CBOR-encodes it into a `c_malloc` buffer once, and
|
||||
## fans the same buffer out to every registered listener.
|
||||
##
|
||||
## NB: the template parameter is intentionally named `eventPayload`
|
||||
## rather than `payload` — Nim's template substitution would otherwise
|
||||
## also replace the `payload:` field name inside `EventEnvelope`.
|
||||
## NB: parameter is `eventPayload`, not `payload` — Nim's template
|
||||
## substitution would otherwise also rewrite the `payload:` field inside
|
||||
## `EventEnvelope`.
|
||||
withFFIEventDispatch(eventName, listeners):
|
||||
var (data, dataLen) = cborEncodeShared(
|
||||
EventEnvelope[typeof(eventPayload)](
|
||||
eventType: eventName, payload: eventPayload
|
||||
)
|
||||
EventEnvelope[typeof(eventPayload)](eventType: eventName, payload: eventPayload)
|
||||
)
|
||||
defer:
|
||||
cborFreeShared(data)
|
||||
for listener in listeners:
|
||||
listener.callback(
|
||||
RET_OK, cast[ptr cchar](data), cast[csize_t](dataLen), listener.userData
|
||||
)
|
||||
notifyListeners(listeners, RET_OK, data, dataLen)
|
||||
|
||||
166
ffi/ffi_thread.nim
Normal file
166
ffi/ffi_thread.nim
Normal file
@ -0,0 +1,166 @@
|
||||
## FFI-thread body and request submission API.
|
||||
##
|
||||
## Included from `ffi_context.nim` — inherits its imports, FFIContext type,
|
||||
## and the `onFFIThread` threadvar. Companion to `event_thread.nim`.
|
||||
##
|
||||
## Responsibilities:
|
||||
## - Receive `FFIThreadRequest`s from foreign threads via `reqChannel` and
|
||||
## dispatch them through the user-registered handler table.
|
||||
## - Advance `ctx.ffiHeartbeat` each loop iteration so the event thread can
|
||||
## detect a wedged FFI thread.
|
||||
|
||||
proc sendRequestToFFIThread*(
|
||||
ctx: ptr FFIContext, ffiRequest: ptr FFIThreadRequest, timeout = InfiniteDuration
|
||||
): Result[void, string] =
|
||||
# Reentrancy guard: only this thread can fire `reqReceivedSignal`, so a handler dispatching back would self-deadlock.
|
||||
if onFFIThread:
|
||||
deleteRequest(ffiRequest)
|
||||
return err(
|
||||
"reentrant ffi call: a handler invoked sendRequestToFFIThread on its own context"
|
||||
)
|
||||
|
||||
# All async submissions serialise on `ctx.lock` for the full
|
||||
# trySend + fireSync + waitSync sequence because `reqChannel` is
|
||||
# single-producer and `reqReceivedSignal` is shared across callers.
|
||||
# Multi-producer redesign is tracked as PR #23 review item 7.
|
||||
ctx.lock.acquire()
|
||||
defer:
|
||||
ctx.lock.release()
|
||||
|
||||
## Sending the request
|
||||
let sentOk = ctx.reqChannel.trySend(ffiRequest)
|
||||
if not sentOk:
|
||||
deleteRequest(ffiRequest)
|
||||
return err("Couldn't send a request to the ffi thread")
|
||||
|
||||
let fireSyncRes = ctx.reqSignal.fireSync()
|
||||
if fireSyncRes.isErr():
|
||||
deleteRequest(ffiRequest)
|
||||
return err("failed fireSync: " & $fireSyncRes.error)
|
||||
|
||||
if fireSyncRes.get() == false:
|
||||
deleteRequest(ffiRequest)
|
||||
return err("Couldn't fireSync in time")
|
||||
|
||||
## wait until the FFI working thread properly received the request
|
||||
let res = ctx.reqReceivedSignal.waitSync(timeout)
|
||||
if res.isErr():
|
||||
## Do not free ffiRequest here: the FFI thread was already signaled and
|
||||
## will process (and free) it.
|
||||
return err("Couldn't receive reqReceivedSignal signal")
|
||||
|
||||
## Notice that in case of "ok", the deallocShared(req) is performed by the FFI Thread in the
|
||||
## process proc.
|
||||
ok()
|
||||
|
||||
proc processRequest[T](
|
||||
request: ptr FFIThreadRequest, ctx: ptr FFIContext[T]
|
||||
) {.async.} =
|
||||
## Invoked within the FFI thread to process a request coming from the FFI API consumer thread.
|
||||
|
||||
let reqId = $request[].reqId
|
||||
## The reqId determines which proc will handle the request.
|
||||
## The registeredRequests represents a table defined at compile time.
|
||||
## Then, registeredRequests == Table[reqId, proc-handling-the-request-asynchronously]
|
||||
|
||||
## Explicit conversion keeps `reqId` alive as the backing string,
|
||||
## avoiding the implicit string→cstring warning that will become an error.
|
||||
let reqIdCs = reqId.cstring
|
||||
|
||||
let retFut =
|
||||
if not ctx[].registeredRequests[].contains(reqIdCs):
|
||||
## That shouldn't happen because only registered requests should be sent to the FFI thread.
|
||||
nilProcess(request[].reqId)
|
||||
else:
|
||||
ctx[].registeredRequests[][reqIdCs](cast[pointer](request), ctx)
|
||||
|
||||
## Catch every catchable exception (including CancelledError raised by
|
||||
## the shutdown drain in ffiRun) so handleRes — and its `deleteRequest`
|
||||
## defer — always runs. Otherwise an abandoned in-flight handler would
|
||||
## leak its request envelope, reqId copy, and CBOR payload.
|
||||
let res =
|
||||
try:
|
||||
await retFut
|
||||
except CatchableError as exc:
|
||||
Result[seq[byte], string].err(
|
||||
"Error in processRequest for " & reqId & ": " & exc.msg
|
||||
)
|
||||
|
||||
## handleRes may raise (OOM, GC setup) even though it is rare. Catching here
|
||||
## keeps the async proc raises:[] compatible. The defer inside handleRes
|
||||
## guarantees request is freed before the exception propagates.
|
||||
try:
|
||||
handleRes(res, request)
|
||||
except Exception as exc:
|
||||
error "Unexpected exception in handleRes", error = exc.msg
|
||||
|
||||
proc ffiThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} =
|
||||
## FFI thread body that attends library user API requests
|
||||
ffiCurrentEventRegistry = addr ctx[].eventRegistry
|
||||
onFFIThread = true
|
||||
|
||||
logging.setupLog(logging.LogLevel.DEBUG, logging.LogFormat.TEXT)
|
||||
|
||||
defer:
|
||||
onFFIThread = false
|
||||
# Unblocks destroyFFIContext's bounded wait so cleanup can proceed.
|
||||
let fireRes = ctx.threadExitSignal.fireSync()
|
||||
if fireRes.isErr():
|
||||
error "failed to fire threadExitSignal on FFI thread exit", err = fireRes.error
|
||||
|
||||
let ffiRun = proc(ctx: ptr FFIContext[T]) {.async.} =
|
||||
var ffiReqHandler: T
|
||||
## Holds the main library object, i.e., in charge of handling the ffi requests.
|
||||
## e.g., Waku, LibP2P, SDS, etc.
|
||||
|
||||
## In-flight processRequest futures. Tracked so they can be drained on
|
||||
## shutdown — otherwise destroying the context while a handler is
|
||||
## awaiting (e.g. sleepAsync) abandons the future and leaks the
|
||||
## request's envelope/reqId/payload allocations.
|
||||
var pending: seq[Future[void]] = @[]
|
||||
|
||||
proc reapCompleted() =
|
||||
var i = 0
|
||||
while i < pending.len:
|
||||
if pending[i].finished():
|
||||
pending.del(i)
|
||||
else:
|
||||
inc i
|
||||
|
||||
while ctx.running.load():
|
||||
# Freezes if a sync handler blocks the dispatcher; event thread reads to detect wedged FFI thread.
|
||||
discard ctx.ffiHeartbeat.fetchAdd(1)
|
||||
|
||||
reapCompleted()
|
||||
|
||||
let gotSignal = await ctx.reqSignal.wait().withTimeout(100.milliseconds)
|
||||
if not gotSignal:
|
||||
continue
|
||||
|
||||
## Wait for a request from the ffi consumer thread
|
||||
var request: ptr FFIThreadRequest
|
||||
if not ctx.reqChannel.tryRecv(request):
|
||||
continue
|
||||
|
||||
if ctx.myLib.isNil():
|
||||
ctx.myLib = addr ffiReqHandler
|
||||
|
||||
## Handle the request
|
||||
pending.add processRequest(request, ctx)
|
||||
|
||||
let fireRes = ctx.reqReceivedSignal.fireSync()
|
||||
if fireRes.isErr():
|
||||
error "could not fireSync back to requester thread", error = fireRes.error
|
||||
|
||||
## Drain in-flight handlers so each request's `deleteRequest` runs
|
||||
## before we exit. Without this, abandoning a future mid-await would
|
||||
## leak the request allocations (visible to LSan; previously hidden
|
||||
## because Nim's pool allocator kept the chunks alive in the process).
|
||||
reapCompleted()
|
||||
if pending.len > 0:
|
||||
try:
|
||||
await allFutures(pending)
|
||||
except CatchableError as exc:
|
||||
error "draining pending FFI requests on shutdown raised", error = exc.msg
|
||||
|
||||
waitFor ffiRun(ctx)
|
||||
@ -7,9 +7,6 @@ when defined(ffiGenBindings):
|
||||
import ../codegen/cpp
|
||||
import ../codegen/cddl
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# String helpers used by multiple macros
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
proc isPtr(typ: NimNode): bool =
|
||||
## True iff `typ` is a `ptr T` type expression — i.e. an `nnkPtrTy` AST node.
|
||||
@ -600,9 +597,6 @@ macro ffiRaw*(prc: untyped): untyped =
|
||||
echo stmts.repr
|
||||
return stmts
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# ffi macro — primary FFI proc / FFI type registration
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
macro ffi*(prc: untyped): untyped =
|
||||
## Simplified FFI macro — applies to procs or types.
|
||||
@ -843,9 +837,6 @@ macro ffi*(prc: untyped): untyped =
|
||||
echo stmts.repr
|
||||
return stmts
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# ffiCtor — constructor macro
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
proc buildCtorRequestType(
|
||||
reqTypeName: NimNode, paramNames: seq[string], paramTypes: seq[NimNode]
|
||||
@ -1257,9 +1248,6 @@ macro ffiCtor*(prc: untyped): untyped =
|
||||
echo stmts.repr
|
||||
return stmts
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# ffiDtor — destructor macro
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
macro ffiDtor*(prc: untyped): untyped =
|
||||
## Defines a C-exported destructor that tears down the FFIContext after the
|
||||
@ -1373,9 +1361,6 @@ macro ffiDtor*(prc: untyped): untyped =
|
||||
echo stmts.repr
|
||||
return stmts
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# ffiEvent — library-initiated typed event
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
macro ffiEvent*(wireName: static[string], prc: untyped): untyped =
|
||||
## Declares a library-initiated event. The annotated proc has an empty
|
||||
@ -1467,9 +1452,6 @@ macro ffiEvent*(wireName: static[string], prc: untyped): untyped =
|
||||
echo generated.repr
|
||||
return generated
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# genBindings — codegen entry point
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
macro genBindings*(
|
||||
outputDir: static[string] = ffiOutputDir, nimSrcRelPath: static[string] = ffiSrcPath
|
||||
|
||||
@ -64,7 +64,7 @@ proc callbackBytes(d: var CallbackData): seq[byte] =
|
||||
var bytes = newSeq[byte](d.msgLen)
|
||||
if d.msgLen > 0:
|
||||
copyMem(addr bytes[0], addr d.msg[0], d.msgLen)
|
||||
return bytes
|
||||
bytes
|
||||
|
||||
## A request that dispatches a typed CBOR event from inside the FFI
|
||||
## thread and then returns ok — so the response callback can be used to
|
||||
@ -243,9 +243,6 @@ when not defined(gcRefc):
|
||||
# actually landed so a silently-broken dispatch loop is caught.
|
||||
check evt.called
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Lock-during-invocation regression (issue #40 second concern)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
## A foreign-thread mutation must not be able to invalidate the
|
||||
## listener's `userData` while an in-flight dispatch is mid-invocation.
|
||||
@ -310,3 +307,107 @@ suite "registry lock held during invocation":
|
||||
check st.exited.load()
|
||||
joinThread(thr)
|
||||
check done.load()
|
||||
|
||||
suite "liveness events":
|
||||
## `onNotResponding` / `onResponding` bypass the event queue and dispatch
|
||||
## directly to listeners — the queue itself may be wedged behind the same
|
||||
## stall they're signalling. These tests pin down the wire shape (event
|
||||
## name + CBOR-encoded `EventEnvelope[…]`) so a future refactor can't
|
||||
## silently break consumers polling for the "library hung" signal.
|
||||
test "onNotResponding delivers EventEnvelope[NotRespondingEvent] to subscribers":
|
||||
var pool: FFIContextPool[TestEvtLib]
|
||||
let ctx = pool.createFFIContext().valueOr:
|
||||
check false
|
||||
return
|
||||
defer:
|
||||
discard pool.destroyFFIContext(ctx)
|
||||
|
||||
var evt: CallbackData
|
||||
initCallbackData(evt)
|
||||
defer:
|
||||
deinitCallbackData(evt)
|
||||
|
||||
discard addEventListener(
|
||||
ctx[].eventRegistry, NotRespondingEventName, captureCb, addr evt
|
||||
)
|
||||
|
||||
onNotResponding(ctx)
|
||||
|
||||
waitCallback(evt)
|
||||
check evt.retCode == RET_OK
|
||||
let decoded =
|
||||
cborDecode(callbackBytes(evt), EventEnvelope[NotRespondingEvent])
|
||||
check decoded.isOk()
|
||||
check decoded.value.eventType == NotRespondingEventName
|
||||
|
||||
test "onResponding delivers EventEnvelope[RespondingEvent] to subscribers":
|
||||
var pool: FFIContextPool[TestEvtLib]
|
||||
let ctx = pool.createFFIContext().valueOr:
|
||||
check false
|
||||
return
|
||||
defer:
|
||||
discard pool.destroyFFIContext(ctx)
|
||||
|
||||
var evt: CallbackData
|
||||
initCallbackData(evt)
|
||||
defer:
|
||||
deinitCallbackData(evt)
|
||||
|
||||
discard addEventListener(
|
||||
ctx[].eventRegistry, RespondingEventName, captureCb, addr evt
|
||||
)
|
||||
|
||||
onResponding(ctx)
|
||||
|
||||
waitCallback(evt)
|
||||
check evt.retCode == RET_OK
|
||||
let decoded = cborDecode(callbackBytes(evt), EventEnvelope[RespondingEvent])
|
||||
check decoded.isOk()
|
||||
check decoded.value.eventType == RespondingEventName
|
||||
|
||||
test "liveness events with no subscriber are a no-op":
|
||||
var pool: FFIContextPool[TestEvtLib]
|
||||
let ctx = pool.createFFIContext().valueOr:
|
||||
check false
|
||||
return
|
||||
defer:
|
||||
discard pool.destroyFFIContext(ctx)
|
||||
# No listener registered — must not crash, must not block.
|
||||
onNotResponding(ctx)
|
||||
onResponding(ctx)
|
||||
|
||||
suite "event thread drains queued events":
|
||||
## The event thread wakes every `EventThreadTickInterval` (or on
|
||||
## `eventQueueSignal`, not exported) and drains `eventQueue` into the
|
||||
## registered listeners. This test pushes a c_malloc'd payload onto the
|
||||
## queue from the test thread and waits for the tick-driven drain to
|
||||
## deliver it — exercises the `tryEnqueueEvent` → `drainEventQueue` →
|
||||
## `dispatchQueuedEvent` → listener path end-to-end.
|
||||
test "enqueued event is delivered to subscriber within a tick":
|
||||
var pool: FFIContextPool[TestEvtLib]
|
||||
let ctx = pool.createFFIContext().valueOr:
|
||||
check false
|
||||
return
|
||||
defer:
|
||||
discard pool.destroyFFIContext(ctx)
|
||||
|
||||
var evt: CallbackData
|
||||
initCallbackData(evt)
|
||||
defer:
|
||||
deinitCallbackData(evt)
|
||||
|
||||
const QueuedEvtName = "queued_evt"
|
||||
discard addEventListener(
|
||||
ctx[].eventRegistry, QueuedEvtName, captureCb, addr evt
|
||||
)
|
||||
|
||||
# `tryEnqueueEvent` takes ownership of both buffers on success; the
|
||||
# event thread c_frees them after dispatch returns.
|
||||
let nameBuf = alloc(QueuedEvtName)
|
||||
let payload = @[byte 0xDE, 0xAD, 0xBE, 0xEF]
|
||||
var shared = allocSharedSeq(payload)
|
||||
check tryEnqueueEvent(ctx[].eventQueue, nameBuf, shared.data, shared.len)
|
||||
|
||||
waitCallback(evt)
|
||||
check evt.retCode == RET_OK
|
||||
check callbackBytes(evt) == payload
|
||||
|
||||
@ -10,12 +10,10 @@ import std/locks
|
||||
import unittest2
|
||||
import ffi
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Tiny helpers — a thread-safe sink each listener writes into so we can
|
||||
# assert which callbacks would fire and in what order once dispatch lands.
|
||||
# Today only `tagCb`'s presence is exercised; the recorder is also used to
|
||||
# make sure listener bookkeeping doesn't accidentally invoke callbacks.
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
type Recorder = object
|
||||
lock: Lock
|
||||
@ -50,9 +48,6 @@ proc tagCb(
|
||||
copyMem(addr payload[0], msg, int(len))
|
||||
record(t[].rec[], t[].name, retCode, payload)
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Tests
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
suite "FFIEventRegistry mutation":
|
||||
test "addEventListener assigns monotonically increasing non-zero ids":
|
||||
|
||||
@ -324,9 +324,6 @@ suite "sendRequestToFFIThread":
|
||||
check d.retCode == RET_OK
|
||||
check cborDecode(callbackBytes(d), string).value == "pong:" & msg
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# ffiCtor / .ffi. macros — exercise the full CBOR transport
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
type SimpleLib = object
|
||||
value: int
|
||||
@ -375,9 +372,6 @@ suite "ffiCtor macro":
|
||||
|
||||
check SimpleLibFFIPool.destroyFFIContext(ctx).isOk()
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Simplified .ffi. macro integration test
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
type SendConfig {.ffi.} = object
|
||||
message: string
|
||||
@ -468,10 +462,8 @@ suite "sync-body .ffi. is dispatched on FFI thread":
|
||||
check d2.retCode == RET_OK
|
||||
check cborDecode(callbackBytes(d2), string).value == "v3"
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Nim-native API (no callbacks, no CBOR buffers): the original proc name
|
||||
# resolves to the user's declared async signature and is callable directly.
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
suite "Nim-native .ffi. / .ffiCtor. API":
|
||||
test "user proc names retain their declared Future[Result[T,string]] shape":
|
||||
@ -492,14 +484,12 @@ suite "Nim-native .ffi. / .ffiCtor. API":
|
||||
check ctorRes.isOk
|
||||
check ctorRes.value.value == 21
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Regression for PR #23 review items 1–5: a `.ffi.` body without `await`
|
||||
# used to be emitted as an inline-on-foreign-thread fast path, which bypassed
|
||||
# `foreignThreadGc`, `ctx.lock`, and chronos's single-thread invariant. The
|
||||
# sync fast-path was deleted; this test records `getThreadId()` inside a
|
||||
# sync body and asserts the handler runs on the FFI thread, not on the
|
||||
# caller's thread.
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
var gRecordedHandlerTid: Atomic[int]
|
||||
|
||||
@ -558,13 +548,11 @@ suite "sync-body .ffi. runs on FFI thread (PR #23 regression)":
|
||||
# And the callback payload (the recorded tid) matches what the handler stored.
|
||||
check cborDecode(callbackBytes(d), int).value == handlerTid
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Regression for PR #23 review item 6: reentrancy guard on
|
||||
# sendRequestToFFIThread. A handler running on the FFI thread that tries to
|
||||
# dispatch back through sendRequestToFFIThread used to self-deadlock waiting
|
||||
# on `reqReceivedSignal` (which only the FFI thread can fire). The guard now
|
||||
# returns an Err immediately.
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
var gReentrantNestedRes: Channel[string]
|
||||
gReentrantNestedRes.open()
|
||||
|
||||
@ -201,12 +201,10 @@ suite "CBOR error handling":
|
||||
let res = cborDecode(truncated, string)
|
||||
check res.isErr
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Regression for PR #23 review item 9: cborEncodeShared writes directly into
|
||||
# a c_malloc buffer, letting the FFI thread request take ownership without
|
||||
# an intermediate seq[byte] copy. The shared-encoder must produce
|
||||
# byte-for-byte the same output as the seq-encoder.
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
import system/ansi_c
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user