From 8ae6e3035b780b977a00ffb5a589f1090d491c8c Mon Sep 17 00:00:00 2001 From: Boris Melnik Date: Wed, 25 Oct 2023 19:32:21 +0300 Subject: [PATCH] feat(discord): Import single channel from discord (#4160) --- protocol/discord/types.go | 2 + protocol/messenger.go | 2 + protocol/messenger_communities.go | 763 ++++++++++++++++++++ protocol/requests/import_discord_channel.go | 41 ++ services/ext/api.go | 8 + 5 files changed, 816 insertions(+) create mode 100644 protocol/requests/import_discord_channel.go diff --git a/protocol/discord/types.go b/protocol/discord/types.go index 48df0194a..bfaba5446 100644 --- a/protocol/discord/types.go +++ b/protocol/discord/types.go @@ -154,6 +154,8 @@ type ImportTasks map[ImportTask]*ImportTaskProgress type ImportProgress struct { CommunityID string `json:"communityId,omitempty"` CommunityName string `json:"communityName"` + ChannelID string `json:"channelId"` + ChannelName string `json:"channelName"` CommunityImages map[string]images.IdentityImage `json:"communityImages"` Tasks []*ImportTaskProgress `json:"tasks"` Progress float32 `json:"progress"` diff --git a/protocol/messenger.go b/protocol/messenger.go index f7a6d3c0e..ad21c08be 100644 --- a/protocol/messenger.go +++ b/protocol/messenger.go @@ -147,6 +147,7 @@ type Messenger struct { cancel context.CancelFunc importingCommunities map[string]bool + importingChannels map[string]bool importRateLimiter *rate.Limiter importDelayer struct { wait chan struct{} @@ -540,6 +541,7 @@ func NewMessenger( requestedContactsLock: sync.RWMutex{}, requestedContacts: make(map[string]*transport.Filter), importingCommunities: make(map[string]bool), + importingChannels: make(map[string]bool), importRateLimiter: rate.NewLimiter(rate.Every(importSlowRate), 1), importDelayer: struct { wait chan struct{} diff --git a/protocol/messenger_communities.go b/protocol/messenger_communities.go index b285e881c..9b34cff73 100644 --- a/protocol/messenger_communities.go +++ b/protocol/messenger_communities.go @@ -3810,6 +3810,740 @@ func (m *Messenger) RequestExtractDiscordChannelsAndCategories(filesToImport []s }() } +func (m *Messenger) saveDiscordAuthorIfNotExists(discordAuthor *protobuf.DiscordMessageAuthor) *discord.ImportError { + exists, err := m.persistence.HasDiscordMessageAuthor(discordAuthor.GetId()) + if err != nil { + m.logger.Error("failed to check if message author exists in database", zap.Error(err)) + return discord.Error(err.Error()) + } + + if !exists { + err := m.persistence.SaveDiscordMessageAuthor(discordAuthor) + if err != nil { + return discord.Error(err.Error()) + } + } + + return nil +} + +func (m *Messenger) convertDiscordMessageTimeStamp(discordMessage *protobuf.DiscordMessage, timestamp time.Time) *discord.ImportError { + discordMessage.Timestamp = fmt.Sprintf("%d", timestamp.Unix()) + + if discordMessage.TimestampEdited != "" { + timestampEdited, err := time.Parse(discordTimestampLayout, discordMessage.TimestampEdited) + if err != nil { + m.logger.Error("failed to parse discord message timestamp", zap.Error(err)) + return discord.Warning(err.Error()) + } + // Convert timestamp to unix timestamp + discordMessage.TimestampEdited = fmt.Sprintf("%d", timestampEdited.Unix()) + } + + return nil +} + +func (m *Messenger) createPinMessageFromDiscordMessage(message *common.Message, pinnedMessageID string, channelID string, community *communities.Community) (*common.PinMessage, *discord.ImportError) { + pinMessage := protobuf.PinMessage{ + Clock: message.WhisperTimestamp, + MessageId: pinnedMessageID, + ChatId: message.LocalChatID, + MessageType: protobuf.MessageType_COMMUNITY_CHAT, + Pinned: true, + } + + encodedPayload, err := proto.Marshal(&pinMessage) + if err != nil { + m.logger.Error("failed to parse marshal pin message", zap.Error(err)) + return nil, discord.Warning(err.Error()) + } + + wrappedPayload, err := v1protocol.WrapMessageV1(encodedPayload, protobuf.ApplicationMetadataMessage_PIN_MESSAGE, community.PrivateKey()) + if err != nil { + m.logger.Error("failed to wrap pin message", zap.Error(err)) + return nil, discord.Warning(err.Error()) + } + + pinMessageToSave := &common.PinMessage{ + ID: types.EncodeHex(v1protocol.MessageID(&community.PrivateKey().PublicKey, wrappedPayload)), + PinMessage: &pinMessage, + LocalChatID: channelID, + From: message.From, + SigPubKey: message.SigPubKey, + WhisperTimestamp: message.WhisperTimestamp, + } + + return pinMessageToSave, nil +} + +func (m *Messenger) generateSystemPinnedMessage(pinMessage *common.PinMessage, channel *Chat, clockAndTimestamp uint64, pinnedMessageID string) (*common.Message, *discord.ImportError) { + id, err := generatePinMessageNotificationID(&m.identity.PublicKey, pinMessage, channel) + if err != nil { + m.logger.Warn("failed to generate pin message notification ID", + zap.String("PinMessageId", pinMessage.ID)) + return nil, discord.Warning(err.Error()) + } + systemMessage := &common.Message{ + ChatMessage: &protobuf.ChatMessage{ + Clock: pinMessage.Clock, + Timestamp: clockAndTimestamp, + ChatId: channel.ID, + MessageType: pinMessage.MessageType, + ResponseTo: pinnedMessageID, + ContentType: protobuf.ChatMessage_SYSTEM_MESSAGE_PINNED_MESSAGE, + }, + WhisperTimestamp: clockAndTimestamp, + ID: id, + LocalChatID: channel.ID, + From: pinMessage.From, + Seen: true, + } + + return systemMessage, nil +} + +func (m *Messenger) processDiscordMessages(discordChannel *discord.ExportedData, + channel *Chat, + importProgress *discord.ImportProgress, + progressUpdates chan *discord.ImportProgress, + fromDate int64, + community *communities.Community) ( + map[string]*common.Message, + []*common.PinMessage, + map[string]*protobuf.DiscordMessageAuthor, + []*protobuf.DiscordMessageAttachment) { + + messagesToSave := make(map[string]*common.Message, 0) + pinMessagesToSave := make([]*common.PinMessage, 0) + authorProfilesToSave := make(map[string]*protobuf.DiscordMessageAuthor, 0) + messageAttachmentsToDownload := make([]*protobuf.DiscordMessageAttachment, 0) + + for _, discordMessage := range discordChannel.Messages { + + timestamp, err := time.Parse(discordTimestampLayout, discordMessage.Timestamp) + if err != nil { + m.logger.Error("failed to parse discord message timestamp", zap.Error(err)) + importProgress.AddTaskError(discord.ImportMessagesTask, discord.Warning(err.Error())) + progressUpdates <- importProgress + continue + } + + if timestamp.Unix() < fromDate { + progressUpdates <- importProgress + continue + } + + importErr := m.saveDiscordAuthorIfNotExists(discordMessage.Author) + if importErr != nil { + importProgress.AddTaskError(discord.ImportMessagesTask, importErr) + progressUpdates <- importProgress + continue + } + + hasPayload, err := m.persistence.HasDiscordMessageAuthorImagePayload(discordMessage.Author.GetId()) + if err != nil { + m.logger.Error("failed to check if message avatar payload exists in database", zap.Error(err)) + importProgress.AddTaskError(discord.ImportMessagesTask, discord.Error(err.Error())) + progressUpdates <- importProgress + continue + } + + if !hasPayload { + authorProfilesToSave[discordMessage.Author.Id] = discordMessage.Author + } + + // Convert timestamp to unix timestamp + importErr = m.convertDiscordMessageTimeStamp(discordMessage, timestamp) + if importErr != nil { + importProgress.AddTaskError(discord.ImportMessagesTask, importErr) + progressUpdates <- importProgress + continue + } + + for i := range discordMessage.Attachments { + discordMessage.Attachments[i].MessageId = discordMessage.Id + } + messageAttachmentsToDownload = append(messageAttachmentsToDownload, discordMessage.Attachments...) + + clockAndTimestamp := uint64(timestamp.Unix()) * 1000 + communityPubKey := community.PrivateKey().PublicKey + + chatMessage := protobuf.ChatMessage{ + Timestamp: clockAndTimestamp, + MessageType: protobuf.MessageType_COMMUNITY_CHAT, + ContentType: protobuf.ChatMessage_DISCORD_MESSAGE, + Clock: clockAndTimestamp, + ChatId: channel.ID, + Payload: &protobuf.ChatMessage_DiscordMessage{ + DiscordMessage: discordMessage, + }, + } + + // Handle message replies + if discordMessage.Type == string(discord.MessageTypeReply) && discordMessage.Reference != nil { + repliedMessageID := community.IDString() + discordMessage.Reference.MessageId + if _, exists := messagesToSave[repliedMessageID]; exists { + chatMessage.ResponseTo = repliedMessageID + } + } + + messageToSave := &common.Message{ + ID: community.IDString() + discordMessage.Id, + WhisperTimestamp: clockAndTimestamp, + From: types.EncodeHex(crypto.FromECDSAPub(&communityPubKey)), + Seen: true, + LocalChatID: channel.ID, + SigPubKey: &communityPubKey, + CommunityID: community.IDString(), + ChatMessage: &chatMessage, + } + + err = messageToSave.PrepareContent(common.PubkeyToHex(&m.identity.PublicKey)) + if err != nil { + m.logger.Error("failed to prepare message content", zap.Error(err)) + importProgress.AddTaskError(discord.ImportMessagesTask, discord.Error(err.Error())) + progressUpdates <- importProgress + continue + } + + // Handle pin messages + if discordMessage.Type == string(discord.MessageTypeChannelPinned) && discordMessage.Reference != nil { + + pinnedMessageID := community.IDString() + discordMessage.Reference.MessageId + _, exists := messagesToSave[pinnedMessageID] + if exists { + pinMessageToSave, importErr := m.createPinMessageFromDiscordMessage(messageToSave, pinnedMessageID, channel.ID, community) + if importErr != nil { + importProgress.AddTaskError(discord.ImportMessagesTask, importErr) + progressUpdates <- importProgress + continue + } + + pinMessagesToSave = append(pinMessagesToSave, pinMessageToSave) + + // Generate SystemMessagePinnedMessage + systemMessage, importErr := m.generateSystemPinnedMessage(pinMessageToSave, channel, clockAndTimestamp, pinnedMessageID) + if importErr != nil { + importProgress.AddTaskError(discord.ImportMessagesTask, importErr) + progressUpdates <- importProgress + continue + } + + messagesToSave[systemMessage.ID] = systemMessage + } + } else { + messagesToSave[messageToSave.ID] = messageToSave + } + } + + return messagesToSave, pinMessagesToSave, authorProfilesToSave, messageAttachmentsToDownload +} + +func (m *Messenger) RequestImportDiscordChannel(request *requests.ImportDiscordChannel) { + go func() { + totalImportChunkCount := len(request.FilesToImport) + + progressUpdates := make(chan *discord.ImportProgress) + + done := make(chan struct{}) + cancel := make(chan string) + + var newChat *Chat + + m.startPublishImportProgressInterval(progressUpdates, cancel, done) + + importProgress := &discord.ImportProgress{} + importProgress.Init(totalImportChunkCount, []discord.ImportTask{ + discord.ChannelsCreationTask, + discord.ImportMessagesTask, + discord.DownloadAssetsTask, + }) + + importProgress.CommunityID = request.DiscordChannelID + importProgress.CommunityName = request.Name + // initial progress immediately + + if err := request.Validate(); err != nil { + errmsg := fmt.Sprintf("Request validation failed: '%s'", err.Error()) + importProgress.AddTaskError(discord.ChannelsCreationTask, discord.Error(errmsg)) + importProgress.StopTask(discord.ChannelsCreationTask) + progressUpdates <- importProgress + cancel <- request.DiscordChannelID + return + } + + // Here's 3 steps: Find the corrent channel in files, get the community and create the channel + progressValue := calculateProgress(3, 1, (float32(1) / 3)) + + m.publishImportProgress(importProgress) + + community, err := m.GetCommunityByID(request.CommunityID) + + if err != nil { + errmsg := fmt.Sprintf("Couldn't get the community '%s': '%s'", request.CommunityID, err.Error()) + importProgress.AddTaskError(discord.ChannelsCreationTask, discord.Error(errmsg)) + importProgress.StopTask(discord.ChannelsCreationTask) + progressUpdates <- importProgress + cancel <- request.DiscordChannelID + return + } + + if community == nil { + errmsg := fmt.Sprintf("Couldn't get the community by id: '%s'", request.CommunityID) + importProgress.AddTaskError(discord.ChannelsCreationTask, discord.Error(errmsg)) + importProgress.StopTask(discord.ChannelsCreationTask) + progressUpdates <- importProgress + cancel <- request.DiscordChannelID + return + } + + importProgress.UpdateTaskProgress(discord.ChannelsCreationTask, progressValue) + progressUpdates <- importProgress + + for i, importFile := range request.FilesToImport { + m.importingChannels[request.DiscordChannelID] = false + + exportData, errs := m.ExtractDiscordDataFromImportFiles([]string{importFile}) + if len(errs) > 0 { + for _, err := range errs { + importProgress.AddTaskError(discord.ChannelsCreationTask, err) + } + importProgress.StopTask(discord.ChannelsCreationTask) + progressUpdates <- importProgress + cancel <- request.DiscordChannelID + return + } + + var channel *discord.ExportedData + + for _, ch := range exportData.ExportedData { + if ch.Channel.ID == request.DiscordChannelID { + channel = ch + } + } + + if channel == nil { + if i < len(request.FilesToImport)-1 { + // skip this file + continue + } else if i == len(request.FilesToImport)-1 { + errmsg := fmt.Sprintf("Couldn't find the target channel id in files: '%s'", request.DiscordChannelID) + importProgress.AddTaskError(discord.ChannelsCreationTask, discord.Error(errmsg)) + importProgress.StopTask(discord.ChannelsCreationTask) + progressUpdates <- importProgress + cancel <- request.DiscordChannelID + return + } + } + progressValue := calculateProgress(3, 2, (float32(2) / 3)) + + importProgress.UpdateTaskProgress(discord.ChannelsCreationTask, progressValue) + progressUpdates <- importProgress + + if len(channel.Channel.ID) == 0 { + // skip this file and try to find in the next file + continue + } + exists := false + + for _, chatID := range community.ChatIDs() { + if strings.HasSuffix(chatID, request.DiscordChannelID) { + exists = true + break + } + } + + if !exists { + + communityChat := &protobuf.CommunityChat{ + Permissions: &protobuf.CommunityPermissions{ + Access: protobuf.CommunityPermissions_NO_MEMBERSHIP, + }, + Identity: &protobuf.ChatIdentity{ + DisplayName: request.Name, + Emoji: request.Emoji, + Description: request.Description, + Color: request.Color, + }, + CategoryId: "", + } + + changes, err := m.communitiesManager.CreateChat(request.CommunityID, communityChat, false, channel.Channel.ID) + if err != nil { + errmsg := err.Error() + if errors.Is(err, communities.ErrInvalidCommunityDescriptionDuplicatedName) { + errmsg = fmt.Sprintf("Couldn't create channel '%s': %s", communityChat.Identity.DisplayName, err.Error()) + } + + importProgress.AddTaskError(discord.ChannelsCreationTask, discord.Error(errmsg)) + importProgress.StopTask(discord.ChannelsCreationTask) + progressUpdates <- importProgress + cancel <- request.DiscordChannelID + return + } + + community = changes.Community + for chatID, chat := range changes.ChatsAdded { + newChat = CreateCommunityChat(request.CommunityID.String(), chatID, chat, m.getTimesource()) + } + + progressValue = calculateProgress(3, 3, (float32(3) / 3)) + + importProgress.UpdateTaskProgress(discord.ChannelsCreationTask, progressValue) + progressUpdates <- importProgress + } + + if m.DiscordImportChannelMarkedAsCancelled(request.DiscordChannelID) { + importProgress.StopTask(discord.ImportMessagesTask) + progressUpdates <- importProgress + cancel <- request.DiscordChannelID + return + } + + messagesToSave, pinMessagesToSave, authorProfilesToSave, messageAttachmentsToDownload := + m.processDiscordMessages(channel, newChat, importProgress, progressUpdates, request.From, community) + + var discordMessages []*protobuf.DiscordMessage + for _, msg := range messagesToSave { + if msg.ChatMessage.ContentType == protobuf.ChatMessage_DISCORD_MESSAGE { + discordMessages = append(discordMessages, msg.GetDiscordMessage()) + } + } + + // We save these messages in chunks, so we don't block the database + // for a longer period of time + discordMessageChunks := chunkSlice(discordMessages, maxChunkSizeMessages) + chunksCount := len(discordMessageChunks) + + for ii, msgs := range discordMessageChunks { + m.communitiesManager.LogStdout(fmt.Sprintf("saving %d/%d chunk with %d discord messages", ii+1, chunksCount, len(msgs))) + err := m.persistence.SaveDiscordMessages(msgs) + if err != nil { + m.cleanUpImportChannel(request.CommunityID.String(), newChat.ID) + importProgress.AddTaskError(discord.ImportMessagesTask, discord.Error(err.Error())) + importProgress.StopTask(discord.ImportMessagesTask) + progressUpdates <- importProgress + cancel <- request.DiscordChannelID + + return + } + + if m.DiscordImportChannelMarkedAsCancelled(request.DiscordChannelID) { + importProgress.StopTask(discord.ImportMessagesTask) + progressUpdates <- importProgress + cancel <- request.DiscordChannelID + return + } + + // We're multiplying `chunksCount` by `0.25` so we leave 25% for additional save operations + // 0.5 are the previous 50% of progress + currentCount := ii + 1 + progressValue := calculateProgress(i+1, totalImportChunkCount, 0.5+(float32(currentCount)/float32(chunksCount))*0.25) + importProgress.UpdateTaskProgress(discord.ImportMessagesTask, progressValue) + progressUpdates <- importProgress + + // We slow down the saving of message chunks to keep the database responsive + if currentCount < chunksCount { + time.Sleep(2 * time.Second) + } + } + + // Get slice of all values in `messagesToSave` map + var messages = make([]*common.Message, 0, len(messagesToSave)) + for _, msg := range messagesToSave { + messages = append(messages, msg) + } + + // Same as above, we save these messages in chunks so we don't block + // the database for a longer period of time + messageChunks := chunkSlice(messages, maxChunkSizeMessages) + chunksCount = len(messageChunks) + + for ii, msgs := range messageChunks { + m.communitiesManager.LogStdout(fmt.Sprintf("saving %d/%d chunk with %d app messages", ii+1, chunksCount, len(msgs))) + err := m.persistence.SaveMessages(msgs) + if err != nil { + m.cleanUpImportChannel(request.CommunityID.String(), request.DiscordChannelID) + importProgress.AddTaskError(discord.ImportMessagesTask, discord.Error(err.Error())) + importProgress.StopTask(discord.ImportMessagesTask) + progressUpdates <- importProgress + cancel <- request.DiscordChannelID + + return + } + + if m.DiscordImportChannelMarkedAsCancelled(request.DiscordChannelID) { + importProgress.StopTask(discord.ImportMessagesTask) + progressUpdates <- importProgress + cancel <- request.DiscordChannelID + return + } + + // 0.75 are the previous 75% of progress, hence we multiply our chunk progress + // by 0.25 + currentCount := ii + 1 + progressValue := calculateProgress(i+1, totalImportChunkCount, 0.75+(float32(currentCount)/float32(chunksCount))*0.25) + // progressValue := 0.75 + ((float32(currentCount) / float32(chunksCount)) * 0.25) + importProgress.UpdateTaskProgress(discord.ImportMessagesTask, progressValue) + progressUpdates <- importProgress + + // We slow down the saving of message chunks to keep the database responsive + if currentCount < chunksCount { + time.Sleep(2 * time.Second) + } + } + + pinMessageChunks := chunkSlice(pinMessagesToSave, maxChunkSizeMessages) + for _, pinMsgs := range pinMessageChunks { + err := m.persistence.SavePinMessages(pinMsgs) + if err != nil { + m.cleanUpImportChannel(request.CommunityID.String(), request.DiscordChannelID) + importProgress.AddTaskError(discord.ImportMessagesTask, discord.Error(err.Error())) + importProgress.StopTask(discord.ImportMessagesTask) + progressUpdates <- importProgress + cancel <- request.DiscordChannelID + + return + } + + if m.DiscordImportChannelMarkedAsCancelled(request.DiscordChannelID) { + importProgress.StopTask(discord.ImportMessagesTask) + progressUpdates <- importProgress + cancel <- request.DiscordChannelID + cancel <- request.DiscordChannelID + + return + } + } + + totalAssetsCount := len(messageAttachmentsToDownload) + len(authorProfilesToSave) + var assetCounter discord.AssetCounter + + var wg sync.WaitGroup + + for id, author := range authorProfilesToSave { + wg.Add(1) + go func(id string, author *protobuf.DiscordMessageAuthor) { + defer wg.Done() + + m.communitiesManager.LogStdout(fmt.Sprintf("downloading asset %d/%d", assetCounter.Value()+1, totalAssetsCount)) + imagePayload, err := discord.DownloadAvatarAsset(author.AvatarUrl) + if err != nil { + errmsg := fmt.Sprintf("Couldn't download profile avatar '%s': %s", author.AvatarUrl, err.Error()) + importProgress.AddTaskError( + discord.DownloadAssetsTask, + discord.Warning(errmsg), + ) + progressUpdates <- importProgress + cancel <- request.DiscordChannelID + + return + } + + err = m.persistence.UpdateDiscordMessageAuthorImage(author.Id, imagePayload) + if err != nil { + importProgress.AddTaskError(discord.DownloadAssetsTask, discord.Warning(err.Error())) + progressUpdates <- importProgress + cancel <- request.DiscordChannelID + + return + } + + author.AvatarImagePayload = imagePayload + authorProfilesToSave[id] = author + + if m.DiscordImportMarkedAsCancelled(request.DiscordChannelID) { + importProgress.StopTask(discord.DownloadAssetsTask) + progressUpdates <- importProgress + cancel <- request.DiscordChannelID + return + } + + assetCounter.Increase() + progressValue := calculateProgress(i+1, totalImportChunkCount, (float32(assetCounter.Value())/float32(totalAssetsCount))*0.5) + importProgress.UpdateTaskProgress(discord.DownloadAssetsTask, progressValue) + progressUpdates <- importProgress + + }(id, author) + } + wg.Wait() + + if m.DiscordImportChannelMarkedAsCancelled(request.DiscordChannelID) { + importProgress.StopTask(discord.DownloadAssetsTask) + progressUpdates <- importProgress + cancel <- request.DiscordChannelID + return + } + + for idxRange := range gopart.Partition(len(messageAttachmentsToDownload), 100) { + attachments := messageAttachmentsToDownload[idxRange.Low:idxRange.High] + wg.Add(1) + go func(attachments []*protobuf.DiscordMessageAttachment) { + defer wg.Done() + for ii, attachment := range attachments { + + m.communitiesManager.LogStdout(fmt.Sprintf("downloading asset %d/%d", assetCounter.Value()+1, totalAssetsCount)) + + assetPayload, contentType, err := discord.DownloadAsset(attachment.Url) + if err != nil { + errmsg := fmt.Sprintf("Couldn't download message attachment '%s': %s", attachment.Url, err.Error()) + importProgress.AddTaskError( + discord.DownloadAssetsTask, + discord.Warning(errmsg), + ) + progressUpdates <- importProgress + continue + } + + attachment.Payload = assetPayload + attachment.ContentType = contentType + messageAttachmentsToDownload[ii] = attachment + + if m.DiscordImportChannelMarkedAsCancelled(request.DiscordChannelID) { + importProgress.StopTask(discord.DownloadAssetsTask) + progressUpdates <- importProgress + cancel <- request.DiscordChannelID + return + } + + assetCounter.Increase() + progressValue := calculateProgress(i+1, totalImportChunkCount, (float32(assetCounter.Value())/float32(totalAssetsCount))*0.5) + importProgress.UpdateTaskProgress(discord.DownloadAssetsTask, progressValue) + progressUpdates <- importProgress + } + }(attachments) + } + wg.Wait() + + if m.DiscordImportChannelMarkedAsCancelled(request.DiscordChannelID) { + importProgress.StopTask(discord.DownloadAssetsTask) + progressUpdates <- importProgress + cancel <- request.DiscordChannelID + return + } + + attachmentChunks := chunkAttachmentsByByteSize(messageAttachmentsToDownload, maxChunkSizeBytes) + chunksCount = len(attachmentChunks) + + for ii, attachments := range attachmentChunks { + m.communitiesManager.LogStdout(fmt.Sprintf("saving %d/%d chunk with %d discord message attachments", ii+1, chunksCount, len(attachments))) + err := m.persistence.SaveDiscordMessageAttachments(attachments) + if err != nil { + m.cleanUpImportChannel(request.CommunityID.String(), request.DiscordChannelID) + importProgress.AddTaskError(discord.DownloadAssetsTask, discord.Error(err.Error())) + importProgress.Stop() + progressUpdates <- importProgress + cancel <- request.DiscordChannelID + + return + } + + if m.DiscordImportChannelMarkedAsCancelled(request.DiscordChannelID) { + importProgress.StopTask(discord.DownloadAssetsTask) + progressUpdates <- importProgress + cancel <- request.DiscordChannelID + return + } + + // 0.5 are the previous 50% of progress, hence we multiply our chunk progress + // by 0.5 + currentCount := ii + 1 + progressValue := calculateProgress(i+1, totalImportChunkCount, 0.5+(float32(currentCount)/float32(chunksCount))*0.5) + importProgress.UpdateTaskProgress(discord.DownloadAssetsTask, progressValue) + progressUpdates <- importProgress + + // We slow down the saving of attachment chunks to keep the database responsive + if currentCount < chunksCount { + time.Sleep(2 * time.Second) + } + } + + if len(attachmentChunks) == 0 { + progressValue := calculateProgress(i+1, totalImportChunkCount, 1.0) + importProgress.UpdateTaskProgress(discord.DownloadAssetsTask, progressValue) + } + + _, err := m.transport.JoinPublic(newChat.ID) + if err != nil { + m.logger.Error("failed to load filter for chat", zap.Error(err)) + continue + } + + wakuChatMessages, err := m.chatMessagesToWakuMessages(messages, community) + if err != nil { + m.logger.Error("failed to convert chat messages into waku messages", zap.Error(err)) + continue + } + + wakuPinMessages, err := m.pinMessagesToWakuMessages(pinMessagesToSave, community) + if err != nil { + m.logger.Error("failed to convert pin messages into waku messages", zap.Error(err)) + continue + } + + wakuMessages := append(wakuChatMessages, wakuPinMessages...) + + topics, err := m.communitiesManager.GetCommunityChatsTopics(request.CommunityID) + if err != nil { + m.logger.Error("failed to get community chat topics", zap.Error(err)) + continue + } + + startDate := time.Unix(int64(exportData.OldestMessageTimestamp), 0) + endDate := time.Now() + + _, err = m.communitiesManager.CreateHistoryArchiveTorrentFromMessages( + request.CommunityID, + wakuMessages, + topics, + startDate, + endDate, + messageArchiveInterval, + community.Encrypted(), + ) + if err != nil { + m.logger.Error("failed to create history archive torrent", zap.Error(err)) + continue + } + communitySettings, err := m.communitiesManager.GetCommunitySettingsByID(request.CommunityID) + if err != nil { + m.logger.Error("Failed to get community settings", zap.Error(err)) + continue + } + if m.torrentClientReady() && communitySettings.HistoryArchiveSupportEnabled { + + err = m.communitiesManager.SeedHistoryArchiveTorrent(request.CommunityID) + if err != nil { + m.logger.Error("failed to seed history archive", zap.Error(err)) + } + go m.communitiesManager.StartHistoryArchiveTasksInterval(community, messageArchiveInterval) + } + } + if m.DiscordImportChannelMarkedAsCancelled(request.DiscordChannelID) { + importProgress.StopTask(discord.InitCommunityTask) + progressUpdates <- importProgress + cancel <- request.DiscordChannelID + return + } + + // Chats need to be saved after the community has been published, + // hence we make this part of the `InitCommunityTask` + err = m.saveChat(newChat) + + if err != nil { + m.cleanUpImportChannel(request.CommunityID.String(), request.DiscordChannelID) + importProgress.AddTaskError(discord.InitCommunityTask, discord.Error(err.Error())) + importProgress.Stop() + progressUpdates <- importProgress + cancel <- request.DiscordChannelID + return + } + + m.config.messengerSignalsHandler.DiscordCommunityImportFinished(request.DiscordChannelID) + close(done) + }() +} + func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscordCommunity) { go func() { @@ -4661,11 +5395,20 @@ func (m *Messenger) MarkDiscordCommunityImportAsCancelled(communityID string) { m.importingCommunities[communityID] = true } +func (m *Messenger) MarkDiscordChannelImportAsCancelled(channelID string) { + m.importingChannels[channelID] = true +} + func (m *Messenger) DiscordImportMarkedAsCancelled(communityID string) bool { cancelled, exists := m.importingCommunities[communityID] return exists && cancelled } +func (m *Messenger) DiscordImportChannelMarkedAsCancelled(channelID string) bool { + cancelled, exists := m.importingChannels[channelID] + return exists && cancelled +} + func (m *Messenger) cleanUpImports() { for id := range m.importingCommunities { m.cleanUpImport(id) @@ -4688,6 +5431,26 @@ func (m *Messenger) cleanUpImport(communityID string) { } } +func (m *Messenger) cleanUpImportChannel(communityID string, channelID string) { + community, err := m.communitiesManager.GetByIDString(communityID) + if err != nil { + m.logger.Error("clean up failed, couldn't delete community", zap.Error(err)) + return + } + + _, err = community.DeleteChat(channelID) + if err != nil { + m.logger.Error("clean up failed, couldn't delete community chat", zap.Error(err)) + return + } + + err = m.persistence.DeleteMessagesByChatID(channelID) + if err != nil { + m.logger.Error("clean up failed, couldn't delete community chat messages", zap.Error(err)) + return + } +} + func (m *Messenger) publishImportProgress(progress *discord.ImportProgress) { m.config.messengerSignalsHandler.DiscordCommunityImportProgress(progress) } diff --git a/protocol/requests/import_discord_channel.go b/protocol/requests/import_discord_channel.go new file mode 100644 index 000000000..ac9ad308a --- /dev/null +++ b/protocol/requests/import_discord_channel.go @@ -0,0 +1,41 @@ +package requests + +import ( + "errors" + + "github.com/status-im/status-go/eth-node/types" +) + +// errrors +var ( + ErrImportDiscordChannelMissingFilesToImport = errors.New("import-discord-channel: missing files to import") + ErrImportDiscordChannelChannelIDIsEmpty = errors.New("import-discord-channel: discord channel id is empty") + ErrImportDiscordChannelCommunityIDIsEmpty = errors.New("import-discord-channel: community id is empty") +) + +type ImportDiscordChannel struct { + Name string `json:"name"` + DiscordChannelID string `json:"discordChannelID"` + CommunityID types.HexBytes `json:"communityId"` + Description string `json:"description"` + Color string `json:"color"` + Emoji string `json:"emoji"` + FilesToImport []string `json:"filesToImport"` + From int64 `json:"from"` +} + +func (r *ImportDiscordChannel) Validate() error { + if len(r.FilesToImport) == 0 { + return ErrImportDiscordChannelMissingFilesToImport + } + + if len(r.DiscordChannelID) == 0 { + return ErrImportDiscordChannelChannelIDIsEmpty + } + + if len(r.CommunityID) == 0 { + return ErrImportDiscordChannelCommunityIDIsEmpty + } + + return nil +} diff --git a/services/ext/api.go b/services/ext/api.go index 48a972ec9..bceba507a 100644 --- a/services/ext/api.go +++ b/services/ext/api.go @@ -1388,6 +1388,10 @@ func (api *PublicAPI) ExtractDiscordChannelsAndCategories(filesToImport []string return api.service.messenger.ExtractDiscordChannelsAndCategories(filesToImport) } +func (api *PublicAPI) RequestImportDiscordChannel(request *requests.ImportDiscordChannel) { + api.service.messenger.RequestImportDiscordChannel(request) +} + func (api *PublicAPI) RequestImportDiscordCommunity(request *requests.ImportDiscordCommunity) { api.service.messenger.RequestImportDiscordCommunity(request) } @@ -1396,6 +1400,10 @@ func (api *PublicAPI) RequestCancelDiscordCommunityImport(id string) { api.service.messenger.MarkDiscordCommunityImportAsCancelled(id) } +func (api *PublicAPI) RequestCancelDiscordChannelImport(discordChannelID string) { + api.service.messenger.MarkDiscordChannelImportAsCancelled(discordChannelID) +} + func (api *PublicAPI) BuildContact(request *requests.BuildContact) (*protocol.Contact, error) { return api.service.messenger.BuildContact(request) }