diff --git a/agent/consul/fsm/snapshot_oss_test.go b/agent/consul/fsm/snapshot_oss_test.go index e845c41c91..f798a0efaa 100644 --- a/agent/consul/fsm/snapshot_oss_test.go +++ b/agent/consul/fsm/snapshot_oss_test.go @@ -654,6 +654,12 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) { require.NoError(t, err) require.Equal(t, fedState2, fedStateLoaded2) + // Verify usage data is correctly updated + idx, nodeCount, err := fsm2.state.NodeCount() + require.NoError(t, err) + require.Equal(t, len(nodes), nodeCount) + require.NotZero(t, idx) + // Snapshot snap, err = fsm2.Snapshot() require.NoError(t, err) diff --git a/agent/consul/state/memdb.go b/agent/consul/state/memdb.go index be4f4348e2..5cdfd19dd1 100644 --- a/agent/consul/state/memdb.go +++ b/agent/consul/state/memdb.go @@ -89,17 +89,20 @@ func (c *changeTrackerDB) publish(changes Changes) error { return nil } -// WriteTxnRestore returns a wrapped RW transaction that does NOT have change -// tracking enabled. This should only be used in Restore where we need to -// replace the entire contents of the Store without a need to track the changes. -// WriteTxnRestore uses a zero index since the whole restore doesn't really occur -// at one index - the effect is to write many values that were previously +// WriteTxnRestore returns a wrapped RW transaction that should only be used in +// Restore where we need to replace the entire contents of the Store. +// WriteTxnRestore uses a zero index since the whole restore doesn't really +// occur at one index - the effect is to write many values that were previously // written across many indexes. func (c *changeTrackerDB) WriteTxnRestore() *txn { - return &txn{ + t := &txn{ Txn: c.db.Txn(true), Index: 0, } + + // We enable change tracking so that usage data is correctly populated. + t.Txn.TrackChanges() + return t } // txn wraps a memdb.Txn to capture changes and send them to the EventPublisher. @@ -125,14 +128,21 @@ type txn struct { // by the caller. A non-nil error indicates that a commit failed and was not // applied. func (tx *txn) Commit() error { + changes := Changes{ + Index: tx.Index, + Changes: tx.Txn.Changes(), + } + + if len(changes.Changes) > 0 { + if err := updateUsage(tx, changes); err != nil { + return err + } + } + // publish may be nil if this is a read-only or WriteTxnRestore transaction. // In those cases changes should also be empty, and there will be nothing // to publish. if tx.publish != nil { - changes := Changes{ - Index: tx.Index, - Changes: tx.Txn.Changes(), - } if err := tx.publish(changes); err != nil { return err } diff --git a/agent/consul/state/usage.go b/agent/consul/state/usage.go new file mode 100644 index 0000000000..397e157f37 --- /dev/null +++ b/agent/consul/state/usage.go @@ -0,0 +1,170 @@ +package state + +import ( + "fmt" + + memdb "github.com/hashicorp/go-memdb" +) + +// usageTableSchema returns a new table schema used for tracking various indexes +// for the Raft log. +func usageTableSchema() *memdb.TableSchema { + return &memdb.TableSchema{ + Name: "usage", + Indexes: map[string]*memdb.IndexSchema{ + "id": { + Name: "id", + AllowMissing: false, + Unique: true, + Indexer: &memdb.StringFieldIndex{ + Field: "ID", + Lowercase: true, + }, + }, + }, + } +} + +func init() { + registerSchema(usageTableSchema) +} + +type UsageEntry struct { + ID string + Index uint64 + Count int +} + +// updateUsage takes a set of memdb changes and computes a delta for specific +// usage metrics that we track. +func updateUsage(tx *txn, changes Changes) error { + usageDeltas := make(map[string]int) + for _, change := range changes.Changes { + var delta int + if change.Created() { + delta = 1 + } else if change.Deleted() { + delta = -1 + } + switch change.Table { + case "nodes": + usageDeltas[change.Table] += delta + case "services": + usageDeltas[change.Table] += delta + } + + addEnterpriseUsage(usageDeltas, change) + } + + idx := changes.Index + // This will happen when restoring from a snapshot, just take the max index + // of the tables we are tracking. + if idx == 0 { + idx = maxIndexTxn(tx, "nodes", "services") + } + + for id, delta := range usageDeltas { + u, err := tx.First("usage", "id", id) + if err != nil { + return fmt.Errorf("failed to retrieve existing usage entry: %s", err) + } + + if u == nil { + if delta < 0 { + return fmt.Errorf("failed to insert usage entry for %q: delta will cause a negative count", id) + } + err := tx.Insert("usage", &UsageEntry{ + ID: id, + Count: delta, + Index: idx, + }) + if err != nil { + return fmt.Errorf("failed to update usage entry: %s", err) + } + } else if cur, ok := u.(*UsageEntry); ok { + if cur.Count+delta < 0 { + return fmt.Errorf("failed to insert usage entry for %q: delta will cause a negative count", id) + } + err := tx.Insert("usage", &UsageEntry{ + ID: id, + Count: cur.Count + delta, + Index: idx, + }) + if err != nil { + return fmt.Errorf("failed to update usage entry: %s", err) + } + } + } + return nil +} + +// ServiceUsage contains all of the usage data related to services +type ServiceUsage struct { + Services int + ServiceInstances int + EnterpriseServiceUsage +} + +// NodeCount returns the latest seen Raft index, a count of the number of nodes +// registered, and any errors. +func (s *Store) NodeCount() (uint64, int, error) { + tx := s.db.ReadTxn() + defer tx.Abort() + + usage, err := tx.First("usage", "id", "nodes") + if err != nil { + return 0, 0, fmt.Errorf("failed nodes lookup: %s", err) + } + + // If no nodes have been registered, the usage entry will not exist. + if usage == nil { + return 0, 0, nil + } + + nodeUsage, ok := usage.(*UsageEntry) + if !ok { + return 0, 0, fmt.Errorf("failed nodes lookup: type %T is not *UsageEntry", usage) + } + + return nodeUsage.Index, nodeUsage.Count, nil +} + +// ServiceUsage returns the latest seen Raft index, a compiled set of service +// usage data, and any errors. +func (s *Store) ServiceUsage() (uint64, ServiceUsage, error) { + tx := s.db.ReadTxn() + defer tx.Abort() + + usage, err := firstUsageEntry(tx, "services") + if err != nil { + return 0, ServiceUsage{}, fmt.Errorf("failed services lookup: %s", err) + } + + results, err := s.compileServiceUsage(tx, usage.Count) + if err != nil { + return 0, ServiceUsage{}, fmt.Errorf("failed services lookup: %s", err) + } + + return usage.Index, results, nil +} + +func firstUsageEntry(tx *txn, id string) (*UsageEntry, error) { + usage, err := tx.First("usage", "id", id) + if err != nil { + return nil, err + } + + // If no elements have been inserted, the usage entry will not exist. We + // return a valid value so that can be certain the return value is not nil + // when no error has occurred. + if usage == nil { + return &UsageEntry{ID: id, Count: 0}, nil + } + + realUsage, ok := usage.(*UsageEntry) + if !ok { + return nil, fmt.Errorf("failed usage lookup: type %T is not *UsageEntry", usage) + } + + return realUsage, nil +} diff --git a/agent/consul/state/usage_oss.go b/agent/consul/state/usage_oss.go new file mode 100644 index 0000000000..ec54313d56 --- /dev/null +++ b/agent/consul/state/usage_oss.go @@ -0,0 +1,33 @@ +// +build !consulent + +package state + +import ( + "fmt" + + memdb "github.com/hashicorp/go-memdb" +) + +type EnterpriseServiceUsage struct{} + +func addEnterpriseUsage(map[string]int, memdb.Change) {} + +func (s *Store) compileServiceUsage(tx *txn, totalInstances int) (ServiceUsage, error) { + var totalServices int + results, err := tx.Get( + "index", + "id_prefix", + serviceIndexName("", nil), + ) + if err != nil { + return ServiceUsage{}, fmt.Errorf("failed services index lookup: %s", err) + } + for i := results.Next(); i != nil; i = results.Next() { + totalServices += 1 + } + + return ServiceUsage{ + Services: totalServices, + ServiceInstances: totalInstances, + }, nil +} diff --git a/agent/consul/state/usage_oss_test.go b/agent/consul/state/usage_oss_test.go new file mode 100644 index 0000000000..b441c71635 --- /dev/null +++ b/agent/consul/state/usage_oss_test.go @@ -0,0 +1,25 @@ +// +build !consulent + +package state + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestStateStore_Usage_ServiceUsage(t *testing.T) { + s := testStateStore(t) + + testRegisterNode(t, s, 0, "node1") + testRegisterNode(t, s, 1, "node2") + testRegisterService(t, s, 8, "node1", "service1") + testRegisterService(t, s, 9, "node2", "service1") + testRegisterService(t, s, 10, "node2", "service2") + + idx, usage, err := s.ServiceUsage() + require.NoError(t, err) + require.Equal(t, idx, uint64(10)) + require.Equal(t, 2, usage.Services) + require.Equal(t, 3, usage.ServiceInstances) +} diff --git a/agent/consul/state/usage_test.go b/agent/consul/state/usage_test.go new file mode 100644 index 0000000000..a1c07f6546 --- /dev/null +++ b/agent/consul/state/usage_test.go @@ -0,0 +1,133 @@ +package state + +import ( + "testing" + + "github.com/hashicorp/consul/agent/structs" + memdb "github.com/hashicorp/go-memdb" + "github.com/stretchr/testify/require" +) + +func TestStateStore_Usage_NodeCount(t *testing.T) { + s := testStateStore(t) + + // No nodes have been registered, and thus no usage entry exists + idx, count, err := s.NodeCount() + require.NoError(t, err) + require.Equal(t, idx, uint64(0)) + require.Equal(t, count, 0) + + testRegisterNode(t, s, 0, "node1") + testRegisterNode(t, s, 1, "node2") + + idx, count, err = s.NodeCount() + require.NoError(t, err) + require.Equal(t, idx, uint64(1)) + require.Equal(t, count, 2) +} + +func TestStateStore_Usage_NodeCount_Delete(t *testing.T) { + s := testStateStore(t) + + testRegisterNode(t, s, 0, "node1") + testRegisterNode(t, s, 1, "node2") + + idx, count, err := s.NodeCount() + require.NoError(t, err) + require.Equal(t, idx, uint64(1)) + require.Equal(t, count, 2) + + require.NoError(t, s.DeleteNode(2, "node2")) + idx, count, err = s.NodeCount() + require.NoError(t, err) + require.Equal(t, idx, uint64(2)) + require.Equal(t, count, 1) +} + +func TestStateStore_Usage_ServiceUsageEmpty(t *testing.T) { + s := testStateStore(t) + + // No services have been registered, and thus no usage entry exists + idx, usage, err := s.ServiceUsage() + require.NoError(t, err) + require.Equal(t, idx, uint64(0)) + require.Equal(t, usage.Services, 0) + require.Equal(t, usage.ServiceInstances, 0) +} + +func TestStateStore_Usage_Restore(t *testing.T) { + s := testStateStore(t) + restore := s.Restore() + restore.Registration(9, &structs.RegisterRequest{ + Node: "test-node", + Service: &structs.NodeService{ + ID: "mysql", + Service: "mysql", + Port: 8080, + Address: "198.18.0.2", + }, + }) + require.NoError(t, restore.Commit()) + + idx, count, err := s.NodeCount() + require.NoError(t, err) + require.Equal(t, idx, uint64(9)) + require.Equal(t, count, 1) +} + +func TestStateStore_Usage_updateUsage_Underflow(t *testing.T) { + s := testStateStore(t) + txn := s.db.WriteTxn(1) + + // A single delete change will cause a negative count + changes := Changes{ + Index: 1, + Changes: memdb.Changes{ + { + Table: "nodes", + Before: &structs.Node{}, + After: nil, + }, + }, + } + + err := updateUsage(txn, changes) + require.Error(t, err) + require.Contains(t, err.Error(), "negative count") + + // A insert a change to create a usage entry + changes = Changes{ + Index: 1, + Changes: memdb.Changes{ + { + Table: "nodes", + Before: nil, + After: &structs.Node{}, + }, + }, + } + + err = updateUsage(txn, changes) + require.NoError(t, err) + + // Two deletes will cause a negative count now + changes = Changes{ + Index: 1, + Changes: memdb.Changes{ + { + Table: "nodes", + Before: &structs.Node{}, + After: nil, + }, + { + Table: "nodes", + Before: &structs.Node{}, + After: nil, + }, + }, + } + + err = updateUsage(txn, changes) + require.Error(t, err) + require.Contains(t, err.Error(), "negative count") +}