feat: add configurable throttling mechanism for importing messages

part of: status-im/status-desktop#10815
This commit is contained in:
Patryk Osmaczko 2023-06-01 22:02:34 +02:00 committed by osmaczko
parent 449314a4dc
commit 15d2b4fe80
5 changed files with 129 additions and 153 deletions

2
go.mod
View File

@ -88,6 +88,7 @@ require (
go.uber.org/multierr v1.11.0
golang.org/x/exp v0.0.0-20230321023759-10a507213a29
golang.org/x/net v0.8.0
golang.org/x/time v0.0.0-20220922220347-f3bd1da661af
)
require (
@ -267,7 +268,6 @@ require (
golang.org/x/sys v0.7.0 // indirect
golang.org/x/term v0.6.0 // indirect
golang.org/x/text v0.8.0 // indirect
golang.org/x/time v0.0.0-20220922220347-f3bd1da661af // indirect
golang.org/x/tools v0.7.0 // indirect
golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f // indirect
gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect

View File

@ -3234,6 +3234,7 @@ func (m *Manager) ExtractMessagesFromHistoryArchive(communityID types.HexBytes,
}
data := make([]byte, metadata.Size-metadata.Padding)
m.LogStdout("loading history archive data into memory", zap.Float64("data_size_MB", float64(metadata.Size-metadata.Padding)/1024.0/1024.0))
_, err = dataFile.Read(data)
if err != nil {
m.LogStdout("failed failed to read archive data", zap.Error(err))

View File

@ -66,9 +66,6 @@ import (
"github.com/status-im/status-go/telemetry"
)
const maxChunkSizeMessages = 1000
const maxChunkSizeBytes = 1500000
// todo: kozieiev: get rid of wakutransp word
type chatContext string
@ -3280,96 +3277,43 @@ func (m *Messenger) handleImportedMessages(messagesToHandle map[transport.Filter
}
importMessagesToSave := messageState.Response.DiscordMessages()
importMessagesCount := len(importMessagesToSave)
if importMessagesCount > 0 {
if importMessagesCount <= maxChunkSizeMessages {
m.communitiesManager.LogStdout(fmt.Sprintf("saving %d discord messages", importMessagesCount))
m.handleImportMessagesMutex.Lock()
err := m.persistence.SaveDiscordMessages(importMessagesToSave)
if err != nil {
m.communitiesManager.LogStdout("failed to save discord messages", zap.Error(err))
m.handleImportMessagesMutex.Unlock()
return err
}
if len(importMessagesToSave) > 0 {
m.communitiesManager.LogStdout(fmt.Sprintf("saving %d discord messages", len(importMessagesToSave)))
m.handleImportMessagesMutex.Lock()
err := m.persistence.SaveDiscordMessages(importMessagesToSave)
if err != nil {
m.communitiesManager.LogStdout("failed to save discord messages", zap.Error(err))
m.handleImportMessagesMutex.Unlock()
} else {
// We need to process the messages in chunks otherwise we'll
// block the database for too long
chunks := chunkSlice(importMessagesToSave, maxChunkSizeMessages)
chunksCount := len(chunks)
for i, msgs := range chunks {
m.communitiesManager.LogStdout(fmt.Sprintf("saving %d/%d chunk with %d discord messages", i+1, chunksCount, len(msgs)))
// We can't defer Unlock here because we want to
// unlock after every iteration to leave room for
// other processes to access the database
m.handleImportMessagesMutex.Lock()
err := m.persistence.SaveDiscordMessages(msgs)
if err != nil {
m.communitiesManager.LogStdout(fmt.Sprintf("failed to save discord message chunk %d of %d", i+1, chunksCount), zap.Error(err))
m.handleImportMessagesMutex.Unlock()
return err
}
m.handleImportMessagesMutex.Unlock()
// We slow down the saving of message chunks to keep the database responsive
if i < chunksCount-1 {
time.Sleep(2 * time.Second)
}
}
return err
}
m.handleImportMessagesMutex.Unlock()
}
messageAttachmentsToSave := messageState.Response.DiscordMessageAttachments()
if len(messageAttachmentsToSave) > 0 {
chunks := chunkAttachmentsByByteSize(messageAttachmentsToSave, maxChunkSizeBytes)
chunksCount := len(chunks)
for i, attachments := range chunks {
m.communitiesManager.LogStdout(fmt.Sprintf("saving %d/%d chunk with %d discord message attachments", i+1, chunksCount, len(attachments)))
m.handleImportMessagesMutex.Lock()
err := m.persistence.SaveDiscordMessageAttachments(attachments)
if err != nil {
m.communitiesManager.LogStdout(fmt.Sprintf("failed to save discord message attachments chunk %d of %d", i+1, chunksCount), zap.Error(err))
m.handleImportMessagesMutex.Unlock()
return err
}
// We slow down the saving of message chunks to keep the database responsive
m.communitiesManager.LogStdout(fmt.Sprintf("saving %d discord message attachments", len(messageAttachmentsToSave)))
m.handleImportMessagesMutex.Lock()
err := m.persistence.SaveDiscordMessageAttachments(messageAttachmentsToSave)
if err != nil {
m.communitiesManager.LogStdout("failed to save discord message attachments", zap.Error(err))
m.handleImportMessagesMutex.Unlock()
if i < chunksCount-1 {
time.Sleep(2 * time.Second)
}
return err
}
m.handleImportMessagesMutex.Unlock()
}
messagesToSave := messageState.Response.Messages()
messagesCount := len(messagesToSave)
if messagesCount > 0 {
if messagesCount <= maxChunkSizeMessages {
m.communitiesManager.LogStdout(fmt.Sprintf("saving %d app messages", messagesCount))
m.handleMessagesMutex.Lock()
err := m.SaveMessages(messagesToSave)
if err != nil {
m.handleMessagesMutex.Unlock()
return err
}
if len(messagesToSave) > 0 {
m.communitiesManager.LogStdout(fmt.Sprintf("saving %d app messages", len(messagesToSave)))
m.handleMessagesMutex.Lock()
err := m.SaveMessages(messagesToSave)
if err != nil {
m.handleMessagesMutex.Unlock()
} else {
chunks := chunkSlice(messagesToSave, maxChunkSizeMessages)
chunksCount := len(chunks)
for i, msgs := range chunks {
m.communitiesManager.LogStdout(fmt.Sprintf("saving %d/%d chunk with %d app messages", i+1, chunksCount, len(msgs)))
m.handleMessagesMutex.Lock()
err := m.SaveMessages(msgs)
if err != nil {
m.handleMessagesMutex.Unlock()
return err
}
m.handleMessagesMutex.Unlock()
// We slow down the saving of message chunks to keep the database responsive
if i < chunksCount-1 {
time.Sleep(2 * time.Second)
}
}
return err
}
m.handleMessagesMutex.Unlock()
}
// Save chats if they were modified
if len(messageState.Response.chats) > 0 {
err := m.saveChats(messageState.Response.Chats())
@ -4478,28 +4422,10 @@ func (m *Messenger) saveDataAndPrepareResponse(messageState *ReceivedMessageStat
}
messagesToSave := messageState.Response.Messages()
messagesCount := len(messagesToSave)
if messagesCount > 0 {
if messagesCount <= maxChunkSizeMessages {
err = m.SaveMessages(messagesToSave)
if err != nil {
return nil, err
}
} else {
messageChunks := chunkSlice(messagesToSave, maxChunkSizeMessages)
chunksCount := len(messageChunks)
for i, msgs := range messageChunks {
err := m.SaveMessages(msgs)
if err != nil {
return nil, err
}
// We slow down the saving of message chunks to keep the database responsive
// this is important when messages from history archives are handled,
// which could result in handling several thousand messages per archive
if i < chunksCount-1 {
time.Sleep(2 * time.Second)
}
}
if len(messagesToSave) > 0 {
err = m.SaveMessages(messagesToSave)
if err != nil {
return nil, err
}
}
@ -6292,45 +6218,6 @@ func (m *Messenger) handleSyncVerificationRequest(state *ReceivedMessageState, m
return nil
}
func chunkSlice[T comparable](slice []T, chunkSize int) [][]T {
var chunks [][]T
for i := 0; i < len(slice); i += chunkSize {
end := i + chunkSize
// necessary check to avoid slicing beyond
// slice capacity
if end > len(slice) {
end = len(slice)
}
chunks = append(chunks, slice[i:end])
}
return chunks
}
// chunkAttachmentsByByteSize groups attachments into chunks whose summed
// file sizes stay within maxFileSizeBytes. A single attachment larger than
// the budget still gets a chunk of its own, so no attachment is ever
// dropped. An empty input yields nil.
func chunkAttachmentsByByteSize(slice []*protobuf.DiscordMessageAttachment, maxFileSizeBytes uint64) [][]*protobuf.DiscordMessageAttachment {
	var (
		chunks    [][]*protobuf.DiscordMessageAttachment
		chunk     []*protobuf.DiscordMessageAttachment
		chunkSize uint64
	)
	for _, attachment := range slice {
		size := attachment.GetFileSizeBytes()
		// Flush the current chunk when adding this attachment would blow
		// the byte budget — but never flush an empty chunk, so oversized
		// attachments are still placed.
		if len(chunk) > 0 && chunkSize+size > maxFileSizeBytes {
			chunks = append(chunks, chunk)
			chunk = nil
			chunkSize = 0
		}
		chunk = append(chunk, attachment)
		chunkSize += size
	}
	// Flush whatever remains after the loop (non-empty input always leaves
	// a trailing chunk).
	if len(chunk) > 0 {
		chunks = append(chunks, chunk)
	}
	return chunks
}
// ImageServerURL returns the image-server base URL as built by the
// Messenger's embedded HTTP server (delegates to MakeImageServerURL).
func (m *Messenger) ImageServerURL() string {
return m.httpServer.MakeImageServerURL()
}

View File

@ -12,6 +12,8 @@ import (
"sync"
"time"
"golang.org/x/time/rate"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/ethclient"
@ -47,6 +49,19 @@ var updateActiveMembersInterval = 24 * time.Hour
const discordTimestampLayout = "2006-01-02T15:04:05+00:00"
var importRateLimiter = rate.NewLimiter(rate.Every(importSlowRate), 1)
const (
importSlowRate = time.Second / 1
importFastRate = time.Second / 100
importMessagesChunkSize = 10
)
const (
maxChunkSizeMessages = 1000
maxChunkSizeBytes = 1500000
)
func (m *Messenger) publishOrg(org *communities.Community) error {
m.logger.Debug("publishing org", zap.String("org-id", org.IDString()), zap.Any("org", org))
payload, err := org.MarshaledDescription()
@ -2309,14 +2324,28 @@ func (m *Messenger) resumeHistoryArchivesImport(communityID types.HexBytes) erro
return nil
}
// SpeedupArchivesImport switches the package-level importRateLimiter to the
// fast rate (importFastRate), so history-archive message chunks are
// processed more frequently. Note the limiter is shared global state: this
// affects every in-flight archive import, not just one Messenger instance.
func (m *Messenger) SpeedupArchivesImport() {
importRateLimiter.SetLimit(rate.Every(importFastRate))
}
// SlowdownArchivesImport restores the package-level importRateLimiter to the
// slow rate (importSlowRate), throttling history-archive message processing.
// Counterpart of SpeedupArchivesImport; the limiter is shared global state.
func (m *Messenger) SlowdownArchivesImport() {
importRateLimiter.SetLimit(rate.Every(importSlowRate))
}
func (m *Messenger) importHistoryArchives(communityID types.HexBytes, cancel chan struct{}) error {
importTicker := time.NewTicker(100 * time.Millisecond)
defer importTicker.Stop()
ctx, cancelFunc := context.WithCancel(context.Background())
go func() {
<-cancel
cancelFunc()
}()
importMessageArchivesLoop:
for {
select {
case <-cancel:
case <-ctx.Done():
m.communitiesManager.LogStdout("interrupted importing history archive messages")
return nil
case <-importTicker.C:
@ -2345,10 +2374,27 @@ importMessageArchivesLoop:
}
m.config.messengerSignalsHandler.ImportingHistoryArchiveMessages(types.EncodeHex(communityID))
response, err := m.handleArchiveMessages(archiveMessages, communityID)
if err != nil {
m.communitiesManager.LogStdout("failed to handle archive messages", zap.Error(err))
continue
for _, messagesChunk := range chunkSlice(archiveMessages, importMessagesChunkSize) {
if err := importRateLimiter.Wait(ctx); err != nil {
if !errors.Is(err, context.Canceled) {
m.communitiesManager.LogStdout("rate limiter error when handling archive messages", zap.Error(err))
}
continue importMessageArchivesLoop
}
response, err := m.handleArchiveMessages(messagesChunk, communityID)
if err != nil {
m.communitiesManager.LogStdout("failed to handle archive messages", zap.Error(err))
continue importMessageArchivesLoop
}
if !response.IsEmpty() {
notifications := response.Notifications()
response.ClearNotifications()
signal.SendNewMessages(response)
localnotifications.PushMessages(notifications)
}
}
err = m.communitiesManager.SetMessageArchiveIDImported(communityID, downloadedArchiveID, true)
@ -2356,13 +2402,6 @@ importMessageArchivesLoop:
m.communitiesManager.LogStdout("failed to mark history message archive as imported", zap.Error(err))
continue
}
if !response.IsEmpty() {
notifications := response.Notifications()
response.ClearNotifications()
signal.SendNewMessages(response)
localnotifications.PushMessages(notifications)
}
}
}
return nil
@ -3640,3 +3679,42 @@ func (m *Messenger) CheckPermissionsToJoinCommunity(request *requests.CheckPermi
return m.communitiesManager.CheckPermissionToJoin(request.CommunityID, addresses)
}
func chunkSlice[T comparable](slice []T, chunkSize int) [][]T {
var chunks [][]T
for i := 0; i < len(slice); i += chunkSize {
end := i + chunkSize
// necessary check to avoid slicing beyond
// slice capacity
if end > len(slice) {
end = len(slice)
}
chunks = append(chunks, slice[i:end])
}
return chunks
}
// chunkAttachmentsByByteSize partitions attachments into chunks whose
// cumulative file sizes do not exceed maxFileSizeBytes. An attachment that
// is itself larger than the budget is emitted in its own chunk rather than
// dropped. Returns nil for empty input.
func chunkAttachmentsByByteSize(slice []*protobuf.DiscordMessageAttachment, maxFileSizeBytes uint64) [][]*protobuf.DiscordMessageAttachment {
	var (
		chunks    [][]*protobuf.DiscordMessageAttachment
		chunk     []*protobuf.DiscordMessageAttachment
		chunkSize uint64
	)
	for _, attachment := range slice {
		size := attachment.GetFileSizeBytes()
		// Close out the current chunk when this attachment would exceed
		// the byte budget; an empty chunk is never flushed, so oversized
		// attachments still land somewhere.
		if len(chunk) > 0 && chunkSize+size > maxFileSizeBytes {
			chunks = append(chunks, chunk)
			chunk = nil
			chunkSize = 0
		}
		chunk = append(chunk, attachment)
		chunkSize += size
	}
	// Flush the trailing chunk left over after the loop.
	if len(chunk) > 0 {
		chunks = append(chunks, chunk)
	}
	return chunks
}

View File

@ -440,6 +440,16 @@ func (api *PublicAPI) ImportCommunity(ctx context.Context, hexPrivateKey string)
}
// SpeedupArchivesImport speeds up importing messages from community history
// archives by delegating to Messenger.SpeedupArchivesImport.
// ctx is unused here; it is kept to match the RPC handler signature.
func (api *PublicAPI) SpeedupArchivesImport(ctx context.Context) {
api.service.messenger.SpeedupArchivesImport()
}
// SlowdownArchivesImport slows down importing messages from community
// history archives by delegating to Messenger.SlowdownArchivesImport.
// ctx is unused here; it is kept to match the RPC handler signature.
func (api *PublicAPI) SlowdownArchivesImport(ctx context.Context) {
api.service.messenger.SlowdownArchivesImport()
}
// CreateCommunityChat creates a community chat in the given community
func (api *PublicAPI) CreateCommunityChat(communityID types.HexBytes, c *protobuf.CommunityChat) (*protocol.MessengerResponse, error) {
return api.service.messenger.CreateCommunityChat(communityID, c)