From 534cbe8f1a8ba4ef6805581f08b3a71652a323a0 Mon Sep 17 00:00:00 2001 From: Ivan FB Date: Sat, 2 May 2026 00:20:33 +0200 Subject: [PATCH 1/2] use fixed array of ctx to avoid consuming all fds --- ffi/ffi_context.nim | 156 +++++++++++++++++++++++++++++-------- tests/test_ffi_context.nim | 90 ++++++++++++++++++--- 2 files changed, 204 insertions(+), 42 deletions(-) diff --git a/ffi/ffi_context.nim b/ffi/ffi_context.nim index 545d2d0..b195958 100644 --- a/ffi/ffi_context.nim +++ b/ffi/ffi_context.nim @@ -18,6 +18,8 @@ type FFIContext*[T] = object reqSignal: ThreadSignalPtr # to notify the FFI Thread that a new request is sent reqReceivedSignal: ThreadSignalPtr # to signal main thread, interfacing with the FFI thread, that FFI thread received the request + watchdogStopSignal: ThreadSignalPtr + # fired by destroyFFIContext so the watchdog exits immediately instead of waiting out its sleep userData*: pointer eventCallback*: pointer eventUserdata*: pointer @@ -25,6 +27,18 @@ type FFIContext*[T] = object registeredRequests: ptr Table[cstring, FFIRequestProc] # Pointer to with the registered requests at compile time +const MaxFFIContexts* = 32 + ## Maximum number of concurrently live FFI contexts when using FFIContextPool. + ## Fds and threads are only consumed for slots that are actually acquired, + ## so this value only affects the upfront memory of the pool array. + +type FFIContextPool*[T] = object + ## Fixed-size pool of FFI contexts. Avoids dynamic heap allocation per context + ## and bounds the total number of file descriptors consumed by ThreadSignalPtrs + ## to at most MaxFFIContexts * 2. 
+ slots: array[MaxFFIContexts, FFIContext[T]] + inUse: array[MaxFFIContexts, Atomic[bool]] + const git_version* {.strdefine.} = "n/a" template callEventCallback*(ctx: ptr FFIContext, eventName: string, body: untyped) = @@ -110,13 +124,19 @@ proc watchdogThreadBody(ctx: ptr FFIContext) {.thread.} = const WatchdogTimeinterval = 1.seconds const WatchdogTimeout = 20.seconds - # Give time for the node to be created and up before sending watchdog requests - await sleepAsync(WatchdogStartDelay) - while true: - await sleepAsync(WatchdogTimeinterval) + # Give time for the node to be created and up before sending watchdog requests. + # waitSync returns early if watchdogStopSignal fires (i.e. on destroy). + let startWait = ctx.watchdogStopSignal.waitSync(WatchdogStartDelay) + if startWait.isErr(): + error "watchdog: start-delay waitSync failed", err = startWait.error + elif startWait.get(): + return # stop signal fired during start delay - if ctx.running.load == false: - debug "Watchdog thread exiting because FFIContext is not running" + while ctx.running.load: + let intervalWait = ctx.watchdogStopSignal.waitSync(WatchdogTimeinterval) + if intervalWait.isErr(): + error "watchdog: interval waitSync failed", err = intervalWait.error + elif intervalWait.get() or not ctx.running.load: break let callback = proc( @@ -164,7 +184,9 @@ proc processRequest[T]( try: await retFut except CatchableError as exc: - Result[string, string].err("Exception in processRequest for " & reqId & ": " & exc.msg) + Result[string, string].err( + "Exception in processRequest for " & reqId & ": " & exc.msg + ) ## handleRes may raise (OOM, GC setup) even though it is rare. Catching here ## keeps the async proc raises:[] compatible. 
The defer inside handleRes @@ -208,33 +230,47 @@ proc ffiThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} = waitFor ffiRun(ctx) -proc cleanUpResources[T](ctx: ptr FFIContext[T]): Result[void, string] = - defer: - freeShared(ctx) +proc closeResources[T](ctx: ptr FFIContext[T]): Result[void, string] = + ## Closes file descriptors and deinits the lock. Does NOT free ctx memory. + ## Used by initContextResources error paths and pool destroy, where ctx is + ## not heap-allocated (pool slots live in a fixed array, not on the heap). ctx.lock.deinitLock() if not ctx.reqSignal.isNil(): ?ctx.reqSignal.close() if not ctx.reqReceivedSignal.isNil(): ?ctx.reqReceivedSignal.close() + if not ctx.watchdogStopSignal.isNil(): + ?ctx.watchdogStopSignal.close() return ok() -proc createFFIContext*[T](): Result[ptr FFIContext[T], string] = - ## This proc is called from the main thread and it creates - ## the FFI working thread. - var ctx = createShared(FFIContext[T], 1) +proc cleanUpResources[T](ctx: ptr FFIContext[T]): Result[void, string] = + ## Full cleanup for heap-allocated contexts: closes all resources and frees memory. + defer: + freeShared(ctx) + return ctx.closeResources() + +proc initContextResources[T](ctx: ptr FFIContext[T]): Result[void, string] = + ## Initialises all resources inside an already-allocated FFIContext slot. + ## On failure every partially-initialised resource is closed; the caller + ## is responsible for releasing the slot (freeShared or pool.releaseSlot). 
ctx.lock.initLock() ctx.reqSignal = ThreadSignalPtr.new().valueOr: - ctx.cleanUpResources().isOkOr: - return err("could not clean resources in a failure new reqSignal: " & $error) + ctx.closeResources().isOkOr: + return err("could not close resources after reqSignal failure: " & $error) return err("couldn't create reqSignal ThreadSignalPtr: " & $error) ctx.reqReceivedSignal = ThreadSignalPtr.new().valueOr: - ctx.cleanUpResources().isOkOr: - return - err("could not clean resources in a failure new reqReceivedSignal: " & $error) + ctx.closeResources().isOkOr: + return err("could not close resources after reqReceivedSignal failure: " & $error) return err("couldn't create reqReceivedSignal ThreadSignalPtr") + ctx.watchdogStopSignal = ThreadSignalPtr.new().valueOr: + ctx.closeResources().isOkOr: + return + err("could not close resources after watchdogStopSignal failure: " & $error) + return err("couldn't create watchdogStopSignal ThreadSignalPtr") + ctx.registeredRequests = addr ffi_types.registeredRequests ctx.running.store(true) @@ -242,8 +278,8 @@ proc createFFIContext*[T](): Result[ptr FFIContext[T], string] = try: createThread(ctx.ffiThread, ffiThreadBody[T], ctx) except ValueError, ResourceExhaustedError: - ctx.cleanUpResources().isOkOr: - error "failed to clean up resources after ffiThread creation failure", err = error + ctx.closeResources().isOkOr: + error "failed to close resources after ffiThread creation failure", err = error return err("failed to create the FFI thread: " & getCurrentExceptionMsg()) try: @@ -254,28 +290,84 @@ proc createFFIContext*[T](): Result[ptr FFIContext[T], string] = if fireRes.isErr(): error "failed to signal ffiThread during watchdog cleanup", err = fireRes.error joinThread(ctx.ffiThread) - ctx.cleanUpResources().isOkOr: - error "failed to clean up resources after watchdogThread creation failure", err = error + ctx.closeResources().isOkOr: + error "failed to close resources after watchdogThread creation failure", + err = error return 
err("failed to create the watchdog thread: " & getCurrentExceptionMsg()) + return ok() + +# ── Pool helpers ───────────────────────────────────────────────────────────── + +proc acquireSlot[T](pool: var FFIContextPool[T]): Result[ptr FFIContext[T], string] = + for i in 0 ..< MaxFFIContexts: + var expected = false + if pool.inUse[i].compareExchange(expected, true): + return ok(pool.slots[i].addr) + return err("FFI context pool exhausted (max " & $MaxFFIContexts & " contexts)") + +proc releaseSlot[T](pool: var FFIContextPool[T], ctx: ptr FFIContext[T]) = + for i in 0 ..< MaxFFIContexts: + if pool.slots[i].addr == ctx: + pool.inUse[i].store(false) + return + +# ── Public API ──────────────────────────────────────────────────────────────── + +proc createFFIContext*[T](): Result[ptr FFIContext[T], string] = + ## Creates a heap-allocated FFI context. The caller must call destroyFFIContext(ctx) + ## to release it. Prefer the pool overload when the maximum context count is known. + var ctx = createShared(FFIContext[T], 1) + initContextResources(ctx).isOkOr: + freeShared(ctx) + return err(error) return ok(ctx) -proc destroyFFIContext*[T](ctx: ptr FFIContext[T]): Result[void, string] = +proc createFFIContext*[T]( + pool: var FFIContextPool[T] +): Result[ptr FFIContext[T], string] = + ## Acquires a slot from the fixed pool and initialises it as an FFI context. + ## Bounded fd usage: at most MaxFFIContexts * 2 ThreadSignalPtr fds are ever open. 
+ let ctx = pool.acquireSlot().valueOr: + return err(error) + initContextResources(ctx).isOkOr: + pool.releaseSlot(ctx) + return err(error) + return ok(ctx) + +proc signalStop[T](ctx: ptr FFIContext[T]): Result[void, string] = ctx.running.store(false) + let ffiSignaled = ctx.reqSignal.fireSync().valueOr: + ctx.onNotResponding() + return err("error signaling reqSignal in destroyFFIContext: " & $error) + if not ffiSignaled: + ctx.onNotResponding() + return err("failed to signal reqSignal on time in destroyFFIContext") + let wdSignaled = ctx.watchdogStopSignal.fireSync().valueOr: + return err("error signaling watchdogStopSignal in destroyFFIContext: " & $error) + if not wdSignaled: + return err("failed to signal watchdogStopSignal on time in destroyFFIContext") + return ok() + +proc destroyFFIContext*[T](ctx: ptr FFIContext[T]): Result[void, string] = defer: joinThread(ctx.ffiThread) joinThread(ctx.watchdogThread) ctx.cleanUpResources().isOkOr: error "failed to clean up resources in destroyFFIContext", err = error + return ctx.signalStop() - let signaledOnTime = ctx.reqSignal.fireSync().valueOr: - ctx.onNotResponding() - return err("error in destroyFFIContext: " & $error) - if not signaledOnTime: - ctx.onNotResponding() - return err("failed to signal reqSignal on time in destroyFFIContext") - - return ok() +proc destroyFFIContext*[T]( + pool: var FFIContextPool[T], ctx: ptr FFIContext[T] +): Result[void, string] = + ## Stops the FFI context and returns its slot to the pool. 
+ defer: + joinThread(ctx.ffiThread) + joinThread(ctx.watchdogThread) + ctx.closeResources().isOkOr: + error "failed to close resources in destroyFFIContext", err = error + pool.releaseSlot(ctx) + return ctx.signalStop() template checkParams*(ctx: ptr FFIContext, callback: FFICallBack, userData: pointer) = if not isNil(ctx): diff --git a/tests/test_ffi_context.nim b/tests/test_ffi_context.nim index 9cb9542..e9261e6 100644 --- a/tests/test_ffi_context.nim +++ b/tests/test_ffi_context.nim @@ -61,6 +61,62 @@ registerReqFFI(EmptyOkRequest, lib: ptr TestLib): proc(): Future[Result[string, string]] {.async.} = return ok("") +suite "FFIContextPool": + test "create and destroy via pool succeeds": + var pool: FFIContextPool[TestLib] + let ctx = pool.createFFIContext().valueOr: + assert false, "createFFIContext(pool) failed: " & $error + return + check pool.destroyFFIContext(ctx).isOk() + + test "slot is reused after destroy": + var pool: FFIContextPool[TestLib] + let ctx1 = pool.createFFIContext().valueOr: + assert false, "createFFIContext(pool) failed: " & $error + return + check pool.destroyFFIContext(ctx1).isOk() + # After destroying, the same slot must be available again + let ctx2 = pool.createFFIContext().valueOr: + assert false, "createFFIContext(pool) failed after slot release: " & $error + return + check pool.destroyFFIContext(ctx2).isOk() + check ctx1 == ctx2 # same array slot reused + + test "pool exhaustion returns error": + var pool: FFIContextPool[TestLib] + var ctxs: array[MaxFFIContexts, ptr FFIContext[TestLib]] + for i in 0 ..< MaxFFIContexts: + ctxs[i] = pool.createFFIContext().valueOr: + for j in 0 ..< i: + discard pool.destroyFFIContext(ctxs[j]) + assert false, "createFFIContext(pool) failed at slot " & $i & ": " & $error + return + # Pool is now full — next create must fail + check pool.createFFIContext().isErr() + for i in 0 ..< MaxFFIContexts: + discard pool.destroyFFIContext(ctxs[i]) + + test "requests are processed via pool context": + var pool: 
FFIContextPool[TestLib] + var d: CallbackData + initCallbackData(d) + defer: + deinitCallbackData(d) + + let ctx = pool.createFFIContext().valueOr: + assert false, "createFFIContext(pool) failed: " & $error + return + defer: + discard pool.destroyFFIContext(ctx) + + check sendRequestToFFIThread( + ctx, PingRequest.ffiNewReq(testCallback, addr d, "pool".cstring) + ) + .isOk() + waitCallback(d) + check d.retCode == RET_OK + check callbackMsg(d) == "pong:pool" + suite "createFFIContext / destroyFFIContext": test "create and destroy succeeds": let ctx = createFFIContext[TestLib]().valueOr: @@ -79,14 +135,19 @@ suite "sendRequestToFFIThread": test "successful request triggers RET_OK callback": var d: CallbackData initCallbackData(d) - defer: deinitCallbackData(d) + defer: + deinitCallbackData(d) let ctx = createFFIContext[TestLib]().valueOr: check false return - defer: discard destroyFFIContext(ctx) + defer: + discard destroyFFIContext(ctx) - check sendRequestToFFIThread(ctx, PingRequest.ffiNewReq(testCallback, addr d, "hello".cstring)).isOk() + check sendRequestToFFIThread( + ctx, PingRequest.ffiNewReq(testCallback, addr d, "hello".cstring) + ) + .isOk() waitCallback(d) check d.retCode == RET_OK check callbackMsg(d) == "pong:hello" @@ -94,12 +155,14 @@ suite "sendRequestToFFIThread": test "failing request triggers RET_ERR callback": var d: CallbackData initCallbackData(d) - defer: deinitCallbackData(d) + defer: + deinitCallbackData(d) let ctx = createFFIContext[TestLib]().valueOr: check false return - defer: discard destroyFFIContext(ctx) + defer: + discard destroyFFIContext(ctx) check sendRequestToFFIThread(ctx, FailRequest.ffiNewReq(testCallback, addr d)).isOk() waitCallback(d) @@ -108,14 +171,17 @@ suite "sendRequestToFFIThread": test "empty ok response delivers empty message": var d: CallbackData initCallbackData(d) - defer: deinitCallbackData(d) + defer: + deinitCallbackData(d) let ctx = createFFIContext[TestLib]().valueOr: check false return - defer: discard 
destroyFFIContext(ctx) + defer: + discard destroyFFIContext(ctx) - check sendRequestToFFIThread(ctx, EmptyOkRequest.ffiNewReq(testCallback, addr d)).isOk() + check sendRequestToFFIThread(ctx, EmptyOkRequest.ffiNewReq(testCallback, addr d)) + .isOk() waitCallback(d) check d.retCode == RET_OK check d.msgLen == 0 @@ -124,13 +190,17 @@ suite "sendRequestToFFIThread": let ctx = createFFIContext[TestLib]().valueOr: check false return - defer: discard destroyFFIContext(ctx) + defer: + discard destroyFFIContext(ctx) for i in 1 .. 5: var d: CallbackData initCallbackData(d) let msg = "msg" & $i - check sendRequestToFFIThread(ctx, PingRequest.ffiNewReq(testCallback, addr d, msg.cstring)).isOk() + check sendRequestToFFIThread( + ctx, PingRequest.ffiNewReq(testCallback, addr d, msg.cstring) + ) + .isOk() waitCallback(d) deinitCallbackData(d) check d.retCode == RET_OK From 63d3276576c8f62604cd6b2afdff9b14297fe804 Mon Sep 17 00:00:00 2001 From: Ivan FB Date: Sat, 2 May 2026 00:56:43 +0200 Subject: [PATCH 2/2] better code separation and better error handling --- ffi.nim | 5 ++- ffi/ffi_context.nim | 80 ++++++++-------------------------------- ffi/ffi_context_pool.nim | 56 ++++++++++++++++++++++++++++ 3 files changed, 75 insertions(+), 66 deletions(-) create mode 100644 ffi/ffi_context_pool.nim diff --git a/ffi.nim b/ffi.nim index 0ef64ac..ae70cbf 100644 --- a/ffi.nim +++ b/ffi.nim @@ -2,9 +2,10 @@ import std/[atomics, tables] import chronos, chronicles import ffi/internal/[ffi_library, ffi_macro], - ffi/[alloc, ffi_types, ffi_context, ffi_thread_request] + ffi/[alloc, ffi_types, ffi_context, ffi_context_pool, ffi_thread_request] export atomics, tables export chronos, chronicles export - atomics, alloc, ffi_library, ffi_macro, ffi_types, ffi_context, ffi_thread_request + atomics, alloc, ffi_library, ffi_macro, ffi_types, ffi_context, ffi_context_pool, + ffi_thread_request diff --git a/ffi/ffi_context.nim b/ffi/ffi_context.nim index b195958..c9f6c71 100644 --- a/ffi/ffi_context.nim 
+++ b/ffi/ffi_context.nim @@ -27,18 +27,6 @@ type FFIContext*[T] = object registeredRequests: ptr Table[cstring, FFIRequestProc] # Pointer to with the registered requests at compile time -const MaxFFIContexts* = 32 - ## Maximum number of concurrently live FFI contexts when using FFIContextPool. - ## Fds and threads are only consumed for slots that are actually acquired, - ## so this value only affects the upfront memory of the pool array. - -type FFIContextPool*[T] = object - ## Fixed-size pool of FFI contexts. Avoids dynamic heap allocation per context - ## and bounds the total number of file descriptors consumed by ThreadSignalPtrs - ## to at most MaxFFIContexts * 2. - slots: array[MaxFFIContexts, FFIContext[T]] - inUse: array[MaxFFIContexts, Atomic[bool]] - const git_version* {.strdefine.} = "n/a" template callEventCallback*(ctx: ptr FFIContext, eventName: string, body: untyped) = @@ -128,14 +116,14 @@ proc watchdogThreadBody(ctx: ptr FFIContext) {.thread.} = # waitSync returns early if watchdogStopSignal fires (i.e. on destroy). let startWait = ctx.watchdogStopSignal.waitSync(WatchdogStartDelay) if startWait.isErr(): - error "watchdog: start-delay waitSync failed", err = startWait.error + error "watchdog: start-delay waitSync failed", error = startWait.error elif startWait.get(): return # stop signal fired during start delay while ctx.running.load: let intervalWait = ctx.watchdogStopSignal.waitSync(WatchdogTimeinterval) if intervalWait.isErr(): - error "watchdog: interval waitSync failed", err = intervalWait.error + error "watchdog: interval waitSync failed", error = intervalWait.error elif intervalWait.get() or not ctx.running.load: break @@ -185,7 +173,7 @@ proc processRequest[T]( await retFut except CatchableError as exc: Result[string, string].err( - "Exception in processRequest for " & reqId & ": " & exc.msg + "Exception in processRequest for: " & reqId & ": " & exc.msg ) ## handleRes may raise (OOM, GC setup) even though it is rare. 
Catching here @@ -194,7 +182,7 @@ proc processRequest[T]( try: handleRes(res, request) except Exception as exc: - error "Unexpected exception in handleRes", exc = exc.msg + error "Unexpected exception in handleRes", error = exc.msg proc ffiThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} = ## FFI thread body that attends library user API requests @@ -230,7 +218,7 @@ proc ffiThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} = waitFor ffiRun(ctx) -proc closeResources[T](ctx: ptr FFIContext[T]): Result[void, string] = +proc closeResources*[T](ctx: ptr FFIContext[T]): Result[void, string] = ## Closes file descriptors and deinits the lock. Does NOT free ctx memory. ## Used by initContextResources error paths and pool destroy, where ctx is ## not heap-allocated (pool slots live in a fixed array, not on the heap). @@ -249,7 +237,7 @@ proc cleanUpResources[T](ctx: ptr FFIContext[T]): Result[void, string] = freeShared(ctx) return ctx.closeResources() -proc initContextResources[T](ctx: ptr FFIContext[T]): Result[void, string] = +proc initContextResources*[T](ctx: ptr FFIContext[T]): Result[void, string] = ## Initialises all resources inside an already-allocated FFIContext slot. ## On failure every partially-initialised resource is closed; the caller ## is responsible for releasing the slot (freeShared or pool.releaseSlot). 
@@ -279,7 +267,7 @@ proc initContextResources[T](ctx: ptr FFIContext[T]): Result[void, string] = createThread(ctx.ffiThread, ffiThreadBody[T], ctx) except ValueError, ResourceExhaustedError: ctx.closeResources().isOkOr: - error "failed to close resources after ffiThread creation failure", err = error + error "failed to close resources after ffiThread creation failure", error = error return err("failed to create the FFI thread: " & getCurrentExceptionMsg()) try: @@ -288,30 +276,15 @@ proc initContextResources[T](ctx: ptr FFIContext[T]): Result[void, string] = ctx.running.store(false) let fireRes = ctx.reqSignal.fireSync() if fireRes.isErr(): - error "failed to signal ffiThread during watchdog cleanup", err = fireRes.error + error "failed to signal ffiThread during watchdog cleanup", error = fireRes.error joinThread(ctx.ffiThread) ctx.closeResources().isOkOr: error "failed to close resources after watchdogThread creation failure", - err = error + error = error return err("failed to create the watchdog thread: " & getCurrentExceptionMsg()) return ok() -# ── Pool helpers ───────────────────────────────────────────────────────────── - -proc acquireSlot[T](pool: var FFIContextPool[T]): Result[ptr FFIContext[T], string] = - for i in 0 ..< MaxFFIContexts: - var expected = false - if pool.inUse[i].compareExchange(expected, true): - return ok(pool.slots[i].addr) - return err("FFI context pool exhausted (max " & $MaxFFIContexts & " contexts)") - -proc releaseSlot[T](pool: var FFIContextPool[T], ctx: ptr FFIContext[T]) = - for i in 0 ..< MaxFFIContexts: - if pool.slots[i].addr == ctx: - pool.inUse[i].store(false) - return - # ── Public API ──────────────────────────────────────────────────────────────── proc createFFIContext*[T](): Result[ptr FFIContext[T], string] = @@ -323,19 +296,7 @@ proc createFFIContext*[T](): Result[ptr FFIContext[T], string] = return err(error) return ok(ctx) -proc createFFIContext*[T]( - pool: var FFIContextPool[T] -): Result[ptr FFIContext[T], 
string] = - ## Acquires a slot from the fixed pool and initialises it as an FFI context. - ## Bounded fd usage: at most MaxFFIContexts * 2 ThreadSignalPtr fds are ever open. - let ctx = pool.acquireSlot().valueOr: - return err(error) - initContextResources(ctx).isOkOr: - pool.releaseSlot(ctx) - return err(error) - return ok(ctx) - -proc signalStop[T](ctx: ptr FFIContext[T]): Result[void, string] = +proc signalStop*[T](ctx: ptr FFIContext[T]): Result[void, string] = ctx.running.store(false) let ffiSignaled = ctx.reqSignal.fireSync().valueOr: ctx.onNotResponding() @@ -349,24 +310,15 @@ proc signalStop[T](ctx: ptr FFIContext[T]): Result[void, string] = return err("failed to signal watchdogStopSignal on time in destroyFFIContext") return ok() +proc joinFFIThreads*[T](ctx: ptr FFIContext[T]) = + joinThread(ctx.ffiThread) + joinThread(ctx.watchdogThread) + proc destroyFFIContext*[T](ctx: ptr FFIContext[T]): Result[void, string] = defer: - joinThread(ctx.ffiThread) - joinThread(ctx.watchdogThread) + joinFFIThreads(ctx) ctx.cleanUpResources().isOkOr: - error "failed to clean up resources in destroyFFIContext", err = error - return ctx.signalStop() - -proc destroyFFIContext*[T]( - pool: var FFIContextPool[T], ctx: ptr FFIContext[T] -): Result[void, string] = - ## Stops the FFI context and returns its slot to the pool. 
-  defer:
-    joinThread(ctx.ffiThread)
-    joinThread(ctx.watchdogThread)
-    ctx.closeResources().isOkOr:
-      error "failed to close resources in destroyFFIContext", err = error
-    pool.releaseSlot(ctx)
+    error "failed to clean up resources in destroyFFIContext", error = error
   return ctx.signalStop()
 
 template checkParams*(ctx: ptr FFIContext, callback: FFICallBack, userData: pointer) =
   if not isNil(ctx):
diff --git a/ffi/ffi_context_pool.nim b/ffi/ffi_context_pool.nim
new file mode 100644
index 0000000..e0985c6
--- /dev/null
+++ b/ffi/ffi_context_pool.nim
@@ -0,0 +1,56 @@
+import std/atomics
+import results, chronicles
+import ./ffi_context
+
+export ffi_context
+
+const MaxFFIContexts* = 32
+  ## Maximum number of concurrently live FFI contexts.
+  ## Fds and threads are only consumed for slots that are actually acquired,
+  ## so this value only affects the upfront memory of the pool array.
+
+type FFIContextPool*[T] = object
+  ## Fixed-size pool of FFI contexts. Avoids dynamic heap allocation per context
+  ## and bounds the total number of file descriptors consumed by ThreadSignalPtrs
+  ## to at most MaxFFIContexts * 2.
+  slots: array[MaxFFIContexts, FFIContext[T]]
+  inUse: array[MaxFFIContexts, Atomic[bool]]
+
+proc acquireSlot[T](pool: var FFIContextPool[T]): Result[ptr FFIContext[T], string] =
+  for i in 0 ..< MaxFFIContexts:
+    var expected = false
+    if pool.inUse[i].compareExchange(expected, true):
+      return ok(pool.slots[i].addr)
+  return err("FFI context pool exhausted; max: " & $MaxFFIContexts & " contexts")
+
+proc releaseSlot[T](pool: var FFIContextPool[T], ctx: ptr FFIContext[T]) =
+  for i in 0 ..< MaxFFIContexts:
+    if pool.slots[i].addr == ctx:
+      pool.inUse[i].store(false)
+      return
+
+proc createFFIContext*[T](
+    pool: var FFIContextPool[T]
+): Result[ptr FFIContext[T], string] =
+  ## Acquires a slot from the fixed pool and initialises it as an FFI context.
+  ## Bounded fd usage: at most MaxFFIContexts * 2 ThreadSignalPtr fds are ever open.
+  let ctx = pool.acquireSlot().valueOr:
+    return err("failure calling acquireSlot in createFFIContext: " & $error)
+  initContextResources(ctx).isOkOr:
+    pool.releaseSlot(ctx)
+    return err("failure calling initContextResources in createFFIContext: " & $error)
+  return ok(ctx)
+
+proc destroyFFIContext*[T](
+    pool: var FFIContextPool[T], ctx: ptr FFIContext[T]
+): Result[void, string] =
+  ## Stops the FFI context and returns its slot to the pool.
+  defer:
+    joinFFIThreads(ctx)
+    let closeRes = ctx.closeResources()
+    pool.releaseSlot(ctx)
+    closeRes.isOkOr:
+      error "failed to close resources in destroyFFIContext", error = error
+  ctx.signalStop().isOkOr:
+    return err("failure calling signalStop in destroyFFIContext: " & $error)
+  return ok()