diff --git a/protocol/messenger_mailserver_cycle.go b/protocol/messenger_mailserver_cycle.go index df549bcbd..9c044958e 100644 --- a/protocol/messenger_mailserver_cycle.go +++ b/protocol/messenger_mailserver_cycle.go @@ -421,6 +421,17 @@ func (m *Messenger) mailserverPeersInfo() []ConnectedPeer { return connectedPeers } +func (m *Messenger) storenodesPeersInfo() []ConnectedPeer { + var connectedPeers []ConnectedPeer + for k := range m.transport.Peers() { + connectedPeers = append(connectedPeers, ConnectedPeer{ + UniqueID: k, + }) + } + + return connectedPeers +} + func (m *Messenger) penalizeMailserver(id string) { m.mailPeersMutex.Lock() defer m.mailPeersMutex.Unlock() @@ -477,6 +488,7 @@ func (m *Messenger) handleMailserverCycleEvent(connectedPeers []ConnectedPeer) e continue } pInfo, ok := m.mailserverCycle.peers[id] + if !ok || pInfo.status != connected { m.logger.Info("peer connected", zap.String("peer", connectedPeer.UniqueID)) pInfo.status = connected @@ -516,8 +528,9 @@ func (m *Messenger) handleMailserverCycleEvent(connectedPeers []ConnectedPeer) e m.mailPeersMutex.Lock() pInfo, ok := m.mailserverCycle.peers[m.mailserverCycle.activeMailserver.ID] m.mailPeersMutex.Unlock() + if ok { - if pInfo.status != connected && pInfo.lastConnectionAttempt.Add(30*time.Second).Before(time.Now()) { + if pInfo.status != connected && pInfo.lastConnectionAttempt.Add(20*time.Second).Before(time.Now()) { m.logger.Info("penalizing mailserver & disconnecting connecting", zap.String("id", m.mailserverCycle.activeMailserver.ID)) signal.SendMailserverNotWorking() @@ -593,6 +606,9 @@ func (m *Messenger) updateWakuV2PeerStatus() { m.logger.Error("Could not subscribe to connection status changes", zap.Error(err)) } + ticker := time.NewTicker(1 * time.Second) + defer ticker.Stop() + for { select { case status := <-connSubscription.C: @@ -606,6 +622,13 @@ func (m *Messenger) updateWakuV2PeerStatus() { return } + case <-ticker.C: + err := m.handleMailserverCycleEvent(m.storenodesPeersInfo()) + if err != nil { + m.logger.Error("failed to handle mailserver cycle event", zap.Error(err)) + continue + } + case <-m.quit: close(m.mailserverCycle.events) m.mailserverCycle.subscription.Unsubscribe()