Merge c38f8d722f769cc615890e9ea86bde87819ad1a3 into 59437382121afb90f291e9c24d22c57c681456aa

This commit is contained in:
Marcin Czenko 2025-12-11 12:59:34 +00:00 committed by GitHub
commit af30a9d961
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 187 additions and 151 deletions

View File

@ -257,7 +257,7 @@ whenever you want!
When you receive a cid, you can download the `Manifest` to get information about the data:
```go
manifest, err := codex.DownloadManifest(cid)
manifest, err := codex.DownloadManifest(ctx, cid)
```
It is not mandatory for downloading the data but it is really useful.
@ -300,7 +300,7 @@ to terminate the download session.
```go
cid := "..."
err := codex.DownloadInit(cid, DownloadInitOptions{})
err := codex.DownloadInit(ctx, cid, DownloadInitOptions{})
chunk, err := codex.DownloadChunk(cid)
err := codex.DownloadCancel(cid)
```

View File

@ -34,10 +34,12 @@ package codex
*/
import "C"
import (
"context"
"errors"
"fmt"
"runtime/cgo"
"sync"
"sync/atomic"
"unsafe"
)
@ -47,11 +49,12 @@ import (
// a response pointer for receiving data from the C code,
// and fields for storing the result and error of the call.
type bridgeCtx struct {
wg *sync.WaitGroup
h cgo.Handle
resp unsafe.Pointer
result string
err error
wg *sync.WaitGroup
h cgo.Handle
resp unsafe.Pointer
result string
err error
cancelled atomic.Bool
// Callback used for receiving progress updates during upload/download.
//
@ -85,6 +88,8 @@ func (b *bridgeCtx) callError(name string) error {
// including the cgo.Handle and the response pointer.
func (b *bridgeCtx) free() {
if b.h > 0 {
// (*C.Resp)(b.resp).h = 0
b.h.Delete()
b.h = 0
}
@ -116,12 +121,14 @@ func callback(ret C.int, msg *C.char, len C.size_t, resp unsafe.Pointer) {
}
h := cgo.Handle(m.h)
if h == 0 {
return
}
if v, ok := h.Value().(*bridgeCtx); ok {
switch ret {
case C.RET_PROGRESS:
if v.onProgress == nil {
return
@ -136,22 +143,55 @@ func callback(ret C.int, msg *C.char, len C.size_t, resp unsafe.Pointer) {
retMsg := C.GoStringN(msg, C.int(len))
v.result = retMsg
v.err = nil
if v.wg != nil {
if !v.cancelled.Load() && v.wg != nil {
v.wg.Done()
}
v.free()
case C.RET_ERR:
retMsg := C.GoStringN(msg, C.int(len))
v.err = errors.New(retMsg)
if v.wg != nil {
if !v.cancelled.Load() && v.wg != nil {
v.wg.Done()
}
v.free()
}
}
}
// wait waits for the bridge context to complete its operation.
// It returns the result and error of the operation.
func (b *bridgeCtx) wait() (string, error) {
b.wg.Wait()
return b.result, b.err
}
// wait waits for the bridge context to complete its operation.
// It returns the result and error of the operation.
func (b *bridgeCtx) waitWithContext(ctx context.Context) (string, error) {
type result struct {
value string
err error
}
done := make(chan result, 1)
defer close(done)
go func() {
b.wg.Wait()
if b.cancelled.Load() {
return
}
done <- result{value: b.result, err: b.err}
}()
select {
case res := <-done:
return res.value, res.err
case <-ctx.Done():
b.cancelled.Store(true)
b.wg.Done()
return "", ctx.Err()
}
}

View File

@ -208,7 +208,6 @@ func (c ChunkSize) toSizeT() C.size_t {
// with the Codex network.
func New(config Config) (*CodexNode, error) {
bridge := newBridgeCtx()
defer bridge.free()
jsonConfig, err := json.Marshal(config)
if err != nil {
@ -230,7 +229,6 @@ func New(config Config) (*CodexNode, error) {
// Start starts the Codex node.
func (node CodexNode) Start() error {
bridge := newBridgeCtx()
defer bridge.free()
if C.cGoCodexStart(node.ctx, bridge.resp) != C.RET_OK {
return bridge.callError("cGoCodexStart")
@ -251,7 +249,6 @@ func (node CodexNode) StartAsync(onDone func(error)) {
// Stop stops the Codex node.
func (node CodexNode) Stop() error {
bridge := newBridgeCtx()
defer bridge.free()
if C.cGoCodexStop(node.ctx, bridge.resp) != C.RET_OK {
return bridge.callError("cGoCodexStop")
@ -265,7 +262,6 @@ func (node CodexNode) Stop() error {
// The node must be stopped before calling this method.
func (node CodexNode) Destroy() error {
bridge := newBridgeCtx()
defer bridge.free()
if C.cGoCodexClose(node.ctx, bridge.resp) != C.RET_OK {
return bridge.callError("cGoCodexClose")
@ -291,7 +287,6 @@ func (node CodexNode) Destroy() error {
// Version returns the version of the Codex node.
func (node CodexNode) Version() (string, error) {
bridge := newBridgeCtx()
defer bridge.free()
if C.cGoCodexVersion(node.ctx, bridge.resp) != C.RET_OK {
return "", bridge.callError("cGoCodexVersion")
@ -302,7 +297,6 @@ func (node CodexNode) Version() (string, error) {
func (node CodexNode) Revision() (string, error) {
bridge := newBridgeCtx()
defer bridge.free()
if C.cGoCodexRevision(node.ctx, bridge.resp) != C.RET_OK {
return "", bridge.callError("cGoCodexRevision")
@ -314,7 +308,6 @@ func (node CodexNode) Revision() (string, error) {
// Repo returns the path of the data dir folder.
func (node CodexNode) Repo() (string, error) {
bridge := newBridgeCtx()
defer bridge.free()
if C.cGoCodexRepo(node.ctx, bridge.resp) != C.RET_OK {
return "", bridge.callError("cGoCodexRepo")
@ -325,7 +318,6 @@ func (node CodexNode) Repo() (string, error) {
func (node CodexNode) Spr() (string, error) {
bridge := newBridgeCtx()
defer bridge.free()
if C.cGoCodexSpr(node.ctx, bridge.resp) != C.RET_OK {
return "", bridge.callError("cGoCodexSpr")
@ -336,7 +328,6 @@ func (node CodexNode) Spr() (string, error) {
func (node CodexNode) PeerId() (string, error) {
bridge := newBridgeCtx()
defer bridge.free()
if C.cGoCodexPeerId(node.ctx, bridge.resp) != C.RET_OK {
return "", bridge.callError("cGoCodexPeerId")

View File

@ -59,7 +59,6 @@ func (node CodexNode) Debug() (DebugInfo, error) {
var info DebugInfo
bridge := newBridgeCtx()
defer bridge.free()
if C.cGoCodexDebug(node.ctx, bridge.resp) != C.RET_OK {
return info, bridge.callError("cGoCodexDebug")
@ -82,7 +81,6 @@ func (node CodexNode) Debug() (DebugInfo, error) {
// topic, you can pass "INFO,codexlib:TRACE".
func (node CodexNode) UpdateLogLevel(logLevel string) error {
bridge := newBridgeCtx()
defer bridge.free()
var cLogLevel = C.CString(string(logLevel))
defer C.free(unsafe.Pointer(cLogLevel))
@ -102,7 +100,6 @@ func (node CodexNode) CodexPeerDebug(peerId string) (PeerRecord, error) {
var record PeerRecord
bridge := newBridgeCtx()
defer bridge.free()
var cPeerId = C.CString(peerId)
defer C.free(unsafe.Pointer(cPeerId))

View File

@ -30,7 +30,6 @@ import (
"encoding/json"
"fmt"
"io"
"sync/atomic"
"unsafe"
)
@ -116,9 +115,8 @@ type Manifest struct {
// DownloadManifest retrieves the Codex manifest from its cid.
// The session identifier is the cid, i.e you cannot have multiple
// sessions for a cid.
func (node CodexNode) DownloadManifest(cid string) (Manifest, error) {
func (node CodexNode) DownloadManifest(ctx context.Context, cid string) (Manifest, error) {
bridge := newBridgeCtx()
defer bridge.free()
var cCid = C.CString(cid)
defer C.free(unsafe.Pointer(cCid))
@ -127,7 +125,7 @@ func (node CodexNode) DownloadManifest(cid string) (Manifest, error) {
return Manifest{}, bridge.callError("cGoCodexDownloadManifest")
}
val, err := bridge.wait()
val, err := bridge.waitWithContext(ctx)
if err != nil {
return Manifest{}, err
}
@ -150,10 +148,9 @@ func (node CodexNode) DownloadManifest(cid string) (Manifest, error) {
// in different places in a same call.
func (node CodexNode) DownloadStream(ctx context.Context, cid string, options DownloadStreamOptions) error {
bridge := newBridgeCtx()
defer bridge.free()
if options.DatasetSizeAuto {
manifest, err := node.DownloadManifest(cid)
manifest, err := node.DownloadManifest(ctx, cid)
if err != nil {
return err
@ -192,7 +189,7 @@ func (node CodexNode) DownloadStream(ctx context.Context, cid string, options Do
var cCid = C.CString(cid)
defer C.free(unsafe.Pointer(cCid))
err := node.DownloadInit(cid, DownloadInitOptions{
err := node.DownloadInit(ctx, cid, DownloadInitOptions{
ChunkSize: options.ChunkSize,
Local: options.Local,
})
@ -211,30 +208,12 @@ func (node CodexNode) DownloadStream(ctx context.Context, cid string, options Do
return bridge.callError("cGoCodexDownloadLocal")
}
// Create a done channel to signal the goroutine to stop
// when the download is complete and avoid goroutine leaks.
done := make(chan struct{})
defer close(done)
channelError := make(chan error, 1)
var cancelled atomic.Bool
go func() {
select {
case <-ctx.Done():
channelError <- node.DownloadCancel(cid)
cancelled.Store(true)
case <-done:
// Nothing to do, download finished
}
}()
_, err = bridge.wait()
// Extract the potential cancellation error
var cancelError error
select {
case cancelError = <-channelError:
default:
if err == context.Canceled {
cancelError = node.DownloadCancel(cid)
}
if err != nil {
@ -242,10 +221,6 @@ func (node CodexNode) DownloadStream(ctx context.Context, cid string, options Do
return fmt.Errorf("context canceled: %v, but failed to cancel download session: %v", ctx.Err(), cancelError)
}
if cancelled.Load() {
return context.Canceled
}
return err
}
@ -255,9 +230,8 @@ func (node CodexNode) DownloadStream(ctx context.Context, cid string, options Do
// DownloadInit initializes the download process for a specific CID.
// This method should be used if you want to manage the download session
// and the chunk downloads manually.
func (node CodexNode) DownloadInit(cid string, options DownloadInitOptions) error {
func (node CodexNode) DownloadInit(ctx context.Context, cid string, options DownloadInitOptions) error {
bridge := newBridgeCtx()
defer bridge.free()
var cCid = C.CString(cid)
defer C.free(unsafe.Pointer(cCid))
@ -268,8 +242,9 @@ func (node CodexNode) DownloadInit(cid string, options DownloadInitOptions) erro
return bridge.callError("cGoCodexDownloadInit")
}
_, err := bridge.wait()
_, err := bridge.waitWithContext(ctx)
return err
}
// DownloadChunk downloads a chunk from its cid.
@ -281,7 +256,6 @@ func (node CodexNode) DownloadInit(cid string, options DownloadInitOptions) erro
// to free the resources.
func (node CodexNode) DownloadChunk(cid string) ([]byte, error) {
bridge := newBridgeCtx()
defer bridge.free()
var bytes []byte
@ -308,7 +282,6 @@ func (node CodexNode) DownloadChunk(cid string) ([]byte, error) {
// It doesn't work with DownloadStream.
func (node CodexNode) DownloadCancel(cid string) error {
bridge := newBridgeCtx()
defer bridge.free()
var cCid = C.CString(cid)
defer C.free(unsafe.Pointer(cCid))

View File

@ -1,10 +1,12 @@
package codex
import (
"bytes"
"context"
"os"
"strings"
"testing"
"time"
)
func TestDownloadStream(t *testing.T) {
@ -120,10 +122,11 @@ func TestDownloadStreamCancelled(t *testing.T) {
}
func TestDownloadManual(t *testing.T) {
ctx := context.Background()
codex := newCodexNode(t)
cid, _ := uploadHelper(t, codex)
if err := codex.DownloadInit(cid, DownloadInitOptions{}); err != nil {
if err := codex.DownloadInit(ctx, cid, DownloadInitOptions{}); err != nil {
t.Fatal("Error when initializing download:", err)
}
@ -145,10 +148,11 @@ func TestDownloadManual(t *testing.T) {
}
func TestDownloadManifest(t *testing.T) {
ctx := context.Background()
codex := newCodexNode(t)
cid, _ := uploadHelper(t, codex)
manifest, err := codex.DownloadManifest(cid)
manifest, err := codex.DownloadManifest(ctx, cid)
if err != nil {
t.Fatal("Error when downloading manifest:", err)
}
@ -159,9 +163,10 @@ func TestDownloadManifest(t *testing.T) {
}
func TestDownloadManifestWithNotExistingCid(t *testing.T) {
ctx := context.Background()
codex := newCodexNode(t, Config{BlockRetries: 1})
manifest, err := codex.DownloadManifest("bafybeihdwdcefgh4dqkjv67uzcmw7ojee6xedzdetojuzjevtenxquvyku")
manifest, err := codex.DownloadManifest(ctx, "bafybeihdwdcefgh4dqkjv67uzcmw7ojee6xedzdetojuzjevtenxquvyku")
if err == nil {
t.Fatal("Error when downloading manifest:", err)
}
@ -172,9 +177,100 @@ func TestDownloadManifestWithNotExistingCid(t *testing.T) {
}
func TestDownloadInitWithNotExistingCid(t *testing.T) {
ctx := context.Background()
codex := newCodexNode(t, Config{BlockRetries: 1})
if err := codex.DownloadInit("bafybeihdwdcefgh4dqkjv67uzcmw7ojee6xedzdetojuzjevtenxquvyku", DownloadInitOptions{}); err == nil {
if err := codex.DownloadInit(ctx, "bafybeihdwdcefgh4dqkjv67uzcmw7ojee6xedzdetojuzjevtenxquvyku", DownloadInitOptions{}); err == nil {
t.Fatal("expected error when initializing download for non-existent cid")
}
}
func TestDownloadWithTwoNodes(t *testing.T) {
var err error
node1 := newCodexNode(t, Config{
DiscoveryPort: 8100,
})
info1, err := node1.Debug()
if err != nil {
t.Fatal(err)
}
spr, err := node1.Spr()
if err != nil {
t.Fatalf("Failed to get bootstrap spr: %v", err)
}
t.Logf("spr: %s, info.spr: %s", spr, info1.Spr)
bootstrapNodes := []string{info1.Spr}
data := []byte("Hello World!")
cid, _ := uploadData(t, node1, data)
node2 := newCodexNode(t, Config{
DiscoveryPort: 8101,
BootstrapNodes: bootstrapNodes,
})
var buf bytes.Buffer
options := DownloadStreamOptions{
Writer: &buf,
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
err = node2.DownloadStream(ctx, cid, options)
if err != nil {
t.Fatal("Error happened during download:", err.Error())
}
downloadedData := buf.Bytes()
if string(downloadedData) != string(data) {
t.Fatalf("Downloaded content does not match, expected %s got %s", data, downloadedData)
}
}
func TestCancellingContextWhenNodeCannotResolveCID(t *testing.T) {
var err error
node1 := newCodexNode(t, Config{
DiscoveryPort: 8100,
})
data := []byte("Hello World!")
cid, _ := uploadData(t, node1, data)
// Notice - no bootstrap nodes, so node2 cannot resolve the CID
node2 := newCodexNode(t, Config{
DiscoveryPort: 8101,
})
// Set a test-specific timeout to catch if DownloadStream hangs
testTimeout := time.AfterFunc(10*time.Second, func() {
panic("Test exceeded 10 second timeout - DownloadStream likely not respecting context cancellation")
})
defer testTimeout.Stop()
var buf bytes.Buffer
options := DownloadStreamOptions{
Writer: &buf,
}
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
err = node2.DownloadStream(ctx, cid, options)
if err == nil {
t.Fatal("Expected cancellation error, got nil")
}
// The error should be context.DeadlineExceeded since node2 cannot resolve the CID
if err != context.DeadlineExceeded && !strings.Contains(err.Error(), "context deadline exceeded") {
t.Fatalf("Got unexpected error): %v", err)
}
}

View File

@ -21,7 +21,6 @@ import (
// eg the one specified with `ListenAddresses` in `CodexConfig`.
func (node CodexNode) Connect(peerId string, peerAddresses []string) error {
bridge := newBridgeCtx()
defer bridge.free()
var cPeerId = C.CString(peerId)
defer C.free(unsafe.Pointer(cPeerId))

View File

@ -6,59 +6,15 @@ import (
)
func TestConnectWithAddress(t *testing.T) {
var node1, node2 *CodexNode
var err error
t.Cleanup(func() {
if node1 != nil {
if err := node1.Stop(); err != nil {
t.Logf("cleanup codex1: %v", err)
}
if err := node1.Destroy(); err != nil {
t.Logf("cleanup codex1: %v", err)
}
}
if node2 != nil {
if err := node2.Stop(); err != nil {
t.Logf("cleanup codex2: %v", err)
}
if err := node2.Destroy(); err != nil {
t.Logf("cleanup codex2: %v", err)
}
}
node1 := newCodexNode(t, Config{
DiscoveryPort: 8090,
})
node1, err = New(Config{
DataDir: t.TempDir(),
LogFormat: LogFormatNoColors,
MetricsEnabled: false,
DiscoveryPort: 8090,
Nat: "none",
node2 := newCodexNode(t, Config{
DiscoveryPort: 8091,
})
if err != nil {
t.Fatalf("Failed to create codex1: %v", err)
}
if err := node1.Start(); err != nil {
t.Fatalf("Failed to start codex1: %v", err)
}
node2, err = New(Config{
DataDir: t.TempDir(),
LogFormat: LogFormatNoColors,
MetricsEnabled: false,
DiscoveryPort: 8091,
})
if err != nil {
t.Fatalf("Failed to create codex2: %v", err)
}
if err := node2.Start(); err != nil {
t.Fatalf("Failed to start codex2: %v", err)
}
info2, err := node2.Debug()
if err != nil {

View File

@ -58,7 +58,6 @@ type Space struct {
// Manifests returns the list of all manifests stored by the Codex node.
func (node CodexNode) Manifests() ([]Manifest, error) {
bridge := newBridgeCtx()
defer bridge.free()
if C.cGoCodexStorageList(node.ctx, bridge.resp) != C.RET_OK {
return nil, bridge.callError("cGoCodexStorageList")
@ -86,7 +85,6 @@ func (node CodexNode) Manifests() ([]Manifest, error) {
// Fetch download a file from the network and store it to the local node.
func (node CodexNode) Fetch(cid string) (Manifest, error) {
bridge := newBridgeCtx()
defer bridge.free()
var cCid = C.CString(cid)
defer C.free(unsafe.Pointer(cCid))
@ -115,7 +113,6 @@ func (node CodexNode) Space() (Space, error) {
var space Space
bridge := newBridgeCtx()
defer bridge.free()
if C.cGoCodexStorageSpace(node.ctx, bridge.resp) != C.RET_OK {
return space, bridge.callError("cGoCodexStorageSpace")
@ -134,7 +131,6 @@ func (node CodexNode) Space() (Space, error) {
// from the local node. Does nothing if the dataset is not locally available.
func (node CodexNode) Delete(cid string) error {
bridge := newBridgeCtx()
defer bridge.free()
var cCid = C.CString(cid)
defer C.free(unsafe.Pointer(cCid))
@ -150,7 +146,6 @@ func (node CodexNode) Delete(cid string) error {
// Exists checks if a given cid exists in the local storage.
func (node CodexNode) Exists(cid string) (bool, error) {
bridge := newBridgeCtx()
defer bridge.free()
var cCid = C.CString(cid)
defer C.free(unsafe.Pointer(cCid))

View File

@ -44,6 +44,10 @@ func newCodexNode(t *testing.T, opts ...Config) *CodexNode {
config.DiscoveryPort = c.DiscoveryPort
}
if c.Nat != "" {
config.Nat = c.Nat
}
if c.StorageQuota != 0 {
config.StorageQuota = c.StorageQuota
}
@ -97,6 +101,19 @@ func uploadHelper(t *testing.T, codex *CodexNode) (string, int) {
return cid, len
}
func uploadData(t *testing.T, codex *CodexNode, data []byte) (string, int) {
t.Helper()
buf := bytes.NewBuffer(data)
len := buf.Len()
cid, err := codex.UploadReader(context.Background(), UploadOptions{Filepath: "hello.txt"}, buf)
if err != nil {
t.Fatalf("Error happened during upload: %v\n", err)
}
return cid, len
}
func uploadBigFileHelper(t *testing.T, codex *CodexNode) (string, int) {
t.Helper()

View File

@ -31,7 +31,6 @@ import (
"fmt"
"io"
"os"
"sync/atomic"
"unsafe"
)
@ -84,7 +83,6 @@ func getReaderSize(r io.Reader) int64 {
// You should use this function only if you need to manage the upload session manually.
func (node CodexNode) UploadInit(options *UploadOptions) (string, error) {
bridge := newBridgeCtx()
defer bridge.free()
var cFilename = C.CString(options.Filepath)
defer C.free(unsafe.Pointer(cFilename))
@ -103,7 +101,6 @@ func (node CodexNode) UploadInit(options *UploadOptions) (string, error) {
// You should use this function only if you need to manage the upload session manually.
func (node CodexNode) UploadChunk(sessionId string, chunk []byte) error {
bridge := newBridgeCtx()
defer bridge.free()
var cSessionId = C.CString(sessionId)
defer C.free(unsafe.Pointer(cSessionId))
@ -127,7 +124,6 @@ func (node CodexNode) UploadChunk(sessionId string, chunk []byte) error {
// You should use this function only if you need to manage the upload session manually.
func (node CodexNode) UploadFinalize(sessionId string) (string, error) {
bridge := newBridgeCtx()
defer bridge.free()
var cSessionId = C.CString(sessionId)
defer C.free(unsafe.Pointer(cSessionId))
@ -144,7 +140,6 @@ func (node CodexNode) UploadFinalize(sessionId string) (string, error) {
// It doesn't work with UploadFile.
func (node CodexNode) UploadCancel(sessionId string) error {
bridge := newBridgeCtx()
defer bridge.free()
var cSessionId = C.CString(sessionId)
defer C.free(unsafe.Pointer(cSessionId))
@ -264,7 +259,6 @@ func (node CodexNode) UploadReaderAsync(ctx context.Context, options UploadOptio
// Internally, it calls UploadInit to create the upload session.
func (node CodexNode) UploadFile(ctx context.Context, options UploadOptions) (string, error) {
bridge := newBridgeCtx()
defer bridge.free()
if options.OnProgress != nil {
stat, err := os.Stat(options.Filepath)
@ -307,30 +301,12 @@ func (node CodexNode) UploadFile(ctx context.Context, options UploadOptions) (st
return "", bridge.callError("cGoCodexUploadFile")
}
// Create a done channel to signal the goroutine to stop
// when the download is complete and avoid goroutine leaks.
done := make(chan struct{})
defer close(done)
_, err = bridge.waitWithContext(ctx)
channelError := make(chan error, 1)
var cancelled atomic.Bool
go func() {
select {
case <-ctx.Done():
channelError <- node.UploadCancel(sessionId)
cancelled.Store(true)
case <-done:
// Nothing to do, upload finished
}
}()
_, err = bridge.wait()
// Extract the potential cancellation error
var cancelErr error
select {
case cancelErr = <-channelError:
default:
if err == context.Canceled {
cancelErr = node.UploadCancel(sessionId)
}
if err != nil {
@ -338,10 +314,6 @@ func (node CodexNode) UploadFile(ctx context.Context, options UploadOptions) (st
return "", fmt.Errorf("context canceled: %v, but failed to cancel upload session: %v", ctx.Err(), cancelErr)
}
if cancelled.Load() {
return "", context.Canceled
}
return "", err
}

2
go.mod
View File

@ -1,3 +1,3 @@
module github.com/codex-storage/codex-go-bindings
go 1.24.0
go 1.24