Make sure pn is resent and succesful in tests
This commit is contained in:
parent
59d1ee4bb9
commit
2be8dff54a
|
@ -236,11 +236,13 @@ func (c *Client) Stop() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) queryNotificationInfo(publicKey *ecdsa.PublicKey, force bool) error {
|
func (c *Client) queryNotificationInfo(publicKey *ecdsa.PublicKey, force bool) error {
|
||||||
|
c.config.Logger.Info("Getting queried at")
|
||||||
// Check if we queried recently
|
// Check if we queried recently
|
||||||
queriedAt, err := c.persistence.GetQueriedAt(publicKey)
|
queriedAt, err := c.persistence.GetQueriedAt(publicKey)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
c.config.Logger.Info("checking if querying necessary")
|
||||||
// Naively query again if too much time has passed.
|
// Naively query again if too much time has passed.
|
||||||
// Here it might not be necessary
|
// Here it might not be necessary
|
||||||
if force || time.Now().Unix()-queriedAt > staleQueryTimeInSeconds {
|
if force || time.Now().Unix()-queriedAt > staleQueryTimeInSeconds {
|
||||||
|
@ -353,6 +355,7 @@ func (c *Client) shouldNotifyOn(publicKey *ecdsa.PublicKey, installationID strin
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) notifiedOn(publicKey *ecdsa.PublicKey, installationID string, messageID []byte) error {
|
func (c *Client) notifiedOn(publicKey *ecdsa.PublicKey, installationID string, messageID []byte) error {
|
||||||
|
c.config.Logger.Info("notified on")
|
||||||
return c.persistence.UpsertSentNotification(&SentNotification{
|
return c.persistence.UpsertSentNotification(&SentNotification{
|
||||||
PublicKey: publicKey,
|
PublicKey: publicKey,
|
||||||
LastTriedAt: time.Now().Unix(),
|
LastTriedAt: time.Now().Unix(),
|
||||||
|
@ -501,7 +504,7 @@ func shouldRetryPushNotification(pn *SentNotification) bool {
|
||||||
if pn.RetryCount > maxPushNotificationRetries {
|
if pn.RetryCount > maxPushNotificationRetries {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
return time.Now().Unix() > nextPushNotificationRetry(pn)
|
return time.Now().Unix() >= nextPushNotificationRetry(pn)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) resetServers() error {
|
func (c *Client) resetServers() error {
|
||||||
|
@ -652,14 +655,18 @@ func (c *Client) resendNotification(pn *SentNotification) error {
|
||||||
c.config.Logger.Info("resending notification", zap.Any("notification", pn))
|
c.config.Logger.Info("resending notification", zap.Any("notification", pn))
|
||||||
pn.RetryCount += 1
|
pn.RetryCount += 1
|
||||||
pn.LastTriedAt = time.Now().Unix()
|
pn.LastTriedAt = time.Now().Unix()
|
||||||
|
c.config.Logger.Info("PN", zap.Any("pn", pn))
|
||||||
err := c.persistence.UpsertSentNotification(pn)
|
err := c.persistence.UpsertSentNotification(pn)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
c.config.Logger.Error("failed to upsert notification", zap.Error(err))
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
c.config.Logger.Info("GOING INTO querying")
|
||||||
// Re-fetch push notification info
|
// Re-fetch push notification info
|
||||||
err = c.queryNotificationInfo(pn.PublicKey, true)
|
err = c.queryNotificationInfo(pn.PublicKey, true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
c.config.Logger.Error("failed to query notification info", zap.Error(err))
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -699,6 +706,7 @@ func (c *Client) resendingLoop() error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
nextRetry := nextPushNotificationRetry(pn)
|
nextRetry := nextPushNotificationRetry(pn)
|
||||||
|
c.config.Logger.Info("Next next retry", zap.Int64("now", time.Now().Unix()), zap.Int64("next", nextRetry), zap.Any("pn", pn))
|
||||||
if lowestNextRetry == 0 || nextRetry < lowestNextRetry {
|
if lowestNextRetry == 0 || nextRetry < lowestNextRetry {
|
||||||
lowestNextRetry = nextRetry
|
lowestNextRetry = nextRetry
|
||||||
}
|
}
|
||||||
|
@ -1011,7 +1019,9 @@ func (c *Client) HandlePushNotificationQueryResponse(serverPublicKey *ecdsa.Publ
|
||||||
// HandlePushNotificationResponse should set the request as processed
|
// HandlePushNotificationResponse should set the request as processed
|
||||||
func (c *Client) HandlePushNotificationResponse(serverKey *ecdsa.PublicKey, response protobuf.PushNotificationResponse) error {
|
func (c *Client) HandlePushNotificationResponse(serverKey *ecdsa.PublicKey, response protobuf.PushNotificationResponse) error {
|
||||||
messageID := response.MessageId
|
messageID := response.MessageId
|
||||||
|
c.config.Logger.Info("received response for", zap.Binary("message-id", messageID))
|
||||||
for _, report := range response.Reports {
|
for _, report := range response.Reports {
|
||||||
|
c.config.Logger.Info("received response", zap.Any("report", report))
|
||||||
err := c.persistence.UpdateNotificationResponse(messageID, report)
|
err := c.persistence.UpdateNotificationResponse(messageID, report)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
// Code generated by go-bindata. DO NOT EDIT.
|
// Code generated by go-bindata. DO NOT EDIT.
|
||||||
// sources:
|
// sources:
|
||||||
// 1593601729_initial_schema.down.sql (144B)
|
// 1593601729_initial_schema.down.sql (144B)
|
||||||
// 1593601729_initial_schema.up.sql (1.753kB)
|
// 1593601729_initial_schema.up.sql (1.773kB)
|
||||||
// doc.go (382B)
|
// doc.go (382B)
|
||||||
|
|
||||||
package migrations
|
package migrations
|
||||||
|
@ -91,7 +91,7 @@ func _1593601729_initial_schemaDownSql() (*asset, error) {
|
||||||
return a, nil
|
return a, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
var __1593601729_initial_schemaUpSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xac\x54\xc1\x6e\xe3\x20\x10\xbd\xfb\x2b\xe6\xd8\x48\x39\xec\xbd\x27\x27\x25\x2b\x4b\x08\xef\x26\x44\xca\x0d\xb1\x78\x5a\xa3\x78\x71\x17\x70\xb5\xf9\xfb\x15\x76\xe2\x26\xc5\x8b\xab\xb6\x17\x4b\x1e\x1e\xa3\x79\x6f\x1e\x6f\xbd\x25\x39\x27\xc0\xf3\x15\x25\x50\x6c\x80\x95\x1c\xc8\xa1\xd8\xf1\x1d\x3c\x77\xae\x16\xa6\xf5\xfa\x51\x2b\xe9\x75\x6b\x84\x6a\x34\x1a\x2f\x1c\xda\x17\xb4\x0e\xee\x32\x80\xe7\xee\x57\xa3\x95\x38\xe2\x09\x56\xb4\x5c\xf5\xf7\xd9\x9e\xd2\x65\x06\x60\xf1\x49\x3b\x8f\x16\x2b\x58\x95\x25\x25\x39\x83\x07\xb2\xc9\xf7\x94\xc3\x26\xa7\x3b\x72\x8b\x11\xd2\x43\xc1\xf8\xd8\x61\xc4\x7e\x0b\xb8\x46\x3a\x2f\x2c\x7a\xab\xe7\x90\x01\x74\x12\xaa\xed\x4c\x0a\x25\x95\x42\xe7\x84\x6f\x8f\x68\x80\x93\x03\x0f\xc5\x3d\x2b\x7e\xee\xc9\xdd\x2b\xa7\x05\x94\x0c\xd6\x25\xdb\xd0\x62\xcd\x61\x4b\x7e\xd0\x7c\x4d\xb2\xc5\x7d\x96\x7d\x44\xb7\x3f\x1d\x5a\x8d\xf3\xba\x0d\xb8\x88\xe6\xe5\xe8\x24\x74\x15\x5f\x8a\x66\x5f\x5e\xb0\x5f\x4b\x42\x9b\xc7\x76\x96\xc1\xe0\x10\x91\x82\x68\xe3\xbc\x6c\x9a\xa1\xb7\xae\xfa\x1d\xdc\x00\xa2\x0d\xbd\xf1\x56\xb0\xc2\xcb\xb4\x4a\xc1\x9d\xba\x35\x51\x3d\xd6\xe8\xed\x18\xcb\x78\xf4\xaf\x95\xcf\x5b\xa9\x8e\x58\x89\xdf\xe8\x9c\x7c\x3a\x9b\xe1\xfc\x33\xb9\x57\x55\x4b\x3f\xa9\xcf\xa5\xd3\x04\xff\x33\xcf\xd7\xb6\xb7\x1c\x8a\xef\xac\xdc\x92\x0c\xe0\xa3\x24\x5c\xf8\x5c\x1f\xcc\xd3\x48\x59\xa1\x96\xae\xc6\xea\x73\x6e\xe9\xf3\x61\x32\x1d\xde\x9f\x09\xae\xeb\x2d\x37\x86\x55\x84\x1a\x53\x0b\xad\x6d\x6d\xa2\x53\xb4\x80\x25\x24\x4c\xb7\xf8\xc4\x2a\x86\xfc\xb4\x57\x5b\xb8\x64\xea\x50\x8b\xa5\x04\x50\xad\xf1\x52\x05\x57\xb9\xfe\x78\xa8\xba\x93\xf1\x35\x7a\xad\x82\xbe\xff\xa7\x36\x92\xbb\xc6\xcf\xbe\x91\x82\x3d\x90\x03\xe8\xea\xaf\x48\x06\xcb\xb5\x07\x4a\x96\x0e\xa1\xd4\x33\x5e\xdc\x67\xff\x02\x00\x00\xff\xff\x39\x9f\x6b\x23\xd9\x06\x00\x00")
|
var __1593601729_initial_schemaUpSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xac\x54\xc1\x8e\xdb\x20\x10\xbd\xfb\x2b\xe6\xb8\x91\x72\xe8\x7d\x4f\x4e\x96\x54\x96\x10\x6e\x13\x22\xe5\x86\x28\x9e\x5d\xa3\xb8\x78\x0b\x78\xd5\xfc\x7d\x85\x9d\x78\x93\xc5\xc5\x55\xba\x17\x4b\x1e\x1e\xa3\x79\x6f\x1e\x6f\xbd\x25\x39\x27\xc0\xf3\x15\x25\x50\x6c\x80\x95\x1c\xc8\xa1\xd8\xf1\x1d\xbc\x76\xae\x16\xa6\xf5\xfa\x59\x2b\xe9\x75\x6b\x84\x6a\x34\x1a\x2f\x1c\xda\x37\xb4\x0e\x1e\x32\x80\xd7\xee\x47\xa3\x95\x38\xe2\x09\x56\xb4\x5c\xf5\xf7\xd9\x9e\xd2\x65\x06\x60\xf1\x45\x3b\x8f\x16\x2b\x58\x95\x25\x25\x39\x83\x27\xb2\xc9\xf7\x94\xc3\x26\xa7\x3b\x72\x8b\x11\xd2\x43\xc1\xf8\xd8\x61\xc4\x7e\x09\xb8\x46\x3a\x2f\x2c\x7a\xab\xe7\x90\x01\x74\x12\xaa\xed\x4c\x0a\x25\x95\x42\xe7\x84\x6f\x8f\x68\x80\x93\x03\x0f\xc5\x3d\x2b\xbe\xef\xc9\xc3\x3b\xa7\x05\x94\x0c\xd6\x25\xdb\xd0\x62\xcd\x61\x4b\xbe\xd1\x7c\x4d\xb2\xc5\x63\x96\xdd\xa3\xdb\xaf\x0e\xad\xc6\x79\xdd\x06\x5c\x44\xf3\x72\x74\x12\xba\x8a\x2f\x45\xb3\x2f\x2f\xd8\xcf\x25\xa1\xcd\x73\x3b\xcb\x60\x70\x88\x48\x41\xb4\x71\x5e\x36\xcd\xd0\x5b\x57\xfd\x0e\x6e\x00\xd1\x86\x3e\x78\x2b\x58\xe1\x6d\x5a\xa5\xe0\x4e\xdd\x9a\xa8\x1e\x6b\xf4\x71\x8c\x65\x3c\xfa\xe7\xca\xe7\xad\x54\x47\xac\xc4\x4f\x74\x4e\xbe\x9c\xcd\x70\xfe\x99\xdc\xab\xaa\xa5\x9f\xd4\xe7\xd2\x69\x82\xff\x99\xe7\x7b\xdb\x5b\x0e\xc5\x57\x56\x6e\x49\x06\x70\x2f\x09\x17\x3e\xd7\x07\xf3\x34\x52\x56\xa8\xa5\xab\xb1\xfa\x3f\xb7\xf4\xf9\x30\x99\x0e\xff\x9e\x09\xae\xeb\x2d\x37\x86\x55\x84\x1a\x53\x0b\xad\x6d\x6d\xa2\x53\xb4\x80\x25\x24\x4c\x37\xed\xb0\xfb\xd7\x33\x64\xaa\xbd\xda\xcc\x25\x67\x87\x5a\x2c\x2f\x80\x6a\x8d\x97\x2a\x38\xcd\xf5\xc7\x43\xd5\x9d\x8c\xaf\xd1\x6b\x15\x34\xff\x3b\xdd\x91\xf0\x35\x7e\xf6\xdd\x14\xec\x89\x1c\x40\x57\xbf\x45\x32\x6c\xae\x7d\x51\xb2\x74\x30\xa5\x9e\xf6\xe2\x31\xfb\x13\x00\x00\xff\xff\xfb\x06\xc2\x3d\xed\x06\x00\x00")
|
||||||
|
|
||||||
func _1593601729_initial_schemaUpSqlBytes() ([]byte, error) {
|
func _1593601729_initial_schemaUpSqlBytes() ([]byte, error) {
|
||||||
return bindataRead(
|
return bindataRead(
|
||||||
|
@ -106,8 +106,8 @@ func _1593601729_initial_schemaUpSql() (*asset, error) {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
info := bindataFileInfo{name: "1593601729_initial_schema.up.sql", size: 1753, mode: os.FileMode(0644), modTime: time.Unix(1595240420, 0)}
|
info := bindataFileInfo{name: "1593601729_initial_schema.up.sql", size: 1773, mode: os.FileMode(0644), modTime: time.Unix(1595256491, 0)}
|
||||||
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x28, 0xbf, 0x64, 0xe0, 0x65, 0x53, 0xd3, 0x80, 0xf4, 0x46, 0xce, 0xd6, 0x23, 0x4e, 0xc5, 0x8f, 0x80, 0x4e, 0x91, 0xa7, 0x2e, 0x9, 0x3b, 0xf4, 0x5f, 0xa1, 0xff, 0xfc, 0x6e, 0x4, 0xa2, 0xe7}}
|
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x4e, 0x1e, 0x5, 0x35, 0x9, 0xb2, 0x2d, 0x6f, 0x33, 0x63, 0xa2, 0x7a, 0x5b, 0xd2, 0x2d, 0xcb, 0x79, 0x7e, 0x6, 0xb4, 0x9d, 0x35, 0xd8, 0x9b, 0x55, 0xe5, 0xf8, 0x44, 0xca, 0xa6, 0xf3, 0xd3}}
|
||||||
return a, nil
|
return a, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -41,7 +41,7 @@ CREATE TABLE IF NOT EXISTS push_notification_client_sent_notifications (
|
||||||
retry_count INT NOT NULL DEFAULT 0,
|
retry_count INT NOT NULL DEFAULT 0,
|
||||||
success BOOLEAN NOT NULL DEFAULT FALSE,
|
success BOOLEAN NOT NULL DEFAULT FALSE,
|
||||||
error INT NOT NULL DEFAULT 0,
|
error INT NOT NULL DEFAULT 0,
|
||||||
UNIQUE(message_id, public_key, installation_id)
|
UNIQUE(message_id, public_key, installation_id) ON CONFLICT REPLACE
|
||||||
);
|
);
|
||||||
|
|
||||||
CREATE TABLE IF NOT EXISTS push_notification_client_registrations (
|
CREATE TABLE IF NOT EXISTS push_notification_client_registrations (
|
||||||
|
|
|
@ -273,6 +273,7 @@ func (s *MessengerPushNotificationSuite) TestReceivePushNotification() {
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
s.Require().NoError(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *MessengerPushNotificationSuite) TestReceivePushNotificationFromContactOnly() {
|
func (s *MessengerPushNotificationSuite) TestReceivePushNotificationFromContactOnly() {
|
||||||
|
@ -400,6 +401,7 @@ func (s *MessengerPushNotificationSuite) TestReceivePushNotificationFromContactO
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
|
||||||
|
s.Require().NoError(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *MessengerPushNotificationSuite) TestReceivePushNotificationRetries() {
|
func (s *MessengerPushNotificationSuite) TestReceivePushNotificationRetries() {
|
||||||
|
@ -476,13 +478,39 @@ func (s *MessengerPushNotificationSuite) TestReceivePushNotificationRetries() {
|
||||||
chat := CreateOneToOneChat(pkString, &s.m.identity.PublicKey, alice.transport)
|
chat := CreateOneToOneChat(pkString, &s.m.identity.PublicKey, alice.transport)
|
||||||
s.Require().NoError(alice.SaveChat(&chat))
|
s.Require().NoError(alice.SaveChat(&chat))
|
||||||
inputMessage := buildTestMessage(chat)
|
inputMessage := buildTestMessage(chat)
|
||||||
response, err := alice.SendChatMessage(context.Background(), inputMessage)
|
_, err = alice.SendChatMessage(context.Background(), inputMessage)
|
||||||
s.Require().NoError(err)
|
s.Require().NoError(err)
|
||||||
|
|
||||||
messageIDString := response.Messages[0].ID
|
// We check that alice retrieves the info from the server
|
||||||
messageID, err := hex.DecodeString(messageIDString[2:])
|
var info []*push_notification_client.PushNotificationInfo
|
||||||
|
err = tt.RetryWithBackOff(func() error {
|
||||||
|
_, err = server.RetrieveAll()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
_, err = alice.RetrieveAll()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
info, err = alice.pushNotificationClient.GetPushNotificationInfo(&bob.identity.PublicKey, bobInstallationIDs)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
// Check we have replies for bob
|
||||||
|
if len(info) != 1 {
|
||||||
|
return errors.New("info not fetched")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
|
||||||
|
})
|
||||||
s.Require().NoError(err)
|
s.Require().NoError(err)
|
||||||
|
|
||||||
|
s.Require().NotNil(info)
|
||||||
|
s.Require().Equal(bob.installationID, info[0].InstallationID)
|
||||||
|
s.Require().Equal(bobServers[0].AccessToken, info[0].AccessToken)
|
||||||
|
s.Require().Equal(&bob.identity.PublicKey, info[0].PublicKey)
|
||||||
|
|
||||||
// The message has been sent, but not received, now we remove a contact so that the token is invalidated
|
// The message has been sent, but not received, now we remove a contact so that the token is invalidated
|
||||||
frankContact = &Contact{
|
frankContact = &Contact{
|
||||||
ID: types.EncodeHex(crypto.FromECDSAPub(&frank.identity.PublicKey)),
|
ID: types.EncodeHex(crypto.FromECDSAPub(&frank.identity.PublicKey)),
|
||||||
|
@ -518,7 +546,14 @@ func (s *MessengerPushNotificationSuite) TestReceivePushNotificationRetries() {
|
||||||
// Make sure access token is not the same
|
// Make sure access token is not the same
|
||||||
s.Require().NotEqual(newBobServers[0].AccessToken, bobServers[0].AccessToken)
|
s.Require().NotEqual(newBobServers[0].AccessToken, bobServers[0].AccessToken)
|
||||||
|
|
||||||
var info []*push_notification_client.PushNotificationInfo
|
// Send another message, here the token will not be valid
|
||||||
|
inputMessage = buildTestMessage(chat)
|
||||||
|
response, err := alice.SendChatMessage(context.Background(), inputMessage)
|
||||||
|
s.Require().NoError(err)
|
||||||
|
messageIDString := response.Messages[0].ID
|
||||||
|
messageID, err := hex.DecodeString(messageIDString[2:])
|
||||||
|
s.Require().NoError(err)
|
||||||
|
|
||||||
err = tt.RetryWithBackOff(func() error {
|
err = tt.RetryWithBackOff(func() error {
|
||||||
_, err = server.RetrieveAll()
|
_, err = server.RetrieveAll()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -537,6 +572,9 @@ func (s *MessengerPushNotificationSuite) TestReceivePushNotificationRetries() {
|
||||||
if len(info) != 1 {
|
if len(info) != 1 {
|
||||||
return errors.New("info not fetched")
|
return errors.New("info not fetched")
|
||||||
}
|
}
|
||||||
|
if newBobServers[0].AccessToken != info[0].AccessToken {
|
||||||
|
return errors.New("still using the old access token")
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
|
|
||||||
})
|
})
|
||||||
|
@ -574,4 +612,6 @@ func (s *MessengerPushNotificationSuite) TestReceivePushNotificationRetries() {
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
|
||||||
|
s.Require().NoError(err)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue