avoid use gc ed types in FFIContext and better macro documentation

This commit is contained in:
Ivan Folgueira Bande 2025-12-13 23:53:59 +01:00
parent 6811c8675f
commit d7a5492121
No known key found for this signature in database
GPG Key ID: 3C117481F89E24A7
4 changed files with 176 additions and 38 deletions

View File

@ -7,7 +7,7 @@ import chronicles, chronos, chronos/threadsync, taskpools/channels_spsc_single,
import ./ffi_types, ./ffi_thread_request, ./internal/ffi_macro, ./logging
type FFIContext*[T] = object
myLib*: T
myLib*: ptr T
# main library object (e.g., Waku, LibP2P, SDS, the one to be exposed as a library)
ffiThread: Thread[(ptr FFIContext[T])]
# represents the main FFI thread in charge of attending API consumer actions
@ -76,7 +76,7 @@ proc sendRequestToFFIThread*(
## Notice that in case of "ok", the deallocShared(req) is performed by the FFI Thread in the
## process proc.
ok()
return ok()
type Foo = object
registerReqFFI(WatchdogReq, foo: ptr Foo):
@ -98,6 +98,7 @@ proc onNotResponding*(ctx: ptr FFIContext) =
proc watchdogThreadBody(ctx: ptr FFIContext) {.thread.} =
## Watchdog thread that monitors the FFI thread and notifies the library user if it hangs.
## This thread never blocks.
let watchdogRun = proc(ctx: ptr FFIContext) {.async.} =
const WatchdogStartDelay = 10.seconds
@ -126,12 +127,34 @@ proc watchdogThreadBody(ctx: ptr FFIContext) {.thread.} =
waitFor watchdogRun(ctx)
proc processRequest[T](
request: ptr FFIThreadRequest, ctx: ptr FFIContext[T]
) {.async.} =
## Invoked within the FFI thread to process a request coming from the FFI API consumer thread.
let reqId = $request[].reqId
## The reqId determines which proc will handle the request.
## The registeredRequests represents a table defined at compile time.
## Then, registeredRequests == Table[reqId, proc-handling-the-request-asynchronously]
let retFut =
if not ctx[].registeredRequests[].contains(reqId):
## That shouldn't happen because only registered requests should be sent to the FFI thread.
nilProcess(request[].reqId)
else:
ctx[].registeredRequests[][reqId](request[].reqContent, ctx)
handleRes(await retFut, request)
proc ffiThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} =
## FFI thread that attends library user API requests
## FFI thread body that attends library user API requests
logging.setupLog(logging.LogLevel.DEBUG, logging.LogFormat.TEXT)
let ffiRun = proc(ctx: ptr FFIContext[T]) {.async.} =
var ffiReqHandler: T
## Holds the main library object, i.e., in charge of handling the ffi requests.
## e.g., Waku, LibP2P, SDS, etc.
while true:
await ctx.reqSignal.wait()
@ -145,10 +168,10 @@ proc ffiThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} =
chronicles.error "ffi thread could not receive a request"
continue
ctx.myLib = addr ffiReqHandler
## Handle the request
asyncSpawn FFIThreadRequest.process(
request, addr ctx.myLib, ctx.registeredRequests
)
asyncSpawn processRequest(request, ctx)
let fireRes = ctx.reqReceivedSignal.fireSync()
if fireRes.isErr():

View File

@ -4,12 +4,12 @@
import std/[json, macros], results, tables
import chronos, chronos/threadsync
import ./ffi_types, ./internal/ffi_macro, ./alloc, ./ffi_context
import ./ffi_types, ./internal/ffi_macro, ./alloc
type FFIThreadRequest* = object
callback: FFICallBack
userData: pointer
reqId: cstring
reqId*: cstring
reqContent*: pointer
proc init*(
@ -30,7 +30,7 @@ proc deleteRequest(request: ptr FFIThreadRequest) =
deallocShared(request[].reqId)
deallocShared(request)
proc handleRes[T: string | void](
proc handleRes*[T: string | void](
res: Result[T, string], request: ptr FFIThreadRequest
) =
## Handles the Result responses, which can either be Result[string, string] or
@ -56,20 +56,6 @@ proc handleRes[T: string | void](
)
return
proc nilProcess(reqId: cstring): Future[Result[string, string]] {.async.} =
proc nilProcess*(reqId: cstring): Future[Result[string, string]] {.async.} =
return err("This request type is not implemented: " & $reqId)
proc process*[R](
T: type FFIThreadRequest,
request: ptr FFIThreadRequest,
reqHandler: ptr R,
registeredRequests: ptr Table[cstring, FFIRequestProc],
) {.async.} =
let reqId = $request[].reqId
let retFut =
if not registeredRequests[].contains(reqId):
nilProcess(request[].reqId)
else:
registeredRequests[][reqId](request[].reqContent, reqHandler)
handleRes(await retFut, request)

View File

@ -30,9 +30,9 @@ template foreignThreadGc*(body: untyped) =
when declared(tearDownForeignThreadGc):
tearDownForeignThreadGc()
type onDone* = proc()
## Registered requests table populated at compile time and never updated at run time
## Registered requests table populated at compile time and never updated at run time.
## The key represents the request type name as cstring, e.g., "CreateNodeRequest".
## The value is a proc that handles the request asynchronously.
var registeredRequests*: Table[cstring, FFIRequestProc]
### End of FFI utils

View File

@ -3,7 +3,18 @@ import chronos
import ../ffi_types
proc extractFieldsFromLambda(body: NimNode): seq[NimNode] =
## Extracts the fields (params) from the given lambda body.
## Extracts the fields (params) from the given lambda body, when using the registerReqFFI macro.
## e.g., for:
## registerReqFFI(CreateNodeRequest, ctx: ptr FFIContext[Waku]):
## proc(
## configJson: cstring, appCallbacks: AppCallbacks
## ): Future[Result[string, string]] {.async.} =
## ...
## The extracted fields will be:
## - configJson: cstring
## - appCallbacks: AppCallbacks
##
var procNode = body
if procNode.kind == nnkStmtList and procNode.len == 1:
procNode = procNode[0]
@ -15,11 +26,19 @@ proc extractFieldsFromLambda(body: NimNode): seq[NimNode] =
for p in params[1 .. ^1]: # skip return type
result.add newIdentDefs(p[0], p[1])
when defined(ffiDumpMacros):
echo result.repr
proc buildRequestType(reqTypeName: NimNode, body: NimNode): NimNode =
## Builds:
## type <reqTypeName>* = object
## <lambdaParam1Name>: <lambdaParam1Type>
## ...
## e.g.:
## type CreateNodeRequest* = object
## configJson: cstring
## appCallbacks: AppCallbacks
##
var procNode = body
if procNode.kind == nnkStmtList and procNode.len == 1:
@ -51,7 +70,28 @@ proc buildRequestType(reqTypeName: NimNode, body: NimNode): NimNode =
result =
newNimNode(nnkTypeSection).add(newTree(nnkTypeDef, typeName, newEmptyNode(), objTy))
when defined(ffiDumpMacros):
echo result.repr
proc buildFfiNewReqProc(reqTypeName, body: NimNode): NimNode =
## Builds the ffiNewProc in charge of creating the FFIThreadRequest in shared memory.
## Then, a pointer to this request will be sent to the FFI thread for processing.
## e.g.:
## proc ffiNewReq*(T: typedesc[CreateNodeRequest]; callback: FFICallBack;
## userData: pointer; configJson: cstring;
## appCallbacks: AppCallbacks): ptr FFIThreadRequest =
## var reqObj = createShared(T)
## reqObj[].configJson = configJson.alloc()
## reqObj[].appCallbacks = appCallbacks
## let typeStr`gensym2866 = $T
## var ret`gensym2866 = FFIThreadRequest.init(callback, userData,
## typeStr`gensym2866.cstring, reqObj)
## return ret`gensym2866
##
## This should be invoked by the ffi consumer thread (generally, main thread.)
## Notice that the shared memory allocated by the main thread is freed by the FFI thread
## after processing the request.
var formalParams = newSeq[NimNode]()
var procNode: NimNode
@ -135,6 +175,9 @@ proc buildFfiNewReqProc(reqTypeName, body: NimNode): NimNode =
pragmas = newEmptyNode(),
)
when defined(ffiDumpMacros):
echo result.repr
proc buildFfiDeleteReqProc(reqTypeName: NimNode, fields: seq[NimNode]): NimNode =
## Generates:
## proc ffiDeleteReq(self: ptr <reqTypeName>) =
@ -163,15 +206,15 @@ proc buildFfiDeleteReqProc(reqTypeName: NimNode, fields: seq[NimNode]): NimNode
body = body,
)
when defined(ffiDumpMacros):
echo result.repr
proc buildProcessFFIRequestProc(reqTypeName, reqHandler, body: NimNode): NimNode =
## Builds:
## Builds, f.e.:
## proc processFFIRequest(T: typedesc[CreateNodeRequest];
## configJson: cstring;
## appCallbacks: AppCallbacks;
## waku: ptr Waku) ...
## Builds:
## proc processFFIRequest*(request: pointer; waku: ptr Waku) ...
## ctx: ptr FFIContext[Waku]) ...
if reqHandler.kind != nnkExprColonExpr:
error(
@ -237,10 +280,13 @@ proc buildProcessFFIRequestProc(reqTypeName, reqHandler, body: NimNode): NimNode
newEmptyNode(),
)
when defined(ffiDumpMacros):
echo result.repr
proc addNewRequestToRegistry(reqTypeName, reqHandler: NimNode): NimNode =
## Adds a new request to the registeredRequests table.
## The key is the hash of the request type name, and the value is the NimNode
## representing the request type.
## The key is a representation of the request, e.g. "CreateNodeReq".
## The value is a proc definition in charge of handling the request from FFI thread.
# Build: request[].reqContent
let reqContent =
@ -294,16 +340,44 @@ proc addNewRequestToRegistry(reqTypeName, reqHandler: NimNode): NimNode =
let reqTypeNameStr = $reqTypeName
# Use a string literal instead of reqTypeNameStr
let key = newLit($reqTypeName)
# Generate: registeredRequests["CreateNodeRequest"] = <generated proc>
result =
newAssignment(newTree(nnkBracketExpr, ident("registeredRequests"), key), asyncProc)
when defined(ffiDumpMacros):
echo result.repr
macro registerReqFFI*(reqTypeName, reqHandler, body: untyped): untyped =
## Registers a request that will be handled by the ffi thread.
## Registers a request that will be handled by the FFI/working thread.
## The request should be sent from the ffi consumer thread.
##
##
## e.g.:
## In this example, we register a CreateNodeRequest that will be handled by a proc that contains
## the provided lambda body and parameters, by the FFI/working thread.
##
## The lambda passed to this macro must:
## - only have no-GC'ed types.
## - Return Future[Result[string, string]] and be annotated with {.async.}
## And notice that the returned values will be sent back to the ffi consumer thread.
##
## registerReqFFI(CreateNodeRequest, ctx: ptr FFIContext[Waku]):
## proc(
## configJson: cstring, appCallbacks: AppCallbacks
## ): Future[Result[string, string]] {.async.} =
## ctx.myLib[] = (await createWaku(configJson, cast[AppCallbacks](appCallbacks))).valueOr:
## return err($error)
## return ok("")
##
## On the other hand, the created FFI request should be dispatched from the ffi consumer thread
## (generally, the main thread) following something like:
##
## ffi.sendRequestToFFIThread(
## ctx, CreateNodeRequest.ffiNewReq(callback, userData, configJson, appCallbacks)
## ).isOkOr:
## ...
## ...
##
# Extract lambda params to generate fields
let fields = extractFieldsFromLambda(body)
@ -315,10 +389,17 @@ macro registerReqFFI*(reqTypeName, reqHandler, body: untyped): untyped =
let deleteProc = buildFfiDeleteReqProc(reqTypeName, fields)
result = newStmtList(typeDef, ffiNewReqProc, deleteProc, processProc, addNewReqToReg)
when defined(ffiDumpMacros):
echo result.repr
macro processReq*(
reqType, ctx, callback, userData: untyped, args: varargs[untyped]
): untyped =
## Expands T.processReq(ctx, callback, userData, a, b, ...)
## e.g.:
## waku_dial_peerReq.processReq(ctx, callback, userData, peerMultiAddr, protocol, timeoutMs)
##
var callArgs = @[reqType, callback, userData]
for a in args:
callArgs.add a
@ -338,7 +419,52 @@ macro processReq*(
return RET_ERR
return RET_OK
when defined(ffiDumpMacros):
echo result.repr
macro ffi*(prc: untyped): untyped =
## Defines an FFI-exported proc that registers a request handler to be executed
## asynchronously in the FFI thread.
##
## {.ffi.} implicitly implies: ...Return[Future[Result[string, string]] {.async.}
##
## When using {.ffi.}, the first three parameters must be:
## - ctx: ptr FFIContext[T] <-- T is the type that handles the FFI requests
## - callback: FFICallBack
## - userData: pointer
## Then, additional parameters may be defined as needed, after these first three, always
## considering that only no-GC'ed (or C-like) types are allowed.
##
## e.g.:
## proc waku_version(
## ctx: ptr FFIContext[Waku], callback: FFICallBack, userData: pointer
## ) {.ffi.} =
## return ok(WakuNodeVersionString)
##
## e.g2.:
## proc waku_start(
## ctx: ptr FFIContext[Waku], callback: FFICallBack, userData: pointer
## ) {.ffi.} =
## (await startWaku(ctx[].myLib)).isOkOr:
## error "START_NODE failed", error = error
## return err("failed to start: " & $error)
## return ok("")
##
## e.g3.:
## proc waku_peer_exchange_request(
## ctx: ptr FFIContext[Waku],
## callback: FFICallBack,
## userData: pointer,
## numPeers: uint64,
## ) {.ffi.} =
## let numValidPeers = (await performPeerExchangeRequestTo(numPeers, ctx.myLib[])).valueOr:
## error "waku_peer_exchange_request failed", error = error
## return err("failed peer exchange: " & $error)
## return ok($numValidPeers)
##
## In these examples, notice that ctx.myLib is of type "ptr Waku", being Waku main library type.
##
let procName = prc[0]
let formalParams = prc[3]
let bodyNode = prc[^1]
@ -414,3 +540,6 @@ macro ffi*(prc: untyped): untyped =
`anonymousProcNode`
result = newStmtList(registerReq, ffiProc)
when defined(ffiDumpMacros):
echo result.repr