diff --git a/cmd/waku/node.go b/cmd/waku/node.go index c1e8dff7..73c21675 100644 --- a/cmd/waku/node.go +++ b/cmd/waku/node.go @@ -300,26 +300,30 @@ func Execute(options NodeOptions) { utils.Logger().Info("Version details ", zap.String("version", node.Version), zap.String("commit", node.GitCommit)) failOnErr(err, "Wakunode") - - if options.Filter.UseV1 { - addStaticPeers(wakuNode, options.Filter.NodesV1, legacy_filter.FilterID_v20beta1) - } - //Process pubSub and contentTopics specified and arrive at all corresponding pubSubTopics pubSubTopicMap := processTopics(options) + pubSubTopicMapKeys := make([]string, 0, len(pubSubTopicMap)) + for k := range pubSubTopicMap { + pubSubTopicMapKeys = append(pubSubTopicMapKeys, k) + } + + if options.Filter.UseV1 { + addStaticPeers(wakuNode, options.Filter.NodesV1, pubSubTopicMapKeys, legacy_filter.FilterID_v20beta1) + } if err = wakuNode.Start(ctx); err != nil { logger.Fatal("starting waku node", zap.Error(err)) } for _, d := range discoveredNodes { - wakuNode.AddDiscoveredPeer(d.PeerID, d.PeerInfo.Addrs, wakupeerstore.DNSDiscovery) + wakuNode.AddDiscoveredPeer(d.PeerID, d.PeerInfo.Addrs, wakupeerstore.DNSDiscovery, nil) } - addStaticPeers(wakuNode, options.Store.Nodes, store.StoreID_v20beta4) - addStaticPeers(wakuNode, options.LightPush.Nodes, lightpush.LightPushID_v20beta1) - addStaticPeers(wakuNode, options.Rendezvous.Nodes, rendezvous.RendezvousID) - addStaticPeers(wakuNode, options.Filter.Nodes, filter.FilterSubscribeID_v20beta1) + //For now assuming that static peers added support/listen on all topics specified via commandLine. + addStaticPeers(wakuNode, options.Store.Nodes, pubSubTopicMapKeys, store.StoreID_v20beta4) + addStaticPeers(wakuNode, options.LightPush.Nodes, pubSubTopicMapKeys, lightpush.LightPushID_v20beta1) + addStaticPeers(wakuNode, options.Rendezvous.Nodes, pubSubTopicMapKeys, rendezvous.RendezvousID) + addStaticPeers(wakuNode, options.Filter.Nodes, pubSubTopicMapKeys, filter.FilterSubscribeID_v20beta1) var wg sync.WaitGroup @@ -409,7 +413,8 @@ func Execute(options NodeOptions) { if options.PeerExchange.Enable && options.PeerExchange.Node != nil { logger.Info("retrieving peer info via peer exchange protocol") - peerID, err := wakuNode.AddPeer(*options.PeerExchange.Node, wakupeerstore.Static, peer_exchange.PeerExchangeID_v20alpha1) + peerID, err := wakuNode.AddPeer(*options.PeerExchange.Node, wakupeerstore.Static, + pubSubTopicMapKeys, peer_exchange.PeerExchangeID_v20alpha1) if err != nil { logger.Error("adding peer exchange peer", logging.MultiAddrs("node", *options.PeerExchange.Node), zap.Error(err)) } else { @@ -510,9 +515,9 @@ func processTopics(options NodeOptions) map[string]struct{} { return pubSubTopicMap } -func addStaticPeers(wakuNode *node.WakuNode, addresses []multiaddr.Multiaddr, protocols ...protocol.ID) { +func addStaticPeers(wakuNode *node.WakuNode, addresses []multiaddr.Multiaddr, pubSubTopics []string, protocols ...protocol.ID) { for _, addr := range addresses { - _, err := wakuNode.AddPeer(addr, wakupeerstore.Static, protocols...) + _, err := wakuNode.AddPeer(addr, wakupeerstore.Static, pubSubTopics, protocols...) failOnErr(err, "error adding peer") } } diff --git a/cmd/waku/server/rest/store.go b/cmd/waku/server/rest/store.go index e457af58..280669e2 100644 --- a/cmd/waku/server/rest/store.go +++ b/cmd/waku/server/rest/store.go @@ -197,7 +197,7 @@ func (d *StoreService) getV1Messages(w http.ResponseWriter, r *http.Request) { ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second) defer cancel() - _, err = d.node.AddPeer(peerAddr, peerstore.Static) + _, err = d.node.AddPeer(peerAddr, peerstore.Static, d.node.Relay().Topics()) if err != nil { writeStoreError(w, http.StatusInternalServerError, err) return diff --git a/cmd/waku/server/rpc/filter_test.go b/cmd/waku/server/rpc/filter_test.go index 202cc113..4d2b7c11 100644 --- a/cmd/waku/server/rpc/filter_test.go +++ b/cmd/waku/server/rpc/filter_test.go @@ -84,7 +84,7 @@ func TestFilterSubscription(t *testing.T) { break } - _, err = d.node.AddPeer(addr, peerstore.Static, legacy_filter.FilterID_v20beta1) + _, err = d.node.AddPeer(addr, peerstore.Static, []string{testTopic}, legacy_filter.FilterID_v20beta1) require.NoError(t, err) args := &FilterContentArgs{Topic: testTopic, ContentFilters: []*pb.FilterRequest_ContentFilter{{ContentTopic: "ct"}}} diff --git a/examples/chat2/exec.go b/examples/chat2/exec.go index 45caa49b..266b0315 100644 --- a/examples/chat2/exec.go +++ b/examples/chat2/exec.go @@ -77,19 +77,19 @@ func execute(options Options) { return } - err = addPeer(wakuNode, options.Store.Node, store.StoreID_v20beta4) + err = addPeer(wakuNode, options.Store.Node, options.Relay.Topics.Value(), store.StoreID_v20beta4) if err != nil { fmt.Println(err.Error()) return } - err = addPeer(wakuNode, options.LightPush.Node, lightpush.LightPushID_v20beta1) + err = addPeer(wakuNode, options.LightPush.Node, options.Relay.Topics.Value(), lightpush.LightPushID_v20beta1) if err != nil { fmt.Println(err.Error()) return } - err = addPeer(wakuNode, options.Filter.Node, filter.FilterSubscribeID_v20beta1) + err = addPeer(wakuNode, options.Filter.Node, options.Relay.Topics.Value(), filter.FilterSubscribeID_v20beta1) if err != nil { fmt.Println(err.Error()) @@ -113,10 +113,10 @@ func execute(options Options) { chat.Stop() } -func addPeer(wakuNode *node.WakuNode, addr *multiaddr.Multiaddr, protocols ...protocol.ID) error { +func addPeer(wakuNode *node.WakuNode, addr *multiaddr.Multiaddr, topics []string, protocols ...protocol.ID) error { if addr == nil { return nil } - _, err := wakuNode.AddPeer(*addr, peerstore.Static, protocols...) + _, err := wakuNode.AddPeer(*addr, peerstore.Static, topics, protocols...) return err } diff --git a/examples/filter2/main.go b/examples/filter2/main.go index 8e1ae957..fbcb5100 100644 --- a/examples/filter2/main.go +++ b/examples/filter2/main.go @@ -90,7 +90,8 @@ func main() { // Setup filter // - _, err = lightNode.AddPeer(fullNode.ListenAddresses()[0], wps.Static, filter.FilterSubscribeID_v20beta1) + _, err = lightNode.AddPeer(fullNode.ListenAddresses()[0], wps.Static, + []string{pubSubTopic.String()}, filter.FilterSubscribeID_v20beta1) if err != nil { log.Info("Error adding filter peer on light node ", err) } diff --git a/library/node.go b/library/node.go index 72a2c632..a3da4f03 100644 --- a/library/node.go +++ b/library/node.go @@ -260,7 +260,7 @@ func AddPeer(address string, protocolID string) (string, error) { return "", err } - peerID, err := wakuState.node.AddPeer(ma, peerstore.Static, libp2pProtocol.ID(protocolID)) + peerID, err := wakuState.node.AddPeer(ma, peerstore.Static, wakuState.relayTopics, libp2pProtocol.ID(protocolID)) if err != nil { return "", err } diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index afe9d1d9..540679b4 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -31,6 +31,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/discv5" "github.com/waku-org/go-waku/waku/v2/peermanager" wps "github.com/waku-org/go-waku/waku/v2/peerstore" + wakuprotocol "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/enr" "github.com/waku-org/go-waku/waku/v2/protocol/filter" "github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter" @@ -688,18 +689,20 @@ func (w *WakuNode) startStore(ctx context.Context, sub relay.Subscription) error } // AddPeer is used to add a peer and the protocols it support to the node peerstore -func (w *WakuNode) AddPeer(address ma.Multiaddr, origin wps.Origin, protocols ...protocol.ID) (peer.ID, error) { - return w.peermanager.AddPeer(address, origin, protocols...) +// TODO: Need to update this for autosharding, to only take contentTopics and optional pubSubTopics or provide an alternate API only for contentTopics. +func (w *WakuNode) AddPeer(address ma.Multiaddr, origin wps.Origin, pubSubTopics []string, protocols ...protocol.ID) (peer.ID, error) { + return w.peermanager.AddPeer(address, origin, pubSubTopics, protocols...) } // AddDiscoveredPeer to add a discovered peer to the node peerStore -func (w *WakuNode) AddDiscoveredPeer(ID peer.ID, addrs []ma.Multiaddr, origin wps.Origin) { +func (w *WakuNode) AddDiscoveredPeer(ID peer.ID, addrs []ma.Multiaddr, origin wps.Origin, pubsubTopics []string) { p := peermanager.PeerData{ Origin: origin, AddrInfo: peer.AddrInfo{ ID: ID, Addrs: addrs, }, + PubSubTopics: pubsubTopics, } w.peermanager.AddDiscoveredPeer(p) } @@ -838,6 +841,11 @@ func (w *WakuNode) Peers() ([]*Peer, error) { return peers, nil } +func (w *WakuNode) PeersByShard(cluster uint16, shard uint16) peer.IDSlice { + pTopic := wakuprotocol.NewStaticShardingPubsubTopic(cluster, shard).String() + return w.peerstore.(wps.WakuPeerstore).PeersByPubSubTopic(pTopic) +} + func (w *WakuNode) findRelayNodes(ctx context.Context) { defer w.wg.Done() diff --git a/waku/v2/peermanager/peer_connector.go b/waku/v2/peermanager/peer_connector.go index 7468418e..e8c0c31e 100644 --- a/waku/v2/peermanager/peer_connector.go +++ b/waku/v2/peermanager/peer_connector.go @@ -27,9 +27,10 @@ import ( // PeerData contains information about a peer useful in establishing connections with it. type PeerData struct { - Origin wps.Origin - AddrInfo peer.AddrInfo - ENR *enode.Node + Origin wps.Origin + AddrInfo peer.AddrInfo + PubSubTopics []string + ENR *enode.Node } // PeerConnectionStrategy is a utility to connect to peers, diff --git a/waku/v2/peermanager/peer_manager.go b/waku/v2/peermanager/peer_manager.go index 69f665df..87ddeae9 100644 --- a/waku/v2/peermanager/peer_manager.go +++ b/waku/v2/peermanager/peer_manager.go @@ -12,6 +12,7 @@ import ( ma "github.com/multiformats/go-multiaddr" "github.com/waku-org/go-waku/logging" wps "github.com/waku-org/go-waku/waku/v2/peerstore" + wenr "github.com/waku-org/go-waku/waku/v2/protocol/enr" "github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/utils" @@ -213,8 +214,27 @@ func (pm *PeerManager) pruneInRelayConns(inRelayPeers peer.IDSlice) { // Note that these peers will not be set in service-slots. // TODO: It maybe good to set in service-slots based on services supported in the ENR func (pm *PeerManager) AddDiscoveredPeer(p PeerData) { + // Try to fetch shard info from ENR to arrive at pubSub topics. + if len(p.PubSubTopics) == 0 && p.ENR != nil { + shards, err := wenr.RelaySharding(p.ENR.Record()) + if err != nil { + pm.logger.Error("Could not derive relayShards from ENR", zap.Error(err), + logging.HostID("peer", p.AddrInfo.ID), zap.String("enr", p.ENR.String())) + } else { + if shards != nil { + p.PubSubTopics = make([]string, 0) + topics := shards.Topics() + for _, topic := range topics { + topicStr := topic.String() + p.PubSubTopics = append(p.PubSubTopics, topicStr) + } + } else { + pm.logger.Info("ENR doesn't have relay shards", logging.HostID("peer", p.AddrInfo.ID)) + } + } + } - _ = pm.addPeer(p.AddrInfo.ID, p.AddrInfo.Addrs, p.Origin) + _ = pm.addPeer(p.AddrInfo.ID, p.AddrInfo.Addrs, p.Origin, p.PubSubTopics) if p.ENR != nil { err := pm.host.Peerstore().(wps.WakuPeerstore).SetENR(p.AddrInfo.ID, p.ENR) @@ -223,11 +243,12 @@ func (pm *PeerManager) AddDiscoveredPeer(p PeerData) { logging.HostID("peer", p.AddrInfo.ID), zap.String("enr", p.ENR.String())) } } + } // addPeer adds peer to only the peerStore. // It also sets additional metadata such as origin, ENR and supported protocols -func (pm *PeerManager) addPeer(ID peer.ID, addrs []ma.Multiaddr, origin wps.Origin, protocols ...protocol.ID) error { +func (pm *PeerManager) addPeer(ID peer.ID, addrs []ma.Multiaddr, origin wps.Origin, pubSubTopics []string, protocols ...protocol.ID) error { pm.logger.Info("adding peer to peerstore", logging.HostID("peer", ID)) pm.host.Peerstore().AddAddrs(ID, addrs, peerstore.AddressTTL) err := pm.host.Peerstore().(wps.WakuPeerstore).SetOrigin(ID, origin) @@ -242,11 +263,21 @@ func (pm *PeerManager) addPeer(ID peer.ID, addrs []ma.Multiaddr, origin wps.Orig return err } } + if len(pubSubTopics) == 0 { + // Probably the peer is discovered via DNSDiscovery (for which we don't have pubSubTopic info) + //If pubSubTopic and enr is empty or no shard info in ENR,then set to defaultPubSubTopic + pubSubTopics = []string{relay.DefaultWakuTopic} + } + err = pm.host.Peerstore().(wps.WakuPeerstore).SetPubSubTopics(ID, pubSubTopics) + if err != nil { + pm.logger.Error("could not store pubSubTopic", zap.Error(err), + logging.HostID("peer", ID), zap.Strings("topics", pubSubTopics)) + } return nil } // AddPeer adds peer to the peerStore and also to service slots -func (pm *PeerManager) AddPeer(address ma.Multiaddr, origin wps.Origin, protocols ...protocol.ID) (peer.ID, error) { +func (pm *PeerManager) AddPeer(address ma.Multiaddr, origin wps.Origin, pubSubTopics []string, protocols ...protocol.ID) (peer.ID, error) { //Assuming all addresses have peerId info, err := peer.AddrInfoFromP2pAddr(address) if err != nil { @@ -259,7 +290,7 @@ func (pm *PeerManager) AddPeer(address ma.Multiaddr, origin wps.Origin, protocol } //Add to the peer-store - err = pm.addPeer(info.ID, info.Addrs, origin, protocols...) + err = pm.addPeer(info.ID, info.Addrs, origin, pubSubTopics, protocols...) if err != nil { return "", err } diff --git a/waku/v2/peermanager/peer_manager_test.go b/waku/v2/peermanager/peer_manager_test.go index ac7c2b1c..cef6598f 100644 --- a/waku/v2/peermanager/peer_manager_test.go +++ b/waku/v2/peermanager/peer_manager_test.go @@ -57,7 +57,7 @@ func TestServiceSlots(t *testing.T) { // add h2 peer to peer manager t.Log(h2.ID()) - _, err = pm.AddPeer(getAddr(h2), wps.Static, libp2pProtocol.ID(protocol)) + _, err = pm.AddPeer(getAddr(h2), wps.Static, []string{""}, libp2pProtocol.ID(protocol)) require.NoError(t, err) /////////////// @@ -70,7 +70,7 @@ func TestServiceSlots(t *testing.T) { require.Equal(t, peerID, h2.ID()) // add h3 peer to peer manager - _, err = pm.AddPeer(getAddr(h3), wps.Static, libp2pProtocol.ID(protocol)) + _, err = pm.AddPeer(getAddr(h3), wps.Static, []string{""}, libp2pProtocol.ID(protocol)) require.NoError(t, err) // check that returned peer is h2 or h3 peer @@ -94,7 +94,7 @@ func TestServiceSlots(t *testing.T) { require.Error(t, err, utils.ErrNoPeersAvailable) // add h4 peer for protocol1 - _, err = pm.AddPeer(getAddr(h4), wps.Static, libp2pProtocol.ID(protocol1)) + _, err = pm.AddPeer(getAddr(h4), wps.Static, []string{""}, libp2pProtocol.ID(protocol1)) require.NoError(t, err) //Test peer selection for protocol1 @@ -122,7 +122,7 @@ func TestDefaultProtocol(t *testing.T) { defer h5.Close() //Test peer selection for relay protocol from peer store - _, err = pm.AddPeer(getAddr(h5), wps.Static, relay.WakuRelayID_v200) + _, err = pm.AddPeer(getAddr(h5), wps.Static, []string{""}, relay.WakuRelayID_v200) require.NoError(t, err) // since we are not passing peerList, selectPeer fn using filterByProto checks in PeerStore for peers with same protocol. @@ -143,7 +143,7 @@ func TestAdditionAndRemovalOfPeer(t *testing.T) { require.NoError(t, err) defer h6.Close() - _, err = pm.AddPeer(getAddr(h6), wps.Static, protocol2) + _, err = pm.AddPeer(getAddr(h6), wps.Static, []string{""}, protocol2) require.NoError(t, err) peerID, err := pm.SelectPeer(protocol2, nil) diff --git a/waku/v2/peerstore/waku_peer_store.go b/waku/v2/peerstore/waku_peer_store.go index b1feb42d..0d856001 100644 --- a/waku/v2/peerstore/waku_peer_store.go +++ b/waku/v2/peerstore/waku_peer_store.go @@ -25,6 +25,7 @@ const ( const peerOrigin = "origin" const peerENR = "enr" const peerDirection = "direction" +const peerPubSubTopics = "pubSubTopics" // ConnectionFailures contains connection failure information towards all peers type ConnectionFailures struct { @@ -51,6 +52,11 @@ type WakuPeerstore interface { SetDirection(p peer.ID, direction network.Direction) error Direction(p peer.ID) (network.Direction, error) + + AddPubSubTopic(p peer.ID, topic string) error + PubSubTopics(p peer.ID) ([]string, error) + SetPubSubTopics(p peer.ID, topics []string) error + PeersByPubSubTopic(pubSubTopic string) peer.IDSlice } // NewWakuPeerstore creates a new WakuPeerStore object @@ -139,3 +145,43 @@ func (ps *WakuPeerstoreImpl) Direction(p peer.ID) (network.Direction, error) { return result.(network.Direction), nil } + +// AddPubSubTopic adds a new pubSubTopic for a peer +func (ps *WakuPeerstoreImpl) AddPubSubTopic(p peer.ID, topic string) error { + existingTopics, err := ps.PubSubTopics(p) + if err != nil { + return err + } + existingTopics = append(existingTopics, topic) + return ps.peerStore.Put(p, peerPubSubTopics, existingTopics) +} + +// SetPubSubTopics sets pubSubTopics for a peer, it also overrides existing ones that were set previously.. +func (ps *WakuPeerstoreImpl) SetPubSubTopics(p peer.ID, topics []string) error { + return ps.peerStore.Put(p, peerPubSubTopics, topics) +} + +// PubSubTopics fetches list of pubSubTopics for a peer +func (ps *WakuPeerstoreImpl) PubSubTopics(p peer.ID) ([]string, error) { + result, err := ps.peerStore.Get(p, peerPubSubTopics) + if err != nil { + return nil, err + } + return result.([]string), nil +} + +// PeersByPubSubTopic Returns list of peers by pubSubTopic +func (ps *WakuPeerstoreImpl) PeersByPubSubTopic(pubSubTopic string) peer.IDSlice { + var result peer.IDSlice + for _, p := range ps.Peers() { + topics, err := ps.PubSubTopics(p) + if err == nil { + for _, topic := range topics { + if topic == pubSubTopic { + result = append(result, p) + } + } + } //Note: skipping a peer in case of an error as there would be others available. + } + return result +} diff --git a/waku/v2/rendezvous/rendezvous.go b/waku/v2/rendezvous/rendezvous.go index eecab0d3..5b21ffaf 100644 --- a/waku/v2/rendezvous/rendezvous.go +++ b/waku/v2/rendezvous/rendezvous.go @@ -103,8 +103,9 @@ func (r *Rendezvous) DiscoverWithNamespace(ctx context.Context, namespace string r.peerConnector.Subscribe(ctx, peerCh) for _, p := range addrInfo { peer := peermanager.PeerData{ - Origin: peerstore.Rendezvous, - AddrInfo: p, + Origin: peerstore.Rendezvous, + AddrInfo: p, + PubSubTopics: []string{namespace}, } select { case <-ctx.Done():