feat(discord): Split import channel signals and community import signals (#4232)

This commit is contained in:
Boris Melnik 2023-11-07 13:44:10 +03:00 committed by GitHub
parent 07cd7bab10
commit 1d08b403e6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 160 additions and 45 deletions

View File

@ -4139,21 +4139,22 @@ func (m *Messenger) RequestImportDiscordChannel(request *requests.ImportDiscordC
progressUpdates := make(chan *discord.ImportProgress)
done := make(chan struct{})
cancel := make(chan string)
cancel := make(chan []string)
var newChat *Chat
m.startPublishImportProgressInterval(progressUpdates, cancel, done)
m.startPublishImportChannelProgressInterval(progressUpdates, cancel, done)
importProgress := &discord.ImportProgress{}
importProgress.Init(totalImportChunkCount, []discord.ImportTask{
discord.ChannelsCreationTask,
discord.ImportMessagesTask,
discord.DownloadAssetsTask,
discord.InitCommunityTask,
})
importProgress.CommunityID = request.DiscordChannelID
importProgress.CommunityName = request.Name
importProgress.ChannelID = request.DiscordChannelID
importProgress.ChannelName = request.Name
// initial progress immediately
if err := request.Validate(); err != nil {
@ -4161,14 +4162,14 @@ func (m *Messenger) RequestImportDiscordChannel(request *requests.ImportDiscordC
importProgress.AddTaskError(discord.ChannelsCreationTask, discord.Error(errmsg))
importProgress.StopTask(discord.ChannelsCreationTask)
progressUpdates <- importProgress
cancel <- request.DiscordChannelID
cancel <- []string{string(request.CommunityID), "", request.DiscordChannelID}
return
}
// Here's 3 steps: Find the corrent channel in files, get the community and create the channel
progressValue := calculateProgress(3, 1, (float32(1) / 3))
progressValue := float32(0.3)
m.publishImportProgress(importProgress)
m.publishChannelImportProgress(importProgress)
community, err := m.GetCommunityByID(request.CommunityID)
@ -4177,7 +4178,7 @@ func (m *Messenger) RequestImportDiscordChannel(request *requests.ImportDiscordC
importProgress.AddTaskError(discord.ChannelsCreationTask, discord.Error(errmsg))
importProgress.StopTask(discord.ChannelsCreationTask)
progressUpdates <- importProgress
cancel <- request.DiscordChannelID
cancel <- []string{string(request.CommunityID), "", request.DiscordChannelID}
return
}
@ -4186,7 +4187,7 @@ func (m *Messenger) RequestImportDiscordChannel(request *requests.ImportDiscordC
importProgress.AddTaskError(discord.ChannelsCreationTask, discord.Error(errmsg))
importProgress.StopTask(discord.ChannelsCreationTask)
progressUpdates <- importProgress
cancel <- request.DiscordChannelID
cancel <- []string{string(request.CommunityID), "", request.DiscordChannelID}
return
}
@ -4203,7 +4204,7 @@ func (m *Messenger) RequestImportDiscordChannel(request *requests.ImportDiscordC
}
importProgress.StopTask(discord.ChannelsCreationTask)
progressUpdates <- importProgress
cancel <- request.DiscordChannelID
cancel <- []string{string(request.CommunityID), "", request.DiscordChannelID}
return
}
@ -4224,15 +4225,22 @@ func (m *Messenger) RequestImportDiscordChannel(request *requests.ImportDiscordC
importProgress.AddTaskError(discord.ChannelsCreationTask, discord.Error(errmsg))
importProgress.StopTask(discord.ChannelsCreationTask)
progressUpdates <- importProgress
cancel <- request.DiscordChannelID
cancel <- []string{string(request.CommunityID), "", request.DiscordChannelID}
return
}
}
progressValue := calculateProgress(3, 2, (float32(2) / 3))
progressValue := float32(0.6)
importProgress.UpdateTaskProgress(discord.ChannelsCreationTask, progressValue)
progressUpdates <- importProgress
if m.DiscordImportChannelMarkedAsCancelled(request.DiscordChannelID) {
importProgress.StopTask(discord.ChannelsCreationTask)
progressUpdates <- importProgress
cancel <- []string{string(request.CommunityID), "", request.DiscordChannelID}
return
}
if len(channel.Channel.ID) == 0 {
// skip this file and try to find in the next file
continue
@ -4266,12 +4274,13 @@ func (m *Messenger) RequestImportDiscordChannel(request *requests.ImportDiscordC
errmsg := err.Error()
if errors.Is(err, communities.ErrInvalidCommunityDescriptionDuplicatedName) {
errmsg = fmt.Sprintf("Couldn't create channel '%s': %s", communityChat.Identity.DisplayName, err.Error())
fmt.Println(errmsg)
}
importProgress.AddTaskError(discord.ChannelsCreationTask, discord.Error(errmsg))
importProgress.StopTask(discord.ChannelsCreationTask)
progressUpdates <- importProgress
cancel <- request.DiscordChannelID
cancel <- []string{string(request.CommunityID), "", request.DiscordChannelID}
return
}
@ -4280,16 +4289,23 @@ func (m *Messenger) RequestImportDiscordChannel(request *requests.ImportDiscordC
newChat = CreateCommunityChat(request.CommunityID.String(), chatID, chat, m.getTimesource())
}
progressValue = calculateProgress(3, 3, (float32(3) / 3))
progressValue = float32(1.0)
importProgress.UpdateTaskProgress(discord.ChannelsCreationTask, progressValue)
progressUpdates <- importProgress
} else {
// When channel with current discord id already exist we should skip import
importProgress.AddTaskError(discord.ChannelsCreationTask, discord.Error("Channel already imported to this community"))
importProgress.StopTask(discord.ChannelsCreationTask)
progressUpdates <- importProgress
cancel <- []string{string(request.CommunityID), "", request.DiscordChannelID}
return
}
if m.DiscordImportChannelMarkedAsCancelled(request.DiscordChannelID) {
importProgress.StopTask(discord.ImportMessagesTask)
progressUpdates <- importProgress
cancel <- request.DiscordChannelID
cancel <- []string{string(request.CommunityID), newChat.ID, request.DiscordChannelID}
return
}
@ -4316,15 +4332,14 @@ func (m *Messenger) RequestImportDiscordChannel(request *requests.ImportDiscordC
importProgress.AddTaskError(discord.ImportMessagesTask, discord.Error(err.Error()))
importProgress.StopTask(discord.ImportMessagesTask)
progressUpdates <- importProgress
cancel <- request.DiscordChannelID
cancel <- []string{string(request.CommunityID), newChat.ID, request.DiscordChannelID}
return
}
if m.DiscordImportChannelMarkedAsCancelled(request.DiscordChannelID) {
importProgress.StopTask(discord.ImportMessagesTask)
progressUpdates <- importProgress
cancel <- request.DiscordChannelID
cancel <- []string{string(request.CommunityID), newChat.ID, request.DiscordChannelID}
return
}
@ -4360,7 +4375,7 @@ func (m *Messenger) RequestImportDiscordChannel(request *requests.ImportDiscordC
importProgress.AddTaskError(discord.ImportMessagesTask, discord.Error(err.Error()))
importProgress.StopTask(discord.ImportMessagesTask)
progressUpdates <- importProgress
cancel <- request.DiscordChannelID
cancel <- []string{string(request.CommunityID), newChat.ID, request.DiscordChannelID}
return
}
@ -4368,7 +4383,7 @@ func (m *Messenger) RequestImportDiscordChannel(request *requests.ImportDiscordC
if m.DiscordImportChannelMarkedAsCancelled(request.DiscordChannelID) {
importProgress.StopTask(discord.ImportMessagesTask)
progressUpdates <- importProgress
cancel <- request.DiscordChannelID
cancel <- []string{string(request.CommunityID), newChat.ID, request.DiscordChannelID}
return
}
@ -4394,7 +4409,7 @@ func (m *Messenger) RequestImportDiscordChannel(request *requests.ImportDiscordC
importProgress.AddTaskError(discord.ImportMessagesTask, discord.Error(err.Error()))
importProgress.StopTask(discord.ImportMessagesTask)
progressUpdates <- importProgress
cancel <- request.DiscordChannelID
cancel <- []string{string(request.CommunityID), newChat.ID, request.DiscordChannelID}
return
}
@ -4402,9 +4417,7 @@ func (m *Messenger) RequestImportDiscordChannel(request *requests.ImportDiscordC
if m.DiscordImportChannelMarkedAsCancelled(request.DiscordChannelID) {
importProgress.StopTask(discord.ImportMessagesTask)
progressUpdates <- importProgress
cancel <- request.DiscordChannelID
cancel <- request.DiscordChannelID
cancel <- []string{string(request.CommunityID), newChat.ID, request.DiscordChannelID}
return
}
}
@ -4428,7 +4441,6 @@ func (m *Messenger) RequestImportDiscordChannel(request *requests.ImportDiscordC
discord.Warning(errmsg),
)
progressUpdates <- importProgress
cancel <- request.DiscordChannelID
return
}
@ -4437,7 +4449,6 @@ func (m *Messenger) RequestImportDiscordChannel(request *requests.ImportDiscordC
if err != nil {
importProgress.AddTaskError(discord.DownloadAssetsTask, discord.Warning(err.Error()))
progressUpdates <- importProgress
cancel <- request.DiscordChannelID
return
}
@ -4448,7 +4459,7 @@ func (m *Messenger) RequestImportDiscordChannel(request *requests.ImportDiscordC
if m.DiscordImportMarkedAsCancelled(request.DiscordChannelID) {
importProgress.StopTask(discord.DownloadAssetsTask)
progressUpdates <- importProgress
cancel <- request.DiscordChannelID
cancel <- []string{string(request.CommunityID), newChat.ID, request.DiscordChannelID}
return
}
@ -4464,7 +4475,7 @@ func (m *Messenger) RequestImportDiscordChannel(request *requests.ImportDiscordC
if m.DiscordImportChannelMarkedAsCancelled(request.DiscordChannelID) {
importProgress.StopTask(discord.DownloadAssetsTask)
progressUpdates <- importProgress
cancel <- request.DiscordChannelID
cancel <- []string{string(request.CommunityID), newChat.ID, request.DiscordChannelID}
return
}
@ -4495,7 +4506,7 @@ func (m *Messenger) RequestImportDiscordChannel(request *requests.ImportDiscordC
if m.DiscordImportChannelMarkedAsCancelled(request.DiscordChannelID) {
importProgress.StopTask(discord.DownloadAssetsTask)
progressUpdates <- importProgress
cancel <- request.DiscordChannelID
cancel <- []string{string(request.CommunityID), newChat.ID, request.DiscordChannelID}
return
}
@ -4511,7 +4522,7 @@ func (m *Messenger) RequestImportDiscordChannel(request *requests.ImportDiscordC
if m.DiscordImportChannelMarkedAsCancelled(request.DiscordChannelID) {
importProgress.StopTask(discord.DownloadAssetsTask)
progressUpdates <- importProgress
cancel <- request.DiscordChannelID
cancel <- []string{string(request.CommunityID), newChat.ID, request.DiscordChannelID}
return
}
@ -4522,19 +4533,17 @@ func (m *Messenger) RequestImportDiscordChannel(request *requests.ImportDiscordC
m.communitiesManager.LogStdout(fmt.Sprintf("saving %d/%d chunk with %d discord message attachments", ii+1, chunksCount, len(attachments)))
err := m.persistence.SaveDiscordMessageAttachments(attachments)
if err != nil {
m.cleanUpImportChannel(request.CommunityID.String(), request.DiscordChannelID)
importProgress.AddTaskError(discord.DownloadAssetsTask, discord.Error(err.Error()))
importProgress.AddTaskError(discord.DownloadAssetsTask, discord.Warning(err.Error()))
importProgress.Stop()
progressUpdates <- importProgress
cancel <- request.DiscordChannelID
return
continue
}
if m.DiscordImportChannelMarkedAsCancelled(request.DiscordChannelID) {
importProgress.StopTask(discord.DownloadAssetsTask)
progressUpdates <- importProgress
cancel <- request.DiscordChannelID
cancel <- []string{string(request.CommunityID), newChat.ID, request.DiscordChannelID}
return
}
@ -4612,10 +4621,13 @@ func (m *Messenger) RequestImportDiscordChannel(request *requests.ImportDiscordC
go m.communitiesManager.StartHistoryArchiveTasksInterval(community, messageArchiveInterval)
}
}
importProgress.UpdateTaskProgress(discord.InitCommunityTask, float32(0.0))
if m.DiscordImportChannelMarkedAsCancelled(request.DiscordChannelID) {
importProgress.StopTask(discord.InitCommunityTask)
progressUpdates <- importProgress
cancel <- request.DiscordChannelID
cancel <- []string{string(request.CommunityID), newChat.ID, request.DiscordChannelID}
return
}
@ -4628,11 +4640,13 @@ func (m *Messenger) RequestImportDiscordChannel(request *requests.ImportDiscordC
importProgress.AddTaskError(discord.InitCommunityTask, discord.Error(err.Error()))
importProgress.Stop()
progressUpdates <- importProgress
cancel <- request.DiscordChannelID
cancel <- []string{string(request.CommunityID), request.DiscordChannelID}
return
}
m.config.messengerSignalsHandler.DiscordCommunityImportFinished(request.DiscordChannelID)
importProgress.UpdateTaskProgress(discord.InitCommunityTask, float32(1.0))
m.config.messengerSignalsHandler.DiscordChannelImportFinished(string(request.CommunityID), newChat.ID)
close(done)
}()
}
@ -5525,13 +5539,7 @@ func (m *Messenger) cleanUpImport(communityID string) {
}
func (m *Messenger) cleanUpImportChannel(communityID string, channelID string) {
community, err := m.communitiesManager.GetByIDString(communityID)
if err != nil {
m.logger.Error("clean up failed, couldn't delete community", zap.Error(err))
return
}
_, err = community.DeleteChat(channelID)
_, err := m.DeleteCommunityChat(types.HexBytes(communityID), channelID)
if err != nil {
m.logger.Error("clean up failed, couldn't delete community chat", zap.Error(err))
return
@ -5548,6 +5556,10 @@ func (m *Messenger) publishImportProgress(progress *discord.ImportProgress) {
m.config.messengerSignalsHandler.DiscordCommunityImportProgress(progress)
}
func (m *Messenger) publishChannelImportProgress(progress *discord.ImportProgress) {
m.config.messengerSignalsHandler.DiscordChannelImportProgress(progress)
}
func (m *Messenger) startPublishImportProgressInterval(c chan *discord.ImportProgress, cancel chan string, done chan struct{}) {
var currentProgress *discord.ImportProgress
@ -5587,6 +5599,50 @@ func (m *Messenger) startPublishImportProgressInterval(c chan *discord.ImportPro
}()
}
func (m *Messenger) startPublishImportChannelProgressInterval(c chan *discord.ImportProgress, cancel chan []string, done chan struct{}) {
var currentProgress *discord.ImportProgress
go func() {
ticker := time.NewTicker(2 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
if currentProgress != nil {
m.publishChannelImportProgress(currentProgress)
if currentProgress.Stopped {
return
}
}
case progressUpdate := <-c:
currentProgress = progressUpdate
case <-done:
if currentProgress != nil {
m.publishChannelImportProgress(currentProgress)
}
return
case ids := <-cancel:
if currentProgress != nil {
m.publishImportProgress(currentProgress)
}
if len(ids) > 0 {
communityID := ids[0]
channelID := ids[1]
discordChannelID := ids[2]
m.cleanUpImportChannel(communityID, channelID)
m.config.messengerSignalsHandler.DiscordChannelImportCancelled(discordChannelID)
}
return
case <-m.quit:
m.cleanUpImports()
return
}
}
}()
}
func (m *Messenger) pinMessagesToWakuMessages(pinMessages []*common.PinMessage, c *communities.Community) ([]*types.Message, error) {
wakuMessages := make([]*types.Message, 0)
for _, msg := range pinMessages {

View File

@ -55,6 +55,9 @@ type MessengerSignalsHandler interface {
DiscordCommunityImportProgress(importProgress *discord.ImportProgress)
DiscordCommunityImportFinished(communityID string)
DiscordCommunityImportCancelled(communityID string)
DiscordChannelImportProgress(importProgress *discord.ImportProgress)
DiscordChannelImportFinished(communityID string, channelID string)
DiscordChannelImportCancelled(channelID string)
SendWakuFetchingBackupProgress(response *wakusync.WakuBackedUpDataResponse)
SendWakuBackedUpProfile(response *wakusync.WakuBackedUpDataResponse)
SendWakuBackedUpSettings(response *wakusync.WakuBackedUpDataResponse)

View File

@ -137,14 +137,26 @@ func (m *MessengerSignalsHandler) DiscordCommunityImportProgress(importProgress
signal.SendDiscordCommunityImportProgress(importProgress)
}
func (m *MessengerSignalsHandler) DiscordChannelImportProgress(importProgress *discord.ImportProgress) {
signal.SendDiscordChannelImportProgress(importProgress)
}
func (m *MessengerSignalsHandler) DiscordCommunityImportFinished(id string) {
signal.SendDiscordCommunityImportFinished(id)
}
func (m *MessengerSignalsHandler) DiscordChannelImportFinished(communityID string, channelID string) {
signal.SendDiscordChannelImportFinished(communityID, channelID)
}
func (m *MessengerSignalsHandler) DiscordCommunityImportCancelled(id string) {
signal.SendDiscordCommunityImportCancelled(id)
}
func (m *MessengerSignalsHandler) DiscordChannelImportCancelled(id string) {
signal.SendDiscordChannelImportCancelled(id)
}
func (m *MessengerSignalsHandler) SendWakuFetchingBackupProgress(response *wakusync.WakuBackedUpDataResponse) {
signal.SendWakuFetchingBackupProgress(response)
}

View File

@ -21,6 +21,18 @@ const (
// EventDiscordCommunityImportCancelled triggered when importing
// the discord community was cancelled
EventDiscordCommunityImportCancelled = "community.discordCommunityImportCancelled"
// EventDiscordChannelImportProgress is triggered during the import
// of a discord community channel as it progresses
EventDiscordChannelImportProgress = "community.discordChannelImportProgress"
// EventDiscordChannelImportFinished triggered when importing
// the discord community channel into status was successful
EventDiscordChannelImportFinished = "community.discordChannelImportFinished"
// EventDiscordChannelImportCancelled triggered when importing
// the discord community channel was cancelled
EventDiscordChannelImportCancelled = "community.discordChannelImportCancelled"
)
type DiscordCategoriesAndChannelsExtractedSignal struct {
@ -42,6 +54,19 @@ type DiscordCommunityImportCancelledSignal struct {
CommunityID string `json:"communityId"`
}
type DiscordChannelImportProgressSignal struct {
ImportProgress *discord.ImportProgress `json:"importProgress"`
}
type DiscordChannelImportFinishedSignal struct {
CommunityID string `json:"communityId"`
ChannelID string `json:"channelId"`
}
type DiscordChannelImportCancelledSignal struct {
ChannelID string `json:"channelId"`
}
func SendDiscordCategoriesAndChannelsExtracted(categories []*discord.Category, channels []*discord.Channel, oldestMessageTimestamp int64, errors map[string]*discord.ImportError) {
send(EventDiscordCategoriesAndChannelsExtracted, DiscordCategoriesAndChannelsExtractedSignal{
Categories: categories,
@ -57,14 +82,33 @@ func SendDiscordCommunityImportProgress(importProgress *discord.ImportProgress)
})
}
func SendDiscordChannelImportProgress(importProgress *discord.ImportProgress) {
send(EventDiscordChannelImportProgress, DiscordChannelImportProgressSignal{
ImportProgress: importProgress,
})
}
func SendDiscordCommunityImportFinished(communityID string) {
send(EventDiscordCommunityImportFinished, DiscordCommunityImportFinishedSignal{
CommunityID: communityID,
})
}
func SendDiscordChannelImportFinished(communityID string, channelID string) {
send(EventDiscordChannelImportFinished, DiscordChannelImportFinishedSignal{
CommunityID: communityID,
ChannelID: channelID,
})
}
func SendDiscordCommunityImportCancelled(communityID string) {
send(EventDiscordCommunityImportCancelled, DiscordCommunityImportCancelledSignal{
CommunityID: communityID,
})
}
func SendDiscordChannelImportCancelled(channelID string) {
send(EventDiscordChannelImportCancelled, DiscordChannelImportCancelledSignal{
ChannelID: channelID,
})
}