From b3efbb54f82a80f04163eefbad46c94738283d91 Mon Sep 17 00:00:00 2001 From: Andrea Maria Piana Date: Mon, 1 Jul 2019 12:00:46 +0200 Subject: [PATCH] Send signal on new messages --- VERSION | 2 +- messaging/filter/service.go | 95 +++++++++++++++++++++++---- messaging/filter/service_test.go | 4 +- messaging/publisher/publisher.go | 6 +- messaging/publisher/publisher_test.go | 2 +- services/shhext/api.go | 27 +------- services/shhext/service.go | 54 ++++++++++++++- services/shhext/signal.go | 4 ++ signal/events_shhext.go | 20 ++++++ 9 files changed, 170 insertions(+), 44 deletions(-) diff --git a/VERSION b/VERSION index ca3b989fa..198ca2920 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.29.0-beta.3 +0.30.0-beta.0 diff --git a/messaging/filter/service.go b/messaging/filter/service.go index 4531b8d0c..751b3c8a4 100644 --- a/messaging/filter/service.go +++ b/messaging/filter/service.go @@ -11,6 +11,7 @@ import ( whisper "github.com/status-im/whisper/whisperv6" "math/big" "sync" + "time" ) const ( @@ -48,23 +49,33 @@ type Chat struct { Negotiated bool `json:"negotiated"` } +type Messages struct { + Chat *Chat `json:"chat"` + Messages []*whisper.Message `json:"messages"` + Error error `json:"error"` +} + type Service struct { - whisper *whisper.Whisper - secret *sharedsecret.Service - chats map[string]*Chat - persistence Persistence - mutex sync.Mutex - keys map[string][]byte + whisper *whisper.Whisper + secret *sharedsecret.Service + chats map[string]*Chat + persistence Persistence + mutex sync.Mutex + keys map[string][]byte + quit chan struct{} + onNewMessages func([]*Messages) } // New returns a new filter service -func New(w *whisper.Whisper, p Persistence, s *sharedsecret.Service) *Service { +func New(w *whisper.Whisper, p Persistence, s *sharedsecret.Service, onNewMessages func([]*Messages)) *Service { return &Service{ - whisper: w, - secret: s, - mutex: sync.Mutex{}, - persistence: p, - chats: make(map[string]*Chat), + whisper: w, + secret: s, + mutex: sync.Mutex{}, + persistence: p, + chats: make(map[string]*Chat), + quit: make(chan struct{}), + onNewMessages: onNewMessages, } } @@ -137,9 +148,29 @@ func (s *Service) Init(chats []*Chat) ([]*Chat, error) { return allChats, nil } +func (s *Service) Start(checkPeriod time.Duration) { + ticker := time.NewTicker(checkPeriod) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + messages := s.getMessages() + + if len(messages) != 0 { + s.onNewMessages(messages) + } + case <-s.quit: + return + } + } +} + // Stop removes all the filters func (s *Service) Stop() error { var chats []*Chat + + close(s.quit) for _, chat := range s.chats { chats = append(chats, chat) } @@ -484,6 +515,46 @@ func (s *Service) addAsymmetricFilter(keyAsym *ecdsa.PrivateKey, chatID string, return &Filter{FilterID: id, Topic: whisper.BytesToTopic(topic)}, nil } +func (s *Service) getMessages() []*Messages { + var response []*Messages + s.mutex.Lock() + defer s.mutex.Unlock() + + for chatID := range s.chats { + messages := s.getMessagesForChat(chatID) + if messages.Error != nil || len(messages.Messages) != 0 { + response = append(response, messages) + } + } + + return response +} + +func (s *Service) getMessagesForChat(chatID string) *Messages { + response := &Messages{} + + response.Chat = s.chats[chatID] + if response.Chat == nil { + response.Error = errors.New("Chat not found") + + return response + } + + filter := s.whisper.GetFilter(response.Chat.FilterID) + if filter == nil { + response.Error = errors.New("Filter not found") + return response + } + + receivedMessages := filter.Retrieve() + response.Messages = make([]*whisper.Message, 0, len(receivedMessages)) + for _, msg := range receivedMessages { + response.Messages = append(response.Messages, whisper.ToWhisperMessage(msg)) + } + + return response +} + func negotiatedID(identity *ecdsa.PublicKey) string { return fmt.Sprintf("0x%x-negotiated", crypto.FromECDSAPub(identity)) } diff --git a/messaging/filter/service_test.go b/messaging/filter/service_test.go index 49b3afb38..e98a338db 100644 --- a/messaging/filter/service_test.go +++ b/messaging/filter/service_test.go @@ -75,7 +75,7 @@ func (s *ServiceTestSuite) SetupTest() { persistence := NewSQLLitePersistence(db) - s.service = New(whisper, persistence, sharedSecretService) + s.service = New(whisper, persistence, sharedSecretService, func([]*Messages) {}) } func (s *ServiceTestSuite) TearDownTest() { @@ -152,7 +152,7 @@ func (s *ServiceTestSuite) TestLoadFromCache() { s.Require().NoError(err) // We create another service using the same persistence - service2 := New(s.service.whisper, s.service.persistence, s.service.secret) + service2 := New(s.service.whisper, s.service.persistence, s.service.secret, func([]*Messages) {}) _, err = service2.Init(chats) s.Require().NoError(err) } diff --git a/messaging/publisher/publisher.go b/messaging/publisher/publisher.go index eb2652676..f8a65470c 100644 --- a/messaging/publisher/publisher.go +++ b/messaging/publisher/publisher.go @@ -403,6 +403,11 @@ func (p *Publisher) sendContactCode() (*whisper.NewMessage, error) { return nil, nil } + if p.persistence == nil { + p.log.Info("not initialized, skipping") + return nil, nil + } + lastPublished, err := p.persistence.Get() if err != nil { p.log.Error("could not fetch config from db", "err", err) @@ -412,7 +417,6 @@ func (p *Publisher) sendContactCode() (*whisper.NewMessage, error) { now := time.Now().Unix() if now-lastPublished < publishInterval { - fmt.Println("NOTHING") p.log.Debug("nothing to do") return nil, nil } diff --git a/messaging/publisher/publisher_test.go b/messaging/publisher/publisher_test.go index 986221743..32593f526 100644 --- a/messaging/publisher/publisher_test.go +++ b/messaging/publisher/publisher_test.go @@ -66,7 +66,7 @@ func (s *ServiceTestSuite) createPublisher(installationID string) (*Publisher, * sharedSecretService := sharedsecret.NewService(persistence.GetSharedSecretStorage()) - filterService := filter.New(whisper, filter.NewSQLLitePersistence(persistence.DB), sharedSecretService) + filterService := filter.New(whisper, filter.NewSQLLitePersistence(persistence.DB), sharedSecretService, func([]*filter.Messages) {}) multideviceConfig := &multidevice.Config{ InstallationID: installationID, diff --git a/services/shhext/api.go b/services/shhext/api.go index f9fc060d5..4cc68296c 100644 --- a/services/shhext/api.go +++ b/services/shhext/api.go @@ -11,7 +11,6 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" - "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/rlp" @@ -417,31 +416,7 @@ func (api *PublicAPI) GetNewFilterMessages(filterID string) ([]dedup.Deduplicate return nil, err } - dedupMessages := api.service.deduplicator.Deduplicate(msgs) - - // Attempt to decrypt message, otherwise leave unchanged - for _, dedupMessage := range dedupMessages { - err := api.service.ProcessMessage(dedupMessage.Message, dedupMessage.DedupID) - switch err { - case chat.ErrNotPairedDevice: - api.log.Info("Received a message from non-paired device", "err", err) - case chat.ErrDeviceNotFound: - api.log.Warn("Device not found, sending signal", "err", err) - - publicKey, err := crypto.UnmarshalPubkey(dedupMessage.Message.Sig) - if err != nil { - return nil, fmt.Errorf("failed to handler chat.ErrDeviceNotFound: %v", err) - } - - keyString := fmt.Sprintf("%#x", crypto.FromECDSAPub(publicKey)) - handler := PublisherSignalHandler{} - handler.DecryptMessageFailed(keyString) - default: - api.log.Error("Failed handling message with error", "err", err) - } - } - - return dedupMessages, nil + return api.service.processReceivedMessages(msgs) } // ConfirmMessagesProcessed is a method to confirm that messages was consumed by diff --git a/services/shhext/service.go b/services/shhext/service.go index 184494952..dcf4101e3 100644 --- a/services/shhext/service.go +++ b/services/shhext/service.go @@ -9,6 +9,7 @@ import ( "time" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/p2p" @@ -39,6 +40,8 @@ const ( defaultTimeoutWaitAdded = 5 * time.Second // maxInstallations is a maximum number of supported devices for one account. maxInstallations = 3 + // filterCheckIntervalMs is a how often we should check whisper filters for new messages + filterCheckIntervalMs = 300 ) // EnvelopeEventsHandler used for two different event types. @@ -177,7 +180,28 @@ func (s *Service) initProtocol(address, encKey, password string) error { sharedSecretService := sharedsecret.NewService(persistence.GetSharedSecretStorage()) // Initialize filter - filterService := filter.New(s.w, filter.NewSQLLitePersistence(persistence.DB), sharedSecretService) + onNewMessagesHandler := func(messages []*filter.Messages) { + var signalMessages []*signal.Messages + handler := PublisherSignalHandler{} + for _, chatMessages := range messages { + signalMessage := &signal.Messages{ + Error: chatMessages.Error, + Chat: chatMessages.Chat, + } + signalMessages = append(signalMessages, signalMessage) + dedupMessages, err := s.processReceivedMessages(chatMessages.Messages) + if err != nil { + log.Error("could not process messages", "err", err) + continue + } + + signalMessage.Messages = dedupMessages + } + handler.NewMessages(signalMessages) + } + + filterService := filter.New(s.w, filter.NewSQLLitePersistence(persistence.DB), sharedSecretService, onNewMessagesHandler) + go filterService.Start(filterCheckIntervalMs * time.Millisecond) // Initialize multidevice multideviceConfig := &multidevice.Config{ @@ -208,6 +232,34 @@ func (s *Service) initProtocol(address, encKey, password string) error { return nil } +func (s *Service) processReceivedMessages(messages []*whisper.Message) ([]dedup.DeduplicateMessage, error) { + dedupMessages := s.deduplicator.Deduplicate(messages) + + // Attempt to decrypt message, otherwise leave unchanged + for _, dedupMessage := range dedupMessages { + err := s.ProcessMessage(dedupMessage.Message, dedupMessage.DedupID) + switch err { + case chat.ErrNotPairedDevice: + log.Info("Received a message from non-paired device", "err", err) + case chat.ErrDeviceNotFound: + log.Warn("Device not found, sending signal", "err", err) + + publicKey, err := crypto.UnmarshalPubkey(dedupMessage.Message.Sig) + if err != nil { + return nil, fmt.Errorf("failed to handler chat.ErrDeviceNotFound: %v", err) + } + + keyString := fmt.Sprintf("%#x", crypto.FromECDSAPub(publicKey)) + handler := PublisherSignalHandler{} + handler.DecryptMessageFailed(keyString) + default: + log.Error("Failed handling message with error", "err", err) + } + } + + return dedupMessages, nil +} + func (s *Service) newSharedSecretHandler(filterService *filter.Service) func([]*sharedsecret.Secret) { return func(sharedSecrets []*sharedsecret.Secret) { var filters []*signal.Filter diff --git a/services/shhext/signal.go b/services/shhext/signal.go index 1a00f3748..e6ba1cac5 100644 --- a/services/shhext/signal.go +++ b/services/shhext/signal.go @@ -42,3 +42,7 @@ func (h PublisherSignalHandler) BundleAdded(identity string, installationID stri func (h PublisherSignalHandler) WhisperFilterAdded(filters []*signal.Filter) { signal.SendWhisperFilterAdded(filters) } + +func (h PublisherSignalHandler) NewMessages(messages []*signal.Messages) { + signal.SendNewMessages(messages) +} diff --git a/signal/events_shhext.go b/signal/events_shhext.go index 8e4f93897..2c0fe4133 100644 --- a/signal/events_shhext.go +++ b/signal/events_shhext.go @@ -4,6 +4,8 @@ import ( "encoding/hex" "github.com/ethereum/go-ethereum/common" + "github.com/status-im/status-go/messaging/filter" + "github.com/status-im/status-go/services/shhext/dedup" whisper "github.com/status-im/whisper/whisperv6" ) @@ -35,6 +37,9 @@ const ( // EventWhisperFilterAdded is triggered when we setup a new filter or restore existing ones EventWhisperFilterAdded = "whisper.filter.added" + + // EventNewMessages is triggered when we receive new messages + EventNewMessages = "messages.new" ) // EnvelopeSignal includes hash of the envelope. @@ -81,6 +86,11 @@ type WhisperFilterAddedSignal struct { Filters []*Filter `json:"filters"` } +// NewMessagesSignal notifies clients of new messages +type NewMessagesSignal struct { + Messages []*Messages `json:"messages"` +} + // SendEnvelopeSent triggered when envelope delivered at least to 1 peer. func SendEnvelopeSent(hash common.Hash) { send(EventEnvelopeSent, EnvelopeSignal{Hash: hash}) @@ -121,6 +131,12 @@ type EnodeDiscoveredSignal struct { Topic string `json:"topic"` } +type Messages struct { + Error error `json:"error"` + Messages []dedup.DeduplicateMessage `json:"messages"` + Chat *filter.Chat `json:"chat"` +} + // SendEnodeDiscovered tiggered when an enode is discovered. // finds a new enode. func SendEnodeDiscovered(enode, topic string) { @@ -141,3 +157,7 @@ func SendBundleAdded(identity string, installationID string) { func SendWhisperFilterAdded(filters []*Filter) { send(EventWhisperFilterAdded, WhisperFilterAddedSignal{Filters: filters}) } + +func SendNewMessages(messages []*Messages) { + send(EventNewMessages, NewMessagesSignal{Messages: messages}) +}