fix_: ensure storenode requests do not exceed 24h

Richard Ramos 2024-11-20 14:50:51 -04:00
parent 11cf42bedd
commit fe52352690
No known key found for this signature in database
GPG Key ID: 1CE87DB518195760
1 changed file with 24 additions and 47 deletions


@@ -13,6 +13,7 @@ import (
     "github.com/libp2p/go-libp2p/core/peer"
     "github.com/pkg/errors"
     "go.uber.org/zap"
+    "golang.org/x/exp/maps"

     gocommon "github.com/status-im/status-go/common"
     "github.com/status-im/status-go/connection"
@@ -579,57 +580,17 @@ func (m *Messenger) syncFiltersFrom(ms mailservers.Mailserver, filters []*transp
         m.config.messengerSignalsHandler.HistoryRequestStarted(len(batches))
     }

-    var batches24h []MailserverBatch
     for pubsubTopic := range batches {
-        batchKeys := make([]int, 0, len(batches[pubsubTopic]))
-        for k := range batches[pubsubTopic] {
-            batchKeys = append(batchKeys, k)
-        }
+        batchKeys := maps.Keys(batches[pubsubTopic])
         sort.Ints(batchKeys)
-
-        keysToIterate := append([]int{}, batchKeys...)
-        for {
-            // For all batches
-            var tmpKeysToIterate []int
-            for _, k := range keysToIterate {
-                batch := batches[pubsubTopic][k]
-
-                dayBatch := MailserverBatch{
-                    To:          batch.To,
-                    Cursor:      batch.Cursor,
-                    PubsubTopic: batch.PubsubTopic,
-                    Topics:      batch.Topics,
-                    ChatIDs:     batch.ChatIDs,
-                }
-
-                from := batch.To - uint32(oneDayDuration.Seconds())
-                if from > batch.From {
-                    dayBatch.From = from
-                    batches24h = append(batches24h, dayBatch)
-
-                    // Replace og batch with new dates
-                    batch.To = from
-                    batches[pubsubTopic][k] = batch
-                    tmpKeysToIterate = append(tmpKeysToIterate, k)
-                } else {
-                    batches24h = append(batches24h, batch)
-                }
-            }
-
-            if len(tmpKeysToIterate) == 0 {
-                break
-            }
-            keysToIterate = tmpKeysToIterate
-        }
-    }
-
-    for _, batch := range batches24h {
-        err := m.processMailserverBatch(ms, batch)
-        if err != nil {
-            m.logger.Error("error syncing topics", zap.Error(err))
-            return nil, err
-        }
-    }
+        for _, k := range batchKeys {
+            err := m.processMailserverBatch(ms, batches[pubsubTopic][k])
+            if err != nil {
+                m.logger.Error("error syncing topics", zap.Error(err))
+                return nil, err
+            }
+        }
+    }

     m.logger.Debug("topics synced")
     if m.config.messengerSignalsHandler != nil {
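The deleted block eagerly pre-split every batch into 24-hour slices (batches24h) before issuing any request; that slicing is equivalent to walking a window backward from To toward From, as in the standalone sketch below (splitInto24hWindows is a hypothetical name, not from the codebase). After this commit, batches are handed to processMailserverBatch whole, and the 24-hour cap is applied lazily per request, as shown in the later hunks.

    package main

    import (
        "fmt"
        "time"
    )

    type window struct{ From, To uint32 }

    // splitInto24hWindows mirrors what the removed loop produced per batch:
    // slices of at most 24h, newest first.
    func splitInto24hWindows(from, to uint32) []window {
        day := uint32((24 * time.Hour).Seconds())
        var out []window
        for to > from {
            f := from
            if to > day && to-day > from {
                f = to - day
            }
            out = append(out, window{From: f, To: to})
            to = f
        }
        return out
    }

    func main() {
        // A 60h range yields three slices: 24h, 24h, and the remaining 12h.
        fmt.Println(splitInto24hWindows(0, 60*3600))
    }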
@@ -727,6 +688,8 @@ type work struct {
     contentTopics []types.TopicType
     cursor        types.StoreRequestCursor
     limit         uint32
+    from          uint32
+    to            uint32
 }

 type messageRequester interface {
@@ -805,6 +768,8 @@ func processMailserverBatch(
             pubsubTopic:   batch.PubsubTopic,
             contentTopics: batch.Topics[i:j],
             limit:         pageLimit,
+            from:          batch.From,
+            to:            batch.To,
         }
         time.Sleep(50 * time.Millisecond)
     }
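With the new fields, each work item carries its own time window instead of implicitly using the whole batch range. Assuming the surrounding loop pages content topics in chunks (workCh, maxTopicsPerRequest, and pageLimit follow the context above but are assumptions here), the seeding looks roughly like:

    // Sketch: one work item per content-topic page, each tagged with the
    // full batch window; the worker later narrows it to 24h per request.
    for i := 0; i < len(batch.Topics); i += maxTopicsPerRequest {
        j := i + maxTopicsPerRequest
        if j > len(batch.Topics) {
            j = len(batch.Topics)
        }
        workCh <- work{
            pubsubTopic:   batch.PubsubTopic,
            contentTopics: batch.Topics[i:j],
            limit:         pageLimit,
            from:          batch.From,
            to:            batch.To,
        }
    }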
@@ -846,7 +811,17 @@ loop:
             }()

             queryCtx, queryCancel := context.WithTimeout(ctx, mailserverRequestTimeout)
-            cursor, envelopesCount, err := messageRequester.SendMessagesRequestForTopics(queryCtx, storenodeID, batch.From, batch.To, w.cursor, w.pubsubTopic, w.contentTopics, w.limit, true, processEnvelopes)
+
+            // If the time range is greater than 24 hours, limit the request to [to-24h, to]
+            from := w.from
+            to := w.to
+            nextWorkTo := to
+            if batch.To-batch.From > uint32(oneDayDuration.Seconds()) {
+                from = to - uint32(oneDayDuration.Seconds())
+                nextWorkTo = from
+            }
+
+            cursor, envelopesCount, err := messageRequester.SendMessagesRequestForTopics(queryCtx, storenodeID, from, to, w.cursor, w.pubsubTopic, w.contentTopics, w.limit, true, processEnvelopes)
             queryCancel()

             if err != nil {
@@ -880,6 +855,8 @@ loop:
                 contentTopics: w.contentTopics,
                 cursor:        cursor,
                 limit:         nextPageLimit,
+                from:          w.from,
+                to:            nextWorkTo,
             }
         }(w)
     case err := <-errCh:
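Net effect: no single storenode query spans more than 24 hours. The worker queries the newest 24-hour slice of its window, and the follow-up work item created above (when a cursor indicates more pages) inherits [w.from, nextWorkTo], so the window drains backward one day at a time. Note the committed check gates on the overall batch span (batch.To-batch.From) rather than the work item's own window. A self-contained sketch of just the per-request arithmetic, with the condition applied to the window itself (capWindow is a hypothetical name):

    package main

    import (
        "fmt"
        "time"
    )

    var oneDayDuration = 24 * time.Hour

    // capWindow returns the range actually queried plus the upper bound
    // handed to the follow-up work item.
    func capWindow(from, to uint32) (qFrom, qTo, nextWorkTo uint32) {
        qFrom, qTo, nextWorkTo = from, to, to
        if to-from > uint32(oneDayDuration.Seconds()) {
            qFrom = to - uint32(oneDayDuration.Seconds())
            nextWorkTo = qFrom
        }
        return
    }

    func main() {
        // A 3-day window: the first request covers only the newest 24h;
        // the follow-up work item inherits [0, 172800].
        fmt.Println(capWindow(0, 3*86400)) // 172800 259200 172800
    }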