diff --git a/waku/v2/node/keepalive.go b/waku/v2/node/keepalive.go index 92e0ab1b..eb28d517 100644 --- a/waku/v2/node/keepalive.go +++ b/waku/v2/node/keepalive.go @@ -23,7 +23,7 @@ const maxAllowedPingFailures = 2 // the peers if they don't reply back const sleepDetectionIntervalFactor = 3 -const maxPeersToPing = 10 +const maxPeersToPingPerProtocol = 10 const maxAllowedSubsequentPingFailures = 2 @@ -56,8 +56,8 @@ func (w *WakuNode) startKeepAlive(ctx context.Context, randomPeersPingDuration t } allPeersTickerC := make(<-chan time.Time) - if randomPeersPingDuration != 0 { - allPeersTicker := time.NewTicker(randomPeersPingDuration) + if allPeersPingDuration != 0 { + allPeersTicker := time.NewTicker(allPeersPingDuration) defer allPeersTicker.Stop() randomPeersTickerC = allPeersTicker.C } @@ -72,13 +72,15 @@ func (w *WakuNode) startKeepAlive(ctx context.Context, randomPeersPingDuration t select { case <-allPeersTickerC: - relayPeersSet := make(map[peer.ID]struct{}) - for _, t := range w.Relay().Topics() { - for _, p := range w.Relay().PubSub().ListPeers(t) { - relayPeersSet[p] = struct{}{} + if w.opts.enableRelay { + relayPeersSet := make(map[peer.ID]struct{}) + for _, t := range w.Relay().Topics() { + for _, p := range w.Relay().PubSub().ListPeers(t) { + relayPeersSet[p] = struct{}{} + } } + peersToPing = append(peersToPing, maps.Keys(relayPeersSet)...) } - peersToPing = maps.Keys(relayPeersSet) case <-randomPeersTickerC: difference := w.timesource.Now().UnixNano() - lastTimeExecuted.UnixNano() @@ -94,36 +96,46 @@ func (w *WakuNode) startKeepAlive(ctx context.Context, randomPeersPingDuration t continue } - // Priorize mesh peers - meshPeersSet := make(map[peer.ID]struct{}) - for _, t := range w.Relay().Topics() { - for _, p := range w.Relay().PubSub().MeshPeers(t) { - meshPeersSet[p] = struct{}{} - } - } - peersToPing = append(peersToPing, maps.Keys(meshPeersSet)...) - - // Ping also some random relay peers - if maxPeersToPing-len(peersToPing) > 0 { - relayPeersSet := make(map[peer.ID]struct{}) + if w.opts.enableRelay { + // Priorize mesh peers + meshPeersSet := make(map[peer.ID]struct{}) for _, t := range w.Relay().Topics() { - for _, p := range w.Relay().PubSub().ListPeers(t) { - if _, ok := meshPeersSet[p]; !ok { - relayPeersSet[p] = struct{}{} - } + for _, p := range w.Relay().PubSub().MeshPeers(t) { + meshPeersSet[p] = struct{}{} } } + peersToPing = append(peersToPing, maps.Keys(meshPeersSet)...) - relayPeers := maps.Keys(relayPeersSet) - rand.Shuffle(len(relayPeers), func(i, j int) { relayPeers[i], relayPeers[j] = relayPeers[j], relayPeers[i] }) + // Ping also some random relay peers + if maxPeersToPingPerProtocol-len(peersToPing) > 0 { + relayPeersSet := make(map[peer.ID]struct{}) + for _, t := range w.Relay().Topics() { + for _, p := range w.Relay().PubSub().ListPeers(t) { + if _, ok := meshPeersSet[p]; !ok { + relayPeersSet[p] = struct{}{} + } + } + } - peerLen := maxPeersToPing - len(peersToPing) - if peerLen > len(relayPeers) { - peerLen = len(relayPeers) + relayPeers := maps.Keys(relayPeersSet) + rand.Shuffle(len(relayPeers), func(i, j int) { relayPeers[i], relayPeers[j] = relayPeers[j], relayPeers[i] }) + + peerLen := maxPeersToPingPerProtocol - len(peersToPing) + if peerLen > len(relayPeers) { + peerLen = len(relayPeers) + } + peersToPing = append(peersToPing, relayPeers[0:peerLen]...) } - peersToPing = append(peersToPing, relayPeers[0:peerLen]...) } + if w.opts.enableFilterLightNode { + // We also ping all filter nodes + filterPeersSet := make(map[peer.ID]struct{}) + for _, s := range w.FilterLightnode().Subscriptions() { + filterPeersSet[s.PeerID] = struct{}{} + } + peersToPing = append(peersToPing, maps.Keys(filterPeersSet)...) + } case <-ctx.Done(): w.log.Info("stopping ping protocol") return diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index dd7fbae9..747109ff 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -378,11 +378,6 @@ func (w *WakuNode) Start(ctx context.Context) error { return err } - if w.opts.keepAliveRandomPeersInterval > time.Duration(0) || w.opts.keepAliveAllPeersInterval > time.Duration(0) { - w.wg.Add(1) - go w.startKeepAlive(ctx, w.opts.keepAliveRandomPeersInterval, w.opts.keepAliveAllPeersInterval) - } - w.metadata.SetHost(host) err = w.metadata.Start(ctx) if err != nil { @@ -478,6 +473,11 @@ func (w *WakuNode) Start(ctx context.Context) error { } } + if w.opts.keepAliveRandomPeersInterval > time.Duration(0) || w.opts.keepAliveAllPeersInterval > time.Duration(0) { + w.wg.Add(1) + go w.startKeepAlive(ctx, w.opts.keepAliveRandomPeersInterval, w.opts.keepAliveAllPeersInterval) + } + w.peerExchange.SetHost(host) if w.opts.enablePeerExchange { err := w.peerExchange.Start(ctx)