2025-10-08 06:40:59 +02:00

326 lines
9.9 KiB
Go

package codex
/*
#include "bridge.h"
#include <stdlib.h>
static int cGoCodexUploadInit(void* codexCtx, char* filepath, size_t chunkSize, void* resp) {
return codex_upload_init(codexCtx, filepath, chunkSize, (CodexCallback) callback, resp);
}
static int cGoCodexUploadChunk(void* codexCtx, char* sessionId, const uint8_t* chunk, size_t len, void* resp) {
return codex_upload_chunk(codexCtx, sessionId, chunk, len, (CodexCallback) callback, resp);
}
static int cGoCodexUploadFinalize(void* codexCtx, char* sessionId, void* resp) {
return codex_upload_finalize(codexCtx, sessionId, (CodexCallback) callback, resp);
}
static int cGoCodexUploadCancel(void* codexCtx, char* sessionId, void* resp) {
return codex_upload_cancel(codexCtx, sessionId, (CodexCallback) callback, resp);
}
static int cGoCodexUploadFile(void* codexCtx, char* sessionId, void* resp) {
return codex_upload_file(codexCtx, sessionId, (CodexCallback) callback, resp);
}
*/
import "C"
import (
"bytes"
"fmt"
"io"
"os"
"unsafe"
)
const defaultBlockSize = 1024 * 64
type OnUploadProgressFunc func(read, total int, percent float64, err error)
type ChunckSize int
func (c ChunckSize) valOrDefault() int {
if c == 0 {
return defaultBlockSize
}
return int(c)
}
func (c ChunckSize) toSizeT() C.size_t {
return C.size_t(c.valOrDefault())
}
type CodexUploadOptions struct {
filepath string
chunkSize ChunckSize
onProgress OnUploadProgressFunc
}
func getReaderSize(r io.Reader) int64 {
switch v := r.(type) {
case *os.File:
stat, err := v.Stat()
if err != nil {
return 0
}
return stat.Size()
case *bytes.Buffer:
return int64(v.Len())
default:
return 0
}
}
// UploadInit initializes a new upload session.
// It returns a session ID that can be used for subsequent upload operations.
// This function is called by UploadReader and UploadFile internally.
// You should use this function only if you need to manage the upload session manually.
// The options parameter contains the following fields:
// - filepath: the full path or the name of the file to upload. The metadata such as the
// filename and mimetype are extracted from this path / name.
// - chunkSize: the size of each upload chunk, passed as `blockSize` to the Codex node
// store. Default is to 64 KB.
// - onProgress: a callback function that is called after each chunk is uploaded. If the chunk
// size is more than the `chunkSize` parameter, the callback is called after the block is actually
// stored in the block store. Otherwise, it is called after the chunk is sent to the stream.
func (node CodexNode) UploadInit(options *CodexUploadOptions) (string, error) {
bridge := newBridgeCtx()
defer bridge.free()
var cFilename = C.CString(options.filepath)
defer C.free(unsafe.Pointer(cFilename))
if C.cGoCodexUploadInit(node.ctx, cFilename, options.chunkSize.toSizeT(), bridge.resp) != C.RET_OK {
return "", bridge.callError("cGoCodexUploadInit")
}
return bridge.wait()
}
// UploadChunk uploads a chunk of data to the Codex node.
// It takes the session ID returned by UploadInit
// and a byte slice containing the chunk data.
// This function is called by UploadReader internally.
// You should use this function only if you need to manage the upload session manually.
func (node CodexNode) UploadChunk(sessionId string, chunk []byte) error {
bridge := newBridgeCtx()
defer bridge.free()
var cSessionId = C.CString(sessionId)
defer C.free(unsafe.Pointer(cSessionId))
var cChunkPtr *C.uint8_t
if len(chunk) > 0 {
cChunkPtr = (*C.uint8_t)(unsafe.Pointer(&chunk[0]))
}
if C.cGoCodexUploadChunk(node.ctx, cSessionId, cChunkPtr, C.size_t(len(chunk)), bridge.resp) != C.RET_OK {
return bridge.callError("cGoCodexUploadChunk")
}
_, err := bridge.wait()
return err
}
// UploadFinalize finalizes the upload session and returns the CID of the uploaded file.
// It takes the session ID returned by UploadInit.
// This function is called by UploadReader and UploadFile internally.
// You should use this function only if you need to manage the upload session manually.
func (node CodexNode) UploadFinalize(sessionId string) (string, error) {
bridge := newBridgeCtx()
defer bridge.free()
var cSessionId = C.CString(sessionId)
defer C.free(unsafe.Pointer(cSessionId))
if C.cGoCodexUploadFinalize(node.ctx, cSessionId, bridge.resp) != C.RET_OK {
return "", bridge.callError("cGoCodexUploadFinalize")
}
return bridge.wait()
}
// UploadCancel cancels an ongoing upload session.
// It can be only if the upload session is managed manually.
// It doesn't work with UploadFile.
func (node CodexNode) UploadCancel(sessionId string) error {
bridge := newBridgeCtx()
defer bridge.free()
var cSessionId = C.CString(sessionId)
defer C.free(unsafe.Pointer(cSessionId))
if C.cGoCodexUploadCancel(node.ctx, cSessionId, bridge.resp) != C.RET_OK {
return bridge.callError("cGoCodexUploadCancel")
}
_, err := bridge.wait()
return err
}
// UploadReader uploads data from an io.Reader to the Codex node.
// It takes the upload options and the reader as parameters.
// It returns the CID of the uploaded file or an error.
//
// The options parameter contains the following fields:
// - filepath: the name of the file to upload.
// - chunkSize: the size of each upload chunk, passed as `blockSize` to the Codex node
// store. Default is to 64 KB.
// - onProgress: a callback function that is called after each chunk is uploaded with:
// - read: the number of bytes read in the last chunk.
// - total: the total number of bytes read so far.
// - percent: the percentage of the total file size that has been uploaded. It is
// determined from a `stat` call if the reader is a file and from the length of the buffer
// if the reader is a buffer. Otherwise, it is 0.
// - err: an error, if one occurred.
//
// If the chunk size is more than the `chunkSize` parameter, the callback is called after
// the block is actually stored in the block store. Otherwise, it is called after the chunk
// is sent to the stream.
//
// Internally, it calls:
// - UploadInit to create the upload session.
// - UploadChunk to upload a chunk to codex.
// - UploadFinalize to finalize the upload session.
// - UploadCancel if an error occurs.
func (node CodexNode) UploadReader(options CodexUploadOptions, r io.Reader) (string, error) {
sessionId, err := node.UploadInit(&options)
if err != nil {
return "", err
}
buf := make([]byte, options.chunkSize.valOrDefault())
total := 0
var size int64
if options.onProgress != nil {
size = getReaderSize(r)
}
for {
n, err := r.Read(buf)
if err == io.EOF {
break
}
if err != nil {
if cancelErr := node.UploadCancel(sessionId); cancelErr != nil {
return "", fmt.Errorf("failed to upload chunk %v and failed to cancel upload session %v", err, cancelErr)
}
return "", err
}
if n == 0 {
break
}
if err := node.UploadChunk(sessionId, buf[:n]); err != nil {
if cancelErr := node.UploadCancel(sessionId); cancelErr != nil {
return "", fmt.Errorf("failed to upload chunk %v and failed to cancel upload session %v", err, cancelErr)
}
return "", err
}
total += n
if options.onProgress != nil && size > 0 {
percent := float64(total) / float64(size) * 100.0
// The last block could be a bit over the size due to padding
// on the chunk size.
if percent > 100.0 {
percent = 100.0
}
options.onProgress(n, total, percent, nil)
} else if options.onProgress != nil {
options.onProgress(n, total, 0, nil)
}
}
return node.UploadFinalize(sessionId)
}
// UploadReaderAsync is the asynchronous version of UploadReader using a goroutine.
func (node CodexNode) UploadReaderAsync(options CodexUploadOptions, r io.Reader, onDone func(cid string, err error)) {
go func() {
cid, err := node.UploadReader(options, r)
onDone(cid, err)
}()
}
// UploadFile uploads a file to the Codex node.
// It takes the upload options as parameter.
// It returns the CID of the uploaded file or an error.
//
// The options parameter contains the following fields:
// - filepath: the full path of the file to upload.
// - chunkSize: the size of each upload chunk, passed as `blockSize` to the Codex node
// store. Default is to 64 KB.
// - onProgress: a callback function that is called after each chunk is uploaded with:
// - read: the number of bytes read in the last chunk.
// - total: the total number of bytes read so far.
// - percent: the percentage of the total file size that has been uploaded. It is
// determined from a `stat` call.
// - err: an error, if one occurred.
//
// If the chunk size is more than the `chunkSize` parameter, the callback is called after
// the block is actually stored in the block store. Otherwise, it is called after the chunk
// is sent to the stream.
//
// Internally, it calls UploadInit to create the upload session.
func (node CodexNode) UploadFile(options CodexUploadOptions) (string, error) {
bridge := newBridgeCtx()
defer bridge.free()
if options.onProgress != nil {
stat, err := os.Stat(options.filepath)
if err != nil {
return "", err
}
size := stat.Size()
total := 0
if size > 0 {
bridge.onProgress = func(read int, _ []byte) {
if read == 0 {
return
}
total += read
percent := float64(total) / float64(size) * 100.0
// The last block could be a bit over the size due to padding
// on the chunk size.
if percent > 100.0 {
percent = 100.0
}
options.onProgress(read, int(size), percent, nil)
}
}
}
sessionId, err := node.UploadInit(&options)
if err != nil {
return "", err
}
var cSessionId = C.CString(sessionId)
defer C.free(unsafe.Pointer(cSessionId))
if C.cGoCodexUploadFile(node.ctx, cSessionId, bridge.resp) != C.RET_OK {
return "", bridge.callError("cGoCodexUploadFile")
}
return bridge.wait()
}
// UploadFileAsync is the asynchronous version of UploadFile using a goroutine.
func (node CodexNode) UploadFileAsync(options CodexUploadOptions, onDone func(cid string, err error)) {
go func() {
cid, err := node.UploadFile(options)
onDone(cid, err)
}()
}