Make completion signaling more consistent

This commit is contained in:
Marcin Czenko 2025-10-27 21:34:40 +01:00
parent 5b99a900d5
commit f063d25da4
No known key found for this signature in database
GPG Key ID: A0449219BDBA98AE
2 changed files with 305 additions and 15 deletions

View File

@ -200,6 +200,7 @@ func (d *CodexArchiveDownloader) downloadAllArchives() {
delete(d.archiveDownloadCancel, hash)
}
d.cancelled = true
d.downloadComplete = true // Mark as complete even on cancellation
d.mu.Unlock()
return // Exit goroutine after cancellation
case <-ticker.C:

View File

@ -2,6 +2,7 @@ package communities_test
import (
"context"
"fmt"
"sync"
"testing"
"time"
@ -18,8 +19,8 @@ import (
"go.uber.org/zap"
)
// CodexArchiveDownloaderTestifySuite demonstrates testify's suite functionality
type CodexArchiveDownloaderTestifySuite struct {
// CodexArchiveDownloaderSuite demonstrates testify's suite functionality
type CodexArchiveDownloaderSuite struct {
suite.Suite
ctrl *gomock.Controller
mockClient *mock_communities.MockCodexClientInterface
@ -27,7 +28,7 @@ type CodexArchiveDownloaderTestifySuite struct {
}
// SetupTest runs before each test method
func (suite *CodexArchiveDownloaderTestifySuite) SetupTest() {
func (suite *CodexArchiveDownloaderSuite) SetupTest() {
suite.ctrl = gomock.NewController(suite.T())
suite.mockClient = mock_communities.NewMockCodexClientInterface(suite.ctrl)
suite.index = &protobuf.CodexWakuMessageArchiveIndex{
@ -44,11 +45,16 @@ func (suite *CodexArchiveDownloaderTestifySuite) SetupTest() {
}
// TearDownTest runs after each test method
func (suite *CodexArchiveDownloaderTestifySuite) TearDownTest() {
func (suite *CodexArchiveDownloaderSuite) TearDownTest() {
suite.ctrl.Finish()
}
func (suite *CodexArchiveDownloaderTestifySuite) TestBasicSingleArchive() {
// Run the test suite
func TestCodexArchiveDownloaderSuite(t *testing.T) {
suite.Run(t, new(CodexArchiveDownloaderSuite))
}
func (suite *CodexArchiveDownloaderSuite) TestBasicSingleArchive() {
// Test data
communityID := "test-community"
existingArchiveIDs := []string{} // No existing archives
@ -114,7 +120,7 @@ func (suite *CodexArchiveDownloaderTestifySuite) TestBasicSingleArchive() {
suite.T().Logf(" - Callback invoked: %v", callbackInvoked)
}
func (suite *CodexArchiveDownloaderTestifySuite) TestMultipleArchives() {
func (suite *CodexArchiveDownloaderSuite) TestMultipleArchives() {
// Create test data with multiple archives
index := &protobuf.CodexWakuMessageArchiveIndex{
Archives: map[string]*protobuf.CodexWakuMessageArchiveIndexMetadata{
@ -221,7 +227,7 @@ func (suite *CodexArchiveDownloaderTestifySuite) TestMultipleArchives() {
suite.T().Logf(" - Start order (sorted): %v", startOrder)
}
func (suite *CodexArchiveDownloaderTestifySuite) TestErrorDuringTriggerDownload() {
func (suite *CodexArchiveDownloaderSuite) TestErrorDuringTriggerDownload() {
// Test that errors during TriggerDownloadWithContext are handled properly
communityID := "test-community"
existingArchiveIDs := []string{} // No existing archives
@ -273,7 +279,7 @@ func (suite *CodexArchiveDownloaderTestifySuite) TestErrorDuringTriggerDownload(
suite.T().Log(" - Success callback was NOT invoked")
}
func (suite *CodexArchiveDownloaderTestifySuite) TestActualCancellationDuringTriggerDownload() {
func (suite *CodexArchiveDownloaderSuite) TestActualCancellationDuringTriggerDownload() {
// Test real cancellation during TriggerDownloadWithContext using DoAndReturn
communityID := "test-community"
existingArchiveIDs := []string{} // No existing archives
@ -327,7 +333,8 @@ func (suite *CodexArchiveDownloaderTestifySuite) TestActualCancellationDuringTri
assert.True(suite.T(), startCallbackInvoked, "Start callback should be invoked")
assert.False(suite.T(), callbackInvoked, "Success callback should NOT be invoked on cancellation")
assert.Equal(suite.T(), 0, downloader.GetTotalDownloadedArchivesCount(), "No archives should be downloaded on cancellation")
assert.True(suite.T(), downloader.IsDownloadComplete(), "Download should be complete (no pending downloads)")
assert.True(suite.T(), downloader.IsDownloadComplete(), "✅ Download should be complete after cancellation")
assert.True(suite.T(), downloader.IsCancelled(), "✅ Download should be marked as cancelled")
assert.Equal(suite.T(), 0, downloader.GetPendingArchivesCount(), "No archives should be pending")
suite.T().Log("✅ Actual cancellation during trigger download test passed")
@ -336,7 +343,7 @@ func (suite *CodexArchiveDownloaderTestifySuite) TestActualCancellationDuringTri
suite.T().Log(" - Success callback was NOT invoked")
}
func (suite *CodexArchiveDownloaderTestifySuite) TestCancellationDuringPolling() {
func (suite *CodexArchiveDownloaderSuite) TestCancellationDuringPolling() {
// Test that cancellation during the polling phase is handled properly
communityID := "test-community"
existingArchiveIDs := []string{} // No existing archives
@ -394,6 +401,7 @@ func (suite *CodexArchiveDownloaderTestifySuite) TestCancellationDuringPolling()
// Verify final state
assert.False(suite.T(), successCallbackInvoked, "Success callback should NOT be invoked on cancellation")
assert.Equal(suite.T(), 0, downloader.GetPendingArchivesCount(), "No archives should be pending after cancellation")
assert.True(suite.T(), downloader.IsDownloadComplete(), "✅ Download should be complete after cancellation")
assert.True(suite.T(), downloader.IsCancelled(), "Downloader should be marked as cancelled")
suite.T().Log("✅ Cancellation during polling test passed")
@ -403,7 +411,7 @@ func (suite *CodexArchiveDownloaderTestifySuite) TestCancellationDuringPolling()
suite.T().Log(" - Download marked as cancelled")
}
func (suite *CodexArchiveDownloaderTestifySuite) TestPollingTimeout() {
func (suite *CodexArchiveDownloaderSuite) TestPollingTimeout() {
// Test that polling timeout is handled properly (no success callback)
communityID := "test-community"
existingArchiveIDs := []string{} // No existing archives
@ -461,7 +469,7 @@ func (suite *CodexArchiveDownloaderTestifySuite) TestPollingTimeout() {
suite.T().Log(" - Success callback was NOT invoked")
}
func (suite *CodexArchiveDownloaderTestifySuite) TestWithExistingArchives() {
func (suite *CodexArchiveDownloaderSuite) TestWithExistingArchives() {
// Test with some archives already downloaded (existing archive IDs)
index := &protobuf.CodexWakuMessageArchiveIndex{
Archives: map[string]*protobuf.CodexWakuMessageArchiveIndexMetadata{
@ -546,7 +554,288 @@ func (suite *CodexArchiveDownloaderTestifySuite) TestWithExistingArchives() {
suite.T().Logf(" - Final count: %d total", downloader.GetTotalDownloadedArchivesCount())
}
// Run the test suite
func TestCodexArchiveDownloaderSuite(t *testing.T) {
suite.Run(t, new(CodexArchiveDownloaderTestifySuite))
// Test case: One success, one error
func (suite *CodexArchiveDownloaderSuite) TestPartialSuccess_OneSuccessOneError() {
communityID := "test-community"
cancelChan := make(chan struct{})
defer close(cancelChan)
// 2 archives: archive-2 (newer) succeeds, archive-1 (older) fails
index := &protobuf.CodexWakuMessageArchiveIndex{
Archives: map[string]*protobuf.CodexWakuMessageArchiveIndexMetadata{
"archive-1": {
Metadata: &protobuf.WakuMessageArchiveMetadata{From: 1000, To: 2000},
Cid: "cid-1",
},
"archive-2": {
Metadata: &protobuf.WakuMessageArchiveMetadata{From: 2000, To: 3000},
Cid: "cid-2",
},
},
}
// Archive-2 succeeds
suite.mockClient.EXPECT().
TriggerDownloadWithContext(gomock.Any(), "cid-2").
Return(&communities.CodexManifest{CID: "cid-2"}, nil)
suite.mockClient.EXPECT().
HasCid("cid-2").
Return(true, nil)
// Archive-1 fails
suite.mockClient.EXPECT().
TriggerDownloadWithContext(gomock.Any(), "cid-1").
Return(nil, fmt.Errorf("trigger failed"))
logger := zap.NewNop()
downloader := communities.NewCodexArchiveDownloader(suite.mockClient, index, communityID, []string{}, cancelChan, logger)
downloader.SetPollingInterval(10 * time.Millisecond)
downloader.SetPollingTimeout(1 * time.Second)
downloader.StartDownload()
// Wait for completion
require.Eventually(suite.T(), func() bool {
return downloader.IsDownloadComplete()
}, 3*time.Second, 50*time.Millisecond)
// Assertions
assert.True(suite.T(), downloader.IsDownloadComplete(), "✅ Should be complete")
assert.False(suite.T(), downloader.IsCancelled(), "✅ Should NOT be cancelled")
assert.Equal(suite.T(), 1, downloader.GetTotalDownloadedArchivesCount(), "✅ Should have 1 successful download")
suite.T().Log("✅ Partial success test passed (1 success, 1 error)")
}
// Test case: One success, one error, one cancellation
func (suite *CodexArchiveDownloaderSuite) TestPartialSuccess_SuccessErrorCancellation() {
communityID := "test-community"
cancelChan := make(chan struct{})
// 3 archives
index := &protobuf.CodexWakuMessageArchiveIndex{
Archives: map[string]*protobuf.CodexWakuMessageArchiveIndexMetadata{
"archive-1": {
Metadata: &protobuf.WakuMessageArchiveMetadata{From: 1000, To: 2000},
Cid: "cid-1",
},
"archive-2": {
Metadata: &protobuf.WakuMessageArchiveMetadata{From: 2000, To: 3000},
Cid: "cid-2",
},
"archive-3": {
Metadata: &protobuf.WakuMessageArchiveMetadata{From: 3000, To: 4000},
Cid: "cid-3",
},
},
}
// Archive-3 (newest) succeeds
suite.mockClient.EXPECT().
TriggerDownloadWithContext(gomock.Any(), "cid-3").
Return(&communities.CodexManifest{CID: "cid-3"}, nil)
suite.mockClient.EXPECT().
HasCid("cid-3").
Return(true, nil)
// Archive-2 fails
suite.mockClient.EXPECT().
TriggerDownloadWithContext(gomock.Any(), "cid-2").
Return(nil, fmt.Errorf("trigger failed"))
// Archive-1 will be cancelled (no expectations needed)
suite.mockClient.EXPECT().
TriggerDownloadWithContext(gomock.Any(), "cid-1").
DoAndReturn(func(ctx context.Context, cid string) (*communities.CodexManifest, error) {
<-ctx.Done() // Wait for cancellation
return nil, ctx.Err()
}).
AnyTimes()
logger := zap.NewNop()
downloader := communities.NewCodexArchiveDownloader(suite.mockClient, index, communityID, []string{}, cancelChan, logger)
downloader.SetPollingInterval(10 * time.Millisecond)
downloader.SetPollingTimeout(1 * time.Second)
downloader.StartDownload()
// Wait a bit for first two to process
time.Sleep(200 * time.Millisecond)
// Now cancel
close(cancelChan)
// Wait for completion
require.Eventually(suite.T(), func() bool {
return downloader.IsDownloadComplete()
}, 3*time.Second, 50*time.Millisecond)
// Assertions
assert.True(suite.T(), downloader.IsDownloadComplete(), "✅ Should be complete")
assert.True(suite.T(), downloader.IsCancelled(), "✅ Should be cancelled")
assert.Equal(suite.T(), 1, downloader.GetTotalDownloadedArchivesCount(), "✅ Should have 1 successful download")
suite.T().Log("✅ Partial success test passed (1 success, 1 error, 1 cancellation)")
}
// Test case: One success, then cancellation
func (suite *CodexArchiveDownloaderSuite) TestPartialSuccess_SuccessThenCancellation() {
communityID := "test-community"
cancelChan := make(chan struct{})
// 2 archives
index := &protobuf.CodexWakuMessageArchiveIndex{
Archives: map[string]*protobuf.CodexWakuMessageArchiveIndexMetadata{
"archive-1": {
Metadata: &protobuf.WakuMessageArchiveMetadata{From: 1000, To: 2000},
Cid: "cid-1",
},
"archive-2": {
Metadata: &protobuf.WakuMessageArchiveMetadata{From: 2000, To: 3000},
Cid: "cid-2",
},
},
}
// Archive-2 (newer) succeeds
suite.mockClient.EXPECT().
TriggerDownloadWithContext(gomock.Any(), "cid-2").
Return(&communities.CodexManifest{CID: "cid-2"}, nil)
suite.mockClient.EXPECT().
HasCid("cid-2").
Return(true, nil)
// Archive-1 will be cancelled
suite.mockClient.EXPECT().
TriggerDownloadWithContext(gomock.Any(), "cid-1").
DoAndReturn(func(ctx context.Context, cid string) (*communities.CodexManifest, error) {
<-ctx.Done() // Wait for cancellation
return nil, ctx.Err()
}).
AnyTimes()
logger := zap.NewNop()
downloader := communities.NewCodexArchiveDownloader(suite.mockClient, index, communityID, []string{}, cancelChan, logger)
downloader.SetPollingInterval(10 * time.Millisecond)
downloader.SetPollingTimeout(1 * time.Second)
downloader.StartDownload()
// Wait for first archive to complete
time.Sleep(200 * time.Millisecond)
// Now cancel
close(cancelChan)
// Wait for completion
require.Eventually(suite.T(), func() bool {
return downloader.IsDownloadComplete()
}, 3*time.Second, 50*time.Millisecond)
// Assertions
assert.True(suite.T(), downloader.IsDownloadComplete(), "✅ Should be complete")
assert.True(suite.T(), downloader.IsCancelled(), "✅ Should be cancelled")
assert.Equal(suite.T(), 1, downloader.GetTotalDownloadedArchivesCount(), "✅ Should have 1 successful download")
suite.T().Log("✅ Success then cancellation test passed")
}
// Test case: No success, only cancellation
func (suite *CodexArchiveDownloaderSuite) TestNoSuccess_OnlyCancellation() {
communityID := "test-community"
cancelChan := make(chan struct{})
// 2 archives
index := &protobuf.CodexWakuMessageArchiveIndex{
Archives: map[string]*protobuf.CodexWakuMessageArchiveIndexMetadata{
"archive-1": {
Metadata: &protobuf.WakuMessageArchiveMetadata{From: 1000, To: 2000},
Cid: "cid-1",
},
"archive-2": {
Metadata: &protobuf.WakuMessageArchiveMetadata{From: 2000, To: 3000},
Cid: "cid-2",
},
},
}
// Both archives will be cancelled
suite.mockClient.EXPECT().
TriggerDownloadWithContext(gomock.Any(), gomock.Any()).
DoAndReturn(func(ctx context.Context, cid string) (*communities.CodexManifest, error) {
<-ctx.Done() // Wait for cancellation
return nil, ctx.Err()
}).
AnyTimes()
logger := zap.NewNop()
downloader := communities.NewCodexArchiveDownloader(suite.mockClient, index, communityID, []string{}, cancelChan, logger)
downloader.SetPollingInterval(10 * time.Millisecond)
downloader.SetPollingTimeout(1 * time.Second)
downloader.StartDownload()
// Cancel immediately
time.Sleep(50 * time.Millisecond)
close(cancelChan)
// Wait for completion
require.Eventually(suite.T(), func() bool {
return downloader.IsDownloadComplete()
}, 3*time.Second, 50*time.Millisecond)
// Assertions
assert.True(suite.T(), downloader.IsDownloadComplete(), "✅ Should be complete")
assert.True(suite.T(), downloader.IsCancelled(), "✅ Should be cancelled")
assert.Equal(suite.T(), 0, downloader.GetTotalDownloadedArchivesCount(), "✅ Should have 0 successful downloads")
suite.T().Log("✅ Only cancellation test passed (no successful downloads)")
}
// Test case: No success, only errors
func (suite *CodexArchiveDownloaderSuite) TestNoSuccess_OnlyErrors() {
communityID := "test-community"
cancelChan := make(chan struct{})
defer close(cancelChan)
// 2 archives
index := &protobuf.CodexWakuMessageArchiveIndex{
Archives: map[string]*protobuf.CodexWakuMessageArchiveIndexMetadata{
"archive-1": {
Metadata: &protobuf.WakuMessageArchiveMetadata{From: 1000, To: 2000},
Cid: "cid-1",
},
"archive-2": {
Metadata: &protobuf.WakuMessageArchiveMetadata{From: 2000, To: 3000},
Cid: "cid-2",
},
},
}
// Both archives fail
suite.mockClient.EXPECT().
TriggerDownloadWithContext(gomock.Any(), "cid-1").
Return(nil, fmt.Errorf("trigger failed for cid-1"))
suite.mockClient.EXPECT().
TriggerDownloadWithContext(gomock.Any(), "cid-2").
Return(nil, fmt.Errorf("trigger failed for cid-2"))
logger := zap.NewNop()
downloader := communities.NewCodexArchiveDownloader(suite.mockClient, index, communityID, []string{}, cancelChan, logger)
downloader.SetPollingInterval(10 * time.Millisecond)
downloader.SetPollingTimeout(1 * time.Second)
downloader.StartDownload()
// Wait for completion
require.Eventually(suite.T(), func() bool {
return downloader.IsDownloadComplete()
}, 3*time.Second, 50*time.Millisecond)
// Assertions
assert.True(suite.T(), downloader.IsDownloadComplete(), "✅ Should be complete")
assert.False(suite.T(), downloader.IsCancelled(), "✅ Should NOT be cancelled")
assert.Equal(suite.T(), 0, downloader.GetTotalDownloadedArchivesCount(), "✅ Should have 0 successful downloads")
suite.T().Log("✅ Only errors test passed (no successful downloads)")
}