diff --git a/cmd/storemsgcounter/execute.go b/cmd/storemsgcounter/execute.go index 11bd581..cb91c5c 100644 --- a/cmd/storemsgcounter/execute.go +++ b/cmd/storemsgcounter/execute.go @@ -20,7 +20,6 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/protocol/store" - "github.com/waku-org/go-waku/waku/v2/utils" "github.com/waku-org/storenode-messages/internal/logging" "github.com/waku-org/storenode-messages/internal/metrics" "github.com/waku-org/storenode-messages/internal/persistence" @@ -224,20 +223,19 @@ func verifyHistory(ctx context.Context, runId string, storenodes []peer.AddrInfo msgMapLock.Lock() defer msgMapLock.Unlock() - missingInSummary := make(map[string]int) - unknownInSummary := make(map[string]int) + missingInSummary := make(map[peer.ID]int) + unknownInSummary := make(map[peer.ID]int) for msgHash, nodes := range msgMap { - var missingIn []string - var unknownIn []string + var missingIn []peer.ID + var unknownIn []peer.ID for _, node := range storenodes { - storeAddr := utils.EncapsulatePeerID(node.ID, node.Addrs[0])[0].String() if nodes[node.ID] == DoesNotExist { - missingIn = append(missingIn, storeAddr) - missingInSummary[storeAddr]++ + missingIn = append(missingIn, node.ID) + missingInSummary[node.ID]++ } else if nodes[node.ID] == Unknown { - unknownIn = append(unknownIn, storeAddr) - unknownInSummary[storeAddr]++ + unknownIn = append(unknownIn, node.ID) + unknownInSummary[node.ID]++ } } @@ -260,12 +258,12 @@ func verifyHistory(ctx context.Context, runId string, storenodes []peer.AddrInfo for s, cnt := range missingInSummary { metrics.RecordMissingMessages(s, "does_not_exist", cnt) - logger.Info("missing message summary", zap.String("storenode", s), zap.Int("numMsgs", cnt)) + logger.Info("missing message summary", zap.Stringer("storenode", s), zap.Int("numMsgs", cnt)) } for s, cnt := range unknownInSummary { metrics.RecordMissingMessages(s, "unknown", cnt) - logger.Info("messages that could not be verified summary", zap.String("storenode", s), zap.Int("numMsgs", cnt)) + logger.Info("messages that could not be verified summary", zap.Stringer("storenode", s), zap.Int("numMsgs", cnt)) } @@ -293,8 +291,6 @@ func retrieveHistory(ctx context.Context, runId string, storenodes []peer.AddrIn for _, node := range storenodes { storeNodeFailure := false - storeAddr := utils.EncapsulatePeerID(node.ID, node.Addrs[0])[0].String() - var result *store.Result var err error @@ -325,13 +321,13 @@ func retrieveHistory(ctx context.Context, runId string, storenodes []peer.AddrIn if storeNodeFailure { queryLogger.Error("storenode not available") - err := dbStore.RecordStorenodeUnavailable(runId, storeAddr) + err := dbStore.RecordStorenodeUnavailable(runId, node.ID) if err != nil { queryLogger.Error("could not store node unavailable", zap.Error(err)) } - metrics.RecordStorenodeAvailability(storeAddr, false) + metrics.RecordStorenodeAvailability(node.ID, false) } else { - metrics.RecordStorenodeAvailability(storeAddr, true) + metrics.RecordStorenodeAvailability(node.ID, true) iteratorLbl: for !result.IsComplete() { @@ -368,14 +364,14 @@ func retrieveHistory(ctx context.Context, runId string, storenodes []peer.AddrIn if storeNodeFailure { queryLogger.Error("storenode not available", zap.String("cursor", hexutil.Encode(result.Cursor()))) - err := dbStore.RecordStorenodeUnavailable(runId, storeAddr) + err := dbStore.RecordStorenodeUnavailable(runId, node.ID) if err != nil { queryLogger.Error("could not store recordnode unavailable", zap.String("cursor", hex.EncodeToString(result.Cursor())), zap.Error(err)) } - metrics.RecordStorenodeAvailability(storeAddr, false) + metrics.RecordStorenodeAvailability(node.ID, false) break iteratorLbl } else { - metrics.RecordStorenodeAvailability(storeAddr, true) + metrics.RecordStorenodeAvailability(node.ID, true) } } } @@ -396,8 +392,6 @@ func verifyMessageExistence(ctx context.Context, runId string, peerID peer.ID, m peerInfo := wakuNode.Host().Peerstore().PeerInfo(peerID) - storeAddr := utils.EncapsulatePeerID(peerInfo.ID, peerInfo.Addrs[0])[0].String() - queryLogger := logger.With(zap.Stringer("storenode", peerID)) queryLbl: @@ -420,14 +414,14 @@ queryLbl: if storeNodeFailure { queryLogger.Error("storenode not available") - err := dbStore.RecordStorenodeUnavailable(runId, storeAddr) + err := dbStore.RecordStorenodeUnavailable(runId, peerID) if err != nil { queryLogger.Error("could not store recordnode unavailable", zap.Error(err)) } - metrics.RecordStorenodeAvailability(storeAddr, false) + metrics.RecordStorenodeAvailability(peerID, false) } else { - metrics.RecordStorenodeAvailability(storeAddr, true) + metrics.RecordStorenodeAvailability(peerID, true) for !result.IsComplete() { msgMapLock.Lock() @@ -469,13 +463,13 @@ queryLbl: if storeNodeFailure { queryLogger.Error("storenode not available", zap.String("cursor", hexutil.Encode(result.Cursor()))) - err := dbStore.RecordStorenodeUnavailable(runId, storeAddr) + err := dbStore.RecordStorenodeUnavailable(runId, peerID) if err != nil { logger.Error("could not store recordnode unavailable", zap.Error(err), zap.String("cursor", hex.EncodeToString(result.Cursor())), zap.Stringer("storenode", peerInfo)) } - metrics.RecordStorenodeAvailability(storeAddr, false) + metrics.RecordStorenodeAvailability(peerID, false) } else { - metrics.RecordStorenodeAvailability(storeAddr, true) + metrics.RecordStorenodeAvailability(peerID, true) } } } diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go index 782f43c..76fad20 100644 --- a/internal/metrics/metrics.go +++ b/internal/metrics/metrics.go @@ -1,6 +1,7 @@ package metrics import ( + "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/p2p/metricshelper" "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" @@ -8,7 +9,7 @@ import ( var missingMessages = prometheus.NewGaugeVec( prometheus.GaugeOpts{ - Name: "counter_missing_messages", + Name: "msgcounter_missing_messages", Help: "The messages identified as missing and the reason why they're missing", }, []string{"storenode", "status"}, @@ -16,7 +17,7 @@ var missingMessages = prometheus.NewGaugeVec( var storenodeAvailability = prometheus.NewGaugeVec( prometheus.GaugeOpts{ - Name: "counter_storenode_availability", + Name: "msgcounter_storenode_availability", Help: "Indicate whether a store node is available or not", }, []string{"storenode"}, @@ -29,8 +30,8 @@ var collectors = []prometheus.Collector{ // Metrics exposes the functions required to update prometheus metrics for relay protocol type Metrics interface { - RecordMissingMessages(storenode string, status string, length int) - RecordStorenodeAvailability(storenode string, available bool) + RecordMissingMessages(peerID peer.ID, status string, length int) + RecordStorenodeAvailability(peerID peer.ID, available bool) } type metricsImpl struct { @@ -46,18 +47,18 @@ func NewMetrics(reg prometheus.Registerer, logger *zap.Logger) Metrics { } } -func (m *metricsImpl) RecordMissingMessages(storenode string, status string, length int) { +func (m *metricsImpl) RecordMissingMessages(peerID peer.ID, status string, length int) { go func() { - missingMessages.WithLabelValues(storenode, status).Set(float64(length)) + missingMessages.WithLabelValues(peerID.String(), status).Set(float64(length)) }() } -func (m *metricsImpl) RecordStorenodeAvailability(storenode string, available bool) { +func (m *metricsImpl) RecordStorenodeAvailability(peerID peer.ID, available bool) { go func() { gaugeValue := float64(1) if !available { gaugeValue = 0 } - storenodeAvailability.WithLabelValues(storenode).Set(gaugeValue) + storenodeAvailability.WithLabelValues(peerID.String()).Set(gaugeValue) }() } diff --git a/internal/persistence/database.go b/internal/persistence/database.go index dd341a9..76e46ea 100644 --- a/internal/persistence/database.go +++ b/internal/persistence/database.go @@ -6,6 +6,7 @@ import ( "sync" "time" + "github.com/libp2p/go-libp2p/core/peer" "github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/timesource" "go.uber.org/zap" @@ -250,7 +251,7 @@ func (d *DBStore) UpdateTopicSyncState(tx *sql.Tx, clusterID uint, topic string, return stmt.Close() } -func (d *DBStore) RecordMessage(uuid string, tx *sql.Tx, msgHash pb.MessageHash, clusterID uint, topic string, storenodes []string, status string) error { +func (d *DBStore) RecordMessage(uuid string, tx *sql.Tx, msgHash pb.MessageHash, clusterID uint, topic string, storenodes []peer.ID, status string) error { stmt, err := tx.Prepare("INSERT INTO missingMessages(runId, clusterId, pubsubTopic, messageHash, storenode, msgStatus, storedAt) VALUES ($1, $2, $3, $4, $5, $6, $7)") if err != nil { return err @@ -269,7 +270,7 @@ func (d *DBStore) RecordMessage(uuid string, tx *sql.Tx, msgHash pb.MessageHash, return nil } -func (d *DBStore) RecordStorenodeUnavailable(uuid string, storenode string) error { +func (d *DBStore) RecordStorenodeUnavailable(uuid string, storenode peer.ID) error { stmt, err := d.db.Prepare("INSERT INTO storeNodeUnavailable(runId, storenode, requestTime) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING") if err != nil { return err