refactor: improve mailserver batch handling
This commit is contained in:
parent 2ec97ee861
commit 975883407f
@@ -3,6 +3,7 @@ package protocol
 import (
 	"context"
 	"fmt"
+	"math"
 	"sort"
 	"sync"
 	"time"
@@ -611,72 +612,117 @@ func processMailserverBatch(ctx context.Context, messageRequester messageRequest
 	logger.Info("syncing topic")

 	wg := sync.WaitGroup{}
-	workCh := make(chan work, 100)
+	workWg := sync.WaitGroup{}
+	workCh := make(chan work, 1000)       // each batch item is split in 10 topics bunch and sent to this channel
+	workCompleteCh := make(chan struct{}) // once all batch items are processed, this channel is triggered
+	semaphore := make(chan int, 3)        // limit the number of concurrent queries
 	errCh := make(chan error)

+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+
+	// Producer
+	wg.Add(1)
 	go func() {
 		defer func() {
-			close(errCh)
+			logger.Debug("mailserver batch producer complete")
-		}()
-
-		for {
-			select {
-			case <-ctx.Done():
-				return
-			case w, ok := <-workCh:
-				if !ok {
-					return
-				}
-
-				ctx, cancel := context.WithTimeout(ctx, mailserverRequestTimeout)
-				cursor, storeCursor, err := messageRequester.SendMessagesRequestForTopics(ctx, mailserverID, batch.From, batch.To, w.cursor, w.storeCursor, w.topics, true)
-				if err != nil {
-					logger.Error("failed to send request", zap.Error(err))
 			wg.Done()
-					cancel()
-					errCh <- err
-					return
-				}
-
-				cancel()
-
-				if len(cursor) != 0 || storeCursor != nil {
-					logger.Info("query has cursor", zap.String("cursorV1", types.EncodeHex(cursor)))
-					workCh <- work{
-						topics:      w.topics,
-						cursor:      cursor,
-						storeCursor: storeCursor,
-					}
-				} else {
-					wg.Done() // We are done with this topic and its cursor
-				}
-
-			}
-		}
 		}()

+		allWorks := int(math.Ceil(float64(len(batch.Topics)) / float64(maxTopicsPerRequest)))
+		workWg.Add(allWorks)
+
 		for i := 0; i < len(batch.Topics); i += maxTopicsPerRequest {
 			j := i + maxTopicsPerRequest
 			if j > len(batch.Topics) {
 				j = len(batch.Topics)
 			}

-		wg.Add(1)
+			select {
+			case <-ctx.Done():
+				logger.Debug("processBatch producer - context done")
+				return
+			default:
+				logger.Debug("processBatch producer - creating work")
 				workCh <- work{
 					topics: batch.Topics[i:j],
 				}
+				time.Sleep(50 * time.Millisecond)
+			}
+		}
+
+		go func() {
+			workWg.Wait()
+			workCompleteCh <- struct{}{}
+		}()
+
+		logger.Debug("processBatch producer complete")
+	}()
+
+	var result error
+
+loop:
+	for {
+		select {
+		case <-ctx.Done():
+			logger.Debug("processBatch cleanup - context done")
+			result = ctx.Err()
+			if errors.Is(result, context.Canceled) {
+				result = nil
+			}
+			break loop
+		case w, ok := <-workCh:
+			if !ok {
+				continue
+			}
+
+			logger.Debug("processBatch - received work")
+			semaphore <- 1
+			go func(w work) { // Consumer
+				defer func() {
+					workWg.Done()
+					<-semaphore
+				}()
+
+				queryCtx, queryCancel := context.WithTimeout(ctx, mailserverRequestTimeout)
+				cursor, storeCursor, err := messageRequester.SendMessagesRequestForTopics(queryCtx, mailserverID, batch.From, batch.To, w.cursor, w.storeCursor, w.topics, true)
+
+				queryCancel()
+
+				if err != nil {
+					logger.Debug("failed to send request", zap.Error(err))
+					errCh <- err
+					return
+				}
+
+				if len(cursor) != 0 || storeCursor != nil {
+					logger.Debug("processBatch producer - creating work (cursor)")
+
+					workWg.Add(1)
+					workCh <- work{
+						topics:      w.topics,
+						cursor:      cursor,
+						storeCursor: storeCursor,
+					}
+				}
+			}(w)
+		case err := <-errCh:
+			logger.Debug("processBatch - received error", zap.Error(err))
+			cancel() // Kill go routines
+			return err
+		case <-workCompleteCh:
+			logger.Debug("processBatch - all jobs complete")
+			cancel() // Kill go routines
+		}
 	}

 	wg.Wait()
-	close(workCh)

 	// NOTE(camellos): Disabling for now, not critical and I'd rather take a bit more time
 	// to test it
 	//logger.Info("waiting until message processed")
 	//m.waitUntilP2PMessagesProcessed()

-	result := <-errCh
-
 	logger.Info("synced topic", zap.NamedError("hasError", result))
 	return result
 }
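For readers unfamiliar with the pattern, the following is a minimal, self-contained sketch of the producer/consumer shape the new code adopts: a producer fills a buffered work channel, a semaphore channel caps how many consumers run at once, a dedicated WaitGroup counts outstanding work items and feeds a completion channel, and any error cancels the shared context so every goroutine stops. It is an illustration only, not code from this repository: the names job, process, and run are invented, the error channel here is buffered with a non-blocking send to keep the example short, and the real consumer additionally re-enqueues follow-up work (with workWg.Add(1)) when a query returns a cursor.

// Sketch of the bounded-concurrency producer/consumer pattern used above.
// Illustrative names only; not part of the repository.
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

type job struct{ id int }

// process stands in for the per-item mailserver query; it may fail or be canceled.
func process(ctx context.Context, j job) error {
	select {
	case <-time.After(10 * time.Millisecond):
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func run(ctx context.Context, jobs []job) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	workCh := make(chan job, len(jobs))   // buffered so the producer never blocks
	workCompleteCh := make(chan struct{}) // fired once every job has been processed
	semaphore := make(chan struct{}, 3)   // at most 3 consumers at a time
	errCh := make(chan error, 1)

	workWg := sync.WaitGroup{}
	workWg.Add(len(jobs))

	// Producer: enqueue every job, then signal completion once all are processed.
	go func() {
		for _, j := range jobs {
			select {
			case <-ctx.Done():
				return
			case workCh <- j:
			}
		}
		go func() { // abandoned on the error path; tolerated in this sketch
			workWg.Wait()
			workCompleteCh <- struct{}{}
		}()
	}()

	for {
		select {
		case <-ctx.Done():
			if errors.Is(ctx.Err(), context.Canceled) {
				return nil
			}
			return ctx.Err()
		case j := <-workCh:
			semaphore <- struct{}{} // acquire a slot
			go func(j job) {        // Consumer
				defer func() {
					workWg.Done()
					<-semaphore // release the slot
				}()
				if err := process(ctx, j); err != nil {
					select {
					case errCh <- err:
					default: // an error is already pending; drop this one
					}
				}
			}(j)
		case err := <-errCh:
			cancel() // stop producer and consumers
			return err
		case <-workCompleteCh:
			return nil
		}
	}
}

func main() {
	jobs := make([]job, 25)
	for i := range jobs {
		jobs[i] = job{id: i}
	}
	fmt.Println(run(context.Background(), jobs)) // prints <nil> on success
}

Running the sketch prints <nil> once all 25 jobs have been processed; canceling the parent context or returning an error from process ends the loop early.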