From 167b65b68d76728286032ed7c702a36c436e58b6 Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Thu, 8 Aug 2024 11:10:46 -0400 Subject: [PATCH] fix: parallel execution of missing message verifier and sync recheck --- cmd/storemsgcounter/execute.go | 93 ++++++++++++++++++++-------------- 1 file changed, 55 insertions(+), 38 deletions(-) diff --git a/cmd/storemsgcounter/execute.go b/cmd/storemsgcounter/execute.go index 8ed2e66..d985bc4 100644 --- a/cmd/storemsgcounter/execute.go +++ b/cmd/storemsgcounter/execute.go @@ -147,52 +147,69 @@ func Execute(ctx context.Context, options Options) error { db: dbStore, } - missingMessagesTimer := time.NewTimer(0) - defer missingMessagesTimer.Stop() + go func() { + missingMessagesTimer := time.NewTimer(0) + defer missingMessagesTimer.Stop() - syncCheckTimer := time.NewTicker(30 * time.Minute) - defer syncCheckTimer.Stop() - - for { - select { - case <-ctx.Done(): - return nil - case <-missingMessagesTimer.C: - tmpUUID := uuid.New() - runId := hex.EncodeToString(tmpUUID[:]) - runIdLogger := logger.With(zap.String("runId", runId)) - - runIdLogger.Info("verifying message history...") - err := application.verifyHistory(ctx, runId, storenodeIDs, runIdLogger) - if err != nil { - return err - } - runIdLogger.Info("verification complete") - - missingMessagesTimer.Reset(timeInterval) - case <-syncCheckTimer.C: - go func() { + for { + select { + case <-ctx.Done(): + return + case <-missingMessagesTimer.C: tmpUUID := uuid.New() runId := hex.EncodeToString(tmpUUID[:]) - runIdLogger := logger.With(zap.String("syncRunId", runId)) - runIdLogger.Info("rechecking missing messages status") + runIdLogger := logger.With(zap.String("runId", runId)) - err := application.checkMissingMessageStatus(ctx, storenodeIDs, runId, runIdLogger) + runIdLogger.Info("verifying message history...") + err := application.verifyHistory(ctx, runId, storenodeIDs, runIdLogger) if err != nil { - logger.Error("could not recheck the status of missing messages", zap.Error(err)) - return + runIdLogger.Error("could not verify message history", zap.Error(err)) } + runIdLogger.Info("verification complete") - err = application.countMissingMessages(storenodeIDs) - if err != nil { - logger.Error("could not count missing messages", zap.Error(err)) - return - } - - runIdLogger.Info("missing messages recheck complete") - }() + missingMessagesTimer.Reset(timeInterval) + } } - } + }() + + go func() { + + syncCheckTimer := time.NewTicker(30 * time.Minute) + defer syncCheckTimer.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-syncCheckTimer.C: + go func() { + tmpUUID := uuid.New() + runId := hex.EncodeToString(tmpUUID[:]) + runIdLogger := logger.With(zap.String("syncRunId", runId)) + runIdLogger.Info("rechecking missing messages status") + + err := application.checkMissingMessageStatus(ctx, storenodeIDs, runId, runIdLogger) + if err != nil { + logger.Error("could not recheck the status of missing messages", zap.Error(err)) + return + } + + err = application.countMissingMessages(storenodeIDs) + if err != nil { + logger.Error("could not count missing messages", zap.Error(err)) + return + } + + runIdLogger.Info("missing messages recheck complete") + }() + } + } + + }() + + <-ctx.Done() + + return nil } var msgMapLock sync.Mutex