Move message processor to common and allow subscribing to sent messages

This commit is contained in:
Andrea Maria Piana 2020-07-06 10:54:22 +02:00
parent 9da64ef251
commit 3afde67022
No known key found for this signature in database
GPG Key ID: AA6CCA6DE0E06424
12 changed files with 248 additions and 125 deletions

View File

@ -0,0 +1,8 @@
package common
type FeatureFlags struct {
// Datasync indicates whether direct messages should be sent exclusively
// using datasync, breaking change for non-v1 clients. Public messages
// are not impacted
Datasync bool
}

View File

@ -1,4 +1,4 @@
package protocol
package common
import (
"context"
@ -34,30 +34,33 @@ const (
whisperPoWTime = 5
)
type messageProcessor struct {
// SentMessage reprent a message that has been passed to the transport layer
type SentMessage struct {
PublicKey *ecdsa.PublicKey
Spec *encryption.ProtocolMessageSpec
MessageIDs [][]byte
}
type MessageProcessor struct {
identity *ecdsa.PrivateKey
datasync *datasync.DataSync
protocol *encryption.Protocol
transport transport.Transport
logger *zap.Logger
featureFlags featureFlags
// onMessageSpecSent is a callback that is to be called when
// a message spec is sent.
// The reason is a callback is that datasync dispatches things asynchronously
// through a callback, and therefore return values can't be used
onMessageSpecSent func(*ecdsa.PublicKey, *encryption.ProtocolMessageSpec, [][]byte) error
subscriptions []chan<- *SentMessage
featureFlags FeatureFlags
}
func newMessageProcessor(
func NewMessageProcessor(
identity *ecdsa.PrivateKey,
database *sql.DB,
enc *encryption.Protocol,
transport transport.Transport,
logger *zap.Logger,
features featureFlags,
onMessageSpecSent func(*ecdsa.PublicKey, *encryption.ProtocolMessageSpec, [][]byte) error,
) (*messageProcessor, error) {
features FeatureFlags,
) (*MessageProcessor, error) {
dataSyncTransport := datasync.NewNodeTransport()
dataSyncNode, err := datasyncnode.NewPersistentNode(
database,
@ -70,9 +73,9 @@ func newMessageProcessor(
if err != nil {
return nil, err
}
ds := datasync.New(dataSyncNode, dataSyncTransport, features.datasync, logger)
ds := datasync.New(dataSyncNode, dataSyncTransport, features.Datasync, logger)
p := &messageProcessor{
p := &MessageProcessor{
identity: identity,
datasync: ds,
protocol: enc,
@ -85,7 +88,7 @@ func newMessageProcessor(
// With DataSync enabled, messages are added to the DataSync
// but actual encrypt and send calls are postponed.
// sendDataSync is responsible for encrypting and sending postponed messages.
if features.datasync {
if features.Datasync {
ds.Init(p.sendDataSync)
ds.Start(300 * time.Millisecond)
}
@ -93,12 +96,15 @@ func newMessageProcessor(
return p, nil
}
func (p *messageProcessor) Stop() {
func (p *MessageProcessor) Stop() {
for _, c := range p.subscriptions {
close(c)
}
p.datasync.Stop() // idempotent op
}
// SendPrivate takes encoded data, encrypts it and sends through the wire.
func (p *messageProcessor) SendPrivate(
func (p *MessageProcessor) SendPrivate(
ctx context.Context,
recipient *ecdsa.PublicKey,
rawMessage *RawMessage,
@ -113,7 +119,7 @@ func (p *messageProcessor) SendPrivate(
// SendGroupRaw takes encoded data, encrypts it and sends through the wire,
// always return the messageID
func (p *messageProcessor) SendGroup(
func (p *MessageProcessor) SendGroup(
ctx context.Context,
recipients []*ecdsa.PublicKey,
rawMessage *RawMessage,
@ -140,7 +146,7 @@ func (p *messageProcessor) SendGroup(
}
// sendPrivate sends data to the recipient identifying with a given public key.
func (p *messageProcessor) sendPrivate(
func (p *MessageProcessor) sendPrivate(
ctx context.Context,
recipient *ecdsa.PublicKey,
rawMessage *RawMessage,
@ -154,7 +160,7 @@ func (p *messageProcessor) sendPrivate(
messageID := v1protocol.MessageID(&p.identity.PublicKey, wrappedMessage)
if p.featureFlags.datasync {
if p.featureFlags.Datasync {
if err := p.addToDataSync(recipient, wrappedMessage); err != nil {
return nil, errors.Wrap(err, "failed to send message with datasync")
}
@ -180,7 +186,7 @@ func (p *messageProcessor) sendPrivate(
}
// sendPairInstallation sends data to the recipients, using DH
func (p *messageProcessor) SendPairInstallation(
func (p *MessageProcessor) SendPairInstallation(
ctx context.Context,
recipient *ecdsa.PublicKey,
rawMessage *RawMessage,
@ -212,7 +218,7 @@ func (p *messageProcessor) SendPairInstallation(
// EncodeMembershipUpdate takes a group and an optional chat message and returns the protobuf representation to be sent on the wire.
// All the events in a group are encoded and added to the payload
func (p *messageProcessor) EncodeMembershipUpdate(
func (p *MessageProcessor) EncodeMembershipUpdate(
group *v1protocol.Group,
chatMessage *protobuf.ChatMessage,
) ([]byte, error) {
@ -231,7 +237,7 @@ func (p *messageProcessor) EncodeMembershipUpdate(
}
// SendPublic takes encoded data, encrypts it and sends through the wire.
func (p *messageProcessor) SendPublic(
func (p *MessageProcessor) SendPublic(
ctx context.Context,
chatName string,
rawMessage *RawMessage,
@ -262,12 +268,12 @@ func (p *messageProcessor) SendPublic(
return messageID, nil
}
// handleMessages expects a whisper message as input, and it will go through
// HandleMessages expects a whisper message as input, and it will go through
// a series of transformations until the message is parsed into an application
// layer message, or in case of Raw methods, the processing stops at the layer
// before.
// It returns an error only if the processing of required steps failed.
func (p *messageProcessor) handleMessages(shhMessage *types.Message, applicationLayer bool) ([]*v1protocol.StatusMessage, error) {
func (p *MessageProcessor) HandleMessages(shhMessage *types.Message, applicationLayer bool) ([]*v1protocol.StatusMessage, error) {
logger := p.logger.With(zap.String("site", "handleMessages"))
hlogger := logger.With(zap.ByteString("hash", shhMessage.Hash))
var statusMessage v1protocol.StatusMessage
@ -305,7 +311,7 @@ func (p *messageProcessor) handleMessages(shhMessage *types.Message, application
return statusMessages, nil
}
func (p *messageProcessor) handleEncryptionLayer(ctx context.Context, message *v1protocol.StatusMessage) error {
func (p *MessageProcessor) handleEncryptionLayer(ctx context.Context, message *v1protocol.StatusMessage) error {
logger := p.logger.With(zap.String("site", "handleEncryptionLayer"))
publicKey := message.SigPubKey()
@ -322,7 +328,7 @@ func (p *messageProcessor) handleEncryptionLayer(ctx context.Context, message *v
return nil
}
func (p *messageProcessor) handleErrDeviceNotFound(ctx context.Context, publicKey *ecdsa.PublicKey) error {
func (p *MessageProcessor) handleErrDeviceNotFound(ctx context.Context, publicKey *ecdsa.PublicKey) error {
now := time.Now().Unix()
advertise, err := p.protocol.ShouldAdvertiseBundle(publicKey, now)
if err != nil {
@ -351,7 +357,7 @@ func (p *messageProcessor) handleErrDeviceNotFound(ctx context.Context, publicKe
return nil
}
func (p *messageProcessor) wrapMessageV1(rawMessage *RawMessage) ([]byte, error) {
func (p *MessageProcessor) wrapMessageV1(rawMessage *RawMessage) ([]byte, error) {
wrappedMessage, err := v1protocol.WrapMessageV1(rawMessage.Payload, rawMessage.MessageType, p.identity)
if err != nil {
return nil, errors.Wrap(err, "failed to wrap message")
@ -359,7 +365,7 @@ func (p *messageProcessor) wrapMessageV1(rawMessage *RawMessage) ([]byte, error)
return wrappedMessage, nil
}
func (p *messageProcessor) addToDataSync(publicKey *ecdsa.PublicKey, message []byte) error {
func (p *MessageProcessor) addToDataSync(publicKey *ecdsa.PublicKey, message []byte) error {
groupID := datasync.ToOneToOneGroupID(&p.identity.PublicKey, publicKey)
peerID := datasyncpeer.PublicKeyToPeerID(*publicKey)
exist, err := p.datasync.IsPeerInGroup(groupID, peerID)
@ -381,7 +387,7 @@ func (p *messageProcessor) addToDataSync(publicKey *ecdsa.PublicKey, message []b
// sendDataSync sends a message scheduled by the data sync layer.
// Data Sync layer calls this method "dispatch" function.
func (p *messageProcessor) sendDataSync(ctx context.Context, publicKey *ecdsa.PublicKey, encodedMessage []byte, payload *datasyncproto.Payload) error {
func (p *MessageProcessor) sendDataSync(ctx context.Context, publicKey *ecdsa.PublicKey, encodedMessage []byte, payload *datasyncproto.Payload) error {
messageIDs := make([][]byte, 0, len(payload.Messages))
for _, payload := range payload.Messages {
messageIDs = append(messageIDs, v1protocol.MessageID(&p.identity.PublicKey, payload.Body))
@ -403,8 +409,8 @@ func (p *messageProcessor) sendDataSync(ctx context.Context, publicKey *ecdsa.Pu
}
// sendMessageSpec analyses the spec properties and selects a proper transport method.
func (p *messageProcessor) sendMessageSpec(ctx context.Context, publicKey *ecdsa.PublicKey, messageSpec *encryption.ProtocolMessageSpec, messageIDs [][]byte) ([]byte, *types.NewMessage, error) {
newMessage, err := messageSpecToWhisper(messageSpec)
func (p *MessageProcessor) sendMessageSpec(ctx context.Context, publicKey *ecdsa.PublicKey, messageSpec *encryption.ProtocolMessageSpec, messageIDs [][]byte) ([]byte, *types.NewMessage, error) {
newMessage, err := MessageSpecToWhisper(messageSpec)
if err != nil {
return nil, nil, err
}
@ -425,17 +431,31 @@ func (p *messageProcessor) sendMessageSpec(ctx context.Context, publicKey *ecdsa
return nil, nil, err
}
if p.onMessageSpecSent != nil {
sentMessage := &SentMessage{
PublicKey: publicKey,
Spec: messageSpec,
MessageIDs: messageIDs,
}
if err := p.onMessageSpecSent(publicKey, messageSpec, messageIDs); err != nil {
return nil, nil, err
// Publish on channels, drop if buffer is full
for _, c := range p.subscriptions {
select {
case c <- sentMessage:
default:
logger.Warn("subscription channel full, dropping message")
}
}
return hash, newMessage, nil
}
func messageSpecToWhisper(spec *encryption.ProtocolMessageSpec) (*types.NewMessage, error) {
func (p *MessageProcessor) Subscribe() <-chan *SentMessage {
c := make(chan *SentMessage, 100)
p.subscriptions = append(p.subscriptions, c)
return c
}
func MessageSpecToWhisper(spec *encryption.ProtocolMessageSpec) (*types.NewMessage, error) {
var newMessage *types.NewMessage
payload, err := proto.Marshal(spec.Message)
@ -463,8 +483,8 @@ func calculatePoW(payload []byte) float64 {
return whisperDefaultPoW
}
// isPubKeyEqual checks that two public keys are equal
func isPubKeyEqual(a, b *ecdsa.PublicKey) bool {
// IsPubKeyEqual checks that two public keys are equal
func IsPubKeyEqual(a, b *ecdsa.PublicKey) bool {
// the curve is always the same, just compare the points
return a.X.Cmp(b.X) == 0 && a.Y.Cmp(b.Y) == 0
}

View File

@ -1,6 +1,7 @@
package protocol
package common
import (
"github.com/status-im/status-go/protocol"
"io/ioutil"
"os"
"path/filepath"
@ -35,12 +36,12 @@ type MessageProcessorSuite struct {
processor *messageProcessor
tmpDir string
testMessage Message
testMessage protocol.Message
logger *zap.Logger
}
func (s *MessageProcessorSuite) SetupTest() {
s.testMessage = Message{
s.testMessage = protocol.Message{
ChatMessage: protobuf.ChatMessage{
Text: "abc123",
ChatId: "testing-adamb",
@ -81,8 +82,8 @@ func (s *MessageProcessorSuite) SetupTest() {
whisperConfig.MinimumAcceptedPOW = 0
shh := whisper.New(&whisperConfig)
s.Require().NoError(shh.Start(nil))
config := &config{}
s.Require().NoError(WithDatasync()(config))
config := &protocol.config{}
s.Require().NoError(protocol.WithDatasync()(config))
whisperTransport, err := transport.NewTransport(
gethbridge.NewGethWhisperWrapper(shh),
@ -100,7 +101,7 @@ func (s *MessageProcessorSuite) SetupTest() {
encryptionProtocol,
whisperTransport,
s.logger,
featureFlags{},
protocol.featureFlags{},
nil,
)
s.Require().NoError(err)

View File

@ -0,0 +1,21 @@
package common
import (
"crypto/ecdsa"
"github.com/status-im/status-go/protocol/protobuf"
)
// RawMessage represent a sent or received message, kept for being able
// to re-send/propagate
type RawMessage struct {
ID string
LocalChatID string
LastSent uint64
SendCount int
Sent bool
ResendAutomatically bool
MessageType protobuf.ApplicationMetadataMessage_Type
Payload []byte
Recipients []*ecdsa.PublicKey
}

View File

@ -109,20 +109,6 @@ type Message struct {
SigPubKey *ecdsa.PublicKey `json:"-"`
}
// RawMessage represent a sent or received message, kept for being able
// to re-send/propagate
type RawMessage struct {
ID string
LocalChatID string
LastSent uint64
SendCount int
Sent bool
ResendAutomatically bool
MessageType protobuf.ApplicationMetadataMessage_Type
Payload []byte
Recipients []*ecdsa.PublicKey
}
func (m *Message) MarshalJSON() ([]byte, error) {
type StickerAlias struct {
Hash string `json:"hash"`

View File

@ -4,6 +4,7 @@ import (
"crypto/ecdsa"
"encoding/hex"
"fmt"
"github.com/status-im/status-go/protocol/common"
"github.com/pkg/errors"
"go.uber.org/zap"
@ -146,7 +147,7 @@ func (m *MessageHandler) handleCommandMessage(state *ReceivedMessageState, messa
message.LocalChatID = chat.ID
// Increase unviewed count
if !isPubKeyEqual(message.SigPubKey, &m.identity.PublicKey) {
if !common.IsPubKeyEqual(message.SigPubKey, &m.identity.PublicKey) {
chat.UnviewedMessagesCount++
message.OutgoingStatus = ""
} else {
@ -332,7 +333,7 @@ func (m *MessageHandler) HandleChatMessage(state *ReceivedMessageState) error {
receivedMessage.LocalChatID = chat.ID
// Increase unviewed count
if !isPubKeyEqual(receivedMessage.SigPubKey, &m.identity.PublicKey) {
if !common.IsPubKeyEqual(receivedMessage.SigPubKey, &m.identity.PublicKey) {
chat.UnviewedMessagesCount++
} else {
// Our own message, mark as sent
@ -582,7 +583,7 @@ func (m *MessageHandler) matchMessage(message *Message, chats map[string]*Chat,
return nil, errors.New("received a public message from non-existing chat")
}
return chat, nil
case message.MessageType == protobuf.ChatMessage_ONE_TO_ONE && isPubKeyEqual(message.SigPubKey, &m.identity.PublicKey):
case message.MessageType == protobuf.ChatMessage_ONE_TO_ONE && common.IsPubKeyEqual(message.SigPubKey, &m.identity.PublicKey):
// It's a private message coming from us so we rely on Message.ChatID
// If chat does not exist, it should be created to support multidevice synchronization.
chatID := message.ChatId

View File

@ -3,6 +3,7 @@ package protocol
import (
"context"
"crypto/ecdsa"
"github.com/status-im/status-go/protocol/common"
"io/ioutil"
"math/rand"
"os"
@ -56,13 +57,13 @@ type Messenger struct {
persistence *sqlitePersistence
transport transport.Transport
encryptor *encryption.Protocol
processor *messageProcessor
processor *common.MessageProcessor
handler *MessageHandler
pushNotificationClient *push_notification_client.Client
pushNotificationServer *push_notification_server.Server
logger *zap.Logger
verifyTransactionClient EthClient
featureFlags featureFlags
featureFlags common.FeatureFlags
messagesPersistenceEnabled bool
shutdownTasks []func() error
systemMessagesTranslations map[protobuf.MembershipUpdateEvent_EventType]string
@ -91,13 +92,6 @@ func (m *MessengerResponse) IsEmpty() bool {
return len(m.Chats) == 0 && len(m.Messages) == 0 && len(m.Contacts) == 0 && len(m.Installations) == 0
}
type featureFlags struct {
// datasync indicates whether direct messages should be sent exclusively
// using datasync, breaking change for non-v1 clients. Public messages
// are not impacted
datasync bool
}
type dbConfig struct {
dbPath string
dbKey string
@ -155,7 +149,7 @@ func NewMessenger(
slogger := logger.With(zap.String("site", "onSendContactCodeHandler"))
slogger.Debug("received a SendContactCode request")
newMessage, err := messageSpecToWhisper(messageSpec)
newMessage, err := common.MessageSpecToWhisper(messageSpec)
if err != nil {
slogger.Warn("failed to convert spec to Whisper message", zap.Error(err))
return
@ -238,28 +232,27 @@ func NewMessenger(
logger,
)
pushNotificationClientPersistence := push_notification_client.NewPersistence(database)
pushNotificationClient := push_notification_client.New(pushNotificationClientPersistence)
var pushNotificationServer *push_notification_server.Server
if c.pushNotificationServerConfig != nil {
pushNotificationServerPersistence := push_notification_server.NewSQLitePersistence(database)
pushNotificationServer = push_notification_server.New(c.pushNotificationServerConfig, pushNotificationServerPersistence)
}
processor, err := newMessageProcessor(
processor, err := common.NewMessageProcessor(
identity,
database,
encryptionProtocol,
transp,
logger,
c.featureFlags,
pushNotificationClient.HandleMessageSent,
)
if err != nil {
return nil, errors.Wrap(err, "failed to create messageProcessor")
}
var pushNotificationServer *push_notification_server.Server
if c.pushNotificationServerConfig != nil {
pushNotificationServerPersistence := push_notification_server.NewSQLitePersistence(database)
pushNotificationServer = push_notification_server.New(c.pushNotificationServerConfig, pushNotificationServerPersistence, processor)
}
pushNotificationClientPersistence := push_notification_client.NewPersistence(database)
pushNotificationClient := push_notification_client.New(pushNotificationClientPersistence, processor)
handler := newMessageHandler(identity, logger, &sqlitePersistence{db: database})
messenger = &Messenger{
@ -596,7 +589,7 @@ func (m *Messenger) CreateGroupChatWithMembers(ctx context.Context, name string,
}
m.allChats[chat.ID] = &chat
_, err = m.dispatchMessage(ctx, &RawMessage{
_, err = m.dispatchMessage(ctx, &common.RawMessage{
LocalChatID: chat.ID,
Payload: encodedMessage,
MessageType: protobuf.ApplicationMetadataMessage_MEMBERSHIP_UPDATE_MESSAGE,
@ -662,7 +655,7 @@ func (m *Messenger) RemoveMemberFromGroupChat(ctx context.Context, chatID string
if err != nil {
return nil, err
}
_, err = m.dispatchMessage(ctx, &RawMessage{
_, err = m.dispatchMessage(ctx, &common.RawMessage{
LocalChatID: chat.ID,
Payload: encodedMessage,
MessageType: protobuf.ApplicationMetadataMessage_MEMBERSHIP_UPDATE_MESSAGE,
@ -725,7 +718,7 @@ func (m *Messenger) AddMembersToGroupChat(ctx context.Context, chatID string, me
if err != nil {
return nil, err
}
_, err = m.dispatchMessage(ctx, &RawMessage{
_, err = m.dispatchMessage(ctx, &common.RawMessage{
LocalChatID: chat.ID,
Payload: encodedMessage,
MessageType: protobuf.ApplicationMetadataMessage_MEMBERSHIP_UPDATE_MESSAGE,
@ -790,7 +783,7 @@ func (m *Messenger) ChangeGroupChatName(ctx context.Context, chatID string, name
if err != nil {
return nil, err
}
_, err = m.dispatchMessage(ctx, &RawMessage{
_, err = m.dispatchMessage(ctx, &common.RawMessage{
LocalChatID: chat.ID,
Payload: encodedMessage,
MessageType: protobuf.ApplicationMetadataMessage_MEMBERSHIP_UPDATE_MESSAGE,
@ -856,7 +849,7 @@ func (m *Messenger) AddAdminsToGroupChat(ctx context.Context, chatID string, mem
if err != nil {
return nil, err
}
_, err = m.dispatchMessage(ctx, &RawMessage{
_, err = m.dispatchMessage(ctx, &common.RawMessage{
LocalChatID: chat.ID,
Payload: encodedMessage,
MessageType: protobuf.ApplicationMetadataMessage_MEMBERSHIP_UPDATE_MESSAGE,
@ -924,7 +917,7 @@ func (m *Messenger) ConfirmJoiningGroup(ctx context.Context, chatID string) (*Me
if err != nil {
return nil, err
}
_, err = m.dispatchMessage(ctx, &RawMessage{
_, err = m.dispatchMessage(ctx, &common.RawMessage{
LocalChatID: chat.ID,
Payload: encodedMessage,
MessageType: protobuf.ApplicationMetadataMessage_MEMBERSHIP_UPDATE_MESSAGE,
@ -992,7 +985,7 @@ func (m *Messenger) LeaveGroupChat(ctx context.Context, chatID string, remove bo
if err != nil {
return nil, err
}
_, err = m.dispatchMessage(ctx, &RawMessage{
_, err = m.dispatchMessage(ctx, &common.RawMessage{
LocalChatID: chat.ID,
Payload: encodedMessage,
MessageType: protobuf.ApplicationMetadataMessage_MEMBERSHIP_UPDATE_MESSAGE,
@ -1178,7 +1171,7 @@ func (m *Messenger) ReSendChatMessage(ctx context.Context, messageID string) err
return errors.New("chat not found")
}
_, err = m.dispatchMessage(ctx, &RawMessage{
_, err = m.dispatchMessage(ctx, &common.RawMessage{
LocalChatID: chat.ID,
Payload: message.Payload,
MessageType: message.MessageType,
@ -1198,7 +1191,7 @@ func (m *Messenger) hasPairedDevices() bool {
}
// sendToPairedDevices will check if we have any paired devices and send to them if necessary
func (m *Messenger) sendToPairedDevices(ctx context.Context, spec *RawMessage) error {
func (m *Messenger) sendToPairedDevices(ctx context.Context, spec *common.RawMessage) error {
hasPairedDevices := m.hasPairedDevices()
// We send a message to any paired device
if hasPairedDevices {
@ -1210,7 +1203,7 @@ func (m *Messenger) sendToPairedDevices(ctx context.Context, spec *RawMessage) e
return nil
}
func (m *Messenger) dispatchPairInstallationMessage(ctx context.Context, spec *RawMessage) ([]byte, error) {
func (m *Messenger) dispatchPairInstallationMessage(ctx context.Context, spec *common.RawMessage) ([]byte, error) {
var err error
var id []byte
@ -1229,7 +1222,7 @@ func (m *Messenger) dispatchPairInstallationMessage(ctx context.Context, spec *R
return id, nil
}
func (m *Messenger) dispatchMessage(ctx context.Context, spec *RawMessage) ([]byte, error) {
func (m *Messenger) dispatchMessage(ctx context.Context, spec *common.RawMessage) ([]byte, error) {
var err error
var id []byte
logger := m.logger.With(zap.String("site", "dispatchMessage"), zap.String("chatID", spec.LocalChatID))
@ -1244,7 +1237,7 @@ func (m *Messenger) dispatchMessage(ctx context.Context, spec *RawMessage) ([]by
if err != nil {
return nil, err
}
if !isPubKeyEqual(publicKey, &m.identity.PublicKey) {
if !common.IsPubKeyEqual(publicKey, &m.identity.PublicKey) {
id, err = m.processor.SendPrivate(ctx, publicKey, spec)
if err != nil {
@ -1279,7 +1272,7 @@ func (m *Messenger) dispatchMessage(ctx context.Context, spec *RawMessage) ([]by
// Filter out my key from the recipients
n := 0
for _, recipient := range spec.Recipients {
if !isPubKeyEqual(recipient, &m.identity.PublicKey) {
if !common.IsPubKeyEqual(recipient, &m.identity.PublicKey) {
spec.Recipients[n] = recipient
n++
}
@ -1379,7 +1372,7 @@ func (m *Messenger) SendChatMessage(ctx context.Context, message *Message) (*Mes
return nil, errors.New("chat type not supported")
}
id, err := m.dispatchMessage(ctx, &RawMessage{
id, err := m.dispatchMessage(ctx, &common.RawMessage{
LocalChatID: chat.ID,
Payload: encodedMessage,
MessageType: protobuf.ApplicationMetadataMessage_CHAT_MESSAGE,
@ -1494,7 +1487,7 @@ func (m *Messenger) sendContactUpdate(ctx context.Context, chatID, ensName, prof
return nil, err
}
_, err = m.dispatchMessage(ctx, &RawMessage{
_, err = m.dispatchMessage(ctx, &common.RawMessage{
LocalChatID: chatID,
Payload: encodedMessage,
MessageType: protobuf.ApplicationMetadataMessage_CONTACT_UPDATE,
@ -1588,7 +1581,7 @@ func (m *Messenger) SendPairInstallation(ctx context.Context) (*MessengerRespons
return nil, err
}
_, err = m.dispatchPairInstallationMessage(ctx, &RawMessage{
_, err = m.dispatchPairInstallationMessage(ctx, &common.RawMessage{
LocalChatID: chatID,
Payload: encodedMessage,
MessageType: protobuf.ApplicationMetadataMessage_PAIR_INSTALLATION,
@ -1635,7 +1628,7 @@ func (m *Messenger) syncPublicChat(ctx context.Context, publicChat *Chat) error
return err
}
_, err = m.dispatchMessage(ctx, &RawMessage{
_, err = m.dispatchMessage(ctx, &common.RawMessage{
LocalChatID: chatID,
Payload: encodedMessage,
MessageType: protobuf.ApplicationMetadataMessage_SYNC_INSTALLATION_PUBLIC_CHAT,
@ -1678,7 +1671,7 @@ func (m *Messenger) syncContact(ctx context.Context, contact *Contact) error {
return err
}
_, err = m.dispatchMessage(ctx, &RawMessage{
_, err = m.dispatchMessage(ctx, &common.RawMessage{
LocalChatID: chatID,
Payload: encodedMessage,
MessageType: protobuf.ApplicationMetadataMessage_SYNC_INSTALLATION_CONTACT,
@ -1758,7 +1751,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
for _, messages := range chatWithMessages {
for _, shhMessage := range messages {
// TODO: fix this to use an exported method.
statusMessages, err := m.processor.handleMessages(shhMessage, true)
statusMessages, err := m.processor.HandleMessages(shhMessage, true)
if err != nil {
logger.Info("failed to decode messages", zap.Error(err))
continue
@ -1827,7 +1820,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
continue
}
case protobuf.PairInstallation:
if !isPubKeyEqual(messageState.CurrentMessageState.PublicKey, &m.identity.PublicKey) {
if !common.IsPubKeyEqual(messageState.CurrentMessageState.PublicKey, &m.identity.PublicKey) {
logger.Warn("not coming from us, ignoring")
continue
}
@ -1840,7 +1833,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
}
case protobuf.SyncInstallationContact:
if !isPubKeyEqual(messageState.CurrentMessageState.PublicKey, &m.identity.PublicKey) {
if !common.IsPubKeyEqual(messageState.CurrentMessageState.PublicKey, &m.identity.PublicKey) {
logger.Warn("not coming from us, ignoring")
continue
}
@ -1853,7 +1846,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
continue
}
case protobuf.SyncInstallationPublicChat:
if !isPubKeyEqual(messageState.CurrentMessageState.PublicKey, &m.identity.PublicKey) {
if !common.IsPubKeyEqual(messageState.CurrentMessageState.PublicKey, &m.identity.PublicKey) {
logger.Warn("not coming from us, ignoring")
continue
}
@ -1926,6 +1919,43 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
logger.Warn("failed to handle ContactUpdate", zap.Error(err))
continue
}
case protobuf.PushNotificationRegistration:
logger.Debug("Received PushNotificationRegistration")
if m.pushNotificationServer == nil {
continue
}
logger.Debug("Handling PushNotificationRegistration")
// TODO: Compare DST with Identity
if err := m.pushNotificationServer.HandlePushNotificationRegistration2(publicKey, msg.ParsedMessage.([]byte)); err != nil {
logger.Warn("failed to handle PushNotificationRegistration", zap.Error(err))
}
// We continue in any case, no changes to messenger
continue
case protobuf.PushNotificationQuery:
logger.Debug("Received PushNotificationQuery")
if m.pushNotificationServer == nil {
continue
}
logger.Debug("Handling PushNotificationQuery")
// TODO: Compare DST with Identity
if err := m.pushNotificationServer.HandlePushNotificationQuery2(publicKey, msg.ParsedMessage.(protobuf.PushNotificationQuery)); err != nil {
logger.Warn("failed to handle PushNotificationQuery", zap.Error(err))
}
// We continue in any case, no changes to messenger
continue
case protobuf.PushNotificationRequest:
logger.Debug("Received PushNotificationRequest")
if m.pushNotificationServer == nil {
continue
}
logger.Debug("Handling PushNotificationRequest")
// TODO: Compare DST with Identity
if err := m.pushNotificationServer.HandlePushNotificationRequest2(publicKey, msg.ParsedMessage.(protobuf.PushNotificationRequest)); err != nil {
logger.Warn("failed to handle PushNotificationRequest", zap.Error(err))
}
// We continue in any case, no changes to messenger
continue
default:
logger.Debug("message not handled")
@ -2244,7 +2274,7 @@ func (m *Messenger) RequestTransaction(ctx context.Context, chatID, value, contr
if err != nil {
return nil, err
}
id, err := m.dispatchMessage(ctx, &RawMessage{
id, err := m.dispatchMessage(ctx, &common.RawMessage{
LocalChatID: chat.ID,
Payload: encodedMessage,
MessageType: protobuf.ApplicationMetadataMessage_REQUEST_TRANSACTION,
@ -2320,7 +2350,7 @@ func (m *Messenger) RequestAddressForTransaction(ctx context.Context, chatID, fr
if err != nil {
return nil, err
}
id, err := m.dispatchMessage(ctx, &RawMessage{
id, err := m.dispatchMessage(ctx, &common.RawMessage{
LocalChatID: chat.ID,
Payload: encodedMessage,
MessageType: protobuf.ApplicationMetadataMessage_REQUEST_ADDRESS_FOR_TRANSACTION,
@ -2422,7 +2452,7 @@ func (m *Messenger) AcceptRequestAddressForTransaction(ctx context.Context, mess
return nil, err
}
newMessageID, err := m.dispatchMessage(ctx, &RawMessage{
newMessageID, err := m.dispatchMessage(ctx, &common.RawMessage{
LocalChatID: chat.ID,
Payload: encodedMessage,
MessageType: protobuf.ApplicationMetadataMessage_ACCEPT_REQUEST_ADDRESS_FOR_TRANSACTION,
@ -2505,7 +2535,7 @@ func (m *Messenger) DeclineRequestTransaction(ctx context.Context, messageID str
return nil, err
}
newMessageID, err := m.dispatchMessage(ctx, &RawMessage{
newMessageID, err := m.dispatchMessage(ctx, &common.RawMessage{
LocalChatID: chat.ID,
Payload: encodedMessage,
MessageType: protobuf.ApplicationMetadataMessage_DECLINE_REQUEST_TRANSACTION,
@ -2587,7 +2617,7 @@ func (m *Messenger) DeclineRequestAddressForTransaction(ctx context.Context, mes
return nil, err
}
newMessageID, err := m.dispatchMessage(ctx, &RawMessage{
newMessageID, err := m.dispatchMessage(ctx, &common.RawMessage{
LocalChatID: chat.ID,
Payload: encodedMessage,
MessageType: protobuf.ApplicationMetadataMessage_DECLINE_REQUEST_ADDRESS_FOR_TRANSACTION,
@ -2684,7 +2714,7 @@ func (m *Messenger) AcceptRequestTransaction(ctx context.Context, transactionHas
return nil, err
}
newMessageID, err := m.dispatchMessage(ctx, &RawMessage{
newMessageID, err := m.dispatchMessage(ctx, &common.RawMessage{
LocalChatID: chat.ID,
Payload: encodedMessage,
MessageType: protobuf.ApplicationMetadataMessage_SEND_TRANSACTION,
@ -2761,7 +2791,7 @@ func (m *Messenger) SendTransaction(ctx context.Context, chatID, value, contract
return nil, err
}
newMessageID, err := m.dispatchMessage(ctx, &RawMessage{
newMessageID, err := m.dispatchMessage(ctx, &common.RawMessage{
LocalChatID: chat.ID,
Payload: encodedMessage,
MessageType: protobuf.ApplicationMetadataMessage_SEND_TRANSACTION,

View File

@ -2,6 +2,7 @@ package protocol
import (
"database/sql"
"github.com/status-im/status-go/protocol/common"
"github.com/status-im/status-go/protocol/encryption"
"github.com/status-im/status-go/protocol/protobuf"
"github.com/status-im/status-go/protocol/push_notification_server"
@ -23,7 +24,7 @@ type config struct {
envelopesMonitorConfig *transport.EnvelopesMonitorConfig
messagesPersistenceEnabled bool
featureFlags featureFlags
featureFlags common.FeatureFlags
// A path to a database or a database instance is required.
// The database instance has a higher priority.
@ -99,7 +100,7 @@ func WithPushNotificationServerConfig(pushNotificationServerConfig *push_notific
func WithDatasync() func(c *config) error {
return func(c *config) error {
c.featureFlags.datasync = true
c.featureFlags.Datasync = true
return nil
}
}

View File

@ -7,6 +7,7 @@ import (
"encoding/gob"
"github.com/pkg/errors"
"github.com/status-im/status-go/protocol/common"
"github.com/status-im/status-go/eth-node/crypto"
)
@ -377,7 +378,7 @@ func (db sqlitePersistence) Contacts() ([]*Contact, error) {
return response, nil
}
func (db sqlitePersistence) SaveRawMessage(message *RawMessage) error {
func (db sqlitePersistence) SaveRawMessage(message *common.RawMessage) error {
var pubKeys [][]byte
for _, pk := range message.Recipients {
pubKeys = append(pubKeys, crypto.CompressPubkey(pk))
@ -417,10 +418,10 @@ func (db sqlitePersistence) SaveRawMessage(message *RawMessage) error {
return err
}
func (db sqlitePersistence) RawMessageByID(id string) (*RawMessage, error) {
func (db sqlitePersistence) RawMessageByID(id string) (*common.RawMessage, error) {
var rawPubKeys [][]byte
var encodedRecipients []byte
message := &RawMessage{}
message := &common.RawMessage{}
err := db.db.QueryRow(`
SELECT

View File

@ -5,6 +5,7 @@ import (
"crypto/cipher"
"crypto/ecdsa"
"crypto/rand"
"errors"
"io"
"golang.org/x/crypto/sha3"
@ -12,7 +13,7 @@ import (
"github.com/google/uuid"
"github.com/status-im/status-go/eth-node/crypto/ecies"
"github.com/status-im/status-go/protocol/encryption"
"github.com/status-im/status-go/protocol/common"
"github.com/status-im/status-go/protocol/protobuf"
)
@ -46,6 +47,7 @@ type Config struct {
type Client struct {
persistence *Persistence
quit chan struct{}
config *Config
// lastPushNotificationVersion is the latest known push notification version
@ -58,10 +60,44 @@ type Client struct {
// randomReader only used for testing so we have deterministic encryption
reader io.Reader
//messageProcessor is a message processor used to send and being notified of messages
messageProcessor *common.MessageProcessor
}
func New(persistence *Persistence) *Client {
return &Client{persistence: persistence, reader: rand.Reader}
func New(persistence *Persistence, processor *common.MessageProcessor) *Client {
return &Client{
quit: make(chan struct{}),
messageProcessor: processor,
persistence: persistence,
reader: rand.Reader}
}
func (c *Client) Start() error {
if c.messageProcessor == nil {
return errors.New("can't start, missing message processor")
}
go func() {
subscription := c.messageProcessor.Subscribe()
for {
select {
case m := <-subscription:
if err := c.HandleMessageSent(m); err != nil {
// TODO: log
}
case <-c.quit:
return
}
}
}()
return nil
}
func (c *Client) Stop() error {
close(c.quit)
return nil
}
// This likely will return a channel as it's an asynchrous operation
@ -78,7 +114,7 @@ func sendPushNotificationTo(publicKey *ecdsa.PublicKey, chatID string) error {
// 1) Check we have reasonably fresh push notifications info
// 2) Otherwise it should fetch them
// 3) Send a push notification to the devices in question
func (p *Client) HandleMessageSent(publicKey *ecdsa.PublicKey, spec *encryption.ProtocolMessageSpec, messageIDs [][]byte) error {
func (p *Client) HandleMessageSent(sentMessage *common.SentMessage) error {
return nil
}

View File

@ -8,6 +8,7 @@ import (
"github.com/google/uuid"
"github.com/status-im/status-go/eth-node/crypto/ecies"
"github.com/status-im/status-go/protocol/common"
"github.com/status-im/status-go/protocol/protobuf"
"go.uber.org/zap"
)
@ -25,12 +26,13 @@ type Config struct {
}
type Server struct {
persistence Persistence
config *Config
persistence Persistence
config *Config
messageProcessor *common.MessageProcessor
}
func New(config *Config, persistence Persistence) *Server {
return &Server{persistence: persistence, config: config}
func New(config *Config, persistence Persistence, messageProcessor *common.MessageProcessor) *Server {
return &Server{persistence: persistence, config: config, messageProcessor: messageProcessor}
}
func (p *Server) generateSharedKey(publicKey *ecdsa.PublicKey) ([]byte, error) {
@ -239,3 +241,19 @@ func (p *Server) HandlePushNotificationRegistration(publicKey *ecdsa.PublicKey,
return response
}
func (p *Server) HandlePushNotificationRegistration2(publicKey *ecdsa.PublicKey, payload []byte) error {
return nil
}
func (p *Server) HandlePushNotificationQuery2(publicKey *ecdsa.PublicKey, query protobuf.PushNotificationQuery) error {
return nil
}
func (p *Server) HandlePushNotificationRequest2(publicKey *ecdsa.PublicKey,
request protobuf.PushNotificationRequest) error {
return nil
}

View File

@ -56,7 +56,7 @@ func (s *ServerSuite) SetupTest() {
Identity: identity,
}
s.server = New(config, s.persistence)
s.server = New(config, s.persistence, nil)
sharedKey, err := s.server.generateSharedKey(&s.key.PublicKey)
s.Require().NoError(err)