Resume import of message history archives upon bootstrap

This adds the functionality that history archives continue to be imported
in case the import has been interrupted the last time the app/client
was running.

This typically happens when users don't wait for an ongoing import to finish,
which sometimes can take a while. Users then close the app/kill the client
which leaves the database in a state where there's downloaded archives that
haven't been fully imported.

Prior to this change, the node will have to wait until it receives a new
magnetlink that it hasn't seen before, until it processes imports again.
This can take several days.

Now, it will check on startup if there are any archives left to be imported
and resumes the import from there.
This commit is contained in:
Pascal Precht 2023-01-19 14:23:48 +01:00 committed by r4bbit
parent ea46779e98
commit f85c500e9a
3 changed files with 122 additions and 57 deletions

View File

@ -720,6 +720,20 @@ func (m *Messenger) Start() (*MessengerResponse, error) {
} }
} }
joinedCommunities, err := m.communitiesManager.Joined()
if err != nil {
return nil, err
}
for _, joinedCommunity := range joinedCommunities {
// resume importing message history archives in case
// imports have been interrupted previously
err := m.resumeHistoryArchivesImport(joinedCommunity.ID())
if err != nil {
return nil, err
}
}
if m.httpServer != nil { if m.httpServer != nil {
err = m.httpServer.Start() err = m.httpServer.Start()
if err != nil { if err != nil {

View File

@ -32,6 +32,8 @@ import (
"github.com/status-im/status-go/protocol/requests" "github.com/status-im/status-go/protocol/requests"
"github.com/status-im/status-go/protocol/transport" "github.com/status-im/status-go/protocol/transport"
v1protocol "github.com/status-im/status-go/protocol/v1" v1protocol "github.com/status-im/status-go/protocol/v1"
localnotifications "github.com/status-im/status-go/services/local-notifications"
"github.com/status-im/status-go/signal"
) )
// 7 days interval // 7 days interval
@ -1979,6 +1981,105 @@ func (m *Messenger) InitHistoryArchiveTasks(communities []*communities.Community
} }
} }
func (m *Messenger) resumeHistoryArchivesImport(communityID types.HexBytes) error {
archiveIDsToImport, err := m.communitiesManager.GetMessageArchiveIDsToImport(communityID)
if err != nil {
return err
}
if len(archiveIDsToImport) == 0 {
return nil
}
currentTask := m.communitiesManager.GetHistoryArchiveDownloadTask(communityID.String())
// no need to resume imports if there's already a task ongoing
if currentTask != nil {
return nil
}
// Create new task
task := &communities.HistoryArchiveDownloadTask{
Cancel: make(chan struct{}),
Waiter: *new(sync.WaitGroup),
}
m.communitiesManager.AddHistoryArchiveDownloadTask(communityID.String(), task)
go func() {
// this wait groups tracks the ongoing task for a particular community
task.Waiter.Add(1)
defer func() {
task.Waiter.Done()
m.communitiesManager.DeleteHistoryArchiveDownloadTask(communityID.String())
}()
err := m.importHistoryArchives(communityID, task.Cancel)
if err != nil {
m.communitiesManager.LogStdout("failed to import history archives", zap.Error(err))
}
m.config.messengerSignalsHandler.DownloadingHistoryArchivesFinished(types.EncodeHex(communityID))
}()
return nil
}
func (m *Messenger) importHistoryArchives(communityID types.HexBytes, cancel chan struct{}) error {
importTicker := time.NewTicker(100 * time.Millisecond)
defer importTicker.Stop()
importMessageArchivesLoop:
for {
select {
case <-cancel:
m.communitiesManager.LogStdout("interrupted importing history archive messages")
return nil
case <-importTicker.C:
archiveIDsToImport, err := m.communitiesManager.GetMessageArchiveIDsToImport(communityID)
if err != nil {
m.communitiesManager.LogStdout("couldn't get message archive IDs to import", zap.Error(err))
return err
}
if len(archiveIDsToImport) == 0 {
m.communitiesManager.LogStdout("no message archives to import")
break importMessageArchivesLoop
}
m.communitiesManager.LogStdout(fmt.Sprintf("importing message archive, %d left", len(archiveIDsToImport)))
// only process one archive at a time, so in case of cancel we don't
// wait for all archives to be processed first
downloadedArchiveID := archiveIDsToImport[0]
archiveMessages, err := m.communitiesManager.ExtractMessagesFromHistoryArchive(communityID, downloadedArchiveID)
if err != nil {
m.communitiesManager.LogStdout("failed to extract history archive messages", zap.Error(err))
continue
}
m.config.messengerSignalsHandler.ImportingHistoryArchiveMessages(types.EncodeHex(communityID))
response, err := m.handleArchiveMessages(archiveMessages, communityID)
if err != nil {
m.communitiesManager.LogStdout("failed to handle archive messages", zap.Error(err))
continue
}
err = m.communitiesManager.SetMessageArchiveIDImported(communityID, downloadedArchiveID, true)
if err != nil {
m.communitiesManager.LogStdout("failed to mark history message archive as imported", zap.Error(err))
continue
}
if !response.IsEmpty() {
notifications := response.Notifications()
response.ClearNotifications()
signal.SendNewMessages(response)
localnotifications.PushMessages(notifications)
}
}
}
return nil
}
func (m *Messenger) dispatchMagnetlinkMessage(communityID string) error { func (m *Messenger) dispatchMagnetlinkMessage(communityID string) error {
community, err := m.communitiesManager.GetByIDString(communityID) community, err := m.communitiesManager.GetByIDString(communityID)

View File

@ -7,7 +7,6 @@ import (
"encoding/hex" "encoding/hex"
"fmt" "fmt"
"sync" "sync"
"time"
"github.com/pborman/uuid" "github.com/pborman/uuid"
"github.com/pkg/errors" "github.com/pkg/errors"
@ -27,8 +26,6 @@ import (
"github.com/status-im/status-go/protocol/transport" "github.com/status-im/status-go/protocol/transport"
v1protocol "github.com/status-im/status-go/protocol/v1" v1protocol "github.com/status-im/status-go/protocol/v1"
"github.com/status-im/status-go/protocol/verification" "github.com/status-im/status-go/protocol/verification"
localnotifications "github.com/status-im/status-go/services/local-notifications"
"github.com/status-im/status-go/signal"
) )
const ( const (
@ -1000,65 +997,19 @@ func (m *Messenger) downloadAndImportHistoryArchives(id types.HexBytes, magnetli
return return
} }
importTicker := time.NewTicker(100 * time.Millisecond) err = m.importHistoryArchives(id, cancel)
defer importTicker.Stop() if err != nil {
m.communitiesManager.LogStdout("failed to import history archives", zap.Error(err))
importMessageArchivesLoop: m.config.messengerSignalsHandler.DownloadingHistoryArchivesFinished(types.EncodeHex(id))
for { return
select {
case <-cancel:
m.communitiesManager.LogStdout("interrupted importing history archive messages")
return
case <-importTicker.C:
archiveIDsToImport, err := m.communitiesManager.GetMessageArchiveIDsToImport(id)
if err != nil {
m.communitiesManager.LogStdout("couldn't get message archive IDs to import", zap.Error(err))
return
}
if len(archiveIDsToImport) == 0 {
m.communitiesManager.LogStdout("no message archives to import")
break importMessageArchivesLoop
}
m.communitiesManager.LogStdout(fmt.Sprintf("importing message archive, %d left", len(archiveIDsToImport)))
// only process one archive at a time, so in case of cancel we don't
// wait for all archives to be processed first
downloadedArchiveID := archiveIDsToImport[0]
archiveMessages, err := m.communitiesManager.ExtractMessagesFromHistoryArchive(id, downloadedArchiveID)
if err != nil {
m.communitiesManager.LogStdout("failed to extract history archive messages", zap.Error(err))
continue
}
response, err := m.handleArchiveMessages(archiveMessages, id)
if err != nil {
m.communitiesManager.LogStdout("failed to handle archive messages", zap.Error(err))
continue
}
err = m.communitiesManager.SetMessageArchiveIDImported(id, downloadedArchiveID, true)
if err != nil {
m.communitiesManager.LogStdout("failed to mark history message archive as imported", zap.Error(err))
continue
}
if !response.IsEmpty() {
notifications := response.Notifications()
response.ClearNotifications()
signal.SendNewMessages(response)
localnotifications.PushMessages(notifications)
}
}
} }
err = m.communitiesManager.UpdateLastSeenMagnetlink(id, magnetlink) err = m.communitiesManager.UpdateLastSeenMagnetlink(id, magnetlink)
if err != nil { if err != nil {
m.communitiesManager.LogStdout("couldn't update last seen magnetlink", zap.Error(err)) m.communitiesManager.LogStdout("couldn't update last seen magnetlink", zap.Error(err))
} }
m.config.messengerSignalsHandler.DownloadingHistoryArchivesFinished(types.EncodeHex(id)) m.config.messengerSignalsHandler.DownloadingHistoryArchivesFinished(types.EncodeHex(id))
} }
func (m *Messenger) handleArchiveMessages(archiveMessages []*protobuf.WakuMessage, id types.HexBytes) (*MessengerResponse, error) { func (m *Messenger) handleArchiveMessages(archiveMessages []*protobuf.WakuMessage, id types.HexBytes) (*MessengerResponse, error) {
@ -1094,7 +1045,6 @@ func (m *Messenger) handleArchiveMessages(archiveMessages []*protobuf.WakuMessag
} }
} }
m.config.messengerSignalsHandler.ImportingHistoryArchiveMessages(types.EncodeHex(id))
err := m.handleImportedMessages(importedMessages) err := m.handleImportedMessages(importedMessages)
if err != nil { if err != nil {
m.communitiesManager.LogStdout("failed to handle imported messages", zap.Error(err)) m.communitiesManager.LogStdout("failed to handle imported messages", zap.Error(err))