status-go/healthmanager/subscription_manager.go
Andrey Bocharnikov f3eed58c78
fix(healthmanager)_: extract subscriber logic from RPC Health Manager (#6147)
- Subscription common logic is extracted to a separate type.
- Fix race condition where a goroutine extracts value from sync.Map and then another goroutine calls unsubscribe and closes the channel before the first goroutine writes to the channel.
- Moved TestInterleavedChainStatusChanges and TestDelayedChainUpdate to the correct file.
- Renamed test suites with duplicate names.

updates CODEOWNERS
closes #6139

Co-authored-by: Igor Sirotin <sirotin@status.im>
2024-12-04 20:26:53 +04:00

53 lines
1.0 KiB
Go

package healthmanager
import (
"context"
"sync"
)
// SubscriptionManager keeps a set of subscriber channels protected by a
// read-write mutex.
type SubscriptionManager struct {
// mu guards every access to subscribers.
mu sync.RWMutex
// subscribers is the set of registered channels; the empty-struct value
// means the map is used purely as a set keyed by channel.
subscribers map[chan struct{}]struct{}
}
// NewSubscriptionManager returns a pointer to a fresh SubscriptionManager
// whose subscriber set is allocated and empty.
func NewSubscriptionManager() *SubscriptionManager {
	manager := new(SubscriptionManager)
	manager.subscribers = map[chan struct{}]struct{}{}
	return manager
}
// Subscribe registers a new subscriber and returns its notification channel.
//
// The returned channel has a buffer of one, so it can hold at most one
// pending notification. The channel is added to the subscriber set under the
// write lock.
//
// Subscribe is also safe to call on a zero-value SubscriptionManager: the
// subscriber set is allocated on first use rather than panicking on a write
// to a nil map.
func (s *SubscriptionManager) Subscribe() chan struct{} {
	ch := make(chan struct{}, 1)

	s.mu.Lock()
	defer s.mu.Unlock()

	// A manager that was not built by its constructor has a nil map, and
	// assigning into a nil map panics.
	if s.subscribers == nil {
		s.subscribers = make(map[chan struct{}]struct{})
	}
	s.subscribers[ch] = struct{}{}
	return ch
}
// Unsubscribe removes ch from the subscriber set and closes it.
//
// A channel that is not currently registered is ignored, which makes repeated
// calls with the same channel harmless: the channel is closed only on the
// call that actually removes it. Removal and close both happen while the
// write lock is held.
func (s *SubscriptionManager) Unsubscribe(ch chan struct{}) {
	s.mu.Lock()
	defer s.mu.Unlock()

	if _, registered := s.subscribers[ch]; registered {
		delete(s.subscribers, ch)
		close(ch)
	}
}
// Emit sends one notification to every registered subscriber without
// blocking.
//
// The subscriber set is iterated under the read lock; Unsubscribe takes the
// write lock before closing a channel, so a channel cannot be closed while
// Emit is sending on it.
//
// For each subscriber:
//   - if ctx is already cancelled, Emit returns immediately and the remaining
//     subscribers are not notified;
//   - otherwise a notification is sent if the channel can accept it, and
//     skipped if the channel is full.
func (s *SubscriptionManager) Emit(ctx context.Context) {
	s.mu.RLock()
	defer s.mu.RUnlock()

	for subscriber := range s.subscribers {
		// Cancellation is checked in its own select. If it shared a select
		// with the send, Go would choose randomly between the two ready
		// cases and a cancelled Emit could still notify subscribers.
		select {
		case <-ctx.Done():
			// Stop sending notifications when the context is cancelled
			return
		default:
		}

		select {
		case subscriber <- struct{}{}:
			// Notified successfully
		default:
			// Skip notification if the subscriber's channel is full (non-blocking)
		}
	}
}