mirror of
https://github.com/status-im/consul.git
synced 2025-01-11 06:16:08 +00:00
Add new usage memdb table that tracks usage counts of various elements
We update the usage table on Commit() by using the TrackedChanges() API of memdb. Track memdb changes on restore so that usage data can be compiled
This commit is contained in:
parent
f61649f2eb
commit
04705e90f9
@ -654,6 +654,12 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, fedState2, fedStateLoaded2)
|
||||
|
||||
// Verify usage data is correctly updated
|
||||
idx, nodeCount, err := fsm2.state.NodeCount()
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, len(nodes), nodeCount)
|
||||
require.NotZero(t, idx)
|
||||
|
||||
// Snapshot
|
||||
snap, err = fsm2.Snapshot()
|
||||
require.NoError(t, err)
|
||||
|
@ -89,17 +89,20 @@ func (c *changeTrackerDB) publish(changes Changes) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// WriteTxnRestore returns a wrapped RW transaction that does NOT have change
|
||||
// tracking enabled. This should only be used in Restore where we need to
|
||||
// replace the entire contents of the Store without a need to track the changes.
|
||||
// WriteTxnRestore uses a zero index since the whole restore doesn't really occur
|
||||
// at one index - the effect is to write many values that were previously
|
||||
// WriteTxnRestore returns a wrapped RW transaction that should only be used in
|
||||
// Restore where we need to replace the entire contents of the Store.
|
||||
// WriteTxnRestore uses a zero index since the whole restore doesn't really
|
||||
// occur at one index - the effect is to write many values that were previously
|
||||
// written across many indexes.
|
||||
func (c *changeTrackerDB) WriteTxnRestore() *txn {
|
||||
return &txn{
|
||||
t := &txn{
|
||||
Txn: c.db.Txn(true),
|
||||
Index: 0,
|
||||
}
|
||||
|
||||
// We enable change tracking so that usage data is correctly populated.
|
||||
t.Txn.TrackChanges()
|
||||
return t
|
||||
}
|
||||
|
||||
// txn wraps a memdb.Txn to capture changes and send them to the EventPublisher.
|
||||
@ -125,14 +128,21 @@ type txn struct {
|
||||
// by the caller. A non-nil error indicates that a commit failed and was not
|
||||
// applied.
|
||||
func (tx *txn) Commit() error {
|
||||
changes := Changes{
|
||||
Index: tx.Index,
|
||||
Changes: tx.Txn.Changes(),
|
||||
}
|
||||
|
||||
if len(changes.Changes) > 0 {
|
||||
if err := updateUsage(tx, changes); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// publish may be nil if this is a read-only or WriteTxnRestore transaction.
|
||||
// In those cases changes should also be empty, and there will be nothing
|
||||
// to publish.
|
||||
if tx.publish != nil {
|
||||
changes := Changes{
|
||||
Index: tx.Index,
|
||||
Changes: tx.Txn.Changes(),
|
||||
}
|
||||
if err := tx.publish(changes); err != nil {
|
||||
return err
|
||||
}
|
||||
|
170
agent/consul/state/usage.go
Normal file
170
agent/consul/state/usage.go
Normal file
@ -0,0 +1,170 @@
|
||||
package state
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
memdb "github.com/hashicorp/go-memdb"
|
||||
)
|
||||
|
||||
// usageTableSchema returns a new table schema used for tracking various indexes
|
||||
// for the Raft log.
|
||||
func usageTableSchema() *memdb.TableSchema {
|
||||
return &memdb.TableSchema{
|
||||
Name: "usage",
|
||||
Indexes: map[string]*memdb.IndexSchema{
|
||||
"id": {
|
||||
Name: "id",
|
||||
AllowMissing: false,
|
||||
Unique: true,
|
||||
Indexer: &memdb.StringFieldIndex{
|
||||
Field: "ID",
|
||||
Lowercase: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func init() {
|
||||
registerSchema(usageTableSchema)
|
||||
}
|
||||
|
||||
type UsageEntry struct {
|
||||
ID string
|
||||
Index uint64
|
||||
Count int
|
||||
}
|
||||
|
||||
// updateUsage takes a set of memdb changes and computes a delta for specific
|
||||
// usage metrics that we track.
|
||||
func updateUsage(tx *txn, changes Changes) error {
|
||||
usageDeltas := make(map[string]int)
|
||||
for _, change := range changes.Changes {
|
||||
var delta int
|
||||
if change.Created() {
|
||||
delta = 1
|
||||
} else if change.Deleted() {
|
||||
delta = -1
|
||||
}
|
||||
switch change.Table {
|
||||
case "nodes":
|
||||
usageDeltas[change.Table] += delta
|
||||
case "services":
|
||||
usageDeltas[change.Table] += delta
|
||||
}
|
||||
|
||||
addEnterpriseUsage(usageDeltas, change)
|
||||
}
|
||||
|
||||
idx := changes.Index
|
||||
// This will happen when restoring from a snapshot, just take the max index
|
||||
// of the tables we are tracking.
|
||||
if idx == 0 {
|
||||
idx = maxIndexTxn(tx, "nodes", "services")
|
||||
}
|
||||
|
||||
for id, delta := range usageDeltas {
|
||||
u, err := tx.First("usage", "id", id)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to retrieve existing usage entry: %s", err)
|
||||
}
|
||||
|
||||
if u == nil {
|
||||
if delta < 0 {
|
||||
return fmt.Errorf("failed to insert usage entry for %q: delta will cause a negative count", id)
|
||||
}
|
||||
err := tx.Insert("usage", &UsageEntry{
|
||||
ID: id,
|
||||
Count: delta,
|
||||
Index: idx,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to update usage entry: %s", err)
|
||||
}
|
||||
} else if cur, ok := u.(*UsageEntry); ok {
|
||||
if cur.Count+delta < 0 {
|
||||
return fmt.Errorf("failed to insert usage entry for %q: delta will cause a negative count", id)
|
||||
}
|
||||
err := tx.Insert("usage", &UsageEntry{
|
||||
ID: id,
|
||||
Count: cur.Count + delta,
|
||||
Index: idx,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to update usage entry: %s", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// ServiceUsage contains all of the usage data related to services
|
||||
type ServiceUsage struct {
|
||||
Services int
|
||||
ServiceInstances int
|
||||
EnterpriseServiceUsage
|
||||
}
|
||||
|
||||
// NodeCount returns the latest seen Raft index, a count of the number of nodes
|
||||
// registered, and any errors.
|
||||
func (s *Store) NodeCount() (uint64, int, error) {
|
||||
tx := s.db.ReadTxn()
|
||||
defer tx.Abort()
|
||||
|
||||
usage, err := tx.First("usage", "id", "nodes")
|
||||
if err != nil {
|
||||
return 0, 0, fmt.Errorf("failed nodes lookup: %s", err)
|
||||
}
|
||||
|
||||
// If no nodes have been registered, the usage entry will not exist.
|
||||
if usage == nil {
|
||||
return 0, 0, nil
|
||||
}
|
||||
|
||||
nodeUsage, ok := usage.(*UsageEntry)
|
||||
if !ok {
|
||||
return 0, 0, fmt.Errorf("failed nodes lookup: type %T is not *UsageEntry", usage)
|
||||
}
|
||||
|
||||
return nodeUsage.Index, nodeUsage.Count, nil
|
||||
}
|
||||
|
||||
// ServiceUsage returns the latest seen Raft index, a compiled set of service
|
||||
// usage data, and any errors.
|
||||
func (s *Store) ServiceUsage() (uint64, ServiceUsage, error) {
|
||||
tx := s.db.ReadTxn()
|
||||
defer tx.Abort()
|
||||
|
||||
usage, err := firstUsageEntry(tx, "services")
|
||||
if err != nil {
|
||||
return 0, ServiceUsage{}, fmt.Errorf("failed services lookup: %s", err)
|
||||
}
|
||||
|
||||
results, err := s.compileServiceUsage(tx, usage.Count)
|
||||
if err != nil {
|
||||
return 0, ServiceUsage{}, fmt.Errorf("failed services lookup: %s", err)
|
||||
}
|
||||
|
||||
return usage.Index, results, nil
|
||||
}
|
||||
|
||||
func firstUsageEntry(tx *txn, id string) (*UsageEntry, error) {
|
||||
usage, err := tx.First("usage", "id", id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// If no elements have been inserted, the usage entry will not exist. We
|
||||
// return a valid value so that can be certain the return value is not nil
|
||||
// when no error has occurred.
|
||||
if usage == nil {
|
||||
return &UsageEntry{ID: id, Count: 0}, nil
|
||||
}
|
||||
|
||||
realUsage, ok := usage.(*UsageEntry)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("failed usage lookup: type %T is not *UsageEntry", usage)
|
||||
}
|
||||
|
||||
return realUsage, nil
|
||||
}
|
33
agent/consul/state/usage_oss.go
Normal file
33
agent/consul/state/usage_oss.go
Normal file
@ -0,0 +1,33 @@
|
||||
// +build !consulent
|
||||
|
||||
package state
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
memdb "github.com/hashicorp/go-memdb"
|
||||
)
|
||||
|
||||
type EnterpriseServiceUsage struct{}
|
||||
|
||||
func addEnterpriseUsage(map[string]int, memdb.Change) {}
|
||||
|
||||
func (s *Store) compileServiceUsage(tx *txn, totalInstances int) (ServiceUsage, error) {
|
||||
var totalServices int
|
||||
results, err := tx.Get(
|
||||
"index",
|
||||
"id_prefix",
|
||||
serviceIndexName("", nil),
|
||||
)
|
||||
if err != nil {
|
||||
return ServiceUsage{}, fmt.Errorf("failed services index lookup: %s", err)
|
||||
}
|
||||
for i := results.Next(); i != nil; i = results.Next() {
|
||||
totalServices += 1
|
||||
}
|
||||
|
||||
return ServiceUsage{
|
||||
Services: totalServices,
|
||||
ServiceInstances: totalInstances,
|
||||
}, nil
|
||||
}
|
25
agent/consul/state/usage_oss_test.go
Normal file
25
agent/consul/state/usage_oss_test.go
Normal file
@ -0,0 +1,25 @@
|
||||
// +build !consulent
|
||||
|
||||
package state
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestStateStore_Usage_ServiceUsage(t *testing.T) {
|
||||
s := testStateStore(t)
|
||||
|
||||
testRegisterNode(t, s, 0, "node1")
|
||||
testRegisterNode(t, s, 1, "node2")
|
||||
testRegisterService(t, s, 8, "node1", "service1")
|
||||
testRegisterService(t, s, 9, "node2", "service1")
|
||||
testRegisterService(t, s, 10, "node2", "service2")
|
||||
|
||||
idx, usage, err := s.ServiceUsage()
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, idx, uint64(10))
|
||||
require.Equal(t, 2, usage.Services)
|
||||
require.Equal(t, 3, usage.ServiceInstances)
|
||||
}
|
133
agent/consul/state/usage_test.go
Normal file
133
agent/consul/state/usage_test.go
Normal file
@ -0,0 +1,133 @@
|
||||
package state
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
memdb "github.com/hashicorp/go-memdb"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestStateStore_Usage_NodeCount(t *testing.T) {
|
||||
s := testStateStore(t)
|
||||
|
||||
// No nodes have been registered, and thus no usage entry exists
|
||||
idx, count, err := s.NodeCount()
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, idx, uint64(0))
|
||||
require.Equal(t, count, 0)
|
||||
|
||||
testRegisterNode(t, s, 0, "node1")
|
||||
testRegisterNode(t, s, 1, "node2")
|
||||
|
||||
idx, count, err = s.NodeCount()
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, idx, uint64(1))
|
||||
require.Equal(t, count, 2)
|
||||
}
|
||||
|
||||
func TestStateStore_Usage_NodeCount_Delete(t *testing.T) {
|
||||
s := testStateStore(t)
|
||||
|
||||
testRegisterNode(t, s, 0, "node1")
|
||||
testRegisterNode(t, s, 1, "node2")
|
||||
|
||||
idx, count, err := s.NodeCount()
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, idx, uint64(1))
|
||||
require.Equal(t, count, 2)
|
||||
|
||||
require.NoError(t, s.DeleteNode(2, "node2"))
|
||||
idx, count, err = s.NodeCount()
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, idx, uint64(2))
|
||||
require.Equal(t, count, 1)
|
||||
}
|
||||
|
||||
func TestStateStore_Usage_ServiceUsageEmpty(t *testing.T) {
|
||||
s := testStateStore(t)
|
||||
|
||||
// No services have been registered, and thus no usage entry exists
|
||||
idx, usage, err := s.ServiceUsage()
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, idx, uint64(0))
|
||||
require.Equal(t, usage.Services, 0)
|
||||
require.Equal(t, usage.ServiceInstances, 0)
|
||||
}
|
||||
|
||||
func TestStateStore_Usage_Restore(t *testing.T) {
|
||||
s := testStateStore(t)
|
||||
restore := s.Restore()
|
||||
restore.Registration(9, &structs.RegisterRequest{
|
||||
Node: "test-node",
|
||||
Service: &structs.NodeService{
|
||||
ID: "mysql",
|
||||
Service: "mysql",
|
||||
Port: 8080,
|
||||
Address: "198.18.0.2",
|
||||
},
|
||||
})
|
||||
require.NoError(t, restore.Commit())
|
||||
|
||||
idx, count, err := s.NodeCount()
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, idx, uint64(9))
|
||||
require.Equal(t, count, 1)
|
||||
}
|
||||
|
||||
func TestStateStore_Usage_updateUsage_Underflow(t *testing.T) {
|
||||
s := testStateStore(t)
|
||||
txn := s.db.WriteTxn(1)
|
||||
|
||||
// A single delete change will cause a negative count
|
||||
changes := Changes{
|
||||
Index: 1,
|
||||
Changes: memdb.Changes{
|
||||
{
|
||||
Table: "nodes",
|
||||
Before: &structs.Node{},
|
||||
After: nil,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
err := updateUsage(txn, changes)
|
||||
require.Error(t, err)
|
||||
require.Contains(t, err.Error(), "negative count")
|
||||
|
||||
// A insert a change to create a usage entry
|
||||
changes = Changes{
|
||||
Index: 1,
|
||||
Changes: memdb.Changes{
|
||||
{
|
||||
Table: "nodes",
|
||||
Before: nil,
|
||||
After: &structs.Node{},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
err = updateUsage(txn, changes)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Two deletes will cause a negative count now
|
||||
changes = Changes{
|
||||
Index: 1,
|
||||
Changes: memdb.Changes{
|
||||
{
|
||||
Table: "nodes",
|
||||
Before: &structs.Node{},
|
||||
After: nil,
|
||||
},
|
||||
{
|
||||
Table: "nodes",
|
||||
Before: &structs.Node{},
|
||||
After: nil,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
err = updateUsage(txn, changes)
|
||||
require.Error(t, err)
|
||||
require.Contains(t, err.Error(), "negative count")
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user