From 1523b496031ced0d23b39b64046c2e2587f029cd Mon Sep 17 00:00:00 2001 From: Arnaud Date: Tue, 16 Sep 2025 10:07:48 +0200 Subject: [PATCH] Add repo and fix pointer freeing memory --- codex/codex.nim | 2 +- examples/golang/codex.go | 113 ++++++++++++------ .../codex_thread_request.nim | 4 + .../requests/node_info_request.nim | 46 +++++++ library/libcodex.h | 5 + library/libcodex.nim | 16 ++- 6 files changed, 141 insertions(+), 45 deletions(-) create mode 100644 library/codex_thread_requests/requests/node_info_request.nim diff --git a/codex/codex.nim b/codex/codex.nim index 7c22a126..234a39ec 100644 --- a/codex/codex.nim +++ b/codex/codex.nim @@ -51,7 +51,7 @@ logScope: type CodexServer* = ref object - config: CodexConf + config*: CodexConf restServer: RestServerRef codexNode: CodexNodeRef repoStore: RepoStore diff --git a/examples/golang/codex.go b/examples/golang/codex.go index 6b98c374..2604020c 100644 --- a/examples/golang/codex.go +++ b/examples/golang/codex.go @@ -89,6 +89,10 @@ package main CODEX_CALL(codex_revision(codexCtx, (CodexCallback) callback, resp)); } + static void cGoCodexRepo(void* codexCtx, void* resp) { + CODEX_CALL(codex_repo(codexCtx, (CodexCallback) callback, resp)); + } + static void cGoCodexStart(void* codexCtx, void* resp) { CODEX_CALL(codex_start(codexCtx, (CodexCallback) callback, resp)); } @@ -209,10 +213,8 @@ func newBridgeCtx() *bridgeCtx { } func (b *bridgeCtx) free() { - if b.resp != nil { - C.freeResp(b.resp) - b.resp = nil - } + C.freeResp(b.resp) + b.resp = nil } func (b *bridgeCtx) isACK() bool { @@ -228,20 +230,39 @@ func (b *bridgeCtx) isError() bool { } // TODO: Check the error here after the wait -func (b *bridgeCtx) wait() { +func (b *bridgeCtx) wait() error { b.wg.Wait() + return b.err } func (b *bridgeCtx) getMsg() string { return C.GoStringN(C.getMyCharPtr(b.resp), C.int(C.getMyCharLen(b.resp))) } +func (b *bridgeCtx) StringResult() (string, error) { + if b.isACK() { + if b.wait() != nil { + return "", b.err + } + + return b.result, b.err + } + + if b.isOK() { + return b.getMsg(), nil + } + + return "", errors.New(b.getMsg()) +} + //export callback func callback(ret C.int, msg *C.char, len C.size_t, resp unsafe.Pointer) { if resp == nil { return } + // log.Println("Callback called with ret:", ret, " msg:", C.GoStringN(msg, C.int(len)), " len:", len) + m := (*C.Resp)(resp) m.ret = ret m.msg = msg @@ -249,20 +270,27 @@ func callback(ret C.int, msg *C.char, len C.size_t, resp unsafe.Pointer) { if m.h != 0 { h := cgo.Handle(m.h) + + if h == 0 { + return + } + if v, ok := h.Value().(*bridgeCtx); ok { if ret == C.RET_OK || ret == C.RET_ERR { - msg := C.GoStringN(msg, C.int(len)) + retMsg := C.GoStringN(msg, C.int(len)) if ret == C.RET_OK { - v.result = msg + v.result = retMsg } else { - v.err = errors.New(msg) + v.err = errors.New(retMsg) } h.Delete() + m.h = 0 if v.wg != nil { v.wg.Done() + v = nil } } } @@ -284,7 +312,10 @@ func CodexNew(config CodexConfig) (*CodexNode, error) { ctx := C.cGoCodexNew(cJsonConfig, bridge.resp) if bridge.isACK() { - bridge.wait() + if bridge.wait() != nil { + return nil, bridge.err + } + return &CodexNode{ctx: ctx}, bridge.err } @@ -301,16 +332,7 @@ func (self *CodexNode) CodexVersion() (string, error) { C.cGoCodexVersion(self.ctx, bridge.resp) - if bridge.isACK() { - bridge.wait() - return bridge.result, bridge.err - } - - if bridge.isOK() { - return bridge.getMsg(), nil - } - - return "", errors.New(bridge.getMsg()) + return bridge.StringResult() } func (self *CodexNode) CodexRevision() (string, error) { @@ -319,16 +341,16 @@ func (self *CodexNode) CodexRevision() (string, error) { C.cGoCodexRevision(self.ctx, bridge.resp) - if bridge.isACK() { - bridge.wait() - return bridge.result, bridge.err - } + return bridge.StringResult() +} - if bridge.isOK() { - return bridge.result, nil - } +func (self *CodexNode) CodexRepo() (string, error) { + bridge := newBridgeCtx() + defer bridge.free() - return "", errors.New(bridge.getMsg()) + C.cGoCodexRepo(self.ctx, bridge.resp) + + return bridge.StringResult() } // CodexStart returns the bridgeCtx to allow the caller @@ -340,7 +362,6 @@ func (self *CodexNode) CodexStart() (*bridgeCtx, error) { C.cGoCodexStart(self.ctx, bridge.resp) if bridge.isError() { - bridge.free() return nil, errors.New(bridge.getMsg()) } @@ -352,11 +373,11 @@ func (self *CodexNode) CodexStart() (*bridgeCtx, error) { // TODO: be consistent and do not free the bridgeCtx here func (self *CodexNode) CodexStop() (*bridgeCtx, error) { bridge := newBridgeCtx() + defer bridge.free() C.cGoCodexStop(self.ctx, bridge.resp) if bridge.isError() { - bridge.free() return nil, errors.New(bridge.getMsg()) } @@ -370,11 +391,13 @@ func (self *CodexNode) CodexDestroy() error { C.cGoCodexDestroy(self.ctx, bridge.resp) if bridge.isACK() { - bridge.wait() - return bridge.err + err := bridge.wait() + self.ctx = nil + return err } if bridge.isOK() { + self.ctx = nil return nil } @@ -411,7 +434,11 @@ func main() { return } - node.CodexSetEventCallback() + log.Println("Codex created.") + + // node.CodexSetEventCallback() + + log.Println("Getting version...") version, err := node.CodexVersion() if err != nil { @@ -421,6 +448,8 @@ func main() { log.Println("Codex version:", version) + log.Println("Getting revision...") + revision, err := node.CodexRevision() if err != nil { fmt.Println("Error happened:", err.Error()) @@ -429,6 +458,16 @@ func main() { log.Println("Codex revision:", revision) + log.Println("Getting repo...") + + repo, err := node.CodexRepo() + if err != nil { + fmt.Println("Error happened:", err.Error()) + return + } + + log.Println("Codex repo:", repo) + log.Println("Starting Codex...") bridge, err := node.CodexStart() @@ -438,11 +477,10 @@ func main() { return } - bridge.wait() defer bridge.free() - if bridge.err != nil { - fmt.Println("Error happened:", err.Error()) + if err := bridge.wait(); err != nil { + fmt.Println("Error happened:", err) return } @@ -462,13 +500,12 @@ func main() { return } - bridge.wait() defer bridge.free() log.Println("Codex stopped...") - if bridge.err != nil { - fmt.Println("Error happened:", err.Error()) + if err := bridge.wait(); err != nil { + fmt.Println("Error happened:", err) return } diff --git a/library/codex_thread_requests/codex_thread_request.nim b/library/codex_thread_requests/codex_thread_request.nim index 88d0a678..d05923cd 100644 --- a/library/codex_thread_requests/codex_thread_request.nim +++ b/library/codex_thread_requests/codex_thread_request.nim @@ -7,11 +7,13 @@ import results import chronos import ../ffi_types import ./requests/node_lifecycle_request +import ./requests/node_info_request from ../../codex/codex import CodexServer type RequestType* {.pure.} = enum LIFECYCLE + INFO type CodexThreadRequest* = object reqType: RequestType @@ -80,6 +82,8 @@ proc process*( case request[].reqType of LIFECYCLE: cast[ptr NodeLifecycleRequest](request[].reqContent).process(codex) + of INFO: + cast[ptr NodeInfoRequest](request[].reqContent).process(codex) handleRes(await retFut, request) diff --git a/library/codex_thread_requests/requests/node_info_request.nim b/library/codex_thread_requests/requests/node_info_request.nim new file mode 100644 index 00000000..c252aba2 --- /dev/null +++ b/library/codex_thread_requests/requests/node_info_request.nim @@ -0,0 +1,46 @@ +## This file contains the lifecycle request type that will be handled. + +import std/[options] +import chronos +import chronicles +import confutils +import ../../../codex/conf + +from ../../../codex/codex import CodexServer + +type NodeInfoMsgType* = enum + REPO + +type NodeInfoRequest* = object + operation: NodeInfoMsgType + +proc createShared*( + T: type NodeInfoRequest, op: NodeInfoMsgType, configJson: cstring = "" +): ptr type T = + var ret = createShared(T) + ret[].operation = op + return ret + +proc destroyShared(self: ptr NodeInfoRequest) = + deallocShared(self) + +proc getRepo( + codex: ptr CodexServer +): Future[Result[string, string]] {.async: (raises: []).} = + return ok($(codex[].config.dataDir)) + +proc process*( + self: ptr NodeInfoRequest, codex: ptr CodexServer +): Future[Result[string, string]] {.async: (raises: []).} = + defer: + destroyShared(self) + + case self.operation + of REPO: + let res = (await getRepo(codex)) + if res.isErr: + error "INFO failed", error = res.error + return err($res.error) + return res + + return ok("") diff --git a/library/libcodex.h b/library/libcodex.h index f8f41fbe..8f46d421 100644 --- a/library/libcodex.h +++ b/library/libcodex.h @@ -41,6 +41,11 @@ int codex_revision( CodexCallback callback, void* userData); +int codex_repo( + void* ctx, + CodexCallback callback, + void* userData); + int codex_start(void* ctx, CodexCallback callback, void* userData); diff --git a/library/libcodex.nim b/library/libcodex.nim index d2cb3c2d..4e34f706 100644 --- a/library/libcodex.nim +++ b/library/libcodex.nim @@ -30,6 +30,7 @@ import chronos import ./codex_context import ./codex_thread_requests/codex_thread_request import ./codex_thread_requests/requests/node_lifecycle_request +import ./codex_thread_requests/requests/node_info_request import ./ffi_types from ../codex/conf import codexVersion @@ -133,12 +134,15 @@ proc codex_repo( ): cint {.dynlib, exportc.} = initializeLibrary() checkLibcodexParams(ctx, callback, userData) - callback( - RET_OK, - cast[ptr cchar]($conf.codexRevision), - cast[csize_t](len($conf.codexRevision)), - userData, - ) + + let reqContent = NodeInfoRequest.createShared(NodeInfoMsgType.REPO) + + codex_context.sendRequestToCodexThread( + ctx, RequestType.INFO, reqContent, callback, userData + ).isOkOr: + let msg = "libcodex error: " & $error + callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) + return RET_ERR return RET_ACK