chore: implement optional improvements

This commit is contained in:
Gabriel Cruz 2026-06-05 14:26:33 -03:00
parent b8cab5d8b9
commit be832c5175
No known key found for this signature in database
GPG Key ID: 2E467754A6BA9BA5
3 changed files with 311 additions and 269 deletions

137
ffi/event_thread.nim Normal file
View File

@ -0,0 +1,137 @@
## Event-thread body and FFI-thread liveness monitoring.
##
## Included from `ffi_context.nim` — inherits its imports, FFIContext type,
## and the heartbeat-timing constants. Lives alongside `ffi_thread.nim`
## so each thread's machinery is readable on its own.
##
## Responsibilities:
## - Drain queued events into listener callbacks (queue producer lands in PR #69).
## - Watch `ctx.ffiHeartbeat` and emit `NotRespondingEvent` / `RespondingEvent`
## on FFI-thread stall and recovery transitions.
type
NotRespondingEvent* = object
RespondingEvent* = object
const
NotRespondingEventName* = "not_responding"
RespondingEventName* = "responding"
proc dispatchToListeners[T](
ctx: ptr FFIContext[T], eventName: string, data: pointer, dataLen: int
) =
## Holds reg.lock for the entire snapshot + invocation so concurrent
## add/remove on this registry blocks until dispatch returns.
withLock ctx[].eventRegistry.lock:
let listeners = ctx[].eventRegistry.byEvent.getOrDefault(eventName)
if listeners.len == 0:
chronicles.debug "no listener registered", event = eventName
return
foreignThreadGc:
try:
notifyListeners(listeners, RET_OK, data, dataLen)
except Exception, CatchableError:
notifyListenersErr(
listeners,
"Exception dispatching " & eventName & ": " & getCurrentExceptionMsg(),
)
proc emitLivenessEvent[T, P](ctx: ptr FFIContext[T], name: string, payload: P) =
## Encodes a zero-field liveness event (`NotRespondingEvent`,
## `RespondingEvent`) and dispatches it directly to listeners, bypassing
## the event queue (which may itself be wedged). Runs on the event thread.
let event =
try:
EventEnvelope[P](eventType: name, payload: payload).cborEncode()
except CatchableError as exc:
chronicles.error "liveness event encode failed", name = name, err = exc.msg
return
let dataPtr: pointer =
if event.len > 0: unsafeAddr event[0] else: nil
ctx.dispatchToListeners(name, dataPtr, event.len)
proc onNotResponding*(ctx: ptr FFIContext) =
emitLivenessEvent(ctx, NotRespondingEventName, NotRespondingEvent())
proc onResponding*(ctx: ptr FFIContext) =
## Fired once when the FFI thread's heartbeat starts advancing again
## after a `NotRespondingEvent`. Lets consumers clear any "library
## hung" UI state without polling.
emitLivenessEvent(ctx, RespondingEventName, RespondingEvent())
proc dispatchQueuedEvent[T](ctx: ptr FFIContext[T], qe: QueuedEvent) =
## Frees `qe`'s c_malloc buffers on exit.
defer:
if not qe.name.isNil():
c_free(cast[pointer](qe.name))
if not qe.data.isNil():
c_free(qe.data)
ctx.dispatchToListeners($qe.name, qe.data, qe.dataLen)
proc drainEventQueue[T](ctx: ptr FFIContext[T]) =
while true:
let opt = ctx.eventQueue.tryDequeueEvent()
if opt.isNone():
break
ctx.dispatchQueuedEvent(opt.get())
type HeartbeatMonitor = object
startedAt: Moment
lastChange: Moment
lastValue: int64
notifiedStale: bool
proc init(T: type HeartbeatMonitor, ctx: ptr FFIContext): T =
let now = Moment.now()
T(
startedAt: now,
lastChange: now,
lastValue: ctx.ffiHeartbeat.load(),
notifiedStale: false,
)
proc check[T](hb: var HeartbeatMonitor, ctx: ptr FFIContext[T]) =
## Fires `onNotResponding` once the FFI thread's heartbeat counter stops
## advancing past the stale threshold, and fires `onResponding` once it
## starts advancing again. Both transitions latch so each is emitted at
## most once per stall episode.
if Moment.now() - hb.startedAt <= FFIHeartbeatStartDelay:
return
let cur = ctx.ffiHeartbeat.load()
if cur != hb.lastValue:
if hb.notifiedStale:
onResponding(ctx)
hb.lastValue = cur
hb.lastChange = Moment.now()
hb.notifiedStale = false
elif not hb.notifiedStale and
Moment.now() - hb.lastChange > FFIHeartbeatStaleThreshold:
onNotResponding(ctx)
hb.notifiedStale = true
proc eventRun[T](ctx: ptr FFIContext[T]) {.async.} =
var hb = HeartbeatMonitor.init(ctx)
while ctx.running.load():
# Wake on enqueue or tick — whichever first. The enqueue path lands in PR #69;
# until then the wait always times out and we fall through to the heartbeat check.
discard await ctx.eventQueueSignal.wait().withTimeout(EventThreadTickInterval)
ctx.drainEventQueue()
if not ctx.running.load():
break
hb.check(ctx)
proc eventThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} =
## Drains the event queue and runs the FFI-thread heartbeat check.
## Owns the queued `c_malloc` payloads until dispatch returns.
defer:
let fireRes = ctx.eventThreadExitSignal.fireSync()
if fireRes.isErr():
error "failed to fire eventThreadExitSignal", err = fireRes.error
try:
waitFor eventRun(ctx)
except CatchableError as exc:
error "event thread exited with exception", error = exc.msg

View File

@ -1,3 +1,9 @@
## FFIContext type plus lifecycle (init / signal-stop / join / destroy).
##
## The per-thread bodies live in `ffi_thread.nim` and `event_thread.nim`,
## included below so the thread code can access the private FFIContext
## fields without forcing them through a public surface.
{.passc: "-fPIC".}
import system/ansi_c
@ -49,275 +55,8 @@ const
FFIHeartbeatStartDelay* = 10.seconds # grace window for library startup
FFIHeartbeatStaleThreshold* = 1.seconds
type NotRespondingEvent* = object
const NotRespondingEventName* = "not_responding"
proc encodeNotRespondingEvent(): seq[byte] =
EventEnvelope[NotRespondingEvent](
eventType: NotRespondingEventName, payload: NotRespondingEvent()
).cborEncode()
proc dispatchToListeners[T](
ctx: ptr FFIContext[T], eventName: string, data: pointer, dataLen: int
) =
## Holds reg.lock for the entire snapshot + invocation so concurrent
## add/remove on this registry blocks until dispatch returns.
withLock ctx[].eventRegistry.lock:
let listeners = ctx[].eventRegistry.byEvent.getOrDefault(eventName)
if listeners.len == 0:
chronicles.debug "no listener registered", event = eventName
return
foreignThreadGc:
try:
notifyListeners(listeners, RET_OK, data, dataLen)
except Exception, CatchableError:
notifyListenersErr(
listeners,
"Exception dispatching " & eventName & ": " & getCurrentExceptionMsg(),
)
proc onNotResponding*(ctx: ptr FFIContext) =
## Bypasses the event queue (which may itself be wedged) and dispatches
## directly to listeners. Runs on the event thread.
let event =
try:
encodeNotRespondingEvent()
except CatchableError as exc:
chronicles.error "onNotResponding - encode failed", err = exc.msg
return
let dataPtr: pointer =
if event.len > 0: unsafeAddr event[0] else: nil
ctx.dispatchToListeners(NotRespondingEventName, dataPtr, event.len)
proc sendRequestToFFIThread*(
ctx: ptr FFIContext, ffiRequest: ptr FFIThreadRequest, timeout = InfiniteDuration
): Result[void, string] =
# Reentrancy guard: only this thread can fire `reqReceivedSignal`, so a handler dispatching back would self-deadlock.
if onFFIThread:
deleteRequest(ffiRequest)
return err(
"reentrant ffi call: a handler invoked sendRequestToFFIThread on its own context"
)
# All async submissions serialise on `ctx.lock` for the full
# trySend + fireSync + waitSync sequence because `reqChannel` is
# single-producer and `reqReceivedSignal` is shared across callers.
# Multi-producer redesign is tracked as PR #23 review item 7.
ctx.lock.acquire()
defer:
ctx.lock.release()
## Sending the request
let sentOk = ctx.reqChannel.trySend(ffiRequest)
if not sentOk:
deleteRequest(ffiRequest)
return err("Couldn't send a request to the ffi thread")
let fireSyncRes = ctx.reqSignal.fireSync()
if fireSyncRes.isErr():
deleteRequest(ffiRequest)
return err("failed fireSync: " & $fireSyncRes.error)
if fireSyncRes.get() == false:
deleteRequest(ffiRequest)
return err("Couldn't fireSync in time")
## wait until the FFI working thread properly received the request
let res = ctx.reqReceivedSignal.waitSync(timeout)
if res.isErr():
## Do not free ffiRequest here: the FFI thread was already signaled and
## will process (and free) it.
return err("Couldn't receive reqReceivedSignal signal")
## Notice that in case of "ok", the deallocShared(req) is performed by the FFI Thread in the
## process proc.
ok()
proc processRequest[T](
request: ptr FFIThreadRequest, ctx: ptr FFIContext[T]
) {.async.} =
## Invoked within the FFI thread to process a request coming from the FFI API consumer thread.
let reqId = $request[].reqId
## The reqId determines which proc will handle the request.
## The registeredRequests represents a table defined at compile time.
## Then, registeredRequests == Table[reqId, proc-handling-the-request-asynchronously]
## Explicit conversion keeps `reqId` alive as the backing string,
## avoiding the implicit string→cstring warning that will become an error.
let reqIdCs = reqId.cstring
let retFut =
if not ctx[].registeredRequests[].contains(reqIdCs):
## That shouldn't happen because only registered requests should be sent to the FFI thread.
nilProcess(request[].reqId)
else:
ctx[].registeredRequests[][reqIdCs](cast[pointer](request), ctx)
## Catch every catchable exception (including CancelledError raised by
## the shutdown drain in ffiRun) so handleRes — and its `deleteRequest`
## defer — always runs. Otherwise an abandoned in-flight handler would
## leak its request envelope, reqId copy, and CBOR payload.
let res =
try:
await retFut
except CatchableError as exc:
Result[seq[byte], string].err(
"Error in processRequest for " & reqId & ": " & exc.msg
)
## handleRes may raise (OOM, GC setup) even though it is rare. Catching here
## keeps the async proc raises:[] compatible. The defer inside handleRes
## guarantees request is freed before the exception propagates.
try:
handleRes(res, request)
except Exception as exc:
error "Unexpected exception in handleRes", error = exc.msg
proc ffiThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} =
## FFI thread body that attends library user API requests
ffiCurrentEventRegistry = addr ctx[].eventRegistry
onFFIThread = true
logging.setupLog(logging.LogLevel.DEBUG, logging.LogFormat.TEXT)
defer:
onFFIThread = false
# Unblocks destroyFFIContext's bounded wait so cleanup can proceed.
let fireRes = ctx.threadExitSignal.fireSync()
if fireRes.isErr():
error "failed to fire threadExitSignal on FFI thread exit", err = fireRes.error
let ffiRun = proc(ctx: ptr FFIContext[T]) {.async.} =
var ffiReqHandler: T
## Holds the main library object, i.e., in charge of handling the ffi requests.
## e.g., Waku, LibP2P, SDS, etc.
## In-flight processRequest futures. Tracked so they can be drained on
## shutdown — otherwise destroying the context while a handler is
## awaiting (e.g. sleepAsync) abandons the future and leaks the
## request's envelope/reqId/payload allocations.
var pending: seq[Future[void]] = @[]
proc reapCompleted() =
var i = 0
while i < pending.len:
if pending[i].finished():
pending.del(i)
else:
inc i
while ctx.running.load():
# Freezes if a sync handler blocks the dispatcher; event thread reads to detect wedged FFI thread.
discard ctx.ffiHeartbeat.fetchAdd(1)
reapCompleted()
let gotSignal = await ctx.reqSignal.wait().withTimeout(100.milliseconds)
if not gotSignal:
continue
## Wait for a request from the ffi consumer thread
var request: ptr FFIThreadRequest
if not ctx.reqChannel.tryRecv(request):
continue
if ctx.myLib.isNil():
ctx.myLib = addr ffiReqHandler
## Handle the request
pending.add processRequest(request, ctx)
let fireRes = ctx.reqReceivedSignal.fireSync()
if fireRes.isErr():
error "could not fireSync back to requester thread", error = fireRes.error
## Drain in-flight handlers so each request's `deleteRequest` runs
## before we exit. Without this, abandoning a future mid-await would
## leak the request allocations (visible to LSan; previously hidden
## because Nim's pool allocator kept the chunks alive in the process).
reapCompleted()
if pending.len > 0:
try:
await allFutures(pending)
except CatchableError as exc:
error "draining pending FFI requests on shutdown raised", error = exc.msg
waitFor ffiRun(ctx)
proc dispatchQueuedEvent[T](ctx: ptr FFIContext[T], qe: QueuedEvent) =
## Frees `qe`'s c_malloc buffers on exit.
defer:
if not qe.name.isNil():
c_free(cast[pointer](qe.name))
if not qe.data.isNil():
c_free(qe.data)
ctx.dispatchToListeners($qe.name, qe.data, qe.dataLen)
proc drainEventQueue[T](ctx: ptr FFIContext[T]) =
while true:
let opt = ctx.eventQueue.tryDequeueEvent()
if opt.isNone():
break
ctx.dispatchQueuedEvent(opt.get())
type HeartbeatMonitor = object
startedAt: Moment
lastChange: Moment
lastValue: int64
notifiedStale: bool
proc init(T: type HeartbeatMonitor, ctx: ptr FFIContext): T =
let now = Moment.now()
T(
startedAt: now,
lastChange: now,
lastValue: ctx.ffiHeartbeat.load(),
notifiedStale: false,
)
proc check[T](hb: var HeartbeatMonitor, ctx: ptr FFIContext[T]) =
## Fires onNotResponding once the FFI thread's heartbeat counter stops
## advancing past the stale threshold. Latches until it moves again.
if Moment.now() - hb.startedAt <= FFIHeartbeatStartDelay:
return
let cur = ctx.ffiHeartbeat.load()
if cur != hb.lastValue:
hb.lastValue = cur
hb.lastChange = Moment.now()
hb.notifiedStale = false
elif not hb.notifiedStale and
Moment.now() - hb.lastChange > FFIHeartbeatStaleThreshold:
onNotResponding(ctx)
hb.notifiedStale = true
proc eventRun[T](ctx: ptr FFIContext[T]) {.async.} =
var hb = HeartbeatMonitor.init(ctx)
while ctx.running.load():
# Wake on enqueue or tick — whichever first. The enqueue path lands in PR #69;
# until then the wait always times out and we fall through to the heartbeat check.
discard await ctx.eventQueueSignal.wait().withTimeout(EventThreadTickInterval)
ctx.drainEventQueue()
if not ctx.running.load():
break
hb.check(ctx)
proc eventThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} =
## Drains the event queue and runs the FFI-thread heartbeat check.
## Owns the queued `c_malloc` payloads until dispatch returns.
defer:
let fireRes = ctx.eventThreadExitSignal.fireSync()
if fireRes.isErr():
error "failed to fire eventThreadExitSignal", err = fireRes.error
try:
waitFor eventRun(ctx)
except CatchableError as exc:
error "event thread exited with exception", error = exc.msg
include ./event_thread
include ./ffi_thread
proc deinitContextResources*[T](ctx: ptr FFIContext[T]): Result[void, string] =
## Mirror of `initContextResources`: tears down lock, registry, queue,

166
ffi/ffi_thread.nim Normal file
View File

@ -0,0 +1,166 @@
## FFI-thread body and request submission API.
##
## Included from `ffi_context.nim` — inherits its imports, FFIContext type,
## and the `onFFIThread` threadvar. Companion to `event_thread.nim`.
##
## Responsibilities:
## - Receive `FFIThreadRequest`s from foreign threads via `reqChannel` and
## dispatch them through the user-registered handler table.
## - Advance `ctx.ffiHeartbeat` each loop iteration so the event thread can
## detect a wedged FFI thread.
proc sendRequestToFFIThread*(
ctx: ptr FFIContext, ffiRequest: ptr FFIThreadRequest, timeout = InfiniteDuration
): Result[void, string] =
# Reentrancy guard: only this thread can fire `reqReceivedSignal`, so a handler dispatching back would self-deadlock.
if onFFIThread:
deleteRequest(ffiRequest)
return err(
"reentrant ffi call: a handler invoked sendRequestToFFIThread on its own context"
)
# All async submissions serialise on `ctx.lock` for the full
# trySend + fireSync + waitSync sequence because `reqChannel` is
# single-producer and `reqReceivedSignal` is shared across callers.
# Multi-producer redesign is tracked as PR #23 review item 7.
ctx.lock.acquire()
defer:
ctx.lock.release()
## Sending the request
let sentOk = ctx.reqChannel.trySend(ffiRequest)
if not sentOk:
deleteRequest(ffiRequest)
return err("Couldn't send a request to the ffi thread")
let fireSyncRes = ctx.reqSignal.fireSync()
if fireSyncRes.isErr():
deleteRequest(ffiRequest)
return err("failed fireSync: " & $fireSyncRes.error)
if fireSyncRes.get() == false:
deleteRequest(ffiRequest)
return err("Couldn't fireSync in time")
## wait until the FFI working thread properly received the request
let res = ctx.reqReceivedSignal.waitSync(timeout)
if res.isErr():
## Do not free ffiRequest here: the FFI thread was already signaled and
## will process (and free) it.
return err("Couldn't receive reqReceivedSignal signal")
## Notice that in case of "ok", the deallocShared(req) is performed by the FFI Thread in the
## process proc.
ok()
proc processRequest[T](
request: ptr FFIThreadRequest, ctx: ptr FFIContext[T]
) {.async.} =
## Invoked within the FFI thread to process a request coming from the FFI API consumer thread.
let reqId = $request[].reqId
## The reqId determines which proc will handle the request.
## The registeredRequests represents a table defined at compile time.
## Then, registeredRequests == Table[reqId, proc-handling-the-request-asynchronously]
## Explicit conversion keeps `reqId` alive as the backing string,
## avoiding the implicit string→cstring warning that will become an error.
let reqIdCs = reqId.cstring
let retFut =
if not ctx[].registeredRequests[].contains(reqIdCs):
## That shouldn't happen because only registered requests should be sent to the FFI thread.
nilProcess(request[].reqId)
else:
ctx[].registeredRequests[][reqIdCs](cast[pointer](request), ctx)
## Catch every catchable exception (including CancelledError raised by
## the shutdown drain in ffiRun) so handleRes — and its `deleteRequest`
## defer — always runs. Otherwise an abandoned in-flight handler would
## leak its request envelope, reqId copy, and CBOR payload.
let res =
try:
await retFut
except CatchableError as exc:
Result[seq[byte], string].err(
"Error in processRequest for " & reqId & ": " & exc.msg
)
## handleRes may raise (OOM, GC setup) even though it is rare. Catching here
## keeps the async proc raises:[] compatible. The defer inside handleRes
## guarantees request is freed before the exception propagates.
try:
handleRes(res, request)
except Exception as exc:
error "Unexpected exception in handleRes", error = exc.msg
proc ffiThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} =
## FFI thread body that attends library user API requests
ffiCurrentEventRegistry = addr ctx[].eventRegistry
onFFIThread = true
logging.setupLog(logging.LogLevel.DEBUG, logging.LogFormat.TEXT)
defer:
onFFIThread = false
# Unblocks destroyFFIContext's bounded wait so cleanup can proceed.
let fireRes = ctx.threadExitSignal.fireSync()
if fireRes.isErr():
error "failed to fire threadExitSignal on FFI thread exit", err = fireRes.error
let ffiRun = proc(ctx: ptr FFIContext[T]) {.async.} =
var ffiReqHandler: T
## Holds the main library object, i.e., in charge of handling the ffi requests.
## e.g., Waku, LibP2P, SDS, etc.
## In-flight processRequest futures. Tracked so they can be drained on
## shutdown — otherwise destroying the context while a handler is
## awaiting (e.g. sleepAsync) abandons the future and leaks the
## request's envelope/reqId/payload allocations.
var pending: seq[Future[void]] = @[]
proc reapCompleted() =
var i = 0
while i < pending.len:
if pending[i].finished():
pending.del(i)
else:
inc i
while ctx.running.load():
# Freezes if a sync handler blocks the dispatcher; event thread reads to detect wedged FFI thread.
discard ctx.ffiHeartbeat.fetchAdd(1)
reapCompleted()
let gotSignal = await ctx.reqSignal.wait().withTimeout(100.milliseconds)
if not gotSignal:
continue
## Wait for a request from the ffi consumer thread
var request: ptr FFIThreadRequest
if not ctx.reqChannel.tryRecv(request):
continue
if ctx.myLib.isNil():
ctx.myLib = addr ffiReqHandler
## Handle the request
pending.add processRequest(request, ctx)
let fireRes = ctx.reqReceivedSignal.fireSync()
if fireRes.isErr():
error "could not fireSync back to requester thread", error = fireRes.error
## Drain in-flight handlers so each request's `deleteRequest` runs
## before we exit. Without this, abandoning a future mid-await would
## leak the request allocations (visible to LSan; previously hidden
## because Nim's pool allocator kept the chunks alive in the process).
reapCompleted()
if pending.len > 0:
try:
await allFutures(pending)
except CatchableError as exc:
error "draining pending FFI requests on shutdown raised", error = exc.msg
waitFor ffiRun(ctx)