chore: fix peer exchange and unit testing (#4381)

* chore: peer exchange local test

* chore: use local enr resolver

* remove light client close peer.

* fix: peer exchange not start when enabled

* chore: remove ws config

* uncomment light client

* use resolver option

* chore: use option param for dns resolver

* chore: fix vendor changes.

* chore: lint
This commit is contained in:
kaichao 2023-12-07 08:28:08 +08:00 committed by GitHub
parent 3874e47840
commit 9be202be50
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 134 additions and 41 deletions

View File

@ -19,6 +19,7 @@
package wakuv2
import (
ethdisc "github.com/ethereum/go-ethereum/p2p/dnsdisc"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/status-im/status-go/wakuv2/common"
@ -26,30 +27,31 @@ import (
// Config represents the configuration state of a waku node.
type Config struct {
MaxMessageSize uint32 `toml:",omitempty"`
Host string `toml:",omitempty"`
Port int `toml:",omitempty"`
PeerExchange bool `toml:",omitempty"`
KeepAliveInterval int `toml:",omitempty"`
MinPeersForRelay int `toml:",omitempty"`
MinPeersForFilter int `toml:",omitempty"`
LightClient bool `toml:",omitempty"`
WakuNodes []string `toml:",omitempty"`
Rendezvous bool `toml:",omitempty"`
DiscV5BootstrapNodes []string `toml:",omitempty"`
Nameserver string `toml:",omitempty"`
EnableDiscV5 bool `toml:",omitempty"`
DiscoveryLimit int `toml:",omitempty"`
AutoUpdate bool `toml:",omitempty"`
UDPPort int `toml:",omitempty"`
EnableStore bool `toml:",omitempty"`
StoreCapacity int `toml:",omitempty"`
StoreSeconds int `toml:",omitempty"`
TelemetryServerURL string `toml:",omitempty"`
EnableFilterFullNode bool `toml:",omitempty"`
DefaultShardPubsubTopic string `toml:",omitempty"`
UseShardAsDefaultTopic bool `toml:",omitempty"`
ClusterID uint16 `toml:",omitempty"`
MaxMessageSize uint32 `toml:",omitempty"`
Host string `toml:",omitempty"`
Port int `toml:",omitempty"`
PeerExchange bool `toml:",omitempty"`
KeepAliveInterval int `toml:",omitempty"`
MinPeersForRelay int `toml:",omitempty"`
MinPeersForFilter int `toml:",omitempty"`
LightClient bool `toml:",omitempty"`
WakuNodes []string `toml:",omitempty"`
Rendezvous bool `toml:",omitempty"`
DiscV5BootstrapNodes []string `toml:",omitempty"`
Nameserver string `toml:",omitempty"`
Resolver ethdisc.Resolver `toml:",omitempty"`
EnableDiscV5 bool `toml:",omitempty"`
DiscoveryLimit int `toml:",omitempty"`
AutoUpdate bool `toml:",omitempty"`
UDPPort int `toml:",omitempty"`
EnableStore bool `toml:",omitempty"`
StoreCapacity int `toml:",omitempty"`
StoreSeconds int `toml:",omitempty"`
TelemetryServerURL string `toml:",omitempty"`
EnableFilterFullNode bool `toml:",omitempty"`
DefaultShardPubsubTopic string `toml:",omitempty"`
UseShardAsDefaultTopic bool `toml:",omitempty"`
ClusterID uint16 `toml:",omitempty"`
}
var DefaultConfig = Config{

View File

@ -54,6 +54,7 @@ import (
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/metrics"
ethdisc "github.com/ethereum/go-ethereum/p2p/dnsdisc"
"github.com/waku-org/go-waku/waku/v2/dnsdisc"
wps "github.com/waku-org/go-waku/waku/v2/peerstore"
"github.com/waku-org/go-waku/waku/v2/protocol"
@ -79,16 +80,17 @@ const bootnodesQueryBackoffMs = 200
const bootnodesMaxRetries = 7
type settings struct {
LightClient bool // Indicates if the node is a light client
MinPeersForRelay int // Indicates the minimum number of peers required for using Relay Protocol
MinPeersForFilter int // Indicates the minimum number of peers required for using Filter Protocol
MaxMsgSize uint32 // Maximal message length allowed by the waku node
EnableConfirmations bool // Enable sending message confirmations
PeerExchange bool // Enable peer exchange
DiscoveryLimit int // Indicates the number of nodes to discover
Nameserver string // Optional nameserver to use for dns discovery
EnableDiscV5 bool // Indicates whether discv5 is enabled or not
DefaultPubsubTopic string // Pubsub topic to be used by default for messages that do not have a topic assigned (depending whether sharding is used or not)
LightClient bool // Indicates if the node is a light client
MinPeersForRelay int // Indicates the minimum number of peers required for using Relay Protocol
MinPeersForFilter int // Indicates the minimum number of peers required for using Filter Protocol
MaxMsgSize uint32 // Maximal message length allowed by the waku node
EnableConfirmations bool // Enable sending message confirmations
PeerExchange bool // Enable peer exchange
DiscoveryLimit int // Indicates the number of nodes to discover
Nameserver string // Optional nameserver to use for dns discovery
Resolver ethdisc.Resolver // Optional resolver to use for dns discovery
EnableDiscV5 bool // Indicates whether discv5 is enabled or not
DefaultPubsubTopic string // Pubsub topic to be used by default for messages that do not have a topic assigned (depending whether sharding is used or not)
Options []node.WakuNodeOption
SkipPublishToTopic bool // used in testing
}
@ -242,6 +244,7 @@ func New(nodeKey string, fleet string, cfg *Config, logger *zap.Logger, appDB *s
PeerExchange: cfg.PeerExchange,
DiscoveryLimit: cfg.DiscoveryLimit,
Nameserver: cfg.Nameserver,
Resolver: cfg.Resolver,
EnableDiscV5: cfg.EnableDiscV5,
}
@ -403,12 +406,16 @@ func (w *Waku) dnsDiscover(ctx context.Context, enrtreeAddress string, apply fnA
if !ok {
w.settingsMu.RLock()
nameserver := w.settings.Nameserver
resolver := w.settings.Resolver
w.settingsMu.RUnlock()
var opts []dnsdisc.DNSDiscoveryOption
if nameserver != "" {
opts = append(opts, dnsdisc.WithNameserver(nameserver))
}
if resolver != nil {
opts = append(opts, dnsdisc.WithResolver(resolver))
}
discoveredNodes, err := dnsdisc.RetrieveNodes(ctx, enrtreeAddress, opts...)
if err != nil {
@ -590,11 +597,7 @@ func (w *Waku) runPeerExchangeLoop() {
}
// Obtaining peer ID
peerIDString, err := discoveredNode.PeerInfo.Addrs[0].ValueForProtocol(multiaddr.P_P2P)
if err != nil {
w.logger.Warn("multiaddress does not contain peerID", zap.String("multiaddr", discoveredNode.PeerInfo.Addrs[0].String()))
continue // No peer ID available somehow
}
peerIDString := discoveredNode.PeerID.String()
peerID, err := peer.Decode(peerIDString)
if err != nil {
@ -1184,6 +1187,13 @@ func (w *Waku) Start() error {
}
}
if w.cfg.PeerExchange {
err := w.node.PeerExchange().Start(w.ctx)
if err != nil {
return err
}
}
w.wg.Add(2)
go func() {

View File

@ -10,10 +10,15 @@ import (
"time"
"github.com/cenkalti/backoff/v3"
"github.com/golang/protobuf/proto"
"github.com/stretchr/testify/require"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/crypto"
ethdnsdisc "github.com/ethereum/go-ethereum/p2p/dnsdisc"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/stretchr/testify/require"
"golang.org/x/exp/maps"
"google.golang.org/protobuf/proto"
"github.com/waku-org/go-waku/waku/v2/dnsdisc"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
@ -192,6 +197,82 @@ func TestBasicWakuV2(t *testing.T) {
require.NoError(t, w.Stop())
}
type mapResolver map[string]string
func (mr mapResolver) LookupTXT(ctx context.Context, name string) ([]string, error) {
if record, ok := mr[name]; ok {
return []string{record}, nil
}
return nil, errors.New("not found")
}
var signingKeyForTesting, _ = crypto.ToECDSA(hexutil.MustDecode("0xdc599867fc513f8f5e2c2c9c489cde5e71362d1d9ec6e693e0de063236ed1240"))
func makeTestTree(domain string, nodes []*enode.Node, links []string) (*ethdnsdisc.Tree, string) {
tree, err := ethdnsdisc.MakeTree(1, nodes, links)
if err != nil {
panic(err)
}
url, err := tree.Sign(signingKeyForTesting, domain)
if err != nil {
panic(err)
}
return tree, url
}
func TestPeerExchange(t *testing.T) {
// start node which serve as PeerExchange server
config := &Config{}
config.EnableDiscV5 = true
config.PeerExchange = true
pxServerNode, err := New("", "", config, nil, nil, nil, nil, nil)
require.NoError(t, err)
require.NoError(t, pxServerNode.Start())
time.Sleep(1 * time.Second)
// start node that will be discovered by PeerExchange
config = &Config{}
config.EnableDiscV5 = true
config.DiscV5BootstrapNodes = []string{pxServerNode.node.ENR().String()}
node, err := New("", "", config, nil, nil, nil, nil, nil)
require.NoError(t, err)
require.NoError(t, node.Start())
time.Sleep(1 * time.Second)
// start light node which use PeerExchange to discover peers
enrNodes := []*enode.Node{pxServerNode.node.ENR()}
tree, url := makeTestTree("n", enrNodes, nil)
resolver := mapResolver(tree.ToTXT("n"))
config = &Config{}
config.PeerExchange = true
config.LightClient = true
config.Resolver = resolver
config.WakuNodes = []string{url}
lightNode, err := New("", "", config, nil, nil, nil, nil, nil)
require.NoError(t, err)
require.NoError(t, lightNode.Start())
// Sanity check, not great, but it's probably helpful
options := func(b *backoff.ExponentialBackOff) {
b.MaxElapsedTime = 30 * time.Second
}
err = tt.RetryWithBackOff(func() error {
if len(lightNode.Peers()) == 2 {
return nil
}
return errors.New("no peers discovered")
}, options)
require.NoError(t, err)
require.NoError(t, lightNode.Stop())
require.NoError(t, pxServerNode.Stop())
require.NoError(t, node.Stop())
}
func TestWakuV2Filter(t *testing.T) {
enrTreeAddress := testENRBootstrap
envEnrTreeAddress := os.Getenv("ENRTREE_ADDRESS")