Update workload health controller to use the controller cache (#20240)

Authored by Matt Keeler on 2024-01-18 16:30:11 -05:00; committed by GitHub
parent 7888d00e49
commit 9897be76ad
11 changed files with 171 additions and 460 deletions
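At a glance, the change replaces the hand-written NodeMapper (a bimapper that Reconcile kept in sync via TrackWorkload/UntrackWorkload) with a declarative cache index registered on the controller itself. A minimal before/after sketch assembled from the diffs below; treat anything outside these calls as assumed scaffolding:

// Before: the controller carried a stateful mapper, and Reconcile
// maintained the node<->workload associations by hand.
ctrl := controller.NewController(StatusKey, pbcatalog.WorkloadType).
	WithWatch(pbcatalog.HealthStatusType, dependency.MapOwnerFiltered(pbcatalog.WorkloadType)).
	WithWatch(pbcatalog.NodeType, nodeMap.MapNodeToWorkloads).
	WithReconciler(&workloadHealthReconciler{nodeMap: nodeMap})

// After: a cache index over the decoded Workload data answers the same
// question ("which Workloads reference this Node?") automatically.
ctrl := controller.NewController(ControllerID, pbcatalog.WorkloadType,
	indexers.DecodedSingleIndexer[*pbcatalog.Workload](
		nodeIndexName,               // the "node" index
		index.ReferenceOrIDFromArgs, // how a query supplies the key
		nodeIndexer,                 // how a Workload produces its key
	)).
	WithWatch(pbcatalog.HealthStatusType, dependency.MapOwnerFiltered(pbcatalog.WorkloadType)).
	WithWatch(pbcatalog.NodeType, dependency.CacheListMapper(pbcatalog.WorkloadType, nodeIndexName)).
	WithReconciler(&workloadHealthReconciler{})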

View File

@@ -92,26 +92,26 @@ func VerifyCatalogV2Beta1IntegrationTestResults(t *testing.T, client pbresource.
})
testutil.RunStep(t, "workload-health-reconciliation", func(t *testing.T) {
c.WaitForStatusCondition(t, rtest.Resource(pbcatalog.WorkloadType, "api-1").ID(), workloadhealth.StatusKey, workloadhealth.ConditionNodeAndWorkloadPassing)
c.WaitForStatusCondition(t, rtest.Resource(pbcatalog.WorkloadType, "api-2").ID(), workloadhealth.StatusKey, workloadhealth.ConditionWorkloadWarning)
c.WaitForStatusCondition(t, rtest.Resource(pbcatalog.WorkloadType, "api-3").ID(), workloadhealth.StatusKey, workloadhealth.ConditionWorkloadCritical)
c.WaitForStatusCondition(t, rtest.Resource(pbcatalog.WorkloadType, "api-4").ID(), workloadhealth.StatusKey, workloadhealth.ConditionWorkloadMaintenance)
c.WaitForStatusCondition(t, rtest.Resource(pbcatalog.WorkloadType, "api-5").ID(), workloadhealth.StatusKey, workloadhealth.ConditionNodeWarning)
c.WaitForStatusCondition(t, rtest.Resource(pbcatalog.WorkloadType, "api-6").ID(), workloadhealth.StatusKey, workloadhealth.ConditionNodeAndWorkloadWarning)
c.WaitForStatusCondition(t, rtest.Resource(pbcatalog.WorkloadType, "api-7").ID(), workloadhealth.StatusKey, workloadhealth.ConditionNodeAndWorkloadCritical)
c.WaitForStatusCondition(t, rtest.Resource(pbcatalog.WorkloadType, "api-8").ID(), workloadhealth.StatusKey, workloadhealth.ConditionNodeAndWorkloadMaintenance)
c.WaitForStatusCondition(t, rtest.Resource(pbcatalog.WorkloadType, "api-9").ID(), workloadhealth.StatusKey, workloadhealth.ConditionNodeCritical)
c.WaitForStatusCondition(t, rtest.Resource(pbcatalog.WorkloadType, "api-10").ID(), workloadhealth.StatusKey, workloadhealth.ConditionNodeAndWorkloadCritical)
c.WaitForStatusCondition(t, rtest.Resource(pbcatalog.WorkloadType, "api-11").ID(), workloadhealth.StatusKey, workloadhealth.ConditionNodeAndWorkloadCritical)
c.WaitForStatusCondition(t, rtest.Resource(pbcatalog.WorkloadType, "api-12").ID(), workloadhealth.StatusKey, workloadhealth.ConditionNodeAndWorkloadMaintenance)
c.WaitForStatusCondition(t, rtest.Resource(pbcatalog.WorkloadType, "api-13").ID(), workloadhealth.StatusKey, workloadhealth.ConditionNodeMaintenance)
c.WaitForStatusCondition(t, rtest.Resource(pbcatalog.WorkloadType, "api-14").ID(), workloadhealth.StatusKey, workloadhealth.ConditionNodeAndWorkloadMaintenance)
c.WaitForStatusCondition(t, rtest.Resource(pbcatalog.WorkloadType, "api-15").ID(), workloadhealth.StatusKey, workloadhealth.ConditionNodeAndWorkloadMaintenance)
c.WaitForStatusCondition(t, rtest.Resource(pbcatalog.WorkloadType, "api-16").ID(), workloadhealth.StatusKey, workloadhealth.ConditionNodeAndWorkloadMaintenance)
c.WaitForStatusCondition(t, rtest.Resource(pbcatalog.WorkloadType, "api-17").ID(), workloadhealth.StatusKey, workloadhealth.ConditionWorkloadPassing)
c.WaitForStatusCondition(t, rtest.Resource(pbcatalog.WorkloadType, "api-18").ID(), workloadhealth.StatusKey, workloadhealth.ConditionWorkloadWarning)
c.WaitForStatusCondition(t, rtest.Resource(pbcatalog.WorkloadType, "api-19").ID(), workloadhealth.StatusKey, workloadhealth.ConditionWorkloadCritical)
c.WaitForStatusCondition(t, rtest.Resource(pbcatalog.WorkloadType, "api-20").ID(), workloadhealth.StatusKey, workloadhealth.ConditionWorkloadMaintenance)
c.WaitForStatusCondition(t, rtest.Resource(pbcatalog.WorkloadType, "api-1").ID(), workloadhealth.ControllerID, workloadhealth.ConditionNodeAndWorkloadPassing)
c.WaitForStatusCondition(t, rtest.Resource(pbcatalog.WorkloadType, "api-2").ID(), workloadhealth.ControllerID, workloadhealth.ConditionWorkloadWarning)
c.WaitForStatusCondition(t, rtest.Resource(pbcatalog.WorkloadType, "api-3").ID(), workloadhealth.ControllerID, workloadhealth.ConditionWorkloadCritical)
c.WaitForStatusCondition(t, rtest.Resource(pbcatalog.WorkloadType, "api-4").ID(), workloadhealth.ControllerID, workloadhealth.ConditionWorkloadMaintenance)
c.WaitForStatusCondition(t, rtest.Resource(pbcatalog.WorkloadType, "api-5").ID(), workloadhealth.ControllerID, workloadhealth.ConditionNodeWarning)
c.WaitForStatusCondition(t, rtest.Resource(pbcatalog.WorkloadType, "api-6").ID(), workloadhealth.ControllerID, workloadhealth.ConditionNodeAndWorkloadWarning)
c.WaitForStatusCondition(t, rtest.Resource(pbcatalog.WorkloadType, "api-7").ID(), workloadhealth.ControllerID, workloadhealth.ConditionNodeAndWorkloadCritical)
c.WaitForStatusCondition(t, rtest.Resource(pbcatalog.WorkloadType, "api-8").ID(), workloadhealth.ControllerID, workloadhealth.ConditionNodeAndWorkloadMaintenance)
c.WaitForStatusCondition(t, rtest.Resource(pbcatalog.WorkloadType, "api-9").ID(), workloadhealth.ControllerID, workloadhealth.ConditionNodeCritical)
c.WaitForStatusCondition(t, rtest.Resource(pbcatalog.WorkloadType, "api-10").ID(), workloadhealth.ControllerID, workloadhealth.ConditionNodeAndWorkloadCritical)
c.WaitForStatusCondition(t, rtest.Resource(pbcatalog.WorkloadType, "api-11").ID(), workloadhealth.ControllerID, workloadhealth.ConditionNodeAndWorkloadCritical)
c.WaitForStatusCondition(t, rtest.Resource(pbcatalog.WorkloadType, "api-12").ID(), workloadhealth.ControllerID, workloadhealth.ConditionNodeAndWorkloadMaintenance)
c.WaitForStatusCondition(t, rtest.Resource(pbcatalog.WorkloadType, "api-13").ID(), workloadhealth.ControllerID, workloadhealth.ConditionNodeMaintenance)
c.WaitForStatusCondition(t, rtest.Resource(pbcatalog.WorkloadType, "api-14").ID(), workloadhealth.ControllerID, workloadhealth.ConditionNodeAndWorkloadMaintenance)
c.WaitForStatusCondition(t, rtest.Resource(pbcatalog.WorkloadType, "api-15").ID(), workloadhealth.ControllerID, workloadhealth.ConditionNodeAndWorkloadMaintenance)
c.WaitForStatusCondition(t, rtest.Resource(pbcatalog.WorkloadType, "api-16").ID(), workloadhealth.ControllerID, workloadhealth.ConditionNodeAndWorkloadMaintenance)
c.WaitForStatusCondition(t, rtest.Resource(pbcatalog.WorkloadType, "api-17").ID(), workloadhealth.ControllerID, workloadhealth.ConditionWorkloadPassing)
c.WaitForStatusCondition(t, rtest.Resource(pbcatalog.WorkloadType, "api-18").ID(), workloadhealth.ControllerID, workloadhealth.ConditionWorkloadWarning)
c.WaitForStatusCondition(t, rtest.Resource(pbcatalog.WorkloadType, "api-19").ID(), workloadhealth.ControllerID, workloadhealth.ConditionWorkloadCritical)
c.WaitForStatusCondition(t, rtest.Resource(pbcatalog.WorkloadType, "api-20").ID(), workloadhealth.ControllerID, workloadhealth.ConditionWorkloadMaintenance)
})
testutil.RunStep(t, "service-reconciliation", func(t *testing.T) {

View File

@@ -10,7 +10,6 @@ import (
"github.com/hashicorp/consul/internal/catalog/internal/controllers/nodehealth"
"github.com/hashicorp/consul/internal/catalog/internal/controllers/workloadhealth"
"github.com/hashicorp/consul/internal/catalog/internal/mappers/failovermapper"
"github.com/hashicorp/consul/internal/catalog/internal/mappers/nodemapper"
"github.com/hashicorp/consul/internal/catalog/internal/types"
"github.com/hashicorp/consul/internal/controller"
"github.com/hashicorp/consul/internal/resource"
@@ -25,7 +24,7 @@ var (
NodeHealthStatusConditionHealthy = nodehealth.StatusConditionHealthy
NodeHealthConditions = nodehealth.Conditions
WorkloadHealthStatusKey = workloadhealth.StatusKey
WorkloadHealthStatusKey = workloadhealth.ControllerID
WorkloadHealthStatusConditionHealthy = workloadhealth.StatusConditionHealthy
WorkloadHealthConditions = workloadhealth.WorkloadConditions
WorkloadAndNodeHealthConditions = workloadhealth.NodeAndWorkloadConditions
@@ -64,9 +63,8 @@ type ControllerDependencies = controllers.Dependencies
func DefaultControllerDependencies() ControllerDependencies {
return ControllerDependencies{
WorkloadHealthNodeMapper: nodemapper.New(),
EndpointsWorkloadMapper: selectiontracker.New(),
FailoverMapper: failovermapper.New(),
EndpointsWorkloadMapper: selectiontracker.New(),
FailoverMapper: failovermapper.New(),
}
}
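Because only the constant behind the exported alias changed, code importing the catalog package stays source-compatible. An illustrative lookup through the alias (this snippet is not part of the commit):

// WorkloadHealthStatusKey still resolves to "consul.io/workload-health",
// so existing status reads like this compile and behave unchanged.
status, found := workload.Status[catalog.WorkloadHealthStatusKey]
if !found {
	// the workload-health controller has not reconciled this workload yet
}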

View File

@@ -235,7 +235,7 @@ func (r *serviceEndpointsReconciler) Reconcile(ctx context.Context, rt controlle
// or the status isn't in the expected form then this function will return
// HEALTH_CRITICAL.
func determineWorkloadHealth(workload *pbresource.Resource) pbcatalog.Health {
status, found := workload.Status[workloadhealth.StatusKey]
status, found := workload.Status[workloadhealth.ControllerID]
if !found {
return pbcatalog.Health_HEALTH_CRITICAL
}
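As the comment above notes, the helper is deliberately pessimistic: a missing status, a missing condition, or an unrecognized reason all collapse to HEALTH_CRITICAL. A hypothetical caller relying on that contract (determineWorkloadHealth is unexported, so this would live in the same package):

health := determineWorkloadHealth(workload)
if health == pbcatalog.Health_HEALTH_CRITICAL {
	// Either the workload is genuinely critical, or the workload-health
	// controller has not written a parseable status yet; the endpoints
	// controller intentionally treats both cases the same.
}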

View File

@@ -311,7 +311,7 @@ func TestDetermineWorkloadHealth(t *testing.T) {
},
"condition-not-found": {
res: rtest.Resource(pbcatalog.WorkloadType, "foo").
WithStatus(workloadhealth.StatusKey, &pbresource.Status{
WithStatus(workloadhealth.ControllerID, &pbresource.Status{
Conditions: []*pbresource.Condition{
{
Type: "other",
@@ -325,7 +325,7 @@ func TestDetermineWorkloadHealth(t *testing.T) {
},
"invalid-reason": {
res: rtest.Resource(pbcatalog.WorkloadType, "foo").
WithStatus(workloadhealth.StatusKey, &pbresource.Status{
WithStatus(workloadhealth.ControllerID, &pbresource.Status{
Conditions: []*pbresource.Condition{
{
Type: workloadhealth.StatusConditionHealthy,
@@ -339,7 +339,7 @@ func TestDetermineWorkloadHealth(t *testing.T) {
},
"passing": {
res: rtest.Resource(pbcatalog.WorkloadType, "foo").
WithStatus(workloadhealth.StatusKey, &pbresource.Status{
WithStatus(workloadhealth.ControllerID, &pbresource.Status{
Conditions: []*pbresource.Condition{
{
Type: workloadhealth.StatusConditionHealthy,
@@ -353,7 +353,7 @@ func TestDetermineWorkloadHealth(t *testing.T) {
},
"warning": {
res: rtest.Resource(pbcatalog.WorkloadType, "foo").
WithStatus(workloadhealth.StatusKey, &pbresource.Status{
WithStatus(workloadhealth.ControllerID, &pbresource.Status{
Conditions: []*pbresource.Condition{
{
Type: workloadhealth.StatusConditionHealthy,
@@ -367,7 +367,7 @@ func TestDetermineWorkloadHealth(t *testing.T) {
},
"critical": {
res: rtest.Resource(pbcatalog.WorkloadType, "foo").
WithStatus(workloadhealth.StatusKey, &pbresource.Status{
WithStatus(workloadhealth.ControllerID, &pbresource.Status{
Conditions: []*pbresource.Condition{
{
Type: workloadhealth.StatusConditionHealthy,
@@ -381,7 +381,7 @@ func TestDetermineWorkloadHealth(t *testing.T) {
},
"maintenance": {
res: rtest.Resource(pbcatalog.WorkloadType, "foo").
WithStatus(workloadhealth.StatusKey, &pbresource.Status{
WithStatus(workloadhealth.ControllerID, &pbresource.Status{
Conditions: []*pbresource.Condition{
{
Type: workloadhealth.StatusConditionHealthy,
@@ -777,7 +777,7 @@ func (suite *controllerSuite) TestController() {
// Update the health status of the workload
suite.client.WriteStatus(suite.ctx, &pbresource.WriteStatusRequest{
Id: workload.Id,
Key: workloadhealth.StatusKey,
Key: workloadhealth.ControllerID,
Status: &pbresource.Status{
ObservedGeneration: workload.Generation,
Conditions: []*pbresource.Condition{

View File

@@ -12,14 +12,13 @@ import (
)
type Dependencies struct {
WorkloadHealthNodeMapper workloadhealth.NodeMapper
EndpointsWorkloadMapper endpoints.WorkloadMapper
FailoverMapper failover.FailoverMapper
EndpointsWorkloadMapper endpoints.WorkloadMapper
FailoverMapper failover.FailoverMapper
}
func Register(mgr *controller.Manager, deps Dependencies) {
mgr.Register(nodehealth.NodeHealthController())
mgr.Register(workloadhealth.WorkloadHealthController(deps.WorkloadHealthNodeMapper))
mgr.Register(workloadhealth.WorkloadHealthController())
mgr.Register(endpoints.ServiceEndpointsController(deps.EndpointsWorkloadMapper))
mgr.Register(failover.FailoverPolicyController(deps.FailoverMapper))
}
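With the NodeMapper gone from Dependencies, wiring the catalog controllers needs one fewer moving part. A sketch of registration after this change, using only names that appear in the diffs (the manager itself is assumed to exist):

deps := Dependencies{
	EndpointsWorkloadMapper: selectiontracker.New(),
	FailoverMapper:          failovermapper.New(),
}
// WorkloadHealthController() now takes no arguments; its node tracking
// lives in the controller cache instead of an injected mapper.
Register(mgr, deps)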

View File

@@ -13,107 +13,73 @@ import (
"github.com/hashicorp/consul/internal/catalog/internal/controllers/nodehealth"
"github.com/hashicorp/consul/internal/controller"
"github.com/hashicorp/consul/internal/controller/cache/index"
"github.com/hashicorp/consul/internal/controller/cache/indexers"
"github.com/hashicorp/consul/internal/controller/dependency"
"github.com/hashicorp/consul/internal/resource"
pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v2beta1"
"github.com/hashicorp/consul/proto-public/pbresource"
)
const (
nodeIndexName = "node"
)
var (
errNodeUnreconciled = errors.New("Node health has not been reconciled yet")
errNodeHealthInvalid = errors.New("Node health has invalid reason")
errNodeHealthConditionNotFound = fmt.Errorf("Node health status is missing the %s condition", nodehealth.StatusConditionHealthy)
)
// The NodeMapper interface provides an implementation that can map a watch
// event for a Node resource into reconciliation requests for all Workloads
// assigned to that node.
type NodeMapper interface {
// MapNodeToWorkloads will take a Node resource and return controller requests
// for all Workloads associated with the Node.
MapNodeToWorkloads(ctx context.Context, rt controller.Runtime, res *pbresource.Resource) ([]controller.Request, error)
// TrackWorkload instructs the NodeMapper to associate the given workload
// ID with the given node ID.
TrackWorkload(workloadID *pbresource.ID, nodeID *pbresource.ID)
// UntrackWorkload instructs the NodeMapper to forget about any
// association it was tracking for this workload.
UntrackWorkload(workloadID *pbresource.ID)
// NodeIDFromWorkload is used to generate the resource ID for the Node referenced
// within the NodeName field of the Workload.
NodeIDFromWorkload(workload *pbresource.Resource, workloadData *pbcatalog.Workload) *pbresource.ID
}
func WorkloadHealthController(nodeMap NodeMapper) *controller.Controller {
if nodeMap == nil {
panic("No NodeMapper was provided to the WorkloadHealthController constructor")
}
return controller.NewController(StatusKey, pbcatalog.WorkloadType).
func WorkloadHealthController() *controller.Controller {
return controller.NewController(
ControllerID,
pbcatalog.WorkloadType,
// Keep an index on which Workloads are assigned to nodes.
indexers.DecodedSingleIndexer[*pbcatalog.Workload](
nodeIndexName,
index.ReferenceOrIDFromArgs,
nodeIndexer,
)).
WithWatch(pbcatalog.HealthStatusType, dependency.MapOwnerFiltered(pbcatalog.WorkloadType)).
WithWatch(pbcatalog.NodeType, nodeMap.MapNodeToWorkloads).
WithReconciler(&workloadHealthReconciler{nodeMap: nodeMap})
WithWatch(pbcatalog.NodeType, dependency.CacheListMapper(pbcatalog.WorkloadType, nodeIndexName)).
WithReconciler(&workloadHealthReconciler{})
}
type workloadHealthReconciler struct {
nodeMap NodeMapper
}
type workloadHealthReconciler struct{}
func (r *workloadHealthReconciler) Reconcile(ctx context.Context, rt controller.Runtime, req controller.Request) error {
// The runtime is passed by value so replacing it here for the remainder of this
// reconciliation request processing will not affect future invocations.
rt.Logger = rt.Logger.With("resource-id", req.ID, "controller", StatusKey)
rt.Logger = rt.Logger.With("resource-id", req.ID)
rt.Logger.Trace("reconciling workload health")
// read the workload
rsp, err := rt.Client.Read(ctx, &pbresource.ReadRequest{Id: req.ID})
switch {
case status.Code(err) == codes.NotFound:
rt.Logger.Trace("workload has been deleted")
r.nodeMap.UntrackWorkload(req.ID)
return nil
case err != nil:
workload, err := resource.GetDecodedResource[*pbcatalog.Workload](ctx, rt.Client, req.ID)
if err != nil {
rt.Logger.Error("the resource service has returned an unexpected error", "error", err)
return err
}
res := rsp.Resource
var workload pbcatalog.Workload
if err := res.Data.UnmarshalTo(&workload); err != nil {
// This should be impossible and will not be exercised in tests. Various
// type validations on admission ensure that all Workloads would
// be marshallable in this way.
rt.Logger.Error("error unmarshalling workload data", "error", err)
return err
if workload == nil {
rt.Logger.Trace("workload has been deleted")
return nil
}
nodeHealth := pbcatalog.Health_HEALTH_PASSING
if workload.NodeName != "" {
nodeID := r.nodeMap.NodeIDFromWorkload(res, &workload)
r.nodeMap.TrackWorkload(res.Id, nodeID)
// It is important that getting the node's health happens after tracking the
// Workload with the node mapper. If the order were reversed we could
// potentially miss events for data that changes after we read the node but
// before we configured the node mapper to map subsequent events to this
// workload.
nodeID := nodeIDFromWorkload(workload)
if nodeID != nil {
nodeHealth, err = getNodeHealth(ctx, rt, nodeID)
if err != nil {
rt.Logger.Error("error looking up node health", "error", err, "node-id", nodeID)
return err
}
} else {
// the node association may have been removed, so stop tracking it.
r.nodeMap.UntrackWorkload(res.Id)
}
// passing the workload from the response because getWorkloadHealth uses
resourceClient.ListByOwner, which requires the ownerID to have a Uid, and this is the
// safest way for application and test code to ensure Uid is provided.
workloadHealth, err := getWorkloadHealth(ctx, rt, rsp.Resource.Id)
workloadHealth, err := getWorkloadHealth(ctx, rt, workload.Id)
if err != nil {
// This should be impossible under normal operations and will not be exercised
// within the unit tests. This can only fail if the resource service fails
@@ -128,18 +94,18 @@ func (r *workloadHealthReconciler) Reconcile(ctx context.Context, rt controller.
}
condition := WorkloadConditions[workloadHealth]
if workload.NodeName != "" {
if nodeID != nil {
condition = NodeAndWorkloadConditions[workloadHealth][nodeHealth]
}
newStatus := &pbresource.Status{
ObservedGeneration: res.Generation,
ObservedGeneration: workload.Generation,
Conditions: []*pbresource.Condition{
condition,
},
}
if resource.EqualStatus(res.Status[StatusKey], newStatus, false) {
if resource.EqualStatus(workload.Status[ControllerID], newStatus, false) {
rt.Logger.Trace("resources workload health status is unchanged",
"health", health.String(),
"node-health", nodeHealth.String(),
@@ -148,8 +114,8 @@ func (r *workloadHealthReconciler) Reconcile(ctx context.Context, rt controller.
}
_, err = rt.Client.WriteStatus(ctx, &pbresource.WriteStatusRequest{
Id: res.Id,
Key: StatusKey,
Id: workload.Id,
Key: ControllerID,
Status: newStatus,
})
@@ -232,3 +198,26 @@ func getWorkloadHealth(ctx context.Context, rt controller.Runtime, workloadRef *
return workloadHealth, nil
}
func nodeIDFromWorkload(workload *resource.DecodedResource[*pbcatalog.Workload]) *pbresource.ID {
if workload.Data.NodeName == "" {
return nil
}
return &pbresource.ID{
Type: pbcatalog.NodeType,
Name: workload.Data.NodeName,
Tenancy: &pbresource.Tenancy{
Partition: workload.Id.GetTenancy().GetPartition(),
},
}
}
func nodeIndexer(workload *resource.DecodedResource[*pbcatalog.Workload]) (bool, []byte, error) {
nodeID := nodeIDFromWorkload(workload)
if nodeID == nil {
return false, nil, nil
}
return true, index.IndexFromRefOrID(nodeID), nil
}
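These two helpers are the entire replacement for the old tracking logic. A hedged sanity-check sketch of the indexer's skip behavior, assuming the internal resource.Decode helper for turning a built resource into a DecodedResource:

workload := resourcetest.Resource(pbcatalog.WorkloadType, "w1").
	WithData(t, &pbcatalog.Workload{ /* NodeName deliberately empty */ }).
	Build()
decoded, err := resource.Decode[*pbcatalog.Workload](workload)
require.NoError(t, err)

indexed, _, err := nodeIndexer(decoded)
require.NoError(t, err)
// A Workload without a NodeName produces no "node" index entry, so Node
// events can never fan out to it.
require.False(t, indexed)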

View File

@@ -13,13 +13,12 @@ import (
"github.com/stretchr/testify/suite"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/testing/protocmp"
svctest "github.com/hashicorp/consul/agent/grpc-external/services/resource/testing"
"github.com/hashicorp/consul/internal/catalog/internal/controllers/nodehealth"
"github.com/hashicorp/consul/internal/catalog/internal/mappers/nodemapper"
"github.com/hashicorp/consul/internal/catalog/internal/types"
"github.com/hashicorp/consul/internal/controller"
"github.com/hashicorp/consul/internal/controller/controllertest"
"github.com/hashicorp/consul/internal/resource"
"github.com/hashicorp/consul/internal/resource/resourcetest"
pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v2beta1"
@@ -88,15 +87,19 @@ type controllerSuite struct {
isEnterprise bool
tenancies []*pbresource.Tenancy
ctl *controller.TestController
}
func (suite *controllerSuite) SetupTest() {
suite.tenancies = resourcetest.TestTenancies()
suite.client = svctest.NewResourceServiceBuilder().
client := svctest.NewResourceServiceBuilder().
WithRegisterFns(types.Register).
WithTenancies(suite.tenancies...).
Run(suite.T())
suite.runtime = controller.Runtime{Client: suite.client, Logger: testutil.Logger(suite.T())}
suite.ctl = controller.NewTestController(WorkloadHealthController(), client).
WithLogger(testutil.Logger(suite.T()))
suite.runtime = suite.ctl.Runtime()
suite.client = suite.runtime.Client
suite.isEnterprise = versiontest.IsEnterprise()
}
@@ -104,15 +107,15 @@ func (suite *controllerSuite) SetupTest() {
// in a manner consistent with the node-health controller. This allows us to not actually
// run and test the node-health controller but consume its "api" in the form of how
// it encodes status.
func (suite *controllerSuite) injectNodeWithStatus(name string, health pbcatalog.Health, tenancy *pbresource.Tenancy) *pbresource.Resource {
suite.T().Helper()
func injectNodeWithStatus(t testutil.TestingTB, client pbresource.ResourceServiceClient, name string, health pbcatalog.Health, tenancy *pbresource.Tenancy) *pbresource.Resource {
t.Helper()
state := pbresource.Condition_STATE_TRUE
if health >= pbcatalog.Health_HEALTH_WARNING {
state = pbresource.Condition_STATE_FALSE
}
return resourcetest.Resource(pbcatalog.NodeType, name).
WithData(suite.T(), nodeData).
WithData(t, nodeData).
WithTenancy(&pbresource.Tenancy{
Partition: tenancy.Partition,
}).
@@ -125,7 +128,7 @@ func (suite *controllerSuite) injectNodeWithStatus(name string, health pbcatalog
},
},
}).
Write(suite.T(), suite.client)
Write(t, client)
}
// the workloadHealthControllerTestSuite intends to test the main Reconciliation
@@ -136,19 +139,11 @@ func (suite *controllerSuite) injectNodeWithStatus(name string, health pbcatalog
// those other functions will be tested with their own test suites.
type workloadHealthControllerTestSuite struct {
controllerSuite
mapper *nodemapper.NodeMapper
reconciler *workloadHealthReconciler
}
func (suite *workloadHealthControllerTestSuite) SetupTest() {
// invoke all the other suite setup
suite.controllerSuite.SetupTest()
suite.mapper = nodemapper.New()
suite.reconciler = &workloadHealthReconciler{
nodeMap: suite.mapper,
}
}
// testReconcileWithNode will inject a node with the given health, a workload
@@ -162,7 +157,7 @@ func (suite *workloadHealthControllerTestSuite) SetupTest() {
func (suite *workloadHealthControllerTestSuite) testReconcileWithNode(nodeHealth, workloadHealth pbcatalog.Health, tenancy *pbresource.Tenancy, status *pbresource.Condition) *pbresource.Resource {
suite.T().Helper()
node := suite.injectNodeWithStatus("test-node", nodeHealth, tenancy)
node := injectNodeWithStatus(suite.T(), suite.client, "test-node", nodeHealth, tenancy)
workload := resourcetest.Resource(pbcatalog.WorkloadType, "test-workload").
WithData(suite.T(), workloadData(node.Id.Name)).
@@ -175,28 +170,12 @@ func (suite *workloadHealthControllerTestSuite) testReconcileWithNode(nodeHealth
WithTenancy(tenancy).
Write(suite.T(), suite.client)
err := suite.reconciler.Reconcile(context.Background(), suite.runtime, controller.Request{
err := suite.ctl.Reconcile(context.Background(), controller.Request{
ID: workload.Id,
})
require.NoError(suite.T(), err)
// ensure that the node is now being tracked by the mapper
reqs, err := suite.mapper.MapNodeToWorkloads(context.Background(), suite.runtime, node)
require.NoError(suite.T(), err)
require.Len(suite.T(), reqs, 1)
protocmp.Transform()
prototest.AssertDeepEqual(suite.T(), workload.Id, reqs[0].ID, protocmp.IgnoreFields(workload.Id, "uid"))
suite.T().Cleanup(func() {
// future calls to reconcile would normally have done this as the resource was
// removed. In the case of reconcile being called manually, when the resources
// are automatically removed, the tracking will be stale. In most tests this step
// to remove the tracking should be unnecessary as they will not be reusing a
// mapper between subtests and so it will get "removed" as the mapper is gc'ed.
suite.mapper.UntrackWorkload(workload.Id)
})
return suite.checkWorkloadStatus(workload.Id, status)
}
@@ -221,7 +200,7 @@ func (suite *workloadHealthControllerTestSuite) testReconcileWithoutNode(workloa
WithOwner(workload.Id).
Write(suite.T(), suite.client)
err := suite.reconciler.Reconcile(context.Background(), suite.runtime, controller.Request{
err := suite.ctl.Reconcile(context.Background(), controller.Request{
ID: workload.Id,
})
@@ -242,7 +221,7 @@ func (suite *workloadHealthControllerTestSuite) checkWorkloadStatus(id *pbresour
require.NoError(suite.T(), err)
actualStatus, found := rsp.Resource.Status[StatusKey]
actualStatus, found := rsp.Resource.Status[ControllerID]
require.True(suite.T(), found)
require.Equal(suite.T(), rsp.Resource.Generation, actualStatus.ObservedGeneration)
require.Len(suite.T(), actualStatus.Conditions, 1)
@@ -398,61 +377,12 @@ func (suite *workloadHealthControllerTestSuite) TestReconcileReadError() {
suite.runTestCaseWithTenancies(func(tenancy *pbresource.Tenancy) {
id := resourceID(fakeType, "blah", tenancy)
err := suite.reconciler.Reconcile(context.Background(), suite.runtime, controller.Request{ID: id})
err := suite.ctl.Reconcile(context.Background(), controller.Request{ID: id})
require.Error(suite.T(), err)
require.Equal(suite.T(), codes.InvalidArgument, status.Code(err))
})
}
func (suite *workloadHealthControllerTestSuite) TestReconcileNotFound() {
// This test wants to ensure that tracking for a workload is removed when the workload is deleted
// so this test will inject the tracking, issue the Reconcile call which will get a
// not found error and then ensure that the tracking was removed.
suite.runTestCaseWithTenancies(func(tenancy *pbresource.Tenancy) {
workload := resourcetest.Resource(pbcatalog.WorkloadType, "foo").
WithData(suite.T(), workloadData("test-node")).
// don't write this because then in the call to reconcile the resource
// would be found and defeat the purpose of the test
WithTenancy(tenancy).
Build()
node := resourcetest.Resource(pbcatalog.NodeType, "test-node").
WithData(suite.T(), nodeData).
WithTenancy(&pbresource.Tenancy{
Partition: tenancy.Partition,
}).
// Whether this gets written or not doesn't matter
Build()
// Track the workload - this simulates a previous round of reconciliation
// where the workload existed and was associated to the node. Other tests
// will cover more of the lifecycle of the controller so for the purposes
// of this test we can just inject it ourselves.
suite.mapper.TrackWorkload(workload.Id, node.Id)
// check that the workload is in fact tracked properly
reqs, err := suite.mapper.MapNodeToWorkloads(context.Background(), suite.runtime, node)
require.NoError(suite.T(), err)
require.Len(suite.T(), reqs, 1)
prototest.AssertDeepEqual(suite.T(), workload.Id, reqs[0].ID)
// This workload was never actually inserted so the request should return a NotFound
// error and remove the workload from tracking
require.NoError(
suite.T(),
suite.reconciler.Reconcile(
context.Background(),
suite.runtime,
controller.Request{ID: workload.Id}))
// Check the mapper again to ensure the node:workload association was removed.
reqs, err = suite.mapper.MapNodeToWorkloads(context.Background(), suite.runtime, node)
require.NoError(suite.T(), err)
require.Empty(suite.T(), reqs)
})
}
func (suite *workloadHealthControllerTestSuite) TestGetNodeHealthError() {
// This test aims to ensure that errors coming from the getNodeHealth
// function are propagated back to the caller. In order to do so
@@ -482,7 +412,7 @@ func (suite *workloadHealthControllerTestSuite) TestGetNodeHealthError() {
WithTenancy(tenancy).
Write(suite.T(), suite.client)
err := suite.reconciler.Reconcile(context.Background(), suite.runtime, controller.Request{
err := suite.ctl.Reconcile(context.Background(), controller.Request{
ID: workload.Id,
})
@@ -506,7 +436,7 @@ func (suite *workloadHealthControllerTestSuite) TestReconcile_AvoidReconciliatio
}
res1 := suite.testReconcileWithoutNode(pbcatalog.Health_HEALTH_WARNING, tenancy, status)
err := suite.reconciler.Reconcile(context.Background(), suite.runtime, controller.Request{ID: res1.Id})
err := suite.ctl.Reconcile(context.Background(), controller.Request{ID: res1.Id})
require.NoError(suite.T(), err)
// check that the status hasn't changed
@@ -519,84 +449,85 @@ func (suite *workloadHealthControllerTestSuite) TestReconcile_AvoidReconciliatio
})
}
func (suite *workloadHealthControllerTestSuite) TestController() {
func TestController(t *testing.T) {
// This test aims to be a very lightweight integration test of the
// controller with the controller manager as well as a general
// controller lifecycle test.
// create the controller manager
mgr := controller.NewManager(suite.client, testutil.Logger(suite.T()))
client := controllertest.NewControllerTestBuilder().
WithTenancies(resourcetest.TestTenancies()...).
WithResourceRegisterFns(types.Register).
WithControllerRegisterFns(func(mgr *controller.Manager) {
mgr.Register(WorkloadHealthController())
}).
Run(t)
// register our controller
mgr.Register(WorkloadHealthController(suite.mapper))
mgr.SetRaftLeader(true)
ctx, cancel := context.WithCancel(context.Background())
suite.T().Cleanup(cancel)
for _, tenancy := range resourcetest.TestTenancies() {
t.Run(tenancySubTestName(tenancy), func(t *testing.T) {
tenancy := tenancy
// run the manager
go mgr.Run(ctx)
node := injectNodeWithStatus(t, client, "test-node", pbcatalog.Health_HEALTH_PASSING, tenancy)
suite.controllerSuite.runTestCaseWithTenancies(func(tenancy *pbresource.Tenancy) {
node := suite.injectNodeWithStatus("test-node", pbcatalog.Health_HEALTH_PASSING, tenancy)
// create the workload
workload := resourcetest.Resource(pbcatalog.WorkloadType, "test-workload").
WithData(t, workloadData(node.Id.Name)).
WithTenancy(tenancy).
Write(t, client)
// create the workload
workload := resourcetest.Resource(pbcatalog.WorkloadType, "test-workload").
WithData(suite.T(), workloadData(node.Id.Name)).
WithTenancy(tenancy).
Write(suite.T(), suite.client)
// Wait for reconciliation to occur and mark the workload as passing.
waitForReconciliation(t, client, workload.Id, "HEALTH_PASSING")
// Wait for reconciliation to occur and mark the workload as passing.
suite.waitForReconciliation(workload.Id, "HEALTH_PASSING")
// Simulate the node becoming unhealthy
injectNodeWithStatus(t, client, "test-node", pbcatalog.Health_HEALTH_WARNING, tenancy)
// Simulate the node becoming unhealthy
suite.injectNodeWithStatus("test-node", pbcatalog.Health_HEALTH_WARNING, tenancy)
// Wait for reconciliation to occur and mark the workload as warning
// due to the node going into the warning state.
waitForReconciliation(t, client, workload.Id, "HEALTH_WARNING")
// Wait for reconciliation to occur and mark the workload as warning
// due to the node going into the warning state.
suite.waitForReconciliation(workload.Id, "HEALTH_WARNING")
// Now register a critical health check that should supersede the node's
// warning status
// Now register a critical health check that should supersede the node's
// warning status
resourcetest.Resource(pbcatalog.HealthStatusType, "test-status").
WithData(t, &pbcatalog.HealthStatus{Type: "tcp", Status: pbcatalog.Health_HEALTH_CRITICAL}).
WithOwner(workload.Id).
WithTenancy(tenancy).
Write(t, client)
resourcetest.Resource(pbcatalog.HealthStatusType, "test-status").
WithData(suite.T(), &pbcatalog.HealthStatus{Type: "tcp", Status: pbcatalog.Health_HEALTH_CRITICAL}).
WithOwner(workload.Id).
WithTenancy(tenancy).
Write(suite.T(), suite.client)
// Wait for reconciliation to occur again and mark the workload as unhealthy
waitForReconciliation(t, client, workload.Id, "HEALTH_CRITICAL")
// Wait for reconciliation to occur again and mark the workload as unhealthy
suite.waitForReconciliation(workload.Id, "HEALTH_CRITICAL")
// Put the health status back into a passing state and delink the node
resourcetest.Resource(pbcatalog.HealthStatusType, "test-status").
WithData(t, &pbcatalog.HealthStatus{Type: "tcp", Status: pbcatalog.Health_HEALTH_PASSING}).
WithOwner(workload.Id).
WithTenancy(tenancy).
Write(t, client)
workload = resourcetest.Resource(pbcatalog.WorkloadType, "test-workload").
WithData(t, workloadData("")).
WithTenancy(tenancy).
Write(t, client)
// Put the health status back into a passing state and delink the node
resourcetest.Resource(pbcatalog.HealthStatusType, "test-status").
WithData(suite.T(), &pbcatalog.HealthStatus{Type: "tcp", Status: pbcatalog.Health_HEALTH_PASSING}).
WithOwner(workload.Id).
WithTenancy(tenancy).
Write(suite.T(), suite.client)
workload = resourcetest.Resource(pbcatalog.WorkloadType, "test-workload").
WithData(suite.T(), workloadData("")).
WithTenancy(tenancy).
Write(suite.T(), suite.client)
// Now that the workload health is passing and it's not associated with the node, its status should
// eventually become passing
suite.waitForReconciliation(workload.Id, "HEALTH_PASSING")
})
// Now that the workload health is passing and it's not associated with the node, its status should
// eventually become passing
waitForReconciliation(t, client, workload.Id, "HEALTH_PASSING")
})
}
}
// waitForReconciliation is a helper to check if a resource has been reconciled and
// is marked with the expected status.
func (suite *workloadHealthControllerTestSuite) waitForReconciliation(id *pbresource.ID, reason string) {
suite.T().Helper()
func waitForReconciliation(t testutil.TestingTB, client pbresource.ResourceServiceClient, id *pbresource.ID, reason string) {
t.Helper()
retry.RunWith(&retry.Timer{Wait: 100 * time.Millisecond, Timeout: 5 * time.Second},
suite.T(), func(r *retry.R) {
rsp, err := suite.client.Read(context.Background(), &pbresource.ReadRequest{
t, func(r *retry.R) {
rsp, err := client.Read(context.Background(), &pbresource.ReadRequest{
Id: id,
})
require.NoError(r, err)
status, found := rsp.Resource.Status[StatusKey]
status, found := rsp.Resource.Status[ControllerID]
require.True(r, found)
require.Equal(r, rsp.Resource.Generation, status.ObservedGeneration)
require.Len(r, status.Conditions, 1)
@@ -604,7 +535,7 @@ func (suite *workloadHealthControllerTestSuite) waitForReconciliation(id *pbreso
})
}
func TestWorkloadHealthController(t *testing.T) {
func TestWorkloadHealthController_Reconcile(t *testing.T) {
suite.Run(t, new(workloadHealthControllerTestSuite))
}
@@ -822,7 +753,7 @@ func (suite *getNodeHealthTestSuite) TestValidHealth() {
}
suite.T().Run(healthStr, func(t *testing.T) {
node := suite.injectNodeWithStatus("test-node", health, tenancy)
node := injectNodeWithStatus(suite.T(), suite.client, "test-node", health, tenancy)
actualHealth, err := getNodeHealth(context.Background(), suite.runtime, node.Id)
require.NoError(t, err)
@@ -838,12 +769,12 @@ func TestGetNodeHealth(t *testing.T) {
func (suite *controllerSuite) runTestCaseWithTenancies(testFunc func(*pbresource.Tenancy)) {
for _, tenancy := range suite.tenancies {
suite.Run(suite.appendTenancyInfo(tenancy), func() {
suite.Run(tenancySubTestName(tenancy), func() {
testFunc(tenancy)
})
}
}
func (suite *controllerSuite) appendTenancyInfo(tenancy *pbresource.Tenancy) string {
func tenancySubTestName(tenancy *pbresource.Tenancy) string {
return fmt.Sprintf("%s_Namespace_%s_Partition", tenancy.Namespace, tenancy.Partition)
}

View File

@@ -10,7 +10,7 @@ import (
)
const (
StatusKey = "consul.io/workload-health"
ControllerID = "consul.io/workload-health"
StatusConditionHealthy = "healthy"
NodeAndWorkloadHealthyMessage = "All workload and associated node health checks are passing"

View File

@@ -1,57 +0,0 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package nodemapper
import (
"context"
"github.com/hashicorp/consul/internal/controller"
"github.com/hashicorp/consul/internal/resource"
"github.com/hashicorp/consul/internal/resource/mappers/bimapper"
pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v2beta1"
"github.com/hashicorp/consul/proto-public/pbresource"
)
type NodeMapper struct {
b *bimapper.Mapper
}
func New() *NodeMapper {
return &NodeMapper{
b: bimapper.New(pbcatalog.WorkloadType, pbcatalog.NodeType),
}
}
// NodeIDFromWorkload will create a resource ID referencing the Node type with the same tenancy as
// the workload and with the name populated from the workload's NodeName field.
func (m *NodeMapper) NodeIDFromWorkload(workload *pbresource.Resource, workloadData *pbcatalog.Workload) *pbresource.ID {
return &pbresource.ID{
Type: pbcatalog.NodeType,
Tenancy: &pbresource.Tenancy{
Partition: workload.Id.Tenancy.GetPartition(),
},
Name: workloadData.NodeName,
}
}
// MapNodeToWorkloads will take a Node resource and return controller requests
// for all Workloads associated with the Node.
func (m *NodeMapper) MapNodeToWorkloads(_ context.Context, _ controller.Runtime, res *pbresource.Resource) ([]controller.Request, error) {
ids := m.b.ItemIDsForLink(res.Id)
return controller.MakeRequests(pbcatalog.WorkloadType, ids), nil
}
// TrackWorkload instructs the NodeMapper to associate the given workload
// ID with the given node ID.
func (m *NodeMapper) TrackWorkload(workloadID *pbresource.ID, nodeID *pbresource.ID) {
m.b.TrackItem(workloadID, []resource.ReferenceOrID{
nodeID,
})
}
// UntrackWorkload will cause the node mapper to forget about the specified
// workload if it is currently tracking it.
func (m *NodeMapper) UntrackWorkload(workloadID *pbresource.ID) {
m.b.UntrackItem(workloadID)
}
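Everything this deleted file provided now falls out of the controller cache: the index entry computed by nodeIndexer replaces the TrackWorkload/UntrackWorkload bookkeeping (it is recomputed on every Workload write and dropped on delete), and the generic mapper replaces MapNodeToWorkloads. Side by side, using calls taken from the diffs above:

// Old: explicit, stateful bookkeeping driven from Reconcile.
nodeMap.TrackWorkload(res.Id, nodeID)                 // workload gained a node
nodeMap.UntrackWorkload(res.Id)                       // workload lost its node
reqs, err := nodeMap.MapNodeToWorkloads(ctx, rt, res) // fan-out on Node events

// New: declarative; the Node watch consults the cache's "node" index.
WithWatch(pbcatalog.NodeType, dependency.CacheListMapper(pbcatalog.WorkloadType, nodeIndexName))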

View File

@@ -1,154 +0,0 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package nodemapper
import (
"context"
"testing"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/internal/controller"
"github.com/hashicorp/consul/internal/resource"
"github.com/hashicorp/consul/internal/resource/resourcetest"
pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v2beta1"
"github.com/hashicorp/consul/proto-public/pbresource"
"github.com/hashicorp/consul/proto/private/prototest"
)
func TestNodeMapper_NodeIDFromWorkload(t *testing.T) {
mapper := New()
data := &pbcatalog.Workload{
NodeName: "test-node",
// the other fields should be irrelevant
}
workload := resourcetest.Resource(pbcatalog.WorkloadType, "test-workload").
WithData(t, data).Build()
actual := mapper.NodeIDFromWorkload(workload, data)
expected := &pbresource.ID{
Type: pbcatalog.NodeType,
Tenancy: &pbresource.Tenancy{
Partition: workload.Id.Tenancy.GetPartition(),
},
Name: "test-node",
}
prototest.AssertDeepEqual(t, expected, actual)
}
func requireWorkloadsTracked(t *testing.T, mapper *NodeMapper, node *pbresource.Resource, workloads ...*pbresource.ID) {
t.Helper()
reqs, err := mapper.MapNodeToWorkloads(
context.Background(),
controller.Runtime{},
node)
require.NoError(t, err)
require.Len(t, reqs, len(workloads))
for _, workload := range workloads {
prototest.AssertContainsElement(t, reqs, controller.Request{ID: workload})
}
}
func TestNodeMapper_WorkloadTracking(t *testing.T) {
mapper := New()
node1 := resourcetest.Resource(pbcatalog.NodeType, "node1").
WithTenancy(resource.DefaultPartitionedTenancy()).
WithData(t, &pbcatalog.Node{Addresses: []*pbcatalog.NodeAddress{{Host: "198.18.0.1"}}}).
Build()
node2 := resourcetest.Resource(pbcatalog.NodeType, "node2").
WithTenancy(resource.DefaultPartitionedTenancy()).
WithData(t, &pbcatalog.Node{Addresses: []*pbcatalog.NodeAddress{{Host: "198.18.0.2"}}}).
Build()
tenant := &pbresource.Tenancy{
Partition: "default",
Namespace: "default",
PeerName: "local",
}
workload1 := &pbresource.ID{Type: pbcatalog.WorkloadType, Tenancy: tenant, Name: "workload1"}
workload2 := &pbresource.ID{Type: pbcatalog.WorkloadType, Tenancy: tenant, Name: "workload2"}
workload3 := &pbresource.ID{Type: pbcatalog.WorkloadType, Tenancy: tenant, Name: "workload3"}
workload4 := &pbresource.ID{Type: pbcatalog.WorkloadType, Tenancy: tenant, Name: "workload4"}
workload5 := &pbresource.ID{Type: pbcatalog.WorkloadType, Tenancy: tenant, Name: "workload5"}
// No Workloads have been tracked so the mapper should return empty lists
requireWorkloadsTracked(t, mapper, node1)
requireWorkloadsTracked(t, mapper, node2)
// As nothing is tracked these should be pretty much no-ops
mapper.UntrackWorkload(workload1)
mapper.UntrackWorkload(workload2)
mapper.UntrackWorkload(workload2)
mapper.UntrackWorkload(workload3)
mapper.UntrackWorkload(workload4)
mapper.UntrackWorkload(workload5)
// Now track some workloads
mapper.TrackWorkload(workload1, node1.Id)
mapper.TrackWorkload(workload2, node1.Id)
mapper.TrackWorkload(workload3, node2.Id)
mapper.TrackWorkload(workload4, node2.Id)
// Mapping should now return 2 workload requests for each node
requireWorkloadsTracked(t, mapper, node1, workload1, workload2)
requireWorkloadsTracked(t, mapper, node2, workload3, workload4)
// Track the same workloads again, this should end up being mostly a no-op
mapper.TrackWorkload(workload1, node1.Id)
mapper.TrackWorkload(workload2, node1.Id)
mapper.TrackWorkload(workload3, node2.Id)
mapper.TrackWorkload(workload4, node2.Id)
// Mappings should be unchanged from the initial workload tracking
requireWorkloadsTracked(t, mapper, node1, workload1, workload2)
requireWorkloadsTracked(t, mapper, node2, workload3, workload4)
// Change the workload association for workload2
mapper.TrackWorkload(workload2, node2.Id)
// Node1 should now track just the single workload and node2 should track 3
requireWorkloadsTracked(t, mapper, node1, workload1)
requireWorkloadsTracked(t, mapper, node2, workload2, workload3, workload4)
// Untrack the workloads - this is done in very specific ordering to ensure all
// the workload tracking removal paths get hit. This does assume that the ordering
// of requests is stable between removals.
// remove the one and only workload from a node
mapper.UntrackWorkload(workload1)
requireWorkloadsTracked(t, mapper, node1)
// track an additional workload
mapper.TrackWorkload(workload5, node2.Id)
reqs, err := mapper.MapNodeToWorkloads(context.Background(), controller.Runtime{}, node2)
require.NoError(t, err)
require.Len(t, reqs, 4)
first := reqs[0].ID
second := reqs[1].ID
third := reqs[2].ID
fourth := reqs[3].ID
// remove from the middle of the request list
mapper.UntrackWorkload(second)
requireWorkloadsTracked(t, mapper, node2, first, third, fourth)
// remove from the end of the list
mapper.UntrackWorkload(fourth)
requireWorkloadsTracked(t, mapper, node2, first, third)
// remove from the beginning of the list
mapper.UntrackWorkload(first)
requireWorkloadsTracked(t, mapper, node2, third)
// remove the last element
mapper.UntrackWorkload(third)
requireWorkloadsTracked(t, mapper, node2)
}

View File

@@ -42,6 +42,11 @@ func NewTestController(ctl *Controller, client pbresource.ResourceServiceClient)
}
}
func (tc *TestController) WithLogger(logger hclog.Logger) *TestController {
tc.logger = tc.c.buildLogger(logger)
return tc
}
// Reconcile invokes the controller's configured reconciler with the cache-enabled Runtime
func (tc *TestController) Reconcile(ctx context.Context, req Request) error {
return tc.c.reconciler.Reconcile(ctx, tc.Runtime(), req)
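The new WithLogger option, together with the cache-enabled Runtime, is what lets the workload-health tests above drop their hand-built Runtime and mapper. A usage sketch mirroring the suite setup in this commit (test scaffolding assumed):

ctl := controller.NewTestController(WorkloadHealthController(), client).
	WithLogger(testutil.Logger(t))

// Reconcile drives the real reconciler with the controller's cache wired in,
// so tests no longer construct a Runtime or a NodeMapper by hand.
err := ctl.Reconcile(context.Background(), controller.Request{ID: workload.Id})
require.NoError(t, err)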