diff --git a/VERSION b/VERSION index 198ca2920..999ebbb45 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.30.0-beta.0 +0.30.0-alpha.1 diff --git a/messaging/chat/protocol.go b/messaging/chat/protocol.go index bd3c83bec..7dc1e59ef 100644 --- a/messaging/chat/protocol.go +++ b/messaging/chat/protocol.go @@ -292,6 +292,10 @@ func (p *ProtocolService) ConfirmMessagesProcessed(messageIDs [][]byte) error { return p.encryption.ConfirmMessagesProcessed(messageIDs) } +func (p *ProtocolService) GetSharedSecretService() *sharedsecret.Service { + return p.secret +} + // HandleMessage unmarshals a message and processes it, decrypting it if it is a 1:1 message. func (p *ProtocolService) HandleMessage(myIdentityKey *ecdsa.PrivateKey, theirPublicKey *ecdsa.PublicKey, protocolMessage *protobuf.ProtocolMessage, messageID []byte) ([]byte, error) { p.log.Debug("Received message from", "public-key", theirPublicKey) diff --git a/messaging/publisher/publisher.go b/messaging/publisher/publisher.go index f8a65470c..1339a5701 100644 --- a/messaging/publisher/publisher.go +++ b/messaging/publisher/publisher.go @@ -16,6 +16,7 @@ import ( "github.com/status-im/status-go/messaging/chat/protobuf" "github.com/status-im/status-go/messaging/filter" "github.com/status-im/status-go/messaging/multidevice" + "github.com/status-im/status-go/messaging/sharedsecret" "github.com/status-im/status-go/services/shhext/whisperutils" @@ -27,6 +28,8 @@ const ( tickerInterval = 120 // How often we should publish a contact code in seconds publishInterval = 21600 + // How often we should check for new messages + pollIntervalMs = 300 ) var ( @@ -66,10 +69,13 @@ func New(w *whisper.Whisper, c Config) *Publisher { } } -func (p *Publisher) Init(db *sql.DB, protocol *chat.ProtocolService, filter *filter.Service) { +func (p *Publisher) Init(db *sql.DB, protocol *chat.ProtocolService, onNewMessagesHandler func([]*filter.Messages)) { + + filterService := filter.New(p.whisper, filter.NewSQLLitePersistence(db), protocol.GetSharedSecretService(), onNewMessagesHandler) + p.persistence = NewSQLLitePersistence(db) p.protocol = protocol - p.filter = filter + p.filter = filterService } func (p *Publisher) ProcessPublicBundle(myIdentityKey *ecdsa.PrivateKey, bundle *protobuf.Bundle) ([]*multidevice.Installation, error) { @@ -173,10 +179,15 @@ func (p *Publisher) GetPublicBundle(identityKey *ecdsa.PublicKey) (*protobuf.Bun } func (p *Publisher) Start(online func() bool, startTicker bool) error { + if p.protocol == nil { + return errProtocolNotInitialized + } + p.online = online if startTicker { p.startTicker() } + go p.filter.Start(pollIntervalMs * time.Millisecond) return nil } @@ -205,6 +216,15 @@ func (p *Publisher) LoadFilter(chat *filter.Chat) ([]*filter.Chat, error) { func (p *Publisher) RemoveFilters(chats []*filter.Chat) error { return p.filter.Remove(chats) } +func (p *Publisher) ProcessNegotiatedSecret(secrets []*sharedsecret.Secret) { + for _, secret := range secrets { + + _, err := p.filter.ProcessNegotiatedSecret(secret) + if err != nil { + log.Error("could not process negotiated filter", "err", err) + } + } +} func (p *Publisher) ProcessMessage(msg *whisper.Message, msgID []byte) error { if !p.config.PFSEnabled { diff --git a/messaging/publisher/publisher_test.go b/messaging/publisher/publisher_test.go index 32593f526..4f796a80a 100644 --- a/messaging/publisher/publisher_test.go +++ b/messaging/publisher/publisher_test.go @@ -66,8 +66,6 @@ func (s *ServiceTestSuite) createPublisher(installationID string) (*Publisher, * sharedSecretService := sharedsecret.NewService(persistence.GetSharedSecretStorage()) - filterService := filter.New(whisper, filter.NewSQLLitePersistence(persistence.DB), sharedSecretService, func([]*filter.Messages) {}) - multideviceConfig := &multidevice.Config{ InstallationID: installationID, ProtocolVersion: chat.ProtocolVersion, @@ -82,14 +80,10 @@ func (s *ServiceTestSuite) createPublisher(installationID string) (*Publisher, * sharedSecretService, multideviceService, func(addedBundles []*multidevice.Installation) {}, - func(sharedSecrets []*sharedsecret.Secret) { - for _, sharedSecret := range sharedSecrets { - _, _ = filterService.ProcessNegotiatedSecret(sharedSecret) - } - }, + publisher.ProcessNegotiatedSecret, ) - publisher.Init(persistence.DB, protocolService, filterService) + publisher.Init(persistence.DB, protocolService, func(msg []*filter.Messages) {}) err = publisher.Start(func() bool { return true }, false) s.Require().NoError(err) diff --git a/services/shhext/service.go b/services/shhext/service.go index dcf4101e3..df10b05d5 100644 --- a/services/shhext/service.go +++ b/services/shhext/service.go @@ -40,8 +40,6 @@ const ( defaultTimeoutWaitAdded = 5 * time.Second // maxInstallations is a maximum number of supported devices for one account. maxInstallations = 3 - // filterCheckIntervalMs is a how often we should check whisper filters for new messages - filterCheckIntervalMs = 300 ) // EnvelopeEventsHandler used for two different event types. @@ -176,13 +174,30 @@ func (s *Service) initProtocol(address, encKey, password string) error { return err } - // Initialize sharedsecret - sharedSecretService := sharedsecret.NewService(persistence.GetSharedSecretStorage()) + multideviceConfig := &multidevice.Config{ + InstallationID: s.config.InstallationID, + ProtocolVersion: chat.ProtocolVersion, + MaxInstallations: maxInstallations, + } + + addedBundlesHandler := func(addedBundles []*multidevice.Installation) { + handler := PublisherSignalHandler{} + for _, bundle := range addedBundles { + handler.BundleAdded(bundle.Identity, bundle.ID) + } + } + + protocolService := chat.NewProtocolService( + chat.NewEncryptionService( + persistence, + chat.DefaultEncryptionServiceConfig(s.config.InstallationID)), + sharedsecret.NewService(persistence.GetSharedSecretStorage()), + multidevice.New(multideviceConfig, persistence.GetMultideviceStorage()), + addedBundlesHandler, + s.ProcessNegotiatedSecret) - // Initialize filter onNewMessagesHandler := func(messages []*filter.Messages) { var signalMessages []*signal.Messages - handler := PublisherSignalHandler{} for _, chatMessages := range messages { signalMessage := &signal.Messages{ Error: chatMessages.Error, @@ -197,37 +212,9 @@ func (s *Service) initProtocol(address, encKey, password string) error { signalMessage.Messages = dedupMessages } - handler.NewMessages(signalMessages) + PublisherSignalHandler{}.NewMessages(signalMessages) } - - filterService := filter.New(s.w, filter.NewSQLLitePersistence(persistence.DB), sharedSecretService, onNewMessagesHandler) - go filterService.Start(filterCheckIntervalMs * time.Millisecond) - - // Initialize multidevice - multideviceConfig := &multidevice.Config{ - InstallationID: s.config.InstallationID, - ProtocolVersion: chat.ProtocolVersion, - MaxInstallations: maxInstallations, - } - multideviceService := multidevice.New(multideviceConfig, persistence.GetMultideviceStorage()) - - addedBundlesHandler := func(addedBundles []*multidevice.Installation) { - handler := PublisherSignalHandler{} - for _, bundle := range addedBundles { - handler.BundleAdded(bundle.Identity, bundle.ID) - } - } - - protocolService := chat.NewProtocolService( - chat.NewEncryptionService( - persistence, - chat.DefaultEncryptionServiceConfig(s.config.InstallationID)), - sharedSecretService, - multideviceService, - addedBundlesHandler, - s.newSharedSecretHandler(filterService)) - - s.Publisher.Init(persistence.DB, protocolService, filterService) + s.Publisher.Init(persistence.DB, protocolService, onNewMessagesHandler) return nil } @@ -253,41 +240,15 @@ func (s *Service) processReceivedMessages(messages []*whisper.Message) ([]dedup. handler := PublisherSignalHandler{} handler.DecryptMessageFailed(keyString) default: - log.Error("Failed handling message with error", "err", err) + if err != nil { + log.Error("Failed handling message with error", "err", err) + } } } return dedupMessages, nil } -func (s *Service) newSharedSecretHandler(filterService *filter.Service) func([]*sharedsecret.Secret) { - return func(sharedSecrets []*sharedsecret.Secret) { - var filters []*signal.Filter - for _, sharedSecret := range sharedSecrets { - chat, err := filterService.ProcessNegotiatedSecret(sharedSecret) - if err != nil { - log.Error("Failed to process negotiated secret", "err", err) - return - } - - filter := &signal.Filter{ - ChatID: chat.ChatID, - SymKeyID: chat.SymKeyID, - Listen: chat.Listen, - FilterID: chat.FilterID, - Identity: chat.Identity, - Topic: chat.Topic, - } - - filters = append(filters, filter) - } - if len(filters) != 0 { - handler := PublisherSignalHandler{} - handler.WhisperFilterAdded(filters) - } - } -} - // UpdateMailservers updates information about selected mail servers. func (s *Service) UpdateMailservers(nodes []*enode.Node) error { if err := s.peerStore.Update(nodes); err != nil { @@ -344,7 +305,10 @@ func (s *Service) Start(server *p2p.Server) error { s.mailMonitor.Start() s.nodeID = server.PrivateKey s.server = server - return s.Publisher.Start(s.online, true) + if s.config.PFSEnabled { + return s.Publisher.Start(s.online, true) + } + return nil } func (s *Service) online() bool { diff --git a/services/shhext/service_test.go b/services/shhext/service_test.go index 43d397c50..84ccb20db 100644 --- a/services/shhext/service_test.go +++ b/services/shhext/service_test.go @@ -129,6 +129,7 @@ func (s *ShhExtSuite) SetupTest() { db, err := leveldb.Open(storage.NewMemStorage(), nil) s.Require().NoError(err) s.services[i] = New(s.whisper[i], nil, db, config) + s.Require().NoError(s.services[i].InitProtocolWithPassword(fmt.Sprintf("%d", i), "password")) s.NoError(stack.Register(func(n *node.ServiceContext) (node.Service, error) { return s.services[i], nil })) @@ -139,12 +140,26 @@ func (s *ShhExtSuite) SetupTest() { } func (s *ShhExtSuite) TestInitProtocol() { - err := s.services[0].InitProtocolWithPassword("example-address", "`090///\nhtaa\rhta9x8923)$$'23") + directory, err := ioutil.TempDir("", "status-go-testing") + s.Require().NoError(err) + + config := params.ShhextConfig{ + InstallationID: "2", + BackupDisabledDataDir: directory, + PFSEnabled: true, + MailServerConfirmations: true, + ConnectionTarget: 10, + } + db, err := leveldb.Open(storage.NewMemStorage(), nil) + s.Require().NoError(err) + service := New(whisper.New(nil), nil, db, config) + + err = service.InitProtocolWithPassword("example-address", "`090///\nhtaa\rhta9x8923)$$'23") s.NoError(err) digest := sha3.Sum256([]byte("`090///\nhtaa\rhta9x8923)$$'23")) encKey := fmt.Sprintf("%x", digest) - err = s.services[0].InitProtocolWithEncyptionKey("example-address", encKey) + err = service.InitProtocolWithEncyptionKey("example-address", encKey) s.NoError(err) } @@ -351,6 +366,7 @@ func (s *ShhExtSuite) TestRequestMessagesSuccess() { PFSEnabled: true, } service := New(shh, mock, nil, config) + s.Require().NoError(service.InitProtocolWithPassword("abc", "password")) s.Require().NoError(service.Start(aNode.Server())) api := NewPublicAPI(service) @@ -721,6 +737,7 @@ func (s *RequestWithTrackingHistorySuite) SetupTest() { s.localContext = NewContextFromService(context.Background(), s.localService, s.localService.storage) localPkey, err := crypto.GenerateKey() s.Require().NoError(err) + s.Require().NoError(s.localService.InitProtocolWithPassword("local-service", "password")) s.Require().NoError(s.localService.Start(&p2p.Server{Config: p2p.Config{PrivateKey: localPkey}})) s.localAPI = NewPublicAPI(s.localService)