state: partition the usage metrics subsystem (#10867)

This commit is contained in:
R.B. Boyer 2021-08-18 09:27:15 -05:00 committed by GitHub
parent 613dd7d053
commit e44bce3c4f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 320 additions and 264 deletions

View File

@ -692,9 +692,9 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) {
require.Equal(t, fedState2, fedStateLoaded2) require.Equal(t, fedState2, fedStateLoaded2)
// Verify usage data is correctly updated // Verify usage data is correctly updated
idx, nodeCount, err := fsm2.state.NodeCount() idx, nodeUsage, err := fsm2.state.NodeUsage()
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, len(nodes), nodeCount) require.Equal(t, len(nodes), nodeUsage.Nodes)
require.NotZero(t, idx) require.NotZero(t, idx)
// Verify system metadata is restored. // Verify system metadata is restored.

View File

@ -10,16 +10,18 @@ import (
const ( const (
serviceNamesUsageTable = "service-names" serviceNamesUsageTable = "service-names"
tableUsage = "usage"
) )
// usageTableSchema returns a new table schema used for tracking various indexes // usageTableSchema returns a new table schema used for tracking various indexes
// for the Raft log. // for the Raft log.
func usageTableSchema() *memdb.TableSchema { func usageTableSchema() *memdb.TableSchema {
return &memdb.TableSchema{ return &memdb.TableSchema{
Name: "usage", Name: tableUsage,
Indexes: map[string]*memdb.IndexSchema{ Indexes: map[string]*memdb.IndexSchema{
"id": { indexID: {
Name: "id", Name: indexID,
AllowMissing: false, AllowMissing: false,
Unique: true, Unique: true,
Indexer: &memdb.StringFieldIndex{ Indexer: &memdb.StringFieldIndex{
@ -46,6 +48,12 @@ type ServiceUsage struct {
EnterpriseServiceUsage EnterpriseServiceUsage
} }
// NodeUsage contains all of the usage data related to nodes.
type NodeUsage struct {
// Nodes is the node count; Store.NodeUsage fills it from the usage table
// entry kept for the nodes table.
Nodes int
// EnterpriseNodeUsage is embedded so that any enterprise-specific node
// usage fields surface directly on this struct. The OSS definition is an
// empty struct.
EnterpriseNodeUsage
}
type uniqueServiceState int type uniqueServiceState int
const ( const (
@ -68,8 +76,10 @@ func updateUsage(tx WriteTxn, changes Changes) error {
} }
switch change.Table { switch change.Table {
case "nodes": case tableNodes:
usageDeltas[change.Table] += delta usageDeltas[change.Table] += delta
addEnterpriseNodeUsage(usageDeltas, change)
case tableServices: case tableServices:
svc := changeObject(change).(*structs.ServiceNode) svc := changeObject(change).(*structs.ServiceNode)
usageDeltas[change.Table] += delta usageDeltas[change.Table] += delta
@ -98,7 +108,8 @@ func updateUsage(tx WriteTxn, changes Changes) error {
// This will happen when restoring from a snapshot, just take the max index // This will happen when restoring from a snapshot, just take the max index
// of the tables we are tracking. // of the tables we are tracking.
if idx == 0 { if idx == 0 {
idx = maxIndexTxn(tx, "nodes", tableServices) // TODO(partitions? namespaces?)
idx = maxIndexTxn(tx, tableNodes, tableServices)
} }
return writeUsageDeltas(tx, idx, usageDeltas) return writeUsageDeltas(tx, idx, usageDeltas)
@ -107,7 +118,10 @@ func updateUsage(tx WriteTxn, changes Changes) error {
func updateServiceNameUsage(tx WriteTxn, usageDeltas map[string]int, serviceNameChanges map[structs.ServiceName]int) (map[structs.ServiceName]uniqueServiceState, error) { func updateServiceNameUsage(tx WriteTxn, usageDeltas map[string]int, serviceNameChanges map[structs.ServiceName]int) (map[structs.ServiceName]uniqueServiceState, error) {
serviceStates := make(map[structs.ServiceName]uniqueServiceState, len(serviceNameChanges)) serviceStates := make(map[structs.ServiceName]uniqueServiceState, len(serviceNameChanges))
for svc, delta := range serviceNameChanges { for svc, delta := range serviceNameChanges {
q := Query{Value: svc.Name, EnterpriseMeta: svc.EnterpriseMeta} q := Query{
Value: svc.Name,
EnterpriseMeta: svc.EnterpriseMeta,
}
serviceIter, err := tx.Get(tableServices, indexService, q) serviceIter, err := tx.Get(tableServices, indexService, q)
if err != nil { if err != nil {
return nil, err return nil, err
@ -162,7 +176,7 @@ func serviceNameChanged(change memdb.Change) bool {
// passed in will be recorded on the entry as well. // passed in will be recorded on the entry as well.
func writeUsageDeltas(tx WriteTxn, idx uint64, usageDeltas map[string]int) error { func writeUsageDeltas(tx WriteTxn, idx uint64, usageDeltas map[string]int) error {
for id, delta := range usageDeltas { for id, delta := range usageDeltas {
u, err := tx.First("usage", "id", id) u, err := tx.First(tableUsage, indexID, id)
if err != nil { if err != nil {
return fmt.Errorf("failed to retrieve existing usage entry: %s", err) return fmt.Errorf("failed to retrieve existing usage entry: %s", err)
} }
@ -175,7 +189,7 @@ func writeUsageDeltas(tx WriteTxn, idx uint64, usageDeltas map[string]int) error
// large numbers. // large numbers.
delta = 0 delta = 0
} }
err := tx.Insert("usage", &UsageEntry{ err := tx.Insert(tableUsage, &UsageEntry{
ID: id, ID: id,
Count: delta, Count: delta,
Index: idx, Index: idx,
@ -192,7 +206,7 @@ func writeUsageDeltas(tx WriteTxn, idx uint64, usageDeltas map[string]int) error
// large numbers. // large numbers.
updated = 0 updated = 0
} }
err := tx.Insert("usage", &UsageEntry{ err := tx.Insert(tableUsage, &UsageEntry{
ID: id, ID: id,
Count: updated, Count: updated,
Index: idx, Index: idx,
@ -205,17 +219,26 @@ func writeUsageDeltas(tx WriteTxn, idx uint64, usageDeltas map[string]int) error
return nil return nil
} }
// NodeCount returns the latest seen Raft index, a count of the number of nodes // NodeUsage returns the latest seen Raft index, a compiled set of node usage
// registered, and any errors. // data, and any errors.
func (s *Store) NodeCount() (uint64, int, error) { func (s *Store) NodeUsage() (uint64, NodeUsage, error) {
tx := s.db.ReadTxn() tx := s.db.ReadTxn()
defer tx.Abort() defer tx.Abort()
nodeUsage, err := firstUsageEntry(tx, "nodes") nodes, err := firstUsageEntry(tx, tableNodes)
if err != nil { if err != nil {
return 0, 0, fmt.Errorf("failed nodes lookup: %s", err) return 0, NodeUsage{}, fmt.Errorf("failed nodes lookup: %s", err)
} }
return nodeUsage.Index, nodeUsage.Count, nil
usage := NodeUsage{
Nodes: nodes.Count,
}
results, err := compileEnterpriseNodeUsage(tx, usage)
if err != nil {
return 0, NodeUsage{}, fmt.Errorf("failed nodes lookup: %s", err)
}
return nodes.Index, results, nil
} }
// ServiceUsage returns the latest seen Raft index, a compiled set of service // ServiceUsage returns the latest seen Raft index, a compiled set of service
@ -238,7 +261,7 @@ func (s *Store) ServiceUsage() (uint64, ServiceUsage, error) {
ServiceInstances: serviceInstances.Count, ServiceInstances: serviceInstances.Count,
Services: services.Count, Services: services.Count,
} }
results, err := compileEnterpriseUsage(tx, usage) results, err := compileEnterpriseServiceUsage(tx, usage)
if err != nil { if err != nil {
return 0, ServiceUsage{}, fmt.Errorf("failed services lookup: %s", err) return 0, ServiceUsage{}, fmt.Errorf("failed services lookup: %s", err)
} }
@ -247,7 +270,7 @@ func (s *Store) ServiceUsage() (uint64, ServiceUsage, error) {
} }
func firstUsageEntry(tx ReadTxn, id string) (*UsageEntry, error) { func firstUsageEntry(tx ReadTxn, id string) (*UsageEntry, error) {
usage, err := tx.First("usage", "id", id) usage, err := tx.First(tableUsage, indexID, id)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -3,16 +3,24 @@
package state package state
import ( import (
"github.com/hashicorp/consul/agent/structs"
memdb "github.com/hashicorp/go-memdb" memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/consul/agent/structs"
) )
type EnterpriseServiceUsage struct{} type EnterpriseServiceUsage struct{}
type EnterpriseNodeUsage struct{}
func addEnterpriseNodeUsage(map[string]int, memdb.Change) {}
func addEnterpriseServiceInstanceUsage(map[string]int, memdb.Change) {} func addEnterpriseServiceInstanceUsage(map[string]int, memdb.Change) {}
func addEnterpriseServiceUsage(map[string]int, map[structs.ServiceName]uniqueServiceState) {} func addEnterpriseServiceUsage(map[string]int, map[structs.ServiceName]uniqueServiceState) {}
func compileEnterpriseUsage(tx ReadTxn, usage ServiceUsage) (ServiceUsage, error) { func compileEnterpriseServiceUsage(tx ReadTxn, usage ServiceUsage) (ServiceUsage, error) {
return usage, nil
}
func compileEnterpriseNodeUsage(tx ReadTxn, usage NodeUsage) (NodeUsage, error) {
return usage, nil return usage, nil
} }

View File

@ -1,25 +0,0 @@
// +build !consulent
package state
import (
"testing"
"github.com/stretchr/testify/require"
)
// TestStateStore_Usage_ServiceUsage registers two nodes and three service
// instances spread over two distinct service names, then checks the index
// and the counts reported by ServiceUsage.
func TestStateStore_Usage_ServiceUsage(t *testing.T) {
	s := testStateStore(t)

	testRegisterNode(t, s, 0, "node1")
	testRegisterNode(t, s, 1, "node2")
	testRegisterService(t, s, 8, "node1", "service1")
	testRegisterService(t, s, 9, "node2", "service1")
	testRegisterService(t, s, 10, "node2", "service2")

	idx, usage, err := s.ServiceUsage()
	require.NoError(t, err)
	// require.Equal takes (expected, actual); keeping that order makes the
	// failure output label the two values correctly.
	require.Equal(t, uint64(10), idx)
	require.Equal(t, 2, usage.Services)
	require.Equal(t, 3, usage.ServiceInstances)
}

View File

@ -9,40 +9,40 @@ import (
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
) )
func TestStateStore_Usage_NodeCount(t *testing.T) { func TestStateStore_Usage_NodeUsage(t *testing.T) {
s := testStateStore(t) s := testStateStore(t)
// No nodes have been registered, and thus no usage entry exists // No nodes have been registered, and thus no usage entry exists
idx, count, err := s.NodeCount() idx, usage, err := s.NodeUsage()
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, idx, uint64(0)) require.Equal(t, idx, uint64(0))
require.Equal(t, count, 0) require.Equal(t, usage.Nodes, 0)
testRegisterNode(t, s, 0, "node1") testRegisterNode(t, s, 0, "node1")
testRegisterNode(t, s, 1, "node2") testRegisterNode(t, s, 1, "node2")
idx, count, err = s.NodeCount() idx, usage, err = s.NodeUsage()
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, idx, uint64(1)) require.Equal(t, idx, uint64(1))
require.Equal(t, count, 2) require.Equal(t, usage.Nodes, 2)
} }
func TestStateStore_Usage_NodeCount_Delete(t *testing.T) { func TestStateStore_Usage_NodeUsage_Delete(t *testing.T) {
s := testStateStore(t) s := testStateStore(t)
testRegisterNode(t, s, 0, "node1") testRegisterNode(t, s, 0, "node1")
testRegisterNode(t, s, 1, "node2") testRegisterNode(t, s, 1, "node2")
idx, count, err := s.NodeCount() idx, usage, err := s.NodeUsage()
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, idx, uint64(1)) require.Equal(t, idx, uint64(1))
require.Equal(t, count, 2) require.Equal(t, usage.Nodes, 2)
require.NoError(t, s.DeleteNode(2, "node2", nil)) require.NoError(t, s.DeleteNode(2, "node2", nil))
idx, count, err = s.NodeCount() idx, usage, err = s.NodeUsage()
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, idx, uint64(2)) require.Equal(t, idx, uint64(2))
require.Equal(t, count, 1) require.Equal(t, usage.Nodes, 1)
} }
func TestStateStore_Usage_ServiceUsageEmpty(t *testing.T) { func TestStateStore_Usage_ServiceUsageEmpty(t *testing.T) {
@ -56,6 +56,22 @@ func TestStateStore_Usage_ServiceUsageEmpty(t *testing.T) {
require.Equal(t, usage.ServiceInstances, 0) require.Equal(t, usage.ServiceInstances, 0)
} }
// TestStateStore_Usage_ServiceUsage registers two nodes and three service
// instances spread over two distinct service names, then checks the index
// and the counts reported by ServiceUsage.
func TestStateStore_Usage_ServiceUsage(t *testing.T) {
	s := testStateStore(t)

	testRegisterNode(t, s, 0, "node1")
	testRegisterNode(t, s, 1, "node2")
	testRegisterService(t, s, 8, "node1", "service1")
	testRegisterService(t, s, 9, "node2", "service1")
	testRegisterService(t, s, 10, "node2", "service2")

	idx, usage, err := s.ServiceUsage()
	require.NoError(t, err)
	// require.Equal takes (expected, actual); keeping that order makes the
	// failure output label the two values correctly.
	require.Equal(t, uint64(10), idx)
	require.Equal(t, 2, usage.Services)
	require.Equal(t, 3, usage.ServiceInstances)
}
func TestStateStore_Usage_ServiceUsage_DeleteNode(t *testing.T) { func TestStateStore_Usage_ServiceUsage_DeleteNode(t *testing.T) {
s := testStateStore(t) s := testStateStore(t)
testRegisterNode(t, s, 1, "node1") testRegisterNode(t, s, 1, "node1")
@ -116,10 +132,10 @@ func TestStateStore_Usage_Restore(t *testing.T) {
}) })
require.NoError(t, restore.Commit()) require.NoError(t, restore.Commit())
idx, count, err := s.NodeCount() idx, nodeUsage, err := s.NodeUsage()
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, idx, uint64(9)) require.Equal(t, idx, uint64(9))
require.Equal(t, count, 1) require.Equal(t, nodeUsage.Nodes, 1)
idx, usage, err := s.ServiceUsage() idx, usage, err := s.ServiceUsage()
require.NoError(t, err) require.NoError(t, err)

View File

@ -8,10 +8,11 @@ import (
"github.com/armon/go-metrics/prometheus" "github.com/armon/go-metrics/prometheus"
"github.com/armon/go-metrics" "github.com/armon/go-metrics"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/logging"
"github.com/hashicorp/go-hclog" "github.com/hashicorp/go-hclog"
"github.com/hashicorp/serf/serf" "github.com/hashicorp/serf/serf"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/logging"
) )
var Gauges = []prometheus.GaugeDefinition{ var Gauges = []prometheus.GaugeDefinition{
@ -145,15 +146,13 @@ func (u *UsageMetricsReporter) Run(ctx context.Context) {
func (u *UsageMetricsReporter) runOnce() { func (u *UsageMetricsReporter) runOnce() {
state := u.stateProvider.State() state := u.stateProvider.State()
_, nodes, err := state.NodeCount()
_, nodeUsage, err := state.NodeUsage()
if err != nil { if err != nil {
u.logger.Warn("failed to retrieve nodes from state store", "error", err) u.logger.Warn("failed to retrieve nodes from state store", "error", err)
} }
metrics.SetGaugeWithLabels(
[]string{"consul", "state", "nodes"}, u.emitNodeUsage(nodeUsage)
float32(nodes),
u.metricLabels,
)
_, serviceUsage, err := state.ServiceUsage() _, serviceUsage, err := state.ServiceUsage()
if err != nil { if err != nil {
@ -162,65 +161,27 @@ func (u *UsageMetricsReporter) runOnce() {
u.emitServiceUsage(serviceUsage) u.emitServiceUsage(serviceUsage)
servers, clients := u.memberUsage() members := u.memberUsage()
u.emitMemberUsage(servers, clients) u.emitMemberUsage(members)
} }
func (u *UsageMetricsReporter) memberUsage() (int, map[string]int) { func (u *UsageMetricsReporter) memberUsage() []serf.Member {
if u.getMembersFunc == nil { if u.getMembersFunc == nil {
return 0, nil return nil
} }
mems := u.getMembersFunc() mems := u.getMembersFunc()
if len(mems) <= 0 { if len(mems) <= 0 {
u.logger.Warn("cluster reported zero members") u.logger.Warn("cluster reported zero members")
return 0, nil
} }
servers := 0 out := make([]serf.Member, 0, len(mems))
clients := make(map[string]int)
for _, m := range mems { for _, m := range mems {
if m.Status != serf.StatusAlive { if m.Status != serf.StatusAlive {
continue continue
} }
out = append(out, m)
switch m.Tags["role"] {
case "node":
clients[m.Tags["segment"]]++
case "consul":
servers++
}
} }
return servers, clients return out
}
func (u *UsageMetricsReporter) emitMemberUsage(servers int, clients map[string]int) {
totalClients := 0
for seg, c := range clients {
segmentLabel := metrics.Label{Name: "segment", Value: seg}
labels := append([]metrics.Label{segmentLabel}, u.metricLabels...)
metrics.SetGaugeWithLabels(
[]string{"consul", "members", "clients"},
float32(c),
labels,
)
totalClients += c
}
metrics.SetGaugeWithLabels(
[]string{"consul", "members", "clients"},
float32(totalClients),
u.metricLabels,
)
metrics.SetGaugeWithLabels(
[]string{"consul", "members", "servers"},
float32(servers),
u.metricLabels,
)
} }

View File

@ -4,9 +4,47 @@ package usagemetrics
import ( import (
"github.com/armon/go-metrics" "github.com/armon/go-metrics"
"github.com/hashicorp/serf/serf"
"github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/consul/state"
) )
// emitNodeUsage publishes the node count from nodeUsage as the
// consul.state.nodes gauge, tagged with the reporter's metric labels.
func (u *UsageMetricsReporter) emitNodeUsage(nodeUsage state.NodeUsage) {
	gaugeKey := []string{"consul", "state", "nodes"}
	nodeCount := float32(nodeUsage.Nodes)

	metrics.SetGaugeWithLabels(gaugeKey, nodeCount, u.metricLabels)
}
// emitMemberUsage tallies the given members by their "role" tag and publishes
// the totals as the consul.members.clients and consul.members.servers gauges.
// Members whose role is neither "node" nor "consul" are not counted.
func (u *UsageMetricsReporter) emitMemberUsage(members []serf.Member) {
	var clientTotal, serverTotal int

	for i := range members {
		role := members[i].Tags["role"]
		if role == "node" {
			clientTotal++
		} else if role == "consul" {
			serverTotal++
		}
	}

	clientsKey := []string{"consul", "members", "clients"}
	metrics.SetGaugeWithLabels(clientsKey, float32(clientTotal), u.metricLabels)

	serversKey := []string{"consul", "members", "servers"}
	metrics.SetGaugeWithLabels(serversKey, float32(serverTotal), u.metricLabels)
}
func (u *UsageMetricsReporter) emitServiceUsage(serviceUsage state.ServiceUsage) { func (u *UsageMetricsReporter) emitServiceUsage(serviceUsage state.ServiceUsage) {
metrics.SetGaugeWithLabels( metrics.SetGaugeWithLabels(
[]string{"consul", "state", "services"}, []string{"consul", "state", "services"},

View File

@ -9,16 +9,151 @@ import (
"github.com/armon/go-metrics" "github.com/armon/go-metrics"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/hashicorp/serf/serf"
"github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/sdk/testutil" "github.com/hashicorp/consul/sdk/testutil"
"github.com/hashicorp/serf/serf"
) )
func newStateStore() (*state.Store, error) { func newStateStore() (*state.Store, error) {
return state.NewStateStore(nil), nil return state.NewStateStore(nil), nil
} }
// TestUsageReporter_emitNodeUsage_OSS runs the usage reporter once against an
// in-memory metrics sink and compares the emitted gauges with the expected
// map via assertEqualGaugeMaps. It covers an empty state store and a store
// with three registered nodes.
func TestUsageReporter_emitNodeUsage_OSS(t *testing.T) {
	type testCase struct {
		// modifyStateStore, when set, populates the store before the run.
		modifyStateStore func(t *testing.T, s *state.Store)
		getMembersFunc   getMembersFunc
		expectedGauges   map[string]metrics.GaugeValue
	}
	cases := map[string]testCase{
		"empty-state": {
			expectedGauges: map[string]metrics.GaugeValue{
				// --- node ---
				"consul.usage.test.consul.state.nodes;datacenter=dc1": {
					Name:   "consul.usage.test.consul.state.nodes",
					Value:  0,
					Labels: []metrics.Label{{Name: "datacenter", Value: "dc1"}},
				},
				// --- member ---
				"consul.usage.test.consul.members.clients;datacenter=dc1": {
					Name:   "consul.usage.test.consul.members.clients",
					Value:  0,
					Labels: []metrics.Label{{Name: "datacenter", Value: "dc1"}},
				},
				"consul.usage.test.consul.members.servers;datacenter=dc1": {
					Name:   "consul.usage.test.consul.members.servers",
					Value:  0,
					Labels: []metrics.Label{{Name: "datacenter", Value: "dc1"}},
				},
				// --- service ---
				"consul.usage.test.consul.state.services;datacenter=dc1": {
					Name:   "consul.usage.test.consul.state.services",
					Value:  0,
					Labels: []metrics.Label{{Name: "datacenter", Value: "dc1"}},
				},
				"consul.usage.test.consul.state.service_instances;datacenter=dc1": {
					Name:   "consul.usage.test.consul.state.service_instances",
					Value:  0,
					Labels: []metrics.Label{{Name: "datacenter", Value: "dc1"}},
				},
			},
			getMembersFunc: func() []serf.Member { return []serf.Member{} },
		},
		"nodes": {
			modifyStateStore: func(t *testing.T, s *state.Store) {
				require.NoError(t, s.EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"}))
				require.NoError(t, s.EnsureNode(2, &structs.Node{Node: "bar", Address: "127.0.0.2"}))
				require.NoError(t, s.EnsureNode(3, &structs.Node{Node: "baz", Address: "127.0.0.2"}))
			},
			getMembersFunc: func() []serf.Member {
				return []serf.Member{
					{
						Name:   "foo",
						Tags:   map[string]string{"role": "consul"},
						Status: serf.StatusAlive,
					},
					{
						Name:   "bar",
						Tags:   map[string]string{"role": "consul"},
						Status: serf.StatusAlive,
					},
					{
						Name:   "baz",
						Tags:   map[string]string{"role": "node"},
						Status: serf.StatusAlive,
					},
				}
			},
			expectedGauges: map[string]metrics.GaugeValue{
				// --- node ---
				"consul.usage.test.consul.state.nodes;datacenter=dc1": {
					Name:   "consul.usage.test.consul.state.nodes",
					Value:  3,
					Labels: []metrics.Label{{Name: "datacenter", Value: "dc1"}},
				},
				// --- member ---
				"consul.usage.test.consul.members.servers;datacenter=dc1": {
					Name:   "consul.usage.test.consul.members.servers",
					Value:  2,
					Labels: []metrics.Label{{Name: "datacenter", Value: "dc1"}},
				},
				"consul.usage.test.consul.members.clients;datacenter=dc1": {
					Name:   "consul.usage.test.consul.members.clients",
					Value:  1,
					Labels: []metrics.Label{{Name: "datacenter", Value: "dc1"}},
				},
				// --- service ---
				"consul.usage.test.consul.state.services;datacenter=dc1": {
					Name:   "consul.usage.test.consul.state.services",
					Value:  0,
					Labels: []metrics.Label{{Name: "datacenter", Value: "dc1"}},
				},
				"consul.usage.test.consul.state.service_instances;datacenter=dc1": {
					Name:   "consul.usage.test.consul.state.service_instances",
					Value:  0,
					Labels: []metrics.Label{{Name: "datacenter", Value: "dc1"}},
				},
			},
		},
	}

	for name, tcase := range cases {
		t.Run(name, func(t *testing.T) {
			// Only have a single interval for the test
			sink := metrics.NewInmemSink(1*time.Minute, 1*time.Minute)
			cfg := metrics.DefaultConfig("consul.usage.test")
			cfg.EnableHostname = false
			metrics.NewGlobal(cfg, sink)

			// Named "provider" so the local does not shadow the
			// mockStateProvider type.
			provider := &mockStateProvider{}
			s, err := newStateStore()
			require.NoError(t, err)
			if tcase.modifyStateStore != nil {
				tcase.modifyStateStore(t, s)
			}
			provider.On("State").Return(s)

			reporter, err := NewUsageMetricsReporter(
				new(Config).
					WithStateProvider(provider).
					WithLogger(testutil.Logger(t)).
					WithDatacenter("dc1").
					WithGetMembersFunc(tcase.getMembersFunc),
			)
			require.NoError(t, err)

			reporter.runOnce()

			intervals := sink.Data()
			require.Len(t, intervals, 1)
			intv := intervals[0]

			assertEqualGaugeMaps(t, tcase.expectedGauges, intv.Gauges)
		})
	}
}
func TestUsageReporter_emitServiceUsage_OSS(t *testing.T) { func TestUsageReporter_emitServiceUsage_OSS(t *testing.T) {
type testCase struct { type testCase struct {
modfiyStateStore func(t *testing.T, s *state.Store) modfiyStateStore func(t *testing.T, s *state.Store)
@ -28,11 +163,28 @@ func TestUsageReporter_emitServiceUsage_OSS(t *testing.T) {
cases := map[string]testCase{ cases := map[string]testCase{
"empty-state": { "empty-state": {
expectedGauges: map[string]metrics.GaugeValue{ expectedGauges: map[string]metrics.GaugeValue{
// --- node ---
"consul.usage.test.consul.state.nodes;datacenter=dc1": { "consul.usage.test.consul.state.nodes;datacenter=dc1": {
Name: "consul.usage.test.consul.state.nodes", Name: "consul.usage.test.consul.state.nodes",
Value: 0, Value: 0,
Labels: []metrics.Label{{Name: "datacenter", Value: "dc1"}}, Labels: []metrics.Label{{Name: "datacenter", Value: "dc1"}},
}, },
// --- member ---
"consul.usage.test.consul.members.servers;datacenter=dc1": {
Name: "consul.usage.test.consul.members.servers",
Value: 0,
Labels: []metrics.Label{
{Name: "datacenter", Value: "dc1"},
},
},
"consul.usage.test.consul.members.clients;datacenter=dc1": {
Name: "consul.usage.test.consul.members.clients",
Value: 0,
Labels: []metrics.Label{
{Name: "datacenter", Value: "dc1"},
},
},
// --- service ---
"consul.usage.test.consul.state.services;datacenter=dc1": { "consul.usage.test.consul.state.services;datacenter=dc1": {
Name: "consul.usage.test.consul.state.services", Name: "consul.usage.test.consul.state.services",
Value: 0, Value: 0,
@ -47,35 +199,21 @@ func TestUsageReporter_emitServiceUsage_OSS(t *testing.T) {
{Name: "datacenter", Value: "dc1"}, {Name: "datacenter", Value: "dc1"},
}, },
}, },
"consul.usage.test.consul.members.clients;datacenter=dc1": {
Name: "consul.usage.test.consul.members.clients",
Value: 0,
Labels: []metrics.Label{
{Name: "datacenter", Value: "dc1"},
},
},
"consul.usage.test.consul.members.servers;datacenter=dc1": {
Name: "consul.usage.test.consul.members.servers",
Value: 0,
Labels: []metrics.Label{
{Name: "datacenter", Value: "dc1"},
},
},
}, },
getMembersFunc: func() []serf.Member { return []serf.Member{} }, getMembersFunc: func() []serf.Member { return []serf.Member{} },
}, },
"nodes-and-services": { "nodes-and-services": {
modfiyStateStore: func(t *testing.T, s *state.Store) { modfiyStateStore: func(t *testing.T, s *state.Store) {
require.Nil(t, s.EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"})) require.NoError(t, s.EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"}))
require.Nil(t, s.EnsureNode(2, &structs.Node{Node: "bar", Address: "127.0.0.2"})) require.NoError(t, s.EnsureNode(2, &structs.Node{Node: "bar", Address: "127.0.0.2"}))
require.Nil(t, s.EnsureNode(3, &structs.Node{Node: "baz", Address: "127.0.0.2"})) require.NoError(t, s.EnsureNode(3, &structs.Node{Node: "baz", Address: "127.0.0.2"}))
require.Nil(t, s.EnsureNode(4, &structs.Node{Node: "qux", Address: "127.0.0.3"})) require.NoError(t, s.EnsureNode(4, &structs.Node{Node: "qux", Address: "127.0.0.3"}))
// Typical services and some consul services spread across two nodes // Typical services and some consul services spread across two nodes
require.Nil(t, s.EnsureService(5, "foo", &structs.NodeService{ID: "db", Service: "db", Tags: nil, Address: "", Port: 5000})) require.NoError(t, s.EnsureService(5, "foo", &structs.NodeService{ID: "db", Service: "db", Tags: nil, Address: "", Port: 5000}))
require.Nil(t, s.EnsureService(6, "bar", &structs.NodeService{ID: "api", Service: "api", Tags: nil, Address: "", Port: 5000})) require.NoError(t, s.EnsureService(6, "bar", &structs.NodeService{ID: "api", Service: "api", Tags: nil, Address: "", Port: 5000}))
require.Nil(t, s.EnsureService(7, "foo", &structs.NodeService{ID: "consul", Service: "consul", Tags: nil})) require.NoError(t, s.EnsureService(7, "foo", &structs.NodeService{ID: "consul", Service: "consul", Tags: nil}))
require.Nil(t, s.EnsureService(8, "bar", &structs.NodeService{ID: "consul", Service: "consul", Tags: nil})) require.NoError(t, s.EnsureService(8, "bar", &structs.NodeService{ID: "consul", Service: "consul", Tags: nil}))
}, },
getMembersFunc: func() []serf.Member { getMembersFunc: func() []serf.Member {
return []serf.Member{ return []serf.Member{
@ -102,21 +240,16 @@ func TestUsageReporter_emitServiceUsage_OSS(t *testing.T) {
} }
}, },
expectedGauges: map[string]metrics.GaugeValue{ expectedGauges: map[string]metrics.GaugeValue{
// --- node ---
"consul.usage.test.consul.state.nodes;datacenter=dc1": { "consul.usage.test.consul.state.nodes;datacenter=dc1": {
Name: "consul.usage.test.consul.state.nodes", Name: "consul.usage.test.consul.state.nodes",
Value: 4, Value: 4,
Labels: []metrics.Label{{Name: "datacenter", Value: "dc1"}}, Labels: []metrics.Label{{Name: "datacenter", Value: "dc1"}},
}, },
"consul.usage.test.consul.state.services;datacenter=dc1": { // --- member ---
Name: "consul.usage.test.consul.state.services", "consul.usage.test.consul.members.servers;datacenter=dc1": {
Value: 3, Name: "consul.usage.test.consul.members.servers",
Labels: []metrics.Label{ Value: 2,
{Name: "datacenter", Value: "dc1"},
},
},
"consul.usage.test.consul.state.service_instances;datacenter=dc1": {
Name: "consul.usage.test.consul.state.service_instances",
Value: 4,
Labels: []metrics.Label{ Labels: []metrics.Label{
{Name: "datacenter", Value: "dc1"}, {Name: "datacenter", Value: "dc1"},
}, },
@ -128,26 +261,18 @@ func TestUsageReporter_emitServiceUsage_OSS(t *testing.T) {
{Name: "datacenter", Value: "dc1"}, {Name: "datacenter", Value: "dc1"},
}, },
}, },
"consul.usage.test.consul.members.servers;datacenter=dc1": { // --- service ---
Name: "consul.usage.test.consul.members.servers", "consul.usage.test.consul.state.services;datacenter=dc1": {
Value: 2, Name: "consul.usage.test.consul.state.services",
Value: 3,
Labels: []metrics.Label{ Labels: []metrics.Label{
{Name: "datacenter", Value: "dc1"}, {Name: "datacenter", Value: "dc1"},
}, },
}, },
"consul.usage.test.consul.members.clients;segment=a;datacenter=dc1": { "consul.usage.test.consul.state.service_instances;datacenter=dc1": {
Name: "consul.usage.test.consul.members.clients", Name: "consul.usage.test.consul.state.service_instances",
Value: 1, Value: 4,
Labels: []metrics.Label{ Labels: []metrics.Label{
{Name: "segment", Value: "a"},
{Name: "datacenter", Value: "dc1"},
},
},
"consul.usage.test.consul.members.clients;segment=b;datacenter=dc1": {
Name: "consul.usage.test.consul.members.clients",
Value: 1,
Labels: []metrics.Label{
{Name: "segment", Value: "b"},
{Name: "datacenter", Value: "dc1"}, {Name: "datacenter", Value: "dc1"},
}, },
}, },
@ -185,7 +310,7 @@ func TestUsageReporter_emitServiceUsage_OSS(t *testing.T) {
require.Len(t, intervals, 1) require.Len(t, intervals, 1)
intv := intervals[0] intv := intervals[0]
require.Equal(t, tcase.expectedGauges, intv.Gauges) assertEqualGaugeMaps(t, tcase.expectedGauges, intv.Gauges)
}) })
} }
} }

View File

@ -2,16 +2,12 @@ package usagemetrics
import ( import (
"testing" "testing"
"time"
"github.com/armon/go-metrics" "github.com/armon/go-metrics"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock" "github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/sdk/testutil"
"github.com/hashicorp/serf/serf"
) )
type mockStateProvider struct { type mockStateProvider struct {
@ -23,106 +19,20 @@ func (m *mockStateProvider) State() *state.Store {
return retValues.Get(0).(*state.Store) return retValues.Get(0).(*state.Store)
} }
func TestUsageReporter_Run_Nodes(t *testing.T) { func assertEqualGaugeMaps(t *testing.T, expectedMap, foundMap map[string]metrics.GaugeValue) {
type testCase struct { t.Helper()
modfiyStateStore func(t *testing.T, s *state.Store)
getMembersFunc getMembersFunc for key := range foundMap {
expectedGauges map[string]metrics.GaugeValue if _, ok := expectedMap[key]; !ok {
t.Errorf("found unexpected gauge key: %s", key)
} }
cases := map[string]testCase{
"empty-state": {
expectedGauges: map[string]metrics.GaugeValue{
"consul.usage.test.consul.state.nodes;datacenter=dc1": {
Name: "consul.usage.test.consul.state.nodes",
Value: 0,
Labels: []metrics.Label{{Name: "datacenter", Value: "dc1"}},
},
},
getMembersFunc: func() []serf.Member { return []serf.Member{} },
},
"nodes": {
modfiyStateStore: func(t *testing.T, s *state.Store) {
require.Nil(t, s.EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"}))
require.Nil(t, s.EnsureNode(2, &structs.Node{Node: "bar", Address: "127.0.0.2"}))
require.Nil(t, s.EnsureNode(3, &structs.Node{Node: "baz", Address: "127.0.0.2"}))
},
getMembersFunc: func() []serf.Member {
return []serf.Member{
{
Name: "foo",
Tags: map[string]string{"role": "consul"},
Status: serf.StatusAlive,
},
{
Name: "bar",
Tags: map[string]string{"role": "consul"},
Status: serf.StatusAlive,
},
{
Name: "baz",
Tags: map[string]string{"role": "node"},
Status: serf.StatusAlive,
},
}
},
expectedGauges: map[string]metrics.GaugeValue{
"consul.usage.test.consul.state.nodes;datacenter=dc1": {
Name: "consul.usage.test.consul.state.nodes",
Value: 3,
Labels: []metrics.Label{{Name: "datacenter", Value: "dc1"}},
},
"consul.usage.test.consul.members.clients;datacenter=dc1": {
Name: "consul.usage.test.consul.members.clients",
Value: 1,
Labels: []metrics.Label{{Name: "datacenter", Value: "dc1"}},
},
"consul.usage.test.consul.members.servers;datacenter=dc1": {
Name: "consul.usage.test.consul.members.servers",
Value: 2,
Labels: []metrics.Label{{Name: "datacenter", Value: "dc1"}},
},
},
},
} }
for name, tcase := range cases { for key, expected := range expectedMap {
t.Run(name, func(t *testing.T) { if _, ok := foundMap[key]; !ok {
// Only have a single interval for the test t.Errorf("did not find expected gauge key: %s", key)
sink := metrics.NewInmemSink(1*time.Minute, 1*time.Minute) continue
cfg := metrics.DefaultConfig("consul.usage.test")
cfg.EnableHostname = false
metrics.NewGlobal(cfg, sink)
mockStateProvider := &mockStateProvider{}
s, err := newStateStore()
require.NoError(t, err)
if tcase.modfiyStateStore != nil {
tcase.modfiyStateStore(t, s)
} }
mockStateProvider.On("State").Return(s) assert.Equal(t, expected, foundMap[key], "gauge key mismatch on %q", key)
reporter, err := NewUsageMetricsReporter(
new(Config).
WithStateProvider(mockStateProvider).
WithLogger(testutil.Logger(t)).
WithDatacenter("dc1").
WithGetMembersFunc(tcase.getMembersFunc),
)
require.NoError(t, err)
reporter.runOnce()
intervals := sink.Data()
require.Len(t, intervals, 1)
intv := intervals[0]
// Range over the expected values instead of just doing an Equal
// comparison on the maps because of different metrics emitted between
// OSS and Ent. The enterprise and OSS tests have a full equality
// comparison on the maps.
for key, expected := range tcase.expectedGauges {
require.Equal(t, expected, intv.Gauges[key])
}
})
} }
} }