mirror of
https://github.com/logos-messaging/storenode-messages-counter.git
synced 2026-01-02 14:13:11 +00:00
refactor: use accumulator for metrics
This commit is contained in:
parent
cd36306b12
commit
8fc157f98c
@ -156,6 +156,8 @@ func Execute(ctx context.Context, options Options) error {
|
||||
db: dbStore,
|
||||
}
|
||||
|
||||
metricsChan := make(chan *metricsSummary, 50)
|
||||
|
||||
go func() {
|
||||
missingMessagesTimer := time.NewTimer(0)
|
||||
defer missingMessagesTimer.Stop()
|
||||
@ -170,7 +172,7 @@ func Execute(ctx context.Context, options Options) error {
|
||||
runIdLogger := logger.With(zap.String("runId", runId), zap.String("fleet", options.FleetName), zap.Uint("clusterID", options.ClusterID))
|
||||
|
||||
runIdLogger.Info("verifying message history...")
|
||||
shouldResetTimer, err := application.verifyHistory(ctx, runId, storenodeIDs, runIdLogger)
|
||||
shouldResetTimer, err := application.verifyHistory(ctx, runId, storenodeIDs, metricsChan, runIdLogger)
|
||||
if err != nil {
|
||||
runIdLogger.Error("could not verify message history", zap.Error(err))
|
||||
}
|
||||
@ -186,7 +188,6 @@ func Execute(ctx context.Context, options Options) error {
|
||||
}()
|
||||
|
||||
go func() {
|
||||
|
||||
syncCheckTimer := time.NewTimer(0)
|
||||
defer syncCheckTimer.Stop()
|
||||
|
||||
@ -218,7 +219,34 @@ func Execute(ctx context.Context, options Options) error {
|
||||
}()
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
go func() {
|
||||
// Metrics accumulator
|
||||
t := time.NewTicker(20 * time.Second)
|
||||
defer t.Stop()
|
||||
|
||||
metricsSummaryMissing := make(map[peer.ID]int)
|
||||
metricsSummaryUnknown := make(map[peer.ID]int)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-t.C:
|
||||
for s, missingCnt := range metricsSummaryMissing {
|
||||
metrics.RecordMissingMessages(s, "does_not_exist", missingCnt)
|
||||
}
|
||||
for s, unknownCnt := range metricsSummaryUnknown {
|
||||
metrics.RecordMissingMessages(s, "unknown", unknownCnt)
|
||||
}
|
||||
metricsSummaryMissing = make(map[peer.ID]int)
|
||||
metricsSummaryUnknown = make(map[peer.ID]int)
|
||||
case m := <-metricsChan:
|
||||
metricsSummaryMissing[m.storenode] += m.missingMessages
|
||||
metricsSummaryUnknown[m.storenode] += m.unknownMessages
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
<-ctx.Done()
|
||||
@ -226,11 +254,17 @@ func Execute(ctx context.Context, options Options) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// metricsSummary is the message sent over the metrics channel: one value per
// storenode, built at the end of a verifyHistory run and consumed by the
// metrics accumulator goroutine, which sums the counts per storenode.
type metricsSummary struct {
|
||||
// storenode is the peer the two counts below belong to; the accumulator
// uses it as the map key.
storenode peer.ID
|
||||
// missingMessages is added to the tally later recorded under the
// "does_not_exist" status.
missingMessages int
|
||||
// unknownMessages is added to the tally later recorded under the
// "unknown" status.
unknownMessages int
|
||||
}
|
||||
|
||||
// msgMapLock is acquired at the start of verifyHistory.
// NOTE(review): presumably it guards both msgMap and msgPubsubTopic — confirm
// against the full function body.
var msgMapLock sync.Mutex
|
||||
// msgMap is keyed first by message hash and then by storenode peer ID, and
// holds the MessageExistence value for that (message, storenode) pair.
var msgMap map[pb.MessageHash]map[peer.ID]MessageExistence
|
||||
// msgPubsubTopic maps a message hash to its pubsub topic; verifyHistory reads
// it when logging and when recording a message in the database.
var msgPubsubTopic map[pb.MessageHash]string
|
||||
|
||||
func (app *Application) verifyHistory(ctx context.Context, runId string, storenodes peer.IDSlice, logger *zap.Logger) (shouldResetTimer bool, err error) {
|
||||
func (app *Application) verifyHistory(ctx context.Context, runId string, storenodes peer.IDSlice, metricsChan chan *metricsSummary, logger *zap.Logger) (shouldResetTimer bool, err error) {
|
||||
|
||||
// [MessageHash][StoreNode] = exists?
|
||||
msgMapLock.Lock()
|
||||
@ -371,7 +405,7 @@ func (app *Application) verifyHistory(ctx context.Context, runId string, storeno
|
||||
}
|
||||
|
||||
if len(unknownIn) != 0 {
|
||||
logger.Info("message with unknown state identified", zap.Stringer("hash", msgHash), zap.String("pubsubTopic", msgPubsubTopic[msgHash]), zap.Int("num_nodes", len(missingIn)))
|
||||
logger.Info("message with unknown state identified", zap.Stringer("hash", msgHash), zap.String("pubsubTopic", msgPubsubTopic[msgHash]), zap.Int("num_nodes", len(unknownIn)))
|
||||
err = app.db.RecordMessage(runId, tx, msgHash, msgPubsubTopic[msgHash], unknownIn, "unknown")
|
||||
if err != nil {
|
||||
return false, err
|
||||
@ -381,12 +415,15 @@ func (app *Application) verifyHistory(ctx context.Context, runId string, storeno
|
||||
|
||||
for _, s := range storenodes {
|
||||
missingCnt := missingInSummary[s]
|
||||
app.metrics.RecordMissingMessages(s, "does_not_exist", missingCnt)
|
||||
logger.Info("missing message summary", zap.Stringer("storenode", s), zap.Int("numMsgs", missingCnt))
|
||||
|
||||
unknownCnt := unknownInSummary[s]
|
||||
app.metrics.RecordMissingMessages(s, "unknown", unknownCnt)
|
||||
logger.Info("messages that could not be verified summary", zap.Stringer("storenode", s), zap.Int("numMsgs", missingCnt))
|
||||
|
||||
metricsChan <- &metricsSummary{
|
||||
storenode: s,
|
||||
missingMessages: missingInSummary[s],
|
||||
unknownMessages: unknownInSummary[s],
|
||||
}
|
||||
|
||||
logger.Info("missing message summary", zap.Stringer("storenode", s), zap.Int("missing", missingCnt), zap.Int("unknown", unknownCnt))
|
||||
}
|
||||
|
||||
logger.Info("total missing messages", zap.Int("total", totalMissingMessages))
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user