diff --git a/protocol/common/message_processor_test.go b/protocol/common/message_processor_test.go index b9d0c1f8e..5e9280865 100644 --- a/protocol/common/message_processor_test.go +++ b/protocol/common/message_processor_test.go @@ -17,7 +17,6 @@ import ( "github.com/status-im/status-go/eth-node/crypto" "github.com/status-im/status-go/eth-node/types" "github.com/status-im/status-go/protocol/encryption" - "github.com/status-im/status-go/protocol/encryption/sharedsecret" "github.com/status-im/status-go/protocol/protobuf" "github.com/status-im/status-go/protocol/sqlite" transport "github.com/status-im/status-go/protocol/transport/whisper" @@ -62,12 +61,10 @@ func (s *MessageProcessorSuite) SetupTest() { database, err := sqlite.Open(filepath.Join(s.tmpDir, "processor-test.sql"), "some-key") s.Require().NoError(err) - onNewSharedSecret := func([]*sharedsecret.Secret) {} onSendContactCode := func(*encryption.ProtocolMessageSpec) {} encryptionProtocol := encryption.New( database, "installation-1", - onNewSharedSecret, onSendContactCode, s.logger, ) @@ -203,7 +200,6 @@ func (s *MessageProcessorSuite) TestHandleDecodedMessagesDatasyncEncrypted() { senderEncryptionProtocol := encryption.New( senderDatabase, "installation-2", - func([]*sharedsecret.Secret) {}, func(*encryption.ProtocolMessageSpec) {}, s.logger, ) diff --git a/protocol/encryption/encryption_multi_device_test.go b/protocol/encryption/encryption_multi_device_test.go index f32c053aa..d21609d1d 100644 --- a/protocol/encryption/encryption_multi_device_test.go +++ b/protocol/encryption/encryption_multi_device_test.go @@ -16,7 +16,6 @@ import ( "github.com/status-im/status-go/eth-node/crypto" "github.com/status-im/status-go/protocol/encryption/multidevice" - "github.com/status-im/status-go/protocol/encryption/sharedsecret" ) const ( @@ -65,7 +64,6 @@ func setupUser(user string, s *EncryptionServiceMultiDeviceSuite, n int) error { protocol := New( db, installationID, - func(s []*sharedsecret.Secret) {}, func(*ProtocolMessageSpec) {}, s.logger.With(zap.String("user", user)), ) diff --git a/protocol/encryption/encryption_test.go b/protocol/encryption/encryption_test.go index 262eadf79..ab8e22642 100644 --- a/protocol/encryption/encryption_test.go +++ b/protocol/encryption/encryption_test.go @@ -18,8 +18,6 @@ import ( "go.uber.org/zap" "github.com/status-im/status-go/eth-node/crypto" - - "github.com/status-im/status-go/protocol/encryption/sharedsecret" ) var cleartext = []byte("hello") @@ -56,7 +54,6 @@ func (s *EncryptionServiceTestSuite) initDatabases(config encryptorConfig) { db, aliceInstallationID, config, - func(s []*sharedsecret.Secret) {}, func(*ProtocolMessageSpec) {}, s.logger.With(zap.String("user", "alice")), ) @@ -68,7 +65,6 @@ func (s *EncryptionServiceTestSuite) initDatabases(config encryptorConfig) { db, bobInstallationID, config, - func(s []*sharedsecret.Secret) {}, func(*ProtocolMessageSpec) {}, s.logger.With(zap.String("user", "bob")), ) diff --git a/protocol/encryption/protocol.go b/protocol/encryption/protocol.go index 4b7af228b..39e9b0b0e 100644 --- a/protocol/encryption/protocol.go +++ b/protocol/encryption/protocol.go @@ -73,7 +73,6 @@ type Protocol struct { publisher *publisher.Publisher subscriptions *Subscriptions - onNewSharedSecretHandler func([]*sharedsecret.Secret) onSendContactCodeHandler func(*ProtocolMessageSpec) logger *zap.Logger @@ -88,7 +87,6 @@ var ( func New( db *sql.DB, installationID string, - onNewSharedSecretHandler func([]*sharedsecret.Secret), onSendContactCodeHandler func(*ProtocolMessageSpec), logger *zap.Logger, ) *Protocol { @@ -96,7 +94,6 @@ func New( db, installationID, defaultEncryptorConfig(installationID, logger), - onNewSharedSecretHandler, onSendContactCodeHandler, logger, ) @@ -108,7 +105,6 @@ func NewWithEncryptorConfig( db *sql.DB, installationID string, encryptorConfig encryptorConfig, - onNewSharedSecretHandler func([]*sharedsecret.Secret), onSendContactCodeHandler func(*ProtocolMessageSpec), logger *zap.Logger, ) *Protocol { @@ -121,7 +117,6 @@ func NewWithEncryptorConfig( InstallationID: installationID, }), publisher: publisher.New(logger), - onNewSharedSecretHandler: onNewSharedSecretHandler, onSendContactCodeHandler: onSendContactCodeHandler, logger: logger.With(zap.Namespace("Protocol")), } @@ -129,7 +124,7 @@ func NewWithEncryptorConfig( type Subscriptions struct { NewInstallations chan []*multidevice.Installation - NewSharedSecret chan []*sharedsecret.Secret + NewSharedSecrets chan []*sharedsecret.Secret SendContactCode <-chan struct{} Quit chan struct{} } @@ -142,11 +137,13 @@ func (p *Protocol) Start(myIdentity *ecdsa.PrivateKey) (*Subscriptions, error) { } p.subscriptions = &Subscriptions{ NewInstallations: make(chan []*multidevice.Installation, subscriptionsChannelSize), - NewSharedSecret: make(chan []*sharedsecret.Secret, subscriptionsChannelSize), + NewSharedSecrets: make(chan []*sharedsecret.Secret, subscriptionsChannelSize), SendContactCode: p.publisher.Start(), Quit: make(chan struct{}), } - p.onNewSharedSecretHandler(secrets) + if len(secrets) > 0 { + p.publishNewSharedSecrets(secrets) + } // Handle Publisher system messages. publisherCh := p.publisher.Start() @@ -271,9 +268,9 @@ func (p *Protocol) BuildDirectMessage(myIdentityKey *ecdsa.PrivateKey, publicKey zap.Bool("has-shared-secret", sharedSecret != nil), zap.Bool("agreed", agreed)) - // Call handler + // Publish shared secrets if sharedSecret != nil { - p.onNewSharedSecretHandler([]*sharedsecret.Secret{sharedSecret}) + p.publishNewSharedSecrets([]*sharedsecret.Secret{sharedSecret}) } spec := &ProtocolMessageSpec{ @@ -428,6 +425,26 @@ func (p *Protocol) ConfirmMessageProcessed(messageID []byte) error { return p.encryptor.ConfirmMessageProcessed(messageID) } +func (p *Protocol) publishNewInstallations(installations []*multidevice.Installation) { + if p.subscriptions != nil { + select { + case p.subscriptions.NewInstallations <- installations: + default: + p.logger.Warn("new installations channel full, dropping message") + } + } +} + +func (p *Protocol) publishNewSharedSecrets(secrets []*sharedsecret.Secret) { + if p.subscriptions != nil { + select { + case p.subscriptions.NewSharedSecrets <- secrets: + default: + p.logger.Warn("new sharedsecrets channel full, dropping message") + } + } +} + // HandleMessage unmarshals a message and processes it, decrypting it if it is a 1:1 message. func (p *Protocol) HandleMessage( myIdentityKey *ecdsa.PrivateKey, @@ -452,13 +469,7 @@ func (p *Protocol) HandleMessage( } // Publish without blocking if channel is full - if p.subscriptions != nil { - select { - case p.subscriptions.NewInstallations <- addedBundles: - default: - p.logger.Warn("new installations channel full, dropping message") - } - } + p.publishNewInstallations(addedBundles) } // Check if it's a public message @@ -493,7 +504,7 @@ func (p *Protocol) HandleMessage( return nil, err } - p.onNewSharedSecretHandler([]*sharedsecret.Secret{sharedSecret}) + p.publishNewSharedSecrets([]*sharedsecret.Secret{sharedSecret}) } return message, nil } diff --git a/protocol/encryption/protocol_test.go b/protocol/encryption/protocol_test.go index 57882010b..c7a7ffaa7 100644 --- a/protocol/encryption/protocol_test.go +++ b/protocol/encryption/protocol_test.go @@ -4,6 +4,7 @@ import ( "io/ioutil" "os" "testing" + "time" "github.com/status-im/status-go/protocol/tt" @@ -13,7 +14,6 @@ import ( "go.uber.org/zap" "github.com/status-im/status-go/eth-node/crypto" - "github.com/status-im/status-go/protocol/encryption/sharedsecret" ) @@ -43,14 +43,11 @@ func (s *ProtocolServiceTestSuite) SetupTest() { s.Require().NoError(err) bobDBKey := "bob" - onNewSharedSecretHandler := func(secret []*sharedsecret.Secret) {} - db, err := sqlite.Open(s.aliceDBPath.Name(), aliceDBKey) s.Require().NoError(err) s.alice = New( db, "1", - onNewSharedSecretHandler, func(*ProtocolMessageSpec) {}, s.logger.With(zap.String("user", "alice")), ) @@ -60,7 +57,6 @@ func (s *ProtocolServiceTestSuite) SetupTest() { s.bob = New( db, "2", - onNewSharedSecretHandler, func(*ProtocolMessageSpec) {}, s.logger.With(zap.String("user", "bob")), ) @@ -146,9 +142,9 @@ func (s *ProtocolServiceTestSuite) TestSecretNegotiation() { payload := []byte("test") - s.bob.onNewSharedSecretHandler = func(secret []*sharedsecret.Secret) { - secretResponse = secret - } + subscriptions, err := s.bob.Start(bobKey) + s.Require().NoError(err) + msgSpec, err := s.alice.BuildDirectMessage(aliceKey, &bobKey.PublicKey, payload) s.NoError(err) s.NotNil(msgSpec, "It creates a message spec") @@ -167,7 +163,14 @@ func (s *ProtocolServiceTestSuite) TestSecretNegotiation() { _, err = s.bob.HandleMessage(bobKey, &aliceKey.PublicKey, msgSpec.Message, []byte("message-id")) s.NoError(err) + select { + case <-time.After(2 * time.Second): + case secretResponse = <-subscriptions.NewSharedSecrets: + + } + s.Require().NotNil(secretResponse) + s.Require().NoError(s.bob.Stop()) } func (s *ProtocolServiceTestSuite) TestPropagatingSavedSharedSecretsOnStart() { @@ -182,15 +185,17 @@ func (s *ProtocolServiceTestSuite) TestPropagatingSavedSharedSecretsOnStart() { generatedSecret, err := s.alice.secret.Generate(aliceKey, &bobKey.PublicKey, "installation-1") s.NoError(err) - s.alice.onNewSharedSecretHandler = func(secret []*sharedsecret.Secret) { - secretResponse = secret - } + subscriptions, err := s.alice.Start(aliceKey) + s.Require().NoError(err) - _, err = s.alice.Start(aliceKey) - s.NoError(err) + select { + case <-time.After(2 * time.Second): + case secretResponse = <-subscriptions.NewSharedSecrets: + } s.Require().NotNil(secretResponse) s.Require().Len(secretResponse, 1) s.Equal(crypto.FromECDSAPub(generatedSecret.Identity), crypto.FromECDSAPub(secretResponse[0].Identity)) s.Equal(generatedSecret.Key, secretResponse[0].Key) + s.Require().NoError(s.alice.Stop()) } diff --git a/protocol/messenger.go b/protocol/messenger.go index b8b752ce1..337bf6ba7 100644 --- a/protocol/messenger.go +++ b/protocol/messenger.go @@ -56,6 +56,7 @@ var ( // mailservers because they can also be managed by the user. type Messenger struct { node types.Node + config *config identity *ecdsa.PrivateKey persistence *sqlitePersistence transport transport.Transport @@ -127,18 +128,6 @@ func NewMessenger( } } - // Set default config fields. - onNewSharedSecretHandler := func(secrets []*sharedsecret.Secret) { - filters, err := messenger.handleSharedSecrets(secrets) - if err != nil { - slogger := logger.With(zap.String("site", "onNewSharedSecretHandler")) - slogger.Warn("failed to process secrets", zap.Error(err)) - } - - if c.onNegotiatedFilters != nil { - c.onNegotiatedFilters(filters) - } - } if c.onSendContactCodeHandler == nil { c.onSendContactCodeHandler = func(messageSpec *encryption.ProtocolMessageSpec) { slogger := logger.With(zap.String("site", "onSendContactCodeHandler")) @@ -221,7 +210,6 @@ func NewMessenger( encryptionProtocol := encryption.New( database, installationID, - onNewSharedSecretHandler, c.onSendContactCodeHandler, logger, ) @@ -263,6 +251,7 @@ func NewMessenger( handler := newMessageHandler(identity, logger, &sqlitePersistence{db: database}) messenger = &Messenger{ + config: &c, node: node, identity: identity, persistence: &sqlitePersistence{db: database}, @@ -325,6 +314,24 @@ func (m *Messenger) Start() error { return nil } +func (m *Messenger) handleSharedSecrets(secrets []*sharedsecret.Secret) ([]*transport.Filter, error) { + logger := m.logger.With(zap.String("site", "handleSharedSecrets")) + var result []*transport.Filter + for _, secret := range secrets { + logger.Debug("received shared secret", zap.Binary("identity", crypto.FromECDSAPub(secret.Identity))) + fSecret := types.NegotiatedSecret{ + PublicKey: secret.Identity, + Key: secret.Key, + } + filter, err := m.transport.ProcessNegotiatedSecret(fSecret) + if err != nil { + return nil, err + } + result = append(result, filter) + } + return result, nil +} + func (m *Messenger) handleNewInstallations(installations []*multidevice.Installation) { for _, installation := range installations { if installation.Identity == contactIDFromPublicKey(&m.identity.PublicKey) { @@ -336,17 +343,28 @@ func (m *Messenger) handleNewInstallations(installations []*multidevice.Installa } } +// handleEncryptionLayerSubscriptions handles events from the encryption layer func (m *Messenger) handleEncryptionLayerSubscriptions(subscriptions *encryption.Subscriptions) { go func() { for { select { + case secrets := <-subscriptions.NewSharedSecrets: + m.logger.Debug("handling new shared secrets") + filters, err := m.handleSharedSecrets(secrets) + if err != nil { + m.logger.Warn("failed to process secrets", zap.Error(err)) + continue + } + + if m.config.onNegotiatedFilters != nil { + m.config.onNegotiatedFilters(filters) + } case newInstallations := <-subscriptions.NewInstallations: m.logger.Debug("handling new installations") m.handleNewInstallations(newInstallations) case <-subscriptions.Quit: m.logger.Debug("quitting encryption subscription loop") return - } } }() @@ -455,24 +473,6 @@ func (m *Messenger) Shutdown() (err error) { return } -func (m *Messenger) handleSharedSecrets(secrets []*sharedsecret.Secret) ([]*transport.Filter, error) { - logger := m.logger.With(zap.String("site", "handleSharedSecrets")) - var result []*transport.Filter - for _, secret := range secrets { - logger.Debug("received shared secret", zap.Binary("identity", crypto.FromECDSAPub(secret.Identity))) - fSecret := types.NegotiatedSecret{ - PublicKey: secret.Identity, - Key: secret.Key, - } - filter, err := m.transport.ProcessNegotiatedSecret(fSecret) - if err != nil { - return nil, err - } - result = append(result, filter) - } - return result, nil -} - func (m *Messenger) EnableInstallation(id string) error { m.mutex.Lock() defer m.mutex.Unlock() diff --git a/protocol/messenger_contact_update_test.go b/protocol/messenger_contact_update_test.go index 25a6e9de7..c89a7f645 100644 --- a/protocol/messenger_contact_update_test.go +++ b/protocol/messenger_contact_update_test.go @@ -44,6 +44,11 @@ func (s *MessengerContactUpdateSuite) SetupTest() { s.m = s.newMessenger(s.shh) s.privateKey = s.m.identity + s.Require().NoError(s.m.Start()) +} + +func (s *MessengerContactUpdateSuite) TearDownTest() { + s.Require().NoError(s.m.Shutdown()) } func (s *MessengerContactUpdateSuite) newMessengerWithKey(shh types.Waku, privateKey *ecdsa.PrivateKey) *Messenger { @@ -85,6 +90,7 @@ func (s *MessengerContactUpdateSuite) TestReceiveContactUpdate() { contactID := types.EncodeHex(crypto.FromECDSAPub(&s.m.identity.PublicKey)) theirMessenger := s.newMessenger(s.shh) + s.Require().NoError(theirMessenger.Start()) theirContactID := types.EncodeHex(crypto.FromECDSAPub(&theirMessenger.identity.PublicKey)) response, err := theirMessenger.SendContactUpdate(context.Background(), contactID, theirName, theirPicture) @@ -136,4 +142,5 @@ func (s *MessengerContactUpdateSuite) TestReceiveContactUpdate() { s.Require().False(receivedContact.ENSVerified) s.Require().True(receivedContact.HasBeenAdded()) s.Require().NotEmpty(receivedContact.LastUpdated) + s.Require().NoError(theirMessenger.Shutdown()) } diff --git a/protocol/messenger_emoji_test.go b/protocol/messenger_emoji_test.go index 297c62127..10c8cd266 100644 --- a/protocol/messenger_emoji_test.go +++ b/protocol/messenger_emoji_test.go @@ -47,6 +47,11 @@ func (s *MessengerEmojiSuite) SetupTest() { s.m = s.newMessenger(s.shh) s.privateKey = s.m.identity + s.Require().NoError(s.m.Start()) +} + +func (s *MessengerEmojiSuite) TearDownTest() { + s.Require().NoError(s.m.Shutdown()) } func (s *MessengerEmojiSuite) newMessengerWithKey(shh types.Waku, privateKey *ecdsa.PrivateKey) *Messenger { @@ -89,6 +94,7 @@ func (s *MessengerEmojiSuite) TestSendEmoji() { s.Require().NoError(err) bob := s.newMessengerWithKey(s.shh, key) + s.Require().NoError(bob.Start()) chatID := statusChatID @@ -162,11 +168,13 @@ func (s *MessengerEmojiSuite) TestSendEmoji() { s.Require().Equal(response.EmojiReactions[0].ID(), emojiID) s.Require().Equal(response.EmojiReactions[0].Type, protobuf.EmojiReaction_SAD) s.Require().True(response.EmojiReactions[0].Retracted) + s.Require().NoError(bob.Shutdown()) } func (s *MessengerEmojiSuite) TestEmojiPrivateGroup() { bob := s.m alice := s.newMessenger(s.shh) + s.Require().NoError(alice.Start()) response, err := bob.CreateGroupChatWithMembers(context.Background(), "test", []string{}) s.NoError(err) @@ -217,4 +225,5 @@ func (s *MessengerEmojiSuite) TestEmojiPrivateGroup() { "no emoji reaction received", ) s.Require().NoError(err) + s.Require().NoError(alice.Shutdown()) } diff --git a/protocol/messenger_installations_test.go b/protocol/messenger_installations_test.go index dec7ce458..56e26c09a 100644 --- a/protocol/messenger_installations_test.go +++ b/protocol/messenger_installations_test.go @@ -94,6 +94,7 @@ func (s *MessengerInstallationSuite) newMessenger(shh types.Waku) *Messenger { func (s *MessengerInstallationSuite) TestReceiveInstallation() { theirMessenger := s.newMessengerWithKey(s.shh, s.privateKey) + s.Require().NoError(theirMessenger.Start()) err := theirMessenger.SetInstallationMetadata(theirMessenger.installationID, &multidevice.InstallationMetadata{ Name: "their-name", @@ -159,6 +160,7 @@ func (s *MessengerInstallationSuite) TestReceiveInstallation() { actualChat := response.Chats[0] s.Require().Equal(statusChatID, actualChat.ID) s.Require().True(actualChat.Active) + s.Require().NoError(theirMessenger.Shutdown()) } func (s *MessengerInstallationSuite) TestSyncInstallation() { @@ -180,6 +182,7 @@ func (s *MessengerInstallationSuite) TestSyncInstallation() { // pair theirMessenger := s.newMessengerWithKey(s.shh, s.privateKey) + s.Require().NoError(theirMessenger.Start()) err = theirMessenger.SetInstallationMetadata(theirMessenger.installationID, &multidevice.InstallationMetadata{ Name: "their-name", @@ -246,6 +249,7 @@ func (s *MessengerInstallationSuite) TestSyncInstallation() { s.Require().NotNil(statusChat) s.Require().True(actualContact.IsAdded()) + s.Require().NoError(theirMessenger.Shutdown()) } func (s *MessengerInstallationSuite) TestSyncInstallationNewMessages() { @@ -253,7 +257,9 @@ func (s *MessengerInstallationSuite) TestSyncInstallationNewMessages() { bob1 := s.m // pair bob2 := s.newMessengerWithKey(s.shh, s.privateKey) + s.Require().NoError(bob2.Start()) alice := s.newMessenger(s.shh) + s.Require().NoError(alice.Start()) err := bob2.SetInstallationMetadata(bob2.installationID, &multidevice.InstallationMetadata{ Name: "their-name", @@ -296,4 +302,6 @@ func (s *MessengerInstallationSuite) TestSyncInstallationNewMessages() { "message not received", ) s.Require().NoError(err) + s.Require().NoError(bob2.Shutdown()) + s.Require().NoError(alice.Shutdown()) } diff --git a/protocol/messenger_mute_test.go b/protocol/messenger_mute_test.go index 8fd525771..3fb6635f3 100644 --- a/protocol/messenger_mute_test.go +++ b/protocol/messenger_mute_test.go @@ -45,6 +45,11 @@ func (s *MessengerMuteSuite) SetupTest() { s.m = s.newMessenger(s.shh) s.privateKey = s.m.identity + s.Require().NoError(s.m.Start()) +} + +func (s *MessengerMuteSuite) TearDownTest() { + s.Require().NoError(s.m.Shutdown()) } func (s *MessengerMuteSuite) newMessengerWithKey(shh types.Waku, privateKey *ecdsa.PrivateKey) *Messenger { @@ -86,6 +91,7 @@ func (s *MessengerMuteSuite) TestSetMute() { s.Require().NoError(err) theirMessenger := s.newMessengerWithKey(s.shh, key) + s.Require().NoError(theirMessenger.Start()) chatID := "status" @@ -107,4 +113,5 @@ func (s *MessengerMuteSuite) TestSetMute() { s.Require().NoError(s.m.UnmuteChat(chatID)) s.Require().False(s.m.Chats()[0].Muted) + s.Require().NoError(theirMessenger.Shutdown()) } diff --git a/protocol/messenger_test.go b/protocol/messenger_test.go index 102d078bf..d0399d269 100644 --- a/protocol/messenger_test.go +++ b/protocol/messenger_test.go @@ -102,6 +102,7 @@ func (s *MessengerSuite) SetupTest() { s.m = s.newMessenger(s.shh) s.privateKey = s.m.identity + s.Require().NoError(s.m.Start()) } func (s *MessengerSuite) newMessengerWithKey(shh types.Waku, privateKey *ecdsa.PrivateKey) *Messenger { @@ -503,6 +504,7 @@ func (s *MessengerSuite) TestRetrieveOwnPublic() { // Retrieve their public message func (s *MessengerSuite) TestRetrieveTheirPublic() { theirMessenger := s.newMessenger(s.shh) + s.Require().NoError(theirMessenger.Start()) theirChat := CreatePublicChat("status", s.m.transport) err := theirMessenger.SaveChat(&theirChat) s.Require().NoError(err) @@ -539,10 +541,12 @@ func (s *MessengerSuite) TestRetrieveTheirPublic() { s.Require().Equal(sentMessage.Clock, actualChat.LastClockValue) // It sets the last message s.Require().NotNil(actualChat.LastMessage) + s.Require().NoError(theirMessenger.Shutdown()) } func (s *MessengerSuite) TestDeletedAtClockValue() { theirMessenger := s.newMessenger(s.shh) + s.Require().NoError(theirMessenger.Start()) theirChat := CreatePublicChat("status", s.m.transport) err := theirMessenger.SaveChat(&theirChat) s.Require().NoError(err) @@ -568,10 +572,12 @@ func (s *MessengerSuite) TestDeletedAtClockValue() { response, err := s.m.RetrieveAll() s.Require().NoError(err) s.Require().Len(response.Messages, 0) + s.Require().NoError(theirMessenger.Shutdown()) } func (s *MessengerSuite) TestRetrieveBlockedContact() { theirMessenger := s.newMessenger(s.shh) + s.Require().NoError(theirMessenger.Start()) theirChat := CreatePublicChat("status", s.m.transport) err := theirMessenger.SaveChat(&theirChat) s.Require().NoError(err) @@ -610,6 +616,7 @@ func (s *MessengerSuite) TestRetrieveBlockedContact() { // Resend their public message, receive only once func (s *MessengerSuite) TestResendPublicMessage() { theirMessenger := s.newMessenger(s.shh) + s.Require().NoError(theirMessenger.Start()) theirChat := CreatePublicChat("status", s.m.transport) err := theirMessenger.SaveChat(&theirChat) s.Require().NoError(err) @@ -663,6 +670,7 @@ func (s *MessengerSuite) TestResendPublicMessage() { // Test receiving a message on an existing private chat func (s *MessengerSuite) TestRetrieveTheirPrivateChatExisting() { theirMessenger := s.newMessenger(s.shh) + s.Require().NoError(theirMessenger.Start()) theirChat := CreateOneToOneChat("XXX", &s.privateKey.PublicKey, s.m.transport) err := theirMessenger.SaveChat(&theirChat) s.Require().NoError(err) @@ -698,11 +706,13 @@ func (s *MessengerSuite) TestRetrieveTheirPrivateChatExisting() { // It sets the last message s.Require().NotNil(actualChat.LastMessage) s.Require().True(actualChat.Active) + s.Require().NoError(theirMessenger.Shutdown()) } // Test receiving a message on an non-existing private chat func (s *MessengerSuite) TestRetrieveTheirPrivateChatNonExisting() { theirMessenger := s.newMessenger(s.shh) + s.Require().NoError(theirMessenger.Start()) chat := CreateOneToOneChat("XXX", &s.privateKey.PublicKey, s.m.transport) err := theirMessenger.SaveChat(&chat) s.NoError(err) @@ -739,6 +749,7 @@ func (s *MessengerSuite) TestRetrieveTheirPrivateChatNonExisting() { // Test receiving a message on an non-existing public chat func (s *MessengerSuite) TestRetrieveTheirPublicChatNonExisting() { theirMessenger := s.newMessenger(s.shh) + s.Require().NoError(theirMessenger.Start()) chat := CreatePublicChat("test-chat", s.m.transport) err := theirMessenger.SaveChat(&chat) s.NoError(err) @@ -756,12 +767,14 @@ func (s *MessengerSuite) TestRetrieveTheirPublicChatNonExisting() { s.Require().Equal(len(response.Messages), 0) s.Require().Equal(len(response.Chats), 0) + s.Require().NoError(theirMessenger.Shutdown()) } // Test receiving a message on an existing private group chat func (s *MessengerSuite) TestRetrieveTheirPrivateGroupChat() { var response *MessengerResponse theirMessenger := s.newMessenger(s.shh) + s.Require().NoError(theirMessenger.Start()) response, err := s.m.CreateGroupChatWithMembers(context.Background(), "id", []string{}) s.NoError(err) s.Require().Len(response.Chats, 1) @@ -823,6 +836,7 @@ func (s *MessengerSuite) TestRetrieveTheirPrivateGroupChat() { func (s *MessengerSuite) TestChangeNameGroupChat() { var response *MessengerResponse theirMessenger := s.newMessenger(s.shh) + s.Require().NoError(theirMessenger.Start()) response, err := s.m.CreateGroupChatWithMembers(context.Background(), "old-name", []string{}) s.NoError(err) s.Require().Len(response.Chats, 1) @@ -869,12 +883,14 @@ func (s *MessengerSuite) TestChangeNameGroupChat() { s.Require().Len(response.Chats, 1) actualChat := response.Chats[0] s.Require().Equal(newName, actualChat.Name) + s.Require().NoError(theirMessenger.Shutdown()) } // Test being re-invited to a group chat func (s *MessengerSuite) TestReInvitedToGroupChat() { var response *MessengerResponse theirMessenger := s.newMessenger(s.shh) + s.Require().NoError(theirMessenger.Start()) response, err := s.m.CreateGroupChatWithMembers(context.Background(), "old-name", []string{}) s.NoError(err) s.Require().Len(response.Chats, 1) @@ -928,6 +944,7 @@ func (s *MessengerSuite) TestReInvitedToGroupChat() { s.Require().Len(response.Chats, 1) s.Require().True(response.Chats[0].Active) + s.Require().NoError(theirMessenger.Shutdown()) } func (s *MessengerSuite) TestChatPersistencePublic() { @@ -1434,6 +1451,7 @@ func (s *MessengerSuite) TestDeclineRequestAddressForTransaction() { value := testValue contract := testContract theirMessenger := s.newMessenger(s.shh) + s.Require().NoError(theirMessenger.Start()) theirPkString := types.EncodeHex(crypto.FromECDSAPub(&theirMessenger.identity.PublicKey)) chat := CreateOneToOneChat(theirPkString, &theirMessenger.identity.PublicKey, s.m.transport) @@ -1516,6 +1534,7 @@ func (s *MessengerSuite) TestDeclineRequestAddressForTransaction() { s.Require().Equal(CommandStateRequestAddressForTransactionDeclined, receiverMessage.CommandParameters.CommandState) s.Require().Equal(initialCommandID, receiverMessage.CommandParameters.ID) s.Require().Equal(initialCommandID, receiverMessage.Replace) + s.Require().NoError(theirMessenger.Shutdown()) } func (s *MessengerSuite) TestSendEthTransaction() { @@ -1523,6 +1542,7 @@ func (s *MessengerSuite) TestSendEthTransaction() { contract := testContract theirMessenger := s.newMessenger(s.shh) + s.Require().NoError(theirMessenger.Start()) theirPkString := types.EncodeHex(crypto.FromECDSAPub(&theirMessenger.identity.PublicKey)) receiverAddress := crypto.PubkeyToAddress(theirMessenger.identity.PublicKey) @@ -1617,6 +1637,7 @@ func (s *MessengerSuite) TestSendEthTransaction() { s.Require().Equal(CommandStateTransactionSent, receiverMessage.CommandParameters.CommandState) s.Require().Equal(senderMessage.ID, receiverMessage.ID) s.Require().Equal("", receiverMessage.Replace) + s.Require().NoError(theirMessenger.Shutdown()) } func (s *MessengerSuite) TestSendTokenTransaction() { @@ -1624,6 +1645,7 @@ func (s *MessengerSuite) TestSendTokenTransaction() { contract := testContract theirMessenger := s.newMessenger(s.shh) + s.Require().NoError(theirMessenger.Start()) theirPkString := types.EncodeHex(crypto.FromECDSAPub(&theirMessenger.identity.PublicKey)) receiverAddress := crypto.PubkeyToAddress(theirMessenger.identity.PublicKey) @@ -1718,12 +1740,14 @@ func (s *MessengerSuite) TestSendTokenTransaction() { s.Require().Equal(CommandStateTransactionSent, receiverMessage.CommandParameters.CommandState) s.Require().Equal(senderMessage.ID, receiverMessage.ID) s.Require().Equal(senderMessage.Replace, senderMessage.Replace) + s.Require().NoError(theirMessenger.Shutdown()) } func (s *MessengerSuite) TestAcceptRequestAddressForTransaction() { value := testValue contract := testContract theirMessenger := s.newMessenger(s.shh) + s.Require().NoError(theirMessenger.Start()) theirPkString := types.EncodeHex(crypto.FromECDSAPub(&theirMessenger.identity.PublicKey)) myAddress := crypto.PubkeyToAddress(s.m.identity.PublicKey) @@ -1808,6 +1832,7 @@ func (s *MessengerSuite) TestAcceptRequestAddressForTransaction() { s.Require().Equal(initialCommandID, receiverMessage.CommandParameters.ID) s.Require().Equal("some-address", receiverMessage.CommandParameters.Address) s.Require().Equal(initialCommandID, receiverMessage.Replace) + s.Require().NoError(theirMessenger.Shutdown()) } func (s *MessengerSuite) TestDeclineRequestTransaction() { @@ -1816,6 +1841,7 @@ func (s *MessengerSuite) TestDeclineRequestTransaction() { receiverAddress := crypto.PubkeyToAddress(s.m.identity.PublicKey) receiverAddressString := strings.ToLower(receiverAddress.Hex()) theirMessenger := s.newMessenger(s.shh) + s.Require().NoError(theirMessenger.Start()) theirPkString := types.EncodeHex(crypto.FromECDSAPub(&theirMessenger.identity.PublicKey)) chat := CreateOneToOneChat(theirPkString, &theirMessenger.identity.PublicKey, s.m.transport) @@ -1895,6 +1921,7 @@ func (s *MessengerSuite) TestDeclineRequestTransaction() { s.Require().Equal(initialCommandID, receiverMessage.CommandParameters.ID) s.Require().Equal(initialCommandID, receiverMessage.Replace) s.Require().Equal(CommandStateRequestTransactionDeclined, receiverMessage.CommandParameters.CommandState) + s.Require().NoError(theirMessenger.Shutdown()) } func (s *MessengerSuite) TestRequestTransaction() { @@ -1903,6 +1930,7 @@ func (s *MessengerSuite) TestRequestTransaction() { receiverAddress := crypto.PubkeyToAddress(s.m.identity.PublicKey) receiverAddressString := strings.ToLower(receiverAddress.Hex()) theirMessenger := s.newMessenger(s.shh) + s.Require().NoError(theirMessenger.Start()) theirPkString := types.EncodeHex(crypto.FromECDSAPub(&theirMessenger.identity.PublicKey)) chat := CreateOneToOneChat(theirPkString, &theirMessenger.identity.PublicKey, s.m.transport) @@ -2039,6 +2067,7 @@ func (s *MessengerSuite) TestRequestTransaction() { s.Require().Equal(CommandStateTransactionSent, receiverMessage.CommandParameters.CommandState) s.Require().Equal(senderMessage.ID, receiverMessage.ID) s.Require().Equal(senderMessage.Replace, senderMessage.Replace) + s.Require().NoError(theirMessenger.Shutdown()) } type MockTransaction struct { diff --git a/protocol/push_notification_test.go b/protocol/push_notification_test.go index 9ac2c4449..a03d1d9fa 100644 --- a/protocol/push_notification_test.go +++ b/protocol/push_notification_test.go @@ -56,6 +56,7 @@ func (s *MessengerPushNotificationSuite) SetupTest() { s.m = s.newMessenger(s.shh) s.privateKey = s.m.identity + s.Require().NoError(s.m.Start()) } func (s *MessengerPushNotificationSuite) TearDownTest() { @@ -126,6 +127,7 @@ func (s *MessengerPushNotificationSuite) TestReceivePushNotification() { bob1 := s.m bob2 := s.newMessengerWithKey(s.shh, s.m.identity) + s.Require().NoError(bob2.Start()) serverKey, err := crypto.GenerateKey() s.Require().NoError(err) @@ -432,6 +434,7 @@ func (s *MessengerPushNotificationSuite) TestReceivePushNotificationRetries() { alice := s.newMessenger(s.shh) // another contact to invalidate the token frank := s.newMessenger(s.shh) + s.Require().NoError(frank.Start()) // start alice and enable push notifications s.Require().NoError(alice.Start()) s.Require().NoError(alice.EnableSendingPushNotifications()) diff --git a/services/ext/service.go b/services/ext/service.go index a32031343..3aa4beddf 100644 --- a/services/ext/service.go +++ b/services/ext/service.go @@ -169,10 +169,14 @@ func (s *Service) InitProtocol(identity *ecdsa.PrivateKey, db *sql.DB, logger *z func (s *Service) StartMessenger() error { // Start a loop that retrieves all messages and propagates them to status-react. s.cancelMessenger = make(chan struct{}) + err := s.messenger.Start() + if err != nil { + return err + } go s.retrieveMessagesLoop(time.Second, s.cancelMessenger) go s.verifyTransactionLoop(30*time.Second, s.cancelMessenger) go s.verifyENSLoop(30*time.Second, s.cancelMessenger) - return s.messenger.Start() + return nil } func (s *Service) retrieveMessagesLoop(tick time.Duration, cancel <-chan struct{}) {