feat: improve metrics for filter, lightpush, dns discovery and store protocols

This commit is contained in:
Richard Ramos 2023-04-19 12:09:03 -04:00 committed by RichΛrd
parent 6550565afc
commit 68b615a87e
9 changed files with 287 additions and 62 deletions

View File

@ -51,11 +51,24 @@ func NewMetricsServer(address string, port int, log *zap.Logger) *Server {
// Register the views
if err := view.Register(
metrics.MessageView,
metrics.StoreMessagesView,
metrics.LegacyFilterErrorTypesView,
metrics.LegacyFilterMessagesView,
metrics.LegacyFilterSubscribersView,
metrics.LegacyFilterSubscriptionsView,
metrics.FilterSubscriptionsView,
metrics.FilterErrorTypesView,
metrics.FilterHandleMessageDurationView,
metrics.FilterMessagesView,
metrics.FilterRequestDurationView,
metrics.FilterRequestsView,
metrics.StoreErrorTypesView,
metrics.LightpushMessagesView,
metrics.LightpushErrorTypesView,
metrics.DnsDiscoveryNodesView,
metrics.DnsDiscoveryErrorTypesView,
metrics.StoreMessagesView,
metrics.StoreErrorTypesView,
metrics.StoreQueriesView,
metrics.PeersView,
metrics.DialsView,
metrics.VersionView,

View File

@ -7,6 +7,7 @@ import (
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/waku-org/go-waku/waku/v2/metrics"
"github.com/waku-org/go-waku/waku/v2/utils"
ma "github.com/multiformats/go-multiaddr"
@ -46,12 +47,14 @@ func RetrieveNodes(ctx context.Context, url string, opts ...DnsDiscoveryOption)
tree, err := client.SyncTree(url)
if err != nil {
metrics.RecordDnsDiscoveryError(ctx, "tree_sync_failure")
return nil, err
}
for _, node := range tree.Nodes() {
peerID, m, err := utils.Multiaddress(node)
if err != nil {
metrics.RecordDnsDiscoveryError(ctx, "peer_info_failure")
return nil, err
}

View File

@ -3,6 +3,7 @@ package metrics
import (
"context"
"fmt"
"time"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.opencensus.io/stats"
@ -12,16 +13,34 @@ import (
)
var (
WakuVersion = stats.Int64("waku_version", "", stats.UnitDimensionless)
Messages = stats.Int64("node_messages", "Number of messages received", stats.UnitDimensionless)
Peers = stats.Int64("peers", "Number of connected peers", stats.UnitDimensionless)
Dials = stats.Int64("dials", "Number of peer dials", stats.UnitDimensionless)
StoreMessages = stats.Int64("store_messages", "Number of historical messages", stats.UnitDimensionless)
FilterSubscriptions = stats.Int64("filter_subscriptions", "Number of filter subscriptions", stats.UnitDimensionless)
StoreErrors = stats.Int64("errors", "Number of errors in store protocol", stats.UnitDimensionless)
StoreQueries = stats.Int64("store_queries", "Number of store queries", stats.UnitDimensionless)
LightpushErrors = stats.Int64("errors", "Number of errors in lightpush protocol", stats.UnitDimensionless)
PeerExchangeError = stats.Int64("errors", "Number of errors in peer exchange protocol", stats.UnitDimensionless)
WakuVersion = stats.Int64("waku_version", "", stats.UnitDimensionless)
Messages = stats.Int64("node_messages", "Number of messages received", stats.UnitDimensionless)
Peers = stats.Int64("peers", "Number of connected peers", stats.UnitDimensionless)
Dials = stats.Int64("dials", "Number of peer dials", stats.UnitDimensionless)
LegacyFilterMessages = stats.Int64("legacy_filter_messages", "Number of legacy filter messages", stats.UnitDimensionless)
LegacyFilterSubscribers = stats.Int64("legacy_filter_subscribers", "Number of legacy filter subscribers", stats.UnitDimensionless)
LegacyFilterSubscriptions = stats.Int64("legacy_filter_subscriptions", "Number of legacy filter subscriptions", stats.UnitDimensionless)
LegacyFilterErrors = stats.Int64("legacy_filter_errors", "Number of errors in legacy filter protocol", stats.UnitDimensionless)
FilterMessages = stats.Int64("filter_messages", "Number of filter messages", stats.UnitDimensionless)
FilterRequests = stats.Int64("filter_requests", "Number of filter requests", stats.UnitDimensionless)
FilterSubscriptions = stats.Int64("filter_subscriptions", "Number of filter subscriptions", stats.UnitDimensionless)
FilterErrors = stats.Int64("filter_errors", "Number of errors in filter protocol", stats.UnitDimensionless)
FilterRequestDurationSeconds = stats.Int64("filter_request_duration_seconds", "Duration of Filter Subscribe Requests", stats.UnitSeconds)
FilterHandleMessageDurationSeconds = stats.Int64("filter_handle_msessageduration_seconds", "Duration to Push Message to Filter Subscribers", stats.UnitSeconds)
StoredMessages = stats.Int64("store_messages", "Number of historical messages", stats.UnitDimensionless)
StoreErrors = stats.Int64("errors", "Number of errors in store protocol", stats.UnitDimensionless)
StoreQueries = stats.Int64("store_queries", "Number of store queries", stats.UnitDimensionless)
LightpushMessages = stats.Int64("lightpush_messages", "Number of messages sent via lightpush protocol", stats.UnitDimensionless)
LightpushErrors = stats.Int64("errors", "Number of errors in lightpush protocol", stats.UnitDimensionless)
PeerExchangeError = stats.Int64("errors", "Number of errors in peer exchange protocol", stats.UnitDimensionless)
DnsDiscoveryNodes = stats.Int64("dnsdisc_nodes", "Number of discovered nodes", stats.UnitDimensionless)
DnsDiscoveryErrors = stats.Int64("errors", "Number of errors in dns discovery", stats.UnitDimensionless)
)
var (
@ -49,6 +68,7 @@ var (
Description: "The number of the messages received",
Aggregation: view.Count(),
}
StoreQueriesView = &view.View{
Name: "gowaku_store_queries",
Measure: StoreQueries,
@ -57,17 +77,11 @@ var (
}
StoreMessagesView = &view.View{
Name: "gowaku_store_messages",
Measure: StoreMessages,
Measure: StoredMessages,
Description: "The distribution of the store protocol messages",
Aggregation: view.LastValue(),
TagKeys: []tag.Key{KeyType},
}
FilterSubscriptionsView = &view.View{
Name: "gowaku_filter_subscriptions",
Measure: FilterSubscriptions,
Description: "The number of content filter subscriptions",
Aggregation: view.LastValue(),
}
StoreErrorTypesView = &view.View{
Name: "gowaku_store_errors",
Measure: StoreErrors,
@ -75,6 +89,82 @@ var (
Aggregation: view.Count(),
TagKeys: []tag.Key{ErrorType},
}
LegacyFilterSubscriptionsView = &view.View{
Name: "gowaku_legacy_filter_subscriptions",
Measure: LegacyFilterSubscriptions,
Description: "The number of legacy filter subscriptions",
Aggregation: view.Count(),
}
LegacyFilterSubscribersView = &view.View{
Name: "gowaku_legacy_filter_subscribers",
Measure: LegacyFilterSubscribers,
Description: "The number of legacy filter subscribers",
Aggregation: view.LastValue(),
}
LegacyFilterMessagesView = &view.View{
Name: "gowaku_legacy_filter_messages",
Measure: LegacyFilterMessages,
Description: "The distribution of the legacy filter protocol messages received",
Aggregation: view.Count(),
TagKeys: []tag.Key{KeyType},
}
LegacyFilterErrorTypesView = &view.View{
Name: "gowaku_legacy_filter_errors",
Measure: LegacyFilterErrors,
Description: "The distribution of the legacy filter protocol errors",
Aggregation: view.Count(),
TagKeys: []tag.Key{ErrorType},
}
FilterSubscriptionsView = &view.View{
Name: "gowaku_filter_subscriptions",
Measure: FilterSubscriptions,
Description: "The number of filter subscriptions",
Aggregation: view.Count(),
}
FilterRequestsView = &view.View{
Name: "gowaku_filter_requests",
Measure: FilterRequests,
Description: "The number of filter requests",
Aggregation: view.Count(),
}
FilterMessagesView = &view.View{
Name: "gowaku_filter_messages",
Measure: FilterMessages,
Description: "The distribution of the filter protocol messages received",
Aggregation: view.Count(),
TagKeys: []tag.Key{KeyType},
}
FilterErrorTypesView = &view.View{
Name: "gowaku_filter_errors",
Measure: FilterErrors,
Description: "The distribution of the filter protocol errors",
Aggregation: view.Count(),
TagKeys: []tag.Key{ErrorType},
}
FilterRequestDurationView = &view.View{
Name: "gowaku_filter_request_duration_seconds",
Measure: FilterRequestDurationSeconds,
Description: "Duration of Filter Subscribe Requests",
Aggregation: view.Count(),
TagKeys: []tag.Key{ErrorType},
}
FilterHandleMessageDurationView = &view.View{
Name: "gowaku_filter_handle_msessageduration_seconds",
Measure: FilterHandleMessageDurationSeconds,
Description: "Duration to Push Message to Filter Subscribers",
Aggregation: view.Count(),
TagKeys: []tag.Key{ErrorType},
}
LightpushMessagesView = &view.View{
Name: "gowaku_lightpush_messages",
Measure: StoredMessages,
Description: "The distribution of the lightpush protocol messages",
Aggregation: view.LastValue(),
TagKeys: []tag.Key{KeyType},
}
LightpushErrorTypesView = &view.View{
Name: "gowaku_lightpush_errors",
Measure: LightpushErrors,
@ -89,6 +179,19 @@ var (
Aggregation: view.LastValue(),
TagKeys: []tag.Key{GitVersion},
}
DnsDiscoveryNodesView = &view.View{
Name: "gowaku_dnsdisc_discovered",
Measure: DnsDiscoveryNodes,
Description: "The number of nodes discovered via DNS discovery",
Aggregation: view.Count(),
}
DnsDiscoveryErrorTypesView = &view.View{
Name: "gowaku_dnsdisc_errors",
Measure: DnsDiscoveryErrors,
Description: "The distribution of the dns discovery protocol errors",
Aggregation: view.Count(),
TagKeys: []tag.Key{ErrorType},
}
)
func recordWithTags(ctx context.Context, tagKey tag.Key, tagType string, ms stats.Measurement) {
@ -97,16 +200,53 @@ func recordWithTags(ctx context.Context, tagKey tag.Key, tagType string, ms stat
}
}
func RecordLightpushMessage(ctx context.Context, tagType string) {
if err := stats.RecordWithTags(ctx, []tag.Mutator{tag.Insert(KeyType, tagType)}, LightpushMessages.M(1)); err != nil {
utils.Logger().Error("failed to record with tags", zap.Error(err))
}
}
func RecordLightpushError(ctx context.Context, tagType string) {
recordWithTags(ctx, ErrorType, tagType, LightpushErrors.M(1))
}
func RecordLegacyFilterError(ctx context.Context, tagType string) {
recordWithTags(ctx, ErrorType, tagType, LegacyFilterErrors.M(1))
}
func RecordFilterError(ctx context.Context, tagType string) {
recordWithTags(ctx, ErrorType, tagType, FilterErrors.M(1))
}
func RecordFilterRequest(ctx context.Context, tagType string, duration time.Duration) {
if err := stats.RecordWithTags(ctx, []tag.Mutator{tag.Insert(KeyType, tagType)}, FilterRequests.M(1)); err != nil {
utils.Logger().Error("failed to record with tags", zap.Error(err))
}
FilterRequestDurationSeconds.M(int64(duration.Seconds()))
}
func RecordFilterMessage(ctx context.Context, tagType string, len int) {
if err := stats.RecordWithTags(ctx, []tag.Mutator{tag.Insert(KeyType, tagType)}, FilterMessages.M(int64(len))); err != nil {
utils.Logger().Error("failed to record with tags", zap.Error(err))
}
}
func RecordLegacyFilterMessage(ctx context.Context, tagType string, len int) {
if err := stats.RecordWithTags(ctx, []tag.Mutator{tag.Insert(KeyType, tagType)}, LegacyFilterMessages.M(int64(len))); err != nil {
utils.Logger().Error("failed to record with tags", zap.Error(err))
}
}
func RecordPeerExchangeError(ctx context.Context, tagType string) {
recordWithTags(ctx, ErrorType, tagType, PeerExchangeError.M(1))
}
func RecordMessage(ctx context.Context, tagType string, len int) {
if err := stats.RecordWithTags(ctx, []tag.Mutator{tag.Insert(KeyType, tagType)}, StoreMessages.M(int64(len))); err != nil {
func RecordDnsDiscoveryError(ctx context.Context, tagType string) {
recordWithTags(ctx, ErrorType, tagType, DnsDiscoveryErrors.M(1))
}
func RecordStoreMessage(ctx context.Context, tagType string, len int) {
if err := stats.RecordWithTags(ctx, []tag.Mutator{tag.Insert(KeyType, tagType)}, StoredMessages.M(int64(len))); err != nil {
utils.Logger().Error("failed to record with tags", zap.Error(err))
}
}

View File

@ -119,9 +119,12 @@ func (wf *WakuFilterLightnode) onRequest(ctx context.Context) func(s network.Str
err := reader.ReadMsg(messagePush)
if err != nil {
logger.Error("reading message push", zap.Error(err))
metrics.RecordFilterError(ctx, "decode_rpc_failure")
return
}
metrics.RecordFilterMessage(ctx, "PushMessage", 1)
wf.notify(s.Conn().RemotePeer(), messagePush.PubsubTopic, messagePush.WakuMessage)
logger.Info("received message push")
@ -141,12 +144,14 @@ func (wf *WakuFilterLightnode) notify(remotePeerID peer.ID, pubsubTopic string,
func (wf *WakuFilterLightnode) request(ctx context.Context, params *FilterSubscribeParameters, reqType pb.FilterSubscribeRequest_FilterSubscribeType, contentFilter ContentFilter) error {
err := wf.h.Connect(ctx, wf.h.Peerstore().PeerInfo(params.selectedPeer))
if err != nil {
metrics.RecordFilterError(ctx, "dial_failure")
return err
}
var conn network.Stream
conn, err = wf.h.NewStream(ctx, params.selectedPeer, FilterSubscribeID_v20beta1)
if err != nil {
metrics.RecordFilterError(ctx, "dial_failure")
return err
}
defer conn.Close()
@ -164,6 +169,7 @@ func (wf *WakuFilterLightnode) request(ctx context.Context, params *FilterSubscr
wf.log.Debug("sending FilterSubscribeRequest", zap.Stringer("request", request))
err = writer.WriteMsg(request)
if err != nil {
metrics.RecordFilterError(ctx, "write_request_failure")
wf.log.Error("sending FilterSubscribeRequest", zap.Error(err))
return err
}
@ -172,10 +178,19 @@ func (wf *WakuFilterLightnode) request(ctx context.Context, params *FilterSubscr
err = reader.ReadMsg(filterSubscribeResponse)
if err != nil {
wf.log.Error("receiving FilterSubscribeResponse", zap.Error(err))
metrics.RecordFilterError(ctx, "decode_rpc_failure")
return err
}
if filterSubscribeResponse.RequestId != request.RequestId {
wf.log.Error("requestId mismatch", zap.String("expected", request.RequestId), zap.String("received", filterSubscribeResponse.RequestId))
metrics.RecordFilterError(ctx, "request_id_mismatch")
err := NewFilterError(300, "request_id_mismatch")
return &err
}
if filterSubscribeResponse.StatusCode != http.StatusOK {
metrics.RecordFilterError(ctx, "error_response")
err := NewFilterError(int(filterSubscribeResponse.StatusCode), filterSubscribeResponse.StatusDesc)
return &err
}
@ -208,6 +223,7 @@ func (wf *WakuFilterLightnode) Subscribe(ctx context.Context, contentFilter Cont
}
if params.selectedPeer == "" {
metrics.RecordFilterError(ctx, "peer_not_found_failure")
return nil, ErrNoPeersAvailable
}

View File

@ -7,6 +7,7 @@ import (
"math"
"net/http"
"sync"
"time"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
@ -19,6 +20,7 @@ import (
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/filter/pb"
"github.com/waku-org/go-waku/waku/v2/timesource"
"go.opencensus.io/stats"
"go.opencensus.io/tag"
"go.uber.org/zap"
)
@ -101,28 +103,33 @@ func (wf *WakuFilterFullNode) onRequest(ctx context.Context) func(s network.Stre
subscribeRequest := &pb.FilterSubscribeRequest{}
err := reader.ReadMsg(subscribeRequest)
if err != nil {
metrics.RecordFilterError(ctx, "decode_rpc_failure")
logger.Error("reading request", zap.Error(err))
return
}
logger = logger.With(zap.String("requestID", subscribeRequest.RequestId))
start := time.Now()
switch subscribeRequest.FilterSubscribeType {
case pb.FilterSubscribeRequest_SUBSCRIBE:
wf.subscribe(s, logger, subscribeRequest)
wf.subscribe(ctx, s, logger, subscribeRequest)
case pb.FilterSubscribeRequest_SUBSCRIBER_PING:
wf.ping(s, logger, subscribeRequest)
wf.ping(ctx, s, logger, subscribeRequest)
case pb.FilterSubscribeRequest_UNSUBSCRIBE:
wf.unsubscribe(s, logger, subscribeRequest)
wf.unsubscribe(ctx, s, logger, subscribeRequest)
case pb.FilterSubscribeRequest_UNSUBSCRIBE_ALL:
wf.unsubscribeAll(s, logger, subscribeRequest)
wf.unsubscribeAll(ctx, s, logger, subscribeRequest)
}
metrics.RecordFilterRequest(ctx, subscribeRequest.FilterSubscribeType.String(), time.Since(start))
logger.Info("received request")
}
}
func reply(s network.Stream, logger *zap.Logger, request *pb.FilterSubscribeRequest, statusCode int, description ...string) {
func reply(ctx context.Context, s network.Stream, logger *zap.Logger, request *pb.FilterSubscribeRequest, statusCode int, description ...string) {
response := &pb.FilterSubscribeResponse{
RequestId: request.RequestId,
StatusCode: uint32(statusCode),
@ -137,37 +144,38 @@ func reply(s network.Stream, logger *zap.Logger, request *pb.FilterSubscribeRequ
writer := pbio.NewDelimitedWriter(s)
err := writer.WriteMsg(response)
if err != nil {
metrics.RecordFilterError(ctx, "write_response_failure")
logger.Error("sending response", zap.Error(err))
}
}
func (wf *WakuFilterFullNode) ping(s network.Stream, logger *zap.Logger, request *pb.FilterSubscribeRequest) {
func (wf *WakuFilterFullNode) ping(ctx context.Context, s network.Stream, logger *zap.Logger, request *pb.FilterSubscribeRequest) {
exists := wf.subscriptions.Has(s.Conn().RemotePeer())
if exists {
reply(s, logger, request, http.StatusOK)
reply(ctx, s, logger, request, http.StatusOK)
} else {
reply(s, logger, request, http.StatusNotFound, peerHasNoSubscription)
reply(ctx, s, logger, request, http.StatusNotFound, peerHasNoSubscription)
}
}
func (wf *WakuFilterFullNode) subscribe(s network.Stream, logger *zap.Logger, request *pb.FilterSubscribeRequest) {
func (wf *WakuFilterFullNode) subscribe(ctx context.Context, s network.Stream, logger *zap.Logger, request *pb.FilterSubscribeRequest) {
if request.PubsubTopic == "" {
reply(s, logger, request, http.StatusBadRequest, "pubsubtopic can't be empty")
reply(ctx, s, logger, request, http.StatusBadRequest, "pubsubtopic can't be empty")
return
}
if len(request.ContentTopics) == 0 {
reply(s, logger, request, http.StatusBadRequest, "at least one contenttopic should be specified")
reply(ctx, s, logger, request, http.StatusBadRequest, "at least one contenttopic should be specified")
return
}
if len(request.ContentTopics) > MaxContentTopicsPerRequest {
reply(s, logger, request, http.StatusBadRequest, fmt.Sprintf("exceeds maximum content topics: %d", MaxContentTopicsPerRequest))
reply(ctx, s, logger, request, http.StatusBadRequest, fmt.Sprintf("exceeds maximum content topics: %d", MaxContentTopicsPerRequest))
}
if wf.subscriptions.Count() >= wf.maxSubscriptions {
reply(s, logger, request, http.StatusServiceUnavailable, "node has reached maximum number of subscriptions")
reply(ctx, s, logger, request, http.StatusServiceUnavailable, "node has reached maximum number of subscriptions")
return
}
@ -180,45 +188,49 @@ func (wf *WakuFilterFullNode) subscribe(s network.Stream, logger *zap.Logger, re
}
if ctTotal+len(request.ContentTopics) > MaxCriteriaPerSubscription {
reply(s, logger, request, http.StatusServiceUnavailable, "peer has reached maximum number of filter criteria")
reply(ctx, s, logger, request, http.StatusServiceUnavailable, "peer has reached maximum number of filter criteria")
return
}
}
wf.subscriptions.Set(peerID, request.PubsubTopic, request.ContentTopics)
reply(s, logger, request, http.StatusOK)
stats.Record(ctx, metrics.FilterSubscriptions.M(int64(wf.subscriptions.Count())))
reply(ctx, s, logger, request, http.StatusOK)
}
func (wf *WakuFilterFullNode) unsubscribe(s network.Stream, logger *zap.Logger, request *pb.FilterSubscribeRequest) {
func (wf *WakuFilterFullNode) unsubscribe(ctx context.Context, s network.Stream, logger *zap.Logger, request *pb.FilterSubscribeRequest) {
if request.PubsubTopic == "" {
reply(s, logger, request, http.StatusBadRequest, "pubsubtopic can't be empty")
reply(ctx, s, logger, request, http.StatusBadRequest, "pubsubtopic can't be empty")
return
}
if len(request.ContentTopics) == 0 {
reply(s, logger, request, http.StatusBadRequest, "at least one contenttopic should be specified")
reply(ctx, s, logger, request, http.StatusBadRequest, "at least one contenttopic should be specified")
return
}
if len(request.ContentTopics) > MaxContentTopicsPerRequest {
reply(s, logger, request, http.StatusBadRequest, fmt.Sprintf("exceeds maximum content topics: %d", MaxContentTopicsPerRequest))
reply(ctx, s, logger, request, http.StatusBadRequest, fmt.Sprintf("exceeds maximum content topics: %d", MaxContentTopicsPerRequest))
}
err := wf.subscriptions.Delete(s.Conn().RemotePeer(), request.PubsubTopic, request.ContentTopics)
if err != nil {
reply(s, logger, request, http.StatusNotFound, peerHasNoSubscription)
reply(ctx, s, logger, request, http.StatusNotFound, peerHasNoSubscription)
} else {
reply(s, logger, request, http.StatusOK)
stats.Record(ctx, metrics.FilterSubscriptions.M(int64(wf.subscriptions.Count())))
reply(ctx, s, logger, request, http.StatusOK)
}
}
func (wf *WakuFilterFullNode) unsubscribeAll(s network.Stream, logger *zap.Logger, request *pb.FilterSubscribeRequest) {
func (wf *WakuFilterFullNode) unsubscribeAll(ctx context.Context, s network.Stream, logger *zap.Logger, request *pb.FilterSubscribeRequest) {
err := wf.subscriptions.DeleteAll(s.Conn().RemotePeer())
if err != nil {
reply(s, logger, request, http.StatusNotFound, peerHasNoSubscription)
reply(ctx, s, logger, request, http.StatusNotFound, peerHasNoSubscription)
} else {
reply(s, logger, request, http.StatusOK)
stats.Record(ctx, metrics.FilterSubscriptions.M(int64(wf.subscriptions.Count())))
reply(ctx, s, logger, request, http.StatusOK)
}
}
@ -242,10 +254,14 @@ func (wf *WakuFilterFullNode) filterListener(ctx context.Context) {
wf.wg.Add(1)
go func(subscriber peer.ID) {
defer wf.wg.Done()
start := time.Now()
err := wf.pushMessage(ctx, subscriber, envelope)
if err != nil {
logger.Error("pushing message", zap.Error(err))
return
}
ellapsed := time.Since(start)
metrics.FilterHandleMessageDurationSeconds.M(int64(ellapsed.Seconds()))
}(subscriber)
}
@ -274,6 +290,11 @@ func (wf *WakuFilterFullNode) pushMessage(ctx context.Context, peerID peer.ID, e
err := wf.h.Connect(ctx, wf.h.Peerstore().PeerInfo(peerID))
if err != nil {
wf.subscriptions.FlagAsFailure(peerID)
if errors.Is(context.DeadlineExceeded, err) {
metrics.RecordFilterError(ctx, "push_timeout_failure")
} else {
metrics.RecordFilterError(ctx, "dial_failure")
}
logger.Error("connecting to peer", zap.Error(err))
return err
}
@ -281,9 +302,12 @@ func (wf *WakuFilterFullNode) pushMessage(ctx context.Context, peerID peer.ID, e
conn, err := wf.h.NewStream(ctx, peerID, FilterPushID_v20beta1)
if err != nil {
wf.subscriptions.FlagAsFailure(peerID)
if errors.Is(context.DeadlineExceeded, err) {
metrics.RecordFilterError(ctx, "push_timeout_failure")
} else {
metrics.RecordFilterError(ctx, "dial_failure")
}
logger.Error("opening peer stream", zap.Error(err))
//waku_filter_errors.inc(labelValues = [dialFailure])
return err
}
@ -291,6 +315,11 @@ func (wf *WakuFilterFullNode) pushMessage(ctx context.Context, peerID peer.ID, e
writer := pbio.NewDelimitedWriter(conn)
err = writer.WriteMsg(messagePush)
if err != nil {
if errors.Is(context.DeadlineExceeded, err) {
metrics.RecordFilterError(ctx, "push_timeout_failure")
} else {
metrics.RecordFilterError(ctx, "response_write_failure")
}
logger.Error("pushing messages to peer", zap.Error(err))
wf.subscriptions.FlagAsFailure(peerID)
return nil

View File

@ -125,6 +125,7 @@ func (wf *WakuFilter) onRequest(ctx context.Context) func(s network.Stream) {
err := reader.ReadMsg(filterRPCRequest)
if err != nil {
metrics.RecordLegacyFilterError(ctx, "decode_rpc_failure")
logger.Error("reading request", zap.Error(err))
return
}
@ -139,7 +140,7 @@ func (wf *WakuFilter) onRequest(ctx context.Context) func(s network.Stream) {
}
logger.Info("received a message push", zap.Int("messages", len(filterRPCRequest.Push.Messages)))
stats.Record(ctx, metrics.Messages.M(int64(len(filterRPCRequest.Push.Messages))))
metrics.RecordLegacyFilterMessage(ctx, "FilterRequest", len(filterRPCRequest.Push.Messages))
} else if filterRPCRequest.Request != nil && wf.isFullNode {
// We're on a full node.
// This is a filter request coming from a light node.
@ -152,13 +153,13 @@ func (wf *WakuFilter) onRequest(ctx context.Context) func(s network.Stream) {
len := wf.subscribers.Append(subscriber)
logger.Info("adding subscriber")
stats.Record(ctx, metrics.FilterSubscriptions.M(int64(len)))
stats.Record(ctx, metrics.LegacyFilterSubscribers.M(int64(len)))
} else {
peerId := s.Conn().RemotePeer()
wf.subscribers.RemoveContentFilters(peerId, filterRPCRequest.RequestId, filterRPCRequest.Request.ContentFilters)
logger.Info("removing subscriber")
stats.Record(ctx, metrics.FilterSubscriptions.M(int64(wf.subscribers.Length())))
stats.Record(ctx, metrics.LegacyFilterSubscribers.M(int64(wf.subscribers.Length())))
}
} else {
logger.Error("can't serve request")
@ -176,15 +177,15 @@ func (wf *WakuFilter) pushMessage(ctx context.Context, subscriber Subscriber, ms
if err != nil {
wf.subscribers.FlagAsFailure(subscriber.peer)
logger.Error("connecting to peer", zap.Error(err))
metrics.RecordLegacyFilterError(ctx, "dial_failure")
return err
}
conn, err := wf.h.NewStream(ctx, subscriber.peer, FilterID_v20beta1)
if err != nil {
wf.subscribers.FlagAsFailure(subscriber.peer)
logger.Error("opening peer stream", zap.Error(err))
//waku_filter_errors.inc(labelValues = [dialFailure])
metrics.RecordLegacyFilterError(ctx, "dial_failure")
return err
}
@ -194,6 +195,7 @@ func (wf *WakuFilter) pushMessage(ctx context.Context, subscriber Subscriber, ms
if err != nil {
logger.Error("pushing messages to peer", zap.Error(err))
wf.subscribers.FlagAsFailure(subscriber.peer)
metrics.RecordLegacyFilterError(ctx, "push_write_error")
return nil
}
@ -259,6 +261,7 @@ func (wf *WakuFilter) requestSubscription(ctx context.Context, filter ContentFil
}
if params.selectedPeer == "" {
metrics.RecordLegacyFilterError(ctx, "peer_not_found_failure")
return nil, ErrNoPeersAvailable
}
@ -270,6 +273,7 @@ func (wf *WakuFilter) requestSubscription(ctx context.Context, filter ContentFil
// We connect first so dns4 addresses are resolved (NewStream does not do it)
err = wf.h.Connect(ctx, wf.h.Peerstore().PeerInfo(params.selectedPeer))
if err != nil {
metrics.RecordLegacyFilterError(ctx, "dial_failure")
return
}
@ -282,6 +286,7 @@ func (wf *WakuFilter) requestSubscription(ctx context.Context, filter ContentFil
var conn network.Stream
conn, err = wf.h.NewStream(ctx, params.selectedPeer, FilterID_v20beta1)
if err != nil {
metrics.RecordLegacyFilterError(ctx, "dial_failure")
return
}
@ -295,6 +300,7 @@ func (wf *WakuFilter) requestSubscription(ctx context.Context, filter ContentFil
wf.log.Debug("sending filterRPC", zap.Stringer("rpc", filterRPC))
err = writer.WriteMsg(filterRPC)
if err != nil {
metrics.RecordLegacyFilterError(ctx, "request_write_error")
wf.log.Error("sending filterRPC", zap.Error(err))
return
}
@ -311,11 +317,13 @@ func (wf *WakuFilter) Unsubscribe(ctx context.Context, contentFilter ContentFilt
// We connect first so dns4 addresses are resolved (NewStream does not do it)
err := wf.h.Connect(ctx, wf.h.Peerstore().PeerInfo(peer))
if err != nil {
metrics.RecordLegacyFilterError(ctx, "dial_failure")
return err
}
conn, err := wf.h.NewStream(ctx, peer, FilterID_v20beta1)
if err != nil {
metrics.RecordLegacyFilterError(ctx, "dial_failure")
return err
}
@ -339,6 +347,7 @@ func (wf *WakuFilter) Unsubscribe(ctx context.Context, contentFilter ContentFilt
filterRPC := &pb.FilterRPC{RequestId: hex.EncodeToString(id), Request: request}
err = writer.WriteMsg(filterRPC)
if err != nil {
metrics.RecordLegacyFilterError(ctx, "request_write_error")
return err
}

View File

@ -81,7 +81,7 @@ func (wakuLP *WakuLightPush) onRequest(ctx context.Context) func(s network.Strea
err := reader.ReadMsg(requestPushRPC)
if err != nil {
logger.Error("reading request", zap.Error(err))
metrics.RecordLightpushError(ctx, "decodeRpcFailure")
metrics.RecordLightpushError(ctx, "decode_rpc_failure")
return
}
@ -93,6 +93,8 @@ func (wakuLP *WakuLightPush) onRequest(ctx context.Context) func(s network.Strea
pubSubTopic := requestPushRPC.Query.PubsubTopic
message := requestPushRPC.Query.Message
metrics.RecordLightpushMessage(ctx, "PushRequest")
// TODO: Assumes success, should probably be extended to check for network, peers, etc
// It might make sense to use WithReadiness option here?
@ -100,6 +102,7 @@ func (wakuLP *WakuLightPush) onRequest(ctx context.Context) func(s network.Strea
if err != nil {
logger.Error("publishing message", zap.Error(err))
metrics.RecordLightpushError(ctx, "message_push_failure")
response.Info = "Could not publish message"
} else {
response.IsSuccess = true
@ -112,11 +115,14 @@ func (wakuLP *WakuLightPush) onRequest(ctx context.Context) func(s network.Strea
err = writer.WriteMsg(responsePushRPC)
if err != nil {
metrics.RecordLightpushError(ctx, "response_write_failure")
logger.Error("writing response", zap.Error(err))
_ = s.Reset()
} else {
logger.Info("response sent")
}
} else {
metrics.RecordLightpushError(ctx, "empty_request_body_failure")
}
if requestPushRPC.Response != nil {
@ -125,6 +131,8 @@ func (wakuLP *WakuLightPush) onRequest(ctx context.Context) func(s network.Strea
} else {
logger.Info("request failure", zap.String("info=", requestPushRPC.Response.Info))
}
} else {
metrics.RecordLightpushError(ctx, "empty_response_body_failure")
}
}
}
@ -140,7 +148,7 @@ func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, o
}
if params.selectedPeer == "" {
metrics.RecordLightpushError(ctx, "dialError")
metrics.RecordLightpushError(ctx, "peer_not_found_failure")
return nil, ErrNoPeersAvailable
}
@ -152,6 +160,7 @@ func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, o
// We connect first so dns4 addresses are resolved (NewStream does not do it)
err := wakuLP.h.Connect(ctx, wakuLP.h.Peerstore().PeerInfo(params.selectedPeer))
if err != nil {
metrics.RecordLightpushError(ctx, "dial_failure")
logger.Error("connecting peer", zap.Error(err))
return nil, err
}
@ -159,7 +168,7 @@ func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, o
connOpt, err := wakuLP.h.NewStream(ctx, params.selectedPeer, LightPushID_v20beta1)
if err != nil {
logger.Error("creating stream to peer", zap.Error(err))
metrics.RecordLightpushError(ctx, "dialError")
metrics.RecordLightpushError(ctx, "dial_failure")
return nil, err
}
@ -167,7 +176,7 @@ func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, o
defer func() {
err := connOpt.Reset()
if err != nil {
metrics.RecordLightpushError(ctx, "dialError")
metrics.RecordLightpushError(ctx, "dial_failure")
logger.Error("resetting connection", zap.Error(err))
}
}()
@ -179,6 +188,7 @@ func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, o
err = writer.WriteMsg(pushRequestRPC)
if err != nil {
metrics.RecordLightpushError(ctx, "request_write_failure")
logger.Error("writing request", zap.Error(err))
return nil, err
}
@ -187,7 +197,7 @@ func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, o
err = reader.ReadMsg(pushResponseRPC)
if err != nil {
logger.Error("reading response", zap.Error(err))
metrics.RecordLightpushError(ctx, "decodeRPCFailure")
metrics.RecordLightpushError(ctx, "decode_rpc_failure")
return nil, err
}

View File

@ -180,12 +180,14 @@ func (store *WakuStore) queryFrom(ctx context.Context, q *pb.HistoryQuery, selec
err := store.h.Connect(ctx, store.h.Peerstore().PeerInfo(selectedPeer))
if err != nil {
logger.Error("connecting to peer", zap.Error(err))
metrics.RecordStoreError(store.ctx, "dial_failure")
return nil, err
}
connOpt, err := store.h.NewStream(ctx, selectedPeer, StoreID_v20beta4)
if err != nil {
logger.Error("creating stream to peer", zap.Error(err))
metrics.RecordStoreError(store.ctx, "dial_failure")
return nil, err
}
@ -202,6 +204,7 @@ func (store *WakuStore) queryFrom(ctx context.Context, q *pb.HistoryQuery, selec
err = writer.WriteMsg(historyRequest)
if err != nil {
logger.Error("writing request", zap.Error(err))
metrics.RecordStoreError(store.ctx, "write_request_failure")
return nil, err
}
@ -209,7 +212,7 @@ func (store *WakuStore) queryFrom(ctx context.Context, q *pb.HistoryQuery, selec
err = reader.ReadMsg(historyResponseRPC)
if err != nil {
logger.Error("reading response", zap.Error(err))
metrics.RecordStoreError(store.ctx, "decodeRPCFailure")
metrics.RecordStoreError(store.ctx, "decode_rpc_failure")
return nil, err
}
@ -220,7 +223,7 @@ func (store *WakuStore) queryFrom(ctx context.Context, q *pb.HistoryQuery, selec
}, nil
}
metrics.RecordMessage(ctx, "retrieved", len(historyResponseRPC.Response.Messages))
metrics.RecordStoreMessage(ctx, "retrieved", len(historyResponseRPC.Response.Messages))
return historyResponseRPC.Response, nil
}
@ -275,6 +278,7 @@ func (store *WakuStore) Query(ctx context.Context, query Query, opts ...HistoryR
}
if !params.localQuery && params.selectedPeer == "" {
metrics.RecordStoreError(ctx, "peer_not_found_failure")
return nil, ErrNoPeersAvailable
}

View File

@ -120,7 +120,7 @@ func (store *WakuStore) Start(ctx context.Context) error {
err := store.msgProvider.Start(ctx, store.timesource) // TODO: store protocol should not start a message provider
if err != nil {
store.log.Error("Error starting message provider", zap.Error(err))
return nil
return err
}
store.started = true
@ -179,7 +179,7 @@ func (store *WakuStore) updateMetrics(ctx context.Context) {
if err != nil {
store.log.Error("updating store metrics", zap.Error(err))
} else {
metrics.RecordMessage(store.ctx, "stored", msgCount)
metrics.RecordStoreMessage(store.ctx, "stored", msgCount)
}
case <-ctx.Done():
return
@ -198,7 +198,7 @@ func (store *WakuStore) onRequest(s network.Stream) {
err := reader.ReadMsg(historyRPCRequest)
if err != nil {
logger.Error("reading request", zap.Error(err))
metrics.RecordStoreError(store.ctx, "decodeRPCFailure")
metrics.RecordStoreError(store.ctx, "decode_rpc_failure")
return
}
@ -207,7 +207,7 @@ func (store *WakuStore) onRequest(s network.Stream) {
logger = logger.With(logging.Filters(query.GetContentFilters()))
} else {
logger.Error("reading request", zap.Error(err))
metrics.RecordStoreError(store.ctx, "emptyRpcQueryFailure")
metrics.RecordStoreError(store.ctx, "empty_rpc_query_failure")
return
}
@ -222,6 +222,7 @@ func (store *WakuStore) onRequest(s network.Stream) {
err = writer.WriteMsg(historyResponseRPC)
if err != nil {
logger.Error("writing response", zap.Error(err), logging.PagingInfo(historyResponseRPC.Response.PagingInfo))
metrics.RecordStoreError(store.ctx, "response_write_failure")
_ = s.Reset()
} else {
logger.Info("response sent")