Merge message handler with messenger

This commit is contained in:
Andrea Maria Piana 2021-06-07 14:38:13 +02:00
parent 58a8872c57
commit 075ee9c29f
3 changed files with 47 additions and 225 deletions

View File

@ -12,14 +12,11 @@ import (
"github.com/status-im/status-go/eth-node/crypto" "github.com/status-im/status-go/eth-node/crypto"
"github.com/status-im/status-go/eth-node/types" "github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/images" "github.com/status-im/status-go/images"
"github.com/status-im/status-go/multiaccounts/accounts"
"github.com/status-im/status-go/protocol/common" "github.com/status-im/status-go/protocol/common"
"github.com/status-im/status-go/protocol/communities" "github.com/status-im/status-go/protocol/communities"
"github.com/status-im/status-go/protocol/encryption/multidevice" "github.com/status-im/status-go/protocol/encryption/multidevice"
"github.com/status-im/status-go/protocol/ens"
"github.com/status-im/status-go/protocol/protobuf" "github.com/status-im/status-go/protocol/protobuf"
"github.com/status-im/status-go/protocol/transport"
v1protocol "github.com/status-im/status-go/protocol/v1" v1protocol "github.com/status-im/status-go/protocol/v1"
) )
@ -31,31 +28,10 @@ const (
var ErrMessageNotAllowed = errors.New("message from a non-contact") var ErrMessageNotAllowed = errors.New("message from a non-contact")
type MessageHandler struct {
identity *ecdsa.PrivateKey
persistence *sqlitePersistence
settings *accounts.Database
transport *transport.Transport
ensVerifier *ens.Verifier
communitiesManager *communities.Manager
logger *zap.Logger
}
func newMessageHandler(identity *ecdsa.PrivateKey, logger *zap.Logger, persistence *sqlitePersistence, communitiesManager *communities.Manager, transport *transport.Transport, ensVerifier *ens.Verifier, settings *accounts.Database) *MessageHandler {
return &MessageHandler{
identity: identity,
persistence: persistence,
communitiesManager: communitiesManager,
settings: settings,
ensVerifier: ensVerifier,
transport: transport,
logger: logger}
}
// HandleMembershipUpdate updates a Chat instance according to the membership updates. // HandleMembershipUpdate updates a Chat instance according to the membership updates.
// It retrieves chat, if exists, and merges membership updates from the message. // It retrieves chat, if exists, and merges membership updates from the message.
// Finally, the Chat is updated with the new group events. // Finally, the Chat is updated with the new group events.
func (m *MessageHandler) HandleMembershipUpdate(messageState *ReceivedMessageState, chat *Chat, rawMembershipUpdate protobuf.MembershipUpdateMessage, translations *systemMessageTranslationsMap) error { func (m *Messenger) HandleMembershipUpdate(messageState *ReceivedMessageState, chat *Chat, rawMembershipUpdate protobuf.MembershipUpdateMessage, translations *systemMessageTranslationsMap) error {
var group *v1protocol.Group var group *v1protocol.Group
var err error var err error
@ -179,7 +155,7 @@ func (m *MessageHandler) HandleMembershipUpdate(messageState *ReceivedMessageSta
return nil return nil
} }
func (m *MessageHandler) createMessageNotification(chat *Chat, messageState *ReceivedMessageState) { func (m *Messenger) createMessageNotification(chat *Chat, messageState *ReceivedMessageState) {
var notificationType ActivityCenterType var notificationType ActivityCenterType
if chat.OneToOne() { if chat.OneToOne() {
@ -202,7 +178,7 @@ func (m *MessageHandler) createMessageNotification(chat *Chat, messageState *Rec
} }
} }
func (m *MessageHandler) handleCommandMessage(state *ReceivedMessageState, message *common.Message) error { func (m *Messenger) handleCommandMessage(state *ReceivedMessageState, message *common.Message) error {
message.ID = state.CurrentMessageState.MessageID message.ID = state.CurrentMessageState.MessageID
message.From = state.CurrentMessageState.Contact.ID message.From = state.CurrentMessageState.Contact.ID
message.Alias = state.CurrentMessageState.Contact.Alias message.Alias = state.CurrentMessageState.Contact.Alias
@ -272,7 +248,7 @@ func (m *MessageHandler) handleCommandMessage(state *ReceivedMessageState, messa
return nil return nil
} }
func (m *MessageHandler) HandleSyncInstallationContact(state *ReceivedMessageState, message protobuf.SyncInstallationContact) error { func (m *Messenger) HandleSyncInstallationContact(state *ReceivedMessageState, message protobuf.SyncInstallationContact) error {
chat, ok := state.AllChats.Load(state.CurrentMessageState.Contact.ID) chat, ok := state.AllChats.Load(state.CurrentMessageState.Contact.ID)
if !ok { if !ok {
chat = OneToOneFromPublicKey(state.CurrentMessageState.PublicKey, state.Timesource) chat = OneToOneFromPublicKey(state.CurrentMessageState.PublicKey, state.Timesource)
@ -310,7 +286,7 @@ func (m *MessageHandler) HandleSyncInstallationContact(state *ReceivedMessageSta
return nil return nil
} }
func (m *MessageHandler) HandleSyncInstallationPublicChat(state *ReceivedMessageState, message protobuf.SyncInstallationPublicChat) *Chat { func (m *Messenger) HandleSyncInstallationPublicChat(state *ReceivedMessageState, message protobuf.SyncInstallationPublicChat) *Chat {
chatID := message.Id chatID := message.Id
_, ok := state.AllChats.Load(chatID) _, ok := state.AllChats.Load(chatID)
if ok { if ok {
@ -329,7 +305,7 @@ func (m *MessageHandler) HandleSyncInstallationPublicChat(state *ReceivedMessage
return chat return chat
} }
func (m *MessageHandler) HandlePinMessage(state *ReceivedMessageState, message protobuf.PinMessage) error { func (m *Messenger) HandlePinMessage(state *ReceivedMessageState, message protobuf.PinMessage) error {
logger := m.logger.With(zap.String("site", "HandlePinMessage")) logger := m.logger.With(zap.String("site", "HandlePinMessage"))
logger.Info("Handling pin message") logger.Info("Handling pin message")
@ -382,7 +358,7 @@ func (m *MessageHandler) HandlePinMessage(state *ReceivedMessageState, message p
return nil return nil
} }
func (m *MessageHandler) HandleContactUpdate(state *ReceivedMessageState, message protobuf.ContactUpdate) error { func (m *Messenger) HandleContactUpdate(state *ReceivedMessageState, message protobuf.ContactUpdate) error {
logger := m.logger.With(zap.String("site", "HandleContactUpdate")) logger := m.logger.With(zap.String("site", "HandleContactUpdate"))
contact := state.CurrentMessageState.Contact contact := state.CurrentMessageState.Contact
chat, ok := state.AllChats.Load(contact.ID) chat, ok := state.AllChats.Load(contact.ID)
@ -428,7 +404,7 @@ func (m *MessageHandler) HandleContactUpdate(state *ReceivedMessageState, messag
return nil return nil
} }
func (m *MessageHandler) HandlePairInstallation(state *ReceivedMessageState, message protobuf.PairInstallation) error { func (m *Messenger) HandlePairInstallation(state *ReceivedMessageState, message protobuf.PairInstallation) error {
logger := m.logger.With(zap.String("site", "HandlePairInstallation")) logger := m.logger.With(zap.String("site", "HandlePairInstallation"))
if err := ValidateReceivedPairInstallation(&message, state.CurrentMessageState.WhisperTimestamp); err != nil { if err := ValidateReceivedPairInstallation(&message, state.CurrentMessageState.WhisperTimestamp); err != nil {
logger.Warn("failed to validate message", zap.Error(err)) logger.Warn("failed to validate message", zap.Error(err))
@ -454,7 +430,7 @@ func (m *MessageHandler) HandlePairInstallation(state *ReceivedMessageState, mes
} }
// HandleCommunityInvitation handles an community invitation // HandleCommunityInvitation handles an community invitation
func (m *MessageHandler) HandleCommunityInvitation(state *ReceivedMessageState, signer *ecdsa.PublicKey, invitation protobuf.CommunityInvitation, rawPayload []byte) error { func (m *Messenger) HandleCommunityInvitation(state *ReceivedMessageState, signer *ecdsa.PublicKey, invitation protobuf.CommunityInvitation, rawPayload []byte) error {
if invitation.PublicKey == nil { if invitation.PublicKey == nil {
return errors.New("invalid pubkey") return errors.New("invalid pubkey")
} }
@ -481,7 +457,7 @@ func (m *MessageHandler) HandleCommunityInvitation(state *ReceivedMessageState,
} }
// HandleCommunityRequestToJoin handles an community request to join // HandleCommunityRequestToJoin handles an community request to join
func (m *MessageHandler) HandleCommunityRequestToJoin(state *ReceivedMessageState, signer *ecdsa.PublicKey, requestToJoinProto protobuf.CommunityRequestToJoin) error { func (m *Messenger) HandleCommunityRequestToJoin(state *ReceivedMessageState, signer *ecdsa.PublicKey, requestToJoinProto protobuf.CommunityRequestToJoin) error {
if requestToJoinProto.CommunityId == nil { if requestToJoinProto.CommunityId == nil {
return errors.New("invalid community id") return errors.New("invalid community id")
} }
@ -508,7 +484,7 @@ func (m *MessageHandler) HandleCommunityRequestToJoin(state *ReceivedMessageStat
} }
// handleWrappedCommunityDescriptionMessage handles a wrapped community description // handleWrappedCommunityDescriptionMessage handles a wrapped community description
func (m *MessageHandler) handleWrappedCommunityDescriptionMessage(payload []byte) (*communities.CommunityResponse, error) { func (m *Messenger) handleWrappedCommunityDescriptionMessage(payload []byte) (*communities.CommunityResponse, error) {
return m.communitiesManager.HandleWrappedCommunityDescriptionMessage(payload) return m.communitiesManager.HandleWrappedCommunityDescriptionMessage(payload)
} }
@ -558,7 +534,7 @@ func (m *Messenger) HandleEditMessage(response *MessengerResponse, editMessage E
return nil return nil
} }
func (m *MessageHandler) HandleChatMessage(state *ReceivedMessageState) error { func (m *Messenger) HandleChatMessage(state *ReceivedMessageState) error {
logger := m.logger.With(zap.String("site", "handleChatMessage")) logger := m.logger.With(zap.String("site", "handleChatMessage"))
if err := ValidateReceivedChatMessage(&state.CurrentMessageState.Message, state.CurrentMessageState.WhisperTimestamp); err != nil { if err := ValidateReceivedChatMessage(&state.CurrentMessageState.Message, state.CurrentMessageState.WhisperTimestamp); err != nil {
logger.Warn("failed to validate message", zap.Error(err)) logger.Warn("failed to validate message", zap.Error(err))
@ -674,7 +650,7 @@ func (m *MessageHandler) HandleChatMessage(state *ReceivedMessageState) error {
return nil return nil
} }
func (m *MessageHandler) addActivityCenterNotification(state *ReceivedMessageState, notification *ActivityCenterNotification) error { func (m *Messenger) addActivityCenterNotification(state *ReceivedMessageState, notification *ActivityCenterNotification) error {
err := m.persistence.SaveActivityCenterNotification(notification) err := m.persistence.SaveActivityCenterNotification(notification)
if err != nil { if err != nil {
m.logger.Warn("failed to save notification", zap.Error(err)) m.logger.Warn("failed to save notification", zap.Error(err))
@ -684,7 +660,7 @@ func (m *MessageHandler) addActivityCenterNotification(state *ReceivedMessageSta
return nil return nil
} }
func (m *MessageHandler) HandleRequestAddressForTransaction(messageState *ReceivedMessageState, command protobuf.RequestAddressForTransaction) error { func (m *Messenger) HandleRequestAddressForTransaction(messageState *ReceivedMessageState, command protobuf.RequestAddressForTransaction) error {
err := ValidateReceivedRequestAddressForTransaction(&command, messageState.CurrentMessageState.WhisperTimestamp) err := ValidateReceivedRequestAddressForTransaction(&command, messageState.CurrentMessageState.WhisperTimestamp)
if err != nil { if err != nil {
return err return err
@ -708,7 +684,7 @@ func (m *MessageHandler) HandleRequestAddressForTransaction(messageState *Receiv
return m.handleCommandMessage(messageState, message) return m.handleCommandMessage(messageState, message)
} }
func (m *MessageHandler) HandleRequestTransaction(messageState *ReceivedMessageState, command protobuf.RequestTransaction) error { func (m *Messenger) HandleRequestTransaction(messageState *ReceivedMessageState, command protobuf.RequestTransaction) error {
err := ValidateReceivedRequestTransaction(&command, messageState.CurrentMessageState.WhisperTimestamp) err := ValidateReceivedRequestTransaction(&command, messageState.CurrentMessageState.WhisperTimestamp)
if err != nil { if err != nil {
return err return err
@ -733,7 +709,7 @@ func (m *MessageHandler) HandleRequestTransaction(messageState *ReceivedMessageS
return m.handleCommandMessage(messageState, message) return m.handleCommandMessage(messageState, message)
} }
func (m *MessageHandler) HandleAcceptRequestAddressForTransaction(messageState *ReceivedMessageState, command protobuf.AcceptRequestAddressForTransaction) error { func (m *Messenger) HandleAcceptRequestAddressForTransaction(messageState *ReceivedMessageState, command protobuf.AcceptRequestAddressForTransaction) error {
err := ValidateReceivedAcceptRequestAddressForTransaction(&command, messageState.CurrentMessageState.WhisperTimestamp) err := ValidateReceivedAcceptRequestAddressForTransaction(&command, messageState.CurrentMessageState.WhisperTimestamp)
if err != nil { if err != nil {
return err return err
@ -783,7 +759,7 @@ func (m *MessageHandler) HandleAcceptRequestAddressForTransaction(messageState *
return m.handleCommandMessage(messageState, initialMessage) return m.handleCommandMessage(messageState, initialMessage)
} }
func (m *MessageHandler) HandleSendTransaction(messageState *ReceivedMessageState, command protobuf.SendTransaction) error { func (m *Messenger) HandleSendTransaction(messageState *ReceivedMessageState, command protobuf.SendTransaction) error {
err := ValidateReceivedSendTransaction(&command, messageState.CurrentMessageState.WhisperTimestamp) err := ValidateReceivedSendTransaction(&command, messageState.CurrentMessageState.WhisperTimestamp)
if err != nil { if err != nil {
return err return err
@ -803,7 +779,7 @@ func (m *MessageHandler) HandleSendTransaction(messageState *ReceivedMessageStat
return m.persistence.SaveTransactionToValidate(transactionToValidate) return m.persistence.SaveTransactionToValidate(transactionToValidate)
} }
func (m *MessageHandler) HandleDeclineRequestAddressForTransaction(messageState *ReceivedMessageState, command protobuf.DeclineRequestAddressForTransaction) error { func (m *Messenger) HandleDeclineRequestAddressForTransaction(messageState *ReceivedMessageState, command protobuf.DeclineRequestAddressForTransaction) error {
err := ValidateReceivedDeclineRequestAddressForTransaction(&command, messageState.CurrentMessageState.WhisperTimestamp) err := ValidateReceivedDeclineRequestAddressForTransaction(&command, messageState.CurrentMessageState.WhisperTimestamp)
if err != nil { if err != nil {
return err return err
@ -844,7 +820,7 @@ func (m *MessageHandler) HandleDeclineRequestAddressForTransaction(messageState
return m.handleCommandMessage(messageState, oldMessage) return m.handleCommandMessage(messageState, oldMessage)
} }
func (m *MessageHandler) HandleDeclineRequestTransaction(messageState *ReceivedMessageState, command protobuf.DeclineRequestTransaction) error { func (m *Messenger) HandleDeclineRequestTransaction(messageState *ReceivedMessageState, command protobuf.DeclineRequestTransaction) error {
err := ValidateReceivedDeclineRequestTransaction(&command, messageState.CurrentMessageState.WhisperTimestamp) err := ValidateReceivedDeclineRequestTransaction(&command, messageState.CurrentMessageState.WhisperTimestamp)
if err != nil { if err != nil {
return err return err
@ -885,7 +861,7 @@ func (m *MessageHandler) HandleDeclineRequestTransaction(messageState *ReceivedM
return m.handleCommandMessage(messageState, oldMessage) return m.handleCommandMessage(messageState, oldMessage)
} }
func (m *MessageHandler) matchChatEntity(chatEntity common.ChatEntity, chats *chatMap, contacts *contactMap, timesource common.TimeSource) (*Chat, error) { func (m *Messenger) matchChatEntity(chatEntity common.ChatEntity, chats *chatMap, contacts *contactMap, timesource common.TimeSource) (*Chat, error) {
if chatEntity.GetSigPubKey() == nil { if chatEntity.GetSigPubKey() == nil {
m.logger.Error("public key can't be empty") m.logger.Error("public key can't be empty")
return nil, errors.New("received a chatEntity with empty public key") return nil, errors.New("received a chatEntity with empty public key")
@ -999,7 +975,7 @@ func (m *MessageHandler) matchChatEntity(chatEntity common.ChatEntity, chats *ch
} }
} }
func (m *MessageHandler) messageExists(messageID string, existingMessagesMap map[string]bool) (bool, error) { func (m *Messenger) messageExists(messageID string, existingMessagesMap map[string]bool) (bool, error) {
if _, ok := existingMessagesMap[messageID]; ok { if _, ok := existingMessagesMap[messageID]; ok {
return true, nil return true, nil
} }
@ -1018,7 +994,7 @@ func (m *MessageHandler) messageExists(messageID string, existingMessagesMap map
return false, nil return false, nil
} }
func (m *MessageHandler) HandleEmojiReaction(state *ReceivedMessageState, pbEmojiR protobuf.EmojiReaction) error { func (m *Messenger) HandleEmojiReaction(state *ReceivedMessageState, pbEmojiR protobuf.EmojiReaction) error {
logger := m.logger.With(zap.String("site", "HandleEmojiReaction")) logger := m.logger.With(zap.String("site", "HandleEmojiReaction"))
if err := ValidateReceivedEmojiReaction(&pbEmojiR, state.Timesource.GetCurrentTime()); err != nil { if err := ValidateReceivedEmojiReaction(&pbEmojiR, state.Timesource.GetCurrentTime()); err != nil {
logger.Error("invalid emoji reaction", zap.Error(err)) logger.Error("invalid emoji reaction", zap.Error(err))
@ -1072,7 +1048,7 @@ func (m *MessageHandler) HandleEmojiReaction(state *ReceivedMessageState, pbEmoj
return nil return nil
} }
func (m *MessageHandler) HandleGroupChatInvitation(state *ReceivedMessageState, pbGHInvitations protobuf.GroupChatInvitation) error { func (m *Messenger) HandleGroupChatInvitation(state *ReceivedMessageState, pbGHInvitations protobuf.GroupChatInvitation) error {
allowed, err := m.isMessageAllowedFrom(state.AllContacts, state.CurrentMessageState.Contact.ID, nil) allowed, err := m.isMessageAllowedFrom(state.AllContacts, state.CurrentMessageState.Contact.ID, nil)
if err != nil { if err != nil {
return err return err
@ -1124,7 +1100,7 @@ func (m *MessageHandler) HandleGroupChatInvitation(state *ReceivedMessageState,
// HandleChatIdentity handles an incoming protobuf.ChatIdentity // HandleChatIdentity handles an incoming protobuf.ChatIdentity
// extracts contact information stored in the protobuf and adds it to the user's contact for update. // extracts contact information stored in the protobuf and adds it to the user's contact for update.
func (m *MessageHandler) HandleChatIdentity(state *ReceivedMessageState, ci protobuf.ChatIdentity) error { func (m *Messenger) HandleChatIdentity(state *ReceivedMessageState, ci protobuf.ChatIdentity) error {
logger := m.logger.With(zap.String("site", "HandleChatIdentity")) logger := m.logger.With(zap.String("site", "HandleChatIdentity"))
allowed, err := m.isMessageAllowedFrom(state.AllContacts, state.CurrentMessageState.Contact.ID, nil) allowed, err := m.isMessageAllowedFrom(state.AllContacts, state.CurrentMessageState.Contact.ID, nil)
if err != nil { if err != nil {
@ -1156,13 +1132,13 @@ func (m *MessageHandler) HandleChatIdentity(state *ReceivedMessageState, ci prot
return nil return nil
} }
func (m *MessageHandler) checkForEdits(message *common.Message) error { func (m *Messenger) checkForEdits(message *common.Message) error {
// Check for any pending edit // Check for any pending edit
// If any pending edits are available and valid, apply them // If any pending edits are available and valid, apply them
return nil return nil
} }
func (m *MessageHandler) isMessageAllowedFrom(allContacts *contactMap, publicKey string, chat *Chat) (bool, error) { func (m *Messenger) isMessageAllowedFrom(allContacts *contactMap, publicKey string, chat *Chat) (bool, error) {
onlyFromContacts, err := m.settings.GetMessagesFromContactsOnly() onlyFromContacts, err := m.settings.GetMessagesFromContactsOnly()
if err != nil { if err != nil {
return false, err return false, err
@ -1197,7 +1173,7 @@ func (m *MessageHandler) isMessageAllowedFrom(allContacts *contactMap, publicKey
return contact.IsAdded(), nil return contact.IsAdded(), nil
} }
func (m *MessageHandler) updateUnviewedCounts(chat *Chat, mentioned bool) { func (m *Messenger) updateUnviewedCounts(chat *Chat, mentioned bool) {
chat.UnviewedMessagesCount++ chat.UnviewedMessagesCount++
if mentioned { if mentioned {
chat.UnviewedMentionsCount++ chat.UnviewedMentionsCount++

View File

@ -82,7 +82,6 @@ type Messenger struct {
transport *transport.Transport transport *transport.Transport
encryptor *encryption.Protocol encryptor *encryption.Protocol
sender *common.MessageSender sender *common.MessageSender
handler *MessageHandler
ensVerifier *ens.Verifier ensVerifier *ens.Verifier
pushNotificationClient *pushnotificationclient.Client pushNotificationClient *pushnotificationclient.Client
pushNotificationServer *pushnotificationserver.Server pushNotificationServer *pushnotificationserver.Server
@ -291,8 +290,6 @@ func NewMessenger(
return nil, err return nil, err
} }
settings := accounts.NewDB(database) settings := accounts.NewDB(database)
handler := newMessageHandler(identity, logger, sqlitePersistence, communitiesManager, transp, ensVerifier, settings)
messenger = &Messenger{ messenger = &Messenger{
config: &c, config: &c,
node: node, node: node,
@ -301,7 +298,6 @@ func NewMessenger(
transport: transp, transport: transp,
encryptor: encryptionProtocol, encryptor: encryptionProtocol,
sender: sender, sender: sender,
handler: handler,
pushNotificationClient: pushNotificationClient, pushNotificationClient: pushNotificationClient,
pushNotificationServer: pushNotificationServer, pushNotificationServer: pushNotificationServer,
communitiesManager: communitiesManager, communitiesManager: communitiesManager,
@ -2508,7 +2504,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
// Don't process duplicates // Don't process duplicates
messageID := types.EncodeHex(msg.ID) messageID := types.EncodeHex(msg.ID)
exists, err := m.handler.messageExists(messageID, messageState.ExistingMessagesMap) exists, err := m.messageExists(messageID, messageState.ExistingMessagesMap)
if err != nil { if err != nil {
logger.Warn("failed to check message exists", zap.Error(err)) logger.Warn("failed to check message exists", zap.Error(err))
} }
@ -2547,7 +2543,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
rawMembershipUpdate := msg.ParsedMessage.Interface().(protobuf.MembershipUpdateMessage) rawMembershipUpdate := msg.ParsedMessage.Interface().(protobuf.MembershipUpdateMessage)
chat, _ := messageState.AllChats.Load(rawMembershipUpdate.ChatId) chat, _ := messageState.AllChats.Load(rawMembershipUpdate.ChatId)
err = m.handler.HandleMembershipUpdate(messageState, chat, rawMembershipUpdate, m.systemMessagesTranslations) err = m.HandleMembershipUpdate(messageState, chat, rawMembershipUpdate, m.systemMessagesTranslations)
if err != nil { if err != nil {
logger.Warn("failed to handle MembershipUpdate", zap.Error(err)) logger.Warn("failed to handle MembershipUpdate", zap.Error(err))
allMessagesProcessed = false allMessagesProcessed = false
@ -2557,7 +2553,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
case protobuf.ChatMessage: case protobuf.ChatMessage:
logger.Debug("Handling ChatMessage") logger.Debug("Handling ChatMessage")
messageState.CurrentMessageState.Message = msg.ParsedMessage.Interface().(protobuf.ChatMessage) messageState.CurrentMessageState.Message = msg.ParsedMessage.Interface().(protobuf.ChatMessage)
err = m.handler.HandleChatMessage(messageState) err = m.HandleChatMessage(messageState)
if err != nil { if err != nil {
logger.Warn("failed to handle ChatMessage", zap.Error(err)) logger.Warn("failed to handle ChatMessage", zap.Error(err))
allMessagesProcessed = false allMessagesProcessed = false
@ -2582,7 +2578,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
case protobuf.PinMessage: case protobuf.PinMessage:
pinMessage := msg.ParsedMessage.Interface().(protobuf.PinMessage) pinMessage := msg.ParsedMessage.Interface().(protobuf.PinMessage)
err = m.handler.HandlePinMessage(messageState, pinMessage) err = m.HandlePinMessage(messageState, pinMessage)
if err != nil { if err != nil {
logger.Warn("failed to handle PinMessage", zap.Error(err)) logger.Warn("failed to handle PinMessage", zap.Error(err))
allMessagesProcessed = false allMessagesProcessed = false
@ -2596,7 +2592,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
} }
p := msg.ParsedMessage.Interface().(protobuf.PairInstallation) p := msg.ParsedMessage.Interface().(protobuf.PairInstallation)
logger.Debug("Handling PairInstallation", zap.Any("message", p)) logger.Debug("Handling PairInstallation", zap.Any("message", p))
err = m.handler.HandlePairInstallation(messageState, p) err = m.HandlePairInstallation(messageState, p)
if err != nil { if err != nil {
logger.Warn("failed to handle PairInstallation", zap.Error(err)) logger.Warn("failed to handle PairInstallation", zap.Error(err))
allMessagesProcessed = false allMessagesProcessed = false
@ -2611,7 +2607,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
p := msg.ParsedMessage.Interface().(protobuf.SyncInstallationContact) p := msg.ParsedMessage.Interface().(protobuf.SyncInstallationContact)
logger.Debug("Handling SyncInstallationContact", zap.Any("message", p)) logger.Debug("Handling SyncInstallationContact", zap.Any("message", p))
err = m.handler.HandleSyncInstallationContact(messageState, p) err = m.HandleSyncInstallationContact(messageState, p)
if err != nil { if err != nil {
logger.Warn("failed to handle SyncInstallationContact", zap.Error(err)) logger.Warn("failed to handle SyncInstallationContact", zap.Error(err))
allMessagesProcessed = false allMessagesProcessed = false
@ -2626,7 +2622,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
p := msg.ParsedMessage.Interface().(protobuf.SyncInstallationPublicChat) p := msg.ParsedMessage.Interface().(protobuf.SyncInstallationPublicChat)
logger.Debug("Handling SyncInstallationPublicChat", zap.Any("message", p)) logger.Debug("Handling SyncInstallationPublicChat", zap.Any("message", p))
addedChat := m.handler.HandleSyncInstallationPublicChat(messageState, p) addedChat := m.HandleSyncInstallationPublicChat(messageState, p)
// We join and re-register as we want to receive mentions from the newly joined public chat // We join and re-register as we want to receive mentions from the newly joined public chat
if addedChat != nil { if addedChat != nil {
@ -2649,7 +2645,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
case protobuf.RequestAddressForTransaction: case protobuf.RequestAddressForTransaction:
command := msg.ParsedMessage.Interface().(protobuf.RequestAddressForTransaction) command := msg.ParsedMessage.Interface().(protobuf.RequestAddressForTransaction)
logger.Debug("Handling RequestAddressForTransaction", zap.Any("message", command)) logger.Debug("Handling RequestAddressForTransaction", zap.Any("message", command))
err = m.handler.HandleRequestAddressForTransaction(messageState, command) err = m.HandleRequestAddressForTransaction(messageState, command)
if err != nil { if err != nil {
logger.Warn("failed to handle RequestAddressForTransaction", zap.Error(err)) logger.Warn("failed to handle RequestAddressForTransaction", zap.Error(err))
allMessagesProcessed = false allMessagesProcessed = false
@ -2659,7 +2655,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
case protobuf.SendTransaction: case protobuf.SendTransaction:
command := msg.ParsedMessage.Interface().(protobuf.SendTransaction) command := msg.ParsedMessage.Interface().(protobuf.SendTransaction)
logger.Debug("Handling SendTransaction", zap.Any("message", command)) logger.Debug("Handling SendTransaction", zap.Any("message", command))
err = m.handler.HandleSendTransaction(messageState, command) err = m.HandleSendTransaction(messageState, command)
if err != nil { if err != nil {
logger.Warn("failed to handle SendTransaction", zap.Error(err)) logger.Warn("failed to handle SendTransaction", zap.Error(err))
allMessagesProcessed = false allMessagesProcessed = false
@ -2669,7 +2665,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
case protobuf.AcceptRequestAddressForTransaction: case protobuf.AcceptRequestAddressForTransaction:
command := msg.ParsedMessage.Interface().(protobuf.AcceptRequestAddressForTransaction) command := msg.ParsedMessage.Interface().(protobuf.AcceptRequestAddressForTransaction)
logger.Debug("Handling AcceptRequestAddressForTransaction") logger.Debug("Handling AcceptRequestAddressForTransaction")
err = m.handler.HandleAcceptRequestAddressForTransaction(messageState, command) err = m.HandleAcceptRequestAddressForTransaction(messageState, command)
if err != nil { if err != nil {
logger.Warn("failed to handle AcceptRequestAddressForTransaction", zap.Error(err)) logger.Warn("failed to handle AcceptRequestAddressForTransaction", zap.Error(err))
allMessagesProcessed = false allMessagesProcessed = false
@ -2679,7 +2675,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
case protobuf.DeclineRequestAddressForTransaction: case protobuf.DeclineRequestAddressForTransaction:
command := msg.ParsedMessage.Interface().(protobuf.DeclineRequestAddressForTransaction) command := msg.ParsedMessage.Interface().(protobuf.DeclineRequestAddressForTransaction)
logger.Debug("Handling DeclineRequestAddressForTransaction") logger.Debug("Handling DeclineRequestAddressForTransaction")
err = m.handler.HandleDeclineRequestAddressForTransaction(messageState, command) err = m.HandleDeclineRequestAddressForTransaction(messageState, command)
if err != nil { if err != nil {
logger.Warn("failed to handle DeclineRequestAddressForTransaction", zap.Error(err)) logger.Warn("failed to handle DeclineRequestAddressForTransaction", zap.Error(err))
allMessagesProcessed = false allMessagesProcessed = false
@ -2689,7 +2685,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
case protobuf.DeclineRequestTransaction: case protobuf.DeclineRequestTransaction:
command := msg.ParsedMessage.Interface().(protobuf.DeclineRequestTransaction) command := msg.ParsedMessage.Interface().(protobuf.DeclineRequestTransaction)
logger.Debug("Handling DeclineRequestTransaction") logger.Debug("Handling DeclineRequestTransaction")
err = m.handler.HandleDeclineRequestTransaction(messageState, command) err = m.HandleDeclineRequestTransaction(messageState, command)
if err != nil { if err != nil {
logger.Warn("failed to handle DeclineRequestTransaction", zap.Error(err)) logger.Warn("failed to handle DeclineRequestTransaction", zap.Error(err))
allMessagesProcessed = false allMessagesProcessed = false
@ -2700,7 +2696,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
command := msg.ParsedMessage.Interface().(protobuf.RequestTransaction) command := msg.ParsedMessage.Interface().(protobuf.RequestTransaction)
logger.Debug("Handling RequestTransaction") logger.Debug("Handling RequestTransaction")
err = m.handler.HandleRequestTransaction(messageState, command) err = m.HandleRequestTransaction(messageState, command)
if err != nil { if err != nil {
logger.Warn("failed to handle RequestTransaction", zap.Error(err)) logger.Warn("failed to handle RequestTransaction", zap.Error(err))
allMessagesProcessed = false allMessagesProcessed = false
@ -2710,7 +2706,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
case protobuf.ContactUpdate: case protobuf.ContactUpdate:
logger.Debug("Handling ContactUpdate") logger.Debug("Handling ContactUpdate")
contactUpdate := msg.ParsedMessage.Interface().(protobuf.ContactUpdate) contactUpdate := msg.ParsedMessage.Interface().(protobuf.ContactUpdate)
err = m.handler.HandleContactUpdate(messageState, contactUpdate) err = m.HandleContactUpdate(messageState, contactUpdate)
if err != nil { if err != nil {
logger.Warn("failed to handle ContactUpdate", zap.Error(err)) logger.Warn("failed to handle ContactUpdate", zap.Error(err))
allMessagesProcessed = false allMessagesProcessed = false
@ -2747,7 +2743,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
if cca.ChatIdentity != nil { if cca.ChatIdentity != nil {
logger.Debug("Received ContactCodeAdvertisement ChatIdentity") logger.Debug("Received ContactCodeAdvertisement ChatIdentity")
err = m.handler.HandleChatIdentity(messageState, *cca.ChatIdentity) err = m.HandleChatIdentity(messageState, *cca.ChatIdentity)
if err != nil { if err != nil {
allMessagesProcessed = false allMessagesProcessed = false
logger.Warn("failed to handle ContactCodeAdvertisement ChatIdentity", zap.Error(err)) logger.Warn("failed to handle ContactCodeAdvertisement ChatIdentity", zap.Error(err))
@ -2807,7 +2803,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
continue continue
case protobuf.EmojiReaction: case protobuf.EmojiReaction:
logger.Debug("Handling EmojiReaction") logger.Debug("Handling EmojiReaction")
err = m.handler.HandleEmojiReaction(messageState, msg.ParsedMessage.Interface().(protobuf.EmojiReaction)) err = m.HandleEmojiReaction(messageState, msg.ParsedMessage.Interface().(protobuf.EmojiReaction))
if err != nil { if err != nil {
logger.Warn("failed to handle EmojiReaction", zap.Error(err)) logger.Warn("failed to handle EmojiReaction", zap.Error(err))
allMessagesProcessed = false allMessagesProcessed = false
@ -2815,14 +2811,14 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
} }
case protobuf.GroupChatInvitation: case protobuf.GroupChatInvitation:
logger.Debug("Handling GroupChatInvitation") logger.Debug("Handling GroupChatInvitation")
err = m.handler.HandleGroupChatInvitation(messageState, msg.ParsedMessage.Interface().(protobuf.GroupChatInvitation)) err = m.HandleGroupChatInvitation(messageState, msg.ParsedMessage.Interface().(protobuf.GroupChatInvitation))
if err != nil { if err != nil {
logger.Warn("failed to handle GroupChatInvitation", zap.Error(err)) logger.Warn("failed to handle GroupChatInvitation", zap.Error(err))
allMessagesProcessed = false allMessagesProcessed = false
continue continue
} }
case protobuf.ChatIdentity: case protobuf.ChatIdentity:
err = m.handler.HandleChatIdentity(messageState, msg.ParsedMessage.Interface().(protobuf.ChatIdentity)) err = m.HandleChatIdentity(messageState, msg.ParsedMessage.Interface().(protobuf.ChatIdentity))
if err != nil { if err != nil {
logger.Warn("failed to handle ChatIdentity", zap.Error(err)) logger.Warn("failed to handle ChatIdentity", zap.Error(err))
allMessagesProcessed = false allMessagesProcessed = false
@ -2848,7 +2844,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
case protobuf.CommunityInvitation: case protobuf.CommunityInvitation:
logger.Debug("Handling CommunityInvitation") logger.Debug("Handling CommunityInvitation")
invitation := msg.ParsedMessage.Interface().(protobuf.CommunityInvitation) invitation := msg.ParsedMessage.Interface().(protobuf.CommunityInvitation)
err = m.handler.HandleCommunityInvitation(messageState, publicKey, invitation, invitation.CommunityDescription) err = m.HandleCommunityInvitation(messageState, publicKey, invitation, invitation.CommunityDescription)
if err != nil { if err != nil {
logger.Warn("failed to handle CommunityInvitation", zap.Error(err)) logger.Warn("failed to handle CommunityInvitation", zap.Error(err))
allMessagesProcessed = false allMessagesProcessed = false
@ -2857,7 +2853,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
case protobuf.CommunityRequestToJoin: case protobuf.CommunityRequestToJoin:
logger.Debug("Handling CommunityRequestToJoin") logger.Debug("Handling CommunityRequestToJoin")
request := msg.ParsedMessage.Interface().(protobuf.CommunityRequestToJoin) request := msg.ParsedMessage.Interface().(protobuf.CommunityRequestToJoin)
err = m.handler.HandleCommunityRequestToJoin(messageState, publicKey, request) err = m.HandleCommunityRequestToJoin(messageState, publicKey, request)
if err != nil { if err != nil {
logger.Warn("failed to handle CommunityRequestToJoin", zap.Error(err)) logger.Warn("failed to handle CommunityRequestToJoin", zap.Error(err))
continue continue

View File

@ -8,7 +8,6 @@ import (
"errors" "errors"
"io/ioutil" "io/ioutil"
"math/big" "math/big"
"strconv"
"strings" "strings"
"testing" "testing"
"time" "time"
@ -53,10 +52,6 @@ func TestMessengerWithDataSyncEnabledSuite(t *testing.T) {
suite.Run(t, &MessengerSuite{enableDataSync: true}) suite.Run(t, &MessengerSuite{enableDataSync: true})
} }
func TestMessageHandlerSuite(t *testing.T) {
suite.Run(t, new(MessageHandlerSuite))
}
type MessengerSuite struct { type MessengerSuite struct {
suite.Suite suite.Suite
@ -2397,157 +2392,12 @@ func (s *MessengerSuite) TestResendExpiredEmojis() {
s.Equal(2, rawMessage.SendCount) s.Equal(2, rawMessage.SendCount)
} }
type MessageHandlerSuite struct {
suite.Suite
messageHandler *MessageHandler
logger *zap.Logger
}
func (s *MessageHandlerSuite) SetupTest() {
s.logger = tt.MustCreateTestLogger()
privateKey, err := crypto.GenerateKey()
s.Require().NoError(err)
s.messageHandler = &MessageHandler{
identity: privateKey,
logger: s.logger,
}
}
func (s *MessageHandlerSuite) TearDownTest() {
_ = s.logger.Sync()
}
type testTimeSource struct{} type testTimeSource struct{}
func (t *testTimeSource) GetCurrentTime() uint64 { func (t *testTimeSource) GetCurrentTime() uint64 {
return uint64(time.Now().Unix()) return uint64(time.Now().Unix())
} }
func (s *MessageHandlerSuite) TestRun() {
key1, err := crypto.GenerateKey()
s.Require().NoError(err)
key2, err := crypto.GenerateKey()
s.Require().NoError(err)
testCases := []struct {
Name string
Error bool
Chat *Chat // Chat to create
Message common.Message
SigPubKey *ecdsa.PublicKey
ExpectedChatID string
}{
{
Name: "Public chat",
Chat: CreatePublicChat("test-chat", &testTimeSource{}),
Message: common.Message{
ChatMessage: protobuf.ChatMessage{
ChatId: "test-chat",
MessageType: protobuf.MessageType_PUBLIC_GROUP,
Text: "test-text"},
},
SigPubKey: &key1.PublicKey,
ExpectedChatID: "test-chat",
},
{
Name: "Private message from myself with existing chat",
Chat: CreateOneToOneChat("test-private-chat", &key1.PublicKey, &testTimeSource{}),
Message: common.Message{
ChatMessage: protobuf.ChatMessage{
ChatId: "test-chat",
MessageType: protobuf.MessageType_ONE_TO_ONE,
Text: "test-text"},
},
SigPubKey: &key1.PublicKey,
ExpectedChatID: oneToOneChatID(&key1.PublicKey),
},
{
Name: "Private message from other with existing chat",
Chat: CreateOneToOneChat("test-private-chat", &key2.PublicKey, &testTimeSource{}),
Message: common.Message{
ChatMessage: protobuf.ChatMessage{
ChatId: "test-chat",
MessageType: protobuf.MessageType_ONE_TO_ONE,
Text: "test-text"},
},
SigPubKey: &key2.PublicKey,
ExpectedChatID: oneToOneChatID(&key2.PublicKey),
},
{
Name: "Private message from myself without chat",
Message: common.Message{
ChatMessage: protobuf.ChatMessage{
ChatId: "test-chat",
MessageType: protobuf.MessageType_ONE_TO_ONE,
Text: "test-text"},
},
SigPubKey: &key1.PublicKey,
ExpectedChatID: oneToOneChatID(&key1.PublicKey),
},
{
Name: "Private message from other without chat",
Message: common.Message{
ChatMessage: protobuf.ChatMessage{
ChatId: "test-chat",
MessageType: protobuf.MessageType_ONE_TO_ONE,
Text: "test-text"},
},
SigPubKey: &key2.PublicKey,
ExpectedChatID: oneToOneChatID(&key2.PublicKey),
},
{
Name: "Private message without public key",
SigPubKey: nil,
Error: true,
},
{
Name: "Private group message",
Message: common.Message{
ChatMessage: protobuf.ChatMessage{
ChatId: "non-existing-chat",
MessageType: protobuf.MessageType_PRIVATE_GROUP,
Text: "test-text"},
},
Error: true,
SigPubKey: &key2.PublicKey,
},
}
for idx, tc := range testCases {
s.Run(tc.Name, func() {
chatsMap := new(chatMap)
contactsMap := new(contactMap)
if tc.Chat != nil && tc.Chat.ID != "" {
chatsMap.Store(tc.Chat.ID, tc.Chat)
}
message := tc.Message
message.SigPubKey = tc.SigPubKey
// ChatID is not set at the beginning.
s.Empty(message.LocalChatID)
message.ID = strconv.Itoa(idx) // manually set the ID because messages does not go through messageSender
chat, err := s.messageHandler.matchChatEntity(&message, chatsMap, contactsMap, &testTimeSource{})
if tc.Error {
s.Require().Error(err)
} else {
s.Require().NoError(err)
if tc.ExpectedChatID != "" {
s.Require().NotNil(chat)
s.Require().Equal(tc.ExpectedChatID, chat.ID)
}
}
})
}
}
func WaitOnMessengerResponse(m *Messenger, condition func(*MessengerResponse) bool, errorMessage string) (*MessengerResponse, error) { func WaitOnMessengerResponse(m *Messenger, condition func(*MessengerResponse) bool, errorMessage string) (*MessengerResponse, error) {
var response *MessengerResponse var response *MessengerResponse
return response, tt.RetryWithBackOff(func() error { return response, tt.RetryWithBackOff(func() error {