mirror of
https://github.com/logos-messaging/storenode-messages-counter.git
synced 2026-01-04 07:03:11 +00:00
chore: use peerID instead of address
This commit is contained in:
parent
d16fdea83d
commit
25432c3a8f
@ -20,7 +20,6 @@ import (
|
|||||||
"github.com/waku-org/go-waku/waku/v2/protocol"
|
"github.com/waku-org/go-waku/waku/v2/protocol"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
|
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol/store"
|
"github.com/waku-org/go-waku/waku/v2/protocol/store"
|
||||||
"github.com/waku-org/go-waku/waku/v2/utils"
|
|
||||||
"github.com/waku-org/storenode-messages/internal/logging"
|
"github.com/waku-org/storenode-messages/internal/logging"
|
||||||
"github.com/waku-org/storenode-messages/internal/metrics"
|
"github.com/waku-org/storenode-messages/internal/metrics"
|
||||||
"github.com/waku-org/storenode-messages/internal/persistence"
|
"github.com/waku-org/storenode-messages/internal/persistence"
|
||||||
@ -224,20 +223,19 @@ func verifyHistory(ctx context.Context, runId string, storenodes []peer.AddrInfo
|
|||||||
msgMapLock.Lock()
|
msgMapLock.Lock()
|
||||||
defer msgMapLock.Unlock()
|
defer msgMapLock.Unlock()
|
||||||
|
|
||||||
missingInSummary := make(map[string]int)
|
missingInSummary := make(map[peer.ID]int)
|
||||||
unknownInSummary := make(map[string]int)
|
unknownInSummary := make(map[peer.ID]int)
|
||||||
|
|
||||||
for msgHash, nodes := range msgMap {
|
for msgHash, nodes := range msgMap {
|
||||||
var missingIn []string
|
var missingIn []peer.ID
|
||||||
var unknownIn []string
|
var unknownIn []peer.ID
|
||||||
for _, node := range storenodes {
|
for _, node := range storenodes {
|
||||||
storeAddr := utils.EncapsulatePeerID(node.ID, node.Addrs[0])[0].String()
|
|
||||||
if nodes[node.ID] == DoesNotExist {
|
if nodes[node.ID] == DoesNotExist {
|
||||||
missingIn = append(missingIn, storeAddr)
|
missingIn = append(missingIn, node.ID)
|
||||||
missingInSummary[storeAddr]++
|
missingInSummary[node.ID]++
|
||||||
} else if nodes[node.ID] == Unknown {
|
} else if nodes[node.ID] == Unknown {
|
||||||
unknownIn = append(unknownIn, storeAddr)
|
unknownIn = append(unknownIn, node.ID)
|
||||||
unknownInSummary[storeAddr]++
|
unknownInSummary[node.ID]++
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -260,12 +258,12 @@ func verifyHistory(ctx context.Context, runId string, storenodes []peer.AddrInfo
|
|||||||
|
|
||||||
for s, cnt := range missingInSummary {
|
for s, cnt := range missingInSummary {
|
||||||
metrics.RecordMissingMessages(s, "does_not_exist", cnt)
|
metrics.RecordMissingMessages(s, "does_not_exist", cnt)
|
||||||
logger.Info("missing message summary", zap.String("storenode", s), zap.Int("numMsgs", cnt))
|
logger.Info("missing message summary", zap.Stringer("storenode", s), zap.Int("numMsgs", cnt))
|
||||||
}
|
}
|
||||||
|
|
||||||
for s, cnt := range unknownInSummary {
|
for s, cnt := range unknownInSummary {
|
||||||
metrics.RecordMissingMessages(s, "unknown", cnt)
|
metrics.RecordMissingMessages(s, "unknown", cnt)
|
||||||
logger.Info("messages that could not be verified summary", zap.String("storenode", s), zap.Int("numMsgs", cnt))
|
logger.Info("messages that could not be verified summary", zap.Stringer("storenode", s), zap.Int("numMsgs", cnt))
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -293,8 +291,6 @@ func retrieveHistory(ctx context.Context, runId string, storenodes []peer.AddrIn
|
|||||||
for _, node := range storenodes {
|
for _, node := range storenodes {
|
||||||
storeNodeFailure := false
|
storeNodeFailure := false
|
||||||
|
|
||||||
storeAddr := utils.EncapsulatePeerID(node.ID, node.Addrs[0])[0].String()
|
|
||||||
|
|
||||||
var result *store.Result
|
var result *store.Result
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
@ -325,13 +321,13 @@ func retrieveHistory(ctx context.Context, runId string, storenodes []peer.AddrIn
|
|||||||
|
|
||||||
if storeNodeFailure {
|
if storeNodeFailure {
|
||||||
queryLogger.Error("storenode not available")
|
queryLogger.Error("storenode not available")
|
||||||
err := dbStore.RecordStorenodeUnavailable(runId, storeAddr)
|
err := dbStore.RecordStorenodeUnavailable(runId, node.ID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
queryLogger.Error("could not store node unavailable", zap.Error(err))
|
queryLogger.Error("could not store node unavailable", zap.Error(err))
|
||||||
}
|
}
|
||||||
metrics.RecordStorenodeAvailability(storeAddr, false)
|
metrics.RecordStorenodeAvailability(node.ID, false)
|
||||||
} else {
|
} else {
|
||||||
metrics.RecordStorenodeAvailability(storeAddr, true)
|
metrics.RecordStorenodeAvailability(node.ID, true)
|
||||||
|
|
||||||
iteratorLbl:
|
iteratorLbl:
|
||||||
for !result.IsComplete() {
|
for !result.IsComplete() {
|
||||||
@ -368,14 +364,14 @@ func retrieveHistory(ctx context.Context, runId string, storenodes []peer.AddrIn
|
|||||||
|
|
||||||
if storeNodeFailure {
|
if storeNodeFailure {
|
||||||
queryLogger.Error("storenode not available", zap.String("cursor", hexutil.Encode(result.Cursor())))
|
queryLogger.Error("storenode not available", zap.String("cursor", hexutil.Encode(result.Cursor())))
|
||||||
err := dbStore.RecordStorenodeUnavailable(runId, storeAddr)
|
err := dbStore.RecordStorenodeUnavailable(runId, node.ID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
queryLogger.Error("could not store recordnode unavailable", zap.String("cursor", hex.EncodeToString(result.Cursor())), zap.Error(err))
|
queryLogger.Error("could not store recordnode unavailable", zap.String("cursor", hex.EncodeToString(result.Cursor())), zap.Error(err))
|
||||||
}
|
}
|
||||||
metrics.RecordStorenodeAvailability(storeAddr, false)
|
metrics.RecordStorenodeAvailability(node.ID, false)
|
||||||
break iteratorLbl
|
break iteratorLbl
|
||||||
} else {
|
} else {
|
||||||
metrics.RecordStorenodeAvailability(storeAddr, true)
|
metrics.RecordStorenodeAvailability(node.ID, true)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -396,8 +392,6 @@ func verifyMessageExistence(ctx context.Context, runId string, peerID peer.ID, m
|
|||||||
|
|
||||||
peerInfo := wakuNode.Host().Peerstore().PeerInfo(peerID)
|
peerInfo := wakuNode.Host().Peerstore().PeerInfo(peerID)
|
||||||
|
|
||||||
storeAddr := utils.EncapsulatePeerID(peerInfo.ID, peerInfo.Addrs[0])[0].String()
|
|
||||||
|
|
||||||
queryLogger := logger.With(zap.Stringer("storenode", peerID))
|
queryLogger := logger.With(zap.Stringer("storenode", peerID))
|
||||||
|
|
||||||
queryLbl:
|
queryLbl:
|
||||||
@ -420,14 +414,14 @@ queryLbl:
|
|||||||
if storeNodeFailure {
|
if storeNodeFailure {
|
||||||
queryLogger.Error("storenode not available")
|
queryLogger.Error("storenode not available")
|
||||||
|
|
||||||
err := dbStore.RecordStorenodeUnavailable(runId, storeAddr)
|
err := dbStore.RecordStorenodeUnavailable(runId, peerID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
queryLogger.Error("could not store recordnode unavailable", zap.Error(err))
|
queryLogger.Error("could not store recordnode unavailable", zap.Error(err))
|
||||||
}
|
}
|
||||||
metrics.RecordStorenodeAvailability(storeAddr, false)
|
metrics.RecordStorenodeAvailability(peerID, false)
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
metrics.RecordStorenodeAvailability(storeAddr, true)
|
metrics.RecordStorenodeAvailability(peerID, true)
|
||||||
|
|
||||||
for !result.IsComplete() {
|
for !result.IsComplete() {
|
||||||
msgMapLock.Lock()
|
msgMapLock.Lock()
|
||||||
@ -469,13 +463,13 @@ queryLbl:
|
|||||||
|
|
||||||
if storeNodeFailure {
|
if storeNodeFailure {
|
||||||
queryLogger.Error("storenode not available", zap.String("cursor", hexutil.Encode(result.Cursor())))
|
queryLogger.Error("storenode not available", zap.String("cursor", hexutil.Encode(result.Cursor())))
|
||||||
err := dbStore.RecordStorenodeUnavailable(runId, storeAddr)
|
err := dbStore.RecordStorenodeUnavailable(runId, peerID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error("could not store recordnode unavailable", zap.Error(err), zap.String("cursor", hex.EncodeToString(result.Cursor())), zap.Stringer("storenode", peerInfo))
|
logger.Error("could not store recordnode unavailable", zap.Error(err), zap.String("cursor", hex.EncodeToString(result.Cursor())), zap.Stringer("storenode", peerInfo))
|
||||||
}
|
}
|
||||||
metrics.RecordStorenodeAvailability(storeAddr, false)
|
metrics.RecordStorenodeAvailability(peerID, false)
|
||||||
} else {
|
} else {
|
||||||
metrics.RecordStorenodeAvailability(storeAddr, true)
|
metrics.RecordStorenodeAvailability(peerID, true)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,6 +1,7 @@
|
|||||||
package metrics
|
package metrics
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"github.com/libp2p/go-libp2p/core/peer"
|
||||||
"github.com/libp2p/go-libp2p/p2p/metricshelper"
|
"github.com/libp2p/go-libp2p/p2p/metricshelper"
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
@ -8,7 +9,7 @@ import (
|
|||||||
|
|
||||||
var missingMessages = prometheus.NewGaugeVec(
|
var missingMessages = prometheus.NewGaugeVec(
|
||||||
prometheus.GaugeOpts{
|
prometheus.GaugeOpts{
|
||||||
Name: "counter_missing_messages",
|
Name: "msgcounter_missing_messages",
|
||||||
Help: "The messages identified as missing and the reason why they're missing",
|
Help: "The messages identified as missing and the reason why they're missing",
|
||||||
},
|
},
|
||||||
[]string{"storenode", "status"},
|
[]string{"storenode", "status"},
|
||||||
@ -16,7 +17,7 @@ var missingMessages = prometheus.NewGaugeVec(
|
|||||||
|
|
||||||
var storenodeAvailability = prometheus.NewGaugeVec(
|
var storenodeAvailability = prometheus.NewGaugeVec(
|
||||||
prometheus.GaugeOpts{
|
prometheus.GaugeOpts{
|
||||||
Name: "counter_storenode_availability",
|
Name: "msgcounter_storenode_availability",
|
||||||
Help: "Indicate whether a store node is available or not",
|
Help: "Indicate whether a store node is available or not",
|
||||||
},
|
},
|
||||||
[]string{"storenode"},
|
[]string{"storenode"},
|
||||||
@ -29,8 +30,8 @@ var collectors = []prometheus.Collector{
|
|||||||
|
|
||||||
// Metrics exposes the functions required to update prometheus metrics for relay protocol
|
// Metrics exposes the functions required to update prometheus metrics for relay protocol
|
||||||
type Metrics interface {
|
type Metrics interface {
|
||||||
RecordMissingMessages(storenode string, status string, length int)
|
RecordMissingMessages(peerID peer.ID, status string, length int)
|
||||||
RecordStorenodeAvailability(storenode string, available bool)
|
RecordStorenodeAvailability(peerID peer.ID, available bool)
|
||||||
}
|
}
|
||||||
|
|
||||||
type metricsImpl struct {
|
type metricsImpl struct {
|
||||||
@ -46,18 +47,18 @@ func NewMetrics(reg prometheus.Registerer, logger *zap.Logger) Metrics {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *metricsImpl) RecordMissingMessages(storenode string, status string, length int) {
|
func (m *metricsImpl) RecordMissingMessages(peerID peer.ID, status string, length int) {
|
||||||
go func() {
|
go func() {
|
||||||
missingMessages.WithLabelValues(storenode, status).Set(float64(length))
|
missingMessages.WithLabelValues(peerID.String(), status).Set(float64(length))
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *metricsImpl) RecordStorenodeAvailability(storenode string, available bool) {
|
func (m *metricsImpl) RecordStorenodeAvailability(peerID peer.ID, available bool) {
|
||||||
go func() {
|
go func() {
|
||||||
gaugeValue := float64(1)
|
gaugeValue := float64(1)
|
||||||
if !available {
|
if !available {
|
||||||
gaugeValue = 0
|
gaugeValue = 0
|
||||||
}
|
}
|
||||||
storenodeAvailability.WithLabelValues(storenode).Set(gaugeValue)
|
storenodeAvailability.WithLabelValues(peerID.String()).Set(gaugeValue)
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|||||||
@ -6,6 +6,7 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/libp2p/go-libp2p/core/peer"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
|
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
|
||||||
"github.com/waku-org/go-waku/waku/v2/timesource"
|
"github.com/waku-org/go-waku/waku/v2/timesource"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
@ -250,7 +251,7 @@ func (d *DBStore) UpdateTopicSyncState(tx *sql.Tx, clusterID uint, topic string,
|
|||||||
return stmt.Close()
|
return stmt.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *DBStore) RecordMessage(uuid string, tx *sql.Tx, msgHash pb.MessageHash, clusterID uint, topic string, storenodes []string, status string) error {
|
func (d *DBStore) RecordMessage(uuid string, tx *sql.Tx, msgHash pb.MessageHash, clusterID uint, topic string, storenodes []peer.ID, status string) error {
|
||||||
stmt, err := tx.Prepare("INSERT INTO missingMessages(runId, clusterId, pubsubTopic, messageHash, storenode, msgStatus, storedAt) VALUES ($1, $2, $3, $4, $5, $6, $7)")
|
stmt, err := tx.Prepare("INSERT INTO missingMessages(runId, clusterId, pubsubTopic, messageHash, storenode, msgStatus, storedAt) VALUES ($1, $2, $3, $4, $5, $6, $7)")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@ -269,7 +270,7 @@ func (d *DBStore) RecordMessage(uuid string, tx *sql.Tx, msgHash pb.MessageHash,
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *DBStore) RecordStorenodeUnavailable(uuid string, storenode string) error {
|
func (d *DBStore) RecordStorenodeUnavailable(uuid string, storenode peer.ID) error {
|
||||||
stmt, err := d.db.Prepare("INSERT INTO storeNodeUnavailable(runId, storenode, requestTime) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING")
|
stmt, err := d.db.Prepare("INSERT INTO storeNodeUnavailable(runId, storenode, requestTime) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user