From 56cc5c96c5df0b96e4a15d7ce3c9e05561ca43b1 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?rich=CE=9Brd?=
Date: Thu, 11 Jul 2024 14:36:34 -0400
Subject: [PATCH] feat_: bump go-waku to introduce new keep alive interval
 (#5484)

- Also renames the existing keepAliveInterval to randomPeerKeepAliveInterval,
  and uses time.Duration instead of int
---
 VERSION                                       |   2 +-
 appdatabase/node_config_test.go               |   1 -
 go.mod                                        |   4 +-
 go.sum                                        |   8 +-
 node/status_node_services.go                  |   1 -
 nodecfg/node_config.go                        |  10 +-
 params/config.go                              |   3 -
 protocol/waku_builder_test.go                 |  11 +-
 .../libp2p/go-libp2p-pubsub/gossipsub.go      |   2 +-
 .../libp2p/go-libp2p-pubsub/pubsub.go         |  29 ++-
 .../go-waku/waku/v2/node/keepalive.go         | 176 +++++++++++++-----
 .../go-waku/waku/v2/node/wakunode2.go         |  19 +-
 .../go-waku/waku/v2/node/wakuoptions.go       |  13 +-
 .../waku/v2/peermanager/peer_manager.go       |   9 +-
 .../waku/v2/protocol/filter/test_utils.go     |   2 +-
 vendor/modules.txt                            |   4 +-
 wakuv2/config.go                              |   6 -
 wakuv2/waku.go                                | 144 +++++---------
 wakuv2/waku_test.go                           |   7 +-
 19 files changed, 259 insertions(+), 192 deletions(-)

diff --git a/VERSION b/VERSION
index 1f412b3f5..18107b43b 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-0.180.0
+0.179.11
diff --git a/appdatabase/node_config_test.go b/appdatabase/node_config_test.go
index b3b13ca0f..75d8dffac 100644
--- a/appdatabase/node_config_test.go
+++ b/appdatabase/node_config_test.go
@@ -215,7 +215,6 @@ func randomNodeConfig() *params.NodeConfig {
 			Enabled:           randomBool(),
 			Host:              randomString(),
 			Port:              randomInt(math.MaxInt64),
-			KeepAliveInterval: randomInt(math.MaxInt64),
 			LightClient:       randomBool(),
 			FullNode:          randomBool(),
 			DiscoveryLimit:    randomInt(math.MaxInt64),
diff --git a/go.mod b/go.mod
index 2e4fd00eb..716532523 100644
--- a/go.mod
+++ b/go.mod
@@ -16,7 +16,7 @@ replace github.com/forPelevin/gomoji => github.com/status-im/gomoji v1.1.3-0.202
 
 replace github.com/mutecomm/go-sqlcipher/v4 v4.4.2 => github.com/status-im/go-sqlcipher/v4 v4.5.4-status.2
 
-replace github.com/libp2p/go-libp2p-pubsub v0.11.0 => github.com/waku-org/go-libp2p-pubsub v0.0.0-20240701005450-b4513d154445
+replace github.com/libp2p/go-libp2p-pubsub v0.11.0 => github.com/waku-org/go-libp2p-pubsub v0.0.0-20240703191659-2cbb09eac9b5
 
 require (
 	github.com/anacrolix/torrent v1.41.0
@@ -96,7 +96,7 @@ require (
 	github.com/schollz/peerdiscovery v1.7.0
 	github.com/siphiuel/lc-proxy-wrapper v0.0.0-20230516150924-246507cee8c7
 	github.com/urfave/cli/v2 v2.27.2
-	github.com/waku-org/go-waku v0.8.1-0.20240701141800-5b5ea977afe0
+	github.com/waku-org/go-waku v0.8.1-0.20240705124012-8df5a0337a1e
 	github.com/wk8/go-ordered-map/v2 v2.1.7
 	github.com/yeqown/go-qrcode/v2 v2.2.1
 	github.com/yeqown/go-qrcode/writer/standard v1.2.1
diff --git a/go.sum b/go.sum
index 881ea8e85..3e185b46d 100644
--- a/go.sum
+++ b/go.sum
@@ -2139,12 +2139,12 @@ github.com/vishvananda/netns v0.0.0-20200728191858-db3c7e526aae/go.mod h1:DD4vA1
 github.com/vishvananda/netns v0.0.0-20210104183010-2eb08e3e575f/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0=
 github.com/waku-org/go-discover v0.0.0-20240506173252-4912704efdc5 h1:4K3IS97JryAEV8pRXB//qPcg+8bPXl/O+AOLt3FeCKc=
 github.com/waku-org/go-discover v0.0.0-20240506173252-4912704efdc5/go.mod h1:eBHgM6T4EG0RZzxpxKy+rGz/6Dw2Nd8DWxS0lm9ESDw=
-github.com/waku-org/go-libp2p-pubsub v0.0.0-20240701005450-b4513d154445 h1:V5f5NGsf/UwlJENmJjHGD9lr+3/Bz4ZZ6mL61tvtxgg=
-github.com/waku-org/go-libp2p-pubsub v0.0.0-20240701005450-b4513d154445/go.mod h1:QEb+hEV9WL9wCiUAnpY29FZR6W3zK8qYlaml8R4q6gQ=
+github.com/waku-org/go-libp2p-pubsub v0.0.0-20240703191659-2cbb09eac9b5 h1:9u16Et17jSGWM3mQhlIOZYiG+O+rlX5BsrBumw5flxk=
+github.com/waku-org/go-libp2p-pubsub v0.0.0-20240703191659-2cbb09eac9b5/go.mod h1:QEb+hEV9WL9wCiUAnpY29FZR6W3zK8qYlaml8R4q6gQ=
 github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0 h1:R4YYx2QamhBRl/moIxkDCNW+OP7AHbyWLBygDc/xIMo=
 github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0/go.mod h1:EhZP9fee0DYjKH/IOQvoNSy1tSHp2iZadsHGphcAJgY=
-github.com/waku-org/go-waku v0.8.1-0.20240701141800-5b5ea977afe0 h1:3Idg7XvXc9iQpUyg8KNKgZnziHJRs3xm7EDJHFzC9to=
-github.com/waku-org/go-waku v0.8.1-0.20240701141800-5b5ea977afe0/go.mod h1:hkW5zXyM/ZIMDPniVooTk4dOGwY+OzrB0Q6fx+1sLpo=
+github.com/waku-org/go-waku v0.8.1-0.20240705124012-8df5a0337a1e h1:aqZA60QlYiMBcgqHEx6ksof4A6+KspMIqoXu/ACXmKU=
+github.com/waku-org/go-waku v0.8.1-0.20240705124012-8df5a0337a1e/go.mod h1:nLQmeqztw19wmJI9rpXY6Lx81yYE/qOAd1qZsrAK2A8=
 github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59 h1:jisj+OCI6QydLtFq3Pyhu49wl9ytPN7oAHjMfepHDrA=
 github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59/go.mod h1:1PdBdPzyTaKt3VnpAHk3zj+r9dXPFOr3IHZP9nFle6E=
 github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230916172309-ee0ee61dde2b h1:KgZVhsLkxsj5gb/FfndSCQu6VYwALrCOgYI3poR95yE=
diff --git a/node/status_node_services.go b/node/status_node_services.go
index a61750b42..671505497 100644
--- a/node/status_node_services.go
+++ b/node/status_node_services.go
@@ -323,7 +323,6 @@ func (b *StatusNode) wakuV2Service(nodeConfig *params.NodeConfig) (*wakuv2.Waku,
 		Host:              nodeConfig.WakuV2Config.Host,
 		Port:              nodeConfig.WakuV2Config.Port,
 		LightClient:       nodeConfig.WakuV2Config.LightClient,
-		KeepAliveInterval: nodeConfig.WakuV2Config.KeepAliveInterval,
 		Rendezvous:        nodeConfig.Rendezvous,
 		WakuNodes:         nodeConfig.ClusterConfig.WakuNodes,
 		EnableStore:       nodeConfig.WakuV2Config.EnableStore,
diff --git a/nodecfg/node_config.go b/nodecfg/node_config.go
index 7eb00d919..9ed6863ff 100644
--- a/nodecfg/node_config.go
+++ b/nodecfg/node_config.go
@@ -206,10 +206,10 @@ func insertTorrentConfig(tx *sql.Tx, c *params.NodeConfig) error {
 func insertWakuV2ConfigPreMigration(tx *sql.Tx, c *params.NodeConfig) error {
 	_, err := tx.Exec(`
 	INSERT OR REPLACE INTO wakuv2_config (
-		enabled, host, port, keep_alive_interval, light_client, full_node, discovery_limit, data_dir,
+		enabled, host, port, light_client, full_node, discovery_limit, data_dir,
 		max_message_size, enable_confirmations, peer_exchange, enable_discv5, udp_port, auto_update, synthetic_id
-	) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 'id')`,
-		c.WakuV2Config.Enabled, c.WakuV2Config.Host, c.WakuV2Config.Port, c.WakuV2Config.KeepAliveInterval, c.WakuV2Config.LightClient, c.WakuV2Config.FullNode, c.WakuV2Config.DiscoveryLimit, c.WakuV2Config.DataDir,
+	) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 'id')`,
+		c.WakuV2Config.Enabled, c.WakuV2Config.Host, c.WakuV2Config.Port, c.WakuV2Config.LightClient, c.WakuV2Config.FullNode, c.WakuV2Config.DiscoveryLimit, c.WakuV2Config.DataDir,
 		c.WakuV2Config.MaxMessageSize, c.WakuV2Config.EnableConfirmations, c.WakuV2Config.PeerExchange, c.WakuV2Config.EnableDiscV5, c.WakuV2Config.UDPPort, c.WakuV2Config.AutoUpdate,
 	)
 	if err != nil {
@@ -676,12 +676,12 @@ func loadNodeConfig(tx *sql.Tx) (*params.NodeConfig, error) {
 	}
 
 	err = tx.QueryRow(`
-		SELECT enabled, host, port, keep_alive_interval, light_client, full_node, discovery_limit, data_dir,
+		SELECT enabled, host, port, light_client, full_node, discovery_limit, data_dir,
 		max_message_size, enable_confirmations, peer_exchange, enable_discv5, udp_port, auto_update,
 		enable_store, store_capacity, store_seconds, use_shard_default_topic, enable_missing_message_verification
 	FROM wakuv2_config WHERE synthetic_id = 'id'
 	`).Scan(
-		&nodecfg.WakuV2Config.Enabled, &nodecfg.WakuV2Config.Host, &nodecfg.WakuV2Config.Port, &nodecfg.WakuV2Config.KeepAliveInterval, &nodecfg.WakuV2Config.LightClient, &nodecfg.WakuV2Config.FullNode,
+		&nodecfg.WakuV2Config.Enabled, &nodecfg.WakuV2Config.Host, &nodecfg.WakuV2Config.Port, &nodecfg.WakuV2Config.LightClient, &nodecfg.WakuV2Config.FullNode,
 		&nodecfg.WakuV2Config.DiscoveryLimit, &nodecfg.WakuV2Config.DataDir, &nodecfg.WakuV2Config.MaxMessageSize, &nodecfg.WakuV2Config.EnableConfirmations,
 		&nodecfg.WakuV2Config.PeerExchange, &nodecfg.WakuV2Config.EnableDiscV5, &nodecfg.WakuV2Config.UDPPort, &nodecfg.WakuV2Config.AutoUpdate,
 		&nodecfg.WakuV2Config.EnableStore, &nodecfg.WakuV2Config.StoreCapacity, &nodecfg.WakuV2Config.StoreSeconds, &nodecfg.WakuV2Config.UseShardAsDefaultTopic,
diff --git a/params/config.go b/params/config.go
index c3bffea98..55165bb50 100644
--- a/params/config.go
+++ b/params/config.go
@@ -161,9 +161,6 @@ type WakuV2Config struct {
 	// Port number in which to start libp2p protocol (0 for random)
 	Port int
 
-	// Interval of time in seconds to send a ping to peers to keep the connection to them alive
-	KeepAliveInterval int
-
 	// LightClient should be true if the node will not relay messages and only rely on lightpush/filter nodes
 	LightClient bool
diff --git a/protocol/waku_builder_test.go b/protocol/waku_builder_test.go
index 35c1493a4..9771be218 100644
--- a/protocol/waku_builder_test.go
+++ b/protocol/waku_builder_test.go
@@ -1,11 +1,14 @@
 package protocol
 
 import (
+	"crypto/ecdsa"
 	"database/sql"
 
 	"github.com/stretchr/testify/suite"
 	"go.uber.org/zap"
 
+	"github.com/ethereum/go-ethereum/crypto"
+
 	"github.com/status-im/status-go/appdatabase"
 	gethbridge "github.com/status-im/status-go/eth-node/bridge/geth"
 	"github.com/status-im/status-go/eth-node/types"
@@ -19,6 +22,7 @@ type testWakuV2Config struct {
 	enableStore            bool
 	useShardAsDefaultTopic bool
 	clusterID              uint16
+	nodekey                []byte
 }
 
 func NewTestWakuV2(s *suite.Suite, cfg testWakuV2Config) *waku2.Waku {
@@ -31,6 +35,11 @@ func NewTestWakuV2(s *suite.Suite, cfg testWakuV2Config) *waku2.Waku {
 		EnableDiscV5: false,
 	}
 
+	var nodeKey *ecdsa.PrivateKey
+	if len(cfg.nodekey) != 0 {
+		nodeKey, _ = crypto.ToECDSA(cfg.nodekey)
+	}
+
 	var db *sql.DB
 
 	if cfg.enableStore {
@@ -44,7 +53,7 @@ func NewTestWakuV2(s *suite.Suite, cfg testWakuV2Config) *waku2.Waku {
 	}
 
 	wakuNode, err := waku2.New(
-		nil,
+		nodeKey,
 		"",
 		wakuConfig,
 		cfg.logger,
diff --git a/vendor/github.com/libp2p/go-libp2p-pubsub/gossipsub.go b/vendor/github.com/libp2p/go-libp2p-pubsub/gossipsub.go
index 6e42a7a02..21f34bece 100644
--- a/vendor/github.com/libp2p/go-libp2p-pubsub/gossipsub.go
+++ b/vendor/github.com/libp2p/go-libp2p-pubsub/gossipsub.go
@@ -1985,7 +1985,7 @@ func (gs *GossipSubRouter) getPeers(topic string, count int, filter func(peer.ID
 	return peers
 }
 
-func (gs *GossipSubRouter) MeshPeers(topic string) []peer.ID {
+func (gs *GossipSubRouter) meshPeers(topic string) []peer.ID {
 	peers, ok := gs.mesh[topic]
 	if !ok {
 		return nil
diff --git a/vendor/github.com/libp2p/go-libp2p-pubsub/pubsub.go b/vendor/github.com/libp2p/go-libp2p-pubsub/pubsub.go
index 74a3ea924..a9d646e85 100644
--- a/vendor/github.com/libp2p/go-libp2p-pubsub/pubsub.go
+++ b/vendor/github.com/libp2p/go-libp2p-pubsub/pubsub.go
@@ -90,6 +90,9 @@ type PubSub struct {
 	// get chan of peers we are connected to
 	getPeers chan *listPeerReq
 
+	// get chan to obtain list of full mesh peers (only applies when using gossipsub)
+	getMeshPeers chan *listPeerReq
+
 	// send subscription here to cancel it
 	cancelCh chan *Subscription
@@ -271,6 +274,7 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option
 		deadPeerBackoff:       newBackoff(ctx, 1000, BackoffCleanupInterval, MaxBackoffAttempts),
 		cancelCh:              make(chan *Subscription),
 		getPeers:              make(chan *listPeerReq),
+		getMeshPeers:          make(chan *listPeerReq),
 		addSub:                make(chan *addSubReq),
 		addRelay:              make(chan *addRelayReq),
 		rmRelay:               make(chan string),
@@ -618,6 +622,13 @@ func (p *PubSub) processLoop(ctx context.Context) {
 			p.handleAddRelay(relay)
 		case topic := <-p.rmRelay:
 			p.handleRemoveRelay(topic)
+		case meshpreq := <-p.getMeshPeers:
+			var peers []peer.ID
+			rt, ok := p.rt.(*GossipSubRouter)
+			if ok {
+				peers = rt.meshPeers(meshpreq.topic)
+			}
+			meshpreq.resp <- peers
 		case preq := <-p.getPeers:
 			tmap, ok := p.topics[preq.topic]
 			if preq.topic != "" && !ok {
@@ -1364,6 +1375,20 @@ func (p *PubSub) ListPeers(topic string) []peer.ID {
 	return <-out
 }
 
+// MeshPeers returns a list of full mesh peers for a given topic
+func (p *PubSub) MeshPeers(topic string) []peer.ID {
+	out := make(chan []peer.ID)
+	select {
+	case p.getMeshPeers <- &listPeerReq{
+		resp:  out,
+		topic: topic,
+	}:
+	case <-p.ctx.Done():
+		return nil
+	}
+	return <-out
+}
+
 // BlacklistPeer blacklists a peer; all messages from this peer will be unconditionally dropped.
 func (p *PubSub) BlacklistPeer(pid peer.ID) {
 	select {
@@ -1420,7 +1445,3 @@ type addRelayReq struct {
 	topic string
 	resp  chan RelayCancelFunc
 }
-
-func (p *PubSub) Router() PubSubRouter {
-	return p.rt
-}
diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/node/keepalive.go b/vendor/github.com/waku-org/go-waku/waku/v2/node/keepalive.go
index a2a8256e5..5e3815f9c 100644
--- a/vendor/github.com/waku-org/go-waku/waku/v2/node/keepalive.go
+++ b/vendor/github.com/waku-org/go-waku/waku/v2/node/keepalive.go
@@ -2,6 +2,8 @@ package node
 
 import (
 	"context"
+	"errors"
+	"math/rand"
 	"sync"
 	"time"
 
@@ -10,6 +12,7 @@ import (
 	"github.com/libp2p/go-libp2p/p2p/protocol/ping"
 	"github.com/waku-org/go-waku/logging"
 	"go.uber.org/zap"
+	"golang.org/x/exp/maps"
 )
 
 const maxAllowedPingFailures = 2
@@ -19,86 +22,175 @@ const maxAllowedPingFailures = 2
 // the peers if they don't reply back
 const sleepDetectionIntervalFactor = 3
 
+const maxPeersToPing = 10
+
 // startKeepAlive creates a go routine that periodically pings connected peers.
 // This is necessary because TCP connections are automatically closed due to inactivity,
 // and doing a ping will avoid this (with a small bandwidth cost)
-func (w *WakuNode) startKeepAlive(ctx context.Context, t time.Duration) {
+func (w *WakuNode) startKeepAlive(ctx context.Context, randomPeersPingDuration time.Duration, allPeersPingDuration time.Duration) {
 	defer w.wg.Done()
-	w.log.Info("setting up ping protocol", zap.Duration("duration", t))
-	ticker := time.NewTicker(t)
-	defer ticker.Stop()
+
+	if !w.opts.enableRelay {
+		return
+	}
+
+	w.log.Info("setting up ping protocol", zap.Duration("randomPeersPingDuration", randomPeersPingDuration), zap.Duration("allPeersPingDuration", allPeersPingDuration))
+
+	randomPeersTickerC := make(<-chan time.Time)
+	if randomPeersPingDuration != 0 {
+		randomPeersTicker := time.NewTicker(randomPeersPingDuration)
+		defer randomPeersTicker.Stop()
+		randomPeersTickerC = randomPeersTicker.C
+	}
+
+	allPeersTickerC := make(<-chan time.Time)
+	if allPeersPingDuration != 0 {
+		allPeersTicker := time.NewTicker(allPeersPingDuration)
+		defer allPeersTicker.Stop()
+		allPeersTickerC = allPeersTicker.C
+	}
 
 	lastTimeExecuted := w.timesource.Now()
 
-	sleepDetectionInterval := int64(t) * sleepDetectionIntervalFactor
+	sleepDetectionInterval := int64(randomPeersPingDuration) * sleepDetectionIntervalFactor
 
 	for {
+		peersToPing := []peer.ID{}
+
 		select {
-		case <-ticker.C:
+		case <-allPeersTickerC:
+			relayPeersSet := make(map[peer.ID]struct{})
+			for _, t := range w.Relay().Topics() {
+				for _, p := range w.Relay().PubSub().ListPeers(t) {
+					relayPeersSet[p] = struct{}{}
+				}
+			}
+			peersToPing = maps.Keys(relayPeersSet)
+
+		case <-randomPeersTickerC:
 			difference := w.timesource.Now().UnixNano() - lastTimeExecuted.UnixNano()
-			forceDisconnectOnPingFailure := false
 			if difference > sleepDetectionInterval {
-				forceDisconnectOnPingFailure = true
 				lastTimeExecuted = w.timesource.Now()
-				w.log.Warn("keep alive hasnt been executed recently. Killing connections to peers if ping fails")
+				w.log.Warn("keep alive hasn't been executed recently. Killing all connections")
+				for _, p := range w.host.Network().Peers() {
+					err := w.host.Network().ClosePeer(p)
+					if err != nil {
+						w.log.Debug("closing conn to peer", zap.Error(err))
+					}
+				}
 				continue
 			}
 
-			// Network's peers collection,
-			// contains only currently active peers
-			pingWg := sync.WaitGroup{}
-			peersToPing := w.host.Network().Peers()
-			pingWg.Add(len(peersToPing))
-			for _, p := range peersToPing {
-				if p != w.host.ID() {
-					go w.pingPeer(ctx, &pingWg, p, forceDisconnectOnPingFailure)
+			// Prioritize mesh peers
+			meshPeersSet := make(map[peer.ID]struct{})
+			for _, t := range w.Relay().Topics() {
+				for _, p := range w.Relay().PubSub().MeshPeers(t) {
+					meshPeersSet[p] = struct{}{}
 				}
 			}
-			pingWg.Wait()
+			peersToPing = append(peersToPing, maps.Keys(meshPeersSet)...)
+
+			// Ping also some random relay peers
+			if maxPeersToPing-len(peersToPing) > 0 {
+				relayPeersSet := make(map[peer.ID]struct{})
+				for _, t := range w.Relay().Topics() {
+					for _, p := range w.Relay().PubSub().ListPeers(t) {
+						if _, ok := meshPeersSet[p]; !ok {
+							relayPeersSet[p] = struct{}{}
+						}
+					}
+				}
+
+				relayPeers := maps.Keys(relayPeersSet)
+				rand.Shuffle(len(relayPeers), func(i, j int) { relayPeers[i], relayPeers[j] = relayPeers[j], relayPeers[i] })
+
+				peerLen := maxPeersToPing - len(peersToPing)
+				if peerLen > len(relayPeers) {
+					peerLen = len(relayPeers)
+				}
+				peersToPing = append(peersToPing, relayPeers[0:peerLen]...)
+			}
 
-			lastTimeExecuted = w.timesource.Now()
 		case <-ctx.Done():
 			w.log.Info("stopping ping protocol")
 			return
 		}
+
+		pingWg := sync.WaitGroup{}
+		pingWg.Add(len(peersToPing))
+		for _, p := range peersToPing {
+			go w.pingPeer(ctx, &pingWg, p)
+		}
+		pingWg.Wait()
+
+		lastTimeExecuted = w.timesource.Now()
 	}
 }
 
-func (w *WakuNode) pingPeer(ctx context.Context, wg *sync.WaitGroup, peerID peer.ID, forceDisconnectOnFail bool) {
+func (w *WakuNode) pingPeer(ctx context.Context, wg *sync.WaitGroup, peerID peer.ID) {
 	defer wg.Done()
 
+	logger := w.log.With(logging.HostID("peer", peerID))
+
+	for i := 0; i < maxAllowedPingFailures; i++ {
+		if w.host.Network().Connectedness(peerID) != network.Connected {
+			// Peer is no longer connected. No need to ping
+			return
+		}
+
+		logger.Debug("pinging")
+
+		if w.tryPing(ctx, peerID, logger) {
+			return
+		}
+	}
+
+	if w.host.Network().Connectedness(peerID) != network.Connected {
+		return
+	}
+
+	logger.Info("disconnecting dead peer")
+	if err := w.host.Network().ClosePeer(peerID); err != nil {
+		logger.Debug("closing conn to peer", zap.Error(err))
+	}
+}
+
+func (w *WakuNode) tryPing(ctx context.Context, peerID peer.ID, logger *zap.Logger) bool {
 	ctx, cancel := context.WithTimeout(ctx, 7*time.Second)
 	defer cancel()
 
-	logger := w.log.With(logging.HostID("peer", peerID))
-	logger.Debug("pinging")
 	pr := ping.Ping(ctx, w.host, peerID)
 	select {
 	case res := <-pr:
 		if res.Error != nil {
-			w.keepAliveMutex.Lock()
-			w.keepAliveFails[peerID]++
-			w.keepAliveMutex.Unlock()
 			logger.Debug("could not ping", zap.Error(res.Error))
-		} else {
-			w.keepAliveMutex.Lock()
-			delete(w.keepAliveFails, peerID)
-			w.keepAliveMutex.Unlock()
+			return false
 		}
 	case <-ctx.Done():
-		w.keepAliveMutex.Lock()
-		w.keepAliveFails[peerID]++
-		w.keepAliveMutex.Unlock()
-		logger.Debug("could not ping (context done)", zap.Error(ctx.Err()))
-	}
-
-	w.keepAliveMutex.Lock()
-	if (forceDisconnectOnFail || w.keepAliveFails[peerID] > maxAllowedPingFailures) && w.host.Network().Connectedness(peerID) == network.Connected {
-		logger.Info("disconnecting peer")
-		if err := w.host.Network().ClosePeer(peerID); err != nil {
-			logger.Debug("closing conn to peer", zap.Error(err))
+		if !errors.Is(ctx.Err(), context.Canceled) {
+			logger.Debug("could not ping (context)", zap.Error(ctx.Err()))
 		}
-		w.keepAliveFails[peerID] = 0
+		return false
 	}
-	w.keepAliveMutex.Unlock()
+	return true
+}
+
+func (w *WakuNode) getRelayPeers() []peer.ID {
+	relayPeers := make(map[peer.ID]struct{})
+	for _, t := range w.Relay().Topics() {
+		for _, p := range w.Relay().PubSub().ListPeers(t) {
+			relayPeers[p] = struct{}{}
+		}
+	}
+	return maps.Keys(relayPeers)
+}
+
+func (w *WakuNode) getFullMeshPeers() []peer.ID {
+	meshPeers := make(map[peer.ID]struct{})
+	for _, t := range w.Relay().Topics() {
+		for _, p := range w.Relay().PubSub().MeshPeers(t) {
+			meshPeers[p] = struct{}{}
+		}
+	}
+	return maps.Keys(meshPeers)
+}
diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/node/wakunode2.go b/vendor/github.com/waku-org/go-waku/waku/v2/node/wakunode2.go
index 3e9f137ed..5032c823c 100644
--- a/vendor/github.com/waku-org/go-waku/waku/v2/node/wakunode2.go
+++ b/vendor/github.com/waku-org/go-waku/waku/v2/node/wakunode2.go
@@ -116,9 +116,6 @@ type WakuNode struct {
 	addressChangesSub event.Subscription
 	enrChangeCh       chan struct{}
 
-	keepAliveMutex sync.Mutex
-	keepAliveFails map[peer.ID]int
-
 	cancel context.CancelFunc
 	wg     *sync.WaitGroup
 
@@ -193,7 +190,6 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {
 	w.opts = params
 	w.log = params.logger.Named("node2")
 	w.wg = &sync.WaitGroup{}
-	w.keepAliveFails = make(map[peer.ID]int)
 	w.wakuFlag = enr.NewWakuEnrBitfield(w.opts.enableLightPush, w.opts.enableFilterFullNode, w.opts.enableStore, w.opts.enableRelay)
 	w.circuitRelayNodes = make(chan peer.AddrInfo)
 	w.metrics = newMetrics(params.prometheusReg)
@@ -255,8 +251,14 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {
 	metadata := metadata.NewWakuMetadata(w.opts.clusterID, w.localNode, w.log)
 	w.metadata = metadata
 
+	relay := relay.NewWakuRelay(w.bcaster, w.opts.minRelayPeersToPublish, w.timesource, w.opts.prometheusReg, w.log,
+		relay.WithPubSubOptions(w.opts.pubsubOpts),
+		relay.WithMaxMsgSize(w.opts.maxMsgSizeBytes))
+
+	w.relay = relay
+
 	//Initialize peer manager.
-	w.peermanager = peermanager.NewPeerManager(w.opts.maxPeerConnections, w.opts.peerStoreCapacity, metadata, params.enableRelay, w.log)
+	w.peermanager = peermanager.NewPeerManager(w.opts.maxPeerConnections, w.opts.peerStoreCapacity, metadata, relay, params.enableRelay, w.log)
 
 	w.peerConnector, err = peermanager.NewPeerConnectionStrategy(w.peermanager, w.opts.onlineChecker, discoveryConnectTimeout, w.log)
 	if err != nil {
@@ -276,9 +278,6 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {
 	}
 
 	w.rendezvous = rendezvous.NewRendezvous(w.opts.rendezvousDB, w.peerConnector, w.log)
-	w.relay = relay.NewWakuRelay(w.bcaster, w.opts.minRelayPeersToPublish, w.timesource, w.opts.prometheusReg, w.log,
-		relay.WithPubSubOptions(w.opts.pubsubOpts),
-		relay.WithMaxMsgSize(w.opts.maxMsgSizeBytes))
 
 	if w.opts.enableRelay {
 		err = w.setupRLNRelay()
@@ -379,9 +378,9 @@ func (w *WakuNode) Start(ctx context.Context) error {
 		return err
 	}
 
-	if w.opts.keepAliveInterval > time.Duration(0) {
+	if w.opts.keepAliveRandomPeersInterval > time.Duration(0) || w.opts.keepAliveAllPeersInterval > time.Duration(0) {
 		w.wg.Add(1)
-		go w.startKeepAlive(ctx, w.opts.keepAliveInterval)
+		go w.startKeepAlive(ctx, w.opts.keepAliveRandomPeersInterval, w.opts.keepAliveAllPeersInterval)
 	}
 
 	w.metadata.SetHost(host)
diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/node/wakuoptions.go b/vendor/github.com/waku-org/go-waku/waku/v2/node/wakuoptions.go
index 26a82d0d0..82d964611 100644
--- a/vendor/github.com/waku-org/go-waku/waku/v2/node/wakuoptions.go
+++ b/vendor/github.com/waku-org/go-waku/waku/v2/node/wakuoptions.go
@@ -114,7 +114,8 @@ type WakuNodeParameters struct {
 	rlnTreePath                  string
 	rlnMembershipContractAddress common.Address
 
-	keepAliveInterval time.Duration
+	keepAliveRandomPeersInterval time.Duration
+	keepAliveAllPeersInterval    time.Duration
 
 	enableLightPush bool
 
@@ -476,10 +477,14 @@ func WithLightPush(lightpushOpts ...lightpush.Option) WakuNodeOption {
 }
 
 // WithKeepAlive is a WakuNodeOption used to set the interval of time when
-// each peer will be ping to keep the TCP connection alive
-func WithKeepAlive(t time.Duration) WakuNodeOption {
+// each peer will be pinged to keep the TCP connection alive. The option accepts
+// two intervals: `randomPeersInterval`, used to ping full mesh peers (if using
+// relay) and random connected peers, and `allPeersInterval`, which is used to
+// ping all connected peers.
+func WithKeepAlive(randomPeersInterval time.Duration, allPeersInterval time.Duration) WakuNodeOption {
 	return func(params *WakuNodeParameters) error {
-		params.keepAliveInterval = t
+		params.keepAliveRandomPeersInterval = randomPeersInterval
+		params.keepAliveAllPeersInterval = allPeersInterval
 		return nil
 	}
 }
diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_manager.go b/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_manager.go
index 173663c07..3126f9142 100644
--- a/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_manager.go
+++ b/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_manager.go
@@ -70,6 +70,7 @@ type WakuProtoInfo struct {
 type PeerManager struct {
 	peerConnector *PeerConnectionStrategy
 	metadata      *metadata.WakuMetadata
+	relay         *relay.WakuRelay
 	maxPeers      int
 	maxRelayPeers int
 	logger        *zap.Logger
@@ -123,7 +124,8 @@ func inAndOutRelayPeers(relayPeers int) (int, int) {
 // Also returns the healthyPeerCount
 func (pm *PeerManager) checkAndUpdateTopicHealth(topic *NodeTopicDetails) int {
 	healthyPeerCount := 0
-	for _, p := range topic.topic.ListPeers() {
+
+	for _, p := range pm.relay.PubSub().MeshPeers(topic.topic.String()) {
 		if pm.host.Network().Connectedness(p) == network.Connected {
 			pThreshold, err := pm.host.Peerstore().(wps.WakuPeerstore).Score(p)
 			if err == nil {
@@ -143,8 +145,8 @@ func (pm *PeerManager) checkAndUpdateTopicHealth(topic *NodeTopicDetails) int {
 		}
 	}
+
 	//Update topic's health
-	//TODO: This should be done based on number of full-mesh peers.
 	oldHealth := topic.healthStatus
 	if healthyPeerCount < 1 { //Ideally this check should be done with minPeersForRelay, but leaving it as is for now.
 		topic.healthStatus = UnHealthy
@@ -176,7 +178,7 @@ func (pm *PeerManager) TopicHealth(pubsubTopic string) (TopicHealth, error) {
 }
 
 // NewPeerManager creates a new peerManager instance.
-func NewPeerManager(maxConnections int, maxPeers int, metadata *metadata.WakuMetadata, relayEnabled bool, logger *zap.Logger) *PeerManager {
+func NewPeerManager(maxConnections int, maxPeers int, metadata *metadata.WakuMetadata, relay *relay.WakuRelay, relayEnabled bool, logger *zap.Logger) *PeerManager {
 	var inPeersTarget, outPeersTarget, maxRelayPeers int
 	if relayEnabled {
 		maxRelayPeers, _ := relayAndServicePeers(maxConnections)
@@ -194,6 +196,7 @@ func NewPeerManager(maxConnections int, maxPeers int, metadata *metadata.WakuMet
 	pm := &PeerManager{
 		logger:         logger.Named("peer-manager"),
 		metadata:       metadata,
+		relay:          relay,
 		maxRelayPeers:  maxRelayPeers,
 		InPeersTarget:  inPeersTarget,
 		OutPeersTarget: outPeersTarget,
diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/test_utils.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/test_utils.go
index 361ab561b..9a2b651e8 100644
--- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/test_utils.go
+++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/test_utils.go
@@ -165,7 +165,7 @@ func (s *FilterTestSuite) GetWakuFilterLightNode() LightNodeData {
 	s.Require().NoError(err)
 	b := relay.NewBroadcaster(10)
 	s.Require().NoError(b.Start(context.Background()))
-	pm := peermanager.NewPeerManager(5, 5, nil, true, s.Log)
+	pm := peermanager.NewPeerManager(5, 5, nil, nil, true, s.Log)
 	filterPush := NewWakuFilterLightNode(b, pm, timesource.NewDefaultClock(), onlinechecker.NewDefaultOnlineChecker(true), prometheus.DefaultRegisterer, s.Log)
 	filterPush.SetHost(host)
 	pm.SetHost(host)
diff --git a/vendor/modules.txt b/vendor/modules.txt
index 36244ffc3..3184c15c0 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -568,7 +568,7 @@ github.com/libp2p/go-libp2p/p2p/transport/webtransport
 # github.com/libp2p/go-libp2p-asn-util v0.4.1
 ## explicit; go 1.19
 github.com/libp2p/go-libp2p-asn-util
-# github.com/libp2p/go-libp2p-pubsub v0.11.0 => github.com/waku-org/go-libp2p-pubsub v0.0.0-20240701005450-b4513d154445
+# github.com/libp2p/go-libp2p-pubsub v0.11.0 => github.com/waku-org/go-libp2p-pubsub v0.0.0-20240703191659-2cbb09eac9b5
 ## explicit; go 1.21
 github.com/libp2p/go-libp2p-pubsub
 github.com/libp2p/go-libp2p-pubsub/pb
@@ -1018,7 +1018,7 @@ github.com/waku-org/go-discover/discover/v5wire
 github.com/waku-org/go-libp2p-rendezvous
 github.com/waku-org/go-libp2p-rendezvous/db
 github.com/waku-org/go-libp2p-rendezvous/pb
-# github.com/waku-org/go-waku v0.8.1-0.20240701141800-5b5ea977afe0
+# github.com/waku-org/go-waku v0.8.1-0.20240705124012-8df5a0337a1e
 ## explicit; go 1.21
 github.com/waku-org/go-waku/logging
 github.com/waku-org/go-waku/tests
diff --git a/wakuv2/config.go b/wakuv2/config.go
index 02ef66ad1..72a8c2f51 100644
--- a/wakuv2/config.go
+++ b/wakuv2/config.go
@@ -44,7 +44,6 @@ type Config struct {
 	Port                     int  `toml:",omitempty"`
 	EnablePeerExchangeServer bool `toml:",omitempty"` // PeerExchange server makes sense only when discv5 is running locally as it will have a cache of peers that it can respond to in case a PeerExchange request comes from the PeerExchangeClient
 	EnablePeerExchangeClient bool `toml:",omitempty"`
-	KeepAliveInterval        int  `toml:",omitempty"`
 	MinPeersForRelay         int  `toml:",omitempty"` // Indicates the minimum number of peers required for using Relay Protocol
 	MinPeersForFilter        int  `toml:",omitempty"` // Indicates the minimum number of peers required for using Filter Protocol
 	LightClient              bool `toml:",omitempty"` // Indicates if the node is a light client
@@ -87,7 +86,6 @@ var DefaultConfig = Config{
 	MaxMessageSize:    common.DefaultMaxMessageSize,
 	Host:              "0.0.0.0",
 	Port:              0,
-	KeepAliveInterval: 10, // second
 	DiscoveryLimit:    20,
 	MinPeersForRelay:  1, // TODO: determine correct value with Vac team
 	MinPeersForFilter: 2, // TODO: determine correct value with Vac team and via testing
@@ -107,10 +105,6 @@ func setDefaults(cfg *Config) *Config {
 		cfg.Host = DefaultConfig.Host
 	}
 
-	if cfg.KeepAliveInterval == 0 {
-		cfg.KeepAliveInterval = DefaultConfig.KeepAliveInterval
-	}
-
 	if cfg.DiscoveryLimit == 0 {
 		cfg.DiscoveryLimit = DefaultConfig.DiscoveryLimit
 	}
diff --git a/wakuv2/waku.go b/wakuv2/waku.go
index c2d431cfd..11802c5a3 100644
--- a/wakuv2/waku.go
+++ b/wakuv2/waku.go
@@ -89,6 +89,8 @@ const hashQueryInterval = 3 * time.Second
 const messageSentPeriod = 3    // in seconds
 const messageExpiredPerid = 10 // in seconds
 const maxRelayPeers = 300
+const randomPeersKeepAliveInterval = 5 * time.Second
+const allPeersKeepAliveInterval = 5 * time.Minute
 
 type SentEnvelope struct {
 	Envelope *protocol.Envelope
@@ -154,6 +156,7 @@ type Waku struct {
 	storePeerID peer.ID
 
 	topicHealthStatusChan   chan peermanager.TopicHealthStatus
+	connectionNotifChan     chan node.PeerConnection
 	connStatusSubscriptions map[string]*types.ConnStatusSubscription
 	connStatusMu            sync.Mutex
 	onlineChecker           *onlinechecker.DefaultOnlineChecker
@@ -167,8 +170,8 @@ type Waku struct {
 	// bootnodes successfully
 	seededBootnodesForDiscV5 bool
 
-	// connectionChanged is channel that notifies when connectivity has changed
-	connectionChanged chan struct{}
+	// goingOnline is a channel that notifies when connectivity has changed from offline to online
+	goingOnline chan struct{}
 
 	// discV5BootstrapNodes is the ENR to be used to fetch bootstrap nodes for discovery
 	discV5BootstrapNodes []string
@@ -224,6 +227,7 @@ func New(nodeKey *ecdsa.PrivateKey, fleet string, cfg *Config, logger *zap.Logge
 		msgQueue:                make(chan *common.ReceivedMessage, messageQueueLimit),
 		sendQueue:               make(chan *protocol.Envelope, 1000),
 		topicHealthStatusChan:   make(chan peermanager.TopicHealthStatus, 100),
+		connectionNotifChan:     make(chan node.PeerConnection),
 		connStatusSubscriptions: make(map[string]*types.ConnStatusSubscription),
 		topicInterest:           make(map[string]TopicInterest),
 		ctx:                     ctx,
@@ -259,10 +263,6 @@ func New(nodeKey *ecdsa.PrivateKey, fleet string, cfg *Config, logger *zap.Logge
 		return nil, fmt.Errorf("failed to setup the network interface: %v", err)
 	}
 
-	if cfg.KeepAliveInterval == 0 {
-		cfg.KeepAliveInterval = DefaultConfig.KeepAliveInterval
-	}
-
 	libp2pOpts := node.DefaultLibP2POptions
 	libp2pOpts = append(libp2pOpts, libp2p.BandwidthReporter(waku.bandwidthCounter))
 	libp2pOpts = append(libp2pOpts, libp2p.NATPortMap())
@@ -271,8 +271,9 @@ func New(nodeKey *ecdsa.PrivateKey, fleet string, cfg *Config, logger *zap.Logge
 		node.WithLibP2POptions(libp2pOpts...),
 		node.WithPrivateKey(nodeKey),
 		node.WithHostAddress(hostAddr),
+		node.WithConnectionNotification(waku.connectionNotifChan),
 		node.WithTopicHealthStatusChannel(waku.topicHealthStatusChan),
-		node.WithKeepAlive(time.Duration(cfg.KeepAliveInterval) * time.Second),
+		node.WithKeepAlive(randomPeersKeepAliveInterval, allPeersKeepAliveInterval),
 		node.WithLogger(logger),
 		node.WithLogLevel(logger.Level()),
 		node.WithClusterID(cfg.ClusterID),
@@ -1293,48 +1294,6 @@ func (w *Waku) Query(ctx context.Context, peerID peer.ID, query legacy_store.Que
 	return result.Cursor(), len(result.Messages), nil
 }
 
-func (w *Waku) lightClientConnectionStatus() {
-
-	peers := w.node.Host().Network().Peers()
-	w.logger.Debug("peer stats",
-		zap.Int("peersCount", len(peers)))
-	subs := w.node.FilterLightnode().Subscriptions()
-	w.logger.Debug("filter subs count", zap.Int("count", len(subs)))
-	isOnline := false
-	if len(peers) > 0 {
-		isOnline = true
-	}
-	//TODOL needs fixing, right now invoking everytime.
-	//Trigger FilterManager to take care of any pending filter subscriptions
-	//TODO: Pass pubsubTopic based on topicHealth notif received.
-	go w.filterManager.onConnectionStatusChange(w.cfg.DefaultShardPubsubTopic, isOnline)
-
-	w.connStatusMu.Lock()
-
-	connStatus := types.ConnStatus{
-		IsOnline: isOnline,
-		Peers:    FormatPeerStats(w.node),
-	}
-	for k, subs := range w.connStatusSubscriptions {
-		if !subs.Send(connStatus) {
-			delete(w.connStatusSubscriptions, k)
-		}
-	}
-	w.connStatusMu.Unlock()
-	if w.onPeerStats != nil {
-		w.onPeerStats(connStatus)
-	}
-
-	//TODO:Analyze if we need to discover and connect to peers with peerExchange loop enabled.
-	if !w.onlineChecker.IsOnline() && isOnline {
-		if err := w.discoverAndConnectPeers(); err != nil {
-			w.logger.Error("failed to add wakuv2 peers", zap.Error(err))
-		}
-	}
-
-	w.onlineChecker.SetOnline(isOnline)
-}
-
 // Start implements node.Service, starting the background data propagation thread
 // of the Waku protocol.
 func (w *Waku) Start() error {
@@ -1347,7 +1306,7 @@ func (w *Waku) Start() error {
 		return fmt.Errorf("failed to create a go-waku node: %v", err)
 	}
 
-	w.connectionChanged = make(chan struct{})
+	w.goingOnline = make(chan struct{})
 
 	if err = w.node.Start(w.ctx); err != nil {
 		return fmt.Errorf("failed to start go-waku node: %v", err)
@@ -1377,46 +1336,40 @@ func (w *Waku) Start() error {
 	go func() {
 		defer w.wg.Done()
 
-		ticker := time.NewTicker(5 * time.Second)
-		defer ticker.Stop()
-
-		peerCountTimer := time.NewTimer(0) // fire immediately
-		defer peerCountTimer.Stop()
-
-		peerCount := 0
 		for {
 			select {
 			case <-w.ctx.Done():
 				return
-			case <-ticker.C:
-				//TODO: Need to fix.
-				// Temporary changes for lightNodes to have health check based on connected peers.
-				//This needs to be enhanced to be based on healthy Filter and lightPush peers available for each shard.
-				//This would get fixed as part of https://github.com/waku-org/go-waku/issues/1114
+
+			case <-w.topicHealthStatusChan:
+				// TODO: https://github.com/status-im/status-go/issues/4628
+
+			case <-w.connectionNotifChan:
+
+				isOnline := len(w.node.Host().Network().Peers()) > 0
+
 				if w.cfg.LightClient {
-					w.lightClientConnectionStatus()
+					// TODO: Temporary changes for lightNodes to have health check based on connected peers.
+					// This needs to be enhanced to be based on healthy Filter and lightPush peers available for each shard.
+					// This would get fixed as part of https://github.com/waku-org/go-waku/issues/1114
+
+					subs := w.node.FilterLightnode().Subscriptions()
+					w.logger.Debug("filter subs count", zap.Int("count", len(subs)))
+
+					// TODO: needs fixing; right now this is invoked every time.
+					// Trigger FilterManager to take care of any pending filter subscriptions
+					// TODO: Pass pubsubTopic based on topicHealth notif received.
+					go w.filterManager.onConnectionStatusChange(w.cfg.DefaultShardPubsubTopic, isOnline)
+
 				}
-			case <-peerCountTimer.C:
-				peerCountTimer.Reset(3 * time.Second)
-				newPeerCount := len(w.node.Host().Network().Peers())
-				if newPeerCount != peerCount && w.onPeerStats != nil {
-					peerCount = newPeerCount
-					// TODO: `IsOnline` is not implemented correctly here, however
-					// this is not a problem because Desktop ignores that value.
-					// This should be fixed to as part of issue
-					// https://github.com/status-im/status-go/issues/4628
-					w.onPeerStats(types.ConnStatus{
-						IsOnline: true,
-						Peers:    FormatPeerStats(w.node),
-					})
-				}
-			case c := <-w.topicHealthStatusChan:
 				w.connStatusMu.Lock()
-				// TODO: https://github.com/status-im/status-go/issues/4628
-				// This code is not using the topic health status correctly.
-				// It assumes we are using a single pubsub topic for now
-				latestConnStatus := formatConnStatus(w.node, c)
+
+				latestConnStatus := types.ConnStatus{
+					IsOnline: isOnline,
+					Peers:    FormatPeerStats(w.node),
+				}
+
 				w.logger.Debug("peer stats",
 					zap.Int("peersCount", len(latestConnStatus.Peers)),
 					zap.Any("stats", latestConnStatus))
@@ -1425,11 +1378,20 @@ func (w *Waku) Start() error {
 						delete(w.connStatusSubscriptions, k)
 					}
 				}
+
 				w.connStatusMu.Unlock()
+
 				if w.onPeerStats != nil {
 					w.onPeerStats(latestConnStatus)
 				}
 
+				//TODO: analyze if we need to discover and connect to peers with peerExchange loop enabled.
+				if !w.onlineChecker.IsOnline() && isOnline {
+					if err := w.discoverAndConnectPeers(); err != nil {
+						w.logger.Error("failed to add wakuv2 peers", zap.Error(err))
+					}
+				}
+
 				w.ConnectionChanged(connection.State{
 					Offline: !latestConnStatus.IsOnline,
 				})
@@ -1520,7 +1482,7 @@ func (w *Waku) Stop() error {
 		}
 	}
 
-	close(w.connectionChanged)
+	close(w.goingOnline)
 	w.wg.Wait()
 
 	w.ctx = nil
@@ -1724,7 +1686,7 @@ func (w *Waku) RelayPeersByTopic(topic string) (*types.PeerList, error) {
 	}
 
 	return &types.PeerList{
-		FullMeshPeers: w.node.Relay().PubSub().Router().(*pubsub.GossipSubRouter).MeshPeers(topic),
+		FullMeshPeers: w.node.Relay().PubSub().MeshPeers(topic),
 		AllPeers:      w.node.Relay().PubSub().ListPeers(topic),
 	}, nil
 }
@@ -1818,7 +1780,7 @@ func (w *Waku) StopDiscV5() error {
 func (w *Waku) ConnectionChanged(state connection.State) {
 	if !state.Offline && !w.onlineChecker.IsOnline() {
 		select {
-		case w.connectionChanged <- struct{}{}:
+		case w.goingOnline <- struct{}{}:
 		default:
 			w.logger.Warn("could not write on connection changed channel")
 		}
@@ -1880,7 +1842,7 @@ func (w *Waku) seedBootnodesForDiscV5() {
 			}
 
 		// If we go online, trigger immediately
-		case <-w.connectionChanged:
+		case <-w.goingOnline:
 			if w.cfg.EnableDiscV5 {
 				if canQuery() {
 					err := w.restartDiscV5()
@@ -2073,18 +2035,6 @@ func FormatPeerStats(wakuNode *node.WakuNode) map[string]types.WakuV2Peer {
 	return p
 }
 
-func formatConnStatus(wakuNode *node.WakuNode, c peermanager.TopicHealthStatus) types.ConnStatus {
-	isOnline := true
-	if c.Health == peermanager.UnHealthy {
-		isOnline = false
-	}
-
-	return types.ConnStatus{
-		IsOnline: isOnline,
-		Peers:    FormatPeerStats(wakuNode),
-	}
-}
-
 func (w *Waku) StoreNode() legacy_store.Store {
 	return w.node.LegacyStore()
 }
diff --git a/wakuv2/waku_test.go b/wakuv2/waku_test.go
index 76d46a6fd..0e9a18da2 100644
--- a/wakuv2/waku_test.go
+++ b/wakuv2/waku_test.go
@@ -357,7 +357,6 @@ func TestWakuV2Filter(t *testing.T) {
 	setDefaultConfig(config, true)
 	config.EnablePeerExchangeClient = false
 	config.Port = 0
-	config.KeepAliveInterval = 0
 	config.MinPeersForFilter = 2
 
 	config.DiscV5BootstrapNodes = []string{enrTreeAddress}
@@ -455,7 +454,6 @@ func TestWakuV2Store(t *testing.T) {
 		EnableStore:       false,
 		StoreCapacity:     100,
 		StoreSeconds:      3600,
-		KeepAliveInterval: 10,
 	}
 	w1PeersCh := make(chan []string, 100) // buffered not to block on the send side
 
@@ -482,7 +480,6 @@ func TestWakuV2Store(t *testing.T) {
 		EnableStore:       true,
 		StoreCapacity:     100,
 		StoreSeconds:      3600,
-		KeepAliveInterval: 10,
 	}
 
 	// Start the second Waku node
@@ -513,6 +510,8 @@ func TestWakuV2Store(t *testing.T) {
 	_, err = w2.Subscribe(filter)
 	require.NoError(t, err)
 
+	time.Sleep(2 * time.Second)
+
 	// Send a message from the first node
 	msgTimestamp := w1.CurrentTime().UnixNano()
 	contentTopic := maps.Keys(filter.ContentTopics)[0]
@@ -654,7 +653,7 @@ func TestOnlineChecker(t *testing.T) {
 	wg.Add(1)
 	go func() {
 		defer wg.Done()
-		<-w.connectionChanged
+		<-w.goingOnline
 		require.True(t, true)
 	}()
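
Usage note (editor's addition, not part of the patch): a minimal, hypothetical sketch of how an embedder of go-waku would use the two-interval WithKeepAlive introduced above. node.New, Start, Stop, Relay().Topics() and the new PubSub().MeshPeers all appear in this patch or in go-waku itself; the interval values, the WithWakuRelay call, and the main wrapper are illustrative assumptions. The intervals mirror the randomPeersKeepAliveInterval/allPeersKeepAliveInterval constants the patch adds to wakuv2/waku.go; passing 0 disables the corresponding ticker, and startKeepAlive is a no-op unless relay is enabled.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/waku-org/go-waku/waku/v2/node"
)

func main() {
	// Hypothetical intervals: ping full-mesh plus a random sample of relay
	// peers every 5s, and sweep all connected peers every 5m.
	wakuNode, err := node.New(
		node.WithWakuRelay(), // keep-alive returns early unless relay is enabled
		node.WithKeepAlive(5*time.Second, 5*time.Minute),
	)
	if err != nil {
		panic(err)
	}

	if err := wakuNode.Start(context.Background()); err != nil {
		panic(err)
	}
	defer wakuNode.Stop()

	// The new PubSub().MeshPeers accessor replaces the removed Router()
	// escape hatch for inspecting a topic's full-mesh peers.
	for _, topic := range wakuNode.Relay().Topics() {
		fmt.Println(topic, wakuNode.Relay().PubSub().MeshPeers(topic))
	}
}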