chore: refactor metrics

Richard Ramos 2024-07-10 11:42:28 -04:00
parent 5347ec23ff
commit 73779e074e
GPG Key ID: 1CE87DB518195760 (no known key found for this signature)
3 changed files with 54 additions and 42 deletions

View File — file 1 of 3 (verification command: Execute / verifyHistory / retrieveHistory / verifyMessageExistence)

@@ -14,11 +14,13 @@ import (
 	"github.com/google/uuid"
 	"github.com/libp2p/go-libp2p/core/peer"
 	"github.com/libp2p/go-libp2p/core/peerstore"
+	"github.com/prometheus/client_golang/prometheus"
 	"github.com/waku-org/go-waku/waku/v2/dnsdisc"
 	"github.com/waku-org/go-waku/waku/v2/node"
 	"github.com/waku-org/go-waku/waku/v2/protocol"
 	"github.com/waku-org/go-waku/waku/v2/protocol/pb"
 	"github.com/waku-org/go-waku/waku/v2/protocol/store"
+	"github.com/waku-org/go-waku/waku/v2/utils"
 	"github.com/waku-org/storenode-messages/internal/logging"
 	"github.com/waku-org/storenode-messages/internal/metrics"
 	"github.com/waku-org/storenode-messages/internal/persistence"
@@ -110,6 +112,9 @@ func Execute(ctx context.Context, options Options) error {
 	if err != nil {
 		return err
 	}
+
+	metrics := metrics.NewMetrics(prometheus.DefaultRegisterer, logger)
+
 	err = wakuNode.Start(ctx)
 	if err != nil {
 		return err
@@ -137,7 +142,7 @@ func Execute(ctx context.Context, options Options) error {
 	runIdLogger := logger.With(zap.String("runId", runId))
 	runIdLogger.Info("verifying message history...")
-	err := verifyHistory(ctx, runId, storenodes, wakuNode, dbStore, runIdLogger)
+	err := verifyHistory(ctx, runId, storenodes, wakuNode, dbStore, metrics, runIdLogger)
 	if err != nil {
 		return err
 	}
@@ -152,7 +157,7 @@ var msgMapLock sync.Mutex
 var msgMap map[pb.MessageHash]map[peer.ID]MessageExistence
 var msgPubsubTopic map[pb.MessageHash]string
 
-func verifyHistory(ctx context.Context, runId string, storenodes []peer.AddrInfo, wakuNode *node.WakuNode, dbStore *persistence.DBStore, logger *zap.Logger) error {
+func verifyHistory(ctx context.Context, runId string, storenodes []peer.AddrInfo, wakuNode *node.WakuNode, dbStore *persistence.DBStore, metrics metrics.Metrics, logger *zap.Logger) error {
 	// [MessageHash][StoreNode] = exists?
 	msgMapLock.Lock()
@@ -184,7 +189,7 @@ func verifyHistory(ctx context.Context, runId string, storenodes []peer.AddrInfo
 		wg.Add(1)
 		go func(topic string, lastSyncTimestamp *time.Time) {
 			defer wg.Done()
-			retrieveHistory(ctx, runId, storenodes, topic, lastSyncTimestamp, wakuNode, dbStore, tx, logger)
+			retrieveHistory(ctx, runId, storenodes, topic, lastSyncTimestamp, wakuNode, dbStore, tx, metrics, logger)
 		}(topic, lastSyncTimestamp)
 	}
 	wg.Wait()
@@ -208,7 +213,7 @@ func verifyHistory(ctx context.Context, runId string, storenodes []peer.AddrInfo
 		wg.Add(1)
 		go func(peerID peer.ID, messageHashes []pb.MessageHash) {
 			defer wg.Done()
-			verifyMessageExistence(ctx, runId, peerID, messageHashes, wakuNode, dbStore, logger)
+			verifyMessageExistence(ctx, runId, peerID, messageHashes, wakuNode, dbStore, metrics, logger)
 		}(peerID, messageHashes)
 	}
 	wg.Wait()
@@ -219,14 +224,20 @@ func verifyHistory(ctx context.Context, runId string, storenodes []peer.AddrInfo
 	msgMapLock.Lock()
 	defer msgMapLock.Unlock()
+
+	missingInSummary := make(map[string]int)
+	unknownInSummary := make(map[string]int)
 	for msgHash, nodes := range msgMap {
-		var missingIn []peer.AddrInfo
-		var unknownIn []peer.AddrInfo
+		var missingIn []string
+		var unknownIn []string
 		for _, node := range storenodes {
+			storeAddr := utils.EncapsulatePeerID(node.ID, node.Addrs[0])[0].String()
 			if nodes[node.ID] == DoesNotExist {
-				missingIn = append(missingIn, node)
+				missingIn = append(missingIn, storeAddr)
+				missingInSummary[storeAddr]++
 			} else if nodes[node.ID] == Unknown {
-				unknownIn = append(unknownIn, node)
+				unknownIn = append(unknownIn, storeAddr)
+				unknownInSummary[storeAddr]++
 			}
 		}
@@ -247,14 +258,18 @@ func verifyHistory(ctx context.Context, runId string, storenodes []peer.AddrInfo
 		}
 	}
 
-	if err != nil {
-		return err
+	for s, cnt := range missingInSummary {
+		metrics.RecordMissingMessages(s, "does_not_exist", cnt)
+	}
+
+	for s, cnt := range unknownInSummary {
+		metrics.RecordMissingMessages(s, "unknown", cnt)
 	}
 
 	return nil
 }
 
-func retrieveHistory(ctx context.Context, runId string, storenodes []peer.AddrInfo, topic string, lastSyncTimestamp *time.Time, wakuNode *node.WakuNode, dbStore *persistence.DBStore, tx *sql.Tx, logger *zap.Logger) {
+func retrieveHistory(ctx context.Context, runId string, storenodes []peer.AddrInfo, topic string, lastSyncTimestamp *time.Time, wakuNode *node.WakuNode, dbStore *persistence.DBStore, tx *sql.Tx, metrics metrics.Metrics, logger *zap.Logger) {
 	logger = logger.With(zap.String("topic", topic), zap.Timep("lastSyncTimestamp", lastSyncTimestamp))
 
 	now := wakuNode.Timesource().Now()
@@ -274,6 +289,9 @@ func retrieveHistory(ctx context.Context, runId string, storenodes []peer.AddrIn
 	// Determine if the messages exist across all nodes
 	for _, node := range storenodes {
 		storeNodeFailure := false
+		storeAddr := utils.EncapsulatePeerID(node.ID, node.Addrs[0])[0].String()
+
 		var result *store.Result
 		var err error
@@ -304,10 +322,11 @@ func retrieveHistory(ctx context.Context, runId string, storenodes []peer.AddrIn
 		if storeNodeFailure {
 			queryLogger.Error("storenode not available")
-			err := dbStore.RecordStorenodeUnavailable(runId, node)
+			err := dbStore.RecordStorenodeUnavailable(runId, storeAddr)
 			if err != nil {
 				queryLogger.Error("could not store node unavailable", zap.Error(err))
 			}
+			metrics.RecordStorenodeUnavailable(storeAddr)
 		} else {
 		iteratorLbl:
@@ -345,14 +364,16 @@ func retrieveHistory(ctx context.Context, runId string, storenodes []peer.AddrIn
 				if storeNodeFailure {
 					queryLogger.Error("storenode not available", zap.String("cursor", hexutil.Encode(result.Cursor())))
-					err := dbStore.RecordStorenodeUnavailable(runId, node)
+					err := dbStore.RecordStorenodeUnavailable(runId, storeAddr)
 					if err != nil {
 						queryLogger.Error("could not store recordnode unavailable", zap.String("cursor", hex.EncodeToString(result.Cursor())), zap.Error(err))
 					}
+					metrics.RecordStorenodeUnavailable(storeAddr)
 					break iteratorLbl
 				}
 			}
 		}
 	}
 
 	// Update db with last sync time
@@ -362,13 +383,15 @@ func retrieveHistory(ctx context.Context, runId string, storenodes []peer.AddrIn
 	}
 }
 
-func verifyMessageExistence(ctx context.Context, runId string, peerID peer.ID, messageHashes []pb.MessageHash, wakuNode *node.WakuNode, dbStore *persistence.DBStore, logger *zap.Logger) {
+func verifyMessageExistence(ctx context.Context, runId string, peerID peer.ID, messageHashes []pb.MessageHash, wakuNode *node.WakuNode, dbStore *persistence.DBStore, metrics metrics.Metrics, logger *zap.Logger) {
 	storeNodeFailure := false
 	var result *store.Result
 	var err error
 
 	peerInfo := wakuNode.Host().Peerstore().PeerInfo(peerID)
+	storeAddr := utils.EncapsulatePeerID(peerInfo.ID, peerInfo.Addrs[0])[0].String()
+
 	queryLogger := logger.With(zap.Stringer("storenode", peerID))
 
 queryLbl:
@@ -391,10 +414,12 @@ queryLbl:
 	if storeNodeFailure {
 		queryLogger.Error("storenode not available")
-		err := dbStore.RecordStorenodeUnavailable(runId, peerInfo)
+		err := dbStore.RecordStorenodeUnavailable(runId, storeAddr)
 		if err != nil {
 			queryLogger.Error("could not store recordnode unavailable", zap.Error(err))
 		}
+		metrics.RecordStorenodeUnavailable(storeAddr)
 	} else {
 		for !result.IsComplete() {
 			msgMapLock.Lock()
@@ -436,10 +461,11 @@ queryLbl:
 			if storeNodeFailure {
 				queryLogger.Error("storenode not available", zap.String("cursor", hexutil.Encode(result.Cursor())))
-				err := dbStore.RecordStorenodeUnavailable(runId, peerInfo)
+				err := dbStore.RecordStorenodeUnavailable(runId, storeAddr)
 				if err != nil {
 					logger.Error("could not store recordnode unavailable", zap.Error(err), zap.String("cursor", hex.EncodeToString(result.Cursor())), zap.Stringer("storenode", peerInfo))
 				}
+				metrics.RecordStorenodeUnavailable(storeAddr)
 			}
 		}
 	}
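Taken together, the changes in this file thread a metrics.Metrics value from Execute down through verifyHistory, retrieveHistory and verifyMessageExistence, and replace per-message metric updates with per-run summaries keyed by storenode address. Below is a minimal, self-contained sketch of that pattern; the Metrics interface mirrors internal/metrics as shown later in this commit, while storenodeLabel and reportSummaries are illustrative helper names, not code from the repository.

package example

import (
	"github.com/libp2p/go-libp2p/core/peer"
	"github.com/waku-org/go-waku/waku/v2/utils"
)

// Metrics mirrors the interface defined in internal/metrics after this commit.
type Metrics interface {
	RecordMissingMessages(storenode string, status string, length int)
	RecordStorenodeUnavailable(storenode string)
}

// storenodeLabel derives the label used for both the database rows and the
// Prometheus series: the storenode's first listen address encapsulated with
// its peer ID. As in the diff, it assumes node.Addrs is non-empty.
func storenodeLabel(node peer.AddrInfo) string {
	return utils.EncapsulatePeerID(node.ID, node.Addrs[0])[0].String()
}

// reportSummaries shows how the per-storenode counts accumulated while walking
// msgMap are flushed once per run, mirroring the loops added to verifyHistory.
func reportSummaries(m Metrics, missingInSummary, unknownInSummary map[string]int) {
	for storeAddr, cnt := range missingInSummary {
		m.RecordMissingMessages(storeAddr, "does_not_exist", cnt)
	}
	for storeAddr, cnt := range unknownInSummary {
		m.RecordMissingMessages(storeAddr, "unknown", cnt)
	}
}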

View File — file 2 of 3 (internal/metrics: Prometheus collectors and the Metrics interface)

@@ -6,16 +6,16 @@ import (
 	"go.uber.org/zap"
 )
 
-var missingMessages = prometheus.NewCounterVec(
-	prometheus.CounterOpts{
+var missingMessages = prometheus.NewGaugeVec(
+	prometheus.GaugeOpts{
 		Name: "missing_messages",
 		Help: "The messages identified as missing and the reason why they're missing",
 	},
 	[]string{"storenode", "status"},
 )
 
-var storenodeUnavailable = prometheus.NewCounterVec(
-	prometheus.CounterOpts{
+var storenodeUnavailable = prometheus.NewGaugeVec(
+	prometheus.GaugeOpts{
 		Name: "storenode_unavailable",
 		Help: "Number of PubSub Topics node is subscribed to",
 	},
@@ -29,7 +29,7 @@ var collectors = []prometheus.Collector{
 // Metrics exposes the functions required to update prometheus metrics for relay protocol
 type Metrics interface {
-	RecordMissingMessage(storenode string, status string)
+	RecordMissingMessages(storenode string, status string, length int)
 	RecordStorenodeUnavailable(storenode string)
 }
@@ -46,14 +46,14 @@ func NewMetrics(reg prometheus.Registerer, logger *zap.Logger) Metrics {
 	}
 }
 
-func (m *metricsImpl) RecordMissingMessage(storenode string, status string) {
+func (m *metricsImpl) RecordMissingMessages(storenode string, status string, length int) {
 	go func() {
-		missingMessages.WithLabelValues(storenode, status).Inc()
+		missingMessages.WithLabelValues(storenode, status).Set(float64(length))
 	}()
 }
 
 func (m *metricsImpl) RecordStorenodeUnavailable(storenode string) {
 	go func() {
-		storenodeUnavailable.WithLabelValues(storenode).Inc()
+		storenodeUnavailable.WithLabelValues(storenode).Set(1)
 	}()
 }
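The switch from NewCounterVec to NewGaugeVec changes the semantics of these series: a counter only ever increases, while a gauge is overwritten, so missing_messages now reflects the count from the most recent verification run rather than a running total. A small sketch of that behaviour follows; the collector shape matches this file, but the registration shown here is plain prometheus client usage for illustration (NewMetrics in this package takes a prometheus.Registerer for its own registration).

package example

import "github.com/prometheus/client_golang/prometheus"

// Same shape as the collector declared in this file.
var missingMessages = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Name: "missing_messages",
		Help: "The messages identified as missing and the reason why they're missing",
	},
	[]string{"storenode", "status"},
)

func init() {
	// Illustrative registration against the default registry.
	prometheus.MustRegister(missingMessages)
}

// recordRun overwrites the gauge with the latest per-run count instead of
// accumulating across runs, which is what Set gives you over Inc.
func recordRun(storenode string, missing int) {
	missingMessages.WithLabelValues(storenode, "does_not_exist").Set(float64(missing))
}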

View File — file 3 of 3 (internal/persistence: DBStore)

@@ -6,12 +6,8 @@ import (
 	"sync"
 	"time"
 
-	"github.com/libp2p/go-libp2p/core/peer"
-	"github.com/prometheus/client_golang/prometheus"
 	"github.com/waku-org/go-waku/waku/v2/protocol/pb"
 	"github.com/waku-org/go-waku/waku/v2/timesource"
-	"github.com/waku-org/go-waku/waku/v2/utils"
-	"github.com/waku-org/storenode-messages/internal/metrics"
 	"go.uber.org/zap"
 )
@@ -20,7 +16,6 @@ type DBStore struct {
 	db *sql.DB
 	migrationFn func(db *sql.DB, logger *zap.Logger) error
 	retentionPolicy time.Duration
-	metrics metrics.Metrics
 	timesource timesource.Timesource
 	log *zap.Logger
@@ -104,8 +99,6 @@ func NewDBStore(log *zap.Logger, options ...DBOption) (*DBStore, error) {
 	optList := DefaultOptions()
 	optList = append(optList, options...)
 
-	result.metrics = metrics.NewMetrics(prometheus.DefaultRegisterer, log)
-
 	for _, opt := range optList {
 		err := opt(result)
 		if err != nil {
@@ -257,8 +250,8 @@ func (d *DBStore) UpdateTopicSyncState(tx *sql.Tx, clusterID uint, topic string,
 	return stmt.Close()
 }
 
-func (d *DBStore) RecordMessage(uuid string, tx *sql.Tx, msgHash pb.MessageHash, clusterID uint, topic string, timestamp uint64, storenodes []peer.AddrInfo, status string) error {
-	stmt, err := tx.Prepare("INSERT INTO missingMessages(runId, clusterId, pubsubTopic, messageHash, msgTimestamp, storenode, msgStatus, storedAt) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)")
+func (d *DBStore) RecordMessage(uuid string, tx *sql.Tx, msgHash pb.MessageHash, clusterID uint, topic string, storenodes []string, status string) error {
+	stmt, err := tx.Prepare("INSERT INTO missingMessages(runId, clusterId, pubsubTopic, messageHash, storenode, msgStatus, storedAt) VALUES ($1, $2, $3, $4, $5, $6, $7)")
 	if err != nil {
 		return err
 	}
@@ -266,35 +259,28 @@ func (d *DBStore) RecordMessage(uuid string, tx *sql.Tx, msgHash pb.MessageHash,
 	now := time.Now().UnixNano()
 	for _, s := range storenodes {
-		storeAddr := utils.EncapsulatePeerID(s.ID, s.Addrs[0])[0].String()
-		_, err := stmt.Exec(uuid, clusterID, topic, msgHash.String(), timestamp, storeAddr, status, now)
+		_, err := stmt.Exec(uuid, clusterID, topic, msgHash.String(), s, status, now)
 		if err != nil {
 			return err
 		}
-		d.metrics.RecordMissingMessage(storeAddr, status)
 	}
 
 	return nil
 }
 
-func (d *DBStore) RecordStorenodeUnavailable(uuid string, storenode peer.AddrInfo) error {
+func (d *DBStore) RecordStorenodeUnavailable(uuid string, storenode string) error {
 	stmt, err := d.db.Prepare("INSERT INTO storeNodeUnavailable(runId, storenode, requestTime) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING")
 	if err != nil {
 		return err
 	}
 	defer stmt.Close()
 
-	storeAddr := utils.EncapsulatePeerID(storenode.ID, storenode.Addrs[0])[0].String()
 	now := time.Now().UnixNano()
 	_, err = stmt.Exec(uuid, storenode, now)
 	if err != nil {
 		return err
 	}
-	d.metrics.RecordStorenodeUnavailable(storeAddr)
 
 	return nil
 }