move installations to subscription

This commit is contained in:
Andrea Maria Piana 2020-07-31 11:08:09 +02:00
parent be8e64fe8d
commit dd3cf55556
No known key found for this signature in database
GPG Key ID: AA6CCA6DE0E06424
8 changed files with 82 additions and 38 deletions

View File

@ -17,7 +17,6 @@ import (
"github.com/status-im/status-go/eth-node/crypto" "github.com/status-im/status-go/eth-node/crypto"
"github.com/status-im/status-go/eth-node/types" "github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/protocol/encryption" "github.com/status-im/status-go/protocol/encryption"
"github.com/status-im/status-go/protocol/encryption/multidevice"
"github.com/status-im/status-go/protocol/encryption/sharedsecret" "github.com/status-im/status-go/protocol/encryption/sharedsecret"
"github.com/status-im/status-go/protocol/protobuf" "github.com/status-im/status-go/protocol/protobuf"
"github.com/status-im/status-go/protocol/sqlite" "github.com/status-im/status-go/protocol/sqlite"
@ -63,13 +62,11 @@ func (s *MessageProcessorSuite) SetupTest() {
database, err := sqlite.Open(filepath.Join(s.tmpDir, "processor-test.sql"), "some-key") database, err := sqlite.Open(filepath.Join(s.tmpDir, "processor-test.sql"), "some-key")
s.Require().NoError(err) s.Require().NoError(err)
onNewInstallations := func([]*multidevice.Installation) {}
onNewSharedSecret := func([]*sharedsecret.Secret) {} onNewSharedSecret := func([]*sharedsecret.Secret) {}
onSendContactCode := func(*encryption.ProtocolMessageSpec) {} onSendContactCode := func(*encryption.ProtocolMessageSpec) {}
encryptionProtocol := encryption.New( encryptionProtocol := encryption.New(
database, database,
"installation-1", "installation-1",
onNewInstallations,
onNewSharedSecret, onNewSharedSecret,
onSendContactCode, onSendContactCode,
s.logger, s.logger,
@ -206,7 +203,6 @@ func (s *MessageProcessorSuite) TestHandleDecodedMessagesDatasyncEncrypted() {
senderEncryptionProtocol := encryption.New( senderEncryptionProtocol := encryption.New(
senderDatabase, senderDatabase,
"installation-2", "installation-2",
func([]*multidevice.Installation) {},
func([]*sharedsecret.Secret) {}, func([]*sharedsecret.Secret) {},
func(*encryption.ProtocolMessageSpec) {}, func(*encryption.ProtocolMessageSpec) {},
s.logger, s.logger,

View File

@ -65,7 +65,6 @@ func setupUser(user string, s *EncryptionServiceMultiDeviceSuite, n int) error {
protocol := New( protocol := New(
db, db,
installationID, installationID,
func(s []*multidevice.Installation) {},
func(s []*sharedsecret.Secret) {}, func(s []*sharedsecret.Secret) {},
func(*ProtocolMessageSpec) {}, func(*ProtocolMessageSpec) {},
s.logger.With(zap.String("user", user)), s.logger.With(zap.String("user", user)),

View File

@ -19,7 +19,6 @@ import (
"github.com/status-im/status-go/eth-node/crypto" "github.com/status-im/status-go/eth-node/crypto"
"github.com/status-im/status-go/protocol/encryption/multidevice"
"github.com/status-im/status-go/protocol/encryption/sharedsecret" "github.com/status-im/status-go/protocol/encryption/sharedsecret"
) )
@ -57,7 +56,6 @@ func (s *EncryptionServiceTestSuite) initDatabases(config encryptorConfig) {
db, db,
aliceInstallationID, aliceInstallationID,
config, config,
func(s []*multidevice.Installation) {},
func(s []*sharedsecret.Secret) {}, func(s []*sharedsecret.Secret) {},
func(*ProtocolMessageSpec) {}, func(*ProtocolMessageSpec) {},
s.logger.With(zap.String("user", "alice")), s.logger.With(zap.String("user", "alice")),
@ -70,7 +68,6 @@ func (s *EncryptionServiceTestSuite) initDatabases(config encryptorConfig) {
db, db,
bobInstallationID, bobInstallationID,
config, config,
func(s []*multidevice.Installation) {},
func(s []*sharedsecret.Secret) {}, func(s []*sharedsecret.Secret) {},
func(*ProtocolMessageSpec) {}, func(*ProtocolMessageSpec) {},
s.logger.With(zap.String("user", "bob")), s.logger.With(zap.String("user", "bob")),

View File

@ -24,6 +24,7 @@ const (
sharedSecretNegotiationVersion = 1 sharedSecretNegotiationVersion = 1
partitionedTopicMinVersion = 1 partitionedTopicMinVersion = 1
defaultMinVersion = 0 defaultMinVersion = 0
subscriptionsChannelSize = 100
) )
type PartitionTopicMode int type PartitionTopicMode int
@ -70,8 +71,8 @@ type Protocol struct {
secret *sharedsecret.SharedSecret secret *sharedsecret.SharedSecret
multidevice *multidevice.Multidevice multidevice *multidevice.Multidevice
publisher *publisher.Publisher publisher *publisher.Publisher
subscriptions *Subscriptions
onAddedBundlesHandler func([]*multidevice.Installation)
onNewSharedSecretHandler func([]*sharedsecret.Secret) onNewSharedSecretHandler func([]*sharedsecret.Secret)
onSendContactCodeHandler func(*ProtocolMessageSpec) onSendContactCodeHandler func(*ProtocolMessageSpec)
@ -87,7 +88,6 @@ var (
func New( func New(
db *sql.DB, db *sql.DB,
installationID string, installationID string,
addedBundlesHandler func([]*multidevice.Installation),
onNewSharedSecretHandler func([]*sharedsecret.Secret), onNewSharedSecretHandler func([]*sharedsecret.Secret),
onSendContactCodeHandler func(*ProtocolMessageSpec), onSendContactCodeHandler func(*ProtocolMessageSpec),
logger *zap.Logger, logger *zap.Logger,
@ -96,7 +96,6 @@ func New(
db, db,
installationID, installationID,
defaultEncryptorConfig(installationID, logger), defaultEncryptorConfig(installationID, logger),
addedBundlesHandler,
onNewSharedSecretHandler, onNewSharedSecretHandler,
onSendContactCodeHandler, onSendContactCodeHandler,
logger, logger,
@ -109,7 +108,6 @@ func NewWithEncryptorConfig(
db *sql.DB, db *sql.DB,
installationID string, installationID string,
encryptorConfig encryptorConfig, encryptorConfig encryptorConfig,
addedBundlesHandler func([]*multidevice.Installation),
onNewSharedSecretHandler func([]*sharedsecret.Secret), onNewSharedSecretHandler func([]*sharedsecret.Secret),
onSendContactCodeHandler func(*ProtocolMessageSpec), onSendContactCodeHandler func(*ProtocolMessageSpec),
logger *zap.Logger, logger *zap.Logger,
@ -123,18 +121,30 @@ func NewWithEncryptorConfig(
InstallationID: installationID, InstallationID: installationID,
}), }),
publisher: publisher.New(logger), publisher: publisher.New(logger),
onAddedBundlesHandler: addedBundlesHandler,
onNewSharedSecretHandler: onNewSharedSecretHandler, onNewSharedSecretHandler: onNewSharedSecretHandler,
onSendContactCodeHandler: onSendContactCodeHandler, onSendContactCodeHandler: onSendContactCodeHandler,
logger: logger.With(zap.Namespace("Protocol")), logger: logger.With(zap.Namespace("Protocol")),
} }
} }
func (p *Protocol) Start(myIdentity *ecdsa.PrivateKey) error { type Subscriptions struct {
NewInstallations chan []*multidevice.Installation
NewSharedSecret chan []*sharedsecret.Secret
SendContactCode <-chan struct{}
Quit chan struct{}
}
func (p *Protocol) Start(myIdentity *ecdsa.PrivateKey) (*Subscriptions, error) {
// Propagate currently cached shared secrets. // Propagate currently cached shared secrets.
secrets, err := p.secret.All() secrets, err := p.secret.All()
if err != nil { if err != nil {
return errors.Wrap(err, "failed to get all secrets") return nil, errors.Wrap(err, "failed to get all secrets")
}
p.subscriptions = &Subscriptions{
NewInstallations: make(chan []*multidevice.Installation, subscriptionsChannelSize),
NewSharedSecret: make(chan []*sharedsecret.Secret, subscriptionsChannelSize),
SendContactCode: p.publisher.Start(),
Quit: make(chan struct{}),
} }
p.onNewSharedSecretHandler(secrets) p.onNewSharedSecretHandler(secrets)
@ -155,6 +165,14 @@ func (p *Protocol) Start(myIdentity *ecdsa.PrivateKey) error {
} }
}() }()
return p.subscriptions, nil
}
func (p *Protocol) Stop() error {
p.publisher.Stop()
if p.subscriptions != nil {
close(p.subscriptions.Quit)
}
return nil return nil
} }
@ -433,7 +451,14 @@ func (p *Protocol) HandleMessage(
return nil, err return nil, err
} }
p.onAddedBundlesHandler(addedBundles) // Publish without blocking if channel is full
if p.subscriptions != nil {
select {
case p.subscriptions.NewInstallations <- addedBundles:
default:
p.logger.Warn("new installations channel full, dropping message")
}
}
} }
// Check if it's a public message // Check if it's a public message

View File

@ -14,7 +14,6 @@ import (
"github.com/status-im/status-go/eth-node/crypto" "github.com/status-im/status-go/eth-node/crypto"
"github.com/status-im/status-go/protocol/encryption/multidevice"
"github.com/status-im/status-go/protocol/encryption/sharedsecret" "github.com/status-im/status-go/protocol/encryption/sharedsecret"
) )
@ -44,7 +43,6 @@ func (s *ProtocolServiceTestSuite) SetupTest() {
s.Require().NoError(err) s.Require().NoError(err)
bobDBKey := "bob" bobDBKey := "bob"
addedBundlesHandler := func(addedBundles []*multidevice.Installation) {}
onNewSharedSecretHandler := func(secret []*sharedsecret.Secret) {} onNewSharedSecretHandler := func(secret []*sharedsecret.Secret) {}
db, err := sqlite.Open(s.aliceDBPath.Name(), aliceDBKey) db, err := sqlite.Open(s.aliceDBPath.Name(), aliceDBKey)
@ -52,7 +50,6 @@ func (s *ProtocolServiceTestSuite) SetupTest() {
s.alice = New( s.alice = New(
db, db,
"1", "1",
addedBundlesHandler,
onNewSharedSecretHandler, onNewSharedSecretHandler,
func(*ProtocolMessageSpec) {}, func(*ProtocolMessageSpec) {},
s.logger.With(zap.String("user", "alice")), s.logger.With(zap.String("user", "alice")),
@ -63,7 +60,6 @@ func (s *ProtocolServiceTestSuite) SetupTest() {
s.bob = New( s.bob = New(
db, db,
"2", "2",
addedBundlesHandler,
onNewSharedSecretHandler, onNewSharedSecretHandler,
func(*ProtocolMessageSpec) {}, func(*ProtocolMessageSpec) {},
s.logger.With(zap.String("user", "bob")), s.logger.With(zap.String("user", "bob")),
@ -190,7 +186,7 @@ func (s *ProtocolServiceTestSuite) TestPropagatingSavedSharedSecretsOnStart() {
secretResponse = secret secretResponse = secret
} }
err = s.alice.Start(aliceKey) _, err = s.alice.Start(aliceKey)
s.NoError(err) s.NoError(err)
s.Require().NotNil(secretResponse) s.Require().NotNil(secretResponse)

View File

@ -55,6 +55,10 @@ func (p *Publisher) Start() <-chan struct{} {
} }
func (p *Publisher) Stop() { func (p *Publisher) Stop() {
// If hasn't started, ignore
if p.quit == nil {
return
}
select { select {
case _, ok := <-p.quit: case _, ok := <-p.quit:
if !ok { if !ok {

View File

@ -127,17 +127,6 @@ func NewMessenger(
} }
} }
onNewInstallationsHandler := func(installations []*multidevice.Installation) {
for _, installation := range installations {
if installation.Identity == contactIDFromPublicKey(&messenger.identity.PublicKey) {
if _, ok := messenger.allInstallations[installation.ID]; !ok {
messenger.allInstallations[installation.ID] = installation
messenger.modifiedInstallations[installation.ID] = true
}
}
}
}
// Set default config fields. // Set default config fields.
onNewSharedSecretHandler := func(secrets []*sharedsecret.Secret) { onNewSharedSecretHandler := func(secrets []*sharedsecret.Secret) {
filters, err := messenger.handleSharedSecrets(secrets) filters, err := messenger.handleSharedSecrets(secrets)
@ -232,7 +221,6 @@ func NewMessenger(
encryptionProtocol := encryption.New( encryptionProtocol := encryption.New(
database, database,
installationID, installationID,
onNewInstallationsHandler,
onNewSharedSecretHandler, onNewSharedSecretHandler,
c.onSendContactCodeHandler, c.onSendContactCodeHandler,
logger, logger,
@ -297,6 +285,7 @@ func NewMessenger(
shutdownTasks: []func() error{ shutdownTasks: []func() error{
database.Close, database.Close,
pushNotificationClient.Stop, pushNotificationClient.Stop,
encryptionProtocol.Stop,
transp.ResetFilters, transp.ResetFilters,
transp.Stop, transp.Stop,
func() error { processor.Stop(); return nil }, func() error { processor.Stop(); return nil },
@ -328,7 +317,39 @@ func (m *Messenger) Start() error {
} }
} }
return m.encryptor.Start(m.identity) subscriptions, err := m.encryptor.Start(m.identity)
if err != nil {
return err
}
m.handleEncryptionLayerSubscriptions(subscriptions)
return nil
}
func (m *Messenger) handleNewInstallations(installations []*multidevice.Installation) {
for _, installation := range installations {
if installation.Identity == contactIDFromPublicKey(&m.identity.PublicKey) {
if _, ok := m.allInstallations[installation.ID]; !ok {
m.allInstallations[installation.ID] = installation
m.modifiedInstallations[installation.ID] = true
}
}
}
}
func (m *Messenger) handleEncryptionLayerSubscriptions(subscriptions *encryption.Subscriptions) {
go func() {
for {
select {
case newInstallations := <-subscriptions.NewInstallations:
m.logger.Debug("handling new installations")
m.handleNewInstallations(newInstallations)
case <-subscriptions.Quit:
m.logger.Debug("quitting encryption subscription loop")
return
}
}
}()
} }
// Init analyzes chats and contacts in order to setup filters // Init analyzes chats and contacts in order to setup filters

View File

@ -50,6 +50,12 @@ func (s *MessengerInstallationSuite) SetupTest() {
s.m = s.newMessenger(s.shh) s.m = s.newMessenger(s.shh)
s.privateKey = s.m.identity s.privateKey = s.m.identity
// We start the messenger in order to receive installations
s.Require().NoError(s.m.Start())
}
func (s *MessengerInstallationSuite) TearDownTest() {
s.Require().NoError(s.m.Shutdown())
} }
func (s *MessengerInstallationSuite) newMessengerWithKey(shh types.Waku, privateKey *ecdsa.PrivateKey) *Messenger { func (s *MessengerInstallationSuite) newMessengerWithKey(shh types.Waku, privateKey *ecdsa.PrivateKey) *Messenger {