diff --git a/.changelog/10243.txt b/.changelog/10243.txt new file mode 100644 index 0000000000..de2692ca80 --- /dev/null +++ b/.changelog/10243.txt @@ -0,0 +1,3 @@ +```release-note:feature +xds: emit a labeled gauge of connected xDS streams by version +``` diff --git a/agent/setup.go b/agent/setup.go index 15b7df9051..55299c49a6 100644 --- a/agent/setup.go +++ b/agent/setup.go @@ -26,6 +26,7 @@ import ( "github.com/hashicorp/consul/agent/router" "github.com/hashicorp/consul/agent/submatview" "github.com/hashicorp/consul/agent/token" + "github.com/hashicorp/consul/agent/xds" "github.com/hashicorp/consul/ipaddr" "github.com/hashicorp/consul/lib" "github.com/hashicorp/consul/logging" @@ -195,6 +196,7 @@ func getPrometheusDefs(cfg lib.TelemetryConfig) ([]prometheus.GaugeDefinition, [ consul.RPCGauges, consul.SessionGauges, grpc.StatsGauges, + xds.StatsGauges, usagemetrics.Gauges, consul.ReplicationGauges, Gauges, diff --git a/agent/xds/delta.go b/agent/xds/delta.go index dad9b9c7b8..f5fac86720 100644 --- a/agent/xds/delta.go +++ b/agent/xds/delta.go @@ -31,6 +31,8 @@ type ADSDeltaStream = envoy_discovery_v3.AggregatedDiscoveryService_DeltaAggrega // DeltaAggregatedResources implements envoy_discovery_v3.AggregatedDiscoveryServiceServer func (s *Server) DeltaAggregatedResources(stream ADSDeltaStream) error { + defer s.activeStreams.Increment("v3")() + // a channel for receiving incoming requests reqCh := make(chan *envoy_discovery_v3.DeltaDiscoveryRequest) reqStop := int32(0) diff --git a/agent/xds/delta_test.go b/agent/xds/delta_test.go index ae971621c0..56c43b60c0 100644 --- a/agent/xds/delta_test.go +++ b/agent/xds/delta_test.go @@ -59,6 +59,8 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP(t *testing.T) { // Check no response sent yet assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) + requireProtocolVersionGauge(t, scenario, "v3", 1) + // Deliver a new snapshot (tcp with one tcp upstream) mgr.DeliverConfig(t, sid, snap) diff --git a/agent/xds/server.go b/agent/xds/server.go index 43731b36ea..259b014370 100644 --- a/agent/xds/server.go +++ b/agent/xds/server.go @@ -11,6 +11,8 @@ import ( envoy_discovery_v2 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2" envoy_discovery_v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" + "github.com/armon/go-metrics" + "github.com/armon/go-metrics/prometheus" "github.com/hashicorp/go-hclog" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -25,6 +27,13 @@ import ( "github.com/hashicorp/consul/tlsutil" ) +var StatsGauges = []prometheus.GaugeDefinition{ + { + Name: []string{"xds", "server", "streams"}, + Help: "Measures the number of active xDS streams handled by the server split by protocol version.", + }, +} + // ADSStream is a shorter way of referring to this thing... type ADSStream = envoy_discovery_v3.AggregatedDiscoveryService_StreamAggregatedResourcesServer type ADSStream_v2 = envoy_discovery_v2.AggregatedDiscoveryService_StreamAggregatedResourcesServer @@ -141,6 +150,36 @@ type Server struct { AuthCheckFrequency time.Duration DisableV2Protocol bool + + activeStreams activeStreamCounters +} + +// activeStreamCounters simply encapsulates two counters accessed atomically to +// ensure alignment is correct. +type activeStreamCounters struct { + xDSv3 uint64 + xDSv2 uint64 +} + +func (c *activeStreamCounters) Increment(xdsVersion string) func() { + var counter *uint64 + switch xdsVersion { + case "v3": + counter = &c.xDSv3 + case "v2": + counter = &c.xDSv2 + default: + return func() {} + } + + labels := []metrics.Label{{Name: "version", Value: xdsVersion}} + + count := atomic.AddUint64(counter, 1) + metrics.SetGaugeWithLabels([]string{"xds", "server", "streams"}, float32(count), labels) + return func() { + count := atomic.AddUint64(counter, ^uint64(0)) + metrics.SetGaugeWithLabels([]string{"xds", "server", "streams"}, float32(count), labels) + } } func NewServer( @@ -171,6 +210,8 @@ func (s *Server) StreamAggregatedResources(stream ADSStream) error { // Deprecated: remove when xDS v2 is no longer supported func (s *Server) streamAggregatedResources(stream ADSStream) error { + defer s.activeStreams.Increment("v2")() + // Note: despite dealing entirely in v3 protobufs, this function is // exclusively used from the xDS v2 shim RPC handler, so the logging below // will refer to it as "v2". diff --git a/agent/xds/server_test.go b/agent/xds/server_test.go index 7a26635d3c..b956da0006 100644 --- a/agent/xds/server_test.go +++ b/agent/xds/server_test.go @@ -43,6 +43,8 @@ func TestServer_StreamAggregatedResources_v2_BasicProtocol_TCP(t *testing.T) { // Check no response sent yet assertChanBlocked(t, envoy.stream.sendCh) + requireProtocolVersionGauge(t, scenario, "v2", 1) + // Deliver a new snapshot snap := newTestSnapshot(t, nil, "") mgr.DeliverConfig(t, sid, snap) diff --git a/agent/xds/xds_protocol_helpers_test.go b/agent/xds/xds_protocol_helpers_test.go index 2959c6d4f8..ed8608e01b 100644 --- a/agent/xds/xds_protocol_helpers_test.go +++ b/agent/xds/xds_protocol_helpers_test.go @@ -19,6 +19,7 @@ import ( envoy_discovery_v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" envoy_type_v3 "github.com/envoyproxy/go-control-plane/envoy/type/v3" + "github.com/armon/go-metrics" "github.com/golang/protobuf/proto" "github.com/golang/protobuf/ptypes" "github.com/golang/protobuf/ptypes/wrappers" @@ -118,6 +119,7 @@ type testServerScenario struct { server *Server mgr *testManager envoy *TestEnvoy + sink *metrics.InmemSink errCh <-chan error } @@ -155,6 +157,17 @@ func newTestServerScenarioInner( envoy.Close() }) + sink := metrics.NewInmemSink(1*time.Minute, 1*time.Minute) + cfg := metrics.DefaultConfig("consul.xds.test") + cfg.EnableHostname = false + cfg.EnableRuntimeMetrics = false + metrics.NewGlobal(cfg, sink) + + t.Cleanup(func() { + sink := &metrics.BlackholeSink{} + metrics.NewGlobal(cfg, sink) + }) + s := NewServer( testutil.Logger(t), mgr, @@ -178,6 +191,7 @@ func newTestServerScenarioInner( server: s, mgr: mgr, envoy: envoy, + sink: sink, errCh: errCh, } } @@ -647,3 +661,23 @@ func runStep(t *testing.T, name string, fn func(t *testing.T)) { t.FailNow() } } + +func requireProtocolVersionGauge( + t *testing.T, + scenario *testServerScenario, + xdsVersion string, + expected int, +) { + data := scenario.sink.Data() + require.Len(t, data, 1) + + item := data[0] + require.Len(t, item.Gauges, 1) + + val, ok := item.Gauges["consul.xds.test.xds.server.streams;version="+xdsVersion] + require.True(t, ok) + + require.Equal(t, "consul.xds.test.xds.server.streams", val.Name) + require.Equal(t, expected, int(val.Value)) + require.Equal(t, []metrics.Label{{Name: "version", Value: xdsVersion}}, val.Labels) +} diff --git a/website/content/docs/agent/telemetry.mdx b/website/content/docs/agent/telemetry.mdx index b0bde73431..90c3b9d2c8 100644 --- a/website/content/docs/agent/telemetry.mdx +++ b/website/content/docs/agent/telemetry.mdx @@ -417,7 +417,8 @@ These metrics are used to monitor the health of the Consul servers. | `consul.grpc.server.connection.count` | Counts the number of new gRPC connections received by the server. | connections | counter | | `consul.grpc.server.connections` | Measures the number of active gRPC connections open on the server. | connections | gauge | | `consul.grpc.server.stream.count` | Counts the number of new gRPC streams received by the server. | streams | counter | -| `consul.grpc.server.streams` | Measures the number of active gRPC streams handled by the server. | streams | guage | +| `consul.grpc.server.streams` | Measures the number of active gRPC streams handled by the server. | streams | gauge | +| `consul.xds.server.streams` | Measures the number of active xDS streams handled by the server split by protocol version. | streams | gauge | ## Cluster Health