Provide better api for progress, fix memory pointer issues and improve logs

This commit is contained in:
Arnaud 2025-09-25 08:38:52 +02:00
parent 278c2660a9
commit e8b76e160a
No known key found for this signature in database
GPG Key ID: B8FBC178F10CA7AE
9 changed files with 345 additions and 169 deletions

View File

@ -124,6 +124,10 @@ package main
return codex_upload_file(codexCtx, sessionId, (CodexCallback) callback, resp); return codex_upload_file(codexCtx, sessionId, (CodexCallback) callback, resp);
} }
static int cGoCodexUploadSubscribe(void* codexCtx, char* sessionId, void* resp) {
return codex_upload_subscribe(codexCtx, sessionId, (CodexCallback) callback, resp);
}
static int cGoCodexStart(void* codexCtx, void* resp) { static int cGoCodexStart(void* codexCtx, void* resp) {
return codex_start(codexCtx, (CodexCallback) callback, resp); return codex_start(codexCtx, (CodexCallback) callback, resp);
} }
@ -255,10 +259,12 @@ type CodexNode struct {
const defaultBlockSize = 1024 * 64 const defaultBlockSize = 1024 * 64
type OnProgressFunc func(read, total int, percent float64)
type CodexUploadOptions struct { type CodexUploadOptions struct {
filepath string filepath string
chunkSize int chunkSize int
onProgress func(read, total int, percent float64) onProgress OnProgressFunc
} }
type bridgeCtx struct { type bridgeCtx struct {
@ -273,22 +279,25 @@ type bridgeCtx struct {
} }
func newBridgeCtx() *bridgeCtx { func newBridgeCtx() *bridgeCtx {
var wg sync.WaitGroup bridge := &bridgeCtx{}
wg.Add(1) bridge.wg = &sync.WaitGroup{}
bridge.wg.Add(1)
bridge := &bridgeCtx{wg: &wg}
bridge.h = cgo.NewHandle(bridge) bridge.h = cgo.NewHandle(bridge)
bridge.resp = C.allocResp(C.uintptr_t(uintptr(bridge.h))) bridge.resp = C.allocResp(C.uintptr_t(uintptr(bridge.h)))
return bridge return bridge
} }
func (b *bridgeCtx) free() { func (b *bridgeCtx) free() {
b.h.Delete() if b.h > 0 {
b.h = 0 b.h.Delete()
b.h = 0
}
C.freeResp(b.resp) if b.resp != nil {
b.resp = nil C.freeResp(b.resp)
b.resp = nil
}
} }
func (b *bridgeCtx) CallError(name string) error { func (b *bridgeCtx) CallError(name string) error {
@ -298,7 +307,12 @@ func (b *bridgeCtx) CallError(name string) error {
func (b *bridgeCtx) wait() (string, error) { func (b *bridgeCtx) wait() (string, error) {
b.wg.Wait() b.wg.Wait()
return b.result, b.err result := b.result
err := b.err
b.free()
return result, err
} }
func getReaderSize(r io.Reader) int64 { func getReaderSize(r io.Reader) int64 {
@ -327,16 +341,6 @@ func callback(ret C.int, msg *C.char, len C.size_t, resp unsafe.Pointer) {
m.msg = msg m.msg = msg
m.len = len m.len = len
if ret == C.RET_PROGRESS {
if m.h != 0 {
h := cgo.Handle(m.h)
if v, ok := h.Value().(*bridgeCtx); ok && v.onProgress != nil {
v.onProgress(int(len))
}
}
return
}
if m.h == 0 { if m.h == 0 {
return return
} }
@ -348,15 +352,22 @@ func callback(ret C.int, msg *C.char, len C.size_t, resp unsafe.Pointer) {
} }
if v, ok := h.Value().(*bridgeCtx); ok { if v, ok := h.Value().(*bridgeCtx); ok {
if ret == C.RET_OK || ret == C.RET_ERR { switch ret {
retMsg := C.GoStringN(msg, C.int(len)) case C.RET_PROGRESS:
if v.onProgress != nil {
if ret == C.RET_OK { v.onProgress(int(C.int(len)))
v.result = retMsg
v.err = nil
} else {
v.err = errors.New(retMsg)
} }
case C.RET_OK:
retMsg := C.GoStringN(msg, C.int(len))
v.result = retMsg
v.err = nil
if v.wg != nil {
v.wg.Done()
}
case C.RET_ERR:
retMsg := C.GoStringN(msg, C.int(len))
v.err = errors.New(retMsg)
if v.wg != nil { if v.wg != nil {
v.wg.Done() v.wg.Done()
@ -367,7 +378,6 @@ func callback(ret C.int, msg *C.char, len C.size_t, resp unsafe.Pointer) {
func CodexNew(config CodexConfig) (*CodexNode, error) { func CodexNew(config CodexConfig) (*CodexNode, error) {
bridge := newBridgeCtx() bridge := newBridgeCtx()
defer bridge.free()
jsonConfig, err := json.Marshal(config) jsonConfig, err := json.Marshal(config)
if err != nil { if err != nil {
@ -386,9 +396,8 @@ func CodexNew(config CodexConfig) (*CodexNode, error) {
return &CodexNode{ctx: ctx}, bridge.err return &CodexNode{ctx: ctx}, bridge.err
} }
func (self *CodexNode) CodexVersion() (string, error) { func (self CodexNode) CodexVersion() (string, error) {
bridge := newBridgeCtx() bridge := newBridgeCtx()
defer bridge.free()
if C.cGoCodexVersion(self.ctx, bridge.resp) != C.RET_OK { if C.cGoCodexVersion(self.ctx, bridge.resp) != C.RET_OK {
return "", bridge.CallError("cGoCodexVersion") return "", bridge.CallError("cGoCodexVersion")
@ -397,9 +406,8 @@ func (self *CodexNode) CodexVersion() (string, error) {
return bridge.wait() return bridge.wait()
} }
func (self *CodexNode) CodexRevision() (string, error) { func (self CodexNode) CodexRevision() (string, error) {
bridge := newBridgeCtx() bridge := newBridgeCtx()
defer bridge.free()
if C.cGoCodexRevision(self.ctx, bridge.resp) != C.RET_OK { if C.cGoCodexRevision(self.ctx, bridge.resp) != C.RET_OK {
return "", bridge.CallError("cGoCodexRevision") return "", bridge.CallError("cGoCodexRevision")
@ -408,9 +416,8 @@ func (self *CodexNode) CodexRevision() (string, error) {
return bridge.wait() return bridge.wait()
} }
func (self *CodexNode) CodexRepo() (string, error) { func (self CodexNode) CodexRepo() (string, error) {
bridge := newBridgeCtx() bridge := newBridgeCtx()
defer bridge.free()
if C.cGoCodexRepo(self.ctx, bridge.resp) != C.RET_OK { if C.cGoCodexRepo(self.ctx, bridge.resp) != C.RET_OK {
return "", bridge.CallError("cGoCodexRepo") return "", bridge.CallError("cGoCodexRepo")
@ -419,11 +426,10 @@ func (self *CodexNode) CodexRepo() (string, error) {
return bridge.wait() return bridge.wait()
} }
func (self *CodexNode) CodexDebug() (CodexDebugInfo, error) { func (self CodexNode) CodexDebug() (CodexDebugInfo, error) {
var info CodexDebugInfo var info CodexDebugInfo
bridge := newBridgeCtx() bridge := newBridgeCtx()
defer bridge.free()
if C.cGoCodexDebug(self.ctx, bridge.resp) != C.RET_OK { if C.cGoCodexDebug(self.ctx, bridge.resp) != C.RET_OK {
return info, bridge.CallError("cGoCodexDebug") return info, bridge.CallError("cGoCodexDebug")
@ -439,9 +445,8 @@ func (self *CodexNode) CodexDebug() (CodexDebugInfo, error) {
return info, err return info, err
} }
func (self *CodexNode) CodexSpr() (string, error) { func (self CodexNode) CodexSpr() (string, error) {
bridge := newBridgeCtx() bridge := newBridgeCtx()
defer bridge.free()
if C.cGoCodexSpr(self.ctx, bridge.resp) != C.RET_OK { if C.cGoCodexSpr(self.ctx, bridge.resp) != C.RET_OK {
return "", bridge.CallError("cGoCodexSpr") return "", bridge.CallError("cGoCodexSpr")
@ -450,9 +455,8 @@ func (self *CodexNode) CodexSpr() (string, error) {
return bridge.wait() return bridge.wait()
} }
func (self *CodexNode) CodexPeerId() (string, error) { func (self CodexNode) CodexPeerId() (string, error) {
bridge := newBridgeCtx() bridge := newBridgeCtx()
defer bridge.free()
if C.cGoCodexPeerId(self.ctx, bridge.resp) != C.RET_OK { if C.cGoCodexPeerId(self.ctx, bridge.resp) != C.RET_OK {
return "", bridge.CallError("cGoCodexPeerId") return "", bridge.CallError("cGoCodexPeerId")
@ -461,9 +465,8 @@ func (self *CodexNode) CodexPeerId() (string, error) {
return bridge.wait() return bridge.wait()
} }
func (self *CodexNode) CodexLogLevel(logLevel LogLevel) error { func (self CodexNode) CodexLogLevel(logLevel LogLevel) error {
bridge := newBridgeCtx() bridge := newBridgeCtx()
defer bridge.free()
var cLogLevel = C.CString(fmt.Sprintf("%s", logLevel)) var cLogLevel = C.CString(fmt.Sprintf("%s", logLevel))
defer C.free(unsafe.Pointer(cLogLevel)) defer C.free(unsafe.Pointer(cLogLevel))
@ -476,9 +479,8 @@ func (self *CodexNode) CodexLogLevel(logLevel LogLevel) error {
return err return err
} }
func (self *CodexNode) CodexConnect(peerId string, peerAddresses []string) error { func (self CodexNode) CodexConnect(peerId string, peerAddresses []string) error {
bridge := newBridgeCtx() bridge := newBridgeCtx()
defer bridge.free()
var cPeerId = C.CString(peerId) var cPeerId = C.CString(peerId)
defer C.free(unsafe.Pointer(cPeerId)) defer C.free(unsafe.Pointer(cPeerId))
@ -503,11 +505,10 @@ func (self *CodexNode) CodexConnect(peerId string, peerAddresses []string) error
return err return err
} }
func (self *CodexNode) CodexPeerDebug(peerId string) (RestPeerRecord, error) { func (self CodexNode) CodexPeerDebug(peerId string) (RestPeerRecord, error) {
var record RestPeerRecord var record RestPeerRecord
bridge := newBridgeCtx() bridge := newBridgeCtx()
defer bridge.free()
var cPeerId = C.CString(peerId) var cPeerId = C.CString(peerId)
defer C.free(unsafe.Pointer(cPeerId)) defer C.free(unsafe.Pointer(cPeerId))
@ -526,21 +527,8 @@ func (self *CodexNode) CodexPeerDebug(peerId string) (RestPeerRecord, error) {
return record, err return record, err
} }
func (self *CodexNode) CodexUploadInit(options *CodexUploadOptions) (string, error) { func (self CodexNode) CodexUploadInit(options *CodexUploadOptions) (string, error) {
bridge := newBridgeCtx() bridge := newBridgeCtx()
totalRead := 0
bridge.onProgress = func(bytes int) {
if bytes == -1 {
bridge.free()
} else {
totalRead += bytes
if options.onProgress != nil {
options.onProgress(bytes, totalRead, 0)
}
}
}
var cFilename = C.CString(options.filepath) var cFilename = C.CString(options.filepath)
defer C.free(unsafe.Pointer(cFilename)) defer C.free(unsafe.Pointer(cFilename))
@ -558,9 +546,8 @@ func (self *CodexNode) CodexUploadInit(options *CodexUploadOptions) (string, err
return bridge.wait() return bridge.wait()
} }
func (self *CodexNode) CodexUploadChunk(sessionId string, chunk []byte) error { func (self CodexNode) CodexUploadChunk(sessionId string, chunk []byte) error {
bridge := newBridgeCtx() bridge := newBridgeCtx()
defer bridge.free()
var cSessionId = C.CString(sessionId) var cSessionId = C.CString(sessionId)
defer C.free(unsafe.Pointer(cSessionId)) defer C.free(unsafe.Pointer(cSessionId))
@ -578,9 +565,8 @@ func (self *CodexNode) CodexUploadChunk(sessionId string, chunk []byte) error {
return err return err
} }
func (self *CodexNode) CodexUploadFinalize(sessionId string) (string, error) { func (self CodexNode) CodexUploadFinalize(sessionId string) (string, error) {
bridge := newBridgeCtx() bridge := newBridgeCtx()
defer bridge.free()
var cSessionId = C.CString(sessionId) var cSessionId = C.CString(sessionId)
defer C.free(unsafe.Pointer(cSessionId)) defer C.free(unsafe.Pointer(cSessionId))
@ -592,9 +578,8 @@ func (self *CodexNode) CodexUploadFinalize(sessionId string) (string, error) {
return bridge.wait() return bridge.wait()
} }
func (self *CodexNode) CodexUploadCancel(sessionId string) error { func (self CodexNode) CodexUploadCancel(sessionId string) error {
bridge := newBridgeCtx() bridge := newBridgeCtx()
defer bridge.free()
var cSessionId = C.CString(sessionId) var cSessionId = C.CString(sessionId)
defer C.free(unsafe.Pointer(cSessionId)) defer C.free(unsafe.Pointer(cSessionId))
@ -607,28 +592,46 @@ func (self *CodexNode) CodexUploadCancel(sessionId string) error {
return err return err
} }
func (self *CodexNode) CodexUploadReader(options CodexUploadOptions, r io.Reader) (string, error) { func (self CodexNode) CodexUploadReader(options CodexUploadOptions, r io.Reader) (string, error) {
sessionId, err := self.CodexUploadInit(&options)
if err != nil {
return "", err
}
if options.onProgress != nil { if options.onProgress != nil {
size := getReaderSize(r) size := getReaderSize(r)
total := 0
if size > 0 { if size > 0 {
fn := options.onProgress onProgress := func(read int) {
if read == 0 {
return
}
total += read
options.onProgress = func(read, total int, _ float64) {
percent := float64(total) / float64(size) * 100.0 percent := float64(total) / float64(size) * 100.0
// The last block could be a bit over the size due to padding // The last block could be a bit over the size due to padding
// on the chunk size. // on the chunk size.
if percent > 100.0 { if percent > 100.0 {
percent = 100.0 percent = 100.0
} }
fn(read, total, percent)
options.onProgress(read, int(size), percent)
}
if err := self.CodexUploadSubscribe(sessionId, onProgress); err != nil {
if err := self.CodexUploadCancel(sessionId); err != nil {
log.Println("Error cancelling upload after subscribe failure:", err)
}
return "", err
} }
} }
} }
sessionId, err := self.CodexUploadInit(&options) if options.chunkSize == 0 {
if err != nil { options.chunkSize = defaultBlockSize
return "", err
} }
buf := make([]byte, options.chunkSize) buf := make([]byte, options.chunkSize)
@ -649,19 +652,25 @@ func (self *CodexNode) CodexUploadReader(options CodexUploadOptions, r io.Reader
return "", err return "", err
} }
if n == 0 {
break
}
} }
return self.CodexUploadFinalize(sessionId) return self.CodexUploadFinalize(sessionId)
} }
func (self *CodexNode) CodexUploadReaderAsync(options CodexUploadOptions, r io.Reader, onDone func(cid string, err error)) { func (self CodexNode) CodexUploadReaderAsync(options CodexUploadOptions, r io.Reader, onDone func(cid string, err error)) {
go func() { go func() {
cid, err := self.CodexUploadReader(options, r) cid, err := self.CodexUploadReader(options, r)
onDone(cid, err) onDone(cid, err)
}() }()
} }
func (self *CodexNode) CodexUploadFile(options CodexUploadOptions) (string, error) { func (self CodexNode) CodexUploadFile(options CodexUploadOptions) (string, error) {
bridge := newBridgeCtx()
if options.onProgress != nil { if options.onProgress != nil {
stat, err := os.Stat(options.filepath) stat, err := os.Stat(options.filepath)
if err != nil { if err != nil {
@ -669,24 +678,28 @@ func (self *CodexNode) CodexUploadFile(options CodexUploadOptions) (string, erro
} }
size := stat.Size() size := stat.Size()
if size > 0 { total := 0
fn := options.onProgress
if size > 0 {
bridge.onProgress = func(read int) {
if read == 0 {
return
}
total += read
options.onProgress = func(read, total int, _ float64) {
percent := float64(total) / float64(size) * 100.0 percent := float64(total) / float64(size) * 100.0
// The last block could be a bit over the size due to padding // The last block could be a bit over the size due to padding
// on the chunk size. // on the chunk size.
if percent > 100.0 { if percent > 100.0 {
percent = 100.0 percent = 100.0
} }
fn(read, total, percent)
options.onProgress(read, int(size), percent)
} }
} }
} }
bridge := newBridgeCtx()
defer bridge.free()
var cFilePath = C.CString(options.filepath) var cFilePath = C.CString(options.filepath)
defer C.free(unsafe.Pointer(cFilePath)) defer C.free(unsafe.Pointer(cFilePath))
@ -702,18 +715,40 @@ func (self *CodexNode) CodexUploadFile(options CodexUploadOptions) (string, erro
return "", bridge.CallError("cGoCodexUploadFile") return "", bridge.CallError("cGoCodexUploadFile")
} }
cid, err := bridge.wait() return bridge.wait()
return cid, err
} }
func (self *CodexNode) CodexUploadFileAsync(options CodexUploadOptions, onDone func(cid string, err error)) { func (self CodexNode) CodexUploadFileAsync(options CodexUploadOptions, onDone func(cid string, err error)) {
go func() { go func() {
cid, err := self.CodexUploadFile(options) cid, err := self.CodexUploadFile(options)
onDone(cid, err) onDone(cid, err)
}() }()
} }
func (self *CodexNode) CodexStart() error { func (self CodexNode) CodexUploadSubscribe(sessionId string, onProgress func(read int)) error {
bridge := newBridgeCtx()
bridge.onProgress = onProgress
var cSessionId = C.CString(sessionId)
defer C.free(unsafe.Pointer(cSessionId))
log.Println("Subscribing to upload progress...")
if C.cGoCodexUploadSubscribe(self.ctx, cSessionId, bridge.resp) != C.RET_OK {
return bridge.CallError("cGoCodexUploadSubscribe")
}
go func() {
if _, err := bridge.wait(); err != nil {
log.Println("Error in CodexUploadSubscribe:", err)
}
}()
return nil
}
func (self CodexNode) CodexStart() error {
bridge := newBridgeCtx() bridge := newBridgeCtx()
defer bridge.free() defer bridge.free()
@ -725,14 +760,14 @@ func (self *CodexNode) CodexStart() error {
return err return err
} }
func (self *CodexNode) CodexStartAsync(onDone func(error)) { func (self CodexNode) CodexStartAsync(onDone func(error)) {
go func() { go func() {
err := self.CodexStart() err := self.CodexStart()
onDone(err) onDone(err)
}() }()
} }
func (self *CodexNode) CodexStop() error { func (self CodexNode) CodexStop() error {
bridge := newBridgeCtx() bridge := newBridgeCtx()
defer bridge.free() defer bridge.free()
@ -744,7 +779,7 @@ func (self *CodexNode) CodexStop() error {
return err return err
} }
func (self *CodexNode) CodexDestroy() error { func (self CodexNode) CodexDestroy() error {
bridge := newBridgeCtx() bridge := newBridgeCtx()
defer bridge.free() defer bridge.free()
@ -764,11 +799,11 @@ func globalEventCallback(callerRet C.int, msg *C.char, len C.size_t, userData un
self.MyEventCallback(callerRet, msg, len) self.MyEventCallback(callerRet, msg, len)
} }
func (self *CodexNode) MyEventCallback(callerRet C.int, msg *C.char, len C.size_t) { func (self CodexNode) MyEventCallback(callerRet C.int, msg *C.char, len C.size_t) {
log.Println("Event received:", C.GoStringN(msg, C.int(len))) log.Println("Event received:", C.GoStringN(msg, C.int(len)))
} }
func (self *CodexNode) CodexSetEventCallback() { func (self CodexNode) CodexSetEventCallback() {
// Notice that the events for self node are handled by the 'MyEventCallback' method // Notice that the events for self node are handled by the 'MyEventCallback' method
C.cGoCodexSetEventCallback(self.ctx) C.cGoCodexSetEventCallback(self.ctx)
} }
@ -853,7 +888,6 @@ func main() {
if err != nil { if err != nil {
log.Fatal("Error happened:", err.Error()) log.Fatal("Error happened:", err.Error())
} }
log.Println("Codex Upload Init sessionId:", sessionId) log.Println("Codex Upload Init sessionId:", sessionId)
err = node.CodexUploadChunk(sessionId, []byte("Hello ")) err = node.CodexUploadChunk(sessionId, []byte("Hello "))
@ -874,7 +908,9 @@ func main() {
log.Println("Codex Upload Finalized, cid:", cid) log.Println("Codex Upload Finalized, cid:", cid)
buf := bytes.NewBuffer([]byte("Hello World!")) buf := bytes.NewBuffer([]byte("Hello World!"))
cid, err = node.CodexUploadReader(CodexUploadOptions{filepath: "hello.txt"}, buf) cid, err = node.CodexUploadReader(CodexUploadOptions{filepath: "hello.txt", onProgress: func(read, total int, percent float64) {
log.Printf("Uploaded %d bytes, total %d bytes (%.2f%%)\n", read, total, percent)
}}, buf)
if err != nil { if err != nil {
log.Fatal("Error happened:", err.Error()) log.Fatal("Error happened:", err.Error())
} }
@ -888,10 +924,12 @@ func main() {
// Choose a big file to see the progress logs // Choose a big file to see the progress logs
filepath := path.Join(current, "examples", "golang", "hello.txt") filepath := path.Join(current, "examples", "golang", "hello.txt")
// filepath := path.Join(current, "examples", "golang", "discord-0.0.109.deb") //filepath := path.Join(current, "examples", "golang", "discord-0.0.109.deb")
options := CodexUploadOptions{filepath: filepath, onProgress: func(read, total int, percent float64) { options := CodexUploadOptions{filepath: filepath, onProgress: func(read, total int, percent float64) {
log.Printf("Uploaded %d bytes, total %d bytes (%.2f%%)\n", read, total, percent) log.Printf("Uploaded %d bytes, total %d bytes (%.2f%%)\n", read, total, percent)
}} }}
cid, err = node.CodexUploadFile(options) cid, err = node.CodexUploadFile(options)
if err != nil { if err != nil {

View File

@ -18,6 +18,9 @@ import ./codex_thread_requests/[codex_thread_request]
from ../codex/codex import CodexServer from ../codex/codex import CodexServer
logScope:
topics = "codexlib"
type CodexContext* = object type CodexContext* = object
thread: Thread[(ptr CodexContext)] thread: Thread[(ptr CodexContext)]

View File

@ -16,6 +16,9 @@ import ../../../codex/node
from ../../../codex/codex import CodexServer, node from ../../../codex/codex import CodexServer, node
logScope:
topics = "codexlib codexlibdebug"
type NodeDebugMsgType* = enum type NodeDebugMsgType* = enum
DEBUG DEBUG
PEER PEER

View File

@ -11,6 +11,9 @@ import ../../../codex/node
from ../../../codex/codex import CodexServer, config, node from ../../../codex/codex import CodexServer, config, node
logScope:
topics = "codexlib codexlibinfo"
type NodeInfoMsgType* = enum type NodeInfoMsgType* = enum
REPO REPO
SPR SPR

View File

@ -24,6 +24,9 @@ import ../../../codex/units
from ../../../codex/codex import CodexServer, new, start, stop from ../../../codex/codex import CodexServer, new, start, stop
logScope:
topics = "codexlib codexliblifecycle"
type NodeLifecycleMsgType* = enum type NodeLifecycleMsgType* = enum
CREATE_NODE CREATE_NODE
START_NODE START_NODE

View File

@ -12,6 +12,9 @@ import ../../../codex/node
from ../../../codex/codex import CodexServer, node from ../../../codex/codex import CodexServer, node
logScope:
topics = "codexlib codexlibp2p"
type NodeP2PMsgType* = enum type NodeP2PMsgType* = enum
CONNECT CONNECT

View File

@ -10,6 +10,7 @@
## - CHUNK: sends a chunk of data to the upload session. ## - CHUNK: sends a chunk of data to the upload session.
## - FINALIZE: finalizes the upload and returns the CID of the uploaded file. ## - FINALIZE: finalizes the upload and returns the CID of the uploaded file.
## - CANCEL: cancels the upload session. ## - CANCEL: cancels the upload session.
## - SUBSCRIBE: subscribes to progress updates for the upload session.
## ##
## 2. Directly from a file path: the filepath has to be absolute. ## 2. Directly from a file path: the filepath has to be absolute.
## - INIT: creates a new upload session and returns its ID ## - INIT: creates a new upload session and returns its ID
@ -32,12 +33,16 @@ from ../../../codex/codex import CodexServer, node
from ../../../codex/node import store from ../../../codex/node import store
from libp2p import Cid from libp2p import Cid
logScope:
topics = "codexlib codexlibupload"
type NodeUploadMsgType* = enum type NodeUploadMsgType* = enum
INIT INIT
CHUNK CHUNK
FINALIZE FINALIZE
CANCEL CANCEL
FILE FILE
SUBSCRIBE
type OnProgressHandler = type OnProgressHandler =
proc(bytes: int): Future[void] {.gcsafe, async: (raises: [CancelledError]).} proc(bytes: int): Future[void] {.gcsafe, async: (raises: [CancelledError]).}
@ -57,6 +62,8 @@ type
stream: BufferStream stream: BufferStream
fut: Future[?!Cid] fut: Future[?!Cid]
filepath: string filepath: string
chunkSize: int
onProgress: OnProgressHandler
var uploadSessions {.threadvar.}: Table[UploadSessionId, UploadSession] var uploadSessions {.threadvar.}: Table[UploadSessionId, UploadSession]
var nexUploadSessionCount {.threadvar.}: UploadSessionCount var nexUploadSessionCount {.threadvar.}: UploadSessionCount
@ -85,10 +92,7 @@ proc destroyShared(self: ptr NodeUploadRequest) =
deallocShared(self) deallocShared(self)
proc init( proc init(
codex: ptr CodexServer, codex: ptr CodexServer, filepath: cstring = "", chunkSize: csize_t = 0
filepath: cstring = "",
chunkSize: csize_t = 0,
onProgress: OnProgressHandler,
): Future[Result[string, string]] {.async: (raises: []).} = ): Future[Result[string, string]] {.async: (raises: []).} =
## Init a new session upload and return its ID. ## Init a new session upload and return its ID.
## The session contains the future corresponding to the ## The session contains the future corresponding to the
@ -100,18 +104,19 @@ proc init(
## ##
## The chunkSize matches by default the block size used to store the file. ## The chunkSize matches by default the block size used to store the file.
## ##
## An onProgress handler can be provided to get upload progress. ## When a session contains an onProgress handler, it is called
## The handler is called with the size of the block stored in the node ## with the number of bytes received each time a block is stored thanks to
## when a new block is put in the node. ## `onBlockStore` callback.
## After the `node.store` future is completed, whether successfully or not, ## After the `node.store` future is done, the onProgress handler
## the onProgress handler is called with -1 to signal the end of the upload. ## is called one last time with 0 bytes to signal the end of the upload.
## This allows to clean up the cGo states.
var filenameOpt, mimetypeOpt = string.none var filenameOpt, mimetypeOpt = string.none
if isAbsolute($filepath): if isAbsolute($filepath):
if not fileExists($filepath): if not fileExists($filepath):
return err("File does not exist") return err(
"Failed to create an upload session, the filepath does not exist: " & $filepath
)
if filepath != "": if filepath != "":
let (_, name, ext) = splitFile($filepath) let (_, name, ext) = splitFile($filepath)
@ -139,20 +144,32 @@ proc init(
let onBlockStore = proc( let onBlockStore = proc(
chunk: seq[byte] chunk: seq[byte]
): Future[void] {.gcsafe, async: (raises: [CancelledError]).} = ): Future[void] {.gcsafe, async: (raises: [CancelledError]).} =
discard onProgress(chunk.len) try:
let session = uploadSessions[$sessionId]
if session.onProgress != nil:
await session.onProgress(chunk.len)
except KeyError:
error "Failed to push progress update, session is not found: ",
sessionId = $sessionId
let blockSize = let blockSize =
if chunkSize.NBytes > 0.NBytes: chunkSize.NBytes else: DefaultBlockSize if chunkSize.NBytes > 0.NBytes: chunkSize.NBytes else: DefaultBlockSize
let fut = node.store(lpStream, filenameOpt, mimetypeOpt, blockSize, onBlockStore) let fut = node.store(lpStream, filenameOpt, mimetypeOpt, blockSize, onBlockStore)
proc cb(_: pointer) {.raises: [].} = proc cb(_: pointer) {.raises: [].} =
# Signal end of upload try:
discard onProgress(-1) let session = uploadSessions[$sessionId]
if session.onProgress != nil:
discard session.onProgress(0)
except KeyError:
error "Failed to push the progress final state, session is not found.",
sessionId = $sessionId
fut.addCallback(cb) fut.addCallback(cb)
uploadSessions[sessionId] = uploadSessions[sessionId] = UploadSession(
UploadSession(stream: stream, fut: fut, filepath: $filepath) stream: stream, fut: fut, filepath: $filepath, chunkSize: blockSize.int
)
return ok(sessionId) return ok(sessionId)
@ -163,17 +180,17 @@ proc chunk(
## The chunk is pushed to the BufferStream of the session. ## The chunk is pushed to the BufferStream of the session.
if not uploadSessions.contains($sessionId): if not uploadSessions.contains($sessionId):
return err("Invalid session ID") return err("Failed to upload the chunk, the session is not found: " & $sessionId)
try: try:
let session = uploadSessions[$sessionId] let session = uploadSessions[$sessionId]
await session.stream.pushData(chunk) await session.stream.pushData(chunk)
except KeyError as e: except KeyError as e:
return err("Invalid session ID") return err("Failed to upload the chunk, the session is not found: " & $sessionId)
except LPError as e: except LPError as e:
return err("Stream error: " & $e.msg) return err("Failed to upload the chunk, stream error: " & $e.msg)
except CancelledError as e: except CancelledError as e:
return err("Operation cancelled") return err("Failed to upload the chunk, operation cancelled.")
return ok("") return ok("")
@ -189,7 +206,8 @@ proc finalize(
## case of errors). ## case of errors).
if not uploadSessions.contains($sessionId): if not uploadSessions.contains($sessionId):
return err("Invalid session ID") return
err("Failed to finalize the upload session, session not found: " & $sessionId)
var session: UploadSession var session: UploadSession
try: try:
@ -198,17 +216,18 @@ proc finalize(
let res = await session.fut let res = await session.fut
if res.isErr: if res.isErr:
return err("Upload failed: " & res.error().msg) return err("Failed to finalize the upload session: " & res.error().msg)
return ok($res.get()) return ok($res.get())
except KeyError as e: except KeyError as e:
return err("Invalid session ID") return
err("Failed to finalize the upload session, invalid session ID: " & $sessionId)
except LPStreamError as e: except LPStreamError as e:
return err("Stream error: " & $e.msg) return err("Failed to finalize the upload session, stream error: " & $e.msg)
except CancelledError as e: except CancelledError as e:
return err("Operation cancelled") return err("Failed to finalize the upload session, operation cancelled")
except CatchableError as e: except CatchableError as e:
return err("Upload failed: " & $e.msg) return err("Failed to finalize the upload session: " & $e.msg)
finally: finally:
if uploadSessions.contains($sessionId): if uploadSessions.contains($sessionId):
uploadSessions.del($sessionId) uploadSessions.del($sessionId)
@ -224,20 +243,20 @@ proc cancel(
## from the table. ## from the table.
if not uploadSessions.contains($sessionId): if not uploadSessions.contains($sessionId):
return err("Invalid session ID") return err("Failed to cancel the upload session, session not found: " & $sessionId)
try: try:
let session = uploadSessions[$sessionId] let session = uploadSessions[$sessionId]
session.fut.cancelSoon() session.fut.cancelSoon()
except KeyError as e: except KeyError as e:
return err("Invalid session ID") return err("Failed to cancel the upload session, invalid session ID: " & $sessionId)
uploadSessions.del($sessionId) uploadSessions.del($sessionId)
return ok("") return ok("")
proc streamFile( proc streamFile(
filepath: string, stream: BufferStream filepath: string, stream: BufferStream, chunkSize: int
): Future[Result[void, string]] {.async: (raises: [CancelledError]).} = ): Future[Result[void, string]] {.async: (raises: [CancelledError]).} =
## Streams a file from the given filepath using faststream. ## Streams a file from the given filepath using faststream.
## fsMultiSync cannot be used with chronos because of this warning: ## fsMultiSync cannot be used with chronos because of this warning:
@ -252,43 +271,54 @@ proc streamFile(
let inputStreamHandle = filePath.fileInput() let inputStreamHandle = filePath.fileInput()
let inputStream = inputStreamHandle.implicitDeref let inputStream = inputStreamHandle.implicitDeref
var buf = newSeq[byte](chunkSize)
while inputStream.readable: while inputStream.readable:
let byt = inputStream.read let read = inputStream.readIntoEx(buf)
await stream.pushData(@[byt]) if read == 0:
break
await stream.pushData(buf[0 ..< read])
# let byt = inputStream.read
# await stream.pushData(@[byt])
return ok() return ok()
except IOError, OSError, LPStreamError: except IOError, OSError, LPStreamError:
let e = getCurrentException() let e = getCurrentException()
return err("Stream error: " & $e.msg) return err("Failed to stream the file: " & $e.msg)
proc file( proc file(
codex: ptr CodexServer, sessionId: cstring codex: ptr CodexServer, sessionId: cstring, onProgress: OnProgressHandler
): Future[Result[string, string]] {.raises: [], async: (raises: []).} = ): Future[Result[string, string]] {.raises: [], async: (raises: []).} =
## Starts the file upload for the session identified by sessionId. ## Starts the file upload for the session identified by sessionId.
## Will call finalize when done and return the CID of the uploaded file. ## Will call finalize when done and return the CID of the uploaded file.
## In the finally block, the cleanup section removes the session ## In the finally block, the cleanup section removes the session
## from the table and cancels the future if it is not complete (in ## from the table and cancels the future if it is not complete (in
## case of errors). ## case of errors).
##
## If `onProgress` is provided, it is called with the number of bytes
## received each time a block is stored thanks to `onBlockStore` callback.
if not uploadSessions.contains($sessionId): if not uploadSessions.contains($sessionId):
return err("Invalid session ID") return err("Failed to upload the file, invalid session ID: " & $sessionId)
var session: UploadSession var session: UploadSession
try: try:
session = uploadSessions[$sessionId] session = uploadSessions[$sessionId]
let res = await streamFile(session.filepath, session.stream) if onProgress != nil:
uploadSessions[$sessionId].onProgress = onProgress
let res = await streamFile(session.filepath, session.stream, session.chunkSize)
if res.isErr: if res.isErr:
return err("Failed to stream file: " & res.error) return err("Failed to upload the file: " & res.error)
return await codex.finalize(sessionId) return await codex.finalize(sessionId)
except KeyError as e: except KeyError as e:
return err("Invalid session ID") return err("Failed to upload the file, the session is not found: " & $sessionId)
except LPStreamError, IOError: except LPStreamError, IOError:
let e = getCurrentException() let e = getCurrentException()
return err("Stream error: " & $e.msg) return err("Failed to upload the file: " & $e.msg)
except CancelledError as e: except CancelledError as e:
return err("Operation cancelled") return err("Failed to upload the file, the operation is cancelled.")
except CatchableError as e: except CatchableError as e:
return err("Upload failed: " & $e.msg) return err("Failed to upload the file: " & $e.msg)
finally: finally:
if uploadSessions.contains($sessionId): if uploadSessions.contains($sessionId):
uploadSessions.del($sessionId) uploadSessions.del($sessionId)
@ -296,6 +326,46 @@ proc file(
if session.fut != nil and not session.fut.finished(): if session.fut != nil and not session.fut.finished():
session.fut.cancelSoon() session.fut.cancelSoon()
proc subscribe(
codex: ptr CodexServer, sessionId: cstring, onProgress: OnProgressHandler
): Future[Result[string, string]] {.raises: [], async: (raises: []).} =
## Subscribes to progress updates for the upload session identified by sessionId.
## The onProgress handler is called with the number of bytes received
## each time a block is stored thanks to `onBlockStore` callback.
if not uploadSessions.contains($sessionId):
return err(
"Failed to subscribe to the upload session, invalid session ID: " & $sessionId
)
let fut = newFuture[void]()
proc onBlockReceived(bytes: int): Future[void] {.async: (raises: [CancelledError]).} =
try:
let session = uploadSessions[$sessionId]
await onProgress(bytes)
if bytes == 0:
fut.complete()
except KeyError:
fut.cancelSoon()
error "Failed to push progress update, session is not found: ",
sessionId = $sessionId
try:
uploadSessions[$sessionId].onProgress = onBlockReceived
except KeyError:
return err(
"Failed to subscribe to the upload session, session is not found: " & $sessionId
)
try:
await fut
except CatchableError as e:
return err("Failed to subscribe to the upload session: " & $e.msg)
return ok("")
proc process*( proc process*(
self: ptr NodeUploadRequest, codex: ptr CodexServer self: ptr NodeUploadRequest, codex: ptr CodexServer
): Future[Result[string, string]] {.async: (raises: []).} = ): Future[Result[string, string]] {.async: (raises: []).} =
@ -304,7 +374,7 @@ proc process*(
case self.operation case self.operation
of NodeUploadMsgType.INIT: of NodeUploadMsgType.INIT:
let res = (await init(codex, self.filepath, self.chunkSize, self.onProgress)) let res = (await init(codex, self.filepath, self.chunkSize))
if res.isErr: if res.isErr:
error "INIT failed", error = res.error error "INIT failed", error = res.error
return err($res.error) return err($res.error)
@ -328,10 +398,16 @@ proc process*(
return err($res.error) return err($res.error)
return res return res
of NodeUploadMsgType.FILE: of NodeUploadMsgType.FILE:
let res = (await file(codex, self.sessionId)) let res = (await file(codex, self.sessionId, self.onProgress))
if res.isErr: if res.isErr:
error "FILE failed", error = res.error error "FILE failed", error = res.error
return err($res.error) return err($res.error)
return res return res
of NodeUploadMsgType.SUBSCRIBE:
let res = (await subscribe(codex, self.sessionId, self.onProgress))
if res.isErr:
error "SUBSCRIBE failed", error = res.error
return err($res.error)
return res
return ok("") return ok("")

View File

@ -115,6 +115,12 @@ int codex_upload_file(
CodexCallback callback, CodexCallback callback,
void* userData); void* userData);
int codex_upload_subscribe(
void* ctx,
const char* sessionId,
CodexCallback callback,
void* userData);
int codex_start(void* ctx, int codex_start(void* ctx,
CodexCallback callback, CodexCallback callback,
void* userData); void* userData);

View File

@ -38,6 +38,9 @@ import ./ffi_types
from ../codex/conf import codexVersion, updateLogLevel from ../codex/conf import codexVersion, updateLogLevel
logScope:
topics = "codexlib"
template checkLibcodexParams*( template checkLibcodexParams*(
ctx: ptr CodexContext, callback: CodexCallback, userData: pointer ctx: ptr CodexContext, callback: CodexCallback, userData: pointer
) = ) =
@ -76,10 +79,6 @@ proc initializeLibrary() {.exported.} =
locals = addr(locals) locals = addr(locals)
nimGC_setStackBottom(locals) nimGC_setStackBottom(locals)
template init(ctx, callback, userData) =
initializeLibrary()
checkLibcodexParams(ctx, callback, userData)
proc codex_new( proc codex_new(
configJson: cstring, callback: CodexCallback, userData: pointer configJson: cstring, callback: CodexCallback, userData: pointer
): pointer {.dynlib, exported.} = ): pointer {.dynlib, exported.} =
@ -111,7 +110,8 @@ proc codex_new(
proc codex_version( proc codex_version(
ctx: ptr CodexContext, callback: CodexCallback, userData: pointer ctx: ptr CodexContext, callback: CodexCallback, userData: pointer
): cint {.dynlib, exportc.} = ): cint {.dynlib, exportc.} =
init(ctx, callback, userData) initializeLibrary()
checkLibcodexParams(ctx, callback, userData)
callback( callback(
RET_OK, RET_OK,
@ -125,7 +125,8 @@ proc codex_version(
proc codex_revision( proc codex_revision(
ctx: ptr CodexContext, callback: CodexCallback, userData: pointer ctx: ptr CodexContext, callback: CodexCallback, userData: pointer
): cint {.dynlib, exportc.} = ): cint {.dynlib, exportc.} =
init(ctx, callback, userData) initializeLibrary()
checkLibcodexParams(ctx, callback, userData)
callback( callback(
RET_OK, RET_OK,
@ -139,7 +140,8 @@ proc codex_revision(
proc codex_repo( proc codex_repo(
ctx: ptr CodexContext, callback: CodexCallback, userData: pointer ctx: ptr CodexContext, callback: CodexCallback, userData: pointer
): cint {.dynlib, exportc.} = ): cint {.dynlib, exportc.} =
init(ctx, callback, userData) initializeLibrary()
checkLibcodexParams(ctx, callback, userData)
let reqContent = NodeInfoRequest.createShared(NodeInfoMsgType.REPO) let reqContent = NodeInfoRequest.createShared(NodeInfoMsgType.REPO)
let res = codex_context.sendRequestToCodexThread( let res = codex_context.sendRequestToCodexThread(
@ -151,7 +153,8 @@ proc codex_repo(
proc codex_debug( proc codex_debug(
ctx: ptr CodexContext, callback: CodexCallback, userData: pointer ctx: ptr CodexContext, callback: CodexCallback, userData: pointer
): cint {.dynlib, exportc.} = ): cint {.dynlib, exportc.} =
init(ctx, callback, userData) initializeLibrary()
checkLibcodexParams(ctx, callback, userData)
let reqContent = NodeDebugRequest.createShared(NodeDebugMsgType.DEBUG) let reqContent = NodeDebugRequest.createShared(NodeDebugMsgType.DEBUG)
let res = codex_context.sendRequestToCodexThread( let res = codex_context.sendRequestToCodexThread(
@ -163,7 +166,8 @@ proc codex_debug(
proc codex_spr( proc codex_spr(
ctx: ptr CodexContext, callback: CodexCallback, userData: pointer ctx: ptr CodexContext, callback: CodexCallback, userData: pointer
): cint {.dynlib, exportc.} = ): cint {.dynlib, exportc.} =
init(ctx, callback, userData) initializeLibrary()
checkLibcodexParams(ctx, callback, userData)
let reqContent = NodeInfoRequest.createShared(NodeInfoMsgType.SPR) let reqContent = NodeInfoRequest.createShared(NodeInfoMsgType.SPR)
let res = codex_context.sendRequestToCodexThread( let res = codex_context.sendRequestToCodexThread(
@ -175,7 +179,8 @@ proc codex_spr(
proc codex_peer_id( proc codex_peer_id(
ctx: ptr CodexContext, callback: CodexCallback, userData: pointer ctx: ptr CodexContext, callback: CodexCallback, userData: pointer
): cint {.dynlib, exportc.} = ): cint {.dynlib, exportc.} =
init(ctx, callback, userData) initializeLibrary()
checkLibcodexParams(ctx, callback, userData)
let reqContent = NodeInfoRequest.createShared(NodeInfoMsgType.PEERID) let reqContent = NodeInfoRequest.createShared(NodeInfoMsgType.PEERID)
let res = codex_context.sendRequestToCodexThread( let res = codex_context.sendRequestToCodexThread(
@ -190,7 +195,8 @@ proc codex_peer_id(
proc codex_log_level( proc codex_log_level(
ctx: ptr CodexContext, logLevel: cstring, callback: CodexCallback, userData: pointer ctx: ptr CodexContext, logLevel: cstring, callback: CodexCallback, userData: pointer
): cint {.dynlib, exportc.} = ): cint {.dynlib, exportc.} =
init(ctx, callback, userData) initializeLibrary()
checkLibcodexParams(ctx, callback, userData)
try: try:
updateLogLevel($logLevel) updateLogLevel($logLevel)
@ -207,7 +213,8 @@ proc codex_connect(
callback: CodexCallback, callback: CodexCallback,
userData: pointer, userData: pointer,
): cint {.dynlib, exportc.} = ): cint {.dynlib, exportc.} =
init(ctx, callback, userData) initializeLibrary()
checkLibcodexParams(ctx, callback, userData)
var peerAddresses = newSeq[cstring](peerAddressesLength) var peerAddresses = newSeq[cstring](peerAddressesLength)
let peers = cast[ptr UncheckedArray[cstring]](peerAddressesPtr) let peers = cast[ptr UncheckedArray[cstring]](peerAddressesPtr)
@ -226,7 +233,8 @@ proc codex_connect(
proc codex_peer_debug( proc codex_peer_debug(
ctx: ptr CodexContext, peerId: cstring, callback: CodexCallback, userData: pointer ctx: ptr CodexContext, peerId: cstring, callback: CodexCallback, userData: pointer
): cint {.dynlib, exportc.} = ): cint {.dynlib, exportc.} =
init(ctx, callback, userData) initializeLibrary()
checkLibcodexParams(ctx, callback, userData)
let reqContent = NodeDebugRequest.createShared(NodeDebugMsgType.PEER, peerId = peerId) let reqContent = NodeDebugRequest.createShared(NodeDebugMsgType.PEER, peerId = peerId)
let res = codex_context.sendRequestToCodexThread( let res = codex_context.sendRequestToCodexThread(
@ -238,7 +246,8 @@ proc codex_peer_debug(
proc codex_destroy( proc codex_destroy(
ctx: ptr CodexContext, callback: CodexCallback, userData: pointer ctx: ptr CodexContext, callback: CodexCallback, userData: pointer
): cint {.dynlib, exportc.} = ): cint {.dynlib, exportc.} =
init(ctx, callback, userData) initializeLibrary()
checkLibcodexParams(ctx, callback, userData)
let res = codex_context.destroyCodexContext(ctx) let res = codex_context.destroyCodexContext(ctx)
if res.isErr: if res.isErr:
@ -253,19 +262,13 @@ proc codex_upload_init(
callback: CodexCallback, callback: CodexCallback,
userData: pointer, userData: pointer,
): cint {.dynlib, exportc.} = ): cint {.dynlib, exportc.} =
init(ctx, callback, userData) initializeLibrary()
checkLibcodexParams(ctx, callback, userData)
let onProgress = proc(
bytes: int
): Future[void] {.gcsafe, async: (raises: [CancelledError]).} =
callback(RET_PROGRESS, nil, bytes.csize_t, userData)
let reqContent = NodeUploadRequest.createShared( let reqContent = NodeUploadRequest.createShared(
NodeUploadMsgType.INIT, NodeUploadMsgType.INIT, filepath = filepath, chunkSize = chunkSize
filepath = filepath,
chunkSize = chunkSize,
onProgress = onProgress,
) )
let res = codex_context.sendRequestToCodexThread( let res = codex_context.sendRequestToCodexThread(
ctx, RequestType.UPLOAD, reqContent, callback, userData ctx, RequestType.UPLOAD, reqContent, callback, userData
) )
@ -280,7 +283,8 @@ proc codex_upload_chunk(
callback: CodexCallback, callback: CodexCallback,
userData: pointer, userData: pointer,
): cint {.dynlib, exportc.} = ): cint {.dynlib, exportc.} =
init(ctx, callback, userData) initializeLibrary()
checkLibcodexParams(ctx, callback, userData)
let chunk = newSeq[byte](len) let chunk = newSeq[byte](len)
copyMem(addr chunk[0], data, len) copyMem(addr chunk[0], data, len)
@ -300,7 +304,8 @@ proc codex_upload_finalize(
callback: CodexCallback, callback: CodexCallback,
userData: pointer, userData: pointer,
): cint {.dynlib, exportc.} = ): cint {.dynlib, exportc.} =
init(ctx, callback, userData) initializeLibrary()
checkLibcodexParams(ctx, callback, userData)
let reqContent = let reqContent =
NodeUploadRequest.createShared(NodeUploadMsgType.FINALIZE, sessionId = sessionId) NodeUploadRequest.createShared(NodeUploadMsgType.FINALIZE, sessionId = sessionId)
@ -316,7 +321,8 @@ proc codex_upload_cancel(
callback: CodexCallback, callback: CodexCallback,
userData: pointer, userData: pointer,
): cint {.dynlib, exportc.} = ): cint {.dynlib, exportc.} =
init(ctx, callback, userData) initializeLibrary()
checkLibcodexParams(ctx, callback, userData)
let reqContent = let reqContent =
NodeUploadRequest.createShared(NodeUploadMsgType.CANCEL, sessionId = sessionId) NodeUploadRequest.createShared(NodeUploadMsgType.CANCEL, sessionId = sessionId)
@ -333,10 +339,43 @@ proc codex_upload_file(
callback: CodexCallback, callback: CodexCallback,
userData: pointer, userData: pointer,
): cint {.dynlib, exportc.} = ): cint {.dynlib, exportc.} =
init(ctx, callback, userData) initializeLibrary()
checkLibcodexParams(ctx, callback, userData)
let reqContent = let onProgress = proc(
NodeUploadRequest.createShared(NodeUploadMsgType.FILE, sessionId = sessionId) bytes: int
): Future[void] {.async: (raises: [CancelledError]).} =
if userData != nil:
callback(RET_PROGRESS, nil, cast[csize_t](bytes), userData)
let reqContent = NodeUploadRequest.createShared(
NodeUploadMsgType.FILE, sessionId = sessionId, onProgress = onProgress
)
let res = codex_context.sendRequestToCodexThread(
ctx, RequestType.UPLOAD, reqContent, callback, userData
)
return callback.okOrError(res, userData)
proc codex_upload_subscribe(
ctx: ptr CodexContext,
sessionId: cstring,
callback: CodexCallback,
userData: pointer,
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibcodexParams(ctx, callback, userData)
let onProgress = proc(
bytes: int
): Future[void] {.async: (raises: [CancelledError]).} =
if userData != nil:
callback(RET_PROGRESS, nil, cast[csize_t](bytes), userData)
let reqContent = NodeUploadRequest.createShared(
NodeUploadMsgType.SUBSCRIBE, sessionId = sessionId, onProgress = onProgress
)
let res = codex_context.sendRequestToCodexThread( let res = codex_context.sendRequestToCodexThread(
ctx, RequestType.UPLOAD, reqContent, callback, userData ctx, RequestType.UPLOAD, reqContent, callback, userData
@ -347,7 +386,8 @@ proc codex_upload_file(
proc codex_start( proc codex_start(
ctx: ptr CodexContext, callback: CodexCallback, userData: pointer ctx: ptr CodexContext, callback: CodexCallback, userData: pointer
): cint {.dynlib, exportc.} = ): cint {.dynlib, exportc.} =
init(ctx, callback, userData) initializeLibrary()
checkLibcodexParams(ctx, callback, userData)
let reqContent: ptr NodeLifecycleRequest = let reqContent: ptr NodeLifecycleRequest =
NodeLifecycleRequest.createShared(NodeLifecycleMsgType.START_NODE) NodeLifecycleRequest.createShared(NodeLifecycleMsgType.START_NODE)
@ -360,7 +400,8 @@ proc codex_start(
proc codex_stop( proc codex_stop(
ctx: ptr CodexContext, callback: CodexCallback, userData: pointer ctx: ptr CodexContext, callback: CodexCallback, userData: pointer
): cint {.dynlib, exportc.} = ): cint {.dynlib, exportc.} =
init(ctx, callback, userData) initializeLibrary()
checkLibcodexParams(ctx, callback, userData)
let reqContent: ptr NodeLifecycleRequest = let reqContent: ptr NodeLifecycleRequest =
NodeLifecycleRequest.createShared(NodeLifecycleMsgType.STOP_NODE) NodeLifecycleRequest.createShared(NodeLifecycleMsgType.STOP_NODE)