diff --git a/protocol/communities/manager.go b/protocol/communities/manager.go index 43c510dde..df4a1dbd5 100644 --- a/protocol/communities/manager.go +++ b/protocol/communities/manager.go @@ -504,7 +504,7 @@ func (m *Manager) ImportCommunity(key *ecdsa.PrivateKey) (*Community, error) { return community, nil } -func (m *Manager) CreateChat(communityID types.HexBytes, chat *protobuf.CommunityChat, publish bool) (*Community, *CommunityChanges, error) { +func (m *Manager) CreateChat(communityID types.HexBytes, chat *protobuf.CommunityChat, publish bool, thirdPartyID string) (*Community, *CommunityChanges, error) { community, err := m.GetByID(communityID) if err != nil { return nil, nil, err @@ -513,6 +513,10 @@ func (m *Manager) CreateChat(communityID types.HexBytes, chat *protobuf.Communit return nil, nil, ErrOrgNotFound } chatID := uuid.New().String() + if thirdPartyID != "" { + chatID = chatID + thirdPartyID + } + changes, err := community.CreateChat(chatID, chat) if err != nil { return nil, nil, err @@ -598,7 +602,11 @@ func (m *Manager) CreateCategory(request *requests.CreateCommunityCategory, publ if community == nil { return nil, nil, ErrOrgNotFound } + categoryID := uuid.New().String() + if request.ThirdPartyID != "" { + categoryID = categoryID + request.ThirdPartyID + } // Remove communityID prefix from chatID if exists for i, cid := range request.ChatIDs { diff --git a/protocol/communities/manager_test.go b/protocol/communities/manager_test.go index 82161dde9..862f8cab6 100644 --- a/protocol/communities/manager_test.go +++ b/protocol/communities/manager_test.go @@ -707,7 +707,7 @@ func (s *ManagerSuite) buildCommunityWithChat() (*Community, string, error) { }, Members: make(map[string]*protobuf.CommunityMember), } - _, changes, err := s.manager.CreateChat(community.ID(), chat, true) + _, changes, err := s.manager.CreateChat(community.ID(), chat, true, "") if err != nil { return nil, "", err } diff --git a/protocol/discord/types.go b/protocol/discord/types.go index 075cde3cb..48df0194a 100644 --- a/protocol/discord/types.go +++ b/protocol/discord/types.go @@ -160,10 +160,12 @@ type ImportProgress struct { ErrorsCount uint `json:"errorsCount"` WarningsCount uint `json:"warningsCount"` Stopped bool `json:"stopped"` + TotalChunkCount int `json:"totalChunksCount,omitempty"` + CurrentChunk int `json:"currentChunk,omitempty"` m sync.Mutex } -func (progress *ImportProgress) Init(tasks []ImportTask) { +func (progress *ImportProgress) Init(totalChunkCount int, tasks []ImportTask) { progress.Progress = 0 progress.Tasks = make([]*ImportTaskProgress, 0) for _, task := range tasks { @@ -180,6 +182,8 @@ func (progress *ImportProgress) Init(tasks []ImportTask) { progress.ErrorsCount = 0 progress.WarningsCount = 0 progress.Stopped = false + progress.TotalChunkCount = totalChunkCount + progress.CurrentChunk = 0 } func (progress *ImportProgress) Stop() { diff --git a/protocol/messenger_communities.go b/protocol/messenger_communities.go index a79c80fa6..97d6a47e8 100644 --- a/protocol/messenger_communities.go +++ b/protocol/messenger_communities.go @@ -904,7 +904,7 @@ func (m *Messenger) CreateCommunityChat(communityID types.HexBytes, c *protobuf. var response MessengerResponse c.Identity.FirstMessageTimestamp = FirstMessageTimestampNoMessage - community, changes, err := m.communitiesManager.CreateChat(communityID, c, true) + community, changes, err := m.communitiesManager.CreateChat(communityID, c, true, "") if err != nil { return nil, err } @@ -2339,13 +2339,15 @@ func (m *Messenger) RequestExtractDiscordChannelsAndCategories(filesToImport []s func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscordCommunity) { go func() { + totalImportChunkCount := len(request.FilesToImport) + progressUpdates := make(chan *discord.ImportProgress) done := make(chan struct{}) cancel := make(chan string) m.startPublishImportProgressInterval(progressUpdates, cancel, done) importProgress := &discord.ImportProgress{} - importProgress.Init([]discord.ImportTask{ + importProgress.Init(totalImportChunkCount, []discord.ImportTask{ discord.CommunityCreationTask, discord.ChannelsCreationTask, discord.ImportMessagesTask, @@ -2357,31 +2359,6 @@ func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscor // initial progress immediately m.publishImportProgress(importProgress) - exportData, errs := m.ExtractDiscordDataFromImportFiles(request.FilesToImport) - if len(errs) > 0 { - for _, err := range errs { - importProgress.AddTaskError(discord.CommunityCreationTask, err) - } - progressUpdates <- importProgress - return - } - totalChannelsCount := len(exportData.ExportedData) - totalMessageCount := exportData.MessageCount - - if totalChannelsCount == 0 || totalMessageCount == 0 { - importError := discord.Error(discord.ErrNoChannelData.Error()) - if totalMessageCount == 0 { - importError.Message = discord.ErrNoMessageData.Error() - } - importProgress.AddTaskError(discord.CommunityCreationTask, importError) - importProgress.StopTask(discord.CommunityCreationTask) - progressUpdates <- importProgress - return - } - - importProgress.UpdateTaskProgress(discord.CommunityCreationTask, 0.5) - progressUpdates <- importProgress - createCommunityRequest := request.ToCreateCommunityRequest() // We're calling `CreateCommunity` on `communitiesManager` directly, instead of @@ -2434,7 +2411,7 @@ func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscor importProgress.CommunityImages[t] = images.IdentityImage{Name: t, Payload: i.Payload} } - importProgress.UpdateTaskProgress(discord.CommunityCreationTask, 0.75) + importProgress.UpdateTaskProgress(discord.CommunityCreationTask, 1) progressUpdates <- importProgress if m.DiscordImportMarkedAsCancelled(communityID) { @@ -2444,113 +2421,162 @@ func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscor return } - //This is a map of discord category IDs <-> Status category IDs - processedCategoriesIds := make(map[string]string, 0) - totalCategoriesCount := len(exportData.Categories) - - for _, category := range exportData.Categories { - - createCommunityCategoryRequest := &requests.CreateCommunityCategory{ - CommunityID: discordCommunity.ID(), - CategoryName: category.Name, - ChatIDs: make([]string, 0), - } - // We call `CreateCategory` on `communitiesManager` directly so we can control - // whether or not the community update should be published (it should not until the - // import has finished) - communityWithCategories, changes, err := m.communitiesManager.CreateCategory(createCommunityCategoryRequest, false) - if err != nil { - m.cleanUpImport(communityID) - importProgress.AddTaskError(discord.CommunityCreationTask, discord.Error(err.Error())) - importProgress.StopTask(discord.CommunityCreationTask) - progressUpdates <- importProgress - return - } - discordCommunity = communityWithCategories - // This looks like we keep overriding the same field but there's - // only one `CategoriesAdded` change at this point. - for _, addedCategory := range changes.CategoriesAdded { - processedCategoriesIds[category.ID] = addedCategory.CategoryId - } - - // We're multiplying `progressValue` by 0.25 as it's added to the previous 0.75 progress - progressValue := (float32(len(processedCategoriesIds)) / float32(totalCategoriesCount)) * 0.25 - importProgress.UpdateTaskProgress(discord.CommunityCreationTask, 0.75+progressValue) - - progressUpdates <- importProgress - } - - if m.DiscordImportMarkedAsCancelled(communityID) { - importProgress.StopTask(discord.CommunityCreationTask) - progressUpdates <- importProgress - cancel <- communityID - return - } - var chatsToSave []*Chat processedChannelIds := make(map[string]string, 0) - messagesToSave := make(map[string]*common.Message, 0) - pinMessagesToSave := make([]*common.PinMessage, 0) - authorProfilesToSave := make(map[string]*protobuf.DiscordMessageAuthor, 0) - messageAttachmentsToDownload := make([]*protobuf.DiscordMessageAttachment, 0) + processedCategoriesIds := make(map[string]string, 0) - for _, channel := range exportData.ExportedData { + for i, importFile := range request.FilesToImport { - communityChat := &protobuf.CommunityChat{ - Permissions: &protobuf.CommunityPermissions{ - Access: protobuf.CommunityPermissions_NO_MEMBERSHIP, - }, - Identity: &protobuf.ChatIdentity{ - DisplayName: channel.Channel.Name, - Emoji: "", - Description: channel.Channel.Description, - Color: discordCommunity.Color(), - }, - CategoryId: processedCategoriesIds[channel.Channel.CategoryID], - } - - // We call `CreateChat` on `communitiesManager` directly to get more control - // over whether we want to publish the updated community description. - communityWithChats, changes, err := m.communitiesManager.CreateChat(discordCommunity.ID(), communityChat, false) - if err != nil { - m.cleanUpImport(communityID) - errmsg := err.Error() - if _errors.Is(err, communities.ErrInvalidCommunityDescriptionDuplicatedName) { - errmsg = fmt.Sprintf("Couldn't create channel '%s': %s", communityChat.Identity.DisplayName, err.Error()) + exportData, errs := m.ExtractDiscordDataFromImportFiles([]string{importFile}) + if len(errs) > 0 { + for _, err := range errs { + importProgress.AddTaskError(discord.CommunityCreationTask, err) } - importProgress.AddTaskError(discord.ChannelsCreationTask, discord.Error(errmsg)) - importProgress.StopTask(discord.ChannelsCreationTask) progressUpdates <- importProgress return } - discordCommunity = communityWithChats + totalChannelsCount := len(exportData.ExportedData) + totalMessageCount := exportData.MessageCount - // This looks like we keep overriding the chat id value - // as we iterate over `ChatsAdded`, however at this point we - // know there was only a single such change (and it's a map) - for chatID, chat := range changes.ChatsAdded { - c := CreateCommunityChat(communityID, chatID, chat, m.getTimesource()) - chatsToSave = append(chatsToSave, c) - processedChannelIds[channel.Channel.ID] = c.ID + if totalChannelsCount == 0 || totalMessageCount == 0 { + importError := discord.Error(fmt.Errorf("No channel to import messages from in file: %s", importFile).Error()) + if totalMessageCount == 0 { + importError.Message = fmt.Errorf("No messages to import in file: %s", importFile).Error() + } + importProgress.AddTaskError(discord.ChannelsCreationTask, importError) + progressUpdates <- importProgress + continue } - progressValue := float32(len(processedChannelIds)) / float32(totalChannelsCount) + importProgress.CurrentChunk = i + 1 + + // We actually only ever receive a single category + // from `exportData` but since it's a map, we still have to + // iterate over it to access its values + for _, category := range exportData.Categories { + + categories := discordCommunity.Categories() + exists := false + for catID := range categories { + if strings.HasSuffix(catID, category.ID) { + exists = true + break + } + } + + if !exists { + createCommunityCategoryRequest := &requests.CreateCommunityCategory{ + CommunityID: discordCommunity.ID(), + CategoryName: category.Name, + ThirdPartyID: category.ID, + ChatIDs: make([]string, 0), + } + // We call `CreateCategory` on `communitiesManager` directly so we can control + // whether or not the community update should be published (it should not until the + // import has finished) + communityWithCategories, changes, err := m.communitiesManager.CreateCategory(createCommunityCategoryRequest, false) + if err != nil { + m.cleanUpImport(communityID) + importProgress.AddTaskError(discord.CommunityCreationTask, discord.Error(err.Error())) + importProgress.StopTask(discord.CommunityCreationTask) + progressUpdates <- importProgress + return + } + discordCommunity = communityWithCategories + // This looks like we keep overriding the same field but there's + // only one `CategoriesAdded` change at this point. + for _, addedCategory := range changes.CategoriesAdded { + processedCategoriesIds[category.ID] = addedCategory.CategoryId + } + } + } + + progressValue := calculateProgress(i+1, totalImportChunkCount, (float32(1) / 2)) + importProgress.UpdateTaskProgress(discord.ChannelsCreationTask, progressValue) + + progressUpdates <- importProgress + + if m.DiscordImportMarkedAsCancelled(communityID) { + importProgress.StopTask(discord.CommunityCreationTask) + progressUpdates <- importProgress + cancel <- communityID + return + } + + messagesToSave := make(map[string]*common.Message, 0) + pinMessagesToSave := make([]*common.PinMessage, 0) + authorProfilesToSave := make(map[string]*protobuf.DiscordMessageAuthor, 0) + messageAttachmentsToDownload := make([]*protobuf.DiscordMessageAttachment, 0) + + // Save to access the first item here as we process + // exported data by files which only holds a single channel + channel := exportData.ExportedData[0] + chatIDs := discordCommunity.ChatIDs() + + exists := false + for _, chatID := range chatIDs { + if strings.HasSuffix(chatID, channel.Channel.ID) { + exists = true + break + } + } + + if !exists { + communityChat := &protobuf.CommunityChat{ + Permissions: &protobuf.CommunityPermissions{ + Access: protobuf.CommunityPermissions_NO_MEMBERSHIP, + }, + Identity: &protobuf.ChatIdentity{ + DisplayName: channel.Channel.Name, + Emoji: "", + Description: channel.Channel.Description, + Color: discordCommunity.Color(), + }, + CategoryId: processedCategoriesIds[channel.Channel.CategoryID], + } + + // We call `CreateChat` on `communitiesManager` directly to get more control + // over whether we want to publish the updated community description. + communityWithChats, changes, err := m.communitiesManager.CreateChat(discordCommunity.ID(), communityChat, false, channel.Channel.ID) + if err != nil { + m.cleanUpImport(communityID) + errmsg := err.Error() + if _errors.Is(err, communities.ErrInvalidCommunityDescriptionDuplicatedName) { + errmsg = fmt.Sprintf("Couldn't create channel '%s': %s", communityChat.Identity.DisplayName, err.Error()) + } + importProgress.AddTaskError(discord.ChannelsCreationTask, discord.Error(errmsg)) + importProgress.StopTask(discord.ChannelsCreationTask) + progressUpdates <- importProgress + return + } + discordCommunity = communityWithChats + + // This looks like we keep overriding the chat id value + // as we iterate over `ChatsAdded`, however at this point we + // know there was only a single such change (and it's a map) + for chatID, chat := range changes.ChatsAdded { + c := CreateCommunityChat(communityID, chatID, chat, m.getTimesource()) + chatsToSave = append(chatsToSave, c) + processedChannelIds[channel.Channel.ID] = c.ID + } + } + + progressValue = calculateProgress(i+1, totalImportChunkCount, 1) importProgress.UpdateTaskProgress(discord.ChannelsCreationTask, progressValue) progressUpdates <- importProgress - for _, discordMessage := range channel.Messages { - - progressValue := float32(len(messagesToSave)) / float32(totalMessageCount) + for ii, discordMessage := range channel.Messages { timestamp, err := time.Parse(discordTimestampLayout, discordMessage.Timestamp) if err != nil { m.logger.Error("failed to parse discord message timestamp", zap.Error(err)) importProgress.AddTaskError(discord.ImportMessagesTask, discord.Warning(err.Error())) - importProgress.UpdateTaskProgress(discord.ImportMessagesTask, progressValue) + progressUpdates <- importProgress continue } if timestamp.Unix() < request.From { + progressUpdates <- importProgress continue } @@ -2558,7 +2584,7 @@ func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscor if err != nil { m.logger.Error("failed to check if message author exists in database", zap.Error(err)) importProgress.AddTaskError(discord.ImportMessagesTask, discord.Error(err.Error())) - importProgress.UpdateTaskProgress(discord.ImportMessagesTask, progressValue) + progressUpdates <- importProgress continue } @@ -2566,7 +2592,7 @@ func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscor err := m.persistence.SaveDiscordMessageAuthor(discordMessage.Author) if err != nil { importProgress.AddTaskError(discord.ImportMessagesTask, discord.Error(err.Error())) - importProgress.UpdateTaskProgress(discord.ImportMessagesTask, progressValue) + progressUpdates <- importProgress continue } } @@ -2575,7 +2601,7 @@ func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscor if err != nil { m.logger.Error("failed to check if message avatar payload exists in database", zap.Error(err)) importProgress.AddTaskError(discord.ImportMessagesTask, discord.Error(err.Error())) - importProgress.UpdateTaskProgress(discord.ImportMessagesTask, progressValue) + progressUpdates <- importProgress continue } @@ -2591,7 +2617,6 @@ func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscor if err != nil { m.logger.Error("failed to parse discord message timestamp", zap.Error(err)) importProgress.AddTaskError(discord.ImportMessagesTask, discord.Warning(err.Error())) - importProgress.UpdateTaskProgress(discord.ImportMessagesTask, progressValue) progressUpdates <- importProgress continue } @@ -2641,7 +2666,7 @@ func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscor if err != nil { m.logger.Error("failed to prepare message content", zap.Error(err)) importProgress.AddTaskError(discord.ImportMessagesTask, discord.Error(err.Error())) - importProgress.UpdateTaskProgress(discord.ImportMessagesTask, progressValue) + progressUpdates <- importProgress continue } @@ -2662,7 +2687,6 @@ func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscor if err != nil { m.logger.Error("failed to parse marshal pin message", zap.Error(err)) importProgress.AddTaskError(discord.ImportMessagesTask, discord.Warning(err.Error())) - importProgress.UpdateTaskProgress(discord.ImportMessagesTask, progressValue) progressUpdates <- importProgress continue } @@ -2671,7 +2695,6 @@ func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscor if err != nil { m.logger.Error("failed to wrap pin message", zap.Error(err)) importProgress.AddTaskError(discord.ImportMessagesTask, discord.Warning(err.Error())) - importProgress.UpdateTaskProgress(discord.ImportMessagesTask, progressValue) progressUpdates <- importProgress continue } @@ -2692,44 +2715,10 @@ func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscor } else { messagesToSave[communityID+discordMessage.Id] = messageToSave } - } - // We're multiplying `progressValue` by `0.5` so we leave 50% for actual save operations - // The 0.5 could be calculated but we'd need to know the total message chunks count, - // which we don't at this point - progressValue = float32(len(messagesToSave)) / float32(totalMessageCount) * 0.5 - importProgress.UpdateTaskProgress(discord.ImportMessagesTask, progressValue) - progressUpdates <- importProgress - if m.DiscordImportMarkedAsCancelled(communityID) { - importProgress.StopTask(discord.ImportMessagesTask) + progressValue := calculateProgress(i+1, totalImportChunkCount, float32(ii+1)/float32(len(channel.Messages))*0.5) + importProgress.UpdateTaskProgress(discord.ImportMessagesTask, progressValue) progressUpdates <- importProgress - cancel <- communityID - return - } - } - - var discordMessages []*protobuf.DiscordMessage - for _, msg := range messagesToSave { - discordMessages = append(discordMessages, msg.GetDiscordMessage()) - } - - // We save these messages in chunks so we don't block the database - // for a longer period of time - discordMessageChunks := chunkSlice(discordMessages, maxChunkSizeMessages) - chunksCount := len(discordMessageChunks) - - // Signal to clients that save operations are starting - importProgress.UpdateTaskState(discord.ImportMessagesTask, discord.TaskStateSaving) - progressUpdates <- importProgress - - for i, msgs := range discordMessageChunks { - err = m.persistence.SaveDiscordMessages(msgs) - if err != nil { - m.cleanUpImport(communityID) - importProgress.AddTaskError(discord.ImportMessagesTask, discord.Error(err.Error())) - importProgress.StopTask(discord.ImportMessagesTask) - progressUpdates <- importProgress - return } if m.DiscordImportMarkedAsCancelled(communityID) { @@ -2739,198 +2728,155 @@ func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscor return } - // We're multiplying `chunksCount` by `0.25` so we leave 25% for additional save operations - // 0.5 are the previous 50% of progress - currentCount := i + 1 - progressValue := 0.5 + (float32(currentCount) / float32(chunksCount) * 0.25) - importProgress.UpdateTaskProgress(discord.ImportMessagesTask, progressValue) - - // We slow down the saving of message chunks to keep the database responsive - if currentCount < chunksCount { - time.Sleep(2 * time.Second) - } - } - - importProgress.UpdateTaskProgress(discord.ImportMessagesTask, 0.75) - - var messages []*common.Message - for _, msg := range messagesToSave { - messages = append(messages, msg) - } - - // Same as above, we save these messages in chunks so we don't block - // the database for a longer period of time - messageChunks := chunkSlice(messages, maxChunkSizeMessages) - chunksCount = len(messageChunks) - - for i, msgs := range messageChunks { - err = m.persistence.SaveMessages(msgs) - if err != nil { - m.cleanUpImport(communityID) - importProgress.AddTaskError(discord.ImportMessagesTask, discord.Error(err.Error())) - importProgress.StopTask(discord.ImportMessagesTask) - progressUpdates <- importProgress - return + var discordMessages []*protobuf.DiscordMessage + for _, msg := range messagesToSave { + discordMessages = append(discordMessages, msg.GetDiscordMessage()) } - if m.DiscordImportMarkedAsCancelled(communityID) { - importProgress.StopTask(discord.ImportMessagesTask) - progressUpdates <- importProgress - cancel <- communityID - return - } + // We save these messages in chunks so we don't block the database + // for a longer period of time + discordMessageChunks := chunkSlice(discordMessages, maxChunkSizeMessages) + chunksCount := len(discordMessageChunks) - // 0.75 are the previous 75% of progress, hence we multiply our chunk progress - // by 0.25 - currentCount := i + 1 - progressValue := 0.75 + ((float32(currentCount) / float32(chunksCount)) * 0.25) - importProgress.UpdateTaskProgress(discord.ImportMessagesTask, progressValue) - - // We slow down the saving of message chunks to keep the database responsive - if currentCount < chunksCount { - time.Sleep(2 * time.Second) - } - } - - pinMessageChunks := chunkSlice(pinMessagesToSave, maxChunkSizeMessages) - for _, pinMsgs := range pinMessageChunks { - err = m.persistence.SavePinMessages(pinMsgs) - if err != nil { - m.cleanUpImport(communityID) - importProgress.AddTaskError(discord.ImportMessagesTask, discord.Error(err.Error())) - importProgress.StopTask(discord.ImportMessagesTask) - progressUpdates <- importProgress - return - } - - if m.DiscordImportMarkedAsCancelled(communityID) { - importProgress.StopTask(discord.ImportMessagesTask) - progressUpdates <- importProgress - cancel <- communityID - return - } - } - - importProgress.UpdateTaskProgress(discord.ImportMessagesTask, 1) - progressUpdates <- importProgress - - totalAssetsCount := len(messageAttachmentsToDownload) + len(authorProfilesToSave) - var assetCounter discord.AssetCounter - - var wg sync.WaitGroup - - for id, author := range authorProfilesToSave { - wg.Add(1) - go func(id string, author *protobuf.DiscordMessageAuthor) { - defer wg.Done() - imagePayload, err := discord.DownloadAvatarAsset(author.AvatarUrl) + for ii, msgs := range discordMessageChunks { + m.communitiesManager.LogStdout(fmt.Sprintf("saving %d/%d chunk with %d discord messages", ii+1, chunksCount, len(msgs))) + err = m.persistence.SaveDiscordMessages(msgs) if err != nil { - errmsg := fmt.Sprintf("Couldn't download profile avatar '%s': %s", author.AvatarUrl, err.Error()) - importProgress.AddTaskError( - discord.DownloadAssetsTask, - discord.Warning(errmsg), - ) + m.cleanUpImport(communityID) + importProgress.AddTaskError(discord.ImportMessagesTask, discord.Error(err.Error())) + importProgress.StopTask(discord.ImportMessagesTask) progressUpdates <- importProgress return } - err = m.persistence.UpdateDiscordMessageAuthorImage(author.Id, imagePayload) - if err != nil { - importProgress.AddTaskError(discord.DownloadAssetsTask, discord.Warning(err.Error())) + if m.DiscordImportMarkedAsCancelled(communityID) { + importProgress.StopTask(discord.ImportMessagesTask) progressUpdates <- importProgress + cancel <- communityID return } - author.AvatarImagePayload = imagePayload - authorProfilesToSave[id] = author - - assetCounter.Increase() - progressValue := float32(assetCounter.Value()) / float32(totalAssetsCount) - importProgress.UpdateTaskProgress(discord.DownloadAssetsTask, progressValue) + // We're multiplying `chunksCount` by `0.25` so we leave 25% for additional save operations + // 0.5 are the previous 50% of progress + currentCount := ii + 1 + progressValue := calculateProgress(i+1, totalImportChunkCount, 0.5+(float32(currentCount)/float32(chunksCount))*0.25) + importProgress.UpdateTaskProgress(discord.ImportMessagesTask, progressValue) progressUpdates <- importProgress - if m.DiscordImportMarkedAsCancelled(discordCommunity.IDString()) { - importProgress.StopTask(discord.DownloadAssetsTask) + // We slow down the saving of message chunks to keep the database responsive + if currentCount < chunksCount { + time.Sleep(2 * time.Second) + } + } + + var messages []*common.Message + for _, msg := range messagesToSave { + messages = append(messages, msg) + } + + // Same as above, we save these messages in chunks so we don't block + // the database for a longer period of time + messageChunks := chunkSlice(messages, maxChunkSizeMessages) + chunksCount = len(messageChunks) + + for ii, msgs := range messageChunks { + m.communitiesManager.LogStdout(fmt.Sprintf("saving %d/%d chunk with %d app messages", ii+1, chunksCount, len(msgs))) + err = m.persistence.SaveMessages(msgs) + if err != nil { + m.cleanUpImport(communityID) + importProgress.AddTaskError(discord.ImportMessagesTask, discord.Error(err.Error())) + importProgress.StopTask(discord.ImportMessagesTask) progressUpdates <- importProgress - cancel <- discordCommunity.IDString() return } - }(id, author) - } - wg.Wait() - if m.DiscordImportMarkedAsCancelled(communityID) { - importProgress.StopTask(discord.DownloadAssetsTask) - progressUpdates <- importProgress - cancel <- communityID - return - } + if m.DiscordImportMarkedAsCancelled(communityID) { + importProgress.StopTask(discord.ImportMessagesTask) + progressUpdates <- importProgress + cancel <- communityID + return + } - for idxRange := range gopart.Partition(len(messageAttachmentsToDownload), 100) { - attachments := messageAttachmentsToDownload[idxRange.Low:idxRange.High] - wg.Add(1) - go func(attachments []*protobuf.DiscordMessageAttachment) { - defer wg.Done() - for i, attachment := range attachments { - assetPayload, contentType, err := discord.DownloadAsset(attachment.Url) + // 0.75 are the previous 75% of progress, hence we multiply our chunk progress + // by 0.25 + currentCount := ii + 1 + progressValue := calculateProgress(i+1, totalImportChunkCount, 0.75+(float32(currentCount)/float32(chunksCount))*0.25) + // progressValue := 0.75 + ((float32(currentCount) / float32(chunksCount)) * 0.25) + importProgress.UpdateTaskProgress(discord.ImportMessagesTask, progressValue) + progressUpdates <- importProgress + + // We slow down the saving of message chunks to keep the database responsive + if currentCount < chunksCount { + time.Sleep(2 * time.Second) + } + } + + pinMessageChunks := chunkSlice(pinMessagesToSave, maxChunkSizeMessages) + for _, pinMsgs := range pinMessageChunks { + err = m.persistence.SavePinMessages(pinMsgs) + if err != nil { + m.cleanUpImport(communityID) + importProgress.AddTaskError(discord.ImportMessagesTask, discord.Error(err.Error())) + importProgress.StopTask(discord.ImportMessagesTask) + progressUpdates <- importProgress + return + } + + if m.DiscordImportMarkedAsCancelled(communityID) { + importProgress.StopTask(discord.ImportMessagesTask) + progressUpdates <- importProgress + cancel <- communityID + return + } + } + + totalAssetsCount := len(messageAttachmentsToDownload) + len(authorProfilesToSave) + var assetCounter discord.AssetCounter + + var wg sync.WaitGroup + + for id, author := range authorProfilesToSave { + wg.Add(1) + go func(id string, author *protobuf.DiscordMessageAuthor) { + defer wg.Done() + + m.communitiesManager.LogStdout(fmt.Sprintf("downloading asset %d/%d", assetCounter.Value()+1, totalAssetsCount)) + imagePayload, err := discord.DownloadAvatarAsset(author.AvatarUrl) if err != nil { - errmsg := fmt.Sprintf("Couldn't download message attachment '%s': %s", attachment.Url, err.Error()) + errmsg := fmt.Sprintf("Couldn't download profile avatar '%s': %s", author.AvatarUrl, err.Error()) importProgress.AddTaskError( discord.DownloadAssetsTask, discord.Warning(errmsg), ) progressUpdates <- importProgress - continue + return } - attachment.Payload = assetPayload - attachment.ContentType = contentType - messageAttachmentsToDownload[i] = attachment + err = m.persistence.UpdateDiscordMessageAuthorImage(author.Id, imagePayload) + if err != nil { + importProgress.AddTaskError(discord.DownloadAssetsTask, discord.Warning(err.Error())) + progressUpdates <- importProgress + return + } + + author.AvatarImagePayload = imagePayload + authorProfilesToSave[id] = author + + if m.DiscordImportMarkedAsCancelled(discordCommunity.IDString()) { + importProgress.StopTask(discord.DownloadAssetsTask) + progressUpdates <- importProgress + cancel <- discordCommunity.IDString() + return + } assetCounter.Increase() - - // Multiplying progress by `0.5` to leave 50% for saving assets to DB - // similar to how it's done for messages - progressValue := (float32(assetCounter.Value()) / float32(totalAssetsCount)) * 0.5 + progressValue := calculateProgress(i+1, totalImportChunkCount, (float32(assetCounter.Value())/float32(totalAssetsCount))*0.25) importProgress.UpdateTaskProgress(discord.DownloadAssetsTask, progressValue) progressUpdates <- importProgress - if m.DiscordImportMarkedAsCancelled(communityID) { - importProgress.StopTask(discord.DownloadAssetsTask) - progressUpdates <- importProgress - cancel <- communityID - return - } - } - }(attachments) - } - wg.Wait() - - if m.DiscordImportMarkedAsCancelled(communityID) { - importProgress.StopTask(discord.DownloadAssetsTask) - progressUpdates <- importProgress - cancel <- communityID - return - } - - // Signal to the client that save operations are starting - importProgress.UpdateTaskState(discord.DownloadAssetsTask, discord.TaskStateSaving) - progressUpdates <- importProgress - - // We chunk message attachments by `maxChunkSizeBytes` to ensure individual - // save operations don't take too long and block the database - attachmentChunks := chunkAttachmentsByByteSize(messageAttachmentsToDownload, maxChunkSizeBytes) - chunksCount = len(attachmentChunks) - - for i, attachments := range attachmentChunks { - err = m.persistence.SaveDiscordMessageAttachments(attachments) - if err != nil { - m.cleanUpImport(communityID) - importProgress.AddTaskError(discord.DownloadAssetsTask, discord.Error(err.Error())) - importProgress.Stop() - progressUpdates <- importProgress - return + }(id, author) } + wg.Wait() if m.DiscordImportMarkedAsCancelled(communityID) { importProgress.StopTask(discord.DownloadAssetsTask) @@ -2939,21 +2885,140 @@ func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscor return } - // 0.5 are the previous 50% of progress, hence we multiply our chunk progress - // by 0.5 - currentCount := i + 1 - progressValue := 0.5 + ((float32(currentCount) / float32(chunksCount)) * 0.5) - importProgress.UpdateTaskProgress(discord.DownloadAssetsTask, progressValue) + for idxRange := range gopart.Partition(len(messageAttachmentsToDownload), 100) { + attachments := messageAttachmentsToDownload[idxRange.Low:idxRange.High] + wg.Add(1) + go func(attachments []*protobuf.DiscordMessageAttachment) { + defer wg.Done() + for ii, attachment := range attachments { - // We slow down the saving of attachment chunks to keep the database responsive - if currentCount < chunksCount { - time.Sleep(2 * time.Second) + m.communitiesManager.LogStdout(fmt.Sprintf("downloading asset %d/%d", assetCounter.Value()+1, totalAssetsCount)) + + assetPayload, contentType, err := discord.DownloadAsset(attachment.Url) + if err != nil { + errmsg := fmt.Sprintf("Couldn't download message attachment '%s': %s", attachment.Url, err.Error()) + importProgress.AddTaskError( + discord.DownloadAssetsTask, + discord.Warning(errmsg), + ) + progressUpdates <- importProgress + continue + } + + attachment.Payload = assetPayload + attachment.ContentType = contentType + messageAttachmentsToDownload[ii] = attachment + + if m.DiscordImportMarkedAsCancelled(communityID) { + importProgress.StopTask(discord.DownloadAssetsTask) + progressUpdates <- importProgress + cancel <- communityID + return + } + + assetCounter.Increase() + progressValue := calculateProgress(i+1, totalImportChunkCount, 0.25+(float32(assetCounter.Value())/float32(totalAssetsCount))*0.25) + importProgress.UpdateTaskProgress(discord.DownloadAssetsTask, progressValue) + progressUpdates <- importProgress + } + }(attachments) + } + wg.Wait() + + if m.DiscordImportMarkedAsCancelled(communityID) { + importProgress.StopTask(discord.DownloadAssetsTask) + progressUpdates <- importProgress + cancel <- communityID + return + } + + attachmentChunks := chunkAttachmentsByByteSize(messageAttachmentsToDownload, maxChunkSizeBytes) + chunksCount = len(attachmentChunks) + + for ii, attachments := range attachmentChunks { + m.communitiesManager.LogStdout(fmt.Sprintf("saving %d/%d chunk with %d discord message attachments", ii+1, chunksCount, len(attachments))) + err = m.persistence.SaveDiscordMessageAttachments(attachments) + if err != nil { + m.cleanUpImport(communityID) + importProgress.AddTaskError(discord.DownloadAssetsTask, discord.Error(err.Error())) + importProgress.Stop() + progressUpdates <- importProgress + return + } + + if m.DiscordImportMarkedAsCancelled(communityID) { + importProgress.StopTask(discord.DownloadAssetsTask) + progressUpdates <- importProgress + cancel <- communityID + return + } + + // 0.5 are the previous 50% of progress, hence we multiply our chunk progress + // by 0.5 + currentCount := ii + 1 + progressValue := calculateProgress(i+1, totalImportChunkCount, 0.5+(float32(currentCount)/float32(chunksCount))*0.5) + importProgress.UpdateTaskProgress(discord.DownloadAssetsTask, progressValue) + progressUpdates <- importProgress + + // We slow down the saving of attachment chunks to keep the database responsive + if currentCount < chunksCount { + time.Sleep(2 * time.Second) + } + } + + _, err := m.transport.JoinPublic(processedChannelIds[channel.Channel.ID]) + if err != nil { + m.logger.Error("failed to load filter for chat", zap.Error(err)) + continue + } + + wakuChatMessages, err := m.chatMessagesToWakuMessages(messages, discordCommunity) + if err != nil { + m.logger.Error("failed to convert chat messages into waku messages", zap.Error(err)) + continue + } + + wakuPinMessages, err := m.pinMessagesToWakuMessages(pinMessagesToSave, discordCommunity) + if err != nil { + m.logger.Error("failed to convert pin messages into waku messages", zap.Error(err)) + continue + } + + wakuMessages := append(wakuChatMessages, wakuPinMessages...) + + topics, err := m.communitiesManager.GetCommunityChatsTopics(discordCommunity.ID()) + if err != nil { + m.logger.Error("failed to get community chat topics", zap.Error(err)) + continue + } + + startDate := time.Unix(int64(exportData.OldestMessageTimestamp), 0) + endDate := time.Now() + + _, err = m.communitiesManager.CreateHistoryArchiveTorrentFromMessages( + discordCommunity.ID(), + wakuMessages, + topics, + startDate, + endDate, + messageArchiveInterval, + discordCommunity.Encrypted(), + ) + if err != nil { + m.logger.Error("failed to create history archive torrent", zap.Error(err)) + continue + } + + if m.torrentClientReady() && communitySettings.HistoryArchiveSupportEnabled { + + err = m.communitiesManager.SeedHistoryArchiveTorrent(discordCommunity.ID()) + if err != nil { + m.logger.Error("failed to seed history archive", zap.Error(err)) + } + go m.communitiesManager.StartHistoryArchiveTasksInterval(discordCommunity, messageArchiveInterval) } } - importProgress.UpdateTaskProgress(discord.DownloadAssetsTask, 1) - progressUpdates <- importProgress - err = m.publishOrg(discordCommunity) if err != nil { m.cleanUpImport(communityID) @@ -2980,6 +3045,7 @@ func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscor progressUpdates <- importProgress return } + importProgress.UpdateTaskProgress(discord.InitCommunityTask, 0.15) progressUpdates <- importProgress @@ -3009,12 +3075,7 @@ func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscor return } - filterChatIds := discordCommunity.DefaultFilters() - for _, chatID := range processedChannelIds { - filterChatIds = append(filterChatIds, chatID) - } - - filters, err := m.transport.InitPublicFilters(filterChatIds) + _, err = m.transport.InitPublicFilters(discordCommunity.DefaultFilters()) if err != nil { m.cleanUpImport(communityID) importProgress.AddTaskError(discord.InitCommunityTask, discord.Error(err.Error())) @@ -3033,6 +3094,7 @@ func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscor return } + filters := m.transport.Filters() _, err = m.scheduleSyncFilters(filters) if err != nil { m.cleanUpImport(communityID) @@ -3071,55 +3133,17 @@ func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscor m.config.messengerSignalsHandler.DiscordCommunityImportFinished(communityID) close(done) - - wakuChatMessages, err := m.chatMessagesToWakuMessages(messages, discordCommunity) - if err != nil { - m.logger.Error("failed to convert chat messages into waku messages", zap.Error(err)) - return - } - - wakuPinMessages, err := m.pinMessagesToWakuMessages(pinMessagesToSave, discordCommunity) - if err != nil { - m.logger.Error("failed to convert pin messages into waku messages", zap.Error(err)) - return - } - - wakuMessages := append(wakuChatMessages, wakuPinMessages...) - - topics, err := m.communitiesManager.GetCommunityChatsTopics(discordCommunity.ID()) - if err != nil { - m.logger.Error("failed to get community chat topics", zap.Error(err)) - return - } - - startDate := time.Unix(int64(exportData.OldestMessageTimestamp), 0) - endDate := time.Now() - - _, err = m.communitiesManager.CreateHistoryArchiveTorrentFromMessages( - discordCommunity.ID(), - wakuMessages, - topics, - startDate, - endDate, - messageArchiveInterval, - discordCommunity.Encrypted(), - ) - if err != nil { - m.logger.Error("failed to create history archive torrent", zap.Error(err)) - return - } - - if m.torrentClientReady() && communitySettings.HistoryArchiveSupportEnabled { - - err = m.communitiesManager.SeedHistoryArchiveTorrent(discordCommunity.ID()) - if err != nil { - m.logger.Error("failed to seed history archive", zap.Error(err)) - } - go m.communitiesManager.StartHistoryArchiveTasksInterval(discordCommunity, messageArchiveInterval) - } }() } +func calculateProgress(i int, t int, currentProgress float32) float32 { + current := float32(1) / float32(t) * currentProgress + if i > 1 { + return float32(i-1)/float32(t) + current + } + return current +} + func (m *Messenger) MarkDiscordCommunityImportAsCancelled(communityID string) { m.importingCommunities[communityID] = true } diff --git a/protocol/requests/create_community_category.go b/protocol/requests/create_community_category.go index 75a19a148..d5fd34e7a 100644 --- a/protocol/requests/create_community_category.go +++ b/protocol/requests/create_community_category.go @@ -13,6 +13,7 @@ type CreateCommunityCategory struct { CommunityID types.HexBytes `json:"communityId"` CategoryName string `json:"categoryName"` ChatIDs []string `json:"chatIds"` + ThirdPartyID string `json:"thirdPartyID,omitempty"` } func (j *CreateCommunityCategory) Validate() error {