First commit

This commit is contained in:
Ivan Folgueira Bande 2025-08-09 22:56:44 +02:00
parent 04fd9f0f6d
commit bbddf6925b
No known key found for this signature in database
GPG Key ID: 3C117481F89E24A7
10 changed files with 518 additions and 0 deletions

4
config.nims Normal file
View File

@ -0,0 +1,4 @@
# begin Nimble config (version 1)
when fileExists("nimble.paths"):
include "nimble.paths"
# end Nimble config

3
ffi.nim Normal file
View File

@ -0,0 +1,3 @@
import ffi/alloc
export alloc

20
ffi.nimble Normal file
View File

@ -0,0 +1,20 @@
# ffi.nimble
version = "0.1.0"
author = "Institute of Free Technology"
description = "FFI framework with custom header generation"
license = "MIT or Apache License 2.0"
packageName = "ffi"
requires "nim >= 2.2.4"
"chronos"
# Source files to include
# srcDir = "src"
# installFiles = @["src/ffi.nim", "mylib.h"]
# # 💡 Custom build step before installation
# before install:
# echo "Generating custom C header..."
# exec "nim r tools/gen_header.nim"

42
ffi/alloc.nim Normal file
View File

@ -0,0 +1,42 @@
## Can be shared safely between threads
type SharedSeq*[T] = tuple[data: ptr UncheckedArray[T], len: int]
proc alloc*(str: cstring): cstring =
# Byte allocation from the given address.
# There should be the corresponding manual deallocation with deallocShared !
if str.isNil():
var ret = cast[cstring](allocShared(1)) # Allocate memory for the null terminator
ret[0] = '\0' # Set the null terminator
return ret
let ret = cast[cstring](allocShared(len(str) + 1))
copyMem(ret, str, len(str) + 1)
return ret
proc alloc*(str: string): cstring =
## Byte allocation from the given address.
## There should be the corresponding manual deallocation with deallocShared !
var ret = cast[cstring](allocShared(str.len + 1))
let s = cast[seq[char]](str)
for i in 0 ..< str.len:
ret[i] = s[i]
ret[str.len] = '\0'
return ret
proc allocSharedSeq*[T](s: seq[T]): SharedSeq[T] =
let data = allocShared(sizeof(T) * s.len)
if s.len != 0:
copyMem(data, unsafeAddr s[0], s.len)
return (cast[ptr UncheckedArray[T]](data), s.len)
proc deallocSharedSeq*[T](s: var SharedSeq[T]) =
deallocShared(s.data)
s.len = 0
proc toSeq*[T](s: SharedSeq[T]): seq[T] =
## Creates a seq[T] from a SharedSeq[T]. No explicit dealloc is required
## as req[T] is a GC managed type.
var ret = newSeq[T]()
for i in 0 ..< s.len:
ret.add(s.data[i])
return ret

195
ffi/ffi_context.nim Normal file
View File

@ -0,0 +1,195 @@
{.pragma: exported, exportc, cdecl, raises: [].}
{.pragma: callback, cdecl, raises: [], gcsafe.}
{.passc: "-fPIC".}
import std/[options, atomics, os, net, locks]
import chronicles, chronos, chronos/threadsync, taskpools/channels_spsc_single, results
import ./ffi_types
type FFIContext* = object
libraryName: cstring
ffiThread: Thread[(ptr FFIContext)]
# represents the main thread in charge of attending SDK consumer actions
watchdogThread: Thread[(ptr FFIContext)]
# monitors the FFI thread and notifies the FFI SDK consumer if it hangs
lock: Lock
reqChannel: ChannelSPSCSingle[ptr FFIThreadRequest]
reqSignal: ThreadSignalPtr # to notify the FFI Thread that a new request is sent
reqReceivedSignal: ThreadSignalPtr
# to signal main thread, interfacing with the FFI thread, that FFI thread received the request
userData*: pointer
eventCallback*: pointer
eventUserdata*: pointer
running: Atomic[bool] # To control when the threads are running
const git_version* {.strdefine.} = "n/a"
template callEventCallback(ctx: ptr FFIContext, eventName: string, body: untyped) =
if isNil(ctx[].eventCallback):
error eventName & " - eventCallback is nil"
return
foreignThreadGc:
try:
let event = body
cast[FFICallBack](ctx[].eventCallback)(
RET_OK, unsafeAddr event[0], cast[csize_t](len(event)), ctx[].eventUserData
)
except Exception, CatchableError:
let msg =
"Exception " & eventName & " when calling 'eventCallBack': " &
getCurrentExceptionMsg()
cast[FFICallBack](ctx[].eventCallback)(
RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), ctx[].eventUserData
)
proc sendRequestToWakuThread*(
ctx: ptr FFIContext,
reqType: RequestType,
reqContent: pointer,
callback: FFICallBack,
userData: pointer,
timeout = InfiniteDuration,
): Result[void, string] =
ctx.lock.acquire()
# This lock is only necessary while we use a SP Channel and while the signalling
# between threads assumes that there aren't concurrent requests.
# Rearchitecting the signaling + migrating to a MP Channel will allow us to receive
# requests concurrently and spare us the need of locks
defer:
ctx.lock.release()
let req = FFIThreadRequest.createShared(reqType, reqContent, callback, userData)
## Sending the request
let sentOk = ctx.reqChannel.trySend(req)
if not sentOk:
deallocShared(req)
return err("Couldn't send a request to the waku thread: " & $req[])
let fireSyncRes = ctx.reqSignal.fireSync()
if fireSyncRes.isErr():
deallocShared(req)
return err("failed fireSync: " & $fireSyncRes.error)
if fireSyncRes.get() == false:
deallocShared(req)
return err("Couldn't fireSync in time")
## wait until the Waku Thread properly received the request
let res = ctx.reqReceivedSignal.waitSync(timeout)
if res.isErr():
deallocShared(req)
return err("Couldn't receive reqReceivedSignal signal")
## Notice that in case of "ok", the deallocShared(req) is performed by the Waku Thread in the
## process proc. See the 'waku_thread_request.nim' module for more details.
ok()
proc watchdogThreadBody(ctx: ptr FFIContext) {.thread.} =
## Watchdog thread that monitors the Waku thread and notifies the library user if it hangs.
let watchdogRun = proc(ctx: ptr FFIContext) {.async.} =
const WatchdogStartDelay = 10.seconds
const WatchdogTimeinterval = 1.seconds
const WakuNotRespondingTimeout = 3.seconds
# Give time for the node to be created and up before sending watchdog requests
await sleepAsync(WatchdogStartDelay)
while true:
await sleepAsync(WatchdogTimeinterval)
if ctx.running.load == false:
debug "Watchdog thread exiting because FFIContext is not running"
break
let wakuCallback = proc(
callerRet: cint, msg: ptr cchar, len: csize_t, userData: pointer
) {.cdecl, gcsafe, raises: [].} =
discard ## Don't do anything. Just respecting the callback signature.
const nilUserData = nil
trace "Sending watchdog request to FFI thread"
sendRequestToWakuThread(
ctx,
RequestType.DEBUG,
DebugNodeRequest.createShared(DebugNodeMsgType.CHECK_WAKU_NOT_BLOCKED),
wakuCallback,
nilUserData,
WakuNotRespondingTimeout,
).isOkOr:
error "Failed to send watchdog request to FFI thread", error = $error
onWakuNotResponding(ctx)
waitFor watchdogRun(ctx)
proc ffiThreadBody(ctx: ptr FFIContext) {.thread.} =
## FFI thread that attends library user API requests
let ffiRun = proc(ctx: ptr FFIContext) {.async.} =
var waku: Waku
while true:
await ctx.reqSignal.wait()
if ctx.running.load == false:
break
## Wait for a request from the ffi consumer thread
var request: ptr FFIThreadRequest
let recvOk = ctx.reqChannel.tryRecv(request)
if not recvOk:
error "ffi thread could not receive a request"
continue
## Handle the request
asyncSpawn FFIThreadRequest.process(request, addr waku)
let fireRes = ctx.reqReceivedSignal.fireSync()
if fireRes.isErr():
error "could not fireSync back to requester thread", error = fireRes.error
waitFor ffiRun(ctx)
proc createFFIContext*(libraryName: cstring): Result[ptr FFIContext, string] =
## This proc is called from the main thread and it creates
## the FFI working thread.
var ctx = createShared(FFIContext, 1)
ctx.reqSignal = ThreadSignalPtr.new().valueOr:
return err("couldn't create reqSignal ThreadSignalPtr")
ctx.reqReceivedSignal = ThreadSignalPtr.new().valueOr:
return err("couldn't create reqReceivedSignal ThreadSignalPtr")
ctx.lock.initLock()
ctx.running.store(true)
try:
createThread(ctx.ffiThread, ffiThreadBody, ctx)
except ValueError, ResourceExhaustedError:
freeShared(ctx)
return err("failed to create the Waku thread: " & getCurrentExceptionMsg())
try:
createThread(ctx.watchdogThread, watchdogThreadBody, ctx)
except ValueError, ResourceExhaustedError:
freeShared(ctx)
return err("failed to create the watchdog thread: " & getCurrentExceptionMsg())
return ok(ctx)
proc destroyFFIContext*(ctx: ptr FFIContext): Result[void, string] =
ctx.running.store(false)
let signaledOnTime = ctx.reqSignal.fireSync().valueOr:
return err("error in destroyFFIContext: " & $error)
if not signaledOnTime:
return err("failed to signal reqSignal on time in destroyFFIContext")
joinThread(ctx.ffiThread)
joinThread(ctx.watchdogThread)
ctx.lock.deinitLock()
?ctx.reqSignal.close()
?ctx.reqReceivedSignal.close()
deallocShared(ctx.libraryName)
freeShared(ctx)
return ok()

30
ffi/ffi_types.nim Normal file
View File

@ -0,0 +1,30 @@
################################################################################
### Exported types
type FFICallBack* = proc(
callerRet: cint, msg: ptr cchar, len: csize_t, userData: pointer
) {.cdecl, gcsafe, raises: [].}
const RET_OK*: cint = 0
const RET_ERR*: cint = 1
const RET_MISSING_CALLBACK*: cint = 2
### End of exported types
################################################################################
################################################################################
### FFI utils
template foreignThreadGc*(body: untyped) =
when declared(setupForeignThreadGc):
setupForeignThreadGc()
body
when declared(tearDownForeignThreadGc):
tearDownForeignThreadGc()
type onDone* = proc()
### End of FFI utils
################################################################################

View File

@ -0,0 +1,51 @@
import std/macros, strformat
macro declareLibrary*(libraryName: static[string]): untyped =
result = newStmtList()
{.pragma: exported, exportc, cdecl, raises: [].}
{.pragma: callback, cdecl, raises: [], gcsafe.}
{.passc: "-fPIC".}
when defined(linux):
## Generates {.passl: "-Wl,-soname,libwaku.so".}
let soName = fmt"-Wl,-soname,lib{libraryName}.so"
result.add(nnkPragmaStmt.newTree(ident"passl", newStrLitNode(soName)))
## proc lib{libraryName}NimMain() {.importc.}
let procName = ident(fmt"lib{libraryName}NimMain")
let importcPragma = nnkPragma.newTree(ident"importc")
let procDef = newProc(
name = procName,
params = @[],
pragmas = importcPragma,
body = newEmptyNode(),
returnType = newEmptyNode(), # no return value
)
result.add(procDef)
################################################################################
### Library setup
# To control when the library has been initialized
var initialized: Atomic[bool]
if defined(android):
# Redirect chronicles to Android System logs
when compiles(defaultChroniclesStream.outputs[0].writer):
defaultChroniclesStream.outputs[0].writer = proc(
logLevel: LogLevel, msg: LogOutputStr
) {.raises: [].} =
echo logLevel, msg
proc initializeLibrary() {.exported.} =
if not initialized.exchange(true):
## Every Nim library needs to call `<yourprefix>NimMain` once exactly, to initialize the Nim runtime.
## Being `<yourprefix>` the value given in the optional compilation flag --nimMainPrefix:yourprefix
libwakuNimMain()
when declared(setupForeignThreadGc):
setupForeignThreadGc()
when declared(nimGC_setStackBottom):
var locals {.volatile, noinit.}: pointer
locals = addr(locals)
nimGC_setStackBottom(locals)

View File

@ -0,0 +1,51 @@
import std/macros
import chronos
macro ffi*(p: typed, args: varargs[untyped]): untyped =
## p: the proc definition AST
## args: expected to be (RequestTypeValue, MsgTypeValue)
if p.kind != nnkProcDef:
error("ffi pragma can only be applied to proc definitions", p)
if args.len != 2:
error(
"ffi pragma requires exactly two arguments: (RequestTypeValue, MsgTypeValue)", p
)
let reqType = args[0]
let msgType = args[1]
let origProc = p
let origName = p.name
let exportedName = origName
result = newStmtList()
result.add(origProc)
# Build exported wrapper proc:
let wrapperParams = newSeq[NimNode]()
wrapperParams.add(newIdentDefs(ident("ctx"), newPtrType(ident("WakuContext"))))
wrapperParams.add(newIdentDefs(ident("callback"), ident("WakuCallBack")))
wrapperParams.add(newIdentDefs(ident("userData"), ident("pointer")))
let wrapperPragmas = nnkPragma.newTree(ident("dynlib"), ident("exportc"))
let wrapperBody = quote:
initializeLibrary()
checkLibwakuParams(ctx, callback, userData)
handleRequest(
ctx, `reqType`, PeerManagementRequest.createShared(`msgType`), callback, userData
)
return 0.cint
let wrapperProc = newProc(
name = exportedName,
params = wrapperParams,
pragmas = wrapperPragmas,
body = wrapperBody,
returnType = ident("cint"),
)
result.add(wrapperProc)

View File

@ -0,0 +1,18 @@
type LifeCycleRequest {.ffi.} = object
discard
type PeerManagerRequest {.ffi.} = object
discard
type PeerManagerRequest* = object
reqType: 12123491 ## random int
reqContent: pointer
callback: WakuCallBack
userData: pointer
## createShared
## process
## deallocShared
##
##
##

View File

@ -0,0 +1,104 @@
## This file contains the base message request type that will be handled.
## The requests are created by the main thread and processed by
## the Waku Thread.
import std/json, results
import chronos, chronos/threadsync
import
../../waku/factory/waku,
../ffi_types,
./requests/node_lifecycle_request,
./requests/peer_manager_request,
./requests/protocols/relay_request,
./requests/protocols/store_request,
./requests/protocols/lightpush_request,
./requests/protocols/filter_request,
./requests/debug_node_request,
./requests/discovery_request,
./requests/ping_request
type RequestType* {.pure.} = enum
LIFECYCLE
PEER_MANAGER
PING
RELAY
STORE
DEBUG
DISCOVERY
LIGHTPUSH
FILTER
type FFIThreadRequest* = object
reqType: RequestType
reqContent: pointer
callback: FFICallBack
userData: pointer
proc createShared*(
T: type FFIThreadRequest,
reqType: RequestType,
reqContent: pointer,
callback: FFICallBack,
userData: pointer,
): ptr type T =
var ret = createShared(T)
ret[].reqType = reqType
ret[].reqContent = reqContent
ret[].callback = callback
ret[].userData = userData
return ret
proc handleRes[T: string | void](
res: Result[T, string], request: ptr FFIThreadRequest
) =
## Handles the Result responses, which can either be Result[string, string] or
## Result[void, string].
defer:
deallocShared(request)
if res.isErr():
foreignThreadGc:
let msg = "libwaku error: handleRes fireSyncRes error: " & $res.error
request[].callback(
RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), request[].userData
)
return
foreignThreadGc:
var msg: cstring = ""
when T is string:
msg = res.get().cstring()
request[].callback(
RET_OK, unsafeAddr msg[0], cast[csize_t](len(msg)), request[].userData
)
return
proc process*(
T: type FFIThreadRequest, request: ptr FFIThreadRequest, waku: ptr Waku
) {.async.} =
let retFut =
case request[].reqType
of LIFECYCLE:
cast[ptr NodeLifecycleRequest](request[].reqContent).process(waku)
of PEER_MANAGER:
cast[ptr PeerManagementRequest](request[].reqContent).process(waku[])
of PING:
cast[ptr PingRequest](request[].reqContent).process(waku)
of RELAY:
cast[ptr RelayRequest](request[].reqContent).process(waku)
of STORE:
cast[ptr StoreRequest](request[].reqContent).process(waku)
of DEBUG:
cast[ptr DebugNodeRequest](request[].reqContent).process(waku[])
of DISCOVERY:
cast[ptr DiscoveryRequest](request[].reqContent).process(waku)
of LIGHTPUSH:
cast[ptr LightpushRequest](request[].reqContent).process(waku)
of FILTER:
cast[ptr FilterRequest](request[].reqContent).process(waku)
handleRes(await retFut, request)
proc `$`*(self: FFIThreadRequest): string =
return $self.reqType