Provide better api for progress, fix memory pointer issues and improve logs

This commit is contained in:
Arnaud 2025-09-25 08:38:52 +02:00 committed by Eric
parent 42a097aa37
commit ef291873ab
No known key found for this signature in database
9 changed files with 345 additions and 169 deletions

View File

@ -124,6 +124,10 @@ package main
return codex_upload_file(codexCtx, sessionId, (CodexCallback) callback, resp);
}
static int cGoCodexUploadSubscribe(void* codexCtx, char* sessionId, void* resp) {
return codex_upload_subscribe(codexCtx, sessionId, (CodexCallback) callback, resp);
}
static int cGoCodexStart(void* codexCtx, void* resp) {
return codex_start(codexCtx, (CodexCallback) callback, resp);
}
@ -255,10 +259,12 @@ type CodexNode struct {
const defaultBlockSize = 1024 * 64
type OnProgressFunc func(read, total int, percent float64)
type CodexUploadOptions struct {
filepath string
chunkSize int
onProgress func(read, total int, percent float64)
onProgress OnProgressFunc
}
type bridgeCtx struct {
@ -273,22 +279,25 @@ type bridgeCtx struct {
}
func newBridgeCtx() *bridgeCtx {
var wg sync.WaitGroup
wg.Add(1)
bridge := &bridgeCtx{}
bridge.wg = &sync.WaitGroup{}
bridge.wg.Add(1)
bridge := &bridgeCtx{wg: &wg}
bridge.h = cgo.NewHandle(bridge)
bridge.resp = C.allocResp(C.uintptr_t(uintptr(bridge.h)))
return bridge
}
func (b *bridgeCtx) free() {
b.h.Delete()
b.h = 0
if b.h > 0 {
b.h.Delete()
b.h = 0
}
C.freeResp(b.resp)
b.resp = nil
if b.resp != nil {
C.freeResp(b.resp)
b.resp = nil
}
}
func (b *bridgeCtx) CallError(name string) error {
@ -298,7 +307,12 @@ func (b *bridgeCtx) CallError(name string) error {
func (b *bridgeCtx) wait() (string, error) {
b.wg.Wait()
return b.result, b.err
result := b.result
err := b.err
b.free()
return result, err
}
func getReaderSize(r io.Reader) int64 {
@ -327,16 +341,6 @@ func callback(ret C.int, msg *C.char, len C.size_t, resp unsafe.Pointer) {
m.msg = msg
m.len = len
if ret == C.RET_PROGRESS {
if m.h != 0 {
h := cgo.Handle(m.h)
if v, ok := h.Value().(*bridgeCtx); ok && v.onProgress != nil {
v.onProgress(int(len))
}
}
return
}
if m.h == 0 {
return
}
@ -348,15 +352,22 @@ func callback(ret C.int, msg *C.char, len C.size_t, resp unsafe.Pointer) {
}
if v, ok := h.Value().(*bridgeCtx); ok {
if ret == C.RET_OK || ret == C.RET_ERR {
retMsg := C.GoStringN(msg, C.int(len))
if ret == C.RET_OK {
v.result = retMsg
v.err = nil
} else {
v.err = errors.New(retMsg)
switch ret {
case C.RET_PROGRESS:
if v.onProgress != nil {
v.onProgress(int(C.int(len)))
}
case C.RET_OK:
retMsg := C.GoStringN(msg, C.int(len))
v.result = retMsg
v.err = nil
if v.wg != nil {
v.wg.Done()
}
case C.RET_ERR:
retMsg := C.GoStringN(msg, C.int(len))
v.err = errors.New(retMsg)
if v.wg != nil {
v.wg.Done()
@ -367,7 +378,6 @@ func callback(ret C.int, msg *C.char, len C.size_t, resp unsafe.Pointer) {
func CodexNew(config CodexConfig) (*CodexNode, error) {
bridge := newBridgeCtx()
defer bridge.free()
jsonConfig, err := json.Marshal(config)
if err != nil {
@ -386,9 +396,8 @@ func CodexNew(config CodexConfig) (*CodexNode, error) {
return &CodexNode{ctx: ctx}, bridge.err
}
func (self *CodexNode) CodexVersion() (string, error) {
func (self CodexNode) CodexVersion() (string, error) {
bridge := newBridgeCtx()
defer bridge.free()
if C.cGoCodexVersion(self.ctx, bridge.resp) != C.RET_OK {
return "", bridge.CallError("cGoCodexVersion")
@ -397,9 +406,8 @@ func (self *CodexNode) CodexVersion() (string, error) {
return bridge.wait()
}
func (self *CodexNode) CodexRevision() (string, error) {
func (self CodexNode) CodexRevision() (string, error) {
bridge := newBridgeCtx()
defer bridge.free()
if C.cGoCodexRevision(self.ctx, bridge.resp) != C.RET_OK {
return "", bridge.CallError("cGoCodexRevision")
@ -408,9 +416,8 @@ func (self *CodexNode) CodexRevision() (string, error) {
return bridge.wait()
}
func (self *CodexNode) CodexRepo() (string, error) {
func (self CodexNode) CodexRepo() (string, error) {
bridge := newBridgeCtx()
defer bridge.free()
if C.cGoCodexRepo(self.ctx, bridge.resp) != C.RET_OK {
return "", bridge.CallError("cGoCodexRepo")
@ -419,11 +426,10 @@ func (self *CodexNode) CodexRepo() (string, error) {
return bridge.wait()
}
func (self *CodexNode) CodexDebug() (CodexDebugInfo, error) {
func (self CodexNode) CodexDebug() (CodexDebugInfo, error) {
var info CodexDebugInfo
bridge := newBridgeCtx()
defer bridge.free()
if C.cGoCodexDebug(self.ctx, bridge.resp) != C.RET_OK {
return info, bridge.CallError("cGoCodexDebug")
@ -439,9 +445,8 @@ func (self *CodexNode) CodexDebug() (CodexDebugInfo, error) {
return info, err
}
func (self *CodexNode) CodexSpr() (string, error) {
func (self CodexNode) CodexSpr() (string, error) {
bridge := newBridgeCtx()
defer bridge.free()
if C.cGoCodexSpr(self.ctx, bridge.resp) != C.RET_OK {
return "", bridge.CallError("cGoCodexSpr")
@ -450,9 +455,8 @@ func (self *CodexNode) CodexSpr() (string, error) {
return bridge.wait()
}
func (self *CodexNode) CodexPeerId() (string, error) {
func (self CodexNode) CodexPeerId() (string, error) {
bridge := newBridgeCtx()
defer bridge.free()
if C.cGoCodexPeerId(self.ctx, bridge.resp) != C.RET_OK {
return "", bridge.CallError("cGoCodexPeerId")
@ -461,9 +465,8 @@ func (self *CodexNode) CodexPeerId() (string, error) {
return bridge.wait()
}
func (self *CodexNode) CodexLogLevel(logLevel LogLevel) error {
func (self CodexNode) CodexLogLevel(logLevel LogLevel) error {
bridge := newBridgeCtx()
defer bridge.free()
var cLogLevel = C.CString(fmt.Sprintf("%s", logLevel))
defer C.free(unsafe.Pointer(cLogLevel))
@ -476,9 +479,8 @@ func (self *CodexNode) CodexLogLevel(logLevel LogLevel) error {
return err
}
func (self *CodexNode) CodexConnect(peerId string, peerAddresses []string) error {
func (self CodexNode) CodexConnect(peerId string, peerAddresses []string) error {
bridge := newBridgeCtx()
defer bridge.free()
var cPeerId = C.CString(peerId)
defer C.free(unsafe.Pointer(cPeerId))
@ -503,11 +505,10 @@ func (self *CodexNode) CodexConnect(peerId string, peerAddresses []string) error
return err
}
func (self *CodexNode) CodexPeerDebug(peerId string) (RestPeerRecord, error) {
func (self CodexNode) CodexPeerDebug(peerId string) (RestPeerRecord, error) {
var record RestPeerRecord
bridge := newBridgeCtx()
defer bridge.free()
var cPeerId = C.CString(peerId)
defer C.free(unsafe.Pointer(cPeerId))
@ -526,21 +527,8 @@ func (self *CodexNode) CodexPeerDebug(peerId string) (RestPeerRecord, error) {
return record, err
}
func (self *CodexNode) CodexUploadInit(options *CodexUploadOptions) (string, error) {
func (self CodexNode) CodexUploadInit(options *CodexUploadOptions) (string, error) {
bridge := newBridgeCtx()
totalRead := 0
bridge.onProgress = func(bytes int) {
if bytes == -1 {
bridge.free()
} else {
totalRead += bytes
if options.onProgress != nil {
options.onProgress(bytes, totalRead, 0)
}
}
}
var cFilename = C.CString(options.filepath)
defer C.free(unsafe.Pointer(cFilename))
@ -558,9 +546,8 @@ func (self *CodexNode) CodexUploadInit(options *CodexUploadOptions) (string, err
return bridge.wait()
}
func (self *CodexNode) CodexUploadChunk(sessionId string, chunk []byte) error {
func (self CodexNode) CodexUploadChunk(sessionId string, chunk []byte) error {
bridge := newBridgeCtx()
defer bridge.free()
var cSessionId = C.CString(sessionId)
defer C.free(unsafe.Pointer(cSessionId))
@ -578,9 +565,8 @@ func (self *CodexNode) CodexUploadChunk(sessionId string, chunk []byte) error {
return err
}
func (self *CodexNode) CodexUploadFinalize(sessionId string) (string, error) {
func (self CodexNode) CodexUploadFinalize(sessionId string) (string, error) {
bridge := newBridgeCtx()
defer bridge.free()
var cSessionId = C.CString(sessionId)
defer C.free(unsafe.Pointer(cSessionId))
@ -592,9 +578,8 @@ func (self *CodexNode) CodexUploadFinalize(sessionId string) (string, error) {
return bridge.wait()
}
func (self *CodexNode) CodexUploadCancel(sessionId string) error {
func (self CodexNode) CodexUploadCancel(sessionId string) error {
bridge := newBridgeCtx()
defer bridge.free()
var cSessionId = C.CString(sessionId)
defer C.free(unsafe.Pointer(cSessionId))
@ -607,28 +592,46 @@ func (self *CodexNode) CodexUploadCancel(sessionId string) error {
return err
}
func (self *CodexNode) CodexUploadReader(options CodexUploadOptions, r io.Reader) (string, error) {
func (self CodexNode) CodexUploadReader(options CodexUploadOptions, r io.Reader) (string, error) {
sessionId, err := self.CodexUploadInit(&options)
if err != nil {
return "", err
}
if options.onProgress != nil {
size := getReaderSize(r)
total := 0
if size > 0 {
fn := options.onProgress
onProgress := func(read int) {
if read == 0 {
return
}
total += read
options.onProgress = func(read, total int, _ float64) {
percent := float64(total) / float64(size) * 100.0
// The last block could be a bit over the size due to padding
// on the chunk size.
if percent > 100.0 {
percent = 100.0
}
fn(read, total, percent)
options.onProgress(read, int(size), percent)
}
if err := self.CodexUploadSubscribe(sessionId, onProgress); err != nil {
if err := self.CodexUploadCancel(sessionId); err != nil {
log.Println("Error cancelling upload after subscribe failure:", err)
}
return "", err
}
}
}
sessionId, err := self.CodexUploadInit(&options)
if err != nil {
return "", err
if options.chunkSize == 0 {
options.chunkSize = defaultBlockSize
}
buf := make([]byte, options.chunkSize)
@ -649,19 +652,25 @@ func (self *CodexNode) CodexUploadReader(options CodexUploadOptions, r io.Reader
return "", err
}
if n == 0 {
break
}
}
return self.CodexUploadFinalize(sessionId)
}
func (self *CodexNode) CodexUploadReaderAsync(options CodexUploadOptions, r io.Reader, onDone func(cid string, err error)) {
func (self CodexNode) CodexUploadReaderAsync(options CodexUploadOptions, r io.Reader, onDone func(cid string, err error)) {
go func() {
cid, err := self.CodexUploadReader(options, r)
onDone(cid, err)
}()
}
func (self *CodexNode) CodexUploadFile(options CodexUploadOptions) (string, error) {
func (self CodexNode) CodexUploadFile(options CodexUploadOptions) (string, error) {
bridge := newBridgeCtx()
if options.onProgress != nil {
stat, err := os.Stat(options.filepath)
if err != nil {
@ -669,24 +678,28 @@ func (self *CodexNode) CodexUploadFile(options CodexUploadOptions) (string, erro
}
size := stat.Size()
if size > 0 {
fn := options.onProgress
total := 0
if size > 0 {
bridge.onProgress = func(read int) {
if read == 0 {
return
}
total += read
options.onProgress = func(read, total int, _ float64) {
percent := float64(total) / float64(size) * 100.0
// The last block could be a bit over the size due to padding
// on the chunk size.
if percent > 100.0 {
percent = 100.0
}
fn(read, total, percent)
options.onProgress(read, int(size), percent)
}
}
}
bridge := newBridgeCtx()
defer bridge.free()
var cFilePath = C.CString(options.filepath)
defer C.free(unsafe.Pointer(cFilePath))
@ -702,18 +715,40 @@ func (self *CodexNode) CodexUploadFile(options CodexUploadOptions) (string, erro
return "", bridge.CallError("cGoCodexUploadFile")
}
cid, err := bridge.wait()
return cid, err
return bridge.wait()
}
func (self *CodexNode) CodexUploadFileAsync(options CodexUploadOptions, onDone func(cid string, err error)) {
func (self CodexNode) CodexUploadFileAsync(options CodexUploadOptions, onDone func(cid string, err error)) {
go func() {
cid, err := self.CodexUploadFile(options)
onDone(cid, err)
}()
}
func (self *CodexNode) CodexStart() error {
func (self CodexNode) CodexUploadSubscribe(sessionId string, onProgress func(read int)) error {
bridge := newBridgeCtx()
bridge.onProgress = onProgress
var cSessionId = C.CString(sessionId)
defer C.free(unsafe.Pointer(cSessionId))
log.Println("Subscribing to upload progress...")
if C.cGoCodexUploadSubscribe(self.ctx, cSessionId, bridge.resp) != C.RET_OK {
return bridge.CallError("cGoCodexUploadSubscribe")
}
go func() {
if _, err := bridge.wait(); err != nil {
log.Println("Error in CodexUploadSubscribe:", err)
}
}()
return nil
}
func (self CodexNode) CodexStart() error {
bridge := newBridgeCtx()
defer bridge.free()
@ -725,14 +760,14 @@ func (self *CodexNode) CodexStart() error {
return err
}
func (self *CodexNode) CodexStartAsync(onDone func(error)) {
func (self CodexNode) CodexStartAsync(onDone func(error)) {
go func() {
err := self.CodexStart()
onDone(err)
}()
}
func (self *CodexNode) CodexStop() error {
func (self CodexNode) CodexStop() error {
bridge := newBridgeCtx()
defer bridge.free()
@ -744,7 +779,7 @@ func (self *CodexNode) CodexStop() error {
return err
}
func (self *CodexNode) CodexDestroy() error {
func (self CodexNode) CodexDestroy() error {
bridge := newBridgeCtx()
defer bridge.free()
@ -764,11 +799,11 @@ func globalEventCallback(callerRet C.int, msg *C.char, len C.size_t, userData un
self.MyEventCallback(callerRet, msg, len)
}
func (self *CodexNode) MyEventCallback(callerRet C.int, msg *C.char, len C.size_t) {
func (self CodexNode) MyEventCallback(callerRet C.int, msg *C.char, len C.size_t) {
log.Println("Event received:", C.GoStringN(msg, C.int(len)))
}
func (self *CodexNode) CodexSetEventCallback() {
func (self CodexNode) CodexSetEventCallback() {
// Notice that the events for self node are handled by the 'MyEventCallback' method
C.cGoCodexSetEventCallback(self.ctx)
}
@ -853,7 +888,6 @@ func main() {
if err != nil {
log.Fatal("Error happened:", err.Error())
}
log.Println("Codex Upload Init sessionId:", sessionId)
err = node.CodexUploadChunk(sessionId, []byte("Hello "))
@ -874,7 +908,9 @@ func main() {
log.Println("Codex Upload Finalized, cid:", cid)
buf := bytes.NewBuffer([]byte("Hello World!"))
cid, err = node.CodexUploadReader(CodexUploadOptions{filepath: "hello.txt"}, buf)
cid, err = node.CodexUploadReader(CodexUploadOptions{filepath: "hello.txt", onProgress: func(read, total int, percent float64) {
log.Printf("Uploaded %d bytes, total %d bytes (%.2f%%)\n", read, total, percent)
}}, buf)
if err != nil {
log.Fatal("Error happened:", err.Error())
}
@ -888,10 +924,12 @@ func main() {
// Choose a big file to see the progress logs
filepath := path.Join(current, "examples", "golang", "hello.txt")
// filepath := path.Join(current, "examples", "golang", "discord-0.0.109.deb")
//filepath := path.Join(current, "examples", "golang", "discord-0.0.109.deb")
options := CodexUploadOptions{filepath: filepath, onProgress: func(read, total int, percent float64) {
log.Printf("Uploaded %d bytes, total %d bytes (%.2f%%)\n", read, total, percent)
}}
cid, err = node.CodexUploadFile(options)
if err != nil {

View File

@ -18,6 +18,9 @@ import ./codex_thread_requests/[codex_thread_request]
from ../codex/codex import CodexServer
logScope:
topics = "codexlib"
type CodexContext* = object
thread: Thread[(ptr CodexContext)]

View File

@ -16,6 +16,9 @@ import ../../../codex/node
from ../../../codex/codex import CodexServer, node
logScope:
topics = "codexlib codexlibdebug"
type NodeDebugMsgType* = enum
DEBUG
PEER

View File

@ -11,6 +11,9 @@ import ../../../codex/node
from ../../../codex/codex import CodexServer, config, node
logScope:
topics = "codexlib codexlibinfo"
type NodeInfoMsgType* = enum
REPO
SPR

View File

@ -24,6 +24,9 @@ import ../../../codex/units
from ../../../codex/codex import CodexServer, new, start, stop
logScope:
topics = "codexlib codexliblifecycle"
type NodeLifecycleMsgType* = enum
CREATE_NODE
START_NODE

View File

@ -12,6 +12,9 @@ import ../../../codex/node
from ../../../codex/codex import CodexServer, node
logScope:
topics = "codexlib codexlibp2p"
type NodeP2PMsgType* = enum
CONNECT

View File

@ -10,6 +10,7 @@
## - CHUNK: sends a chunk of data to the upload session.
## - FINALIZE: finalizes the upload and returns the CID of the uploaded file.
## - CANCEL: cancels the upload session.
## - SUBSCRIBE: subscribes to progress updates for the upload session.
##
## 2. Directly from a file path: the filepath has to be absolute.
## - INIT: creates a new upload session and returns its ID
@ -32,12 +33,16 @@ from ../../../codex/codex import CodexServer, node
from ../../../codex/node import store
from libp2p import Cid
logScope:
topics = "codexlib codexlibupload"
type NodeUploadMsgType* = enum
INIT
CHUNK
FINALIZE
CANCEL
FILE
SUBSCRIBE
type OnProgressHandler =
proc(bytes: int): Future[void] {.gcsafe, async: (raises: [CancelledError]).}
@ -57,6 +62,8 @@ type
stream: BufferStream
fut: Future[?!Cid]
filepath: string
chunkSize: int
onProgress: OnProgressHandler
var uploadSessions {.threadvar.}: Table[UploadSessionId, UploadSession]
var nexUploadSessionCount {.threadvar.}: UploadSessionCount
@ -85,10 +92,7 @@ proc destroyShared(self: ptr NodeUploadRequest) =
deallocShared(self)
proc init(
codex: ptr CodexServer,
filepath: cstring = "",
chunkSize: csize_t = 0,
onProgress: OnProgressHandler,
codex: ptr CodexServer, filepath: cstring = "", chunkSize: csize_t = 0
): Future[Result[string, string]] {.async: (raises: []).} =
## Init a new session upload and return its ID.
## The session contains the future corresponding to the
@ -100,18 +104,19 @@ proc init(
##
## The chunkSize matches by default the block size used to store the file.
##
## An onProgress handler can be provided to get upload progress.
## The handler is called with the size of the block stored in the node
## when a new block is put in the node.
## After the `node.store` future is completed, whether successfully or not,
## the onProgress handler is called with -1 to signal the end of the upload.
## This allows to clean up the cGo states.
## When a session contains an onProgress handler, it is called
## with the number of bytes received each time a block is stored thanks to
## `onBlockStore` callback.
## After the `node.store` future is done, the onProgress handler
## is called one last time with 0 bytes to signal the end of the upload.
var filenameOpt, mimetypeOpt = string.none
if isAbsolute($filepath):
if not fileExists($filepath):
return err("File does not exist")
return err(
"Failed to create an upload session, the filepath does not exist: " & $filepath
)
if filepath != "":
let (_, name, ext) = splitFile($filepath)
@ -139,20 +144,32 @@ proc init(
let onBlockStore = proc(
chunk: seq[byte]
): Future[void] {.gcsafe, async: (raises: [CancelledError]).} =
discard onProgress(chunk.len)
try:
let session = uploadSessions[$sessionId]
if session.onProgress != nil:
await session.onProgress(chunk.len)
except KeyError:
error "Failed to push progress update, session is not found: ",
sessionId = $sessionId
let blockSize =
if chunkSize.NBytes > 0.NBytes: chunkSize.NBytes else: DefaultBlockSize
let fut = node.store(lpStream, filenameOpt, mimetypeOpt, blockSize, onBlockStore)
proc cb(_: pointer) {.raises: [].} =
# Signal end of upload
discard onProgress(-1)
try:
let session = uploadSessions[$sessionId]
if session.onProgress != nil:
discard session.onProgress(0)
except KeyError:
error "Failed to push the progress final state, session is not found.",
sessionId = $sessionId
fut.addCallback(cb)
uploadSessions[sessionId] =
UploadSession(stream: stream, fut: fut, filepath: $filepath)
uploadSessions[sessionId] = UploadSession(
stream: stream, fut: fut, filepath: $filepath, chunkSize: blockSize.int
)
return ok(sessionId)
@ -163,17 +180,17 @@ proc chunk(
## The chunk is pushed to the BufferStream of the session.
if not uploadSessions.contains($sessionId):
return err("Invalid session ID")
return err("Failed to upload the chunk, the session is not found: " & $sessionId)
try:
let session = uploadSessions[$sessionId]
await session.stream.pushData(chunk)
except KeyError as e:
return err("Invalid session ID")
return err("Failed to upload the chunk, the session is not found: " & $sessionId)
except LPError as e:
return err("Stream error: " & $e.msg)
return err("Failed to upload the chunk, stream error: " & $e.msg)
except CancelledError as e:
return err("Operation cancelled")
return err("Failed to upload the chunk, operation cancelled.")
return ok("")
@ -189,7 +206,8 @@ proc finalize(
## case of errors).
if not uploadSessions.contains($sessionId):
return err("Invalid session ID")
return
err("Failed to finalize the upload session, session not found: " & $sessionId)
var session: UploadSession
try:
@ -198,17 +216,18 @@ proc finalize(
let res = await session.fut
if res.isErr:
return err("Upload failed: " & res.error().msg)
return err("Failed to finalize the upload session: " & res.error().msg)
return ok($res.get())
except KeyError as e:
return err("Invalid session ID")
return
err("Failed to finalize the upload session, invalid session ID: " & $sessionId)
except LPStreamError as e:
return err("Stream error: " & $e.msg)
return err("Failed to finalize the upload session, stream error: " & $e.msg)
except CancelledError as e:
return err("Operation cancelled")
return err("Failed to finalize the upload session, operation cancelled")
except CatchableError as e:
return err("Upload failed: " & $e.msg)
return err("Failed to finalize the upload session: " & $e.msg)
finally:
if uploadSessions.contains($sessionId):
uploadSessions.del($sessionId)
@ -224,20 +243,20 @@ proc cancel(
## from the table.
if not uploadSessions.contains($sessionId):
return err("Invalid session ID")
return err("Failed to cancel the upload session, session not found: " & $sessionId)
try:
let session = uploadSessions[$sessionId]
session.fut.cancelSoon()
except KeyError as e:
return err("Invalid session ID")
return err("Failed to cancel the upload session, invalid session ID: " & $sessionId)
uploadSessions.del($sessionId)
return ok("")
proc streamFile(
filepath: string, stream: BufferStream
filepath: string, stream: BufferStream, chunkSize: int
): Future[Result[void, string]] {.async: (raises: [CancelledError]).} =
## Streams a file from the given filepath using faststream.
## fsMultiSync cannot be used with chronos because of this warning:
@ -252,43 +271,54 @@ proc streamFile(
let inputStreamHandle = filePath.fileInput()
let inputStream = inputStreamHandle.implicitDeref
var buf = newSeq[byte](chunkSize)
while inputStream.readable:
let byt = inputStream.read
await stream.pushData(@[byt])
let read = inputStream.readIntoEx(buf)
if read == 0:
break
await stream.pushData(buf[0 ..< read])
# let byt = inputStream.read
# await stream.pushData(@[byt])
return ok()
except IOError, OSError, LPStreamError:
let e = getCurrentException()
return err("Stream error: " & $e.msg)
return err("Failed to stream the file: " & $e.msg)
proc file(
codex: ptr CodexServer, sessionId: cstring
codex: ptr CodexServer, sessionId: cstring, onProgress: OnProgressHandler
): Future[Result[string, string]] {.raises: [], async: (raises: []).} =
## Starts the file upload for the session identified by sessionId.
## Will call finalize when done and return the CID of the uploaded file.
## In the finally block, the cleanup section removes the session
## from the table and cancels the future if it is not complete (in
## case of errors).
##
## If `onProgress` is provided, it is called with the number of bytes
## received each time a block is stored thanks to `onBlockStore` callback.
if not uploadSessions.contains($sessionId):
return err("Invalid session ID")
return err("Failed to upload the file, invalid session ID: " & $sessionId)
var session: UploadSession
try:
session = uploadSessions[$sessionId]
let res = await streamFile(session.filepath, session.stream)
if onProgress != nil:
uploadSessions[$sessionId].onProgress = onProgress
let res = await streamFile(session.filepath, session.stream, session.chunkSize)
if res.isErr:
return err("Failed to stream file: " & res.error)
return err("Failed to upload the file: " & res.error)
return await codex.finalize(sessionId)
except KeyError as e:
return err("Invalid session ID")
return err("Failed to upload the file, the session is not found: " & $sessionId)
except LPStreamError, IOError:
let e = getCurrentException()
return err("Stream error: " & $e.msg)
return err("Failed to upload the file: " & $e.msg)
except CancelledError as e:
return err("Operation cancelled")
return err("Failed to upload the file, the operation is cancelled.")
except CatchableError as e:
return err("Upload failed: " & $e.msg)
return err("Failed to upload the file: " & $e.msg)
finally:
if uploadSessions.contains($sessionId):
uploadSessions.del($sessionId)
@ -296,6 +326,46 @@ proc file(
if session.fut != nil and not session.fut.finished():
session.fut.cancelSoon()
proc subscribe(
codex: ptr CodexServer, sessionId: cstring, onProgress: OnProgressHandler
): Future[Result[string, string]] {.raises: [], async: (raises: []).} =
## Subscribes to progress updates for the upload session identified by sessionId.
## The onProgress handler is called with the number of bytes received
## each time a block is stored thanks to `onBlockStore` callback.
if not uploadSessions.contains($sessionId):
return err(
"Failed to subscribe to the upload session, invalid session ID: " & $sessionId
)
let fut = newFuture[void]()
proc onBlockReceived(bytes: int): Future[void] {.async: (raises: [CancelledError]).} =
try:
let session = uploadSessions[$sessionId]
await onProgress(bytes)
if bytes == 0:
fut.complete()
except KeyError:
fut.cancelSoon()
error "Failed to push progress update, session is not found: ",
sessionId = $sessionId
try:
uploadSessions[$sessionId].onProgress = onBlockReceived
except KeyError:
return err(
"Failed to subscribe to the upload session, session is not found: " & $sessionId
)
try:
await fut
except CatchableError as e:
return err("Failed to subscribe to the upload session: " & $e.msg)
return ok("")
proc process*(
self: ptr NodeUploadRequest, codex: ptr CodexServer
): Future[Result[string, string]] {.async: (raises: []).} =
@ -304,7 +374,7 @@ proc process*(
case self.operation
of NodeUploadMsgType.INIT:
let res = (await init(codex, self.filepath, self.chunkSize, self.onProgress))
let res = (await init(codex, self.filepath, self.chunkSize))
if res.isErr:
error "INIT failed", error = res.error
return err($res.error)
@ -328,10 +398,16 @@ proc process*(
return err($res.error)
return res
of NodeUploadMsgType.FILE:
let res = (await file(codex, self.sessionId))
let res = (await file(codex, self.sessionId, self.onProgress))
if res.isErr:
error "FILE failed", error = res.error
return err($res.error)
return res
of NodeUploadMsgType.SUBSCRIBE:
let res = (await subscribe(codex, self.sessionId, self.onProgress))
if res.isErr:
error "SUBSCRIBE failed", error = res.error
return err($res.error)
return res
return ok("")

View File

@ -115,6 +115,12 @@ int codex_upload_file(
CodexCallback callback,
void* userData);
int codex_upload_subscribe(
void* ctx,
const char* sessionId,
CodexCallback callback,
void* userData);
int codex_start(void* ctx,
CodexCallback callback,
void* userData);

View File

@ -38,6 +38,9 @@ import ./ffi_types
from ../codex/conf import codexVersion, updateLogLevel
logScope:
topics = "codexlib"
template checkLibcodexParams*(
ctx: ptr CodexContext, callback: CodexCallback, userData: pointer
) =
@ -76,10 +79,6 @@ proc initializeLibrary() {.exported.} =
locals = addr(locals)
nimGC_setStackBottom(locals)
template init(ctx, callback, userData) =
initializeLibrary()
checkLibcodexParams(ctx, callback, userData)
proc codex_new(
configJson: cstring, callback: CodexCallback, userData: pointer
): pointer {.dynlib, exported.} =
@ -111,7 +110,8 @@ proc codex_new(
proc codex_version(
ctx: ptr CodexContext, callback: CodexCallback, userData: pointer
): cint {.dynlib, exportc.} =
init(ctx, callback, userData)
initializeLibrary()
checkLibcodexParams(ctx, callback, userData)
callback(
RET_OK,
@ -125,7 +125,8 @@ proc codex_version(
proc codex_revision(
ctx: ptr CodexContext, callback: CodexCallback, userData: pointer
): cint {.dynlib, exportc.} =
init(ctx, callback, userData)
initializeLibrary()
checkLibcodexParams(ctx, callback, userData)
callback(
RET_OK,
@ -139,7 +140,8 @@ proc codex_revision(
proc codex_repo(
ctx: ptr CodexContext, callback: CodexCallback, userData: pointer
): cint {.dynlib, exportc.} =
init(ctx, callback, userData)
initializeLibrary()
checkLibcodexParams(ctx, callback, userData)
let reqContent = NodeInfoRequest.createShared(NodeInfoMsgType.REPO)
let res = codex_context.sendRequestToCodexThread(
@ -151,7 +153,8 @@ proc codex_repo(
proc codex_debug(
ctx: ptr CodexContext, callback: CodexCallback, userData: pointer
): cint {.dynlib, exportc.} =
init(ctx, callback, userData)
initializeLibrary()
checkLibcodexParams(ctx, callback, userData)
let reqContent = NodeDebugRequest.createShared(NodeDebugMsgType.DEBUG)
let res = codex_context.sendRequestToCodexThread(
@ -163,7 +166,8 @@ proc codex_debug(
proc codex_spr(
ctx: ptr CodexContext, callback: CodexCallback, userData: pointer
): cint {.dynlib, exportc.} =
init(ctx, callback, userData)
initializeLibrary()
checkLibcodexParams(ctx, callback, userData)
let reqContent = NodeInfoRequest.createShared(NodeInfoMsgType.SPR)
let res = codex_context.sendRequestToCodexThread(
@ -175,7 +179,8 @@ proc codex_spr(
proc codex_peer_id(
ctx: ptr CodexContext, callback: CodexCallback, userData: pointer
): cint {.dynlib, exportc.} =
init(ctx, callback, userData)
initializeLibrary()
checkLibcodexParams(ctx, callback, userData)
let reqContent = NodeInfoRequest.createShared(NodeInfoMsgType.PEERID)
let res = codex_context.sendRequestToCodexThread(
@ -190,7 +195,8 @@ proc codex_peer_id(
proc codex_log_level(
ctx: ptr CodexContext, logLevel: cstring, callback: CodexCallback, userData: pointer
): cint {.dynlib, exportc.} =
init(ctx, callback, userData)
initializeLibrary()
checkLibcodexParams(ctx, callback, userData)
try:
updateLogLevel($logLevel)
@ -207,7 +213,8 @@ proc codex_connect(
callback: CodexCallback,
userData: pointer,
): cint {.dynlib, exportc.} =
init(ctx, callback, userData)
initializeLibrary()
checkLibcodexParams(ctx, callback, userData)
var peerAddresses = newSeq[cstring](peerAddressesLength)
let peers = cast[ptr UncheckedArray[cstring]](peerAddressesPtr)
@ -226,7 +233,8 @@ proc codex_connect(
proc codex_peer_debug(
ctx: ptr CodexContext, peerId: cstring, callback: CodexCallback, userData: pointer
): cint {.dynlib, exportc.} =
init(ctx, callback, userData)
initializeLibrary()
checkLibcodexParams(ctx, callback, userData)
let reqContent = NodeDebugRequest.createShared(NodeDebugMsgType.PEER, peerId = peerId)
let res = codex_context.sendRequestToCodexThread(
@ -238,7 +246,8 @@ proc codex_peer_debug(
proc codex_destroy(
ctx: ptr CodexContext, callback: CodexCallback, userData: pointer
): cint {.dynlib, exportc.} =
init(ctx, callback, userData)
initializeLibrary()
checkLibcodexParams(ctx, callback, userData)
let res = codex_context.destroyCodexContext(ctx)
if res.isErr:
@ -253,19 +262,13 @@ proc codex_upload_init(
callback: CodexCallback,
userData: pointer,
): cint {.dynlib, exportc.} =
init(ctx, callback, userData)
let onProgress = proc(
bytes: int
): Future[void] {.gcsafe, async: (raises: [CancelledError]).} =
callback(RET_PROGRESS, nil, bytes.csize_t, userData)
initializeLibrary()
checkLibcodexParams(ctx, callback, userData)
let reqContent = NodeUploadRequest.createShared(
NodeUploadMsgType.INIT,
filepath = filepath,
chunkSize = chunkSize,
onProgress = onProgress,
NodeUploadMsgType.INIT, filepath = filepath, chunkSize = chunkSize
)
let res = codex_context.sendRequestToCodexThread(
ctx, RequestType.UPLOAD, reqContent, callback, userData
)
@ -280,7 +283,8 @@ proc codex_upload_chunk(
callback: CodexCallback,
userData: pointer,
): cint {.dynlib, exportc.} =
init(ctx, callback, userData)
initializeLibrary()
checkLibcodexParams(ctx, callback, userData)
let chunk = newSeq[byte](len)
copyMem(addr chunk[0], data, len)
@ -300,7 +304,8 @@ proc codex_upload_finalize(
callback: CodexCallback,
userData: pointer,
): cint {.dynlib, exportc.} =
init(ctx, callback, userData)
initializeLibrary()
checkLibcodexParams(ctx, callback, userData)
let reqContent =
NodeUploadRequest.createShared(NodeUploadMsgType.FINALIZE, sessionId = sessionId)
@ -316,7 +321,8 @@ proc codex_upload_cancel(
callback: CodexCallback,
userData: pointer,
): cint {.dynlib, exportc.} =
init(ctx, callback, userData)
initializeLibrary()
checkLibcodexParams(ctx, callback, userData)
let reqContent =
NodeUploadRequest.createShared(NodeUploadMsgType.CANCEL, sessionId = sessionId)
@ -333,10 +339,43 @@ proc codex_upload_file(
callback: CodexCallback,
userData: pointer,
): cint {.dynlib, exportc.} =
init(ctx, callback, userData)
initializeLibrary()
checkLibcodexParams(ctx, callback, userData)
let reqContent =
NodeUploadRequest.createShared(NodeUploadMsgType.FILE, sessionId = sessionId)
let onProgress = proc(
bytes: int
): Future[void] {.async: (raises: [CancelledError]).} =
if userData != nil:
callback(RET_PROGRESS, nil, cast[csize_t](bytes), userData)
let reqContent = NodeUploadRequest.createShared(
NodeUploadMsgType.FILE, sessionId = sessionId, onProgress = onProgress
)
let res = codex_context.sendRequestToCodexThread(
ctx, RequestType.UPLOAD, reqContent, callback, userData
)
return callback.okOrError(res, userData)
proc codex_upload_subscribe(
ctx: ptr CodexContext,
sessionId: cstring,
callback: CodexCallback,
userData: pointer,
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibcodexParams(ctx, callback, userData)
let onProgress = proc(
bytes: int
): Future[void] {.async: (raises: [CancelledError]).} =
if userData != nil:
callback(RET_PROGRESS, nil, cast[csize_t](bytes), userData)
let reqContent = NodeUploadRequest.createShared(
NodeUploadMsgType.SUBSCRIBE, sessionId = sessionId, onProgress = onProgress
)
let res = codex_context.sendRequestToCodexThread(
ctx, RequestType.UPLOAD, reqContent, callback, userData
@ -347,7 +386,8 @@ proc codex_upload_file(
proc codex_start(
ctx: ptr CodexContext, callback: CodexCallback, userData: pointer
): cint {.dynlib, exportc.} =
init(ctx, callback, userData)
initializeLibrary()
checkLibcodexParams(ctx, callback, userData)
let reqContent: ptr NodeLifecycleRequest =
NodeLifecycleRequest.createShared(NodeLifecycleMsgType.START_NODE)
@ -360,7 +400,8 @@ proc codex_start(
proc codex_stop(
ctx: ptr CodexContext, callback: CodexCallback, userData: pointer
): cint {.dynlib, exportc.} =
init(ctx, callback, userData)
initializeLibrary()
checkLibcodexParams(ctx, callback, userData)
let reqContent: ptr NodeLifecycleRequest =
NodeLifecycleRequest.createShared(NodeLifecycleMsgType.STOP_NODE)