diff --git a/waku/v2/node/keepalive_test.go b/waku/v2/node/keepalive_test.go new file mode 100644 index 00000000..d90b12de --- /dev/null +++ b/waku/v2/node/keepalive_test.go @@ -0,0 +1,37 @@ +package node + +import ( + "context" + "testing" + "time" + + "github.com/libp2p/go-libp2p" + "github.com/libp2p/go-libp2p-core/peerstore" + "github.com/libp2p/go-libp2p/p2p/protocol/ping" + "github.com/stretchr/testify/require" +) + +func TestKeepAlive(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + host1, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) + require.NoError(t, err) + host2, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) + require.NoError(t, err) + + host1.Peerstore().AddAddrs(host2.ID(), host2.Addrs(), peerstore.PermanentAddrTTL) + + err = host1.Connect(ctx, host1.Peerstore().PeerInfo(host2.ID())) + require.NoError(t, err) + + ping := ping.NewPingService(host1) + + require.Len(t, host1.Network().Peers(), 1) + + ctx2, cancel2 := context.WithTimeout(ctx, 3*time.Second) + defer cancel2() + pingPeer(ctx2, ping, host2.ID()) + + require.NoError(t, ctx.Err()) +} diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index e38caea9..573984cc 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -694,6 +694,9 @@ func (w *WakuNode) Peers() PeerStats { return p } +// startKeepAlive creates a go routine that periodically pings connected peers. +// This is necessary because TCP connections are automatically closed due to inactivity, +// and doing a ping will avoid this (with a small bandwidth cost) func (w *WakuNode) startKeepAlive(t time.Duration) { log.Info("Setting up ping protocol with duration of ", t) @@ -705,20 +708,7 @@ func (w *WakuNode) startKeepAlive(t time.Duration) { select { case <-ticker.C: for _, p := range w.host.Network().Peers() { - log.Debug("Pinging ", p) - go func(peer peer.ID) { - ctx, cancel := context.WithTimeout(w.ctx, 3*time.Second) - defer cancel() - pr := w.ping.Ping(ctx, peer) - select { - case res := <-pr: - if res.Error != nil { - log.Error(fmt.Sprintf("Could not ping %s: %s", peer, res.Error.Error())) - } - case <-ctx.Done(): - log.Error(fmt.Sprintf("Could not ping %s: %s", peer, ctx.Err())) - } - }(p) + go pingPeer(w.ctx, w.ping, p) } case <-w.quit: ticker.Stop() @@ -727,3 +717,19 @@ func (w *WakuNode) startKeepAlive(t time.Duration) { } }() } + +func pingPeer(ctx context.Context, pingService *ping.PingService, peer peer.ID) { + ctx, cancel := context.WithTimeout(ctx, 3*time.Second) + defer cancel() + + log.Debug("Pinging ", peer) + pr := pingService.Ping(ctx, peer) + select { + case res := <-pr: + if res.Error != nil { + log.Error(fmt.Sprintf("Could not ping %s: %s", peer, res.Error.Error())) + } + case <-ctx.Done(): + log.Error(fmt.Sprintf("Could not ping %s: %s", peer, ctx.Err())) + } +}