diff --git a/protocol/messenger.go b/protocol/messenger.go
index 667af8b68..7cb6e3db2 100644
--- a/protocol/messenger.go
+++ b/protocol/messenger.go
@@ -720,6 +720,20 @@ func (m *Messenger) Start() (*MessengerResponse, error) {
 		}
 	}
 
+	joinedCommunities, err := m.communitiesManager.Joined()
+	if err != nil {
+		return nil, err
+	}
+
+	for _, joinedCommunity := range joinedCommunities {
+		// resume importing message history archives in case
+		// imports have been interrupted previously
+		err := m.resumeHistoryArchivesImport(joinedCommunity.ID())
+		if err != nil {
+			return nil, err
+		}
+	}
+
 	if m.httpServer != nil {
 		err = m.httpServer.Start()
 		if err != nil {
diff --git a/protocol/messenger_communities.go b/protocol/messenger_communities.go
index b3e3add09..f8dc58ce9 100644
--- a/protocol/messenger_communities.go
+++ b/protocol/messenger_communities.go
@@ -32,6 +32,8 @@ import (
 	"github.com/status-im/status-go/protocol/requests"
 	"github.com/status-im/status-go/protocol/transport"
 	v1protocol "github.com/status-im/status-go/protocol/v1"
+	localnotifications "github.com/status-im/status-go/services/local-notifications"
+	"github.com/status-im/status-go/signal"
 )
 
 // 7 days interval
@@ -1979,6 +1981,105 @@ func (m *Messenger) InitHistoryArchiveTasks(communities []*communities.Community) {
 	}
 }
 
+func (m *Messenger) resumeHistoryArchivesImport(communityID types.HexBytes) error {
+	archiveIDsToImport, err := m.communitiesManager.GetMessageArchiveIDsToImport(communityID)
+	if err != nil {
+		return err
+	}
+
+	if len(archiveIDsToImport) == 0 {
+		return nil
+	}
+
+	currentTask := m.communitiesManager.GetHistoryArchiveDownloadTask(communityID.String())
+	// no need to resume imports if there's already a task ongoing
+	if currentTask != nil {
+		return nil
+	}
+
+	// Create new task
+	task := &communities.HistoryArchiveDownloadTask{
+		Cancel: make(chan struct{}),
+		Waiter: *new(sync.WaitGroup),
+	}
+
+	m.communitiesManager.AddHistoryArchiveDownloadTask(communityID.String(), task)
+
+	go func() {
+		// this wait group tracks the ongoing task for a particular community
+		task.Waiter.Add(1)
+		defer func() {
+			task.Waiter.Done()
+			m.communitiesManager.DeleteHistoryArchiveDownloadTask(communityID.String())
+		}()
+		err := m.importHistoryArchives(communityID, task.Cancel)
+		if err != nil {
+			m.communitiesManager.LogStdout("failed to import history archives", zap.Error(err))
+		}
+		m.config.messengerSignalsHandler.DownloadingHistoryArchivesFinished(types.EncodeHex(communityID))
+	}()
+	return nil
+}
+
+func (m *Messenger) importHistoryArchives(communityID types.HexBytes, cancel chan struct{}) error {
+	importTicker := time.NewTicker(100 * time.Millisecond)
+	defer importTicker.Stop()
+
+importMessageArchivesLoop:
+	for {
+		select {
+		case <-cancel:
+			m.communitiesManager.LogStdout("interrupted importing history archive messages")
+			return nil
+		case <-importTicker.C:
+
+			archiveIDsToImport, err := m.communitiesManager.GetMessageArchiveIDsToImport(communityID)
+			if err != nil {
+				m.communitiesManager.LogStdout("couldn't get message archive IDs to import", zap.Error(err))
+				return err
+			}
+
+			if len(archiveIDsToImport) == 0 {
+				m.communitiesManager.LogStdout("no message archives to import")
+				break importMessageArchivesLoop
+			}
+
+			m.communitiesManager.LogStdout(fmt.Sprintf("importing message archive, %d left", len(archiveIDsToImport)))
+
+			// only process one archive at a time, so in case of cancel we don't
+			// wait for all archives to be processed first
+			downloadedArchiveID := archiveIDsToImport[0]
+
+			archiveMessages, err := m.communitiesManager.ExtractMessagesFromHistoryArchive(communityID, downloadedArchiveID)
+			if err != nil {
+				m.communitiesManager.LogStdout("failed to extract history archive messages", zap.Error(err))
+				continue
+			}
+
+			m.config.messengerSignalsHandler.ImportingHistoryArchiveMessages(types.EncodeHex(communityID))
+			response, err := m.handleArchiveMessages(archiveMessages, communityID)
+			if err != nil {
+				m.communitiesManager.LogStdout("failed to handle archive messages", zap.Error(err))
+				continue
+			}
+
+			err = m.communitiesManager.SetMessageArchiveIDImported(communityID, downloadedArchiveID, true)
+			if err != nil {
+				m.communitiesManager.LogStdout("failed to mark history message archive as imported", zap.Error(err))
+				continue
+			}
+
+			if !response.IsEmpty() {
+				notifications := response.Notifications()
+				response.ClearNotifications()
+				signal.SendNewMessages(response)
+				localnotifications.PushMessages(notifications)
+			}
+		}
+	}
+	return nil
+}
+
 func (m *Messenger) dispatchMagnetlinkMessage(communityID string) error {
 	community, err := m.communitiesManager.GetByIDString(communityID)
 	if err != nil {
diff --git a/protocol/messenger_handler.go b/protocol/messenger_handler.go
index 3c25e83c9..48ea24a10 100644
--- a/protocol/messenger_handler.go
+++ b/protocol/messenger_handler.go
@@ -7,7 +7,6 @@ import (
 	"encoding/hex"
 	"fmt"
 	"sync"
-	"time"
 
 	"github.com/pborman/uuid"
 	"github.com/pkg/errors"
@@ -27,8 +26,6 @@ import (
 	"github.com/status-im/status-go/protocol/transport"
 	v1protocol "github.com/status-im/status-go/protocol/v1"
 	"github.com/status-im/status-go/protocol/verification"
-	localnotifications "github.com/status-im/status-go/services/local-notifications"
-	"github.com/status-im/status-go/signal"
 )
 
 const (
@@ -1000,65 +997,19 @@ func (m *Messenger) downloadAndImportHistoryArchives(id types.HexBytes, magnetlink string, cancel chan struct{}) {
 		return
 	}
 
-	importTicker := time.NewTicker(100 * time.Millisecond)
-	defer importTicker.Stop()
-
-importMessageArchivesLoop:
-	for {
-		select {
-		case <-cancel:
-			m.communitiesManager.LogStdout("interrupted importing history archive messages")
-			return
-		case <-importTicker.C:
-
-			archiveIDsToImport, err := m.communitiesManager.GetMessageArchiveIDsToImport(id)
-			if err != nil {
-				m.communitiesManager.LogStdout("couldn't get message archive IDs to import", zap.Error(err))
-				return
-			}
-
-			if len(archiveIDsToImport) == 0 {
-				m.communitiesManager.LogStdout("no message archives to import")
-				break importMessageArchivesLoop
-			}
-
-			m.communitiesManager.LogStdout(fmt.Sprintf("importing message archive, %d left", len(archiveIDsToImport)))
-
-			// only process one archive at a time, so in case of cancel we don't
-			// wait for all archives to be processed first
-			downloadedArchiveID := archiveIDsToImport[0]
-
-			archiveMessages, err := m.communitiesManager.ExtractMessagesFromHistoryArchive(id, downloadedArchiveID)
-			if err != nil {
-				m.communitiesManager.LogStdout("failed to extract history archive messages", zap.Error(err))
-				continue
-			}
-
-			response, err := m.handleArchiveMessages(archiveMessages, id)
-			if err != nil {
-				m.communitiesManager.LogStdout("failed to handle archive messages", zap.Error(err))
-				continue
-			}
-
-			err = m.communitiesManager.SetMessageArchiveIDImported(id, downloadedArchiveID, true)
-			if err != nil {
-				m.communitiesManager.LogStdout("failed to mark history message archive as imported", zap.Error(err))
-				continue
-			}
-
-			if !response.IsEmpty() {
-				notifications := response.Notifications()
-				response.ClearNotifications()
-				signal.SendNewMessages(response)
-				localnotifications.PushMessages(notifications)
-			}
-		}
+	err = m.importHistoryArchives(id, cancel)
+	if err != nil {
+		m.communitiesManager.LogStdout("failed to import history archives", zap.Error(err))
+		m.config.messengerSignalsHandler.DownloadingHistoryArchivesFinished(types.EncodeHex(id))
+		return
 	}
+
 	err = m.communitiesManager.UpdateLastSeenMagnetlink(id, magnetlink)
 	if err != nil {
 		m.communitiesManager.LogStdout("couldn't update last seen magnetlink", zap.Error(err))
 	}
 	m.config.messengerSignalsHandler.DownloadingHistoryArchivesFinished(types.EncodeHex(id))
+
 }
 
 func (m *Messenger) handleArchiveMessages(archiveMessages []*protobuf.WakuMessage, id types.HexBytes) (*MessengerResponse, error) {
@@ -1094,7 +1045,6 @@ func (m *Messenger) handleArchiveMessages(archiveMessages []*protobuf.WakuMessage, id types.HexBytes) (*MessengerResponse, error) {
 		}
 	}
 
-	m.config.messengerSignalsHandler.ImportingHistoryArchiveMessages(types.EncodeHex(id))
 	err := m.handleImportedMessages(importedMessages)
 	if err != nil {
 		m.communitiesManager.LogStdout("failed to handle imported messages", zap.Error(err))
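For reference, the pattern this change introduces is: move the ticker-driven import loop into importHistoryArchives so that downloadAndImportHistoryArchives and the new resumeHistoryArchivesImport (called from Messenger.Start() for every joined community) can share it, and run it as a cancellable task that imports one archive per tick and tracks completion through a WaitGroup. The sketch below is a minimal, self-contained illustration of that pattern only; it is not status-go code, and ArchiveImporter, ImportOne, and pending are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// ArchiveImporter sketches the cancellable, one-archive-per-tick import loop
// that importHistoryArchives implements. All names here are illustrative,
// not status-go APIs.
type ArchiveImporter struct {
	Cancel  chan struct{}
	Waiter  sync.WaitGroup
	mu      sync.Mutex
	pending []string // archive IDs still waiting to be imported
}

// ImportOne stands in for extracting and handling a single archive.
func (a *ArchiveImporter) ImportOne(id string) error {
	fmt.Println("imported archive", id)
	return nil
}

// Run processes at most one pending archive per tick, so a cancellation
// request never has to wait for the whole backlog to be drained.
func (a *ArchiveImporter) Run() {
	defer a.Waiter.Done()

	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()

	for {
		select {
		case <-a.Cancel:
			fmt.Println("import interrupted")
			return
		case <-ticker.C:
			a.mu.Lock()
			if len(a.pending) == 0 {
				a.mu.Unlock()
				fmt.Println("no archives left to import")
				return
			}
			next := a.pending[0]
			a.mu.Unlock()

			if err := a.ImportOne(next); err != nil {
				continue // retry on the next tick, like the original loop
			}

			a.mu.Lock()
			a.pending = a.pending[1:] // roughly: SetMessageArchiveIDImported
			a.mu.Unlock()
		}
	}
}

func main() {
	imp := &ArchiveImporter{
		Cancel:  make(chan struct{}),
		pending: []string{"archive-1", "archive-2", "archive-3"},
	}

	// "Resuming" on startup is just loading the pending IDs and starting Run
	// again; the Waiter lets callers block until the task has fully stopped.
	imp.Waiter.Add(1)
	go imp.Run()

	time.Sleep(500 * time.Millisecond)
	close(imp.Cancel) // analogous to HistoryArchiveDownloadTask.Cancel
	imp.Waiter.Wait()
}
```

Processing a single archive per tick, rather than draining the whole backlog inside one case, is what keeps the Cancel channel responsive: at most one archive import sits between closing the channel and the goroutine exiting, which is the same trade-off the original loop documents in its comment.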