diff --git a/waku/v2/api/filter.go b/waku/v2/api/filter.go index b9d64310..1f9ea6be 100644 --- a/waku/v2/api/filter.go +++ b/waku/v2/api/filter.go @@ -14,7 +14,6 @@ import ( "go.uber.org/zap" ) -const FilterPingTimeout = 5 * time.Second const MultiplexChannelBuffer = 100 type FilterConfig struct { diff --git a/waku/v2/protocol/filter/filter_health_check.go b/waku/v2/protocol/filter/filter_health_check.go index 836175b5..a6b76a34 100644 --- a/waku/v2/protocol/filter/filter_health_check.go +++ b/waku/v2/protocol/filter/filter_health_check.go @@ -8,6 +8,8 @@ import ( "go.uber.org/zap" ) +const PingTimeout = 5 * time.Second + func (wf *WakuFilterLightNode) PingPeers() { //Send a ping to all the peers and report their status to corresponding subscriptions // Alive or not or set state of subcription?? @@ -17,17 +19,23 @@ func (wf *WakuFilterLightNode) PingPeers() { } func (wf *WakuFilterLightNode) PingPeer(peer peer.ID) { - ctxWithTimeout, cancel := context.WithTimeout(wf.CommonService.Context(), wf.peerPingInterval) + ctxWithTimeout, cancel := context.WithTimeout(wf.CommonService.Context(), PingTimeout) defer cancel() err := wf.Ping(ctxWithTimeout, peer) if err != nil { wf.log.Warn("Filter ping failed towards peer", zap.Stringer("peer", peer), zap.Error(err)) - - subscriptions := wf.subscriptions.GetAllSubscriptionsForPeer(peer) - for _, subscription := range subscriptions { - wf.log.Debug("Notifying sub closing", zap.String("subID", subscription.ID)) - //Indicating that subscription is closing, - subscription.SetClosing() + //quickly retry ping again before marking subscription as failure + //Note that PingTimeout is a fraction of PingInterval so this shouldn't cause parallel pings being sent. + ctxWithTimeout, cancel := context.WithTimeout(wf.CommonService.Context(), PingTimeout) + defer cancel() + err = wf.Ping(ctxWithTimeout, peer) + if err != nil { + subscriptions := wf.subscriptions.GetAllSubscriptionsForPeer(peer) + for _, subscription := range subscriptions { + wf.log.Debug("Notifying sub closing", zap.String("subID", subscription.ID)) + //Indicating that subscription is closing, + subscription.SetClosing() + } } } } @@ -39,7 +47,9 @@ func (wf *WakuFilterLightNode) FilterHealthCheckLoop() { for { select { case <-ticker.C: - wf.PingPeers() + if wf.onlineChecker.IsOnline() { + wf.PingPeers() + } case <-wf.CommonService.Context().Done(): return }