mirror of https://github.com/logos-messaging/nim-ffi.git (synced 2026-05-11 20:59:32 +00:00)

commit d427166734 (parent 4290e8893a)
better onDestroy sync
@@ -24,6 +24,12 @@ type FFIContext*[T] = object
reqSignal: ThreadSignalPtr # to notify the FFI Thread that a new request is sent
reqReceivedSignal: ThreadSignalPtr
# to signal main thread, interfacing with the FFI thread, that FFI thread received the request
stopSignal: ThreadSignalPtr
# fired by destroyFFIContext so both ffiThread and watchdogThread can exit promptly
threadExitSignal: ThreadSignalPtr
# fired by ffiThread just before it exits; destroyFFIContext waits on
# this with a bounded timeout instead of joining unconditionally, so a
# blocked event loop cannot hang the caller forever
userData*: pointer
callbackState*: FFICallbackState
running: Atomic[bool] # To control when the threads are running
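
For orientation, a minimal sketch of the shutdown handshake the new stopSignal/threadExitSignal fields enable, using chronos ThreadSignalPtr's fireSync/waitSync just as the hunks below do; the proc names (workerBody, boundedShutdown) and the 1500 ms value are illustrative only, not part of the library:

# --- illustrative sketch (not part of the diff) ---
import chronos, chronos/threadsync, results

proc workerBody(exitSignal: ThreadSignalPtr) {.thread.} =
  # The worker announces its own exit unconditionally, even on error paths,
  # by firing the signal from a defer block.
  defer:
    discard exitSignal.fireSync()
  # ... serve requests / run the event loop here ...

proc boundedShutdown(exitSignal: ThreadSignalPtr): bool =
  # Returns true when the worker exited within the timeout (safe to join),
  # false when it is stuck and the caller should leak instead of hanging.
  let res = exitSignal.waitSync(1500.milliseconds)
  res.isOk() and res.get()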
@@ -140,11 +146,14 @@ proc watchdogThreadBody(ctx: ptr FFIContext) {.thread.} =
const WatchdogTimeout = 20.seconds

# Give time for the node to be created and up before sending watchdog requests
await sleepAsync(WatchdogStartDelay)
while true:
await sleepAsync(WatchdogTimeinterval)
let initialStop = await ctx.stopSignal.wait().withTimeout(WatchdogStartDelay)
if initialStop or ctx.running.load == false:
return

if ctx.running.load == false:
while true:
let intervalStop = await ctx.stopSignal.wait().withTimeout(WatchdogTimeinterval)

if intervalStop or ctx.running.load == false:
debug "Watchdog thread exiting because FFIContext is not running"
break

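The watchdog hunk above replaces plain sleepAsync calls with stop-aware waits. A minimal sketch of that pattern, assuming chronos semantics where withTimeout returns true if the awaited future completed before the deadline; pollUntilStopped and its parameters are illustrative names:

# --- illustrative sketch (not part of the diff) ---
import chronos, chronos/threadsync

proc pollUntilStopped(stop: ThreadSignalPtr, interval: Duration) {.async.} =
  while true:
    # Either the interval elapses (false) or the stop signal fires (true).
    let stopped = await stop.wait().withTimeout(interval)
    if stopped:
      break   # shutdown requested: exit the loop promptly
    # interval elapsed with no stop request: run one watchdog check here
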
@@ -209,22 +218,27 @@ proc ffiThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} =

logging.setupLog(logging.LogLevel.DEBUG, logging.LogFormat.TEXT)

defer:
# Signal destroyFFIContext that this thread has exited, so its bounded
# wait can unblock and proceed with cleanup.
let fireRes = ctx.threadExitSignal.fireSync()
if fireRes.isErr():
error "failed to fire threadExitSignal on FFI thread exit",
err = fireRes.error

let ffiRun = proc(ctx: ptr FFIContext[T]) {.async.} =
var ffiReqHandler: T
## Holds the main library object, i.e., in charge of handling the ffi requests.
## e.g., Waku, LibP2P, SDS, etc.

while true:
await ctx.reqSignal.wait()

if ctx.running.load == false:
break
while ctx.running.load():
let gotSignal = await ctx.reqSignal.wait().withTimeout(100.milliseconds)
if not gotSignal:
continue

## Wait for a request from the ffi consumer thread
var request: ptr FFIThreadRequest
let recvOk = ctx.reqChannel.tryRecv(request)
if not recvOk:
chronicles.error "ffi thread could not receive a request"
if not ctx.reqChannel.tryRecv(request):
continue

if ctx.myLib.isNil():
@@ -243,10 +257,36 @@ proc cleanUpResources[T](ctx: ptr FFIContext[T]): Result[void, string] =
defer:
freeShared(ctx)
ctx.lock.deinitLock()
if not ctx.reqSignal.isNil():
?ctx.reqSignal.close()
if not ctx.reqReceivedSignal.isNil():
?ctx.reqReceivedSignal.close()
when defined(gcRefc):
## ThreadSignalPtr.close() is intentionally skipped under --mm:refc.
##
## close() goes through chronos's safeUnregisterAndCloseFd, which calls
## getThreadDispatcher() and lazily allocates a new Selector for the
## main thread. With refc and a heavy ref-object graph torn down by the
## FFI thread (libwaku/libp2p), that allocation traps inside rawNewObj
## and the refc signal handler re-enters the same allocator — the
## process never returns. Captured stack from a hung process:
## close → safeUnregisterAndCloseFd → getThreadDispatcher →
## newDispatcher → Selector.new → newObj (gc.nim:488) →
## rawNewObj (gc.nim:470) → rawNewObj → _sigtramp → signalHandler →
## newObjNoInit → addNewObjToZCT (infinite re-entry)
##
## --mm:orc does NOT exhibit this bug; see the
## "destroyFFIContext refc workaround" suite in tests/test_ffi_context.nim
## (test "destroy after heavy ref-allocation workload returns promptly").
## The signal fds (a few per ctx) are reclaimed by the OS at process
## exit; destroyFFIContext is called once per process lifetime, so the
## leak is bounded.
discard
else:
if not ctx.reqSignal.isNil():
?ctx.reqSignal.close()
if not ctx.reqReceivedSignal.isNil():
?ctx.reqReceivedSignal.close()
if not ctx.stopSignal.isNil():
?ctx.stopSignal.close()
if not ctx.threadExitSignal.isNil():
?ctx.threadExitSignal.close()
return ok()

proc createFFIContext*[T](): Result[ptr FFIContext[T], string] =
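cleanUpResources above relies on the results library's `?` operator: each `?signal.close()` unwraps the Result and makes cleanUpResources return that error immediately if a close fails. A small standalone sketch of the same propagation; closeOne/closeAll and the fake resource names are illustrative:

# --- illustrative sketch (not part of the diff) ---
import results

proc closeOne(name: string, succeeds: bool): Result[void, string] =
  if succeeds:
    return ok()
  return err("could not close " & name)

proc closeAll(): Result[void, string] =
  # `?` unwraps a Result; on err it makes closeAll return that error at once.
  ?closeOne("reqSignal", true)
  ?closeOne("stopSignal", false)        # closeAll returns here
  ?closeOne("threadExitSignal", true)   # never reached
  ok()

assert closeAll().error == "could not close stopSignal"
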
@@ -255,16 +295,24 @@ proc createFFIContext*[T](): Result[ptr FFIContext[T], string] =
var ctx = createShared(FFIContext[T], 1)
ctx.lock.initLock()

var success = false
defer:
if not success:
ctx.cleanUpResources().isOkOr:
error "failed to clean up resources after createFFIContext failure",
err = error

ctx.reqSignal = ThreadSignalPtr.new().valueOr:
ctx.cleanUpResources().isOkOr:
return err("could not clean resources in a failure new reqSignal: " & $error)
return err("couldn't create reqSignal ThreadSignalPtr: " & $error)

ctx.reqReceivedSignal = ThreadSignalPtr.new().valueOr:
ctx.cleanUpResources().isOkOr:
return
err("could not clean resources in a failure new reqReceivedSignal: " & $error)
return err("couldn't create reqReceivedSignal ThreadSignalPtr")
return err("couldn't create reqReceivedSignal ThreadSignalPtr: " & $error)

ctx.stopSignal = ThreadSignalPtr.new().valueOr:
return err("couldn't create stopSignal ThreadSignalPtr: " & $error)

ctx.threadExitSignal = ThreadSignalPtr.new().valueOr:
return err("couldn't create threadExitSignal ThreadSignalPtr: " & $error)

ctx.registeredRequests = addr ffi_types.registeredRequests

@@ -273,31 +321,33 @@ proc createFFIContext*[T](): Result[ptr FFIContext[T], string] =
try:
createThread(ctx.ffiThread, ffiThreadBody[T], ctx)
except ValueError, ResourceExhaustedError:
ctx.cleanUpResources().isOkOr:
error "failed to clean up resources after ffiThread creation failure", err = error
return err("failed to create the FFI thread: " & getCurrentExceptionMsg())

try:
createThread(ctx.watchdogThread, watchdogThreadBody, ctx)
except ValueError, ResourceExhaustedError:
## ffiThread is already running; signal it to exit and join before the
## deferred cleanUpResources closes the signals it's waiting on.
ctx.running.store(false)
let fireRes = ctx.reqSignal.fireSync()
if fireRes.isErr():
error "failed to signal ffiThread during watchdog cleanup", err = fireRes.error
error "failed to signal ffiThread during watchdog cleanup",
err = fireRes.error
joinThread(ctx.ffiThread)
ctx.cleanUpResources().isOkOr:
error "failed to clean up resources after watchdogThread creation failure", err = error
return err("failed to create the watchdog thread: " & getCurrentExceptionMsg())

success = true
return ok(ctx)

proc destroyFFIContext*[T](ctx: ptr FFIContext[T]): Result[void, string] =
## If the FFI thread's event loop is blocked by a synchronous handler
## (e.g. blocking I/O), it cannot process reqSignal in time to exit.
## In that case we leak ctx and the thread rather than hanging forever:
## the thread will eventually exit on its own, but cleanup is skipped
## because the thread may still be touching ctx fields.
const ThreadExitTimeout = 1500.milliseconds

ctx.running.store(false)
defer:
joinThread(ctx.ffiThread)
joinThread(ctx.watchdogThread)
ctx.cleanUpResources().isOkOr:
error "failed to clean up resources in destroyFFIContext", err = error

let signaledOnTime = ctx.reqSignal.fireSync().valueOr:
ctx.onNotResponding()
@@ -306,6 +356,27 @@ proc destroyFFIContext*[T](ctx: ptr FFIContext[T]): Result[void, string] =
ctx.onNotResponding()
return err("failed to signal reqSignal on time in destroyFFIContext")

ctx.stopSignal.fireSync().isOkOr:
error "failed to fire stopSignal in destroyFFIContext", err = $error

## Bounded wait for ffiThread to exit. waitSync blocks the calling thread
## up to the timeout; ffiThread fires threadExitSignal in its defer block.
let exitedOnTime = ctx.threadExitSignal.waitSync(ThreadExitTimeout).valueOr:
ctx.onNotResponding()
return err("error waiting for FFI thread exit: " & $error)

if not exitedOnTime:
## Event loop is blocked by a synchronous handler. Leak the thread and
## ctx to avoid hanging the caller forever.
ctx.onNotResponding()
return err("FFI thread did not exit in time; leaking ctx to avoid hang")

joinThread(ctx.ffiThread)
joinThread(ctx.watchdogThread)
ctx.cleanUpResources().isOkOr:
error "failed to clean up resources in destroyFFIContext", err = error
return err("cleanUpResources failed: " & $error)

return ok()

template checkParams*(ctx: ptr FFIContext, callback: FFICallBack, userData: pointer) =

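Given the new contract above (destroyFFIContext may return err and intentionally leak ctx instead of hanging), a caller-side sketch of how the result could be handled; MyLib is a placeholder handler type and the decision to simply log and drop the pointer is illustrative, not part of the library:

# --- illustrative sketch (not part of the diff) ---
var ctx = createFFIContext[MyLib]().valueOr:
  quit("createFFIContext failed: " & error)

# ... use the context ...

let destroyRes = destroyFFIContext(ctx)
if destroyRes.isErr():
  # The FFI thread did not exit in time; ctx was intentionally leaked so the
  # caller is not blocked. Do not retry and do not touch ctx again.
  echo "destroyFFIContext: ", destroyRes.error
ctx = nil   # never reuse the pointer, whether destroy succeeded or not
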
@@ -958,11 +958,13 @@ macro ffi*(prc: untyped): untyped =
let `retValOrErrIdent` = `syncHelperCall`
if `retValOrErrIdent`.isErr():
let errStr = `retValOrErrIdent`.error
callback(RET_ERR, unsafeAddr errStr[0], cast[csize_t](errStr.len), userData)
callback(
RET_ERR, cast[ptr cchar](errStr.cstring), cast[csize_t](errStr.len), userData
)
return RET_ERR
let serialized = ffiSerialize(`retValOrErrIdent`.value)
callback(
RET_OK, unsafeAddr serialized[0], cast[csize_t](serialized.len), userData
RET_OK, cast[ptr cchar](serialized.cstring), cast[csize_t](serialized.len), userData
)
return RET_OK

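A small standalone illustration of the difference between the two pointer forms in the hunk above; the empty-string concern is an assumption about the motivation, not stated in the commit:

# --- illustrative sketch (not part of the diff) ---
# A callback taking a raw byte pointer plus a length, as the FFI callbacks do.
proc consume(p: ptr cchar, len: csize_t) =
  if len > 0:
    echo "first byte: ", p[]

let payload = "payload"

# Both forms point at the string's bytes for a non-empty string:
consume(cast[ptr cchar](payload.cstring), csize_t(payload.len))
consume(cast[ptr cchar](unsafeAddr payload[0]), csize_t(payload.len))

# For an empty string, `unsafeAddr s[0]` indexes past the end (IndexDefect
# with bounds checks on), whereas `s.cstring` does not need to index at all.
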
@@ -61,6 +61,57 @@ registerReqFFI(EmptyOkRequest, lib: ptr TestLib):
proc(): Future[Result[string, string]] {.async.} =
return ok("")

registerReqFFI(SlowRequest, lib: ptr TestLib):
proc(): Future[Result[string, string]] {.async.} =
await sleepAsync(500.milliseconds)
return ok("slow-done")

# Coordination channel: the FFI handler signals the test thread the instant
# it is about to block the event loop, so the test can call destroyFFIContext
# while the event loop is truly frozen.
var gSyncBlockStarted: Channel[bool]
gSyncBlockStarted.open()

registerReqFFI(SyncBlockingRequest, lib: ptr TestLib):
proc(): Future[Result[string, string]] {.async.} =
# Yield first so that reqReceivedSignal fires and sendRequestToFFIThread
# returns on the calling thread before we start the synchronous block.
await sleepAsync(0.milliseconds)
# Signal the test thread: the event loop is about to be frozen.
# Channel.send is annotated as raising under refc, so wrap.
try:
gSyncBlockStarted.send(true)
except Exception as exc:
return err("gSyncBlockStarted.send raised: " & exc.msg)
# Simulates a request that blocks the event-loop thread synchronously
# (e.g. w.stop() -> switch.stop() -> connManager.close() with blocking I/O).
# Unlike sleepAsync, os.sleep holds the OS thread and prevents Chronos from
# processing any callbacks -- including the reqSignal fired by destroyFFIContext.
os.sleep(5_000)
return ok("sync-blocking-done")

# Approximates the heavy ref-object workload that libwaku/libp2p performs on
# the FFI thread. The cell count is large enough to force several refc
# GC cycles; under refc this stresses the heap state that, when later combined
# with a chronos Selector allocation on the main thread (via close()), used to
# trip the rawNewObj → signal-handler infinite recursion.
type RefCell = ref object
next: RefCell
payload: array[64, byte]

registerReqFFI(HeavyRefAllocRequest, lib: ptr TestLib):
proc(): Future[Result[string, string]] {.async.} =
var head: RefCell
for i in 0 ..< 50_000:
let n = RefCell(next: head)
head = n
if i mod 1000 == 0:
await sleepAsync(0.milliseconds)
# Let the chain become collectable and yield so refc has a chance to run.
head = nil
await sleepAsync(10.milliseconds)
return ok("heavy-done")

suite "createFFIContext / destroyFFIContext":
test "create and destroy succeeds":
let ctx = createFFIContext[TestLib]().valueOr:
@@ -75,6 +126,115 @@ suite "createFFIContext / destroyFFIContext":
return
check destroyFFIContext(ctx).isOk()

suite "destroyFFIContext does not hang":
test "destroy while a slow async request is still in-flight":
## Reproduces the race where destroyFFIContext was called while a long-
## running async request (e.g. stop_node / w.stop()) was still executing.
## The destroy must return well within 2 seconds; before the fix it would
## block forever on joinThread(ffiThread).
let ctx = createFFIContext[TestLib]().valueOr:
check false
return

var d: CallbackData
initCallbackData(d)
defer: deinitCallbackData(d)

# sendRequestToFFIThread returns as soon as the FFI thread ACKs receipt;
# the 500 ms work continues asynchronously on the FFI thread.
check sendRequestToFFIThread(
ctx, SlowRequest.ffiNewReq(testCallback, addr d)
).isOk()

# Destroy immediately while SlowRequest is still running.
let t0 = Moment.now()
check destroyFFIContext(ctx).isOk()
check (Moment.now() - t0) < 2.seconds

suite "destroyFFIContext does not hang when event loop is blocked":
test "destroy while sync-blocking request is in-flight":
## Reproduces the hang seen in logosdelivery_example.c:
## logosdelivery_stop_node(...) -- triggers w.stop() on the FFI thread
## sleep(1)
## logosdelivery_destroy(...) -- hangs forever
##
## Root cause: w.stop() (and similar tear-down calls) can execute a
## synchronous blocking section that holds the OS thread, preventing
## the Chronos event loop from processing the reqSignal fired by
## destroyFFIContext. The result is joinThread(ffiThread) never returns.
##
## With the fix, destroyFFIContext must complete well within the 5 s that
## SyncBlockingRequest holds the event loop.
let ctx = createFFIContext[TestLib]().valueOr:
check false
return

var d: CallbackData
initCallbackData(d)
defer: deinitCallbackData(d)

check sendRequestToFFIThread(
ctx, SyncBlockingRequest.ffiNewReq(testCallback, addr d)
).isOk()

# Block until the FFI handler has signalled that os.sleep is about to start.
# This guarantees destroyFFIContext is called while the event loop is frozen.
discard gSyncBlockStarted.recv()

# Destroy must return promptly even though the event loop is frozen for 5s.
# It deliberately returns err and leaks ctx in this scenario rather than
# hanging on joinThread.
let t0 = Moment.now()
check destroyFFIContext(ctx).isErr()
check (Moment.now() - t0) < 3.seconds

suite "destroyFFIContext refc workaround":
## Documents the refc-specific workaround in cleanUpResources.
##
## Background: when the FFI thread does heavy ref-object work (the workload
## that triggered the libwaku hang in production), the refc GC heap reaches
## a state where the very first chronos Selector allocation on the *main*
## thread — which happens lazily inside ThreadSignalPtr.close() through
## getThreadDispatcher() — traps in rawNewObj. The refc signal handler
## itself re-enters the same allocator and the process never returns.
## Captured stack:
## close → safeUnregisterAndCloseFd → getThreadDispatcher →
## newDispatcher → Selector.new → newObj (gc.nim:488) → rawNewObj →
## _sigtramp → signalHandler → newObjNoInit → addNewObjToZCT (loop)
##
## The workaround in cleanUpResources is `when defined(gcRefc): discard`,
## i.e. skip the close() calls under refc only. orc is unaffected and
## still cleans up the signal fds normally.
##
## NOTE: this test is documentation more than regression: a synthetic
## ref-allocation workload of ~50k cells does NOT corrupt the refc heap
## the way the real libwaku/libp2p teardown does, so this test passes
## even when the workaround is disabled. Reproducing the actual hang
## requires the full libwaku workload (logosdelivery_example.c).
## Verification of the workaround was done end-to-end against that
## example: with `--mm:refc` and close() enabled it hangs forever in
## the captured stack above; with `when defined(gcRefc): discard` it
## returns immediately. Under `--mm:orc` it returns immediately either
## way.
test "destroy after heavy ref-allocation workload returns promptly":
let ctx = createFFIContext[TestLib]().valueOr:
check false
return

var d: CallbackData
initCallbackData(d)
defer: deinitCallbackData(d)

check sendRequestToFFIThread(
ctx, HeavyRefAllocRequest.ffiNewReq(testCallback, addr d)
).isOk()
waitCallback(d)
check d.retCode == RET_OK

let t0 = Moment.now()
check destroyFFIContext(ctx).isOk()
check (Moment.now() - t0) < 3.seconds

suite "sendRequestToFFIThread":
test "successful request triggers RET_OK callback":
var d: CallbackData
@@ -161,7 +321,7 @@ suite "ffiCtor macro":
let configJson = ffiSerialize(SimpleConfig(initialValue: 42))
let ret = testlib_create(configJson.cstring, testCallback, addr d)

check ret == RET_OK
check not ret.isNil()

waitCallback(d)

@@ -203,7 +363,7 @@ suite "simplified .ffi. macro":

let configJson = ffiSerialize(SimpleConfig(initialValue: 7))
let ctorRet = testlib_create(configJson.cstring, testCallback, addr ctorD)
check ctorRet == RET_OK
check not ctorRet.isNil()

waitCallback(ctorD)
check ctorD.retCode == RET_OK
@@ -253,7 +413,7 @@ suite "async/sync detection in .ffi.":

let configJson = ffiSerialize(SimpleConfig(initialValue: 3))
let ctorRet = testlib_create(configJson.cstring, testCallback, addr ctorD)
check ctorRet == RET_OK
check not ctorRet.isNil()

waitCallback(ctorD)
check ctorD.retCode == RET_OK
@@ -322,7 +482,7 @@ suite "ptr return type in .ffi.":

let configJson = ffiSerialize(SimpleConfig(initialValue: 5))
let ctorRet = testlib_create(configJson.cstring, testCallback, addr ctorD)
check ctorRet == RET_OK
check not ctorRet.isNil()

waitCallback(ctorD)
check ctorD.retCode == RET_OK
