refactor(import-tool): process import data in chunks

This commit refactors the Discord import tool so that,
instead of loading all data to be imported into memory at
once, it now performs the import on a per-file basis.

This reduces the memory pressure on the node performing
the import and seems to increase its performance as well.
This commit is contained in:
Pascal Precht 2023-01-26 13:52:43 +01:00 committed by r4bbit
parent ee9f8edfcf
commit f31e40264e
5 changed files with 417 additions and 380 deletions

View File

@ -504,7 +504,7 @@ func (m *Manager) ImportCommunity(key *ecdsa.PrivateKey) (*Community, error) {
return community, nil
}
func (m *Manager) CreateChat(communityID types.HexBytes, chat *protobuf.CommunityChat, publish bool) (*Community, *CommunityChanges, error) {
func (m *Manager) CreateChat(communityID types.HexBytes, chat *protobuf.CommunityChat, publish bool, thirdPartyID string) (*Community, *CommunityChanges, error) {
community, err := m.GetByID(communityID)
if err != nil {
return nil, nil, err
@ -513,6 +513,10 @@ func (m *Manager) CreateChat(communityID types.HexBytes, chat *protobuf.Communit
return nil, nil, ErrOrgNotFound
}
chatID := uuid.New().String()
if thirdPartyID != "" {
chatID = chatID + thirdPartyID
}
changes, err := community.CreateChat(chatID, chat)
if err != nil {
return nil, nil, err
@ -598,7 +602,11 @@ func (m *Manager) CreateCategory(request *requests.CreateCommunityCategory, publ
if community == nil {
return nil, nil, ErrOrgNotFound
}
categoryID := uuid.New().String()
if request.ThirdPartyID != "" {
categoryID = categoryID + request.ThirdPartyID
}
// Remove communityID prefix from chatID if exists
for i, cid := range request.ChatIDs {

View File

@ -707,7 +707,7 @@ func (s *ManagerSuite) buildCommunityWithChat() (*Community, string, error) {
},
Members: make(map[string]*protobuf.CommunityMember),
}
_, changes, err := s.manager.CreateChat(community.ID(), chat, true)
_, changes, err := s.manager.CreateChat(community.ID(), chat, true, "")
if err != nil {
return nil, "", err
}

View File

@ -160,10 +160,12 @@ type ImportProgress struct {
ErrorsCount uint `json:"errorsCount"`
WarningsCount uint `json:"warningsCount"`
Stopped bool `json:"stopped"`
TotalChunkCount int `json:"totalChunksCount,omitempty"`
CurrentChunk int `json:"currentChunk,omitempty"`
m sync.Mutex
}
func (progress *ImportProgress) Init(tasks []ImportTask) {
func (progress *ImportProgress) Init(totalChunkCount int, tasks []ImportTask) {
progress.Progress = 0
progress.Tasks = make([]*ImportTaskProgress, 0)
for _, task := range tasks {
@ -180,6 +182,8 @@ func (progress *ImportProgress) Init(tasks []ImportTask) {
progress.ErrorsCount = 0
progress.WarningsCount = 0
progress.Stopped = false
progress.TotalChunkCount = totalChunkCount
progress.CurrentChunk = 0
}
func (progress *ImportProgress) Stop() {

View File

@ -904,7 +904,7 @@ func (m *Messenger) CreateCommunityChat(communityID types.HexBytes, c *protobuf.
var response MessengerResponse
c.Identity.FirstMessageTimestamp = FirstMessageTimestampNoMessage
community, changes, err := m.communitiesManager.CreateChat(communityID, c, true)
community, changes, err := m.communitiesManager.CreateChat(communityID, c, true, "")
if err != nil {
return nil, err
}
@ -2339,13 +2339,15 @@ func (m *Messenger) RequestExtractDiscordChannelsAndCategories(filesToImport []s
func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscordCommunity) {
go func() {
totalImportChunkCount := len(request.FilesToImport)
progressUpdates := make(chan *discord.ImportProgress)
done := make(chan struct{})
cancel := make(chan string)
m.startPublishImportProgressInterval(progressUpdates, cancel, done)
importProgress := &discord.ImportProgress{}
importProgress.Init([]discord.ImportTask{
importProgress.Init(totalImportChunkCount, []discord.ImportTask{
discord.CommunityCreationTask,
discord.ChannelsCreationTask,
discord.ImportMessagesTask,
@ -2357,31 +2359,6 @@ func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscor
// initial progress immediately
m.publishImportProgress(importProgress)
exportData, errs := m.ExtractDiscordDataFromImportFiles(request.FilesToImport)
if len(errs) > 0 {
for _, err := range errs {
importProgress.AddTaskError(discord.CommunityCreationTask, err)
}
progressUpdates <- importProgress
return
}
totalChannelsCount := len(exportData.ExportedData)
totalMessageCount := exportData.MessageCount
if totalChannelsCount == 0 || totalMessageCount == 0 {
importError := discord.Error(discord.ErrNoChannelData.Error())
if totalMessageCount == 0 {
importError.Message = discord.ErrNoMessageData.Error()
}
importProgress.AddTaskError(discord.CommunityCreationTask, importError)
importProgress.StopTask(discord.CommunityCreationTask)
progressUpdates <- importProgress
return
}
importProgress.UpdateTaskProgress(discord.CommunityCreationTask, 0.5)
progressUpdates <- importProgress
createCommunityRequest := request.ToCreateCommunityRequest()
// We're calling `CreateCommunity` on `communitiesManager` directly, instead of
@ -2434,7 +2411,7 @@ func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscor
importProgress.CommunityImages[t] = images.IdentityImage{Name: t, Payload: i.Payload}
}
importProgress.UpdateTaskProgress(discord.CommunityCreationTask, 0.75)
importProgress.UpdateTaskProgress(discord.CommunityCreationTask, 1)
progressUpdates <- importProgress
if m.DiscordImportMarkedAsCancelled(communityID) {
@ -2444,113 +2421,162 @@ func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscor
return
}
//This is a map of discord category IDs <-> Status category IDs
processedCategoriesIds := make(map[string]string, 0)
totalCategoriesCount := len(exportData.Categories)
for _, category := range exportData.Categories {
createCommunityCategoryRequest := &requests.CreateCommunityCategory{
CommunityID: discordCommunity.ID(),
CategoryName: category.Name,
ChatIDs: make([]string, 0),
}
// We call `CreateCategory` on `communitiesManager` directly so we can control
// whether or not the community update should be published (it should not until the
// import has finished)
communityWithCategories, changes, err := m.communitiesManager.CreateCategory(createCommunityCategoryRequest, false)
if err != nil {
m.cleanUpImport(communityID)
importProgress.AddTaskError(discord.CommunityCreationTask, discord.Error(err.Error()))
importProgress.StopTask(discord.CommunityCreationTask)
progressUpdates <- importProgress
return
}
discordCommunity = communityWithCategories
// This looks like we keep overriding the same field but there's
// only one `CategoriesAdded` change at this point.
for _, addedCategory := range changes.CategoriesAdded {
processedCategoriesIds[category.ID] = addedCategory.CategoryId
}
// We're multiplying `progressValue` by 0.25 as it's added to the previous 0.75 progress
progressValue := (float32(len(processedCategoriesIds)) / float32(totalCategoriesCount)) * 0.25
importProgress.UpdateTaskProgress(discord.CommunityCreationTask, 0.75+progressValue)
progressUpdates <- importProgress
}
if m.DiscordImportMarkedAsCancelled(communityID) {
importProgress.StopTask(discord.CommunityCreationTask)
progressUpdates <- importProgress
cancel <- communityID
return
}
var chatsToSave []*Chat
processedChannelIds := make(map[string]string, 0)
messagesToSave := make(map[string]*common.Message, 0)
pinMessagesToSave := make([]*common.PinMessage, 0)
authorProfilesToSave := make(map[string]*protobuf.DiscordMessageAuthor, 0)
messageAttachmentsToDownload := make([]*protobuf.DiscordMessageAttachment, 0)
processedCategoriesIds := make(map[string]string, 0)
for _, channel := range exportData.ExportedData {
for i, importFile := range request.FilesToImport {
communityChat := &protobuf.CommunityChat{
Permissions: &protobuf.CommunityPermissions{
Access: protobuf.CommunityPermissions_NO_MEMBERSHIP,
},
Identity: &protobuf.ChatIdentity{
DisplayName: channel.Channel.Name,
Emoji: "",
Description: channel.Channel.Description,
Color: discordCommunity.Color(),
},
CategoryId: processedCategoriesIds[channel.Channel.CategoryID],
}
// We call `CreateChat` on `communitiesManager` directly to get more control
// over whether we want to publish the updated community description.
communityWithChats, changes, err := m.communitiesManager.CreateChat(discordCommunity.ID(), communityChat, false)
if err != nil {
m.cleanUpImport(communityID)
errmsg := err.Error()
if _errors.Is(err, communities.ErrInvalidCommunityDescriptionDuplicatedName) {
errmsg = fmt.Sprintf("Couldn't create channel '%s': %s", communityChat.Identity.DisplayName, err.Error())
exportData, errs := m.ExtractDiscordDataFromImportFiles([]string{importFile})
if len(errs) > 0 {
for _, err := range errs {
importProgress.AddTaskError(discord.CommunityCreationTask, err)
}
importProgress.AddTaskError(discord.ChannelsCreationTask, discord.Error(errmsg))
importProgress.StopTask(discord.ChannelsCreationTask)
progressUpdates <- importProgress
return
}
discordCommunity = communityWithChats
totalChannelsCount := len(exportData.ExportedData)
totalMessageCount := exportData.MessageCount
// This looks like we keep overriding the chat id value
// as we iterate over `ChatsAdded`, however at this point we
// know there was only a single such change (and it's a map)
for chatID, chat := range changes.ChatsAdded {
c := CreateCommunityChat(communityID, chatID, chat, m.getTimesource())
chatsToSave = append(chatsToSave, c)
processedChannelIds[channel.Channel.ID] = c.ID
if totalChannelsCount == 0 || totalMessageCount == 0 {
importError := discord.Error(fmt.Errorf("No channel to import messages from in file: %s", importFile).Error())
if totalMessageCount == 0 {
importError.Message = fmt.Errorf("No messages to import in file: %s", importFile).Error()
}
importProgress.AddTaskError(discord.ChannelsCreationTask, importError)
progressUpdates <- importProgress
continue
}
progressValue := float32(len(processedChannelIds)) / float32(totalChannelsCount)
importProgress.CurrentChunk = i + 1
// We actually only ever receive a single category
// from `exportData` but since it's a map, we still have to
// iterate over it to access its values
for _, category := range exportData.Categories {
categories := discordCommunity.Categories()
exists := false
for catID := range categories {
if strings.HasSuffix(catID, category.ID) {
exists = true
break
}
}
if !exists {
createCommunityCategoryRequest := &requests.CreateCommunityCategory{
CommunityID: discordCommunity.ID(),
CategoryName: category.Name,
ThirdPartyID: category.ID,
ChatIDs: make([]string, 0),
}
// We call `CreateCategory` on `communitiesManager` directly so we can control
// whether or not the community update should be published (it should not until the
// import has finished)
communityWithCategories, changes, err := m.communitiesManager.CreateCategory(createCommunityCategoryRequest, false)
if err != nil {
m.cleanUpImport(communityID)
importProgress.AddTaskError(discord.CommunityCreationTask, discord.Error(err.Error()))
importProgress.StopTask(discord.CommunityCreationTask)
progressUpdates <- importProgress
return
}
discordCommunity = communityWithCategories
// This looks like we keep overriding the same field but there's
// only one `CategoriesAdded` change at this point.
for _, addedCategory := range changes.CategoriesAdded {
processedCategoriesIds[category.ID] = addedCategory.CategoryId
}
}
}
progressValue := calculateProgress(i+1, totalImportChunkCount, (float32(1) / 2))
importProgress.UpdateTaskProgress(discord.ChannelsCreationTask, progressValue)
progressUpdates <- importProgress
if m.DiscordImportMarkedAsCancelled(communityID) {
importProgress.StopTask(discord.CommunityCreationTask)
progressUpdates <- importProgress
cancel <- communityID
return
}
messagesToSave := make(map[string]*common.Message, 0)
pinMessagesToSave := make([]*common.PinMessage, 0)
authorProfilesToSave := make(map[string]*protobuf.DiscordMessageAuthor, 0)
messageAttachmentsToDownload := make([]*protobuf.DiscordMessageAttachment, 0)
// Safe to access the first item here, as we process exported
// data per file and each file only holds a single channel
channel := exportData.ExportedData[0]
chatIDs := discordCommunity.ChatIDs()
exists := false
for _, chatID := range chatIDs {
if strings.HasSuffix(chatID, channel.Channel.ID) {
exists = true
break
}
}
if !exists {
communityChat := &protobuf.CommunityChat{
Permissions: &protobuf.CommunityPermissions{
Access: protobuf.CommunityPermissions_NO_MEMBERSHIP,
},
Identity: &protobuf.ChatIdentity{
DisplayName: channel.Channel.Name,
Emoji: "",
Description: channel.Channel.Description,
Color: discordCommunity.Color(),
},
CategoryId: processedCategoriesIds[channel.Channel.CategoryID],
}
// We call `CreateChat` on `communitiesManager` directly to get more control
// over whether we want to publish the updated community description.
communityWithChats, changes, err := m.communitiesManager.CreateChat(discordCommunity.ID(), communityChat, false, channel.Channel.ID)
if err != nil {
m.cleanUpImport(communityID)
errmsg := err.Error()
if _errors.Is(err, communities.ErrInvalidCommunityDescriptionDuplicatedName) {
errmsg = fmt.Sprintf("Couldn't create channel '%s': %s", communityChat.Identity.DisplayName, err.Error())
}
importProgress.AddTaskError(discord.ChannelsCreationTask, discord.Error(errmsg))
importProgress.StopTask(discord.ChannelsCreationTask)
progressUpdates <- importProgress
return
}
discordCommunity = communityWithChats
// This looks like we keep overriding the chat id value
// as we iterate over `ChatsAdded`, however at this point we
// know there was only a single such change (and it's a map)
for chatID, chat := range changes.ChatsAdded {
c := CreateCommunityChat(communityID, chatID, chat, m.getTimesource())
chatsToSave = append(chatsToSave, c)
processedChannelIds[channel.Channel.ID] = c.ID
}
}
progressValue = calculateProgress(i+1, totalImportChunkCount, 1)
importProgress.UpdateTaskProgress(discord.ChannelsCreationTask, progressValue)
progressUpdates <- importProgress
for _, discordMessage := range channel.Messages {
progressValue := float32(len(messagesToSave)) / float32(totalMessageCount)
for ii, discordMessage := range channel.Messages {
timestamp, err := time.Parse(discordTimestampLayout, discordMessage.Timestamp)
if err != nil {
m.logger.Error("failed to parse discord message timestamp", zap.Error(err))
importProgress.AddTaskError(discord.ImportMessagesTask, discord.Warning(err.Error()))
importProgress.UpdateTaskProgress(discord.ImportMessagesTask, progressValue)
progressUpdates <- importProgress
continue
}
if timestamp.Unix() < request.From {
progressUpdates <- importProgress
continue
}
@ -2558,7 +2584,7 @@ func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscor
if err != nil {
m.logger.Error("failed to check if message author exists in database", zap.Error(err))
importProgress.AddTaskError(discord.ImportMessagesTask, discord.Error(err.Error()))
importProgress.UpdateTaskProgress(discord.ImportMessagesTask, progressValue)
progressUpdates <- importProgress
continue
}
@ -2566,7 +2592,7 @@ func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscor
err := m.persistence.SaveDiscordMessageAuthor(discordMessage.Author)
if err != nil {
importProgress.AddTaskError(discord.ImportMessagesTask, discord.Error(err.Error()))
importProgress.UpdateTaskProgress(discord.ImportMessagesTask, progressValue)
progressUpdates <- importProgress
continue
}
}
@ -2575,7 +2601,7 @@ func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscor
if err != nil {
m.logger.Error("failed to check if message avatar payload exists in database", zap.Error(err))
importProgress.AddTaskError(discord.ImportMessagesTask, discord.Error(err.Error()))
importProgress.UpdateTaskProgress(discord.ImportMessagesTask, progressValue)
progressUpdates <- importProgress
continue
}
@ -2591,7 +2617,6 @@ func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscor
if err != nil {
m.logger.Error("failed to parse discord message timestamp", zap.Error(err))
importProgress.AddTaskError(discord.ImportMessagesTask, discord.Warning(err.Error()))
importProgress.UpdateTaskProgress(discord.ImportMessagesTask, progressValue)
progressUpdates <- importProgress
continue
}
@ -2641,7 +2666,7 @@ func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscor
if err != nil {
m.logger.Error("failed to prepare message content", zap.Error(err))
importProgress.AddTaskError(discord.ImportMessagesTask, discord.Error(err.Error()))
importProgress.UpdateTaskProgress(discord.ImportMessagesTask, progressValue)
progressUpdates <- importProgress
continue
}
@ -2662,7 +2687,6 @@ func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscor
if err != nil {
m.logger.Error("failed to parse marshal pin message", zap.Error(err))
importProgress.AddTaskError(discord.ImportMessagesTask, discord.Warning(err.Error()))
importProgress.UpdateTaskProgress(discord.ImportMessagesTask, progressValue)
progressUpdates <- importProgress
continue
}
@ -2671,7 +2695,6 @@ func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscor
if err != nil {
m.logger.Error("failed to wrap pin message", zap.Error(err))
importProgress.AddTaskError(discord.ImportMessagesTask, discord.Warning(err.Error()))
importProgress.UpdateTaskProgress(discord.ImportMessagesTask, progressValue)
progressUpdates <- importProgress
continue
}
@ -2692,44 +2715,10 @@ func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscor
} else {
messagesToSave[communityID+discordMessage.Id] = messageToSave
}
}
// We're multiplying `progressValue` by `0.5` so we leave 50% for actual save operations
// The 0.5 could be calculated but we'd need to know the total message chunks count,
// which we don't at this point
progressValue = float32(len(messagesToSave)) / float32(totalMessageCount) * 0.5
importProgress.UpdateTaskProgress(discord.ImportMessagesTask, progressValue)
progressUpdates <- importProgress
if m.DiscordImportMarkedAsCancelled(communityID) {
importProgress.StopTask(discord.ImportMessagesTask)
progressValue := calculateProgress(i+1, totalImportChunkCount, float32(ii+1)/float32(len(channel.Messages))*0.5)
importProgress.UpdateTaskProgress(discord.ImportMessagesTask, progressValue)
progressUpdates <- importProgress
cancel <- communityID
return
}
}
var discordMessages []*protobuf.DiscordMessage
for _, msg := range messagesToSave {
discordMessages = append(discordMessages, msg.GetDiscordMessage())
}
// We save these messages in chunks so we don't block the database
// for a longer period of time
discordMessageChunks := chunkSlice(discordMessages, maxChunkSizeMessages)
chunksCount := len(discordMessageChunks)
// Signal to clients that save operations are starting
importProgress.UpdateTaskState(discord.ImportMessagesTask, discord.TaskStateSaving)
progressUpdates <- importProgress
for i, msgs := range discordMessageChunks {
err = m.persistence.SaveDiscordMessages(msgs)
if err != nil {
m.cleanUpImport(communityID)
importProgress.AddTaskError(discord.ImportMessagesTask, discord.Error(err.Error()))
importProgress.StopTask(discord.ImportMessagesTask)
progressUpdates <- importProgress
return
}
if m.DiscordImportMarkedAsCancelled(communityID) {
@ -2739,198 +2728,155 @@ func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscor
return
}
// We're multiplying `chunksCount` by `0.25` so we leave 25% for additional save operations
// 0.5 are the previous 50% of progress
currentCount := i + 1
progressValue := 0.5 + (float32(currentCount) / float32(chunksCount) * 0.25)
importProgress.UpdateTaskProgress(discord.ImportMessagesTask, progressValue)
// We slow down the saving of message chunks to keep the database responsive
if currentCount < chunksCount {
time.Sleep(2 * time.Second)
}
}
importProgress.UpdateTaskProgress(discord.ImportMessagesTask, 0.75)
var messages []*common.Message
for _, msg := range messagesToSave {
messages = append(messages, msg)
}
// Same as above, we save these messages in chunks so we don't block
// the database for a longer period of time
messageChunks := chunkSlice(messages, maxChunkSizeMessages)
chunksCount = len(messageChunks)
for i, msgs := range messageChunks {
err = m.persistence.SaveMessages(msgs)
if err != nil {
m.cleanUpImport(communityID)
importProgress.AddTaskError(discord.ImportMessagesTask, discord.Error(err.Error()))
importProgress.StopTask(discord.ImportMessagesTask)
progressUpdates <- importProgress
return
var discordMessages []*protobuf.DiscordMessage
for _, msg := range messagesToSave {
discordMessages = append(discordMessages, msg.GetDiscordMessage())
}
if m.DiscordImportMarkedAsCancelled(communityID) {
importProgress.StopTask(discord.ImportMessagesTask)
progressUpdates <- importProgress
cancel <- communityID
return
}
// We save these messages in chunks so we don't block the database
// for a longer period of time
discordMessageChunks := chunkSlice(discordMessages, maxChunkSizeMessages)
chunksCount := len(discordMessageChunks)
// 0.75 are the previous 75% of progress, hence we multiply our chunk progress
// by 0.25
currentCount := i + 1
progressValue := 0.75 + ((float32(currentCount) / float32(chunksCount)) * 0.25)
importProgress.UpdateTaskProgress(discord.ImportMessagesTask, progressValue)
// We slow down the saving of message chunks to keep the database responsive
if currentCount < chunksCount {
time.Sleep(2 * time.Second)
}
}
pinMessageChunks := chunkSlice(pinMessagesToSave, maxChunkSizeMessages)
for _, pinMsgs := range pinMessageChunks {
err = m.persistence.SavePinMessages(pinMsgs)
if err != nil {
m.cleanUpImport(communityID)
importProgress.AddTaskError(discord.ImportMessagesTask, discord.Error(err.Error()))
importProgress.StopTask(discord.ImportMessagesTask)
progressUpdates <- importProgress
return
}
if m.DiscordImportMarkedAsCancelled(communityID) {
importProgress.StopTask(discord.ImportMessagesTask)
progressUpdates <- importProgress
cancel <- communityID
return
}
}
importProgress.UpdateTaskProgress(discord.ImportMessagesTask, 1)
progressUpdates <- importProgress
totalAssetsCount := len(messageAttachmentsToDownload) + len(authorProfilesToSave)
var assetCounter discord.AssetCounter
var wg sync.WaitGroup
for id, author := range authorProfilesToSave {
wg.Add(1)
go func(id string, author *protobuf.DiscordMessageAuthor) {
defer wg.Done()
imagePayload, err := discord.DownloadAvatarAsset(author.AvatarUrl)
for ii, msgs := range discordMessageChunks {
m.communitiesManager.LogStdout(fmt.Sprintf("saving %d/%d chunk with %d discord messages", ii+1, chunksCount, len(msgs)))
err = m.persistence.SaveDiscordMessages(msgs)
if err != nil {
errmsg := fmt.Sprintf("Couldn't download profile avatar '%s': %s", author.AvatarUrl, err.Error())
importProgress.AddTaskError(
discord.DownloadAssetsTask,
discord.Warning(errmsg),
)
m.cleanUpImport(communityID)
importProgress.AddTaskError(discord.ImportMessagesTask, discord.Error(err.Error()))
importProgress.StopTask(discord.ImportMessagesTask)
progressUpdates <- importProgress
return
}
err = m.persistence.UpdateDiscordMessageAuthorImage(author.Id, imagePayload)
if err != nil {
importProgress.AddTaskError(discord.DownloadAssetsTask, discord.Warning(err.Error()))
if m.DiscordImportMarkedAsCancelled(communityID) {
importProgress.StopTask(discord.ImportMessagesTask)
progressUpdates <- importProgress
cancel <- communityID
return
}
author.AvatarImagePayload = imagePayload
authorProfilesToSave[id] = author
assetCounter.Increase()
progressValue := float32(assetCounter.Value()) / float32(totalAssetsCount)
importProgress.UpdateTaskProgress(discord.DownloadAssetsTask, progressValue)
// We're multiplying `chunksCount` by `0.25` so we leave 25% for additional save operations
// 0.5 are the previous 50% of progress
currentCount := ii + 1
progressValue := calculateProgress(i+1, totalImportChunkCount, 0.5+(float32(currentCount)/float32(chunksCount))*0.25)
importProgress.UpdateTaskProgress(discord.ImportMessagesTask, progressValue)
progressUpdates <- importProgress
if m.DiscordImportMarkedAsCancelled(discordCommunity.IDString()) {
importProgress.StopTask(discord.DownloadAssetsTask)
// We slow down the saving of message chunks to keep the database responsive
if currentCount < chunksCount {
time.Sleep(2 * time.Second)
}
}
var messages []*common.Message
for _, msg := range messagesToSave {
messages = append(messages, msg)
}
// Same as above, we save these messages in chunks so we don't block
// the database for a longer period of time
messageChunks := chunkSlice(messages, maxChunkSizeMessages)
chunksCount = len(messageChunks)
for ii, msgs := range messageChunks {
m.communitiesManager.LogStdout(fmt.Sprintf("saving %d/%d chunk with %d app messages", ii+1, chunksCount, len(msgs)))
err = m.persistence.SaveMessages(msgs)
if err != nil {
m.cleanUpImport(communityID)
importProgress.AddTaskError(discord.ImportMessagesTask, discord.Error(err.Error()))
importProgress.StopTask(discord.ImportMessagesTask)
progressUpdates <- importProgress
cancel <- discordCommunity.IDString()
return
}
}(id, author)
}
wg.Wait()
if m.DiscordImportMarkedAsCancelled(communityID) {
importProgress.StopTask(discord.DownloadAssetsTask)
progressUpdates <- importProgress
cancel <- communityID
return
}
if m.DiscordImportMarkedAsCancelled(communityID) {
importProgress.StopTask(discord.ImportMessagesTask)
progressUpdates <- importProgress
cancel <- communityID
return
}
for idxRange := range gopart.Partition(len(messageAttachmentsToDownload), 100) {
attachments := messageAttachmentsToDownload[idxRange.Low:idxRange.High]
wg.Add(1)
go func(attachments []*protobuf.DiscordMessageAttachment) {
defer wg.Done()
for i, attachment := range attachments {
assetPayload, contentType, err := discord.DownloadAsset(attachment.Url)
// 0.75 are the previous 75% of progress, hence we multiply our chunk progress
// by 0.25
currentCount := ii + 1
progressValue := calculateProgress(i+1, totalImportChunkCount, 0.75+(float32(currentCount)/float32(chunksCount))*0.25)
// progressValue := 0.75 + ((float32(currentCount) / float32(chunksCount)) * 0.25)
importProgress.UpdateTaskProgress(discord.ImportMessagesTask, progressValue)
progressUpdates <- importProgress
// We slow down the saving of message chunks to keep the database responsive
if currentCount < chunksCount {
time.Sleep(2 * time.Second)
}
}
pinMessageChunks := chunkSlice(pinMessagesToSave, maxChunkSizeMessages)
for _, pinMsgs := range pinMessageChunks {
err = m.persistence.SavePinMessages(pinMsgs)
if err != nil {
m.cleanUpImport(communityID)
importProgress.AddTaskError(discord.ImportMessagesTask, discord.Error(err.Error()))
importProgress.StopTask(discord.ImportMessagesTask)
progressUpdates <- importProgress
return
}
if m.DiscordImportMarkedAsCancelled(communityID) {
importProgress.StopTask(discord.ImportMessagesTask)
progressUpdates <- importProgress
cancel <- communityID
return
}
}
totalAssetsCount := len(messageAttachmentsToDownload) + len(authorProfilesToSave)
var assetCounter discord.AssetCounter
var wg sync.WaitGroup
for id, author := range authorProfilesToSave {
wg.Add(1)
go func(id string, author *protobuf.DiscordMessageAuthor) {
defer wg.Done()
m.communitiesManager.LogStdout(fmt.Sprintf("downloading asset %d/%d", assetCounter.Value()+1, totalAssetsCount))
imagePayload, err := discord.DownloadAvatarAsset(author.AvatarUrl)
if err != nil {
errmsg := fmt.Sprintf("Couldn't download message attachment '%s': %s", attachment.Url, err.Error())
errmsg := fmt.Sprintf("Couldn't download profile avatar '%s': %s", author.AvatarUrl, err.Error())
importProgress.AddTaskError(
discord.DownloadAssetsTask,
discord.Warning(errmsg),
)
progressUpdates <- importProgress
continue
return
}
attachment.Payload = assetPayload
attachment.ContentType = contentType
messageAttachmentsToDownload[i] = attachment
err = m.persistence.UpdateDiscordMessageAuthorImage(author.Id, imagePayload)
if err != nil {
importProgress.AddTaskError(discord.DownloadAssetsTask, discord.Warning(err.Error()))
progressUpdates <- importProgress
return
}
author.AvatarImagePayload = imagePayload
authorProfilesToSave[id] = author
if m.DiscordImportMarkedAsCancelled(discordCommunity.IDString()) {
importProgress.StopTask(discord.DownloadAssetsTask)
progressUpdates <- importProgress
cancel <- discordCommunity.IDString()
return
}
assetCounter.Increase()
// Multiplying progress by `0.5` to leave 50% for saving assets to DB
// similar to how it's done for messages
progressValue := (float32(assetCounter.Value()) / float32(totalAssetsCount)) * 0.5
progressValue := calculateProgress(i+1, totalImportChunkCount, (float32(assetCounter.Value())/float32(totalAssetsCount))*0.25)
importProgress.UpdateTaskProgress(discord.DownloadAssetsTask, progressValue)
progressUpdates <- importProgress
if m.DiscordImportMarkedAsCancelled(communityID) {
importProgress.StopTask(discord.DownloadAssetsTask)
progressUpdates <- importProgress
cancel <- communityID
return
}
}
}(attachments)
}
wg.Wait()
if m.DiscordImportMarkedAsCancelled(communityID) {
importProgress.StopTask(discord.DownloadAssetsTask)
progressUpdates <- importProgress
cancel <- communityID
return
}
// Signal to the client that save operations are starting
importProgress.UpdateTaskState(discord.DownloadAssetsTask, discord.TaskStateSaving)
progressUpdates <- importProgress
// We chunk message attachments by `maxChunkSizeBytes` to ensure individual
// save operations don't take too long and block the database
attachmentChunks := chunkAttachmentsByByteSize(messageAttachmentsToDownload, maxChunkSizeBytes)
chunksCount = len(attachmentChunks)
for i, attachments := range attachmentChunks {
err = m.persistence.SaveDiscordMessageAttachments(attachments)
if err != nil {
m.cleanUpImport(communityID)
importProgress.AddTaskError(discord.DownloadAssetsTask, discord.Error(err.Error()))
importProgress.Stop()
progressUpdates <- importProgress
return
}(id, author)
}
wg.Wait()
if m.DiscordImportMarkedAsCancelled(communityID) {
importProgress.StopTask(discord.DownloadAssetsTask)
@ -2939,21 +2885,140 @@ func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscor
return
}
// 0.5 are the previous 50% of progress, hence we multiply our chunk progress
// by 0.5
currentCount := i + 1
progressValue := 0.5 + ((float32(currentCount) / float32(chunksCount)) * 0.5)
importProgress.UpdateTaskProgress(discord.DownloadAssetsTask, progressValue)
for idxRange := range gopart.Partition(len(messageAttachmentsToDownload), 100) {
attachments := messageAttachmentsToDownload[idxRange.Low:idxRange.High]
wg.Add(1)
go func(attachments []*protobuf.DiscordMessageAttachment) {
defer wg.Done()
for ii, attachment := range attachments {
// We slow down the saving of attachment chunks to keep the database responsive
if currentCount < chunksCount {
time.Sleep(2 * time.Second)
m.communitiesManager.LogStdout(fmt.Sprintf("downloading asset %d/%d", assetCounter.Value()+1, totalAssetsCount))
assetPayload, contentType, err := discord.DownloadAsset(attachment.Url)
if err != nil {
errmsg := fmt.Sprintf("Couldn't download message attachment '%s': %s", attachment.Url, err.Error())
importProgress.AddTaskError(
discord.DownloadAssetsTask,
discord.Warning(errmsg),
)
progressUpdates <- importProgress
continue
}
attachment.Payload = assetPayload
attachment.ContentType = contentType
messageAttachmentsToDownload[ii] = attachment
if m.DiscordImportMarkedAsCancelled(communityID) {
importProgress.StopTask(discord.DownloadAssetsTask)
progressUpdates <- importProgress
cancel <- communityID
return
}
assetCounter.Increase()
progressValue := calculateProgress(i+1, totalImportChunkCount, 0.25+(float32(assetCounter.Value())/float32(totalAssetsCount))*0.25)
importProgress.UpdateTaskProgress(discord.DownloadAssetsTask, progressValue)
progressUpdates <- importProgress
}
}(attachments)
}
wg.Wait()
if m.DiscordImportMarkedAsCancelled(communityID) {
importProgress.StopTask(discord.DownloadAssetsTask)
progressUpdates <- importProgress
cancel <- communityID
return
}
attachmentChunks := chunkAttachmentsByByteSize(messageAttachmentsToDownload, maxChunkSizeBytes)
chunksCount = len(attachmentChunks)
for ii, attachments := range attachmentChunks {
m.communitiesManager.LogStdout(fmt.Sprintf("saving %d/%d chunk with %d discord message attachments", ii+1, chunksCount, len(attachments)))
err = m.persistence.SaveDiscordMessageAttachments(attachments)
if err != nil {
m.cleanUpImport(communityID)
importProgress.AddTaskError(discord.DownloadAssetsTask, discord.Error(err.Error()))
importProgress.Stop()
progressUpdates <- importProgress
return
}
if m.DiscordImportMarkedAsCancelled(communityID) {
importProgress.StopTask(discord.DownloadAssetsTask)
progressUpdates <- importProgress
cancel <- communityID
return
}
// 0.5 are the previous 50% of progress, hence we multiply our chunk progress
// by 0.5
currentCount := ii + 1
progressValue := calculateProgress(i+1, totalImportChunkCount, 0.5+(float32(currentCount)/float32(chunksCount))*0.5)
importProgress.UpdateTaskProgress(discord.DownloadAssetsTask, progressValue)
progressUpdates <- importProgress
// We slow down the saving of attachment chunks to keep the database responsive
if currentCount < chunksCount {
time.Sleep(2 * time.Second)
}
}
_, err := m.transport.JoinPublic(processedChannelIds[channel.Channel.ID])
if err != nil {
m.logger.Error("failed to load filter for chat", zap.Error(err))
continue
}
wakuChatMessages, err := m.chatMessagesToWakuMessages(messages, discordCommunity)
if err != nil {
m.logger.Error("failed to convert chat messages into waku messages", zap.Error(err))
continue
}
wakuPinMessages, err := m.pinMessagesToWakuMessages(pinMessagesToSave, discordCommunity)
if err != nil {
m.logger.Error("failed to convert pin messages into waku messages", zap.Error(err))
continue
}
wakuMessages := append(wakuChatMessages, wakuPinMessages...)
topics, err := m.communitiesManager.GetCommunityChatsTopics(discordCommunity.ID())
if err != nil {
m.logger.Error("failed to get community chat topics", zap.Error(err))
continue
}
startDate := time.Unix(int64(exportData.OldestMessageTimestamp), 0)
endDate := time.Now()
_, err = m.communitiesManager.CreateHistoryArchiveTorrentFromMessages(
discordCommunity.ID(),
wakuMessages,
topics,
startDate,
endDate,
messageArchiveInterval,
discordCommunity.Encrypted(),
)
if err != nil {
m.logger.Error("failed to create history archive torrent", zap.Error(err))
continue
}
if m.torrentClientReady() && communitySettings.HistoryArchiveSupportEnabled {
err = m.communitiesManager.SeedHistoryArchiveTorrent(discordCommunity.ID())
if err != nil {
m.logger.Error("failed to seed history archive", zap.Error(err))
}
go m.communitiesManager.StartHistoryArchiveTasksInterval(discordCommunity, messageArchiveInterval)
}
}
importProgress.UpdateTaskProgress(discord.DownloadAssetsTask, 1)
progressUpdates <- importProgress
err = m.publishOrg(discordCommunity)
if err != nil {
m.cleanUpImport(communityID)
@ -2980,6 +3045,7 @@ func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscor
progressUpdates <- importProgress
return
}
importProgress.UpdateTaskProgress(discord.InitCommunityTask, 0.15)
progressUpdates <- importProgress
@ -3009,12 +3075,7 @@ func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscor
return
}
filterChatIds := discordCommunity.DefaultFilters()
for _, chatID := range processedChannelIds {
filterChatIds = append(filterChatIds, chatID)
}
filters, err := m.transport.InitPublicFilters(filterChatIds)
_, err = m.transport.InitPublicFilters(discordCommunity.DefaultFilters())
if err != nil {
m.cleanUpImport(communityID)
importProgress.AddTaskError(discord.InitCommunityTask, discord.Error(err.Error()))
@ -3033,6 +3094,7 @@ func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscor
return
}
filters := m.transport.Filters()
_, err = m.scheduleSyncFilters(filters)
if err != nil {
m.cleanUpImport(communityID)
@ -3071,55 +3133,17 @@ func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscor
m.config.messengerSignalsHandler.DiscordCommunityImportFinished(communityID)
close(done)
wakuChatMessages, err := m.chatMessagesToWakuMessages(messages, discordCommunity)
if err != nil {
m.logger.Error("failed to convert chat messages into waku messages", zap.Error(err))
return
}
wakuPinMessages, err := m.pinMessagesToWakuMessages(pinMessagesToSave, discordCommunity)
if err != nil {
m.logger.Error("failed to convert pin messages into waku messages", zap.Error(err))
return
}
wakuMessages := append(wakuChatMessages, wakuPinMessages...)
topics, err := m.communitiesManager.GetCommunityChatsTopics(discordCommunity.ID())
if err != nil {
m.logger.Error("failed to get community chat topics", zap.Error(err))
return
}
startDate := time.Unix(int64(exportData.OldestMessageTimestamp), 0)
endDate := time.Now()
_, err = m.communitiesManager.CreateHistoryArchiveTorrentFromMessages(
discordCommunity.ID(),
wakuMessages,
topics,
startDate,
endDate,
messageArchiveInterval,
discordCommunity.Encrypted(),
)
if err != nil {
m.logger.Error("failed to create history archive torrent", zap.Error(err))
return
}
if m.torrentClientReady() && communitySettings.HistoryArchiveSupportEnabled {
err = m.communitiesManager.SeedHistoryArchiveTorrent(discordCommunity.ID())
if err != nil {
m.logger.Error("failed to seed history archive", zap.Error(err))
}
go m.communitiesManager.StartHistoryArchiveTasksInterval(discordCommunity, messageArchiveInterval)
}
}()
}
// calculateProgress maps the progress made inside one import chunk onto the
// overall progress across all chunks.
//
// i is the 1-based position of the chunk currently being processed (callers
// pass the loop index plus one), t is the total number of chunks, and
// currentProgress is the fraction of the current chunk that is done.
//
// The result is the share contributed by the chunks before i, (i-1)/t, plus
// the current chunk's share, currentProgress/t. For i <= 1 there are no
// preceding chunks, so only the current chunk's share is returned.
//
// t is not validated: t == 0 yields a non-finite result.
func calculateProgress(i int, t int, currentProgress float32) float32 {
	total := float32(t)
	chunkShare := float32(1) / total * currentProgress
	if i <= 1 {
		return chunkShare
	}
	completed := float32(i-1) / total
	return completed + chunkShare
}
// MarkDiscordCommunityImportAsCancelled flags the import identified by
// communityID as cancelled by storing true under that key in
// m.importingCommunities.
//
// NOTE(review): the write is not guarded by any lock visible here. If the
// import goroutine reads this map concurrently (presumably through
// DiscordImportMarkedAsCancelled), this is a data race — confirm.
func (m *Messenger) MarkDiscordCommunityImportAsCancelled(communityID string) {
	const cancelled = true
	m.importingCommunities[communityID] = cancelled
}

View File

@ -13,6 +13,7 @@ type CreateCommunityCategory struct {
CommunityID types.HexBytes `json:"communityId"`
CategoryName string `json:"categoryName"`
ChatIDs []string `json:"chatIds"`
ThirdPartyID string `json:"thirdPartyID,omitempty"`
}
func (j *CreateCommunityCategory) Validate() error {