diff --git a/cmd/waku/server/rest/store.go b/cmd/waku/server/rest/store.go index 9f01c0ae..2144f3c1 100644 --- a/cmd/waku/server/rest/store.go +++ b/cmd/waku/server/rest/store.go @@ -202,22 +202,11 @@ func (d *StoreService) getV1Messages(w http.ResponseWriter, r *http.Request) { if peerAddr != nil { options = append(options, store.WithPeerAddr(peerAddr)) } - var results []*store.Result - var result *store.Result - if query.PubsubTopic == "" { - results, err = d.node.Store().QueryAutoSharding(ctx, *query, options...) - if err != nil { - writeStoreError(w, http.StatusInternalServerError, err) - return - } - } else { - result, err := d.node.Store().Query(ctx, *query, options...) - if err != nil { - writeStoreError(w, http.StatusInternalServerError, err) - return - } - results = append(results, result) + result, err := d.node.Store().Query(ctx, *query, options...) + if err != nil { + writeStoreError(w, http.StatusInternalServerError, err) + return } - //TODO: How to respond with multiple query results?? + writeErrOrResponse(w, nil, toStoreResponse(result)) } diff --git a/waku/v2/peermanager/peer_discovery.go b/waku/v2/peermanager/peer_discovery.go index 72ee3077..fd8b316b 100644 --- a/waku/v2/peermanager/peer_discovery.go +++ b/waku/v2/peermanager/peer_discovery.go @@ -100,18 +100,20 @@ func (pm *PeerManager) discoverOnDemand(cluster uint16, return peers, nil } -func (pm *PeerManager) discoverPeersByPubsubTopic(pubsubTopic string, proto protocol.ID, ctx context.Context, maxCount int) { - shardInfo, err := waku_proto.TopicsToRelayShards(pubsubTopic) +func (pm *PeerManager) discoverPeersByPubsubTopics(pubsubTopics []string, proto protocol.ID, ctx context.Context, maxCount int) { + shardsInfo, err := waku_proto.TopicsToRelayShards(pubsubTopics...) if err != nil { - pm.logger.Error("failed to convert pubsub topic to shard", zap.String("topic", pubsubTopic), zap.Error(err)) + pm.logger.Error("failed to convert pubsub topic to shard", zap.Strings("topics", pubsubTopics), zap.Error(err)) return } - if len(shardInfo) > 0 { - err = pm.DiscoverAndConnectToPeers(ctx, shardInfo[0].ClusterID, shardInfo[0].ShardIDs[0], proto, maxCount) - if err != nil { - pm.logger.Error("failed to discover and conenct to peers", zap.Error(err)) + if len(shardsInfo) > 0 { + for _, shardInfo := range shardsInfo { + err = pm.DiscoverAndConnectToPeers(ctx, shardInfo.ClusterID, shardInfo.ShardIDs[0], proto, maxCount) + if err != nil { + pm.logger.Error("failed to discover and conenct to peers", zap.Error(err)) + } } } else { - pm.logger.Debug("failed to convert pubsub topic to shard as topic is named pubsubTopic", zap.String("topic", pubsubTopic)) + pm.logger.Debug("failed to convert pubsub topics to shards as one of the topics is named pubsubTopic", zap.Strings("topics", pubsubTopics)) } } diff --git a/waku/v2/peermanager/peer_manager.go b/waku/v2/peermanager/peer_manager.go index f309b008..18e0f404 100644 --- a/waku/v2/peermanager/peer_manager.go +++ b/waku/v2/peermanager/peer_manager.go @@ -217,7 +217,7 @@ func (pm *PeerManager) ensureMinRelayConnsPerTopic() { //Find not connected peers. notConnectedPeers := pm.getNotConnectedPers(topicStr) if notConnectedPeers.Len() == 0 { - pm.discoverPeersByPubsubTopic(topicStr, relay.WakuRelayID_v200, pm.ctx, 2) + pm.discoverPeersByPubsubTopics([]string{topicStr}, relay.WakuRelayID_v200, pm.ctx, 2) continue } //Connect to eligible peers. diff --git a/waku/v2/peermanager/peer_manager_test.go b/waku/v2/peermanager/peer_manager_test.go index 8d90f87c..5b289093 100644 --- a/waku/v2/peermanager/peer_manager_test.go +++ b/waku/v2/peermanager/peer_manager_test.go @@ -116,7 +116,7 @@ func TestServiceSlots(t *testing.T) { require.NoError(t, err) require.Equal(t, peerID, h4.ID()) - _, err = pm.SelectPeerByContentTopic(protocol1, "") + _, err = pm.SelectPeerByContentTopics(protocol1, []string{""}) require.Error(t, wakuproto.ErrInvalidFormat, err) } @@ -143,18 +143,18 @@ func TestPeerSelection(t *testing.T) { _, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol}) require.NoError(t, err) - peerID, err := pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopic: "/waku/2/rs/2/2"}) + peerID, err := pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/2"}}) require.NoError(t, err) require.Equal(t, h2.ID(), peerID) - _, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopic: "/waku/2/rs/2/3"}) + _, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/3"}}) require.Error(t, ErrNoPeersAvailable, err) - _, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopic: "/waku/2/rs/2/1"}) + _, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/1"}}) require.NoError(t, err) //Test for selectWithLowestRTT - _, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: LowestRTT, Proto: protocol, PubsubTopic: "/waku/2/rs/2/1"}) + _, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: LowestRTT, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/1"}}) require.NoError(t, err) } @@ -287,7 +287,7 @@ func TestOnDemandPeerDiscovery(t *testing.T) { require.NoError(t, err) //Discovery should fail for non-waku protocol - _, err = pm3.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, PubsubTopic: topic, Proto: "/test"}) + _, err = pm3.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, PubsubTopics: []string{topic}, Proto: "/test"}) require.Error(t, err) _, err = pm3.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: "/test"}) @@ -299,7 +299,7 @@ func TestOnDemandPeerDiscovery(t *testing.T) { var enrField uint8 enrField |= (1 << 1) pm3.RegisterWakuProtocol("/vac/waku/store/2.0.0-beta4", enrField) - peerID, err := pm3.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, PubsubTopic: topic, Proto: "/vac/waku/store/2.0.0-beta4", Ctx: ctx}) + peerID, err := pm3.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, PubsubTopics: []string{topic}, Proto: "/vac/waku/store/2.0.0-beta4", Ctx: ctx}) require.NoError(t, err) require.Equal(t, peerID, host2.ID()) @@ -307,7 +307,7 @@ func TestOnDemandPeerDiscovery(t *testing.T) { enrField1 |= (1 << 3) pm3.RegisterWakuProtocol("/vac/waku/lightpush/2.0.0-beta1", enrField1) - peerID, err = pm3.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, PubsubTopic: topic, Proto: "/vac/waku/lightpush/2.0.0-beta1", Ctx: ctx}) + peerID, err = pm3.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, PubsubTopics: []string{topic}, Proto: "/vac/waku/lightpush/2.0.0-beta1", Ctx: ctx}) require.NoError(t, err) require.Equal(t, peerID, host1.ID()) diff --git a/waku/v2/peermanager/peer_selection.go b/waku/v2/peermanager/peer_selection.go index e785c177..72e1d062 100644 --- a/waku/v2/peermanager/peer_selection.go +++ b/waku/v2/peermanager/peer_selection.go @@ -20,12 +20,16 @@ import ( // If a list of specific peers is passed, the peer will be chosen from that list assuming // it supports the chosen protocol and contentTopic, otherwise it will chose a peer from the service slot. // If a peer cannot be found in the service slot, a peer will be selected from node peerstore -func (pm *PeerManager) SelectPeerByContentTopic(proto protocol.ID, contentTopic string, specificPeers ...peer.ID) (peer.ID, error) { - pubsubTopic, err := waku_proto.GetPubSubTopicFromContentTopic(contentTopic) - if err != nil { - return "", err +func (pm *PeerManager) SelectPeerByContentTopics(proto protocol.ID, contentTopics []string, specificPeers ...peer.ID) (peer.ID, error) { + pubsubTopics := []string{} + for _, cTopic := range contentTopics { + pubsubTopic, err := waku_proto.GetPubSubTopicFromContentTopic(cTopic) + if err != nil { + return "", err + } + pubsubTopics = append(pubsubTopics, pubsubTopic) } - return pm.SelectPeer(PeerSelectionCriteria{PubsubTopic: pubsubTopic, Proto: proto, SpecificPeers: specificPeers}) + return pm.SelectPeer(PeerSelectionCriteria{PubsubTopics: pubsubTopics, Proto: proto, SpecificPeers: specificPeers}) } // SelectRandomPeer is used to return a random peer that supports a given protocol. @@ -40,12 +44,12 @@ func (pm *PeerManager) SelectRandomPeer(criteria PeerSelectionCriteria) (peer.ID // - which topics they track // - latency? - peerID, err := pm.selectServicePeer(criteria.Proto, criteria.PubsubTopic, criteria.Ctx, criteria.SpecificPeers...) + peerID, err := pm.selectServicePeer(criteria.Proto, criteria.PubsubTopics, criteria.Ctx, criteria.SpecificPeers...) if err == nil { return peerID, nil } else if !errors.Is(err, ErrNoPeersAvailable) { pm.logger.Debug("could not retrieve random peer from slot", zap.String("protocol", string(criteria.Proto)), - zap.String("pubsubTopic", criteria.PubsubTopic), zap.Error(err)) + zap.Strings("pubsubTopics", criteria.PubsubTopics), zap.Error(err)) return "", err } @@ -54,34 +58,34 @@ func (pm *PeerManager) SelectRandomPeer(criteria PeerSelectionCriteria) (peer.ID if err != nil { return "", err } - if criteria.PubsubTopic != "" { - filteredPeers = pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopic(criteria.PubsubTopic, filteredPeers...) + if len(criteria.PubsubTopics) > 0 { + filteredPeers = pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopics(criteria.PubsubTopics, filteredPeers...) } return selectRandomPeer(filteredPeers, pm.logger) } -func (pm *PeerManager) selectServicePeer(proto protocol.ID, pubsubTopic string, ctx context.Context, specificPeers ...peer.ID) (peer.ID, error) { +func (pm *PeerManager) selectServicePeer(proto protocol.ID, pubsubTopics []string, ctx context.Context, specificPeers ...peer.ID) (peer.ID, error) { var peerID peer.ID var err error for retryCnt := 0; retryCnt < 1; retryCnt++ { //Try to fetch from serviceSlot if slot := pm.serviceSlots.getPeers(proto); slot != nil { - if pubsubTopic == "" { + if len(pubsubTopics) == 0 || (len(pubsubTopics) == 1 && pubsubTopics[0] == "") { return slot.getRandom() } else { //PubsubTopic based selection keys := make([]peer.ID, 0, len(slot.m)) for i := range slot.m { keys = append(keys, i) } - selectedPeers := pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopic(pubsubTopic, keys...) + selectedPeers := pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopics(pubsubTopics, keys...) peerID, err = selectRandomPeer(selectedPeers, pm.logger) if err == nil { return peerID, nil } else { - pm.logger.Debug("Discovering peers by pubsubTopic", zap.String("pubsubTopic", pubsubTopic)) + pm.logger.Debug("Discovering peers by pubsubTopic", zap.Strings("pubsubTopics", pubsubTopics)) //Trigger on-demand discovery for this topic and connect to peer immediately. //For now discover atleast 1 peer for the criteria - pm.discoverPeersByPubsubTopic(pubsubTopic, proto, ctx, 1) + pm.discoverPeersByPubsubTopics(pubsubTopics, proto, ctx, 1) //Try to fetch peers again. continue } @@ -98,7 +102,7 @@ func (pm *PeerManager) selectServicePeer(proto protocol.ID, pubsubTopic string, type PeerSelectionCriteria struct { SelectionType PeerSelection Proto protocol.ID - PubsubTopic string + PubsubTopics []string SpecificPeers peer.IDSlice Ctx context.Context } @@ -135,8 +139,8 @@ func (pm *PeerManager) SelectPeerWithLowestRTT(criteria PeerSelectionCriteria) ( criteria.Ctx = context.Background() } - if criteria.PubsubTopic != "" { - peers = pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopic(criteria.PubsubTopic, criteria.SpecificPeers...) + if len(criteria.PubsubTopics) == 0 || (len(criteria.PubsubTopics) == 1 && criteria.PubsubTopics[0] == "") { + peers = pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopics(criteria.PubsubTopics, criteria.SpecificPeers...) } peers, err = pm.FilterPeersByProto(peers, criteria.Proto) diff --git a/waku/v2/peerstore/waku_peer_store.go b/waku/v2/peerstore/waku_peer_store.go index b1b1ac4f..5495a577 100644 --- a/waku/v2/peerstore/waku_peer_store.go +++ b/waku/v2/peerstore/waku_peer_store.go @@ -59,6 +59,7 @@ type WakuPeerstore interface { RemovePubSubTopic(p peer.ID, topic string) error PubSubTopics(p peer.ID) ([]string, error) SetPubSubTopics(p peer.ID, topics []string) error + PeersByPubSubTopics(pubSubTopics []string, specificPeers ...peer.ID) peer.IDSlice PeersByPubSubTopic(pubSubTopic string, specificPeers ...peer.ID) peer.IDSlice } @@ -207,7 +208,38 @@ func (ps *WakuPeerstoreImpl) PubSubTopics(p peer.ID) ([]string, error) { return result.([]string), nil } -// PeersByPubSubTopic Returns list of peers by pubSubTopic +// PeersByPubSubTopic Returns list of peers that support list of pubSubTopics +// If specifiPeers are listed, filtering is done from them otherwise from all peers in peerstore +func (ps *WakuPeerstoreImpl) PeersByPubSubTopics(pubSubTopics []string, specificPeers ...peer.ID) peer.IDSlice { + if specificPeers == nil { + specificPeers = ps.Peers() + } + var result peer.IDSlice + for _, p := range specificPeers { + topics, err := ps.PubSubTopics(p) + if err == nil { + //Convoluted and crazy logic to find subset of topics + // Could not find a better way to do it? + peerTopicMap := make(map[string]struct{}) + match := true + for _, topic := range topics { + peerTopicMap[topic] = struct{}{} + } + for _, topic := range pubSubTopics { + if _, ok := peerTopicMap[topic]; !ok { + match = false + break + } + } + if match { + result = append(result, p) + } + } //Note: skipping a peer in case of an error as there would be others available. + } + return result +} + +// PeersByPubSubTopic Returns list of peers that support a single pubSubTopic // If specifiPeers are listed, filtering is done from them otherwise from all peers in peerstore func (ps *WakuPeerstoreImpl) PeersByPubSubTopic(pubSubTopic string, specificPeers ...peer.ID) peer.IDSlice { if specificPeers == nil { diff --git a/waku/v2/protocol/filter/client.go b/waku/v2/protocol/filter/client.go index e3c9af32..aa0c38d5 100644 --- a/waku/v2/protocol/filter/client.go +++ b/waku/v2/protocol/filter/client.go @@ -312,7 +312,7 @@ func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter prot peermanager.PeerSelectionCriteria{ SelectionType: params.peerSelectionType, Proto: FilterSubscribeID_v20beta1, - PubsubTopic: pubSubTopic, + PubsubTopics: []string{pubSubTopic}, SpecificPeers: params.preferredPeers, Ctx: ctx, }, diff --git a/waku/v2/protocol/filter/filter_proto_ident_test.go b/waku/v2/protocol/filter/filter_proto_ident_test.go index d67c76bf..9168b03f 100644 --- a/waku/v2/protocol/filter/filter_proto_ident_test.go +++ b/waku/v2/protocol/filter/filter_proto_ident_test.go @@ -178,7 +178,7 @@ func (wf *WakuFilterLightNode) IncorrectSubscribe(ctx context.Context, contentFi peermanager.PeerSelectionCriteria{ SelectionType: params.peerSelectionType, Proto: FilterSubscribeID_v20beta1, - PubsubTopic: pubSubTopic, + PubsubTopics: []string{pubSubTopic}, SpecificPeers: params.preferredPeers, Ctx: ctx, }, diff --git a/waku/v2/protocol/legacy_filter/waku_filter.go b/waku/v2/protocol/legacy_filter/waku_filter.go index 8a7b00b3..0d87f718 100644 --- a/waku/v2/protocol/legacy_filter/waku_filter.go +++ b/waku/v2/protocol/legacy_filter/waku_filter.go @@ -256,7 +256,7 @@ func (wf *WakuFilter) requestSubscription(ctx context.Context, filter ContentFil peermanager.PeerSelectionCriteria{ SelectionType: params.peerSelectionType, Proto: FilterID_v20beta1, - PubsubTopic: filter.Topic, + PubsubTopics: []string{filter.Topic}, SpecificPeers: params.preferredPeers, Ctx: ctx, }, diff --git a/waku/v2/protocol/lightpush/waku_lightpush.go b/waku/v2/protocol/lightpush/waku_lightpush.go index 469d9d07..43f33fea 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush.go +++ b/waku/v2/protocol/lightpush/waku_lightpush.go @@ -253,7 +253,7 @@ func (wakuLP *WakuLightPush) handleOpts(ctx context.Context, message *wpb.WakuMe peermanager.PeerSelectionCriteria{ SelectionType: params.peerSelectionType, Proto: LightPushID_v20beta1, - PubsubTopic: params.pubsubTopic, + PubsubTopics: []string{params.pubsubTopic}, SpecificPeers: params.preferredPeers, Ctx: ctx, }, diff --git a/waku/v2/protocol/store/waku_store_client.go b/waku/v2/protocol/store/waku_store_client.go index 03758e4b..137c85a8 100644 --- a/waku/v2/protocol/store/waku_store_client.go +++ b/waku/v2/protocol/store/waku_store_client.go @@ -4,15 +4,12 @@ import ( "context" "encoding/hex" "errors" - "fmt" "math" - "strings" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-msgio/pbio" "github.com/multiformats/go-multiaddr" "go.uber.org/zap" - "golang.org/x/exp/maps" "github.com/waku-org/go-waku/logging" "github.com/waku-org/go-waku/waku/v2/peermanager" @@ -258,47 +255,6 @@ func (store *WakuStore) localQuery(historyQuery *pb.HistoryRPC) (*pb.HistoryResp return historyResponseRPC.Response, nil } -func (store *WakuStore) QueryAutoSharding(ctx context.Context, query Query, opts ...HistoryRequestOption) ([]*Result, error) { - var results []*Result - var failedContentTopics []string - pubSubTopicMap, err := protocol.GeneratePubsubToContentTopicMap(query.PubsubTopic, query.ContentTopics) - if err != nil { - return nil, err - } - - //Duplicate processing of opts, which is done again in Query function below - // Not sure how to handle this, hence leaving it as of now - params := new(HistoryRequestParameters) - var optList []HistoryRequestOption - for _, opt := range optList { - opt(params) - } - - //Add Peer to peerstore. - if store.pm != nil && params.peerAddr != nil { - _, err = store.pm.AddPeer(params.peerAddr, peerstore.Static, maps.Keys(pubSubTopicMap), StoreID_v20beta4) - if err != nil { - return nil, err - } - } - - for pubSubTopic, cTopics := range pubSubTopicMap { - query.ContentTopics = cTopics - query.PubsubTopic = pubSubTopic - //Invoke Query separately - result, err := store.Query(ctx, query, opts...) - if err != nil { - failedContentTopics = append(failedContentTopics, cTopics...) - } - results = append(results, result) - } - if len(failedContentTopics) > 0 { - return results, fmt.Errorf("subscriptions failed for contentTopics: %s", strings.Join(failedContentTopics, ",")) - } else { - return results, nil - } -} - func (store *WakuStore) Query(ctx context.Context, query Query, opts ...HistoryRequestOption) (*Result, error) { params := new(HistoryRequestParameters) params.s = store @@ -309,19 +265,43 @@ func (store *WakuStore) Query(ctx context.Context, query Query, opts ...HistoryR opt(params) } - if store.pm != nil && params.selectedPeer == "" { - var err error - params.selectedPeer, err = store.pm.SelectPeer( - peermanager.PeerSelectionCriteria{ - SelectionType: params.peerSelectionType, - Proto: StoreID_v20beta4, - PubsubTopic: query.PubsubTopic, - SpecificPeers: params.preferredPeers, - Ctx: ctx, - }, - ) - if err != nil { - return nil, err + if !params.localQuery { + pubsubTopics := []string{} + if query.PubsubTopic == "" { + for _, cTopic := range query.ContentTopics { + pubsubTopic, err := protocol.GetPubSubTopicFromContentTopic(cTopic) + if err != nil { + return nil, err + } + pubsubTopics = append(pubsubTopics, pubsubTopic) + } + } else { + pubsubTopics = append(pubsubTopics, query.PubsubTopic) + } + + //Add Peer to peerstore. + if store.pm != nil && params.peerAddr != nil { + peerId, err := store.pm.AddPeer(params.peerAddr, peerstore.Static, pubsubTopics, StoreID_v20beta4) + if err != nil { + return nil, err + } + params.selectedPeer = peerId + + } + if store.pm != nil && params.selectedPeer == "" { + var err error + params.selectedPeer, err = store.pm.SelectPeer( + peermanager.PeerSelectionCriteria{ + SelectionType: params.peerSelectionType, + Proto: StoreID_v20beta4, + PubsubTopics: pubsubTopics, + SpecificPeers: params.preferredPeers, + Ctx: ctx, + }, + ) + if err != nil { + return nil, err + } } } diff --git a/waku/v2/protocol/store/waku_store_protocol.go b/waku/v2/protocol/store/waku_store_protocol.go index 8fdbb59d..195fa7c6 100644 --- a/waku/v2/protocol/store/waku_store_protocol.go +++ b/waku/v2/protocol/store/waku_store_protocol.go @@ -84,7 +84,6 @@ type Store interface { SetHost(h host.Host) Start(context.Context, *relay.Subscription) error Query(ctx context.Context, query Query, opts ...HistoryRequestOption) (*Result, error) - QueryAutoSharding(ctx context.Context, query Query, opts ...HistoryRequestOption) ([]*Result, error) Find(ctx context.Context, query Query, cb criteriaFN, opts ...HistoryRequestOption) (*wpb.WakuMessage, error) Next(ctx context.Context, r *Result) (*Result, error) Resume(ctx context.Context, pubsubTopic string, peerList []peer.ID) (int, error)