mirror of
https://github.com/logos-storage/logos-storage-go-bindings.git
synced 2026-01-02 13:33:10 +00:00
Add upload
This commit is contained in:
parent
3a4e2f0c94
commit
609030cc13
1
codex/testdata/hello.txt
vendored
Normal file
1
codex/testdata/hello.txt
vendored
Normal file
@ -0,0 +1 @@
|
||||
Hello World!
|
||||
325
codex/upload.go
Normal file
325
codex/upload.go
Normal file
@ -0,0 +1,325 @@
|
||||
package codex
|
||||
|
||||
/*
|
||||
#include "bridge.h"
|
||||
#include <stdlib.h>
|
||||
|
||||
static int cGoCodexUploadInit(void* codexCtx, char* filepath, size_t chunkSize, void* resp) {
|
||||
return codex_upload_init(codexCtx, filepath, chunkSize, (CodexCallback) callback, resp);
|
||||
}
|
||||
|
||||
static int cGoCodexUploadChunk(void* codexCtx, char* sessionId, const uint8_t* chunk, size_t len, void* resp) {
|
||||
return codex_upload_chunk(codexCtx, sessionId, chunk, len, (CodexCallback) callback, resp);
|
||||
}
|
||||
|
||||
static int cGoCodexUploadFinalize(void* codexCtx, char* sessionId, void* resp) {
|
||||
return codex_upload_finalize(codexCtx, sessionId, (CodexCallback) callback, resp);
|
||||
}
|
||||
|
||||
static int cGoCodexUploadCancel(void* codexCtx, char* sessionId, void* resp) {
|
||||
return codex_upload_cancel(codexCtx, sessionId, (CodexCallback) callback, resp);
|
||||
}
|
||||
|
||||
static int cGoCodexUploadFile(void* codexCtx, char* sessionId, void* resp) {
|
||||
return codex_upload_file(codexCtx, sessionId, (CodexCallback) callback, resp);
|
||||
}
|
||||
*/
|
||||
import "C"
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"unsafe"
|
||||
)
|
||||
|
||||
const defaultBlockSize = 1024 * 64
|
||||
|
||||
type OnUploadProgressFunc func(read, total int, percent float64, err error)
|
||||
|
||||
type ChunckSize int
|
||||
|
||||
func (c ChunckSize) valOrDefault() int {
|
||||
if c == 0 {
|
||||
return defaultBlockSize
|
||||
}
|
||||
|
||||
return int(c)
|
||||
}
|
||||
|
||||
func (c ChunckSize) toSizeT() C.size_t {
|
||||
return C.size_t(c.valOrDefault())
|
||||
}
|
||||
|
||||
type CodexUploadOptions struct {
|
||||
filepath string
|
||||
chunkSize ChunckSize
|
||||
onProgress OnUploadProgressFunc
|
||||
}
|
||||
|
||||
func getReaderSize(r io.Reader) int64 {
|
||||
switch v := r.(type) {
|
||||
case *os.File:
|
||||
stat, err := v.Stat()
|
||||
if err != nil {
|
||||
return 0
|
||||
}
|
||||
return stat.Size()
|
||||
case *bytes.Buffer:
|
||||
return int64(v.Len())
|
||||
default:
|
||||
return 0
|
||||
}
|
||||
}
|
||||
|
||||
// UploadInit initializes a new upload session.
|
||||
// It returns a session ID that can be used for subsequent upload operations.
|
||||
// This function is called by UploadReader and UploadFile internally.
|
||||
// You should use this function only if you need to manage the upload session manually.
|
||||
// The options parameter contains the following fields:
|
||||
// - filepath: the full path or the name of the file to upload. The metadata such as the
|
||||
// filename and mimetype are extracted from this path / name.
|
||||
// - chunkSize: the size of each upload chunk, passed as `blockSize` to the Codex node
|
||||
// store. Default is to 64 KB.
|
||||
// - onProgress: a callback function that is called after each chunk is uploaded. If the chunk
|
||||
// size is more than the `chunkSize` parameter, the callback is called after the block is actually
|
||||
// stored in the block store. Otherwise, it is called after the chunk is sent to the stream.
|
||||
func (node CodexNode) UploadInit(options *CodexUploadOptions) (string, error) {
|
||||
bridge := newBridgeCtx()
|
||||
defer bridge.free()
|
||||
|
||||
var cFilename = C.CString(options.filepath)
|
||||
defer C.free(unsafe.Pointer(cFilename))
|
||||
|
||||
if C.cGoCodexUploadInit(node.ctx, cFilename, options.chunkSize.toSizeT(), bridge.resp) != C.RET_OK {
|
||||
return "", bridge.callError("cGoCodexUploadInit")
|
||||
}
|
||||
|
||||
return bridge.wait()
|
||||
}
|
||||
|
||||
// UploadChunk uploads a chunk of data to the Codex node.
|
||||
// It takes the session ID returned by UploadInit
|
||||
// and a byte slice containing the chunk data.
|
||||
// This function is called by UploadReader internally.
|
||||
// You should use this function only if you need to manage the upload session manually.
|
||||
func (node CodexNode) UploadChunk(sessionId string, chunk []byte) error {
|
||||
bridge := newBridgeCtx()
|
||||
defer bridge.free()
|
||||
|
||||
var cSessionId = C.CString(sessionId)
|
||||
defer C.free(unsafe.Pointer(cSessionId))
|
||||
|
||||
var cChunkPtr *C.uint8_t
|
||||
if len(chunk) > 0 {
|
||||
cChunkPtr = (*C.uint8_t)(unsafe.Pointer(&chunk[0]))
|
||||
}
|
||||
|
||||
if C.cGoCodexUploadChunk(node.ctx, cSessionId, cChunkPtr, C.size_t(len(chunk)), bridge.resp) != C.RET_OK {
|
||||
return bridge.callError("cGoCodexUploadChunk")
|
||||
}
|
||||
|
||||
_, err := bridge.wait()
|
||||
return err
|
||||
}
|
||||
|
||||
// UploadFinalize finalizes the upload session and returns the CID of the uploaded file.
|
||||
// It takes the session ID returned by UploadInit.
|
||||
// This function is called by UploadReader and UploadFile internally.
|
||||
// You should use this function only if you need to manage the upload session manually.
|
||||
func (node CodexNode) UploadFinalize(sessionId string) (string, error) {
|
||||
bridge := newBridgeCtx()
|
||||
defer bridge.free()
|
||||
|
||||
var cSessionId = C.CString(sessionId)
|
||||
defer C.free(unsafe.Pointer(cSessionId))
|
||||
|
||||
if C.cGoCodexUploadFinalize(node.ctx, cSessionId, bridge.resp) != C.RET_OK {
|
||||
return "", bridge.callError("cGoCodexUploadFinalize")
|
||||
}
|
||||
|
||||
return bridge.wait()
|
||||
}
|
||||
|
||||
// UploadCancel cancels an ongoing upload session.
|
||||
// It can be only if the upload session is managed manually.
|
||||
// It doesn't work with UploadFile.
|
||||
func (node CodexNode) UploadCancel(sessionId string) error {
|
||||
bridge := newBridgeCtx()
|
||||
defer bridge.free()
|
||||
|
||||
var cSessionId = C.CString(sessionId)
|
||||
defer C.free(unsafe.Pointer(cSessionId))
|
||||
|
||||
if C.cGoCodexUploadCancel(node.ctx, cSessionId, bridge.resp) != C.RET_OK {
|
||||
return bridge.callError("cGoCodexUploadCancel")
|
||||
}
|
||||
|
||||
_, err := bridge.wait()
|
||||
return err
|
||||
}
|
||||
|
||||
// UploadReader uploads data from an io.Reader to the Codex node.
|
||||
// It takes the upload options and the reader as parameters.
|
||||
// It returns the CID of the uploaded file or an error.
|
||||
//
|
||||
// The options parameter contains the following fields:
|
||||
// - filepath: the name of the file to upload.
|
||||
// - chunkSize: the size of each upload chunk, passed as `blockSize` to the Codex node
|
||||
// store. Default is to 64 KB.
|
||||
// - onProgress: a callback function that is called after each chunk is uploaded with:
|
||||
// - read: the number of bytes read in the last chunk.
|
||||
// - total: the total number of bytes read so far.
|
||||
// - percent: the percentage of the total file size that has been uploaded. It is
|
||||
// determined from a `stat` call if the reader is a file and from the length of the buffer
|
||||
// if the reader is a buffer. Otherwise, it is 0.
|
||||
// - err: an error, if one occurred.
|
||||
//
|
||||
// If the chunk size is more than the `chunkSize` parameter, the callback is called after
|
||||
// the block is actually stored in the block store. Otherwise, it is called after the chunk
|
||||
// is sent to the stream.
|
||||
//
|
||||
// Internally, it calls:
|
||||
// - UploadInit to create the upload session.
|
||||
// - UploadChunk to upload a chunk to codex.
|
||||
// - UploadFinalize to finalize the upload session.
|
||||
// - UploadCancel if an error occurs.
|
||||
func (node CodexNode) UploadReader(options CodexUploadOptions, r io.Reader) (string, error) {
|
||||
sessionId, err := node.UploadInit(&options)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
buf := make([]byte, options.chunkSize.valOrDefault())
|
||||
total := 0
|
||||
|
||||
var size int64
|
||||
if options.onProgress != nil {
|
||||
size = getReaderSize(r)
|
||||
}
|
||||
|
||||
for {
|
||||
n, err := r.Read(buf)
|
||||
if err == io.EOF {
|
||||
break
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
if cancelErr := node.UploadCancel(sessionId); cancelErr != nil {
|
||||
return "", fmt.Errorf("failed to upload chunk %v and failed to cancel upload session %v", err, cancelErr)
|
||||
}
|
||||
|
||||
return "", err
|
||||
}
|
||||
|
||||
if n == 0 {
|
||||
break
|
||||
}
|
||||
|
||||
if err := node.UploadChunk(sessionId, buf[:n]); err != nil {
|
||||
if cancelErr := node.UploadCancel(sessionId); cancelErr != nil {
|
||||
return "", fmt.Errorf("failed to upload chunk %v and failed to cancel upload session %v", err, cancelErr)
|
||||
}
|
||||
|
||||
return "", err
|
||||
}
|
||||
|
||||
total += n
|
||||
if options.onProgress != nil && size > 0 {
|
||||
percent := float64(total) / float64(size) * 100.0
|
||||
// The last block could be a bit over the size due to padding
|
||||
// on the chunk size.
|
||||
if percent > 100.0 {
|
||||
percent = 100.0
|
||||
}
|
||||
options.onProgress(n, total, percent, nil)
|
||||
} else if options.onProgress != nil {
|
||||
options.onProgress(n, total, 0, nil)
|
||||
}
|
||||
}
|
||||
|
||||
return node.UploadFinalize(sessionId)
|
||||
}
|
||||
|
||||
// UploadReaderAsync is the asynchronous version of UploadReader using a goroutine.
|
||||
func (node CodexNode) UploadReaderAsync(options CodexUploadOptions, r io.Reader, onDone func(cid string, err error)) {
|
||||
go func() {
|
||||
cid, err := node.UploadReader(options, r)
|
||||
onDone(cid, err)
|
||||
}()
|
||||
}
|
||||
|
||||
// UploadFile uploads a file to the Codex node.
|
||||
// It takes the upload options as parameter.
|
||||
// It returns the CID of the uploaded file or an error.
|
||||
//
|
||||
// The options parameter contains the following fields:
|
||||
// - filepath: the full path of the file to upload.
|
||||
// - chunkSize: the size of each upload chunk, passed as `blockSize` to the Codex node
|
||||
// store. Default is to 64 KB.
|
||||
// - onProgress: a callback function that is called after each chunk is uploaded with:
|
||||
// - read: the number of bytes read in the last chunk.
|
||||
// - total: the total number of bytes read so far.
|
||||
// - percent: the percentage of the total file size that has been uploaded. It is
|
||||
// determined from a `stat` call.
|
||||
// - err: an error, if one occurred.
|
||||
//
|
||||
// If the chunk size is more than the `chunkSize` parameter, the callback is called after
|
||||
// the block is actually stored in the block store. Otherwise, it is called after the chunk
|
||||
// is sent to the stream.
|
||||
//
|
||||
// Internally, it calls UploadInit to create the upload session.
|
||||
func (node CodexNode) UploadFile(options CodexUploadOptions) (string, error) {
|
||||
bridge := newBridgeCtx()
|
||||
defer bridge.free()
|
||||
|
||||
if options.onProgress != nil {
|
||||
stat, err := os.Stat(options.filepath)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
size := stat.Size()
|
||||
total := 0
|
||||
|
||||
if size > 0 {
|
||||
bridge.onProgress = func(read int, _ []byte) {
|
||||
if read == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
total += read
|
||||
percent := float64(total) / float64(size) * 100.0
|
||||
// The last block could be a bit over the size due to padding
|
||||
// on the chunk size.
|
||||
if percent > 100.0 {
|
||||
percent = 100.0
|
||||
}
|
||||
|
||||
options.onProgress(read, int(size), percent, nil)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
sessionId, err := node.UploadInit(&options)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
var cSessionId = C.CString(sessionId)
|
||||
defer C.free(unsafe.Pointer(cSessionId))
|
||||
|
||||
if C.cGoCodexUploadFile(node.ctx, cSessionId, bridge.resp) != C.RET_OK {
|
||||
return "", bridge.callError("cGoCodexUploadFile")
|
||||
}
|
||||
|
||||
return bridge.wait()
|
||||
}
|
||||
|
||||
// UploadFileAsync is the asynchronous version of UploadFile using a goroutine.
|
||||
func (node CodexNode) UploadFileAsync(options CodexUploadOptions, onDone func(cid string, err error)) {
|
||||
go func() {
|
||||
cid, err := node.UploadFile(options)
|
||||
onDone(cid, err)
|
||||
}()
|
||||
}
|
||||
127
codex/upload_test.go
Normal file
127
codex/upload_test.go
Normal file
@ -0,0 +1,127 @@
|
||||
package codex
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"log"
|
||||
"os"
|
||||
"testing"
|
||||
)
|
||||
|
||||
const expectedCID = "zDvZRwzmAkhzDRPH5EW242gJBNZ2T7aoH2v1fVH66FxXL4kSbvyM"
|
||||
|
||||
func TestUploadReader(t *testing.T) {
|
||||
start := true
|
||||
codex := newCodexNode(t, start)
|
||||
totalBytes := 0
|
||||
finalPercent := 0.0
|
||||
|
||||
buf := bytes.NewBuffer([]byte("Hello World!"))
|
||||
len := buf.Len()
|
||||
cid, err := codex.UploadReader(CodexUploadOptions{filepath: "hello.txt", onProgress: func(read, total int, percent float64, err error) {
|
||||
if err != nil {
|
||||
log.Fatalf("Error happened during upload: %v\n", err)
|
||||
}
|
||||
|
||||
totalBytes = total
|
||||
finalPercent = percent
|
||||
}}, buf)
|
||||
|
||||
if err != nil {
|
||||
t.Fatalf("UploadReader failed: %v", err)
|
||||
}
|
||||
|
||||
if cid != expectedCID {
|
||||
t.Fatalf("UploadReader returned %s but expected %s", cid, expectedCID)
|
||||
}
|
||||
|
||||
if totalBytes != len {
|
||||
t.Fatalf("UploadReader progress callback read %d bytes but expected %d", totalBytes, len)
|
||||
}
|
||||
|
||||
if finalPercent != 100.0 {
|
||||
t.Fatalf("UploadReader progress callback final percent %.2f but expected 100.0", finalPercent)
|
||||
}
|
||||
}
|
||||
|
||||
func TestUploadFile(t *testing.T) {
|
||||
start := true
|
||||
codex := newCodexNode(t, start)
|
||||
totalBytes := 0
|
||||
finalPercent := 0.0
|
||||
|
||||
stat, err := os.Stat("./testdata/hello.txt")
|
||||
if err != nil {
|
||||
log.Fatalf("Error happened during file stat: %v\n", err)
|
||||
}
|
||||
|
||||
options := CodexUploadOptions{filepath: "./testdata/hello.txt", onProgress: func(read, total int, percent float64, err error) {
|
||||
if err != nil {
|
||||
log.Fatalf("Error happened during upload: %v\n", err)
|
||||
}
|
||||
|
||||
totalBytes = total
|
||||
finalPercent = percent
|
||||
}}
|
||||
|
||||
cid, err := codex.UploadFile(options)
|
||||
if err != nil {
|
||||
t.Fatalf("UploadReader failed: %v", err)
|
||||
}
|
||||
|
||||
if cid != expectedCID {
|
||||
t.Fatalf("UploadReader returned %s but expected %s", cid, expectedCID)
|
||||
}
|
||||
|
||||
if totalBytes != int(stat.Size()) {
|
||||
t.Fatalf("UploadReader progress callback read %d bytes but expected %d", totalBytes, int(stat.Size()))
|
||||
}
|
||||
|
||||
if finalPercent != 100.0 {
|
||||
t.Fatalf("UploadReader progress callback final percent %.2f but expected 100.0", finalPercent)
|
||||
}
|
||||
}
|
||||
|
||||
func TestUploadFileNoProgress(t *testing.T) {
|
||||
start := true
|
||||
codex := newCodexNode(t, start)
|
||||
|
||||
options := CodexUploadOptions{filepath: "./testdata/doesnt_exist.txt"}
|
||||
|
||||
cid, err := codex.UploadFile(options)
|
||||
if err == nil {
|
||||
t.Fatalf("UploadReader should have failed")
|
||||
}
|
||||
|
||||
if cid != "" {
|
||||
t.Fatalf("Cid should be empty but got %s", cid)
|
||||
}
|
||||
}
|
||||
|
||||
func TestManualUpload(t *testing.T) {
|
||||
start := true
|
||||
codex := newCodexNode(t, start)
|
||||
|
||||
sessionId, err := codex.UploadInit(&CodexUploadOptions{filepath: "hello.txt"})
|
||||
if err != nil {
|
||||
log.Fatal("Error happened:", err.Error())
|
||||
}
|
||||
|
||||
err = codex.UploadChunk(sessionId, []byte("Hello "))
|
||||
if err != nil {
|
||||
log.Fatal("Error happened:", err.Error())
|
||||
}
|
||||
|
||||
err = codex.UploadChunk(sessionId, []byte("World!"))
|
||||
if err != nil {
|
||||
log.Fatal("Error happened:", err.Error())
|
||||
}
|
||||
|
||||
cid, err := codex.UploadFinalize(sessionId)
|
||||
if err != nil {
|
||||
log.Fatal("Error happened:", err.Error())
|
||||
}
|
||||
|
||||
if cid != expectedCID {
|
||||
t.Fatalf("UploadReader returned %s but expected %s", cid, expectedCID)
|
||||
}
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user