fix: filter ping timeout and retry in case of failure (#1166)

This commit is contained in:
Prem Chaitanya Prathi 2024-07-24 07:59:17 +05:30 committed by GitHub
parent 75047cc9da
commit 58d9721026
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 18 additions and 9 deletions

View File

@ -14,7 +14,6 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
) )
const FilterPingTimeout = 5 * time.Second
const MultiplexChannelBuffer = 100 const MultiplexChannelBuffer = 100
type FilterConfig struct { type FilterConfig struct {

View File

@ -8,6 +8,8 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
) )
const PingTimeout = 5 * time.Second
func (wf *WakuFilterLightNode) PingPeers() { func (wf *WakuFilterLightNode) PingPeers() {
//Send a ping to all the peers and report their status to corresponding subscriptions //Send a ping to all the peers and report their status to corresponding subscriptions
// Alive or not or set state of subcription?? // Alive or not or set state of subcription??
@ -17,17 +19,23 @@ func (wf *WakuFilterLightNode) PingPeers() {
} }
func (wf *WakuFilterLightNode) PingPeer(peer peer.ID) { func (wf *WakuFilterLightNode) PingPeer(peer peer.ID) {
ctxWithTimeout, cancel := context.WithTimeout(wf.CommonService.Context(), wf.peerPingInterval) ctxWithTimeout, cancel := context.WithTimeout(wf.CommonService.Context(), PingTimeout)
defer cancel() defer cancel()
err := wf.Ping(ctxWithTimeout, peer) err := wf.Ping(ctxWithTimeout, peer)
if err != nil { if err != nil {
wf.log.Warn("Filter ping failed towards peer", zap.Stringer("peer", peer), zap.Error(err)) wf.log.Warn("Filter ping failed towards peer", zap.Stringer("peer", peer), zap.Error(err))
//quickly retry ping again before marking subscription as failure
subscriptions := wf.subscriptions.GetAllSubscriptionsForPeer(peer) //Note that PingTimeout is a fraction of PingInterval so this shouldn't cause parallel pings being sent.
for _, subscription := range subscriptions { ctxWithTimeout, cancel := context.WithTimeout(wf.CommonService.Context(), PingTimeout)
wf.log.Debug("Notifying sub closing", zap.String("subID", subscription.ID)) defer cancel()
//Indicating that subscription is closing, err = wf.Ping(ctxWithTimeout, peer)
subscription.SetClosing() if err != nil {
subscriptions := wf.subscriptions.GetAllSubscriptionsForPeer(peer)
for _, subscription := range subscriptions {
wf.log.Debug("Notifying sub closing", zap.String("subID", subscription.ID))
//Indicating that subscription is closing,
subscription.SetClosing()
}
} }
} }
} }
@ -39,7 +47,9 @@ func (wf *WakuFilterLightNode) FilterHealthCheckLoop() {
for { for {
select { select {
case <-ticker.C: case <-ticker.C:
wf.PingPeers() if wf.onlineChecker.IsOnline() {
wf.PingPeers()
}
case <-wf.CommonService.Context().Done(): case <-wf.CommonService.Context().Done():
return return
} }