diff --git a/protocol/messenger.go b/protocol/messenger.go index 7d9c455f8..a3f3fe262 100644 --- a/protocol/messenger.go +++ b/protocol/messenger.go @@ -7,7 +7,6 @@ import ( "database/sql" "encoding/json" "fmt" - "math/rand" "os" "strconv" "strings" @@ -33,7 +32,6 @@ import ( utils "github.com/status-im/status-go/common" "github.com/status-im/status-go/connection" "github.com/status-im/status-go/contracts" - "github.com/status-im/status-go/deprecation" "github.com/status-im/status-go/eth-node/crypto" "github.com/status-im/status-go/eth-node/types" "github.com/status-im/status-go/images" @@ -44,7 +42,6 @@ import ( "github.com/status-im/status-go/multiaccounts/settings" "github.com/status-im/status-go/protocol/anonmetrics" "github.com/status-im/status-go/protocol/common" - "github.com/status-im/status-go/protocol/common/shard" "github.com/status-im/status-go/protocol/communities" "github.com/status-im/status-go/protocol/encryption" "github.com/status-im/status-go/protocol/encryption/multidevice" @@ -920,20 +917,7 @@ func (m *Messenger) Start() (*MessengerResponse, error) { } } - joinedCommunities, err := m.communitiesManager.Joined() - if err != nil { - return nil, err - } - - for _, joinedCommunity := range joinedCommunities { - // resume importing message history archives in case - // imports have been interrupted previously - err := m.resumeHistoryArchivesImport(joinedCommunity.ID()) - if err != nil { - return nil, err - } - } - m.enableHistoryArchivesImportAfterDelay() + m.startHistoryArchivesImportLoop() if m.httpServer != nil { err = m.httpServer.Start() @@ -974,6 +958,28 @@ func (m *Messenger) Start() (*MessengerResponse, error) { return response, nil } +func (m *Messenger) startHistoryArchivesImportLoop() { + go func() { + defer gocommon.LogOnPanic() + joinedCommunities, err := m.communitiesManager.Joined() + if err != nil { + m.logger.Error("failed to get joined communities", zap.Error(err)) + return + } + + for _, joinedCommunity := range joinedCommunities { + // resume importing message history archives in case + // imports have been interrupted previously + err := m.resumeHistoryArchivesImport(joinedCommunity.ID()) + if err != nil { + m.logger.Error("failed to resume history archives import", zap.Error(err)) + return + } + } + m.enableHistoryArchivesImportAfterDelay() + }() +} + func (m *Messenger) SetMediaServer(server *server.MediaServer) { m.httpServer = server m.communitiesManager.SetMediaServer(server) @@ -1775,221 +1781,6 @@ func (m *Messenger) handlePushNotificationClientRegistrations(c chan struct{}) { }() } -// InitFilters analyzes chats and contacts in order to setup filters -// which are responsible for retrieving messages. -func (m *Messenger) InitFilters() error { - - // Seed the for color generation - rand.Seed(time.Now().Unix()) - - logger := m.logger.With(zap.String("site", "Init")) - - // Community requests will arrive in this pubsub topic - err := m.SubscribeToPubsubTopic(shard.DefaultNonProtectedPubsubTopic(), nil) - if err != nil { - return err - } - - var ( - filtersToInit []transport.FiltersToInitialize - publicKeys []*ecdsa.PublicKey - ) - - joinedCommunities, err := m.communitiesManager.Joined() - if err != nil { - return err - } - for _, org := range joinedCommunities { - // the org advertise on the public topic derived by the pk - filtersToInit = append(filtersToInit, m.DefaultFilters(org)...) - - // This is for status-go versions that didn't have `CommunitySettings` - // We need to ensure communities that existed before community settings - // were introduced will have community settings as well - exists, err := m.communitiesManager.CommunitySettingsExist(org.ID()) - if err != nil { - logger.Warn("failed to check if community settings exist", zap.Error(err)) - continue - } - - if !exists { - communitySettings := communities.CommunitySettings{ - CommunityID: org.IDString(), - HistoryArchiveSupportEnabled: true, - } - - err = m.communitiesManager.SaveCommunitySettings(communitySettings) - if err != nil { - logger.Warn("failed to save community settings", zap.Error(err)) - } - continue - } - - // In case we do have settings, but the history archive support is disabled - // for this community, we enable it, as this should be the default for all - // non-admin communities - communitySettings, err := m.communitiesManager.GetCommunitySettingsByID(org.ID()) - if err != nil { - logger.Warn("failed to fetch community settings", zap.Error(err)) - continue - } - - if !org.IsControlNode() && !communitySettings.HistoryArchiveSupportEnabled { - communitySettings.HistoryArchiveSupportEnabled = true - err = m.communitiesManager.UpdateCommunitySettings(*communitySettings) - if err != nil { - logger.Warn("failed to update community settings", zap.Error(err)) - } - } - } - - spectatedCommunities, err := m.communitiesManager.Spectated() - if err != nil { - return err - } - for _, org := range spectatedCommunities { - filtersToInit = append(filtersToInit, m.DefaultFilters(org)...) - } - - // Get chat IDs and public keys from the existing chats. - // TODO: Get only active chats by the query. - chats, err := m.persistence.Chats() - if err != nil { - return err - } - - communityInfo := make(map[string]*communities.Community) - var validChats []*Chat - for _, chat := range chats { - if err := chat.Validate(); err != nil { - logger.Warn("failed to validate chat", zap.Error(err)) - continue - } - validChats = append(validChats, chat) - } - - m.initChatsFirstMessageTimestamp(communityInfo, validChats) - - for _, chat := range validChats { - if !chat.Active || chat.Timeline() { - m.allChats.Store(chat.ID, chat) - continue - } - - switch chat.ChatType { - case ChatTypePublic, ChatTypeProfile: - filtersToInit = append(filtersToInit, transport.FiltersToInitialize{ChatID: chat.ID}) - case ChatTypeCommunityChat: - community, ok := communityInfo[chat.CommunityID] - if !ok { - community, err = m.communitiesManager.GetByIDString(chat.CommunityID) - if err != nil { - return err - } - communityInfo[chat.CommunityID] = community - } - - if chat.UnviewedMessagesCount > 0 || chat.UnviewedMentionsCount > 0 { - // Make sure the unread count is 0 for the channels the user cannot view - // It's possible that the users received messages to a channel before permissions were added - canView := community.CanView(&m.identity.PublicKey, chat.CommunityChatID()) - - if !canView { - chat.UnviewedMessagesCount = 0 - chat.UnviewedMentionsCount = 0 - } - } - - filtersToInit = append(filtersToInit, transport.FiltersToInitialize{ChatID: chat.ID, PubsubTopic: community.PubsubTopic()}) - case ChatTypeOneToOne: - pk, err := chat.PublicKey() - if err != nil { - return err - } - publicKeys = append(publicKeys, pk) - case ChatTypePrivateGroupChat: - for _, member := range chat.Members { - publicKey, err := member.PublicKey() - if err != nil { - return errors.Wrapf(err, "invalid public key for member %s in chat %s", member.ID, chat.Name) - } - publicKeys = append(publicKeys, publicKey) - } - default: - return errors.New("invalid chat type") - } - - m.allChats.Store(chat.ID, chat) - } - - // Timeline and profile chats are deprecated. - // This code can be removed after some reasonable time. - - // upsert timeline chat - if !deprecation.ChatProfileDeprecated { - err = m.ensureTimelineChat() - if err != nil { - return err - } - } - - // upsert profile chat - if !deprecation.ChatTimelineDeprecated { - err = m.ensureMyOwnProfileChat() - if err != nil { - return err - } - } - - // Get chat IDs and public keys from the contacts. - contacts, err := m.persistence.Contacts() - if err != nil { - return err - } - for idx, contact := range contacts { - if err = m.updateContactImagesURL(contact); err != nil { - return err - } - m.allContacts.Store(contact.ID, contacts[idx]) - // We only need filters for contacts added by us and not blocked. - if !contact.added() || contact.Blocked { - continue - } - publicKey, err := contact.PublicKey() - if err != nil { - logger.Error("failed to get contact's public key", zap.Error(err)) - continue - } - publicKeys = append(publicKeys, publicKey) - } - - _, err = m.transport.InitFilters(filtersToInit, publicKeys) - if err != nil { - return err - } - - // Init filters for the communities we control - var communityFiltersToInitialize []transport.CommunityFilterToInitialize - controlledCommunities, err := m.communitiesManager.Controlled() - if err != nil { - return err - } - - for _, c := range controlledCommunities { - communityFiltersToInitialize = append(communityFiltersToInitialize, transport.CommunityFilterToInitialize{ - Shard: c.Shard(), - PrivKey: c.PrivateKey(), - }) - } - - _, err = m.InitCommunityFilters(communityFiltersToInitialize) - if err != nil { - return err - } - - return nil -} - // Shutdown takes care of ensuring a clean shutdown of Messenger func (m *Messenger) Shutdown() (err error) { if m == nil { diff --git a/protocol/messenger_filter_init.go b/protocol/messenger_filter_init.go new file mode 100644 index 000000000..c8c47a665 --- /dev/null +++ b/protocol/messenger_filter_init.go @@ -0,0 +1,380 @@ +package protocol + +import ( + "crypto/ecdsa" + "math/rand" + "sync" + "time" + + "github.com/pkg/errors" + gocommon "github.com/status-im/status-go/common" + "github.com/status-im/status-go/deprecation" + "github.com/status-im/status-go/protocol/common/shard" + "github.com/status-im/status-go/protocol/communities" + "github.com/status-im/status-go/protocol/transport" + "go.uber.org/zap" +) + +// InitFilters analyzes chats and contacts in order to setup filters +// which are responsible for retrieving messages. +func (m *Messenger) InitFilters() error { + // Seed the for color generation + rand.Seed(time.Now().Unix()) + + // Community requests will arrive in this pubsub topic + if err := m.SubscribeToPubsubTopic(shard.DefaultNonProtectedPubsubTopic(), nil); err != nil { + return err + } + + filters, publicKeys, err := m.collectFiltersAndKeys() + if err != nil { + return err + } + + _, err = m.transport.InitFilters(filters, publicKeys) + return err +} + +func (m *Messenger) collectFiltersAndKeys() ([]transport.FiltersToInitialize, []*ecdsa.PublicKey, error) { + var wg sync.WaitGroup + errCh := make(chan error, 5) + filtersCh := make(chan []transport.FiltersToInitialize, 3) + publicKeysCh := make(chan []*ecdsa.PublicKey, 2) + + // Start all goroutines + wg.Add(5) + go m.processJoinedCommunities(&wg, filtersCh, errCh) + go m.processSpectatedCommunities(&wg, filtersCh, errCh) + go m.processChats(&wg, filtersCh, publicKeysCh, errCh) + go m.processContacts(&wg, publicKeysCh, errCh) + go m.processControlledCommunities(&wg, errCh) + + // Wait for all goroutines to complete and close channels + wg.Wait() + close(filtersCh) + close(publicKeysCh) + + // Check for errors + select { + case err := <-errCh: + return nil, nil, err + default: + } + + return m.collectResults(filtersCh, publicKeysCh) +} + +func (m *Messenger) processJoinedCommunities(wg *sync.WaitGroup, filtersCh chan<- []transport.FiltersToInitialize, errCh chan<- error) { + defer gocommon.LogOnPanic() + defer wg.Done() + + joinedCommunities, err := m.communitiesManager.Joined() + if err != nil { + errCh <- err + return + } + + filtersToInit := m.processCommunitiesSettings(joinedCommunities) + filtersCh <- filtersToInit +} + +func (m *Messenger) processCommunitiesSettings(communities []*communities.Community) []transport.FiltersToInitialize { + logger := m.logger.With(zap.String("site", "processCommunitiesSettings")) + var filtersToInit []transport.FiltersToInitialize + + for _, org := range communities { + // the org advertise on the public topic derived by the pk + filtersToInit = append(filtersToInit, m.DefaultFilters(org)...) + + if err := m.ensureCommunitySettings(org); err != nil { + logger.Warn("failed to process community settings", zap.Error(err)) + } + } + + return filtersToInit +} + +func (m *Messenger) ensureCommunitySettings(org *communities.Community) error { + // This is for status-go versions that didn't have `CommunitySettings` + // We need to ensure communities that existed before community settings + // were introduced will have community settings as well + exists, err := m.communitiesManager.CommunitySettingsExist(org.ID()) + if err != nil { + return err + } + + if !exists { + communitySettings := communities.CommunitySettings{ + CommunityID: org.IDString(), + HistoryArchiveSupportEnabled: true, + } + return m.communitiesManager.SaveCommunitySettings(communitySettings) + } + + // In case we do have settings, but the history archive support is disabled + // for this community, we enable it, as this should be the default for all + // non-admin communities + communitySettings, err := m.communitiesManager.GetCommunitySettingsByID(org.ID()) + if err != nil { + return err + } + + if !org.IsControlNode() && !communitySettings.HistoryArchiveSupportEnabled { + communitySettings.HistoryArchiveSupportEnabled = true + return m.communitiesManager.UpdateCommunitySettings(*communitySettings) + } + + return nil +} + +func (m *Messenger) processSpectatedCommunities(wg *sync.WaitGroup, filtersCh chan<- []transport.FiltersToInitialize, errCh chan<- error) { + defer gocommon.LogOnPanic() + defer wg.Done() + + spectatedCommunities, err := m.communitiesManager.Spectated() + if err != nil { + errCh <- err + return + } + + var filtersToInit []transport.FiltersToInitialize + for _, org := range spectatedCommunities { + filtersToInit = append(filtersToInit, m.DefaultFilters(org)...) + } + filtersCh <- filtersToInit +} + +func (m *Messenger) processChats(wg *sync.WaitGroup, filtersCh chan<- []transport.FiltersToInitialize, publicKeysCh chan<- []*ecdsa.PublicKey, errCh chan<- error) { + defer gocommon.LogOnPanic() + defer wg.Done() + + // Get chat IDs and public keys from the existing chats. + // TODO: Get only active chats by the query. + chats, err := m.persistence.Chats() + if err != nil { + errCh <- err + return + } + + validChats, communityInfo := m.validateAndProcessChats(chats) + filters, publicKeys, err := m.processValidChats(validChats, communityInfo) + if err != nil { + errCh <- err + return + } + + filtersCh <- filters + publicKeysCh <- publicKeys + + if err := m.processDeprecatedChats(); err != nil { + errCh <- err + } +} + +func (m *Messenger) validateAndProcessChats(chats []*Chat) ([]*Chat, map[string]*communities.Community) { + logger := m.logger.With(zap.String("site", "validateAndProcessChats")) + communityInfo := make(map[string]*communities.Community) + var validChats []*Chat + + for _, chat := range chats { + if err := chat.Validate(); err != nil { + logger.Warn("failed to validate chat", zap.Error(err)) + continue + } + validChats = append(validChats, chat) + } + + m.initChatsFirstMessageTimestamp(communityInfo, validChats) + return validChats, communityInfo +} + +func (m *Messenger) processValidChats(validChats []*Chat, communityInfo map[string]*communities.Community) ([]transport.FiltersToInitialize, []*ecdsa.PublicKey, error) { + var filtersToInit []transport.FiltersToInitialize + var publicKeys []*ecdsa.PublicKey + + for _, chat := range validChats { + if !chat.Active || chat.Timeline() { + m.allChats.Store(chat.ID, chat) + continue + } + + filters, pks, err := m.processSingleChat(chat, communityInfo) + if err != nil { + return nil, nil, err + } + + filtersToInit = append(filtersToInit, filters...) + publicKeys = append(publicKeys, pks...) + m.allChats.Store(chat.ID, chat) + } + + return filtersToInit, publicKeys, nil +} + +func (m *Messenger) processSingleChat(chat *Chat, communityInfo map[string]*communities.Community) ([]transport.FiltersToInitialize, []*ecdsa.PublicKey, error) { + var filters []transport.FiltersToInitialize + var publicKeys []*ecdsa.PublicKey + + switch chat.ChatType { + case ChatTypePublic, ChatTypeProfile: + filters = append(filters, transport.FiltersToInitialize{ChatID: chat.ID}) + + case ChatTypeCommunityChat: + filter, err := m.processCommunityChat(chat, communityInfo) + if err != nil { + return nil, nil, err + } + filters = append(filters, filter) + + case ChatTypeOneToOne: + pk, err := chat.PublicKey() + if err != nil { + return nil, nil, err + } + publicKeys = append(publicKeys, pk) + + case ChatTypePrivateGroupChat: + pks, err := m.processPrivateGroupChat(chat) + if err != nil { + return nil, nil, err + } + publicKeys = append(publicKeys, pks...) + + default: + return nil, nil, errors.New("invalid chat type") + } + + return filters, publicKeys, nil +} + +func (m *Messenger) processCommunityChat(chat *Chat, communityInfo map[string]*communities.Community) (transport.FiltersToInitialize, error) { + community, ok := communityInfo[chat.CommunityID] + if !ok { + var err error + community, err = m.communitiesManager.GetByIDString(chat.CommunityID) + if err != nil { + return transport.FiltersToInitialize{}, err + } + communityInfo[chat.CommunityID] = community + } + + if chat.UnviewedMessagesCount > 0 || chat.UnviewedMentionsCount > 0 { + // Make sure the unread count is 0 for the channels the user cannot view + // It's possible that the users received messages to a channel before permissions were added + if !community.CanView(&m.identity.PublicKey, chat.CommunityChatID()) { + chat.UnviewedMessagesCount = 0 + chat.UnviewedMentionsCount = 0 + } + } + + return transport.FiltersToInitialize{ + ChatID: chat.ID, + PubsubTopic: community.PubsubTopic(), + }, nil +} + +func (m *Messenger) processPrivateGroupChat(chat *Chat) ([]*ecdsa.PublicKey, error) { + var publicKeys []*ecdsa.PublicKey + for _, member := range chat.Members { + publicKey, err := member.PublicKey() + if err != nil { + return nil, errors.Wrapf(err, "invalid public key for member %s in chat %s", member.ID, chat.Name) + } + publicKeys = append(publicKeys, publicKey) + } + return publicKeys, nil +} + +func (m *Messenger) processDeprecatedChats() error { + // Timeline and profile chats are deprecated. + // This code can be removed after some reasonable time. + + // upsert timeline chat + if !deprecation.ChatProfileDeprecated { + if err := m.ensureTimelineChat(); err != nil { + return err + } + } + + // upsert profile chat + if !deprecation.ChatTimelineDeprecated { + if err := m.ensureMyOwnProfileChat(); err != nil { + return err + } + } + + return nil +} + +func (m *Messenger) processContacts(wg *sync.WaitGroup, publicKeysCh chan<- []*ecdsa.PublicKey, errCh chan<- error) { + defer gocommon.LogOnPanic() + defer wg.Done() + + // Get chat IDs and public keys from the contacts. + contacts, err := m.persistence.Contacts() + if err != nil { + errCh <- err + return + } + + var publicKeys []*ecdsa.PublicKey + for idx, contact := range contacts { + if err = m.updateContactImagesURL(contact); err != nil { + errCh <- err + return + } + m.allContacts.Store(contact.ID, contacts[idx]) + // We only need filters for contacts added by us and not blocked. + if !contact.added() || contact.Blocked { + continue + } + + publicKey, err := contact.PublicKey() + if err != nil { + m.logger.Error("failed to get contact's public key", zap.Error(err)) + continue + } + publicKeys = append(publicKeys, publicKey) + } + publicKeysCh <- publicKeys +} + +// processControlledCommunities Init filters for the communities we control +func (m *Messenger) processControlledCommunities(wg *sync.WaitGroup, errCh chan<- error) { + defer gocommon.LogOnPanic() + defer wg.Done() + + controlledCommunities, err := m.communitiesManager.Controlled() + if err != nil { + errCh <- err + return + } + + var communityFiltersToInitialize []transport.CommunityFilterToInitialize + for _, c := range controlledCommunities { + communityFiltersToInitialize = append(communityFiltersToInitialize, transport.CommunityFilterToInitialize{ + Shard: c.Shard(), + PrivKey: c.PrivateKey(), + }) + } + + _, err = m.InitCommunityFilters(communityFiltersToInitialize) + if err != nil { + errCh <- err + } +} + +func (m *Messenger) collectResults(filtersCh <-chan []transport.FiltersToInitialize, publicKeysCh <-chan []*ecdsa.PublicKey) ([]transport.FiltersToInitialize, []*ecdsa.PublicKey, error) { + var allFilters []transport.FiltersToInitialize + var allPublicKeys []*ecdsa.PublicKey + + for filters := range filtersCh { + allFilters = append(allFilters, filters...) + } + + for pks := range publicKeysCh { + allPublicKeys = append(allPublicKeys, pks...) + } + + return allFilters, allPublicKeys, nil +}