Add progress callback for upload

This commit is contained in:
Arnaud 2025-09-22 15:33:15 +02:00
parent f3d1ead21a
commit 8682987343
No known key found for this signature in database
GPG Key ID: B8FBC178F10CA7AE
6 changed files with 187 additions and 83 deletions

View File

@ -84,6 +84,8 @@ type
BatchProc* = proc(blocks: seq[bt.Block]): Future[?!void] {.
gcsafe, async: (raises: [CancelledError])
.}
OnBlockStoreProc =
proc(chunk: seq[byte]): Future[void] {.gcsafe, async: (raises: [CancelledError]).}
func switch*(self: CodexNodeRef): Switch =
return self.switch
@ -434,6 +436,7 @@ proc store*(
filename: ?string = string.none,
mimetype: ?string = string.none,
blockSize = DefaultBlockSize,
onBlockStoreProc: OnBlockStoreProc = nil,
): Future[?!Cid] {.async.} =
## Save stream contents as dataset with given blockSize
## to nodes's BlockStore, and return Cid of its manifest
@ -463,6 +466,9 @@ proc store*(
if err =? (await self.networkStore.putBlock(blk)).errorOption:
error "Unable to store block", cid = blk.cid, err = err.msg
return failure(&"Unable to store block {blk.cid}")
if not onBlockStoreProc.isNil:
discard onBlockStoreProc(chunk)
except CancelledError as exc:
raise exc
except CatchableError as exc:

View File

@ -104,8 +104,8 @@ package main
return codex_peer_debug(codexCtx, peerId, (CodexCallback) callback, resp);
}
static int cGoCodexUploadInit(void* codexCtx, char* filepath, void* resp) {
return codex_upload_init(codexCtx, filepath, (CodexCallback) callback, resp);
static int cGoCodexUploadInit(void* codexCtx, char* filepath, size_t chunkSize, void* resp) {
return codex_upload_init(codexCtx, filepath, chunkSize, (CodexCallback) callback, resp);
}
static int cGoCodexUploadChunk(void* codexCtx, char* sessionId, const uint8_t* chunk, size_t len, void* resp) {
@ -120,8 +120,8 @@ package main
return codex_upload_cancel(codexCtx, sessionId, (CodexCallback) callback, resp);
}
static int cGoCodexUploadFile(void* codexCtx, char* sessionId, size_t chunkSize, void* resp) {
return codex_upload_file(codexCtx, sessionId, chunkSize, (CodexCallback) callback, resp);
static int cGoCodexUploadFile(void* codexCtx, char* sessionId, void* resp) {
return codex_upload_file(codexCtx, sessionId, (CodexCallback) callback, resp);
}
static int cGoCodexStart(void* codexCtx, void* resp) {
@ -253,12 +253,23 @@ type CodexNode struct {
ctx unsafe.Pointer
}
const defaultBlockSize = 1024 * 64
type CodexUploadOptions struct {
filepath string
chunkSize int
onProgress func(read, total int, percent float64)
}
type bridgeCtx struct {
wg *sync.WaitGroup
h cgo.Handle
resp unsafe.Pointer
result string
err error
// Callback used for upload and download
onProgress func(read int)
}
func newBridgeCtx() *bridgeCtx {
@ -273,18 +284,13 @@ func newBridgeCtx() *bridgeCtx {
}
func (b *bridgeCtx) free() {
b.h.Delete()
b.h = 0
C.freeResp(b.resp)
b.resp = nil
}
func (b *bridgeCtx) isOK() bool {
return C.getRet(b.resp) == C.RET_OK
}
func (b *bridgeCtx) isError() bool {
return C.getRet(b.resp) == C.RET_ERR
}
func (b *bridgeCtx) CallError(name string) error {
return fmt.Errorf("Failed the call to %s. Returned code: %d.", name, C.getRet(b.resp))
}
@ -295,6 +301,21 @@ func (b *bridgeCtx) wait() (string, error) {
return b.result, b.err
}
func getReaderSize(r io.Reader) int64 {
switch v := r.(type) {
case *os.File:
stat, err := v.Stat()
if err != nil {
return 0
}
return stat.Size()
case *bytes.Buffer:
return int64(v.Len())
default:
return 0
}
}
//export callback
func callback(ret C.int, msg *C.char, len C.size_t, resp unsafe.Pointer) {
if resp == nil {
@ -306,6 +327,16 @@ func callback(ret C.int, msg *C.char, len C.size_t, resp unsafe.Pointer) {
m.msg = msg
m.len = len
if ret == C.RET_PROGRESS {
if m.h != 0 {
h := cgo.Handle(m.h)
if v, ok := h.Value().(*bridgeCtx); ok && v.onProgress != nil {
v.onProgress(int(len))
}
}
return
}
if m.h == 0 {
return
}
@ -320,20 +351,15 @@ func callback(ret C.int, msg *C.char, len C.size_t, resp unsafe.Pointer) {
if ret == C.RET_OK || ret == C.RET_ERR {
retMsg := C.GoStringN(msg, C.int(len))
// log.Println("Callback called with ret:", ret, " msg:", retMsg, " len:", len)
if ret == C.RET_OK {
v.result = retMsg
v.err = nil
} else {
v.err = errors.New(retMsg)
}
h.Delete()
m.h = 0
if v.wg != nil {
v.wg.Done()
v = nil
}
}
}
@ -500,14 +526,32 @@ func (self *CodexNode) CodexPeerDebug(peerId string) (RestPeerRecord, error) {
return record, err
}
func (self *CodexNode) CodexUploadInit(filename string) (string, error) {
func (self *CodexNode) CodexUploadInit(options *CodexUploadOptions) (string, error) {
bridge := newBridgeCtx()
defer bridge.free()
totalRead := 0
var cFilename = C.CString(filename)
bridge.onProgress = func(bytes int) {
if bytes == -1 {
bridge.free()
} else {
totalRead += bytes
if options.onProgress != nil {
options.onProgress(bytes, totalRead, 0)
}
}
}
var cFilename = C.CString(options.filepath)
defer C.free(unsafe.Pointer(cFilename))
if C.cGoCodexUploadInit(self.ctx, cFilename, bridge.resp) != C.RET_OK {
if options.chunkSize == 0 {
options.chunkSize = defaultBlockSize
}
var cChunkSize = C.size_t(options.chunkSize)
if C.cGoCodexUploadInit(self.ctx, cFilename, cChunkSize, bridge.resp) != C.RET_OK {
return "", bridge.CallError("cGoCodexUploadInit")
}
@ -563,13 +607,26 @@ func (self *CodexNode) CodexUploadCancel(sessionId string) error {
return err
}
func (self *CodexNode) CodexUploadReader(filename string, r io.Reader, chunkSize int) (string, error) {
sessionId, err := self.CodexUploadInit(filename)
func (self *CodexNode) CodexUploadReader(options CodexUploadOptions, r io.Reader) (string, error) {
if options.onProgress != nil {
size := getReaderSize(r)
if size > 0 {
fn := options.onProgress
options.onProgress = func(read, total int, _ float64) {
percent := float64(total) / float64(size) * 100.0
fn(read, total, percent)
}
}
}
sessionId, err := self.CodexUploadInit(&options)
if err != nil {
return "", err
}
buf := make([]byte, chunkSize)
buf := make([]byte, options.chunkSize)
for {
n, err := r.Read(buf)
if n > 0 {
@ -592,23 +649,38 @@ func (self *CodexNode) CodexUploadReader(filename string, r io.Reader, chunkSize
return self.CodexUploadFinalize(sessionId)
}
// TODO provide an async version of CodexUploadReader
// that starts a gorountine to upload the chunks
// and take:
// a callback to be called when done
// another callback to cancel the upload
func (self *CodexNode) CodexUploadReaderAsync(filename string, r io.Reader, chunkSize int) (string, error) {
return "", nil
func (self *CodexNode) CodexUploadReaderAsync(options CodexUploadOptions, r io.Reader, onDone func(cid string, err error)) {
go func() {
cid, err := self.CodexUploadReader(options, r)
onDone(cid, err)
}()
}
func (self *CodexNode) CodexUploadFile(filepath string, chunkSize int) (string, error) {
func (self *CodexNode) CodexUploadFile(options CodexUploadOptions) (string, error) {
if options.onProgress != nil {
stat, err := os.Stat(options.filepath)
if err != nil {
return "", err
}
size := stat.Size()
if size > 0 {
fn := options.onProgress
options.onProgress = func(read, total int, _ float64) {
percent := float64(total) / float64(size) * 100.0
fn(read, total, percent)
}
}
}
bridge := newBridgeCtx()
defer bridge.free()
var cFilePath = C.CString(filepath)
var cFilePath = C.CString(options.filepath)
defer C.free(unsafe.Pointer(cFilePath))
sessionId, err := self.CodexUploadInit(filepath)
sessionId, err := self.CodexUploadInit(&options)
if err != nil {
return "", err
}
@ -616,12 +688,7 @@ func (self *CodexNode) CodexUploadFile(filepath string, chunkSize int) (string,
var cSessionId = C.CString(sessionId)
defer C.free(unsafe.Pointer(cSessionId))
var cChunkSize = C.size_t(0)
if chunkSize > 0 {
cChunkSize = C.size_t(chunkSize)
}
if C.cGoCodexUploadFile(self.ctx, cSessionId, cChunkSize, bridge.resp) != C.RET_OK {
if C.cGoCodexUploadFile(self.ctx, cSessionId, bridge.resp) != C.RET_OK {
return "", bridge.CallError("cGoCodexUploadFile")
}
@ -634,7 +701,7 @@ func (self *CodexNode) CodexUploadFile(filepath string, chunkSize int) (string,
// and take:
// a callback to be called when done
// another callback to cancel the upload
func (self *CodexNode) CodexUploadFileAsync(filepath string, chunkSize int) (string, error) {
func (self *CodexNode) CodexUploadFileAsync(filepath string, chunkSize int, cb func(error)) (string, error) {
return "", nil
}
@ -650,20 +717,11 @@ func (self *CodexNode) CodexStart() error {
return err
}
func (self *CodexNode) CodexStartAsync(cb func(error)) error {
bridge := newBridgeCtx()
defer bridge.free()
if C.cGoCodexStart(self.ctx, bridge.resp) != C.RET_OK {
return bridge.CallError("cGoCodexStart")
}
func (self *CodexNode) CodexStartAsync(onDone func(error)) {
go func() {
_, err := bridge.wait()
cb(err)
err := self.CodexStart()
onDone(err)
}()
return nil
}
func (self *CodexNode) CodexStop() error {
@ -717,7 +775,7 @@ func main() {
log.Println("Codex created.")
// node.CodexSetEventCallback()
node.CodexSetEventCallback()
version, err := node.CodexVersion()
if err != nil {
@ -783,7 +841,7 @@ func main() {
log.Println("Codex Log Level set to TRACE")
sessionId, err := node.CodexUploadInit("hello.txt")
sessionId, err := node.CodexUploadInit(&CodexUploadOptions{filepath: "hello.txt"})
if err != nil {
log.Fatal("Error happened:", err.Error())
}
@ -808,7 +866,7 @@ func main() {
log.Println("Codex Upload Finalized, cid:", cid)
buf := bytes.NewBuffer([]byte("Hello World!"))
cid, err = node.CodexUploadReader("hello.txt", buf, 16*1024)
cid, err = node.CodexUploadReader(CodexUploadOptions{filepath: "hello.txt"}, buf)
if err != nil {
log.Fatal("Error happened:", err.Error())
}
@ -820,9 +878,14 @@ func main() {
log.Fatal("Error happened:", err.Error())
}
// Choose a big file to see the progress logs
filepath := path.Join(current, "examples", "golang", "hello.txt")
log.Println("Uploading file:", filepath)
cid, err = node.CodexUploadFile(filepath, 1024)
// filepath := path.Join(current, "examples", "golang", "bigfile.zip")
cid, err = node.CodexUploadFile(CodexUploadOptions{filepath: filepath, onProgress: func(read, total int, percent float64) {
log.Printf("Uploaded %d bytes, total %d bytes (%.2f%%)\n", read, total, percent)
}})
if err != nil {
log.Fatal("Error happened:", err.Error())
}

View File

@ -18,12 +18,16 @@ type NodeUploadMsgType* = enum
CANCEL
FILE
type OnProgressHandler =
proc(bytes: int): Future[void] {.gcsafe, async: (raises: [CancelledError]).}
type NodeUploadRequest* = object
operation: NodeUploadMsgType
sessionId: cstring
filepath: cstring
chunk: seq[byte]
chunkSize: csize_t
onProgress: OnProgressHandler
type
UploadSessionId* = string
@ -32,6 +36,7 @@ type
stream: BufferStream
fut: Future[?!Cid]
filepath: string
chunkSize: int
var uploadSessions {.threadvar.}: Table[UploadSessionId, UploadSession]
var nexUploadSessionCount {.threadvar.}: UploadSessionCount
@ -42,6 +47,7 @@ proc createShared*(
sessionId: cstring = "",
filepath: cstring = "",
chunk: seq[byte] = @[],
onProgress: OnProgressHandler = nil,
chunkSize: csize_t = 0,
): ptr type T =
var ret = createShared(T)
@ -50,6 +56,7 @@ proc createShared*(
ret[].filepath = filepath.alloc()
ret[].chunk = chunk
ret[].chunkSize = chunkSize
ret[].onProgress = onProgress
return ret
proc destroyShared(self: ptr NodeUploadRequest) =
@ -64,7 +71,10 @@ proc destroyShared(self: ptr NodeUploadRequest) =
## or it can be the filename when the file will be uploaded via chunks.
## The mimetype is deduced from the filename extension.
proc init(
codex: ptr CodexServer, filepath: cstring = ""
codex: ptr CodexServer,
filepath: cstring = "",
chunkSize: csize_t = 0,
onProgress: OnProgressHandler,
): Future[Result[string, string]] {.async: (raises: []).} =
var filenameOpt, mimetypeOpt = string.none
@ -94,9 +104,25 @@ proc init(
let stream = BufferStream.new()
let lpStream = LPStream(stream)
let node = codex[].node
let fut = node.store(lpStream, filenameOpt, mimetypeOpt)
uploadSessions[sessionId] =
UploadSession(stream: stream, fut: fut, filepath: $filepath)
let onBlockStore = proc(
chunk: seq[byte]
): Future[void] {.gcsafe, async: (raises: [CancelledError]).} =
discard onProgress(chunk.len)
let blockSize =
if chunkSize.NBytes > 0.NBytes: chunkSize.NBytes else: DefaultBlockSize
let fut = node.store(lpStream, filenameOpt, mimetypeOpt, blockSize, onBlockStore)
proc cb(_: pointer) {.raises: [].} =
# Signal end of upload
discard onProgress(-1)
fut.addCallback(cb)
uploadSessions[sessionId] = UploadSession(
stream: stream, fut: fut, filepath: $filepath, chunkSize: blockSize.int
)
return ok(sessionId)
@ -163,20 +189,21 @@ proc cancel(
return ok("")
proc file(
codex: ptr CodexServer, sessionId: cstring, chunkSize: csize_t = 1024
codex: ptr CodexServer, sessionId: cstring
): Future[Result[string, string]] {.raises: [], async: (raises: []).} =
if not uploadSessions.contains($sessionId):
return err("Invalid session ID")
let size = if chunkSize > 0: chunkSize else: 1024
var buffer = newSeq[byte](size)
var session: UploadSession
## Here we certainly need to spawn a new thread to avoid blocking
## the worker thread while reading the file.
try:
session = uploadSessions[$sessionId]
var buffer = newSeq[byte](session.chunkSize)
let fs = openFileStream(session.filepath)
defer:
fs.close()
while true:
let bytesRead = fs.readData(addr buffer[0], buffer.len)
@ -185,13 +212,7 @@ proc file(
break
await session.stream.pushData(buffer[0 ..< bytesRead])
await session.stream.pushEof()
let res = await session.fut
if res.isErr:
return err("Upload failed: " & res.error().msg)
return ok($res.get())
return await codex.finalize(sessionId)
except KeyError as e:
return err("Invalid session ID")
except LPStreamError, IOError:
@ -202,6 +223,7 @@ proc file(
except CatchableError as e:
return err("Upload failed: " & $e.msg)
finally:
session.fut.cancel()
uploadSessions.del($sessionId)
proc process*(
@ -212,7 +234,7 @@ proc process*(
case self.operation
of NodeUploadMsgType.INIT:
let res = (await init(codex, self.filepath))
let res = (await init(codex, self.filepath, self.chunkSize, self.onProgress))
if res.isErr:
error "INIT failed", error = res.error
return err($res.error)
@ -236,7 +258,7 @@ proc process*(
return err($res.error)
return res
of NodeUploadMsgType.FILE:
let res = (await file(codex, self.sessionId, self.chunkSize))
let res = (await file(codex, self.sessionId))
if res.isErr:
error "FILE failed", error = res.error
return err($res.error)

View File

@ -14,6 +14,7 @@ type CodexCallback* = proc(
const RET_OK*: cint = 0
const RET_ERR*: cint = 1
const RET_MISSING_CALLBACK*: cint = 2
const RET_PROGRESS*: cint = 3
## Returns RET_OK as acknowledgment and call the callback
## with RET_OK code and the provided message.

View File

@ -18,6 +18,7 @@
#define RET_OK 0
#define RET_ERR 1
#define RET_MISSING_CALLBACK 2
#define RET_PROGRESS 3
#ifdef __cplusplus
extern "C" {
@ -84,6 +85,7 @@ int codex_peer_debug(
int codex_upload_init(
void* ctx,
const char* filepath,
size_t chunkSize,
CodexCallback callback,
void* userData);
@ -110,7 +112,6 @@ int codex_upload_cancel(
int codex_upload_file(
void* ctx,
const char* sessionId,
size_t chunkSize,
CodexCallback callback,
void* userData);

View File

@ -247,12 +247,25 @@ proc codex_destroy(
return callback.success("", userData)
proc codex_upload_init(
ctx: ptr CodexContext, filepath: cstring, callback: CodexCallback, userData: pointer
ctx: ptr CodexContext,
filepath: cstring,
chunkSize: csize_t,
callback: CodexCallback,
userData: pointer,
): cint {.dynlib, exportc.} =
init(ctx, callback, userData)
let reqContent =
NodeUploadRequest.createShared(NodeUploadMsgType.INIT, filepath = filepath)
let onProgress = proc(
bytes: int
): Future[void] {.gcsafe, async: (raises: [CancelledError]).} =
callback(RET_PROGRESS, nil, bytes.csize_t, userData)
let reqContent = NodeUploadRequest.createShared(
NodeUploadMsgType.INIT,
filepath = filepath,
chunkSize = chunkSize,
onProgress = onProgress,
)
let res = codex_context.sendRequestToCodexThread(
ctx, RequestType.UPLOAD, reqContent, callback, userData
)
@ -317,15 +330,13 @@ proc codex_upload_cancel(
proc codex_upload_file(
ctx: ptr CodexContext,
sessionId: cstring,
chunkSize: csize_t,
callback: CodexCallback,
userData: pointer,
): cint {.dynlib, exportc.} =
init(ctx, callback, userData)
let reqContent = NodeUploadRequest.createShared(
NodeUploadMsgType.FILE, sessionId = sessionId, chunkSize = chunkSize
)
let reqContent =
NodeUploadRequest.createShared(NodeUploadMsgType.FILE, sessionId = sessionId)
let res = codex_context.sendRequestToCodexThread(
ctx, RequestType.UPLOAD, reqContent, callback, userData