[#12783] Improve order of mailserver requests

This commit is contained in:
Roman Volosovskyi 2022-01-31 14:10:28 +02:00
parent 2e8007c12d
commit c0c90947ab
No known key found for this signature in database
GPG Key ID: 0238A4B5ECEE70DE
3 changed files with 79 additions and 10 deletions

View File

@ -3736,7 +3736,7 @@ func (m *Messenger) markAllRead(chatID string, clock uint64, shouldBeSynced bool
// TODO(samyoul) remove storing of an updated reference pointer?
m.allChats.Store(chat.ID, chat)
return nil
return m.persistence.SaveChats([]*Chat{chat})
}
func (m *Messenger) MarkAllRead(chatID string) error {

View File

@ -3,6 +3,7 @@ package protocol
import (
"context"
"fmt"
"sort"
"time"
"github.com/pborman/uuid"
@ -211,6 +212,22 @@ func (m *Messenger) capToDefaultSyncPeriod(period uint32) (uint32, error) {
return period - tolerance, nil
}
func (m *Messenger) updateFiltersPriority(filters []*transport.Filter) {
for _, filter := range filters {
chatID := filter.ChatID
chat := m.Chat(chatID)
if chat != nil {
filter.Priority = chat.ReadMessagesAtClockValue
}
}
}
func (m *Messenger) resetFiltersPriority(filters []*transport.Filter) {
for _, filter := range filters {
filter.Priority = 0
}
}
// RequestAllHistoricMessages requests all the historic messages for any topic
func (m *Messenger) RequestAllHistoricMessages() (*MessengerResponse, error) {
shouldSync, err := m.shouldSync()
@ -236,7 +253,14 @@ func (m *Messenger) RequestAllHistoricMessages() (*MessengerResponse, error) {
m.logger.Info("backup fetched")
}
return m.syncFilters(m.transport.Filters())
filters := m.transport.Filters()
m.updateFiltersPriority(filters)
defer m.resetFiltersPriority(filters)
return m.syncFilters(filters)
}
func getPrioritizedBatches() []int {
return []int{1, 5, 10}
}
func (m *Messenger) syncFilters(filters []*transport.Filter) (*MessengerResponse, error) {
@ -255,6 +279,24 @@ func (m *Messenger) syncFilters(filters []*transport.Filter) (*MessengerResponse
to := m.calculateMailserverTo()
var syncedTopics []mailservers.MailserverTopic
sort.Slice(filters[:], func(i, j int) bool {
p1 := filters[i].Priority
p2 := filters[j].Priority
return p1 > p2
})
prioritizedBatches := getPrioritizedBatches()
currentBatch := 0
if filters[0].Priority == 0 {
currentBatch = len(prioritizedBatches)
}
defaultPeriodFromNow, err := m.defaultSyncPeriodFromNow()
if err != nil {
return nil, err
}
for _, filter := range filters {
if !filter.Listen || filter.Ephemeral {
continue
@ -270,17 +312,35 @@ func (m *Messenger) syncFilters(filters []*transport.Filter) (*MessengerResponse
topicData, ok := topicsData[filter.Topic.String()]
if !ok {
lastRequest, err := m.defaultSyncPeriodFromNow()
if err != nil {
return nil, err
}
topicData = mailservers.MailserverTopic{
Topic: filter.Topic.String(),
LastRequest: int(lastRequest),
LastRequest: int(defaultPeriodFromNow),
}
}
batch, ok := batches[topicData.LastRequest]
batchID := topicData.LastRequest
if currentBatch < len(prioritizedBatches) {
batch, ok := batches[currentBatch]
if ok {
prevTopicData, ok := topicsData[batch.Topics[0].String()]
if (!ok && topicData.LastRequest != int(defaultPeriodFromNow)) ||
(ok && prevTopicData.LastRequest != topicData.LastRequest) {
currentBatch++
}
}
if currentBatch < len(prioritizedBatches) {
batchID = currentBatch
currentBatchCap := prioritizedBatches[currentBatch] - 1
if currentBatchCap == 0 {
currentBatch++
} else {
prioritizedBatches[currentBatch] = currentBatchCap
}
}
}
batch, ok := batches[batchID]
if !ok {
from, err := m.capToDefaultSyncPeriod(uint32(topicData.LastRequest))
if err != nil {
@ -292,7 +352,7 @@ func (m *Messenger) syncFilters(filters []*transport.Filter) (*MessengerResponse
batch.ChatIDs = append(batch.ChatIDs, chatID)
batch.Topics = append(batch.Topics, filter.Topic)
batches[topicData.LastRequest] = batch
batches[batchID] = batch
// Set last request to the new `to`
topicData.LastRequest = int(to)
syncedTopics = append(syncedTopics, topicData)
@ -306,8 +366,15 @@ func (m *Messenger) syncFilters(filters []*transport.Filter) (*MessengerResponse
m.config.messengerSignalsHandler.HistoryRequestStarted(requestID, len(batches))
}
batchKeys := make([]int, 0, len(batches))
for k := range batches {
batchKeys = append(batchKeys, k)
}
sort.Ints(batchKeys)
i := 0
for _, batch := range batches {
for _, k := range batchKeys {
batch := batches[k]
i++
err := m.processMailserverBatch(batch)
if err != nil {

View File

@ -25,6 +25,8 @@ type Filter struct {
Listen bool `json:"listen"`
// Ephemeral indicates that this is an ephemeral filter
Ephemeral bool `json:"ephemeral"`
// Priority
Priority uint64
}
func (c *Filter) IsPublic() bool {