diff --git a/cmd/waku/server/rest/store.go b/cmd/waku/server/rest/store.go index 280669e2..d131219b 100644 --- a/cmd/waku/server/rest/store.go +++ b/cmd/waku/server/rest/store.go @@ -74,7 +74,7 @@ func getStoreParams(r *http.Request) (multiaddr.Multiaddr, *store.Query, []store options = append(options, store.WithPeer(peerID)) - query.Topic = r.URL.Query().Get("pubsubTopic") + query.PubsubTopic = r.URL.Query().Get("pubsubTopic") contentTopics := r.URL.Query().Get("contentTopics") if contentTopics != "" { @@ -127,7 +127,7 @@ func getStoreParams(r *http.Request) (multiaddr.Multiaddr, *store.Query, []store } } - cursor.PubsubTopic = query.Topic + cursor.PubsubTopic = query.PubsubTopic options = append(options, store.WithCursor(cursor)) } diff --git a/cmd/waku/server/rpc/store.go b/cmd/waku/server/rpc/store.go index 1a29399d..c6b80592 100644 --- a/cmd/waku/server/rpc/store.go +++ b/cmd/waku/server/rpc/store.go @@ -48,7 +48,7 @@ func (s *StoreService) GetV1Messages(req *http.Request, args *StoreMessagesArgs, res, err := s.node.Store().Query( req.Context(), store.Query{ - Topic: args.Topic, + PubsubTopic: args.Topic, ContentTopics: args.ContentFilters, StartTime: args.StartTime, EndTime: args.EndTime, diff --git a/library/store.go b/library/store.go index 837e1072..1f125d7c 100644 --- a/library/store.go +++ b/library/store.go @@ -39,7 +39,7 @@ func queryResponse(ctx context.Context, args storeMessagesArgs, options []store. res, err := wakuState.node.Store().Query( ctx, store.Query{ - Topic: args.Topic, + PubsubTopic: args.Topic, ContentTopics: args.ContentTopics, StartTime: args.StartTime, EndTime: args.EndTime, diff --git a/waku/v2/protocol/store/waku_store_client.go b/waku/v2/protocol/store/waku_store_client.go index d1f25e6a..aa60a23d 100644 --- a/waku/v2/protocol/store/waku_store_client.go +++ b/waku/v2/protocol/store/waku_store_client.go @@ -18,7 +18,7 @@ import ( ) type Query struct { - Topic string + PubsubTopic string ContentTopics []string StartTime int64 EndTime int64 @@ -256,13 +256,18 @@ func (store *WakuStore) Query(ctx context.Context, query Query, opts ...HistoryR for _, opt := range optList { opt(params) } + + //if query.PubsubTopic == "" { + //Should we derive pubsub topic from contentTopic so that peer selection and discovery can be done accordingly? + //} + if store.pm != nil && params.selectedPeer == "" { var err error params.selectedPeer, err = store.pm.SelectPeer( peermanager.PeerSelectionCriteria{ SelectionType: params.peerSelectionType, Proto: StoreID_v20beta4, - PubsubTopic: query.Topic, + PubsubTopic: query.PubsubTopic, SpecificPeers: params.preferredPeers, Ctx: ctx, }, @@ -275,7 +280,7 @@ func (store *WakuStore) Query(ctx context.Context, query Query, opts ...HistoryR historyRequest := &pb.HistoryRPC{ RequestId: hex.EncodeToString(params.requestID), Query: &pb.HistoryQuery{ - PubsubTopic: query.Topic, + PubsubTopic: query.PubsubTopic, ContentFilters: []*pb.ContentFilter{}, StartTime: query.StartTime, EndTime: query.EndTime, diff --git a/waku/v2/protocol/store/waku_store_protocol_test.go b/waku/v2/protocol/store/waku_store_protocol_test.go index 9929f6e9..b4a5325d 100644 --- a/waku/v2/protocol/store/waku_store_protocol_test.go +++ b/waku/v2/protocol/store/waku_store_protocol_test.go @@ -71,7 +71,7 @@ func TestWakuStoreProtocolQuery(t *testing.T) { require.NoError(t, err) q := Query{ - Topic: pubsubTopic1, + PubsubTopic: pubsubTopic1, ContentTopics: []string{topic1}, } @@ -111,7 +111,7 @@ func TestWakuStoreProtocolLocalQuery(t *testing.T) { time.Sleep(100 * time.Millisecond) q := Query{ - Topic: pubsubTopic1, + PubsubTopic: pubsubTopic1, ContentTopics: []string{topic1}, } response, err := s1.Query(ctx, q, WithLocalQuery()) @@ -168,7 +168,7 @@ func TestWakuStoreProtocolNext(t *testing.T) { defer s2.Stop() q := Query{ - Topic: pubsubTopic1, + PubsubTopic: pubsubTopic1, ContentTopics: []string{topic1}, } @@ -245,7 +245,7 @@ func TestWakuStoreResult(t *testing.T) { defer s2.Stop() q := Query{ - Topic: pubsubTopic1, + PubsubTopic: pubsubTopic1, ContentTopics: []string{topic1}, } @@ -347,7 +347,7 @@ func TestWakuStoreProtocolFind(t *testing.T) { defer s2.Stop() q := Query{ - Topic: pubsubTopic1, + PubsubTopic: pubsubTopic1, } fn := func(msg *pb.WakuMessage) (bool, error) {