Separate stop and close features for restartability

This commit is contained in:
Arnaud 2025-10-07 07:58:07 +02:00
parent 107bf42a7e
commit 80c0e64be4
No known key found for this signature in database
GPG Key ID: B8FBC178F10CA7AE
6 changed files with 51 additions and 9 deletions

View File

@ -104,7 +104,7 @@ when isMainModule:
## Ctrl+C handling
proc doShutdown() =
shutdown = server.stop()
shutdown = server.shutdown()
state = CodexStatus.Stopping
notice "Stopping Codex"

View File

@ -169,8 +169,8 @@ proc bootstrapInteractions(s: CodexServer): Future[void] {.async.} =
proc start*(s: CodexServer) {.async.} =
trace "Starting codex node", config = $s.config
await s.repoStore.start()
s.maintenance.start()
await s.codexNode.switch.start()
@ -208,6 +208,11 @@ proc stop*(s: CodexServer) {.async.} =
error "Failed to stop codex node", failures = res.failure.len
raiseAssert "Failed to stop codex node"
proc close*(s: CodexServer) {.async.} =
var futures = @[s.codexNode.close(), s.repoStore.close()]
let res = await noCancel allFinishedFailed[void](futures)
if not s.taskpool.isNil:
try:
s.taskpool.shutdown()
@ -215,6 +220,14 @@ proc stop*(s: CodexServer) {.async.} =
error "Failed to stop the taskpool", failures = res.failure.len
raiseAssert("Failure in taskpool shutdown:" & exc.msg)
if res.failure.len > 0:
error "Failed to close codex node", failures = res.failure.len
raiseAssert "Failed to close codex node"
proc shutdown(server: CodexServer) {.async.} =
await server.stop()
await server.close()
proc new*(
T: type CodexServer, config: CodexConf, privateKey: CodexPrivateKey
): CodexServer =

View File

@ -907,6 +907,7 @@ proc stop*(self: CodexNodeRef) {.async.} =
if not self.clock.isNil:
await self.clock.stop()
proc close*(self: CodexNodeRef) {.async.} =
if not self.networkStore.isNil:
await self.networkStore.close

View File

@ -443,7 +443,6 @@ proc start*(
): Future[void] {.async: (raises: [CancelledError, CodexError]).} =
## Start repo
##
if self.started:
trace "Repo already started"
return
@ -465,6 +464,5 @@ proc stop*(self: RepoStore): Future[void] {.async: (raises: []).} =
return
trace "Stopping repo"
await self.close()
self.started = false

View File

@ -22,7 +22,7 @@ import ../../../codex/utils
import ../../../codex/utils/[keyutils, fileutils]
import ../../../codex/units
from ../../../codex/codex import CodexServer, new, start, stop
from ../../../codex/codex import CodexServer, new, start, stop, close
logScope:
topics = "codexlib codexliblifecycle"
@ -31,6 +31,7 @@ type NodeLifecycleMsgType* = enum
CREATE_NODE
START_NODE
STOP_NODE
DESTROY_NODE
proc readValue*[T: InputFile | InputDir | OutPath | OutDir | OutFile](
r: var JsonReader, val: var T
@ -81,13 +82,18 @@ proc readValue*(r: var JsonReader, val: var EthAddress) =
type NodeLifecycleRequest* = object
operation: NodeLifecycleMsgType
configJson: cstring
onDestroy: proc() {.gcsafe.}
proc createShared*(
T: type NodeLifecycleRequest, op: NodeLifecycleMsgType, configJson: cstring = ""
T: type NodeLifecycleRequest,
op: NodeLifecycleMsgType,
configJson: cstring = "",
onDestroy: proc() {.gcsafe.} = nil,
): ptr type T =
var ret = createShared(T)
ret[].operation = op
ret[].configJson = configJson.alloc()
ret[].onDestroy = onDestroy
return ret
proc destroyShared(self: ptr NodeLifecycleRequest) =
@ -178,5 +184,11 @@ proc process*(
except Exception as e:
error "Failed to STOP_NODE.", error = e.msg
return err(e.msg)
of DESTROY_NODE:
try:
await codex[].close()
self.onDestroy()
except Exception as e:
error "Failed to STOP_NODE.", error = e.msg
return err(e.msg)
return ok("")

View File

@ -27,6 +27,7 @@ when defined(linux):
import std/[atomics]
import chronicles
import chronos
import chronos/threadsync
import ./codex_context
import ./codex_thread_requests/codex_thread_request
import ./codex_thread_requests/requests/node_lifecycle_request
@ -251,11 +252,28 @@ proc codex_destroy(
initializeLibrary()
checkLibcodexParams(ctx, callback, userData)
let res = codex_context.destroyCodexContext(ctx)
let destroySignal = ThreadSignalPtr.new().valueOr:
return callback.error("failed to create destroy signal", userData)
proc onDestroy() {.gcsafe.} =
discard destroySignal.fireSync()
let reqContent = NodeLifecycleRequest.createShared(
NodeLifecycleMsgType.DESTROY_NODE, onDestroy = onDestroy
)
var res = codex_context.sendRequestToCodexThread(
ctx, RequestType.LIFECYCLE, reqContent, callback, userData
)
if res.isErr:
return callback.error(res.error, userData)
return callback.success("", userData)
discard destroySignal.waitSync()
res = codex_context.destroyCodexContext(ctx)
if res.isErr:
return callback.error(res.error, userData)
return RET_OK
proc codex_upload_init(
ctx: ptr CodexContext,