diff --git a/examples/golang/codex.go b/examples/golang/codex.go index 8d6ecabd..f921b95f 100644 --- a/examples/golang/codex.go +++ b/examples/golang/codex.go @@ -24,11 +24,7 @@ package main uintptr_t h; } Resp; - static void* allocResp() { - return calloc(1, sizeof(Resp)); - } - - static void* allocRespWithHandle(uintptr_t h) { + static void* allocResp(uintptr_t h) { Resp* r = (Resp*)calloc(1, sizeof(Resp)); r->h = h; return r; @@ -108,6 +104,22 @@ package main return codex_peer_debug(codexCtx, peerId, (CodexCallback) callback, resp); } + static int cGoCodexUploadInit(void* codexCtx, char* mimetype, char* filename, void* resp) { + return codex_upload_init(codexCtx, mimetype, filename, (CodexCallback) callback, resp); + } + + static int cGoCodexUploadChunk(void* codexCtx, char* sessionId, const uint8_t* chunk, size_t len, void* resp) { + return codex_upload_chunk(codexCtx, sessionId, chunk, len, (CodexCallback) callback, resp); + } + + static int cGoCodexUploadFinalize(void* codexCtx, char* sessionId, void* resp) { + return codex_upload_finalize(codexCtx, sessionId, (CodexCallback) callback, resp); + } + + static int cGoCodexUploadCancel(void* codexCtx, char* sessionId, void* resp) { + return codex_upload_cancel(codexCtx, sessionId, (CodexCallback) callback, resp); + } + static int cGoCodexStart(void* codexCtx, void* resp) { return codex_start(codexCtx, (CodexCallback) callback, resp); } @@ -209,9 +221,6 @@ type RestPeerRecord struct { Addresses []string `json:"addresses,omitempty"` } -// peerId* {.serialize.}: PeerId -// seqNo* {.serialize.}: uint64 -// addresses* {.serialize.}: seq[AddressInfo] type RestNode struct { NodeId string `json:"nodeId"` PeerId string `json:"peerId"` @@ -251,7 +260,7 @@ func newBridgeCtx() *bridgeCtx { bridge := &bridgeCtx{wg: &wg} bridge.h = cgo.NewHandle(bridge) - bridge.resp = C.allocRespWithHandle(C.uintptr_t(uintptr(bridge.h))) + bridge.resp = C.allocResp(C.uintptr_t(uintptr(bridge.h))) return bridge } @@ -490,6 +499,72 @@ func (self *CodexNode) CodexPeerDebug(peerId string) (RestPeerRecord, error) { return record, err } +func (self *CodexNode) CodexUploadInit(mimetype, filename string) (string, error) { + bridge := newBridgeCtx() + defer bridge.free() + + var cMimetype = C.CString(mimetype) + defer C.free(unsafe.Pointer(cMimetype)) + + var cFilename = C.CString(filename) + defer C.free(unsafe.Pointer(cFilename)) + + if C.cGoCodexUploadInit(self.ctx, cMimetype, cFilename, bridge.resp) != C.RET_OK { + return "", bridge.CallError("cGoCodexUploadInit") + } + + return bridge.wait() +} + +func (self *CodexNode) CodexUploadChunk(sessionId string, chunk []byte) error { + bridge := newBridgeCtx() + defer bridge.free() + + var cSessionId = C.CString(sessionId) + defer C.free(unsafe.Pointer(cSessionId)) + + var cChunkPtr *C.uint8_t + if len(chunk) > 0 { + cChunkPtr = (*C.uint8_t)(unsafe.Pointer(&chunk[0])) + } + + if C.cGoCodexUploadChunk(self.ctx, cSessionId, cChunkPtr, C.size_t(len(chunk)), bridge.resp) != C.RET_OK { + return bridge.CallError("cGoCodexUploadChunk") + } + + _, err := bridge.wait() + return err +} + +func (self *CodexNode) CodexUploadFinalize(sessionId string) (string, error) { + bridge := newBridgeCtx() + defer bridge.free() + + var cSessionId = C.CString(sessionId) + defer C.free(unsafe.Pointer(cSessionId)) + + if C.cGoCodexUploadFinalize(self.ctx, cSessionId, bridge.resp) != C.RET_OK { + return "", bridge.CallError("cGoCodexUploadFinalize") + } + + return bridge.wait() +} + +func (self *CodexNode) CodexUploadCancel(sessionId string) error { + bridge := newBridgeCtx() + defer bridge.free() + + var cSessionId = C.CString(sessionId) + defer C.free(unsafe.Pointer(cSessionId)) + + if C.cGoCodexUploadCancel(self.ctx, cSessionId, bridge.resp) != C.RET_OK { + return bridge.CallError("cGoCodexUploadCancel") + } + + _, err := bridge.wait() + return err +} + func (self *CodexNode) CodexStart() error { bridge := newBridgeCtx() defer bridge.free() @@ -635,6 +710,30 @@ func main() { log.Println("Codex Log Level set to TRACE") + sessionId, err := node.CodexUploadInit("text/plain", "hello.txt") + if err != nil { + log.Fatal("Error happened:", err.Error()) + } + + log.Println("Codex Upload Init sessionId:", sessionId) + + err = node.CodexUploadChunk(sessionId, []byte("Hello ")) + if err != nil { + log.Fatal("Error happened:", err.Error()) + } + + err = node.CodexUploadChunk(sessionId, []byte("World!")) + if err != nil { + log.Fatal("Error happened:", err.Error()) + } + + cid, err := node.CodexUploadFinalize(sessionId) + if err != nil { + log.Fatal("Error happened:", err.Error()) + } + + log.Println("Codex Upload Finalized, cid:", cid) + // err = node.CodexConnect(peerId, []string{}) // if err != nil { // log.Fatal("Error happened:", err.Error()) diff --git a/library/codex_thread_requests/codex_thread_request.nim b/library/codex_thread_requests/codex_thread_request.nim index c49622b1..b0d789ab 100644 --- a/library/codex_thread_requests/codex_thread_request.nim +++ b/library/codex_thread_requests/codex_thread_request.nim @@ -10,6 +10,7 @@ import ./requests/node_lifecycle_request import ./requests/node_info_request import ./requests/node_debug_request import ./requests/node_p2p_request +import ./requests/node_upload_request from ../../codex/codex import CodexServer @@ -18,6 +19,7 @@ type RequestType* {.pure.} = enum INFO DEBUG P2P + UPLOAD type CodexThreadRequest* = object reqType: RequestType @@ -92,6 +94,8 @@ proc process*( cast[ptr NodeDebugRequest](request[].reqContent).process(codex) of P2P: cast[ptr NodeP2PRequest](request[].reqContent).process(codex) + of UPLOAD: + cast[ptr NodeUploadRequest](request[].reqContent).process(codex) handleRes(await retFut, request) diff --git a/library/codex_thread_requests/requests/node_upload_request.nim b/library/codex_thread_requests/requests/node_upload_request.nim new file mode 100644 index 00000000..f34dd94b --- /dev/null +++ b/library/codex_thread_requests/requests/node_upload_request.nim @@ -0,0 +1,181 @@ +## This file contains the lifecycle request type that will be handled. + +import std/[options, os, mimetypes] +import chronos +import chronicles +import libp2p +import ../../alloc +import ../../../codex/streams +import ../../../codex/node + +from ../../../codex/codex import CodexServer, node + +type NodeUploadMsgType* = enum + INIT + CHUNK + FINALIZE + CANCEL + +type NodeUploadRequest* = object + operation: NodeUploadMsgType + mimetype: cstring + filename: cstring + sessionId: cstring + chunk: seq[byte] + +type + UploadSessionId* = string + UploadSessionCount* = int + UploadSession* = object + stream: BufferStream + fut: Future[?!Cid] + +var uploadSessions {.threadvar.}: Table[UploadSessionId, UploadSession] +var nexUploadSessionCount {.threadvar.}: UploadSessionCount + +proc createShared*( + T: type NodeUploadRequest, + op: NodeUploadMsgType, + mimetype: cstring = "", + filename: cstring = "", + sessionId: cstring = "", + chunk: seq[byte] = @[], +): ptr type T = + var ret = createShared(T) + ret[].operation = op + ret[].mimetype = mimetype.alloc() + ret[].filename = filename.alloc() + ret[].sessionId = sessionId.alloc() + ret[].chunk = chunk + return ret + +proc destroyShared(self: ptr NodeUploadRequest) = + deallocShared(self[].mimetype) + deallocShared(self[].filename) + deallocShared(self[].sessionId) + deallocShared(self) + +## Init upload create a new upload session and returns its ID. +## The session can be used to send chunks of data +## and to pause and resume the upload. +proc init( + codex: ptr CodexServer, mimetype: cstring, filename: cstring +): Future[Result[string, string]] {.async: (raises: []).} = + if $filename != "" and not isValidFilename($filename): + return err("Invalid filename") + + if $mimetype != "": + let m = newMimetypes() + if m.getExt($mimetype, "") == "": + return err("Invalid MIME type") + + let sessionId = $nexUploadSessionCount + nexUploadSessionCount.inc() + + let stream = BufferStream.new() + let lpStream = LPStream(stream) + let node = codex[].node + let fut = node.store(lpStream, ($filename).some, ($mimetype).some) + uploadSessions[sessionId] = UploadSession(stream: stream, fut: fut) + + return ok(sessionId) + +proc chunk( + codex: ptr CodexServer, sessionId: cstring, chunk: seq[byte] +): Future[Result[string, string]] {.async: (raises: []).} = + if not uploadSessions.contains($sessionId): + return err("Invalid session ID") + + try: + let session = uploadSessions[$sessionId] + await session.stream.pushData(chunk) + except KeyError as e: + return err("Invalid session ID") + except LPError as e: + return err("Stream error: " & $e.msg) + except CancelledError as e: + return err("Operation cancelled") + + return ok("") + +proc finalize( + codex: ptr CodexServer, sessionId: cstring +): Future[Result[string, string]] {.async: (raises: []).} = + if not uploadSessions.contains($sessionId): + return err("Invalid session ID") + + var session: UploadSession + try: + session = uploadSessions[$sessionId] + await session.stream.pushEof() + except KeyError as e: + return err("Invalid session ID") + except LPStreamError as e: + return err("Stream error: " & $e.msg) + except CancelledError as e: + return err("Operation cancelled") + # except LPError as e: + # return err("Stream error: " & $e.msg) + # except CancelledError as e: + # return err("Operation cancelled") + + try: + let res = await session.fut + if res.isErr: + return err("Upload failed: " & res.error().msg) + + return ok($res.get()) + except CatchableError as e: + return err("Upload failed: " & $e.msg) + finally: + uploadSessions.del($sessionId) + +proc cancel( + codex: ptr CodexServer, sessionId: cstring +): Future[Result[string, string]] {.async: (raises: []).} = + if not uploadSessions.contains($sessionId): + return err("Invalid session ID") + + try: + let session = uploadSessions[$sessionId] + session.fut.cancel() + except KeyError as e: + return err("Invalid session ID") + + uploadSessions.del($sessionId) + + return ok("") + +proc process*( + self: ptr NodeUploadRequest, codex: ptr CodexServer +): Future[Result[string, string]] {.async: (raises: []).} = + defer: + destroyShared(self) + + case self.operation + of NodeUploadMsgType.INIT: + let res = (await init(codex, self.mimetype, self.filename)) + if res.isErr: + error "INIT failed", error = res.error + return err($res.error) + return res + of NodeUploadMsgType.CHUNK: + let res = (await chunk(codex, self.sessionId, self.chunk)) + if res.isErr: + error "CHUNK failed", error = res.error + return err($res.error) + return res + of NodeUploadMsgType.FINALIZE: + let res = (await finalize(codex, self.sessionId)) + if res.isErr: + error "FINALIZE failed", error = res.error + return err($res.error) + return res + of NodeUploadMsgType.CANCEL: + let res = (await cancel(codex, self.sessionId)) + if res.isErr: + error "CANCEL failed", error = res.error + return err($res.error) + return res + + return ok("") diff --git a/library/libcodex.h b/library/libcodex.h index 621eebce..65b0c848 100644 --- a/library/libcodex.h +++ b/library/libcodex.h @@ -80,6 +80,34 @@ int codex_peer_debug( CodexCallback callback, void* userData); + +int codex_upload_init( + void* ctx, + const char* mimetype, + const char* filename, + CodexCallback callback, + void* userData); + +int codex_upload_chunk( + void* ctx, + const char* sessionId, + const uint8_t* chunk, + size_t len, + CodexCallback callback, + void* userData); + +int codex_upload_finalize( + void* ctx, + const char* sessionId, + CodexCallback callback, + void* userData); + +int codex_upload_cancel( + void* ctx, + const char* sessionId, + CodexCallback callback, + void* userData); + int codex_start(void* ctx, CodexCallback callback, void* userData); diff --git a/library/libcodex.nim b/library/libcodex.nim index accb2632..08964a4f 100644 --- a/library/libcodex.nim +++ b/library/libcodex.nim @@ -33,6 +33,7 @@ import ./codex_thread_requests/requests/node_lifecycle_request import ./codex_thread_requests/requests/node_info_request import ./codex_thread_requests/requests/node_debug_request import ./codex_thread_requests/requests/node_p2p_request +import ./codex_thread_requests/requests/node_upload_request import ./ffi_types from ../codex/conf import codexVersion, updateLogLevel @@ -281,6 +282,98 @@ proc codex_destroy( return RET_OK +proc codex_upload_init( + ctx: ptr CodexContext, + mimetype: cstring, + filename: cstring, + callback: CodexCallback, + userData: pointer, +): cint {.dynlib, exportc.} = + initializeLibrary() + checkLibcodexParams(ctx, callback, userData) + + let reqContent = NodeUploadRequest.createShared( + NodeUploadMsgType.INIT, mimetype = mimetype, filename = filename + ) + + codex_context.sendRequestToCodexThread( + ctx, RequestType.UPLOAD, reqContent, callback, userData + ).isOkOr: + let msg = "libcodex error: " & $error + callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) + return RET_ERR + + return RET_OK + +proc codex_upload_chunk( + ctx: ptr CodexContext, + sessionId: cstring, + data: ptr byte, + len: int, + callback: CodexCallback, + userData: pointer, +): cint {.dynlib, exportc.} = + initializeLibrary() + checkLibcodexParams(ctx, callback, userData) + + let chunk = newSeq[byte](len) + copyMem(addr chunk[0], data, len) + + let reqContent = NodeUploadRequest.createShared( + NodeUploadMsgType.CHUNK, sessionId = sessionId, chunk = chunk + ) + + codex_context.sendRequestToCodexThread( + ctx, RequestType.UPLOAD, reqContent, callback, userData + ).isOkOr: + let msg = "libcodex error: " & $error + callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) + return RET_ERR + + return RET_OK + +proc codex_upload_finalize( + ctx: ptr CodexContext, + sessionId: cstring, + callback: CodexCallback, + userData: pointer, +): cint {.dynlib, exportc.} = + initializeLibrary() + checkLibcodexParams(ctx, callback, userData) + + let reqContent = + NodeUploadRequest.createShared(NodeUploadMsgType.FINALIZE, sessionId = sessionId) + + codex_context.sendRequestToCodexThread( + ctx, RequestType.UPLOAD, reqContent, callback, userData + ).isOkOr: + let msg = "libcodex error: " & $error + callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) + return RET_ERR + + return RET_OK + +proc codex_upload_cancel( + ctx: ptr CodexContext, + sessionId: cstring, + callback: CodexCallback, + userData: pointer, +): cint {.dynlib, exportc.} = + initializeLibrary() + checkLibcodexParams(ctx, callback, userData) + + let reqContent = + NodeUploadRequest.createShared(NodeUploadMsgType.CANCEL, sessionId = sessionId) + + codex_context.sendRequestToCodexThread( + ctx, RequestType.UPLOAD, reqContent, callback, userData + ).isOkOr: + let msg = "libcodex error: " & $error + callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) + return RET_ERR + + return RET_OK + proc codex_start( ctx: ptr CodexContext, callback: CodexCallback, userData: pointer ): cint {.dynlib, exportc.} =