diff --git a/communities/codex_archive_downloader.go b/communities/codex_archive_downloader.go index e24cb28..b922fd4 100644 --- a/communities/codex_archive_downloader.go +++ b/communities/codex_archive_downloader.go @@ -4,9 +4,9 @@ package communities import ( - "bytes" "context" "fmt" + "log" "slices" "sync" "time" @@ -43,8 +43,8 @@ type CodexArchiveDownloader struct { downloadError error cancelled bool - // Callback for signaling archive completion with raw data - onArchiveDownloaded func(hash string, from, to uint64, archiveData []byte) + // Callback for signaling archive download completion + onArchiveDownloaded func(hash string, from, to uint64) } // NewCodexArchiveDownloader creates a new archive downloader @@ -63,7 +63,7 @@ func NewCodexArchiveDownloader(codexClient *CodexClient, index *protobuf.CodexWa } // SetOnArchiveDownloaded sets a callback function to be called when an archive is successfully downloaded -func (d *CodexArchiveDownloader) SetOnArchiveDownloaded(callback func(hash string, from, to uint64, archiveData []byte)) { +func (d *CodexArchiveDownloader) SetOnArchiveDownloaded(callback func(hash string, from, to uint64)) { d.onArchiveDownloaded = callback } @@ -121,26 +121,17 @@ func (d *CodexArchiveDownloader) IsCancelled() bool { } // StartDownload begins downloading all missing archives -func (d *CodexArchiveDownloader) StartDownload() error { - if d.totalArchivesCount == 0 { - return fmt.Errorf("no archives to download") - } - go func() { - err := d.downloadAllArchives() - d.mu.Lock() - d.downloadError = err - d.downloadComplete = true - d.mu.Unlock() - }() - return nil +func (d *CodexArchiveDownloader) StartDownload() { + d.downloadAllArchives() } // downloadAllArchives handles the main download loop for all archives -func (d *CodexArchiveDownloader) downloadAllArchives() error { +func (d *CodexArchiveDownloader) downloadAllArchives() { // Create sorted list of archives (newest first, like torrent version) type archiveInfo struct { hash string from uint64 + to uint64 cid string } @@ -149,6 +140,7 @@ func (d *CodexArchiveDownloader) downloadAllArchives() error { archivesList = append(archivesList, archiveInfo{ hash: hash, from: metadata.Metadata.From, + to: metadata.Metadata.To, cid: metadata.Cid, }) } @@ -212,37 +204,63 @@ func (d *CodexArchiveDownloader) downloadAllArchives() error { d.archiveDownloadCancel[archive.hash] = archiveCancelChan d.mu.Unlock() - // Download this archive in parallel - go func(archiveHash, archiveCid string, archiveCancel chan struct{}) { - err := d.downloadSingleArchive(archiveHash, archiveCid, archiveCancel) - d.mu.Lock() - defer d.mu.Unlock() + // Trigger archive download and track progress in a goroutine + go func(archiveHash, archiveCid string, archiveFrom, archiveTo uint64, archiveCancel chan struct{}) { + err := d.triggerSingleArchiveDownload(archiveHash, archiveCid, archiveCancel) - if err != nil { - // Store the last error encountered - d.downloadError = err - } else { - // Only increment on successful download + // Update shared state with minimal lock scope + d.mu.Lock() + if err == nil { d.totalDownloadedArchivesCount++ } + d.mu.Unlock() + // poll every second until we confirm it's downloaded + // or timeout after 30 seconds + timeout := time.After(30 * time.Second) + ticker := time.NewTicker(1 * time.Second) + defer ticker.Stop() + PollLoop: + for { + select { + case <-timeout: + log.Printf("timeout waiting for CID %s to be available locally", archiveCid) + break PollLoop + case <-ticker.C: + hasCid, err := d.codexClient.HasCid(archiveCid) + if err != nil { + // Log error but continue polling + log.Printf("error checking CID %s: %v", archiveCid, err) + continue + } + if hasCid { + // CID is now available locally + break PollLoop + } + } + } + + // Update shared state with minimal lock scope + d.mu.Lock() // Remove from active downloads delete(d.archiveDownloadCancel, archiveHash) // Check if all downloads are complete - if len(d.archiveDownloadCancel) == 0 { - d.downloadComplete = true - } - }(archive.hash, archive.cid, archiveCancelChan) - } + d.downloadComplete = len(d.archiveDownloadCancel) == 0 + d.mu.Unlock() - return nil + // Call the callback outside the lock to avoid blocking other operations + if d.onArchiveDownloaded != nil { + d.onArchiveDownloaded(archiveHash, archiveFrom, archiveTo) + } + }(archive.hash, archive.cid, archive.from, archive.to, archiveCancelChan) + } } -// downloadSingleArchive downloads a single archive by its CID -func (d *CodexArchiveDownloader) downloadSingleArchive(hash, cid string, cancelChan <-chan struct{}) error { +// triggerSingleArchiveDownload downloads a single archive by its CID +func (d *CodexArchiveDownloader) triggerSingleArchiveDownload(hash, cid string, cancelChan <-chan struct{}) error { // Create a context that can be cancelled via our cancel channel - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() // Monitor for cancellation in a separate goroutine @@ -255,51 +273,14 @@ func (d *CodexArchiveDownloader) downloadSingleArchive(hash, cid string, cancelC } }() - // Download the archive data into a buffer - var archiveBuffer bytes.Buffer - progressWriter := &archiveProgressWriter{ - buffer: &archiveBuffer, - hash: hash, - progress: &d.archiveDownloadProgress, - mu: &d.mu, - } - - // Use context-aware download for immediate cancellation - err := d.codexClient.DownloadWithContext(ctx, cid, progressWriter) + manifest, err := d.codexClient.TriggerDownloadWithContext(ctx, cid) if err != nil { - return fmt.Errorf("failed to download archive data for CID %s: %w", cid, err) + return fmt.Errorf("failed to trigger archive download with CID %s: %w", cid, err) } - // Get metadata for this archive - metadata := d.index.Archives[hash] - if metadata == nil { - return fmt.Errorf("metadata not found for archive hash %s", hash) - } - - // Call the callback with the downloaded archive data - if d.onArchiveDownloaded != nil { - d.onArchiveDownloaded(hash, metadata.Metadata.From, metadata.Metadata.To, archiveBuffer.Bytes()) + if manifest.CID != cid { + return fmt.Errorf("unexpected manifest CID %s, expected %s", manifest.CID, cid) } return nil } - -// archiveProgressWriter tracks download progress and collects data for individual archives -type archiveProgressWriter struct { - buffer *bytes.Buffer - hash string - progress *map[string]int64 - mu *sync.RWMutex -} - -func (apw *archiveProgressWriter) Write(p []byte) (n int, err error) { - n = len(p) - - // Update progress tracking - apw.mu.Lock() - (*apw.progress)[apw.hash] += int64(n) - apw.mu.Unlock() - - // Write data to buffer for processing - return apw.buffer.Write(p) -}