From a5ce5dfaa47b305efe460a84ebb7164180e2b8cb Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Tue, 14 Nov 2023 04:22:46 +0530 Subject: [PATCH] feat: update store client Query API for autosharding (#885) --- cmd/waku/server/rest/store.go | 52 ++++----- cmd/waku/server/rpc/store.go | 2 +- library/store.go | 2 +- waku/v2/node/wakunode2.go | 8 +- waku/v2/peermanager/peer_discovery.go | 18 +-- waku/v2/peermanager/peer_manager.go | 51 +++++++-- waku/v2/peermanager/peer_manager_test.go | 16 +-- waku/v2/peermanager/peer_selection.go | 38 ++++--- waku/v2/peerstore/waku_peer_store.go | 34 +++++- waku/v2/protocol/content_filter.go | 20 +--- waku/v2/protocol/filter/client.go | 2 +- .../filter/filter_proto_ident_test.go | 2 +- waku/v2/protocol/legacy_filter/waku_filter.go | 2 +- waku/v2/protocol/lightpush/waku_lightpush.go | 2 +- waku/v2/protocol/shard.go | 23 ++++ waku/v2/protocol/store/waku_store_client.go | 107 ++++++++++++++---- .../store/waku_store_protocol_test.go | 10 +- waku/v2/rendezvous/rendezvous.go | 2 +- waku/v2/service/common_discovery_service.go | 2 +- 19 files changed, 258 insertions(+), 135 deletions(-) diff --git a/cmd/waku/server/rest/store.go b/cmd/waku/server/rest/store.go index ce31e8c3..19f3e048 100644 --- a/cmd/waku/server/rest/store.go +++ b/cmd/waku/server/rest/store.go @@ -12,10 +12,8 @@ import ( "github.com/go-chi/chi/v5" "github.com/multiformats/go-multiaddr" "github.com/waku-org/go-waku/waku/v2/node" - "github.com/waku-org/go-waku/waku/v2/peerstore" "github.com/waku-org/go-waku/waku/v2/protocol/store" "github.com/waku-org/go-waku/waku/v2/protocol/store/pb" - "github.com/waku-org/go-waku/waku/v2/utils" ) type StoreService struct { @@ -57,24 +55,20 @@ func NewStoreService(node *node.WakuNode, m *chi.Mux) *StoreService { return s } -func getStoreParams(r *http.Request) (multiaddr.Multiaddr, *store.Query, []store.HistoryRequestOption, error) { +func getStoreParams(r *http.Request) (*store.Query, []store.HistoryRequestOption, error) { query := &store.Query{} var options []store.HistoryRequestOption - + var err error peerAddrStr := r.URL.Query().Get("peerAddr") - m, err := multiaddr.NewMultiaddr(peerAddrStr) - if err != nil { - return nil, nil, nil, err + var m multiaddr.Multiaddr + if peerAddrStr != "" { + m, err = multiaddr.NewMultiaddr(peerAddrStr) + if err != nil { + return nil, nil, err + } + options = append(options, store.WithPeerAddr(m)) } - - peerID, err := utils.GetPeerID(m) - if err != nil { - return nil, nil, nil, err - } - - options = append(options, store.WithPeer(peerID)) - - query.Topic = r.URL.Query().Get("pubsubTopic") + query.PubsubTopic = r.URL.Query().Get("pubsubTopic") contentTopics := r.URL.Query().Get("contentTopics") if contentTopics != "" { @@ -85,7 +79,7 @@ func getStoreParams(r *http.Request) (multiaddr.Multiaddr, *store.Query, []store if startTimeStr != "" { startTime, err := strconv.ParseInt(startTimeStr, 10, 64) if err != nil { - return nil, nil, nil, err + return nil, nil, err } query.StartTime = &startTime } @@ -94,7 +88,7 @@ func getStoreParams(r *http.Request) (multiaddr.Multiaddr, *store.Query, []store if endTimeStr != "" { endTime, err := strconv.ParseInt(endTimeStr, 10, 64) if err != nil { - return nil, nil, nil, err + return nil, nil, err } query.EndTime = &endTime } @@ -111,25 +105,25 @@ func getStoreParams(r *http.Request) (multiaddr.Multiaddr, *store.Query, []store if senderTimeStr != "" { cursor.SenderTime, err = strconv.ParseInt(senderTimeStr, 10, 64) if err != nil { - return nil, nil, nil, err + return nil, nil, err } } if storeTimeStr != "" { cursor.ReceiverTime, err = strconv.ParseInt(storeTimeStr, 10, 64) if err != nil { - return nil, nil, nil, err + return nil, nil, err } } if digestStr != "" { cursor.Digest, err = base64.URLEncoding.DecodeString(digestStr) if err != nil { - return nil, nil, nil, err + return nil, nil, err } } - cursor.PubsubTopic = query.Topic + cursor.PubsubTopic = query.PubsubTopic options = append(options, store.WithCursor(cursor)) } @@ -142,21 +136,21 @@ func getStoreParams(r *http.Request) (multiaddr.Multiaddr, *store.Query, []store if ascendingStr != "" { ascending, err = strconv.ParseBool(ascendingStr) if err != nil { - return nil, nil, nil, err + return nil, nil, err } } if pageSizeStr != "" { pageSize, err = strconv.ParseUint(pageSizeStr, 10, 64) if err != nil { - return nil, nil, nil, err + return nil, nil, err } } options = append(options, store.WithPaging(ascending, pageSize)) } - return m, query, options, nil + return query, options, nil } func writeStoreError(w http.ResponseWriter, code int, err error) { @@ -190,7 +184,7 @@ func toStoreResponse(result *store.Result) StoreResponse { } func (d *StoreService) getV1Messages(w http.ResponseWriter, r *http.Request) { - peerAddr, query, options, err := getStoreParams(r) + query, options, err := getStoreParams(r) if err != nil { writeStoreError(w, http.StatusBadRequest, err) return @@ -199,12 +193,6 @@ func (d *StoreService) getV1Messages(w http.ResponseWriter, r *http.Request) { ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second) defer cancel() - _, err = d.node.AddPeer(peerAddr, peerstore.Static, d.node.Relay().Topics()) - if err != nil { - writeStoreError(w, http.StatusInternalServerError, err) - return - } - result, err := d.node.Store().Query(ctx, *query, options...) if err != nil { writeStoreError(w, http.StatusInternalServerError, err) diff --git a/cmd/waku/server/rpc/store.go b/cmd/waku/server/rpc/store.go index 84f3f64c..f39bb645 100644 --- a/cmd/waku/server/rpc/store.go +++ b/cmd/waku/server/rpc/store.go @@ -48,7 +48,7 @@ func (s *StoreService) GetV1Messages(req *http.Request, args *StoreMessagesArgs, res, err := s.node.Store().Query( req.Context(), store.Query{ - Topic: args.Topic, + PubsubTopic: args.Topic, ContentTopics: args.ContentFilters, StartTime: args.StartTime, EndTime: args.EndTime, diff --git a/library/store.go b/library/store.go index e539e8be..9c8c8c3d 100644 --- a/library/store.go +++ b/library/store.go @@ -39,7 +39,7 @@ func queryResponse(ctx context.Context, args storeMessagesArgs, options []store. res, err := wakuState.node.Store().Query( ctx, store.Query{ - Topic: args.Topic, + PubsubTopic: args.Topic, ContentTopics: args.ContentTopics, StartTime: args.StartTime, EndTime: args.EndTime, diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index 5b4dbd50..a9f22657 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -714,7 +714,11 @@ func (w *WakuNode) startStore(ctx context.Context, sub *relay.Subscription) erro // AddPeer is used to add a peer and the protocols it support to the node peerstore // TODO: Need to update this for autosharding, to only take contentTopics and optional pubSubTopics or provide an alternate API only for contentTopics. func (w *WakuNode) AddPeer(address ma.Multiaddr, origin wps.Origin, pubSubTopics []string, protocols ...protocol.ID) (peer.ID, error) { - return w.peermanager.AddPeer(address, origin, pubSubTopics, protocols...) + pData, err := w.peermanager.AddPeer(address, origin, pubSubTopics, protocols...) + if err != nil { + return "", err + } + return pData.AddrInfo.ID, nil } // AddDiscoveredPeer to add a discovered peer to the node peerStore @@ -725,7 +729,7 @@ func (w *WakuNode) AddDiscoveredPeer(ID peer.ID, addrs []ma.Multiaddr, origin wp ID: ID, Addrs: addrs, }, - PubSubTopics: pubsubTopics, + PubsubTopics: pubsubTopics, } w.peermanager.AddDiscoveredPeer(p, connectNow) } diff --git a/waku/v2/peermanager/peer_discovery.go b/waku/v2/peermanager/peer_discovery.go index 72ee3077..fd8b316b 100644 --- a/waku/v2/peermanager/peer_discovery.go +++ b/waku/v2/peermanager/peer_discovery.go @@ -100,18 +100,20 @@ func (pm *PeerManager) discoverOnDemand(cluster uint16, return peers, nil } -func (pm *PeerManager) discoverPeersByPubsubTopic(pubsubTopic string, proto protocol.ID, ctx context.Context, maxCount int) { - shardInfo, err := waku_proto.TopicsToRelayShards(pubsubTopic) +func (pm *PeerManager) discoverPeersByPubsubTopics(pubsubTopics []string, proto protocol.ID, ctx context.Context, maxCount int) { + shardsInfo, err := waku_proto.TopicsToRelayShards(pubsubTopics...) if err != nil { - pm.logger.Error("failed to convert pubsub topic to shard", zap.String("topic", pubsubTopic), zap.Error(err)) + pm.logger.Error("failed to convert pubsub topic to shard", zap.Strings("topics", pubsubTopics), zap.Error(err)) return } - if len(shardInfo) > 0 { - err = pm.DiscoverAndConnectToPeers(ctx, shardInfo[0].ClusterID, shardInfo[0].ShardIDs[0], proto, maxCount) - if err != nil { - pm.logger.Error("failed to discover and conenct to peers", zap.Error(err)) + if len(shardsInfo) > 0 { + for _, shardInfo := range shardsInfo { + err = pm.DiscoverAndConnectToPeers(ctx, shardInfo.ClusterID, shardInfo.ShardIDs[0], proto, maxCount) + if err != nil { + pm.logger.Error("failed to discover and conenct to peers", zap.Error(err)) + } } } else { - pm.logger.Debug("failed to convert pubsub topic to shard as topic is named pubsubTopic", zap.String("topic", pubsubTopic)) + pm.logger.Debug("failed to convert pubsub topics to shards as one of the topics is named pubsubTopic", zap.Strings("topics", pubsubTopics)) } } diff --git a/waku/v2/peermanager/peer_manager.go b/waku/v2/peermanager/peer_manager.go index f309b008..c1169129 100644 --- a/waku/v2/peermanager/peer_manager.go +++ b/waku/v2/peermanager/peer_manager.go @@ -217,7 +217,7 @@ func (pm *PeerManager) ensureMinRelayConnsPerTopic() { //Find not connected peers. notConnectedPeers := pm.getNotConnectedPers(topicStr) if notConnectedPeers.Len() == 0 { - pm.discoverPeersByPubsubTopic(topicStr, relay.WakuRelayID_v200, pm.ctx, 2) + pm.discoverPeersByPubsubTopics([]string{topicStr}, relay.WakuRelayID_v200, pm.ctx, 2) continue } //Connect to eligible peers. @@ -321,11 +321,11 @@ func (pm *PeerManager) processPeerENR(p *service.PeerData) []protocol.ID { logging.HostID("peer", p.AddrInfo.ID), zap.String("enr", p.ENR.String())) } else { if shards != nil { - p.PubSubTopics = make([]string, 0) + p.PubsubTopics = make([]string, 0) topics := shards.Topics() for _, topic := range topics { topicStr := topic.String() - p.PubSubTopics = append(p.PubSubTopics, topicStr) + p.PubsubTopics = append(p.PubsubTopics, topicStr) } } else { pm.logger.Debug("ENR doesn't have relay shards", logging.HostID("peer", p.AddrInfo.ID)) @@ -361,12 +361,12 @@ func (pm *PeerManager) AddDiscoveredPeer(p service.PeerData, connectNow bool) { return } supportedProtos := []protocol.ID{} - if len(p.PubSubTopics) == 0 && p.ENR != nil { + if len(p.PubsubTopics) == 0 && p.ENR != nil { // Try to fetch shard info and supported protocols from ENR to arrive at pubSub topics. supportedProtos = pm.processPeerENR(&p) } - _ = pm.addPeer(p.AddrInfo.ID, p.AddrInfo.Addrs, p.Origin, p.PubSubTopics, supportedProtos...) + _ = pm.addPeer(p.AddrInfo.ID, p.AddrInfo.Addrs, p.Origin, p.PubsubTopics, supportedProtos...) if p.ENR != nil { err := pm.host.Peerstore().(wps.WakuPeerstore).SetENR(p.AddrInfo.ID, p.ENR) @@ -419,12 +419,29 @@ func (pm *PeerManager) addPeer(ID peer.ID, addrs []ma.Multiaddr, origin wps.Orig return nil } +func AddrInfoToPeerData(origin wps.Origin, peerID peer.ID, host host.Host, pubsubTopics ...string) *service.PeerData { + addrs := host.Peerstore().Addrs(peerID) + if len(addrs) == 0 { + //Addresses expired, remove peer from peerStore + host.Peerstore().RemovePeer(peerID) + return nil + } + return &service.PeerData{ + Origin: origin, + AddrInfo: peer.AddrInfo{ + ID: peerID, + Addrs: addrs, + }, + PubsubTopics: pubsubTopics, + } +} + // AddPeer adds peer to the peerStore and also to service slots -func (pm *PeerManager) AddPeer(address ma.Multiaddr, origin wps.Origin, pubSubTopics []string, protocols ...protocol.ID) (peer.ID, error) { +func (pm *PeerManager) AddPeer(address ma.Multiaddr, origin wps.Origin, pubsubTopics []string, protocols ...protocol.ID) (*service.PeerData, error) { //Assuming all addresses have peerId info, err := peer.AddrInfoFromP2pAddr(address) if err != nil { - return "", err + return nil, err } //Add Service peers to serviceSlots. @@ -433,12 +450,26 @@ func (pm *PeerManager) AddPeer(address ma.Multiaddr, origin wps.Origin, pubSubTo } //Add to the peer-store - err = pm.addPeer(info.ID, info.Addrs, origin, pubSubTopics, protocols...) + err = pm.addPeer(info.ID, info.Addrs, origin, pubsubTopics, protocols...) if err != nil { - return "", err + return nil, err } - return info.ID, nil + pData := &service.PeerData{ + Origin: origin, + AddrInfo: peer.AddrInfo{ + ID: info.ID, + Addrs: info.Addrs, + }, + PubsubTopics: pubsubTopics, + } + + return pData, nil +} + +// Connect establishes a connection to a peer. +func (pm *PeerManager) Connect(pData *service.PeerData) { + go pm.peerConnector.PushToChan(*pData) } // RemovePeer deletes peer from the peerStore after disconnecting it. diff --git a/waku/v2/peermanager/peer_manager_test.go b/waku/v2/peermanager/peer_manager_test.go index 8d90f87c..5b289093 100644 --- a/waku/v2/peermanager/peer_manager_test.go +++ b/waku/v2/peermanager/peer_manager_test.go @@ -116,7 +116,7 @@ func TestServiceSlots(t *testing.T) { require.NoError(t, err) require.Equal(t, peerID, h4.ID()) - _, err = pm.SelectPeerByContentTopic(protocol1, "") + _, err = pm.SelectPeerByContentTopics(protocol1, []string{""}) require.Error(t, wakuproto.ErrInvalidFormat, err) } @@ -143,18 +143,18 @@ func TestPeerSelection(t *testing.T) { _, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol}) require.NoError(t, err) - peerID, err := pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopic: "/waku/2/rs/2/2"}) + peerID, err := pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/2"}}) require.NoError(t, err) require.Equal(t, h2.ID(), peerID) - _, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopic: "/waku/2/rs/2/3"}) + _, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/3"}}) require.Error(t, ErrNoPeersAvailable, err) - _, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopic: "/waku/2/rs/2/1"}) + _, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/1"}}) require.NoError(t, err) //Test for selectWithLowestRTT - _, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: LowestRTT, Proto: protocol, PubsubTopic: "/waku/2/rs/2/1"}) + _, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: LowestRTT, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/1"}}) require.NoError(t, err) } @@ -287,7 +287,7 @@ func TestOnDemandPeerDiscovery(t *testing.T) { require.NoError(t, err) //Discovery should fail for non-waku protocol - _, err = pm3.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, PubsubTopic: topic, Proto: "/test"}) + _, err = pm3.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, PubsubTopics: []string{topic}, Proto: "/test"}) require.Error(t, err) _, err = pm3.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: "/test"}) @@ -299,7 +299,7 @@ func TestOnDemandPeerDiscovery(t *testing.T) { var enrField uint8 enrField |= (1 << 1) pm3.RegisterWakuProtocol("/vac/waku/store/2.0.0-beta4", enrField) - peerID, err := pm3.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, PubsubTopic: topic, Proto: "/vac/waku/store/2.0.0-beta4", Ctx: ctx}) + peerID, err := pm3.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, PubsubTopics: []string{topic}, Proto: "/vac/waku/store/2.0.0-beta4", Ctx: ctx}) require.NoError(t, err) require.Equal(t, peerID, host2.ID()) @@ -307,7 +307,7 @@ func TestOnDemandPeerDiscovery(t *testing.T) { enrField1 |= (1 << 3) pm3.RegisterWakuProtocol("/vac/waku/lightpush/2.0.0-beta1", enrField1) - peerID, err = pm3.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, PubsubTopic: topic, Proto: "/vac/waku/lightpush/2.0.0-beta1", Ctx: ctx}) + peerID, err = pm3.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, PubsubTopics: []string{topic}, Proto: "/vac/waku/lightpush/2.0.0-beta1", Ctx: ctx}) require.NoError(t, err) require.Equal(t, peerID, host1.ID()) diff --git a/waku/v2/peermanager/peer_selection.go b/waku/v2/peermanager/peer_selection.go index e785c177..72e1d062 100644 --- a/waku/v2/peermanager/peer_selection.go +++ b/waku/v2/peermanager/peer_selection.go @@ -20,12 +20,16 @@ import ( // If a list of specific peers is passed, the peer will be chosen from that list assuming // it supports the chosen protocol and contentTopic, otherwise it will chose a peer from the service slot. // If a peer cannot be found in the service slot, a peer will be selected from node peerstore -func (pm *PeerManager) SelectPeerByContentTopic(proto protocol.ID, contentTopic string, specificPeers ...peer.ID) (peer.ID, error) { - pubsubTopic, err := waku_proto.GetPubSubTopicFromContentTopic(contentTopic) - if err != nil { - return "", err +func (pm *PeerManager) SelectPeerByContentTopics(proto protocol.ID, contentTopics []string, specificPeers ...peer.ID) (peer.ID, error) { + pubsubTopics := []string{} + for _, cTopic := range contentTopics { + pubsubTopic, err := waku_proto.GetPubSubTopicFromContentTopic(cTopic) + if err != nil { + return "", err + } + pubsubTopics = append(pubsubTopics, pubsubTopic) } - return pm.SelectPeer(PeerSelectionCriteria{PubsubTopic: pubsubTopic, Proto: proto, SpecificPeers: specificPeers}) + return pm.SelectPeer(PeerSelectionCriteria{PubsubTopics: pubsubTopics, Proto: proto, SpecificPeers: specificPeers}) } // SelectRandomPeer is used to return a random peer that supports a given protocol. @@ -40,12 +44,12 @@ func (pm *PeerManager) SelectRandomPeer(criteria PeerSelectionCriteria) (peer.ID // - which topics they track // - latency? - peerID, err := pm.selectServicePeer(criteria.Proto, criteria.PubsubTopic, criteria.Ctx, criteria.SpecificPeers...) + peerID, err := pm.selectServicePeer(criteria.Proto, criteria.PubsubTopics, criteria.Ctx, criteria.SpecificPeers...) if err == nil { return peerID, nil } else if !errors.Is(err, ErrNoPeersAvailable) { pm.logger.Debug("could not retrieve random peer from slot", zap.String("protocol", string(criteria.Proto)), - zap.String("pubsubTopic", criteria.PubsubTopic), zap.Error(err)) + zap.Strings("pubsubTopics", criteria.PubsubTopics), zap.Error(err)) return "", err } @@ -54,34 +58,34 @@ func (pm *PeerManager) SelectRandomPeer(criteria PeerSelectionCriteria) (peer.ID if err != nil { return "", err } - if criteria.PubsubTopic != "" { - filteredPeers = pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopic(criteria.PubsubTopic, filteredPeers...) + if len(criteria.PubsubTopics) > 0 { + filteredPeers = pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopics(criteria.PubsubTopics, filteredPeers...) } return selectRandomPeer(filteredPeers, pm.logger) } -func (pm *PeerManager) selectServicePeer(proto protocol.ID, pubsubTopic string, ctx context.Context, specificPeers ...peer.ID) (peer.ID, error) { +func (pm *PeerManager) selectServicePeer(proto protocol.ID, pubsubTopics []string, ctx context.Context, specificPeers ...peer.ID) (peer.ID, error) { var peerID peer.ID var err error for retryCnt := 0; retryCnt < 1; retryCnt++ { //Try to fetch from serviceSlot if slot := pm.serviceSlots.getPeers(proto); slot != nil { - if pubsubTopic == "" { + if len(pubsubTopics) == 0 || (len(pubsubTopics) == 1 && pubsubTopics[0] == "") { return slot.getRandom() } else { //PubsubTopic based selection keys := make([]peer.ID, 0, len(slot.m)) for i := range slot.m { keys = append(keys, i) } - selectedPeers := pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopic(pubsubTopic, keys...) + selectedPeers := pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopics(pubsubTopics, keys...) peerID, err = selectRandomPeer(selectedPeers, pm.logger) if err == nil { return peerID, nil } else { - pm.logger.Debug("Discovering peers by pubsubTopic", zap.String("pubsubTopic", pubsubTopic)) + pm.logger.Debug("Discovering peers by pubsubTopic", zap.Strings("pubsubTopics", pubsubTopics)) //Trigger on-demand discovery for this topic and connect to peer immediately. //For now discover atleast 1 peer for the criteria - pm.discoverPeersByPubsubTopic(pubsubTopic, proto, ctx, 1) + pm.discoverPeersByPubsubTopics(pubsubTopics, proto, ctx, 1) //Try to fetch peers again. continue } @@ -98,7 +102,7 @@ func (pm *PeerManager) selectServicePeer(proto protocol.ID, pubsubTopic string, type PeerSelectionCriteria struct { SelectionType PeerSelection Proto protocol.ID - PubsubTopic string + PubsubTopics []string SpecificPeers peer.IDSlice Ctx context.Context } @@ -135,8 +139,8 @@ func (pm *PeerManager) SelectPeerWithLowestRTT(criteria PeerSelectionCriteria) ( criteria.Ctx = context.Background() } - if criteria.PubsubTopic != "" { - peers = pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopic(criteria.PubsubTopic, criteria.SpecificPeers...) + if len(criteria.PubsubTopics) == 0 || (len(criteria.PubsubTopics) == 1 && criteria.PubsubTopics[0] == "") { + peers = pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopics(criteria.PubsubTopics, criteria.SpecificPeers...) } peers, err = pm.FilterPeersByProto(peers, criteria.Proto) diff --git a/waku/v2/peerstore/waku_peer_store.go b/waku/v2/peerstore/waku_peer_store.go index b1b1ac4f..5495a577 100644 --- a/waku/v2/peerstore/waku_peer_store.go +++ b/waku/v2/peerstore/waku_peer_store.go @@ -59,6 +59,7 @@ type WakuPeerstore interface { RemovePubSubTopic(p peer.ID, topic string) error PubSubTopics(p peer.ID) ([]string, error) SetPubSubTopics(p peer.ID, topics []string) error + PeersByPubSubTopics(pubSubTopics []string, specificPeers ...peer.ID) peer.IDSlice PeersByPubSubTopic(pubSubTopic string, specificPeers ...peer.ID) peer.IDSlice } @@ -207,7 +208,38 @@ func (ps *WakuPeerstoreImpl) PubSubTopics(p peer.ID) ([]string, error) { return result.([]string), nil } -// PeersByPubSubTopic Returns list of peers by pubSubTopic +// PeersByPubSubTopic Returns list of peers that support list of pubSubTopics +// If specifiPeers are listed, filtering is done from them otherwise from all peers in peerstore +func (ps *WakuPeerstoreImpl) PeersByPubSubTopics(pubSubTopics []string, specificPeers ...peer.ID) peer.IDSlice { + if specificPeers == nil { + specificPeers = ps.Peers() + } + var result peer.IDSlice + for _, p := range specificPeers { + topics, err := ps.PubSubTopics(p) + if err == nil { + //Convoluted and crazy logic to find subset of topics + // Could not find a better way to do it? + peerTopicMap := make(map[string]struct{}) + match := true + for _, topic := range topics { + peerTopicMap[topic] = struct{}{} + } + for _, topic := range pubSubTopics { + if _, ok := peerTopicMap[topic]; !ok { + match = false + break + } + } + if match { + result = append(result, p) + } + } //Note: skipping a peer in case of an error as there would be others available. + } + return result +} + +// PeersByPubSubTopic Returns list of peers that support a single pubSubTopic // If specifiPeers are listed, filtering is done from them otherwise from all peers in peerstore func (ps *WakuPeerstoreImpl) PeersByPubSubTopic(pubSubTopic string, specificPeers ...peer.ID) peer.IDSlice { if specificPeers == nil { diff --git a/waku/v2/protocol/content_filter.go b/waku/v2/protocol/content_filter.go index 7ea7f2a5..f09cf52b 100644 --- a/waku/v2/protocol/content_filter.go +++ b/waku/v2/protocol/content_filter.go @@ -52,23 +52,5 @@ func (cf ContentFilter) Equals(cf1 ContentFilter) bool { // This function converts a contentFilter into a map of pubSubTopics and corresponding contentTopics func ContentFilterToPubSubTopicMap(contentFilter ContentFilter) (map[PubsubTopicStr][]ContentTopicStr, error) { - pubSubTopicMap := make(map[string][]string) - - if contentFilter.PubsubTopic != "" { - pubSubTopicMap[contentFilter.PubsubTopic] = contentFilter.ContentTopicsList() - } else { - //Parse the content-Topics to figure out shards. - for _, cTopicString := range contentFilter.ContentTopicsList() { - pTopicStr, err := GetPubSubTopicFromContentTopic(cTopicString) - if err != nil { - return nil, err - } - _, ok := pubSubTopicMap[pTopicStr] - if !ok { - pubSubTopicMap[pTopicStr] = []string{} - } - pubSubTopicMap[pTopicStr] = append(pubSubTopicMap[pTopicStr], cTopicString) - } - } - return pubSubTopicMap, nil + return GeneratePubsubToContentTopicMap(contentFilter.PubsubTopic, contentFilter.ContentTopicsList()) } diff --git a/waku/v2/protocol/filter/client.go b/waku/v2/protocol/filter/client.go index e3c9af32..aa0c38d5 100644 --- a/waku/v2/protocol/filter/client.go +++ b/waku/v2/protocol/filter/client.go @@ -312,7 +312,7 @@ func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter prot peermanager.PeerSelectionCriteria{ SelectionType: params.peerSelectionType, Proto: FilterSubscribeID_v20beta1, - PubsubTopic: pubSubTopic, + PubsubTopics: []string{pubSubTopic}, SpecificPeers: params.preferredPeers, Ctx: ctx, }, diff --git a/waku/v2/protocol/filter/filter_proto_ident_test.go b/waku/v2/protocol/filter/filter_proto_ident_test.go index d67c76bf..9168b03f 100644 --- a/waku/v2/protocol/filter/filter_proto_ident_test.go +++ b/waku/v2/protocol/filter/filter_proto_ident_test.go @@ -178,7 +178,7 @@ func (wf *WakuFilterLightNode) IncorrectSubscribe(ctx context.Context, contentFi peermanager.PeerSelectionCriteria{ SelectionType: params.peerSelectionType, Proto: FilterSubscribeID_v20beta1, - PubsubTopic: pubSubTopic, + PubsubTopics: []string{pubSubTopic}, SpecificPeers: params.preferredPeers, Ctx: ctx, }, diff --git a/waku/v2/protocol/legacy_filter/waku_filter.go b/waku/v2/protocol/legacy_filter/waku_filter.go index 8a7b00b3..0d87f718 100644 --- a/waku/v2/protocol/legacy_filter/waku_filter.go +++ b/waku/v2/protocol/legacy_filter/waku_filter.go @@ -256,7 +256,7 @@ func (wf *WakuFilter) requestSubscription(ctx context.Context, filter ContentFil peermanager.PeerSelectionCriteria{ SelectionType: params.peerSelectionType, Proto: FilterID_v20beta1, - PubsubTopic: filter.Topic, + PubsubTopics: []string{filter.Topic}, SpecificPeers: params.preferredPeers, Ctx: ctx, }, diff --git a/waku/v2/protocol/lightpush/waku_lightpush.go b/waku/v2/protocol/lightpush/waku_lightpush.go index 10e78662..f1890ed9 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush.go +++ b/waku/v2/protocol/lightpush/waku_lightpush.go @@ -254,7 +254,7 @@ func (wakuLP *WakuLightPush) handleOpts(ctx context.Context, message *wpb.WakuMe peermanager.PeerSelectionCriteria{ SelectionType: params.peerSelectionType, Proto: LightPushID_v20beta1, - PubsubTopic: params.pubsubTopic, + PubsubTopics: []string{params.pubsubTopic}, SpecificPeers: params.preferredPeers, Ctx: ctx, }, diff --git a/waku/v2/protocol/shard.go b/waku/v2/protocol/shard.go index 3f4779d2..66ec5fdc 100644 --- a/waku/v2/protocol/shard.go +++ b/waku/v2/protocol/shard.go @@ -245,3 +245,26 @@ func GetPubSubTopicFromContentTopic(cTopicString string) (string, error) { return pTopic.String(), nil } + +func GeneratePubsubToContentTopicMap(pubsubTopic string, contentTopics []string) (map[string][]string, error) { + + pubSubTopicMap := make(map[string][]string, 0) + + if pubsubTopic == "" { + //Should we derive pubsub topic from contentTopic so that peer selection and discovery can be done accordingly? + for _, cTopic := range contentTopics { + pTopic, err := GetPubSubTopicFromContentTopic(cTopic) + if err != nil { + return nil, err + } + _, ok := pubSubTopicMap[pTopic] + if !ok { + pubSubTopicMap[pTopic] = []string{} + } + pubSubTopicMap[pTopic] = append(pubSubTopicMap[pTopic], cTopic) + } + } else { + pubSubTopicMap[pubsubTopic] = append(pubSubTopicMap[pubsubTopic], contentTopics...) + } + return pubSubTopicMap, nil +} diff --git a/waku/v2/protocol/store/waku_store_client.go b/waku/v2/protocol/store/waku_store_client.go index 49e80374..fb2d2e95 100644 --- a/waku/v2/protocol/store/waku_store_client.go +++ b/waku/v2/protocol/store/waku_store_client.go @@ -8,17 +8,19 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-msgio/pbio" + "github.com/multiformats/go-multiaddr" "go.uber.org/zap" "github.com/waku-org/go-waku/logging" "github.com/waku-org/go-waku/waku/v2/peermanager" + "github.com/waku-org/go-waku/waku/v2/peerstore" "github.com/waku-org/go-waku/waku/v2/protocol" wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/protocol/store/pb" ) type Query struct { - Topic string + PubsubTopic string ContentTopics []string StartTime *int64 EndTime *int64 @@ -82,6 +84,7 @@ type criteriaFN = func(msg *wpb.WakuMessage) (bool, error) type HistoryRequestParameters struct { selectedPeer peer.ID + peerAddr multiaddr.Multiaddr peerSelectionType peermanager.PeerSelection preferredPeers peer.IDSlice localQuery bool @@ -93,12 +96,31 @@ type HistoryRequestParameters struct { s *WakuStore } -type HistoryRequestOption func(*HistoryRequestParameters) +type HistoryRequestOption func(*HistoryRequestParameters) error -// WithPeer is an option used to specify the peerID to request the message history +// WithPeer is an option used to specify the peerID to request the message history. +// Note that this option is mutually exclusive to WithPeerAddr, only one of them can be used. func WithPeer(p peer.ID) HistoryRequestOption { - return func(params *HistoryRequestParameters) { + return func(params *HistoryRequestParameters) error { params.selectedPeer = p + if params.peerAddr != nil { + return errors.New("peerId and peerAddr options are mutually exclusive") + } + return nil + } +} + +//WithPeerAddr is an option used to specify a peerAddress to request the message history. +// This new peer will be added to peerStore. +// Note that this option is mutually exclusive to WithPeerAddr, only one of them can be used. + +func WithPeerAddr(pAddr multiaddr.Multiaddr) HistoryRequestOption { + return func(params *HistoryRequestParameters) error { + params.peerAddr = pAddr + if params.selectedPeer != "" { + return errors.New("peerAddr and peerId options are mutually exclusive") + } + return nil } } @@ -108,9 +130,10 @@ func WithPeer(p peer.ID) HistoryRequestOption { // from the node peerstore // Note: This option is avaiable only with peerManager func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) HistoryRequestOption { - return func(params *HistoryRequestParameters) { + return func(params *HistoryRequestParameters) error { params.peerSelectionType = peermanager.Automatic params.preferredPeers = fromThesePeers + return nil } } @@ -120,44 +143,50 @@ func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) HistoryRequestOption // from the node peerstore // Note: This option is avaiable only with peerManager func WithFastestPeerSelection(fromThesePeers ...peer.ID) HistoryRequestOption { - return func(params *HistoryRequestParameters) { + return func(params *HistoryRequestParameters) error { params.peerSelectionType = peermanager.LowestRTT + return nil } } // WithRequestID is an option to set a specific request ID to be used when // creating a store request func WithRequestID(requestID []byte) HistoryRequestOption { - return func(params *HistoryRequestParameters) { + return func(params *HistoryRequestParameters) error { params.requestID = requestID + return nil } } // WithAutomaticRequestID is an option to automatically generate a request ID // when creating a store request func WithAutomaticRequestID() HistoryRequestOption { - return func(params *HistoryRequestParameters) { + return func(params *HistoryRequestParameters) error { params.requestID = protocol.GenerateRequestID() + return nil } } func WithCursor(c *pb.Index) HistoryRequestOption { - return func(params *HistoryRequestParameters) { + return func(params *HistoryRequestParameters) error { params.cursor = c + return nil } } // WithPaging is an option used to specify the order and maximum number of records to return func WithPaging(asc bool, pageSize uint64) HistoryRequestOption { - return func(params *HistoryRequestParameters) { + return func(params *HistoryRequestParameters) error { params.asc = asc params.pageSize = pageSize + return nil } } func WithLocalQuery() HistoryRequestOption { - return func(params *HistoryRequestParameters) { + return func(params *HistoryRequestParameters) error { params.localQuery = true + return nil } } @@ -253,28 +282,56 @@ func (store *WakuStore) Query(ctx context.Context, query Query, opts ...HistoryR optList := DefaultOptions() optList = append(optList, opts...) for _, opt := range optList { - opt(params) - } - if store.pm != nil && params.selectedPeer == "" { - var err error - params.selectedPeer, err = store.pm.SelectPeer( - peermanager.PeerSelectionCriteria{ - SelectionType: params.peerSelectionType, - Proto: StoreID_v20beta4, - PubsubTopic: query.Topic, - SpecificPeers: params.preferredPeers, - Ctx: ctx, - }, - ) + err := opt(params) if err != nil { return nil, err } } + if !params.localQuery { + pubsubTopics := []string{} + if query.PubsubTopic == "" { + for _, cTopic := range query.ContentTopics { + pubsubTopic, err := protocol.GetPubSubTopicFromContentTopic(cTopic) + if err != nil { + return nil, err + } + pubsubTopics = append(pubsubTopics, pubsubTopic) + } + } else { + pubsubTopics = append(pubsubTopics, query.PubsubTopic) + } + + //Add Peer to peerstore. + if store.pm != nil && params.peerAddr != nil { + pData, err := store.pm.AddPeer(params.peerAddr, peerstore.Static, pubsubTopics, StoreID_v20beta4) + if err != nil { + return nil, err + } + store.pm.Connect(pData) + params.selectedPeer = pData.AddrInfo.ID + } + if store.pm != nil && params.selectedPeer == "" { + var err error + params.selectedPeer, err = store.pm.SelectPeer( + peermanager.PeerSelectionCriteria{ + SelectionType: params.peerSelectionType, + Proto: StoreID_v20beta4, + PubsubTopics: pubsubTopics, + SpecificPeers: params.preferredPeers, + Ctx: ctx, + }, + ) + if err != nil { + return nil, err + } + } + } + historyRequest := &pb.HistoryRPC{ RequestId: hex.EncodeToString(params.requestID), Query: &pb.HistoryQuery{ - PubsubTopic: query.Topic, + PubsubTopic: query.PubsubTopic, ContentFilters: []*pb.ContentFilter{}, StartTime: query.StartTime, EndTime: query.EndTime, diff --git a/waku/v2/protocol/store/waku_store_protocol_test.go b/waku/v2/protocol/store/waku_store_protocol_test.go index 82365110..ed034b55 100644 --- a/waku/v2/protocol/store/waku_store_protocol_test.go +++ b/waku/v2/protocol/store/waku_store_protocol_test.go @@ -71,7 +71,7 @@ func TestWakuStoreProtocolQuery(t *testing.T) { require.NoError(t, err) q := Query{ - Topic: pubsubTopic1, + PubsubTopic: pubsubTopic1, ContentTopics: []string{topic1}, } @@ -110,7 +110,7 @@ func TestWakuStoreProtocolLocalQuery(t *testing.T) { time.Sleep(100 * time.Millisecond) q := Query{ - Topic: pubsubTopic1, + PubsubTopic: pubsubTopic1, ContentTopics: []string{topic1}, } response, err := s1.Query(ctx, q, WithLocalQuery()) @@ -167,7 +167,7 @@ func TestWakuStoreProtocolNext(t *testing.T) { defer s2.Stop() q := Query{ - Topic: pubsubTopic1, + PubsubTopic: pubsubTopic1, ContentTopics: []string{topic1}, } @@ -244,7 +244,7 @@ func TestWakuStoreResult(t *testing.T) { defer s2.Stop() q := Query{ - Topic: pubsubTopic1, + PubsubTopic: pubsubTopic1, ContentTopics: []string{topic1}, } @@ -346,7 +346,7 @@ func TestWakuStoreProtocolFind(t *testing.T) { defer s2.Stop() q := Query{ - Topic: pubsubTopic1, + PubsubTopic: pubsubTopic1, } fn := func(msg *pb.WakuMessage) (bool, error) { diff --git a/waku/v2/rendezvous/rendezvous.go b/waku/v2/rendezvous/rendezvous.go index a53a74a4..76c63ff5 100644 --- a/waku/v2/rendezvous/rendezvous.go +++ b/waku/v2/rendezvous/rendezvous.go @@ -107,7 +107,7 @@ func (r *Rendezvous) DiscoverWithNamespace(ctx context.Context, namespace string peer := service.PeerData{ Origin: peerstore.Rendezvous, AddrInfo: p, - PubSubTopics: []string{namespace}, + PubsubTopics: []string{namespace}, } if !r.PushToChan(peer) { r.log.Error("could push to closed channel/context completed") diff --git a/waku/v2/service/common_discovery_service.go b/waku/v2/service/common_discovery_service.go index 72bf96f1..c22f18f1 100644 --- a/waku/v2/service/common_discovery_service.go +++ b/waku/v2/service/common_discovery_service.go @@ -14,7 +14,7 @@ type PeerData struct { Origin wps.Origin AddrInfo peer.AddrInfo ENR *enode.Node - PubSubTopics []string + PubsubTopics []string } type CommonDiscoveryService struct {