fix: parallel execution of missing message verifier and sync recheck

This commit is contained in:
Richard Ramos 2024-08-08 11:10:46 -04:00
parent b7afc073c4
commit 167b65b68d
No known key found for this signature in database
GPG Key ID: 1CE87DB518195760

View File

@ -147,52 +147,69 @@ func Execute(ctx context.Context, options Options) error {
db: dbStore,
}
missingMessagesTimer := time.NewTimer(0)
defer missingMessagesTimer.Stop()
go func() {
missingMessagesTimer := time.NewTimer(0)
defer missingMessagesTimer.Stop()
syncCheckTimer := time.NewTicker(30 * time.Minute)
defer syncCheckTimer.Stop()
for {
select {
case <-ctx.Done():
return nil
case <-missingMessagesTimer.C:
tmpUUID := uuid.New()
runId := hex.EncodeToString(tmpUUID[:])
runIdLogger := logger.With(zap.String("runId", runId))
runIdLogger.Info("verifying message history...")
err := application.verifyHistory(ctx, runId, storenodeIDs, runIdLogger)
if err != nil {
return err
}
runIdLogger.Info("verification complete")
missingMessagesTimer.Reset(timeInterval)
case <-syncCheckTimer.C:
go func() {
for {
select {
case <-ctx.Done():
return
case <-missingMessagesTimer.C:
tmpUUID := uuid.New()
runId := hex.EncodeToString(tmpUUID[:])
runIdLogger := logger.With(zap.String("syncRunId", runId))
runIdLogger.Info("rechecking missing messages status")
runIdLogger := logger.With(zap.String("runId", runId))
err := application.checkMissingMessageStatus(ctx, storenodeIDs, runId, runIdLogger)
runIdLogger.Info("verifying message history...")
err := application.verifyHistory(ctx, runId, storenodeIDs, runIdLogger)
if err != nil {
logger.Error("could not recheck the status of missing messages", zap.Error(err))
return
runIdLogger.Error("could not verify message history", zap.Error(err))
}
runIdLogger.Info("verification complete")
err = application.countMissingMessages(storenodeIDs)
if err != nil {
logger.Error("could not count missing messages", zap.Error(err))
return
}
runIdLogger.Info("missing messages recheck complete")
}()
missingMessagesTimer.Reset(timeInterval)
}
}
}
}()
go func() {
syncCheckTimer := time.NewTicker(30 * time.Minute)
defer syncCheckTimer.Stop()
for {
select {
case <-ctx.Done():
return
case <-syncCheckTimer.C:
go func() {
tmpUUID := uuid.New()
runId := hex.EncodeToString(tmpUUID[:])
runIdLogger := logger.With(zap.String("syncRunId", runId))
runIdLogger.Info("rechecking missing messages status")
err := application.checkMissingMessageStatus(ctx, storenodeIDs, runId, runIdLogger)
if err != nil {
logger.Error("could not recheck the status of missing messages", zap.Error(err))
return
}
err = application.countMissingMessages(storenodeIDs)
if err != nil {
logger.Error("could not count missing messages", zap.Error(err))
return
}
runIdLogger.Info("missing messages recheck complete")
}()
}
}
}()
<-ctx.Done()
return nil
}
var msgMapLock sync.Mutex