mirror of
https://github.com/logos-storage/logos-storage-nim.git
synced 2026-01-03 22:13:12 +00:00
Add repo and fix pointer freeing memory
This commit is contained in:
parent
835c0c2ab2
commit
1523b49603
@ -51,7 +51,7 @@ logScope:
|
||||
|
||||
type
|
||||
CodexServer* = ref object
|
||||
config: CodexConf
|
||||
config*: CodexConf
|
||||
restServer: RestServerRef
|
||||
codexNode: CodexNodeRef
|
||||
repoStore: RepoStore
|
||||
|
||||
@ -89,6 +89,10 @@ package main
|
||||
CODEX_CALL(codex_revision(codexCtx, (CodexCallback) callback, resp));
|
||||
}
|
||||
|
||||
static void cGoCodexRepo(void* codexCtx, void* resp) {
|
||||
CODEX_CALL(codex_repo(codexCtx, (CodexCallback) callback, resp));
|
||||
}
|
||||
|
||||
static void cGoCodexStart(void* codexCtx, void* resp) {
|
||||
CODEX_CALL(codex_start(codexCtx, (CodexCallback) callback, resp));
|
||||
}
|
||||
@ -209,10 +213,8 @@ func newBridgeCtx() *bridgeCtx {
|
||||
}
|
||||
|
||||
func (b *bridgeCtx) free() {
|
||||
if b.resp != nil {
|
||||
C.freeResp(b.resp)
|
||||
b.resp = nil
|
||||
}
|
||||
C.freeResp(b.resp)
|
||||
b.resp = nil
|
||||
}
|
||||
|
||||
func (b *bridgeCtx) isACK() bool {
|
||||
@ -228,20 +230,39 @@ func (b *bridgeCtx) isError() bool {
|
||||
}
|
||||
|
||||
// TODO: Check the error here after the wait
|
||||
func (b *bridgeCtx) wait() {
|
||||
func (b *bridgeCtx) wait() error {
|
||||
b.wg.Wait()
|
||||
return b.err
|
||||
}
|
||||
|
||||
func (b *bridgeCtx) getMsg() string {
|
||||
return C.GoStringN(C.getMyCharPtr(b.resp), C.int(C.getMyCharLen(b.resp)))
|
||||
}
|
||||
|
||||
func (b *bridgeCtx) StringResult() (string, error) {
|
||||
if b.isACK() {
|
||||
if b.wait() != nil {
|
||||
return "", b.err
|
||||
}
|
||||
|
||||
return b.result, b.err
|
||||
}
|
||||
|
||||
if b.isOK() {
|
||||
return b.getMsg(), nil
|
||||
}
|
||||
|
||||
return "", errors.New(b.getMsg())
|
||||
}
|
||||
|
||||
//export callback
|
||||
func callback(ret C.int, msg *C.char, len C.size_t, resp unsafe.Pointer) {
|
||||
if resp == nil {
|
||||
return
|
||||
}
|
||||
|
||||
// log.Println("Callback called with ret:", ret, " msg:", C.GoStringN(msg, C.int(len)), " len:", len)
|
||||
|
||||
m := (*C.Resp)(resp)
|
||||
m.ret = ret
|
||||
m.msg = msg
|
||||
@ -249,20 +270,27 @@ func callback(ret C.int, msg *C.char, len C.size_t, resp unsafe.Pointer) {
|
||||
|
||||
if m.h != 0 {
|
||||
h := cgo.Handle(m.h)
|
||||
|
||||
if h == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
if v, ok := h.Value().(*bridgeCtx); ok {
|
||||
if ret == C.RET_OK || ret == C.RET_ERR {
|
||||
msg := C.GoStringN(msg, C.int(len))
|
||||
retMsg := C.GoStringN(msg, C.int(len))
|
||||
|
||||
if ret == C.RET_OK {
|
||||
v.result = msg
|
||||
v.result = retMsg
|
||||
} else {
|
||||
v.err = errors.New(msg)
|
||||
v.err = errors.New(retMsg)
|
||||
}
|
||||
|
||||
h.Delete()
|
||||
m.h = 0
|
||||
|
||||
if v.wg != nil {
|
||||
v.wg.Done()
|
||||
v = nil
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -284,7 +312,10 @@ func CodexNew(config CodexConfig) (*CodexNode, error) {
|
||||
ctx := C.cGoCodexNew(cJsonConfig, bridge.resp)
|
||||
|
||||
if bridge.isACK() {
|
||||
bridge.wait()
|
||||
if bridge.wait() != nil {
|
||||
return nil, bridge.err
|
||||
}
|
||||
|
||||
return &CodexNode{ctx: ctx}, bridge.err
|
||||
}
|
||||
|
||||
@ -301,16 +332,7 @@ func (self *CodexNode) CodexVersion() (string, error) {
|
||||
|
||||
C.cGoCodexVersion(self.ctx, bridge.resp)
|
||||
|
||||
if bridge.isACK() {
|
||||
bridge.wait()
|
||||
return bridge.result, bridge.err
|
||||
}
|
||||
|
||||
if bridge.isOK() {
|
||||
return bridge.getMsg(), nil
|
||||
}
|
||||
|
||||
return "", errors.New(bridge.getMsg())
|
||||
return bridge.StringResult()
|
||||
}
|
||||
|
||||
func (self *CodexNode) CodexRevision() (string, error) {
|
||||
@ -319,16 +341,16 @@ func (self *CodexNode) CodexRevision() (string, error) {
|
||||
|
||||
C.cGoCodexRevision(self.ctx, bridge.resp)
|
||||
|
||||
if bridge.isACK() {
|
||||
bridge.wait()
|
||||
return bridge.result, bridge.err
|
||||
}
|
||||
return bridge.StringResult()
|
||||
}
|
||||
|
||||
if bridge.isOK() {
|
||||
return bridge.result, nil
|
||||
}
|
||||
func (self *CodexNode) CodexRepo() (string, error) {
|
||||
bridge := newBridgeCtx()
|
||||
defer bridge.free()
|
||||
|
||||
return "", errors.New(bridge.getMsg())
|
||||
C.cGoCodexRepo(self.ctx, bridge.resp)
|
||||
|
||||
return bridge.StringResult()
|
||||
}
|
||||
|
||||
// CodexStart returns the bridgeCtx to allow the caller
|
||||
@ -340,7 +362,6 @@ func (self *CodexNode) CodexStart() (*bridgeCtx, error) {
|
||||
C.cGoCodexStart(self.ctx, bridge.resp)
|
||||
|
||||
if bridge.isError() {
|
||||
bridge.free()
|
||||
return nil, errors.New(bridge.getMsg())
|
||||
}
|
||||
|
||||
@ -352,11 +373,11 @@ func (self *CodexNode) CodexStart() (*bridgeCtx, error) {
|
||||
// TODO: be consistent and do not free the bridgeCtx here
|
||||
func (self *CodexNode) CodexStop() (*bridgeCtx, error) {
|
||||
bridge := newBridgeCtx()
|
||||
defer bridge.free()
|
||||
|
||||
C.cGoCodexStop(self.ctx, bridge.resp)
|
||||
|
||||
if bridge.isError() {
|
||||
bridge.free()
|
||||
return nil, errors.New(bridge.getMsg())
|
||||
}
|
||||
|
||||
@ -370,11 +391,13 @@ func (self *CodexNode) CodexDestroy() error {
|
||||
C.cGoCodexDestroy(self.ctx, bridge.resp)
|
||||
|
||||
if bridge.isACK() {
|
||||
bridge.wait()
|
||||
return bridge.err
|
||||
err := bridge.wait()
|
||||
self.ctx = nil
|
||||
return err
|
||||
}
|
||||
|
||||
if bridge.isOK() {
|
||||
self.ctx = nil
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -411,7 +434,11 @@ func main() {
|
||||
return
|
||||
}
|
||||
|
||||
node.CodexSetEventCallback()
|
||||
log.Println("Codex created.")
|
||||
|
||||
// node.CodexSetEventCallback()
|
||||
|
||||
log.Println("Getting version...")
|
||||
|
||||
version, err := node.CodexVersion()
|
||||
if err != nil {
|
||||
@ -421,6 +448,8 @@ func main() {
|
||||
|
||||
log.Println("Codex version:", version)
|
||||
|
||||
log.Println("Getting revision...")
|
||||
|
||||
revision, err := node.CodexRevision()
|
||||
if err != nil {
|
||||
fmt.Println("Error happened:", err.Error())
|
||||
@ -429,6 +458,16 @@ func main() {
|
||||
|
||||
log.Println("Codex revision:", revision)
|
||||
|
||||
log.Println("Getting repo...")
|
||||
|
||||
repo, err := node.CodexRepo()
|
||||
if err != nil {
|
||||
fmt.Println("Error happened:", err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
log.Println("Codex repo:", repo)
|
||||
|
||||
log.Println("Starting Codex...")
|
||||
|
||||
bridge, err := node.CodexStart()
|
||||
@ -438,11 +477,10 @@ func main() {
|
||||
return
|
||||
}
|
||||
|
||||
bridge.wait()
|
||||
defer bridge.free()
|
||||
|
||||
if bridge.err != nil {
|
||||
fmt.Println("Error happened:", err.Error())
|
||||
if err := bridge.wait(); err != nil {
|
||||
fmt.Println("Error happened:", err)
|
||||
return
|
||||
}
|
||||
|
||||
@ -462,13 +500,12 @@ func main() {
|
||||
return
|
||||
}
|
||||
|
||||
bridge.wait()
|
||||
defer bridge.free()
|
||||
|
||||
log.Println("Codex stopped...")
|
||||
|
||||
if bridge.err != nil {
|
||||
fmt.Println("Error happened:", err.Error())
|
||||
if err := bridge.wait(); err != nil {
|
||||
fmt.Println("Error happened:", err)
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
@ -7,11 +7,13 @@ import results
|
||||
import chronos
|
||||
import ../ffi_types
|
||||
import ./requests/node_lifecycle_request
|
||||
import ./requests/node_info_request
|
||||
|
||||
from ../../codex/codex import CodexServer
|
||||
|
||||
type RequestType* {.pure.} = enum
|
||||
LIFECYCLE
|
||||
INFO
|
||||
|
||||
type CodexThreadRequest* = object
|
||||
reqType: RequestType
|
||||
@ -80,6 +82,8 @@ proc process*(
|
||||
case request[].reqType
|
||||
of LIFECYCLE:
|
||||
cast[ptr NodeLifecycleRequest](request[].reqContent).process(codex)
|
||||
of INFO:
|
||||
cast[ptr NodeInfoRequest](request[].reqContent).process(codex)
|
||||
|
||||
handleRes(await retFut, request)
|
||||
|
||||
|
||||
46
library/codex_thread_requests/requests/node_info_request.nim
Normal file
46
library/codex_thread_requests/requests/node_info_request.nim
Normal file
@ -0,0 +1,46 @@
|
||||
## This file contains the lifecycle request type that will be handled.
|
||||
|
||||
import std/[options]
|
||||
import chronos
|
||||
import chronicles
|
||||
import confutils
|
||||
import ../../../codex/conf
|
||||
|
||||
from ../../../codex/codex import CodexServer
|
||||
|
||||
type NodeInfoMsgType* = enum
|
||||
REPO
|
||||
|
||||
type NodeInfoRequest* = object
|
||||
operation: NodeInfoMsgType
|
||||
|
||||
proc createShared*(
|
||||
T: type NodeInfoRequest, op: NodeInfoMsgType, configJson: cstring = ""
|
||||
): ptr type T =
|
||||
var ret = createShared(T)
|
||||
ret[].operation = op
|
||||
return ret
|
||||
|
||||
proc destroyShared(self: ptr NodeInfoRequest) =
|
||||
deallocShared(self)
|
||||
|
||||
proc getRepo(
|
||||
codex: ptr CodexServer
|
||||
): Future[Result[string, string]] {.async: (raises: []).} =
|
||||
return ok($(codex[].config.dataDir))
|
||||
|
||||
proc process*(
|
||||
self: ptr NodeInfoRequest, codex: ptr CodexServer
|
||||
): Future[Result[string, string]] {.async: (raises: []).} =
|
||||
defer:
|
||||
destroyShared(self)
|
||||
|
||||
case self.operation
|
||||
of REPO:
|
||||
let res = (await getRepo(codex))
|
||||
if res.isErr:
|
||||
error "INFO failed", error = res.error
|
||||
return err($res.error)
|
||||
return res
|
||||
|
||||
return ok("")
|
||||
@ -41,6 +41,11 @@ int codex_revision(
|
||||
CodexCallback callback,
|
||||
void* userData);
|
||||
|
||||
int codex_repo(
|
||||
void* ctx,
|
||||
CodexCallback callback,
|
||||
void* userData);
|
||||
|
||||
int codex_start(void* ctx,
|
||||
CodexCallback callback,
|
||||
void* userData);
|
||||
|
||||
@ -30,6 +30,7 @@ import chronos
|
||||
import ./codex_context
|
||||
import ./codex_thread_requests/codex_thread_request
|
||||
import ./codex_thread_requests/requests/node_lifecycle_request
|
||||
import ./codex_thread_requests/requests/node_info_request
|
||||
import ./ffi_types
|
||||
|
||||
from ../codex/conf import codexVersion
|
||||
@ -133,12 +134,15 @@ proc codex_repo(
|
||||
): cint {.dynlib, exportc.} =
|
||||
initializeLibrary()
|
||||
checkLibcodexParams(ctx, callback, userData)
|
||||
callback(
|
||||
RET_OK,
|
||||
cast[ptr cchar]($conf.codexRevision),
|
||||
cast[csize_t](len($conf.codexRevision)),
|
||||
userData,
|
||||
)
|
||||
|
||||
let reqContent = NodeInfoRequest.createShared(NodeInfoMsgType.REPO)
|
||||
|
||||
codex_context.sendRequestToCodexThread(
|
||||
ctx, RequestType.INFO, reqContent, callback, userData
|
||||
).isOkOr:
|
||||
let msg = "libcodex error: " & $error
|
||||
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
|
||||
return RET_ERR
|
||||
|
||||
return RET_ACK
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user