diff --git a/cmd/waku/server/rest/store.go b/cmd/waku/server/rest/store.go index 9715ab42..9f01c0ae 100644 --- a/cmd/waku/server/rest/store.go +++ b/cmd/waku/server/rest/store.go @@ -12,7 +12,6 @@ import ( "github.com/go-chi/chi/v5" "github.com/multiformats/go-multiaddr" "github.com/waku-org/go-waku/waku/v2/node" - "github.com/waku-org/go-waku/waku/v2/peerstore" "github.com/waku-org/go-waku/waku/v2/protocol/store" "github.com/waku-org/go-waku/waku/v2/protocol/store/pb" "github.com/waku-org/go-waku/waku/v2/utils" @@ -60,20 +59,22 @@ func NewStoreService(node *node.WakuNode, m *chi.Mux) *StoreService { func getStoreParams(r *http.Request) (multiaddr.Multiaddr, *store.Query, []store.HistoryRequestOption, error) { query := &store.Query{} var options []store.HistoryRequestOption - + var err error peerAddrStr := r.URL.Query().Get("peerAddr") - m, err := multiaddr.NewMultiaddr(peerAddrStr) - if err != nil { - return nil, nil, nil, err + var m multiaddr.Multiaddr + if peerAddrStr != "" { + m, err = multiaddr.NewMultiaddr(peerAddrStr) + if err != nil { + return nil, nil, nil, err + } + + peerID, err := utils.GetPeerID(m) + if err != nil { + return nil, nil, nil, err + } + + options = append(options, store.WithPeer(peerID)) } - - peerID, err := utils.GetPeerID(m) - if err != nil { - return nil, nil, nil, err - } - - options = append(options, store.WithPeer(peerID)) - query.PubsubTopic = r.URL.Query().Get("pubsubTopic") contentTopics := r.URL.Query().Get("contentTopics") @@ -198,18 +199,25 @@ func (d *StoreService) getV1Messages(w http.ResponseWriter, r *http.Request) { ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second) defer cancel() - - _, err = d.node.AddPeer(peerAddr, peerstore.Static, d.node.Relay().Topics()) - if err != nil { - writeStoreError(w, http.StatusInternalServerError, err) - return + if peerAddr != nil { + options = append(options, store.WithPeerAddr(peerAddr)) } - - result, err := d.node.Store().Query(ctx, *query, options...) - if err != nil { - writeStoreError(w, http.StatusInternalServerError, err) - return + var results []*store.Result + var result *store.Result + if query.PubsubTopic == "" { + results, err = d.node.Store().QueryAutoSharding(ctx, *query, options...) + if err != nil { + writeStoreError(w, http.StatusInternalServerError, err) + return + } + } else { + result, err := d.node.Store().Query(ctx, *query, options...) + if err != nil { + writeStoreError(w, http.StatusInternalServerError, err) + return + } + results = append(results, result) } - + //TODO: How to respond with multiple query results?? writeErrOrResponse(w, nil, toStoreResponse(result)) } diff --git a/waku/v2/protocol/content_filter.go b/waku/v2/protocol/content_filter.go index 7ea7f2a5..f09cf52b 100644 --- a/waku/v2/protocol/content_filter.go +++ b/waku/v2/protocol/content_filter.go @@ -52,23 +52,5 @@ func (cf ContentFilter) Equals(cf1 ContentFilter) bool { // This function converts a contentFilter into a map of pubSubTopics and corresponding contentTopics func ContentFilterToPubSubTopicMap(contentFilter ContentFilter) (map[PubsubTopicStr][]ContentTopicStr, error) { - pubSubTopicMap := make(map[string][]string) - - if contentFilter.PubsubTopic != "" { - pubSubTopicMap[contentFilter.PubsubTopic] = contentFilter.ContentTopicsList() - } else { - //Parse the content-Topics to figure out shards. - for _, cTopicString := range contentFilter.ContentTopicsList() { - pTopicStr, err := GetPubSubTopicFromContentTopic(cTopicString) - if err != nil { - return nil, err - } - _, ok := pubSubTopicMap[pTopicStr] - if !ok { - pubSubTopicMap[pTopicStr] = []string{} - } - pubSubTopicMap[pTopicStr] = append(pubSubTopicMap[pTopicStr], cTopicString) - } - } - return pubSubTopicMap, nil + return GeneratePubsubToContentTopicMap(contentFilter.PubsubTopic, contentFilter.ContentTopicsList()) } diff --git a/waku/v2/protocol/shard.go b/waku/v2/protocol/shard.go index 3f4779d2..66ec5fdc 100644 --- a/waku/v2/protocol/shard.go +++ b/waku/v2/protocol/shard.go @@ -245,3 +245,26 @@ func GetPubSubTopicFromContentTopic(cTopicString string) (string, error) { return pTopic.String(), nil } + +func GeneratePubsubToContentTopicMap(pubsubTopic string, contentTopics []string) (map[string][]string, error) { + + pubSubTopicMap := make(map[string][]string, 0) + + if pubsubTopic == "" { + //Should we derive pubsub topic from contentTopic so that peer selection and discovery can be done accordingly? + for _, cTopic := range contentTopics { + pTopic, err := GetPubSubTopicFromContentTopic(cTopic) + if err != nil { + return nil, err + } + _, ok := pubSubTopicMap[pTopic] + if !ok { + pubSubTopicMap[pTopic] = []string{} + } + pubSubTopicMap[pTopic] = append(pubSubTopicMap[pTopic], cTopic) + } + } else { + pubSubTopicMap[pubsubTopic] = append(pubSubTopicMap[pubsubTopic], contentTopics...) + } + return pubSubTopicMap, nil +} diff --git a/waku/v2/protocol/store/waku_store_client.go b/waku/v2/protocol/store/waku_store_client.go index ff122291..03758e4b 100644 --- a/waku/v2/protocol/store/waku_store_client.go +++ b/waku/v2/protocol/store/waku_store_client.go @@ -4,14 +4,19 @@ import ( "context" "encoding/hex" "errors" + "fmt" "math" + "strings" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-msgio/pbio" + "github.com/multiformats/go-multiaddr" "go.uber.org/zap" + "golang.org/x/exp/maps" "github.com/waku-org/go-waku/logging" "github.com/waku-org/go-waku/waku/v2/peermanager" + "github.com/waku-org/go-waku/waku/v2/peerstore" "github.com/waku-org/go-waku/waku/v2/protocol" wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/protocol/store/pb" @@ -82,6 +87,7 @@ type criteriaFN = func(msg *wpb.WakuMessage) (bool, error) type HistoryRequestParameters struct { selectedPeer peer.ID + peerAddr multiaddr.Multiaddr peerSelectionType peermanager.PeerSelection preferredPeers peer.IDSlice localQuery bool @@ -102,6 +108,12 @@ func WithPeer(p peer.ID) HistoryRequestOption { } } +func WithPeerAddr(pAddr multiaddr.Multiaddr) HistoryRequestOption { + return func(params *HistoryRequestParameters) { + params.peerAddr = pAddr + } +} + // WithAutomaticPeerSelection is an option used to randomly select a peer from the peer store // to request the message history. If a list of specific peers is passed, the peer will be chosen // from that list assuming it supports the chosen protocol, otherwise it will chose a peer @@ -246,6 +258,47 @@ func (store *WakuStore) localQuery(historyQuery *pb.HistoryRPC) (*pb.HistoryResp return historyResponseRPC.Response, nil } +func (store *WakuStore) QueryAutoSharding(ctx context.Context, query Query, opts ...HistoryRequestOption) ([]*Result, error) { + var results []*Result + var failedContentTopics []string + pubSubTopicMap, err := protocol.GeneratePubsubToContentTopicMap(query.PubsubTopic, query.ContentTopics) + if err != nil { + return nil, err + } + + //Duplicate processing of opts, which is done again in Query function below + // Not sure how to handle this, hence leaving it as of now + params := new(HistoryRequestParameters) + var optList []HistoryRequestOption + for _, opt := range optList { + opt(params) + } + + //Add Peer to peerstore. + if store.pm != nil && params.peerAddr != nil { + _, err = store.pm.AddPeer(params.peerAddr, peerstore.Static, maps.Keys(pubSubTopicMap), StoreID_v20beta4) + if err != nil { + return nil, err + } + } + + for pubSubTopic, cTopics := range pubSubTopicMap { + query.ContentTopics = cTopics + query.PubsubTopic = pubSubTopic + //Invoke Query separately + result, err := store.Query(ctx, query, opts...) + if err != nil { + failedContentTopics = append(failedContentTopics, cTopics...) + } + results = append(results, result) + } + if len(failedContentTopics) > 0 { + return results, fmt.Errorf("subscriptions failed for contentTopics: %s", strings.Join(failedContentTopics, ",")) + } else { + return results, nil + } +} + func (store *WakuStore) Query(ctx context.Context, query Query, opts ...HistoryRequestOption) (*Result, error) { params := new(HistoryRequestParameters) params.s = store @@ -256,10 +309,6 @@ func (store *WakuStore) Query(ctx context.Context, query Query, opts ...HistoryR opt(params) } - //if query.PubsubTopic == "" { - //Should we derive pubsub topic from contentTopic so that peer selection and discovery can be done accordingly? - //} - if store.pm != nil && params.selectedPeer == "" { var err error params.selectedPeer, err = store.pm.SelectPeer( diff --git a/waku/v2/protocol/store/waku_store_protocol.go b/waku/v2/protocol/store/waku_store_protocol.go index 195fa7c6..8fdbb59d 100644 --- a/waku/v2/protocol/store/waku_store_protocol.go +++ b/waku/v2/protocol/store/waku_store_protocol.go @@ -84,6 +84,7 @@ type Store interface { SetHost(h host.Host) Start(context.Context, *relay.Subscription) error Query(ctx context.Context, query Query, opts ...HistoryRequestOption) (*Result, error) + QueryAutoSharding(ctx context.Context, query Query, opts ...HistoryRequestOption) ([]*Result, error) Find(ctx context.Context, query Query, cb criteriaFN, opts ...HistoryRequestOption) (*wpb.WakuMessage, error) Next(ctx context.Context, r *Result) (*Result, error) Resume(ctx context.Context, pubsubTopic string, peerList []peer.ID) (int, error)