mirror of
https://github.com/status-im/consul.git
synced 2025-02-22 02:18:25 +00:00
xds controller: resolve ServiceEndpoints references in ProxyStateTemp… (#18544)
xds controller: resolve ServiceEndpoints references in ProxyStateTemplate
This commit is contained in:
parent
55723c541e
commit
0d60380214
156
internal/mesh/internal/controllers/xds/controller.go
Normal file
156
internal/mesh/internal/controllers/xds/controller.go
Normal file
@ -0,0 +1,156 @@
|
||||
package xds
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/hashicorp/consul/internal/catalog"
|
||||
"github.com/hashicorp/consul/internal/controller"
|
||||
"github.com/hashicorp/consul/internal/mesh/internal/controllers/xds/status"
|
||||
"github.com/hashicorp/consul/internal/mesh/internal/types"
|
||||
"github.com/hashicorp/consul/internal/resource"
|
||||
"github.com/hashicorp/consul/internal/resource/mappers/bimapper"
|
||||
pbmesh "github.com/hashicorp/consul/proto-public/pbmesh/v1alpha1"
|
||||
"github.com/hashicorp/consul/proto-public/pbmesh/v1alpha1/pbproxystate"
|
||||
"github.com/hashicorp/consul/proto-public/pbresource"
|
||||
)
|
||||
|
||||
const ControllerName = "consul.io/xds-controller"
|
||||
|
||||
func Controller(mapper *bimapper.Mapper, updater ProxyUpdater) controller.Controller {
|
||||
if mapper == nil || updater == nil {
|
||||
panic("mapper and updater are required")
|
||||
}
|
||||
|
||||
return controller.ForType(types.ProxyStateTemplateType).
|
||||
WithWatch(catalog.ServiceEndpointsType, mapper.MapLink).
|
||||
WithPlacement(controller.PlacementEachServer).
|
||||
WithReconciler(&xdsReconciler{bimapper: mapper, updater: updater})
|
||||
}
|
||||
|
||||
type xdsReconciler struct {
|
||||
bimapper *bimapper.Mapper
|
||||
updater ProxyUpdater
|
||||
}
|
||||
|
||||
// ProxyUpdater is an interface that defines the ability to push proxy updates to the updater
|
||||
// and also check its connectivity to the server.
|
||||
type ProxyUpdater interface {
|
||||
// PushChange allows pushing a computed ProxyState to xds for xds resource generation to send to a proxy.
|
||||
PushChange(id *pbresource.ID, snapshot *pbmesh.ProxyState) error
|
||||
|
||||
// ProxyConnectedToServer returns whether this id is connected to this server.
|
||||
ProxyConnectedToServer(id *pbresource.ID) bool
|
||||
}
|
||||
|
||||
func (r *xdsReconciler) Reconcile(ctx context.Context, rt controller.Runtime, req controller.Request) error {
|
||||
rt.Logger = rt.Logger.With("resource-id", req.ID, "controller", ControllerName)
|
||||
|
||||
rt.Logger.Trace("reconciling proxy state template", "id", req.ID)
|
||||
|
||||
// Get the ProxyStateTemplate.
|
||||
proxyStateTemplate, err := getProxyStateTemplate(ctx, rt, req.ID)
|
||||
if err != nil {
|
||||
rt.Logger.Error("error reading proxy state template", "error", err)
|
||||
return err
|
||||
}
|
||||
|
||||
if proxyStateTemplate == nil || proxyStateTemplate.Template == nil || !r.updater.ProxyConnectedToServer(req.ID) {
|
||||
rt.Logger.Trace("proxy state template has been deleted or this controller is not responsible for this proxy state template", "id", req.ID)
|
||||
|
||||
// If the proxy state was deleted, we should remove references to it in the mapper.
|
||||
r.bimapper.UntrackItem(req.ID)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
var (
|
||||
statusCondition *pbresource.Condition
|
||||
pstResource *pbresource.Resource
|
||||
)
|
||||
pstResource = proxyStateTemplate.Resource
|
||||
|
||||
// Initialize the ProxyState endpoints map.
|
||||
if proxyStateTemplate.Template.ProxyState == nil {
|
||||
rt.Logger.Error("proxy state was missing from proxy state template")
|
||||
// Set the status.
|
||||
statusCondition = status.ConditionRejectedNilProxyState(status.KeyFromID(req.ID))
|
||||
status.WriteStatusIfChanged(ctx, rt, pstResource, statusCondition)
|
||||
|
||||
return err
|
||||
}
|
||||
if proxyStateTemplate.Template.ProxyState.Endpoints == nil {
|
||||
proxyStateTemplate.Template.ProxyState.Endpoints = make(map[string]*pbproxystate.Endpoints)
|
||||
}
|
||||
|
||||
// Iterate through the endpoint references.
|
||||
// For endpoints, the controller should:
|
||||
// 1. Resolve ServiceEndpoint references
|
||||
// 2. Translate them into pbproxystate.Endpoints
|
||||
// 3. Add the pbproxystate.Endpoints to the ProxyState endpoints map.
|
||||
// 4. Track relationships between ProxyState and ServiceEndpoints, such that we can look up ServiceEndpoints and
|
||||
// figure out which ProxyStates are associated with it (for mapping watches) and vice versa (for looking up
|
||||
// references). The bimapper package is useful for tracking these relationships.
|
||||
endpointReferencesMap := proxyStateTemplate.Template.RequiredEndpoints
|
||||
var endpointsInProxyStateTemplate []resource.ReferenceOrID
|
||||
for xdsClusterName, endpointRef := range endpointReferencesMap {
|
||||
|
||||
// Step 1: Resolve the reference by looking up the ServiceEndpoints.
|
||||
// serviceEndpoints will not be nil unless there is an error.
|
||||
serviceEndpoints, err := getServiceEndpoints(ctx, rt, endpointRef.Id)
|
||||
if err != nil {
|
||||
rt.Logger.Error("error reading service endpoint", "id", endpointRef.Id, "error", err)
|
||||
// Set the status.
|
||||
statusCondition = status.ConditionRejectedErrorReadingEndpoints(status.KeyFromID(endpointRef.Id), err.Error())
|
||||
status.WriteStatusIfChanged(ctx, rt, pstResource, statusCondition)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// Step 2: Translate it into pbproxystate.Endpoints.
|
||||
psEndpoints, err := generateProxyStateEndpoints(serviceEndpoints, endpointRef.Port)
|
||||
if err != nil {
|
||||
rt.Logger.Error("error translating service endpoints to proxy state endpoints", "endpoint", endpointRef.Id, "error", err)
|
||||
|
||||
// Set the status.
|
||||
statusCondition = status.ConditionRejectedCreatingProxyStateEndpoints(status.KeyFromID(endpointRef.Id), err.Error())
|
||||
status.WriteStatusIfChanged(ctx, rt, pstResource, statusCondition)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// Step 3: Add the endpoints to ProxyState.
|
||||
proxyStateTemplate.Template.ProxyState.Endpoints[xdsClusterName] = psEndpoints
|
||||
|
||||
// Track all the endpoints that are used by this ProxyStateTemplate, so we can use this for step 4.
|
||||
endpointResourceRef := resource.Reference(endpointRef.Id, "")
|
||||
endpointsInProxyStateTemplate = append(endpointsInProxyStateTemplate, endpointResourceRef)
|
||||
|
||||
}
|
||||
|
||||
// Step 4: Track relationships between ProxyStateTemplates and ServiceEndpoints.
|
||||
r.bimapper.TrackItem(req.ID, endpointsInProxyStateTemplate)
|
||||
|
||||
computedProxyState := proxyStateTemplate.Template.ProxyState
|
||||
|
||||
err = r.updater.PushChange(req.ID, computedProxyState)
|
||||
if err != nil {
|
||||
// Set the status.
|
||||
statusCondition = status.ConditionRejectedPushChangeFailed(status.KeyFromID(req.ID))
|
||||
status.WriteStatusIfChanged(ctx, rt, pstResource, statusCondition)
|
||||
return err
|
||||
}
|
||||
|
||||
// Set the status.
|
||||
statusCondition = status.ConditionAccepted()
|
||||
status.WriteStatusIfChanged(ctx, rt, pstResource, statusCondition)
|
||||
return nil
|
||||
}
|
||||
|
||||
func resourceIdToReference(id *pbresource.ID) *pbresource.Reference {
|
||||
ref := &pbresource.Reference{
|
||||
Name: id.GetName(),
|
||||
Type: id.GetType(),
|
||||
Tenancy: id.GetTenancy(),
|
||||
}
|
||||
return ref
|
||||
}
|
664
internal/mesh/internal/controllers/xds/controller_test.go
Normal file
664
internal/mesh/internal/controllers/xds/controller_test.go
Normal file
@ -0,0 +1,664 @@
|
||||
// Copyright (c) HashiCorp, Inc.
|
||||
// SPDX-License-Identifier: MPL-2.0
|
||||
|
||||
package xds
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/stretchr/testify/suite"
|
||||
|
||||
svctest "github.com/hashicorp/consul/agent/grpc-external/services/resource/testing"
|
||||
"github.com/hashicorp/consul/internal/catalog"
|
||||
"github.com/hashicorp/consul/internal/controller"
|
||||
"github.com/hashicorp/consul/internal/mesh/internal/controllers/xds/status"
|
||||
"github.com/hashicorp/consul/internal/mesh/internal/types"
|
||||
"github.com/hashicorp/consul/internal/resource"
|
||||
"github.com/hashicorp/consul/internal/resource/mappers/bimapper"
|
||||
"github.com/hashicorp/consul/internal/resource/resourcetest"
|
||||
pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v1alpha1"
|
||||
pbmesh "github.com/hashicorp/consul/proto-public/pbmesh/v1alpha1"
|
||||
"github.com/hashicorp/consul/proto-public/pbmesh/v1alpha1/pbproxystate"
|
||||
"github.com/hashicorp/consul/proto-public/pbresource"
|
||||
"github.com/hashicorp/consul/proto/private/prototest"
|
||||
"github.com/hashicorp/consul/sdk/testutil"
|
||||
"github.com/hashicorp/consul/sdk/testutil/retry"
|
||||
)
|
||||
|
||||
type xdsControllerTestSuite struct {
|
||||
suite.Suite
|
||||
|
||||
ctx context.Context
|
||||
client *resourcetest.Client
|
||||
runtime controller.Runtime
|
||||
|
||||
ctl *xdsReconciler
|
||||
mapper *bimapper.Mapper
|
||||
updater *mockUpdater
|
||||
|
||||
fooProxyStateTemplate *pbresource.Resource
|
||||
barProxyStateTemplate *pbresource.Resource
|
||||
barEndpointRefs map[string]*pbproxystate.EndpointRef
|
||||
fooEndpointRefs map[string]*pbproxystate.EndpointRef
|
||||
fooEndpoints *pbresource.Resource
|
||||
fooService *pbresource.Resource
|
||||
fooBarEndpoints *pbresource.Resource
|
||||
fooBarService *pbresource.Resource
|
||||
expectedFooProxyStateEndpoints map[string]*pbproxystate.Endpoints
|
||||
expectedBarProxyStateEndpoints map[string]*pbproxystate.Endpoints
|
||||
}
|
||||
|
||||
func (suite *xdsControllerTestSuite) SetupTest() {
|
||||
suite.ctx = testutil.TestContext(suite.T())
|
||||
resourceClient := svctest.RunResourceService(suite.T(), types.Register, catalog.RegisterTypes)
|
||||
suite.runtime = controller.Runtime{Client: resourceClient, Logger: testutil.Logger(suite.T())}
|
||||
suite.client = resourcetest.NewClient(resourceClient)
|
||||
|
||||
suite.mapper = bimapper.New(types.ProxyStateTemplateType, catalog.ServiceEndpointsType)
|
||||
suite.updater = NewMockUpdater()
|
||||
|
||||
suite.ctl = &xdsReconciler{
|
||||
bimapper: suite.mapper,
|
||||
updater: suite.updater,
|
||||
}
|
||||
}
|
||||
|
||||
// This test ensures when a ProxyState is deleted, it is no longer tracked in the mapper.
|
||||
func (suite *xdsControllerTestSuite) TestReconcile_NoProxyStateTemplate() {
|
||||
// Track the id of a non-existent ProxyStateTemplate.
|
||||
proxyStateTemplateId := resourcetest.Resource(types.ProxyStateTemplateType, "not-found").ID()
|
||||
suite.mapper.TrackItem(proxyStateTemplateId, []resource.ReferenceOrID{})
|
||||
|
||||
// Run the reconcile, and since no ProxyStateTemplate is stored, this simulates a deletion.
|
||||
err := suite.ctl.Reconcile(context.Background(), suite.runtime, controller.Request{
|
||||
ID: proxyStateTemplateId,
|
||||
})
|
||||
require.NoError(suite.T(), err)
|
||||
|
||||
// Assert that nothing is tracked in the mapper.
|
||||
require.True(suite.T(), suite.mapper.IsEmpty())
|
||||
}
|
||||
|
||||
// This test ensures if the controller was previously tracking a ProxyStateTemplate, and now that proxy has
|
||||
// disconnected from this server, it's ignored and removed from the mapper.
|
||||
func (suite *xdsControllerTestSuite) TestReconcile_RemoveTrackingProxiesNotConnectedToServer() {
|
||||
// Store the initial ProxyStateTemplate and track it in the mapper.
|
||||
proxyStateTemplate := resourcetest.Resource(types.ProxyStateTemplateType, "test").
|
||||
WithData(suite.T(), &pbmesh.ProxyStateTemplate{}).
|
||||
Write(suite.T(), suite.client)
|
||||
|
||||
suite.mapper.TrackItem(proxyStateTemplate.Id, []resource.ReferenceOrID{})
|
||||
|
||||
// Simulate the proxy disconnecting from this server. The resource still exists, but this proxy might be connected
|
||||
// to a different server now, so we no longer need to track it.
|
||||
suite.updater.notConnected = true
|
||||
|
||||
// Run the reconcile.
|
||||
err := suite.ctl.Reconcile(context.Background(), suite.runtime, controller.Request{
|
||||
ID: proxyStateTemplate.Id,
|
||||
})
|
||||
require.NoError(suite.T(), err)
|
||||
|
||||
// Assert that nothing is tracked in the mapper.
|
||||
require.True(suite.T(), suite.mapper.IsEmpty())
|
||||
}
|
||||
|
||||
// This test sets up the updater to return an error when calling PushChange, and ensures the status is set
|
||||
// correctly.
|
||||
func (suite *xdsControllerTestSuite) TestReconcile_PushChangeError() {
|
||||
// Have the mock simulate an error from the PushChange call.
|
||||
suite.updater.pushChangeError = true
|
||||
|
||||
// Setup a happy path scenario.
|
||||
suite.setupFooProxyStateTemplateAndEndpoints()
|
||||
|
||||
// Run the reconcile.
|
||||
err := suite.ctl.Reconcile(context.Background(), suite.runtime, controller.Request{
|
||||
ID: suite.fooProxyStateTemplate.Id,
|
||||
})
|
||||
require.Error(suite.T(), err)
|
||||
|
||||
// Assert on the status reflecting endpoint not found.
|
||||
suite.client.RequireStatusCondition(suite.T(), suite.fooProxyStateTemplate.Id, ControllerName, status.ConditionRejectedPushChangeFailed(status.KeyFromID(suite.fooProxyStateTemplate.Id)))
|
||||
}
|
||||
|
||||
// This test sets up a ProxyStateTemplate that references a ServiceEndpoints that doesn't exist, and ensures the
|
||||
// status is correct.
|
||||
func (suite *xdsControllerTestSuite) TestReconcile_MissingEndpoint() {
|
||||
// Set fooProxyStateTemplate with a reference to fooEndpoints, without storing fooEndpoints so the controller should
|
||||
// notice it's missing.
|
||||
fooEndpointsId := resourcetest.Resource(catalog.ServiceEndpointsType, "foo-service").ID()
|
||||
fooRequiredEndpoints := make(map[string]*pbproxystate.EndpointRef)
|
||||
fooRequiredEndpoints["test-cluster-1"] = &pbproxystate.EndpointRef{
|
||||
Id: fooEndpointsId,
|
||||
Port: "mesh",
|
||||
}
|
||||
|
||||
fooProxyStateTemplate := resourcetest.Resource(types.ProxyStateTemplateType, "foo-pst").
|
||||
WithData(suite.T(), &pbmesh.ProxyStateTemplate{
|
||||
RequiredEndpoints: fooRequiredEndpoints,
|
||||
ProxyState: &pbmesh.ProxyState{},
|
||||
}).
|
||||
Write(suite.T(), suite.client)
|
||||
|
||||
retry.Run(suite.T(), func(r *retry.R) {
|
||||
suite.client.RequireResourceExists(r, fooProxyStateTemplate.Id)
|
||||
})
|
||||
|
||||
// Run the reconcile.
|
||||
err := suite.ctl.Reconcile(context.Background(), suite.runtime, controller.Request{
|
||||
ID: fooProxyStateTemplate.Id,
|
||||
})
|
||||
require.Error(suite.T(), err)
|
||||
|
||||
// Assert on the status reflecting endpoint not found.
|
||||
suite.client.RequireStatusCondition(suite.T(), fooProxyStateTemplate.Id, ControllerName, status.ConditionRejectedErrorReadingEndpoints(status.KeyFromID(fooEndpointsId), "rpc error: code = NotFound desc = resource not found"))
|
||||
}
|
||||
|
||||
// This test sets up a ProxyStateTemplate that references a ServiceEndpoints that can't be read correctly, and
|
||||
// checks the status is correct.
|
||||
func (suite *xdsControllerTestSuite) TestReconcile_ReadEndpointError() {
|
||||
badID := &pbresource.ID{
|
||||
Type: &pbresource.Type{
|
||||
Group: "not",
|
||||
Kind: "found",
|
||||
GroupVersion: "vfake",
|
||||
},
|
||||
Tenancy: &pbresource.Tenancy{Namespace: "default", Partition: "default", PeerName: "local"},
|
||||
}
|
||||
fooRequiredEndpoints := make(map[string]*pbproxystate.EndpointRef)
|
||||
fooRequiredEndpoints["test-cluster-1"] = &pbproxystate.EndpointRef{
|
||||
Id: badID,
|
||||
Port: "mesh",
|
||||
}
|
||||
|
||||
fooProxyStateTemplate := resourcetest.Resource(types.ProxyStateTemplateType, "foo-pst").
|
||||
WithData(suite.T(), &pbmesh.ProxyStateTemplate{
|
||||
RequiredEndpoints: fooRequiredEndpoints,
|
||||
ProxyState: &pbmesh.ProxyState{},
|
||||
}).
|
||||
Write(suite.T(), suite.client)
|
||||
|
||||
retry.Run(suite.T(), func(r *retry.R) {
|
||||
suite.client.RequireResourceExists(r, fooProxyStateTemplate.Id)
|
||||
})
|
||||
|
||||
// Run the reconcile.
|
||||
err := suite.ctl.Reconcile(context.Background(), suite.runtime, controller.Request{
|
||||
ID: fooProxyStateTemplate.Id,
|
||||
})
|
||||
require.Error(suite.T(), err)
|
||||
|
||||
// Assert on the status reflecting endpoint couldn't be read.
|
||||
suite.client.RequireStatusCondition(suite.T(), fooProxyStateTemplate.Id, ControllerName, status.ConditionRejectedErrorReadingEndpoints(status.KeyFromID(badID), "rpc error: code = InvalidArgument desc = id.name is required"))
|
||||
}
|
||||
|
||||
// This test is a happy path creation test to make sure pbproxystate.Endpoints are created in the computed
|
||||
// pbmesh.ProxyState from the RequiredEndpoints references. More specific translations between endpoint references
|
||||
// and pbproxystate.Endpoints are unit tested in endpoint_builder.go.
|
||||
func (suite *xdsControllerTestSuite) TestReconcile_ProxyStateTemplateComputesEndpoints() {
|
||||
// Set up fooEndpoints and fooProxyStateTemplate with a reference to fooEndpoints and store them in the state store.
|
||||
// This setup saves expected values in the suite so it can be asserted against later.
|
||||
suite.setupFooProxyStateTemplateAndEndpoints()
|
||||
|
||||
// Run the reconcile.
|
||||
err := suite.ctl.Reconcile(context.Background(), suite.runtime, controller.Request{
|
||||
ID: suite.fooProxyStateTemplate.Id,
|
||||
})
|
||||
require.NoError(suite.T(), err)
|
||||
|
||||
// Assert on the status.
|
||||
suite.client.RequireStatusCondition(suite.T(), suite.fooProxyStateTemplate.Id, ControllerName, status.ConditionAccepted())
|
||||
|
||||
// Assert that the endpoints computed in the controller matches the expected endpoints.
|
||||
actualEndpoints := suite.updater.GetEndpoints(suite.fooProxyStateTemplate.Id.Name)
|
||||
prototest.AssertDeepEqual(suite.T(), suite.expectedFooProxyStateEndpoints, actualEndpoints)
|
||||
}
|
||||
|
||||
// This test is a happy path creation test that calls reconcile multiple times with a more complex setup. This
|
||||
// scenario is trickier to test in the controller test because the end computation of the xds controller is not
|
||||
// stored in the state store. So this test ensures that between multiple reconciles the correct ProxyStates are
|
||||
// computed for each ProxyStateTemplate.
|
||||
func (suite *xdsControllerTestSuite) TestReconcile_MultipleProxyStateTemplatesComputesMultipleEndpoints() {
|
||||
// Set up fooProxyStateTemplate and barProxyStateTemplate and their associated resources and store them. Resources
|
||||
// and expected results are stored in the suite to assert against.
|
||||
suite.setupFooBarProxyStateTemplateAndEndpoints()
|
||||
|
||||
// Reconcile the fooProxyStateTemplate.
|
||||
err := suite.ctl.Reconcile(context.Background(), suite.runtime, controller.Request{
|
||||
ID: suite.fooProxyStateTemplate.Id,
|
||||
})
|
||||
require.NoError(suite.T(), err)
|
||||
|
||||
// Assert on the status.
|
||||
suite.client.RequireStatusCondition(suite.T(), suite.fooProxyStateTemplate.Id, ControllerName, status.ConditionAccepted())
|
||||
|
||||
// Assert that the endpoints computed in the controller matches the expected endpoints.
|
||||
actualEndpoints := suite.updater.GetEndpoints(suite.fooProxyStateTemplate.Id.Name)
|
||||
prototest.AssertDeepEqual(suite.T(), suite.expectedFooProxyStateEndpoints, actualEndpoints)
|
||||
|
||||
// Reconcile the barProxyStateTemplate.
|
||||
err = suite.ctl.Reconcile(context.Background(), suite.runtime, controller.Request{
|
||||
ID: suite.barProxyStateTemplate.Id,
|
||||
})
|
||||
require.NoError(suite.T(), err)
|
||||
|
||||
// Assert on the status.
|
||||
suite.client.RequireStatusCondition(suite.T(), suite.barProxyStateTemplate.Id, ControllerName, status.ConditionAccepted())
|
||||
|
||||
// Assert that the endpoints computed in the controller matches the expected endpoints.
|
||||
actualBarEndpoints := suite.updater.GetEndpoints(suite.barProxyStateTemplate.Id.Name)
|
||||
prototest.AssertDeepEqual(suite.T(), suite.expectedBarProxyStateEndpoints, actualBarEndpoints)
|
||||
}
|
||||
|
||||
// Sets up a full controller, and tests that reconciles are getting triggered for the events it should.
|
||||
func (suite *xdsControllerTestSuite) TestController_ComputeAddUpdateEndpoints() {
|
||||
// Run the controller manager.
|
||||
mgr := controller.NewManager(suite.client, suite.runtime.Logger)
|
||||
mgr.Register(Controller(suite.mapper, suite.updater))
|
||||
mgr.SetRaftLeader(true)
|
||||
go mgr.Run(suite.ctx)
|
||||
|
||||
// Set up fooEndpoints and fooProxyStateTemplate with a reference to fooEndpoints. These need to be stored
|
||||
// because the controller reconcile looks them up.
|
||||
suite.setupFooProxyStateTemplateAndEndpoints()
|
||||
|
||||
// Assert that the expected ProxyState matches the actual ProxyState that PushChange was called with. This needs to
|
||||
// be in a retry block unlike the Reconcile tests because the controller triggers asynchronously.
|
||||
retry.Run(suite.T(), func(r *retry.R) {
|
||||
actualEndpoints := suite.updater.GetEndpoints(suite.fooProxyStateTemplate.Id.Name)
|
||||
// Assert on the status.
|
||||
suite.client.RequireStatusCondition(r, suite.fooProxyStateTemplate.Id, ControllerName, status.ConditionAccepted())
|
||||
// Assert that the endpoints computed in the controller matches the expected endpoints.
|
||||
prototest.AssertDeepEqual(r, suite.expectedFooProxyStateEndpoints, actualEndpoints)
|
||||
})
|
||||
|
||||
// Now, update the endpoint to be unhealthy. This will ensure the controller is getting triggered on changes to this
|
||||
// endpoint that it should be tracking, even when the ProxyStateTemplate does not change.
|
||||
resourcetest.Resource(catalog.ServiceEndpointsType, "foo-service").
|
||||
WithData(suite.T(), &pbcatalog.ServiceEndpoints{Endpoints: []*pbcatalog.Endpoint{
|
||||
{
|
||||
Ports: map[string]*pbcatalog.WorkloadPort{
|
||||
"mesh": {
|
||||
Port: 20000,
|
||||
Protocol: pbcatalog.Protocol_PROTOCOL_MESH,
|
||||
},
|
||||
},
|
||||
Addresses: []*pbcatalog.WorkloadAddress{
|
||||
{
|
||||
Host: "10.1.1.1",
|
||||
Ports: []string{"mesh"},
|
||||
},
|
||||
{
|
||||
Host: "10.2.2.2",
|
||||
Ports: []string{"mesh"},
|
||||
},
|
||||
},
|
||||
HealthStatus: pbcatalog.Health_HEALTH_CRITICAL,
|
||||
},
|
||||
}}).
|
||||
WithOwner(suite.fooService.Id).
|
||||
Write(suite.T(), suite.client)
|
||||
|
||||
// Wait for the endpoint to be written.
|
||||
retry.Run(suite.T(), func(r *retry.R) {
|
||||
suite.client.RequireVersionChanged(suite.T(), suite.fooEndpoints.Id, suite.fooEndpoints.Version)
|
||||
})
|
||||
|
||||
// Update the expected endpoints to also have unhealthy status.
|
||||
suite.expectedFooProxyStateEndpoints["test-cluster-1"].Endpoints[0].HealthStatus = pbproxystate.HealthStatus_HEALTH_STATUS_UNHEALTHY
|
||||
suite.expectedFooProxyStateEndpoints["test-cluster-1"].Endpoints[1].HealthStatus = pbproxystate.HealthStatus_HEALTH_STATUS_UNHEALTHY
|
||||
|
||||
retry.Run(suite.T(), func(r *retry.R) {
|
||||
actualEndpoints := suite.updater.GetEndpoints(suite.fooProxyStateTemplate.Id.Name)
|
||||
// Assert on the status.
|
||||
suite.client.RequireStatusCondition(suite.T(), suite.fooProxyStateTemplate.Id, ControllerName, status.ConditionAccepted())
|
||||
// Assert that the endpoints computed in the controller matches the expected endpoints.
|
||||
prototest.AssertDeepEqual(r, suite.expectedFooProxyStateEndpoints, actualEndpoints)
|
||||
})
|
||||
|
||||
// Now add a new endpoint reference and endpoint to the fooProxyStateTemplate. This will ensure that the controller
|
||||
// now tracks the newly added endpoint.
|
||||
secondService := resourcetest.Resource(catalog.ServiceType, "second-service").
|
||||
WithData(suite.T(), &pbcatalog.Service{}).
|
||||
Write(suite.T(), suite.client)
|
||||
|
||||
secondEndpoints := resourcetest.Resource(catalog.ServiceEndpointsType, "second-service").
|
||||
WithData(suite.T(), &pbcatalog.ServiceEndpoints{Endpoints: []*pbcatalog.Endpoint{
|
||||
{
|
||||
Ports: map[string]*pbcatalog.WorkloadPort{
|
||||
"mesh": {
|
||||
Port: 20000,
|
||||
Protocol: pbcatalog.Protocol_PROTOCOL_MESH,
|
||||
},
|
||||
},
|
||||
Addresses: []*pbcatalog.WorkloadAddress{
|
||||
{
|
||||
Host: "10.5.5.5",
|
||||
Ports: []string{"mesh"},
|
||||
},
|
||||
{
|
||||
Host: "10.6.6.6",
|
||||
Ports: []string{"mesh"},
|
||||
},
|
||||
},
|
||||
},
|
||||
}}).
|
||||
WithOwner(secondService.Id).
|
||||
Write(suite.T(), suite.client)
|
||||
|
||||
// Update the endpoint references on the fooProxyStateTemplate.
|
||||
suite.fooEndpointRefs["test-cluster-2"] = &pbproxystate.EndpointRef{
|
||||
Id: secondEndpoints.Id,
|
||||
Port: "mesh",
|
||||
}
|
||||
oldVersion := suite.fooProxyStateTemplate.Version
|
||||
fooProxyStateTemplate := resourcetest.Resource(types.ProxyStateTemplateType, "foo-pst").
|
||||
WithData(suite.T(), &pbmesh.ProxyStateTemplate{
|
||||
RequiredEndpoints: suite.fooEndpointRefs,
|
||||
ProxyState: &pbmesh.ProxyState{},
|
||||
}).
|
||||
Write(suite.T(), suite.client)
|
||||
|
||||
retry.Run(suite.T(), func(r *retry.R) {
|
||||
suite.client.RequireVersionChanged(r, fooProxyStateTemplate.Id, oldVersion)
|
||||
})
|
||||
|
||||
// Update the expected endpoints with this new endpoints.
|
||||
suite.expectedFooProxyStateEndpoints["test-cluster-2"] = &pbproxystate.Endpoints{
|
||||
Endpoints: []*pbproxystate.Endpoint{
|
||||
{
|
||||
Address: &pbproxystate.Endpoint_HostPort{
|
||||
HostPort: &pbproxystate.HostPortAddress{
|
||||
Host: "10.5.5.5",
|
||||
Port: 20000,
|
||||
},
|
||||
},
|
||||
HealthStatus: pbproxystate.HealthStatus_HEALTH_STATUS_HEALTHY,
|
||||
},
|
||||
{
|
||||
Address: &pbproxystate.Endpoint_HostPort{
|
||||
HostPort: &pbproxystate.HostPortAddress{
|
||||
Host: "10.6.6.6",
|
||||
Port: 20000,
|
||||
},
|
||||
},
|
||||
HealthStatus: pbproxystate.HealthStatus_HEALTH_STATUS_HEALTHY,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
retry.Run(suite.T(), func(r *retry.R) {
|
||||
actualEndpoints := suite.updater.GetEndpoints(suite.fooProxyStateTemplate.Id.Name)
|
||||
// Assert on the status.
|
||||
suite.client.RequireStatusCondition(suite.T(), suite.fooProxyStateTemplate.Id, ControllerName, status.ConditionAccepted())
|
||||
// Assert that the endpoints computed in the controller matches the expected endpoints.
|
||||
prototest.AssertDeepEqual(r, suite.expectedFooProxyStateEndpoints, actualEndpoints)
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
// Setup: fooProxyStateTemplate with an EndpointsRef to fooEndpoints
|
||||
// Saves all related resources to the suite so they can be modified if needed.
|
||||
func (suite *xdsControllerTestSuite) setupFooProxyStateTemplateAndEndpoints() {
|
||||
fooService := resourcetest.Resource(catalog.ServiceType, "foo-service").
|
||||
WithData(suite.T(), &pbcatalog.Service{}).
|
||||
Write(suite.T(), suite.client)
|
||||
|
||||
fooEndpoints := resourcetest.Resource(catalog.ServiceEndpointsType, "foo-service").
|
||||
WithData(suite.T(), &pbcatalog.ServiceEndpoints{Endpoints: []*pbcatalog.Endpoint{
|
||||
{
|
||||
Ports: map[string]*pbcatalog.WorkloadPort{
|
||||
"mesh": {
|
||||
Port: 20000,
|
||||
Protocol: pbcatalog.Protocol_PROTOCOL_MESH,
|
||||
},
|
||||
},
|
||||
Addresses: []*pbcatalog.WorkloadAddress{
|
||||
{
|
||||
Host: "10.1.1.1",
|
||||
Ports: []string{"mesh"},
|
||||
},
|
||||
{
|
||||
Host: "10.2.2.2",
|
||||
Ports: []string{"mesh"},
|
||||
},
|
||||
},
|
||||
},
|
||||
}}).
|
||||
WithOwner(fooService.Id).
|
||||
Write(suite.T(), suite.client)
|
||||
|
||||
fooRequiredEndpoints := make(map[string]*pbproxystate.EndpointRef)
|
||||
fooRequiredEndpoints["test-cluster-1"] = &pbproxystate.EndpointRef{
|
||||
Id: fooEndpoints.Id,
|
||||
Port: "mesh",
|
||||
}
|
||||
|
||||
fooProxyStateTemplate := resourcetest.Resource(types.ProxyStateTemplateType, "foo-pst").
|
||||
WithData(suite.T(), &pbmesh.ProxyStateTemplate{
|
||||
RequiredEndpoints: fooRequiredEndpoints,
|
||||
ProxyState: &pbmesh.ProxyState{},
|
||||
}).
|
||||
Write(suite.T(), suite.client)
|
||||
|
||||
retry.Run(suite.T(), func(r *retry.R) {
|
||||
suite.client.RequireResourceExists(r, fooProxyStateTemplate.Id)
|
||||
})
|
||||
|
||||
expectedFooProxyStateEndpoints := map[string]*pbproxystate.Endpoints{
|
||||
"test-cluster-1": {Endpoints: []*pbproxystate.Endpoint{
|
||||
{
|
||||
Address: &pbproxystate.Endpoint_HostPort{
|
||||
HostPort: &pbproxystate.HostPortAddress{
|
||||
Host: "10.1.1.1",
|
||||
Port: 20000,
|
||||
},
|
||||
},
|
||||
HealthStatus: pbproxystate.HealthStatus_HEALTH_STATUS_HEALTHY,
|
||||
},
|
||||
{
|
||||
Address: &pbproxystate.Endpoint_HostPort{
|
||||
HostPort: &pbproxystate.HostPortAddress{
|
||||
Host: "10.2.2.2",
|
||||
Port: 20000,
|
||||
},
|
||||
},
|
||||
HealthStatus: pbproxystate.HealthStatus_HEALTH_STATUS_HEALTHY,
|
||||
},
|
||||
}},
|
||||
}
|
||||
suite.fooService = fooService
|
||||
suite.fooEndpoints = fooEndpoints
|
||||
suite.fooEndpointRefs = fooRequiredEndpoints
|
||||
suite.fooProxyStateTemplate = fooProxyStateTemplate
|
||||
suite.expectedFooProxyStateEndpoints = expectedFooProxyStateEndpoints
|
||||
|
||||
}
|
||||
|
||||
// Setup:
|
||||
// - fooProxyStateTemplate with an EndpointsRef to fooEndpoints and fooBarEndpoints.
|
||||
// - barProxyStateTemplate with an EndpointsRef to fooBarEndpoints.
|
||||
//
|
||||
// Saves all related resources to the suite so they can be modified if needed.
|
||||
func (suite *xdsControllerTestSuite) setupFooBarProxyStateTemplateAndEndpoints() {
|
||||
fooService := resourcetest.Resource(catalog.ServiceType, "foo-service").
|
||||
WithData(suite.T(), &pbcatalog.Service{}).
|
||||
Write(suite.T(), suite.client)
|
||||
|
||||
fooEndpoints := resourcetest.Resource(catalog.ServiceEndpointsType, "foo-service").
|
||||
WithData(suite.T(), &pbcatalog.ServiceEndpoints{Endpoints: []*pbcatalog.Endpoint{
|
||||
{
|
||||
Ports: map[string]*pbcatalog.WorkloadPort{
|
||||
"mesh": {
|
||||
Port: 20000,
|
||||
Protocol: pbcatalog.Protocol_PROTOCOL_MESH,
|
||||
},
|
||||
},
|
||||
Addresses: []*pbcatalog.WorkloadAddress{
|
||||
{
|
||||
Host: "10.1.1.1",
|
||||
Ports: []string{"mesh"},
|
||||
},
|
||||
{
|
||||
Host: "10.2.2.2",
|
||||
Ports: []string{"mesh"},
|
||||
},
|
||||
},
|
||||
},
|
||||
}}).
|
||||
WithOwner(fooService.Id).
|
||||
Write(suite.T(), suite.client)
|
||||
|
||||
fooBarService := resourcetest.Resource(catalog.ServiceType, "foo-bar-service").
|
||||
WithData(suite.T(), &pbcatalog.Service{}).
|
||||
Write(suite.T(), suite.client)
|
||||
|
||||
fooBarEndpoints := resourcetest.Resource(catalog.ServiceEndpointsType, "foo-bar-service").
|
||||
WithData(suite.T(), &pbcatalog.ServiceEndpoints{Endpoints: []*pbcatalog.Endpoint{
|
||||
{
|
||||
Ports: map[string]*pbcatalog.WorkloadPort{
|
||||
"mesh": {
|
||||
Port: 20000,
|
||||
Protocol: pbcatalog.Protocol_PROTOCOL_MESH,
|
||||
},
|
||||
},
|
||||
Addresses: []*pbcatalog.WorkloadAddress{
|
||||
{
|
||||
Host: "10.3.3.3",
|
||||
Ports: []string{"mesh"},
|
||||
},
|
||||
{
|
||||
Host: "10.4.4.4",
|
||||
Ports: []string{"mesh"},
|
||||
},
|
||||
},
|
||||
},
|
||||
}}).
|
||||
WithOwner(fooBarService.Id).
|
||||
Write(suite.T(), suite.client)
|
||||
|
||||
fooRequiredEndpoints := make(map[string]*pbproxystate.EndpointRef)
|
||||
fooRequiredEndpoints["test-cluster-1"] = &pbproxystate.EndpointRef{
|
||||
Id: fooEndpoints.Id,
|
||||
Port: "mesh",
|
||||
}
|
||||
fooRequiredEndpoints["test-cluster-2"] = &pbproxystate.EndpointRef{
|
||||
Id: fooBarEndpoints.Id,
|
||||
Port: "mesh",
|
||||
}
|
||||
|
||||
barRequiredEndpoints := make(map[string]*pbproxystate.EndpointRef)
|
||||
barRequiredEndpoints["test-cluster-1"] = &pbproxystate.EndpointRef{
|
||||
Id: fooBarEndpoints.Id,
|
||||
// Sidecar proxy controller will usually set mesh port here.
|
||||
Port: "mesh",
|
||||
}
|
||||
|
||||
fooProxyStateTemplate := resourcetest.Resource(types.ProxyStateTemplateType, "foo-pst").
|
||||
WithData(suite.T(), &pbmesh.ProxyStateTemplate{
|
||||
// Contains the foo and foobar endpoints.
|
||||
RequiredEndpoints: fooRequiredEndpoints,
|
||||
ProxyState: &pbmesh.ProxyState{},
|
||||
}).
|
||||
Write(suite.T(), suite.client)
|
||||
|
||||
retry.Run(suite.T(), func(r *retry.R) {
|
||||
suite.client.RequireResourceExists(r, fooProxyStateTemplate.Id)
|
||||
})
|
||||
|
||||
barProxyStateTemplate := resourcetest.Resource(types.ProxyStateTemplateType, "bar-pst").
|
||||
WithData(suite.T(), &pbmesh.ProxyStateTemplate{
|
||||
// Contains the foobar endpoint.
|
||||
RequiredEndpoints: barRequiredEndpoints,
|
||||
ProxyState: &pbmesh.ProxyState{},
|
||||
}).
|
||||
Write(suite.T(), suite.client)
|
||||
|
||||
retry.Run(suite.T(), func(r *retry.R) {
|
||||
suite.client.RequireResourceExists(r, barProxyStateTemplate.Id)
|
||||
})
|
||||
|
||||
expectedFooProxyStateEndpoints := map[string]*pbproxystate.Endpoints{
|
||||
"test-cluster-1": {Endpoints: []*pbproxystate.Endpoint{
|
||||
{
|
||||
Address: &pbproxystate.Endpoint_HostPort{
|
||||
HostPort: &pbproxystate.HostPortAddress{
|
||||
Host: "10.1.1.1",
|
||||
Port: 20000,
|
||||
},
|
||||
},
|
||||
HealthStatus: pbproxystate.HealthStatus_HEALTH_STATUS_HEALTHY,
|
||||
},
|
||||
{
|
||||
Address: &pbproxystate.Endpoint_HostPort{
|
||||
HostPort: &pbproxystate.HostPortAddress{
|
||||
Host: "10.2.2.2",
|
||||
Port: 20000,
|
||||
},
|
||||
},
|
||||
HealthStatus: pbproxystate.HealthStatus_HEALTH_STATUS_HEALTHY,
|
||||
},
|
||||
}},
|
||||
"test-cluster-2": {Endpoints: []*pbproxystate.Endpoint{
|
||||
{
|
||||
Address: &pbproxystate.Endpoint_HostPort{
|
||||
HostPort: &pbproxystate.HostPortAddress{
|
||||
Host: "10.3.3.3",
|
||||
Port: 20000,
|
||||
},
|
||||
},
|
||||
HealthStatus: pbproxystate.HealthStatus_HEALTH_STATUS_HEALTHY,
|
||||
},
|
||||
{
|
||||
Address: &pbproxystate.Endpoint_HostPort{
|
||||
HostPort: &pbproxystate.HostPortAddress{
|
||||
Host: "10.4.4.4",
|
||||
Port: 20000,
|
||||
},
|
||||
},
|
||||
HealthStatus: pbproxystate.HealthStatus_HEALTH_STATUS_HEALTHY,
|
||||
},
|
||||
}},
|
||||
}
|
||||
|
||||
expectedBarProxyStateEndpoints := map[string]*pbproxystate.Endpoints{
|
||||
"test-cluster-1": {Endpoints: []*pbproxystate.Endpoint{
|
||||
{
|
||||
Address: &pbproxystate.Endpoint_HostPort{
|
||||
HostPort: &pbproxystate.HostPortAddress{
|
||||
Host: "10.3.3.3",
|
||||
Port: 20000,
|
||||
},
|
||||
},
|
||||
HealthStatus: pbproxystate.HealthStatus_HEALTH_STATUS_HEALTHY,
|
||||
},
|
||||
{
|
||||
Address: &pbproxystate.Endpoint_HostPort{
|
||||
HostPort: &pbproxystate.HostPortAddress{
|
||||
Host: "10.4.4.4",
|
||||
Port: 20000,
|
||||
},
|
||||
},
|
||||
HealthStatus: pbproxystate.HealthStatus_HEALTH_STATUS_HEALTHY,
|
||||
},
|
||||
}},
|
||||
}
|
||||
|
||||
suite.fooProxyStateTemplate = fooProxyStateTemplate
|
||||
suite.barProxyStateTemplate = barProxyStateTemplate
|
||||
suite.barEndpointRefs = barRequiredEndpoints
|
||||
suite.fooEndpointRefs = fooRequiredEndpoints
|
||||
suite.fooEndpoints = fooEndpoints
|
||||
suite.fooService = fooService
|
||||
suite.fooBarEndpoints = fooBarEndpoints
|
||||
suite.fooBarService = fooBarService
|
||||
suite.expectedFooProxyStateEndpoints = expectedFooProxyStateEndpoints
|
||||
suite.expectedBarProxyStateEndpoints = expectedBarProxyStateEndpoints
|
||||
}
|
||||
|
||||
func TestXdsController(t *testing.T) {
|
||||
suite.Run(t, new(xdsControllerTestSuite))
|
||||
}
|
72
internal/mesh/internal/controllers/xds/endpoint_builder.go
Normal file
72
internal/mesh/internal/controllers/xds/endpoint_builder.go
Normal file
@ -0,0 +1,72 @@
|
||||
package xds
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
|
||||
"golang.org/x/exp/slices"
|
||||
|
||||
pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v1alpha1"
|
||||
"github.com/hashicorp/consul/proto-public/pbmesh/v1alpha1/pbproxystate"
|
||||
)
|
||||
|
||||
func generateProxyStateEndpoints(serviceEndpoints *ServiceEndpointsData, portName string) (*pbproxystate.Endpoints, error) {
|
||||
var psEndpoints []*pbproxystate.Endpoint
|
||||
|
||||
if serviceEndpoints.Endpoints == nil || serviceEndpoints.Resource == nil {
|
||||
return nil, fmt.Errorf("service endpoints requires both endpoints and resource")
|
||||
}
|
||||
eps := serviceEndpoints.Endpoints.GetEndpoints()
|
||||
|
||||
for _, ep := range eps {
|
||||
for _, addr := range ep.Addresses {
|
||||
// Check if the address is using the portName name this proxy state endpoints is for. If it does, create the
|
||||
// endpoint.
|
||||
if slices.Contains(addr.Ports, portName) {
|
||||
// Lookup the portName number from the portName name.
|
||||
wlPort, ok := ep.Ports[portName]
|
||||
if !ok {
|
||||
// This should never happen, as it should be validated by the ServiceEndpoints controller.
|
||||
return nil, fmt.Errorf("could not find portName %q in endpoint %s", portName, serviceEndpoints.Resource.Id)
|
||||
}
|
||||
portNum := wlPort.Port
|
||||
|
||||
psEndpoint, err := createProxyStateEndpoint(addr.Host, portNum, ep.HealthStatus)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
psEndpoints = append(psEndpoints, psEndpoint)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return &pbproxystate.Endpoints{Endpoints: psEndpoints}, nil
|
||||
}
|
||||
|
||||
func createProxyStateEndpoint(host string, port uint32, health pbcatalog.Health) (*pbproxystate.Endpoint, error) {
|
||||
addr := net.ParseIP(host)
|
||||
if addr == nil {
|
||||
return nil, fmt.Errorf("host is not an ip")
|
||||
}
|
||||
|
||||
psEndpoint := &pbproxystate.Endpoint{
|
||||
Address: &pbproxystate.Endpoint_HostPort{
|
||||
HostPort: &pbproxystate.HostPortAddress{
|
||||
Host: host,
|
||||
Port: port,
|
||||
},
|
||||
},
|
||||
HealthStatus: endpointHealth(health),
|
||||
// TODO(xds): Weight will be added later. More information is potentially needed in the reference.
|
||||
}
|
||||
return psEndpoint, nil
|
||||
}
|
||||
|
||||
func endpointHealth(catalogHealth pbcatalog.Health) pbproxystate.HealthStatus {
|
||||
health := pbproxystate.HealthStatus_HEALTH_STATUS_HEALTHY
|
||||
|
||||
if catalogHealth == pbcatalog.Health_HEALTH_CRITICAL {
|
||||
health = pbproxystate.HealthStatus_HEALTH_STATUS_UNHEALTHY
|
||||
}
|
||||
return health
|
||||
}
|
233
internal/mesh/internal/controllers/xds/endpoint_builder_test.go
Normal file
233
internal/mesh/internal/controllers/xds/endpoint_builder_test.go
Normal file
@ -0,0 +1,233 @@
|
||||
package xds
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/hashicorp/consul/internal/catalog"
|
||||
"github.com/hashicorp/consul/internal/resource/resourcetest"
|
||||
pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v1alpha1"
|
||||
"github.com/hashicorp/consul/proto-public/pbmesh/v1alpha1/pbproxystate"
|
||||
"github.com/hashicorp/consul/proto/private/prototest"
|
||||
)
|
||||
|
||||
func TestMakeProxyStateEndpointsFromServiceEndpoints(t *testing.T) {
|
||||
type test struct {
|
||||
name string
|
||||
serviceEndpointsData *ServiceEndpointsData
|
||||
portName string
|
||||
expErr string
|
||||
expectedProxyStateEndpoints *pbproxystate.Endpoints
|
||||
}
|
||||
cases := []test{
|
||||
{
|
||||
name: "endpoints with passing health",
|
||||
serviceEndpointsData: serviceEndpointsData("passing"),
|
||||
portName: "mesh",
|
||||
expectedProxyStateEndpoints: &pbproxystate.Endpoints{
|
||||
Endpoints: []*pbproxystate.Endpoint{
|
||||
{
|
||||
Address: &pbproxystate.Endpoint_HostPort{
|
||||
HostPort: &pbproxystate.HostPortAddress{
|
||||
Host: "10.1.1.1",
|
||||
Port: 20000,
|
||||
},
|
||||
},
|
||||
HealthStatus: pbproxystate.HealthStatus_HEALTH_STATUS_HEALTHY,
|
||||
},
|
||||
{
|
||||
Address: &pbproxystate.Endpoint_HostPort{
|
||||
HostPort: &pbproxystate.HostPortAddress{
|
||||
Host: "10.2.2.2",
|
||||
Port: 20000,
|
||||
},
|
||||
},
|
||||
HealthStatus: pbproxystate.HealthStatus_HEALTH_STATUS_HEALTHY,
|
||||
},
|
||||
{
|
||||
Address: &pbproxystate.Endpoint_HostPort{
|
||||
HostPort: &pbproxystate.HostPortAddress{
|
||||
Host: "10.3.3.3",
|
||||
Port: 20000,
|
||||
},
|
||||
},
|
||||
HealthStatus: pbproxystate.HealthStatus_HEALTH_STATUS_HEALTHY,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "endpoints with critical health",
|
||||
serviceEndpointsData: serviceEndpointsData("critical"),
|
||||
portName: "mesh",
|
||||
expectedProxyStateEndpoints: &pbproxystate.Endpoints{
|
||||
Endpoints: []*pbproxystate.Endpoint{
|
||||
{
|
||||
Address: &pbproxystate.Endpoint_HostPort{
|
||||
HostPort: &pbproxystate.HostPortAddress{
|
||||
Host: "10.1.1.1",
|
||||
Port: 20000,
|
||||
},
|
||||
},
|
||||
HealthStatus: pbproxystate.HealthStatus_HEALTH_STATUS_UNHEALTHY,
|
||||
},
|
||||
{
|
||||
Address: &pbproxystate.Endpoint_HostPort{
|
||||
HostPort: &pbproxystate.HostPortAddress{
|
||||
Host: "10.2.2.2",
|
||||
Port: 20000,
|
||||
},
|
||||
},
|
||||
HealthStatus: pbproxystate.HealthStatus_HEALTH_STATUS_UNHEALTHY,
|
||||
},
|
||||
{
|
||||
Address: &pbproxystate.Endpoint_HostPort{
|
||||
HostPort: &pbproxystate.HostPortAddress{
|
||||
Host: "10.3.3.3",
|
||||
Port: 20000,
|
||||
},
|
||||
},
|
||||
HealthStatus: pbproxystate.HealthStatus_HEALTH_STATUS_UNHEALTHY,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "endpoints with any health are considered healthy",
|
||||
serviceEndpointsData: serviceEndpointsData("any"),
|
||||
portName: "mesh",
|
||||
expectedProxyStateEndpoints: &pbproxystate.Endpoints{
|
||||
Endpoints: []*pbproxystate.Endpoint{
|
||||
{
|
||||
Address: &pbproxystate.Endpoint_HostPort{
|
||||
HostPort: &pbproxystate.HostPortAddress{
|
||||
Host: "10.1.1.1",
|
||||
Port: 20000,
|
||||
},
|
||||
},
|
||||
HealthStatus: pbproxystate.HealthStatus_HEALTH_STATUS_HEALTHY,
|
||||
},
|
||||
{
|
||||
Address: &pbproxystate.Endpoint_HostPort{
|
||||
HostPort: &pbproxystate.HostPortAddress{
|
||||
Host: "10.2.2.2",
|
||||
Port: 20000,
|
||||
},
|
||||
},
|
||||
HealthStatus: pbproxystate.HealthStatus_HEALTH_STATUS_HEALTHY,
|
||||
},
|
||||
{
|
||||
Address: &pbproxystate.Endpoint_HostPort{
|
||||
HostPort: &pbproxystate.HostPortAddress{
|
||||
Host: "10.3.3.3",
|
||||
Port: 20000,
|
||||
},
|
||||
},
|
||||
HealthStatus: pbproxystate.HealthStatus_HEALTH_STATUS_HEALTHY,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "endpoints with missing ports returns an error",
|
||||
serviceEndpointsData: serviceEndpointsData("missing port lookup"),
|
||||
portName: "mesh",
|
||||
expErr: "could not find portName",
|
||||
},
|
||||
{
|
||||
name: "nil endpoints returns an error",
|
||||
serviceEndpointsData: serviceEndpointsData("nil endpoints"),
|
||||
portName: "mesh",
|
||||
expErr: "service endpoints requires both endpoints and resource",
|
||||
},
|
||||
{
|
||||
name: "nil resource returns an error",
|
||||
serviceEndpointsData: serviceEndpointsData("nil resource"),
|
||||
portName: "mesh",
|
||||
expErr: "service endpoints requires both endpoints and resource",
|
||||
},
|
||||
{
|
||||
name: "portName doesn't exist in endpoints results in empty endpoints",
|
||||
serviceEndpointsData: serviceEndpointsData("passing"),
|
||||
portName: "does-not-exist",
|
||||
expectedProxyStateEndpoints: &pbproxystate.Endpoints{
|
||||
Endpoints: []*pbproxystate.Endpoint{},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range cases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
actualEndpoints, err := generateProxyStateEndpoints(tc.serviceEndpointsData, tc.portName)
|
||||
if tc.expErr != "" {
|
||||
require.ErrorContains(t, err, tc.expErr)
|
||||
} else {
|
||||
prototest.AssertDeepEqual(t, tc.expectedProxyStateEndpoints, actualEndpoints)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func serviceEndpointsData(variation string) *ServiceEndpointsData {
|
||||
r := resourcetest.Resource(catalog.ServiceEndpointsType, "test").Build()
|
||||
eps := &pbcatalog.ServiceEndpoints{
|
||||
Endpoints: []*pbcatalog.Endpoint{
|
||||
{
|
||||
Ports: map[string]*pbcatalog.WorkloadPort{
|
||||
"mesh": {
|
||||
Port: 20000,
|
||||
Protocol: pbcatalog.Protocol_PROTOCOL_MESH,
|
||||
},
|
||||
},
|
||||
Addresses: []*pbcatalog.WorkloadAddress{
|
||||
{
|
||||
Host: "10.1.1.1",
|
||||
Ports: []string{"mesh"},
|
||||
},
|
||||
{
|
||||
Host: "10.2.2.2",
|
||||
Ports: []string{"mesh"},
|
||||
},
|
||||
},
|
||||
HealthStatus: pbcatalog.Health_HEALTH_PASSING,
|
||||
},
|
||||
{
|
||||
Ports: map[string]*pbcatalog.WorkloadPort{
|
||||
"mesh": {
|
||||
Port: 20000,
|
||||
Protocol: pbcatalog.Protocol_PROTOCOL_MESH,
|
||||
},
|
||||
},
|
||||
Addresses: []*pbcatalog.WorkloadAddress{
|
||||
{
|
||||
Host: "10.3.3.3",
|
||||
Ports: []string{"mesh"},
|
||||
},
|
||||
},
|
||||
HealthStatus: pbcatalog.Health_HEALTH_PASSING,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
switch variation {
|
||||
case "passing":
|
||||
case "critical":
|
||||
eps.Endpoints[0].HealthStatus = pbcatalog.Health_HEALTH_CRITICAL
|
||||
eps.Endpoints[1].HealthStatus = pbcatalog.Health_HEALTH_CRITICAL
|
||||
case "any":
|
||||
eps.Endpoints[0].HealthStatus = pbcatalog.Health_HEALTH_ANY
|
||||
eps.Endpoints[1].HealthStatus = pbcatalog.Health_HEALTH_ANY
|
||||
case "missing port lookup":
|
||||
delete(eps.Endpoints[0].Ports, "mesh")
|
||||
case "nil endpoints":
|
||||
eps = nil
|
||||
case "nil resource":
|
||||
r = nil
|
||||
}
|
||||
|
||||
return &ServiceEndpointsData{
|
||||
Resource: r,
|
||||
Endpoints: eps,
|
||||
}
|
||||
}
|
90
internal/mesh/internal/controllers/xds/mock_updater.go
Normal file
90
internal/mesh/internal/controllers/xds/mock_updater.go
Normal file
@ -0,0 +1,90 @@
|
||||
package xds
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
pbmesh "github.com/hashicorp/consul/proto-public/pbmesh/v1alpha1"
|
||||
"github.com/hashicorp/consul/proto-public/pbmesh/v1alpha1/pbproxystate"
|
||||
|
||||
"github.com/hashicorp/consul/proto-public/pbresource"
|
||||
)
|
||||
|
||||
// mockUpdater mocks the updater functions, and stores ProxyStates from calls to PushChange, so we can assert it was
|
||||
// computed correctly in the controller.
|
||||
type mockUpdater struct {
|
||||
lock sync.Mutex
|
||||
// latestPs is a map from a ProxyStateTemplate's id.Name in string form to the last computed ProxyState for that
|
||||
// ProxyStateTemplate.
|
||||
latestPs map[string]*pbmesh.ProxyState
|
||||
notConnected bool
|
||||
pushChangeError bool
|
||||
}
|
||||
|
||||
func NewMockUpdater() *mockUpdater {
|
||||
return &mockUpdater{
|
||||
latestPs: make(map[string]*pbmesh.ProxyState),
|
||||
}
|
||||
}
|
||||
|
||||
func (m *mockUpdater) SetPushChangeErrorTrue() {
|
||||
m.lock.Lock()
|
||||
defer m.lock.Unlock()
|
||||
m.pushChangeError = true
|
||||
}
|
||||
|
||||
func (m *mockUpdater) SetProxyAsNotConnected() {
|
||||
m.lock.Lock()
|
||||
defer m.lock.Unlock()
|
||||
m.notConnected = true
|
||||
}
|
||||
|
||||
func (m *mockUpdater) PushChange(id *pbresource.ID, snapshot *pbmesh.ProxyState) error {
|
||||
m.lock.Lock()
|
||||
defer m.lock.Unlock()
|
||||
if m.pushChangeError {
|
||||
return fmt.Errorf("mock push change error")
|
||||
} else {
|
||||
m.setUnsafe(id.Name, snapshot)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *mockUpdater) ProxyConnectedToServer(_ *pbresource.ID) bool {
|
||||
m.lock.Lock()
|
||||
defer m.lock.Unlock()
|
||||
if m.notConnected {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func (p *mockUpdater) Get(name string) *pbmesh.ProxyState {
|
||||
p.lock.Lock()
|
||||
defer p.lock.Unlock()
|
||||
ps, ok := p.latestPs[name]
|
||||
if ok {
|
||||
return ps
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *mockUpdater) GetEndpoints(name string) map[string]*pbproxystate.Endpoints {
|
||||
p.lock.Lock()
|
||||
defer p.lock.Unlock()
|
||||
ps, ok := p.latestPs[name]
|
||||
if ok {
|
||||
return ps.Endpoints
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *mockUpdater) Set(name string, ps *pbmesh.ProxyState) {
|
||||
p.lock.Lock()
|
||||
defer p.lock.Unlock()
|
||||
p.setUnsafe(name, ps)
|
||||
}
|
||||
|
||||
func (p *mockUpdater) setUnsafe(name string, ps *pbmesh.ProxyState) {
|
||||
p.latestPs[name] = ps
|
||||
}
|
@ -0,0 +1,58 @@
|
||||
package xds
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
|
||||
"github.com/hashicorp/consul/internal/controller"
|
||||
"github.com/hashicorp/consul/internal/resource"
|
||||
pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v1alpha1"
|
||||
pbmesh "github.com/hashicorp/consul/proto-public/pbmesh/v1alpha1"
|
||||
"github.com/hashicorp/consul/proto-public/pbresource"
|
||||
)
|
||||
|
||||
type ServiceEndpointsData struct {
|
||||
Resource *pbresource.Resource
|
||||
Endpoints *pbcatalog.ServiceEndpoints
|
||||
}
|
||||
|
||||
type ProxyStateTemplateData struct {
|
||||
Resource *pbresource.Resource
|
||||
Template *pbmesh.ProxyStateTemplate
|
||||
}
|
||||
|
||||
// getServiceEndpoints will return a non-nil &ServiceEndpointsData unless there is an error.
|
||||
func getServiceEndpoints(ctx context.Context, rt controller.Runtime, id *pbresource.ID) (*ServiceEndpointsData, error) {
|
||||
rsp, err := rt.Client.Read(ctx, &pbresource.ReadRequest{Id: id})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var se pbcatalog.ServiceEndpoints
|
||||
err = rsp.Resource.Data.UnmarshalTo(&se)
|
||||
if err != nil {
|
||||
return nil, resource.NewErrDataParse(&se, err)
|
||||
}
|
||||
|
||||
return &ServiceEndpointsData{Resource: rsp.Resource, Endpoints: &se}, nil
|
||||
}
|
||||
|
||||
func getProxyStateTemplate(ctx context.Context, rt controller.Runtime, id *pbresource.ID) (*ProxyStateTemplateData, error) {
|
||||
rsp, err := rt.Client.Read(ctx, &pbresource.ReadRequest{Id: id})
|
||||
switch {
|
||||
case status.Code(err) == codes.NotFound:
|
||||
return nil, nil
|
||||
case err != nil:
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var pst pbmesh.ProxyStateTemplate
|
||||
err = rsp.Resource.Data.UnmarshalTo(&pst)
|
||||
if err != nil {
|
||||
return nil, resource.NewErrDataParse(&pst, err)
|
||||
}
|
||||
|
||||
return &ProxyStateTemplateData{Resource: rsp.Resource, Template: &pst}, nil
|
||||
}
|
92
internal/mesh/internal/controllers/xds/status/status.go
Normal file
92
internal/mesh/internal/controllers/xds/status/status.go
Normal file
@ -0,0 +1,92 @@
|
||||
package status
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/hashicorp/consul/internal/controller"
|
||||
"github.com/hashicorp/consul/internal/resource"
|
||||
"github.com/hashicorp/consul/proto-public/pbresource"
|
||||
)
|
||||
|
||||
const (
|
||||
StatusConditionProxyStateAccepted = "ProxyStateAccepted"
|
||||
StatusReasonNilProxyState = "ProxyStateNil"
|
||||
StatusReasonProxyStateReferencesComputed = "ProxyStateReferencesComputed"
|
||||
StatusReasonEndpointNotRead = "ProxyStateEndpointReferenceReadError"
|
||||
StatusReasonCreatingProxyStateEndpointsFailed = "ProxyStateEndpointsNotComputed"
|
||||
StatusReasonPushChangeFailed = "ProxyStatePushChangeFailed"
|
||||
)
|
||||
|
||||
func KeyFromID(id *pbresource.ID) string {
|
||||
return fmt.Sprintf("%s/%s/%s",
|
||||
resource.ToGVK(id.Type),
|
||||
resource.TenancyToString(id.Tenancy),
|
||||
id.Name)
|
||||
}
|
||||
|
||||
func ConditionAccepted() *pbresource.Condition {
|
||||
return &pbresource.Condition{
|
||||
Type: StatusConditionProxyStateAccepted,
|
||||
State: pbresource.Condition_STATE_TRUE,
|
||||
Reason: StatusReasonProxyStateReferencesComputed,
|
||||
Message: fmt.Sprintf("proxy state was computed and pushed."),
|
||||
}
|
||||
}
|
||||
func ConditionRejectedNilProxyState(pstRef string) *pbresource.Condition {
|
||||
return &pbresource.Condition{
|
||||
Type: StatusConditionProxyStateAccepted,
|
||||
State: pbresource.Condition_STATE_FALSE,
|
||||
Reason: StatusReasonNilProxyState,
|
||||
Message: fmt.Sprintf("nil proxy state is not valid %q.", pstRef),
|
||||
}
|
||||
}
|
||||
func ConditionRejectedErrorReadingEndpoints(endpointRef string, err string) *pbresource.Condition {
|
||||
return &pbresource.Condition{
|
||||
Type: StatusConditionProxyStateAccepted,
|
||||
State: pbresource.Condition_STATE_FALSE,
|
||||
Reason: StatusReasonEndpointNotRead,
|
||||
Message: fmt.Sprintf("error reading referenced service endpoints %q: %s", endpointRef, err),
|
||||
}
|
||||
}
|
||||
func ConditionRejectedCreatingProxyStateEndpoints(endpointRef string, err string) *pbresource.Condition {
|
||||
return &pbresource.Condition{
|
||||
Type: StatusConditionProxyStateAccepted,
|
||||
State: pbresource.Condition_STATE_FALSE,
|
||||
Reason: StatusReasonCreatingProxyStateEndpointsFailed,
|
||||
Message: fmt.Sprintf("could not create proxy state endpoints from service endpoints %q: %s", endpointRef, err),
|
||||
}
|
||||
}
|
||||
func ConditionRejectedPushChangeFailed(pstRef string) *pbresource.Condition {
|
||||
return &pbresource.Condition{
|
||||
Type: StatusConditionProxyStateAccepted,
|
||||
State: pbresource.Condition_STATE_FALSE,
|
||||
Reason: StatusReasonPushChangeFailed,
|
||||
Message: fmt.Sprintf("failed to push change for proxy state template %q", pstRef),
|
||||
}
|
||||
}
|
||||
|
||||
// WriteStatusIfChanged updates the ProxyStateTemplate status if it has changed.
|
||||
func WriteStatusIfChanged(ctx context.Context, rt controller.Runtime, res *pbresource.Resource, condition *pbresource.Condition) {
|
||||
newStatus := &pbresource.Status{
|
||||
ObservedGeneration: res.Generation,
|
||||
Conditions: []*pbresource.Condition{
|
||||
condition,
|
||||
},
|
||||
}
|
||||
// If the status is unchanged then we should return and avoid the unnecessary write
|
||||
const controllerName = "consul.io/xds-controller"
|
||||
if resource.EqualStatus(res.Status[controllerName], newStatus, false) {
|
||||
return
|
||||
} else {
|
||||
_, err := rt.Client.WriteStatus(ctx, &pbresource.WriteStatusRequest{
|
||||
Id: res.Id,
|
||||
Key: controllerName,
|
||||
Status: newStatus,
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
rt.Logger.Error("error updating the proxy state template status", "error", err, "proxyStateTeamplate", res.Id)
|
||||
}
|
||||
}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user