From 3feae7f77b805653a8853dcfdbcfed20011d656e Mon Sep 17 00:00:00 2001 From: Chris Piraino Date: Wed, 2 Sep 2020 10:24:17 -0500 Subject: [PATCH] Report node/service usage metrics from every server Using the newly provided state store methods, we periodically emit usage metrics from the servers. We decided to emit these metrics from all servers, not just the leader, because that means we do not have to care about leader election flapping causing metrics turbulence, and it seems reasonable for each server to emit its own view of the state, even if they should always converge rapidly. --- agent/consul/config.go | 16 ++- agent/consul/server.go | 14 ++ agent/consul/usagemetrics/usagemetrics.go | 135 ++++++++++++++++++ agent/consul/usagemetrics/usagemetrics_oss.go | 7 + .../usagemetrics/usagemetrics_oss_test.go | 9 ++ .../consul/usagemetrics/usagemetrics_test.go | 128 +++++++++++++++++ logging/names.go | 1 + 7 files changed, 305 insertions(+), 5 deletions(-) create mode 100644 agent/consul/usagemetrics/usagemetrics.go create mode 100644 agent/consul/usagemetrics/usagemetrics_oss.go create mode 100644 agent/consul/usagemetrics/usagemetrics_oss_test.go create mode 100644 agent/consul/usagemetrics/usagemetrics_test.go diff --git a/agent/consul/config.go b/agent/consul/config.go index a48effe441..955fb49d66 100644 --- a/agent/consul/config.go +++ b/agent/consul/config.go @@ -443,6 +443,10 @@ type Config struct { // dead servers. AutopilotInterval time.Duration + // MetricsReportingInterval is the frequency with which the server will + // report usage metrics to the configured go-metrics Sinks. + MetricsReportingInterval time.Duration + // ConnectEnabled is whether to enable Connect features such as the CA. 
ConnectEnabled bool @@ -589,11 +593,13 @@ func DefaultConfig() *Config { }, }, - ServerHealthInterval: 2 * time.Second, - AutopilotInterval: 10 * time.Second, - DefaultQueryTime: 300 * time.Second, - MaxQueryTime: 600 * time.Second, - EnterpriseConfig: DefaultEnterpriseConfig(), + ServerHealthInterval: 2 * time.Second, + AutopilotInterval: 10 * time.Second, + MetricsReportingInterval: 10 * time.Second, + DefaultQueryTime: 300 * time.Second, + MaxQueryTime: 600 * time.Second, + + EnterpriseConfig: DefaultEnterpriseConfig(), } // Increase our reap interval to 3 days instead of 24h. diff --git a/agent/consul/server.go b/agent/consul/server.go index 04d3b61bd3..c1c1a6d76e 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -25,6 +25,7 @@ import ( "github.com/hashicorp/consul/agent/consul/autopilot" "github.com/hashicorp/consul/agent/consul/fsm" "github.com/hashicorp/consul/agent/consul/state" + "github.com/hashicorp/consul/agent/consul/usagemetrics" "github.com/hashicorp/consul/agent/metadata" "github.com/hashicorp/consul/agent/pool" "github.com/hashicorp/consul/agent/router" @@ -589,6 +590,19 @@ func NewServer(config *Config, options ...ConsulOption) (*Server, error) { return nil, err } + reporter, err := usagemetrics.NewUsageMetricsReporter( + new(usagemetrics.Config). + WithStateProvider(s.fsm). + WithLogger(s.logger). + WithDatacenter(s.config.Datacenter). + WithReportingInterval(s.config.MetricsReportingInterval), + ) + if err != nil { + s.Shutdown() + return nil, fmt.Errorf("Failed to start usage metrics reporter: %v", err) + } + go reporter.Run(&lib.StopChannelContext{StopCh: s.shutdownCh}) + // Initialize Autopilot. This must happen before starting leadership monitoring // as establishing leadership could attempt to use autopilot and cause a panic. 
s.initAutopilot(config) diff --git a/agent/consul/usagemetrics/usagemetrics.go b/agent/consul/usagemetrics/usagemetrics.go new file mode 100644 index 0000000000..18b36cfd69 --- /dev/null +++ b/agent/consul/usagemetrics/usagemetrics.go @@ -0,0 +1,135 @@ +package usagemetrics + +import ( + "context" + "errors" + "time" + + "github.com/armon/go-metrics" + "github.com/hashicorp/consul/agent/consul/state" + "github.com/hashicorp/consul/logging" + "github.com/hashicorp/go-hclog" +) + +// Config holds the settings used to construct a UsageMetricsReporter. Build it +// by chaining the With* methods, each of which returns the same *Config. +type Config struct { + logger hclog.Logger + metricLabels []metrics.Label + stateProvider StateProvider + tickerInterval time.Duration +} + +// WithDatacenter adds the datacenter as a label to all metrics emitted by the +// UsageMetricsReporter. +func (c *Config) WithDatacenter(dc string) *Config { + c.metricLabels = append(c.metricLabels, metrics.Label{Name: "datacenter", Value: dc}) + return c +} + +// WithLogger takes a logger and stores a new sub-logger of it, named +// logging.UsageMetrics, in the Config. +func (c *Config) WithLogger(logger hclog.Logger) *Config { + c.logger = logger.Named(logging.UsageMetrics) + return c +} + +// WithReportingInterval specifies the interval on which UsageMetricsReporter +// should emit metrics. +func (c *Config) WithReportingInterval(dur time.Duration) *Config { + c.tickerInterval = dur + return c +} + +func (c *Config) WithStateProvider(sp StateProvider) *Config { + c.stateProvider = sp + return c +} + +// StateProvider defines an interface for retrieving a state.Store handle. In +// non-test code, this is satisfied by the fsm.FSM struct. +type StateProvider interface { + State() *state.Store +} + +// UsageMetricsReporter provides functionality for emitting usage metrics into +// the metrics stream. This makes it essentially a translation layer +// between the state store and metrics stream.
+type UsageMetricsReporter struct { + logger hclog.Logger + metricLabels []metrics.Label + stateProvider StateProvider + tickerInterval time.Duration +} + +func NewUsageMetricsReporter(cfg *Config) (*UsageMetricsReporter, error) { + if cfg == nil || cfg.stateProvider == nil { + return nil, errors.New("must provide a StateProvider to usage reporter") + } + + if cfg.logger == nil { + cfg.logger = hclog.NewNullLogger() + } + + if cfg.tickerInterval <= 0 { + // time.NewTicker panics on a non-positive interval; metrics are aggregated every 10 seconds, so we default to that. + cfg.tickerInterval = 10 * time.Second + } + + u := &UsageMetricsReporter{ + logger: cfg.logger, + stateProvider: cfg.stateProvider, + metricLabels: cfg.metricLabels, + tickerInterval: cfg.tickerInterval, + } + + return u, nil +} + +// Run emits usage metrics on every tick of the reporting interval. It blocks, so +// it must be run in a goroutine, and returns once the passed in ctx is done. +func (u *UsageMetricsReporter) Run(ctx context.Context) { + ticker := time.NewTicker(u.tickerInterval) + for { + select { + case <-ctx.Done(): + u.logger.Debug("usage metrics reporter shutting down") + ticker.Stop() + return + case <-ticker.C: + u.runOnce() + } + } +} + +func (u *UsageMetricsReporter) runOnce() { + state := u.stateProvider.State() + _, nodes, err := state.NodeCount() + if err != nil { + u.logger.Warn("failed to retrieve nodes from state store", "error", err) + } + metrics.SetGaugeWithLabels( + []string{"consul", "state", "nodes"}, + float32(nodes), + u.metricLabels, + ) + + _, serviceUsage, err := state.ServiceUsage() + if err != nil { + u.logger.Warn("failed to retrieve services from state store", "error", err) + } + + metrics.SetGaugeWithLabels( + []string{"consul", "state", "services"}, + float32(serviceUsage.Services), + u.metricLabels, + ) + + metrics.SetGaugeWithLabels( + []string{"consul", "state", "service_instances"}, + float32(serviceUsage.ServiceInstances), + u.metricLabels, + ) + + u.emitEnterpriseUsage(serviceUsage) +} diff --git a/agent/consul/usagemetrics/usagemetrics_oss.go
b/agent/consul/usagemetrics/usagemetrics_oss.go new file mode 100644 index 0000000000..37d71b83f8 --- /dev/null +++ b/agent/consul/usagemetrics/usagemetrics_oss.go @@ -0,0 +1,7 @@ +// +build !consulent + +package usagemetrics + +import "github.com/hashicorp/consul/agent/consul/state" + +func (u *UsageMetricsReporter) emitEnterpriseUsage(state.ServiceUsage) {} diff --git a/agent/consul/usagemetrics/usagemetrics_oss_test.go b/agent/consul/usagemetrics/usagemetrics_oss_test.go new file mode 100644 index 0000000000..3d5263c0b2 --- /dev/null +++ b/agent/consul/usagemetrics/usagemetrics_oss_test.go @@ -0,0 +1,9 @@ +// +build !consulent + +package usagemetrics + +import "github.com/hashicorp/consul/agent/consul/state" + +func newStateStore() (*state.Store, error) { + return state.NewStateStore(nil) +} diff --git a/agent/consul/usagemetrics/usagemetrics_test.go b/agent/consul/usagemetrics/usagemetrics_test.go new file mode 100644 index 0000000000..c293cbb1de --- /dev/null +++ b/agent/consul/usagemetrics/usagemetrics_test.go @@ -0,0 +1,128 @@ +package usagemetrics + +import ( + "testing" + "time" + + "github.com/armon/go-metrics" + "github.com/hashicorp/consul/agent/consul/state" + "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/sdk/testutil" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" +) + +type mockStateProvider struct { + mock.Mock +} + +func (m *mockStateProvider) State() *state.Store { + retValues := m.Called() + return retValues.Get(0).(*state.Store) +} + +func TestUsageReporter_Run(t *testing.T) { + type testCase struct { + modifyStateStore func(t *testing.T, s *state.Store) + expectedGauges map[string]metrics.GaugeValue + } + cases := map[string]testCase{ + "empty-state": { + expectedGauges: map[string]metrics.GaugeValue{ + "consul.usage.test.consul.state.nodes;datacenter=dc1": { + Name: "consul.usage.test.consul.state.nodes", + Value: 0, + Labels: []metrics.Label{{Name: "datacenter", Value: "dc1"}}, + }, +
"consul.usage.test.consul.state.services;datacenter=dc1": { + Name: "consul.usage.test.consul.state.services", + Value: 0, + Labels: []metrics.Label{ + {Name: "datacenter", Value: "dc1"}, + }, + }, + "consul.usage.test.consul.state.service_instances;datacenter=dc1": { + Name: "consul.usage.test.consul.state.service_instances", + Value: 0, + Labels: []metrics.Label{ + {Name: "datacenter", Value: "dc1"}, + }, + }, + }, + }, + "nodes-and-services": { + modifyStateStore: func(t *testing.T, s *state.Store) { + require.NoError(t, s.EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"})) + require.NoError(t, s.EnsureNode(2, &structs.Node{Node: "bar", Address: "127.0.0.2"})) + require.NoError(t, s.EnsureNode(3, &structs.Node{Node: "baz", Address: "127.0.0.2"})) + + // Typical services and some consul services spread across two nodes + require.NoError(t, s.EnsureService(4, "foo", &structs.NodeService{ID: "db", Service: "db", Tags: nil, Address: "", Port: 5000})) + require.NoError(t, s.EnsureService(5, "bar", &structs.NodeService{ID: "api", Service: "api", Tags: nil, Address: "", Port: 5000})) + require.NoError(t, s.EnsureService(6, "foo", &structs.NodeService{ID: "consul", Service: "consul", Tags: nil})) + require.NoError(t, s.EnsureService(7, "bar", &structs.NodeService{ID: "consul", Service: "consul", Tags: nil})) + }, + expectedGauges: map[string]metrics.GaugeValue{ + "consul.usage.test.consul.state.nodes;datacenter=dc1": { + Name: "consul.usage.test.consul.state.nodes", + Value: 3, + Labels: []metrics.Label{{Name: "datacenter", Value: "dc1"}}, + }, + "consul.usage.test.consul.state.services;datacenter=dc1": { + Name: "consul.usage.test.consul.state.services", + Value: 3, + Labels: []metrics.Label{ + {Name: "datacenter", Value: "dc1"}, + }, + }, + "consul.usage.test.consul.state.service_instances;datacenter=dc1": { + Name: "consul.usage.test.consul.state.service_instances", + Value: 4, + Labels: []metrics.Label{ + {Name: "datacenter", Value: "dc1"}, + }, + }, + }, + }, + } + + for name,
tcase := range cases { + t.Run(name, func(t *testing.T) { + // Only have a single interval for the test + sink := metrics.NewInmemSink(1*time.Minute, 1*time.Minute) + cfg := metrics.DefaultConfig("consul.usage.test") + cfg.EnableHostname = false + metrics.NewGlobal(cfg, sink) + + mockStateProvider := &mockStateProvider{} + s, err := newStateStore() + require.NoError(t, err) + if tcase.modifyStateStore != nil { + tcase.modifyStateStore(t, s) + } + mockStateProvider.On("State").Return(s) + + reporter, err := NewUsageMetricsReporter( + new(Config). + WithStateProvider(mockStateProvider). + WithLogger(testutil.Logger(t)). + WithDatacenter("dc1"), + ) + require.NoError(t, err) + + reporter.runOnce() + + intervals := sink.Data() + require.Len(t, intervals, 1) + intv := intervals[0] + + // Range over the expected values instead of just doing an Equal + // comparison on the maps because of different metrics emitted between + // OSS and Ent. The enterprise tests have a full equality comparison on + // the maps. + for key, expected := range tcase.expectedGauges { + require.Equal(t, expected, intv.Gauges[key]) + } + }) + } +} diff --git a/logging/names.go b/logging/names.go index 6ade11bf69..02c0fbf69f 100644 --- a/logging/names.go +++ b/logging/names.go @@ -51,6 +51,7 @@ const ( TerminatingGateway string = "terminating_gateway" TLSUtil string = "tlsutil" Transaction string = "txn" + UsageMetrics string = "usage_metrics" WAN string = "wan" Watch string = "watch" )