mirror of
https://github.com/logos-storage/logos-storage-go-bindings.git
synced 2026-01-08 00:13:05 +00:00
Add support for context and cancellation
This commit is contained in:
parent
ca4993e4b0
commit
43e6e5e81f
@ -26,7 +26,9 @@ package codex
|
||||
*/
|
||||
import "C"
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"unsafe"
|
||||
)
|
||||
@ -145,7 +147,7 @@ func (node CodexNode) DownloadManifest(cid string) (Manifest, error) {
|
||||
// If options.writer is set, the data will be written into that writer.
|
||||
// The options filepath and writer are not mutually exclusive, i.e you can write
|
||||
// in different places in a same call.
|
||||
func (node CodexNode) DownloadStream(cid string, options DownloadStreamOptions) error {
|
||||
func (node CodexNode) DownloadStream(ctx context.Context, cid string, options DownloadStreamOptions) error {
|
||||
bridge := newBridgeCtx()
|
||||
defer bridge.free()
|
||||
|
||||
@ -189,6 +191,14 @@ func (node CodexNode) DownloadStream(cid string, options DownloadStreamOptions)
|
||||
var cCid = C.CString(cid)
|
||||
defer C.free(unsafe.Pointer(cCid))
|
||||
|
||||
err := node.DownloadInit(cid, DownloadInitOptions{
|
||||
ChunkSize: options.ChunkSize,
|
||||
Local: options.Local,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var cFilepath = C.CString(options.Filepath)
|
||||
defer C.free(unsafe.Pointer(cFilepath))
|
||||
|
||||
@ -198,8 +208,24 @@ func (node CodexNode) DownloadStream(cid string, options DownloadStreamOptions)
|
||||
return bridge.callError("cGoCodexDownloadLocal")
|
||||
}
|
||||
|
||||
_, err := bridge.wait()
|
||||
return err
|
||||
var cancelErr error
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
cancelErr = node.DownloadCancel(cid)
|
||||
default:
|
||||
// continue
|
||||
}
|
||||
|
||||
_, err = bridge.wait()
|
||||
|
||||
if err != nil {
|
||||
if cancelErr != nil {
|
||||
return fmt.Errorf("upload canceled: %v, but failed to cancel upload session: %v", ctx.Err(), cancelErr)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// DownloadInit initializes the download process for a specific CID.
|
||||
|
||||
@ -1,6 +1,7 @@
|
||||
package codex
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"strings"
|
||||
"testing"
|
||||
@ -32,7 +33,7 @@ func TestDownloadStream(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
if err := codex.DownloadStream(cid, opt); err != nil {
|
||||
if err := codex.DownloadStream(context.Background(), cid, opt); err != nil {
|
||||
t.Fatal("Error happened:", err.Error())
|
||||
}
|
||||
|
||||
@ -72,7 +73,7 @@ func TestDownloadStreamWithAutosize(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
if err := codex.DownloadStream(cid, opt); err != nil {
|
||||
if err := codex.DownloadStream(context.Background(), cid, opt); err != nil {
|
||||
t.Fatal("Error happened:", err.Error())
|
||||
}
|
||||
|
||||
@ -89,11 +90,35 @@ func TestDownloadStreamWithNotExisting(t *testing.T) {
|
||||
codex := newCodexNode(t, withBlockRetries(1))
|
||||
|
||||
opt := DownloadStreamOptions{}
|
||||
if err := codex.DownloadStream("bafybeihdwdcefgh4dqkjv67uzcmw7ojee6xedzdetojuzjevtenxquvyku", opt); err == nil {
|
||||
if err := codex.DownloadStream(context.Background(), "bafybeihdwdcefgh4dqkjv67uzcmw7ojee6xedzdetojuzjevtenxquvyku", opt); err == nil {
|
||||
t.Fatal("Error expected when downloading non-existing cid")
|
||||
}
|
||||
}
|
||||
|
||||
func TestDownloadStreamCancelled(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
codex := newCodexNode(t)
|
||||
cid, _ := uploadBigFileHelper(t, codex)
|
||||
|
||||
channelError := make(chan error, 1)
|
||||
go func() {
|
||||
err := codex.DownloadStream(ctx, cid, DownloadStreamOptions{Local: true})
|
||||
channelError <- err
|
||||
}()
|
||||
|
||||
cancel()
|
||||
err := <-channelError
|
||||
|
||||
if err == nil {
|
||||
t.Fatal("UploadFile should have been canceled")
|
||||
}
|
||||
|
||||
if err.Error() != "Failed to stream file: Stream EOF!" {
|
||||
t.Fatalf("UploadFile returned unexpected error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestDownloadManual(t *testing.T) {
|
||||
codex := newCodexNode(t)
|
||||
cid, _ := uploadHelper(t, codex)
|
||||
|
||||
@ -34,6 +34,7 @@ func newCodexNode(t *testing.T, opts ...codexNodeTestOption) *CodexNode {
|
||||
LogFormat: LogFormatNoColors,
|
||||
MetricsEnabled: false,
|
||||
BlockRetries: o.blockRetries,
|
||||
LogLevel: INFO,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create Codex node: %v", err)
|
||||
@ -44,6 +45,11 @@ func newCodexNode(t *testing.T, opts ...codexNodeTestOption) *CodexNode {
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to start Codex node: %v", err)
|
||||
}
|
||||
|
||||
err := node.UpdateLogLevel("INFO")
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to set log level: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
t.Cleanup(func() {
|
||||
@ -73,3 +79,17 @@ func uploadHelper(t *testing.T, codex *CodexNode) (string, int) {
|
||||
|
||||
return cid, len
|
||||
}
|
||||
|
||||
func uploadBigFileHelper(t *testing.T, codex *CodexNode) (string, int) {
|
||||
t.Helper()
|
||||
|
||||
len := 1024 * 1024 * 50
|
||||
buf := bytes.NewBuffer(make([]byte, len))
|
||||
|
||||
cid, err := codex.UploadReader(context.Background(), UploadOptions{Filepath: "hello.txt"}, buf)
|
||||
if err != nil {
|
||||
t.Fatalf("Error happened during upload: %v\n", err)
|
||||
}
|
||||
|
||||
return cid, len
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user