mirror of
https://github.com/logos-storage/logos-storage-go.git
synced 2026-01-05 23:03:09 +00:00
Fixes LocalDownload in CodexClient
This commit is contained in:
parent
d8b762ea06
commit
c47f9c4436
@ -73,8 +73,8 @@ func (c *CodexClient) Download(cid string, output io.Writer) error {
|
||||
return c.DownloadWithContext(context.Background(), cid, output)
|
||||
}
|
||||
|
||||
func (c *CodexClient) LocalDownload(cid string, output io.Writer) error {
|
||||
return c.LocalDownloadWithContext(context.Background(), cid, output)
|
||||
func (c *CodexClient) LocalDownload(cid string) (*CodexManifest, error) {
|
||||
return c.LocalDownloadWithContext(context.Background(), cid)
|
||||
}
|
||||
|
||||
func (c *CodexClient) HasCid(cid string) (bool, error) {
|
||||
@ -152,27 +152,45 @@ func (c *CodexClient) DownloadWithContext(ctx context.Context, cid string, outpu
|
||||
return c.copyWithContext(ctx, output, resp.Body)
|
||||
}
|
||||
|
||||
func (c *CodexClient) LocalDownloadWithContext(ctx context.Context, cid string, output io.Writer) error {
|
||||
url := fmt.Sprintf("%s/api/codex/v1/data/%s", c.BaseURL, cid)
|
||||
// CodexManifest represents the manifest returned by async download
|
||||
type CodexManifest struct {
|
||||
CID string `json:"cid"`
|
||||
Manifest struct {
|
||||
TreeCid string `json:"treeCid"`
|
||||
DatasetSize int64 `json:"datasetSize"`
|
||||
BlockSize int `json:"blockSize"`
|
||||
Protected bool `json:"protected"`
|
||||
Filename string `json:"filename"`
|
||||
Mimetype string `json:"mimetype"`
|
||||
} `json:"manifest"`
|
||||
}
|
||||
|
||||
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
|
||||
func (c *CodexClient) LocalDownloadWithContext(ctx context.Context, cid string) (*CodexManifest, error) {
|
||||
url := fmt.Sprintf("%s/api/codex/v1/data/%s/network", c.BaseURL, cid)
|
||||
|
||||
req, err := http.NewRequestWithContext(ctx, "POST", url, nil)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create request: %w", err)
|
||||
return nil, fmt.Errorf("failed to create request: %w", err)
|
||||
}
|
||||
|
||||
resp, err := c.Client.Do(req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to download from codex: %w", err)
|
||||
return nil, fmt.Errorf("failed to trigger download from codex: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
return fmt.Errorf("codex download failed with status %d: %s", resp.StatusCode, string(body))
|
||||
return nil, fmt.Errorf("codex async download failed with status %d: %s", resp.StatusCode, string(body))
|
||||
}
|
||||
|
||||
// Use context-aware copy for cancellable streaming
|
||||
return c.copyWithContext(ctx, output, resp.Body)
|
||||
// Parse JSON response containing manifest
|
||||
var manifest CodexManifest
|
||||
if err := json.NewDecoder(resp.Body).Decode(&manifest); err != nil {
|
||||
return nil, fmt.Errorf("failed to parse download manifest: %w", err)
|
||||
}
|
||||
|
||||
return &manifest, nil
|
||||
}
|
||||
|
||||
// copyWithContext performs io.Copy but respects context cancellation
|
||||
|
||||
@ -49,6 +49,13 @@ func TestIntegration_UploadAndDownload(t *testing.T) {
|
||||
}
|
||||
t.Logf("Upload successful, CID: %s", cid)
|
||||
|
||||
// Clean up after test
|
||||
defer func() {
|
||||
if err := client.RemoveCid(cid); err != nil {
|
||||
t.Logf("Warning: Failed to remove CID %s: %v", cid, err)
|
||||
}
|
||||
}()
|
||||
|
||||
// Verify existence via HasCid
|
||||
exists, err := client.HasCid(cid)
|
||||
if err != nil {
|
||||
@ -116,6 +123,94 @@ func TestIntegration_CheckNonExistingCID(t *testing.T) {
|
||||
t.Logf("HasCid confirmed CID is no longer present: %s", cid)
|
||||
}
|
||||
|
||||
func TestIntegration_LocalDownload(t *testing.T) {
|
||||
host := getenv("CODEX_HOST", "localhost")
|
||||
port := getenv("CODEX_API_PORT", "8001") // Use port 8001 as specified by user
|
||||
client := NewCodexClient(host, port)
|
||||
|
||||
// Optional request timeout override
|
||||
if ms := os.Getenv("CODEX_TIMEOUT_MS"); ms != "" {
|
||||
if d, err := time.ParseDuration(ms + "ms"); err == nil {
|
||||
client.SetRequestTimeout(d)
|
||||
}
|
||||
}
|
||||
|
||||
// Generate random payload to ensure proper round-trip verification
|
||||
payload := make([]byte, 1024)
|
||||
if _, err := rand.Read(payload); err != nil {
|
||||
t.Fatalf("failed to generate random payload: %v", err)
|
||||
}
|
||||
t.Logf("Generated payload (first 32 bytes hex): %s", hex.EncodeToString(payload[:32]))
|
||||
|
||||
// Upload the data
|
||||
cid, err := client.Upload(bytes.NewReader(payload), "local-download-test.bin")
|
||||
if err != nil {
|
||||
t.Fatalf("upload failed: %v", err)
|
||||
}
|
||||
t.Logf("Upload successful, CID: %s", cid)
|
||||
|
||||
// Clean up after test
|
||||
defer func() {
|
||||
if err := client.RemoveCid(cid); err != nil {
|
||||
t.Logf("Warning: Failed to remove CID %s: %v", cid, err)
|
||||
}
|
||||
}()
|
||||
|
||||
// Trigger async download
|
||||
manifest, err := client.LocalDownload(cid)
|
||||
if err != nil {
|
||||
t.Fatalf("LocalDownload failed: %v", err)
|
||||
}
|
||||
t.Logf("Async download triggered, manifest CID: %s", manifest.CID)
|
||||
|
||||
// Poll HasCid for up to 10 seconds using goroutine and channel
|
||||
downloadComplete := make(chan bool, 1)
|
||||
go func() {
|
||||
ticker := time.NewTicker(500 * time.Millisecond)
|
||||
defer ticker.Stop()
|
||||
for range ticker.C {
|
||||
hasCid, err := client.HasCid(cid)
|
||||
if err != nil {
|
||||
t.Logf("HasCid check failed: %v", err)
|
||||
continue
|
||||
}
|
||||
if hasCid {
|
||||
t.Logf("CID is now available locally")
|
||||
downloadComplete <- true
|
||||
return
|
||||
} else {
|
||||
t.Logf("CID not yet available locally, continuing to poll...")
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// Wait for download completion or timeout
|
||||
select {
|
||||
case <-downloadComplete:
|
||||
// Download completed successfully
|
||||
case <-time.After(10 * time.Second):
|
||||
t.Fatalf("Timeout waiting for CID to be available locally after 10 seconds")
|
||||
}
|
||||
|
||||
// Now download the actual content and verify it matches
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||
defer cancel()
|
||||
|
||||
var downloadBuf bytes.Buffer
|
||||
if err := client.DownloadWithContext(ctx, cid, &downloadBuf); err != nil {
|
||||
t.Fatalf("Download after local download failed: %v", err)
|
||||
}
|
||||
|
||||
downloadedData := downloadBuf.Bytes()
|
||||
t.Logf("Downloaded data (first 32 bytes hex): %s", hex.EncodeToString(downloadedData[:32]))
|
||||
|
||||
// Verify the data matches
|
||||
if !bytes.Equal(payload, downloadedData) {
|
||||
t.Errorf("Downloaded data does not match uploaded data")
|
||||
t.Errorf("Expected length: %d, got: %d", len(payload), len(downloadedData))
|
||||
}
|
||||
}
|
||||
|
||||
func getenv(k, def string) string {
|
||||
if v := os.Getenv(k); v != "" {
|
||||
return v
|
||||
|
||||
@ -275,3 +275,165 @@ func TestRemoveCid_Error(t *testing.T) {
|
||||
t.Fatalf("error should mention status 500, got: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestLocalDownloadWithContext_Success(t *testing.T) {
|
||||
const testCid = "zDvZRwzmTestCID"
|
||||
const expectedManifest = `{
|
||||
"cid": "zDvZRwzmTestCID",
|
||||
"manifest": {
|
||||
"treeCid": "zDvZRwzmTreeCID",
|
||||
"datasetSize": 1024,
|
||||
"blockSize": 65536,
|
||||
"protected": false,
|
||||
"filename": "test-file.bin",
|
||||
"mimetype": "application/octet-stream"
|
||||
}
|
||||
}`
|
||||
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Method != http.MethodPost {
|
||||
w.WriteHeader(http.StatusMethodNotAllowed)
|
||||
return
|
||||
}
|
||||
if r.URL.Path != "/api/codex/v1/data/"+testCid+"/network" {
|
||||
w.WriteHeader(http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.WriteHeader(http.StatusOK)
|
||||
w.Write([]byte(expectedManifest))
|
||||
}))
|
||||
defer server.Close()
|
||||
|
||||
client := NewCodexClient("localhost", "8080")
|
||||
client.BaseURL = server.URL
|
||||
|
||||
ctx := context.Background()
|
||||
manifest, err := client.LocalDownloadWithContext(ctx, testCid)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
if manifest.CID != testCid {
|
||||
t.Fatalf("expected CID %q, got %q", testCid, manifest.CID)
|
||||
}
|
||||
if manifest.Manifest.TreeCid != "zDvZRwzmTreeCID" {
|
||||
t.Fatalf("expected TreeCid %q, got %q", "zDvZRwzmTreeCID", manifest.Manifest.TreeCid)
|
||||
}
|
||||
if manifest.Manifest.DatasetSize != 1024 {
|
||||
t.Fatalf("expected DatasetSize %d, got %d", 1024, manifest.Manifest.DatasetSize)
|
||||
}
|
||||
if manifest.Manifest.Filename != "test-file.bin" {
|
||||
t.Fatalf("expected Filename %q, got %q", "test-file.bin", manifest.Manifest.Filename)
|
||||
}
|
||||
}
|
||||
|
||||
func TestLocalDownloadWithContext_RequestError(t *testing.T) {
|
||||
// Create a server and immediately close it to trigger connection error
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {}))
|
||||
server.Close()
|
||||
|
||||
client := NewCodexClient("localhost", "8080")
|
||||
client.BaseURL = server.URL
|
||||
|
||||
ctx := context.Background()
|
||||
manifest, err := client.LocalDownloadWithContext(ctx, "zDvZRwzmTestCID")
|
||||
if err == nil {
|
||||
t.Fatal("expected error, got nil")
|
||||
}
|
||||
if manifest != nil {
|
||||
t.Fatalf("expected nil manifest on error, got %v", manifest)
|
||||
}
|
||||
}
|
||||
|
||||
func TestLocalDownloadWithContext_JSONParseError(t *testing.T) {
|
||||
const testCid = "zDvZRwzmTestCID"
|
||||
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.WriteHeader(http.StatusOK)
|
||||
// Return invalid JSON
|
||||
w.Write([]byte(`{"invalid": json}`))
|
||||
}))
|
||||
defer server.Close()
|
||||
|
||||
client := NewCodexClient("localhost", "8080")
|
||||
client.BaseURL = server.URL
|
||||
|
||||
ctx := context.Background()
|
||||
manifest, err := client.LocalDownloadWithContext(ctx, testCid)
|
||||
if err == nil {
|
||||
t.Fatal("expected JSON parse error, got nil")
|
||||
}
|
||||
if manifest != nil {
|
||||
t.Fatalf("expected nil manifest on parse error, got %v", manifest)
|
||||
}
|
||||
if !strings.Contains(err.Error(), "failed to parse download manifest") {
|
||||
t.Fatalf("error should mention parse failure, got: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestLocalDownloadWithContext_HTTPError(t *testing.T) {
|
||||
const testCid = "zDvZRwzmTestCID"
|
||||
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(http.StatusNotFound)
|
||||
w.Write([]byte("CID not found"))
|
||||
}))
|
||||
defer server.Close()
|
||||
|
||||
client := NewCodexClient("localhost", "8080")
|
||||
client.BaseURL = server.URL
|
||||
|
||||
ctx := context.Background()
|
||||
manifest, err := client.LocalDownloadWithContext(ctx, testCid)
|
||||
if err == nil {
|
||||
t.Fatal("expected error for 404 status, got nil")
|
||||
}
|
||||
if manifest != nil {
|
||||
t.Fatalf("expected nil manifest on HTTP error, got %v", manifest)
|
||||
}
|
||||
if !strings.Contains(err.Error(), "404") {
|
||||
t.Fatalf("error should mention status 404, got: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestLocalDownloadWithContext_Cancellation(t *testing.T) {
|
||||
const testCid = "zDvZRwzmTestCID"
|
||||
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
// Simulate slow response to allow cancellation
|
||||
select {
|
||||
case <-r.Context().Done():
|
||||
return
|
||||
case <-time.After(200 * time.Millisecond):
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.WriteHeader(http.StatusOK)
|
||||
w.Write([]byte(`{"cid": "test"}`))
|
||||
}
|
||||
}))
|
||||
defer server.Close()
|
||||
|
||||
client := NewCodexClient("localhost", "8080")
|
||||
client.BaseURL = server.URL
|
||||
|
||||
// Cancel after 50ms (before server responds)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
|
||||
defer cancel()
|
||||
|
||||
manifest, err := client.LocalDownloadWithContext(ctx, testCid)
|
||||
if err == nil {
|
||||
t.Fatal("expected cancellation error, got nil")
|
||||
}
|
||||
if manifest != nil {
|
||||
t.Fatalf("expected nil manifest on cancellation, got %v", manifest)
|
||||
}
|
||||
// Accept either canceled or deadline exceeded depending on timing
|
||||
if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
|
||||
// net/http may wrap the context error; check error string as a fallback
|
||||
es := err.Error()
|
||||
if !(es == context.Canceled.Error() || es == context.DeadlineExceeded.Error()) {
|
||||
t.Fatalf("expected context cancellation, got: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user