From f3eed58c7894a7ba79f7eace6de9160babc2ef7a Mon Sep 17 00:00:00 2001 From: Andrey Bocharnikov Date: Wed, 4 Dec 2024 23:26:53 +0700 Subject: [PATCH] fix(healthmanager)_: extract subscriber logic from RPC Health Manager (#6147) - Subscription common logic is extracted to a separate type. - Fix race condition where a goroutine extracts value from sync.Map and then another goroutine calls unsubscribe and closes the channel before the first goroutine writes to the channel. - Moved TestInterleavedChainStatusChanges and TestDelayedChainUpdate to the correct file. - Renamed test suites with duplicate names. updates CODEOWNERS closes #6139 Co-authored-by: Igor Sirotin --- .github/CODEOWNERS | 1 + healthmanager/blockchain_health_manager.go | 34 ++---- .../blockchain_health_manager_test.go | 96 ++++++++++++++--- healthmanager/providers_health_manager.go | 46 +++----- .../providers_health_manager_test.go | 78 +------------- healthmanager/subscription_manager.go | 52 +++++++++ healthmanager/subscription_manager_test.go | 101 ++++++++++++++++++ rpc/chain/blockchain_health_test.go | 28 ++--- 8 files changed, 279 insertions(+), 157 deletions(-) create mode 100644 healthmanager/subscription_manager.go create mode 100644 healthmanager/subscription_manager_test.go diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 2310d23cf..7c79679d5 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -14,3 +14,4 @@ shell.nix @status-im/devops # Feel free to add yourself for any new packages you implement. /cmd/status-backend @igor-sirotin /internal/sentry @igor-sirotin +/healthmanager @friofry \ No newline at end of file diff --git a/healthmanager/blockchain_health_manager.go b/healthmanager/blockchain_health_manager.go index 234ac32d2..4f5a68696 100644 --- a/healthmanager/blockchain_health_manager.go +++ b/healthmanager/blockchain_health_manager.go @@ -22,11 +22,10 @@ type BlockchainStatus struct { StatusPerChain map[uint64]rpcstatus.ProviderStatus `json:"statusPerChain"` } -// BlockchainHealthManager manages the state of all providers and aggregates their statuses. type BlockchainHealthManager struct { - mu sync.RWMutex - aggregator *aggregator.Aggregator - subscribers sync.Map // thread-safe + mu sync.RWMutex + aggregator *aggregator.Aggregator + subscriptionManager *SubscriptionManager providers map[uint64]*ProvidersHealthManager cancelFuncs map[uint64]context.CancelFunc // Map chainID to cancel functions @@ -38,9 +37,10 @@ type BlockchainHealthManager struct { func NewBlockchainHealthManager() *BlockchainHealthManager { agg := aggregator.NewAggregator("blockchain") return &BlockchainHealthManager{ - aggregator: agg, - providers: make(map[uint64]*ProvidersHealthManager), - cancelFuncs: make(map[uint64]context.CancelFunc), + aggregator: agg, + providers: make(map[uint64]*ProvidersHealthManager), + cancelFuncs: make(map[uint64]context.CancelFunc), + subscriptionManager: &SubscriptionManager{subscribers: make(map[chan struct{}]struct{})}, } } @@ -109,15 +109,12 @@ func (b *BlockchainHealthManager) Stop() { // Subscribe allows clients to receive notifications about changes. func (b *BlockchainHealthManager) Subscribe() chan struct{} { - ch := make(chan struct{}, 1) - b.subscribers.Store(ch, struct{}{}) - return ch + return b.subscriptionManager.Subscribe() } // Unsubscribe removes a subscriber from receiving notifications. func (b *BlockchainHealthManager) Unsubscribe(ch chan struct{}) { - b.subscribers.Delete(ch) // Удаляем подписчика из sync.Map - close(ch) + b.subscriptionManager.Unsubscribe(ch) } // aggregateAndUpdateStatus collects statuses from all providers and updates the overall and short status. @@ -185,18 +182,7 @@ func compareShortStatus(newStatus, previousStatus BlockchainStatus) bool { // emitBlockchainHealthStatus sends a notification to all subscribers about the new blockchain status. func (b *BlockchainHealthManager) emitBlockchainHealthStatus(ctx context.Context) { - b.subscribers.Range(func(key, value interface{}) bool { - subscriber := key.(chan struct{}) - select { - case <-ctx.Done(): - // Stop sending notifications when the context is cancelled - return false - case subscriber <- struct{}{}: - default: - // Skip notification if the subscriber's channel is full (non-blocking) - } - return true - }) + b.subscriptionManager.Emit(ctx) } func (b *BlockchainHealthManager) GetFullStatus() BlockchainFullStatus { diff --git a/healthmanager/blockchain_health_manager_test.go b/healthmanager/blockchain_health_manager_test.go index 88fdc447c..9c5c249de 100644 --- a/healthmanager/blockchain_health_manager_test.go +++ b/healthmanager/blockchain_health_manager_test.go @@ -113,17 +113,9 @@ func (s *BlockchainHealthManagerSuite) TestConcurrentSubscriptionUnsubscription( } wg.Wait() - - activeSubscribersCount := 0 - s.manager.subscribers.Range(func(key, value interface{}) bool { - activeSubscribersCount++ - return true - }) - // After all subscribers are removed, there should be no active subscribers - s.Equal(0, activeSubscribersCount, "Expected no subscribers after unsubscription") + s.Equal(0, len(s.manager.subscriptionManager.subscribers), "Expected no subscribers after unsubscription") } - func (s *BlockchainHealthManagerSuite) TestConcurrency() { var wg sync.WaitGroup chainsCount := 10 @@ -137,6 +129,7 @@ func (s *BlockchainHealthManagerSuite) TestConcurrency() { } ch := s.manager.Subscribe() + defer s.manager.Unsubscribe(ch) for i := 1; i <= chainsCount; i++ { wg.Add(1) @@ -144,14 +137,19 @@ func (s *BlockchainHealthManagerSuite) TestConcurrency() { defer wg.Done() phm := s.manager.providers[chainID] for j := 0; j < providersCount; j++ { + wg.Add(1) err := errors.New("connection error") if j == providersCount-1 { err = nil } name := fmt.Sprintf("provider-%d", j) - go phm.Update(ctx, []rpcstatus.RpcProviderCallStatus{ - {Name: name, Timestamp: time.Now(), Err: err}, - }) + + go func(name string, err error) { + defer wg.Done() + phm.Update(ctx, []rpcstatus.RpcProviderCallStatus{ + {Name: name, Timestamp: time.Now(), Err: err}, + }) + }(name, err) } }(uint64(i)) } @@ -232,6 +230,80 @@ func (s *BlockchainHealthManagerSuite) TestMixedProviderStatusInSingleChain() { s.Equal(rpcstatus.StatusUp, shortStatus.StatusPerChain[1].Status) // Chain 1 should be marked as down } +func (s *BlockchainHealthManagerSuite) TestInterleavedChainStatusChanges() { + // Register providers for chains 1, 2, and 3 + phm1 := NewProvidersHealthManager(1) + phm2 := NewProvidersHealthManager(2) + phm3 := NewProvidersHealthManager(3) + err := s.manager.RegisterProvidersHealthManager(s.ctx, phm1) + s.Require().NoError(err) + err = s.manager.RegisterProvidersHealthManager(s.ctx, phm2) + s.Require().NoError(err) + err = s.manager.RegisterProvidersHealthManager(s.ctx, phm3) + s.Require().NoError(err) + + // Subscribe to status updates + ch := s.manager.Subscribe() + + defer s.manager.Unsubscribe(ch) + + // Initially, all chains are up + phm1.Update(s.ctx, []rpcstatus.RpcProviderCallStatus{{Name: "provider_chain1", Timestamp: time.Now(), Err: nil}}) + phm2.Update(s.ctx, []rpcstatus.RpcProviderCallStatus{{Name: "provider_chain2", Timestamp: time.Now(), Err: nil}}) + phm3.Update(s.ctx, []rpcstatus.RpcProviderCallStatus{{Name: "provider_chain3", Timestamp: time.Now(), Err: nil}}) + + // Wait for the status to propagate + s.waitForUpdate(ch, rpcstatus.StatusUp, 100*time.Millisecond) + + // Now chain 1 goes down, and chain 3 goes down at the same time + phm1.Update(s.ctx, []rpcstatus.RpcProviderCallStatus{{Name: "provider_chain1", Timestamp: time.Now(), Err: errors.New("connection error")}}) + phm3.Update(s.ctx, []rpcstatus.RpcProviderCallStatus{{Name: "provider_chain3", Timestamp: time.Now(), Err: errors.New("connection error")}}) + + // Wait for the status to reflect the changes + s.waitForUpdate(ch, rpcstatus.StatusUp, 100*time.Millisecond) + + // Check that short status correctly reflects the mixed state + shortStatus := s.manager.GetStatusPerChain() + s.Equal(rpcstatus.StatusUp, shortStatus.Status.Status) + s.Equal(rpcstatus.StatusDown, shortStatus.StatusPerChain[1].Status) // Chain 1 is down + s.Equal(rpcstatus.StatusUp, shortStatus.StatusPerChain[2].Status) // Chain 2 is still up + s.Equal(rpcstatus.StatusDown, shortStatus.StatusPerChain[3].Status) // Chain 3 is down +} + +func (s *BlockchainHealthManagerSuite) TestDelayedChainUpdate() { + // Register providers for chains 1 and 2 + phm1 := NewProvidersHealthManager(1) + phm2 := NewProvidersHealthManager(2) + err := s.manager.RegisterProvidersHealthManager(s.ctx, phm1) + s.Require().NoError(err) + err = s.manager.RegisterProvidersHealthManager(s.ctx, phm2) + s.Require().NoError(err) + + // Subscribe to status updates + ch := s.manager.Subscribe() + defer s.manager.Unsubscribe(ch) + + // Initially, both chains are up + phm1.Update(s.ctx, []rpcstatus.RpcProviderCallStatus{{Name: "provider1_chain1", Timestamp: time.Now(), Err: nil}}) + s.waitForUpdate(ch, rpcstatus.StatusUp, 100*time.Millisecond) + phm2.Update(s.ctx, []rpcstatus.RpcProviderCallStatus{{Name: "provider1_chain2", Timestamp: time.Now(), Err: nil}}) + s.waitForUpdate(ch, rpcstatus.StatusUp, 100*time.Millisecond) + + // Chain 2 goes down + phm2.Update(s.ctx, []rpcstatus.RpcProviderCallStatus{{Name: "provider1_chain2", Timestamp: time.Now(), Err: errors.New("connection error")}}) + s.waitForUpdate(ch, rpcstatus.StatusUp, 100*time.Millisecond) + + // Chain 1 goes down after a delay + phm1.Update(s.ctx, []rpcstatus.RpcProviderCallStatus{{Name: "provider1_chain1", Timestamp: time.Now(), Err: errors.New("connection error")}}) + s.waitForUpdate(ch, rpcstatus.StatusDown, 100*time.Millisecond) + + // Check that short status reflects the final state where both chains are down + shortStatus := s.manager.GetStatusPerChain() + s.Equal(rpcstatus.StatusDown, shortStatus.Status.Status) + s.Equal(rpcstatus.StatusDown, shortStatus.StatusPerChain[1].Status) // Chain 1 is down + s.Equal(rpcstatus.StatusDown, shortStatus.StatusPerChain[2].Status) // Chain 2 is down +} + func TestBlockchainHealthManagerSuite(t *testing.T) { suite.Run(t, new(BlockchainHealthManagerSuite)) } diff --git a/healthmanager/providers_health_manager.go b/healthmanager/providers_health_manager.go index 4d1c340e9..052876c7b 100644 --- a/healthmanager/providers_health_manager.go +++ b/healthmanager/providers_health_manager.go @@ -10,11 +10,11 @@ import ( ) type ProvidersHealthManager struct { - mu sync.RWMutex - chainID uint64 - aggregator *aggregator.Aggregator - subscribers sync.Map // Use sync.Map for concurrent access to subscribers - lastStatus *rpcstatus.ProviderStatus + mu sync.RWMutex + chainID uint64 + aggregator *aggregator.Aggregator + subscriptionManager *SubscriptionManager + lastStatus *rpcstatus.ProviderStatus } // NewProvidersHealthManager creates a new instance of ProvidersHealthManager with the given chain ID. @@ -22,8 +22,9 @@ func NewProvidersHealthManager(chainID uint64) *ProvidersHealthManager { agg := aggregator.NewAggregator(fmt.Sprintf("%d", chainID)) return &ProvidersHealthManager{ - chainID: chainID, - aggregator: agg, + chainID: chainID, + aggregator: agg, + subscriptionManager: NewSubscriptionManager(), } } @@ -61,25 +62,12 @@ func (p *ProvidersHealthManager) GetStatuses() map[string]rpcstatus.ProviderStat // Subscribe allows providers to receive notifications about changes. func (p *ProvidersHealthManager) Subscribe() chan struct{} { - ch := make(chan struct{}, 1) - p.subscribers.Store(ch, struct{}{}) - return ch + return p.subscriptionManager.Subscribe() } // Unsubscribe removes a subscriber from receiving notifications. func (p *ProvidersHealthManager) Unsubscribe(ch chan struct{}) { - p.subscribers.Delete(ch) - close(ch) -} - -// UnsubscribeAll removes all subscriber channels. -func (p *ProvidersHealthManager) UnsubscribeAll() { - p.subscribers.Range(func(key, value interface{}) bool { - ch := key.(chan struct{}) - close(ch) - p.subscribers.Delete(key) - return true - }) + p.subscriptionManager.Unsubscribe(ch) } // Reset clears all provider statuses and resets the chain status to unknown. @@ -89,7 +77,7 @@ func (p *ProvidersHealthManager) Reset() { p.aggregator = aggregator.NewAggregator(fmt.Sprintf("%d", p.chainID)) } -// Status Returns the current aggregated status. +// Status returns the current aggregated status. func (p *ProvidersHealthManager) Status() rpcstatus.ProviderStatus { p.mu.RLock() defer p.mu.RUnlock() @@ -103,15 +91,5 @@ func (p *ProvidersHealthManager) ChainID() uint64 { // emitChainStatus sends a notification to all subscribers. func (p *ProvidersHealthManager) emitChainStatus(ctx context.Context) { - p.subscribers.Range(func(key, value interface{}) bool { - subscriber := key.(chan struct{}) - select { - case subscriber <- struct{}{}: - case <-ctx.Done(): - return false // Stop sending if context is done - default: - // Non-blocking send; skip if the channel is full - } - return true - }) + p.subscriptionManager.Emit(ctx) } diff --git a/healthmanager/providers_health_manager_test.go b/healthmanager/providers_health_manager_test.go index c40e3228f..f9450a897 100644 --- a/healthmanager/providers_health_manager_test.go +++ b/healthmanager/providers_health_manager_test.go @@ -62,7 +62,9 @@ func (s *ProvidersHealthManagerSuite) TestInitialStatus() { } func (s *ProvidersHealthManagerSuite) TestUpdateProviderStatuses() { - s.updateAndWait(s.phm.Subscribe(), []rpcstatus.RpcProviderCallStatus{ + ch := s.phm.Subscribe() + defer s.phm.Unsubscribe(ch) + s.updateAndWait(ch, []rpcstatus.RpcProviderCallStatus{ {Name: "Provider1", Timestamp: time.Now(), Err: nil}, {Name: "Provider2", Timestamp: time.Now(), Err: errors.New("connection error")}, }, rpcstatus.StatusUp, time.Second) @@ -75,6 +77,7 @@ func (s *ProvidersHealthManagerSuite) TestUpdateProviderStatuses() { func (s *ProvidersHealthManagerSuite) TestChainStatusUpdatesOnce() { ch := s.phm.Subscribe() + defer s.phm.Unsubscribe(ch) s.assertChainStatus(rpcstatus.StatusDown) // Update providers to Down @@ -88,6 +91,7 @@ func (s *ProvidersHealthManagerSuite) TestChainStatusUpdatesOnce() { func (s *ProvidersHealthManagerSuite) TestSubscribeReceivesOnlyOnChange() { ch := s.phm.Subscribe() + defer s.phm.Unsubscribe(ch) // Update provider to Up and wait for notification upStatuses := []rpcstatus.RpcProviderCallStatus{ @@ -136,78 +140,6 @@ func (s *ProvidersHealthManagerSuite) TestConcurrency() { s.Equal(chainStatus, rpcstatus.StatusUp, "Expected chain status to be either Up or Down") } -func (s *BlockchainHealthManagerSuite) TestInterleavedChainStatusChanges() { - // Register providers for chains 1, 2, and 3 - phm1 := NewProvidersHealthManager(1) - phm2 := NewProvidersHealthManager(2) - phm3 := NewProvidersHealthManager(3) - err := s.manager.RegisterProvidersHealthManager(s.ctx, phm1) - s.Require().NoError(err) - err = s.manager.RegisterProvidersHealthManager(s.ctx, phm2) - s.Require().NoError(err) - err = s.manager.RegisterProvidersHealthManager(s.ctx, phm3) - s.Require().NoError(err) - - // Subscribe to status updates - ch := s.manager.Subscribe() - defer s.manager.Unsubscribe(ch) - - // Initially, all chains are up - phm1.Update(s.ctx, []rpcstatus.RpcProviderCallStatus{{Name: "provider_chain1", Timestamp: time.Now(), Err: nil}}) - phm2.Update(s.ctx, []rpcstatus.RpcProviderCallStatus{{Name: "provider_chain2", Timestamp: time.Now(), Err: nil}}) - phm3.Update(s.ctx, []rpcstatus.RpcProviderCallStatus{{Name: "provider_chain3", Timestamp: time.Now(), Err: nil}}) - - // Wait for the status to propagate - s.waitForUpdate(ch, rpcstatus.StatusUp, 100*time.Millisecond) - - // Now chain 1 goes down, and chain 3 goes down at the same time - phm1.Update(s.ctx, []rpcstatus.RpcProviderCallStatus{{Name: "provider_chain1", Timestamp: time.Now(), Err: errors.New("connection error")}}) - phm3.Update(s.ctx, []rpcstatus.RpcProviderCallStatus{{Name: "provider_chain3", Timestamp: time.Now(), Err: errors.New("connection error")}}) - - // Wait for the status to reflect the changes - s.waitForUpdate(ch, rpcstatus.StatusUp, 100*time.Millisecond) - - // Check that short status correctly reflects the mixed state - shortStatus := s.manager.GetStatusPerChain() - s.Equal(rpcstatus.StatusUp, shortStatus.Status.Status) - s.Equal(rpcstatus.StatusDown, shortStatus.StatusPerChain[1].Status) // Chain 1 is down - s.Equal(rpcstatus.StatusUp, shortStatus.StatusPerChain[2].Status) // Chain 2 is still up - s.Equal(rpcstatus.StatusDown, shortStatus.StatusPerChain[3].Status) // Chain 3 is down -} - -func (s *BlockchainHealthManagerSuite) TestDelayedChainUpdate() { - // Register providers for chains 1 and 2 - phm1 := NewProvidersHealthManager(1) - phm2 := NewProvidersHealthManager(2) - err := s.manager.RegisterProvidersHealthManager(s.ctx, phm1) - s.Require().NoError(err) - err = s.manager.RegisterProvidersHealthManager(s.ctx, phm2) - s.Require().NoError(err) - - // Subscribe to status updates - ch := s.manager.Subscribe() - defer s.manager.Unsubscribe(ch) - - // Initially, both chains are up - phm1.Update(s.ctx, []rpcstatus.RpcProviderCallStatus{{Name: "provider1_chain1", Timestamp: time.Now(), Err: nil}}) - phm2.Update(s.ctx, []rpcstatus.RpcProviderCallStatus{{Name: "provider1_chain2", Timestamp: time.Now(), Err: nil}}) - s.waitForUpdate(ch, rpcstatus.StatusUp, 100*time.Millisecond) - - // Chain 2 goes down - phm2.Update(s.ctx, []rpcstatus.RpcProviderCallStatus{{Name: "provider1_chain2", Timestamp: time.Now(), Err: errors.New("connection error")}}) - s.waitForUpdate(ch, rpcstatus.StatusUp, 100*time.Millisecond) - - // Chain 1 goes down after a delay - phm1.Update(s.ctx, []rpcstatus.RpcProviderCallStatus{{Name: "provider1_chain1", Timestamp: time.Now(), Err: errors.New("connection error")}}) - s.waitForUpdate(ch, rpcstatus.StatusDown, 100*time.Millisecond) - - // Check that short status reflects the final state where both chains are down - shortStatus := s.manager.GetStatusPerChain() - s.Equal(rpcstatus.StatusDown, shortStatus.Status.Status) - s.Equal(rpcstatus.StatusDown, shortStatus.StatusPerChain[1].Status) // Chain 1 is down - s.Equal(rpcstatus.StatusDown, shortStatus.StatusPerChain[2].Status) // Chain 2 is down -} - func TestProvidersHealthManagerSuite(t *testing.T) { suite.Run(t, new(ProvidersHealthManagerSuite)) } diff --git a/healthmanager/subscription_manager.go b/healthmanager/subscription_manager.go new file mode 100644 index 000000000..d810372f2 --- /dev/null +++ b/healthmanager/subscription_manager.go @@ -0,0 +1,52 @@ +package healthmanager + +import ( + "context" + "sync" +) + +type SubscriptionManager struct { + mu sync.RWMutex + subscribers map[chan struct{}]struct{} +} + +func NewSubscriptionManager() *SubscriptionManager { + return &SubscriptionManager{ + subscribers: make(map[chan struct{}]struct{}), + } +} + +func (s *SubscriptionManager) Subscribe() chan struct{} { + ch := make(chan struct{}, 1) + s.mu.Lock() + defer s.mu.Unlock() + s.subscribers[ch] = struct{}{} + return ch +} + +func (s *SubscriptionManager) Unsubscribe(ch chan struct{}) { + s.mu.Lock() + defer s.mu.Unlock() + _, exist := s.subscribers[ch] + if !exist { + return + } + delete(s.subscribers, ch) + close(ch) +} + +func (s *SubscriptionManager) Emit(ctx context.Context) { + s.mu.RLock() + defer s.mu.RUnlock() + for subscriber := range s.subscribers { + select { + case <-ctx.Done(): + // Stop sending notifications when the context is cancelled + return + case subscriber <- struct{}{}: + // Notified successfully + default: + // Skip notification if the subscriber's channel is full (non-blocking) + } + } +} diff --git a/healthmanager/subscription_manager_test.go b/healthmanager/subscription_manager_test.go new file mode 100644 index 000000000..c7fae4ff3 --- /dev/null +++ b/healthmanager/subscription_manager_test.go @@ -0,0 +1,101 @@ +package healthmanager + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestSubscriptionManager(t *testing.T) { + // Create a new SubscriptionManager + sm := NewSubscriptionManager() + + // Create a context for cancellation + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Create a WaitGroup to wait for all goroutines to finish + var wg sync.WaitGroup + + // Test adding subscribers and receiving notifications + subscriberCount := 5 + notificationReceived := make([]bool, subscriberCount) + + for i := 0; i < subscriberCount; i++ { + ch := sm.Subscribe() + idx := i // Copy the value of i to use inside the goroutine + + wg.Add(1) + go func(ch chan struct{}, idx int) { + defer wg.Done() + select { + case <-ch: + notificationReceived[idx] = true + case <-time.After(1 * time.Second): + // Timeout waiting for notification + } + }(ch, idx) + } + + // Emit a notification to all subscribers + sm.Emit(ctx) + + // Wait for all goroutines to finish + wg.Wait() + + // Verify that all subscribers received the notification + for i, received := range notificationReceived { + require.Truef(t, received, "Subscriber %d did not receive notification", i) + } + + // Test that notifications are not sent to closed channels + chClosed := sm.Subscribe() + // Ensure that unsubscribe handles already closed channels properly + sm.Unsubscribe(chClosed) + + // ensure subscription is removed + _, exists := sm.subscribers[chClosed] + require.False(t, exists, "Subscription was not removed") + + sm.Unsubscribe(chClosed) + // Emit a notification + sm.Emit(ctx) + + // If no panic occurs, the test is successful +} + +func TestSubscriptionManager_MultipleEmitsCollapse(t *testing.T) { + sm := NewSubscriptionManager() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ch := sm.Subscribe() + defer sm.Unsubscribe(ch) + + var received int + var mu sync.Mutex + + go func() { + for range ch { + mu.Lock() + received++ + mu.Unlock() + } + }() + + // Emit multiple notifications + sm.Emit(ctx) + sm.Emit(ctx) + sm.Emit(ctx) + + // Allow some time for the goroutine to process notifications + time.Sleep(100 * time.Millisecond) + + mu.Lock() + require.Equalf(t, 1, received, "Expected 1 notifications, but received %d", received) + mu.Unlock() +} diff --git a/rpc/chain/blockchain_health_test.go b/rpc/chain/blockchain_health_test.go index 7665ee7f9..6764fe8ae 100644 --- a/rpc/chain/blockchain_health_test.go +++ b/rpc/chain/blockchain_health_test.go @@ -23,7 +23,7 @@ import ( "github.com/status-im/status-go/rpc/chain/ethclient" ) -type BlockchainHealthManagerSuite struct { +type BlockchainHealthSuite struct { suite.Suite blockchainHealthManager *healthmanager.BlockchainHealthManager mockProviders map[uint64]*healthmanager.ProvidersHealthManager @@ -32,7 +32,7 @@ type BlockchainHealthManagerSuite struct { mockCtrl *gomock.Controller } -func (s *BlockchainHealthManagerSuite) SetupTest() { +func (s *BlockchainHealthSuite) SetupTest() { s.blockchainHealthManager = healthmanager.NewBlockchainHealthManager() s.mockProviders = make(map[uint64]*healthmanager.ProvidersHealthManager) s.mockEthClients = make(map[uint64]*mockEthclient.MockRPSLimitedEthClientInterface) @@ -40,12 +40,12 @@ func (s *BlockchainHealthManagerSuite) SetupTest() { s.mockCtrl = gomock.NewController(s.T()) } -func (s *BlockchainHealthManagerSuite) TearDownTest() { +func (s *BlockchainHealthSuite) TearDownTest() { s.blockchainHealthManager.Stop() s.mockCtrl.Finish() } -func (s *BlockchainHealthManagerSuite) setupClients(chainIDs []uint64) { +func (s *BlockchainHealthSuite) setupClients(chainIDs []uint64) { ctx := context.Background() for _, chainID := range chainIDs { @@ -65,7 +65,7 @@ func (s *BlockchainHealthManagerSuite) setupClients(chainIDs []uint64) { } } -func (s *BlockchainHealthManagerSuite) simulateChainStatus(chainID uint64, up bool) { +func (s *BlockchainHealthSuite) simulateChainStatus(chainID uint64, up bool) { client, exists := s.clients[chainID] require.True(s.T(), exists, "Client for chainID %d not found", chainID) @@ -85,7 +85,7 @@ func (s *BlockchainHealthManagerSuite) simulateChainStatus(chainID uint64, up bo } } -func (s *BlockchainHealthManagerSuite) waitForStatus(statusCh chan struct{}, expectedStatus rpcstatus.StatusType) { +func (s *BlockchainHealthSuite) waitForStatus(statusCh chan struct{}, expectedStatus rpcstatus.StatusType) { timeout := time.After(2 * time.Second) for { select { @@ -101,7 +101,7 @@ func (s *BlockchainHealthManagerSuite) waitForStatus(statusCh chan struct{}, exp } } -func (s *BlockchainHealthManagerSuite) TestAllChainsUp() { +func (s *BlockchainHealthSuite) TestAllChainsUp() { s.setupClients([]uint64{1, 2, 3}) statusCh := s.blockchainHealthManager.Subscribe() @@ -114,7 +114,7 @@ func (s *BlockchainHealthManagerSuite) TestAllChainsUp() { s.waitForStatus(statusCh, rpcstatus.StatusUp) } -func (s *BlockchainHealthManagerSuite) TestSomeChainsDown() { +func (s *BlockchainHealthSuite) TestSomeChainsDown() { s.setupClients([]uint64{1, 2, 3}) statusCh := s.blockchainHealthManager.Subscribe() @@ -127,7 +127,7 @@ func (s *BlockchainHealthManagerSuite) TestSomeChainsDown() { s.waitForStatus(statusCh, rpcstatus.StatusUp) } -func (s *BlockchainHealthManagerSuite) TestAllChainsDown() { +func (s *BlockchainHealthSuite) TestAllChainsDown() { s.setupClients([]uint64{1, 2}) statusCh := s.blockchainHealthManager.Subscribe() @@ -139,7 +139,7 @@ func (s *BlockchainHealthManagerSuite) TestAllChainsDown() { s.waitForStatus(statusCh, rpcstatus.StatusDown) } -func (s *BlockchainHealthManagerSuite) TestChainStatusChanges() { +func (s *BlockchainHealthSuite) TestChainStatusChanges() { s.setupClients([]uint64{1, 2}) statusCh := s.blockchainHealthManager.Subscribe() @@ -153,7 +153,7 @@ func (s *BlockchainHealthManagerSuite) TestChainStatusChanges() { s.waitForStatus(statusCh, rpcstatus.StatusUp) } -func (s *BlockchainHealthManagerSuite) TestGetFullStatus() { +func (s *BlockchainHealthSuite) TestGetFullStatus() { // Setup clients for chain IDs 1 and 2 s.setupClients([]uint64{1, 2}) @@ -233,7 +233,7 @@ func (s *BlockchainHealthManagerSuite) TestGetFullStatus() { require.NotEmpty(s.T(), jsonData) } -func (s *BlockchainHealthManagerSuite) TestGetShortStatus() { +func (s *BlockchainHealthSuite) TestGetShortStatus() { // Setup clients for chain IDs 1 and 2 s.setupClients([]uint64{1, 2}) @@ -294,6 +294,6 @@ func (s *BlockchainHealthManagerSuite) TestGetShortStatus() { require.NotEmpty(s.T(), jsonData) } -func TestBlockchainHealthManagerSuite(t *testing.T) { - suite.Run(t, new(BlockchainHealthManagerSuite)) +func TestBlockchainHealthSuite(t *testing.T) { + suite.Run(t, new(BlockchainHealthSuite)) }