diff --git a/cmd/storemsgcounter/execute.go b/cmd/storemsgcounter/execute.go index 973c814..b0f647f 100644 --- a/cmd/storemsgcounter/execute.go +++ b/cmd/storemsgcounter/execute.go @@ -379,6 +379,10 @@ func (app *Application) fetchStoreNodeMessages(ctx context.Context, runId string func (app *Application) retrieveHistory(ctx context.Context, runId string, storenodes []peer.AddrInfo, topic string, lastSyncTimestamp *time.Time, tx *sql.Tx, logger *zap.Logger) { logger = logger.With(zap.String("topic", topic), zap.Timep("lastSyncTimestamp", lastSyncTimestamp)) + if lastSyncTimestamp != nil { + app.metrics.RecordLastSyncDate(topic, *lastSyncTimestamp) + } + now := app.node.Timesource().Now() // Query is done with a delay @@ -410,6 +414,9 @@ func (app *Application) retrieveHistory(ctx context.Context, runId string, store if err != nil { logger.Panic("could not update topic sync state", zap.Error(err)) } + + app.metrics.RecordLastSyncDate(topic, endTime) + } func (app *Application) verifyMessageExistence(ctx context.Context, runId string, peerID peer.ID, messageHashes []pb.MessageHash, logger *zap.Logger) { diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go index 7bb34a9..e446c66 100644 --- a/internal/metrics/metrics.go +++ b/internal/metrics/metrics.go @@ -1,6 +1,8 @@ package metrics import ( + "time" + "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/p2p/metricshelper" "github.com/prometheus/client_golang/prometheus" @@ -30,6 +32,14 @@ var storenodeAvailability = prometheus.NewGaugeVec( []string{"storenode"}, ) +var topicLastSync = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "msgcounter_topic_lastsyncdate_seconds", + Help: "Indicates the last syncdate for a pubsubtopic", + }, + []string{"pubsubtopic"}, +) + var collectors = []prometheus.Collector{ missingMessages, storenodeAvailability, @@ -40,6 +50,7 @@ type Metrics interface { RecordMissingMessages(peerID peer.ID, status string, length int) RecordStorenodeAvailability(peerID peer.ID, available bool) RecordTotalMissingMessages(cnt int) + RecordLastSyncDate(topic string, date time.Time) } type metricsImpl struct { @@ -76,3 +87,9 @@ func (m *metricsImpl) RecordTotalMissingMessages(cnt int) { totalMissingMessages.Set(float64(cnt)) }() } + +func (m *metricsImpl) RecordLastSyncDate(topic string, date time.Time) { + go func() { + topicLastSync.WithLabelValues(topic).Set(float64(date.Unix())) + }() +}