diff --git a/waku/v2/node/keepalive.go b/waku/v2/node/keepalive.go index b0f7022c..f9b33e70 100644 --- a/waku/v2/node/keepalive.go +++ b/waku/v2/node/keepalive.go @@ -23,9 +23,26 @@ func (w *WakuNode) startKeepAlive(t time.Duration) { w.log.Info("setting up ping protocol", zap.Duration("duration", t)) ticker := time.NewTicker(t) defer ticker.Stop() + + lastTimeExecuted := <-ticker.C + sleepDetectionInterval := int64(t) * 3 + for { select { case <-ticker.C: + difference := time.Now().UnixNano() - lastTimeExecuted.UnixNano() + if difference > sleepDetectionInterval { + w.log.Warn("keep alive hasnt been executed recently. Killing all connections to peers") + for _, p := range w.host.Network().Peers() { + err := w.host.Network().ClosePeer(p) + if err != nil { + w.log.Warn("while disconnecting peer", zap.Error(err)) + } + } + lastTimeExecuted = time.Now() + continue + } + // Network's peers collection, // contains only currently active peers for _, p := range w.host.Network().Peers() { @@ -34,6 +51,8 @@ func (w *WakuNode) startKeepAlive(t time.Duration) { go w.pingPeer(p) } } + + lastTimeExecuted = time.Now() case <-w.quit: return } @@ -58,7 +77,7 @@ func (w *WakuNode) pingPeer(peer peer.ID) { w.keepAliveFails[peer]++ logger.Debug("could not ping", zap.Error(res.Error)) } else { - w.keepAliveFails[peer] = 0 + delete(w.keepAliveFails, peer) } case <-ctx.Done(): w.keepAliveFails[peer]++