diff --git a/README.md b/README.md index ae7c92b..2002b50 100644 --- a/README.md +++ b/README.md @@ -257,7 +257,7 @@ whenever you want! When you receive a cid, you can download the `Manifest` to get information about the data: ```go -manifest, err := codex.DownloadManifest(cid) +manifest, err := codex.DownloadManifest(ctx, cid) ``` It is not mandatory for downloading the data but it is really useful. @@ -300,7 +300,7 @@ to terminate the download session. ```go cid := "..." -err := codex.DownloadInit(cid, DownloadInitOptions{}) +err := codex.DownloadInit(ctx, cid, DownloadInitOptions{}) chunk, err := codex.DownloadChunk(cid) err := codex.DownloadCancel(cid) ``` diff --git a/codex/bridge.go b/codex/bridge.go index e685d35..d1d7f01 100644 --- a/codex/bridge.go +++ b/codex/bridge.go @@ -34,10 +34,12 @@ package codex */ import "C" import ( + "context" "errors" "fmt" "runtime/cgo" "sync" + "sync/atomic" "unsafe" ) @@ -47,11 +49,12 @@ import ( // a response pointer for receiving data from the C code, // and fields for storing the result and error of the call. type bridgeCtx struct { - wg *sync.WaitGroup - h cgo.Handle - resp unsafe.Pointer - result string - err error + wg *sync.WaitGroup + h cgo.Handle + resp unsafe.Pointer + result string + err error + cancelled atomic.Bool // Callback used for receiving progress updates during upload/download. // @@ -85,6 +88,8 @@ func (b *bridgeCtx) callError(name string) error { // including the cgo.Handle and the response pointer. func (b *bridgeCtx) free() { if b.h > 0 { + // (*C.Resp)(b.resp).h = 0 + b.h.Delete() b.h = 0 } @@ -116,12 +121,14 @@ func callback(ret C.int, msg *C.char, len C.size_t, resp unsafe.Pointer) { } h := cgo.Handle(m.h) + if h == 0 { return } if v, ok := h.Value().(*bridgeCtx); ok { switch ret { + case C.RET_PROGRESS: if v.onProgress == nil { return @@ -136,22 +143,55 @@ func callback(ret C.int, msg *C.char, len C.size_t, resp unsafe.Pointer) { retMsg := C.GoStringN(msg, C.int(len)) v.result = retMsg v.err = nil - if v.wg != nil { + if !v.cancelled.Load() && v.wg != nil { v.wg.Done() } + v.free() case C.RET_ERR: retMsg := C.GoStringN(msg, C.int(len)) v.err = errors.New(retMsg) - if v.wg != nil { + if !v.cancelled.Load() && v.wg != nil { v.wg.Done() } + v.free() } } } -// wait waits for the bridge context to complete its operation. -// It returns the result and error of the operation. func (b *bridgeCtx) wait() (string, error) { b.wg.Wait() return b.result, b.err } + +// wait waits for the bridge context to complete its operation. +// It returns the result and error of the operation. +func (b *bridgeCtx) waitWithContext(ctx context.Context) (string, error) { + type result struct { + value string + err error + } + + done := make(chan result, 1) + defer close(done) + + go func() { + b.wg.Wait() + + if b.cancelled.Load() { + return + } + + done <- result{value: b.result, err: b.err} + }() + + select { + case res := <-done: + return res.value, res.err + case <-ctx.Done(): + b.cancelled.Store(true) + + b.wg.Done() + + return "", ctx.Err() + } +} diff --git a/codex/codex.go b/codex/codex.go index ab62c68..e2cc416 100644 --- a/codex/codex.go +++ b/codex/codex.go @@ -208,7 +208,6 @@ func (c ChunkSize) toSizeT() C.size_t { // with the Codex network. func New(config Config) (*CodexNode, error) { bridge := newBridgeCtx() - defer bridge.free() jsonConfig, err := json.Marshal(config) if err != nil { @@ -230,7 +229,6 @@ func New(config Config) (*CodexNode, error) { // Start starts the Codex node. func (node CodexNode) Start() error { bridge := newBridgeCtx() - defer bridge.free() if C.cGoCodexStart(node.ctx, bridge.resp) != C.RET_OK { return bridge.callError("cGoCodexStart") @@ -251,7 +249,6 @@ func (node CodexNode) StartAsync(onDone func(error)) { // Stop stops the Codex node. func (node CodexNode) Stop() error { bridge := newBridgeCtx() - defer bridge.free() if C.cGoCodexStop(node.ctx, bridge.resp) != C.RET_OK { return bridge.callError("cGoCodexStop") @@ -265,7 +262,6 @@ func (node CodexNode) Stop() error { // The node must be stopped before calling this method. func (node CodexNode) Destroy() error { bridge := newBridgeCtx() - defer bridge.free() if C.cGoCodexClose(node.ctx, bridge.resp) != C.RET_OK { return bridge.callError("cGoCodexClose") @@ -291,7 +287,6 @@ func (node CodexNode) Destroy() error { // Version returns the version of the Codex node. func (node CodexNode) Version() (string, error) { bridge := newBridgeCtx() - defer bridge.free() if C.cGoCodexVersion(node.ctx, bridge.resp) != C.RET_OK { return "", bridge.callError("cGoCodexVersion") @@ -302,7 +297,6 @@ func (node CodexNode) Version() (string, error) { func (node CodexNode) Revision() (string, error) { bridge := newBridgeCtx() - defer bridge.free() if C.cGoCodexRevision(node.ctx, bridge.resp) != C.RET_OK { return "", bridge.callError("cGoCodexRevision") @@ -314,7 +308,6 @@ func (node CodexNode) Revision() (string, error) { // Repo returns the path of the data dir folder. func (node CodexNode) Repo() (string, error) { bridge := newBridgeCtx() - defer bridge.free() if C.cGoCodexRepo(node.ctx, bridge.resp) != C.RET_OK { return "", bridge.callError("cGoCodexRepo") @@ -325,7 +318,6 @@ func (node CodexNode) Repo() (string, error) { func (node CodexNode) Spr() (string, error) { bridge := newBridgeCtx() - defer bridge.free() if C.cGoCodexSpr(node.ctx, bridge.resp) != C.RET_OK { return "", bridge.callError("cGoCodexSpr") @@ -336,7 +328,6 @@ func (node CodexNode) Spr() (string, error) { func (node CodexNode) PeerId() (string, error) { bridge := newBridgeCtx() - defer bridge.free() if C.cGoCodexPeerId(node.ctx, bridge.resp) != C.RET_OK { return "", bridge.callError("cGoCodexPeerId") diff --git a/codex/debug.go b/codex/debug.go index 8166ddf..ebb0ea9 100644 --- a/codex/debug.go +++ b/codex/debug.go @@ -59,7 +59,6 @@ func (node CodexNode) Debug() (DebugInfo, error) { var info DebugInfo bridge := newBridgeCtx() - defer bridge.free() if C.cGoCodexDebug(node.ctx, bridge.resp) != C.RET_OK { return info, bridge.callError("cGoCodexDebug") @@ -82,7 +81,6 @@ func (node CodexNode) Debug() (DebugInfo, error) { // topic, you can pass "INFO,codexlib:TRACE". func (node CodexNode) UpdateLogLevel(logLevel string) error { bridge := newBridgeCtx() - defer bridge.free() var cLogLevel = C.CString(string(logLevel)) defer C.free(unsafe.Pointer(cLogLevel)) @@ -102,7 +100,6 @@ func (node CodexNode) CodexPeerDebug(peerId string) (PeerRecord, error) { var record PeerRecord bridge := newBridgeCtx() - defer bridge.free() var cPeerId = C.CString(peerId) defer C.free(unsafe.Pointer(cPeerId)) diff --git a/codex/download.go b/codex/download.go index 73a89b6..c592bc5 100644 --- a/codex/download.go +++ b/codex/download.go @@ -30,7 +30,6 @@ import ( "encoding/json" "fmt" "io" - "sync/atomic" "unsafe" ) @@ -116,9 +115,8 @@ type Manifest struct { // DownloadManifest retrieves the Codex manifest from its cid. // The session identifier is the cid, i.e you cannot have multiple // sessions for a cid. -func (node CodexNode) DownloadManifest(cid string) (Manifest, error) { +func (node CodexNode) DownloadManifest(ctx context.Context, cid string) (Manifest, error) { bridge := newBridgeCtx() - defer bridge.free() var cCid = C.CString(cid) defer C.free(unsafe.Pointer(cCid)) @@ -127,7 +125,7 @@ func (node CodexNode) DownloadManifest(cid string) (Manifest, error) { return Manifest{}, bridge.callError("cGoCodexDownloadManifest") } - val, err := bridge.wait() + val, err := bridge.waitWithContext(ctx) if err != nil { return Manifest{}, err } @@ -150,10 +148,9 @@ func (node CodexNode) DownloadManifest(cid string) (Manifest, error) { // in different places in a same call. func (node CodexNode) DownloadStream(ctx context.Context, cid string, options DownloadStreamOptions) error { bridge := newBridgeCtx() - defer bridge.free() if options.DatasetSizeAuto { - manifest, err := node.DownloadManifest(cid) + manifest, err := node.DownloadManifest(ctx, cid) if err != nil { return err @@ -192,7 +189,7 @@ func (node CodexNode) DownloadStream(ctx context.Context, cid string, options Do var cCid = C.CString(cid) defer C.free(unsafe.Pointer(cCid)) - err := node.DownloadInit(cid, DownloadInitOptions{ + err := node.DownloadInit(ctx, cid, DownloadInitOptions{ ChunkSize: options.ChunkSize, Local: options.Local, }) @@ -211,30 +208,12 @@ func (node CodexNode) DownloadStream(ctx context.Context, cid string, options Do return bridge.callError("cGoCodexDownloadLocal") } - // Create a done channel to signal the goroutine to stop - // when the download is complete and avoid goroutine leaks. - done := make(chan struct{}) - defer close(done) - - channelError := make(chan error, 1) - var cancelled atomic.Bool - go func() { - select { - case <-ctx.Done(): - channelError <- node.DownloadCancel(cid) - cancelled.Store(true) - case <-done: - // Nothing to do, download finished - } - }() - _, err = bridge.wait() - // Extract the potential cancellation error var cancelError error - select { - case cancelError = <-channelError: - default: + + if err == context.Canceled { + cancelError = node.DownloadCancel(cid) } if err != nil { @@ -242,10 +221,6 @@ func (node CodexNode) DownloadStream(ctx context.Context, cid string, options Do return fmt.Errorf("context canceled: %v, but failed to cancel download session: %v", ctx.Err(), cancelError) } - if cancelled.Load() { - return context.Canceled - } - return err } @@ -255,9 +230,8 @@ func (node CodexNode) DownloadStream(ctx context.Context, cid string, options Do // DownloadInit initializes the download process for a specific CID. // This method should be used if you want to manage the download session // and the chunk downloads manually. -func (node CodexNode) DownloadInit(cid string, options DownloadInitOptions) error { +func (node CodexNode) DownloadInit(ctx context.Context, cid string, options DownloadInitOptions) error { bridge := newBridgeCtx() - defer bridge.free() var cCid = C.CString(cid) defer C.free(unsafe.Pointer(cCid)) @@ -268,8 +242,9 @@ func (node CodexNode) DownloadInit(cid string, options DownloadInitOptions) erro return bridge.callError("cGoCodexDownloadInit") } - _, err := bridge.wait() + _, err := bridge.waitWithContext(ctx) return err + } // DownloadChunk downloads a chunk from its cid. @@ -281,7 +256,6 @@ func (node CodexNode) DownloadInit(cid string, options DownloadInitOptions) erro // to free the resources. func (node CodexNode) DownloadChunk(cid string) ([]byte, error) { bridge := newBridgeCtx() - defer bridge.free() var bytes []byte @@ -308,7 +282,6 @@ func (node CodexNode) DownloadChunk(cid string) ([]byte, error) { // It doesn't work with DownloadStream. func (node CodexNode) DownloadCancel(cid string) error { bridge := newBridgeCtx() - defer bridge.free() var cCid = C.CString(cid) defer C.free(unsafe.Pointer(cCid)) diff --git a/codex/download_test.go b/codex/download_test.go index 66f873f..a860a1d 100644 --- a/codex/download_test.go +++ b/codex/download_test.go @@ -1,10 +1,12 @@ package codex import ( + "bytes" "context" "os" "strings" "testing" + "time" ) func TestDownloadStream(t *testing.T) { @@ -120,10 +122,11 @@ func TestDownloadStreamCancelled(t *testing.T) { } func TestDownloadManual(t *testing.T) { + ctx := context.Background() codex := newCodexNode(t) cid, _ := uploadHelper(t, codex) - if err := codex.DownloadInit(cid, DownloadInitOptions{}); err != nil { + if err := codex.DownloadInit(ctx, cid, DownloadInitOptions{}); err != nil { t.Fatal("Error when initializing download:", err) } @@ -145,10 +148,11 @@ func TestDownloadManual(t *testing.T) { } func TestDownloadManifest(t *testing.T) { + ctx := context.Background() codex := newCodexNode(t) cid, _ := uploadHelper(t, codex) - manifest, err := codex.DownloadManifest(cid) + manifest, err := codex.DownloadManifest(ctx, cid) if err != nil { t.Fatal("Error when downloading manifest:", err) } @@ -159,9 +163,10 @@ func TestDownloadManifest(t *testing.T) { } func TestDownloadManifestWithNotExistingCid(t *testing.T) { + ctx := context.Background() codex := newCodexNode(t, Config{BlockRetries: 1}) - manifest, err := codex.DownloadManifest("bafybeihdwdcefgh4dqkjv67uzcmw7ojee6xedzdetojuzjevtenxquvyku") + manifest, err := codex.DownloadManifest(ctx, "bafybeihdwdcefgh4dqkjv67uzcmw7ojee6xedzdetojuzjevtenxquvyku") if err == nil { t.Fatal("Error when downloading manifest:", err) } @@ -172,9 +177,100 @@ func TestDownloadManifestWithNotExistingCid(t *testing.T) { } func TestDownloadInitWithNotExistingCid(t *testing.T) { + ctx := context.Background() codex := newCodexNode(t, Config{BlockRetries: 1}) - if err := codex.DownloadInit("bafybeihdwdcefgh4dqkjv67uzcmw7ojee6xedzdetojuzjevtenxquvyku", DownloadInitOptions{}); err == nil { + if err := codex.DownloadInit(ctx, "bafybeihdwdcefgh4dqkjv67uzcmw7ojee6xedzdetojuzjevtenxquvyku", DownloadInitOptions{}); err == nil { t.Fatal("expected error when initializing download for non-existent cid") } } + +func TestDownloadWithTwoNodes(t *testing.T) { + var err error + + node1 := newCodexNode(t, Config{ + DiscoveryPort: 8100, + }) + + info1, err := node1.Debug() + if err != nil { + t.Fatal(err) + } + + spr, err := node1.Spr() + if err != nil { + t.Fatalf("Failed to get bootstrap spr: %v", err) + } + + t.Logf("spr: %s, info.spr: %s", spr, info1.Spr) + + bootstrapNodes := []string{info1.Spr} + + data := []byte("Hello World!") + cid, _ := uploadData(t, node1, data) + + node2 := newCodexNode(t, Config{ + DiscoveryPort: 8101, + BootstrapNodes: bootstrapNodes, + }) + + var buf bytes.Buffer + options := DownloadStreamOptions{ + Writer: &buf, + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + err = node2.DownloadStream(ctx, cid, options) + + if err != nil { + t.Fatal("Error happened during download:", err.Error()) + } + + downloadedData := buf.Bytes() + if string(downloadedData) != string(data) { + t.Fatalf("Downloaded content does not match, expected %s got %s", data, downloadedData) + } +} + +func TestCancellingContextWhenNodeCannotResolveCID(t *testing.T) { + var err error + + node1 := newCodexNode(t, Config{ + DiscoveryPort: 8100, + }) + + data := []byte("Hello World!") + cid, _ := uploadData(t, node1, data) + + // Notice - no bootstrap nodes, so node2 cannot resolve the CID + + node2 := newCodexNode(t, Config{ + DiscoveryPort: 8101, + }) + + // Set a test-specific timeout to catch if DownloadStream hangs + testTimeout := time.AfterFunc(10*time.Second, func() { + panic("Test exceeded 10 second timeout - DownloadStream likely not respecting context cancellation") + }) + defer testTimeout.Stop() + + var buf bytes.Buffer + options := DownloadStreamOptions{ + Writer: &buf, + } + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + err = node2.DownloadStream(ctx, cid, options) + + if err == nil { + t.Fatal("Expected cancellation error, got nil") + } + + // The error should be context.DeadlineExceeded since node2 cannot resolve the CID + if err != context.DeadlineExceeded && !strings.Contains(err.Error(), "context deadline exceeded") { + t.Fatalf("Got unexpected error): %v", err) + } +} diff --git a/codex/p2p.go b/codex/p2p.go index 62ebc92..b8465c7 100644 --- a/codex/p2p.go +++ b/codex/p2p.go @@ -21,7 +21,6 @@ import ( // eg the one specified with `ListenAddresses` in `CodexConfig`. func (node CodexNode) Connect(peerId string, peerAddresses []string) error { bridge := newBridgeCtx() - defer bridge.free() var cPeerId = C.CString(peerId) defer C.free(unsafe.Pointer(cPeerId)) diff --git a/codex/p2p_test.go b/codex/p2p_test.go index 5dc9f53..8cb5476 100644 --- a/codex/p2p_test.go +++ b/codex/p2p_test.go @@ -6,59 +6,15 @@ import ( ) func TestConnectWithAddress(t *testing.T) { - var node1, node2 *CodexNode var err error - t.Cleanup(func() { - if node1 != nil { - if err := node1.Stop(); err != nil { - t.Logf("cleanup codex1: %v", err) - } - - if err := node1.Destroy(); err != nil { - t.Logf("cleanup codex1: %v", err) - } - } - - if node2 != nil { - if err := node2.Stop(); err != nil { - t.Logf("cleanup codex2: %v", err) - } - - if err := node2.Destroy(); err != nil { - t.Logf("cleanup codex2: %v", err) - } - } + node1 := newCodexNode(t, Config{ + DiscoveryPort: 8090, }) - node1, err = New(Config{ - DataDir: t.TempDir(), - LogFormat: LogFormatNoColors, - MetricsEnabled: false, - DiscoveryPort: 8090, - Nat: "none", + node2 := newCodexNode(t, Config{ + DiscoveryPort: 8091, }) - if err != nil { - t.Fatalf("Failed to create codex1: %v", err) - } - - if err := node1.Start(); err != nil { - t.Fatalf("Failed to start codex1: %v", err) - } - - node2, err = New(Config{ - DataDir: t.TempDir(), - LogFormat: LogFormatNoColors, - MetricsEnabled: false, - DiscoveryPort: 8091, - }) - if err != nil { - t.Fatalf("Failed to create codex2: %v", err) - } - - if err := node2.Start(); err != nil { - t.Fatalf("Failed to start codex2: %v", err) - } info2, err := node2.Debug() if err != nil { diff --git a/codex/storage.go b/codex/storage.go index 8af1f5b..062c5c4 100644 --- a/codex/storage.go +++ b/codex/storage.go @@ -58,7 +58,6 @@ type Space struct { // Manifests returns the list of all manifests stored by the Codex node. func (node CodexNode) Manifests() ([]Manifest, error) { bridge := newBridgeCtx() - defer bridge.free() if C.cGoCodexStorageList(node.ctx, bridge.resp) != C.RET_OK { return nil, bridge.callError("cGoCodexStorageList") @@ -86,7 +85,6 @@ func (node CodexNode) Manifests() ([]Manifest, error) { // Fetch download a file from the network and store it to the local node. func (node CodexNode) Fetch(cid string) (Manifest, error) { bridge := newBridgeCtx() - defer bridge.free() var cCid = C.CString(cid) defer C.free(unsafe.Pointer(cCid)) @@ -115,7 +113,6 @@ func (node CodexNode) Space() (Space, error) { var space Space bridge := newBridgeCtx() - defer bridge.free() if C.cGoCodexStorageSpace(node.ctx, bridge.resp) != C.RET_OK { return space, bridge.callError("cGoCodexStorageSpace") @@ -134,7 +131,6 @@ func (node CodexNode) Space() (Space, error) { // from the local node. Does nothing if the dataset is not locally available. func (node CodexNode) Delete(cid string) error { bridge := newBridgeCtx() - defer bridge.free() var cCid = C.CString(cid) defer C.free(unsafe.Pointer(cCid)) @@ -150,7 +146,6 @@ func (node CodexNode) Delete(cid string) error { // Exists checks if a given cid exists in the local storage. func (node CodexNode) Exists(cid string) (bool, error) { bridge := newBridgeCtx() - defer bridge.free() var cCid = C.CString(cid) defer C.free(unsafe.Pointer(cCid)) diff --git a/codex/testutil.go b/codex/testutil.go index 2d297da..5e92bcd 100644 --- a/codex/testutil.go +++ b/codex/testutil.go @@ -44,6 +44,10 @@ func newCodexNode(t *testing.T, opts ...Config) *CodexNode { config.DiscoveryPort = c.DiscoveryPort } + if c.Nat != "" { + config.Nat = c.Nat + } + if c.StorageQuota != 0 { config.StorageQuota = c.StorageQuota } @@ -97,6 +101,19 @@ func uploadHelper(t *testing.T, codex *CodexNode) (string, int) { return cid, len } +func uploadData(t *testing.T, codex *CodexNode, data []byte) (string, int) { + t.Helper() + + buf := bytes.NewBuffer(data) + len := buf.Len() + cid, err := codex.UploadReader(context.Background(), UploadOptions{Filepath: "hello.txt"}, buf) + if err != nil { + t.Fatalf("Error happened during upload: %v\n", err) + } + + return cid, len +} + func uploadBigFileHelper(t *testing.T, codex *CodexNode) (string, int) { t.Helper() diff --git a/codex/upload.go b/codex/upload.go index a14b876..df12830 100644 --- a/codex/upload.go +++ b/codex/upload.go @@ -31,7 +31,6 @@ import ( "fmt" "io" "os" - "sync/atomic" "unsafe" ) @@ -84,7 +83,6 @@ func getReaderSize(r io.Reader) int64 { // You should use this function only if you need to manage the upload session manually. func (node CodexNode) UploadInit(options *UploadOptions) (string, error) { bridge := newBridgeCtx() - defer bridge.free() var cFilename = C.CString(options.Filepath) defer C.free(unsafe.Pointer(cFilename)) @@ -103,7 +101,6 @@ func (node CodexNode) UploadInit(options *UploadOptions) (string, error) { // You should use this function only if you need to manage the upload session manually. func (node CodexNode) UploadChunk(sessionId string, chunk []byte) error { bridge := newBridgeCtx() - defer bridge.free() var cSessionId = C.CString(sessionId) defer C.free(unsafe.Pointer(cSessionId)) @@ -127,7 +124,6 @@ func (node CodexNode) UploadChunk(sessionId string, chunk []byte) error { // You should use this function only if you need to manage the upload session manually. func (node CodexNode) UploadFinalize(sessionId string) (string, error) { bridge := newBridgeCtx() - defer bridge.free() var cSessionId = C.CString(sessionId) defer C.free(unsafe.Pointer(cSessionId)) @@ -144,7 +140,6 @@ func (node CodexNode) UploadFinalize(sessionId string) (string, error) { // It doesn't work with UploadFile. func (node CodexNode) UploadCancel(sessionId string) error { bridge := newBridgeCtx() - defer bridge.free() var cSessionId = C.CString(sessionId) defer C.free(unsafe.Pointer(cSessionId)) @@ -264,7 +259,6 @@ func (node CodexNode) UploadReaderAsync(ctx context.Context, options UploadOptio // Internally, it calls UploadInit to create the upload session. func (node CodexNode) UploadFile(ctx context.Context, options UploadOptions) (string, error) { bridge := newBridgeCtx() - defer bridge.free() if options.OnProgress != nil { stat, err := os.Stat(options.Filepath) @@ -307,30 +301,12 @@ func (node CodexNode) UploadFile(ctx context.Context, options UploadOptions) (st return "", bridge.callError("cGoCodexUploadFile") } - // Create a done channel to signal the goroutine to stop - // when the download is complete and avoid goroutine leaks. - done := make(chan struct{}) - defer close(done) + _, err = bridge.waitWithContext(ctx) - channelError := make(chan error, 1) - var cancelled atomic.Bool - go func() { - select { - case <-ctx.Done(): - channelError <- node.UploadCancel(sessionId) - cancelled.Store(true) - case <-done: - // Nothing to do, upload finished - } - }() - - _, err = bridge.wait() - - // Extract the potential cancellation error var cancelErr error - select { - case cancelErr = <-channelError: - default: + + if err == context.Canceled { + cancelErr = node.UploadCancel(sessionId) } if err != nil { @@ -338,10 +314,6 @@ func (node CodexNode) UploadFile(ctx context.Context, options UploadOptions) (st return "", fmt.Errorf("context canceled: %v, but failed to cancel upload session: %v", ctx.Err(), cancelErr) } - if cancelled.Load() { - return "", context.Canceled - } - return "", err } diff --git a/go.mod b/go.mod index f80bbe0..0082e7b 100644 --- a/go.mod +++ b/go.mod @@ -1,3 +1,3 @@ module github.com/codex-storage/codex-go-bindings -go 1.24.0 +go 1.24