diff --git a/VERSION b/VERSION index 029dcabe1..22e1f1fc0 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.148.3 +0.149.0 diff --git a/protocol/messenger_mailserver.go b/protocol/messenger_mailserver.go index fca37f6d4..6f3e7c15c 100644 --- a/protocol/messenger_mailserver.go +++ b/protocol/messenger_mailserver.go @@ -3,6 +3,7 @@ package protocol import ( "context" "fmt" + "math" "sort" "sync" "time" @@ -611,72 +612,117 @@ func processMailserverBatch(ctx context.Context, messageRequester messageRequest logger.Info("syncing topic") wg := sync.WaitGroup{} - workCh := make(chan work, 100) + workWg := sync.WaitGroup{} + workCh := make(chan work, 1000) // each batch item is split in 10 topics bunch and sent to this channel + workCompleteCh := make(chan struct{}) // once all batch items are processed, this channel is triggered + semaphore := make(chan int, 3) // limit the number of concurrent queries errCh := make(chan error) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + // Producer + wg.Add(1) go func() { defer func() { - close(errCh) + logger.Debug("mailserver batch producer complete") + wg.Done() }() - for { + allWorks := int(math.Ceil(float64(len(batch.Topics)) / float64(maxTopicsPerRequest))) + workWg.Add(allWorks) + + for i := 0; i < len(batch.Topics); i += maxTopicsPerRequest { + j := i + maxTopicsPerRequest + if j > len(batch.Topics) { + j = len(batch.Topics) + } + select { case <-ctx.Done(): + logger.Debug("processBatch producer - context done") return - case w, ok := <-workCh: - if !ok { - return + default: + logger.Debug("processBatch producer - creating work") + workCh <- work{ + topics: batch.Topics[i:j], } + time.Sleep(50 * time.Millisecond) + } + } + + go func() { + workWg.Wait() + workCompleteCh <- struct{}{} + }() + + logger.Debug("processBatch producer complete") + }() + + var result error + +loop: + for { + select { + case <-ctx.Done(): + logger.Debug("processBatch cleanup - context done") + result = ctx.Err() + if errors.Is(result, context.Canceled) { + result = nil + } + break loop + case w, ok := <-workCh: + if !ok { + continue + } + + logger.Debug("processBatch - received work") + semaphore <- 1 + go func(w work) { // Consumer + defer func() { + workWg.Done() + <-semaphore + }() + + queryCtx, queryCancel := context.WithTimeout(ctx, mailserverRequestTimeout) + cursor, storeCursor, err := messageRequester.SendMessagesRequestForTopics(queryCtx, mailserverID, batch.From, batch.To, w.cursor, w.storeCursor, w.topics, true) + + queryCancel() - ctx, cancel := context.WithTimeout(ctx, mailserverRequestTimeout) - cursor, storeCursor, err := messageRequester.SendMessagesRequestForTopics(ctx, mailserverID, batch.From, batch.To, w.cursor, w.storeCursor, w.topics, true) if err != nil { - logger.Error("failed to send request", zap.Error(err)) - wg.Done() - cancel() + logger.Debug("failed to send request", zap.Error(err)) errCh <- err return } - cancel() - if len(cursor) != 0 || storeCursor != nil { - logger.Info("query has cursor", zap.String("cursorV1", types.EncodeHex(cursor))) + logger.Debug("processBatch producer - creating work (cursor)") + + workWg.Add(1) workCh <- work{ topics: w.topics, cursor: cursor, storeCursor: storeCursor, } - } else { - wg.Done() // We are done with this topic and its cursor } - - } - } - }() - - for i := 0; i < len(batch.Topics); i += maxTopicsPerRequest { - j := i + maxTopicsPerRequest - if j > len(batch.Topics) { - j = len(batch.Topics) - } - - wg.Add(1) - workCh <- work{ - topics: batch.Topics[i:j], + }(w) + case err := <-errCh: + logger.Debug("processBatch - received error", zap.Error(err)) + cancel() // Kill go routines + return err + case <-workCompleteCh: + logger.Debug("processBatch - all jobs complete") + cancel() // Kill go routines } } wg.Wait() - close(workCh) // NOTE(camellos): Disabling for now, not critical and I'd rather take a bit more time // to test it //logger.Info("waiting until message processed") //m.waitUntilP2PMessagesProcessed() - result := <-errCh - logger.Info("synced topic", zap.NamedError("hasError", result)) return result }