mirror of
https://github.com/logos-messaging/nim-ffi.git
synced 2026-06-21 00:40:16 +00:00
Merge branch 'chore/ffi-context-lifecycle' into chore/event-separate-thread
This commit is contained in:
commit
2a73dae218
139
ffi/event_thread.nim
Normal file
139
ffi/event_thread.nim
Normal file
@ -0,0 +1,139 @@
|
||||
## Event-thread body and FFI-thread liveness monitoring.
|
||||
##
|
||||
## Included from `ffi_context.nim` — inherits its imports, FFIContext type,
|
||||
## and the heartbeat-timing constants. Lives alongside `ffi_thread.nim`
|
||||
## so each thread's machinery is readable on its own.
|
||||
##
|
||||
## Responsibilities:
|
||||
## - Drain queued events into listener callbacks.
|
||||
## - Watch `ctx.ffiHeartbeat` and emit `NotRespondingEvent` / `RespondingEvent`
|
||||
## on FFI-thread stall and recovery transitions.
|
||||
|
||||
type
  NotRespondingEvent* = object ## Field-less payload sent under `NotRespondingEventName`.
  RespondingEvent* = object ## Field-less payload sent under `RespondingEventName`.
|
||||
|
||||
const
  NotRespondingEventName* = "not_responding" ## Event name used by `onNotResponding`.
  RespondingEventName* = "responding" ## Event name used by `onResponding`.
|
||||
|
||||
proc dispatchToListeners[T](
    ctx: ptr FFIContext[T], eventName: string, data: pointer, dataLen: int
) =
  ## Delivers the `data`/`dataLen` buffer to every listener registered under
  ## `eventName` in `ctx`'s event registry.
  ##
  ## The registry lock is held for the lookup *and* the callback invocations,
  ## so concurrent add/remove on this registry blocks until dispatch returns.
  ## When nothing is registered for the event, only a debug line is logged.
  ## If the notification itself raises, the same listeners are told about it
  ## through `notifyListenersErr`.
  withLock ctx[].eventRegistry.lock:
    let subscribers = ctx[].eventRegistry.byEvent.getOrDefault(eventName)
    if subscribers.len > 0:
      foreignThreadGc:
        try:
          notifyListeners(subscribers, RET_OK, data, dataLen)
        except Exception as exc:
          # Deliberately broad: nothing may escape across the listener fan-out.
          notifyListenersErr(
            subscribers, "Exception dispatching " & eventName & ": " & exc.msg
          )
    else:
      chronicles.debug "no listener registered", event = eventName
|
||||
|
||||
proc emitLivenessEvent[T, P](ctx: ptr FFIContext[T], name: string, payload: P) =
  ## Wraps `payload` in an `EventEnvelope` tagged with `name`, CBOR-encodes it
  ## and hands the bytes straight to `dispatchToListeners`.
  ##
  ## The event queue is bypassed on purpose (it may itself be wedged). Used
  ## for the zero-field liveness events (`NotRespondingEvent`,
  ## `RespondingEvent`); runs on the event thread. An encoding failure is
  ## logged and nothing is dispatched.
  let encoded =
    try:
      EventEnvelope[P](eventType: name, payload: payload).cborEncode()
    except CatchableError as exc:
      chronicles.error "liveness event encode failed", name = name, err = exc.msg
      return

  # `encoded` stays alive for the whole (synchronous) dispatch below, so the
  # raw pointer into it remains valid. An empty buffer is passed as nil.
  let payloadPtr: pointer =
    if encoded.len == 0:
      nil
    else:
      unsafeAddr encoded[0]
  dispatchToListeners(ctx, name, payloadPtr, encoded.len)
|
||||
|
||||
proc onNotResponding*(ctx: ptr FFIContext) =
  ## Emits an empty `NotRespondingEvent` under `NotRespondingEventName`
  ## directly to the registered listeners.
  ctx.emitLivenessEvent(NotRespondingEventName, NotRespondingEvent())
|
||||
|
||||
proc onResponding*(ctx: ptr FFIContext) =
  ## Emits an empty `RespondingEvent` under `RespondingEventName`.
  ##
  ## Fired once when the FFI thread's heartbeat starts advancing again after
  ## a `NotRespondingEvent`, so consumers can clear any "library hung" UI
  ## state without polling.
  ctx.emitLivenessEvent(RespondingEventName, RespondingEvent())
|
||||
|
||||
proc dispatchQueuedEvent[T](ctx: ptr FFIContext[T], qe: QueuedEvent) =
  ## Dispatches one dequeued event to its listeners and then releases the
  ## event's name/data buffers via `freeEventBuffers`, whether or not the
  ## dispatch raised.
  try:
    dispatchToListeners(ctx, $qe.name, qe.data, qe.dataLen)
  finally:
    freeEventBuffers(qe.name, qe.data)
|
||||
|
||||
proc drainEventQueue[T](ctx: ptr FFIContext[T]) =
  ## Pops events from `ctx.eventQueue` and dispatches each one, in dequeue
  ## order, until `tryDequeueEvent` reports the queue empty.
  var next = ctx.eventQueue.tryDequeueEvent()
  while next.isSome():
    ctx.dispatchQueuedEvent(next.get())
    next = ctx.eventQueue.tryDequeueEvent()
|
||||
|
||||
type HeartbeatMonitor = object
  ## Bookkeeping used by `check` to turn the raw `ctx.ffiHeartbeat` counter
  ## into stall / recovery transitions.
  startedAt: Moment ## When the monitor was created; anchors the start-delay window.
  lastChange: Moment ## Last time the counter was observed with a new value.
  lastValue: int64 ## Counter value seen at `lastChange`.
  notifiedStale: bool ## Latch: a stall was reported and has not cleared yet.
|
||||
|
||||
proc init(T: type HeartbeatMonitor, ctx: ptr FFIContext): T =
  ## Builds a monitor whose start and last-change timestamps are both "now",
  ## seeded with the current heartbeat counter and with the stale latch clear.
  let createdAt = Moment.now()
  result.startedAt = createdAt
  result.lastChange = createdAt
  result.lastValue = ctx.ffiHeartbeat.load()
  result.notifiedStale = false
|
||||
|
||||
proc check[T](hb: var HeartbeatMonitor, ctx: ptr FFIContext[T]) =
  ## Compares the FFI thread's heartbeat counter with the last observation.
  ##
  ## - Does nothing until `FFIHeartbeatStartDelay` has elapsed since `init`.
  ## - Counter moved: records the new value/time and, if a stall had been
  ##   reported, fires `onResponding` first.
  ## - Counter unchanged for longer than `FFIHeartbeatStaleThreshold`: fires
  ##   `onNotResponding`.
  ##
  ## Both transitions latch on `notifiedStale`, so each is emitted at most
  ## once per stall episode.
  if Moment.now() - hb.startedAt <= FFIHeartbeatStartDelay:
    return

  let observed = ctx.ffiHeartbeat.load()
  if observed != hb.lastValue:
    # Heartbeat advanced: announce recovery (if needed) before resetting.
    if hb.notifiedStale:
      onResponding(ctx)
    hb.lastValue = observed
    hb.lastChange = Moment.now()
    hb.notifiedStale = false
    return

  # Counter is unchanged from here on.
  if hb.notifiedStale:
    return
  if Moment.now() - hb.lastChange <= FFIHeartbeatStaleThreshold:
    return

  onNotResponding(ctx)
  hb.notifiedStale = true
|
||||
|
||||
proc eventRun[T](ctx: ptr FFIContext[T]) {.async.} =
  ## Event-thread main loop; runs while `ctx.running` is set.
  ##
  ## Each iteration waits for `eventQueueSignal` or `EventThreadTickInterval`
  ## (whichever comes first), drains the event queue, reports a stuck queue
  ## once, and then runs the heartbeat check.
  var monitor = HeartbeatMonitor.init(ctx)
  var stuckReported = false

  while ctx.running.load():
    # Wake on enqueue or tick — whichever first. The result is irrelevant:
    # the queue is drained either way.
    discard await ctx.eventQueueSignal.wait().withTimeout(EventThreadTickInterval)

    ctx.drainEventQueue()

    # Reported from this thread, after the drain, when the registry lock is
    # free again; per the original note, emitting this from the FFI thread
    # would deadlock. Latched for the lifetime of the loop.
    if not stuckReported:
      if ctx.eventQueueStuck.load():
        onNotResponding(ctx)
        stuckReported = true

    # Skip the heartbeat check when shutdown was requested meanwhile.
    if ctx.running.load():
      monitor.check(ctx)
    else:
      break
|
||||
|
||||
proc eventThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} =
  ## Thread entry point: drives `eventRun` (queue draining plus the
  ## FFI-thread heartbeat check) to completion.
  ##
  ## A `CatchableError` escaping the loop is logged rather than propagated.
  ## `eventThreadExitSignal` is fired on every exit path.
  try:
    waitFor eventRun(ctx)
  except CatchableError as exc:
    error "event thread exited with exception", error = exc.msg
  finally:
    let exitFired = ctx.eventThreadExitSignal.fireSync()
    if exitFired.isErr():
      error "failed to fire eventThreadExitSignal", err = exitFired.error
|
||||
@ -1,3 +1,9 @@
|
||||
## FFIContext type plus lifecycle (init / signal-stop / join / destroy).
|
||||
##
|
||||
## The per-thread bodies live in `ffi_thread.nim` and `event_thread.nim`,
|
||||
## included below so the thread code can access the private FFIContext
|
||||
## fields without forcing them through a public surface.
|
||||
|
||||
{.passc: "-fPIC".}
|
||||
|
||||
import std/[atomics, locks, options, tables]
|
||||
@ -36,267 +42,8 @@ const
|
||||
FFIHeartbeatStartDelay* = 10.seconds # grace window for library startup
|
||||
FFIHeartbeatStaleThreshold* = 1.seconds
|
||||
|
||||
type NotRespondingEvent* = object
|
||||
|
||||
const NotRespondingEventName* = "not_responding"
|
||||
|
||||
proc encodeNotRespondingEvent(): seq[byte] =
|
||||
EventEnvelope[NotRespondingEvent](
|
||||
eventType: NotRespondingEventName, payload: NotRespondingEvent()
|
||||
).cborEncode()
|
||||
|
||||
proc dispatchToListeners[T](
|
||||
ctx: ptr FFIContext[T], eventName: string, data: pointer, dataLen: int
|
||||
) =
|
||||
## Lock held across the whole fan-out so foreign add/remove blocks
|
||||
## until dispatch returns (UAF-close contract from PR #39).
|
||||
withLock ctx[].eventRegistry.lock:
|
||||
let listeners = ctx[].eventRegistry.byEvent.getOrDefault(eventName)
|
||||
if listeners.len == 0:
|
||||
chronicles.debug "no listener registered", event = eventName
|
||||
return
|
||||
foreignThreadGc:
|
||||
try:
|
||||
notifyListeners(listeners, RET_OK, data, dataLen)
|
||||
except Exception, CatchableError:
|
||||
notifyListenersErr(
|
||||
listeners,
|
||||
"Exception dispatching " & eventName & ": " & getCurrentExceptionMsg(),
|
||||
)
|
||||
|
||||
proc onNotResponding*(ctx: ptr FFIContext) =
|
||||
## Bypasses the (possibly wedged) event queue; runs on the event thread.
|
||||
let event =
|
||||
try:
|
||||
encodeNotRespondingEvent()
|
||||
except CatchableError as e:
|
||||
chronicles.error "onNotResponding - encode failed", err = e.msg
|
||||
return
|
||||
let dataPtr: pointer =
|
||||
if event.len > 0: unsafeAddr event[0] else: nil
|
||||
ctx.dispatchToListeners(NotRespondingEventName, dataPtr, event.len)
|
||||
|
||||
proc sendRequestToFFIThread*(
|
||||
ctx: ptr FFIContext, ffiRequest: ptr FFIThreadRequest, timeout = InfiniteDuration
|
||||
): Result[void, string] =
|
||||
if ctx.eventQueueStuck.load():
|
||||
deleteRequest(ffiRequest)
|
||||
return err("event queue stuck - library cannot accept new requests")
|
||||
|
||||
if onFFIThread:
|
||||
# Re-entrant dispatch from a handler would self-deadlock on `reqReceivedSignal`.
|
||||
deleteRequest(ffiRequest)
|
||||
return err(
|
||||
"reentrant ffi call: a handler invoked sendRequestToFFIThread on its own context"
|
||||
)
|
||||
|
||||
# `reqChannel` is single-producer and `reqReceivedSignal` shared; serialise
|
||||
# the full trySend + fireSync + waitSync. PR #23 review item 7 tracks SP→MP.
|
||||
ctx.lock.acquire()
|
||||
defer:
|
||||
ctx.lock.release()
|
||||
|
||||
let sentOk = ctx.reqChannel.trySend(ffiRequest)
|
||||
if not sentOk:
|
||||
deleteRequest(ffiRequest)
|
||||
return err("Couldn't send a request to the ffi thread")
|
||||
|
||||
let fireSyncRes = ctx.reqSignal.fireSync()
|
||||
if fireSyncRes.isErr():
|
||||
deleteRequest(ffiRequest)
|
||||
return err("failed fireSync: " & $fireSyncRes.error)
|
||||
|
||||
if fireSyncRes.get() == false:
|
||||
deleteRequest(ffiRequest)
|
||||
return err("Couldn't fireSync in time")
|
||||
|
||||
let res = ctx.reqReceivedSignal.waitSync(timeout)
|
||||
if res.isErr():
|
||||
# FFI thread was already signaled; it owns ffiRequest now.
|
||||
return err("Couldn't receive reqReceivedSignal signal")
|
||||
|
||||
ok()
|
||||
|
||||
proc processRequest[T](
|
||||
request: ptr FFIThreadRequest, ctx: ptr FFIContext[T]
|
||||
) {.async.} =
|
||||
let reqId = $request[].reqId
|
||||
# Keep `reqId` alive as backing for the cstring view.
|
||||
let reqIdCs = reqId.cstring
|
||||
|
||||
let retFut =
|
||||
if not ctx[].registeredRequests[].contains(reqIdCs):
|
||||
nilProcess(request[].reqId)
|
||||
else:
|
||||
ctx[].registeredRequests[][reqIdCs](cast[pointer](request), ctx)
|
||||
|
||||
# Catch all (incl. CancelledError from the shutdown drain) so handleRes —
|
||||
# and its `deleteRequest` defer — always runs.
|
||||
let res =
|
||||
try:
|
||||
await retFut
|
||||
except CatchableError as e:
|
||||
Result[seq[byte], string].err(
|
||||
"Error in processRequest for " & reqId & ": " & e.msg
|
||||
)
|
||||
|
||||
# handleRes may raise (rare: OOM, GC setup); keep `raises: []`.
|
||||
try:
|
||||
handleRes(res, request)
|
||||
except Exception as e:
|
||||
error "Unexpected exception in handleRes", error = e.msg
|
||||
|
||||
var ffiEventQueueSignalPtr {.threadvar.}: ThreadSignalPtr
|
||||
# Stashed so the hook has no closure env.
|
||||
|
||||
proc ffiNotifyEventEnqueuedHook() {.gcsafe, raises: [].} =
|
||||
if not ffiEventQueueSignalPtr.isNil():
|
||||
let res = ffiEventQueueSignalPtr.fireSync()
|
||||
if res.isErr():
|
||||
error "failed to fire eventQueueSignal after enqueue", err = res.error
|
||||
|
||||
proc ffiThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} =
|
||||
ffiCurrentEventRegistry = addr ctx[].eventRegistry
|
||||
ffiCurrentEventQueue = addr ctx[].eventQueue
|
||||
ffiCurrentEventQueueStuck = addr ctx[].eventQueueStuck
|
||||
ffiEventQueueSignalPtr = ctx.eventQueueSignal
|
||||
ffiCurrentNotifyEventEnqueued = ffiNotifyEventEnqueuedHook
|
||||
onFFIThread = true
|
||||
|
||||
logging.setupLog(logging.LogLevel.DEBUG, logging.LogFormat.TEXT)
|
||||
|
||||
defer:
|
||||
onFFIThread = false
|
||||
# Unblocks destroyFFIContext's bounded wait.
|
||||
let fireRes = ctx.threadExitSignal.fireSync()
|
||||
if fireRes.isErr():
|
||||
error "failed to fire threadExitSignal on FFI thread exit", err = fireRes.error
|
||||
|
||||
let ffiRun = proc(ctx: ptr FFIContext[T]) {.async.} =
|
||||
var ffiReqHandler: T
|
||||
|
||||
# Track in-flight handlers so shutdown can drain them — otherwise
|
||||
# abandoned futures leak request envelope/reqId/payload.
|
||||
var pending: seq[Future[void]] = @[]
|
||||
|
||||
proc reapCompleted() =
|
||||
var i = 0
|
||||
while i < pending.len:
|
||||
if pending[i].finished():
|
||||
pending.del(i)
|
||||
else:
|
||||
inc i
|
||||
|
||||
while ctx.running.load():
|
||||
# Freezes if a sync handler blocks; event thread reads for liveness.
|
||||
discard ctx.ffiHeartbeat.fetchAdd(1)
|
||||
|
||||
reapCompleted()
|
||||
|
||||
let gotSignal = await ctx.reqSignal.wait().withTimeout(100.milliseconds)
|
||||
if not gotSignal:
|
||||
continue
|
||||
|
||||
var request: ptr FFIThreadRequest
|
||||
if not ctx.reqChannel.tryRecv(request):
|
||||
continue
|
||||
|
||||
if ctx.myLib.isNil():
|
||||
ctx.myLib = addr ffiReqHandler
|
||||
|
||||
pending.add processRequest(request, ctx)
|
||||
|
||||
let fireRes = ctx.reqReceivedSignal.fireSync()
|
||||
if fireRes.isErr():
|
||||
error "could not fireSync back to requester thread", error = fireRes.error
|
||||
|
||||
# Drain in-flight handlers so each request's `deleteRequest` runs.
|
||||
reapCompleted()
|
||||
if pending.len > 0:
|
||||
try:
|
||||
await allFutures(pending)
|
||||
except CatchableError as e:
|
||||
error "draining pending FFI requests on shutdown raised", error = e.msg
|
||||
|
||||
waitFor ffiRun(ctx)
|
||||
|
||||
proc dispatchQueuedEvent[T](ctx: ptr FFIContext[T], qe: QueuedEvent) =
|
||||
defer:
|
||||
freeEventBuffers(qe.name, qe.data)
|
||||
ctx.dispatchToListeners($qe.name, qe.data, qe.dataLen)
|
||||
|
||||
proc drainEventQueue[T](ctx: ptr FFIContext[T]) =
|
||||
while true:
|
||||
let opt = ctx.eventQueue.tryDequeueEvent()
|
||||
if opt.isNone():
|
||||
break
|
||||
ctx.dispatchQueuedEvent(opt.get())
|
||||
|
||||
type HeartbeatMonitor = object
|
||||
startedAt: Moment
|
||||
lastChange: Moment
|
||||
lastValue: int64
|
||||
notifiedStale: bool
|
||||
|
||||
proc init(T: type HeartbeatMonitor, ctx: ptr FFIContext): T =
|
||||
let now = Moment.now()
|
||||
T(
|
||||
startedAt: now,
|
||||
lastChange: now,
|
||||
lastValue: ctx.ffiHeartbeat.load(),
|
||||
notifiedStale: false,
|
||||
)
|
||||
|
||||
proc check[T](hb: var HeartbeatMonitor, ctx: ptr FFIContext[T]) =
|
||||
## Fires onNotResponding once the heartbeat stalls past the threshold;
|
||||
## latches until it moves again.
|
||||
if Moment.now() - hb.startedAt <= FFIHeartbeatStartDelay:
|
||||
return
|
||||
|
||||
let cur = ctx.ffiHeartbeat.load()
|
||||
if cur != hb.lastValue:
|
||||
hb.lastValue = cur
|
||||
hb.lastChange = Moment.now()
|
||||
hb.notifiedStale = false
|
||||
return
|
||||
|
||||
if hb.notifiedStale:
|
||||
return
|
||||
if Moment.now() - hb.lastChange <= FFIHeartbeatStaleThreshold:
|
||||
return
|
||||
onNotResponding(ctx)
|
||||
hb.notifiedStale = true
|
||||
|
||||
proc eventRun[T](ctx: ptr FFIContext[T]) {.async.} =
|
||||
var hb = HeartbeatMonitor.init(ctx)
|
||||
var notifiedStuck = false
|
||||
|
||||
while ctx.running.load():
|
||||
# Wake on enqueue or tick — whichever first.
|
||||
discard await ctx.eventQueueSignal.wait().withTimeout(EventThreadTickInterval)
|
||||
|
||||
ctx.drainEventQueue()
|
||||
|
||||
# Fire after drain so reg.lock is free — FFI-thread would deadlock here.
|
||||
if not notifiedStuck and ctx.eventQueueStuck.load():
|
||||
onNotResponding(ctx)
|
||||
notifiedStuck = true
|
||||
|
||||
if not ctx.running.load():
|
||||
break
|
||||
hb.check(ctx)
|
||||
|
||||
proc eventThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} =
|
||||
## Owns queued `c_malloc` payloads until dispatch returns.
|
||||
defer:
|
||||
let fireRes = ctx.eventThreadExitSignal.fireSync()
|
||||
if fireRes.isErr():
|
||||
error "failed to fire eventThreadExitSignal", err = fireRes.error
|
||||
|
||||
try:
|
||||
waitFor eventRun(ctx)
|
||||
except CatchableError as e:
|
||||
error "event thread exited with exception", error = e.msg
|
||||
include ./event_thread
|
||||
include ./ffi_thread
|
||||
|
||||
template closeAndNil(field: untyped) =
|
||||
if not field.isNil():
|
||||
|
||||
183
ffi/ffi_thread.nim
Normal file
183
ffi/ffi_thread.nim
Normal file
@ -0,0 +1,183 @@
|
||||
## FFI-thread body and request submission API.
|
||||
##
|
||||
## Included from `ffi_context.nim` — inherits its imports, FFIContext type,
|
||||
## and the `onFFIThread` threadvar. Companion to `event_thread.nim`.
|
||||
##
|
||||
## Responsibilities:
|
||||
## - Receive `FFIThreadRequest`s from foreign threads via `reqChannel` and
|
||||
## dispatch them through the user-registered handler table.
|
||||
## - Advance `ctx.ffiHeartbeat` each loop iteration so the event thread can
|
||||
## detect a wedged FFI thread.
|
||||
|
||||
proc sendRequestToFFIThread*(
    ctx: ptr FFIContext, ffiRequest: ptr FFIThreadRequest, timeout = InfiniteDuration
): Result[void, string] =
  ## Hands `ffiRequest` to the FFI thread and waits (up to `timeout`) for the
  ## thread to acknowledge reception.
  ##
  ## Returns `ok()` once the hand-off sequence completes, or an error string
  ## when the request was rejected or a signalling step failed. On every
  ## error path *before* the acknowledgement wait the request is passed to
  ## `deleteRequest`; once the FFI thread has been signalled the request is
  ## left alone because that thread now owns it.
  if ctx.eventQueueStuck.load():
    deleteRequest(ffiRequest)
    return err("event queue stuck - library cannot accept new requests")

  if onFFIThread:
    # Re-entrant dispatch from a handler would self-deadlock on `reqReceivedSignal`.
    deleteRequest(ffiRequest)
    return err(
      "reentrant ffi call: a handler invoked sendRequestToFFIThread on its own context"
    )

  # `reqChannel` is single-producer and `reqReceivedSignal` is shared across
  # callers, so the whole trySend + fireSync + waitSync sequence is
  # serialised on `ctx.lock`. A multi-producer redesign is tracked as
  # PR #23 review item 7.
  acquire(ctx.lock)
  defer:
    release(ctx.lock)

  # Step 1: enqueue the request pointer.
  if not ctx.reqChannel.trySend(ffiRequest):
    deleteRequest(ffiRequest)
    return err("Couldn't send a request to the ffi thread")

  # Step 2: wake the FFI thread.
  # NOTE(review): on both failure branches below the pointer has already been
  # pushed into `reqChannel`; if `deleteRequest` frees it, a later `tryRecv`
  # on the FFI thread would presumably see a dangling pointer — confirm.
  let wakeRes = ctx.reqSignal.fireSync()
  if wakeRes.isErr():
    deleteRequest(ffiRequest)
    return err("failed fireSync: " & $wakeRes.error)

  if not wakeRes.get():
    deleteRequest(ffiRequest)
    return err("Couldn't fireSync in time")

  # Step 3: wait until the FFI thread confirms it received the request.
  # NOTE(review): only the error case is checked here; the boolean carried by
  # a successful `waitSync` result is ignored — confirm that is intended.
  let ackRes = ctx.reqReceivedSignal.waitSync(timeout)
  if ackRes.isErr():
    # Do not free ffiRequest here: the FFI thread was already signalled and
    # will process (and free) it.
    return err("Couldn't receive reqReceivedSignal signal")

  # On success the request is released by the FFI thread while processing it.
  ok()
|
||||
|
||||
proc processRequest[T](
    request: ptr FFIThreadRequest, ctx: ptr FFIContext[T]
) {.async.} =
  ## Runs on the FFI thread to serve one request coming from the FFI API
  ## consumer thread.
  ##
  ## The request's `reqId` selects the handler from `ctx.registeredRequests`;
  ## an unknown id is routed to `nilProcess`. The handler's result — or an
  ## error built from whatever `CatchableError` it raised — is then passed to
  ## `handleRes` together with the request.

  let reqId = $request[].reqId
  # Explicit conversion: `reqId` stays alive as the backing string of the
  # cstring view, and the implicit string→cstring warning is avoided.
  let reqIdCs = reqId.cstring

  let handlerFut =
    if reqIdCs in ctx[].registeredRequests[]:
      ctx[].registeredRequests[][reqIdCs](cast[pointer](request), ctx)
    else:
      # Not expected: only registered requests should reach the FFI thread.
      nilProcess(request[].reqId)

  # Catch every catchable exception (including CancelledError raised by the
  # shutdown drain in ffiRun) so `handleRes` always runs; otherwise an
  # abandoned in-flight handler would leak its request allocations.
  var outcome: Result[seq[byte], string]
  try:
    outcome = await handlerFut
  except CatchableError as exc:
    outcome = Result[seq[byte], string].err(
      "Error in processRequest for " & reqId & ": " & exc.msg
    )

  # `handleRes` may raise (rare: OOM, GC setup). Swallowing it here keeps this
  # async proc free of escaping exceptions. `handleRes` is expected to release
  # `request` itself (its body is not visible from this file).
  try:
    handleRes(outcome, request)
  except Exception as exc:
    error "Unexpected exception in handleRes", error = exc.msg
|
||||
|
||||
# Per-thread copy of the context's `eventQueueSignal`, assigned by
# `ffiThreadBody`. Stashed in a threadvar so the hook below has no closure env.
var ffiEventQueueSignalPtr {.threadvar.}: ThreadSignalPtr

proc ffiNotifyEventEnqueuedHook() {.gcsafe, raises: [].} =
  ## Fires the stashed event-queue signal, if one was set for this thread.
  ## A failed fire is logged, never raised.
  if ffiEventQueueSignalPtr.isNil():
    return
  let fired = ffiEventQueueSignalPtr.fireSync()
  if fired.isErr():
    error "failed to fire eventQueueSignal after enqueue", err = fired.error
|
||||
|
||||
proc ffiThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} =
  ## FFI thread body that attends library user API requests.
  ##
  ## Publishes the context's event registry/queue/stuck-flag and the enqueue
  ## hook into this thread's threadvars, marks the thread as the FFI thread,
  ## and then runs the request loop until `ctx.running` is cleared.
  ## `threadExitSignal` is fired on every exit path. A `CatchableError`
  ## escaping the loop is logged instead of leaving the thread proc
  ## unhandled, mirroring `eventThreadBody`.
  ffiCurrentEventRegistry = addr ctx[].eventRegistry
  ffiCurrentEventQueue = addr ctx[].eventQueue
  ffiCurrentEventQueueStuck = addr ctx[].eventQueueStuck
  ffiEventQueueSignalPtr = ctx.eventQueueSignal
  ffiCurrentNotifyEventEnqueued = ffiNotifyEventEnqueuedHook
  onFFIThread = true

  logging.setupLog(logging.LogLevel.DEBUG, logging.LogFormat.TEXT)

  defer:
    onFFIThread = false
    # Unblocks destroyFFIContext's bounded wait so cleanup can proceed.
    let fireRes = ctx.threadExitSignal.fireSync()
    if fireRes.isErr():
      error "failed to fire threadExitSignal on FFI thread exit", err = fireRes.error

  let ffiRun = proc(ctx: ptr FFIContext[T]) {.async.} =
    # Holds the main library object, i.e. the one in charge of handling the
    # ffi requests (e.g. Waku, LibP2P, SDS, etc.).
    var ffiReqHandler: T

    # In-flight processRequest futures. Tracked so they can be drained on
    # shutdown — otherwise destroying the context while a handler is awaiting
    # (e.g. sleepAsync) abandons the future and leaks the request's
    # envelope/reqId/payload allocations.
    var pending: seq[Future[void]] = @[]

    proc reapCompleted() =
      # Drops finished futures. `del` swaps in the last element, so the index
      # only advances when the current slot was kept.
      var i = 0
      while i < pending.len:
        if pending[i].finished():
          pending.del(i)
        else:
          inc i

    while ctx.running.load():
      # Freezes if a sync handler blocks the dispatcher; the event thread
      # reads this counter to detect a wedged FFI thread.
      discard ctx.ffiHeartbeat.fetchAdd(1)

      reapCompleted()

      let gotSignal = await ctx.reqSignal.wait().withTimeout(100.milliseconds)
      if not gotSignal:
        continue

      # Pick up the request sent by the ffi consumer thread.
      var request: ptr FFIThreadRequest
      if not ctx.reqChannel.tryRecv(request):
        continue

      if ctx.myLib.isNil():
        ctx.myLib = addr ffiReqHandler

      # Handle the request.
      pending.add processRequest(request, ctx)

      let fireRes = ctx.reqReceivedSignal.fireSync()
      if fireRes.isErr():
        error "could not fireSync back to requester thread", error = fireRes.error

    # Drain in-flight handlers so each request's `deleteRequest` runs before
    # we exit. Without this, abandoning a future mid-await would leak the
    # request allocations (visible to LSan; previously hidden because Nim's
    # pool allocator kept the chunks alive in the process).
    reapCompleted()
    if pending.len > 0:
      try:
        await allFutures(pending)
      except CatchableError as exc:
        error "draining pending FFI requests on shutdown raised", error = exc.msg

  # Guarded like `eventThreadBody`: an error escaping the loop must not leave
  # this thread proc as an unhandled exception.
  try:
    waitFor ffiRun(ctx)
  except CatchableError as exc:
    error "FFI thread exited with exception", error = exc.msg
|
||||
Loading…
x
Reference in New Issue
Block a user