From fe5235269024c5b7ca291d69b18678862a9a188d Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Wed, 20 Nov 2024 14:50:51 -0400 Subject: [PATCH] fix_: ensure storenode requests do not exceed 24h --- protocol/messenger_mailserver.go | 71 +++++++++++--------------------- 1 file changed, 24 insertions(+), 47 deletions(-) diff --git a/protocol/messenger_mailserver.go b/protocol/messenger_mailserver.go index 4af9129f2..88facd0b3 100644 --- a/protocol/messenger_mailserver.go +++ b/protocol/messenger_mailserver.go @@ -13,6 +13,7 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/pkg/errors" "go.uber.org/zap" + "golang.org/x/exp/maps" gocommon "github.com/status-im/status-go/common" "github.com/status-im/status-go/connection" @@ -579,55 +580,15 @@ func (m *Messenger) syncFiltersFrom(ms mailservers.Mailserver, filters []*transp m.config.messengerSignalsHandler.HistoryRequestStarted(len(batches)) } - var batches24h []MailserverBatch for pubsubTopic := range batches { - batchKeys := make([]int, 0, len(batches[pubsubTopic])) - for k := range batches[pubsubTopic] { - batchKeys = append(batchKeys, k) - } + batchKeys := maps.Keys(batches[pubsubTopic]) sort.Ints(batchKeys) - - keysToIterate := append([]int{}, batchKeys...) - for { - // For all batches - var tmpKeysToIterate []int - for _, k := range keysToIterate { - batch := batches[pubsubTopic][k] - - dayBatch := MailserverBatch{ - To: batch.To, - Cursor: batch.Cursor, - PubsubTopic: batch.PubsubTopic, - Topics: batch.Topics, - ChatIDs: batch.ChatIDs, - } - - from := batch.To - uint32(oneDayDuration.Seconds()) - if from > batch.From { - dayBatch.From = from - batches24h = append(batches24h, dayBatch) - - // Replace og batch with new dates - batch.To = from - batches[pubsubTopic][k] = batch - tmpKeysToIterate = append(tmpKeysToIterate, k) - } else { - batches24h = append(batches24h, batch) - } + for _, k := range batchKeys { + err := m.processMailserverBatch(ms, batches[pubsubTopic][k]) + if err != nil { + m.logger.Error("error syncing topics", zap.Error(err)) + return nil, err } - - if len(tmpKeysToIterate) == 0 { - break - } - keysToIterate = tmpKeysToIterate - } - } - - for _, batch := range batches24h { - err := m.processMailserverBatch(ms, batch) - if err != nil { - m.logger.Error("error syncing topics", zap.Error(err)) - return nil, err } } @@ -727,6 +688,8 @@ type work struct { contentTopics []types.TopicType cursor types.StoreRequestCursor limit uint32 + from uint32 + to uint32 } type messageRequester interface { @@ -805,6 +768,8 @@ func processMailserverBatch( pubsubTopic: batch.PubsubTopic, contentTopics: batch.Topics[i:j], limit: pageLimit, + from: batch.From, + to: batch.To, } time.Sleep(50 * time.Millisecond) } @@ -846,7 +811,17 @@ loop: }() queryCtx, queryCancel := context.WithTimeout(ctx, mailserverRequestTimeout) - cursor, envelopesCount, err := messageRequester.SendMessagesRequestForTopics(queryCtx, storenodeID, batch.From, batch.To, w.cursor, w.pubsubTopic, w.contentTopics, w.limit, true, processEnvelopes) + + // If time range is greater than 24 hours, limit the range: to - (to-24h) + from := w.from + to := w.to + nextWorkTo := to + if batch.To-batch.From > uint32(oneDayDuration.Seconds()) { + from = to - uint32(oneDayDuration.Seconds()) + nextWorkTo = from + } + + cursor, envelopesCount, err := messageRequester.SendMessagesRequestForTopics(queryCtx, storenodeID, from, to, w.cursor, w.pubsubTopic, w.contentTopics, w.limit, true, processEnvelopes) queryCancel() if err != nil { @@ -880,6 +855,8 @@ loop: contentTopics: w.contentTopics, cursor: cursor, limit: nextPageLimit, + from: w.from, + to: nextWorkTo, } }(w) case err := <-errCh: