diff --git a/examples/golang/codex.go b/examples/golang/codex.go index 5c04a9f1..66127126 100644 --- a/examples/golang/codex.go +++ b/examples/golang/codex.go @@ -124,8 +124,20 @@ package main return codex_upload_file(codexCtx, sessionId, (CodexCallback) callback, resp); } - static int cGoCodexUploadSubscribe(void* codexCtx, char* sessionId, void* resp) { - return codex_upload_subscribe(codexCtx, sessionId, (CodexCallback) callback, resp); + static int cGoCodexDownloadInit(void* codexCtx, char* cid, size_t chunkSize, bool local, void* resp) { + return codex_download_init(codexCtx, cid, chunkSize, local, (CodexCallback) callback, resp); + } + + static int cGoCodexDownloadChunk(void* codexCtx, char* cid, void* resp) { + return codex_download_chunk(codexCtx, cid, (CodexCallback) callback, resp); + } + + static int cGoCodexDownloadLocal(void* codexCtx, char* cid, size_t chunkSize, void* resp) { + return codex_download_local(codexCtx, cid, chunkSize, (CodexCallback) callback, resp); + } + + static int cGoCodexDownloadCancel(void* codexCtx, char* cid, void* resp) { + return codex_download_cancel(codexCtx, cid, (CodexCallback) callback, resp); } static int cGoCodexStart(void* codexCtx, void* resp) { @@ -158,7 +170,6 @@ package main */ import "C" - import ( "bytes" "encoding/json" @@ -236,7 +247,7 @@ type RestNode struct { NodeId string `json:"nodeId"` PeerId string `json:"peerId"` Record string `json:"record"` - Address *string `json:"address"` // Use pointer for nullable + Address *string `json:"address"` Seen bool `json:"seen"` } @@ -259,7 +270,7 @@ type CodexNode struct { const defaultBlockSize = 1024 * 64 -type OnProgressFunc func(read, total int, percent float64) +type OnProgressFunc func(read, total int, percent float64, err error) type CodexUploadOptions struct { filepath string @@ -275,16 +286,16 @@ type bridgeCtx struct { err error // Callback used for upload and download - onProgress func(read int) + onProgress func(read int, chunk []byte) } func newBridgeCtx() *bridgeCtx { bridge := &bridgeCtx{} bridge.wg = &sync.WaitGroup{} bridge.wg.Add(1) - bridge.h = cgo.NewHandle(bridge) bridge.resp = C.allocResp(C.uintptr_t(uintptr(bridge.h))) + return bridge } @@ -307,12 +318,7 @@ func (b *bridgeCtx) CallError(name string) error { func (b *bridgeCtx) wait() (string, error) { b.wg.Wait() - result := b.result - err := b.err - - b.free() - - return result, err + return b.result, b.err } func getReaderSize(r io.Reader) int64 { @@ -354,8 +360,15 @@ func callback(ret C.int, msg *C.char, len C.size_t, resp unsafe.Pointer) { if v, ok := h.Value().(*bridgeCtx); ok { switch ret { case C.RET_PROGRESS: - if v.onProgress != nil { - v.onProgress(int(C.int(len))) + if v.onProgress == nil { + return + } + + if msg != nil { + chunk := C.GoBytes(unsafe.Pointer(msg), C.int(len)) + v.onProgress(int(C.int(len)), chunk) + } else { + v.onProgress(int(C.int(len)), nil) } case C.RET_OK: retMsg := C.GoStringN(msg, C.int(len)) @@ -378,8 +391,10 @@ func callback(ret C.int, msg *C.char, len C.size_t, resp unsafe.Pointer) { func CodexNew(config CodexConfig) (*CodexNode, error) { bridge := newBridgeCtx() + defer bridge.free() jsonConfig, err := json.Marshal(config) + if err != nil { return nil, err } @@ -398,6 +413,7 @@ func CodexNew(config CodexConfig) (*CodexNode, error) { func (self CodexNode) CodexVersion() (string, error) { bridge := newBridgeCtx() + defer bridge.free() if C.cGoCodexVersion(self.ctx, bridge.resp) != C.RET_OK { return "", bridge.CallError("cGoCodexVersion") @@ -408,6 +424,7 @@ func (self CodexNode) CodexVersion() (string, error) { func (self CodexNode) CodexRevision() (string, error) { bridge := newBridgeCtx() + defer bridge.free() if C.cGoCodexRevision(self.ctx, bridge.resp) != C.RET_OK { return "", bridge.CallError("cGoCodexRevision") @@ -418,6 +435,7 @@ func (self CodexNode) CodexRevision() (string, error) { func (self CodexNode) CodexRepo() (string, error) { bridge := newBridgeCtx() + defer bridge.free() if C.cGoCodexRepo(self.ctx, bridge.resp) != C.RET_OK { return "", bridge.CallError("cGoCodexRepo") @@ -430,6 +448,7 @@ func (self CodexNode) CodexDebug() (CodexDebugInfo, error) { var info CodexDebugInfo bridge := newBridgeCtx() + defer bridge.free() if C.cGoCodexDebug(self.ctx, bridge.resp) != C.RET_OK { return info, bridge.CallError("cGoCodexDebug") @@ -447,6 +466,7 @@ func (self CodexNode) CodexDebug() (CodexDebugInfo, error) { func (self CodexNode) CodexSpr() (string, error) { bridge := newBridgeCtx() + defer bridge.free() if C.cGoCodexSpr(self.ctx, bridge.resp) != C.RET_OK { return "", bridge.CallError("cGoCodexSpr") @@ -457,6 +477,7 @@ func (self CodexNode) CodexSpr() (string, error) { func (self CodexNode) CodexPeerId() (string, error) { bridge := newBridgeCtx() + defer bridge.free() if C.cGoCodexPeerId(self.ctx, bridge.resp) != C.RET_OK { return "", bridge.CallError("cGoCodexPeerId") @@ -467,6 +488,7 @@ func (self CodexNode) CodexPeerId() (string, error) { func (self CodexNode) CodexLogLevel(logLevel LogLevel) error { bridge := newBridgeCtx() + defer bridge.free() var cLogLevel = C.CString(fmt.Sprintf("%s", logLevel)) defer C.free(unsafe.Pointer(cLogLevel)) @@ -476,11 +498,13 @@ func (self CodexNode) CodexLogLevel(logLevel LogLevel) error { } _, err := bridge.wait() + return err } func (self CodexNode) CodexConnect(peerId string, peerAddresses []string) error { bridge := newBridgeCtx() + defer bridge.free() var cPeerId = C.CString(peerId) defer C.free(unsafe.Pointer(cPeerId)) @@ -509,6 +533,7 @@ func (self CodexNode) CodexPeerDebug(peerId string) (RestPeerRecord, error) { var record RestPeerRecord bridge := newBridgeCtx() + defer bridge.free() var cPeerId = C.CString(peerId) defer C.free(unsafe.Pointer(cPeerId)) @@ -529,6 +554,7 @@ func (self CodexNode) CodexPeerDebug(peerId string) (RestPeerRecord, error) { func (self CodexNode) CodexUploadInit(options *CodexUploadOptions) (string, error) { bridge := newBridgeCtx() + defer bridge.free() var cFilename = C.CString(options.filepath) defer C.free(unsafe.Pointer(cFilename)) @@ -536,7 +562,6 @@ func (self CodexNode) CodexUploadInit(options *CodexUploadOptions) (string, erro if options.chunkSize == 0 { options.chunkSize = defaultBlockSize } - var cChunkSize = C.size_t(options.chunkSize) if C.cGoCodexUploadInit(self.ctx, cFilename, cChunkSize, bridge.resp) != C.RET_OK { @@ -548,6 +573,7 @@ func (self CodexNode) CodexUploadInit(options *CodexUploadOptions) (string, erro func (self CodexNode) CodexUploadChunk(sessionId string, chunk []byte) error { bridge := newBridgeCtx() + defer bridge.free() var cSessionId = C.CString(sessionId) defer C.free(unsafe.Pointer(cSessionId)) @@ -562,11 +588,13 @@ func (self CodexNode) CodexUploadChunk(sessionId string, chunk []byte) error { } _, err := bridge.wait() + return err } func (self CodexNode) CodexUploadFinalize(sessionId string) (string, error) { bridge := newBridgeCtx() + defer bridge.free() var cSessionId = C.CString(sessionId) defer C.free(unsafe.Pointer(cSessionId)) @@ -580,6 +608,7 @@ func (self CodexNode) CodexUploadFinalize(sessionId string) (string, error) { func (self CodexNode) CodexUploadCancel(sessionId string) error { bridge := newBridgeCtx() + defer bridge.free() var cSessionId = C.CString(sessionId) defer C.free(unsafe.Pointer(cSessionId)) @@ -589,66 +618,40 @@ func (self CodexNode) CodexUploadCancel(sessionId string) error { } _, err := bridge.wait() + return err } func (self CodexNode) CodexUploadReader(options CodexUploadOptions, r io.Reader) (string, error) { sessionId, err := self.CodexUploadInit(&options) + if err != nil { return "", err } - if options.onProgress != nil { - size := getReaderSize(r) - total := 0 - - if size > 0 { - onProgress := func(read int) { - if read == 0 { - return - } - - total += read - - percent := float64(total) / float64(size) * 100.0 - // The last block could be a bit over the size due to padding - // on the chunk size. - if percent > 100.0 { - percent = 100.0 - } - - options.onProgress(read, int(size), percent) - } - - if err := self.CodexUploadSubscribe(sessionId, onProgress); err != nil { - if err := self.CodexUploadCancel(sessionId); err != nil { - log.Println("Error cancelling upload after subscribe failure:", err) - } - - return "", err - } - } - } - if options.chunkSize == 0 { options.chunkSize = defaultBlockSize } buf := make([]byte, options.chunkSize) + total := 0 + var size int64 + + if options.onProgress != nil { + size = getReaderSize(r) + } + for { n, err := r.Read(buf) - if n > 0 { - if err := self.CodexUploadChunk(sessionId, buf[:n]); err != nil { - return "", err - } - } if err == io.EOF { break } if err != nil { - self.CodexUploadCancel(sessionId) + if cancelErr := self.CodexUploadCancel(sessionId); cancelErr != nil { + return "", fmt.Errorf("failed to upload chunk %v and failed to cancel upload session %v", err, cancelErr) + } return "", err } @@ -656,6 +659,25 @@ func (self CodexNode) CodexUploadReader(options CodexUploadOptions, r io.Reader) if n == 0 { break } + + if err := self.CodexUploadChunk(sessionId, buf[:n]); err != nil { + if cancelErr := self.CodexUploadCancel(sessionId); cancelErr != nil { + return "", fmt.Errorf("failed to upload chunk %v and failed to cancel upload session %v", err, cancelErr) + } + + return "", err + } + + total += n + if options.onProgress != nil && size > 0 { + percent := float64(total) / float64(size) * 100.0 + // The last block could be a bit over the size due to padding + // on the chunk size. + if percent > 100.0 { + percent = 100.0 + } + options.onProgress(n, total, percent, nil) + } } return self.CodexUploadFinalize(sessionId) @@ -670,9 +692,11 @@ func (self CodexNode) CodexUploadReaderAsync(options CodexUploadOptions, r io.Re func (self CodexNode) CodexUploadFile(options CodexUploadOptions) (string, error) { bridge := newBridgeCtx() + defer bridge.free() if options.onProgress != nil { stat, err := os.Stat(options.filepath) + if err != nil { return "", err } @@ -681,7 +705,7 @@ func (self CodexNode) CodexUploadFile(options CodexUploadOptions) (string, error total := 0 if size > 0 { - bridge.onProgress = func(read int) { + bridge.onProgress = func(read int, _ []byte) { if read == 0 { return } @@ -695,14 +719,11 @@ func (self CodexNode) CodexUploadFile(options CodexUploadOptions) (string, error percent = 100.0 } - options.onProgress(read, int(size), percent) + options.onProgress(read, int(size), percent, nil) } } } - var cFilePath = C.CString(options.filepath) - defer C.free(unsafe.Pointer(cFilePath)) - sessionId, err := self.CodexUploadInit(&options) if err != nil { return "", err @@ -725,27 +746,104 @@ func (self CodexNode) CodexUploadFileAsync(options CodexUploadOptions, onDone fu }() } -func (self CodexNode) CodexUploadSubscribe(sessionId string, onProgress func(read int)) error { +func (self CodexNode) CodexDownloadLocal(cid string, chunkSize int, w io.Writer) error { bridge := newBridgeCtx() + defer bridge.free() - bridge.onProgress = onProgress + bridge.onProgress = func(read int, chunk []byte) { + if read == 0 { + return + } - var cSessionId = C.CString(sessionId) - defer C.free(unsafe.Pointer(cSessionId)) - - log.Println("Subscribing to upload progress...") - - if C.cGoCodexUploadSubscribe(self.ctx, cSessionId, bridge.resp) != C.RET_OK { - return bridge.CallError("cGoCodexUploadSubscribe") + if _, err := w.Write(chunk); err != nil { + log.Println(err) + } } - go func() { - if _, err := bridge.wait(); err != nil { - log.Println("Error in CodexUploadSubscribe:", err) - } - }() + var cCid = C.CString(cid) + defer C.free(unsafe.Pointer(cCid)) - return nil + if chunkSize == 0 { + chunkSize = defaultBlockSize + } + var cChunkSize = C.size_t(chunkSize) + + if C.cGoCodexDownloadLocal(self.ctx, cCid, cChunkSize, bridge.resp) != C.RET_OK { + return bridge.CallError("cGoCodexDownloadLocal") + } + + _, err := bridge.wait() + + return err +} + +func (self CodexNode) CodexDownloadLocalAsync(cid string, chunkSize int, w io.Writer, onDone func(error)) { + go func() { + err := self.CodexDownloadLocal(cid, chunkSize, w) + onDone(err) + }() +} + +func (self CodexNode) CodexDownloadInit(cid string, chunkSize int, local bool) error { + bridge := newBridgeCtx() + defer bridge.free() + + var cCid = C.CString(cid) + defer C.free(unsafe.Pointer(cCid)) + + if chunkSize == 0 { + chunkSize = defaultBlockSize + } + var cChunkSize = C.size_t(chunkSize) + var cLocal = C.bool(local) + + if C.cGoCodexDownloadInit(self.ctx, cCid, cChunkSize, cLocal, bridge.resp) != C.RET_OK { + return bridge.CallError("cGoCodexDownloadInit") + } + + _, err := bridge.wait() + + return err +} + +func (self CodexNode) CodexDownloadChunk(cid string) ([]byte, error) { + bridge := newBridgeCtx() + defer bridge.free() + + var bytes []byte + + bridge.onProgress = func(read int, chunk []byte) { + bytes = chunk + } + + var cCid = C.CString(cid) + defer C.free(unsafe.Pointer(cCid)) + + if C.cGoCodexDownloadChunk(self.ctx, cCid, bridge.resp) != C.RET_OK { + return nil, bridge.CallError("cGoCodexDownloadChunk") + } + + if _, err := bridge.wait(); err != nil { + return nil, err + } + + return bytes, nil +} + +func (self CodexNode) CodexDownloadCancel(cid string) error { + bridge := newBridgeCtx() + defer bridge.free() + + var cCid = C.CString(cid) + defer C.free(unsafe.Pointer(cCid)) + + if C.cGoCodexDownloadCancel(self.ctx, cCid, bridge.resp) != C.RET_OK { + return bridge.CallError("cGoCodexDownloadCancel") + } + + _, err := bridge.wait() + + return err } func (self CodexNode) CodexStart() error { @@ -757,6 +855,7 @@ func (self CodexNode) CodexStart() error { } _, err := bridge.wait() + return err } @@ -769,7 +868,6 @@ func (self CodexNode) CodexStartAsync(onDone func(error)) { func (self CodexNode) CodexStop() error { bridge := newBridgeCtx() - defer bridge.free() if C.cGoCodexStop(self.ctx, bridge.resp) != C.RET_OK { return bridge.CallError("cGoCodexStop") @@ -781,7 +879,6 @@ func (self CodexNode) CodexStop() error { func (self CodexNode) CodexDestroy() error { bridge := newBridgeCtx() - defer bridge.free() if C.cGoCodexDestroy(self.ctx, bridge.resp) != C.RET_OK { return bridge.CallError("cGoCodexDestroy") @@ -851,6 +948,8 @@ func main() { log.Println("Codex started...") + // for i := 0; i < 150; i++ { + debug, err := node.CodexDebug() if err != nil { log.Fatal("Error happened:", err.Error()) @@ -908,7 +1007,11 @@ func main() { log.Println("Codex Upload Finalized, cid:", cid) buf := bytes.NewBuffer([]byte("Hello World!")) - cid, err = node.CodexUploadReader(CodexUploadOptions{filepath: "hello.txt", onProgress: func(read, total int, percent float64) { + cid, err = node.CodexUploadReader(CodexUploadOptions{filepath: "hello.txt", onProgress: func(read, total int, percent float64, err error) { + if err != nil { + log.Fatal("Error happened during upload: %v\n", err) + } + log.Printf("Uploaded %d bytes, total %d bytes (%.2f%%)\n", read, total, percent) }}, buf) if err != nil { @@ -921,12 +1024,15 @@ func main() { if err != nil { log.Fatal("Error happened:", err.Error()) } - // Choose a big file to see the progress logs filepath := path.Join(current, "examples", "golang", "hello.txt") //filepath := path.Join(current, "examples", "golang", "discord-0.0.109.deb") - options := CodexUploadOptions{filepath: filepath, onProgress: func(read, total int, percent float64) { + options := CodexUploadOptions{filepath: filepath, onProgress: func(read, total int, percent float64, err error) { + if err != nil { + log.Fatal("Error happened during upload: %v\n", err) + } + log.Printf("Uploaded %d bytes, total %d bytes (%.2f%%)\n", read, total, percent) }} @@ -938,6 +1044,38 @@ func main() { log.Println("Codex Upload File finalized, cid: .", cid) + f, err := os.Create("hello.loaded.txt") + if err != nil { + log.Fatal(err) + } + defer f.Close() + + // log.Println("Codex Download Local starting... attempt", i+1) + + if err := node.CodexDownloadLocal(cid, 0, f); err != nil { + log.Fatal("Error happened:", err.Error()) + } + + log.Println("Codex Download Local finished.") + + // log.Println("Codex Download Init starting... attempt", i+1) + + if err := node.CodexDownloadInit(cid, 0, true); err != nil { + log.Fatal("Error happened:", err.Error()) + } + + log.Println("Codex Download Init finished.") + + // log.Println("Codex Download Chunk starting... attempt", i+1) + + chunk, err := node.CodexDownloadChunk(cid) + if err != nil { + log.Fatal("Error happened:", err.Error()) + } + + log.Println("Codex Download Chunk finished. Size:", len(chunk)) + // } + // err = node.CodexConnect(peerId, []string{}) // if err != nil { // log.Fatal("Error happened:", err.Error()) diff --git a/library/codex_thread_requests/codex_thread_request.nim b/library/codex_thread_requests/codex_thread_request.nim index e717554b..d4a1e08c 100644 --- a/library/codex_thread_requests/codex_thread_request.nim +++ b/library/codex_thread_requests/codex_thread_request.nim @@ -11,6 +11,7 @@ import ./requests/node_info_request import ./requests/node_debug_request import ./requests/node_p2p_request import ./requests/node_upload_request +import ./requests/node_download_request from ../../codex/codex import CodexServer @@ -20,6 +21,7 @@ type RequestType* {.pure.} = enum DEBUG P2P UPLOAD + DOWNLOAD type CodexThreadRequest* = object reqType: RequestType @@ -94,11 +96,22 @@ proc process*( cast[ptr NodeDebugRequest](request[].reqContent).process(codex) of P2P: cast[ptr NodeP2PRequest](request[].reqContent).process(codex) + of DOWNLOAD: + let onChunk = proc(bytes: seq[byte]) = + if bytes.len > 0: + request[].callback( + RET_PROGRESS, + cast[ptr cchar](unsafeAddr bytes[0]), + cast[csize_t](bytes.len), + request[].userData, + ) + + cast[ptr NodeDownloadRequest](request[].reqContent).process(codex, onChunk) of UPLOAD: - let onUploadProgress = proc(bytes: int) = + let onBlockReceived = proc(bytes: int) = request[].callback(RET_PROGRESS, nil, cast[csize_t](bytes), request[].userData) - cast[ptr NodeUploadRequest](request[].reqContent).process(codex, onUploadProgress) + cast[ptr NodeUploadRequest](request[].reqContent).process(codex, onBlockReceived) handleRes(await retFut, request) diff --git a/library/codex_thread_requests/requests/node_download_request.nim b/library/codex_thread_requests/requests/node_download_request.nim new file mode 100644 index 00000000..a883348d --- /dev/null +++ b/library/codex_thread_requests/requests/node_download_request.nim @@ -0,0 +1,245 @@ +{.push raises: [].} + +## This file contains the download request. + +import std/[options, streams] +import chronos +import chronicles +import libp2p/stream/[lpstream] +import ../../alloc +import ../../../codex/units +import ../../../codex/codextypes + +from ../../../codex/codex import CodexServer, node +from ../../../codex/node import retrieve +from libp2p import Cid, init, `$` + +logScope: + topics = "codexlib codexlibdownload" + +type NodeDownloadMsgType* = enum + INIT + LOCAL + NETWORK + CHUNK + CANCEL + +type OnChunkHandler = proc(bytes: seq[byte]): void {.gcsafe, raises: [].} + +type NodeDownloadRequest* = object + operation: NodeDownloadMsgType + cid: cstring + chunkSize: csize_t + local: bool + +type + DownloadSessionId* = string + DownloadSessionCount* = int + DownloadSession* = object + stream: LPStream + chunkSize: int + +var downloadSessions {.threadvar.}: Table[DownloadSessionId, DownloadSession] + +proc createShared*( + T: type NodeDownloadRequest, + op: NodeDownloadMsgType, + cid: cstring = "", + chunkSize: csize_t = 0, + local: bool = false, +): ptr type T = + var ret = createShared(T) + ret[].operation = op + ret[].cid = cid.alloc() + ret[].chunkSize = chunkSize + ret[].local = local + + return ret + +proc destroyShared(self: ptr NodeDownloadRequest) = + deallocShared(self) + +proc init( + codex: ptr CodexServer, + cCid: cstring = "", + chunkSize: csize_t = 0, + local: bool = true, +): Future[Result[string, string]] {.async: (raises: []).} = + if downloadSessions.contains($cCid): + return ok("Download session already exists.") + + let cid = Cid.init($cCid) + if cid.isErr: + return err("Failed to download locally: cannot parse cid: " & $cCid) + + let node = codex[].node + var stream: LPStream + + try: + let res = await node.retrieve(cid.get(), local) + if res.isErr(): + return err("Failed to init the download: " & res.error.msg) + stream = res.get() + except CancelledError: + downloadSessions.del($cCid) + return err("Failed to init the download: download cancelled.") + + let blockSize = if chunkSize.int > 0: chunkSize.int else: DefaultBlockSize.int + downloadSessions[$cCid] = DownloadSession(stream: stream, chunkSize: blockSize) + + return ok("") + +proc chunk( + codex: ptr CodexServer, cid: cstring = "", onChunk: OnChunkHandler +): Future[Result[string, string]] {.async: (raises: []).} = + if not downloadSessions.contains($cid): + return err("Failed to download chunk: no session for cid " & $cid) + + var session: DownloadSession + try: + session = downloadSessions[$cid] + except KeyError: + return err("Failed to download chunk: no session for cid " & $cid) + + let stream = session.stream + let chunkSize = session.chunkSize + + if stream.atEof: + return ok("") + + var buf = newSeq[byte](chunkSize) + + try: + let read = await stream.readOnce(addr buf[0], buf.len) + buf.setLen(read) + except LPStreamError as e: + await stream.close() + downloadSessions.del($cid) + return err("Failed to download chunk: " & $e.msg) + except CancelledError: + await stream.close() + downloadSessions.del($cid) + return err("Failed to download chunk: download cancelled.") + + if buf.len <= 0: + return err("Failed to download chunk: no data") + + onChunk(buf) + + return ok("") + +proc streamFile( + codex: ptr CodexServer, + cid: Cid, + local: bool = true, + onChunk: OnChunkHandler, + chunkSize: csize_t, +): Future[Result[string, string]] {.async: (raises: [CancelledError]).} = + let node = codex[].node + + let res = await node.retrieve(cid, local = local) + if res.isErr(): + return err("Failed to retrieve CID: " & res.error.msg) + + let stream = res.get() + + if stream.atEof: + return err("Failed to retrieve CID: empty stream.") + + let blockSize = if chunkSize.int > 0: chunkSize.int else: DefaultBlockSize.int + var buf = newSeq[byte](blockSize) + + var read = 0 + try: + while not stream.atEof: + let read = await stream.readOnce(addr buf[0], buf.len) + buf.setLen(read) + + if buf.len <= 0: + break + + if onChunk != nil: + onChunk(buf) + except LPStreamError as e: + return err("Failed to stream file: " & $e.msg) + finally: + await stream.close() + downloadSessions.del($cid) + + return ok("") + +proc local( + codex: ptr CodexServer, cCid: cstring, chunkSize: csize_t, onChunk: OnChunkHandler +): Future[Result[string, string]] {.raises: [], async: (raises: []).} = + let node = codex[].node + + let cid = Cid.init($cCid) + if cid.isErr: + return err("Failed to download locally: cannot parse cid: " & $cCid) + + try: + let local = true + let res = await codex.streamFile(cid.get(), true, onChunk, chunkSize) + if res.isErr: + return err($res.error) + except CancelledError: + return err("Failed to download locally: download cancelled.") + + return ok("") + +proc cancel( + codex: ptr CodexServer, cCid: cstring +): Future[Result[string, string]] {.raises: [], async: (raises: []).} = + if not downloadSessions.contains($cCid): + return err("Failed to download chunk: no session for cid " & $cCid) + + var session: DownloadSession + try: + session = downloadSessions[$cCid] + except KeyError: + return err("Failed to download chunk: no session for cid " & $cCid) + + let stream = session.stream + await stream.close() + downloadSessions.del($cCid) + + return ok("") + +proc process*( + self: ptr NodeDownloadRequest, codex: ptr CodexServer, onChunk: OnChunkHandler +): Future[Result[string, string]] {.async: (raises: []).} = + defer: + destroyShared(self) + + case self.operation + of NodeDownloadMsgType.INIT: + let res = (await init(codex, self.cid, self.chunkSize, self.local)) + if res.isErr: + error "Failed to INIT.", error = res.error + return err($res.error) + return res + of NodeDownloadMsgType.LOCAL: + let res = (await local(codex, self.cid, self.chunkSize, onChunk)) + if res.isErr: + error "Failed to LOCAL.", error = res.error + return err($res.error) + return res + of NodeDownloadMsgType.NETWORK: + return err("NETWORK download not implemented yet.") + # let res = (await local(codex, self.cid, self.onChunk2, self.chunkSize, onChunk)) + # if res.isErr: + # error "Failed to NETWORK.", error = res.error + # return err($res.error) + # return res + of NodeDownloadMsgType.CHUNK: + let res = (await chunk(codex, self.cid, onChunk)) + if res.isErr: + error "Failed to CHUNK.", error = res.error + return err($res.error) + return res + of NodeDownloadMsgType.CANCEL: + let res = (await cancel(codex, self.cid)) + if res.isErr: + error "Failed to CANCEL.", error = res.error + return err($res.error) + return res diff --git a/library/libcodex.nim b/library/libcodex.nim index 05bfc3e9..66de2cd1 100644 --- a/library/libcodex.nim +++ b/library/libcodex.nim @@ -34,6 +34,7 @@ import ./codex_thread_requests/requests/node_info_request import ./codex_thread_requests/requests/node_debug_request import ./codex_thread_requests/requests/node_p2p_request import ./codex_thread_requests/requests/node_upload_request +import ./codex_thread_requests/requests/node_download_request import ./ffi_types from ../codex/conf import codexVersion, updateLogLevel @@ -342,15 +343,8 @@ proc codex_upload_file( initializeLibrary() checkLibcodexParams(ctx, callback, userData) - let onProgress = proc( - bytes: int - ): Future[void] {.async: (raises: [CancelledError]).} = - if userData != nil: - callback(RET_PROGRESS, nil, cast[csize_t](bytes), userData) - - let reqContent = NodeUploadRequest.createShared( - NodeUploadMsgType.FILE, sessionId = sessionId, onProgress = onProgress - ) + let reqContent = + NodeUploadRequest.createShared(NodeUploadMsgType.FILE, sessionId = sessionId) let res = codex_context.sendRequestToCodexThread( ctx, RequestType.UPLOAD, reqContent, callback, userData @@ -358,30 +352,74 @@ proc codex_upload_file( return callback.okOrError(res, userData) -proc codex_upload_subscribe( +proc codex_download_init( ctx: ptr CodexContext, - sessionId: cstring, + cid: cstring, + chunkSize: csize_t, + local: bool, callback: CodexCallback, userData: pointer, ): cint {.dynlib, exportc.} = initializeLibrary() checkLibcodexParams(ctx, callback, userData) - let onProgress = proc( - bytes: int - ): Future[void] {.async: (raises: [CancelledError]).} = - if userData != nil: - callback(RET_PROGRESS, nil, cast[csize_t](bytes), userData) - - let reqContent = NodeUploadRequest.createShared( - NodeUploadMsgType.SUBSCRIBE, sessionId = sessionId, onProgress = onProgress + let req = NodeDownloadRequest.createShared( + NodeDownloadMsgType.INIT, cid = cid, chunkSize = chunkSize, local = local ) let res = codex_context.sendRequestToCodexThread( - ctx, RequestType.UPLOAD, reqContent, callback, userData + ctx, RequestType.DOWNLOAD, req, callback, userData ) - return callback.okOrError(res, userData) + result = callback.okOrError(res, userData) + +proc codex_download_chunk( + ctx: ptr CodexContext, cid: cstring, callback: CodexCallback, userData: pointer +): cint {.dynlib, exportc.} = + initializeLibrary() + checkLibcodexParams(ctx, callback, userData) + + let req = NodeDownloadRequest.createShared(NodeDownloadMsgType.CHUNK, cid = cid) + + let res = codex_context.sendRequestToCodexThread( + ctx, RequestType.DOWNLOAD, req, callback, userData + ) + + result = callback.okOrError(res, userData) + +proc codex_download_local( + ctx: ptr CodexContext, + cid: cstring, + chunkSize: csize_t, + callback: CodexCallback, + userData: pointer, +): cint {.dynlib, exportc.} = + initializeLibrary() + checkLibcodexParams(ctx, callback, userData) + + let req = NodeDownloadRequest.createShared( + NodeDownloadMsgType.LOCAL, cid = cid, chunkSize = chunkSize + ) + + let res = codex_context.sendRequestToCodexThread( + ctx, RequestType.DOWNLOAD, req, callback, userData + ) + + result = callback.okOrError(res, userData) + +proc codex_download_cancel( + ctx: ptr CodexContext, cid: cstring, callback: CodexCallback, userData: pointer +): cint {.dynlib, exportc.} = + initializeLibrary() + checkLibcodexParams(ctx, callback, userData) + + let req = NodeDownloadRequest.createShared(NodeDownloadMsgType.CANCEL, cid = cid) + + let res = codex_context.sendRequestToCodexThread( + ctx, RequestType.DOWNLOAD, req, callback, userData + ) + + result = callback.okOrError(res, userData) proc codex_start( ctx: ptr CodexContext, callback: CodexCallback, userData: pointer