diff --git a/examples/golang/codex.go b/examples/golang/codex.go index 66127126..979616fc 100644 --- a/examples/golang/codex.go +++ b/examples/golang/codex.go @@ -132,8 +132,8 @@ package main return codex_download_chunk(codexCtx, cid, (CodexCallback) callback, resp); } - static int cGoCodexDownloadLocal(void* codexCtx, char* cid, size_t chunkSize, void* resp) { - return codex_download_local(codexCtx, cid, chunkSize, (CodexCallback) callback, resp); + static int cGoCodexDownloadStream(void* codexCtx, char* cid, size_t chunkSize, bool local, const char* filepath, void* resp) { + return codex_download_stream(codexCtx, cid, chunkSize, local, filepath, (CodexCallback) callback, resp); } static int cGoCodexDownloadCancel(void* codexCtx, char* cid, void* resp) { @@ -270,12 +270,39 @@ type CodexNode struct { const defaultBlockSize = 1024 * 64 -type OnProgressFunc func(read, total int, percent float64, err error) +type OnUploadProgressFunc func(read, total int, percent float64, err error) + +type ChunckSize int type CodexUploadOptions struct { filepath string - chunkSize int - onProgress OnProgressFunc + chunkSize ChunckSize + onProgress OnUploadProgressFunc +} + +func (c ChunckSize) valOrDefault() int { + if c == 0 { + return defaultBlockSize + } + + return int(c) +} + +func (c ChunckSize) toSizeT() C.size_t { + return C.size_t(c.valOrDefault()) +} + +type CodexDownloadStreamOptions = struct { + filepath string + chunkSize ChunckSize + onProgress OnUploadProgressFunc + writer io.Writer + local bool +} + +type CodexDownloadInitOptions = struct { + local bool + chunkSize ChunckSize } type bridgeCtx struct { @@ -285,8 +312,16 @@ type bridgeCtx struct { result string err error - // Callback used for upload and download - onProgress func(read int, chunk []byte) + // Callback used for receiving progress updates during upload/download. + // + // For the upload, the bytes parameter indicates the number of bytes uploaded. + // If the chunk size is superior or equal to the blocksize (passed in init function), + // the callback will be called when a block is put in the store. + // Otherwise, it will be called when a chunk is pushed into the stream. + // + // For the download, the bytes is the size of the chunk received, and the chunk + // is the actual chunk of data received. + onProgress func(bytes int, chunk []byte) } func newBridgeCtx() *bridgeCtx { @@ -559,12 +594,7 @@ func (self CodexNode) CodexUploadInit(options *CodexUploadOptions) (string, erro var cFilename = C.CString(options.filepath) defer C.free(unsafe.Pointer(cFilename)) - if options.chunkSize == 0 { - options.chunkSize = defaultBlockSize - } - var cChunkSize = C.size_t(options.chunkSize) - - if C.cGoCodexUploadInit(self.ctx, cFilename, cChunkSize, bridge.resp) != C.RET_OK { + if C.cGoCodexUploadInit(self.ctx, cFilename, options.chunkSize.toSizeT(), bridge.resp) != C.RET_OK { return "", bridge.CallError("cGoCodexUploadInit") } @@ -629,11 +659,7 @@ func (self CodexNode) CodexUploadReader(options CodexUploadOptions, r io.Reader) return "", err } - if options.chunkSize == 0 { - options.chunkSize = defaultBlockSize - } - - buf := make([]byte, options.chunkSize) + buf := make([]byte, options.chunkSize.valOrDefault()) total := 0 var size int64 @@ -746,29 +772,44 @@ func (self CodexNode) CodexUploadFileAsync(options CodexUploadOptions, onDone fu }() } -func (self CodexNode) CodexDownloadLocal(cid string, chunkSize int, w io.Writer) error { +func (self CodexNode) CodexDownloadStream(cid string, options CodexDownloadStreamOptions) error { bridge := newBridgeCtx() defer bridge.free() + total := 0 bridge.onProgress = func(read int, chunk []byte) { if read == 0 { return } - if _, err := w.Write(chunk); err != nil { - log.Println(err) + if options.writer != nil { + w := options.writer + if _, err := w.Write(chunk); err != nil { + if options.onProgress != nil { + options.onProgress(0, 0, 0.0, err) + } + } + } + + total += read + + if options.onProgress != nil { + // TODO: retrieve the total size from the manifest of from the options struct + percent := 0.0 + + options.onProgress(read, total, percent, nil) } } var cCid = C.CString(cid) defer C.free(unsafe.Pointer(cCid)) - if chunkSize == 0 { - chunkSize = defaultBlockSize - } - var cChunkSize = C.size_t(chunkSize) + var cFilepath = C.CString(options.filepath) + defer C.free(unsafe.Pointer(cFilepath)) - if C.cGoCodexDownloadLocal(self.ctx, cCid, cChunkSize, bridge.resp) != C.RET_OK { + var cLocal = C.bool(options.local) + + if C.cGoCodexDownloadStream(self.ctx, cCid, options.chunkSize.toSizeT(), cLocal, cFilepath, bridge.resp) != C.RET_OK { return bridge.CallError("cGoCodexDownloadLocal") } @@ -777,27 +818,16 @@ func (self CodexNode) CodexDownloadLocal(cid string, chunkSize int, w io.Writer) return err } -func (self CodexNode) CodexDownloadLocalAsync(cid string, chunkSize int, w io.Writer, onDone func(error)) { - go func() { - err := self.CodexDownloadLocal(cid, chunkSize, w) - onDone(err) - }() -} - -func (self CodexNode) CodexDownloadInit(cid string, chunkSize int, local bool) error { +func (self CodexNode) CodexDownloadInit(cid string, options CodexDownloadInitOptions) error { bridge := newBridgeCtx() defer bridge.free() var cCid = C.CString(cid) defer C.free(unsafe.Pointer(cCid)) - if chunkSize == 0 { - chunkSize = defaultBlockSize - } - var cChunkSize = C.size_t(chunkSize) - var cLocal = C.bool(local) + var cLocal = C.bool(options.local) - if C.cGoCodexDownloadInit(self.ctx, cCid, cChunkSize, cLocal, bridge.resp) != C.RET_OK { + if C.cGoCodexDownloadInit(self.ctx, cCid, options.chunkSize.toSizeT(), cLocal, bridge.resp) != C.RET_OK { return bridge.CallError("cGoCodexDownloadInit") } @@ -1009,7 +1039,7 @@ func main() { buf := bytes.NewBuffer([]byte("Hello World!")) cid, err = node.CodexUploadReader(CodexUploadOptions{filepath: "hello.txt", onProgress: func(read, total int, percent float64, err error) { if err != nil { - log.Fatal("Error happened during upload: %v\n", err) + log.Fatalf("Error happened during upload: %v\n", err) } log.Printf("Uploaded %d bytes, total %d bytes (%.2f%%)\n", read, total, percent) @@ -1030,7 +1060,7 @@ func main() { options := CodexUploadOptions{filepath: filepath, onProgress: func(read, total int, percent float64, err error) { if err != nil { - log.Fatal("Error happened during upload: %v\n", err) + log.Fatalf("Error happened during upload: %v\n", err) } log.Printf("Uploaded %d bytes, total %d bytes (%.2f%%)\n", read, total, percent) @@ -1050,17 +1080,20 @@ func main() { } defer f.Close() - // log.Println("Codex Download Local starting... attempt", i+1) - - if err := node.CodexDownloadLocal(cid, 0, f); err != nil { + if err := node.CodexDownloadStream(cid, + CodexDownloadStreamOptions{writer: f, filepath: "hello.reloaded.txt", + onProgress: func(read, total int, percent float64, err error) { + log.Println("Downloaded", read, "bytes. Total:", total, "bytes (", percent, "%)") + }, + }); err != nil { log.Fatal("Error happened:", err.Error()) } - log.Println("Codex Download Local finished.") + log.Println("Codex Download finished.") // log.Println("Codex Download Init starting... attempt", i+1) - if err := node.CodexDownloadInit(cid, 0, true); err != nil { + if err := node.CodexDownloadInit(cid, CodexDownloadInitOptions{local: true}); err != nil { log.Fatal("Error happened:", err.Error()) } diff --git a/library/codex_thread_requests/requests/node_download_request.nim b/library/codex_thread_requests/requests/node_download_request.nim index a883348d..61a73bc5 100644 --- a/library/codex_thread_requests/requests/node_download_request.nim +++ b/library/codex_thread_requests/requests/node_download_request.nim @@ -1,6 +1,19 @@ {.push raises: [].} ## This file contains the download request. +## A session is created for each download identified by the CID, +## allowing to resume, pause and cancel the download (using chunks). +## +## There are two ways to download a file: +## 1. Via chunks: the cid parameter is the CID of the file to download. Steps are: +## - INIT: initializes the download session +## - CHUNK: downloads the next chunk of the file +## - CANCEL: cancels the download session +## 2. Via stream. +## - STREAM: downloads the file in a streaming manner, calling +## the onChunk handler for each chunk and / or writing to a file if filepath is set. +## Cancel is supported in this mode because the worker will be busy +## downloading the file so it cannot pickup another request to cancel the download. import std/[options, streams] import chronos @@ -19,9 +32,8 @@ logScope: type NodeDownloadMsgType* = enum INIT - LOCAL - NETWORK CHUNK + STREAM CANCEL type OnChunkHandler = proc(bytes: seq[byte]): void {.gcsafe, raises: [].} @@ -31,6 +43,7 @@ type NodeDownloadRequest* = object cid: cstring chunkSize: csize_t local: bool + filepath: cstring type DownloadSessionId* = string @@ -47,12 +60,14 @@ proc createShared*( cid: cstring = "", chunkSize: csize_t = 0, local: bool = false, + filepath: cstring = "", ): ptr type T = var ret = createShared(T) ret[].operation = op ret[].cid = cid.alloc() ret[].chunkSize = chunkSize ret[].local = local + ret[].filepath = filepath.alloc() return ret @@ -60,10 +75,7 @@ proc destroyShared(self: ptr NodeDownloadRequest) = deallocShared(self) proc init( - codex: ptr CodexServer, - cCid: cstring = "", - chunkSize: csize_t = 0, - local: bool = true, + codex: ptr CodexServer, cCid: cstring = "", chunkSize: csize_t = 0, local: bool ): Future[Result[string, string]] {.async: (raises: []).} = if downloadSessions.contains($cCid): return ok("Download session already exists.") @@ -128,12 +140,13 @@ proc chunk( return ok("") -proc streamFile( +proc streamData( codex: ptr CodexServer, cid: Cid, - local: bool = true, + local: bool, onChunk: OnChunkHandler, chunkSize: csize_t, + filepath: cstring, ): Future[Result[string, string]] {.async: (raises: [CancelledError]).} = let node = codex[].node @@ -148,9 +161,14 @@ proc streamFile( let blockSize = if chunkSize.int > 0: chunkSize.int else: DefaultBlockSize.int var buf = newSeq[byte](blockSize) - var read = 0 + var outputStream: OutputStreamHandle + var filedest: string = $filepath + try: + if filepath != "": + outputStream = filedest.fileOutput() + while not stream.atEof: let read = await stream.readOnce(addr buf[0], buf.len) buf.setLen(read) @@ -158,18 +176,30 @@ proc streamFile( if buf.len <= 0: break - if onChunk != nil: - onChunk(buf) + onChunk(buf) + + if outputStream != nil: + outputStream.write(buf) + + if outputStream != nil: + outputStream.close() except LPStreamError as e: return err("Failed to stream file: " & $e.msg) + except IOError as e: + return err("Failed to write to file: " & $e.msg) finally: await stream.close() downloadSessions.del($cid) return ok("") -proc local( - codex: ptr CodexServer, cCid: cstring, chunkSize: csize_t, onChunk: OnChunkHandler +proc stream( + codex: ptr CodexServer, + cCid: cstring, + chunkSize: csize_t, + local: bool, + filepath: cstring, + onChunk: OnChunkHandler, ): Future[Result[string, string]] {.raises: [], async: (raises: []).} = let node = codex[].node @@ -178,8 +208,7 @@ proc local( return err("Failed to download locally: cannot parse cid: " & $cCid) try: - let local = true - let res = await codex.streamFile(cid.get(), true, onChunk, chunkSize) + let res = await codex.streamData(cid.get(), local, onChunk, chunkSize, filepath) if res.isErr: return err($res.error) except CancelledError: @@ -218,25 +247,20 @@ proc process*( error "Failed to INIT.", error = res.error return err($res.error) return res - of NodeDownloadMsgType.LOCAL: - let res = (await local(codex, self.cid, self.chunkSize, onChunk)) - if res.isErr: - error "Failed to LOCAL.", error = res.error - return err($res.error) - return res - of NodeDownloadMsgType.NETWORK: - return err("NETWORK download not implemented yet.") - # let res = (await local(codex, self.cid, self.onChunk2, self.chunkSize, onChunk)) - # if res.isErr: - # error "Failed to NETWORK.", error = res.error - # return err($res.error) - # return res of NodeDownloadMsgType.CHUNK: let res = (await chunk(codex, self.cid, onChunk)) if res.isErr: error "Failed to CHUNK.", error = res.error return err($res.error) return res + of NodeDownloadMsgType.STREAM: + let res = ( + await stream(codex, self.cid, self.chunkSize, self.local, self.filepath, onChunk) + ) + if res.isErr: + error "Failed to STREAM.", error = res.error + return err($res.error) + return res of NodeDownloadMsgType.CANCEL: let res = (await cancel(codex, self.cid)) if res.isErr: diff --git a/library/codex_thread_requests/requests/node_upload_request.nim b/library/codex_thread_requests/requests/node_upload_request.nim index 7dc70d4e..c4ed75d8 100644 --- a/library/codex_thread_requests/requests/node_upload_request.nim +++ b/library/codex_thread_requests/requests/node_upload_request.nim @@ -2,7 +2,7 @@ ## This file contains the upload request. ## A session is created for each upload allowing to resume, -## pause (using chunks) and cancels uploads. +## pause and cancel uploads (using chunks). ## ## There are two ways to upload a file: ## 1. Via chunks: the filepath parameter is the data filename. Steps are: diff --git a/library/libcodex.h b/library/libcodex.h index 3386c52b..6bd50309 100644 --- a/library/libcodex.h +++ b/library/libcodex.h @@ -115,10 +115,12 @@ int codex_upload_file( CodexCallback callback, void* userData); -int codex_download_local( +int codex_download_stream( void* ctx, const char* cid, size_t chunkSize, + bool local, + const char* filepath, CodexCallback callback, void* userData); diff --git a/library/libcodex.nim b/library/libcodex.nim index 66de2cd1..ffcefa63 100644 --- a/library/libcodex.nim +++ b/library/libcodex.nim @@ -387,10 +387,12 @@ proc codex_download_chunk( result = callback.okOrError(res, userData) -proc codex_download_local( +proc codex_download_stream( ctx: ptr CodexContext, cid: cstring, chunkSize: csize_t, + local: bool, + filepath: cstring, callback: CodexCallback, userData: pointer, ): cint {.dynlib, exportc.} = @@ -398,7 +400,11 @@ proc codex_download_local( checkLibcodexParams(ctx, callback, userData) let req = NodeDownloadRequest.createShared( - NodeDownloadMsgType.LOCAL, cid = cid, chunkSize = chunkSize + NodeDownloadMsgType.STREAM, + cid = cid, + chunkSize = chunkSize, + local = local, + filepath = filepath, ) let res = codex_context.sendRequestToCodexThread(