From 3afde67022b82cb59a17f9a2b8afd2a673f04c5d Mon Sep 17 00:00:00 2001 From: Andrea Maria Piana Date: Mon, 6 Jul 2020 10:54:22 +0200 Subject: [PATCH] Move message processor to common and allow subscribing to sent messages --- protocol/common/feature_flags.go | 8 ++ protocol/{ => common}/message_processor.go | 96 ++++++++----- .../{ => common}/message_processor_test.go | 13 +- protocol/common/raw_message.go | 21 +++ protocol/message.go | 14 -- protocol/message_handler.go | 7 +- protocol/messenger.go | 130 +++++++++++------- protocol/messenger_config.go | 5 +- protocol/persistence.go | 7 +- .../push_notification.go | 44 +++++- .../push_notification_server.go | 26 +++- .../push_notification_server_test.go | 2 +- 12 files changed, 248 insertions(+), 125 deletions(-) create mode 100644 protocol/common/feature_flags.go rename protocol/{ => common}/message_processor.go (85%) rename protocol/{ => common}/message_processor_test.go (97%) create mode 100644 protocol/common/raw_message.go diff --git a/protocol/common/feature_flags.go b/protocol/common/feature_flags.go new file mode 100644 index 000000000..9473e1da1 --- /dev/null +++ b/protocol/common/feature_flags.go @@ -0,0 +1,8 @@ +package common + +type FeatureFlags struct { + // Datasync indicates whether direct messages should be sent exclusively + // using datasync, breaking change for non-v1 clients. Public messages + // are not impacted + Datasync bool +} diff --git a/protocol/message_processor.go b/protocol/common/message_processor.go similarity index 85% rename from protocol/message_processor.go rename to protocol/common/message_processor.go index d05c38625..b3d014efe 100644 --- a/protocol/message_processor.go +++ b/protocol/common/message_processor.go @@ -1,4 +1,4 @@ -package protocol +package common import ( "context" @@ -34,30 +34,33 @@ const ( whisperPoWTime = 5 ) -type messageProcessor struct { +// SentMessage reprent a message that has been passed to the transport layer +type SentMessage struct { + PublicKey *ecdsa.PublicKey + Spec *encryption.ProtocolMessageSpec + MessageIDs [][]byte +} + +type MessageProcessor struct { identity *ecdsa.PrivateKey datasync *datasync.DataSync protocol *encryption.Protocol transport transport.Transport logger *zap.Logger - featureFlags featureFlags - // onMessageSpecSent is a callback that is to be called when - // a message spec is sent. - // The reason is a callback is that datasync dispatches things asynchronously - // through a callback, and therefore return values can't be used - onMessageSpecSent func(*ecdsa.PublicKey, *encryption.ProtocolMessageSpec, [][]byte) error + subscriptions []chan<- *SentMessage + + featureFlags FeatureFlags } -func newMessageProcessor( +func NewMessageProcessor( identity *ecdsa.PrivateKey, database *sql.DB, enc *encryption.Protocol, transport transport.Transport, logger *zap.Logger, - features featureFlags, - onMessageSpecSent func(*ecdsa.PublicKey, *encryption.ProtocolMessageSpec, [][]byte) error, -) (*messageProcessor, error) { + features FeatureFlags, +) (*MessageProcessor, error) { dataSyncTransport := datasync.NewNodeTransport() dataSyncNode, err := datasyncnode.NewPersistentNode( database, @@ -70,9 +73,9 @@ func newMessageProcessor( if err != nil { return nil, err } - ds := datasync.New(dataSyncNode, dataSyncTransport, features.datasync, logger) + ds := datasync.New(dataSyncNode, dataSyncTransport, features.Datasync, logger) - p := &messageProcessor{ + p := &MessageProcessor{ identity: identity, datasync: ds, protocol: enc, @@ -85,7 +88,7 @@ func newMessageProcessor( // With DataSync enabled, messages are added to the DataSync // but actual encrypt and send calls are postponed. // sendDataSync is responsible for encrypting and sending postponed messages. - if features.datasync { + if features.Datasync { ds.Init(p.sendDataSync) ds.Start(300 * time.Millisecond) } @@ -93,12 +96,15 @@ func newMessageProcessor( return p, nil } -func (p *messageProcessor) Stop() { +func (p *MessageProcessor) Stop() { + for _, c := range p.subscriptions { + close(c) + } p.datasync.Stop() // idempotent op } // SendPrivate takes encoded data, encrypts it and sends through the wire. -func (p *messageProcessor) SendPrivate( +func (p *MessageProcessor) SendPrivate( ctx context.Context, recipient *ecdsa.PublicKey, rawMessage *RawMessage, @@ -113,7 +119,7 @@ func (p *messageProcessor) SendPrivate( // SendGroupRaw takes encoded data, encrypts it and sends through the wire, // always return the messageID -func (p *messageProcessor) SendGroup( +func (p *MessageProcessor) SendGroup( ctx context.Context, recipients []*ecdsa.PublicKey, rawMessage *RawMessage, @@ -140,7 +146,7 @@ func (p *messageProcessor) SendGroup( } // sendPrivate sends data to the recipient identifying with a given public key. -func (p *messageProcessor) sendPrivate( +func (p *MessageProcessor) sendPrivate( ctx context.Context, recipient *ecdsa.PublicKey, rawMessage *RawMessage, @@ -154,7 +160,7 @@ func (p *messageProcessor) sendPrivate( messageID := v1protocol.MessageID(&p.identity.PublicKey, wrappedMessage) - if p.featureFlags.datasync { + if p.featureFlags.Datasync { if err := p.addToDataSync(recipient, wrappedMessage); err != nil { return nil, errors.Wrap(err, "failed to send message with datasync") } @@ -180,7 +186,7 @@ func (p *messageProcessor) sendPrivate( } // sendPairInstallation sends data to the recipients, using DH -func (p *messageProcessor) SendPairInstallation( +func (p *MessageProcessor) SendPairInstallation( ctx context.Context, recipient *ecdsa.PublicKey, rawMessage *RawMessage, @@ -212,7 +218,7 @@ func (p *messageProcessor) SendPairInstallation( // EncodeMembershipUpdate takes a group and an optional chat message and returns the protobuf representation to be sent on the wire. // All the events in a group are encoded and added to the payload -func (p *messageProcessor) EncodeMembershipUpdate( +func (p *MessageProcessor) EncodeMembershipUpdate( group *v1protocol.Group, chatMessage *protobuf.ChatMessage, ) ([]byte, error) { @@ -231,7 +237,7 @@ func (p *messageProcessor) EncodeMembershipUpdate( } // SendPublic takes encoded data, encrypts it and sends through the wire. -func (p *messageProcessor) SendPublic( +func (p *MessageProcessor) SendPublic( ctx context.Context, chatName string, rawMessage *RawMessage, @@ -262,12 +268,12 @@ func (p *messageProcessor) SendPublic( return messageID, nil } -// handleMessages expects a whisper message as input, and it will go through +// HandleMessages expects a whisper message as input, and it will go through // a series of transformations until the message is parsed into an application // layer message, or in case of Raw methods, the processing stops at the layer // before. // It returns an error only if the processing of required steps failed. -func (p *messageProcessor) handleMessages(shhMessage *types.Message, applicationLayer bool) ([]*v1protocol.StatusMessage, error) { +func (p *MessageProcessor) HandleMessages(shhMessage *types.Message, applicationLayer bool) ([]*v1protocol.StatusMessage, error) { logger := p.logger.With(zap.String("site", "handleMessages")) hlogger := logger.With(zap.ByteString("hash", shhMessage.Hash)) var statusMessage v1protocol.StatusMessage @@ -305,7 +311,7 @@ func (p *messageProcessor) handleMessages(shhMessage *types.Message, application return statusMessages, nil } -func (p *messageProcessor) handleEncryptionLayer(ctx context.Context, message *v1protocol.StatusMessage) error { +func (p *MessageProcessor) handleEncryptionLayer(ctx context.Context, message *v1protocol.StatusMessage) error { logger := p.logger.With(zap.String("site", "handleEncryptionLayer")) publicKey := message.SigPubKey() @@ -322,7 +328,7 @@ func (p *messageProcessor) handleEncryptionLayer(ctx context.Context, message *v return nil } -func (p *messageProcessor) handleErrDeviceNotFound(ctx context.Context, publicKey *ecdsa.PublicKey) error { +func (p *MessageProcessor) handleErrDeviceNotFound(ctx context.Context, publicKey *ecdsa.PublicKey) error { now := time.Now().Unix() advertise, err := p.protocol.ShouldAdvertiseBundle(publicKey, now) if err != nil { @@ -351,7 +357,7 @@ func (p *messageProcessor) handleErrDeviceNotFound(ctx context.Context, publicKe return nil } -func (p *messageProcessor) wrapMessageV1(rawMessage *RawMessage) ([]byte, error) { +func (p *MessageProcessor) wrapMessageV1(rawMessage *RawMessage) ([]byte, error) { wrappedMessage, err := v1protocol.WrapMessageV1(rawMessage.Payload, rawMessage.MessageType, p.identity) if err != nil { return nil, errors.Wrap(err, "failed to wrap message") @@ -359,7 +365,7 @@ func (p *messageProcessor) wrapMessageV1(rawMessage *RawMessage) ([]byte, error) return wrappedMessage, nil } -func (p *messageProcessor) addToDataSync(publicKey *ecdsa.PublicKey, message []byte) error { +func (p *MessageProcessor) addToDataSync(publicKey *ecdsa.PublicKey, message []byte) error { groupID := datasync.ToOneToOneGroupID(&p.identity.PublicKey, publicKey) peerID := datasyncpeer.PublicKeyToPeerID(*publicKey) exist, err := p.datasync.IsPeerInGroup(groupID, peerID) @@ -381,7 +387,7 @@ func (p *messageProcessor) addToDataSync(publicKey *ecdsa.PublicKey, message []b // sendDataSync sends a message scheduled by the data sync layer. // Data Sync layer calls this method "dispatch" function. -func (p *messageProcessor) sendDataSync(ctx context.Context, publicKey *ecdsa.PublicKey, encodedMessage []byte, payload *datasyncproto.Payload) error { +func (p *MessageProcessor) sendDataSync(ctx context.Context, publicKey *ecdsa.PublicKey, encodedMessage []byte, payload *datasyncproto.Payload) error { messageIDs := make([][]byte, 0, len(payload.Messages)) for _, payload := range payload.Messages { messageIDs = append(messageIDs, v1protocol.MessageID(&p.identity.PublicKey, payload.Body)) @@ -403,8 +409,8 @@ func (p *messageProcessor) sendDataSync(ctx context.Context, publicKey *ecdsa.Pu } // sendMessageSpec analyses the spec properties and selects a proper transport method. -func (p *messageProcessor) sendMessageSpec(ctx context.Context, publicKey *ecdsa.PublicKey, messageSpec *encryption.ProtocolMessageSpec, messageIDs [][]byte) ([]byte, *types.NewMessage, error) { - newMessage, err := messageSpecToWhisper(messageSpec) +func (p *MessageProcessor) sendMessageSpec(ctx context.Context, publicKey *ecdsa.PublicKey, messageSpec *encryption.ProtocolMessageSpec, messageIDs [][]byte) ([]byte, *types.NewMessage, error) { + newMessage, err := MessageSpecToWhisper(messageSpec) if err != nil { return nil, nil, err } @@ -425,17 +431,31 @@ func (p *messageProcessor) sendMessageSpec(ctx context.Context, publicKey *ecdsa return nil, nil, err } - if p.onMessageSpecSent != nil { + sentMessage := &SentMessage{ + PublicKey: publicKey, + Spec: messageSpec, + MessageIDs: messageIDs, + } - if err := p.onMessageSpecSent(publicKey, messageSpec, messageIDs); err != nil { - return nil, nil, err + // Publish on channels, drop if buffer is full + for _, c := range p.subscriptions { + select { + case c <- sentMessage: + default: + logger.Warn("subscription channel full, dropping message") } } return hash, newMessage, nil } -func messageSpecToWhisper(spec *encryption.ProtocolMessageSpec) (*types.NewMessage, error) { +func (p *MessageProcessor) Subscribe() <-chan *SentMessage { + c := make(chan *SentMessage, 100) + p.subscriptions = append(p.subscriptions, c) + return c +} + +func MessageSpecToWhisper(spec *encryption.ProtocolMessageSpec) (*types.NewMessage, error) { var newMessage *types.NewMessage payload, err := proto.Marshal(spec.Message) @@ -463,8 +483,8 @@ func calculatePoW(payload []byte) float64 { return whisperDefaultPoW } -// isPubKeyEqual checks that two public keys are equal -func isPubKeyEqual(a, b *ecdsa.PublicKey) bool { +// IsPubKeyEqual checks that two public keys are equal +func IsPubKeyEqual(a, b *ecdsa.PublicKey) bool { // the curve is always the same, just compare the points return a.X.Cmp(b.X) == 0 && a.Y.Cmp(b.Y) == 0 } diff --git a/protocol/message_processor_test.go b/protocol/common/message_processor_test.go similarity index 97% rename from protocol/message_processor_test.go rename to protocol/common/message_processor_test.go index a2c22bafe..1abdceaa3 100644 --- a/protocol/message_processor_test.go +++ b/protocol/common/message_processor_test.go @@ -1,6 +1,7 @@ -package protocol +package common import ( + "github.com/status-im/status-go/protocol" "io/ioutil" "os" "path/filepath" @@ -35,12 +36,12 @@ type MessageProcessorSuite struct { processor *messageProcessor tmpDir string - testMessage Message + testMessage protocol.Message logger *zap.Logger } func (s *MessageProcessorSuite) SetupTest() { - s.testMessage = Message{ + s.testMessage = protocol.Message{ ChatMessage: protobuf.ChatMessage{ Text: "abc123", ChatId: "testing-adamb", @@ -81,8 +82,8 @@ func (s *MessageProcessorSuite) SetupTest() { whisperConfig.MinimumAcceptedPOW = 0 shh := whisper.New(&whisperConfig) s.Require().NoError(shh.Start(nil)) - config := &config{} - s.Require().NoError(WithDatasync()(config)) + config := &protocol.config{} + s.Require().NoError(protocol.WithDatasync()(config)) whisperTransport, err := transport.NewTransport( gethbridge.NewGethWhisperWrapper(shh), @@ -100,7 +101,7 @@ func (s *MessageProcessorSuite) SetupTest() { encryptionProtocol, whisperTransport, s.logger, - featureFlags{}, + protocol.featureFlags{}, nil, ) s.Require().NoError(err) diff --git a/protocol/common/raw_message.go b/protocol/common/raw_message.go new file mode 100644 index 000000000..d21259a48 --- /dev/null +++ b/protocol/common/raw_message.go @@ -0,0 +1,21 @@ +package common + +import ( + "crypto/ecdsa" + + "github.com/status-im/status-go/protocol/protobuf" +) + +// RawMessage represent a sent or received message, kept for being able +// to re-send/propagate +type RawMessage struct { + ID string + LocalChatID string + LastSent uint64 + SendCount int + Sent bool + ResendAutomatically bool + MessageType protobuf.ApplicationMetadataMessage_Type + Payload []byte + Recipients []*ecdsa.PublicKey +} diff --git a/protocol/message.go b/protocol/message.go index 2649033eb..27c6e885b 100644 --- a/protocol/message.go +++ b/protocol/message.go @@ -109,20 +109,6 @@ type Message struct { SigPubKey *ecdsa.PublicKey `json:"-"` } -// RawMessage represent a sent or received message, kept for being able -// to re-send/propagate -type RawMessage struct { - ID string - LocalChatID string - LastSent uint64 - SendCount int - Sent bool - ResendAutomatically bool - MessageType protobuf.ApplicationMetadataMessage_Type - Payload []byte - Recipients []*ecdsa.PublicKey -} - func (m *Message) MarshalJSON() ([]byte, error) { type StickerAlias struct { Hash string `json:"hash"` diff --git a/protocol/message_handler.go b/protocol/message_handler.go index 161b3f557..b66365403 100644 --- a/protocol/message_handler.go +++ b/protocol/message_handler.go @@ -4,6 +4,7 @@ import ( "crypto/ecdsa" "encoding/hex" "fmt" + "github.com/status-im/status-go/protocol/common" "github.com/pkg/errors" "go.uber.org/zap" @@ -146,7 +147,7 @@ func (m *MessageHandler) handleCommandMessage(state *ReceivedMessageState, messa message.LocalChatID = chat.ID // Increase unviewed count - if !isPubKeyEqual(message.SigPubKey, &m.identity.PublicKey) { + if !common.IsPubKeyEqual(message.SigPubKey, &m.identity.PublicKey) { chat.UnviewedMessagesCount++ message.OutgoingStatus = "" } else { @@ -332,7 +333,7 @@ func (m *MessageHandler) HandleChatMessage(state *ReceivedMessageState) error { receivedMessage.LocalChatID = chat.ID // Increase unviewed count - if !isPubKeyEqual(receivedMessage.SigPubKey, &m.identity.PublicKey) { + if !common.IsPubKeyEqual(receivedMessage.SigPubKey, &m.identity.PublicKey) { chat.UnviewedMessagesCount++ } else { // Our own message, mark as sent @@ -582,7 +583,7 @@ func (m *MessageHandler) matchMessage(message *Message, chats map[string]*Chat, return nil, errors.New("received a public message from non-existing chat") } return chat, nil - case message.MessageType == protobuf.ChatMessage_ONE_TO_ONE && isPubKeyEqual(message.SigPubKey, &m.identity.PublicKey): + case message.MessageType == protobuf.ChatMessage_ONE_TO_ONE && common.IsPubKeyEqual(message.SigPubKey, &m.identity.PublicKey): // It's a private message coming from us so we rely on Message.ChatID // If chat does not exist, it should be created to support multidevice synchronization. chatID := message.ChatId diff --git a/protocol/messenger.go b/protocol/messenger.go index 3a0c373b9..1076ecc3d 100644 --- a/protocol/messenger.go +++ b/protocol/messenger.go @@ -3,6 +3,7 @@ package protocol import ( "context" "crypto/ecdsa" + "github.com/status-im/status-go/protocol/common" "io/ioutil" "math/rand" "os" @@ -56,13 +57,13 @@ type Messenger struct { persistence *sqlitePersistence transport transport.Transport encryptor *encryption.Protocol - processor *messageProcessor + processor *common.MessageProcessor handler *MessageHandler pushNotificationClient *push_notification_client.Client pushNotificationServer *push_notification_server.Server logger *zap.Logger verifyTransactionClient EthClient - featureFlags featureFlags + featureFlags common.FeatureFlags messagesPersistenceEnabled bool shutdownTasks []func() error systemMessagesTranslations map[protobuf.MembershipUpdateEvent_EventType]string @@ -91,13 +92,6 @@ func (m *MessengerResponse) IsEmpty() bool { return len(m.Chats) == 0 && len(m.Messages) == 0 && len(m.Contacts) == 0 && len(m.Installations) == 0 } -type featureFlags struct { - // datasync indicates whether direct messages should be sent exclusively - // using datasync, breaking change for non-v1 clients. Public messages - // are not impacted - datasync bool -} - type dbConfig struct { dbPath string dbKey string @@ -155,7 +149,7 @@ func NewMessenger( slogger := logger.With(zap.String("site", "onSendContactCodeHandler")) slogger.Debug("received a SendContactCode request") - newMessage, err := messageSpecToWhisper(messageSpec) + newMessage, err := common.MessageSpecToWhisper(messageSpec) if err != nil { slogger.Warn("failed to convert spec to Whisper message", zap.Error(err)) return @@ -238,28 +232,27 @@ func NewMessenger( logger, ) - pushNotificationClientPersistence := push_notification_client.NewPersistence(database) - pushNotificationClient := push_notification_client.New(pushNotificationClientPersistence) - - var pushNotificationServer *push_notification_server.Server - if c.pushNotificationServerConfig != nil { - pushNotificationServerPersistence := push_notification_server.NewSQLitePersistence(database) - pushNotificationServer = push_notification_server.New(c.pushNotificationServerConfig, pushNotificationServerPersistence) - } - - processor, err := newMessageProcessor( + processor, err := common.NewMessageProcessor( identity, database, encryptionProtocol, transp, logger, c.featureFlags, - pushNotificationClient.HandleMessageSent, ) if err != nil { return nil, errors.Wrap(err, "failed to create messageProcessor") } + var pushNotificationServer *push_notification_server.Server + if c.pushNotificationServerConfig != nil { + pushNotificationServerPersistence := push_notification_server.NewSQLitePersistence(database) + pushNotificationServer = push_notification_server.New(c.pushNotificationServerConfig, pushNotificationServerPersistence, processor) + } + + pushNotificationClientPersistence := push_notification_client.NewPersistence(database) + pushNotificationClient := push_notification_client.New(pushNotificationClientPersistence, processor) + handler := newMessageHandler(identity, logger, &sqlitePersistence{db: database}) messenger = &Messenger{ @@ -596,7 +589,7 @@ func (m *Messenger) CreateGroupChatWithMembers(ctx context.Context, name string, } m.allChats[chat.ID] = &chat - _, err = m.dispatchMessage(ctx, &RawMessage{ + _, err = m.dispatchMessage(ctx, &common.RawMessage{ LocalChatID: chat.ID, Payload: encodedMessage, MessageType: protobuf.ApplicationMetadataMessage_MEMBERSHIP_UPDATE_MESSAGE, @@ -662,7 +655,7 @@ func (m *Messenger) RemoveMemberFromGroupChat(ctx context.Context, chatID string if err != nil { return nil, err } - _, err = m.dispatchMessage(ctx, &RawMessage{ + _, err = m.dispatchMessage(ctx, &common.RawMessage{ LocalChatID: chat.ID, Payload: encodedMessage, MessageType: protobuf.ApplicationMetadataMessage_MEMBERSHIP_UPDATE_MESSAGE, @@ -725,7 +718,7 @@ func (m *Messenger) AddMembersToGroupChat(ctx context.Context, chatID string, me if err != nil { return nil, err } - _, err = m.dispatchMessage(ctx, &RawMessage{ + _, err = m.dispatchMessage(ctx, &common.RawMessage{ LocalChatID: chat.ID, Payload: encodedMessage, MessageType: protobuf.ApplicationMetadataMessage_MEMBERSHIP_UPDATE_MESSAGE, @@ -790,7 +783,7 @@ func (m *Messenger) ChangeGroupChatName(ctx context.Context, chatID string, name if err != nil { return nil, err } - _, err = m.dispatchMessage(ctx, &RawMessage{ + _, err = m.dispatchMessage(ctx, &common.RawMessage{ LocalChatID: chat.ID, Payload: encodedMessage, MessageType: protobuf.ApplicationMetadataMessage_MEMBERSHIP_UPDATE_MESSAGE, @@ -856,7 +849,7 @@ func (m *Messenger) AddAdminsToGroupChat(ctx context.Context, chatID string, mem if err != nil { return nil, err } - _, err = m.dispatchMessage(ctx, &RawMessage{ + _, err = m.dispatchMessage(ctx, &common.RawMessage{ LocalChatID: chat.ID, Payload: encodedMessage, MessageType: protobuf.ApplicationMetadataMessage_MEMBERSHIP_UPDATE_MESSAGE, @@ -924,7 +917,7 @@ func (m *Messenger) ConfirmJoiningGroup(ctx context.Context, chatID string) (*Me if err != nil { return nil, err } - _, err = m.dispatchMessage(ctx, &RawMessage{ + _, err = m.dispatchMessage(ctx, &common.RawMessage{ LocalChatID: chat.ID, Payload: encodedMessage, MessageType: protobuf.ApplicationMetadataMessage_MEMBERSHIP_UPDATE_MESSAGE, @@ -992,7 +985,7 @@ func (m *Messenger) LeaveGroupChat(ctx context.Context, chatID string, remove bo if err != nil { return nil, err } - _, err = m.dispatchMessage(ctx, &RawMessage{ + _, err = m.dispatchMessage(ctx, &common.RawMessage{ LocalChatID: chat.ID, Payload: encodedMessage, MessageType: protobuf.ApplicationMetadataMessage_MEMBERSHIP_UPDATE_MESSAGE, @@ -1178,7 +1171,7 @@ func (m *Messenger) ReSendChatMessage(ctx context.Context, messageID string) err return errors.New("chat not found") } - _, err = m.dispatchMessage(ctx, &RawMessage{ + _, err = m.dispatchMessage(ctx, &common.RawMessage{ LocalChatID: chat.ID, Payload: message.Payload, MessageType: message.MessageType, @@ -1198,7 +1191,7 @@ func (m *Messenger) hasPairedDevices() bool { } // sendToPairedDevices will check if we have any paired devices and send to them if necessary -func (m *Messenger) sendToPairedDevices(ctx context.Context, spec *RawMessage) error { +func (m *Messenger) sendToPairedDevices(ctx context.Context, spec *common.RawMessage) error { hasPairedDevices := m.hasPairedDevices() // We send a message to any paired device if hasPairedDevices { @@ -1210,7 +1203,7 @@ func (m *Messenger) sendToPairedDevices(ctx context.Context, spec *RawMessage) e return nil } -func (m *Messenger) dispatchPairInstallationMessage(ctx context.Context, spec *RawMessage) ([]byte, error) { +func (m *Messenger) dispatchPairInstallationMessage(ctx context.Context, spec *common.RawMessage) ([]byte, error) { var err error var id []byte @@ -1229,7 +1222,7 @@ func (m *Messenger) dispatchPairInstallationMessage(ctx context.Context, spec *R return id, nil } -func (m *Messenger) dispatchMessage(ctx context.Context, spec *RawMessage) ([]byte, error) { +func (m *Messenger) dispatchMessage(ctx context.Context, spec *common.RawMessage) ([]byte, error) { var err error var id []byte logger := m.logger.With(zap.String("site", "dispatchMessage"), zap.String("chatID", spec.LocalChatID)) @@ -1244,7 +1237,7 @@ func (m *Messenger) dispatchMessage(ctx context.Context, spec *RawMessage) ([]by if err != nil { return nil, err } - if !isPubKeyEqual(publicKey, &m.identity.PublicKey) { + if !common.IsPubKeyEqual(publicKey, &m.identity.PublicKey) { id, err = m.processor.SendPrivate(ctx, publicKey, spec) if err != nil { @@ -1279,7 +1272,7 @@ func (m *Messenger) dispatchMessage(ctx context.Context, spec *RawMessage) ([]by // Filter out my key from the recipients n := 0 for _, recipient := range spec.Recipients { - if !isPubKeyEqual(recipient, &m.identity.PublicKey) { + if !common.IsPubKeyEqual(recipient, &m.identity.PublicKey) { spec.Recipients[n] = recipient n++ } @@ -1379,7 +1372,7 @@ func (m *Messenger) SendChatMessage(ctx context.Context, message *Message) (*Mes return nil, errors.New("chat type not supported") } - id, err := m.dispatchMessage(ctx, &RawMessage{ + id, err := m.dispatchMessage(ctx, &common.RawMessage{ LocalChatID: chat.ID, Payload: encodedMessage, MessageType: protobuf.ApplicationMetadataMessage_CHAT_MESSAGE, @@ -1494,7 +1487,7 @@ func (m *Messenger) sendContactUpdate(ctx context.Context, chatID, ensName, prof return nil, err } - _, err = m.dispatchMessage(ctx, &RawMessage{ + _, err = m.dispatchMessage(ctx, &common.RawMessage{ LocalChatID: chatID, Payload: encodedMessage, MessageType: protobuf.ApplicationMetadataMessage_CONTACT_UPDATE, @@ -1588,7 +1581,7 @@ func (m *Messenger) SendPairInstallation(ctx context.Context) (*MessengerRespons return nil, err } - _, err = m.dispatchPairInstallationMessage(ctx, &RawMessage{ + _, err = m.dispatchPairInstallationMessage(ctx, &common.RawMessage{ LocalChatID: chatID, Payload: encodedMessage, MessageType: protobuf.ApplicationMetadataMessage_PAIR_INSTALLATION, @@ -1635,7 +1628,7 @@ func (m *Messenger) syncPublicChat(ctx context.Context, publicChat *Chat) error return err } - _, err = m.dispatchMessage(ctx, &RawMessage{ + _, err = m.dispatchMessage(ctx, &common.RawMessage{ LocalChatID: chatID, Payload: encodedMessage, MessageType: protobuf.ApplicationMetadataMessage_SYNC_INSTALLATION_PUBLIC_CHAT, @@ -1678,7 +1671,7 @@ func (m *Messenger) syncContact(ctx context.Context, contact *Contact) error { return err } - _, err = m.dispatchMessage(ctx, &RawMessage{ + _, err = m.dispatchMessage(ctx, &common.RawMessage{ LocalChatID: chatID, Payload: encodedMessage, MessageType: protobuf.ApplicationMetadataMessage_SYNC_INSTALLATION_CONTACT, @@ -1758,7 +1751,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte for _, messages := range chatWithMessages { for _, shhMessage := range messages { // TODO: fix this to use an exported method. - statusMessages, err := m.processor.handleMessages(shhMessage, true) + statusMessages, err := m.processor.HandleMessages(shhMessage, true) if err != nil { logger.Info("failed to decode messages", zap.Error(err)) continue @@ -1827,7 +1820,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte continue } case protobuf.PairInstallation: - if !isPubKeyEqual(messageState.CurrentMessageState.PublicKey, &m.identity.PublicKey) { + if !common.IsPubKeyEqual(messageState.CurrentMessageState.PublicKey, &m.identity.PublicKey) { logger.Warn("not coming from us, ignoring") continue } @@ -1840,7 +1833,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte } case protobuf.SyncInstallationContact: - if !isPubKeyEqual(messageState.CurrentMessageState.PublicKey, &m.identity.PublicKey) { + if !common.IsPubKeyEqual(messageState.CurrentMessageState.PublicKey, &m.identity.PublicKey) { logger.Warn("not coming from us, ignoring") continue } @@ -1853,7 +1846,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte continue } case protobuf.SyncInstallationPublicChat: - if !isPubKeyEqual(messageState.CurrentMessageState.PublicKey, &m.identity.PublicKey) { + if !common.IsPubKeyEqual(messageState.CurrentMessageState.PublicKey, &m.identity.PublicKey) { logger.Warn("not coming from us, ignoring") continue } @@ -1926,6 +1919,43 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte logger.Warn("failed to handle ContactUpdate", zap.Error(err)) continue } + case protobuf.PushNotificationRegistration: + logger.Debug("Received PushNotificationRegistration") + if m.pushNotificationServer == nil { + continue + } + logger.Debug("Handling PushNotificationRegistration") + // TODO: Compare DST with Identity + if err := m.pushNotificationServer.HandlePushNotificationRegistration2(publicKey, msg.ParsedMessage.([]byte)); err != nil { + logger.Warn("failed to handle PushNotificationRegistration", zap.Error(err)) + } + // We continue in any case, no changes to messenger + continue + case protobuf.PushNotificationQuery: + logger.Debug("Received PushNotificationQuery") + if m.pushNotificationServer == nil { + continue + } + logger.Debug("Handling PushNotificationQuery") + // TODO: Compare DST with Identity + if err := m.pushNotificationServer.HandlePushNotificationQuery2(publicKey, msg.ParsedMessage.(protobuf.PushNotificationQuery)); err != nil { + logger.Warn("failed to handle PushNotificationQuery", zap.Error(err)) + } + // We continue in any case, no changes to messenger + continue + case protobuf.PushNotificationRequest: + logger.Debug("Received PushNotificationRequest") + if m.pushNotificationServer == nil { + continue + } + logger.Debug("Handling PushNotificationRequest") + // TODO: Compare DST with Identity + if err := m.pushNotificationServer.HandlePushNotificationRequest2(publicKey, msg.ParsedMessage.(protobuf.PushNotificationRequest)); err != nil { + logger.Warn("failed to handle PushNotificationRequest", zap.Error(err)) + } + // We continue in any case, no changes to messenger + continue + default: logger.Debug("message not handled") @@ -2244,7 +2274,7 @@ func (m *Messenger) RequestTransaction(ctx context.Context, chatID, value, contr if err != nil { return nil, err } - id, err := m.dispatchMessage(ctx, &RawMessage{ + id, err := m.dispatchMessage(ctx, &common.RawMessage{ LocalChatID: chat.ID, Payload: encodedMessage, MessageType: protobuf.ApplicationMetadataMessage_REQUEST_TRANSACTION, @@ -2320,7 +2350,7 @@ func (m *Messenger) RequestAddressForTransaction(ctx context.Context, chatID, fr if err != nil { return nil, err } - id, err := m.dispatchMessage(ctx, &RawMessage{ + id, err := m.dispatchMessage(ctx, &common.RawMessage{ LocalChatID: chat.ID, Payload: encodedMessage, MessageType: protobuf.ApplicationMetadataMessage_REQUEST_ADDRESS_FOR_TRANSACTION, @@ -2422,7 +2452,7 @@ func (m *Messenger) AcceptRequestAddressForTransaction(ctx context.Context, mess return nil, err } - newMessageID, err := m.dispatchMessage(ctx, &RawMessage{ + newMessageID, err := m.dispatchMessage(ctx, &common.RawMessage{ LocalChatID: chat.ID, Payload: encodedMessage, MessageType: protobuf.ApplicationMetadataMessage_ACCEPT_REQUEST_ADDRESS_FOR_TRANSACTION, @@ -2505,7 +2535,7 @@ func (m *Messenger) DeclineRequestTransaction(ctx context.Context, messageID str return nil, err } - newMessageID, err := m.dispatchMessage(ctx, &RawMessage{ + newMessageID, err := m.dispatchMessage(ctx, &common.RawMessage{ LocalChatID: chat.ID, Payload: encodedMessage, MessageType: protobuf.ApplicationMetadataMessage_DECLINE_REQUEST_TRANSACTION, @@ -2587,7 +2617,7 @@ func (m *Messenger) DeclineRequestAddressForTransaction(ctx context.Context, mes return nil, err } - newMessageID, err := m.dispatchMessage(ctx, &RawMessage{ + newMessageID, err := m.dispatchMessage(ctx, &common.RawMessage{ LocalChatID: chat.ID, Payload: encodedMessage, MessageType: protobuf.ApplicationMetadataMessage_DECLINE_REQUEST_ADDRESS_FOR_TRANSACTION, @@ -2684,7 +2714,7 @@ func (m *Messenger) AcceptRequestTransaction(ctx context.Context, transactionHas return nil, err } - newMessageID, err := m.dispatchMessage(ctx, &RawMessage{ + newMessageID, err := m.dispatchMessage(ctx, &common.RawMessage{ LocalChatID: chat.ID, Payload: encodedMessage, MessageType: protobuf.ApplicationMetadataMessage_SEND_TRANSACTION, @@ -2761,7 +2791,7 @@ func (m *Messenger) SendTransaction(ctx context.Context, chatID, value, contract return nil, err } - newMessageID, err := m.dispatchMessage(ctx, &RawMessage{ + newMessageID, err := m.dispatchMessage(ctx, &common.RawMessage{ LocalChatID: chat.ID, Payload: encodedMessage, MessageType: protobuf.ApplicationMetadataMessage_SEND_TRANSACTION, diff --git a/protocol/messenger_config.go b/protocol/messenger_config.go index 07dae8a73..253182277 100644 --- a/protocol/messenger_config.go +++ b/protocol/messenger_config.go @@ -2,6 +2,7 @@ package protocol import ( "database/sql" + "github.com/status-im/status-go/protocol/common" "github.com/status-im/status-go/protocol/encryption" "github.com/status-im/status-go/protocol/protobuf" "github.com/status-im/status-go/protocol/push_notification_server" @@ -23,7 +24,7 @@ type config struct { envelopesMonitorConfig *transport.EnvelopesMonitorConfig messagesPersistenceEnabled bool - featureFlags featureFlags + featureFlags common.FeatureFlags // A path to a database or a database instance is required. // The database instance has a higher priority. @@ -99,7 +100,7 @@ func WithPushNotificationServerConfig(pushNotificationServerConfig *push_notific func WithDatasync() func(c *config) error { return func(c *config) error { - c.featureFlags.datasync = true + c.featureFlags.Datasync = true return nil } } diff --git a/protocol/persistence.go b/protocol/persistence.go index 1b0f0bbe0..916fcdd94 100644 --- a/protocol/persistence.go +++ b/protocol/persistence.go @@ -7,6 +7,7 @@ import ( "encoding/gob" "github.com/pkg/errors" + "github.com/status-im/status-go/protocol/common" "github.com/status-im/status-go/eth-node/crypto" ) @@ -377,7 +378,7 @@ func (db sqlitePersistence) Contacts() ([]*Contact, error) { return response, nil } -func (db sqlitePersistence) SaveRawMessage(message *RawMessage) error { +func (db sqlitePersistence) SaveRawMessage(message *common.RawMessage) error { var pubKeys [][]byte for _, pk := range message.Recipients { pubKeys = append(pubKeys, crypto.CompressPubkey(pk)) @@ -417,10 +418,10 @@ func (db sqlitePersistence) SaveRawMessage(message *RawMessage) error { return err } -func (db sqlitePersistence) RawMessageByID(id string) (*RawMessage, error) { +func (db sqlitePersistence) RawMessageByID(id string) (*common.RawMessage, error) { var rawPubKeys [][]byte var encodedRecipients []byte - message := &RawMessage{} + message := &common.RawMessage{} err := db.db.QueryRow(` SELECT diff --git a/protocol/push_notification_client/push_notification.go b/protocol/push_notification_client/push_notification.go index da7abd26c..50ccf1b3d 100644 --- a/protocol/push_notification_client/push_notification.go +++ b/protocol/push_notification_client/push_notification.go @@ -5,6 +5,7 @@ import ( "crypto/cipher" "crypto/ecdsa" "crypto/rand" + "errors" "io" "golang.org/x/crypto/sha3" @@ -12,7 +13,7 @@ import ( "github.com/google/uuid" "github.com/status-im/status-go/eth-node/crypto/ecies" - "github.com/status-im/status-go/protocol/encryption" + "github.com/status-im/status-go/protocol/common" "github.com/status-im/status-go/protocol/protobuf" ) @@ -46,6 +47,7 @@ type Config struct { type Client struct { persistence *Persistence + quit chan struct{} config *Config // lastPushNotificationVersion is the latest known push notification version @@ -58,10 +60,44 @@ type Client struct { // randomReader only used for testing so we have deterministic encryption reader io.Reader + + //messageProcessor is a message processor used to send and being notified of messages + + messageProcessor *common.MessageProcessor } -func New(persistence *Persistence) *Client { - return &Client{persistence: persistence, reader: rand.Reader} +func New(persistence *Persistence, processor *common.MessageProcessor) *Client { + return &Client{ + quit: make(chan struct{}), + messageProcessor: processor, + persistence: persistence, + reader: rand.Reader} +} + +func (c *Client) Start() error { + if c.messageProcessor == nil { + return errors.New("can't start, missing message processor") + } + + go func() { + subscription := c.messageProcessor.Subscribe() + for { + select { + case m := <-subscription: + if err := c.HandleMessageSent(m); err != nil { + // TODO: log + } + case <-c.quit: + return + } + } + }() + return nil +} + +func (c *Client) Stop() error { + close(c.quit) + return nil } // This likely will return a channel as it's an asynchrous operation @@ -78,7 +114,7 @@ func sendPushNotificationTo(publicKey *ecdsa.PublicKey, chatID string) error { // 1) Check we have reasonably fresh push notifications info // 2) Otherwise it should fetch them // 3) Send a push notification to the devices in question -func (p *Client) HandleMessageSent(publicKey *ecdsa.PublicKey, spec *encryption.ProtocolMessageSpec, messageIDs [][]byte) error { +func (p *Client) HandleMessageSent(sentMessage *common.SentMessage) error { return nil } diff --git a/protocol/push_notification_server/push_notification_server.go b/protocol/push_notification_server/push_notification_server.go index 46f5b6dab..2bd4f4400 100644 --- a/protocol/push_notification_server/push_notification_server.go +++ b/protocol/push_notification_server/push_notification_server.go @@ -8,6 +8,7 @@ import ( "github.com/google/uuid" "github.com/status-im/status-go/eth-node/crypto/ecies" + "github.com/status-im/status-go/protocol/common" "github.com/status-im/status-go/protocol/protobuf" "go.uber.org/zap" ) @@ -25,12 +26,13 @@ type Config struct { } type Server struct { - persistence Persistence - config *Config + persistence Persistence + config *Config + messageProcessor *common.MessageProcessor } -func New(config *Config, persistence Persistence) *Server { - return &Server{persistence: persistence, config: config} +func New(config *Config, persistence Persistence, messageProcessor *common.MessageProcessor) *Server { + return &Server{persistence: persistence, config: config, messageProcessor: messageProcessor} } func (p *Server) generateSharedKey(publicKey *ecdsa.PublicKey) ([]byte, error) { @@ -239,3 +241,19 @@ func (p *Server) HandlePushNotificationRegistration(publicKey *ecdsa.PublicKey, return response } + +func (p *Server) HandlePushNotificationRegistration2(publicKey *ecdsa.PublicKey, payload []byte) error { + return nil + +} + +func (p *Server) HandlePushNotificationQuery2(publicKey *ecdsa.PublicKey, query protobuf.PushNotificationQuery) error { + return nil + +} + +func (p *Server) HandlePushNotificationRequest2(publicKey *ecdsa.PublicKey, + request protobuf.PushNotificationRequest) error { + return nil + +} diff --git a/protocol/push_notification_server/push_notification_server_test.go b/protocol/push_notification_server/push_notification_server_test.go index 19c5999da..7ecb4f831 100644 --- a/protocol/push_notification_server/push_notification_server_test.go +++ b/protocol/push_notification_server/push_notification_server_test.go @@ -56,7 +56,7 @@ func (s *ServerSuite) SetupTest() { Identity: identity, } - s.server = New(config, s.persistence) + s.server = New(config, s.persistence, nil) sharedKey, err := s.server.generateSharedKey(&s.key.PublicKey) s.Require().NoError(err)