diff --git a/communities/codex_index_downloader.go b/communities/codex_index_downloader.go index 6de1469..99ca679 100644 --- a/communities/codex_index_downloader.go +++ b/communities/codex_index_downloader.go @@ -2,21 +2,25 @@ package communities import ( "context" + "fmt" "io" "os" + "path/filepath" "go.uber.org/zap" ) // CodexIndexDownloader handles downloading index files from Codex storage type CodexIndexDownloader struct { - codexClient CodexClientInterface - indexCid string - filePath string - datasetSize int64 // stores the dataset size from the manifest - bytesCompleted int64 // tracks download progress - cancelChan <-chan struct{} // for cancellation support - logger *zap.Logger + codexClient CodexClientInterface + indexCid string + filePath string + datasetSize int64 // stores the dataset size from the manifest + bytesCompleted int64 // tracks download progress + downloadComplete bool // true when file is fully downloaded and renamed + downloadError error // stores the last error that occurred during manifest fetch or download + cancelChan <-chan struct{} // for cancellation support + logger *zap.Logger } // NewCodexIndexDownloader creates a new index downloader @@ -39,6 +43,7 @@ func (d *CodexIndexDownloader) GotManifest() <-chan struct{} { go func() { // Reset datasetSize to 0 to indicate no successful fetch yet d.datasetSize = 0 + d.downloadError = nil // Check for cancellation before starting select { @@ -63,6 +68,7 @@ func (d *CodexIndexDownloader) GotManifest() <-chan struct{} { // Fetch manifest from Codex manifest, err := d.codexClient.FetchManifestWithContext(ctx, d.indexCid) if err != nil { + d.downloadError = err d.logger.Debug("failed to fetch manifest", zap.String("indexCid", d.indexCid), zap.Error(err)) @@ -73,6 +79,7 @@ func (d *CodexIndexDownloader) GotManifest() <-chan struct{} { // Verify that the CID matches our configured indexCid if manifest.CID != d.indexCid { + d.downloadError = fmt.Errorf("manifest CID mismatch: expected %s, got %s", d.indexCid, manifest.CID) d.logger.Debug("manifest CID mismatch", zap.String("expected", d.indexCid), zap.String("got", manifest.CID)) @@ -96,56 +103,85 @@ func (d *CodexIndexDownloader) GetDatasetSize() int64 { // DownloadIndexFile starts downloading the index file from Codex and writes it to the configured file path func (d *CodexIndexDownloader) DownloadIndexFile() { - // Reset progress counter + // Reset progress counter and completion flag d.bytesCompleted = 0 + d.downloadComplete = false + d.downloadError = nil + // Create cancellable context + ctx, cancel := context.WithCancel(context.Background()) + + // Monitor for cancellation in separate goroutine go func() { - // Check for cancellation before starting select { case <-d.cancelChan: - return // Exit early if cancelled - default: + cancel() // Cancel download immediately + case <-ctx.Done(): + // Context already cancelled, nothing to do } + }() - // Create cancellable context - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + // Start download in separate goroutine + go func() { + defer cancel() // Ensure context is cancelled when download completes or fails - // Monitor for cancellation - go func() { - select { - case <-d.cancelChan: - cancel() // Cancel download immediately - case <-ctx.Done(): - // Context already cancelled, nothing to do - } - }() - - // Create the output file - file, err := os.Create(d.filePath) + // Create a temporary file in the same directory as the target file + // This ensures atomic rename works (same filesystem) + tmpFile, err := os.CreateTemp(filepath.Dir(d.filePath), ".codex-download-*.tmp") if err != nil { - d.logger.Debug("failed to create file", + d.downloadError = fmt.Errorf("failed to create temporary file: %w", err) + d.logger.Debug("failed to create temporary file", zap.String("filePath", d.filePath), zap.Error(err)) return } - defer file.Close() + tmpPath := tmpFile.Name() + defer func() { + tmpFile.Close() + // Clean up temporary file if it still exists (i.e., download failed) + os.Remove(tmpPath) + }() // Create a progress tracking writer progressWriter := &progressWriter{ - writer: file, + writer: tmpFile, completed: &d.bytesCompleted, } - // Use CodexClient to download and stream to file with context for cancellation + // Use CodexClient to download and stream to temporary file with context for cancellation err = d.codexClient.DownloadWithContext(ctx, d.indexCid, progressWriter) if err != nil { + d.downloadError = fmt.Errorf("failed to download index file: %w", err) d.logger.Debug("failed to download index file", zap.String("indexCid", d.indexCid), zap.String("filePath", d.filePath), + zap.String("tmpPath", tmpPath), + zap.Error(err)) + return + } + + // Close the temporary file before renaming + if err := tmpFile.Close(); err != nil { + d.downloadError = fmt.Errorf("failed to close temporary file: %w", err) + d.logger.Debug("failed to close temporary file", + zap.String("tmpPath", tmpPath), + zap.Error(err)) + return + } + + // Atomically rename temporary file to final destination + // This ensures we only have a complete file at filePath + if err := os.Rename(tmpPath, d.filePath); err != nil { + d.downloadError = fmt.Errorf("failed to rename temporary file to final destination: %w", err) + d.logger.Debug("failed to rename temporary file to final destination", + zap.String("tmpPath", tmpPath), + zap.String("filePath", d.filePath), zap.Error(err)) return } + + // Mark download as complete only after successful rename + d.downloadComplete = true }() } @@ -154,6 +190,16 @@ func (d *CodexIndexDownloader) BytesCompleted() int64 { return d.bytesCompleted } +// IsDownloadComplete returns true when the file has been fully downloaded and saved to disk +func (d *CodexIndexDownloader) IsDownloadComplete() bool { + return d.downloadComplete +} + +// GetError returns the last error that occurred during manifest fetch or download, or nil if no error +func (d *CodexIndexDownloader) GetError() error { + return d.downloadError +} + // Length returns the total dataset size (equivalent to torrent file length) func (d *CodexIndexDownloader) Length() int64 { return d.datasetSize diff --git a/communities/codex_index_downloader_integration_test.go b/communities/codex_index_downloader_integration_test.go index 0d46f49..642c964 100644 --- a/communities/codex_index_downloader_integration_test.go +++ b/communities/codex_index_downloader_integration_test.go @@ -117,6 +117,10 @@ func (suite *CodexIndexDownloaderIntegrationTestSuite) TestIntegration_GotManife // Verify Length returns the same value assert.Equal(suite.T(), datasetSize, downloader.Length(), "Length() should return dataset size") + + // Verify no error occurred + assert.NoError(suite.T(), downloader.GetError(), "No error should occur during manifest fetch") + suite.T().Log("✅ No errors during manifest fetch") } func (suite *CodexIndexDownloaderIntegrationTestSuite) TestIntegration_DownloadIndexFile() { @@ -157,6 +161,9 @@ func (suite *CodexIndexDownloaderIntegrationTestSuite) TestIntegration_DownloadI expectedSize := downloader.GetDatasetSize() suite.T().Logf("Expected file size: %d bytes", expectedSize) + // Verify no error from manifest fetch + assert.NoError(suite.T(), downloader.GetError(), "No error should occur during manifest fetch") + // Start the download downloader.DownloadIndexFile() @@ -167,6 +174,14 @@ func (suite *CodexIndexDownloaderIntegrationTestSuite) TestIntegration_DownloadI suite.T().Logf("✅ Download completed: %d/%d bytes", downloader.BytesCompleted(), expectedSize) + // Verify download is marked as complete + assert.True(suite.T(), downloader.IsDownloadComplete(), "Download should be marked as complete") + suite.T().Log("✅ Download marked as complete") + + // Verify no error occurred during download + assert.NoError(suite.T(), downloader.GetError(), "No error should occur during download") + suite.T().Log("✅ No errors during download") + // Verify file exists and has correct size stat, err := os.Stat(filePath) require.NoError(suite.T(), err, "Downloaded file should exist") diff --git a/communities/codex_index_downloader_test.go b/communities/codex_index_downloader_test.go index 355d4f0..5743d2e 100644 --- a/communities/codex_index_downloader_test.go +++ b/communities/codex_index_downloader_test.go @@ -45,8 +45,8 @@ func (suite *CodexIndexDownloaderTestSuite) SetupTest() { // Create a fresh cancel channel for each test suite.cancelChan = make(chan struct{}) - // Create logger - suite.logger, _ = zap.NewDevelopment() + // Use NOP logger for unit tests (no output noise) + suite.logger = zap.NewNop() } // TearDownTest runs after each test method @@ -110,6 +110,9 @@ func (suite *CodexIndexDownloaderTestSuite) TestGotManifest_SuccessClosesChannel // Verify dataset size was recorded assert.Equal(suite.T(), int64(1024), downloader.GetDatasetSize(), "Dataset size should be recorded") + + // Verify no error was recorded + assert.NoError(suite.T(), downloader.GetError(), "No error should be recorded on success") } func (suite *CodexIndexDownloaderTestSuite) TestGotManifest_ErrorDoesNotCloseChannel() { @@ -138,6 +141,14 @@ func (suite *CodexIndexDownloaderTestSuite) TestGotManifest_ErrorDoesNotCloseCha // Verify dataset size was NOT recorded (should be 0) assert.Equal(suite.T(), int64(0), downloader.GetDatasetSize(), "Dataset size should be 0 on error") + + // Verify download is not complete + assert.False(suite.T(), downloader.IsDownloadComplete(), "Download should not be complete on error") + + // Verify error was recorded + assert.Error(suite.T(), downloader.GetError(), "Error should be recorded") + assert.Contains(suite.T(), downloader.GetError().Error(), "fetch error", "Error message should contain fetch error") + suite.T().Log("✅ Error was recorded correctly") } func (suite *CodexIndexDownloaderTestSuite) TestGotManifest_CidMismatchDoesNotCloseChannel() { @@ -172,6 +183,14 @@ func (suite *CodexIndexDownloaderTestSuite) TestGotManifest_CidMismatchDoesNotCl // Verify dataset size was NOT recorded (should be 0) assert.Equal(suite.T(), int64(0), downloader.GetDatasetSize(), "Dataset size should be 0 on CID mismatch") + + // Verify download is not complete + assert.False(suite.T(), downloader.IsDownloadComplete(), "Download should not be complete on CID mismatch") + + // Verify error was recorded + assert.Error(suite.T(), downloader.GetError(), "Error should be recorded for CID mismatch") + assert.Contains(suite.T(), downloader.GetError().Error(), "CID mismatch", "Error message should mention CID mismatch") + suite.T().Log("✅ Error was recorded for CID mismatch") } func (suite *CodexIndexDownloaderTestSuite) TestGotManifest_Cancellation() { @@ -218,6 +237,14 @@ func (suite *CodexIndexDownloaderTestSuite) TestGotManifest_Cancellation() { // Verify dataset size was NOT recorded assert.Equal(suite.T(), int64(0), downloader.GetDatasetSize(), "Dataset size should be 0 on cancellation") + + // Verify download is not complete + assert.False(suite.T(), downloader.IsDownloadComplete(), "Download should not be complete on cancellation") + + // Verify error was recorded (context cancellation) + assert.Error(suite.T(), downloader.GetError(), "Error should be recorded for cancellation") + assert.ErrorIs(suite.T(), downloader.GetError(), context.Canceled, "Error should be context.Canceled") + suite.T().Log("✅ Cancellation error was recorded correctly") } func (suite *CodexIndexDownloaderTestSuite) TestGotManifest_RecordsDatasetSize() { @@ -256,6 +283,9 @@ func (suite *CodexIndexDownloaderTestSuite) TestGotManifest_RecordsDatasetSize() // Verify dataset size was recorded correctly assert.Equal(suite.T(), expectedSize, downloader.GetDatasetSize(), "Dataset size should match manifest") suite.T().Logf("✅ Dataset size correctly recorded: %d", downloader.GetDatasetSize()) + + // Verify no error was recorded + assert.NoError(suite.T(), downloader.GetError(), "No error should be recorded on success") } // ==================== DownloadIndexFile Tests ==================== @@ -279,20 +309,35 @@ func (suite *CodexIndexDownloaderTestSuite) TestDownloadIndexFile_StoresFileCorr // Start download downloader.DownloadIndexFile() - // Wait for download to complete (check file existence and size) + // Wait for download to complete (check bytes completed first, then file existence) require.Eventually(suite.T(), func() bool { + // First check: all bytes downloaded + if downloader.BytesCompleted() != int64(len(testData)) { + return false + } + // Second check: download marked as complete (file renamed) + if !downloader.IsDownloadComplete() { + return false + } + // Third check: file actually exists with correct size stat, err := os.Stat(filePath) if err != nil { return false } return stat.Size() == int64(len(testData)) - }, 2*time.Second, 50*time.Millisecond, "File should be created with correct size") + }, 2*time.Second, 50*time.Millisecond, "File should be fully downloaded and saved") // Verify file contents actualData, err := os.ReadFile(filePath) require.NoError(suite.T(), err, "Should be able to read downloaded file") assert.Equal(suite.T(), testData, actualData, "File contents should match") suite.T().Logf("✅ File downloaded successfully to: %s", filePath) + + // Verify download is complete + assert.True(suite.T(), downloader.IsDownloadComplete(), "Download should be complete") + + // Verify no error was recorded + assert.NoError(suite.T(), downloader.GetError(), "No error should be recorded on successful download") } func (suite *CodexIndexDownloaderTestSuite) TestDownloadIndexFile_TracksProgress() { @@ -342,6 +387,12 @@ func (suite *CodexIndexDownloaderTestSuite) TestDownloadIndexFile_TracksProgress assert.Equal(suite.T(), int64(len(testData)), downloader.BytesCompleted(), "All bytes should be downloaded") suite.T().Logf("✅ Download progress tracked correctly: %d/%d bytes", downloader.BytesCompleted(), len(testData)) + + // Verify download is complete + assert.True(suite.T(), downloader.IsDownloadComplete(), "Download should be complete") + + // Verify no error was recorded + assert.NoError(suite.T(), downloader.GetError(), "No error should be recorded on successful download") } func (suite *CodexIndexDownloaderTestSuite) TestDownloadIndexFile_Cancellation() { @@ -356,7 +407,7 @@ func (suite *CodexIndexDownloaderTestSuite) TestDownloadIndexFile_Cancellation() close(downloadStarted) // Signal that download started // Simulate slow download with cancellation check - for i := 0; i < 100; i++ { + for range 100 { select { case <-ctx.Done(): return ctx.Err() // Return cancellation error @@ -396,6 +447,20 @@ func (suite *CodexIndexDownloaderTestSuite) TestDownloadIndexFile_Cancellation() bytesCompleted := downloader.BytesCompleted() suite.T().Logf("✅ Download cancelled after %d bytes (should be < 100)", bytesCompleted) assert.Less(suite.T(), bytesCompleted, int64(100), "Download should be cancelled before completing all 100 bytes") + + // Verify download is not complete + assert.False(suite.T(), downloader.IsDownloadComplete(), "Download should not be complete on cancellation") + + // Verify error was recorded (context cancellation) + assert.Error(suite.T(), downloader.GetError(), "Error should be recorded for cancellation") + assert.ErrorIs(suite.T(), downloader.GetError(), context.Canceled, "Error should be context.Canceled") + suite.T().Log("✅ Cancellation error was recorded correctly") + + // Verify that the target file does NOT exist (atomic write should clean up temp file on cancellation) + _, err := os.Stat(filePath) + assert.True(suite.T(), os.IsNotExist(err), "Target file should not exist after cancellation") + suite.T().Log("✅ Target file does not exist after cancellation (temp file cleaned up)") + assert.False(suite.T(), downloader.IsDownloadComplete(), "✅ IsDownloadComplete should be false after cancellation") } func (suite *CodexIndexDownloaderTestSuite) TestDownloadIndexFile_ErrorHandling() { @@ -420,13 +485,19 @@ func (suite *CodexIndexDownloaderTestSuite) TestDownloadIndexFile_ErrorHandling( assert.Equal(suite.T(), int64(0), downloader.BytesCompleted(), "No bytes should be recorded on error") suite.T().Log("✅ Error handling: no bytes recorded on download failure") - // File should exist but be empty (or not exist if creation failed) - // This is current behavior - we might want to improve it to clean up on error - if stat, err := os.Stat(filePath); err == nil { - suite.T().Logf("File exists with size: %d bytes (current behavior)", stat.Size()) - } else { - suite.T().Log("File does not exist (current behavior)") - } + // Verify download is not complete + assert.False(suite.T(), downloader.IsDownloadComplete(), "Download should not be complete on error") + + // Verify error was recorded + assert.Error(suite.T(), downloader.GetError(), "Error should be recorded") + assert.Contains(suite.T(), downloader.GetError().Error(), "download failed", "Error message should contain download failed") + suite.T().Log("✅ Error was recorded correctly") + + // Verify that the target file does NOT exist (atomic write should clean up temp file) + _, err := os.Stat(filePath) + assert.True(suite.T(), os.IsNotExist(err), "Target file should not exist on download error") + suite.T().Log("✅ Target file does not exist after download error (temp file cleaned up)") + assert.False(suite.T(), downloader.IsDownloadComplete(), "✅ IsDownloadComplete should be false after cancellation") } func (suite *CodexIndexDownloaderTestSuite) TestLength_ReturnsDatasetSize() {