mirror of
https://github.com/logos-messaging/nim-ffi.git
synced 2026-01-10 10:03:07 +00:00
more positive progress. Getting closer
This commit is contained in:
parent
bbddf6925b
commit
46e51e45a6
6
ffi.nim
6
ffi.nim
@ -1,3 +1,5 @@
|
||||
import ffi/alloc
|
||||
import
|
||||
ffi/[alloc, ffi_types, ffi_context, ffi_thread_request],
|
||||
ffi/internal/[ffi_library, ffi_macro]
|
||||
|
||||
export alloc
|
||||
export alloc, ffi_library, ffi_macro, ffi_types, ffi_context, ffi_thread_request
|
||||
|
||||
@ -4,14 +4,13 @@
|
||||
|
||||
import std/[options, atomics, os, net, locks]
|
||||
import chronicles, chronos, chronos/threadsync, taskpools/channels_spsc_single, results
|
||||
import ./ffi_types
|
||||
import ./ffi_types, ./ffi_thread_request
|
||||
|
||||
type FFIContext* = object
|
||||
libraryName: cstring
|
||||
ffiThread: Thread[(ptr FFIContext)]
|
||||
# represents the main thread in charge of attending SDK consumer actions
|
||||
# represents the main FFI thread in charge of attending API consumer actions
|
||||
watchdogThread: Thread[(ptr FFIContext)]
|
||||
# monitors the FFI thread and notifies the FFI SDK consumer if it hangs
|
||||
# monitors the FFI thread and notifies the FFI API consumer if it hangs
|
||||
lock: Lock
|
||||
reqChannel: ChannelSPSCSingle[ptr FFIThreadRequest]
|
||||
reqSignal: ThreadSignalPtr # to notify the FFI Thread that a new request is sent
|
||||
@ -43,13 +42,8 @@ template callEventCallback(ctx: ptr FFIContext, eventName: string, body: untyped
|
||||
RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), ctx[].eventUserData
|
||||
)
|
||||
|
||||
proc sendRequestToWakuThread*(
|
||||
ctx: ptr FFIContext,
|
||||
reqType: RequestType,
|
||||
reqContent: pointer,
|
||||
callback: FFICallBack,
|
||||
userData: pointer,
|
||||
timeout = InfiniteDuration,
|
||||
proc sendRequestToFFIThread*(
|
||||
ctx: ptr FFIContext, ffiRequest: ptr FFIThreadRequest, timeout = InfiniteDuration
|
||||
): Result[void, string] =
|
||||
ctx.lock.acquire()
|
||||
# This lock is only necessary while we use a SP Channel and while the signalling
|
||||
@ -59,26 +53,21 @@ proc sendRequestToWakuThread*(
|
||||
defer:
|
||||
ctx.lock.release()
|
||||
|
||||
let req = FFIThreadRequest.createShared(reqType, reqContent, callback, userData)
|
||||
## Sending the request
|
||||
let sentOk = ctx.reqChannel.trySend(req)
|
||||
let sentOk = ctx.reqChannel.trySend(ffiRequest)
|
||||
if not sentOk:
|
||||
deallocShared(req)
|
||||
return err("Couldn't send a request to the waku thread: " & $req[])
|
||||
return err("Couldn't send a request to the ffi thread")
|
||||
|
||||
let fireSyncRes = ctx.reqSignal.fireSync()
|
||||
if fireSyncRes.isErr():
|
||||
deallocShared(req)
|
||||
return err("failed fireSync: " & $fireSyncRes.error)
|
||||
|
||||
if fireSyncRes.get() == false:
|
||||
deallocShared(req)
|
||||
return err("Couldn't fireSync in time")
|
||||
|
||||
## wait until the Waku Thread properly received the request
|
||||
let res = ctx.reqReceivedSignal.waitSync(timeout)
|
||||
if res.isErr():
|
||||
deallocShared(req)
|
||||
return err("Couldn't receive reqReceivedSignal signal")
|
||||
|
||||
## Notice that in case of "ok", the deallocShared(req) is performed by the Waku Thread in the
|
||||
@ -110,24 +99,24 @@ proc watchdogThreadBody(ctx: ptr FFIContext) {.thread.} =
|
||||
|
||||
trace "Sending watchdog request to FFI thread"
|
||||
|
||||
sendRequestToWakuThread(
|
||||
ctx,
|
||||
RequestType.DEBUG,
|
||||
DebugNodeRequest.createShared(DebugNodeMsgType.CHECK_WAKU_NOT_BLOCKED),
|
||||
wakuCallback,
|
||||
nilUserData,
|
||||
WakuNotRespondingTimeout,
|
||||
).isOkOr:
|
||||
error "Failed to send watchdog request to FFI thread", error = $error
|
||||
onWakuNotResponding(ctx)
|
||||
# sendRequestToFFIThread(
|
||||
# ctx,
|
||||
# 0,
|
||||
# DebugNodeRequest.createShared(DebugNodeMsgType.CHECK_WAKU_NOT_BLOCKED),
|
||||
# wakuCallback,
|
||||
# nilUserData,
|
||||
# WakuNotRespondingTimeout,
|
||||
# ).isOkOr:
|
||||
# error "Failed to send watchdog request to FFI thread", error = $error
|
||||
# onWakuNotResponding(ctx)
|
||||
|
||||
waitFor watchdogRun(ctx)
|
||||
|
||||
proc ffiThreadBody(ctx: ptr FFIContext) {.thread.} =
|
||||
proc ffiThreadBody[TT](ctx: ptr FFIContext) {.thread.} =
|
||||
## FFI thread that attends library user API requests
|
||||
|
||||
let ffiRun = proc(ctx: ptr FFIContext) {.async.} =
|
||||
var waku: Waku
|
||||
var ffiHandler: TT
|
||||
while true:
|
||||
await ctx.reqSignal.wait()
|
||||
|
||||
@ -138,11 +127,11 @@ proc ffiThreadBody(ctx: ptr FFIContext) {.thread.} =
|
||||
var request: ptr FFIThreadRequest
|
||||
let recvOk = ctx.reqChannel.tryRecv(request)
|
||||
if not recvOk:
|
||||
error "ffi thread could not receive a request"
|
||||
chronicles.error "ffi thread could not receive a request"
|
||||
continue
|
||||
|
||||
## Handle the request
|
||||
asyncSpawn FFIThreadRequest.process(request, addr waku)
|
||||
asyncSpawn FFIThreadRequest.process(request, addr ffiHandler)
|
||||
|
||||
let fireRes = ctx.reqReceivedSignal.fireSync()
|
||||
if fireRes.isErr():
|
||||
@ -150,7 +139,7 @@ proc ffiThreadBody(ctx: ptr FFIContext) {.thread.} =
|
||||
|
||||
waitFor ffiRun(ctx)
|
||||
|
||||
proc createFFIContext*(libraryName: cstring): Result[ptr FFIContext, string] =
|
||||
proc createFFIContext*[T](tt: typedesc[T]): Result[ptr FFIContext, string] =
|
||||
## This proc is called from the main thread and it creates
|
||||
## the FFI working thread.
|
||||
var ctx = createShared(FFIContext, 1)
|
||||
@ -163,7 +152,7 @@ proc createFFIContext*(libraryName: cstring): Result[ptr FFIContext, string] =
|
||||
ctx.running.store(true)
|
||||
|
||||
try:
|
||||
createThread(ctx.ffiThread, ffiThreadBody, ctx)
|
||||
createThread(ctx.ffiThread, ffiThreadBody[tt], ctx)
|
||||
except ValueError, ResourceExhaustedError:
|
||||
freeShared(ctx)
|
||||
return err("failed to create the Waku thread: " & getCurrentExceptionMsg())
|
||||
@ -176,7 +165,7 @@ proc createFFIContext*(libraryName: cstring): Result[ptr FFIContext, string] =
|
||||
|
||||
return ok(ctx)
|
||||
|
||||
proc destroyFFIContext*(ctx: ptr FFIContext): Result[void, string] =
|
||||
proc destroyFFIContext*[T](ctx: ptr FFIContext): Result[void, string] =
|
||||
ctx.running.store(false)
|
||||
|
||||
let signaledOnTime = ctx.reqSignal.fireSync().valueOr:
|
||||
@ -189,7 +178,6 @@ proc destroyFFIContext*(ctx: ptr FFIContext): Result[void, string] =
|
||||
ctx.lock.deinitLock()
|
||||
?ctx.reqSignal.close()
|
||||
?ctx.reqReceivedSignal.close()
|
||||
deallocShared(ctx.libraryName)
|
||||
freeShared(ctx)
|
||||
|
||||
return ok()
|
||||
|
||||
83
ffi/ffi_thread_request.nim
Normal file
83
ffi/ffi_thread_request.nim
Normal file
@ -0,0 +1,83 @@
|
||||
## This file contains the base message request type that will be handled.
|
||||
## The requests are created by the main thread and processed by
|
||||
## the Waku Thread.
|
||||
|
||||
import std/macros
|
||||
import std/json, results
|
||||
import chronos, chronos/threadsync
|
||||
import ./ffi_types, ./internal/ffi_macro
|
||||
import waku/factory/waku
|
||||
import ../../../library/waku_thread_requests/requests/peer_manager_request
|
||||
# type FFIRequestProcessor* =
|
||||
# concept
|
||||
# proc process[R](
|
||||
# T: type FFIThreadRequest, request: ptr FFIThreadRequest, reqHandler: ptr R
|
||||
# )
|
||||
|
||||
type FFIThreadRequest* = object
|
||||
callback: FFICallBack
|
||||
userData: pointer
|
||||
reqId: uint
|
||||
reqContent*: pointer
|
||||
|
||||
proc init*(
|
||||
T: typedesc[FFIThreadRequest],
|
||||
callback: FFICallBack,
|
||||
userData: pointer,
|
||||
reqId: uint,
|
||||
reqContent: pointer,
|
||||
): ptr type T =
|
||||
var ret = createShared(FFIThreadRequest)
|
||||
ret[].callback = callback
|
||||
ret[].userData = userData
|
||||
ret[].reqId = reqId
|
||||
ret[].reqContent = reqContent
|
||||
return ret
|
||||
|
||||
proc deleteRequest(request: ptr FFIThreadRequest) =
|
||||
deallocShared(request)
|
||||
|
||||
proc handleRes[T: string | void](
|
||||
res: Result[T, string], request: ptr FFIThreadRequest
|
||||
) =
|
||||
## Handles the Result responses, which can either be Result[string, string] or
|
||||
## Result[void, string].
|
||||
|
||||
defer:
|
||||
deleteRequest(request)
|
||||
|
||||
if res.isErr():
|
||||
foreignThreadGc:
|
||||
let msg = "libwaku error: handleRes fireSyncRes error: " & $res.error
|
||||
request[].callback(
|
||||
RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), request[].userData
|
||||
)
|
||||
return
|
||||
|
||||
foreignThreadGc:
|
||||
var msg: cstring = ""
|
||||
when T is string:
|
||||
msg = res.get().cstring()
|
||||
request[].callback(
|
||||
RET_OK, unsafeAddr msg[0], cast[csize_t](len(msg)), request[].userData
|
||||
)
|
||||
return
|
||||
|
||||
proc nilProcess(reqId: uint): Future[Result[string, string]] {.async.} =
|
||||
return err("This request type is not implemented: " & $reqId)
|
||||
|
||||
ffiGenerateProcess()
|
||||
|
||||
# dumpAstGen:
|
||||
# proc process*[R](
|
||||
# T: type FFIThreadRequest, request: ptr FFIThreadRequest, reqHandler: ptr R
|
||||
# ) {.async.} =
|
||||
# # reqHandler represents the object that actually processes the request.
|
||||
# let retFut =
|
||||
# case request[].reqId
|
||||
# of 1:
|
||||
# cast[ptr PeerManagementRequest](request[].reqContent).process(reqHandler)
|
||||
# else:
|
||||
# nilProcess(request[].reqId)
|
||||
|
||||
# handleRes(await retFut, request)
|
||||
@ -1,51 +1,82 @@
|
||||
import std/macros, strformat
|
||||
import std/[macros, atomics], strformat, chronicles, chronos
|
||||
|
||||
macro declareLibrary*(libraryName: static[string]): untyped =
|
||||
result = newStmtList()
|
||||
var res = newStmtList()
|
||||
|
||||
{.pragma: exported, exportc, cdecl, raises: [].}
|
||||
{.pragma: callback, cdecl, raises: [], gcsafe.}
|
||||
{.passc: "-fPIC".}
|
||||
## Generate {.pragma: exported, exportc, cdecl, raises: [].}
|
||||
res.add nnkPragma.newTree(
|
||||
nnkExprColonExpr.newTree(ident"pragma", ident"exported"),
|
||||
ident"exportc",
|
||||
ident"cdecl",
|
||||
nnkExprColonExpr.newTree(ident"raises", nnkBracket.newTree()),
|
||||
)
|
||||
|
||||
## Generate {.pragma: callback, cdecl, raises: [], gcsafe.}
|
||||
res.add nnkPragma.newTree(
|
||||
nnkExprColonExpr.newTree(ident"pragma", ident"callback"),
|
||||
ident"cdecl",
|
||||
nnkExprColonExpr.newTree(ident"raises", nnkBracket.newTree()),
|
||||
ident"gcsafe",
|
||||
)
|
||||
|
||||
## Generate {.passc: "-fPIC".}
|
||||
res.add nnkPragma.newTree(nnkExprColonExpr.newTree(ident"passc", newLit("-fPIC")))
|
||||
|
||||
when defined(linux):
|
||||
## Generates {.passl: "-Wl,-soname,libwaku.so".}
|
||||
## Generates {.passl: "-Wl,-soname,libwaku.so".} (considering libraryName=="waku", for example)
|
||||
let soName = fmt"-Wl,-soname,lib{libraryName}.so"
|
||||
result.add(nnkPragmaStmt.newTree(ident"passl", newStrLitNode(soName)))
|
||||
res.add(
|
||||
newNimNode(nnkPragma).add(
|
||||
nnkExprColonExpr.newTree(ident"passl", newStrLitNode(soName))
|
||||
)
|
||||
)
|
||||
|
||||
## proc lib{libraryName}NimMain() {.importc.}
|
||||
let procName = ident(fmt"lib{libraryName}NimMain")
|
||||
let libNimMainName = ident(fmt"lib{libraryName}NimMain")
|
||||
let importcPragma = nnkPragma.newTree(ident"importc")
|
||||
let procDef = newProc(
|
||||
name = procName,
|
||||
params = @[],
|
||||
name = libNimMainName,
|
||||
params = @[ident"void"],
|
||||
pragmas = importcPragma,
|
||||
body = newEmptyNode(),
|
||||
returnType = newEmptyNode(), # no return value
|
||||
)
|
||||
result.add(procDef)
|
||||
res.add(procDef)
|
||||
|
||||
################################################################################
|
||||
### Library setup
|
||||
# Create: var initialized: Atomic[bool]
|
||||
let atomicType = nnkBracketExpr.newTree(ident("Atomic"), ident("bool"))
|
||||
let varStmt = nnkVarSection.newTree(
|
||||
nnkIdentDefs.newTree(ident("initialized"), atomicType, newEmptyNode())
|
||||
)
|
||||
res.add(varStmt)
|
||||
|
||||
# To control when the library has been initialized
|
||||
var initialized: Atomic[bool]
|
||||
## Android chronicles redirection
|
||||
let chroniclesBlock = quote:
|
||||
when defined(android) and compiles(defaultChroniclesStream.outputs[0].writer):
|
||||
defaultChroniclesStream.outputs[0].writer = proc(
|
||||
logLevel: LogLevel, msg: LogOutputStr
|
||||
) {.raises: [].} =
|
||||
echo logLevel, msg
|
||||
result.add(chroniclesBlock)
|
||||
|
||||
if defined(android):
|
||||
# Redirect chronicles to Android System logs
|
||||
when compiles(defaultChroniclesStream.outputs[0].writer):
|
||||
defaultChroniclesStream.outputs[0].writer = proc(
|
||||
logLevel: LogLevel, msg: LogOutputStr
|
||||
) {.raises: [].} =
|
||||
echo logLevel, msg
|
||||
let procName = ident("initializeLibrary")
|
||||
let nimMainName = ident("lib" & libraryName & "NimMain")
|
||||
|
||||
proc initializeLibrary() {.exported.} =
|
||||
if not initialized.exchange(true):
|
||||
## Every Nim library needs to call `<yourprefix>NimMain` once exactly, to initialize the Nim runtime.
|
||||
## Being `<yourprefix>` the value given in the optional compilation flag --nimMainPrefix:yourprefix
|
||||
libwakuNimMain()
|
||||
when declared(setupForeignThreadGc):
|
||||
setupForeignThreadGc()
|
||||
when declared(nimGC_setStackBottom):
|
||||
var locals {.volatile, noinit.}: pointer
|
||||
locals = addr(locals)
|
||||
nimGC_setStackBottom(locals)
|
||||
let initializeLibraryProc = quote:
|
||||
proc `procName`() {.exported.} =
|
||||
if not initialized.exchange(true):
|
||||
## Every Nim library needs to call `<yourprefix>NimMain` once exactly,
|
||||
## to initialize the Nim runtime.
|
||||
## Being `<yourprefix>` the value given in the optional
|
||||
## compilation flag --nimMainPrefix:yourprefix
|
||||
`nimMainName`()
|
||||
when declared(setupForeignThreadGc):
|
||||
setupForeignThreadGc()
|
||||
when declared(nimGC_setStackBottom):
|
||||
var locals {.volatile, noinit.}: pointer
|
||||
locals = addr(locals)
|
||||
nimGC_setStackBottom(locals)
|
||||
|
||||
res.add(initializeLibraryProc)
|
||||
|
||||
echo result.repr
|
||||
return res
|
||||
|
||||
@ -1,51 +1,258 @@
|
||||
import std/macros
|
||||
|
||||
import std/[macros, tables]
|
||||
import chronos
|
||||
import ../ffi_thread_request, ../ffi_types
|
||||
|
||||
macro ffi*(p: typed, args: varargs[untyped]): untyped =
|
||||
## p: the proc definition AST
|
||||
## args: expected to be (RequestTypeValue, MsgTypeValue)
|
||||
var registeredRequests {.compileTime.}: Table[uint, NimNode]
|
||||
|
||||
if p.kind != nnkProcDef:
|
||||
error("ffi pragma can only be applied to proc definitions", p)
|
||||
|
||||
if args.len != 2:
|
||||
error(
|
||||
"ffi pragma requires exactly two arguments: (RequestTypeValue, MsgTypeValue)", p
|
||||
)
|
||||
|
||||
let reqType = args[0]
|
||||
let msgType = args[1]
|
||||
|
||||
let origProc = p
|
||||
let origName = p.name
|
||||
let exportedName = origName
|
||||
|
||||
result = newStmtList()
|
||||
result.add(origProc)
|
||||
|
||||
# Build exported wrapper proc:
|
||||
let wrapperParams = newSeq[NimNode]()
|
||||
wrapperParams.add(newIdentDefs(ident("ctx"), newPtrType(ident("WakuContext"))))
|
||||
wrapperParams.add(newIdentDefs(ident("callback"), ident("WakuCallBack")))
|
||||
wrapperParams.add(newIdentDefs(ident("userData"), ident("pointer")))
|
||||
|
||||
let wrapperPragmas = nnkPragma.newTree(ident("dynlib"), ident("exportc"))
|
||||
proc fnv1aHash32*(s: string): uint =
|
||||
const
|
||||
FNV_offset_basis: uint = 2166136261'u32.uint
|
||||
FNV_prime: uint = 16777619'u32.uint
|
||||
var hash: uint = FNV_offset_basis
|
||||
for c in s:
|
||||
hash = hash xor uint(ord(c))
|
||||
hash = hash * FNV_prime
|
||||
return hash
|
||||
|
||||
proc generateProc(p: NimNode, args: varargs[NimNode]): NimNode =
|
||||
let wrapperPragmas =
|
||||
nnkPragma.newTree(ident("dynlib"), ident("exportc"), ident("cdecl"))
|
||||
let origBody = p.body
|
||||
let wrapperBody = quote:
|
||||
initializeLibrary()
|
||||
checkLibwakuParams(ctx, callback, userData)
|
||||
handleRequest(
|
||||
ctx, `reqType`, PeerManagementRequest.createShared(`msgType`), callback, userData
|
||||
)
|
||||
return 0.cint
|
||||
`origBody`
|
||||
|
||||
let origParams = p.params
|
||||
let wrapperProc = newProc(
|
||||
name = exportedName,
|
||||
params = wrapperParams,
|
||||
pragmas = wrapperPragmas,
|
||||
body = wrapperBody,
|
||||
returnType = ident("cint"),
|
||||
name = p.name, params = @[origParams], pragmas = wrapperPragmas, body = wrapperBody
|
||||
)
|
||||
|
||||
result.add(wrapperProc)
|
||||
var res = newStmtList()
|
||||
res.add(wrapperProc)
|
||||
return res
|
||||
|
||||
proc buildFfiNewReqProc(reqTypeName, body: NimNode): NimNode =
|
||||
# Standard FFI params
|
||||
var formalParams = newSeq[NimNode]()
|
||||
|
||||
var procNode: NimNode
|
||||
if body.kind == nnkStmtList and body.len == 1:
|
||||
procNode = body[0] # unwrap single statement
|
||||
else:
|
||||
procNode = body
|
||||
|
||||
if procNode.kind != nnkLambda:
|
||||
error "registerFFI expects a lambda definition. Found: " & $procNode.kind
|
||||
|
||||
# T: typedesc[CreateNodeRequest]
|
||||
let typedescParam = newIdentDefs(
|
||||
ident("T"), # param name
|
||||
nnkBracketExpr.newTree(ident("typedesc"), reqTypeName), # typedesc[T]
|
||||
)
|
||||
formalParams.add(typedescParam)
|
||||
|
||||
# Other fixed FFI params
|
||||
formalParams.add(newIdentDefs(ident("callback"), ident("FFICallBack")))
|
||||
formalParams.add(newIdentDefs(ident("userData"), ident("pointer")))
|
||||
|
||||
# Add original lambda params
|
||||
let procParams = procNode[3]
|
||||
for p in procParams[1 .. ^1]:
|
||||
formalParams.add(p)
|
||||
|
||||
# Build `ptr FFIThreadRequest`
|
||||
let retType = newNimNode(nnkPtrTy)
|
||||
retType.add(ident("FFIThreadRequest"))
|
||||
|
||||
formalParams = @[retType] & formalParams
|
||||
|
||||
# Build body
|
||||
let reqObjIdent = ident("reqObj")
|
||||
var newBody = newStmtList()
|
||||
newBody.add(
|
||||
quote do:
|
||||
var `reqObjIdent` = createShared(T)
|
||||
)
|
||||
|
||||
for p in procParams[1 .. ^1]:
|
||||
let fieldNameIdent = ident($p[0])
|
||||
let fieldTypeNode = p[1]
|
||||
|
||||
# Extract type name as string
|
||||
var typeStr: string
|
||||
if fieldTypeNode.kind == nnkIdent:
|
||||
typeStr = $fieldTypeNode
|
||||
elif fieldTypeNode.kind == nnkBracketExpr:
|
||||
typeStr = $fieldTypeNode[0] # e.g., `ptr` in `ptr[Waku]`
|
||||
else:
|
||||
typeStr = "" # fallback
|
||||
|
||||
# Apply .alloc() only to cstrings
|
||||
if typeStr == "cstring":
|
||||
newBody.add(
|
||||
quote do:
|
||||
`reqObjIdent`[].`fieldNameIdent` = `fieldNameIdent`.alloc()
|
||||
)
|
||||
else:
|
||||
newBody.add(
|
||||
quote do:
|
||||
`reqObjIdent`[].`fieldNameIdent` = `fieldNameIdent`
|
||||
)
|
||||
|
||||
# FFIThreadRequest.init using fnv1aHash32
|
||||
newBody.add(
|
||||
quote do:
|
||||
let typeStr = $T
|
||||
var ret =
|
||||
FFIThreadRequest.init(callback, userData, fnv1aHash32(typeStr), `reqObjIdent`)
|
||||
return ret
|
||||
)
|
||||
|
||||
# Build the proc node
|
||||
result = newProc(
|
||||
name = postfix(ident("ffiNewReq"), "*"),
|
||||
params = formalParams,
|
||||
body = newBody,
|
||||
pragmas = newEmptyNode(),
|
||||
)
|
||||
|
||||
echo result.repr
|
||||
|
||||
proc buildFfiDeleteReqProc(reqTypeName: NimNode): NimNode =
|
||||
## Generates something like:
|
||||
## proc ffiDeleteReq(self: ptr CreateNodeRequest) =
|
||||
## deallocShared(self[].configJson)
|
||||
## deallocShared(self)
|
||||
|
||||
result = newProc(
|
||||
name = ident("ffiDeleteReq"),
|
||||
params =
|
||||
@[
|
||||
newEmptyNode(), # return type is empty (void)
|
||||
newIdentDefs(ident("self"), nnkPtrTy.newTree(reqTypeName)),
|
||||
],
|
||||
body = newStmtList(
|
||||
nnkCall.newTree(
|
||||
ident("deallocShared"),
|
||||
nnkBracketExpr.newTree(
|
||||
nnkBracketExpr.newTree(ident("self"), newEmptyNode()), # self[]
|
||||
ident("configJson"),
|
||||
),
|
||||
),
|
||||
nnkCall.newTree(ident("deallocShared"), ident("self")),
|
||||
),
|
||||
procType = nnkProcDef,
|
||||
)
|
||||
|
||||
proc buildProcessFFIRequestProc(reqTypeName, reqHandler, body: NimNode): NimNode =
|
||||
## Builds:
|
||||
## proc processFFIRequest(T: typedesc[CreateNodeRequest];
|
||||
## configJson: cstring;
|
||||
## appCallbacks: AppCallbacks;
|
||||
## waku: ptr Waku) ...
|
||||
|
||||
# Require: ident: ptr SomeType
|
||||
if reqHandler.kind != nnkExprColonExpr:
|
||||
error(
|
||||
"Second argument must be a typed parameter, e.g., waku: ptr Waku. Found: " &
|
||||
$reqHandler.kind
|
||||
)
|
||||
|
||||
let rhs = reqHandler[1]
|
||||
if rhs.kind != nnkPtrTy:
|
||||
error("Second argument must be a pointer type, e.g., waku: ptr Waku")
|
||||
|
||||
var procNode = body
|
||||
if procNode.kind == nnkStmtList and procNode.len == 1:
|
||||
procNode = procNode[0]
|
||||
|
||||
if procNode.kind != nnkLambda:
|
||||
error "registerFFI expects a lambda definition. Found: " & $procNode.kind
|
||||
|
||||
var formalParams = newSeq[NimNode]()
|
||||
|
||||
# First param: T: typedesc[reqTypeName]
|
||||
let typedescParam =
|
||||
newIdentDefs(ident("T"), nnkBracketExpr.newTree(ident("typedesc"), reqTypeName))
|
||||
formalParams.add(typedescParam)
|
||||
|
||||
# Add original lambda params
|
||||
let procParams = procNode[3]
|
||||
for p in procParams[1 .. ^1]:
|
||||
formalParams.add(p)
|
||||
|
||||
# Add pointer handler param at the end
|
||||
formalParams.add(
|
||||
newIdentDefs(reqHandler[0], rhs) # e.g., waku: ptr Waku
|
||||
)
|
||||
|
||||
# Return type first
|
||||
formalParams = @[procParams[0]] & formalParams
|
||||
|
||||
# Pragmas
|
||||
let pragmas =
|
||||
if procNode.len >= 5:
|
||||
procNode[4]
|
||||
else:
|
||||
newEmptyNode()
|
||||
|
||||
# Body
|
||||
let bodyNode =
|
||||
if procNode.body.kind == nnkStmtList:
|
||||
procNode.body
|
||||
else:
|
||||
newStmtList(procNode.body)
|
||||
|
||||
# Build proc
|
||||
result = newProc(
|
||||
name = postfix(ident("processFFIRequest"), "*"),
|
||||
params = formalParams,
|
||||
body = bodyNode,
|
||||
procType = nnkProcDef,
|
||||
pragmas = pragmas,
|
||||
)
|
||||
echo result.repr
|
||||
|
||||
macro registerFFI*(reqTypeName, reqHandler, body: untyped): untyped =
|
||||
let ffiNewReqProc = buildFfiNewReqProc(reqTypeName, body)
|
||||
let processProc = buildProcessFFIRequestProc(reqTypeName, reqHandler, body)
|
||||
result = newStmtList(ffiNewReqProc, processProc)
|
||||
|
||||
macro ffiGenerateProcess*(): untyped =
|
||||
## Generates a case statement that handles all the possible registered FFI requests
|
||||
let reqParam = ident"request"
|
||||
let reqHandlerParam = ident"reqHandler"
|
||||
|
||||
var caseStmt =
|
||||
newTree(nnkCaseStmt, newDotExpr(newTree(nnkBracketExpr, reqParam), ident"reqId"))
|
||||
|
||||
for caseKey, typeIdent in registeredRequests.pairs:
|
||||
let castExpr = newTree(
|
||||
nnkCast,
|
||||
newTree(nnkPtrTy, typeIdent),
|
||||
newDotExpr(newTree(nnkBracketExpr, reqParam), ident"reqContent"),
|
||||
)
|
||||
let callExpr = newCall(ident"process", castExpr, reqHandlerParam)
|
||||
caseStmt.add newTree(nnkOfBranch, newLit(caseKey), nnkStmtList.newTree(callExpr))
|
||||
|
||||
caseStmt.add newTree(
|
||||
nnkElse,
|
||||
nnkStmtList.newTree(
|
||||
newCall(
|
||||
ident"nilProcess", newDotExpr(newTree(nnkBracketExpr, reqParam), ident"reqId")
|
||||
)
|
||||
),
|
||||
)
|
||||
|
||||
let retFutSym = ident"retFut"
|
||||
|
||||
result = quote:
|
||||
proc process*[R](
|
||||
T: type FFIThreadRequest,
|
||||
`reqParam`: ptr FFIThreadRequest,
|
||||
`reqHandlerParam`: ptr R,
|
||||
) {.async.} =
|
||||
let `retFutSym` = `caseStmt`
|
||||
handleRes(await `retFutSym`, `reqParam`)
|
||||
|
||||
echo result.repr
|
||||
|
||||
@ -1,18 +0,0 @@
|
||||
type LifeCycleRequest {.ffi.} = object
|
||||
discard
|
||||
|
||||
type PeerManagerRequest {.ffi.} = object
|
||||
discard
|
||||
|
||||
type PeerManagerRequest* = object
|
||||
reqType: 12123491 ## random int
|
||||
reqContent: pointer
|
||||
callback: WakuCallBack
|
||||
userData: pointer
|
||||
|
||||
## createShared
|
||||
## process
|
||||
## deallocShared
|
||||
##
|
||||
##
|
||||
##
|
||||
@ -1,104 +0,0 @@
|
||||
## This file contains the base message request type that will be handled.
|
||||
## The requests are created by the main thread and processed by
|
||||
## the Waku Thread.
|
||||
|
||||
import std/json, results
|
||||
import chronos, chronos/threadsync
|
||||
import
|
||||
../../waku/factory/waku,
|
||||
../ffi_types,
|
||||
./requests/node_lifecycle_request,
|
||||
./requests/peer_manager_request,
|
||||
./requests/protocols/relay_request,
|
||||
./requests/protocols/store_request,
|
||||
./requests/protocols/lightpush_request,
|
||||
./requests/protocols/filter_request,
|
||||
./requests/debug_node_request,
|
||||
./requests/discovery_request,
|
||||
./requests/ping_request
|
||||
|
||||
type RequestType* {.pure.} = enum
|
||||
LIFECYCLE
|
||||
PEER_MANAGER
|
||||
PING
|
||||
RELAY
|
||||
STORE
|
||||
DEBUG
|
||||
DISCOVERY
|
||||
LIGHTPUSH
|
||||
FILTER
|
||||
|
||||
type FFIThreadRequest* = object
|
||||
reqType: RequestType
|
||||
reqContent: pointer
|
||||
callback: FFICallBack
|
||||
userData: pointer
|
||||
|
||||
proc createShared*(
|
||||
T: type FFIThreadRequest,
|
||||
reqType: RequestType,
|
||||
reqContent: pointer,
|
||||
callback: FFICallBack,
|
||||
userData: pointer,
|
||||
): ptr type T =
|
||||
var ret = createShared(T)
|
||||
ret[].reqType = reqType
|
||||
ret[].reqContent = reqContent
|
||||
ret[].callback = callback
|
||||
ret[].userData = userData
|
||||
return ret
|
||||
|
||||
proc handleRes[T: string | void](
|
||||
res: Result[T, string], request: ptr FFIThreadRequest
|
||||
) =
|
||||
## Handles the Result responses, which can either be Result[string, string] or
|
||||
## Result[void, string].
|
||||
|
||||
defer:
|
||||
deallocShared(request)
|
||||
|
||||
if res.isErr():
|
||||
foreignThreadGc:
|
||||
let msg = "libwaku error: handleRes fireSyncRes error: " & $res.error
|
||||
request[].callback(
|
||||
RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), request[].userData
|
||||
)
|
||||
return
|
||||
|
||||
foreignThreadGc:
|
||||
var msg: cstring = ""
|
||||
when T is string:
|
||||
msg = res.get().cstring()
|
||||
request[].callback(
|
||||
RET_OK, unsafeAddr msg[0], cast[csize_t](len(msg)), request[].userData
|
||||
)
|
||||
return
|
||||
|
||||
proc process*(
|
||||
T: type FFIThreadRequest, request: ptr FFIThreadRequest, waku: ptr Waku
|
||||
) {.async.} =
|
||||
let retFut =
|
||||
case request[].reqType
|
||||
of LIFECYCLE:
|
||||
cast[ptr NodeLifecycleRequest](request[].reqContent).process(waku)
|
||||
of PEER_MANAGER:
|
||||
cast[ptr PeerManagementRequest](request[].reqContent).process(waku[])
|
||||
of PING:
|
||||
cast[ptr PingRequest](request[].reqContent).process(waku)
|
||||
of RELAY:
|
||||
cast[ptr RelayRequest](request[].reqContent).process(waku)
|
||||
of STORE:
|
||||
cast[ptr StoreRequest](request[].reqContent).process(waku)
|
||||
of DEBUG:
|
||||
cast[ptr DebugNodeRequest](request[].reqContent).process(waku[])
|
||||
of DISCOVERY:
|
||||
cast[ptr DiscoveryRequest](request[].reqContent).process(waku)
|
||||
of LIGHTPUSH:
|
||||
cast[ptr LightpushRequest](request[].reqContent).process(waku)
|
||||
of FILTER:
|
||||
cast[ptr FilterRequest](request[].reqContent).process(waku)
|
||||
|
||||
handleRes(await retFut, request)
|
||||
|
||||
proc `$`*(self: FFIThreadRequest): string =
|
||||
return $self.reqType
|
||||
Loading…
x
Reference in New Issue
Block a user