From 04705e90f9c907a09c727714c89f9ec4e413a312 Mon Sep 17 00:00:00 2001 From: Chris Piraino Date: Wed, 2 Sep 2020 10:24:16 -0500 Subject: [PATCH] Add new usage memdb table that tracks usage counts of various elements We update the usage table on Commit() by using the TrackedChanges() API of memdb. Track memdb changes on restore so that usage data can be compiled --- agent/consul/fsm/snapshot_oss_test.go | 6 + agent/consul/state/memdb.go | 30 +++-- agent/consul/state/usage.go | 170 ++++++++++++++++++++++++++ agent/consul/state/usage_oss.go | 33 +++++ agent/consul/state/usage_oss_test.go | 25 ++++ agent/consul/state/usage_test.go | 133 ++++++++++++++++++++ 6 files changed, 387 insertions(+), 10 deletions(-) create mode 100644 agent/consul/state/usage.go create mode 100644 agent/consul/state/usage_oss.go create mode 100644 agent/consul/state/usage_oss_test.go create mode 100644 agent/consul/state/usage_test.go diff --git a/agent/consul/fsm/snapshot_oss_test.go b/agent/consul/fsm/snapshot_oss_test.go index e845c41c91..f798a0efaa 100644 --- a/agent/consul/fsm/snapshot_oss_test.go +++ b/agent/consul/fsm/snapshot_oss_test.go @@ -654,6 +654,12 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) { require.NoError(t, err) require.Equal(t, fedState2, fedStateLoaded2) + // Verify usage data is correctly updated + idx, nodeCount, err := fsm2.state.NodeCount() + require.NoError(t, err) + require.Equal(t, len(nodes), nodeCount) + require.NotZero(t, idx) + // Snapshot snap, err = fsm2.Snapshot() require.NoError(t, err) diff --git a/agent/consul/state/memdb.go b/agent/consul/state/memdb.go index be4f4348e2..5cdfd19dd1 100644 --- a/agent/consul/state/memdb.go +++ b/agent/consul/state/memdb.go @@ -89,17 +89,20 @@ func (c *changeTrackerDB) publish(changes Changes) error { return nil } -// WriteTxnRestore returns a wrapped RW transaction that does NOT have change -// tracking enabled. This should only be used in Restore where we need to -// replace the entire contents of the Store without a need to track the changes. -// WriteTxnRestore uses a zero index since the whole restore doesn't really occur -// at one index - the effect is to write many values that were previously +// WriteTxnRestore returns a wrapped RW transaction that should only be used in +// Restore where we need to replace the entire contents of the Store. +// WriteTxnRestore uses a zero index since the whole restore doesn't really +// occur at one index - the effect is to write many values that were previously // written across many indexes. func (c *changeTrackerDB) WriteTxnRestore() *txn { - return &txn{ + t := &txn{ Txn: c.db.Txn(true), Index: 0, } + + // We enable change tracking so that usage data is correctly populated. + t.Txn.TrackChanges() + return t } // txn wraps a memdb.Txn to capture changes and send them to the EventPublisher. @@ -125,14 +128,21 @@ type txn struct { // by the caller. A non-nil error indicates that a commit failed and was not // applied. func (tx *txn) Commit() error { + changes := Changes{ + Index: tx.Index, + Changes: tx.Txn.Changes(), + } + + if len(changes.Changes) > 0 { + if err := updateUsage(tx, changes); err != nil { + return err + } + } + // publish may be nil if this is a read-only or WriteTxnRestore transaction. // In those cases changes should also be empty, and there will be nothing // to publish. if tx.publish != nil { - changes := Changes{ - Index: tx.Index, - Changes: tx.Txn.Changes(), - } if err := tx.publish(changes); err != nil { return err } diff --git a/agent/consul/state/usage.go b/agent/consul/state/usage.go new file mode 100644 index 0000000000..397e157f37 --- /dev/null +++ b/agent/consul/state/usage.go @@ -0,0 +1,170 @@ +package state + +import ( + "fmt" + + memdb "github.com/hashicorp/go-memdb" +) + +// usageTableSchema returns a new table schema used for tracking various indexes +// for the Raft log. +func usageTableSchema() *memdb.TableSchema { + return &memdb.TableSchema{ + Name: "usage", + Indexes: map[string]*memdb.IndexSchema{ + "id": { + Name: "id", + AllowMissing: false, + Unique: true, + Indexer: &memdb.StringFieldIndex{ + Field: "ID", + Lowercase: true, + }, + }, + }, + } +} + +func init() { + registerSchema(usageTableSchema) +} + +type UsageEntry struct { + ID string + Index uint64 + Count int +} + +// updateUsage takes a set of memdb changes and computes a delta for specific +// usage metrics that we track. +func updateUsage(tx *txn, changes Changes) error { + usageDeltas := make(map[string]int) + for _, change := range changes.Changes { + var delta int + if change.Created() { + delta = 1 + } else if change.Deleted() { + delta = -1 + } + switch change.Table { + case "nodes": + usageDeltas[change.Table] += delta + case "services": + usageDeltas[change.Table] += delta + } + + addEnterpriseUsage(usageDeltas, change) + } + + idx := changes.Index + // This will happen when restoring from a snapshot, just take the max index + // of the tables we are tracking. + if idx == 0 { + idx = maxIndexTxn(tx, "nodes", "services") + } + + for id, delta := range usageDeltas { + u, err := tx.First("usage", "id", id) + if err != nil { + return fmt.Errorf("failed to retrieve existing usage entry: %s", err) + } + + if u == nil { + if delta < 0 { + return fmt.Errorf("failed to insert usage entry for %q: delta will cause a negative count", id) + } + err := tx.Insert("usage", &UsageEntry{ + ID: id, + Count: delta, + Index: idx, + }) + if err != nil { + return fmt.Errorf("failed to update usage entry: %s", err) + } + } else if cur, ok := u.(*UsageEntry); ok { + if cur.Count+delta < 0 { + return fmt.Errorf("failed to insert usage entry for %q: delta will cause a negative count", id) + } + err := tx.Insert("usage", &UsageEntry{ + ID: id, + Count: cur.Count + delta, + Index: idx, + }) + if err != nil { + return fmt.Errorf("failed to update usage entry: %s", err) + } + } + } + return nil +} + +// ServiceUsage contains all of the usage data related to services +type ServiceUsage struct { + Services int + ServiceInstances int + EnterpriseServiceUsage +} + +// NodeCount returns the latest seen Raft index, a count of the number of nodes +// registered, and any errors. +func (s *Store) NodeCount() (uint64, int, error) { + tx := s.db.ReadTxn() + defer tx.Abort() + + usage, err := tx.First("usage", "id", "nodes") + if err != nil { + return 0, 0, fmt.Errorf("failed nodes lookup: %s", err) + } + + // If no nodes have been registered, the usage entry will not exist. + if usage == nil { + return 0, 0, nil + } + + nodeUsage, ok := usage.(*UsageEntry) + if !ok { + return 0, 0, fmt.Errorf("failed nodes lookup: type %T is not *UsageEntry", usage) + } + + return nodeUsage.Index, nodeUsage.Count, nil +} + +// ServiceUsage returns the latest seen Raft index, a compiled set of service +// usage data, and any errors. +func (s *Store) ServiceUsage() (uint64, ServiceUsage, error) { + tx := s.db.ReadTxn() + defer tx.Abort() + + usage, err := firstUsageEntry(tx, "services") + if err != nil { + return 0, ServiceUsage{}, fmt.Errorf("failed services lookup: %s", err) + } + + results, err := s.compileServiceUsage(tx, usage.Count) + if err != nil { + return 0, ServiceUsage{}, fmt.Errorf("failed services lookup: %s", err) + } + + return usage.Index, results, nil +} + +func firstUsageEntry(tx *txn, id string) (*UsageEntry, error) { + usage, err := tx.First("usage", "id", id) + if err != nil { + return nil, err + } + + // If no elements have been inserted, the usage entry will not exist. We + // return a valid value so that can be certain the return value is not nil + // when no error has occurred. + if usage == nil { + return &UsageEntry{ID: id, Count: 0}, nil + } + + realUsage, ok := usage.(*UsageEntry) + if !ok { + return nil, fmt.Errorf("failed usage lookup: type %T is not *UsageEntry", usage) + } + + return realUsage, nil +} diff --git a/agent/consul/state/usage_oss.go b/agent/consul/state/usage_oss.go new file mode 100644 index 0000000000..ec54313d56 --- /dev/null +++ b/agent/consul/state/usage_oss.go @@ -0,0 +1,33 @@ +// +build !consulent + +package state + +import ( + "fmt" + + memdb "github.com/hashicorp/go-memdb" +) + +type EnterpriseServiceUsage struct{} + +func addEnterpriseUsage(map[string]int, memdb.Change) {} + +func (s *Store) compileServiceUsage(tx *txn, totalInstances int) (ServiceUsage, error) { + var totalServices int + results, err := tx.Get( + "index", + "id_prefix", + serviceIndexName("", nil), + ) + if err != nil { + return ServiceUsage{}, fmt.Errorf("failed services index lookup: %s", err) + } + for i := results.Next(); i != nil; i = results.Next() { + totalServices += 1 + } + + return ServiceUsage{ + Services: totalServices, + ServiceInstances: totalInstances, + }, nil +} diff --git a/agent/consul/state/usage_oss_test.go b/agent/consul/state/usage_oss_test.go new file mode 100644 index 0000000000..b441c71635 --- /dev/null +++ b/agent/consul/state/usage_oss_test.go @@ -0,0 +1,25 @@ +// +build !consulent + +package state + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestStateStore_Usage_ServiceUsage(t *testing.T) { + s := testStateStore(t) + + testRegisterNode(t, s, 0, "node1") + testRegisterNode(t, s, 1, "node2") + testRegisterService(t, s, 8, "node1", "service1") + testRegisterService(t, s, 9, "node2", "service1") + testRegisterService(t, s, 10, "node2", "service2") + + idx, usage, err := s.ServiceUsage() + require.NoError(t, err) + require.Equal(t, idx, uint64(10)) + require.Equal(t, 2, usage.Services) + require.Equal(t, 3, usage.ServiceInstances) +} diff --git a/agent/consul/state/usage_test.go b/agent/consul/state/usage_test.go new file mode 100644 index 0000000000..a1c07f6546 --- /dev/null +++ b/agent/consul/state/usage_test.go @@ -0,0 +1,133 @@ +package state + +import ( + "testing" + + "github.com/hashicorp/consul/agent/structs" + memdb "github.com/hashicorp/go-memdb" + "github.com/stretchr/testify/require" +) + +func TestStateStore_Usage_NodeCount(t *testing.T) { + s := testStateStore(t) + + // No nodes have been registered, and thus no usage entry exists + idx, count, err := s.NodeCount() + require.NoError(t, err) + require.Equal(t, idx, uint64(0)) + require.Equal(t, count, 0) + + testRegisterNode(t, s, 0, "node1") + testRegisterNode(t, s, 1, "node2") + + idx, count, err = s.NodeCount() + require.NoError(t, err) + require.Equal(t, idx, uint64(1)) + require.Equal(t, count, 2) +} + +func TestStateStore_Usage_NodeCount_Delete(t *testing.T) { + s := testStateStore(t) + + testRegisterNode(t, s, 0, "node1") + testRegisterNode(t, s, 1, "node2") + + idx, count, err := s.NodeCount() + require.NoError(t, err) + require.Equal(t, idx, uint64(1)) + require.Equal(t, count, 2) + + require.NoError(t, s.DeleteNode(2, "node2")) + idx, count, err = s.NodeCount() + require.NoError(t, err) + require.Equal(t, idx, uint64(2)) + require.Equal(t, count, 1) +} + +func TestStateStore_Usage_ServiceUsageEmpty(t *testing.T) { + s := testStateStore(t) + + // No services have been registered, and thus no usage entry exists + idx, usage, err := s.ServiceUsage() + require.NoError(t, err) + require.Equal(t, idx, uint64(0)) + require.Equal(t, usage.Services, 0) + require.Equal(t, usage.ServiceInstances, 0) +} + +func TestStateStore_Usage_Restore(t *testing.T) { + s := testStateStore(t) + restore := s.Restore() + restore.Registration(9, &structs.RegisterRequest{ + Node: "test-node", + Service: &structs.NodeService{ + ID: "mysql", + Service: "mysql", + Port: 8080, + Address: "198.18.0.2", + }, + }) + require.NoError(t, restore.Commit()) + + idx, count, err := s.NodeCount() + require.NoError(t, err) + require.Equal(t, idx, uint64(9)) + require.Equal(t, count, 1) +} + +func TestStateStore_Usage_updateUsage_Underflow(t *testing.T) { + s := testStateStore(t) + txn := s.db.WriteTxn(1) + + // A single delete change will cause a negative count + changes := Changes{ + Index: 1, + Changes: memdb.Changes{ + { + Table: "nodes", + Before: &structs.Node{}, + After: nil, + }, + }, + } + + err := updateUsage(txn, changes) + require.Error(t, err) + require.Contains(t, err.Error(), "negative count") + + // A insert a change to create a usage entry + changes = Changes{ + Index: 1, + Changes: memdb.Changes{ + { + Table: "nodes", + Before: nil, + After: &structs.Node{}, + }, + }, + } + + err = updateUsage(txn, changes) + require.NoError(t, err) + + // Two deletes will cause a negative count now + changes = Changes{ + Index: 1, + Changes: memdb.Changes{ + { + Table: "nodes", + Before: &structs.Node{}, + After: nil, + }, + { + Table: "nodes", + Before: &structs.Node{}, + After: nil, + }, + }, + } + + err = updateUsage(txn, changes) + require.Error(t, err) + require.Contains(t, err.Error(), "negative count") +}