keep track of queries & actually send pns, hurray

This commit is contained in:
Andrea Maria Piana 2020-07-10 15:26:06 +02:00
parent 1c379984cb
commit 20fb8607cb
No known key found for this signature in database
GPG Key ID: AA6CCA6DE0E06424
18 changed files with 655 additions and 208 deletions

View File

@ -388,6 +388,7 @@ func (p *MessageProcessor) addToDataSync(publicKey *ecdsa.PublicKey, message []b
// sendDataSync sends a message scheduled by the data sync layer.
// Data Sync layer calls this method "dispatch" function.
func (p *MessageProcessor) sendDataSync(ctx context.Context, publicKey *ecdsa.PublicKey, encodedMessage []byte, payload *datasyncproto.Payload) error {
// Calculate the messageIDs
messageIDs := make([][]byte, 0, len(payload.Messages))
for _, payload := range payload.Messages {
messageIDs = append(messageIDs, v1protocol.MessageID(&p.identity.PublicKey, payload.Body))

View File

@ -287,6 +287,7 @@ func NewMessenger(
verifyTransactionClient: c.verifyTransactionClient,
shutdownTasks: []func() error{
database.Close,
pushNotificationClient.Stop,
transp.ResetFilters,
transp.Stop,
func() error { processor.Stop(); return nil },
@ -303,6 +304,18 @@ func NewMessenger(
}
func (m *Messenger) Start() error {
// Start push notification server
if m.pushNotificationServer != nil {
if err := m.pushNotificationServer.Start(); err != nil {
return err
}
}
if m.pushNotificationClient != nil {
if err := m.pushNotificationClient.Start(); err != nil {
return err
}
}
return m.encryptor.Start(m.identity)
}
@ -1394,7 +1407,7 @@ func (m *Messenger) SendChatMessage(ctx context.Context, message *Message) (*Mes
// If the chat is not public, we instruct the pushNotificationService to send a notification
if !chat.Public() && m.pushNotificationClient != nil {
if err := m.pushNotificationClient.NotifyOnMessageID(id); err != nil {
if err := m.pushNotificationClient.NotifyOnMessageID(chat.ID, id); err != nil {
return nil, err
}
@ -1963,7 +1976,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
}
logger.Debug("Handling PushNotificationQueryResponse")
// TODO: Compare DST with Identity
if err := m.pushNotificationClient.HandlePushNotificationQueryResponse(msg.ParsedMessage.(protobuf.PushNotificationQueryResponse)); err != nil {
if err := m.pushNotificationClient.HandlePushNotificationQueryResponse(publicKey, msg.ParsedMessage.(protobuf.PushNotificationQueryResponse)); err != nil {
logger.Warn("failed to handle PushNotificationQueryResponse", zap.Error(err))
}
// We continue in any case, no changes to messenger

View File

@ -595,8 +595,7 @@ func (m *PushNotification) GetMessage() []byte {
type PushNotificationRequest struct {
Requests []*PushNotification `protobuf:"bytes,1,rep,name=requests,proto3" json:"requests,omitempty"`
MessageId string `protobuf:"bytes,2,opt,name=message_id,json=messageId,proto3" json:"message_id,omitempty"`
AckRequired string `protobuf:"bytes,3,opt,name=ack_required,json=ackRequired,proto3" json:"ack_required,omitempty"`
MessageId []byte `protobuf:"bytes,2,opt,name=message_id,json=messageId,proto3" json:"message_id,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
@ -634,18 +633,11 @@ func (m *PushNotificationRequest) GetRequests() []*PushNotification {
return nil
}
func (m *PushNotificationRequest) GetMessageId() string {
func (m *PushNotificationRequest) GetMessageId() []byte {
if m != nil {
return m.MessageId
}
return ""
}
func (m *PushNotificationRequest) GetAckRequired() string {
if m != nil {
return m.AckRequired
}
return ""
return nil
}
type PushNotificationReport struct {
@ -711,9 +703,8 @@ func (m *PushNotificationReport) GetInstallationId() string {
return ""
}
// TOP LEVEL
type PushNotificationResponse struct {
MessageId string `protobuf:"bytes,1,opt,name=message_id,json=messageId,proto3" json:"message_id,omitempty"`
MessageId []byte `protobuf:"bytes,1,opt,name=message_id,json=messageId,proto3" json:"message_id,omitempty"`
Reports []*PushNotificationReport `protobuf:"bytes,2,rep,name=reports,proto3" json:"reports,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
@ -745,11 +736,11 @@ func (m *PushNotificationResponse) XXX_DiscardUnknown() {
var xxx_messageInfo_PushNotificationResponse proto.InternalMessageInfo
func (m *PushNotificationResponse) GetMessageId() string {
func (m *PushNotificationResponse) GetMessageId() []byte {
if m != nil {
return m.MessageId
}
return ""
return nil
}
func (m *PushNotificationResponse) GetReports() []*PushNotificationReport {
@ -779,59 +770,58 @@ func init() {
func init() { proto.RegisterFile("push_notifications.proto", fileDescriptor_200acd86044eaa5d) }
var fileDescriptor_200acd86044eaa5d = []byte{
// 857 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x95, 0x51, 0x6f, 0xe3, 0x44,
0x10, 0xc7, 0x59, 0x27, 0x6d, 0xe2, 0x49, 0x68, 0x73, 0xab, 0x5e, 0xcf, 0x9c, 0x38, 0xc8, 0x19,
0x24, 0xa2, 0x43, 0x8a, 0x50, 0x91, 0xe0, 0xc4, 0x13, 0xa1, 0x75, 0x8b, 0xd5, 0xc6, 0x0e, 0x1b,
0x97, 0x13, 0x12, 0x92, 0xe5, 0xc4, 0xdb, 0x8b, 0x95, 0x9c, 0x1d, 0x76, 0xd7, 0x87, 0xf2, 0x80,
0xc4, 0x27, 0x40, 0x42, 0xbc, 0xf1, 0x29, 0xd0, 0x7d, 0x07, 0xbe, 0x17, 0xf2, 0xda, 0x0e, 0x8e,
0xe3, 0x4b, 0xf3, 0x70, 0x4f, 0xf1, 0xfc, 0x77, 0x66, 0x77, 0xe7, 0x37, 0x3b, 0x13, 0xd0, 0x96,
0x31, 0x9f, 0xb9, 0x61, 0x24, 0x82, 0xbb, 0x60, 0xea, 0x89, 0x20, 0x0a, 0x79, 0x7f, 0xc9, 0x22,
0x11, 0xe1, 0xa6, 0xfc, 0x99, 0xc4, 0x77, 0xfa, 0xbf, 0x35, 0xf8, 0x70, 0x14, 0xf3, 0x99, 0x55,
0xf0, 0x22, 0xf4, 0x65, 0xc0, 0x05, 0x93, 0xdf, 0xd8, 0x06, 0x10, 0xd1, 0x9c, 0x86, 0xae, 0x58,
0x2d, 0xa9, 0x86, 0xba, 0xa8, 0x77, 0x74, 0xf6, 0x45, 0x3f, 0x8f, 0xef, 0xef, 0x8a, 0xed, 0x3b,
0x49, 0xa0, 0xb3, 0x5a, 0x52, 0xa2, 0x8a, 0xfc, 0x13, 0x9f, 0xc0, 0x81, 0x34, 0x34, 0xa5, 0x8b,
0x7a, 0x2a, 0x49, 0x0d, 0xfc, 0x19, 0x1c, 0x07, 0x21, 0x17, 0xde, 0x62, 0x21, 0x43, 0xdd, 0xc0,
0xd7, 0x6a, 0x72, 0xfd, 0xa8, 0x28, 0x9b, 0x3e, 0x7e, 0x0a, 0x6d, 0x6f, 0x3a, 0xa5, 0x9c, 0xbb,
0xe9, 0x2e, 0x75, 0xe9, 0xd5, 0x4a, 0x35, 0x79, 0x20, 0xd6, 0xa0, 0x41, 0x43, 0x6f, 0xb2, 0xa0,
0xbe, 0x76, 0xd0, 0x45, 0xbd, 0x26, 0xc9, 0xcd, 0x64, 0xe5, 0x35, 0x65, 0x3c, 0x88, 0x42, 0xed,
0xb0, 0x8b, 0x7a, 0x75, 0x92, 0x9b, 0xf8, 0x19, 0x3c, 0xf0, 0x16, 0x8b, 0xe8, 0x57, 0xea, 0xbb,
0x31, 0xa7, 0xcc, 0x5d, 0x04, 0x5c, 0x68, 0x8d, 0x6e, 0xad, 0xd7, 0x26, 0xc7, 0xd9, 0xc2, 0x2d,
0xa7, 0xec, 0x26, 0xe0, 0x22, 0xf1, 0x9d, 0x2c, 0xa2, 0xe9, 0x9c, 0xfa, 0xee, 0x74, 0xe6, 0x89,
0xd4, 0xb7, 0x99, 0xfa, 0x66, 0x0b, 0xe7, 0x33, 0x4f, 0x48, 0xdf, 0x8f, 0x00, 0xe2, 0x90, 0x49,
0x28, 0x94, 0x69, 0xaa, 0xbc, 0x4e, 0x41, 0xd1, 0x2f, 0x41, 0x5d, 0x53, 0xc2, 0xa7, 0x80, 0x6f,
0xad, 0x6b, 0xcb, 0x7e, 0x61, 0xb9, 0x8e, 0x7d, 0x6d, 0x58, 0xae, 0xf3, 0xd3, 0xc8, 0xe8, 0xbc,
0x87, 0xdf, 0x07, 0x75, 0x30, 0xca, 0xb4, 0x0e, 0xc2, 0x18, 0x8e, 0x2e, 0x4d, 0x62, 0x7c, 0x37,
0x18, 0x1b, 0x99, 0xa6, 0xe8, 0x6f, 0x14, 0xf8, 0x74, 0x57, 0x2d, 0x08, 0xe5, 0xcb, 0x28, 0xe4,
0x34, 0x41, 0xc0, 0x63, 0x09, 0x4b, 0x16, 0xb3, 0x49, 0x72, 0x13, 0x5b, 0x70, 0x40, 0x19, 0x8b,
0x98, 0x2c, 0xcc, 0xd1, 0xd9, 0xf3, 0xfd, 0x8a, 0x9c, 0x6f, 0xdc, 0x37, 0x92, 0x58, 0x59, 0xec,
0x74, 0x1b, 0xfc, 0x04, 0x80, 0xd1, 0x5f, 0x62, 0xca, 0x45, 0x5e, 0xcd, 0x36, 0x51, 0x33, 0xc5,
0xf4, 0xf5, 0xdf, 0x11, 0xa8, 0xeb, 0x98, 0x62, 0xea, 0x06, 0x21, 0x36, 0xc9, 0x53, 0x7f, 0x08,
0x0f, 0x86, 0x83, 0x9b, 0x4b, 0x9b, 0x0c, 0x8d, 0x0b, 0x77, 0x68, 0x8c, 0xc7, 0x83, 0x2b, 0xa3,
0x83, 0xf0, 0x09, 0x74, 0x7e, 0x34, 0xc8, 0xd8, 0xb4, 0x2d, 0x77, 0x68, 0x8e, 0x87, 0x03, 0xe7,
0xfc, 0xfb, 0x8e, 0x82, 0x1f, 0xc3, 0xe9, 0xad, 0x35, 0xbe, 0x1d, 0x8d, 0x6c, 0xe2, 0x18, 0x17,
0x45, 0x86, 0xb5, 0x04, 0x9a, 0x69, 0x39, 0x06, 0xb1, 0x06, 0x37, 0xe9, 0x09, 0x9d, 0xba, 0xfe,
0x07, 0x82, 0xa7, 0xe5, 0xdc, 0x06, 0xfe, 0x6b, 0xca, 0x44, 0xc0, 0xe9, 0x2b, 0x1a, 0x0a, 0x33,
0xbc, 0x8b, 0x92, 0x3c, 0x96, 0xf1, 0x64, 0x11, 0x4c, 0xdd, 0x39, 0x5d, 0x49, 0x68, 0x6d, 0xa2,
0xa6, 0xca, 0x35, 0x5d, 0x6d, 0x3d, 0x48, 0x65, 0xfb, 0x41, 0xee, 0xfb, 0xb8, 0xf5, 0xdf, 0x40,
0x3b, 0x8f, 0x42, 0xe1, 0x4d, 0xc5, 0x79, 0xe4, 0xd3, 0x8d, 0xab, 0x60, 0x0f, 0x4e, 0xb7, 0xfa,
0xd9, 0x0d, 0xc2, 0xbb, 0x48, 0x43, 0xdd, 0x5a, 0xaf, 0x75, 0xf6, 0xf9, 0xdb, 0xeb, 0xb5, 0x95,
0x13, 0x39, 0x59, 0x96, 0x5c, 0x12, 0x55, 0x7f, 0x0e, 0x0f, 0xcb, 0xa1, 0x3f, 0xc4, 0x94, 0xad,
0xf0, 0xc7, 0xd0, 0xfa, 0x1f, 0x01, 0x97, 0x07, 0xb6, 0x09, 0xac, 0x19, 0x70, 0xfd, 0x0d, 0x82,
0x0f, 0x2a, 0x43, 0x25, 0xc1, 0x32, 0x22, 0xb4, 0x17, 0x22, 0xa5, 0xb2, 0xff, 0x37, 0xab, 0x51,
0x2b, 0x57, 0xa3, 0xb2, 0x8f, 0xeb, 0x95, 0x7d, 0xac, 0xff, 0x89, 0xe0, 0x49, 0xe5, 0xa5, 0xd7,
0xcd, 0xf2, 0x35, 0xd4, 0x0b, 0x84, 0x3f, 0x79, 0x3b, 0xe1, 0x75, 0xae, 0x44, 0x06, 0x24, 0xb7,
0x7c, 0x45, 0x39, 0xf7, 0x5e, 0xd2, 0x3c, 0x93, 0x36, 0x51, 0x33, 0xc5, 0xf4, 0x8b, 0x4d, 0x58,
0xdb, 0x68, 0x42, 0xfd, 0x1f, 0x04, 0x9d, 0xf2, 0xe6, 0xfb, 0xf0, 0x7b, 0x04, 0x0d, 0x39, 0x8b,
0xd6, 0xdc, 0x0e, 0x13, 0xf3, 0x7e, 0x5e, 0x15, 0xdc, 0xeb, 0x95, 0xdc, 0x35, 0x68, 0x64, 0xf7,
0x97, 0x43, 0xb5, 0x4d, 0x72, 0x53, 0xff, 0x0b, 0xc1, 0xa3, 0xed, 0x09, 0x21, 0xdb, 0x1c, 0x7f,
0x05, 0xcd, 0xac, 0xe3, 0x79, 0x06, 0xf1, 0xf1, 0x8e, 0xb1, 0xb2, 0xf6, 0xad, 0xe0, 0xa7, 0x16,
0xf9, 0x49, 0x20, 0x73, 0x37, 0x71, 0x0f, 0x18, 0xcd, 0xbb, 0xa9, 0xe5, 0x4d, 0xe7, 0x24, 0x93,
0xf4, 0xbf, 0x15, 0x38, 0xdd, 0xbe, 0xd5, 0x32, 0x62, 0x62, 0xc7, 0x08, 0xfc, 0x76, 0x73, 0x04,
0x3e, 0xdb, 0x35, 0x02, 0x93, 0xad, 0x2a, 0x87, 0xde, 0xbb, 0xc0, 0xad, 0xff, 0xbc, 0xcf, 0x70,
0x3c, 0x86, 0xd6, 0x0b, 0x62, 0x5b, 0x57, 0xc5, 0x7f, 0x86, 0xd2, 0x90, 0x53, 0x12, 0xcd, 0xb2,
0x1d, 0x97, 0x18, 0x57, 0xe6, 0xd8, 0x31, 0x88, 0x71, 0xd1, 0xa9, 0xe9, 0x31, 0x68, 0xdb, 0x09,
0x65, 0x6f, 0x7e, 0x13, 0x3d, 0x2a, 0xa3, 0xff, 0x06, 0x1a, 0x4c, 0xe6, 0xce, 0x35, 0x45, 0x16,
0xb4, 0x7b, 0x1f, 0x24, 0x92, 0x07, 0x4c, 0x0e, 0xa5, 0xe7, 0x97, 0xff, 0x05, 0x00, 0x00, 0xff,
0xff, 0x8b, 0x53, 0x27, 0x32, 0x99, 0x08, 0x00, 0x00,
// 836 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x94, 0x51, 0x6f, 0xe3, 0x44,
0x10, 0xc7, 0xb1, 0x9d, 0x36, 0xf1, 0x34, 0xb4, 0xbe, 0x55, 0xaf, 0x67, 0x4e, 0x1c, 0xe4, 0x0c,
0x12, 0xd1, 0x21, 0x45, 0xa8, 0x48, 0x70, 0xe2, 0x89, 0xd0, 0xba, 0xc5, 0x6a, 0x63, 0x87, 0x8d,
0xc3, 0x09, 0x09, 0xc9, 0x72, 0xe2, 0xcd, 0xc5, 0xaa, 0xcf, 0x36, 0xde, 0xf5, 0xa1, 0x3c, 0x20,
0xf1, 0x09, 0x90, 0x78, 0xe5, 0x53, 0xa0, 0xfb, 0x0e, 0x7c, 0x2f, 0xe4, 0xb5, 0x1d, 0x1c, 0xc7,
0x4d, 0xf3, 0xc0, 0x93, 0x3d, 0xb3, 0x33, 0xbb, 0x3b, 0xbf, 0xd9, 0xf9, 0x83, 0x1a, 0xa7, 0x74,
0xe9, 0x84, 0x11, 0xf3, 0x17, 0xfe, 0xdc, 0x65, 0x7e, 0x14, 0xd2, 0x41, 0x9c, 0x44, 0x2c, 0x42,
0x1d, 0xfe, 0x99, 0xa5, 0x0b, 0xed, 0x1f, 0x09, 0x3e, 0x1c, 0xa7, 0x74, 0x69, 0x56, 0xa2, 0x30,
0x79, 0xed, 0x53, 0x96, 0xf0, 0x7f, 0x64, 0x01, 0xb0, 0xe8, 0x8e, 0x84, 0x0e, 0x5b, 0xc5, 0x44,
0x15, 0x7a, 0x42, 0xff, 0xf8, 0xfc, 0x8b, 0x41, 0x99, 0x3f, 0xd8, 0x95, 0x3b, 0xb0, 0xb3, 0x44,
0x7b, 0x15, 0x13, 0x2c, 0xb3, 0xf2, 0x17, 0x9d, 0xc2, 0x01, 0x37, 0x54, 0xb1, 0x27, 0xf4, 0x65,
0x9c, 0x1b, 0xe8, 0x33, 0x38, 0xf1, 0x43, 0xca, 0xdc, 0x20, 0xe0, 0xa9, 0x8e, 0xef, 0xa9, 0x12,
0x5f, 0x3f, 0xae, 0xba, 0x0d, 0x0f, 0x3d, 0x87, 0xae, 0x3b, 0x9f, 0x13, 0x4a, 0x9d, 0x7c, 0x97,
0x16, 0x8f, 0x3a, 0xca, 0x7d, 0xfc, 0x40, 0xa4, 0x42, 0x9b, 0x84, 0xee, 0x2c, 0x20, 0x9e, 0x7a,
0xd0, 0x13, 0xfa, 0x1d, 0x5c, 0x9a, 0xd9, 0xca, 0x5b, 0x92, 0x50, 0x3f, 0x0a, 0xd5, 0xc3, 0x9e,
0xd0, 0x6f, 0xe1, 0xd2, 0x44, 0x2f, 0xe0, 0x91, 0x1b, 0x04, 0xd1, 0xaf, 0xc4, 0x73, 0x52, 0x4a,
0x12, 0x27, 0xf0, 0x29, 0x53, 0xdb, 0x3d, 0xa9, 0xdf, 0xc5, 0x27, 0xc5, 0xc2, 0x94, 0x92, 0xe4,
0xd6, 0xa7, 0x2c, 0x8b, 0x9d, 0x05, 0xd1, 0xfc, 0x8e, 0x78, 0xce, 0x7c, 0xe9, 0xb2, 0x3c, 0xb6,
0x93, 0xc7, 0x16, 0x0b, 0x17, 0x4b, 0x97, 0xf1, 0xd8, 0x8f, 0x00, 0xd2, 0x30, 0xe1, 0x50, 0x48,
0xa2, 0xca, 0xfc, 0x3a, 0x15, 0x8f, 0x76, 0x05, 0xf2, 0x9a, 0x12, 0x3a, 0x03, 0x34, 0x35, 0x6f,
0x4c, 0xeb, 0x95, 0xe9, 0xd8, 0xd6, 0x8d, 0x6e, 0x3a, 0xf6, 0x4f, 0x63, 0x5d, 0x79, 0x0f, 0xbd,
0x0f, 0xf2, 0x70, 0x5c, 0xf8, 0x14, 0x01, 0x21, 0x38, 0xbe, 0x32, 0xb0, 0xfe, 0xdd, 0x70, 0xa2,
0x17, 0x3e, 0x51, 0x7b, 0x27, 0xc2, 0xa7, 0xbb, 0x7a, 0x81, 0x09, 0x8d, 0xa3, 0x90, 0x92, 0x0c,
0x01, 0x4d, 0x39, 0x2c, 0xde, 0xcc, 0x0e, 0x2e, 0x4d, 0x64, 0xc2, 0x01, 0x49, 0x92, 0x28, 0xe1,
0x8d, 0x39, 0x3e, 0x7f, 0xb9, 0x5f, 0x93, 0xcb, 0x8d, 0x07, 0x7a, 0x96, 0xcb, 0x9b, 0x9d, 0x6f,
0x83, 0x9e, 0x01, 0x24, 0xe4, 0x97, 0x94, 0x50, 0x56, 0x76, 0xb3, 0x8b, 0xe5, 0xc2, 0x63, 0x78,
0xda, 0xef, 0x02, 0xc8, 0xeb, 0x9c, 0x6a, 0xe9, 0x3a, 0xc6, 0x16, 0x2e, 0x4b, 0x7f, 0x0c, 0x8f,
0x46, 0xc3, 0xdb, 0x2b, 0x0b, 0x8f, 0xf4, 0x4b, 0x67, 0xa4, 0x4f, 0x26, 0xc3, 0x6b, 0x5d, 0x11,
0xd0, 0x29, 0x28, 0x3f, 0xea, 0x78, 0x62, 0x58, 0xa6, 0x33, 0x32, 0x26, 0xa3, 0xa1, 0x7d, 0xf1,
0xbd, 0x22, 0xa2, 0xa7, 0x70, 0x36, 0x35, 0x27, 0xd3, 0xf1, 0xd8, 0xc2, 0xb6, 0x7e, 0x59, 0x65,
0x28, 0x65, 0xd0, 0x0c, 0xd3, 0xd6, 0xb1, 0x39, 0xbc, 0xcd, 0x4f, 0x50, 0x5a, 0xda, 0x1f, 0x02,
0x3c, 0xaf, 0xd7, 0x36, 0xf4, 0xde, 0x92, 0x84, 0xf9, 0x94, 0xbc, 0x21, 0x21, 0x33, 0xc2, 0x45,
0x94, 0xd5, 0x11, 0xa7, 0xb3, 0xc0, 0x9f, 0x3b, 0x77, 0x64, 0xc5, 0xa1, 0x75, 0xb1, 0x9c, 0x7b,
0x6e, 0xc8, 0x6a, 0xeb, 0x41, 0x8a, 0xdb, 0x0f, 0x72, 0xdf, 0xc7, 0xad, 0xfd, 0x06, 0xea, 0x45,
0x14, 0x32, 0x77, 0xce, 0x2e, 0x22, 0x8f, 0x6c, 0x5c, 0x05, 0xb9, 0x70, 0xb6, 0x35, 0xcf, 0x8e,
0x1f, 0x2e, 0x22, 0x55, 0xe8, 0x49, 0xfd, 0xa3, 0xf3, 0xcf, 0xef, 0xef, 0xd7, 0x56, 0x4d, 0xf8,
0x34, 0xae, 0x85, 0x64, 0x5e, 0xed, 0x25, 0x3c, 0xae, 0xa7, 0xfe, 0x90, 0x92, 0x64, 0x85, 0x3e,
0x86, 0xa3, 0xff, 0x10, 0x50, 0x7e, 0x60, 0x17, 0xc3, 0x9a, 0x01, 0xd5, 0xde, 0x09, 0xf0, 0x41,
0x63, 0x2a, 0x27, 0x58, 0x47, 0x24, 0xec, 0x85, 0x48, 0x6c, 0x9c, 0xff, 0xcd, 0x6e, 0x48, 0xf5,
0x6e, 0x34, 0xce, 0x71, 0xab, 0x71, 0x8e, 0xb5, 0x3f, 0x05, 0x78, 0xd6, 0x78, 0xe9, 0xf5, 0xb0,
0x7c, 0x0d, 0xad, 0x0a, 0xe1, 0x4f, 0xee, 0x27, 0xbc, 0xae, 0x15, 0xf3, 0x84, 0xec, 0x96, 0x6f,
0x08, 0xa5, 0xee, 0x6b, 0x52, 0x56, 0xd2, 0xc5, 0x72, 0xe1, 0x31, 0xbc, 0xea, 0x10, 0x4a, 0x1b,
0x43, 0xa8, 0xfd, 0x2d, 0x80, 0x52, 0xdf, 0x7c, 0x1f, 0x7e, 0x4f, 0xa0, 0xcd, 0xb5, 0x68, 0xcd,
0xed, 0x30, 0x33, 0x1f, 0xe6, 0xd5, 0xc0, 0xbd, 0xd5, 0xc8, 0x5d, 0x85, 0x76, 0x71, 0x7f, 0x2e,
0xaa, 0x5d, 0x5c, 0x9a, 0x5a, 0x0c, 0x4f, 0xb6, 0x05, 0x82, 0x4f, 0x39, 0xfa, 0x0a, 0x3a, 0xc5,
0xc0, 0xd3, 0x82, 0xe1, 0xd3, 0x1d, 0xaa, 0xb2, 0x8e, 0x7d, 0x00, 0x9f, 0xf6, 0x97, 0x08, 0x67,
0xdb, 0x47, 0xc6, 0x51, 0xc2, 0x76, 0xc8, 0xdb, 0xb7, 0x9b, 0xf2, 0xf6, 0x62, 0x97, 0xbc, 0x65,
0x5b, 0x35, 0x0a, 0xda, 0xff, 0x81, 0x52, 0xfb, 0x79, 0x1f, 0xe1, 0x3b, 0x81, 0xa3, 0x57, 0xd8,
0x32, 0xaf, 0xab, 0xaa, 0x5f, 0x13, 0x30, 0x31, 0xf3, 0x99, 0x96, 0xed, 0x60, 0xfd, 0xda, 0x98,
0xd8, 0x3a, 0xd6, 0x2f, 0x15, 0x49, 0x4b, 0x41, 0xdd, 0x2e, 0xa8, 0x78, 0xcf, 0x9b, 0x5c, 0x85,
0xfa, 0xb3, 0xfc, 0x06, 0xda, 0x09, 0xaf, 0x9d, 0xaa, 0x22, 0xef, 0x56, 0xef, 0x21, 0x48, 0xb8,
0x4c, 0x98, 0x1d, 0xf2, 0xc8, 0x2f, 0xff, 0x0d, 0x00, 0x00, 0xff, 0xff, 0x9e, 0x28, 0xe3, 0xa0,
0x75, 0x08, 0x00, 0x00,
}

View File

@ -70,8 +70,7 @@ message PushNotification {
message PushNotificationRequest {
repeated PushNotification requests = 1;
string message_id = 2;
string ack_required = 3;
bytes message_id = 2;
}
message PushNotificationReport {
@ -87,8 +86,7 @@ message PushNotificationReport {
string installation_id = 4;
}
// TOP LEVEL
message PushNotificationResponse {
string message_id = 1;
bytes message_id = 1;
repeated PushNotificationReport reports = 2;
}

View File

@ -1,16 +0,0 @@
CREATE TABLE IF NOT EXISTS push_notification_client_servers (
public_key BLOB NOT NULL,
registered BOOLEAN DEFAULT FALSE,
registered_at INT NOT NULL DEFAULT 0,
access_token TEXT,
UNIQUE(public_key) ON CONFLICT REPLACE
);
CREATE TABLE IF NOT EXISTS push_notification_client_info (
public_key BLOB NOT NULL,
installation_id TEXT NOT NULL,
access_token TEXT NOT NULL,
UNIQUE(public_key, installation_id) ON CONFLICT REPLACE
);
CREATE INDEX idx_push_notification_client_info_public_key ON push_notification_client_info(public_key, installation_id);

View File

@ -0,0 +1,40 @@
CREATE TABLE IF NOT EXISTS push_notification_client_servers (
public_key BLOB NOT NULL,
registered BOOLEAN DEFAULT FALSE,
registered_at INT NOT NULL DEFAULT 0,
access_token TEXT,
UNIQUE(public_key) ON CONFLICT REPLACE
);
CREATE TABLE IF NOT EXISTS push_notification_client_queries (
public_key BLOB NOT NULL,
queried_at INT NOT NULL,
query_id BLOB NOT NULL,
UNIQUE(public_key,query_id) ON CONFLICT REPLACE
);
CREATE TABLE IF NOT EXISTS push_notification_client_info (
public_key BLOB NOT NULL,
server_public_key BLOB NOT NULL,
installation_id TEXT NOT NULL,
access_token TEXT NOT NULL,
retrieved_at INT NOT NULL,
UNIQUE(public_key, installation_id, server_public_key) ON CONFLICT REPLACE
);
CREATE TABLE IF NOT EXISTS push_notification_client_tracked_messages (
message_id BLOB NOT NULL,
chat_id TEXT NOT NULL,
tracked_at INT NOT NULL,
UNIQUE(message_id) ON CONFLICT IGNORE
);
CREATE TABLE IF NOT EXISTS push_notification_client_sent_notifications (
message_id BLOB NOT NULL,
public_key BLOB NOT NULL,
installation_id TEXT NOT NULL,
sent_at INT NOT NULL,
UNIQUE(message_id, public_key, installation_id)
);
CREATE INDEX idx_push_notification_client_info_public_key ON push_notification_client_info(public_key, installation_id);

View File

@ -6,4 +6,4 @@
package migrations
//go:generate go-bindata -pkg migrations -o ./migrations.go .
//go:generate go-bindata -pkg migrations -o ../migrations.go ./

View File

@ -1,9 +1,11 @@
package push_notification_client
import (
"context"
"crypto/ecdsa"
"database/sql"
"strings"
"time"
"github.com/status-im/status-go/eth-node/crypto"
)
@ -16,16 +18,126 @@ func NewPersistence(db *sql.DB) *Persistence {
return &Persistence{db: db}
}
func (p *Persistence) TrackPushNotification(messageID []byte) error {
func (p *Persistence) TrackPushNotification(chatID string, messageID []byte) error {
trackedAt := time.Now().Unix()
_, err := p.db.Exec(`INSERT INTO push_notification_client_tracked_messages (chat_id, message_id, tracked_at) VALUES (?,?,?)`, chatID, messageID, trackedAt)
return err
}
func (p *Persistence) SavePushNotificationQuery(publicKey *ecdsa.PublicKey, queryID []byte) error {
queriedAt := time.Now().Unix()
_, err := p.db.Exec(`INSERT INTO push_notification_client_queries (public_key, query_id, queried_at) VALUES (?,?,?)`, crypto.CompressPubkey(publicKey), queryID, queriedAt)
return err
}
func (p *Persistence) GetQueriedAt(publicKey *ecdsa.PublicKey) (int64, error) {
var queriedAt int64
err := p.db.QueryRow(`SELECT queried_at FROM push_notification_client_queries WHERE public_key = ? ORDER BY queried_at DESC LIMIT 1`, crypto.CompressPubkey(publicKey)).Scan(&queriedAt)
if err == sql.ErrNoRows {
return 0, nil
}
if err != nil {
return 0, err
}
return queriedAt, nil
}
func (p *Persistence) GetQueryPublicKey(queryID []byte) (*ecdsa.PublicKey, error) {
var publicKeyBytes []byte
err := p.db.QueryRow(`SELECT public_key FROM push_notification_client_queries WHERE query_id = ?`, queryID).Scan(&publicKeyBytes)
if err == sql.ErrNoRows {
return nil, nil
}
if err != nil {
return nil, err
}
publicKey, err := crypto.DecompressPubkey(publicKeyBytes)
if err != nil {
return nil, err
}
return publicKey, nil
}
func (p *Persistence) SavePushNotificationInfo(infos []*PushNotificationInfo) error {
tx, err := p.db.BeginTx(context.Background(), &sql.TxOptions{})
defer func() {
if err == nil {
err = tx.Commit()
return
}
// don't shadow original error
_ = tx.Rollback()
}()
for _, info := range infos {
_, err := tx.Exec(`INSERT INTO push_notification_client_info (public_key, server_public_key, installation_id, access_token, retrieved_at) VALUES (?, ?, ?, ?, ?)`, crypto.CompressPubkey(info.PublicKey), crypto.CompressPubkey(info.ServerPublicKey), info.InstallationID, info.AccessToken, info.RetrievedAt)
if err != nil {
return err
}
}
return nil
}
func (p *Persistence) ShouldSentNotificationFor(publicKey *ecdsa.PublicKey, messageID []byte) (bool, error) {
return false, nil
func (p *Persistence) GetPushNotificationInfo(publicKey *ecdsa.PublicKey, installationIDs []string) ([]*PushNotificationInfo, error) {
queryArgs := make([]interface{}, 0, len(installationIDs)+1)
queryArgs = append(queryArgs, crypto.CompressPubkey(publicKey))
for _, installationID := range installationIDs {
queryArgs = append(queryArgs, installationID)
}
inVector := strings.Repeat("?, ", len(installationIDs)-1) + "?"
rows, err := p.db.Query(`SELECT server_public_key, installation_id, access_token, retrieved_at FROM push_notification_client_info WHERE public_key = ? AND installation_id IN (`+inVector+`)`, queryArgs...)
if err != nil {
return nil, err
}
var infos []*PushNotificationInfo
for rows.Next() {
var serverPublicKeyBytes []byte
info := &PushNotificationInfo{PublicKey: publicKey}
err := rows.Scan(&serverPublicKeyBytes, &info.InstallationID, &info.AccessToken, &info.RetrievedAt)
if err != nil {
return nil, err
}
serverPublicKey, err := crypto.DecompressPubkey(serverPublicKeyBytes)
if err != nil {
return nil, err
}
info.ServerPublicKey = serverPublicKey
infos = append(infos, info)
}
return infos, nil
}
func (p *Persistence) SentFor(publicKey *ecdsa.PublicKey, messageID []byte) error {
return nil
func (p *Persistence) ShouldSentNotificationFor(publicKey *ecdsa.PublicKey, installationID string, messageID []byte) (bool, error) {
// First we check that we are tracking this message, next we check that we haven't already sent this
var count uint64
err := p.db.QueryRow(`SELECT COUNT(1) FROM push_notification_client_tracked_messages WHERE message_id = ?`, messageID).Scan(&count)
if err != nil {
return false, err
}
if count == 0 {
return false, nil
}
err = p.db.QueryRow(`SELECT COUNT(1) FROM push_notification_client_sent_notifications WHERE message_id = ? AND public_key = ? AND installation_id = ? `, messageID, crypto.CompressPubkey(publicKey), installationID).Scan(&count)
if err != nil {
return false, err
}
return count == 0, nil
}
func (p *Persistence) NotifiedOn(publicKey *ecdsa.PublicKey, installationID string, messageID []byte) error {
sentAt := time.Now().Unix()
_, err := p.db.Exec(`INSERT INTO push_notification_client_sent_notifications (public_key, installation_id, message_id, sent_at) VALUES (?, ?, ?, ?)`, crypto.CompressPubkey(publicKey), installationID, messageID, sentAt)
return err
}
func (p *Persistence) UpsertServer(server *PushNotificationServer) error {

View File

@ -72,3 +72,69 @@ func (s *SQLitePersistenceSuite) TestSaveAndRetrieveServer() {
s.Require().Equal(int64(2), retrievedServers[0].RegisteredAt)
s.Require().True(common.IsPubKeyEqual(retrievedServers[0].PublicKey, &key.PublicKey))
}
func (s *SQLitePersistenceSuite) TestSaveAndRetrieveInfo() {
installationID1 := "installation-id-1"
installationID2 := "installation-id-2"
installationID3 := "installation-id-3"
key1, err := crypto.GenerateKey()
s.Require().NoError(err)
key2, err := crypto.GenerateKey()
s.Require().NoError(err)
serverKey, err := crypto.GenerateKey()
s.Require().NoError(err)
accessToken := "token"
infos := []*PushNotificationInfo{
{
PublicKey: &key1.PublicKey,
ServerPublicKey: &serverKey.PublicKey,
RetrievedAt: 1,
AccessToken: accessToken,
InstallationID: installationID1,
},
{
PublicKey: &key1.PublicKey,
ServerPublicKey: &serverKey.PublicKey,
RetrievedAt: 1,
AccessToken: accessToken,
InstallationID: installationID2,
},
{
PublicKey: &key1.PublicKey,
ServerPublicKey: &serverKey.PublicKey,
RetrievedAt: 1,
AccessToken: accessToken,
InstallationID: installationID3,
},
{
PublicKey: &key2.PublicKey,
ServerPublicKey: &serverKey.PublicKey,
RetrievedAt: 1,
AccessToken: accessToken,
InstallationID: installationID1,
},
{
PublicKey: &key2.PublicKey,
ServerPublicKey: &serverKey.PublicKey,
RetrievedAt: 1,
AccessToken: accessToken,
InstallationID: installationID2,
},
{
PublicKey: &key2.PublicKey,
ServerPublicKey: &serverKey.PublicKey,
RetrievedAt: 1,
AccessToken: accessToken,
InstallationID: installationID3,
},
}
s.Require().NoError(s.persistence.SavePushNotificationInfo(infos))
retrievedInfos, err := s.persistence.GetPushNotificationInfo(&key1.PublicKey, []string{installationID1, installationID2})
s.Require().NoError(err)
s.Require().Len(retrievedInfos, 2)
}

View File

@ -4,6 +4,7 @@ import (
"context"
"crypto/aes"
"crypto/cipher"
"sort"
"bytes"
"crypto/ecdsa"
@ -16,6 +17,7 @@ import (
"github.com/golang/protobuf/proto"
"github.com/google/uuid"
"github.com/status-im/status-go/eth-node/crypto"
"github.com/status-im/status-go/eth-node/crypto/ecies"
"github.com/status-im/status-go/protocol/common"
"github.com/status-im/status-go/protocol/protobuf"
@ -24,6 +26,7 @@ import (
const encryptedPayloadKeyLength = 16
const accessTokenKeyLength = 16
const staleQueryTimeInSeconds = 86400
type PushNotificationServer struct {
PublicKey *ecdsa.PublicKey
@ -33,9 +36,11 @@ type PushNotificationServer struct {
}
type PushNotificationInfo struct {
AccessToken string
InstallationID string
PublicKey *ecdsa.PublicKey
AccessToken string
InstallationID string
PublicKey *ecdsa.PublicKey
ServerPublicKey *ecdsa.PublicKey
RetrievedAt int64
}
type Config struct {
@ -79,18 +84,15 @@ type Client struct {
//messageProcessor is a message processor used to send and being notified of messages
messageProcessor *common.MessageProcessor
//pushNotificationQueryResponses is a channel that listens to pushNotificationResponse
pushNotificationQueryResponses chan *protobuf.PushNotificationQueryResponse
}
func New(persistence *Persistence, config *Config, processor *common.MessageProcessor) *Client {
return &Client{
quit: make(chan struct{}),
config: config,
pushNotificationQueryResponses: make(chan *protobuf.PushNotificationQueryResponse),
messageProcessor: processor,
persistence: persistence,
reader: rand.Reader}
quit: make(chan struct{}),
config: config,
messageProcessor: processor,
persistence: persistence,
reader: rand.Reader}
}
func (c *Client) Start() error {
@ -104,7 +106,7 @@ func (c *Client) Start() error {
select {
case m := <-subscription:
if err := c.HandleMessageSent(m); err != nil {
// TODO: log
c.config.Logger.Error("failed to handle message", zap.Error(err))
}
case <-c.quit:
return
@ -119,23 +121,150 @@ func (c *Client) Stop() error {
return nil
}
// Sends an actual push notification, where do we get the chatID?
func sendPushNotificationTo(publicKey *ecdsa.PublicKey, chatID string) error {
type notificationSendingSpec struct {
serverPublicKey *ecdsa.PublicKey
installationID string
messageID []byte
}
// The message has been sent
// We should:
// 1) Check whether we should notify on anything
// 2) Refresh info if necessaary
// 3) Sent push notifications
func (c *Client) HandleMessageSent(sentMessage *common.SentMessage) error {
if !c.config.SendEnabled {
return nil
}
publicKey := sentMessage.PublicKey
var installationIDs []string
var notificationSpecs []*notificationSendingSpec
//Find if there's any actionable message
for _, messageID := range sentMessage.MessageIDs {
for _, installation := range sentMessage.Spec.Installations {
installationID := installation.ID
shouldNotify, err := c.shouldNotifyOn(publicKey, installationID, messageID)
if err != nil {
return err
}
if shouldNotify {
notificationSpecs = append(notificationSpecs, &notificationSendingSpec{
installationID: installationID,
messageID: messageID,
})
installationIDs = append(installationIDs, installation.ID)
}
}
}
// Is there anything we should be notifying on?
if len(installationIDs) == 0 {
return nil
}
// Check if we queried recently
queriedAt, err := c.persistence.GetQueriedAt(publicKey)
if err != nil {
return err
}
// Naively query again if too much time has passed.
// Here it might not be necessary
if time.Now().Unix()-queriedAt > staleQueryTimeInSeconds {
err := c.QueryPushNotificationInfo(publicKey)
if err != nil {
return err
}
// This is just horrible, but for now will do,
// the issue is that we don't really know how long it will
// take to reply, as there might be multiple servers
// replying to us.
// The only time we are 100% certain that we can proceed is
// when we have non-stale info for each device, but
// most devices are not going to be registered, so we'd still
// have to wait teh maximum amount of time allowed.
time.Sleep(3 * time.Second)
}
// Retrieve infos
info, err := c.GetPushNotificationInfo(publicKey, installationIDs)
if err != nil {
return err
}
// Naively dispatch to the first server for now
// This wait for an acknowledgement and try a different server after a timeout
// Also we sent a single notification for multiple message ids, need to check with UI what's the desired behavior
// Sort by server so we tend to hit the same one
sort.Slice(info, func(i, j int) bool {
return info[i].ServerPublicKey.X.Cmp(info[j].ServerPublicKey.X) <= 0
})
installationIDsMap := make(map[string]bool)
// One info per installation id, grouped by server
actionableInfos := make(map[string][]*PushNotificationInfo)
for _, i := range info {
if !installationIDsMap[i.InstallationID] {
serverKey := hex.EncodeToString(crypto.CompressPubkey(i.ServerPublicKey))
actionableInfos[serverKey] = append(actionableInfos[serverKey], i)
installationIDsMap[i.InstallationID] = true
}
}
for _, infos := range actionableInfos {
var pushNotifications []*protobuf.PushNotification
for _, i := range infos {
// TODO: Add ChatID, message, public_key
pushNotifications = append(pushNotifications, &protobuf.PushNotification{
AccessToken: i.AccessToken,
PublicKey: common.HashPublicKey(publicKey),
InstallationId: i.InstallationID,
})
}
request := &protobuf.PushNotificationRequest{
MessageId: sentMessage.MessageIDs[0],
Requests: pushNotifications,
}
serverPublicKey := infos[0].ServerPublicKey
payload, err := proto.Marshal(request)
if err != nil {
return err
}
rawMessage := &common.RawMessage{
Payload: payload,
MessageType: protobuf.ApplicationMetadataMessage_PUSH_NOTIFICATION_REQUEST,
}
// TODO: We should use the messageID for the response
_, err = c.messageProcessor.SendPrivate(context.Background(), serverPublicKey, rawMessage)
if err != nil {
return err
}
}
return nil
}
// This should schedule:
// 1) Check we have reasonably fresh push notifications info
// 2) Otherwise it should fetch them
// 3) Send a push notification to the devices in question
func (p *Client) HandleMessageSent(sentMessage *common.SentMessage) error {
return nil
// NotifyOnMessageID keeps track of the message to make sure we notify on it
func (c *Client) NotifyOnMessageID(chatID string, messageID []byte) error {
return c.persistence.TrackPushNotification(chatID, messageID)
}
func (p *Client) NotifyOnMessageID(messageID []byte) error {
return nil
func (c *Client) shouldNotifyOn(publicKey *ecdsa.PublicKey, installationID string, messageID []byte) (bool, error) {
return c.persistence.ShouldSentNotificationFor(publicKey, installationID, messageID)
}
func (c *Client) notifiedOn(publicKey *ecdsa.PublicKey, installationID string, messageID []byte) error {
return c.persistence.NotifiedOn(publicKey, installationID, messageID)
}
func (p *Client) mutedChatIDsHashes(chatIDs []string) [][]byte {
var mutedChatListHashes [][]byte
@ -313,14 +442,42 @@ func (p *Client) HandlePushNotificationAdvertisement(info *protobuf.PushNotifica
}
// HandlePushNotificationQueryResponse should update the data in the database for a given user
func (c *Client) HandlePushNotificationQueryResponse(response protobuf.PushNotificationQueryResponse) error {
func (c *Client) HandlePushNotificationQueryResponse(serverPublicKey *ecdsa.PublicKey, response protobuf.PushNotificationQueryResponse) error {
c.config.Logger.Debug("received push notification query response", zap.Any("response", response))
select {
case c.pushNotificationQueryResponses <- &response:
default:
return errors.New("could not process push notification query response")
if len(response.Info) == 0 {
return errors.New("empty response from the server")
}
publicKey, err := c.persistence.GetQueryPublicKey(response.MessageId)
if err != nil {
return err
}
if publicKey == nil {
c.config.Logger.Debug("query not found")
return nil
}
var pushNotificationInfo []*PushNotificationInfo
for _, info := range response.Info {
if bytes.Compare(info.PublicKey, common.HashPublicKey(publicKey)) != 0 {
c.config.Logger.Warn("reply for different key, ignoring")
continue
}
pushNotificationInfo = append(pushNotificationInfo, &PushNotificationInfo{
PublicKey: publicKey,
ServerPublicKey: serverPublicKey,
AccessToken: info.AccessToken,
InstallationID: info.InstallationId,
RetrievedAt: time.Now().Unix(),
})
}
err = c.persistence.SavePushNotificationInfo(pushNotificationInfo)
if err != nil {
c.config.Logger.Error("failed to save push notifications", zap.Error(err))
return err
}
return nil
}
@ -347,14 +504,14 @@ func (c *Client) AddPushNotificationServer(publicKey *ecdsa.PublicKey) error {
})
}
func (c *Client) RetrievePushNotificationInfo(publicKey *ecdsa.PublicKey) ([]*PushNotificationInfo, error) {
func (c *Client) QueryPushNotificationInfo(publicKey *ecdsa.PublicKey) error {
hashedPublicKey := common.HashPublicKey(publicKey)
query := &protobuf.PushNotificationQuery{
PublicKeys: [][]byte{hashedPublicKey},
}
encodedMessage, err := proto.Marshal(query)
if err != nil {
return nil, err
return err
}
rawMessage := &common.RawMessage{
@ -366,47 +523,20 @@ func (c *Client) RetrievePushNotificationInfo(publicKey *ecdsa.PublicKey) ([]*Pu
c.config.Logger.Debug("sending query")
messageID, err := c.messageProcessor.SendPublic(context.Background(), encodedPublicKey, rawMessage)
// TODO: this is probably best done by polling the database instead
for {
select {
case <-c.quit:
return nil, nil
case <-time.After(5 * time.Second):
return nil, errors.New("no registration query response received")
case response := <-c.pushNotificationQueryResponses:
if bytes.Compare(response.MessageId, messageID) != 0 {
// Not for us, queue back
c.pushNotificationQueryResponses <- response
// This is not accurate, we should then shrink the timeout
// Also we should handle multiple responses
continue
}
if len(response.Info) == 0 {
return nil, errors.New("empty response from the server")
}
var pushNotificationInfo []*PushNotificationInfo
for _, info := range response.Info {
if bytes.Compare(info.PublicKey, hashedPublicKey) != 0 {
continue
}
pushNotificationInfo = append(pushNotificationInfo, &PushNotificationInfo{
PublicKey: publicKey,
AccessToken: info.AccessToken,
InstallationID: info.InstallationId,
})
}
return pushNotificationInfo, nil
}
if err != nil {
return err
}
return c.persistence.SavePushNotificationQuery(publicKey, messageID)
}
func (s *Client) listenToPublicKeyQueryTopic(hashedPublicKey []byte) error {
func (c *Client) GetPushNotificationInfo(publicKey *ecdsa.PublicKey, installationIDs []string) ([]*PushNotificationInfo, error) {
return c.persistence.GetPushNotificationInfo(publicKey, installationIDs)
}
func (c *Client) listenToPublicKeyQueryTopic(hashedPublicKey []byte) error {
encodedPublicKey := hex.EncodeToString(hashedPublicKey)
return s.messageProcessor.JoinPublic(encodedPublicKey)
return c.messageProcessor.JoinPublic(encodedPublicKey)
}
func encryptAccessToken(plaintext []byte, key []byte, reader io.Reader) ([]byte, error) {

View File

@ -3,7 +3,9 @@ package push_notification_client
import (
"bytes"
"crypto/ecdsa"
"io/ioutil"
"math/rand"
"os"
"testing"
@ -12,12 +14,52 @@ import (
"github.com/status-im/status-go/eth-node/crypto/ecies"
"github.com/status-im/status-go/protocol/common"
"github.com/status-im/status-go/protocol/protobuf"
"github.com/stretchr/testify/require"
"github.com/status-im/status-go/protocol/sqlite"
"github.com/status-im/status-go/protocol/tt"
"github.com/stretchr/testify/suite"
)
func TestBuildPushNotificationRegisterMessage(t *testing.T) {
type ClientSuite struct {
suite.Suite
tmpFile *os.File
persistence *Persistence
identity *ecdsa.PrivateKey
installationID string
client *Client
}
func TestClientSuite(t *testing.T) {
s := new(ClientSuite)
s.installationID = "c6ae4fde-bb65-11ea-b3de-0242ac130004"
suite.Run(t, s)
}
func (s *ClientSuite) SetupTest() {
tmpFile, err := ioutil.TempFile("", "")
s.Require().NoError(err)
s.tmpFile = tmpFile
database, err := sqlite.Open(s.tmpFile.Name(), "")
s.Require().NoError(err)
s.persistence = NewPersistence(database)
identity, err := crypto.GenerateKey()
s.Require().NoError(err)
s.identity = identity
config := &Config{
Identity: identity,
Logger: tt.MustCreateTestLogger(),
RemoteNotificationsEnabled: true,
InstallationID: s.installationID,
}
s.client = New(s.persistence, config, nil)
}
func (s *ClientSuite) TestBuildPushNotificationRegisterMessage() {
myDeviceToken := "device-token"
myInstallationID := "installationID"
mutedChatList := []string{"a", "b"}
// build chat lish hashes
@ -26,9 +68,8 @@ func TestBuildPushNotificationRegisterMessage(t *testing.T) {
mutedChatListHashes = append(mutedChatListHashes, common.Shake256([]byte(chatID)))
}
identity, err := crypto.GenerateKey()
contactKey, err := crypto.GenerateKey()
require.NoError(t, err)
s.Require().NoError(err)
contactIDs := []*ecdsa.PublicKey{&contactKey.PublicKey}
// Set random generator for uuid
@ -41,43 +82,77 @@ func TestBuildPushNotificationRegisterMessage(t *testing.T) {
// set up reader
reader := bytes.NewReader([]byte(expectedUUID))
sharedKey, err := ecies.ImportECDSA(identity).GenerateShared(
sharedKey, err := ecies.ImportECDSA(s.identity).GenerateShared(
ecies.ImportECDSAPublic(&contactKey.PublicKey),
accessTokenKeyLength,
accessTokenKeyLength,
)
require.NoError(t, err)
s.Require().NoError(err)
// build encrypted token
encryptedToken, err := encryptAccessToken([]byte(expectedUUID), sharedKey, reader)
require.NoError(t, err)
s.Require().NoError(err)
// Reset random generator
uuid.SetRand(rand.New(rand.NewSource(seed)))
config := &Config{
Identity: identity,
RemoteNotificationsEnabled: true,
InstallationID: myInstallationID,
}
client := &Client{}
client.config = config
client.DeviceToken = myDeviceToken
s.client.DeviceToken = myDeviceToken
// Set reader
client.reader = bytes.NewReader([]byte(expectedUUID))
s.client.reader = bytes.NewReader([]byte(expectedUUID))
options := &protobuf.PushNotificationRegistration{
Version: 1,
AccessToken: expectedUUID,
Token: myDeviceToken,
InstallationId: myInstallationID,
InstallationId: s.installationID,
Enabled: true,
BlockedChatList: mutedChatListHashes,
AllowedUserList: [][]byte{encryptedToken},
}
actualMessage, err := client.buildPushNotificationRegistrationMessage(contactIDs, mutedChatList)
require.NoError(t, err)
actualMessage, err := s.client.buildPushNotificationRegistrationMessage(contactIDs, mutedChatList)
s.Require().NoError(err)
require.Equal(t, options, actualMessage)
s.Require().Equal(options, actualMessage)
}
func (s *ClientSuite) TestNotifyOnMessageID() {
messageID := []byte("message-id")
chatID := "chat-id"
installationID1 := "1"
installationID2 := "2"
s.Require().NoError(s.client.NotifyOnMessageID(chatID, messageID))
key1, err := crypto.GenerateKey()
s.Require().NoError(err)
// First time, should notify
response, err := s.client.shouldNotifyOn(&key1.PublicKey, installationID1, messageID)
s.Require().NoError(err)
s.Require().True(response)
// Save notification
s.Require().NoError(s.client.notifiedOn(&key1.PublicKey, installationID1, messageID))
// Second time, should not notify
response, err = s.client.shouldNotifyOn(&key1.PublicKey, installationID1, messageID)
s.Require().NoError(err)
s.Require().False(response)
// Different installationID
response, err = s.client.shouldNotifyOn(&key1.PublicKey, installationID2, messageID)
s.Require().NoError(err)
s.Require().True(response)
key2, err := crypto.GenerateKey()
s.Require().NoError(err)
// different key, should notify
response, err = s.client.shouldNotifyOn(&key2.PublicKey, installationID1, messageID)
s.Require().NoError(err)
s.Require().True(response)
// non tracked message id
response, err = s.client.shouldNotifyOn(&key1.PublicKey, installationID1, []byte("not-existing"))
s.Require().NoError(err)
s.Require().False(response)
}

View File

@ -6,4 +6,4 @@
package migrations
//go:generate go-bindata -pkg migrations -o ./migrations.go .
//go:generate go-bindata -pkg migrations -o ../migrations.go ./

View File

@ -14,6 +14,8 @@ type Persistence interface {
GetPushNotificationRegistrationByPublicKeyAndInstallationID(publicKey []byte, installationID string) (*protobuf.PushNotificationRegistration, error)
// GetPushNotificationRegistrationByPublicKey retrieve all the push notification registrations from storage given a public key
GetPushNotificationRegistrationByPublicKeys(publicKeys [][]byte) ([]*PushNotificationIDAndRegistration, error)
//GetPushNotificationRegistrationPublicKeys return all the public keys stored
GetPushNotificationRegistrationPublicKeys() ([][]byte, error)
// DeletePushNotificationRegistration deletes a push notification registration from storage given a public key and installation id
DeletePushNotificationRegistration(publicKey []byte, installationID string) error
@ -88,6 +90,26 @@ func (p *SQLitePersistence) GetPushNotificationRegistrationByPublicKeys(publicKe
return registrations, nil
}
func (p *SQLitePersistence) GetPushNotificationRegistrationPublicKeys() ([][]byte, error) {
rows, err := p.db.Query(`SELECT public_key FROM push_notification_server_registrations`)
if err != nil {
return nil, err
}
defer rows.Close()
var publicKeys [][]byte
for rows.Next() {
var publicKey []byte
err := rows.Scan(&publicKey)
if err != nil {
return nil, err
}
publicKeys = append(publicKeys, publicKey)
}
return publicKeys, nil
}
func (p *SQLitePersistence) SavePushNotificationRegistration(publicKey []byte, registration *protobuf.PushNotificationRegistration) error {
marshaledRegistration, err := proto.Marshal(registration)
if err != nil {

View File

@ -163,6 +163,7 @@ func (p *Server) HandlePushNotificationRequest(request *protobuf.PushNotificatio
response.MessageId = request.MessageId
// TODO: filter by chat id
// Collect successful requests & registrations
var requestAndRegistrations []*RequestAndRegistration
@ -256,6 +257,20 @@ func (s *Server) HandlePushNotificationRegistration(publicKey *ecdsa.PublicKey,
return response
}
func (s *Server) Start() error {
pks, err := s.persistence.GetPushNotificationRegistrationPublicKeys()
if err != nil {
return err
}
for _, pk := range pks {
if err := s.listenToPublicKeyQueryTopic(pk); err != nil {
return err
}
}
return nil
}
func (s *Server) listenToPublicKeyQueryTopic(hashedPublicKey []byte) error {
if s.messageProcessor == nil {
return nil

View File

@ -123,7 +123,8 @@ func (s *MessengerPushNotificationSuite) TestReceivePushNotification() {
bob1 := s.m
bob2 := s.newMessengerWithKey(s.shh, s.m.identity)
server := s.newPushNotificationServer(s.shh)
client2 := s.newMessenger(s.shh)
alice := s.newMessenger(s.shh)
bobInstallationIDs := []string{bob1.installationID, bob2.installationID}
// Register bob1
err := bob1.AddPushNotificationServer(context.Background(), &server.identity.PublicKey)
@ -213,11 +214,8 @@ func (s *MessengerPushNotificationSuite) TestReceivePushNotification() {
s.Require().Len(bob2Servers, 1)
s.Require().True(bob2Servers[0].Registered)
var info []*push_notification_client.PushNotificationInfo
go func() {
info, err = client2.pushNotificationClient.RetrievePushNotificationInfo(&bob2.identity.PublicKey)
errChan <- err
}()
err = alice.pushNotificationClient.QueryPushNotificationInfo(&bob2.identity.PublicKey)
s.Require().NoError(err)
// Receive push notification query
// TODO: find a better way to handle this waiting
@ -236,21 +234,23 @@ func (s *MessengerPushNotificationSuite) TestReceivePushNotification() {
// Receive push notification query response
// TODO: find a better way to handle this waiting
time.Sleep(500 * time.Millisecond)
_, err = client2.RetrieveAll()
_, err = alice.RetrieveAll()
s.Require().NoError(err)
time.Sleep(500 * time.Millisecond)
_, err = client2.RetrieveAll()
_, err = alice.RetrieveAll()
s.Require().NoError(err)
time.Sleep(500 * time.Millisecond)
_, err = client2.RetrieveAll()
_, err = alice.RetrieveAll()
s.Require().NoError(err)
err = <-errChan
// Here we should poll, as we don't know whether they are already there
info, err := alice.pushNotificationClient.GetPushNotificationInfo(&bob1.identity.PublicKey, bobInstallationIDs)
s.Require().NoError(err)
s.Require().NotNil(info)
// Check we have replies for both bob1 and bob2
s.Require().NotNil(info)
s.Require().Len(info, 2)
var bob1Info, bob2Info *push_notification_client.PushNotificationInfo
@ -264,17 +264,18 @@ func (s *MessengerPushNotificationSuite) TestReceivePushNotification() {
}
s.Require().NotNil(bob1Info)
s.Require().Equal(bob1Info, &push_notification_client.PushNotificationInfo{
InstallationID: bob1.installationID,
AccessToken: bob1Servers[0].AccessToken,
PublicKey: &bob1.identity.PublicKey,
})
s.Require().Equal(bob1.installationID, bob1Info.InstallationID)
s.Require().Equal(bob1Info.AccessToken, bob1Servers[0].AccessToken, bob1Info.AccessToken)
s.Require().Equal(&bob1.identity.PublicKey, bob1Info.PublicKey)
s.Require().NotNil(bob2Info)
s.Require().Equal(bob2Info, &push_notification_client.PushNotificationInfo{
InstallationID: bob2.installationID,
AccessToken: bob2Servers[0].AccessToken,
PublicKey: &bob1.identity.PublicKey,
})
s.Require().Equal(bob2.installationID, bob2Info.InstallationID)
s.Require().Equal(bob2Servers[0].AccessToken, bob2Info.AccessToken)
s.Require().Equal(&bob2.identity.PublicKey, bob2Info.PublicKey)
retrievedNotificationInfo, err := alice.pushNotificationClient.GetPushNotificationInfo(&bob1.identity.PublicKey, bobInstallationIDs)
alice.logger.Info("BOB KEY", zap.Any("key", bob1.identity.PublicKey))
s.Require().NoError(err)
s.Require().NotNil(retrievedNotificationInfo)
s.Require().Len(retrievedNotificationInfo, 2)
}