Add upload file API

This commit is contained in:
Arnaud 2025-09-19 15:05:03 +02:00 committed by Eric
parent c742304e33
commit fa91e2efcc
No known key found for this signature in database
5 changed files with 196 additions and 45 deletions

View File

@ -104,11 +104,11 @@ package main
return codex_peer_debug(codexCtx, peerId, (CodexCallback) callback, resp);
}
static int cGoCodexUploadInit(void* codexCtx, char* mimetype, char* filename, void* resp) {
return codex_upload_init(codexCtx, mimetype, filename, (CodexCallback) callback, resp);
static int cGoCodexUploadInit(void* codexCtx, char* filepath, void* resp) {
return codex_upload_init(codexCtx, filepath, (CodexCallback) callback, resp);
}
static int cGoCodexUploadChunk(void* codexCtx, char* sessionId, const uint32_t* chunk, size_t len, void* resp) {
static int cGoCodexUploadChunk(void* codexCtx, char* sessionId, const uint8_t* chunk, size_t len, void* resp) {
return codex_upload_chunk(codexCtx, sessionId, chunk, len, (CodexCallback) callback, resp);
}
@ -120,6 +120,10 @@ package main
return codex_upload_cancel(codexCtx, sessionId, (CodexCallback) callback, resp);
}
static int cGoCodexUploadFile(void* codexCtx, char* sessionId, size_t chunkSize, void* resp) {
return codex_upload_file(codexCtx, sessionId, chunkSize, (CodexCallback) callback, resp);
}
static int cGoCodexStart(void* codexCtx, void* resp) {
return codex_start(codexCtx, (CodexCallback) callback, resp);
}
@ -160,6 +164,7 @@ import (
"log"
"os"
"os/signal"
"path"
"runtime/cgo"
"sync"
"syscall"
@ -495,17 +500,14 @@ func (self *CodexNode) CodexPeerDebug(peerId string) (RestPeerRecord, error) {
return record, err
}
func (self *CodexNode) CodexUploadInit(mimetype, filename string) (string, error) {
func (self *CodexNode) CodexUploadInit(filename string) (string, error) {
bridge := newBridgeCtx()
defer bridge.free()
var cMimetype = C.CString(mimetype)
defer C.free(unsafe.Pointer(cMimetype))
var cFilename = C.CString(filename)
defer C.free(unsafe.Pointer(cFilename))
if C.cGoCodexUploadInit(self.ctx, cMimetype, cFilename, bridge.resp) != C.RET_OK {
if C.cGoCodexUploadInit(self.ctx, cFilename, bridge.resp) != C.RET_OK {
return "", bridge.CallError("cGoCodexUploadInit")
}
@ -519,9 +521,9 @@ func (self *CodexNode) CodexUploadChunk(sessionId string, chunk []byte) error {
var cSessionId = C.CString(sessionId)
defer C.free(unsafe.Pointer(cSessionId))
var cChunkPtr *C.uint32_t
var cChunkPtr *C.uint8_t
if len(chunk) > 0 {
cChunkPtr = (*C.uint32_t)(unsafe.Pointer(&chunk[0]))
cChunkPtr = (*C.uint8_t)(unsafe.Pointer(&chunk[0]))
}
if C.cGoCodexUploadChunk(self.ctx, cSessionId, cChunkPtr, C.size_t(len(chunk)), bridge.resp) != C.RET_OK {
@ -561,8 +563,8 @@ func (self *CodexNode) CodexUploadCancel(sessionId string) error {
return err
}
func (self *CodexNode) CodexUploadReader(mimetype, filename string, r io.Reader, chunkSize int) (string, error) {
sessionId, err := self.CodexUploadInit(mimetype, filename)
func (self *CodexNode) CodexUploadReader(filename string, r io.Reader, chunkSize int) (string, error) {
sessionId, err := self.CodexUploadInit(filename)
if err != nil {
return "", err
}
@ -590,6 +592,52 @@ func (self *CodexNode) CodexUploadReader(mimetype, filename string, r io.Reader,
return self.CodexUploadFinalize(sessionId)
}
// TODO provide an async version of CodexUploadReader
// that starts a gorountine to upload the chunks
// and take:
// a callback to be called when done
// another callback to cancel the upload
func (self *CodexNode) CodexUploadReaderAsync(filename string, r io.Reader, chunkSize int) (string, error) {
return "", nil
}
func (self *CodexNode) CodexUploadFile(filepath string, chunkSize int) (string, error) {
bridge := newBridgeCtx()
defer bridge.free()
var cFilePath = C.CString(filepath)
defer C.free(unsafe.Pointer(cFilePath))
sessionId, err := self.CodexUploadInit(filepath)
if err != nil {
return "", err
}
var cSessionId = C.CString(sessionId)
defer C.free(unsafe.Pointer(cSessionId))
var cChunkSize = C.size_t(0)
if chunkSize > 0 {
cChunkSize = C.size_t(chunkSize)
}
if C.cGoCodexUploadFile(self.ctx, cSessionId, cChunkSize, bridge.resp) != C.RET_OK {
return "", bridge.CallError("cGoCodexUploadFile")
}
cid, err := bridge.wait()
return cid, err
}
// TODO provide an async version of CodexUploadFile
// that starts a gorountine to upload the file
// and take:
// a callback to be called when done
// another callback to cancel the upload
func (self *CodexNode) CodexUploadFileAsync(filepath string, chunkSize int) (string, error) {
return "", nil
}
func (self *CodexNode) CodexStart() error {
bridge := newBridgeCtx()
defer bridge.free()
@ -735,7 +783,7 @@ func main() {
log.Println("Codex Log Level set to TRACE")
sessionId, err := node.CodexUploadInit("text/plain", "hello.txt")
sessionId, err := node.CodexUploadInit("hello.txt")
if err != nil {
log.Fatal("Error happened:", err.Error())
}
@ -760,13 +808,27 @@ func main() {
log.Println("Codex Upload Finalized, cid:", cid)
buf := bytes.NewBuffer([]byte("Hello World!"))
cid, err = node.CodexUploadReader("text/plain", "hello.txt", buf, 16*1024)
cid, err = node.CodexUploadReader("hello.txt", buf, 16*1024)
if err != nil {
log.Fatal("Error happened:", err.Error())
}
log.Println("Codex Upload Finalized from reader, cid:", cid)
current, err := os.Getwd()
if err != nil {
log.Fatal("Error happened:", err.Error())
}
filepath := path.Join(current, "examples", "golang", "hello.txt")
log.Println("Uploading file:", filepath)
cid, err = node.CodexUploadFile(filepath, 1024)
if err != nil {
log.Fatal("Error happened:", err.Error())
}
log.Println("Codex Upload File finalized, cid: .", cid)
// err = node.CodexConnect(peerId, []string{})
// if err != nil {
// log.Fatal("Error happened:", err.Error())

View File

@ -0,0 +1 @@
Hello World!

View File

@ -1,6 +1,7 @@
## This file contains the lifecycle request type that will be handled.
{.push raises: [].}
import std/[options, os, mimetypes]
import std/[options, os, mimetypes, streams]
import chronos
import chronicles
import libp2p
@ -15,13 +16,14 @@ type NodeUploadMsgType* = enum
CHUNK
FINALIZE
CANCEL
FILE
type NodeUploadRequest* = object
operation: NodeUploadMsgType
mimetype: cstring
filename: cstring
sessionId: cstring
filepath: cstring
chunk: seq[byte]
chunkSize: csize_t
type
UploadSessionId* = string
@ -29,6 +31,7 @@ type
UploadSession* = object
stream: BufferStream
fut: Future[?!Cid]
filepath: string
var uploadSessions {.threadvar.}: Table[UploadSessionId, UploadSession]
var nexUploadSessionCount {.threadvar.}: UploadSessionCount
@ -36,38 +39,54 @@ var nexUploadSessionCount {.threadvar.}: UploadSessionCount
proc createShared*(
T: type NodeUploadRequest,
op: NodeUploadMsgType,
mimetype: cstring = "",
filename: cstring = "",
sessionId: cstring = "",
filepath: cstring = "",
chunk: seq[byte] = @[],
chunkSize: csize_t = 0,
): ptr type T =
var ret = createShared(T)
ret[].operation = op
ret[].mimetype = mimetype.alloc()
ret[].filename = filename.alloc()
ret[].sessionId = sessionId.alloc()
ret[].filepath = filepath.alloc()
ret[].chunk = chunk
ret[].chunkSize = chunkSize
return ret
proc destroyShared(self: ptr NodeUploadRequest) =
deallocShared(self[].mimetype)
deallocShared(self[].filename)
deallocShared(self[].filepath)
deallocShared(self[].sessionId)
deallocShared(self)
## Init upload create a new upload session and returns its ID.
## The session can be used to send chunks of data
## and to pause and resume the upload.
## filepath can be the absolute path to a file to upload directly,
## or it can be the filename when the file will be uploaded via chunks.
## The mimetype is deduced from the filename extension.
proc init(
codex: ptr CodexServer, mimetype: cstring, filename: cstring
codex: ptr CodexServer, filepath: cstring = ""
): Future[Result[string, string]] {.async: (raises: []).} =
if $filename != "" and not isValidFilename($filename):
return err("Invalid filename")
var filenameOpt, mimetypeOpt = string.none
if $mimetype != "":
let m = newMimetypes()
if m.getExt($mimetype, "") == "":
return err("Invalid MIME type")
if isAbsolute($filepath):
if not fileExists($filepath):
return err("File does not exist")
if filepath != "":
let (_, name, ext) = splitFile($filepath)
filenameOpt = (name & ext).some
if ext != "":
let extNoDot =
if ext.len > 0:
ext[1 ..^ 1]
else:
""
let mime = newMimetypes()
let mimetypeStr = mime.getMimetype(extNoDot, "")
mimetypeOpt = if mimetypeStr == "": string.none else: mimetypeStr.some
let sessionId = $nexUploadSessionCount
nexUploadSessionCount.inc()
@ -75,8 +94,9 @@ proc init(
let stream = BufferStream.new()
let lpStream = LPStream(stream)
let node = codex[].node
let fut = node.store(lpStream, ($filename).some, ($mimetype).some)
uploadSessions[sessionId] = UploadSession(stream: stream, fut: fut)
let fut = node.store(lpStream, filenameOpt, mimetypeOpt)
uploadSessions[sessionId] =
UploadSession(stream: stream, fut: fut, filepath: $filepath)
return ok(sessionId)
@ -142,6 +162,48 @@ proc cancel(
return ok("")
proc file(
codex: ptr CodexServer, sessionId: cstring, chunkSize: csize_t = 1024
): Future[Result[string, string]] {.raises: [], async: (raises: []).} =
if not uploadSessions.contains($sessionId):
return err("Invalid session ID")
let size = if chunkSize > 0: chunkSize else: 1024
var buffer = newSeq[byte](size)
var session: UploadSession
## Here we certainly need to spawn a new thread to avoid blocking
## the worker thread while reading the file.
try:
session = uploadSessions[$sessionId]
let fs = openFileStream(session.filepath)
while true:
let bytesRead = fs.readData(addr buffer[0], buffer.len)
if bytesRead == 0:
break
await session.stream.pushData(buffer[0 ..< bytesRead])
await session.stream.pushEof()
let res = await session.fut
if res.isErr:
return err("Upload failed: " & res.error().msg)
return ok($res.get())
except KeyError as e:
return err("Invalid session ID")
except LPStreamError, IOError:
let e = getCurrentException()
return err("Stream error: " & $e.msg)
except CancelledError as e:
return err("Operation cancelled")
except CatchableError as e:
return err("Upload failed: " & $e.msg)
finally:
uploadSessions.del($sessionId)
proc process*(
self: ptr NodeUploadRequest, codex: ptr CodexServer
): Future[Result[string, string]] {.async: (raises: []).} =
@ -150,7 +212,7 @@ proc process*(
case self.operation
of NodeUploadMsgType.INIT:
let res = (await init(codex, self.mimetype, self.filename))
let res = (await init(codex, self.filepath))
if res.isErr:
error "INIT failed", error = res.error
return err($res.error)
@ -173,5 +235,11 @@ proc process*(
error "CANCEL failed", error = res.error
return err($res.error)
return res
of NodeUploadMsgType.FILE:
let res = (await file(codex, self.sessionId, self.chunkSize))
if res.isErr:
error "FILE failed", error = res.error
return err($res.error)
return res
return ok("")

View File

@ -83,15 +83,14 @@ int codex_peer_debug(
int codex_upload_init(
void* ctx,
const char* mimetype,
const char* filename,
const char* filepath,
CodexCallback callback,
void* userData);
int codex_upload_chunk(
void* ctx,
const char* sessionId,
const uint32_t* chunk,
const uint8_t* chunk,
size_t len,
CodexCallback callback,
void* userData);
@ -108,6 +107,13 @@ int codex_upload_cancel(
CodexCallback callback,
void* userData);
int codex_upload_file(
void* ctx,
const char* sessionId,
size_t chunkSize,
CodexCallback callback,
void* userData);
int codex_start(void* ctx,
CodexCallback callback,
void* userData);

View File

@ -247,17 +247,12 @@ proc codex_destroy(
return callback.success("", userData)
proc codex_upload_init(
ctx: ptr CodexContext,
mimetype: cstring,
filename: cstring,
callback: CodexCallback,
userData: pointer,
ctx: ptr CodexContext, filepath: cstring, callback: CodexCallback, userData: pointer
): cint {.dynlib, exportc.} =
init(ctx, callback, userData)
let reqContent = NodeUploadRequest.createShared(
NodeUploadMsgType.INIT, mimetype = mimetype, filename = filename
)
let reqContent =
NodeUploadRequest.createShared(NodeUploadMsgType.INIT, filepath = filepath)
let res = codex_context.sendRequestToCodexThread(
ctx, RequestType.UPLOAD, reqContent, callback, userData
)
@ -268,7 +263,7 @@ proc codex_upload_chunk(
ctx: ptr CodexContext,
sessionId: cstring,
data: ptr byte,
len: int,
len: csize_t,
callback: CodexCallback,
userData: pointer,
): cint {.dynlib, exportc.} =
@ -319,6 +314,25 @@ proc codex_upload_cancel(
return callback.okOrError(res, userData)
proc codex_upload_file(
ctx: ptr CodexContext,
sessionId: cstring,
chunkSize: csize_t,
callback: CodexCallback,
userData: pointer,
): cint {.dynlib, exportc.} =
init(ctx, callback, userData)
let reqContent = NodeUploadRequest.createShared(
NodeUploadMsgType.FILE, sessionId = sessionId, chunkSize = chunkSize
)
let res = codex_context.sendRequestToCodexThread(
ctx, RequestType.UPLOAD, reqContent, callback, userData
)
return callback.okOrError(res, userData)
proc codex_start(
ctx: ptr CodexContext, callback: CodexCallback, userData: pointer
): cint {.dynlib, exportc.} =