From 3a4a940d049c733e1475b3541a331674c78718cb Mon Sep 17 00:00:00 2001 From: frank Date: Thu, 24 Oct 2024 14:43:56 +0800 Subject: [PATCH] refactor_: InitFilters --- protocol/messenger.go | 393 +++++++++++++++++++++--------- protocol/messenger_filter_init.go | 1 + 2 files changed, 274 insertions(+), 120 deletions(-) create mode 100644 protocol/messenger_filter_init.go diff --git a/protocol/messenger.go b/protocol/messenger.go index 7d9c455f8..b5b269ca4 100644 --- a/protocol/messenger.go +++ b/protocol/messenger.go @@ -920,20 +920,7 @@ func (m *Messenger) Start() (*MessengerResponse, error) { } } - joinedCommunities, err := m.communitiesManager.Joined() - if err != nil { - return nil, err - } - - for _, joinedCommunity := range joinedCommunities { - // resume importing message history archives in case - // imports have been interrupted previously - err := m.resumeHistoryArchivesImport(joinedCommunity.ID()) - if err != nil { - return nil, err - } - } - m.enableHistoryArchivesImportAfterDelay() + go m.startHistoryArchivesImportLoop() if m.httpServer != nil { err = m.httpServer.Start() @@ -974,6 +961,26 @@ func (m *Messenger) Start() (*MessengerResponse, error) { return response, nil } +func (m *Messenger) startHistoryArchivesImportLoop() { + defer gocommon.LogOnPanic() + joinedCommunities, err := m.communitiesManager.Joined() + if err != nil { + m.logger.Error("failed to get joined communities", zap.Error(err)) + return + } + + for _, joinedCommunity := range joinedCommunities { + // resume importing message history archives in case + // imports have been interrupted previously + err := m.resumeHistoryArchivesImport(joinedCommunity.ID()) + if err != nil { + m.logger.Error("failed to resume history archives import", zap.Error(err)) + continue + } + } + m.enableHistoryArchivesImportAfterDelay() +} + func (m *Messenger) SetMediaServer(server *server.MediaServer) { m.httpServer = server m.communitiesManager.SetMediaServer(server) @@ -1778,88 +1785,161 @@ func (m *Messenger) handlePushNotificationClientRegistrations(c chan struct{}) { // InitFilters analyzes chats and contacts in order to setup filters // which are responsible for retrieving messages. func (m *Messenger) InitFilters() error { - // Seed the for color generation rand.Seed(time.Now().Unix()) - logger := m.logger.With(zap.String("site", "Init")) - // Community requests will arrive in this pubsub topic - err := m.SubscribeToPubsubTopic(shard.DefaultNonProtectedPubsubTopic(), nil) + if err := m.SubscribeToPubsubTopic(shard.DefaultNonProtectedPubsubTopic(), nil); err != nil { + return err + } + + filters, publicKeys, err := m.collectFiltersAndKeys() if err != nil { return err } - var ( - filtersToInit []transport.FiltersToInitialize - publicKeys []*ecdsa.PublicKey - ) + _, err = m.transport.InitFilters(filters, publicKeys) + return err +} + +func (m *Messenger) collectFiltersAndKeys() ([]transport.FiltersToInitialize, []*ecdsa.PublicKey, error) { + var wg sync.WaitGroup + errCh := make(chan error, 5) + filtersCh := make(chan []transport.FiltersToInitialize, 3) + publicKeysCh := make(chan []*ecdsa.PublicKey, 2) + + wg.Add(5) + go m.processJoinedCommunities(&wg, filtersCh, errCh) + go m.processSpectatedCommunities(&wg, filtersCh, errCh) + go m.processChats(&wg, filtersCh, publicKeysCh, errCh) + go m.processContacts(&wg, publicKeysCh, errCh) + go m.processControlledCommunities(&wg, errCh) + + wg.Wait() + close(filtersCh) + close(publicKeysCh) + + select { + case err := <-errCh: + return nil, nil, err + default: + } + + return m.collectResults(filtersCh, publicKeysCh) +} + +func (m *Messenger) processJoinedCommunities(wg *sync.WaitGroup, filtersCh chan<- []transport.FiltersToInitialize, errCh chan<- error) { + defer gocommon.LogOnPanic() + defer wg.Done() joinedCommunities, err := m.communitiesManager.Joined() if err != nil { - return err + errCh <- err + return } - for _, org := range joinedCommunities { + + filtersToInit := m.processCommunitiesSettings(joinedCommunities) + filtersCh <- filtersToInit +} + +func (m *Messenger) processCommunitiesSettings(communities []*communities.Community) []transport.FiltersToInitialize { + logger := m.logger.With(zap.String("site", "processCommunitiesSettings")) + var filtersToInit []transport.FiltersToInitialize + + for _, org := range communities { // the org advertise on the public topic derived by the pk filtersToInit = append(filtersToInit, m.DefaultFilters(org)...) - // This is for status-go versions that didn't have `CommunitySettings` - // We need to ensure communities that existed before community settings - // were introduced will have community settings as well - exists, err := m.communitiesManager.CommunitySettingsExist(org.ID()) - if err != nil { - logger.Warn("failed to check if community settings exist", zap.Error(err)) - continue - } - - if !exists { - communitySettings := communities.CommunitySettings{ - CommunityID: org.IDString(), - HistoryArchiveSupportEnabled: true, - } - - err = m.communitiesManager.SaveCommunitySettings(communitySettings) - if err != nil { - logger.Warn("failed to save community settings", zap.Error(err)) - } - continue - } - - // In case we do have settings, but the history archive support is disabled - // for this community, we enable it, as this should be the default for all - // non-admin communities - communitySettings, err := m.communitiesManager.GetCommunitySettingsByID(org.ID()) - if err != nil { - logger.Warn("failed to fetch community settings", zap.Error(err)) - continue - } - - if !org.IsControlNode() && !communitySettings.HistoryArchiveSupportEnabled { - communitySettings.HistoryArchiveSupportEnabled = true - err = m.communitiesManager.UpdateCommunitySettings(*communitySettings) - if err != nil { - logger.Warn("failed to update community settings", zap.Error(err)) - } + if err := m.ensureCommunitySettings(org); err != nil { + logger.Warn("failed to process community settings", zap.Error(err)) } } - spectatedCommunities, err := m.communitiesManager.Spectated() + return filtersToInit +} + +func (m *Messenger) ensureCommunitySettings(org *communities.Community) error { + // This is for status-go versions that didn't have `CommunitySettings` + // We need to ensure communities that existed before community settings + // were introduced will have community settings as well + exists, err := m.communitiesManager.CommunitySettingsExist(org.ID()) if err != nil { return err } + + if !exists { + communitySettings := communities.CommunitySettings{ + CommunityID: org.IDString(), + HistoryArchiveSupportEnabled: true, + } + return m.communitiesManager.SaveCommunitySettings(communitySettings) + } + + // In case we do have settings, but the history archive support is disabled + // for this community, we enable it, as this should be the default for all + // non-admin communities + communitySettings, err := m.communitiesManager.GetCommunitySettingsByID(org.ID()) + if err != nil { + return err + } + + if !org.IsControlNode() && !communitySettings.HistoryArchiveSupportEnabled { + communitySettings.HistoryArchiveSupportEnabled = true + return m.communitiesManager.UpdateCommunitySettings(*communitySettings) + } + + return nil +} + +func (m *Messenger) processSpectatedCommunities(wg *sync.WaitGroup, filtersCh chan<- []transport.FiltersToInitialize, errCh chan<- error) { + defer gocommon.LogOnPanic() + defer wg.Done() + + spectatedCommunities, err := m.communitiesManager.Spectated() + if err != nil { + errCh <- err + return + } + + var filtersToInit []transport.FiltersToInitialize for _, org := range spectatedCommunities { filtersToInit = append(filtersToInit, m.DefaultFilters(org)...) } + filtersCh <- filtersToInit +} + +func (m *Messenger) processChats(wg *sync.WaitGroup, filtersCh chan<- []transport.FiltersToInitialize, publicKeysCh chan<- []*ecdsa.PublicKey, errCh chan<- error) { + defer gocommon.LogOnPanic() + defer wg.Done() // Get chat IDs and public keys from the existing chats. // TODO: Get only active chats by the query. chats, err := m.persistence.Chats() if err != nil { - return err + errCh <- err + return } + validChats, communityInfo := m.validateAndProcessChats(chats) + filters, publicKeys, err := m.processValidChats(validChats, communityInfo) + if err != nil { + errCh <- err + return + } + + filtersCh <- filters + publicKeysCh <- publicKeys + + if err := m.processDeprecatedChats(); err != nil { + errCh <- err + } +} + +func (m *Messenger) validateAndProcessChats(chats []*Chat) ([]*Chat, map[string]*communities.Community) { + logger := m.logger.With(zap.String("site", "validateAndProcessChats")) communityInfo := make(map[string]*communities.Community) var validChats []*Chat + for _, chat := range chats { if err := chat.Validate(); err != nil { logger.Warn("failed to validate chat", zap.Error(err)) @@ -1869,6 +1949,12 @@ func (m *Messenger) InitFilters() error { } m.initChatsFirstMessageTimestamp(communityInfo, validChats) + return validChats, communityInfo +} + +func (m *Messenger) processValidChats(validChats []*Chat, communityInfo map[string]*communities.Community) ([]transport.FiltersToInitialize, []*ecdsa.PublicKey, error) { + var filtersToInit []transport.FiltersToInitialize + var publicKeys []*ecdsa.PublicKey for _, chat := range validChats { if !chat.Active || chat.Timeline() { @@ -1876,105 +1962,159 @@ func (m *Messenger) InitFilters() error { continue } - switch chat.ChatType { - case ChatTypePublic, ChatTypeProfile: - filtersToInit = append(filtersToInit, transport.FiltersToInitialize{ChatID: chat.ID}) - case ChatTypeCommunityChat: - community, ok := communityInfo[chat.CommunityID] - if !ok { - community, err = m.communitiesManager.GetByIDString(chat.CommunityID) - if err != nil { - return err - } - communityInfo[chat.CommunityID] = community - } - - if chat.UnviewedMessagesCount > 0 || chat.UnviewedMentionsCount > 0 { - // Make sure the unread count is 0 for the channels the user cannot view - // It's possible that the users received messages to a channel before permissions were added - canView := community.CanView(&m.identity.PublicKey, chat.CommunityChatID()) - - if !canView { - chat.UnviewedMessagesCount = 0 - chat.UnviewedMentionsCount = 0 - } - } - - filtersToInit = append(filtersToInit, transport.FiltersToInitialize{ChatID: chat.ID, PubsubTopic: community.PubsubTopic()}) - case ChatTypeOneToOne: - pk, err := chat.PublicKey() - if err != nil { - return err - } - publicKeys = append(publicKeys, pk) - case ChatTypePrivateGroupChat: - for _, member := range chat.Members { - publicKey, err := member.PublicKey() - if err != nil { - return errors.Wrapf(err, "invalid public key for member %s in chat %s", member.ID, chat.Name) - } - publicKeys = append(publicKeys, publicKey) - } - default: - return errors.New("invalid chat type") + filters, pks, err := m.processSingleChat(chat, communityInfo) + if err != nil { + return nil, nil, err } + filtersToInit = append(filtersToInit, filters...) + publicKeys = append(publicKeys, pks...) m.allChats.Store(chat.ID, chat) } + return filtersToInit, publicKeys, nil +} + +func (m *Messenger) processSingleChat(chat *Chat, communityInfo map[string]*communities.Community) ([]transport.FiltersToInitialize, []*ecdsa.PublicKey, error) { + var filters []transport.FiltersToInitialize + var publicKeys []*ecdsa.PublicKey + + switch chat.ChatType { + case ChatTypePublic, ChatTypeProfile: + filters = append(filters, transport.FiltersToInitialize{ChatID: chat.ID}) + + case ChatTypeCommunityChat: + filter, err := m.processCommunityChat(chat, communityInfo) + if err != nil { + return nil, nil, err + } + filters = append(filters, filter) + + case ChatTypeOneToOne: + pk, err := chat.PublicKey() + if err != nil { + return nil, nil, err + } + publicKeys = append(publicKeys, pk) + + case ChatTypePrivateGroupChat: + pks, err := m.processPrivateGroupChat(chat) + if err != nil { + return nil, nil, err + } + publicKeys = append(publicKeys, pks...) + + default: + return nil, nil, errors.New("invalid chat type") + } + + return filters, publicKeys, nil +} + +func (m *Messenger) processCommunityChat(chat *Chat, communityInfo map[string]*communities.Community) (transport.FiltersToInitialize, error) { + community, ok := communityInfo[chat.CommunityID] + if !ok { + var err error + community, err = m.communitiesManager.GetByIDString(chat.CommunityID) + if err != nil { + return transport.FiltersToInitialize{}, err + } + communityInfo[chat.CommunityID] = community + } + + if chat.UnviewedMessagesCount > 0 || chat.UnviewedMentionsCount > 0 { + // Make sure the unread count is 0 for the channels the user cannot view + // It's possible that the users received messages to a channel before permissions were added + if !community.CanView(&m.identity.PublicKey, chat.CommunityChatID()) { + chat.UnviewedMessagesCount = 0 + chat.UnviewedMentionsCount = 0 + } + } + + return transport.FiltersToInitialize{ + ChatID: chat.ID, + PubsubTopic: community.PubsubTopic(), + }, nil +} + +func (m *Messenger) processPrivateGroupChat(chat *Chat) ([]*ecdsa.PublicKey, error) { + var publicKeys []*ecdsa.PublicKey + for _, member := range chat.Members { + publicKey, err := member.PublicKey() + if err != nil { + return nil, errors.Wrapf(err, "invalid public key for member %s in chat %s", member.ID, chat.Name) + } + publicKeys = append(publicKeys, publicKey) + } + return publicKeys, nil +} + +func (m *Messenger) processDeprecatedChats() error { // Timeline and profile chats are deprecated. // This code can be removed after some reasonable time. // upsert timeline chat if !deprecation.ChatProfileDeprecated { - err = m.ensureTimelineChat() - if err != nil { + if err := m.ensureTimelineChat(); err != nil { return err } } // upsert profile chat if !deprecation.ChatTimelineDeprecated { - err = m.ensureMyOwnProfileChat() - if err != nil { + if err := m.ensureMyOwnProfileChat(); err != nil { return err } } + return nil +} + +func (m *Messenger) processContacts(wg *sync.WaitGroup, publicKeysCh chan<- []*ecdsa.PublicKey, errCh chan<- error) { + defer gocommon.LogOnPanic() + defer wg.Done() + // Get chat IDs and public keys from the contacts. contacts, err := m.persistence.Contacts() if err != nil { - return err + errCh <- err + return } + + var publicKeys []*ecdsa.PublicKey for idx, contact := range contacts { if err = m.updateContactImagesURL(contact); err != nil { - return err + errCh <- err + return } m.allContacts.Store(contact.ID, contacts[idx]) // We only need filters for contacts added by us and not blocked. if !contact.added() || contact.Blocked { continue } + publicKey, err := contact.PublicKey() if err != nil { - logger.Error("failed to get contact's public key", zap.Error(err)) + m.logger.Error("failed to get contact's public key", zap.Error(err)) continue } publicKeys = append(publicKeys, publicKey) } + publicKeysCh <- publicKeys +} - _, err = m.transport.InitFilters(filtersToInit, publicKeys) - if err != nil { - return err - } +// processControlledCommunities Init filters for the communities we control +func (m *Messenger) processControlledCommunities(wg *sync.WaitGroup, errCh chan<- error) { + defer gocommon.LogOnPanic() + defer wg.Done() - // Init filters for the communities we control - var communityFiltersToInitialize []transport.CommunityFilterToInitialize controlledCommunities, err := m.communitiesManager.Controlled() if err != nil { - return err + errCh <- err + return } + var communityFiltersToInitialize []transport.CommunityFilterToInitialize for _, c := range controlledCommunities { communityFiltersToInitialize = append(communityFiltersToInitialize, transport.CommunityFilterToInitialize{ Shard: c.Shard(), @@ -1984,10 +2124,23 @@ func (m *Messenger) InitFilters() error { _, err = m.InitCommunityFilters(communityFiltersToInitialize) if err != nil { - return err + errCh <- err + } +} + +func (m *Messenger) collectResults(filtersCh <-chan []transport.FiltersToInitialize, publicKeysCh <-chan []*ecdsa.PublicKey) ([]transport.FiltersToInitialize, []*ecdsa.PublicKey, error) { + var allFilters []transport.FiltersToInitialize + var allPublicKeys []*ecdsa.PublicKey + + for filters := range filtersCh { + allFilters = append(allFilters, filters...) } - return nil + for pks := range publicKeysCh { + allPublicKeys = append(allPublicKeys, pks...) + } + + return allFilters, allPublicKeys, nil } // Shutdown takes care of ensuring a clean shutdown of Messenger diff --git a/protocol/messenger_filter_init.go b/protocol/messenger_filter_init.go new file mode 100644 index 000000000..2d0eaff9f --- /dev/null +++ b/protocol/messenger_filter_init.go @@ -0,0 +1 @@ +package protocol