adding missing files

This commit is contained in:
Gabriel mermelstein 2025-04-09 13:05:42 +03:00
parent 8806391371
commit dbf271e0f9
No known key found for this signature in database
GPG Key ID: 82B8134785FEAE0D
6 changed files with 315 additions and 1 deletions

View File

@ -9,7 +9,7 @@ deps: | sds.nims
STATIC ?= 0
libsds: deps
rm -f build/libwaku*
rm -f build/libsds*
ifeq ($(STATIC), 1)
echo -e $(BUILD_MSG) "build/$@.a" && \
$(ENV_SCRIPT) nim libsdsStatic $(NIM_PARAMS) sds.nims

42
library/alloc.nim Normal file
View File

@ -0,0 +1,42 @@
## Can be shared safely between threads
type SharedSeq*[T] = tuple[data: ptr UncheckedArray[T], len: int]
proc alloc*(str: cstring): cstring =
# Byte allocation from the given address.
# There should be the corresponding manual deallocation with deallocShared !
if str.isNil():
var ret = cast[cstring](allocShared(1)) # Allocate memory for the null terminator
ret[0] = '\0' # Set the null terminator
return ret
let ret = cast[cstring](allocShared(len(str) + 1))
copyMem(ret, str, len(str) + 1)
return ret
proc alloc*(str: string): cstring =
## Byte allocation from the given address.
## There should be the corresponding manual deallocation with deallocShared !
var ret = cast[cstring](allocShared(str.len + 1))
let s = cast[seq[char]](str)
for i in 0 ..< str.len:
ret[i] = s[i]
ret[str.len] = '\0'
return ret
proc allocSharedSeq*[T](s: seq[T]): SharedSeq[T] =
let data = allocShared(sizeof(T) * s.len)
if s.len != 0:
copyMem(data, unsafeAddr s[0], s.len)
return (cast[ptr UncheckedArray[T]](data), s.len)
proc deallocSharedSeq*[T](s: var SharedSeq[T]) =
deallocShared(s.data)
s.len = 0
proc toSeq*[T](s: SharedSeq[T]): seq[T] =
## Creates a seq[T] from a SharedSeq[T]. No explicit dealloc is required
## as req[T] is a GC managed type.
var ret = newSeq[T]()
for i in 0 ..< s.len:
ret.add(s.data[i])
return ret

30
library/ffi_types.nim Normal file
View File

@ -0,0 +1,30 @@
################################################################################
### Exported types
type SdsCallBack* = proc(
callerRet: cint, msg: ptr cchar, len: csize_t, userData: pointer
) {.cdecl, gcsafe, raises: [].}
const RET_OK*: cint = 0
const RET_ERR*: cint = 1
const RET_MISSING_CALLBACK*: cint = 2
### End of exported types
################################################################################
################################################################################
### FFI utils
template foreignThreadGc*(body: untyped) =
when declared(setupForeignThreadGc):
setupForeignThreadGc()
body
when declared(tearDownForeignThreadGc):
tearDownForeignThreadGc()
type onDone* = proc()
### End of FFI utils
################################################################################

View File

@ -0,0 +1,43 @@
import std/[options, json, strutils, net]
import chronos, chronicles, results, confutils, confutils/std/net
import ../../../alloc
type SdsLifecycleMsgType* = enum
CREATE_SDS
START_SDS
STOP_SDS
type SdsLifecycleRequest* = object
operation: SdsLifecycleMsgType
configJson: cstring ## Only used in 'CREATE_NODE' operation
appCallbacks: AppCallbacks
proc createShared*(
T: type SdsLifecycleRequest,
op: SdsLifecycleMsgType,
configJson: cstring = "",
appCallbacks: AppCallbacks = nil,
): ptr type T =
var ret = createShared(T)
ret[].operation = op
ret[].appCallbacks = appCallbacks
ret[].configJson = configJson.alloc()
return ret
proc destroyShared(self: ptr SdsLifecycleRequest) =
deallocShared(self[].configJson)
deallocShared(self)
proc process*(
self: ptr SdsLifecycleRequest, waku: ptr Waku
): Future[Result[string, string]] {.async.} =
defer:
destroyShared(self)
case self.operation
of CREATE_SDS: discard
of START_SDS: discard
of STOP_SDS: discard
return ok("")

View File

@ -0,0 +1,70 @@
## This file contains the base message request type that will be handled.
## The requests are created by the main thread and processed by
## the SDS Thread.
import std/json, results
import chronos, chronos/threadsync
import ../../ffi_types, ./requests/sds_lifecycle_request
type RequestType* {.pure.} = enum
LIFECYCLE
type SdsThreadRequest* = object
reqType: RequestType
reqContent: pointer
callback: SdsCallBack
userData: pointer
proc createShared*(
T: type SdsThreadRequest,
reqType: RequestType,
reqContent: pointer,
callback: SdsCallBack,
userData: pointer,
): ptr type T =
var ret = createShared(T)
ret[].reqType = reqType
ret[].reqContent = reqContent
ret[].callback = callback
ret[].userData = userData
return ret
proc handleRes[T: string | void](
res: Result[T, string], request: ptr SdsThreadRequest
) =
## Handles the Result responses, which can either be Result[string, string] or
## Result[void, string].
defer:
deallocShared(request)
if res.isErr():
foreignThreadGc:
let msg = "libsds error: handleRes fireSyncRes error: " & $res.error
request[].callback(
RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), request[].userData
)
return
foreignThreadGc:
var msg: cstring = ""
when T is string:
msg = res.get().cstring()
request[].callback(
RET_OK, unsafeAddr msg[0], cast[csize_t](len(msg)), request[].userData
)
return
# TODO: change waku for reliability manager or something like that
proc process*(
T: type SdsThreadRequest, request: ptr SdsThreadRequest, waku: ptr Waku
) {.async.} =
let retFut =
case request[].reqType
of LIFECYCLE:
cast[ptr SdsLifecycleRequest](request[].reqContent).process(waku)
handleRes(await retFut, request)
proc `$`*(self: SdsThreadRequest): string =
return $self.reqType

View File

@ -0,0 +1,129 @@
{.pragma: exported, exportc, cdecl, raises: [].}
{.pragma: callback, cdecl, raises: [], gcsafe.}
{.passc: "-fPIC".}
import std/[options, atomics, os, net, locks]
import chronicles, chronos, chronos/threadsync, taskpools/channels_spsc_single, results
import ../ffi_types
type SdsContext* = object
thread: Thread[(ptr SdsContext)]
lock: Lock
reqChannel: ChannelSPSCSingle[ptr SdsThreadRequest]
reqSignal: ThreadSignalPtr
# to inform The SDS Thread (a.k.a TST) that a new request is sent
reqReceivedSignal: ThreadSignalPtr
# to inform the main thread that the request is rx by TST
userData*: pointer
eventCallback*: pointer
eventUserdata*: pointer
running: Atomic[bool] # To control when the thread is running
proc runSds(ctx: ptr SdsContext) {.async.} =
## This is the worker body. This runs the SDS instance
## and attends library user requests (stop, connect_to, etc.)
var waku: Waku # TODO
while true:
await ctx.reqSignal.wait()
if ctx.running.load == false:
break
## Trying to get a request from the libsds requestor thread
var request: ptr SdsThreadRequest
let recvOk = ctx.reqChannel.tryRecv(request)
if not recvOk:
error "sds thread could not receive a request"
continue
let fireRes = ctx.reqReceivedSignal.fireSync()
if fireRes.isErr():
error "could not fireSync back to requester thread", error = fireRes.error
## Handle the request
asyncSpawn SdsThreadRequest.process(request, addr waku) # TODO
proc run(ctx: ptr SdsContext) {.thread.} =
## Launch sds worker
waitFor runSds(ctx)
proc createSdsThread*(): Result[ptr SdsContext, string] =
## This proc is called from the main thread and it creates
## the SDS working thread.
var ctx = createShared(SdsContext, 1)
ctx.reqSignal = ThreadSignalPtr.new().valueOr:
return err("couldn't create reqSignal ThreadSignalPtr")
ctx.reqReceivedSignal = ThreadSignalPtr.new().valueOr:
return err("couldn't create reqReceivedSignal ThreadSignalPtr")
ctx.lock.initLock()
ctx.running.store(true)
try:
createThread(ctx.thread, run, ctx)
except ValueError, ResourceExhaustedError:
# and freeShared for typed allocations!
freeShared(ctx)
return err("failed to create the SDS thread: " & getCurrentExceptionMsg())
return ok(ctx)
proc destroySdsThread*(ctx: ptr SdsContext): Result[void, string] =
ctx.running.store(false)
let signaledOnTime = ctx.reqSignal.fireSync().valueOr:
return err("error in destroySdsThread: " & $error)
if not signaledOnTime:
return err("failed to signal reqSignal on time in destroySdsThread")
joinThread(ctx.thread)
ctx.lock.deinitLock()
?ctx.reqSignal.close()
?ctx.reqReceivedSignal.close()
freeShared(ctx)
return ok()
proc sendRequestToSdsThread*(
ctx: ptr SdsContext,
reqType: RequestType,
reqContent: pointer,
callback: SdsCallBack,
userData: pointer,
): Result[void, string] =
let req = SdsThreadRequest.createShared(reqType, reqContent, callback, userData)
# This lock is only necessary while we use a SP Channel and while the signalling
# between threads assumes that there aren't concurrent requests.
# Rearchitecting the signaling + migrating to a MP Channel will allow us to receive
# requests concurrently and spare us the need of locks
ctx.lock.acquire()
defer:
ctx.lock.release()
## Sending the request
let sentOk = ctx.reqChannel.trySend(req)
if not sentOk:
deallocShared(req)
return err("Couldn't send a request to the sds thread: " & $req[])
let fireSyncRes = ctx.reqSignal.fireSync()
if fireSyncRes.isErr():
deallocShared(req)
return err("failed fireSync: " & $fireSyncRes.error)
if fireSyncRes.get() == false:
deallocShared(req)
return err("Couldn't fireSync in time")
## wait until the SDS Thread properly received the request
let res = ctx.reqReceivedSignal.waitSync()
if res.isErr():
deallocShared(req)
return err("Couldn't receive reqReceivedSignal signal")
## Notice that in case of "ok", the deallocShared(req) is performed by the SDS Thread in the
## process proc.
ok()