diff --git a/config.nims b/config.nims new file mode 100644 index 0000000..fce8f96 --- /dev/null +++ b/config.nims @@ -0,0 +1,4 @@ +# begin Nimble config (version 1) +when fileExists("nimble.paths"): + include "nimble.paths" +# end Nimble config \ No newline at end of file diff --git a/ffi.nim b/ffi.nim new file mode 100644 index 0000000..a9c983c --- /dev/null +++ b/ffi.nim @@ -0,0 +1,3 @@ +import ffi/alloc + +export alloc diff --git a/ffi.nimble b/ffi.nimble new file mode 100644 index 0000000..edc0799 --- /dev/null +++ b/ffi.nimble @@ -0,0 +1,20 @@ +# ffi.nimble + +version = "0.1.0" +author = "Institute of Free Technology" +description = "FFI framework with custom header generation" +license = "MIT or Apache License 2.0" + +packageName = "ffi" + +requires "nim >= 2.2.4" + "chronos" + +# Source files to include +# srcDir = "src" +# installFiles = @["src/ffi.nim", "mylib.h"] + +# # 💡 Custom build step before installation +# before install: +# echo "Generating custom C header..." +# exec "nim r tools/gen_header.nim" diff --git a/ffi/alloc.nim b/ffi/alloc.nim new file mode 100644 index 0000000..1a6f118 --- /dev/null +++ b/ffi/alloc.nim @@ -0,0 +1,42 @@ +## Can be shared safely between threads +type SharedSeq*[T] = tuple[data: ptr UncheckedArray[T], len: int] + +proc alloc*(str: cstring): cstring = + # Byte allocation from the given address. + # There should be the corresponding manual deallocation with deallocShared ! + if str.isNil(): + var ret = cast[cstring](allocShared(1)) # Allocate memory for the null terminator + ret[0] = '\0' # Set the null terminator + return ret + + let ret = cast[cstring](allocShared(len(str) + 1)) + copyMem(ret, str, len(str) + 1) + return ret + +proc alloc*(str: string): cstring = + ## Byte allocation from the given address. + ## There should be the corresponding manual deallocation with deallocShared ! + var ret = cast[cstring](allocShared(str.len + 1)) + let s = cast[seq[char]](str) + for i in 0 ..< str.len: + ret[i] = s[i] + ret[str.len] = '\0' + return ret + +proc allocSharedSeq*[T](s: seq[T]): SharedSeq[T] = + let data = allocShared(sizeof(T) * s.len) + if s.len != 0: + copyMem(data, unsafeAddr s[0], s.len) + return (cast[ptr UncheckedArray[T]](data), s.len) + +proc deallocSharedSeq*[T](s: var SharedSeq[T]) = + deallocShared(s.data) + s.len = 0 + +proc toSeq*[T](s: SharedSeq[T]): seq[T] = + ## Creates a seq[T] from a SharedSeq[T]. No explicit dealloc is required + ## as req[T] is a GC managed type. + var ret = newSeq[T]() + for i in 0 ..< s.len: + ret.add(s.data[i]) + return ret diff --git a/ffi/ffi_context.nim b/ffi/ffi_context.nim new file mode 100644 index 0000000..e09308d --- /dev/null +++ b/ffi/ffi_context.nim @@ -0,0 +1,195 @@ +{.pragma: exported, exportc, cdecl, raises: [].} +{.pragma: callback, cdecl, raises: [], gcsafe.} +{.passc: "-fPIC".} + +import std/[options, atomics, os, net, locks] +import chronicles, chronos, chronos/threadsync, taskpools/channels_spsc_single, results +import ./ffi_types + +type FFIContext* = object + libraryName: cstring + ffiThread: Thread[(ptr FFIContext)] + # represents the main thread in charge of attending SDK consumer actions + watchdogThread: Thread[(ptr FFIContext)] + # monitors the FFI thread and notifies the FFI SDK consumer if it hangs + lock: Lock + reqChannel: ChannelSPSCSingle[ptr FFIThreadRequest] + reqSignal: ThreadSignalPtr # to notify the FFI Thread that a new request is sent + reqReceivedSignal: ThreadSignalPtr + # to signal main thread, interfacing with the FFI thread, that FFI thread received the request + userData*: pointer + eventCallback*: pointer + eventUserdata*: pointer + running: Atomic[bool] # To control when the threads are running + +const git_version* {.strdefine.} = "n/a" + +template callEventCallback(ctx: ptr FFIContext, eventName: string, body: untyped) = + if isNil(ctx[].eventCallback): + error eventName & " - eventCallback is nil" + return + + foreignThreadGc: + try: + let event = body + cast[FFICallBack](ctx[].eventCallback)( + RET_OK, unsafeAddr event[0], cast[csize_t](len(event)), ctx[].eventUserData + ) + except Exception, CatchableError: + let msg = + "Exception " & eventName & " when calling 'eventCallBack': " & + getCurrentExceptionMsg() + cast[FFICallBack](ctx[].eventCallback)( + RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), ctx[].eventUserData + ) + +proc sendRequestToWakuThread*( + ctx: ptr FFIContext, + reqType: RequestType, + reqContent: pointer, + callback: FFICallBack, + userData: pointer, + timeout = InfiniteDuration, +): Result[void, string] = + ctx.lock.acquire() + # This lock is only necessary while we use a SP Channel and while the signalling + # between threads assumes that there aren't concurrent requests. + # Rearchitecting the signaling + migrating to a MP Channel will allow us to receive + # requests concurrently and spare us the need of locks + defer: + ctx.lock.release() + + let req = FFIThreadRequest.createShared(reqType, reqContent, callback, userData) + ## Sending the request + let sentOk = ctx.reqChannel.trySend(req) + if not sentOk: + deallocShared(req) + return err("Couldn't send a request to the waku thread: " & $req[]) + + let fireSyncRes = ctx.reqSignal.fireSync() + if fireSyncRes.isErr(): + deallocShared(req) + return err("failed fireSync: " & $fireSyncRes.error) + + if fireSyncRes.get() == false: + deallocShared(req) + return err("Couldn't fireSync in time") + + ## wait until the Waku Thread properly received the request + let res = ctx.reqReceivedSignal.waitSync(timeout) + if res.isErr(): + deallocShared(req) + return err("Couldn't receive reqReceivedSignal signal") + + ## Notice that in case of "ok", the deallocShared(req) is performed by the Waku Thread in the + ## process proc. See the 'waku_thread_request.nim' module for more details. + ok() + +proc watchdogThreadBody(ctx: ptr FFIContext) {.thread.} = + ## Watchdog thread that monitors the Waku thread and notifies the library user if it hangs. + + let watchdogRun = proc(ctx: ptr FFIContext) {.async.} = + const WatchdogStartDelay = 10.seconds + const WatchdogTimeinterval = 1.seconds + const WakuNotRespondingTimeout = 3.seconds + + # Give time for the node to be created and up before sending watchdog requests + await sleepAsync(WatchdogStartDelay) + while true: + await sleepAsync(WatchdogTimeinterval) + + if ctx.running.load == false: + debug "Watchdog thread exiting because FFIContext is not running" + break + + let wakuCallback = proc( + callerRet: cint, msg: ptr cchar, len: csize_t, userData: pointer + ) {.cdecl, gcsafe, raises: [].} = + discard ## Don't do anything. Just respecting the callback signature. + const nilUserData = nil + + trace "Sending watchdog request to FFI thread" + + sendRequestToWakuThread( + ctx, + RequestType.DEBUG, + DebugNodeRequest.createShared(DebugNodeMsgType.CHECK_WAKU_NOT_BLOCKED), + wakuCallback, + nilUserData, + WakuNotRespondingTimeout, + ).isOkOr: + error "Failed to send watchdog request to FFI thread", error = $error + onWakuNotResponding(ctx) + + waitFor watchdogRun(ctx) + +proc ffiThreadBody(ctx: ptr FFIContext) {.thread.} = + ## FFI thread that attends library user API requests + + let ffiRun = proc(ctx: ptr FFIContext) {.async.} = + var waku: Waku + while true: + await ctx.reqSignal.wait() + + if ctx.running.load == false: + break + + ## Wait for a request from the ffi consumer thread + var request: ptr FFIThreadRequest + let recvOk = ctx.reqChannel.tryRecv(request) + if not recvOk: + error "ffi thread could not receive a request" + continue + + ## Handle the request + asyncSpawn FFIThreadRequest.process(request, addr waku) + + let fireRes = ctx.reqReceivedSignal.fireSync() + if fireRes.isErr(): + error "could not fireSync back to requester thread", error = fireRes.error + + waitFor ffiRun(ctx) + +proc createFFIContext*(libraryName: cstring): Result[ptr FFIContext, string] = + ## This proc is called from the main thread and it creates + ## the FFI working thread. + var ctx = createShared(FFIContext, 1) + ctx.reqSignal = ThreadSignalPtr.new().valueOr: + return err("couldn't create reqSignal ThreadSignalPtr") + ctx.reqReceivedSignal = ThreadSignalPtr.new().valueOr: + return err("couldn't create reqReceivedSignal ThreadSignalPtr") + ctx.lock.initLock() + + ctx.running.store(true) + + try: + createThread(ctx.ffiThread, ffiThreadBody, ctx) + except ValueError, ResourceExhaustedError: + freeShared(ctx) + return err("failed to create the Waku thread: " & getCurrentExceptionMsg()) + + try: + createThread(ctx.watchdogThread, watchdogThreadBody, ctx) + except ValueError, ResourceExhaustedError: + freeShared(ctx) + return err("failed to create the watchdog thread: " & getCurrentExceptionMsg()) + + return ok(ctx) + +proc destroyFFIContext*(ctx: ptr FFIContext): Result[void, string] = + ctx.running.store(false) + + let signaledOnTime = ctx.reqSignal.fireSync().valueOr: + return err("error in destroyFFIContext: " & $error) + if not signaledOnTime: + return err("failed to signal reqSignal on time in destroyFFIContext") + + joinThread(ctx.ffiThread) + joinThread(ctx.watchdogThread) + ctx.lock.deinitLock() + ?ctx.reqSignal.close() + ?ctx.reqReceivedSignal.close() + deallocShared(ctx.libraryName) + freeShared(ctx) + + return ok() diff --git a/ffi/ffi_types.nim b/ffi/ffi_types.nim new file mode 100644 index 0000000..f57a230 --- /dev/null +++ b/ffi/ffi_types.nim @@ -0,0 +1,30 @@ +################################################################################ +### Exported types + +type FFICallBack* = proc( + callerRet: cint, msg: ptr cchar, len: csize_t, userData: pointer +) {.cdecl, gcsafe, raises: [].} + +const RET_OK*: cint = 0 +const RET_ERR*: cint = 1 +const RET_MISSING_CALLBACK*: cint = 2 + +### End of exported types +################################################################################ + +################################################################################ +### FFI utils + +template foreignThreadGc*(body: untyped) = + when declared(setupForeignThreadGc): + setupForeignThreadGc() + + body + + when declared(tearDownForeignThreadGc): + tearDownForeignThreadGc() + +type onDone* = proc() + +### End of FFI utils +################################################################################ diff --git a/ffi/internal/ffi_library.nim b/ffi/internal/ffi_library.nim new file mode 100644 index 0000000..04d1dee --- /dev/null +++ b/ffi/internal/ffi_library.nim @@ -0,0 +1,51 @@ +import std/macros, strformat + +macro declareLibrary*(libraryName: static[string]): untyped = + result = newStmtList() + + {.pragma: exported, exportc, cdecl, raises: [].} + {.pragma: callback, cdecl, raises: [], gcsafe.} + {.passc: "-fPIC".} + + when defined(linux): + ## Generates {.passl: "-Wl,-soname,libwaku.so".} + let soName = fmt"-Wl,-soname,lib{libraryName}.so" + result.add(nnkPragmaStmt.newTree(ident"passl", newStrLitNode(soName))) + + ## proc lib{libraryName}NimMain() {.importc.} + let procName = ident(fmt"lib{libraryName}NimMain") + let importcPragma = nnkPragma.newTree(ident"importc") + let procDef = newProc( + name = procName, + params = @[], + pragmas = importcPragma, + body = newEmptyNode(), + returnType = newEmptyNode(), # no return value + ) + result.add(procDef) + +################################################################################ +### Library setup + +# To control when the library has been initialized +var initialized: Atomic[bool] + +if defined(android): + # Redirect chronicles to Android System logs + when compiles(defaultChroniclesStream.outputs[0].writer): + defaultChroniclesStream.outputs[0].writer = proc( + logLevel: LogLevel, msg: LogOutputStr + ) {.raises: [].} = + echo logLevel, msg + +proc initializeLibrary() {.exported.} = + if not initialized.exchange(true): + ## Every Nim library needs to call `NimMain` once exactly, to initialize the Nim runtime. + ## Being `` the value given in the optional compilation flag --nimMainPrefix:yourprefix + libwakuNimMain() + when declared(setupForeignThreadGc): + setupForeignThreadGc() + when declared(nimGC_setStackBottom): + var locals {.volatile, noinit.}: pointer + locals = addr(locals) + nimGC_setStackBottom(locals) diff --git a/ffi/internal/ffi_macro.nim b/ffi/internal/ffi_macro.nim new file mode 100644 index 0000000..ad532d5 --- /dev/null +++ b/ffi/internal/ffi_macro.nim @@ -0,0 +1,51 @@ +import std/macros + +import chronos + +macro ffi*(p: typed, args: varargs[untyped]): untyped = + ## p: the proc definition AST + ## args: expected to be (RequestTypeValue, MsgTypeValue) + + if p.kind != nnkProcDef: + error("ffi pragma can only be applied to proc definitions", p) + + if args.len != 2: + error( + "ffi pragma requires exactly two arguments: (RequestTypeValue, MsgTypeValue)", p + ) + + let reqType = args[0] + let msgType = args[1] + + let origProc = p + let origName = p.name + let exportedName = origName + + result = newStmtList() + result.add(origProc) + + # Build exported wrapper proc: + let wrapperParams = newSeq[NimNode]() + wrapperParams.add(newIdentDefs(ident("ctx"), newPtrType(ident("WakuContext")))) + wrapperParams.add(newIdentDefs(ident("callback"), ident("WakuCallBack"))) + wrapperParams.add(newIdentDefs(ident("userData"), ident("pointer"))) + + let wrapperPragmas = nnkPragma.newTree(ident("dynlib"), ident("exportc")) + + let wrapperBody = quote: + initializeLibrary() + checkLibwakuParams(ctx, callback, userData) + handleRequest( + ctx, `reqType`, PeerManagementRequest.createShared(`msgType`), callback, userData + ) + return 0.cint + + let wrapperProc = newProc( + name = exportedName, + params = wrapperParams, + pragmas = wrapperPragmas, + body = wrapperBody, + returnType = ident("cint"), + ) + + result.add(wrapperProc) diff --git a/ffi/thread_requests/ffi_lifecycle_request.nim b/ffi/thread_requests/ffi_lifecycle_request.nim new file mode 100644 index 0000000..fc6d5de --- /dev/null +++ b/ffi/thread_requests/ffi_lifecycle_request.nim @@ -0,0 +1,18 @@ +type LifeCycleRequest {.ffi.} = object + discard + +type PeerManagerRequest {.ffi.} = object + discard + +type PeerManagerRequest* = object + reqType: 12123491 ## random int + reqContent: pointer + callback: WakuCallBack + userData: pointer + +## createShared +## process +## deallocShared +## +## +## \ No newline at end of file diff --git a/ffi/thread_requests/ffi_thread_request.nim b/ffi/thread_requests/ffi_thread_request.nim new file mode 100644 index 0000000..bdaaa22 --- /dev/null +++ b/ffi/thread_requests/ffi_thread_request.nim @@ -0,0 +1,104 @@ +## This file contains the base message request type that will be handled. +## The requests are created by the main thread and processed by +## the Waku Thread. + +import std/json, results +import chronos, chronos/threadsync +import + ../../waku/factory/waku, + ../ffi_types, + ./requests/node_lifecycle_request, + ./requests/peer_manager_request, + ./requests/protocols/relay_request, + ./requests/protocols/store_request, + ./requests/protocols/lightpush_request, + ./requests/protocols/filter_request, + ./requests/debug_node_request, + ./requests/discovery_request, + ./requests/ping_request + +type RequestType* {.pure.} = enum + LIFECYCLE + PEER_MANAGER + PING + RELAY + STORE + DEBUG + DISCOVERY + LIGHTPUSH + FILTER + +type FFIThreadRequest* = object + reqType: RequestType + reqContent: pointer + callback: FFICallBack + userData: pointer + +proc createShared*( + T: type FFIThreadRequest, + reqType: RequestType, + reqContent: pointer, + callback: FFICallBack, + userData: pointer, +): ptr type T = + var ret = createShared(T) + ret[].reqType = reqType + ret[].reqContent = reqContent + ret[].callback = callback + ret[].userData = userData + return ret + +proc handleRes[T: string | void]( + res: Result[T, string], request: ptr FFIThreadRequest +) = + ## Handles the Result responses, which can either be Result[string, string] or + ## Result[void, string]. + + defer: + deallocShared(request) + + if res.isErr(): + foreignThreadGc: + let msg = "libwaku error: handleRes fireSyncRes error: " & $res.error + request[].callback( + RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), request[].userData + ) + return + + foreignThreadGc: + var msg: cstring = "" + when T is string: + msg = res.get().cstring() + request[].callback( + RET_OK, unsafeAddr msg[0], cast[csize_t](len(msg)), request[].userData + ) + return + +proc process*( + T: type FFIThreadRequest, request: ptr FFIThreadRequest, waku: ptr Waku +) {.async.} = + let retFut = + case request[].reqType + of LIFECYCLE: + cast[ptr NodeLifecycleRequest](request[].reqContent).process(waku) + of PEER_MANAGER: + cast[ptr PeerManagementRequest](request[].reqContent).process(waku[]) + of PING: + cast[ptr PingRequest](request[].reqContent).process(waku) + of RELAY: + cast[ptr RelayRequest](request[].reqContent).process(waku) + of STORE: + cast[ptr StoreRequest](request[].reqContent).process(waku) + of DEBUG: + cast[ptr DebugNodeRequest](request[].reqContent).process(waku[]) + of DISCOVERY: + cast[ptr DiscoveryRequest](request[].reqContent).process(waku) + of LIGHTPUSH: + cast[ptr LightpushRequest](request[].reqContent).process(waku) + of FILTER: + cast[ptr FilterRequest](request[].reqContent).process(waku) + + handleRes(await retFut, request) + +proc `$`*(self: FFIThreadRequest): string = + return $self.reqType