chore: add integration test for messages tracking
This commit is contained in:
parent
d1a9ad599c
commit
b9c6f5a834
|
@ -0,0 +1,184 @@
|
||||||
|
package protocol
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"sync"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/cenkalti/backoff/v3"
|
||||||
|
"github.com/stretchr/testify/suite"
|
||||||
|
"go.uber.org/zap"
|
||||||
|
|
||||||
|
gethbridge "github.com/status-im/status-go/eth-node/bridge/geth"
|
||||||
|
"github.com/status-im/status-go/eth-node/crypto"
|
||||||
|
"github.com/status-im/status-go/eth-node/types"
|
||||||
|
"github.com/status-im/status-go/protocol/transport"
|
||||||
|
"github.com/status-im/status-go/protocol/tt"
|
||||||
|
"github.com/status-im/status-go/signal"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestMessengerMessagesTrackingSuite(t *testing.T) {
|
||||||
|
suite.Run(t, new(MessengerMessagesTrackingSuite))
|
||||||
|
}
|
||||||
|
|
||||||
|
type EnvelopeSignalHandlerMock struct{}
|
||||||
|
|
||||||
|
// EnvelopeSent triggered when envelope delivered atleast to 1 peer.
|
||||||
|
func (h EnvelopeSignalHandlerMock) EnvelopeSent(identifiers [][]byte) {
|
||||||
|
signal.SendEnvelopeSent(identifiers)
|
||||||
|
}
|
||||||
|
|
||||||
|
// EnvelopeExpired triggered when envelope is expired but wasn't delivered to any peer.
|
||||||
|
func (h EnvelopeSignalHandlerMock) EnvelopeExpired(identifiers [][]byte, err error) {
|
||||||
|
signal.SendEnvelopeExpired(identifiers, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// MailServerRequestCompleted triggered when the mailserver sends a message to notify that the request has been completed
|
||||||
|
func (h EnvelopeSignalHandlerMock) MailServerRequestCompleted(requestID types.Hash, lastEnvelopeHash types.Hash, cursor []byte, err error) {
|
||||||
|
signal.SendMailServerRequestCompleted(requestID, lastEnvelopeHash, cursor, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// MailServerRequestExpired triggered when the mailserver request expires
|
||||||
|
func (h EnvelopeSignalHandlerMock) MailServerRequestExpired(hash types.Hash) {
|
||||||
|
signal.SendMailServerRequestExpired(hash)
|
||||||
|
}
|
||||||
|
|
||||||
|
type EnvelopeEventsInterceptorMock struct {
|
||||||
|
EnvelopeEventsInterceptor
|
||||||
|
|
||||||
|
enabled bool
|
||||||
|
lock sync.Mutex
|
||||||
|
identifiersQueue [][][]byte
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *EnvelopeEventsInterceptorMock) EnvelopeSent(identifiers [][]byte) {
|
||||||
|
i.lock.Lock()
|
||||||
|
defer i.lock.Unlock()
|
||||||
|
|
||||||
|
if i.enabled {
|
||||||
|
i.EnvelopeEventsInterceptor.EnvelopeSent(identifiers)
|
||||||
|
} else {
|
||||||
|
i.identifiersQueue = append(i.identifiersQueue, identifiers)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *EnvelopeEventsInterceptorMock) Enable() {
|
||||||
|
i.lock.Lock()
|
||||||
|
defer i.lock.Unlock()
|
||||||
|
|
||||||
|
for _, identifiers := range i.identifiersQueue {
|
||||||
|
i.EnvelopeEventsInterceptor.EnvelopeSent(identifiers)
|
||||||
|
}
|
||||||
|
i.enabled = true
|
||||||
|
}
|
||||||
|
|
||||||
|
type MessengerMessagesTrackingSuite struct {
|
||||||
|
suite.Suite
|
||||||
|
|
||||||
|
bobWaku types.Waku
|
||||||
|
bobInterceptor *EnvelopeEventsInterceptorMock
|
||||||
|
bob *Messenger
|
||||||
|
|
||||||
|
aliceWaku types.Waku
|
||||||
|
aliceInterceptor *EnvelopeEventsInterceptorMock
|
||||||
|
alice *Messenger
|
||||||
|
|
||||||
|
logger *zap.Logger
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *MessengerMessagesTrackingSuite) SetupTest() {
|
||||||
|
s.logger = tt.MustCreateTestLogger()
|
||||||
|
|
||||||
|
wakuNodes := createWakuNetwork(&s.Suite, s.logger, []string{"bob", "alice"})
|
||||||
|
|
||||||
|
s.bobWaku = wakuNodes[0]
|
||||||
|
s.bob, s.bobInterceptor = s.newMessenger(s.bobWaku, s.logger.With(zap.String("name", "bob")))
|
||||||
|
|
||||||
|
s.aliceWaku = wakuNodes[1]
|
||||||
|
s.alice, s.aliceInterceptor = s.newMessenger(s.aliceWaku, s.logger.With(zap.String("name", "alice")))
|
||||||
|
|
||||||
|
_, err := s.bob.Start()
|
||||||
|
s.Require().NoError(err)
|
||||||
|
_, err = s.alice.Start()
|
||||||
|
s.Require().NoError(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *MessengerMessagesTrackingSuite) TearDownTest() {
|
||||||
|
if s.bob != nil {
|
||||||
|
s.Require().NoError(s.bob.Shutdown())
|
||||||
|
}
|
||||||
|
if s.bobWaku != nil {
|
||||||
|
s.Require().NoError(gethbridge.GetGethWakuV2From(s.bobWaku).Stop())
|
||||||
|
}
|
||||||
|
|
||||||
|
if s.alice != nil {
|
||||||
|
s.Require().NoError(s.alice.Shutdown())
|
||||||
|
}
|
||||||
|
if s.aliceWaku != nil {
|
||||||
|
s.Require().NoError(gethbridge.GetGethWakuV2From(s.aliceWaku).Stop())
|
||||||
|
}
|
||||||
|
|
||||||
|
_ = s.logger.Sync()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *MessengerMessagesTrackingSuite) newMessenger(waku types.Waku, logger *zap.Logger) (*Messenger, *EnvelopeEventsInterceptorMock) {
|
||||||
|
privateKey, err := crypto.GenerateKey()
|
||||||
|
s.Require().NoError(err)
|
||||||
|
|
||||||
|
envelopesMonitorConfig := &transport.EnvelopesMonitorConfig{
|
||||||
|
EnvelopeEventsHandler: EnvelopeSignalHandlerMock{},
|
||||||
|
MaxAttempts: 1,
|
||||||
|
AwaitOnlyMailServerConfirmations: false,
|
||||||
|
IsMailserver: func(peer types.EnodeID) bool { return false },
|
||||||
|
Logger: s.logger,
|
||||||
|
}
|
||||||
|
|
||||||
|
messenger, err := newMessengerWithKey(waku, privateKey, s.logger, []Option{WithEnvelopesMonitorConfig(envelopesMonitorConfig)})
|
||||||
|
s.Require().NoError(err)
|
||||||
|
|
||||||
|
interceptor := &EnvelopeEventsInterceptorMock{
|
||||||
|
EnvelopeEventsInterceptor: EnvelopeEventsInterceptor{
|
||||||
|
EnvelopeEventsHandler: envelopesMonitorConfig.EnvelopeEventsHandler,
|
||||||
|
Messenger: messenger,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
err = messenger.transport.SetEnvelopeEventsHandler(interceptor)
|
||||||
|
s.Require().NoError(err)
|
||||||
|
|
||||||
|
return messenger, interceptor
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *MessengerMessagesTrackingSuite) TestMessageMarkedAsSent() {
|
||||||
|
//when message sent, its sent field should be "false" until we got confirmation
|
||||||
|
chat := CreatePublicChat("test-chat", s.bob.getTimesource())
|
||||||
|
err := s.bob.SaveChat(chat)
|
||||||
|
s.Require().NoError(err)
|
||||||
|
inputMessage := buildTestMessage(*chat)
|
||||||
|
|
||||||
|
_, err = s.bob.SendChatMessage(context.Background(), inputMessage)
|
||||||
|
s.Require().NoError(err)
|
||||||
|
|
||||||
|
rawMessage, err := s.bob.persistence.RawMessageByID(inputMessage.ID)
|
||||||
|
s.Require().NoError(err)
|
||||||
|
s.Require().False(rawMessage.Sent)
|
||||||
|
|
||||||
|
// enables "EnvelopeSent" callback processing
|
||||||
|
s.bobInterceptor.Enable()
|
||||||
|
|
||||||
|
options := func(b *backoff.ExponentialBackOff) {
|
||||||
|
b.MaxElapsedTime = 1 * time.Second
|
||||||
|
}
|
||||||
|
|
||||||
|
// Message should be marked as sent eventually
|
||||||
|
err = tt.RetryWithBackOff(func() error {
|
||||||
|
rawMessage, err = s.bob.persistence.RawMessageByID(inputMessage.ID)
|
||||||
|
if err != nil || !rawMessage.Sent {
|
||||||
|
return errors.New("message not marked as sent")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}, options)
|
||||||
|
s.Require().NoError(err)
|
||||||
|
}
|
Loading…
Reference in New Issue