//go:build !disable_torrent // +build !disable_torrent package communities import ( "bytes" "context" "fmt" "slices" "sync" "time" "go-codex-client/protobuf" ) // CodexArchiveProcessor handles processing of downloaded archive data type CodexArchiveProcessor interface { // ProcessArchiveData processes the raw archive data and returns any error // The processor is responsible for extracting messages, handling them, // and saving the archive ID to persistence ProcessArchiveData(communityID string, archiveHash string, archiveData []byte, from, to uint64) error } // CodexArchiveDownloader handles downloading individual archive files from Codex storage type CodexArchiveDownloader struct { codexClient *CodexClient index *protobuf.CodexWakuMessageArchiveIndex communityID string existingArchiveIDs []string cancelChan chan struct{} // for cancellation support // Progress tracking totalArchivesCount int totalDownloadedArchivesCount int currentArchiveHash string archiveDownloadProgress map[string]int64 // hash -> bytes downloaded archiveDownloadCancel map[string]chan struct{} mu sync.RWMutex // Download control downloadComplete bool downloadError error cancelled bool // Callback for signaling archive completion with raw data onArchiveDownloaded func(hash string, from, to uint64, archiveData []byte) } // NewCodexArchiveDownloader creates a new archive downloader func NewCodexArchiveDownloader(codexClient *CodexClient, index *protobuf.CodexWakuMessageArchiveIndex, communityID string, existingArchiveIDs []string, cancelChan chan struct{}) *CodexArchiveDownloader { return &CodexArchiveDownloader{ codexClient: codexClient, index: index, communityID: communityID, existingArchiveIDs: existingArchiveIDs, cancelChan: cancelChan, totalArchivesCount: len(index.Archives), totalDownloadedArchivesCount: len(existingArchiveIDs), archiveDownloadProgress: make(map[string]int64), archiveDownloadCancel: make(map[string]chan struct{}), } } // SetOnArchiveDownloaded sets a callback function to be called when an archive is successfully downloaded func (d *CodexArchiveDownloader) SetOnArchiveDownloaded(callback func(hash string, from, to uint64, archiveData []byte)) { d.onArchiveDownloaded = callback } // GetTotalArchivesCount returns the total number of archives to download func (d *CodexArchiveDownloader) GetTotalArchivesCount() int { return d.totalArchivesCount } // GetTotalDownloadedArchivesCount returns the number of archives already downloaded func (d *CodexArchiveDownloader) GetTotalDownloadedArchivesCount() int { d.mu.RLock() defer d.mu.RUnlock() return d.totalDownloadedArchivesCount } func (d *CodexArchiveDownloader) GetPendingArchivesCount() int { d.mu.RLock() defer d.mu.RUnlock() return len(d.archiveDownloadCancel) } // GetCurrentArchiveHash returns the hash of the currently downloading archive func (d *CodexArchiveDownloader) GetCurrentArchiveHash() string { d.mu.RLock() defer d.mu.RUnlock() return d.currentArchiveHash } // GetArchiveDownloadProgress returns the download progress for a specific archive func (d *CodexArchiveDownloader) GetArchiveDownloadProgress(hash string) int64 { d.mu.RLock() defer d.mu.RUnlock() return d.archiveDownloadProgress[hash] } // IsDownloadComplete returns whether all archives have been downloaded func (d *CodexArchiveDownloader) IsDownloadComplete() bool { d.mu.RLock() defer d.mu.RUnlock() return d.downloadComplete } // GetDownloadError returns any error that occurred during download func (d *CodexArchiveDownloader) GetDownloadError() error { d.mu.RLock() defer d.mu.RUnlock() return d.downloadError } // IsCancelled returns whether the download was cancelled func (d *CodexArchiveDownloader) IsCancelled() bool { d.mu.RLock() defer d.mu.RUnlock() return d.cancelled } // StartDownload begins downloading all missing archives func (d *CodexArchiveDownloader) StartDownload() error { if d.totalArchivesCount == 0 { return fmt.Errorf("no archives to download") } go func() { err := d.downloadAllArchives() d.mu.Lock() d.downloadError = err d.downloadComplete = true d.mu.Unlock() }() return nil } // downloadAllArchives handles the main download loop for all archives func (d *CodexArchiveDownloader) downloadAllArchives() error { // Create sorted list of archives (newest first, like torrent version) type archiveInfo struct { hash string from uint64 cid string } var archivesList []archiveInfo for hash, metadata := range d.index.Archives { archivesList = append(archivesList, archiveInfo{ hash: hash, from: metadata.Metadata.From, cid: metadata.Cid, }) } // Sort by timestamp (newest first) - same as torrent version for i := 0; i < len(archivesList)-1; i++ { for j := i + 1; j < len(archivesList); j++ { if archivesList[i].from < archivesList[j].from { archivesList[i], archivesList[j] = archivesList[j], archivesList[i] } } } // Monitor for cancellation in a separate goroutine go func() { ticker := time.NewTicker(100 * time.Millisecond) defer ticker.Stop() for { select { case <-d.cancelChan: d.mu.Lock() for hash, cancelChan := range d.archiveDownloadCancel { select { case <-cancelChan: // Already closed default: close(cancelChan) // Safe to close } delete(d.archiveDownloadCancel, hash) } d.cancelled = true d.mu.Unlock() return // Exit goroutine after cancellation case <-ticker.C: // Check if downloads are complete d.mu.RLock() complete := d.downloadComplete d.mu.RUnlock() if complete { return // Exit goroutine when downloads complete } } } }() // Download each missing archive for _, archive := range archivesList { // Check if we already have this archive hasArchive := slices.Contains(d.existingArchiveIDs, archive.hash) if hasArchive { continue } archiveCancelChan := make(chan struct{}) d.mu.Lock() d.currentArchiveHash = archive.hash d.archiveDownloadProgress[archive.hash] = 0 d.archiveDownloadCancel[archive.hash] = archiveCancelChan d.mu.Unlock() // Download this archive in parallel go func(archiveHash, archiveCid string, archiveCancel chan struct{}) { err := d.downloadSingleArchive(archiveHash, archiveCid, archiveCancel) d.mu.Lock() defer d.mu.Unlock() if err != nil { // Store the last error encountered d.downloadError = err } else { // Only increment on successful download d.totalDownloadedArchivesCount++ } // Remove from active downloads delete(d.archiveDownloadCancel, archiveHash) // Check if all downloads are complete if len(d.archiveDownloadCancel) == 0 { d.downloadComplete = true } }(archive.hash, archive.cid, archiveCancelChan) } return nil } // downloadSingleArchive downloads a single archive by its CID func (d *CodexArchiveDownloader) downloadSingleArchive(hash, cid string, cancelChan <-chan struct{}) error { // Create a context that can be cancelled via our cancel channel ctx, cancel := context.WithCancel(context.Background()) defer cancel() // Monitor for cancellation in a separate goroutine go func() { select { case <-cancelChan: cancel() // Cancel the download immediately case <-ctx.Done(): // Context already cancelled, nothing to do } }() // Download the archive data into a buffer var archiveBuffer bytes.Buffer progressWriter := &archiveProgressWriter{ buffer: &archiveBuffer, hash: hash, progress: &d.archiveDownloadProgress, mu: &d.mu, } // Use context-aware download for immediate cancellation err := d.codexClient.DownloadWithContext(ctx, cid, progressWriter) if err != nil { return fmt.Errorf("failed to download archive data for CID %s: %w", cid, err) } // Get metadata for this archive metadata := d.index.Archives[hash] if metadata == nil { return fmt.Errorf("metadata not found for archive hash %s", hash) } // Call the callback with the downloaded archive data if d.onArchiveDownloaded != nil { d.onArchiveDownloaded(hash, metadata.Metadata.From, metadata.Metadata.To, archiveBuffer.Bytes()) } return nil } // archiveProgressWriter tracks download progress and collects data for individual archives type archiveProgressWriter struct { buffer *bytes.Buffer hash string progress *map[string]int64 mu *sync.RWMutex } func (apw *archiveProgressWriter) Write(p []byte) (n int, err error) { n = len(p) // Update progress tracking apw.mu.Lock() (*apw.progress)[apw.hash] += int64(n) apw.mu.Unlock() // Write data to buffer for processing return apw.buffer.Write(p) }