Refactor publisher

This commit is contained in:
Andrea Maria Piana 2019-07-05 14:45:47 +02:00
parent a2295baaf9
commit 2a2fcb48e1
6 changed files with 78 additions and 79 deletions

View File

@ -1 +1 @@
0.30.0-beta.0 0.30.0-alpha.1

View File

@ -292,6 +292,10 @@ func (p *ProtocolService) ConfirmMessagesProcessed(messageIDs [][]byte) error {
return p.encryption.ConfirmMessagesProcessed(messageIDs) return p.encryption.ConfirmMessagesProcessed(messageIDs)
} }
func (p *ProtocolService) GetSharedSecretService() *sharedsecret.Service {
return p.secret
}
// HandleMessage unmarshals a message and processes it, decrypting it if it is a 1:1 message. // HandleMessage unmarshals a message and processes it, decrypting it if it is a 1:1 message.
func (p *ProtocolService) HandleMessage(myIdentityKey *ecdsa.PrivateKey, theirPublicKey *ecdsa.PublicKey, protocolMessage *protobuf.ProtocolMessage, messageID []byte) ([]byte, error) { func (p *ProtocolService) HandleMessage(myIdentityKey *ecdsa.PrivateKey, theirPublicKey *ecdsa.PublicKey, protocolMessage *protobuf.ProtocolMessage, messageID []byte) ([]byte, error) {
p.log.Debug("Received message from", "public-key", theirPublicKey) p.log.Debug("Received message from", "public-key", theirPublicKey)

View File

@ -16,6 +16,7 @@ import (
"github.com/status-im/status-go/messaging/chat/protobuf" "github.com/status-im/status-go/messaging/chat/protobuf"
"github.com/status-im/status-go/messaging/filter" "github.com/status-im/status-go/messaging/filter"
"github.com/status-im/status-go/messaging/multidevice" "github.com/status-im/status-go/messaging/multidevice"
"github.com/status-im/status-go/messaging/sharedsecret"
"github.com/status-im/status-go/services/shhext/whisperutils" "github.com/status-im/status-go/services/shhext/whisperutils"
@ -27,6 +28,8 @@ const (
tickerInterval = 120 tickerInterval = 120
// How often we should publish a contact code in seconds // How often we should publish a contact code in seconds
publishInterval = 21600 publishInterval = 21600
// How often we should check for new messages
pollIntervalMs = 300
) )
var ( var (
@ -66,10 +69,13 @@ func New(w *whisper.Whisper, c Config) *Publisher {
} }
} }
func (p *Publisher) Init(db *sql.DB, protocol *chat.ProtocolService, filter *filter.Service) { func (p *Publisher) Init(db *sql.DB, protocol *chat.ProtocolService, onNewMessagesHandler func([]*filter.Messages)) {
filterService := filter.New(p.whisper, filter.NewSQLLitePersistence(db), protocol.GetSharedSecretService(), onNewMessagesHandler)
p.persistence = NewSQLLitePersistence(db) p.persistence = NewSQLLitePersistence(db)
p.protocol = protocol p.protocol = protocol
p.filter = filter p.filter = filterService
} }
func (p *Publisher) ProcessPublicBundle(myIdentityKey *ecdsa.PrivateKey, bundle *protobuf.Bundle) ([]*multidevice.Installation, error) { func (p *Publisher) ProcessPublicBundle(myIdentityKey *ecdsa.PrivateKey, bundle *protobuf.Bundle) ([]*multidevice.Installation, error) {
@ -173,10 +179,15 @@ func (p *Publisher) GetPublicBundle(identityKey *ecdsa.PublicKey) (*protobuf.Bun
} }
func (p *Publisher) Start(online func() bool, startTicker bool) error { func (p *Publisher) Start(online func() bool, startTicker bool) error {
if p.protocol == nil {
return errProtocolNotInitialized
}
p.online = online p.online = online
if startTicker { if startTicker {
p.startTicker() p.startTicker()
} }
go p.filter.Start(pollIntervalMs * time.Millisecond)
return nil return nil
} }
@ -205,6 +216,15 @@ func (p *Publisher) LoadFilter(chat *filter.Chat) ([]*filter.Chat, error) {
func (p *Publisher) RemoveFilters(chats []*filter.Chat) error { func (p *Publisher) RemoveFilters(chats []*filter.Chat) error {
return p.filter.Remove(chats) return p.filter.Remove(chats)
} }
func (p *Publisher) ProcessNegotiatedSecret(secrets []*sharedsecret.Secret) {
for _, secret := range secrets {
_, err := p.filter.ProcessNegotiatedSecret(secret)
if err != nil {
log.Error("could not process negotiated filter", "err", err)
}
}
}
func (p *Publisher) ProcessMessage(msg *whisper.Message, msgID []byte) error { func (p *Publisher) ProcessMessage(msg *whisper.Message, msgID []byte) error {
if !p.config.PFSEnabled { if !p.config.PFSEnabled {

View File

@ -66,8 +66,6 @@ func (s *ServiceTestSuite) createPublisher(installationID string) (*Publisher, *
sharedSecretService := sharedsecret.NewService(persistence.GetSharedSecretStorage()) sharedSecretService := sharedsecret.NewService(persistence.GetSharedSecretStorage())
filterService := filter.New(whisper, filter.NewSQLLitePersistence(persistence.DB), sharedSecretService, func([]*filter.Messages) {})
multideviceConfig := &multidevice.Config{ multideviceConfig := &multidevice.Config{
InstallationID: installationID, InstallationID: installationID,
ProtocolVersion: chat.ProtocolVersion, ProtocolVersion: chat.ProtocolVersion,
@ -82,14 +80,10 @@ func (s *ServiceTestSuite) createPublisher(installationID string) (*Publisher, *
sharedSecretService, sharedSecretService,
multideviceService, multideviceService,
func(addedBundles []*multidevice.Installation) {}, func(addedBundles []*multidevice.Installation) {},
func(sharedSecrets []*sharedsecret.Secret) { publisher.ProcessNegotiatedSecret,
for _, sharedSecret := range sharedSecrets {
_, _ = filterService.ProcessNegotiatedSecret(sharedSecret)
}
},
) )
publisher.Init(persistence.DB, protocolService, filterService) publisher.Init(persistence.DB, protocolService, func(msg []*filter.Messages) {})
err = publisher.Start(func() bool { return true }, false) err = publisher.Start(func() bool { return true }, false)
s.Require().NoError(err) s.Require().NoError(err)

View File

@ -40,8 +40,6 @@ const (
defaultTimeoutWaitAdded = 5 * time.Second defaultTimeoutWaitAdded = 5 * time.Second
// maxInstallations is a maximum number of supported devices for one account. // maxInstallations is a maximum number of supported devices for one account.
maxInstallations = 3 maxInstallations = 3
// filterCheckIntervalMs is a how often we should check whisper filters for new messages
filterCheckIntervalMs = 300
) )
// EnvelopeEventsHandler used for two different event types. // EnvelopeEventsHandler used for two different event types.
@ -176,13 +174,30 @@ func (s *Service) initProtocol(address, encKey, password string) error {
return err return err
} }
// Initialize sharedsecret multideviceConfig := &multidevice.Config{
sharedSecretService := sharedsecret.NewService(persistence.GetSharedSecretStorage()) InstallationID: s.config.InstallationID,
ProtocolVersion: chat.ProtocolVersion,
MaxInstallations: maxInstallations,
}
addedBundlesHandler := func(addedBundles []*multidevice.Installation) {
handler := PublisherSignalHandler{}
for _, bundle := range addedBundles {
handler.BundleAdded(bundle.Identity, bundle.ID)
}
}
protocolService := chat.NewProtocolService(
chat.NewEncryptionService(
persistence,
chat.DefaultEncryptionServiceConfig(s.config.InstallationID)),
sharedsecret.NewService(persistence.GetSharedSecretStorage()),
multidevice.New(multideviceConfig, persistence.GetMultideviceStorage()),
addedBundlesHandler,
s.ProcessNegotiatedSecret)
// Initialize filter
onNewMessagesHandler := func(messages []*filter.Messages) { onNewMessagesHandler := func(messages []*filter.Messages) {
var signalMessages []*signal.Messages var signalMessages []*signal.Messages
handler := PublisherSignalHandler{}
for _, chatMessages := range messages { for _, chatMessages := range messages {
signalMessage := &signal.Messages{ signalMessage := &signal.Messages{
Error: chatMessages.Error, Error: chatMessages.Error,
@ -197,37 +212,9 @@ func (s *Service) initProtocol(address, encKey, password string) error {
signalMessage.Messages = dedupMessages signalMessage.Messages = dedupMessages
} }
handler.NewMessages(signalMessages) PublisherSignalHandler{}.NewMessages(signalMessages)
} }
s.Publisher.Init(persistence.DB, protocolService, onNewMessagesHandler)
filterService := filter.New(s.w, filter.NewSQLLitePersistence(persistence.DB), sharedSecretService, onNewMessagesHandler)
go filterService.Start(filterCheckIntervalMs * time.Millisecond)
// Initialize multidevice
multideviceConfig := &multidevice.Config{
InstallationID: s.config.InstallationID,
ProtocolVersion: chat.ProtocolVersion,
MaxInstallations: maxInstallations,
}
multideviceService := multidevice.New(multideviceConfig, persistence.GetMultideviceStorage())
addedBundlesHandler := func(addedBundles []*multidevice.Installation) {
handler := PublisherSignalHandler{}
for _, bundle := range addedBundles {
handler.BundleAdded(bundle.Identity, bundle.ID)
}
}
protocolService := chat.NewProtocolService(
chat.NewEncryptionService(
persistence,
chat.DefaultEncryptionServiceConfig(s.config.InstallationID)),
sharedSecretService,
multideviceService,
addedBundlesHandler,
s.newSharedSecretHandler(filterService))
s.Publisher.Init(persistence.DB, protocolService, filterService)
return nil return nil
} }
@ -253,41 +240,15 @@ func (s *Service) processReceivedMessages(messages []*whisper.Message) ([]dedup.
handler := PublisherSignalHandler{} handler := PublisherSignalHandler{}
handler.DecryptMessageFailed(keyString) handler.DecryptMessageFailed(keyString)
default: default:
if err != nil {
log.Error("Failed handling message with error", "err", err) log.Error("Failed handling message with error", "err", err)
} }
} }
}
return dedupMessages, nil return dedupMessages, nil
} }
func (s *Service) newSharedSecretHandler(filterService *filter.Service) func([]*sharedsecret.Secret) {
return func(sharedSecrets []*sharedsecret.Secret) {
var filters []*signal.Filter
for _, sharedSecret := range sharedSecrets {
chat, err := filterService.ProcessNegotiatedSecret(sharedSecret)
if err != nil {
log.Error("Failed to process negotiated secret", "err", err)
return
}
filter := &signal.Filter{
ChatID: chat.ChatID,
SymKeyID: chat.SymKeyID,
Listen: chat.Listen,
FilterID: chat.FilterID,
Identity: chat.Identity,
Topic: chat.Topic,
}
filters = append(filters, filter)
}
if len(filters) != 0 {
handler := PublisherSignalHandler{}
handler.WhisperFilterAdded(filters)
}
}
}
// UpdateMailservers updates information about selected mail servers. // UpdateMailservers updates information about selected mail servers.
func (s *Service) UpdateMailservers(nodes []*enode.Node) error { func (s *Service) UpdateMailservers(nodes []*enode.Node) error {
if err := s.peerStore.Update(nodes); err != nil { if err := s.peerStore.Update(nodes); err != nil {
@ -344,8 +305,11 @@ func (s *Service) Start(server *p2p.Server) error {
s.mailMonitor.Start() s.mailMonitor.Start()
s.nodeID = server.PrivateKey s.nodeID = server.PrivateKey
s.server = server s.server = server
if s.config.PFSEnabled {
return s.Publisher.Start(s.online, true) return s.Publisher.Start(s.online, true)
} }
return nil
}
func (s *Service) online() bool { func (s *Service) online() bool {
return s.server.PeerCount() != 0 return s.server.PeerCount() != 0

View File

@ -129,6 +129,7 @@ func (s *ShhExtSuite) SetupTest() {
db, err := leveldb.Open(storage.NewMemStorage(), nil) db, err := leveldb.Open(storage.NewMemStorage(), nil)
s.Require().NoError(err) s.Require().NoError(err)
s.services[i] = New(s.whisper[i], nil, db, config) s.services[i] = New(s.whisper[i], nil, db, config)
s.Require().NoError(s.services[i].InitProtocolWithPassword(fmt.Sprintf("%d", i), "password"))
s.NoError(stack.Register(func(n *node.ServiceContext) (node.Service, error) { s.NoError(stack.Register(func(n *node.ServiceContext) (node.Service, error) {
return s.services[i], nil return s.services[i], nil
})) }))
@ -139,12 +140,26 @@ func (s *ShhExtSuite) SetupTest() {
} }
func (s *ShhExtSuite) TestInitProtocol() { func (s *ShhExtSuite) TestInitProtocol() {
err := s.services[0].InitProtocolWithPassword("example-address", "`090///\nhtaa\rhta9x8923)$$'23") directory, err := ioutil.TempDir("", "status-go-testing")
s.Require().NoError(err)
config := params.ShhextConfig{
InstallationID: "2",
BackupDisabledDataDir: directory,
PFSEnabled: true,
MailServerConfirmations: true,
ConnectionTarget: 10,
}
db, err := leveldb.Open(storage.NewMemStorage(), nil)
s.Require().NoError(err)
service := New(whisper.New(nil), nil, db, config)
err = service.InitProtocolWithPassword("example-address", "`090///\nhtaa\rhta9x8923)$$'23")
s.NoError(err) s.NoError(err)
digest := sha3.Sum256([]byte("`090///\nhtaa\rhta9x8923)$$'23")) digest := sha3.Sum256([]byte("`090///\nhtaa\rhta9x8923)$$'23"))
encKey := fmt.Sprintf("%x", digest) encKey := fmt.Sprintf("%x", digest)
err = s.services[0].InitProtocolWithEncyptionKey("example-address", encKey) err = service.InitProtocolWithEncyptionKey("example-address", encKey)
s.NoError(err) s.NoError(err)
} }
@ -351,6 +366,7 @@ func (s *ShhExtSuite) TestRequestMessagesSuccess() {
PFSEnabled: true, PFSEnabled: true,
} }
service := New(shh, mock, nil, config) service := New(shh, mock, nil, config)
s.Require().NoError(service.InitProtocolWithPassword("abc", "password"))
s.Require().NoError(service.Start(aNode.Server())) s.Require().NoError(service.Start(aNode.Server()))
api := NewPublicAPI(service) api := NewPublicAPI(service)
@ -721,6 +737,7 @@ func (s *RequestWithTrackingHistorySuite) SetupTest() {
s.localContext = NewContextFromService(context.Background(), s.localService, s.localService.storage) s.localContext = NewContextFromService(context.Background(), s.localService, s.localService.storage)
localPkey, err := crypto.GenerateKey() localPkey, err := crypto.GenerateKey()
s.Require().NoError(err) s.Require().NoError(err)
s.Require().NoError(s.localService.InitProtocolWithPassword("local-service", "password"))
s.Require().NoError(s.localService.Start(&p2p.Server{Config: p2p.Config{PrivateKey: localPkey}})) s.Require().NoError(s.localService.Start(&p2p.Server{Config: p2p.Config{PrivateKey: localPkey}}))
s.localAPI = NewPublicAPI(s.localService) s.localAPI = NewPublicAPI(s.localService)