From 8130e5b402a26bd06a96cab7f96495c331ee5352 Mon Sep 17 00:00:00 2001 From: Andrea Maria Piana Date: Wed, 15 Jul 2020 12:29:16 +0200 Subject: [PATCH] Run registration loop when servers are not fully registered --- protocol/push_notification_client/client.go | 113 ++++++++++++++++-- .../migrations/migrations.go | 8 +- .../sql/1593601729_initial_schema.up.sql | 2 + .../push_notification_client/persistence.go | 6 +- 4 files changed, 113 insertions(+), 16 deletions(-) diff --git a/protocol/push_notification_client/client.go b/protocol/push_notification_client/client.go index 265a0f155..abef9e148 100644 --- a/protocol/push_notification_client/client.go +++ b/protocol/push_notification_client/client.go @@ -11,6 +11,7 @@ import ( "encoding/json" "errors" "io" + "math" "sort" "time" @@ -29,11 +30,19 @@ const encryptedPayloadKeyLength = 16 const accessTokenKeyLength = 16 const staleQueryTimeInSeconds = 86400 +// maxRetries is the maximum number of attempts we do before giving up registering with a server +const maxRetries int64 = 12 + +// RegistrationBackoffTime is the step of the exponential backoff +const RegistrationBackoffTime int64 = 15 + type PushNotificationServer struct { - PublicKey *ecdsa.PublicKey `json:"-"` - Registered bool `json:"registered,omitempty"` - RegisteredAt int64 `json:"registeredAt,omitempty"` - AccessToken string `json:"accessToken,omitempty"` + PublicKey *ecdsa.PublicKey `json:"-"` + Registered bool `json:"registered,omitempty"` + RegisteredAt int64 `json:"registeredAt,omitempty"` + LastRetriedAt int64 `json:"lastRetriedAt,omitempty"` + RetryCount int64 `json:"retryCount,omitempty"` + AccessToken string `json:"accessToken,omitempty"` } func (s *PushNotificationServer) MarshalJSON() ([]byte, error) { @@ -68,8 +77,7 @@ type Config struct { // AllowOnlyFromContacts indicates whether we should be receiving push notifications // only from contacts AllowOnlyFromContacts bool - // PushNotificationServers is an array of push notification servers we want to register with - PushNotificationServers []*PushNotificationServer + // InstallationID is the installation-id for this device InstallationID string @@ -99,8 +107,10 @@ type Client struct { reader io.Reader //messageProcessor is a message processor used to send and being notified of messages - messageProcessor *common.MessageProcessor + + // registrationLoopQuitChan is a channel to indicate to the registration loop that should be terminating + registrationLoopQuitChan chan struct{} } func New(persistence *Persistence, config *Config, processor *common.MessageProcessor) *Client { @@ -109,7 +119,8 @@ func New(persistence *Persistence, config *Config, processor *common.MessageProc config: config, messageProcessor: processor, persistence: persistence, - reader: rand.Reader} + reader: rand.Reader, + } } func (c *Client) subscribeForSentMessages() { @@ -147,6 +158,19 @@ func (c *Client) loadLastPushNotificationRegistration() error { return nil } +func (c *Client) stopRegistrationLoop() { + // stop old registration loop + if c.registrationLoopQuitChan != nil { + close(c.registrationLoopQuitChan) + c.registrationLoopQuitChan = nil + } +} + +func (c *Client) startRegistrationLoop() { + c.stopRegistrationLoop() + c.registrationLoopQuitChan = make(chan struct{}) + go c.registrationLoop() +} func (c *Client) Start() error { if c.messageProcessor == nil { @@ -158,12 +182,16 @@ func (c *Client) Start() error { return err } c.subscribeForSentMessages() + c.startRegistrationLoop() return nil } func (c *Client) Stop() error { close(c.quit) + if c.registrationLoopQuitChan != nil { + close(c.registrationLoopQuitChan) + } return nil } @@ -450,10 +478,21 @@ func (c *Client) shouldRefreshToken(oldContactIDs, newContactIDs []*ecdsa.Public return false } +func nextServerRetry(server *PushNotificationServer) int64 { + return server.LastRetriedAt + RegistrationBackoffTime*server.RetryCount*int64(math.Exp2(float64(server.RetryCount))) +} + +// We calculate if it's too early to retry, by exponentially backing off +func shouldRetryRegisteringWithServer(server *PushNotificationServer) bool { + return time.Now().Unix() < nextServerRetry(server) +} + func (c *Client) registerWithServer(registration *protobuf.PushNotificationRegistration, server *PushNotificationServer) error { // Reset server registration data server.Registered = false server.RegisteredAt = 0 + server.RetryCount += 1 + server.LastRetriedAt = time.Now().Unix() server.AccessToken = registration.AccessToken if err := c.persistence.UpsertServer(server); err != nil { @@ -489,10 +528,64 @@ func (c *Client) registerWithServer(registration *protobuf.PushNotificationRegis return err } return nil +} +func (c *Client) registrationLoop() error { + for { + c.config.Logger.Info("runing registration loop") + servers, err := c.persistence.GetServers() + if err != nil { + c.config.Logger.Error("failed retrieving servers, quitting registration loop", zap.Error(err)) + return err + } + if len(servers) == 0 { + c.config.Logger.Debug("nothing to do, quitting registration loop") + return nil + } + + var nonRegisteredServers []*PushNotificationServer + for _, server := range servers { + if server.Registered { + nonRegisteredServers = append(nonRegisteredServers, server) + } + if len(nonRegisteredServers) == 0 { + c.config.Logger.Debug("registered with all servers, quitting registration loop") + return nil + } + + var lowestNextRetry int64 + + for _, server := range nonRegisteredServers { + if shouldRetryRegisteringWithServer(server) { + err := c.registerWithServer(c.lastPushNotificationRegistration, server) + if err != nil { + return err + } + } + nextRetry := nextServerRetry(server) + if lowestNextRetry == 0 || nextRetry < lowestNextRetry { + lowestNextRetry = nextRetry + } + + } + + nextRetry := lowestNextRetry - time.Now().Unix() + waitFor := time.Duration(nextRetry) + select { + + case <-time.After(waitFor * time.Second): + case <-c.registrationLoopQuitChan: + return nil + + } + } + } } func (c *Client) Register(deviceToken string, contactIDs []*ecdsa.PublicKey, mutedChatIDs []string) ([]*PushNotificationServer, error) { + // stop registration loop + c.stopRegistrationLoop() + c.DeviceToken = deviceToken servers, err := c.persistence.GetServers() if err != nil { @@ -526,7 +619,9 @@ func (c *Client) Register(deviceToken string, contactIDs []*ecdsa.PublicKey, mut case <-c.quit: return servers, nil case <-ctx.Done(): - c.config.Logger.Info("Context done") + c.config.Logger.Info("could not register all servers") + // start registration loop + c.startRegistrationLoop() return servers, nil case <-time.After(200 * time.Millisecond): servers, err = c.persistence.GetServersByPublicKey(serverPublicKeys) diff --git a/protocol/push_notification_client/migrations/migrations.go b/protocol/push_notification_client/migrations/migrations.go index 6f5d47a54..d6c7865d8 100644 --- a/protocol/push_notification_client/migrations/migrations.go +++ b/protocol/push_notification_client/migrations/migrations.go @@ -1,7 +1,7 @@ // Code generated by go-bindata. DO NOT EDIT. // sources: // 1593601729_initial_schema.down.sql (144B) -// 1593601729_initial_schema.up.sql (1.496kB) +// 1593601729_initial_schema.up.sql (1.576kB) // doc.go (382B) package migrations @@ -91,7 +91,7 @@ func _1593601729_initial_schemaDownSql() (*asset, error) { return a, nil } -var __1593601729_initial_schemaUpSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xac\x94\xc1\x8e\x9b\x30\x10\x86\xef\x3c\xc5\x1c\x17\x89\x43\xef\x7b\x02\xd6\x54\x48\x96\xdd\x26\x46\xca\xcd\x72\x8d\x77\x63\x85\x9a\xd4\x76\xa2\xe6\xed\x2b\x20\x24\x24\x24\x50\x25\xb9\x20\x61\xff\x1e\xcf\x37\xbf\x67\xd2\x05\x8a\x19\x02\x16\x27\x18\x41\x9e\x01\xa1\x0c\xd0\x2a\x5f\xb2\x25\x6c\x77\x6e\xcd\x4d\xed\xf5\xa7\x96\xc2\xeb\xda\x70\x59\x69\x65\x3c\x77\xca\xee\x95\x75\xf0\x16\x00\x6c\x77\xbf\x2a\x2d\xf9\x46\x1d\x20\xc1\x34\x69\xcf\x93\x02\xe3\x28\x00\xb0\xea\x4b\x3b\xaf\xac\x2a\x21\xa1\x14\xa3\x98\xc0\x07\xca\xe2\x02\x33\xc8\x62\xbc\x44\x97\x1a\x2e\x3c\xe4\x84\x9d\x22\x9c\xb4\xdf\x1a\x9d\x90\x52\x39\xc7\x7d\xbd\x51\x06\x18\x5a\xb1\x66\xb1\x20\xf9\xcf\x02\xbd\x9d\x73\x08\x81\x12\x48\x29\xc9\x70\x9e\x32\x58\xa0\x1f\x38\x4e\x51\x10\xbe\x07\xc1\x23\x9c\x7f\x76\xca\x6a\x35\xcf\xd9\xe9\x46\x00\xfd\xd6\x81\xeb\x72\x7c\x68\x94\x7b\xd4\x6b\x5f\x0b\xa1\xcd\x67\x3d\x4b\xd0\x39\xca\xa7\x24\xda\x38\x2f\xaa\xaa\x8b\xad\xcb\xd6\x83\x0b\xc1\xc8\xa1\xab\xb7\xe0\xad\x56\xfb\xdb\x55\x1a\xd7\xe2\xfa\xba\x68\x9c\xe2\x6b\xcb\xe4\xad\x90\x1b\x55\xf2\xdf\xca\x39\xf1\x75\x34\xfd\xf8\x73\xd3\x3f\xb9\x16\xfe\x66\x1d\xfa\x48\xf7\x39\xcf\x61\x2f\x19\xf2\xef\x84\x2e\x50\x00\xf0\x28\x84\x6b\x3e\xc3\x8d\x79\x8c\xa7\x2c\x6f\xef\xfb\x1f\xce\x08\x26\xbc\x0d\x9f\x20\xee\xc6\x87\x1d\xc0\xf6\x23\xa5\x5b\x1b\x43\x01\xc8\xda\x78\x21\x1b\xf3\x5c\xbb\xdd\xad\xba\x83\xf1\x6b\xe5\xb5\x6c\x48\xef\xcf\xa1\x13\xdc\x50\x3f\xfb\x14\x73\xf2\x81\x56\xa0\xcb\xbf\x7c\xb2\x4f\x87\x0d\x48\xc9\x74\x4f\x4f\x75\x4b\xf8\x1e\xfc\x0b\x00\x00\xff\xff\x1c\x18\x75\x11\xd8\x05\x00\x00") +var __1593601729_initial_schemaUpSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xac\x94\xc1\x8e\x9b\x30\x10\x86\xef\x3c\xc5\x1c\x17\x89\x43\xef\x7b\x02\xd6\x54\x48\x96\xdd\x26\x46\xca\xcd\x72\x8d\x77\x63\x85\x9a\xad\x6d\x56\xe5\xed\x2b\x60\xc9\x92\x90\x40\x95\xe4\x82\x84\xfd\x7b\xec\x6f\xfe\x99\x49\x37\x28\x66\x08\x58\x9c\x60\x04\x79\x06\x84\x32\x40\xbb\x7c\xcb\xb6\xf0\xde\xb8\x3d\x37\xb5\xd7\xaf\x5a\x0a\xaf\x6b\xc3\x65\xa5\x95\xf1\xdc\x29\xfb\xa1\xac\x83\xa7\x00\xe0\xbd\xf9\x55\x69\xc9\x0f\xaa\x85\x04\xd3\xa4\x3f\x4f\x0a\x8c\xa3\x00\xc0\xaa\x37\xed\xbc\xb2\xaa\x84\x84\x52\x8c\x62\x02\x2f\x28\x8b\x0b\xcc\x20\x8b\xf1\x16\x9d\x6a\xb8\xf0\x90\x13\x76\x8c\x70\xd4\x7e\xeb\x74\x95\x70\x9e\x5b\xe5\xad\x5e\x53\x76\xa2\x96\xcb\xba\x31\x4b\x2a\x21\xa5\x72\x8e\xfb\xfa\xa0\x0c\x30\xb4\x63\xdd\x62\x41\xf2\x9f\x05\x7a\xfa\x62\x0a\x81\x12\x48\x29\xc9\x70\x9e\x32\xd8\xa0\x1f\x38\x4e\x51\x10\x3e\x07\xc1\x2d\x79\xfb\xd3\x28\xab\xd5\x7a\xde\x06\xdd\x0c\x73\xdc\x6a\xb9\x2e\xe7\x87\x66\x6f\x8f\x46\xed\x63\x21\xb4\x79\xad\x57\x09\x86\x0a\xe1\x4b\x12\x6d\x9c\x17\x55\x35\xc4\xd6\x65\xef\xc1\x89\x60\xe6\xd0\x59\x6d\x75\xa5\xf0\x71\x39\x4b\xf3\x5c\x9c\x5f\x17\xcd\x9f\xf8\xd8\x34\x79\x2b\xe4\x41\x95\xfc\xb7\x72\x4e\xbc\x7d\x9a\xfe\xf9\x73\xd1\x3f\xb9\x17\xfe\x62\x1e\xc6\x48\xd7\x39\xbf\xc2\x9e\x32\xe4\xdf\x09\xdd\xa0\x00\xe0\x56\x08\xd7\x7d\xa6\x1b\xeb\x18\x77\x59\xde\xdf\xf7\x3f\x9c\x11\x2c\x78\x1b\xde\x41\x3c\x8c\x23\x3b\x81\x1d\x47\xd4\xb0\x36\x87\x02\x90\xb5\xf1\x42\x76\xe6\xb9\x7e\x7b\x58\x75\xad\xf1\x7b\xe5\xb5\xec\x48\xaf\xcf\xa1\x23\xdc\x54\xbf\x5a\x8a\x39\x79\x41\x3b\xd0\xe5\x5f\xbe\xd8\xa7\xd3\x06\xa4\x64\xb9\xa7\x97\xba\x25\x7c\x0e\xfe\x05\x00\x00\xff\xff\x58\xe5\xb5\x4b\x28\x06\x00\x00") func _1593601729_initial_schemaUpSqlBytes() ([]byte, error) { return bindataRead( @@ -106,8 +106,8 @@ func _1593601729_initial_schemaUpSql() (*asset, error) { return nil, err } - info := bindataFileInfo{name: "1593601729_initial_schema.up.sql", size: 1496, mode: os.FileMode(0644), modTime: time.Unix(1594800672, 0)} - a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xe7, 0xb6, 0xaf, 0x11, 0x19, 0xf8, 0xc6, 0xed, 0x2f, 0xf5, 0x42, 0x54, 0xdd, 0x65, 0xf7, 0x39, 0xbc, 0x19, 0xff, 0x72, 0xa1, 0x38, 0x87, 0xfa, 0x6d, 0xd5, 0xe3, 0x6a, 0x49, 0x65, 0x3c, 0x49}} + info := bindataFileInfo{name: "1593601729_initial_schema.up.sql", size: 1576, mode: os.FileMode(0644), modTime: time.Unix(1594803635, 0)} + a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xf4, 0xe, 0xc, 0xdf, 0xb6, 0xdb, 0x22, 0x1, 0x1f, 0xe0, 0x18, 0x96, 0x1b, 0x13, 0x5f, 0x60, 0x2c, 0x41, 0xd5, 0x4, 0x77, 0x99, 0x8a, 0x4d, 0xa1, 0xd6, 0x41, 0xb5, 0x1, 0x86, 0x29, 0x1e}} return a, nil } diff --git a/protocol/push_notification_client/migrations/sql/1593601729_initial_schema.up.sql b/protocol/push_notification_client/migrations/sql/1593601729_initial_schema.up.sql index c4c6b6999..86be44f03 100644 --- a/protocol/push_notification_client/migrations/sql/1593601729_initial_schema.up.sql +++ b/protocol/push_notification_client/migrations/sql/1593601729_initial_schema.up.sql @@ -2,6 +2,8 @@ CREATE TABLE IF NOT EXISTS push_notification_client_servers ( public_key BLOB NOT NULL, registered BOOLEAN DEFAULT FALSE, registered_at INT NOT NULL DEFAULT 0, + last_retried_at INT NOT NULL DEFAULT 0, + retry_count INT NOT NULL DEFAULT 0, access_token TEXT, UNIQUE(public_key) ON CONFLICT REPLACE ); diff --git a/protocol/push_notification_client/persistence.go b/protocol/push_notification_client/persistence.go index 4afe0f815..10a800f95 100644 --- a/protocol/push_notification_client/persistence.go +++ b/protocol/push_notification_client/persistence.go @@ -261,13 +261,13 @@ func (p *Persistence) NotifiedOn(publicKey *ecdsa.PublicKey, installationID stri } func (p *Persistence) UpsertServer(server *PushNotificationServer) error { - _, err := p.db.Exec(`INSERT INTO push_notification_client_servers (public_key, registered, registered_at, access_token) VALUES (?,?,?,?)`, crypto.CompressPubkey(server.PublicKey), server.Registered, server.RegisteredAt, server.AccessToken) + _, err := p.db.Exec(`INSERT INTO push_notification_client_servers (public_key, registered, registered_at, access_token, last_retried_at, retry_count) VALUES (?,?,?,?,?,?)`, crypto.CompressPubkey(server.PublicKey), server.Registered, server.RegisteredAt, server.AccessToken, server.LastRetriedAt, server.RetryCount) return err } func (p *Persistence) GetServers() ([]*PushNotificationServer, error) { - rows, err := p.db.Query(`SELECT public_key, registered, registered_at,access_token FROM push_notification_client_servers`) + rows, err := p.db.Query(`SELECT public_key, registered, registered_at,access_token,last_retried_at, retry_count FROM push_notification_client_servers`) if err != nil { return nil, err } @@ -275,7 +275,7 @@ func (p *Persistence) GetServers() ([]*PushNotificationServer, error) { for rows.Next() { server := &PushNotificationServer{} var key []byte - err := rows.Scan(&key, &server.Registered, &server.RegisteredAt, &server.AccessToken) + err := rows.Scan(&key, &server.Registered, &server.RegisteredAt, &server.AccessToken, &server.LastRetriedAt, &server.RetryCount) if err != nil { return nil, err }