Handle messages sent to all devices

This commit is contained in:
Andrea Maria Piana 2020-07-14 16:07:19 +02:00
parent 7e8d1353d0
commit c88df6c1c7
No known key found for this signature in database
GPG Key ID: AA6CCA6DE0E06424
6 changed files with 240 additions and 46 deletions

View File

@ -3,6 +3,7 @@ package protocol
import ( import (
"context" "context"
"crypto/ecdsa" "crypto/ecdsa"
"database/sql"
"github.com/status-im/status-go/protocol/common" "github.com/status-im/status-go/protocol/common"
"io/ioutil" "io/ioutil"
"math/rand" "math/rand"
@ -74,6 +75,7 @@ type Messenger struct {
modifiedInstallations map[string]bool modifiedInstallations map[string]bool
installationID string installationID string
mailserver []byte mailserver []byte
database *sql.DB
mutex sync.Mutex mutex sync.Mutex
} }
@ -248,6 +250,7 @@ func NewMessenger(
var pushNotificationServer *push_notification_server.Server var pushNotificationServer *push_notification_server.Server
if c.pushNotificationServerConfig != nil { if c.pushNotificationServerConfig != nil {
c.pushNotificationServerConfig.Identity = identity
pushNotificationServerPersistence := push_notification_server.NewSQLitePersistence(database) pushNotificationServerPersistence := push_notification_server.NewSQLitePersistence(database)
pushNotificationServer = push_notification_server.New(c.pushNotificationServerConfig, pushNotificationServerPersistence, processor) pushNotificationServer = push_notification_server.New(c.pushNotificationServerConfig, pushNotificationServerPersistence, processor)
} }
@ -285,6 +288,7 @@ func NewMessenger(
modifiedInstallations: make(map[string]bool), modifiedInstallations: make(map[string]bool),
messagesPersistenceEnabled: c.messagesPersistenceEnabled, messagesPersistenceEnabled: c.messagesPersistenceEnabled,
verifyTransactionClient: c.verifyTransactionClient, verifyTransactionClient: c.verifyTransactionClient,
database: database,
shutdownTasks: []func() error{ shutdownTasks: []func() error{
database.Close, database.Close,
pushNotificationClient.Stop, pushNotificationClient.Stop,
@ -304,6 +308,7 @@ func NewMessenger(
} }
func (m *Messenger) Start() error { func (m *Messenger) Start() error {
m.logger.Info("starting messenger", zap.String("identity", types.EncodeHex(crypto.FromECDSAPub(&m.identity.PublicKey))))
// Start push notification server // Start push notification server
if m.pushNotificationServer != nil { if m.pushNotificationServer != nil {
if err := m.pushNotificationServer.Start(); err != nil { if err := m.pushNotificationServer.Start(); err != nil {
@ -3065,6 +3070,24 @@ func (m *Messenger) RegisterForPushNotifications(ctx context.Context, deviceToke
return m.pushNotificationClient.Register(deviceToken, contactIDs, mutedChatIDs) return m.pushNotificationClient.Register(deviceToken, contactIDs, mutedChatIDs)
} }
func (m *Messenger) StartPushNotificationServer() error {
if m.pushNotificationServer == nil {
pushNotificationServerPersistence := push_notification_server.NewSQLitePersistence(m.database)
config := &push_notification_server.Config{
Logger: m.logger,
Identity: m.identity,
}
m.pushNotificationServer = push_notification_server.New(config, pushNotificationServerPersistence, m.processor)
}
return m.pushNotificationServer.Start()
}
func (m *Messenger) StopPushNotificationServer() error {
m.pushNotificationServer = nil
return nil
}
func generateAliasAndIdenticon(pk string) (string, string, error) { func generateAliasAndIdenticon(pk string) (string, string, error) {
identicon, err := identicon.GenerateBase64(pk) identicon, err := identicon.GenerateBase64(pk)
if err != nil { if err != nil {

View File

@ -1,17 +1,17 @@
package push_notification_client package push_notification_client
import ( import (
"bytes"
"context" "context"
"crypto/aes" "crypto/aes"
"crypto/cipher" "crypto/cipher"
"sort"
"bytes"
"crypto/ecdsa" "crypto/ecdsa"
"crypto/rand" "crypto/rand"
"encoding/hex" "encoding/hex"
"encoding/json"
"errors" "errors"
"io" "io"
"sort"
"time" "time"
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
@ -19,6 +19,7 @@ import (
"github.com/status-im/status-go/eth-node/crypto" "github.com/status-im/status-go/eth-node/crypto"
"github.com/status-im/status-go/eth-node/crypto/ecies" "github.com/status-im/status-go/eth-node/crypto/ecies"
"github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/protocol/common" "github.com/status-im/status-go/protocol/common"
"github.com/status-im/status-go/protocol/protobuf" "github.com/status-im/status-go/protocol/protobuf"
"go.uber.org/zap" "go.uber.org/zap"
@ -29,10 +30,23 @@ const accessTokenKeyLength = 16
const staleQueryTimeInSeconds = 86400 const staleQueryTimeInSeconds = 86400
type PushNotificationServer struct { type PushNotificationServer struct {
PublicKey *ecdsa.PublicKey PublicKey *ecdsa.PublicKey `json:"-"`
Registered bool Registered bool `json:"registered,omitempty"`
RegisteredAt int64 RegisteredAt int64 `json:"registeredAt,omitempty"`
AccessToken string AccessToken string `json:"accessToken,omitempty"`
}
func (s *PushNotificationServer) MarshalJSON() ([]byte, error) {
type ServerAlias PushNotificationServer
item := struct {
*ServerAlias
PublicKeyString string `json:"publicKey"`
}{
ServerAlias: (*ServerAlias)(s),
PublicKeyString: types.EncodeHex(crypto.FromECDSAPub(s.PublicKey)),
}
return json.Marshal(item)
} }
type PushNotificationInfo struct { type PushNotificationInfo struct {
@ -104,7 +118,12 @@ func (c *Client) Start() error {
subscription := c.messageProcessor.Subscribe() subscription := c.messageProcessor.Subscribe()
for { for {
select { select {
case m := <-subscription: case m, more := <-subscription:
if !more {
c.config.Logger.Info("no more")
return
}
c.config.Logger.Info("handling message sent")
if err := c.HandleMessageSent(m); err != nil { if err := c.HandleMessageSent(m); err != nil {
c.config.Logger.Error("failed to handle message", zap.Error(err)) c.config.Logger.Error("failed to handle message", zap.Error(err))
} }
@ -121,28 +140,46 @@ func (c *Client) Stop() error {
return nil return nil
} }
type notificationSendingSpec struct {
serverPublicKey *ecdsa.PublicKey
installationID string
messageID []byte
}
// The message has been sent // The message has been sent
// We should: // We should:
// 1) Check whether we should notify on anything // 1) Check whether we should notify on anything
// 2) Refresh info if necessaary // 2) Refresh info if necessaary
// 3) Sent push notifications // 3) Sent push notifications
// TODO: handle DH messages
func (c *Client) HandleMessageSent(sentMessage *common.SentMessage) error { func (c *Client) HandleMessageSent(sentMessage *common.SentMessage) error {
c.config.Logger.Info("sent message", zap.Any("sent message", sentMessage))
if !c.config.SendEnabled { if !c.config.SendEnabled {
c.config.Logger.Info("send not enabled, ignoring")
return nil return nil
} }
publicKey := sentMessage.PublicKey publicKey := sentMessage.PublicKey
// Check we track this messages fist
var trackedMessageIDs [][]byte
for _, messageID := range sentMessage.MessageIDs {
tracked, err := c.persistence.TrackedMessage(messageID)
if err != nil {
return err
}
if tracked {
trackedMessageIDs = append(trackedMessageIDs, messageID)
}
}
// Nothing to do
if len(trackedMessageIDs) == 0 {
return nil
}
sendToAllDevices := len(sentMessage.Spec.Installations) == 0
var installationIDs []string var installationIDs []string
var notificationSpecs []*notificationSendingSpec anyActionableMessage := sendToAllDevices
c.config.Logger.Info("send to all devices", zap.Bool("send to all", sendToAllDevices))
//Find if there's any actionable message // Collect installationIDs
for _, messageID := range sentMessage.MessageIDs { for _, messageID := range trackedMessageIDs {
for _, installation := range sentMessage.Spec.Installations { for _, installation := range sentMessage.Spec.Installations {
installationID := installation.ID installationID := installation.ID
shouldNotify, err := c.shouldNotifyOn(publicKey, installationID, messageID) shouldNotify, err := c.shouldNotifyOn(publicKey, installationID, messageID)
@ -150,20 +187,20 @@ func (c *Client) HandleMessageSent(sentMessage *common.SentMessage) error {
return err return err
} }
if shouldNotify { if shouldNotify {
notificationSpecs = append(notificationSpecs, &notificationSendingSpec{ anyActionableMessage = true
installationID: installationID,
messageID: messageID,
})
installationIDs = append(installationIDs, installation.ID) installationIDs = append(installationIDs, installation.ID)
} }
} }
} }
// Is there anything we should be notifying on? // Is there anything we should be notifying on?
if len(installationIDs) == 0 { if !anyActionableMessage {
c.config.Logger.Info("no actionable installation IDs")
return nil return nil
} }
c.config.Logger.Info("actionable messages", zap.Any("message-ids", trackedMessageIDs), zap.Any("installation-ids", installationIDs))
// Check if we queried recently // Check if we queried recently
queriedAt, err := c.persistence.GetQueriedAt(publicKey) queriedAt, err := c.persistence.GetQueriedAt(publicKey)
if err != nil { if err != nil {
@ -173,8 +210,10 @@ func (c *Client) HandleMessageSent(sentMessage *common.SentMessage) error {
// Naively query again if too much time has passed. // Naively query again if too much time has passed.
// Here it might not be necessary // Here it might not be necessary
if time.Now().Unix()-queriedAt > staleQueryTimeInSeconds { if time.Now().Unix()-queriedAt > staleQueryTimeInSeconds {
c.config.Logger.Info("querying info")
err := c.QueryPushNotificationInfo(publicKey) err := c.QueryPushNotificationInfo(publicKey)
if err != nil { if err != nil {
c.config.Logger.Error("could not query pn info", zap.Error(err))
return err return err
} }
// This is just horrible, but for now will do, // This is just horrible, but for now will do,
@ -188,9 +227,12 @@ func (c *Client) HandleMessageSent(sentMessage *common.SentMessage) error {
time.Sleep(3 * time.Second) time.Sleep(3 * time.Second)
} }
c.config.Logger.Info("queried info")
// Retrieve infos // Retrieve infos
info, err := c.GetPushNotificationInfo(publicKey, installationIDs) info, err := c.GetPushNotificationInfo(publicKey, installationIDs)
if err != nil { if err != nil {
c.config.Logger.Error("could not get pn info", zap.Error(err))
return err return err
} }
@ -203,10 +245,14 @@ func (c *Client) HandleMessageSent(sentMessage *common.SentMessage) error {
return info[i].ServerPublicKey.X.Cmp(info[j].ServerPublicKey.X) <= 0 return info[i].ServerPublicKey.X.Cmp(info[j].ServerPublicKey.X) <= 0
}) })
c.config.Logger.Info("retrieved info")
installationIDsMap := make(map[string]bool) installationIDsMap := make(map[string]bool)
// One info per installation id, grouped by server // One info per installation id, grouped by server
actionableInfos := make(map[string][]*PushNotificationInfo) actionableInfos := make(map[string][]*PushNotificationInfo)
for _, i := range info { for _, i := range info {
c.config.Logger.Info("queried info", zap.String("id", i.InstallationID))
if !installationIDsMap[i.InstallationID] { if !installationIDsMap[i.InstallationID] {
serverKey := hex.EncodeToString(crypto.CompressPubkey(i.ServerPublicKey)) serverKey := hex.EncodeToString(crypto.CompressPubkey(i.ServerPublicKey))
actionableInfos[serverKey] = append(actionableInfos[serverKey], i) actionableInfos[serverKey] = append(actionableInfos[serverKey], i)
@ -215,6 +261,8 @@ func (c *Client) HandleMessageSent(sentMessage *common.SentMessage) error {
} }
c.config.Logger.Info("actionable info", zap.Int("count", len(actionableInfos)))
for _, infos := range actionableInfos { for _, infos := range actionableInfos {
var pushNotifications []*protobuf.PushNotification var pushNotifications []*protobuf.PushNotification
for _, i := range infos { for _, i := range infos {
@ -227,7 +275,7 @@ func (c *Client) HandleMessageSent(sentMessage *common.SentMessage) error {
} }
request := &protobuf.PushNotificationRequest{ request := &protobuf.PushNotificationRequest{
MessageId: sentMessage.MessageIDs[0], MessageId: trackedMessageIDs[0],
Requests: pushNotifications, Requests: pushNotifications,
} }
serverPublicKey := infos[0].ServerPublicKey serverPublicKey := infos[0].ServerPublicKey
@ -248,6 +296,20 @@ func (c *Client) HandleMessageSent(sentMessage *common.SentMessage) error {
if err != nil { if err != nil {
return err return err
} }
// Mark message as sent, this is at-most-once semantic
// for all messageIDs
for _, i := range infos {
for _, messageID := range trackedMessageIDs {
c.config.Logger.Info("marking as sent ", zap.Binary("mid", messageID), zap.String("id", i.InstallationID))
if err := c.notifiedOn(publicKey, i.InstallationID, messageID); err != nil {
return err
}
}
}
} }
return nil return nil
@ -259,7 +321,11 @@ func (c *Client) NotifyOnMessageID(chatID string, messageID []byte) error {
} }
func (c *Client) shouldNotifyOn(publicKey *ecdsa.PublicKey, installationID string, messageID []byte) (bool, error) { func (c *Client) shouldNotifyOn(publicKey *ecdsa.PublicKey, installationID string, messageID []byte) (bool, error) {
return c.persistence.ShouldSentNotificationFor(publicKey, installationID, messageID) if len(installationID) == 0 {
return c.persistence.ShouldSendNotificationToAllInstallationIDs(publicKey, messageID)
} else {
return c.persistence.ShouldSendNotificationFor(publicKey, installationID, messageID)
}
} }
func (c *Client) notifiedOn(publicKey *ecdsa.PublicKey, installationID string, messageID []byte) error { func (c *Client) notifiedOn(publicKey *ecdsa.PublicKey, installationID string, messageID []byte) error {
@ -395,7 +461,7 @@ func (c *Client) Register(deviceToken string, contactIDs []*ecdsa.PublicKey, mut
case <-c.quit: case <-c.quit:
return servers, nil return servers, nil
case <-ctx.Done(): case <-ctx.Done():
c.config.Logger.Debug("Context done") c.config.Logger.Info("Context done")
return servers, nil return servers, nil
case <-time.After(200 * time.Millisecond): case <-time.After(200 * time.Millisecond):
servers, err = c.persistence.GetServersByPublicKey(serverPublicKeys) servers, err = c.persistence.GetServersByPublicKey(serverPublicKeys)
@ -422,7 +488,7 @@ func (c *Client) Register(deviceToken string, contactIDs []*ecdsa.PublicKey, mut
// HandlePushNotificationRegistrationResponse should check whether the response was successful or not, retry if necessary otherwise store the result in the database // HandlePushNotificationRegistrationResponse should check whether the response was successful or not, retry if necessary otherwise store the result in the database
func (c *Client) HandlePushNotificationRegistrationResponse(publicKey *ecdsa.PublicKey, response protobuf.PushNotificationRegistrationResponse) error { func (c *Client) HandlePushNotificationRegistrationResponse(publicKey *ecdsa.PublicKey, response protobuf.PushNotificationRegistrationResponse) error {
c.config.Logger.Debug("received push notification registration response", zap.Any("response", response)) c.config.Logger.Info("received push notification registration response", zap.Any("response", response))
// TODO: handle non successful response and match request id // TODO: handle non successful response and match request id
// Not successful ignore for now // Not successful ignore for now
if !response.Success { if !response.Success {
@ -487,7 +553,7 @@ func (c *Client) handleGrant(clientPublicKey *ecdsa.PublicKey, serverPublicKey *
// HandlePushNotificationQueryResponse should update the data in the database for a given user // HandlePushNotificationQueryResponse should update the data in the database for a given user
func (c *Client) HandlePushNotificationQueryResponse(serverPublicKey *ecdsa.PublicKey, response protobuf.PushNotificationQueryResponse) error { func (c *Client) HandlePushNotificationQueryResponse(serverPublicKey *ecdsa.PublicKey, response protobuf.PushNotificationQueryResponse) error {
c.config.Logger.Debug("received push notification query response", zap.Any("response", response)) c.config.Logger.Info("received push notification query response", zap.Any("response", response))
if len(response.Info) == 0 { if len(response.Info) == 0 {
return errors.New("empty response from the server") return errors.New("empty response from the server")
} }
@ -497,7 +563,7 @@ func (c *Client) HandlePushNotificationQueryResponse(serverPublicKey *ecdsa.Publ
return err return err
} }
if publicKey == nil { if publicKey == nil {
c.config.Logger.Debug("query not found") c.config.Logger.Info("query not found")
return nil return nil
} }
var pushNotificationInfo []*PushNotificationInfo var pushNotificationInfo []*PushNotificationInfo
@ -538,7 +604,7 @@ func (p *Client) HandlePushNotificationResponse(ack *protobuf.PushNotificationRe
} }
func (c *Client) AddPushNotificationServer(publicKey *ecdsa.PublicKey) error { func (c *Client) AddPushNotificationServer(publicKey *ecdsa.PublicKey) error {
c.config.Logger.Debug("adding push notification server", zap.Any("public-key", publicKey)) c.config.Logger.Info("adding push notification server", zap.Any("public-key", publicKey))
currentServers, err := c.persistence.GetServers() currentServers, err := c.persistence.GetServers()
if err != nil { if err != nil {
return err return err
@ -571,7 +637,7 @@ func (c *Client) QueryPushNotificationInfo(publicKey *ecdsa.PublicKey) error {
} }
encodedPublicKey := hex.EncodeToString(hashedPublicKey) encodedPublicKey := hex.EncodeToString(hashedPublicKey)
c.config.Logger.Debug("sending query") c.config.Logger.Info("sending query")
messageID, err := c.messageProcessor.SendPublic(context.Background(), encodedPublicKey, rawMessage) messageID, err := c.messageProcessor.SendPublic(context.Background(), encodedPublicKey, rawMessage)
if err != nil { if err != nil {
@ -582,7 +648,11 @@ func (c *Client) QueryPushNotificationInfo(publicKey *ecdsa.PublicKey) error {
} }
func (c *Client) GetPushNotificationInfo(publicKey *ecdsa.PublicKey, installationIDs []string) ([]*PushNotificationInfo, error) { func (c *Client) GetPushNotificationInfo(publicKey *ecdsa.PublicKey, installationIDs []string) ([]*PushNotificationInfo, error) {
return c.persistence.GetPushNotificationInfo(publicKey, installationIDs) if len(installationIDs) == 0 {
return c.persistence.GetPushNotificationInfoByPublicKey(publicKey)
} else {
return c.persistence.GetPushNotificationInfo(publicKey, installationIDs)
}
} }
func (c *Client) listenToPublicKeyQueryTopic(hashedPublicKey []byte) error { func (c *Client) listenToPublicKeyQueryTopic(hashedPublicKey []byte) error {

View File

@ -24,6 +24,20 @@ func (p *Persistence) TrackPushNotification(chatID string, messageID []byte) err
return err return err
} }
func (p *Persistence) TrackedMessage(messageID []byte) (bool, error) {
var count uint64
err := p.db.QueryRow(`SELECT COUNT(1) FROM push_notification_client_tracked_messages WHERE message_id = ?`, messageID).Scan(&count)
if err != nil {
return false, err
}
if count == 0 {
return false, nil
}
return true, nil
}
func (p *Persistence) SavePushNotificationQuery(publicKey *ecdsa.PublicKey, queryID []byte) error { func (p *Persistence) SavePushNotificationQuery(publicKey *ecdsa.PublicKey, queryID []byte) error {
queriedAt := time.Now().Unix() queriedAt := time.Now().Unix()
_, err := p.db.Exec(`INSERT INTO push_notification_client_queries (public_key, query_id, queried_at) VALUES (?,?,?)`, crypto.CompressPubkey(publicKey), queryID, queriedAt) _, err := p.db.Exec(`INSERT INTO push_notification_client_queries (public_key, query_id, queried_at) VALUES (?,?,?)`, crypto.CompressPubkey(publicKey), queryID, queriedAt)
@ -114,7 +128,33 @@ func (p *Persistence) GetPushNotificationInfo(publicKey *ecdsa.PublicKey, instal
return infos, nil return infos, nil
} }
func (p *Persistence) ShouldSentNotificationFor(publicKey *ecdsa.PublicKey, installationID string, messageID []byte) (bool, error) { func (p *Persistence) GetPushNotificationInfoByPublicKey(publicKey *ecdsa.PublicKey) ([]*PushNotificationInfo, error) {
rows, err := p.db.Query(`SELECT server_public_key, installation_id, access_token, retrieved_at FROM push_notification_client_info WHERE public_key = ?`, crypto.CompressPubkey(publicKey))
if err != nil {
return nil, err
}
var infos []*PushNotificationInfo
for rows.Next() {
var serverPublicKeyBytes []byte
info := &PushNotificationInfo{PublicKey: publicKey}
err := rows.Scan(&serverPublicKeyBytes, &info.InstallationID, &info.AccessToken, &info.RetrievedAt)
if err != nil {
return nil, err
}
serverPublicKey, err := crypto.DecompressPubkey(serverPublicKeyBytes)
if err != nil {
return nil, err
}
info.ServerPublicKey = serverPublicKey
infos = append(infos, info)
}
return infos, nil
}
func (p *Persistence) ShouldSendNotificationFor(publicKey *ecdsa.PublicKey, installationID string, messageID []byte) (bool, error) {
// First we check that we are tracking this message, next we check that we haven't already sent this // First we check that we are tracking this message, next we check that we haven't already sent this
var count uint64 var count uint64
err := p.db.QueryRow(`SELECT COUNT(1) FROM push_notification_client_tracked_messages WHERE message_id = ?`, messageID).Scan(&count) err := p.db.QueryRow(`SELECT COUNT(1) FROM push_notification_client_tracked_messages WHERE message_id = ?`, messageID).Scan(&count)
@ -134,6 +174,26 @@ func (p *Persistence) ShouldSentNotificationFor(publicKey *ecdsa.PublicKey, inst
return count == 0, nil return count == 0, nil
} }
func (p *Persistence) ShouldSendNotificationToAllInstallationIDs(publicKey *ecdsa.PublicKey, messageID []byte) (bool, error) {
// First we check that we are tracking this message, next we check that we haven't already sent this
var count uint64
err := p.db.QueryRow(`SELECT COUNT(1) FROM push_notification_client_tracked_messages WHERE message_id = ?`, messageID).Scan(&count)
if err != nil {
return false, err
}
if count == 0 {
return false, nil
}
err = p.db.QueryRow(`SELECT COUNT(1) FROM push_notification_client_sent_notifications WHERE message_id = ? AND public_key = ? `, messageID, crypto.CompressPubkey(publicKey)).Scan(&count)
if err != nil {
return false, err
}
return count == 0, nil
}
func (p *Persistence) NotifiedOn(publicKey *ecdsa.PublicKey, installationID string, messageID []byte) error { func (p *Persistence) NotifiedOn(publicKey *ecdsa.PublicKey, installationID string, messageID []byte) error {
sentAt := time.Now().Unix() sentAt := time.Now().Unix()
_, err := p.db.Exec(`INSERT INTO push_notification_client_sent_notifications (public_key, installation_id, message_id, sent_at) VALUES (?, ?, ?, ?)`, crypto.CompressPubkey(publicKey), installationID, messageID, sentAt) _, err := p.db.Exec(`INSERT INTO push_notification_client_sent_notifications (public_key, installation_id, message_id, sent_at) VALUES (?, ?, ?, ?)`, crypto.CompressPubkey(publicKey), installationID, messageID, sentAt)

View File

@ -75,5 +75,4 @@ func sendGoRushNotification(request *GoRushRequest, url string) error {
return err return err
} }
return nil return nil
} }

View File

@ -11,6 +11,7 @@ import (
"github.com/status-im/status-go/eth-node/crypto" "github.com/status-im/status-go/eth-node/crypto"
"github.com/status-im/status-go/eth-node/crypto/ecies" "github.com/status-im/status-go/eth-node/crypto/ecies"
"github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/protocol/common" "github.com/status-im/status-go/protocol/common"
"github.com/status-im/status-go/protocol/protobuf" "github.com/status-im/status-go/protocol/protobuf"
"go.uber.org/zap" "go.uber.org/zap"
@ -136,17 +137,17 @@ func (s *Server) ValidateRegistration(publicKey *ecdsa.PublicKey, payload []byte
return registration, nil return registration, nil
} }
func (p *Server) HandlePushNotificationQuery(query *protobuf.PushNotificationQuery) *protobuf.PushNotificationQueryResponse { func (s *Server) HandlePushNotificationQuery(query *protobuf.PushNotificationQuery) *protobuf.PushNotificationQueryResponse {
p.config.Logger.Debug("handling push notification query") s.config.Logger.Info("handling push notification query")
response := &protobuf.PushNotificationQueryResponse{} response := &protobuf.PushNotificationQueryResponse{}
if query == nil || len(query.PublicKeys) == 0 { if query == nil || len(query.PublicKeys) == 0 {
return response return response
} }
registrations, err := p.persistence.GetPushNotificationRegistrationByPublicKeys(query.PublicKeys) registrations, err := s.persistence.GetPushNotificationRegistrationByPublicKeys(query.PublicKeys)
if err != nil { if err != nil {
// TODO: log errors s.config.Logger.Error("failed to retrieve registration", zap.Error(err))
return response return response
} }
@ -171,10 +172,12 @@ func (p *Server) HandlePushNotificationQuery(query *protobuf.PushNotificationQue
return response return response
} }
func (p *Server) HandlePushNotificationRequest(request *protobuf.PushNotificationRequest) *protobuf.PushNotificationResponse { func (s *Server) HandlePushNotificationRequest(request *protobuf.PushNotificationRequest) *protobuf.PushNotificationResponse {
s.config.Logger.Info("handling pn request")
response := &protobuf.PushNotificationResponse{} response := &protobuf.PushNotificationResponse{}
// We don't even send a response in this case // We don't even send a response in this case
if request == nil || len(request.MessageId) == 0 { if request == nil || len(request.MessageId) == 0 {
s.config.Logger.Warn("empty message id")
return nil return nil
} }
@ -185,18 +188,20 @@ func (p *Server) HandlePushNotificationRequest(request *protobuf.PushNotificatio
var requestAndRegistrations []*RequestAndRegistration var requestAndRegistrations []*RequestAndRegistration
for _, pn := range request.Requests { for _, pn := range request.Requests {
registration, err := p.persistence.GetPushNotificationRegistrationByPublicKeyAndInstallationID(pn.PublicKey, pn.InstallationId) registration, err := s.persistence.GetPushNotificationRegistrationByPublicKeyAndInstallationID(pn.PublicKey, pn.InstallationId)
report := &protobuf.PushNotificationReport{ report := &protobuf.PushNotificationReport{
PublicKey: pn.PublicKey, PublicKey: pn.PublicKey,
InstallationId: pn.InstallationId, InstallationId: pn.InstallationId,
} }
if err != nil { if err != nil {
// TODO: log error s.config.Logger.Error("failed to retrieve registration", zap.Error(err))
report.Error = protobuf.PushNotificationReport_UNKNOWN_ERROR_TYPE report.Error = protobuf.PushNotificationReport_UNKNOWN_ERROR_TYPE
} else if registration == nil { } else if registration == nil {
s.config.Logger.Warn("empty registration")
report.Error = protobuf.PushNotificationReport_NOT_REGISTERED report.Error = protobuf.PushNotificationReport_NOT_REGISTERED
} else if registration.AccessToken != pn.AccessToken { } else if registration.AccessToken != pn.AccessToken {
s.config.Logger.Warn("invalid access token")
report.Error = protobuf.PushNotificationReport_WRONG_TOKEN report.Error = protobuf.PushNotificationReport_WRONG_TOKEN
} else { } else {
// For now we just assume that the notification will be successful // For now we just assume that the notification will be successful
@ -210,14 +215,19 @@ func (p *Server) HandlePushNotificationRequest(request *protobuf.PushNotificatio
response.Reports = append(response.Reports, report) response.Reports = append(response.Reports, report)
} }
s.config.Logger.Info("built pn request")
if len(requestAndRegistrations) == 0 { if len(requestAndRegistrations) == 0 {
s.config.Logger.Warn("no request and registration")
return response return response
} }
// This can be done asynchronously // This can be done asynchronously
goRushRequest := PushNotificationRegistrationToGoRushRequest(requestAndRegistrations) goRushRequest := PushNotificationRegistrationToGoRushRequest(requestAndRegistrations)
err := sendGoRushNotification(goRushRequest, p.config.GorushURL) //TODO: REMOVE ME
s.config.Logger.Info("REQUEST", zap.Any("REQUEST", goRushRequest))
err := sendGoRushNotification(goRushRequest, s.config.GorushURL)
if err != nil { if err != nil {
s.config.Logger.Error("failed to send go rush notification", zap.Error(err))
// TODO: handle this error? // TODO: handle this error?
} }
@ -226,7 +236,7 @@ func (p *Server) HandlePushNotificationRequest(request *protobuf.PushNotificatio
func (s *Server) HandlePushNotificationRegistration(publicKey *ecdsa.PublicKey, payload []byte) *protobuf.PushNotificationRegistrationResponse { func (s *Server) HandlePushNotificationRegistration(publicKey *ecdsa.PublicKey, payload []byte) *protobuf.PushNotificationRegistrationResponse {
s.config.Logger.Debug("handling push notification registration") s.config.Logger.Info("handling push notification registration")
response := &protobuf.PushNotificationRegistrationResponse{ response := &protobuf.PushNotificationRegistrationResponse{
RequestId: common.Shake256(payload), RequestId: common.Shake256(payload),
} }
@ -269,13 +279,15 @@ func (s *Server) HandlePushNotificationRegistration(publicKey *ecdsa.PublicKey,
} }
response.Success = true response.Success = true
s.config.Logger.Debug("handled push notification registration successfully") s.config.Logger.Info("handled push notification registration successfully")
return response return response
} }
func (s *Server) Start() error { func (s *Server) Start() error {
s.config.Logger.Info("starting push notification server")
if s.config.Identity == nil { if s.config.Identity == nil {
s.config.Logger.Info("Identity nil")
// Pull identity from database // Pull identity from database
identity, err := s.persistence.GetIdentity() identity, err := s.persistence.GetIdentity()
if err != nil { if err != nil {
@ -303,6 +315,8 @@ func (s *Server) Start() error {
} }
} }
s.config.Logger.Info("started push notification server", zap.String("identity", types.EncodeHex(crypto.FromECDSAPub(&s.config.Identity.PublicKey))))
return nil return nil
} }
@ -354,9 +368,10 @@ func (p *Server) HandlePushNotificationQuery2(publicKey *ecdsa.PublicKey, messag
} }
func (p *Server) HandlePushNotificationRequest2(publicKey *ecdsa.PublicKey, func (s *Server) HandlePushNotificationRequest2(publicKey *ecdsa.PublicKey,
request protobuf.PushNotificationRequest) error { request protobuf.PushNotificationRequest) error {
response := p.HandlePushNotificationRequest(&request) s.config.Logger.Info("handling pn request")
response := s.HandlePushNotificationRequest(&request)
if response == nil { if response == nil {
return nil return nil
} }
@ -370,7 +385,7 @@ func (p *Server) HandlePushNotificationRequest2(publicKey *ecdsa.PublicKey,
MessageType: protobuf.ApplicationMetadataMessage_PUSH_NOTIFICATION_RESPONSE, MessageType: protobuf.ApplicationMetadataMessage_PUSH_NOTIFICATION_RESPONSE,
} }
_, err = p.messageProcessor.SendPrivate(context.Background(), publicKey, rawMessage) _, err = s.messageProcessor.SendPrivate(context.Background(), publicKey, rawMessage)
return err return err
} }

View File

@ -12,10 +12,12 @@ import (
"github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rlp"
"github.com/status-im/status-go/eth-node/crypto"
"github.com/status-im/status-go/eth-node/types" "github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/mailserver" "github.com/status-im/status-go/mailserver"
"github.com/status-im/status-go/protocol" "github.com/status-im/status-go/protocol"
"github.com/status-im/status-go/protocol/encryption/multidevice" "github.com/status-im/status-go/protocol/encryption/multidevice"
"github.com/status-im/status-go/protocol/push_notification_client"
"github.com/status-im/status-go/protocol/transport" "github.com/status-im/status-go/protocol/transport"
"github.com/status-im/status-go/services/ext/mailservers" "github.com/status-im/status-go/services/ext/mailservers"
) )
@ -401,6 +403,31 @@ func (api *PublicAPI) UpdateMailservers(enodes []string) error {
return api.service.UpdateMailservers(nodes) return api.service.UpdateMailservers(nodes)
} }
// PushNotifications server
func (api *PublicAPI) StartPushNotificationServer() error {
return api.service.messenger.StartPushNotificationServer()
}
func (api *PublicAPI) StopPushNotificationServer() error {
return api.service.messenger.StopPushNotificationServer()
}
// PushNotification client
func (api *PublicAPI) RegisterForPushNotifications(ctx context.Context, deviceToken string) ([]*push_notification_client.PushNotificationServer, error) {
return api.service.messenger.RegisterForPushNotifications(ctx, deviceToken)
}
func (api *PublicAPI) AddPushNotificationServer(ctx context.Context, publicKeyBytes types.HexBytes) error {
publicKey, err := crypto.UnmarshalPubkey(publicKeyBytes)
if err != nil {
return err
}
return api.service.messenger.AddPushNotificationServer(ctx, publicKey)
}
// Echo is a method for testing purposes. // Echo is a method for testing purposes.
func (api *PublicAPI) Echo(ctx context.Context, message string) (string, error) { func (api *PublicAPI) Echo(ctx context.Context, message string) (string, error) {
return message, nil return message, nil