simplifies CodexArchiveDownloader

This commit is contained in:
Marcin Czenko 2025-10-22 04:50:52 +02:00
parent 774660639b
commit 57f7b873ac
No known key found for this signature in database
GPG Key ID: A0449219BDBA98AE

View File

@ -4,9 +4,9 @@
package communities
import (
"bytes"
"context"
"fmt"
"log"
"slices"
"sync"
"time"
@ -43,8 +43,8 @@ type CodexArchiveDownloader struct {
downloadError error
cancelled bool
// Callback for signaling archive completion with raw data
onArchiveDownloaded func(hash string, from, to uint64, archiveData []byte)
// Callback for signaling archive download completion
onArchiveDownloaded func(hash string, from, to uint64)
}
// NewCodexArchiveDownloader creates a new archive downloader
@ -63,7 +63,7 @@ func NewCodexArchiveDownloader(codexClient *CodexClient, index *protobuf.CodexWa
}
// SetOnArchiveDownloaded sets a callback function to be called when an archive is successfully downloaded
func (d *CodexArchiveDownloader) SetOnArchiveDownloaded(callback func(hash string, from, to uint64, archiveData []byte)) {
func (d *CodexArchiveDownloader) SetOnArchiveDownloaded(callback func(hash string, from, to uint64)) {
d.onArchiveDownloaded = callback
}
@ -121,26 +121,17 @@ func (d *CodexArchiveDownloader) IsCancelled() bool {
}
// StartDownload begins downloading all missing archives
func (d *CodexArchiveDownloader) StartDownload() error {
if d.totalArchivesCount == 0 {
return fmt.Errorf("no archives to download")
}
go func() {
err := d.downloadAllArchives()
d.mu.Lock()
d.downloadError = err
d.downloadComplete = true
d.mu.Unlock()
}()
return nil
func (d *CodexArchiveDownloader) StartDownload() {
d.downloadAllArchives()
}
// downloadAllArchives handles the main download loop for all archives
func (d *CodexArchiveDownloader) downloadAllArchives() error {
func (d *CodexArchiveDownloader) downloadAllArchives() {
// Create sorted list of archives (newest first, like torrent version)
type archiveInfo struct {
hash string
from uint64
to uint64
cid string
}
@ -149,6 +140,7 @@ func (d *CodexArchiveDownloader) downloadAllArchives() error {
archivesList = append(archivesList, archiveInfo{
hash: hash,
from: metadata.Metadata.From,
to: metadata.Metadata.To,
cid: metadata.Cid,
})
}
@ -212,37 +204,63 @@ func (d *CodexArchiveDownloader) downloadAllArchives() error {
d.archiveDownloadCancel[archive.hash] = archiveCancelChan
d.mu.Unlock()
// Download this archive in parallel
go func(archiveHash, archiveCid string, archiveCancel chan struct{}) {
err := d.downloadSingleArchive(archiveHash, archiveCid, archiveCancel)
d.mu.Lock()
defer d.mu.Unlock()
// Trigger archive download and track progress in a goroutine
go func(archiveHash, archiveCid string, archiveFrom, archiveTo uint64, archiveCancel chan struct{}) {
err := d.triggerSingleArchiveDownload(archiveHash, archiveCid, archiveCancel)
if err != nil {
// Store the last error encountered
d.downloadError = err
} else {
// Only increment on successful download
// Update shared state with minimal lock scope
d.mu.Lock()
if err == nil {
d.totalDownloadedArchivesCount++
}
d.mu.Unlock()
// poll every second until we confirm it's downloaded
// or timeout after 30 seconds
timeout := time.After(30 * time.Second)
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
PollLoop:
for {
select {
case <-timeout:
log.Printf("timeout waiting for CID %s to be available locally", archiveCid)
break PollLoop
case <-ticker.C:
hasCid, err := d.codexClient.HasCid(archiveCid)
if err != nil {
// Log error but continue polling
log.Printf("error checking CID %s: %v", archiveCid, err)
continue
}
if hasCid {
// CID is now available locally
break PollLoop
}
}
}
// Update shared state with minimal lock scope
d.mu.Lock()
// Remove from active downloads
delete(d.archiveDownloadCancel, archiveHash)
// Check if all downloads are complete
if len(d.archiveDownloadCancel) == 0 {
d.downloadComplete = true
}
}(archive.hash, archive.cid, archiveCancelChan)
}
d.downloadComplete = len(d.archiveDownloadCancel) == 0
d.mu.Unlock()
return nil
// Call the callback outside the lock to avoid blocking other operations
if d.onArchiveDownloaded != nil {
d.onArchiveDownloaded(archiveHash, archiveFrom, archiveTo)
}
}(archive.hash, archive.cid, archive.from, archive.to, archiveCancelChan)
}
}
// downloadSingleArchive downloads a single archive by its CID
func (d *CodexArchiveDownloader) downloadSingleArchive(hash, cid string, cancelChan <-chan struct{}) error {
// triggerSingleArchiveDownload downloads a single archive by its CID
func (d *CodexArchiveDownloader) triggerSingleArchiveDownload(hash, cid string, cancelChan <-chan struct{}) error {
// Create a context that can be cancelled via our cancel channel
ctx, cancel := context.WithCancel(context.Background())
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
// Monitor for cancellation in a separate goroutine
@ -255,51 +273,14 @@ func (d *CodexArchiveDownloader) downloadSingleArchive(hash, cid string, cancelC
}
}()
// Download the archive data into a buffer
var archiveBuffer bytes.Buffer
progressWriter := &archiveProgressWriter{
buffer: &archiveBuffer,
hash: hash,
progress: &d.archiveDownloadProgress,
mu: &d.mu,
}
// Use context-aware download for immediate cancellation
err := d.codexClient.DownloadWithContext(ctx, cid, progressWriter)
manifest, err := d.codexClient.TriggerDownloadWithContext(ctx, cid)
if err != nil {
return fmt.Errorf("failed to download archive data for CID %s: %w", cid, err)
return fmt.Errorf("failed to trigger archive download with CID %s: %w", cid, err)
}
// Get metadata for this archive
metadata := d.index.Archives[hash]
if metadata == nil {
return fmt.Errorf("metadata not found for archive hash %s", hash)
}
// Call the callback with the downloaded archive data
if d.onArchiveDownloaded != nil {
d.onArchiveDownloaded(hash, metadata.Metadata.From, metadata.Metadata.To, archiveBuffer.Bytes())
if manifest.CID != cid {
return fmt.Errorf("unexpected manifest CID %s, expected %s", manifest.CID, cid)
}
return nil
}
// archiveProgressWriter tracks download progress and collects data for individual archives
type archiveProgressWriter struct {
buffer *bytes.Buffer
hash string
progress *map[string]int64
mu *sync.RWMutex
}
func (apw *archiveProgressWriter) Write(p []byte) (n int, err error) {
n = len(p)
// Update progress tracking
apw.mu.Lock()
(*apw.progress)[apw.hash] += int64(n)
apw.mu.Unlock()
// Write data to buffer for processing
return apw.buffer.Write(p)
}