general ffi increments

This commit is contained in:
Ivan Folgueira Bande 2025-09-05 21:31:01 +02:00
parent 356f0ccc1b
commit a1a6536b3c
No known key found for this signature in database
GPG Key ID: 3C117481F89E24A7
4 changed files with 158 additions and 66 deletions

View File

@ -2,7 +2,7 @@
{.pragma: callback, cdecl, raises: [], gcsafe.}
{.passc: "-fPIC".}
import std/[options, atomics, os, net, locks]
import std/[options, atomics, os, net, locks, json]
import chronicles, chronos, chronos/threadsync, taskpools/channels_spsc_single, results
import ./ffi_types, ./ffi_thread_request, ./internal/ffi_macro
@ -20,13 +20,13 @@ type FFIContext* = object
eventCallback*: pointer
eventUserdata*: pointer
running: Atomic[bool] # To control when the threads are running
registeredRequests: Table[string, FFIRequestProc]
registeredRequests: ptr Table[cstring, FFIRequestProc]
const git_version* {.strdefine.} = "n/a"
template callEventCallback(ctx: ptr FFIContext, eventName: string, body: untyped) =
template callEventCallback*(ctx: ptr FFIContext, eventName: string, body: untyped) =
if isNil(ctx[].eventCallback):
error eventName & " - eventCallback is nil"
chronicles.error eventName & " - eventCallback is nil"
return
foreignThreadGc:
@ -75,6 +75,24 @@ proc sendRequestToFFIThread*(
## process proc. See the 'waku_thread_request.nim' module for more details.
ok()
type Foo = object
registerReqFFI(WatchdogReq, foo: ptr Foo):
proc(): Future[Result[string, string]] {.async.} =
return ok("waku thread is not blocked")
type JsonWakuNotRespondingEvent = object
eventType: string
proc init(T: type JsonWakuNotRespondingEvent): T =
return JsonWakuNotRespondingEvent(eventType: "not_responding")
proc `$`(event: JsonWakuNotRespondingEvent): string =
$(%*event)
proc onWakuNotResponding*(ctx: ptr FFIContext) =
callEventCallback(ctx, "onWakuNotResponsive"):
$JsonWakuNotRespondingEvent.init()
proc watchdogThreadBody(ctx: ptr FFIContext) {.thread.} =
## Watchdog thread that monitors the Waku thread and notifies the library user if it hangs.
@ -100,16 +118,9 @@ proc watchdogThreadBody(ctx: ptr FFIContext) {.thread.} =
trace "Sending watchdog request to FFI thread"
# sendRequestToFFIThread(
# ctx,
# 0,
# DebugNodeRequest.createShared(DebugNodeMsgType.CHECK_WAKU_NOT_BLOCKED),
# wakuCallback,
# nilUserData,
# WakuNotRespondingTimeout,
# ).isOkOr:
# error "Failed to send watchdog request to FFI thread", error = $error
# onWakuNotResponding(ctx)
sendRequestToFFIThread(ctx, WatchdogReq.ffiNewReq(wakuCallback, nilUserData)).isOkOr:
error "Failed to send watchdog request to FFI thread", error = $error
onWakuNotResponding(ctx)
waitFor watchdogRun(ctx)
@ -133,7 +144,7 @@ proc ffiThreadBody[TT](ctx: ptr FFIContext) {.thread.} =
## Handle the request
asyncSpawn FFIThreadRequest.process(
request, addr ffiHandler, addr ctx.registeredRequests
request, addr ffiHandler, ctx.registeredRequests
)
let fireRes = ctx.reqReceivedSignal.fireSync()
@ -151,7 +162,7 @@ proc createFFIContext*[T](tt: typedesc[T]): Result[ptr FFIContext, string] =
ctx.reqReceivedSignal = ThreadSignalPtr.new().valueOr:
return err("couldn't create reqReceivedSignal ThreadSignalPtr")
ctx.lock.initLock()
ctx.registeredRequests = ffi_macro.registeredRequests
ctx.registeredRequests = addr ffi_macro.registeredRequests
ctx.running.store(true)
@ -169,7 +180,7 @@ proc createFFIContext*[T](tt: typedesc[T]): Result[ptr FFIContext, string] =
return ok(ctx)
proc destroyFFIContext*[T](ctx: ptr FFIContext): Result[void, string] =
proc destroyFFIContext*(ctx: ptr FFIContext): Result[void, string] =
ctx.running.store(false)
let signaledOnTime = ctx.reqSignal.fireSync().valueOr:
@ -185,3 +196,10 @@ proc destroyFFIContext*[T](ctx: ptr FFIContext): Result[void, string] =
freeShared(ctx)
return ok()
template checkParams*(ctx: ptr FFIContext, callback: FFICallBack, userData: pointer) =
if not isNil(ctx):
ctx[].userData = userData
if isNil(callback):
return RET_MISSING_CALLBACK

View File

@ -65,7 +65,7 @@ proc process*[R](
T: type FFIThreadRequest,
request: ptr FFIThreadRequest,
reqHandler: ptr R,
registeredRequests: ptr Table[string, FFIRequestProc],
registeredRequests: ptr Table[cstring, FFIRequestProc],
) {.async.} =
let reqId = $request[].reqId

View File

@ -78,5 +78,18 @@ macro declareLibrary*(libraryName: static[string]): untyped =
res.add(initializeLibraryProc)
## Generate the exported C-callable callback setter
let setCallbackProc = quote:
proc set_event_callback(
ctx: ptr FFIContext, callback: FFICallBack, userData: pointer
) {.dynlib, exportc.} =
initializeLibrary()
ctx[].eventCallback = cast[pointer](callback)
ctx[].eventUserData = userData
res.add(setCallbackProc)
echo result.repr
return res

View File

@ -2,7 +2,56 @@ import std/[macros, tables]
import chronos
import ../ffi_types
var registeredRequests* {.threadvar.}: Table[string, FFIRequestProc]
var registeredRequests* {.threadvar.}: Table[cstring, FFIRequestProc]
proc extractFieldsFromLambda(body: NimNode): seq[NimNode] =
## Extracts the fields (params) from the given lambda body.
var procNode = body
if procNode.kind == nnkStmtList and procNode.len == 1:
procNode = procNode[0]
if procNode.kind != nnkLambda:
error "registerReqFFI expects a lambda proc, found: " & $procNode.kind
let params = procNode[3] # parameters list
result = @[]
for p in params[1 .. ^1]: # skip return type
result.add newIdentDefs(p[0], p[1])
proc buildRequestType(reqTypeName: NimNode, body: NimNode): NimNode =
## Builds:
## type <reqTypeName>* = object
## <lambdaParam1Name>: <lambdaParam1Type>
## ...
var procNode = body
if procNode.kind == nnkStmtList and procNode.len == 1:
procNode = procNode[0]
if procNode.kind != nnkLambda:
error "registerReqFFI expects a lambda proc, found: " & $procNode.kind
let params = procNode[3] # formal params of the lambda
var fields: seq[NimNode] = @[]
for p in params[1 .. ^1]: # skip return type at index 0
let name = p[0]
let typ = p[1]
# Field must be nnkIdentDefs(name, type, defaultExpr)
fields.add newTree(nnkIdentDefs, name, typ, newEmptyNode())
# Wrap fields in a rec list
let recList = newTree(nnkRecList, fields)
# object type node: object [of?] [] [pragma?] recList
let objTy = newTree(nnkObjectTy, newEmptyNode(), newEmptyNode(), recList)
# Export the type (CreateNodeRequest*)
let typeName =
if reqTypeName.kind == nnkPostfix:
reqTypeName
else:
postfix(reqTypeName, "*")
result =
newNimNode(nnkTypeSection).add(newTree(nnkTypeDef, typeName, newEmptyNode(), objTy))
proc buildFfiNewReqProc(reqTypeName, body: NimNode): NimNode =
var formalParams = newSeq[NimNode]()
@ -14,7 +63,7 @@ proc buildFfiNewReqProc(reqTypeName, body: NimNode): NimNode =
procNode = body
if procNode.kind != nnkLambda:
error "registerFFI expects a lambda definition. Found: " & $procNode.kind
error "registerReqFFI expects a lambda definition. Found: " & $procNode.kind
# T: typedesc[CreateNodeRequest]
let typedescParam = newIdentDefs(
@ -88,30 +137,32 @@ proc buildFfiNewReqProc(reqTypeName, body: NimNode): NimNode =
pragmas = newEmptyNode(),
)
proc buildFfiDeleteReqProc(reqTypeName: NimNode): NimNode =
## Generates something like:
## proc ffiDeleteReq(self: ptr CreateNodeRequest) =
## deallocShared(self[].configJson)
proc buildFfiDeleteReqProc(reqTypeName: NimNode, fields: seq[NimNode]): NimNode =
## Generates:
## proc ffiDeleteReq(self: ptr <reqTypeName>) =
## deallocShared(self[].<cstringField>)
## deallocShared(self)
result = newProc(
name = ident("ffiDeleteReq"),
params =
@[
newEmptyNode(), # return type is empty (void)
newIdentDefs(ident("self"), nnkPtrTy.newTree(reqTypeName)),
],
body = newStmtList(
nnkCall.newTree(
# Build the body
var body = newStmtList()
for f in fields:
if $f[1] == "cstring": # only dealloc cstring fields
body.add newCall(
ident("deallocShared"),
nnkBracketExpr.newTree(
nnkBracketExpr.newTree(ident("self"), newEmptyNode()), # self[]
ident("configJson"),
),
),
nnkCall.newTree(ident("deallocShared"), ident("self")),
),
procType = nnkProcDef,
newDotExpr(newTree(nnkDerefExpr, ident("self")), ident($f[0])),
)
# Always free the whole object at the end
body.add newCall(ident("deallocShared"), ident("self"))
# Build the parameter: (self: ptr <reqTypeName>)
let selfParam = newIdentDefs(ident("self"), newTree(nnkPtrTy, reqTypeName))
# Build the proc definition
result = newProc(
name = postfix(ident("ffiDeleteReq"), "*"),
params = @[newEmptyNode()] & @[selfParam], # ✅ properly wrapped in a sequence
body = body,
)
proc buildProcessFFIRequestProc(reqTypeName, reqHandler, body: NimNode): NimNode =
@ -138,7 +189,7 @@ proc buildProcessFFIRequestProc(reqTypeName, reqHandler, body: NimNode): NimNode
if procNode.kind == nnkStmtList and procNode.len == 1:
procNode = procNode[0]
if procNode.kind != nnkLambda:
error "registerFFI expects a lambda definition. Found: " & $procNode.kind
error "registerReqFFI expects a lambda definition. Found: " & $procNode.kind
let typedescParam =
newIdentDefs(ident("T"), nnkBracketExpr.newTree(ident("typedesc"), reqTypeName))
@ -251,36 +302,46 @@ proc addNewRequestToRegistry(reqTypeName, reqHandler: NimNode): NimNode =
result =
newAssignment(newTree(nnkBracketExpr, ident("registeredRequests"), key), asyncProc)
macro registerFFI*(reqTypeName, reqHandler, body: untyped): untyped =
macro registerReqFFI*(reqTypeName, reqHandler, body: untyped): untyped =
## Registers a request that will be handled by the ffi thread.
## The request should be sent from the ffi consumer thread.
##
# Extract lambda params to generate fields
let fields = extractFieldsFromLambda(body)
let typeDef = buildRequestType(reqTypeName, body)
let ffiNewReqProc = buildFfiNewReqProc(reqTypeName, body)
let processProc = buildProcessFFIRequestProc(reqTypeName, reqHandler, body)
let addNewReqToReg = addNewRequestToRegistry(reqTypeName, reqHandler)
result = newStmtList(ffiNewReqProc, processProc, addNewReqToReg)
let deleteProc = buildFfiDeleteReqProc(reqTypeName, fields)
result = newStmtList(typeDef, ffiNewReqProc, deleteProc, processProc, addNewReqToReg)
macro ffiGenerateProcess*(): untyped =
## Generates a case statement that handles all the possible registered FFI requests
let reqParam = ident"request"
let reqHandlerParam = ident"reqHandler"
echo "Registered FFI request: " & result.repr
var caseStmt =
newTree(nnkCaseStmt, newDotExpr(newTree(nnkBracketExpr, reqParam), ident"reqId"))
macro processReq*(reqType: typed, args: varargs[untyped]): untyped =
## Expands T.processReq(a,b,...) into the sendRequest boilerplate.
caseStmt.add newTree(
nnkElse,
nnkStmtList.newTree(
newCall(
ident"nilProcess", newDotExpr(newTree(nnkBracketExpr, reqParam), ident"reqId")
)
),
# Collect the passed arguments as NimNodes
var callArgs = @[reqType, ident("callback"), ident("userData")]
for a in args:
callArgs.add a
# Build: ffiNewReq(reqType, callback, userData, arg1, arg2, ...)
let newReqCall = newCall(ident("ffiNewReq"), callArgs)
# Build: ffi_context.sendRequestToFFIThread(ctx, <newReqCall>)
let sendCall = newCall(
newDotExpr(ident("ffi_context"), ident("sendRequestToFFIThread")),
ident("ctx"),
newReqCall,
)
let retFutSym = ident"retFut"
result = quote:
proc process*[R](
T: type FFIThreadRequest,
`reqParam`: ptr FFIThreadRequest,
`reqHandlerParam`: ptr R,
) {.async.} =
let `retFutSym` = `caseStmt`
handleRes(await `retFutSym`, `reqParam`)
block:
let res = `sendCall`
if res.isErr:
let msg = "error in sendRequestToFFIThread: " & res.error
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](msg.len), userData)
return RET_ERR
return RET_OK