mirror of
https://github.com/logos-storage/logos-storage-go-bindings.git
synced 2026-01-07 16:03:06 +00:00
Add upload cancellation
This commit is contained in:
parent
37cdd17ba8
commit
7218764de5
@ -2,6 +2,7 @@ package codex
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"context"
|
||||||
"testing"
|
"testing"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -65,7 +66,7 @@ func uploadHelper(t *testing.T, codex *CodexNode) (string, int) {
|
|||||||
|
|
||||||
buf := bytes.NewBuffer([]byte("Hello World!"))
|
buf := bytes.NewBuffer([]byte("Hello World!"))
|
||||||
len := buf.Len()
|
len := buf.Len()
|
||||||
cid, err := codex.UploadReader(UploadOptions{Filepath: "hello.txt"}, buf)
|
cid, err := codex.UploadReader(context.Background(), UploadOptions{Filepath: "hello.txt"}, buf)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Error happened during upload: %v\n", err)
|
t.Fatalf("Error happened during upload: %v\n", err)
|
||||||
}
|
}
|
||||||
|
|||||||
@ -27,6 +27,8 @@ package codex
|
|||||||
import "C"
|
import "C"
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"os"
|
"os"
|
||||||
@ -164,7 +166,7 @@ func (node CodexNode) UploadCancel(sessionId string) error {
|
|||||||
// - UploadChunk to upload a chunk to codex.
|
// - UploadChunk to upload a chunk to codex.
|
||||||
// - UploadFinalize to finalize the upload session.
|
// - UploadFinalize to finalize the upload session.
|
||||||
// - UploadCancel if an error occurs.
|
// - UploadCancel if an error occurs.
|
||||||
func (node CodexNode) UploadReader(options UploadOptions, r io.Reader) (string, error) {
|
func (node CodexNode) UploadReader(ctx context.Context, options UploadOptions, r io.Reader) (string, error) {
|
||||||
sessionId, err := node.UploadInit(&options)
|
sessionId, err := node.UploadInit(&options)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
@ -179,6 +181,16 @@ func (node CodexNode) UploadReader(options UploadOptions, r io.Reader) (string,
|
|||||||
}
|
}
|
||||||
|
|
||||||
for {
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
if cancelErr := node.UploadCancel(sessionId); cancelErr != nil {
|
||||||
|
return "", fmt.Errorf("upload canceled: %v, but failed to cancel upload session: %v", ctx.Err(), cancelErr)
|
||||||
|
}
|
||||||
|
return "", errors.New("upload canceled")
|
||||||
|
default:
|
||||||
|
// continue
|
||||||
|
}
|
||||||
|
|
||||||
n, err := r.Read(buf)
|
n, err := r.Read(buf)
|
||||||
if err == io.EOF {
|
if err == io.EOF {
|
||||||
break
|
break
|
||||||
@ -222,9 +234,9 @@ func (node CodexNode) UploadReader(options UploadOptions, r io.Reader) (string,
|
|||||||
}
|
}
|
||||||
|
|
||||||
// UploadReaderAsync is the asynchronous version of UploadReader using a goroutine.
|
// UploadReaderAsync is the asynchronous version of UploadReader using a goroutine.
|
||||||
func (node CodexNode) UploadReaderAsync(options UploadOptions, r io.Reader, onDone func(cid string, err error)) {
|
func (node CodexNode) UploadReaderAsync(ctx context.Context, options UploadOptions, r io.Reader, onDone func(cid string, err error)) {
|
||||||
go func() {
|
go func() {
|
||||||
cid, err := node.UploadReader(options, r)
|
cid, err := node.UploadReader(ctx, options, r)
|
||||||
onDone(cid, err)
|
onDone(cid, err)
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
@ -249,7 +261,7 @@ func (node CodexNode) UploadReaderAsync(options UploadOptions, r io.Reader, onDo
|
|||||||
// is sent to the stream.
|
// is sent to the stream.
|
||||||
//
|
//
|
||||||
// Internally, it calls UploadInit to create the upload session.
|
// Internally, it calls UploadInit to create the upload session.
|
||||||
func (node CodexNode) UploadFile(options UploadOptions) (string, error) {
|
func (node CodexNode) UploadFile(ctx context.Context, options UploadOptions) (string, error) {
|
||||||
bridge := newBridgeCtx()
|
bridge := newBridgeCtx()
|
||||||
defer bridge.free()
|
defer bridge.free()
|
||||||
|
|
||||||
@ -293,13 +305,29 @@ func (node CodexNode) UploadFile(options UploadOptions) (string, error) {
|
|||||||
return "", bridge.callError("cGoCodexUploadFile")
|
return "", bridge.callError("cGoCodexUploadFile")
|
||||||
}
|
}
|
||||||
|
|
||||||
return bridge.wait()
|
var cancelErr error
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
cancelErr = node.UploadCancel(sessionId)
|
||||||
|
default:
|
||||||
|
// continue
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = bridge.wait()
|
||||||
|
if err != nil {
|
||||||
|
if cancelErr != nil {
|
||||||
|
return "", fmt.Errorf("upload canceled: %v, but failed to cancel upload session: %v", ctx.Err(), cancelErr)
|
||||||
|
}
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
return bridge.result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// UploadFileAsync is the asynchronous version of UploadFile using a goroutine.
|
// UploadFileAsync is the asynchronous version of UploadFile using a goroutine.
|
||||||
func (node CodexNode) UploadFileAsync(options UploadOptions, onDone func(cid string, err error)) {
|
func (node CodexNode) UploadFileAsync(ctx context.Context, options UploadOptions, onDone func(cid string, err error)) {
|
||||||
go func() {
|
go func() {
|
||||||
cid, err := node.UploadFile(options)
|
cid, err := node.UploadFile(ctx, options)
|
||||||
onDone(cid, err)
|
onDone(cid, err)
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|||||||
@ -2,6 +2,7 @@ package codex
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"context"
|
||||||
"log"
|
"log"
|
||||||
"os"
|
"os"
|
||||||
"testing"
|
"testing"
|
||||||
@ -16,7 +17,7 @@ func TestUploadReader(t *testing.T) {
|
|||||||
|
|
||||||
buf := bytes.NewBuffer([]byte("Hello World!"))
|
buf := bytes.NewBuffer([]byte("Hello World!"))
|
||||||
len := buf.Len()
|
len := buf.Len()
|
||||||
cid, err := codex.UploadReader(UploadOptions{Filepath: "hello.txt", OnProgress: func(read, total int, percent float64, err error) {
|
cid, err := codex.UploadReader(context.Background(), UploadOptions{Filepath: "hello.txt", OnProgress: func(read, total int, percent float64, err error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("Error happened during upload: %v\n", err)
|
log.Fatalf("Error happened during upload: %v\n", err)
|
||||||
}
|
}
|
||||||
@ -42,6 +43,31 @@ func TestUploadReader(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestUploadReaderCancel(t *testing.T) {
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
codex := newCodexNode(t)
|
||||||
|
buf := bytes.NewBuffer(make([]byte, 1024*1024*10))
|
||||||
|
|
||||||
|
channelErr := make(chan error, 1)
|
||||||
|
go func() {
|
||||||
|
_, e := codex.UploadReader(ctx, UploadOptions{Filepath: "hello.txt"}, buf)
|
||||||
|
channelErr <- e
|
||||||
|
}()
|
||||||
|
|
||||||
|
cancel()
|
||||||
|
err := <-channelErr
|
||||||
|
|
||||||
|
if err == nil {
|
||||||
|
t.Fatal("UploadReader should have been canceled")
|
||||||
|
}
|
||||||
|
|
||||||
|
if err.Error() != "upload canceled" {
|
||||||
|
t.Fatalf("UploadReader returned unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestUploadFile(t *testing.T) {
|
func TestUploadFile(t *testing.T) {
|
||||||
codex := newCodexNode(t)
|
codex := newCodexNode(t)
|
||||||
totalBytes := 0
|
totalBytes := 0
|
||||||
@ -61,7 +87,7 @@ func TestUploadFile(t *testing.T) {
|
|||||||
finalPercent = percent
|
finalPercent = percent
|
||||||
}}
|
}}
|
||||||
|
|
||||||
cid, err := codex.UploadFile(options)
|
cid, err := codex.UploadFile(context.Background(), options)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("UploadReader failed: %v", err)
|
t.Fatalf("UploadReader failed: %v", err)
|
||||||
}
|
}
|
||||||
@ -79,12 +105,49 @@ func TestUploadFile(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestUploadFileCancel(t *testing.T) {
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
// create a tmp file with large content
|
||||||
|
tmpFile, err := os.Create("./testdata/large_file.txt")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to create temp file: %v", err)
|
||||||
|
}
|
||||||
|
defer os.Remove(tmpFile.Name())
|
||||||
|
|
||||||
|
largeContent := make([]byte, 1024*1024*50)
|
||||||
|
if _, err := tmpFile.Write(largeContent); err != nil {
|
||||||
|
t.Fatalf("Failed to write to temp file: %v", err)
|
||||||
|
}
|
||||||
|
tmpFile.Close()
|
||||||
|
|
||||||
|
codex := newCodexNode(t)
|
||||||
|
|
||||||
|
channelError := make(chan error, 1)
|
||||||
|
go func() {
|
||||||
|
_, e := codex.UploadFile(ctx, UploadOptions{Filepath: tmpFile.Name()})
|
||||||
|
channelError <- e
|
||||||
|
}()
|
||||||
|
|
||||||
|
cancel()
|
||||||
|
err = <-channelError
|
||||||
|
|
||||||
|
if err == nil {
|
||||||
|
t.Fatal("UploadFile should have been canceled")
|
||||||
|
}
|
||||||
|
|
||||||
|
if err.Error() != "Failed to upload the file: Failed to stream the file: Stream Closed!" {
|
||||||
|
t.Fatalf("UploadFile returned unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestUploadFileNoProgress(t *testing.T) {
|
func TestUploadFileNoProgress(t *testing.T) {
|
||||||
codex := newCodexNode(t)
|
codex := newCodexNode(t)
|
||||||
|
|
||||||
options := UploadOptions{Filepath: "./testdata/doesnt_exist.txt"}
|
options := UploadOptions{Filepath: "./testdata/doesnt_exist.txt"}
|
||||||
|
|
||||||
cid, err := codex.UploadFile(options)
|
cid, err := codex.UploadFile(context.Background(), options)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
t.Fatalf("UploadReader should have failed")
|
t.Fatalf("UploadReader should have failed")
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user