diff --git a/protocol/messenger.go b/protocol/messenger.go index 7cb6e3db2..0fb0214f8 100644 --- a/protocol/messenger.go +++ b/protocol/messenger.go @@ -138,7 +138,10 @@ type Messenger struct { mailserversDatabase *mailserversDB.Database browserDatabase *browsers.Database httpServer *server.MediaServer - quit chan struct{} + + quit chan struct{} + ctx context.Context + cancel context.CancelFunc importingCommunities map[string]bool @@ -423,6 +426,8 @@ func NewMessenger( savedAddressesManager := wallet.NewSavedAddressesManager(c.db) + ctx, cancel := context.WithCancel(context.Background()) + messenger = &Messenger{ config: &c, node: node, @@ -459,6 +464,8 @@ func NewMessenger( mailserversDatabase: c.mailserversDatabase, account: c.account, quit: make(chan struct{}), + ctx: ctx, + cancel: cancel, requestedCommunitiesLock: sync.RWMutex{}, requestedCommunities: make(map[string]*transport.Filter), importingCommunities: make(map[string]bool), @@ -1571,6 +1578,7 @@ func (m *Messenger) Init() error { // Shutdown takes care of ensuring a clean shutdown of Messenger func (m *Messenger) Shutdown() (err error) { close(m.quit) + m.cancel() m.downloadHistoryArchiveTasksWaitGroup.Wait() for i, task := range m.shutdownTasks { m.logger.Debug("running shutdown task", zap.Int("n", i)) diff --git a/protocol/messenger_mailserver.go b/protocol/messenger_mailserver.go index 2d81ea593..46d620ccc 100644 --- a/protocol/messenger_mailserver.go +++ b/protocol/messenger_mailserver.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "sort" + "sync" "time" "github.com/pkg/errors" @@ -550,6 +551,12 @@ func (m *Messenger) calculateGapForChat(chat *Chat, from uint32) (*common.Messag return message, m.persistence.SaveMessages([]*common.Message{message}) } +type work struct { + topics []types.TopicType + cursor []byte + storeCursor *types.StoreRequestCursor +} + func (m *Messenger) processMailserverBatch(batch MailserverBatch) error { var topicStrings []string for _, t := range batch.Topics { @@ -557,46 +564,66 @@ func (m *Messenger) processMailserverBatch(batch MailserverBatch) error { } logger := m.logger.With(zap.Any("chatIDs", batch.ChatIDs), zap.String("fromString", time.Unix(int64(batch.From), 0).Format(time.RFC3339)), zap.String("toString", time.Unix(int64(batch.To), 0).Format(time.RFC3339)), zap.Any("topic", topicStrings), zap.Int64("from", int64(batch.From)), zap.Int64("to", int64(batch.To))) logger.Info("syncing topic") - ctx, cancel := context.WithTimeout(context.Background(), mailserverRequestTimeout) - defer cancel() mailserverID, err := m.activeMailserverID() if err != nil { return err } + wg := sync.WaitGroup{} + workCh := make(chan work, 100) + + go func() { + for { + select { + case <-m.ctx.Done(): + return + case w, ok := <-workCh: + if !ok { + return + } + + ctx, cancel := context.WithTimeout(m.ctx, mailserverRequestTimeout) + cursor, storeCursor, err := m.transport.SendMessagesRequestForTopics(ctx, mailserverID, batch.From, batch.To, nil, nil, w.topics, true) + if err != nil { + logger.Error("failed to send request", zap.Error(err)) + wg.Done() + cancel() + return + } + + cancel() + + if len(cursor) != 0 || storeCursor != nil { + logger.Info("query has cursor", zap.String("cursorV1", types.EncodeHex(cursor))) + workCh <- work{ + topics: w.topics, + cursor: cursor, + storeCursor: storeCursor, + } + } else { + wg.Done() // We are done with this topic and its cursor + } + + } + } + }() + for i := 0; i < len(batch.Topics); i += maxTopicsPerRequest { j := i + maxTopicsPerRequest if j > len(batch.Topics) { j = len(batch.Topics) } - topicsForIteration := batch.Topics[i:j] - - cursor, storeCursor, err := m.transport.SendMessagesRequestForTopics(ctx, mailserverID, batch.From, batch.To, nil, nil, topicsForIteration, true) - if err != nil { - logger.Error("failed to send request", zap.Error(err)) - return err - } - - for len(cursor) != 0 || storeCursor != nil { - logger.Info("retrieved cursor", zap.String("cursor", types.EncodeHex(cursor))) - err = func() error { - ctx, cancel := context.WithTimeout(context.Background(), mailserverRequestTimeout) - defer cancel() - - cursor, storeCursor, err = m.transport.SendMessagesRequestForTopics(ctx, mailserverID, batch.From, batch.To, cursor, storeCursor, topicsForIteration, true) - if err != nil { - return err - } - return nil - }() - if err != nil { - return err - } + wg.Add(1) + workCh <- work{ + topics: batch.Topics[i:j], } } + wg.Wait() + close(workCh) + // NOTE(camellos): Disabling for now, not critical and I'd rather take a bit more time // to test it //logger.Info("waiting until message processed")