From 68b615a87e2a07a1ebc10b7ff15cce5c3244109e Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Wed, 19 Apr 2023 12:09:03 -0400 Subject: [PATCH] feat: improve metrics for filter, lightpush, dns discovery and store protocols --- waku/metrics/http.go | 15 +- waku/v2/dnsdisc/enr.go | 3 + waku/v2/metrics/metrics.go | 178 ++++++++++++++++-- waku/v2/protocol/filter/client.go | 16 ++ waku/v2/protocol/filter/server.go | 81 +++++--- waku/v2/protocol/legacy_filter/waku_filter.go | 19 +- waku/v2/protocol/lightpush/waku_lightpush.go | 20 +- waku/v2/protocol/store/waku_store_client.go | 8 +- waku/v2/protocol/store/waku_store_protocol.go | 9 +- 9 files changed, 287 insertions(+), 62 deletions(-) diff --git a/waku/metrics/http.go b/waku/metrics/http.go index b87599fd..735636e9 100644 --- a/waku/metrics/http.go +++ b/waku/metrics/http.go @@ -51,11 +51,24 @@ func NewMetricsServer(address string, port int, log *zap.Logger) *Server { // Register the views if err := view.Register( metrics.MessageView, - metrics.StoreMessagesView, + metrics.LegacyFilterErrorTypesView, + metrics.LegacyFilterMessagesView, + metrics.LegacyFilterSubscribersView, + metrics.LegacyFilterSubscriptionsView, metrics.FilterSubscriptionsView, + metrics.FilterErrorTypesView, + metrics.FilterHandleMessageDurationView, + metrics.FilterMessagesView, + metrics.FilterRequestDurationView, + metrics.FilterRequestsView, metrics.StoreErrorTypesView, + metrics.LightpushMessagesView, metrics.LightpushErrorTypesView, + metrics.DnsDiscoveryNodesView, + metrics.DnsDiscoveryErrorTypesView, metrics.StoreMessagesView, + metrics.StoreErrorTypesView, + metrics.StoreQueriesView, metrics.PeersView, metrics.DialsView, metrics.VersionView, diff --git a/waku/v2/dnsdisc/enr.go b/waku/v2/dnsdisc/enr.go index b50fa8a5..0ef33f1a 100644 --- a/waku/v2/dnsdisc/enr.go +++ b/waku/v2/dnsdisc/enr.go @@ -7,6 +7,7 @@ import ( "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/enr" "github.com/libp2p/go-libp2p/core/peer" + "github.com/waku-org/go-waku/waku/v2/metrics" "github.com/waku-org/go-waku/waku/v2/utils" ma "github.com/multiformats/go-multiaddr" @@ -46,12 +47,14 @@ func RetrieveNodes(ctx context.Context, url string, opts ...DnsDiscoveryOption) tree, err := client.SyncTree(url) if err != nil { + metrics.RecordDnsDiscoveryError(ctx, "tree_sync_failure") return nil, err } for _, node := range tree.Nodes() { peerID, m, err := utils.Multiaddress(node) if err != nil { + metrics.RecordDnsDiscoveryError(ctx, "peer_info_failure") return nil, err } diff --git a/waku/v2/metrics/metrics.go b/waku/v2/metrics/metrics.go index 72cafd32..35bed0b7 100644 --- a/waku/v2/metrics/metrics.go +++ b/waku/v2/metrics/metrics.go @@ -3,6 +3,7 @@ package metrics import ( "context" "fmt" + "time" "github.com/waku-org/go-waku/waku/v2/utils" "go.opencensus.io/stats" @@ -12,16 +13,34 @@ import ( ) var ( - WakuVersion = stats.Int64("waku_version", "", stats.UnitDimensionless) - Messages = stats.Int64("node_messages", "Number of messages received", stats.UnitDimensionless) - Peers = stats.Int64("peers", "Number of connected peers", stats.UnitDimensionless) - Dials = stats.Int64("dials", "Number of peer dials", stats.UnitDimensionless) - StoreMessages = stats.Int64("store_messages", "Number of historical messages", stats.UnitDimensionless) - FilterSubscriptions = stats.Int64("filter_subscriptions", "Number of filter subscriptions", stats.UnitDimensionless) - StoreErrors = stats.Int64("errors", "Number of errors in store protocol", stats.UnitDimensionless) - StoreQueries = 
stats.Int64("store_queries", "Number of store queries", stats.UnitDimensionless)
-	LightpushErrors = stats.Int64("errors", "Number of errors in lightpush protocol", stats.UnitDimensionless)
-	PeerExchangeError = stats.Int64("errors", "Number of errors in peer exchange protocol", stats.UnitDimensionless)
+	WakuVersion = stats.Int64("waku_version", "", stats.UnitDimensionless)
+	Messages    = stats.Int64("node_messages", "Number of messages received", stats.UnitDimensionless)
+	Peers       = stats.Int64("peers", "Number of connected peers", stats.UnitDimensionless)
+	Dials       = stats.Int64("dials", "Number of peer dials", stats.UnitDimensionless)
+
+	LegacyFilterMessages      = stats.Int64("legacy_filter_messages", "Number of legacy filter messages", stats.UnitDimensionless)
+	LegacyFilterSubscribers   = stats.Int64("legacy_filter_subscribers", "Number of legacy filter subscribers", stats.UnitDimensionless)
+	LegacyFilterSubscriptions = stats.Int64("legacy_filter_subscriptions", "Number of legacy filter subscriptions", stats.UnitDimensionless)
+	LegacyFilterErrors        = stats.Int64("legacy_filter_errors", "Number of errors in legacy filter protocol", stats.UnitDimensionless)
+
+	FilterMessages                     = stats.Int64("filter_messages", "Number of filter messages", stats.UnitDimensionless)
+	FilterRequests                     = stats.Int64("filter_requests", "Number of filter requests", stats.UnitDimensionless)
+	FilterSubscriptions                = stats.Int64("filter_subscriptions", "Number of filter subscriptions", stats.UnitDimensionless)
+	FilterErrors                       = stats.Int64("filter_errors", "Number of errors in filter protocol", stats.UnitDimensionless)
+	FilterRequestDurationSeconds       = stats.Int64("filter_request_duration_seconds", "Duration of Filter Subscribe Requests", stats.UnitSeconds)
+	FilterHandleMessageDurationSeconds = stats.Int64("filter_handle_message_duration_seconds", "Duration to Push Message to Filter Subscribers", stats.UnitSeconds)
+
+	StoredMessages = stats.Int64("store_messages", "Number of historical messages", stats.UnitDimensionless)
+	StoreErrors    = stats.Int64("errors", "Number of errors in store protocol", stats.UnitDimensionless)
+	StoreQueries   = stats.Int64("store_queries", "Number of store queries", stats.UnitDimensionless)
+
+	LightpushMessages = stats.Int64("lightpush_messages", "Number of messages sent via lightpush protocol", stats.UnitDimensionless)
+	LightpushErrors   = stats.Int64("errors", "Number of errors in lightpush protocol", stats.UnitDimensionless)
+
+	PeerExchangeError = stats.Int64("errors", "Number of errors in peer exchange protocol", stats.UnitDimensionless)
+
+	DnsDiscoveryNodes  = stats.Int64("dnsdisc_nodes", "Number of discovered nodes", stats.UnitDimensionless)
+	DnsDiscoveryErrors = stats.Int64("errors", "Number of errors in dns discovery", stats.UnitDimensionless)
 )
 
 var (
@@ -49,6 +68,7 @@ var (
 		Description: "The number of the messages received",
 		Aggregation: view.Count(),
 	}
+
 	StoreQueriesView = &view.View{
 		Name:        "gowaku_store_queries",
 		Measure:     StoreQueries,
 		Description: "The number of store queries",
 		Aggregation: view.Count(),
 	}
 	StoreMessagesView = &view.View{
 		Name:        "gowaku_store_messages",
-		Measure:     StoreMessages,
+		Measure:     StoredMessages,
 		Description: "The distribution of the store protocol messages",
 		Aggregation: view.LastValue(),
 		TagKeys:     []tag.Key{KeyType},
 	}
-	FilterSubscriptionsView = &view.View{
-		Name:        "gowaku_filter_subscriptions",
-		Measure:     FilterSubscriptions,
-		Description: "The number of content filter subscriptions",
-		Aggregation: view.LastValue(),
-	}
 	StoreErrorTypesView = &view.View{
 		Name:        "gowaku_store_errors",
 		Measure:     StoreErrors,
@@ -75,6 +89,82 @@ var (
 		Aggregation: view.Count(),
 		TagKeys:     []tag.Key{ErrorType},
 	}
+
+	LegacyFilterSubscriptionsView = &view.View{
+		Name:        "gowaku_legacy_filter_subscriptions",
+		Measure:     LegacyFilterSubscriptions,
+		Description: "The number of legacy filter subscriptions",
+		Aggregation: view.Count(),
+	}
+	LegacyFilterSubscribersView = &view.View{
+		Name:        "gowaku_legacy_filter_subscribers",
+		Measure:     LegacyFilterSubscribers,
+		Description: "The number of legacy filter subscribers",
+		Aggregation: view.LastValue(),
+	}
+	LegacyFilterMessagesView = &view.View{
+		Name:        "gowaku_legacy_filter_messages",
+		Measure:     LegacyFilterMessages,
+		Description: "The distribution of the legacy filter protocol messages received",
+		Aggregation: view.Count(),
+		TagKeys:     []tag.Key{KeyType},
+	}
+	LegacyFilterErrorTypesView = &view.View{
+		Name:        "gowaku_legacy_filter_errors",
+		Measure:     LegacyFilterErrors,
+		Description: "The distribution of the legacy filter protocol errors",
+		Aggregation: view.Count(),
+		TagKeys:     []tag.Key{ErrorType},
+	}
+
+	FilterSubscriptionsView = &view.View{
+		Name:        "gowaku_filter_subscriptions",
+		Measure:     FilterSubscriptions,
+		Description: "The number of filter subscriptions",
+		Aggregation: view.Count(),
+	}
+	FilterRequestsView = &view.View{
+		Name:        "gowaku_filter_requests",
+		Measure:     FilterRequests,
+		Description: "The number of filter requests",
+		Aggregation: view.Count(),
+	}
+	FilterMessagesView = &view.View{
+		Name:        "gowaku_filter_messages",
+		Measure:     FilterMessages,
+		Description: "The distribution of the filter protocol messages received",
+		Aggregation: view.Count(),
+		TagKeys:     []tag.Key{KeyType},
+	}
+	FilterErrorTypesView = &view.View{
+		Name:        "gowaku_filter_errors",
+		Measure:     FilterErrors,
+		Description: "The distribution of the filter protocol errors",
+		Aggregation: view.Count(),
+		TagKeys:     []tag.Key{ErrorType},
+	}
+	FilterRequestDurationView = &view.View{
+		Name:        "gowaku_filter_request_duration_seconds",
+		Measure:     FilterRequestDurationSeconds,
+		Description: "Duration of Filter Subscribe Requests",
+		Aggregation: view.Count(),
+		TagKeys:     []tag.Key{ErrorType},
+	}
+	FilterHandleMessageDurationView = &view.View{
+		Name:        "gowaku_filter_handle_message_duration_seconds",
+		Measure:     FilterHandleMessageDurationSeconds,
+		Description: "Duration to Push Message to Filter Subscribers",
+		Aggregation: view.Count(),
+		TagKeys:     []tag.Key{ErrorType},
+	}
+
+	LightpushMessagesView = &view.View{
+		Name:        "gowaku_lightpush_messages",
+		Measure:     LightpushMessages,
+		Description: "The distribution of the lightpush protocol messages",
+		Aggregation: view.Count(),
+		TagKeys:     []tag.Key{KeyType},
+	}
 	LightpushErrorTypesView = &view.View{
 		Name:        "gowaku_lightpush_errors",
 		Measure:     LightpushErrors,
@@ -89,6 +179,19 @@ var (
 		Aggregation: view.LastValue(),
 		TagKeys:     []tag.Key{GitVersion},
 	}
+	DnsDiscoveryNodesView = &view.View{
+		Name:        "gowaku_dnsdisc_discovered",
+		Measure:     DnsDiscoveryNodes,
+		Description: "The number of nodes discovered via DNS discovery",
+		Aggregation: view.Count(),
+	}
+	DnsDiscoveryErrorTypesView = &view.View{
+		Name:        "gowaku_dnsdisc_errors",
+		Measure:     DnsDiscoveryErrors,
+		Description: "The distribution of the dns discovery protocol errors",
+		Aggregation: view.Count(),
+		TagKeys:     []tag.Key{ErrorType},
+	}
 )
 
 func recordWithTags(ctx context.Context, tagKey tag.Key, tagType string, ms stats.Measurement) {
@@ -97,16 +200,53 @@ func recordWithTags(ctx context.Context, tagKey tag.Key, tagType string, ms stat
 	}
 }
 
+func RecordLightpushMessage(ctx context.Context, tagType string) {
+	if err := stats.RecordWithTags(ctx, []tag.Mutator{tag.Insert(KeyType, tagType)}, LightpushMessages.M(1)); err != nil {
+		utils.Logger().Error("failed to record with tags", zap.Error(err))
+	}
+}
+
 func RecordLightpushError(ctx context.Context, tagType string) {
 	recordWithTags(ctx, ErrorType, tagType, LightpushErrors.M(1))
 }
 
+func RecordLegacyFilterError(ctx context.Context, tagType string) {
+	recordWithTags(ctx, ErrorType, tagType, LegacyFilterErrors.M(1))
+}
+
+func RecordFilterError(ctx context.Context, tagType string) {
+	recordWithTags(ctx, ErrorType, tagType, FilterErrors.M(1))
+}
+
+func RecordFilterRequest(ctx context.Context, tagType string, duration time.Duration) {
+	if err := stats.RecordWithTags(ctx, []tag.Mutator{tag.Insert(KeyType, tagType)}, FilterRequests.M(1)); err != nil {
+		utils.Logger().Error("failed to record with tags", zap.Error(err))
+	}
+	stats.Record(ctx, FilterRequestDurationSeconds.M(int64(duration.Seconds())))
+}
+
+func RecordFilterMessage(ctx context.Context, tagType string, len int) {
+	if err := stats.RecordWithTags(ctx, []tag.Mutator{tag.Insert(KeyType, tagType)}, FilterMessages.M(int64(len))); err != nil {
+		utils.Logger().Error("failed to record with tags", zap.Error(err))
+	}
+}
+
+func RecordLegacyFilterMessage(ctx context.Context, tagType string, len int) {
+	if err := stats.RecordWithTags(ctx, []tag.Mutator{tag.Insert(KeyType, tagType)}, LegacyFilterMessages.M(int64(len))); err != nil {
+		utils.Logger().Error("failed to record with tags", zap.Error(err))
+	}
+}
+
 func RecordPeerExchangeError(ctx context.Context, tagType string) {
 	recordWithTags(ctx, ErrorType, tagType, PeerExchangeError.M(1))
 }
 
-func RecordMessage(ctx context.Context, tagType string, len int) {
-	if err := stats.RecordWithTags(ctx, []tag.Mutator{tag.Insert(KeyType, tagType)}, StoreMessages.M(int64(len))); err != nil {
+func RecordDnsDiscoveryError(ctx context.Context, tagType string) {
+	recordWithTags(ctx, ErrorType, tagType, DnsDiscoveryErrors.M(1))
+}
+
+func RecordStoreMessage(ctx context.Context, tagType string, len int) {
+	if err := stats.RecordWithTags(ctx, []tag.Mutator{tag.Insert(KeyType, tagType)}, StoredMessages.M(int64(len))); err != nil {
 		utils.Logger().Error("failed to record with tags", zap.Error(err))
 	}
 }
diff --git a/waku/v2/protocol/filter/client.go b/waku/v2/protocol/filter/client.go
index 9baacfaa..eaddf565 100644
--- a/waku/v2/protocol/filter/client.go
+++ b/waku/v2/protocol/filter/client.go
@@ -119,9 +119,12 @@ func (wf *WakuFilterLightnode) onRequest(ctx context.Context) func(s network.Str
 		err := reader.ReadMsg(messagePush)
 		if err != nil {
 			logger.Error("reading message push", zap.Error(err))
+			metrics.RecordFilterError(ctx, "decode_rpc_failure")
 			return
 		}
 
+		metrics.RecordFilterMessage(ctx, "PushMessage", 1)
+
 		wf.notify(s.Conn().RemotePeer(), messagePush.PubsubTopic, messagePush.WakuMessage)
 
 		logger.Info("received message push")
@@ -141,12 +144,14 @@ func (wf *WakuFilterLightnode) notify(remotePeerID peer.ID, pubsubTopic string,
 func (wf *WakuFilterLightnode) request(ctx context.Context, params *FilterSubscribeParameters, reqType pb.FilterSubscribeRequest_FilterSubscribeType, contentFilter ContentFilter) error {
 	err := wf.h.Connect(ctx, wf.h.Peerstore().PeerInfo(params.selectedPeer))
 	if err != nil {
+		metrics.RecordFilterError(ctx, "dial_failure")
 		return err
 	}
 
 	var conn network.Stream
 	conn, err = wf.h.NewStream(ctx, params.selectedPeer, FilterSubscribeID_v20beta1)
 	if err != nil {
+		metrics.RecordFilterError(ctx, "dial_failure")
 		return err
 	}
 	defer conn.Close()
@@ -164,6 +169,7 @@ func (wf *WakuFilterLightnode) request(ctx context.Context, params *FilterSubscr wf.log.Debug("sending FilterSubscribeRequest", zap.Stringer("request", request)) err = writer.WriteMsg(request) if err != nil { + metrics.RecordFilterError(ctx, "write_request_failure") wf.log.Error("sending FilterSubscribeRequest", zap.Error(err)) return err } @@ -172,10 +178,19 @@ func (wf *WakuFilterLightnode) request(ctx context.Context, params *FilterSubscr err = reader.ReadMsg(filterSubscribeResponse) if err != nil { wf.log.Error("receiving FilterSubscribeResponse", zap.Error(err)) + metrics.RecordFilterError(ctx, "decode_rpc_failure") return err } + if filterSubscribeResponse.RequestId != request.RequestId { + wf.log.Error("requestId mismatch", zap.String("expected", request.RequestId), zap.String("received", filterSubscribeResponse.RequestId)) + metrics.RecordFilterError(ctx, "request_id_mismatch") + err := NewFilterError(300, "request_id_mismatch") + return &err + } + if filterSubscribeResponse.StatusCode != http.StatusOK { + metrics.RecordFilterError(ctx, "error_response") err := NewFilterError(int(filterSubscribeResponse.StatusCode), filterSubscribeResponse.StatusDesc) return &err } @@ -208,6 +223,7 @@ func (wf *WakuFilterLightnode) Subscribe(ctx context.Context, contentFilter Cont } if params.selectedPeer == "" { + metrics.RecordFilterError(ctx, "peer_not_found_failure") return nil, ErrNoPeersAvailable } diff --git a/waku/v2/protocol/filter/server.go b/waku/v2/protocol/filter/server.go index 0a1a0766..126dd159 100644 --- a/waku/v2/protocol/filter/server.go +++ b/waku/v2/protocol/filter/server.go @@ -7,6 +7,7 @@ import ( "math" "net/http" "sync" + "time" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" @@ -19,6 +20,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/filter/pb" "github.com/waku-org/go-waku/waku/v2/timesource" + "go.opencensus.io/stats" "go.opencensus.io/tag" "go.uber.org/zap" ) @@ -101,28 +103,33 @@ func (wf *WakuFilterFullNode) onRequest(ctx context.Context) func(s network.Stre subscribeRequest := &pb.FilterSubscribeRequest{} err := reader.ReadMsg(subscribeRequest) if err != nil { + metrics.RecordFilterError(ctx, "decode_rpc_failure") logger.Error("reading request", zap.Error(err)) return } logger = logger.With(zap.String("requestID", subscribeRequest.RequestId)) + start := time.Now() + switch subscribeRequest.FilterSubscribeType { case pb.FilterSubscribeRequest_SUBSCRIBE: - wf.subscribe(s, logger, subscribeRequest) + wf.subscribe(ctx, s, logger, subscribeRequest) case pb.FilterSubscribeRequest_SUBSCRIBER_PING: - wf.ping(s, logger, subscribeRequest) + wf.ping(ctx, s, logger, subscribeRequest) case pb.FilterSubscribeRequest_UNSUBSCRIBE: - wf.unsubscribe(s, logger, subscribeRequest) + wf.unsubscribe(ctx, s, logger, subscribeRequest) case pb.FilterSubscribeRequest_UNSUBSCRIBE_ALL: - wf.unsubscribeAll(s, logger, subscribeRequest) + wf.unsubscribeAll(ctx, s, logger, subscribeRequest) } + metrics.RecordFilterRequest(ctx, subscribeRequest.FilterSubscribeType.String(), time.Since(start)) + logger.Info("received request") } } -func reply(s network.Stream, logger *zap.Logger, request *pb.FilterSubscribeRequest, statusCode int, description ...string) { +func reply(ctx context.Context, s network.Stream, logger *zap.Logger, request *pb.FilterSubscribeRequest, statusCode int, description ...string) { response := &pb.FilterSubscribeResponse{ RequestId: request.RequestId, StatusCode: 
uint32(statusCode), @@ -137,37 +144,38 @@ func reply(s network.Stream, logger *zap.Logger, request *pb.FilterSubscribeRequ writer := pbio.NewDelimitedWriter(s) err := writer.WriteMsg(response) if err != nil { + metrics.RecordFilterError(ctx, "write_response_failure") logger.Error("sending response", zap.Error(err)) } } -func (wf *WakuFilterFullNode) ping(s network.Stream, logger *zap.Logger, request *pb.FilterSubscribeRequest) { +func (wf *WakuFilterFullNode) ping(ctx context.Context, s network.Stream, logger *zap.Logger, request *pb.FilterSubscribeRequest) { exists := wf.subscriptions.Has(s.Conn().RemotePeer()) if exists { - reply(s, logger, request, http.StatusOK) + reply(ctx, s, logger, request, http.StatusOK) } else { - reply(s, logger, request, http.StatusNotFound, peerHasNoSubscription) + reply(ctx, s, logger, request, http.StatusNotFound, peerHasNoSubscription) } } -func (wf *WakuFilterFullNode) subscribe(s network.Stream, logger *zap.Logger, request *pb.FilterSubscribeRequest) { +func (wf *WakuFilterFullNode) subscribe(ctx context.Context, s network.Stream, logger *zap.Logger, request *pb.FilterSubscribeRequest) { if request.PubsubTopic == "" { - reply(s, logger, request, http.StatusBadRequest, "pubsubtopic can't be empty") + reply(ctx, s, logger, request, http.StatusBadRequest, "pubsubtopic can't be empty") return } if len(request.ContentTopics) == 0 { - reply(s, logger, request, http.StatusBadRequest, "at least one contenttopic should be specified") + reply(ctx, s, logger, request, http.StatusBadRequest, "at least one contenttopic should be specified") return } if len(request.ContentTopics) > MaxContentTopicsPerRequest { - reply(s, logger, request, http.StatusBadRequest, fmt.Sprintf("exceeds maximum content topics: %d", MaxContentTopicsPerRequest)) + reply(ctx, s, logger, request, http.StatusBadRequest, fmt.Sprintf("exceeds maximum content topics: %d", MaxContentTopicsPerRequest)) } if wf.subscriptions.Count() >= wf.maxSubscriptions { - reply(s, logger, request, http.StatusServiceUnavailable, "node has reached maximum number of subscriptions") + reply(ctx, s, logger, request, http.StatusServiceUnavailable, "node has reached maximum number of subscriptions") return } @@ -180,45 +188,49 @@ func (wf *WakuFilterFullNode) subscribe(s network.Stream, logger *zap.Logger, re } if ctTotal+len(request.ContentTopics) > MaxCriteriaPerSubscription { - reply(s, logger, request, http.StatusServiceUnavailable, "peer has reached maximum number of filter criteria") + reply(ctx, s, logger, request, http.StatusServiceUnavailable, "peer has reached maximum number of filter criteria") return } } wf.subscriptions.Set(peerID, request.PubsubTopic, request.ContentTopics) - reply(s, logger, request, http.StatusOK) + stats.Record(ctx, metrics.FilterSubscriptions.M(int64(wf.subscriptions.Count()))) + + reply(ctx, s, logger, request, http.StatusOK) } -func (wf *WakuFilterFullNode) unsubscribe(s network.Stream, logger *zap.Logger, request *pb.FilterSubscribeRequest) { +func (wf *WakuFilterFullNode) unsubscribe(ctx context.Context, s network.Stream, logger *zap.Logger, request *pb.FilterSubscribeRequest) { if request.PubsubTopic == "" { - reply(s, logger, request, http.StatusBadRequest, "pubsubtopic can't be empty") + reply(ctx, s, logger, request, http.StatusBadRequest, "pubsubtopic can't be empty") return } if len(request.ContentTopics) == 0 { - reply(s, logger, request, http.StatusBadRequest, "at least one contenttopic should be specified") + reply(ctx, s, logger, request, http.StatusBadRequest, "at least 
one contenttopic should be specified")
 		return
 	}
 
 	if len(request.ContentTopics) > MaxContentTopicsPerRequest {
-		reply(s, logger, request, http.StatusBadRequest, fmt.Sprintf("exceeds maximum content topics: %d", MaxContentTopicsPerRequest))
+		reply(ctx, s, logger, request, http.StatusBadRequest, fmt.Sprintf("exceeds maximum content topics: %d", MaxContentTopicsPerRequest))
 	}
 
 	err := wf.subscriptions.Delete(s.Conn().RemotePeer(), request.PubsubTopic, request.ContentTopics)
 	if err != nil {
-		reply(s, logger, request, http.StatusNotFound, peerHasNoSubscription)
+		reply(ctx, s, logger, request, http.StatusNotFound, peerHasNoSubscription)
 	} else {
-		reply(s, logger, request, http.StatusOK)
+		stats.Record(ctx, metrics.FilterSubscriptions.M(int64(wf.subscriptions.Count())))
+		reply(ctx, s, logger, request, http.StatusOK)
 	}
 }
 
-func (wf *WakuFilterFullNode) unsubscribeAll(s network.Stream, logger *zap.Logger, request *pb.FilterSubscribeRequest) {
+func (wf *WakuFilterFullNode) unsubscribeAll(ctx context.Context, s network.Stream, logger *zap.Logger, request *pb.FilterSubscribeRequest) {
 	err := wf.subscriptions.DeleteAll(s.Conn().RemotePeer())
 	if err != nil {
-		reply(s, logger, request, http.StatusNotFound, peerHasNoSubscription)
+		reply(ctx, s, logger, request, http.StatusNotFound, peerHasNoSubscription)
 	} else {
-		reply(s, logger, request, http.StatusOK)
+		stats.Record(ctx, metrics.FilterSubscriptions.M(int64(wf.subscriptions.Count())))
+		reply(ctx, s, logger, request, http.StatusOK)
 	}
 }
 
@@ -242,10 +254,14 @@ func (wf *WakuFilterFullNode) filterListener(ctx context.Context) {
 			wf.wg.Add(1)
 			go func(subscriber peer.ID) {
 				defer wf.wg.Done()
+				start := time.Now()
 				err := wf.pushMessage(ctx, subscriber, envelope)
 				if err != nil {
 					logger.Error("pushing message", zap.Error(err))
+					return
 				}
+				elapsed := time.Since(start)
+				stats.Record(ctx, metrics.FilterHandleMessageDurationSeconds.M(int64(elapsed.Seconds())))
 			}(subscriber)
 		}
 
@@ -274,6 +290,11 @@ func (wf *WakuFilterFullNode) pushMessage(ctx context.Context, peerID peer.ID, e
 	err := wf.h.Connect(ctx, wf.h.Peerstore().PeerInfo(peerID))
 	if err != nil {
 		wf.subscriptions.FlagAsFailure(peerID)
+		if errors.Is(err, context.DeadlineExceeded) {
+			metrics.RecordFilterError(ctx, "push_timeout_failure")
+		} else {
+			metrics.RecordFilterError(ctx, "dial_failure")
+		}
 		logger.Error("connecting to peer", zap.Error(err))
 		return err
 	}
@@ -281,9 +302,12 @@ func (wf *WakuFilterFullNode) pushMessage(ctx context.Context, peerID peer.ID, e
 	conn, err := wf.h.NewStream(ctx, peerID, FilterPushID_v20beta1)
 	if err != nil {
 		wf.subscriptions.FlagAsFailure(peerID)
-
+		if errors.Is(err, context.DeadlineExceeded) {
+			metrics.RecordFilterError(ctx, "push_timeout_failure")
+		} else {
+			metrics.RecordFilterError(ctx, "dial_failure")
+		}
 		logger.Error("opening peer stream", zap.Error(err))
-		//waku_filter_errors.inc(labelValues = [dialFailure])
 		return err
 	}
 
@@ -291,6 +315,11 @@ func (wf *WakuFilterFullNode) pushMessage(ctx context.Context, peerID peer.ID, e
 	writer := pbio.NewDelimitedWriter(conn)
 	err = writer.WriteMsg(messagePush)
 	if err != nil {
+		if errors.Is(err, context.DeadlineExceeded) {
+			metrics.RecordFilterError(ctx, "push_timeout_failure")
+		} else {
+			metrics.RecordFilterError(ctx, "response_write_failure")
+		}
 		logger.Error("pushing messages to peer", zap.Error(err))
 		wf.subscriptions.FlagAsFailure(peerID)
 		return nil
diff --git a/waku/v2/protocol/legacy_filter/waku_filter.go b/waku/v2/protocol/legacy_filter/waku_filter.go
index 820c96bc..67c707aa 100644
--- a/waku/v2/protocol/legacy_filter/waku_filter.go
+++ b/waku/v2/protocol/legacy_filter/waku_filter.go @@ -125,6 +125,7 @@ func (wf *WakuFilter) onRequest(ctx context.Context) func(s network.Stream) { err := reader.ReadMsg(filterRPCRequest) if err != nil { + metrics.RecordLegacyFilterError(ctx, "decode_rpc_failure") logger.Error("reading request", zap.Error(err)) return } @@ -139,7 +140,7 @@ func (wf *WakuFilter) onRequest(ctx context.Context) func(s network.Stream) { } logger.Info("received a message push", zap.Int("messages", len(filterRPCRequest.Push.Messages))) - stats.Record(ctx, metrics.Messages.M(int64(len(filterRPCRequest.Push.Messages)))) + metrics.RecordLegacyFilterMessage(ctx, "FilterRequest", len(filterRPCRequest.Push.Messages)) } else if filterRPCRequest.Request != nil && wf.isFullNode { // We're on a full node. // This is a filter request coming from a light node. @@ -152,13 +153,13 @@ func (wf *WakuFilter) onRequest(ctx context.Context) func(s network.Stream) { len := wf.subscribers.Append(subscriber) logger.Info("adding subscriber") - stats.Record(ctx, metrics.FilterSubscriptions.M(int64(len))) + stats.Record(ctx, metrics.LegacyFilterSubscribers.M(int64(len))) } else { peerId := s.Conn().RemotePeer() wf.subscribers.RemoveContentFilters(peerId, filterRPCRequest.RequestId, filterRPCRequest.Request.ContentFilters) logger.Info("removing subscriber") - stats.Record(ctx, metrics.FilterSubscriptions.M(int64(wf.subscribers.Length()))) + stats.Record(ctx, metrics.LegacyFilterSubscribers.M(int64(wf.subscribers.Length()))) } } else { logger.Error("can't serve request") @@ -176,15 +177,15 @@ func (wf *WakuFilter) pushMessage(ctx context.Context, subscriber Subscriber, ms if err != nil { wf.subscribers.FlagAsFailure(subscriber.peer) logger.Error("connecting to peer", zap.Error(err)) + metrics.RecordLegacyFilterError(ctx, "dial_failure") return err } conn, err := wf.h.NewStream(ctx, subscriber.peer, FilterID_v20beta1) if err != nil { wf.subscribers.FlagAsFailure(subscriber.peer) - logger.Error("opening peer stream", zap.Error(err)) - //waku_filter_errors.inc(labelValues = [dialFailure]) + metrics.RecordLegacyFilterError(ctx, "dial_failure") return err } @@ -194,6 +195,7 @@ func (wf *WakuFilter) pushMessage(ctx context.Context, subscriber Subscriber, ms if err != nil { logger.Error("pushing messages to peer", zap.Error(err)) wf.subscribers.FlagAsFailure(subscriber.peer) + metrics.RecordLegacyFilterError(ctx, "push_write_error") return nil } @@ -259,6 +261,7 @@ func (wf *WakuFilter) requestSubscription(ctx context.Context, filter ContentFil } if params.selectedPeer == "" { + metrics.RecordLegacyFilterError(ctx, "peer_not_found_failure") return nil, ErrNoPeersAvailable } @@ -270,6 +273,7 @@ func (wf *WakuFilter) requestSubscription(ctx context.Context, filter ContentFil // We connect first so dns4 addresses are resolved (NewStream does not do it) err = wf.h.Connect(ctx, wf.h.Peerstore().PeerInfo(params.selectedPeer)) if err != nil { + metrics.RecordLegacyFilterError(ctx, "dial_failure") return } @@ -282,6 +286,7 @@ func (wf *WakuFilter) requestSubscription(ctx context.Context, filter ContentFil var conn network.Stream conn, err = wf.h.NewStream(ctx, params.selectedPeer, FilterID_v20beta1) if err != nil { + metrics.RecordLegacyFilterError(ctx, "dial_failure") return } @@ -295,6 +300,7 @@ func (wf *WakuFilter) requestSubscription(ctx context.Context, filter ContentFil wf.log.Debug("sending filterRPC", zap.Stringer("rpc", filterRPC)) err = writer.WriteMsg(filterRPC) if err != nil { + metrics.RecordLegacyFilterError(ctx, "request_write_error") 
wf.log.Error("sending filterRPC", zap.Error(err)) return } @@ -311,11 +317,13 @@ func (wf *WakuFilter) Unsubscribe(ctx context.Context, contentFilter ContentFilt // We connect first so dns4 addresses are resolved (NewStream does not do it) err := wf.h.Connect(ctx, wf.h.Peerstore().PeerInfo(peer)) if err != nil { + metrics.RecordLegacyFilterError(ctx, "dial_failure") return err } conn, err := wf.h.NewStream(ctx, peer, FilterID_v20beta1) if err != nil { + metrics.RecordLegacyFilterError(ctx, "dial_failure") return err } @@ -339,6 +347,7 @@ func (wf *WakuFilter) Unsubscribe(ctx context.Context, contentFilter ContentFilt filterRPC := &pb.FilterRPC{RequestId: hex.EncodeToString(id), Request: request} err = writer.WriteMsg(filterRPC) if err != nil { + metrics.RecordLegacyFilterError(ctx, "request_write_error") return err } diff --git a/waku/v2/protocol/lightpush/waku_lightpush.go b/waku/v2/protocol/lightpush/waku_lightpush.go index 179f4191..385e34e1 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush.go +++ b/waku/v2/protocol/lightpush/waku_lightpush.go @@ -81,7 +81,7 @@ func (wakuLP *WakuLightPush) onRequest(ctx context.Context) func(s network.Strea err := reader.ReadMsg(requestPushRPC) if err != nil { logger.Error("reading request", zap.Error(err)) - metrics.RecordLightpushError(ctx, "decodeRpcFailure") + metrics.RecordLightpushError(ctx, "decode_rpc_failure") return } @@ -93,6 +93,8 @@ func (wakuLP *WakuLightPush) onRequest(ctx context.Context) func(s network.Strea pubSubTopic := requestPushRPC.Query.PubsubTopic message := requestPushRPC.Query.Message + metrics.RecordLightpushMessage(ctx, "PushRequest") + // TODO: Assumes success, should probably be extended to check for network, peers, etc // It might make sense to use WithReadiness option here? @@ -100,6 +102,7 @@ func (wakuLP *WakuLightPush) onRequest(ctx context.Context) func(s network.Strea if err != nil { logger.Error("publishing message", zap.Error(err)) + metrics.RecordLightpushError(ctx, "message_push_failure") response.Info = "Could not publish message" } else { response.IsSuccess = true @@ -112,11 +115,14 @@ func (wakuLP *WakuLightPush) onRequest(ctx context.Context) func(s network.Strea err = writer.WriteMsg(responsePushRPC) if err != nil { + metrics.RecordLightpushError(ctx, "response_write_failure") logger.Error("writing response", zap.Error(err)) _ = s.Reset() } else { logger.Info("response sent") } + } else { + metrics.RecordLightpushError(ctx, "empty_request_body_failure") } if requestPushRPC.Response != nil { @@ -125,6 +131,8 @@ func (wakuLP *WakuLightPush) onRequest(ctx context.Context) func(s network.Strea } else { logger.Info("request failure", zap.String("info=", requestPushRPC.Response.Info)) } + } else { + metrics.RecordLightpushError(ctx, "empty_response_body_failure") } } } @@ -140,7 +148,7 @@ func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, o } if params.selectedPeer == "" { - metrics.RecordLightpushError(ctx, "dialError") + metrics.RecordLightpushError(ctx, "peer_not_found_failure") return nil, ErrNoPeersAvailable } @@ -152,6 +160,7 @@ func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, o // We connect first so dns4 addresses are resolved (NewStream does not do it) err := wakuLP.h.Connect(ctx, wakuLP.h.Peerstore().PeerInfo(params.selectedPeer)) if err != nil { + metrics.RecordLightpushError(ctx, "dial_failure") logger.Error("connecting peer", zap.Error(err)) return nil, err } @@ -159,7 +168,7 @@ func (wakuLP *WakuLightPush) request(ctx context.Context, 
req *pb.PushRequest, o connOpt, err := wakuLP.h.NewStream(ctx, params.selectedPeer, LightPushID_v20beta1) if err != nil { logger.Error("creating stream to peer", zap.Error(err)) - metrics.RecordLightpushError(ctx, "dialError") + metrics.RecordLightpushError(ctx, "dial_failure") return nil, err } @@ -167,7 +176,7 @@ func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, o defer func() { err := connOpt.Reset() if err != nil { - metrics.RecordLightpushError(ctx, "dialError") + metrics.RecordLightpushError(ctx, "dial_failure") logger.Error("resetting connection", zap.Error(err)) } }() @@ -179,6 +188,7 @@ func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, o err = writer.WriteMsg(pushRequestRPC) if err != nil { + metrics.RecordLightpushError(ctx, "request_write_failure") logger.Error("writing request", zap.Error(err)) return nil, err } @@ -187,7 +197,7 @@ func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, o err = reader.ReadMsg(pushResponseRPC) if err != nil { logger.Error("reading response", zap.Error(err)) - metrics.RecordLightpushError(ctx, "decodeRPCFailure") + metrics.RecordLightpushError(ctx, "decode_rpc_failure") return nil, err } diff --git a/waku/v2/protocol/store/waku_store_client.go b/waku/v2/protocol/store/waku_store_client.go index 0962b73a..180391e5 100644 --- a/waku/v2/protocol/store/waku_store_client.go +++ b/waku/v2/protocol/store/waku_store_client.go @@ -180,12 +180,14 @@ func (store *WakuStore) queryFrom(ctx context.Context, q *pb.HistoryQuery, selec err := store.h.Connect(ctx, store.h.Peerstore().PeerInfo(selectedPeer)) if err != nil { logger.Error("connecting to peer", zap.Error(err)) + metrics.RecordStoreError(store.ctx, "dial_failure") return nil, err } connOpt, err := store.h.NewStream(ctx, selectedPeer, StoreID_v20beta4) if err != nil { logger.Error("creating stream to peer", zap.Error(err)) + metrics.RecordStoreError(store.ctx, "dial_failure") return nil, err } @@ -202,6 +204,7 @@ func (store *WakuStore) queryFrom(ctx context.Context, q *pb.HistoryQuery, selec err = writer.WriteMsg(historyRequest) if err != nil { logger.Error("writing request", zap.Error(err)) + metrics.RecordStoreError(store.ctx, "write_request_failure") return nil, err } @@ -209,7 +212,7 @@ func (store *WakuStore) queryFrom(ctx context.Context, q *pb.HistoryQuery, selec err = reader.ReadMsg(historyResponseRPC) if err != nil { logger.Error("reading response", zap.Error(err)) - metrics.RecordStoreError(store.ctx, "decodeRPCFailure") + metrics.RecordStoreError(store.ctx, "decode_rpc_failure") return nil, err } @@ -220,7 +223,7 @@ func (store *WakuStore) queryFrom(ctx context.Context, q *pb.HistoryQuery, selec }, nil } - metrics.RecordMessage(ctx, "retrieved", len(historyResponseRPC.Response.Messages)) + metrics.RecordStoreMessage(ctx, "retrieved", len(historyResponseRPC.Response.Messages)) return historyResponseRPC.Response, nil } @@ -275,6 +278,7 @@ func (store *WakuStore) Query(ctx context.Context, query Query, opts ...HistoryR } if !params.localQuery && params.selectedPeer == "" { + metrics.RecordStoreError(ctx, "peer_not_found_failure") return nil, ErrNoPeersAvailable } diff --git a/waku/v2/protocol/store/waku_store_protocol.go b/waku/v2/protocol/store/waku_store_protocol.go index 4ec2e4ed..1859fd1f 100644 --- a/waku/v2/protocol/store/waku_store_protocol.go +++ b/waku/v2/protocol/store/waku_store_protocol.go @@ -120,7 +120,7 @@ func (store *WakuStore) Start(ctx context.Context) error { err := store.msgProvider.Start(ctx, 
store.timesource) // TODO: store protocol should not start a message provider if err != nil { store.log.Error("Error starting message provider", zap.Error(err)) - return nil + return err } store.started = true @@ -179,7 +179,7 @@ func (store *WakuStore) updateMetrics(ctx context.Context) { if err != nil { store.log.Error("updating store metrics", zap.Error(err)) } else { - metrics.RecordMessage(store.ctx, "stored", msgCount) + metrics.RecordStoreMessage(store.ctx, "stored", msgCount) } case <-ctx.Done(): return @@ -198,7 +198,7 @@ func (store *WakuStore) onRequest(s network.Stream) { err := reader.ReadMsg(historyRPCRequest) if err != nil { logger.Error("reading request", zap.Error(err)) - metrics.RecordStoreError(store.ctx, "decodeRPCFailure") + metrics.RecordStoreError(store.ctx, "decode_rpc_failure") return } @@ -207,7 +207,7 @@ func (store *WakuStore) onRequest(s network.Stream) { logger = logger.With(logging.Filters(query.GetContentFilters())) } else { logger.Error("reading request", zap.Error(err)) - metrics.RecordStoreError(store.ctx, "emptyRpcQueryFailure") + metrics.RecordStoreError(store.ctx, "empty_rpc_query_failure") return } @@ -222,6 +222,7 @@ func (store *WakuStore) onRequest(s network.Stream) { err = writer.WriteMsg(historyResponseRPC) if err != nil { logger.Error("writing response", zap.Error(err), logging.PagingInfo(historyResponseRPC.Response.PagingInfo)) + metrics.RecordStoreError(store.ctx, "response_write_failure") _ = s.Reset() } else { logger.Info("response sent")
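
For reference, a minimal usage sketch of the filter metric helpers introduced by this patch. It is illustrative only (not part of the change itself) and assumes the import path github.com/waku-org/go-waku/waku/v2/metrics plus the OpenCensus packages the patch already relies on; the "SUBSCRIBE" tag value mirrors FilterSubscribeType.String() and "dial_failure" is one of the error labels used above.

package main

import (
	"context"
	"time"

	"github.com/waku-org/go-waku/waku/v2/metrics"
	"go.opencensus.io/stats/view"
)

func main() {
	// Register the filter views so measurements recorded by the helpers
	// below are aggregated and visible to whatever exporter is attached
	// (the node's metrics server registers these same views).
	if err := view.Register(
		metrics.FilterRequestsView,
		metrics.FilterRequestDurationView,
		metrics.FilterErrorTypesView,
	); err != nil {
		panic(err)
	}

	ctx := context.Background()

	// A served SUBSCRIBE request that took about two seconds; the duration
	// is truncated to whole seconds because the measure is an int64 with
	// UnitSeconds.
	metrics.RecordFilterRequest(ctx, "SUBSCRIBE", 2*time.Second)

	// A failed dial while pushing messages to a subscriber.
	metrics.RecordFilterError(ctx, "dial_failure")
}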