fix: better calculation of from/to mailserver batch parameters (#4702)

This commit is contained in:
Igor Sirotin 2024-02-12 12:20:56 +00:00 committed by GitHub
parent daef5c56e2
commit 8d4f4904c6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 29 additions and 17 deletions

View File

@ -656,7 +656,7 @@ func (m *Messenger) FetchMessages(request *requests.FetchMessages) error {
return ErrChatNotFound
}
_, err := m.fetchMessages(chat.ID, oneMonthInSeconds)
_, err := m.fetchMessages(chat.ID, oneMonthDuration)
if err != nil {
return err
}

View File

@ -27,12 +27,12 @@ const (
// tolerance is how many seconds of potentially out-of-order messages we want to fetch
tolerance uint32 = 60
mailserverRequestTimeout = 30 * time.Second
oneMonthInSeconds uint32 = 31 * 24 * 60 * 60
mailserverMaxTries uint = 2
mailserverMaxFailedRequests uint = 2
mailserverRequestTimeout = 30 * time.Second
mailserverMaxTries uint = 2
mailserverMaxFailedRequests uint = 2
OneDayInSeconds = 86400
oneDayDuration = 24 * time.Hour
oneMonthDuration = 31 * oneDayDuration
)
// maxTopicsPerRequest sets the batch size to limit the number of topics per store query
@ -184,7 +184,15 @@ func (m *Messenger) scheduleSyncFilters(filters []*transport.Filter) (bool, erro
}
func (m *Messenger) calculateMailserverTo() uint32 {
return uint32(m.getTimesource().GetCurrentTime() / 1000)
seconds := float64(m.GetCurrentTimeInMillis()) / 1000
return uint32(math.Ceil(seconds))
}
func (m *Messenger) calculateMailserverTimeBounds(duration time.Duration) (uint32, uint32) {
now := float64(m.GetCurrentTimeInMillis()) / 1000
to := uint32(math.Ceil(now))
from := uint32(math.Floor(now)) - uint32(duration.Seconds())
return from, to
}
func (m *Messenger) filtersForChat(chatID string) ([]*transport.Filter, error) {
@ -249,8 +257,8 @@ func (m *Messenger) syncBackup() error {
return errors.New("personal topic filter not loaded")
}
to := m.calculateMailserverTo()
from := uint32(m.getTimesource().GetCurrentTime()/1000) - oneMonthInSeconds
from, to := m.calculateMailserverTimeBounds(oneMonthDuration)
batch := MailserverBatch{From: from, To: to, Topics: []types.TopicType{filter.ContentTopic}}
err := m.processMailserverBatch(batch)
if err != nil {
@ -490,7 +498,7 @@ func (m *Messenger) syncFiltersFrom(filters []*transport.Filter, lastRequest uin
ChatIDs: batch.ChatIDs,
}
from := batch.To - OneDayInSeconds
from := batch.To - uint32(oneDayDuration.Seconds())
if from > batch.From {
dayBatch.From = from
batches24h = append(batches24h, dayBatch)
@ -650,7 +658,13 @@ func processMailserverBatch(
for _, t := range batch.Topics {
topicStrings = append(topicStrings, t.String())
}
logger = logger.With(zap.Any("chatIDs", batch.ChatIDs), zap.String("fromString", time.Unix(int64(batch.From), 0).Format(time.RFC3339)), zap.String("toString", time.Unix(int64(batch.To), 0).Format(time.RFC3339)), zap.Any("topic", topicStrings), zap.Int64("from", int64(batch.From)), zap.Int64("to", int64(batch.To)))
logger = logger.With(zap.Any("chatIDs", batch.ChatIDs),
zap.String("fromString", time.Unix(int64(batch.From), 0).Format(time.RFC3339)),
zap.String("toString", time.Unix(int64(batch.To), 0).Format(time.RFC3339)),
zap.Any("topic", topicStrings),
zap.Int64("from", int64(batch.From)),
zap.Int64("to", int64(batch.To)))
logger.Info("syncing topic")
wg := sync.WaitGroup{}
@ -1000,9 +1014,9 @@ func (m *Messenger) ConnectionChanged(state connection.State) {
m.connectionState = state
}
func (m *Messenger) fetchMessages(chatID string, duration uint32) (uint32, error) {
to := uint32(m.getTimesource().GetCurrentTime() / 1000)
from := to - duration
func (m *Messenger) fetchMessages(chatID string, duration time.Duration) (uint32, error) {
from, to := m.calculateMailserverTimeBounds(duration)
_, err := m.performMailserverRequest(func() (*MessengerResponse, error) {
pubsubTopic, topics, err := m.topicsForChat(chatID)
if err != nil {

View File

@ -3,7 +3,6 @@ package protocol
import (
"database/sql"
"fmt"
"math"
"strings"
"sync"
"time"
@ -518,8 +517,7 @@ func (r *storeNodeRequest) routine() {
}
// Start store node request
to := uint32(math.Ceil(float64(r.manager.messenger.GetCurrentTimeInMillis()) / 1000))
from := to - oneMonthInSeconds
from, to := r.manager.messenger.calculateMailserverTimeBounds(oneMonthDuration)
_, err := r.manager.messenger.performMailserverRequest(func() (*MessengerResponse, error) {
batch := MailserverBatch{