mirror of
https://github.com/logos-messaging/nim-ffi.git
synced 2026-06-21 08:49:34 +00:00
chore: code cleanup
This commit is contained in:
parent
c7c9329abd
commit
a429ee9e31
@ -80,8 +80,8 @@ proc onNotResponding*(ctx: ptr FFIContext) =
|
||||
let event =
|
||||
try:
|
||||
encodeNotRespondingEvent()
|
||||
except CatchableError as exc:
|
||||
chronicles.error "onNotResponding - encode failed", err = exc.msg
|
||||
except CatchableError as e:
|
||||
chronicles.error "onNotResponding - encode failed", err = e.msg
|
||||
return
|
||||
let dataPtr: pointer =
|
||||
if event.len > 0: unsafeAddr event[0]
|
||||
@ -166,9 +166,9 @@ proc processRequest[T](
|
||||
let res =
|
||||
try:
|
||||
await retFut
|
||||
except CatchableError as exc:
|
||||
except CatchableError as e:
|
||||
Result[seq[byte], string].err(
|
||||
"Error in processRequest for " & reqId & ": " & exc.msg
|
||||
"Error in processRequest for " & reqId & ": " & e.msg
|
||||
)
|
||||
|
||||
## handleRes may raise (OOM, GC setup) even though it is rare. Catching here
|
||||
@ -176,8 +176,8 @@ proc processRequest[T](
|
||||
## guarantees request is freed before the exception propagates.
|
||||
try:
|
||||
handleRes(res, request)
|
||||
except Exception as exc:
|
||||
error "Unexpected exception in handleRes", error = exc.msg
|
||||
except Exception as e:
|
||||
error "Unexpected exception in handleRes", error = e.msg
|
||||
|
||||
var ffiEventQueueSignalPtr {.threadvar.}: ThreadSignalPtr
|
||||
## Stashed so the hook below has no closure environment.
|
||||
@ -258,22 +258,15 @@ proc ffiThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} =
|
||||
if pending.len > 0:
|
||||
try:
|
||||
await allFutures(pending)
|
||||
except CatchableError as exc:
|
||||
error "draining pending FFI requests on shutdown raised",
|
||||
error = exc.msg
|
||||
except CatchableError as e:
|
||||
error "draining pending FFI requests on shutdown raised", error = e.msg
|
||||
|
||||
waitFor ffiRun(ctx)
|
||||
|
||||
proc freeQueuedEventPayload(qe: QueuedEvent) =
|
||||
if not qe.name.isNil:
|
||||
c_free(cast[pointer](qe.name))
|
||||
if not qe.data.isNil:
|
||||
c_free(qe.data)
|
||||
|
||||
proc dispatchQueuedEvent[T](ctx: ptr FFIContext[T], qe: QueuedEvent) =
|
||||
## Frees `qe`'s c_malloc buffers on exit.
|
||||
defer:
|
||||
freeQueuedEventPayload(qe)
|
||||
freeEventBuffers(qe.name, qe.data)
|
||||
ctx.dispatchToListeners($qe.name, qe.data, qe.dataLen)
|
||||
|
||||
proc drainEventQueue[T](ctx: ptr FFIContext[T]) =
|
||||
@ -342,8 +335,8 @@ proc eventThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} =
|
||||
|
||||
try:
|
||||
waitFor eventRun(ctx)
|
||||
except CatchableError as exc:
|
||||
error "event thread exited with exception", error = exc.msg
|
||||
except CatchableError as e:
|
||||
error "event thread exited with exception", error = e.msg
|
||||
|
||||
proc deinitContextResources*[T](ctx: ptr FFIContext[T]): Result[void, string] =
|
||||
## Mirror of `initContextResources`: tears down lock, registry, queue,
|
||||
@ -375,24 +368,16 @@ proc deinitContextResources*[T](ctx: ptr FFIContext[T]): Result[void, string] =
|
||||
## leak is bounded.
|
||||
discard
|
||||
else:
|
||||
if not ctx.reqSignal.isNil():
|
||||
?ctx.reqSignal.close()
|
||||
ctx.reqSignal = nil
|
||||
if not ctx.reqReceivedSignal.isNil():
|
||||
?ctx.reqReceivedSignal.close()
|
||||
ctx.reqReceivedSignal = nil
|
||||
if not ctx.stopSignal.isNil():
|
||||
?ctx.stopSignal.close()
|
||||
ctx.stopSignal = nil
|
||||
if not ctx.threadExitSignal.isNil():
|
||||
?ctx.threadExitSignal.close()
|
||||
ctx.threadExitSignal = nil
|
||||
if not ctx.eventQueueSignal.isNil():
|
||||
?ctx.eventQueueSignal.close()
|
||||
ctx.eventQueueSignal = nil
|
||||
if not ctx.eventThreadExitSignal.isNil():
|
||||
?ctx.eventThreadExitSignal.close()
|
||||
ctx.eventThreadExitSignal = nil
|
||||
template closeAndNil(field: untyped) =
|
||||
if not field.isNil():
|
||||
?field.close()
|
||||
field = nil
|
||||
closeAndNil(ctx.reqSignal)
|
||||
closeAndNil(ctx.reqReceivedSignal)
|
||||
closeAndNil(ctx.stopSignal)
|
||||
closeAndNil(ctx.threadExitSignal)
|
||||
closeAndNil(ctx.eventQueueSignal)
|
||||
closeAndNil(ctx.eventThreadExitSignal)
|
||||
ok()
|
||||
|
||||
proc cleanUpResources[T](ctx: ptr FFIContext[T]): Result[void, string] =
|
||||
@ -468,26 +453,32 @@ proc initContextResources*[T](ctx: ptr FFIContext[T]): Result[void, string] =
|
||||
success = true
|
||||
ok()
|
||||
|
||||
proc fireOrErr(sig: ThreadSignalPtr, name: string): Result[void, string] =
|
||||
let fired = sig.fireSync().valueOr:
|
||||
return err("error signaling " & name & ": " & $error)
|
||||
if not fired:
|
||||
return err("failed to signal " & name & " on time")
|
||||
ok()
|
||||
|
||||
proc waitExitOrErr(
|
||||
sig: ThreadSignalPtr, name: string, timeout: Duration
|
||||
): Result[void, string] =
|
||||
let exited = sig.waitSync(timeout).valueOr:
|
||||
return err("error waiting for " & name & " exit: " & $error)
|
||||
if not exited:
|
||||
return err(name & " did not exit in time; leaking ctx to avoid hang")
|
||||
ok()
|
||||
|
||||
proc signalStop*[T](ctx: ptr FFIContext[T]): Result[void, string] =
|
||||
# Error paths intentionally skip onNotResponding: a back-pressuring
|
||||
# listener may hold reg.lock, and onNotResponding takes it — would
|
||||
# amplify the stuck state into a deadlock instead of escaping it.
|
||||
ctx.running.store(false)
|
||||
let reqSignaled = ctx.reqSignal.fireSync().valueOr:
|
||||
return err("error signaling reqSignal in signalStop: " & $error)
|
||||
if not reqSignaled:
|
||||
return err("failed to signal reqSignal on time in signalStop")
|
||||
let stopSignaled = ctx.stopSignal.fireSync().valueOr:
|
||||
return err("error signaling stopSignal in signalStop: " & $error)
|
||||
if not stopSignaled:
|
||||
return err("failed to signal stopSignal on time in signalStop")
|
||||
?ctx.reqSignal.fireOrErr("reqSignal")
|
||||
?ctx.stopSignal.fireOrErr("stopSignal")
|
||||
# Non-fatal: event thread will see running==false on the next tick.
|
||||
let evtSignaled = ctx.eventQueueSignal.fireSync()
|
||||
if evtSignaled.isErr():
|
||||
error "failed to signal eventQueueSignal in signalStop",
|
||||
error = evtSignaled.error
|
||||
elif evtSignaled.get() == false:
|
||||
error "failed to signal eventQueueSignal on time in signalStop"
|
||||
ctx.eventQueueSignal.fireOrErr("eventQueueSignal").isOkOr:
|
||||
error "failed to signal eventQueueSignal in signalStop", error = error
|
||||
ok()
|
||||
|
||||
## If the FFI thread's event loop is blocked by a synchronous handler
|
||||
@ -507,20 +498,9 @@ proc stopAndJoinThreads*[T](ctx: ptr FFIContext[T]): Result[void, string] =
|
||||
ctx.signalStop().isOkOr:
|
||||
return err("signalStop failed: " & $error)
|
||||
|
||||
let ffiExitedOnTime = ctx.threadExitSignal.waitSync(ThreadExitTimeout).valueOr:
|
||||
return err("error waiting for FFI thread exit: " & $error)
|
||||
|
||||
if not ffiExitedOnTime:
|
||||
return err("FFI thread did not exit in time; leaking ctx to avoid hang")
|
||||
|
||||
?ctx.threadExitSignal.waitExitOrErr("FFI thread", ThreadExitTimeout)
|
||||
joinThread(ctx.ffiThread)
|
||||
|
||||
let evtExitedOnTime = ctx.eventThreadExitSignal.waitSync(ThreadExitTimeout).valueOr:
|
||||
return err("error waiting for event thread exit: " & $error)
|
||||
|
||||
if not evtExitedOnTime:
|
||||
return err("event thread did not exit in time; leaking ctx to avoid hang")
|
||||
|
||||
?ctx.eventThreadExitSignal.waitExitOrErr("event thread", ThreadExitTimeout)
|
||||
joinThread(ctx.eventThread)
|
||||
ok()
|
||||
|
||||
|
||||
@ -168,14 +168,18 @@ proc initEventQueue*(q: var EventQueue) {.raises: [].} =
|
||||
for i in 0 ..< EventQueueCapacity:
|
||||
q.buf[i] = QueuedEvent(name: nil, data: nil, dataLen: 0)
|
||||
|
||||
proc freeEventBuffers*(
|
||||
name: cstring, data: ptr UncheckedArray[byte]
|
||||
) {.raises: [], gcsafe.} =
|
||||
if not name.isNil():
|
||||
c_free(cast[pointer](name))
|
||||
if not data.isNil():
|
||||
c_free(data)
|
||||
|
||||
proc deinitEventQueue*(q: var EventQueue) {.raises: [].} =
|
||||
## Both producer and consumer must have stopped before calling.
|
||||
for i in 0 ..< EventQueueCapacity:
|
||||
let e = q.buf[i]
|
||||
if not e.name.isNil:
|
||||
c_free(cast[pointer](e.name))
|
||||
if not e.data.isNil:
|
||||
c_free(e.data)
|
||||
freeEventBuffers(q.buf[i].name, q.buf[i].data)
|
||||
q.buf[i] = QueuedEvent(name: nil, data: nil, dataLen: 0)
|
||||
q.head = 0
|
||||
q.tail = 0
|
||||
@ -265,17 +269,11 @@ template enqueueOrMarkStuck(
|
||||
let q = ffiCurrentEventQueue
|
||||
if q.isNil():
|
||||
chronicles.error "event queue not set on this thread", event = eventName
|
||||
if not namePtr.isNil:
|
||||
c_free(cast[pointer](namePtr))
|
||||
if not dataPtr.isNil:
|
||||
c_free(dataPtr)
|
||||
freeEventBuffers(namePtr, dataPtr)
|
||||
elif not q[].tryEnqueueEvent(namePtr, dataPtr, dataLen):
|
||||
chronicles.error "event queue full; library marked stuck",
|
||||
event = eventName, capacity = EventQueueCapacity
|
||||
if not namePtr.isNil:
|
||||
c_free(cast[pointer](namePtr))
|
||||
if not dataPtr.isNil:
|
||||
c_free(dataPtr)
|
||||
freeEventBuffers(namePtr, dataPtr)
|
||||
if not ffiCurrentEventQueueStuck.isNil():
|
||||
ffiCurrentEventQueueStuck[].store(true)
|
||||
if not ffiCurrentNotifyEventEnqueued.isNil():
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user