setting up create reliability manager request

This commit is contained in:
Gabriel mermelstein 2025-04-09 13:35:06 +03:00
parent dbf271e0f9
commit 12f5cc2ebb
No known key found for this signature in database
GPG Key ID: 82B8134785FEAE0D
3 changed files with 35 additions and 21 deletions

View File

@ -4,40 +4,55 @@ import chronos, chronicles, results, confutils, confutils/std/net
import ../../../alloc
type SdsLifecycleMsgType* = enum
CREATE_SDS
START_SDS
STOP_SDS
CREATE_RELIABILITY_MANAGER
type SdsLifecycleRequest* = object
operation: SdsLifecycleMsgType
configJson: cstring ## Only used in 'CREATE_NODE' operation
appCallbacks: AppCallbacks
channelId: cstring
proc createShared*(
T: type SdsLifecycleRequest,
op: SdsLifecycleMsgType,
configJson: cstring = "",
appCallbacks: AppCallbacks = nil,
T: type SdsLifecycleRequest, op: SdsLifecycleMsgType, configJson: cstring = ""
): ptr type T =
var ret = createShared(T)
ret[].operation = op
ret[].appCallbacks = appCallbacks
ret[].configJson = configJson.alloc()
ret[].channelId = channelId.alloc()
return ret
proc destroyShared(self: ptr SdsLifecycleRequest) =
deallocShared(self[].configJson)
deallocShared(self[].channelId)
deallocShared(self)
proc createReliabilityManager(channelId: cstring): Result[ReliabilityManager, string] =
let channelId = $channelIdCStr
if channelId.len == 0:
error "Failed creating ReliabilityManager: Channel ID cannot be empty"
return err("Failed creating ReliabilityManager: Channel ID cannot be empty")
let rm = newReliabilityManager(channelId).valueOr:
error "Failed creating reliability manager", error = error
return err("Failed creating reliability manager: " & $error)
rm.onMessageReady = proc(msgId: MessageID) =
nimMessageReadyCallback(rm, msgId)
rm.onMessageSent = proc(msgId: MessageID) =
nimMessageSentCallback(rm, msgId)
rm.onMissingDependencies = proc(msgId: MessageID, deps: seq[MessageID]) =
nimMissingDependenciesCallback(rm, msgId, deps)
rm.onPeriodicSync = proc() =
nimPeriodicSyncCallback(rm)
return ok(rm)
proc process*(
self: ptr SdsLifecycleRequest, waku: ptr Waku
self: ptr SdsLifecycleRequest, rm: ptr ReliabilityManager
): Future[Result[string, string]] {.async.} =
defer:
destroyShared(self)
case self.operation
of CREATE_SDS: discard
of START_SDS: discard
of STOP_SDS: discard
of CREATE_RELIABILITY_MANAGER:
rm[] = (await createReliabilityManager(self.channelId)).valueOr:
error "CREATE_RELIABILITY_MANAGER failed", error = error
return err("error processing CREATE_RELIABILITY_MANAGER request: " & $error)
return ok("")

View File

@ -55,14 +55,13 @@ proc handleRes[T: string | void](
)
return
# TODO: change waku for reliability manager or something like that
proc process*(
T: type SdsThreadRequest, request: ptr SdsThreadRequest, waku: ptr Waku
T: type SdsThreadRequest, request: ptr SdsThreadRequest, rm: ptr ReliabilityManager
) {.async.} =
let retFut =
case request[].reqType
of LIFECYCLE:
cast[ptr SdsLifecycleRequest](request[].reqContent).process(waku)
cast[ptr SdsLifecycleRequest](request[].reqContent).process(rm)
handleRes(await retFut, request)

View File

@ -23,7 +23,7 @@ proc runSds(ctx: ptr SdsContext) {.async.} =
## This is the worker body. This runs the SDS instance
## and attends library user requests (stop, connect_to, etc.)
var waku: Waku # TODO
var rm: ReliabilityManager
while true:
await ctx.reqSignal.wait()
@ -43,7 +43,7 @@ proc runSds(ctx: ptr SdsContext) {.async.} =
error "could not fireSync back to requester thread", error = fireRes.error
## Handle the request
asyncSpawn SdsThreadRequest.process(request, addr waku) # TODO
asyncSpawn SdsThreadRequest.process(request, addr rm)
proc run(ctx: ptr SdsContext) {.thread.} =
## Launch sds worker