From 323a6ce6dd0a368faa4a69f6c84d282cc66144a7 Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Thu, 27 Apr 2023 13:42:31 -0400 Subject: [PATCH] test: processMailserverBatch --- protocol/messenger_mailserver.go | 49 ++++-- ..._mailserver_processMailserverBatch_test.go | 155 ++++++++++++++++++ 2 files changed, 192 insertions(+), 12 deletions(-) create mode 100644 protocol/messenger_mailserver_processMailserverBatch_test.go diff --git a/protocol/messenger_mailserver.go b/protocol/messenger_mailserver.go index c276a1b17..24b6a6aa2 100644 --- a/protocol/messenger_mailserver.go +++ b/protocol/messenger_mailserver.go @@ -557,38 +557,51 @@ type work struct { storeCursor *types.StoreRequestCursor } -func (m *Messenger) processMailserverBatch(batch MailserverBatch) error { +type messageRequester interface { + SendMessagesRequestForTopics( + ctx context.Context, + peerID []byte, + from, to uint32, + previousCursor []byte, + previousStoreCursor *types.StoreRequestCursor, + topics []types.TopicType, + waitForResponse bool, + ) (cursor []byte, storeCursor *types.StoreRequestCursor, err error) +} + +func processMailserverBatch(ctx context.Context, messageRequester messageRequester, batch MailserverBatch, mailserverID []byte, logger *zap.Logger) error { var topicStrings []string for _, t := range batch.Topics { topicStrings = append(topicStrings, t.String()) } - logger := m.logger.With(zap.Any("chatIDs", batch.ChatIDs), zap.String("fromString", time.Unix(int64(batch.From), 0).Format(time.RFC3339)), zap.String("toString", time.Unix(int64(batch.To), 0).Format(time.RFC3339)), zap.Any("topic", topicStrings), zap.Int64("from", int64(batch.From)), zap.Int64("to", int64(batch.To))) + logger = logger.With(zap.Any("chatIDs", batch.ChatIDs), zap.String("fromString", time.Unix(int64(batch.From), 0).Format(time.RFC3339)), zap.String("toString", time.Unix(int64(batch.To), 0).Format(time.RFC3339)), zap.Any("topic", topicStrings), zap.Int64("from", int64(batch.From)), zap.Int64("to", int64(batch.To))) logger.Info("syncing topic") - mailserverID, err := m.activeMailserverID() - if err != nil { - return err - } - wg := sync.WaitGroup{} workCh := make(chan work, 100) + errCh := make(chan error) go func() { + defer func() { + close(errCh) + }() + for { select { - case <-m.ctx.Done(): + case <-ctx.Done(): return case w, ok := <-workCh: if !ok { return } - ctx, cancel := context.WithTimeout(m.ctx, mailserverRequestTimeout) - cursor, storeCursor, err := m.transport.SendMessagesRequestForTopics(ctx, mailserverID, batch.From, batch.To, w.cursor, w.storeCursor, w.topics, true) + ctx, cancel := context.WithTimeout(ctx, mailserverRequestTimeout) + cursor, storeCursor, err := messageRequester.SendMessagesRequestForTopics(ctx, mailserverID, batch.From, batch.To, w.cursor, w.storeCursor, w.topics, true) if err != nil { logger.Error("failed to send request", zap.Error(err)) wg.Done() cancel() + errCh <- err return } @@ -628,8 +641,20 @@ func (m *Messenger) processMailserverBatch(batch MailserverBatch) error { // to test it //logger.Info("waiting until message processed") //m.waitUntilP2PMessagesProcessed() - logger.Info("synced topic") - return nil + + result := <-errCh + + logger.Info("synced topic", zap.NamedError("hasError", result)) + return result +} + +func (m *Messenger) processMailserverBatch(batch MailserverBatch) error { + mailserverID, err := m.activeMailserverID() + if err != nil { + return err + } + + return processMailserverBatch(m.ctx, m.transport, batch, mailserverID, m.logger) } type MailserverBatch struct { diff --git a/protocol/messenger_mailserver_processMailserverBatch_test.go b/protocol/messenger_mailserver_processMailserverBatch_test.go new file mode 100644 index 000000000..1cbdbf3c9 --- /dev/null +++ b/protocol/messenger_mailserver_processMailserverBatch_test.go @@ -0,0 +1,155 @@ +package protocol + +import ( + "context" + "crypto/rand" + "encoding/hex" + "errors" + "math/big" + "testing" + + "github.com/google/uuid" + "github.com/stretchr/testify/require" + + "github.com/status-im/status-go/eth-node/types" + "github.com/status-im/status-go/protocol/tt" +) + +type queryResponse struct { + topics []types.TopicType + err error // Indicates if this response will simulate an error returned by SendMessagesRequestForTopics + cursor []byte +} + +type mockTransport struct { + queryResponses map[string]queryResponse +} + +func newMockTransport() *mockTransport { + return &mockTransport{ + queryResponses: make(map[string]queryResponse), + } +} + +func getInitialResponseKey(topics []types.TopicType) string { + return hex.EncodeToString(append([]byte("start"), topics[0][:]...)) +} + +func (t *mockTransport) SendMessagesRequestForTopics( + ctx context.Context, + peerID []byte, + from, to uint32, + previousCursor []byte, + previousStoreCursor *types.StoreRequestCursor, + topics []types.TopicType, + waitForResponse bool, +) (cursor []byte, storeCursor *types.StoreRequestCursor, err error) { + var response queryResponse + if previousCursor == nil { + initialResponse := getInitialResponseKey(topics) + response = t.queryResponses[initialResponse] + } else { + response = t.queryResponses[hex.EncodeToString(previousCursor)] + } + return response.cursor, nil, response.err +} + +func (t *mockTransport) Populate(topics []types.TopicType, responses int, includeRandomError bool) error { + if responses <= 0 || len(topics) == 0 { + return errors.New("invalid input parameters") + } + + var topicBatches [][]types.TopicType + + for i := 0; i < len(topics); i += maxTopicsPerRequest { + // Split batch in 10-contentTopic subbatches + j := i + maxTopicsPerRequest + if j > len(topics) { + j = len(topics) + } + topicBatches = append(topicBatches, topics[i:j]) + } + + randomErrIdx, err := rand.Int(rand.Reader, big.NewInt(int64(len(topicBatches)))) + if err != nil { + return err + } + randomErrIdxInt := int(randomErrIdx.Int64()) + + for i, topicBatch := range topicBatches { + // Setup initial response + initialResponseKey := getInitialResponseKey(topicBatch) + t.queryResponses[initialResponseKey] = queryResponse{ + topics: topicBatch, + err: nil, + } + + prevKey := initialResponseKey + for x := 0; x < responses-1; x++ { + newResponseCursor := []byte(uuid.New().String()) + newResponseKey := hex.EncodeToString(newResponseCursor) + + var err error + if includeRandomError && i == randomErrIdxInt && x == responses-2 { // Include an error in last request + err = errors.New("random error") + } + + t.queryResponses[newResponseKey] = queryResponse{ + topics: topicBatch, + err: err, + } + + // Updating prev response cursor to point to the new response + prevResponse := t.queryResponses[prevKey] + prevResponse.cursor = newResponseCursor + t.queryResponses[prevKey] = prevResponse + + prevKey = newResponseKey + } + + } + + return nil +} + +func TestProcessMailserverBatchHappyPath(t *testing.T) { + logger := tt.MustCreateTestLogger() + + mailserverID := []byte{1, 2, 3, 4, 5} + topics := []types.TopicType{} + for i := 0; i < 22; i++ { + topics = append(topics, types.BytesToTopic([]byte{0, 0, 0, byte(i)})) + } + + testTransport := newMockTransport() + err := testTransport.Populate(topics, 10, false) + require.NoError(t, err) + + testBatch := MailserverBatch{ + Topics: topics, + } + + err = processMailserverBatch(context.TODO(), testTransport, testBatch, mailserverID, logger) + require.NoError(t, err) +} + +func TestProcessMailserverBatchFailure(t *testing.T) { + logger := tt.MustCreateTestLogger() + + mailserverID := []byte{1, 2, 3, 4, 5} + topics := []types.TopicType{} + for i := 0; i < 5; i++ { + topics = append(topics, types.BytesToTopic([]byte{0, 0, 0, byte(i)})) + } + + testTransport := newMockTransport() + err := testTransport.Populate(topics, 4, true) + require.NoError(t, err) + + testBatch := MailserverBatch{ + Topics: topics, + } + + err = processMailserverBatch(context.TODO(), testTransport, testBatch, mailserverID, logger) + require.Error(t, err) +}