From c47f9c44361ac12fe85370138edbb1ae811b6545 Mon Sep 17 00:00:00 2001 From: Marcin Czenko Date: Wed, 22 Oct 2025 03:18:27 +0200 Subject: [PATCH] Fixes LocalDownload in CodexClient --- communities/codex_client.go | 38 +++-- communities/codex_client_integration_test.go | 95 +++++++++++ communities/codex_client_test.go | 162 +++++++++++++++++++ 3 files changed, 285 insertions(+), 10 deletions(-) diff --git a/communities/codex_client.go b/communities/codex_client.go index 36d4936..769349e 100644 --- a/communities/codex_client.go +++ b/communities/codex_client.go @@ -73,8 +73,8 @@ func (c *CodexClient) Download(cid string, output io.Writer) error { return c.DownloadWithContext(context.Background(), cid, output) } -func (c *CodexClient) LocalDownload(cid string, output io.Writer) error { - return c.LocalDownloadWithContext(context.Background(), cid, output) +func (c *CodexClient) LocalDownload(cid string) (*CodexManifest, error) { + return c.LocalDownloadWithContext(context.Background(), cid) } func (c *CodexClient) HasCid(cid string) (bool, error) { @@ -152,27 +152,45 @@ func (c *CodexClient) DownloadWithContext(ctx context.Context, cid string, outpu return c.copyWithContext(ctx, output, resp.Body) } -func (c *CodexClient) LocalDownloadWithContext(ctx context.Context, cid string, output io.Writer) error { - url := fmt.Sprintf("%s/api/codex/v1/data/%s", c.BaseURL, cid) +// CodexManifest represents the manifest returned by async download +type CodexManifest struct { + CID string `json:"cid"` + Manifest struct { + TreeCid string `json:"treeCid"` + DatasetSize int64 `json:"datasetSize"` + BlockSize int `json:"blockSize"` + Protected bool `json:"protected"` + Filename string `json:"filename"` + Mimetype string `json:"mimetype"` + } `json:"manifest"` +} - req, err := http.NewRequestWithContext(ctx, "GET", url, nil) +func (c *CodexClient) LocalDownloadWithContext(ctx context.Context, cid string) (*CodexManifest, error) { + url := fmt.Sprintf("%s/api/codex/v1/data/%s/network", c.BaseURL, cid) + + req, err := http.NewRequestWithContext(ctx, "POST", url, nil) if err != nil { - return fmt.Errorf("failed to create request: %w", err) + return nil, fmt.Errorf("failed to create request: %w", err) } resp, err := c.Client.Do(req) if err != nil { - return fmt.Errorf("failed to download from codex: %w", err) + return nil, fmt.Errorf("failed to trigger download from codex: %w", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { body, _ := io.ReadAll(resp.Body) - return fmt.Errorf("codex download failed with status %d: %s", resp.StatusCode, string(body)) + return nil, fmt.Errorf("codex async download failed with status %d: %s", resp.StatusCode, string(body)) } - // Use context-aware copy for cancellable streaming - return c.copyWithContext(ctx, output, resp.Body) + // Parse JSON response containing manifest + var manifest CodexManifest + if err := json.NewDecoder(resp.Body).Decode(&manifest); err != nil { + return nil, fmt.Errorf("failed to parse download manifest: %w", err) + } + + return &manifest, nil } // copyWithContext performs io.Copy but respects context cancellation diff --git a/communities/codex_client_integration_test.go b/communities/codex_client_integration_test.go index 1785759..ac30602 100644 --- a/communities/codex_client_integration_test.go +++ b/communities/codex_client_integration_test.go @@ -49,6 +49,13 @@ func TestIntegration_UploadAndDownload(t *testing.T) { } t.Logf("Upload successful, CID: %s", cid) + // Clean up after test + defer func() { + if err := client.RemoveCid(cid); err != nil { + t.Logf("Warning: Failed to remove CID %s: %v", cid, err) + } + }() + // Verify existence via HasCid exists, err := client.HasCid(cid) if err != nil { @@ -116,6 +123,94 @@ func TestIntegration_CheckNonExistingCID(t *testing.T) { t.Logf("HasCid confirmed CID is no longer present: %s", cid) } +func TestIntegration_LocalDownload(t *testing.T) { + host := getenv("CODEX_HOST", "localhost") + port := getenv("CODEX_API_PORT", "8001") // Use port 8001 as specified by user + client := NewCodexClient(host, port) + + // Optional request timeout override + if ms := os.Getenv("CODEX_TIMEOUT_MS"); ms != "" { + if d, err := time.ParseDuration(ms + "ms"); err == nil { + client.SetRequestTimeout(d) + } + } + + // Generate random payload to ensure proper round-trip verification + payload := make([]byte, 1024) + if _, err := rand.Read(payload); err != nil { + t.Fatalf("failed to generate random payload: %v", err) + } + t.Logf("Generated payload (first 32 bytes hex): %s", hex.EncodeToString(payload[:32])) + + // Upload the data + cid, err := client.Upload(bytes.NewReader(payload), "local-download-test.bin") + if err != nil { + t.Fatalf("upload failed: %v", err) + } + t.Logf("Upload successful, CID: %s", cid) + + // Clean up after test + defer func() { + if err := client.RemoveCid(cid); err != nil { + t.Logf("Warning: Failed to remove CID %s: %v", cid, err) + } + }() + + // Trigger async download + manifest, err := client.LocalDownload(cid) + if err != nil { + t.Fatalf("LocalDownload failed: %v", err) + } + t.Logf("Async download triggered, manifest CID: %s", manifest.CID) + + // Poll HasCid for up to 10 seconds using goroutine and channel + downloadComplete := make(chan bool, 1) + go func() { + ticker := time.NewTicker(500 * time.Millisecond) + defer ticker.Stop() + for range ticker.C { + hasCid, err := client.HasCid(cid) + if err != nil { + t.Logf("HasCid check failed: %v", err) + continue + } + if hasCid { + t.Logf("CID is now available locally") + downloadComplete <- true + return + } else { + t.Logf("CID not yet available locally, continuing to poll...") + } + } + }() + + // Wait for download completion or timeout + select { + case <-downloadComplete: + // Download completed successfully + case <-time.After(10 * time.Second): + t.Fatalf("Timeout waiting for CID to be available locally after 10 seconds") + } + + // Now download the actual content and verify it matches + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + var downloadBuf bytes.Buffer + if err := client.DownloadWithContext(ctx, cid, &downloadBuf); err != nil { + t.Fatalf("Download after local download failed: %v", err) + } + + downloadedData := downloadBuf.Bytes() + t.Logf("Downloaded data (first 32 bytes hex): %s", hex.EncodeToString(downloadedData[:32])) + + // Verify the data matches + if !bytes.Equal(payload, downloadedData) { + t.Errorf("Downloaded data does not match uploaded data") + t.Errorf("Expected length: %d, got: %d", len(payload), len(downloadedData)) + } +} + func getenv(k, def string) string { if v := os.Getenv(k); v != "" { return v diff --git a/communities/codex_client_test.go b/communities/codex_client_test.go index d0737ee..05a2b8f 100644 --- a/communities/codex_client_test.go +++ b/communities/codex_client_test.go @@ -275,3 +275,165 @@ func TestRemoveCid_Error(t *testing.T) { t.Fatalf("error should mention status 500, got: %v", err) } } + +func TestLocalDownloadWithContext_Success(t *testing.T) { + const testCid = "zDvZRwzmTestCID" + const expectedManifest = `{ + "cid": "zDvZRwzmTestCID", + "manifest": { + "treeCid": "zDvZRwzmTreeCID", + "datasetSize": 1024, + "blockSize": 65536, + "protected": false, + "filename": "test-file.bin", + "mimetype": "application/octet-stream" + } + }` + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + w.WriteHeader(http.StatusMethodNotAllowed) + return + } + if r.URL.Path != "/api/codex/v1/data/"+testCid+"/network" { + w.WriteHeader(http.StatusNotFound) + return + } + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + w.Write([]byte(expectedManifest)) + })) + defer server.Close() + + client := NewCodexClient("localhost", "8080") + client.BaseURL = server.URL + + ctx := context.Background() + manifest, err := client.LocalDownloadWithContext(ctx, testCid) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if manifest.CID != testCid { + t.Fatalf("expected CID %q, got %q", testCid, manifest.CID) + } + if manifest.Manifest.TreeCid != "zDvZRwzmTreeCID" { + t.Fatalf("expected TreeCid %q, got %q", "zDvZRwzmTreeCID", manifest.Manifest.TreeCid) + } + if manifest.Manifest.DatasetSize != 1024 { + t.Fatalf("expected DatasetSize %d, got %d", 1024, manifest.Manifest.DatasetSize) + } + if manifest.Manifest.Filename != "test-file.bin" { + t.Fatalf("expected Filename %q, got %q", "test-file.bin", manifest.Manifest.Filename) + } +} + +func TestLocalDownloadWithContext_RequestError(t *testing.T) { + // Create a server and immediately close it to trigger connection error + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {})) + server.Close() + + client := NewCodexClient("localhost", "8080") + client.BaseURL = server.URL + + ctx := context.Background() + manifest, err := client.LocalDownloadWithContext(ctx, "zDvZRwzmTestCID") + if err == nil { + t.Fatal("expected error, got nil") + } + if manifest != nil { + t.Fatalf("expected nil manifest on error, got %v", manifest) + } +} + +func TestLocalDownloadWithContext_JSONParseError(t *testing.T) { + const testCid = "zDvZRwzmTestCID" + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + // Return invalid JSON + w.Write([]byte(`{"invalid": json}`)) + })) + defer server.Close() + + client := NewCodexClient("localhost", "8080") + client.BaseURL = server.URL + + ctx := context.Background() + manifest, err := client.LocalDownloadWithContext(ctx, testCid) + if err == nil { + t.Fatal("expected JSON parse error, got nil") + } + if manifest != nil { + t.Fatalf("expected nil manifest on parse error, got %v", manifest) + } + if !strings.Contains(err.Error(), "failed to parse download manifest") { + t.Fatalf("error should mention parse failure, got: %v", err) + } +} + +func TestLocalDownloadWithContext_HTTPError(t *testing.T) { + const testCid = "zDvZRwzmTestCID" + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusNotFound) + w.Write([]byte("CID not found")) + })) + defer server.Close() + + client := NewCodexClient("localhost", "8080") + client.BaseURL = server.URL + + ctx := context.Background() + manifest, err := client.LocalDownloadWithContext(ctx, testCid) + if err == nil { + t.Fatal("expected error for 404 status, got nil") + } + if manifest != nil { + t.Fatalf("expected nil manifest on HTTP error, got %v", manifest) + } + if !strings.Contains(err.Error(), "404") { + t.Fatalf("error should mention status 404, got: %v", err) + } +} + +func TestLocalDownloadWithContext_Cancellation(t *testing.T) { + const testCid = "zDvZRwzmTestCID" + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Simulate slow response to allow cancellation + select { + case <-r.Context().Done(): + return + case <-time.After(200 * time.Millisecond): + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + w.Write([]byte(`{"cid": "test"}`)) + } + })) + defer server.Close() + + client := NewCodexClient("localhost", "8080") + client.BaseURL = server.URL + + // Cancel after 50ms (before server responds) + ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond) + defer cancel() + + manifest, err := client.LocalDownloadWithContext(ctx, testCid) + if err == nil { + t.Fatal("expected cancellation error, got nil") + } + if manifest != nil { + t.Fatalf("expected nil manifest on cancellation, got %v", manifest) + } + // Accept either canceled or deadline exceeded depending on timing + if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) { + // net/http may wrap the context error; check error string as a fallback + es := err.Error() + if !(es == context.Canceled.Error() || es == context.DeadlineExceeded.Error()) { + t.Fatalf("expected context cancellation, got: %v", err) + } + } +}