From 0a261682cd0f76cd66249a37aef834669d86169e Mon Sep 17 00:00:00 2001 From: Matt Keeler Date: Fri, 19 Jan 2024 12:22:45 -0500 Subject: [PATCH] Migrate the node health controller to use the cache (#20248) Some edge case error testing had to be removed because it was no longer possible to force errors when going through the cache layer as opposed to the resource service itself. --- .../controllers/nodehealth/controller.go | 63 ++++++++---------- .../controllers/nodehealth/controller_test.go | 66 ++++--------------- 2 files changed, 42 insertions(+), 87 deletions(-) diff --git a/internal/catalog/internal/controllers/nodehealth/controller.go b/internal/catalog/internal/controllers/nodehealth/controller.go index 8502872169..b1b4e54000 100644 --- a/internal/catalog/internal/controllers/nodehealth/controller.go +++ b/internal/catalog/internal/controllers/nodehealth/controller.go @@ -7,19 +7,22 @@ import ( "context" "fmt" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" - "github.com/hashicorp/consul/internal/controller" + "github.com/hashicorp/consul/internal/controller/cache" + "github.com/hashicorp/consul/internal/controller/cache/indexers" "github.com/hashicorp/consul/internal/controller/dependency" "github.com/hashicorp/consul/internal/resource" pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v2beta1" "github.com/hashicorp/consul/proto-public/pbresource" ) +const ( + nodeOwnerIndexName = "owner" +) + func NodeHealthController() *controller.Controller { return controller.NewController(StatusKey, pbcatalog.NodeType). - WithWatch(pbcatalog.NodeHealthStatusType, dependency.MapOwnerFiltered(pbcatalog.NodeType)). + WithWatch(pbcatalog.NodeHealthStatusType, dependency.MapOwnerFiltered(pbcatalog.NodeType), indexers.OwnerIndex(nodeOwnerIndexName)). WithReconciler(&nodeHealthReconciler{}) } @@ -33,38 +36,36 @@ func (r *nodeHealthReconciler) Reconcile(ctx context.Context, rt controller.Runt rt.Logger.Trace("reconciling node health") // read the node - rsp, err := rt.Client.Read(ctx, &pbresource.ReadRequest{Id: req.ID}) - switch { - case status.Code(err) == codes.NotFound: - rt.Logger.Trace("node has been deleted") - return nil - case err != nil: - rt.Logger.Error("the resource service has returned an unexpected error", "error", err) + node, err := rt.Cache.Get(pbcatalog.NodeType, "id", req.ID) + if err != nil { + rt.Logger.Error("the cache has returned an unexpected error", "error", err) return err } + if node == nil { + rt.Logger.Trace("node has been deleted") + return nil + } - res := rsp.Resource - - health, err := getNodeHealth(ctx, rt, req.ID) + health, err := getNodeHealth(rt, req.ID) if err != nil { rt.Logger.Error("failed to calculate the nodes health", "error", err) return err } newStatus := &pbresource.Status{ - ObservedGeneration: res.Generation, + ObservedGeneration: node.Generation, Conditions: []*pbresource.Condition{ Conditions[health], }, } - if resource.EqualStatus(res.Status[StatusKey], newStatus, false) { + if resource.EqualStatus(node.Status[StatusKey], newStatus, false) { rt.Logger.Trace("resources node health status is unchanged", "health", health.String()) return nil } _, err = rt.Client.WriteStatus(ctx, &pbresource.WriteStatusRequest{ - Id: res.Id, + Id: node.Id, Key: StatusKey, Status: newStatus, }) @@ -78,30 +79,24 @@ func (r *nodeHealthReconciler) Reconcile(ctx context.Context, rt controller.Runt return nil } -func getNodeHealth(ctx context.Context, rt controller.Runtime, nodeRef *pbresource.ID) (pbcatalog.Health, error) { - rsp, err := rt.Client.ListByOwner(ctx, &pbresource.ListByOwnerRequest{ - Owner: nodeRef, - }) - +func getNodeHealth(rt controller.Runtime, nodeRef *pbresource.ID) (pbcatalog.Health, error) { + iter, err := cache.ListIteratorDecoded[*pbcatalog.NodeHealthStatus](rt.Cache, pbcatalog.NodeHealthStatusType, nodeOwnerIndexName, nodeRef) if err != nil { return pbcatalog.Health_HEALTH_CRITICAL, err } health := pbcatalog.Health_HEALTH_PASSING - for _, res := range rsp.Resources { - if resource.EqualType(res.Id.Type, pbcatalog.NodeHealthStatusType) { - var hs pbcatalog.NodeHealthStatus - if err := res.Data.UnmarshalTo(&hs); err != nil { - // This should be impossible as the resource service + type validations the - // catalog is performing will ensure that no data gets written where unmarshalling - // to this type will error. - return pbcatalog.Health_HEALTH_CRITICAL, fmt.Errorf("error unmarshalling health status data: %w", err) - } + for hs, err := iter.Next(); hs != nil || err != nil; hs, err = iter.Next() { + if err != nil { + // This should be impossible as the resource service + type validations the + // catalog is performing will ensure that no data gets written where unmarshalling + // to this type will error. + return pbcatalog.Health_HEALTH_CRITICAL, fmt.Errorf("error getting decoded health status data: %w", err) + } - if hs.Status > health { - health = hs.Status - } + if hs.Data.Status > health { + health = hs.Data.Status } } diff --git a/internal/catalog/internal/controllers/nodehealth/controller_test.go b/internal/catalog/internal/controllers/nodehealth/controller_test.go index 4219a2f1d2..895e55fff3 100644 --- a/internal/catalog/internal/controllers/nodehealth/controller_test.go +++ b/internal/catalog/internal/controllers/nodehealth/controller_test.go @@ -11,8 +11,6 @@ import ( "github.com/oklog/ulid/v2" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" svctest "github.com/hashicorp/consul/agent/grpc-external/services/resource/testing" "github.com/hashicorp/consul/internal/catalog/internal/types" @@ -70,7 +68,7 @@ type nodeHealthControllerTestSuite struct { resourceClient *resourcetest.Client runtime controller.Runtime - ctl nodeHealthReconciler + ctl *controller.TestController nodeNoHealth *pbresource.ID nodePassing *pbresource.ID @@ -97,30 +95,13 @@ func (suite *nodeHealthControllerTestSuite) SetupTest() { WithTenancies(suite.tenancies...). Run(suite.T()) - suite.resourceClient = resourcetest.NewClient(client) - suite.runtime = controller.Runtime{Client: suite.resourceClient, Logger: testutil.Logger(suite.T())} + suite.ctl = controller.NewTestController(NodeHealthController(), client). + WithLogger(testutil.Logger(suite.T())) + suite.runtime = suite.ctl.Runtime() + suite.resourceClient = resourcetest.NewClient(suite.runtime.Client) suite.isEnterprise = versiontest.IsEnterprise() } -func (suite *nodeHealthControllerTestSuite) TestGetNodeHealthListError() { - suite.runTestCaseWithTenancies(func(tenancy *pbresource.Tenancy) { - // This resource id references a resource type that will not be - // registered with the resource service. The ListByOwner call - // should produce an InvalidArgument error. This test is meant - // to validate how that error is handled (its propagated back - // to the caller) - ref := resourceID( - &pbresource.Type{Group: "not", GroupVersion: "v1", Kind: "found"}, - "irrelevant", - tenancy, - ) - health, err := getNodeHealth(context.Background(), suite.runtime, ref) - require.Equal(suite.T(), pbcatalog.Health_HEALTH_CRITICAL, health) - require.Error(suite.T(), err) - require.Equal(suite.T(), codes.InvalidArgument, status.Code(err)) - }) -} - func (suite *nodeHealthControllerTestSuite) TestGetNodeHealthNoNode() { suite.runTestCaseWithTenancies(func(tenancy *pbresource.Tenancy) { // This test is meant to ensure that when the node doesn't exist @@ -131,7 +112,7 @@ func (suite *nodeHealthControllerTestSuite) TestGetNodeHealthNoNode() { Partition: tenancy.Partition, }) ref.Uid = ulid.Make().String() - health, err := getNodeHealth(context.Background(), suite.runtime, ref) + health, err := getNodeHealth(suite.runtime, ref) require.NoError(suite.T(), err) require.Equal(suite.T(), pbcatalog.Health_HEALTH_PASSING, health) @@ -141,7 +122,7 @@ func (suite *nodeHealthControllerTestSuite) TestGetNodeHealthNoNode() { func (suite *nodeHealthControllerTestSuite) TestGetNodeHealthNoStatus() { suite.runTestCaseWithTenancies(func(tenancy *pbresource.Tenancy) { - health, err := getNodeHealth(context.Background(), suite.runtime, suite.nodeNoHealth) + health, err := getNodeHealth(suite.runtime, suite.nodeNoHealth) require.NoError(suite.T(), err) require.Equal(suite.T(), pbcatalog.Health_HEALTH_PASSING, health) }) @@ -150,7 +131,7 @@ func (suite *nodeHealthControllerTestSuite) TestGetNodeHealthNoStatus() { func (suite *nodeHealthControllerTestSuite) TestGetNodeHealthPassingStatus() { suite.runTestCaseWithTenancies(func(tenancy *pbresource.Tenancy) { - health, err := getNodeHealth(context.Background(), suite.runtime, suite.nodePassing) + health, err := getNodeHealth(suite.runtime, suite.nodePassing) require.NoError(suite.T(), err) require.Equal(suite.T(), pbcatalog.Health_HEALTH_PASSING, health) }) @@ -159,7 +140,7 @@ func (suite *nodeHealthControllerTestSuite) TestGetNodeHealthPassingStatus() { func (suite *nodeHealthControllerTestSuite) TestGetNodeHealthCriticalStatus() { suite.runTestCaseWithTenancies(func(tenancy *pbresource.Tenancy) { - health, err := getNodeHealth(context.Background(), suite.runtime, suite.nodeCritical) + health, err := getNodeHealth(suite.runtime, suite.nodeCritical) require.NoError(suite.T(), err) require.Equal(suite.T(), pbcatalog.Health_HEALTH_CRITICAL, health) }) @@ -168,7 +149,7 @@ func (suite *nodeHealthControllerTestSuite) TestGetNodeHealthCriticalStatus() { func (suite *nodeHealthControllerTestSuite) TestGetNodeHealthWarningStatus() { suite.runTestCaseWithTenancies(func(tenancy *pbresource.Tenancy) { - health, err := getNodeHealth(context.Background(), suite.runtime, suite.nodeWarning) + health, err := getNodeHealth(suite.runtime, suite.nodeWarning) require.NoError(suite.T(), err) require.Equal(suite.T(), pbcatalog.Health_HEALTH_WARNING, health) }) @@ -177,7 +158,7 @@ func (suite *nodeHealthControllerTestSuite) TestGetNodeHealthWarningStatus() { func (suite *nodeHealthControllerTestSuite) TestGetNodeHealthMaintenanceStatus() { suite.runTestCaseWithTenancies(func(tenancy *pbresource.Tenancy) { - health, err := getNodeHealth(context.Background(), suite.runtime, suite.nodeMaintenance) + health, err := getNodeHealth(suite.runtime, suite.nodeMaintenance) require.NoError(suite.T(), err) require.Equal(suite.T(), pbcatalog.Health_HEALTH_MAINTENANCE, health) }) @@ -187,7 +168,7 @@ func (suite *nodeHealthControllerTestSuite) TestReconcileNodeNotFound() { suite.runTestCaseWithTenancies(func(tenancy *pbresource.Tenancy) { // This test ensures that removed nodes are ignored. In particular we don't // want to propagate the error and indefinitely keep re-reconciling in this case. - err := suite.ctl.Reconcile(context.Background(), suite.runtime, controller.Request{ + err := suite.ctl.Reconcile(context.Background(), controller.Request{ ID: resourceID(pbcatalog.NodeType, "not-found", &pbresource.Tenancy{ Partition: tenancy.Partition, }), @@ -196,31 +177,10 @@ func (suite *nodeHealthControllerTestSuite) TestReconcileNodeNotFound() { }) } -func (suite *nodeHealthControllerTestSuite) TestReconcilePropagateReadError() { - suite.runTestCaseWithTenancies(func(tenancy *pbresource.Tenancy) { - // This test aims to ensure that errors other than NotFound errors coming - // from the initial resource read get propagated. This case is very unrealistic - // as the controller should not have given us a request ID for a resource type - // that doesn't exist but this was the easiest way I could think of to synthesize - // a Read error. - ref := resourceID( - &pbresource.Type{Group: "not", GroupVersion: "v1", Kind: "found"}, - "irrelevant", - tenancy, - ) - - err := suite.ctl.Reconcile(context.Background(), suite.runtime, controller.Request{ - ID: ref, - }) - require.Error(suite.T(), err) - require.Equal(suite.T(), codes.InvalidArgument, status.Code(err)) - }) -} - func (suite *nodeHealthControllerTestSuite) testReconcileStatus(id *pbresource.ID, expectedStatus *pbresource.Condition) *pbresource.Resource { suite.T().Helper() - err := suite.ctl.Reconcile(context.Background(), suite.runtime, controller.Request{ + err := suite.ctl.Reconcile(context.Background(), controller.Request{ ID: id, }) require.NoError(suite.T(), err)