Run registration loop when servers are not fully registered

This commit is contained in:
Andrea Maria Piana 2020-07-15 12:29:16 +02:00
parent d2ab7f1bcd
commit 8130e5b402
No known key found for this signature in database
GPG Key ID: AA6CCA6DE0E06424
4 changed files with 113 additions and 16 deletions

View File

@ -11,6 +11,7 @@ import (
"encoding/json"
"errors"
"io"
"math"
"sort"
"time"
@ -29,11 +30,19 @@ const encryptedPayloadKeyLength = 16
const accessTokenKeyLength = 16
const staleQueryTimeInSeconds = 86400
// maxRetries is the maximum number of attempts we do before giving up registering with a server
const maxRetries int64 = 12
// RegistrationBackoffTime is the step of the exponential backoff
const RegistrationBackoffTime int64 = 15
type PushNotificationServer struct {
PublicKey *ecdsa.PublicKey `json:"-"`
Registered bool `json:"registered,omitempty"`
RegisteredAt int64 `json:"registeredAt,omitempty"`
AccessToken string `json:"accessToken,omitempty"`
PublicKey *ecdsa.PublicKey `json:"-"`
Registered bool `json:"registered,omitempty"`
RegisteredAt int64 `json:"registeredAt,omitempty"`
LastRetriedAt int64 `json:"lastRetriedAt,omitempty"`
RetryCount int64 `json:"retryCount,omitempty"`
AccessToken string `json:"accessToken,omitempty"`
}
func (s *PushNotificationServer) MarshalJSON() ([]byte, error) {
@ -68,8 +77,7 @@ type Config struct {
// AllowOnlyFromContacts indicates whether we should be receiving push notifications
// only from contacts
AllowOnlyFromContacts bool
// PushNotificationServers is an array of push notification servers we want to register with
PushNotificationServers []*PushNotificationServer
// InstallationID is the installation-id for this device
InstallationID string
@ -99,8 +107,10 @@ type Client struct {
reader io.Reader
//messageProcessor is a message processor used to send and being notified of messages
messageProcessor *common.MessageProcessor
// registrationLoopQuitChan is a channel to indicate to the registration loop that should be terminating
registrationLoopQuitChan chan struct{}
}
func New(persistence *Persistence, config *Config, processor *common.MessageProcessor) *Client {
@ -109,7 +119,8 @@ func New(persistence *Persistence, config *Config, processor *common.MessageProc
config: config,
messageProcessor: processor,
persistence: persistence,
reader: rand.Reader}
reader: rand.Reader,
}
}
func (c *Client) subscribeForSentMessages() {
@ -147,6 +158,19 @@ func (c *Client) loadLastPushNotificationRegistration() error {
return nil
}
func (c *Client) stopRegistrationLoop() {
// stop old registration loop
if c.registrationLoopQuitChan != nil {
close(c.registrationLoopQuitChan)
c.registrationLoopQuitChan = nil
}
}
func (c *Client) startRegistrationLoop() {
c.stopRegistrationLoop()
c.registrationLoopQuitChan = make(chan struct{})
go c.registrationLoop()
}
func (c *Client) Start() error {
if c.messageProcessor == nil {
@ -158,12 +182,16 @@ func (c *Client) Start() error {
return err
}
c.subscribeForSentMessages()
c.startRegistrationLoop()
return nil
}
func (c *Client) Stop() error {
close(c.quit)
if c.registrationLoopQuitChan != nil {
close(c.registrationLoopQuitChan)
}
return nil
}
@ -450,10 +478,21 @@ func (c *Client) shouldRefreshToken(oldContactIDs, newContactIDs []*ecdsa.Public
return false
}
func nextServerRetry(server *PushNotificationServer) int64 {
return server.LastRetriedAt + RegistrationBackoffTime*server.RetryCount*int64(math.Exp2(float64(server.RetryCount)))
}
// We calculate if it's too early to retry, by exponentially backing off
func shouldRetryRegisteringWithServer(server *PushNotificationServer) bool {
return time.Now().Unix() < nextServerRetry(server)
}
func (c *Client) registerWithServer(registration *protobuf.PushNotificationRegistration, server *PushNotificationServer) error {
// Reset server registration data
server.Registered = false
server.RegisteredAt = 0
server.RetryCount += 1
server.LastRetriedAt = time.Now().Unix()
server.AccessToken = registration.AccessToken
if err := c.persistence.UpsertServer(server); err != nil {
@ -489,10 +528,64 @@ func (c *Client) registerWithServer(registration *protobuf.PushNotificationRegis
return err
}
return nil
}
func (c *Client) registrationLoop() error {
for {
c.config.Logger.Info("runing registration loop")
servers, err := c.persistence.GetServers()
if err != nil {
c.config.Logger.Error("failed retrieving servers, quitting registration loop", zap.Error(err))
return err
}
if len(servers) == 0 {
c.config.Logger.Debug("nothing to do, quitting registration loop")
return nil
}
var nonRegisteredServers []*PushNotificationServer
for _, server := range servers {
if server.Registered {
nonRegisteredServers = append(nonRegisteredServers, server)
}
if len(nonRegisteredServers) == 0 {
c.config.Logger.Debug("registered with all servers, quitting registration loop")
return nil
}
var lowestNextRetry int64
for _, server := range nonRegisteredServers {
if shouldRetryRegisteringWithServer(server) {
err := c.registerWithServer(c.lastPushNotificationRegistration, server)
if err != nil {
return err
}
}
nextRetry := nextServerRetry(server)
if lowestNextRetry == 0 || nextRetry < lowestNextRetry {
lowestNextRetry = nextRetry
}
}
nextRetry := lowestNextRetry - time.Now().Unix()
waitFor := time.Duration(nextRetry)
select {
case <-time.After(waitFor * time.Second):
case <-c.registrationLoopQuitChan:
return nil
}
}
}
}
func (c *Client) Register(deviceToken string, contactIDs []*ecdsa.PublicKey, mutedChatIDs []string) ([]*PushNotificationServer, error) {
// stop registration loop
c.stopRegistrationLoop()
c.DeviceToken = deviceToken
servers, err := c.persistence.GetServers()
if err != nil {
@ -526,7 +619,9 @@ func (c *Client) Register(deviceToken string, contactIDs []*ecdsa.PublicKey, mut
case <-c.quit:
return servers, nil
case <-ctx.Done():
c.config.Logger.Info("Context done")
c.config.Logger.Info("could not register all servers")
// start registration loop
c.startRegistrationLoop()
return servers, nil
case <-time.After(200 * time.Millisecond):
servers, err = c.persistence.GetServersByPublicKey(serverPublicKeys)

View File

@ -1,7 +1,7 @@
// Code generated by go-bindata. DO NOT EDIT.
// sources:
// 1593601729_initial_schema.down.sql (144B)
// 1593601729_initial_schema.up.sql (1.496kB)
// 1593601729_initial_schema.up.sql (1.576kB)
// doc.go (382B)
package migrations
@ -91,7 +91,7 @@ func _1593601729_initial_schemaDownSql() (*asset, error) {
return a, nil
}
var __1593601729_initial_schemaUpSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xac\x94\xc1\x8e\x9b\x30\x10\x86\xef\x3c\xc5\x1c\x17\x89\x43\xef\x7b\x02\xd6\x54\x48\x96\xdd\x26\x46\xca\xcd\x72\x8d\x77\x63\x85\x9a\xd4\x76\xa2\xe6\xed\x2b\x20\x24\x24\x24\x50\x25\xb9\x20\x61\xff\x1e\xcf\x37\xbf\x67\xd2\x05\x8a\x19\x02\x16\x27\x18\x41\x9e\x01\xa1\x0c\xd0\x2a\x5f\xb2\x25\x6c\x77\x6e\xcd\x4d\xed\xf5\xa7\x96\xc2\xeb\xda\x70\x59\x69\x65\x3c\x77\xca\xee\x95\x75\xf0\x16\x00\x6c\x77\xbf\x2a\x2d\xf9\x46\x1d\x20\xc1\x34\x69\xcf\x93\x02\xe3\x28\x00\xb0\xea\x4b\x3b\xaf\xac\x2a\x21\xa1\x14\xa3\x98\xc0\x07\xca\xe2\x02\x33\xc8\x62\xbc\x44\x97\x1a\x2e\x3c\xe4\x84\x9d\x22\x9c\xb4\xdf\x1a\x9d\x90\x52\x39\xc7\x7d\xbd\x51\x06\x18\x5a\xb1\x66\xb1\x20\xf9\xcf\x02\xbd\x9d\x73\x08\x81\x12\x48\x29\xc9\x70\x9e\x32\x58\xa0\x1f\x38\x4e\x51\x10\xbe\x07\xc1\x23\x9c\x7f\x76\xca\x6a\x35\xcf\xd9\xe9\x46\x00\xfd\xd6\x81\xeb\x72\x7c\x68\x94\x7b\xd4\x6b\x5f\x0b\xa1\xcd\x67\x3d\x4b\xd0\x39\xca\xa7\x24\xda\x38\x2f\xaa\xaa\x8b\xad\xcb\xd6\x83\x0b\xc1\xc8\xa1\xab\xb7\xe0\xad\x56\xfb\xdb\x55\x1a\xd7\xe2\xfa\xba\x68\x9c\xe2\x6b\xcb\xe4\xad\x90\x1b\x55\xf2\xdf\xca\x39\xf1\x75\x34\xfd\xf8\x73\xd3\x3f\xb9\x16\xfe\x66\x1d\xfa\x48\xf7\x39\xcf\x61\x2f\x19\xf2\xef\x84\x2e\x50\x00\xf0\x28\x84\x6b\x3e\xc3\x8d\x79\x8c\xa7\x2c\x6f\xef\xfb\x1f\xce\x08\x26\xbc\x0d\x9f\x20\xee\xc6\x87\x1d\xc0\xf6\x23\xa5\x5b\x1b\x43\x01\xc8\xda\x78\x21\x1b\xf3\x5c\xbb\xdd\xad\xba\x83\xf1\x6b\xe5\xb5\x6c\x48\xef\xcf\xa1\x13\xdc\x50\x3f\xfb\x14\x73\xf2\x81\x56\xa0\xcb\xbf\x7c\xb2\x4f\x87\x0d\x48\xc9\x74\x4f\x4f\x75\x4b\xf8\x1e\xfc\x0b\x00\x00\xff\xff\x1c\x18\x75\x11\xd8\x05\x00\x00")
var __1593601729_initial_schemaUpSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xac\x94\xc1\x8e\x9b\x30\x10\x86\xef\x3c\xc5\x1c\x17\x89\x43\xef\x7b\x02\xd6\x54\x48\x96\xdd\x26\x46\xca\xcd\x72\x8d\x77\x63\x85\x9a\xad\x6d\x56\xe5\xed\x2b\x60\xc9\x92\x90\x40\x95\xe4\x82\x84\xfd\x7b\xec\x6f\xfe\x99\x49\x37\x28\x66\x08\x58\x9c\x60\x04\x79\x06\x84\x32\x40\xbb\x7c\xcb\xb6\xf0\xde\xb8\x3d\x37\xb5\xd7\xaf\x5a\x0a\xaf\x6b\xc3\x65\xa5\x95\xf1\xdc\x29\xfb\xa1\xac\x83\xa7\x00\xe0\xbd\xf9\x55\x69\xc9\x0f\xaa\x85\x04\xd3\xa4\x3f\x4f\x0a\x8c\xa3\x00\xc0\xaa\x37\xed\xbc\xb2\xaa\x84\x84\x52\x8c\x62\x02\x2f\x28\x8b\x0b\xcc\x20\x8b\xf1\x16\x9d\x6a\xb8\xf0\x90\x13\x76\x8c\x70\xd4\x7e\xeb\x74\x95\x70\x9e\x5b\xe5\xad\x5e\x53\x76\xa2\x96\xcb\xba\x31\x4b\x2a\x21\xa5\x72\x8e\xfb\xfa\xa0\x0c\x30\xb4\x63\xdd\x62\x41\xf2\x9f\x05\x7a\xfa\x62\x0a\x81\x12\x48\x29\xc9\x70\x9e\x32\xd8\xa0\x1f\x38\x4e\x51\x10\x3e\x07\xc1\x2d\x79\xfb\xd3\x28\xab\xd5\x7a\xde\x06\xdd\x0c\x73\xdc\x6a\xb9\x2e\xe7\x87\x66\x6f\x8f\x46\xed\x63\x21\xb4\x79\xad\x57\x09\x86\x0a\xe1\x4b\x12\x6d\x9c\x17\x55\x35\xc4\xd6\x65\xef\xc1\x89\x60\xe6\xd0\x59\x6d\x75\xa5\xf0\x71\x39\x4b\xf3\x5c\x9c\x5f\x17\xcd\x9f\xf8\xd8\x34\x79\x2b\xe4\x41\x95\xfc\xb7\x72\x4e\xbc\x7d\x9a\xfe\xf9\x73\xd1\x3f\xb9\x17\xfe\x62\x1e\xc6\x48\xd7\x39\xbf\xc2\x9e\x32\xe4\xdf\x09\xdd\xa0\x00\xe0\x56\x08\xd7\x7d\xa6\x1b\xeb\x18\x77\x59\xde\xdf\xf7\x3f\x9c\x11\x2c\x78\x1b\xde\x41\x3c\x8c\x23\x3b\x81\x1d\x47\xd4\xb0\x36\x87\x02\x90\xb5\xf1\x42\x76\xe6\xb9\x7e\x7b\x58\x75\xad\xf1\x7b\xe5\xb5\xec\x48\xaf\xcf\xa1\x23\xdc\x54\xbf\x5a\x8a\x39\x79\x41\x3b\xd0\xe5\x5f\xbe\xd8\xa7\xd3\x06\xa4\x64\xb9\xa7\x97\xba\x25\x7c\x0e\xfe\x05\x00\x00\xff\xff\x58\xe5\xb5\x4b\x28\x06\x00\x00")
func _1593601729_initial_schemaUpSqlBytes() ([]byte, error) {
return bindataRead(
@ -106,8 +106,8 @@ func _1593601729_initial_schemaUpSql() (*asset, error) {
return nil, err
}
info := bindataFileInfo{name: "1593601729_initial_schema.up.sql", size: 1496, mode: os.FileMode(0644), modTime: time.Unix(1594800672, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xe7, 0xb6, 0xaf, 0x11, 0x19, 0xf8, 0xc6, 0xed, 0x2f, 0xf5, 0x42, 0x54, 0xdd, 0x65, 0xf7, 0x39, 0xbc, 0x19, 0xff, 0x72, 0xa1, 0x38, 0x87, 0xfa, 0x6d, 0xd5, 0xe3, 0x6a, 0x49, 0x65, 0x3c, 0x49}}
info := bindataFileInfo{name: "1593601729_initial_schema.up.sql", size: 1576, mode: os.FileMode(0644), modTime: time.Unix(1594803635, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xf4, 0xe, 0xc, 0xdf, 0xb6, 0xdb, 0x22, 0x1, 0x1f, 0xe0, 0x18, 0x96, 0x1b, 0x13, 0x5f, 0x60, 0x2c, 0x41, 0xd5, 0x4, 0x77, 0x99, 0x8a, 0x4d, 0xa1, 0xd6, 0x41, 0xb5, 0x1, 0x86, 0x29, 0x1e}}
return a, nil
}

View File

@ -2,6 +2,8 @@ CREATE TABLE IF NOT EXISTS push_notification_client_servers (
public_key BLOB NOT NULL,
registered BOOLEAN DEFAULT FALSE,
registered_at INT NOT NULL DEFAULT 0,
last_retried_at INT NOT NULL DEFAULT 0,
retry_count INT NOT NULL DEFAULT 0,
access_token TEXT,
UNIQUE(public_key) ON CONFLICT REPLACE
);

View File

@ -261,13 +261,13 @@ func (p *Persistence) NotifiedOn(publicKey *ecdsa.PublicKey, installationID stri
}
func (p *Persistence) UpsertServer(server *PushNotificationServer) error {
_, err := p.db.Exec(`INSERT INTO push_notification_client_servers (public_key, registered, registered_at, access_token) VALUES (?,?,?,?)`, crypto.CompressPubkey(server.PublicKey), server.Registered, server.RegisteredAt, server.AccessToken)
_, err := p.db.Exec(`INSERT INTO push_notification_client_servers (public_key, registered, registered_at, access_token, last_retried_at, retry_count) VALUES (?,?,?,?,?,?)`, crypto.CompressPubkey(server.PublicKey), server.Registered, server.RegisteredAt, server.AccessToken, server.LastRetriedAt, server.RetryCount)
return err
}
func (p *Persistence) GetServers() ([]*PushNotificationServer, error) {
rows, err := p.db.Query(`SELECT public_key, registered, registered_at,access_token FROM push_notification_client_servers`)
rows, err := p.db.Query(`SELECT public_key, registered, registered_at,access_token,last_retried_at, retry_count FROM push_notification_client_servers`)
if err != nil {
return nil, err
}
@ -275,7 +275,7 @@ func (p *Persistence) GetServers() ([]*PushNotificationServer, error) {
for rows.Next() {
server := &PushNotificationServer{}
var key []byte
err := rows.Scan(&key, &server.Registered, &server.RegisteredAt, &server.AccessToken)
err := rows.Scan(&key, &server.Registered, &server.RegisteredAt, &server.AccessToken, &server.LastRetriedAt, &server.RetryCount)
if err != nil {
return nil, err
}