From ea06e3df3b5f5828cde020ffa74191817682bc9a Mon Sep 17 00:00:00 2001 From: Arnaud Date: Mon, 22 Sep 2025 15:33:15 +0200 Subject: [PATCH] Add progress callback for upload --- codex/node.nim | 6 + examples/golang/codex.go | 181 ++++++++++++------ .../requests/node_upload_request.nim | 54 ++++-- library/ffi_types.nim | 1 + library/libcodex.h | 3 +- library/libcodex.nim | 25 ++- 6 files changed, 187 insertions(+), 83 deletions(-) diff --git a/codex/node.nim b/codex/node.nim index 1ca471d5..44c4de5c 100644 --- a/codex/node.nim +++ b/codex/node.nim @@ -81,6 +81,8 @@ type BatchProc* = proc(blocks: seq[bt.Block]): Future[?!void] {. gcsafe, async: (raises: [CancelledError]) .} + OnBlockStoreProc = + proc(chunk: seq[byte]): Future[void] {.gcsafe, async: (raises: [CancelledError]).} func switch*(self: CodexNodeRef): Switch = return self.switch @@ -403,6 +405,7 @@ proc store*( filename: ?string = string.none, mimetype: ?string = string.none, blockSize = DefaultBlockSize, + onBlockStoreProc: OnBlockStoreProc = nil, ): Future[?!Cid] {.async.} = ## Save stream contents as dataset with given blockSize ## to nodes's BlockStore, and return Cid of its manifest @@ -432,6 +435,9 @@ proc store*( if err =? (await self.networkStore.putBlock(blk)).errorOption: error "Unable to store block", cid = blk.cid, err = err.msg return failure(&"Unable to store block {blk.cid}") + + if not onBlockStoreProc.isNil: + discard onBlockStoreProc(chunk) except CancelledError as exc: raise exc except CatchableError as exc: diff --git a/examples/golang/codex.go b/examples/golang/codex.go index cb822964..30c65eb3 100644 --- a/examples/golang/codex.go +++ b/examples/golang/codex.go @@ -104,8 +104,8 @@ package main return codex_peer_debug(codexCtx, peerId, (CodexCallback) callback, resp); } - static int cGoCodexUploadInit(void* codexCtx, char* filepath, void* resp) { - return codex_upload_init(codexCtx, filepath, (CodexCallback) callback, resp); + static int cGoCodexUploadInit(void* codexCtx, char* filepath, size_t chunkSize, void* resp) { + return codex_upload_init(codexCtx, filepath, chunkSize, (CodexCallback) callback, resp); } static int cGoCodexUploadChunk(void* codexCtx, char* sessionId, const uint8_t* chunk, size_t len, void* resp) { @@ -120,8 +120,8 @@ package main return codex_upload_cancel(codexCtx, sessionId, (CodexCallback) callback, resp); } - static int cGoCodexUploadFile(void* codexCtx, char* sessionId, size_t chunkSize, void* resp) { - return codex_upload_file(codexCtx, sessionId, chunkSize, (CodexCallback) callback, resp); + static int cGoCodexUploadFile(void* codexCtx, char* sessionId, void* resp) { + return codex_upload_file(codexCtx, sessionId, (CodexCallback) callback, resp); } static int cGoCodexStart(void* codexCtx, void* resp) { @@ -253,12 +253,23 @@ type CodexNode struct { ctx unsafe.Pointer } +const defaultBlockSize = 1024 * 64 + +type CodexUploadOptions struct { + filepath string + chunkSize int + onProgress func(read, total int, percent float64) +} + type bridgeCtx struct { wg *sync.WaitGroup h cgo.Handle resp unsafe.Pointer result string err error + + // Callback used for upload and download + onProgress func(read int) } func newBridgeCtx() *bridgeCtx { @@ -273,18 +284,13 @@ func newBridgeCtx() *bridgeCtx { } func (b *bridgeCtx) free() { + b.h.Delete() + b.h = 0 + C.freeResp(b.resp) b.resp = nil } -func (b *bridgeCtx) isOK() bool { - return C.getRet(b.resp) == C.RET_OK -} - -func (b *bridgeCtx) isError() bool { - return C.getRet(b.resp) == C.RET_ERR -} - func (b *bridgeCtx) CallError(name string) error { return fmt.Errorf("Failed the call to %s. Returned code: %d.", name, C.getRet(b.resp)) } @@ -295,6 +301,21 @@ func (b *bridgeCtx) wait() (string, error) { return b.result, b.err } +func getReaderSize(r io.Reader) int64 { + switch v := r.(type) { + case *os.File: + stat, err := v.Stat() + if err != nil { + return 0 + } + return stat.Size() + case *bytes.Buffer: + return int64(v.Len()) + default: + return 0 + } +} + //export callback func callback(ret C.int, msg *C.char, len C.size_t, resp unsafe.Pointer) { if resp == nil { @@ -306,6 +327,16 @@ func callback(ret C.int, msg *C.char, len C.size_t, resp unsafe.Pointer) { m.msg = msg m.len = len + if ret == C.RET_PROGRESS { + if m.h != 0 { + h := cgo.Handle(m.h) + if v, ok := h.Value().(*bridgeCtx); ok && v.onProgress != nil { + v.onProgress(int(len)) + } + } + return + } + if m.h == 0 { return } @@ -320,20 +351,15 @@ func callback(ret C.int, msg *C.char, len C.size_t, resp unsafe.Pointer) { if ret == C.RET_OK || ret == C.RET_ERR { retMsg := C.GoStringN(msg, C.int(len)) - // log.Println("Callback called with ret:", ret, " msg:", retMsg, " len:", len) - if ret == C.RET_OK { v.result = retMsg + v.err = nil } else { v.err = errors.New(retMsg) } - h.Delete() - m.h = 0 - if v.wg != nil { v.wg.Done() - v = nil } } } @@ -500,14 +526,32 @@ func (self *CodexNode) CodexPeerDebug(peerId string) (RestPeerRecord, error) { return record, err } -func (self *CodexNode) CodexUploadInit(filename string) (string, error) { +func (self *CodexNode) CodexUploadInit(options *CodexUploadOptions) (string, error) { bridge := newBridgeCtx() - defer bridge.free() + totalRead := 0 - var cFilename = C.CString(filename) + bridge.onProgress = func(bytes int) { + if bytes == -1 { + bridge.free() + } else { + totalRead += bytes + + if options.onProgress != nil { + options.onProgress(bytes, totalRead, 0) + } + } + } + + var cFilename = C.CString(options.filepath) defer C.free(unsafe.Pointer(cFilename)) - if C.cGoCodexUploadInit(self.ctx, cFilename, bridge.resp) != C.RET_OK { + if options.chunkSize == 0 { + options.chunkSize = defaultBlockSize + } + + var cChunkSize = C.size_t(options.chunkSize) + + if C.cGoCodexUploadInit(self.ctx, cFilename, cChunkSize, bridge.resp) != C.RET_OK { return "", bridge.CallError("cGoCodexUploadInit") } @@ -563,13 +607,26 @@ func (self *CodexNode) CodexUploadCancel(sessionId string) error { return err } -func (self *CodexNode) CodexUploadReader(filename string, r io.Reader, chunkSize int) (string, error) { - sessionId, err := self.CodexUploadInit(filename) +func (self *CodexNode) CodexUploadReader(options CodexUploadOptions, r io.Reader) (string, error) { + if options.onProgress != nil { + size := getReaderSize(r) + + if size > 0 { + fn := options.onProgress + + options.onProgress = func(read, total int, _ float64) { + percent := float64(total) / float64(size) * 100.0 + fn(read, total, percent) + } + } + } + + sessionId, err := self.CodexUploadInit(&options) if err != nil { return "", err } - buf := make([]byte, chunkSize) + buf := make([]byte, options.chunkSize) for { n, err := r.Read(buf) if n > 0 { @@ -592,23 +649,38 @@ func (self *CodexNode) CodexUploadReader(filename string, r io.Reader, chunkSize return self.CodexUploadFinalize(sessionId) } -// TODO provide an async version of CodexUploadReader -// that starts a gorountine to upload the chunks -// and take: -// a callback to be called when done -// another callback to cancel the upload -func (self *CodexNode) CodexUploadReaderAsync(filename string, r io.Reader, chunkSize int) (string, error) { - return "", nil +func (self *CodexNode) CodexUploadReaderAsync(options CodexUploadOptions, r io.Reader, onDone func(cid string, err error)) { + go func() { + cid, err := self.CodexUploadReader(options, r) + onDone(cid, err) + }() } -func (self *CodexNode) CodexUploadFile(filepath string, chunkSize int) (string, error) { +func (self *CodexNode) CodexUploadFile(options CodexUploadOptions) (string, error) { + if options.onProgress != nil { + stat, err := os.Stat(options.filepath) + if err != nil { + return "", err + } + + size := stat.Size() + if size > 0 { + fn := options.onProgress + + options.onProgress = func(read, total int, _ float64) { + percent := float64(total) / float64(size) * 100.0 + fn(read, total, percent) + } + } + } + bridge := newBridgeCtx() defer bridge.free() - var cFilePath = C.CString(filepath) + var cFilePath = C.CString(options.filepath) defer C.free(unsafe.Pointer(cFilePath)) - sessionId, err := self.CodexUploadInit(filepath) + sessionId, err := self.CodexUploadInit(&options) if err != nil { return "", err } @@ -616,12 +688,7 @@ func (self *CodexNode) CodexUploadFile(filepath string, chunkSize int) (string, var cSessionId = C.CString(sessionId) defer C.free(unsafe.Pointer(cSessionId)) - var cChunkSize = C.size_t(0) - if chunkSize > 0 { - cChunkSize = C.size_t(chunkSize) - } - - if C.cGoCodexUploadFile(self.ctx, cSessionId, cChunkSize, bridge.resp) != C.RET_OK { + if C.cGoCodexUploadFile(self.ctx, cSessionId, bridge.resp) != C.RET_OK { return "", bridge.CallError("cGoCodexUploadFile") } @@ -634,7 +701,7 @@ func (self *CodexNode) CodexUploadFile(filepath string, chunkSize int) (string, // and take: // a callback to be called when done // another callback to cancel the upload -func (self *CodexNode) CodexUploadFileAsync(filepath string, chunkSize int) (string, error) { +func (self *CodexNode) CodexUploadFileAsync(filepath string, chunkSize int, cb func(error)) (string, error) { return "", nil } @@ -650,20 +717,11 @@ func (self *CodexNode) CodexStart() error { return err } -func (self *CodexNode) CodexStartAsync(cb func(error)) error { - bridge := newBridgeCtx() - defer bridge.free() - - if C.cGoCodexStart(self.ctx, bridge.resp) != C.RET_OK { - return bridge.CallError("cGoCodexStart") - } - +func (self *CodexNode) CodexStartAsync(onDone func(error)) { go func() { - _, err := bridge.wait() - cb(err) + err := self.CodexStart() + onDone(err) }() - - return nil } func (self *CodexNode) CodexStop() error { @@ -717,7 +775,7 @@ func main() { log.Println("Codex created.") - // node.CodexSetEventCallback() + node.CodexSetEventCallback() version, err := node.CodexVersion() if err != nil { @@ -783,7 +841,7 @@ func main() { log.Println("Codex Log Level set to TRACE") - sessionId, err := node.CodexUploadInit("hello.txt") + sessionId, err := node.CodexUploadInit(&CodexUploadOptions{filepath: "hello.txt"}) if err != nil { log.Fatal("Error happened:", err.Error()) } @@ -808,7 +866,7 @@ func main() { log.Println("Codex Upload Finalized, cid:", cid) buf := bytes.NewBuffer([]byte("Hello World!")) - cid, err = node.CodexUploadReader("hello.txt", buf, 16*1024) + cid, err = node.CodexUploadReader(CodexUploadOptions{filepath: "hello.txt"}, buf) if err != nil { log.Fatal("Error happened:", err.Error()) } @@ -820,9 +878,14 @@ func main() { log.Fatal("Error happened:", err.Error()) } + // Choose a big file to see the progress logs filepath := path.Join(current, "examples", "golang", "hello.txt") - log.Println("Uploading file:", filepath) - cid, err = node.CodexUploadFile(filepath, 1024) + // filepath := path.Join(current, "examples", "golang", "bigfile.zip") + + cid, err = node.CodexUploadFile(CodexUploadOptions{filepath: filepath, onProgress: func(read, total int, percent float64) { + log.Printf("Uploaded %d bytes, total %d bytes (%.2f%%)\n", read, total, percent) + }}) + if err != nil { log.Fatal("Error happened:", err.Error()) } diff --git a/library/codex_thread_requests/requests/node_upload_request.nim b/library/codex_thread_requests/requests/node_upload_request.nim index 747b66ac..1468b20d 100644 --- a/library/codex_thread_requests/requests/node_upload_request.nim +++ b/library/codex_thread_requests/requests/node_upload_request.nim @@ -18,12 +18,16 @@ type NodeUploadMsgType* = enum CANCEL FILE +type OnProgressHandler = + proc(bytes: int): Future[void] {.gcsafe, async: (raises: [CancelledError]).} + type NodeUploadRequest* = object operation: NodeUploadMsgType sessionId: cstring filepath: cstring chunk: seq[byte] chunkSize: csize_t + onProgress: OnProgressHandler type UploadSessionId* = string @@ -32,6 +36,7 @@ type stream: BufferStream fut: Future[?!Cid] filepath: string + chunkSize: int var uploadSessions {.threadvar.}: Table[UploadSessionId, UploadSession] var nexUploadSessionCount {.threadvar.}: UploadSessionCount @@ -42,6 +47,7 @@ proc createShared*( sessionId: cstring = "", filepath: cstring = "", chunk: seq[byte] = @[], + onProgress: OnProgressHandler = nil, chunkSize: csize_t = 0, ): ptr type T = var ret = createShared(T) @@ -50,6 +56,7 @@ proc createShared*( ret[].filepath = filepath.alloc() ret[].chunk = chunk ret[].chunkSize = chunkSize + ret[].onProgress = onProgress return ret proc destroyShared(self: ptr NodeUploadRequest) = @@ -64,7 +71,10 @@ proc destroyShared(self: ptr NodeUploadRequest) = ## or it can be the filename when the file will be uploaded via chunks. ## The mimetype is deduced from the filename extension. proc init( - codex: ptr CodexServer, filepath: cstring = "" + codex: ptr CodexServer, + filepath: cstring = "", + chunkSize: csize_t = 0, + onProgress: OnProgressHandler, ): Future[Result[string, string]] {.async: (raises: []).} = var filenameOpt, mimetypeOpt = string.none @@ -94,9 +104,25 @@ proc init( let stream = BufferStream.new() let lpStream = LPStream(stream) let node = codex[].node - let fut = node.store(lpStream, filenameOpt, mimetypeOpt) - uploadSessions[sessionId] = - UploadSession(stream: stream, fut: fut, filepath: $filepath) + + let onBlockStore = proc( + chunk: seq[byte] + ): Future[void] {.gcsafe, async: (raises: [CancelledError]).} = + discard onProgress(chunk.len) + + let blockSize = + if chunkSize.NBytes > 0.NBytes: chunkSize.NBytes else: DefaultBlockSize + let fut = node.store(lpStream, filenameOpt, mimetypeOpt, blockSize, onBlockStore) + + proc cb(_: pointer) {.raises: [].} = + # Signal end of upload + discard onProgress(-1) + + fut.addCallback(cb) + + uploadSessions[sessionId] = UploadSession( + stream: stream, fut: fut, filepath: $filepath, chunkSize: blockSize.int + ) return ok(sessionId) @@ -163,20 +189,21 @@ proc cancel( return ok("") proc file( - codex: ptr CodexServer, sessionId: cstring, chunkSize: csize_t = 1024 + codex: ptr CodexServer, sessionId: cstring ): Future[Result[string, string]] {.raises: [], async: (raises: []).} = if not uploadSessions.contains($sessionId): return err("Invalid session ID") - let size = if chunkSize > 0: chunkSize else: 1024 - var buffer = newSeq[byte](size) var session: UploadSession ## Here we certainly need to spawn a new thread to avoid blocking ## the worker thread while reading the file. try: session = uploadSessions[$sessionId] + var buffer = newSeq[byte](session.chunkSize) let fs = openFileStream(session.filepath) + defer: + fs.close() while true: let bytesRead = fs.readData(addr buffer[0], buffer.len) @@ -185,13 +212,7 @@ proc file( break await session.stream.pushData(buffer[0 ..< bytesRead]) - await session.stream.pushEof() - - let res = await session.fut - if res.isErr: - return err("Upload failed: " & res.error().msg) - - return ok($res.get()) + return await codex.finalize(sessionId) except KeyError as e: return err("Invalid session ID") except LPStreamError, IOError: @@ -202,6 +223,7 @@ proc file( except CatchableError as e: return err("Upload failed: " & $e.msg) finally: + session.fut.cancel() uploadSessions.del($sessionId) proc process*( @@ -212,7 +234,7 @@ proc process*( case self.operation of NodeUploadMsgType.INIT: - let res = (await init(codex, self.filepath)) + let res = (await init(codex, self.filepath, self.chunkSize, self.onProgress)) if res.isErr: error "INIT failed", error = res.error return err($res.error) @@ -236,7 +258,7 @@ proc process*( return err($res.error) return res of NodeUploadMsgType.FILE: - let res = (await file(codex, self.sessionId, self.chunkSize)) + let res = (await file(codex, self.sessionId)) if res.isErr: error "FILE failed", error = res.error return err($res.error) diff --git a/library/ffi_types.nim b/library/ffi_types.nim index 6923e708..1a865eaf 100644 --- a/library/ffi_types.nim +++ b/library/ffi_types.nim @@ -14,6 +14,7 @@ type CodexCallback* = proc( const RET_OK*: cint = 0 const RET_ERR*: cint = 1 const RET_MISSING_CALLBACK*: cint = 2 +const RET_PROGRESS*: cint = 3 ## Returns RET_OK as acknowledgment and call the callback ## with RET_OK code and the provided message. diff --git a/library/libcodex.h b/library/libcodex.h index d3e52a55..cd0d2fae 100644 --- a/library/libcodex.h +++ b/library/libcodex.h @@ -18,6 +18,7 @@ #define RET_OK 0 #define RET_ERR 1 #define RET_MISSING_CALLBACK 2 +#define RET_PROGRESS 3 #ifdef __cplusplus extern "C" { @@ -84,6 +85,7 @@ int codex_peer_debug( int codex_upload_init( void* ctx, const char* filepath, + size_t chunkSize, CodexCallback callback, void* userData); @@ -110,7 +112,6 @@ int codex_upload_cancel( int codex_upload_file( void* ctx, const char* sessionId, - size_t chunkSize, CodexCallback callback, void* userData); diff --git a/library/libcodex.nim b/library/libcodex.nim index b720efbf..3c3f3efe 100644 --- a/library/libcodex.nim +++ b/library/libcodex.nim @@ -247,12 +247,25 @@ proc codex_destroy( return callback.success("", userData) proc codex_upload_init( - ctx: ptr CodexContext, filepath: cstring, callback: CodexCallback, userData: pointer + ctx: ptr CodexContext, + filepath: cstring, + chunkSize: csize_t, + callback: CodexCallback, + userData: pointer, ): cint {.dynlib, exportc.} = init(ctx, callback, userData) - let reqContent = - NodeUploadRequest.createShared(NodeUploadMsgType.INIT, filepath = filepath) + let onProgress = proc( + bytes: int + ): Future[void] {.gcsafe, async: (raises: [CancelledError]).} = + callback(RET_PROGRESS, nil, bytes.csize_t, userData) + + let reqContent = NodeUploadRequest.createShared( + NodeUploadMsgType.INIT, + filepath = filepath, + chunkSize = chunkSize, + onProgress = onProgress, + ) let res = codex_context.sendRequestToCodexThread( ctx, RequestType.UPLOAD, reqContent, callback, userData ) @@ -317,15 +330,13 @@ proc codex_upload_cancel( proc codex_upload_file( ctx: ptr CodexContext, sessionId: cstring, - chunkSize: csize_t, callback: CodexCallback, userData: pointer, ): cint {.dynlib, exportc.} = init(ctx, callback, userData) - let reqContent = NodeUploadRequest.createShared( - NodeUploadMsgType.FILE, sessionId = sessionId, chunkSize = chunkSize - ) + let reqContent = + NodeUploadRequest.createShared(NodeUploadMsgType.FILE, sessionId = sessionId) let res = codex_context.sendRequestToCodexThread( ctx, RequestType.UPLOAD, reqContent, callback, userData