From 9e52e09dd54da1dab2327c5173193b484b4558c7 Mon Sep 17 00:00:00 2001
From: Richard Ramos
Date: Tue, 8 Aug 2023 10:44:38 -0400
Subject: [PATCH] chore: keepAlive will not immediately disconnect peers when
 waking up from sleep, but will do so only if pinging the peer fails

---
 waku/v2/node/keepalive.go      | 40 +++++++++++++++-------------------
 waku/v2/node/keepalive_test.go |  7 +++---
 2 files changed, 22 insertions(+), 25 deletions(-)

diff --git a/waku/v2/node/keepalive.go b/waku/v2/node/keepalive.go
index 847bc4fa..a2a8256e 100644
--- a/waku/v2/node/keepalive.go
+++ b/waku/v2/node/keepalive.go
@@ -5,7 +5,6 @@ import (
 	"sync"
 	"time"
 
-	"github.com/libp2p/go-libp2p/core/host"
 	"github.com/libp2p/go-libp2p/core/network"
 	"github.com/libp2p/go-libp2p/core/peer"
 	"github.com/libp2p/go-libp2p/p2p/protocol/ping"
@@ -15,15 +14,10 @@
 
 const maxAllowedPingFailures = 2
 
-func disconnectPeers(host host.Host, logger *zap.Logger) {
-	logger.Warn("keep alive hasnt been executed recently. Killing all connections to peers")
-	for _, p := range host.Network().Peers() {
-		err := host.Network().ClosePeer(p)
-		if err != nil {
-			logger.Warn("while disconnecting peer", zap.Error(err))
-		}
-	}
-}
+// If the difference between the last time the keep alive code was executed and now is greater
+// than sleepDetectionIntervalFactor * keepAlivePeriod, force the ping verification to disconnect
+// peers that do not reply back
+const sleepDetectionIntervalFactor = 3
 
 // startKeepAlive creates a go routine that periodically pings connected peers.
 // This is necessary because TCP connections are automatically closed due to inactivity,
@@ -36,15 +30,17 @@ func (w *WakuNode) startKeepAlive(ctx context.Context, t time.Duration) {
 	lastTimeExecuted := w.timesource.Now()
 
-	sleepDetectionInterval := int64(t) * 3
+	sleepDetectionInterval := int64(t) * sleepDetectionIntervalFactor
 
 	for {
 		select {
 		case <-ticker.C:
 			difference := w.timesource.Now().UnixNano() - lastTimeExecuted.UnixNano()
+			forceDisconnectOnPingFailure := false
 			if difference > sleepDetectionInterval {
-				disconnectPeers(w.host, w.log)
+				forceDisconnectOnPingFailure = true
 				lastTimeExecuted = w.timesource.Now()
+				w.log.Warn("keep alive hasn't been executed recently. Killing connections to peers if ping fails")
 				continue
 			}
 
@@ -55,7 +51,7 @@ func (w *WakuNode) startKeepAlive(ctx context.Context, t time.Duration) {
 			pingWg.Add(len(peersToPing))
 			for _, p := range peersToPing {
 				if p != w.host.ID() {
-					go w.pingPeer(ctx, &pingWg, p)
+					go w.pingPeer(ctx, &pingWg, p, forceDisconnectOnPingFailure)
 				}
 			}
 			pingWg.Wait()
@@ -68,41 +64,41 @@
 	}
 }
 
-func (w *WakuNode) pingPeer(ctx context.Context, wg *sync.WaitGroup, peer peer.ID) {
+func (w *WakuNode) pingPeer(ctx context.Context, wg *sync.WaitGroup, peerID peer.ID, forceDisconnectOnFail bool) {
 	defer wg.Done()
 
 	ctx, cancel := context.WithTimeout(ctx, 7*time.Second)
 	defer cancel()
 
-	logger := w.log.With(logging.HostID("peer", peer))
+	logger := w.log.With(logging.HostID("peer", peerID))
 	logger.Debug("pinging")
-	pr := ping.Ping(ctx, w.host, peer)
+	pr := ping.Ping(ctx, w.host, peerID)
 	select {
 	case res := <-pr:
 		if res.Error != nil {
 			w.keepAliveMutex.Lock()
-			w.keepAliveFails[peer]++
+			w.keepAliveFails[peerID]++
 			w.keepAliveMutex.Unlock()
 			logger.Debug("could not ping", zap.Error(res.Error))
 		} else {
 			w.keepAliveMutex.Lock()
-			delete(w.keepAliveFails, peer)
+			delete(w.keepAliveFails, peerID)
 			w.keepAliveMutex.Unlock()
 		}
 	case <-ctx.Done():
 		w.keepAliveMutex.Lock()
-		w.keepAliveFails[peer]++
+		w.keepAliveFails[peerID]++
 		w.keepAliveMutex.Unlock()
 		logger.Debug("could not ping (context done)", zap.Error(ctx.Err()))
 	}
 
 	w.keepAliveMutex.Lock()
-	if w.keepAliveFails[peer] > maxAllowedPingFailures && w.host.Network().Connectedness(peer) == network.Connected {
+	if (forceDisconnectOnFail || w.keepAliveFails[peerID] > maxAllowedPingFailures) && w.host.Network().Connectedness(peerID) == network.Connected {
 		logger.Info("disconnecting peer")
-		if err := w.host.Network().ClosePeer(peer); err != nil {
+		if err := w.host.Network().ClosePeer(peerID); err != nil {
 			logger.Debug("closing conn to peer", zap.Error(err))
 		}
-		w.keepAliveFails[peer] = 0
+		w.keepAliveFails[peerID] = 0
 	}
 	w.keepAliveMutex.Unlock()
 }
diff --git a/waku/v2/node/keepalive_test.go b/waku/v2/node/keepalive_test.go
index f8a56b30..b7922884 100644
--- a/waku/v2/node/keepalive_test.go
+++ b/waku/v2/node/keepalive_test.go
@@ -21,10 +21,11 @@ func TestKeepAlive(t *testing.T) {
 	require.NoError(t, err)
 	host2, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
 	require.NoError(t, err)
+	peerID2 := host2.ID()
 
-	host1.Peerstore().AddAddrs(host2.ID(), host2.Addrs(), peerstore.PermanentAddrTTL)
+	host1.Peerstore().AddAddrs(peerID2, host2.Addrs(), peerstore.PermanentAddrTTL)
 
-	err = host1.Connect(ctx, host1.Peerstore().PeerInfo(host2.ID()))
+	err = host1.Connect(ctx, host1.Peerstore().PeerInfo(peerID2))
 	require.NoError(t, err)
 
 	require.Len(t, host1.Network().Peers(), 1)
@@ -42,7 +43,7 @@ func TestKeepAlive(t *testing.T) {
 	}
 
 	w.wg.Add(1)
-	w.pingPeer(ctx2, w.wg, peerID2, false)
+	w.pingPeer(ctx2, w.wg, peerID2, false)
 
 	require.NoError(t, ctx.Err())
 }
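
For readers tracing the change: the sleep detection that replaces disconnectPeers is pure wall-clock arithmetic, relying on the fact that a ticker can only fall far behind its own period if the process or the whole machine was suspended. The standalone sketch below is not part of the patch; it reproduces the check in isolation with illustrative values (the 100 ms period, the loop bound, and the simulated suspend are assumptions made for the demo):

package main

import (
	"fmt"
	"time"
)

// Same factor the patch introduces in keepalive.go.
const sleepDetectionIntervalFactor = 3

func main() {
	keepAlivePeriod := 100 * time.Millisecond // illustrative value for the demo
	ticker := time.NewTicker(keepAlivePeriod)
	defer ticker.Stop()

	lastTimeExecuted := time.Now()
	sleepDetectionInterval := int64(keepAlivePeriod) * sleepDetectionIntervalFactor

	for i := 0; i < 5; i++ {
		<-ticker.C
		difference := time.Now().UnixNano() - lastTimeExecuted.UnixNano()
		if difference > sleepDetectionInterval {
			// The gap between two wake-ups is much larger than the tick
			// period, so the host was probably suspended. The patch reacts
			// by flagging the next ping round instead of dropping peers.
			fmt.Println("sleep detected: disconnect peers only if their ping fails")
		}
		lastTimeExecuted = time.Now()

		// Simulate a suspend on the second iteration so the branch fires.
		if i == 1 {
			time.Sleep(4 * keepAlivePeriod)
		}
	}
}

The factor of 3 leaves room for ordinary scheduling jitter, so only a genuine suspend (or a severely overloaded host) should trip the branch.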
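
The disconnect policy itself lives in pingPeer: failures are counted per peer under keepAliveMutex, a successful ping clears the counter, and a connection is closed either once the count exceeds maxAllowedPingFailures or on the very first failure when a wake-up forced strict checking. A minimal sketch of that bookkeeping, with plain strings standing in for libp2p peer IDs and failureTracker/recordResult as hypothetical names:

package main

import (
	"fmt"
	"sync"
)

// Same threshold as in keepalive.go.
const maxAllowedPingFailures = 2

// failureTracker is an illustrative stand-in for the WakuNode fields
// keepAliveMutex and keepAliveFails that pingPeer manipulates.
type failureTracker struct {
	mu    sync.Mutex
	fails map[string]int
}

// recordResult registers one ping result and reports whether the peer
// should be disconnected: immediately on failure when forceDisconnectOnFail
// is set (wake-up from sleep), otherwise only after the failure count
// exceeds the allowed maximum.
func (t *failureTracker) recordResult(peerID string, pingOK, forceDisconnectOnFail bool) bool {
	t.mu.Lock()
	defer t.mu.Unlock()

	if pingOK {
		delete(t.fails, peerID) // a successful ping resets the counter
		return false
	}

	t.fails[peerID]++
	if forceDisconnectOnFail || t.fails[peerID] > maxAllowedPingFailures {
		t.fails[peerID] = 0 // reset, as pingPeer does after closing the connection
		return true
	}
	return false
}

func main() {
	tr := &failureTracker{fails: make(map[string]int)}

	fmt.Println(tr.recordResult("peerA", false, false)) // false: 1st failure
	fmt.Println(tr.recordResult("peerA", false, false)) // false: 2nd failure
	fmt.Println(tr.recordResult("peerA", false, false)) // true: exceeds maxAllowedPingFailures
	fmt.Println(tr.recordResult("peerB", false, true))  // true: forced after a detected wake-up
}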