From d150123f21a02c12de1e5fb3e69763903ba5d677 Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Sat, 30 Oct 2021 19:19:03 -0400 Subject: [PATCH] fix: store and lightpush metrics (#114) --- waku/metrics/http.go | 1 + waku/v2/metrics/metrics.go | 40 ++++++++++++++++++-- waku/v2/protocol/lightpush/waku_lightpush.go | 6 +++ waku/v2/protocol/store/waku_store.go | 33 ++++------------ 4 files changed, 51 insertions(+), 29 deletions(-) diff --git a/waku/metrics/http.go b/waku/metrics/http.go index d62bc706..4280a7a1 100644 --- a/waku/metrics/http.go +++ b/waku/metrics/http.go @@ -50,6 +50,7 @@ func NewMetricsServer(address string, port int) *Server { metrics.MessageView, metrics.FilterSubscriptionsView, metrics.StoreErrorTypesView, + metrics.LightpushErrorTypesView, metrics.StoreMessagesView, metrics.PeersView, metrics.DialsView, diff --git a/waku/v2/metrics/metrics.go b/waku/v2/metrics/metrics.go index 76ab699a..3a3d2a1d 100644 --- a/waku/v2/metrics/metrics.go +++ b/waku/v2/metrics/metrics.go @@ -1,23 +1,29 @@ package metrics import ( + "context" + + logging "github.com/ipfs/go-log" "go.opencensus.io/stats" "go.opencensus.io/stats/view" "go.opencensus.io/tag" ) +var log = logging.Logger("metrics") + var ( Messages = stats.Int64("node_messages", "Number of messages received", stats.UnitDimensionless) Peers = stats.Int64("peers", "Number of connected peers", stats.UnitDimensionless) Dials = stats.Int64("dials", "Number of peer dials", stats.UnitDimensionless) StoreMessages = stats.Int64("store_messages", "Number of historical messages", stats.UnitDimensionless) FilterSubscriptions = stats.Int64("filter_subscriptions", "Number of filter subscriptions", stats.UnitDimensionless) - Errors = stats.Int64("errors", "Number of errors", stats.UnitDimensionless) + StoreErrors = stats.Int64("errors", "Number of errors in store protocol", stats.UnitDimensionless) + LightpushErrors = stats.Int64("errors", "Number of errors in lightpush protocol", stats.UnitDimensionless) ) var ( - KeyType, _ = tag.NewKey("type") - KeyStoreErrorType, _ = tag.NewKey("store_error_type") + KeyType, _ = tag.NewKey("type") + ErrorType, _ = tag.NewKey("error_type") ) var ( @@ -44,6 +50,7 @@ var ( Measure: StoreMessages, Description: "The distribution of the store protocol messages", Aggregation: view.LastValue(), + TagKeys: []tag.Key{KeyType}, } FilterSubscriptionsView = &view.View{ Name: "gowaku_filter_subscriptions", @@ -53,9 +60,34 @@ var ( } StoreErrorTypesView = &view.View{ Name: "gowaku_store_errors", - Measure: Errors, + Measure: StoreErrors, Description: "The distribution of the store protocol errors", Aggregation: view.Count(), TagKeys: []tag.Key{KeyType}, } + LightpushErrorTypesView = &view.View{ + Name: "gowaku_lightpush_errors", + Measure: LightpushErrors, + Description: "The distribution of the lightpush protocol errors", + Aggregation: view.Count(), + TagKeys: []tag.Key{KeyType}, + } ) + +func RecordLightpushError(ctx context.Context, tagType string) { + if err := stats.RecordWithTags(ctx, []tag.Mutator{tag.Insert(tag.Key(ErrorType), tagType)}, LightpushErrors.M(1)); err != nil { + log.Error("failed to record with tags", err) + } +} + +func RecordMessage(ctx context.Context, tagType string, len int) { + if err := stats.RecordWithTags(ctx, []tag.Mutator{tag.Insert(KeyType, tagType)}, StoreMessages.M(int64(len))); err != nil { + log.Error("failed to record with tags", err) + } +} + +func RecordStoreError(ctx context.Context, tagType string) { + if err := stats.RecordWithTags(ctx, []tag.Mutator{tag.Insert(ErrorType, tagType)}, StoreErrors.M(1)); err != nil { + log.Error("failed to record with tags", err) + } +} diff --git a/waku/v2/protocol/lightpush/waku_lightpush.go b/waku/v2/protocol/lightpush/waku_lightpush.go index 8654be72..9773763a 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush.go +++ b/waku/v2/protocol/lightpush/waku_lightpush.go @@ -12,6 +12,7 @@ import ( "github.com/libp2p/go-libp2p-core/peer" libp2pProtocol "github.com/libp2p/go-libp2p-core/protocol" "github.com/libp2p/go-msgio/protoio" + "github.com/status-im/go-waku/waku/v2/metrics" "github.com/status-im/go-waku/waku/v2/protocol" "github.com/status-im/go-waku/waku/v2/protocol/pb" "github.com/status-im/go-waku/waku/v2/protocol/relay" @@ -60,6 +61,7 @@ func (wakuLP *WakuLightPush) onRequest(s network.Stream) { err := reader.ReadMsg(requestPushRPC) if err != nil { log.Error("error reading request", err) + metrics.RecordLightpushError(wakuLP.ctx, "decodeRpcFailure") return } @@ -166,6 +168,7 @@ func (wakuLP *WakuLightPush) Request(ctx context.Context, req *pb.PushRequest, o } if params.selectedPeer == "" { + metrics.RecordLightpushError(wakuLP.ctx, "dialError") return nil, ErrNoPeersAvailable } @@ -176,6 +179,7 @@ func (wakuLP *WakuLightPush) Request(ctx context.Context, req *pb.PushRequest, o connOpt, err := wakuLP.h.NewStream(ctx, params.selectedPeer, LightPushID_v20beta1) if err != nil { log.Info("failed to connect to remote peer", err) + metrics.RecordLightpushError(wakuLP.ctx, "dialError") return nil, err } @@ -183,6 +187,7 @@ func (wakuLP *WakuLightPush) Request(ctx context.Context, req *pb.PushRequest, o defer func() { err := connOpt.Reset() if err != nil { + metrics.RecordLightpushError(wakuLP.ctx, "dialError") log.Error("failed to reset connection", err) } }() @@ -202,6 +207,7 @@ func (wakuLP *WakuLightPush) Request(ctx context.Context, req *pb.PushRequest, o err = reader.ReadMsg(pushResponseRPC) if err != nil { log.Error("could not read response", err) + metrics.RecordLightpushError(wakuLP.ctx, "decodeRPCFailure") return nil, err } diff --git a/waku/v2/protocol/store/waku_store.go b/waku/v2/protocol/store/waku_store.go index 62c77d9b..dfa7e4f6 100644 --- a/waku/v2/protocol/store/waku_store.go +++ b/waku/v2/protocol/store/waku_store.go @@ -17,8 +17,6 @@ import ( "github.com/libp2p/go-libp2p-core/peer" libp2pProtocol "github.com/libp2p/go-libp2p-core/protocol" "github.com/libp2p/go-msgio/protoio" - "go.opencensus.io/stats" - "go.opencensus.io/tag" "github.com/status-im/go-waku/waku/persistence" "github.com/status-im/go-waku/waku/v2/metrics" @@ -250,10 +248,7 @@ func (store *WakuStore) fetchDBRecords(ctx context.Context) { storedMessages, err := store.msgProvider.GetAll() if err != nil { log.Error("could not load DBProvider messages", err) - err := stats.RecordWithTags(ctx, []tag.Mutator{tag.Insert(metrics.KeyStoreErrorType, "store_load_failure")}, metrics.Errors.M(1)) - if err != nil { - log.Error("failed to record with tags") - } + metrics.RecordStoreError(ctx, "store_load_failure") return } @@ -265,9 +260,7 @@ func (store *WakuStore) fetchDBRecords(ctx context.Context) { store.storeMessageWithIndex(storedMessage.PubsubTopic, idx, storedMessage.Message) - if err := stats.RecordWithTags(ctx, []tag.Mutator{tag.Insert(metrics.KeyType, "stored")}, metrics.StoreMessages.M(int64(len(store.messages)))); err != nil { - log.Error("failed to record with tags") - } + metrics.RecordMessage(ctx, "stored", len(store.messages)) } } @@ -296,6 +289,7 @@ func (store *WakuStore) storeMessage(pubSubTopic string, msg *pb.WakuMessage) { store.storeMessageWithIndex(pubSubTopic, index, msg) if store.msgProvider == nil { + metrics.RecordMessage(store.ctx, "stored", len(store.messages)) return } @@ -303,16 +297,11 @@ func (store *WakuStore) storeMessage(pubSubTopic string, msg *pb.WakuMessage) { if err != nil { log.Error("could not store message", err) - if err := stats.RecordWithTags(store.ctx, []tag.Mutator{tag.Insert(metrics.KeyStoreErrorType, "store_failure")}, metrics.Errors.M(1)); err != nil { - log.Error("failed to record with tags", err) - } + metrics.RecordStoreError(store.ctx, "store_failure") return } - if err := stats.RecordWithTags(store.ctx, []tag.Mutator{tag.Insert(metrics.KeyType, "stored")}, metrics.StoreMessages.M(int64(len(store.messages)))); err != nil { - log.Error("failed to record with tags", err) - } - + metrics.RecordMessage(store.ctx, "stored", len(store.messages)) } func (store *WakuStore) storeIncomingMessages(ctx context.Context) { @@ -332,9 +321,7 @@ func (store *WakuStore) onRequest(s network.Stream) { err := reader.ReadMsg(historyRPCRequest) if err != nil { log.Error("error reading request", err) - if err := stats.RecordWithTags(store.ctx, []tag.Mutator{tag.Insert(metrics.KeyStoreErrorType, "decodeRPCFailure")}, metrics.Errors.M(1)); err != nil { - log.Error("failed to record with tags", err) - } + metrics.RecordStoreError(store.ctx, "decodeRPCFailure") return } @@ -497,15 +484,11 @@ func (store *WakuStore) queryFrom(ctx context.Context, q *pb.HistoryQuery, selec err = reader.ReadMsg(historyResponseRPC) if err != nil { log.Error("could not read response", err) - if err := stats.RecordWithTags(store.ctx, []tag.Mutator{tag.Insert(metrics.KeyStoreErrorType, "decodeRPCFailure")}, metrics.Errors.M(1)); err != nil { - log.Error("failed to record with tags") - } + metrics.RecordStoreError(store.ctx, "decodeRPCFailure") return nil, err } - if err := stats.RecordWithTags(store.ctx, []tag.Mutator{tag.Insert(metrics.KeyType, "retrieved")}, metrics.StoreMessages.M(int64(len(store.messages)))); err != nil { - log.Error("failed to record with tags", err) - } + metrics.RecordMessage(ctx, "retrieved", len(store.messages)) return historyResponseRPC.Response, nil }