diff --git a/library/sds_thread/inter_thread_communication/requests/sds_lifecycle_request.nim b/library/sds_thread/inter_thread_communication/requests/sds_lifecycle_request.nim index 16ab116..58ca559 100644 --- a/library/sds_thread/inter_thread_communication/requests/sds_lifecycle_request.nim +++ b/library/sds_thread/inter_thread_communication/requests/sds_lifecycle_request.nim @@ -4,40 +4,55 @@ import chronos, chronicles, results, confutils, confutils/std/net import ../../../alloc type SdsLifecycleMsgType* = enum - CREATE_SDS - START_SDS - STOP_SDS + CREATE_RELIABILITY_MANAGER type SdsLifecycleRequest* = object operation: SdsLifecycleMsgType - configJson: cstring ## Only used in 'CREATE_NODE' operation - appCallbacks: AppCallbacks + channelId: cstring proc createShared*( - T: type SdsLifecycleRequest, - op: SdsLifecycleMsgType, - configJson: cstring = "", - appCallbacks: AppCallbacks = nil, + T: type SdsLifecycleRequest, op: SdsLifecycleMsgType, configJson: cstring = "" ): ptr type T = var ret = createShared(T) ret[].operation = op - ret[].appCallbacks = appCallbacks - ret[].configJson = configJson.alloc() + ret[].channelId = channelId.alloc() return ret proc destroyShared(self: ptr SdsLifecycleRequest) = - deallocShared(self[].configJson) + deallocShared(self[].channelId) deallocShared(self) +proc createReliabilityManager(channelId: cstring): Result[ReliabilityManager, string] = + let channelId = $channelIdCStr + if channelId.len == 0: + error "Failed creating ReliabilityManager: Channel ID cannot be empty" + return err("Failed creating ReliabilityManager: Channel ID cannot be empty") + + let rm = newReliabilityManager(channelId).valueOr: + error "Failed creating reliability manager", error = error + return err("Failed creating reliability manager: " & $error) + + rm.onMessageReady = proc(msgId: MessageID) = + nimMessageReadyCallback(rm, msgId) + rm.onMessageSent = proc(msgId: MessageID) = + nimMessageSentCallback(rm, msgId) + rm.onMissingDependencies = proc(msgId: MessageID, deps: seq[MessageID]) = + nimMissingDependenciesCallback(rm, msgId, deps) + rm.onPeriodicSync = proc() = + nimPeriodicSyncCallback(rm) + + return ok(rm) + proc process*( - self: ptr SdsLifecycleRequest, waku: ptr Waku + self: ptr SdsLifecycleRequest, rm: ptr ReliabilityManager ): Future[Result[string, string]] {.async.} = defer: destroyShared(self) case self.operation - of CREATE_SDS: discard - of START_SDS: discard - of STOP_SDS: discard + of CREATE_RELIABILITY_MANAGER: + rm[] = (await createReliabilityManager(self.channelId)).valueOr: + error "CREATE_RELIABILITY_MANAGER failed", error = error + return err("error processing CREATE_RELIABILITY_MANAGER request: " & $error) return ok("") diff --git a/library/sds_thread/inter_thread_communication/sds_thread_request.nim b/library/sds_thread/inter_thread_communication/sds_thread_request.nim index be38fd6..64c908f 100644 --- a/library/sds_thread/inter_thread_communication/sds_thread_request.nim +++ b/library/sds_thread/inter_thread_communication/sds_thread_request.nim @@ -55,14 +55,13 @@ proc handleRes[T: string | void]( ) return -# TODO: change waku for reliability manager or something like that proc process*( - T: type SdsThreadRequest, request: ptr SdsThreadRequest, waku: ptr Waku + T: type SdsThreadRequest, request: ptr SdsThreadRequest, rm: ptr ReliabilityManager ) {.async.} = let retFut = case request[].reqType of LIFECYCLE: - cast[ptr SdsLifecycleRequest](request[].reqContent).process(waku) + cast[ptr SdsLifecycleRequest](request[].reqContent).process(rm) handleRes(await retFut, request) diff --git a/library/sds_thread/sds_thread.nim b/library/sds_thread/sds_thread.nim index a8b141d..3cde969 100644 --- a/library/sds_thread/sds_thread.nim +++ b/library/sds_thread/sds_thread.nim @@ -23,7 +23,7 @@ proc runSds(ctx: ptr SdsContext) {.async.} = ## This is the worker body. This runs the SDS instance ## and attends library user requests (stop, connect_to, etc.) - var waku: Waku # TODO + var rm: ReliabilityManager while true: await ctx.reqSignal.wait() @@ -43,7 +43,7 @@ proc runSds(ctx: ptr SdsContext) {.async.} = error "could not fireSync back to requester thread", error = fireRes.error ## Handle the request - asyncSpawn SdsThreadRequest.process(request, addr waku) # TODO + asyncSpawn SdsThreadRequest.process(request, addr rm) proc run(ctx: ptr SdsContext) {.thread.} = ## Launch sds worker