diff --git a/examples/golang/codex.go b/examples/golang/codex.go index b2cbe7a7..cb822964 100644 --- a/examples/golang/codex.go +++ b/examples/golang/codex.go @@ -104,11 +104,11 @@ package main return codex_peer_debug(codexCtx, peerId, (CodexCallback) callback, resp); } - static int cGoCodexUploadInit(void* codexCtx, char* mimetype, char* filename, void* resp) { - return codex_upload_init(codexCtx, mimetype, filename, (CodexCallback) callback, resp); + static int cGoCodexUploadInit(void* codexCtx, char* filepath, void* resp) { + return codex_upload_init(codexCtx, filepath, (CodexCallback) callback, resp); } - static int cGoCodexUploadChunk(void* codexCtx, char* sessionId, const uint32_t* chunk, size_t len, void* resp) { + static int cGoCodexUploadChunk(void* codexCtx, char* sessionId, const uint8_t* chunk, size_t len, void* resp) { return codex_upload_chunk(codexCtx, sessionId, chunk, len, (CodexCallback) callback, resp); } @@ -120,6 +120,10 @@ package main return codex_upload_cancel(codexCtx, sessionId, (CodexCallback) callback, resp); } + static int cGoCodexUploadFile(void* codexCtx, char* sessionId, size_t chunkSize, void* resp) { + return codex_upload_file(codexCtx, sessionId, chunkSize, (CodexCallback) callback, resp); + } + static int cGoCodexStart(void* codexCtx, void* resp) { return codex_start(codexCtx, (CodexCallback) callback, resp); } @@ -160,6 +164,7 @@ import ( "log" "os" "os/signal" + "path" "runtime/cgo" "sync" "syscall" @@ -495,17 +500,14 @@ func (self *CodexNode) CodexPeerDebug(peerId string) (RestPeerRecord, error) { return record, err } -func (self *CodexNode) CodexUploadInit(mimetype, filename string) (string, error) { +func (self *CodexNode) CodexUploadInit(filename string) (string, error) { bridge := newBridgeCtx() defer bridge.free() - var cMimetype = C.CString(mimetype) - defer C.free(unsafe.Pointer(cMimetype)) - var cFilename = C.CString(filename) defer C.free(unsafe.Pointer(cFilename)) - if C.cGoCodexUploadInit(self.ctx, cMimetype, cFilename, bridge.resp) != C.RET_OK { + if C.cGoCodexUploadInit(self.ctx, cFilename, bridge.resp) != C.RET_OK { return "", bridge.CallError("cGoCodexUploadInit") } @@ -519,9 +521,9 @@ func (self *CodexNode) CodexUploadChunk(sessionId string, chunk []byte) error { var cSessionId = C.CString(sessionId) defer C.free(unsafe.Pointer(cSessionId)) - var cChunkPtr *C.uint32_t + var cChunkPtr *C.uint8_t if len(chunk) > 0 { - cChunkPtr = (*C.uint32_t)(unsafe.Pointer(&chunk[0])) + cChunkPtr = (*C.uint8_t)(unsafe.Pointer(&chunk[0])) } if C.cGoCodexUploadChunk(self.ctx, cSessionId, cChunkPtr, C.size_t(len(chunk)), bridge.resp) != C.RET_OK { @@ -561,8 +563,8 @@ func (self *CodexNode) CodexUploadCancel(sessionId string) error { return err } -func (self *CodexNode) CodexUploadReader(mimetype, filename string, r io.Reader, chunkSize int) (string, error) { - sessionId, err := self.CodexUploadInit(mimetype, filename) +func (self *CodexNode) CodexUploadReader(filename string, r io.Reader, chunkSize int) (string, error) { + sessionId, err := self.CodexUploadInit(filename) if err != nil { return "", err } @@ -590,6 +592,52 @@ func (self *CodexNode) CodexUploadReader(mimetype, filename string, r io.Reader, return self.CodexUploadFinalize(sessionId) } +// TODO provide an async version of CodexUploadReader +// that starts a gorountine to upload the chunks +// and take: +// a callback to be called when done +// another callback to cancel the upload +func (self *CodexNode) CodexUploadReaderAsync(filename string, r io.Reader, chunkSize int) (string, error) { + return "", nil +} + +func (self *CodexNode) CodexUploadFile(filepath string, chunkSize int) (string, error) { + bridge := newBridgeCtx() + defer bridge.free() + + var cFilePath = C.CString(filepath) + defer C.free(unsafe.Pointer(cFilePath)) + + sessionId, err := self.CodexUploadInit(filepath) + if err != nil { + return "", err + } + + var cSessionId = C.CString(sessionId) + defer C.free(unsafe.Pointer(cSessionId)) + + var cChunkSize = C.size_t(0) + if chunkSize > 0 { + cChunkSize = C.size_t(chunkSize) + } + + if C.cGoCodexUploadFile(self.ctx, cSessionId, cChunkSize, bridge.resp) != C.RET_OK { + return "", bridge.CallError("cGoCodexUploadFile") + } + + cid, err := bridge.wait() + return cid, err +} + +// TODO provide an async version of CodexUploadFile +// that starts a gorountine to upload the file +// and take: +// a callback to be called when done +// another callback to cancel the upload +func (self *CodexNode) CodexUploadFileAsync(filepath string, chunkSize int) (string, error) { + return "", nil +} + func (self *CodexNode) CodexStart() error { bridge := newBridgeCtx() defer bridge.free() @@ -735,7 +783,7 @@ func main() { log.Println("Codex Log Level set to TRACE") - sessionId, err := node.CodexUploadInit("text/plain", "hello.txt") + sessionId, err := node.CodexUploadInit("hello.txt") if err != nil { log.Fatal("Error happened:", err.Error()) } @@ -760,13 +808,27 @@ func main() { log.Println("Codex Upload Finalized, cid:", cid) buf := bytes.NewBuffer([]byte("Hello World!")) - cid, err = node.CodexUploadReader("text/plain", "hello.txt", buf, 16*1024) + cid, err = node.CodexUploadReader("hello.txt", buf, 16*1024) if err != nil { log.Fatal("Error happened:", err.Error()) } log.Println("Codex Upload Finalized from reader, cid:", cid) + current, err := os.Getwd() + if err != nil { + log.Fatal("Error happened:", err.Error()) + } + + filepath := path.Join(current, "examples", "golang", "hello.txt") + log.Println("Uploading file:", filepath) + cid, err = node.CodexUploadFile(filepath, 1024) + if err != nil { + log.Fatal("Error happened:", err.Error()) + } + + log.Println("Codex Upload File finalized, cid: .", cid) + // err = node.CodexConnect(peerId, []string{}) // if err != nil { // log.Fatal("Error happened:", err.Error()) diff --git a/examples/golang/hello.txt b/examples/golang/hello.txt new file mode 100644 index 00000000..c57eff55 --- /dev/null +++ b/examples/golang/hello.txt @@ -0,0 +1 @@ +Hello World! \ No newline at end of file diff --git a/library/codex_thread_requests/requests/node_upload_request.nim b/library/codex_thread_requests/requests/node_upload_request.nim index 7cd4964e..747b66ac 100644 --- a/library/codex_thread_requests/requests/node_upload_request.nim +++ b/library/codex_thread_requests/requests/node_upload_request.nim @@ -1,6 +1,7 @@ ## This file contains the lifecycle request type that will be handled. +{.push raises: [].} -import std/[options, os, mimetypes] +import std/[options, os, mimetypes, streams] import chronos import chronicles import libp2p @@ -15,13 +16,14 @@ type NodeUploadMsgType* = enum CHUNK FINALIZE CANCEL + FILE type NodeUploadRequest* = object operation: NodeUploadMsgType - mimetype: cstring - filename: cstring sessionId: cstring + filepath: cstring chunk: seq[byte] + chunkSize: csize_t type UploadSessionId* = string @@ -29,6 +31,7 @@ type UploadSession* = object stream: BufferStream fut: Future[?!Cid] + filepath: string var uploadSessions {.threadvar.}: Table[UploadSessionId, UploadSession] var nexUploadSessionCount {.threadvar.}: UploadSessionCount @@ -36,38 +39,54 @@ var nexUploadSessionCount {.threadvar.}: UploadSessionCount proc createShared*( T: type NodeUploadRequest, op: NodeUploadMsgType, - mimetype: cstring = "", - filename: cstring = "", sessionId: cstring = "", + filepath: cstring = "", chunk: seq[byte] = @[], + chunkSize: csize_t = 0, ): ptr type T = var ret = createShared(T) ret[].operation = op - ret[].mimetype = mimetype.alloc() - ret[].filename = filename.alloc() ret[].sessionId = sessionId.alloc() + ret[].filepath = filepath.alloc() ret[].chunk = chunk + ret[].chunkSize = chunkSize return ret proc destroyShared(self: ptr NodeUploadRequest) = - deallocShared(self[].mimetype) - deallocShared(self[].filename) + deallocShared(self[].filepath) deallocShared(self[].sessionId) deallocShared(self) ## Init upload create a new upload session and returns its ID. ## The session can be used to send chunks of data ## and to pause and resume the upload. +## filepath can be the absolute path to a file to upload directly, +## or it can be the filename when the file will be uploaded via chunks. +## The mimetype is deduced from the filename extension. proc init( - codex: ptr CodexServer, mimetype: cstring, filename: cstring + codex: ptr CodexServer, filepath: cstring = "" ): Future[Result[string, string]] {.async: (raises: []).} = - if $filename != "" and not isValidFilename($filename): - return err("Invalid filename") + var filenameOpt, mimetypeOpt = string.none - if $mimetype != "": - let m = newMimetypes() - if m.getExt($mimetype, "") == "": - return err("Invalid MIME type") + if isAbsolute($filepath): + if not fileExists($filepath): + return err("File does not exist") + + if filepath != "": + let (_, name, ext) = splitFile($filepath) + + filenameOpt = (name & ext).some + + if ext != "": + let extNoDot = + if ext.len > 0: + ext[1 ..^ 1] + else: + "" + let mime = newMimetypes() + let mimetypeStr = mime.getMimetype(extNoDot, "") + + mimetypeOpt = if mimetypeStr == "": string.none else: mimetypeStr.some let sessionId = $nexUploadSessionCount nexUploadSessionCount.inc() @@ -75,8 +94,9 @@ proc init( let stream = BufferStream.new() let lpStream = LPStream(stream) let node = codex[].node - let fut = node.store(lpStream, ($filename).some, ($mimetype).some) - uploadSessions[sessionId] = UploadSession(stream: stream, fut: fut) + let fut = node.store(lpStream, filenameOpt, mimetypeOpt) + uploadSessions[sessionId] = + UploadSession(stream: stream, fut: fut, filepath: $filepath) return ok(sessionId) @@ -142,6 +162,48 @@ proc cancel( return ok("") +proc file( + codex: ptr CodexServer, sessionId: cstring, chunkSize: csize_t = 1024 +): Future[Result[string, string]] {.raises: [], async: (raises: []).} = + if not uploadSessions.contains($sessionId): + return err("Invalid session ID") + + let size = if chunkSize > 0: chunkSize else: 1024 + var buffer = newSeq[byte](size) + var session: UploadSession + + ## Here we certainly need to spawn a new thread to avoid blocking + ## the worker thread while reading the file. + try: + session = uploadSessions[$sessionId] + let fs = openFileStream(session.filepath) + + while true: + let bytesRead = fs.readData(addr buffer[0], buffer.len) + + if bytesRead == 0: + break + await session.stream.pushData(buffer[0 ..< bytesRead]) + + await session.stream.pushEof() + + let res = await session.fut + if res.isErr: + return err("Upload failed: " & res.error().msg) + + return ok($res.get()) + except KeyError as e: + return err("Invalid session ID") + except LPStreamError, IOError: + let e = getCurrentException() + return err("Stream error: " & $e.msg) + except CancelledError as e: + return err("Operation cancelled") + except CatchableError as e: + return err("Upload failed: " & $e.msg) + finally: + uploadSessions.del($sessionId) + proc process*( self: ptr NodeUploadRequest, codex: ptr CodexServer ): Future[Result[string, string]] {.async: (raises: []).} = @@ -150,7 +212,7 @@ proc process*( case self.operation of NodeUploadMsgType.INIT: - let res = (await init(codex, self.mimetype, self.filename)) + let res = (await init(codex, self.filepath)) if res.isErr: error "INIT failed", error = res.error return err($res.error) @@ -173,5 +235,11 @@ proc process*( error "CANCEL failed", error = res.error return err($res.error) return res + of NodeUploadMsgType.FILE: + let res = (await file(codex, self.sessionId, self.chunkSize)) + if res.isErr: + error "FILE failed", error = res.error + return err($res.error) + return res return ok("") diff --git a/library/libcodex.h b/library/libcodex.h index 811dc38a..d3e52a55 100644 --- a/library/libcodex.h +++ b/library/libcodex.h @@ -83,15 +83,14 @@ int codex_peer_debug( int codex_upload_init( void* ctx, - const char* mimetype, - const char* filename, + const char* filepath, CodexCallback callback, void* userData); int codex_upload_chunk( void* ctx, const char* sessionId, - const uint32_t* chunk, + const uint8_t* chunk, size_t len, CodexCallback callback, void* userData); @@ -108,6 +107,13 @@ int codex_upload_cancel( CodexCallback callback, void* userData); +int codex_upload_file( + void* ctx, + const char* sessionId, + size_t chunkSize, + CodexCallback callback, + void* userData); + int codex_start(void* ctx, CodexCallback callback, void* userData); diff --git a/library/libcodex.nim b/library/libcodex.nim index b11c8524..b720efbf 100644 --- a/library/libcodex.nim +++ b/library/libcodex.nim @@ -247,17 +247,12 @@ proc codex_destroy( return callback.success("", userData) proc codex_upload_init( - ctx: ptr CodexContext, - mimetype: cstring, - filename: cstring, - callback: CodexCallback, - userData: pointer, + ctx: ptr CodexContext, filepath: cstring, callback: CodexCallback, userData: pointer ): cint {.dynlib, exportc.} = init(ctx, callback, userData) - let reqContent = NodeUploadRequest.createShared( - NodeUploadMsgType.INIT, mimetype = mimetype, filename = filename - ) + let reqContent = + NodeUploadRequest.createShared(NodeUploadMsgType.INIT, filepath = filepath) let res = codex_context.sendRequestToCodexThread( ctx, RequestType.UPLOAD, reqContent, callback, userData ) @@ -268,7 +263,7 @@ proc codex_upload_chunk( ctx: ptr CodexContext, sessionId: cstring, data: ptr byte, - len: int, + len: csize_t, callback: CodexCallback, userData: pointer, ): cint {.dynlib, exportc.} = @@ -319,6 +314,25 @@ proc codex_upload_cancel( return callback.okOrError(res, userData) +proc codex_upload_file( + ctx: ptr CodexContext, + sessionId: cstring, + chunkSize: csize_t, + callback: CodexCallback, + userData: pointer, +): cint {.dynlib, exportc.} = + init(ctx, callback, userData) + + let reqContent = NodeUploadRequest.createShared( + NodeUploadMsgType.FILE, sessionId = sessionId, chunkSize = chunkSize + ) + + let res = codex_context.sendRequestToCodexThread( + ctx, RequestType.UPLOAD, reqContent, callback, userData + ) + + return callback.okOrError(res, userData) + proc codex_start( ctx: ptr CodexContext, callback: CodexCallback, userData: pointer ): cint {.dynlib, exportc.} =