From 20fb8607cbe11d54dc75dce5194c1696a233cc16 Mon Sep 17 00:00:00 2001 From: Andrea Maria Piana Date: Fri, 10 Jul 2020 15:26:06 +0200 Subject: [PATCH] keep track of queries & actually send pns, hurray --- protocol/common/message_processor.go | 1 + protocol/messenger.go | 17 +- protocol/protobuf/push_notifications.pb.go | 130 ++++----- protocol/protobuf/push_notifications.proto | 6 +- .../1593601729_initial_schema.up.sql | 16 -- .../1593601729_initial_schema.down.sql | 0 .../sql/1593601729_initial_schema.up.sql | 40 +++ .../migrations/{ => sql}/doc.go | 2 +- .../push_notification_client/persistence.go | 122 +++++++- .../persistence_test.go | 66 +++++ .../push_notification.go | 262 +++++++++++++----- .../push_notification_test.go | 119 ++++++-- .../1593601728_initial_schema.down.sql | 0 .../1593601728_initial_schema.up.sql | 0 .../migrations/{ => sql}/doc.go | 2 +- .../push_notification_server/persistence.go | 22 ++ protocol/push_notification_server/server.go | 15 + protocol/push_notification_test.go | 43 +-- 18 files changed, 655 insertions(+), 208 deletions(-) delete mode 100644 protocol/push_notification_client/migrations/1593601729_initial_schema.up.sql rename protocol/push_notification_client/migrations/{ => sql}/1593601729_initial_schema.down.sql (100%) create mode 100644 protocol/push_notification_client/migrations/sql/1593601729_initial_schema.up.sql rename protocol/push_notification_client/migrations/{ => sql}/doc.go (83%) rename protocol/push_notification_server/migrations/{ => sql}/1593601728_initial_schema.down.sql (100%) rename protocol/push_notification_server/migrations/{ => sql}/1593601728_initial_schema.up.sql (100%) rename protocol/push_notification_server/migrations/{ => sql}/doc.go (83%) diff --git a/protocol/common/message_processor.go b/protocol/common/message_processor.go index eaa6246e8..495e6f715 100644 --- a/protocol/common/message_processor.go +++ b/protocol/common/message_processor.go @@ -388,6 +388,7 @@ func (p *MessageProcessor) addToDataSync(publicKey *ecdsa.PublicKey, message []b // sendDataSync sends a message scheduled by the data sync layer. // Data Sync layer calls this method "dispatch" function. func (p *MessageProcessor) sendDataSync(ctx context.Context, publicKey *ecdsa.PublicKey, encodedMessage []byte, payload *datasyncproto.Payload) error { + // Calculate the messageIDs messageIDs := make([][]byte, 0, len(payload.Messages)) for _, payload := range payload.Messages { messageIDs = append(messageIDs, v1protocol.MessageID(&p.identity.PublicKey, payload.Body)) diff --git a/protocol/messenger.go b/protocol/messenger.go index 9778bd8f4..5a8378f1a 100644 --- a/protocol/messenger.go +++ b/protocol/messenger.go @@ -287,6 +287,7 @@ func NewMessenger( verifyTransactionClient: c.verifyTransactionClient, shutdownTasks: []func() error{ database.Close, + pushNotificationClient.Stop, transp.ResetFilters, transp.Stop, func() error { processor.Stop(); return nil }, @@ -303,6 +304,18 @@ func NewMessenger( } func (m *Messenger) Start() error { + // Start push notification server + if m.pushNotificationServer != nil { + if err := m.pushNotificationServer.Start(); err != nil { + return err + } + } + if m.pushNotificationClient != nil { + if err := m.pushNotificationClient.Start(); err != nil { + return err + } + } + return m.encryptor.Start(m.identity) } @@ -1394,7 +1407,7 @@ func (m *Messenger) SendChatMessage(ctx context.Context, message *Message) (*Mes // If the chat is not public, we instruct the pushNotificationService to send a notification if !chat.Public() && m.pushNotificationClient != nil { - if err := m.pushNotificationClient.NotifyOnMessageID(id); err != nil { + if err := m.pushNotificationClient.NotifyOnMessageID(chat.ID, id); err != nil { return nil, err } @@ -1963,7 +1976,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte } logger.Debug("Handling PushNotificationQueryResponse") // TODO: Compare DST with Identity - if err := m.pushNotificationClient.HandlePushNotificationQueryResponse(msg.ParsedMessage.(protobuf.PushNotificationQueryResponse)); err != nil { + if err := m.pushNotificationClient.HandlePushNotificationQueryResponse(publicKey, msg.ParsedMessage.(protobuf.PushNotificationQueryResponse)); err != nil { logger.Warn("failed to handle PushNotificationQueryResponse", zap.Error(err)) } // We continue in any case, no changes to messenger diff --git a/protocol/protobuf/push_notifications.pb.go b/protocol/protobuf/push_notifications.pb.go index 12dc55a40..c59a9016a 100644 --- a/protocol/protobuf/push_notifications.pb.go +++ b/protocol/protobuf/push_notifications.pb.go @@ -595,8 +595,7 @@ func (m *PushNotification) GetMessage() []byte { type PushNotificationRequest struct { Requests []*PushNotification `protobuf:"bytes,1,rep,name=requests,proto3" json:"requests,omitempty"` - MessageId string `protobuf:"bytes,2,opt,name=message_id,json=messageId,proto3" json:"message_id,omitempty"` - AckRequired string `protobuf:"bytes,3,opt,name=ack_required,json=ackRequired,proto3" json:"ack_required,omitempty"` + MessageId []byte `protobuf:"bytes,2,opt,name=message_id,json=messageId,proto3" json:"message_id,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -634,18 +633,11 @@ func (m *PushNotificationRequest) GetRequests() []*PushNotification { return nil } -func (m *PushNotificationRequest) GetMessageId() string { +func (m *PushNotificationRequest) GetMessageId() []byte { if m != nil { return m.MessageId } - return "" -} - -func (m *PushNotificationRequest) GetAckRequired() string { - if m != nil { - return m.AckRequired - } - return "" + return nil } type PushNotificationReport struct { @@ -711,9 +703,8 @@ func (m *PushNotificationReport) GetInstallationId() string { return "" } -// TOP LEVEL type PushNotificationResponse struct { - MessageId string `protobuf:"bytes,1,opt,name=message_id,json=messageId,proto3" json:"message_id,omitempty"` + MessageId []byte `protobuf:"bytes,1,opt,name=message_id,json=messageId,proto3" json:"message_id,omitempty"` Reports []*PushNotificationReport `protobuf:"bytes,2,rep,name=reports,proto3" json:"reports,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` @@ -745,11 +736,11 @@ func (m *PushNotificationResponse) XXX_DiscardUnknown() { var xxx_messageInfo_PushNotificationResponse proto.InternalMessageInfo -func (m *PushNotificationResponse) GetMessageId() string { +func (m *PushNotificationResponse) GetMessageId() []byte { if m != nil { return m.MessageId } - return "" + return nil } func (m *PushNotificationResponse) GetReports() []*PushNotificationReport { @@ -779,59 +770,58 @@ func init() { func init() { proto.RegisterFile("push_notifications.proto", fileDescriptor_200acd86044eaa5d) } var fileDescriptor_200acd86044eaa5d = []byte{ - // 857 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x95, 0x51, 0x6f, 0xe3, 0x44, - 0x10, 0xc7, 0x59, 0x27, 0x6d, 0xe2, 0x49, 0x68, 0x73, 0xab, 0x5e, 0xcf, 0x9c, 0x38, 0xc8, 0x19, - 0x24, 0xa2, 0x43, 0x8a, 0x50, 0x91, 0xe0, 0xc4, 0x13, 0xa1, 0x75, 0x8b, 0xd5, 0xc6, 0x0e, 0x1b, - 0x97, 0x13, 0x12, 0x92, 0xe5, 0xc4, 0xdb, 0x8b, 0x95, 0x9c, 0x1d, 0x76, 0xd7, 0x87, 0xf2, 0x80, - 0xc4, 0x27, 0x40, 0x42, 0xbc, 0xf1, 0x29, 0xd0, 0x7d, 0x07, 0xbe, 0x17, 0xf2, 0xda, 0x0e, 0x8e, - 0xe3, 0x4b, 0xf3, 0x70, 0x4f, 0xf1, 0xfc, 0x77, 0x66, 0x77, 0xe7, 0x37, 0x3b, 0x13, 0xd0, 0x96, - 0x31, 0x9f, 0xb9, 0x61, 0x24, 0x82, 0xbb, 0x60, 0xea, 0x89, 0x20, 0x0a, 0x79, 0x7f, 0xc9, 0x22, - 0x11, 0xe1, 0xa6, 0xfc, 0x99, 0xc4, 0x77, 0xfa, 0xbf, 0x35, 0xf8, 0x70, 0x14, 0xf3, 0x99, 0x55, - 0xf0, 0x22, 0xf4, 0x65, 0xc0, 0x05, 0x93, 0xdf, 0xd8, 0x06, 0x10, 0xd1, 0x9c, 0x86, 0xae, 0x58, - 0x2d, 0xa9, 0x86, 0xba, 0xa8, 0x77, 0x74, 0xf6, 0x45, 0x3f, 0x8f, 0xef, 0xef, 0x8a, 0xed, 0x3b, - 0x49, 0xa0, 0xb3, 0x5a, 0x52, 0xa2, 0x8a, 0xfc, 0x13, 0x9f, 0xc0, 0x81, 0x34, 0x34, 0xa5, 0x8b, - 0x7a, 0x2a, 0x49, 0x0d, 0xfc, 0x19, 0x1c, 0x07, 0x21, 0x17, 0xde, 0x62, 0x21, 0x43, 0xdd, 0xc0, - 0xd7, 0x6a, 0x72, 0xfd, 0xa8, 0x28, 0x9b, 0x3e, 0x7e, 0x0a, 0x6d, 0x6f, 0x3a, 0xa5, 0x9c, 0xbb, - 0xe9, 0x2e, 0x75, 0xe9, 0xd5, 0x4a, 0x35, 0x79, 0x20, 0xd6, 0xa0, 0x41, 0x43, 0x6f, 0xb2, 0xa0, - 0xbe, 0x76, 0xd0, 0x45, 0xbd, 0x26, 0xc9, 0xcd, 0x64, 0xe5, 0x35, 0x65, 0x3c, 0x88, 0x42, 0xed, - 0xb0, 0x8b, 0x7a, 0x75, 0x92, 0x9b, 0xf8, 0x19, 0x3c, 0xf0, 0x16, 0x8b, 0xe8, 0x57, 0xea, 0xbb, - 0x31, 0xa7, 0xcc, 0x5d, 0x04, 0x5c, 0x68, 0x8d, 0x6e, 0xad, 0xd7, 0x26, 0xc7, 0xd9, 0xc2, 0x2d, - 0xa7, 0xec, 0x26, 0xe0, 0x22, 0xf1, 0x9d, 0x2c, 0xa2, 0xe9, 0x9c, 0xfa, 0xee, 0x74, 0xe6, 0x89, - 0xd4, 0xb7, 0x99, 0xfa, 0x66, 0x0b, 0xe7, 0x33, 0x4f, 0x48, 0xdf, 0x8f, 0x00, 0xe2, 0x90, 0x49, - 0x28, 0x94, 0x69, 0xaa, 0xbc, 0x4e, 0x41, 0xd1, 0x2f, 0x41, 0x5d, 0x53, 0xc2, 0xa7, 0x80, 0x6f, - 0xad, 0x6b, 0xcb, 0x7e, 0x61, 0xb9, 0x8e, 0x7d, 0x6d, 0x58, 0xae, 0xf3, 0xd3, 0xc8, 0xe8, 0xbc, - 0x87, 0xdf, 0x07, 0x75, 0x30, 0xca, 0xb4, 0x0e, 0xc2, 0x18, 0x8e, 0x2e, 0x4d, 0x62, 0x7c, 0x37, - 0x18, 0x1b, 0x99, 0xa6, 0xe8, 0x6f, 0x14, 0xf8, 0x74, 0x57, 0x2d, 0x08, 0xe5, 0xcb, 0x28, 0xe4, - 0x34, 0x41, 0xc0, 0x63, 0x09, 0x4b, 0x16, 0xb3, 0x49, 0x72, 0x13, 0x5b, 0x70, 0x40, 0x19, 0x8b, - 0x98, 0x2c, 0xcc, 0xd1, 0xd9, 0xf3, 0xfd, 0x8a, 0x9c, 0x6f, 0xdc, 0x37, 0x92, 0x58, 0x59, 0xec, - 0x74, 0x1b, 0xfc, 0x04, 0x80, 0xd1, 0x5f, 0x62, 0xca, 0x45, 0x5e, 0xcd, 0x36, 0x51, 0x33, 0xc5, - 0xf4, 0xf5, 0xdf, 0x11, 0xa8, 0xeb, 0x98, 0x62, 0xea, 0x06, 0x21, 0x36, 0xc9, 0x53, 0x7f, 0x08, - 0x0f, 0x86, 0x83, 0x9b, 0x4b, 0x9b, 0x0c, 0x8d, 0x0b, 0x77, 0x68, 0x8c, 0xc7, 0x83, 0x2b, 0xa3, - 0x83, 0xf0, 0x09, 0x74, 0x7e, 0x34, 0xc8, 0xd8, 0xb4, 0x2d, 0x77, 0x68, 0x8e, 0x87, 0x03, 0xe7, - 0xfc, 0xfb, 0x8e, 0x82, 0x1f, 0xc3, 0xe9, 0xad, 0x35, 0xbe, 0x1d, 0x8d, 0x6c, 0xe2, 0x18, 0x17, - 0x45, 0x86, 0xb5, 0x04, 0x9a, 0x69, 0x39, 0x06, 0xb1, 0x06, 0x37, 0xe9, 0x09, 0x9d, 0xba, 0xfe, - 0x07, 0x82, 0xa7, 0xe5, 0xdc, 0x06, 0xfe, 0x6b, 0xca, 0x44, 0xc0, 0xe9, 0x2b, 0x1a, 0x0a, 0x33, - 0xbc, 0x8b, 0x92, 0x3c, 0x96, 0xf1, 0x64, 0x11, 0x4c, 0xdd, 0x39, 0x5d, 0x49, 0x68, 0x6d, 0xa2, - 0xa6, 0xca, 0x35, 0x5d, 0x6d, 0x3d, 0x48, 0x65, 0xfb, 0x41, 0xee, 0xfb, 0xb8, 0xf5, 0xdf, 0x40, - 0x3b, 0x8f, 0x42, 0xe1, 0x4d, 0xc5, 0x79, 0xe4, 0xd3, 0x8d, 0xab, 0x60, 0x0f, 0x4e, 0xb7, 0xfa, - 0xd9, 0x0d, 0xc2, 0xbb, 0x48, 0x43, 0xdd, 0x5a, 0xaf, 0x75, 0xf6, 0xf9, 0xdb, 0xeb, 0xb5, 0x95, - 0x13, 0x39, 0x59, 0x96, 0x5c, 0x12, 0x55, 0x7f, 0x0e, 0x0f, 0xcb, 0xa1, 0x3f, 0xc4, 0x94, 0xad, - 0xf0, 0xc7, 0xd0, 0xfa, 0x1f, 0x01, 0x97, 0x07, 0xb6, 0x09, 0xac, 0x19, 0x70, 0xfd, 0x0d, 0x82, - 0x0f, 0x2a, 0x43, 0x25, 0xc1, 0x32, 0x22, 0xb4, 0x17, 0x22, 0xa5, 0xb2, 0xff, 0x37, 0xab, 0x51, - 0x2b, 0x57, 0xa3, 0xb2, 0x8f, 0xeb, 0x95, 0x7d, 0xac, 0xff, 0x89, 0xe0, 0x49, 0xe5, 0xa5, 0xd7, - 0xcd, 0xf2, 0x35, 0xd4, 0x0b, 0x84, 0x3f, 0x79, 0x3b, 0xe1, 0x75, 0xae, 0x44, 0x06, 0x24, 0xb7, - 0x7c, 0x45, 0x39, 0xf7, 0x5e, 0xd2, 0x3c, 0x93, 0x36, 0x51, 0x33, 0xc5, 0xf4, 0x8b, 0x4d, 0x58, - 0xdb, 0x68, 0x42, 0xfd, 0x1f, 0x04, 0x9d, 0xf2, 0xe6, 0xfb, 0xf0, 0x7b, 0x04, 0x0d, 0x39, 0x8b, - 0xd6, 0xdc, 0x0e, 0x13, 0xf3, 0x7e, 0x5e, 0x15, 0xdc, 0xeb, 0x95, 0xdc, 0x35, 0x68, 0x64, 0xf7, - 0x97, 0x43, 0xb5, 0x4d, 0x72, 0x53, 0xff, 0x0b, 0xc1, 0xa3, 0xed, 0x09, 0x21, 0xdb, 0x1c, 0x7f, - 0x05, 0xcd, 0xac, 0xe3, 0x79, 0x06, 0xf1, 0xf1, 0x8e, 0xb1, 0xb2, 0xf6, 0xad, 0xe0, 0xa7, 0x16, - 0xf9, 0x49, 0x20, 0x73, 0x37, 0x71, 0x0f, 0x18, 0xcd, 0xbb, 0xa9, 0xe5, 0x4d, 0xe7, 0x24, 0x93, - 0xf4, 0xbf, 0x15, 0x38, 0xdd, 0xbe, 0xd5, 0x32, 0x62, 0x62, 0xc7, 0x08, 0xfc, 0x76, 0x73, 0x04, - 0x3e, 0xdb, 0x35, 0x02, 0x93, 0xad, 0x2a, 0x87, 0xde, 0xbb, 0xc0, 0xad, 0xff, 0xbc, 0xcf, 0x70, - 0x3c, 0x86, 0xd6, 0x0b, 0x62, 0x5b, 0x57, 0xc5, 0x7f, 0x86, 0xd2, 0x90, 0x53, 0x12, 0xcd, 0xb2, - 0x1d, 0x97, 0x18, 0x57, 0xe6, 0xd8, 0x31, 0x88, 0x71, 0xd1, 0xa9, 0xe9, 0x31, 0x68, 0xdb, 0x09, - 0x65, 0x6f, 0x7e, 0x13, 0x3d, 0x2a, 0xa3, 0xff, 0x06, 0x1a, 0x4c, 0xe6, 0xce, 0x35, 0x45, 0x16, - 0xb4, 0x7b, 0x1f, 0x24, 0x92, 0x07, 0x4c, 0x0e, 0xa5, 0xe7, 0x97, 0xff, 0x05, 0x00, 0x00, 0xff, - 0xff, 0x8b, 0x53, 0x27, 0x32, 0x99, 0x08, 0x00, 0x00, + // 836 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x94, 0x51, 0x6f, 0xe3, 0x44, + 0x10, 0xc7, 0xb1, 0x9d, 0x36, 0xf1, 0x34, 0xb4, 0xbe, 0x55, 0xaf, 0x67, 0x4e, 0x1c, 0xe4, 0x0c, + 0x12, 0xd1, 0x21, 0x45, 0xa8, 0x48, 0x70, 0xe2, 0x89, 0xd0, 0xba, 0xc5, 0x6a, 0x63, 0x87, 0x8d, + 0xc3, 0x09, 0x09, 0xc9, 0x72, 0xe2, 0xcd, 0xc5, 0xaa, 0xcf, 0x36, 0xde, 0xf5, 0xa1, 0x3c, 0x20, + 0xf1, 0x09, 0x90, 0x78, 0xe5, 0x53, 0xa0, 0xfb, 0x0e, 0x7c, 0x2f, 0xe4, 0xb5, 0x1d, 0x1c, 0xc7, + 0x4d, 0xf3, 0xc0, 0x93, 0x3d, 0xb3, 0x33, 0xbb, 0x3b, 0xbf, 0xd9, 0xf9, 0x83, 0x1a, 0xa7, 0x74, + 0xe9, 0x84, 0x11, 0xf3, 0x17, 0xfe, 0xdc, 0x65, 0x7e, 0x14, 0xd2, 0x41, 0x9c, 0x44, 0x2c, 0x42, + 0x1d, 0xfe, 0x99, 0xa5, 0x0b, 0xed, 0x1f, 0x09, 0x3e, 0x1c, 0xa7, 0x74, 0x69, 0x56, 0xa2, 0x30, + 0x79, 0xed, 0x53, 0x96, 0xf0, 0x7f, 0x64, 0x01, 0xb0, 0xe8, 0x8e, 0x84, 0x0e, 0x5b, 0xc5, 0x44, + 0x15, 0x7a, 0x42, 0xff, 0xf8, 0xfc, 0x8b, 0x41, 0x99, 0x3f, 0xd8, 0x95, 0x3b, 0xb0, 0xb3, 0x44, + 0x7b, 0x15, 0x13, 0x2c, 0xb3, 0xf2, 0x17, 0x9d, 0xc2, 0x01, 0x37, 0x54, 0xb1, 0x27, 0xf4, 0x65, + 0x9c, 0x1b, 0xe8, 0x33, 0x38, 0xf1, 0x43, 0xca, 0xdc, 0x20, 0xe0, 0xa9, 0x8e, 0xef, 0xa9, 0x12, + 0x5f, 0x3f, 0xae, 0xba, 0x0d, 0x0f, 0x3d, 0x87, 0xae, 0x3b, 0x9f, 0x13, 0x4a, 0x9d, 0x7c, 0x97, + 0x16, 0x8f, 0x3a, 0xca, 0x7d, 0xfc, 0x40, 0xa4, 0x42, 0x9b, 0x84, 0xee, 0x2c, 0x20, 0x9e, 0x7a, + 0xd0, 0x13, 0xfa, 0x1d, 0x5c, 0x9a, 0xd9, 0xca, 0x5b, 0x92, 0x50, 0x3f, 0x0a, 0xd5, 0xc3, 0x9e, + 0xd0, 0x6f, 0xe1, 0xd2, 0x44, 0x2f, 0xe0, 0x91, 0x1b, 0x04, 0xd1, 0xaf, 0xc4, 0x73, 0x52, 0x4a, + 0x12, 0x27, 0xf0, 0x29, 0x53, 0xdb, 0x3d, 0xa9, 0xdf, 0xc5, 0x27, 0xc5, 0xc2, 0x94, 0x92, 0xe4, + 0xd6, 0xa7, 0x2c, 0x8b, 0x9d, 0x05, 0xd1, 0xfc, 0x8e, 0x78, 0xce, 0x7c, 0xe9, 0xb2, 0x3c, 0xb6, + 0x93, 0xc7, 0x16, 0x0b, 0x17, 0x4b, 0x97, 0xf1, 0xd8, 0x8f, 0x00, 0xd2, 0x30, 0xe1, 0x50, 0x48, + 0xa2, 0xca, 0xfc, 0x3a, 0x15, 0x8f, 0x76, 0x05, 0xf2, 0x9a, 0x12, 0x3a, 0x03, 0x34, 0x35, 0x6f, + 0x4c, 0xeb, 0x95, 0xe9, 0xd8, 0xd6, 0x8d, 0x6e, 0x3a, 0xf6, 0x4f, 0x63, 0x5d, 0x79, 0x0f, 0xbd, + 0x0f, 0xf2, 0x70, 0x5c, 0xf8, 0x14, 0x01, 0x21, 0x38, 0xbe, 0x32, 0xb0, 0xfe, 0xdd, 0x70, 0xa2, + 0x17, 0x3e, 0x51, 0x7b, 0x27, 0xc2, 0xa7, 0xbb, 0x7a, 0x81, 0x09, 0x8d, 0xa3, 0x90, 0x92, 0x0c, + 0x01, 0x4d, 0x39, 0x2c, 0xde, 0xcc, 0x0e, 0x2e, 0x4d, 0x64, 0xc2, 0x01, 0x49, 0x92, 0x28, 0xe1, + 0x8d, 0x39, 0x3e, 0x7f, 0xb9, 0x5f, 0x93, 0xcb, 0x8d, 0x07, 0x7a, 0x96, 0xcb, 0x9b, 0x9d, 0x6f, + 0x83, 0x9e, 0x01, 0x24, 0xe4, 0x97, 0x94, 0x50, 0x56, 0x76, 0xb3, 0x8b, 0xe5, 0xc2, 0x63, 0x78, + 0xda, 0xef, 0x02, 0xc8, 0xeb, 0x9c, 0x6a, 0xe9, 0x3a, 0xc6, 0x16, 0x2e, 0x4b, 0x7f, 0x0c, 0x8f, + 0x46, 0xc3, 0xdb, 0x2b, 0x0b, 0x8f, 0xf4, 0x4b, 0x67, 0xa4, 0x4f, 0x26, 0xc3, 0x6b, 0x5d, 0x11, + 0xd0, 0x29, 0x28, 0x3f, 0xea, 0x78, 0x62, 0x58, 0xa6, 0x33, 0x32, 0x26, 0xa3, 0xa1, 0x7d, 0xf1, + 0xbd, 0x22, 0xa2, 0xa7, 0x70, 0x36, 0x35, 0x27, 0xd3, 0xf1, 0xd8, 0xc2, 0xb6, 0x7e, 0x59, 0x65, + 0x28, 0x65, 0xd0, 0x0c, 0xd3, 0xd6, 0xb1, 0x39, 0xbc, 0xcd, 0x4f, 0x50, 0x5a, 0xda, 0x1f, 0x02, + 0x3c, 0xaf, 0xd7, 0x36, 0xf4, 0xde, 0x92, 0x84, 0xf9, 0x94, 0xbc, 0x21, 0x21, 0x33, 0xc2, 0x45, + 0x94, 0xd5, 0x11, 0xa7, 0xb3, 0xc0, 0x9f, 0x3b, 0x77, 0x64, 0xc5, 0xa1, 0x75, 0xb1, 0x9c, 0x7b, + 0x6e, 0xc8, 0x6a, 0xeb, 0x41, 0x8a, 0xdb, 0x0f, 0x72, 0xdf, 0xc7, 0xad, 0xfd, 0x06, 0xea, 0x45, + 0x14, 0x32, 0x77, 0xce, 0x2e, 0x22, 0x8f, 0x6c, 0x5c, 0x05, 0xb9, 0x70, 0xb6, 0x35, 0xcf, 0x8e, + 0x1f, 0x2e, 0x22, 0x55, 0xe8, 0x49, 0xfd, 0xa3, 0xf3, 0xcf, 0xef, 0xef, 0xd7, 0x56, 0x4d, 0xf8, + 0x34, 0xae, 0x85, 0x64, 0x5e, 0xed, 0x25, 0x3c, 0xae, 0xa7, 0xfe, 0x90, 0x92, 0x64, 0x85, 0x3e, + 0x86, 0xa3, 0xff, 0x10, 0x50, 0x7e, 0x60, 0x17, 0xc3, 0x9a, 0x01, 0xd5, 0xde, 0x09, 0xf0, 0x41, + 0x63, 0x2a, 0x27, 0x58, 0x47, 0x24, 0xec, 0x85, 0x48, 0x6c, 0x9c, 0xff, 0xcd, 0x6e, 0x48, 0xf5, + 0x6e, 0x34, 0xce, 0x71, 0xab, 0x71, 0x8e, 0xb5, 0x3f, 0x05, 0x78, 0xd6, 0x78, 0xe9, 0xf5, 0xb0, + 0x7c, 0x0d, 0xad, 0x0a, 0xe1, 0x4f, 0xee, 0x27, 0xbc, 0xae, 0x15, 0xf3, 0x84, 0xec, 0x96, 0x6f, + 0x08, 0xa5, 0xee, 0x6b, 0x52, 0x56, 0xd2, 0xc5, 0x72, 0xe1, 0x31, 0xbc, 0xea, 0x10, 0x4a, 0x1b, + 0x43, 0xa8, 0xfd, 0x2d, 0x80, 0x52, 0xdf, 0x7c, 0x1f, 0x7e, 0x4f, 0xa0, 0xcd, 0xb5, 0x68, 0xcd, + 0xed, 0x30, 0x33, 0x1f, 0xe6, 0xd5, 0xc0, 0xbd, 0xd5, 0xc8, 0x5d, 0x85, 0x76, 0x71, 0x7f, 0x2e, + 0xaa, 0x5d, 0x5c, 0x9a, 0x5a, 0x0c, 0x4f, 0xb6, 0x05, 0x82, 0x4f, 0x39, 0xfa, 0x0a, 0x3a, 0xc5, + 0xc0, 0xd3, 0x82, 0xe1, 0xd3, 0x1d, 0xaa, 0xb2, 0x8e, 0x7d, 0x00, 0x9f, 0xf6, 0x97, 0x08, 0x67, + 0xdb, 0x47, 0xc6, 0x51, 0xc2, 0x76, 0xc8, 0xdb, 0xb7, 0x9b, 0xf2, 0xf6, 0x62, 0x97, 0xbc, 0x65, + 0x5b, 0x35, 0x0a, 0xda, 0xff, 0x81, 0x52, 0xfb, 0x79, 0x1f, 0xe1, 0x3b, 0x81, 0xa3, 0x57, 0xd8, + 0x32, 0xaf, 0xab, 0xaa, 0x5f, 0x13, 0x30, 0x31, 0xf3, 0x99, 0x96, 0xed, 0x60, 0xfd, 0xda, 0x98, + 0xd8, 0x3a, 0xd6, 0x2f, 0x15, 0x49, 0x4b, 0x41, 0xdd, 0x2e, 0xa8, 0x78, 0xcf, 0x9b, 0x5c, 0x85, + 0xfa, 0xb3, 0xfc, 0x06, 0xda, 0x09, 0xaf, 0x9d, 0xaa, 0x22, 0xef, 0x56, 0xef, 0x21, 0x48, 0xb8, + 0x4c, 0x98, 0x1d, 0xf2, 0xc8, 0x2f, 0xff, 0x0d, 0x00, 0x00, 0xff, 0xff, 0x9e, 0x28, 0xe3, 0xa0, + 0x75, 0x08, 0x00, 0x00, } diff --git a/protocol/protobuf/push_notifications.proto b/protocol/protobuf/push_notifications.proto index fbcd74dc2..c8342ebcd 100644 --- a/protocol/protobuf/push_notifications.proto +++ b/protocol/protobuf/push_notifications.proto @@ -70,8 +70,7 @@ message PushNotification { message PushNotificationRequest { repeated PushNotification requests = 1; - string message_id = 2; - string ack_required = 3; + bytes message_id = 2; } message PushNotificationReport { @@ -87,8 +86,7 @@ message PushNotificationReport { string installation_id = 4; } -// TOP LEVEL message PushNotificationResponse { - string message_id = 1; + bytes message_id = 1; repeated PushNotificationReport reports = 2; } diff --git a/protocol/push_notification_client/migrations/1593601729_initial_schema.up.sql b/protocol/push_notification_client/migrations/1593601729_initial_schema.up.sql deleted file mode 100644 index bbf4ebf71..000000000 --- a/protocol/push_notification_client/migrations/1593601729_initial_schema.up.sql +++ /dev/null @@ -1,16 +0,0 @@ -CREATE TABLE IF NOT EXISTS push_notification_client_servers ( - public_key BLOB NOT NULL, - registered BOOLEAN DEFAULT FALSE, - registered_at INT NOT NULL DEFAULT 0, - access_token TEXT, - UNIQUE(public_key) ON CONFLICT REPLACE -); - -CREATE TABLE IF NOT EXISTS push_notification_client_info ( - public_key BLOB NOT NULL, - installation_id TEXT NOT NULL, - access_token TEXT NOT NULL, - UNIQUE(public_key, installation_id) ON CONFLICT REPLACE -); - -CREATE INDEX idx_push_notification_client_info_public_key ON push_notification_client_info(public_key, installation_id); diff --git a/protocol/push_notification_client/migrations/1593601729_initial_schema.down.sql b/protocol/push_notification_client/migrations/sql/1593601729_initial_schema.down.sql similarity index 100% rename from protocol/push_notification_client/migrations/1593601729_initial_schema.down.sql rename to protocol/push_notification_client/migrations/sql/1593601729_initial_schema.down.sql diff --git a/protocol/push_notification_client/migrations/sql/1593601729_initial_schema.up.sql b/protocol/push_notification_client/migrations/sql/1593601729_initial_schema.up.sql new file mode 100644 index 000000000..afb671961 --- /dev/null +++ b/protocol/push_notification_client/migrations/sql/1593601729_initial_schema.up.sql @@ -0,0 +1,40 @@ +CREATE TABLE IF NOT EXISTS push_notification_client_servers ( + public_key BLOB NOT NULL, + registered BOOLEAN DEFAULT FALSE, + registered_at INT NOT NULL DEFAULT 0, + access_token TEXT, + UNIQUE(public_key) ON CONFLICT REPLACE +); + +CREATE TABLE IF NOT EXISTS push_notification_client_queries ( + public_key BLOB NOT NULL, + queried_at INT NOT NULL, + query_id BLOB NOT NULL, + UNIQUE(public_key,query_id) ON CONFLICT REPLACE +); + +CREATE TABLE IF NOT EXISTS push_notification_client_info ( + public_key BLOB NOT NULL, + server_public_key BLOB NOT NULL, + installation_id TEXT NOT NULL, + access_token TEXT NOT NULL, + retrieved_at INT NOT NULL, + UNIQUE(public_key, installation_id, server_public_key) ON CONFLICT REPLACE +); + +CREATE TABLE IF NOT EXISTS push_notification_client_tracked_messages ( + message_id BLOB NOT NULL, + chat_id TEXT NOT NULL, + tracked_at INT NOT NULL, + UNIQUE(message_id) ON CONFLICT IGNORE + ); + +CREATE TABLE IF NOT EXISTS push_notification_client_sent_notifications ( + message_id BLOB NOT NULL, + public_key BLOB NOT NULL, + installation_id TEXT NOT NULL, + sent_at INT NOT NULL, + UNIQUE(message_id, public_key, installation_id) + ); + +CREATE INDEX idx_push_notification_client_info_public_key ON push_notification_client_info(public_key, installation_id); diff --git a/protocol/push_notification_client/migrations/doc.go b/protocol/push_notification_client/migrations/sql/doc.go similarity index 83% rename from protocol/push_notification_client/migrations/doc.go rename to protocol/push_notification_client/migrations/sql/doc.go index 0315ccce1..a7d080561 100644 --- a/protocol/push_notification_client/migrations/doc.go +++ b/protocol/push_notification_client/migrations/sql/doc.go @@ -6,4 +6,4 @@ package migrations -//go:generate go-bindata -pkg migrations -o ./migrations.go . +//go:generate go-bindata -pkg migrations -o ../migrations.go ./ diff --git a/protocol/push_notification_client/persistence.go b/protocol/push_notification_client/persistence.go index 73a66a06c..a4ca30a90 100644 --- a/protocol/push_notification_client/persistence.go +++ b/protocol/push_notification_client/persistence.go @@ -1,9 +1,11 @@ package push_notification_client import ( + "context" "crypto/ecdsa" "database/sql" "strings" + "time" "github.com/status-im/status-go/eth-node/crypto" ) @@ -16,16 +18,126 @@ func NewPersistence(db *sql.DB) *Persistence { return &Persistence{db: db} } -func (p *Persistence) TrackPushNotification(messageID []byte) error { +func (p *Persistence) TrackPushNotification(chatID string, messageID []byte) error { + trackedAt := time.Now().Unix() + _, err := p.db.Exec(`INSERT INTO push_notification_client_tracked_messages (chat_id, message_id, tracked_at) VALUES (?,?,?)`, chatID, messageID, trackedAt) + return err +} + +func (p *Persistence) SavePushNotificationQuery(publicKey *ecdsa.PublicKey, queryID []byte) error { + queriedAt := time.Now().Unix() + _, err := p.db.Exec(`INSERT INTO push_notification_client_queries (public_key, query_id, queried_at) VALUES (?,?,?)`, crypto.CompressPubkey(publicKey), queryID, queriedAt) + return err +} + +func (p *Persistence) GetQueriedAt(publicKey *ecdsa.PublicKey) (int64, error) { + var queriedAt int64 + err := p.db.QueryRow(`SELECT queried_at FROM push_notification_client_queries WHERE public_key = ? ORDER BY queried_at DESC LIMIT 1`, crypto.CompressPubkey(publicKey)).Scan(&queriedAt) + if err == sql.ErrNoRows { + return 0, nil + } + if err != nil { + return 0, err + } + + return queriedAt, nil +} + +func (p *Persistence) GetQueryPublicKey(queryID []byte) (*ecdsa.PublicKey, error) { + var publicKeyBytes []byte + err := p.db.QueryRow(`SELECT public_key FROM push_notification_client_queries WHERE query_id = ?`, queryID).Scan(&publicKeyBytes) + if err == sql.ErrNoRows { + return nil, nil + } + if err != nil { + return nil, err + } + + publicKey, err := crypto.DecompressPubkey(publicKeyBytes) + if err != nil { + return nil, err + } + return publicKey, nil +} + +func (p *Persistence) SavePushNotificationInfo(infos []*PushNotificationInfo) error { + tx, err := p.db.BeginTx(context.Background(), &sql.TxOptions{}) + defer func() { + if err == nil { + err = tx.Commit() + return + } + // don't shadow original error + _ = tx.Rollback() + }() + for _, info := range infos { + _, err := tx.Exec(`INSERT INTO push_notification_client_info (public_key, server_public_key, installation_id, access_token, retrieved_at) VALUES (?, ?, ?, ?, ?)`, crypto.CompressPubkey(info.PublicKey), crypto.CompressPubkey(info.ServerPublicKey), info.InstallationID, info.AccessToken, info.RetrievedAt) + if err != nil { + return err + } + } + return nil } -func (p *Persistence) ShouldSentNotificationFor(publicKey *ecdsa.PublicKey, messageID []byte) (bool, error) { - return false, nil +func (p *Persistence) GetPushNotificationInfo(publicKey *ecdsa.PublicKey, installationIDs []string) ([]*PushNotificationInfo, error) { + queryArgs := make([]interface{}, 0, len(installationIDs)+1) + queryArgs = append(queryArgs, crypto.CompressPubkey(publicKey)) + for _, installationID := range installationIDs { + queryArgs = append(queryArgs, installationID) + } + + inVector := strings.Repeat("?, ", len(installationIDs)-1) + "?" + + rows, err := p.db.Query(`SELECT server_public_key, installation_id, access_token, retrieved_at FROM push_notification_client_info WHERE public_key = ? AND installation_id IN (`+inVector+`)`, queryArgs...) + if err != nil { + return nil, err + } + var infos []*PushNotificationInfo + for rows.Next() { + var serverPublicKeyBytes []byte + info := &PushNotificationInfo{PublicKey: publicKey} + err := rows.Scan(&serverPublicKeyBytes, &info.InstallationID, &info.AccessToken, &info.RetrievedAt) + if err != nil { + return nil, err + } + + serverPublicKey, err := crypto.DecompressPubkey(serverPublicKeyBytes) + if err != nil { + return nil, err + } + + info.ServerPublicKey = serverPublicKey + infos = append(infos, info) + } + + return infos, nil } -func (p *Persistence) SentFor(publicKey *ecdsa.PublicKey, messageID []byte) error { - return nil +func (p *Persistence) ShouldSentNotificationFor(publicKey *ecdsa.PublicKey, installationID string, messageID []byte) (bool, error) { + // First we check that we are tracking this message, next we check that we haven't already sent this + var count uint64 + err := p.db.QueryRow(`SELECT COUNT(1) FROM push_notification_client_tracked_messages WHERE message_id = ?`, messageID).Scan(&count) + if err != nil { + return false, err + } + + if count == 0 { + return false, nil + } + + err = p.db.QueryRow(`SELECT COUNT(1) FROM push_notification_client_sent_notifications WHERE message_id = ? AND public_key = ? AND installation_id = ? `, messageID, crypto.CompressPubkey(publicKey), installationID).Scan(&count) + if err != nil { + return false, err + } + + return count == 0, nil +} + +func (p *Persistence) NotifiedOn(publicKey *ecdsa.PublicKey, installationID string, messageID []byte) error { + sentAt := time.Now().Unix() + _, err := p.db.Exec(`INSERT INTO push_notification_client_sent_notifications (public_key, installation_id, message_id, sent_at) VALUES (?, ?, ?, ?)`, crypto.CompressPubkey(publicKey), installationID, messageID, sentAt) + return err } func (p *Persistence) UpsertServer(server *PushNotificationServer) error { diff --git a/protocol/push_notification_client/persistence_test.go b/protocol/push_notification_client/persistence_test.go index 7c4e55c73..f81699240 100644 --- a/protocol/push_notification_client/persistence_test.go +++ b/protocol/push_notification_client/persistence_test.go @@ -72,3 +72,69 @@ func (s *SQLitePersistenceSuite) TestSaveAndRetrieveServer() { s.Require().Equal(int64(2), retrievedServers[0].RegisteredAt) s.Require().True(common.IsPubKeyEqual(retrievedServers[0].PublicKey, &key.PublicKey)) } + +func (s *SQLitePersistenceSuite) TestSaveAndRetrieveInfo() { + installationID1 := "installation-id-1" + installationID2 := "installation-id-2" + installationID3 := "installation-id-3" + key1, err := crypto.GenerateKey() + s.Require().NoError(err) + key2, err := crypto.GenerateKey() + s.Require().NoError(err) + serverKey, err := crypto.GenerateKey() + s.Require().NoError(err) + + accessToken := "token" + + infos := []*PushNotificationInfo{ + { + PublicKey: &key1.PublicKey, + ServerPublicKey: &serverKey.PublicKey, + RetrievedAt: 1, + AccessToken: accessToken, + InstallationID: installationID1, + }, + { + PublicKey: &key1.PublicKey, + ServerPublicKey: &serverKey.PublicKey, + RetrievedAt: 1, + AccessToken: accessToken, + InstallationID: installationID2, + }, + { + PublicKey: &key1.PublicKey, + ServerPublicKey: &serverKey.PublicKey, + RetrievedAt: 1, + AccessToken: accessToken, + InstallationID: installationID3, + }, + { + PublicKey: &key2.PublicKey, + ServerPublicKey: &serverKey.PublicKey, + RetrievedAt: 1, + AccessToken: accessToken, + InstallationID: installationID1, + }, + { + PublicKey: &key2.PublicKey, + ServerPublicKey: &serverKey.PublicKey, + RetrievedAt: 1, + AccessToken: accessToken, + InstallationID: installationID2, + }, + { + PublicKey: &key2.PublicKey, + ServerPublicKey: &serverKey.PublicKey, + RetrievedAt: 1, + AccessToken: accessToken, + InstallationID: installationID3, + }, + } + + s.Require().NoError(s.persistence.SavePushNotificationInfo(infos)) + + retrievedInfos, err := s.persistence.GetPushNotificationInfo(&key1.PublicKey, []string{installationID1, installationID2}) + s.Require().NoError(err) + + s.Require().Len(retrievedInfos, 2) +} diff --git a/protocol/push_notification_client/push_notification.go b/protocol/push_notification_client/push_notification.go index 083a6d9d7..f5c8c229f 100644 --- a/protocol/push_notification_client/push_notification.go +++ b/protocol/push_notification_client/push_notification.go @@ -4,6 +4,7 @@ import ( "context" "crypto/aes" "crypto/cipher" + "sort" "bytes" "crypto/ecdsa" @@ -16,6 +17,7 @@ import ( "github.com/golang/protobuf/proto" "github.com/google/uuid" + "github.com/status-im/status-go/eth-node/crypto" "github.com/status-im/status-go/eth-node/crypto/ecies" "github.com/status-im/status-go/protocol/common" "github.com/status-im/status-go/protocol/protobuf" @@ -24,6 +26,7 @@ import ( const encryptedPayloadKeyLength = 16 const accessTokenKeyLength = 16 +const staleQueryTimeInSeconds = 86400 type PushNotificationServer struct { PublicKey *ecdsa.PublicKey @@ -33,9 +36,11 @@ type PushNotificationServer struct { } type PushNotificationInfo struct { - AccessToken string - InstallationID string - PublicKey *ecdsa.PublicKey + AccessToken string + InstallationID string + PublicKey *ecdsa.PublicKey + ServerPublicKey *ecdsa.PublicKey + RetrievedAt int64 } type Config struct { @@ -79,18 +84,15 @@ type Client struct { //messageProcessor is a message processor used to send and being notified of messages messageProcessor *common.MessageProcessor - //pushNotificationQueryResponses is a channel that listens to pushNotificationResponse - pushNotificationQueryResponses chan *protobuf.PushNotificationQueryResponse } func New(persistence *Persistence, config *Config, processor *common.MessageProcessor) *Client { return &Client{ - quit: make(chan struct{}), - config: config, - pushNotificationQueryResponses: make(chan *protobuf.PushNotificationQueryResponse), - messageProcessor: processor, - persistence: persistence, - reader: rand.Reader} + quit: make(chan struct{}), + config: config, + messageProcessor: processor, + persistence: persistence, + reader: rand.Reader} } func (c *Client) Start() error { @@ -104,7 +106,7 @@ func (c *Client) Start() error { select { case m := <-subscription: if err := c.HandleMessageSent(m); err != nil { - // TODO: log + c.config.Logger.Error("failed to handle message", zap.Error(err)) } case <-c.quit: return @@ -119,23 +121,150 @@ func (c *Client) Stop() error { return nil } -// Sends an actual push notification, where do we get the chatID? -func sendPushNotificationTo(publicKey *ecdsa.PublicKey, chatID string) error { +type notificationSendingSpec struct { + serverPublicKey *ecdsa.PublicKey + installationID string + messageID []byte +} + +// The message has been sent +// We should: +// 1) Check whether we should notify on anything +// 2) Refresh info if necessaary +// 3) Sent push notifications +func (c *Client) HandleMessageSent(sentMessage *common.SentMessage) error { + if !c.config.SendEnabled { + return nil + } + publicKey := sentMessage.PublicKey + var installationIDs []string + + var notificationSpecs []*notificationSendingSpec + + //Find if there's any actionable message + for _, messageID := range sentMessage.MessageIDs { + for _, installation := range sentMessage.Spec.Installations { + installationID := installation.ID + shouldNotify, err := c.shouldNotifyOn(publicKey, installationID, messageID) + if err != nil { + return err + } + if shouldNotify { + notificationSpecs = append(notificationSpecs, ¬ificationSendingSpec{ + installationID: installationID, + messageID: messageID, + }) + installationIDs = append(installationIDs, installation.ID) + } + } + } + + // Is there anything we should be notifying on? + if len(installationIDs) == 0 { + return nil + } + + // Check if we queried recently + queriedAt, err := c.persistence.GetQueriedAt(publicKey) + if err != nil { + return err + } + + // Naively query again if too much time has passed. + // Here it might not be necessary + if time.Now().Unix()-queriedAt > staleQueryTimeInSeconds { + err := c.QueryPushNotificationInfo(publicKey) + if err != nil { + return err + } + // This is just horrible, but for now will do, + // the issue is that we don't really know how long it will + // take to reply, as there might be multiple servers + // replying to us. + // The only time we are 100% certain that we can proceed is + // when we have non-stale info for each device, but + // most devices are not going to be registered, so we'd still + // have to wait teh maximum amount of time allowed. + time.Sleep(3 * time.Second) + + } + // Retrieve infos + info, err := c.GetPushNotificationInfo(publicKey, installationIDs) + if err != nil { + return err + } + + // Naively dispatch to the first server for now + // This wait for an acknowledgement and try a different server after a timeout + // Also we sent a single notification for multiple message ids, need to check with UI what's the desired behavior + + // Sort by server so we tend to hit the same one + sort.Slice(info, func(i, j int) bool { + return info[i].ServerPublicKey.X.Cmp(info[j].ServerPublicKey.X) <= 0 + }) + + installationIDsMap := make(map[string]bool) + // One info per installation id, grouped by server + actionableInfos := make(map[string][]*PushNotificationInfo) + for _, i := range info { + if !installationIDsMap[i.InstallationID] { + serverKey := hex.EncodeToString(crypto.CompressPubkey(i.ServerPublicKey)) + actionableInfos[serverKey] = append(actionableInfos[serverKey], i) + installationIDsMap[i.InstallationID] = true + } + + } + + for _, infos := range actionableInfos { + var pushNotifications []*protobuf.PushNotification + for _, i := range infos { + // TODO: Add ChatID, message, public_key + pushNotifications = append(pushNotifications, &protobuf.PushNotification{ + AccessToken: i.AccessToken, + PublicKey: common.HashPublicKey(publicKey), + InstallationId: i.InstallationID, + }) + + } + request := &protobuf.PushNotificationRequest{ + MessageId: sentMessage.MessageIDs[0], + Requests: pushNotifications, + } + serverPublicKey := infos[0].ServerPublicKey + + payload, err := proto.Marshal(request) + if err != nil { + return err + } + + rawMessage := &common.RawMessage{ + Payload: payload, + MessageType: protobuf.ApplicationMetadataMessage_PUSH_NOTIFICATION_REQUEST, + } + + // TODO: We should use the messageID for the response + _, err = c.messageProcessor.SendPrivate(context.Background(), serverPublicKey, rawMessage) + + if err != nil { + return err + } + } + return nil } -// This should schedule: -// 1) Check we have reasonably fresh push notifications info -// 2) Otherwise it should fetch them -// 3) Send a push notification to the devices in question -func (p *Client) HandleMessageSent(sentMessage *common.SentMessage) error { - return nil +// NotifyOnMessageID keeps track of the message to make sure we notify on it +func (c *Client) NotifyOnMessageID(chatID string, messageID []byte) error { + return c.persistence.TrackPushNotification(chatID, messageID) } -func (p *Client) NotifyOnMessageID(messageID []byte) error { - return nil +func (c *Client) shouldNotifyOn(publicKey *ecdsa.PublicKey, installationID string, messageID []byte) (bool, error) { + return c.persistence.ShouldSentNotificationFor(publicKey, installationID, messageID) } +func (c *Client) notifiedOn(publicKey *ecdsa.PublicKey, installationID string, messageID []byte) error { + return c.persistence.NotifiedOn(publicKey, installationID, messageID) +} func (p *Client) mutedChatIDsHashes(chatIDs []string) [][]byte { var mutedChatListHashes [][]byte @@ -313,14 +442,42 @@ func (p *Client) HandlePushNotificationAdvertisement(info *protobuf.PushNotifica } // HandlePushNotificationQueryResponse should update the data in the database for a given user -func (c *Client) HandlePushNotificationQueryResponse(response protobuf.PushNotificationQueryResponse) error { +func (c *Client) HandlePushNotificationQueryResponse(serverPublicKey *ecdsa.PublicKey, response protobuf.PushNotificationQueryResponse) error { c.config.Logger.Debug("received push notification query response", zap.Any("response", response)) - select { - case c.pushNotificationQueryResponses <- &response: - default: - return errors.New("could not process push notification query response") + if len(response.Info) == 0 { + return errors.New("empty response from the server") } + + publicKey, err := c.persistence.GetQueryPublicKey(response.MessageId) + if err != nil { + return err + } + if publicKey == nil { + c.config.Logger.Debug("query not found") + return nil + } + var pushNotificationInfo []*PushNotificationInfo + for _, info := range response.Info { + if bytes.Compare(info.PublicKey, common.HashPublicKey(publicKey)) != 0 { + c.config.Logger.Warn("reply for different key, ignoring") + continue + } + pushNotificationInfo = append(pushNotificationInfo, &PushNotificationInfo{ + PublicKey: publicKey, + ServerPublicKey: serverPublicKey, + AccessToken: info.AccessToken, + InstallationID: info.InstallationId, + RetrievedAt: time.Now().Unix(), + }) + + } + err = c.persistence.SavePushNotificationInfo(pushNotificationInfo) + if err != nil { + c.config.Logger.Error("failed to save push notifications", zap.Error(err)) + return err + } + return nil } @@ -347,14 +504,14 @@ func (c *Client) AddPushNotificationServer(publicKey *ecdsa.PublicKey) error { }) } -func (c *Client) RetrievePushNotificationInfo(publicKey *ecdsa.PublicKey) ([]*PushNotificationInfo, error) { +func (c *Client) QueryPushNotificationInfo(publicKey *ecdsa.PublicKey) error { hashedPublicKey := common.HashPublicKey(publicKey) query := &protobuf.PushNotificationQuery{ PublicKeys: [][]byte{hashedPublicKey}, } encodedMessage, err := proto.Marshal(query) if err != nil { - return nil, err + return err } rawMessage := &common.RawMessage{ @@ -366,47 +523,20 @@ func (c *Client) RetrievePushNotificationInfo(publicKey *ecdsa.PublicKey) ([]*Pu c.config.Logger.Debug("sending query") messageID, err := c.messageProcessor.SendPublic(context.Background(), encodedPublicKey, rawMessage) - // TODO: this is probably best done by polling the database instead - for { - select { - case <-c.quit: - return nil, nil - case <-time.After(5 * time.Second): - return nil, errors.New("no registration query response received") - case response := <-c.pushNotificationQueryResponses: - if bytes.Compare(response.MessageId, messageID) != 0 { - // Not for us, queue back - c.pushNotificationQueryResponses <- response - // This is not accurate, we should then shrink the timeout - // Also we should handle multiple responses - continue - } - - if len(response.Info) == 0 { - return nil, errors.New("empty response from the server") - } - - var pushNotificationInfo []*PushNotificationInfo - for _, info := range response.Info { - if bytes.Compare(info.PublicKey, hashedPublicKey) != 0 { - continue - } - pushNotificationInfo = append(pushNotificationInfo, &PushNotificationInfo{ - PublicKey: publicKey, - AccessToken: info.AccessToken, - InstallationID: info.InstallationId, - }) - - } - - return pushNotificationInfo, nil - } + if err != nil { + return err } + + return c.persistence.SavePushNotificationQuery(publicKey, messageID) } -func (s *Client) listenToPublicKeyQueryTopic(hashedPublicKey []byte) error { +func (c *Client) GetPushNotificationInfo(publicKey *ecdsa.PublicKey, installationIDs []string) ([]*PushNotificationInfo, error) { + return c.persistence.GetPushNotificationInfo(publicKey, installationIDs) +} + +func (c *Client) listenToPublicKeyQueryTopic(hashedPublicKey []byte) error { encodedPublicKey := hex.EncodeToString(hashedPublicKey) - return s.messageProcessor.JoinPublic(encodedPublicKey) + return c.messageProcessor.JoinPublic(encodedPublicKey) } func encryptAccessToken(plaintext []byte, key []byte, reader io.Reader) ([]byte, error) { diff --git a/protocol/push_notification_client/push_notification_test.go b/protocol/push_notification_client/push_notification_test.go index 7487a897d..852ea6ec1 100644 --- a/protocol/push_notification_client/push_notification_test.go +++ b/protocol/push_notification_client/push_notification_test.go @@ -3,7 +3,9 @@ package push_notification_client import ( "bytes" "crypto/ecdsa" + "io/ioutil" "math/rand" + "os" "testing" @@ -12,12 +14,52 @@ import ( "github.com/status-im/status-go/eth-node/crypto/ecies" "github.com/status-im/status-go/protocol/common" "github.com/status-im/status-go/protocol/protobuf" - "github.com/stretchr/testify/require" + "github.com/status-im/status-go/protocol/sqlite" + "github.com/status-im/status-go/protocol/tt" + "github.com/stretchr/testify/suite" ) -func TestBuildPushNotificationRegisterMessage(t *testing.T) { +type ClientSuite struct { + suite.Suite + tmpFile *os.File + persistence *Persistence + identity *ecdsa.PrivateKey + installationID string + client *Client +} + +func TestClientSuite(t *testing.T) { + s := new(ClientSuite) + s.installationID = "c6ae4fde-bb65-11ea-b3de-0242ac130004" + + suite.Run(t, s) +} + +func (s *ClientSuite) SetupTest() { + tmpFile, err := ioutil.TempFile("", "") + s.Require().NoError(err) + s.tmpFile = tmpFile + + database, err := sqlite.Open(s.tmpFile.Name(), "") + s.Require().NoError(err) + s.persistence = NewPersistence(database) + + identity, err := crypto.GenerateKey() + s.Require().NoError(err) + s.identity = identity + + config := &Config{ + Identity: identity, + Logger: tt.MustCreateTestLogger(), + RemoteNotificationsEnabled: true, + InstallationID: s.installationID, + } + + s.client = New(s.persistence, config, nil) +} + +func (s *ClientSuite) TestBuildPushNotificationRegisterMessage() { myDeviceToken := "device-token" - myInstallationID := "installationID" mutedChatList := []string{"a", "b"} // build chat lish hashes @@ -26,9 +68,8 @@ func TestBuildPushNotificationRegisterMessage(t *testing.T) { mutedChatListHashes = append(mutedChatListHashes, common.Shake256([]byte(chatID))) } - identity, err := crypto.GenerateKey() contactKey, err := crypto.GenerateKey() - require.NoError(t, err) + s.Require().NoError(err) contactIDs := []*ecdsa.PublicKey{&contactKey.PublicKey} // Set random generator for uuid @@ -41,43 +82,77 @@ func TestBuildPushNotificationRegisterMessage(t *testing.T) { // set up reader reader := bytes.NewReader([]byte(expectedUUID)) - sharedKey, err := ecies.ImportECDSA(identity).GenerateShared( + sharedKey, err := ecies.ImportECDSA(s.identity).GenerateShared( ecies.ImportECDSAPublic(&contactKey.PublicKey), accessTokenKeyLength, accessTokenKeyLength, ) - require.NoError(t, err) + s.Require().NoError(err) // build encrypted token encryptedToken, err := encryptAccessToken([]byte(expectedUUID), sharedKey, reader) - require.NoError(t, err) + s.Require().NoError(err) // Reset random generator uuid.SetRand(rand.New(rand.NewSource(seed))) - config := &Config{ - Identity: identity, - RemoteNotificationsEnabled: true, - InstallationID: myInstallationID, - } - - client := &Client{} - client.config = config - client.DeviceToken = myDeviceToken + s.client.DeviceToken = myDeviceToken // Set reader - client.reader = bytes.NewReader([]byte(expectedUUID)) + s.client.reader = bytes.NewReader([]byte(expectedUUID)) options := &protobuf.PushNotificationRegistration{ Version: 1, AccessToken: expectedUUID, Token: myDeviceToken, - InstallationId: myInstallationID, + InstallationId: s.installationID, Enabled: true, BlockedChatList: mutedChatListHashes, AllowedUserList: [][]byte{encryptedToken}, } - actualMessage, err := client.buildPushNotificationRegistrationMessage(contactIDs, mutedChatList) - require.NoError(t, err) + actualMessage, err := s.client.buildPushNotificationRegistrationMessage(contactIDs, mutedChatList) + s.Require().NoError(err) - require.Equal(t, options, actualMessage) + s.Require().Equal(options, actualMessage) +} + +func (s *ClientSuite) TestNotifyOnMessageID() { + messageID := []byte("message-id") + chatID := "chat-id" + installationID1 := "1" + installationID2 := "2" + + s.Require().NoError(s.client.NotifyOnMessageID(chatID, messageID)) + + key1, err := crypto.GenerateKey() + s.Require().NoError(err) + + // First time, should notify + response, err := s.client.shouldNotifyOn(&key1.PublicKey, installationID1, messageID) + s.Require().NoError(err) + s.Require().True(response) + + // Save notification + s.Require().NoError(s.client.notifiedOn(&key1.PublicKey, installationID1, messageID)) + + // Second time, should not notify + response, err = s.client.shouldNotifyOn(&key1.PublicKey, installationID1, messageID) + s.Require().NoError(err) + s.Require().False(response) + + // Different installationID + response, err = s.client.shouldNotifyOn(&key1.PublicKey, installationID2, messageID) + s.Require().NoError(err) + s.Require().True(response) + + key2, err := crypto.GenerateKey() + s.Require().NoError(err) + // different key, should notify + response, err = s.client.shouldNotifyOn(&key2.PublicKey, installationID1, messageID) + s.Require().NoError(err) + s.Require().True(response) + + // non tracked message id + response, err = s.client.shouldNotifyOn(&key1.PublicKey, installationID1, []byte("not-existing")) + s.Require().NoError(err) + s.Require().False(response) } diff --git a/protocol/push_notification_server/migrations/1593601728_initial_schema.down.sql b/protocol/push_notification_server/migrations/sql/1593601728_initial_schema.down.sql similarity index 100% rename from protocol/push_notification_server/migrations/1593601728_initial_schema.down.sql rename to protocol/push_notification_server/migrations/sql/1593601728_initial_schema.down.sql diff --git a/protocol/push_notification_server/migrations/1593601728_initial_schema.up.sql b/protocol/push_notification_server/migrations/sql/1593601728_initial_schema.up.sql similarity index 100% rename from protocol/push_notification_server/migrations/1593601728_initial_schema.up.sql rename to protocol/push_notification_server/migrations/sql/1593601728_initial_schema.up.sql diff --git a/protocol/push_notification_server/migrations/doc.go b/protocol/push_notification_server/migrations/sql/doc.go similarity index 83% rename from protocol/push_notification_server/migrations/doc.go rename to protocol/push_notification_server/migrations/sql/doc.go index 0315ccce1..a7d080561 100644 --- a/protocol/push_notification_server/migrations/doc.go +++ b/protocol/push_notification_server/migrations/sql/doc.go @@ -6,4 +6,4 @@ package migrations -//go:generate go-bindata -pkg migrations -o ./migrations.go . +//go:generate go-bindata -pkg migrations -o ../migrations.go ./ diff --git a/protocol/push_notification_server/persistence.go b/protocol/push_notification_server/persistence.go index f7f73cb67..1d6eebe34 100644 --- a/protocol/push_notification_server/persistence.go +++ b/protocol/push_notification_server/persistence.go @@ -14,6 +14,8 @@ type Persistence interface { GetPushNotificationRegistrationByPublicKeyAndInstallationID(publicKey []byte, installationID string) (*protobuf.PushNotificationRegistration, error) // GetPushNotificationRegistrationByPublicKey retrieve all the push notification registrations from storage given a public key GetPushNotificationRegistrationByPublicKeys(publicKeys [][]byte) ([]*PushNotificationIDAndRegistration, error) + //GetPushNotificationRegistrationPublicKeys return all the public keys stored + GetPushNotificationRegistrationPublicKeys() ([][]byte, error) // DeletePushNotificationRegistration deletes a push notification registration from storage given a public key and installation id DeletePushNotificationRegistration(publicKey []byte, installationID string) error @@ -88,6 +90,26 @@ func (p *SQLitePersistence) GetPushNotificationRegistrationByPublicKeys(publicKe return registrations, nil } +func (p *SQLitePersistence) GetPushNotificationRegistrationPublicKeys() ([][]byte, error) { + rows, err := p.db.Query(`SELECT public_key FROM push_notification_server_registrations`) + if err != nil { + return nil, err + } + defer rows.Close() + + var publicKeys [][]byte + for rows.Next() { + var publicKey []byte + err := rows.Scan(&publicKey) + if err != nil { + return nil, err + } + + publicKeys = append(publicKeys, publicKey) + } + return publicKeys, nil +} + func (p *SQLitePersistence) SavePushNotificationRegistration(publicKey []byte, registration *protobuf.PushNotificationRegistration) error { marshaledRegistration, err := proto.Marshal(registration) if err != nil { diff --git a/protocol/push_notification_server/server.go b/protocol/push_notification_server/server.go index 223298859..04d1e56ba 100644 --- a/protocol/push_notification_server/server.go +++ b/protocol/push_notification_server/server.go @@ -163,6 +163,7 @@ func (p *Server) HandlePushNotificationRequest(request *protobuf.PushNotificatio response.MessageId = request.MessageId + // TODO: filter by chat id // Collect successful requests & registrations var requestAndRegistrations []*RequestAndRegistration @@ -256,6 +257,20 @@ func (s *Server) HandlePushNotificationRegistration(publicKey *ecdsa.PublicKey, return response } +func (s *Server) Start() error { + pks, err := s.persistence.GetPushNotificationRegistrationPublicKeys() + if err != nil { + return err + } + for _, pk := range pks { + if err := s.listenToPublicKeyQueryTopic(pk); err != nil { + return err + } + } + + return nil +} + func (s *Server) listenToPublicKeyQueryTopic(hashedPublicKey []byte) error { if s.messageProcessor == nil { return nil diff --git a/protocol/push_notification_test.go b/protocol/push_notification_test.go index 310f81ffb..95961ff28 100644 --- a/protocol/push_notification_test.go +++ b/protocol/push_notification_test.go @@ -123,7 +123,8 @@ func (s *MessengerPushNotificationSuite) TestReceivePushNotification() { bob1 := s.m bob2 := s.newMessengerWithKey(s.shh, s.m.identity) server := s.newPushNotificationServer(s.shh) - client2 := s.newMessenger(s.shh) + alice := s.newMessenger(s.shh) + bobInstallationIDs := []string{bob1.installationID, bob2.installationID} // Register bob1 err := bob1.AddPushNotificationServer(context.Background(), &server.identity.PublicKey) @@ -213,11 +214,8 @@ func (s *MessengerPushNotificationSuite) TestReceivePushNotification() { s.Require().Len(bob2Servers, 1) s.Require().True(bob2Servers[0].Registered) - var info []*push_notification_client.PushNotificationInfo - go func() { - info, err = client2.pushNotificationClient.RetrievePushNotificationInfo(&bob2.identity.PublicKey) - errChan <- err - }() + err = alice.pushNotificationClient.QueryPushNotificationInfo(&bob2.identity.PublicKey) + s.Require().NoError(err) // Receive push notification query // TODO: find a better way to handle this waiting @@ -236,21 +234,23 @@ func (s *MessengerPushNotificationSuite) TestReceivePushNotification() { // Receive push notification query response // TODO: find a better way to handle this waiting time.Sleep(500 * time.Millisecond) - _, err = client2.RetrieveAll() + _, err = alice.RetrieveAll() s.Require().NoError(err) time.Sleep(500 * time.Millisecond) - _, err = client2.RetrieveAll() + _, err = alice.RetrieveAll() s.Require().NoError(err) time.Sleep(500 * time.Millisecond) - _, err = client2.RetrieveAll() + _, err = alice.RetrieveAll() s.Require().NoError(err) - err = <-errChan + // Here we should poll, as we don't know whether they are already there + + info, err := alice.pushNotificationClient.GetPushNotificationInfo(&bob1.identity.PublicKey, bobInstallationIDs) s.Require().NoError(err) - s.Require().NotNil(info) // Check we have replies for both bob1 and bob2 + s.Require().NotNil(info) s.Require().Len(info, 2) var bob1Info, bob2Info *push_notification_client.PushNotificationInfo @@ -264,17 +264,18 @@ func (s *MessengerPushNotificationSuite) TestReceivePushNotification() { } s.Require().NotNil(bob1Info) - s.Require().Equal(bob1Info, &push_notification_client.PushNotificationInfo{ - InstallationID: bob1.installationID, - AccessToken: bob1Servers[0].AccessToken, - PublicKey: &bob1.identity.PublicKey, - }) + s.Require().Equal(bob1.installationID, bob1Info.InstallationID) + s.Require().Equal(bob1Info.AccessToken, bob1Servers[0].AccessToken, bob1Info.AccessToken) + s.Require().Equal(&bob1.identity.PublicKey, bob1Info.PublicKey) s.Require().NotNil(bob2Info) - s.Require().Equal(bob2Info, &push_notification_client.PushNotificationInfo{ - InstallationID: bob2.installationID, - AccessToken: bob2Servers[0].AccessToken, - PublicKey: &bob1.identity.PublicKey, - }) + s.Require().Equal(bob2.installationID, bob2Info.InstallationID) + s.Require().Equal(bob2Servers[0].AccessToken, bob2Info.AccessToken) + s.Require().Equal(&bob2.identity.PublicKey, bob2Info.PublicKey) + retrievedNotificationInfo, err := alice.pushNotificationClient.GetPushNotificationInfo(&bob1.identity.PublicKey, bobInstallationIDs) + alice.logger.Info("BOB KEY", zap.Any("key", bob1.identity.PublicKey)) + s.Require().NoError(err) + s.Require().NotNil(retrievedNotificationInfo) + s.Require().Len(retrievedNotificationInfo, 2) }