diff --git a/waku/v2/api/common/storenode_requestor.go b/waku/v2/api/common/storenode_requestor.go new file mode 100644 index 00000000..8a723c9e --- /dev/null +++ b/waku/v2/api/common/storenode_requestor.go @@ -0,0 +1,12 @@ +package common + +import ( + "context" + + "github.com/libp2p/go-libp2p/core/peer" + "github.com/waku-org/go-waku/waku/v2/protocol/store/pb" +) + +type StorenodeRequestor interface { + Query(ctx context.Context, peerID peer.ID, query *pb.StoreQueryRequest) (StoreRequestResult, error) +} diff --git a/waku/v2/api/history/cycle.go b/waku/v2/api/history/cycle.go index b6571dfc..9404579b 100644 --- a/waku/v2/api/history/cycle.go +++ b/waku/v2/api/history/cycle.go @@ -393,14 +393,10 @@ func (m *StorenodeCycle) SetStorenodeConfigProvider(provider StorenodeConfigProv m.storenodeConfigProvider = provider } -func (m *StorenodeCycle) WaitForAvailableStoreNode(ctx context.Context, timeout time.Duration) bool { - // Add 1 second to timeout, because the storenode cycle has 1 second ticker, which doesn't tick on start. +func (m *StorenodeCycle) WaitForAvailableStoreNode(ctx context.Context) bool { + // Note: Add 1 second to timeout, because the storenode cycle has 1 second ticker, which doesn't tick on start. // This can be improved after merging https://github.com/status-im/status-go/pull/4380. // NOTE: https://stackoverflow.com/questions/32705582/how-to-get-time-tick-to-tick-immediately - timeout += time.Second - - ctx, cancel := context.WithTimeout(ctx, timeout) - defer cancel() wg := sync.WaitGroup{} wg.Add(1) @@ -410,7 +406,18 @@ func (m *StorenodeCycle) WaitForAvailableStoreNode(ctx context.Context, timeout select { case <-m.StorenodeAvailableOneshotEmitter.Subscribe(): case <-ctx.Done(): + if errors.Is(ctx.Err(), context.Canceled) { + return + } + + // Wait for an additional second, but handle cancellation + select { + case <-time.After(1 * time.Second): + case <-ctx.Done(): // context was cancelled + } + return + } } }() @@ -418,6 +425,11 @@ func (m *StorenodeCycle) WaitForAvailableStoreNode(ctx context.Context, timeout select { case <-waitForWaitGroup(&wg): case <-ctx.Done(): + // Wait for an additional second, but handle cancellation + select { + case <-time.After(1 * time.Second): + case <-ctx.Done(): // context was cancelled o + } } return m.IsStorenodeAvailable(m.activeStorenode) diff --git a/waku/v2/api/history/history.go b/waku/v2/api/history/history.go index e95f01a5..c31a79da 100644 --- a/waku/v2/api/history/history.go +++ b/waku/v2/api/history/history.go @@ -2,6 +2,7 @@ package history import ( "context" + "encoding/hex" "errors" "math" "sync" @@ -10,8 +11,12 @@ import ( "github.com/ethereum/go-ethereum/common/hexutil" "github.com/libp2p/go-libp2p/core/peer" "github.com/waku-org/go-waku/logging" + "github.com/waku-org/go-waku/waku/v2/api/common" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/store" + "github.com/waku-org/go-waku/waku/v2/protocol/store/pb" + "google.golang.org/protobuf/proto" + "go.uber.org/zap" ) @@ -25,7 +30,7 @@ type work struct { } type HistoryRetriever struct { - store Store + store common.StorenodeRequestor logger *zap.Logger historyProcessor HistoryProcessor } @@ -35,11 +40,7 @@ type HistoryProcessor interface { OnRequestFailed(requestID []byte, peerID peer.ID, err error) } -type Store interface { - Query(ctx context.Context, criteria store.FilterCriteria, opts ...store.RequestOption) (store.Result, error) -} - -func NewHistoryRetriever(store Store, historyProcessor HistoryProcessor, logger *zap.Logger) *HistoryRetriever { +func NewHistoryRetriever(store common.StorenodeRequestor, historyProcessor HistoryProcessor, logger *zap.Logger) *HistoryRetriever { return &HistoryRetriever{ store: store, logger: logger.Named("history-retriever"), @@ -257,12 +258,6 @@ func (hr *HistoryRetriever) requestStoreMessages(ctx context.Context, peerID pee requestID := protocol.GenerateRequestID() logger := hr.logger.With(zap.String("requestID", hexutil.Encode(requestID)), zap.Stringer("peerID", peerID)) - opts := []store.RequestOption{ - store.WithPaging(false, limit), - store.WithRequestID(requestID), - store.WithPeer(peerID), - store.WithCursor(cursor)} - logger.Debug("store.query", logging.Timep("startTime", criteria.TimeStart), logging.Timep("endTime", criteria.TimeEnd), @@ -271,8 +266,19 @@ func (hr *HistoryRetriever) requestStoreMessages(ctx context.Context, peerID pee zap.String("cursor", hexutil.Encode(cursor)), ) + storeQueryRequest := &pb.StoreQueryRequest{ + RequestId: hex.EncodeToString(requestID), + IncludeData: true, + PubsubTopic: &criteria.PubsubTopic, + ContentTopics: criteria.ContentTopicsList(), + TimeStart: criteria.TimeStart, + TimeEnd: criteria.TimeEnd, + PaginationCursor: cursor, + PaginationLimit: proto.Uint64(limit), + } + queryStart := time.Now() - result, err := hr.store.Query(ctx, criteria, opts...) + result, err := hr.store.Query(ctx, peerID, storeQueryRequest) queryDuration := time.Since(queryStart) if err != nil { logger.Error("error querying storenode", zap.Error(err)) diff --git a/waku/v2/api/history/history_test.go b/waku/v2/api/history/history_test.go index d256850a..460b25d2 100644 --- a/waku/v2/api/history/history_test.go +++ b/waku/v2/api/history/history_test.go @@ -13,6 +13,7 @@ import ( "github.com/google/uuid" "github.com/libp2p/go-libp2p/core/peer" "github.com/stretchr/testify/require" + "github.com/waku-org/go-waku/waku/v2/api/common" "github.com/waku-org/go-waku/waku/v2/protocol" proto_pb "github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/protocol/store" @@ -91,14 +92,10 @@ func getInitialResponseKey(contentTopics []string) string { return hex.EncodeToString(append([]byte("start"), []byte(contentTopics[0])...)) } -func (t *mockStore) Query(ctx context.Context, criteria store.FilterCriteria, opts ...store.RequestOption) (store.Result, error) { - params := store.Parameters{} - for _, opt := range opts { - _ = opt(¶ms) - } +func (t *mockStore) Query(ctx context.Context, peerID peer.ID, storeQueryRequest *pb.StoreQueryRequest) (common.StoreRequestResult, error) { result := &mockResult{} - if params.Cursor() == nil { - initialResponse := getInitialResponseKey(criteria.ContentTopicsList()) + if len(storeQueryRequest.GetPaginationCursor()) == 0 { + initialResponse := getInitialResponseKey(storeQueryRequest.GetContentTopics()) response := t.queryResponses[initialResponse] if response.err != nil { return nil, response.err @@ -106,7 +103,7 @@ func (t *mockStore) Query(ctx context.Context, criteria store.FilterCriteria, op result.cursor = response.cursor result.messages = response.messages } else { - response := t.queryResponses[hex.EncodeToString(params.Cursor())] + response := t.queryResponses[hex.EncodeToString(storeQueryRequest.GetPaginationCursor())] if response.err != nil { return nil, response.err } diff --git a/waku/v2/api/missing/default_requestor.go b/waku/v2/api/missing/default_requestor.go index 248d61c6..38282173 100644 --- a/waku/v2/api/missing/default_requestor.go +++ b/waku/v2/api/missing/default_requestor.go @@ -5,12 +5,12 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/waku-org/go-waku/waku/v2/api/common" - "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/protocol/store" + storepb "github.com/waku-org/go-waku/waku/v2/protocol/store/pb" ) -func NewDefaultStorenodeRequestor(store *store.WakuStore) StorenodeRequestor { +func NewDefaultStorenodeRequestor(store *store.WakuStore) common.StorenodeRequestor { return &defaultStorenodeRequestor{ store: store, } @@ -24,10 +24,6 @@ func (d *defaultStorenodeRequestor) GetMessagesByHash(ctx context.Context, peerI return d.store.QueryByHash(ctx, messageHashes, store.WithPeer(peerID), store.WithPaging(false, pageSize)) } -func (d *defaultStorenodeRequestor) QueryWithCriteria(ctx context.Context, peerID peer.ID, pageSize uint64, pubsubTopic string, contentTopics []string, from *int64, to *int64) (common.StoreRequestResult, error) { - return d.store.Query(ctx, store.FilterCriteria{ - ContentFilter: protocol.NewContentFilter(pubsubTopic, contentTopics...), - TimeStart: from, - TimeEnd: to, - }, store.WithPeer(peerID), store.WithPaging(false, pageSize), store.IncludeData(false)) +func (d *defaultStorenodeRequestor) Query(ctx context.Context, peerID peer.ID, storeQueryRequest *storepb.StoreQueryRequest) (common.StoreRequestResult, error) { + return d.store.RequestRaw(ctx, peerID, storeQueryRequest) } diff --git a/waku/v2/api/missing/missing_messages.go b/waku/v2/api/missing/missing_messages.go index 1af991eb..72ac4f9f 100644 --- a/waku/v2/api/missing/missing_messages.go +++ b/waku/v2/api/missing/missing_messages.go @@ -14,6 +14,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/api/common" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/pb" + storepb "github.com/waku-org/go-waku/waku/v2/protocol/store/pb" "github.com/waku-org/go-waku/waku/v2/timesource" "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" @@ -31,17 +32,12 @@ type MessageTracker interface { MessageExists(pb.MessageHash) (bool, error) } -type StorenodeRequestor interface { - GetMessagesByHash(ctx context.Context, peerID peer.ID, pageSize uint64, messageHashes []pb.MessageHash) (common.StoreRequestResult, error) - QueryWithCriteria(ctx context.Context, peerID peer.ID, pageSize uint64, pubsubTopic string, contentTopics []string, from *int64, to *int64) (common.StoreRequestResult, error) -} - // MissingMessageVerifier is used to periodically retrieve missing messages from store nodes that have some specific criteria type MissingMessageVerifier struct { ctx context.Context params missingMessageVerifierParams - storenodeRequestor StorenodeRequestor + storenodeRequestor common.StorenodeRequestor messageTracker MessageTracker criteriaInterest map[string]criteriaInterest // Track message verification requests and when was the last time a pubsub topic was verified for missing messages @@ -54,7 +50,7 @@ type MissingMessageVerifier struct { } // NewMissingMessageVerifier creates an instance of a MissingMessageVerifier -func NewMissingMessageVerifier(storenodeRequester StorenodeRequestor, messageTracker MessageTracker, timesource timesource.Timesource, logger *zap.Logger, options ...MissingMessageVerifierOption) *MissingMessageVerifier { +func NewMissingMessageVerifier(storenodeRequester common.StorenodeRequestor, messageTracker MessageTracker, timesource timesource.Timesource, logger *zap.Logger, options ...MissingMessageVerifierOption) *MissingMessageVerifier { options = append(defaultMissingMessagesVerifierOptions, options...) params := missingMessageVerifierParams{} for _, opt := range options { @@ -219,14 +215,19 @@ func (m *MissingMessageVerifier) fetchMessagesBatch(c chan<- *protocol.Envelope, ) result, err := m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (common.StoreRequestResult, error) { - return m.storenodeRequestor.QueryWithCriteria( + storeQueryRequest := &storepb.StoreQueryRequest{ + RequestId: hex.EncodeToString(protocol.GenerateRequestID()), + PubsubTopic: &interest.contentFilter.PubsubTopic, + ContentTopics: contentTopics[batchFrom:batchTo], + TimeStart: proto.Int64(interest.lastChecked.Add(-m.params.delay).UnixNano()), + TimeEnd: proto.Int64(now.Add(-m.params.delay).UnixNano()), + PaginationLimit: proto.Uint64(messageFetchPageSize), + } + + return m.storenodeRequestor.Query( ctx, interest.peerID, - messageFetchPageSize, - interest.contentFilter.PubsubTopic, - contentTopics[batchFrom:batchTo], - proto.Int64(interest.lastChecked.Add(-m.params.delay).UnixNano()), - proto.Int64(now.Add(-m.params.delay).UnixNano()), + storeQueryRequest, ) }, logger, "retrieving history to check for missing messages") if err != nil { @@ -295,7 +296,20 @@ func (m *MissingMessageVerifier) fetchMessagesBatch(c chan<- *protocol.Envelope, result, err := m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (common.StoreRequestResult, error) { queryCtx, cancel := context.WithTimeout(ctx, m.params.storeQueryTimeout) defer cancel() - return m.storenodeRequestor.GetMessagesByHash(queryCtx, interest.peerID, maxMsgHashesPerRequest, messageHashes) + + var messageHashesBytes [][]byte + for _, m := range messageHashes { + messageHashesBytes = append(messageHashesBytes, m.Bytes()) + } + + storeQueryRequest := &storepb.StoreQueryRequest{ + RequestId: hex.EncodeToString(protocol.GenerateRequestID()), + IncludeData: true, + MessageHashes: messageHashesBytes, + PaginationLimit: proto.Uint64(maxMsgHashesPerRequest), + } + + return m.storenodeRequestor.Query(queryCtx, interest.peerID, storeQueryRequest) }, logger, "retrieving missing messages") if err != nil { if !errors.Is(err, context.Canceled) { diff --git a/waku/v2/protocol/store/client.go b/waku/v2/protocol/store/client.go index 6b3c9b2e..febb863e 100644 --- a/waku/v2/protocol/store/client.go +++ b/waku/v2/protocol/store/client.go @@ -194,6 +194,35 @@ func (s *WakuStore) Request(ctx context.Context, criteria Criteria, opts ...Requ return result, nil } +func (s *WakuStore) RequestRaw(ctx context.Context, peerID peer.ID, storeRequest *pb.StoreQueryRequest) (Result, error) { + err := storeRequest.Validate() + if err != nil { + return nil, err + } + + var params Parameters + params.selectedPeer = peerID + if params.selectedPeer == "" { + return nil, ErrMustSelectPeer + } + + response, err := s.queryFrom(ctx, storeRequest, ¶ms) + if err != nil { + return nil, err + } + + result := &resultImpl{ + store: s, + messages: response.Messages, + storeRequest: storeRequest, + storeResponse: response, + peerID: params.selectedPeer, + cursor: response.PaginationCursor, + } + + return result, nil +} + // Query retrieves all the messages that match a criteria. Use the options to indicate whether to return the message themselves or not. func (s *WakuStore) Query(ctx context.Context, criteria FilterCriteria, opts ...RequestOption) (Result, error) { return s.Request(ctx, criteria, opts...)