fix: cancel mailserver requests when stopping messenger

This commit is contained in:
Richard Ramos 2023-01-24 12:29:08 -04:00 committed by RichΛrd
parent 4a4ae4c264
commit eb36eb3ee5
2 changed files with 61 additions and 26 deletions

View File

@ -138,7 +138,10 @@ type Messenger struct {
mailserversDatabase *mailserversDB.Database mailserversDatabase *mailserversDB.Database
browserDatabase *browsers.Database browserDatabase *browsers.Database
httpServer *server.MediaServer httpServer *server.MediaServer
quit chan struct{}
quit chan struct{}
ctx context.Context
cancel context.CancelFunc
importingCommunities map[string]bool importingCommunities map[string]bool
@ -423,6 +426,8 @@ func NewMessenger(
savedAddressesManager := wallet.NewSavedAddressesManager(c.db) savedAddressesManager := wallet.NewSavedAddressesManager(c.db)
ctx, cancel := context.WithCancel(context.Background())
messenger = &Messenger{ messenger = &Messenger{
config: &c, config: &c,
node: node, node: node,
@ -459,6 +464,8 @@ func NewMessenger(
mailserversDatabase: c.mailserversDatabase, mailserversDatabase: c.mailserversDatabase,
account: c.account, account: c.account,
quit: make(chan struct{}), quit: make(chan struct{}),
ctx: ctx,
cancel: cancel,
requestedCommunitiesLock: sync.RWMutex{}, requestedCommunitiesLock: sync.RWMutex{},
requestedCommunities: make(map[string]*transport.Filter), requestedCommunities: make(map[string]*transport.Filter),
importingCommunities: make(map[string]bool), importingCommunities: make(map[string]bool),
@ -1571,6 +1578,7 @@ func (m *Messenger) Init() error {
// Shutdown takes care of ensuring a clean shutdown of Messenger // Shutdown takes care of ensuring a clean shutdown of Messenger
func (m *Messenger) Shutdown() (err error) { func (m *Messenger) Shutdown() (err error) {
close(m.quit) close(m.quit)
m.cancel()
m.downloadHistoryArchiveTasksWaitGroup.Wait() m.downloadHistoryArchiveTasksWaitGroup.Wait()
for i, task := range m.shutdownTasks { for i, task := range m.shutdownTasks {
m.logger.Debug("running shutdown task", zap.Int("n", i)) m.logger.Debug("running shutdown task", zap.Int("n", i))

View File

@ -4,6 +4,7 @@ import (
"context" "context"
"fmt" "fmt"
"sort" "sort"
"sync"
"time" "time"
"github.com/pkg/errors" "github.com/pkg/errors"
@ -550,6 +551,12 @@ func (m *Messenger) calculateGapForChat(chat *Chat, from uint32) (*common.Messag
return message, m.persistence.SaveMessages([]*common.Message{message}) return message, m.persistence.SaveMessages([]*common.Message{message})
} }
type work struct {
topics []types.TopicType
cursor []byte
storeCursor *types.StoreRequestCursor
}
func (m *Messenger) processMailserverBatch(batch MailserverBatch) error { func (m *Messenger) processMailserverBatch(batch MailserverBatch) error {
var topicStrings []string var topicStrings []string
for _, t := range batch.Topics { for _, t := range batch.Topics {
@ -557,46 +564,66 @@ func (m *Messenger) processMailserverBatch(batch MailserverBatch) error {
} }
logger := m.logger.With(zap.Any("chatIDs", batch.ChatIDs), zap.String("fromString", time.Unix(int64(batch.From), 0).Format(time.RFC3339)), zap.String("toString", time.Unix(int64(batch.To), 0).Format(time.RFC3339)), zap.Any("topic", topicStrings), zap.Int64("from", int64(batch.From)), zap.Int64("to", int64(batch.To))) logger := m.logger.With(zap.Any("chatIDs", batch.ChatIDs), zap.String("fromString", time.Unix(int64(batch.From), 0).Format(time.RFC3339)), zap.String("toString", time.Unix(int64(batch.To), 0).Format(time.RFC3339)), zap.Any("topic", topicStrings), zap.Int64("from", int64(batch.From)), zap.Int64("to", int64(batch.To)))
logger.Info("syncing topic") logger.Info("syncing topic")
ctx, cancel := context.WithTimeout(context.Background(), mailserverRequestTimeout)
defer cancel()
mailserverID, err := m.activeMailserverID() mailserverID, err := m.activeMailserverID()
if err != nil { if err != nil {
return err return err
} }
wg := sync.WaitGroup{}
workCh := make(chan work, 100)
go func() {
for {
select {
case <-m.ctx.Done():
return
case w, ok := <-workCh:
if !ok {
return
}
ctx, cancel := context.WithTimeout(m.ctx, mailserverRequestTimeout)
cursor, storeCursor, err := m.transport.SendMessagesRequestForTopics(ctx, mailserverID, batch.From, batch.To, nil, nil, w.topics, true)
if err != nil {
logger.Error("failed to send request", zap.Error(err))
wg.Done()
cancel()
return
}
cancel()
if len(cursor) != 0 || storeCursor != nil {
logger.Info("query has cursor", zap.String("cursorV1", types.EncodeHex(cursor)))
workCh <- work{
topics: w.topics,
cursor: cursor,
storeCursor: storeCursor,
}
} else {
wg.Done() // We are done with this topic and its cursor
}
}
}
}()
for i := 0; i < len(batch.Topics); i += maxTopicsPerRequest { for i := 0; i < len(batch.Topics); i += maxTopicsPerRequest {
j := i + maxTopicsPerRequest j := i + maxTopicsPerRequest
if j > len(batch.Topics) { if j > len(batch.Topics) {
j = len(batch.Topics) j = len(batch.Topics)
} }
topicsForIteration := batch.Topics[i:j] wg.Add(1)
workCh <- work{
cursor, storeCursor, err := m.transport.SendMessagesRequestForTopics(ctx, mailserverID, batch.From, batch.To, nil, nil, topicsForIteration, true) topics: batch.Topics[i:j],
if err != nil {
logger.Error("failed to send request", zap.Error(err))
return err
}
for len(cursor) != 0 || storeCursor != nil {
logger.Info("retrieved cursor", zap.String("cursor", types.EncodeHex(cursor)))
err = func() error {
ctx, cancel := context.WithTimeout(context.Background(), mailserverRequestTimeout)
defer cancel()
cursor, storeCursor, err = m.transport.SendMessagesRequestForTopics(ctx, mailserverID, batch.From, batch.To, cursor, storeCursor, topicsForIteration, true)
if err != nil {
return err
}
return nil
}()
if err != nil {
return err
}
} }
} }
wg.Wait()
close(workCh)
// NOTE(camellos): Disabling for now, not critical and I'd rather take a bit more time // NOTE(camellos): Disabling for now, not critical and I'd rather take a bit more time
// to test it // to test it
//logger.Info("waiting until message processed") //logger.Info("waiting until message processed")