diff --git a/codex/download.go b/codex/download.go index 2feee01..9e1f958 100644 --- a/codex/download.go +++ b/codex/download.go @@ -26,7 +26,9 @@ package codex */ import "C" import ( + "context" "encoding/json" + "fmt" "io" "unsafe" ) @@ -145,7 +147,7 @@ func (node CodexNode) DownloadManifest(cid string) (Manifest, error) { // If options.writer is set, the data will be written into that writer. // The options filepath and writer are not mutually exclusive, i.e you can write // in different places in a same call. -func (node CodexNode) DownloadStream(cid string, options DownloadStreamOptions) error { +func (node CodexNode) DownloadStream(ctx context.Context, cid string, options DownloadStreamOptions) error { bridge := newBridgeCtx() defer bridge.free() @@ -189,6 +191,14 @@ func (node CodexNode) DownloadStream(cid string, options DownloadStreamOptions) var cCid = C.CString(cid) defer C.free(unsafe.Pointer(cCid)) + err := node.DownloadInit(cid, DownloadInitOptions{ + ChunkSize: options.ChunkSize, + Local: options.Local, + }) + if err != nil { + return err + } + var cFilepath = C.CString(options.Filepath) defer C.free(unsafe.Pointer(cFilepath)) @@ -198,8 +208,24 @@ func (node CodexNode) DownloadStream(cid string, options DownloadStreamOptions) return bridge.callError("cGoCodexDownloadLocal") } - _, err := bridge.wait() - return err + var cancelErr error + select { + case <-ctx.Done(): + cancelErr = node.DownloadCancel(cid) + default: + // continue + } + + _, err = bridge.wait() + + if err != nil { + if cancelErr != nil { + return fmt.Errorf("upload canceled: %v, but failed to cancel upload session: %v", ctx.Err(), cancelErr) + } + return err + } + + return nil } // DownloadInit initializes the download process for a specific CID. diff --git a/codex/download_test.go b/codex/download_test.go index 764d388..c680498 100644 --- a/codex/download_test.go +++ b/codex/download_test.go @@ -1,6 +1,7 @@ package codex import ( + "context" "os" "strings" "testing" @@ -32,7 +33,7 @@ func TestDownloadStream(t *testing.T) { }, } - if err := codex.DownloadStream(cid, opt); err != nil { + if err := codex.DownloadStream(context.Background(), cid, opt); err != nil { t.Fatal("Error happened:", err.Error()) } @@ -72,7 +73,7 @@ func TestDownloadStreamWithAutosize(t *testing.T) { }, } - if err := codex.DownloadStream(cid, opt); err != nil { + if err := codex.DownloadStream(context.Background(), cid, opt); err != nil { t.Fatal("Error happened:", err.Error()) } @@ -89,11 +90,35 @@ func TestDownloadStreamWithNotExisting(t *testing.T) { codex := newCodexNode(t, withBlockRetries(1)) opt := DownloadStreamOptions{} - if err := codex.DownloadStream("bafybeihdwdcefgh4dqkjv67uzcmw7ojee6xedzdetojuzjevtenxquvyku", opt); err == nil { + if err := codex.DownloadStream(context.Background(), "bafybeihdwdcefgh4dqkjv67uzcmw7ojee6xedzdetojuzjevtenxquvyku", opt); err == nil { t.Fatal("Error expected when downloading non-existing cid") } } +func TestDownloadStreamCancelled(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + + codex := newCodexNode(t) + cid, _ := uploadBigFileHelper(t, codex) + + channelError := make(chan error, 1) + go func() { + err := codex.DownloadStream(ctx, cid, DownloadStreamOptions{Local: true}) + channelError <- err + }() + + cancel() + err := <-channelError + + if err == nil { + t.Fatal("UploadFile should have been canceled") + } + + if err.Error() != "Failed to stream file: Stream EOF!" { + t.Fatalf("UploadFile returned unexpected error: %v", err) + } +} + func TestDownloadManual(t *testing.T) { codex := newCodexNode(t) cid, _ := uploadHelper(t, codex) diff --git a/codex/testutil.go b/codex/testutil.go index 79909b2..54e2ea5 100644 --- a/codex/testutil.go +++ b/codex/testutil.go @@ -34,6 +34,7 @@ func newCodexNode(t *testing.T, opts ...codexNodeTestOption) *CodexNode { LogFormat: LogFormatNoColors, MetricsEnabled: false, BlockRetries: o.blockRetries, + LogLevel: INFO, }) if err != nil { t.Fatalf("Failed to create Codex node: %v", err) @@ -44,6 +45,11 @@ func newCodexNode(t *testing.T, opts ...codexNodeTestOption) *CodexNode { if err != nil { t.Fatalf("Failed to start Codex node: %v", err) } + + err := node.UpdateLogLevel("INFO") + if err != nil { + t.Fatalf("Failed to set log level: %v", err) + } } t.Cleanup(func() { @@ -73,3 +79,17 @@ func uploadHelper(t *testing.T, codex *CodexNode) (string, int) { return cid, len } + +func uploadBigFileHelper(t *testing.T, codex *CodexNode) (string, int) { + t.Helper() + + len := 1024 * 1024 * 50 + buf := bytes.NewBuffer(make([]byte, len)) + + cid, err := codex.UploadReader(context.Background(), UploadOptions{Filepath: "hello.txt"}, buf) + if err != nil { + t.Fatalf("Error happened during upload: %v\n", err) + } + + return cid, len +}