diff --git a/cmd/storemsgcounter/execute.go b/cmd/storemsgcounter/execute.go index fbf0dbf..9412ccc 100644 --- a/cmd/storemsgcounter/execute.go +++ b/cmd/storemsgcounter/execute.go @@ -156,6 +156,8 @@ func Execute(ctx context.Context, options Options) error { db: dbStore, } + metricsChan := make(chan *metricsSummary, 50) + go func() { missingMessagesTimer := time.NewTimer(0) defer missingMessagesTimer.Stop() @@ -170,7 +172,7 @@ func Execute(ctx context.Context, options Options) error { runIdLogger := logger.With(zap.String("runId", runId), zap.String("fleet", options.FleetName), zap.Uint("clusterID", options.ClusterID)) runIdLogger.Info("verifying message history...") - shouldResetTimer, err := application.verifyHistory(ctx, runId, storenodeIDs, runIdLogger) + shouldResetTimer, err := application.verifyHistory(ctx, runId, storenodeIDs, metricsChan, runIdLogger) if err != nil { runIdLogger.Error("could not verify message history", zap.Error(err)) } @@ -186,7 +188,6 @@ func Execute(ctx context.Context, options Options) error { }() go func() { - syncCheckTimer := time.NewTimer(0) defer syncCheckTimer.Stop() @@ -218,7 +219,34 @@ func Execute(ctx context.Context, options Options) error { }() } } + }() + go func() { + // Metrics accumulator + t := time.NewTicker(20 * time.Second) + defer t.Stop() + + metricsSummaryMissing := make(map[peer.ID]int) + metricsSummaryUnknown := make(map[peer.ID]int) + + for { + select { + case <-ctx.Done(): + return + case <-t.C: + for s, missingCnt := range metricsSummaryMissing { + metrics.RecordMissingMessages(s, "does_not_exist", missingCnt) + } + for s, unknownCnt := range metricsSummaryUnknown { + metrics.RecordMissingMessages(s, "unknown", unknownCnt) + } + metricsSummaryMissing = make(map[peer.ID]int) + metricsSummaryUnknown = make(map[peer.ID]int) + case m := <-metricsChan: + metricsSummaryMissing[m.storenode] += m.missingMessages + metricsSummaryUnknown[m.storenode] += m.unknownMessages + } + } }() <-ctx.Done() @@ -226,11 +254,17 @@ func Execute(ctx context.Context, options Options) error { return nil } +type metricsSummary struct { + storenode peer.ID + missingMessages int + unknownMessages int +} + var msgMapLock sync.Mutex var msgMap map[pb.MessageHash]map[peer.ID]MessageExistence var msgPubsubTopic map[pb.MessageHash]string -func (app *Application) verifyHistory(ctx context.Context, runId string, storenodes peer.IDSlice, logger *zap.Logger) (shouldResetTimer bool, err error) { +func (app *Application) verifyHistory(ctx context.Context, runId string, storenodes peer.IDSlice, metricsChan chan *metricsSummary, logger *zap.Logger) (shouldResetTimer bool, err error) { // [MessageHash][StoreNode] = exists? msgMapLock.Lock() @@ -371,7 +405,7 @@ func (app *Application) verifyHistory(ctx context.Context, runId string, storeno } if len(unknownIn) != 0 { - logger.Info("message with unknown state identified", zap.Stringer("hash", msgHash), zap.String("pubsubTopic", msgPubsubTopic[msgHash]), zap.Int("num_nodes", len(missingIn))) + logger.Info("message with unknown state identified", zap.Stringer("hash", msgHash), zap.String("pubsubTopic", msgPubsubTopic[msgHash]), zap.Int("num_nodes", len(unknownIn))) err = app.db.RecordMessage(runId, tx, msgHash, msgPubsubTopic[msgHash], unknownIn, "unknown") if err != nil { return false, err @@ -381,12 +415,15 @@ func (app *Application) verifyHistory(ctx context.Context, runId string, storeno for _, s := range storenodes { missingCnt := missingInSummary[s] - app.metrics.RecordMissingMessages(s, "does_not_exist", missingCnt) - logger.Info("missing message summary", zap.Stringer("storenode", s), zap.Int("numMsgs", missingCnt)) - unknownCnt := unknownInSummary[s] - app.metrics.RecordMissingMessages(s, "unknown", unknownCnt) - logger.Info("messages that could not be verified summary", zap.Stringer("storenode", s), zap.Int("numMsgs", missingCnt)) + + metricsChan <- &metricsSummary{ + storenode: s, + missingMessages: missingInSummary[s], + unknownMessages: unknownInSummary[s], + } + + logger.Info("missing message summary", zap.Stringer("storenode", s), zap.Int("missing", missingCnt), zap.Int("unknown", unknownCnt)) } logger.Info("total missing messages", zap.Int("total", totalMissingMessages))