move shared secrets to subscription

This commit is contained in:
Andrea Maria Piana 2020-07-31 11:46:38 +02:00
parent dd3cf55556
commit b557a64612
No known key found for this signature in database
GPG Key ID: AA6CCA6DE0E06424
13 changed files with 147 additions and 74 deletions

View File

@ -17,7 +17,6 @@ import (
"github.com/status-im/status-go/eth-node/crypto"
"github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/protocol/encryption"
"github.com/status-im/status-go/protocol/encryption/sharedsecret"
"github.com/status-im/status-go/protocol/protobuf"
"github.com/status-im/status-go/protocol/sqlite"
transport "github.com/status-im/status-go/protocol/transport/whisper"
@ -62,12 +61,10 @@ func (s *MessageProcessorSuite) SetupTest() {
database, err := sqlite.Open(filepath.Join(s.tmpDir, "processor-test.sql"), "some-key")
s.Require().NoError(err)
onNewSharedSecret := func([]*sharedsecret.Secret) {}
onSendContactCode := func(*encryption.ProtocolMessageSpec) {}
encryptionProtocol := encryption.New(
database,
"installation-1",
onNewSharedSecret,
onSendContactCode,
s.logger,
)
@ -203,7 +200,6 @@ func (s *MessageProcessorSuite) TestHandleDecodedMessagesDatasyncEncrypted() {
senderEncryptionProtocol := encryption.New(
senderDatabase,
"installation-2",
func([]*sharedsecret.Secret) {},
func(*encryption.ProtocolMessageSpec) {},
s.logger,
)

View File

@ -16,7 +16,6 @@ import (
"github.com/status-im/status-go/eth-node/crypto"
"github.com/status-im/status-go/protocol/encryption/multidevice"
"github.com/status-im/status-go/protocol/encryption/sharedsecret"
)
const (
@ -65,7 +64,6 @@ func setupUser(user string, s *EncryptionServiceMultiDeviceSuite, n int) error {
protocol := New(
db,
installationID,
func(s []*sharedsecret.Secret) {},
func(*ProtocolMessageSpec) {},
s.logger.With(zap.String("user", user)),
)

View File

@ -18,8 +18,6 @@ import (
"go.uber.org/zap"
"github.com/status-im/status-go/eth-node/crypto"
"github.com/status-im/status-go/protocol/encryption/sharedsecret"
)
var cleartext = []byte("hello")
@ -56,7 +54,6 @@ func (s *EncryptionServiceTestSuite) initDatabases(config encryptorConfig) {
db,
aliceInstallationID,
config,
func(s []*sharedsecret.Secret) {},
func(*ProtocolMessageSpec) {},
s.logger.With(zap.String("user", "alice")),
)
@ -68,7 +65,6 @@ func (s *EncryptionServiceTestSuite) initDatabases(config encryptorConfig) {
db,
bobInstallationID,
config,
func(s []*sharedsecret.Secret) {},
func(*ProtocolMessageSpec) {},
s.logger.With(zap.String("user", "bob")),
)

View File

@ -73,7 +73,6 @@ type Protocol struct {
publisher *publisher.Publisher
subscriptions *Subscriptions
onNewSharedSecretHandler func([]*sharedsecret.Secret)
onSendContactCodeHandler func(*ProtocolMessageSpec)
logger *zap.Logger
@ -88,7 +87,6 @@ var (
func New(
db *sql.DB,
installationID string,
onNewSharedSecretHandler func([]*sharedsecret.Secret),
onSendContactCodeHandler func(*ProtocolMessageSpec),
logger *zap.Logger,
) *Protocol {
@ -96,7 +94,6 @@ func New(
db,
installationID,
defaultEncryptorConfig(installationID, logger),
onNewSharedSecretHandler,
onSendContactCodeHandler,
logger,
)
@ -108,7 +105,6 @@ func NewWithEncryptorConfig(
db *sql.DB,
installationID string,
encryptorConfig encryptorConfig,
onNewSharedSecretHandler func([]*sharedsecret.Secret),
onSendContactCodeHandler func(*ProtocolMessageSpec),
logger *zap.Logger,
) *Protocol {
@ -121,7 +117,6 @@ func NewWithEncryptorConfig(
InstallationID: installationID,
}),
publisher: publisher.New(logger),
onNewSharedSecretHandler: onNewSharedSecretHandler,
onSendContactCodeHandler: onSendContactCodeHandler,
logger: logger.With(zap.Namespace("Protocol")),
}
@ -129,7 +124,7 @@ func NewWithEncryptorConfig(
type Subscriptions struct {
NewInstallations chan []*multidevice.Installation
NewSharedSecret chan []*sharedsecret.Secret
NewSharedSecrets chan []*sharedsecret.Secret
SendContactCode <-chan struct{}
Quit chan struct{}
}
@ -142,11 +137,13 @@ func (p *Protocol) Start(myIdentity *ecdsa.PrivateKey) (*Subscriptions, error) {
}
p.subscriptions = &Subscriptions{
NewInstallations: make(chan []*multidevice.Installation, subscriptionsChannelSize),
NewSharedSecret: make(chan []*sharedsecret.Secret, subscriptionsChannelSize),
NewSharedSecrets: make(chan []*sharedsecret.Secret, subscriptionsChannelSize),
SendContactCode: p.publisher.Start(),
Quit: make(chan struct{}),
}
p.onNewSharedSecretHandler(secrets)
if len(secrets) > 0 {
p.publishNewSharedSecrets(secrets)
}
// Handle Publisher system messages.
publisherCh := p.publisher.Start()
@ -271,9 +268,9 @@ func (p *Protocol) BuildDirectMessage(myIdentityKey *ecdsa.PrivateKey, publicKey
zap.Bool("has-shared-secret", sharedSecret != nil),
zap.Bool("agreed", agreed))
// Call handler
// Publish shared secrets
if sharedSecret != nil {
p.onNewSharedSecretHandler([]*sharedsecret.Secret{sharedSecret})
p.publishNewSharedSecrets([]*sharedsecret.Secret{sharedSecret})
}
spec := &ProtocolMessageSpec{
@ -428,6 +425,26 @@ func (p *Protocol) ConfirmMessageProcessed(messageID []byte) error {
return p.encryptor.ConfirmMessageProcessed(messageID)
}
func (p *Protocol) publishNewInstallations(installations []*multidevice.Installation) {
if p.subscriptions != nil {
select {
case p.subscriptions.NewInstallations <- installations:
default:
p.logger.Warn("new installations channel full, dropping message")
}
}
}
func (p *Protocol) publishNewSharedSecrets(secrets []*sharedsecret.Secret) {
if p.subscriptions != nil {
select {
case p.subscriptions.NewSharedSecrets <- secrets:
default:
p.logger.Warn("new sharedsecrets channel full, dropping message")
}
}
}
// HandleMessage unmarshals a message and processes it, decrypting it if it is a 1:1 message.
func (p *Protocol) HandleMessage(
myIdentityKey *ecdsa.PrivateKey,
@ -452,13 +469,7 @@ func (p *Protocol) HandleMessage(
}
// Publish without blocking if channel is full
if p.subscriptions != nil {
select {
case p.subscriptions.NewInstallations <- addedBundles:
default:
p.logger.Warn("new installations channel full, dropping message")
}
}
p.publishNewInstallations(addedBundles)
}
// Check if it's a public message
@ -493,7 +504,7 @@ func (p *Protocol) HandleMessage(
return nil, err
}
p.onNewSharedSecretHandler([]*sharedsecret.Secret{sharedSecret})
p.publishNewSharedSecrets([]*sharedsecret.Secret{sharedSecret})
}
return message, nil
}

View File

@ -4,6 +4,7 @@ import (
"io/ioutil"
"os"
"testing"
"time"
"github.com/status-im/status-go/protocol/tt"
@ -13,7 +14,6 @@ import (
"go.uber.org/zap"
"github.com/status-im/status-go/eth-node/crypto"
"github.com/status-im/status-go/protocol/encryption/sharedsecret"
)
@ -43,14 +43,11 @@ func (s *ProtocolServiceTestSuite) SetupTest() {
s.Require().NoError(err)
bobDBKey := "bob"
onNewSharedSecretHandler := func(secret []*sharedsecret.Secret) {}
db, err := sqlite.Open(s.aliceDBPath.Name(), aliceDBKey)
s.Require().NoError(err)
s.alice = New(
db,
"1",
onNewSharedSecretHandler,
func(*ProtocolMessageSpec) {},
s.logger.With(zap.String("user", "alice")),
)
@ -60,7 +57,6 @@ func (s *ProtocolServiceTestSuite) SetupTest() {
s.bob = New(
db,
"2",
onNewSharedSecretHandler,
func(*ProtocolMessageSpec) {},
s.logger.With(zap.String("user", "bob")),
)
@ -146,9 +142,9 @@ func (s *ProtocolServiceTestSuite) TestSecretNegotiation() {
payload := []byte("test")
s.bob.onNewSharedSecretHandler = func(secret []*sharedsecret.Secret) {
secretResponse = secret
}
subscriptions, err := s.bob.Start(bobKey)
s.Require().NoError(err)
msgSpec, err := s.alice.BuildDirectMessage(aliceKey, &bobKey.PublicKey, payload)
s.NoError(err)
s.NotNil(msgSpec, "It creates a message spec")
@ -167,7 +163,14 @@ func (s *ProtocolServiceTestSuite) TestSecretNegotiation() {
_, err = s.bob.HandleMessage(bobKey, &aliceKey.PublicKey, msgSpec.Message, []byte("message-id"))
s.NoError(err)
select {
case <-time.After(2 * time.Second):
case secretResponse = <-subscriptions.NewSharedSecrets:
}
s.Require().NotNil(secretResponse)
s.Require().NoError(s.bob.Stop())
}
func (s *ProtocolServiceTestSuite) TestPropagatingSavedSharedSecretsOnStart() {
@ -182,15 +185,17 @@ func (s *ProtocolServiceTestSuite) TestPropagatingSavedSharedSecretsOnStart() {
generatedSecret, err := s.alice.secret.Generate(aliceKey, &bobKey.PublicKey, "installation-1")
s.NoError(err)
s.alice.onNewSharedSecretHandler = func(secret []*sharedsecret.Secret) {
secretResponse = secret
}
subscriptions, err := s.alice.Start(aliceKey)
s.Require().NoError(err)
_, err = s.alice.Start(aliceKey)
s.NoError(err)
select {
case <-time.After(2 * time.Second):
case secretResponse = <-subscriptions.NewSharedSecrets:
}
s.Require().NotNil(secretResponse)
s.Require().Len(secretResponse, 1)
s.Equal(crypto.FromECDSAPub(generatedSecret.Identity), crypto.FromECDSAPub(secretResponse[0].Identity))
s.Equal(generatedSecret.Key, secretResponse[0].Key)
s.Require().NoError(s.alice.Stop())
}

View File

@ -56,6 +56,7 @@ var (
// mailservers because they can also be managed by the user.
type Messenger struct {
node types.Node
config *config
identity *ecdsa.PrivateKey
persistence *sqlitePersistence
transport transport.Transport
@ -127,18 +128,6 @@ func NewMessenger(
}
}
// Set default config fields.
onNewSharedSecretHandler := func(secrets []*sharedsecret.Secret) {
filters, err := messenger.handleSharedSecrets(secrets)
if err != nil {
slogger := logger.With(zap.String("site", "onNewSharedSecretHandler"))
slogger.Warn("failed to process secrets", zap.Error(err))
}
if c.onNegotiatedFilters != nil {
c.onNegotiatedFilters(filters)
}
}
if c.onSendContactCodeHandler == nil {
c.onSendContactCodeHandler = func(messageSpec *encryption.ProtocolMessageSpec) {
slogger := logger.With(zap.String("site", "onSendContactCodeHandler"))
@ -221,7 +210,6 @@ func NewMessenger(
encryptionProtocol := encryption.New(
database,
installationID,
onNewSharedSecretHandler,
c.onSendContactCodeHandler,
logger,
)
@ -263,6 +251,7 @@ func NewMessenger(
handler := newMessageHandler(identity, logger, &sqlitePersistence{db: database})
messenger = &Messenger{
config: &c,
node: node,
identity: identity,
persistence: &sqlitePersistence{db: database},
@ -325,6 +314,24 @@ func (m *Messenger) Start() error {
return nil
}
func (m *Messenger) handleSharedSecrets(secrets []*sharedsecret.Secret) ([]*transport.Filter, error) {
logger := m.logger.With(zap.String("site", "handleSharedSecrets"))
var result []*transport.Filter
for _, secret := range secrets {
logger.Debug("received shared secret", zap.Binary("identity", crypto.FromECDSAPub(secret.Identity)))
fSecret := types.NegotiatedSecret{
PublicKey: secret.Identity,
Key: secret.Key,
}
filter, err := m.transport.ProcessNegotiatedSecret(fSecret)
if err != nil {
return nil, err
}
result = append(result, filter)
}
return result, nil
}
func (m *Messenger) handleNewInstallations(installations []*multidevice.Installation) {
for _, installation := range installations {
if installation.Identity == contactIDFromPublicKey(&m.identity.PublicKey) {
@ -336,17 +343,28 @@ func (m *Messenger) handleNewInstallations(installations []*multidevice.Installa
}
}
// handleEncryptionLayerSubscriptions handles events from the encryption layer
func (m *Messenger) handleEncryptionLayerSubscriptions(subscriptions *encryption.Subscriptions) {
go func() {
for {
select {
case secrets := <-subscriptions.NewSharedSecrets:
m.logger.Debug("handling new shared secrets")
filters, err := m.handleSharedSecrets(secrets)
if err != nil {
m.logger.Warn("failed to process secrets", zap.Error(err))
continue
}
if m.config.onNegotiatedFilters != nil {
m.config.onNegotiatedFilters(filters)
}
case newInstallations := <-subscriptions.NewInstallations:
m.logger.Debug("handling new installations")
m.handleNewInstallations(newInstallations)
case <-subscriptions.Quit:
m.logger.Debug("quitting encryption subscription loop")
return
}
}
}()
@ -455,24 +473,6 @@ func (m *Messenger) Shutdown() (err error) {
return
}
func (m *Messenger) handleSharedSecrets(secrets []*sharedsecret.Secret) ([]*transport.Filter, error) {
logger := m.logger.With(zap.String("site", "handleSharedSecrets"))
var result []*transport.Filter
for _, secret := range secrets {
logger.Debug("received shared secret", zap.Binary("identity", crypto.FromECDSAPub(secret.Identity)))
fSecret := types.NegotiatedSecret{
PublicKey: secret.Identity,
Key: secret.Key,
}
filter, err := m.transport.ProcessNegotiatedSecret(fSecret)
if err != nil {
return nil, err
}
result = append(result, filter)
}
return result, nil
}
func (m *Messenger) EnableInstallation(id string) error {
m.mutex.Lock()
defer m.mutex.Unlock()

View File

@ -44,6 +44,11 @@ func (s *MessengerContactUpdateSuite) SetupTest() {
s.m = s.newMessenger(s.shh)
s.privateKey = s.m.identity
s.Require().NoError(s.m.Start())
}
func (s *MessengerContactUpdateSuite) TearDownTest() {
s.Require().NoError(s.m.Shutdown())
}
func (s *MessengerContactUpdateSuite) newMessengerWithKey(shh types.Waku, privateKey *ecdsa.PrivateKey) *Messenger {
@ -85,6 +90,7 @@ func (s *MessengerContactUpdateSuite) TestReceiveContactUpdate() {
contactID := types.EncodeHex(crypto.FromECDSAPub(&s.m.identity.PublicKey))
theirMessenger := s.newMessenger(s.shh)
s.Require().NoError(theirMessenger.Start())
theirContactID := types.EncodeHex(crypto.FromECDSAPub(&theirMessenger.identity.PublicKey))
response, err := theirMessenger.SendContactUpdate(context.Background(), contactID, theirName, theirPicture)
@ -136,4 +142,5 @@ func (s *MessengerContactUpdateSuite) TestReceiveContactUpdate() {
s.Require().False(receivedContact.ENSVerified)
s.Require().True(receivedContact.HasBeenAdded())
s.Require().NotEmpty(receivedContact.LastUpdated)
s.Require().NoError(theirMessenger.Shutdown())
}

View File

@ -47,6 +47,11 @@ func (s *MessengerEmojiSuite) SetupTest() {
s.m = s.newMessenger(s.shh)
s.privateKey = s.m.identity
s.Require().NoError(s.m.Start())
}
func (s *MessengerEmojiSuite) TearDownTest() {
s.Require().NoError(s.m.Shutdown())
}
func (s *MessengerEmojiSuite) newMessengerWithKey(shh types.Waku, privateKey *ecdsa.PrivateKey) *Messenger {
@ -89,6 +94,7 @@ func (s *MessengerEmojiSuite) TestSendEmoji() {
s.Require().NoError(err)
bob := s.newMessengerWithKey(s.shh, key)
s.Require().NoError(bob.Start())
chatID := statusChatID
@ -162,11 +168,13 @@ func (s *MessengerEmojiSuite) TestSendEmoji() {
s.Require().Equal(response.EmojiReactions[0].ID(), emojiID)
s.Require().Equal(response.EmojiReactions[0].Type, protobuf.EmojiReaction_SAD)
s.Require().True(response.EmojiReactions[0].Retracted)
s.Require().NoError(bob.Shutdown())
}
func (s *MessengerEmojiSuite) TestEmojiPrivateGroup() {
bob := s.m
alice := s.newMessenger(s.shh)
s.Require().NoError(alice.Start())
response, err := bob.CreateGroupChatWithMembers(context.Background(), "test", []string{})
s.NoError(err)
@ -217,4 +225,5 @@ func (s *MessengerEmojiSuite) TestEmojiPrivateGroup() {
"no emoji reaction received",
)
s.Require().NoError(err)
s.Require().NoError(alice.Shutdown())
}

View File

@ -94,6 +94,7 @@ func (s *MessengerInstallationSuite) newMessenger(shh types.Waku) *Messenger {
func (s *MessengerInstallationSuite) TestReceiveInstallation() {
theirMessenger := s.newMessengerWithKey(s.shh, s.privateKey)
s.Require().NoError(theirMessenger.Start())
err := theirMessenger.SetInstallationMetadata(theirMessenger.installationID, &multidevice.InstallationMetadata{
Name: "their-name",
@ -159,6 +160,7 @@ func (s *MessengerInstallationSuite) TestReceiveInstallation() {
actualChat := response.Chats[0]
s.Require().Equal(statusChatID, actualChat.ID)
s.Require().True(actualChat.Active)
s.Require().NoError(theirMessenger.Shutdown())
}
func (s *MessengerInstallationSuite) TestSyncInstallation() {
@ -180,6 +182,7 @@ func (s *MessengerInstallationSuite) TestSyncInstallation() {
// pair
theirMessenger := s.newMessengerWithKey(s.shh, s.privateKey)
s.Require().NoError(theirMessenger.Start())
err = theirMessenger.SetInstallationMetadata(theirMessenger.installationID, &multidevice.InstallationMetadata{
Name: "their-name",
@ -246,6 +249,7 @@ func (s *MessengerInstallationSuite) TestSyncInstallation() {
s.Require().NotNil(statusChat)
s.Require().True(actualContact.IsAdded())
s.Require().NoError(theirMessenger.Shutdown())
}
func (s *MessengerInstallationSuite) TestSyncInstallationNewMessages() {
@ -253,7 +257,9 @@ func (s *MessengerInstallationSuite) TestSyncInstallationNewMessages() {
bob1 := s.m
// pair
bob2 := s.newMessengerWithKey(s.shh, s.privateKey)
s.Require().NoError(bob2.Start())
alice := s.newMessenger(s.shh)
s.Require().NoError(alice.Start())
err := bob2.SetInstallationMetadata(bob2.installationID, &multidevice.InstallationMetadata{
Name: "their-name",
@ -296,4 +302,6 @@ func (s *MessengerInstallationSuite) TestSyncInstallationNewMessages() {
"message not received",
)
s.Require().NoError(err)
s.Require().NoError(bob2.Shutdown())
s.Require().NoError(alice.Shutdown())
}

View File

@ -45,6 +45,11 @@ func (s *MessengerMuteSuite) SetupTest() {
s.m = s.newMessenger(s.shh)
s.privateKey = s.m.identity
s.Require().NoError(s.m.Start())
}
func (s *MessengerMuteSuite) TearDownTest() {
s.Require().NoError(s.m.Shutdown())
}
func (s *MessengerMuteSuite) newMessengerWithKey(shh types.Waku, privateKey *ecdsa.PrivateKey) *Messenger {
@ -86,6 +91,7 @@ func (s *MessengerMuteSuite) TestSetMute() {
s.Require().NoError(err)
theirMessenger := s.newMessengerWithKey(s.shh, key)
s.Require().NoError(theirMessenger.Start())
chatID := "status"
@ -107,4 +113,5 @@ func (s *MessengerMuteSuite) TestSetMute() {
s.Require().NoError(s.m.UnmuteChat(chatID))
s.Require().False(s.m.Chats()[0].Muted)
s.Require().NoError(theirMessenger.Shutdown())
}

View File

@ -102,6 +102,7 @@ func (s *MessengerSuite) SetupTest() {
s.m = s.newMessenger(s.shh)
s.privateKey = s.m.identity
s.Require().NoError(s.m.Start())
}
func (s *MessengerSuite) newMessengerWithKey(shh types.Waku, privateKey *ecdsa.PrivateKey) *Messenger {
@ -503,6 +504,7 @@ func (s *MessengerSuite) TestRetrieveOwnPublic() {
// Retrieve their public message
func (s *MessengerSuite) TestRetrieveTheirPublic() {
theirMessenger := s.newMessenger(s.shh)
s.Require().NoError(theirMessenger.Start())
theirChat := CreatePublicChat("status", s.m.transport)
err := theirMessenger.SaveChat(&theirChat)
s.Require().NoError(err)
@ -539,10 +541,12 @@ func (s *MessengerSuite) TestRetrieveTheirPublic() {
s.Require().Equal(sentMessage.Clock, actualChat.LastClockValue)
// It sets the last message
s.Require().NotNil(actualChat.LastMessage)
s.Require().NoError(theirMessenger.Shutdown())
}
func (s *MessengerSuite) TestDeletedAtClockValue() {
theirMessenger := s.newMessenger(s.shh)
s.Require().NoError(theirMessenger.Start())
theirChat := CreatePublicChat("status", s.m.transport)
err := theirMessenger.SaveChat(&theirChat)
s.Require().NoError(err)
@ -568,10 +572,12 @@ func (s *MessengerSuite) TestDeletedAtClockValue() {
response, err := s.m.RetrieveAll()
s.Require().NoError(err)
s.Require().Len(response.Messages, 0)
s.Require().NoError(theirMessenger.Shutdown())
}
func (s *MessengerSuite) TestRetrieveBlockedContact() {
theirMessenger := s.newMessenger(s.shh)
s.Require().NoError(theirMessenger.Start())
theirChat := CreatePublicChat("status", s.m.transport)
err := theirMessenger.SaveChat(&theirChat)
s.Require().NoError(err)
@ -610,6 +616,7 @@ func (s *MessengerSuite) TestRetrieveBlockedContact() {
// Resend their public message, receive only once
func (s *MessengerSuite) TestResendPublicMessage() {
theirMessenger := s.newMessenger(s.shh)
s.Require().NoError(theirMessenger.Start())
theirChat := CreatePublicChat("status", s.m.transport)
err := theirMessenger.SaveChat(&theirChat)
s.Require().NoError(err)
@ -663,6 +670,7 @@ func (s *MessengerSuite) TestResendPublicMessage() {
// Test receiving a message on an existing private chat
func (s *MessengerSuite) TestRetrieveTheirPrivateChatExisting() {
theirMessenger := s.newMessenger(s.shh)
s.Require().NoError(theirMessenger.Start())
theirChat := CreateOneToOneChat("XXX", &s.privateKey.PublicKey, s.m.transport)
err := theirMessenger.SaveChat(&theirChat)
s.Require().NoError(err)
@ -698,11 +706,13 @@ func (s *MessengerSuite) TestRetrieveTheirPrivateChatExisting() {
// It sets the last message
s.Require().NotNil(actualChat.LastMessage)
s.Require().True(actualChat.Active)
s.Require().NoError(theirMessenger.Shutdown())
}
// Test receiving a message on an non-existing private chat
func (s *MessengerSuite) TestRetrieveTheirPrivateChatNonExisting() {
theirMessenger := s.newMessenger(s.shh)
s.Require().NoError(theirMessenger.Start())
chat := CreateOneToOneChat("XXX", &s.privateKey.PublicKey, s.m.transport)
err := theirMessenger.SaveChat(&chat)
s.NoError(err)
@ -739,6 +749,7 @@ func (s *MessengerSuite) TestRetrieveTheirPrivateChatNonExisting() {
// Test receiving a message on an non-existing public chat
func (s *MessengerSuite) TestRetrieveTheirPublicChatNonExisting() {
theirMessenger := s.newMessenger(s.shh)
s.Require().NoError(theirMessenger.Start())
chat := CreatePublicChat("test-chat", s.m.transport)
err := theirMessenger.SaveChat(&chat)
s.NoError(err)
@ -756,12 +767,14 @@ func (s *MessengerSuite) TestRetrieveTheirPublicChatNonExisting() {
s.Require().Equal(len(response.Messages), 0)
s.Require().Equal(len(response.Chats), 0)
s.Require().NoError(theirMessenger.Shutdown())
}
// Test receiving a message on an existing private group chat
func (s *MessengerSuite) TestRetrieveTheirPrivateGroupChat() {
var response *MessengerResponse
theirMessenger := s.newMessenger(s.shh)
s.Require().NoError(theirMessenger.Start())
response, err := s.m.CreateGroupChatWithMembers(context.Background(), "id", []string{})
s.NoError(err)
s.Require().Len(response.Chats, 1)
@ -823,6 +836,7 @@ func (s *MessengerSuite) TestRetrieveTheirPrivateGroupChat() {
func (s *MessengerSuite) TestChangeNameGroupChat() {
var response *MessengerResponse
theirMessenger := s.newMessenger(s.shh)
s.Require().NoError(theirMessenger.Start())
response, err := s.m.CreateGroupChatWithMembers(context.Background(), "old-name", []string{})
s.NoError(err)
s.Require().Len(response.Chats, 1)
@ -869,12 +883,14 @@ func (s *MessengerSuite) TestChangeNameGroupChat() {
s.Require().Len(response.Chats, 1)
actualChat := response.Chats[0]
s.Require().Equal(newName, actualChat.Name)
s.Require().NoError(theirMessenger.Shutdown())
}
// Test being re-invited to a group chat
func (s *MessengerSuite) TestReInvitedToGroupChat() {
var response *MessengerResponse
theirMessenger := s.newMessenger(s.shh)
s.Require().NoError(theirMessenger.Start())
response, err := s.m.CreateGroupChatWithMembers(context.Background(), "old-name", []string{})
s.NoError(err)
s.Require().Len(response.Chats, 1)
@ -928,6 +944,7 @@ func (s *MessengerSuite) TestReInvitedToGroupChat() {
s.Require().Len(response.Chats, 1)
s.Require().True(response.Chats[0].Active)
s.Require().NoError(theirMessenger.Shutdown())
}
func (s *MessengerSuite) TestChatPersistencePublic() {
@ -1434,6 +1451,7 @@ func (s *MessengerSuite) TestDeclineRequestAddressForTransaction() {
value := testValue
contract := testContract
theirMessenger := s.newMessenger(s.shh)
s.Require().NoError(theirMessenger.Start())
theirPkString := types.EncodeHex(crypto.FromECDSAPub(&theirMessenger.identity.PublicKey))
chat := CreateOneToOneChat(theirPkString, &theirMessenger.identity.PublicKey, s.m.transport)
@ -1516,6 +1534,7 @@ func (s *MessengerSuite) TestDeclineRequestAddressForTransaction() {
s.Require().Equal(CommandStateRequestAddressForTransactionDeclined, receiverMessage.CommandParameters.CommandState)
s.Require().Equal(initialCommandID, receiverMessage.CommandParameters.ID)
s.Require().Equal(initialCommandID, receiverMessage.Replace)
s.Require().NoError(theirMessenger.Shutdown())
}
func (s *MessengerSuite) TestSendEthTransaction() {
@ -1523,6 +1542,7 @@ func (s *MessengerSuite) TestSendEthTransaction() {
contract := testContract
theirMessenger := s.newMessenger(s.shh)
s.Require().NoError(theirMessenger.Start())
theirPkString := types.EncodeHex(crypto.FromECDSAPub(&theirMessenger.identity.PublicKey))
receiverAddress := crypto.PubkeyToAddress(theirMessenger.identity.PublicKey)
@ -1617,6 +1637,7 @@ func (s *MessengerSuite) TestSendEthTransaction() {
s.Require().Equal(CommandStateTransactionSent, receiverMessage.CommandParameters.CommandState)
s.Require().Equal(senderMessage.ID, receiverMessage.ID)
s.Require().Equal("", receiverMessage.Replace)
s.Require().NoError(theirMessenger.Shutdown())
}
func (s *MessengerSuite) TestSendTokenTransaction() {
@ -1624,6 +1645,7 @@ func (s *MessengerSuite) TestSendTokenTransaction() {
contract := testContract
theirMessenger := s.newMessenger(s.shh)
s.Require().NoError(theirMessenger.Start())
theirPkString := types.EncodeHex(crypto.FromECDSAPub(&theirMessenger.identity.PublicKey))
receiverAddress := crypto.PubkeyToAddress(theirMessenger.identity.PublicKey)
@ -1718,12 +1740,14 @@ func (s *MessengerSuite) TestSendTokenTransaction() {
s.Require().Equal(CommandStateTransactionSent, receiverMessage.CommandParameters.CommandState)
s.Require().Equal(senderMessage.ID, receiverMessage.ID)
s.Require().Equal(senderMessage.Replace, senderMessage.Replace)
s.Require().NoError(theirMessenger.Shutdown())
}
func (s *MessengerSuite) TestAcceptRequestAddressForTransaction() {
value := testValue
contract := testContract
theirMessenger := s.newMessenger(s.shh)
s.Require().NoError(theirMessenger.Start())
theirPkString := types.EncodeHex(crypto.FromECDSAPub(&theirMessenger.identity.PublicKey))
myAddress := crypto.PubkeyToAddress(s.m.identity.PublicKey)
@ -1808,6 +1832,7 @@ func (s *MessengerSuite) TestAcceptRequestAddressForTransaction() {
s.Require().Equal(initialCommandID, receiverMessage.CommandParameters.ID)
s.Require().Equal("some-address", receiverMessage.CommandParameters.Address)
s.Require().Equal(initialCommandID, receiverMessage.Replace)
s.Require().NoError(theirMessenger.Shutdown())
}
func (s *MessengerSuite) TestDeclineRequestTransaction() {
@ -1816,6 +1841,7 @@ func (s *MessengerSuite) TestDeclineRequestTransaction() {
receiverAddress := crypto.PubkeyToAddress(s.m.identity.PublicKey)
receiverAddressString := strings.ToLower(receiverAddress.Hex())
theirMessenger := s.newMessenger(s.shh)
s.Require().NoError(theirMessenger.Start())
theirPkString := types.EncodeHex(crypto.FromECDSAPub(&theirMessenger.identity.PublicKey))
chat := CreateOneToOneChat(theirPkString, &theirMessenger.identity.PublicKey, s.m.transport)
@ -1895,6 +1921,7 @@ func (s *MessengerSuite) TestDeclineRequestTransaction() {
s.Require().Equal(initialCommandID, receiverMessage.CommandParameters.ID)
s.Require().Equal(initialCommandID, receiverMessage.Replace)
s.Require().Equal(CommandStateRequestTransactionDeclined, receiverMessage.CommandParameters.CommandState)
s.Require().NoError(theirMessenger.Shutdown())
}
func (s *MessengerSuite) TestRequestTransaction() {
@ -1903,6 +1930,7 @@ func (s *MessengerSuite) TestRequestTransaction() {
receiverAddress := crypto.PubkeyToAddress(s.m.identity.PublicKey)
receiverAddressString := strings.ToLower(receiverAddress.Hex())
theirMessenger := s.newMessenger(s.shh)
s.Require().NoError(theirMessenger.Start())
theirPkString := types.EncodeHex(crypto.FromECDSAPub(&theirMessenger.identity.PublicKey))
chat := CreateOneToOneChat(theirPkString, &theirMessenger.identity.PublicKey, s.m.transport)
@ -2039,6 +2067,7 @@ func (s *MessengerSuite) TestRequestTransaction() {
s.Require().Equal(CommandStateTransactionSent, receiverMessage.CommandParameters.CommandState)
s.Require().Equal(senderMessage.ID, receiverMessage.ID)
s.Require().Equal(senderMessage.Replace, senderMessage.Replace)
s.Require().NoError(theirMessenger.Shutdown())
}
type MockTransaction struct {

View File

@ -56,6 +56,7 @@ func (s *MessengerPushNotificationSuite) SetupTest() {
s.m = s.newMessenger(s.shh)
s.privateKey = s.m.identity
s.Require().NoError(s.m.Start())
}
func (s *MessengerPushNotificationSuite) TearDownTest() {
@ -126,6 +127,7 @@ func (s *MessengerPushNotificationSuite) TestReceivePushNotification() {
bob1 := s.m
bob2 := s.newMessengerWithKey(s.shh, s.m.identity)
s.Require().NoError(bob2.Start())
serverKey, err := crypto.GenerateKey()
s.Require().NoError(err)
@ -432,6 +434,7 @@ func (s *MessengerPushNotificationSuite) TestReceivePushNotificationRetries() {
alice := s.newMessenger(s.shh)
// another contact to invalidate the token
frank := s.newMessenger(s.shh)
s.Require().NoError(frank.Start())
// start alice and enable push notifications
s.Require().NoError(alice.Start())
s.Require().NoError(alice.EnableSendingPushNotifications())

View File

@ -169,10 +169,14 @@ func (s *Service) InitProtocol(identity *ecdsa.PrivateKey, db *sql.DB, logger *z
func (s *Service) StartMessenger() error {
// Start a loop that retrieves all messages and propagates them to status-react.
s.cancelMessenger = make(chan struct{})
err := s.messenger.Start()
if err != nil {
return err
}
go s.retrieveMessagesLoop(time.Second, s.cancelMessenger)
go s.verifyTransactionLoop(30*time.Second, s.cancelMessenger)
go s.verifyENSLoop(30*time.Second, s.cancelMessenger)
return s.messenger.Start()
return nil
}
func (s *Service) retrieveMessagesLoop(tick time.Duration, cancel <-chan struct{}) {