diff --git a/.gitignore b/.gitignore index dbe5894..d05311a 100644 --- a/.gitignore +++ b/.gitignore @@ -19,4 +19,7 @@ nimcache # Test files codex/testdata/hello.downloaded.txt -codex/testdata/hello.downloaded.writer.txt \ No newline at end of file +codex/testdata/hello.downloaded.writer.txt + +# Bin +codex-go \ No newline at end of file diff --git a/.vscode/settings.json b/.vscode/settings.json index c4036ba..7cbc647 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -1,7 +1,9 @@ { "go.toolsEnvVars": { + "CGO_ENABLED": "1", "CGO_CFLAGS": "-I${workspaceFolder}/vendor/nim-codex/library", - "CGO_LDFLAGS": "-L${workspaceFolder}/vendor/nim-codex/build -Wl,-rpath,${workspaceFolder}/vendor/nim-codex/build", + "CGO_LDFLAGS": "-L${workspaceFolder}/vendor/nim-codex/build -lcodex -Wl,-rpath,${workspaceFolder}/vendor/nim-codex/build", "LD_LIBRARY_PATH": "${workspaceFolder}/vendor/nim-codex/build:${env:LD_LIBRARY_PATH}" - } + }, + "go.testTimeout": "2m" } \ No newline at end of file diff --git a/Makefile b/Makefile index 2f1f378..d7ddf34 100644 --- a/Makefile +++ b/Makefile @@ -29,7 +29,7 @@ build: test: @echo "Running tests..." - CGO_ENABLED=1 CGO_CFLAGS="$(CGO_CFLAGS)" CGO_LDFLAGS="$(CGO_LDFLAGS)" go test ./... + CGO_ENABLED=1 CGO_CFLAGS="$(CGO_CFLAGS)" CGO_LDFLAGS="$(CGO_LDFLAGS)" GOTESTFLAGS="-timeout=2m" go test ./... clean: @echo "Cleaning up..." diff --git a/README.md b/README.md index 2cdd457..29e3c20 100644 --- a/README.md +++ b/README.md @@ -98,7 +98,7 @@ Now the module is ready for use in your project. The release process is defined [here](./RELEASE.md). -## Usage +## API ### Init @@ -171,11 +171,10 @@ buf := bytes.NewBuffer([]byte("Hello World!")) onProgress := func(read, total int, percent float64, err error) { // Do something with the data } -cid, err := codex.UploadReader(UploadOptions{filepath: "hello.txt", onProgress: onProgress}, buf) +ctx := context.Background() +cid, err := codex.UploadReader(ctx, UploadOptions{filepath: "hello.txt", onProgress: onProgress}, buf) ``` -Caveat: once started, the upload cannot be cancelled. - #### file The `file` strategy allows you to upload a file on Codex using the path. @@ -189,11 +188,10 @@ The `UploadFile` returns the cid of the content uploaded. onProgress := func(read, total int, percent float64, err error) { // Do something with the data } -cid, err := codex.UploadFile(UploadOptions{filepath: "./testdata/hello.txt", onProgress: onProgress}) +ctx := context.Background() +cid, err := codex.UploadFile(ctx, UploadOptions{filepath: "./testdata/hello.txt", onProgress: onProgress}) ``` -Caveat: once started, the upload cannot be cancelled. - #### chunks The `chunks` strategy allows you to manage the upload by yourself. It requires more code @@ -246,11 +244,10 @@ opt := DownloadStreamOptions{ // Handle progress }, } -err := codex.DownloadStream(cid, opt) +ctx := context.Background() +err := codex.DownloadStream(ctx, cid, opt) ``` -Caveat: once started, the download cannot be cancelled. - #### chunks The `chunks` strategy allows to manage the download by yourself. It requires more code @@ -310,4 +307,10 @@ peerId := "..." record, err := node.CodexPeerDebug(peerId) ``` -`CodexPeerDebug` is only available if you built with `-d:codex_enable_api_debug_peers=true` flag. \ No newline at end of file +`CodexPeerDebug` is only available if you built with `-d:codex_enable_api_debug_peers=true` flag. + +### Context and cancellation + +Go contexts are exposed only on the long-running operations as `UploadReader`, `UploadFile`, and `DownloadFile`. If the +context is cancelled, those methods cancel the active upload or download. Short lived API calls don’t take a context +because they usually finish before a cancellation signal could matter. \ No newline at end of file diff --git a/codex/codex.go b/codex/codex.go index 78e8a2b..24495b4 100644 --- a/codex/codex.go +++ b/codex/codex.go @@ -92,7 +92,7 @@ const ( type Config struct { // Default: INFO - LogLevel LogLevel `json:"log-level,omitempty"` + LogLevel string `json:"log-level,omitempty"` // Specifies what kind of logs should be written to stdout // Default: auto @@ -280,8 +280,12 @@ func (node CodexNode) Destroy() error { return bridge.callError("cGoCodexDestroy") } - _, err = bridge.wait() - return err + // We don't wait for the bridge here. + // The destroy function does not call the worker thread, + // it destroys the context directly and return the return + // value synchronously. + + return nil } // Version returns the version of the Codex node. diff --git a/codex/codex_test.go b/codex/codex_test.go index d0cb91f..deb7cb7 100644 --- a/codex/codex_test.go +++ b/codex/codex_test.go @@ -3,7 +3,11 @@ package codex import "testing" func TestCodexVersion(t *testing.T) { - node := newCodexNode(t, withNoStart()) + config := defaultConfigHelper(t) + node, err := New(config) + if err != nil { + t.Fatalf("Failed to create Codex node: %v", err) + } version, err := node.Version() if err != nil { @@ -17,7 +21,11 @@ func TestCodexVersion(t *testing.T) { } func TestCodexRevision(t *testing.T) { - node := newCodexNode(t, withNoStart()) + config := defaultConfigHelper(t) + node, err := New(config) + if err != nil { + t.Fatalf("Failed to create Codex node: %v", err) + } revision, err := node.Revision() if err != nil { diff --git a/codex/debug_test.go b/codex/debug_test.go index a3be06c..b0cc52d 100644 --- a/codex/debug_test.go +++ b/codex/debug_test.go @@ -32,38 +32,19 @@ func TestUpdateLogLevel(t *testing.T) { } defer os.Remove(tmpFile.Name()) - node, err := New(Config{ - LogFile: tmpFile.Name(), - MetricsEnabled: false, + node := newCodexNode(t, Config{ + LogLevel: "INFO", + LogFile: tmpFile.Name(), + LogFormat: LogFormatNoColors, }) - if err != nil { - t.Fatalf("Failed to create Codex node: %v", err) - } - - t.Cleanup(func() { - if err := node.Stop(); err != nil { - t.Logf("cleanup codex: %v", err) - } - - if err := node.Destroy(); err != nil { - t.Logf("cleanup codex: %v", err) - } - }) - - if err := node.Start(); err != nil { - t.Fatalf("Failed to start Codex node: %v", err) - } content, err := os.ReadFile(tmpFile.Name()) + if err != nil { t.Fatalf("Failed to read log file: %v", err) } - if !strings.Contains(string(content), "Started codex node") { - t.Errorf("Log file does not contain 'Started codex node' %s", string(content)) - } - - if err := node.Stop(); err != nil { - t.Fatalf("Failed to stop Codex node: %v", err) + if !strings.Contains(string(content), "INF") { + t.Errorf("Log file does not contain INFO statement %s", string(content)) } err = node.UpdateLogLevel("ERROR") @@ -71,6 +52,11 @@ func TestUpdateLogLevel(t *testing.T) { t.Fatalf("UpdateLogLevel call failed: %v", err) } + if err := node.Stop(); err != nil { + t.Fatalf("Failed to stop Codex node: %v", err) + } + + // Clear the file if err := os.WriteFile(tmpFile.Name(), []byte{}, 0644); err != nil { t.Fatalf("Failed to clear log file: %v", err) } @@ -85,8 +71,8 @@ func TestUpdateLogLevel(t *testing.T) { t.Fatalf("Failed to read log file: %v", err) } - if strings.Contains(string(content), "Starting discovery node") { - t.Errorf("Log file contains 'Starting discovery node'") + if strings.Contains(string(content), "INF") { + t.Errorf("Log file contains INFO statement after log level update: %s", string(content)) } } @@ -94,50 +80,10 @@ func TestCodexPeerDebug(t *testing.T) { var bootstrap, node1, node2 *CodexNode var err error - t.Cleanup(func() { - if bootstrap != nil { - if err := bootstrap.Stop(); err != nil { - t.Logf("cleanup bootstrap: %v", err) - } - - if err := bootstrap.Destroy(); err != nil { - t.Logf("cleanup bootstrap: %v", err) - } - } - if node1 != nil { - if err := node1.Stop(); err != nil { - t.Logf("cleanup node1: %v", err) - } - - if err := node1.Destroy(); err != nil { - t.Logf("cleanup node1: %v", err) - } - } - if node2 != nil { - if err := node2.Stop(); err != nil { - t.Logf("cleanup node2: %v", err) - } - - if err := node2.Destroy(); err != nil { - t.Logf("cleanup node2: %v", err) - } - } + bootstrap = newCodexNode(t, Config{ + DiscoveryPort: 8092, }) - bootstrap, err = New(Config{ - DataDir: t.TempDir(), - LogFormat: LogFormatNoColors, - MetricsEnabled: false, - DiscoveryPort: 8092, - }) - if err != nil { - t.Fatalf("Failed to create bootstrap: %v", err) - } - - if err := bootstrap.Start(); err != nil { - t.Fatalf("Failed to start bootstrap: %v", err) - } - spr, err := bootstrap.Spr() if err != nil { t.Fatalf("Failed to get bootstrap spr: %v", err) @@ -145,35 +91,15 @@ func TestCodexPeerDebug(t *testing.T) { bootstrapNodes := []string{spr} - node1, err = New(Config{ - DataDir: t.TempDir(), - LogFormat: LogFormatNoColors, - MetricsEnabled: false, + node1 = newCodexNode(t, Config{ DiscoveryPort: 8090, BootstrapNodes: bootstrapNodes, }) - if err != nil { - t.Fatalf("Failed to create codex: %v", err) - } - if err := node1.Start(); err != nil { - t.Fatalf("Failed to start codex: %v", err) - } - - node2, err = New(Config{ - DataDir: t.TempDir(), - LogFormat: LogFormatNoColors, - MetricsEnabled: false, + node2 = newCodexNode(t, Config{ DiscoveryPort: 8091, BootstrapNodes: bootstrapNodes, }) - if err != nil { - t.Fatalf("Failed to create codex2: %v", err) - } - - if err := node2.Start(); err != nil { - t.Fatalf("Failed to start codex2: %v", err) - } peerId, err := node2.PeerId() if err != nil { @@ -186,9 +112,14 @@ func TestCodexPeerDebug(t *testing.T) { if err == nil { break } + time.Sleep(1 * time.Second) } + if err != nil { + t.Fatalf("CodexPeerDebug call failed: %v", err) + } + if record.PeerId == "" { t.Fatalf("CodexPeerDebug call failed: %v", err) } diff --git a/codex/download.go b/codex/download.go index 2feee01..5794849 100644 --- a/codex/download.go +++ b/codex/download.go @@ -26,7 +26,9 @@ package codex */ import "C" import ( + "context" "encoding/json" + "fmt" "io" "unsafe" ) @@ -145,7 +147,7 @@ func (node CodexNode) DownloadManifest(cid string) (Manifest, error) { // If options.writer is set, the data will be written into that writer. // The options filepath and writer are not mutually exclusive, i.e you can write // in different places in a same call. -func (node CodexNode) DownloadStream(cid string, options DownloadStreamOptions) error { +func (node CodexNode) DownloadStream(ctx context.Context, cid string, options DownloadStreamOptions) error { bridge := newBridgeCtx() defer bridge.free() @@ -189,6 +191,16 @@ func (node CodexNode) DownloadStream(cid string, options DownloadStreamOptions) var cCid = C.CString(cid) defer C.free(unsafe.Pointer(cCid)) + err := node.DownloadInit(cid, DownloadInitOptions{ + ChunkSize: options.ChunkSize, + Local: options.Local, + }) + if err != nil { + return err + } + + defer node.DownloadCancel(cid) + var cFilepath = C.CString(options.Filepath) defer C.free(unsafe.Pointer(cFilepath)) @@ -198,8 +210,39 @@ func (node CodexNode) DownloadStream(cid string, options DownloadStreamOptions) return bridge.callError("cGoCodexDownloadLocal") } - _, err := bridge.wait() - return err + // Create a done channel to signal the goroutine to stop + // when the download is complete and avoid goroutine leaks. + done := make(chan struct{}) + defer close(done) + + channelError := make(chan error, 1) + go func() { + select { + case <-ctx.Done(): + channelError <- node.DownloadCancel(cid) + case <-done: + // Nothing to do, download finished + } + }() + + _, err = bridge.wait() + + // Extract the potential cancellation error + var cancelError error + select { + case cancelError = <-channelError: + default: + } + + if err != nil { + if cancelError != nil { + return fmt.Errorf("download canceled: %v, but failed to cancel download session: %v", ctx.Err(), cancelError) + } + + return err + } + + return cancelError } // DownloadInit initializes the download process for a specific CID. diff --git a/codex/download_test.go b/codex/download_test.go index 764d388..edfc2a5 100644 --- a/codex/download_test.go +++ b/codex/download_test.go @@ -1,6 +1,7 @@ package codex import ( + "context" "os" "strings" "testing" @@ -32,7 +33,7 @@ func TestDownloadStream(t *testing.T) { }, } - if err := codex.DownloadStream(cid, opt); err != nil { + if err := codex.DownloadStream(context.Background(), cid, opt); err != nil { t.Fatal("Error happened:", err.Error()) } @@ -72,7 +73,7 @@ func TestDownloadStreamWithAutosize(t *testing.T) { }, } - if err := codex.DownloadStream(cid, opt); err != nil { + if err := codex.DownloadStream(context.Background(), cid, opt); err != nil { t.Fatal("Error happened:", err.Error()) } @@ -86,14 +87,38 @@ func TestDownloadStreamWithAutosize(t *testing.T) { } func TestDownloadStreamWithNotExisting(t *testing.T) { - codex := newCodexNode(t, withBlockRetries(1)) + codex := newCodexNode(t, Config{BlockRetries: 1}) opt := DownloadStreamOptions{} - if err := codex.DownloadStream("bafybeihdwdcefgh4dqkjv67uzcmw7ojee6xedzdetojuzjevtenxquvyku", opt); err == nil { + if err := codex.DownloadStream(context.Background(), "bafybeihdwdcefgh4dqkjv67uzcmw7ojee6xedzdetojuzjevtenxquvyku", opt); err == nil { t.Fatal("Error expected when downloading non-existing cid") } } +func TestDownloadStreamCancelled(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + + codex := newCodexNode(t) + cid, _ := uploadBigFileHelper(t, codex) + + channelError := make(chan error, 1) + go func() { + err := codex.DownloadStream(ctx, cid, DownloadStreamOptions{Local: true}) + channelError <- err + }() + + cancel() + err := <-channelError + + if err == nil { + t.Fatal("UploadFile should have been canceled") + } + + if err.Error() != "Failed to stream file: Stream EOF!" { + t.Fatalf("UploadFile returned unexpected error: %v", err) + } +} + func TestDownloadManual(t *testing.T) { codex := newCodexNode(t) cid, _ := uploadHelper(t, codex) @@ -134,7 +159,7 @@ func TestDownloadManifest(t *testing.T) { } func TestDownloadManifestWithNotExistingCid(t *testing.T) { - codex := newCodexNode(t, withBlockRetries(1)) + codex := newCodexNode(t, Config{BlockRetries: 1}) manifest, err := codex.DownloadManifest("bafybeihdwdcefgh4dqkjv67uzcmw7ojee6xedzdetojuzjevtenxquvyku") if err == nil { @@ -147,7 +172,7 @@ func TestDownloadManifestWithNotExistingCid(t *testing.T) { } func TestDownloadInitWithNotExistingCid(t *testing.T) { - codex := newCodexNode(t, withBlockRetries(1)) + codex := newCodexNode(t, Config{BlockRetries: 1}) if err := codex.DownloadInit("bafybeihdwdcefgh4dqkjv67uzcmw7ojee6xedzdetojuzjevtenxquvyku", DownloadInitOptions{}); err == nil { t.Fatal("expected error when initializing download for non-existent cid") diff --git a/codex/p2p_test.go b/codex/p2p_test.go index f7d0e64..44106a9 100644 --- a/codex/p2p_test.go +++ b/codex/p2p_test.go @@ -73,50 +73,10 @@ func TestCodexWithPeerId(t *testing.T) { var bootstrap, node1, node2 *CodexNode var err error - t.Cleanup(func() { - if bootstrap != nil { - if err := bootstrap.Stop(); err != nil { - t.Logf("cleanup bootstrap: %v", err) - } - - if err := bootstrap.Destroy(); err != nil { - t.Logf("cleanup bootstrap: %v", err) - } - } - if node1 != nil { - if err := node1.Stop(); err != nil { - t.Logf("cleanup node1: %v", err) - } - - if err := node1.Destroy(); err != nil { - t.Logf("cleanup node1: %v", err) - } - } - if node2 != nil { - if err := node2.Stop(); err != nil { - t.Logf("cleanup node2: %v", err) - } - - if err := node2.Destroy(); err != nil { - t.Logf("cleanup node2: %v", err) - } - } + bootstrap = newCodexNode(t, Config{ + DiscoveryPort: 8092, }) - bootstrap, err = New(Config{ - DataDir: t.TempDir(), - LogFormat: LogFormatNoColors, - MetricsEnabled: false, - DiscoveryPort: 8092, - }) - if err != nil { - t.Fatalf("Failed to create bootstrap: %v", err) - } - - if err := bootstrap.Start(); err != nil { - t.Fatalf("Failed to start bootstrap: %v", err) - } - spr, err := bootstrap.Spr() if err != nil { t.Fatalf("Failed to get bootstrap spr: %v", err) @@ -124,35 +84,15 @@ func TestCodexWithPeerId(t *testing.T) { bootstrapNodes := []string{spr} - node1, err = New(Config{ - DataDir: t.TempDir(), - LogFormat: LogFormatNoColors, - MetricsEnabled: false, + node1 = newCodexNode(t, Config{ DiscoveryPort: 8090, BootstrapNodes: bootstrapNodes, }) - if err != nil { - t.Fatalf("Failed to create codex: %v", err) - } - if err := node1.Start(); err != nil { - t.Fatalf("Failed to start codex: %v", err) - } - - node2, err = New(Config{ - DataDir: t.TempDir(), - LogFormat: LogFormatNoColors, - MetricsEnabled: false, + node2 = newCodexNode(t, Config{ DiscoveryPort: 8091, BootstrapNodes: bootstrapNodes, }) - if err != nil { - t.Fatalf("Failed to create codex2: %v", err) - } - - if err := node2.Start(); err != nil { - t.Fatalf("Failed to start codex2: %v", err) - } peerId, err := node2.PeerId() if err != nil { diff --git a/codex/storage_test.go b/codex/storage_test.go index f002bb2..bca6381 100644 --- a/codex/storage_test.go +++ b/codex/storage_test.go @@ -84,7 +84,7 @@ func TestFetch(t *testing.T) { } func TestFetchCidDoesNotExist(t *testing.T) { - codex := newCodexNode(t, withBlockRetries(1)) + codex := newCodexNode(t, Config{BlockRetries: 1}) _, err := codex.Fetch("bafybeihdwdcefgh4dqkjv67uzcmw7ojee6xedzdetojuzjevtenxquvyku") if err == nil { diff --git a/codex/testutil.go b/codex/testutil.go index d98ddc5..274495b 100644 --- a/codex/testutil.go +++ b/codex/testutil.go @@ -2,54 +2,62 @@ package codex import ( "bytes" + "context" "testing" ) -type codexNodeTestOption func(*codexNodeTestOptions) +func defaultConfigHelper(t *testing.T) Config { + t.Helper() -type codexNodeTestOptions struct { - noStart bool - blockRetries int -} - -func withNoStart() codexNodeTestOption { - return func(o *codexNodeTestOptions) { o.noStart = true } -} - -func withBlockRetries(n int) codexNodeTestOption { - return func(o *codexNodeTestOptions) { o.blockRetries = n } -} - -func newCodexNode(t *testing.T, opts ...codexNodeTestOption) *CodexNode { - o := codexNodeTestOptions{ - blockRetries: 3000, - } - for _, opt := range opts { - opt(&o) - } - - node, err := New(Config{ + return Config{ DataDir: t.TempDir(), LogFormat: LogFormatNoColors, MetricsEnabled: false, - BlockRetries: o.blockRetries, - }) + BlockRetries: 3000, + LogLevel: "ERROR", + } +} + +func newCodexNode(t *testing.T, opts ...Config) *CodexNode { + config := defaultConfigHelper(t) + + if len(opts) > 0 { + c := opts[0] + + if c.BlockRetries > 0 { + config.BlockRetries = c.BlockRetries + } + + if c.LogLevel != "" { + config.LogLevel = c.LogLevel + } + + if c.LogFile != "" { + config.LogFile = c.LogFile + } + + if len(c.BootstrapNodes) != 0 { + config.BootstrapNodes = c.BootstrapNodes + } + + if c.DiscoveryPort != 0 { + config.DiscoveryPort = c.DiscoveryPort + } + } + + node, err := New(config) if err != nil { t.Fatalf("Failed to create Codex node: %v", err) } - if !o.noStart { - err = node.Start() - if err != nil { - t.Fatalf("Failed to start Codex node: %v", err) - } + err = node.Start() + if err != nil { + t.Fatalf("Failed to start Codex node: %v", err) } t.Cleanup(func() { - if !o.noStart { - if err := node.Stop(); err != nil { - t.Logf("cleanup codex: %v", err) - } + if err := node.Stop(); err != nil { + t.Logf("cleanup codex: %v", err) } if err := node.Destroy(); err != nil { @@ -65,7 +73,21 @@ func uploadHelper(t *testing.T, codex *CodexNode) (string, int) { buf := bytes.NewBuffer([]byte("Hello World!")) len := buf.Len() - cid, err := codex.UploadReader(UploadOptions{Filepath: "hello.txt"}, buf) + cid, err := codex.UploadReader(context.Background(), UploadOptions{Filepath: "hello.txt"}, buf) + if err != nil { + t.Fatalf("Error happened during upload: %v\n", err) + } + + return cid, len +} + +func uploadBigFileHelper(t *testing.T, codex *CodexNode) (string, int) { + t.Helper() + + len := 1024 * 1024 * 50 + buf := bytes.NewBuffer(make([]byte, len)) + + cid, err := codex.UploadReader(context.Background(), UploadOptions{Filepath: "hello.txt"}, buf) if err != nil { t.Fatalf("Error happened during upload: %v\n", err) } diff --git a/codex/upload.go b/codex/upload.go index 3e0bfc6..532879d 100644 --- a/codex/upload.go +++ b/codex/upload.go @@ -27,6 +27,8 @@ package codex import "C" import ( "bytes" + "context" + "errors" "fmt" "io" "os" @@ -164,11 +166,12 @@ func (node CodexNode) UploadCancel(sessionId string) error { // - UploadChunk to upload a chunk to codex. // - UploadFinalize to finalize the upload session. // - UploadCancel if an error occurs. -func (node CodexNode) UploadReader(options UploadOptions, r io.Reader) (string, error) { +func (node CodexNode) UploadReader(ctx context.Context, options UploadOptions, r io.Reader) (string, error) { sessionId, err := node.UploadInit(&options) if err != nil { return "", err } + defer node.UploadCancel(sessionId) buf := make([]byte, options.ChunkSize.valOrDefault()) total := 0 @@ -179,6 +182,16 @@ func (node CodexNode) UploadReader(options UploadOptions, r io.Reader) (string, } for { + select { + case <-ctx.Done(): + if cancelErr := node.UploadCancel(sessionId); cancelErr != nil { + return "", fmt.Errorf("upload canceled: %v, but failed to cancel upload session: %v", ctx.Err(), cancelErr) + } + return "", errors.New("upload canceled") + default: + // continue + } + n, err := r.Read(buf) if err == io.EOF { break @@ -222,9 +235,9 @@ func (node CodexNode) UploadReader(options UploadOptions, r io.Reader) (string, } // UploadReaderAsync is the asynchronous version of UploadReader using a goroutine. -func (node CodexNode) UploadReaderAsync(options UploadOptions, r io.Reader, onDone func(cid string, err error)) { +func (node CodexNode) UploadReaderAsync(ctx context.Context, options UploadOptions, r io.Reader, onDone func(cid string, err error)) { go func() { - cid, err := node.UploadReader(options, r) + cid, err := node.UploadReader(ctx, options, r) onDone(cid, err) }() } @@ -249,7 +262,7 @@ func (node CodexNode) UploadReaderAsync(options UploadOptions, r io.Reader, onDo // is sent to the stream. // // Internally, it calls UploadInit to create the upload session. -func (node CodexNode) UploadFile(options UploadOptions) (string, error) { +func (node CodexNode) UploadFile(ctx context.Context, options UploadOptions) (string, error) { bridge := newBridgeCtx() defer bridge.free() @@ -285,6 +298,7 @@ func (node CodexNode) UploadFile(options UploadOptions) (string, error) { if err != nil { return "", err } + defer node.UploadCancel(sessionId) var cSessionId = C.CString(sessionId) defer C.free(unsafe.Pointer(cSessionId)) @@ -293,13 +307,44 @@ func (node CodexNode) UploadFile(options UploadOptions) (string, error) { return "", bridge.callError("cGoCodexUploadFile") } - return bridge.wait() + // Create a done channel to signal the goroutine to stop + // when the download is complete and avoid goroutine leaks. + done := make(chan struct{}) + defer close(done) + + channelError := make(chan error, 1) + go func() { + select { + case <-ctx.Done(): + channelError <- node.UploadCancel(sessionId) + case <-done: + // Nothing to do, upload finished + } + }() + + _, err = bridge.wait() + + // Extract the potential cancellation error + var cancelErr error + select { + case cancelErr = <-channelError: + default: + } + + if err != nil { + if cancelErr != nil { + return "", fmt.Errorf("upload canceled: %v, but failed to cancel upload session: %v", ctx.Err(), cancelErr) + } + return "", err + } + + return bridge.result, cancelErr } // UploadFileAsync is the asynchronous version of UploadFile using a goroutine. -func (node CodexNode) UploadFileAsync(options UploadOptions, onDone func(cid string, err error)) { +func (node CodexNode) UploadFileAsync(ctx context.Context, options UploadOptions, onDone func(cid string, err error)) { go func() { - cid, err := node.UploadFile(options) + cid, err := node.UploadFile(ctx, options) onDone(cid, err) }() } diff --git a/codex/upload_test.go b/codex/upload_test.go index c58397f..857064e 100644 --- a/codex/upload_test.go +++ b/codex/upload_test.go @@ -2,6 +2,7 @@ package codex import ( "bytes" + "context" "log" "os" "testing" @@ -16,7 +17,7 @@ func TestUploadReader(t *testing.T) { buf := bytes.NewBuffer([]byte("Hello World!")) len := buf.Len() - cid, err := codex.UploadReader(UploadOptions{Filepath: "hello.txt", OnProgress: func(read, total int, percent float64, err error) { + cid, err := codex.UploadReader(context.Background(), UploadOptions{Filepath: "hello.txt", OnProgress: func(read, total int, percent float64, err error) { if err != nil { log.Fatalf("Error happened during upload: %v\n", err) } @@ -42,6 +43,30 @@ func TestUploadReader(t *testing.T) { } } +func TestUploadReaderCancel(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + + codex := newCodexNode(t) + buf := bytes.NewBuffer(make([]byte, 1024*1024*10)) + + channelErr := make(chan error, 1) + go func() { + _, e := codex.UploadReader(ctx, UploadOptions{Filepath: "hello.txt"}, buf) + channelErr <- e + }() + + cancel() + err := <-channelErr + + if err == nil { + t.Fatal("UploadReader should have been canceled") + } + + if err.Error() != "upload canceled" { + t.Fatalf("UploadReader returned unexpected error: %v", err) + } +} + func TestUploadFile(t *testing.T) { codex := newCodexNode(t) totalBytes := 0 @@ -61,7 +86,7 @@ func TestUploadFile(t *testing.T) { finalPercent = percent }} - cid, err := codex.UploadFile(options) + cid, err := codex.UploadFile(context.Background(), options) if err != nil { t.Fatalf("UploadReader failed: %v", err) } @@ -79,12 +104,47 @@ func TestUploadFile(t *testing.T) { } } +func TestUploadFileCancel(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + + tmpFile, err := os.Create(os.TempDir() + "/large_file.txt") + if err != nil { + t.Fatalf("Failed to create temp file: %v", err) + } + defer os.Remove(tmpFile.Name()) + + largeContent := make([]byte, 1024*1024*50) + if _, err := tmpFile.Write(largeContent); err != nil { + t.Fatalf("Failed to write to temp file: %v", err) + } + tmpFile.Close() + + codex := newCodexNode(t) + + channelError := make(chan error, 1) + go func() { + _, err := codex.UploadFile(ctx, UploadOptions{Filepath: tmpFile.Name()}) + channelError <- err + }() + + cancel() + err = <-channelError + + if err == nil { + t.Fatal("UploadFile should have been canceled") + } + + if err.Error() != "Failed to upload the file: Failed to stream the file: Stream Closed!" { + t.Fatalf("UploadFile returned unexpected error: %v", err) + } +} + func TestUploadFileNoProgress(t *testing.T) { codex := newCodexNode(t) options := UploadOptions{Filepath: "./testdata/doesnt_exist.txt"} - cid, err := codex.UploadFile(options) + cid, err := codex.UploadFile(context.Background(), options) if err == nil { t.Fatalf("UploadReader should have failed") } diff --git a/vendor/nim-codex b/vendor/nim-codex index 1105b81..a86d858 160000 --- a/vendor/nim-codex +++ b/vendor/nim-codex @@ -1 +1 @@ -Subproject commit 1105b81cc1b202006ca5a16485b3cfc5331468d5 +Subproject commit a86d8586456d5eb6805b228e80e264ee736a6a90