mirror of
https://github.com/logos-messaging/nim-ffi.git
synced 2026-06-21 16:59:30 +00:00
fix: pr comments
This commit is contained in:
parent
8ac3b5d52c
commit
01687ce287
@ -37,14 +37,13 @@ proc dispatchToListeners[T](
|
||||
)
|
||||
|
||||
proc emitLivenessEvent[T, P](ctx: ptr FFIContext[T], name: string, payload: P) =
|
||||
## Encodes a zero-field liveness event (`NotRespondingEvent`,
|
||||
## `RespondingEvent`) and dispatches it directly to listeners, bypassing
|
||||
## the event queue (which may itself be wedged). Runs on the event thread.
|
||||
## Encodes a liveness event and dispatches directly to listeners (bypassing
|
||||
## the queue, which may be wedged). Runs on the event thread.
|
||||
let event =
|
||||
try:
|
||||
EventEnvelope[P](eventType: name, payload: payload).cborEncode()
|
||||
except CatchableError as exc:
|
||||
chronicles.error "liveness event encode failed", name = name, err = exc.msg
|
||||
except CatchableError as e:
|
||||
chronicles.error "liveness event encode failed", name = name, err = e.msg
|
||||
return
|
||||
let dataPtr: pointer =
|
||||
if event.len > 0: cast[pointer](unsafeAddr event[0])
|
||||
@ -55,9 +54,8 @@ proc onNotResponding*(ctx: ptr FFIContext) =
|
||||
emitLivenessEvent(ctx, NotRespondingEventName, NotRespondingEvent())
|
||||
|
||||
proc onResponding*(ctx: ptr FFIContext) =
|
||||
## Fired once when the FFI thread's heartbeat starts advancing again
|
||||
## after a `NotRespondingEvent`. Lets consumers clear any "library
|
||||
## hung" UI state without polling.
|
||||
## Fired once when the heartbeat resumes after a NotRespondingEvent.
|
||||
## Lets consumers clear any "library hung" UI state without polling.
|
||||
emitLivenessEvent(ctx, RespondingEventName, RespondingEvent())
|
||||
|
||||
proc dispatchQueuedEvent[T](ctx: ptr FFIContext[T], qe: QueuedEvent) =
|
||||
@ -89,10 +87,8 @@ proc init(T: type HeartbeatMonitor, ctx: ptr FFIContext): T =
|
||||
)
|
||||
|
||||
proc check[T](hb: var HeartbeatMonitor, ctx: ptr FFIContext[T]) =
|
||||
## Fires `onNotResponding` once the FFI thread's heartbeat counter stops
|
||||
## advancing past the stale threshold, and fires `onResponding` once it
|
||||
## starts advancing again. Both transitions latch so each is emitted at
|
||||
## most once per stall episode.
|
||||
## Fires onNotResponding / onResponding on heartbeat stall / recovery.
|
||||
## Both transitions latch — each fires at most once per stall episode.
|
||||
if Moment.now() - hb.startedAt <= FFIHeartbeatStartDelay:
|
||||
return
|
||||
let cur = ctx.ffiHeartbeat.load()
|
||||
@ -109,7 +105,7 @@ proc check[T](hb: var HeartbeatMonitor, ctx: ptr FFIContext[T]) =
|
||||
|
||||
proc eventRun[T](ctx: ptr FFIContext[T]) {.async.} =
|
||||
var hb = HeartbeatMonitor.init(ctx)
|
||||
var notifiedStuck = false
|
||||
var notifiedStuck = false # latched forever — eventQueueStuck is sticky terminal.
|
||||
|
||||
while ctx.running.load():
|
||||
# Wake on enqueue or tick — whichever first.
|
||||
@ -136,5 +132,5 @@ proc eventThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} =
|
||||
|
||||
try:
|
||||
waitFor eventRun(ctx)
|
||||
except CatchableError as exc:
|
||||
error "event thread exited with exception", error = exc.msg
|
||||
except CatchableError as e:
|
||||
error "event thread exited with exception", error = e.msg
|
||||
|
||||
@ -79,7 +79,7 @@ proc cleanUpResources[T](ctx: ptr FFIContext[T]): Result[void, string] =
|
||||
|
||||
template newSignalOrErr(field: untyped, name: string) =
|
||||
field = ThreadSignalPtr.new().valueOr:
|
||||
return err("couldn't create " & name & " ThreadSignalPtr: " & $error)
|
||||
return err("couldn't create ThreadSignalPtr: " & name & ": " & $error)
|
||||
|
||||
proc initContextResources*[T](ctx: ptr FFIContext[T]): Result[void, string] =
|
||||
## On failure, the deferred cleanup closes partial state; caller releases
|
||||
@ -133,22 +133,22 @@ proc initContextResources*[T](ctx: ptr FFIContext[T]): Result[void, string] =
|
||||
return err("failed to create the event thread: " & getCurrentExceptionMsg())
|
||||
|
||||
success = true
|
||||
return ok()
|
||||
ok()
|
||||
|
||||
proc fireOrErr(sig: ThreadSignalPtr, name: string): Result[void, string] =
|
||||
let fired = sig.fireSync().valueOr:
|
||||
return err("error signaling " & name & ": " & $error)
|
||||
return err("error signaling: " & name & ": " & $error)
|
||||
if not fired:
|
||||
return err("failed to signal " & name & " on time")
|
||||
return err("failed to signal: " & name & " on time")
|
||||
ok()
|
||||
|
||||
proc waitExitOrErr(
|
||||
sig: ThreadSignalPtr, name: string, timeout: Duration
|
||||
): Result[void, string] =
|
||||
let exited = sig.waitSync(timeout).valueOr:
|
||||
return err("error waiting for " & name & " exit: " & $error)
|
||||
return err("error waiting for exit: " & name & ": " & $error)
|
||||
if not exited:
|
||||
return err(name & " did not exit in time; leaking ctx to avoid hang")
|
||||
return err("did not exit in time: " & name & " (leaking ctx to avoid hang)")
|
||||
ok()
|
||||
|
||||
proc signalStop*[T](ctx: ptr FFIContext[T]): Result[void, string] =
|
||||
@ -176,7 +176,7 @@ proc stopAndJoinThreads*[T](ctx: ptr FFIContext[T]): Result[void, string] =
|
||||
joinThread(ctx.ffiThread)
|
||||
?ctx.eventThreadExitSignal.waitExitOrErr("event thread", ThreadExitTimeout)
|
||||
joinThread(ctx.eventThread)
|
||||
return ok()
|
||||
ok()
|
||||
|
||||
proc clearContext[T](ctx: ptr FFIContext[T]): Result[void, string] =
|
||||
## Stops a heap-allocated FFI context.
|
||||
@ -184,4 +184,4 @@ proc clearContext[T](ctx: ptr FFIContext[T]): Result[void, string] =
|
||||
return err("clearContext: " & $error)
|
||||
ctx.cleanUpResources().isOkOr:
|
||||
return err("cleanUpResources failed: " & $error)
|
||||
return ok()
|
||||
ok()
|
||||
|
||||
@ -36,9 +36,8 @@ proc initEventRegistry*(reg: var FFIEventRegistry) =
|
||||
reg.byEvent = initTable[string, seq[FFIEventListener]]()
|
||||
|
||||
proc deinitEventRegistry*(reg: var FFIEventRegistry) =
|
||||
## Mirror of `initEventRegistry`; same single-thread constraint.
|
||||
## Resets GC fields so pool-slot reuse on another thread doesn't fire
|
||||
## Nim's hidden assignment dtor against this thread's heap.
|
||||
## Mirror of `initEventRegistry`; same single-thread constraint. Resets GC
|
||||
## fields so pool-slot reuse on another thread sees no hidden dtor.
|
||||
reg.lock.deinitLock()
|
||||
reg.byEvent = default(Table[string, seq[FFIEventListener]])
|
||||
reg.nextId = 0'u64
|
||||
@ -167,11 +166,11 @@ proc tryDequeueEvent*(q: var EventQueue): Option[QueuedEvent] {.raises: [], gcsa
|
||||
withLock q.lock:
|
||||
if q.count == 0:
|
||||
return none(QueuedEvent)
|
||||
let e = q.buf[q.head]
|
||||
let dequeued = q.buf[q.head]
|
||||
q.buf[q.head] = QueuedEvent(name: nil, data: nil, dataLen: 0)
|
||||
q.head = (q.head + 1) mod EventQueueCapacity
|
||||
q.count.dec()
|
||||
return some(e)
|
||||
return some(dequeued)
|
||||
|
||||
proc eventQueueLen*(q: var EventQueue): int {.raises: [], gcsafe.} =
|
||||
withLock q.lock:
|
||||
@ -179,9 +178,8 @@ proc eventQueueLen*(q: var EventQueue): int {.raises: [], gcsafe.} =
|
||||
|
||||
|
||||
const emptyListenerPayload*: cstring = ""
|
||||
## Non-nil zero-length buffer handed to listeners when a payload is
|
||||
## empty, so a consumer doing `std::string(data, len)` / `memcpy` never
|
||||
## receives a nil pointer (which is UB even at len 0).
|
||||
## Non-nil zero-length buffer handed to listeners when the payload is empty
|
||||
## (a nil pointer would be UB for consumers doing `memcpy` even at len 0).
|
||||
|
||||
proc notifyListeners*(
|
||||
listeners: seq[FFIEventListener], retCode: cint, data: pointer, dataLen: int
|
||||
@ -221,7 +219,7 @@ template enqueueOrMarkStuck(
|
||||
dataLen: int,
|
||||
) =
|
||||
## Takes ownership of `namePtr`/`dataPtr`. On queue-full sets the sticky
|
||||
## stuck flag and wakes the event thread (firing onNotResponding here
|
||||
## stuck flag and wakes the event thread (firing onNotResponding from here
|
||||
## would risk deadlock against a back-pressuring listener).
|
||||
block enqueueBlock:
|
||||
let q = ffiCurrentEventQueue
|
||||
@ -242,9 +240,8 @@ template enqueueOrMarkStuck(
|
||||
ffiCurrentNotifyEventEnqueued()
|
||||
|
||||
template dispatchFFIEvent*(eventName: string, body: untyped) =
|
||||
## `body` must yield `string` or `seq[byte]`. Runs on the FFI thread —
|
||||
## encodes into a `c_malloc` buffer and enqueues; the event thread
|
||||
## fans out to listeners.
|
||||
## `body` must yield `string` / `seq[byte]`. FFI thread only: encodes into
|
||||
## a `c_malloc` buffer and enqueues; the event thread fans out to listeners.
|
||||
block:
|
||||
let evtName: string = eventName
|
||||
let bodyVal = body
|
||||
@ -257,10 +254,8 @@ template dispatchFFIEvent*(eventName: string, body: untyped) =
|
||||
enqueueOrMarkStuck(evtName, namePtr, dataPtr, dataLen)
|
||||
|
||||
template dispatchFFIEventCbor*(eventName: string, eventPayload: typed) =
|
||||
## Typed CBOR variant of `dispatchFFIEvent`.
|
||||
## Parameter is `eventPayload`, not `payload` — Nim's template
|
||||
## substitution would otherwise rewrite the `payload:` field inside
|
||||
## `EventEnvelope`.
|
||||
## Typed CBOR variant of `dispatchFFIEvent`. The param is `eventPayload`
|
||||
## (not `payload`) to avoid clobbering `EventEnvelope.payload` substitution.
|
||||
block:
|
||||
let evtName: string = eventName
|
||||
var (dataPtr, dataLen) = cborEncodeShared(
|
||||
|
||||
@ -23,15 +23,11 @@ proc sendRequestToFFIThread*(
|
||||
"reentrant ffi call: a handler invoked sendRequestToFFIThread on its own context"
|
||||
)
|
||||
|
||||
# All async submissions serialise on `ctx.lock` for the full
|
||||
# trySend + fireSync + waitSync sequence because `reqChannel` is
|
||||
# single-producer and `reqReceivedSignal` is shared across callers.
|
||||
# Multi-producer redesign is tracked as PR #23 review item 7.
|
||||
# Serialise the trySend + fireSync + waitSync — reqChannel is SP and reqReceivedSignal is shared.
|
||||
ctx.lock.acquire()
|
||||
defer:
|
||||
ctx.lock.release()
|
||||
|
||||
## Sending the request
|
||||
let sentOk = ctx.reqChannel.trySend(ffiRequest)
|
||||
if not sentOk:
|
||||
deleteRequest(ffiRequest)
|
||||
@ -46,15 +42,12 @@ proc sendRequestToFFIThread*(
|
||||
deleteRequest(ffiRequest)
|
||||
return err("Couldn't fireSync in time")
|
||||
|
||||
## wait until the FFI working thread properly received the request
|
||||
let res = ctx.reqReceivedSignal.waitSync(timeout)
|
||||
if res.isErr():
|
||||
## Do not free ffiRequest here: the FFI thread was already signaled and
|
||||
## will process (and free) it.
|
||||
# FFI thread was signaled and owns the request; don't double-free.
|
||||
return err("Couldn't receive reqReceivedSignal signal")
|
||||
|
||||
## Notice that in case of "ok", the deallocShared(req) is performed by the FFI Thread in the
|
||||
## process proc.
|
||||
# On ok the FFI thread's processRequest deallocShared(req)'s.
|
||||
ok()
|
||||
|
||||
proc processRequest[T](
|
||||
@ -63,40 +56,27 @@ proc processRequest[T](
|
||||
## Invoked within the FFI thread to process a request coming from the FFI API consumer thread.
|
||||
|
||||
let reqId = $request[].reqId
|
||||
## The reqId determines which proc will handle the request.
|
||||
## The registeredRequests represents a table defined at compile time.
|
||||
## Then, registeredRequests == Table[reqId, proc-handling-the-request-asynchronously]
|
||||
|
||||
## Explicit conversion keeps `reqId` alive as the backing string,
|
||||
## avoiding the implicit string→cstring warning that will become an error.
|
||||
let reqIdCs = reqId.cstring
|
||||
let reqIdCs = reqId.cstring # keeps reqId alive; implicit string→cstring is a warning.
|
||||
|
||||
let retFut =
|
||||
if not ctx[].registeredRequests[].contains(reqIdCs):
|
||||
## That shouldn't happen because only registered requests should be sent to the FFI thread.
|
||||
nilProcess(request[].reqId)
|
||||
else:
|
||||
ctx[].registeredRequests[][reqIdCs](cast[pointer](request), ctx)
|
||||
|
||||
## Catch every catchable exception (including CancelledError raised by
|
||||
## the shutdown drain in ffiRun) so handleRes — and its `deleteRequest`
|
||||
## defer — always runs. Otherwise an abandoned in-flight handler would
|
||||
## leak its request envelope, reqId copy, and CBOR payload.
|
||||
# CatchableError covers CancelledError from the shutdown drain; handleRes must still run.
|
||||
let res =
|
||||
try:
|
||||
await retFut
|
||||
except CatchableError as exc:
|
||||
except CatchableError as e:
|
||||
Result[seq[byte], string].err(
|
||||
"Error in processRequest for " & reqId & ": " & exc.msg
|
||||
"Error in processRequest for " & reqId & ": " & e.msg
|
||||
)
|
||||
|
||||
## handleRes may raise (OOM, GC setup) even though it is rare. Catching here
|
||||
## keeps the async proc raises:[] compatible. The defer inside handleRes
|
||||
## guarantees request is freed before the exception propagates.
|
||||
try:
|
||||
handleRes(res, request)
|
||||
except Exception as exc:
|
||||
error "Unexpected exception in handleRes", error = exc.msg
|
||||
except Exception as e:
|
||||
error "Unexpected exception in handleRes", error = e.msg
|
||||
|
||||
var ffiEventQueueSignalPtr {.threadvar.}: ThreadSignalPtr
|
||||
# Stashed so the hook has no closure env.
|
||||
@ -126,23 +106,18 @@ proc ffiThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} =
|
||||
error "failed to fire threadExitSignal on FFI thread exit", err = fireRes.error
|
||||
|
||||
let ffiRun = proc(ctx: ptr FFIContext[T]) {.async.} =
|
||||
var ffiReqHandler: T
|
||||
## Holds the main library object, i.e., in charge of handling the ffi requests.
|
||||
## e.g., Waku, LibP2P, SDS, etc.
|
||||
var ffiReqHandler: T # main library object (Waku, LibP2P, SDS, …)
|
||||
|
||||
## In-flight processRequest futures. Tracked so they can be drained on
|
||||
## shutdown — otherwise destroying the context while a handler is
|
||||
## awaiting (e.g. sleepAsync) abandons the future and leaks the
|
||||
## request's envelope/reqId/payload allocations.
|
||||
# Tracked so shutdown can drain them; abandoning a mid-await future leaks the request.
|
||||
var pending: seq[Future[void]] = @[]
|
||||
|
||||
proc reapCompleted() =
|
||||
var i = 0
|
||||
while i < pending.len:
|
||||
if pending[i].finished():
|
||||
pending.del(i)
|
||||
else:
|
||||
if not pending[i].finished():
|
||||
inc i
|
||||
continue
|
||||
pending.del(i)
|
||||
|
||||
while ctx.running.load():
|
||||
# Freezes if a sync handler blocks the dispatcher; event thread reads to detect wedged FFI thread.
|
||||
@ -154,7 +129,6 @@ proc ffiThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} =
|
||||
if not gotSignal:
|
||||
continue
|
||||
|
||||
## Wait for a request from the ffi consumer thread
|
||||
var request: ptr FFIThreadRequest
|
||||
if not ctx.reqChannel.tryRecv(request):
|
||||
continue
|
||||
@ -162,22 +136,18 @@ proc ffiThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} =
|
||||
if ctx.myLib.isNil():
|
||||
ctx.myLib = addr ffiReqHandler
|
||||
|
||||
## Handle the request
|
||||
pending.add processRequest(request, ctx)
|
||||
|
||||
let fireRes = ctx.reqReceivedSignal.fireSync()
|
||||
if fireRes.isErr():
|
||||
error "could not fireSync back to requester thread", error = fireRes.error
|
||||
|
||||
## Drain in-flight handlers so each request's `deleteRequest` runs
|
||||
## before we exit. Without this, abandoning a future mid-await would
|
||||
## leak the request allocations (visible to LSan; previously hidden
|
||||
## because Nim's pool allocator kept the chunks alive in the process).
|
||||
# Drain so each pending handler's deleteRequest defer runs before exit.
|
||||
reapCompleted()
|
||||
if pending.len > 0:
|
||||
try:
|
||||
await allFutures(pending)
|
||||
except CatchableError as exc:
|
||||
error "draining pending FFI requests on shutdown raised", error = exc.msg
|
||||
except CatchableError as e:
|
||||
error "draining pending FFI requests on shutdown raised", error = e.msg
|
||||
|
||||
waitFor ffiRun(ctx)
|
||||
|
||||
@ -40,7 +40,7 @@ proc captureCb(
|
||||
acquire(d[].lock)
|
||||
d[].retCode = retCode
|
||||
let n = min(int(len), d[].msg.len)
|
||||
if n > 0 and not msg.isNil:
|
||||
if n > 0 and not msg.isNil():
|
||||
copyMem(addr d[].msg[0], msg, n)
|
||||
d[].msgLen = n
|
||||
d[].called = true
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user