From daf3fc79bfbfc253d1bdc338d65b56ef7ca4bccf Mon Sep 17 00:00:00 2001 From: Igor Sirotin Date: Fri, 15 Sep 2023 15:19:10 +0100 Subject: [PATCH] fix: Discord import tool various fixes for pinned messages (#4001) --- protocol/messenger_communities.go | 94 +++++++++++++++++++++++-------- 1 file changed, 72 insertions(+), 22 deletions(-) diff --git a/protocol/messenger_communities.go b/protocol/messenger_communities.go index e34bda77b..c00a6d120 100644 --- a/protocol/messenger_communities.go +++ b/protocol/messenger_communities.go @@ -3566,6 +3566,7 @@ func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscor } var chatsToSave []*Chat + createdChats := make(map[string]*Chat, 0) processedChannelIds := make(map[string]string, 0) processedCategoriesIds := make(map[string]string, 0) @@ -3700,6 +3701,7 @@ func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscor // know there was only a single such change (and it's a map) for chatID, chat := range changes.ChatsAdded { c := CreateCommunityChat(communityID, chatID, chat, m.getTimesource()) + createdChats[c.ID] = c chatsToSave = append(chatsToSave, c) processedChannelIds[channel.Channel.ID] = c.ID } @@ -3789,9 +3791,9 @@ func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscor // Handle message replies if discordMessage.Type == string(discord.MessageTypeReply) && discordMessage.Reference != nil { - _, exists := messagesToSave[communityID+discordMessage.Reference.MessageId] - if exists { - chatMessage.ResponseTo = communityID + discordMessage.Reference.MessageId + repliedMessageID := communityID + discordMessage.Reference.MessageId + if _, exists := messagesToSave[repliedMessageID]; exists { + chatMessage.ResponseTo = repliedMessageID } } @@ -3817,11 +3819,12 @@ func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscor // Handle pin messages if discordMessage.Type == string(discord.MessageTypeChannelPinned) && discordMessage.Reference != nil { - _, exists := messagesToSave[communityID+discordMessage.Reference.MessageId] + pinnedMessageID := communityID + discordMessage.Reference.MessageId + _, exists := messagesToSave[pinnedMessageID] if exists { pinMessage := protobuf.PinMessage{ Clock: messageToSave.WhisperTimestamp, - MessageId: communityID + discordMessage.Reference.MessageId, + MessageId: pinnedMessageID, ChatId: messageToSave.LocalChatID, MessageType: protobuf.MessageType_COMMUNITY_CHAT, Pinned: true, @@ -3843,10 +3846,8 @@ func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscor continue } - messageID := v1protocol.MessageID(&communityPubKey, wrappedPayload) - pinMessageToSave := common.PinMessage{ - ID: types.EncodeHex(messageID), + ID: types.EncodeHex(v1protocol.MessageID(&communityPubKey, wrappedPayload)), PinMessage: &pinMessage, LocalChatID: processedChannelIds[channel.Channel.ID], From: messageToSave.From, @@ -3855,9 +3856,48 @@ func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscor } pinMessagesToSave = append(pinMessagesToSave, &pinMessageToSave) + + // Generate SystemMessagePinnedMessage + + chat, ok := createdChats[pinMessageToSave.LocalChatID] + if !ok { + err := errors.New("failed to get chat for pin message") + m.logger.Warn(err.Error(), + zap.String("PinMessageId", pinMessageToSave.ID), + zap.String("ChatID", pinMessageToSave.LocalChatID)) + importProgress.AddTaskError(discord.ImportMessagesTask, discord.Warning(err.Error())) + progressUpdates <- importProgress + continue + } + + id, err := generatePinMessageNotificationID(&m.identity.PublicKey, &pinMessageToSave, chat) + if err != nil { + m.logger.Warn("failed to generate pin message notification ID", + zap.String("PinMessageId", pinMessageToSave.ID)) + importProgress.AddTaskError(discord.ImportMessagesTask, discord.Warning(err.Error())) + progressUpdates <- importProgress + continue + } + systemMessage := &common.Message{ + ChatMessage: &protobuf.ChatMessage{ + Clock: pinMessageToSave.Clock, + Timestamp: clockAndTimestamp, + ChatId: chat.ID, + MessageType: pinMessageToSave.MessageType, + ResponseTo: pinMessage.MessageId, + ContentType: protobuf.ChatMessage_SYSTEM_MESSAGE_PINNED_MESSAGE, + }, + WhisperTimestamp: clockAndTimestamp, + ID: id, + LocalChatID: chat.ID, + From: messageToSave.From, + Seen: true, + } + + messagesToSave[systemMessage.ID] = systemMessage } } else { - messagesToSave[communityID+discordMessage.Id] = messageToSave + messagesToSave[messageToSave.ID] = messageToSave } progressValue := calculateProgress(i+1, totalImportChunkCount, float32(ii+1)/float32(len(channel.Messages))*0.5) @@ -3874,10 +3914,12 @@ func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscor var discordMessages []*protobuf.DiscordMessage for _, msg := range messagesToSave { - discordMessages = append(discordMessages, msg.GetDiscordMessage()) + if msg.ChatMessage.ContentType == protobuf.ChatMessage_DISCORD_MESSAGE { + discordMessages = append(discordMessages, msg.GetDiscordMessage()) + } } - // We save these messages in chunks so we don't block the database + // We save these messages in chunks, so we don't block the database // for a longer period of time discordMessageChunks := chunkSlice(discordMessages, maxChunkSizeMessages) chunksCount := len(discordMessageChunks) @@ -3913,7 +3955,9 @@ func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscor } } - var messages []*common.Message + // Get slice of all values in `messagesToSave` map + + var messages = make([]*common.Message, 0, len(messagesToSave)) for _, msg := range messagesToSave { messages = append(messages, msg) } @@ -4014,7 +4058,7 @@ func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscor } assetCounter.Increase() - progressValue := calculateProgress(i+1, totalImportChunkCount, (float32(assetCounter.Value())/float32(totalAssetsCount))*0.25) + progressValue := calculateProgress(i+1, totalImportChunkCount, (float32(assetCounter.Value())/float32(totalAssetsCount))*0.5) importProgress.UpdateTaskProgress(discord.DownloadAssetsTask, progressValue) progressUpdates <- importProgress @@ -4061,7 +4105,7 @@ func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscor } assetCounter.Increase() - progressValue := calculateProgress(i+1, totalImportChunkCount, 0.25+(float32(assetCounter.Value())/float32(totalAssetsCount))*0.25) + progressValue := calculateProgress(i+1, totalImportChunkCount, (float32(assetCounter.Value())/float32(totalAssetsCount))*0.5) importProgress.UpdateTaskProgress(discord.DownloadAssetsTask, progressValue) progressUpdates <- importProgress } @@ -4110,6 +4154,11 @@ func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscor } } + if len(attachmentChunks) == 0 { + progressValue := calculateProgress(i+1, totalImportChunkCount, 1.0) + importProgress.UpdateTaskProgress(discord.DownloadAssetsTask, progressValue) + } + _, err := m.transport.JoinPublic(processedChannelIds[channel.Channel.ID]) if err != nil { m.logger.Error("failed to load filter for chat", zap.Error(err)) @@ -4381,12 +4430,13 @@ func (m *Messenger) pinMessagesToWakuMessages(pinMessages []*common.PinMessage, hash := crypto.Keccak256Hash(append([]byte(c.IDString()), wrappedPayload...)) wakuMessage := &types.Message{ - Sig: crypto.FromECDSAPub(&c.PrivateKey().PublicKey), - Timestamp: uint32(msg.WhisperTimestamp / 1000), - Topic: filter.ContentTopic, - Payload: wrappedPayload, - Padding: []byte{1}, - Hash: hash[:], + Sig: crypto.FromECDSAPub(&c.PrivateKey().PublicKey), + Timestamp: uint32(msg.WhisperTimestamp / 1000), + Topic: filter.ContentTopic, + Payload: wrappedPayload, + Padding: []byte{1}, + Hash: hash[:], + ThirdPartyID: msg.ID, // CommunityID + DiscordMessageID } wakuMessages = append(wakuMessages, wakuMessage) } @@ -4418,7 +4468,7 @@ func (m *Messenger) chatMessagesToWakuMessages(chatMessages []*common.Message, c return nil, err } - hash := crypto.Keccak256Hash([]byte(c.IDString() + msg.GetDiscordMessage().Id)) + hash := crypto.Keccak256Hash([]byte(msg.ID)) wakuMessage := &types.Message{ Sig: crypto.FromECDSAPub(&c.PrivateKey().PublicKey), Timestamp: uint32(msg.WhisperTimestamp / 1000), @@ -4426,7 +4476,7 @@ func (m *Messenger) chatMessagesToWakuMessages(chatMessages []*common.Message, c Payload: wrappedPayload, Padding: []byte{1}, Hash: hash[:], - ThirdPartyID: c.IDString() + msg.GetDiscordMessage().Id, + ThirdPartyID: msg.ID, // CommunityID + DiscordMessageID } wakuMessages = append(wakuMessages, wakuMessage) }