Compare commits

...

65 Commits

Author SHA1 Message Date
Arnaud
5943738212
Merge pull request #19 from logos-storage/build/dev-env-gotestsum-and-makefile
Build: small updates to dev env: debugging, gotestsum and makefile
2025-12-11 06:08:14 +01:00
Marcin Czenko
2cd2df0beb
Updates README 2025-11-28 01:48:34 +01:00
Marcin Czenko
d8b21f2087
Installs gotestsum on the CI 2025-11-28 01:16:08 +01:00
Marcin Czenko
316d2bde6c
adds make target to build the lib with debug API 2025-11-28 01:16:08 +01:00
Marcin Czenko
be94580b23
adds extra make target to Makefile allowing you to pass parameters to go test 2025-11-28 01:16:08 +01:00
Marcin Czenko
99cffb1829
adds possibility to run selected tests with "make test <name>" 2025-11-28 01:16:08 +01:00
Marcin Czenko
79d9643745
adds launch configuration for more convenient debugging 2025-11-28 01:16:08 +01:00
Marcin Czenko
56914260a0 uses Nat: "none" in recently added tests 2025-11-27 22:06:05 +01:00
Arnaud
5ba9d94ecf
Update changelog 2025-11-14 06:00:10 +01:00
Arnaud
8026f3fd8d
Merge pull request #16 from codex-storage/fix/configuration-duration
fix: fix configuration types
2025-11-14 08:22:05 +04:00
Arnaud
017b032d84
Bump nim codex and fix configuration 2025-11-14 05:01:45 +01:00
Arnaud
1cf7fe4407
Merge pull request #15 from codex-storage/fix/prevent-datastore-lock
fix: bump nim codex to prevent datastore lock when closing the Codex client
2025-11-14 07:42:49 +04:00
Arnaud
429d076008
Bump nim codex to prevent datastore lock when closing the Codex client 2025-11-14 04:24:16 +01:00
Arnaud
c47e7ae49d
Update changelog 2025-11-11 11:46:18 +01:00
Eric
a83c427561
Merge pull request #13 from codex-storage/chore/bump-codex-to-release-branch
chore: bump nim-codex to prototype release branch
2025-11-11 17:53:10 +11:00
Arnaud
7bf087ad88
Add StorageQuota config to testutil 2025-11-11 07:34:41 +01:00
Arnaud
763c7745f7
Add test for storage quota 2025-11-11 07:31:44 +01:00
Arnaud
6a5c6da14f
Bump nim-codex version 2025-11-11 07:29:20 +01:00
Eric
a5ae146e5d
Remove loglevel trace from tests 2025-11-11 16:40:49 +11:00
Eric
e1511cd9e7
bump to latest release/1.0.0-prototype.1 commit 2025-11-11 16:40:49 +11:00
Eric
0000ded180
Set loglevel: trace for tests, remove nat: none 2025-11-11 16:40:49 +11:00
Eric
b48610b1d6
Set nat: none for tests 2025-11-11 16:40:48 +11:00
Eric
123c5a482b
chore: bump nim-codex to prototype release branch 2025-11-11 16:40:48 +11:00
Arnaud
643a9cbd0e
Merge pull request #11 from codex-storage/chore/release-note
chore: enhance release note
2025-11-11 08:59:47 +04:00
Arnaud
16a4fbbde5
Add release note 2025-11-04 06:31:14 +01:00
Arnaud
f69939fdae
Update changelog 2025-11-03 15:00:59 +01:00
Arnaud
e5d5e9bdf5
Merge pull request #10 from codex-storage/chore/bump-codex-to-release-branch
chore: bump codex to prototype release branch
2025-11-03 16:51:01 +04:00
Eric
0e0a31cf58
chore: bump codex to prototype release branch 2025-11-03 23:13:41 +11:00
Arnaud
c898f5c48a
Update changelog 2025-11-03 07:45:33 +01:00
Arnaud
32f40d0c12
Merge pull request #9 from codex-storage/feat/has-block
feat: exists feature
2025-11-03 09:51:51 +04:00
Arnaud
84d3e298d8
Bump nim codex 2025-11-02 20:20:15 +01:00
Arnaud
232fd381bb
Add exists feature 2025-11-02 17:20:59 +01:00
Arnaud
9aa9977982
Update changelog 2025-10-30 08:33:42 +01:00
Arnaud
fd5a0c371d
Merge pull request #8 from codex-storage/chore/return-context-cancelled-error
chore: return standard cancellation error
2025-10-30 11:21:11 +04:00
Arnaud
d8f5e0627a
Return standard cancellation error 2025-10-30 07:56:43 +01:00
Arnaud
777b678ffb
Merge pull request #5 from codex-storage/feat/add-context-cancellation
feat: context cancellation
2025-10-30 09:37:53 +04:00
Arnaud
e02490588b
Bump nim-codex version 2025-10-30 06:09:32 +01:00
Arnaud
621103bedb
Ignore bin 2025-10-30 05:56:53 +01:00
Arnaud
92d0245bc6
Small test refactor 2025-10-30 05:56:53 +01:00
Arnaud
f184f0db8d
Bump nim codex 2025-10-30 05:56:53 +01:00
Arnaud
25931bdbf0
Increase test timeout 2025-10-30 05:56:52 +01:00
Arnaud
0c19d39d8c
Set LogLevel as string and do not wait for bridge when destroying because it is a sync call 2025-10-30 05:56:52 +01:00
Arnaud
35c06ba719
Update the configuration pattern to create a new node for the tests 2025-10-30 05:56:50 +01:00
Arnaud
597b98dacc
Bump nim codex 2025-10-30 05:56:50 +01:00
Arnaud
cf52b7097e
Bump nim codex 2025-10-30 05:56:50 +01:00
Arnaud
24789fb855
Defer UploadCancel after UploadInit to be sure that the upload is cancelled after the function is done 2025-10-30 05:56:49 +01:00
Arnaud
d2d8802245
Bump nim-codex 2025-10-30 05:56:49 +01:00
Arnaud
a9fd02863c
Defer channel close 2025-10-30 05:56:48 +01:00
Arnaud
bc8bbf4172
Return cancellation errors 2025-10-30 05:56:48 +01:00
Arnaud
bcdff2f8ba
Defer download cancel after the download session is created 2025-10-30 05:56:47 +01:00
Arnaud
3d66b10fa9
Fix cancel error monitoring 2025-10-30 05:56:47 +01:00
Arnaud
a2983631d6
Remove useless call to UpdateLogLevel 2025-10-30 05:56:46 +01:00
Arnaud
4a20b99444
Remove log level 2025-10-30 05:56:45 +01:00
Arnaud
a68d92ee8c
Update readme 2025-10-30 05:56:44 +01:00
Arnaud
27b68f6d8f
Update README 2025-10-30 05:56:43 +01:00
Arnaud
40aadaf104
Bump nim codex 2025-10-30 05:56:43 +01:00
Arnaud
fca7f76ac1
Add cancellation test 2025-10-30 05:56:42 +01:00
Arnaud
43e6e5e81f
Add support for context and cancellation 2025-10-30 05:56:40 +01:00
Arnaud
ca4993e4b0
Bump nim codex 2025-10-30 05:56:39 +01:00
Arnaud
e30c492cf8
Update README 2025-10-30 05:56:39 +01:00
Arnaud
7218764de5
Add upload cancellation 2025-10-30 05:55:37 +01:00
Arnaud
37cdd17ba8
Update CGO variables settings 2025-10-30 05:55:35 +01:00
Arnaud
277f856b1e
Merge pull request #7 from codex-storage/chore/macos-ci
chore: add macos on ci
2025-10-24 17:20:18 +04:00
Arnaud
aabf78a02f
Try to add macos on ci 2025-10-23 08:10:18 +02:00
Arnaud
89003bbf3a
Update changelog 2025-10-20 13:29:45 +02:00
20 changed files with 716 additions and 240 deletions

View File

@ -2,12 +2,14 @@ name: Go Tests
on: on:
push: push:
branches: master
pull_request: pull_request:
jobs: jobs:
test: test:
runs-on: ubuntu-latest runs-on: ${{ matrix.os }}
strategy:
matrix:
os: [ubuntu-latest, macos-latest]
steps: steps:
- name: Checkout - name: Checkout
@ -44,5 +46,8 @@ jobs:
- name: Build codex go - name: Build codex go
run: make run: make
- name: Install gotestsum
run: go install gotest.tools/gotestsum@latest
- name: Go test - name: Go test
run: make test run: make test

5
.gitignore vendored
View File

@ -19,4 +19,7 @@ nimcache
# Test files # Test files
codex/testdata/hello.downloaded.txt codex/testdata/hello.downloaded.txt
codex/testdata/hello.downloaded.writer.txt codex/testdata/hello.downloaded.writer.txt
# Bin
codex-go

77
.vscode/launch.json vendored Normal file
View File

@ -0,0 +1,77 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Debug Current Test",
"type": "go",
"request": "launch",
"mode": "test",
"program": "${fileDirname}",
"args": ["-test.v", "-test.run", "^${selectedText}$"],
"env": {
"CGO_ENABLED": "1"
}
},
{
"name": "Debug Current Test Function",
"type": "go",
"request": "launch",
"mode": "test",
"program": "${fileDirname}",
"env": {
"CGO_ENABLED": "1"
}
},
{
"name": "Debug All Tests in Current File",
"type": "go",
"request": "launch",
"mode": "test",
"program": "${fileDirname}",
"args": ["-test.v"],
"env": {
"CGO_ENABLED": "1"
}
},
{
"name": "Debug All Tests in Current Package",
"type": "go",
"request": "launch",
"mode": "test",
"program": "${fileDirname}",
"args": ["-test.v", "-count=1"],
"env": {
"CGO_ENABLED": "1"
}
},
{
"name": "Debug Codex Tests",
"type": "go",
"request": "launch",
"mode": "test",
"program": "${workspaceFolder}/codex",
"args": ["-test.v"],
"env": {
"CGO_ENABLED": "1"
}
},
{
"name": "Debug Specific Test (e.g., TestUpload)",
"type": "go",
"request": "launch",
"mode": "test",
"program": "${workspaceFolder}/codex",
"args": [
"-test.v",
"-test.run",
"TestUpload"
],
"env": {
"CGO_ENABLED": "1"
}
}
]
}

View File

@ -1,7 +1,9 @@
{ {
"go.toolsEnvVars": { "go.toolsEnvVars": {
"CGO_ENABLED": "1",
"CGO_CFLAGS": "-I${workspaceFolder}/vendor/nim-codex/library", "CGO_CFLAGS": "-I${workspaceFolder}/vendor/nim-codex/library",
"CGO_LDFLAGS": "-L${workspaceFolder}/vendor/nim-codex/build -Wl,-rpath,${workspaceFolder}/vendor/nim-codex/build", "CGO_LDFLAGS": "-L${workspaceFolder}/vendor/nim-codex/build -lcodex -Wl,-rpath,${workspaceFolder}/vendor/nim-codex/build",
"LD_LIBRARY_PATH": "${workspaceFolder}/vendor/nim-codex/build:${env:LD_LIBRARY_PATH}" "LD_LIBRARY_PATH": "${workspaceFolder}/vendor/nim-codex/build:${env:LD_LIBRARY_PATH}"
} },
"go.testTimeout": "2m"
} }

View File

@ -1,3 +1,41 @@
## v0.0.28 (2025-11-14)
### Notes
- fix: bump nim codex to prevent datastore lock when closing the Codex client
- fix: configuration duration for block-ttl, block-mi and int for num-threads
## v0.0.27 (2025-11-11)
### Notes
- Enhance release note in
- Bump nim-codex to prototype release branch
## v0.0.26 (2025-11-03)
### Notes
- Bump `nim-codex` to prototype release branch
## v0.0.25 (2025-11-03)
### Notes
- Add `exists` to check the existence of a cid in the local store
## v0.0.24 (2025-10-30)
### Notes
- Return the standard context.Canceled when a context is cancelled
## v0.0.23 (2025-10-30)
### Notes
- Add context cancellation support
- Prevent segmentation fault when trying to stop and node not started
## v0.0.22 (2025-10-20)
### Notes
- Downgrade Go version requirement to 1.24.0
## v0.0.21 (2025-10-15) ## v0.0.21 (2025-10-15)
### Notes ### Notes

View File

@ -23,13 +23,24 @@ libcodex:
@echo "Building libcodex..." @echo "Building libcodex..."
@$(MAKE) -C $(NIM_CODEX_DIR) libcodex @$(MAKE) -C $(NIM_CODEX_DIR) libcodex
libcodex-with-debug-api:
@echo "Building libcodex..."
@$(MAKE) -C $(NIM_CODEX_DIR) libcodex CODEX_LIB_PARAMS="-d:codex_enable_api_debug_peers"
build: build:
@echo "Building Codex Go Bindings..." @echo "Building Codex Go Bindings..."
CGO_ENABLED=1 CGO_CFLAGS="$(CGO_CFLAGS)" CGO_LDFLAGS="$(CGO_LDFLAGS)" go build -o codex-go ./codex CGO_ENABLED=1 CGO_CFLAGS="$(CGO_CFLAGS)" CGO_LDFLAGS="$(CGO_LDFLAGS)" go build -o codex-go ./codex
test: test:
@echo "Running tests..." @echo "Running tests..."
CGO_ENABLED=1 CGO_CFLAGS="$(CGO_CFLAGS)" CGO_LDFLAGS="$(CGO_LDFLAGS)" go test ./... CGO_ENABLED=1 CGO_CFLAGS="$(CGO_CFLAGS)" CGO_LDFLAGS="$(CGO_LDFLAGS)" GOTESTFLAGS="-timeout=2m" gotestsum --packages="./..." -f testname -- $(if $(filter-out test,$(MAKECMDGOALS)),-run "$(filter-out test,$(MAKECMDGOALS))")
test-with-params:
@echo "Running tests..."
CGO_ENABLED=1 CGO_CFLAGS="$(CGO_CFLAGS)" CGO_LDFLAGS="$(CGO_LDFLAGS)" GOTESTFLAGS="-timeout=2m" gotestsum --packages="./..." -f testname -- $(ARGS)
%:
@:
clean: clean:
@echo "Cleaning up..." @echo "Cleaning up..."

View File

@ -78,27 +78,68 @@ Follow these steps to install and set up the module:
1. Make sure your system has the [prerequisites](https://github.com/codex-storage/nim-codex) to run a local Codex node. 1. Make sure your system has the [prerequisites](https://github.com/codex-storage/nim-codex) to run a local Codex node.
2. Fetch the dependencies: 2. Fetch the dependencies:
``` ```sh
make update make update
``` ```
3. Build the library: 3. Build the library:
``` ```sh
make libcodex make libcodex
``` ```
You can pass flags to the Codex building step by using `CODEX_LIB_PARAMS`. For example, You can pass flags to the Codex building step by using `CODEX_LIB_PARAMS`. For example,
if you want to enable debug API for peers, you can build the library using: if you want to enable debug API for peers, you can build the library using:
``` ```sh
CODEX_LIB_PARAMS="-d:codex_enable_api_debug_peers=true" make libcodex CODEX_LIB_PARAMS="-d:codex_enable_api_debug_peers=true" make libcodex
``` ```
or you can use a convenience `libcodex-with-debug-api` make target:
```sh
make libcodex-with-debug-api
```
To run the test, you have to make sure you have `gotestsum` installed on your system, e.g.:
```sh
go install gotest.tools/gotestsum@latest
```
Then you can run the tests as follows.
To run all the tests:
```sh
make test
```
To run selected test only:
```sh
make test "TestDownloadManifest$"
```
> We use `$` to make sure we run only the `TestDownloadManifest` test.
> Without `$` we would run all the tests starting with `TestDownloadManifest` and
> so also `TestDownloadManifestWithNotExistingCid`
>
If you need to pass more arguments to the underlying `go test` (`gotestsum` passes
everything after `--` to `go test`), you can use: `test-with-params` make target, e.g.:
```sh
make test-with-params ARGS='-run "TestDownloadManifest$$" -count=2'
```
> Here, we use double escape `$$` instead of just `$`, otherwise make
> will interpret `$` as a make variable inside `ARGS`.
Now the module is ready for use in your project. Now the module is ready for use in your project.
The release process is defined [here](./RELEASE.md). The release process is defined [here](./RELEASE.md).
## Usage ## API
### Init ### Init
@ -171,11 +212,10 @@ buf := bytes.NewBuffer([]byte("Hello World!"))
onProgress := func(read, total int, percent float64, err error) { onProgress := func(read, total int, percent float64, err error) {
// Do something with the data // Do something with the data
} }
cid, err := codex.UploadReader(UploadOptions{filepath: "hello.txt", onProgress: onProgress}, buf) ctx := context.Background()
cid, err := codex.UploadReader(ctx, UploadOptions{filepath: "hello.txt", onProgress: onProgress}, buf)
``` ```
Caveat: once started, the upload cannot be cancelled.
#### file #### file
The `file` strategy allows you to upload a file on Codex using the path. The `file` strategy allows you to upload a file on Codex using the path.
@ -189,11 +229,10 @@ The `UploadFile` returns the cid of the content uploaded.
onProgress := func(read, total int, percent float64, err error) { onProgress := func(read, total int, percent float64, err error) {
// Do something with the data // Do something with the data
} }
cid, err := codex.UploadFile(UploadOptions{filepath: "./testdata/hello.txt", onProgress: onProgress}) ctx := context.Background()
cid, err := codex.UploadFile(ctx, UploadOptions{filepath: "./testdata/hello.txt", onProgress: onProgress})
``` ```
Caveat: once started, the upload cannot be cancelled.
#### chunks #### chunks
The `chunks` strategy allows you to manage the upload by yourself. It requires more code The `chunks` strategy allows you to manage the upload by yourself. It requires more code
@ -246,11 +285,10 @@ opt := DownloadStreamOptions{
// Handle progress // Handle progress
}, },
} }
err := codex.DownloadStream(cid, opt) ctx := context.Background()
err := codex.DownloadStream(ctx, cid, opt)
``` ```
Caveat: once started, the download cannot be cancelled.
#### chunks #### chunks
The `chunks` strategy allows to manage the download by yourself. It requires more code The `chunks` strategy allows to manage the download by yourself. It requires more code
@ -310,4 +348,10 @@ peerId := "..."
record, err := node.CodexPeerDebug(peerId) record, err := node.CodexPeerDebug(peerId)
``` ```
`CodexPeerDebug` is only available if you built with `-d:codex_enable_api_debug_peers=true` flag. `CodexPeerDebug` is only available if you built with `-d:codex_enable_api_debug_peers=true` flag.
### Context and cancellation
Go contexts are exposed only on the long-running operations as `UploadReader`, `UploadFile`, and `DownloadFile`. If the
context is cancelled, those methods cancel the active upload or download. Short lived API calls dont take a context
because they usually finish before a cancellation signal could matter.

View File

@ -27,3 +27,70 @@ Once published, the artifacts can be downloaded using the `version`, example:
It is not recommended to use the `latest` URL because you may face cache issues. It is not recommended to use the `latest` URL because you may face cache issues.
## Integration
Once released, you can integrate it into your Go project using:
```bash
go get github.com/codex-storage/codex-go-bindings@v0.0.26
```
Then you can use the following `Makefile` command to fetch the artifact:
```bash
LIBS_DIR := $(abspath ./libs)
CODEX_OS := linux
CODEX_ARCH := amd64
CODEX_VERSION := $(shell go list -m -f '{{.Version}}' github.com/codex-storage/codex-go-bindings 2>/dev/null)
CODEX_DOWNLOAD_URL := "https://github.com/codex-storage/codex-go-bindings/releases/download/$(CODEX_VERSION)/codex-${CODEX_OS}-${CODEX_ARCH}.zip"
fetch-libcodex:
mkdir -p $(LIBS_DIR); \
curl -fSL --create-dirs -o $(LIBS_DIR)/codex-${CODEX_OS}-${CODEX_ARCH}.zip ${CODEX_DOWNLOAD_URL}; \
unzip -o -qq $(LIBS_DIR)/codex-${CODEX_OS}-${CODEX_ARCH}.zip -d $(LIBS_DIR); \
rm -f $(LIBS_DIR)/codex*.zip;
```
`CODEX_VERSION` uses the same version as the Codex Go dependency declared in your project.
### Nix
If you use Nix in a sandboxed environment, you cannot use curl to download the artifacts, so you have to prefetch them using the artifacts `SHA-256` hash. To generate the hash, you can use the following command:
```bash
nix store prefetch-file --json --unpack https://github.com/codex-storage/codex-go-bindings/releases/download/v0.0.26/codex-macos-arm64.zip | jq -r .hash
# [10.4 MiB DL] sha256-3CHIWoSjo0plsYqzXQWm1EtY1STcljV4yfXTPon90uE=
```
Then include this hash in your Nix configuration. For example:
```nix
let
optionalString = pkgs.lib.optionalString;
codexVersion = "v0.0.26";
arch =
if stdenv.hostPlatform.isx86_64 then "amd64"
else if stdenv.hostPlatform.isAarch64 then "arm64"
else stdenv.hostPlatform.arch;
os = if stdenv.isDarwin then "macos" else "Linux";
hash =
if stdenv.hostPlatform.isDarwin
# nix store prefetch-file --json --unpack https://github.com/codex-storage/codex-go-bindings/releases/download/v0.0.26/codex-macos-arm64.zip | jq -r .hash
then "sha256-3CHIWoSjo0plsYqzXQWm1EtY1STcljV4yfXTPon90uE="
# nix store prefetch-file --json --unpack https://github.com/codex-storage/codex-go-bindings/releases/download/v0.0.26/codex-Linux-amd64.zip | jq -r .hash
else "sha256-YxW2vFZlcLrOx1PYgWW4MIstH/oFBRF0ooS0sl3v6ig=";
# Pre-fetch libcodex to avoid network during build
codexLib = pkgs.fetchzip {
url = "https://github.com/codex-storage/codex-go-bindings/releases/download/${codexVersion}/codex-${os}-${arch}.zip";
hash = hash;
stripRoot = false;
};
preBuild = ''
export LIBS_DIR="${codexLib}"
# Build something cool with Codex
'';
```

View File

@ -92,7 +92,7 @@ const (
type Config struct { type Config struct {
// Default: INFO // Default: INFO
LogLevel LogLevel `json:"log-level,omitempty"` LogLevel string `json:"log-level,omitempty"`
// Specifies what kind of logs should be written to stdout // Specifies what kind of logs should be written to stdout
// Default: auto // Default: auto
@ -159,12 +159,12 @@ type Config struct {
// Default block timeout in seconds - 0 disables the ttl // Default block timeout in seconds - 0 disables the ttl
// Default: 30 days // Default: 30 days
BlockTtl int `json:"block-ttl,omitempty"` BlockTtl string `json:"block-ttl,omitempty"`
// Time interval in seconds - determines frequency of block // Time interval in seconds - determines frequency of block
// maintenance cycle: how often blocks are checked for expiration and cleanup // maintenance cycle: how often blocks are checked for expiration and cleanup
// Default: 10 minutes // Default: 10 minutes
BlockMaintenanceInterval int `json:"block-mi,omitempty"` BlockMaintenanceInterval string `json:"block-mi,omitempty"`
// Number of blocks to check every maintenance cycle // Number of blocks to check every maintenance cycle
// Default: 1000 // Default: 1000
@ -280,8 +280,12 @@ func (node CodexNode) Destroy() error {
return bridge.callError("cGoCodexDestroy") return bridge.callError("cGoCodexDestroy")
} }
_, err = bridge.wait() // We don't wait for the bridge here.
return err // The destroy function does not call the worker thread,
// it destroys the context directly and return the return
// value synchronously.
return nil
} }
// Version returns the version of the Codex node. // Version returns the version of the Codex node.

View File

@ -1,9 +1,15 @@
package codex package codex
import "testing" import (
"testing"
)
func TestCodexVersion(t *testing.T) { func TestCodexVersion(t *testing.T) {
node := newCodexNode(t, withNoStart()) config := defaultConfigHelper(t)
node, err := New(config)
if err != nil {
t.Fatalf("Failed to create Codex node: %v", err)
}
version, err := node.Version() version, err := node.Version()
if err != nil { if err != nil {
@ -17,7 +23,11 @@ func TestCodexVersion(t *testing.T) {
} }
func TestCodexRevision(t *testing.T) { func TestCodexRevision(t *testing.T) {
node := newCodexNode(t, withNoStart()) config := defaultConfigHelper(t)
node, err := New(config)
if err != nil {
t.Fatalf("Failed to create Codex node: %v", err)
}
revision, err := node.Revision() revision, err := node.Revision()
if err != nil { if err != nil {
@ -71,3 +81,74 @@ func TestPeerId(t *testing.T) {
t.Logf("Codex PeerId: %s", peerId) t.Logf("Codex PeerId: %s", peerId)
} }
func TestStorageQuota(t *testing.T) {
node := newCodexNode(t, Config{
StorageQuota: 1024 * 1024 * 1024, // 1GB
})
if node == nil {
t.Fatal("expected codex node to be created")
}
}
func TestCreateAndDestroyMultipleInstancesWithSameDatadir(t *testing.T) {
datadir := t.TempDir()
config := Config{
DataDir: datadir,
LogFormat: LogFormatNoColors,
MetricsEnabled: false,
BlockRetries: 5,
Nat: "none",
}
for range 2 {
node, err := New(config)
if err != nil {
t.Fatalf("Failed to create Codex node: %v", err)
}
if err := node.Start(); err != nil {
t.Fatalf("Failed to start Codex node: %v", err)
}
if err := node.Stop(); err != nil {
t.Fatalf("Failed to stop Codex node: %v", err)
}
if err := node.Destroy(); err != nil {
t.Fatalf("Failed to stop Codex node after restart: %v", err)
}
}
}
func TestNumThreads(t *testing.T) {
node := newCodexNode(t, Config{
NumThreads: 1,
})
if node == nil {
t.Fatal("expected codex node to be created")
}
}
func TestBlockTtl(t *testing.T) {
node := newCodexNode(t, Config{
BlockTtl: "10H",
})
if node == nil {
t.Fatal("expected codex node to be created")
}
}
func TestBlockMaintenanceInterval(t *testing.T) {
node := newCodexNode(t, Config{
BlockMaintenanceInterval: "10H",
})
if node == nil {
t.Fatal("expected codex node to be created")
}
}

View File

@ -32,38 +32,19 @@ func TestUpdateLogLevel(t *testing.T) {
} }
defer os.Remove(tmpFile.Name()) defer os.Remove(tmpFile.Name())
node, err := New(Config{ node := newCodexNode(t, Config{
LogFile: tmpFile.Name(), LogLevel: "INFO",
MetricsEnabled: false, LogFile: tmpFile.Name(),
LogFormat: LogFormatNoColors,
}) })
if err != nil {
t.Fatalf("Failed to create Codex node: %v", err)
}
t.Cleanup(func() {
if err := node.Stop(); err != nil {
t.Logf("cleanup codex: %v", err)
}
if err := node.Destroy(); err != nil {
t.Logf("cleanup codex: %v", err)
}
})
if err := node.Start(); err != nil {
t.Fatalf("Failed to start Codex node: %v", err)
}
content, err := os.ReadFile(tmpFile.Name()) content, err := os.ReadFile(tmpFile.Name())
if err != nil { if err != nil {
t.Fatalf("Failed to read log file: %v", err) t.Fatalf("Failed to read log file: %v", err)
} }
if !strings.Contains(string(content), "Started codex node") { if !strings.Contains(string(content), "INF") {
t.Errorf("Log file does not contain 'Started codex node' %s", string(content)) t.Errorf("Log file does not contain INFO statement %s", string(content))
}
if err := node.Stop(); err != nil {
t.Fatalf("Failed to stop Codex node: %v", err)
} }
err = node.UpdateLogLevel("ERROR") err = node.UpdateLogLevel("ERROR")
@ -71,6 +52,11 @@ func TestUpdateLogLevel(t *testing.T) {
t.Fatalf("UpdateLogLevel call failed: %v", err) t.Fatalf("UpdateLogLevel call failed: %v", err)
} }
if err := node.Stop(); err != nil {
t.Fatalf("Failed to stop Codex node: %v", err)
}
// Clear the file
if err := os.WriteFile(tmpFile.Name(), []byte{}, 0644); err != nil { if err := os.WriteFile(tmpFile.Name(), []byte{}, 0644); err != nil {
t.Fatalf("Failed to clear log file: %v", err) t.Fatalf("Failed to clear log file: %v", err)
} }
@ -85,8 +71,8 @@ func TestUpdateLogLevel(t *testing.T) {
t.Fatalf("Failed to read log file: %v", err) t.Fatalf("Failed to read log file: %v", err)
} }
if strings.Contains(string(content), "Starting discovery node") { if strings.Contains(string(content), "INF") {
t.Errorf("Log file contains 'Starting discovery node'") t.Errorf("Log file contains INFO statement after log level update: %s", string(content))
} }
} }
@ -94,50 +80,10 @@ func TestCodexPeerDebug(t *testing.T) {
var bootstrap, node1, node2 *CodexNode var bootstrap, node1, node2 *CodexNode
var err error var err error
t.Cleanup(func() { bootstrap = newCodexNode(t, Config{
if bootstrap != nil { DiscoveryPort: 8092,
if err := bootstrap.Stop(); err != nil {
t.Logf("cleanup bootstrap: %v", err)
}
if err := bootstrap.Destroy(); err != nil {
t.Logf("cleanup bootstrap: %v", err)
}
}
if node1 != nil {
if err := node1.Stop(); err != nil {
t.Logf("cleanup node1: %v", err)
}
if err := node1.Destroy(); err != nil {
t.Logf("cleanup node1: %v", err)
}
}
if node2 != nil {
if err := node2.Stop(); err != nil {
t.Logf("cleanup node2: %v", err)
}
if err := node2.Destroy(); err != nil {
t.Logf("cleanup node2: %v", err)
}
}
}) })
bootstrap, err = New(Config{
DataDir: t.TempDir(),
LogFormat: LogFormatNoColors,
MetricsEnabled: false,
DiscoveryPort: 8092,
})
if err != nil {
t.Fatalf("Failed to create bootstrap: %v", err)
}
if err := bootstrap.Start(); err != nil {
t.Fatalf("Failed to start bootstrap: %v", err)
}
spr, err := bootstrap.Spr() spr, err := bootstrap.Spr()
if err != nil { if err != nil {
t.Fatalf("Failed to get bootstrap spr: %v", err) t.Fatalf("Failed to get bootstrap spr: %v", err)
@ -145,35 +91,15 @@ func TestCodexPeerDebug(t *testing.T) {
bootstrapNodes := []string{spr} bootstrapNodes := []string{spr}
node1, err = New(Config{ node1 = newCodexNode(t, Config{
DataDir: t.TempDir(),
LogFormat: LogFormatNoColors,
MetricsEnabled: false,
DiscoveryPort: 8090, DiscoveryPort: 8090,
BootstrapNodes: bootstrapNodes, BootstrapNodes: bootstrapNodes,
}) })
if err != nil {
t.Fatalf("Failed to create codex: %v", err)
}
if err := node1.Start(); err != nil { node2 = newCodexNode(t, Config{
t.Fatalf("Failed to start codex: %v", err)
}
node2, err = New(Config{
DataDir: t.TempDir(),
LogFormat: LogFormatNoColors,
MetricsEnabled: false,
DiscoveryPort: 8091, DiscoveryPort: 8091,
BootstrapNodes: bootstrapNodes, BootstrapNodes: bootstrapNodes,
}) })
if err != nil {
t.Fatalf("Failed to create codex2: %v", err)
}
if err := node2.Start(); err != nil {
t.Fatalf("Failed to start codex2: %v", err)
}
peerId, err := node2.PeerId() peerId, err := node2.PeerId()
if err != nil { if err != nil {
@ -186,9 +112,14 @@ func TestCodexPeerDebug(t *testing.T) {
if err == nil { if err == nil {
break break
} }
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
} }
if err != nil {
t.Fatalf("CodexPeerDebug call failed: %v", err)
}
if record.PeerId == "" { if record.PeerId == "" {
t.Fatalf("CodexPeerDebug call failed: %v", err) t.Fatalf("CodexPeerDebug call failed: %v", err)
} }

View File

@ -26,8 +26,11 @@ package codex
*/ */
import "C" import "C"
import ( import (
"context"
"encoding/json" "encoding/json"
"fmt"
"io" "io"
"sync/atomic"
"unsafe" "unsafe"
) )
@ -145,7 +148,7 @@ func (node CodexNode) DownloadManifest(cid string) (Manifest, error) {
// If options.writer is set, the data will be written into that writer. // If options.writer is set, the data will be written into that writer.
// The options filepath and writer are not mutually exclusive, i.e you can write // The options filepath and writer are not mutually exclusive, i.e you can write
// in different places in a same call. // in different places in a same call.
func (node CodexNode) DownloadStream(cid string, options DownloadStreamOptions) error { func (node CodexNode) DownloadStream(ctx context.Context, cid string, options DownloadStreamOptions) error {
bridge := newBridgeCtx() bridge := newBridgeCtx()
defer bridge.free() defer bridge.free()
@ -189,6 +192,16 @@ func (node CodexNode) DownloadStream(cid string, options DownloadStreamOptions)
var cCid = C.CString(cid) var cCid = C.CString(cid)
defer C.free(unsafe.Pointer(cCid)) defer C.free(unsafe.Pointer(cCid))
err := node.DownloadInit(cid, DownloadInitOptions{
ChunkSize: options.ChunkSize,
Local: options.Local,
})
if err != nil {
return err
}
defer node.DownloadCancel(cid)
var cFilepath = C.CString(options.Filepath) var cFilepath = C.CString(options.Filepath)
defer C.free(unsafe.Pointer(cFilepath)) defer C.free(unsafe.Pointer(cFilepath))
@ -198,8 +211,45 @@ func (node CodexNode) DownloadStream(cid string, options DownloadStreamOptions)
return bridge.callError("cGoCodexDownloadLocal") return bridge.callError("cGoCodexDownloadLocal")
} }
_, err := bridge.wait() // Create a done channel to signal the goroutine to stop
return err // when the download is complete and avoid goroutine leaks.
done := make(chan struct{})
defer close(done)
channelError := make(chan error, 1)
var cancelled atomic.Bool
go func() {
select {
case <-ctx.Done():
channelError <- node.DownloadCancel(cid)
cancelled.Store(true)
case <-done:
// Nothing to do, download finished
}
}()
_, err = bridge.wait()
// Extract the potential cancellation error
var cancelError error
select {
case cancelError = <-channelError:
default:
}
if err != nil {
if cancelError != nil {
return fmt.Errorf("context canceled: %v, but failed to cancel download session: %v", ctx.Err(), cancelError)
}
if cancelled.Load() {
return context.Canceled
}
return err
}
return cancelError
} }
// DownloadInit initializes the download process for a specific CID. // DownloadInit initializes the download process for a specific CID.

View File

@ -1,6 +1,7 @@
package codex package codex
import ( import (
"context"
"os" "os"
"strings" "strings"
"testing" "testing"
@ -32,7 +33,7 @@ func TestDownloadStream(t *testing.T) {
}, },
} }
if err := codex.DownloadStream(cid, opt); err != nil { if err := codex.DownloadStream(context.Background(), cid, opt); err != nil {
t.Fatal("Error happened:", err.Error()) t.Fatal("Error happened:", err.Error())
} }
@ -72,7 +73,7 @@ func TestDownloadStreamWithAutosize(t *testing.T) {
}, },
} }
if err := codex.DownloadStream(cid, opt); err != nil { if err := codex.DownloadStream(context.Background(), cid, opt); err != nil {
t.Fatal("Error happened:", err.Error()) t.Fatal("Error happened:", err.Error())
} }
@ -86,14 +87,38 @@ func TestDownloadStreamWithAutosize(t *testing.T) {
} }
func TestDownloadStreamWithNotExisting(t *testing.T) { func TestDownloadStreamWithNotExisting(t *testing.T) {
codex := newCodexNode(t, withBlockRetries(1)) codex := newCodexNode(t, Config{BlockRetries: 1})
opt := DownloadStreamOptions{} opt := DownloadStreamOptions{}
if err := codex.DownloadStream("bafybeihdwdcefgh4dqkjv67uzcmw7ojee6xedzdetojuzjevtenxquvyku", opt); err == nil { if err := codex.DownloadStream(context.Background(), "bafybeihdwdcefgh4dqkjv67uzcmw7ojee6xedzdetojuzjevtenxquvyku", opt); err == nil {
t.Fatal("Error expected when downloading non-existing cid") t.Fatal("Error expected when downloading non-existing cid")
} }
} }
func TestDownloadStreamCancelled(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
codex := newCodexNode(t)
cid, _ := uploadBigFileHelper(t, codex)
channelError := make(chan error, 1)
go func() {
err := codex.DownloadStream(ctx, cid, DownloadStreamOptions{Local: true})
channelError <- err
}()
cancel()
err := <-channelError
if err == nil {
t.Fatal("DownloadStream should have been canceled")
}
if err.Error() != context.Canceled.Error() {
t.Fatalf("DownloadStream returned unexpected error: %v", err)
}
}
func TestDownloadManual(t *testing.T) { func TestDownloadManual(t *testing.T) {
codex := newCodexNode(t) codex := newCodexNode(t)
cid, _ := uploadHelper(t, codex) cid, _ := uploadHelper(t, codex)
@ -134,7 +159,7 @@ func TestDownloadManifest(t *testing.T) {
} }
func TestDownloadManifestWithNotExistingCid(t *testing.T) { func TestDownloadManifestWithNotExistingCid(t *testing.T) {
codex := newCodexNode(t, withBlockRetries(1)) codex := newCodexNode(t, Config{BlockRetries: 1})
manifest, err := codex.DownloadManifest("bafybeihdwdcefgh4dqkjv67uzcmw7ojee6xedzdetojuzjevtenxquvyku") manifest, err := codex.DownloadManifest("bafybeihdwdcefgh4dqkjv67uzcmw7ojee6xedzdetojuzjevtenxquvyku")
if err == nil { if err == nil {
@ -147,7 +172,7 @@ func TestDownloadManifestWithNotExistingCid(t *testing.T) {
} }
func TestDownloadInitWithNotExistingCid(t *testing.T) { func TestDownloadInitWithNotExistingCid(t *testing.T) {
codex := newCodexNode(t, withBlockRetries(1)) codex := newCodexNode(t, Config{BlockRetries: 1})
if err := codex.DownloadInit("bafybeihdwdcefgh4dqkjv67uzcmw7ojee6xedzdetojuzjevtenxquvyku", DownloadInitOptions{}); err == nil { if err := codex.DownloadInit("bafybeihdwdcefgh4dqkjv67uzcmw7ojee6xedzdetojuzjevtenxquvyku", DownloadInitOptions{}); err == nil {
t.Fatal("expected error when initializing download for non-existent cid") t.Fatal("expected error when initializing download for non-existent cid")

View File

@ -36,6 +36,7 @@ func TestConnectWithAddress(t *testing.T) {
LogFormat: LogFormatNoColors, LogFormat: LogFormatNoColors,
MetricsEnabled: false, MetricsEnabled: false,
DiscoveryPort: 8090, DiscoveryPort: 8090,
Nat: "none",
}) })
if err != nil { if err != nil {
t.Fatalf("Failed to create codex1: %v", err) t.Fatalf("Failed to create codex1: %v", err)
@ -73,50 +74,10 @@ func TestCodexWithPeerId(t *testing.T) {
var bootstrap, node1, node2 *CodexNode var bootstrap, node1, node2 *CodexNode
var err error var err error
t.Cleanup(func() { bootstrap = newCodexNode(t, Config{
if bootstrap != nil { DiscoveryPort: 8092,
if err := bootstrap.Stop(); err != nil {
t.Logf("cleanup bootstrap: %v", err)
}
if err := bootstrap.Destroy(); err != nil {
t.Logf("cleanup bootstrap: %v", err)
}
}
if node1 != nil {
if err := node1.Stop(); err != nil {
t.Logf("cleanup node1: %v", err)
}
if err := node1.Destroy(); err != nil {
t.Logf("cleanup node1: %v", err)
}
}
if node2 != nil {
if err := node2.Stop(); err != nil {
t.Logf("cleanup node2: %v", err)
}
if err := node2.Destroy(); err != nil {
t.Logf("cleanup node2: %v", err)
}
}
}) })
bootstrap, err = New(Config{
DataDir: t.TempDir(),
LogFormat: LogFormatNoColors,
MetricsEnabled: false,
DiscoveryPort: 8092,
})
if err != nil {
t.Fatalf("Failed to create bootstrap: %v", err)
}
if err := bootstrap.Start(); err != nil {
t.Fatalf("Failed to start bootstrap: %v", err)
}
spr, err := bootstrap.Spr() spr, err := bootstrap.Spr()
if err != nil { if err != nil {
t.Fatalf("Failed to get bootstrap spr: %v", err) t.Fatalf("Failed to get bootstrap spr: %v", err)
@ -124,35 +85,15 @@ func TestCodexWithPeerId(t *testing.T) {
bootstrapNodes := []string{spr} bootstrapNodes := []string{spr}
node1, err = New(Config{ node1 = newCodexNode(t, Config{
DataDir: t.TempDir(),
LogFormat: LogFormatNoColors,
MetricsEnabled: false,
DiscoveryPort: 8090, DiscoveryPort: 8090,
BootstrapNodes: bootstrapNodes, BootstrapNodes: bootstrapNodes,
}) })
if err != nil {
t.Fatalf("Failed to create codex: %v", err)
}
if err := node1.Start(); err != nil { node2 = newCodexNode(t, Config{
t.Fatalf("Failed to start codex: %v", err)
}
node2, err = New(Config{
DataDir: t.TempDir(),
LogFormat: LogFormatNoColors,
MetricsEnabled: false,
DiscoveryPort: 8091, DiscoveryPort: 8091,
BootstrapNodes: bootstrapNodes, BootstrapNodes: bootstrapNodes,
}) })
if err != nil {
t.Fatalf("Failed to create codex2: %v", err)
}
if err := node2.Start(); err != nil {
t.Fatalf("Failed to start codex2: %v", err)
}
peerId, err := node2.PeerId() peerId, err := node2.PeerId()
if err != nil { if err != nil {

View File

@ -24,6 +24,10 @@ import (
static int cGoCodexStorageDelete(void* codexCtx, char* cid, void* resp) { static int cGoCodexStorageDelete(void* codexCtx, char* cid, void* resp) {
return codex_storage_delete(codexCtx, cid, (CodexCallback) callback, resp); return codex_storage_delete(codexCtx, cid, (CodexCallback) callback, resp);
} }
static int cGoCodexStorageExists(void* codexCtx, char* cid, void* resp) {
return codex_storage_exists(codexCtx, cid, (CodexCallback) callback, resp);
}
*/ */
import "C" import "C"
@ -142,3 +146,19 @@ func (node CodexNode) Delete(cid string) error {
_, err := bridge.wait() _, err := bridge.wait()
return err return err
} }
// Exists checks if a given cid exists in the local storage.
func (node CodexNode) Exists(cid string) (bool, error) {
bridge := newBridgeCtx()
defer bridge.free()
var cCid = C.CString(cid)
defer C.free(unsafe.Pointer(cCid))
if C.cGoCodexStorageExists(node.ctx, cCid, bridge.resp) != C.RET_OK {
return false, bridge.callError("cGoCodexStorageExists")
}
result, err := bridge.wait()
return result == "true", err
}

View File

@ -84,7 +84,7 @@ func TestFetch(t *testing.T) {
} }
func TestFetchCidDoesNotExist(t *testing.T) { func TestFetchCidDoesNotExist(t *testing.T) {
codex := newCodexNode(t, withBlockRetries(1)) codex := newCodexNode(t, Config{BlockRetries: 1})
_, err := codex.Fetch("bafybeihdwdcefgh4dqkjv67uzcmw7ojee6xedzdetojuzjevtenxquvyku") _, err := codex.Fetch("bafybeihdwdcefgh4dqkjv67uzcmw7ojee6xedzdetojuzjevtenxquvyku")
if err == nil { if err == nil {
@ -119,3 +119,30 @@ func TestDelete(t *testing.T) {
t.Fatal("expected manifests to be empty after deletion") t.Fatal("expected manifests to be empty after deletion")
} }
} }
func TestExists(t *testing.T) {
codex := newCodexNode(t)
cid, _ := uploadHelper(t, codex)
exists, err := codex.Exists(cid)
if err != nil {
t.Fatal(err)
}
if !exists {
t.Fatal("expected cid to exist")
}
err = codex.Delete(cid)
if err != nil {
t.Fatal(err)
}
exists, err = codex.Exists(cid)
if err != nil {
t.Fatal(err)
}
if exists {
t.Fatal("expected cid to not exist after deletion")
}
}

View File

@ -2,54 +2,78 @@ package codex
import ( import (
"bytes" "bytes"
"context"
"testing" "testing"
) )
type codexNodeTestOption func(*codexNodeTestOptions) func defaultConfigHelper(t *testing.T) Config {
t.Helper()
type codexNodeTestOptions struct { return Config{
noStart bool
blockRetries int
}
func withNoStart() codexNodeTestOption {
return func(o *codexNodeTestOptions) { o.noStart = true }
}
func withBlockRetries(n int) codexNodeTestOption {
return func(o *codexNodeTestOptions) { o.blockRetries = n }
}
func newCodexNode(t *testing.T, opts ...codexNodeTestOption) *CodexNode {
o := codexNodeTestOptions{
blockRetries: 3000,
}
for _, opt := range opts {
opt(&o)
}
node, err := New(Config{
DataDir: t.TempDir(), DataDir: t.TempDir(),
LogFormat: LogFormatNoColors, LogFormat: LogFormatNoColors,
MetricsEnabled: false, MetricsEnabled: false,
BlockRetries: o.blockRetries, BlockRetries: 3000,
}) Nat: "none",
}
}
func newCodexNode(t *testing.T, opts ...Config) *CodexNode {
config := defaultConfigHelper(t)
if len(opts) > 0 {
c := opts[0]
if c.BlockRetries > 0 {
config.BlockRetries = c.BlockRetries
}
if c.LogLevel != "" {
config.LogLevel = c.LogLevel
}
if c.LogFile != "" {
config.LogFile = c.LogFile
}
if len(c.BootstrapNodes) != 0 {
config.BootstrapNodes = c.BootstrapNodes
}
if c.DiscoveryPort != 0 {
config.DiscoveryPort = c.DiscoveryPort
}
if c.StorageQuota != 0 {
config.StorageQuota = c.StorageQuota
}
if c.NumThreads != 0 {
config.NumThreads = c.NumThreads
}
if c.BlockTtl != "" {
config.BlockTtl = c.BlockTtl
}
if c.BlockMaintenanceInterval != "" {
config.BlockMaintenanceInterval = c.BlockMaintenanceInterval
}
}
node, err := New(config)
if err != nil { if err != nil {
t.Fatalf("Failed to create Codex node: %v", err) t.Fatalf("Failed to create Codex node: %v", err)
} }
if !o.noStart { err = node.Start()
err = node.Start() if err != nil {
if err != nil { t.Fatalf("Failed to start Codex node: %v", err)
t.Fatalf("Failed to start Codex node: %v", err)
}
} }
t.Cleanup(func() { t.Cleanup(func() {
if !o.noStart { if err := node.Stop(); err != nil {
if err := node.Stop(); err != nil { t.Logf("cleanup codex: %v", err)
t.Logf("cleanup codex: %v", err)
}
} }
if err := node.Destroy(); err != nil { if err := node.Destroy(); err != nil {
@ -65,7 +89,21 @@ func uploadHelper(t *testing.T, codex *CodexNode) (string, int) {
buf := bytes.NewBuffer([]byte("Hello World!")) buf := bytes.NewBuffer([]byte("Hello World!"))
len := buf.Len() len := buf.Len()
cid, err := codex.UploadReader(UploadOptions{Filepath: "hello.txt"}, buf) cid, err := codex.UploadReader(context.Background(), UploadOptions{Filepath: "hello.txt"}, buf)
if err != nil {
t.Fatalf("Error happened during upload: %v\n", err)
}
return cid, len
}
func uploadBigFileHelper(t *testing.T, codex *CodexNode) (string, int) {
t.Helper()
len := 1024 * 1024 * 50
buf := bytes.NewBuffer(make([]byte, len))
cid, err := codex.UploadReader(context.Background(), UploadOptions{Filepath: "hello.txt"}, buf)
if err != nil { if err != nil {
t.Fatalf("Error happened during upload: %v\n", err) t.Fatalf("Error happened during upload: %v\n", err)
} }

View File

@ -27,9 +27,11 @@ package codex
import "C" import "C"
import ( import (
"bytes" "bytes"
"context"
"fmt" "fmt"
"io" "io"
"os" "os"
"sync/atomic"
"unsafe" "unsafe"
) )
@ -164,11 +166,12 @@ func (node CodexNode) UploadCancel(sessionId string) error {
// - UploadChunk to upload a chunk to codex. // - UploadChunk to upload a chunk to codex.
// - UploadFinalize to finalize the upload session. // - UploadFinalize to finalize the upload session.
// - UploadCancel if an error occurs. // - UploadCancel if an error occurs.
func (node CodexNode) UploadReader(options UploadOptions, r io.Reader) (string, error) { func (node CodexNode) UploadReader(ctx context.Context, options UploadOptions, r io.Reader) (string, error) {
sessionId, err := node.UploadInit(&options) sessionId, err := node.UploadInit(&options)
if err != nil { if err != nil {
return "", err return "", err
} }
defer node.UploadCancel(sessionId)
buf := make([]byte, options.ChunkSize.valOrDefault()) buf := make([]byte, options.ChunkSize.valOrDefault())
total := 0 total := 0
@ -179,6 +182,16 @@ func (node CodexNode) UploadReader(options UploadOptions, r io.Reader) (string,
} }
for { for {
select {
case <-ctx.Done():
if cancelErr := node.UploadCancel(sessionId); cancelErr != nil {
return "", fmt.Errorf("upload canceled: %v, but failed to cancel upload session: %v", ctx.Err(), cancelErr)
}
return "", context.Canceled
default:
// continue
}
n, err := r.Read(buf) n, err := r.Read(buf)
if err == io.EOF { if err == io.EOF {
break break
@ -222,9 +235,9 @@ func (node CodexNode) UploadReader(options UploadOptions, r io.Reader) (string,
} }
// UploadReaderAsync is the asynchronous version of UploadReader using a goroutine. // UploadReaderAsync is the asynchronous version of UploadReader using a goroutine.
func (node CodexNode) UploadReaderAsync(options UploadOptions, r io.Reader, onDone func(cid string, err error)) { func (node CodexNode) UploadReaderAsync(ctx context.Context, options UploadOptions, r io.Reader, onDone func(cid string, err error)) {
go func() { go func() {
cid, err := node.UploadReader(options, r) cid, err := node.UploadReader(ctx, options, r)
onDone(cid, err) onDone(cid, err)
}() }()
} }
@ -249,7 +262,7 @@ func (node CodexNode) UploadReaderAsync(options UploadOptions, r io.Reader, onDo
// is sent to the stream. // is sent to the stream.
// //
// Internally, it calls UploadInit to create the upload session. // Internally, it calls UploadInit to create the upload session.
func (node CodexNode) UploadFile(options UploadOptions) (string, error) { func (node CodexNode) UploadFile(ctx context.Context, options UploadOptions) (string, error) {
bridge := newBridgeCtx() bridge := newBridgeCtx()
defer bridge.free() defer bridge.free()
@ -285,6 +298,7 @@ func (node CodexNode) UploadFile(options UploadOptions) (string, error) {
if err != nil { if err != nil {
return "", err return "", err
} }
defer node.UploadCancel(sessionId)
var cSessionId = C.CString(sessionId) var cSessionId = C.CString(sessionId)
defer C.free(unsafe.Pointer(cSessionId)) defer C.free(unsafe.Pointer(cSessionId))
@ -293,13 +307,51 @@ func (node CodexNode) UploadFile(options UploadOptions) (string, error) {
return "", bridge.callError("cGoCodexUploadFile") return "", bridge.callError("cGoCodexUploadFile")
} }
return bridge.wait() // Create a done channel to signal the goroutine to stop
// when the download is complete and avoid goroutine leaks.
done := make(chan struct{})
defer close(done)
channelError := make(chan error, 1)
var cancelled atomic.Bool
go func() {
select {
case <-ctx.Done():
channelError <- node.UploadCancel(sessionId)
cancelled.Store(true)
case <-done:
// Nothing to do, upload finished
}
}()
_, err = bridge.wait()
// Extract the potential cancellation error
var cancelErr error
select {
case cancelErr = <-channelError:
default:
}
if err != nil {
if cancelErr != nil {
return "", fmt.Errorf("context canceled: %v, but failed to cancel upload session: %v", ctx.Err(), cancelErr)
}
if cancelled.Load() {
return "", context.Canceled
}
return "", err
}
return bridge.result, cancelErr
} }
// UploadFileAsync is the asynchronous version of UploadFile using a goroutine. // UploadFileAsync is the asynchronous version of UploadFile using a goroutine.
func (node CodexNode) UploadFileAsync(options UploadOptions, onDone func(cid string, err error)) { func (node CodexNode) UploadFileAsync(ctx context.Context, options UploadOptions, onDone func(cid string, err error)) {
go func() { go func() {
cid, err := node.UploadFile(options) cid, err := node.UploadFile(ctx, options)
onDone(cid, err) onDone(cid, err)
}() }()
} }

View File

@ -2,6 +2,7 @@ package codex
import ( import (
"bytes" "bytes"
"context"
"log" "log"
"os" "os"
"testing" "testing"
@ -16,7 +17,7 @@ func TestUploadReader(t *testing.T) {
buf := bytes.NewBuffer([]byte("Hello World!")) buf := bytes.NewBuffer([]byte("Hello World!"))
len := buf.Len() len := buf.Len()
cid, err := codex.UploadReader(UploadOptions{Filepath: "hello.txt", OnProgress: func(read, total int, percent float64, err error) { cid, err := codex.UploadReader(context.Background(), UploadOptions{Filepath: "hello.txt", OnProgress: func(read, total int, percent float64, err error) {
if err != nil { if err != nil {
log.Fatalf("Error happened during upload: %v\n", err) log.Fatalf("Error happened during upload: %v\n", err)
} }
@ -42,6 +43,30 @@ func TestUploadReader(t *testing.T) {
} }
} }
func TestUploadReaderCancel(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
codex := newCodexNode(t)
buf := bytes.NewBuffer(make([]byte, 1024*1024*10))
channelErr := make(chan error, 1)
go func() {
_, e := codex.UploadReader(ctx, UploadOptions{Filepath: "hello.txt"}, buf)
channelErr <- e
}()
cancel()
err := <-channelErr
if err == nil {
t.Fatal("UploadReader should have been canceled")
}
if err.Error() != context.Canceled.Error() {
t.Fatalf("UploadReader returned unexpected error: %v expected %v", err, context.Canceled)
}
}
func TestUploadFile(t *testing.T) { func TestUploadFile(t *testing.T) {
codex := newCodexNode(t) codex := newCodexNode(t)
totalBytes := 0 totalBytes := 0
@ -61,7 +86,7 @@ func TestUploadFile(t *testing.T) {
finalPercent = percent finalPercent = percent
}} }}
cid, err := codex.UploadFile(options) cid, err := codex.UploadFile(context.Background(), options)
if err != nil { if err != nil {
t.Fatalf("UploadReader failed: %v", err) t.Fatalf("UploadReader failed: %v", err)
} }
@ -79,12 +104,47 @@ func TestUploadFile(t *testing.T) {
} }
} }
func TestUploadFileCancel(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
tmpFile, err := os.Create(os.TempDir() + "/large_file.txt")
if err != nil {
t.Fatalf("Failed to create temp file: %v", err)
}
defer os.Remove(tmpFile.Name())
largeContent := make([]byte, 1024*1024*50)
if _, err := tmpFile.Write(largeContent); err != nil {
t.Fatalf("Failed to write to temp file: %v", err)
}
tmpFile.Close()
codex := newCodexNode(t)
channelError := make(chan error, 1)
go func() {
_, err := codex.UploadFile(ctx, UploadOptions{Filepath: tmpFile.Name()})
channelError <- err
}()
cancel()
err = <-channelError
if err == nil {
t.Fatal("UploadFile should have been canceled")
}
if err.Error() != context.Canceled.Error() {
t.Fatalf("UploadFile returned unexpected error: %v", err)
}
}
func TestUploadFileNoProgress(t *testing.T) { func TestUploadFileNoProgress(t *testing.T) {
codex := newCodexNode(t) codex := newCodexNode(t)
options := UploadOptions{Filepath: "./testdata/doesnt_exist.txt"} options := UploadOptions{Filepath: "./testdata/doesnt_exist.txt"}
cid, err := codex.UploadFile(options) cid, err := codex.UploadFile(context.Background(), options)
if err == nil { if err == nil {
t.Fatalf("UploadReader should have failed") t.Fatalf("UploadReader should have failed")
} }

2
vendor/nim-codex vendored

@ -1 +1 @@
Subproject commit 1105b81cc1b202006ca5a16485b3cfc5331468d5 Subproject commit 2b9fc1eb554e5eee43b8a815084fb8c61687ada9