From c0c90947abf88202465eb66979584cc5e7905b7d Mon Sep 17 00:00:00 2001 From: Roman Volosovskyi Date: Mon, 31 Jan 2022 14:10:28 +0200 Subject: [PATCH] [#12783] Improve order of mailserver requests --- protocol/messenger.go | 2 +- protocol/messenger_mailserver.go | 85 ++++++++++++++++++++++++++++---- protocol/transport/filter.go | 2 + 3 files changed, 79 insertions(+), 10 deletions(-) diff --git a/protocol/messenger.go b/protocol/messenger.go index ba0ee6a2b..1a6151db5 100644 --- a/protocol/messenger.go +++ b/protocol/messenger.go @@ -3736,7 +3736,7 @@ func (m *Messenger) markAllRead(chatID string, clock uint64, shouldBeSynced bool // TODO(samyoul) remove storing of an updated reference pointer? m.allChats.Store(chat.ID, chat) - return nil + return m.persistence.SaveChats([]*Chat{chat}) } func (m *Messenger) MarkAllRead(chatID string) error { diff --git a/protocol/messenger_mailserver.go b/protocol/messenger_mailserver.go index feb9d1c6b..6bafc2ac6 100644 --- a/protocol/messenger_mailserver.go +++ b/protocol/messenger_mailserver.go @@ -3,6 +3,7 @@ package protocol import ( "context" "fmt" + "sort" "time" "github.com/pborman/uuid" @@ -211,6 +212,22 @@ func (m *Messenger) capToDefaultSyncPeriod(period uint32) (uint32, error) { return period - tolerance, nil } +func (m *Messenger) updateFiltersPriority(filters []*transport.Filter) { + for _, filter := range filters { + chatID := filter.ChatID + chat := m.Chat(chatID) + if chat != nil { + filter.Priority = chat.ReadMessagesAtClockValue + } + } +} + +func (m *Messenger) resetFiltersPriority(filters []*transport.Filter) { + for _, filter := range filters { + filter.Priority = 0 + } +} + // RequestAllHistoricMessages requests all the historic messages for any topic func (m *Messenger) RequestAllHistoricMessages() (*MessengerResponse, error) { shouldSync, err := m.shouldSync() @@ -236,7 +253,14 @@ func (m *Messenger) RequestAllHistoricMessages() (*MessengerResponse, error) { m.logger.Info("backup fetched") } - return m.syncFilters(m.transport.Filters()) + filters := m.transport.Filters() + m.updateFiltersPriority(filters) + defer m.resetFiltersPriority(filters) + return m.syncFilters(filters) +} + +func getPrioritizedBatches() []int { + return []int{1, 5, 10} } func (m *Messenger) syncFilters(filters []*transport.Filter) (*MessengerResponse, error) { @@ -255,6 +279,24 @@ func (m *Messenger) syncFilters(filters []*transport.Filter) (*MessengerResponse to := m.calculateMailserverTo() var syncedTopics []mailservers.MailserverTopic + + sort.Slice(filters[:], func(i, j int) bool { + p1 := filters[i].Priority + p2 := filters[j].Priority + return p1 > p2 + }) + prioritizedBatches := getPrioritizedBatches() + currentBatch := 0 + + if filters[0].Priority == 0 { + currentBatch = len(prioritizedBatches) + } + + defaultPeriodFromNow, err := m.defaultSyncPeriodFromNow() + if err != nil { + return nil, err + } + for _, filter := range filters { if !filter.Listen || filter.Ephemeral { continue @@ -270,17 +312,35 @@ func (m *Messenger) syncFilters(filters []*transport.Filter) (*MessengerResponse topicData, ok := topicsData[filter.Topic.String()] if !ok { - lastRequest, err := m.defaultSyncPeriodFromNow() - if err != nil { - return nil, err - } topicData = mailservers.MailserverTopic{ Topic: filter.Topic.String(), - LastRequest: int(lastRequest), + LastRequest: int(defaultPeriodFromNow), } } - batch, ok := batches[topicData.LastRequest] + batchID := topicData.LastRequest + + if currentBatch < len(prioritizedBatches) { + batch, ok := batches[currentBatch] + if ok { + prevTopicData, ok := topicsData[batch.Topics[0].String()] + if (!ok && topicData.LastRequest != int(defaultPeriodFromNow)) || + (ok && prevTopicData.LastRequest != topicData.LastRequest) { + currentBatch++ + } + } + if currentBatch < len(prioritizedBatches) { + batchID = currentBatch + currentBatchCap := prioritizedBatches[currentBatch] - 1 + if currentBatchCap == 0 { + currentBatch++ + } else { + prioritizedBatches[currentBatch] = currentBatchCap + } + } + } + + batch, ok := batches[batchID] if !ok { from, err := m.capToDefaultSyncPeriod(uint32(topicData.LastRequest)) if err != nil { @@ -292,7 +352,7 @@ func (m *Messenger) syncFilters(filters []*transport.Filter) (*MessengerResponse batch.ChatIDs = append(batch.ChatIDs, chatID) batch.Topics = append(batch.Topics, filter.Topic) - batches[topicData.LastRequest] = batch + batches[batchID] = batch // Set last request to the new `to` topicData.LastRequest = int(to) syncedTopics = append(syncedTopics, topicData) @@ -306,8 +366,15 @@ func (m *Messenger) syncFilters(filters []*transport.Filter) (*MessengerResponse m.config.messengerSignalsHandler.HistoryRequestStarted(requestID, len(batches)) } + batchKeys := make([]int, 0, len(batches)) + for k := range batches { + batchKeys = append(batchKeys, k) + } + sort.Ints(batchKeys) + i := 0 - for _, batch := range batches { + for _, k := range batchKeys { + batch := batches[k] i++ err := m.processMailserverBatch(batch) if err != nil { diff --git a/protocol/transport/filter.go b/protocol/transport/filter.go index 3d3ea1824..8a98fd725 100644 --- a/protocol/transport/filter.go +++ b/protocol/transport/filter.go @@ -25,6 +25,8 @@ type Filter struct { Listen bool `json:"listen"` // Ephemeral indicates that this is an ephemeral filter Ephemeral bool `json:"ephemeral"` + // Priority + Priority uint64 } func (c *Filter) IsPublic() bool {