mirror of
https://github.com/logos-storage/logos-storage-go.git
synced 2026-01-08 08:13:13 +00:00
gets better completion reporting and error handling for the index downloader
This commit is contained in:
parent
69b0daac1f
commit
f0aa462dc0
@ -2,21 +2,25 @@ package communities
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// CodexIndexDownloader handles downloading index files from Codex storage
|
||||
type CodexIndexDownloader struct {
|
||||
codexClient CodexClientInterface
|
||||
indexCid string
|
||||
filePath string
|
||||
datasetSize int64 // stores the dataset size from the manifest
|
||||
bytesCompleted int64 // tracks download progress
|
||||
cancelChan <-chan struct{} // for cancellation support
|
||||
logger *zap.Logger
|
||||
codexClient CodexClientInterface
|
||||
indexCid string
|
||||
filePath string
|
||||
datasetSize int64 // stores the dataset size from the manifest
|
||||
bytesCompleted int64 // tracks download progress
|
||||
downloadComplete bool // true when file is fully downloaded and renamed
|
||||
downloadError error // stores the last error that occurred during manifest fetch or download
|
||||
cancelChan <-chan struct{} // for cancellation support
|
||||
logger *zap.Logger
|
||||
}
|
||||
|
||||
// NewCodexIndexDownloader creates a new index downloader
|
||||
@ -39,6 +43,7 @@ func (d *CodexIndexDownloader) GotManifest() <-chan struct{} {
|
||||
go func() {
|
||||
// Reset datasetSize to 0 to indicate no successful fetch yet
|
||||
d.datasetSize = 0
|
||||
d.downloadError = nil
|
||||
|
||||
// Check for cancellation before starting
|
||||
select {
|
||||
@ -63,6 +68,7 @@ func (d *CodexIndexDownloader) GotManifest() <-chan struct{} {
|
||||
// Fetch manifest from Codex
|
||||
manifest, err := d.codexClient.FetchManifestWithContext(ctx, d.indexCid)
|
||||
if err != nil {
|
||||
d.downloadError = err
|
||||
d.logger.Debug("failed to fetch manifest",
|
||||
zap.String("indexCid", d.indexCid),
|
||||
zap.Error(err))
|
||||
@ -73,6 +79,7 @@ func (d *CodexIndexDownloader) GotManifest() <-chan struct{} {
|
||||
|
||||
// Verify that the CID matches our configured indexCid
|
||||
if manifest.CID != d.indexCid {
|
||||
d.downloadError = fmt.Errorf("manifest CID mismatch: expected %s, got %s", d.indexCid, manifest.CID)
|
||||
d.logger.Debug("manifest CID mismatch",
|
||||
zap.String("expected", d.indexCid),
|
||||
zap.String("got", manifest.CID))
|
||||
@ -96,56 +103,85 @@ func (d *CodexIndexDownloader) GetDatasetSize() int64 {
|
||||
|
||||
// DownloadIndexFile starts downloading the index file from Codex and writes it to the configured file path
|
||||
func (d *CodexIndexDownloader) DownloadIndexFile() {
|
||||
// Reset progress counter
|
||||
// Reset progress counter and completion flag
|
||||
d.bytesCompleted = 0
|
||||
d.downloadComplete = false
|
||||
d.downloadError = nil
|
||||
|
||||
// Create cancellable context
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
// Monitor for cancellation in separate goroutine
|
||||
go func() {
|
||||
// Check for cancellation before starting
|
||||
select {
|
||||
case <-d.cancelChan:
|
||||
return // Exit early if cancelled
|
||||
default:
|
||||
cancel() // Cancel download immediately
|
||||
case <-ctx.Done():
|
||||
// Context already cancelled, nothing to do
|
||||
}
|
||||
}()
|
||||
|
||||
// Create cancellable context
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
// Start download in separate goroutine
|
||||
go func() {
|
||||
defer cancel() // Ensure context is cancelled when download completes or fails
|
||||
|
||||
// Monitor for cancellation
|
||||
go func() {
|
||||
select {
|
||||
case <-d.cancelChan:
|
||||
cancel() // Cancel download immediately
|
||||
case <-ctx.Done():
|
||||
// Context already cancelled, nothing to do
|
||||
}
|
||||
}()
|
||||
|
||||
// Create the output file
|
||||
file, err := os.Create(d.filePath)
|
||||
// Create a temporary file in the same directory as the target file
|
||||
// This ensures atomic rename works (same filesystem)
|
||||
tmpFile, err := os.CreateTemp(filepath.Dir(d.filePath), ".codex-download-*.tmp")
|
||||
if err != nil {
|
||||
d.logger.Debug("failed to create file",
|
||||
d.downloadError = fmt.Errorf("failed to create temporary file: %w", err)
|
||||
d.logger.Debug("failed to create temporary file",
|
||||
zap.String("filePath", d.filePath),
|
||||
zap.Error(err))
|
||||
return
|
||||
}
|
||||
defer file.Close()
|
||||
tmpPath := tmpFile.Name()
|
||||
defer func() {
|
||||
tmpFile.Close()
|
||||
// Clean up temporary file if it still exists (i.e., download failed)
|
||||
os.Remove(tmpPath)
|
||||
}()
|
||||
|
||||
// Create a progress tracking writer
|
||||
progressWriter := &progressWriter{
|
||||
writer: file,
|
||||
writer: tmpFile,
|
||||
completed: &d.bytesCompleted,
|
||||
}
|
||||
|
||||
// Use CodexClient to download and stream to file with context for cancellation
|
||||
// Use CodexClient to download and stream to temporary file with context for cancellation
|
||||
err = d.codexClient.DownloadWithContext(ctx, d.indexCid, progressWriter)
|
||||
if err != nil {
|
||||
d.downloadError = fmt.Errorf("failed to download index file: %w", err)
|
||||
d.logger.Debug("failed to download index file",
|
||||
zap.String("indexCid", d.indexCid),
|
||||
zap.String("filePath", d.filePath),
|
||||
zap.String("tmpPath", tmpPath),
|
||||
zap.Error(err))
|
||||
return
|
||||
}
|
||||
|
||||
// Close the temporary file before renaming
|
||||
if err := tmpFile.Close(); err != nil {
|
||||
d.downloadError = fmt.Errorf("failed to close temporary file: %w", err)
|
||||
d.logger.Debug("failed to close temporary file",
|
||||
zap.String("tmpPath", tmpPath),
|
||||
zap.Error(err))
|
||||
return
|
||||
}
|
||||
|
||||
// Atomically rename temporary file to final destination
|
||||
// This ensures we only have a complete file at filePath
|
||||
if err := os.Rename(tmpPath, d.filePath); err != nil {
|
||||
d.downloadError = fmt.Errorf("failed to rename temporary file to final destination: %w", err)
|
||||
d.logger.Debug("failed to rename temporary file to final destination",
|
||||
zap.String("tmpPath", tmpPath),
|
||||
zap.String("filePath", d.filePath),
|
||||
zap.Error(err))
|
||||
return
|
||||
}
|
||||
|
||||
// Mark download as complete only after successful rename
|
||||
d.downloadComplete = true
|
||||
}()
|
||||
}
|
||||
|
||||
@ -154,6 +190,16 @@ func (d *CodexIndexDownloader) BytesCompleted() int64 {
|
||||
return d.bytesCompleted
|
||||
}
|
||||
|
||||
// IsDownloadComplete returns true when the file has been fully downloaded and saved to disk
|
||||
func (d *CodexIndexDownloader) IsDownloadComplete() bool {
|
||||
return d.downloadComplete
|
||||
}
|
||||
|
||||
// GetError returns the last error that occurred during manifest fetch or download, or nil if no error
|
||||
func (d *CodexIndexDownloader) GetError() error {
|
||||
return d.downloadError
|
||||
}
|
||||
|
||||
// Length returns the total dataset size (equivalent to torrent file length)
|
||||
func (d *CodexIndexDownloader) Length() int64 {
|
||||
return d.datasetSize
|
||||
|
||||
@ -117,6 +117,10 @@ func (suite *CodexIndexDownloaderIntegrationTestSuite) TestIntegration_GotManife
|
||||
|
||||
// Verify Length returns the same value
|
||||
assert.Equal(suite.T(), datasetSize, downloader.Length(), "Length() should return dataset size")
|
||||
|
||||
// Verify no error occurred
|
||||
assert.NoError(suite.T(), downloader.GetError(), "No error should occur during manifest fetch")
|
||||
suite.T().Log("✅ No errors during manifest fetch")
|
||||
}
|
||||
|
||||
func (suite *CodexIndexDownloaderIntegrationTestSuite) TestIntegration_DownloadIndexFile() {
|
||||
@ -157,6 +161,9 @@ func (suite *CodexIndexDownloaderIntegrationTestSuite) TestIntegration_DownloadI
|
||||
expectedSize := downloader.GetDatasetSize()
|
||||
suite.T().Logf("Expected file size: %d bytes", expectedSize)
|
||||
|
||||
// Verify no error from manifest fetch
|
||||
assert.NoError(suite.T(), downloader.GetError(), "No error should occur during manifest fetch")
|
||||
|
||||
// Start the download
|
||||
downloader.DownloadIndexFile()
|
||||
|
||||
@ -167,6 +174,14 @@ func (suite *CodexIndexDownloaderIntegrationTestSuite) TestIntegration_DownloadI
|
||||
|
||||
suite.T().Logf("✅ Download completed: %d/%d bytes", downloader.BytesCompleted(), expectedSize)
|
||||
|
||||
// Verify download is marked as complete
|
||||
assert.True(suite.T(), downloader.IsDownloadComplete(), "Download should be marked as complete")
|
||||
suite.T().Log("✅ Download marked as complete")
|
||||
|
||||
// Verify no error occurred during download
|
||||
assert.NoError(suite.T(), downloader.GetError(), "No error should occur during download")
|
||||
suite.T().Log("✅ No errors during download")
|
||||
|
||||
// Verify file exists and has correct size
|
||||
stat, err := os.Stat(filePath)
|
||||
require.NoError(suite.T(), err, "Downloaded file should exist")
|
||||
|
||||
@ -45,8 +45,8 @@ func (suite *CodexIndexDownloaderTestSuite) SetupTest() {
|
||||
// Create a fresh cancel channel for each test
|
||||
suite.cancelChan = make(chan struct{})
|
||||
|
||||
// Create logger
|
||||
suite.logger, _ = zap.NewDevelopment()
|
||||
// Use NOP logger for unit tests (no output noise)
|
||||
suite.logger = zap.NewNop()
|
||||
}
|
||||
|
||||
// TearDownTest runs after each test method
|
||||
@ -110,6 +110,9 @@ func (suite *CodexIndexDownloaderTestSuite) TestGotManifest_SuccessClosesChannel
|
||||
|
||||
// Verify dataset size was recorded
|
||||
assert.Equal(suite.T(), int64(1024), downloader.GetDatasetSize(), "Dataset size should be recorded")
|
||||
|
||||
// Verify no error was recorded
|
||||
assert.NoError(suite.T(), downloader.GetError(), "No error should be recorded on success")
|
||||
}
|
||||
|
||||
func (suite *CodexIndexDownloaderTestSuite) TestGotManifest_ErrorDoesNotCloseChannel() {
|
||||
@ -138,6 +141,14 @@ func (suite *CodexIndexDownloaderTestSuite) TestGotManifest_ErrorDoesNotCloseCha
|
||||
|
||||
// Verify dataset size was NOT recorded (should be 0)
|
||||
assert.Equal(suite.T(), int64(0), downloader.GetDatasetSize(), "Dataset size should be 0 on error")
|
||||
|
||||
// Verify download is not complete
|
||||
assert.False(suite.T(), downloader.IsDownloadComplete(), "Download should not be complete on error")
|
||||
|
||||
// Verify error was recorded
|
||||
assert.Error(suite.T(), downloader.GetError(), "Error should be recorded")
|
||||
assert.Contains(suite.T(), downloader.GetError().Error(), "fetch error", "Error message should contain fetch error")
|
||||
suite.T().Log("✅ Error was recorded correctly")
|
||||
}
|
||||
|
||||
func (suite *CodexIndexDownloaderTestSuite) TestGotManifest_CidMismatchDoesNotCloseChannel() {
|
||||
@ -172,6 +183,14 @@ func (suite *CodexIndexDownloaderTestSuite) TestGotManifest_CidMismatchDoesNotCl
|
||||
|
||||
// Verify dataset size was NOT recorded (should be 0)
|
||||
assert.Equal(suite.T(), int64(0), downloader.GetDatasetSize(), "Dataset size should be 0 on CID mismatch")
|
||||
|
||||
// Verify download is not complete
|
||||
assert.False(suite.T(), downloader.IsDownloadComplete(), "Download should not be complete on CID mismatch")
|
||||
|
||||
// Verify error was recorded
|
||||
assert.Error(suite.T(), downloader.GetError(), "Error should be recorded for CID mismatch")
|
||||
assert.Contains(suite.T(), downloader.GetError().Error(), "CID mismatch", "Error message should mention CID mismatch")
|
||||
suite.T().Log("✅ Error was recorded for CID mismatch")
|
||||
}
|
||||
|
||||
func (suite *CodexIndexDownloaderTestSuite) TestGotManifest_Cancellation() {
|
||||
@ -218,6 +237,14 @@ func (suite *CodexIndexDownloaderTestSuite) TestGotManifest_Cancellation() {
|
||||
|
||||
// Verify dataset size was NOT recorded
|
||||
assert.Equal(suite.T(), int64(0), downloader.GetDatasetSize(), "Dataset size should be 0 on cancellation")
|
||||
|
||||
// Verify download is not complete
|
||||
assert.False(suite.T(), downloader.IsDownloadComplete(), "Download should not be complete on cancellation")
|
||||
|
||||
// Verify error was recorded (context cancellation)
|
||||
assert.Error(suite.T(), downloader.GetError(), "Error should be recorded for cancellation")
|
||||
assert.ErrorIs(suite.T(), downloader.GetError(), context.Canceled, "Error should be context.Canceled")
|
||||
suite.T().Log("✅ Cancellation error was recorded correctly")
|
||||
}
|
||||
|
||||
func (suite *CodexIndexDownloaderTestSuite) TestGotManifest_RecordsDatasetSize() {
|
||||
@ -256,6 +283,9 @@ func (suite *CodexIndexDownloaderTestSuite) TestGotManifest_RecordsDatasetSize()
|
||||
// Verify dataset size was recorded correctly
|
||||
assert.Equal(suite.T(), expectedSize, downloader.GetDatasetSize(), "Dataset size should match manifest")
|
||||
suite.T().Logf("✅ Dataset size correctly recorded: %d", downloader.GetDatasetSize())
|
||||
|
||||
// Verify no error was recorded
|
||||
assert.NoError(suite.T(), downloader.GetError(), "No error should be recorded on success")
|
||||
}
|
||||
|
||||
// ==================== DownloadIndexFile Tests ====================
|
||||
@ -279,20 +309,35 @@ func (suite *CodexIndexDownloaderTestSuite) TestDownloadIndexFile_StoresFileCorr
|
||||
// Start download
|
||||
downloader.DownloadIndexFile()
|
||||
|
||||
// Wait for download to complete (check file existence and size)
|
||||
// Wait for download to complete (check bytes completed first, then file existence)
|
||||
require.Eventually(suite.T(), func() bool {
|
||||
// First check: all bytes downloaded
|
||||
if downloader.BytesCompleted() != int64(len(testData)) {
|
||||
return false
|
||||
}
|
||||
// Second check: download marked as complete (file renamed)
|
||||
if !downloader.IsDownloadComplete() {
|
||||
return false
|
||||
}
|
||||
// Third check: file actually exists with correct size
|
||||
stat, err := os.Stat(filePath)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
return stat.Size() == int64(len(testData))
|
||||
}, 2*time.Second, 50*time.Millisecond, "File should be created with correct size")
|
||||
}, 2*time.Second, 50*time.Millisecond, "File should be fully downloaded and saved")
|
||||
|
||||
// Verify file contents
|
||||
actualData, err := os.ReadFile(filePath)
|
||||
require.NoError(suite.T(), err, "Should be able to read downloaded file")
|
||||
assert.Equal(suite.T(), testData, actualData, "File contents should match")
|
||||
suite.T().Logf("✅ File downloaded successfully to: %s", filePath)
|
||||
|
||||
// Verify download is complete
|
||||
assert.True(suite.T(), downloader.IsDownloadComplete(), "Download should be complete")
|
||||
|
||||
// Verify no error was recorded
|
||||
assert.NoError(suite.T(), downloader.GetError(), "No error should be recorded on successful download")
|
||||
}
|
||||
|
||||
func (suite *CodexIndexDownloaderTestSuite) TestDownloadIndexFile_TracksProgress() {
|
||||
@ -342,6 +387,12 @@ func (suite *CodexIndexDownloaderTestSuite) TestDownloadIndexFile_TracksProgress
|
||||
|
||||
assert.Equal(suite.T(), int64(len(testData)), downloader.BytesCompleted(), "All bytes should be downloaded")
|
||||
suite.T().Logf("✅ Download progress tracked correctly: %d/%d bytes", downloader.BytesCompleted(), len(testData))
|
||||
|
||||
// Verify download is complete
|
||||
assert.True(suite.T(), downloader.IsDownloadComplete(), "Download should be complete")
|
||||
|
||||
// Verify no error was recorded
|
||||
assert.NoError(suite.T(), downloader.GetError(), "No error should be recorded on successful download")
|
||||
}
|
||||
|
||||
func (suite *CodexIndexDownloaderTestSuite) TestDownloadIndexFile_Cancellation() {
|
||||
@ -356,7 +407,7 @@ func (suite *CodexIndexDownloaderTestSuite) TestDownloadIndexFile_Cancellation()
|
||||
close(downloadStarted) // Signal that download started
|
||||
|
||||
// Simulate slow download with cancellation check
|
||||
for i := 0; i < 100; i++ {
|
||||
for range 100 {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err() // Return cancellation error
|
||||
@ -396,6 +447,20 @@ func (suite *CodexIndexDownloaderTestSuite) TestDownloadIndexFile_Cancellation()
|
||||
bytesCompleted := downloader.BytesCompleted()
|
||||
suite.T().Logf("✅ Download cancelled after %d bytes (should be < 100)", bytesCompleted)
|
||||
assert.Less(suite.T(), bytesCompleted, int64(100), "Download should be cancelled before completing all 100 bytes")
|
||||
|
||||
// Verify download is not complete
|
||||
assert.False(suite.T(), downloader.IsDownloadComplete(), "Download should not be complete on cancellation")
|
||||
|
||||
// Verify error was recorded (context cancellation)
|
||||
assert.Error(suite.T(), downloader.GetError(), "Error should be recorded for cancellation")
|
||||
assert.ErrorIs(suite.T(), downloader.GetError(), context.Canceled, "Error should be context.Canceled")
|
||||
suite.T().Log("✅ Cancellation error was recorded correctly")
|
||||
|
||||
// Verify that the target file does NOT exist (atomic write should clean up temp file on cancellation)
|
||||
_, err := os.Stat(filePath)
|
||||
assert.True(suite.T(), os.IsNotExist(err), "Target file should not exist after cancellation")
|
||||
suite.T().Log("✅ Target file does not exist after cancellation (temp file cleaned up)")
|
||||
assert.False(suite.T(), downloader.IsDownloadComplete(), "✅ IsDownloadComplete should be false after cancellation")
|
||||
}
|
||||
|
||||
func (suite *CodexIndexDownloaderTestSuite) TestDownloadIndexFile_ErrorHandling() {
|
||||
@ -420,13 +485,19 @@ func (suite *CodexIndexDownloaderTestSuite) TestDownloadIndexFile_ErrorHandling(
|
||||
assert.Equal(suite.T(), int64(0), downloader.BytesCompleted(), "No bytes should be recorded on error")
|
||||
suite.T().Log("✅ Error handling: no bytes recorded on download failure")
|
||||
|
||||
// File should exist but be empty (or not exist if creation failed)
|
||||
// This is current behavior - we might want to improve it to clean up on error
|
||||
if stat, err := os.Stat(filePath); err == nil {
|
||||
suite.T().Logf("File exists with size: %d bytes (current behavior)", stat.Size())
|
||||
} else {
|
||||
suite.T().Log("File does not exist (current behavior)")
|
||||
}
|
||||
// Verify download is not complete
|
||||
assert.False(suite.T(), downloader.IsDownloadComplete(), "Download should not be complete on error")
|
||||
|
||||
// Verify error was recorded
|
||||
assert.Error(suite.T(), downloader.GetError(), "Error should be recorded")
|
||||
assert.Contains(suite.T(), downloader.GetError().Error(), "download failed", "Error message should contain download failed")
|
||||
suite.T().Log("✅ Error was recorded correctly")
|
||||
|
||||
// Verify that the target file does NOT exist (atomic write should clean up temp file)
|
||||
_, err := os.Stat(filePath)
|
||||
assert.True(suite.T(), os.IsNotExist(err), "Target file should not exist on download error")
|
||||
suite.T().Log("✅ Target file does not exist after download error (temp file cleaned up)")
|
||||
assert.False(suite.T(), downloader.IsDownloadComplete(), "✅ IsDownloadComplete should be false after cancellation")
|
||||
}
|
||||
|
||||
func (suite *CodexIndexDownloaderTestSuite) TestLength_ReturnsDatasetSize() {
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user