diff --git a/cmd/waku/node.go b/cmd/waku/node.go index 56e3f83a..534b1e00 100644 --- a/cmd/waku/node.go +++ b/cmd/waku/node.go @@ -421,20 +421,6 @@ func Execute(options NodeOptions) error { } } - if len(discoveredNodes) != 0 { - for _, n := range discoveredNodes { - go func(ctx context.Context, info peer.AddrInfo) { - ctx, cancel := context.WithTimeout(ctx, dialTimeout) - defer cancel() - err = wakuNode.DialPeerWithInfo(ctx, info) - if err != nil { - logger.Error("dialing peer", logging.HostID("peer", info.ID), zap.Error(err)) - } - }(ctx, n.PeerInfo) - - } - } - var rpcServer *rpc.WakuRPC if options.RPCServer.Enable { rpcServer = rpc.NewWakuRPC(wakuNode, options.RPCServer.Address, options.RPCServer.Port, options.RPCServer.Admin, options.PProf, options.RPCServer.RelayCacheCapacity, logger) diff --git a/tests/utils.go b/tests/utils.go index ae54a594..01100b77 100644 --- a/tests/utils.go +++ b/tests/utils.go @@ -2,21 +2,33 @@ package tests import ( "context" + "crypto/ecdsa" "crypto/rand" "encoding/hex" "fmt" "io" + "math" "net" + "strconv" "testing" + gcrypto "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/ethereum/go-ethereum/p2p/enr" "github.com/libp2p/go-libp2p" + "github.com/libp2p/go-libp2p/config" "github.com/libp2p/go-libp2p/core/crypto" + libp2pcrypto "github.com/libp2p/go-libp2p/core/crypto" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem" "github.com/multiformats/go-multiaddr" + "github.com/stretchr/testify/require" "github.com/waku-org/go-waku/waku/v2/peerstore" + wenr "github.com/waku-org/go-waku/waku/v2/protocol/enr" "github.com/waku-org/go-waku/waku/v2/protocol/pb" + "github.com/waku-org/go-waku/waku/v2/utils" + "go.uber.org/zap" ) // GetHostAddress returns the first listen address used by a host @@ -137,3 +149,74 @@ func RandomHex(n int) (string, error) { } return hex.EncodeToString(bytes), nil } + +func NewLocalnode(priv *ecdsa.PrivateKey, ipAddr *net.TCPAddr, udpPort int, wakuFlags wenr.WakuEnrBitfield, advertiseAddr *net.IP, log *zap.Logger) (*enode.LocalNode, error) { + db, err := enode.OpenDB("") + if err != nil { + return nil, err + } + localnode := enode.NewLocalNode(db, priv) + localnode.SetFallbackUDP(udpPort) + localnode.Set(enr.WithEntry(wenr.WakuENRField, wakuFlags)) + localnode.SetFallbackIP(net.IP{127, 0, 0, 1}) + localnode.SetStaticIP(ipAddr.IP) + + if udpPort > 0 && udpPort <= math.MaxUint16 { + localnode.Set(enr.UDP(uint16(udpPort))) // lgtm [go/incorrect-integer-conversion] + } else { + log.Error("setting udpPort", zap.Int("port", udpPort)) + } + + if ipAddr.Port > 0 && ipAddr.Port <= math.MaxUint16 { + localnode.Set(enr.TCP(uint16(ipAddr.Port))) // lgtm [go/incorrect-integer-conversion] + } else { + log.Error("setting tcpPort", zap.Int("port", ipAddr.Port)) + } + + if advertiseAddr != nil { + localnode.SetStaticIP(*advertiseAddr) + } + + return localnode, nil +} + +func CreateHost(t *testing.T, opts ...config.Option) (host.Host, int, *ecdsa.PrivateKey) { + privKey, err := gcrypto.GenerateKey() + require.NoError(t, err) + + sPrivKey := libp2pcrypto.PrivKey(utils.EcdsaPrivKeyToSecp256k1PrivKey(privKey)) + + port, err := FindFreePort(t, "127.0.0.1", 3) + require.NoError(t, err) + + sourceMultiAddr, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", port)) + require.NoError(t, err) + + opts = append(opts, libp2p.ListenAddrs(sourceMultiAddr), + libp2p.Identity(sPrivKey)) + + host, err := libp2p.New(opts...) + require.NoError(t, err) + + return host, port, privKey +} + +func ExtractIP(addr multiaddr.Multiaddr) (*net.TCPAddr, error) { + ipStr, err := addr.ValueForProtocol(multiaddr.P_IP4) + if err != nil { + return nil, err + } + + portStr, err := addr.ValueForProtocol(multiaddr.P_TCP) + if err != nil { + return nil, err + } + port, err := strconv.Atoi(portStr) + if err != nil { + return nil, err + } + return &net.TCPAddr{ + IP: net.ParseIP(ipStr), + Port: port, + }, nil +} diff --git a/waku/v2/discv5/discover.go b/waku/v2/discv5/discover.go index ba0a7c36..ebcecedf 100644 --- a/waku/v2/discv5/discover.go +++ b/waku/v2/discv5/discover.go @@ -14,9 +14,9 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/waku-org/go-discover/discover" "github.com/waku-org/go-waku/logging" - "github.com/waku-org/go-waku/waku/v2/peermanager" "github.com/waku-org/go-waku/waku/v2/peerstore" wenr "github.com/waku-org/go-waku/waku/v2/protocol/enr" + "github.com/waku-org/go-waku/waku/v2/service" "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" @@ -29,7 +29,7 @@ var ErrNoDiscV5Listener = errors.New("no discv5 listener") // PeerConnector will subscribe to a channel containing the information for all peers found by this discovery protocol type PeerConnector interface { - Subscribe(context.Context, <-chan peermanager.PeerData) + Subscribe(context.Context, <-chan service.PeerData) } type DiscoveryV5 struct { @@ -46,7 +46,7 @@ type DiscoveryV5 struct { log *zap.Logger - *peermanager.CommonDiscoveryService + *service.CommonDiscoveryService } type discV5Parameters struct { @@ -139,7 +139,7 @@ func NewDiscoveryV5(priv *ecdsa.PrivateKey, localnode *enode.LocalNode, peerConn params: params, peerConnector: peerConnector, NAT: NAT, - CommonDiscoveryService: peermanager.NewCommonDiscoveryService(), + CommonDiscoveryService: service.NewCommonDiscoveryService(), localnode: localnode, metrics: newMetrics(reg), config: discover.Config{ @@ -438,7 +438,7 @@ func (d *DiscoveryV5) peerLoop(ctx context.Context) error { defer iterator.Close() d.Iterate(ctx, iterator, func(n *enode.Node, p peer.AddrInfo) error { - peer := peermanager.PeerData{ + peer := service.PeerData{ Origin: peerstore.Discv5, AddrInfo: p, ENR: n, diff --git a/waku/v2/discv5/discover_test.go b/waku/v2/discv5/discover_test.go index ba906b03..8e55536d 100644 --- a/waku/v2/discv5/discover_test.go +++ b/waku/v2/discv5/discover_test.go @@ -2,151 +2,67 @@ package discv5 import ( "context" - "crypto/ecdsa" - "fmt" - "math" - "net" - "strconv" "testing" "time" - gcrypto "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/p2p/enode" - "github.com/ethereum/go-ethereum/p2p/enr" "github.com/prometheus/client_golang/prometheus" - "github.com/waku-org/go-waku/waku/v2/peermanager" + wenr "github.com/waku-org/go-waku/waku/v2/protocol/enr" - "github.com/multiformats/go-multiaddr" "github.com/stretchr/testify/require" "github.com/waku-org/go-waku/tests" "github.com/waku-org/go-waku/waku/v2/utils" - "go.uber.org/zap" - - "github.com/libp2p/go-libp2p" - libp2pcrypto "github.com/libp2p/go-libp2p/core/crypto" - "github.com/libp2p/go-libp2p/core/host" ) -func createHost(t *testing.T) (host.Host, int, *ecdsa.PrivateKey) { - privKey, err := gcrypto.GenerateKey() - require.NoError(t, err) - - sPrivKey := libp2pcrypto.PrivKey(utils.EcdsaPrivKeyToSecp256k1PrivKey(privKey)) - - port, err := tests.FindFreePort(t, "127.0.0.1", 3) - require.NoError(t, err) - - sourceMultiAddr, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", port)) - require.NoError(t, err) - - host, err := libp2p.New( - libp2p.ListenAddrs(sourceMultiAddr), - libp2p.Identity(sPrivKey), - ) - require.NoError(t, err) - - return host, port, privKey -} - -func newLocalnode(priv *ecdsa.PrivateKey, ipAddr *net.TCPAddr, udpPort int, wakuFlags wenr.WakuEnrBitfield, advertiseAddr *net.IP, log *zap.Logger) (*enode.LocalNode, error) { - db, err := enode.OpenDB("") - if err != nil { - return nil, err - } - localnode := enode.NewLocalNode(db, priv) - localnode.SetFallbackUDP(udpPort) - localnode.Set(enr.WithEntry(wenr.WakuENRField, wakuFlags)) - localnode.SetFallbackIP(net.IP{127, 0, 0, 1}) - localnode.SetStaticIP(ipAddr.IP) - - if udpPort > 0 && udpPort <= math.MaxUint16 { - localnode.Set(enr.UDP(uint16(udpPort))) // lgtm [go/incorrect-integer-conversion] - } else { - log.Error("setting udpPort", zap.Int("port", udpPort)) - } - - if ipAddr.Port > 0 && ipAddr.Port <= math.MaxUint16 { - localnode.Set(enr.TCP(uint16(ipAddr.Port))) // lgtm [go/incorrect-integer-conversion] - } else { - log.Error("setting tcpPort", zap.Int("port", ipAddr.Port)) - } - - if advertiseAddr != nil { - localnode.SetStaticIP(*advertiseAddr) - } - - return localnode, nil -} - -func extractIP(addr multiaddr.Multiaddr) (*net.TCPAddr, error) { - ipStr, err := addr.ValueForProtocol(multiaddr.P_IP4) - if err != nil { - return nil, err - } - - portStr, err := addr.ValueForProtocol(multiaddr.P_TCP) - if err != nil { - return nil, err - } - port, err := strconv.Atoi(portStr) - if err != nil { - return nil, err - } - return &net.TCPAddr{ - IP: net.ParseIP(ipStr), - Port: port, - }, nil -} - func TestDiscV5(t *testing.T) { // Host1 <-> Host2 <-> Host3 // Host4(No waku capabilities) <-> Host2 // H1 - host1, _, prvKey1 := createHost(t) + host1, _, prvKey1 := tests.CreateHost(t) udpPort1, err := tests.FindFreeUDPPort(t, "127.0.0.1", 3) require.NoError(t, err) - ip1, _ := extractIP(host1.Addrs()[0]) - l1, err := newLocalnode(prvKey1, ip1, udpPort1, wenr.NewWakuEnrBitfield(true, true, true, true), nil, utils.Logger()) + ip1, _ := tests.ExtractIP(host1.Addrs()[0]) + l1, err := tests.NewLocalnode(prvKey1, ip1, udpPort1, wenr.NewWakuEnrBitfield(true, true, true, true), nil, utils.Logger()) require.NoError(t, err) - peerconn1 := peermanager.NewTestPeerDiscoverer() + peerconn1 := NewTestPeerDiscoverer() d1, err := NewDiscoveryV5(prvKey1, l1, peerconn1, prometheus.DefaultRegisterer, utils.Logger(), WithUDPPort(uint(udpPort1))) require.NoError(t, err) d1.SetHost(host1) // H2 - host2, _, prvKey2 := createHost(t) - ip2, _ := extractIP(host2.Addrs()[0]) + host2, _, prvKey2 := tests.CreateHost(t) + ip2, _ := tests.ExtractIP(host2.Addrs()[0]) udpPort2, err := tests.FindFreeUDPPort(t, "127.0.0.1", 3) require.NoError(t, err) - l2, err := newLocalnode(prvKey2, ip2, udpPort2, wenr.NewWakuEnrBitfield(true, true, true, true), nil, utils.Logger()) + l2, err := tests.NewLocalnode(prvKey2, ip2, udpPort2, wenr.NewWakuEnrBitfield(true, true, true, true), nil, utils.Logger()) require.NoError(t, err) - peerconn2 := peermanager.NewTestPeerDiscoverer() + peerconn2 := NewTestPeerDiscoverer() d2, err := NewDiscoveryV5(prvKey2, l2, peerconn2, prometheus.DefaultRegisterer, utils.Logger(), WithUDPPort(uint(udpPort2)), WithBootnodes([]*enode.Node{d1.localnode.Node()})) require.NoError(t, err) d2.SetHost(host2) // H3 - host3, _, prvKey3 := createHost(t) - ip3, _ := extractIP(host3.Addrs()[0]) + host3, _, prvKey3 := tests.CreateHost(t) + ip3, _ := tests.ExtractIP(host3.Addrs()[0]) udpPort3, err := tests.FindFreeUDPPort(t, "127.0.0.1", 3) require.NoError(t, err) - l3, err := newLocalnode(prvKey3, ip3, udpPort3, wenr.NewWakuEnrBitfield(true, true, true, true), nil, utils.Logger()) + l3, err := tests.NewLocalnode(prvKey3, ip3, udpPort3, wenr.NewWakuEnrBitfield(true, true, true, true), nil, utils.Logger()) require.NoError(t, err) - peerconn3 := peermanager.NewTestPeerDiscoverer() + peerconn3 := NewTestPeerDiscoverer() d3, err := NewDiscoveryV5(prvKey3, l3, peerconn3, prometheus.DefaultRegisterer, utils.Logger(), WithUDPPort(uint(udpPort3)), WithBootnodes([]*enode.Node{d2.localnode.Node()})) require.NoError(t, err) d3.SetHost(host3) // H4 doesn't have any Waku capabilities - host4, _, prvKey4 := createHost(t) - ip4, _ := extractIP(host2.Addrs()[0]) + host4, _, prvKey4 := tests.CreateHost(t) + ip4, _ := tests.ExtractIP(host2.Addrs()[0]) udpPort4, err := tests.FindFreeUDPPort(t, "127.0.0.1", 3) require.NoError(t, err) - l4, err := newLocalnode(prvKey4, ip4, udpPort4, 0, nil, utils.Logger()) + l4, err := tests.NewLocalnode(prvKey4, ip4, udpPort4, 0, nil, utils.Logger()) require.NoError(t, err) - peerconn4 := peermanager.NewTestPeerDiscoverer() + peerconn4 := NewTestPeerDiscoverer() d4, err := NewDiscoveryV5(prvKey4, l4, peerconn4, prometheus.DefaultRegisterer, utils.Logger(), WithUDPPort(uint(udpPort4)), WithBootnodes([]*enode.Node{d2.localnode.Node()})) require.NoError(t, err) d2.SetHost(host2) diff --git a/waku/v2/peermanager/mock_peer_discoverer.go b/waku/v2/discv5/mock_peer_discoverer.go similarity index 93% rename from waku/v2/peermanager/mock_peer_discoverer.go rename to waku/v2/discv5/mock_peer_discoverer.go index f7ea5138..5bef8542 100644 --- a/waku/v2/peermanager/mock_peer_discoverer.go +++ b/waku/v2/discv5/mock_peer_discoverer.go @@ -1,10 +1,11 @@ -package peermanager +package discv5 import ( "context" "sync" "github.com/libp2p/go-libp2p/core/peer" + "github.com/waku-org/go-waku/waku/v2/service" ) // TestPeerDiscoverer is mock peer discoverer for testing @@ -23,7 +24,7 @@ func NewTestPeerDiscoverer() *TestPeerDiscoverer { } // Subscribe is for subscribing to peer discoverer -func (t *TestPeerDiscoverer) Subscribe(ctx context.Context, ch <-chan PeerData) { +func (t *TestPeerDiscoverer) Subscribe(ctx context.Context, ch <-chan service.PeerData) { go func() { for { select { diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index 8fc659c2..0bb611f7 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -43,6 +43,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/protocol/store" "github.com/waku-org/go-waku/waku/v2/rendezvous" + "github.com/waku-org/go-waku/waku/v2/service" "github.com/waku-org/go-waku/waku/v2/timesource" "github.com/waku-org/go-waku/waku/v2/utils" @@ -289,6 +290,7 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) { } w.opts.legacyFilterOpts = append(w.opts.legacyFilterOpts, legacy_filter.WithPeerManager(w.peermanager)) + w.opts.filterOpts = append(w.opts.filterOpts, filter.WithPeerManager(w.peermanager)) w.legacyFilter = legacy_filter.NewWakuFilter(w.bcaster, w.opts.isLegacyFilterFullNode, w.timesource, w.opts.prometheusReg, w.log, w.opts.legacyFilterOpts...) w.filterFullNode = filter.NewWakuFilterFullNode(w.timesource, w.opts.prometheusReg, w.log, w.opts.filterOpts...) @@ -691,7 +693,9 @@ func (w *WakuNode) mountDiscV5() error { } var err error - w.discoveryV5, err = discv5.NewDiscoveryV5(w.opts.privKey, w.localNode, w.peerConnector, w.opts.prometheusReg, w.log, discV5Options...) + discv5Inst, err := discv5.NewDiscoveryV5(w.opts.privKey, w.localNode, w.peerConnector, w.opts.prometheusReg, w.log, discV5Options...) + w.discoveryV5 = discv5Inst + w.peermanager.SetDiscv5(discv5Inst) return err } @@ -714,7 +718,7 @@ func (w *WakuNode) AddPeer(address ma.Multiaddr, origin wps.Origin, pubSubTopics // AddDiscoveredPeer to add a discovered peer to the node peerStore func (w *WakuNode) AddDiscoveredPeer(ID peer.ID, addrs []ma.Multiaddr, origin wps.Origin, pubsubTopics []string, connectNow bool) { - p := peermanager.PeerData{ + p := service.PeerData{ Origin: origin, AddrInfo: peer.AddrInfo{ ID: ID, diff --git a/waku/v2/peermanager/peer_connector.go b/waku/v2/peermanager/peer_connector.go index a4dbd2c6..553313af 100644 --- a/waku/v2/peermanager/peer_connector.go +++ b/waku/v2/peermanager/peer_connector.go @@ -18,6 +18,7 @@ import ( "github.com/waku-org/go-waku/logging" wps "github.com/waku-org/go-waku/waku/v2/peerstore" waku_proto "github.com/waku-org/go-waku/waku/v2/protocol" + "github.com/waku-org/go-waku/waku/v2/service" "go.uber.org/zap" @@ -34,7 +35,7 @@ type PeerConnectionStrategy struct { paused atomic.Bool dialTimeout time.Duration - *CommonDiscoveryService + *service.CommonDiscoveryService subscriptions []subscription backoff backoff.BackoffFactory @@ -43,7 +44,7 @@ type PeerConnectionStrategy struct { type subscription struct { ctx context.Context - ch <-chan PeerData + ch <-chan service.PeerData } // backoff describes the strategy used to decide how long to backoff after previously attempting to connect to a peer @@ -71,7 +72,7 @@ func NewPeerConnectionStrategy(pm *PeerManager, pc := &PeerConnectionStrategy{ cache: cache, dialTimeout: dialTimeout, - CommonDiscoveryService: NewCommonDiscoveryService(), + CommonDiscoveryService: service.NewCommonDiscoveryService(), pm: pm, backoff: getBackOff(), logger: logger.Named("discovery-connector"), @@ -86,7 +87,7 @@ type connCacheData struct { } // Subscribe receives channels on which discovered peers should be pushed -func (c *PeerConnectionStrategy) Subscribe(ctx context.Context, ch <-chan PeerData) { +func (c *PeerConnectionStrategy) Subscribe(ctx context.Context, ch <-chan service.PeerData) { // if not running yet, store the subscription and return if err := c.ErrOnNotRunning(); err != nil { c.mux.Lock() diff --git a/waku/v2/peermanager/peer_discovery.go b/waku/v2/peermanager/peer_discovery.go new file mode 100644 index 00000000..72ee3077 --- /dev/null +++ b/waku/v2/peermanager/peer_discovery.go @@ -0,0 +1,117 @@ +package peermanager + +import ( + "context" + "errors" + + "github.com/libp2p/go-libp2p/core/protocol" + "github.com/waku-org/go-waku/waku/v2/discv5" + wps "github.com/waku-org/go-waku/waku/v2/peerstore" + waku_proto "github.com/waku-org/go-waku/waku/v2/protocol" + wenr "github.com/waku-org/go-waku/waku/v2/protocol/enr" + "github.com/waku-org/go-waku/waku/v2/protocol/relay" + "github.com/waku-org/go-waku/waku/v2/service" + "go.uber.org/zap" +) + +// DiscoverAndConnectToPeers discovers peers using discoveryv5 and connects to the peers. +// It discovers peers till maxCount peers are found for the cluster,shard and protocol or the context passed expires. +func (pm *PeerManager) DiscoverAndConnectToPeers(ctx context.Context, cluster uint16, + shard uint16, serviceProtocol protocol.ID, maxCount int) error { + if pm.discoveryService == nil { + return nil + } + peers, err := pm.discoverOnDemand(cluster, shard, serviceProtocol, ctx, maxCount) + if err != nil { + return err + } + + pm.logger.Debug("discovered peers on demand ", zap.Int("noOfPeers", len(peers))) + connectNow := false + //Add discovered peers to peerStore and connect to them + for idx, p := range peers { + if serviceProtocol != relay.WakuRelayID_v200 && idx <= maxCount { + //how many connections to initiate? Maybe this could be a config exposed to client API. + //For now just going ahead with initiating connections with 2 nodes in case of non-relay service peers + //In case of relay let it go through connectivityLoop + connectNow = true + } + pm.AddDiscoveredPeer(p, connectNow) + } + return nil +} + +// RegisterWakuProtocol to be used by Waku protocols that could be used for peer discovery +// Which means protoocl should be as defined in waku2 ENR key in https://rfc.vac.dev/spec/31/. +func (pm *PeerManager) RegisterWakuProtocol(proto protocol.ID, bitField uint8) { + pm.wakuprotoToENRFieldMap[proto] = WakuProtoInfo{waku2ENRBitField: bitField} +} + +// OnDemandPeerDiscovery initiates an on demand peer discovery and +// filters peers based on cluster,shard and any wakuservice protocols specified +func (pm *PeerManager) discoverOnDemand(cluster uint16, + shard uint16, wakuProtocol protocol.ID, ctx context.Context, maxCount int) ([]service.PeerData, error) { + var peers []service.PeerData + + wakuProtoInfo, ok := pm.wakuprotoToENRFieldMap[wakuProtocol] + if !ok { + pm.logger.Error("cannot do on demand discovery for non-waku protocol", zap.String("protocol", string(wakuProtocol))) + return nil, errors.New("cannot do on demand discovery for non-waku protocol") + } + iterator, err := pm.discoveryService.PeerIterator( + discv5.FilterShard(cluster, shard), + discv5.FilterCapabilities(wakuProtoInfo.waku2ENRBitField), + ) + if err != nil { + pm.logger.Error("failed to find peers for shard and services", zap.Uint16("cluster", cluster), + zap.Uint16("shard", shard), zap.String("service", string(wakuProtocol)), zap.Error(err)) + return peers, err + } + + //Iterate and fill peers. + defer iterator.Close() + + for iterator.Next() { + + pInfo, err := wenr.EnodeToPeerInfo(iterator.Node()) + if err != nil { + continue + } + pData := service.PeerData{ + Origin: wps.Discv5, + ENR: iterator.Node(), + AddrInfo: *pInfo, + } + peers = append(peers, pData) + + if len(peers) >= maxCount { + pm.logger.Debug("found required number of nodes, stopping on demand discovery", zap.Uint16("cluster", cluster), + zap.Uint16("shard", shard), zap.Int("required-nodes", maxCount)) + break + } + + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + } + + } + return peers, nil +} + +func (pm *PeerManager) discoverPeersByPubsubTopic(pubsubTopic string, proto protocol.ID, ctx context.Context, maxCount int) { + shardInfo, err := waku_proto.TopicsToRelayShards(pubsubTopic) + if err != nil { + pm.logger.Error("failed to convert pubsub topic to shard", zap.String("topic", pubsubTopic), zap.Error(err)) + return + } + if len(shardInfo) > 0 { + err = pm.DiscoverAndConnectToPeers(ctx, shardInfo[0].ClusterID, shardInfo[0].ShardIDs[0], proto, maxCount) + if err != nil { + pm.logger.Error("failed to discover and conenct to peers", zap.Error(err)) + } + } else { + pm.logger.Debug("failed to convert pubsub topic to shard as topic is named pubsubTopic", zap.String("topic", pubsubTopic)) + } +} diff --git a/waku/v2/peermanager/peer_manager.go b/waku/v2/peermanager/peer_manager.go index d39ae51a..f309b008 100644 --- a/waku/v2/peermanager/peer_manager.go +++ b/waku/v2/peermanager/peer_manager.go @@ -3,10 +3,10 @@ package peermanager import ( "context" "errors" - "math/rand" "sync" "time" + "github.com/ethereum/go-ethereum/p2p/enr" pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p/core/event" "github.com/libp2p/go-libp2p/core/host" @@ -14,13 +14,14 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peerstore" "github.com/libp2p/go-libp2p/core/protocol" - "github.com/libp2p/go-libp2p/p2p/protocol/ping" ma "github.com/multiformats/go-multiaddr" "github.com/waku-org/go-waku/logging" + "github.com/waku-org/go-waku/waku/v2/discv5" wps "github.com/waku-org/go-waku/waku/v2/peerstore" waku_proto "github.com/waku-org/go-waku/waku/v2/protocol" wenr "github.com/waku-org/go-waku/waku/v2/protocol/enr" "github.com/waku-org/go-waku/waku/v2/protocol/relay" + "github.com/waku-org/go-waku/waku/v2/service" "go.uber.org/zap" ) @@ -30,20 +31,29 @@ type NodeTopicDetails struct { topic *pubsub.Topic } +// WakuProtoInfo holds protocol specific info +// To be used at a later stage to set various config such as criteria for peer management specific to each Waku protocols +// This should make peer-manager agnostic to protocol +type WakuProtoInfo struct { + waku2ENRBitField uint8 +} + // PeerManager applies various controls and manage connections towards peers. type PeerManager struct { - peerConnector *PeerConnectionStrategy - maxPeers int - maxRelayPeers int - logger *zap.Logger - InRelayPeersTarget int - OutRelayPeersTarget int - host host.Host - serviceSlots *ServiceSlots - ctx context.Context - sub event.Subscription - topicMutex sync.RWMutex - subRelayTopics map[string]*NodeTopicDetails + peerConnector *PeerConnectionStrategy + maxPeers int + maxRelayPeers int + logger *zap.Logger + InRelayPeersTarget int + OutRelayPeersTarget int + host host.Host + serviceSlots *ServiceSlots + ctx context.Context + sub event.Subscription + topicMutex sync.RWMutex + subRelayTopics map[string]*NodeTopicDetails + discoveryService *discv5.DiscoveryV5 + wakuprotoToENRFieldMap map[protocol.ID]WakuProtoInfo } // PeerSelection provides various options based on which Peer is selected from a list of peers. @@ -88,13 +98,14 @@ func NewPeerManager(maxConnections int, maxPeers int, logger *zap.Logger) *PeerM } pm := &PeerManager{ - logger: logger.Named("peer-manager"), - maxRelayPeers: maxRelayPeers, - InRelayPeersTarget: inRelayPeersTarget, - OutRelayPeersTarget: outRelayPeersTarget, - serviceSlots: NewServiceSlot(), - subRelayTopics: make(map[string]*NodeTopicDetails), - maxPeers: maxPeers, + logger: logger.Named("peer-manager"), + maxRelayPeers: maxRelayPeers, + InRelayPeersTarget: inRelayPeersTarget, + OutRelayPeersTarget: outRelayPeersTarget, + serviceSlots: NewServiceSlot(), + subRelayTopics: make(map[string]*NodeTopicDetails), + maxPeers: maxPeers, + wakuprotoToENRFieldMap: map[protocol.ID]WakuProtoInfo{}, } logger.Info("PeerManager init values", zap.Int("maxConnections", maxConnections), zap.Int("maxRelayPeers", maxRelayPeers), @@ -105,6 +116,11 @@ func NewPeerManager(maxConnections int, maxPeers int, logger *zap.Logger) *PeerM return pm } +// SetDiscv5 sets the discoveryv5 service to be used for peer discovery. +func (pm *PeerManager) SetDiscv5(discv5 *discv5.DiscoveryV5) { + pm.discoveryService = discv5 +} + // SetHost sets the host to be used in order to access the peerStore. func (pm *PeerManager) SetHost(host host.Host) { pm.host = host @@ -117,6 +133,9 @@ func (pm *PeerManager) SetPeerConnector(pc *PeerConnectionStrategy) { // Start starts the processing to be done by peer manager. func (pm *PeerManager) Start(ctx context.Context) { + + pm.RegisterWakuProtocol(relay.WakuRelayID_v200, relay.WakuRelayENRField) + pm.ctx = ctx if pm.sub != nil { go pm.peerEventLoop(ctx) @@ -198,7 +217,7 @@ func (pm *PeerManager) ensureMinRelayConnsPerTopic() { //Find not connected peers. notConnectedPeers := pm.getNotConnectedPers(topicStr) if notConnectedPeers.Len() == 0 { - //TODO: Trigger on-demand discovery for this topic. + pm.discoverPeersByPubsubTopic(topicStr, relay.WakuRelayID_v200, pm.ctx, 2) continue } //Connect to eligible peers. @@ -231,14 +250,14 @@ func (pm *PeerManager) connectToRelayPeers() { // addrInfoToPeerData returns addressinfo for a peer // If addresses are expired, it removes the peer from host peerStore and returns nil. -func addrInfoToPeerData(origin wps.Origin, peerID peer.ID, host host.Host) *PeerData { +func addrInfoToPeerData(origin wps.Origin, peerID peer.ID, host host.Host) *service.PeerData { addrs := host.Peerstore().Addrs(peerID) if len(addrs) == 0 { //Addresses expired, remove peer from peerStore host.Peerstore().RemovePeer(peerID) return nil } - return &PeerData{ + return &service.PeerData{ Origin: origin, AddrInfo: peer.AddrInfo{ ID: peerID, @@ -295,10 +314,42 @@ func (pm *PeerManager) pruneInRelayConns(inRelayPeers peer.IDSlice) { } } +func (pm *PeerManager) processPeerENR(p *service.PeerData) []protocol.ID { + shards, err := wenr.RelaySharding(p.ENR.Record()) + if err != nil { + pm.logger.Error("could not derive relayShards from ENR", zap.Error(err), + logging.HostID("peer", p.AddrInfo.ID), zap.String("enr", p.ENR.String())) + } else { + if shards != nil { + p.PubSubTopics = make([]string, 0) + topics := shards.Topics() + for _, topic := range topics { + topicStr := topic.String() + p.PubSubTopics = append(p.PubSubTopics, topicStr) + } + } else { + pm.logger.Debug("ENR doesn't have relay shards", logging.HostID("peer", p.AddrInfo.ID)) + } + } + supportedProtos := []protocol.ID{} + //Identify and specify protocols supported by the peer based on the discovered peer's ENR + var enrField wenr.WakuEnrBitfield + if err := p.ENR.Record().Load(enr.WithEntry(wenr.WakuENRField, &enrField)); err == nil { + for proto, protoENR := range pm.wakuprotoToENRFieldMap { + protoENRField := protoENR.waku2ENRBitField + if protoENRField&enrField != 0 { + supportedProtos = append(supportedProtos, proto) + //Add Service peers to serviceSlots. + pm.addPeerToServiceSlot(proto, p.AddrInfo.ID) + } + } + } + return supportedProtos +} + // AddDiscoveredPeer to add dynamically discovered peers. // Note that these peers will not be set in service-slots. -// TODO: It maybe good to set in service-slots based on services supported in the ENR -func (pm *PeerManager) AddDiscoveredPeer(p PeerData, connectNow bool) { +func (pm *PeerManager) AddDiscoveredPeer(p service.PeerData, connectNow bool) { //Doing this check again inside addPeer, in order to avoid additional complexity of rollingBack other changes. if pm.maxPeers <= pm.host.Peerstore().Peers().Len() { return @@ -309,27 +360,13 @@ func (pm *PeerManager) AddDiscoveredPeer(p PeerData, connectNow bool) { pm.logger.Debug("Found discovered peer already in peerStore", logging.HostID("peer", p.AddrInfo.ID)) return } - // Try to fetch shard info from ENR to arrive at pubSub topics. + supportedProtos := []protocol.ID{} if len(p.PubSubTopics) == 0 && p.ENR != nil { - shards, err := wenr.RelaySharding(p.ENR.Record()) - if err != nil { - pm.logger.Error("Could not derive relayShards from ENR", zap.Error(err), - logging.HostID("peer", p.AddrInfo.ID), zap.String("enr", p.ENR.String())) - } else { - if shards != nil { - p.PubSubTopics = make([]string, 0) - topics := shards.Topics() - for _, topic := range topics { - topicStr := topic.String() - p.PubSubTopics = append(p.PubSubTopics, topicStr) - } - } else { - pm.logger.Debug("ENR doesn't have relay shards", logging.HostID("peer", p.AddrInfo.ID)) - } - } + // Try to fetch shard info and supported protocols from ENR to arrive at pubSub topics. + supportedProtos = pm.processPeerENR(&p) } - _ = pm.addPeer(p.AddrInfo.ID, p.AddrInfo.Addrs, p.Origin, p.PubSubTopics) + _ = pm.addPeer(p.AddrInfo.ID, p.AddrInfo.Addrs, p.Origin, p.PubSubTopics, supportedProtos...) if p.ENR != nil { err := pm.host.Peerstore().(wps.WakuPeerstore).SetENR(p.AddrInfo.ID, p.ENR) @@ -429,199 +466,3 @@ func (pm *PeerManager) addPeerToServiceSlot(proto protocol.ID, peerID peer.ID) { // getPeers returns nil for WakuRelayIDv200 protocol, but we don't run this ServiceSlot code for WakuRelayIDv200 protocol pm.serviceSlots.getPeers(proto).add(peerID) } - -// SelectPeerByContentTopic is used to return a random peer that supports a given protocol for given contentTopic. -// If a list of specific peers is passed, the peer will be chosen from that list assuming -// it supports the chosen protocol and contentTopic, otherwise it will chose a peer from the service slot. -// If a peer cannot be found in the service slot, a peer will be selected from node peerstore -func (pm *PeerManager) SelectPeerByContentTopic(proto protocol.ID, contentTopic string, specificPeers ...peer.ID) (peer.ID, error) { - pubsubTopic, err := waku_proto.GetPubSubTopicFromContentTopic(contentTopic) - if err != nil { - return "", err - } - return pm.SelectPeer(PeerSelectionCriteria{PubsubTopic: pubsubTopic, Proto: proto, SpecificPeers: specificPeers}) -} - -// SelectRandomPeer is used to return a random peer that supports a given protocol. -// If a list of specific peers is passed, the peer will be chosen from that list assuming -// it supports the chosen protocol, otherwise it will chose a peer from the service slot. -// If a peer cannot be found in the service slot, a peer will be selected from node peerstore -// if pubSubTopic is specified, peer is selected from list that support the pubSubTopic -func (pm *PeerManager) SelectRandomPeer(criteria PeerSelectionCriteria) (peer.ID, error) { - // @TODO We need to be more strategic about which peers we dial. Right now we just set one on the service. - // Ideally depending on the query and our set of peers we take a subset of ideal peers. - // This will require us to check for various factors such as: - // - which topics they track - // - latency? - - peerID, err := pm.selectServicePeer(criteria.Proto, criteria.PubsubTopic, criteria.SpecificPeers...) - if err == nil { - return peerID, nil - } else if !errors.Is(err, ErrNoPeersAvailable) { - pm.logger.Debug("could not retrieve random peer from slot", zap.String("protocol", string(criteria.Proto)), zap.String("pubsubTopic", criteria.PubsubTopic), zap.Error(err)) - return "", err - } - - // if not found in serviceSlots or proto == WakuRelayIDv200 - filteredPeers, err := pm.FilterPeersByProto(criteria.SpecificPeers, criteria.Proto) - if err != nil { - return "", err - } - if criteria.PubsubTopic != "" { - filteredPeers = pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopic(criteria.PubsubTopic, filteredPeers...) - } - return selectRandomPeer(filteredPeers, pm.logger) -} - -func (pm *PeerManager) selectServicePeer(proto protocol.ID, pubSubTopic string, specificPeers ...peer.ID) (peer.ID, error) { - //Try to fetch from serviceSlot - if slot := pm.serviceSlots.getPeers(proto); slot != nil { - if pubSubTopic == "" { - return slot.getRandom() - } else { //PubsubTopic based selection - keys := make([]peer.ID, 0, len(slot.m)) - for i := range slot.m { - keys = append(keys, i) - } - selectedPeers := pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopic(pubSubTopic, keys...) - return selectRandomPeer(selectedPeers, pm.logger) - } - } - - return "", ErrNoPeersAvailable -} - -// PeerSelectionCriteria is the selection Criteria that is used by PeerManager to select peers. -type PeerSelectionCriteria struct { - SelectionType PeerSelection - Proto protocol.ID - PubsubTopic string - SpecificPeers peer.IDSlice - Ctx context.Context -} - -// SelectPeer selects a peer based on selectionType specified. -// Context is required only in case of selectionType set to LowestRTT -func (pm *PeerManager) SelectPeer(criteria PeerSelectionCriteria) (peer.ID, error) { - - switch criteria.SelectionType { - case Automatic: - return pm.SelectRandomPeer(criteria) - case LowestRTT: - if criteria.Ctx == nil { - criteria.Ctx = context.Background() - pm.logger.Warn("context is not passed for peerSelectionwithRTT, using background context") - } - return pm.SelectPeerWithLowestRTT(criteria) - default: - return "", errors.New("unknown peer selection type specified") - } -} - -type pingResult struct { - p peer.ID - rtt time.Duration -} - -// SelectPeerWithLowestRTT will select a peer that supports a specific protocol with the lowest reply time -// If a list of specific peers is passed, the peer will be chosen from that list assuming -// it supports the chosen protocol, otherwise it will chose a peer from the node peerstore -// TO OPTIMIZE: As of now the peer with lowest RTT is identified when select is called, this should be optimized -// to maintain the RTT as part of peer-scoring and just select based on that. -func (pm *PeerManager) SelectPeerWithLowestRTT(criteria PeerSelectionCriteria) (peer.ID, error) { - var peers peer.IDSlice - var err error - if criteria.Ctx == nil { - criteria.Ctx = context.Background() - } - - if criteria.PubsubTopic != "" { - peers = pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopic(criteria.PubsubTopic, criteria.SpecificPeers...) - } - - peers, err = pm.FilterPeersByProto(peers, criteria.Proto) - if err != nil { - return "", err - } - wg := sync.WaitGroup{} - waitCh := make(chan struct{}) - pingCh := make(chan pingResult, 1000) - - wg.Add(len(peers)) - - go func() { - for _, p := range peers { - go func(p peer.ID) { - defer wg.Done() - ctx, cancel := context.WithTimeout(criteria.Ctx, 3*time.Second) - defer cancel() - result := <-ping.Ping(ctx, pm.host, p) - if result.Error == nil { - pingCh <- pingResult{ - p: p, - rtt: result.RTT, - } - } else { - pm.logger.Debug("could not ping", logging.HostID("peer", p), zap.Error(result.Error)) - } - }(p) - } - wg.Wait() - close(waitCh) - close(pingCh) - }() - - select { - case <-waitCh: - var min *pingResult - for p := range pingCh { - if min == nil { - min = &p - } else { - if p.rtt < min.rtt { - min = &p - } - } - } - if min == nil { - return "", ErrNoPeersAvailable - } - - return min.p, nil - case <-criteria.Ctx.Done(): - return "", ErrNoPeersAvailable - } -} - -// selectRandomPeer selects randomly a peer from the list of peers passed. -func selectRandomPeer(peers peer.IDSlice, log *zap.Logger) (peer.ID, error) { - if len(peers) >= 1 { - peerID := peers[rand.Intn(len(peers))] - // TODO: proper heuristic here that compares peer scores and selects "best" one. For now a random peer for the given protocol is returned - return peerID, nil // nolint: gosec - } - - return "", ErrNoPeersAvailable -} - -// FilterPeersByProto filters list of peers that support specified protocols. -// If specificPeers is nil, all peers in the host's peerStore are considered for filtering. -func (pm *PeerManager) FilterPeersByProto(specificPeers peer.IDSlice, proto ...protocol.ID) (peer.IDSlice, error) { - peerSet := specificPeers - if len(peerSet) == 0 { - peerSet = pm.host.Peerstore().Peers() - } - - var peers peer.IDSlice - for _, peer := range peerSet { - protocols, err := pm.host.Peerstore().SupportsProtocols(peer, proto...) - if err != nil { - return nil, err - } - - if len(protocols) > 0 { - peers = append(peers, peer) - } - } - return peers, nil -} diff --git a/waku/v2/peermanager/peer_manager_test.go b/waku/v2/peermanager/peer_manager_test.go index 385c6f0a..8d90f87c 100644 --- a/waku/v2/peermanager/peer_manager_test.go +++ b/waku/v2/peermanager/peer_manager_test.go @@ -8,13 +8,19 @@ import ( "testing" "time" + "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/libp2p/go-libp2p" "github.com/libp2p/go-libp2p/core/host" libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol" + "github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem" "github.com/multiformats/go-multiaddr" + "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" "github.com/waku-org/go-waku/tests" + "github.com/waku-org/go-waku/waku/v2/discv5" wps "github.com/waku-org/go-waku/waku/v2/peerstore" wakuproto "github.com/waku-org/go-waku/waku/v2/protocol" + wenr "github.com/waku-org/go-waku/waku/v2/protocol/enr" "github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/utils" ) @@ -128,27 +134,27 @@ func TestPeerSelection(t *testing.T) { defer h3.Close() protocol := libp2pProtocol.ID("test/protocol") - _, err = pm.AddPeer(getAddr(h2), wps.Static, []string{"/waku/rs/2/1", "/waku/rs/2/2"}, libp2pProtocol.ID(protocol)) + _, err = pm.AddPeer(getAddr(h2), wps.Static, []string{"/waku/2/rs/2/1", "/waku/2/rs/2/2"}, libp2pProtocol.ID(protocol)) require.NoError(t, err) - _, err = pm.AddPeer(getAddr(h3), wps.Static, []string{"/waku/rs/2/1"}, libp2pProtocol.ID(protocol)) + _, err = pm.AddPeer(getAddr(h3), wps.Static, []string{"/waku/2/rs/2/1"}, libp2pProtocol.ID(protocol)) require.NoError(t, err) _, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol}) require.NoError(t, err) - peerID, err := pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopic: "/waku/rs/2/2"}) + peerID, err := pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopic: "/waku/2/rs/2/2"}) require.NoError(t, err) require.Equal(t, h2.ID(), peerID) - _, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopic: "/waku/rs/2/3"}) + _, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopic: "/waku/2/rs/2/3"}) require.Error(t, ErrNoPeersAvailable, err) - _, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopic: "/waku/rs/2/1"}) + _, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopic: "/waku/2/rs/2/1"}) require.NoError(t, err) //Test for selectWithLowestRTT - _, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: LowestRTT, Proto: protocol, PubsubTopic: "/waku/rs/2/1"}) + _, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: LowestRTT, Proto: protocol, PubsubTopic: "/waku/2/rs/2/1"}) require.NoError(t, err) } @@ -208,7 +214,6 @@ func TestConnectToRelayPeers(t *testing.T) { ctx, pm, deferFn := initTest(t) pc, err := NewPeerConnectionStrategy(pm, 120*time.Second, pm.logger) require.NoError(t, err) - pm.SetPeerConnector(pc) err = pc.Start(ctx) require.NoError(t, err) pm.Start(ctx) @@ -218,3 +223,92 @@ func TestConnectToRelayPeers(t *testing.T) { pm.connectToRelayPeers() } + +func createHostWithDiscv5AndPM(t *testing.T, hostName string, topic string, enrField uint8, bootnode ...*enode.Node) (host.Host, *PeerManager, *discv5.DiscoveryV5) { + ps, err := pstoremem.NewPeerstore() + require.NoError(t, err) + wakuPeerStore := wps.NewWakuPeerstore(ps) + + host, _, prvKey1 := tests.CreateHost(t, libp2p.Peerstore(wakuPeerStore)) + + logger := utils.Logger().Named(hostName) + + udpPort, err := tests.FindFreeUDPPort(t, "127.0.0.1", 3) + require.NoError(t, err) + ip1, _ := tests.ExtractIP(host.Addrs()[0]) + localNode, err := tests.NewLocalnode(prvKey1, ip1, udpPort, enrField, nil, logger) + require.NoError(t, err) + + rs, err := wakuproto.TopicsToRelayShards(topic) + require.NoError(t, err) + + err = wenr.Update(localNode, wenr.WithWakuRelaySharding(rs[0])) + require.NoError(t, err) + pm := NewPeerManager(10, 20, logger) + pm.SetHost(host) + peerconn, err := NewPeerConnectionStrategy(pm, 30*time.Second, logger) + require.NoError(t, err) + discv5, err := discv5.NewDiscoveryV5(prvKey1, localNode, peerconn, prometheus.DefaultRegisterer, logger, discv5.WithUDPPort(uint(udpPort)), discv5.WithBootnodes(bootnode)) + require.NoError(t, err) + discv5.SetHost(host) + pm.SetDiscv5(discv5) + pm.SetPeerConnector(peerconn) + + return host, pm, discv5 +} + +func TestOnDemandPeerDiscovery(t *testing.T) { + topic := "/waku/2/rs/1/1" + + // Host1 <-> Host2 <-> Host3 + host1, _, d1 := createHostWithDiscv5AndPM(t, "host1", topic, wenr.NewWakuEnrBitfield(true, true, false, true)) + + host2, _, d2 := createHostWithDiscv5AndPM(t, "host2", topic, wenr.NewWakuEnrBitfield(false, true, true, true), d1.Node()) + host3, pm3, d3 := createHostWithDiscv5AndPM(t, "host3", topic, wenr.NewWakuEnrBitfield(true, true, true, true), d2.Node()) + + defer d1.Stop() + defer d2.Stop() + defer d3.Stop() + + defer host1.Close() + defer host2.Close() + defer host3.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + err := d1.Start(ctx) + require.NoError(t, err) + + err = d2.Start(ctx) + require.NoError(t, err) + + err = d3.Start(ctx) + require.NoError(t, err) + + //Discovery should fail for non-waku protocol + _, err = pm3.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, PubsubTopic: topic, Proto: "/test"}) + require.Error(t, err) + + _, err = pm3.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: "/test"}) + require.Error(t, err) + + ctx, cancel = context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + var enrField uint8 + enrField |= (1 << 1) + pm3.RegisterWakuProtocol("/vac/waku/store/2.0.0-beta4", enrField) + peerID, err := pm3.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, PubsubTopic: topic, Proto: "/vac/waku/store/2.0.0-beta4", Ctx: ctx}) + require.NoError(t, err) + require.Equal(t, peerID, host2.ID()) + + var enrField1 uint8 + + enrField1 |= (1 << 3) + pm3.RegisterWakuProtocol("/vac/waku/lightpush/2.0.0-beta1", enrField1) + peerID, err = pm3.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, PubsubTopic: topic, Proto: "/vac/waku/lightpush/2.0.0-beta1", Ctx: ctx}) + require.NoError(t, err) + require.Equal(t, peerID, host1.ID()) + +} diff --git a/waku/v2/peermanager/peer_selection.go b/waku/v2/peermanager/peer_selection.go new file mode 100644 index 00000000..e785c177 --- /dev/null +++ b/waku/v2/peermanager/peer_selection.go @@ -0,0 +1,227 @@ +package peermanager + +import ( + "context" + "errors" + "math/rand" + "sync" + "time" + + "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/protocol" + "github.com/libp2p/go-libp2p/p2p/protocol/ping" + "github.com/waku-org/go-waku/logging" + wps "github.com/waku-org/go-waku/waku/v2/peerstore" + waku_proto "github.com/waku-org/go-waku/waku/v2/protocol" + "go.uber.org/zap" +) + +// SelectPeerByContentTopic is used to return a random peer that supports a given protocol for given contentTopic. +// If a list of specific peers is passed, the peer will be chosen from that list assuming +// it supports the chosen protocol and contentTopic, otherwise it will chose a peer from the service slot. +// If a peer cannot be found in the service slot, a peer will be selected from node peerstore +func (pm *PeerManager) SelectPeerByContentTopic(proto protocol.ID, contentTopic string, specificPeers ...peer.ID) (peer.ID, error) { + pubsubTopic, err := waku_proto.GetPubSubTopicFromContentTopic(contentTopic) + if err != nil { + return "", err + } + return pm.SelectPeer(PeerSelectionCriteria{PubsubTopic: pubsubTopic, Proto: proto, SpecificPeers: specificPeers}) +} + +// SelectRandomPeer is used to return a random peer that supports a given protocol. +// If a list of specific peers is passed, the peer will be chosen from that list assuming +// it supports the chosen protocol, otherwise it will chose a peer from the service slot. +// If a peer cannot be found in the service slot, a peer will be selected from node peerstore +// if pubSubTopic is specified, peer is selected from list that support the pubSubTopic +func (pm *PeerManager) SelectRandomPeer(criteria PeerSelectionCriteria) (peer.ID, error) { + // @TODO We need to be more strategic about which peers we dial. Right now we just set one on the service. + // Ideally depending on the query and our set of peers we take a subset of ideal peers. + // This will require us to check for various factors such as: + // - which topics they track + // - latency? + + peerID, err := pm.selectServicePeer(criteria.Proto, criteria.PubsubTopic, criteria.Ctx, criteria.SpecificPeers...) + if err == nil { + return peerID, nil + } else if !errors.Is(err, ErrNoPeersAvailable) { + pm.logger.Debug("could not retrieve random peer from slot", zap.String("protocol", string(criteria.Proto)), + zap.String("pubsubTopic", criteria.PubsubTopic), zap.Error(err)) + return "", err + } + + // if not found in serviceSlots or proto == WakuRelayIDv200 + filteredPeers, err := pm.FilterPeersByProto(criteria.SpecificPeers, criteria.Proto) + if err != nil { + return "", err + } + if criteria.PubsubTopic != "" { + filteredPeers = pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopic(criteria.PubsubTopic, filteredPeers...) + } + return selectRandomPeer(filteredPeers, pm.logger) +} + +func (pm *PeerManager) selectServicePeer(proto protocol.ID, pubsubTopic string, ctx context.Context, specificPeers ...peer.ID) (peer.ID, error) { + var peerID peer.ID + var err error + for retryCnt := 0; retryCnt < 1; retryCnt++ { + //Try to fetch from serviceSlot + if slot := pm.serviceSlots.getPeers(proto); slot != nil { + if pubsubTopic == "" { + return slot.getRandom() + } else { //PubsubTopic based selection + keys := make([]peer.ID, 0, len(slot.m)) + for i := range slot.m { + keys = append(keys, i) + } + selectedPeers := pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopic(pubsubTopic, keys...) + peerID, err = selectRandomPeer(selectedPeers, pm.logger) + if err == nil { + return peerID, nil + } else { + pm.logger.Debug("Discovering peers by pubsubTopic", zap.String("pubsubTopic", pubsubTopic)) + //Trigger on-demand discovery for this topic and connect to peer immediately. + //For now discover atleast 1 peer for the criteria + pm.discoverPeersByPubsubTopic(pubsubTopic, proto, ctx, 1) + //Try to fetch peers again. + continue + } + } + } + } + if peerID == "" { + pm.logger.Debug("could not retrieve random peer from slot", zap.Error(err)) + } + return "", ErrNoPeersAvailable +} + +// PeerSelectionCriteria is the selection Criteria that is used by PeerManager to select peers. +type PeerSelectionCriteria struct { + SelectionType PeerSelection + Proto protocol.ID + PubsubTopic string + SpecificPeers peer.IDSlice + Ctx context.Context +} + +// SelectPeer selects a peer based on selectionType specified. +// Context is required only in case of selectionType set to LowestRTT +func (pm *PeerManager) SelectPeer(criteria PeerSelectionCriteria) (peer.ID, error) { + + switch criteria.SelectionType { + case Automatic: + return pm.SelectRandomPeer(criteria) + case LowestRTT: + return pm.SelectPeerWithLowestRTT(criteria) + default: + return "", errors.New("unknown peer selection type specified") + } +} + +type pingResult struct { + p peer.ID + rtt time.Duration +} + +// SelectPeerWithLowestRTT will select a peer that supports a specific protocol with the lowest reply time +// If a list of specific peers is passed, the peer will be chosen from that list assuming +// it supports the chosen protocol, otherwise it will chose a peer from the node peerstore +// TO OPTIMIZE: As of now the peer with lowest RTT is identified when select is called, this should be optimized +// to maintain the RTT as part of peer-scoring and just select based on that. +func (pm *PeerManager) SelectPeerWithLowestRTT(criteria PeerSelectionCriteria) (peer.ID, error) { + var peers peer.IDSlice + var err error + if criteria.Ctx == nil { + pm.logger.Warn("context is not passed for peerSelectionwithRTT, using background context") + criteria.Ctx = context.Background() + } + + if criteria.PubsubTopic != "" { + peers = pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopic(criteria.PubsubTopic, criteria.SpecificPeers...) + } + + peers, err = pm.FilterPeersByProto(peers, criteria.Proto) + if err != nil { + return "", err + } + wg := sync.WaitGroup{} + waitCh := make(chan struct{}) + pingCh := make(chan pingResult, 1000) + + wg.Add(len(peers)) + + go func() { + for _, p := range peers { + go func(p peer.ID) { + defer wg.Done() + ctx, cancel := context.WithTimeout(criteria.Ctx, 3*time.Second) + defer cancel() + result := <-ping.Ping(ctx, pm.host, p) + if result.Error == nil { + pingCh <- pingResult{ + p: p, + rtt: result.RTT, + } + } else { + pm.logger.Debug("could not ping", logging.HostID("peer", p), zap.Error(result.Error)) + } + }(p) + } + wg.Wait() + close(waitCh) + close(pingCh) + }() + + select { + case <-waitCh: + var min *pingResult + for p := range pingCh { + if min == nil { + min = &p + } else { + if p.rtt < min.rtt { + min = &p + } + } + } + if min == nil { + return "", ErrNoPeersAvailable + } + + return min.p, nil + case <-criteria.Ctx.Done(): + return "", ErrNoPeersAvailable + } +} + +// selectRandomPeer selects randomly a peer from the list of peers passed. +func selectRandomPeer(peers peer.IDSlice, log *zap.Logger) (peer.ID, error) { + if len(peers) >= 1 { + peerID := peers[rand.Intn(len(peers))] + // TODO: proper heuristic here that compares peer scores and selects "best" one. For now a random peer for the given protocol is returned + return peerID, nil // nolint: gosec + } + + return "", ErrNoPeersAvailable +} + +// FilterPeersByProto filters list of peers that support specified protocols. +// If specificPeers is nil, all peers in the host's peerStore are considered for filtering. +func (pm *PeerManager) FilterPeersByProto(specificPeers peer.IDSlice, proto ...protocol.ID) (peer.IDSlice, error) { + peerSet := specificPeers + if len(peerSet) == 0 { + peerSet = pm.host.Peerstore().Peers() + } + + var peers peer.IDSlice + for _, peer := range peerSet { + protocols, err := pm.host.Peerstore().SupportsProtocols(peer, proto...) + if err != nil { + return nil, err + } + + if len(protocols) > 0 { + peers = append(peers, peer) + } + } + return peers, nil +} diff --git a/waku/v2/protocol/enr/enr.go b/waku/v2/protocol/enr/enr.go index 8e330a96..7f8d9e56 100644 --- a/waku/v2/protocol/enr/enr.go +++ b/waku/v2/protocol/enr/enr.go @@ -146,6 +146,8 @@ func EnodeToPeerInfo(node *enode.Node) (*peer.AddrInfo, error) { if err != nil { return nil, err } - + if len(res) == 0 { + return nil, errors.New("could not retrieve peer addresses from enr") + } return &res[0], nil } diff --git a/waku/v2/protocol/filter/client.go b/waku/v2/protocol/filter/client.go index 1dfb6e72..eaefeafa 100644 --- a/waku/v2/protocol/filter/client.go +++ b/waku/v2/protocol/filter/client.go @@ -23,6 +23,7 @@ import ( wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/protocol/subscription" + "github.com/waku-org/go-waku/waku/v2/service" "github.com/waku-org/go-waku/waku/v2/timesource" "go.uber.org/zap" "golang.org/x/exp/slices" @@ -37,7 +38,7 @@ var ( ) type WakuFilterLightNode struct { - *protocol.CommonService + *service.CommonService h host.Host broadcaster relay.Broadcaster //TODO: Move the broadcast functionality outside of relay client to a higher SDK layer.s timesource timesource.Timesource @@ -79,7 +80,7 @@ func NewWakuFilterLightNode(broadcaster relay.Broadcaster, pm *peermanager.PeerM wf.broadcaster = broadcaster wf.timesource = timesource wf.pm = pm - wf.CommonService = protocol.NewCommonService() + wf.CommonService = service.NewCommonService() wf.metrics = newMetrics(reg) return wf diff --git a/waku/v2/protocol/filter/filter_test.go b/waku/v2/protocol/filter/filter_test.go index 334e9f8b..2ff583af 100644 --- a/waku/v2/protocol/filter/filter_test.go +++ b/waku/v2/protocol/filter/filter_test.go @@ -19,6 +19,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/protocol/subscription" + "github.com/waku-org/go-waku/waku/v2/service" "github.com/waku-org/go-waku/waku/v2/timesource" "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" @@ -408,7 +409,7 @@ func (s *FilterTestSuite) TestRunningGuard() { _, err := s.lightNode.Subscribe(s.ctx, contentFilter, WithPeer(s.fullNodeHost.ID())) - s.Require().ErrorIs(err, protocol.ErrNotStarted) + s.Require().ErrorIs(err, service.ErrNotStarted) err = s.lightNode.Start(s.ctx) s.Require().NoError(err) @@ -454,7 +455,7 @@ func (s *FilterTestSuite) TestStartStop() { startNode := func() { for i := 0; i < 100; i++ { err := s.lightNode.Start(context.Background()) - if errors.Is(err, protocol.ErrAlreadyStarted) { + if errors.Is(err, service.ErrAlreadyStarted) { continue } s.Require().NoError(err) diff --git a/waku/v2/protocol/filter/options.go b/waku/v2/protocol/filter/options.go index f7f46e92..9495ee35 100644 --- a/waku/v2/protocol/filter/options.go +++ b/waku/v2/protocol/filter/options.go @@ -51,6 +51,7 @@ type ( FilterParameters struct { Timeout time.Duration MaxSubscribers int + pm *peermanager.PeerManager } Option func(*FilterParameters) @@ -156,6 +157,12 @@ func WithMaxSubscribers(maxSubscribers int) Option { } } +func WithPeerManager(pm *peermanager.PeerManager) Option { + return func(params *FilterParameters) { + params.pm = pm + } +} + func DefaultOptions() []Option { return []Option{ WithTimeout(24 * time.Hour), diff --git a/waku/v2/protocol/filter/server.go b/waku/v2/protocol/filter/server.go index bed0503d..2352a54c 100644 --- a/waku/v2/protocol/filter/server.go +++ b/waku/v2/protocol/filter/server.go @@ -17,6 +17,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/filter/pb" "github.com/waku-org/go-waku/waku/v2/protocol/relay" + "github.com/waku-org/go-waku/waku/v2/service" "github.com/waku-org/go-waku/waku/v2/timesource" "go.uber.org/zap" ) @@ -24,7 +25,7 @@ import ( // FilterSubscribeID_v20beta1 is the current Waku Filter protocol identifier for servers to // allow filter clients to subscribe, modify, refresh and unsubscribe a desired set of filter criteria const FilterSubscribeID_v20beta1 = libp2pProtocol.ID("/vac/waku/filter-subscribe/2.0.0-beta1") - +const FilterSubscribeENRField = uint8(1 << 2) const peerHasNoSubscription = "peer has no subscriptions" type ( @@ -33,7 +34,7 @@ type ( msgSub *relay.Subscription metrics Metrics log *zap.Logger - *protocol.CommonService + *service.CommonService subscriptions *SubscribersMap maxSubscriptions int @@ -52,11 +53,13 @@ func NewWakuFilterFullNode(timesource timesource.Timesource, reg prometheus.Regi opt(params) } - wf.CommonService = protocol.NewCommonService() + wf.CommonService = service.NewCommonService() wf.metrics = newMetrics(reg) wf.subscriptions = NewSubscribersMap(params.Timeout) wf.maxSubscriptions = params.MaxSubscribers - + if params.pm != nil { + params.pm.RegisterWakuProtocol(FilterSubscribeID_v20beta1, FilterSubscribeENRField) + } return wf } diff --git a/waku/v2/protocol/legacy_filter/waku_filter.go b/waku/v2/protocol/legacy_filter/waku_filter.go index 1e5d92d5..cb8aa6fe 100644 --- a/waku/v2/protocol/legacy_filter/waku_filter.go +++ b/waku/v2/protocol/legacy_filter/waku_filter.go @@ -18,6 +18,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter/pb" wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/protocol/relay" + "github.com/waku-org/go-waku/waku/v2/service" "github.com/waku-org/go-waku/waku/v2/timesource" "go.uber.org/zap" "golang.org/x/sync/errgroup" @@ -47,7 +48,7 @@ type ( } WakuFilter struct { - *protocol.CommonService + *service.CommonService h host.Host pm *peermanager.PeerManager isFullNode bool @@ -76,7 +77,7 @@ func NewWakuFilter(broadcaster relay.Broadcaster, isFullNode bool, timesource ti } wf.isFullNode = isFullNode - wf.CommonService = protocol.NewCommonService() + wf.CommonService = service.NewCommonService() wf.filters = NewFilterMap(broadcaster, timesource) wf.subscribers = NewSubscribers(params.Timeout) wf.metrics = newMetrics(reg) diff --git a/waku/v2/protocol/lightpush/waku_lightpush.go b/waku/v2/protocol/lightpush/waku_lightpush.go index 3439c17b..efd1ced4 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush.go +++ b/waku/v2/protocol/lightpush/waku_lightpush.go @@ -23,6 +23,7 @@ import ( // LightPushID_v20beta1 is the current Waku LightPush protocol identifier const LightPushID_v20beta1 = libp2pProtocol.ID("/vac/waku/lightpush/2.0.0-beta1") +const LightPushENRField = uint8(1 << 3) var ( ErrNoPeersAvailable = errors.New("no suitable remote peers") @@ -49,6 +50,7 @@ func NewWakuLightPush(relay *relay.WakuRelay, pm *peermanager.PeerManager, reg p wakuLP.log = log.Named("lightpush") wakuLP.pm = pm wakuLP.metrics = newMetrics(reg) + return wakuLP } @@ -69,6 +71,9 @@ func (wakuLP *WakuLightPush) Start(ctx context.Context) error { wakuLP.h.SetStreamHandlerMatch(LightPushID_v20beta1, protocol.PrefixTextMatch(string(LightPushID_v20beta1)), wakuLP.onRequest(ctx)) wakuLP.log.Info("Light Push protocol started") + if wakuLP.pm != nil { + wakuLP.pm.RegisterWakuProtocol(LightPushID_v20beta1, LightPushENRField) + } return nil } diff --git a/waku/v2/protocol/peer_exchange/client.go b/waku/v2/protocol/peer_exchange/client.go index f1ba7c39..111dc9fd 100644 --- a/waku/v2/protocol/peer_exchange/client.go +++ b/waku/v2/protocol/peer_exchange/client.go @@ -14,6 +14,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/peerstore" wenr "github.com/waku-org/go-waku/waku/v2/protocol/enr" "github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/pb" + "github.com/waku-org/go-waku/waku/v2/service" "go.uber.org/zap" ) @@ -124,11 +125,11 @@ func (wakuPX *WakuPeerExchange) handleResponse(ctx context.Context, response *pb go func() { defer wakuPX.WaitGroup().Done() - peerCh := make(chan peermanager.PeerData) + peerCh := make(chan service.PeerData) defer close(peerCh) wakuPX.peerConnector.Subscribe(ctx, peerCh) for _, p := range discoveredPeers { - peer := peermanager.PeerData{ + peer := service.PeerData{ Origin: peerstore.PeerExchange, AddrInfo: p.addrInfo, ENR: p.enr, diff --git a/waku/v2/protocol/peer_exchange/protocol.go b/waku/v2/protocol/peer_exchange/protocol.go index 8230abaa..5b1834f1 100644 --- a/waku/v2/protocol/peer_exchange/protocol.go +++ b/waku/v2/protocol/peer_exchange/protocol.go @@ -18,6 +18,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/enr" "github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/pb" + "github.com/waku-org/go-waku/waku/v2/service" "go.uber.org/zap" ) @@ -32,7 +33,7 @@ var ( // PeerConnector will subscribe to a channel containing the information for all peers found by this discovery protocol type PeerConnector interface { - Subscribe(context.Context, <-chan peermanager.PeerData) + Subscribe(context.Context, <-chan service.PeerData) } type WakuPeerExchange struct { @@ -42,7 +43,7 @@ type WakuPeerExchange struct { metrics Metrics log *zap.Logger - *protocol.CommonService + *service.CommonService peerConnector PeerConnector enrCache *enrCache @@ -63,7 +64,7 @@ func NewWakuPeerExchange(disc *discv5.DiscoveryV5, peerConnector PeerConnector, wakuPX.enrCache = newEnrCache wakuPX.peerConnector = peerConnector wakuPX.pm = pm - wakuPX.CommonService = protocol.NewCommonService() + wakuPX.CommonService = service.NewCommonService() return wakuPX, nil } diff --git a/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go b/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go index ca64b55f..5d2fbaa1 100644 --- a/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go +++ b/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go @@ -2,131 +2,46 @@ package peer_exchange import ( "context" - "crypto/ecdsa" - "fmt" - "math" - "net" - "strconv" "testing" "time" - gcrypto "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/p2p/enode" - "github.com/ethereum/go-ethereum/p2p/enr" - "github.com/libp2p/go-libp2p" - "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peerstore" - "github.com/multiformats/go-multiaddr" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" "github.com/waku-org/go-waku/tests" "github.com/waku-org/go-waku/waku/v2/discv5" - "github.com/waku-org/go-waku/waku/v2/peermanager" wenr "github.com/waku-org/go-waku/waku/v2/protocol/enr" "github.com/waku-org/go-waku/waku/v2/utils" - "go.uber.org/zap" - - libp2pcrypto "github.com/libp2p/go-libp2p/core/crypto" ) -func createHost(t *testing.T) (host.Host, int, *ecdsa.PrivateKey) { - privKey, err := gcrypto.GenerateKey() - require.NoError(t, err) - - sPrivKey := libp2pcrypto.PrivKey(utils.EcdsaPrivKeyToSecp256k1PrivKey(privKey)) - - port, err := tests.FindFreePort(t, "127.0.0.1", 3) - require.NoError(t, err) - - sourceMultiAddr, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", port)) - require.NoError(t, err) - - host, err := libp2p.New( - libp2p.ListenAddrs(sourceMultiAddr), - libp2p.Identity(sPrivKey), - ) - require.NoError(t, err) - - return host, port, privKey -} - -func extractIP(addr multiaddr.Multiaddr) (*net.TCPAddr, error) { - ipStr, err := addr.ValueForProtocol(multiaddr.P_IP4) - if err != nil { - return nil, err - } - - portStr, err := addr.ValueForProtocol(multiaddr.P_TCP) - if err != nil { - return nil, err - } - port, err := strconv.Atoi(portStr) - if err != nil { - return nil, err - } - return &net.TCPAddr{ - IP: net.ParseIP(ipStr), - Port: port, - }, nil -} - -func newLocalnode(priv *ecdsa.PrivateKey, ipAddr *net.TCPAddr, udpPort int, wakuFlags wenr.WakuEnrBitfield, advertiseAddr *net.IP, log *zap.Logger) (*enode.LocalNode, error) { - db, err := enode.OpenDB("") - if err != nil { - return nil, err - } - localnode := enode.NewLocalNode(db, priv) - localnode.SetFallbackUDP(udpPort) - localnode.Set(enr.WithEntry(wenr.WakuENRField, wakuFlags)) - localnode.SetFallbackIP(net.IP{127, 0, 0, 1}) - localnode.SetStaticIP(ipAddr.IP) - - if udpPort > 0 && udpPort <= math.MaxUint16 { - localnode.Set(enr.UDP(uint16(udpPort))) // lgtm [go/incorrect-integer-conversion] - } else { - log.Error("setting udpPort", zap.Int("port", udpPort)) - } - - if ipAddr.Port > 0 && ipAddr.Port <= math.MaxUint16 { - localnode.Set(enr.TCP(uint16(ipAddr.Port))) // lgtm [go/incorrect-integer-conversion] - } else { - log.Error("setting tcpPort", zap.Int("port", ipAddr.Port)) - } - - if advertiseAddr != nil { - localnode.SetStaticIP(*advertiseAddr) - } - - return localnode, nil -} - func TestRetrieveProvidePeerExchangePeers(t *testing.T) { // H1 - host1, _, prvKey1 := createHost(t) + host1, _, prvKey1 := tests.CreateHost(t) udpPort1, err := tests.FindFreePort(t, "127.0.0.1", 3) require.NoError(t, err) - ip1, _ := extractIP(host1.Addrs()[0]) - l1, err := newLocalnode(prvKey1, ip1, udpPort1, wenr.NewWakuEnrBitfield(false, false, false, true), nil, utils.Logger()) + ip1, _ := tests.ExtractIP(host1.Addrs()[0]) + l1, err := tests.NewLocalnode(prvKey1, ip1, udpPort1, wenr.NewWakuEnrBitfield(false, false, false, true), nil, utils.Logger()) require.NoError(t, err) - discv5PeerConn1 := peermanager.NewTestPeerDiscoverer() + discv5PeerConn1 := discv5.NewTestPeerDiscoverer() d1, err := discv5.NewDiscoveryV5(prvKey1, l1, discv5PeerConn1, prometheus.DefaultRegisterer, utils.Logger(), discv5.WithUDPPort(uint(udpPort1))) require.NoError(t, err) d1.SetHost(host1) // H2 - host2, _, prvKey2 := createHost(t) - ip2, _ := extractIP(host2.Addrs()[0]) + host2, _, prvKey2 := tests.CreateHost(t) + ip2, _ := tests.ExtractIP(host2.Addrs()[0]) udpPort2, err := tests.FindFreePort(t, "127.0.0.1", 3) require.NoError(t, err) - l2, err := newLocalnode(prvKey2, ip2, udpPort2, wenr.NewWakuEnrBitfield(false, false, false, true), nil, utils.Logger()) + l2, err := tests.NewLocalnode(prvKey2, ip2, udpPort2, wenr.NewWakuEnrBitfield(false, false, false, true), nil, utils.Logger()) require.NoError(t, err) - discv5PeerConn2 := peermanager.NewTestPeerDiscoverer() + discv5PeerConn2 := discv5.NewTestPeerDiscoverer() d2, err := discv5.NewDiscoveryV5(prvKey2, l2, discv5PeerConn2, prometheus.DefaultRegisterer, utils.Logger(), discv5.WithUDPPort(uint(udpPort2)), discv5.WithBootnodes([]*enode.Node{d1.Node()})) require.NoError(t, err) d2.SetHost(host2) // H3 - host3, _, _ := createHost(t) + host3, _, _ := tests.CreateHost(t) defer d1.Stop() defer d2.Stop() @@ -143,12 +58,12 @@ func TestRetrieveProvidePeerExchangePeers(t *testing.T) { time.Sleep(3 * time.Second) // Wait some time for peers to be discovered // mount peer exchange - pxPeerConn1 := peermanager.NewTestPeerDiscoverer() + pxPeerConn1 := discv5.NewTestPeerDiscoverer() px1, err := NewWakuPeerExchange(d1, pxPeerConn1, nil, prometheus.DefaultRegisterer, utils.Logger()) require.NoError(t, err) px1.SetHost(host1) - pxPeerConn3 := peermanager.NewTestPeerDiscoverer() + pxPeerConn3 := discv5.NewTestPeerDiscoverer() px3, err := NewWakuPeerExchange(nil, pxPeerConn3, nil, prometheus.DefaultRegisterer, utils.Logger()) require.NoError(t, err) px3.SetHost(host3) diff --git a/waku/v2/protocol/relay/waku_relay.go b/waku/v2/protocol/relay/waku_relay.go index c195ae07..d9047d8b 100644 --- a/waku/v2/protocol/relay/waku_relay.go +++ b/waku/v2/protocol/relay/waku_relay.go @@ -18,11 +18,13 @@ import ( "github.com/waku-org/go-waku/logging" waku_proto "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/pb" + "github.com/waku-org/go-waku/waku/v2/service" "github.com/waku-org/go-waku/waku/v2/timesource" ) // WakuRelayID_v200 is the current protocol ID used for WakuRelay const WakuRelayID_v200 = protocol.ID("/vac/waku/relay/2.0.0") +const WakuRelayENRField = uint8(1 << 0) // DefaultWakuTopic is the default pubsub topic used across all Waku protocols var DefaultWakuTopic string = waku_proto.DefaultPubsubTopic{}.String() @@ -62,7 +64,7 @@ type WakuRelay struct { EvtPeerTopic event.Emitter } contentSubs map[string]map[int]*Subscription - *waku_proto.CommonService + *service.CommonService } // NewWakuRelay returns a new instance of a WakuRelay struct @@ -76,7 +78,7 @@ func NewWakuRelay(bcaster Broadcaster, minPeersToPublish int, timesource timesou w.topicValidators = make(map[string][]validatorFn) w.bcaster = bcaster w.minPeersToPublish = minPeersToPublish - w.CommonService = waku_proto.NewCommonService() + w.CommonService = service.NewCommonService() w.log = log.Named("relay") w.events = eventbus.NewBus() w.metrics = newMetrics(reg, w.log) diff --git a/waku/v2/protocol/store/waku_store_common.go b/waku/v2/protocol/store/waku_store_common.go index 33d7a6a4..1781d3e3 100644 --- a/waku/v2/protocol/store/waku_store_common.go +++ b/waku/v2/protocol/store/waku_store_common.go @@ -16,6 +16,7 @@ import ( // StoreID_v20beta4 is the current Waku Store protocol identifier const StoreID_v20beta4 = libp2pProtocol.ID("/vac/waku/store/2.0.0-beta4") +const StoreENRField = uint8(1 << 1) // MaxPageSize is the maximum number of waku messages to return per page const MaxPageSize = 20 @@ -64,5 +65,8 @@ func NewWakuStore(p MessageProvider, pm *peermanager.PeerManager, timesource tim wakuStore.pm = pm wakuStore.metrics = newMetrics(reg) + if pm != nil { + pm.RegisterWakuProtocol(StoreID_v20beta4, StoreENRField) + } return wakuStore } diff --git a/waku/v2/rendezvous/rendezvous.go b/waku/v2/rendezvous/rendezvous.go index db48faec..a53a74a4 100644 --- a/waku/v2/rendezvous/rendezvous.go +++ b/waku/v2/rendezvous/rendezvous.go @@ -8,9 +8,9 @@ import ( "github.com/libp2p/go-libp2p/core/host" rvs "github.com/waku-org/go-libp2p-rendezvous" - "github.com/waku-org/go-waku/waku/v2/peermanager" "github.com/waku-org/go-waku/waku/v2/peerstore" "github.com/waku-org/go-waku/waku/v2/protocol" + "github.com/waku-org/go-waku/waku/v2/service" "go.uber.org/zap" ) @@ -31,12 +31,12 @@ type Rendezvous struct { peerConnector PeerConnector log *zap.Logger - *peermanager.CommonDiscoveryService + *service.CommonDiscoveryService } // PeerConnector will subscribe to a channel containing the information for all peers found by this discovery protocol type PeerConnector interface { - Subscribe(context.Context, <-chan peermanager.PeerData) + Subscribe(context.Context, <-chan service.PeerData) } // NewRendezvous creates an instance of Rendezvous struct @@ -46,7 +46,7 @@ func NewRendezvous(db *DB, peerConnector PeerConnector, log *zap.Logger) *Rendez db: db, peerConnector: peerConnector, log: logger, - CommonDiscoveryService: peermanager.NewCommonDiscoveryService(), + CommonDiscoveryService: service.NewCommonDiscoveryService(), } } @@ -104,7 +104,7 @@ func (r *Rendezvous) DiscoverWithNamespace(ctx context.Context, namespace string rp.SetSuccess(cookie) for _, p := range addrInfo { - peer := peermanager.PeerData{ + peer := service.PeerData{ Origin: peerstore.Rendezvous, AddrInfo: p, PubSubTopics: []string{namespace}, diff --git a/waku/v2/rendezvous/rendezvous_test.go b/waku/v2/rendezvous/rendezvous_test.go index a2b1d881..3aeb6685 100644 --- a/waku/v2/rendezvous/rendezvous_test.go +++ b/waku/v2/rendezvous/rendezvous_test.go @@ -14,16 +14,16 @@ import ( "github.com/stretchr/testify/require" "github.com/waku-org/go-waku/tests" "github.com/waku-org/go-waku/waku/persistence/sqlite" - "github.com/waku-org/go-waku/waku/v2/peermanager" + "github.com/waku-org/go-waku/waku/v2/service" "github.com/waku-org/go-waku/waku/v2/utils" ) type PeerConn struct { sync.RWMutex - ch <-chan peermanager.PeerData + ch <-chan service.PeerData } -func (p *PeerConn) Subscribe(ctx context.Context, ch <-chan peermanager.PeerData) { +func (p *PeerConn) Subscribe(ctx context.Context, ch <-chan service.PeerData) { p.Lock() p.ch = ch p.Unlock() diff --git a/waku/v2/peermanager/common_discovery_service.go b/waku/v2/service/common_discovery_service.go similarity index 93% rename from waku/v2/peermanager/common_discovery_service.go rename to waku/v2/service/common_discovery_service.go index 0fae5fb5..72bf96f1 100644 --- a/waku/v2/peermanager/common_discovery_service.go +++ b/waku/v2/service/common_discovery_service.go @@ -1,4 +1,4 @@ -package peermanager +package service import ( "context" @@ -7,7 +7,6 @@ import ( "github.com/ethereum/go-ethereum/p2p/enode" "github.com/libp2p/go-libp2p/core/peer" wps "github.com/waku-org/go-waku/waku/v2/peerstore" - "github.com/waku-org/go-waku/waku/v2/protocol" ) // PeerData contains information about a peer useful in establishing connections with it. @@ -19,13 +18,13 @@ type PeerData struct { } type CommonDiscoveryService struct { - commonService *protocol.CommonService + commonService *CommonService channel chan PeerData } func NewCommonDiscoveryService() *CommonDiscoveryService { return &CommonDiscoveryService{ - commonService: protocol.NewCommonService(), + commonService: NewCommonService(), } } diff --git a/waku/v2/protocol/common_service.go b/waku/v2/service/common_service.go similarity index 99% rename from waku/v2/protocol/common_service.go rename to waku/v2/service/common_service.go index 65746961..9bf3ea12 100644 --- a/waku/v2/protocol/common_service.go +++ b/waku/v2/service/common_service.go @@ -1,4 +1,4 @@ -package protocol +package service import ( "context" diff --git a/waku/v2/protocol/common_service_test.go b/waku/v2/service/common_service_test.go similarity index 96% rename from waku/v2/protocol/common_service_test.go rename to waku/v2/service/common_service_test.go index cd707e11..db81043b 100644 --- a/waku/v2/protocol/common_service_test.go +++ b/waku/v2/service/common_service_test.go @@ -1,4 +1,4 @@ -package protocol +package service import ( "context"