diff --git a/examples/golang/codex.go b/examples/golang/codex.go index 19a44357..5c04a9f1 100644 --- a/examples/golang/codex.go +++ b/examples/golang/codex.go @@ -124,6 +124,10 @@ package main return codex_upload_file(codexCtx, sessionId, (CodexCallback) callback, resp); } + static int cGoCodexUploadSubscribe(void* codexCtx, char* sessionId, void* resp) { + return codex_upload_subscribe(codexCtx, sessionId, (CodexCallback) callback, resp); + } + static int cGoCodexStart(void* codexCtx, void* resp) { return codex_start(codexCtx, (CodexCallback) callback, resp); } @@ -255,10 +259,12 @@ type CodexNode struct { const defaultBlockSize = 1024 * 64 +type OnProgressFunc func(read, total int, percent float64) + type CodexUploadOptions struct { filepath string chunkSize int - onProgress func(read, total int, percent float64) + onProgress OnProgressFunc } type bridgeCtx struct { @@ -273,22 +279,25 @@ type bridgeCtx struct { } func newBridgeCtx() *bridgeCtx { - var wg sync.WaitGroup - wg.Add(1) + bridge := &bridgeCtx{} + bridge.wg = &sync.WaitGroup{} + bridge.wg.Add(1) - bridge := &bridgeCtx{wg: &wg} bridge.h = cgo.NewHandle(bridge) bridge.resp = C.allocResp(C.uintptr_t(uintptr(bridge.h))) - return bridge } func (b *bridgeCtx) free() { - b.h.Delete() - b.h = 0 + if b.h > 0 { + b.h.Delete() + b.h = 0 + } - C.freeResp(b.resp) - b.resp = nil + if b.resp != nil { + C.freeResp(b.resp) + b.resp = nil + } } func (b *bridgeCtx) CallError(name string) error { @@ -298,7 +307,12 @@ func (b *bridgeCtx) CallError(name string) error { func (b *bridgeCtx) wait() (string, error) { b.wg.Wait() - return b.result, b.err + result := b.result + err := b.err + + b.free() + + return result, err } func getReaderSize(r io.Reader) int64 { @@ -327,16 +341,6 @@ func callback(ret C.int, msg *C.char, len C.size_t, resp unsafe.Pointer) { m.msg = msg m.len = len - if ret == C.RET_PROGRESS { - if m.h != 0 { - h := cgo.Handle(m.h) - if v, ok := h.Value().(*bridgeCtx); ok && v.onProgress != nil { - v.onProgress(int(len)) - } - } - return - } - if m.h == 0 { return } @@ -348,15 +352,22 @@ func callback(ret C.int, msg *C.char, len C.size_t, resp unsafe.Pointer) { } if v, ok := h.Value().(*bridgeCtx); ok { - if ret == C.RET_OK || ret == C.RET_ERR { - retMsg := C.GoStringN(msg, C.int(len)) - - if ret == C.RET_OK { - v.result = retMsg - v.err = nil - } else { - v.err = errors.New(retMsg) + switch ret { + case C.RET_PROGRESS: + if v.onProgress != nil { + v.onProgress(int(C.int(len))) } + case C.RET_OK: + retMsg := C.GoStringN(msg, C.int(len)) + v.result = retMsg + v.err = nil + + if v.wg != nil { + v.wg.Done() + } + case C.RET_ERR: + retMsg := C.GoStringN(msg, C.int(len)) + v.err = errors.New(retMsg) if v.wg != nil { v.wg.Done() @@ -367,7 +378,6 @@ func callback(ret C.int, msg *C.char, len C.size_t, resp unsafe.Pointer) { func CodexNew(config CodexConfig) (*CodexNode, error) { bridge := newBridgeCtx() - defer bridge.free() jsonConfig, err := json.Marshal(config) if err != nil { @@ -386,9 +396,8 @@ func CodexNew(config CodexConfig) (*CodexNode, error) { return &CodexNode{ctx: ctx}, bridge.err } -func (self *CodexNode) CodexVersion() (string, error) { +func (self CodexNode) CodexVersion() (string, error) { bridge := newBridgeCtx() - defer bridge.free() if C.cGoCodexVersion(self.ctx, bridge.resp) != C.RET_OK { return "", bridge.CallError("cGoCodexVersion") @@ -397,9 +406,8 @@ func (self *CodexNode) CodexVersion() (string, error) { return bridge.wait() } -func (self *CodexNode) CodexRevision() (string, error) { +func (self CodexNode) CodexRevision() (string, error) { bridge := newBridgeCtx() - defer bridge.free() if C.cGoCodexRevision(self.ctx, bridge.resp) != C.RET_OK { return "", bridge.CallError("cGoCodexRevision") @@ -408,9 +416,8 @@ func (self *CodexNode) CodexRevision() (string, error) { return bridge.wait() } -func (self *CodexNode) CodexRepo() (string, error) { +func (self CodexNode) CodexRepo() (string, error) { bridge := newBridgeCtx() - defer bridge.free() if C.cGoCodexRepo(self.ctx, bridge.resp) != C.RET_OK { return "", bridge.CallError("cGoCodexRepo") @@ -419,11 +426,10 @@ func (self *CodexNode) CodexRepo() (string, error) { return bridge.wait() } -func (self *CodexNode) CodexDebug() (CodexDebugInfo, error) { +func (self CodexNode) CodexDebug() (CodexDebugInfo, error) { var info CodexDebugInfo bridge := newBridgeCtx() - defer bridge.free() if C.cGoCodexDebug(self.ctx, bridge.resp) != C.RET_OK { return info, bridge.CallError("cGoCodexDebug") @@ -439,9 +445,8 @@ func (self *CodexNode) CodexDebug() (CodexDebugInfo, error) { return info, err } -func (self *CodexNode) CodexSpr() (string, error) { +func (self CodexNode) CodexSpr() (string, error) { bridge := newBridgeCtx() - defer bridge.free() if C.cGoCodexSpr(self.ctx, bridge.resp) != C.RET_OK { return "", bridge.CallError("cGoCodexSpr") @@ -450,9 +455,8 @@ func (self *CodexNode) CodexSpr() (string, error) { return bridge.wait() } -func (self *CodexNode) CodexPeerId() (string, error) { +func (self CodexNode) CodexPeerId() (string, error) { bridge := newBridgeCtx() - defer bridge.free() if C.cGoCodexPeerId(self.ctx, bridge.resp) != C.RET_OK { return "", bridge.CallError("cGoCodexPeerId") @@ -461,9 +465,8 @@ func (self *CodexNode) CodexPeerId() (string, error) { return bridge.wait() } -func (self *CodexNode) CodexLogLevel(logLevel LogLevel) error { +func (self CodexNode) CodexLogLevel(logLevel LogLevel) error { bridge := newBridgeCtx() - defer bridge.free() var cLogLevel = C.CString(fmt.Sprintf("%s", logLevel)) defer C.free(unsafe.Pointer(cLogLevel)) @@ -476,9 +479,8 @@ func (self *CodexNode) CodexLogLevel(logLevel LogLevel) error { return err } -func (self *CodexNode) CodexConnect(peerId string, peerAddresses []string) error { +func (self CodexNode) CodexConnect(peerId string, peerAddresses []string) error { bridge := newBridgeCtx() - defer bridge.free() var cPeerId = C.CString(peerId) defer C.free(unsafe.Pointer(cPeerId)) @@ -503,11 +505,10 @@ func (self *CodexNode) CodexConnect(peerId string, peerAddresses []string) error return err } -func (self *CodexNode) CodexPeerDebug(peerId string) (RestPeerRecord, error) { +func (self CodexNode) CodexPeerDebug(peerId string) (RestPeerRecord, error) { var record RestPeerRecord bridge := newBridgeCtx() - defer bridge.free() var cPeerId = C.CString(peerId) defer C.free(unsafe.Pointer(cPeerId)) @@ -526,21 +527,8 @@ func (self *CodexNode) CodexPeerDebug(peerId string) (RestPeerRecord, error) { return record, err } -func (self *CodexNode) CodexUploadInit(options *CodexUploadOptions) (string, error) { +func (self CodexNode) CodexUploadInit(options *CodexUploadOptions) (string, error) { bridge := newBridgeCtx() - totalRead := 0 - - bridge.onProgress = func(bytes int) { - if bytes == -1 { - bridge.free() - } else { - totalRead += bytes - - if options.onProgress != nil { - options.onProgress(bytes, totalRead, 0) - } - } - } var cFilename = C.CString(options.filepath) defer C.free(unsafe.Pointer(cFilename)) @@ -558,9 +546,8 @@ func (self *CodexNode) CodexUploadInit(options *CodexUploadOptions) (string, err return bridge.wait() } -func (self *CodexNode) CodexUploadChunk(sessionId string, chunk []byte) error { +func (self CodexNode) CodexUploadChunk(sessionId string, chunk []byte) error { bridge := newBridgeCtx() - defer bridge.free() var cSessionId = C.CString(sessionId) defer C.free(unsafe.Pointer(cSessionId)) @@ -578,9 +565,8 @@ func (self *CodexNode) CodexUploadChunk(sessionId string, chunk []byte) error { return err } -func (self *CodexNode) CodexUploadFinalize(sessionId string) (string, error) { +func (self CodexNode) CodexUploadFinalize(sessionId string) (string, error) { bridge := newBridgeCtx() - defer bridge.free() var cSessionId = C.CString(sessionId) defer C.free(unsafe.Pointer(cSessionId)) @@ -592,9 +578,8 @@ func (self *CodexNode) CodexUploadFinalize(sessionId string) (string, error) { return bridge.wait() } -func (self *CodexNode) CodexUploadCancel(sessionId string) error { +func (self CodexNode) CodexUploadCancel(sessionId string) error { bridge := newBridgeCtx() - defer bridge.free() var cSessionId = C.CString(sessionId) defer C.free(unsafe.Pointer(cSessionId)) @@ -607,28 +592,46 @@ func (self *CodexNode) CodexUploadCancel(sessionId string) error { return err } -func (self *CodexNode) CodexUploadReader(options CodexUploadOptions, r io.Reader) (string, error) { +func (self CodexNode) CodexUploadReader(options CodexUploadOptions, r io.Reader) (string, error) { + sessionId, err := self.CodexUploadInit(&options) + if err != nil { + return "", err + } + if options.onProgress != nil { size := getReaderSize(r) + total := 0 if size > 0 { - fn := options.onProgress + onProgress := func(read int) { + if read == 0 { + return + } + + total += read - options.onProgress = func(read, total int, _ float64) { percent := float64(total) / float64(size) * 100.0 // The last block could be a bit over the size due to padding // on the chunk size. if percent > 100.0 { percent = 100.0 } - fn(read, total, percent) + + options.onProgress(read, int(size), percent) + } + + if err := self.CodexUploadSubscribe(sessionId, onProgress); err != nil { + if err := self.CodexUploadCancel(sessionId); err != nil { + log.Println("Error cancelling upload after subscribe failure:", err) + } + + return "", err } } } - sessionId, err := self.CodexUploadInit(&options) - if err != nil { - return "", err + if options.chunkSize == 0 { + options.chunkSize = defaultBlockSize } buf := make([]byte, options.chunkSize) @@ -649,19 +652,25 @@ func (self *CodexNode) CodexUploadReader(options CodexUploadOptions, r io.Reader return "", err } + + if n == 0 { + break + } } return self.CodexUploadFinalize(sessionId) } -func (self *CodexNode) CodexUploadReaderAsync(options CodexUploadOptions, r io.Reader, onDone func(cid string, err error)) { +func (self CodexNode) CodexUploadReaderAsync(options CodexUploadOptions, r io.Reader, onDone func(cid string, err error)) { go func() { cid, err := self.CodexUploadReader(options, r) onDone(cid, err) }() } -func (self *CodexNode) CodexUploadFile(options CodexUploadOptions) (string, error) { +func (self CodexNode) CodexUploadFile(options CodexUploadOptions) (string, error) { + bridge := newBridgeCtx() + if options.onProgress != nil { stat, err := os.Stat(options.filepath) if err != nil { @@ -669,24 +678,28 @@ func (self *CodexNode) CodexUploadFile(options CodexUploadOptions) (string, erro } size := stat.Size() - if size > 0 { - fn := options.onProgress + total := 0 + + if size > 0 { + bridge.onProgress = func(read int) { + if read == 0 { + return + } + + total += read - options.onProgress = func(read, total int, _ float64) { percent := float64(total) / float64(size) * 100.0 // The last block could be a bit over the size due to padding // on the chunk size. if percent > 100.0 { percent = 100.0 } - fn(read, total, percent) + + options.onProgress(read, int(size), percent) } } } - bridge := newBridgeCtx() - defer bridge.free() - var cFilePath = C.CString(options.filepath) defer C.free(unsafe.Pointer(cFilePath)) @@ -702,18 +715,40 @@ func (self *CodexNode) CodexUploadFile(options CodexUploadOptions) (string, erro return "", bridge.CallError("cGoCodexUploadFile") } - cid, err := bridge.wait() - return cid, err + return bridge.wait() } -func (self *CodexNode) CodexUploadFileAsync(options CodexUploadOptions, onDone func(cid string, err error)) { +func (self CodexNode) CodexUploadFileAsync(options CodexUploadOptions, onDone func(cid string, err error)) { go func() { cid, err := self.CodexUploadFile(options) onDone(cid, err) }() } -func (self *CodexNode) CodexStart() error { +func (self CodexNode) CodexUploadSubscribe(sessionId string, onProgress func(read int)) error { + bridge := newBridgeCtx() + + bridge.onProgress = onProgress + + var cSessionId = C.CString(sessionId) + defer C.free(unsafe.Pointer(cSessionId)) + + log.Println("Subscribing to upload progress...") + + if C.cGoCodexUploadSubscribe(self.ctx, cSessionId, bridge.resp) != C.RET_OK { + return bridge.CallError("cGoCodexUploadSubscribe") + } + + go func() { + if _, err := bridge.wait(); err != nil { + log.Println("Error in CodexUploadSubscribe:", err) + } + }() + + return nil +} + +func (self CodexNode) CodexStart() error { bridge := newBridgeCtx() defer bridge.free() @@ -725,14 +760,14 @@ func (self *CodexNode) CodexStart() error { return err } -func (self *CodexNode) CodexStartAsync(onDone func(error)) { +func (self CodexNode) CodexStartAsync(onDone func(error)) { go func() { err := self.CodexStart() onDone(err) }() } -func (self *CodexNode) CodexStop() error { +func (self CodexNode) CodexStop() error { bridge := newBridgeCtx() defer bridge.free() @@ -744,7 +779,7 @@ func (self *CodexNode) CodexStop() error { return err } -func (self *CodexNode) CodexDestroy() error { +func (self CodexNode) CodexDestroy() error { bridge := newBridgeCtx() defer bridge.free() @@ -764,11 +799,11 @@ func globalEventCallback(callerRet C.int, msg *C.char, len C.size_t, userData un self.MyEventCallback(callerRet, msg, len) } -func (self *CodexNode) MyEventCallback(callerRet C.int, msg *C.char, len C.size_t) { +func (self CodexNode) MyEventCallback(callerRet C.int, msg *C.char, len C.size_t) { log.Println("Event received:", C.GoStringN(msg, C.int(len))) } -func (self *CodexNode) CodexSetEventCallback() { +func (self CodexNode) CodexSetEventCallback() { // Notice that the events for self node are handled by the 'MyEventCallback' method C.cGoCodexSetEventCallback(self.ctx) } @@ -853,7 +888,6 @@ func main() { if err != nil { log.Fatal("Error happened:", err.Error()) } - log.Println("Codex Upload Init sessionId:", sessionId) err = node.CodexUploadChunk(sessionId, []byte("Hello ")) @@ -874,7 +908,9 @@ func main() { log.Println("Codex Upload Finalized, cid:", cid) buf := bytes.NewBuffer([]byte("Hello World!")) - cid, err = node.CodexUploadReader(CodexUploadOptions{filepath: "hello.txt"}, buf) + cid, err = node.CodexUploadReader(CodexUploadOptions{filepath: "hello.txt", onProgress: func(read, total int, percent float64) { + log.Printf("Uploaded %d bytes, total %d bytes (%.2f%%)\n", read, total, percent) + }}, buf) if err != nil { log.Fatal("Error happened:", err.Error()) } @@ -888,10 +924,12 @@ func main() { // Choose a big file to see the progress logs filepath := path.Join(current, "examples", "golang", "hello.txt") - // filepath := path.Join(current, "examples", "golang", "discord-0.0.109.deb") + //filepath := path.Join(current, "examples", "golang", "discord-0.0.109.deb") + options := CodexUploadOptions{filepath: filepath, onProgress: func(read, total int, percent float64) { log.Printf("Uploaded %d bytes, total %d bytes (%.2f%%)\n", read, total, percent) }} + cid, err = node.CodexUploadFile(options) if err != nil { diff --git a/library/codex_context.nim b/library/codex_context.nim index fb69aad5..e983cd52 100644 --- a/library/codex_context.nim +++ b/library/codex_context.nim @@ -18,6 +18,9 @@ import ./codex_thread_requests/[codex_thread_request] from ../codex/codex import CodexServer +logScope: + topics = "codexlib" + type CodexContext* = object thread: Thread[(ptr CodexContext)] diff --git a/library/codex_thread_requests/requests/node_debug_request.nim b/library/codex_thread_requests/requests/node_debug_request.nim index 855db904..2f492505 100644 --- a/library/codex_thread_requests/requests/node_debug_request.nim +++ b/library/codex_thread_requests/requests/node_debug_request.nim @@ -16,6 +16,9 @@ import ../../../codex/node from ../../../codex/codex import CodexServer, node +logScope: + topics = "codexlib codexlibdebug" + type NodeDebugMsgType* = enum DEBUG PEER diff --git a/library/codex_thread_requests/requests/node_info_request.nim b/library/codex_thread_requests/requests/node_info_request.nim index 37a3499a..167a3094 100644 --- a/library/codex_thread_requests/requests/node_info_request.nim +++ b/library/codex_thread_requests/requests/node_info_request.nim @@ -11,6 +11,9 @@ import ../../../codex/node from ../../../codex/codex import CodexServer, config, node +logScope: + topics = "codexlib codexlibinfo" + type NodeInfoMsgType* = enum REPO SPR diff --git a/library/codex_thread_requests/requests/node_lifecycle_request.nim b/library/codex_thread_requests/requests/node_lifecycle_request.nim index d5431867..c6d06f48 100644 --- a/library/codex_thread_requests/requests/node_lifecycle_request.nim +++ b/library/codex_thread_requests/requests/node_lifecycle_request.nim @@ -24,6 +24,9 @@ import ../../../codex/units from ../../../codex/codex import CodexServer, new, start, stop +logScope: + topics = "codexlib codexliblifecycle" + type NodeLifecycleMsgType* = enum CREATE_NODE START_NODE diff --git a/library/codex_thread_requests/requests/node_p2p_request.nim b/library/codex_thread_requests/requests/node_p2p_request.nim index 3b840702..46790070 100644 --- a/library/codex_thread_requests/requests/node_p2p_request.nim +++ b/library/codex_thread_requests/requests/node_p2p_request.nim @@ -12,6 +12,9 @@ import ../../../codex/node from ../../../codex/codex import CodexServer, node +logScope: + topics = "codexlib codexlibp2p" + type NodeP2PMsgType* = enum CONNECT diff --git a/library/codex_thread_requests/requests/node_upload_request.nim b/library/codex_thread_requests/requests/node_upload_request.nim index e7b2a64a..b83d6aeb 100644 --- a/library/codex_thread_requests/requests/node_upload_request.nim +++ b/library/codex_thread_requests/requests/node_upload_request.nim @@ -10,6 +10,7 @@ ## - CHUNK: sends a chunk of data to the upload session. ## - FINALIZE: finalizes the upload and returns the CID of the uploaded file. ## - CANCEL: cancels the upload session. +## - SUBSCRIBE: subscribes to progress updates for the upload session. ## ## 2. Directly from a file path: the filepath has to be absolute. ## - INIT: creates a new upload session and returns its ID @@ -32,12 +33,16 @@ from ../../../codex/codex import CodexServer, node from ../../../codex/node import store from libp2p import Cid +logScope: + topics = "codexlib codexlibupload" + type NodeUploadMsgType* = enum INIT CHUNK FINALIZE CANCEL FILE + SUBSCRIBE type OnProgressHandler = proc(bytes: int): Future[void] {.gcsafe, async: (raises: [CancelledError]).} @@ -57,6 +62,8 @@ type stream: BufferStream fut: Future[?!Cid] filepath: string + chunkSize: int + onProgress: OnProgressHandler var uploadSessions {.threadvar.}: Table[UploadSessionId, UploadSession] var nexUploadSessionCount {.threadvar.}: UploadSessionCount @@ -85,10 +92,7 @@ proc destroyShared(self: ptr NodeUploadRequest) = deallocShared(self) proc init( - codex: ptr CodexServer, - filepath: cstring = "", - chunkSize: csize_t = 0, - onProgress: OnProgressHandler, + codex: ptr CodexServer, filepath: cstring = "", chunkSize: csize_t = 0 ): Future[Result[string, string]] {.async: (raises: []).} = ## Init a new session upload and return its ID. ## The session contains the future corresponding to the @@ -100,18 +104,19 @@ proc init( ## ## The chunkSize matches by default the block size used to store the file. ## - ## An onProgress handler can be provided to get upload progress. - ## The handler is called with the size of the block stored in the node - ## when a new block is put in the node. - ## After the `node.store` future is completed, whether successfully or not, - ## the onProgress handler is called with -1 to signal the end of the upload. - ## This allows to clean up the cGo states. + ## When a session contains an onProgress handler, it is called + ## with the number of bytes received each time a block is stored thanks to + ## `onBlockStore` callback. + ## After the `node.store` future is done, the onProgress handler + ## is called one last time with 0 bytes to signal the end of the upload. var filenameOpt, mimetypeOpt = string.none if isAbsolute($filepath): if not fileExists($filepath): - return err("File does not exist") + return err( + "Failed to create an upload session, the filepath does not exist: " & $filepath + ) if filepath != "": let (_, name, ext) = splitFile($filepath) @@ -139,20 +144,32 @@ proc init( let onBlockStore = proc( chunk: seq[byte] ): Future[void] {.gcsafe, async: (raises: [CancelledError]).} = - discard onProgress(chunk.len) + try: + let session = uploadSessions[$sessionId] + if session.onProgress != nil: + await session.onProgress(chunk.len) + except KeyError: + error "Failed to push progress update, session is not found: ", + sessionId = $sessionId let blockSize = if chunkSize.NBytes > 0.NBytes: chunkSize.NBytes else: DefaultBlockSize let fut = node.store(lpStream, filenameOpt, mimetypeOpt, blockSize, onBlockStore) proc cb(_: pointer) {.raises: [].} = - # Signal end of upload - discard onProgress(-1) + try: + let session = uploadSessions[$sessionId] + if session.onProgress != nil: + discard session.onProgress(0) + except KeyError: + error "Failed to push the progress final state, session is not found.", + sessionId = $sessionId fut.addCallback(cb) - uploadSessions[sessionId] = - UploadSession(stream: stream, fut: fut, filepath: $filepath) + uploadSessions[sessionId] = UploadSession( + stream: stream, fut: fut, filepath: $filepath, chunkSize: blockSize.int + ) return ok(sessionId) @@ -163,17 +180,17 @@ proc chunk( ## The chunk is pushed to the BufferStream of the session. if not uploadSessions.contains($sessionId): - return err("Invalid session ID") + return err("Failed to upload the chunk, the session is not found: " & $sessionId) try: let session = uploadSessions[$sessionId] await session.stream.pushData(chunk) except KeyError as e: - return err("Invalid session ID") + return err("Failed to upload the chunk, the session is not found: " & $sessionId) except LPError as e: - return err("Stream error: " & $e.msg) + return err("Failed to upload the chunk, stream error: " & $e.msg) except CancelledError as e: - return err("Operation cancelled") + return err("Failed to upload the chunk, operation cancelled.") return ok("") @@ -189,7 +206,8 @@ proc finalize( ## case of errors). if not uploadSessions.contains($sessionId): - return err("Invalid session ID") + return + err("Failed to finalize the upload session, session not found: " & $sessionId) var session: UploadSession try: @@ -198,17 +216,18 @@ proc finalize( let res = await session.fut if res.isErr: - return err("Upload failed: " & res.error().msg) + return err("Failed to finalize the upload session: " & res.error().msg) return ok($res.get()) except KeyError as e: - return err("Invalid session ID") + return + err("Failed to finalize the upload session, invalid session ID: " & $sessionId) except LPStreamError as e: - return err("Stream error: " & $e.msg) + return err("Failed to finalize the upload session, stream error: " & $e.msg) except CancelledError as e: - return err("Operation cancelled") + return err("Failed to finalize the upload session, operation cancelled") except CatchableError as e: - return err("Upload failed: " & $e.msg) + return err("Failed to finalize the upload session: " & $e.msg) finally: if uploadSessions.contains($sessionId): uploadSessions.del($sessionId) @@ -224,20 +243,20 @@ proc cancel( ## from the table. if not uploadSessions.contains($sessionId): - return err("Invalid session ID") + return err("Failed to cancel the upload session, session not found: " & $sessionId) try: let session = uploadSessions[$sessionId] session.fut.cancelSoon() except KeyError as e: - return err("Invalid session ID") + return err("Failed to cancel the upload session, invalid session ID: " & $sessionId) uploadSessions.del($sessionId) return ok("") proc streamFile( - filepath: string, stream: BufferStream + filepath: string, stream: BufferStream, chunkSize: int ): Future[Result[void, string]] {.async: (raises: [CancelledError]).} = ## Streams a file from the given filepath using faststream. ## fsMultiSync cannot be used with chronos because of this warning: @@ -252,43 +271,54 @@ proc streamFile( let inputStreamHandle = filePath.fileInput() let inputStream = inputStreamHandle.implicitDeref + var buf = newSeq[byte](chunkSize) while inputStream.readable: - let byt = inputStream.read - await stream.pushData(@[byt]) + let read = inputStream.readIntoEx(buf) + if read == 0: + break + await stream.pushData(buf[0 ..< read]) + # let byt = inputStream.read + # await stream.pushData(@[byt]) return ok() except IOError, OSError, LPStreamError: let e = getCurrentException() - return err("Stream error: " & $e.msg) + return err("Failed to stream the file: " & $e.msg) proc file( - codex: ptr CodexServer, sessionId: cstring + codex: ptr CodexServer, sessionId: cstring, onProgress: OnProgressHandler ): Future[Result[string, string]] {.raises: [], async: (raises: []).} = ## Starts the file upload for the session identified by sessionId. ## Will call finalize when done and return the CID of the uploaded file. ## In the finally block, the cleanup section removes the session ## from the table and cancels the future if it is not complete (in ## case of errors). + ## + ## If `onProgress` is provided, it is called with the number of bytes + ## received each time a block is stored thanks to `onBlockStore` callback. + if not uploadSessions.contains($sessionId): - return err("Invalid session ID") + return err("Failed to upload the file, invalid session ID: " & $sessionId) var session: UploadSession try: session = uploadSessions[$sessionId] - let res = await streamFile(session.filepath, session.stream) + if onProgress != nil: + uploadSessions[$sessionId].onProgress = onProgress + let res = await streamFile(session.filepath, session.stream, session.chunkSize) if res.isErr: - return err("Failed to stream file: " & res.error) + return err("Failed to upload the file: " & res.error) return await codex.finalize(sessionId) except KeyError as e: - return err("Invalid session ID") + return err("Failed to upload the file, the session is not found: " & $sessionId) except LPStreamError, IOError: let e = getCurrentException() - return err("Stream error: " & $e.msg) + return err("Failed to upload the file: " & $e.msg) except CancelledError as e: - return err("Operation cancelled") + return err("Failed to upload the file, the operation is cancelled.") except CatchableError as e: - return err("Upload failed: " & $e.msg) + return err("Failed to upload the file: " & $e.msg) finally: if uploadSessions.contains($sessionId): uploadSessions.del($sessionId) @@ -296,6 +326,46 @@ proc file( if session.fut != nil and not session.fut.finished(): session.fut.cancelSoon() +proc subscribe( + codex: ptr CodexServer, sessionId: cstring, onProgress: OnProgressHandler +): Future[Result[string, string]] {.raises: [], async: (raises: []).} = + ## Subscribes to progress updates for the upload session identified by sessionId. + ## The onProgress handler is called with the number of bytes received + ## each time a block is stored thanks to `onBlockStore` callback. + + if not uploadSessions.contains($sessionId): + return err( + "Failed to subscribe to the upload session, invalid session ID: " & $sessionId + ) + + let fut = newFuture[void]() + + proc onBlockReceived(bytes: int): Future[void] {.async: (raises: [CancelledError]).} = + try: + let session = uploadSessions[$sessionId] + await onProgress(bytes) + + if bytes == 0: + fut.complete() + except KeyError: + fut.cancelSoon() + error "Failed to push progress update, session is not found: ", + sessionId = $sessionId + + try: + uploadSessions[$sessionId].onProgress = onBlockReceived + except KeyError: + return err( + "Failed to subscribe to the upload session, session is not found: " & $sessionId + ) + + try: + await fut + except CatchableError as e: + return err("Failed to subscribe to the upload session: " & $e.msg) + + return ok("") + proc process*( self: ptr NodeUploadRequest, codex: ptr CodexServer ): Future[Result[string, string]] {.async: (raises: []).} = @@ -304,7 +374,7 @@ proc process*( case self.operation of NodeUploadMsgType.INIT: - let res = (await init(codex, self.filepath, self.chunkSize, self.onProgress)) + let res = (await init(codex, self.filepath, self.chunkSize)) if res.isErr: error "INIT failed", error = res.error return err($res.error) @@ -328,10 +398,16 @@ proc process*( return err($res.error) return res of NodeUploadMsgType.FILE: - let res = (await file(codex, self.sessionId)) + let res = (await file(codex, self.sessionId, self.onProgress)) if res.isErr: error "FILE failed", error = res.error return err($res.error) return res + of NodeUploadMsgType.SUBSCRIBE: + let res = (await subscribe(codex, self.sessionId, self.onProgress)) + if res.isErr: + error "SUBSCRIBE failed", error = res.error + return err($res.error) + return res return ok("") diff --git a/library/libcodex.h b/library/libcodex.h index cd0d2fae..8523e86a 100644 --- a/library/libcodex.h +++ b/library/libcodex.h @@ -115,6 +115,12 @@ int codex_upload_file( CodexCallback callback, void* userData); +int codex_upload_subscribe( + void* ctx, + const char* sessionId, + CodexCallback callback, + void* userData); + int codex_start(void* ctx, CodexCallback callback, void* userData); diff --git a/library/libcodex.nim b/library/libcodex.nim index 3c3f3efe..b537707e 100644 --- a/library/libcodex.nim +++ b/library/libcodex.nim @@ -38,6 +38,9 @@ import ./ffi_types from ../codex/conf import codexVersion, updateLogLevel +logScope: + topics = "codexlib" + template checkLibcodexParams*( ctx: ptr CodexContext, callback: CodexCallback, userData: pointer ) = @@ -76,10 +79,6 @@ proc initializeLibrary() {.exported.} = locals = addr(locals) nimGC_setStackBottom(locals) -template init(ctx, callback, userData) = - initializeLibrary() - checkLibcodexParams(ctx, callback, userData) - proc codex_new( configJson: cstring, callback: CodexCallback, userData: pointer ): pointer {.dynlib, exported.} = @@ -111,7 +110,8 @@ proc codex_new( proc codex_version( ctx: ptr CodexContext, callback: CodexCallback, userData: pointer ): cint {.dynlib, exportc.} = - init(ctx, callback, userData) + initializeLibrary() + checkLibcodexParams(ctx, callback, userData) callback( RET_OK, @@ -125,7 +125,8 @@ proc codex_version( proc codex_revision( ctx: ptr CodexContext, callback: CodexCallback, userData: pointer ): cint {.dynlib, exportc.} = - init(ctx, callback, userData) + initializeLibrary() + checkLibcodexParams(ctx, callback, userData) callback( RET_OK, @@ -139,7 +140,8 @@ proc codex_revision( proc codex_repo( ctx: ptr CodexContext, callback: CodexCallback, userData: pointer ): cint {.dynlib, exportc.} = - init(ctx, callback, userData) + initializeLibrary() + checkLibcodexParams(ctx, callback, userData) let reqContent = NodeInfoRequest.createShared(NodeInfoMsgType.REPO) let res = codex_context.sendRequestToCodexThread( @@ -151,7 +153,8 @@ proc codex_repo( proc codex_debug( ctx: ptr CodexContext, callback: CodexCallback, userData: pointer ): cint {.dynlib, exportc.} = - init(ctx, callback, userData) + initializeLibrary() + checkLibcodexParams(ctx, callback, userData) let reqContent = NodeDebugRequest.createShared(NodeDebugMsgType.DEBUG) let res = codex_context.sendRequestToCodexThread( @@ -163,7 +166,8 @@ proc codex_debug( proc codex_spr( ctx: ptr CodexContext, callback: CodexCallback, userData: pointer ): cint {.dynlib, exportc.} = - init(ctx, callback, userData) + initializeLibrary() + checkLibcodexParams(ctx, callback, userData) let reqContent = NodeInfoRequest.createShared(NodeInfoMsgType.SPR) let res = codex_context.sendRequestToCodexThread( @@ -175,7 +179,8 @@ proc codex_spr( proc codex_peer_id( ctx: ptr CodexContext, callback: CodexCallback, userData: pointer ): cint {.dynlib, exportc.} = - init(ctx, callback, userData) + initializeLibrary() + checkLibcodexParams(ctx, callback, userData) let reqContent = NodeInfoRequest.createShared(NodeInfoMsgType.PEERID) let res = codex_context.sendRequestToCodexThread( @@ -190,7 +195,8 @@ proc codex_peer_id( proc codex_log_level( ctx: ptr CodexContext, logLevel: cstring, callback: CodexCallback, userData: pointer ): cint {.dynlib, exportc.} = - init(ctx, callback, userData) + initializeLibrary() + checkLibcodexParams(ctx, callback, userData) try: updateLogLevel($logLevel) @@ -207,7 +213,8 @@ proc codex_connect( callback: CodexCallback, userData: pointer, ): cint {.dynlib, exportc.} = - init(ctx, callback, userData) + initializeLibrary() + checkLibcodexParams(ctx, callback, userData) var peerAddresses = newSeq[cstring](peerAddressesLength) let peers = cast[ptr UncheckedArray[cstring]](peerAddressesPtr) @@ -226,7 +233,8 @@ proc codex_connect( proc codex_peer_debug( ctx: ptr CodexContext, peerId: cstring, callback: CodexCallback, userData: pointer ): cint {.dynlib, exportc.} = - init(ctx, callback, userData) + initializeLibrary() + checkLibcodexParams(ctx, callback, userData) let reqContent = NodeDebugRequest.createShared(NodeDebugMsgType.PEER, peerId = peerId) let res = codex_context.sendRequestToCodexThread( @@ -238,7 +246,8 @@ proc codex_peer_debug( proc codex_destroy( ctx: ptr CodexContext, callback: CodexCallback, userData: pointer ): cint {.dynlib, exportc.} = - init(ctx, callback, userData) + initializeLibrary() + checkLibcodexParams(ctx, callback, userData) let res = codex_context.destroyCodexContext(ctx) if res.isErr: @@ -253,19 +262,13 @@ proc codex_upload_init( callback: CodexCallback, userData: pointer, ): cint {.dynlib, exportc.} = - init(ctx, callback, userData) - - let onProgress = proc( - bytes: int - ): Future[void] {.gcsafe, async: (raises: [CancelledError]).} = - callback(RET_PROGRESS, nil, bytes.csize_t, userData) + initializeLibrary() + checkLibcodexParams(ctx, callback, userData) let reqContent = NodeUploadRequest.createShared( - NodeUploadMsgType.INIT, - filepath = filepath, - chunkSize = chunkSize, - onProgress = onProgress, + NodeUploadMsgType.INIT, filepath = filepath, chunkSize = chunkSize ) + let res = codex_context.sendRequestToCodexThread( ctx, RequestType.UPLOAD, reqContent, callback, userData ) @@ -280,7 +283,8 @@ proc codex_upload_chunk( callback: CodexCallback, userData: pointer, ): cint {.dynlib, exportc.} = - init(ctx, callback, userData) + initializeLibrary() + checkLibcodexParams(ctx, callback, userData) let chunk = newSeq[byte](len) copyMem(addr chunk[0], data, len) @@ -300,7 +304,8 @@ proc codex_upload_finalize( callback: CodexCallback, userData: pointer, ): cint {.dynlib, exportc.} = - init(ctx, callback, userData) + initializeLibrary() + checkLibcodexParams(ctx, callback, userData) let reqContent = NodeUploadRequest.createShared(NodeUploadMsgType.FINALIZE, sessionId = sessionId) @@ -316,7 +321,8 @@ proc codex_upload_cancel( callback: CodexCallback, userData: pointer, ): cint {.dynlib, exportc.} = - init(ctx, callback, userData) + initializeLibrary() + checkLibcodexParams(ctx, callback, userData) let reqContent = NodeUploadRequest.createShared(NodeUploadMsgType.CANCEL, sessionId = sessionId) @@ -333,10 +339,43 @@ proc codex_upload_file( callback: CodexCallback, userData: pointer, ): cint {.dynlib, exportc.} = - init(ctx, callback, userData) + initializeLibrary() + checkLibcodexParams(ctx, callback, userData) - let reqContent = - NodeUploadRequest.createShared(NodeUploadMsgType.FILE, sessionId = sessionId) + let onProgress = proc( + bytes: int + ): Future[void] {.async: (raises: [CancelledError]).} = + if userData != nil: + callback(RET_PROGRESS, nil, cast[csize_t](bytes), userData) + + let reqContent = NodeUploadRequest.createShared( + NodeUploadMsgType.FILE, sessionId = sessionId, onProgress = onProgress + ) + + let res = codex_context.sendRequestToCodexThread( + ctx, RequestType.UPLOAD, reqContent, callback, userData + ) + + return callback.okOrError(res, userData) + +proc codex_upload_subscribe( + ctx: ptr CodexContext, + sessionId: cstring, + callback: CodexCallback, + userData: pointer, +): cint {.dynlib, exportc.} = + initializeLibrary() + checkLibcodexParams(ctx, callback, userData) + + let onProgress = proc( + bytes: int + ): Future[void] {.async: (raises: [CancelledError]).} = + if userData != nil: + callback(RET_PROGRESS, nil, cast[csize_t](bytes), userData) + + let reqContent = NodeUploadRequest.createShared( + NodeUploadMsgType.SUBSCRIBE, sessionId = sessionId, onProgress = onProgress + ) let res = codex_context.sendRequestToCodexThread( ctx, RequestType.UPLOAD, reqContent, callback, userData @@ -347,7 +386,8 @@ proc codex_upload_file( proc codex_start( ctx: ptr CodexContext, callback: CodexCallback, userData: pointer ): cint {.dynlib, exportc.} = - init(ctx, callback, userData) + initializeLibrary() + checkLibcodexParams(ctx, callback, userData) let reqContent: ptr NodeLifecycleRequest = NodeLifecycleRequest.createShared(NodeLifecycleMsgType.START_NODE) @@ -360,7 +400,8 @@ proc codex_start( proc codex_stop( ctx: ptr CodexContext, callback: CodexCallback, userData: pointer ): cint {.dynlib, exportc.} = - init(ctx, callback, userData) + initializeLibrary() + checkLibcodexParams(ctx, callback, userData) let reqContent: ptr NodeLifecycleRequest = NodeLifecycleRequest.createShared(NodeLifecycleMsgType.STOP_NODE)