From 7218764de54bd05bfc10cd855e55d1432f7542c1 Mon Sep 17 00:00:00 2001 From: Arnaud Date: Fri, 17 Oct 2025 08:55:01 +0200 Subject: [PATCH] Add upload cancellation --- codex/testutil.go | 3 +- codex/upload.go | 42 ++++++++++++++++++++++----- codex/upload_test.go | 69 ++++++++++++++++++++++++++++++++++++++++++-- 3 files changed, 103 insertions(+), 11 deletions(-) diff --git a/codex/testutil.go b/codex/testutil.go index d98ddc5..79909b2 100644 --- a/codex/testutil.go +++ b/codex/testutil.go @@ -2,6 +2,7 @@ package codex import ( "bytes" + "context" "testing" ) @@ -65,7 +66,7 @@ func uploadHelper(t *testing.T, codex *CodexNode) (string, int) { buf := bytes.NewBuffer([]byte("Hello World!")) len := buf.Len() - cid, err := codex.UploadReader(UploadOptions{Filepath: "hello.txt"}, buf) + cid, err := codex.UploadReader(context.Background(), UploadOptions{Filepath: "hello.txt"}, buf) if err != nil { t.Fatalf("Error happened during upload: %v\n", err) } diff --git a/codex/upload.go b/codex/upload.go index 3e0bfc6..cd71f8d 100644 --- a/codex/upload.go +++ b/codex/upload.go @@ -27,6 +27,8 @@ package codex import "C" import ( "bytes" + "context" + "errors" "fmt" "io" "os" @@ -164,7 +166,7 @@ func (node CodexNode) UploadCancel(sessionId string) error { // - UploadChunk to upload a chunk to codex. // - UploadFinalize to finalize the upload session. // - UploadCancel if an error occurs. -func (node CodexNode) UploadReader(options UploadOptions, r io.Reader) (string, error) { +func (node CodexNode) UploadReader(ctx context.Context, options UploadOptions, r io.Reader) (string, error) { sessionId, err := node.UploadInit(&options) if err != nil { return "", err @@ -179,6 +181,16 @@ func (node CodexNode) UploadReader(options UploadOptions, r io.Reader) (string, } for { + select { + case <-ctx.Done(): + if cancelErr := node.UploadCancel(sessionId); cancelErr != nil { + return "", fmt.Errorf("upload canceled: %v, but failed to cancel upload session: %v", ctx.Err(), cancelErr) + } + return "", errors.New("upload canceled") + default: + // continue + } + n, err := r.Read(buf) if err == io.EOF { break @@ -222,9 +234,9 @@ func (node CodexNode) UploadReader(options UploadOptions, r io.Reader) (string, } // UploadReaderAsync is the asynchronous version of UploadReader using a goroutine. -func (node CodexNode) UploadReaderAsync(options UploadOptions, r io.Reader, onDone func(cid string, err error)) { +func (node CodexNode) UploadReaderAsync(ctx context.Context, options UploadOptions, r io.Reader, onDone func(cid string, err error)) { go func() { - cid, err := node.UploadReader(options, r) + cid, err := node.UploadReader(ctx, options, r) onDone(cid, err) }() } @@ -249,7 +261,7 @@ func (node CodexNode) UploadReaderAsync(options UploadOptions, r io.Reader, onDo // is sent to the stream. // // Internally, it calls UploadInit to create the upload session. -func (node CodexNode) UploadFile(options UploadOptions) (string, error) { +func (node CodexNode) UploadFile(ctx context.Context, options UploadOptions) (string, error) { bridge := newBridgeCtx() defer bridge.free() @@ -293,13 +305,29 @@ func (node CodexNode) UploadFile(options UploadOptions) (string, error) { return "", bridge.callError("cGoCodexUploadFile") } - return bridge.wait() + var cancelErr error + select { + case <-ctx.Done(): + cancelErr = node.UploadCancel(sessionId) + default: + // continue + } + + _, err = bridge.wait() + if err != nil { + if cancelErr != nil { + return "", fmt.Errorf("upload canceled: %v, but failed to cancel upload session: %v", ctx.Err(), cancelErr) + } + return "", err + } + + return bridge.result, nil } // UploadFileAsync is the asynchronous version of UploadFile using a goroutine. -func (node CodexNode) UploadFileAsync(options UploadOptions, onDone func(cid string, err error)) { +func (node CodexNode) UploadFileAsync(ctx context.Context, options UploadOptions, onDone func(cid string, err error)) { go func() { - cid, err := node.UploadFile(options) + cid, err := node.UploadFile(ctx, options) onDone(cid, err) }() } diff --git a/codex/upload_test.go b/codex/upload_test.go index c58397f..687b214 100644 --- a/codex/upload_test.go +++ b/codex/upload_test.go @@ -2,6 +2,7 @@ package codex import ( "bytes" + "context" "log" "os" "testing" @@ -16,7 +17,7 @@ func TestUploadReader(t *testing.T) { buf := bytes.NewBuffer([]byte("Hello World!")) len := buf.Len() - cid, err := codex.UploadReader(UploadOptions{Filepath: "hello.txt", OnProgress: func(read, total int, percent float64, err error) { + cid, err := codex.UploadReader(context.Background(), UploadOptions{Filepath: "hello.txt", OnProgress: func(read, total int, percent float64, err error) { if err != nil { log.Fatalf("Error happened during upload: %v\n", err) } @@ -42,6 +43,31 @@ func TestUploadReader(t *testing.T) { } } +func TestUploadReaderCancel(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + codex := newCodexNode(t) + buf := bytes.NewBuffer(make([]byte, 1024*1024*10)) + + channelErr := make(chan error, 1) + go func() { + _, e := codex.UploadReader(ctx, UploadOptions{Filepath: "hello.txt"}, buf) + channelErr <- e + }() + + cancel() + err := <-channelErr + + if err == nil { + t.Fatal("UploadReader should have been canceled") + } + + if err.Error() != "upload canceled" { + t.Fatalf("UploadReader returned unexpected error: %v", err) + } +} + func TestUploadFile(t *testing.T) { codex := newCodexNode(t) totalBytes := 0 @@ -61,7 +87,7 @@ func TestUploadFile(t *testing.T) { finalPercent = percent }} - cid, err := codex.UploadFile(options) + cid, err := codex.UploadFile(context.Background(), options) if err != nil { t.Fatalf("UploadReader failed: %v", err) } @@ -79,12 +105,49 @@ func TestUploadFile(t *testing.T) { } } +func TestUploadFileCancel(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // create a tmp file with large content + tmpFile, err := os.Create("./testdata/large_file.txt") + if err != nil { + t.Fatalf("Failed to create temp file: %v", err) + } + defer os.Remove(tmpFile.Name()) + + largeContent := make([]byte, 1024*1024*50) + if _, err := tmpFile.Write(largeContent); err != nil { + t.Fatalf("Failed to write to temp file: %v", err) + } + tmpFile.Close() + + codex := newCodexNode(t) + + channelError := make(chan error, 1) + go func() { + _, e := codex.UploadFile(ctx, UploadOptions{Filepath: tmpFile.Name()}) + channelError <- e + }() + + cancel() + err = <-channelError + + if err == nil { + t.Fatal("UploadFile should have been canceled") + } + + if err.Error() != "Failed to upload the file: Failed to stream the file: Stream Closed!" { + t.Fatalf("UploadFile returned unexpected error: %v", err) + } +} + func TestUploadFileNoProgress(t *testing.T) { codex := newCodexNode(t) options := UploadOptions{Filepath: "./testdata/doesnt_exist.txt"} - cid, err := codex.UploadFile(options) + cid, err := codex.UploadFile(context.Background(), options) if err == nil { t.Fatalf("UploadReader should have failed") }