parent
22fc83de59
commit
5422b867f3
|
@ -19,11 +19,12 @@ type EnumType struct {
|
|||
|
||||
// MethodInfo holds information about a method
|
||||
type MethodInfo struct {
|
||||
ProtobufName string
|
||||
MethodName string
|
||||
EnumValue string
|
||||
ProcessRaw bool
|
||||
SyncMessage bool
|
||||
ProtobufName string
|
||||
MethodName string
|
||||
EnumValue string
|
||||
ProcessRaw bool
|
||||
SyncMessage bool
|
||||
FromArchiveArg bool
|
||||
}
|
||||
|
||||
func main() {
|
||||
|
@ -67,6 +68,8 @@ func main() {
|
|||
info.ProcessRaw = true
|
||||
}
|
||||
|
||||
info.FromArchiveArg = protobufName == "ChatMessage" || protobufName == "PinMessage"
|
||||
|
||||
methodInfos = append(methodInfos, info)
|
||||
}
|
||||
|
||||
|
|
|
@ -16,11 +16,11 @@ import (
|
|||
v1protocol "github.com/status-im/status-go/protocol/v1"
|
||||
)
|
||||
|
||||
func (m *Messenger) dispatchToHandler(messageState *ReceivedMessageState, protoBytes []byte, msg *v1protocol.StatusMessage, filter transport.Filter) error {
|
||||
func (m *Messenger) dispatchToHandler(messageState *ReceivedMessageState, protoBytes []byte, msg *v1protocol.StatusMessage, filter transport.Filter, fromArchive bool) error {
|
||||
switch msg.Type {
|
||||
{{ range .}}
|
||||
case protobuf.ApplicationMetadataMessage_{{.EnumValue}}:
|
||||
return m.{{.MethodName}}(messageState, protoBytes, msg, filter)
|
||||
return m.{{.MethodName}}(messageState, protoBytes, msg, filter{{ if .FromArchiveArg }}, fromArchive{{ end }})
|
||||
{{ end }}
|
||||
default:
|
||||
m.logger.Info("protobuf type not found", zap.String("type", string(msg.Type)))
|
||||
|
@ -30,7 +30,7 @@ func (m *Messenger) dispatchToHandler(messageState *ReceivedMessageState, protoB
|
|||
}
|
||||
|
||||
{{ range . }}
|
||||
func (m *Messenger) {{.MethodName}}(messageState *ReceivedMessageState, protoBytes []byte, msg *v1protocol.StatusMessage, filter transport.Filter) error {
|
||||
func (m *Messenger) {{.MethodName}}(messageState *ReceivedMessageState, protoBytes []byte, msg *v1protocol.StatusMessage, filter transport.Filter{{ if .FromArchiveArg }}, fromArchive bool{{ end }}) error {
|
||||
m.logger.Info("handling {{ .ProtobufName}}")
|
||||
{{ if .SyncMessage }}
|
||||
if !common.IsPubKeyEqual(messageState.CurrentMessageState.PublicKey, &m.identity.PublicKey) {
|
||||
|
@ -50,7 +50,7 @@ func (m *Messenger) {{.MethodName}}(messageState *ReceivedMessageState, protoByt
|
|||
|
||||
m.outputToCSV(msg.TransportMessage.Timestamp, msg.ID, messageState.CurrentMessageState.Contact.ID, filter.ContentTopic, filter.ChatID, msg.Type, p)
|
||||
|
||||
return m.Handle{{.ProtobufName}}(messageState, p, msg)
|
||||
return m.Handle{{.ProtobufName}}(messageState, p, msg{{ if .FromArchiveArg }}, fromArchive {{ end }})
|
||||
{{ end }}
|
||||
}
|
||||
|
||||
|
|
|
@ -3158,7 +3158,7 @@ func (m *Messenger) RetrieveAll() (*MessengerResponse, error) {
|
|||
return nil, err
|
||||
}
|
||||
|
||||
return m.handleRetrievedMessages(chatWithMessages, true)
|
||||
return m.handleRetrievedMessages(chatWithMessages, true, false)
|
||||
}
|
||||
|
||||
func (m *Messenger) GetStats() types.StatsSummary {
|
||||
|
@ -3435,22 +3435,17 @@ func (m *Messenger) handleImportedMessages(messagesToHandle map[transport.Filter
|
|||
switch msg.Type {
|
||||
|
||||
case protobuf.ApplicationMetadataMessage_CHAT_MESSAGE:
|
||||
logger.Debug("Handling ChatMessage")
|
||||
|
||||
protoMessage := &protobuf.ChatMessage{}
|
||||
err := proto.Unmarshal(msg.UnwrappedPayload, protoMessage)
|
||||
if err != nil {
|
||||
logger.Warn("failed to unmarshal ChatMessage", zap.Error(err))
|
||||
continue
|
||||
}
|
||||
|
||||
messageState.CurrentMessageState.Message = protoMessage
|
||||
m.outputToCSV(msg.TransportMessage.Timestamp, msg.ID, senderID, filter.ContentTopic, filter.ChatID, msg.Type, messageState.CurrentMessageState.Message)
|
||||
err = m.HandleImportedChatMessage(messageState)
|
||||
err = m.handleChatMessageProtobuf(messageState, msg.UnwrappedPayload, msg, filter, true)
|
||||
if err != nil {
|
||||
logger.Warn("failed to handle ChatMessage", zap.Error(err))
|
||||
continue
|
||||
}
|
||||
|
||||
case protobuf.ApplicationMetadataMessage_PIN_MESSAGE:
|
||||
err = m.handlePinMessageProtobuf(messageState, msg.UnwrappedPayload, msg, filter, true)
|
||||
if err != nil {
|
||||
logger.Warn("failed to handle PinMessage", zap.Error(err))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -3513,7 +3508,7 @@ func (m *Messenger) handleImportedMessages(messagesToHandle map[transport.Filter
|
|||
return nil
|
||||
}
|
||||
|
||||
func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filter][]*types.Message, storeWakuMessages bool) (*MessengerResponse, error) {
|
||||
func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filter][]*types.Message, storeWakuMessages bool, fromArchive bool) (*MessengerResponse, error) {
|
||||
|
||||
m.handleMessagesMutex.Lock()
|
||||
defer m.handleMessagesMutex.Unlock()
|
||||
|
@ -3618,7 +3613,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
|
|||
|
||||
if msg.UnwrappedPayload != nil {
|
||||
|
||||
err := m.dispatchToHandler(messageState, msg.UnwrappedPayload, msg, filter)
|
||||
err := m.dispatchToHandler(messageState, msg.UnwrappedPayload, msg, filter, fromArchive)
|
||||
if err != nil {
|
||||
allMessagesProcessed = false
|
||||
logger.Warn("failed to process protobuf", zap.Error(err))
|
||||
|
|
|
@ -2626,8 +2626,8 @@ func (m *Messenger) handleCommunityResponse(state *ReceivedMessageState, communi
|
|||
state.Response.CommunityChanges = append(state.Response.CommunityChanges, communityResponse.Changes)
|
||||
state.Response.AddRequestsToJoinCommunity(communityResponse.RequestsToJoin)
|
||||
|
||||
// If we haven't joined the org, nothing to do
|
||||
if !community.Joined() {
|
||||
// If we haven't joined/spectated the org, nothing to do
|
||||
if !community.Joined() && !community.Spectated() {
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -1208,7 +1208,7 @@ func (s *MessengerContactRequestSuite) TestReceiveAcceptAndRetractContactRequest
|
|||
}
|
||||
|
||||
response := state.Response
|
||||
err = s.m.HandleChatMessage(state, &message, nil)
|
||||
err = s.m.HandleChatMessage(state, &message, nil, false)
|
||||
s.Require().NoError(err)
|
||||
s.Require().Len(response.ActivityCenterNotifications(), 1)
|
||||
contacts := s.m.Contacts()
|
||||
|
|
|
@ -198,7 +198,7 @@ func (s *MessengerDeleteMessageSuite) TestDeleteMessageFirstThenMessage() {
|
|||
PublicKey: &theirMessenger.identity.PublicKey,
|
||||
},
|
||||
}
|
||||
err = s.m.HandleChatMessage(state, inputMessage.ChatMessage, nil)
|
||||
err = s.m.HandleChatMessage(state, inputMessage.ChatMessage, nil, false)
|
||||
s.Require().NoError(err)
|
||||
s.Require().Len(state.Response.Messages(), 0) // Message should not be added to response
|
||||
s.Require().Len(state.Response.RemovedMessages(), 0)
|
||||
|
@ -338,7 +338,7 @@ func (s *MessengerDeleteMessageSuite) TestDeleteImageMessageFirstThenMessage() {
|
|||
PublicKey: &theirMessenger.identity.PublicKey,
|
||||
},
|
||||
}
|
||||
err = s.m.HandleChatMessage(state, album[0].ChatMessage, nil)
|
||||
err = s.m.HandleChatMessage(state, album[0].ChatMessage, nil, false)
|
||||
s.Require().NoError(err)
|
||||
s.Require().Len(state.Response.Messages(), 0) // Message should not be added to response
|
||||
s.Require().Len(state.Response.RemovedMessages(), 0)
|
||||
|
@ -354,7 +354,7 @@ func (s *MessengerDeleteMessageSuite) TestDeleteImageMessageFirstThenMessage() {
|
|||
PublicKey: &theirMessenger.identity.PublicKey,
|
||||
},
|
||||
}
|
||||
err = s.m.HandleChatMessage(state, album[1].ChatMessage, nil)
|
||||
err = s.m.HandleChatMessage(state, album[1].ChatMessage, nil, false)
|
||||
s.Require().NoError(err)
|
||||
s.Require().Len(state.Response.Messages(), 0) // Message should not be added to response even if we didn't delete that ID
|
||||
s.Require().Len(state.Response.RemovedMessages(), 0)
|
||||
|
|
|
@ -244,7 +244,7 @@ func (s *MessengerEditMessageSuite) TestEditMessageFirstEditsThenMessage() {
|
|||
PublicKey: &theirMessenger.identity.PublicKey,
|
||||
},
|
||||
}
|
||||
err = s.m.HandleChatMessage(state, inputMessage.ChatMessage, nil)
|
||||
err = s.m.HandleChatMessage(state, inputMessage.ChatMessage, nil, false)
|
||||
s.Require().NoError(err)
|
||||
s.Require().Len(response.Messages(), 1)
|
||||
|
||||
|
|
|
@ -245,7 +245,7 @@ func (m *Messenger) HandleMembershipUpdate(messageState *ReceivedMessageState, c
|
|||
}
|
||||
|
||||
if message.Message != nil {
|
||||
return m.HandleChatMessage(messageState, message.Message, nil)
|
||||
return m.HandleChatMessage(messageState, message.Message, nil, false)
|
||||
} else if message.EmojiReaction != nil {
|
||||
return m.HandleEmojiReaction(messageState, message.EmojiReaction, nil)
|
||||
}
|
||||
|
@ -781,7 +781,7 @@ func (m *Messenger) HandleSyncChatMessagesRead(state *ReceivedMessageState, mess
|
|||
return nil
|
||||
}
|
||||
|
||||
func (m *Messenger) handlePinMessage(pinner *Contact, whisperTimestamp uint64, response *MessengerResponse, message *protobuf.PinMessage) error {
|
||||
func (m *Messenger) handlePinMessage(pinner *Contact, whisperTimestamp uint64, response *MessengerResponse, message *protobuf.PinMessage, forceSeen bool) error {
|
||||
logger := m.logger.With(zap.String("site", "HandlePinMessage"))
|
||||
|
||||
logger.Info("Handling pin message")
|
||||
|
@ -816,9 +816,6 @@ func (m *Messenger) handlePinMessage(pinner *Contact, whisperTimestamp uint64, r
|
|||
return nil
|
||||
}
|
||||
|
||||
// Set the LocalChatID for the message
|
||||
pinMessage.LocalChatID = chat.ID
|
||||
|
||||
if c, ok := m.allChats.Load(chat.ID); ok {
|
||||
chat = c
|
||||
}
|
||||
|
@ -842,7 +839,7 @@ func (m *Messenger) handlePinMessage(pinner *Contact, whisperTimestamp uint64, r
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
message := &common.Message{
|
||||
systemMessage := &common.Message{
|
||||
ChatMessage: &protobuf.ChatMessage{
|
||||
Clock: message.Clock,
|
||||
Timestamp: whisperTimestamp,
|
||||
|
@ -856,7 +853,12 @@ func (m *Messenger) handlePinMessage(pinner *Contact, whisperTimestamp uint64, r
|
|||
LocalChatID: chat.ID,
|
||||
From: pinner.ID,
|
||||
}
|
||||
response.AddMessage(message)
|
||||
|
||||
if forceSeen {
|
||||
systemMessage.Seen = true
|
||||
}
|
||||
|
||||
response.AddMessage(systemMessage)
|
||||
chat.UnviewedMessagesCount++
|
||||
}
|
||||
|
||||
|
@ -872,8 +874,8 @@ func (m *Messenger) handlePinMessage(pinner *Contact, whisperTimestamp uint64, r
|
|||
return nil
|
||||
}
|
||||
|
||||
func (m *Messenger) HandlePinMessage(state *ReceivedMessageState, message *protobuf.PinMessage, statusMessage *v1protocol.StatusMessage) error {
|
||||
return m.handlePinMessage(state.CurrentMessageState.Contact, state.CurrentMessageState.WhisperTimestamp, state.Response, message)
|
||||
func (m *Messenger) HandlePinMessage(state *ReceivedMessageState, message *protobuf.PinMessage, statusMessage *v1protocol.StatusMessage, fromArchive bool) error {
|
||||
return m.handlePinMessage(state.CurrentMessageState.Contact, state.CurrentMessageState.WhisperTimestamp, state.Response, message, fromArchive)
|
||||
}
|
||||
|
||||
func (m *Messenger) handleAcceptContactRequest(
|
||||
|
@ -1353,7 +1355,7 @@ func (m *Messenger) handleArchiveMessages(archiveMessages []*protobuf.WakuMessag
|
|||
return nil, err
|
||||
}
|
||||
|
||||
response, err := m.handleRetrievedMessages(otherMessages, false)
|
||||
response, err := m.handleRetrievedMessages(otherMessages, false, true)
|
||||
if err != nil {
|
||||
m.communitiesManager.LogStdout("failed to write history archive messages to database", zap.Error(err))
|
||||
return nil, err
|
||||
|
@ -2267,13 +2269,9 @@ func (m *Messenger) handleChatMessage(state *ReceivedMessageState, forceSeen boo
|
|||
return nil
|
||||
}
|
||||
|
||||
func (m *Messenger) HandleChatMessage(state *ReceivedMessageState, message *protobuf.ChatMessage, statusMessage *v1protocol.StatusMessage) error {
|
||||
func (m *Messenger) HandleChatMessage(state *ReceivedMessageState, message *protobuf.ChatMessage, statusMessage *v1protocol.StatusMessage, fromArchive bool) error {
|
||||
state.CurrentMessageState.Message = message
|
||||
return m.handleChatMessage(state, false)
|
||||
}
|
||||
|
||||
func (m *Messenger) HandleImportedChatMessage(state *ReceivedMessageState) error {
|
||||
return m.handleChatMessage(state, true)
|
||||
return m.handleChatMessage(state, fromArchive)
|
||||
}
|
||||
|
||||
func (m *Messenger) addActivityCenterNotification(response *MessengerResponse, notification *ActivityCenterNotification) error {
|
||||
|
|
|
@ -16,11 +16,11 @@ import (
|
|||
v1protocol "github.com/status-im/status-go/protocol/v1"
|
||||
)
|
||||
|
||||
func (m *Messenger) dispatchToHandler(messageState *ReceivedMessageState, protoBytes []byte, msg *v1protocol.StatusMessage, filter transport.Filter) error {
|
||||
func (m *Messenger) dispatchToHandler(messageState *ReceivedMessageState, protoBytes []byte, msg *v1protocol.StatusMessage, filter transport.Filter, fromArchive bool) error {
|
||||
switch msg.Type {
|
||||
|
||||
case protobuf.ApplicationMetadataMessage_CHAT_MESSAGE:
|
||||
return m.handleChatMessageProtobuf(messageState, protoBytes, msg, filter)
|
||||
return m.handleChatMessageProtobuf(messageState, protoBytes, msg, filter, fromArchive)
|
||||
|
||||
case protobuf.ApplicationMetadataMessage_CONTACT_UPDATE:
|
||||
return m.handleContactUpdateProtobuf(messageState, protoBytes, msg, filter)
|
||||
|
@ -95,7 +95,7 @@ func (m *Messenger) dispatchToHandler(messageState *ReceivedMessageState, protoB
|
|||
return m.handleCommunityRequestToJoinProtobuf(messageState, protoBytes, msg, filter)
|
||||
|
||||
case protobuf.ApplicationMetadataMessage_PIN_MESSAGE:
|
||||
return m.handlePinMessageProtobuf(messageState, protoBytes, msg, filter)
|
||||
return m.handlePinMessageProtobuf(messageState, protoBytes, msg, filter, fromArchive)
|
||||
|
||||
case protobuf.ApplicationMetadataMessage_EDIT_MESSAGE:
|
||||
return m.handleEditMessageProtobuf(messageState, protoBytes, msg, filter)
|
||||
|
@ -234,7 +234,7 @@ func (m *Messenger) dispatchToHandler(messageState *ReceivedMessageState, protoB
|
|||
}
|
||||
|
||||
|
||||
func (m *Messenger) handleChatMessageProtobuf(messageState *ReceivedMessageState, protoBytes []byte, msg *v1protocol.StatusMessage, filter transport.Filter) error {
|
||||
func (m *Messenger) handleChatMessageProtobuf(messageState *ReceivedMessageState, protoBytes []byte, msg *v1protocol.StatusMessage, filter transport.Filter, fromArchive bool) error {
|
||||
m.logger.Info("handling ChatMessage")
|
||||
|
||||
|
||||
|
@ -247,7 +247,7 @@ func (m *Messenger) handleChatMessageProtobuf(messageState *ReceivedMessageState
|
|||
|
||||
m.outputToCSV(msg.TransportMessage.Timestamp, msg.ID, messageState.CurrentMessageState.Contact.ID, filter.ContentTopic, filter.ChatID, msg.Type, p)
|
||||
|
||||
return m.HandleChatMessage(messageState, p, msg)
|
||||
return m.HandleChatMessage(messageState, p, msg, fromArchive )
|
||||
|
||||
}
|
||||
|
||||
|
@ -696,7 +696,7 @@ func (m *Messenger) handleCommunityRequestToJoinProtobuf(messageState *ReceivedM
|
|||
}
|
||||
|
||||
|
||||
func (m *Messenger) handlePinMessageProtobuf(messageState *ReceivedMessageState, protoBytes []byte, msg *v1protocol.StatusMessage, filter transport.Filter) error {
|
||||
func (m *Messenger) handlePinMessageProtobuf(messageState *ReceivedMessageState, protoBytes []byte, msg *v1protocol.StatusMessage, filter transport.Filter, fromArchive bool) error {
|
||||
m.logger.Info("handling PinMessage")
|
||||
|
||||
|
||||
|
@ -709,7 +709,7 @@ func (m *Messenger) handlePinMessageProtobuf(messageState *ReceivedMessageState,
|
|||
|
||||
m.outputToCSV(msg.TransportMessage.Timestamp, msg.ID, messageState.CurrentMessageState.Contact.ID, filter.ContentTopic, filter.ChatID, msg.Type, p)
|
||||
|
||||
return m.HandlePinMessage(messageState, p, msg)
|
||||
return m.HandlePinMessage(messageState, p, msg, fromArchive )
|
||||
|
||||
}
|
||||
|
||||
|
|
|
@ -133,6 +133,7 @@ func (s *MessengerPinMessageSuite) TestPinMessageOutOfOrder() {
|
|||
1000,
|
||||
handlePinMessageResponse,
|
||||
&unpinMessage,
|
||||
false,
|
||||
)
|
||||
s.Require().NoError(err)
|
||||
|
||||
|
@ -156,6 +157,7 @@ func (s *MessengerPinMessageSuite) TestPinMessageOutOfOrder() {
|
|||
1000,
|
||||
handlePinMessageResponse,
|
||||
&pinMessage,
|
||||
false,
|
||||
)
|
||||
s.Require().NoError(err)
|
||||
|
||||
|
@ -180,6 +182,7 @@ func (s *MessengerPinMessageSuite) TestPinMessageOutOfOrder() {
|
|||
1000,
|
||||
handlePinMessageResponse,
|
||||
&pinMessage,
|
||||
false,
|
||||
)
|
||||
s.Require().NoError(err)
|
||||
|
||||
|
|
Loading…
Reference in New Issue