fix: mailserver batches should be grouped by pubsub topics (#4458)

This commit is contained in:
richΛrd 2023-12-13 17:14:06 -04:00 committed by GitHub
parent b52a9ce0e5
commit 42cf9fa740
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 71 additions and 60 deletions

View File

@ -28,6 +28,8 @@ var oneMonthInSeconds uint32 = 31 * 24 * 60 * 60
var mailserverMaxTries uint = 2
var mailserverMaxFailedRequests uint = 2
const OneDayInSeconds = 86400
// maxTopicsPerRequest sets the batch size to limit the number of topics per store query
var maxTopicsPerRequest int = 10
@ -354,7 +356,7 @@ func (m *Messenger) syncFiltersFrom(filters []*transport.Filter, lastRequest uin
topicsData[fmt.Sprintf("%s-%s", topic.PubsubTopic, topic.ContentTopic)] = topic
}
batches := make(map[int]MailserverBatch)
batches := make(map[string]map[int]MailserverBatch)
to := m.calculateMailserverTo()
var syncedTopics []mailservers.MailserverTopic
@ -391,6 +393,10 @@ func (m *Messenger) syncFiltersFrom(filters []*transport.Filter, lastRequest uin
}
for pubsubTopic, contentTopics := range contentTopicsPerPubsubTopic {
if _, ok := batches[pubsubTopic]; !ok {
batches[pubsubTopic] = make(map[int]MailserverBatch)
}
for _, filter := range contentTopics {
var chatID string
// If the filter has an identity, we use it as a chatID, otherwise is a public chat/community chat filter
@ -419,7 +425,7 @@ func (m *Messenger) syncFiltersFrom(filters []*transport.Filter, lastRequest uin
batchID := topicData.LastRequest
if currentBatch < len(prioritizedBatches) {
batch, ok := batches[currentBatch]
batch, ok := batches[pubsubTopic][currentBatch]
if ok {
prevTopicData, ok := topicsData[batch.PubsubTopic+batch.Topics[0].String()]
if (!ok && topicData.LastRequest != int(defaultPeriodFromNow)) ||
@ -438,7 +444,7 @@ func (m *Messenger) syncFiltersFrom(filters []*transport.Filter, lastRequest uin
}
}
batch, ok := batches[batchID]
batch, ok := batches[pubsubTopic][batchID]
if !ok {
from := uint32(topicData.LastRequest)
if capToDefaultSyncPeriod {
@ -453,7 +459,8 @@ func (m *Messenger) syncFiltersFrom(filters []*transport.Filter, lastRequest uin
batch.ChatIDs = append(batch.ChatIDs, chatID)
batch.PubsubTopic = pubsubTopic
batch.Topics = append(batch.Topics, filter.ContentTopic)
batches[batchID] = batch
batches[pubsubTopic][batchID] = batch
// Set last request to the new `to`
topicData.LastRequest = int(to)
syncedTopics = append(syncedTopics, topicData)
@ -464,46 +471,48 @@ func (m *Messenger) syncFiltersFrom(filters []*transport.Filter, lastRequest uin
m.config.messengerSignalsHandler.HistoryRequestStarted(len(batches))
}
batchKeys := make([]int, 0, len(batches))
for k := range batches {
batchKeys = append(batchKeys, k)
}
sort.Ints(batchKeys)
var batches24h []MailserverBatch
keysToIterate := append([]int{}, batchKeys...)
for {
// For all batches
var tmpKeysToIterate []int
for _, k := range keysToIterate {
batch := batches[k]
for pubsubTopic := range batches {
batchKeys := make([]int, 0, len(batches[pubsubTopic]))
for k := range batches[pubsubTopic] {
batchKeys = append(batchKeys, k)
}
sort.Ints(batchKeys)
dayBatch := MailserverBatch{
To: batch.To,
Cursor: batch.Cursor,
PubsubTopic: batch.PubsubTopic,
Topics: batch.Topics,
ChatIDs: batch.ChatIDs,
keysToIterate := append([]int{}, batchKeys...)
for {
// For all batches
var tmpKeysToIterate []int
for _, k := range keysToIterate {
batch := batches[pubsubTopic][k]
dayBatch := MailserverBatch{
To: batch.To,
Cursor: batch.Cursor,
PubsubTopic: batch.PubsubTopic,
Topics: batch.Topics,
ChatIDs: batch.ChatIDs,
}
from := batch.To - OneDayInSeconds
if from > batch.From {
dayBatch.From = from
batches24h = append(batches24h, dayBatch)
// Replace og batch with new dates
batch.To = from
batches[pubsubTopic][k] = batch
tmpKeysToIterate = append(tmpKeysToIterate, k)
} else {
batches24h = append(batches24h, batch)
}
}
from := batch.To - 86400
if from > batch.From {
dayBatch.From = from
batches24h = append(batches24h, dayBatch)
// Replace og batch with new dates
batch.To = from
batches[k] = batch
tmpKeysToIterate = append(tmpKeysToIterate, k)
} else {
batches24h = append(batches24h, batch)
if len(tmpKeysToIterate) == 0 {
break
}
keysToIterate = tmpKeysToIterate
}
if len(tmpKeysToIterate) == 0 {
break
}
keysToIterate = tmpKeysToIterate
}
i := 0
@ -527,31 +536,33 @@ func (m *Messenger) syncFiltersFrom(filters []*transport.Filter, lastRequest uin
}
var messagesToBeSaved []*common.Message
for _, batch := range batches {
for _, id := range batch.ChatIDs {
chat, ok := m.allChats.Load(id)
if !ok || !chat.Active || chat.Timeline() || chat.ProfileUpdates() {
continue
}
gap, err := m.calculateGapForChat(chat, batch.From)
if err != nil {
return nil, err
}
if chat.SyncedFrom == 0 || chat.SyncedFrom > batch.From {
chat.SyncedFrom = batch.From
}
for _, batches := range batches {
for _, batch := range batches {
for _, id := range batch.ChatIDs {
chat, ok := m.allChats.Load(id)
if !ok || !chat.Active || chat.Timeline() || chat.ProfileUpdates() {
continue
}
gap, err := m.calculateGapForChat(chat, batch.From)
if err != nil {
return nil, err
}
if chat.SyncedFrom == 0 || chat.SyncedFrom > batch.From {
chat.SyncedFrom = batch.From
}
chat.SyncedTo = to
chat.SyncedTo = to
err = m.persistence.SetSyncTimestamps(chat.SyncedFrom, chat.SyncedTo, chat.ID)
if err != nil {
return nil, err
}
err = m.persistence.SetSyncTimestamps(chat.SyncedFrom, chat.SyncedTo, chat.ID)
if err != nil {
return nil, err
}
response.AddChat(chat)
if gap != nil {
response.AddMessage(gap)
messagesToBeSaved = append(messagesToBeSaved, gap)
response.AddChat(chat)
if gap != nil {
response.AddMessage(gap)
messagesToBeSaved = append(messagesToBeSaved, gap)
}
}
}
}