fix: wakuv2 mailserver cycle ticker

This commit is contained in:
Richard Ramos 2022-10-26 14:23:41 -04:00 committed by RichΛrd
parent de61ed1213
commit bb4237f616
1 changed files with 24 additions and 1 deletions

View File

@ -421,6 +421,17 @@ func (m *Messenger) mailserverPeersInfo() []ConnectedPeer {
return connectedPeers return connectedPeers
} }
func (m *Messenger) storenodesPeersInfo() []ConnectedPeer {
var connectedPeers []ConnectedPeer
for k := range m.transport.Peers() {
connectedPeers = append(connectedPeers, ConnectedPeer{
UniqueID: k,
})
}
return connectedPeers
}
func (m *Messenger) penalizeMailserver(id string) { func (m *Messenger) penalizeMailserver(id string) {
m.mailPeersMutex.Lock() m.mailPeersMutex.Lock()
defer m.mailPeersMutex.Unlock() defer m.mailPeersMutex.Unlock()
@ -477,6 +488,7 @@ func (m *Messenger) handleMailserverCycleEvent(connectedPeers []ConnectedPeer) e
continue continue
} }
pInfo, ok := m.mailserverCycle.peers[id] pInfo, ok := m.mailserverCycle.peers[id]
if !ok || pInfo.status != connected { if !ok || pInfo.status != connected {
m.logger.Info("peer connected", zap.String("peer", connectedPeer.UniqueID)) m.logger.Info("peer connected", zap.String("peer", connectedPeer.UniqueID))
pInfo.status = connected pInfo.status = connected
@ -516,8 +528,9 @@ func (m *Messenger) handleMailserverCycleEvent(connectedPeers []ConnectedPeer) e
m.mailPeersMutex.Lock() m.mailPeersMutex.Lock()
pInfo, ok := m.mailserverCycle.peers[m.mailserverCycle.activeMailserver.ID] pInfo, ok := m.mailserverCycle.peers[m.mailserverCycle.activeMailserver.ID]
m.mailPeersMutex.Unlock() m.mailPeersMutex.Unlock()
if ok { if ok {
if pInfo.status != connected && pInfo.lastConnectionAttempt.Add(30*time.Second).Before(time.Now()) { if pInfo.status != connected && pInfo.lastConnectionAttempt.Add(20*time.Second).Before(time.Now()) {
m.logger.Info("penalizing mailserver & disconnecting connecting", zap.String("id", m.mailserverCycle.activeMailserver.ID)) m.logger.Info("penalizing mailserver & disconnecting connecting", zap.String("id", m.mailserverCycle.activeMailserver.ID))
signal.SendMailserverNotWorking() signal.SendMailserverNotWorking()
@ -593,6 +606,9 @@ func (m *Messenger) updateWakuV2PeerStatus() {
m.logger.Error("Could not subscribe to connection status changes", zap.Error(err)) m.logger.Error("Could not subscribe to connection status changes", zap.Error(err))
} }
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
for { for {
select { select {
case status := <-connSubscription.C: case status := <-connSubscription.C:
@ -606,6 +622,13 @@ func (m *Messenger) updateWakuV2PeerStatus() {
return return
} }
case <-ticker.C:
err := m.handleMailserverCycleEvent(m.storenodesPeersInfo())
if err != nil {
m.logger.Error("failed to handle mailserver cycle event", zap.Error(err))
continue
}
case <-m.quit: case <-m.quit:
close(m.mailserverCycle.events) close(m.mailserverCycle.events)
m.mailserverCycle.subscription.Unsubscribe() m.mailserverCycle.subscription.Unsubscribe()