diff --git a/protocol/messenger_mailserver.go b/protocol/messenger_mailserver.go index 29e77d8a7..a2aa62168 100644 --- a/protocol/messenger_mailserver.go +++ b/protocol/messenger_mailserver.go @@ -21,11 +21,15 @@ import ( // tolerance is how many seconds of potentially out-of-order messages we want to fetch var tolerance uint32 = 60 + var mailserverRequestTimeout = 10 * time.Second var oneMonthInSeconds uint32 = 31 * 24 * 60 * 60 var mailserverMaxTries uint = 2 var mailserverMaxFailedRequests uint = 2 +// maxTopicsPerRequest sets the batch size to limit the number of topics per store query +var maxTopicsPerRequest int = 10 + var ErrNoFiltersForChat = errors.New("no filter registered for given chat") func (m *Messenger) shouldSync() (bool, error) { @@ -571,28 +575,39 @@ func (m *Messenger) processMailserverBatch(batch MailserverBatch) error { if err != nil { return err } - cursor, storeCursor, err := m.transport.SendMessagesRequestForTopics(ctx, mailserverID, batch.From, batch.To, nil, nil, batch.Topics, true) - if err != nil { - logger.Error("failed to send request", zap.Error(err)) - return err - } - for len(cursor) != 0 || storeCursor != nil { - logger.Info("retrieved cursor", zap.String("cursor", types.EncodeHex(cursor))) - err = func() error { - ctx, cancel := context.WithTimeout(context.Background(), mailserverRequestTimeout) - defer cancel() + for i := 0; i < len(batch.Topics); i += maxTopicsPerRequest { + j := i + maxTopicsPerRequest + if j > len(batch.Topics) { + j = len(batch.Topics) + } - cursor, storeCursor, err = m.transport.SendMessagesRequestForTopics(ctx, mailserverID, batch.From, batch.To, cursor, storeCursor, batch.Topics, true) + topicsForIteration := batch.Topics[i:j] + + cursor, storeCursor, err := m.transport.SendMessagesRequestForTopics(ctx, mailserverID, batch.From, batch.To, nil, nil, topicsForIteration, true) + if err != nil { + logger.Error("failed to send request", zap.Error(err)) + return err + } + + for len(cursor) != 0 || storeCursor != nil { + logger.Info("retrieved cursor", zap.String("cursor", types.EncodeHex(cursor))) + err = func() error { + ctx, cancel := context.WithTimeout(context.Background(), mailserverRequestTimeout) + defer cancel() + + cursor, storeCursor, err = m.transport.SendMessagesRequestForTopics(ctx, mailserverID, batch.From, batch.To, cursor, storeCursor, topicsForIteration, true) + if err != nil { + return err + } + return nil + }() if err != nil { return err } - return nil - }() - if err != nil { - return err } } + // NOTE(camellos): Disabling for now, not critical and I'd rather take a bit more time // to test it //logger.Info("waiting until message processed")