status-go/protocol/transport/envelopes_monitor_test.go

269 lines
8.8 KiB
Go
Raw Permalink Normal View History

package transport
import (
"context"
"reflect"
"testing"
"time"
"go.uber.org/zap"
"github.com/stretchr/testify/suite"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/status-im/status-go/eth-node/crypto"
"github.com/status-im/status-go/eth-node/types"
wakutypes "github.com/status-im/status-go/waku/types"
)
var (
testHash = types.Hash{0x01}
testHashes = []types.Hash{testHash}
testIDs = [][]byte{[]byte("id")}
)
type envelopeEventsHandlerMock struct {
envelopeSentCalls [][][]byte // slice of EnvelopeSent arguments
}
func (h *envelopeEventsHandlerMock) EnvelopeSent(identifiers [][]byte) {
h.envelopeSentCalls = append(h.envelopeSentCalls, identifiers)
}
func (h *envelopeEventsHandlerMock) EnvelopeExpired([][]byte, error) {
}
func (h *envelopeEventsHandlerMock) MailServerRequestCompleted(types.Hash, types.Hash, []byte, error) {
}
func (h *envelopeEventsHandlerMock) MailServerRequestExpired(types.Hash) {
}
type EnvelopesMonitorSuite struct {
suite.Suite
monitor *EnvelopesMonitor
eventsHandlerMock *envelopeEventsHandlerMock
}
func TestEnvelopesMonitorSuite(t *testing.T) {
suite.Run(t, new(EnvelopesMonitorSuite))
}
func (s *EnvelopesMonitorSuite) SetupTest() {
s.eventsHandlerMock = &envelopeEventsHandlerMock{}
s.monitor = NewEnvelopesMonitor(
nil,
EnvelopesMonitorConfig{
EnvelopeEventsHandler: s.eventsHandlerMock,
MaxAttempts: 6,
AwaitOnlyMailServerConfirmations: false,
IsMailserver: func(types.EnodeID) bool { return false },
Logger: zap.NewNop(),
},
)
}
func (s *EnvelopesMonitorSuite) TestEnvelopePosted() {
err := s.monitor.Add(testIDs, testHashes, []*wakutypes.NewMessage{{}})
s.Require().NoError(err)
s.Contains(s.monitor.envelopes, testHash)
2023-11-22 19:28:40 +01:00
s.Equal(EnvelopePosted, s.monitor.envelopes[testHash].state)
s.monitor.handleEvent(wakutypes.EnvelopeEvent{
Event: wakutypes.EventEnvelopeSent,
Hash: testHash,
})
s.Contains(s.monitor.envelopes, testHash)
2023-11-22 19:28:40 +01:00
s.Equal(EnvelopeSent, s.monitor.envelopes[testHash].state)
}
func (s *EnvelopesMonitorSuite) TestEnvelopePostedOutOfOrder() {
s.monitor.handleEvent(wakutypes.EnvelopeEvent{
Event: wakutypes.EventEnvelopeSent,
Hash: testHash,
})
err := s.monitor.Add(testIDs, testHashes, []*wakutypes.NewMessage{{}})
s.Require().NoError(err)
s.Require().Contains(s.monitor.envelopes, testHash)
2023-11-22 19:28:40 +01:00
s.Require().Equal(EnvelopeSent, s.monitor.envelopes[testHash].state)
}
func (s *EnvelopesMonitorSuite) TestConfirmedWithAcknowledge() {
testBatch := types.Hash{1}
pkey, err := crypto.GenerateKey()
s.Require().NoError(err)
node := enode.NewV4(&pkey.PublicKey, nil, 0, 0)
err = s.monitor.Add(testIDs, testHashes, []*wakutypes.NewMessage{{}})
s.Require().NoError(err)
s.Contains(s.monitor.envelopes, testHash)
2023-11-22 19:28:40 +01:00
s.Equal(EnvelopePosted, s.monitor.envelopes[testHash].state)
s.monitor.handleEvent(wakutypes.EnvelopeEvent{
Event: wakutypes.EventEnvelopeSent,
Hash: testHash,
Batch: testBatch,
})
2023-11-22 19:28:40 +01:00
s.Equal(EnvelopePosted, s.monitor.envelopes[testHash].state)
s.monitor.handleEvent(wakutypes.EnvelopeEvent{
Event: wakutypes.EventBatchAcknowledged,
Batch: testBatch,
Peer: types.EnodeID(node.ID()),
})
s.Contains(s.monitor.envelopes, testHash)
2023-11-22 19:28:40 +01:00
s.Equal(EnvelopeSent, s.monitor.envelopes[testHash].state)
}
func (s *EnvelopesMonitorSuite) TestRemoved() {
err := s.monitor.Add(testIDs, testHashes, []*wakutypes.NewMessage{{}})
s.Require().NoError(err)
s.Contains(s.monitor.envelopes, testHash)
s.monitor.handleEvent(wakutypes.EnvelopeEvent{
Event: wakutypes.EventEnvelopeExpired,
Hash: testHash,
})
s.NotContains(s.monitor.envelopes, testHash)
}
func (s *EnvelopesMonitorSuite) TestIgnoreNotFromMailserver() {
// enables filter in the tracker to drop confirmations from non-mailserver peers
s.monitor.awaitOnlyMailServerConfirmations = true
err := s.monitor.Add(testIDs, testHashes, []*wakutypes.NewMessage{{}})
s.Require().NoError(err)
s.monitor.handleEvent(wakutypes.EnvelopeEvent{
Event: wakutypes.EventEnvelopeSent,
Hash: testHash,
Peer: types.EnodeID{1}, // could be empty, doesn't impact test behaviour
})
s.Require().Equal(EnvelopePosted, s.monitor.GetState(testHash))
}
func (s *EnvelopesMonitorSuite) TestReceived() {
s.monitor.isMailserver = func(peer types.EnodeID) bool {
return true
}
err := s.monitor.Add(testIDs, testHashes, []*wakutypes.NewMessage{{}})
s.Require().NoError(err)
s.Contains(s.monitor.envelopes, testHash)
s.monitor.handleEvent(wakutypes.EnvelopeEvent{
Event: wakutypes.EventEnvelopeReceived,
Hash: testHash,
})
s.Require().Equal(EnvelopeSent, s.monitor.GetState(testHash))
}
func (s *EnvelopesMonitorSuite) TestMultipleHashes() {
messageIDs := [][]byte{[]byte("id1"), []byte("id2")}
hashes := []types.Hash{{0x01}, {0x02}, {0x03}}
messages := []*wakutypes.NewMessage{{}, {}, {}}
err := s.monitor.Add(messageIDs, hashes, messages)
s.Require().NoError(err)
for _, hash := range hashes {
s.Contains(s.monitor.envelopes, hash)
}
s.Require().Empty(s.eventsHandlerMock.envelopeSentCalls)
s.Require().Equal(EnvelopePosted, s.monitor.envelopes[hashes[0]].state)
s.Require().Equal(EnvelopePosted, s.monitor.envelopes[hashes[1]].state)
s.Require().Equal(EnvelopePosted, s.monitor.envelopes[hashes[2]].state)
s.monitor.handleEvent(wakutypes.EnvelopeEvent{
Event: wakutypes.EventEnvelopeSent,
Hash: hashes[0],
})
s.Require().Empty(s.eventsHandlerMock.envelopeSentCalls)
s.Require().Equal(EnvelopeSent, s.monitor.envelopes[hashes[0]].state)
s.Require().Equal(EnvelopePosted, s.monitor.envelopes[hashes[1]].state)
s.Require().Equal(EnvelopePosted, s.monitor.envelopes[hashes[2]].state)
s.monitor.handleEvent(wakutypes.EnvelopeEvent{
Event: wakutypes.EventEnvelopeSent,
Hash: hashes[1],
})
s.Require().Empty(s.eventsHandlerMock.envelopeSentCalls)
s.Require().Equal(EnvelopeSent, s.monitor.envelopes[hashes[0]].state)
s.Require().Equal(EnvelopeSent, s.monitor.envelopes[hashes[1]].state)
s.Require().Equal(EnvelopePosted, s.monitor.envelopes[hashes[2]].state)
s.monitor.handleEvent(wakutypes.EnvelopeEvent{
Event: wakutypes.EventEnvelopeSent,
Hash: hashes[2],
})
// Identifiers should be marked as sent only if all corresponding envelopes are sent
s.Require().Len(s.eventsHandlerMock.envelopeSentCalls, 1)
s.Require().True(reflect.DeepEqual(messageIDs, s.eventsHandlerMock.envelopeSentCalls[0]))
s.Require().Equal(EnvelopeSent, s.monitor.envelopes[hashes[0]].state)
s.Require().Equal(EnvelopeSent, s.monitor.envelopes[hashes[1]].state)
s.Require().Equal(EnvelopeSent, s.monitor.envelopes[hashes[2]].state)
}
func (s *EnvelopesMonitorSuite) TestMultipleHashes_EnvelopeExpired() {
messageIDs := [][]byte{[]byte("id1"), []byte("id2")}
hashes := []types.Hash{{0x01}, {0x02}, {0x03}}
messages := []*wakutypes.NewMessage{{}, {}, {}}
err := s.monitor.Add(messageIDs, hashes, messages)
s.Require().NoError(err)
// If any envelope fails, then messageIDs are considered as not sent
s.monitor.handleEvent(wakutypes.EnvelopeEvent{
Event: wakutypes.EventEnvelopeExpired,
Hash: hashes[0],
})
s.monitor.handleEvent(wakutypes.EnvelopeEvent{
Event: wakutypes.EventEnvelopeSent,
Hash: hashes[1],
})
s.monitor.handleEvent(wakutypes.EnvelopeEvent{
Event: wakutypes.EventEnvelopeSent,
Hash: hashes[2],
})
s.Require().Empty(s.eventsHandlerMock.envelopeSentCalls)
s.Require().Empty(s.monitor.messageEnvelopeHashes)
s.Require().Len(s.monitor.envelopes, 2)
}
func (s *EnvelopesMonitorSuite) TestMultipleHashes_Failure() {
err := s.monitor.Add(testIDs, []types.Hash{{0x01}, {0x02}}, []*wakutypes.NewMessage{{}})
s.Require().Error(err)
}
func (s *EnvelopesMonitorSuite) TestRetryOnce() {
s.monitor.api = &mockWakuAPI{}
err := s.monitor.Add(testIDs, testHashes, []*wakutypes.NewMessage{{}})
s.Require().NoError(err)
envelope := s.monitor.envelopes[testHash]
envelope.attempts = 2
envelope.lastAttemptTime = time.Now().Add(-20 * time.Second)
s.monitor.retryQueue = append(s.monitor.retryQueue, envelope)
s.monitor.retryOnce()
s.Require().Equal(3, envelope.attempts)
s.Require().Len(s.monitor.retryQueue, 0)
s.Require().Equal(envelope.envelopeHashID, s.monitor.envelopes[envelope.envelopeHashID].envelopeHashID)
}
type mockWakuAPI struct{}
func (m *mockWakuAPI) Post(ctx context.Context, msg wakutypes.NewMessage) ([]byte, error) {
return []byte{0x01}, nil
}
func (m *mockWakuAPI) AddPrivateKey(ctx context.Context, privateKey types.HexBytes) (string, error) {
return "", nil
}
func (m *mockWakuAPI) GenerateSymKeyFromPassword(ctx context.Context, passwd string) (string, error) {
return "", nil
}
func (m *mockWakuAPI) DeleteKeyPair(ctx context.Context, key string) (bool, error) {
return false, nil
}
func (m *mockWakuAPI) NewMessageFilter(req wakutypes.Criteria) (string, error) {
return "", nil
}
func (m *mockWakuAPI) GetFilterMessages(id string) ([]*wakutypes.Message, error) {
return nil, nil
}
func (m *mockWakuAPI) BloomFilter() []byte {
return nil
}