From 0712e03f337d56be60f71e7eadaf5c259170f28c Mon Sep 17 00:00:00 2001 From: Chris Piraino Date: Tue, 12 Jan 2021 15:31:47 -0600 Subject: [PATCH] Fix bug in usage metrics when multiple service instances are changed in a single transaction (#9440) * Fix bug in usage metrics that caused a negative count to occur There were a couple of instances were usage metrics would do the wrong thing and result in incorrect counts, causing the count to attempt to decrement below zero and return an error. The usage metrics did not account for various places where a single transaction could delete/update/add multiple service instances at once. We also remove the error when attempting to decrement below zero, and instead just make sure we do not accidentally underflow the unsigned integer. This is a more graceful failure than returning an error and not allowing a transaction to commit. * Add changelog --- .changelog/9440.txt | 3 + agent/consul/state/usage.go | 130 +++++++++++++++++-------------- agent/consul/state/usage_oss.go | 5 +- agent/consul/state/usage_test.go | 68 +++++++++++++++- 4 files changed, 142 insertions(+), 64 deletions(-) create mode 100644 .changelog/9440.txt diff --git a/.changelog/9440.txt b/.changelog/9440.txt new file mode 100644 index 0000000000..7c253c033a --- /dev/null +++ b/.changelog/9440.txt @@ -0,0 +1,3 @@ +```release-note:bug +state: fix computation of usage metrics to account for various places that can modify multiple services in a single transaction. +``` diff --git a/agent/consul/state/usage.go b/agent/consul/state/usage.go index 6e43f3729f..bde67d127d 100644 --- a/agent/consul/state/usage.go +++ b/agent/consul/state/usage.go @@ -61,6 +61,7 @@ const ( // usage metrics that we track. func updateUsage(tx WriteTxn, changes Changes) error { usageDeltas := make(map[string]int) + serviceNameChanges := make(map[structs.ServiceName]int) for _, change := range changes.Changes { var delta int if change.Created() { @@ -75,65 +76,27 @@ func updateUsage(tx WriteTxn, changes Changes) error { case "services": svc := changeObject(change).(*structs.ServiceNode) usageDeltas[change.Table] += delta - serviceIter, err := getWithTxn(tx, servicesTableName, "service", svc.ServiceName, &svc.EnterpriseMeta) - if err != nil { - return err + addEnterpriseServiceInstanceUsage(usageDeltas, change) + + // Construct a mapping of all of the various service names that were + // changed, in order to compare it with the finished memdb state. + // Make sure to account for the fact that services can change their names. + if serviceNameChanged(change) { + serviceNameChanges[svc.CompoundServiceName()] += 1 + before := change.Before.(*structs.ServiceNode) + serviceNameChanges[before.CompoundServiceName()] -= 1 + } else { + serviceNameChanges[svc.CompoundServiceName()] += delta } - - var serviceState uniqueServiceState - if serviceIter.Next() == nil { - // If no services exist, we know we deleted the last service - // instance. - serviceState = Deleted - usageDeltas[serviceNamesUsageTable] -= 1 - } else if serviceIter.Next() == nil { - // If a second call to Next() returns nil, we know only a single - // instance exists. If, in addition, a new service name has been - // registered, either via creating a new service instance or via - // renaming an existing service, than we update our service count. - // - // We only care about two cases here: - // 1. A new service instance has been created with a unique name - // 2. An existing service instance has been updated with a new unique name - // - // These are the only ways a new unique service can be created. The - // other valid cases here: an update that does not change the service - // name, and a deletion, both do not impact the count of unique service - // names in the system. - - if change.Created() { - // Given a single existing service instance of the service: If a - // service has just been created, then we know this is a new unique - // service. - serviceState = Created - usageDeltas[serviceNamesUsageTable] += 1 - } else if serviceNameChanged(change) { - // Given a single existing service instance of the service: If a - // service has been updated with a new service name, then we know - // this is a new unique service. - serviceState = Created - usageDeltas[serviceNamesUsageTable] += 1 - - // Check whether the previous name was deleted in this rename, this - // is a special case of renaming a service which does not result in - // changing the count of unique service names. - before := change.Before.(*structs.ServiceNode) - beforeSvc, err := firstWithTxn(tx, servicesTableName, "service", before.ServiceName, &before.EnterpriseMeta) - if err != nil { - return err - } - if beforeSvc == nil { - usageDeltas[serviceNamesUsageTable] -= 1 - // set serviceState to NoChange since we have both gained and lost a - // service, cancelling each other out - serviceState = NoChange - } - } - } - addEnterpriseServiceUsage(usageDeltas, change, serviceState) } } + serviceStates, err := updateServiceNameUsage(tx, usageDeltas, serviceNameChanges) + if err != nil { + return err + } + addEnterpriseServiceUsage(usageDeltas, serviceStates) + idx := changes.Index // This will happen when restoring from a snapshot, just take the max index // of the tables we are tracking. @@ -144,6 +107,46 @@ func updateUsage(tx WriteTxn, changes Changes) error { return writeUsageDeltas(tx, idx, usageDeltas) } +func updateServiceNameUsage(tx WriteTxn, usageDeltas map[string]int, serviceNameChanges map[structs.ServiceName]int) (map[structs.ServiceName]uniqueServiceState, error) { + serviceStates := make(map[structs.ServiceName]uniqueServiceState, len(serviceNameChanges)) + for svc, delta := range serviceNameChanges { + serviceIter, err := getWithTxn(tx, servicesTableName, "service", svc.Name, &svc.EnterpriseMeta) + if err != nil { + return nil, err + } + + // Count the number of service instances associated with the given service + // name at the end of this transaction, and compare that with how many were + // added/removed during the transaction. This allows us to handle a single + // transaction committing multiple changes related to a single service + // name. + var svcCount int + for service := serviceIter.Next(); service != nil; service = serviceIter.Next() { + svcCount += 1 + } + + var serviceState uniqueServiceState + switch { + case svcCount == 0: + // If no services exist, we know we deleted the last service + // instance. + serviceState = Deleted + usageDeltas[serviceNamesUsageTable] -= 1 + case svcCount == delta: + // If the current number of service instances equals the number added, + // than we know we created a new service name. + serviceState = Created + usageDeltas[serviceNamesUsageTable] += 1 + default: + serviceState = NoChange + } + + serviceStates[svc] = serviceState + } + + return serviceStates, nil +} + // serviceNameChanged returns a boolean that indicates whether the // provided change resulted in an update to the service's service name. func serviceNameChanged(change memdb.Change) bool { @@ -168,7 +171,11 @@ func writeUsageDeltas(tx WriteTxn, idx uint64, usageDeltas map[string]int) error if u == nil { if delta < 0 { - return fmt.Errorf("failed to insert usage entry for %q: delta will cause a negative count", id) + // Don't return an error here, since we don't want to block updates + // from happening to the state store. But, set the delta to 0 so that + // we do not accidentally underflow the uint64 and begin reporting + // large numbers. + delta = 0 } err := tx.Insert("usage", &UsageEntry{ ID: id, @@ -179,12 +186,17 @@ func writeUsageDeltas(tx WriteTxn, idx uint64, usageDeltas map[string]int) error return fmt.Errorf("failed to update usage entry: %s", err) } } else if cur, ok := u.(*UsageEntry); ok { - if cur.Count+delta < 0 { - return fmt.Errorf("failed to insert usage entry for %q: delta will cause a negative count", id) + updated := cur.Count + delta + if updated < 0 { + // Don't return an error here, since we don't want to block updates + // from happening to the state store. But, set the delta to 0 so that + // we do not accidentally underflow the uint64 and begin reporting + // large numbers. + updated = 0 } err := tx.Insert("usage", &UsageEntry{ ID: id, - Count: cur.Count + delta, + Count: updated, Index: idx, }) if err != nil { diff --git a/agent/consul/state/usage_oss.go b/agent/consul/state/usage_oss.go index f35576abf5..355f1fbcc4 100644 --- a/agent/consul/state/usage_oss.go +++ b/agent/consul/state/usage_oss.go @@ -3,12 +3,15 @@ package state import ( + "github.com/hashicorp/consul/agent/structs" memdb "github.com/hashicorp/go-memdb" ) type EnterpriseServiceUsage struct{} -func addEnterpriseServiceUsage(map[string]int, memdb.Change, uniqueServiceState) {} +func addEnterpriseServiceInstanceUsage(map[string]int, memdb.Change) {} + +func addEnterpriseServiceUsage(map[string]int, map[structs.ServiceName]uniqueServiceState) {} func compileEnterpriseUsage(tx ReadTxn, usage ServiceUsage) (ServiceUsage, error) { return usage, nil diff --git a/agent/consul/state/usage_test.go b/agent/consul/state/usage_test.go index f608d7d75c..3cb0b440ef 100644 --- a/agent/consul/state/usage_test.go +++ b/agent/consul/state/usage_test.go @@ -55,6 +55,43 @@ func TestStateStore_Usage_ServiceUsageEmpty(t *testing.T) { require.Equal(t, usage.ServiceInstances, 0) } +func TestStateStore_Usage_ServiceUsage_DeleteNode(t *testing.T) { + s := testStateStore(t) + testRegisterNode(t, s, 1, "node1") + + svc1 := &structs.NodeService{ + ID: "service1", + Service: "test", + Address: "1.1.1.1", + Port: 1111, + } + svc2 := &structs.NodeService{ + ID: "service2", + Service: "test", + Address: "1.1.1.1", + Port: 1111, + } + + // Register multiple instances on a single node to test that we do not + // double count deletions within the same transaction. + require.NoError(t, s.EnsureService(1, "node1", svc1)) + require.NoError(t, s.EnsureService(2, "node1", svc2)) + + idx, usage, err := s.ServiceUsage() + require.NoError(t, err) + require.Equal(t, idx, uint64(2)) + require.Equal(t, usage.Services, 1) + require.Equal(t, usage.ServiceInstances, 2) + + require.NoError(t, s.DeleteNode(3, "node1")) + + idx, usage, err = s.ServiceUsage() + require.NoError(t, err) + require.Equal(t, idx, uint64(3)) + require.Equal(t, usage.Services, 0) + require.Equal(t, usage.ServiceInstances, 0) +} + func TestStateStore_Usage_Restore(t *testing.T) { s := testStateStore(t) restore := s.Restore() @@ -67,12 +104,27 @@ func TestStateStore_Usage_Restore(t *testing.T) { Address: "198.18.0.2", }, }) + restore.Registration(9, &structs.RegisterRequest{ + Node: "test-node", + Service: &structs.NodeService{ + ID: "mysql1", + Service: "mysql", + Port: 8081, + Address: "198.18.0.2", + }, + }) require.NoError(t, restore.Commit()) idx, count, err := s.NodeCount() require.NoError(t, err) require.Equal(t, idx, uint64(9)) require.Equal(t, count, 1) + + idx, usage, err := s.ServiceUsage() + require.NoError(t, err) + require.Equal(t, idx, uint64(9)) + require.Equal(t, usage.Services, 1) + require.Equal(t, usage.ServiceInstances, 2) } func TestStateStore_Usage_updateUsage_Underflow(t *testing.T) { @@ -92,8 +144,12 @@ func TestStateStore_Usage_updateUsage_Underflow(t *testing.T) { } err := updateUsage(txn, changes) - require.Error(t, err) - require.Contains(t, err.Error(), "negative count") + require.NoError(t, err) + + // Check that we do not underflow + u, err := txn.First("usage", "id", "nodes") + require.NoError(t, err) + require.Equal(t, 0, u.(*UsageEntry).Count) // A insert a change to create a usage entry changes = Changes{ @@ -128,8 +184,12 @@ func TestStateStore_Usage_updateUsage_Underflow(t *testing.T) { } err = updateUsage(txn, changes) - require.Error(t, err) - require.Contains(t, err.Error(), "negative count") + require.NoError(t, err) + + // Check that we do not underflow + u, err = txn.First("usage", "id", "nodes") + require.NoError(t, err) + require.Equal(t, 0, u.(*UsageEntry).Count) } func TestStateStore_Usage_ServiceUsage_updatingServiceName(t *testing.T) {