diff --git a/ffi/ffi_context.nim b/ffi/ffi_context.nim index 17a4412..126111b 100644 --- a/ffi/ffi_context.nim +++ b/ffi/ffi_context.nim @@ -80,8 +80,8 @@ proc onNotResponding*(ctx: ptr FFIContext) = let event = try: encodeNotRespondingEvent() - except CatchableError as exc: - chronicles.error "onNotResponding - encode failed", err = exc.msg + except CatchableError as e: + chronicles.error "onNotResponding - encode failed", err = e.msg return let dataPtr: pointer = if event.len > 0: unsafeAddr event[0] @@ -166,9 +166,9 @@ proc processRequest[T]( let res = try: await retFut - except CatchableError as exc: + except CatchableError as e: Result[seq[byte], string].err( - "Error in processRequest for " & reqId & ": " & exc.msg + "Error in processRequest for " & reqId & ": " & e.msg ) ## handleRes may raise (OOM, GC setup) even though it is rare. Catching here @@ -176,8 +176,8 @@ proc processRequest[T]( ## guarantees request is freed before the exception propagates. try: handleRes(res, request) - except Exception as exc: - error "Unexpected exception in handleRes", error = exc.msg + except Exception as e: + error "Unexpected exception in handleRes", error = e.msg var ffiEventQueueSignalPtr {.threadvar.}: ThreadSignalPtr ## Stashed so the hook below has no closure environment. @@ -258,22 +258,15 @@ proc ffiThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} = if pending.len > 0: try: await allFutures(pending) - except CatchableError as exc: - error "draining pending FFI requests on shutdown raised", - error = exc.msg + except CatchableError as e: + error "draining pending FFI requests on shutdown raised", error = e.msg waitFor ffiRun(ctx) -proc freeQueuedEventPayload(qe: QueuedEvent) = - if not qe.name.isNil: - c_free(cast[pointer](qe.name)) - if not qe.data.isNil: - c_free(qe.data) - proc dispatchQueuedEvent[T](ctx: ptr FFIContext[T], qe: QueuedEvent) = ## Frees `qe`'s c_malloc buffers on exit. defer: - freeQueuedEventPayload(qe) + freeEventBuffers(qe.name, qe.data) ctx.dispatchToListeners($qe.name, qe.data, qe.dataLen) proc drainEventQueue[T](ctx: ptr FFIContext[T]) = @@ -342,8 +335,8 @@ proc eventThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} = try: waitFor eventRun(ctx) - except CatchableError as exc: - error "event thread exited with exception", error = exc.msg + except CatchableError as e: + error "event thread exited with exception", error = e.msg proc deinitContextResources*[T](ctx: ptr FFIContext[T]): Result[void, string] = ## Mirror of `initContextResources`: tears down lock, registry, queue, @@ -375,24 +368,16 @@ proc deinitContextResources*[T](ctx: ptr FFIContext[T]): Result[void, string] = ## leak is bounded. discard else: - if not ctx.reqSignal.isNil(): - ?ctx.reqSignal.close() - ctx.reqSignal = nil - if not ctx.reqReceivedSignal.isNil(): - ?ctx.reqReceivedSignal.close() - ctx.reqReceivedSignal = nil - if not ctx.stopSignal.isNil(): - ?ctx.stopSignal.close() - ctx.stopSignal = nil - if not ctx.threadExitSignal.isNil(): - ?ctx.threadExitSignal.close() - ctx.threadExitSignal = nil - if not ctx.eventQueueSignal.isNil(): - ?ctx.eventQueueSignal.close() - ctx.eventQueueSignal = nil - if not ctx.eventThreadExitSignal.isNil(): - ?ctx.eventThreadExitSignal.close() - ctx.eventThreadExitSignal = nil + template closeAndNil(field: untyped) = + if not field.isNil(): + ?field.close() + field = nil + closeAndNil(ctx.reqSignal) + closeAndNil(ctx.reqReceivedSignal) + closeAndNil(ctx.stopSignal) + closeAndNil(ctx.threadExitSignal) + closeAndNil(ctx.eventQueueSignal) + closeAndNil(ctx.eventThreadExitSignal) ok() proc cleanUpResources[T](ctx: ptr FFIContext[T]): Result[void, string] = @@ -468,26 +453,32 @@ proc initContextResources*[T](ctx: ptr FFIContext[T]): Result[void, string] = success = true ok() +proc fireOrErr(sig: ThreadSignalPtr, name: string): Result[void, string] = + let fired = sig.fireSync().valueOr: + return err("error signaling " & name & ": " & $error) + if not fired: + return err("failed to signal " & name & " on time") + ok() + +proc waitExitOrErr( + sig: ThreadSignalPtr, name: string, timeout: Duration +): Result[void, string] = + let exited = sig.waitSync(timeout).valueOr: + return err("error waiting for " & name & " exit: " & $error) + if not exited: + return err(name & " did not exit in time; leaking ctx to avoid hang") + ok() + proc signalStop*[T](ctx: ptr FFIContext[T]): Result[void, string] = # Error paths intentionally skip onNotResponding: a back-pressuring # listener may hold reg.lock, and onNotResponding takes it — would # amplify the stuck state into a deadlock instead of escaping it. ctx.running.store(false) - let reqSignaled = ctx.reqSignal.fireSync().valueOr: - return err("error signaling reqSignal in signalStop: " & $error) - if not reqSignaled: - return err("failed to signal reqSignal on time in signalStop") - let stopSignaled = ctx.stopSignal.fireSync().valueOr: - return err("error signaling stopSignal in signalStop: " & $error) - if not stopSignaled: - return err("failed to signal stopSignal on time in signalStop") + ?ctx.reqSignal.fireOrErr("reqSignal") + ?ctx.stopSignal.fireOrErr("stopSignal") # Non-fatal: event thread will see running==false on the next tick. - let evtSignaled = ctx.eventQueueSignal.fireSync() - if evtSignaled.isErr(): - error "failed to signal eventQueueSignal in signalStop", - error = evtSignaled.error - elif evtSignaled.get() == false: - error "failed to signal eventQueueSignal on time in signalStop" + ctx.eventQueueSignal.fireOrErr("eventQueueSignal").isOkOr: + error "failed to signal eventQueueSignal in signalStop", error = error ok() ## If the FFI thread's event loop is blocked by a synchronous handler @@ -507,20 +498,9 @@ proc stopAndJoinThreads*[T](ctx: ptr FFIContext[T]): Result[void, string] = ctx.signalStop().isOkOr: return err("signalStop failed: " & $error) - let ffiExitedOnTime = ctx.threadExitSignal.waitSync(ThreadExitTimeout).valueOr: - return err("error waiting for FFI thread exit: " & $error) - - if not ffiExitedOnTime: - return err("FFI thread did not exit in time; leaking ctx to avoid hang") - + ?ctx.threadExitSignal.waitExitOrErr("FFI thread", ThreadExitTimeout) joinThread(ctx.ffiThread) - - let evtExitedOnTime = ctx.eventThreadExitSignal.waitSync(ThreadExitTimeout).valueOr: - return err("error waiting for event thread exit: " & $error) - - if not evtExitedOnTime: - return err("event thread did not exit in time; leaking ctx to avoid hang") - + ?ctx.eventThreadExitSignal.waitExitOrErr("event thread", ThreadExitTimeout) joinThread(ctx.eventThread) ok() diff --git a/ffi/ffi_events.nim b/ffi/ffi_events.nim index cfcd0ff..ade364b 100644 --- a/ffi/ffi_events.nim +++ b/ffi/ffi_events.nim @@ -168,14 +168,18 @@ proc initEventQueue*(q: var EventQueue) {.raises: [].} = for i in 0 ..< EventQueueCapacity: q.buf[i] = QueuedEvent(name: nil, data: nil, dataLen: 0) +proc freeEventBuffers*( + name: cstring, data: ptr UncheckedArray[byte] +) {.raises: [], gcsafe.} = + if not name.isNil(): + c_free(cast[pointer](name)) + if not data.isNil(): + c_free(data) + proc deinitEventQueue*(q: var EventQueue) {.raises: [].} = ## Both producer and consumer must have stopped before calling. for i in 0 ..< EventQueueCapacity: - let e = q.buf[i] - if not e.name.isNil: - c_free(cast[pointer](e.name)) - if not e.data.isNil: - c_free(e.data) + freeEventBuffers(q.buf[i].name, q.buf[i].data) q.buf[i] = QueuedEvent(name: nil, data: nil, dataLen: 0) q.head = 0 q.tail = 0 @@ -265,17 +269,11 @@ template enqueueOrMarkStuck( let q = ffiCurrentEventQueue if q.isNil(): chronicles.error "event queue not set on this thread", event = eventName - if not namePtr.isNil: - c_free(cast[pointer](namePtr)) - if not dataPtr.isNil: - c_free(dataPtr) + freeEventBuffers(namePtr, dataPtr) elif not q[].tryEnqueueEvent(namePtr, dataPtr, dataLen): chronicles.error "event queue full; library marked stuck", event = eventName, capacity = EventQueueCapacity - if not namePtr.isNil: - c_free(cast[pointer](namePtr)) - if not dataPtr.isNil: - c_free(dataPtr) + freeEventBuffers(namePtr, dataPtr) if not ffiCurrentEventQueueStuck.isNil(): ffiCurrentEventQueueStuck[].store(true) if not ffiCurrentNotifyEventEnqueued.isNil():