From 1f64bf1cfe60ac707e31b835b01eb200695bf1a4 Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Wed, 24 Aug 2022 08:06:48 -0400 Subject: [PATCH] feat: log raw messages to csv for debugging (only if explicitly enabled) (#2737) --- params/config.go | 2 + protocol/messenger.go | 154 ++++++++++++++++++++++++++++------- protocol/messenger_config.go | 9 ++ services/ext/service.go | 1 + 4 files changed, 137 insertions(+), 29 deletions(-) diff --git a/params/config.go b/params/config.go index f0e1eb567..6f999cf8d 100644 --- a/params/config.go +++ b/params/config.go @@ -498,6 +498,8 @@ type NodeConfig struct { // PushNotificationServerConfig is the config for the push notification server PushNotificationServerConfig pushnotificationserver.Config `json:"PushNotificationServerConfig"` + + OutputMessageCSVEnabled bool } type Network struct { diff --git a/protocol/messenger.go b/protocol/messenger.go index 883811911..c8131a484 100644 --- a/protocol/messenger.go +++ b/protocol/messenger.go @@ -6,6 +6,7 @@ import ( "crypto/ecdsa" "database/sql" "encoding/hex" + "encoding/json" "fmt" "io/ioutil" "math" @@ -95,22 +96,26 @@ var messageCacheIntervalMs uint64 = 1000 * 60 * 60 * 48 // Similarly, it needs to expose an interface to manage // mailservers because they can also be managed by the user. type Messenger struct { - node types.Node - server *p2p.Server - peerStore *mailservers.PeerStore - config *config - identity *ecdsa.PrivateKey - persistence *sqlitePersistence - transport *transport.Transport - encryptor *encryption.Protocol - sender *common.MessageSender - ensVerifier *ens.Verifier - anonMetricsClient *anonmetrics.Client - anonMetricsServer *anonmetrics.Server - pushNotificationClient *pushnotificationclient.Client - pushNotificationServer *pushnotificationserver.Server - communitiesManager *communities.Manager - logger *zap.Logger + node types.Node + server *p2p.Server + peerStore *mailservers.PeerStore + config *config + identity *ecdsa.PrivateKey + persistence *sqlitePersistence + transport *transport.Transport + encryptor *encryption.Protocol + sender *common.MessageSender + ensVerifier *ens.Verifier + anonMetricsClient *anonmetrics.Client + anonMetricsServer *anonmetrics.Server + pushNotificationClient *pushnotificationclient.Client + pushNotificationServer *pushnotificationserver.Server + communitiesManager *communities.Manager + logger *zap.Logger + + outputCSV bool + csvFile *os.File + verifyTransactionClient EthClient featureFlags common.FeatureFlags shutdownTasks []func() error @@ -464,6 +469,22 @@ func NewMessenger( logger: logger, } + if c.outputMessagesCSV { + messenger.outputCSV = c.outputMessagesCSV + csvFile, err := os.Create("messages-" + fmt.Sprint(time.Now().Unix()) + ".csv") + if err != nil { + return nil, err + } + + _, err = csvFile.Write([]byte("timestamp\tmessageID\tfrom\ttopic\tchatID\tmessageType\tmessage\n")) + if err != nil { + return nil, err + } + + messenger.csvFile = csvFile + messenger.shutdownTasks = append(messenger.shutdownTasks, csvFile.Close) + } + if anonMetricsClient != nil { messenger.shutdownTasks = append(messenger.shutdownTasks, anonMetricsClient.Stop) } @@ -3558,6 +3579,25 @@ func (m *Messenger) buildMessageState() *ReceivedMessageState { } } +func (m *Messenger) outputToCSV(timestamp uint32, messageID types.HexBytes, from string, topic types.TopicType, chatID string, msgType protobuf.ApplicationMetadataMessage_Type, parsedMessage interface{}) { + if !m.outputCSV { + return + } + + msgJSON, err := json.Marshal(parsedMessage) + if err != nil { + m.logger.Error("could not marshall message", zap.Error(err)) + return + } + + line := fmt.Sprintf("%d\t%s\t%s\t%s\t%s\t%s\t%s\n", timestamp, messageID.String(), from, topic.String(), chatID, msgType, msgJSON) + _, err = m.csvFile.Write([]byte(line)) + if err != nil { + m.logger.Error("could not write to csv", zap.Error(err)) + return + } +} + func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filter][]*types.Message, storeWakuMessages bool) (*MessengerResponse, error) { m.handleMessagesMutex.Lock() @@ -3613,8 +3653,9 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte logger.Warn("failed to handle shared secrets") } - // Check for messages from blocked users senderID := contactIDFromPublicKey(publicKey) + + // Check for messages from blocked users if contact, ok := messageState.AllContacts.Load(senderID); ok && contact.Blocked { continue } @@ -3653,10 +3694,12 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte if msg.ParsedMessage != nil { logger.Debug("Handling parsed message") + switch msg.ParsedMessage.Interface().(type) { case protobuf.MembershipUpdateMessage: logger.Debug("Handling MembershipUpdateMessage") rawMembershipUpdate := msg.ParsedMessage.Interface().(protobuf.MembershipUpdateMessage) + m.outputToCSV(msg.TransportMessage.Timestamp, msg.ID, senderID, filter.Topic, filter.ChatID, msg.Type, rawMembershipUpdate) chat, _ := messageState.AllChats.Load(rawMembershipUpdate.ChatId) err = m.HandleMembershipUpdate(messageState, chat, rawMembershipUpdate, m.systemMessagesTranslations) @@ -3669,6 +3712,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte case protobuf.ChatMessage: logger.Debug("Handling ChatMessage") messageState.CurrentMessageState.Message = msg.ParsedMessage.Interface().(protobuf.ChatMessage) + m.outputToCSV(msg.TransportMessage.Timestamp, msg.ID, senderID, filter.Topic, filter.ChatID, msg.Type, messageState.CurrentMessageState.Message) err = m.HandleChatMessage(messageState) if err != nil { logger.Warn("failed to handle ChatMessage", zap.Error(err)) @@ -3679,6 +3723,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte case protobuf.EditMessage: logger.Debug("Handling EditMessage") editProto := msg.ParsedMessage.Interface().(protobuf.EditMessage) + m.outputToCSV(msg.TransportMessage.Timestamp, msg.ID, senderID, filter.Topic, filter.ChatID, msg.Type, editProto) editMessage := EditMessage{ EditMessage: editProto, From: contact.ID, @@ -3695,6 +3740,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte case protobuf.DeleteMessage: logger.Debug("Handling DeleteMessage") deleteProto := msg.ParsedMessage.Interface().(protobuf.DeleteMessage) + m.outputToCSV(msg.TransportMessage.Timestamp, msg.ID, senderID, filter.Topic, filter.ChatID, msg.Type, deleteProto) deleteMessage := DeleteMessage{ DeleteMessage: deleteProto, From: contact.ID, @@ -3711,6 +3757,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte case protobuf.PinMessage: pinMessage := msg.ParsedMessage.Interface().(protobuf.PinMessage) + m.outputToCSV(msg.TransportMessage.Timestamp, msg.ID, senderID, filter.Topic, filter.ChatID, msg.Type, pinMessage) err = m.HandlePinMessage(messageState, pinMessage) if err != nil { logger.Warn("failed to handle PinMessage", zap.Error(err)) @@ -3724,6 +3771,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte continue } p := msg.ParsedMessage.Interface().(protobuf.PairInstallation) + m.outputToCSV(msg.TransportMessage.Timestamp, msg.ID, senderID, filter.Topic, filter.ChatID, msg.Type, p) logger.Debug("Handling PairInstallation", zap.Any("message", p)) err = m.HandlePairInstallation(messageState, p) if err != nil { @@ -3734,6 +3782,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte case protobuf.StatusUpdate: p := msg.ParsedMessage.Interface().(protobuf.StatusUpdate) + m.outputToCSV(msg.TransportMessage.Timestamp, msg.ID, senderID, filter.Topic, filter.ChatID, msg.Type, p) logger.Debug("Handling StatusUpdate", zap.Any("message", p)) err = m.HandleStatusUpdate(messageState, p) if err != nil { @@ -3744,6 +3793,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte case protobuf.SyncInstallationContact: logger.Warn("SyncInstallationContact is not supported") + m.outputToCSV(msg.TransportMessage.Timestamp, msg.ID, senderID, filter.Topic, filter.ChatID, msg.Type, msg.ParsedMessage.Interface().(protobuf.SyncInstallationContact)) continue case protobuf.SyncInstallationContactV2: @@ -3753,6 +3803,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte } p := msg.ParsedMessage.Interface().(protobuf.SyncInstallationContactV2) + m.outputToCSV(msg.TransportMessage.Timestamp, msg.ID, senderID, filter.Topic, filter.ChatID, msg.Type, p) logger.Debug("Handling SyncInstallationContact", zap.Any("message", p)) err = m.HandleSyncInstallationContact(messageState, p) if err != nil { @@ -3768,6 +3819,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte } p := msg.ParsedMessage.Interface().(protobuf.SyncProfilePictures) + m.outputToCSV(msg.TransportMessage.Timestamp, msg.ID, senderID, filter.Topic, filter.ChatID, msg.Type, p) logger.Debug("Handling SyncProfilePicture", zap.Any("message", p)) err = m.HandleSyncProfilePictures(messageState, p) if err != nil { @@ -3783,6 +3835,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte } p := msg.ParsedMessage.Interface().(protobuf.SyncBookmark) + m.outputToCSV(msg.TransportMessage.Timestamp, msg.ID, senderID, filter.Topic, filter.ChatID, msg.Type, p) logger.Debug("Handling SyncBookmark", zap.Any("message", p)) err = m.handleSyncBookmark(messageState, p) if err != nil { @@ -3798,6 +3851,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte } p := msg.ParsedMessage.Interface().(protobuf.SyncClearHistory) + m.outputToCSV(msg.TransportMessage.Timestamp, msg.ID, senderID, filter.Topic, filter.ChatID, msg.Type, p) logger.Debug("Handling SyncClearHistory", zap.Any("message", p)) err = m.handleSyncClearHistory(messageState, p) if err != nil { @@ -3811,6 +3865,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte continue } p := msg.ParsedMessage.Interface().(protobuf.SyncCommunitySettings) + m.outputToCSV(msg.TransportMessage.Timestamp, msg.ID, senderID, filter.Topic, filter.ChatID, msg.Type, p) logger.Debug("Handling SyncCommunitySettings", zap.Any("message", p)) err = m.handleSyncCommunitySettings(messageState, p) if err != nil { @@ -3856,6 +3911,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte } p := msg.ParsedMessage.Interface().(protobuf.Backup) + m.outputToCSV(msg.TransportMessage.Timestamp, msg.ID, senderID, filter.Topic, filter.ChatID, msg.Type, p) logger.Debug("Handling Backup", zap.Any("message", p)) err = m.HandleBackup(messageState, p) if err != nil { @@ -3871,6 +3927,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte } p := msg.ParsedMessage.Interface().(protobuf.SyncInstallationPublicChat) + m.outputToCSV(msg.TransportMessage.Timestamp, msg.ID, senderID, filter.Topic, filter.ChatID, msg.Type, p) logger.Debug("Handling SyncInstallationPublicChat", zap.Any("message", p)) addedChat := m.HandleSyncInstallationPublicChat(messageState, p) @@ -3891,6 +3948,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte } p := msg.ParsedMessage.Interface().(protobuf.SyncChatRemoved) + m.outputToCSV(msg.TransportMessage.Timestamp, msg.ID, senderID, filter.Topic, filter.ChatID, msg.Type, p) logger.Debug("Handling SyncChatRemoved", zap.Any("message", p)) err := m.HandleSyncChatRemoved(messageState, p) if err != nil { @@ -3905,6 +3963,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte } p := msg.ParsedMessage.Interface().(protobuf.SyncChatMessagesRead) + m.outputToCSV(msg.TransportMessage.Timestamp, msg.ID, senderID, filter.Topic, filter.ChatID, msg.Type, p) logger.Debug("Handling SyncChatMessagesRead", zap.Any("message", p)) err := m.HandleSyncChatMessagesRead(messageState, p) if err != nil { @@ -3919,6 +3978,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte } community := msg.ParsedMessage.Interface().(protobuf.SyncCommunity) + m.outputToCSV(msg.TransportMessage.Timestamp, msg.ID, senderID, filter.Topic, filter.ChatID, msg.Type, community) logger.Debug("Handling SyncCommunity", zap.Any("message", community)) err = m.handleSyncCommunity(messageState, community) @@ -3935,6 +3995,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte } a := msg.ParsedMessage.Interface().(protobuf.SyncActivityCenterRead) + m.outputToCSV(msg.TransportMessage.Timestamp, msg.ID, senderID, filter.Topic, filter.ChatID, msg.Type, a) logger.Debug("Handling SyncActivityCenterRead", zap.Any("message", a)) err = m.handleActivityCenterRead(messageState, a) @@ -3951,6 +4012,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte } a := msg.ParsedMessage.Interface().(protobuf.SyncActivityCenterAccepted) + m.outputToCSV(msg.TransportMessage.Timestamp, msg.ID, senderID, filter.Topic, filter.ChatID, msg.Type, a) logger.Debug("Handling SyncActivityCenterAccepted", zap.Any("message", a)) err = m.handleActivityCenterAccepted(messageState, a) @@ -3967,6 +4029,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte } a := msg.ParsedMessage.Interface().(protobuf.SyncActivityCenterDismissed) + m.outputToCSV(msg.TransportMessage.Timestamp, msg.ID, senderID, filter.Topic, filter.ChatID, msg.Type, a) logger.Debug("Handling SyncActivityCenterDismissed", zap.Any("message", a)) err = m.handleActivityCenterDismissed(messageState, a) @@ -3983,6 +4046,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte } ss := msg.ParsedMessage.Interface().(protobuf.SyncSetting) + m.outputToCSV(msg.TransportMessage.Timestamp, msg.ID, senderID, filter.Topic, filter.ChatID, msg.Type, ss) logger.Debug("Handling SyncSetting", zap.Any("message", ss)) err := m.handleSyncSetting(messageState.Response, &ss) @@ -3994,6 +4058,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte case protobuf.RequestAddressForTransaction: command := msg.ParsedMessage.Interface().(protobuf.RequestAddressForTransaction) + m.outputToCSV(msg.TransportMessage.Timestamp, msg.ID, senderID, filter.Topic, filter.ChatID, msg.Type, command) logger.Debug("Handling RequestAddressForTransaction", zap.Any("message", command)) err = m.HandleRequestAddressForTransaction(messageState, command) if err != nil { @@ -4004,6 +4069,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte case protobuf.SendTransaction: command := msg.ParsedMessage.Interface().(protobuf.SendTransaction) + m.outputToCSV(msg.TransportMessage.Timestamp, msg.ID, senderID, filter.Topic, filter.ChatID, msg.Type, command) logger.Debug("Handling SendTransaction", zap.Any("message", command)) err = m.HandleSendTransaction(messageState, command) if err != nil { @@ -4014,6 +4080,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte case protobuf.AcceptRequestAddressForTransaction: command := msg.ParsedMessage.Interface().(protobuf.AcceptRequestAddressForTransaction) + m.outputToCSV(msg.TransportMessage.Timestamp, msg.ID, senderID, filter.Topic, filter.ChatID, msg.Type, command) logger.Debug("Handling AcceptRequestAddressForTransaction") err = m.HandleAcceptRequestAddressForTransaction(messageState, command) if err != nil { @@ -4024,6 +4091,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte case protobuf.DeclineRequestAddressForTransaction: command := msg.ParsedMessage.Interface().(protobuf.DeclineRequestAddressForTransaction) + m.outputToCSV(msg.TransportMessage.Timestamp, msg.ID, senderID, filter.Topic, filter.ChatID, msg.Type, command) logger.Debug("Handling DeclineRequestAddressForTransaction") err = m.HandleDeclineRequestAddressForTransaction(messageState, command) if err != nil { @@ -4034,6 +4102,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte case protobuf.DeclineRequestTransaction: command := msg.ParsedMessage.Interface().(protobuf.DeclineRequestTransaction) + m.outputToCSV(msg.TransportMessage.Timestamp, msg.ID, senderID, filter.Topic, filter.ChatID, msg.Type, command) logger.Debug("Handling DeclineRequestTransaction") err = m.HandleDeclineRequestTransaction(messageState, command) if err != nil { @@ -4044,7 +4113,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte case protobuf.RequestTransaction: command := msg.ParsedMessage.Interface().(protobuf.RequestTransaction) - + m.outputToCSV(msg.TransportMessage.Timestamp, msg.ID, senderID, filter.Topic, filter.ChatID, msg.Type, command) logger.Debug("Handling RequestTransaction") err = m.HandleRequestTransaction(messageState, command) if err != nil { @@ -4060,6 +4129,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte } contactUpdate := msg.ParsedMessage.Interface().(protobuf.ContactUpdate) + m.outputToCSV(msg.TransportMessage.Timestamp, msg.ID, senderID, filter.Topic, filter.ChatID, msg.Type, contactUpdate) err = m.HandleContactUpdate(messageState, contactUpdate) if err != nil { logger.Warn("failed to handle ContactUpdate", zap.Error(err)) @@ -4069,6 +4139,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte case protobuf.AcceptContactRequest: logger.Debug("Handling AcceptContactRequest") message := msg.ParsedMessage.Interface().(protobuf.AcceptContactRequest) + m.outputToCSV(msg.TransportMessage.Timestamp, msg.ID, senderID, filter.Topic, filter.ChatID, msg.Type, message) err = m.HandleAcceptContactRequest(messageState, message) if err != nil { logger.Warn("failed to handle AcceptContactRequest", zap.Error(err)) @@ -4079,6 +4150,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte logger.Debug("Handling RetractContactRequest") message := msg.ParsedMessage.Interface().(protobuf.RetractContactRequest) + m.outputToCSV(msg.TransportMessage.Timestamp, msg.ID, senderID, filter.Topic, filter.ChatID, msg.Type, message) err = m.HandleRetractContactRequest(messageState, message) if err != nil { logger.Warn("failed to handle RetractContactRequest", zap.Error(err)) @@ -4091,8 +4163,10 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte if m.pushNotificationServer == nil { continue } + message := msg.ParsedMessage.Interface().(protobuf.PushNotificationQuery) logger.Debug("Handling PushNotificationQuery") - if err := m.pushNotificationServer.HandlePushNotificationQuery(publicKey, msg.ID, msg.ParsedMessage.Interface().(protobuf.PushNotificationQuery)); err != nil { + m.outputToCSV(msg.TransportMessage.Timestamp, msg.ID, senderID, filter.Topic, filter.ChatID, msg.Type, message) + if err := m.pushNotificationServer.HandlePushNotificationQuery(publicKey, msg.ID, message); err != nil { allMessagesProcessed = false logger.Warn("failed to handle PushNotificationQuery", zap.Error(err)) } @@ -4104,7 +4178,9 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte continue } logger.Debug("Handling PushNotificationRegistrationResponse") - if err := m.pushNotificationClient.HandlePushNotificationRegistrationResponse(publicKey, msg.ParsedMessage.Interface().(protobuf.PushNotificationRegistrationResponse)); err != nil { + message := msg.ParsedMessage.Interface().(protobuf.PushNotificationRegistrationResponse) + m.outputToCSV(msg.TransportMessage.Timestamp, msg.ID, senderID, filter.Topic, filter.ChatID, msg.Type, message) + if err := m.pushNotificationClient.HandlePushNotificationRegistrationResponse(publicKey, message); err != nil { allMessagesProcessed = false logger.Warn("failed to handle PushNotificationRegistrationResponse", zap.Error(err)) } @@ -4114,6 +4190,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte logger.Debug("Received ContactCodeAdvertisement") cca := msg.ParsedMessage.Interface().(protobuf.ContactCodeAdvertisement) + m.outputToCSV(msg.TransportMessage.Timestamp, msg.ID, senderID, filter.Topic, filter.ChatID, msg.Type, cca) logger.Debug("protobuf.ContactCodeAdvertisement received", zap.Any("cca", cca)) if cca.ChatIdentity != nil { @@ -4144,7 +4221,9 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte continue } logger.Debug("Handling PushNotificationResponse") - if err := m.pushNotificationClient.HandlePushNotificationResponse(publicKey, msg.ParsedMessage.Interface().(protobuf.PushNotificationResponse)); err != nil { + message := msg.ParsedMessage.Interface().(protobuf.PushNotificationResponse) + m.outputToCSV(msg.TransportMessage.Timestamp, msg.ID, senderID, filter.Topic, filter.ChatID, msg.Type, message) + if err := m.pushNotificationClient.HandlePushNotificationResponse(publicKey, message); err != nil { allMessagesProcessed = false logger.Warn("failed to handle PushNotificationResponse", zap.Error(err)) } @@ -4157,7 +4236,9 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte continue } logger.Debug("Handling PushNotificationQueryResponse") - if err := m.pushNotificationClient.HandlePushNotificationQueryResponse(publicKey, msg.ParsedMessage.Interface().(protobuf.PushNotificationQueryResponse)); err != nil { + message := msg.ParsedMessage.Interface().(protobuf.PushNotificationQueryResponse) + m.outputToCSV(msg.TransportMessage.Timestamp, msg.ID, senderID, filter.Topic, filter.ChatID, msg.Type, message) + if err := m.pushNotificationClient.HandlePushNotificationQueryResponse(publicKey, message); err != nil { allMessagesProcessed = false logger.Warn("failed to handle PushNotificationQueryResponse", zap.Error(err)) } @@ -4170,7 +4251,9 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte continue } logger.Debug("Handling PushNotificationRequest") - if err := m.pushNotificationServer.HandlePushNotificationRequest(publicKey, msg.ID, msg.ParsedMessage.Interface().(protobuf.PushNotificationRequest)); err != nil { + message := msg.ParsedMessage.Interface().(protobuf.PushNotificationRequest) + m.outputToCSV(msg.TransportMessage.Timestamp, msg.ID, senderID, filter.Topic, filter.ChatID, msg.Type, message) + if err := m.pushNotificationServer.HandlePushNotificationRequest(publicKey, msg.ID, message); err != nil { allMessagesProcessed = false logger.Warn("failed to handle PushNotificationRequest", zap.Error(err)) } @@ -4178,7 +4261,9 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte continue case protobuf.EmojiReaction: logger.Debug("Handling EmojiReaction") - err = m.HandleEmojiReaction(messageState, msg.ParsedMessage.Interface().(protobuf.EmojiReaction)) + message := msg.ParsedMessage.Interface().(protobuf.EmojiReaction) + m.outputToCSV(msg.TransportMessage.Timestamp, msg.ID, senderID, filter.Topic, filter.ChatID, msg.Type, message) + err = m.HandleEmojiReaction(messageState, message) if err != nil { logger.Warn("failed to handle EmojiReaction", zap.Error(err)) allMessagesProcessed = false @@ -4186,14 +4271,18 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte } case protobuf.GroupChatInvitation: logger.Debug("Handling GroupChatInvitation") - err = m.HandleGroupChatInvitation(messageState, msg.ParsedMessage.Interface().(protobuf.GroupChatInvitation)) + message := msg.ParsedMessage.Interface().(protobuf.GroupChatInvitation) + m.outputToCSV(msg.TransportMessage.Timestamp, msg.ID, senderID, filter.Topic, filter.ChatID, msg.Type, message) + err = m.HandleGroupChatInvitation(messageState, message) if err != nil { logger.Warn("failed to handle GroupChatInvitation", zap.Error(err)) allMessagesProcessed = false continue } case protobuf.ChatIdentity: - err = m.HandleChatIdentity(messageState, msg.ParsedMessage.Interface().(protobuf.ChatIdentity)) + message := msg.ParsedMessage.Interface().(protobuf.ChatIdentity) + m.outputToCSV(msg.TransportMessage.Timestamp, msg.ID, senderID, filter.Topic, filter.ChatID, msg.Type, message) + err = m.HandleChatIdentity(messageState, message) if err != nil { logger.Warn("failed to handle ChatIdentity", zap.Error(err)) allMessagesProcessed = false @@ -4202,7 +4291,9 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte case protobuf.CommunityDescription: logger.Debug("Handling CommunityDescription") - err = m.handleCommunityDescription(messageState, publicKey, msg.ParsedMessage.Interface().(protobuf.CommunityDescription), msg.DecryptedPayload) + message := msg.ParsedMessage.Interface().(protobuf.CommunityDescription) + m.outputToCSV(msg.TransportMessage.Timestamp, msg.ID, senderID, filter.Topic, filter.ChatID, msg.Type, message) + err = m.handleCommunityDescription(messageState, publicKey, message, msg.DecryptedPayload) if err != nil { logger.Warn("failed to handle CommunityDescription", zap.Error(err)) allMessagesProcessed = false @@ -4255,6 +4346,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte case protobuf.CommunityInvitation: logger.Debug("Handling CommunityInvitation") invitation := msg.ParsedMessage.Interface().(protobuf.CommunityInvitation) + m.outputToCSV(msg.TransportMessage.Timestamp, msg.ID, senderID, filter.Topic, filter.ChatID, msg.Type, invitation) err = m.HandleCommunityInvitation(messageState, publicKey, invitation, invitation.CommunityDescription) if err != nil { logger.Warn("failed to handle CommunityInvitation", zap.Error(err)) @@ -4264,6 +4356,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte case protobuf.CommunityRequestToJoin: logger.Debug("Handling CommunityRequestToJoin") request := msg.ParsedMessage.Interface().(protobuf.CommunityRequestToJoin) + m.outputToCSV(msg.TransportMessage.Timestamp, msg.ID, senderID, filter.Topic, filter.ChatID, msg.Type, request) err = m.HandleCommunityRequestToJoin(messageState, publicKey, request) if err != nil { logger.Warn("failed to handle CommunityRequestToJoin", zap.Error(err)) @@ -4282,6 +4375,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte case protobuf.CommunityMessageArchiveMagnetlink: logger.Debug("Handling CommunityMessageArchiveMagnetlink") magnetlinkMessage := msg.ParsedMessage.Interface().(protobuf.CommunityMessageArchiveMagnetlink) + m.outputToCSV(msg.TransportMessage.Timestamp, msg.ID, senderID, filter.Topic, filter.ChatID, msg.Type, magnetlinkMessage) err = m.HandleHistoryArchiveMagnetlinkMessage(messageState, publicKey, magnetlinkMessage.MagnetUri, magnetlinkMessage.Clock) if err != nil { logger.Warn("failed to handle CommunityMessageArchiveMagnetlink", zap.Error(err)) @@ -4295,8 +4389,9 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte logger.Warn("unable to handle AnonymousMetricBatch, anonMetricsServer is nil") continue } - - ams, err := m.anonMetricsServer.StoreMetrics(msg.ParsedMessage.Interface().(protobuf.AnonymousMetricBatch)) + message := msg.ParsedMessage.Interface().(protobuf.AnonymousMetricBatch) + m.outputToCSV(msg.TransportMessage.Timestamp, msg.ID, senderID, filter.Topic, filter.ChatID, msg.Type, message) + ams, err := m.anonMetricsServer.StoreMetrics(message) if err != nil { logger.Warn("failed to store AnonymousMetricBatch", zap.Error(err)) continue @@ -4310,6 +4405,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte } p := msg.ParsedMessage.Interface().(protobuf.SyncWalletAccounts) + m.outputToCSV(msg.TransportMessage.Timestamp, msg.ID, senderID, filter.Topic, filter.ChatID, msg.Type, p) logger.Debug("Handling SyncWalletAccount", zap.Any("message", p)) err = m.HandleSyncWalletAccount(messageState, p) if err != nil { diff --git a/protocol/messenger_config.go b/protocol/messenger_config.go index d6da6e9b1..eeea297b6 100644 --- a/protocol/messenger_config.go +++ b/protocol/messenger_config.go @@ -88,6 +88,8 @@ type config struct { logger *zap.Logger + outputMessagesCSV bool + messengerSignalsHandler MessengerSignalsHandler telemetryServerURL string @@ -296,3 +298,10 @@ func WithRPCClient(r *rpc.Client) Option { return nil } } + +func WithMessageCSV(enabled bool) Option { + return func(c *config) error { + c.outputMessagesCSV = enabled + return nil + } +} diff --git a/services/ext/service.go b/services/ext/service.go index 28ed0449c..499de4fb7 100644 --- a/services/ext/service.go +++ b/services/ext/service.go @@ -416,6 +416,7 @@ func buildMessengerOptions( protocol.WithTorrentConfig(&config.TorrentConfig), protocol.WithHTTPServer(httpServer), protocol.WithRPCClient(rpcClient), + protocol.WithMessageCSV(config.OutputMessageCSVEnabled), } if config.ShhextConfig.DataSyncEnabled {