mirror of https://github.com/status-im/consul.git
Report node/service usage metrics from every server
Using the newly provided state store methods, we periodically emit usage metrics from the servers. We decided to emit these metrics from all servers, not just the leader, because that means we do not have to care about leader election flapping causing metrics turbulence, and it seems reasonable for each server to emit its own view of the state, even if they should always converge rapidly.
This commit is contained in:
parent
04705e90f9
commit
3feae7f77b
|
@ -443,6 +443,10 @@ type Config struct {
|
||||||
// dead servers.
|
// dead servers.
|
||||||
AutopilotInterval time.Duration
|
AutopilotInterval time.Duration
|
||||||
|
|
||||||
|
// MetricsReportingInterval is the frequency with which the server will
|
||||||
|
// report usage metrics to the configured go-metrics Sinks.
|
||||||
|
MetricsReportingInterval time.Duration
|
||||||
|
|
||||||
// ConnectEnabled is whether to enable Connect features such as the CA.
|
// ConnectEnabled is whether to enable Connect features such as the CA.
|
||||||
ConnectEnabled bool
|
ConnectEnabled bool
|
||||||
|
|
||||||
|
@ -591,8 +595,10 @@ func DefaultConfig() *Config {
|
||||||
|
|
||||||
ServerHealthInterval: 2 * time.Second,
|
ServerHealthInterval: 2 * time.Second,
|
||||||
AutopilotInterval: 10 * time.Second,
|
AutopilotInterval: 10 * time.Second,
|
||||||
|
MetricsReportingInterval: 10 * time.Second,
|
||||||
DefaultQueryTime: 300 * time.Second,
|
DefaultQueryTime: 300 * time.Second,
|
||||||
MaxQueryTime: 600 * time.Second,
|
MaxQueryTime: 600 * time.Second,
|
||||||
|
|
||||||
EnterpriseConfig: DefaultEnterpriseConfig(),
|
EnterpriseConfig: DefaultEnterpriseConfig(),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -25,6 +25,7 @@ import (
|
||||||
"github.com/hashicorp/consul/agent/consul/autopilot"
|
"github.com/hashicorp/consul/agent/consul/autopilot"
|
||||||
"github.com/hashicorp/consul/agent/consul/fsm"
|
"github.com/hashicorp/consul/agent/consul/fsm"
|
||||||
"github.com/hashicorp/consul/agent/consul/state"
|
"github.com/hashicorp/consul/agent/consul/state"
|
||||||
|
"github.com/hashicorp/consul/agent/consul/usagemetrics"
|
||||||
"github.com/hashicorp/consul/agent/metadata"
|
"github.com/hashicorp/consul/agent/metadata"
|
||||||
"github.com/hashicorp/consul/agent/pool"
|
"github.com/hashicorp/consul/agent/pool"
|
||||||
"github.com/hashicorp/consul/agent/router"
|
"github.com/hashicorp/consul/agent/router"
|
||||||
|
@ -589,6 +590,19 @@ func NewServer(config *Config, options ...ConsulOption) (*Server, error) {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
reporter, err := usagemetrics.NewUsageMetricsReporter(
|
||||||
|
new(usagemetrics.Config).
|
||||||
|
WithStateProvider(s.fsm).
|
||||||
|
WithLogger(s.logger).
|
||||||
|
WithDatacenter(s.config.Datacenter).
|
||||||
|
WithReportingInterval(s.config.MetricsReportingInterval),
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
s.Shutdown()
|
||||||
|
return nil, fmt.Errorf("Failed to start usage metrics reporter: %v", err)
|
||||||
|
}
|
||||||
|
go reporter.Run(&lib.StopChannelContext{StopCh: s.shutdownCh})
|
||||||
|
|
||||||
// Initialize Autopilot. This must happen before starting leadership monitoring
|
// Initialize Autopilot. This must happen before starting leadership monitoring
|
||||||
// as establishing leadership could attempt to use autopilot and cause a panic.
|
// as establishing leadership could attempt to use autopilot and cause a panic.
|
||||||
s.initAutopilot(config)
|
s.initAutopilot(config)
|
||||||
|
|
|
@ -0,0 +1,135 @@
|
||||||
|
package usagemetrics
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/armon/go-metrics"
|
||||||
|
"github.com/hashicorp/consul/agent/consul/state"
|
||||||
|
"github.com/hashicorp/consul/logging"
|
||||||
|
"github.com/hashicorp/go-hclog"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Config holds the settings for various parameters for the
|
||||||
|
// UsageMetricsReporter
|
||||||
|
type Config struct {
|
||||||
|
logger hclog.Logger
|
||||||
|
metricLabels []metrics.Label
|
||||||
|
stateProvider StateProvider
|
||||||
|
tickerInterval time.Duration
|
||||||
|
}
|
||||||
|
|
||||||
|
// WithDatacenter adds the datacenter as a label to all metrics emitted by the
|
||||||
|
// UsageMetricsReporter
|
||||||
|
func (c *Config) WithDatacenter(dc string) *Config {
|
||||||
|
c.metricLabels = append(c.metricLabels, metrics.Label{Name: "datacenter", Value: dc})
|
||||||
|
return c
|
||||||
|
}
|
||||||
|
|
||||||
|
// WithLogger takes a logger and creates a new, named sub-logger to use when
|
||||||
|
// running
|
||||||
|
func (c *Config) WithLogger(logger hclog.Logger) *Config {
|
||||||
|
c.logger = logger.Named(logging.UsageMetrics)
|
||||||
|
return c
|
||||||
|
}
|
||||||
|
|
||||||
|
// WithReportingInterval specifies the interval on which UsageMetricsReporter
|
||||||
|
// should emit metrics
|
||||||
|
func (c *Config) WithReportingInterval(dur time.Duration) *Config {
|
||||||
|
c.tickerInterval = dur
|
||||||
|
return c
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Config) WithStateProvider(sp StateProvider) *Config {
|
||||||
|
c.stateProvider = sp
|
||||||
|
return c
|
||||||
|
}
|
||||||
|
|
||||||
|
// StateProvider defines an inteface for retrieving a state.Store handle. In
|
||||||
|
// non-test code, this is satisfied by the fsm.FSM struct.
|
||||||
|
type StateProvider interface {
|
||||||
|
State() *state.Store
|
||||||
|
}
|
||||||
|
|
||||||
|
// UsageMetricsReporter provides functionality for emitting usage metrics into
|
||||||
|
// the metrics stream. This makes it essentially a translation layer
|
||||||
|
// between the state store and metrics stream.
|
||||||
|
type UsageMetricsReporter struct {
|
||||||
|
logger hclog.Logger
|
||||||
|
metricLabels []metrics.Label
|
||||||
|
stateProvider StateProvider
|
||||||
|
tickerInterval time.Duration
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewUsageMetricsReporter(cfg *Config) (*UsageMetricsReporter, error) {
|
||||||
|
if cfg.stateProvider == nil {
|
||||||
|
return nil, errors.New("must provide a StateProvider to usage reporter")
|
||||||
|
}
|
||||||
|
|
||||||
|
if cfg.logger == nil {
|
||||||
|
cfg.logger = hclog.NewNullLogger()
|
||||||
|
}
|
||||||
|
|
||||||
|
if cfg.tickerInterval == 0 {
|
||||||
|
// Metrics are aggregated every 10 seconds, so we default to that.
|
||||||
|
cfg.tickerInterval = 10 * time.Second
|
||||||
|
}
|
||||||
|
|
||||||
|
u := &UsageMetricsReporter{
|
||||||
|
logger: cfg.logger,
|
||||||
|
stateProvider: cfg.stateProvider,
|
||||||
|
metricLabels: cfg.metricLabels,
|
||||||
|
tickerInterval: cfg.tickerInterval,
|
||||||
|
}
|
||||||
|
|
||||||
|
return u, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Run must be run in a goroutine, and can be stopped by closing or sending
|
||||||
|
// data to the passed in shutdownCh
|
||||||
|
func (u *UsageMetricsReporter) Run(ctx context.Context) {
|
||||||
|
ticker := time.NewTicker(u.tickerInterval)
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
u.logger.Debug("usage metrics reporter shutting down")
|
||||||
|
ticker.Stop()
|
||||||
|
return
|
||||||
|
case <-ticker.C:
|
||||||
|
u.runOnce()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (u *UsageMetricsReporter) runOnce() {
|
||||||
|
state := u.stateProvider.State()
|
||||||
|
_, nodes, err := state.NodeCount()
|
||||||
|
if err != nil {
|
||||||
|
u.logger.Warn("failed to retrieve nodes from state store", "error", err)
|
||||||
|
}
|
||||||
|
metrics.SetGaugeWithLabels(
|
||||||
|
[]string{"consul", "state", "nodes"},
|
||||||
|
float32(nodes),
|
||||||
|
u.metricLabels,
|
||||||
|
)
|
||||||
|
|
||||||
|
_, serviceUsage, err := state.ServiceUsage()
|
||||||
|
if err != nil {
|
||||||
|
u.logger.Warn("failed to retrieve services from state store", "error", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
metrics.SetGaugeWithLabels(
|
||||||
|
[]string{"consul", "state", "services"},
|
||||||
|
float32(serviceUsage.Services),
|
||||||
|
u.metricLabels,
|
||||||
|
)
|
||||||
|
|
||||||
|
metrics.SetGaugeWithLabels(
|
||||||
|
[]string{"consul", "state", "service_instances"},
|
||||||
|
float32(serviceUsage.ServiceInstances),
|
||||||
|
u.metricLabels,
|
||||||
|
)
|
||||||
|
|
||||||
|
u.emitEnterpriseUsage(serviceUsage)
|
||||||
|
}
|
|
@ -0,0 +1,7 @@
|
||||||
|
// +build !consulent
|
||||||
|
|
||||||
|
package usagemetrics
|
||||||
|
|
||||||
|
import "github.com/hashicorp/consul/agent/consul/state"
|
||||||
|
|
||||||
|
func (u *UsageMetricsReporter) emitEnterpriseUsage(state.ServiceUsage) {}
|
|
@ -0,0 +1,9 @@
|
||||||
|
// +build !consulent
|
||||||
|
|
||||||
|
package usagemetrics
|
||||||
|
|
||||||
|
import "github.com/hashicorp/consul/agent/consul/state"
|
||||||
|
|
||||||
|
func newStateStore() (*state.Store, error) {
|
||||||
|
return state.NewStateStore(nil)
|
||||||
|
}
|
|
@ -0,0 +1,128 @@
|
||||||
|
package usagemetrics
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/armon/go-metrics"
|
||||||
|
"github.com/hashicorp/consul/agent/consul/state"
|
||||||
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
|
"github.com/hashicorp/consul/sdk/testutil"
|
||||||
|
"github.com/stretchr/testify/mock"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
type mockStateProvider struct {
|
||||||
|
mock.Mock
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *mockStateProvider) State() *state.Store {
|
||||||
|
retValues := m.Called()
|
||||||
|
return retValues.Get(0).(*state.Store)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestUsageReporter_Run(t *testing.T) {
|
||||||
|
type testCase struct {
|
||||||
|
modfiyStateStore func(t *testing.T, s *state.Store)
|
||||||
|
expectedGauges map[string]metrics.GaugeValue
|
||||||
|
}
|
||||||
|
cases := map[string]testCase{
|
||||||
|
"empty-state": {
|
||||||
|
expectedGauges: map[string]metrics.GaugeValue{
|
||||||
|
"consul.usage.test.consul.state.nodes;datacenter=dc1": {
|
||||||
|
Name: "consul.usage.test.consul.state.nodes",
|
||||||
|
Value: 0,
|
||||||
|
Labels: []metrics.Label{{Name: "datacenter", Value: "dc1"}},
|
||||||
|
},
|
||||||
|
"consul.usage.test.consul.state.services;datacenter=dc1": {
|
||||||
|
Name: "consul.usage.test.consul.state.services",
|
||||||
|
Value: 0,
|
||||||
|
Labels: []metrics.Label{
|
||||||
|
{Name: "datacenter", Value: "dc1"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"consul.usage.test.consul.state.service_instances;datacenter=dc1": {
|
||||||
|
Name: "consul.usage.test.consul.state.service_instances",
|
||||||
|
Value: 0,
|
||||||
|
Labels: []metrics.Label{
|
||||||
|
{Name: "datacenter", Value: "dc1"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"nodes-and-services": {
|
||||||
|
modfiyStateStore: func(t *testing.T, s *state.Store) {
|
||||||
|
require.Nil(t, s.EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"}))
|
||||||
|
require.Nil(t, s.EnsureNode(2, &structs.Node{Node: "bar", Address: "127.0.0.2"}))
|
||||||
|
require.Nil(t, s.EnsureNode(3, &structs.Node{Node: "baz", Address: "127.0.0.2"}))
|
||||||
|
|
||||||
|
// Typical services and some consul services spread across two nodes
|
||||||
|
require.Nil(t, s.EnsureService(4, "foo", &structs.NodeService{ID: "db", Service: "db", Tags: nil, Address: "", Port: 5000}))
|
||||||
|
require.Nil(t, s.EnsureService(5, "bar", &structs.NodeService{ID: "api", Service: "api", Tags: nil, Address: "", Port: 5000}))
|
||||||
|
require.Nil(t, s.EnsureService(6, "foo", &structs.NodeService{ID: "consul", Service: "consul", Tags: nil}))
|
||||||
|
require.Nil(t, s.EnsureService(7, "bar", &structs.NodeService{ID: "consul", Service: "consul", Tags: nil}))
|
||||||
|
},
|
||||||
|
expectedGauges: map[string]metrics.GaugeValue{
|
||||||
|
"consul.usage.test.consul.state.nodes;datacenter=dc1": {
|
||||||
|
Name: "consul.usage.test.consul.state.nodes",
|
||||||
|
Value: 3,
|
||||||
|
Labels: []metrics.Label{{Name: "datacenter", Value: "dc1"}},
|
||||||
|
},
|
||||||
|
"consul.usage.test.consul.state.services;datacenter=dc1": {
|
||||||
|
Name: "consul.usage.test.consul.state.services",
|
||||||
|
Value: 3,
|
||||||
|
Labels: []metrics.Label{
|
||||||
|
{Name: "datacenter", Value: "dc1"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"consul.usage.test.consul.state.service_instances;datacenter=dc1": {
|
||||||
|
Name: "consul.usage.test.consul.state.service_instances",
|
||||||
|
Value: 4,
|
||||||
|
Labels: []metrics.Label{
|
||||||
|
{Name: "datacenter", Value: "dc1"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for name, tcase := range cases {
|
||||||
|
t.Run(name, func(t *testing.T) {
|
||||||
|
// Only have a single interval for the test
|
||||||
|
sink := metrics.NewInmemSink(1*time.Minute, 1*time.Minute)
|
||||||
|
cfg := metrics.DefaultConfig("consul.usage.test")
|
||||||
|
cfg.EnableHostname = false
|
||||||
|
metrics.NewGlobal(cfg, sink)
|
||||||
|
|
||||||
|
mockStateProvider := &mockStateProvider{}
|
||||||
|
s, err := newStateStore()
|
||||||
|
require.NoError(t, err)
|
||||||
|
if tcase.modfiyStateStore != nil {
|
||||||
|
tcase.modfiyStateStore(t, s)
|
||||||
|
}
|
||||||
|
mockStateProvider.On("State").Return(s)
|
||||||
|
|
||||||
|
reporter, err := NewUsageMetricsReporter(
|
||||||
|
new(Config).
|
||||||
|
WithStateProvider(mockStateProvider).
|
||||||
|
WithLogger(testutil.Logger(t)).
|
||||||
|
WithDatacenter("dc1"),
|
||||||
|
)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
reporter.runOnce()
|
||||||
|
|
||||||
|
intervals := sink.Data()
|
||||||
|
require.Len(t, intervals, 1)
|
||||||
|
intv := intervals[0]
|
||||||
|
|
||||||
|
// Range over the expected values instead of just doing an Equal
|
||||||
|
// comparison on the maps because of different metrics emitted between
|
||||||
|
// OSS and Ent. The enterprise tests have a full equality comparison on
|
||||||
|
// the maps.
|
||||||
|
for key, expected := range tcase.expectedGauges {
|
||||||
|
require.Equal(t, expected, intv.Gauges[key])
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
|
@ -51,6 +51,7 @@ const (
|
||||||
TerminatingGateway string = "terminating_gateway"
|
TerminatingGateway string = "terminating_gateway"
|
||||||
TLSUtil string = "tlsutil"
|
TLSUtil string = "tlsutil"
|
||||||
Transaction string = "txn"
|
Transaction string = "txn"
|
||||||
|
UsageMetrics string = "usage_metrics"
|
||||||
WAN string = "wan"
|
WAN string = "wan"
|
||||||
Watch string = "watch"
|
Watch string = "watch"
|
||||||
)
|
)
|
||||||
|
|
Loading…
Reference in New Issue