mirror of https://github.com/status-im/consul.git

Add the workload health controller (#17215)

parent d20e3df63c
commit 1d6a0c8f21
@@ -844,7 +844,7 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server, incom
 
 func (s *Server) registerResources() {
 	catalog.RegisterTypes(s.typeRegistry)
-	catalog.RegisterControllers(s.controllerManager)
+	catalog.RegisterControllers(s.controllerManager, catalog.DefaultControllerDependencies())
 
 	mesh.RegisterTypes(s.typeRegistry)
 	reaper.RegisterControllers(s.controllerManager)
@@ -5,6 +5,7 @@ package catalog
 
 import (
 	"github.com/hashicorp/consul/internal/catalog/internal/controllers"
+	"github.com/hashicorp/consul/internal/catalog/internal/mappers/nodemapper"
 	"github.com/hashicorp/consul/internal/catalog/internal/types"
 	"github.com/hashicorp/consul/internal/controller"
 	"github.com/hashicorp/consul/internal/resource"
@@ -46,8 +47,16 @@ func RegisterTypes(r resource.Registry) {
 	types.Register(r)
 }
 
+type ControllerDependencies = controllers.Dependencies
+
+func DefaultControllerDependencies() ControllerDependencies {
+	return ControllerDependencies{
+		WorkloadHealthNodeMapper: nodemapper.New(),
+	}
+}
+
 // RegisterControllers registers controllers for the catalog types with
 // the given controller Manager.
-func RegisterControllers(mgr *controller.Manager) {
-	controllers.Register(mgr)
+func RegisterControllers(mgr *controller.Manager, deps ControllerDependencies) {
+	controllers.Register(mgr, deps)
 }
@@ -5,9 +5,15 @@ package controllers
 
 import (
 	"github.com/hashicorp/consul/internal/catalog/internal/controllers/nodehealth"
+	"github.com/hashicorp/consul/internal/catalog/internal/controllers/workloadhealth"
 	"github.com/hashicorp/consul/internal/controller"
 )
 
-func Register(mgr *controller.Manager) {
-	mgr.Register(nodehealth.NodeHealthController())
+type Dependencies struct {
+	WorkloadHealthNodeMapper workloadhealth.NodeMapper
+}
+
+func Register(mgr *controller.Manager, deps Dependencies) {
+	mgr.Register(nodehealth.NodeHealthController())
+	mgr.Register(workloadhealth.WorkloadHealthController(deps.WorkloadHealthNodeMapper))
 }
@@ -0,0 +1,238 @@
package workloadhealth

import (
	"context"
	"errors"
	"fmt"

	"github.com/hashicorp/consul/internal/catalog/internal/controllers/nodehealth"
	"github.com/hashicorp/consul/internal/catalog/internal/types"
	"github.com/hashicorp/consul/internal/controller"
	"github.com/hashicorp/consul/internal/resource"
	pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v1alpha1"
	"github.com/hashicorp/consul/proto-public/pbresource"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

var (
	errNodeUnreconciled            = errors.New("Node health has not been reconciled yet")
	errNodeHealthInvalid           = errors.New("Node health has invalid reason")
	errNodeHealthConditionNotFound = fmt.Errorf("Node health status is missing the %s condition", nodehealth.StatusConditionHealthy)
)

// The NodeMapper interface is used to provide an implementation around being able to
// map a watch event for a Node resource and translate it to reconciliation requests
// for all Workloads assigned to that node.
type NodeMapper interface {
	// MapNodeToWorkloads will take a Node resource and return controller requests
	// for all Workloads associated with the Node.
	MapNodeToWorkloads(ctx context.Context, rt controller.Runtime, res *pbresource.Resource) ([]controller.Request, error)

	// TrackWorkload instructs the NodeMapper to associate the given workload
	// ID with the given node ID.
	TrackWorkload(workloadID *pbresource.ID, nodeID *pbresource.ID)

	// UntrackWorkload instructs the NodeMapper to forget about any
	// association it was tracking for this workload.
	UntrackWorkload(workloadID *pbresource.ID)

	// NodeIDFromWorkload is used to generate the resource ID for the Node referenced
	// within the NodeName field of the Workload.
	NodeIDFromWorkload(workload *pbresource.Resource, workloadData *pbcatalog.Workload) *pbresource.ID
}

func WorkloadHealthController(nodeMap NodeMapper) controller.Controller {
	if nodeMap == nil {
		panic("No NodeMapper was provided to the WorkloadHealthController constructor")
	}

	return controller.ForType(types.WorkloadType).
		WithWatch(types.HealthStatusType, controller.MapOwnerFiltered(types.WorkloadType)).
		WithWatch(types.NodeType, nodeMap.MapNodeToWorkloads).
		WithReconciler(&workloadHealthReconciler{nodeMap: nodeMap})
}

type workloadHealthReconciler struct {
	nodeMap NodeMapper
}

func (r *workloadHealthReconciler) Reconcile(ctx context.Context, rt controller.Runtime, req controller.Request) error {
	// The runtime is passed by value so replacing it here for the remainder of this
	// reconciliation request processing will not affect future invocations.
	rt.Logger = rt.Logger.With("resource-id", req.ID, "controller", StatusKey)

	rt.Logger.Trace("reconciling workload health")

	// read the workload
	rsp, err := rt.Client.Read(ctx, &pbresource.ReadRequest{Id: req.ID})
	switch {
	case status.Code(err) == codes.NotFound:
		rt.Logger.Trace("workload has been deleted")
		r.nodeMap.UntrackWorkload(req.ID)
		return nil
	case err != nil:
		rt.Logger.Error("the resource service has returned an unexpected error", "error", err)
		return err
	}

	res := rsp.Resource
	var workload pbcatalog.Workload
	if err := res.Data.UnmarshalTo(&workload); err != nil {
		// This should be impossible and will not be exercised in tests. Various
		// type validations on admission ensure that all Workloads would
		// be marshallable in this way.
		rt.Logger.Error("error unmarshalling workload data", "error", err)
		return err
	}

	nodeHealth := pbcatalog.Health_HEALTH_PASSING
	if workload.NodeName != "" {
		nodeID := r.nodeMap.NodeIDFromWorkload(res, &workload)
		r.nodeMap.TrackWorkload(res.Id, nodeID)
		nodeHealth, err = getNodeHealth(ctx, rt, nodeID)
		if err != nil {
			rt.Logger.Error("error looking up node health", "error", err, "node-id", nodeID)
			return err
		}
	} else {
		// the node association may have been removed so stop tracking it.
		r.nodeMap.UntrackWorkload(res.Id)
	}

	workloadHealth, err := getWorkloadHealth(ctx, rt, req.ID)
	if err != nil {
		// This should be impossible under normal operations and will not be exercised
		// within the unit tests. This can only fail if the resource service fails
		// or allows admission of invalid health statuses.
		rt.Logger.Error("error aggregating workload health statuses", "error", err)
		return err
	}

	health := nodeHealth
	if workloadHealth > health {
		health = workloadHealth
	}

	statusState := pbresource.Condition_STATE_TRUE
	if health != pbcatalog.Health_HEALTH_PASSING {
		statusState = pbresource.Condition_STATE_FALSE
	}

	message := WorkloadHealthyMessage
	if workload.NodeName != "" {
		message = NodeAndWorkloadHealthyMessage
	}
	switch {
	case workloadHealth != pbcatalog.Health_HEALTH_PASSING && nodeHealth != pbcatalog.Health_HEALTH_PASSING:
		message = NodeAndWorkloadUnhealthyMessage
	case workloadHealth != pbcatalog.Health_HEALTH_PASSING:
		message = WorkloadUnhealthyMessage
	case nodeHealth != pbcatalog.Health_HEALTH_PASSING:
		message = nodehealth.NodeUnhealthyMessage
	}

	newStatus := &pbresource.Status{
		ObservedGeneration: res.Generation,
		Conditions: []*pbresource.Condition{
			{
				Type:    StatusConditionHealthy,
				State:   statusState,
				Reason:  health.String(),
				Message: message,
			},
		},
	}

	if resource.EqualStatus(res.Status[StatusKey], newStatus, false) {
		rt.Logger.Trace("resource's workload health status is unchanged",
			"health", health.String(),
			"node-health", nodeHealth.String(),
			"workload-health", workloadHealth.String())
		return nil
	}

	_, err = rt.Client.WriteStatus(ctx, &pbresource.WriteStatusRequest{
		Id:     res.Id,
		Key:    StatusKey,
		Status: newStatus,
	})

	if err != nil {
		rt.Logger.Error("error encountered when attempting to update the resource's workload status", "error", err)
		return err
	}

	rt.Logger.Trace("resource's workload health status was updated",
		"health", health.String(),
		"node-health", nodeHealth.String(),
		"workload-health", workloadHealth.String())
	return nil
}

func getNodeHealth(ctx context.Context, rt controller.Runtime, nodeRef *pbresource.ID) (pbcatalog.Health, error) {
	rsp, err := rt.Client.Read(ctx, &pbresource.ReadRequest{Id: nodeRef})
	switch {
	case status.Code(err) == codes.NotFound:
		return pbcatalog.Health_HEALTH_CRITICAL, nil
	case err != nil:
		return pbcatalog.Health_HEALTH_CRITICAL, err
	default:
		healthStatus, ok := rsp.Resource.Status[nodehealth.StatusKey]
		if !ok {
			// The Node's health has never been reconciled and therefore the
			// workload's health cannot be determined. Returning an error is acceptable
			// because the controller should soon run reconciliation for the node,
			// which will then trigger re-reconciliation of this workload.
			return pbcatalog.Health_HEALTH_CRITICAL, errNodeUnreconciled
		}

		for _, condition := range healthStatus.Conditions {
			if condition.Type == nodehealth.StatusConditionHealthy {
				if condition.State == pbresource.Condition_STATE_TRUE {
					return pbcatalog.Health_HEALTH_PASSING, nil
				}

				healthReason, valid := pbcatalog.Health_value[condition.Reason]
				if !valid {
					// The Node's health is unknown - presumably the node health controller
					// will come along and fix that up momentarily causing this workload
					// reconciliation to occur again.
					return pbcatalog.Health_HEALTH_CRITICAL, errNodeHealthInvalid
				}
				return pbcatalog.Health(healthReason), nil
			}
		}
		return pbcatalog.Health_HEALTH_CRITICAL, errNodeHealthConditionNotFound
	}
}

func getWorkloadHealth(ctx context.Context, rt controller.Runtime, workloadRef *pbresource.ID) (pbcatalog.Health, error) {
	rsp, err := rt.Client.ListByOwner(ctx, &pbresource.ListByOwnerRequest{
		Owner: workloadRef,
	})

	if err != nil {
		return pbcatalog.Health_HEALTH_CRITICAL, err
	}

	workloadHealth := pbcatalog.Health_HEALTH_PASSING

	for _, res := range rsp.Resources {
		if resource.EqualType(res.Id.Type, types.HealthStatusType) {
			var hs pbcatalog.HealthStatus
			if err := res.Data.UnmarshalTo(&hs); err != nil {
				// This should be impossible and will not be exercised in tests. The resource type
				// is the HealthStatus type and therefore must be unmarshallable into the HealthStatus
				// object or else it wouldn't have passed admission validation checks.
				return workloadHealth, fmt.Errorf("error unmarshalling health status data: %w", err)
			}

			if hs.Status > workloadHealth {
				workloadHealth = hs.Status
			}
		}
	}

	return workloadHealth, nil
}
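The status aggregation above relies on comparing Health enum values directly ("health" keeps the larger of nodeHealth and workloadHealth, and getWorkloadHealth keeps the largest owned status). Below is a minimal standalone sketch of that merge rule; it is not part of the diff, and it assumes the generated pbcatalog.Health enum orders severity ascending (HEALTH_ANY < HEALTH_PASSING < HEALTH_WARNING < HEALTH_CRITICAL < HEALTH_MAINTENANCE), which is what the comparisons here and the test expectations in the next file imply.

package main

import (
	"fmt"

	pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v1alpha1"
)

// mergeHealth mirrors the controller's rule: the worse (numerically larger)
// of the two health values wins. Assumes ascending severity in the enum.
func mergeHealth(a, b pbcatalog.Health) pbcatalog.Health {
	if b > a {
		return b
	}
	return a
}

func main() {
	fmt.Println(mergeHealth(pbcatalog.Health_HEALTH_PASSING, pbcatalog.Health_HEALTH_WARNING))      // HEALTH_WARNING
	fmt.Println(mergeHealth(pbcatalog.Health_HEALTH_MAINTENANCE, pbcatalog.Health_HEALTH_CRITICAL)) // HEALTH_MAINTENANCE
}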
@@ -0,0 +1,760 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0

package workloadhealth

import (
	"context"
	"fmt"
	"testing"

	svctest "github.com/hashicorp/consul/agent/grpc-external/services/resource/testing"
	"github.com/hashicorp/consul/internal/catalog/internal/controllers/nodehealth"
	"github.com/hashicorp/consul/internal/catalog/internal/mappers/nodemapper"
	"github.com/hashicorp/consul/internal/catalog/internal/types"
	"github.com/hashicorp/consul/internal/controller"
	"github.com/hashicorp/consul/internal/resource/resourcetest"
	pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v1alpha1"
	"github.com/hashicorp/consul/proto-public/pbresource"
	"github.com/hashicorp/consul/proto/private/prototest"
	"github.com/hashicorp/consul/sdk/testutil"
	"github.com/hashicorp/consul/sdk/testutil/retry"
	"github.com/stretchr/testify/require"
	"github.com/stretchr/testify/suite"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

var (
	nodeData = &pbcatalog.Node{
		Addresses: []*pbcatalog.NodeAddress{
			{
				Host: "127.0.0.1",
			},
		},
	}

	fakeType = &pbresource.Type{
		Group:        "not",
		GroupVersion: "vfake",
		Kind:         "found",
	}
)

func resourceID(rtype *pbresource.Type, name string) *pbresource.ID {
	return &pbresource.ID{
		Type: rtype,
		Tenancy: &pbresource.Tenancy{
			Partition: "default",
			Namespace: "default",
			PeerName:  "local",
		},
		Name: name,
	}
}

func workloadData(nodeName string) *pbcatalog.Workload {
	return &pbcatalog.Workload{
		Addresses: []*pbcatalog.WorkloadAddress{
			{
				Host: "198.18.0.1",
			},
		},
		Ports: map[string]*pbcatalog.WorkloadPort{
			"http": {
				Port:     8080,
				Protocol: pbcatalog.Protocol_PROTOCOL_HTTP,
			},
		},
		Identity: "test",
		NodeName: nodeName,
	}
}

// controllerSuite is just the base information the three other test suites
// in this file will use. It will be embedded into the others allowing
// for the test helpers and default setup to be reused and to force consistent
// naming of the various data bits this holds on to.
type controllerSuite struct {
	suite.Suite
	client  pbresource.ResourceServiceClient
	runtime controller.Runtime
}

func (suite *controllerSuite) SetupTest() {
	suite.client = svctest.RunResourceService(suite.T(), types.Register)
	suite.runtime = controller.Runtime{Client: suite.client, Logger: testutil.Logger(suite.T())}
}

// injectNodeWithStatus is a helper method to write a Node resource and synthesize its status
// in a manner consistent with the node-health controller. This allows us to not actually
// run and test the node-health controller but consume its "api" in the form of how
// it encodes status.
func (suite *controllerSuite) injectNodeWithStatus(name string, health pbcatalog.Health) *pbresource.Resource {
	suite.T().Helper()
	state := pbresource.Condition_STATE_TRUE
	if health >= pbcatalog.Health_HEALTH_WARNING {
		state = pbresource.Condition_STATE_FALSE
	}

	return resourcetest.Resource(types.NodeType, name).
		WithData(suite.T(), nodeData).
		WithStatus(nodehealth.StatusKey, &pbresource.Status{
			Conditions: []*pbresource.Condition{
				{
					Type:   nodehealth.StatusConditionHealthy,
					State:  state,
					Reason: health.String(),
				},
			},
		}).
		Write(suite.T(), suite.client)
}

// the workloadHealthControllerTestSuite intends to test the main Reconciliation
// functionality but will not do exhaustive testing of the getNodeHealth
// or getWorkloadHealth functions. Without mocking the resource service which
// we for now are avoiding, it should be impossible to inject errors into
// those functions that would force some kinds of error cases. Therefore,
// those other functions will be tested with their own test suites.
type workloadHealthControllerTestSuite struct {
	controllerSuite

	mapper     *nodemapper.NodeMapper
	reconciler *workloadHealthReconciler
}

func (suite *workloadHealthControllerTestSuite) SetupTest() {
	// invoke all the other suite setup
	suite.controllerSuite.SetupTest()

	suite.mapper = nodemapper.New()
	suite.reconciler = &workloadHealthReconciler{
		nodeMap: suite.mapper,
	}
}

// testReconcileWithNode will inject a node with the given health, a workload
// associated with that node and then a health status owned by the workload
// with the given workload health. Once all the resource injection has been
// performed this will invoke the Reconcile method once on the reconciler
// and checks a couple things:
//
// * The node to workload association is now being tracked by the node mapper
// * The workload's status was updated and now matches the expected value
func (suite *workloadHealthControllerTestSuite) testReconcileWithNode(nodeHealth, workloadHealth pbcatalog.Health, status *pbresource.Condition) *pbresource.Resource {
	suite.T().Helper()

	node := suite.injectNodeWithStatus("test-node", nodeHealth)

	workload := resourcetest.Resource(types.WorkloadType, "test-workload").
		WithData(suite.T(), workloadData(node.Id.Name)).
		Write(suite.T(), suite.client)

	resourcetest.Resource(types.HealthStatusType, "test-status").
		WithData(suite.T(), &pbcatalog.HealthStatus{Type: "tcp", Status: workloadHealth}).
		WithOwner(workload.Id).
		Write(suite.T(), suite.client)

	err := suite.reconciler.Reconcile(context.Background(), suite.runtime, controller.Request{
		ID: workload.Id,
	})

	require.NoError(suite.T(), err)

	// ensure that the node is now being tracked by the mapper
	reqs, err := suite.mapper.MapNodeToWorkloads(context.Background(), suite.runtime, node)
	require.NoError(suite.T(), err)
	require.Len(suite.T(), reqs, 1)
	prototest.AssertDeepEqual(suite.T(), reqs[0].ID, workload.Id)

	suite.T().Cleanup(func() {
		// future calls to reconcile would normally have done this as the resource was
		// removed. In the case of reconcile being called manually, when the resources
		// are automatically removed, the tracking will be stale. In most tests this step
		// to remove the tracking should be unnecessary as they will not be reusing a
		// mapper between subtests and so it will get "removed" as the mapper is gc'ed.
		suite.mapper.UntrackWorkload(workload.Id)
	})

	return suite.checkWorkloadStatus(workload.Id, status)
}

// testReconcileWithoutNode will inject a workload and then a health status
// owned by the workload with the given workload health. Once all the resource injection
// has been performed this will invoke the Reconcile method once on the reconciler
// and check that the computed status matches the expected value.
//
// This is really just a trimmed down version of testReconcileWithNode. It seemed
// simpler and easier to read if these were two separate methods instead of combining
// them in one with more branching based off of detecting whether nodes are in use.
func (suite *workloadHealthControllerTestSuite) testReconcileWithoutNode(workloadHealth pbcatalog.Health, status *pbresource.Condition) *pbresource.Resource {
	suite.T().Helper()
	workload := resourcetest.Resource(types.WorkloadType, "test-workload").
		WithData(suite.T(), workloadData("")).
		Write(suite.T(), suite.client)

	resourcetest.Resource(types.HealthStatusType, "test-status").
		WithData(suite.T(), &pbcatalog.HealthStatus{Type: "tcp", Status: workloadHealth}).
		WithOwner(workload.Id).
		Write(suite.T(), suite.client)

	err := suite.reconciler.Reconcile(context.Background(), suite.runtime, controller.Request{
		ID: workload.Id,
	})

	require.NoError(suite.T(), err)

	// Read the resource back so we can detect the status changes
	return suite.checkWorkloadStatus(workload.Id, status)
}

// checkWorkloadStatus will read the workload resource and verify that its
// status has the expected value.
func (suite *workloadHealthControllerTestSuite) checkWorkloadStatus(id *pbresource.ID, status *pbresource.Condition) *pbresource.Resource {
	suite.T().Helper()

	rsp, err := suite.client.Read(context.Background(), &pbresource.ReadRequest{
		Id: id,
	})

	require.NoError(suite.T(), err)

	actualStatus, found := rsp.Resource.Status[StatusKey]
	require.True(suite.T(), found)
	require.Equal(suite.T(), rsp.Resource.Generation, actualStatus.ObservedGeneration)
	require.Len(suite.T(), actualStatus.Conditions, 1)
	prototest.AssertDeepEqual(suite.T(), status, actualStatus.Conditions[0])

	return rsp.Resource
}

func (suite *workloadHealthControllerTestSuite) TestReconcile() {
	// This test intends to ensure all the permutations of node health and workload
	// health end up with the correct computed status. When a test case omits
	// the node health (or sets it to pbcatalog.Health_HEALTH_ANY) then the
	// workloads are nodeless and therefore node health will not be considered.
	// Additionally the messages put in the status for nodeless workloads are
	// a little different to not mention nodes and provide the user more context
	// about where the failing health checks are.

	type testCase struct {
		nodeHealth     pbcatalog.Health
		workloadHealth pbcatalog.Health
		expectedStatus *pbresource.Condition
	}

	cases := map[string]testCase{
		"workload-passing": {
			workloadHealth: pbcatalog.Health_HEALTH_PASSING,
			expectedStatus: &pbresource.Condition{
				Type:    StatusConditionHealthy,
				State:   pbresource.Condition_STATE_TRUE,
				Reason:  "HEALTH_PASSING",
				Message: WorkloadHealthyMessage,
			},
		},
		"workload-warning": {
			workloadHealth: pbcatalog.Health_HEALTH_WARNING,
			expectedStatus: &pbresource.Condition{
				Type:    StatusConditionHealthy,
				State:   pbresource.Condition_STATE_FALSE,
				Reason:  "HEALTH_WARNING",
				Message: WorkloadUnhealthyMessage,
			},
		},
		"workload-critical": {
			workloadHealth: pbcatalog.Health_HEALTH_CRITICAL,
			expectedStatus: &pbresource.Condition{
				Type:    StatusConditionHealthy,
				State:   pbresource.Condition_STATE_FALSE,
				Reason:  "HEALTH_CRITICAL",
				Message: WorkloadUnhealthyMessage,
			},
		},
		"workload-maintenance": {
			workloadHealth: pbcatalog.Health_HEALTH_MAINTENANCE,
			expectedStatus: &pbresource.Condition{
				Type:    StatusConditionHealthy,
				State:   pbresource.Condition_STATE_FALSE,
				Reason:  "HEALTH_MAINTENANCE",
				Message: WorkloadUnhealthyMessage,
			},
		},
		"combined-passing": {
			nodeHealth:     pbcatalog.Health_HEALTH_PASSING,
			workloadHealth: pbcatalog.Health_HEALTH_PASSING,
			expectedStatus: &pbresource.Condition{
				Type:    StatusConditionHealthy,
				State:   pbresource.Condition_STATE_TRUE,
				Reason:  "HEALTH_PASSING",
				Message: NodeAndWorkloadHealthyMessage,
			},
		},
		"combined-warning-node": {
			nodeHealth:     pbcatalog.Health_HEALTH_WARNING,
			workloadHealth: pbcatalog.Health_HEALTH_PASSING,
			expectedStatus: &pbresource.Condition{
				Type:    StatusConditionHealthy,
				State:   pbresource.Condition_STATE_FALSE,
				Reason:  "HEALTH_WARNING",
				Message: nodehealth.NodeUnhealthyMessage,
			},
		},
		"combined-warning-workload": {
			nodeHealth:     pbcatalog.Health_HEALTH_PASSING,
			workloadHealth: pbcatalog.Health_HEALTH_WARNING,
			expectedStatus: &pbresource.Condition{
				Type:    StatusConditionHealthy,
				State:   pbresource.Condition_STATE_FALSE,
				Reason:  "HEALTH_WARNING",
				Message: WorkloadUnhealthyMessage,
			},
		},
		"combined-critical-node": {
			nodeHealth:     pbcatalog.Health_HEALTH_CRITICAL,
			workloadHealth: pbcatalog.Health_HEALTH_WARNING,
			expectedStatus: &pbresource.Condition{
				Type:    StatusConditionHealthy,
				State:   pbresource.Condition_STATE_FALSE,
				Reason:  "HEALTH_CRITICAL",
				Message: NodeAndWorkloadUnhealthyMessage,
			},
		},
		"combined-critical-workload": {
			nodeHealth:     pbcatalog.Health_HEALTH_WARNING,
			workloadHealth: pbcatalog.Health_HEALTH_CRITICAL,
			expectedStatus: &pbresource.Condition{
				Type:    StatusConditionHealthy,
				State:   pbresource.Condition_STATE_FALSE,
				Reason:  "HEALTH_CRITICAL",
				Message: NodeAndWorkloadUnhealthyMessage,
			},
		},
		"combined-maintenance-node": {
			nodeHealth:     pbcatalog.Health_HEALTH_MAINTENANCE,
			workloadHealth: pbcatalog.Health_HEALTH_CRITICAL,
			expectedStatus: &pbresource.Condition{
				Type:    StatusConditionHealthy,
				State:   pbresource.Condition_STATE_FALSE,
				Reason:  "HEALTH_MAINTENANCE",
				Message: NodeAndWorkloadUnhealthyMessage,
			},
		},
		"combined-maintenance-workload": {
			nodeHealth:     pbcatalog.Health_HEALTH_CRITICAL,
			workloadHealth: pbcatalog.Health_HEALTH_MAINTENANCE,
			expectedStatus: &pbresource.Condition{
				Type:    StatusConditionHealthy,
				State:   pbresource.Condition_STATE_FALSE,
				Reason:  "HEALTH_MAINTENANCE",
				Message: NodeAndWorkloadUnhealthyMessage,
			},
		},
	}

	for name, tcase := range cases {
		suite.Run(name, func() {
			if tcase.nodeHealth != pbcatalog.Health_HEALTH_ANY {
				suite.testReconcileWithNode(tcase.nodeHealth, tcase.workloadHealth, tcase.expectedStatus)
			} else {
				suite.testReconcileWithoutNode(tcase.workloadHealth, tcase.expectedStatus)
			}
		})
	}
}

func (suite *workloadHealthControllerTestSuite) TestReconcileReadError() {
	// This test's goal is to prove that errors other than NotFound from the Resource service
	// when reading the workload to reconcile will be propagated back to the Reconcile caller.
	//
	// Passing a resource with an unknown type isn't particularly realistic as the controller
	// manager running our reconciliation will ensure all resource ids used are valid. However
	// it's a really easy way right now to force the error.
	id := resourceID(fakeType, "blah")

	err := suite.reconciler.Reconcile(context.Background(), suite.runtime, controller.Request{ID: id})
	require.Error(suite.T(), err)
	require.Equal(suite.T(), codes.InvalidArgument, status.Code(err))
}

func (suite *workloadHealthControllerTestSuite) TestReconcileNotFound() {
	// This test wants to ensure that tracking for a workload is removed when the workload is deleted
	// so this test will inject the tracking, issue the Reconcile call which will get a
	// not found error and then ensure that the tracking was removed.

	workload := resourcetest.Resource(types.WorkloadType, "foo").
		WithData(suite.T(), workloadData("test-node")).
		// don't write this because then in the call to reconcile the resource
		// would be found and defeat the purpose of the test
		Build()

	node := resourcetest.Resource(types.NodeType, "test-node").
		WithData(suite.T(), nodeData).
		// Whether this gets written or not doesn't matter
		Build()

	// Track the workload - this simulates a previous round of reconciliation
	// where the workload existed and was associated to the node. Other tests
	// will cover more of the lifecycle of the controller so for the purposes
	// of this test we can just inject it ourselves.
	suite.mapper.TrackWorkload(workload.Id, node.Id)

	// check that the workload is in fact tracked properly
	reqs, err := suite.mapper.MapNodeToWorkloads(context.Background(), suite.runtime, node)

	require.NoError(suite.T(), err)
	require.Len(suite.T(), reqs, 1)
	prototest.AssertDeepEqual(suite.T(), workload.Id, reqs[0].ID)

	// This workload was never actually inserted so the request should return a NotFound
	// error and remove the workload from tracking
	require.NoError(
		suite.T(),
		suite.reconciler.Reconcile(
			context.Background(),
			suite.runtime,
			controller.Request{ID: workload.Id}))

	// Check the mapper again to ensure the node:workload association was removed.
	reqs, err = suite.mapper.MapNodeToWorkloads(context.Background(), suite.runtime, node)
	require.NoError(suite.T(), err)
	require.Empty(suite.T(), reqs)
}

func (suite *workloadHealthControllerTestSuite) TestGetNodeHealthError() {
	// This test aims to ensure that errors coming from the getNodeHealth
	// function are propagated back to the caller. In order to do so
	// we are going to inject a node but not set its status yet. This
	// simulates the condition where the workload health controller happened
	// to start reconciliation before the node health controller. In that
	// case we also expect the errNodeUnreconciled error to be returned
	// but the exact error isn't very relevant to the core reason this
	// test exists.

	node := resourcetest.Resource(types.NodeType, "test-node").
		WithData(suite.T(), nodeData).
		Write(suite.T(), suite.client)

	workload := resourcetest.Resource(types.WorkloadType, "test-workload").
		WithData(suite.T(), workloadData(node.Id.Name)).
		Write(suite.T(), suite.client)

	resourcetest.Resource(types.HealthStatusType, "test-status").
		WithData(suite.T(), &pbcatalog.HealthStatus{Type: "tcp", Status: pbcatalog.Health_HEALTH_CRITICAL}).
		WithOwner(workload.Id).
		Write(suite.T(), suite.client)

	err := suite.reconciler.Reconcile(context.Background(), suite.runtime, controller.Request{
		ID: workload.Id,
	})

	require.Error(suite.T(), err)
	require.Equal(suite.T(), errNodeUnreconciled, err)
}

func (suite *workloadHealthControllerTestSuite) TestReconcile_AvoidReconciliationWrite() {
	// The sole purpose of this test is to ensure that calls to Reconcile for an already
	// reconciled workload will not perform extra/unnecessary status writes. Basically
	// we check that calling Reconcile twice in a row without any actual health change
	// doesn't bump the Version (which would be increased for any write of the resource
	// or its status)
	status := &pbresource.Condition{
		Type:    StatusConditionHealthy,
		State:   pbresource.Condition_STATE_FALSE,
		Reason:  "HEALTH_WARNING",
		Message: WorkloadUnhealthyMessage,
	}
	res1 := suite.testReconcileWithoutNode(pbcatalog.Health_HEALTH_WARNING, status)

	err := suite.reconciler.Reconcile(context.Background(), suite.runtime, controller.Request{ID: res1.Id})
	require.NoError(suite.T(), err)

	// check that the status hasn't changed
	res2 := suite.checkWorkloadStatus(res1.Id, status)

	// If another status write was performed then the versions would differ. This
	// therefore proves that after a second reconciliation without any change
	// in status that the controller is not making extra status writes.
	require.Equal(suite.T(), res1.Version, res2.Version)
}

func (suite *workloadHealthControllerTestSuite) TestController() {
	// This test aims to be a very lightweight integration test of the
	// controller with the controller manager as well as a general
	// controller lifecycle test.

	// create the controller manager
	mgr := controller.NewManager(suite.client, testutil.Logger(suite.T()))

	// register our controller
	mgr.Register(WorkloadHealthController(suite.mapper))
	mgr.SetRaftLeader(true)
	ctx, cancel := context.WithCancel(context.Background())
	suite.T().Cleanup(cancel)

	// run the manager
	go mgr.Run(ctx)

	// create a node to link things with
	node := suite.injectNodeWithStatus("test-node", pbcatalog.Health_HEALTH_PASSING)

	// create the workload
	workload := resourcetest.Resource(types.WorkloadType, "test-workload").
		WithData(suite.T(), workloadData(node.Id.Name)).
		Write(suite.T(), suite.client)

	// Wait for reconciliation to occur and mark the workload as passing.
	suite.waitForReconciliation(workload.Id, "HEALTH_PASSING")

	// Simulate the node becoming unhealthy
	suite.injectNodeWithStatus("test-node", pbcatalog.Health_HEALTH_WARNING)

	// Wait for reconciliation to occur and mark the workload as warning
	// due to the node going into the warning state.
	suite.waitForReconciliation(workload.Id, "HEALTH_WARNING")

	// Now register a critical health check that should supersede the node's
	// warning status

	resourcetest.Resource(types.HealthStatusType, "test-status").
		WithData(suite.T(), &pbcatalog.HealthStatus{Type: "tcp", Status: pbcatalog.Health_HEALTH_CRITICAL}).
		WithOwner(workload.Id).
		Write(suite.T(), suite.client)

	// Wait for reconciliation to occur again and mark the workload as unhealthy
	suite.waitForReconciliation(workload.Id, "HEALTH_CRITICAL")

	// Put the health status back into a passing state and delink the node
	resourcetest.Resource(types.HealthStatusType, "test-status").
		WithData(suite.T(), &pbcatalog.HealthStatus{Type: "tcp", Status: pbcatalog.Health_HEALTH_PASSING}).
		WithOwner(workload.Id).
		Write(suite.T(), suite.client)
	workload = resourcetest.Resource(types.WorkloadType, "test-workload").
		WithData(suite.T(), workloadData("")).
		Write(suite.T(), suite.client)

	// Now that the workload health is passing and it's not associated with the node its status should
	// eventually become passing
	suite.waitForReconciliation(workload.Id, "HEALTH_PASSING")
}

// waitForReconciliation is a helper to check if a resource has been reconciled and
// is marked with the expected status.
func (suite *workloadHealthControllerTestSuite) waitForReconciliation(id *pbresource.ID, reason string) {
	suite.T().Helper()

	retry.Run(suite.T(), func(r *retry.R) {
		rsp, err := suite.client.Read(context.Background(), &pbresource.ReadRequest{
			Id: id,
		})
		require.NoError(r, err)

		status, found := rsp.Resource.Status[StatusKey]
		require.True(r, found)
		require.Equal(r, rsp.Resource.Generation, status.ObservedGeneration)
		require.Len(r, status.Conditions, 1)
		require.Equal(r, reason, status.Conditions[0].Reason)
	})
}

func TestWorkloadHealthController(t *testing.T) {
	suite.Run(t, new(workloadHealthControllerTestSuite))
}

type getWorkloadHealthTestSuite struct {
	controllerSuite
}

func (suite *getWorkloadHealthTestSuite) addHealthStatuses(workload *pbresource.ID, desiredHealth pbcatalog.Health) {
	// In order to exercise the behavior to ensure that the order in which a health status is
	// seen doesn't matter, this strategically names the health statuses so that they will be
	// returned in an order with the most precedent status being in the middle of the list.
	// This will ensure that statuses seen later can override a previous status and that
	// statuses seen later do not override if they would lower the overall status, such as
	// going from critical -> warning.
	healthStatuses := []pbcatalog.Health{
		pbcatalog.Health_HEALTH_PASSING,
		pbcatalog.Health_HEALTH_WARNING,
		pbcatalog.Health_HEALTH_CRITICAL,
		pbcatalog.Health_HEALTH_MAINTENANCE,
		pbcatalog.Health_HEALTH_CRITICAL,
		pbcatalog.Health_HEALTH_WARNING,
		pbcatalog.Health_HEALTH_PASSING,
	}

	for idx, health := range healthStatuses {
		if desiredHealth >= health {
			resourcetest.Resource(types.HealthStatusType, fmt.Sprintf("check-%s-%d", workload.Name, idx)).
				WithData(suite.T(), &pbcatalog.HealthStatus{Type: "tcp", Status: health}).
				WithOwner(workload).
				Write(suite.T(), suite.client)
		}
	}
}

func (suite *getWorkloadHealthTestSuite) TestListError() {
	// This test's goal is to exercise the error propagation behavior within
	// getWorkloadHealth. When the resource listing fails, we want to
	// propagate the error which should eventually result in retrying
	// the operation.
	health, err := getWorkloadHealth(context.Background(), suite.runtime, resourceID(fakeType, "foo"))

	require.Error(suite.T(), err)
	require.Equal(suite.T(), codes.InvalidArgument, status.Code(err))
	require.Equal(suite.T(), pbcatalog.Health_HEALTH_CRITICAL, health)
}

func (suite *getWorkloadHealthTestSuite) TestNoHealthStatuses() {
	// This test's goal is to ensure that when no HealthStatuses are owned by the
	// workload that the health is assumed to be passing.
	workload := resourcetest.Resource(types.WorkloadType, "foo").
		WithData(suite.T(), workloadData("")).
		Write(suite.T(), suite.client)

	health, err := getWorkloadHealth(context.Background(), suite.runtime, workload.Id)
	require.NoError(suite.T(), err)
	require.Equal(suite.T(), pbcatalog.Health_HEALTH_PASSING, health)
}

func (suite *getWorkloadHealthTestSuite) TestWithStatuses() {
	// This test's goal is to ensure that the health calculation given multiple
	// statuses results in the most precedent winning. The addHealthStatuses
	// helper method is used to inject multiple statuses in a way such that
	// the resource service will return them in a predictable order and can
	// properly exercise the code.
	for value, status := range pbcatalog.Health_name {
		health := pbcatalog.Health(value)
		if health == pbcatalog.Health_HEALTH_ANY {
			continue
		}

		suite.Run(status, func() {
			workload := resourcetest.Resource(types.WorkloadType, "foo").
				WithData(suite.T(), workloadData("")).
				Write(suite.T(), suite.client)

			suite.addHealthStatuses(workload.Id, health)

			actualHealth, err := getWorkloadHealth(context.Background(), suite.runtime, workload.Id)
			require.NoError(suite.T(), err)
			require.Equal(suite.T(), health, actualHealth)
		})
	}
}

func TestGetWorkloadHealth(t *testing.T) {
	suite.Run(t, new(getWorkloadHealthTestSuite))
}

type getNodeHealthTestSuite struct {
	controllerSuite
}

func (suite *getNodeHealthTestSuite) TestNotfound() {
	// This test's goal is to ensure that getNodeHealth, when called with a node id that isn't
	// present in the system, results in the critical health but no error. This situation
	// could occur when a linked node gets removed without the workloads being modified/removed.
	// When that occurs we want to steer traffic away from the linked node as soon as possible.
	health, err := getNodeHealth(context.Background(), suite.runtime, resourceID(types.NodeType, "not-found"))
	require.NoError(suite.T(), err)
	require.Equal(suite.T(), pbcatalog.Health_HEALTH_CRITICAL, health)
}

func (suite *getNodeHealthTestSuite) TestReadError() {
	// This test's goal is to ensure that getNodeHealth propagates unexpected errors from
	// its resource read call back to the caller.
	health, err := getNodeHealth(context.Background(), suite.runtime, resourceID(fakeType, "not-found"))
	require.Error(suite.T(), err)
	require.Equal(suite.T(), codes.InvalidArgument, status.Code(err))
	require.Equal(suite.T(), pbcatalog.Health_HEALTH_CRITICAL, health)
}

func (suite *getNodeHealthTestSuite) TestUnreconciled() {
	// This test's goal is to ensure that nodes with unreconciled health are deemed
	// critical. Basically, the workload health controller should defer calculating
	// the workload health until the associated node's health is known.
	node := resourcetest.Resource(types.NodeType, "unreconciled").
		WithData(suite.T(), nodeData).
		Write(suite.T(), suite.client).
		GetId()

	health, err := getNodeHealth(context.Background(), suite.runtime, node)
	require.Error(suite.T(), err)
	require.Equal(suite.T(), errNodeUnreconciled, err)
	require.Equal(suite.T(), pbcatalog.Health_HEALTH_CRITICAL, health)
}

func (suite *getNodeHealthTestSuite) TestNoConditions() {
	// This test's goal is to ensure that if a node's health status doesn't have
	// the expected condition then it's deemed critical. This should never happen
	// in the integrated system as the node health controller would have to be
	// buggy to add an empty status. However it could also indicate some breaking
	// change went in. Regardless, the code to handle this state is written
	// and it will be tested here.
	node := resourcetest.Resource(types.NodeType, "no-conditions").
		WithData(suite.T(), nodeData).
		WithStatus(nodehealth.StatusKey, &pbresource.Status{}).
		Write(suite.T(), suite.client).
		GetId()

	health, err := getNodeHealth(context.Background(), suite.runtime, node)
	require.Error(suite.T(), err)
	require.Equal(suite.T(), errNodeHealthConditionNotFound, err)
	require.Equal(suite.T(), pbcatalog.Health_HEALTH_CRITICAL, health)
}

func (suite *getNodeHealthTestSuite) TestInvalidReason() {
	// This test has the same goal as TestNoConditions which is to ensure that if
	// the node health status isn't properly formed then we assume it is unhealthy.
	// Just like that other test, it should be impossible for the normal running
	// system to actually get into this state or at least for the node-health
	// controller to put it into this state. As users or other controllers could
	// potentially force it into this state by writing the status themselves, it
	// would be good to ensure the defined behavior works as expected.
	node := resourcetest.Resource(types.NodeType, "invalid-reason").
		WithData(suite.T(), nodeData).
		WithStatus(nodehealth.StatusKey, &pbresource.Status{
			Conditions: []*pbresource.Condition{
				{
					Type:   nodehealth.StatusConditionHealthy,
					State:  pbresource.Condition_STATE_FALSE,
					Reason: "INVALID_REASON",
				},
			},
		}).
		Write(suite.T(), suite.client).
		GetId()

	health, err := getNodeHealth(context.Background(), suite.runtime, node)
	require.Error(suite.T(), err)
	require.Equal(suite.T(), errNodeHealthInvalid, err)
	require.Equal(suite.T(), pbcatalog.Health_HEALTH_CRITICAL, health)
}

func (suite *getNodeHealthTestSuite) TestValidHealth() {
	// This test aims to ensure that all statuses that would be reported by the node-health
	// controller get accurately detected and returned by the getNodeHealth function.
	for value, healthStr := range pbcatalog.Health_name {
		health := pbcatalog.Health(value)

		// this is not a valid health that a health status
		// may be in.
		if health == pbcatalog.Health_HEALTH_ANY {
			continue
		}

		suite.T().Run(healthStr, func(t *testing.T) {
			node := suite.injectNodeWithStatus("test-node", health)

			actualHealth, err := getNodeHealth(context.Background(), suite.runtime, node.Id)
			require.NoError(t, err)
			require.Equal(t, health, actualHealth)
		})
	}
}

func TestGetNodeHealth(t *testing.T) {
	suite.Run(t, new(getNodeHealthTestSuite))
}

@@ -0,0 +1,11 @@
package workloadhealth

const (
	StatusKey              = "consul.io/workload-health"
	StatusConditionHealthy = "healthy"

	NodeAndWorkloadHealthyMessage   = "All workload and associated node health checks are passing"
	WorkloadHealthyMessage          = "All workload health checks are passing"
	NodeAndWorkloadUnhealthyMessage = "One or more workload and node health checks are not passing"
	WorkloadUnhealthyMessage        = "One or more workload health checks are not passing"
)
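These constants are the contract for the status this controller writes. As a hedged illustration (not part of the change), a consumer holding an already reconciled Workload resource could interpret the condition roughly as below; the helper name is hypothetical.

package workloadhealth

import "github.com/hashicorp/consul/proto-public/pbresource"

// workloadIsHealthy is an illustrative helper (not in the diff) showing how the
// status written under StatusKey would be read back: find the "healthy"
// condition and treat STATE_TRUE as passing.
func workloadIsHealthy(res *pbresource.Resource) bool {
	status, ok := res.Status[StatusKey]
	if !ok {
		// the workload health controller has not reconciled this resource yet
		return false
	}
	for _, cond := range status.Conditions {
		if cond.Type == StatusConditionHealthy {
			return cond.State == pbresource.Condition_STATE_TRUE
		}
	}
	return false
}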
@@ -0,0 +1,104 @@
package nodemapper

import (
	"context"
	"sync"

	"github.com/hashicorp/consul/internal/catalog/internal/types"
	"github.com/hashicorp/consul/internal/controller"
	"github.com/hashicorp/consul/internal/resource"
	pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v1alpha1"
	"github.com/hashicorp/consul/proto-public/pbresource"
)

type NodeMapper struct {
	lock             sync.Mutex
	nodesToWorkloads map[string][]controller.Request
	workloadsToNodes map[string]string
}

func New() *NodeMapper {
	return &NodeMapper{
		workloadsToNodes: make(map[string]string),
		nodesToWorkloads: make(map[string][]controller.Request),
	}
}

// NodeIDFromWorkload will create a resource ID referencing the Node type with the same tenancy as
// the workload and with the name populated from the workload's NodeName field.
func (m *NodeMapper) NodeIDFromWorkload(workload *pbresource.Resource, workloadData *pbcatalog.Workload) *pbresource.ID {
	return &pbresource.ID{
		Type:    types.NodeType,
		Tenancy: workload.Id.Tenancy,
		Name:    workloadData.NodeName,
	}
}

// MapNodeToWorkloads will take a Node resource and return controller requests
// for all Workloads associated with the Node.
func (m *NodeMapper) MapNodeToWorkloads(_ context.Context, _ controller.Runtime, res *pbresource.Resource) ([]controller.Request, error) {
	m.lock.Lock()
	defer m.lock.Unlock()
	return m.nodesToWorkloads[res.Id.Name], nil
}

// TrackWorkload instructs the NodeMapper to associate the given workload
// ID with the given node ID.
func (m *NodeMapper) TrackWorkload(workloadID *pbresource.ID, nodeID *pbresource.ID) {
	m.lock.Lock()
	defer m.lock.Unlock()

	if previousNode, found := m.workloadsToNodes[workloadID.Name]; found && previousNode == nodeID.Name {
		return
	} else if found {
		// the node association is being changed
		m.untrackWorkloadFromNode(workloadID, previousNode)
	}

	// Now set up the latest tracking
	m.nodesToWorkloads[nodeID.Name] = append(m.nodesToWorkloads[nodeID.Name], controller.Request{ID: workloadID})
	m.workloadsToNodes[workloadID.Name] = nodeID.Name
}

// UntrackWorkload will cause the node mapper to forget about the specified
// workload if it is currently tracking it.
func (m *NodeMapper) UntrackWorkload(workloadID *pbresource.ID) {
	m.lock.Lock()
	defer m.lock.Unlock()

	node, found := m.workloadsToNodes[workloadID.Name]
	if !found {
		return
	}
	m.untrackWorkloadFromNode(workloadID, node)
}

// untrackWorkloadFromNode will disassociate the specified workload and node.
// This method will clean up unnecessary tracking entries if the node name
// is no longer associated with any workloads.
func (m *NodeMapper) untrackWorkloadFromNode(workloadID *pbresource.ID, node string) {
	foundIdx := -1
	for idx, req := range m.nodesToWorkloads[node] {
		if resource.EqualID(req.ID, workloadID) {
			foundIdx = idx
			break
		}
	}

	if foundIdx != -1 {
		workloads := m.nodesToWorkloads[node]
		l := len(workloads)

		if l == 1 {
			delete(m.nodesToWorkloads, node)
		} else if foundIdx == l-1 {
			m.nodesToWorkloads[node] = workloads[:foundIdx]
		} else if foundIdx == 0 {
			m.nodesToWorkloads[node] = workloads[1:]
		} else {
			m.nodesToWorkloads[node] = append(workloads[:foundIdx], workloads[foundIdx+1:]...)
		}
	}

	delete(m.workloadsToNodes, workloadID.Name)
}
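A side note on the removal logic in untrackWorkloadFromNode: the four-way branch handles the end, start, middle, and last-element cases explicitly. The same behavior could be sketched with the standard library's slices package, assuming a Go 1.21+ toolchain; the method name below is hypothetical and this is an illustration of the design choice, not part of the change.

package nodemapper

import (
	"slices"

	"github.com/hashicorp/consul/internal/controller"
	"github.com/hashicorp/consul/internal/resource"
	"github.com/hashicorp/consul/proto-public/pbresource"
)

// untrackWorkloadFromNodeAlt is an illustrative alternative (not in the diff)
// to the index-based removal above, using slices.DeleteFunc. It preserves the
// same cleanup: drop the node entry entirely once it has no tracked workloads,
// and always forget the workload-to-node association.
func (m *NodeMapper) untrackWorkloadFromNodeAlt(workloadID *pbresource.ID, node string) {
	remaining := slices.DeleteFunc(m.nodesToWorkloads[node], func(req controller.Request) bool {
		return resource.EqualID(req.ID, workloadID)
	})
	if len(remaining) == 0 {
		delete(m.nodesToWorkloads, node)
	} else {
		m.nodesToWorkloads[node] = remaining
	}
	delete(m.workloadsToNodes, workloadID.Name)
}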
@@ -0,0 +1,146 @@
package nodemapper

import (
	"context"
	"testing"

	"github.com/hashicorp/consul/internal/catalog/internal/types"
	"github.com/hashicorp/consul/internal/controller"
	"github.com/hashicorp/consul/internal/resource/resourcetest"
	pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v1alpha1"
	"github.com/hashicorp/consul/proto-public/pbresource"
	"github.com/hashicorp/consul/proto/private/prototest"
	"github.com/stretchr/testify/require"
)

func TestNodeMapper_NodeIDFromWorkload(t *testing.T) {
	mapper := New()

	data := &pbcatalog.Workload{
		NodeName: "test-node",
		// the other fields should be irrelevant
	}

	workload := resourcetest.Resource(types.WorkloadType, "test-workload").
		WithData(t, data).Build()

	actual := mapper.NodeIDFromWorkload(workload, data)
	expected := &pbresource.ID{
		Type:    types.NodeType,
		Tenancy: workload.Id.Tenancy,
		Name:    "test-node",
	}

	prototest.AssertDeepEqual(t, expected, actual)
}

func requireWorkloadsTracked(t *testing.T, mapper *NodeMapper, node *pbresource.Resource, workloads ...*pbresource.ID) {
	t.Helper()
	reqs, err := mapper.MapNodeToWorkloads(
		context.Background(),
		controller.Runtime{},
		node)

	require.NoError(t, err)
	require.Len(t, reqs, len(workloads))
	for _, workload := range workloads {
		prototest.AssertContainsElement(t, reqs, controller.Request{ID: workload})
	}
}

func TestNodeMapper_WorkloadTracking(t *testing.T) {
	mapper := New()

	node1 := resourcetest.Resource(types.NodeType, "node1").
		WithData(t, &pbcatalog.Node{Addresses: []*pbcatalog.NodeAddress{{Host: "198.18.0.1"}}}).
		Build()

	node2 := resourcetest.Resource(types.NodeType, "node2").
		WithData(t, &pbcatalog.Node{Addresses: []*pbcatalog.NodeAddress{{Host: "198.18.0.2"}}}).
		Build()

	tenant := &pbresource.Tenancy{
		Partition: "default",
		Namespace: "default",
		PeerName:  "local",
	}

	workload1 := &pbresource.ID{Type: types.WorkloadType, Tenancy: tenant, Name: "workload1"}
	workload2 := &pbresource.ID{Type: types.WorkloadType, Tenancy: tenant, Name: "workload2"}
	workload3 := &pbresource.ID{Type: types.WorkloadType, Tenancy: tenant, Name: "workload3"}
	workload4 := &pbresource.ID{Type: types.WorkloadType, Tenancy: tenant, Name: "workload4"}
	workload5 := &pbresource.ID{Type: types.WorkloadType, Tenancy: tenant, Name: "workload5"}

	// No Workloads have been tracked so the mapper should return empty lists
	requireWorkloadsTracked(t, mapper, node1)
	requireWorkloadsTracked(t, mapper, node2)
	// As nothing is tracked these should be pretty much no-ops
	mapper.UntrackWorkload(workload1)
	mapper.UntrackWorkload(workload2)
	mapper.UntrackWorkload(workload2)
	mapper.UntrackWorkload(workload3)
	mapper.UntrackWorkload(workload4)
	mapper.UntrackWorkload(workload5)

	// Now track some workloads
	mapper.TrackWorkload(workload1, node1.Id)
	mapper.TrackWorkload(workload2, node1.Id)
	mapper.TrackWorkload(workload3, node2.Id)
	mapper.TrackWorkload(workload4, node2.Id)

	// Mapping should now return 2 workload requests for each node
	requireWorkloadsTracked(t, mapper, node1, workload1, workload2)
	requireWorkloadsTracked(t, mapper, node2, workload3, workload4)

	// Track the same workloads again, this should end up being mostly a no-op
	mapper.TrackWorkload(workload1, node1.Id)
	mapper.TrackWorkload(workload2, node1.Id)
	mapper.TrackWorkload(workload3, node2.Id)
	mapper.TrackWorkload(workload4, node2.Id)

	// Mappings should be unchanged from the initial workload tracking
	requireWorkloadsTracked(t, mapper, node1, workload1, workload2)
	requireWorkloadsTracked(t, mapper, node2, workload3, workload4)

	// Change the workload association for workload2
	mapper.TrackWorkload(workload2, node2.Id)

	// Node1 should now track just the single workload and node2 should track 3
	requireWorkloadsTracked(t, mapper, node1, workload1)
	requireWorkloadsTracked(t, mapper, node2, workload2, workload3, workload4)

	// Untrack the workloads - this is done in very specific ordering to ensure all
	// the workload tracking removal paths get hit. This does assume that the ordering
	// of requests is stable between removals.

	// remove the one and only workload from a node
	mapper.UntrackWorkload(workload1)
	requireWorkloadsTracked(t, mapper, node1)

	// track an additional workload
	mapper.TrackWorkload(workload5, node2.Id)
	reqs, err := mapper.MapNodeToWorkloads(context.Background(), controller.Runtime{}, node2)
	require.NoError(t, err)
	require.Len(t, reqs, 4)

	first := reqs[0].ID
	second := reqs[1].ID
	third := reqs[2].ID
	fourth := reqs[3].ID

	// remove from the middle of the request list
	mapper.UntrackWorkload(second)
	requireWorkloadsTracked(t, mapper, node2, first, third, fourth)

	// remove from the end of the list
	mapper.UntrackWorkload(fourth)
	requireWorkloadsTracked(t, mapper, node2, first, third)

	// remove from the beginning of the list
	mapper.UntrackWorkload(first)
	requireWorkloadsTracked(t, mapper, node2, third)

	// remove the last element
	mapper.UntrackWorkload(third)
	requireWorkloadsTracked(t, mapper, node2)
}