fix_: make mailserver availability subscriptions concurrency safe (#5804)

This commit is contained in:
richΛrd 2024-09-05 15:18:23 -04:00 committed by GitHub
parent 771a0c3562
commit c24eba8af2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 30 additions and 22 deletions

View File

@ -216,7 +216,30 @@ type mailserverCycle struct {
allMailservers []mailserversDB.Mailserver
activeMailserver *mailserversDB.Mailserver
peers map[string]peerStatus
availabilitySubscriptions []chan struct{}
availabilitySubscriptions *availabilitySubscriptions
}
type availabilitySubscriptions struct {
sync.Mutex
subscriptions []chan struct{}
}
func (s *availabilitySubscriptions) Subscribe() <-chan struct{} {
s.Lock()
defer s.Unlock()
c := make(chan struct{})
s.subscriptions = append(s.subscriptions, c)
return c
}
func (s *availabilitySubscriptions) EmitMailserverAvailable() {
s.Lock()
defer s.Unlock()
for _, subs := range s.subscriptions {
close(subs)
}
s.subscriptions = nil
}
type EnvelopeEventsInterceptor struct {
@ -602,7 +625,7 @@ func NewMessenger(
verificationDatabase: verification.NewPersistence(database),
mailserverCycle: mailserverCycle{
peers: make(map[string]peerStatus),
availabilitySubscriptions: make([]chan struct{}, 0),
availabilitySubscriptions: &availabilitySubscriptions{},
},
mailserversDatabase: c.mailserversDatabase,
communityStorenodes: storenodes.NewCommunityStorenodes(storenodes.NewDB(database), logger),
@ -882,7 +905,7 @@ func (m *Messenger) Start() (*MessengerResponse, error) {
}
if m.archiveManager.IsReady() {
available := m.SubscribeMailserverAvailable()
available := m.mailserverCycle.availabilitySubscriptions.Subscribe()
go func() {
<-available
m.InitHistoryArchiveTasks(controlledCommunities)

View File

@ -401,7 +401,7 @@ func (m *Messenger) checkForMissingMessagesLoop() {
t := time.NewTicker(missingMessageCheckPeriod)
defer t.Stop()
mailserverAvailableSignal := m.SubscribeMailserverAvailable()
mailserverAvailableSignal := m.mailserverCycle.availabilitySubscriptions.Subscribe()
for {
select {
@ -410,7 +410,7 @@ func (m *Messenger) checkForMissingMessagesLoop() {
// Wait for mailserver available, also triggered on mailserver change
case <-mailserverAvailableSignal:
mailserverAvailableSignal = m.SubscribeMailserverAvailable()
mailserverAvailableSignal = m.mailserverCycle.availabilitySubscriptions.Subscribe()
case <-t.C:

View File

@ -357,7 +357,7 @@ func (m *Messenger) connectToMailserver(ms mailservers.Mailserver) error {
}
m.logger.Info("mailserver available", zap.String("mailserverID", m.mailserverCycle.activeMailserver.ID))
m.EmitMailserverAvailable()
m.mailserverCycle.availabilitySubscriptions.EmitMailserverAvailable()
signal.SendMailserverAvailable(m.mailserverCycle.activeMailserver)
m.transport.SetStorePeerID(peerID)
@ -483,21 +483,6 @@ func (m *Messenger) getPinnedMailserver() (*mailservers.Mailserver, error) {
return nil, nil
}
func (m *Messenger) EmitMailserverAvailable() {
for _, s := range m.mailserverCycle.availabilitySubscriptions {
s <- struct{}{}
close(s)
l := len(m.mailserverCycle.availabilitySubscriptions)
m.mailserverCycle.availabilitySubscriptions = m.mailserverCycle.availabilitySubscriptions[:l-1]
}
}
func (m *Messenger) SubscribeMailserverAvailable() chan struct{} {
c := make(chan struct{})
m.mailserverCycle.availabilitySubscriptions = append(m.mailserverCycle.availabilitySubscriptions, c)
return c
}
func (m *Messenger) disconnectStorenodeIfRequired() error {
m.logger.Debug("wakuV2 storenode status verification")
@ -537,7 +522,7 @@ func (m *Messenger) waitForAvailableStoreNode(timeout time.Duration) bool {
}()
for !m.isMailserverAvailable(m.getActiveMailserverID()) {
select {
case <-m.SubscribeMailserverAvailable():
case <-m.mailserverCycle.availabilitySubscriptions.Subscribe():
case <-cancel:
return
}