diff --git a/Makefile b/Makefile index 3ab8b78..1c478cd 100644 --- a/Makefile +++ b/Makefile @@ -9,7 +9,7 @@ deps: | sds.nims STATIC ?= 0 libsds: deps - rm -f build/libwaku* + rm -f build/libsds* ifeq ($(STATIC), 1) echo -e $(BUILD_MSG) "build/$@.a" && \ $(ENV_SCRIPT) nim libsdsStatic $(NIM_PARAMS) sds.nims diff --git a/library/alloc.nim b/library/alloc.nim new file mode 100644 index 0000000..1a6f118 --- /dev/null +++ b/library/alloc.nim @@ -0,0 +1,42 @@ +## Can be shared safely between threads +type SharedSeq*[T] = tuple[data: ptr UncheckedArray[T], len: int] + +proc alloc*(str: cstring): cstring = + # Byte allocation from the given address. + # There should be the corresponding manual deallocation with deallocShared ! + if str.isNil(): + var ret = cast[cstring](allocShared(1)) # Allocate memory for the null terminator + ret[0] = '\0' # Set the null terminator + return ret + + let ret = cast[cstring](allocShared(len(str) + 1)) + copyMem(ret, str, len(str) + 1) + return ret + +proc alloc*(str: string): cstring = + ## Byte allocation from the given address. + ## There should be the corresponding manual deallocation with deallocShared ! + var ret = cast[cstring](allocShared(str.len + 1)) + let s = cast[seq[char]](str) + for i in 0 ..< str.len: + ret[i] = s[i] + ret[str.len] = '\0' + return ret + +proc allocSharedSeq*[T](s: seq[T]): SharedSeq[T] = + let data = allocShared(sizeof(T) * s.len) + if s.len != 0: + copyMem(data, unsafeAddr s[0], s.len) + return (cast[ptr UncheckedArray[T]](data), s.len) + +proc deallocSharedSeq*[T](s: var SharedSeq[T]) = + deallocShared(s.data) + s.len = 0 + +proc toSeq*[T](s: SharedSeq[T]): seq[T] = + ## Creates a seq[T] from a SharedSeq[T]. No explicit dealloc is required + ## as req[T] is a GC managed type. + var ret = newSeq[T]() + for i in 0 ..< s.len: + ret.add(s.data[i]) + return ret diff --git a/library/ffi_types.nim b/library/ffi_types.nim new file mode 100644 index 0000000..e2445a2 --- /dev/null +++ b/library/ffi_types.nim @@ -0,0 +1,30 @@ +################################################################################ +### Exported types + +type SdsCallBack* = proc( + callerRet: cint, msg: ptr cchar, len: csize_t, userData: pointer +) {.cdecl, gcsafe, raises: [].} + +const RET_OK*: cint = 0 +const RET_ERR*: cint = 1 +const RET_MISSING_CALLBACK*: cint = 2 + +### End of exported types +################################################################################ + +################################################################################ +### FFI utils + +template foreignThreadGc*(body: untyped) = + when declared(setupForeignThreadGc): + setupForeignThreadGc() + + body + + when declared(tearDownForeignThreadGc): + tearDownForeignThreadGc() + +type onDone* = proc() + +### End of FFI utils +################################################################################ diff --git a/library/sds_thread/inter_thread_communication/requests/sds_lifecycle_request.nim b/library/sds_thread/inter_thread_communication/requests/sds_lifecycle_request.nim new file mode 100644 index 0000000..16ab116 --- /dev/null +++ b/library/sds_thread/inter_thread_communication/requests/sds_lifecycle_request.nim @@ -0,0 +1,43 @@ +import std/[options, json, strutils, net] +import chronos, chronicles, results, confutils, confutils/std/net + +import ../../../alloc + +type SdsLifecycleMsgType* = enum + CREATE_SDS + START_SDS + STOP_SDS + +type SdsLifecycleRequest* = object + operation: SdsLifecycleMsgType + configJson: cstring ## Only used in 'CREATE_NODE' operation + appCallbacks: AppCallbacks + +proc createShared*( + T: type SdsLifecycleRequest, + op: SdsLifecycleMsgType, + configJson: cstring = "", + appCallbacks: AppCallbacks = nil, +): ptr type T = + var ret = createShared(T) + ret[].operation = op + ret[].appCallbacks = appCallbacks + ret[].configJson = configJson.alloc() + return ret + +proc destroyShared(self: ptr SdsLifecycleRequest) = + deallocShared(self[].configJson) + deallocShared(self) + +proc process*( + self: ptr SdsLifecycleRequest, waku: ptr Waku +): Future[Result[string, string]] {.async.} = + defer: + destroyShared(self) + + case self.operation + of CREATE_SDS: discard + of START_SDS: discard + of STOP_SDS: discard + + return ok("") diff --git a/library/sds_thread/inter_thread_communication/sds_thread_request.nim b/library/sds_thread/inter_thread_communication/sds_thread_request.nim new file mode 100644 index 0000000..be38fd6 --- /dev/null +++ b/library/sds_thread/inter_thread_communication/sds_thread_request.nim @@ -0,0 +1,70 @@ +## This file contains the base message request type that will be handled. +## The requests are created by the main thread and processed by +## the SDS Thread. + +import std/json, results +import chronos, chronos/threadsync +import ../../ffi_types, ./requests/sds_lifecycle_request + +type RequestType* {.pure.} = enum + LIFECYCLE + +type SdsThreadRequest* = object + reqType: RequestType + reqContent: pointer + callback: SdsCallBack + userData: pointer + +proc createShared*( + T: type SdsThreadRequest, + reqType: RequestType, + reqContent: pointer, + callback: SdsCallBack, + userData: pointer, +): ptr type T = + var ret = createShared(T) + ret[].reqType = reqType + ret[].reqContent = reqContent + ret[].callback = callback + ret[].userData = userData + return ret + +proc handleRes[T: string | void]( + res: Result[T, string], request: ptr SdsThreadRequest +) = + ## Handles the Result responses, which can either be Result[string, string] or + ## Result[void, string]. + + defer: + deallocShared(request) + + if res.isErr(): + foreignThreadGc: + let msg = "libsds error: handleRes fireSyncRes error: " & $res.error + request[].callback( + RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), request[].userData + ) + return + + foreignThreadGc: + var msg: cstring = "" + when T is string: + msg = res.get().cstring() + request[].callback( + RET_OK, unsafeAddr msg[0], cast[csize_t](len(msg)), request[].userData + ) + return + +# TODO: change waku for reliability manager or something like that +proc process*( + T: type SdsThreadRequest, request: ptr SdsThreadRequest, waku: ptr Waku +) {.async.} = + let retFut = + case request[].reqType + of LIFECYCLE: + cast[ptr SdsLifecycleRequest](request[].reqContent).process(waku) + + handleRes(await retFut, request) + +proc `$`*(self: SdsThreadRequest): string = + return $self.reqType diff --git a/library/sds_thread/sds_thread.nim b/library/sds_thread/sds_thread.nim new file mode 100644 index 0000000..a8b141d --- /dev/null +++ b/library/sds_thread/sds_thread.nim @@ -0,0 +1,129 @@ +{.pragma: exported, exportc, cdecl, raises: [].} +{.pragma: callback, cdecl, raises: [], gcsafe.} +{.passc: "-fPIC".} + +import std/[options, atomics, os, net, locks] +import chronicles, chronos, chronos/threadsync, taskpools/channels_spsc_single, results +import ../ffi_types + +type SdsContext* = object + thread: Thread[(ptr SdsContext)] + lock: Lock + reqChannel: ChannelSPSCSingle[ptr SdsThreadRequest] + reqSignal: ThreadSignalPtr + # to inform The SDS Thread (a.k.a TST) that a new request is sent + reqReceivedSignal: ThreadSignalPtr + # to inform the main thread that the request is rx by TST + userData*: pointer + eventCallback*: pointer + eventUserdata*: pointer + running: Atomic[bool] # To control when the thread is running + +proc runSds(ctx: ptr SdsContext) {.async.} = + ## This is the worker body. This runs the SDS instance + ## and attends library user requests (stop, connect_to, etc.) + + var waku: Waku # TODO + + while true: + await ctx.reqSignal.wait() + + if ctx.running.load == false: + break + + ## Trying to get a request from the libsds requestor thread + var request: ptr SdsThreadRequest + let recvOk = ctx.reqChannel.tryRecv(request) + if not recvOk: + error "sds thread could not receive a request" + continue + + let fireRes = ctx.reqReceivedSignal.fireSync() + if fireRes.isErr(): + error "could not fireSync back to requester thread", error = fireRes.error + + ## Handle the request + asyncSpawn SdsThreadRequest.process(request, addr waku) # TODO + +proc run(ctx: ptr SdsContext) {.thread.} = + ## Launch sds worker + waitFor runSds(ctx) + +proc createSdsThread*(): Result[ptr SdsContext, string] = + ## This proc is called from the main thread and it creates + ## the SDS working thread. + var ctx = createShared(SdsContext, 1) + ctx.reqSignal = ThreadSignalPtr.new().valueOr: + return err("couldn't create reqSignal ThreadSignalPtr") + ctx.reqReceivedSignal = ThreadSignalPtr.new().valueOr: + return err("couldn't create reqReceivedSignal ThreadSignalPtr") + ctx.lock.initLock() + + ctx.running.store(true) + + try: + createThread(ctx.thread, run, ctx) + except ValueError, ResourceExhaustedError: + # and freeShared for typed allocations! + freeShared(ctx) + + return err("failed to create the SDS thread: " & getCurrentExceptionMsg()) + + return ok(ctx) + +proc destroySdsThread*(ctx: ptr SdsContext): Result[void, string] = + ctx.running.store(false) + + let signaledOnTime = ctx.reqSignal.fireSync().valueOr: + return err("error in destroySdsThread: " & $error) + if not signaledOnTime: + return err("failed to signal reqSignal on time in destroySdsThread") + + joinThread(ctx.thread) + ctx.lock.deinitLock() + ?ctx.reqSignal.close() + ?ctx.reqReceivedSignal.close() + freeShared(ctx) + + return ok() + +proc sendRequestToSdsThread*( + ctx: ptr SdsContext, + reqType: RequestType, + reqContent: pointer, + callback: SdsCallBack, + userData: pointer, +): Result[void, string] = + let req = SdsThreadRequest.createShared(reqType, reqContent, callback, userData) + + # This lock is only necessary while we use a SP Channel and while the signalling + # between threads assumes that there aren't concurrent requests. + # Rearchitecting the signaling + migrating to a MP Channel will allow us to receive + # requests concurrently and spare us the need of locks + ctx.lock.acquire() + defer: + ctx.lock.release() + ## Sending the request + let sentOk = ctx.reqChannel.trySend(req) + if not sentOk: + deallocShared(req) + return err("Couldn't send a request to the sds thread: " & $req[]) + + let fireSyncRes = ctx.reqSignal.fireSync() + if fireSyncRes.isErr(): + deallocShared(req) + return err("failed fireSync: " & $fireSyncRes.error) + + if fireSyncRes.get() == false: + deallocShared(req) + return err("Couldn't fireSync in time") + + ## wait until the SDS Thread properly received the request + let res = ctx.reqReceivedSignal.waitSync() + if res.isErr(): + deallocShared(req) + return err("Couldn't receive reqReceivedSignal signal") + + ## Notice that in case of "ok", the deallocShared(req) is performed by the SDS Thread in the + ## process proc. + ok()