Send signal on new messages

This commit is contained in:
Andrea Maria Piana 2019-07-01 12:00:46 +02:00
parent dbaf622e12
commit b3efbb54f8
9 changed files with 170 additions and 44 deletions

View File

@ -1 +1 @@
0.29.0-beta.3
0.30.0-beta.0

View File

@ -11,6 +11,7 @@ import (
whisper "github.com/status-im/whisper/whisperv6"
"math/big"
"sync"
"time"
)
const (
@ -48,23 +49,33 @@ type Chat struct {
Negotiated bool `json:"negotiated"`
}
type Messages struct {
Chat *Chat `json:"chat"`
Messages []*whisper.Message `json:"messages"`
Error error `json:"error"`
}
type Service struct {
whisper *whisper.Whisper
secret *sharedsecret.Service
chats map[string]*Chat
persistence Persistence
mutex sync.Mutex
keys map[string][]byte
whisper *whisper.Whisper
secret *sharedsecret.Service
chats map[string]*Chat
persistence Persistence
mutex sync.Mutex
keys map[string][]byte
quit chan struct{}
onNewMessages func([]*Messages)
}
// New returns a new filter service
func New(w *whisper.Whisper, p Persistence, s *sharedsecret.Service) *Service {
func New(w *whisper.Whisper, p Persistence, s *sharedsecret.Service, onNewMessages func([]*Messages)) *Service {
return &Service{
whisper: w,
secret: s,
mutex: sync.Mutex{},
persistence: p,
chats: make(map[string]*Chat),
whisper: w,
secret: s,
mutex: sync.Mutex{},
persistence: p,
chats: make(map[string]*Chat),
quit: make(chan struct{}),
onNewMessages: onNewMessages,
}
}
@ -137,9 +148,29 @@ func (s *Service) Init(chats []*Chat) ([]*Chat, error) {
return allChats, nil
}
func (s *Service) Start(checkPeriod time.Duration) {
ticker := time.NewTicker(checkPeriod)
defer ticker.Stop()
for {
select {
case <-ticker.C:
messages := s.getMessages()
if len(messages) != 0 {
s.onNewMessages(messages)
}
case <-s.quit:
return
}
}
}
// Stop removes all the filters
func (s *Service) Stop() error {
var chats []*Chat
close(s.quit)
for _, chat := range s.chats {
chats = append(chats, chat)
}
@ -484,6 +515,46 @@ func (s *Service) addAsymmetricFilter(keyAsym *ecdsa.PrivateKey, chatID string,
return &Filter{FilterID: id, Topic: whisper.BytesToTopic(topic)}, nil
}
func (s *Service) getMessages() []*Messages {
var response []*Messages
s.mutex.Lock()
defer s.mutex.Unlock()
for chatID := range s.chats {
messages := s.getMessagesForChat(chatID)
if messages.Error != nil || len(messages.Messages) != 0 {
response = append(response, messages)
}
}
return response
}
func (s *Service) getMessagesForChat(chatID string) *Messages {
response := &Messages{}
response.Chat = s.chats[chatID]
if response.Chat == nil {
response.Error = errors.New("Chat not found")
return response
}
filter := s.whisper.GetFilter(response.Chat.FilterID)
if filter == nil {
response.Error = errors.New("Filter not found")
return response
}
receivedMessages := filter.Retrieve()
response.Messages = make([]*whisper.Message, 0, len(receivedMessages))
for _, msg := range receivedMessages {
response.Messages = append(response.Messages, whisper.ToWhisperMessage(msg))
}
return response
}
func negotiatedID(identity *ecdsa.PublicKey) string {
return fmt.Sprintf("0x%x-negotiated", crypto.FromECDSAPub(identity))
}

View File

@ -75,7 +75,7 @@ func (s *ServiceTestSuite) SetupTest() {
persistence := NewSQLLitePersistence(db)
s.service = New(whisper, persistence, sharedSecretService)
s.service = New(whisper, persistence, sharedSecretService, func([]*Messages) {})
}
func (s *ServiceTestSuite) TearDownTest() {
@ -152,7 +152,7 @@ func (s *ServiceTestSuite) TestLoadFromCache() {
s.Require().NoError(err)
// We create another service using the same persistence
service2 := New(s.service.whisper, s.service.persistence, s.service.secret)
service2 := New(s.service.whisper, s.service.persistence, s.service.secret, func([]*Messages) {})
_, err = service2.Init(chats)
s.Require().NoError(err)
}

View File

@ -403,6 +403,11 @@ func (p *Publisher) sendContactCode() (*whisper.NewMessage, error) {
return nil, nil
}
if p.persistence == nil {
p.log.Info("not initialized, skipping")
return nil, nil
}
lastPublished, err := p.persistence.Get()
if err != nil {
p.log.Error("could not fetch config from db", "err", err)
@ -412,7 +417,6 @@ func (p *Publisher) sendContactCode() (*whisper.NewMessage, error) {
now := time.Now().Unix()
if now-lastPublished < publishInterval {
fmt.Println("NOTHING")
p.log.Debug("nothing to do")
return nil, nil
}

View File

@ -66,7 +66,7 @@ func (s *ServiceTestSuite) createPublisher(installationID string) (*Publisher, *
sharedSecretService := sharedsecret.NewService(persistence.GetSharedSecretStorage())
filterService := filter.New(whisper, filter.NewSQLLitePersistence(persistence.DB), sharedSecretService)
filterService := filter.New(whisper, filter.NewSQLLitePersistence(persistence.DB), sharedSecretService, func([]*filter.Messages) {})
multideviceConfig := &multidevice.Config{
InstallationID: installationID,

View File

@ -11,7 +11,6 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/rlp"
@ -417,31 +416,7 @@ func (api *PublicAPI) GetNewFilterMessages(filterID string) ([]dedup.Deduplicate
return nil, err
}
dedupMessages := api.service.deduplicator.Deduplicate(msgs)
// Attempt to decrypt message, otherwise leave unchanged
for _, dedupMessage := range dedupMessages {
err := api.service.ProcessMessage(dedupMessage.Message, dedupMessage.DedupID)
switch err {
case chat.ErrNotPairedDevice:
api.log.Info("Received a message from non-paired device", "err", err)
case chat.ErrDeviceNotFound:
api.log.Warn("Device not found, sending signal", "err", err)
publicKey, err := crypto.UnmarshalPubkey(dedupMessage.Message.Sig)
if err != nil {
return nil, fmt.Errorf("failed to handler chat.ErrDeviceNotFound: %v", err)
}
keyString := fmt.Sprintf("%#x", crypto.FromECDSAPub(publicKey))
handler := PublisherSignalHandler{}
handler.DecryptMessageFailed(keyString)
default:
api.log.Error("Failed handling message with error", "err", err)
}
}
return dedupMessages, nil
return api.service.processReceivedMessages(msgs)
}
// ConfirmMessagesProcessed is a method to confirm that messages was consumed by

View File

@ -9,6 +9,7 @@ import (
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p"
@ -39,6 +40,8 @@ const (
defaultTimeoutWaitAdded = 5 * time.Second
// maxInstallations is a maximum number of supported devices for one account.
maxInstallations = 3
// filterCheckIntervalMs is a how often we should check whisper filters for new messages
filterCheckIntervalMs = 300
)
// EnvelopeEventsHandler used for two different event types.
@ -177,7 +180,28 @@ func (s *Service) initProtocol(address, encKey, password string) error {
sharedSecretService := sharedsecret.NewService(persistence.GetSharedSecretStorage())
// Initialize filter
filterService := filter.New(s.w, filter.NewSQLLitePersistence(persistence.DB), sharedSecretService)
onNewMessagesHandler := func(messages []*filter.Messages) {
var signalMessages []*signal.Messages
handler := PublisherSignalHandler{}
for _, chatMessages := range messages {
signalMessage := &signal.Messages{
Error: chatMessages.Error,
Chat: chatMessages.Chat,
}
signalMessages = append(signalMessages, signalMessage)
dedupMessages, err := s.processReceivedMessages(chatMessages.Messages)
if err != nil {
log.Error("could not process messages", "err", err)
continue
}
signalMessage.Messages = dedupMessages
}
handler.NewMessages(signalMessages)
}
filterService := filter.New(s.w, filter.NewSQLLitePersistence(persistence.DB), sharedSecretService, onNewMessagesHandler)
go filterService.Start(filterCheckIntervalMs * time.Millisecond)
// Initialize multidevice
multideviceConfig := &multidevice.Config{
@ -208,6 +232,34 @@ func (s *Service) initProtocol(address, encKey, password string) error {
return nil
}
func (s *Service) processReceivedMessages(messages []*whisper.Message) ([]dedup.DeduplicateMessage, error) {
dedupMessages := s.deduplicator.Deduplicate(messages)
// Attempt to decrypt message, otherwise leave unchanged
for _, dedupMessage := range dedupMessages {
err := s.ProcessMessage(dedupMessage.Message, dedupMessage.DedupID)
switch err {
case chat.ErrNotPairedDevice:
log.Info("Received a message from non-paired device", "err", err)
case chat.ErrDeviceNotFound:
log.Warn("Device not found, sending signal", "err", err)
publicKey, err := crypto.UnmarshalPubkey(dedupMessage.Message.Sig)
if err != nil {
return nil, fmt.Errorf("failed to handler chat.ErrDeviceNotFound: %v", err)
}
keyString := fmt.Sprintf("%#x", crypto.FromECDSAPub(publicKey))
handler := PublisherSignalHandler{}
handler.DecryptMessageFailed(keyString)
default:
log.Error("Failed handling message with error", "err", err)
}
}
return dedupMessages, nil
}
func (s *Service) newSharedSecretHandler(filterService *filter.Service) func([]*sharedsecret.Secret) {
return func(sharedSecrets []*sharedsecret.Secret) {
var filters []*signal.Filter

View File

@ -42,3 +42,7 @@ func (h PublisherSignalHandler) BundleAdded(identity string, installationID stri
func (h PublisherSignalHandler) WhisperFilterAdded(filters []*signal.Filter) {
signal.SendWhisperFilterAdded(filters)
}
func (h PublisherSignalHandler) NewMessages(messages []*signal.Messages) {
signal.SendNewMessages(messages)
}

View File

@ -4,6 +4,8 @@ import (
"encoding/hex"
"github.com/ethereum/go-ethereum/common"
"github.com/status-im/status-go/messaging/filter"
"github.com/status-im/status-go/services/shhext/dedup"
whisper "github.com/status-im/whisper/whisperv6"
)
@ -35,6 +37,9 @@ const (
// EventWhisperFilterAdded is triggered when we setup a new filter or restore existing ones
EventWhisperFilterAdded = "whisper.filter.added"
// EventNewMessages is triggered when we receive new messages
EventNewMessages = "messages.new"
)
// EnvelopeSignal includes hash of the envelope.
@ -81,6 +86,11 @@ type WhisperFilterAddedSignal struct {
Filters []*Filter `json:"filters"`
}
// NewMessagesSignal notifies clients of new messages
type NewMessagesSignal struct {
Messages []*Messages `json:"messages"`
}
// SendEnvelopeSent triggered when envelope delivered at least to 1 peer.
func SendEnvelopeSent(hash common.Hash) {
send(EventEnvelopeSent, EnvelopeSignal{Hash: hash})
@ -121,6 +131,12 @@ type EnodeDiscoveredSignal struct {
Topic string `json:"topic"`
}
type Messages struct {
Error error `json:"error"`
Messages []dedup.DeduplicateMessage `json:"messages"`
Chat *filter.Chat `json:"chat"`
}
// SendEnodeDiscovered tiggered when an enode is discovered.
// finds a new enode.
func SendEnodeDiscovered(enode, topic string) {
@ -141,3 +157,7 @@ func SendBundleAdded(identity string, installationID string) {
func SendWhisperFilterAdded(filters []*Filter) {
send(EventWhisperFilterAdded, WhisperFilterAddedSignal{Filters: filters})
}
func SendNewMessages(messages []*Messages) {
send(EventNewMessages, NewMessagesSignal{Messages: messages})
}