chore: resume archives import with delay

closes: status-im/status-desktop#10937
This commit is contained in:
Patryk Osmaczko 2023-06-08 12:22:26 +02:00 committed by osmaczko
parent 4c8fd538ba
commit a402b28b9f
2 changed files with 34 additions and 7 deletions

View File

@ -21,6 +21,7 @@ import (
"github.com/google/uuid"
"github.com/pkg/errors"
"go.uber.org/zap"
"golang.org/x/time/rate"
gethcommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
@ -142,6 +143,11 @@ type Messenger struct {
cancel context.CancelFunc
importingCommunities map[string]bool
importRateLimiter *rate.Limiter
importDelayer struct {
wait chan struct{}
once sync.Once
}
requestedCommunitiesLock sync.RWMutex
requestedCommunities map[string]*transport.Filter
@ -487,8 +493,13 @@ func NewMessenger(
requestedContactsLock: sync.RWMutex{},
requestedContacts: make(map[string]*transport.Filter),
importingCommunities: make(map[string]bool),
browserDatabase: c.browserDatabase,
httpServer: c.httpServer,
importRateLimiter: rate.NewLimiter(rate.Every(importSlowRate), 1),
importDelayer: struct {
wait chan struct{}
once sync.Once
}{wait: make(chan struct{})},
browserDatabase: c.browserDatabase,
httpServer: c.httpServer,
contractMaker: &contracts.ContractMaker{
RPCClient: c.rpcClient,
},
@ -773,6 +784,7 @@ func (m *Messenger) Start() (*MessengerResponse, error) {
return nil, err
}
}
m.enableHistoryArchivesImportAfterDelay()
if m.httpServer != nil {
err = m.httpServer.Start()

View File

@ -49,12 +49,11 @@ var updateActiveMembersInterval = 24 * time.Hour
const discordTimestampLayout = "2006-01-02T15:04:05+00:00"
var importRateLimiter = rate.NewLimiter(rate.Every(importSlowRate), 1)
const (
importSlowRate = time.Second / 1
importFastRate = time.Second / 100
importMessagesChunkSize = 10
importInitialDelay = time.Minute * 5
)
const (
@ -2334,6 +2333,15 @@ func (m *Messenger) InitHistoryArchiveTasks(communities []*communities.Community
}
}
func (m *Messenger) enableHistoryArchivesImportAfterDelay() {
go func() {
time.Sleep(importInitialDelay)
m.importDelayer.once.Do(func() {
close(m.importDelayer.wait)
})
}()
}
func (m *Messenger) resumeHistoryArchivesImport(communityID types.HexBytes) error {
archiveIDsToImport, err := m.communitiesManager.GetMessageArchiveIDsToImport(communityID)
if err != nil {
@ -2374,11 +2382,11 @@ func (m *Messenger) resumeHistoryArchivesImport(communityID types.HexBytes) erro
}
func (m *Messenger) SpeedupArchivesImport() {
importRateLimiter.SetLimit(rate.Every(importFastRate))
m.importRateLimiter.SetLimit(rate.Every(importFastRate))
}
func (m *Messenger) SlowdownArchivesImport() {
importRateLimiter.SetLimit(rate.Every(importSlowRate))
m.importRateLimiter.SetLimit(rate.Every(importSlowRate))
}
func (m *Messenger) importHistoryArchives(communityID types.HexBytes, cancel chan struct{}) error {
@ -2391,6 +2399,13 @@ func (m *Messenger) importHistoryArchives(communityID types.HexBytes, cancel cha
cancelFunc()
}()
// don't proceed until initial import delay has passed
select {
case <-m.importDelayer.wait:
case <-ctx.Done():
return nil
}
importMessageArchivesLoop:
for {
select {
@ -2425,7 +2440,7 @@ importMessageArchivesLoop:
m.config.messengerSignalsHandler.ImportingHistoryArchiveMessages(types.EncodeHex(communityID))
for _, messagesChunk := range chunkSlice(archiveMessages, importMessagesChunkSize) {
if err := importRateLimiter.Wait(ctx); err != nil {
if err := m.importRateLimiter.Wait(ctx); err != nil {
if !errors.Is(err, context.Canceled) {
m.communitiesManager.LogStdout("rate limiter error when handling archive messages", zap.Error(err))
}