chore: WakuV2 use config instead of settings

This commit is contained in:
frank 2024-02-27 17:24:34 +08:00
parent 92685e5a7b
commit 3de945feaf
7 changed files with 125 additions and 147 deletions

View File

@ -110,7 +110,6 @@ func SetFleet(fleet string, nodeConfig *params.NodeConfig) error {
DiscoveryLimit: 20,
Host: "0.0.0.0",
AutoUpdate: true,
PeerExchange: true,
}
clusterConfig, err := params.LoadClusterConfigFromFleet(fleet)

View File

@ -220,7 +220,6 @@ func randomNodeConfig() *params.NodeConfig {
MaxMessageSize: uint32(randomInt(math.MaxInt64)),
EnableConfirmations: randomBool(),
CustomNodes: randomCustomNodes(),
PeerExchange: randomBool(),
EnableDiscV5: randomBool(),
UDPPort: randomInt(math.MaxInt64),
AutoUpdate: randomBool(),

View File

@ -318,7 +318,7 @@ func (b *StatusNode) wakuV2Service(nodeConfig *params.NodeConfig, telemetryServe
KeepAliveInterval: nodeConfig.WakuV2Config.KeepAliveInterval,
Rendezvous: nodeConfig.Rendezvous,
WakuNodes: nodeConfig.ClusterConfig.WakuNodes,
PeerExchange: nodeConfig.WakuV2Config.PeerExchange,
EnablePeerExchangeServer: nodeConfig.WakuV2Config.PeerExchange,
EnableStore: nodeConfig.WakuV2Config.EnableStore,
StoreCapacity: nodeConfig.WakuV2Config.StoreCapacity,
StoreSeconds: nodeConfig.WakuV2Config.StoreSeconds,
@ -334,6 +334,15 @@ func (b *StatusNode) wakuV2Service(nodeConfig *params.NodeConfig, telemetryServe
ClusterID: nodeConfig.ClusterConfig.ClusterID,
}
// apply peer exchange settings
if cfg.LightClient {
cfg.EnablePeerExchangeServer = false
cfg.EnablePeerExchangeClient = true
} else {
cfg.EnablePeerExchangeServer = true
cfg.EnablePeerExchangeClient = false
}
if nodeConfig.WakuV2Config.MaxMessageSize > 0 {
cfg.MaxMessageSize = nodeConfig.WakuV2Config.MaxMessageSize
}

View File

@ -189,6 +189,7 @@ type WakuV2Config struct {
CustomNodes map[string]string
// PeerExchange determines whether WakuV2 Peer Exchange is enabled or not
// Deprecated: will be calculated based on LightClient
PeerExchange bool
// Nameserver determines which nameserver will be used for dns discovery

View File

@ -19,6 +19,8 @@
package wakuv2
import (
"errors"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/status-im/status-go/protocol/common/shard"
@ -30,30 +32,43 @@ import (
// Config represents the configuration state of a waku node.
type Config struct {
MaxMessageSize uint32 `toml:",omitempty"`
MaxMessageSize uint32 `toml:",omitempty"` // Maximal message length allowed by the waku node
Host string `toml:",omitempty"`
Port int `toml:",omitempty"`
PeerExchange bool `toml:",omitempty"`
EnablePeerExchangeServer bool `toml:",omitempty"` // PeerExchange server makes sense only when discv5 is running locally as it will have a cache of peers that it can respond to in case a PeerExchange request comes from the PeerExchangeClient
EnablePeerExchangeClient bool `toml:",omitempty"`
KeepAliveInterval int `toml:",omitempty"`
MinPeersForRelay int `toml:",omitempty"`
MinPeersForFilter int `toml:",omitempty"`
LightClient bool `toml:",omitempty"`
MinPeersForRelay int `toml:",omitempty"` // Indicates the minimum number of peers required for using Relay Protocol
MinPeersForFilter int `toml:",omitempty"` // Indicates the minimum number of peers required for using Filter Protocol
LightClient bool `toml:",omitempty"` // Indicates if the node is a light client
WakuNodes []string `toml:",omitempty"`
Rendezvous bool `toml:",omitempty"`
DiscV5BootstrapNodes []string `toml:",omitempty"`
Nameserver string `toml:",omitempty"`
Resolver ethdisc.Resolver `toml:",omitempty"`
EnableDiscV5 bool `toml:",omitempty"`
DiscoveryLimit int `toml:",omitempty"`
Nameserver string `toml:",omitempty"` // Optional nameserver to use for dns discovery
Resolver ethdisc.Resolver `toml:",omitempty"` // Optional resolver to use for dns discovery
EnableDiscV5 bool `toml:",omitempty"` // Indicates whether discv5 is enabled or not
DiscoveryLimit int `toml:",omitempty"` // Indicates the number of nodes to discover with peer exchange client
AutoUpdate bool `toml:",omitempty"`
UDPPort int `toml:",omitempty"`
EnableStore bool `toml:",omitempty"`
StoreCapacity int `toml:",omitempty"`
StoreSeconds int `toml:",omitempty"`
TelemetryServerURL string `toml:",omitempty"`
DefaultShardPubsubTopic string `toml:",omitempty"`
DefaultShardPubsubTopic string `toml:",omitempty"` // Pubsub topic to be used by default for messages that do not have a topic assigned (depending whether sharding is used or not)
UseShardAsDefaultTopic bool `toml:",omitempty"`
ClusterID uint16 `toml:",omitempty"`
EnableConfirmations bool `toml:",omitempty"` // Enable sending message confirmations
SkipPublishToTopic bool `toml:",omitempty"` // Used in testing
}
func (c *Config) Validate() error {
if c.LightClient && (c.EnablePeerExchangeServer || c.EnableDiscV5 || !c.EnablePeerExchangeClient) {
return errors.New("bad configuration for a light client: either peer exchange server or discv5 must be disabled, and the peer exchange client must be enabled")
}
if !c.LightClient && (!c.EnablePeerExchangeServer || !c.EnableDiscV5 || c.EnablePeerExchangeClient) {
return errors.New("bad configuration for a full node: peer exchange server and discv5 must be enabled, and the peer exchange client must be disabled")
}
return nil
}
var DefaultConfig = Config{

View File

@ -62,8 +62,6 @@ import (
"github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
ethdisc "github.com/ethereum/go-ethereum/p2p/dnsdisc"
"github.com/status-im/status-go/connection"
"github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/timesource"
@ -81,22 +79,6 @@ const requestTimeout = 30 * time.Second
const bootnodesQueryBackoffMs = 200
const bootnodesMaxRetries = 7
type settings struct {
LightClient bool // Indicates if the node is a light client
MinPeersForRelay int // Indicates the minimum number of peers required for using Relay Protocol
MinPeersForFilter int // Indicates the minimum number of peers required for using Filter Protocol
MaxMsgSize uint32 // Maximal message length allowed by the waku node
EnableConfirmations bool // Enable sending message confirmations
PeerExchange bool // Enable peer exchange
DiscoveryLimit int // Indicates the number of nodes to discover
Nameserver string // Optional nameserver to use for dns discovery
Resolver ethdisc.Resolver // Optional resolver to use for dns discovery
EnableDiscV5 bool // Indicates whether discv5 is enabled or not
DefaultPubsubTopic string // Pubsub topic to be used by default for messages that do not have a topic assigned (depending whether sharding is used or not)
Options []node.WakuNodeOption
SkipPublishToTopic bool // used in testing
}
type ITelemetryClient interface {
PushReceivedEnvelope(*protocol.Envelope)
}
@ -134,8 +116,7 @@ type Waku struct {
wg sync.WaitGroup
cfg *Config
settings settings // Holds configuration settings that can be dynamically changed
settingsMu sync.RWMutex // Mutex to sync the settings access
options []node.WakuNodeOption
envelopeFeed event.Feed
@ -189,6 +170,9 @@ func New(nodeKey string, fleet string, cfg *Config, logger *zap.Logger, appDB *s
}
cfg = setDefaults(cfg)
if err = cfg.Validate(); err != nil {
logger.Warn("bad wakuv2 configuration", zap.Error(err))
}
logger.Info("starting wakuv2 with config", zap.Any("config", cfg))
@ -218,33 +202,8 @@ func New(nodeKey string, fleet string, cfg *Config, logger *zap.Logger, appDB *s
onHistoricMessagesRequestFailed: onHistoricMessagesRequestFailed,
onPeerStats: onPeerStats,
}
enablePeerExchangeServer := false
enablePeerExchangeClient := false //TODO: Not sure how to set this as config that can be used in peerExchangeLoop
enableDiscv5 := false
if cfg.LightClient {
enablePeerExchangeServer = false
enablePeerExchangeClient = true
enableDiscv5 = false
} else {
enablePeerExchangeServer = true
enablePeerExchangeClient = false
enableDiscv5 = true
}
waku.settings = settings{
MaxMsgSize: cfg.MaxMessageSize,
LightClient: cfg.LightClient,
MinPeersForRelay: cfg.MinPeersForRelay,
MinPeersForFilter: cfg.MinPeersForFilter,
PeerExchange: enablePeerExchangeServer,
DiscoveryLimit: cfg.DiscoveryLimit,
Nameserver: cfg.Nameserver,
Resolver: cfg.Resolver,
EnableDiscV5: enableDiscv5,
}
waku.settings.DefaultPubsubTopic = cfg.DefaultShardPubsubTopic
waku.filters = common.NewFilters(waku.settings.DefaultPubsubTopic, waku.logger)
waku.filters = common.NewFilters(waku.cfg.DefaultShardPubsubTopic, waku.logger)
waku.bandwidthCounter = metrics.NewBandwidthCounter()
var privateKey *ecdsa.PrivateKey
@ -285,13 +244,12 @@ func New(nodeKey string, fleet string, cfg *Config, logger *zap.Logger, appDB *s
node.WithMaxMsgSize(1024 * 1024),
}
if enableDiscv5 {
if cfg.EnableDiscV5 {
bootnodes, err := waku.getDiscV5BootstrapNodes(waku.ctx, cfg.DiscV5BootstrapNodes)
if err != nil {
logger.Error("failed to get bootstrap nodes", zap.Error(err))
return nil, err
}
opts = append(opts, node.WithDiscoveryV5(uint(cfg.UDPPort), bootnodes, cfg.AutoUpdate))
}
@ -299,14 +257,14 @@ func New(nodeKey string, fleet string, cfg *Config, logger *zap.Logger, appDB *s
opts = append(opts, node.WithWakuFilterLightNode())
} else {
relayOpts := []pubsub.Option{
pubsub.WithMaxMessageSize(int(waku.settings.MaxMsgSize)),
pubsub.WithMaxMessageSize(int(waku.cfg.MaxMessageSize)),
}
if waku.logger.Level() == zap.DebugLevel {
relayOpts = append(relayOpts, pubsub.WithEventTracer(waku))
}
opts = append(opts, node.WithWakuRelayAndMinPeers(waku.settings.MinPeersForRelay, relayOpts...))
opts = append(opts, node.WithWakuRelayAndMinPeers(waku.cfg.MinPeersForRelay, relayOpts...))
}
if cfg.EnableStore {
@ -333,7 +291,7 @@ func New(nodeKey string, fleet string, cfg *Config, logger *zap.Logger, appDB *s
}
}
waku.settings.Options = opts
waku.options = opts
waku.logger.Info("setup the go-waku node successfully")
return waku, nil
@ -401,10 +359,8 @@ func (w *Waku) dnsDiscover(ctx context.Context, enrtreeAddress string, apply fnA
discNodes, ok := w.dnsAddressCache[enrtreeAddress]
if !ok {
w.settingsMu.RLock()
nameserver := w.settings.Nameserver
resolver := w.settings.Resolver
w.settingsMu.RUnlock()
nameserver := w.cfg.Nameserver
resolver := w.cfg.Resolver
var opts []dnsdisc.DNSDiscoveryOption
if nameserver != "" {
@ -434,19 +390,19 @@ func (w *Waku) dnsDiscover(ctx context.Context, enrtreeAddress string, apply fnA
wg.Wait()
}
func (w *Waku) addWakuV2Peers(ctx context.Context, cfg *Config) error {
func (w *Waku) discoverAndConnectPeers() error {
fnApply := func(d dnsdisc.DiscoveredNode, wg *sync.WaitGroup) {
defer wg.Done()
if len(d.PeerInfo.Addrs) != 0 {
go w.identifyAndConnect(ctx, w.settings.LightClient, d.PeerInfo)
go w.identifyAndConnect(w.ctx, w.cfg.LightClient, d.PeerInfo)
}
}
for _, addrString := range cfg.WakuNodes {
for _, addrString := range w.cfg.WakuNodes {
addrString := addrString
if strings.HasPrefix(addrString, "enrtree://") {
// Use DNS Discovery
go w.dnsDiscover(ctx, addrString, fnApply)
go w.dnsDiscover(w.ctx, addrString, fnApply)
} else {
// It is a normal multiaddress
addr, err := multiaddr.NewMultiaddr(addrString)
@ -461,7 +417,7 @@ func (w *Waku) addWakuV2Peers(ctx context.Context, cfg *Config) error {
continue
}
go w.identifyAndConnect(ctx, cfg.LightClient, *peerInfo)
go w.identifyAndConnect(w.ctx, w.cfg.LightClient, *peerInfo)
}
}
@ -550,7 +506,7 @@ func (w *Waku) GetStats() types.StatsSummary {
func (w *Waku) runPeerExchangeLoop() {
defer w.wg.Done()
if !enablePeerExchangeClient {
if !w.cfg.EnablePeerExchangeClient {
// Currently peer exchange client is only used for light nodes
return
}
@ -578,7 +534,7 @@ func (w *Waku) runPeerExchangeLoop() {
}
}
peersToDiscover := w.settings.DiscoveryLimit - peersWithRelay
peersToDiscover := w.cfg.DiscoveryLimit - peersWithRelay
if peersToDiscover <= 0 {
continue
}
@ -606,7 +562,7 @@ func (w *Waku) runPeerExchangeLoop() {
func (w *Waku) getPubsubTopic(topic string) string {
if topic == "" || !w.cfg.UseShardAsDefaultTopic {
topic = w.settings.DefaultPubsubTopic
topic = w.cfg.DefaultShardPubsubTopic
}
return topic
}
@ -624,7 +580,7 @@ func (w *Waku) unsubscribeFromPubsubTopicWithWakuRelay(topic string) error {
}
func (w *Waku) subscribeToPubsubTopicWithWakuRelay(topic string, pubkey *ecdsa.PublicKey) error {
if w.settings.LightClient {
if w.cfg.LightClient {
return errors.New("only available for full nodes")
}
@ -673,16 +629,7 @@ func (w *Waku) subscribeToPubsubTopicWithWakuRelay(topic string, pubkey *ecdsa.P
// MaxMessageSize returns the maximum accepted message size.
func (w *Waku) MaxMessageSize() uint32 {
w.settingsMu.RLock()
defer w.settingsMu.RUnlock()
return w.settings.MaxMsgSize
}
// ConfirmationsEnabled returns true if message confirmations are enabled.
func (w *Waku) ConfirmationsEnabled() bool {
w.settingsMu.RLock()
defer w.settingsMu.RUnlock()
return w.settings.EnableConfirmations
return w.cfg.MaxMessageSize
}
// CurrentTime returns current time.
@ -962,7 +909,7 @@ func (w *Waku) Subscribe(f *common.Filter) (string, error) {
return id, err
}
if w.settings.LightClient {
if w.cfg.LightClient {
w.filterManager.eventChan <- FilterEvent{eventType: FilterEventAdded, filterID: id}
}
@ -976,7 +923,7 @@ func (w *Waku) Unsubscribe(ctx context.Context, id string) error {
return fmt.Errorf("failed to unsubscribe: invalid ID '%s'", id)
}
if w.settings.LightClient {
if w.cfg.LightClient {
w.filterManager.eventChan <- FilterEvent{eventType: FilterEventRemoved, filterID: id}
}
@ -1010,7 +957,7 @@ func (w *Waku) UnsubscribeMany(ids []string) error {
}
func (w *Waku) SkipPublishToTopic(value bool) {
w.settings.SkipPublishToTopic = value
w.cfg.SkipPublishToTopic = value
}
func (w *Waku) broadcast() {
@ -1019,12 +966,12 @@ func (w *Waku) broadcast() {
case envelope := <-w.sendQueue:
logger := w.logger.With(zap.String("envelopeHash", hexutil.Encode(envelope.Hash())), zap.String("pubsubTopic", envelope.PubsubTopic()), zap.String("contentTopic", envelope.Message().ContentTopic), zap.Int64("timestamp", envelope.Message().GetTimestamp()))
var fn publishFn
if w.settings.SkipPublishToTopic {
if w.cfg.SkipPublishToTopic {
// For now only used in testing to simulate going offline
fn = func(env *protocol.Envelope, logger *zap.Logger) error {
return errors.New("test send failure")
}
} else if w.settings.LightClient {
} else if w.cfg.LightClient {
fn = func(env *protocol.Envelope, logger *zap.Logger) error {
logger.Info("publishing message via lightpush")
_, err := w.node.Lightpush().Publish(w.ctx, env.Message(), lightpush.WithPubSubTopic(env.PubsubTopic()))
@ -1181,7 +1128,7 @@ func (w *Waku) Start() error {
}
var err error
if w.node, err = node.New(w.settings.Options...); err != nil {
if w.node, err = node.New(w.options...); err != nil {
return fmt.Errorf("failed to create a go-waku node: %v", err)
}
@ -1201,18 +1148,18 @@ func (w *Waku) Start() error {
w.identifyService = idService
if err = w.addWakuV2Peers(w.ctx, w.cfg); err != nil {
if err = w.discoverAndConnectPeers(); err != nil {
return fmt.Errorf("failed to add wakuv2 peers: %v", err)
}
if w.settings.EnableDiscV5 {
if w.cfg.EnableDiscV5 {
err := w.node.DiscV5().Start(w.ctx)
if err != nil {
return err
}
}
if w.settings.PeerExchange {
if w.cfg.EnablePeerExchangeServer {
err := w.node.PeerExchange().Start(w.ctx)
if err != nil {
return err
@ -1245,7 +1192,7 @@ func (w *Waku) Start() error {
w.onPeerStats(latestConnStatus)
}
if w.settings.EnableDiscV5 {
if w.cfg.EnableDiscV5 {
// Restarting DiscV5
if !latestConnStatus.IsOnline && isConnected {
w.logger.Info("Restarting DiscV5: offline and is connected")
@ -1269,12 +1216,12 @@ func (w *Waku) Start() error {
go w.telemetryBandwidthStats(w.cfg.TelemetryServerURL)
go w.runPeerExchangeLoop()
if w.settings.LightClient {
if w.cfg.LightClient {
// Create FilterManager that will main peer connectivity
// for installed filters
w.filterManager = newFilterManager(w.ctx, w.logger,
func(id string) *common.Filter { return w.GetFilter(id) },
w.settings,
w.cfg,
func(env *protocol.Envelope) error { return w.OnNewEnvelopes(env, common.RelayedMessageType, false) },
w.node)
@ -1302,7 +1249,7 @@ func (w *Waku) Start() error {
}
func (w *Waku) setupRelaySubscriptions() error {
if w.settings.LightClient {
if w.cfg.LightClient {
return nil
}
@ -1321,7 +1268,7 @@ func (w *Waku) setupRelaySubscriptions() error {
}
}
err := w.subscribeToPubsubTopicWithWakuRelay(w.settings.DefaultPubsubTopic, nil)
err := w.subscribeToPubsubTopicWithWakuRelay(w.cfg.DefaultShardPubsubTopic, nil)
if err != nil {
return err
}
@ -1553,7 +1500,7 @@ func (w *Waku) ListenAddresses() []string {
func (w *Waku) SubscribeToPubsubTopic(topic string, pubkey *ecdsa.PublicKey) error {
topic = w.getPubsubTopic(topic)
if !w.settings.LightClient {
if !w.cfg.LightClient {
err := w.subscribeToPubsubTopicWithWakuRelay(topic, pubkey)
if err != nil {
return err
@ -1565,7 +1512,7 @@ func (w *Waku) SubscribeToPubsubTopic(topic string, pubkey *ecdsa.PublicKey) err
func (w *Waku) UnsubscribeFromPubsubTopic(topic string) error {
topic = w.getPubsubTopic(topic)
if !w.settings.LightClient {
if !w.cfg.LightClient {
err := w.unsubscribeFromPubsubTopicWithWakuRelay(topic)
if err != nil {
return err
@ -1635,7 +1582,7 @@ func (w *Waku) ConnectionChanged(state connection.State) {
// It backs off exponentially until maxRetries, at which point it restarts from 0
// It also restarts if there's a connection change signalled from the client
func (w *Waku) seedBootnodesForDiscV5() {
if !w.settings.EnableDiscV5 || w.node.DiscV5() == nil {
if !w.cfg.EnableDiscV5 || w.node.DiscV5() == nil {
w.wg.Done()
return
}
@ -1865,7 +1812,7 @@ func FormatPeerStats(wakuNode *node.WakuNode, peers node.PeerStats) map[string]t
for _, addr := range peerInfo.Addrs {
wakuV2Peer.Addresses = append(wakuV2Peer.Addresses, addr.Encapsulate(hostInfo).String())
}
p[k.Pretty()] = wakuV2Peer
p[k.String()] = wakuV2Peer
}
return p
}

View File

@ -9,6 +9,8 @@ import (
"testing"
"time"
"go.uber.org/zap"
"github.com/cenkalti/backoff/v3"
"github.com/ethereum/go-ethereum/common/hexutil"
@ -220,11 +222,14 @@ func makeTestTree(domain string, nodes []*enode.Node, links []string) (*ethdnsdi
}
func TestPeerExchange(t *testing.T) {
logger, err := zap.NewDevelopment()
require.NoError(t, err)
// start node which serve as PeerExchange server
config := &Config{}
config.EnableDiscV5 = true
config.PeerExchange = true
pxServerNode, err := New("", "", config, nil, nil, nil, nil, nil)
config.EnablePeerExchangeServer = true
config.EnablePeerExchangeClient = false
pxServerNode, err := New("", "", config, logger.Named("pxServerNode"), nil, nil, nil, nil)
require.NoError(t, err)
require.NoError(t, pxServerNode.Start())
@ -233,10 +238,12 @@ func TestPeerExchange(t *testing.T) {
// start node that will be discovered by PeerExchange
config = &Config{}
config.EnableDiscV5 = true
config.EnablePeerExchangeServer = false
config.EnablePeerExchangeClient = false
config.DiscV5BootstrapNodes = []string{pxServerNode.node.ENR().String()}
node, err := New("", "", config, nil, nil, nil, nil, nil)
discV5Node, err := New("", "", config, logger.Named("discV5Node"), nil, nil, nil, nil)
require.NoError(t, err)
require.NoError(t, node.Start())
require.NoError(t, discV5Node.Start())
time.Sleep(1 * time.Second)
@ -246,12 +253,13 @@ func TestPeerExchange(t *testing.T) {
resolver := mapResolver(tree.ToTXT("n"))
config = &Config{}
config.PeerExchange = true
config.EnablePeerExchangeServer = false
config.EnablePeerExchangeClient = true
config.LightClient = true
config.Resolver = resolver
config.WakuNodes = []string{url}
lightNode, err := New("", "", config, nil, nil, nil, nil, nil)
lightNode, err := New("", "", config, logger.Named("lightNode"), nil, nil, nil, nil)
require.NoError(t, err)
require.NoError(t, lightNode.Start())
@ -269,7 +277,7 @@ func TestPeerExchange(t *testing.T) {
require.NoError(t, lightNode.Stop())
require.NoError(t, pxServerNode.Stop())
require.NoError(t, node.Stop())
require.NoError(t, discV5Node.Stop())
}
func TestWakuV2Filter(t *testing.T) {