diff --git a/waku/v2/api/history/cycle.go b/waku/v2/api/history/cycle.go index e09b2530..6666dcec 100644 --- a/waku/v2/api/history/cycle.go +++ b/waku/v2/api/history/cycle.go @@ -182,7 +182,6 @@ func poolSize(fleetSize int) int { } func (m *StorenodeCycle) getAvailableStorenodesSortedByRTT(ctx context.Context, allStorenodes []peer.ID) []peer.ID { - // TODO: this can be replaced by peer selector once code is moved to go-waku api availableStorenodes := make(map[peer.ID]time.Duration) availableStorenodesMutex := sync.Mutex{} availableStorenodesWg := sync.WaitGroup{} diff --git a/waku/v2/api/history/emitters.go b/waku/v2/api/history/emitters.go index 2aee5476..2d055954 100644 --- a/waku/v2/api/history/emitters.go +++ b/waku/v2/api/history/emitters.go @@ -26,7 +26,6 @@ func (s *Emitter[T]) Emit(value T) { for _, subs := range s.subscriptions { subs <- value } - s.subscriptions = nil } type OneShotEmitter[T any] struct { diff --git a/waku/v2/api/history/emitters_test.go b/waku/v2/api/history/emitters_test.go new file mode 100644 index 00000000..a90eda3b --- /dev/null +++ b/waku/v2/api/history/emitters_test.go @@ -0,0 +1,67 @@ +package history + +import ( + "sync" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestEmitter(t *testing.T) { + emitter := NewEmitter[int]() + + subscr1 := emitter.Subscribe() + subscr2 := emitter.Subscribe() + + wg := sync.WaitGroup{} + wg.Add(3) + + go func() { + defer wg.Done() + emitter.Emit(1) + emitter.Emit(2) + }() + + go func() { + defer wg.Done() + require.Equal(t, 1, <-subscr1) + require.Equal(t, 2, <-subscr1) + }() + + go func() { + defer wg.Done() + require.Equal(t, 1, <-subscr2) + require.Equal(t, 2, <-subscr2) + }() + + wg.Wait() +} + +func TestOneShotEmitter(t *testing.T) { + emitter := NewOneshotEmitter[struct{}]() + + subscr1 := emitter.Subscribe() + subscr2 := emitter.Subscribe() + + wg := sync.WaitGroup{} + wg.Add(3) + + go func() { + defer wg.Done() + emitter.Emit(struct{}{}) + }() + + go func() { + defer wg.Done() + for range subscr1 { + } + }() + + go func() { + defer wg.Done() + for range subscr2 { + } + }() + + wg.Wait() +} diff --git a/waku/v2/api/history/history.go b/waku/v2/api/history/history.go index 3db71a50..d37ddd48 100644 --- a/waku/v2/api/history/history.go +++ b/waku/v2/api/history/history.go @@ -25,7 +25,7 @@ type work struct { } type HistoryRetriever struct { - store *store.WakuStore + store Store logger *zap.Logger historyProcessor HistoryProcessor } @@ -35,7 +35,11 @@ type HistoryProcessor interface { OnRequestFailed(requestID []byte, peerID peer.ID, err error) } -func NewHistoryRetriever(store *store.WakuStore, historyProcessor HistoryProcessor, logger *zap.Logger) *HistoryRetriever { +type Store interface { + Query(ctx context.Context, criteria store.FilterCriteria, opts ...store.RequestOption) (store.Result, error) +} + +func NewHistoryRetriever(store Store, historyProcessor HistoryProcessor, logger *zap.Logger) *HistoryRetriever { return &HistoryRetriever{ store: store, logger: logger.Named("history-retriever"), diff --git a/waku/v2/api/history/history_test.go b/waku/v2/api/history/history_test.go new file mode 100644 index 00000000..d256850a --- /dev/null +++ b/waku/v2/api/history/history_test.go @@ -0,0 +1,254 @@ +package history + +import ( + "context" + "crypto/rand" + "encoding/hex" + "errors" + "math/big" + "sort" + "testing" + "time" + + "github.com/google/uuid" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/stretchr/testify/require" + "github.com/waku-org/go-waku/waku/v2/protocol" + proto_pb "github.com/waku-org/go-waku/waku/v2/protocol/pb" + "github.com/waku-org/go-waku/waku/v2/protocol/store" + "github.com/waku-org/go-waku/waku/v2/protocol/store/pb" + "google.golang.org/protobuf/proto" + + "github.com/waku-org/go-waku/waku/v2/utils" +) + +type queryResponse struct { + contentTopics []string + messages []*pb.WakuMessageKeyValue + err error // Indicates if this response will simulate an error returned by SendMessagesRequestForTopics + cursor []byte +} + +type mockResult struct { + cursor []byte + messages []*pb.WakuMessageKeyValue +} + +func (r *mockResult) Cursor() []byte { + return r.cursor +} + +func (r *mockResult) Messages() []*pb.WakuMessageKeyValue { + return r.messages +} + +func (r *mockResult) IsComplete() bool { + return false +} + +func (r *mockResult) PeerID() peer.ID { + return "" +} + +func (r *mockResult) Query() *pb.StoreQueryRequest { + return nil +} + +func (r *mockResult) Response() *pb.StoreQueryResponse { + return nil +} + +func (r *mockResult) Next(ctx context.Context, opts ...store.RequestOption) error { + return nil +} + +type mockHistoryProcessor struct { +} + +func (h *mockHistoryProcessor) OnEnvelope(env *protocol.Envelope, processEnvelopes bool) error { + return nil +} + +func (h *mockHistoryProcessor) OnRequestFailed(requestID []byte, peerID peer.ID, err error) { +} + +func newMockHistoryProcessor() *mockHistoryProcessor { + return &mockHistoryProcessor{} +} + +type mockStore struct { + queryResponses map[string]queryResponse +} + +func newMockStore() *mockStore { + return &mockStore{ + queryResponses: make(map[string]queryResponse), + } +} + +func getInitialResponseKey(contentTopics []string) string { + sort.Strings(contentTopics) + return hex.EncodeToString(append([]byte("start"), []byte(contentTopics[0])...)) +} + +func (t *mockStore) Query(ctx context.Context, criteria store.FilterCriteria, opts ...store.RequestOption) (store.Result, error) { + params := store.Parameters{} + for _, opt := range opts { + _ = opt(¶ms) + } + result := &mockResult{} + if params.Cursor() == nil { + initialResponse := getInitialResponseKey(criteria.ContentTopicsList()) + response := t.queryResponses[initialResponse] + if response.err != nil { + return nil, response.err + } + result.cursor = response.cursor + result.messages = response.messages + } else { + response := t.queryResponses[hex.EncodeToString(params.Cursor())] + if response.err != nil { + return nil, response.err + } + result.cursor = response.cursor + result.messages = response.messages + } + + return result, nil +} + +func (t *mockStore) Populate(topics []string, responses int, includeRandomError bool) error { + if responses <= 0 || len(topics) == 0 { + return errors.New("invalid input parameters") + } + + var topicBatches [][]string + + for i := 0; i < len(topics); i += maxTopicsPerRequest { + // Split batch in 10-contentTopic subbatches + j := i + maxTopicsPerRequest + if j > len(topics) { + j = len(topics) + } + topicBatches = append(topicBatches, topics[i:j]) + } + + randomErrIdx, err := rand.Int(rand.Reader, big.NewInt(int64(len(topicBatches)))) + if err != nil { + return err + } + randomErrIdxInt := int(randomErrIdx.Int64()) + + for i, topicBatch := range topicBatches { + // Setup initial response + initialResponseKey := getInitialResponseKey(topicBatch) + t.queryResponses[initialResponseKey] = queryResponse{ + contentTopics: topicBatch, + messages: []*pb.WakuMessageKeyValue{ + { + MessageHash: protocol.GenerateRequestID(), + Message: &proto_pb.WakuMessage{ + Payload: []byte{1, 2, 3}, + ContentTopic: "abc", + Timestamp: proto.Int64(time.Now().UnixNano()), + }, + PubsubTopic: proto.String("test"), + }, + }, + err: nil, + } + + prevKey := initialResponseKey + for x := 0; x < responses-1; x++ { + newResponseCursor := []byte(uuid.New().String()) + newResponseKey := hex.EncodeToString(newResponseCursor) + + var err error + if includeRandomError && i == randomErrIdxInt && x == responses-2 { // Include an error in last request + err = errors.New("random error") + } + + t.queryResponses[newResponseKey] = queryResponse{ + contentTopics: topicBatch, + messages: []*pb.WakuMessageKeyValue{ + { + MessageHash: protocol.GenerateRequestID(), + Message: &proto_pb.WakuMessage{ + Payload: []byte{1, 2, 3}, + ContentTopic: "abc", + Timestamp: proto.Int64(time.Now().UnixNano()), + }, + PubsubTopic: proto.String("test"), + }, + }, + err: err, + } + + // Updating prev response cursor to point to the new response + prevResponse := t.queryResponses[prevKey] + prevResponse.cursor = newResponseCursor + t.queryResponses[prevKey] = prevResponse + + prevKey = newResponseKey + } + + } + + return nil +} + +func TestSuccessBatchExecution(t *testing.T) { + ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second) + defer cancel() + + storenodeID, err := peer.Decode("16Uiu2HAkw3x97MbbZSWHbdF5bob45vcZvPPK4s4Mjyv2mxyB9GS3") + require.NoError(t, err) + + topics := []string{} + for i := 0; i < 50; i++ { + topics = append(topics, uuid.NewString()) + } + + testStore := newMockStore() + err = testStore.Populate(topics, 10, false) + require.NoError(t, err) + + historyProcessor := newMockHistoryProcessor() + + historyRetriever := NewHistoryRetriever(testStore, historyProcessor, utils.Logger()) + + criteria := store.FilterCriteria{ + ContentFilter: protocol.NewContentFilter("test", topics...), + } + + err = historyRetriever.Query(ctx, criteria, storenodeID, 10, func(i int) (bool, uint64) { return true, 10 }, true) + require.NoError(t, err) +} + +func TestFailedBatchExecution(t *testing.T) { + ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second) + defer cancel() + + storenodeID, err := peer.Decode("16Uiu2HAkw3x97MbbZSWHbdF5bob45vcZvPPK4s4Mjyv2mxyB9GS3") + require.NoError(t, err) + + topics := []string{} + for i := 0; i < 2; i++ { + topics = append(topics, uuid.NewString()) + } + + testStore := newMockStore() + err = testStore.Populate(topics, 10, true) + require.NoError(t, err) + + historyProcessor := newMockHistoryProcessor() + + historyRetriever := NewHistoryRetriever(testStore, historyProcessor, utils.Logger()) + + criteria := store.FilterCriteria{ + ContentFilter: protocol.NewContentFilter("test", topics...), + } + + err = historyRetriever.Query(ctx, criteria, storenodeID, 10, func(i int) (bool, uint64) { return true, 10 }, true) + require.Error(t, err) +} diff --git a/waku/v2/api/missing/missing_messages.go b/waku/v2/api/missing/missing_messages.go index ca8b63fb..095e3241 100644 --- a/waku/v2/api/missing/missing_messages.go +++ b/waku/v2/api/missing/missing_messages.go @@ -178,7 +178,7 @@ func (m *MissingMessageVerifier) fetchHistory(c chan<- *protocol.Envelope, inter } } -func (m *MissingMessageVerifier) storeQueryWithRetry(ctx context.Context, queryFunc func(ctx context.Context) (*store.Result, error), logger *zap.Logger, logMsg string) (*store.Result, error) { +func (m *MissingMessageVerifier) storeQueryWithRetry(ctx context.Context, queryFunc func(ctx context.Context) (store.Result, error), logger *zap.Logger, logMsg string) (store.Result, error) { retry := true count := 1 for retry && count <= m.params.maxAttemptsToRetrieveHistory { @@ -212,7 +212,7 @@ func (m *MissingMessageVerifier) fetchMessagesBatch(c chan<- *protocol.Envelope, logging.Epoch("to", now), ) - result, err := m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (*store.Result, error) { + result, err := m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (store.Result, error) { return m.store.Query(ctx, store.FilterCriteria{ ContentFilter: protocol.NewContentFilter(interest.contentFilter.PubsubTopic, contentTopics[batchFrom:batchTo]...), TimeStart: proto.Int64(interest.lastChecked.Add(-m.params.delay).UnixNano()), @@ -243,7 +243,7 @@ func (m *MissingMessageVerifier) fetchMessagesBatch(c chan<- *protocol.Envelope, missingHashes = append(missingHashes, hash) } - result, err = m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (*store.Result, error) { + result, err = m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (store.Result, error) { if err = result.Next(ctx); err != nil { return nil, err } @@ -282,7 +282,7 @@ func (m *MissingMessageVerifier) fetchMessagesBatch(c chan<- *protocol.Envelope, defer utils.LogOnPanic() defer wg.Wait() - result, err := m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (*store.Result, error) { + result, err := m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (store.Result, error) { queryCtx, cancel := context.WithTimeout(ctx, m.params.storeQueryTimeout) defer cancel() return m.store.QueryByHash(queryCtx, messageHashes, store.WithPeer(interest.peerID), store.WithPaging(false, maxMsgHashesPerRequest)) @@ -303,7 +303,7 @@ func (m *MissingMessageVerifier) fetchMessagesBatch(c chan<- *protocol.Envelope, } } - result, err = m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (*store.Result, error) { + result, err = m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (store.Result, error) { if err = result.Next(ctx); err != nil { return nil, err } diff --git a/waku/v2/protocol/store/client.go b/waku/v2/protocol/store/client.go index adee966e..090ef8f0 100644 --- a/waku/v2/protocol/store/client.go +++ b/waku/v2/protocol/store/client.go @@ -99,7 +99,7 @@ func (s *WakuStore) SetHost(h host.Host) { // Request is used to send a store query. This function requires understanding how to prepare a store query // and most of the time you can use `Query`, `QueryByHash` and `Exists` instead, as they provide // a simpler API -func (s *WakuStore) Request(ctx context.Context, criteria Criteria, opts ...RequestOption) (*Result, error) { +func (s *WakuStore) Request(ctx context.Context, criteria Criteria, opts ...RequestOption) (Result, error) { params := new(Parameters) optList := DefaultOptions() @@ -182,7 +182,7 @@ func (s *WakuStore) Request(ctx context.Context, criteria Criteria, opts ...Requ return nil, err } - result := &Result{ + result := &resultImpl{ store: s, messages: response.Messages, storeRequest: storeRequest, @@ -195,12 +195,12 @@ func (s *WakuStore) Request(ctx context.Context, criteria Criteria, opts ...Requ } // Query retrieves all the messages that match a criteria. Use the options to indicate whether to return the message themselves or not. -func (s *WakuStore) Query(ctx context.Context, criteria FilterCriteria, opts ...RequestOption) (*Result, error) { +func (s *WakuStore) Query(ctx context.Context, criteria FilterCriteria, opts ...RequestOption) (Result, error) { return s.Request(ctx, criteria, opts...) } // Query retrieves all the messages with specific message hashes -func (s *WakuStore) QueryByHash(ctx context.Context, messageHashes []wpb.MessageHash, opts ...RequestOption) (*Result, error) { +func (s *WakuStore) QueryByHash(ctx context.Context, messageHashes []wpb.MessageHash, opts ...RequestOption) (Result, error) { return s.Request(ctx, MessageHashCriteria{messageHashes}, opts...) } @@ -214,17 +214,17 @@ func (s *WakuStore) Exists(ctx context.Context, messageHash wpb.MessageHash, opt return false, err } - return len(result.messages) != 0, nil + return len(result.Messages()) != 0, nil } -func (s *WakuStore) next(ctx context.Context, r *Result, opts ...RequestOption) (*Result, error) { +func (s *WakuStore) next(ctx context.Context, r Result, opts ...RequestOption) (*resultImpl, error) { if r.IsComplete() { - return &Result{ + return &resultImpl{ store: s, messages: nil, cursor: nil, - storeRequest: r.storeRequest, - storeResponse: r.storeResponse, + storeRequest: r.Query(), + storeResponse: r.Response(), peerID: r.PeerID(), }, nil } @@ -240,7 +240,7 @@ func (s *WakuStore) next(ctx context.Context, r *Result, opts ...RequestOption) } } - storeRequest := proto.Clone(r.storeRequest).(*pb.StoreQueryRequest) + storeRequest := proto.Clone(r.Query()).(*pb.StoreQueryRequest) storeRequest.RequestId = hex.EncodeToString(protocol.GenerateRequestID()) storeRequest.PaginationCursor = r.Cursor() @@ -249,7 +249,7 @@ func (s *WakuStore) next(ctx context.Context, r *Result, opts ...RequestOption) return nil, err } - result := &Result{ + result := &resultImpl{ store: s, messages: response.Messages, storeRequest: storeRequest, diff --git a/waku/v2/protocol/store/options.go b/waku/v2/protocol/store/options.go index b8deba47..facb3f54 100644 --- a/waku/v2/protocol/store/options.go +++ b/waku/v2/protocol/store/options.go @@ -22,6 +22,10 @@ type Parameters struct { skipRatelimit bool } +func (p *Parameters) Cursor() []byte { + return p.cursor +} + type RequestOption func(*Parameters) error // WithPeer is an option used to specify the peerID to request the message history. diff --git a/waku/v2/protocol/store/result.go b/waku/v2/protocol/store/result.go index 604d6453..f5f70106 100644 --- a/waku/v2/protocol/store/result.go +++ b/waku/v2/protocol/store/result.go @@ -8,7 +8,17 @@ import ( ) // Result represents a valid response from a store node -type Result struct { +type Result interface { + Cursor() []byte + IsComplete() bool + PeerID() peer.ID + Query() *pb.StoreQueryRequest + Response() *pb.StoreQueryResponse + Next(ctx context.Context, opts ...RequestOption) error + Messages() []*pb.WakuMessageKeyValue +} + +type resultImpl struct { done bool messages []*pb.WakuMessageKeyValue @@ -19,27 +29,27 @@ type Result struct { peerID peer.ID } -func (r *Result) Cursor() []byte { +func (r *resultImpl) Cursor() []byte { return r.cursor } -func (r *Result) IsComplete() bool { +func (r *resultImpl) IsComplete() bool { return r.done } -func (r *Result) PeerID() peer.ID { +func (r *resultImpl) PeerID() peer.ID { return r.peerID } -func (r *Result) Query() *pb.StoreQueryRequest { +func (r *resultImpl) Query() *pb.StoreQueryRequest { return r.storeRequest } -func (r *Result) Response() *pb.StoreQueryResponse { +func (r *resultImpl) Response() *pb.StoreQueryResponse { return r.storeResponse } -func (r *Result) Next(ctx context.Context, opts ...RequestOption) error { +func (r *resultImpl) Next(ctx context.Context, opts ...RequestOption) error { if r.cursor == nil { r.done = true r.messages = nil @@ -57,6 +67,6 @@ func (r *Result) Next(ctx context.Context, opts ...RequestOption) error { return nil } -func (r *Result) Messages() []*pb.WakuMessageKeyValue { +func (r *resultImpl) Messages() []*pb.WakuMessageKeyValue { return r.messages }