Re-issue bundle on changed parameters

This commit is contained in:
Andrea Maria Piana 2020-08-18 17:07:48 +02:00
parent 4cf491ae38
commit ac6aff2486
No known key found for this signature in database
GPG Key ID: AA6CCA6DE0E06424
3 changed files with 168 additions and 28 deletions

View File

@ -279,6 +279,8 @@ func (m *Messenger) Start() error {
// Start push notification client // Start push notification client
if m.pushNotificationClient != nil { if m.pushNotificationClient != nil {
m.handlePushNotificationClientRegistrations(m.pushNotificationClient.SubscribeToRegistrations())
if err := m.pushNotificationClient.Start(); err != nil { if err := m.pushNotificationClient.Start(); err != nil {
return err return err
} }
@ -302,36 +304,49 @@ func (m *Messenger) Start() error {
return nil return nil
} }
func (m *Messenger) buildContactCodeAdvertisement() (*protobuf.ContactCodeAdvertisement, error) {
if m.pushNotificationClient == nil || !m.pushNotificationClient.Enabled() {
return nil, nil
}
m.logger.Debug("adding push notification info to contact code bundle")
info, err := m.pushNotificationClient.MyPushNotificationQueryInfo()
if err != nil {
return nil, err
}
if len(info) == 0 {
return nil, nil
}
return &protobuf.ContactCodeAdvertisement{
PushNotificationInfo: info,
}, nil
}
// handleSendContactCode sends a public message wrapped in the encryption // handleSendContactCode sends a public message wrapped in the encryption
// layer, which will propagate our bundle // layer, which will propagate our bundle
func (m *Messenger) handleSendContactCode() error { func (m *Messenger) handleSendContactCode() error {
var payload []byte var payload []byte
if m.pushNotificationClient != nil && m.pushNotificationClient.Enabled() { m.logger.Debug("sending contact code")
info, err := m.pushNotificationClient.MyPushNotificationQueryInfo() contactCodeAdvertisement, err := m.buildContactCodeAdvertisement()
if err != nil { if err != nil {
return err m.logger.Error("could not build contact code advertisement", zap.Error(err))
}
if len(info) != 0 {
contactCode := &protobuf.ContactCodeAdvertisement{
PushNotificationInfo: info,
} }
payload, err = proto.Marshal(contactCode) if contactCodeAdvertisement != nil {
payload, err = proto.Marshal(contactCodeAdvertisement)
if err != nil { if err != nil {
return err return err
} }
}
} }
contactCodeTopic := transport.ContactCodeTopic(&m.identity.PublicKey) contactCodeTopic := transport.ContactCodeTopic(&m.identity.PublicKey)
rawMessage := common.RawMessage{ rawMessage := common.RawMessage{
LocalChatID: contactCodeTopic, LocalChatID: contactCodeTopic,
MessageType: protobuf.ApplicationMetadataMessage_CONTACT_CODE_ADVERTISEMENT,
Payload: payload, Payload: payload,
} }
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel() defer cancel()
_, err := m.processor.SendPublic(ctx, contactCodeTopic, rawMessage) _, err = m.processor.SendPublic(ctx, contactCodeTopic, rawMessage)
if err != nil { if err != nil {
m.logger.Warn("failed to send a contact code", zap.Error(err)) m.logger.Warn("failed to send a contact code", zap.Error(err))
} }
@ -391,6 +406,22 @@ func (m *Messenger) handleEncryptionLayerSubscriptions(subscriptions *encryption
}() }()
} }
// handlePushNotificationClientRegistration handles registration events
func (m *Messenger) handlePushNotificationClientRegistrations(c chan struct{}) {
go func() {
for {
_, more := <-c
if !more {
return
}
if err := m.handleSendContactCode(); err != nil {
m.logger.Error("failed to publish contact code", zap.Error(err))
}
}
}()
}
// Init analyzes chats and contacts in order to setup filters // Init analyzes chats and contacts in order to setup filters
// which are responsible for retrieving messages. // which are responsible for retrieving messages.
func (m *Messenger) Init() error { func (m *Messenger) Init() error {
@ -1192,7 +1223,7 @@ func (m *Messenger) saveContact(contact *Contact) error {
} }
// We check if it should re-register with the push notification server // We check if it should re-register with the push notification server
shouldReregisterForPushNotifications := m.pushNotificationClient != nil && (m.isNewContact(contact) || m.removedContact(contact)) shouldReregisterForPushNotifications := (m.isNewContact(contact) || m.removedContact(contact))
err = m.persistence.SaveContact(contact, nil) err = m.persistence.SaveContact(contact, nil)
if err != nil { if err != nil {
@ -1203,17 +1234,22 @@ func (m *Messenger) saveContact(contact *Contact) error {
// Reregister only when data has changed // Reregister only when data has changed
if shouldReregisterForPushNotifications { if shouldReregisterForPushNotifications {
m.logger.Info("contact state changed, re-registering for push notification") return m.reregisterForPushNotifications()
contactIDs, mutedChatIDs := m.addedContactsAndMutedChatIDs()
err := m.pushNotificationClient.Reregister(contactIDs, mutedChatIDs)
if err != nil {
return err
}
} }
return nil return nil
} }
func (m *Messenger) reregisterForPushNotifications() error {
m.logger.Info("contact state changed, re-registering for push notification")
if m.pushNotificationClient == nil {
return nil
}
contactIDs, mutedChatIDs := m.addedContactsAndMutedChatIDs()
return m.pushNotificationClient.Reregister(contactIDs, mutedChatIDs)
}
func (m *Messenger) SaveContact(contact *Contact) error { func (m *Messenger) SaveContact(contact *Contact) error {
m.mutex.Lock() m.mutex.Lock()
defer m.mutex.Unlock() defer m.mutex.Unlock()
@ -1875,6 +1911,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
logger.Warn("failed to check message exists", zap.Error(err)) logger.Warn("failed to check message exists", zap.Error(err))
} }
if exists { if exists {
logger.Debug("messageExists", zap.String("messageID", messageID))
continue continue
} }
@ -2051,7 +2088,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
if m.pushNotificationClient == nil { if m.pushNotificationClient == nil {
continue continue
} }
logger.Debug("Handling PushNotificationRegistrationResponse") logger.Debug("Handling ContactCodeAdvertisement")
if err := m.pushNotificationClient.HandleContactCodeAdvertisement(publicKey, msg.ParsedMessage.Interface().(protobuf.ContactCodeAdvertisement)); err != nil { if err := m.pushNotificationClient.HandleContactCodeAdvertisement(publicKey, msg.ParsedMessage.Interface().(protobuf.ContactCodeAdvertisement)); err != nil {
logger.Warn("failed to handle ContactCodeAdvertisement", zap.Error(err)) logger.Warn("failed to handle ContactCodeAdvertisement", zap.Error(err))
} }
@ -2119,6 +2156,8 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
logger.Debug("message not handled", zap.Any("messageType", reflect.TypeOf(msg.ParsedMessage.Interface()))) logger.Debug("message not handled", zap.Any("messageType", reflect.TypeOf(msg.ParsedMessage.Interface())))
} }
} else {
logger.Debug("parsed message is nil")
} }
} }
} }
@ -2310,7 +2349,8 @@ func (m *Messenger) MuteChat(chatID string) error {
chat.Muted = true chat.Muted = true
m.allChats[chat.ID] = chat m.allChats[chat.ID] = chat
return nil
return m.reregisterForPushNotifications()
} }
// UnmuteChat signals to the messenger that we want to be notified // UnmuteChat signals to the messenger that we want to be notified
@ -2330,7 +2370,7 @@ func (m *Messenger) UnmuteChat(chatID string) error {
chat.Muted = false chat.Muted = false
m.allChats[chat.ID] = chat m.allChats[chat.ID] = chat
return nil return m.reregisterForPushNotifications()
} }
func (m *Messenger) UpdateMessageOutgoingStatus(id, newOutgoingStatus string) error { func (m *Messenger) UpdateMessageOutgoingStatus(id, newOutgoingStatus string) error {
@ -3213,7 +3253,12 @@ func (m *Messenger) RegisterForPushNotifications(ctx context.Context, deviceToke
defer m.mutex.Unlock() defer m.mutex.Unlock()
contactIDs, mutedChatIDs := m.addedContactsAndMutedChatIDs() contactIDs, mutedChatIDs := m.addedContactsAndMutedChatIDs()
return m.pushNotificationClient.Register(deviceToken, apnTopic, tokenType, contactIDs, mutedChatIDs) err := m.pushNotificationClient.Register(deviceToken, apnTopic, tokenType, contactIDs, mutedChatIDs)
if err != nil {
m.logger.Error("failed to register for push notifications", zap.Error(err))
return err
}
return nil
} }
// RegisteredForPushNotifications returns whether we successfully registered with all the servers // RegisteredForPushNotifications returns whether we successfully registered with all the servers

View File

@ -801,3 +801,54 @@ func (s *MessengerPushNotificationSuite) TestActAsYourOwnPushNotificationServer(
s.Require().NoError(bob2.Shutdown()) s.Require().NoError(bob2.Shutdown())
s.Require().NoError(alice.Shutdown()) s.Require().NoError(alice.Shutdown())
} }
func (s *MessengerPushNotificationSuite) TestContactCode() {
bob1 := s.m
serverKey, err := crypto.GenerateKey()
s.Require().NoError(err)
server := s.newPushNotificationServer(s.shh, serverKey)
alice := s.newMessenger(s.shh)
// start alice and enable sending push notifications
s.Require().NoError(alice.Start())
s.Require().NoError(alice.EnableSendingPushNotifications())
// Register bob1
err = bob1.AddPushNotificationsServer(context.Background(), &server.identity.PublicKey)
s.Require().NoError(err)
err = bob1.RegisterForPushNotifications(context.Background(), bob1DeviceToken, testAPNTopic, protobuf.PushNotificationRegistration_APN_TOKEN)
// Pull servers and check we registered
err = tt.RetryWithBackOff(func() error {
_, err = server.RetrieveAll()
if err != nil {
return err
}
_, err = bob1.RetrieveAll()
if err != nil {
return err
}
registered, err := bob1.RegisteredForPushNotifications()
if err != nil {
return err
}
if !registered {
return errors.New("not registered")
}
return nil
})
// Make sure we receive it
s.Require().NoError(err)
contactCodeAdvertisement, err := bob1.buildContactCodeAdvertisement()
s.Require().NoError(err)
s.Require().NotNil(contactCodeAdvertisement)
s.Require().NoError(alice.pushNotificationClient.HandleContactCodeAdvertisement(&bob1.identity.PublicKey, *contactCodeAdvertisement))
s.Require().NoError(alice.Shutdown())
s.Require().NoError(server.Shutdown())
}

View File

@ -156,6 +156,9 @@ type Client struct {
resendingLoopQuitChan chan struct{} resendingLoopQuitChan chan struct{}
quit chan struct{} quit chan struct{}
// registrationSubscriptions is a list of chan of client subscribed to the registration event
registrationSubscriptions []chan struct{}
} }
func New(persistence *Persistence, config *Config, processor *common.MessageProcessor) *Client { func New(persistence *Persistence, config *Config, processor *common.MessageProcessor) *Client {
@ -190,10 +193,28 @@ func (c *Client) Start() error {
return nil return nil
} }
func (c *Client) publishOnRegistrationSubscriptions() {
// Publish on channels, drop if buffer is full
for _, s := range c.registrationSubscriptions {
select {
case s <- struct{}{}:
default:
c.config.Logger.Warn("subscription channel full, dropping message")
}
}
}
func (c *Client) quitRegistrationSubscriptions() {
for _, s := range c.registrationSubscriptions {
close(s)
}
}
func (c *Client) Stop() error { func (c *Client) Stop() error {
close(c.quit) close(c.quit)
c.stopRegistrationLoop() c.stopRegistrationLoop()
c.stopResendingLoop() c.stopResendingLoop()
c.quitRegistrationSubscriptions()
return nil return nil
} }
@ -237,6 +258,12 @@ func (c *Client) Registered() (bool, error) {
return true, nil return true, nil
} }
func (c *Client) SubscribeToRegistrations() chan struct{} {
s := make(chan struct{}, 100)
c.registrationSubscriptions = append(c.registrationSubscriptions, s)
return s
}
func (c *Client) GetSentNotification(hashedPublicKey []byte, installationID string, messageID []byte) (*SentNotification, error) { func (c *Client) GetSentNotification(hashedPublicKey []byte, installationID string, messageID []byte) (*SentNotification, error) {
return c.persistence.GetSentNotification(hashedPublicKey, installationID, messageID) return c.persistence.GetSentNotification(hashedPublicKey, installationID, messageID)
} }
@ -315,7 +342,13 @@ func (c *Client) HandlePushNotificationRegistrationResponse(publicKey *ecdsa.Pub
server.Registered = true server.Registered = true
server.RegisteredAt = time.Now().Unix() server.RegisteredAt = time.Now().Unix()
return c.persistence.UpsertServer(server) err = c.persistence.UpsertServer(server)
if err != nil {
return err
}
c.publishOnRegistrationSubscriptions()
return nil
} }
// processQueryInfo takes info about push notifications and validates them // processQueryInfo takes info about push notifications and validates them
@ -397,11 +430,17 @@ func (c *Client) HandlePushNotificationQueryResponse(serverPublicKey *ecdsa.Publ
// HandleContactCodeAdvertisement checks if there are any info and process them // HandleContactCodeAdvertisement checks if there are any info and process them
func (c *Client) HandleContactCodeAdvertisement(clientPublicKey *ecdsa.PublicKey, message protobuf.ContactCodeAdvertisement) error { func (c *Client) HandleContactCodeAdvertisement(clientPublicKey *ecdsa.PublicKey, message protobuf.ContactCodeAdvertisement) error {
// nothing to do for our own pubkey
if common.IsPubKeyEqual(clientPublicKey, &c.config.Identity.PublicKey) {
return nil
}
c.config.Logger.Debug("received contact code advertisement", zap.Any("advertisement", message)) c.config.Logger.Debug("received contact code advertisement", zap.Any("advertisement", message))
for _, info := range message.PushNotificationInfo { for _, info := range message.PushNotificationInfo {
c.config.Logger.Debug("handling push notification query info") c.config.Logger.Debug("handling push notification query info")
serverPublicKey, err := crypto.UnmarshalPubkey(info.ServerPublicKey) serverPublicKey, err := crypto.DecompressPubkey(info.ServerPublicKey)
if err != nil { if err != nil {
c.config.Logger.Error("could not unmarshal server pubkey", zap.Binary("server-key", info.ServerPublicKey))
return err return err
} }
err = c.processQueryInfo(clientPublicKey, serverPublicKey, info) err = c.processQueryInfo(clientPublicKey, serverPublicKey, info)
@ -410,7 +449,12 @@ func (c *Client) HandleContactCodeAdvertisement(clientPublicKey *ecdsa.PublicKey
} }
} }
return nil // Save query so that we won't query again to early
// NOTE: this is not very accurate as we might fetch an historical message,
// prolonging the time that we fetch new info.
// Most of the times it should work fine, as if the info are stale they'd be
// fetched again because of an error response from the push notification server
return c.persistence.SavePushNotificationQuery(clientPublicKey, []byte(uuid.New().String()))
} }
// HandlePushNotificationResponse should set the request as processed // HandlePushNotificationResponse should set the request as processed