diff --git a/waku/v2/discv5/discover_test.go b/waku/v2/discv5/discover_test.go index 8e55536d..f7fc461c 100644 --- a/waku/v2/discv5/discover_test.go +++ b/waku/v2/discv5/discover_test.go @@ -2,6 +2,11 @@ package discv5 import ( "context" + dto "github.com/prometheus/client_model/go" + wps "github.com/waku-org/go-waku/waku/v2/peerstore" + wakuproto "github.com/waku-org/go-waku/waku/v2/protocol" + "github.com/waku-org/go-waku/waku/v2/service" + "go.uber.org/zap" "testing" "time" @@ -15,6 +20,51 @@ import ( "github.com/waku-org/go-waku/waku/v2/utils" ) +func discoverFilterOnDemand(iterator enode.Iterator, maxCount int) ([]service.PeerData, error) { + + log := utils.Logger() + + var peers []service.PeerData + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + //Iterate and fill peers. + defer iterator.Close() + + for iterator.Next() { + + pInfo, err := wenr.EnodeToPeerInfo(iterator.Node()) + if err != nil { + continue + } + pData := service.PeerData{ + Origin: wps.Discv5, + ENR: iterator.Node(), + AddrInfo: *pInfo, + } + peers = append(peers, pData) + + log.Info("found peer", zap.String("ID", pData.AddrInfo.ID.String())) + + if len(peers) >= maxCount { + log.Info("found required number of nodes, stopping on demand discovery") + break + } + + select { + case <-ctx.Done(): + log.Error("failed to find peers for shard and services", zap.Error(ctx.Err())) + return nil, ctx.Err() + + default: + } + + } + + return peers, nil +} + func TestDiscV5(t *testing.T) { // Host1 <-> Host2 <-> Host3 // Host4(No waku capabilities) <-> Host2 @@ -102,3 +152,276 @@ func TestDiscV5(t *testing.T) { require.False(t, peerconn3.HasPeer(host4.ID())) //host4 should not be discoverable, rather filtered out. } + +func TestDiscV5WithCapabilitiesFilter(t *testing.T) { + + // H1 + host1, _, prvKey1 := tests.CreateHost(t) + udpPort1, err := tests.FindFreeUDPPort(t, "127.0.0.1", 3) + require.NoError(t, err) + ip1, _ := tests.ExtractIP(host1.Addrs()[0]) + l1, err := tests.NewLocalnode(prvKey1, ip1, udpPort1, wenr.NewWakuEnrBitfield(true, true, true, true), nil, utils.Logger()) + require.NoError(t, err) + peerconn1 := NewTestPeerDiscoverer() + d1, err := NewDiscoveryV5(prvKey1, l1, peerconn1, prometheus.DefaultRegisterer, utils.Logger(), WithUDPPort(uint(udpPort1))) + require.NoError(t, err) + d1.SetHost(host1) + + // H2 + host2, _, prvKey2 := tests.CreateHost(t) + ip2, _ := tests.ExtractIP(host2.Addrs()[0]) + udpPort2, err := tests.FindFreeUDPPort(t, "127.0.0.1", 3) + require.NoError(t, err) + l2, err := tests.NewLocalnode(prvKey2, ip2, udpPort2, wenr.NewWakuEnrBitfield(true, true, false, true), nil, utils.Logger()) + require.NoError(t, err) + peerconn2 := NewTestPeerDiscoverer() + d2, err := NewDiscoveryV5(prvKey2, l2, peerconn2, prometheus.DefaultRegisterer, utils.Logger(), WithUDPPort(uint(udpPort2))) + require.NoError(t, err) + d2.SetHost(host2) + + // H3 + host3, _, prvKey3 := tests.CreateHost(t) + ip3, _ := tests.ExtractIP(host3.Addrs()[0]) + udpPort3, err := tests.FindFreeUDPPort(t, "127.0.0.1", 3) + require.NoError(t, err) + l3, err := tests.NewLocalnode(prvKey3, ip3, udpPort3, wenr.NewWakuEnrBitfield(true, true, false, true), nil, utils.Logger()) + require.NoError(t, err) + peerconn3 := NewTestPeerDiscoverer() + d3, err := NewDiscoveryV5(prvKey3, l3, peerconn3, prometheus.DefaultRegisterer, utils.Logger(), WithUDPPort(uint(udpPort3))) + require.NoError(t, err) + d3.SetHost(host3) + + defer d1.Stop() + defer d2.Stop() + defer d3.Stop() + + err = d1.Start(context.Background()) + require.NoError(t, err) + + err = d2.Start(context.Background()) + require.NoError(t, err) + // Set boot nodes for node2 after the DiscoveryV5 was created + err = d2.SetBootnodes([]*enode.Node{d1.localnode.Node()}) + require.NoError(t, err) + + err = d3.Start(context.Background()) + require.NoError(t, err) + // Set boot nodes for node3 after the DiscoveryV5 was created + err = d3.SetBootnodes([]*enode.Node{d2.localnode.Node()}) + require.NoError(t, err) + + // Desired node capabilities + filterBitfield := wenr.NewWakuEnrBitfield(false, false, true, false) + iterator3, err := d3.PeerIterator(FilterCapabilities(filterBitfield)) + require.NoError(t, err) + require.NotNil(t, iterator3) + + time.Sleep(2 * time.Second) + + // Check node were discovered by automatic discovery + require.True(t, peerconn3.HasPeer(host1.ID()) && peerconn3.HasPeer(host2.ID())) + + peers, err := discoverFilterOnDemand(iterator3, 1) + require.NoError(t, err) + require.Equal(t, 1, len(peers)) + + // Host1 has store support while host2 hasn't + require.Equal(t, host1.ID().String(), peers[0].AddrInfo.ID.String()) + + d3.Stop() + peerconn3.Clear() + +} + +func TestDiscV5WithShardFilter(t *testing.T) { + + // Following topic syntax for shard /waku/2/rs// + pubSubTopic := "/waku/2/rs/10/1" + + // H1 + host1, _, prvKey1 := tests.CreateHost(t) + udpPort1, err := tests.FindFreeUDPPort(t, "127.0.0.1", 3) + require.NoError(t, err) + ip1, _ := tests.ExtractIP(host1.Addrs()[0]) + l1, err := tests.NewLocalnode(prvKey1, ip1, udpPort1, wenr.NewWakuEnrBitfield(true, true, true, true), nil, utils.Logger()) + require.NoError(t, err) + peerconn1 := NewTestPeerDiscoverer() + d1, err := NewDiscoveryV5(prvKey1, l1, peerconn1, prometheus.DefaultRegisterer, utils.Logger(), WithUDPPort(uint(udpPort1))) + require.NoError(t, err) + d1.SetHost(host1) + + // Derive shard from the topic + rs1, err := wakuproto.TopicsToRelayShards(pubSubTopic) + require.NoError(t, err) + + // Update node with shard info + err = wenr.Update(l1, wenr.WithWakuRelaySharding(rs1[0])) + require.NoError(t, err) + + // H2 + host2, _, prvKey2 := tests.CreateHost(t) + ip2, _ := tests.ExtractIP(host2.Addrs()[0]) + udpPort2, err := tests.FindFreeUDPPort(t, "127.0.0.1", 3) + require.NoError(t, err) + l2, err := tests.NewLocalnode(prvKey2, ip2, udpPort2, wenr.NewWakuEnrBitfield(true, true, false, true), nil, utils.Logger()) + require.NoError(t, err) + peerconn2 := NewTestPeerDiscoverer() + d2, err := NewDiscoveryV5(prvKey2, l2, peerconn2, prometheus.DefaultRegisterer, utils.Logger(), WithUDPPort(uint(udpPort2)), WithBootnodes([]*enode.Node{d1.localnode.Node()})) + require.NoError(t, err) + d2.SetHost(host2) + + // Update second node with shard info used for first node as well + err = wenr.Update(l2, wenr.WithWakuRelaySharding(rs1[0])) + require.NoError(t, err) + + // H3 + host3, _, prvKey3 := tests.CreateHost(t) + ip3, _ := tests.ExtractIP(host3.Addrs()[0]) + udpPort3, err := tests.FindFreeUDPPort(t, "127.0.0.1", 3) + require.NoError(t, err) + l3, err := tests.NewLocalnode(prvKey3, ip3, udpPort3, wenr.NewWakuEnrBitfield(true, true, false, true), nil, utils.Logger()) + require.NoError(t, err) + peerconn3 := NewTestPeerDiscoverer() + d3, err := NewDiscoveryV5(prvKey3, l3, peerconn3, prometheus.DefaultRegisterer, utils.Logger(), WithUDPPort(uint(udpPort3)), WithBootnodes([]*enode.Node{d2.localnode.Node()})) + require.NoError(t, err) + d3.SetHost(host3) + + defer d1.Stop() + defer d2.Stop() + defer d3.Stop() + + err = d1.Start(context.Background()) + require.NoError(t, err) + + err = d2.Start(context.Background()) + require.NoError(t, err) + + err = d3.Start(context.Background()) + require.NoError(t, err) + + // Create iterator with desired shard info + iterator3, err := d3.PeerIterator(FilterShard(rs1[0].ClusterID, rs1[0].ShardIDs[0])) + + require.NoError(t, err) + require.NotNil(t, iterator3) + + time.Sleep(2 * time.Second) + + // Check node were discovered by automatic discovery + require.True(t, peerconn3.HasPeer(host1.ID()) && peerconn3.HasPeer(host2.ID())) + + // Request two nodes + peers, err := discoverFilterOnDemand(iterator3, 2) + require.NoError(t, err) + require.Equal(t, 2, len(peers)) + + // Create map for checking peer.ID and enode.ID + allPeers := make(map[string]string) + allPeers[host1.ID().String()] = d1.Node().ID().String() + allPeers[host2.ID().String()] = d2.Node().ID().String() + allPeers[host3.ID().String()] = d3.Node().ID().String() + + // Check nodes1 and nodes2 were discovered and node3 wasn't + for _, peer := range peers { + delete(allPeers, peer.AddrInfo.ID.String()) + } + + require.Equal(t, 1, len(allPeers)) + enodeID3, host3Remains := allPeers[host3.ID().String()] + require.True(t, host3Remains) + require.Equal(t, d3.Node().ID().String(), enodeID3) + + d3.Stop() + peerconn3.Clear() +} + +func TestRecordErrorIteratorFailure(t *testing.T) { + + m := newMetrics(prometheus.DefaultRegisterer) + + // Increment error counter for rateLimitFailure 7 times + for i := 0; i < 2; i++ { + m.RecordError(iteratorFailure) + } + + // Retrieve metric values + counter, _ := discV5Errors.GetMetricWithLabelValues(string(iteratorFailure)) + failures := &dto.Metric{} + + // Store values into metric client struct + err := counter.Write(failures) + require.NoError(t, err) + + // Check the count is in + require.Equal(t, 2, int(failures.GetCounter().GetValue())) + +} + +func TestDiscV5WithCustomPredicate(t *testing.T) { + + // H1 + host1, _, prvKey1 := tests.CreateHost(t) + udpPort1, err := tests.FindFreeUDPPort(t, "127.0.0.1", 3) + require.NoError(t, err) + ip1, _ := tests.ExtractIP(host1.Addrs()[0]) + l1, err := tests.NewLocalnode(prvKey1, ip1, udpPort1, wenr.NewWakuEnrBitfield(true, true, true, true), nil, utils.Logger()) + require.NoError(t, err) + peerconn1 := NewTestPeerDiscoverer() + d1, err := NewDiscoveryV5(prvKey1, l1, peerconn1, prometheus.DefaultRegisterer, utils.Logger(), WithUDPPort(uint(udpPort1))) + require.NoError(t, err) + d1.SetHost(host1) + + // H2 + host2, _, prvKey2 := tests.CreateHost(t) + ip2, _ := tests.ExtractIP(host2.Addrs()[0]) + udpPort2, err := tests.FindFreeUDPPort(t, "127.0.0.1", 3) + require.NoError(t, err) + l2, err := tests.NewLocalnode(prvKey2, ip2, udpPort2, wenr.NewWakuEnrBitfield(true, true, true, true), nil, utils.Logger()) + require.NoError(t, err) + peerconn2 := NewTestPeerDiscoverer() + d2, err := NewDiscoveryV5(prvKey2, l2, peerconn2, prometheus.DefaultRegisterer, utils.Logger(), WithUDPPort(uint(udpPort2)), WithBootnodes([]*enode.Node{d1.localnode.Node()})) + require.NoError(t, err) + d2.SetHost(host2) + + // H3 + blockAllPredicate := func(node *enode.Node) bool { + return false + } + + host3, _, prvKey3 := tests.CreateHost(t) + ip3, _ := tests.ExtractIP(host3.Addrs()[0]) + udpPort3, err := tests.FindFreeUDPPort(t, "127.0.0.1", 3) + require.NoError(t, err) + l3, err := tests.NewLocalnode(prvKey3, ip3, udpPort3, wenr.NewWakuEnrBitfield(true, true, true, true), nil, utils.Logger()) + require.NoError(t, err) + peerconn3 := NewTestPeerDiscoverer() + d3, err := NewDiscoveryV5(prvKey3, l3, peerconn3, prometheus.DefaultRegisterer, utils.Logger(), + WithPredicate(blockAllPredicate), WithUDPPort(uint(udpPort3)), + WithBootnodes([]*enode.Node{d2.localnode.Node()})) + require.NoError(t, err) + d3.SetHost(host3) + + defer d1.Stop() + defer d2.Stop() + defer d3.Stop() + + err = d1.Start(context.Background()) + require.NoError(t, err) + + err = d2.Start(context.Background()) + require.NoError(t, err) + + err = d3.Start(context.Background()) + require.NoError(t, err) + + time.Sleep(2 * time.Second) + + // Check none nodes were discovered by node3 as it is prevented by blockAllPredicate + require.False(t, peerconn3.HasPeer(host1.ID()) || peerconn3.HasPeer(host2.ID())) + + // Check node2 could still discover node1 - predicate works at node scope only + require.True(t, peerconn2.HasPeer(host1.ID())) + + d3.Stop() + peerconn3.Clear() +}