diff --git a/cmd/storemsgcounter/execute.go b/cmd/storemsgcounter/execute.go index 8ed2e66..d985bc4 100644 --- a/cmd/storemsgcounter/execute.go +++ b/cmd/storemsgcounter/execute.go @@ -147,52 +147,69 @@ func Execute(ctx context.Context, options Options) error { db: dbStore, } - missingMessagesTimer := time.NewTimer(0) - defer missingMessagesTimer.Stop() + go func() { + missingMessagesTimer := time.NewTimer(0) + defer missingMessagesTimer.Stop() - syncCheckTimer := time.NewTicker(30 * time.Minute) - defer syncCheckTimer.Stop() - - for { - select { - case <-ctx.Done(): - return nil - case <-missingMessagesTimer.C: - tmpUUID := uuid.New() - runId := hex.EncodeToString(tmpUUID[:]) - runIdLogger := logger.With(zap.String("runId", runId)) - - runIdLogger.Info("verifying message history...") - err := application.verifyHistory(ctx, runId, storenodeIDs, runIdLogger) - if err != nil { - return err - } - runIdLogger.Info("verification complete") - - missingMessagesTimer.Reset(timeInterval) - case <-syncCheckTimer.C: - go func() { + for { + select { + case <-ctx.Done(): + return + case <-missingMessagesTimer.C: tmpUUID := uuid.New() runId := hex.EncodeToString(tmpUUID[:]) - runIdLogger := logger.With(zap.String("syncRunId", runId)) - runIdLogger.Info("rechecking missing messages status") + runIdLogger := logger.With(zap.String("runId", runId)) - err := application.checkMissingMessageStatus(ctx, storenodeIDs, runId, runIdLogger) + runIdLogger.Info("verifying message history...") + err := application.verifyHistory(ctx, runId, storenodeIDs, runIdLogger) if err != nil { - logger.Error("could not recheck the status of missing messages", zap.Error(err)) - return + runIdLogger.Error("could not verify message history", zap.Error(err)) } + runIdLogger.Info("verification complete") - err = application.countMissingMessages(storenodeIDs) - if err != nil { - logger.Error("could not count missing messages", zap.Error(err)) - return - } - - runIdLogger.Info("missing messages recheck complete") - }() + missingMessagesTimer.Reset(timeInterval) + } } - } + }() + + go func() { + + syncCheckTimer := time.NewTicker(30 * time.Minute) + defer syncCheckTimer.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-syncCheckTimer.C: + go func() { + tmpUUID := uuid.New() + runId := hex.EncodeToString(tmpUUID[:]) + runIdLogger := logger.With(zap.String("syncRunId", runId)) + runIdLogger.Info("rechecking missing messages status") + + err := application.checkMissingMessageStatus(ctx, storenodeIDs, runId, runIdLogger) + if err != nil { + logger.Error("could not recheck the status of missing messages", zap.Error(err)) + return + } + + err = application.countMissingMessages(storenodeIDs) + if err != nil { + logger.Error("could not count missing messages", zap.Error(err)) + return + } + + runIdLogger.Info("missing messages recheck complete") + }() + } + } + + }() + + <-ctx.Done() + + return nil } var msgMapLock sync.Mutex