diff --git a/ffi/event_thread.nim b/ffi/event_thread.nim index 921555b..bfbb93f 100644 --- a/ffi/event_thread.nim +++ b/ffi/event_thread.nim @@ -37,14 +37,13 @@ proc dispatchToListeners[T]( ) proc emitLivenessEvent[T, P](ctx: ptr FFIContext[T], name: string, payload: P) = - ## Encodes a zero-field liveness event (`NotRespondingEvent`, - ## `RespondingEvent`) and dispatches it directly to listeners, bypassing - ## the event queue (which may itself be wedged). Runs on the event thread. + ## Encodes a liveness event and dispatches directly to listeners (bypassing + ## the queue, which may be wedged). Runs on the event thread. let event = try: EventEnvelope[P](eventType: name, payload: payload).cborEncode() - except CatchableError as exc: - chronicles.error "liveness event encode failed", name = name, err = exc.msg + except CatchableError as e: + chronicles.error "liveness event encode failed", name = name, err = e.msg return let dataPtr: pointer = if event.len > 0: cast[pointer](unsafeAddr event[0]) @@ -55,9 +54,8 @@ proc onNotResponding*(ctx: ptr FFIContext) = emitLivenessEvent(ctx, NotRespondingEventName, NotRespondingEvent()) proc onResponding*(ctx: ptr FFIContext) = - ## Fired once when the FFI thread's heartbeat starts advancing again - ## after a `NotRespondingEvent`. Lets consumers clear any "library - ## hung" UI state without polling. + ## Fired once when the heartbeat resumes after a NotRespondingEvent. + ## Lets consumers clear any "library hung" UI state without polling. emitLivenessEvent(ctx, RespondingEventName, RespondingEvent()) proc dispatchQueuedEvent[T](ctx: ptr FFIContext[T], qe: QueuedEvent) = @@ -89,10 +87,8 @@ proc init(T: type HeartbeatMonitor, ctx: ptr FFIContext): T = ) proc check[T](hb: var HeartbeatMonitor, ctx: ptr FFIContext[T]) = - ## Fires `onNotResponding` once the FFI thread's heartbeat counter stops - ## advancing past the stale threshold, and fires `onResponding` once it - ## starts advancing again. Both transitions latch so each is emitted at - ## most once per stall episode. + ## Fires onNotResponding / onResponding on heartbeat stall / recovery. + ## Both transitions latch — each fires at most once per stall episode. if Moment.now() - hb.startedAt <= FFIHeartbeatStartDelay: return let cur = ctx.ffiHeartbeat.load() @@ -109,7 +105,7 @@ proc check[T](hb: var HeartbeatMonitor, ctx: ptr FFIContext[T]) = proc eventRun[T](ctx: ptr FFIContext[T]) {.async.} = var hb = HeartbeatMonitor.init(ctx) - var notifiedStuck = false + var notifiedStuck = false # latched forever — eventQueueStuck is sticky terminal. while ctx.running.load(): # Wake on enqueue or tick — whichever first. @@ -136,5 +132,5 @@ proc eventThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} = try: waitFor eventRun(ctx) - except CatchableError as exc: - error "event thread exited with exception", error = exc.msg + except CatchableError as e: + error "event thread exited with exception", error = e.msg diff --git a/ffi/ffi_context.nim b/ffi/ffi_context.nim index 811b920..4ec7c9d 100644 --- a/ffi/ffi_context.nim +++ b/ffi/ffi_context.nim @@ -79,7 +79,7 @@ proc cleanUpResources[T](ctx: ptr FFIContext[T]): Result[void, string] = template newSignalOrErr(field: untyped, name: string) = field = ThreadSignalPtr.new().valueOr: - return err("couldn't create " & name & " ThreadSignalPtr: " & $error) + return err("couldn't create ThreadSignalPtr: " & name & ": " & $error) proc initContextResources*[T](ctx: ptr FFIContext[T]): Result[void, string] = ## On failure, the deferred cleanup closes partial state; caller releases @@ -133,22 +133,22 @@ proc initContextResources*[T](ctx: ptr FFIContext[T]): Result[void, string] = return err("failed to create the event thread: " & getCurrentExceptionMsg()) success = true - return ok() + ok() proc fireOrErr(sig: ThreadSignalPtr, name: string): Result[void, string] = let fired = sig.fireSync().valueOr: - return err("error signaling " & name & ": " & $error) + return err("error signaling: " & name & ": " & $error) if not fired: - return err("failed to signal " & name & " on time") + return err("failed to signal: " & name & " on time") ok() proc waitExitOrErr( sig: ThreadSignalPtr, name: string, timeout: Duration ): Result[void, string] = let exited = sig.waitSync(timeout).valueOr: - return err("error waiting for " & name & " exit: " & $error) + return err("error waiting for exit: " & name & ": " & $error) if not exited: - return err(name & " did not exit in time; leaking ctx to avoid hang") + return err("did not exit in time: " & name & " (leaking ctx to avoid hang)") ok() proc signalStop*[T](ctx: ptr FFIContext[T]): Result[void, string] = @@ -176,7 +176,7 @@ proc stopAndJoinThreads*[T](ctx: ptr FFIContext[T]): Result[void, string] = joinThread(ctx.ffiThread) ?ctx.eventThreadExitSignal.waitExitOrErr("event thread", ThreadExitTimeout) joinThread(ctx.eventThread) - return ok() + ok() proc clearContext[T](ctx: ptr FFIContext[T]): Result[void, string] = ## Stops a heap-allocated FFI context. @@ -184,4 +184,4 @@ proc clearContext[T](ctx: ptr FFIContext[T]): Result[void, string] = return err("clearContext: " & $error) ctx.cleanUpResources().isOkOr: return err("cleanUpResources failed: " & $error) - return ok() + ok() diff --git a/ffi/ffi_events.nim b/ffi/ffi_events.nim index 4ea4b5b..c66ca34 100644 --- a/ffi/ffi_events.nim +++ b/ffi/ffi_events.nim @@ -36,9 +36,8 @@ proc initEventRegistry*(reg: var FFIEventRegistry) = reg.byEvent = initTable[string, seq[FFIEventListener]]() proc deinitEventRegistry*(reg: var FFIEventRegistry) = - ## Mirror of `initEventRegistry`; same single-thread constraint. - ## Resets GC fields so pool-slot reuse on another thread doesn't fire - ## Nim's hidden assignment dtor against this thread's heap. + ## Mirror of `initEventRegistry`; same single-thread constraint. Resets GC + ## fields so pool-slot reuse on another thread sees no hidden dtor. reg.lock.deinitLock() reg.byEvent = default(Table[string, seq[FFIEventListener]]) reg.nextId = 0'u64 @@ -167,11 +166,11 @@ proc tryDequeueEvent*(q: var EventQueue): Option[QueuedEvent] {.raises: [], gcsa withLock q.lock: if q.count == 0: return none(QueuedEvent) - let e = q.buf[q.head] + let dequeued = q.buf[q.head] q.buf[q.head] = QueuedEvent(name: nil, data: nil, dataLen: 0) q.head = (q.head + 1) mod EventQueueCapacity q.count.dec() - return some(e) + return some(dequeued) proc eventQueueLen*(q: var EventQueue): int {.raises: [], gcsafe.} = withLock q.lock: @@ -179,9 +178,8 @@ proc eventQueueLen*(q: var EventQueue): int {.raises: [], gcsafe.} = const emptyListenerPayload*: cstring = "" - ## Non-nil zero-length buffer handed to listeners when a payload is - ## empty, so a consumer doing `std::string(data, len)` / `memcpy` never - ## receives a nil pointer (which is UB even at len 0). + ## Non-nil zero-length buffer handed to listeners when the payload is empty + ## (a nil pointer would be UB for consumers doing `memcpy` even at len 0). proc notifyListeners*( listeners: seq[FFIEventListener], retCode: cint, data: pointer, dataLen: int @@ -221,7 +219,7 @@ template enqueueOrMarkStuck( dataLen: int, ) = ## Takes ownership of `namePtr`/`dataPtr`. On queue-full sets the sticky - ## stuck flag and wakes the event thread (firing onNotResponding here + ## stuck flag and wakes the event thread (firing onNotResponding from here ## would risk deadlock against a back-pressuring listener). block enqueueBlock: let q = ffiCurrentEventQueue @@ -242,9 +240,8 @@ template enqueueOrMarkStuck( ffiCurrentNotifyEventEnqueued() template dispatchFFIEvent*(eventName: string, body: untyped) = - ## `body` must yield `string` or `seq[byte]`. Runs on the FFI thread — - ## encodes into a `c_malloc` buffer and enqueues; the event thread - ## fans out to listeners. + ## `body` must yield `string` / `seq[byte]`. FFI thread only: encodes into + ## a `c_malloc` buffer and enqueues; the event thread fans out to listeners. block: let evtName: string = eventName let bodyVal = body @@ -257,10 +254,8 @@ template dispatchFFIEvent*(eventName: string, body: untyped) = enqueueOrMarkStuck(evtName, namePtr, dataPtr, dataLen) template dispatchFFIEventCbor*(eventName: string, eventPayload: typed) = - ## Typed CBOR variant of `dispatchFFIEvent`. - ## Parameter is `eventPayload`, not `payload` — Nim's template - ## substitution would otherwise rewrite the `payload:` field inside - ## `EventEnvelope`. + ## Typed CBOR variant of `dispatchFFIEvent`. The param is `eventPayload` + ## (not `payload`) to avoid clobbering `EventEnvelope.payload` substitution. block: let evtName: string = eventName var (dataPtr, dataLen) = cborEncodeShared( diff --git a/ffi/ffi_thread.nim b/ffi/ffi_thread.nim index 1de1643..94f561d 100644 --- a/ffi/ffi_thread.nim +++ b/ffi/ffi_thread.nim @@ -23,15 +23,11 @@ proc sendRequestToFFIThread*( "reentrant ffi call: a handler invoked sendRequestToFFIThread on its own context" ) - # All async submissions serialise on `ctx.lock` for the full - # trySend + fireSync + waitSync sequence because `reqChannel` is - # single-producer and `reqReceivedSignal` is shared across callers. - # Multi-producer redesign is tracked as PR #23 review item 7. + # Serialise the trySend + fireSync + waitSync — reqChannel is SP and reqReceivedSignal is shared. ctx.lock.acquire() defer: ctx.lock.release() - ## Sending the request let sentOk = ctx.reqChannel.trySend(ffiRequest) if not sentOk: deleteRequest(ffiRequest) @@ -46,15 +42,12 @@ proc sendRequestToFFIThread*( deleteRequest(ffiRequest) return err("Couldn't fireSync in time") - ## wait until the FFI working thread properly received the request let res = ctx.reqReceivedSignal.waitSync(timeout) if res.isErr(): - ## Do not free ffiRequest here: the FFI thread was already signaled and - ## will process (and free) it. + # FFI thread was signaled and owns the request; don't double-free. return err("Couldn't receive reqReceivedSignal signal") - ## Notice that in case of "ok", the deallocShared(req) is performed by the FFI Thread in the - ## process proc. + # On ok the FFI thread's processRequest deallocShared(req)'s. ok() proc processRequest[T]( @@ -63,40 +56,27 @@ proc processRequest[T]( ## Invoked within the FFI thread to process a request coming from the FFI API consumer thread. let reqId = $request[].reqId - ## The reqId determines which proc will handle the request. - ## The registeredRequests represents a table defined at compile time. - ## Then, registeredRequests == Table[reqId, proc-handling-the-request-asynchronously] - - ## Explicit conversion keeps `reqId` alive as the backing string, - ## avoiding the implicit string→cstring warning that will become an error. - let reqIdCs = reqId.cstring + let reqIdCs = reqId.cstring # keeps reqId alive; implicit string→cstring is a warning. let retFut = if not ctx[].registeredRequests[].contains(reqIdCs): - ## That shouldn't happen because only registered requests should be sent to the FFI thread. nilProcess(request[].reqId) else: ctx[].registeredRequests[][reqIdCs](cast[pointer](request), ctx) - ## Catch every catchable exception (including CancelledError raised by - ## the shutdown drain in ffiRun) so handleRes — and its `deleteRequest` - ## defer — always runs. Otherwise an abandoned in-flight handler would - ## leak its request envelope, reqId copy, and CBOR payload. + # CatchableError covers CancelledError from the shutdown drain; handleRes must still run. let res = try: await retFut - except CatchableError as exc: + except CatchableError as e: Result[seq[byte], string].err( - "Error in processRequest for " & reqId & ": " & exc.msg + "Error in processRequest for " & reqId & ": " & e.msg ) - ## handleRes may raise (OOM, GC setup) even though it is rare. Catching here - ## keeps the async proc raises:[] compatible. The defer inside handleRes - ## guarantees request is freed before the exception propagates. try: handleRes(res, request) - except Exception as exc: - error "Unexpected exception in handleRes", error = exc.msg + except Exception as e: + error "Unexpected exception in handleRes", error = e.msg var ffiEventQueueSignalPtr {.threadvar.}: ThreadSignalPtr # Stashed so the hook has no closure env. @@ -126,23 +106,18 @@ proc ffiThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} = error "failed to fire threadExitSignal on FFI thread exit", err = fireRes.error let ffiRun = proc(ctx: ptr FFIContext[T]) {.async.} = - var ffiReqHandler: T - ## Holds the main library object, i.e., in charge of handling the ffi requests. - ## e.g., Waku, LibP2P, SDS, etc. + var ffiReqHandler: T # main library object (Waku, LibP2P, SDS, …) - ## In-flight processRequest futures. Tracked so they can be drained on - ## shutdown — otherwise destroying the context while a handler is - ## awaiting (e.g. sleepAsync) abandons the future and leaks the - ## request's envelope/reqId/payload allocations. + # Tracked so shutdown can drain them; abandoning a mid-await future leaks the request. var pending: seq[Future[void]] = @[] proc reapCompleted() = var i = 0 while i < pending.len: - if pending[i].finished(): - pending.del(i) - else: + if not pending[i].finished(): inc i + continue + pending.del(i) while ctx.running.load(): # Freezes if a sync handler blocks the dispatcher; event thread reads to detect wedged FFI thread. @@ -154,7 +129,6 @@ proc ffiThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} = if not gotSignal: continue - ## Wait for a request from the ffi consumer thread var request: ptr FFIThreadRequest if not ctx.reqChannel.tryRecv(request): continue @@ -162,22 +136,18 @@ proc ffiThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} = if ctx.myLib.isNil(): ctx.myLib = addr ffiReqHandler - ## Handle the request pending.add processRequest(request, ctx) let fireRes = ctx.reqReceivedSignal.fireSync() if fireRes.isErr(): error "could not fireSync back to requester thread", error = fireRes.error - ## Drain in-flight handlers so each request's `deleteRequest` runs - ## before we exit. Without this, abandoning a future mid-await would - ## leak the request allocations (visible to LSan; previously hidden - ## because Nim's pool allocator kept the chunks alive in the process). + # Drain so each pending handler's deleteRequest defer runs before exit. reapCompleted() if pending.len > 0: try: await allFutures(pending) - except CatchableError as exc: - error "draining pending FFI requests on shutdown raised", error = exc.msg + except CatchableError as e: + error "draining pending FFI requests on shutdown raised", error = e.msg waitFor ffiRun(ctx) diff --git a/tests/unit/test_event_thread.nim b/tests/unit/test_event_thread.nim index 5960a5d..51f8829 100644 --- a/tests/unit/test_event_thread.nim +++ b/tests/unit/test_event_thread.nim @@ -40,7 +40,7 @@ proc captureCb( acquire(d[].lock) d[].retCode = retCode let n = min(int(len), d[].msg.len) - if n > 0 and not msg.isNil: + if n > 0 and not msg.isNil(): copyMem(addr d[].msg[0], msg, n) d[].msgLen = n d[].called = true