feat: log raw messages to csv for debugging (only if explicitly enabled) (#2737)

This commit is contained in:
Richard Ramos 2022-08-24 08:06:48 -04:00 committed by GitHub
parent 92a622d6fb
commit 1f64bf1cfe
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 137 additions and 29 deletions

View File

@ -498,6 +498,8 @@ type NodeConfig struct {
// PushNotificationServerConfig is the config for the push notification server
PushNotificationServerConfig pushnotificationserver.Config `json:"PushNotificationServerConfig"`
OutputMessageCSVEnabled bool
}
type Network struct {

View File

@ -6,6 +6,7 @@ import (
"crypto/ecdsa"
"database/sql"
"encoding/hex"
"encoding/json"
"fmt"
"io/ioutil"
"math"
@ -95,22 +96,26 @@ var messageCacheIntervalMs uint64 = 1000 * 60 * 60 * 48
// Similarly, it needs to expose an interface to manage
// mailservers because they can also be managed by the user.
type Messenger struct {
node types.Node
server *p2p.Server
peerStore *mailservers.PeerStore
config *config
identity *ecdsa.PrivateKey
persistence *sqlitePersistence
transport *transport.Transport
encryptor *encryption.Protocol
sender *common.MessageSender
ensVerifier *ens.Verifier
anonMetricsClient *anonmetrics.Client
anonMetricsServer *anonmetrics.Server
pushNotificationClient *pushnotificationclient.Client
pushNotificationServer *pushnotificationserver.Server
communitiesManager *communities.Manager
logger *zap.Logger
node types.Node
server *p2p.Server
peerStore *mailservers.PeerStore
config *config
identity *ecdsa.PrivateKey
persistence *sqlitePersistence
transport *transport.Transport
encryptor *encryption.Protocol
sender *common.MessageSender
ensVerifier *ens.Verifier
anonMetricsClient *anonmetrics.Client
anonMetricsServer *anonmetrics.Server
pushNotificationClient *pushnotificationclient.Client
pushNotificationServer *pushnotificationserver.Server
communitiesManager *communities.Manager
logger *zap.Logger
outputCSV bool
csvFile *os.File
verifyTransactionClient EthClient
featureFlags common.FeatureFlags
shutdownTasks []func() error
@ -464,6 +469,22 @@ func NewMessenger(
logger: logger,
}
if c.outputMessagesCSV {
messenger.outputCSV = c.outputMessagesCSV
csvFile, err := os.Create("messages-" + fmt.Sprint(time.Now().Unix()) + ".csv")
if err != nil {
return nil, err
}
_, err = csvFile.Write([]byte("timestamp\tmessageID\tfrom\ttopic\tchatID\tmessageType\tmessage\n"))
if err != nil {
return nil, err
}
messenger.csvFile = csvFile
messenger.shutdownTasks = append(messenger.shutdownTasks, csvFile.Close)
}
if anonMetricsClient != nil {
messenger.shutdownTasks = append(messenger.shutdownTasks, anonMetricsClient.Stop)
}
@ -3558,6 +3579,25 @@ func (m *Messenger) buildMessageState() *ReceivedMessageState {
}
}
func (m *Messenger) outputToCSV(timestamp uint32, messageID types.HexBytes, from string, topic types.TopicType, chatID string, msgType protobuf.ApplicationMetadataMessage_Type, parsedMessage interface{}) {
if !m.outputCSV {
return
}
msgJSON, err := json.Marshal(parsedMessage)
if err != nil {
m.logger.Error("could not marshall message", zap.Error(err))
return
}
line := fmt.Sprintf("%d\t%s\t%s\t%s\t%s\t%s\t%s\n", timestamp, messageID.String(), from, topic.String(), chatID, msgType, msgJSON)
_, err = m.csvFile.Write([]byte(line))
if err != nil {
m.logger.Error("could not write to csv", zap.Error(err))
return
}
}
func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filter][]*types.Message, storeWakuMessages bool) (*MessengerResponse, error) {
m.handleMessagesMutex.Lock()
@ -3613,8 +3653,9 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
logger.Warn("failed to handle shared secrets")
}
// Check for messages from blocked users
senderID := contactIDFromPublicKey(publicKey)
// Check for messages from blocked users
if contact, ok := messageState.AllContacts.Load(senderID); ok && contact.Blocked {
continue
}
@ -3653,10 +3694,12 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
if msg.ParsedMessage != nil {
logger.Debug("Handling parsed message")
switch msg.ParsedMessage.Interface().(type) {
case protobuf.MembershipUpdateMessage:
logger.Debug("Handling MembershipUpdateMessage")
rawMembershipUpdate := msg.ParsedMessage.Interface().(protobuf.MembershipUpdateMessage)
m.outputToCSV(msg.TransportMessage.Timestamp, msg.ID, senderID, filter.Topic, filter.ChatID, msg.Type, rawMembershipUpdate)
chat, _ := messageState.AllChats.Load(rawMembershipUpdate.ChatId)
err = m.HandleMembershipUpdate(messageState, chat, rawMembershipUpdate, m.systemMessagesTranslations)
@ -3669,6 +3712,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
case protobuf.ChatMessage:
logger.Debug("Handling ChatMessage")
messageState.CurrentMessageState.Message = msg.ParsedMessage.Interface().(protobuf.ChatMessage)
m.outputToCSV(msg.TransportMessage.Timestamp, msg.ID, senderID, filter.Topic, filter.ChatID, msg.Type, messageState.CurrentMessageState.Message)
err = m.HandleChatMessage(messageState)
if err != nil {
logger.Warn("failed to handle ChatMessage", zap.Error(err))
@ -3679,6 +3723,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
case protobuf.EditMessage:
logger.Debug("Handling EditMessage")
editProto := msg.ParsedMessage.Interface().(protobuf.EditMessage)
m.outputToCSV(msg.TransportMessage.Timestamp, msg.ID, senderID, filter.Topic, filter.ChatID, msg.Type, editProto)
editMessage := EditMessage{
EditMessage: editProto,
From: contact.ID,
@ -3695,6 +3740,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
case protobuf.DeleteMessage:
logger.Debug("Handling DeleteMessage")
deleteProto := msg.ParsedMessage.Interface().(protobuf.DeleteMessage)
m.outputToCSV(msg.TransportMessage.Timestamp, msg.ID, senderID, filter.Topic, filter.ChatID, msg.Type, deleteProto)
deleteMessage := DeleteMessage{
DeleteMessage: deleteProto,
From: contact.ID,
@ -3711,6 +3757,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
case protobuf.PinMessage:
pinMessage := msg.ParsedMessage.Interface().(protobuf.PinMessage)
m.outputToCSV(msg.TransportMessage.Timestamp, msg.ID, senderID, filter.Topic, filter.ChatID, msg.Type, pinMessage)
err = m.HandlePinMessage(messageState, pinMessage)
if err != nil {
logger.Warn("failed to handle PinMessage", zap.Error(err))
@ -3724,6 +3771,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
continue
}
p := msg.ParsedMessage.Interface().(protobuf.PairInstallation)
m.outputToCSV(msg.TransportMessage.Timestamp, msg.ID, senderID, filter.Topic, filter.ChatID, msg.Type, p)
logger.Debug("Handling PairInstallation", zap.Any("message", p))
err = m.HandlePairInstallation(messageState, p)
if err != nil {
@ -3734,6 +3782,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
case protobuf.StatusUpdate:
p := msg.ParsedMessage.Interface().(protobuf.StatusUpdate)
m.outputToCSV(msg.TransportMessage.Timestamp, msg.ID, senderID, filter.Topic, filter.ChatID, msg.Type, p)
logger.Debug("Handling StatusUpdate", zap.Any("message", p))
err = m.HandleStatusUpdate(messageState, p)
if err != nil {
@ -3744,6 +3793,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
case protobuf.SyncInstallationContact:
logger.Warn("SyncInstallationContact is not supported")
m.outputToCSV(msg.TransportMessage.Timestamp, msg.ID, senderID, filter.Topic, filter.ChatID, msg.Type, msg.ParsedMessage.Interface().(protobuf.SyncInstallationContact))
continue
case protobuf.SyncInstallationContactV2:
@ -3753,6 +3803,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
}
p := msg.ParsedMessage.Interface().(protobuf.SyncInstallationContactV2)
m.outputToCSV(msg.TransportMessage.Timestamp, msg.ID, senderID, filter.Topic, filter.ChatID, msg.Type, p)
logger.Debug("Handling SyncInstallationContact", zap.Any("message", p))
err = m.HandleSyncInstallationContact(messageState, p)
if err != nil {
@ -3768,6 +3819,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
}
p := msg.ParsedMessage.Interface().(protobuf.SyncProfilePictures)
m.outputToCSV(msg.TransportMessage.Timestamp, msg.ID, senderID, filter.Topic, filter.ChatID, msg.Type, p)
logger.Debug("Handling SyncProfilePicture", zap.Any("message", p))
err = m.HandleSyncProfilePictures(messageState, p)
if err != nil {
@ -3783,6 +3835,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
}
p := msg.ParsedMessage.Interface().(protobuf.SyncBookmark)
m.outputToCSV(msg.TransportMessage.Timestamp, msg.ID, senderID, filter.Topic, filter.ChatID, msg.Type, p)
logger.Debug("Handling SyncBookmark", zap.Any("message", p))
err = m.handleSyncBookmark(messageState, p)
if err != nil {
@ -3798,6 +3851,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
}
p := msg.ParsedMessage.Interface().(protobuf.SyncClearHistory)
m.outputToCSV(msg.TransportMessage.Timestamp, msg.ID, senderID, filter.Topic, filter.ChatID, msg.Type, p)
logger.Debug("Handling SyncClearHistory", zap.Any("message", p))
err = m.handleSyncClearHistory(messageState, p)
if err != nil {
@ -3811,6 +3865,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
continue
}
p := msg.ParsedMessage.Interface().(protobuf.SyncCommunitySettings)
m.outputToCSV(msg.TransportMessage.Timestamp, msg.ID, senderID, filter.Topic, filter.ChatID, msg.Type, p)
logger.Debug("Handling SyncCommunitySettings", zap.Any("message", p))
err = m.handleSyncCommunitySettings(messageState, p)
if err != nil {
@ -3856,6 +3911,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
}
p := msg.ParsedMessage.Interface().(protobuf.Backup)
m.outputToCSV(msg.TransportMessage.Timestamp, msg.ID, senderID, filter.Topic, filter.ChatID, msg.Type, p)
logger.Debug("Handling Backup", zap.Any("message", p))
err = m.HandleBackup(messageState, p)
if err != nil {
@ -3871,6 +3927,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
}
p := msg.ParsedMessage.Interface().(protobuf.SyncInstallationPublicChat)
m.outputToCSV(msg.TransportMessage.Timestamp, msg.ID, senderID, filter.Topic, filter.ChatID, msg.Type, p)
logger.Debug("Handling SyncInstallationPublicChat", zap.Any("message", p))
addedChat := m.HandleSyncInstallationPublicChat(messageState, p)
@ -3891,6 +3948,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
}
p := msg.ParsedMessage.Interface().(protobuf.SyncChatRemoved)
m.outputToCSV(msg.TransportMessage.Timestamp, msg.ID, senderID, filter.Topic, filter.ChatID, msg.Type, p)
logger.Debug("Handling SyncChatRemoved", zap.Any("message", p))
err := m.HandleSyncChatRemoved(messageState, p)
if err != nil {
@ -3905,6 +3963,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
}
p := msg.ParsedMessage.Interface().(protobuf.SyncChatMessagesRead)
m.outputToCSV(msg.TransportMessage.Timestamp, msg.ID, senderID, filter.Topic, filter.ChatID, msg.Type, p)
logger.Debug("Handling SyncChatMessagesRead", zap.Any("message", p))
err := m.HandleSyncChatMessagesRead(messageState, p)
if err != nil {
@ -3919,6 +3978,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
}
community := msg.ParsedMessage.Interface().(protobuf.SyncCommunity)
m.outputToCSV(msg.TransportMessage.Timestamp, msg.ID, senderID, filter.Topic, filter.ChatID, msg.Type, community)
logger.Debug("Handling SyncCommunity", zap.Any("message", community))
err = m.handleSyncCommunity(messageState, community)
@ -3935,6 +3995,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
}
a := msg.ParsedMessage.Interface().(protobuf.SyncActivityCenterRead)
m.outputToCSV(msg.TransportMessage.Timestamp, msg.ID, senderID, filter.Topic, filter.ChatID, msg.Type, a)
logger.Debug("Handling SyncActivityCenterRead", zap.Any("message", a))
err = m.handleActivityCenterRead(messageState, a)
@ -3951,6 +4012,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
}
a := msg.ParsedMessage.Interface().(protobuf.SyncActivityCenterAccepted)
m.outputToCSV(msg.TransportMessage.Timestamp, msg.ID, senderID, filter.Topic, filter.ChatID, msg.Type, a)
logger.Debug("Handling SyncActivityCenterAccepted", zap.Any("message", a))
err = m.handleActivityCenterAccepted(messageState, a)
@ -3967,6 +4029,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
}
a := msg.ParsedMessage.Interface().(protobuf.SyncActivityCenterDismissed)
m.outputToCSV(msg.TransportMessage.Timestamp, msg.ID, senderID, filter.Topic, filter.ChatID, msg.Type, a)
logger.Debug("Handling SyncActivityCenterDismissed", zap.Any("message", a))
err = m.handleActivityCenterDismissed(messageState, a)
@ -3983,6 +4046,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
}
ss := msg.ParsedMessage.Interface().(protobuf.SyncSetting)
m.outputToCSV(msg.TransportMessage.Timestamp, msg.ID, senderID, filter.Topic, filter.ChatID, msg.Type, ss)
logger.Debug("Handling SyncSetting", zap.Any("message", ss))
err := m.handleSyncSetting(messageState.Response, &ss)
@ -3994,6 +4058,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
case protobuf.RequestAddressForTransaction:
command := msg.ParsedMessage.Interface().(protobuf.RequestAddressForTransaction)
m.outputToCSV(msg.TransportMessage.Timestamp, msg.ID, senderID, filter.Topic, filter.ChatID, msg.Type, command)
logger.Debug("Handling RequestAddressForTransaction", zap.Any("message", command))
err = m.HandleRequestAddressForTransaction(messageState, command)
if err != nil {
@ -4004,6 +4069,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
case protobuf.SendTransaction:
command := msg.ParsedMessage.Interface().(protobuf.SendTransaction)
m.outputToCSV(msg.TransportMessage.Timestamp, msg.ID, senderID, filter.Topic, filter.ChatID, msg.Type, command)
logger.Debug("Handling SendTransaction", zap.Any("message", command))
err = m.HandleSendTransaction(messageState, command)
if err != nil {
@ -4014,6 +4080,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
case protobuf.AcceptRequestAddressForTransaction:
command := msg.ParsedMessage.Interface().(protobuf.AcceptRequestAddressForTransaction)
m.outputToCSV(msg.TransportMessage.Timestamp, msg.ID, senderID, filter.Topic, filter.ChatID, msg.Type, command)
logger.Debug("Handling AcceptRequestAddressForTransaction")
err = m.HandleAcceptRequestAddressForTransaction(messageState, command)
if err != nil {
@ -4024,6 +4091,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
case protobuf.DeclineRequestAddressForTransaction:
command := msg.ParsedMessage.Interface().(protobuf.DeclineRequestAddressForTransaction)
m.outputToCSV(msg.TransportMessage.Timestamp, msg.ID, senderID, filter.Topic, filter.ChatID, msg.Type, command)
logger.Debug("Handling DeclineRequestAddressForTransaction")
err = m.HandleDeclineRequestAddressForTransaction(messageState, command)
if err != nil {
@ -4034,6 +4102,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
case protobuf.DeclineRequestTransaction:
command := msg.ParsedMessage.Interface().(protobuf.DeclineRequestTransaction)
m.outputToCSV(msg.TransportMessage.Timestamp, msg.ID, senderID, filter.Topic, filter.ChatID, msg.Type, command)
logger.Debug("Handling DeclineRequestTransaction")
err = m.HandleDeclineRequestTransaction(messageState, command)
if err != nil {
@ -4044,7 +4113,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
case protobuf.RequestTransaction:
command := msg.ParsedMessage.Interface().(protobuf.RequestTransaction)
m.outputToCSV(msg.TransportMessage.Timestamp, msg.ID, senderID, filter.Topic, filter.ChatID, msg.Type, command)
logger.Debug("Handling RequestTransaction")
err = m.HandleRequestTransaction(messageState, command)
if err != nil {
@ -4060,6 +4129,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
}
contactUpdate := msg.ParsedMessage.Interface().(protobuf.ContactUpdate)
m.outputToCSV(msg.TransportMessage.Timestamp, msg.ID, senderID, filter.Topic, filter.ChatID, msg.Type, contactUpdate)
err = m.HandleContactUpdate(messageState, contactUpdate)
if err != nil {
logger.Warn("failed to handle ContactUpdate", zap.Error(err))
@ -4069,6 +4139,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
case protobuf.AcceptContactRequest:
logger.Debug("Handling AcceptContactRequest")
message := msg.ParsedMessage.Interface().(protobuf.AcceptContactRequest)
m.outputToCSV(msg.TransportMessage.Timestamp, msg.ID, senderID, filter.Topic, filter.ChatID, msg.Type, message)
err = m.HandleAcceptContactRequest(messageState, message)
if err != nil {
logger.Warn("failed to handle AcceptContactRequest", zap.Error(err))
@ -4079,6 +4150,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
logger.Debug("Handling RetractContactRequest")
message := msg.ParsedMessage.Interface().(protobuf.RetractContactRequest)
m.outputToCSV(msg.TransportMessage.Timestamp, msg.ID, senderID, filter.Topic, filter.ChatID, msg.Type, message)
err = m.HandleRetractContactRequest(messageState, message)
if err != nil {
logger.Warn("failed to handle RetractContactRequest", zap.Error(err))
@ -4091,8 +4163,10 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
if m.pushNotificationServer == nil {
continue
}
message := msg.ParsedMessage.Interface().(protobuf.PushNotificationQuery)
logger.Debug("Handling PushNotificationQuery")
if err := m.pushNotificationServer.HandlePushNotificationQuery(publicKey, msg.ID, msg.ParsedMessage.Interface().(protobuf.PushNotificationQuery)); err != nil {
m.outputToCSV(msg.TransportMessage.Timestamp, msg.ID, senderID, filter.Topic, filter.ChatID, msg.Type, message)
if err := m.pushNotificationServer.HandlePushNotificationQuery(publicKey, msg.ID, message); err != nil {
allMessagesProcessed = false
logger.Warn("failed to handle PushNotificationQuery", zap.Error(err))
}
@ -4104,7 +4178,9 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
continue
}
logger.Debug("Handling PushNotificationRegistrationResponse")
if err := m.pushNotificationClient.HandlePushNotificationRegistrationResponse(publicKey, msg.ParsedMessage.Interface().(protobuf.PushNotificationRegistrationResponse)); err != nil {
message := msg.ParsedMessage.Interface().(protobuf.PushNotificationRegistrationResponse)
m.outputToCSV(msg.TransportMessage.Timestamp, msg.ID, senderID, filter.Topic, filter.ChatID, msg.Type, message)
if err := m.pushNotificationClient.HandlePushNotificationRegistrationResponse(publicKey, message); err != nil {
allMessagesProcessed = false
logger.Warn("failed to handle PushNotificationRegistrationResponse", zap.Error(err))
}
@ -4114,6 +4190,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
logger.Debug("Received ContactCodeAdvertisement")
cca := msg.ParsedMessage.Interface().(protobuf.ContactCodeAdvertisement)
m.outputToCSV(msg.TransportMessage.Timestamp, msg.ID, senderID, filter.Topic, filter.ChatID, msg.Type, cca)
logger.Debug("protobuf.ContactCodeAdvertisement received", zap.Any("cca", cca))
if cca.ChatIdentity != nil {
@ -4144,7 +4221,9 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
continue
}
logger.Debug("Handling PushNotificationResponse")
if err := m.pushNotificationClient.HandlePushNotificationResponse(publicKey, msg.ParsedMessage.Interface().(protobuf.PushNotificationResponse)); err != nil {
message := msg.ParsedMessage.Interface().(protobuf.PushNotificationResponse)
m.outputToCSV(msg.TransportMessage.Timestamp, msg.ID, senderID, filter.Topic, filter.ChatID, msg.Type, message)
if err := m.pushNotificationClient.HandlePushNotificationResponse(publicKey, message); err != nil {
allMessagesProcessed = false
logger.Warn("failed to handle PushNotificationResponse", zap.Error(err))
}
@ -4157,7 +4236,9 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
continue
}
logger.Debug("Handling PushNotificationQueryResponse")
if err := m.pushNotificationClient.HandlePushNotificationQueryResponse(publicKey, msg.ParsedMessage.Interface().(protobuf.PushNotificationQueryResponse)); err != nil {
message := msg.ParsedMessage.Interface().(protobuf.PushNotificationQueryResponse)
m.outputToCSV(msg.TransportMessage.Timestamp, msg.ID, senderID, filter.Topic, filter.ChatID, msg.Type, message)
if err := m.pushNotificationClient.HandlePushNotificationQueryResponse(publicKey, message); err != nil {
allMessagesProcessed = false
logger.Warn("failed to handle PushNotificationQueryResponse", zap.Error(err))
}
@ -4170,7 +4251,9 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
continue
}
logger.Debug("Handling PushNotificationRequest")
if err := m.pushNotificationServer.HandlePushNotificationRequest(publicKey, msg.ID, msg.ParsedMessage.Interface().(protobuf.PushNotificationRequest)); err != nil {
message := msg.ParsedMessage.Interface().(protobuf.PushNotificationRequest)
m.outputToCSV(msg.TransportMessage.Timestamp, msg.ID, senderID, filter.Topic, filter.ChatID, msg.Type, message)
if err := m.pushNotificationServer.HandlePushNotificationRequest(publicKey, msg.ID, message); err != nil {
allMessagesProcessed = false
logger.Warn("failed to handle PushNotificationRequest", zap.Error(err))
}
@ -4178,7 +4261,9 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
continue
case protobuf.EmojiReaction:
logger.Debug("Handling EmojiReaction")
err = m.HandleEmojiReaction(messageState, msg.ParsedMessage.Interface().(protobuf.EmojiReaction))
message := msg.ParsedMessage.Interface().(protobuf.EmojiReaction)
m.outputToCSV(msg.TransportMessage.Timestamp, msg.ID, senderID, filter.Topic, filter.ChatID, msg.Type, message)
err = m.HandleEmojiReaction(messageState, message)
if err != nil {
logger.Warn("failed to handle EmojiReaction", zap.Error(err))
allMessagesProcessed = false
@ -4186,14 +4271,18 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
}
case protobuf.GroupChatInvitation:
logger.Debug("Handling GroupChatInvitation")
err = m.HandleGroupChatInvitation(messageState, msg.ParsedMessage.Interface().(protobuf.GroupChatInvitation))
message := msg.ParsedMessage.Interface().(protobuf.GroupChatInvitation)
m.outputToCSV(msg.TransportMessage.Timestamp, msg.ID, senderID, filter.Topic, filter.ChatID, msg.Type, message)
err = m.HandleGroupChatInvitation(messageState, message)
if err != nil {
logger.Warn("failed to handle GroupChatInvitation", zap.Error(err))
allMessagesProcessed = false
continue
}
case protobuf.ChatIdentity:
err = m.HandleChatIdentity(messageState, msg.ParsedMessage.Interface().(protobuf.ChatIdentity))
message := msg.ParsedMessage.Interface().(protobuf.ChatIdentity)
m.outputToCSV(msg.TransportMessage.Timestamp, msg.ID, senderID, filter.Topic, filter.ChatID, msg.Type, message)
err = m.HandleChatIdentity(messageState, message)
if err != nil {
logger.Warn("failed to handle ChatIdentity", zap.Error(err))
allMessagesProcessed = false
@ -4202,7 +4291,9 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
case protobuf.CommunityDescription:
logger.Debug("Handling CommunityDescription")
err = m.handleCommunityDescription(messageState, publicKey, msg.ParsedMessage.Interface().(protobuf.CommunityDescription), msg.DecryptedPayload)
message := msg.ParsedMessage.Interface().(protobuf.CommunityDescription)
m.outputToCSV(msg.TransportMessage.Timestamp, msg.ID, senderID, filter.Topic, filter.ChatID, msg.Type, message)
err = m.handleCommunityDescription(messageState, publicKey, message, msg.DecryptedPayload)
if err != nil {
logger.Warn("failed to handle CommunityDescription", zap.Error(err))
allMessagesProcessed = false
@ -4255,6 +4346,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
case protobuf.CommunityInvitation:
logger.Debug("Handling CommunityInvitation")
invitation := msg.ParsedMessage.Interface().(protobuf.CommunityInvitation)
m.outputToCSV(msg.TransportMessage.Timestamp, msg.ID, senderID, filter.Topic, filter.ChatID, msg.Type, invitation)
err = m.HandleCommunityInvitation(messageState, publicKey, invitation, invitation.CommunityDescription)
if err != nil {
logger.Warn("failed to handle CommunityInvitation", zap.Error(err))
@ -4264,6 +4356,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
case protobuf.CommunityRequestToJoin:
logger.Debug("Handling CommunityRequestToJoin")
request := msg.ParsedMessage.Interface().(protobuf.CommunityRequestToJoin)
m.outputToCSV(msg.TransportMessage.Timestamp, msg.ID, senderID, filter.Topic, filter.ChatID, msg.Type, request)
err = m.HandleCommunityRequestToJoin(messageState, publicKey, request)
if err != nil {
logger.Warn("failed to handle CommunityRequestToJoin", zap.Error(err))
@ -4282,6 +4375,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
case protobuf.CommunityMessageArchiveMagnetlink:
logger.Debug("Handling CommunityMessageArchiveMagnetlink")
magnetlinkMessage := msg.ParsedMessage.Interface().(protobuf.CommunityMessageArchiveMagnetlink)
m.outputToCSV(msg.TransportMessage.Timestamp, msg.ID, senderID, filter.Topic, filter.ChatID, msg.Type, magnetlinkMessage)
err = m.HandleHistoryArchiveMagnetlinkMessage(messageState, publicKey, magnetlinkMessage.MagnetUri, magnetlinkMessage.Clock)
if err != nil {
logger.Warn("failed to handle CommunityMessageArchiveMagnetlink", zap.Error(err))
@ -4295,8 +4389,9 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
logger.Warn("unable to handle AnonymousMetricBatch, anonMetricsServer is nil")
continue
}
ams, err := m.anonMetricsServer.StoreMetrics(msg.ParsedMessage.Interface().(protobuf.AnonymousMetricBatch))
message := msg.ParsedMessage.Interface().(protobuf.AnonymousMetricBatch)
m.outputToCSV(msg.TransportMessage.Timestamp, msg.ID, senderID, filter.Topic, filter.ChatID, msg.Type, message)
ams, err := m.anonMetricsServer.StoreMetrics(message)
if err != nil {
logger.Warn("failed to store AnonymousMetricBatch", zap.Error(err))
continue
@ -4310,6 +4405,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
}
p := msg.ParsedMessage.Interface().(protobuf.SyncWalletAccounts)
m.outputToCSV(msg.TransportMessage.Timestamp, msg.ID, senderID, filter.Topic, filter.ChatID, msg.Type, p)
logger.Debug("Handling SyncWalletAccount", zap.Any("message", p))
err = m.HandleSyncWalletAccount(messageState, p)
if err != nil {

View File

@ -88,6 +88,8 @@ type config struct {
logger *zap.Logger
outputMessagesCSV bool
messengerSignalsHandler MessengerSignalsHandler
telemetryServerURL string
@ -296,3 +298,10 @@ func WithRPCClient(r *rpc.Client) Option {
return nil
}
}
func WithMessageCSV(enabled bool) Option {
return func(c *config) error {
c.outputMessagesCSV = enabled
return nil
}
}

View File

@ -416,6 +416,7 @@ func buildMessengerOptions(
protocol.WithTorrentConfig(&config.TorrentConfig),
protocol.WithHTTPServer(httpServer),
protocol.WithRPCClient(rpcClient),
protocol.WithMessageCSV(config.OutputMessageCSVEnabled),
}
if config.ShhextConfig.DataSyncEnabled {