Add download streaming mode

This commit is contained in:
Arnaud 2025-09-30 12:47:44 +02:00 committed by Eric
parent 5cc4f24dc5
commit 0a98b8b250
No known key found for this signature in database
5 changed files with 144 additions and 79 deletions

View File

@ -132,8 +132,8 @@ package main
return codex_download_chunk(codexCtx, cid, (CodexCallback) callback, resp);
}
static int cGoCodexDownloadLocal(void* codexCtx, char* cid, size_t chunkSize, void* resp) {
return codex_download_local(codexCtx, cid, chunkSize, (CodexCallback) callback, resp);
static int cGoCodexDownloadStream(void* codexCtx, char* cid, size_t chunkSize, bool local, const char* filepath, void* resp) {
return codex_download_stream(codexCtx, cid, chunkSize, local, filepath, (CodexCallback) callback, resp);
}
static int cGoCodexDownloadCancel(void* codexCtx, char* cid, void* resp) {
@ -270,12 +270,39 @@ type CodexNode struct {
const defaultBlockSize = 1024 * 64
type OnProgressFunc func(read, total int, percent float64, err error)
type OnUploadProgressFunc func(read, total int, percent float64, err error)
type ChunckSize int
type CodexUploadOptions struct {
filepath string
chunkSize int
onProgress OnProgressFunc
chunkSize ChunckSize
onProgress OnUploadProgressFunc
}
// valOrDefault returns the chunk size as an int, substituting
// defaultBlockSize when the size is unset or invalid.
//
// A zero value means "not specified". Negative values are handled the same
// way: they can never be a valid size, and letting one through would wrap to
// a huge value when converted to C.size_t, or panic when used as a buffer
// length with make.
func (c ChunckSize) valOrDefault() int {
	if c <= 0 {
		return defaultBlockSize
	}
	return int(c)
}
// toSizeT converts the chunk size to a C size_t for the cgo bridge,
// applying the same default as valOrDefault when no size was provided.
func (c ChunckSize) toSizeT() C.size_t {
	size := c.valOrDefault()
	return C.size_t(size)
}
// CodexDownloadStreamOptions groups the settings accepted by
// CodexDownloadStream.
type CodexDownloadStreamOptions = struct {
// filepath is forwarded to the native codex_download_stream call; when it is
// non-empty the native side also writes the downloaded data to that file.
filepath string
// chunkSize is the size of each streamed chunk; zero selects defaultBlockSize.
chunkSize ChunckSize
// onProgress, when set, is called after each non-empty chunk with the chunk
// size, the running total and a percentage (currently always 0). It is also
// called with a non-nil error when writing to writer fails.
onProgress OnUploadProgressFunc
// writer, when non-nil, receives every downloaded chunk.
writer io.Writer
// local is passed through to the native call as a C bool.
// NOTE(review): presumably restricts the download to the local store — confirm.
local bool
}
// CodexDownloadInitOptions groups the settings accepted by CodexDownloadInit.
type CodexDownloadInitOptions = struct {
// local is passed through to the native init call as a C bool.
// NOTE(review): presumably restricts the download to the local store — confirm.
local bool
// chunkSize is the chunk size for the session; zero selects defaultBlockSize.
chunkSize ChunckSize
}
type bridgeCtx struct {
@ -285,8 +312,16 @@ type bridgeCtx struct {
result string
err error
// Callback used for upload and download
onProgress func(read int, chunk []byte)
// Callback used for receiving progress updates during upload/download.
//
// For the upload, the bytes parameter indicates the number of bytes uploaded.
// If the chunk size is greater than or equal to the block size (passed to the init function),
// the callback will be called when a block is put in the store.
// Otherwise, it will be called when a chunk is pushed into the stream.
//
// For the download, the bytes is the size of the chunk received, and the chunk
// is the actual chunk of data received.
onProgress func(bytes int, chunk []byte)
}
func newBridgeCtx() *bridgeCtx {
@ -559,12 +594,7 @@ func (self CodexNode) CodexUploadInit(options *CodexUploadOptions) (string, erro
var cFilename = C.CString(options.filepath)
defer C.free(unsafe.Pointer(cFilename))
if options.chunkSize == 0 {
options.chunkSize = defaultBlockSize
}
var cChunkSize = C.size_t(options.chunkSize)
if C.cGoCodexUploadInit(self.ctx, cFilename, cChunkSize, bridge.resp) != C.RET_OK {
if C.cGoCodexUploadInit(self.ctx, cFilename, options.chunkSize.toSizeT(), bridge.resp) != C.RET_OK {
return "", bridge.CallError("cGoCodexUploadInit")
}
@ -629,11 +659,7 @@ func (self CodexNode) CodexUploadReader(options CodexUploadOptions, r io.Reader)
return "", err
}
if options.chunkSize == 0 {
options.chunkSize = defaultBlockSize
}
buf := make([]byte, options.chunkSize)
buf := make([]byte, options.chunkSize.valOrDefault())
total := 0
var size int64
@ -746,29 +772,44 @@ func (self CodexNode) CodexUploadFileAsync(options CodexUploadOptions, onDone fu
}()
}
func (self CodexNode) CodexDownloadLocal(cid string, chunkSize int, w io.Writer) error {
func (self CodexNode) CodexDownloadStream(cid string, options CodexDownloadStreamOptions) error {
bridge := newBridgeCtx()
defer bridge.free()
total := 0
bridge.onProgress = func(read int, chunk []byte) {
if read == 0 {
return
}
if _, err := w.Write(chunk); err != nil {
log.Println(err)
if options.writer != nil {
w := options.writer
if _, err := w.Write(chunk); err != nil {
if options.onProgress != nil {
options.onProgress(0, 0, 0.0, err)
}
}
}
total += read
if options.onProgress != nil {
// TODO: retrieve the total size from the manifest or from the options struct
percent := 0.0
options.onProgress(read, total, percent, nil)
}
}
var cCid = C.CString(cid)
defer C.free(unsafe.Pointer(cCid))
if chunkSize == 0 {
chunkSize = defaultBlockSize
}
var cChunkSize = C.size_t(chunkSize)
var cFilepath = C.CString(options.filepath)
defer C.free(unsafe.Pointer(cFilepath))
if C.cGoCodexDownloadLocal(self.ctx, cCid, cChunkSize, bridge.resp) != C.RET_OK {
var cLocal = C.bool(options.local)
if C.cGoCodexDownloadStream(self.ctx, cCid, options.chunkSize.toSizeT(), cLocal, cFilepath, bridge.resp) != C.RET_OK {
return bridge.CallError("cGoCodexDownloadLocal")
}
@ -777,27 +818,16 @@ func (self CodexNode) CodexDownloadLocal(cid string, chunkSize int, w io.Writer)
return err
}
func (self CodexNode) CodexDownloadLocalAsync(cid string, chunkSize int, w io.Writer, onDone func(error)) {
go func() {
err := self.CodexDownloadLocal(cid, chunkSize, w)
onDone(err)
}()
}
func (self CodexNode) CodexDownloadInit(cid string, chunkSize int, local bool) error {
func (self CodexNode) CodexDownloadInit(cid string, options CodexDownloadInitOptions) error {
bridge := newBridgeCtx()
defer bridge.free()
var cCid = C.CString(cid)
defer C.free(unsafe.Pointer(cCid))
if chunkSize == 0 {
chunkSize = defaultBlockSize
}
var cChunkSize = C.size_t(chunkSize)
var cLocal = C.bool(local)
var cLocal = C.bool(options.local)
if C.cGoCodexDownloadInit(self.ctx, cCid, cChunkSize, cLocal, bridge.resp) != C.RET_OK {
if C.cGoCodexDownloadInit(self.ctx, cCid, options.chunkSize.toSizeT(), cLocal, bridge.resp) != C.RET_OK {
return bridge.CallError("cGoCodexDownloadInit")
}
@ -1009,7 +1039,7 @@ func main() {
buf := bytes.NewBuffer([]byte("Hello World!"))
cid, err = node.CodexUploadReader(CodexUploadOptions{filepath: "hello.txt", onProgress: func(read, total int, percent float64, err error) {
if err != nil {
log.Fatal("Error happened during upload: %v\n", err)
log.Fatalf("Error happened during upload: %v\n", err)
}
log.Printf("Uploaded %d bytes, total %d bytes (%.2f%%)\n", read, total, percent)
@ -1030,7 +1060,7 @@ func main() {
options := CodexUploadOptions{filepath: filepath, onProgress: func(read, total int, percent float64, err error) {
if err != nil {
log.Fatal("Error happened during upload: %v\n", err)
log.Fatalf("Error happened during upload: %v\n", err)
}
log.Printf("Uploaded %d bytes, total %d bytes (%.2f%%)\n", read, total, percent)
@ -1050,17 +1080,20 @@ func main() {
}
defer f.Close()
// log.Println("Codex Download Local starting... attempt", i+1)
if err := node.CodexDownloadLocal(cid, 0, f); err != nil {
if err := node.CodexDownloadStream(cid,
CodexDownloadStreamOptions{writer: f, filepath: "hello.reloaded.txt",
onProgress: func(read, total int, percent float64, err error) {
log.Println("Downloaded", read, "bytes. Total:", total, "bytes (", percent, "%)")
},
}); err != nil {
log.Fatal("Error happened:", err.Error())
}
log.Println("Codex Download Local finished.")
log.Println("Codex Download finished.")
// log.Println("Codex Download Init starting... attempt", i+1)
if err := node.CodexDownloadInit(cid, 0, true); err != nil {
if err := node.CodexDownloadInit(cid, CodexDownloadInitOptions{local: true}); err != nil {
log.Fatal("Error happened:", err.Error())
}

View File

@ -1,6 +1,19 @@
{.push raises: [].}
## This file contains the download request.
## A session is created for each download, identified by its CID,
## so that the download can be resumed, paused and cancelled (using chunks).
##
## There are two ways to download a file:
## 1. Via chunks: the cid parameter is the CID of the file to download. Steps are:
## - INIT: initializes the download session
## - CHUNK: downloads the next chunk of the file
## - CANCEL: cancels the download session
## 2. Via stream.
## - STREAM: downloads the file in a streaming manner, calling
## the onChunk handler for each chunk and / or writing to a file if filepath is set.
## Cancel is not supported in this mode because the worker will be busy
## downloading the file, so it cannot pick up another request to cancel the download.
import std/[options, streams]
import chronos
@ -19,9 +32,8 @@ logScope:
type NodeDownloadMsgType* = enum
INIT
LOCAL
NETWORK
CHUNK
STREAM
CANCEL
type OnChunkHandler = proc(bytes: seq[byte]): void {.gcsafe, raises: [].}
@ -31,6 +43,7 @@ type NodeDownloadRequest* = object
cid: cstring
chunkSize: csize_t
local: bool
filepath: cstring
type
DownloadSessionId* = string
@ -47,12 +60,14 @@ proc createShared*(
cid: cstring = "",
chunkSize: csize_t = 0,
local: bool = false,
filepath: cstring = "",
): ptr type T =
var ret = createShared(T)
ret[].operation = op
ret[].cid = cid.alloc()
ret[].chunkSize = chunkSize
ret[].local = local
ret[].filepath = filepath.alloc()
return ret
@ -60,10 +75,7 @@ proc destroyShared(self: ptr NodeDownloadRequest) =
deallocShared(self)
proc init(
codex: ptr CodexServer,
cCid: cstring = "",
chunkSize: csize_t = 0,
local: bool = true,
codex: ptr CodexServer, cCid: cstring = "", chunkSize: csize_t = 0, local: bool
): Future[Result[string, string]] {.async: (raises: []).} =
if downloadSessions.contains($cCid):
return ok("Download session already exists.")
@ -128,12 +140,13 @@ proc chunk(
return ok("")
proc streamFile(
proc streamData(
codex: ptr CodexServer,
cid: Cid,
local: bool = true,
local: bool,
onChunk: OnChunkHandler,
chunkSize: csize_t,
filepath: cstring,
): Future[Result[string, string]] {.async: (raises: [CancelledError]).} =
let node = codex[].node
@ -148,9 +161,14 @@ proc streamFile(
let blockSize = if chunkSize.int > 0: chunkSize.int else: DefaultBlockSize.int
var buf = newSeq[byte](blockSize)
var read = 0
var outputStream: OutputStreamHandle
var filedest: string = $filepath
try:
if filepath != "":
outputStream = filedest.fileOutput()
while not stream.atEof:
let read = await stream.readOnce(addr buf[0], buf.len)
buf.setLen(read)
@ -158,18 +176,30 @@ proc streamFile(
if buf.len <= 0:
break
if onChunk != nil:
onChunk(buf)
onChunk(buf)
if outputStream != nil:
outputStream.write(buf)
if outputStream != nil:
outputStream.close()
except LPStreamError as e:
return err("Failed to stream file: " & $e.msg)
except IOError as e:
return err("Failed to write to file: " & $e.msg)
finally:
await stream.close()
downloadSessions.del($cid)
return ok("")
proc local(
codex: ptr CodexServer, cCid: cstring, chunkSize: csize_t, onChunk: OnChunkHandler
proc stream(
codex: ptr CodexServer,
cCid: cstring,
chunkSize: csize_t,
local: bool,
filepath: cstring,
onChunk: OnChunkHandler,
): Future[Result[string, string]] {.raises: [], async: (raises: []).} =
let node = codex[].node
@ -178,8 +208,7 @@ proc local(
return err("Failed to download locally: cannot parse cid: " & $cCid)
try:
let local = true
let res = await codex.streamFile(cid.get(), true, onChunk, chunkSize)
let res = await codex.streamData(cid.get(), local, onChunk, chunkSize, filepath)
if res.isErr:
return err($res.error)
except CancelledError:
@ -218,25 +247,20 @@ proc process*(
error "Failed to INIT.", error = res.error
return err($res.error)
return res
of NodeDownloadMsgType.LOCAL:
let res = (await local(codex, self.cid, self.chunkSize, onChunk))
if res.isErr:
error "Failed to LOCAL.", error = res.error
return err($res.error)
return res
of NodeDownloadMsgType.NETWORK:
return err("NETWORK download not implemented yet.")
# let res = (await local(codex, self.cid, self.onChunk2, self.chunkSize, onChunk))
# if res.isErr:
# error "Failed to NETWORK.", error = res.error
# return err($res.error)
# return res
of NodeDownloadMsgType.CHUNK:
let res = (await chunk(codex, self.cid, onChunk))
if res.isErr:
error "Failed to CHUNK.", error = res.error
return err($res.error)
return res
of NodeDownloadMsgType.STREAM:
let res = (
await stream(codex, self.cid, self.chunkSize, self.local, self.filepath, onChunk)
)
if res.isErr:
error "Failed to STREAM.", error = res.error
return err($res.error)
return res
of NodeDownloadMsgType.CANCEL:
let res = (await cancel(codex, self.cid))
if res.isErr:

View File

@ -2,7 +2,7 @@
## This file contains the upload request.
## A session is created for each upload allowing to resume,
## pause (using chunks) and cancels uploads.
## pause and cancel uploads (using chunks).
##
## There are two ways to upload a file:
## 1. Via chunks: the filepath parameter is the data filename. Steps are:

View File

@ -115,10 +115,12 @@ int codex_upload_file(
CodexCallback callback,
void* userData);
int codex_download_local(
int codex_download_stream(
void* ctx,
const char* cid,
size_t chunkSize,
bool local,
const char* filepath,
CodexCallback callback,
void* userData);

View File

@ -387,10 +387,12 @@ proc codex_download_chunk(
result = callback.okOrError(res, userData)
proc codex_download_local(
proc codex_download_stream(
ctx: ptr CodexContext,
cid: cstring,
chunkSize: csize_t,
local: bool,
filepath: cstring,
callback: CodexCallback,
userData: pointer,
): cint {.dynlib, exportc.} =
@ -398,7 +400,11 @@ proc codex_download_local(
checkLibcodexParams(ctx, callback, userData)
let req = NodeDownloadRequest.createShared(
NodeDownloadMsgType.LOCAL, cid = cid, chunkSize = chunkSize
NodeDownloadMsgType.STREAM,
cid = cid,
chunkSize = chunkSize,
local = local,
filepath = filepath,
)
let res = codex_context.sendRequestToCodexThread(