From ac6aff2486b5bc8e20138106db1f6399b40d02b0 Mon Sep 17 00:00:00 2001 From: Andrea Maria Piana Date: Tue, 18 Aug 2020 17:07:48 +0200 Subject: [PATCH] Re-issue bundle on changed parameters --- protocol/messenger.go | 95 +++++++++++++++++------ protocol/push_notification_test.go | 51 ++++++++++++ protocol/pushnotificationclient/client.go | 50 +++++++++++- 3 files changed, 168 insertions(+), 28 deletions(-) diff --git a/protocol/messenger.go b/protocol/messenger.go index c944206f0..013e32cc7 100644 --- a/protocol/messenger.go +++ b/protocol/messenger.go @@ -279,6 +279,8 @@ func (m *Messenger) Start() error { // Start push notification client if m.pushNotificationClient != nil { + m.handlePushNotificationClientRegistrations(m.pushNotificationClient.SubscribeToRegistrations()) + if err := m.pushNotificationClient.Start(); err != nil { return err } @@ -302,36 +304,49 @@ func (m *Messenger) Start() error { return nil } +func (m *Messenger) buildContactCodeAdvertisement() (*protobuf.ContactCodeAdvertisement, error) { + if m.pushNotificationClient == nil || !m.pushNotificationClient.Enabled() { + return nil, nil + } + m.logger.Debug("adding push notification info to contact code bundle") + info, err := m.pushNotificationClient.MyPushNotificationQueryInfo() + if err != nil { + return nil, err + } + if len(info) == 0 { + return nil, nil + } + return &protobuf.ContactCodeAdvertisement{ + PushNotificationInfo: info, + }, nil +} + // handleSendContactCode sends a public message wrapped in the encryption // layer, which will propagate our bundle func (m *Messenger) handleSendContactCode() error { var payload []byte - if m.pushNotificationClient != nil && m.pushNotificationClient.Enabled() { - info, err := m.pushNotificationClient.MyPushNotificationQueryInfo() + m.logger.Debug("sending contact code") + contactCodeAdvertisement, err := m.buildContactCodeAdvertisement() + if err != nil { + m.logger.Error("could not build contact code advertisement", zap.Error(err)) + } + + if contactCodeAdvertisement != nil { + payload, err = proto.Marshal(contactCodeAdvertisement) if err != nil { return err } - if len(info) != 0 { - contactCode := &protobuf.ContactCodeAdvertisement{ - PushNotificationInfo: info, - } - - payload, err = proto.Marshal(contactCode) - if err != nil { - return err - } - - } } contactCodeTopic := transport.ContactCodeTopic(&m.identity.PublicKey) rawMessage := common.RawMessage{ LocalChatID: contactCodeTopic, + MessageType: protobuf.ApplicationMetadataMessage_CONTACT_CODE_ADVERTISEMENT, Payload: payload, } ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - _, err := m.processor.SendPublic(ctx, contactCodeTopic, rawMessage) + _, err = m.processor.SendPublic(ctx, contactCodeTopic, rawMessage) if err != nil { m.logger.Warn("failed to send a contact code", zap.Error(err)) } @@ -391,6 +406,22 @@ func (m *Messenger) handleEncryptionLayerSubscriptions(subscriptions *encryption }() } +// handlePushNotificationClientRegistration handles registration events +func (m *Messenger) handlePushNotificationClientRegistrations(c chan struct{}) { + go func() { + for { + _, more := <-c + if !more { + return + } + if err := m.handleSendContactCode(); err != nil { + m.logger.Error("failed to publish contact code", zap.Error(err)) + } + + } + }() +} + // Init analyzes chats and contacts in order to setup filters // which are responsible for retrieving messages. func (m *Messenger) Init() error { @@ -1192,7 +1223,7 @@ func (m *Messenger) saveContact(contact *Contact) error { } // We check if it should re-register with the push notification server - shouldReregisterForPushNotifications := m.pushNotificationClient != nil && (m.isNewContact(contact) || m.removedContact(contact)) + shouldReregisterForPushNotifications := (m.isNewContact(contact) || m.removedContact(contact)) err = m.persistence.SaveContact(contact, nil) if err != nil { @@ -1203,17 +1234,22 @@ func (m *Messenger) saveContact(contact *Contact) error { // Reregister only when data has changed if shouldReregisterForPushNotifications { - m.logger.Info("contact state changed, re-registering for push notification") - contactIDs, mutedChatIDs := m.addedContactsAndMutedChatIDs() - err := m.pushNotificationClient.Reregister(contactIDs, mutedChatIDs) - if err != nil { - return err - } + return m.reregisterForPushNotifications() } return nil } +func (m *Messenger) reregisterForPushNotifications() error { + m.logger.Info("contact state changed, re-registering for push notification") + if m.pushNotificationClient == nil { + return nil + } + + contactIDs, mutedChatIDs := m.addedContactsAndMutedChatIDs() + return m.pushNotificationClient.Reregister(contactIDs, mutedChatIDs) +} + func (m *Messenger) SaveContact(contact *Contact) error { m.mutex.Lock() defer m.mutex.Unlock() @@ -1875,6 +1911,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte logger.Warn("failed to check message exists", zap.Error(err)) } if exists { + logger.Debug("messageExists", zap.String("messageID", messageID)) continue } @@ -2051,7 +2088,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte if m.pushNotificationClient == nil { continue } - logger.Debug("Handling PushNotificationRegistrationResponse") + logger.Debug("Handling ContactCodeAdvertisement") if err := m.pushNotificationClient.HandleContactCodeAdvertisement(publicKey, msg.ParsedMessage.Interface().(protobuf.ContactCodeAdvertisement)); err != nil { logger.Warn("failed to handle ContactCodeAdvertisement", zap.Error(err)) } @@ -2119,6 +2156,8 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte logger.Debug("message not handled", zap.Any("messageType", reflect.TypeOf(msg.ParsedMessage.Interface()))) } + } else { + logger.Debug("parsed message is nil") } } } @@ -2310,7 +2349,8 @@ func (m *Messenger) MuteChat(chatID string) error { chat.Muted = true m.allChats[chat.ID] = chat - return nil + + return m.reregisterForPushNotifications() } // UnmuteChat signals to the messenger that we want to be notified @@ -2330,7 +2370,7 @@ func (m *Messenger) UnmuteChat(chatID string) error { chat.Muted = false m.allChats[chat.ID] = chat - return nil + return m.reregisterForPushNotifications() } func (m *Messenger) UpdateMessageOutgoingStatus(id, newOutgoingStatus string) error { @@ -3213,7 +3253,12 @@ func (m *Messenger) RegisterForPushNotifications(ctx context.Context, deviceToke defer m.mutex.Unlock() contactIDs, mutedChatIDs := m.addedContactsAndMutedChatIDs() - return m.pushNotificationClient.Register(deviceToken, apnTopic, tokenType, contactIDs, mutedChatIDs) + err := m.pushNotificationClient.Register(deviceToken, apnTopic, tokenType, contactIDs, mutedChatIDs) + if err != nil { + m.logger.Error("failed to register for push notifications", zap.Error(err)) + return err + } + return nil } // RegisteredForPushNotifications returns whether we successfully registered with all the servers diff --git a/protocol/push_notification_test.go b/protocol/push_notification_test.go index a03d1d9fa..2aa92791a 100644 --- a/protocol/push_notification_test.go +++ b/protocol/push_notification_test.go @@ -801,3 +801,54 @@ func (s *MessengerPushNotificationSuite) TestActAsYourOwnPushNotificationServer( s.Require().NoError(bob2.Shutdown()) s.Require().NoError(alice.Shutdown()) } + +func (s *MessengerPushNotificationSuite) TestContactCode() { + + bob1 := s.m + + serverKey, err := crypto.GenerateKey() + s.Require().NoError(err) + server := s.newPushNotificationServer(s.shh, serverKey) + + alice := s.newMessenger(s.shh) + // start alice and enable sending push notifications + s.Require().NoError(alice.Start()) + s.Require().NoError(alice.EnableSendingPushNotifications()) + + // Register bob1 + err = bob1.AddPushNotificationsServer(context.Background(), &server.identity.PublicKey) + s.Require().NoError(err) + + err = bob1.RegisterForPushNotifications(context.Background(), bob1DeviceToken, testAPNTopic, protobuf.PushNotificationRegistration_APN_TOKEN) + + // Pull servers and check we registered + err = tt.RetryWithBackOff(func() error { + _, err = server.RetrieveAll() + if err != nil { + return err + } + _, err = bob1.RetrieveAll() + if err != nil { + return err + } + registered, err := bob1.RegisteredForPushNotifications() + if err != nil { + return err + } + if !registered { + return errors.New("not registered") + } + return nil + }) + // Make sure we receive it + s.Require().NoError(err) + + contactCodeAdvertisement, err := bob1.buildContactCodeAdvertisement() + s.Require().NoError(err) + s.Require().NotNil(contactCodeAdvertisement) + + s.Require().NoError(alice.pushNotificationClient.HandleContactCodeAdvertisement(&bob1.identity.PublicKey, *contactCodeAdvertisement)) + + s.Require().NoError(alice.Shutdown()) + s.Require().NoError(server.Shutdown()) +} diff --git a/protocol/pushnotificationclient/client.go b/protocol/pushnotificationclient/client.go index 9cd3a949a..cc5fc0e97 100644 --- a/protocol/pushnotificationclient/client.go +++ b/protocol/pushnotificationclient/client.go @@ -156,6 +156,9 @@ type Client struct { resendingLoopQuitChan chan struct{} quit chan struct{} + + // registrationSubscriptions is a list of chan of client subscribed to the registration event + registrationSubscriptions []chan struct{} } func New(persistence *Persistence, config *Config, processor *common.MessageProcessor) *Client { @@ -190,10 +193,28 @@ func (c *Client) Start() error { return nil } +func (c *Client) publishOnRegistrationSubscriptions() { + // Publish on channels, drop if buffer is full + for _, s := range c.registrationSubscriptions { + select { + case s <- struct{}{}: + default: + c.config.Logger.Warn("subscription channel full, dropping message") + } + } +} + +func (c *Client) quitRegistrationSubscriptions() { + for _, s := range c.registrationSubscriptions { + close(s) + } +} + func (c *Client) Stop() error { close(c.quit) c.stopRegistrationLoop() c.stopResendingLoop() + c.quitRegistrationSubscriptions() return nil } @@ -237,6 +258,12 @@ func (c *Client) Registered() (bool, error) { return true, nil } +func (c *Client) SubscribeToRegistrations() chan struct{} { + s := make(chan struct{}, 100) + c.registrationSubscriptions = append(c.registrationSubscriptions, s) + return s +} + func (c *Client) GetSentNotification(hashedPublicKey []byte, installationID string, messageID []byte) (*SentNotification, error) { return c.persistence.GetSentNotification(hashedPublicKey, installationID, messageID) } @@ -315,7 +342,13 @@ func (c *Client) HandlePushNotificationRegistrationResponse(publicKey *ecdsa.Pub server.Registered = true server.RegisteredAt = time.Now().Unix() - return c.persistence.UpsertServer(server) + err = c.persistence.UpsertServer(server) + if err != nil { + return err + } + c.publishOnRegistrationSubscriptions() + + return nil } // processQueryInfo takes info about push notifications and validates them @@ -397,11 +430,17 @@ func (c *Client) HandlePushNotificationQueryResponse(serverPublicKey *ecdsa.Publ // HandleContactCodeAdvertisement checks if there are any info and process them func (c *Client) HandleContactCodeAdvertisement(clientPublicKey *ecdsa.PublicKey, message protobuf.ContactCodeAdvertisement) error { + // nothing to do for our own pubkey + if common.IsPubKeyEqual(clientPublicKey, &c.config.Identity.PublicKey) { + return nil + } + c.config.Logger.Debug("received contact code advertisement", zap.Any("advertisement", message)) for _, info := range message.PushNotificationInfo { c.config.Logger.Debug("handling push notification query info") - serverPublicKey, err := crypto.UnmarshalPubkey(info.ServerPublicKey) + serverPublicKey, err := crypto.DecompressPubkey(info.ServerPublicKey) if err != nil { + c.config.Logger.Error("could not unmarshal server pubkey", zap.Binary("server-key", info.ServerPublicKey)) return err } err = c.processQueryInfo(clientPublicKey, serverPublicKey, info) @@ -410,7 +449,12 @@ func (c *Client) HandleContactCodeAdvertisement(clientPublicKey *ecdsa.PublicKey } } - return nil + // Save query so that we won't query again to early + // NOTE: this is not very accurate as we might fetch an historical message, + // prolonging the time that we fetch new info. + // Most of the times it should work fine, as if the info are stale they'd be + // fetched again because of an error response from the push notification server + return c.persistence.SavePushNotificationQuery(clientPublicKey, []byte(uuid.New().String())) } // HandlePushNotificationResponse should set the request as processed