feat(discord): Import single channel from discord (#4160)
This commit is contained in:
parent
3452eb72a8
commit
8ae6e3035b
|
@ -154,6 +154,8 @@ type ImportTasks map[ImportTask]*ImportTaskProgress
|
|||
type ImportProgress struct {
|
||||
CommunityID string `json:"communityId,omitempty"`
|
||||
CommunityName string `json:"communityName"`
|
||||
ChannelID string `json:"channelId"`
|
||||
ChannelName string `json:"channelName"`
|
||||
CommunityImages map[string]images.IdentityImage `json:"communityImages"`
|
||||
Tasks []*ImportTaskProgress `json:"tasks"`
|
||||
Progress float32 `json:"progress"`
|
||||
|
|
|
@ -147,6 +147,7 @@ type Messenger struct {
|
|||
cancel context.CancelFunc
|
||||
|
||||
importingCommunities map[string]bool
|
||||
importingChannels map[string]bool
|
||||
importRateLimiter *rate.Limiter
|
||||
importDelayer struct {
|
||||
wait chan struct{}
|
||||
|
@ -540,6 +541,7 @@ func NewMessenger(
|
|||
requestedContactsLock: sync.RWMutex{},
|
||||
requestedContacts: make(map[string]*transport.Filter),
|
||||
importingCommunities: make(map[string]bool),
|
||||
importingChannels: make(map[string]bool),
|
||||
importRateLimiter: rate.NewLimiter(rate.Every(importSlowRate), 1),
|
||||
importDelayer: struct {
|
||||
wait chan struct{}
|
||||
|
|
|
@ -3810,6 +3810,740 @@ func (m *Messenger) RequestExtractDiscordChannelsAndCategories(filesToImport []s
|
|||
}()
|
||||
}
|
||||
|
||||
func (m *Messenger) saveDiscordAuthorIfNotExists(discordAuthor *protobuf.DiscordMessageAuthor) *discord.ImportError {
|
||||
exists, err := m.persistence.HasDiscordMessageAuthor(discordAuthor.GetId())
|
||||
if err != nil {
|
||||
m.logger.Error("failed to check if message author exists in database", zap.Error(err))
|
||||
return discord.Error(err.Error())
|
||||
}
|
||||
|
||||
if !exists {
|
||||
err := m.persistence.SaveDiscordMessageAuthor(discordAuthor)
|
||||
if err != nil {
|
||||
return discord.Error(err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *Messenger) convertDiscordMessageTimeStamp(discordMessage *protobuf.DiscordMessage, timestamp time.Time) *discord.ImportError {
|
||||
discordMessage.Timestamp = fmt.Sprintf("%d", timestamp.Unix())
|
||||
|
||||
if discordMessage.TimestampEdited != "" {
|
||||
timestampEdited, err := time.Parse(discordTimestampLayout, discordMessage.TimestampEdited)
|
||||
if err != nil {
|
||||
m.logger.Error("failed to parse discord message timestamp", zap.Error(err))
|
||||
return discord.Warning(err.Error())
|
||||
}
|
||||
// Convert timestamp to unix timestamp
|
||||
discordMessage.TimestampEdited = fmt.Sprintf("%d", timestampEdited.Unix())
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *Messenger) createPinMessageFromDiscordMessage(message *common.Message, pinnedMessageID string, channelID string, community *communities.Community) (*common.PinMessage, *discord.ImportError) {
|
||||
pinMessage := protobuf.PinMessage{
|
||||
Clock: message.WhisperTimestamp,
|
||||
MessageId: pinnedMessageID,
|
||||
ChatId: message.LocalChatID,
|
||||
MessageType: protobuf.MessageType_COMMUNITY_CHAT,
|
||||
Pinned: true,
|
||||
}
|
||||
|
||||
encodedPayload, err := proto.Marshal(&pinMessage)
|
||||
if err != nil {
|
||||
m.logger.Error("failed to parse marshal pin message", zap.Error(err))
|
||||
return nil, discord.Warning(err.Error())
|
||||
}
|
||||
|
||||
wrappedPayload, err := v1protocol.WrapMessageV1(encodedPayload, protobuf.ApplicationMetadataMessage_PIN_MESSAGE, community.PrivateKey())
|
||||
if err != nil {
|
||||
m.logger.Error("failed to wrap pin message", zap.Error(err))
|
||||
return nil, discord.Warning(err.Error())
|
||||
}
|
||||
|
||||
pinMessageToSave := &common.PinMessage{
|
||||
ID: types.EncodeHex(v1protocol.MessageID(&community.PrivateKey().PublicKey, wrappedPayload)),
|
||||
PinMessage: &pinMessage,
|
||||
LocalChatID: channelID,
|
||||
From: message.From,
|
||||
SigPubKey: message.SigPubKey,
|
||||
WhisperTimestamp: message.WhisperTimestamp,
|
||||
}
|
||||
|
||||
return pinMessageToSave, nil
|
||||
}
|
||||
|
||||
func (m *Messenger) generateSystemPinnedMessage(pinMessage *common.PinMessage, channel *Chat, clockAndTimestamp uint64, pinnedMessageID string) (*common.Message, *discord.ImportError) {
|
||||
id, err := generatePinMessageNotificationID(&m.identity.PublicKey, pinMessage, channel)
|
||||
if err != nil {
|
||||
m.logger.Warn("failed to generate pin message notification ID",
|
||||
zap.String("PinMessageId", pinMessage.ID))
|
||||
return nil, discord.Warning(err.Error())
|
||||
}
|
||||
systemMessage := &common.Message{
|
||||
ChatMessage: &protobuf.ChatMessage{
|
||||
Clock: pinMessage.Clock,
|
||||
Timestamp: clockAndTimestamp,
|
||||
ChatId: channel.ID,
|
||||
MessageType: pinMessage.MessageType,
|
||||
ResponseTo: pinnedMessageID,
|
||||
ContentType: protobuf.ChatMessage_SYSTEM_MESSAGE_PINNED_MESSAGE,
|
||||
},
|
||||
WhisperTimestamp: clockAndTimestamp,
|
||||
ID: id,
|
||||
LocalChatID: channel.ID,
|
||||
From: pinMessage.From,
|
||||
Seen: true,
|
||||
}
|
||||
|
||||
return systemMessage, nil
|
||||
}
|
||||
|
||||
func (m *Messenger) processDiscordMessages(discordChannel *discord.ExportedData,
|
||||
channel *Chat,
|
||||
importProgress *discord.ImportProgress,
|
||||
progressUpdates chan *discord.ImportProgress,
|
||||
fromDate int64,
|
||||
community *communities.Community) (
|
||||
map[string]*common.Message,
|
||||
[]*common.PinMessage,
|
||||
map[string]*protobuf.DiscordMessageAuthor,
|
||||
[]*protobuf.DiscordMessageAttachment) {
|
||||
|
||||
messagesToSave := make(map[string]*common.Message, 0)
|
||||
pinMessagesToSave := make([]*common.PinMessage, 0)
|
||||
authorProfilesToSave := make(map[string]*protobuf.DiscordMessageAuthor, 0)
|
||||
messageAttachmentsToDownload := make([]*protobuf.DiscordMessageAttachment, 0)
|
||||
|
||||
for _, discordMessage := range discordChannel.Messages {
|
||||
|
||||
timestamp, err := time.Parse(discordTimestampLayout, discordMessage.Timestamp)
|
||||
if err != nil {
|
||||
m.logger.Error("failed to parse discord message timestamp", zap.Error(err))
|
||||
importProgress.AddTaskError(discord.ImportMessagesTask, discord.Warning(err.Error()))
|
||||
progressUpdates <- importProgress
|
||||
continue
|
||||
}
|
||||
|
||||
if timestamp.Unix() < fromDate {
|
||||
progressUpdates <- importProgress
|
||||
continue
|
||||
}
|
||||
|
||||
importErr := m.saveDiscordAuthorIfNotExists(discordMessage.Author)
|
||||
if importErr != nil {
|
||||
importProgress.AddTaskError(discord.ImportMessagesTask, importErr)
|
||||
progressUpdates <- importProgress
|
||||
continue
|
||||
}
|
||||
|
||||
hasPayload, err := m.persistence.HasDiscordMessageAuthorImagePayload(discordMessage.Author.GetId())
|
||||
if err != nil {
|
||||
m.logger.Error("failed to check if message avatar payload exists in database", zap.Error(err))
|
||||
importProgress.AddTaskError(discord.ImportMessagesTask, discord.Error(err.Error()))
|
||||
progressUpdates <- importProgress
|
||||
continue
|
||||
}
|
||||
|
||||
if !hasPayload {
|
||||
authorProfilesToSave[discordMessage.Author.Id] = discordMessage.Author
|
||||
}
|
||||
|
||||
// Convert timestamp to unix timestamp
|
||||
importErr = m.convertDiscordMessageTimeStamp(discordMessage, timestamp)
|
||||
if importErr != nil {
|
||||
importProgress.AddTaskError(discord.ImportMessagesTask, importErr)
|
||||
progressUpdates <- importProgress
|
||||
continue
|
||||
}
|
||||
|
||||
for i := range discordMessage.Attachments {
|
||||
discordMessage.Attachments[i].MessageId = discordMessage.Id
|
||||
}
|
||||
messageAttachmentsToDownload = append(messageAttachmentsToDownload, discordMessage.Attachments...)
|
||||
|
||||
clockAndTimestamp := uint64(timestamp.Unix()) * 1000
|
||||
communityPubKey := community.PrivateKey().PublicKey
|
||||
|
||||
chatMessage := protobuf.ChatMessage{
|
||||
Timestamp: clockAndTimestamp,
|
||||
MessageType: protobuf.MessageType_COMMUNITY_CHAT,
|
||||
ContentType: protobuf.ChatMessage_DISCORD_MESSAGE,
|
||||
Clock: clockAndTimestamp,
|
||||
ChatId: channel.ID,
|
||||
Payload: &protobuf.ChatMessage_DiscordMessage{
|
||||
DiscordMessage: discordMessage,
|
||||
},
|
||||
}
|
||||
|
||||
// Handle message replies
|
||||
if discordMessage.Type == string(discord.MessageTypeReply) && discordMessage.Reference != nil {
|
||||
repliedMessageID := community.IDString() + discordMessage.Reference.MessageId
|
||||
if _, exists := messagesToSave[repliedMessageID]; exists {
|
||||
chatMessage.ResponseTo = repliedMessageID
|
||||
}
|
||||
}
|
||||
|
||||
messageToSave := &common.Message{
|
||||
ID: community.IDString() + discordMessage.Id,
|
||||
WhisperTimestamp: clockAndTimestamp,
|
||||
From: types.EncodeHex(crypto.FromECDSAPub(&communityPubKey)),
|
||||
Seen: true,
|
||||
LocalChatID: channel.ID,
|
||||
SigPubKey: &communityPubKey,
|
||||
CommunityID: community.IDString(),
|
||||
ChatMessage: &chatMessage,
|
||||
}
|
||||
|
||||
err = messageToSave.PrepareContent(common.PubkeyToHex(&m.identity.PublicKey))
|
||||
if err != nil {
|
||||
m.logger.Error("failed to prepare message content", zap.Error(err))
|
||||
importProgress.AddTaskError(discord.ImportMessagesTask, discord.Error(err.Error()))
|
||||
progressUpdates <- importProgress
|
||||
continue
|
||||
}
|
||||
|
||||
// Handle pin messages
|
||||
if discordMessage.Type == string(discord.MessageTypeChannelPinned) && discordMessage.Reference != nil {
|
||||
|
||||
pinnedMessageID := community.IDString() + discordMessage.Reference.MessageId
|
||||
_, exists := messagesToSave[pinnedMessageID]
|
||||
if exists {
|
||||
pinMessageToSave, importErr := m.createPinMessageFromDiscordMessage(messageToSave, pinnedMessageID, channel.ID, community)
|
||||
if importErr != nil {
|
||||
importProgress.AddTaskError(discord.ImportMessagesTask, importErr)
|
||||
progressUpdates <- importProgress
|
||||
continue
|
||||
}
|
||||
|
||||
pinMessagesToSave = append(pinMessagesToSave, pinMessageToSave)
|
||||
|
||||
// Generate SystemMessagePinnedMessage
|
||||
systemMessage, importErr := m.generateSystemPinnedMessage(pinMessageToSave, channel, clockAndTimestamp, pinnedMessageID)
|
||||
if importErr != nil {
|
||||
importProgress.AddTaskError(discord.ImportMessagesTask, importErr)
|
||||
progressUpdates <- importProgress
|
||||
continue
|
||||
}
|
||||
|
||||
messagesToSave[systemMessage.ID] = systemMessage
|
||||
}
|
||||
} else {
|
||||
messagesToSave[messageToSave.ID] = messageToSave
|
||||
}
|
||||
}
|
||||
|
||||
return messagesToSave, pinMessagesToSave, authorProfilesToSave, messageAttachmentsToDownload
|
||||
}
|
||||
|
||||
func (m *Messenger) RequestImportDiscordChannel(request *requests.ImportDiscordChannel) {
|
||||
go func() {
|
||||
totalImportChunkCount := len(request.FilesToImport)
|
||||
|
||||
progressUpdates := make(chan *discord.ImportProgress)
|
||||
|
||||
done := make(chan struct{})
|
||||
cancel := make(chan string)
|
||||
|
||||
var newChat *Chat
|
||||
|
||||
m.startPublishImportProgressInterval(progressUpdates, cancel, done)
|
||||
|
||||
importProgress := &discord.ImportProgress{}
|
||||
importProgress.Init(totalImportChunkCount, []discord.ImportTask{
|
||||
discord.ChannelsCreationTask,
|
||||
discord.ImportMessagesTask,
|
||||
discord.DownloadAssetsTask,
|
||||
})
|
||||
|
||||
importProgress.CommunityID = request.DiscordChannelID
|
||||
importProgress.CommunityName = request.Name
|
||||
// initial progress immediately
|
||||
|
||||
if err := request.Validate(); err != nil {
|
||||
errmsg := fmt.Sprintf("Request validation failed: '%s'", err.Error())
|
||||
importProgress.AddTaskError(discord.ChannelsCreationTask, discord.Error(errmsg))
|
||||
importProgress.StopTask(discord.ChannelsCreationTask)
|
||||
progressUpdates <- importProgress
|
||||
cancel <- request.DiscordChannelID
|
||||
return
|
||||
}
|
||||
|
||||
// Here's 3 steps: Find the corrent channel in files, get the community and create the channel
|
||||
progressValue := calculateProgress(3, 1, (float32(1) / 3))
|
||||
|
||||
m.publishImportProgress(importProgress)
|
||||
|
||||
community, err := m.GetCommunityByID(request.CommunityID)
|
||||
|
||||
if err != nil {
|
||||
errmsg := fmt.Sprintf("Couldn't get the community '%s': '%s'", request.CommunityID, err.Error())
|
||||
importProgress.AddTaskError(discord.ChannelsCreationTask, discord.Error(errmsg))
|
||||
importProgress.StopTask(discord.ChannelsCreationTask)
|
||||
progressUpdates <- importProgress
|
||||
cancel <- request.DiscordChannelID
|
||||
return
|
||||
}
|
||||
|
||||
if community == nil {
|
||||
errmsg := fmt.Sprintf("Couldn't get the community by id: '%s'", request.CommunityID)
|
||||
importProgress.AddTaskError(discord.ChannelsCreationTask, discord.Error(errmsg))
|
||||
importProgress.StopTask(discord.ChannelsCreationTask)
|
||||
progressUpdates <- importProgress
|
||||
cancel <- request.DiscordChannelID
|
||||
return
|
||||
}
|
||||
|
||||
importProgress.UpdateTaskProgress(discord.ChannelsCreationTask, progressValue)
|
||||
progressUpdates <- importProgress
|
||||
|
||||
for i, importFile := range request.FilesToImport {
|
||||
m.importingChannels[request.DiscordChannelID] = false
|
||||
|
||||
exportData, errs := m.ExtractDiscordDataFromImportFiles([]string{importFile})
|
||||
if len(errs) > 0 {
|
||||
for _, err := range errs {
|
||||
importProgress.AddTaskError(discord.ChannelsCreationTask, err)
|
||||
}
|
||||
importProgress.StopTask(discord.ChannelsCreationTask)
|
||||
progressUpdates <- importProgress
|
||||
cancel <- request.DiscordChannelID
|
||||
return
|
||||
}
|
||||
|
||||
var channel *discord.ExportedData
|
||||
|
||||
for _, ch := range exportData.ExportedData {
|
||||
if ch.Channel.ID == request.DiscordChannelID {
|
||||
channel = ch
|
||||
}
|
||||
}
|
||||
|
||||
if channel == nil {
|
||||
if i < len(request.FilesToImport)-1 {
|
||||
// skip this file
|
||||
continue
|
||||
} else if i == len(request.FilesToImport)-1 {
|
||||
errmsg := fmt.Sprintf("Couldn't find the target channel id in files: '%s'", request.DiscordChannelID)
|
||||
importProgress.AddTaskError(discord.ChannelsCreationTask, discord.Error(errmsg))
|
||||
importProgress.StopTask(discord.ChannelsCreationTask)
|
||||
progressUpdates <- importProgress
|
||||
cancel <- request.DiscordChannelID
|
||||
return
|
||||
}
|
||||
}
|
||||
progressValue := calculateProgress(3, 2, (float32(2) / 3))
|
||||
|
||||
importProgress.UpdateTaskProgress(discord.ChannelsCreationTask, progressValue)
|
||||
progressUpdates <- importProgress
|
||||
|
||||
if len(channel.Channel.ID) == 0 {
|
||||
// skip this file and try to find in the next file
|
||||
continue
|
||||
}
|
||||
exists := false
|
||||
|
||||
for _, chatID := range community.ChatIDs() {
|
||||
if strings.HasSuffix(chatID, request.DiscordChannelID) {
|
||||
exists = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if !exists {
|
||||
|
||||
communityChat := &protobuf.CommunityChat{
|
||||
Permissions: &protobuf.CommunityPermissions{
|
||||
Access: protobuf.CommunityPermissions_NO_MEMBERSHIP,
|
||||
},
|
||||
Identity: &protobuf.ChatIdentity{
|
||||
DisplayName: request.Name,
|
||||
Emoji: request.Emoji,
|
||||
Description: request.Description,
|
||||
Color: request.Color,
|
||||
},
|
||||
CategoryId: "",
|
||||
}
|
||||
|
||||
changes, err := m.communitiesManager.CreateChat(request.CommunityID, communityChat, false, channel.Channel.ID)
|
||||
if err != nil {
|
||||
errmsg := err.Error()
|
||||
if errors.Is(err, communities.ErrInvalidCommunityDescriptionDuplicatedName) {
|
||||
errmsg = fmt.Sprintf("Couldn't create channel '%s': %s", communityChat.Identity.DisplayName, err.Error())
|
||||
}
|
||||
|
||||
importProgress.AddTaskError(discord.ChannelsCreationTask, discord.Error(errmsg))
|
||||
importProgress.StopTask(discord.ChannelsCreationTask)
|
||||
progressUpdates <- importProgress
|
||||
cancel <- request.DiscordChannelID
|
||||
return
|
||||
}
|
||||
|
||||
community = changes.Community
|
||||
for chatID, chat := range changes.ChatsAdded {
|
||||
newChat = CreateCommunityChat(request.CommunityID.String(), chatID, chat, m.getTimesource())
|
||||
}
|
||||
|
||||
progressValue = calculateProgress(3, 3, (float32(3) / 3))
|
||||
|
||||
importProgress.UpdateTaskProgress(discord.ChannelsCreationTask, progressValue)
|
||||
progressUpdates <- importProgress
|
||||
}
|
||||
|
||||
if m.DiscordImportChannelMarkedAsCancelled(request.DiscordChannelID) {
|
||||
importProgress.StopTask(discord.ImportMessagesTask)
|
||||
progressUpdates <- importProgress
|
||||
cancel <- request.DiscordChannelID
|
||||
return
|
||||
}
|
||||
|
||||
messagesToSave, pinMessagesToSave, authorProfilesToSave, messageAttachmentsToDownload :=
|
||||
m.processDiscordMessages(channel, newChat, importProgress, progressUpdates, request.From, community)
|
||||
|
||||
var discordMessages []*protobuf.DiscordMessage
|
||||
for _, msg := range messagesToSave {
|
||||
if msg.ChatMessage.ContentType == protobuf.ChatMessage_DISCORD_MESSAGE {
|
||||
discordMessages = append(discordMessages, msg.GetDiscordMessage())
|
||||
}
|
||||
}
|
||||
|
||||
// We save these messages in chunks, so we don't block the database
|
||||
// for a longer period of time
|
||||
discordMessageChunks := chunkSlice(discordMessages, maxChunkSizeMessages)
|
||||
chunksCount := len(discordMessageChunks)
|
||||
|
||||
for ii, msgs := range discordMessageChunks {
|
||||
m.communitiesManager.LogStdout(fmt.Sprintf("saving %d/%d chunk with %d discord messages", ii+1, chunksCount, len(msgs)))
|
||||
err := m.persistence.SaveDiscordMessages(msgs)
|
||||
if err != nil {
|
||||
m.cleanUpImportChannel(request.CommunityID.String(), newChat.ID)
|
||||
importProgress.AddTaskError(discord.ImportMessagesTask, discord.Error(err.Error()))
|
||||
importProgress.StopTask(discord.ImportMessagesTask)
|
||||
progressUpdates <- importProgress
|
||||
cancel <- request.DiscordChannelID
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
if m.DiscordImportChannelMarkedAsCancelled(request.DiscordChannelID) {
|
||||
importProgress.StopTask(discord.ImportMessagesTask)
|
||||
progressUpdates <- importProgress
|
||||
cancel <- request.DiscordChannelID
|
||||
return
|
||||
}
|
||||
|
||||
// We're multiplying `chunksCount` by `0.25` so we leave 25% for additional save operations
|
||||
// 0.5 are the previous 50% of progress
|
||||
currentCount := ii + 1
|
||||
progressValue := calculateProgress(i+1, totalImportChunkCount, 0.5+(float32(currentCount)/float32(chunksCount))*0.25)
|
||||
importProgress.UpdateTaskProgress(discord.ImportMessagesTask, progressValue)
|
||||
progressUpdates <- importProgress
|
||||
|
||||
// We slow down the saving of message chunks to keep the database responsive
|
||||
if currentCount < chunksCount {
|
||||
time.Sleep(2 * time.Second)
|
||||
}
|
||||
}
|
||||
|
||||
// Get slice of all values in `messagesToSave` map
|
||||
var messages = make([]*common.Message, 0, len(messagesToSave))
|
||||
for _, msg := range messagesToSave {
|
||||
messages = append(messages, msg)
|
||||
}
|
||||
|
||||
// Same as above, we save these messages in chunks so we don't block
|
||||
// the database for a longer period of time
|
||||
messageChunks := chunkSlice(messages, maxChunkSizeMessages)
|
||||
chunksCount = len(messageChunks)
|
||||
|
||||
for ii, msgs := range messageChunks {
|
||||
m.communitiesManager.LogStdout(fmt.Sprintf("saving %d/%d chunk with %d app messages", ii+1, chunksCount, len(msgs)))
|
||||
err := m.persistence.SaveMessages(msgs)
|
||||
if err != nil {
|
||||
m.cleanUpImportChannel(request.CommunityID.String(), request.DiscordChannelID)
|
||||
importProgress.AddTaskError(discord.ImportMessagesTask, discord.Error(err.Error()))
|
||||
importProgress.StopTask(discord.ImportMessagesTask)
|
||||
progressUpdates <- importProgress
|
||||
cancel <- request.DiscordChannelID
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
if m.DiscordImportChannelMarkedAsCancelled(request.DiscordChannelID) {
|
||||
importProgress.StopTask(discord.ImportMessagesTask)
|
||||
progressUpdates <- importProgress
|
||||
cancel <- request.DiscordChannelID
|
||||
return
|
||||
}
|
||||
|
||||
// 0.75 are the previous 75% of progress, hence we multiply our chunk progress
|
||||
// by 0.25
|
||||
currentCount := ii + 1
|
||||
progressValue := calculateProgress(i+1, totalImportChunkCount, 0.75+(float32(currentCount)/float32(chunksCount))*0.25)
|
||||
// progressValue := 0.75 + ((float32(currentCount) / float32(chunksCount)) * 0.25)
|
||||
importProgress.UpdateTaskProgress(discord.ImportMessagesTask, progressValue)
|
||||
progressUpdates <- importProgress
|
||||
|
||||
// We slow down the saving of message chunks to keep the database responsive
|
||||
if currentCount < chunksCount {
|
||||
time.Sleep(2 * time.Second)
|
||||
}
|
||||
}
|
||||
|
||||
pinMessageChunks := chunkSlice(pinMessagesToSave, maxChunkSizeMessages)
|
||||
for _, pinMsgs := range pinMessageChunks {
|
||||
err := m.persistence.SavePinMessages(pinMsgs)
|
||||
if err != nil {
|
||||
m.cleanUpImportChannel(request.CommunityID.String(), request.DiscordChannelID)
|
||||
importProgress.AddTaskError(discord.ImportMessagesTask, discord.Error(err.Error()))
|
||||
importProgress.StopTask(discord.ImportMessagesTask)
|
||||
progressUpdates <- importProgress
|
||||
cancel <- request.DiscordChannelID
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
if m.DiscordImportChannelMarkedAsCancelled(request.DiscordChannelID) {
|
||||
importProgress.StopTask(discord.ImportMessagesTask)
|
||||
progressUpdates <- importProgress
|
||||
cancel <- request.DiscordChannelID
|
||||
cancel <- request.DiscordChannelID
|
||||
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
totalAssetsCount := len(messageAttachmentsToDownload) + len(authorProfilesToSave)
|
||||
var assetCounter discord.AssetCounter
|
||||
|
||||
var wg sync.WaitGroup
|
||||
|
||||
for id, author := range authorProfilesToSave {
|
||||
wg.Add(1)
|
||||
go func(id string, author *protobuf.DiscordMessageAuthor) {
|
||||
defer wg.Done()
|
||||
|
||||
m.communitiesManager.LogStdout(fmt.Sprintf("downloading asset %d/%d", assetCounter.Value()+1, totalAssetsCount))
|
||||
imagePayload, err := discord.DownloadAvatarAsset(author.AvatarUrl)
|
||||
if err != nil {
|
||||
errmsg := fmt.Sprintf("Couldn't download profile avatar '%s': %s", author.AvatarUrl, err.Error())
|
||||
importProgress.AddTaskError(
|
||||
discord.DownloadAssetsTask,
|
||||
discord.Warning(errmsg),
|
||||
)
|
||||
progressUpdates <- importProgress
|
||||
cancel <- request.DiscordChannelID
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
err = m.persistence.UpdateDiscordMessageAuthorImage(author.Id, imagePayload)
|
||||
if err != nil {
|
||||
importProgress.AddTaskError(discord.DownloadAssetsTask, discord.Warning(err.Error()))
|
||||
progressUpdates <- importProgress
|
||||
cancel <- request.DiscordChannelID
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
author.AvatarImagePayload = imagePayload
|
||||
authorProfilesToSave[id] = author
|
||||
|
||||
if m.DiscordImportMarkedAsCancelled(request.DiscordChannelID) {
|
||||
importProgress.StopTask(discord.DownloadAssetsTask)
|
||||
progressUpdates <- importProgress
|
||||
cancel <- request.DiscordChannelID
|
||||
return
|
||||
}
|
||||
|
||||
assetCounter.Increase()
|
||||
progressValue := calculateProgress(i+1, totalImportChunkCount, (float32(assetCounter.Value())/float32(totalAssetsCount))*0.5)
|
||||
importProgress.UpdateTaskProgress(discord.DownloadAssetsTask, progressValue)
|
||||
progressUpdates <- importProgress
|
||||
|
||||
}(id, author)
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
if m.DiscordImportChannelMarkedAsCancelled(request.DiscordChannelID) {
|
||||
importProgress.StopTask(discord.DownloadAssetsTask)
|
||||
progressUpdates <- importProgress
|
||||
cancel <- request.DiscordChannelID
|
||||
return
|
||||
}
|
||||
|
||||
for idxRange := range gopart.Partition(len(messageAttachmentsToDownload), 100) {
|
||||
attachments := messageAttachmentsToDownload[idxRange.Low:idxRange.High]
|
||||
wg.Add(1)
|
||||
go func(attachments []*protobuf.DiscordMessageAttachment) {
|
||||
defer wg.Done()
|
||||
for ii, attachment := range attachments {
|
||||
|
||||
m.communitiesManager.LogStdout(fmt.Sprintf("downloading asset %d/%d", assetCounter.Value()+1, totalAssetsCount))
|
||||
|
||||
assetPayload, contentType, err := discord.DownloadAsset(attachment.Url)
|
||||
if err != nil {
|
||||
errmsg := fmt.Sprintf("Couldn't download message attachment '%s': %s", attachment.Url, err.Error())
|
||||
importProgress.AddTaskError(
|
||||
discord.DownloadAssetsTask,
|
||||
discord.Warning(errmsg),
|
||||
)
|
||||
progressUpdates <- importProgress
|
||||
continue
|
||||
}
|
||||
|
||||
attachment.Payload = assetPayload
|
||||
attachment.ContentType = contentType
|
||||
messageAttachmentsToDownload[ii] = attachment
|
||||
|
||||
if m.DiscordImportChannelMarkedAsCancelled(request.DiscordChannelID) {
|
||||
importProgress.StopTask(discord.DownloadAssetsTask)
|
||||
progressUpdates <- importProgress
|
||||
cancel <- request.DiscordChannelID
|
||||
return
|
||||
}
|
||||
|
||||
assetCounter.Increase()
|
||||
progressValue := calculateProgress(i+1, totalImportChunkCount, (float32(assetCounter.Value())/float32(totalAssetsCount))*0.5)
|
||||
importProgress.UpdateTaskProgress(discord.DownloadAssetsTask, progressValue)
|
||||
progressUpdates <- importProgress
|
||||
}
|
||||
}(attachments)
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
if m.DiscordImportChannelMarkedAsCancelled(request.DiscordChannelID) {
|
||||
importProgress.StopTask(discord.DownloadAssetsTask)
|
||||
progressUpdates <- importProgress
|
||||
cancel <- request.DiscordChannelID
|
||||
return
|
||||
}
|
||||
|
||||
attachmentChunks := chunkAttachmentsByByteSize(messageAttachmentsToDownload, maxChunkSizeBytes)
|
||||
chunksCount = len(attachmentChunks)
|
||||
|
||||
for ii, attachments := range attachmentChunks {
|
||||
m.communitiesManager.LogStdout(fmt.Sprintf("saving %d/%d chunk with %d discord message attachments", ii+1, chunksCount, len(attachments)))
|
||||
err := m.persistence.SaveDiscordMessageAttachments(attachments)
|
||||
if err != nil {
|
||||
m.cleanUpImportChannel(request.CommunityID.String(), request.DiscordChannelID)
|
||||
importProgress.AddTaskError(discord.DownloadAssetsTask, discord.Error(err.Error()))
|
||||
importProgress.Stop()
|
||||
progressUpdates <- importProgress
|
||||
cancel <- request.DiscordChannelID
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
if m.DiscordImportChannelMarkedAsCancelled(request.DiscordChannelID) {
|
||||
importProgress.StopTask(discord.DownloadAssetsTask)
|
||||
progressUpdates <- importProgress
|
||||
cancel <- request.DiscordChannelID
|
||||
return
|
||||
}
|
||||
|
||||
// 0.5 are the previous 50% of progress, hence we multiply our chunk progress
|
||||
// by 0.5
|
||||
currentCount := ii + 1
|
||||
progressValue := calculateProgress(i+1, totalImportChunkCount, 0.5+(float32(currentCount)/float32(chunksCount))*0.5)
|
||||
importProgress.UpdateTaskProgress(discord.DownloadAssetsTask, progressValue)
|
||||
progressUpdates <- importProgress
|
||||
|
||||
// We slow down the saving of attachment chunks to keep the database responsive
|
||||
if currentCount < chunksCount {
|
||||
time.Sleep(2 * time.Second)
|
||||
}
|
||||
}
|
||||
|
||||
if len(attachmentChunks) == 0 {
|
||||
progressValue := calculateProgress(i+1, totalImportChunkCount, 1.0)
|
||||
importProgress.UpdateTaskProgress(discord.DownloadAssetsTask, progressValue)
|
||||
}
|
||||
|
||||
_, err := m.transport.JoinPublic(newChat.ID)
|
||||
if err != nil {
|
||||
m.logger.Error("failed to load filter for chat", zap.Error(err))
|
||||
continue
|
||||
}
|
||||
|
||||
wakuChatMessages, err := m.chatMessagesToWakuMessages(messages, community)
|
||||
if err != nil {
|
||||
m.logger.Error("failed to convert chat messages into waku messages", zap.Error(err))
|
||||
continue
|
||||
}
|
||||
|
||||
wakuPinMessages, err := m.pinMessagesToWakuMessages(pinMessagesToSave, community)
|
||||
if err != nil {
|
||||
m.logger.Error("failed to convert pin messages into waku messages", zap.Error(err))
|
||||
continue
|
||||
}
|
||||
|
||||
wakuMessages := append(wakuChatMessages, wakuPinMessages...)
|
||||
|
||||
topics, err := m.communitiesManager.GetCommunityChatsTopics(request.CommunityID)
|
||||
if err != nil {
|
||||
m.logger.Error("failed to get community chat topics", zap.Error(err))
|
||||
continue
|
||||
}
|
||||
|
||||
startDate := time.Unix(int64(exportData.OldestMessageTimestamp), 0)
|
||||
endDate := time.Now()
|
||||
|
||||
_, err = m.communitiesManager.CreateHistoryArchiveTorrentFromMessages(
|
||||
request.CommunityID,
|
||||
wakuMessages,
|
||||
topics,
|
||||
startDate,
|
||||
endDate,
|
||||
messageArchiveInterval,
|
||||
community.Encrypted(),
|
||||
)
|
||||
if err != nil {
|
||||
m.logger.Error("failed to create history archive torrent", zap.Error(err))
|
||||
continue
|
||||
}
|
||||
communitySettings, err := m.communitiesManager.GetCommunitySettingsByID(request.CommunityID)
|
||||
if err != nil {
|
||||
m.logger.Error("Failed to get community settings", zap.Error(err))
|
||||
continue
|
||||
}
|
||||
if m.torrentClientReady() && communitySettings.HistoryArchiveSupportEnabled {
|
||||
|
||||
err = m.communitiesManager.SeedHistoryArchiveTorrent(request.CommunityID)
|
||||
if err != nil {
|
||||
m.logger.Error("failed to seed history archive", zap.Error(err))
|
||||
}
|
||||
go m.communitiesManager.StartHistoryArchiveTasksInterval(community, messageArchiveInterval)
|
||||
}
|
||||
}
|
||||
if m.DiscordImportChannelMarkedAsCancelled(request.DiscordChannelID) {
|
||||
importProgress.StopTask(discord.InitCommunityTask)
|
||||
progressUpdates <- importProgress
|
||||
cancel <- request.DiscordChannelID
|
||||
return
|
||||
}
|
||||
|
||||
// Chats need to be saved after the community has been published,
|
||||
// hence we make this part of the `InitCommunityTask`
|
||||
err = m.saveChat(newChat)
|
||||
|
||||
if err != nil {
|
||||
m.cleanUpImportChannel(request.CommunityID.String(), request.DiscordChannelID)
|
||||
importProgress.AddTaskError(discord.InitCommunityTask, discord.Error(err.Error()))
|
||||
importProgress.Stop()
|
||||
progressUpdates <- importProgress
|
||||
cancel <- request.DiscordChannelID
|
||||
return
|
||||
}
|
||||
|
||||
m.config.messengerSignalsHandler.DiscordCommunityImportFinished(request.DiscordChannelID)
|
||||
close(done)
|
||||
}()
|
||||
}
|
||||
|
||||
func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscordCommunity) {
|
||||
go func() {
|
||||
|
||||
|
@ -4661,11 +5395,20 @@ func (m *Messenger) MarkDiscordCommunityImportAsCancelled(communityID string) {
|
|||
m.importingCommunities[communityID] = true
|
||||
}
|
||||
|
||||
func (m *Messenger) MarkDiscordChannelImportAsCancelled(channelID string) {
|
||||
m.importingChannels[channelID] = true
|
||||
}
|
||||
|
||||
func (m *Messenger) DiscordImportMarkedAsCancelled(communityID string) bool {
|
||||
cancelled, exists := m.importingCommunities[communityID]
|
||||
return exists && cancelled
|
||||
}
|
||||
|
||||
func (m *Messenger) DiscordImportChannelMarkedAsCancelled(channelID string) bool {
|
||||
cancelled, exists := m.importingChannels[channelID]
|
||||
return exists && cancelled
|
||||
}
|
||||
|
||||
func (m *Messenger) cleanUpImports() {
|
||||
for id := range m.importingCommunities {
|
||||
m.cleanUpImport(id)
|
||||
|
@ -4688,6 +5431,26 @@ func (m *Messenger) cleanUpImport(communityID string) {
|
|||
}
|
||||
}
|
||||
|
||||
func (m *Messenger) cleanUpImportChannel(communityID string, channelID string) {
|
||||
community, err := m.communitiesManager.GetByIDString(communityID)
|
||||
if err != nil {
|
||||
m.logger.Error("clean up failed, couldn't delete community", zap.Error(err))
|
||||
return
|
||||
}
|
||||
|
||||
_, err = community.DeleteChat(channelID)
|
||||
if err != nil {
|
||||
m.logger.Error("clean up failed, couldn't delete community chat", zap.Error(err))
|
||||
return
|
||||
}
|
||||
|
||||
err = m.persistence.DeleteMessagesByChatID(channelID)
|
||||
if err != nil {
|
||||
m.logger.Error("clean up failed, couldn't delete community chat messages", zap.Error(err))
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (m *Messenger) publishImportProgress(progress *discord.ImportProgress) {
|
||||
m.config.messengerSignalsHandler.DiscordCommunityImportProgress(progress)
|
||||
}
|
||||
|
|
|
@ -0,0 +1,41 @@
|
|||
package requests
|
||||
|
||||
import (
|
||||
"errors"
|
||||
|
||||
"github.com/status-im/status-go/eth-node/types"
|
||||
)
|
||||
|
||||
// errrors
|
||||
var (
|
||||
ErrImportDiscordChannelMissingFilesToImport = errors.New("import-discord-channel: missing files to import")
|
||||
ErrImportDiscordChannelChannelIDIsEmpty = errors.New("import-discord-channel: discord channel id is empty")
|
||||
ErrImportDiscordChannelCommunityIDIsEmpty = errors.New("import-discord-channel: community id is empty")
|
||||
)
|
||||
|
||||
type ImportDiscordChannel struct {
|
||||
Name string `json:"name"`
|
||||
DiscordChannelID string `json:"discordChannelID"`
|
||||
CommunityID types.HexBytes `json:"communityId"`
|
||||
Description string `json:"description"`
|
||||
Color string `json:"color"`
|
||||
Emoji string `json:"emoji"`
|
||||
FilesToImport []string `json:"filesToImport"`
|
||||
From int64 `json:"from"`
|
||||
}
|
||||
|
||||
func (r *ImportDiscordChannel) Validate() error {
|
||||
if len(r.FilesToImport) == 0 {
|
||||
return ErrImportDiscordChannelMissingFilesToImport
|
||||
}
|
||||
|
||||
if len(r.DiscordChannelID) == 0 {
|
||||
return ErrImportDiscordChannelChannelIDIsEmpty
|
||||
}
|
||||
|
||||
if len(r.CommunityID) == 0 {
|
||||
return ErrImportDiscordChannelCommunityIDIsEmpty
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
|
@ -1388,6 +1388,10 @@ func (api *PublicAPI) ExtractDiscordChannelsAndCategories(filesToImport []string
|
|||
return api.service.messenger.ExtractDiscordChannelsAndCategories(filesToImport)
|
||||
}
|
||||
|
||||
func (api *PublicAPI) RequestImportDiscordChannel(request *requests.ImportDiscordChannel) {
|
||||
api.service.messenger.RequestImportDiscordChannel(request)
|
||||
}
|
||||
|
||||
func (api *PublicAPI) RequestImportDiscordCommunity(request *requests.ImportDiscordCommunity) {
|
||||
api.service.messenger.RequestImportDiscordCommunity(request)
|
||||
}
|
||||
|
@ -1396,6 +1400,10 @@ func (api *PublicAPI) RequestCancelDiscordCommunityImport(id string) {
|
|||
api.service.messenger.MarkDiscordCommunityImportAsCancelled(id)
|
||||
}
|
||||
|
||||
func (api *PublicAPI) RequestCancelDiscordChannelImport(discordChannelID string) {
|
||||
api.service.messenger.MarkDiscordChannelImportAsCancelled(discordChannelID)
|
||||
}
|
||||
|
||||
func (api *PublicAPI) BuildContact(request *requests.BuildContact) (*protocol.Contact, error) {
|
||||
return api.service.messenger.BuildContact(request)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue