fix: limit store queries to 10 content topics

This commit is contained in:
Richard Ramos 2022-11-08 15:17:50 -04:00 committed by RichΛrd
parent 77b7ce5a09
commit 4f5a25befa
1 changed files with 30 additions and 15 deletions

View File

@ -21,11 +21,15 @@ import (
// tolerance is how many seconds of potentially out-of-order messages we want to fetch // tolerance is how many seconds of potentially out-of-order messages we want to fetch
var tolerance uint32 = 60 var tolerance uint32 = 60
var mailserverRequestTimeout = 10 * time.Second var mailserverRequestTimeout = 10 * time.Second
var oneMonthInSeconds uint32 = 31 * 24 * 60 * 60 var oneMonthInSeconds uint32 = 31 * 24 * 60 * 60
var mailserverMaxTries uint = 2 var mailserverMaxTries uint = 2
var mailserverMaxFailedRequests uint = 2 var mailserverMaxFailedRequests uint = 2
// maxTopicsPerRequest sets the batch size to limit the number of topics per store query
var maxTopicsPerRequest int = 10
var ErrNoFiltersForChat = errors.New("no filter registered for given chat") var ErrNoFiltersForChat = errors.New("no filter registered for given chat")
func (m *Messenger) shouldSync() (bool, error) { func (m *Messenger) shouldSync() (bool, error) {
@ -571,28 +575,39 @@ func (m *Messenger) processMailserverBatch(batch MailserverBatch) error {
if err != nil { if err != nil {
return err return err
} }
cursor, storeCursor, err := m.transport.SendMessagesRequestForTopics(ctx, mailserverID, batch.From, batch.To, nil, nil, batch.Topics, true)
if err != nil {
logger.Error("failed to send request", zap.Error(err))
return err
}
for len(cursor) != 0 || storeCursor != nil { for i := 0; i < len(batch.Topics); i += maxTopicsPerRequest {
logger.Info("retrieved cursor", zap.String("cursor", types.EncodeHex(cursor))) j := i + maxTopicsPerRequest
err = func() error { if j > len(batch.Topics) {
ctx, cancel := context.WithTimeout(context.Background(), mailserverRequestTimeout) j = len(batch.Topics)
defer cancel() }
cursor, storeCursor, err = m.transport.SendMessagesRequestForTopics(ctx, mailserverID, batch.From, batch.To, cursor, storeCursor, batch.Topics, true) topicsForIteration := batch.Topics[i:j]
cursor, storeCursor, err := m.transport.SendMessagesRequestForTopics(ctx, mailserverID, batch.From, batch.To, nil, nil, topicsForIteration, true)
if err != nil {
logger.Error("failed to send request", zap.Error(err))
return err
}
for len(cursor) != 0 || storeCursor != nil {
logger.Info("retrieved cursor", zap.String("cursor", types.EncodeHex(cursor)))
err = func() error {
ctx, cancel := context.WithTimeout(context.Background(), mailserverRequestTimeout)
defer cancel()
cursor, storeCursor, err = m.transport.SendMessagesRequestForTopics(ctx, mailserverID, batch.From, batch.To, cursor, storeCursor, topicsForIteration, true)
if err != nil {
return err
}
return nil
}()
if err != nil { if err != nil {
return err return err
} }
return nil
}()
if err != nil {
return err
} }
} }
// NOTE(camellos): Disabling for now, not critical and I'd rather take a bit more time // NOTE(camellos): Disabling for now, not critical and I'd rather take a bit more time
// to test it // to test it
//logger.Info("waiting until message processed") //logger.Info("waiting until message processed")