fix: store and lightpush metrics (#114)

This commit is contained in:
Richard Ramos 2021-10-30 19:19:03 -04:00 committed by GitHub
parent d94802f739
commit d150123f21
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 51 additions and 29 deletions

View File

@ -50,6 +50,7 @@ func NewMetricsServer(address string, port int) *Server {
metrics.MessageView, metrics.MessageView,
metrics.FilterSubscriptionsView, metrics.FilterSubscriptionsView,
metrics.StoreErrorTypesView, metrics.StoreErrorTypesView,
metrics.LightpushErrorTypesView,
metrics.StoreMessagesView, metrics.StoreMessagesView,
metrics.PeersView, metrics.PeersView,
metrics.DialsView, metrics.DialsView,

View File

@ -1,23 +1,29 @@
package metrics package metrics
import ( import (
"context"
logging "github.com/ipfs/go-log"
"go.opencensus.io/stats" "go.opencensus.io/stats"
"go.opencensus.io/stats/view" "go.opencensus.io/stats/view"
"go.opencensus.io/tag" "go.opencensus.io/tag"
) )
var log = logging.Logger("metrics")
var ( var (
Messages = stats.Int64("node_messages", "Number of messages received", stats.UnitDimensionless) Messages = stats.Int64("node_messages", "Number of messages received", stats.UnitDimensionless)
Peers = stats.Int64("peers", "Number of connected peers", stats.UnitDimensionless) Peers = stats.Int64("peers", "Number of connected peers", stats.UnitDimensionless)
Dials = stats.Int64("dials", "Number of peer dials", stats.UnitDimensionless) Dials = stats.Int64("dials", "Number of peer dials", stats.UnitDimensionless)
StoreMessages = stats.Int64("store_messages", "Number of historical messages", stats.UnitDimensionless) StoreMessages = stats.Int64("store_messages", "Number of historical messages", stats.UnitDimensionless)
FilterSubscriptions = stats.Int64("filter_subscriptions", "Number of filter subscriptions", stats.UnitDimensionless) FilterSubscriptions = stats.Int64("filter_subscriptions", "Number of filter subscriptions", stats.UnitDimensionless)
Errors = stats.Int64("errors", "Number of errors", stats.UnitDimensionless) StoreErrors = stats.Int64("errors", "Number of errors in store protocol", stats.UnitDimensionless)
LightpushErrors = stats.Int64("errors", "Number of errors in lightpush protocol", stats.UnitDimensionless)
) )
var ( var (
KeyType, _ = tag.NewKey("type") KeyType, _ = tag.NewKey("type")
KeyStoreErrorType, _ = tag.NewKey("store_error_type") ErrorType, _ = tag.NewKey("error_type")
) )
var ( var (
@ -44,6 +50,7 @@ var (
Measure: StoreMessages, Measure: StoreMessages,
Description: "The distribution of the store protocol messages", Description: "The distribution of the store protocol messages",
Aggregation: view.LastValue(), Aggregation: view.LastValue(),
TagKeys: []tag.Key{KeyType},
} }
FilterSubscriptionsView = &view.View{ FilterSubscriptionsView = &view.View{
Name: "gowaku_filter_subscriptions", Name: "gowaku_filter_subscriptions",
@ -53,9 +60,34 @@ var (
} }
StoreErrorTypesView = &view.View{ StoreErrorTypesView = &view.View{
Name: "gowaku_store_errors", Name: "gowaku_store_errors",
Measure: Errors, Measure: StoreErrors,
Description: "The distribution of the store protocol errors", Description: "The distribution of the store protocol errors",
Aggregation: view.Count(), Aggregation: view.Count(),
TagKeys: []tag.Key{KeyType}, TagKeys: []tag.Key{KeyType},
} }
LightpushErrorTypesView = &view.View{
Name: "gowaku_lightpush_errors",
Measure: LightpushErrors,
Description: "The distribution of the lightpush protocol errors",
Aggregation: view.Count(),
TagKeys: []tag.Key{KeyType},
}
) )
func RecordLightpushError(ctx context.Context, tagType string) {
if err := stats.RecordWithTags(ctx, []tag.Mutator{tag.Insert(tag.Key(ErrorType), tagType)}, LightpushErrors.M(1)); err != nil {
log.Error("failed to record with tags", err)
}
}
func RecordMessage(ctx context.Context, tagType string, len int) {
if err := stats.RecordWithTags(ctx, []tag.Mutator{tag.Insert(KeyType, tagType)}, StoreMessages.M(int64(len))); err != nil {
log.Error("failed to record with tags", err)
}
}
func RecordStoreError(ctx context.Context, tagType string) {
if err := stats.RecordWithTags(ctx, []tag.Mutator{tag.Insert(ErrorType, tagType)}, StoreErrors.M(1)); err != nil {
log.Error("failed to record with tags", err)
}
}

View File

@ -12,6 +12,7 @@ import (
"github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peer"
libp2pProtocol "github.com/libp2p/go-libp2p-core/protocol" libp2pProtocol "github.com/libp2p/go-libp2p-core/protocol"
"github.com/libp2p/go-msgio/protoio" "github.com/libp2p/go-msgio/protoio"
"github.com/status-im/go-waku/waku/v2/metrics"
"github.com/status-im/go-waku/waku/v2/protocol" "github.com/status-im/go-waku/waku/v2/protocol"
"github.com/status-im/go-waku/waku/v2/protocol/pb" "github.com/status-im/go-waku/waku/v2/protocol/pb"
"github.com/status-im/go-waku/waku/v2/protocol/relay" "github.com/status-im/go-waku/waku/v2/protocol/relay"
@ -60,6 +61,7 @@ func (wakuLP *WakuLightPush) onRequest(s network.Stream) {
err := reader.ReadMsg(requestPushRPC) err := reader.ReadMsg(requestPushRPC)
if err != nil { if err != nil {
log.Error("error reading request", err) log.Error("error reading request", err)
metrics.RecordLightpushError(wakuLP.ctx, "decodeRpcFailure")
return return
} }
@ -166,6 +168,7 @@ func (wakuLP *WakuLightPush) Request(ctx context.Context, req *pb.PushRequest, o
} }
if params.selectedPeer == "" { if params.selectedPeer == "" {
metrics.RecordLightpushError(wakuLP.ctx, "dialError")
return nil, ErrNoPeersAvailable return nil, ErrNoPeersAvailable
} }
@ -176,6 +179,7 @@ func (wakuLP *WakuLightPush) Request(ctx context.Context, req *pb.PushRequest, o
connOpt, err := wakuLP.h.NewStream(ctx, params.selectedPeer, LightPushID_v20beta1) connOpt, err := wakuLP.h.NewStream(ctx, params.selectedPeer, LightPushID_v20beta1)
if err != nil { if err != nil {
log.Info("failed to connect to remote peer", err) log.Info("failed to connect to remote peer", err)
metrics.RecordLightpushError(wakuLP.ctx, "dialError")
return nil, err return nil, err
} }
@ -183,6 +187,7 @@ func (wakuLP *WakuLightPush) Request(ctx context.Context, req *pb.PushRequest, o
defer func() { defer func() {
err := connOpt.Reset() err := connOpt.Reset()
if err != nil { if err != nil {
metrics.RecordLightpushError(wakuLP.ctx, "dialError")
log.Error("failed to reset connection", err) log.Error("failed to reset connection", err)
} }
}() }()
@ -202,6 +207,7 @@ func (wakuLP *WakuLightPush) Request(ctx context.Context, req *pb.PushRequest, o
err = reader.ReadMsg(pushResponseRPC) err = reader.ReadMsg(pushResponseRPC)
if err != nil { if err != nil {
log.Error("could not read response", err) log.Error("could not read response", err)
metrics.RecordLightpushError(wakuLP.ctx, "decodeRPCFailure")
return nil, err return nil, err
} }

View File

@ -17,8 +17,6 @@ import (
"github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peer"
libp2pProtocol "github.com/libp2p/go-libp2p-core/protocol" libp2pProtocol "github.com/libp2p/go-libp2p-core/protocol"
"github.com/libp2p/go-msgio/protoio" "github.com/libp2p/go-msgio/protoio"
"go.opencensus.io/stats"
"go.opencensus.io/tag"
"github.com/status-im/go-waku/waku/persistence" "github.com/status-im/go-waku/waku/persistence"
"github.com/status-im/go-waku/waku/v2/metrics" "github.com/status-im/go-waku/waku/v2/metrics"
@ -250,10 +248,7 @@ func (store *WakuStore) fetchDBRecords(ctx context.Context) {
storedMessages, err := store.msgProvider.GetAll() storedMessages, err := store.msgProvider.GetAll()
if err != nil { if err != nil {
log.Error("could not load DBProvider messages", err) log.Error("could not load DBProvider messages", err)
err := stats.RecordWithTags(ctx, []tag.Mutator{tag.Insert(metrics.KeyStoreErrorType, "store_load_failure")}, metrics.Errors.M(1)) metrics.RecordStoreError(ctx, "store_load_failure")
if err != nil {
log.Error("failed to record with tags")
}
return return
} }
@ -265,9 +260,7 @@ func (store *WakuStore) fetchDBRecords(ctx context.Context) {
store.storeMessageWithIndex(storedMessage.PubsubTopic, idx, storedMessage.Message) store.storeMessageWithIndex(storedMessage.PubsubTopic, idx, storedMessage.Message)
if err := stats.RecordWithTags(ctx, []tag.Mutator{tag.Insert(metrics.KeyType, "stored")}, metrics.StoreMessages.M(int64(len(store.messages)))); err != nil { metrics.RecordMessage(ctx, "stored", len(store.messages))
log.Error("failed to record with tags")
}
} }
} }
@ -296,6 +289,7 @@ func (store *WakuStore) storeMessage(pubSubTopic string, msg *pb.WakuMessage) {
store.storeMessageWithIndex(pubSubTopic, index, msg) store.storeMessageWithIndex(pubSubTopic, index, msg)
if store.msgProvider == nil { if store.msgProvider == nil {
metrics.RecordMessage(store.ctx, "stored", len(store.messages))
return return
} }
@ -303,16 +297,11 @@ func (store *WakuStore) storeMessage(pubSubTopic string, msg *pb.WakuMessage) {
if err != nil { if err != nil {
log.Error("could not store message", err) log.Error("could not store message", err)
if err := stats.RecordWithTags(store.ctx, []tag.Mutator{tag.Insert(metrics.KeyStoreErrorType, "store_failure")}, metrics.Errors.M(1)); err != nil { metrics.RecordStoreError(store.ctx, "store_failure")
log.Error("failed to record with tags", err)
}
return return
} }
if err := stats.RecordWithTags(store.ctx, []tag.Mutator{tag.Insert(metrics.KeyType, "stored")}, metrics.StoreMessages.M(int64(len(store.messages)))); err != nil { metrics.RecordMessage(store.ctx, "stored", len(store.messages))
log.Error("failed to record with tags", err)
}
} }
func (store *WakuStore) storeIncomingMessages(ctx context.Context) { func (store *WakuStore) storeIncomingMessages(ctx context.Context) {
@ -332,9 +321,7 @@ func (store *WakuStore) onRequest(s network.Stream) {
err := reader.ReadMsg(historyRPCRequest) err := reader.ReadMsg(historyRPCRequest)
if err != nil { if err != nil {
log.Error("error reading request", err) log.Error("error reading request", err)
if err := stats.RecordWithTags(store.ctx, []tag.Mutator{tag.Insert(metrics.KeyStoreErrorType, "decodeRPCFailure")}, metrics.Errors.M(1)); err != nil { metrics.RecordStoreError(store.ctx, "decodeRPCFailure")
log.Error("failed to record with tags", err)
}
return return
} }
@ -497,15 +484,11 @@ func (store *WakuStore) queryFrom(ctx context.Context, q *pb.HistoryQuery, selec
err = reader.ReadMsg(historyResponseRPC) err = reader.ReadMsg(historyResponseRPC)
if err != nil { if err != nil {
log.Error("could not read response", err) log.Error("could not read response", err)
if err := stats.RecordWithTags(store.ctx, []tag.Mutator{tag.Insert(metrics.KeyStoreErrorType, "decodeRPCFailure")}, metrics.Errors.M(1)); err != nil { metrics.RecordStoreError(store.ctx, "decodeRPCFailure")
log.Error("failed to record with tags")
}
return nil, err return nil, err
} }
if err := stats.RecordWithTags(store.ctx, []tag.Mutator{tag.Insert(metrics.KeyType, "retrieved")}, metrics.StoreMessages.M(int64(len(store.messages)))); err != nil { metrics.RecordMessage(ctx, "retrieved", len(store.messages))
log.Error("failed to record with tags", err)
}
return historyResponseRPC.Response, nil return historyResponseRPC.Response, nil
} }