chore: add tests and mocks

Richard Ramos 2024-09-30 17:32:42 -04:00
parent 0194a126e3
commit da402dc415
GPG Key ID: 1CE87DB518195760
9 changed files with 365 additions and 28 deletions

View File

@@ -182,7 +182,6 @@ func poolSize(fleetSize int) int {
}
func (m *StorenodeCycle) getAvailableStorenodesSortedByRTT(ctx context.Context, allStorenodes []peer.ID) []peer.ID {
// TODO: this can be replaced by peer selector once code is moved to go-waku api
availableStorenodes := make(map[peer.ID]time.Duration)
availableStorenodesMutex := sync.Mutex{}
availableStorenodesWg := sync.WaitGroup{}

View File

@@ -26,7 +26,6 @@ func (s *Emitter[T]) Emit(value T) {
for _, subs := range s.subscriptions {
subs <- value
}
s.subscriptions = nil
}
type OneShotEmitter[T any] struct {

View File

@@ -0,0 +1,67 @@
package history

import (
	"sync"
	"testing"

	"github.com/stretchr/testify/require"
)

func TestEmitter(t *testing.T) {
	emitter := NewEmitter[int]()

	subscr1 := emitter.Subscribe()
	subscr2 := emitter.Subscribe()

	wg := sync.WaitGroup{}
	wg.Add(3)

	go func() {
		defer wg.Done()
		emitter.Emit(1)
		emitter.Emit(2)
	}()

	go func() {
		defer wg.Done()
		require.Equal(t, 1, <-subscr1)
		require.Equal(t, 2, <-subscr1)
	}()

	go func() {
		defer wg.Done()
		require.Equal(t, 1, <-subscr2)
		require.Equal(t, 2, <-subscr2)
	}()

	wg.Wait()
}

func TestOneShotEmitter(t *testing.T) {
	emitter := NewOneshotEmitter[struct{}]()

	subscr1 := emitter.Subscribe()
	subscr2 := emitter.Subscribe()

	wg := sync.WaitGroup{}
	wg.Add(3)

	go func() {
		defer wg.Done()
		emitter.Emit(struct{}{})
	}()

	go func() {
		defer wg.Done()
		for range subscr1 {
		}
	}()

	go func() {
		defer wg.Done()
		for range subscr2 {
		}
	}()

	wg.Wait()
}
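These tests depend on the emitter keeping its subscriber channels alive across calls, which is exactly what removing `s.subscriptions = nil` from Emit (previous hunk) guarantees. For context, a minimal sketch of the generic emitter shape the tests exercise; the constructor and method names appear in the diff, everything else (the mutex, channel plumbing) is an assumption rather than the committed implementation:

package history

import "sync"

// Emitter fans every emitted value out to all current subscribers.
// Sketch only, inferred from the Emit hunk and the tests above.
type Emitter[T any] struct {
	mu            sync.Mutex
	subscriptions []chan T
}

func NewEmitter[T any]() *Emitter[T] {
	return &Emitter[T]{}
}

// Subscribe registers an unbuffered channel that will receive every value
// passed to Emit from this point on.
func (s *Emitter[T]) Subscribe() chan T {
	s.mu.Lock()
	defer s.mu.Unlock()
	ch := make(chan T)
	s.subscriptions = append(s.subscriptions, ch)
	return ch
}

// Emit blocks until every subscriber has received value. Subscriptions are
// deliberately not cleared, so subscribers keep receiving on later calls.
func (s *Emitter[T]) Emit(value T) {
	s.mu.Lock()
	defer s.mu.Unlock()
	for _, subs := range s.subscriptions {
		subs <- value
	}
}

The one-shot variant is not sketched here; the second test only requires that it closes its subscriber channels after a single Emit, so the for range loops terminate.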

View File

@@ -25,7 +25,7 @@ type work struct {
}
type HistoryRetriever struct {
store *store.WakuStore
store Store
logger *zap.Logger
historyProcessor HistoryProcessor
}
@@ -35,7 +35,11 @@ type HistoryProcessor interface {
OnRequestFailed(requestID []byte, peerID peer.ID, err error)
}
func NewHistoryRetriever(store *store.WakuStore, historyProcessor HistoryProcessor, logger *zap.Logger) *HistoryRetriever {
type Store interface {
Query(ctx context.Context, criteria store.FilterCriteria, opts ...store.RequestOption) (store.Result, error)
}
func NewHistoryRetriever(store Store, historyProcessor HistoryProcessor, logger *zap.Logger) *HistoryRetriever {
return &HistoryRetriever{
store: store,
logger: logger.Named("history-retriever"),
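Swapping the concrete *store.WakuStore for the narrow Store interface is what makes the mock-based tests in this commit possible: any value with a matching Query method can be injected into NewHistoryRetriever. A minimal hypothetical stub, simpler than the mockStore in the test file below; the type and field names are invented for illustration, and it assumes the same context and go-waku store imports as the surrounding file:

// stubStore is a hypothetical, minimal implementation of the new Store
// interface: it ignores the criteria and options and hands back a canned
// result (or error) for every query.
type stubStore struct {
	result store.Result
	err    error
}

func (s *stubStore) Query(ctx context.Context, criteria store.FilterCriteria, opts ...store.RequestOption) (store.Result, error) {
	return s.result, s.err
}

// Usage sketch: retriever := NewHistoryRetriever(&stubStore{}, processor, logger)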

View File

@@ -0,0 +1,254 @@
package history

import (
	"context"
	"crypto/rand"
	"encoding/hex"
	"errors"
	"math/big"
	"sort"
	"testing"
	"time"

	"github.com/google/uuid"
	"github.com/libp2p/go-libp2p/core/peer"
	"github.com/stretchr/testify/require"
	"github.com/waku-org/go-waku/waku/v2/protocol"
	proto_pb "github.com/waku-org/go-waku/waku/v2/protocol/pb"
	"github.com/waku-org/go-waku/waku/v2/protocol/store"
	"github.com/waku-org/go-waku/waku/v2/protocol/store/pb"
	"google.golang.org/protobuf/proto"

	"github.com/waku-org/go-waku/waku/v2/utils"
)

type queryResponse struct {
	contentTopics []string
	messages      []*pb.WakuMessageKeyValue
	err           error // Indicates if this response will simulate an error returned by SendMessagesRequestForTopics
	cursor        []byte
}

type mockResult struct {
	cursor   []byte
	messages []*pb.WakuMessageKeyValue
}

func (r *mockResult) Cursor() []byte {
	return r.cursor
}

func (r *mockResult) Messages() []*pb.WakuMessageKeyValue {
	return r.messages
}

func (r *mockResult) IsComplete() bool {
	return false
}

func (r *mockResult) PeerID() peer.ID {
	return ""
}

func (r *mockResult) Query() *pb.StoreQueryRequest {
	return nil
}

func (r *mockResult) Response() *pb.StoreQueryResponse {
	return nil
}

func (r *mockResult) Next(ctx context.Context, opts ...store.RequestOption) error {
	return nil
}

type mockHistoryProcessor struct {
}

func (h *mockHistoryProcessor) OnEnvelope(env *protocol.Envelope, processEnvelopes bool) error {
	return nil
}

func (h *mockHistoryProcessor) OnRequestFailed(requestID []byte, peerID peer.ID, err error) {
}

func newMockHistoryProcessor() *mockHistoryProcessor {
	return &mockHistoryProcessor{}
}

type mockStore struct {
	queryResponses map[string]queryResponse
}

func newMockStore() *mockStore {
	return &mockStore{
		queryResponses: make(map[string]queryResponse),
	}
}

func getInitialResponseKey(contentTopics []string) string {
	sort.Strings(contentTopics)
	return hex.EncodeToString(append([]byte("start"), []byte(contentTopics[0])...))
}

func (t *mockStore) Query(ctx context.Context, criteria store.FilterCriteria, opts ...store.RequestOption) (store.Result, error) {
	params := store.Parameters{}
	for _, opt := range opts {
		_ = opt(&params)
	}

	result := &mockResult{}
	if params.Cursor() == nil {
		initialResponse := getInitialResponseKey(criteria.ContentTopicsList())
		response := t.queryResponses[initialResponse]
		if response.err != nil {
			return nil, response.err
		}
		result.cursor = response.cursor
		result.messages = response.messages
	} else {
		response := t.queryResponses[hex.EncodeToString(params.Cursor())]
		if response.err != nil {
			return nil, response.err
		}
		result.cursor = response.cursor
		result.messages = response.messages
	}

	return result, nil
}

func (t *mockStore) Populate(topics []string, responses int, includeRandomError bool) error {
	if responses <= 0 || len(topics) == 0 {
		return errors.New("invalid input parameters")
	}

	var topicBatches [][]string
	for i := 0; i < len(topics); i += maxTopicsPerRequest {
		// Split batch in 10-contentTopic subbatches
		j := i + maxTopicsPerRequest
		if j > len(topics) {
			j = len(topics)
		}
		topicBatches = append(topicBatches, topics[i:j])
	}

	randomErrIdx, err := rand.Int(rand.Reader, big.NewInt(int64(len(topicBatches))))
	if err != nil {
		return err
	}
	randomErrIdxInt := int(randomErrIdx.Int64())

	for i, topicBatch := range topicBatches {
		// Setup initial response
		initialResponseKey := getInitialResponseKey(topicBatch)
		t.queryResponses[initialResponseKey] = queryResponse{
			contentTopics: topicBatch,
			messages: []*pb.WakuMessageKeyValue{
				{
					MessageHash: protocol.GenerateRequestID(),
					Message: &proto_pb.WakuMessage{
						Payload:      []byte{1, 2, 3},
						ContentTopic: "abc",
						Timestamp:    proto.Int64(time.Now().UnixNano()),
					},
					PubsubTopic: proto.String("test"),
				},
			},
			err: nil,
		}

		prevKey := initialResponseKey
		for x := 0; x < responses-1; x++ {
			newResponseCursor := []byte(uuid.New().String())
			newResponseKey := hex.EncodeToString(newResponseCursor)

			var err error
			if includeRandomError && i == randomErrIdxInt && x == responses-2 { // Include an error in last request
				err = errors.New("random error")
			}

			t.queryResponses[newResponseKey] = queryResponse{
				contentTopics: topicBatch,
				messages: []*pb.WakuMessageKeyValue{
					{
						MessageHash: protocol.GenerateRequestID(),
						Message: &proto_pb.WakuMessage{
							Payload:      []byte{1, 2, 3},
							ContentTopic: "abc",
							Timestamp:    proto.Int64(time.Now().UnixNano()),
						},
						PubsubTopic: proto.String("test"),
					},
				},
				err: err,
			}

			// Updating prev response cursor to point to the new response
			prevResponse := t.queryResponses[prevKey]
			prevResponse.cursor = newResponseCursor
			t.queryResponses[prevKey] = prevResponse

			prevKey = newResponseKey
		}
	}

	return nil
}

func TestSuccessBatchExecution(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
	defer cancel()

	storenodeID, err := peer.Decode("16Uiu2HAkw3x97MbbZSWHbdF5bob45vcZvPPK4s4Mjyv2mxyB9GS3")
	require.NoError(t, err)

	topics := []string{}
	for i := 0; i < 50; i++ {
		topics = append(topics, uuid.NewString())
	}

	testStore := newMockStore()
	err = testStore.Populate(topics, 10, false)
	require.NoError(t, err)

	historyProcessor := newMockHistoryProcessor()
	historyRetriever := NewHistoryRetriever(testStore, historyProcessor, utils.Logger())

	criteria := store.FilterCriteria{
		ContentFilter: protocol.NewContentFilter("test", topics...),
	}

	err = historyRetriever.Query(ctx, criteria, storenodeID, 10, func(i int) (bool, uint64) { return true, 10 }, true)
	require.NoError(t, err)
}

func TestFailedBatchExecution(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
	defer cancel()

	storenodeID, err := peer.Decode("16Uiu2HAkw3x97MbbZSWHbdF5bob45vcZvPPK4s4Mjyv2mxyB9GS3")
	require.NoError(t, err)

	topics := []string{}
	for i := 0; i < 2; i++ {
		topics = append(topics, uuid.NewString())
	}

	testStore := newMockStore()
	err = testStore.Populate(topics, 10, true)
	require.NoError(t, err)

	historyProcessor := newMockHistoryProcessor()
	historyRetriever := NewHistoryRetriever(testStore, historyProcessor, utils.Logger())

	criteria := store.FilterCriteria{
		ContentFilter: protocol.NewContentFilter("test", topics...),
	}

	err = historyRetriever.Query(ctx, criteria, storenodeID, 10, func(i int) (bool, uint64) { return true, 10 }, true)
	require.Error(t, err)
}

View File

@@ -178,7 +178,7 @@ func (m *MissingMessageVerifier) fetchHistory(c chan<- *protocol.Envelope, inter
}
}
func (m *MissingMessageVerifier) storeQueryWithRetry(ctx context.Context, queryFunc func(ctx context.Context) (*store.Result, error), logger *zap.Logger, logMsg string) (*store.Result, error) {
func (m *MissingMessageVerifier) storeQueryWithRetry(ctx context.Context, queryFunc func(ctx context.Context) (store.Result, error), logger *zap.Logger, logMsg string) (store.Result, error) {
retry := true
count := 1
for retry && count <= m.params.maxAttemptsToRetrieveHistory {
@@ -212,7 +212,7 @@ func (m *MissingMessageVerifier) fetchMessagesBatch(c chan<- *protocol.Envelope,
logging.Epoch("to", now),
)
result, err := m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (*store.Result, error) {
result, err := m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (store.Result, error) {
return m.store.Query(ctx, store.FilterCriteria{
ContentFilter: protocol.NewContentFilter(interest.contentFilter.PubsubTopic, contentTopics[batchFrom:batchTo]...),
TimeStart: proto.Int64(interest.lastChecked.Add(-m.params.delay).UnixNano()),
@@ -243,7 +243,7 @@ func (m *MissingMessageVerifier) fetchMessagesBatch(c chan<- *protocol.Envelope,
missingHashes = append(missingHashes, hash)
}
result, err = m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (*store.Result, error) {
result, err = m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (store.Result, error) {
if err = result.Next(ctx); err != nil {
return nil, err
}
@@ -282,7 +282,7 @@ func (m *MissingMessageVerifier) fetchMessagesBatch(c chan<- *protocol.Envelope,
defer utils.LogOnPanic()
defer wg.Wait()
result, err := m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (*store.Result, error) {
result, err := m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (store.Result, error) {
queryCtx, cancel := context.WithTimeout(ctx, m.params.storeQueryTimeout)
defer cancel()
return m.store.QueryByHash(queryCtx, messageHashes, store.WithPeer(interest.peerID), store.WithPaging(false, maxMsgHashesPerRequest))
@@ -303,7 +303,7 @@ func (m *MissingMessageVerifier) fetchMessagesBatch(c chan<- *protocol.Envelope,
}
}
result, err = m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (*store.Result, error) {
result, err = m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (store.Result, error) {
if err = result.Next(ctx); err != nil {
return nil, err
}

View File

@@ -99,7 +99,7 @@ func (s *WakuStore) SetHost(h host.Host) {
// Request is used to send a store query. This function requires understanding how to prepare a store query
// and most of the time you can use `Query`, `QueryByHash` and `Exists` instead, as they provide
// a simpler API
func (s *WakuStore) Request(ctx context.Context, criteria Criteria, opts ...RequestOption) (*Result, error) {
func (s *WakuStore) Request(ctx context.Context, criteria Criteria, opts ...RequestOption) (Result, error) {
params := new(Parameters)
optList := DefaultOptions()
@@ -182,7 +182,7 @@ func (s *WakuStore) Request(ctx context.Context, criteria Criteria, opts ...Requ
return nil, err
}
result := &Result{
result := &resultImpl{
store: s,
messages: response.Messages,
storeRequest: storeRequest,
@@ -195,12 +195,12 @@ func (s *WakuStore) Request(ctx context.Context, criteria Criteria, opts ...Requ
}
// Query retrieves all the messages that match a criteria. Use the options to indicate whether to return the message themselves or not.
func (s *WakuStore) Query(ctx context.Context, criteria FilterCriteria, opts ...RequestOption) (*Result, error) {
func (s *WakuStore) Query(ctx context.Context, criteria FilterCriteria, opts ...RequestOption) (Result, error) {
return s.Request(ctx, criteria, opts...)
}
// Query retrieves all the messages with specific message hashes
func (s *WakuStore) QueryByHash(ctx context.Context, messageHashes []wpb.MessageHash, opts ...RequestOption) (*Result, error) {
func (s *WakuStore) QueryByHash(ctx context.Context, messageHashes []wpb.MessageHash, opts ...RequestOption) (Result, error) {
return s.Request(ctx, MessageHashCriteria{messageHashes}, opts...)
}
@@ -214,17 +214,17 @@ func (s *WakuStore) Exists(ctx context.Context, messageHash wpb.MessageHash, opt
return false, err
}
return len(result.messages) != 0, nil
return len(result.Messages()) != 0, nil
}
func (s *WakuStore) next(ctx context.Context, r *Result, opts ...RequestOption) (*Result, error) {
func (s *WakuStore) next(ctx context.Context, r Result, opts ...RequestOption) (*resultImpl, error) {
if r.IsComplete() {
return &Result{
return &resultImpl{
store: s,
messages: nil,
cursor: nil,
storeRequest: r.storeRequest,
storeResponse: r.storeResponse,
storeRequest: r.Query(),
storeResponse: r.Response(),
peerID: r.PeerID(),
}, nil
}
@@ -240,7 +240,7 @@ func (s *WakuStore) next(ctx context.Context, r *Result, opts ...RequestOption)
}
}
storeRequest := proto.Clone(r.storeRequest).(*pb.StoreQueryRequest)
storeRequest := proto.Clone(r.Query()).(*pb.StoreQueryRequest)
storeRequest.RequestId = hex.EncodeToString(protocol.GenerateRequestID())
storeRequest.PaginationCursor = r.Cursor()
@@ -249,7 +249,7 @@ func (s *WakuStore) next(ctx context.Context, r *Result, opts ...RequestOption)
return nil, err
}
result := &Result{
result := &resultImpl{
store: s,
messages: response.Messages,
storeRequest: storeRequest,
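As the doc comment above notes, Query, QueryByHash and Exists are the simpler entry points; with Result now an interface, callers page through a response via Messages, Next and IsComplete. A hedged caller-side sketch, not part of this commit: the option names (WithPeer, WithPaging) are the ones used elsewhere in this diff, while the function name, topic strings and page size are placeholders.

// drainQuery pages through every result of a store query and handles each
// message; sketch only, assuming the go-waku store, protocol and peer imports.
func drainQuery(ctx context.Context, s *store.WakuStore, peerID peer.ID) error {
	result, err := s.Query(ctx, store.FilterCriteria{
		ContentFilter: protocol.NewContentFilter("pubsub-topic", "content-topic"),
	}, store.WithPeer(peerID), store.WithPaging(false, 20))
	if err != nil {
		return err
	}
	for !result.IsComplete() {
		for _, m := range result.Messages() {
			_ = m // handle each *pb.WakuMessageKeyValue here
		}
		// Next fetches the following page, or marks the result complete
		// once the cursor is exhausted.
		if err := result.Next(ctx); err != nil {
			return err
		}
	}
	return nil
}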

View File

@@ -22,6 +22,10 @@ type Parameters struct {
skipRatelimit bool
}
func (p *Parameters) Cursor() []byte {
return p.cursor
}
type RequestOption func(*Parameters) error
// WithPeer is an option used to specify the peerID to request the message history.

View File

@@ -8,7 +8,17 @@ import (
)
// Result represents a valid response from a store node
type Result struct {
type Result interface {
Cursor() []byte
IsComplete() bool
PeerID() peer.ID
Query() *pb.StoreQueryRequest
Response() *pb.StoreQueryResponse
Next(ctx context.Context, opts ...RequestOption) error
Messages() []*pb.WakuMessageKeyValue
}
type resultImpl struct {
done bool
messages []*pb.WakuMessageKeyValue
@@ -19,27 +29,27 @@ type Result struct {
peerID peer.ID
}
func (r *Result) Cursor() []byte {
func (r *resultImpl) Cursor() []byte {
return r.cursor
}
func (r *Result) IsComplete() bool {
func (r *resultImpl) IsComplete() bool {
return r.done
}
func (r *Result) PeerID() peer.ID {
func (r *resultImpl) PeerID() peer.ID {
return r.peerID
}
func (r *Result) Query() *pb.StoreQueryRequest {
func (r *resultImpl) Query() *pb.StoreQueryRequest {
return r.storeRequest
}
func (r *Result) Response() *pb.StoreQueryResponse {
func (r *resultImpl) Response() *pb.StoreQueryResponse {
return r.storeResponse
}
func (r *Result) Next(ctx context.Context, opts ...RequestOption) error {
func (r *resultImpl) Next(ctx context.Context, opts ...RequestOption) error {
if r.cursor == nil {
r.done = true
r.messages = nil
@@ -57,6 +67,6 @@ func (r *Result) Next(ctx context.Context, opts ...RequestOption) error {
return nil
}
func (r *Result) Messages() []*pb.WakuMessageKeyValue {
func (r *resultImpl) Messages() []*pb.WakuMessageKeyValue {
return r.messages
}