fix: Discord import tool various fixes for pinned messages (#4001)

This commit is contained in:
Igor Sirotin 2023-09-15 15:19:10 +01:00 committed by GitHub
parent 5e8300d6a1
commit daf3fc79bf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 72 additions and 22 deletions

View File

@ -3566,6 +3566,7 @@ func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscor
}
var chatsToSave []*Chat
createdChats := make(map[string]*Chat, 0)
processedChannelIds := make(map[string]string, 0)
processedCategoriesIds := make(map[string]string, 0)
@ -3700,6 +3701,7 @@ func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscor
// know there was only a single such change (and it's a map)
for chatID, chat := range changes.ChatsAdded {
c := CreateCommunityChat(communityID, chatID, chat, m.getTimesource())
createdChats[c.ID] = c
chatsToSave = append(chatsToSave, c)
processedChannelIds[channel.Channel.ID] = c.ID
}
@ -3789,9 +3791,9 @@ func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscor
// Handle message replies
if discordMessage.Type == string(discord.MessageTypeReply) && discordMessage.Reference != nil {
_, exists := messagesToSave[communityID+discordMessage.Reference.MessageId]
if exists {
chatMessage.ResponseTo = communityID + discordMessage.Reference.MessageId
repliedMessageID := communityID + discordMessage.Reference.MessageId
if _, exists := messagesToSave[repliedMessageID]; exists {
chatMessage.ResponseTo = repliedMessageID
}
}
@ -3817,11 +3819,12 @@ func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscor
// Handle pin messages
if discordMessage.Type == string(discord.MessageTypeChannelPinned) && discordMessage.Reference != nil {
_, exists := messagesToSave[communityID+discordMessage.Reference.MessageId]
pinnedMessageID := communityID + discordMessage.Reference.MessageId
_, exists := messagesToSave[pinnedMessageID]
if exists {
pinMessage := protobuf.PinMessage{
Clock: messageToSave.WhisperTimestamp,
MessageId: communityID + discordMessage.Reference.MessageId,
MessageId: pinnedMessageID,
ChatId: messageToSave.LocalChatID,
MessageType: protobuf.MessageType_COMMUNITY_CHAT,
Pinned: true,
@ -3843,10 +3846,8 @@ func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscor
continue
}
messageID := v1protocol.MessageID(&communityPubKey, wrappedPayload)
pinMessageToSave := common.PinMessage{
ID: types.EncodeHex(messageID),
ID: types.EncodeHex(v1protocol.MessageID(&communityPubKey, wrappedPayload)),
PinMessage: &pinMessage,
LocalChatID: processedChannelIds[channel.Channel.ID],
From: messageToSave.From,
@ -3855,9 +3856,48 @@ func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscor
}
pinMessagesToSave = append(pinMessagesToSave, &pinMessageToSave)
// Generate SystemMessagePinnedMessage
chat, ok := createdChats[pinMessageToSave.LocalChatID]
if !ok {
err := errors.New("failed to get chat for pin message")
m.logger.Warn(err.Error(),
zap.String("PinMessageId", pinMessageToSave.ID),
zap.String("ChatID", pinMessageToSave.LocalChatID))
importProgress.AddTaskError(discord.ImportMessagesTask, discord.Warning(err.Error()))
progressUpdates <- importProgress
continue
}
id, err := generatePinMessageNotificationID(&m.identity.PublicKey, &pinMessageToSave, chat)
if err != nil {
m.logger.Warn("failed to generate pin message notification ID",
zap.String("PinMessageId", pinMessageToSave.ID))
importProgress.AddTaskError(discord.ImportMessagesTask, discord.Warning(err.Error()))
progressUpdates <- importProgress
continue
}
systemMessage := &common.Message{
ChatMessage: &protobuf.ChatMessage{
Clock: pinMessageToSave.Clock,
Timestamp: clockAndTimestamp,
ChatId: chat.ID,
MessageType: pinMessageToSave.MessageType,
ResponseTo: pinMessage.MessageId,
ContentType: protobuf.ChatMessage_SYSTEM_MESSAGE_PINNED_MESSAGE,
},
WhisperTimestamp: clockAndTimestamp,
ID: id,
LocalChatID: chat.ID,
From: messageToSave.From,
Seen: true,
}
messagesToSave[systemMessage.ID] = systemMessage
}
} else {
messagesToSave[communityID+discordMessage.Id] = messageToSave
messagesToSave[messageToSave.ID] = messageToSave
}
progressValue := calculateProgress(i+1, totalImportChunkCount, float32(ii+1)/float32(len(channel.Messages))*0.5)
@ -3874,10 +3914,12 @@ func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscor
var discordMessages []*protobuf.DiscordMessage
for _, msg := range messagesToSave {
discordMessages = append(discordMessages, msg.GetDiscordMessage())
if msg.ChatMessage.ContentType == protobuf.ChatMessage_DISCORD_MESSAGE {
discordMessages = append(discordMessages, msg.GetDiscordMessage())
}
}
// We save these messages in chunks so we don't block the database
// We save these messages in chunks, so we don't block the database
// for a longer period of time
discordMessageChunks := chunkSlice(discordMessages, maxChunkSizeMessages)
chunksCount := len(discordMessageChunks)
@ -3913,7 +3955,9 @@ func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscor
}
}
var messages []*common.Message
// Get slice of all values in `messagesToSave` map
var messages = make([]*common.Message, 0, len(messagesToSave))
for _, msg := range messagesToSave {
messages = append(messages, msg)
}
@ -4014,7 +4058,7 @@ func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscor
}
assetCounter.Increase()
progressValue := calculateProgress(i+1, totalImportChunkCount, (float32(assetCounter.Value())/float32(totalAssetsCount))*0.25)
progressValue := calculateProgress(i+1, totalImportChunkCount, (float32(assetCounter.Value())/float32(totalAssetsCount))*0.5)
importProgress.UpdateTaskProgress(discord.DownloadAssetsTask, progressValue)
progressUpdates <- importProgress
@ -4061,7 +4105,7 @@ func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscor
}
assetCounter.Increase()
progressValue := calculateProgress(i+1, totalImportChunkCount, 0.25+(float32(assetCounter.Value())/float32(totalAssetsCount))*0.25)
progressValue := calculateProgress(i+1, totalImportChunkCount, (float32(assetCounter.Value())/float32(totalAssetsCount))*0.5)
importProgress.UpdateTaskProgress(discord.DownloadAssetsTask, progressValue)
progressUpdates <- importProgress
}
@ -4110,6 +4154,11 @@ func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscor
}
}
if len(attachmentChunks) == 0 {
progressValue := calculateProgress(i+1, totalImportChunkCount, 1.0)
importProgress.UpdateTaskProgress(discord.DownloadAssetsTask, progressValue)
}
_, err := m.transport.JoinPublic(processedChannelIds[channel.Channel.ID])
if err != nil {
m.logger.Error("failed to load filter for chat", zap.Error(err))
@ -4381,12 +4430,13 @@ func (m *Messenger) pinMessagesToWakuMessages(pinMessages []*common.PinMessage,
hash := crypto.Keccak256Hash(append([]byte(c.IDString()), wrappedPayload...))
wakuMessage := &types.Message{
Sig: crypto.FromECDSAPub(&c.PrivateKey().PublicKey),
Timestamp: uint32(msg.WhisperTimestamp / 1000),
Topic: filter.ContentTopic,
Payload: wrappedPayload,
Padding: []byte{1},
Hash: hash[:],
Sig: crypto.FromECDSAPub(&c.PrivateKey().PublicKey),
Timestamp: uint32(msg.WhisperTimestamp / 1000),
Topic: filter.ContentTopic,
Payload: wrappedPayload,
Padding: []byte{1},
Hash: hash[:],
ThirdPartyID: msg.ID, // CommunityID + DiscordMessageID
}
wakuMessages = append(wakuMessages, wakuMessage)
}
@ -4418,7 +4468,7 @@ func (m *Messenger) chatMessagesToWakuMessages(chatMessages []*common.Message, c
return nil, err
}
hash := crypto.Keccak256Hash([]byte(c.IDString() + msg.GetDiscordMessage().Id))
hash := crypto.Keccak256Hash([]byte(msg.ID))
wakuMessage := &types.Message{
Sig: crypto.FromECDSAPub(&c.PrivateKey().PublicKey),
Timestamp: uint32(msg.WhisperTimestamp / 1000),
@ -4426,7 +4476,7 @@ func (m *Messenger) chatMessagesToWakuMessages(chatMessages []*common.Message, c
Payload: wrappedPayload,
Padding: []byte{1},
Hash: hash[:],
ThirdPartyID: c.IDString() + msg.GetDiscordMessage().Id,
ThirdPartyID: msg.ID, // CommunityID + DiscordMessageID
}
wakuMessages = append(wakuMessages, wakuMessage)
}