From f31e40264e5d225bc02c4a2e6c8a532c43f0ff8e Mon Sep 17 00:00:00 2001 From: Pascal Precht <445106+0x-r4bbit@users.noreply.github.com> Date: Thu, 26 Jan 2023 13:52:43 +0100 Subject: [PATCH] refactor(import-tool): process import data in chunks This commit refactors the discord import tool such that, instead of loading all data to be imported into memory at once, it will now perform the import on a per file basis. This improves the memory pressure for the node performing the import and seems to increase its performance as well. --- protocol/communities/manager.go | 10 +- protocol/communities/manager_test.go | 2 +- protocol/discord/types.go | 6 +- protocol/messenger_communities.go | 778 +++++++++--------- .../requests/create_community_category.go | 1 + 5 files changed, 417 insertions(+), 380 deletions(-) diff --git a/protocol/communities/manager.go b/protocol/communities/manager.go index 43c510dde..df4a1dbd5 100644 --- a/protocol/communities/manager.go +++ b/protocol/communities/manager.go @@ -504,7 +504,7 @@ func (m *Manager) ImportCommunity(key *ecdsa.PrivateKey) (*Community, error) { return community, nil } -func (m *Manager) CreateChat(communityID types.HexBytes, chat *protobuf.CommunityChat, publish bool) (*Community, *CommunityChanges, error) { +func (m *Manager) CreateChat(communityID types.HexBytes, chat *protobuf.CommunityChat, publish bool, thirdPartyID string) (*Community, *CommunityChanges, error) { community, err := m.GetByID(communityID) if err != nil { return nil, nil, err @@ -513,6 +513,10 @@ func (m *Manager) CreateChat(communityID types.HexBytes, chat *protobuf.Communit return nil, nil, ErrOrgNotFound } chatID := uuid.New().String() + if thirdPartyID != "" { + chatID = chatID + thirdPartyID + } + changes, err := community.CreateChat(chatID, chat) if err != nil { return nil, nil, err @@ -598,7 +602,11 @@ func (m *Manager) CreateCategory(request *requests.CreateCommunityCategory, publ if community == nil { return nil, nil, ErrOrgNotFound } + categoryID := uuid.New().String() + if request.ThirdPartyID != "" { + categoryID = categoryID + request.ThirdPartyID + } // Remove communityID prefix from chatID if exists for i, cid := range request.ChatIDs { diff --git a/protocol/communities/manager_test.go b/protocol/communities/manager_test.go index 82161dde9..862f8cab6 100644 --- a/protocol/communities/manager_test.go +++ b/protocol/communities/manager_test.go @@ -707,7 +707,7 @@ func (s *ManagerSuite) buildCommunityWithChat() (*Community, string, error) { }, Members: make(map[string]*protobuf.CommunityMember), } - _, changes, err := s.manager.CreateChat(community.ID(), chat, true) + _, changes, err := s.manager.CreateChat(community.ID(), chat, true, "") if err != nil { return nil, "", err } diff --git a/protocol/discord/types.go b/protocol/discord/types.go index 075cde3cb..48df0194a 100644 --- a/protocol/discord/types.go +++ b/protocol/discord/types.go @@ -160,10 +160,12 @@ type ImportProgress struct { ErrorsCount uint `json:"errorsCount"` WarningsCount uint `json:"warningsCount"` Stopped bool `json:"stopped"` + TotalChunkCount int `json:"totalChunksCount,omitempty"` + CurrentChunk int `json:"currentChunk,omitempty"` m sync.Mutex } -func (progress *ImportProgress) Init(tasks []ImportTask) { +func (progress *ImportProgress) Init(totalChunkCount int, tasks []ImportTask) { progress.Progress = 0 progress.Tasks = make([]*ImportTaskProgress, 0) for _, task := range tasks { @@ -180,6 +182,8 @@ func (progress *ImportProgress) Init(tasks []ImportTask) { progress.ErrorsCount = 0 progress.WarningsCount = 0 progress.Stopped = false + progress.TotalChunkCount = totalChunkCount + progress.CurrentChunk = 0 } func (progress *ImportProgress) Stop() { diff --git a/protocol/messenger_communities.go b/protocol/messenger_communities.go index a79c80fa6..97d6a47e8 100644 --- a/protocol/messenger_communities.go +++ b/protocol/messenger_communities.go @@ -904,7 +904,7 @@ func (m *Messenger) CreateCommunityChat(communityID types.HexBytes, c *protobuf. var response MessengerResponse c.Identity.FirstMessageTimestamp = FirstMessageTimestampNoMessage - community, changes, err := m.communitiesManager.CreateChat(communityID, c, true) + community, changes, err := m.communitiesManager.CreateChat(communityID, c, true, "") if err != nil { return nil, err } @@ -2339,13 +2339,15 @@ func (m *Messenger) RequestExtractDiscordChannelsAndCategories(filesToImport []s func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscordCommunity) { go func() { + totalImportChunkCount := len(request.FilesToImport) + progressUpdates := make(chan *discord.ImportProgress) done := make(chan struct{}) cancel := make(chan string) m.startPublishImportProgressInterval(progressUpdates, cancel, done) importProgress := &discord.ImportProgress{} - importProgress.Init([]discord.ImportTask{ + importProgress.Init(totalImportChunkCount, []discord.ImportTask{ discord.CommunityCreationTask, discord.ChannelsCreationTask, discord.ImportMessagesTask, @@ -2357,31 +2359,6 @@ func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscor // initial progress immediately m.publishImportProgress(importProgress) - exportData, errs := m.ExtractDiscordDataFromImportFiles(request.FilesToImport) - if len(errs) > 0 { - for _, err := range errs { - importProgress.AddTaskError(discord.CommunityCreationTask, err) - } - progressUpdates <- importProgress - return - } - totalChannelsCount := len(exportData.ExportedData) - totalMessageCount := exportData.MessageCount - - if totalChannelsCount == 0 || totalMessageCount == 0 { - importError := discord.Error(discord.ErrNoChannelData.Error()) - if totalMessageCount == 0 { - importError.Message = discord.ErrNoMessageData.Error() - } - importProgress.AddTaskError(discord.CommunityCreationTask, importError) - importProgress.StopTask(discord.CommunityCreationTask) - progressUpdates <- importProgress - return - } - - importProgress.UpdateTaskProgress(discord.CommunityCreationTask, 0.5) - progressUpdates <- importProgress - createCommunityRequest := request.ToCreateCommunityRequest() // We're calling `CreateCommunity` on `communitiesManager` directly, instead of @@ -2434,7 +2411,7 @@ func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscor importProgress.CommunityImages[t] = images.IdentityImage{Name: t, Payload: i.Payload} } - importProgress.UpdateTaskProgress(discord.CommunityCreationTask, 0.75) + importProgress.UpdateTaskProgress(discord.CommunityCreationTask, 1) progressUpdates <- importProgress if m.DiscordImportMarkedAsCancelled(communityID) { @@ -2444,113 +2421,162 @@ func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscor return } - //This is a map of discord category IDs <-> Status category IDs - processedCategoriesIds := make(map[string]string, 0) - totalCategoriesCount := len(exportData.Categories) - - for _, category := range exportData.Categories { - - createCommunityCategoryRequest := &requests.CreateCommunityCategory{ - CommunityID: discordCommunity.ID(), - CategoryName: category.Name, - ChatIDs: make([]string, 0), - } - // We call `CreateCategory` on `communitiesManager` directly so we can control - // whether or not the community update should be published (it should not until the - // import has finished) - communityWithCategories, changes, err := m.communitiesManager.CreateCategory(createCommunityCategoryRequest, false) - if err != nil { - m.cleanUpImport(communityID) - importProgress.AddTaskError(discord.CommunityCreationTask, discord.Error(err.Error())) - importProgress.StopTask(discord.CommunityCreationTask) - progressUpdates <- importProgress - return - } - discordCommunity = communityWithCategories - // This looks like we keep overriding the same field but there's - // only one `CategoriesAdded` change at this point. - for _, addedCategory := range changes.CategoriesAdded { - processedCategoriesIds[category.ID] = addedCategory.CategoryId - } - - // We're multiplying `progressValue` by 0.25 as it's added to the previous 0.75 progress - progressValue := (float32(len(processedCategoriesIds)) / float32(totalCategoriesCount)) * 0.25 - importProgress.UpdateTaskProgress(discord.CommunityCreationTask, 0.75+progressValue) - - progressUpdates <- importProgress - } - - if m.DiscordImportMarkedAsCancelled(communityID) { - importProgress.StopTask(discord.CommunityCreationTask) - progressUpdates <- importProgress - cancel <- communityID - return - } - var chatsToSave []*Chat processedChannelIds := make(map[string]string, 0) - messagesToSave := make(map[string]*common.Message, 0) - pinMessagesToSave := make([]*common.PinMessage, 0) - authorProfilesToSave := make(map[string]*protobuf.DiscordMessageAuthor, 0) - messageAttachmentsToDownload := make([]*protobuf.DiscordMessageAttachment, 0) + processedCategoriesIds := make(map[string]string, 0) - for _, channel := range exportData.ExportedData { + for i, importFile := range request.FilesToImport { - communityChat := &protobuf.CommunityChat{ - Permissions: &protobuf.CommunityPermissions{ - Access: protobuf.CommunityPermissions_NO_MEMBERSHIP, - }, - Identity: &protobuf.ChatIdentity{ - DisplayName: channel.Channel.Name, - Emoji: "", - Description: channel.Channel.Description, - Color: discordCommunity.Color(), - }, - CategoryId: processedCategoriesIds[channel.Channel.CategoryID], - } - - // We call `CreateChat` on `communitiesManager` directly to get more control - // over whether we want to publish the updated community description. - communityWithChats, changes, err := m.communitiesManager.CreateChat(discordCommunity.ID(), communityChat, false) - if err != nil { - m.cleanUpImport(communityID) - errmsg := err.Error() - if _errors.Is(err, communities.ErrInvalidCommunityDescriptionDuplicatedName) { - errmsg = fmt.Sprintf("Couldn't create channel '%s': %s", communityChat.Identity.DisplayName, err.Error()) + exportData, errs := m.ExtractDiscordDataFromImportFiles([]string{importFile}) + if len(errs) > 0 { + for _, err := range errs { + importProgress.AddTaskError(discord.CommunityCreationTask, err) } - importProgress.AddTaskError(discord.ChannelsCreationTask, discord.Error(errmsg)) - importProgress.StopTask(discord.ChannelsCreationTask) progressUpdates <- importProgress return } - discordCommunity = communityWithChats + totalChannelsCount := len(exportData.ExportedData) + totalMessageCount := exportData.MessageCount - // This looks like we keep overriding the chat id value - // as we iterate over `ChatsAdded`, however at this point we - // know there was only a single such change (and it's a map) - for chatID, chat := range changes.ChatsAdded { - c := CreateCommunityChat(communityID, chatID, chat, m.getTimesource()) - chatsToSave = append(chatsToSave, c) - processedChannelIds[channel.Channel.ID] = c.ID + if totalChannelsCount == 0 || totalMessageCount == 0 { + importError := discord.Error(fmt.Errorf("No channel to import messages from in file: %s", importFile).Error()) + if totalMessageCount == 0 { + importError.Message = fmt.Errorf("No messages to import in file: %s", importFile).Error() + } + importProgress.AddTaskError(discord.ChannelsCreationTask, importError) + progressUpdates <- importProgress + continue } - progressValue := float32(len(processedChannelIds)) / float32(totalChannelsCount) + importProgress.CurrentChunk = i + 1 + + // We actually only ever receive a single category + // from `exportData` but since it's a map, we still have to + // iterate over it to access its values + for _, category := range exportData.Categories { + + categories := discordCommunity.Categories() + exists := false + for catID := range categories { + if strings.HasSuffix(catID, category.ID) { + exists = true + break + } + } + + if !exists { + createCommunityCategoryRequest := &requests.CreateCommunityCategory{ + CommunityID: discordCommunity.ID(), + CategoryName: category.Name, + ThirdPartyID: category.ID, + ChatIDs: make([]string, 0), + } + // We call `CreateCategory` on `communitiesManager` directly so we can control + // whether or not the community update should be published (it should not until the + // import has finished) + communityWithCategories, changes, err := m.communitiesManager.CreateCategory(createCommunityCategoryRequest, false) + if err != nil { + m.cleanUpImport(communityID) + importProgress.AddTaskError(discord.CommunityCreationTask, discord.Error(err.Error())) + importProgress.StopTask(discord.CommunityCreationTask) + progressUpdates <- importProgress + return + } + discordCommunity = communityWithCategories + // This looks like we keep overriding the same field but there's + // only one `CategoriesAdded` change at this point. + for _, addedCategory := range changes.CategoriesAdded { + processedCategoriesIds[category.ID] = addedCategory.CategoryId + } + } + } + + progressValue := calculateProgress(i+1, totalImportChunkCount, (float32(1) / 2)) + importProgress.UpdateTaskProgress(discord.ChannelsCreationTask, progressValue) + + progressUpdates <- importProgress + + if m.DiscordImportMarkedAsCancelled(communityID) { + importProgress.StopTask(discord.CommunityCreationTask) + progressUpdates <- importProgress + cancel <- communityID + return + } + + messagesToSave := make(map[string]*common.Message, 0) + pinMessagesToSave := make([]*common.PinMessage, 0) + authorProfilesToSave := make(map[string]*protobuf.DiscordMessageAuthor, 0) + messageAttachmentsToDownload := make([]*protobuf.DiscordMessageAttachment, 0) + + // Save to access the first item here as we process + // exported data by files which only holds a single channel + channel := exportData.ExportedData[0] + chatIDs := discordCommunity.ChatIDs() + + exists := false + for _, chatID := range chatIDs { + if strings.HasSuffix(chatID, channel.Channel.ID) { + exists = true + break + } + } + + if !exists { + communityChat := &protobuf.CommunityChat{ + Permissions: &protobuf.CommunityPermissions{ + Access: protobuf.CommunityPermissions_NO_MEMBERSHIP, + }, + Identity: &protobuf.ChatIdentity{ + DisplayName: channel.Channel.Name, + Emoji: "", + Description: channel.Channel.Description, + Color: discordCommunity.Color(), + }, + CategoryId: processedCategoriesIds[channel.Channel.CategoryID], + } + + // We call `CreateChat` on `communitiesManager` directly to get more control + // over whether we want to publish the updated community description. + communityWithChats, changes, err := m.communitiesManager.CreateChat(discordCommunity.ID(), communityChat, false, channel.Channel.ID) + if err != nil { + m.cleanUpImport(communityID) + errmsg := err.Error() + if _errors.Is(err, communities.ErrInvalidCommunityDescriptionDuplicatedName) { + errmsg = fmt.Sprintf("Couldn't create channel '%s': %s", communityChat.Identity.DisplayName, err.Error()) + } + importProgress.AddTaskError(discord.ChannelsCreationTask, discord.Error(errmsg)) + importProgress.StopTask(discord.ChannelsCreationTask) + progressUpdates <- importProgress + return + } + discordCommunity = communityWithChats + + // This looks like we keep overriding the chat id value + // as we iterate over `ChatsAdded`, however at this point we + // know there was only a single such change (and it's a map) + for chatID, chat := range changes.ChatsAdded { + c := CreateCommunityChat(communityID, chatID, chat, m.getTimesource()) + chatsToSave = append(chatsToSave, c) + processedChannelIds[channel.Channel.ID] = c.ID + } + } + + progressValue = calculateProgress(i+1, totalImportChunkCount, 1) importProgress.UpdateTaskProgress(discord.ChannelsCreationTask, progressValue) progressUpdates <- importProgress - for _, discordMessage := range channel.Messages { - - progressValue := float32(len(messagesToSave)) / float32(totalMessageCount) + for ii, discordMessage := range channel.Messages { timestamp, err := time.Parse(discordTimestampLayout, discordMessage.Timestamp) if err != nil { m.logger.Error("failed to parse discord message timestamp", zap.Error(err)) importProgress.AddTaskError(discord.ImportMessagesTask, discord.Warning(err.Error())) - importProgress.UpdateTaskProgress(discord.ImportMessagesTask, progressValue) + progressUpdates <- importProgress continue } if timestamp.Unix() < request.From { + progressUpdates <- importProgress continue } @@ -2558,7 +2584,7 @@ func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscor if err != nil { m.logger.Error("failed to check if message author exists in database", zap.Error(err)) importProgress.AddTaskError(discord.ImportMessagesTask, discord.Error(err.Error())) - importProgress.UpdateTaskProgress(discord.ImportMessagesTask, progressValue) + progressUpdates <- importProgress continue } @@ -2566,7 +2592,7 @@ func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscor err := m.persistence.SaveDiscordMessageAuthor(discordMessage.Author) if err != nil { importProgress.AddTaskError(discord.ImportMessagesTask, discord.Error(err.Error())) - importProgress.UpdateTaskProgress(discord.ImportMessagesTask, progressValue) + progressUpdates <- importProgress continue } } @@ -2575,7 +2601,7 @@ func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscor if err != nil { m.logger.Error("failed to check if message avatar payload exists in database", zap.Error(err)) importProgress.AddTaskError(discord.ImportMessagesTask, discord.Error(err.Error())) - importProgress.UpdateTaskProgress(discord.ImportMessagesTask, progressValue) + progressUpdates <- importProgress continue } @@ -2591,7 +2617,6 @@ func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscor if err != nil { m.logger.Error("failed to parse discord message timestamp", zap.Error(err)) importProgress.AddTaskError(discord.ImportMessagesTask, discord.Warning(err.Error())) - importProgress.UpdateTaskProgress(discord.ImportMessagesTask, progressValue) progressUpdates <- importProgress continue } @@ -2641,7 +2666,7 @@ func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscor if err != nil { m.logger.Error("failed to prepare message content", zap.Error(err)) importProgress.AddTaskError(discord.ImportMessagesTask, discord.Error(err.Error())) - importProgress.UpdateTaskProgress(discord.ImportMessagesTask, progressValue) + progressUpdates <- importProgress continue } @@ -2662,7 +2687,6 @@ func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscor if err != nil { m.logger.Error("failed to parse marshal pin message", zap.Error(err)) importProgress.AddTaskError(discord.ImportMessagesTask, discord.Warning(err.Error())) - importProgress.UpdateTaskProgress(discord.ImportMessagesTask, progressValue) progressUpdates <- importProgress continue } @@ -2671,7 +2695,6 @@ func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscor if err != nil { m.logger.Error("failed to wrap pin message", zap.Error(err)) importProgress.AddTaskError(discord.ImportMessagesTask, discord.Warning(err.Error())) - importProgress.UpdateTaskProgress(discord.ImportMessagesTask, progressValue) progressUpdates <- importProgress continue } @@ -2692,44 +2715,10 @@ func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscor } else { messagesToSave[communityID+discordMessage.Id] = messageToSave } - } - // We're multiplying `progressValue` by `0.5` so we leave 50% for actual save operations - // The 0.5 could be calculated but we'd need to know the total message chunks count, - // which we don't at this point - progressValue = float32(len(messagesToSave)) / float32(totalMessageCount) * 0.5 - importProgress.UpdateTaskProgress(discord.ImportMessagesTask, progressValue) - progressUpdates <- importProgress - if m.DiscordImportMarkedAsCancelled(communityID) { - importProgress.StopTask(discord.ImportMessagesTask) + progressValue := calculateProgress(i+1, totalImportChunkCount, float32(ii+1)/float32(len(channel.Messages))*0.5) + importProgress.UpdateTaskProgress(discord.ImportMessagesTask, progressValue) progressUpdates <- importProgress - cancel <- communityID - return - } - } - - var discordMessages []*protobuf.DiscordMessage - for _, msg := range messagesToSave { - discordMessages = append(discordMessages, msg.GetDiscordMessage()) - } - - // We save these messages in chunks so we don't block the database - // for a longer period of time - discordMessageChunks := chunkSlice(discordMessages, maxChunkSizeMessages) - chunksCount := len(discordMessageChunks) - - // Signal to clients that save operations are starting - importProgress.UpdateTaskState(discord.ImportMessagesTask, discord.TaskStateSaving) - progressUpdates <- importProgress - - for i, msgs := range discordMessageChunks { - err = m.persistence.SaveDiscordMessages(msgs) - if err != nil { - m.cleanUpImport(communityID) - importProgress.AddTaskError(discord.ImportMessagesTask, discord.Error(err.Error())) - importProgress.StopTask(discord.ImportMessagesTask) - progressUpdates <- importProgress - return } if m.DiscordImportMarkedAsCancelled(communityID) { @@ -2739,198 +2728,155 @@ func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscor return } - // We're multiplying `chunksCount` by `0.25` so we leave 25% for additional save operations - // 0.5 are the previous 50% of progress - currentCount := i + 1 - progressValue := 0.5 + (float32(currentCount) / float32(chunksCount) * 0.25) - importProgress.UpdateTaskProgress(discord.ImportMessagesTask, progressValue) - - // We slow down the saving of message chunks to keep the database responsive - if currentCount < chunksCount { - time.Sleep(2 * time.Second) - } - } - - importProgress.UpdateTaskProgress(discord.ImportMessagesTask, 0.75) - - var messages []*common.Message - for _, msg := range messagesToSave { - messages = append(messages, msg) - } - - // Same as above, we save these messages in chunks so we don't block - // the database for a longer period of time - messageChunks := chunkSlice(messages, maxChunkSizeMessages) - chunksCount = len(messageChunks) - - for i, msgs := range messageChunks { - err = m.persistence.SaveMessages(msgs) - if err != nil { - m.cleanUpImport(communityID) - importProgress.AddTaskError(discord.ImportMessagesTask, discord.Error(err.Error())) - importProgress.StopTask(discord.ImportMessagesTask) - progressUpdates <- importProgress - return + var discordMessages []*protobuf.DiscordMessage + for _, msg := range messagesToSave { + discordMessages = append(discordMessages, msg.GetDiscordMessage()) } - if m.DiscordImportMarkedAsCancelled(communityID) { - importProgress.StopTask(discord.ImportMessagesTask) - progressUpdates <- importProgress - cancel <- communityID - return - } + // We save these messages in chunks so we don't block the database + // for a longer period of time + discordMessageChunks := chunkSlice(discordMessages, maxChunkSizeMessages) + chunksCount := len(discordMessageChunks) - // 0.75 are the previous 75% of progress, hence we multiply our chunk progress - // by 0.25 - currentCount := i + 1 - progressValue := 0.75 + ((float32(currentCount) / float32(chunksCount)) * 0.25) - importProgress.UpdateTaskProgress(discord.ImportMessagesTask, progressValue) - - // We slow down the saving of message chunks to keep the database responsive - if currentCount < chunksCount { - time.Sleep(2 * time.Second) - } - } - - pinMessageChunks := chunkSlice(pinMessagesToSave, maxChunkSizeMessages) - for _, pinMsgs := range pinMessageChunks { - err = m.persistence.SavePinMessages(pinMsgs) - if err != nil { - m.cleanUpImport(communityID) - importProgress.AddTaskError(discord.ImportMessagesTask, discord.Error(err.Error())) - importProgress.StopTask(discord.ImportMessagesTask) - progressUpdates <- importProgress - return - } - - if m.DiscordImportMarkedAsCancelled(communityID) { - importProgress.StopTask(discord.ImportMessagesTask) - progressUpdates <- importProgress - cancel <- communityID - return - } - } - - importProgress.UpdateTaskProgress(discord.ImportMessagesTask, 1) - progressUpdates <- importProgress - - totalAssetsCount := len(messageAttachmentsToDownload) + len(authorProfilesToSave) - var assetCounter discord.AssetCounter - - var wg sync.WaitGroup - - for id, author := range authorProfilesToSave { - wg.Add(1) - go func(id string, author *protobuf.DiscordMessageAuthor) { - defer wg.Done() - imagePayload, err := discord.DownloadAvatarAsset(author.AvatarUrl) + for ii, msgs := range discordMessageChunks { + m.communitiesManager.LogStdout(fmt.Sprintf("saving %d/%d chunk with %d discord messages", ii+1, chunksCount, len(msgs))) + err = m.persistence.SaveDiscordMessages(msgs) if err != nil { - errmsg := fmt.Sprintf("Couldn't download profile avatar '%s': %s", author.AvatarUrl, err.Error()) - importProgress.AddTaskError( - discord.DownloadAssetsTask, - discord.Warning(errmsg), - ) + m.cleanUpImport(communityID) + importProgress.AddTaskError(discord.ImportMessagesTask, discord.Error(err.Error())) + importProgress.StopTask(discord.ImportMessagesTask) progressUpdates <- importProgress return } - err = m.persistence.UpdateDiscordMessageAuthorImage(author.Id, imagePayload) - if err != nil { - importProgress.AddTaskError(discord.DownloadAssetsTask, discord.Warning(err.Error())) + if m.DiscordImportMarkedAsCancelled(communityID) { + importProgress.StopTask(discord.ImportMessagesTask) progressUpdates <- importProgress + cancel <- communityID return } - author.AvatarImagePayload = imagePayload - authorProfilesToSave[id] = author - - assetCounter.Increase() - progressValue := float32(assetCounter.Value()) / float32(totalAssetsCount) - importProgress.UpdateTaskProgress(discord.DownloadAssetsTask, progressValue) + // We're multiplying `chunksCount` by `0.25` so we leave 25% for additional save operations + // 0.5 are the previous 50% of progress + currentCount := ii + 1 + progressValue := calculateProgress(i+1, totalImportChunkCount, 0.5+(float32(currentCount)/float32(chunksCount))*0.25) + importProgress.UpdateTaskProgress(discord.ImportMessagesTask, progressValue) progressUpdates <- importProgress - if m.DiscordImportMarkedAsCancelled(discordCommunity.IDString()) { - importProgress.StopTask(discord.DownloadAssetsTask) + // We slow down the saving of message chunks to keep the database responsive + if currentCount < chunksCount { + time.Sleep(2 * time.Second) + } + } + + var messages []*common.Message + for _, msg := range messagesToSave { + messages = append(messages, msg) + } + + // Same as above, we save these messages in chunks so we don't block + // the database for a longer period of time + messageChunks := chunkSlice(messages, maxChunkSizeMessages) + chunksCount = len(messageChunks) + + for ii, msgs := range messageChunks { + m.communitiesManager.LogStdout(fmt.Sprintf("saving %d/%d chunk with %d app messages", ii+1, chunksCount, len(msgs))) + err = m.persistence.SaveMessages(msgs) + if err != nil { + m.cleanUpImport(communityID) + importProgress.AddTaskError(discord.ImportMessagesTask, discord.Error(err.Error())) + importProgress.StopTask(discord.ImportMessagesTask) progressUpdates <- importProgress - cancel <- discordCommunity.IDString() return } - }(id, author) - } - wg.Wait() - if m.DiscordImportMarkedAsCancelled(communityID) { - importProgress.StopTask(discord.DownloadAssetsTask) - progressUpdates <- importProgress - cancel <- communityID - return - } + if m.DiscordImportMarkedAsCancelled(communityID) { + importProgress.StopTask(discord.ImportMessagesTask) + progressUpdates <- importProgress + cancel <- communityID + return + } - for idxRange := range gopart.Partition(len(messageAttachmentsToDownload), 100) { - attachments := messageAttachmentsToDownload[idxRange.Low:idxRange.High] - wg.Add(1) - go func(attachments []*protobuf.DiscordMessageAttachment) { - defer wg.Done() - for i, attachment := range attachments { - assetPayload, contentType, err := discord.DownloadAsset(attachment.Url) + // 0.75 are the previous 75% of progress, hence we multiply our chunk progress + // by 0.25 + currentCount := ii + 1 + progressValue := calculateProgress(i+1, totalImportChunkCount, 0.75+(float32(currentCount)/float32(chunksCount))*0.25) + // progressValue := 0.75 + ((float32(currentCount) / float32(chunksCount)) * 0.25) + importProgress.UpdateTaskProgress(discord.ImportMessagesTask, progressValue) + progressUpdates <- importProgress + + // We slow down the saving of message chunks to keep the database responsive + if currentCount < chunksCount { + time.Sleep(2 * time.Second) + } + } + + pinMessageChunks := chunkSlice(pinMessagesToSave, maxChunkSizeMessages) + for _, pinMsgs := range pinMessageChunks { + err = m.persistence.SavePinMessages(pinMsgs) + if err != nil { + m.cleanUpImport(communityID) + importProgress.AddTaskError(discord.ImportMessagesTask, discord.Error(err.Error())) + importProgress.StopTask(discord.ImportMessagesTask) + progressUpdates <- importProgress + return + } + + if m.DiscordImportMarkedAsCancelled(communityID) { + importProgress.StopTask(discord.ImportMessagesTask) + progressUpdates <- importProgress + cancel <- communityID + return + } + } + + totalAssetsCount := len(messageAttachmentsToDownload) + len(authorProfilesToSave) + var assetCounter discord.AssetCounter + + var wg sync.WaitGroup + + for id, author := range authorProfilesToSave { + wg.Add(1) + go func(id string, author *protobuf.DiscordMessageAuthor) { + defer wg.Done() + + m.communitiesManager.LogStdout(fmt.Sprintf("downloading asset %d/%d", assetCounter.Value()+1, totalAssetsCount)) + imagePayload, err := discord.DownloadAvatarAsset(author.AvatarUrl) if err != nil { - errmsg := fmt.Sprintf("Couldn't download message attachment '%s': %s", attachment.Url, err.Error()) + errmsg := fmt.Sprintf("Couldn't download profile avatar '%s': %s", author.AvatarUrl, err.Error()) importProgress.AddTaskError( discord.DownloadAssetsTask, discord.Warning(errmsg), ) progressUpdates <- importProgress - continue + return } - attachment.Payload = assetPayload - attachment.ContentType = contentType - messageAttachmentsToDownload[i] = attachment + err = m.persistence.UpdateDiscordMessageAuthorImage(author.Id, imagePayload) + if err != nil { + importProgress.AddTaskError(discord.DownloadAssetsTask, discord.Warning(err.Error())) + progressUpdates <- importProgress + return + } + + author.AvatarImagePayload = imagePayload + authorProfilesToSave[id] = author + + if m.DiscordImportMarkedAsCancelled(discordCommunity.IDString()) { + importProgress.StopTask(discord.DownloadAssetsTask) + progressUpdates <- importProgress + cancel <- discordCommunity.IDString() + return + } assetCounter.Increase() - - // Multiplying progress by `0.5` to leave 50% for saving assets to DB - // similar to how it's done for messages - progressValue := (float32(assetCounter.Value()) / float32(totalAssetsCount)) * 0.5 + progressValue := calculateProgress(i+1, totalImportChunkCount, (float32(assetCounter.Value())/float32(totalAssetsCount))*0.25) importProgress.UpdateTaskProgress(discord.DownloadAssetsTask, progressValue) progressUpdates <- importProgress - if m.DiscordImportMarkedAsCancelled(communityID) { - importProgress.StopTask(discord.DownloadAssetsTask) - progressUpdates <- importProgress - cancel <- communityID - return - } - } - }(attachments) - } - wg.Wait() - - if m.DiscordImportMarkedAsCancelled(communityID) { - importProgress.StopTask(discord.DownloadAssetsTask) - progressUpdates <- importProgress - cancel <- communityID - return - } - - // Signal to the client that save operations are starting - importProgress.UpdateTaskState(discord.DownloadAssetsTask, discord.TaskStateSaving) - progressUpdates <- importProgress - - // We chunk message attachments by `maxChunkSizeBytes` to ensure individual - // save operations don't take too long and block the database - attachmentChunks := chunkAttachmentsByByteSize(messageAttachmentsToDownload, maxChunkSizeBytes) - chunksCount = len(attachmentChunks) - - for i, attachments := range attachmentChunks { - err = m.persistence.SaveDiscordMessageAttachments(attachments) - if err != nil { - m.cleanUpImport(communityID) - importProgress.AddTaskError(discord.DownloadAssetsTask, discord.Error(err.Error())) - importProgress.Stop() - progressUpdates <- importProgress - return + }(id, author) } + wg.Wait() if m.DiscordImportMarkedAsCancelled(communityID) { importProgress.StopTask(discord.DownloadAssetsTask) @@ -2939,21 +2885,140 @@ func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscor return } - // 0.5 are the previous 50% of progress, hence we multiply our chunk progress - // by 0.5 - currentCount := i + 1 - progressValue := 0.5 + ((float32(currentCount) / float32(chunksCount)) * 0.5) - importProgress.UpdateTaskProgress(discord.DownloadAssetsTask, progressValue) + for idxRange := range gopart.Partition(len(messageAttachmentsToDownload), 100) { + attachments := messageAttachmentsToDownload[idxRange.Low:idxRange.High] + wg.Add(1) + go func(attachments []*protobuf.DiscordMessageAttachment) { + defer wg.Done() + for ii, attachment := range attachments { - // We slow down the saving of attachment chunks to keep the database responsive - if currentCount < chunksCount { - time.Sleep(2 * time.Second) + m.communitiesManager.LogStdout(fmt.Sprintf("downloading asset %d/%d", assetCounter.Value()+1, totalAssetsCount)) + + assetPayload, contentType, err := discord.DownloadAsset(attachment.Url) + if err != nil { + errmsg := fmt.Sprintf("Couldn't download message attachment '%s': %s", attachment.Url, err.Error()) + importProgress.AddTaskError( + discord.DownloadAssetsTask, + discord.Warning(errmsg), + ) + progressUpdates <- importProgress + continue + } + + attachment.Payload = assetPayload + attachment.ContentType = contentType + messageAttachmentsToDownload[ii] = attachment + + if m.DiscordImportMarkedAsCancelled(communityID) { + importProgress.StopTask(discord.DownloadAssetsTask) + progressUpdates <- importProgress + cancel <- communityID + return + } + + assetCounter.Increase() + progressValue := calculateProgress(i+1, totalImportChunkCount, 0.25+(float32(assetCounter.Value())/float32(totalAssetsCount))*0.25) + importProgress.UpdateTaskProgress(discord.DownloadAssetsTask, progressValue) + progressUpdates <- importProgress + } + }(attachments) + } + wg.Wait() + + if m.DiscordImportMarkedAsCancelled(communityID) { + importProgress.StopTask(discord.DownloadAssetsTask) + progressUpdates <- importProgress + cancel <- communityID + return + } + + attachmentChunks := chunkAttachmentsByByteSize(messageAttachmentsToDownload, maxChunkSizeBytes) + chunksCount = len(attachmentChunks) + + for ii, attachments := range attachmentChunks { + m.communitiesManager.LogStdout(fmt.Sprintf("saving %d/%d chunk with %d discord message attachments", ii+1, chunksCount, len(attachments))) + err = m.persistence.SaveDiscordMessageAttachments(attachments) + if err != nil { + m.cleanUpImport(communityID) + importProgress.AddTaskError(discord.DownloadAssetsTask, discord.Error(err.Error())) + importProgress.Stop() + progressUpdates <- importProgress + return + } + + if m.DiscordImportMarkedAsCancelled(communityID) { + importProgress.StopTask(discord.DownloadAssetsTask) + progressUpdates <- importProgress + cancel <- communityID + return + } + + // 0.5 are the previous 50% of progress, hence we multiply our chunk progress + // by 0.5 + currentCount := ii + 1 + progressValue := calculateProgress(i+1, totalImportChunkCount, 0.5+(float32(currentCount)/float32(chunksCount))*0.5) + importProgress.UpdateTaskProgress(discord.DownloadAssetsTask, progressValue) + progressUpdates <- importProgress + + // We slow down the saving of attachment chunks to keep the database responsive + if currentCount < chunksCount { + time.Sleep(2 * time.Second) + } + } + + _, err := m.transport.JoinPublic(processedChannelIds[channel.Channel.ID]) + if err != nil { + m.logger.Error("failed to load filter for chat", zap.Error(err)) + continue + } + + wakuChatMessages, err := m.chatMessagesToWakuMessages(messages, discordCommunity) + if err != nil { + m.logger.Error("failed to convert chat messages into waku messages", zap.Error(err)) + continue + } + + wakuPinMessages, err := m.pinMessagesToWakuMessages(pinMessagesToSave, discordCommunity) + if err != nil { + m.logger.Error("failed to convert pin messages into waku messages", zap.Error(err)) + continue + } + + wakuMessages := append(wakuChatMessages, wakuPinMessages...) + + topics, err := m.communitiesManager.GetCommunityChatsTopics(discordCommunity.ID()) + if err != nil { + m.logger.Error("failed to get community chat topics", zap.Error(err)) + continue + } + + startDate := time.Unix(int64(exportData.OldestMessageTimestamp), 0) + endDate := time.Now() + + _, err = m.communitiesManager.CreateHistoryArchiveTorrentFromMessages( + discordCommunity.ID(), + wakuMessages, + topics, + startDate, + endDate, + messageArchiveInterval, + discordCommunity.Encrypted(), + ) + if err != nil { + m.logger.Error("failed to create history archive torrent", zap.Error(err)) + continue + } + + if m.torrentClientReady() && communitySettings.HistoryArchiveSupportEnabled { + + err = m.communitiesManager.SeedHistoryArchiveTorrent(discordCommunity.ID()) + if err != nil { + m.logger.Error("failed to seed history archive", zap.Error(err)) + } + go m.communitiesManager.StartHistoryArchiveTasksInterval(discordCommunity, messageArchiveInterval) } } - importProgress.UpdateTaskProgress(discord.DownloadAssetsTask, 1) - progressUpdates <- importProgress - err = m.publishOrg(discordCommunity) if err != nil { m.cleanUpImport(communityID) @@ -2980,6 +3045,7 @@ func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscor progressUpdates <- importProgress return } + importProgress.UpdateTaskProgress(discord.InitCommunityTask, 0.15) progressUpdates <- importProgress @@ -3009,12 +3075,7 @@ func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscor return } - filterChatIds := discordCommunity.DefaultFilters() - for _, chatID := range processedChannelIds { - filterChatIds = append(filterChatIds, chatID) - } - - filters, err := m.transport.InitPublicFilters(filterChatIds) + _, err = m.transport.InitPublicFilters(discordCommunity.DefaultFilters()) if err != nil { m.cleanUpImport(communityID) importProgress.AddTaskError(discord.InitCommunityTask, discord.Error(err.Error())) @@ -3033,6 +3094,7 @@ func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscor return } + filters := m.transport.Filters() _, err = m.scheduleSyncFilters(filters) if err != nil { m.cleanUpImport(communityID) @@ -3071,55 +3133,17 @@ func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscor m.config.messengerSignalsHandler.DiscordCommunityImportFinished(communityID) close(done) - - wakuChatMessages, err := m.chatMessagesToWakuMessages(messages, discordCommunity) - if err != nil { - m.logger.Error("failed to convert chat messages into waku messages", zap.Error(err)) - return - } - - wakuPinMessages, err := m.pinMessagesToWakuMessages(pinMessagesToSave, discordCommunity) - if err != nil { - m.logger.Error("failed to convert pin messages into waku messages", zap.Error(err)) - return - } - - wakuMessages := append(wakuChatMessages, wakuPinMessages...) - - topics, err := m.communitiesManager.GetCommunityChatsTopics(discordCommunity.ID()) - if err != nil { - m.logger.Error("failed to get community chat topics", zap.Error(err)) - return - } - - startDate := time.Unix(int64(exportData.OldestMessageTimestamp), 0) - endDate := time.Now() - - _, err = m.communitiesManager.CreateHistoryArchiveTorrentFromMessages( - discordCommunity.ID(), - wakuMessages, - topics, - startDate, - endDate, - messageArchiveInterval, - discordCommunity.Encrypted(), - ) - if err != nil { - m.logger.Error("failed to create history archive torrent", zap.Error(err)) - return - } - - if m.torrentClientReady() && communitySettings.HistoryArchiveSupportEnabled { - - err = m.communitiesManager.SeedHistoryArchiveTorrent(discordCommunity.ID()) - if err != nil { - m.logger.Error("failed to seed history archive", zap.Error(err)) - } - go m.communitiesManager.StartHistoryArchiveTasksInterval(discordCommunity, messageArchiveInterval) - } }() } +func calculateProgress(i int, t int, currentProgress float32) float32 { + current := float32(1) / float32(t) * currentProgress + if i > 1 { + return float32(i-1)/float32(t) + current + } + return current +} + func (m *Messenger) MarkDiscordCommunityImportAsCancelled(communityID string) { m.importingCommunities[communityID] = true } diff --git a/protocol/requests/create_community_category.go b/protocol/requests/create_community_category.go index 75a19a148..d5fd34e7a 100644 --- a/protocol/requests/create_community_category.go +++ b/protocol/requests/create_community_category.go @@ -13,6 +13,7 @@ type CreateCommunityCategory struct { CommunityID types.HexBytes `json:"communityId"` CategoryName string `json:"categoryName"` ChatIDs []string `json:"chatIds"` + ThirdPartyID string `json:"thirdPartyID,omitempty"` } func (j *CreateCommunityCategory) Validate() error {