Add upload feature

This commit is contained in:
Arnaud 2025-09-18 09:14:49 +02:00
parent 1754d4e509
commit 5b91976152
No known key found for this signature in database
GPG Key ID: B8FBC178F10CA7AE
5 changed files with 414 additions and 9 deletions

View File

@ -24,11 +24,7 @@ package main
uintptr_t h;
} Resp;
static void* allocResp() {
return calloc(1, sizeof(Resp));
}
static void* allocRespWithHandle(uintptr_t h) {
static void* allocResp(uintptr_t h) {
Resp* r = (Resp*)calloc(1, sizeof(Resp));
r->h = h;
return r;
@ -108,6 +104,22 @@ package main
return codex_peer_debug(codexCtx, peerId, (CodexCallback) callback, resp);
}
static int cGoCodexUploadInit(void* codexCtx, char* mimetype, char* filename, void* resp) {
return codex_upload_init(codexCtx, mimetype, filename, (CodexCallback) callback, resp);
}
static int cGoCodexUploadChunk(void* codexCtx, char* sessionId, const uint8_t* chunk, size_t len, void* resp) {
return codex_upload_chunk(codexCtx, sessionId, chunk, len, (CodexCallback) callback, resp);
}
static int cGoCodexUploadFinalize(void* codexCtx, char* sessionId, void* resp) {
return codex_upload_finalize(codexCtx, sessionId, (CodexCallback) callback, resp);
}
static int cGoCodexUploadCancel(void* codexCtx, char* sessionId, void* resp) {
return codex_upload_cancel(codexCtx, sessionId, (CodexCallback) callback, resp);
}
static int cGoCodexStart(void* codexCtx, void* resp) {
return codex_start(codexCtx, (CodexCallback) callback, resp);
}
@ -209,9 +221,6 @@ type RestPeerRecord struct {
Addresses []string `json:"addresses,omitempty"`
}
// peerId* {.serialize.}: PeerId
// seqNo* {.serialize.}: uint64
// addresses* {.serialize.}: seq[AddressInfo]
type RestNode struct {
NodeId string `json:"nodeId"`
PeerId string `json:"peerId"`
@ -251,7 +260,7 @@ func newBridgeCtx() *bridgeCtx {
bridge := &bridgeCtx{wg: &wg}
bridge.h = cgo.NewHandle(bridge)
bridge.resp = C.allocRespWithHandle(C.uintptr_t(uintptr(bridge.h)))
bridge.resp = C.allocResp(C.uintptr_t(uintptr(bridge.h)))
return bridge
}
@ -490,6 +499,72 @@ func (self *CodexNode) CodexPeerDebug(peerId string) (RestPeerRecord, error) {
return record, err
}
func (self *CodexNode) CodexUploadInit(mimetype, filename string) (string, error) {
bridge := newBridgeCtx()
defer bridge.free()
var cMimetype = C.CString(mimetype)
defer C.free(unsafe.Pointer(cMimetype))
var cFilename = C.CString(filename)
defer C.free(unsafe.Pointer(cFilename))
if C.cGoCodexUploadInit(self.ctx, cMimetype, cFilename, bridge.resp) != C.RET_OK {
return "", bridge.CallError("cGoCodexUploadInit")
}
return bridge.wait()
}
func (self *CodexNode) CodexUploadChunk(sessionId string, chunk []byte) error {
bridge := newBridgeCtx()
defer bridge.free()
var cSessionId = C.CString(sessionId)
defer C.free(unsafe.Pointer(cSessionId))
var cChunkPtr *C.uint8_t
if len(chunk) > 0 {
cChunkPtr = (*C.uint8_t)(unsafe.Pointer(&chunk[0]))
}
if C.cGoCodexUploadChunk(self.ctx, cSessionId, cChunkPtr, C.size_t(len(chunk)), bridge.resp) != C.RET_OK {
return bridge.CallError("cGoCodexUploadChunk")
}
_, err := bridge.wait()
return err
}
func (self *CodexNode) CodexUploadFinalize(sessionId string) (string, error) {
bridge := newBridgeCtx()
defer bridge.free()
var cSessionId = C.CString(sessionId)
defer C.free(unsafe.Pointer(cSessionId))
if C.cGoCodexUploadFinalize(self.ctx, cSessionId, bridge.resp) != C.RET_OK {
return "", bridge.CallError("cGoCodexUploadFinalize")
}
return bridge.wait()
}
func (self *CodexNode) CodexUploadCancel(sessionId string) error {
bridge := newBridgeCtx()
defer bridge.free()
var cSessionId = C.CString(sessionId)
defer C.free(unsafe.Pointer(cSessionId))
if C.cGoCodexUploadCancel(self.ctx, cSessionId, bridge.resp) != C.RET_OK {
return bridge.CallError("cGoCodexUploadCancel")
}
_, err := bridge.wait()
return err
}
func (self *CodexNode) CodexStart() error {
bridge := newBridgeCtx()
defer bridge.free()
@ -635,6 +710,30 @@ func main() {
log.Println("Codex Log Level set to TRACE")
sessionId, err := node.CodexUploadInit("text/plain", "hello.txt")
if err != nil {
log.Fatal("Error happened:", err.Error())
}
log.Println("Codex Upload Init sessionId:", sessionId)
err = node.CodexUploadChunk(sessionId, []byte("Hello "))
if err != nil {
log.Fatal("Error happened:", err.Error())
}
err = node.CodexUploadChunk(sessionId, []byte("World!"))
if err != nil {
log.Fatal("Error happened:", err.Error())
}
cid, err := node.CodexUploadFinalize(sessionId)
if err != nil {
log.Fatal("Error happened:", err.Error())
}
log.Println("Codex Upload Finalized, cid:", cid)
// err = node.CodexConnect(peerId, []string{})
// if err != nil {
// log.Fatal("Error happened:", err.Error())

View File

@ -10,6 +10,7 @@ import ./requests/node_lifecycle_request
import ./requests/node_info_request
import ./requests/node_debug_request
import ./requests/node_p2p_request
import ./requests/node_upload_request
from ../../codex/codex import CodexServer
@ -18,6 +19,7 @@ type RequestType* {.pure.} = enum
INFO
DEBUG
P2P
UPLOAD
type CodexThreadRequest* = object
reqType: RequestType
@ -92,6 +94,8 @@ proc process*(
cast[ptr NodeDebugRequest](request[].reqContent).process(codex)
of P2P:
cast[ptr NodeP2PRequest](request[].reqContent).process(codex)
of UPLOAD:
cast[ptr NodeUploadRequest](request[].reqContent).process(codex)
handleRes(await retFut, request)

View File

@ -0,0 +1,181 @@
## This file contains the lifecycle request type that will be handled.
import std/[options, os, mimetypes]
import chronos
import chronicles
import libp2p
import ../../alloc
import ../../../codex/streams
import ../../../codex/node
from ../../../codex/codex import CodexServer, node
type NodeUploadMsgType* = enum
INIT
CHUNK
FINALIZE
CANCEL
type NodeUploadRequest* = object
operation: NodeUploadMsgType
mimetype: cstring
filename: cstring
sessionId: cstring
chunk: seq[byte]
type
UploadSessionId* = string
UploadSessionCount* = int
UploadSession* = object
stream: BufferStream
fut: Future[?!Cid]
var uploadSessions {.threadvar.}: Table[UploadSessionId, UploadSession]
var nexUploadSessionCount {.threadvar.}: UploadSessionCount
proc createShared*(
T: type NodeUploadRequest,
op: NodeUploadMsgType,
mimetype: cstring = "",
filename: cstring = "",
sessionId: cstring = "",
chunk: seq[byte] = @[],
): ptr type T =
var ret = createShared(T)
ret[].operation = op
ret[].mimetype = mimetype.alloc()
ret[].filename = filename.alloc()
ret[].sessionId = sessionId.alloc()
ret[].chunk = chunk
return ret
proc destroyShared(self: ptr NodeUploadRequest) =
deallocShared(self[].mimetype)
deallocShared(self[].filename)
deallocShared(self[].sessionId)
deallocShared(self)
## Init upload create a new upload session and returns its ID.
## The session can be used to send chunks of data
## and to pause and resume the upload.
proc init(
codex: ptr CodexServer, mimetype: cstring, filename: cstring
): Future[Result[string, string]] {.async: (raises: []).} =
if $filename != "" and not isValidFilename($filename):
return err("Invalid filename")
if $mimetype != "":
let m = newMimetypes()
if m.getExt($mimetype, "") == "":
return err("Invalid MIME type")
let sessionId = $nexUploadSessionCount
nexUploadSessionCount.inc()
let stream = BufferStream.new()
let lpStream = LPStream(stream)
let node = codex[].node
let fut = node.store(lpStream, ($filename).some, ($mimetype).some)
uploadSessions[sessionId] = UploadSession(stream: stream, fut: fut)
return ok(sessionId)
proc chunk(
codex: ptr CodexServer, sessionId: cstring, chunk: seq[byte]
): Future[Result[string, string]] {.async: (raises: []).} =
if not uploadSessions.contains($sessionId):
return err("Invalid session ID")
try:
let session = uploadSessions[$sessionId]
await session.stream.pushData(chunk)
except KeyError as e:
return err("Invalid session ID")
except LPError as e:
return err("Stream error: " & $e.msg)
except CancelledError as e:
return err("Operation cancelled")
return ok("")
proc finalize(
codex: ptr CodexServer, sessionId: cstring
): Future[Result[string, string]] {.async: (raises: []).} =
if not uploadSessions.contains($sessionId):
return err("Invalid session ID")
var session: UploadSession
try:
session = uploadSessions[$sessionId]
await session.stream.pushEof()
except KeyError as e:
return err("Invalid session ID")
except LPStreamError as e:
return err("Stream error: " & $e.msg)
except CancelledError as e:
return err("Operation cancelled")
# except LPError as e:
# return err("Stream error: " & $e.msg)
# except CancelledError as e:
# return err("Operation cancelled")
try:
let res = await session.fut
if res.isErr:
return err("Upload failed: " & res.error().msg)
return ok($res.get())
except CatchableError as e:
return err("Upload failed: " & $e.msg)
finally:
uploadSessions.del($sessionId)
proc cancel(
codex: ptr CodexServer, sessionId: cstring
): Future[Result[string, string]] {.async: (raises: []).} =
if not uploadSessions.contains($sessionId):
return err("Invalid session ID")
try:
let session = uploadSessions[$sessionId]
session.fut.cancel()
except KeyError as e:
return err("Invalid session ID")
uploadSessions.del($sessionId)
return ok("")
proc process*(
self: ptr NodeUploadRequest, codex: ptr CodexServer
): Future[Result[string, string]] {.async: (raises: []).} =
defer:
destroyShared(self)
case self.operation
of NodeUploadMsgType.INIT:
let res = (await init(codex, self.mimetype, self.filename))
if res.isErr:
error "INIT failed", error = res.error
return err($res.error)
return res
of NodeUploadMsgType.CHUNK:
let res = (await chunk(codex, self.sessionId, self.chunk))
if res.isErr:
error "CHUNK failed", error = res.error
return err($res.error)
return res
of NodeUploadMsgType.FINALIZE:
let res = (await finalize(codex, self.sessionId))
if res.isErr:
error "FINALIZE failed", error = res.error
return err($res.error)
return res
of NodeUploadMsgType.CANCEL:
let res = (await cancel(codex, self.sessionId))
if res.isErr:
error "CANCEL failed", error = res.error
return err($res.error)
return res
return ok("")

View File

@ -80,6 +80,34 @@ int codex_peer_debug(
CodexCallback callback,
void* userData);
int codex_upload_init(
void* ctx,
const char* mimetype,
const char* filename,
CodexCallback callback,
void* userData);
int codex_upload_chunk(
void* ctx,
const char* sessionId,
const uint8_t* chunk,
size_t len,
CodexCallback callback,
void* userData);
int codex_upload_finalize(
void* ctx,
const char* sessionId,
CodexCallback callback,
void* userData);
int codex_upload_cancel(
void* ctx,
const char* sessionId,
CodexCallback callback,
void* userData);
int codex_start(void* ctx,
CodexCallback callback,
void* userData);

View File

@ -33,6 +33,7 @@ import ./codex_thread_requests/requests/node_lifecycle_request
import ./codex_thread_requests/requests/node_info_request
import ./codex_thread_requests/requests/node_debug_request
import ./codex_thread_requests/requests/node_p2p_request
import ./codex_thread_requests/requests/node_upload_request
import ./ffi_types
from ../codex/conf import codexVersion, updateLogLevel
@ -281,6 +282,98 @@ proc codex_destroy(
return RET_OK
proc codex_upload_init(
ctx: ptr CodexContext,
mimetype: cstring,
filename: cstring,
callback: CodexCallback,
userData: pointer,
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibcodexParams(ctx, callback, userData)
let reqContent = NodeUploadRequest.createShared(
NodeUploadMsgType.INIT, mimetype = mimetype, filename = filename
)
codex_context.sendRequestToCodexThread(
ctx, RequestType.UPLOAD, reqContent, callback, userData
).isOkOr:
let msg = "libcodex error: " & $error
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
return RET_ERR
return RET_OK
proc codex_upload_chunk(
ctx: ptr CodexContext,
sessionId: cstring,
data: ptr byte,
len: int,
callback: CodexCallback,
userData: pointer,
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibcodexParams(ctx, callback, userData)
let chunk = newSeq[byte](len)
copyMem(addr chunk[0], data, len)
let reqContent = NodeUploadRequest.createShared(
NodeUploadMsgType.CHUNK, sessionId = sessionId, chunk = chunk
)
codex_context.sendRequestToCodexThread(
ctx, RequestType.UPLOAD, reqContent, callback, userData
).isOkOr:
let msg = "libcodex error: " & $error
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
return RET_ERR
return RET_OK
proc codex_upload_finalize(
ctx: ptr CodexContext,
sessionId: cstring,
callback: CodexCallback,
userData: pointer,
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibcodexParams(ctx, callback, userData)
let reqContent =
NodeUploadRequest.createShared(NodeUploadMsgType.FINALIZE, sessionId = sessionId)
codex_context.sendRequestToCodexThread(
ctx, RequestType.UPLOAD, reqContent, callback, userData
).isOkOr:
let msg = "libcodex error: " & $error
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
return RET_ERR
return RET_OK
proc codex_upload_cancel(
ctx: ptr CodexContext,
sessionId: cstring,
callback: CodexCallback,
userData: pointer,
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibcodexParams(ctx, callback, userData)
let reqContent =
NodeUploadRequest.createShared(NodeUploadMsgType.CANCEL, sessionId = sessionId)
codex_context.sendRequestToCodexThread(
ctx, RequestType.UPLOAD, reqContent, callback, userData
).isOkOr:
let msg = "libcodex error: " & $error
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
return RET_ERR
return RET_OK
proc codex_start(
ctx: ptr CodexContext, callback: CodexCallback, userData: pointer
): cint {.dynlib, exportc.} =