fix: flaky MessengerStoreNodeRequestSuite (#4721)

This commit is contained in:
Igor Sirotin 2024-02-12 22:30:19 +00:00 committed by GitHub
parent 1c42c07760
commit 432bfeea2f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 49 additions and 22 deletions

View File

@ -26,4 +26,8 @@ type FeatureFlags struct {
// Peersyncing indicates whether we should advertise and sync messages with other peers
Peersyncing bool
// AutoRequestHistoricMessages indicates whether we should automatically request
// historic messages on getting online, connecting to store node, etc.
AutoRequestHistoricMessages bool
}

View File

@ -293,7 +293,7 @@ func NewMessenger(
) (*Messenger, error) {
var messenger *Messenger
c := config{messageResendMinDelay: 30, messageResendMaxCount: 3}
c := messengerDefaultConfig()
for _, opt := range opts {
if err := opt(&c); err != nil {
@ -950,7 +950,7 @@ func (m *Messenger) handleConnectionChange(online bool) {
}
// Start fetching messages from store nodes
if online {
if online && m.config.featureFlags.AutoRequestHistoricMessages {
m.asyncRequestAllHistoricMessages()
}

View File

@ -117,6 +117,16 @@ type config struct {
messageResendMaxCount int
}
func messengerDefaultConfig() config {
c := config{
messageResendMinDelay: 30,
messageResendMaxCount: 3,
}
c.featureFlags.AutoRequestHistoricMessages = true
return c
}
type Option func(*config) error
// WithSystemMessagesTranslations is required for Group Chats which are currently disabled.
@ -390,3 +400,10 @@ func WithAccountManager(accountManager account.Manager) Option {
return nil
}
}
func WithAutoRequestHistoricMessages(enabled bool) Option {
return func(c *config) error {
c.featureFlags.AutoRequestHistoricMessages = enabled
return nil
}
}

View File

@ -425,7 +425,9 @@ func (m *Messenger) connectToMailserver(ms mailservers.Mailserver) error {
signal.SendMailserverAvailable(m.mailserverCycle.activeMailserver.Address, m.mailserverCycle.activeMailserver.ID)
// Query mailserver
m.asyncRequestAllHistoricMessages()
if m.config.featureFlags.AutoRequestHistoricMessages {
m.asyncRequestAllHistoricMessages()
}
}
}
return nil
@ -552,7 +554,9 @@ func (m *Messenger) handleMailserverCycleEvent(connectedPeers []ConnectedPeer) e
signal.SendMailserverAvailable(m.mailserverCycle.activeMailserver.Address, m.mailserverCycle.activeMailserver.ID)
}
// Query mailserver
m.asyncRequestAllHistoricMessages()
if m.config.featureFlags.AutoRequestHistoricMessages {
m.asyncRequestAllHistoricMessages()
}
} else {
m.mailPeersMutex.Unlock()
}

View File

@ -204,7 +204,9 @@ func (s *MessengerStoreNodeRequestSuite) newMessenger(shh types.Waku, logger *za
privateKey, err := crypto.GenerateKey()
s.Require().NoError(err)
var options []Option
options := []Option{
WithAutoRequestHistoricMessages(false),
}
if mailserverAddress != "" {
options = append(options,
@ -338,7 +340,8 @@ func (s *MessengerStoreNodeRequestSuite) waitForEnvelopes(subscription <-chan st
select {
case <-subscription:
case <-ctx.Done():
s.Require().Fail("timeout waiting for store node to receive envelopes")
err := fmt.Sprintf("timeout waiting for store node to receive envelopes, received: %d, expected: %d", i, expectedEnvelopesCount)
s.Require().Fail(err)
}
}
}
@ -696,6 +699,8 @@ func (s *MessengerStoreNodeRequestSuite) TestRequestCommunityEnvelopesOrder() {
const descriptionsCount = 4
community := s.createCommunity(s.owner)
contentTopic := wakuV2common.BytesToTopic(transport.ToTopic(community.IDString()))
storeNodeSubscription := s.setupStoreNodeEnvelopesWatcher(&contentTopic)
// Push a few descriptions to the store node
for i := 0; i < descriptionsCount-1; i++ {
@ -703,28 +708,19 @@ func (s *MessengerStoreNodeRequestSuite) TestRequestCommunityEnvelopesOrder() {
s.Require().NoError(err)
}
// Wait for store node to receive envelopes
s.waitForEnvelopes(storeNodeSubscription, descriptionsCount-1)
// Subscribe to received envelope
bobWakuV2 := gethbridge.GetGethWakuV2From(s.bobWaku)
contentTopic := wakuV2common.BytesToTopic(transport.ToTopic(community.IDString()))
var prevEnvelope *wakuV2common.ReceivedMessage
receivedEnvelopesCount := 0
var receivedEnvelopes []*wakuV2common.ReceivedMessage
s.setupEnvelopesWatcher(bobWakuV2, &contentTopic, func(envelope *wakuV2common.ReceivedMessage) {
// We check that each next envelope fetched is newer than the previous one
if prevEnvelope != nil {
s.Require().Less(
envelope.Envelope.Message().GetTimestamp(),
prevEnvelope.Envelope.Message().GetTimestamp())
}
prevEnvelope = envelope
receivedEnvelopesCount++
receivedEnvelopes = append(receivedEnvelopes, envelope)
})
// Force a single-envelope page size to be able to check the order.
// Also force all envelopes to be fetched.
options := []StoreNodeRequestOption{
WithWaitForResponseOption(true),
WithStopWhenDataFound(false),
@ -733,13 +729,19 @@ func (s *MessengerStoreNodeRequestSuite) TestRequestCommunityEnvelopesOrder() {
}
// Fetch the community
fetchedCommunity, _, err := s.bob.storeNodeRequestsManager.FetchCommunity(community.CommunityShard(), options)
s.Require().NoError(err)
s.requireCommunitiesEqual(fetchedCommunity, community)
// Ensure all expected envelopes were received
s.Require().Equal(receivedEnvelopesCount, descriptionsCount)
s.Require().Equal(descriptionsCount, len(receivedEnvelopes))
// We check that each next envelope fetched is newer than the previous one
for i := 1; i < len(receivedEnvelopes); i++ {
s.Require().Less(
receivedEnvelopes[i].Envelope.Message().GetTimestamp(),
receivedEnvelopes[i-1].Envelope.Message().GetTimestamp())
}
}
/*