diff --git a/codex.nim b/codex.nim index 6302b257..b3e40608 100644 --- a/codex.nim +++ b/codex.nim @@ -104,7 +104,7 @@ when isMainModule: ## Ctrl+C handling proc doShutdown() = - shutdown = server.stop() + shutdown = server.shutdown() state = CodexStatus.Stopping notice "Stopping Codex" diff --git a/codex/codex.nim b/codex/codex.nim index 5cbd6fdd..9807d5a8 100644 --- a/codex/codex.nim +++ b/codex/codex.nim @@ -169,8 +169,8 @@ proc bootstrapInteractions(s: CodexServer): Future[void] {.async.} = proc start*(s: CodexServer) {.async.} = trace "Starting codex node", config = $s.config - await s.repoStore.start() + s.maintenance.start() await s.codexNode.switch.start() @@ -208,6 +208,11 @@ proc stop*(s: CodexServer) {.async.} = error "Failed to stop codex node", failures = res.failure.len raiseAssert "Failed to stop codex node" +proc close*(s: CodexServer) {.async.} = + var futures = @[s.codexNode.close(), s.repoStore.close()] + + let res = await noCancel allFinishedFailed[void](futures) + if not s.taskpool.isNil: try: s.taskpool.shutdown() @@ -215,6 +220,14 @@ proc stop*(s: CodexServer) {.async.} = error "Failed to stop the taskpool", failures = res.failure.len raiseAssert("Failure in taskpool shutdown:" & exc.msg) + if res.failure.len > 0: + error "Failed to close codex node", failures = res.failure.len + raiseAssert "Failed to close codex node" + +proc shutdown(server: CodexServer) {.async.} = + await server.stop() + await server.close() + proc new*( T: type CodexServer, config: CodexConf, privateKey: CodexPrivateKey ): CodexServer = diff --git a/codex/node.nim b/codex/node.nim index 063e90f5..ccae080c 100644 --- a/codex/node.nim +++ b/codex/node.nim @@ -907,6 +907,7 @@ proc stop*(self: CodexNodeRef) {.async.} = if not self.clock.isNil: await self.clock.stop() +proc close*(self: CodexNodeRef) {.async.} = if not self.networkStore.isNil: await self.networkStore.close diff --git a/codex/stores/repostore/store.nim b/codex/stores/repostore/store.nim index ad6f03fc..16813a16 100644 --- a/codex/stores/repostore/store.nim +++ b/codex/stores/repostore/store.nim @@ -443,7 +443,6 @@ proc start*( ): Future[void] {.async: (raises: [CancelledError, CodexError]).} = ## Start repo ## - if self.started: trace "Repo already started" return @@ -465,6 +464,5 @@ proc stop*(self: RepoStore): Future[void] {.async: (raises: []).} = return trace "Stopping repo" - await self.close() self.started = false diff --git a/library/codex_thread_requests/requests/node_lifecycle_request.nim b/library/codex_thread_requests/requests/node_lifecycle_request.nim index 885a2a0e..5f5efbaf 100644 --- a/library/codex_thread_requests/requests/node_lifecycle_request.nim +++ b/library/codex_thread_requests/requests/node_lifecycle_request.nim @@ -22,7 +22,7 @@ import ../../../codex/utils import ../../../codex/utils/[keyutils, fileutils] import ../../../codex/units -from ../../../codex/codex import CodexServer, new, start, stop +from ../../../codex/codex import CodexServer, new, start, stop, close logScope: topics = "codexlib codexliblifecycle" @@ -31,6 +31,7 @@ type NodeLifecycleMsgType* = enum CREATE_NODE START_NODE STOP_NODE + DESTROY_NODE proc readValue*[T: InputFile | InputDir | OutPath | OutDir | OutFile]( r: var JsonReader, val: var T @@ -81,13 +82,18 @@ proc readValue*(r: var JsonReader, val: var EthAddress) = type NodeLifecycleRequest* = object operation: NodeLifecycleMsgType configJson: cstring + onDestroy: proc() {.gcsafe.} proc createShared*( - T: type NodeLifecycleRequest, op: NodeLifecycleMsgType, configJson: cstring = "" + T: type NodeLifecycleRequest, + op: NodeLifecycleMsgType, + configJson: cstring = "", + onDestroy: proc() {.gcsafe.} = nil, ): ptr type T = var ret = createShared(T) ret[].operation = op ret[].configJson = configJson.alloc() + ret[].onDestroy = onDestroy return ret proc destroyShared(self: ptr NodeLifecycleRequest) = @@ -178,5 +184,11 @@ proc process*( except Exception as e: error "Failed to STOP_NODE.", error = e.msg return err(e.msg) - + of DESTROY_NODE: + try: + await codex[].close() + self.onDestroy() + except Exception as e: + error "Failed to STOP_NODE.", error = e.msg + return err(e.msg) return ok("") diff --git a/library/libcodex.nim b/library/libcodex.nim index 566bbf67..da17e81c 100644 --- a/library/libcodex.nim +++ b/library/libcodex.nim @@ -27,6 +27,7 @@ when defined(linux): import std/[atomics] import chronicles import chronos +import chronos/threadsync import ./codex_context import ./codex_thread_requests/codex_thread_request import ./codex_thread_requests/requests/node_lifecycle_request @@ -251,11 +252,28 @@ proc codex_destroy( initializeLibrary() checkLibcodexParams(ctx, callback, userData) - let res = codex_context.destroyCodexContext(ctx) + let destroySignal = ThreadSignalPtr.new().valueOr: + return callback.error("failed to create destroy signal", userData) + + proc onDestroy() {.gcsafe.} = + discard destroySignal.fireSync() + + let reqContent = NodeLifecycleRequest.createShared( + NodeLifecycleMsgType.DESTROY_NODE, onDestroy = onDestroy + ) + var res = codex_context.sendRequestToCodexThread( + ctx, RequestType.LIFECYCLE, reqContent, callback, userData + ) if res.isErr: return callback.error(res.error, userData) - return callback.success("", userData) + discard destroySignal.waitSync() + + res = codex_context.destroyCodexContext(ctx) + if res.isErr: + return callback.error(res.error, userData) + + return RET_OK proc codex_upload_init( ctx: ptr CodexContext,