Honor FullNode config in waku
previously FullNode would only result in setting a bloom filter to full. This behavior caused issues as you effectively cannot install a filter on a FullNode, as it would advertise the new topic/bloom filter and stop receiving all the messages. This caused an issue when running push notifications servers together with mailservers, also the behavior is a bit counter-intuitive as I would expect the FullNode config to be honored no matter of what filters are installed.
This commit is contained in:
parent
2332e5f537
commit
56c0142f16
|
@ -21,6 +21,7 @@ import (
|
||||||
"github.com/ethereum/go-ethereum/p2p/enode"
|
"github.com/ethereum/go-ethereum/p2p/enode"
|
||||||
"github.com/ethereum/go-ethereum/rlp"
|
"github.com/ethereum/go-ethereum/rlp"
|
||||||
|
|
||||||
|
"github.com/status-im/status-go/eth-node/types"
|
||||||
"github.com/status-im/status-go/waku/common"
|
"github.com/status-im/status-go/waku/common"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -455,7 +456,7 @@ func (p *Peer) update() {
|
||||||
|
|
||||||
func (p *Peer) setOptions(peerOptions StatusOptions) error {
|
func (p *Peer) setOptions(peerOptions StatusOptions) error {
|
||||||
|
|
||||||
p.logger.Debug("settings options", zap.Binary("peerID", p.ID()), zap.Any("Options", peerOptions))
|
p.logger.Debug("settings options", zap.String("peerID", types.EncodeHex(p.ID())), zap.Any("Options", peerOptions))
|
||||||
|
|
||||||
if err := peerOptions.Validate(); err != nil {
|
if err := peerOptions.Validate(); err != nil {
|
||||||
return fmt.Errorf("p [%x]: sent invalid options: %v", p.ID(), err)
|
return fmt.Errorf("p [%x]: sent invalid options: %v", p.ID(), err)
|
||||||
|
|
66
waku/waku.go
66
waku/waku.go
|
@ -44,6 +44,7 @@ import (
|
||||||
"github.com/ethereum/go-ethereum/rlp"
|
"github.com/ethereum/go-ethereum/rlp"
|
||||||
"github.com/ethereum/go-ethereum/rpc"
|
"github.com/ethereum/go-ethereum/rpc"
|
||||||
|
|
||||||
|
"github.com/status-im/status-go/eth-node/types"
|
||||||
"github.com/status-im/status-go/waku/common"
|
"github.com/status-im/status-go/waku/common"
|
||||||
v0 "github.com/status-im/status-go/waku/v0"
|
v0 "github.com/status-im/status-go/waku/v0"
|
||||||
v1 "github.com/status-im/status-go/waku/v1"
|
v1 "github.com/status-im/status-go/waku/v1"
|
||||||
|
@ -68,6 +69,7 @@ type settings struct {
|
||||||
LightClient bool // Light client mode enabled does not forward messages
|
LightClient bool // Light client mode enabled does not forward messages
|
||||||
RestrictLightClientsConn bool // Restrict connection between two light clients
|
RestrictLightClientsConn bool // Restrict connection between two light clients
|
||||||
SyncAllowance int // Maximum time in seconds allowed to process the waku-related messages
|
SyncAllowance int // Maximum time in seconds allowed to process the waku-related messages
|
||||||
|
FullNode bool // Whether this is to be run in FullNode settings
|
||||||
}
|
}
|
||||||
|
|
||||||
// Waku represents a dark communication interface through the Ethereum
|
// Waku represents a dark communication interface through the Ethereum
|
||||||
|
@ -111,15 +113,16 @@ type Waku struct {
|
||||||
|
|
||||||
// New creates a Waku client ready to communicate through the Ethereum P2P network.
|
// New creates a Waku client ready to communicate through the Ethereum P2P network.
|
||||||
func New(cfg *Config, logger *zap.Logger) *Waku {
|
func New(cfg *Config, logger *zap.Logger) *Waku {
|
||||||
|
if logger == nil {
|
||||||
|
logger = zap.NewNop()
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.Debug("starting waku with config", zap.Any("config", cfg))
|
||||||
if cfg == nil {
|
if cfg == nil {
|
||||||
c := DefaultConfig
|
c := DefaultConfig
|
||||||
cfg = &c
|
cfg = &c
|
||||||
}
|
}
|
||||||
|
|
||||||
if logger == nil {
|
|
||||||
logger = zap.NewNop()
|
|
||||||
}
|
|
||||||
|
|
||||||
waku := &Waku{
|
waku := &Waku{
|
||||||
privateKeys: make(map[string]*ecdsa.PrivateKey),
|
privateKeys: make(map[string]*ecdsa.PrivateKey),
|
||||||
symKeys: make(map[string][]byte),
|
symKeys: make(map[string][]byte),
|
||||||
|
@ -139,6 +142,7 @@ func New(cfg *Config, logger *zap.Logger) *Waku {
|
||||||
MinPowTolerance: cfg.MinimumAcceptedPoW,
|
MinPowTolerance: cfg.MinimumAcceptedPoW,
|
||||||
EnableConfirmations: cfg.EnableConfirmations,
|
EnableConfirmations: cfg.EnableConfirmations,
|
||||||
LightClient: cfg.LightClient,
|
LightClient: cfg.LightClient,
|
||||||
|
FullNode: cfg.FullNode,
|
||||||
BloomFilterMode: cfg.BloomFilterMode,
|
BloomFilterMode: cfg.BloomFilterMode,
|
||||||
RestrictLightClientsConn: cfg.RestrictLightClientsConn,
|
RestrictLightClientsConn: cfg.RestrictLightClientsConn,
|
||||||
SyncAllowance: common.DefaultSyncAllowance,
|
SyncAllowance: common.DefaultSyncAllowance,
|
||||||
|
@ -233,6 +237,10 @@ func (w *Waku) MinPowTolerance() float64 {
|
||||||
// If a message does not match the bloom, it will tantamount to spam, and the peer will
|
// If a message does not match the bloom, it will tantamount to spam, and the peer will
|
||||||
// be disconnected.
|
// be disconnected.
|
||||||
func (w *Waku) BloomFilter() []byte {
|
func (w *Waku) BloomFilter() []byte {
|
||||||
|
if w.FullNode() {
|
||||||
|
return common.MakeFullNodeBloom()
|
||||||
|
}
|
||||||
|
|
||||||
w.settingsMu.RLock()
|
w.settingsMu.RLock()
|
||||||
defer w.settingsMu.RUnlock()
|
defer w.settingsMu.RUnlock()
|
||||||
return w.settings.BloomFilter
|
return w.settings.BloomFilter
|
||||||
|
@ -243,6 +251,10 @@ func (w *Waku) BloomFilter() []byte {
|
||||||
// or no change of bloom filter have ever occurred, the return value will be the same
|
// or no change of bloom filter have ever occurred, the return value will be the same
|
||||||
// as return value of BloomFilter().
|
// as return value of BloomFilter().
|
||||||
func (w *Waku) BloomFilterTolerance() []byte {
|
func (w *Waku) BloomFilterTolerance() []byte {
|
||||||
|
if w.FullNode() {
|
||||||
|
return common.MakeFullNodeBloom()
|
||||||
|
}
|
||||||
|
|
||||||
w.settingsMu.RLock()
|
w.settingsMu.RLock()
|
||||||
defer w.settingsMu.RUnlock()
|
defer w.settingsMu.RUnlock()
|
||||||
return w.settings.BloomFilterTolerance
|
return w.settings.BloomFilterTolerance
|
||||||
|
@ -250,6 +262,10 @@ func (w *Waku) BloomFilterTolerance() []byte {
|
||||||
|
|
||||||
// BloomFilterMode returns whether the node is running in bloom filter mode
|
// BloomFilterMode returns whether the node is running in bloom filter mode
|
||||||
func (w *Waku) BloomFilterMode() bool {
|
func (w *Waku) BloomFilterMode() bool {
|
||||||
|
if w.FullNode() {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
w.settingsMu.RLock()
|
w.settingsMu.RLock()
|
||||||
defer w.settingsMu.RUnlock()
|
defer w.settingsMu.RUnlock()
|
||||||
return w.settings.BloomFilterMode
|
return w.settings.BloomFilterMode
|
||||||
|
@ -294,7 +310,8 @@ func (w *Waku) SetBloomFilter(bloom []byte) error {
|
||||||
func (w *Waku) TopicInterest() []common.TopicType {
|
func (w *Waku) TopicInterest() []common.TopicType {
|
||||||
w.settingsMu.RLock()
|
w.settingsMu.RLock()
|
||||||
defer w.settingsMu.RUnlock()
|
defer w.settingsMu.RUnlock()
|
||||||
if w.settings.TopicInterest == nil {
|
// Return nil if FullNode as otherwise topic interest will have precedence
|
||||||
|
if w.settings.FullNode || w.settings.TopicInterest == nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
topicInterest := make([]common.TopicType, len(w.settings.TopicInterest))
|
topicInterest := make([]common.TopicType, len(w.settings.TopicInterest))
|
||||||
|
@ -528,7 +545,19 @@ func (w *Waku) notifyPeersAboutPowRequirementChange(pow float64) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (w *Waku) FullNode() bool {
|
||||||
|
w.settingsMu.RLock()
|
||||||
|
// If full node, nothing to do
|
||||||
|
fullNode := w.settings.FullNode
|
||||||
|
w.settingsMu.RUnlock()
|
||||||
|
return fullNode
|
||||||
|
}
|
||||||
|
|
||||||
func (w *Waku) notifyPeersAboutBloomFilterChange(bloom []byte) {
|
func (w *Waku) notifyPeersAboutBloomFilterChange(bloom []byte) {
|
||||||
|
|
||||||
|
if w.FullNode() {
|
||||||
|
return
|
||||||
|
}
|
||||||
arr := w.getPeers()
|
arr := w.getPeers()
|
||||||
for _, p := range arr {
|
for _, p := range arr {
|
||||||
err := p.NotifyAboutBloomFilterChange(bloom)
|
err := p.NotifyAboutBloomFilterChange(bloom)
|
||||||
|
@ -543,6 +572,9 @@ func (w *Waku) notifyPeersAboutBloomFilterChange(bloom []byte) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *Waku) notifyPeersAboutTopicInterestChange(topicInterest []common.TopicType) {
|
func (w *Waku) notifyPeersAboutTopicInterestChange(topicInterest []common.TopicType) {
|
||||||
|
if w.FullNode() {
|
||||||
|
return
|
||||||
|
}
|
||||||
arr := w.getPeers()
|
arr := w.getPeers()
|
||||||
for _, p := range arr {
|
for _, p := range arr {
|
||||||
err := p.NotifyAboutTopicInterestChange(topicInterest)
|
err := p.NotifyAboutTopicInterestChange(topicInterest)
|
||||||
|
@ -1045,6 +1077,8 @@ func (w *Waku) HandlePeer(peer common.Peer, rw p2p.MsgReadWriter) error {
|
||||||
w.peers[peer] = struct{}{}
|
w.peers[peer] = struct{}{}
|
||||||
w.peerMu.Unlock()
|
w.peerMu.Unlock()
|
||||||
|
|
||||||
|
w.logger.Debug("handling peer", zap.String("id", types.EncodeHex(peer.ID())))
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
w.peerMu.Lock()
|
w.peerMu.Lock()
|
||||||
delete(w.peers, peer)
|
delete(w.peers, peer)
|
||||||
|
@ -1064,10 +1098,13 @@ func (w *Waku) HandlePeer(peer common.Peer, rw p2p.MsgReadWriter) error {
|
||||||
return w.rateLimiter.Decorate(peer, rw, runLoop)
|
return w.rateLimiter.Decorate(peer, rw, runLoop)
|
||||||
}
|
}
|
||||||
|
|
||||||
return peer.Run()
|
err := peer.Run()
|
||||||
|
w.logger.Debug("handled peer", zap.String("id", types.EncodeHex(peer.ID())), zap.Error(err))
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *Waku) OnNewEnvelopes(envelopes []*common.Envelope, peer common.Peer) ([]common.EnvelopeError, error) {
|
func (w *Waku) OnNewEnvelopes(envelopes []*common.Envelope, peer common.Peer) ([]common.EnvelopeError, error) {
|
||||||
|
w.logger.Debug("received new envelopes", zap.Int("count", len(envelopes)))
|
||||||
envelopeErrors := make([]common.EnvelopeError, 0)
|
envelopeErrors := make([]common.EnvelopeError, 0)
|
||||||
trouble := false
|
trouble := false
|
||||||
for _, env := range envelopes {
|
for _, env := range envelopes {
|
||||||
|
@ -1186,6 +1223,9 @@ func (w *Waku) topicInterestMatch(envelope *common.Envelope) (bool, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *Waku) topicInterestOrBloomMatch(envelope *common.Envelope) (bool, error) {
|
func (w *Waku) topicInterestOrBloomMatch(envelope *common.Envelope) (bool, error) {
|
||||||
|
if w.FullNode() {
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
w.settingsMu.RLock()
|
w.settingsMu.RLock()
|
||||||
topicInterestMode := !w.settings.BloomFilterMode
|
topicInterestMode := !w.settings.BloomFilterMode
|
||||||
w.settingsMu.RUnlock()
|
w.settingsMu.RUnlock()
|
||||||
|
@ -1209,6 +1249,20 @@ func (w *Waku) SetBloomFilterMode(mode bool) {
|
||||||
// Recalculate and notify topic interest or bloom, currently not implemented
|
// Recalculate and notify topic interest or bloom, currently not implemented
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (w *Waku) SetFullNode(set bool) {
|
||||||
|
w.settingsMu.Lock()
|
||||||
|
w.settings.FullNode = set
|
||||||
|
w.settingsMu.Unlock()
|
||||||
|
|
||||||
|
// We advertise the topic interest if full node has been disabled
|
||||||
|
// or bloom filter if enabled, as that's how we indicate to a peer we are a full node or not
|
||||||
|
if set {
|
||||||
|
w.notifyPeersAboutBloomFilterChange(w.BloomFilter())
|
||||||
|
} else {
|
||||||
|
w.notifyPeersAboutTopicInterestChange(w.TopicInterest())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// addEnvelope adds an envelope to the envelope map, used for sending
|
// addEnvelope adds an envelope to the envelope map, used for sending
|
||||||
func (w *Waku) addEnvelope(envelope *common.Envelope) {
|
func (w *Waku) addEnvelope(envelope *common.Envelope) {
|
||||||
|
|
||||||
|
|
|
@ -28,6 +28,8 @@ import (
|
||||||
|
|
||||||
"golang.org/x/crypto/pbkdf2"
|
"golang.org/x/crypto/pbkdf2"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/crypto"
|
"github.com/ethereum/go-ethereum/crypto"
|
||||||
|
|
||||||
"github.com/status-im/status-go/waku/common"
|
"github.com/status-im/status-go/waku/common"
|
||||||
|
@ -938,6 +940,31 @@ func TestSymmetricSendKeyMismatch(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestFullNode(t *testing.T) {
|
||||||
|
|
||||||
|
config := &Config{FullNode: true}
|
||||||
|
w := New(config, nil)
|
||||||
|
|
||||||
|
require.True(t, w.FullNode(), "full node should be true")
|
||||||
|
require.True(t, common.IsFullNode(w.BloomFilter()), "bloom filter should be full")
|
||||||
|
require.True(t, common.IsFullNode(w.BloomFilterTolerance()), "bloom filter tolerance should be full")
|
||||||
|
require.Nil(t, w.TopicInterest(), "topic interest should be nil")
|
||||||
|
|
||||||
|
// Set a topic
|
||||||
|
require.NoError(t, w.SetTopicInterest([]common.TopicType{common.BytesToTopic([]byte("a"))}))
|
||||||
|
|
||||||
|
// Make sure everything is the same
|
||||||
|
require.True(t, w.FullNode(), "full node should be true")
|
||||||
|
require.True(t, common.IsFullNode(w.BloomFilter()), "bloom filter should be full")
|
||||||
|
require.True(t, common.IsFullNode(w.BloomFilterTolerance()), "bloom filter tolerance should be full")
|
||||||
|
require.Nil(t, w.TopicInterest(), "topic interest should be nil")
|
||||||
|
|
||||||
|
// unset full node
|
||||||
|
w.SetFullNode(false)
|
||||||
|
require.False(t, w.FullNode(), "full node should be false")
|
||||||
|
require.NotNil(t, w.TopicInterest(), "topic interest should not be nil")
|
||||||
|
}
|
||||||
|
|
||||||
func TestBloom(t *testing.T) {
|
func TestBloom(t *testing.T) {
|
||||||
topic := common.TopicType{0, 0, 255, 6}
|
topic := common.TopicType{0, 0, 255, 6}
|
||||||
b := topic.ToBloom()
|
b := topic.ToBloom()
|
||||||
|
|
Loading…
Reference in New Issue