diff --git a/codex/testdata/hello.txt b/codex/testdata/hello.txt new file mode 100644 index 0000000..c57eff5 --- /dev/null +++ b/codex/testdata/hello.txt @@ -0,0 +1 @@ +Hello World! \ No newline at end of file diff --git a/codex/upload.go b/codex/upload.go new file mode 100644 index 0000000..f816a5e --- /dev/null +++ b/codex/upload.go @@ -0,0 +1,325 @@ +package codex + +/* + #include "bridge.h" + #include + + static int cGoCodexUploadInit(void* codexCtx, char* filepath, size_t chunkSize, void* resp) { + return codex_upload_init(codexCtx, filepath, chunkSize, (CodexCallback) callback, resp); + } + + static int cGoCodexUploadChunk(void* codexCtx, char* sessionId, const uint8_t* chunk, size_t len, void* resp) { + return codex_upload_chunk(codexCtx, sessionId, chunk, len, (CodexCallback) callback, resp); + } + + static int cGoCodexUploadFinalize(void* codexCtx, char* sessionId, void* resp) { + return codex_upload_finalize(codexCtx, sessionId, (CodexCallback) callback, resp); + } + + static int cGoCodexUploadCancel(void* codexCtx, char* sessionId, void* resp) { + return codex_upload_cancel(codexCtx, sessionId, (CodexCallback) callback, resp); + } + + static int cGoCodexUploadFile(void* codexCtx, char* sessionId, void* resp) { + return codex_upload_file(codexCtx, sessionId, (CodexCallback) callback, resp); + } +*/ +import "C" +import ( + "bytes" + "fmt" + "io" + "os" + "unsafe" +) + +const defaultBlockSize = 1024 * 64 + +type OnUploadProgressFunc func(read, total int, percent float64, err error) + +type ChunckSize int + +func (c ChunckSize) valOrDefault() int { + if c == 0 { + return defaultBlockSize + } + + return int(c) +} + +func (c ChunckSize) toSizeT() C.size_t { + return C.size_t(c.valOrDefault()) +} + +type CodexUploadOptions struct { + filepath string + chunkSize ChunckSize + onProgress OnUploadProgressFunc +} + +func getReaderSize(r io.Reader) int64 { + switch v := r.(type) { + case *os.File: + stat, err := v.Stat() + if err != nil { + return 0 + } + return stat.Size() + case *bytes.Buffer: + return int64(v.Len()) + default: + return 0 + } +} + +// UploadInit initializes a new upload session. +// It returns a session ID that can be used for subsequent upload operations. +// This function is called by UploadReader and UploadFile internally. +// You should use this function only if you need to manage the upload session manually. +// The options parameter contains the following fields: +// - filepath: the full path or the name of the file to upload. The metadata such as the +// filename and mimetype are extracted from this path / name. +// - chunkSize: the size of each upload chunk, passed as `blockSize` to the Codex node +// store. Default is to 64 KB. +// - onProgress: a callback function that is called after each chunk is uploaded. If the chunk +// size is more than the `chunkSize` parameter, the callback is called after the block is actually +// stored in the block store. Otherwise, it is called after the chunk is sent to the stream. +func (node CodexNode) UploadInit(options *CodexUploadOptions) (string, error) { + bridge := newBridgeCtx() + defer bridge.free() + + var cFilename = C.CString(options.filepath) + defer C.free(unsafe.Pointer(cFilename)) + + if C.cGoCodexUploadInit(node.ctx, cFilename, options.chunkSize.toSizeT(), bridge.resp) != C.RET_OK { + return "", bridge.callError("cGoCodexUploadInit") + } + + return bridge.wait() +} + +// UploadChunk uploads a chunk of data to the Codex node. +// It takes the session ID returned by UploadInit +// and a byte slice containing the chunk data. +// This function is called by UploadReader internally. +// You should use this function only if you need to manage the upload session manually. +func (node CodexNode) UploadChunk(sessionId string, chunk []byte) error { + bridge := newBridgeCtx() + defer bridge.free() + + var cSessionId = C.CString(sessionId) + defer C.free(unsafe.Pointer(cSessionId)) + + var cChunkPtr *C.uint8_t + if len(chunk) > 0 { + cChunkPtr = (*C.uint8_t)(unsafe.Pointer(&chunk[0])) + } + + if C.cGoCodexUploadChunk(node.ctx, cSessionId, cChunkPtr, C.size_t(len(chunk)), bridge.resp) != C.RET_OK { + return bridge.callError("cGoCodexUploadChunk") + } + + _, err := bridge.wait() + return err +} + +// UploadFinalize finalizes the upload session and returns the CID of the uploaded file. +// It takes the session ID returned by UploadInit. +// This function is called by UploadReader and UploadFile internally. +// You should use this function only if you need to manage the upload session manually. +func (node CodexNode) UploadFinalize(sessionId string) (string, error) { + bridge := newBridgeCtx() + defer bridge.free() + + var cSessionId = C.CString(sessionId) + defer C.free(unsafe.Pointer(cSessionId)) + + if C.cGoCodexUploadFinalize(node.ctx, cSessionId, bridge.resp) != C.RET_OK { + return "", bridge.callError("cGoCodexUploadFinalize") + } + + return bridge.wait() +} + +// UploadCancel cancels an ongoing upload session. +// It can be only if the upload session is managed manually. +// It doesn't work with UploadFile. +func (node CodexNode) UploadCancel(sessionId string) error { + bridge := newBridgeCtx() + defer bridge.free() + + var cSessionId = C.CString(sessionId) + defer C.free(unsafe.Pointer(cSessionId)) + + if C.cGoCodexUploadCancel(node.ctx, cSessionId, bridge.resp) != C.RET_OK { + return bridge.callError("cGoCodexUploadCancel") + } + + _, err := bridge.wait() + return err +} + +// UploadReader uploads data from an io.Reader to the Codex node. +// It takes the upload options and the reader as parameters. +// It returns the CID of the uploaded file or an error. +// +// The options parameter contains the following fields: +// - filepath: the name of the file to upload. +// - chunkSize: the size of each upload chunk, passed as `blockSize` to the Codex node +// store. Default is to 64 KB. +// - onProgress: a callback function that is called after each chunk is uploaded with: +// - read: the number of bytes read in the last chunk. +// - total: the total number of bytes read so far. +// - percent: the percentage of the total file size that has been uploaded. It is +// determined from a `stat` call if the reader is a file and from the length of the buffer +// if the reader is a buffer. Otherwise, it is 0. +// - err: an error, if one occurred. +// +// If the chunk size is more than the `chunkSize` parameter, the callback is called after +// the block is actually stored in the block store. Otherwise, it is called after the chunk +// is sent to the stream. +// +// Internally, it calls: +// - UploadInit to create the upload session. +// - UploadChunk to upload a chunk to codex. +// - UploadFinalize to finalize the upload session. +// - UploadCancel if an error occurs. +func (node CodexNode) UploadReader(options CodexUploadOptions, r io.Reader) (string, error) { + sessionId, err := node.UploadInit(&options) + if err != nil { + return "", err + } + + buf := make([]byte, options.chunkSize.valOrDefault()) + total := 0 + + var size int64 + if options.onProgress != nil { + size = getReaderSize(r) + } + + for { + n, err := r.Read(buf) + if err == io.EOF { + break + } + + if err != nil { + if cancelErr := node.UploadCancel(sessionId); cancelErr != nil { + return "", fmt.Errorf("failed to upload chunk %v and failed to cancel upload session %v", err, cancelErr) + } + + return "", err + } + + if n == 0 { + break + } + + if err := node.UploadChunk(sessionId, buf[:n]); err != nil { + if cancelErr := node.UploadCancel(sessionId); cancelErr != nil { + return "", fmt.Errorf("failed to upload chunk %v and failed to cancel upload session %v", err, cancelErr) + } + + return "", err + } + + total += n + if options.onProgress != nil && size > 0 { + percent := float64(total) / float64(size) * 100.0 + // The last block could be a bit over the size due to padding + // on the chunk size. + if percent > 100.0 { + percent = 100.0 + } + options.onProgress(n, total, percent, nil) + } else if options.onProgress != nil { + options.onProgress(n, total, 0, nil) + } + } + + return node.UploadFinalize(sessionId) +} + +// UploadReaderAsync is the asynchronous version of UploadReader using a goroutine. +func (node CodexNode) UploadReaderAsync(options CodexUploadOptions, r io.Reader, onDone func(cid string, err error)) { + go func() { + cid, err := node.UploadReader(options, r) + onDone(cid, err) + }() +} + +// UploadFile uploads a file to the Codex node. +// It takes the upload options as parameter. +// It returns the CID of the uploaded file or an error. +// +// The options parameter contains the following fields: +// - filepath: the full path of the file to upload. +// - chunkSize: the size of each upload chunk, passed as `blockSize` to the Codex node +// store. Default is to 64 KB. +// - onProgress: a callback function that is called after each chunk is uploaded with: +// - read: the number of bytes read in the last chunk. +// - total: the total number of bytes read so far. +// - percent: the percentage of the total file size that has been uploaded. It is +// determined from a `stat` call. +// - err: an error, if one occurred. +// +// If the chunk size is more than the `chunkSize` parameter, the callback is called after +// the block is actually stored in the block store. Otherwise, it is called after the chunk +// is sent to the stream. +// +// Internally, it calls UploadInit to create the upload session. +func (node CodexNode) UploadFile(options CodexUploadOptions) (string, error) { + bridge := newBridgeCtx() + defer bridge.free() + + if options.onProgress != nil { + stat, err := os.Stat(options.filepath) + if err != nil { + return "", err + } + + size := stat.Size() + total := 0 + + if size > 0 { + bridge.onProgress = func(read int, _ []byte) { + if read == 0 { + return + } + + total += read + percent := float64(total) / float64(size) * 100.0 + // The last block could be a bit over the size due to padding + // on the chunk size. + if percent > 100.0 { + percent = 100.0 + } + + options.onProgress(read, int(size), percent, nil) + } + } + } + + sessionId, err := node.UploadInit(&options) + if err != nil { + return "", err + } + + var cSessionId = C.CString(sessionId) + defer C.free(unsafe.Pointer(cSessionId)) + + if C.cGoCodexUploadFile(node.ctx, cSessionId, bridge.resp) != C.RET_OK { + return "", bridge.callError("cGoCodexUploadFile") + } + + return bridge.wait() +} + +// UploadFileAsync is the asynchronous version of UploadFile using a goroutine. +func (node CodexNode) UploadFileAsync(options CodexUploadOptions, onDone func(cid string, err error)) { + go func() { + cid, err := node.UploadFile(options) + onDone(cid, err) + }() +} diff --git a/codex/upload_test.go b/codex/upload_test.go new file mode 100644 index 0000000..d3852b0 --- /dev/null +++ b/codex/upload_test.go @@ -0,0 +1,127 @@ +package codex + +import ( + "bytes" + "log" + "os" + "testing" +) + +const expectedCID = "zDvZRwzmAkhzDRPH5EW242gJBNZ2T7aoH2v1fVH66FxXL4kSbvyM" + +func TestUploadReader(t *testing.T) { + start := true + codex := newCodexNode(t, start) + totalBytes := 0 + finalPercent := 0.0 + + buf := bytes.NewBuffer([]byte("Hello World!")) + len := buf.Len() + cid, err := codex.UploadReader(CodexUploadOptions{filepath: "hello.txt", onProgress: func(read, total int, percent float64, err error) { + if err != nil { + log.Fatalf("Error happened during upload: %v\n", err) + } + + totalBytes = total + finalPercent = percent + }}, buf) + + if err != nil { + t.Fatalf("UploadReader failed: %v", err) + } + + if cid != expectedCID { + t.Fatalf("UploadReader returned %s but expected %s", cid, expectedCID) + } + + if totalBytes != len { + t.Fatalf("UploadReader progress callback read %d bytes but expected %d", totalBytes, len) + } + + if finalPercent != 100.0 { + t.Fatalf("UploadReader progress callback final percent %.2f but expected 100.0", finalPercent) + } +} + +func TestUploadFile(t *testing.T) { + start := true + codex := newCodexNode(t, start) + totalBytes := 0 + finalPercent := 0.0 + + stat, err := os.Stat("./testdata/hello.txt") + if err != nil { + log.Fatalf("Error happened during file stat: %v\n", err) + } + + options := CodexUploadOptions{filepath: "./testdata/hello.txt", onProgress: func(read, total int, percent float64, err error) { + if err != nil { + log.Fatalf("Error happened during upload: %v\n", err) + } + + totalBytes = total + finalPercent = percent + }} + + cid, err := codex.UploadFile(options) + if err != nil { + t.Fatalf("UploadReader failed: %v", err) + } + + if cid != expectedCID { + t.Fatalf("UploadReader returned %s but expected %s", cid, expectedCID) + } + + if totalBytes != int(stat.Size()) { + t.Fatalf("UploadReader progress callback read %d bytes but expected %d", totalBytes, int(stat.Size())) + } + + if finalPercent != 100.0 { + t.Fatalf("UploadReader progress callback final percent %.2f but expected 100.0", finalPercent) + } +} + +func TestUploadFileNoProgress(t *testing.T) { + start := true + codex := newCodexNode(t, start) + + options := CodexUploadOptions{filepath: "./testdata/doesnt_exist.txt"} + + cid, err := codex.UploadFile(options) + if err == nil { + t.Fatalf("UploadReader should have failed") + } + + if cid != "" { + t.Fatalf("Cid should be empty but got %s", cid) + } +} + +func TestManualUpload(t *testing.T) { + start := true + codex := newCodexNode(t, start) + + sessionId, err := codex.UploadInit(&CodexUploadOptions{filepath: "hello.txt"}) + if err != nil { + log.Fatal("Error happened:", err.Error()) + } + + err = codex.UploadChunk(sessionId, []byte("Hello ")) + if err != nil { + log.Fatal("Error happened:", err.Error()) + } + + err = codex.UploadChunk(sessionId, []byte("World!")) + if err != nil { + log.Fatal("Error happened:", err.Error()) + } + + cid, err := codex.UploadFinalize(sessionId) + if err != nil { + log.Fatal("Error happened:", err.Error()) + } + + if cid != expectedCID { + t.Fatalf("UploadReader returned %s but expected %s", cid, expectedCID) + } +}