feat_: bump go-waku to introduce new keep alive interval (#5484)

- Also renames the existing keepAliveInterval to randomPeerKeepAliveInterval, and uses time.Duration instead of int
This commit is contained in:
richΛrd 2024-07-11 14:36:34 -04:00 committed by GitHub
parent 6ace9f9be0
commit 56cc5c96c5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
19 changed files with 259 additions and 192 deletions

View File

@ -1 +1 @@
0.180.0 0.179.11

View File

@ -215,7 +215,6 @@ func randomNodeConfig() *params.NodeConfig {
Enabled: randomBool(), Enabled: randomBool(),
Host: randomString(), Host: randomString(),
Port: randomInt(math.MaxInt64), Port: randomInt(math.MaxInt64),
KeepAliveInterval: randomInt(math.MaxInt64),
LightClient: randomBool(), LightClient: randomBool(),
FullNode: randomBool(), FullNode: randomBool(),
DiscoveryLimit: randomInt(math.MaxInt64), DiscoveryLimit: randomInt(math.MaxInt64),

4
go.mod
View File

@ -16,7 +16,7 @@ replace github.com/forPelevin/gomoji => github.com/status-im/gomoji v1.1.3-0.202
replace github.com/mutecomm/go-sqlcipher/v4 v4.4.2 => github.com/status-im/go-sqlcipher/v4 v4.5.4-status.2 replace github.com/mutecomm/go-sqlcipher/v4 v4.4.2 => github.com/status-im/go-sqlcipher/v4 v4.5.4-status.2
replace github.com/libp2p/go-libp2p-pubsub v0.11.0 => github.com/waku-org/go-libp2p-pubsub v0.0.0-20240701005450-b4513d154445 replace github.com/libp2p/go-libp2p-pubsub v0.11.0 => github.com/waku-org/go-libp2p-pubsub v0.0.0-20240703191659-2cbb09eac9b5
require ( require (
github.com/anacrolix/torrent v1.41.0 github.com/anacrolix/torrent v1.41.0
@ -96,7 +96,7 @@ require (
github.com/schollz/peerdiscovery v1.7.0 github.com/schollz/peerdiscovery v1.7.0
github.com/siphiuel/lc-proxy-wrapper v0.0.0-20230516150924-246507cee8c7 github.com/siphiuel/lc-proxy-wrapper v0.0.0-20230516150924-246507cee8c7
github.com/urfave/cli/v2 v2.27.2 github.com/urfave/cli/v2 v2.27.2
github.com/waku-org/go-waku v0.8.1-0.20240701141800-5b5ea977afe0 github.com/waku-org/go-waku v0.8.1-0.20240705124012-8df5a0337a1e
github.com/wk8/go-ordered-map/v2 v2.1.7 github.com/wk8/go-ordered-map/v2 v2.1.7
github.com/yeqown/go-qrcode/v2 v2.2.1 github.com/yeqown/go-qrcode/v2 v2.2.1
github.com/yeqown/go-qrcode/writer/standard v1.2.1 github.com/yeqown/go-qrcode/writer/standard v1.2.1

8
go.sum
View File

@ -2139,12 +2139,12 @@ github.com/vishvananda/netns v0.0.0-20200728191858-db3c7e526aae/go.mod h1:DD4vA1
github.com/vishvananda/netns v0.0.0-20210104183010-2eb08e3e575f/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0= github.com/vishvananda/netns v0.0.0-20210104183010-2eb08e3e575f/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0=
github.com/waku-org/go-discover v0.0.0-20240506173252-4912704efdc5 h1:4K3IS97JryAEV8pRXB//qPcg+8bPXl/O+AOLt3FeCKc= github.com/waku-org/go-discover v0.0.0-20240506173252-4912704efdc5 h1:4K3IS97JryAEV8pRXB//qPcg+8bPXl/O+AOLt3FeCKc=
github.com/waku-org/go-discover v0.0.0-20240506173252-4912704efdc5/go.mod h1:eBHgM6T4EG0RZzxpxKy+rGz/6Dw2Nd8DWxS0lm9ESDw= github.com/waku-org/go-discover v0.0.0-20240506173252-4912704efdc5/go.mod h1:eBHgM6T4EG0RZzxpxKy+rGz/6Dw2Nd8DWxS0lm9ESDw=
github.com/waku-org/go-libp2p-pubsub v0.0.0-20240701005450-b4513d154445 h1:V5f5NGsf/UwlJENmJjHGD9lr+3/Bz4ZZ6mL61tvtxgg= github.com/waku-org/go-libp2p-pubsub v0.0.0-20240703191659-2cbb09eac9b5 h1:9u16Et17jSGWM3mQhlIOZYiG+O+rlX5BsrBumw5flxk=
github.com/waku-org/go-libp2p-pubsub v0.0.0-20240701005450-b4513d154445/go.mod h1:QEb+hEV9WL9wCiUAnpY29FZR6W3zK8qYlaml8R4q6gQ= github.com/waku-org/go-libp2p-pubsub v0.0.0-20240703191659-2cbb09eac9b5/go.mod h1:QEb+hEV9WL9wCiUAnpY29FZR6W3zK8qYlaml8R4q6gQ=
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0 h1:R4YYx2QamhBRl/moIxkDCNW+OP7AHbyWLBygDc/xIMo= github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0 h1:R4YYx2QamhBRl/moIxkDCNW+OP7AHbyWLBygDc/xIMo=
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0/go.mod h1:EhZP9fee0DYjKH/IOQvoNSy1tSHp2iZadsHGphcAJgY= github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0/go.mod h1:EhZP9fee0DYjKH/IOQvoNSy1tSHp2iZadsHGphcAJgY=
github.com/waku-org/go-waku v0.8.1-0.20240701141800-5b5ea977afe0 h1:3Idg7XvXc9iQpUyg8KNKgZnziHJRs3xm7EDJHFzC9to= github.com/waku-org/go-waku v0.8.1-0.20240705124012-8df5a0337a1e h1:aqZA60QlYiMBcgqHEx6ksof4A6+KspMIqoXu/ACXmKU=
github.com/waku-org/go-waku v0.8.1-0.20240701141800-5b5ea977afe0/go.mod h1:hkW5zXyM/ZIMDPniVooTk4dOGwY+OzrB0Q6fx+1sLpo= github.com/waku-org/go-waku v0.8.1-0.20240705124012-8df5a0337a1e/go.mod h1:nLQmeqztw19wmJI9rpXY6Lx81yYE/qOAd1qZsrAK2A8=
github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59 h1:jisj+OCI6QydLtFq3Pyhu49wl9ytPN7oAHjMfepHDrA= github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59 h1:jisj+OCI6QydLtFq3Pyhu49wl9ytPN7oAHjMfepHDrA=
github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59/go.mod h1:1PdBdPzyTaKt3VnpAHk3zj+r9dXPFOr3IHZP9nFle6E= github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59/go.mod h1:1PdBdPzyTaKt3VnpAHk3zj+r9dXPFOr3IHZP9nFle6E=
github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230916172309-ee0ee61dde2b h1:KgZVhsLkxsj5gb/FfndSCQu6VYwALrCOgYI3poR95yE= github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230916172309-ee0ee61dde2b h1:KgZVhsLkxsj5gb/FfndSCQu6VYwALrCOgYI3poR95yE=

View File

@ -323,7 +323,6 @@ func (b *StatusNode) wakuV2Service(nodeConfig *params.NodeConfig) (*wakuv2.Waku,
Host: nodeConfig.WakuV2Config.Host, Host: nodeConfig.WakuV2Config.Host,
Port: nodeConfig.WakuV2Config.Port, Port: nodeConfig.WakuV2Config.Port,
LightClient: nodeConfig.WakuV2Config.LightClient, LightClient: nodeConfig.WakuV2Config.LightClient,
KeepAliveInterval: nodeConfig.WakuV2Config.KeepAliveInterval,
Rendezvous: nodeConfig.Rendezvous, Rendezvous: nodeConfig.Rendezvous,
WakuNodes: nodeConfig.ClusterConfig.WakuNodes, WakuNodes: nodeConfig.ClusterConfig.WakuNodes,
EnableStore: nodeConfig.WakuV2Config.EnableStore, EnableStore: nodeConfig.WakuV2Config.EnableStore,

View File

@ -206,10 +206,10 @@ func insertTorrentConfig(tx *sql.Tx, c *params.NodeConfig) error {
func insertWakuV2ConfigPreMigration(tx *sql.Tx, c *params.NodeConfig) error { func insertWakuV2ConfigPreMigration(tx *sql.Tx, c *params.NodeConfig) error {
_, err := tx.Exec(` _, err := tx.Exec(`
INSERT OR REPLACE INTO wakuv2_config ( INSERT OR REPLACE INTO wakuv2_config (
enabled, host, port, keep_alive_interval, light_client, full_node, discovery_limit, data_dir, enabled, host, port, light_client, full_node, discovery_limit, data_dir,
max_message_size, enable_confirmations, peer_exchange, enable_discv5, udp_port, auto_update, synthetic_id max_message_size, enable_confirmations, peer_exchange, enable_discv5, udp_port, auto_update, synthetic_id
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 'id')`, ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 'id')`,
c.WakuV2Config.Enabled, c.WakuV2Config.Host, c.WakuV2Config.Port, c.WakuV2Config.KeepAliveInterval, c.WakuV2Config.LightClient, c.WakuV2Config.FullNode, c.WakuV2Config.DiscoveryLimit, c.WakuV2Config.DataDir, c.WakuV2Config.Enabled, c.WakuV2Config.Host, c.WakuV2Config.Port, c.WakuV2Config.LightClient, c.WakuV2Config.FullNode, c.WakuV2Config.DiscoveryLimit, c.WakuV2Config.DataDir,
c.WakuV2Config.MaxMessageSize, c.WakuV2Config.EnableConfirmations, c.WakuV2Config.PeerExchange, c.WakuV2Config.EnableDiscV5, c.WakuV2Config.UDPPort, c.WakuV2Config.AutoUpdate, c.WakuV2Config.MaxMessageSize, c.WakuV2Config.EnableConfirmations, c.WakuV2Config.PeerExchange, c.WakuV2Config.EnableDiscV5, c.WakuV2Config.UDPPort, c.WakuV2Config.AutoUpdate,
) )
if err != nil { if err != nil {
@ -676,12 +676,12 @@ func loadNodeConfig(tx *sql.Tx) (*params.NodeConfig, error) {
} }
err = tx.QueryRow(` err = tx.QueryRow(`
SELECT enabled, host, port, keep_alive_interval, light_client, full_node, discovery_limit, data_dir, SELECT enabled, host, port, light_client, full_node, discovery_limit, data_dir,
max_message_size, enable_confirmations, peer_exchange, enable_discv5, udp_port, auto_update, max_message_size, enable_confirmations, peer_exchange, enable_discv5, udp_port, auto_update,
enable_store, store_capacity, store_seconds, use_shard_default_topic, enable_missing_message_verification enable_store, store_capacity, store_seconds, use_shard_default_topic, enable_missing_message_verification
FROM wakuv2_config WHERE synthetic_id = 'id' FROM wakuv2_config WHERE synthetic_id = 'id'
`).Scan( `).Scan(
&nodecfg.WakuV2Config.Enabled, &nodecfg.WakuV2Config.Host, &nodecfg.WakuV2Config.Port, &nodecfg.WakuV2Config.KeepAliveInterval, &nodecfg.WakuV2Config.LightClient, &nodecfg.WakuV2Config.FullNode, &nodecfg.WakuV2Config.Enabled, &nodecfg.WakuV2Config.Host, &nodecfg.WakuV2Config.Port, &nodecfg.WakuV2Config.LightClient, &nodecfg.WakuV2Config.FullNode,
&nodecfg.WakuV2Config.DiscoveryLimit, &nodecfg.WakuV2Config.DataDir, &nodecfg.WakuV2Config.MaxMessageSize, &nodecfg.WakuV2Config.EnableConfirmations, &nodecfg.WakuV2Config.DiscoveryLimit, &nodecfg.WakuV2Config.DataDir, &nodecfg.WakuV2Config.MaxMessageSize, &nodecfg.WakuV2Config.EnableConfirmations,
&nodecfg.WakuV2Config.PeerExchange, &nodecfg.WakuV2Config.EnableDiscV5, &nodecfg.WakuV2Config.UDPPort, &nodecfg.WakuV2Config.AutoUpdate, &nodecfg.WakuV2Config.PeerExchange, &nodecfg.WakuV2Config.EnableDiscV5, &nodecfg.WakuV2Config.UDPPort, &nodecfg.WakuV2Config.AutoUpdate,
&nodecfg.WakuV2Config.EnableStore, &nodecfg.WakuV2Config.StoreCapacity, &nodecfg.WakuV2Config.StoreSeconds, &nodecfg.WakuV2Config.UseShardAsDefaultTopic, &nodecfg.WakuV2Config.EnableStore, &nodecfg.WakuV2Config.StoreCapacity, &nodecfg.WakuV2Config.StoreSeconds, &nodecfg.WakuV2Config.UseShardAsDefaultTopic,

View File

@ -161,9 +161,6 @@ type WakuV2Config struct {
// Port number in which to start libp2p protocol (0 for random) // Port number in which to start libp2p protocol (0 for random)
Port int Port int
// Interval of time in seconds to send a ping to peers to keep the connection to them alive
KeepAliveInterval int
// LightClient should be true if the node will not relay messages and only rely on lightpush/filter nodes // LightClient should be true if the node will not relay messages and only rely on lightpush/filter nodes
LightClient bool LightClient bool

View File

@ -1,11 +1,14 @@
package protocol package protocol
import ( import (
"crypto/ecdsa"
"database/sql" "database/sql"
"github.com/stretchr/testify/suite" "github.com/stretchr/testify/suite"
"go.uber.org/zap" "go.uber.org/zap"
"github.com/ethereum/go-ethereum/crypto"
"github.com/status-im/status-go/appdatabase" "github.com/status-im/status-go/appdatabase"
gethbridge "github.com/status-im/status-go/eth-node/bridge/geth" gethbridge "github.com/status-im/status-go/eth-node/bridge/geth"
"github.com/status-im/status-go/eth-node/types" "github.com/status-im/status-go/eth-node/types"
@ -19,6 +22,7 @@ type testWakuV2Config struct {
enableStore bool enableStore bool
useShardAsDefaultTopic bool useShardAsDefaultTopic bool
clusterID uint16 clusterID uint16
nodekey []byte
} }
func NewTestWakuV2(s *suite.Suite, cfg testWakuV2Config) *waku2.Waku { func NewTestWakuV2(s *suite.Suite, cfg testWakuV2Config) *waku2.Waku {
@ -31,6 +35,11 @@ func NewTestWakuV2(s *suite.Suite, cfg testWakuV2Config) *waku2.Waku {
EnableDiscV5: false, EnableDiscV5: false,
} }
var nodeKey *ecdsa.PrivateKey
if len(cfg.nodekey) != 0 {
nodeKey, _ = crypto.ToECDSA(cfg.nodekey)
}
var db *sql.DB var db *sql.DB
if cfg.enableStore { if cfg.enableStore {
@ -44,7 +53,7 @@ func NewTestWakuV2(s *suite.Suite, cfg testWakuV2Config) *waku2.Waku {
} }
wakuNode, err := waku2.New( wakuNode, err := waku2.New(
nil, nodeKey,
"", "",
wakuConfig, wakuConfig,
cfg.logger, cfg.logger,

View File

@ -1985,7 +1985,7 @@ func (gs *GossipSubRouter) getPeers(topic string, count int, filter func(peer.ID
return peers return peers
} }
func (gs *GossipSubRouter) MeshPeers(topic string) []peer.ID { func (gs *GossipSubRouter) meshPeers(topic string) []peer.ID {
peers, ok := gs.mesh[topic] peers, ok := gs.mesh[topic]
if !ok { if !ok {
return nil return nil

View File

@ -90,6 +90,9 @@ type PubSub struct {
// get chan of peers we are connected to // get chan of peers we are connected to
getPeers chan *listPeerReq getPeers chan *listPeerReq
// get chan to obtain list of full mesh peers (only applies when ussing gossipsub)
getMeshPeers chan *listPeerReq
// send subscription here to cancel it // send subscription here to cancel it
cancelCh chan *Subscription cancelCh chan *Subscription
@ -271,6 +274,7 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option
deadPeerBackoff: newBackoff(ctx, 1000, BackoffCleanupInterval, MaxBackoffAttempts), deadPeerBackoff: newBackoff(ctx, 1000, BackoffCleanupInterval, MaxBackoffAttempts),
cancelCh: make(chan *Subscription), cancelCh: make(chan *Subscription),
getPeers: make(chan *listPeerReq), getPeers: make(chan *listPeerReq),
getMeshPeers: make(chan *listPeerReq),
addSub: make(chan *addSubReq), addSub: make(chan *addSubReq),
addRelay: make(chan *addRelayReq), addRelay: make(chan *addRelayReq),
rmRelay: make(chan string), rmRelay: make(chan string),
@ -618,6 +622,13 @@ func (p *PubSub) processLoop(ctx context.Context) {
p.handleAddRelay(relay) p.handleAddRelay(relay)
case topic := <-p.rmRelay: case topic := <-p.rmRelay:
p.handleRemoveRelay(topic) p.handleRemoveRelay(topic)
case meshpreq := <-p.getMeshPeers:
var peers []peer.ID
rt, ok := p.rt.(*GossipSubRouter)
if ok {
peers = rt.meshPeers(meshpreq.topic)
}
meshpreq.resp <- peers
case preq := <-p.getPeers: case preq := <-p.getPeers:
tmap, ok := p.topics[preq.topic] tmap, ok := p.topics[preq.topic]
if preq.topic != "" && !ok { if preq.topic != "" && !ok {
@ -1364,6 +1375,20 @@ func (p *PubSub) ListPeers(topic string) []peer.ID {
return <-out return <-out
} }
// MeshPeers returns a list of full mesh peers for a given topic
func (p *PubSub) MeshPeers(topic string) []peer.ID {
out := make(chan []peer.ID)
select {
case p.getMeshPeers <- &listPeerReq{
resp: out,
topic: topic,
}:
case <-p.ctx.Done():
return nil
}
return <-out
}
// BlacklistPeer blacklists a peer; all messages from this peer will be unconditionally dropped. // BlacklistPeer blacklists a peer; all messages from this peer will be unconditionally dropped.
func (p *PubSub) BlacklistPeer(pid peer.ID) { func (p *PubSub) BlacklistPeer(pid peer.ID) {
select { select {
@ -1420,7 +1445,3 @@ type addRelayReq struct {
topic string topic string
resp chan RelayCancelFunc resp chan RelayCancelFunc
} }
func (p *PubSub) Router() PubSubRouter {
return p.rt
}

View File

@ -2,6 +2,8 @@ package node
import ( import (
"context" "context"
"errors"
"math/rand"
"sync" "sync"
"time" "time"
@ -10,6 +12,7 @@ import (
"github.com/libp2p/go-libp2p/p2p/protocol/ping" "github.com/libp2p/go-libp2p/p2p/protocol/ping"
"github.com/waku-org/go-waku/logging" "github.com/waku-org/go-waku/logging"
"go.uber.org/zap" "go.uber.org/zap"
"golang.org/x/exp/maps"
) )
const maxAllowedPingFailures = 2 const maxAllowedPingFailures = 2
@ -19,86 +22,175 @@ const maxAllowedPingFailures = 2
// the peers if they don't reply back // the peers if they don't reply back
const sleepDetectionIntervalFactor = 3 const sleepDetectionIntervalFactor = 3
const maxPeersToPing = 10
// startKeepAlive creates a go routine that periodically pings connected peers. // startKeepAlive creates a go routine that periodically pings connected peers.
// This is necessary because TCP connections are automatically closed due to inactivity, // This is necessary because TCP connections are automatically closed due to inactivity,
// and doing a ping will avoid this (with a small bandwidth cost) // and doing a ping will avoid this (with a small bandwidth cost)
func (w *WakuNode) startKeepAlive(ctx context.Context, t time.Duration) { func (w *WakuNode) startKeepAlive(ctx context.Context, randomPeersPingDuration time.Duration, allPeersPingDuration time.Duration) {
defer w.wg.Done() defer w.wg.Done()
w.log.Info("setting up ping protocol", zap.Duration("duration", t))
ticker := time.NewTicker(t) if !w.opts.enableRelay {
defer ticker.Stop() return
}
w.log.Info("setting up ping protocol", zap.Duration("randomPeersPingDuration", randomPeersPingDuration), zap.Duration("allPeersPingDuration", allPeersPingDuration))
randomPeersTickerC := make(<-chan time.Time)
if randomPeersPingDuration != 0 {
randomPeersTicker := time.NewTicker(randomPeersPingDuration)
defer randomPeersTicker.Stop()
randomPeersTickerC = randomPeersTicker.C
}
allPeersTickerC := make(<-chan time.Time)
if randomPeersPingDuration != 0 {
allPeersTicker := time.NewTicker(randomPeersPingDuration)
defer allPeersTicker.Stop()
randomPeersTickerC = allPeersTicker.C
}
lastTimeExecuted := w.timesource.Now() lastTimeExecuted := w.timesource.Now()
sleepDetectionInterval := int64(t) * sleepDetectionIntervalFactor sleepDetectionInterval := int64(randomPeersPingDuration) * sleepDetectionIntervalFactor
for { for {
peersToPing := []peer.ID{}
select { select {
case <-ticker.C: case <-allPeersTickerC:
relayPeersSet := make(map[peer.ID]struct{})
for _, t := range w.Relay().Topics() {
for _, p := range w.Relay().PubSub().ListPeers(t) {
relayPeersSet[p] = struct{}{}
}
}
peersToPing = maps.Keys(relayPeersSet)
case <-randomPeersTickerC:
difference := w.timesource.Now().UnixNano() - lastTimeExecuted.UnixNano() difference := w.timesource.Now().UnixNano() - lastTimeExecuted.UnixNano()
forceDisconnectOnPingFailure := false
if difference > sleepDetectionInterval { if difference > sleepDetectionInterval {
forceDisconnectOnPingFailure = true
lastTimeExecuted = w.timesource.Now() lastTimeExecuted = w.timesource.Now()
w.log.Warn("keep alive hasnt been executed recently. Killing connections to peers if ping fails") w.log.Warn("keep alive hasnt been executed recently. Killing all connections")
for _, p := range w.host.Network().Peers() {
err := w.host.Network().ClosePeer(p)
if err != nil {
w.log.Debug("closing conn to peer", zap.Error(err))
}
}
continue continue
} }
// Network's peers collection, // Priorize mesh peers
// contains only currently active peers meshPeersSet := make(map[peer.ID]struct{})
pingWg := sync.WaitGroup{} for _, t := range w.Relay().Topics() {
peersToPing := w.host.Network().Peers() for _, p := range w.Relay().PubSub().MeshPeers(t) {
pingWg.Add(len(peersToPing)) meshPeersSet[p] = struct{}{}
for _, p := range peersToPing {
if p != w.host.ID() {
go w.pingPeer(ctx, &pingWg, p, forceDisconnectOnPingFailure)
} }
} }
pingWg.Wait() peersToPing = append(peersToPing, maps.Keys(meshPeersSet)...)
// Ping also some random relay peers
if maxPeersToPing-len(peersToPing) > 0 {
relayPeersSet := make(map[peer.ID]struct{})
for _, t := range w.Relay().Topics() {
for _, p := range w.Relay().PubSub().ListPeers(t) {
if _, ok := meshPeersSet[p]; !ok {
relayPeersSet[p] = struct{}{}
}
}
}
relayPeers := maps.Keys(relayPeersSet)
rand.Shuffle(len(relayPeers), func(i, j int) { relayPeers[i], relayPeers[j] = relayPeers[j], relayPeers[i] })
peerLen := maxPeersToPing - len(peersToPing)
if peerLen > len(relayPeers) {
peerLen = len(relayPeers)
}
peersToPing = append(peersToPing, relayPeers[0:peerLen]...)
}
lastTimeExecuted = w.timesource.Now()
case <-ctx.Done(): case <-ctx.Done():
w.log.Info("stopping ping protocol") w.log.Info("stopping ping protocol")
return return
} }
pingWg := sync.WaitGroup{}
pingWg.Add(len(peersToPing))
for _, p := range peersToPing {
go w.pingPeer(ctx, &pingWg, p)
}
pingWg.Wait()
lastTimeExecuted = w.timesource.Now()
} }
} }
func (w *WakuNode) pingPeer(ctx context.Context, wg *sync.WaitGroup, peerID peer.ID, forceDisconnectOnFail bool) { func (w *WakuNode) pingPeer(ctx context.Context, wg *sync.WaitGroup, peerID peer.ID) {
defer wg.Done() defer wg.Done()
logger := w.log.With(logging.HostID("peer", peerID))
for i := 0; i < maxAllowedPingFailures; i++ {
if w.host.Network().Connectedness(peerID) != network.Connected {
// Peer is no longer connected. No need to ping
return
}
logger.Debug("pinging")
if w.tryPing(ctx, peerID, logger) {
return
}
}
if w.host.Network().Connectedness(peerID) != network.Connected {
return
}
logger.Info("disconnecting dead peer")
if err := w.host.Network().ClosePeer(peerID); err != nil {
logger.Debug("closing conn to peer", zap.Error(err))
}
}
func (w *WakuNode) tryPing(ctx context.Context, peerID peer.ID, logger *zap.Logger) bool {
ctx, cancel := context.WithTimeout(ctx, 7*time.Second) ctx, cancel := context.WithTimeout(ctx, 7*time.Second)
defer cancel() defer cancel()
logger := w.log.With(logging.HostID("peer", peerID))
logger.Debug("pinging")
pr := ping.Ping(ctx, w.host, peerID) pr := ping.Ping(ctx, w.host, peerID)
select { select {
case res := <-pr: case res := <-pr:
if res.Error != nil { if res.Error != nil {
w.keepAliveMutex.Lock()
w.keepAliveFails[peerID]++
w.keepAliveMutex.Unlock()
logger.Debug("could not ping", zap.Error(res.Error)) logger.Debug("could not ping", zap.Error(res.Error))
} else { return false
w.keepAliveMutex.Lock()
delete(w.keepAliveFails, peerID)
w.keepAliveMutex.Unlock()
} }
case <-ctx.Done(): case <-ctx.Done():
w.keepAliveMutex.Lock() if !errors.Is(ctx.Err(), context.Canceled) {
w.keepAliveFails[peerID]++ logger.Debug("could not ping (context)", zap.Error(ctx.Err()))
w.keepAliveMutex.Unlock() }
logger.Debug("could not ping (context done)", zap.Error(ctx.Err())) return false
}
return true
} }
w.keepAliveMutex.Lock() func (w *WakuNode) getRelayPeers() []peer.ID {
if (forceDisconnectOnFail || w.keepAliveFails[peerID] > maxAllowedPingFailures) && w.host.Network().Connectedness(peerID) == network.Connected { relayPeers := make(map[peer.ID]struct{})
logger.Info("disconnecting peer") for _, t := range w.Relay().Topics() {
if err := w.host.Network().ClosePeer(peerID); err != nil { for _, p := range w.Relay().PubSub().ListPeers(t) {
logger.Debug("closing conn to peer", zap.Error(err)) relayPeers[p] = struct{}{}
} }
w.keepAliveFails[peerID] = 0
} }
w.keepAliveMutex.Unlock() return maps.Keys(relayPeers)
}
func (w *WakuNode) getFullMeshPeers() []peer.ID {
meshPeers := make(map[peer.ID]struct{})
for _, t := range w.Relay().Topics() {
for _, p := range w.Relay().PubSub().MeshPeers(t) {
meshPeers[p] = struct{}{}
}
}
return maps.Keys(meshPeers)
} }

View File

@ -116,9 +116,6 @@ type WakuNode struct {
addressChangesSub event.Subscription addressChangesSub event.Subscription
enrChangeCh chan struct{} enrChangeCh chan struct{}
keepAliveMutex sync.Mutex
keepAliveFails map[peer.ID]int
cancel context.CancelFunc cancel context.CancelFunc
wg *sync.WaitGroup wg *sync.WaitGroup
@ -193,7 +190,6 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {
w.opts = params w.opts = params
w.log = params.logger.Named("node2") w.log = params.logger.Named("node2")
w.wg = &sync.WaitGroup{} w.wg = &sync.WaitGroup{}
w.keepAliveFails = make(map[peer.ID]int)
w.wakuFlag = enr.NewWakuEnrBitfield(w.opts.enableLightPush, w.opts.enableFilterFullNode, w.opts.enableStore, w.opts.enableRelay) w.wakuFlag = enr.NewWakuEnrBitfield(w.opts.enableLightPush, w.opts.enableFilterFullNode, w.opts.enableStore, w.opts.enableRelay)
w.circuitRelayNodes = make(chan peer.AddrInfo) w.circuitRelayNodes = make(chan peer.AddrInfo)
w.metrics = newMetrics(params.prometheusReg) w.metrics = newMetrics(params.prometheusReg)
@ -255,8 +251,14 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {
metadata := metadata.NewWakuMetadata(w.opts.clusterID, w.localNode, w.log) metadata := metadata.NewWakuMetadata(w.opts.clusterID, w.localNode, w.log)
w.metadata = metadata w.metadata = metadata
relay := relay.NewWakuRelay(w.bcaster, w.opts.minRelayPeersToPublish, w.timesource, w.opts.prometheusReg, w.log,
relay.WithPubSubOptions(w.opts.pubsubOpts),
relay.WithMaxMsgSize(w.opts.maxMsgSizeBytes))
w.relay = relay
//Initialize peer manager. //Initialize peer manager.
w.peermanager = peermanager.NewPeerManager(w.opts.maxPeerConnections, w.opts.peerStoreCapacity, metadata, params.enableRelay, w.log) w.peermanager = peermanager.NewPeerManager(w.opts.maxPeerConnections, w.opts.peerStoreCapacity, metadata, relay, params.enableRelay, w.log)
w.peerConnector, err = peermanager.NewPeerConnectionStrategy(w.peermanager, w.opts.onlineChecker, discoveryConnectTimeout, w.log) w.peerConnector, err = peermanager.NewPeerConnectionStrategy(w.peermanager, w.opts.onlineChecker, discoveryConnectTimeout, w.log)
if err != nil { if err != nil {
@ -276,9 +278,6 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {
} }
w.rendezvous = rendezvous.NewRendezvous(w.opts.rendezvousDB, w.peerConnector, w.log) w.rendezvous = rendezvous.NewRendezvous(w.opts.rendezvousDB, w.peerConnector, w.log)
w.relay = relay.NewWakuRelay(w.bcaster, w.opts.minRelayPeersToPublish, w.timesource, w.opts.prometheusReg, w.log,
relay.WithPubSubOptions(w.opts.pubsubOpts),
relay.WithMaxMsgSize(w.opts.maxMsgSizeBytes))
if w.opts.enableRelay { if w.opts.enableRelay {
err = w.setupRLNRelay() err = w.setupRLNRelay()
@ -379,9 +378,9 @@ func (w *WakuNode) Start(ctx context.Context) error {
return err return err
} }
if w.opts.keepAliveInterval > time.Duration(0) { if w.opts.keepAliveRandomPeersInterval > time.Duration(0) || w.opts.keepAliveAllPeersInterval > time.Duration(0) {
w.wg.Add(1) w.wg.Add(1)
go w.startKeepAlive(ctx, w.opts.keepAliveInterval) go w.startKeepAlive(ctx, w.opts.keepAliveRandomPeersInterval, w.opts.keepAliveAllPeersInterval)
} }
w.metadata.SetHost(host) w.metadata.SetHost(host)

View File

@ -114,7 +114,8 @@ type WakuNodeParameters struct {
rlnTreePath string rlnTreePath string
rlnMembershipContractAddress common.Address rlnMembershipContractAddress common.Address
keepAliveInterval time.Duration keepAliveRandomPeersInterval time.Duration
keepAliveAllPeersInterval time.Duration
enableLightPush bool enableLightPush bool
@ -476,10 +477,14 @@ func WithLightPush(lightpushOpts ...lightpush.Option) WakuNodeOption {
} }
// WithKeepAlive is a WakuNodeOption used to set the interval of time when // WithKeepAlive is a WakuNodeOption used to set the interval of time when
// each peer will be ping to keep the TCP connection alive // each peer will be ping to keep the TCP connection alive. Option accepts two
func WithKeepAlive(t time.Duration) WakuNodeOption { // intervals, the `randomPeersInterval`, which will be used to ping full mesh
// peers (if using relay) and random connected peers, and `allPeersInterval`
// which is used to ping all connected peers
func WithKeepAlive(randomPeersInterval time.Duration, allPeersInterval time.Duration) WakuNodeOption {
return func(params *WakuNodeParameters) error { return func(params *WakuNodeParameters) error {
params.keepAliveInterval = t params.keepAliveRandomPeersInterval = randomPeersInterval
params.keepAliveAllPeersInterval = allPeersInterval
return nil return nil
} }
} }

View File

@ -70,6 +70,7 @@ type WakuProtoInfo struct {
type PeerManager struct { type PeerManager struct {
peerConnector *PeerConnectionStrategy peerConnector *PeerConnectionStrategy
metadata *metadata.WakuMetadata metadata *metadata.WakuMetadata
relay *relay.WakuRelay
maxPeers int maxPeers int
maxRelayPeers int maxRelayPeers int
logger *zap.Logger logger *zap.Logger
@ -123,7 +124,8 @@ func inAndOutRelayPeers(relayPeers int) (int, int) {
// Also returns the healthyPeerCount // Also returns the healthyPeerCount
func (pm *PeerManager) checkAndUpdateTopicHealth(topic *NodeTopicDetails) int { func (pm *PeerManager) checkAndUpdateTopicHealth(topic *NodeTopicDetails) int {
healthyPeerCount := 0 healthyPeerCount := 0
for _, p := range topic.topic.ListPeers() {
for _, p := range pm.relay.PubSub().MeshPeers(topic.topic.String()) {
if pm.host.Network().Connectedness(p) == network.Connected { if pm.host.Network().Connectedness(p) == network.Connected {
pThreshold, err := pm.host.Peerstore().(wps.WakuPeerstore).Score(p) pThreshold, err := pm.host.Peerstore().(wps.WakuPeerstore).Score(p)
if err == nil { if err == nil {
@ -143,8 +145,8 @@ func (pm *PeerManager) checkAndUpdateTopicHealth(topic *NodeTopicDetails) int {
} }
} }
} }
//Update topic's health //Update topic's health
//TODO: This should be done based on number of full-mesh peers.
oldHealth := topic.healthStatus oldHealth := topic.healthStatus
if healthyPeerCount < 1 { //Ideally this check should be done with minPeersForRelay, but leaving it as is for now. if healthyPeerCount < 1 { //Ideally this check should be done with minPeersForRelay, but leaving it as is for now.
topic.healthStatus = UnHealthy topic.healthStatus = UnHealthy
@ -176,7 +178,7 @@ func (pm *PeerManager) TopicHealth(pubsubTopic string) (TopicHealth, error) {
} }
// NewPeerManager creates a new peerManager instance. // NewPeerManager creates a new peerManager instance.
func NewPeerManager(maxConnections int, maxPeers int, metadata *metadata.WakuMetadata, relayEnabled bool, logger *zap.Logger) *PeerManager { func NewPeerManager(maxConnections int, maxPeers int, metadata *metadata.WakuMetadata, relay *relay.WakuRelay, relayEnabled bool, logger *zap.Logger) *PeerManager {
var inPeersTarget, outPeersTarget, maxRelayPeers int var inPeersTarget, outPeersTarget, maxRelayPeers int
if relayEnabled { if relayEnabled {
maxRelayPeers, _ := relayAndServicePeers(maxConnections) maxRelayPeers, _ := relayAndServicePeers(maxConnections)
@ -194,6 +196,7 @@ func NewPeerManager(maxConnections int, maxPeers int, metadata *metadata.WakuMet
pm := &PeerManager{ pm := &PeerManager{
logger: logger.Named("peer-manager"), logger: logger.Named("peer-manager"),
metadata: metadata, metadata: metadata,
relay: relay,
maxRelayPeers: maxRelayPeers, maxRelayPeers: maxRelayPeers,
InPeersTarget: inPeersTarget, InPeersTarget: inPeersTarget,
OutPeersTarget: outPeersTarget, OutPeersTarget: outPeersTarget,

View File

@ -165,7 +165,7 @@ func (s *FilterTestSuite) GetWakuFilterLightNode() LightNodeData {
s.Require().NoError(err) s.Require().NoError(err)
b := relay.NewBroadcaster(10) b := relay.NewBroadcaster(10)
s.Require().NoError(b.Start(context.Background())) s.Require().NoError(b.Start(context.Background()))
pm := peermanager.NewPeerManager(5, 5, nil, true, s.Log) pm := peermanager.NewPeerManager(5, 5, nil, nil, true, s.Log)
filterPush := NewWakuFilterLightNode(b, pm, timesource.NewDefaultClock(), onlinechecker.NewDefaultOnlineChecker(true), prometheus.DefaultRegisterer, s.Log) filterPush := NewWakuFilterLightNode(b, pm, timesource.NewDefaultClock(), onlinechecker.NewDefaultOnlineChecker(true), prometheus.DefaultRegisterer, s.Log)
filterPush.SetHost(host) filterPush.SetHost(host)
pm.SetHost(host) pm.SetHost(host)

4
vendor/modules.txt vendored
View File

@ -568,7 +568,7 @@ github.com/libp2p/go-libp2p/p2p/transport/webtransport
# github.com/libp2p/go-libp2p-asn-util v0.4.1 # github.com/libp2p/go-libp2p-asn-util v0.4.1
## explicit; go 1.19 ## explicit; go 1.19
github.com/libp2p/go-libp2p-asn-util github.com/libp2p/go-libp2p-asn-util
# github.com/libp2p/go-libp2p-pubsub v0.11.0 => github.com/waku-org/go-libp2p-pubsub v0.0.0-20240701005450-b4513d154445 # github.com/libp2p/go-libp2p-pubsub v0.11.0 => github.com/waku-org/go-libp2p-pubsub v0.0.0-20240703191659-2cbb09eac9b5
## explicit; go 1.21 ## explicit; go 1.21
github.com/libp2p/go-libp2p-pubsub github.com/libp2p/go-libp2p-pubsub
github.com/libp2p/go-libp2p-pubsub/pb github.com/libp2p/go-libp2p-pubsub/pb
@ -1018,7 +1018,7 @@ github.com/waku-org/go-discover/discover/v5wire
github.com/waku-org/go-libp2p-rendezvous github.com/waku-org/go-libp2p-rendezvous
github.com/waku-org/go-libp2p-rendezvous/db github.com/waku-org/go-libp2p-rendezvous/db
github.com/waku-org/go-libp2p-rendezvous/pb github.com/waku-org/go-libp2p-rendezvous/pb
# github.com/waku-org/go-waku v0.8.1-0.20240701141800-5b5ea977afe0 # github.com/waku-org/go-waku v0.8.1-0.20240705124012-8df5a0337a1e
## explicit; go 1.21 ## explicit; go 1.21
github.com/waku-org/go-waku/logging github.com/waku-org/go-waku/logging
github.com/waku-org/go-waku/tests github.com/waku-org/go-waku/tests

View File

@ -44,7 +44,6 @@ type Config struct {
Port int `toml:",omitempty"` Port int `toml:",omitempty"`
EnablePeerExchangeServer bool `toml:",omitempty"` // PeerExchange server makes sense only when discv5 is running locally as it will have a cache of peers that it can respond to in case a PeerExchange request comes from the PeerExchangeClient EnablePeerExchangeServer bool `toml:",omitempty"` // PeerExchange server makes sense only when discv5 is running locally as it will have a cache of peers that it can respond to in case a PeerExchange request comes from the PeerExchangeClient
EnablePeerExchangeClient bool `toml:",omitempty"` EnablePeerExchangeClient bool `toml:",omitempty"`
KeepAliveInterval int `toml:",omitempty"`
MinPeersForRelay int `toml:",omitempty"` // Indicates the minimum number of peers required for using Relay Protocol MinPeersForRelay int `toml:",omitempty"` // Indicates the minimum number of peers required for using Relay Protocol
MinPeersForFilter int `toml:",omitempty"` // Indicates the minimum number of peers required for using Filter Protocol MinPeersForFilter int `toml:",omitempty"` // Indicates the minimum number of peers required for using Filter Protocol
LightClient bool `toml:",omitempty"` // Indicates if the node is a light client LightClient bool `toml:",omitempty"` // Indicates if the node is a light client
@ -87,7 +86,6 @@ var DefaultConfig = Config{
MaxMessageSize: common.DefaultMaxMessageSize, MaxMessageSize: common.DefaultMaxMessageSize,
Host: "0.0.0.0", Host: "0.0.0.0",
Port: 0, Port: 0,
KeepAliveInterval: 10, // second
DiscoveryLimit: 20, DiscoveryLimit: 20,
MinPeersForRelay: 1, // TODO: determine correct value with Vac team MinPeersForRelay: 1, // TODO: determine correct value with Vac team
MinPeersForFilter: 2, // TODO: determine correct value with Vac team and via testing MinPeersForFilter: 2, // TODO: determine correct value with Vac team and via testing
@ -107,10 +105,6 @@ func setDefaults(cfg *Config) *Config {
cfg.Host = DefaultConfig.Host cfg.Host = DefaultConfig.Host
} }
if cfg.KeepAliveInterval == 0 {
cfg.KeepAliveInterval = DefaultConfig.KeepAliveInterval
}
if cfg.DiscoveryLimit == 0 { if cfg.DiscoveryLimit == 0 {
cfg.DiscoveryLimit = DefaultConfig.DiscoveryLimit cfg.DiscoveryLimit = DefaultConfig.DiscoveryLimit
} }

View File

@ -89,6 +89,8 @@ const hashQueryInterval = 3 * time.Second
const messageSentPeriod = 3 // in seconds const messageSentPeriod = 3 // in seconds
const messageExpiredPerid = 10 // in seconds const messageExpiredPerid = 10 // in seconds
const maxRelayPeers = 300 const maxRelayPeers = 300
const randomPeersKeepAliveInterval = 5 * time.Second
const allPeersKeepAliveInterval = 5 * time.Minute
type SentEnvelope struct { type SentEnvelope struct {
Envelope *protocol.Envelope Envelope *protocol.Envelope
@ -154,6 +156,7 @@ type Waku struct {
storePeerID peer.ID storePeerID peer.ID
topicHealthStatusChan chan peermanager.TopicHealthStatus topicHealthStatusChan chan peermanager.TopicHealthStatus
connectionNotifChan chan node.PeerConnection
connStatusSubscriptions map[string]*types.ConnStatusSubscription connStatusSubscriptions map[string]*types.ConnStatusSubscription
connStatusMu sync.Mutex connStatusMu sync.Mutex
onlineChecker *onlinechecker.DefaultOnlineChecker onlineChecker *onlinechecker.DefaultOnlineChecker
@ -167,8 +170,8 @@ type Waku struct {
// bootnodes successfully // bootnodes successfully
seededBootnodesForDiscV5 bool seededBootnodesForDiscV5 bool
// connectionChanged is channel that notifies when connectivity has changed // goingOnline is channel that notifies when connectivity has changed from offline to online
connectionChanged chan struct{} goingOnline chan struct{}
// discV5BootstrapNodes is the ENR to be used to fetch bootstrap nodes for discovery // discV5BootstrapNodes is the ENR to be used to fetch bootstrap nodes for discovery
discV5BootstrapNodes []string discV5BootstrapNodes []string
@ -224,6 +227,7 @@ func New(nodeKey *ecdsa.PrivateKey, fleet string, cfg *Config, logger *zap.Logge
msgQueue: make(chan *common.ReceivedMessage, messageQueueLimit), msgQueue: make(chan *common.ReceivedMessage, messageQueueLimit),
sendQueue: make(chan *protocol.Envelope, 1000), sendQueue: make(chan *protocol.Envelope, 1000),
topicHealthStatusChan: make(chan peermanager.TopicHealthStatus, 100), topicHealthStatusChan: make(chan peermanager.TopicHealthStatus, 100),
connectionNotifChan: make(chan node.PeerConnection),
connStatusSubscriptions: make(map[string]*types.ConnStatusSubscription), connStatusSubscriptions: make(map[string]*types.ConnStatusSubscription),
topicInterest: make(map[string]TopicInterest), topicInterest: make(map[string]TopicInterest),
ctx: ctx, ctx: ctx,
@ -259,10 +263,6 @@ func New(nodeKey *ecdsa.PrivateKey, fleet string, cfg *Config, logger *zap.Logge
return nil, fmt.Errorf("failed to setup the network interface: %v", err) return nil, fmt.Errorf("failed to setup the network interface: %v", err)
} }
if cfg.KeepAliveInterval == 0 {
cfg.KeepAliveInterval = DefaultConfig.KeepAliveInterval
}
libp2pOpts := node.DefaultLibP2POptions libp2pOpts := node.DefaultLibP2POptions
libp2pOpts = append(libp2pOpts, libp2p.BandwidthReporter(waku.bandwidthCounter)) libp2pOpts = append(libp2pOpts, libp2p.BandwidthReporter(waku.bandwidthCounter))
libp2pOpts = append(libp2pOpts, libp2p.NATPortMap()) libp2pOpts = append(libp2pOpts, libp2p.NATPortMap())
@ -271,8 +271,9 @@ func New(nodeKey *ecdsa.PrivateKey, fleet string, cfg *Config, logger *zap.Logge
node.WithLibP2POptions(libp2pOpts...), node.WithLibP2POptions(libp2pOpts...),
node.WithPrivateKey(nodeKey), node.WithPrivateKey(nodeKey),
node.WithHostAddress(hostAddr), node.WithHostAddress(hostAddr),
node.WithConnectionNotification(waku.connectionNotifChan),
node.WithTopicHealthStatusChannel(waku.topicHealthStatusChan), node.WithTopicHealthStatusChannel(waku.topicHealthStatusChan),
node.WithKeepAlive(time.Duration(cfg.KeepAliveInterval) * time.Second), node.WithKeepAlive(randomPeersKeepAliveInterval, allPeersKeepAliveInterval),
node.WithLogger(logger), node.WithLogger(logger),
node.WithLogLevel(logger.Level()), node.WithLogLevel(logger.Level()),
node.WithClusterID(cfg.ClusterID), node.WithClusterID(cfg.ClusterID),
@ -1293,48 +1294,6 @@ func (w *Waku) Query(ctx context.Context, peerID peer.ID, query legacy_store.Que
return result.Cursor(), len(result.Messages), nil return result.Cursor(), len(result.Messages), nil
} }
func (w *Waku) lightClientConnectionStatus() {
peers := w.node.Host().Network().Peers()
w.logger.Debug("peer stats",
zap.Int("peersCount", len(peers)))
subs := w.node.FilterLightnode().Subscriptions()
w.logger.Debug("filter subs count", zap.Int("count", len(subs)))
isOnline := false
if len(peers) > 0 {
isOnline = true
}
//TODOL needs fixing, right now invoking everytime.
//Trigger FilterManager to take care of any pending filter subscriptions
//TODO: Pass pubsubTopic based on topicHealth notif received.
go w.filterManager.onConnectionStatusChange(w.cfg.DefaultShardPubsubTopic, isOnline)
w.connStatusMu.Lock()
connStatus := types.ConnStatus{
IsOnline: isOnline,
Peers: FormatPeerStats(w.node),
}
for k, subs := range w.connStatusSubscriptions {
if !subs.Send(connStatus) {
delete(w.connStatusSubscriptions, k)
}
}
w.connStatusMu.Unlock()
if w.onPeerStats != nil {
w.onPeerStats(connStatus)
}
//TODO:Analyze if we need to discover and connect to peers with peerExchange loop enabled.
if !w.onlineChecker.IsOnline() && isOnline {
if err := w.discoverAndConnectPeers(); err != nil {
w.logger.Error("failed to add wakuv2 peers", zap.Error(err))
}
}
w.onlineChecker.SetOnline(isOnline)
}
// Start implements node.Service, starting the background data propagation thread // Start implements node.Service, starting the background data propagation thread
// of the Waku protocol. // of the Waku protocol.
func (w *Waku) Start() error { func (w *Waku) Start() error {
@ -1347,7 +1306,7 @@ func (w *Waku) Start() error {
return fmt.Errorf("failed to create a go-waku node: %v", err) return fmt.Errorf("failed to create a go-waku node: %v", err)
} }
w.connectionChanged = make(chan struct{}) w.goingOnline = make(chan struct{})
if err = w.node.Start(w.ctx); err != nil { if err = w.node.Start(w.ctx); err != nil {
return fmt.Errorf("failed to start go-waku node: %v", err) return fmt.Errorf("failed to start go-waku node: %v", err)
@ -1377,46 +1336,40 @@ func (w *Waku) Start() error {
go func() { go func() {
defer w.wg.Done() defer w.wg.Done()
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
peerCountTimer := time.NewTimer(0) // fire immediately
defer peerCountTimer.Stop()
peerCount := 0
for { for {
select { select {
case <-w.ctx.Done(): case <-w.ctx.Done():
return return
case <-ticker.C:
//TODO: Need to fix. case <-w.topicHealthStatusChan:
// Temporary changes for lightNodes to have health check based on connected peers. // TODO: https://github.com/status-im/status-go/issues/4628
case <-w.connectionNotifChan:
isOnline := len(w.node.Host().Network().Peers()) > 0
if w.cfg.LightClient {
// TODO: Temporary changes for lightNodes to have health check based on connected peers.
//This needs to be enhanced to be based on healthy Filter and lightPush peers available for each shard. //This needs to be enhanced to be based on healthy Filter and lightPush peers available for each shard.
//This would get fixed as part of https://github.com/waku-org/go-waku/issues/1114 //This would get fixed as part of https://github.com/waku-org/go-waku/issues/1114
if w.cfg.LightClient {
w.lightClientConnectionStatus() subs := w.node.FilterLightnode().Subscriptions()
w.logger.Debug("filter subs count", zap.Int("count", len(subs)))
//TODO: needs fixing, right now invoking everytime.
//Trigger FilterManager to take care of any pending filter subscriptions
//TODO: Pass pubsubTopic based on topicHealth notif received.
go w.filterManager.onConnectionStatusChange(w.cfg.DefaultShardPubsubTopic, isOnline)
} }
case <-peerCountTimer.C:
peerCountTimer.Reset(3 * time.Second)
newPeerCount := len(w.node.Host().Network().Peers())
if newPeerCount != peerCount && w.onPeerStats != nil {
peerCount = newPeerCount
// TODO: `IsOnline` is not implemented correctly here, however
// this is not a problem because Desktop ignores that value.
// This should be fixed to as part of issue
// https://github.com/status-im/status-go/issues/4628
w.onPeerStats(types.ConnStatus{
IsOnline: true,
Peers: FormatPeerStats(w.node),
})
}
case c := <-w.topicHealthStatusChan:
w.connStatusMu.Lock() w.connStatusMu.Lock()
// TODO: https://github.com/status-im/status-go/issues/4628 latestConnStatus := types.ConnStatus{
// This code is not using the topic health status correctly. IsOnline: isOnline,
// It assumes we are using a single pubsub topic for now Peers: FormatPeerStats(w.node),
latestConnStatus := formatConnStatus(w.node, c) }
w.logger.Debug("peer stats", w.logger.Debug("peer stats",
zap.Int("peersCount", len(latestConnStatus.Peers)), zap.Int("peersCount", len(latestConnStatus.Peers)),
zap.Any("stats", latestConnStatus)) zap.Any("stats", latestConnStatus))
@ -1425,11 +1378,20 @@ func (w *Waku) Start() error {
delete(w.connStatusSubscriptions, k) delete(w.connStatusSubscriptions, k)
} }
} }
w.connStatusMu.Unlock() w.connStatusMu.Unlock()
if w.onPeerStats != nil { if w.onPeerStats != nil {
w.onPeerStats(latestConnStatus) w.onPeerStats(latestConnStatus)
} }
//TODO: analyze if we need to discover and connect to peers with peerExchange loop enabled.
if !w.onlineChecker.IsOnline() && isOnline {
if err := w.discoverAndConnectPeers(); err != nil {
w.logger.Error("failed to add wakuv2 peers", zap.Error(err))
}
}
w.ConnectionChanged(connection.State{ w.ConnectionChanged(connection.State{
Offline: !latestConnStatus.IsOnline, Offline: !latestConnStatus.IsOnline,
}) })
@ -1520,7 +1482,7 @@ func (w *Waku) Stop() error {
} }
} }
close(w.connectionChanged) close(w.goingOnline)
w.wg.Wait() w.wg.Wait()
w.ctx = nil w.ctx = nil
@ -1724,7 +1686,7 @@ func (w *Waku) RelayPeersByTopic(topic string) (*types.PeerList, error) {
} }
return &types.PeerList{ return &types.PeerList{
FullMeshPeers: w.node.Relay().PubSub().Router().(*pubsub.GossipSubRouter).MeshPeers(topic), FullMeshPeers: w.node.Relay().PubSub().MeshPeers(topic),
AllPeers: w.node.Relay().PubSub().ListPeers(topic), AllPeers: w.node.Relay().PubSub().ListPeers(topic),
}, nil }, nil
} }
@ -1818,7 +1780,7 @@ func (w *Waku) StopDiscV5() error {
func (w *Waku) ConnectionChanged(state connection.State) { func (w *Waku) ConnectionChanged(state connection.State) {
if !state.Offline && !w.onlineChecker.IsOnline() { if !state.Offline && !w.onlineChecker.IsOnline() {
select { select {
case w.connectionChanged <- struct{}{}: case w.goingOnline <- struct{}{}:
default: default:
w.logger.Warn("could not write on connection changed channel") w.logger.Warn("could not write on connection changed channel")
} }
@ -1880,7 +1842,7 @@ func (w *Waku) seedBootnodesForDiscV5() {
} }
// If we go online, trigger immediately // If we go online, trigger immediately
case <-w.connectionChanged: case <-w.goingOnline:
if w.cfg.EnableDiscV5 { if w.cfg.EnableDiscV5 {
if canQuery() { if canQuery() {
err := w.restartDiscV5() err := w.restartDiscV5()
@ -2073,18 +2035,6 @@ func FormatPeerStats(wakuNode *node.WakuNode) map[string]types.WakuV2Peer {
return p return p
} }
func formatConnStatus(wakuNode *node.WakuNode, c peermanager.TopicHealthStatus) types.ConnStatus {
isOnline := true
if c.Health == peermanager.UnHealthy {
isOnline = false
}
return types.ConnStatus{
IsOnline: isOnline,
Peers: FormatPeerStats(wakuNode),
}
}
func (w *Waku) StoreNode() legacy_store.Store { func (w *Waku) StoreNode() legacy_store.Store {
return w.node.LegacyStore() return w.node.LegacyStore()
} }

View File

@ -357,7 +357,6 @@ func TestWakuV2Filter(t *testing.T) {
setDefaultConfig(config, true) setDefaultConfig(config, true)
config.EnablePeerExchangeClient = false config.EnablePeerExchangeClient = false
config.Port = 0 config.Port = 0
config.KeepAliveInterval = 0
config.MinPeersForFilter = 2 config.MinPeersForFilter = 2
config.DiscV5BootstrapNodes = []string{enrTreeAddress} config.DiscV5BootstrapNodes = []string{enrTreeAddress}
@ -455,7 +454,6 @@ func TestWakuV2Store(t *testing.T) {
EnableStore: false, EnableStore: false,
StoreCapacity: 100, StoreCapacity: 100,
StoreSeconds: 3600, StoreSeconds: 3600,
KeepAliveInterval: 10,
} }
w1PeersCh := make(chan []string, 100) // buffered not to block on the send side w1PeersCh := make(chan []string, 100) // buffered not to block on the send side
@ -482,7 +480,6 @@ func TestWakuV2Store(t *testing.T) {
EnableStore: true, EnableStore: true,
StoreCapacity: 100, StoreCapacity: 100,
StoreSeconds: 3600, StoreSeconds: 3600,
KeepAliveInterval: 10,
} }
// Start the second Waku node // Start the second Waku node
@ -513,6 +510,8 @@ func TestWakuV2Store(t *testing.T) {
_, err = w2.Subscribe(filter) _, err = w2.Subscribe(filter)
require.NoError(t, err) require.NoError(t, err)
time.Sleep(2 * time.Second)
// Send a message from the first node // Send a message from the first node
msgTimestamp := w1.CurrentTime().UnixNano() msgTimestamp := w1.CurrentTime().UnixNano()
contentTopic := maps.Keys(filter.ContentTopics)[0] contentTopic := maps.Keys(filter.ContentTopics)[0]
@ -654,7 +653,7 @@ func TestOnlineChecker(t *testing.T) {
wg.Add(1) wg.Add(1)
go func() { go func() {
defer wg.Done() defer wg.Done()
<-w.connectionChanged <-w.goingOnline
require.True(t, true) require.True(t, true)
}() }()