package protocol import ( "context" "crypto/sha256" "encoding/hex" "fmt" "math" "sort" "sync" "time" "github.com/libp2p/go-libp2p/core/peer" "github.com/pkg/errors" "go.uber.org/zap" gocommon "github.com/status-im/status-go/common" "github.com/status-im/status-go/connection" "github.com/status-im/status-go/eth-node/crypto" "github.com/status-im/status-go/eth-node/types" "github.com/status-im/status-go/protocol/common" "github.com/status-im/status-go/protocol/protobuf" "github.com/status-im/status-go/protocol/transport" "github.com/status-im/status-go/services/mailservers" ) const ( initialStoreNodeRequestPageSize = 4 defaultStoreNodeRequestPageSize = 50 // tolerance is how many seconds of potentially out-of-order messages we want to fetch tolerance uint32 = 60 mailserverRequestTimeout = 30 * time.Second mailserverMaxTries uint = 2 mailserverMaxFailedRequests uint = 2 oneDayDuration = 24 * time.Hour oneMonthDuration = 31 * oneDayDuration ) // maxTopicsPerRequest sets the batch size to limit the number of topics per store query var maxTopicsPerRequest int = 10 var ErrNoFiltersForChat = errors.New("no filter registered for given chat") func (m *Messenger) shouldSync() (bool, error) { // TODO (pablo) support community store node as well if m.mailserverCycle.activeMailserver == nil || !m.Online() { return false, nil } useMailserver, err := m.settings.CanUseMailservers() if err != nil { m.logger.Error("failed to get use mailservers", zap.Error(err)) return false, err } return useMailserver, nil } func (m *Messenger) scheduleSyncChat(chat *Chat) (bool, error) { shouldSync, err := m.shouldSync() if err != nil { m.logger.Error("failed to get should sync", zap.Error(err)) return false, err } if !shouldSync { return false, nil } go func() { defer gocommon.LogOnPanic() ms := m.getActiveMailserver(chat.CommunityID) _, err = m.performMailserverRequest(ms, func(mailServer mailservers.Mailserver) (*MessengerResponse, error) { response, err := m.syncChatWithFilters(mailServer, chat.ID) if err != nil { m.logger.Error("failed to sync chat", zap.Error(err)) return nil, err } if m.config.messengerSignalsHandler != nil { m.config.messengerSignalsHandler.MessengerResponse(response) } return response, nil }) if err != nil { m.logger.Error("failed to perform mailserver request", zap.Error(err)) } }() return true, nil } func (m *Messenger) connectToNewMailserverAndWait() error { // Handle pinned mailservers m.logger.Info("disconnecting mailserver") pinnedMailserver, err := m.getPinnedMailserver() if err != nil { m.logger.Error("could not obtain the pinned mailserver", zap.Error(err)) return err } // If pinned mailserver is not nil, no need to disconnect and wait for it to be available if pinnedMailserver == nil { m.disconnectActiveMailserver(graylistBackoff) } return m.findNewMailserver() } func (m *Messenger) performMailserverRequest(ms *mailservers.Mailserver, fn func(mailServer mailservers.Mailserver) (*MessengerResponse, error)) (*MessengerResponse, error) { if ms == nil { return nil, errors.New("mailserver not available") } m.mailserverCycle.RLock() defer m.mailserverCycle.RUnlock() var tries uint = 0 for tries < mailserverMaxTries { if !m.communityStorenodes.IsCommunityStoreNode(ms.ID) && !m.isMailserverAvailable(ms.ID) { return nil, errors.New("storenode not available") } m.logger.Info("trying performing mailserver requests", zap.Uint("try", tries), zap.String("mailserverID", ms.ID)) // Peform request response, err := fn(*ms) // pass by value because we don't want the fn to modify the mailserver if err == nil { // Reset failed requests m.logger.Debug("mailserver request performed successfully", zap.String("mailserverID", ms.ID)) ms.FailedRequests = 0 return response, nil } m.logger.Error("failed to perform mailserver request", zap.String("mailserverID", ms.ID), zap.Uint("tries", tries), zap.Error(err), ) tries++ // Increment failed requests ms.FailedRequests++ // Change mailserver if ms.FailedRequests >= mailserverMaxFailedRequests { return nil, errors.New("too many failed requests") } // Wait a couple of second not to spam time.Sleep(2 * time.Second) } return nil, errors.New("failed to perform mailserver request") } func (m *Messenger) scheduleSyncFilters(filters []*transport.Filter) (bool, error) { shouldSync, err := m.shouldSync() if err != nil { m.logger.Error("failed to get shouldSync", zap.Error(err)) return false, err } if !shouldSync { return false, nil } go func() { defer gocommon.LogOnPanic() // split filters by community store node so we can request the filters to the correct mailserver filtersByMs := m.SplitFiltersByStoreNode(filters) for communityID, filtersForMs := range filtersByMs { ms := m.getActiveMailserver(communityID) _, err := m.performMailserverRequest(ms, func(ms mailservers.Mailserver) (*MessengerResponse, error) { response, err := m.syncFilters(ms, filtersForMs) if err != nil { m.logger.Error("failed to sync filter", zap.Error(err)) return nil, err } if m.config.messengerSignalsHandler != nil { m.config.messengerSignalsHandler.MessengerResponse(response) } return response, nil }) if err != nil { m.logger.Error("failed to perform mailserver request", zap.Error(err)) } } }() return true, nil } func (m *Messenger) calculateMailserverTo() uint32 { seconds := float64(m.GetCurrentTimeInMillis()) / 1000 return uint32(math.Ceil(seconds)) } func (m *Messenger) calculateMailserverTimeBounds(duration time.Duration) (uint32, uint32) { now := float64(m.GetCurrentTimeInMillis()) / 1000 to := uint32(math.Ceil(now)) from := uint32(math.Floor(now)) - uint32(duration.Seconds()) return from, to } func (m *Messenger) filtersForChat(chatID string) ([]*transport.Filter, error) { chat, ok := m.allChats.Load(chatID) if !ok { return nil, ErrChatNotFound } var filters []*transport.Filter if chat.OneToOne() { // We sync our own topic and any eventual negotiated publicKeys := []string{common.PubkeyToHex(&m.identity.PublicKey), chatID} filters = m.transport.FiltersByIdentities(publicKeys) } else if chat.PrivateGroupChat() { var publicKeys []string for _, m := range chat.Members { publicKeys = append(publicKeys, m.ID) } filters = m.transport.FiltersByIdentities(publicKeys) } else { filter := m.transport.FilterByChatID(chatID) if filter == nil { return nil, ErrNoFiltersForChat } filters = []*transport.Filter{filter} } return filters, nil } func (m *Messenger) topicsForChat(chatID string) (string, []types.TopicType, error) { filters, err := m.filtersForChat(chatID) if err != nil { return "", nil, err } var contentTopics []types.TopicType for _, filter := range filters { contentTopics = append(contentTopics, filter.ContentTopic) } return filters[0].PubsubTopic, contentTopics, nil } func (m *Messenger) syncChatWithFilters(ms mailservers.Mailserver, chatID string) (*MessengerResponse, error) { filters, err := m.filtersForChat(chatID) if err != nil { return nil, err } return m.syncFilters(ms, filters) } func (m *Messenger) syncBackup() error { filter := m.transport.PersonalTopicFilter() if filter == nil { return errors.New("personal topic filter not loaded") } canSync, err := m.canSyncWithStoreNodes() if err != nil { return err } if !canSync { return nil } from, to := m.calculateMailserverTimeBounds(oneMonthDuration) batch := MailserverBatch{From: from, To: to, Topics: []types.TopicType{filter.ContentTopic}} ms := m.getActiveMailserver(filter.ChatID) err = m.processMailserverBatch(*ms, batch) if err != nil { return err } return m.settings.SetBackupFetched(true) } func (m *Messenger) defaultSyncPeriodFromNow() (uint32, error) { defaultSyncPeriod, err := m.settings.GetDefaultSyncPeriod() if err != nil { return 0, err } return uint32(m.getTimesource().GetCurrentTime()/1000) - defaultSyncPeriod, nil } // capToDefaultSyncPeriod caps the sync period to the default func (m *Messenger) capToDefaultSyncPeriod(period uint32) (uint32, error) { d, err := m.defaultSyncPeriodFromNow() if err != nil { return 0, err } if d > period { return d, nil } return period - tolerance, nil } func (m *Messenger) updateFiltersPriority(filters []*transport.Filter) { for _, filter := range filters { chatID := filter.ChatID chat := m.Chat(chatID) if chat != nil { filter.Priority = chat.ReadMessagesAtClockValue } } } func (m *Messenger) resetFiltersPriority(filters []*transport.Filter) { for _, filter := range filters { filter.Priority = 0 } } func (m *Messenger) SplitFiltersByStoreNode(filters []*transport.Filter) map[string][]*transport.Filter { // split filters by community store node so we can request the filters to the correct mailserver filtersByMs := make(map[string][]*transport.Filter, len(filters)) for _, f := range filters { communityID := "" // none by default if chat, ok := m.allChats.Load(f.ChatID); ok && chat.CommunityChat() && m.communityStorenodes.HasStorenodeSetup(chat.CommunityID) { communityID = chat.CommunityID } if _, exists := filtersByMs[communityID]; !exists { filtersByMs[communityID] = make([]*transport.Filter, 0, len(filters)) } filtersByMs[communityID] = append(filtersByMs[communityID], f) } return filtersByMs } // RequestAllHistoricMessages requests all the historic messages for any topic func (m *Messenger) RequestAllHistoricMessages(forceFetchingBackup, withRetries bool) (*MessengerResponse, error) { shouldSync, err := m.shouldSync() if err != nil { return nil, err } if !shouldSync { return nil, nil } backupFetched, err := m.settings.BackupFetched() if err != nil { return nil, err } if m.mailserversDatabase == nil { return nil, nil } if forceFetchingBackup || !backupFetched { m.logger.Info("fetching backup") err := m.syncBackup() if err != nil { return nil, err } m.logger.Info("backup fetched") } filters := m.transport.Filters() m.updateFiltersPriority(filters) defer m.resetFiltersPriority(filters) filtersByMs := m.SplitFiltersByStoreNode(filters) allResponses := &MessengerResponse{} for communityID, filtersForMs := range filtersByMs { ms := m.getActiveMailserver(communityID) if withRetries { response, err := m.performMailserverRequest(ms, func(ms mailservers.Mailserver) (*MessengerResponse, error) { return m.syncFilters(ms, filtersForMs) }) if err != nil { return nil, err } if response != nil { allResponses.AddChats(response.Chats()) allResponses.AddMessages(response.Messages()) } continue } response, err := m.syncFilters(*ms, filtersForMs) if err != nil { return nil, err } if response != nil { allResponses.AddChats(response.Chats()) allResponses.AddMessages(response.Messages()) } } return allResponses, nil } const missingMessageCheckPeriod = 30 * time.Second func (m *Messenger) checkForMissingMessagesLoop() { defer gocommon.LogOnPanic() t := time.NewTicker(missingMessageCheckPeriod) defer t.Stop() mailserverAvailableSignal := m.mailserverCycle.availabilitySubscriptions.Subscribe() for { select { case <-m.quit: return // Wait for mailserver available, also triggered on mailserver change case <-mailserverAvailableSignal: mailserverAvailableSignal = m.mailserverCycle.availabilitySubscriptions.Subscribe() case <-t.C: } filters := m.transport.Filters() filtersByMs := m.SplitFiltersByStoreNode(filters) for communityID, filtersForMs := range filtersByMs { ms := m.getActiveMailserver(communityID) if ms == nil { continue } peerID, err := ms.PeerID() if err != nil { m.logger.Error("could not obtain the peerID") return } m.transport.SetCriteriaForMissingMessageVerification(peerID, filtersForMs) } } } func getPrioritizedBatches() []int { return []int{1, 5, 10} } func (m *Messenger) syncFiltersFrom(ms mailservers.Mailserver, filters []*transport.Filter, lastRequest uint32) (*MessengerResponse, error) { canSync, err := m.canSyncWithStoreNodes() if err != nil { return nil, err } if !canSync { return nil, nil } response := &MessengerResponse{} topicInfo, err := m.mailserversDatabase.Topics() if err != nil { return nil, err } topicsData := make(map[string]mailservers.MailserverTopic) for _, topic := range topicInfo { topicsData[fmt.Sprintf("%s-%s", topic.PubsubTopic, topic.ContentTopic)] = topic } batches := make(map[string]map[int]MailserverBatch) to := m.calculateMailserverTo() var syncedTopics []mailservers.MailserverTopic sort.Slice(filters[:], func(i, j int) bool { p1 := filters[i].Priority p2 := filters[j].Priority return p1 > p2 }) prioritizedBatches := getPrioritizedBatches() currentBatch := 0 if len(filters) == 0 || filters[0].Priority == 0 { currentBatch = len(prioritizedBatches) } defaultPeriodFromNow, err := m.defaultSyncPeriodFromNow() if err != nil { return nil, err } contentTopicsPerPubsubTopic := make(map[string]map[string]*transport.Filter) for _, filter := range filters { if !filter.Listen || filter.Ephemeral { continue } contentTopics, ok := contentTopicsPerPubsubTopic[filter.PubsubTopic] if !ok { contentTopics = make(map[string]*transport.Filter) } contentTopics[filter.ContentTopic.String()] = filter contentTopicsPerPubsubTopic[filter.PubsubTopic] = contentTopics } for pubsubTopic, contentTopics := range contentTopicsPerPubsubTopic { if _, ok := batches[pubsubTopic]; !ok { batches[pubsubTopic] = make(map[int]MailserverBatch) } for _, filter := range contentTopics { var chatID string // If the filter has an identity, we use it as a chatID, otherwise is a public chat/community chat filter if len(filter.Identity) != 0 { chatID = filter.Identity } else { chatID = filter.ChatID } topicData, ok := topicsData[fmt.Sprintf("%s-%s", filter.PubsubTopic, filter.ContentTopic)] var capToDefaultSyncPeriod = true if !ok { if lastRequest == 0 { lastRequest = defaultPeriodFromNow } topicData = mailservers.MailserverTopic{ PubsubTopic: filter.PubsubTopic, ContentTopic: filter.ContentTopic.String(), LastRequest: int(defaultPeriodFromNow), } } else if lastRequest != 0 { topicData.LastRequest = int(lastRequest) capToDefaultSyncPeriod = false } batchID := topicData.LastRequest if currentBatch < len(prioritizedBatches) { batch, ok := batches[pubsubTopic][currentBatch] if ok { prevTopicData, ok := topicsData[batch.PubsubTopic+batch.Topics[0].String()] if (!ok && topicData.LastRequest != int(defaultPeriodFromNow)) || (ok && prevTopicData.LastRequest != topicData.LastRequest) { currentBatch++ } } if currentBatch < len(prioritizedBatches) { batchID = currentBatch currentBatchCap := prioritizedBatches[currentBatch] - 1 if currentBatchCap == 0 { currentBatch++ } else { prioritizedBatches[currentBatch] = currentBatchCap } } } batch, ok := batches[pubsubTopic][batchID] if !ok { from := uint32(topicData.LastRequest) if capToDefaultSyncPeriod { from, err = m.capToDefaultSyncPeriod(uint32(topicData.LastRequest)) if err != nil { return nil, err } } batch = MailserverBatch{From: from, To: to} } batch.ChatIDs = append(batch.ChatIDs, chatID) batch.PubsubTopic = pubsubTopic batch.Topics = append(batch.Topics, filter.ContentTopic) batches[pubsubTopic][batchID] = batch // Set last request to the new `to` topicData.LastRequest = int(to) syncedTopics = append(syncedTopics, topicData) } } if m.config.messengerSignalsHandler != nil { m.config.messengerSignalsHandler.HistoryRequestStarted(len(batches)) } var batches24h []MailserverBatch for pubsubTopic := range batches { batchKeys := make([]int, 0, len(batches[pubsubTopic])) for k := range batches[pubsubTopic] { batchKeys = append(batchKeys, k) } sort.Ints(batchKeys) keysToIterate := append([]int{}, batchKeys...) for { // For all batches var tmpKeysToIterate []int for _, k := range keysToIterate { batch := batches[pubsubTopic][k] dayBatch := MailserverBatch{ To: batch.To, Cursor: batch.Cursor, PubsubTopic: batch.PubsubTopic, Topics: batch.Topics, ChatIDs: batch.ChatIDs, } from := batch.To - uint32(oneDayDuration.Seconds()) if from > batch.From { dayBatch.From = from batches24h = append(batches24h, dayBatch) // Replace og batch with new dates batch.To = from batches[pubsubTopic][k] = batch tmpKeysToIterate = append(tmpKeysToIterate, k) } else { batches24h = append(batches24h, batch) } } if len(tmpKeysToIterate) == 0 { break } keysToIterate = tmpKeysToIterate } } for _, batch := range batches24h { err := m.processMailserverBatch(ms, batch) if err != nil { m.logger.Error("error syncing topics", zap.Error(err)) return nil, err } } m.logger.Debug("topics synced") if m.config.messengerSignalsHandler != nil { m.config.messengerSignalsHandler.HistoryRequestCompleted() } err = m.mailserversDatabase.AddTopics(syncedTopics) if err != nil { return nil, err } var messagesToBeSaved []*common.Message for _, batches := range batches { for _, batch := range batches { for _, id := range batch.ChatIDs { chat, ok := m.allChats.Load(id) if !ok || !chat.Active || chat.Timeline() || chat.ProfileUpdates() { continue } gap, err := m.calculateGapForChat(chat, batch.From) if err != nil { return nil, err } if chat.SyncedFrom == 0 || chat.SyncedFrom > batch.From { chat.SyncedFrom = batch.From } chat.SyncedTo = to err = m.persistence.SetSyncTimestamps(chat.SyncedFrom, chat.SyncedTo, chat.ID) if err != nil { return nil, err } response.AddChat(chat) if gap != nil { response.AddMessage(gap) messagesToBeSaved = append(messagesToBeSaved, gap) } } } } if len(messagesToBeSaved) > 0 { err := m.persistence.SaveMessages(messagesToBeSaved) if err != nil { return nil, err } } return response, nil } func (m *Messenger) syncFilters(ms mailservers.Mailserver, filters []*transport.Filter) (*MessengerResponse, error) { return m.syncFiltersFrom(ms, filters, 0) } func (m *Messenger) calculateGapForChat(chat *Chat, from uint32) (*common.Message, error) { // Chat was never synced, no gap necessary if chat.SyncedTo == 0 { return nil, nil } // If we filled the gap, nothing to do if chat.SyncedTo >= from { return nil, nil } timestamp := m.getTimesource().GetCurrentTime() message := &common.Message{ ChatMessage: &protobuf.ChatMessage{ ChatId: chat.ID, Text: "Gap message", MessageType: protobuf.MessageType_SYSTEM_MESSAGE_GAP, ContentType: protobuf.ChatMessage_SYSTEM_MESSAGE_GAP, Clock: uint64(from) * 1000, Timestamp: timestamp, }, GapParameters: &common.GapParameters{ From: chat.SyncedTo, To: from, }, From: common.PubkeyToHex(&m.identity.PublicKey), WhisperTimestamp: timestamp, LocalChatID: chat.ID, Seen: true, ID: types.EncodeHex(crypto.Keccak256([]byte(fmt.Sprintf("%s-%d-%d", chat.ID, chat.SyncedTo, from)))), } return message, m.persistence.SaveMessages([]*common.Message{message}) } type work struct { pubsubTopic string contentTopics []types.TopicType cursor types.StoreRequestCursor limit uint32 } type messageRequester interface { SendMessagesRequestForTopics( ctx context.Context, peerID peer.ID, from, to uint32, previousStoreCursor types.StoreRequestCursor, pubsubTopic string, contentTopics []types.TopicType, limit uint32, waitForResponse bool, processEnvelopes bool, ) (cursor types.StoreRequestCursor, envelopesCount int, err error) } func processMailserverBatch( ctx context.Context, messageRequester messageRequester, batch MailserverBatch, storenodeID peer.ID, logger *zap.Logger, pageLimit uint32, shouldProcessNextPage func(int) (bool, uint32), processEnvelopes bool, ) error { var topicStrings []string for _, t := range batch.Topics { topicStrings = append(topicStrings, t.String()) } logger = logger.With(zap.String("batch hash", batch.Hash())) logger.Info("syncing topic", zap.Any("chatIDs", batch.ChatIDs), zap.String("fromString", time.Unix(int64(batch.From), 0).Format(time.RFC3339)), zap.String("toString", time.Unix(int64(batch.To), 0).Format(time.RFC3339)), zap.Any("topic", topicStrings), zap.Int64("from", int64(batch.From)), zap.Int64("to", int64(batch.To))) wg := sync.WaitGroup{} workWg := sync.WaitGroup{} workCh := make(chan work, 1000) // each batch item is split in 10 topics bunch and sent to this channel workCompleteCh := make(chan struct{}) // once all batch items are processed, this channel is triggered semaphore := make(chan int, 3) // limit the number of concurrent queries errCh := make(chan error) ctx, cancel := context.WithCancel(ctx) defer cancel() // Producer wg.Add(1) go func() { defer gocommon.LogOnPanic() defer func() { logger.Debug("mailserver batch producer complete") wg.Done() }() allWorks := int(math.Ceil(float64(len(batch.Topics)) / float64(maxTopicsPerRequest))) workWg.Add(allWorks) for i := 0; i < len(batch.Topics); i += maxTopicsPerRequest { j := i + maxTopicsPerRequest if j > len(batch.Topics) { j = len(batch.Topics) } select { case <-ctx.Done(): logger.Debug("processBatch producer - context done") return default: logger.Debug("processBatch producer - creating work") workCh <- work{ pubsubTopic: batch.PubsubTopic, contentTopics: batch.Topics[i:j], limit: pageLimit, } time.Sleep(50 * time.Millisecond) } } go func() { defer gocommon.LogOnPanic() workWg.Wait() workCompleteCh <- struct{}{} }() logger.Debug("processBatch producer complete") }() var result error loop: for { select { case <-ctx.Done(): logger.Debug("processBatch cleanup - context done") result = ctx.Err() if errors.Is(result, context.Canceled) { result = nil } break loop case w, ok := <-workCh: if !ok { continue } logger.Debug("processBatch - received work") semaphore <- 1 go func(w work) { // Consumer defer gocommon.LogOnPanic() defer func() { workWg.Done() <-semaphore }() queryCtx, queryCancel := context.WithTimeout(ctx, mailserverRequestTimeout) cursor, envelopesCount, err := messageRequester.SendMessagesRequestForTopics(queryCtx, storenodeID, batch.From, batch.To, w.cursor, w.pubsubTopic, w.contentTopics, w.limit, true, processEnvelopes) queryCancel() if err != nil { logger.Debug("failed to send request", zap.Error(err)) errCh <- err return } processNextPage := true nextPageLimit := pageLimit if shouldProcessNextPage != nil { processNextPage, nextPageLimit = shouldProcessNextPage(envelopesCount) } if !processNextPage { return } // Check the cursor after calling `shouldProcessNextPage`. // The app might use process the fetched envelopes in the callback for own needs. if cursor == nil { return } logger.Debug("processBatch producer - creating work (cursor)") workWg.Add(1) workCh <- work{ pubsubTopic: w.pubsubTopic, contentTopics: w.contentTopics, cursor: cursor, limit: nextPageLimit, } }(w) case err := <-errCh: logger.Debug("processBatch - received error", zap.Error(err)) cancel() // Kill go routines return err case <-workCompleteCh: logger.Debug("processBatch - all jobs complete") cancel() // Kill go routines } } wg.Wait() // NOTE(camellos): Disabling for now, not critical and I'd rather take a bit more time // to test it //logger.Info("waiting until message processed") //m.waitUntilP2PMessagesProcessed() logger.Info("synced topic", zap.NamedError("hasError", result)) return result } func (m *Messenger) canSyncWithStoreNodes() (bool, error) { if m.featureFlags.StoreNodesDisabled { return false, nil } if m.connectionState.IsExpensive() { return m.settings.CanSyncOnMobileNetwork() } return true, nil } func (m *Messenger) DisableStoreNodes() { m.featureFlags.StoreNodesDisabled = true } func (m *Messenger) processMailserverBatch(ms mailservers.Mailserver, batch MailserverBatch) error { canSync, err := m.canSyncWithStoreNodes() if err != nil { return err } if !canSync { return nil } mailserverID, err := ms.PeerID() if err != nil { return err } logger := m.logger.With(zap.String("mailserverID", ms.ID)) return processMailserverBatch(m.ctx, m.transport, batch, mailserverID, logger, defaultStoreNodeRequestPageSize, nil, false) } func (m *Messenger) processMailserverBatchWithOptions(ms mailservers.Mailserver, batch MailserverBatch, pageLimit uint32, shouldProcessNextPage func(int) (bool, uint32), processEnvelopes bool) error { canSync, err := m.canSyncWithStoreNodes() if err != nil { return err } if !canSync { return nil } mailserverID, err := ms.PeerID() if err != nil { return err } logger := m.logger.With(zap.String("mailserverID", ms.ID)) return processMailserverBatch(m.ctx, m.transport, batch, mailserverID, logger, pageLimit, shouldProcessNextPage, processEnvelopes) } type MailserverBatch struct { From uint32 To uint32 Cursor string PubsubTopic string Topics []types.TopicType ChatIDs []string } func (mb *MailserverBatch) Hash() string { data := fmt.Sprintf("%d%d%s%s%v%v", mb.From, mb.To, mb.Cursor, mb.PubsubTopic, mb.Topics, mb.ChatIDs) hash := sha256.Sum256([]byte(data)) return hex.EncodeToString(hash[:4]) } func (m *Messenger) SyncChatFromSyncedFrom(chatID string) (uint32, error) { chat, ok := m.allChats.Load(chatID) if !ok { return 0, ErrChatNotFound } ms := m.getActiveMailserver(chat.CommunityID) var from uint32 _, err := m.performMailserverRequest(ms, func(ms mailservers.Mailserver) (*MessengerResponse, error) { canSync, err := m.canSyncWithStoreNodes() if err != nil { return nil, err } if !canSync { return nil, nil } pubsubTopic, topics, err := m.topicsForChat(chatID) if err != nil { return nil, nil } defaultSyncPeriod, err := m.settings.GetDefaultSyncPeriod() if err != nil { return nil, err } batch := MailserverBatch{ ChatIDs: []string{chatID}, To: chat.SyncedFrom, From: chat.SyncedFrom - defaultSyncPeriod, PubsubTopic: pubsubTopic, Topics: topics, } if m.config.messengerSignalsHandler != nil { m.config.messengerSignalsHandler.HistoryRequestStarted(1) } err = m.processMailserverBatch(ms, batch) if err != nil { return nil, err } if m.config.messengerSignalsHandler != nil { m.config.messengerSignalsHandler.HistoryRequestCompleted() } if chat.SyncedFrom == 0 || chat.SyncedFrom > batch.From { chat.SyncedFrom = batch.From } m.logger.Debug("setting sync timestamps", zap.Int64("from", int64(batch.From)), zap.Int64("to", int64(chat.SyncedTo)), zap.String("chatID", chatID)) err = m.persistence.SetSyncTimestamps(batch.From, chat.SyncedTo, chat.ID) from = batch.From return nil, err }) if err != nil { return 0, err } return from, nil } func (m *Messenger) FillGaps(chatID string, messageIDs []string) error { messages, err := m.persistence.MessagesByIDs(messageIDs) if err != nil { return err } chat, ok := m.allChats.Load(chatID) if !ok { return errors.New("chat not existing") } pubsubTopic, topics, err := m.topicsForChat(chatID) if err != nil { return err } var lowestFrom, highestTo uint32 for _, message := range messages { if message.GapParameters == nil { return errors.New("can't sync non-gap message") } if lowestFrom == 0 || lowestFrom > message.GapParameters.From { lowestFrom = message.GapParameters.From } if highestTo < message.GapParameters.To { highestTo = message.GapParameters.To } } batch := MailserverBatch{ ChatIDs: []string{chatID}, To: highestTo, From: lowestFrom, PubsubTopic: pubsubTopic, Topics: topics, } if m.config.messengerSignalsHandler != nil { m.config.messengerSignalsHandler.HistoryRequestStarted(1) } ms := m.getActiveMailserver(chat.CommunityID) err = m.processMailserverBatch(*ms, batch) if err != nil { return err } if m.config.messengerSignalsHandler != nil { m.config.messengerSignalsHandler.HistoryRequestCompleted() } return m.persistence.DeleteMessages(messageIDs) } func (m *Messenger) waitUntilP2PMessagesProcessed() { // nolint: unused ticker := time.NewTicker(50 * time.Millisecond) for { //nolint: gosimple select { case <-ticker.C: if !m.transport.ProcessingP2PMessages() { ticker.Stop() return } } } } func (m *Messenger) LoadFilters(filters []*transport.Filter) ([]*transport.Filter, error) { return m.transport.LoadFilters(filters) } func (m *Messenger) ToggleUseMailservers(value bool) error { m.mailserverCycle.Lock() defer m.mailserverCycle.Unlock() err := m.settings.SetUseMailservers(value) if err != nil { return err } m.disconnectActiveMailserver(backoffByUserAction) if value { m.cycleMailservers() return nil } return nil } func (m *Messenger) SetPinnedMailservers(mailservers map[string]string) error { err := m.settings.SetPinnedMailservers(mailservers) if err != nil { return err } m.disconnectActiveMailserver(backoffByUserAction) m.cycleMailservers() return nil } func (m *Messenger) RemoveFilters(filters []*transport.Filter) error { return m.transport.RemoveFilters(filters) } func (m *Messenger) ConnectionChanged(state connection.State) { m.transport.ConnectionChanged(state) if !m.connectionState.Offline && state.Offline { m.sender.StopDatasync() } if m.connectionState.Offline && !state.Offline { err := m.sender.StartDatasync(m.mvdsStatusChangeEvent, m.sendDataSync) if err != nil { m.logger.Error("failed to start datasync", zap.Error(err)) } } m.connectionState = state } func (m *Messenger) fetchMessages(chatID string, duration time.Duration) (uint32, error) { from, to := m.calculateMailserverTimeBounds(duration) chat, ok := m.allChats.Load(chatID) if !ok { return 0, ErrChatNotFound } ms := m.getActiveMailserver(chat.CommunityID) _, err := m.performMailserverRequest(ms, func(ms mailservers.Mailserver) (*MessengerResponse, error) { canSync, err := m.canSyncWithStoreNodes() if err != nil { return nil, err } if !canSync { return nil, nil } m.logger.Debug("fetching messages", zap.String("chatID", chatID), zap.String("mailserver", ms.Name)) pubsubTopic, topics, err := m.topicsForChat(chatID) if err != nil { return nil, nil } batch := MailserverBatch{ ChatIDs: []string{chatID}, From: from, To: to, PubsubTopic: pubsubTopic, Topics: topics, } if m.config.messengerSignalsHandler != nil { m.config.messengerSignalsHandler.HistoryRequestStarted(1) } err = m.processMailserverBatch(ms, batch) if err != nil { return nil, err } if m.config.messengerSignalsHandler != nil { m.config.messengerSignalsHandler.HistoryRequestCompleted() } if chat.SyncedFrom == 0 || chat.SyncedFrom > batch.From { chat.SyncedFrom = batch.From } m.logger.Debug("setting sync timestamps", zap.Int64("from", int64(batch.From)), zap.Int64("to", int64(chat.SyncedTo)), zap.String("chatID", chatID)) err = m.persistence.SetSyncTimestamps(batch.From, chat.SyncedTo, chat.ID) from = batch.From return nil, err }) if err != nil { return 0, err } return from, nil }