mirror of
https://github.com/logos-storage/logos-storage-go-bindings.git
synced 2026-01-02 13:33:10 +00:00
Merge pull request #5 from codex-storage/feat/add-context-cancellation
feat: context cancellation
This commit is contained in:
commit
777b678ffb
3
.gitignore
vendored
3
.gitignore
vendored
@ -20,3 +20,6 @@ nimcache
|
||||
# Test files
|
||||
codex/testdata/hello.downloaded.txt
|
||||
codex/testdata/hello.downloaded.writer.txt
|
||||
|
||||
# Bin
|
||||
codex-go
|
||||
6
.vscode/settings.json
vendored
6
.vscode/settings.json
vendored
@ -1,7 +1,9 @@
|
||||
{
|
||||
"go.toolsEnvVars": {
|
||||
"CGO_ENABLED": "1",
|
||||
"CGO_CFLAGS": "-I${workspaceFolder}/vendor/nim-codex/library",
|
||||
"CGO_LDFLAGS": "-L${workspaceFolder}/vendor/nim-codex/build -Wl,-rpath,${workspaceFolder}/vendor/nim-codex/build",
|
||||
"CGO_LDFLAGS": "-L${workspaceFolder}/vendor/nim-codex/build -lcodex -Wl,-rpath,${workspaceFolder}/vendor/nim-codex/build",
|
||||
"LD_LIBRARY_PATH": "${workspaceFolder}/vendor/nim-codex/build:${env:LD_LIBRARY_PATH}"
|
||||
}
|
||||
},
|
||||
"go.testTimeout": "2m"
|
||||
}
|
||||
2
Makefile
2
Makefile
@ -29,7 +29,7 @@ build:
|
||||
|
||||
test:
|
||||
@echo "Running tests..."
|
||||
CGO_ENABLED=1 CGO_CFLAGS="$(CGO_CFLAGS)" CGO_LDFLAGS="$(CGO_LDFLAGS)" go test ./...
|
||||
CGO_ENABLED=1 CGO_CFLAGS="$(CGO_CFLAGS)" CGO_LDFLAGS="$(CGO_LDFLAGS)" GOTESTFLAGS="-timeout=2m" go test ./...
|
||||
|
||||
clean:
|
||||
@echo "Cleaning up..."
|
||||
|
||||
23
README.md
23
README.md
@ -98,7 +98,7 @@ Now the module is ready for use in your project.
|
||||
|
||||
The release process is defined [here](./RELEASE.md).
|
||||
|
||||
## Usage
|
||||
## API
|
||||
|
||||
### Init
|
||||
|
||||
@ -171,11 +171,10 @@ buf := bytes.NewBuffer([]byte("Hello World!"))
|
||||
onProgress := func(read, total int, percent float64, err error) {
|
||||
// Do something with the data
|
||||
}
|
||||
cid, err := codex.UploadReader(UploadOptions{filepath: "hello.txt", onProgress: onProgress}, buf)
|
||||
ctx := context.Background()
|
||||
cid, err := codex.UploadReader(ctx, UploadOptions{filepath: "hello.txt", onProgress: onProgress}, buf)
|
||||
```
|
||||
|
||||
Caveat: once started, the upload cannot be cancelled.
|
||||
|
||||
#### file
|
||||
|
||||
The `file` strategy allows you to upload a file on Codex using the path.
|
||||
@ -189,11 +188,10 @@ The `UploadFile` returns the cid of the content uploaded.
|
||||
onProgress := func(read, total int, percent float64, err error) {
|
||||
// Do something with the data
|
||||
}
|
||||
cid, err := codex.UploadFile(UploadOptions{filepath: "./testdata/hello.txt", onProgress: onProgress})
|
||||
ctx := context.Background()
|
||||
cid, err := codex.UploadFile(ctx, UploadOptions{filepath: "./testdata/hello.txt", onProgress: onProgress})
|
||||
```
|
||||
|
||||
Caveat: once started, the upload cannot be cancelled.
|
||||
|
||||
#### chunks
|
||||
|
||||
The `chunks` strategy allows you to manage the upload by yourself. It requires more code
|
||||
@ -246,11 +244,10 @@ opt := DownloadStreamOptions{
|
||||
// Handle progress
|
||||
},
|
||||
}
|
||||
err := codex.DownloadStream(cid, opt)
|
||||
ctx := context.Background()
|
||||
err := codex.DownloadStream(ctx, cid, opt)
|
||||
```
|
||||
|
||||
Caveat: once started, the download cannot be cancelled.
|
||||
|
||||
#### chunks
|
||||
|
||||
The `chunks` strategy allows to manage the download by yourself. It requires more code
|
||||
@ -311,3 +308,9 @@ record, err := node.CodexPeerDebug(peerId)
|
||||
```
|
||||
|
||||
`CodexPeerDebug` is only available if you built with `-d:codex_enable_api_debug_peers=true` flag.
|
||||
|
||||
### Context and cancellation
|
||||
|
||||
Go contexts are exposed only on the long-running operations as `UploadReader`, `UploadFile`, and `DownloadFile`. If the
|
||||
context is cancelled, those methods cancel the active upload or download. Short lived API calls don’t take a context
|
||||
because they usually finish before a cancellation signal could matter.
|
||||
@ -92,7 +92,7 @@ const (
|
||||
|
||||
type Config struct {
|
||||
// Default: INFO
|
||||
LogLevel LogLevel `json:"log-level,omitempty"`
|
||||
LogLevel string `json:"log-level,omitempty"`
|
||||
|
||||
// Specifies what kind of logs should be written to stdout
|
||||
// Default: auto
|
||||
@ -280,8 +280,12 @@ func (node CodexNode) Destroy() error {
|
||||
return bridge.callError("cGoCodexDestroy")
|
||||
}
|
||||
|
||||
_, err = bridge.wait()
|
||||
return err
|
||||
// We don't wait for the bridge here.
|
||||
// The destroy function does not call the worker thread,
|
||||
// it destroys the context directly and return the return
|
||||
// value synchronously.
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Version returns the version of the Codex node.
|
||||
|
||||
@ -3,7 +3,11 @@ package codex
|
||||
import "testing"
|
||||
|
||||
func TestCodexVersion(t *testing.T) {
|
||||
node := newCodexNode(t, withNoStart())
|
||||
config := defaultConfigHelper(t)
|
||||
node, err := New(config)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create Codex node: %v", err)
|
||||
}
|
||||
|
||||
version, err := node.Version()
|
||||
if err != nil {
|
||||
@ -17,7 +21,11 @@ func TestCodexVersion(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestCodexRevision(t *testing.T) {
|
||||
node := newCodexNode(t, withNoStart())
|
||||
config := defaultConfigHelper(t)
|
||||
node, err := New(config)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create Codex node: %v", err)
|
||||
}
|
||||
|
||||
revision, err := node.Revision()
|
||||
if err != nil {
|
||||
|
||||
@ -32,38 +32,19 @@ func TestUpdateLogLevel(t *testing.T) {
|
||||
}
|
||||
defer os.Remove(tmpFile.Name())
|
||||
|
||||
node, err := New(Config{
|
||||
node := newCodexNode(t, Config{
|
||||
LogLevel: "INFO",
|
||||
LogFile: tmpFile.Name(),
|
||||
MetricsEnabled: false,
|
||||
LogFormat: LogFormatNoColors,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create Codex node: %v", err)
|
||||
}
|
||||
|
||||
t.Cleanup(func() {
|
||||
if err := node.Stop(); err != nil {
|
||||
t.Logf("cleanup codex: %v", err)
|
||||
}
|
||||
|
||||
if err := node.Destroy(); err != nil {
|
||||
t.Logf("cleanup codex: %v", err)
|
||||
}
|
||||
})
|
||||
|
||||
if err := node.Start(); err != nil {
|
||||
t.Fatalf("Failed to start Codex node: %v", err)
|
||||
}
|
||||
|
||||
content, err := os.ReadFile(tmpFile.Name())
|
||||
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to read log file: %v", err)
|
||||
}
|
||||
if !strings.Contains(string(content), "Started codex node") {
|
||||
t.Errorf("Log file does not contain 'Started codex node' %s", string(content))
|
||||
}
|
||||
|
||||
if err := node.Stop(); err != nil {
|
||||
t.Fatalf("Failed to stop Codex node: %v", err)
|
||||
if !strings.Contains(string(content), "INF") {
|
||||
t.Errorf("Log file does not contain INFO statement %s", string(content))
|
||||
}
|
||||
|
||||
err = node.UpdateLogLevel("ERROR")
|
||||
@ -71,6 +52,11 @@ func TestUpdateLogLevel(t *testing.T) {
|
||||
t.Fatalf("UpdateLogLevel call failed: %v", err)
|
||||
}
|
||||
|
||||
if err := node.Stop(); err != nil {
|
||||
t.Fatalf("Failed to stop Codex node: %v", err)
|
||||
}
|
||||
|
||||
// Clear the file
|
||||
if err := os.WriteFile(tmpFile.Name(), []byte{}, 0644); err != nil {
|
||||
t.Fatalf("Failed to clear log file: %v", err)
|
||||
}
|
||||
@ -85,8 +71,8 @@ func TestUpdateLogLevel(t *testing.T) {
|
||||
t.Fatalf("Failed to read log file: %v", err)
|
||||
}
|
||||
|
||||
if strings.Contains(string(content), "Starting discovery node") {
|
||||
t.Errorf("Log file contains 'Starting discovery node'")
|
||||
if strings.Contains(string(content), "INF") {
|
||||
t.Errorf("Log file contains INFO statement after log level update: %s", string(content))
|
||||
}
|
||||
}
|
||||
|
||||
@ -94,49 +80,9 @@ func TestCodexPeerDebug(t *testing.T) {
|
||||
var bootstrap, node1, node2 *CodexNode
|
||||
var err error
|
||||
|
||||
t.Cleanup(func() {
|
||||
if bootstrap != nil {
|
||||
if err := bootstrap.Stop(); err != nil {
|
||||
t.Logf("cleanup bootstrap: %v", err)
|
||||
}
|
||||
|
||||
if err := bootstrap.Destroy(); err != nil {
|
||||
t.Logf("cleanup bootstrap: %v", err)
|
||||
}
|
||||
}
|
||||
if node1 != nil {
|
||||
if err := node1.Stop(); err != nil {
|
||||
t.Logf("cleanup node1: %v", err)
|
||||
}
|
||||
|
||||
if err := node1.Destroy(); err != nil {
|
||||
t.Logf("cleanup node1: %v", err)
|
||||
}
|
||||
}
|
||||
if node2 != nil {
|
||||
if err := node2.Stop(); err != nil {
|
||||
t.Logf("cleanup node2: %v", err)
|
||||
}
|
||||
|
||||
if err := node2.Destroy(); err != nil {
|
||||
t.Logf("cleanup node2: %v", err)
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
bootstrap, err = New(Config{
|
||||
DataDir: t.TempDir(),
|
||||
LogFormat: LogFormatNoColors,
|
||||
MetricsEnabled: false,
|
||||
bootstrap = newCodexNode(t, Config{
|
||||
DiscoveryPort: 8092,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create bootstrap: %v", err)
|
||||
}
|
||||
|
||||
if err := bootstrap.Start(); err != nil {
|
||||
t.Fatalf("Failed to start bootstrap: %v", err)
|
||||
}
|
||||
|
||||
spr, err := bootstrap.Spr()
|
||||
if err != nil {
|
||||
@ -145,35 +91,15 @@ func TestCodexPeerDebug(t *testing.T) {
|
||||
|
||||
bootstrapNodes := []string{spr}
|
||||
|
||||
node1, err = New(Config{
|
||||
DataDir: t.TempDir(),
|
||||
LogFormat: LogFormatNoColors,
|
||||
MetricsEnabled: false,
|
||||
node1 = newCodexNode(t, Config{
|
||||
DiscoveryPort: 8090,
|
||||
BootstrapNodes: bootstrapNodes,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create codex: %v", err)
|
||||
}
|
||||
|
||||
if err := node1.Start(); err != nil {
|
||||
t.Fatalf("Failed to start codex: %v", err)
|
||||
}
|
||||
|
||||
node2, err = New(Config{
|
||||
DataDir: t.TempDir(),
|
||||
LogFormat: LogFormatNoColors,
|
||||
MetricsEnabled: false,
|
||||
node2 = newCodexNode(t, Config{
|
||||
DiscoveryPort: 8091,
|
||||
BootstrapNodes: bootstrapNodes,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create codex2: %v", err)
|
||||
}
|
||||
|
||||
if err := node2.Start(); err != nil {
|
||||
t.Fatalf("Failed to start codex2: %v", err)
|
||||
}
|
||||
|
||||
peerId, err := node2.PeerId()
|
||||
if err != nil {
|
||||
@ -186,9 +112,14 @@ func TestCodexPeerDebug(t *testing.T) {
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
|
||||
time.Sleep(1 * time.Second)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
t.Fatalf("CodexPeerDebug call failed: %v", err)
|
||||
}
|
||||
|
||||
if record.PeerId == "" {
|
||||
t.Fatalf("CodexPeerDebug call failed: %v", err)
|
||||
}
|
||||
|
||||
@ -26,7 +26,9 @@ package codex
|
||||
*/
|
||||
import "C"
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"unsafe"
|
||||
)
|
||||
@ -145,7 +147,7 @@ func (node CodexNode) DownloadManifest(cid string) (Manifest, error) {
|
||||
// If options.writer is set, the data will be written into that writer.
|
||||
// The options filepath and writer are not mutually exclusive, i.e you can write
|
||||
// in different places in a same call.
|
||||
func (node CodexNode) DownloadStream(cid string, options DownloadStreamOptions) error {
|
||||
func (node CodexNode) DownloadStream(ctx context.Context, cid string, options DownloadStreamOptions) error {
|
||||
bridge := newBridgeCtx()
|
||||
defer bridge.free()
|
||||
|
||||
@ -189,6 +191,16 @@ func (node CodexNode) DownloadStream(cid string, options DownloadStreamOptions)
|
||||
var cCid = C.CString(cid)
|
||||
defer C.free(unsafe.Pointer(cCid))
|
||||
|
||||
err := node.DownloadInit(cid, DownloadInitOptions{
|
||||
ChunkSize: options.ChunkSize,
|
||||
Local: options.Local,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
defer node.DownloadCancel(cid)
|
||||
|
||||
var cFilepath = C.CString(options.Filepath)
|
||||
defer C.free(unsafe.Pointer(cFilepath))
|
||||
|
||||
@ -198,8 +210,39 @@ func (node CodexNode) DownloadStream(cid string, options DownloadStreamOptions)
|
||||
return bridge.callError("cGoCodexDownloadLocal")
|
||||
}
|
||||
|
||||
_, err := bridge.wait()
|
||||
// Create a done channel to signal the goroutine to stop
|
||||
// when the download is complete and avoid goroutine leaks.
|
||||
done := make(chan struct{})
|
||||
defer close(done)
|
||||
|
||||
channelError := make(chan error, 1)
|
||||
go func() {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
channelError <- node.DownloadCancel(cid)
|
||||
case <-done:
|
||||
// Nothing to do, download finished
|
||||
}
|
||||
}()
|
||||
|
||||
_, err = bridge.wait()
|
||||
|
||||
// Extract the potential cancellation error
|
||||
var cancelError error
|
||||
select {
|
||||
case cancelError = <-channelError:
|
||||
default:
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
if cancelError != nil {
|
||||
return fmt.Errorf("download canceled: %v, but failed to cancel download session: %v", ctx.Err(), cancelError)
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
return cancelError
|
||||
}
|
||||
|
||||
// DownloadInit initializes the download process for a specific CID.
|
||||
|
||||
@ -1,6 +1,7 @@
|
||||
package codex
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"strings"
|
||||
"testing"
|
||||
@ -32,7 +33,7 @@ func TestDownloadStream(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
if err := codex.DownloadStream(cid, opt); err != nil {
|
||||
if err := codex.DownloadStream(context.Background(), cid, opt); err != nil {
|
||||
t.Fatal("Error happened:", err.Error())
|
||||
}
|
||||
|
||||
@ -72,7 +73,7 @@ func TestDownloadStreamWithAutosize(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
if err := codex.DownloadStream(cid, opt); err != nil {
|
||||
if err := codex.DownloadStream(context.Background(), cid, opt); err != nil {
|
||||
t.Fatal("Error happened:", err.Error())
|
||||
}
|
||||
|
||||
@ -86,14 +87,38 @@ func TestDownloadStreamWithAutosize(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestDownloadStreamWithNotExisting(t *testing.T) {
|
||||
codex := newCodexNode(t, withBlockRetries(1))
|
||||
codex := newCodexNode(t, Config{BlockRetries: 1})
|
||||
|
||||
opt := DownloadStreamOptions{}
|
||||
if err := codex.DownloadStream("bafybeihdwdcefgh4dqkjv67uzcmw7ojee6xedzdetojuzjevtenxquvyku", opt); err == nil {
|
||||
if err := codex.DownloadStream(context.Background(), "bafybeihdwdcefgh4dqkjv67uzcmw7ojee6xedzdetojuzjevtenxquvyku", opt); err == nil {
|
||||
t.Fatal("Error expected when downloading non-existing cid")
|
||||
}
|
||||
}
|
||||
|
||||
func TestDownloadStreamCancelled(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
codex := newCodexNode(t)
|
||||
cid, _ := uploadBigFileHelper(t, codex)
|
||||
|
||||
channelError := make(chan error, 1)
|
||||
go func() {
|
||||
err := codex.DownloadStream(ctx, cid, DownloadStreamOptions{Local: true})
|
||||
channelError <- err
|
||||
}()
|
||||
|
||||
cancel()
|
||||
err := <-channelError
|
||||
|
||||
if err == nil {
|
||||
t.Fatal("UploadFile should have been canceled")
|
||||
}
|
||||
|
||||
if err.Error() != "Failed to stream file: Stream EOF!" {
|
||||
t.Fatalf("UploadFile returned unexpected error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestDownloadManual(t *testing.T) {
|
||||
codex := newCodexNode(t)
|
||||
cid, _ := uploadHelper(t, codex)
|
||||
@ -134,7 +159,7 @@ func TestDownloadManifest(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestDownloadManifestWithNotExistingCid(t *testing.T) {
|
||||
codex := newCodexNode(t, withBlockRetries(1))
|
||||
codex := newCodexNode(t, Config{BlockRetries: 1})
|
||||
|
||||
manifest, err := codex.DownloadManifest("bafybeihdwdcefgh4dqkjv67uzcmw7ojee6xedzdetojuzjevtenxquvyku")
|
||||
if err == nil {
|
||||
@ -147,7 +172,7 @@ func TestDownloadManifestWithNotExistingCid(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestDownloadInitWithNotExistingCid(t *testing.T) {
|
||||
codex := newCodexNode(t, withBlockRetries(1))
|
||||
codex := newCodexNode(t, Config{BlockRetries: 1})
|
||||
|
||||
if err := codex.DownloadInit("bafybeihdwdcefgh4dqkjv67uzcmw7ojee6xedzdetojuzjevtenxquvyku", DownloadInitOptions{}); err == nil {
|
||||
t.Fatal("expected error when initializing download for non-existent cid")
|
||||
|
||||
@ -73,49 +73,9 @@ func TestCodexWithPeerId(t *testing.T) {
|
||||
var bootstrap, node1, node2 *CodexNode
|
||||
var err error
|
||||
|
||||
t.Cleanup(func() {
|
||||
if bootstrap != nil {
|
||||
if err := bootstrap.Stop(); err != nil {
|
||||
t.Logf("cleanup bootstrap: %v", err)
|
||||
}
|
||||
|
||||
if err := bootstrap.Destroy(); err != nil {
|
||||
t.Logf("cleanup bootstrap: %v", err)
|
||||
}
|
||||
}
|
||||
if node1 != nil {
|
||||
if err := node1.Stop(); err != nil {
|
||||
t.Logf("cleanup node1: %v", err)
|
||||
}
|
||||
|
||||
if err := node1.Destroy(); err != nil {
|
||||
t.Logf("cleanup node1: %v", err)
|
||||
}
|
||||
}
|
||||
if node2 != nil {
|
||||
if err := node2.Stop(); err != nil {
|
||||
t.Logf("cleanup node2: %v", err)
|
||||
}
|
||||
|
||||
if err := node2.Destroy(); err != nil {
|
||||
t.Logf("cleanup node2: %v", err)
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
bootstrap, err = New(Config{
|
||||
DataDir: t.TempDir(),
|
||||
LogFormat: LogFormatNoColors,
|
||||
MetricsEnabled: false,
|
||||
bootstrap = newCodexNode(t, Config{
|
||||
DiscoveryPort: 8092,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create bootstrap: %v", err)
|
||||
}
|
||||
|
||||
if err := bootstrap.Start(); err != nil {
|
||||
t.Fatalf("Failed to start bootstrap: %v", err)
|
||||
}
|
||||
|
||||
spr, err := bootstrap.Spr()
|
||||
if err != nil {
|
||||
@ -124,35 +84,15 @@ func TestCodexWithPeerId(t *testing.T) {
|
||||
|
||||
bootstrapNodes := []string{spr}
|
||||
|
||||
node1, err = New(Config{
|
||||
DataDir: t.TempDir(),
|
||||
LogFormat: LogFormatNoColors,
|
||||
MetricsEnabled: false,
|
||||
node1 = newCodexNode(t, Config{
|
||||
DiscoveryPort: 8090,
|
||||
BootstrapNodes: bootstrapNodes,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create codex: %v", err)
|
||||
}
|
||||
|
||||
if err := node1.Start(); err != nil {
|
||||
t.Fatalf("Failed to start codex: %v", err)
|
||||
}
|
||||
|
||||
node2, err = New(Config{
|
||||
DataDir: t.TempDir(),
|
||||
LogFormat: LogFormatNoColors,
|
||||
MetricsEnabled: false,
|
||||
node2 = newCodexNode(t, Config{
|
||||
DiscoveryPort: 8091,
|
||||
BootstrapNodes: bootstrapNodes,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create codex2: %v", err)
|
||||
}
|
||||
|
||||
if err := node2.Start(); err != nil {
|
||||
t.Fatalf("Failed to start codex2: %v", err)
|
||||
}
|
||||
|
||||
peerId, err := node2.PeerId()
|
||||
if err != nil {
|
||||
|
||||
@ -84,7 +84,7 @@ func TestFetch(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestFetchCidDoesNotExist(t *testing.T) {
|
||||
codex := newCodexNode(t, withBlockRetries(1))
|
||||
codex := newCodexNode(t, Config{BlockRetries: 1})
|
||||
|
||||
_, err := codex.Fetch("bafybeihdwdcefgh4dqkjv67uzcmw7ojee6xedzdetojuzjevtenxquvyku")
|
||||
if err == nil {
|
||||
|
||||
@ -2,55 +2,63 @@ package codex
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"testing"
|
||||
)
|
||||
|
||||
type codexNodeTestOption func(*codexNodeTestOptions)
|
||||
func defaultConfigHelper(t *testing.T) Config {
|
||||
t.Helper()
|
||||
|
||||
type codexNodeTestOptions struct {
|
||||
noStart bool
|
||||
blockRetries int
|
||||
}
|
||||
|
||||
func withNoStart() codexNodeTestOption {
|
||||
return func(o *codexNodeTestOptions) { o.noStart = true }
|
||||
}
|
||||
|
||||
func withBlockRetries(n int) codexNodeTestOption {
|
||||
return func(o *codexNodeTestOptions) { o.blockRetries = n }
|
||||
}
|
||||
|
||||
func newCodexNode(t *testing.T, opts ...codexNodeTestOption) *CodexNode {
|
||||
o := codexNodeTestOptions{
|
||||
blockRetries: 3000,
|
||||
}
|
||||
for _, opt := range opts {
|
||||
opt(&o)
|
||||
}
|
||||
|
||||
node, err := New(Config{
|
||||
return Config{
|
||||
DataDir: t.TempDir(),
|
||||
LogFormat: LogFormatNoColors,
|
||||
MetricsEnabled: false,
|
||||
BlockRetries: o.blockRetries,
|
||||
})
|
||||
BlockRetries: 3000,
|
||||
LogLevel: "ERROR",
|
||||
}
|
||||
}
|
||||
|
||||
func newCodexNode(t *testing.T, opts ...Config) *CodexNode {
|
||||
config := defaultConfigHelper(t)
|
||||
|
||||
if len(opts) > 0 {
|
||||
c := opts[0]
|
||||
|
||||
if c.BlockRetries > 0 {
|
||||
config.BlockRetries = c.BlockRetries
|
||||
}
|
||||
|
||||
if c.LogLevel != "" {
|
||||
config.LogLevel = c.LogLevel
|
||||
}
|
||||
|
||||
if c.LogFile != "" {
|
||||
config.LogFile = c.LogFile
|
||||
}
|
||||
|
||||
if len(c.BootstrapNodes) != 0 {
|
||||
config.BootstrapNodes = c.BootstrapNodes
|
||||
}
|
||||
|
||||
if c.DiscoveryPort != 0 {
|
||||
config.DiscoveryPort = c.DiscoveryPort
|
||||
}
|
||||
}
|
||||
|
||||
node, err := New(config)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create Codex node: %v", err)
|
||||
}
|
||||
|
||||
if !o.noStart {
|
||||
err = node.Start()
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to start Codex node: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
t.Cleanup(func() {
|
||||
if !o.noStart {
|
||||
if err := node.Stop(); err != nil {
|
||||
t.Logf("cleanup codex: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
if err := node.Destroy(); err != nil {
|
||||
t.Logf("cleanup codex: %v", err)
|
||||
@ -65,7 +73,21 @@ func uploadHelper(t *testing.T, codex *CodexNode) (string, int) {
|
||||
|
||||
buf := bytes.NewBuffer([]byte("Hello World!"))
|
||||
len := buf.Len()
|
||||
cid, err := codex.UploadReader(UploadOptions{Filepath: "hello.txt"}, buf)
|
||||
cid, err := codex.UploadReader(context.Background(), UploadOptions{Filepath: "hello.txt"}, buf)
|
||||
if err != nil {
|
||||
t.Fatalf("Error happened during upload: %v\n", err)
|
||||
}
|
||||
|
||||
return cid, len
|
||||
}
|
||||
|
||||
func uploadBigFileHelper(t *testing.T, codex *CodexNode) (string, int) {
|
||||
t.Helper()
|
||||
|
||||
len := 1024 * 1024 * 50
|
||||
buf := bytes.NewBuffer(make([]byte, len))
|
||||
|
||||
cid, err := codex.UploadReader(context.Background(), UploadOptions{Filepath: "hello.txt"}, buf)
|
||||
if err != nil {
|
||||
t.Fatalf("Error happened during upload: %v\n", err)
|
||||
}
|
||||
|
||||
@ -27,6 +27,8 @@ package codex
|
||||
import "C"
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
@ -164,11 +166,12 @@ func (node CodexNode) UploadCancel(sessionId string) error {
|
||||
// - UploadChunk to upload a chunk to codex.
|
||||
// - UploadFinalize to finalize the upload session.
|
||||
// - UploadCancel if an error occurs.
|
||||
func (node CodexNode) UploadReader(options UploadOptions, r io.Reader) (string, error) {
|
||||
func (node CodexNode) UploadReader(ctx context.Context, options UploadOptions, r io.Reader) (string, error) {
|
||||
sessionId, err := node.UploadInit(&options)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
defer node.UploadCancel(sessionId)
|
||||
|
||||
buf := make([]byte, options.ChunkSize.valOrDefault())
|
||||
total := 0
|
||||
@ -179,6 +182,16 @@ func (node CodexNode) UploadReader(options UploadOptions, r io.Reader) (string,
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
if cancelErr := node.UploadCancel(sessionId); cancelErr != nil {
|
||||
return "", fmt.Errorf("upload canceled: %v, but failed to cancel upload session: %v", ctx.Err(), cancelErr)
|
||||
}
|
||||
return "", errors.New("upload canceled")
|
||||
default:
|
||||
// continue
|
||||
}
|
||||
|
||||
n, err := r.Read(buf)
|
||||
if err == io.EOF {
|
||||
break
|
||||
@ -222,9 +235,9 @@ func (node CodexNode) UploadReader(options UploadOptions, r io.Reader) (string,
|
||||
}
|
||||
|
||||
// UploadReaderAsync is the asynchronous version of UploadReader using a goroutine.
|
||||
func (node CodexNode) UploadReaderAsync(options UploadOptions, r io.Reader, onDone func(cid string, err error)) {
|
||||
func (node CodexNode) UploadReaderAsync(ctx context.Context, options UploadOptions, r io.Reader, onDone func(cid string, err error)) {
|
||||
go func() {
|
||||
cid, err := node.UploadReader(options, r)
|
||||
cid, err := node.UploadReader(ctx, options, r)
|
||||
onDone(cid, err)
|
||||
}()
|
||||
}
|
||||
@ -249,7 +262,7 @@ func (node CodexNode) UploadReaderAsync(options UploadOptions, r io.Reader, onDo
|
||||
// is sent to the stream.
|
||||
//
|
||||
// Internally, it calls UploadInit to create the upload session.
|
||||
func (node CodexNode) UploadFile(options UploadOptions) (string, error) {
|
||||
func (node CodexNode) UploadFile(ctx context.Context, options UploadOptions) (string, error) {
|
||||
bridge := newBridgeCtx()
|
||||
defer bridge.free()
|
||||
|
||||
@ -285,6 +298,7 @@ func (node CodexNode) UploadFile(options UploadOptions) (string, error) {
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
defer node.UploadCancel(sessionId)
|
||||
|
||||
var cSessionId = C.CString(sessionId)
|
||||
defer C.free(unsafe.Pointer(cSessionId))
|
||||
@ -293,13 +307,44 @@ func (node CodexNode) UploadFile(options UploadOptions) (string, error) {
|
||||
return "", bridge.callError("cGoCodexUploadFile")
|
||||
}
|
||||
|
||||
return bridge.wait()
|
||||
// Create a done channel to signal the goroutine to stop
|
||||
// when the download is complete and avoid goroutine leaks.
|
||||
done := make(chan struct{})
|
||||
defer close(done)
|
||||
|
||||
channelError := make(chan error, 1)
|
||||
go func() {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
channelError <- node.UploadCancel(sessionId)
|
||||
case <-done:
|
||||
// Nothing to do, upload finished
|
||||
}
|
||||
}()
|
||||
|
||||
_, err = bridge.wait()
|
||||
|
||||
// Extract the potential cancellation error
|
||||
var cancelErr error
|
||||
select {
|
||||
case cancelErr = <-channelError:
|
||||
default:
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
if cancelErr != nil {
|
||||
return "", fmt.Errorf("upload canceled: %v, but failed to cancel upload session: %v", ctx.Err(), cancelErr)
|
||||
}
|
||||
return "", err
|
||||
}
|
||||
|
||||
return bridge.result, cancelErr
|
||||
}
|
||||
|
||||
// UploadFileAsync is the asynchronous version of UploadFile using a goroutine.
|
||||
func (node CodexNode) UploadFileAsync(options UploadOptions, onDone func(cid string, err error)) {
|
||||
func (node CodexNode) UploadFileAsync(ctx context.Context, options UploadOptions, onDone func(cid string, err error)) {
|
||||
go func() {
|
||||
cid, err := node.UploadFile(options)
|
||||
cid, err := node.UploadFile(ctx, options)
|
||||
onDone(cid, err)
|
||||
}()
|
||||
}
|
||||
|
||||
@ -2,6 +2,7 @@ package codex
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"log"
|
||||
"os"
|
||||
"testing"
|
||||
@ -16,7 +17,7 @@ func TestUploadReader(t *testing.T) {
|
||||
|
||||
buf := bytes.NewBuffer([]byte("Hello World!"))
|
||||
len := buf.Len()
|
||||
cid, err := codex.UploadReader(UploadOptions{Filepath: "hello.txt", OnProgress: func(read, total int, percent float64, err error) {
|
||||
cid, err := codex.UploadReader(context.Background(), UploadOptions{Filepath: "hello.txt", OnProgress: func(read, total int, percent float64, err error) {
|
||||
if err != nil {
|
||||
log.Fatalf("Error happened during upload: %v\n", err)
|
||||
}
|
||||
@ -42,6 +43,30 @@ func TestUploadReader(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestUploadReaderCancel(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
codex := newCodexNode(t)
|
||||
buf := bytes.NewBuffer(make([]byte, 1024*1024*10))
|
||||
|
||||
channelErr := make(chan error, 1)
|
||||
go func() {
|
||||
_, e := codex.UploadReader(ctx, UploadOptions{Filepath: "hello.txt"}, buf)
|
||||
channelErr <- e
|
||||
}()
|
||||
|
||||
cancel()
|
||||
err := <-channelErr
|
||||
|
||||
if err == nil {
|
||||
t.Fatal("UploadReader should have been canceled")
|
||||
}
|
||||
|
||||
if err.Error() != "upload canceled" {
|
||||
t.Fatalf("UploadReader returned unexpected error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestUploadFile(t *testing.T) {
|
||||
codex := newCodexNode(t)
|
||||
totalBytes := 0
|
||||
@ -61,7 +86,7 @@ func TestUploadFile(t *testing.T) {
|
||||
finalPercent = percent
|
||||
}}
|
||||
|
||||
cid, err := codex.UploadFile(options)
|
||||
cid, err := codex.UploadFile(context.Background(), options)
|
||||
if err != nil {
|
||||
t.Fatalf("UploadReader failed: %v", err)
|
||||
}
|
||||
@ -79,12 +104,47 @@ func TestUploadFile(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestUploadFileCancel(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
tmpFile, err := os.Create(os.TempDir() + "/large_file.txt")
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create temp file: %v", err)
|
||||
}
|
||||
defer os.Remove(tmpFile.Name())
|
||||
|
||||
largeContent := make([]byte, 1024*1024*50)
|
||||
if _, err := tmpFile.Write(largeContent); err != nil {
|
||||
t.Fatalf("Failed to write to temp file: %v", err)
|
||||
}
|
||||
tmpFile.Close()
|
||||
|
||||
codex := newCodexNode(t)
|
||||
|
||||
channelError := make(chan error, 1)
|
||||
go func() {
|
||||
_, err := codex.UploadFile(ctx, UploadOptions{Filepath: tmpFile.Name()})
|
||||
channelError <- err
|
||||
}()
|
||||
|
||||
cancel()
|
||||
err = <-channelError
|
||||
|
||||
if err == nil {
|
||||
t.Fatal("UploadFile should have been canceled")
|
||||
}
|
||||
|
||||
if err.Error() != "Failed to upload the file: Failed to stream the file: Stream Closed!" {
|
||||
t.Fatalf("UploadFile returned unexpected error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestUploadFileNoProgress(t *testing.T) {
|
||||
codex := newCodexNode(t)
|
||||
|
||||
options := UploadOptions{Filepath: "./testdata/doesnt_exist.txt"}
|
||||
|
||||
cid, err := codex.UploadFile(options)
|
||||
cid, err := codex.UploadFile(context.Background(), options)
|
||||
if err == nil {
|
||||
t.Fatalf("UploadReader should have failed")
|
||||
}
|
||||
|
||||
2
vendor/nim-codex
vendored
2
vendor/nim-codex
vendored
@ -1 +1 @@
|
||||
Subproject commit 1105b81cc1b202006ca5a16485b3cfc5331468d5
|
||||
Subproject commit a86d8586456d5eb6805b228e80e264ee736a6a90
|
||||
Loading…
x
Reference in New Issue
Block a user