diff --git a/cmd/storemsgcounter/execute.go b/cmd/storemsgcounter/execute.go index 2be66c8..186157b 100644 --- a/cmd/storemsgcounter/execute.go +++ b/cmd/storemsgcounter/execute.go @@ -14,11 +14,13 @@ import ( "github.com/google/uuid" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peerstore" + "github.com/prometheus/client_golang/prometheus" "github.com/waku-org/go-waku/waku/v2/dnsdisc" "github.com/waku-org/go-waku/waku/v2/node" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/protocol/store" + "github.com/waku-org/go-waku/waku/v2/utils" "github.com/waku-org/storenode-messages/internal/logging" "github.com/waku-org/storenode-messages/internal/metrics" "github.com/waku-org/storenode-messages/internal/persistence" @@ -110,6 +112,9 @@ func Execute(ctx context.Context, options Options) error { if err != nil { return err } + + metrics := metrics.NewMetrics(prometheus.DefaultRegisterer, logger) + err = wakuNode.Start(ctx) if err != nil { return err @@ -137,7 +142,7 @@ func Execute(ctx context.Context, options Options) error { runIdLogger := logger.With(zap.String("runId", runId)) runIdLogger.Info("verifying message history...") - err := verifyHistory(ctx, runId, storenodes, wakuNode, dbStore, runIdLogger) + err := verifyHistory(ctx, runId, storenodes, wakuNode, dbStore, metrics, runIdLogger) if err != nil { return err } @@ -152,7 +157,7 @@ var msgMapLock sync.Mutex var msgMap map[pb.MessageHash]map[peer.ID]MessageExistence var msgPubsubTopic map[pb.MessageHash]string -func verifyHistory(ctx context.Context, runId string, storenodes []peer.AddrInfo, wakuNode *node.WakuNode, dbStore *persistence.DBStore, logger *zap.Logger) error { +func verifyHistory(ctx context.Context, runId string, storenodes []peer.AddrInfo, wakuNode *node.WakuNode, dbStore *persistence.DBStore, metrics metrics.Metrics, logger *zap.Logger) error { // [MessageHash][StoreNode] = exists? msgMapLock.Lock() @@ -184,7 +189,7 @@ func verifyHistory(ctx context.Context, runId string, storenodes []peer.AddrInfo wg.Add(1) go func(topic string, lastSyncTimestamp *time.Time) { defer wg.Done() - retrieveHistory(ctx, runId, storenodes, topic, lastSyncTimestamp, wakuNode, dbStore, tx, logger) + retrieveHistory(ctx, runId, storenodes, topic, lastSyncTimestamp, wakuNode, dbStore, tx, metrics, logger) }(topic, lastSyncTimestamp) } wg.Wait() @@ -208,7 +213,7 @@ func verifyHistory(ctx context.Context, runId string, storenodes []peer.AddrInfo wg.Add(1) go func(peerID peer.ID, messageHashes []pb.MessageHash) { defer wg.Done() - verifyMessageExistence(ctx, runId, peerID, messageHashes, wakuNode, dbStore, logger) + verifyMessageExistence(ctx, runId, peerID, messageHashes, wakuNode, dbStore, metrics, logger) }(peerID, messageHashes) } wg.Wait() @@ -219,14 +224,20 @@ func verifyHistory(ctx context.Context, runId string, storenodes []peer.AddrInfo msgMapLock.Lock() defer msgMapLock.Unlock() + missingInSummary := make(map[string]int) + unknownInSummary := make(map[string]int) + for msgHash, nodes := range msgMap { - var missingIn []peer.AddrInfo - var unknownIn []peer.AddrInfo + var missingIn []string + var unknownIn []string for _, node := range storenodes { + storeAddr := utils.EncapsulatePeerID(node.ID, node.Addrs[0])[0].String() if nodes[node.ID] == DoesNotExist { - missingIn = append(missingIn, node) + missingIn = append(missingIn, storeAddr) + missingInSummary[storeAddr]++ } else if nodes[node.ID] == Unknown { - unknownIn = append(unknownIn, node) + unknownIn = append(unknownIn, storeAddr) + unknownInSummary[storeAddr]++ } } @@ -247,14 +258,18 @@ func verifyHistory(ctx context.Context, runId string, storenodes []peer.AddrInfo } } - if err != nil { - return err + for s, cnt := range missingInSummary { + metrics.RecordMissingMessages(s, "does_not_exist", cnt) + } + + for s, cnt := range unknownInSummary { + metrics.RecordMissingMessages(s, "unknown", cnt) } return nil } -func retrieveHistory(ctx context.Context, runId string, storenodes []peer.AddrInfo, topic string, lastSyncTimestamp *time.Time, wakuNode *node.WakuNode, dbStore *persistence.DBStore, tx *sql.Tx, logger *zap.Logger) { +func retrieveHistory(ctx context.Context, runId string, storenodes []peer.AddrInfo, topic string, lastSyncTimestamp *time.Time, wakuNode *node.WakuNode, dbStore *persistence.DBStore, tx *sql.Tx, metrics metrics.Metrics, logger *zap.Logger) { logger = logger.With(zap.String("topic", topic), zap.Timep("lastSyncTimestamp", lastSyncTimestamp)) now := wakuNode.Timesource().Now() @@ -274,6 +289,9 @@ func retrieveHistory(ctx context.Context, runId string, storenodes []peer.AddrIn // Determine if the messages exist across all nodes for _, node := range storenodes { storeNodeFailure := false + + storeAddr := utils.EncapsulatePeerID(node.ID, node.Addrs[0])[0].String() + var result *store.Result var err error @@ -304,10 +322,11 @@ func retrieveHistory(ctx context.Context, runId string, storenodes []peer.AddrIn if storeNodeFailure { queryLogger.Error("storenode not available") - err := dbStore.RecordStorenodeUnavailable(runId, node) + err := dbStore.RecordStorenodeUnavailable(runId, storeAddr) if err != nil { queryLogger.Error("could not store node unavailable", zap.Error(err)) } + metrics.RecordStorenodeUnavailable(storeAddr) } else { iteratorLbl: @@ -345,14 +364,16 @@ func retrieveHistory(ctx context.Context, runId string, storenodes []peer.AddrIn if storeNodeFailure { queryLogger.Error("storenode not available", zap.String("cursor", hexutil.Encode(result.Cursor()))) - err := dbStore.RecordStorenodeUnavailable(runId, node) + err := dbStore.RecordStorenodeUnavailable(runId, storeAddr) if err != nil { queryLogger.Error("could not store recordnode unavailable", zap.String("cursor", hex.EncodeToString(result.Cursor())), zap.Error(err)) } + metrics.RecordStorenodeUnavailable(storeAddr) break iteratorLbl } } } + } // Update db with last sync time @@ -362,13 +383,15 @@ func retrieveHistory(ctx context.Context, runId string, storenodes []peer.AddrIn } } -func verifyMessageExistence(ctx context.Context, runId string, peerID peer.ID, messageHashes []pb.MessageHash, wakuNode *node.WakuNode, dbStore *persistence.DBStore, logger *zap.Logger) { +func verifyMessageExistence(ctx context.Context, runId string, peerID peer.ID, messageHashes []pb.MessageHash, wakuNode *node.WakuNode, dbStore *persistence.DBStore, metrics metrics.Metrics, logger *zap.Logger) { storeNodeFailure := false var result *store.Result var err error peerInfo := wakuNode.Host().Peerstore().PeerInfo(peerID) + storeAddr := utils.EncapsulatePeerID(peerInfo.ID, peerInfo.Addrs[0])[0].String() + queryLogger := logger.With(zap.Stringer("storenode", peerID)) queryLbl: @@ -391,10 +414,12 @@ queryLbl: if storeNodeFailure { queryLogger.Error("storenode not available") - err := dbStore.RecordStorenodeUnavailable(runId, peerInfo) + err := dbStore.RecordStorenodeUnavailable(runId, storeAddr) if err != nil { queryLogger.Error("could not store recordnode unavailable", zap.Error(err)) } + metrics.RecordStorenodeUnavailable(storeAddr) + } else { for !result.IsComplete() { msgMapLock.Lock() @@ -436,10 +461,11 @@ queryLbl: if storeNodeFailure { queryLogger.Error("storenode not available", zap.String("cursor", hexutil.Encode(result.Cursor()))) - err := dbStore.RecordStorenodeUnavailable(runId, peerInfo) + err := dbStore.RecordStorenodeUnavailable(runId, storeAddr) if err != nil { logger.Error("could not store recordnode unavailable", zap.Error(err), zap.String("cursor", hex.EncodeToString(result.Cursor())), zap.Stringer("storenode", peerInfo)) } + metrics.RecordStorenodeUnavailable(storeAddr) } } } diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go index 2f20843..bb78457 100644 --- a/internal/metrics/metrics.go +++ b/internal/metrics/metrics.go @@ -6,16 +6,16 @@ import ( "go.uber.org/zap" ) -var missingMessages = prometheus.NewCounterVec( - prometheus.CounterOpts{ +var missingMessages = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ Name: "missing_messages", Help: "The messages identified as missing and the reason why they're missing", }, []string{"storenode", "status"}, ) -var storenodeUnavailable = prometheus.NewCounterVec( - prometheus.CounterOpts{ +var storenodeUnavailable = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ Name: "storenode_unavailable", Help: "Number of PubSub Topics node is subscribed to", }, @@ -29,7 +29,7 @@ var collectors = []prometheus.Collector{ // Metrics exposes the functions required to update prometheus metrics for relay protocol type Metrics interface { - RecordMissingMessage(storenode string, status string) + RecordMissingMessages(storenode string, status string, length int) RecordStorenodeUnavailable(storenode string) } @@ -46,14 +46,14 @@ func NewMetrics(reg prometheus.Registerer, logger *zap.Logger) Metrics { } } -func (m *metricsImpl) RecordMissingMessage(storenode string, status string) { +func (m *metricsImpl) RecordMissingMessages(storenode string, status string, length int) { go func() { - missingMessages.WithLabelValues(storenode, status).Inc() + missingMessages.WithLabelValues(storenode, status).Set(float64(length)) }() } func (m *metricsImpl) RecordStorenodeUnavailable(storenode string) { go func() { - storenodeUnavailable.WithLabelValues(storenode).Inc() + storenodeUnavailable.WithLabelValues(storenode).Set(1) }() } diff --git a/internal/persistence/database.go b/internal/persistence/database.go index 0d60edb..dd341a9 100644 --- a/internal/persistence/database.go +++ b/internal/persistence/database.go @@ -6,12 +6,8 @@ import ( "sync" "time" - "github.com/libp2p/go-libp2p/core/peer" - "github.com/prometheus/client_golang/prometheus" "github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/timesource" - "github.com/waku-org/go-waku/waku/v2/utils" - "github.com/waku-org/storenode-messages/internal/metrics" "go.uber.org/zap" ) @@ -20,7 +16,6 @@ type DBStore struct { db *sql.DB migrationFn func(db *sql.DB, logger *zap.Logger) error retentionPolicy time.Duration - metrics metrics.Metrics timesource timesource.Timesource log *zap.Logger @@ -104,8 +99,6 @@ func NewDBStore(log *zap.Logger, options ...DBOption) (*DBStore, error) { optList := DefaultOptions() optList = append(optList, options...) - result.metrics = metrics.NewMetrics(prometheus.DefaultRegisterer, log) - for _, opt := range optList { err := opt(result) if err != nil { @@ -257,8 +250,8 @@ func (d *DBStore) UpdateTopicSyncState(tx *sql.Tx, clusterID uint, topic string, return stmt.Close() } -func (d *DBStore) RecordMessage(uuid string, tx *sql.Tx, msgHash pb.MessageHash, clusterID uint, topic string, timestamp uint64, storenodes []peer.AddrInfo, status string) error { - stmt, err := tx.Prepare("INSERT INTO missingMessages(runId, clusterId, pubsubTopic, messageHash, msgTimestamp, storenode, msgStatus, storedAt) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)") +func (d *DBStore) RecordMessage(uuid string, tx *sql.Tx, msgHash pb.MessageHash, clusterID uint, topic string, storenodes []string, status string) error { + stmt, err := tx.Prepare("INSERT INTO missingMessages(runId, clusterId, pubsubTopic, messageHash, storenode, msgStatus, storedAt) VALUES ($1, $2, $3, $4, $5, $6, $7)") if err != nil { return err } @@ -266,35 +259,28 @@ func (d *DBStore) RecordMessage(uuid string, tx *sql.Tx, msgHash pb.MessageHash, now := time.Now().UnixNano() for _, s := range storenodes { - storeAddr := utils.EncapsulatePeerID(s.ID, s.Addrs[0])[0].String() - - _, err := stmt.Exec(uuid, clusterID, topic, msgHash.String(), timestamp, storeAddr, status, now) + _, err := stmt.Exec(uuid, clusterID, topic, msgHash.String(), s, status, now) if err != nil { return err } - d.metrics.RecordMissingMessage(storeAddr, status) } return nil } -func (d *DBStore) RecordStorenodeUnavailable(uuid string, storenode peer.AddrInfo) error { +func (d *DBStore) RecordStorenodeUnavailable(uuid string, storenode string) error { stmt, err := d.db.Prepare("INSERT INTO storeNodeUnavailable(runId, storenode, requestTime) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING") if err != nil { return err } defer stmt.Close() - storeAddr := utils.EncapsulatePeerID(storenode.ID, storenode.Addrs[0])[0].String() - now := time.Now().UnixNano() _, err = stmt.Exec(uuid, storenode, now) if err != nil { return err } - d.metrics.RecordStorenodeUnavailable(storeAddr) - return nil }