Support for basic push notification for mentions

This commit is contained in:
Andrea Maria Piana 2020-09-02 16:11:16 +02:00
parent 00c5b60d7f
commit 8ee3c75510
No known key found for this signature in database
GPG Key ID: AA6CCA6DE0E06424
10 changed files with 479 additions and 135 deletions

View File

@ -325,11 +325,13 @@ func (p *MessageProcessor) SendPublic(
}
var newMessage *types.NewMessage
messageSpec, err := p.protocol.BuildPublicMessage(p.identity, wrappedMessage)
if err != nil {
return nil, errors.Wrap(err, "failed to wrap a public message in the encryption layer")
}
if !rawMessage.SkipEncryption {
messageSpec, err := p.protocol.BuildPublicMessage(p.identity, wrappedMessage)
if err != nil {
return nil, errors.Wrap(err, "failed to wrap a public message in the encryption layer")
}
newMessage, err = MessageSpecToWhisper(messageSpec)
if err != nil {
return nil, err
@ -354,6 +356,13 @@ func (p *MessageProcessor) SendPublic(
return nil, err
}
sentMessage := &SentMessage{
Spec: messageSpec,
MessageIDs: [][]byte{messageID},
}
p.notifyOnSentMessage(sentMessage)
p.transport.Track([][]byte{messageID}, hash, newMessage)
return messageID, nil

View File

@ -227,7 +227,7 @@ func NewMessenger(
pushNotificationClientConfig.Logger = logger
pushNotificationClientConfig.InstallationID = installationID
pushNotificationClient := pushnotificationclient.New(pushNotificationClientPersistence, pushNotificationClientConfig, processor)
pushNotificationClient := pushnotificationclient.New(pushNotificationClientPersistence, pushNotificationClientConfig, processor, &sqlitePersistence{db: database})
handler := newMessageHandler(identity, logger, &sqlitePersistence{db: database})
@ -1490,8 +1490,7 @@ func (m *Messenger) reregisterForPushNotifications() error {
return nil
}
contactIDs, mutedChatIDs := m.addedContactsAndMutedChatIDs()
return m.pushNotificationClient.Reregister(contactIDs, mutedChatIDs)
return m.pushNotificationClient.Reregister(m.pushNotificationOptions())
}
func (m *Messenger) SaveContact(contact *Contact) error {
@ -1758,7 +1757,7 @@ func (m *Messenger) SendChatMessage(ctx context.Context, message *common.Message
id, err := m.dispatchMessage(ctx, common.RawMessage{
LocalChatID: chat.ID,
SendPushNotification: m.featureFlags.PushNotifications && !chat.Public(),
SendPushNotification: m.featureFlags.PushNotifications,
Payload: encodedMessage,
MessageType: protobuf.ApplicationMetadataMessage_CHAT_MESSAGE,
ResendAutomatically: true,
@ -3488,7 +3487,7 @@ func (m *Messenger) EnableSendingPushNotifications() error {
return nil
}
func (m *Messenger) addedContactsAndMutedChatIDs() ([]*ecdsa.PublicKey, []string) {
func (m *Messenger) pushNotificationOptions() *pushnotificationclient.RegistrationOptions {
var contactIDs []*ecdsa.PublicKey
var mutedChatIDs []string
@ -3510,7 +3509,10 @@ func (m *Messenger) addedContactsAndMutedChatIDs() ([]*ecdsa.PublicKey, []string
}
}
return contactIDs, mutedChatIDs
return &pushnotificationclient.RegistrationOptions{
ContactIDs: contactIDs,
MutedChatIDs: mutedChatIDs,
}
}
// RegisterForPushNotification register deviceToken with any push notification server enabled
@ -3521,8 +3523,7 @@ func (m *Messenger) RegisterForPushNotifications(ctx context.Context, deviceToke
m.mutex.Lock()
defer m.mutex.Unlock()
contactIDs, mutedChatIDs := m.addedContactsAndMutedChatIDs()
err := m.pushNotificationClient.Register(deviceToken, apnTopic, tokenType, contactIDs, mutedChatIDs)
err := m.pushNotificationClient.Register(deviceToken, apnTopic, tokenType, m.pushNotificationOptions())
if err != nil {
m.logger.Error("failed to register for push notifications", zap.Error(err))
return err
@ -3546,8 +3547,7 @@ func (m *Messenger) EnablePushNotificationsFromContactsOnly() error {
m.mutex.Lock()
defer m.mutex.Unlock()
contactIDs, mutedChatIDs := m.addedContactsAndMutedChatIDs()
return m.pushNotificationClient.EnablePushNotificationsFromContactsOnly(contactIDs, mutedChatIDs)
return m.pushNotificationClient.EnablePushNotificationsFromContactsOnly(m.pushNotificationOptions())
}
// DisablePushNotificationsFromContactsOnly is used to indicate that we want to received push notifications from anyone
@ -3558,8 +3558,7 @@ func (m *Messenger) DisablePushNotificationsFromContactsOnly() error {
m.mutex.Lock()
defer m.mutex.Unlock()
contactIDs, mutedChatIDs := m.addedContactsAndMutedChatIDs()
return m.pushNotificationClient.DisablePushNotificationsFromContactsOnly(contactIDs, mutedChatIDs)
return m.pushNotificationClient.DisablePushNotificationsFromContactsOnly(m.pushNotificationOptions())
}
// GetPushNotificationsServers returns the servers used for push notifications

View File

@ -853,3 +853,133 @@ func (s *MessengerPushNotificationSuite) TestContactCode() {
s.Require().NoError(alice.Shutdown())
s.Require().NoError(server.Shutdown())
}
func (s *MessengerPushNotificationSuite) TestReceivePushNotificationMention() {
bob := s.m
serverKey, err := crypto.GenerateKey()
s.Require().NoError(err)
server := s.newPushNotificationServer(s.shh, serverKey)
alice := s.newMessenger(s.shh)
// start alice and enable sending push notifications
s.Require().NoError(alice.Start())
s.Require().NoError(alice.EnableSendingPushNotifications())
bobInstallationIDs := []string{bob.installationID}
// Create public chat and join for both alice and bob
chat := CreatePublicChat("status", s.m.transport)
err = bob.SaveChat(&chat)
s.Require().NoError(err)
err = bob.Join(chat)
s.Require().NoError(err)
err = alice.SaveChat(&chat)
s.Require().NoError(err)
err = alice.Join(chat)
s.Require().NoError(err)
// Register bob
err = bob.AddPushNotificationsServer(context.Background(), &server.identity.PublicKey, pushnotificationclient.ServerTypeCustom)
s.Require().NoError(err)
err = bob.RegisterForPushNotifications(context.Background(), bob1DeviceToken, testAPNTopic, protobuf.PushNotificationRegistration_APN_TOKEN)
// Pull servers and check we registered
err = tt.RetryWithBackOff(func() error {
_, err = server.RetrieveAll()
if err != nil {
return err
}
_, err = bob.RetrieveAll()
if err != nil {
return err
}
registered, err := bob.RegisteredForPushNotifications()
if err != nil {
return err
}
if !registered {
return errors.New("not registered")
}
return nil
})
// Make sure we receive it
s.Require().NoError(err)
bobServers, err := bob.GetPushNotificationsServers()
s.Require().NoError(err)
inputMessage := buildTestMessage(chat)
// message contains a mention
inputMessage.Text = "Hey @" + types.EncodeHex(crypto.FromECDSAPub(&bob.identity.PublicKey))
response, err := alice.SendChatMessage(context.Background(), inputMessage)
s.Require().NoError(err)
messageIDString := response.Messages[0].ID
messageID, err := hex.DecodeString(messageIDString[2:])
s.Require().NoError(err)
var bobInfo []*pushnotificationclient.PushNotificationInfo
err = tt.RetryWithBackOff(func() error {
_, err = server.RetrieveAll()
if err != nil {
return err
}
_, err = alice.RetrieveAll()
if err != nil {
return err
}
bobInfo, err = alice.pushNotificationClient.GetPushNotificationInfo(&bob.identity.PublicKey, bobInstallationIDs)
if err != nil {
return err
}
// Check we have replies for bob
if len(bobInfo) != 1 {
return errors.New("info not fetched")
}
return nil
})
s.Require().NoError(err)
s.Require().NotEmpty(bobInfo)
s.Require().Equal(bob.installationID, bobInfo[0].InstallationID)
s.Require().Equal(bobServers[0].AccessToken, bobInfo[0].AccessToken)
s.Require().Equal(&bob.identity.PublicKey, bobInfo[0].PublicKey)
retrievedNotificationInfo, err := alice.pushNotificationClient.GetPushNotificationInfo(&bob.identity.PublicKey, bobInstallationIDs)
s.Require().NoError(err)
s.Require().NotNil(retrievedNotificationInfo)
s.Require().Len(retrievedNotificationInfo, 1)
var sentNotification *pushnotificationclient.SentNotification
err = tt.RetryWithBackOff(func() error {
_, err = server.RetrieveAll()
if err != nil {
return err
}
_, err = alice.RetrieveAll()
if err != nil {
return err
}
sentNotification, err = alice.pushNotificationClient.GetSentNotification(common.HashPublicKey(&bob.identity.PublicKey), bob.installationID, messageID)
if err != nil {
return err
}
if sentNotification == nil {
return errors.New("sent notification not found")
}
if !sentNotification.Success {
return errors.New("sent notification not successul")
}
return nil
})
s.Require().NoError(err)
s.Require().NoError(alice.Shutdown())
s.Require().NoError(server.Shutdown())
}

View File

@ -7,6 +7,7 @@ import (
"crypto/cipher"
"crypto/ecdsa"
"crypto/rand"
"database/sql"
"encoding/hex"
"encoding/json"
"errors"
@ -47,6 +48,8 @@ import (
const encryptedPayloadKeyLength = 16
const accessTokenKeyLength = 16
const staleQueryTimeInSeconds = 86400
const mentionInstallationID = "mention"
const oneToOneChatIDLength = 65
// maxRegistrationRetries is the maximum number of attempts we do before giving up registering with a server
const maxRegistrationRetries int64 = 12
@ -103,13 +106,21 @@ type PushNotificationInfo struct {
}
type SentNotification struct {
PublicKey *ecdsa.PublicKey
InstallationID string
LastTriedAt int64
RetryCount int64
MessageID []byte
Success bool
Error protobuf.PushNotificationReport_ErrorType
PublicKey *ecdsa.PublicKey
InstallationID string
LastTriedAt int64
RetryCount int64
MessageID []byte
ChatID string
NotificationType protobuf.PushNotification_PushNotificationType
Success bool
Error protobuf.PushNotificationReport_ErrorType
}
type RegistrationOptions struct {
PublicChatIDs []string
MutedChatIDs []string
ContactIDs []*ecdsa.PublicKey
}
func (s *SentNotification) HashedPublicKey() []byte {
@ -128,6 +139,9 @@ type Config struct {
// only from contacts
AllowFromContactsOnly bool
// BlockMentions indicates whether we should not receive notification for mentions
BlockMentions bool
// InstallationID is the installation-id for this device
InstallationID string
@ -138,8 +152,13 @@ type Config struct {
DefaultServers []*ecdsa.PublicKey
}
type MessagePersistence interface {
MessageByID(string) (*common.Message, error)
}
type Client struct {
persistence *Persistence
persistence *Persistence
messagePersistence MessagePersistence
config *Config
@ -176,17 +195,14 @@ type Client struct {
registrationSubscriptions []chan struct{}
}
type MessagePersistence interface {
MessageByID(string) (*common.Message, error)
}
func New(persistence *Persistence, config *Config, processor *common.MessageProcessor) *Client {
func New(persistence *Persistence, config *Config, processor *common.MessageProcessor, messagePersistence MessagePersistence) *Client {
return &Client{
quit: make(chan struct{}),
config: config,
messageProcessor: processor,
persistence: persistence,
reader: rand.Reader,
quit: make(chan struct{}),
config: config,
messageProcessor: processor,
messagePersistence: messagePersistence,
persistence: persistence,
reader: rand.Reader,
}
}
@ -202,8 +218,7 @@ func (c *Client) Start() error {
return err
}
c.subscribeForSentMessages()
c.subscribeForScheduledMessages()
c.subscribeForMessageEvents()
// We start even if push notifications are disabled, as we might
// actually be sending an unregister message
@ -303,7 +318,7 @@ func (c *Client) GetServers() ([]*PushNotificationServer, error) {
return c.persistence.GetServers()
}
func (c *Client) Reregister(contactIDs []*ecdsa.PublicKey, mutedChatIDs []string) error {
func (c *Client) Reregister(options *RegistrationOptions) error {
c.config.Logger.Debug("re-registering")
if len(c.deviceToken) == 0 {
c.config.Logger.Info("no device token, not registering")
@ -315,7 +330,7 @@ func (c *Client) Reregister(contactIDs []*ecdsa.PublicKey, mutedChatIDs []string
return nil
}
return c.Register(c.deviceToken, c.apnTopic, c.tokenType, contactIDs, mutedChatIDs)
return c.Register(c.deviceToken, c.apnTopic, c.tokenType, options)
}
// pickDefaultServesr picks n servers at random
@ -337,7 +352,7 @@ func (c *Client) pickDefaultServers(servers []*ecdsa.PublicKey) []*ecdsa.PublicK
}
// Register registers with all the servers
func (c *Client) Register(deviceToken, apnTopic string, tokenType protobuf.PushNotificationRegistration_TokenType, contactIDs []*ecdsa.PublicKey, mutedChatIDs []string) error {
func (c *Client) Register(deviceToken, apnTopic string, tokenType protobuf.PushNotificationRegistration_TokenType, options *RegistrationOptions) error {
// stop registration loop
c.stopRegistrationLoop()
@ -368,12 +383,12 @@ func (c *Client) Register(deviceToken, apnTopic string, tokenType protobuf.PushN
c.apnTopic = apnTopic
c.tokenType = tokenType
registration, err := c.buildPushNotificationRegistrationMessage(contactIDs, mutedChatIDs)
registration, err := c.buildPushNotificationRegistrationMessage(options)
if err != nil {
return err
}
err = c.saveLastPushNotificationRegistration(registration, contactIDs)
err = c.saveLastPushNotificationRegistration(registration, options.ContactIDs)
if err != nil {
return err
}
@ -592,22 +607,22 @@ func (c *Client) DisableSending() {
c.config.SendEnabled = false
}
func (c *Client) EnablePushNotificationsFromContactsOnly(contactIDs []*ecdsa.PublicKey, mutedChatIDs []string) error {
func (c *Client) EnablePushNotificationsFromContactsOnly(options *RegistrationOptions) error {
c.config.Logger.Debug("enabling push notification from contacts only")
c.config.AllowFromContactsOnly = true
if c.lastPushNotificationRegistration != nil && c.config.RemoteNotificationsEnabled {
c.config.Logger.Debug("re-registering after enabling push notifications from contacts only")
return c.Register(c.deviceToken, c.apnTopic, c.tokenType, contactIDs, mutedChatIDs)
return c.Register(c.deviceToken, c.apnTopic, c.tokenType, options)
}
return nil
}
func (c *Client) DisablePushNotificationsFromContactsOnly(contactIDs []*ecdsa.PublicKey, mutedChatIDs []string) error {
func (c *Client) DisablePushNotificationsFromContactsOnly(options *RegistrationOptions) error {
c.config.Logger.Debug("disabling push notification from contacts only")
c.config.AllowFromContactsOnly = false
if c.lastPushNotificationRegistration != nil && c.config.RemoteNotificationsEnabled {
c.config.Logger.Debug("re-registering after disabling push notifications from contacts only")
return c.Register(c.deviceToken, c.apnTopic, c.tokenType, contactIDs, mutedChatIDs)
return c.Register(c.deviceToken, c.apnTopic, c.tokenType, options)
}
return nil
}
@ -648,37 +663,21 @@ func (c *Client) generateSharedKey(publicKey *ecdsa.PublicKey) ([]byte, error) {
)
}
// subscribeForSentMessages subscribes for newly sent messages so we can check if we need to send a push notification
func (c *Client) subscribeForSentMessages() {
// subscribeForMessageEvents subscribes for newly sent/scheduled messages so we can check if we need to send a push notification
func (c *Client) subscribeForMessageEvents() {
go func() {
c.config.Logger.Debug("subscribing for sent messages")
subscription := c.messageProcessor.SubscribeToSentMessages()
c.config.Logger.Debug("subscribing for message events")
sentMessagesSubscription := c.messageProcessor.SubscribeToSentMessages()
scheduledMessagesSubscription := c.messageProcessor.SubscribeToScheduledMessages()
for {
select {
case m, more := <-subscription:
if !more {
c.config.Logger.Debug("no more sent messages, quitting")
return
}
c.config.Logger.Debug("handling message sent")
if err := c.handleMessageSent(m); err != nil {
c.config.Logger.Error("failed to handle message", zap.Error(err))
}
case <-c.quit:
return
}
}
}()
}
// subscribeForScheduledMessages subscribes for messages scheduler for dispatch
func (c *Client) subscribeForScheduledMessages() {
go func() {
c.config.Logger.Debug("subscribing for scheduled messages")
subscription := c.messageProcessor.SubscribeToScheduledMessages()
for {
select {
case m, more := <-subscription:
// order is important, since both are asynchronous, we want to process
// first scheduled messages, and after sent messages, otherwise we might
// have some race conditions.
// This does not completely rules them out, but reduced the window
// where it might happen, a single channel should be used
// if this actually happens.
case m, more := <-scheduledMessagesSubscription:
if !more {
c.config.Logger.Debug("no more scheduled messages, quitting")
return
@ -687,6 +686,16 @@ func (c *Client) subscribeForScheduledMessages() {
if err := c.handleMessageScheduled(m); err != nil {
c.config.Logger.Error("failed to handle message", zap.Error(err))
}
case m, more := <-sentMessagesSubscription:
if !more {
c.config.Logger.Debug("no more sent messages, quitting")
return
}
c.config.Logger.Debug("handling message sent")
if err := c.handleMessageSent(m); err != nil {
c.config.Logger.Error("failed to handle message", zap.Error(err))
}
case <-c.quit:
return
}
@ -789,10 +798,9 @@ func (c *Client) queryNotificationInfo(publicKey *ecdsa.PublicKey, force bool) e
return nil
}
// handleMessageSent is called every time a message is sent. It will check if
// we need to notify on the message, and if so it will try to dispatch a push notification
// messages might be batched, if coming from datasync for example.
// handleMessageSent is called every time a message is sent
func (c *Client) handleMessageSent(sentMessage *common.SentMessage) error {
c.config.Logger.Debug("sent messages", zap.Any("messageIDs", sentMessage.MessageIDs))
// Ignore if we are not sending notifications
@ -801,6 +809,109 @@ func (c *Client) handleMessageSent(sentMessage *common.SentMessage) error {
return nil
}
if sentMessage.PublicKey == nil {
return c.handlePublicMessageSent(sentMessage)
}
return c.handleDirectMessageSent(sentMessage)
}
// saving to the database might happen after we fetch the message, so we retry
// for a reasonable amount of time before giving up
func (c *Client) getMessage(messageID string) (*common.Message, error) {
retries := 0
for retries < 10 {
message, err := c.messagePersistence.MessageByID(messageID)
if err == sql.ErrNoRows {
retries++
time.Sleep(300 * time.Millisecond)
continue
} else if err != nil {
return nil, err
}
return message, nil
}
return nil, sql.ErrNoRows
}
// handlePublicMessageSent handles public messages, we notify only on mentions
func (c *Client) handlePublicMessageSent(sentMessage *common.SentMessage) error {
// We always expect a single message, as we never batch them
if len(sentMessage.MessageIDs) != 1 {
return errors.New("batched public messages not handled")
}
messageID := sentMessage.MessageIDs[0]
c.config.Logger.Debug("handling public messages", zap.Binary("messageID", messageID))
tracked, err := c.persistence.TrackedMessage(messageID)
if err != nil {
return err
}
if !tracked {
c.config.Logger.Debug("messageID not tracked, nothing to do", zap.Binary("messageID", messageID))
}
c.config.Logger.Debug("messageID tracked", zap.Binary("messageID", messageID))
message, err := c.getMessage(types.EncodeHex(messageID))
if err != nil {
c.config.Logger.Error("could not retrieve message", zap.Error(err))
}
// This might happen if the user deleted their messages for example
if message == nil {
c.config.Logger.Warn("message not retrieved")
return nil
}
c.config.Logger.Debug("message found", zap.Binary("messageID", messageID))
for _, pkString := range message.Mentions {
c.config.Logger.Debug("handling mention", zap.String("publickey", pkString))
pubkeyBytes, err := types.DecodeHex(pkString)
if err != nil {
return err
}
publicKey, err := crypto.UnmarshalPubkey(pubkeyBytes)
if err != nil {
return err
}
// we use a synthetic installationID for mentions, as all devices need to be notified
shouldNotify, err := c.shouldNotifyOn(publicKey, mentionInstallationID, messageID)
if err != nil {
return err
}
c.config.Logger.Debug("should no mention", zap.Any("publickey", shouldNotify))
// we send the notifications and return the info of the devices notified
infos, err := c.sendNotification(publicKey, nil, messageID, message.LocalChatID, protobuf.PushNotification_MENTION)
if err != nil {
return err
}
// mark message as sent so we don't notify again
for _, i := range infos {
c.config.Logger.Debug("marking as sent ", zap.Binary("mid", messageID), zap.String("id", i.InstallationID))
if err := c.notifiedOn(publicKey, i.InstallationID, messageID, message.LocalChatID, protobuf.PushNotification_MESSAGE); err != nil {
return err
}
}
}
return nil
}
// handleDirectMessageSent handles one to ones and private group chat messages
// It will check if we need to notify on the message, and if so it will try to
// dispatch a push notification messages might be batched, if coming
// from datasync for example.
func (c *Client) handleDirectMessageSent(sentMessage *common.SentMessage) error {
c.config.Logger.Debug("handling direct messages", zap.Any("messageIDs", sentMessage.MessageIDs))
publicKey := sentMessage.PublicKey
// Collect the messageIDs we want to notify on
@ -853,8 +964,27 @@ func (c *Client) handleMessageSent(sentMessage *common.SentMessage) error {
c.config.Logger.Debug("actionable messages", zap.Any("message-ids", trackedMessageIDs), zap.Any("installation-ids", installationIDs))
// Get message to check chatID. Again we use the first message for simplicity, but we should send one for each chatID. Messages though are very rarely batched.
message, err := c.getMessage(types.EncodeHex(trackedMessageIDs[0]))
if err != nil {
return err
}
// This is not the prettiest.
// because chatIDs are asymettric, we need to check if it's a one-to-one message or a group chat message.
// to do that we fingerprint the chatID.
// If it's a public key, we use our own public key as chatID, which correspond to the chatID used by the other peer
// otherwise we use the group chat ID
var chatID string
if len(chatID) == oneToOneChatIDLength {
chatID = types.EncodeHex(crypto.FromECDSAPub(&c.config.Identity.PublicKey))
} else {
// this is a group chat
chatID = message.ChatId
}
// we send the notifications and return the info of the devices notified
infos, err := c.sendNotification(publicKey, installationIDs, trackedMessageIDs[0])
infos, err := c.sendNotification(publicKey, installationIDs, trackedMessageIDs[0], chatID, protobuf.PushNotification_MESSAGE)
if err != nil {
return err
}
@ -864,7 +994,7 @@ func (c *Client) handleMessageSent(sentMessage *common.SentMessage) error {
for _, messageID := range trackedMessageIDs {
c.config.Logger.Debug("marking as sent ", zap.Binary("mid", messageID), zap.String("id", i.InstallationID))
if err := c.notifiedOn(publicKey, i.InstallationID, messageID); err != nil {
if err := c.notifiedOn(publicKey, i.InstallationID, messageID, chatID, protobuf.PushNotification_MESSAGE); err != nil {
return err
}
@ -894,17 +1024,19 @@ func (c *Client) shouldNotifyOn(publicKey *ecdsa.PublicKey, installationID strin
return c.persistence.ShouldSendNotificationFor(publicKey, installationID, messageID)
}
// notifiedOn marks a combination of publickey/installationid/messageID as notified
func (c *Client) notifiedOn(publicKey *ecdsa.PublicKey, installationID string, messageID []byte) error {
// notifiedOn marks a combination of publickey/installationid/messageID/chatID/type as notified
func (c *Client) notifiedOn(publicKey *ecdsa.PublicKey, installationID string, messageID []byte, chatID string, notificationType protobuf.PushNotification_PushNotificationType) error {
return c.persistence.UpsertSentNotification(&SentNotification{
PublicKey: publicKey,
LastTriedAt: time.Now().Unix(),
InstallationID: installationID,
MessageID: messageID,
PublicKey: publicKey,
LastTriedAt: time.Now().Unix(),
InstallationID: installationID,
MessageID: messageID,
ChatID: chatID,
NotificationType: notificationType,
})
}
func (c *Client) mutedChatIDsHashes(chatIDs []string) [][]byte {
func (c *Client) chatIDsHashes(chatIDs []string) [][]byte {
var mutedChatListHashes [][]byte
for _, chatID := range chatIDs {
@ -983,26 +1115,27 @@ func (c *Client) getVersion() uint64 {
return c.lastPushNotificationRegistration.Version + 1
}
func (c *Client) buildPushNotificationRegistrationMessage(contactIDs []*ecdsa.PublicKey, mutedChatIDs []string) (*protobuf.PushNotificationRegistration, error) {
token := c.getToken(contactIDs)
allowedKeyList, err := c.allowedKeyList([]byte(token), contactIDs)
func (c *Client) buildPushNotificationRegistrationMessage(options *RegistrationOptions) (*protobuf.PushNotificationRegistration, error) {
token := c.getToken(options.ContactIDs)
allowedKeyList, err := c.allowedKeyList([]byte(token), options.ContactIDs)
if err != nil {
return nil, err
}
options := &protobuf.PushNotificationRegistration{
AccessToken: token,
TokenType: c.tokenType,
ApnTopic: c.apnTopic,
Version: c.getVersion(),
InstallationId: c.config.InstallationID,
DeviceToken: c.deviceToken,
AllowFromContactsOnly: c.config.AllowFromContactsOnly,
Enabled: c.config.RemoteNotificationsEnabled,
BlockedChatList: c.mutedChatIDsHashes(mutedChatIDs),
AllowedKeyList: allowedKeyList,
}
return options, nil
return &protobuf.PushNotificationRegistration{
AccessToken: token,
TokenType: c.tokenType,
ApnTopic: c.apnTopic,
Version: c.getVersion(),
InstallationId: c.config.InstallationID,
DeviceToken: c.deviceToken,
AllowFromContactsOnly: c.config.AllowFromContactsOnly,
Enabled: c.config.RemoteNotificationsEnabled,
BlockedChatList: c.chatIDsHashes(options.MutedChatIDs),
BlockMentions: c.config.BlockMentions,
AllowedMentionsChatList: c.chatIDsHashes(options.PublicChatIDs),
AllowedKeyList: allowedKeyList,
}, nil
}
func (c *Client) buildPushNotificationUnregisterMessage() *protobuf.PushNotificationRegistration {
@ -1132,7 +1265,8 @@ func (c *Client) registerWithServer(registration *protobuf.PushNotificationRegis
// sendNotification sends an actual notification to the push notification server.
// the notification is sent using an ephemeral key to shield the real identity of the sender
func (c *Client) sendNotification(publicKey *ecdsa.PublicKey, installationIDs []string, messageID []byte) ([]*PushNotificationInfo, error) {
func (c *Client) sendNotification(publicKey *ecdsa.PublicKey, installationIDs []string, messageID []byte, chatID string, notificationType protobuf.PushNotification_PushNotificationType) ([]*PushNotificationInfo, error) {
// get latest push notification infos
err := c.queryNotificationInfo(publicKey, false)
if err != nil {
@ -1191,12 +1325,11 @@ func (c *Client) sendNotification(publicKey *ecdsa.PublicKey, installationIDs []
for _, infos := range actionableInfos {
var pushNotifications []*protobuf.PushNotification
for _, i := range infos {
// TODO: Add group chat ChatID
pushNotifications = append(pushNotifications, &protobuf.PushNotification{
Type: protobuf.PushNotification_MESSAGE,
Type: notificationType,
// For now we set the ChatID to our own identity key, this will work fine for blocked users
// and muted 1-to-1 chats, but not for group chats.
ChatId: common.Shake256([]byte(types.EncodeHex(crypto.FromECDSAPub(&c.config.Identity.PublicKey)))),
ChatId: common.Shake256([]byte(chatID)),
AccessToken: i.AccessToken,
PublicKey: common.HashPublicKey(publicKey),
InstallationId: i.InstallationID,
@ -1255,7 +1388,7 @@ func (c *Client) resendNotification(pn *SentNotification) error {
return err
}
_, err = c.sendNotification(pn.PublicKey, []string{pn.InstallationID}, pn.MessageID)
_, err = c.sendNotification(pn.PublicKey, []string{pn.InstallationID}, pn.MessageID, pn.ChatID, pn.NotificationType)
return err
}
@ -1308,6 +1441,9 @@ func (c *Client) resendingLoop() error {
// registrationLoop is a loop that is running when we need to register with a push notification server, it only runs when needed, it will quit if no work is necessary.
func (c *Client) registrationLoop() error {
if c.lastPushNotificationRegistration == nil {
return nil
}
for {
c.config.Logger.Debug("running registration loop")
servers, err := c.persistence.GetServers()

View File

@ -58,7 +58,7 @@ func (s *ClientSuite) SetupTest() {
InstallationID: s.installationID,
}
s.client = New(s.persistence, config, nil)
s.client = New(s.persistence, config, nil, nil)
}
func (s *ClientSuite) TestBuildPushNotificationRegisterMessage() {
@ -74,6 +74,11 @@ func (s *ClientSuite) TestBuildPushNotificationRegisterMessage() {
s.Require().NoError(err)
contactIDs := []*ecdsa.PublicKey{&contactKey.PublicKey}
options := &RegistrationOptions{
ContactIDs: contactIDs,
MutedChatIDs: mutedChatList,
}
// Set random generator for uuid
var seed int64 = 1
uuid.SetRand(rand.New(rand.NewSource(seed)))
@ -88,7 +93,7 @@ func (s *ClientSuite) TestBuildPushNotificationRegisterMessage() {
// Set reader
s.client.reader = bytes.NewReader([]byte(expectedUUID))
options := &protobuf.PushNotificationRegistration{
registration := &protobuf.PushNotificationRegistration{
Version: 1,
AccessToken: expectedUUID,
DeviceToken: testDeviceToken,
@ -97,24 +102,36 @@ func (s *ClientSuite) TestBuildPushNotificationRegisterMessage() {
BlockedChatList: mutedChatListHashes,
}
actualMessage, err := s.client.buildPushNotificationRegistrationMessage(contactIDs, mutedChatList)
actualMessage, err := s.client.buildPushNotificationRegistrationMessage(options)
s.Require().NoError(err)
s.Require().Equal(options, actualMessage)
s.Require().Equal(registration, actualMessage)
}
func (s *ClientSuite) TestBuildPushNotificationRegisterMessageAllowFromContactsOnly() {
mutedChatList := []string{"a", "b"}
publicChatList := []string{"c", "d"}
// build chat lish hashes
// build muted chat lish hashes
var mutedChatListHashes [][]byte
for _, chatID := range mutedChatList {
mutedChatListHashes = append(mutedChatListHashes, common.Shake256([]byte(chatID)))
}
// build public chat lish hashes
var publicChatListHashes [][]byte
for _, chatID := range publicChatList {
publicChatListHashes = append(publicChatListHashes, common.Shake256([]byte(chatID)))
}
contactKey, err := crypto.GenerateKey()
s.Require().NoError(err)
contactIDs := []*ecdsa.PublicKey{&contactKey.PublicKey}
options := &RegistrationOptions{
ContactIDs: contactIDs,
MutedChatIDs: mutedChatList,
PublicChatIDs: publicChatList,
}
// Set random generator for uuid
var seed int64 = 1
@ -144,21 +161,22 @@ func (s *ClientSuite) TestBuildPushNotificationRegisterMessageAllowFromContactsO
// Set reader
s.client.reader = bytes.NewReader([]byte(expectedUUID))
options := &protobuf.PushNotificationRegistration{
Version: 1,
AccessToken: expectedUUID,
DeviceToken: testDeviceToken,
InstallationId: s.installationID,
AllowFromContactsOnly: true,
Enabled: true,
BlockedChatList: mutedChatListHashes,
AllowedKeyList: [][]byte{encryptedToken},
registration := &protobuf.PushNotificationRegistration{
Version: 1,
AccessToken: expectedUUID,
DeviceToken: testDeviceToken,
InstallationId: s.installationID,
AllowFromContactsOnly: true,
Enabled: true,
BlockedChatList: mutedChatListHashes,
AllowedKeyList: [][]byte{encryptedToken},
AllowedMentionsChatList: publicChatListHashes,
}
actualMessage, err := s.client.buildPushNotificationRegistrationMessage(contactIDs, mutedChatList)
actualMessage, err := s.client.buildPushNotificationRegistrationMessage(options)
s.Require().NoError(err)
s.Require().Equal(options, actualMessage)
s.Require().Equal(registration, actualMessage)
}
func (s *ClientSuite) TestHandleMessageScheduled() {
@ -183,7 +201,7 @@ func (s *ClientSuite) TestHandleMessageScheduled() {
s.Require().True(response)
// Save notification
s.Require().NoError(s.client.notifiedOn(&key1.PublicKey, installationID1, messageID))
s.Require().NoError(s.client.notifiedOn(&key1.PublicKey, installationID1, messageID, chatID, protobuf.PushNotification_MESSAGE))
// Second time, should not notify
response, err = s.client.shouldNotifyOn(&key1.PublicKey, installationID1, messageID)

View File

@ -4,6 +4,8 @@
// 1593601729_initial_schema.up.sql (1.773kB)
// 1597909626_add_server_type.down.sql (0)
// 1597909626_add_server_type.up.sql (145B)
// 1599053776_add_chat_id_and_type.down.sql (0)
// 1599053776_add_chat_id_and_type.up.sql (264B)
// doc.go (382B)
package migrations
@ -128,7 +130,7 @@ func _1597909626_add_server_typeDownSql() (*asset, error) {
return nil, err
}
info := bindataFileInfo{name: "1597909626_add_server_type.down.sql", size: 0, mode: os.FileMode(0644), modTime: time.Unix(1597909727, 0)}
info := bindataFileInfo{name: "1597909626_add_server_type.down.sql", size: 0, mode: os.FileMode(0644), modTime: time.Unix(1598949727, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xe3, 0xb0, 0xc4, 0x42, 0x98, 0xfc, 0x1c, 0x14, 0x9a, 0xfb, 0xf4, 0xc8, 0x99, 0x6f, 0xb9, 0x24, 0x27, 0xae, 0x41, 0xe4, 0x64, 0x9b, 0x93, 0x4c, 0xa4, 0x95, 0x99, 0x1b, 0x78, 0x52, 0xb8, 0x55}}
return a, nil
}
@ -148,11 +150,51 @@ func _1597909626_add_server_typeUpSql() (*asset, error) {
return nil, err
}
info := bindataFileInfo{name: "1597909626_add_server_type.up.sql", size: 145, mode: os.FileMode(0644), modTime: time.Unix(1597909704, 0)}
info := bindataFileInfo{name: "1597909626_add_server_type.up.sql", size: 145, mode: os.FileMode(0644), modTime: time.Unix(1598949727, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xc8, 0x3f, 0xe0, 0xe7, 0x57, 0x0, 0x5d, 0x60, 0xf3, 0x55, 0x64, 0x71, 0x80, 0x3c, 0xca, 0x8, 0x61, 0xb5, 0x3c, 0xe, 0xa1, 0xe4, 0x61, 0xd1, 0x4e, 0xd8, 0xb2, 0x55, 0xdd, 0x87, 0x62, 0x9b}}
return a, nil
}
var __1599053776_add_chat_id_and_typeDownSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x01\x00\x00\xff\xff\x00\x00\x00\x00\x00\x00\x00\x00")
func _1599053776_add_chat_id_and_typeDownSqlBytes() ([]byte, error) {
return bindataRead(
__1599053776_add_chat_id_and_typeDownSql,
"1599053776_add_chat_id_and_type.down.sql",
)
}
func _1599053776_add_chat_id_and_typeDownSql() (*asset, error) {
bytes, err := _1599053776_add_chat_id_and_typeDownSqlBytes()
if err != nil {
return nil, err
}
info := bindataFileInfo{name: "1599053776_add_chat_id_and_type.down.sql", size: 0, mode: os.FileMode(0644), modTime: time.Unix(1599053859, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xe3, 0xb0, 0xc4, 0x42, 0x98, 0xfc, 0x1c, 0x14, 0x9a, 0xfb, 0xf4, 0xc8, 0x99, 0x6f, 0xb9, 0x24, 0x27, 0xae, 0x41, 0xe4, 0x64, 0x9b, 0x93, 0x4c, 0xa4, 0x95, 0x99, 0x1b, 0x78, 0x52, 0xb8, 0x55}}
return a, nil
}
var __1599053776_add_chat_id_and_typeUpSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x72\xf4\x09\x71\x0d\x52\x08\x71\x74\xf2\x71\x55\x28\x28\x2d\xce\x88\xcf\xcb\x2f\xc9\x4c\xcb\x4c\x4e\x2c\xc9\xcc\xcf\x8b\x4f\xce\xc9\x4c\xcd\x2b\x89\x2f\x06\x11\xc8\x12\xc5\x0a\x8e\x2e\x2e\x0a\xce\xfe\x3e\xa1\xbe\x7e\x0a\xc9\x19\x89\x25\xf1\x99\x29\x0a\x21\xae\x11\x21\xd6\x5c\x54\x30\x10\x45\x47\x49\x65\x41\xaa\x82\xa7\x5f\x88\x35\x17\x57\x68\x80\x8b\x63\x08\x69\xa6\x06\xbb\x86\xc0\xdd\x67\xab\xa0\xa4\xa4\x83\xc5\x70\x5b\x05\x43\x6b\x2e\x40\x00\x00\x00\xff\xff\x22\xaf\x2b\x87\x08\x01\x00\x00")
func _1599053776_add_chat_id_and_typeUpSqlBytes() ([]byte, error) {
return bindataRead(
__1599053776_add_chat_id_and_typeUpSql,
"1599053776_add_chat_id_and_type.up.sql",
)
}
func _1599053776_add_chat_id_and_typeUpSql() (*asset, error) {
bytes, err := _1599053776_add_chat_id_and_typeUpSqlBytes()
if err != nil {
return nil, err
}
info := bindataFileInfo{name: "1599053776_add_chat_id_and_type.up.sql", size: 264, mode: os.FileMode(0644), modTime: time.Unix(1599053853, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xea, 0x7a, 0xf9, 0xc4, 0xa2, 0x96, 0x2e, 0xf9, 0x8f, 0x7, 0xf1, 0x1e, 0x73, 0x8a, 0xa6, 0x3a, 0x13, 0x4, 0x73, 0x82, 0x83, 0xb, 0xe3, 0xb5, 0x3b, 0x7e, 0xd, 0x23, 0xce, 0x98, 0xd4, 0xdc}}
return a, nil
}
var _docGo = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x84\x8f\x3d\x6e\xec\x30\x0c\x84\x7b\x9d\x62\xb0\xcd\x36\xcf\x52\xf3\xaa\x74\x29\xd3\xe7\x02\x5c\x89\x96\x88\xb5\x24\x43\xa4\xf7\xe7\xf6\x81\x37\x01\xe2\x2e\xed\x87\xf9\x86\xc3\x10\xf0\x59\x44\x31\xcb\xc2\x10\x45\xe3\xc8\xaa\x34\x9e\xb8\x70\xa4\x4d\x19\xa7\x2c\x56\xb6\x8b\x8f\xbd\x06\x35\xb2\x4d\x27\xa9\xa1\x4a\x1e\x64\x1c\x6e\xff\x4f\x2e\x04\x44\x6a\x67\x43\xa1\x96\x16\x7e\x75\x29\xd4\x68\x98\xb4\x8c\xbb\x58\x01\x61\x1d\x3c\xcb\xc3\xe3\xdd\xb0\x30\xa9\xc1\x0a\xd9\x59\x61\x85\x11\x49\x79\xaf\x99\xfb\x40\xee\xd3\x45\x5a\x22\x23\xbf\xa3\x8f\xf9\x40\xf6\x85\x91\x96\x85\x13\xe6\xd1\xeb\xcb\x55\xaa\x8c\x24\x83\xa3\xf5\xf1\xfc\x07\x52\x65\x43\xa3\xca\xba\xfb\x85\x6e\x8c\xd6\x7f\xce\x83\x5a\xfa\xfb\x23\xdc\xfb\xb8\x2a\x48\xc1\x8f\x95\xa3\x71\xf2\xce\xad\x14\xaf\x94\x19\xdf\x39\xe9\x4d\x9d\x0b\x21\xf7\xb7\xcc\x8d\x77\xf3\xb8\x73\x5a\xaf\xf9\x90\xc4\xd4\xe1\x7d\xf8\x05\x3e\x77\xf8\xe0\xbe\x02\x00\x00\xff\xff\x4d\x1d\x5d\x50\x7e\x01\x00\x00")
func docGoBytes() ([]byte, error) {
@ -272,6 +314,10 @@ var _bindata = map[string]func() (*asset, error){
"1597909626_add_server_type.up.sql": _1597909626_add_server_typeUpSql,
"1599053776_add_chat_id_and_type.down.sql": _1599053776_add_chat_id_and_typeDownSql,
"1599053776_add_chat_id_and_type.up.sql": _1599053776_add_chat_id_and_typeUpSql,
"doc.go": docGo,
}
@ -316,11 +362,13 @@ type bintree struct {
}
var _bintree = &bintree{nil, map[string]*bintree{
"1593601729_initial_schema.down.sql": &bintree{_1593601729_initial_schemaDownSql, map[string]*bintree{}},
"1593601729_initial_schema.up.sql": &bintree{_1593601729_initial_schemaUpSql, map[string]*bintree{}},
"1597909626_add_server_type.down.sql": &bintree{_1597909626_add_server_typeDownSql, map[string]*bintree{}},
"1597909626_add_server_type.up.sql": &bintree{_1597909626_add_server_typeUpSql, map[string]*bintree{}},
"doc.go": &bintree{docGo, map[string]*bintree{}},
"1593601729_initial_schema.down.sql": &bintree{_1593601729_initial_schemaDownSql, map[string]*bintree{}},
"1593601729_initial_schema.up.sql": &bintree{_1593601729_initial_schemaUpSql, map[string]*bintree{}},
"1597909626_add_server_type.down.sql": &bintree{_1597909626_add_server_typeDownSql, map[string]*bintree{}},
"1597909626_add_server_type.up.sql": &bintree{_1597909626_add_server_typeUpSql, map[string]*bintree{}},
"1599053776_add_chat_id_and_type.down.sql": &bintree{_1599053776_add_chat_id_and_typeDownSql, map[string]*bintree{}},
"1599053776_add_chat_id_and_type.up.sql": &bintree{_1599053776_add_chat_id_and_typeUpSql, map[string]*bintree{}},
"doc.go": &bintree{docGo, map[string]*bintree{}},
}}
// RestoreAsset restores an asset under the given directory.

View File

@ -0,0 +1,4 @@
ALTER TABLE push_notification_client_sent_notifications ADD COLUMN chat_id TEXT;
ALTER TABLE push_notification_client_sent_notifications ADD COLUMN notification_type INT;
UPDATE push_notification_client_sent_notifications SET chat_id = "", notification_type = 1;

View File

@ -277,7 +277,7 @@ func (p *Persistence) ShouldSendNotificationToAllInstallationIDs(publicKey *ecds
}
func (p *Persistence) UpsertSentNotification(n *SentNotification) error {
_, err := p.db.Exec(`INSERT INTO push_notification_client_sent_notifications (public_key, installation_id, message_id, last_tried_at, retry_count, success, error, hashed_public_key) VALUES (?, ?, ?, ?, ?, ?, ?, ?)`, crypto.CompressPubkey(n.PublicKey), n.InstallationID, n.MessageID, n.LastTriedAt, n.RetryCount, n.Success, n.Error, n.HashedPublicKey())
_, err := p.db.Exec(`INSERT INTO push_notification_client_sent_notifications (public_key, installation_id, message_id, last_tried_at, retry_count, success, error, hashed_public_key,chat_id, notification_type) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, crypto.CompressPubkey(n.PublicKey), n.InstallationID, n.MessageID, n.LastTriedAt, n.RetryCount, n.Success, n.Error, n.HashedPublicKey(), n.ChatID, n.NotificationType)
return err
}
@ -287,7 +287,7 @@ func (p *Persistence) GetSentNotification(hashedPublicKey []byte, installationID
InstallationID: installationID,
MessageID: messageID,
}
err := p.db.QueryRow(`SELECT retry_count, last_tried_at, error, success, public_key FROM push_notification_client_sent_notifications WHERE hashed_public_key = ?`, hashedPublicKey).Scan(&sentNotification.RetryCount, &sentNotification.LastTriedAt, &sentNotification.Error, &sentNotification.Success, &publicKeyBytes)
err := p.db.QueryRow(`SELECT retry_count, last_tried_at, error, success, public_key,chat_id,notification_type FROM push_notification_client_sent_notifications WHERE hashed_public_key = ?`, hashedPublicKey).Scan(&sentNotification.RetryCount, &sentNotification.LastTriedAt, &sentNotification.Error, &sentNotification.Success, &publicKeyBytes, &sentNotification.ChatID, &sentNotification.NotificationType)
if err != nil {
return nil, err
}
@ -309,7 +309,7 @@ func (p *Persistence) UpdateNotificationResponse(messageID []byte, response *pro
func (p *Persistence) GetRetriablePushNotifications() ([]*SentNotification, error) {
var notifications []*SentNotification
rows, err := p.db.Query(`SELECT retry_count, last_tried_at, error, success, public_key, installation_id, message_id FROM push_notification_client_sent_notifications WHERE NOT success AND error = ?`, protobuf.PushNotificationReport_WRONG_TOKEN)
rows, err := p.db.Query(`SELECT retry_count, last_tried_at, error, success, public_key, installation_id, message_id,chat_id, notification_type FROM push_notification_client_sent_notifications WHERE NOT success AND error = ?`, protobuf.PushNotificationReport_WRONG_TOKEN)
if err != nil {
return nil, err
}
@ -318,7 +318,7 @@ func (p *Persistence) GetRetriablePushNotifications() ([]*SentNotification, erro
for rows.Next() {
var publicKeyBytes []byte
notification := &SentNotification{}
err = rows.Scan(&notification.RetryCount, &notification.LastTriedAt, &notification.Error, &notification.Success, &publicKeyBytes, &notification.InstallationID, &notification.MessageID)
err = rows.Scan(&notification.RetryCount, &notification.LastTriedAt, &notification.Error, &notification.Success, &publicKeyBytes, &notification.InstallationID, &notification.MessageID, &notification.ChatID, &notification.NotificationType)
if err != nil {
return nil, err
}

View File

@ -365,7 +365,7 @@ func (s *Server) buildPushNotificationRequestResponseAndSendNotification(request
InstallationId: pn.InstallationId,
}
if pn.Type != protobuf.PushNotification_MESSAGE {
if pn.Type == protobuf.PushNotification_UNKNOWN_PUSH_NOTIFICATION_TYPE {
s.config.Logger.Warn("unhandled type")
continue
}