fix: fetch history when back online (#4651)

This commit is contained in:
Igor Sirotin 2024-01-30 18:13:18 +00:00 committed by GitHub
parent 98c1ebec05
commit 0c2a935578
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 37 additions and 28 deletions

View File

@ -928,25 +928,29 @@ func (m *Messenger) cleanTopics() error {
// handle connection change is called each time we go from offline/online or viceversa // handle connection change is called each time we go from offline/online or viceversa
func (m *Messenger) handleConnectionChange(online bool) { func (m *Messenger) handleConnectionChange(online bool) {
if online { // Update pushNotificationClient
if m.pushNotificationClient != nil { if m.pushNotificationClient != nil {
if online {
m.pushNotificationClient.Online() m.pushNotificationClient.Online()
} else {
m.pushNotificationClient.Offline()
}
} }
if m.shouldPublishContactCode { // Publish contact code
if online && m.shouldPublishContactCode {
if err := m.publishContactCode(); err != nil { if err := m.publishContactCode(); err != nil {
m.logger.Error("could not publish on contact code", zap.Error(err)) m.logger.Error("could not publish on contact code", zap.Error(err))
return
} }
m.shouldPublishContactCode = false m.shouldPublishContactCode = false
} }
} else {
if m.pushNotificationClient != nil { // Start fetching messages from store nodes
m.pushNotificationClient.Offline() if online {
} m.asyncRequestAllHistoricMessages()
} }
// Update ENS verifier
m.ensVerifier.SetOnline(online) m.ensVerifier.SetOnline(online)
} }

View File

@ -309,7 +309,9 @@ func (m *Messenger) resetFiltersPriority(filters []*transport.Filter) {
} }
func (m *Messenger) RequestAllHistoricMessagesWithRetries(forceFetchingBackup bool) (*MessengerResponse, error) { func (m *Messenger) RequestAllHistoricMessagesWithRetries(forceFetchingBackup bool) (*MessengerResponse, error) {
return m.performMailserverRequest(func() (*MessengerResponse, error) { return m.RequestAllHistoricMessages(forceFetchingBackup) }) return m.performMailserverRequest(func() (*MessengerResponse, error) {
return m.RequestAllHistoricMessages(forceFetchingBackup)
})
} }
// RequestAllHistoricMessages requests all the historic messages for any topic // RequestAllHistoricMessages requests all the historic messages for any topic

View File

@ -425,12 +425,7 @@ func (m *Messenger) connectToMailserver(ms mailservers.Mailserver) error {
signal.SendMailserverAvailable(m.mailserverCycle.activeMailserver.Address, m.mailserverCycle.activeMailserver.ID) signal.SendMailserverAvailable(m.mailserverCycle.activeMailserver.Address, m.mailserverCycle.activeMailserver.ID)
// Query mailserver // Query mailserver
go func() { m.asyncRequestAllHistoricMessages()
_, err := m.performMailserverRequest(func() (*MessengerResponse, error) { return m.RequestAllHistoricMessages(false) })
if err != nil {
m.logger.Error("could not perform mailserver request", zap.Error(err))
}
}()
} }
} }
return nil return nil
@ -557,12 +552,7 @@ func (m *Messenger) handleMailserverCycleEvent(connectedPeers []ConnectedPeer) e
signal.SendMailserverAvailable(m.mailserverCycle.activeMailserver.Address, m.mailserverCycle.activeMailserver.ID) signal.SendMailserverAvailable(m.mailserverCycle.activeMailserver.Address, m.mailserverCycle.activeMailserver.ID)
} }
// Query mailserver // Query mailserver
go func() { m.asyncRequestAllHistoricMessages()
_, err := m.RequestAllHistoricMessagesWithRetries(false)
if err != nil {
m.logger.Error("failed to request historic messages", zap.Error(err))
}
}()
} else { } else {
m.mailPeersMutex.Unlock() m.mailPeersMutex.Unlock()
} }
@ -603,6 +593,16 @@ func (m *Messenger) handleMailserverCycleEvent(connectedPeers []ConnectedPeer) e
return nil return nil
} }
func (m *Messenger) asyncRequestAllHistoricMessages() {
m.logger.Debug("asyncRequestAllHistoricMessages")
go func() {
_, err := m.RequestAllHistoricMessagesWithRetries(false)
if err != nil {
m.logger.Error("failed to request historic messages", zap.Error(err))
}
}()
}
func (m *Messenger) updateWakuV1PeerStatus() { func (m *Messenger) updateWakuV1PeerStatus() {
ticker := time.NewTicker(1 * time.Second) ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop() defer ticker.Stop()

View File

@ -1232,7 +1232,9 @@ func (w *Waku) Start() error {
case c := <-w.connStatusChan: case c := <-w.connStatusChan:
w.connStatusMu.Lock() w.connStatusMu.Lock()
latestConnStatus := formatConnStatus(w.node, c) latestConnStatus := formatConnStatus(w.node, c)
w.logger.Debug("PeerStats", zap.Any("stats", latestConnStatus)) w.logger.Debug("peer stats",
zap.Int("peersCount", len(latestConnStatus.Peers)),
zap.Any("stats", latestConnStatus))
for k, subs := range w.connStatusSubscriptions { for k, subs := range w.connStatusSubscriptions {
if subs.Active() { if subs.Active() {
subs.C <- latestConnStatus subs.C <- latestConnStatus
@ -1372,6 +1374,7 @@ func (w *Waku) OnNewEnvelopes(envelope *protocol.Envelope, msgType common.Messag
} }
logger := w.logger.With( logger := w.logger.With(
zap.Any("messageType", msgType),
zap.String("envelopeHash", hexutil.Encode(envelope.Hash())), zap.String("envelopeHash", hexutil.Encode(envelope.Hash())),
zap.String("contentTopic", envelope.Message().ContentTopic), zap.String("contentTopic", envelope.Message().ContentTopic),
zap.Int64("timestamp", envelope.Message().GetTimestamp()), zap.Int64("timestamp", envelope.Message().GetTimestamp()),