mesh: add computed destinations with a controller that computes them (#19067)

This commit adds a new type ComputedDestinations that will contain all destinations from any Destinations resources and will be name-aligned with a workload. This also adds an explicit-destinations controller that computes these resources.

This is needed to simplify the tracking we need to do currently in the sidecar-proxy controller and makes it easier to query all explicit destinations that apply to a workload.
This commit is contained in:
Iryna Shustava 2023-10-12 12:04:12 -06:00 committed by GitHub
parent 197bcd4164
commit ad06c96456
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 1889 additions and 145 deletions

View File

@ -0,0 +1,306 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package explicitdestinations
import (
"context"
"fmt"
"sort"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
"github.com/hashicorp/consul/internal/controller"
"github.com/hashicorp/consul/internal/mesh/internal/controllers/explicitdestinations/mapper"
"github.com/hashicorp/consul/internal/mesh/internal/types"
"github.com/hashicorp/consul/internal/resource"
pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v2beta1"
pbmesh "github.com/hashicorp/consul/proto-public/pbmesh/v2beta1"
"github.com/hashicorp/consul/proto-public/pbresource"
)
const ControllerName = "consul.io/explicit-mapper-controller"
func Controller(mapper *mapper.Mapper) controller.Controller {
if mapper == nil {
panic("mapper is required")
}
return controller.ForType(pbmesh.ComputedExplicitDestinationsType).
WithWatch(pbmesh.DestinationsType, mapper.MapDestinations).
WithWatch(pbcatalog.WorkloadType, controller.ReplaceType(pbmesh.ComputedExplicitDestinationsType)).
WithWatch(pbcatalog.ServiceType, mapper.MapService).
WithWatch(pbmesh.ComputedRoutesType, mapper.MapComputedRoute).
WithReconciler(&reconciler{mapper: mapper})
}
type reconciler struct {
mapper *mapper.Mapper
}
func (r *reconciler) Reconcile(ctx context.Context, rt controller.Runtime, req controller.Request) error {
rt.Logger = rt.Logger.With("controller", ControllerName, "id", req.ID)
// Look up the associated workload.
workloadID := resource.ReplaceType(pbcatalog.WorkloadType, req.ID)
workload, err := resource.GetDecodedResource[*pbcatalog.Workload](ctx, rt.Client, workloadID)
if err != nil {
rt.Logger.Error("error fetching workload", "error", err)
return err
}
// If workload is not found, the decoded resource will be nil.
if workload == nil || workload.GetResource() == nil || workload.GetData() == nil {
// When workload is not there, we don't need to manually delete the resource
// because it is owned by the workload. In this case, we skip reconcile
// because there's nothing for us to do.
rt.Logger.Trace("the corresponding workload does not exist", "id", workloadID)
r.mapper.UntrackComputedExplicitDestinations(req.ID)
return nil
}
// Get existing ComputedExplicitDestinations resource (if any).
ced, err := resource.GetDecodedResource[*pbmesh.ComputedExplicitDestinations](ctx, rt.Client, req.ID)
if err != nil {
rt.Logger.Error("error fetching ComputedExplicitDestinations", "error", err)
return err
}
// If workload is not on the mesh, we need to delete the resource and return
// as for non-mesh workloads there should be no mapper.
if !workload.GetData().IsMeshEnabled() {
rt.Logger.Trace("workload is not on the mesh, skipping reconcile and deleting any corresponding ComputedDestinations", "id", workloadID)
r.mapper.UntrackComputedExplicitDestinations(req.ID)
// Delete CED only if it exists.
if ced != nil {
_, err = rt.Client.Delete(ctx, &pbresource.DeleteRequest{Id: req.ID})
if err != nil {
// If there's an error deleting CD, we want to re-trigger reconcile again.
rt.Logger.Error("error deleting ComputedDestinations", "error", err)
return err
}
}
// Otherwise, return as there's nothing else for us to do.
return nil
}
// Now get any mapper that we have in the cache that have selectors matching the name
// of this CD (name-aligned with workload).
destinationIDs := r.mapper.DestinationsForWorkload(req.ID)
rt.Logger.Trace("cached destinations IDs", "ids", destinationIDs)
decodedDestinations, err := r.fetchDestinations(ctx, rt.Client, destinationIDs)
if err != nil {
rt.Logger.Error("error fetching mapper", "error", err)
return err
}
if len(decodedDestinations) > 0 {
r.mapper.TrackDestinations(req.ID, decodedDestinations)
} else {
r.mapper.UntrackComputedExplicitDestinations(req.ID)
}
conflicts := findConflicts(decodedDestinations)
newComputedDestinationsData := &pbmesh.ComputedExplicitDestinations{}
for _, dst := range decodedDestinations {
updatedStatus := &pbresource.Status{
ObservedGeneration: dst.GetResource().GetGeneration(),
}
// First check if this resource has a conflict. If it does, update status and don't include it in the computed resource.
if _, ok := conflicts[resource.NewReferenceKey(dst.GetResource().GetId())]; ok {
rt.Logger.Trace("skipping this Destinations resource because it has conflicts with others", "id", dst.GetResource().GetId())
updatedStatus.Conditions = append(updatedStatus.Conditions, ConditionConflictFound(workload.GetResource().GetId()))
} else {
valid, cond := validate(ctx, rt.Client, dst)
// Only add it to computed mapper if its mapper are valid.
if valid {
newComputedDestinationsData.Destinations = append(newComputedDestinationsData.Destinations, dst.GetData().GetDestinations()...)
} else {
rt.Logger.Trace("Destinations is not valid", "condition", cond)
}
updatedStatus.Conditions = append(updatedStatus.Conditions, ConditionConflictNotFound, cond)
}
// Write status for this destination.
currentStatus := dst.GetResource().GetStatus()[ControllerName]
// If the status is unchanged then we should return and avoid the unnecessary write
if !resource.EqualStatus(currentStatus, updatedStatus, false) {
rt.Logger.Trace("updating status", "id", dst.GetResource().GetId())
_, err = rt.Client.WriteStatus(ctx, &pbresource.WriteStatusRequest{
Id: dst.GetResource().GetId(),
Key: ControllerName,
Status: updatedStatus,
})
if err != nil {
rt.Logger.Error("error writing new status", "id", dst.GetResource().GetId(), "error", err)
return err
}
}
}
// If after fetching and validating, we don't have any destinations,
// we need to skip reconcile and delete the resource.
if len(newComputedDestinationsData.GetDestinations()) == 0 {
rt.Logger.Trace("found no destinations associated with this workload")
if ced != nil {
rt.Logger.Trace("deleting ComputedDestinations")
_, err = rt.Client.Delete(ctx, &pbresource.DeleteRequest{Id: req.ID})
if err != nil {
// If there's an error deleting CD, we want to re-trigger reconcile again.
rt.Logger.Error("error deleting ComputedExplicitDestinations", "error", err)
return err
}
}
return nil
}
// Lastly, write the resource.
if ced == nil || !proto.Equal(ced.GetData(), newComputedDestinationsData) {
rt.Logger.Trace("writing new ComputedExplicitDestinations")
// First encode the endpoints data as an Any type.
cpcDataAsAny, err := anypb.New(newComputedDestinationsData)
if err != nil {
rt.Logger.Error("error marshalling latest ComputedExplicitDestinations", "error", err)
return err
}
_, err = rt.Client.Write(ctx, &pbresource.WriteRequest{
Resource: &pbresource.Resource{
Id: req.ID,
Owner: workloadID,
Data: cpcDataAsAny,
},
})
if err != nil {
rt.Logger.Error("error writing ComputedExplicitDestinations", "error", err)
return err
}
}
return nil
}
func validate(
ctx context.Context,
client pbresource.ResourceServiceClient,
destinations *types.DecodedDestinations) (bool, *pbresource.Condition) {
for _, dest := range destinations.GetData().GetDestinations() {
serviceRef := resource.ReferenceToString(dest.DestinationRef)
// Fetch and validate service.
service, err := resource.GetDecodedResource[*pbcatalog.Service](ctx, client, resource.IDFromReference(dest.DestinationRef))
if err != nil {
return false, ConditionDestinationServiceReadError(serviceRef)
}
if service == nil {
return false, ConditionDestinationServiceNotFound(serviceRef)
}
if !service.GetData().IsMeshEnabled() {
return false, ConditionMeshProtocolNotFound(serviceRef)
}
if service.GetData().FindServicePort(dest.DestinationPort) != nil &&
service.GetData().FindServicePort(dest.DestinationPort).Protocol == pbcatalog.Protocol_PROTOCOL_MESH {
return false, ConditionMeshProtocolDestinationPort(serviceRef, dest.DestinationPort)
}
// Fetch and validate computed routes for service.
serviceID := resource.IDFromReference(dest.DestinationRef)
cr, err := resource.GetDecodedResource[*pbmesh.ComputedRoutes](ctx, client, resource.ReplaceType(pbmesh.ComputedRoutesType, serviceID))
if err != nil {
return false, ConditionDestinationComputedRoutesReadErr(serviceRef)
}
if cr == nil {
return false, ConditionDestinationComputedRoutesNotFound(serviceRef)
}
_, ok := cr.Data.PortedConfigs[dest.DestinationPort]
if !ok {
return false, ConditionDestinationComputedRoutesPortNotFound(serviceRef, dest.DestinationPort)
}
// Otherwise, continue to the next destination.
}
return true, ConditionDestinationsAccepted()
}
func (r *reconciler) fetchDestinations(
ctx context.Context,
client pbresource.ResourceServiceClient,
destinationIDs []*pbresource.ID) ([]*types.DecodedDestinations, error) {
// Sort all configs alphabetically.
sort.Slice(destinationIDs, func(i, j int) bool {
return destinationIDs[i].GetName() < destinationIDs[j].GetName()
})
var decoded []*types.DecodedDestinations
for _, id := range destinationIDs {
res, err := resource.GetDecodedResource[*pbmesh.Destinations](ctx, client, id)
if err != nil {
return nil, err
}
if res == nil || res.GetResource() == nil || res.GetData() == nil {
// If resource is not found, we should untrack it.
r.mapper.UntrackDestinations(id)
continue
}
decoded = append(decoded, res)
}
return decoded, nil
}
// Find conflicts finds any resources where listen addresses of the destinations are conflicting.
// It will record both resources as conflicting in the resulting map.
func findConflicts(destinations []*types.DecodedDestinations) map[resource.ReferenceKey]struct{} {
addresses := make(map[string]*pbresource.ID)
duplicates := make(map[resource.ReferenceKey]struct{})
for _, decDestinations := range destinations {
for _, dst := range decDestinations.GetData().GetDestinations() {
var address string
switch dst.ListenAddr.(type) {
case *pbmesh.Destination_IpPort:
listenAddr := dst.GetListenAddr().(*pbmesh.Destination_IpPort)
address = fmt.Sprintf("%s:%d", listenAddr.IpPort.GetIp(), listenAddr.IpPort.GetPort())
case *pbmesh.Destination_Unix:
listenAddr := dst.GetListenAddr().(*pbmesh.Destination_Unix)
address = listenAddr.Unix.GetPath()
default:
continue
}
if id, ok := addresses[address]; ok {
// if there's already a listen address for one of the mapper, that means we've found a duplicate.
duplicates[resource.NewReferenceKey(decDestinations.GetResource().GetId())] = struct{}{}
// Also record the original resource as conflicting one.
duplicates[resource.NewReferenceKey(id)] = struct{}{}
// Don't evaluate the rest of mapper in this resource because this resource already has a duplicate.
break
} else {
// Otherwise, record this address.
addresses[address] = decDestinations.GetResource().GetId()
}
}
}
return duplicates
}

View File

@ -0,0 +1,846 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package explicitdestinations
import (
"context"
"testing"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"google.golang.org/protobuf/proto"
svctest "github.com/hashicorp/consul/agent/grpc-external/services/resource/testing"
"github.com/hashicorp/consul/internal/catalog"
"github.com/hashicorp/consul/internal/controller"
"github.com/hashicorp/consul/internal/mesh/internal/controllers/explicitdestinations/mapper"
"github.com/hashicorp/consul/internal/mesh/internal/controllers/routes/routestest"
"github.com/hashicorp/consul/internal/mesh/internal/types"
"github.com/hashicorp/consul/internal/resource"
"github.com/hashicorp/consul/internal/resource/resourcetest"
pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v2beta1"
pbmesh "github.com/hashicorp/consul/proto-public/pbmesh/v2beta1"
"github.com/hashicorp/consul/proto-public/pbresource"
"github.com/hashicorp/consul/proto/private/prototest"
"github.com/hashicorp/consul/sdk/testutil"
"github.com/hashicorp/consul/sdk/testutil/retry"
)
type controllerTestSuite struct {
suite.Suite
client *resourcetest.Client
runtime controller.Runtime
ctl *reconciler
ctx context.Context
workload *pbcatalog.Workload
workloadRes *pbresource.Resource
dest1 *pbmesh.Destinations
dest2 *pbmesh.Destinations
destService1 *pbresource.Resource
destService2 *pbresource.Resource
destService3 *pbresource.Resource
destService1Ref *pbresource.Reference
destService2Ref *pbresource.Reference
destService3Ref *pbresource.Reference
serviceData *pbcatalog.Service
destService1Routes *pbmesh.ComputedRoutes
destService2Routes *pbmesh.ComputedRoutes
destService3Routes *pbmesh.ComputedRoutes
expComputedDest *pbmesh.ComputedExplicitDestinations
}
func TestFindDuplicates(t *testing.T) {
// Create some conflicting destinations.
dest1 := &pbmesh.Destinations{
Workloads: &pbcatalog.WorkloadSelector{
Names: []string{"foo"},
},
Destinations: []*pbmesh.Destination{
{
ListenAddr: &pbmesh.Destination_IpPort{
IpPort: &pbmesh.IPPortAddress{
Ip: "127.0.0.1",
Port: 1000,
},
},
},
{
ListenAddr: &pbmesh.Destination_IpPort{
IpPort: &pbmesh.IPPortAddress{
Ip: "127.0.0.1",
Port: 2000,
},
},
},
},
}
dest2 := &pbmesh.Destinations{
Workloads: &pbcatalog.WorkloadSelector{
Names: []string{"foo"},
},
Destinations: []*pbmesh.Destination{
{
ListenAddr: &pbmesh.Destination_IpPort{
IpPort: &pbmesh.IPPortAddress{
Ip: "127.0.0.1",
Port: 1000,
},
},
},
},
}
dest3 := &pbmesh.Destinations{
Workloads: &pbcatalog.WorkloadSelector{
Names: []string{"foo"},
},
Destinations: []*pbmesh.Destination{
{
ListenAddr: &pbmesh.Destination_Unix{
Unix: &pbmesh.UnixSocketAddress{
Path: "/foo/bar",
},
},
},
},
}
dest4 := &pbmesh.Destinations{
Workloads: &pbcatalog.WorkloadSelector{
Names: []string{"foo"},
},
Destinations: []*pbmesh.Destination{
{
ListenAddr: &pbmesh.Destination_Unix{
Unix: &pbmesh.UnixSocketAddress{
Path: "/foo/bar",
},
},
},
},
}
destNonConflicting := &pbmesh.Destinations{
Workloads: &pbcatalog.WorkloadSelector{
Names: []string{"foo"},
},
Destinations: []*pbmesh.Destination{
{
ListenAddr: &pbmesh.Destination_IpPort{
IpPort: &pbmesh.IPPortAddress{
Ip: "127.0.0.1",
Port: 3000,
},
},
},
{
ListenAddr: &pbmesh.Destination_Unix{
Unix: &pbmesh.UnixSocketAddress{
Path: "/baz/bar",
},
},
},
},
}
var destinations []*types.DecodedDestinations
dest1Res := resourcetest.Resource(pbmesh.DestinationsType, "dest1").
WithData(t, dest1).
Build()
destinations = append(destinations, resourcetest.MustDecode[*pbmesh.Destinations](t, dest1Res))
dest2Res := resourcetest.Resource(pbmesh.DestinationsType, "dest2").
WithData(t, dest2).
Build()
destinations = append(destinations, resourcetest.MustDecode[*pbmesh.Destinations](t, dest2Res))
dest3Res := resourcetest.Resource(pbmesh.DestinationsType, "dest3").
WithData(t, dest3).
Build()
destinations = append(destinations, resourcetest.MustDecode[*pbmesh.Destinations](t, dest3Res))
dest4Res := resourcetest.Resource(pbmesh.DestinationsType, "dest4").
WithData(t, dest4).
Build()
destinations = append(destinations, resourcetest.MustDecode[*pbmesh.Destinations](t, dest4Res))
nonConflictingDestRes := resourcetest.Resource(pbmesh.DestinationsType, "nonConflictingDest").
WithData(t, destNonConflicting).
Build()
destinations = append(destinations, resourcetest.MustDecode[*pbmesh.Destinations](t, nonConflictingDestRes))
duplicates := findConflicts(destinations)
require.Contains(t, duplicates, resource.NewReferenceKey(dest1Res.Id))
require.Contains(t, duplicates, resource.NewReferenceKey(dest2Res.Id))
require.Contains(t, duplicates, resource.NewReferenceKey(dest3Res.Id))
require.Contains(t, duplicates, resource.NewReferenceKey(dest4Res.Id))
require.NotContains(t, duplicates, resource.NewReferenceKey(nonConflictingDestRes.Id))
}
func (suite *controllerTestSuite) SetupTest() {
resourceClient := svctest.RunResourceService(suite.T(), types.Register, catalog.RegisterTypes)
suite.client = resourcetest.NewClient(resourceClient)
suite.runtime = controller.Runtime{Client: resourceClient, Logger: testutil.Logger(suite.T())}
suite.ctx = testutil.TestContext(suite.T())
suite.ctl = &reconciler{
mapper: mapper.New(),
}
suite.workload = &pbcatalog.Workload{
Addresses: []*pbcatalog.WorkloadAddress{{Host: "1.1.1.1"}},
Ports: map[string]*pbcatalog.WorkloadPort{
"tcp": {Port: 8080, Protocol: pbcatalog.Protocol_PROTOCOL_TCP},
"mesh": {Port: 20000, Protocol: pbcatalog.Protocol_PROTOCOL_MESH},
},
Identity: "test",
}
suite.workloadRes = resourcetest.Resource(pbcatalog.WorkloadType, "test-workload").
WithData(suite.T(), suite.workload).
Write(suite.T(), suite.client)
suite.serviceData = &pbcatalog.Service{
Workloads: &pbcatalog.WorkloadSelector{Names: []string{"service-1-workloads"}},
Ports: []*pbcatalog.ServicePort{
{
TargetPort: "tcp",
Protocol: pbcatalog.Protocol_PROTOCOL_TCP,
},
{
TargetPort: "admin",
Protocol: pbcatalog.Protocol_PROTOCOL_TCP,
},
{
TargetPort: "mesh",
Protocol: pbcatalog.Protocol_PROTOCOL_MESH,
},
},
}
suite.destService1 = resourcetest.Resource(pbcatalog.ServiceType, "dest-service-1").
WithTenancy(resource.DefaultNamespacedTenancy()).
WithData(suite.T(), suite.serviceData).
Build()
suite.destService2 = resourcetest.Resource(pbcatalog.ServiceType, "dest-service-2").
WithTenancy(resource.DefaultNamespacedTenancy()).
WithData(suite.T(), suite.serviceData).
Build()
suite.destService3 = resourcetest.Resource(pbcatalog.ServiceType, "dest-service-3").
WithTenancy(resource.DefaultNamespacedTenancy()).
WithData(suite.T(), suite.serviceData).
Build()
suite.destService1Ref = resource.Reference(suite.destService1.Id, "")
suite.destService2Ref = resource.Reference(suite.destService2.Id, "")
suite.destService3Ref = resource.Reference(suite.destService3.Id, "")
suite.destService1Routes = routestest.BuildComputedRoutes(suite.T(), resource.ReplaceType(pbmesh.ComputedRoutesType, suite.destService1.Id),
resourcetest.MustDecode[*pbcatalog.Service](suite.T(), suite.destService1),
).GetData()
suite.destService2Routes = routestest.BuildComputedRoutes(suite.T(), resource.ReplaceType(pbmesh.ComputedRoutesType, suite.destService2.Id),
resourcetest.MustDecode[*pbcatalog.Service](suite.T(), suite.destService2),
).GetData()
suite.destService3Routes = routestest.BuildComputedRoutes(suite.T(), resource.ReplaceType(pbmesh.ComputedRoutesType, suite.destService3.Id),
resourcetest.MustDecode[*pbcatalog.Service](suite.T(), suite.destService3),
).GetData()
suite.dest1 = &pbmesh.Destinations{
Workloads: &pbcatalog.WorkloadSelector{
Names: []string{suite.workloadRes.Id.Name},
},
Destinations: []*pbmesh.Destination{
{
DestinationRef: suite.destService1Ref,
DestinationPort: "tcp",
ListenAddr: &pbmesh.Destination_IpPort{
IpPort: &pbmesh.IPPortAddress{
Ip: "127.0.0.1",
Port: 1000,
},
},
},
{
DestinationRef: suite.destService2Ref,
DestinationPort: "tcp",
ListenAddr: &pbmesh.Destination_IpPort{
IpPort: &pbmesh.IPPortAddress{
Ip: "127.0.0.1",
Port: 2000,
},
},
},
},
}
suite.dest2 = &pbmesh.Destinations{
Workloads: &pbcatalog.WorkloadSelector{
Prefixes: []string{"test-"},
},
Destinations: []*pbmesh.Destination{
{
DestinationRef: suite.destService3Ref,
DestinationPort: "tcp",
ListenAddr: &pbmesh.Destination_IpPort{
IpPort: &pbmesh.IPPortAddress{
Ip: "127.0.0.1",
Port: 3000,
},
},
},
{
DestinationRef: suite.destService2Ref,
DestinationPort: "admin",
ListenAddr: &pbmesh.Destination_IpPort{
IpPort: &pbmesh.IPPortAddress{
Ip: "127.0.0.1",
Port: 4000,
},
},
},
},
}
suite.expComputedDest = &pbmesh.ComputedExplicitDestinations{
Destinations: []*pbmesh.Destination{
{
DestinationRef: suite.destService1Ref,
DestinationPort: "tcp",
ListenAddr: &pbmesh.Destination_IpPort{
IpPort: &pbmesh.IPPortAddress{
Ip: "127.0.0.1",
Port: 1000,
},
},
},
{
DestinationRef: suite.destService2Ref,
DestinationPort: "tcp",
ListenAddr: &pbmesh.Destination_IpPort{
IpPort: &pbmesh.IPPortAddress{
Ip: "127.0.0.1",
Port: 2000,
},
},
},
{
DestinationRef: suite.destService3Ref,
DestinationPort: "tcp",
ListenAddr: &pbmesh.Destination_IpPort{
IpPort: &pbmesh.IPPortAddress{
Ip: "127.0.0.1",
Port: 3000,
},
},
},
{
DestinationRef: suite.destService2Ref,
DestinationPort: "admin",
ListenAddr: &pbmesh.Destination_IpPort{
IpPort: &pbmesh.IPPortAddress{
Ip: "127.0.0.1",
Port: 4000,
},
},
},
},
}
}
func (suite *controllerTestSuite) TestReconcile_NoWorkload() {
id := resourcetest.Resource(pbmesh.ComputedExplicitDestinationsType, "not-found").ID()
dest := resourcetest.Resource(pbmesh.DestinationsType, "dest1").
WithData(suite.T(), suite.dest1).
Build()
decDest := resourcetest.MustDecode[*pbmesh.Destinations](suite.T(), dest)
suite.ctl.mapper.TrackDestinations(id, []*types.DecodedDestinations{decDest})
err := suite.ctl.Reconcile(context.Background(), suite.runtime, controller.Request{
ID: id,
})
require.NoError(suite.T(), err)
suite.client.RequireResourceNotFound(suite.T(), id)
// Check that we're not tracking services for this workload anymore.
reqs, err := suite.ctl.mapper.MapService(context.TODO(), controller.Runtime{}, suite.destService1)
require.NoError(suite.T(), err)
require.Nil(suite.T(), reqs)
reqs, err = suite.ctl.mapper.MapService(context.TODO(), controller.Runtime{}, suite.destService2)
require.NoError(suite.T(), err)
require.Nil(suite.T(), reqs)
}
func (suite *controllerTestSuite) TestReconcile_NonMeshWorkload() {
resourcetest.Resource(pbcatalog.WorkloadType, "non-mesh").
WithData(suite.T(), &pbcatalog.Workload{
Addresses: []*pbcatalog.WorkloadAddress{{Host: "1.1.1.1"}},
Ports: map[string]*pbcatalog.WorkloadPort{
"tcp": {Port: 8080, Protocol: pbcatalog.Protocol_PROTOCOL_TCP},
},
}).
Write(suite.T(), suite.client)
cdID := resourcetest.Resource(pbmesh.ComputedExplicitDestinationsType, "non-mesh").
Write(suite.T(), suite.client).Id
dest := resourcetest.Resource(pbmesh.DestinationsType, "dest1").
WithData(suite.T(), suite.dest1).
Build()
decDest := resourcetest.MustDecode[*pbmesh.Destinations](suite.T(), dest)
suite.ctl.mapper.TrackDestinations(cdID, []*types.DecodedDestinations{decDest})
err := suite.ctl.Reconcile(context.Background(), suite.runtime, controller.Request{
ID: cdID,
})
require.NoError(suite.T(), err)
suite.client.RequireResourceNotFound(suite.T(), cdID)
// Check that we're not tracking services for this workload anymore.
reqs, err := suite.ctl.mapper.MapService(context.TODO(), controller.Runtime{}, suite.destService1)
require.NoError(suite.T(), err)
require.Nil(suite.T(), reqs)
reqs, err = suite.ctl.mapper.MapService(context.TODO(), controller.Runtime{}, suite.destService2)
require.NoError(suite.T(), err)
require.Nil(suite.T(), reqs)
}
func (suite *controllerTestSuite) writeServices(t *testing.T) {
// Write all services.
resourcetest.Resource(pbcatalog.ServiceType, suite.destService1Ref.Name).
WithData(t, suite.serviceData).
Write(t, suite.client)
resourcetest.Resource(pbcatalog.ServiceType, suite.destService2Ref.Name).
WithData(t, suite.serviceData).
Write(t, suite.client)
resourcetest.Resource(pbcatalog.ServiceType, suite.destService3Ref.Name).
WithData(t, suite.serviceData).
Write(t, suite.client)
}
func (suite *controllerTestSuite) writeComputedRoutes(t *testing.T) {
// Write computed routes
resourcetest.Resource(pbmesh.ComputedRoutesType, suite.destService1Ref.Name).
WithData(t, suite.destService1Routes).
Write(t, suite.client)
resourcetest.Resource(pbmesh.ComputedRoutesType, suite.destService2Ref.Name).
WithData(t, suite.destService2Routes).
Write(t, suite.client)
resourcetest.Resource(pbmesh.ComputedRoutesType, suite.destService3Ref.Name).
WithData(t, suite.destService3Routes).
Write(t, suite.client)
}
func (suite *controllerTestSuite) TestReconcile_HappyPath() {
// Add configs in reverse alphabetical order.
d2 := resourcetest.Resource(pbmesh.DestinationsType, "dest2").
WithData(suite.T(), suite.dest2).
Write(suite.T(), suite.client)
_, err := suite.ctl.mapper.MapDestinations(suite.ctx, suite.runtime, d2)
require.NoError(suite.T(), err)
d1 := resourcetest.Resource(pbmesh.DestinationsType, "dest1").
WithData(suite.T(), suite.dest1).
Write(suite.T(), suite.client)
_, err = suite.ctl.mapper.MapDestinations(suite.ctx, suite.runtime, d1)
require.NoError(suite.T(), err)
suite.writeServices(suite.T())
suite.writeComputedRoutes(suite.T())
cdID := resource.ReplaceType(pbmesh.ComputedExplicitDestinationsType, suite.workloadRes.Id)
err = suite.ctl.Reconcile(context.Background(), suite.runtime, controller.Request{
ID: cdID,
})
require.NoError(suite.T(), err)
suite.requireComputedDestinations(suite.T(), cdID)
suite.client.RequireStatusCondition(suite.T(), d1.Id, ControllerName, ConditionDestinationsAccepted())
}
func (suite *controllerTestSuite) TestReconcile_NoDestinations() {
dest := resourcetest.Resource(pbmesh.DestinationsType, "dest").
WithData(suite.T(), suite.dest1).
Build()
_, err := suite.ctl.mapper.MapDestinations(suite.ctx, suite.runtime, dest)
require.NoError(suite.T(), err)
cdID := resourcetest.Resource(pbmesh.ComputedExplicitDestinationsType, suite.workloadRes.Id.Name).
Write(suite.T(), suite.client).Id
err = suite.ctl.Reconcile(context.Background(), suite.runtime, controller.Request{
ID: cdID,
})
require.NoError(suite.T(), err)
suite.client.RequireResourceNotFound(suite.T(), cdID)
}
func (suite *controllerTestSuite) TestReconcile_AllDestinationsInvalid() {
// We add a destination with services refs that don't exist which should result
// in computed mapper being deleted because all mapper are invalid.
dest := resourcetest.Resource(pbmesh.DestinationsType, "dest").
WithData(suite.T(), suite.dest1).
Write(suite.T(), suite.client)
_, err := suite.ctl.mapper.MapDestinations(suite.ctx, suite.runtime, dest)
require.NoError(suite.T(), err)
cdID := resourcetest.Resource(pbmesh.ComputedExplicitDestinationsType, suite.workloadRes.Id.Name).
Write(suite.T(), suite.client).Id
err = suite.ctl.Reconcile(context.Background(), suite.runtime, controller.Request{
ID: cdID,
})
require.NoError(suite.T(), err)
suite.client.RequireResourceNotFound(suite.T(), cdID)
}
func (suite *controllerTestSuite) TestReconcile_StatusUpdate_ConflictingDestination() {
dest1 := resourcetest.Resource(pbmesh.DestinationsType, "dest1").
WithData(suite.T(), suite.dest1).
Write(suite.T(), suite.client)
_, err := suite.ctl.mapper.MapDestinations(suite.ctx, suite.runtime, dest1)
require.NoError(suite.T(), err)
// Write a conflicting destinations resource.
destData := proto.Clone(suite.dest2).(*pbmesh.Destinations)
destData.Destinations[0] = suite.dest1.Destinations[0]
dest2 := resourcetest.Resource(pbmesh.DestinationsType, "dest2").
WithData(suite.T(), destData).
Write(suite.T(), suite.client)
_, err = suite.ctl.mapper.MapDestinations(suite.ctx, suite.runtime, dest2)
require.NoError(suite.T(), err)
cdID := resourcetest.Resource(pbmesh.ComputedExplicitDestinationsType, suite.workloadRes.Id.Name).
Write(suite.T(), suite.client).Id
err = suite.ctl.Reconcile(context.Background(), suite.runtime, controller.Request{
ID: cdID,
})
require.NoError(suite.T(), err)
suite.client.RequireResourceNotFound(suite.T(), cdID)
// Expect that the status on both resource is updated showing conflict.
suite.client.RequireStatusCondition(suite.T(), dest1.Id, ControllerName,
ConditionConflictFound(suite.workloadRes.Id))
suite.client.RequireStatusCondition(suite.T(), dest2.Id, ControllerName,
ConditionConflictFound(suite.workloadRes.Id))
// Update dest2 back to have non-conflicting data.
dest2 = resourcetest.Resource(pbmesh.DestinationsType, "dest2").
WithData(suite.T(), suite.dest2).
Write(suite.T(), suite.client)
_, err = suite.ctl.mapper.MapDestinations(suite.ctx, suite.runtime, dest2)
require.NoError(suite.T(), err)
err = suite.ctl.Reconcile(context.Background(), suite.runtime, controller.Request{
ID: cdID,
})
require.NoError(suite.T(), err)
// Expect status on both to be updated to say that there's no conflict.
suite.client.RequireStatusCondition(suite.T(), dest1.Id, ControllerName,
ConditionConflictNotFound)
suite.client.RequireStatusCondition(suite.T(), dest2.Id, ControllerName,
ConditionConflictNotFound)
}
func (suite *controllerTestSuite) TestReconcile_StatusUpdate_NoService() {
dest := resourcetest.Resource(pbmesh.DestinationsType, "dest").
WithData(suite.T(), suite.dest2).
Write(suite.T(), suite.client)
_, err := suite.ctl.mapper.MapDestinations(suite.ctx, suite.runtime, dest)
require.NoError(suite.T(), err)
cdID := resourcetest.Resource(pbmesh.ComputedExplicitDestinationsType, suite.workloadRes.Id.Name).
Write(suite.T(), suite.client).Id
err = suite.ctl.Reconcile(context.Background(), suite.runtime, controller.Request{
ID: cdID,
})
require.NoError(suite.T(), err)
suite.client.RequireResourceNotFound(suite.T(), cdID)
suite.client.RequireStatusCondition(suite.T(), dest.Id, ControllerName,
ConditionDestinationServiceNotFound(resource.ReferenceToString(suite.destService3Ref)))
}
func (suite *controllerTestSuite) TestReconcile_StatusUpdate_ServiceNotOnMesh() {
dest := resourcetest.Resource(pbmesh.DestinationsType, "dest").
WithData(suite.T(), suite.dest2).
Write(suite.T(), suite.client)
_, err := suite.ctl.mapper.MapDestinations(suite.ctx, suite.runtime, dest)
require.NoError(suite.T(), err)
resourcetest.Resource(pbcatalog.ServiceType, suite.destService3Ref.Name).
WithData(suite.T(), &pbcatalog.Service{
Workloads: &pbcatalog.WorkloadSelector{Names: []string{suite.workloadRes.Id.Name}},
Ports: []*pbcatalog.ServicePort{
{
TargetPort: "tcp",
Protocol: pbcatalog.Protocol_PROTOCOL_TCP,
},
},
}).
Write(suite.T(), suite.client)
cdID := resourcetest.Resource(pbmesh.ComputedExplicitDestinationsType, suite.workloadRes.Id.Name).
Write(suite.T(), suite.client).Id
err = suite.ctl.Reconcile(context.Background(), suite.runtime, controller.Request{
ID: cdID,
})
require.NoError(suite.T(), err)
suite.client.RequireResourceNotFound(suite.T(), cdID)
suite.client.RequireStatusCondition(suite.T(), dest.Id, ControllerName,
ConditionMeshProtocolNotFound(resource.ReferenceToString(suite.destService3Ref)))
}
func (suite *controllerTestSuite) TestReconcile_StatusUpdate_DestinationPortIsMesh() {
dest := resourcetest.Resource(pbmesh.DestinationsType, "dest").
WithData(suite.T(), suite.dest2).
Write(suite.T(), suite.client)
_, err := suite.ctl.mapper.MapDestinations(suite.ctx, suite.runtime, dest)
require.NoError(suite.T(), err)
resourcetest.Resource(pbcatalog.ServiceType, suite.destService3Ref.Name).
WithData(suite.T(), &pbcatalog.Service{
Workloads: &pbcatalog.WorkloadSelector{Names: []string{suite.workloadRes.Id.Name}},
Ports: []*pbcatalog.ServicePort{
{
TargetPort: "tcp",
Protocol: pbcatalog.Protocol_PROTOCOL_MESH,
},
},
}).
Write(suite.T(), suite.client)
cdID := resourcetest.Resource(pbmesh.ComputedExplicitDestinationsType, suite.workloadRes.Id.Name).
Write(suite.T(), suite.client).Id
err = suite.ctl.Reconcile(context.Background(), suite.runtime, controller.Request{
ID: cdID,
})
require.NoError(suite.T(), err)
suite.client.RequireResourceNotFound(suite.T(), cdID)
suite.client.RequireStatusCondition(suite.T(), dest.Id, ControllerName,
ConditionMeshProtocolDestinationPort(resource.ReferenceToString(suite.destService3Ref), "tcp"))
}
func (suite *controllerTestSuite) TestReconcile_StatusUpdate_ComputedRoutesNotFound() {
dest := resourcetest.Resource(pbmesh.DestinationsType, "dest").
WithData(suite.T(), suite.dest2).
Write(suite.T(), suite.client)
_, err := suite.ctl.mapper.MapDestinations(suite.ctx, suite.runtime, dest)
require.NoError(suite.T(), err)
resourcetest.Resource(pbcatalog.ServiceType, suite.destService3Ref.Name).
WithData(suite.T(), &pbcatalog.Service{
Workloads: &pbcatalog.WorkloadSelector{Names: []string{suite.workloadRes.Id.Name}},
Ports: []*pbcatalog.ServicePort{
{
TargetPort: "tcp",
Protocol: pbcatalog.Protocol_PROTOCOL_TCP,
},
{
TargetPort: "mesh",
Protocol: pbcatalog.Protocol_PROTOCOL_MESH,
},
},
}).
Write(suite.T(), suite.client)
cdID := resourcetest.Resource(pbmesh.ComputedExplicitDestinationsType, suite.workloadRes.Id.Name).
Write(suite.T(), suite.client).Id
err = suite.ctl.Reconcile(context.Background(), suite.runtime, controller.Request{
ID: cdID,
})
require.NoError(suite.T(), err)
suite.client.RequireResourceNotFound(suite.T(), cdID)
suite.client.RequireStatusCondition(suite.T(), dest.Id, ControllerName,
ConditionDestinationComputedRoutesNotFound(resource.ReferenceToString(suite.destService3Ref)))
}
func (suite *controllerTestSuite) TestReconcile_StatusUpdate_ComputedRoutesPortNotFound() {
dest := resourcetest.Resource(pbmesh.DestinationsType, "dest").
WithData(suite.T(), suite.dest2).
Write(suite.T(), suite.client)
_, err := suite.ctl.mapper.MapDestinations(suite.ctx, suite.runtime, dest)
require.NoError(suite.T(), err)
destService := resourcetest.Resource(pbcatalog.ServiceType, suite.destService3Ref.Name).
WithData(suite.T(), &pbcatalog.Service{
Workloads: &pbcatalog.WorkloadSelector{Names: []string{suite.workloadRes.Id.Name}},
Ports: []*pbcatalog.ServicePort{
{
TargetPort: "tcp",
Protocol: pbcatalog.Protocol_PROTOCOL_TCP,
},
{
TargetPort: "mesh",
Protocol: pbcatalog.Protocol_PROTOCOL_MESH,
},
},
}).
Write(suite.T(), suite.client)
resourcetest.Resource(pbmesh.ComputedRoutesType, destService.Id.Name).
WithData(suite.T(), &pbmesh.ComputedRoutes{
PortedConfigs: map[string]*pbmesh.ComputedPortRoutes{
"some-other-port": {
Protocol: pbcatalog.Protocol_PROTOCOL_HTTP,
Config: &pbmesh.ComputedPortRoutes_Http{
Http: &pbmesh.ComputedHTTPRoute{},
},
},
},
}).
Write(suite.T(), suite.client)
cdID := resourcetest.Resource(pbmesh.ComputedExplicitDestinationsType, suite.workloadRes.Id.Name).
Write(suite.T(), suite.client).Id
err = suite.ctl.Reconcile(context.Background(), suite.runtime, controller.Request{
ID: cdID,
})
require.NoError(suite.T(), err)
suite.client.RequireResourceNotFound(suite.T(), cdID)
suite.client.RequireStatusCondition(suite.T(), dest.Id, ControllerName,
ConditionDestinationComputedRoutesPortNotFound(resource.ReferenceToString(suite.destService3Ref), "tcp"))
}
func (suite *controllerTestSuite) TestController() {
mgr := controller.NewManager(suite.client, suite.runtime.Logger)
m := mapper.New()
mgr.Register(Controller(m))
mgr.SetRaftLeader(true)
go mgr.Run(suite.ctx)
cdID := resource.ReplaceType(pbmesh.ComputedExplicitDestinationsType, suite.workloadRes.Id)
dest1 := resourcetest.Resource(pbmesh.DestinationsType, "dest1").
WithData(suite.T(), suite.dest1).
Write(suite.T(), suite.client)
// At this point, none of the services or routes yet exist and so we should see the status of the destinations
// resource to reflect that. The CED resource should not be created in this case.
testutil.RunStep(suite.T(), "check that destinations status is updated", func(t *testing.T) {
retry.Run(t, func(r *retry.R) {
serviceRef := resource.IDToString(suite.destService1.Id)
suite.client.WaitForStatusCondition(r, dest1.Id, ControllerName, ConditionDestinationServiceNotFound(serviceRef))
suite.client.RequireResourceNotFound(r, cdID)
})
})
dest2 := resourcetest.Resource(pbmesh.DestinationsType, "dest2").
WithData(suite.T(), suite.dest2).
Write(suite.T(), suite.client)
suite.writeServices(suite.T())
// After we write services, we expect another reconciliation to be kicked off to validate and find that there are no computed routes.
testutil.RunStep(suite.T(), "check that destinations status says that there are no computed routes", func(t *testing.T) {
retry.Run(t, func(r *retry.R) {
suite.client.WaitForStatusCondition(r, dest1.Id, ControllerName,
ConditionDestinationComputedRoutesNotFound(resource.IDToString(suite.destService1.Id)))
suite.client.WaitForStatusCondition(r, dest2.Id, ControllerName,
ConditionDestinationComputedRoutesNotFound(resource.IDToString(suite.destService3.Id)))
suite.client.RequireResourceNotFound(r, cdID)
})
})
// Now write computed routes to get a computed resource.
suite.writeComputedRoutes(suite.T())
testutil.RunStep(suite.T(), "computed destinations generation", func(t *testing.T) {
retry.Run(t, func(r *retry.R) {
suite.client.RequireResourceExists(r, cdID)
suite.requireComputedDestinations(r, cdID)
})
})
testutil.RunStep(suite.T(), "add another workload", func(t *testing.T) {
// Create another workload that will match only dest2.
matchingWorkload := resourcetest.Resource(pbcatalog.WorkloadType, "test-extra-workload").
WithData(t, suite.workload).
Write(t, suite.client)
matchingWorkloadCDID := resource.ReplaceType(pbmesh.ComputedExplicitDestinationsType, matchingWorkload.Id)
retry.Run(t, func(r *retry.R) {
suite.client.RequireResourceExists(r, cdID)
suite.requireComputedDestinations(r, cdID)
matchingWorkloadCD := suite.client.RequireResourceExists(r, matchingWorkloadCDID)
dec := resourcetest.MustDecode[*pbmesh.ComputedExplicitDestinations](r, matchingWorkloadCD)
prototest.AssertDeepEqual(r, suite.dest2.GetDestinations(), dec.GetData().GetDestinations())
})
})
testutil.RunStep(suite.T(), "update workload selector", func(t *testing.T) {
// Update workload selector to no point to some non-existing workload
updatedDestinations := proto.Clone(suite.dest2).(*pbmesh.Destinations)
updatedDestinations.Workloads = &pbcatalog.WorkloadSelector{
Names: []string{"other-workload"},
}
matchingWorkload := resourcetest.Resource(pbcatalog.WorkloadType, "other-workload").
WithData(t, suite.workload).
Write(t, suite.client)
matchingWorkloadCDID := resource.ReplaceType(pbmesh.ComputedExplicitDestinationsType, matchingWorkload.Id)
resourcetest.Resource(pbmesh.DestinationsType, "dest2").
WithData(suite.T(), updatedDestinations).
Write(suite.T(), suite.client)
retry.Run(t, func(r *retry.R) {
res := suite.client.RequireResourceExists(r, cdID)
// The "test-workload" computed destinations should now be updated to use only proxy dest1.
expDest := &pbmesh.ComputedExplicitDestinations{
Destinations: suite.dest1.Destinations,
}
dec := resourcetest.MustDecode[*pbmesh.ComputedExplicitDestinations](t, res)
prototest.AssertDeepEqual(r, expDest.GetDestinations(), dec.GetData().GetDestinations())
matchingWorkloadCD := suite.client.RequireResourceExists(r, matchingWorkloadCDID)
dec = resourcetest.MustDecode[*pbmesh.ComputedExplicitDestinations](r, matchingWorkloadCD)
prototest.AssertDeepEqual(r, suite.dest2.GetDestinations(), dec.GetData().GetDestinations())
})
})
// Delete all destinations.
suite.client.MustDelete(suite.T(), dest1.Id)
suite.client.MustDelete(suite.T(), dest2.Id)
testutil.RunStep(suite.T(), "all destinations are deleted", func(t *testing.T) {
retry.Run(t, func(r *retry.R) {
suite.client.RequireResourceNotFound(r, cdID)
})
})
}
func TestControllerSuite(t *testing.T) {
suite.Run(t, new(controllerTestSuite))
}
func (suite *controllerTestSuite) requireComputedDestinations(t resourcetest.T, id *pbresource.ID) {
cdRes := suite.client.RequireResourceExists(t, id)
decCD := resourcetest.MustDecode[*pbmesh.ComputedExplicitDestinations](t, cdRes)
prototest.AssertElementsMatch(t, suite.expComputedDest.GetDestinations(), decCD.Data.GetDestinations())
resourcetest.RequireOwner(t, cdRes, resource.ReplaceType(pbcatalog.WorkloadType, id), true)
}

View File

@ -0,0 +1,74 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package mapper
import (
"context"
"github.com/hashicorp/consul/internal/controller"
"github.com/hashicorp/consul/internal/mesh/internal/mappers/workloadselectionmapper"
"github.com/hashicorp/consul/internal/mesh/internal/types"
"github.com/hashicorp/consul/internal/resource"
"github.com/hashicorp/consul/internal/resource/mappers/bimapper"
pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v2beta1"
pbmesh "github.com/hashicorp/consul/proto-public/pbmesh/v2beta1"
"github.com/hashicorp/consul/proto-public/pbresource"
)
type Mapper struct {
workloadSelectionMapper *workloadselectionmapper.Mapper[*pbmesh.Destinations]
serviceRefMapper *bimapper.Mapper
}
func New() *Mapper {
return &Mapper{
workloadSelectionMapper: workloadselectionmapper.New[*pbmesh.Destinations](pbmesh.ComputedExplicitDestinationsType),
serviceRefMapper: bimapper.New(pbmesh.ComputedExplicitDestinationsType, pbcatalog.ServiceType),
}
}
func (m *Mapper) MapService(_ context.Context, _ controller.Runtime, res *pbresource.Resource) ([]controller.Request, error) {
serviceRef := resource.Reference(res.GetId(), "")
compDestinations := m.serviceRefMapper.ItemIDsForLink(serviceRef)
return controller.MakeRequests(pbmesh.ComputedExplicitDestinationsType, compDestinations), nil
}
func (m *Mapper) MapDestinations(ctx context.Context, rt controller.Runtime, res *pbresource.Resource) ([]controller.Request, error) {
return m.workloadSelectionMapper.MapToComputedType(ctx, rt, res)
}
func (m *Mapper) MapComputedRoute(_ context.Context, _ controller.Runtime, res *pbresource.Resource) ([]controller.Request, error) {
serviceID := resource.ReplaceType(pbcatalog.ServiceType, res.GetId())
serviceRef := resource.Reference(serviceID, "")
compDestinations := m.serviceRefMapper.ItemIDsForLink(serviceRef)
return controller.MakeRequests(pbmesh.ComputedExplicitDestinationsType, compDestinations), nil
}
func (m *Mapper) TrackDestinations(id *pbresource.ID, destinations []*types.DecodedDestinations) {
var links []resource.ReferenceOrID
for _, dst := range destinations {
for _, d := range dst.GetData().GetDestinations() {
links = append(links, d.DestinationRef)
}
}
m.serviceRefMapper.TrackItem(id, links)
}
func (m *Mapper) UntrackComputedExplicitDestinations(id *pbresource.ID) {
m.serviceRefMapper.UntrackItem(id)
}
func (m *Mapper) UntrackDestinations(id *pbresource.ID) {
m.workloadSelectionMapper.UntrackID(id)
}
func (m *Mapper) DestinationsForWorkload(id *pbresource.ID) []*pbresource.ID {
return m.workloadSelectionMapper.IDsForWorkload(id)
}

View File

@ -0,0 +1,121 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package explicitdestinations
import (
"fmt"
"github.com/hashicorp/consul/internal/resource"
"github.com/hashicorp/consul/proto-public/pbresource"
)
const (
StatusConditionDestinationsAccepted = "DestinationsAccepted"
StatusReasonMeshProtocolNotFound = "MeshPortProtocolNotFound"
StatusReasonMeshProtocolFound = "AllDestinationServicesValid"
StatusReasonMeshProtocolDestinationPort = "DestinationWithMeshPortProtocol"
StatusReasonDestinationServiceNotFound = "ServiceNotFound"
StatusReasonDestinationServiceReadError = "ServiceReadError"
StatusReasonDestinationComputedRoutesNotFound = "ComputedRoutesNotFound"
StatusReasonDestinationComputedRoutesReadError = "ComputedRoutesReadError"
StatusReasonDestinationComputedRoutesPortNotFound = "ComputedRoutesPortNotFound"
StatusConditionConflictFound = "ConflictFound"
StatusReasonDuplicateListenAddress = "ConflictingListenAddress"
StatusReasonNoDuplicateListenAddress = "AllListenAddressesAreUnique"
)
var ConditionConflictNotFound = &pbresource.Condition{
Type: StatusConditionConflictFound,
State: pbresource.Condition_STATE_FALSE,
Reason: StatusReasonNoDuplicateListenAddress,
Message: "All mapper have unique listen addresses.",
}
func ConditionConflictFound(workloadID *pbresource.ID) *pbresource.Condition {
return &pbresource.Condition{
Type: StatusConditionConflictFound,
State: pbresource.Condition_STATE_TRUE,
Reason: StatusReasonDuplicateListenAddress,
Message: fmt.Sprintf("Another Destinations resource selecting workload %q configures the same listen address as one of the mapper in this resource. "+
"This resource will be skipped.", resource.IDToString(workloadID)),
}
}
func ConditionMeshProtocolNotFound(serviceRef string) *pbresource.Condition {
return &pbresource.Condition{
Type: StatusConditionDestinationsAccepted,
State: pbresource.Condition_STATE_FALSE,
Reason: StatusReasonMeshProtocolNotFound,
Message: fmt.Sprintf("service %q cannot be referenced as a Destination because it's not mesh-enabled.", serviceRef),
}
}
func ConditionDestinationsAccepted() *pbresource.Condition {
return &pbresource.Condition{
Type: StatusConditionDestinationsAccepted,
State: pbresource.Condition_STATE_TRUE,
Reason: StatusReasonMeshProtocolFound,
Message: "all destination services are valid.",
}
}
func ConditionDestinationServiceNotFound(serviceRef string) *pbresource.Condition {
return &pbresource.Condition{
Type: StatusConditionDestinationsAccepted,
State: pbresource.Condition_STATE_FALSE,
Reason: StatusReasonDestinationServiceNotFound,
Message: fmt.Sprintf("service %q does not exist.", serviceRef),
}
}
func ConditionDestinationServiceReadError(serviceRef string) *pbresource.Condition {
return &pbresource.Condition{
Type: StatusConditionDestinationsAccepted,
State: pbresource.Condition_STATE_FALSE,
Reason: StatusReasonDestinationServiceReadError,
Message: fmt.Sprintf("error reading service %q", serviceRef),
}
}
func ConditionMeshProtocolDestinationPort(serviceRef, port string) *pbresource.Condition {
return &pbresource.Condition{
Type: StatusConditionDestinationsAccepted,
State: pbresource.Condition_STATE_FALSE,
Reason: StatusReasonMeshProtocolDestinationPort,
Message: fmt.Sprintf("destination port %q for service %q has PROTOCOL_MESH which is unsupported for destination services", port, serviceRef),
}
}
func ConditionDestinationComputedRoutesNotFound(serviceRef string) *pbresource.Condition {
return &pbresource.Condition{
Type: StatusConditionDestinationsAccepted,
State: pbresource.Condition_STATE_FALSE,
Reason: StatusReasonDestinationComputedRoutesNotFound,
Message: fmt.Sprintf("computed routes %q does not exist.", serviceRef),
}
}
func ConditionDestinationComputedRoutesReadErr(serviceRef string) *pbresource.Condition {
return &pbresource.Condition{
Type: StatusConditionDestinationsAccepted,
State: pbresource.Condition_STATE_FALSE,
Reason: StatusReasonDestinationComputedRoutesReadError,
Message: fmt.Sprintf("error reading computed routes for %q service.", serviceRef),
}
}
func ConditionDestinationComputedRoutesPortNotFound(serviceRef, port string) *pbresource.Condition {
return &pbresource.Condition{
Type: StatusConditionDestinationsAccepted,
State: pbresource.Condition_STATE_FALSE,
Reason: StatusReasonDestinationComputedRoutesPortNotFound,
Message: fmt.Sprintf("computed routes %q does not exist for port %q.", serviceRef, port),
}
}

View File

@ -10,7 +10,7 @@ import (
"google.golang.org/protobuf/types/known/anypb"
"github.com/hashicorp/consul/internal/controller"
"github.com/hashicorp/consul/internal/mesh/internal/controllers/proxyconfiguration/mapper"
"github.com/hashicorp/consul/internal/mesh/internal/mappers/workloadselectionmapper"
"github.com/hashicorp/consul/internal/mesh/internal/types"
"github.com/hashicorp/consul/internal/resource"
pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v2beta1"
@ -20,19 +20,19 @@ import (
const ControllerName = "consul.io/proxy-configuration-controller"
func Controller(proxyConfigMapper *mapper.Mapper) controller.Controller {
func Controller(proxyConfigMapper *workloadselectionmapper.Mapper[*pbmesh.ProxyConfiguration]) controller.Controller {
if proxyConfigMapper == nil {
panic("proxy config mapper is required")
}
return controller.ForType(pbmesh.ComputedProxyConfigurationType).
WithWatch(pbmesh.ProxyConfigurationType, proxyConfigMapper.MapProxyConfiguration).
WithWatch(pbmesh.ProxyConfigurationType, proxyConfigMapper.MapToComputedType).
WithWatch(pbcatalog.WorkloadType, controller.ReplaceType(pbmesh.ComputedProxyConfigurationType)).
WithReconciler(&reconciler{proxyConfigMapper: proxyConfigMapper})
}
type reconciler struct {
proxyConfigMapper *mapper.Mapper
proxyConfigMapper *workloadselectionmapper.Mapper[*pbmesh.ProxyConfiguration]
}
func (r *reconciler) Reconcile(ctx context.Context, rt controller.Runtime, req controller.Request) error {
@ -83,7 +83,7 @@ func (r *reconciler) Reconcile(ctx context.Context, rt controller.Runtime, req c
// Now get any proxy configurations IDs that we have in the cache that have selectors matching the name
// of this CPC (name-aligned with the workload).
proxyCfgIDs := r.proxyConfigMapper.ProxyConfigurationsForWorkload(req.ID)
proxyCfgIDs := r.proxyConfigMapper.IDsForWorkload(req.ID)
rt.Logger.Trace("cached proxy cfg IDs", "ids", proxyCfgIDs)
decodedProxyCfgs, err := r.fetchProxyConfigs(ctx, rt.Client, proxyCfgIDs)
@ -164,7 +164,7 @@ func (r *reconciler) fetchProxyConfigs(
}
if res == nil || res.GetResource() == nil || res.GetData() == nil {
// If resource is not found, we should untrack it.
r.proxyConfigMapper.UntrackProxyConfiguration(id)
r.proxyConfigMapper.UntrackID(id)
continue
}
decoded = append(decoded, res)

View File

@ -16,7 +16,7 @@ import (
svctest "github.com/hashicorp/consul/agent/grpc-external/services/resource/testing"
"github.com/hashicorp/consul/internal/catalog"
"github.com/hashicorp/consul/internal/controller"
"github.com/hashicorp/consul/internal/mesh/internal/controllers/proxyconfiguration/mapper"
"github.com/hashicorp/consul/internal/mesh/internal/mappers/workloadselectionmapper"
"github.com/hashicorp/consul/internal/mesh/internal/types"
"github.com/hashicorp/consul/internal/resource"
"github.com/hashicorp/consul/internal/resource/resourcetest"
@ -54,7 +54,7 @@ func (suite *controllerTestSuite) SetupTest() {
suite.ctx = testutil.TestContext(suite.T())
suite.ctl = &reconciler{
proxyConfigMapper: mapper.New(),
proxyConfigMapper: workloadselectionmapper.New[*pbmesh.ProxyConfiguration](pbmesh.ComputedProxyConfigurationType),
}
suite.workload = &pbcatalog.Workload{
@ -151,19 +151,19 @@ func (suite *controllerTestSuite) TestReconcile_HappyPath() {
pCfg1 := resourcetest.Resource(pbmesh.ProxyConfigurationType, "cfg1").
WithData(suite.T(), suite.proxyCfg1).
Write(suite.T(), suite.client)
_, err := suite.ctl.proxyConfigMapper.MapProxyConfiguration(suite.ctx, suite.runtime, pCfg1)
_, err := suite.ctl.proxyConfigMapper.MapToComputedType(suite.ctx, suite.runtime, pCfg1)
require.NoError(suite.T(), err)
pCfg2 := resourcetest.Resource(pbmesh.ProxyConfigurationType, "cfg2").
WithData(suite.T(), suite.proxyCfg2).
Write(suite.T(), suite.client)
_, err = suite.ctl.proxyConfigMapper.MapProxyConfiguration(suite.ctx, suite.runtime, pCfg2)
_, err = suite.ctl.proxyConfigMapper.MapToComputedType(suite.ctx, suite.runtime, pCfg2)
require.NoError(suite.T(), err)
pCfg3 := resourcetest.Resource(pbmesh.ProxyConfigurationType, "cfg3").
WithData(suite.T(), suite.proxyCfg3).
Write(suite.T(), suite.client)
_, err = suite.ctl.proxyConfigMapper.MapProxyConfiguration(suite.ctx, suite.runtime, pCfg3)
_, err = suite.ctl.proxyConfigMapper.MapToComputedType(suite.ctx, suite.runtime, pCfg3)
require.NoError(suite.T(), err)
cpcID := resource.ReplaceType(pbmesh.ComputedProxyConfigurationType, suite.workloadRes.Id)
@ -181,7 +181,7 @@ func (suite *controllerTestSuite) TestReconcile_NoProxyConfigs() {
pCfg1 := resourcetest.Resource(pbmesh.ProxyConfigurationType, "cfg1").
WithData(suite.T(), suite.proxyCfg1).
Build()
_, err := suite.ctl.proxyConfigMapper.MapProxyConfiguration(suite.ctx, suite.runtime, pCfg1)
_, err := suite.ctl.proxyConfigMapper.MapToComputedType(suite.ctx, suite.runtime, pCfg1)
require.NoError(suite.T(), err)
cpcID := resourcetest.Resource(pbmesh.ComputedProxyConfigurationType, suite.workloadRes.Id.Name).
@ -199,7 +199,7 @@ func (suite *controllerTestSuite) TestController() {
// Run the controller manager
mgr := controller.NewManager(suite.client, suite.runtime.Logger)
m := mapper.New()
m := workloadselectionmapper.New[*pbmesh.ProxyConfiguration](pbmesh.ComputedProxyConfigurationType)
mgr.Register(Controller(m))
mgr.SetRaftLeader(true)
go mgr.Run(suite.ctx)
@ -244,7 +244,6 @@ func (suite *controllerTestSuite) TestController() {
})
testutil.RunStep(suite.T(), "update proxy config selector", func(t *testing.T) {
t.Log("running update proxy config selector")
// Update proxy config selector to no longer select "test-workload"
updatedProxyCfg := proto.Clone(suite.proxyCfg2).(*pbmesh.ProxyConfiguration)
updatedProxyCfg.Workloads = &pbcatalog.WorkloadSelector{
@ -262,7 +261,7 @@ func (suite *controllerTestSuite) TestController() {
retry.Run(t, func(r *retry.R) {
res := suite.client.RequireResourceExists(r, cpcID)
// The "test-workload" computed traffic permissions should now be updated to use only proxy cfg 1 and 3.
// The "test-workload" computed proxy configurations should now be updated to use only proxy cfg 1 and 3.
expProxyCfg := &pbmesh.ComputedProxyConfiguration{
DynamicConfig: &pbmesh.DynamicConfig{
Mode: pbmesh.ProxyMode_PROXY_MODE_TRANSPARENT,

View File

@ -1,71 +0,0 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package mapper
import (
"context"
"github.com/hashicorp/consul/internal/controller"
"github.com/hashicorp/consul/internal/mesh/internal/mappers/common"
"github.com/hashicorp/consul/internal/resource/mappers/selectiontracker"
"github.com/hashicorp/consul/lib/stringslice"
pbmesh "github.com/hashicorp/consul/proto-public/pbmesh/v2beta1"
"github.com/hashicorp/consul/proto-public/pbresource"
)
type Mapper struct {
workloadSelectionTracker *selectiontracker.WorkloadSelectionTracker
}
func New() *Mapper {
return &Mapper{
workloadSelectionTracker: selectiontracker.New(),
}
}
// MapProxyConfiguration is responsible for mapping ProxyConfiguration resources to the corresponding ComputedProxyConfiguration
// resource which are name-aligned with the workload.
func (m *Mapper) MapProxyConfiguration(ctx context.Context, rt controller.Runtime, res *pbresource.Resource) ([]controller.Request, error) {
var proxyConfig pbmesh.ProxyConfiguration
err := res.Data.UnmarshalTo(&proxyConfig)
if err != nil {
return nil, err
}
// First, we return any existing workloads that this proxy configuration selects.
// The number of selected workloads may change in the future, but for this even we
// only need to care about triggering reconcile requests for the current ones.
requests, err := common.MapSelector(ctx, rt.Client, pbmesh.ComputedProxyConfigurationType,
proxyConfig.GetWorkloads(), res.Id.Tenancy)
if err != nil {
return nil, err
}
// Then generate requests for any previously selected workloads.
prevSelector := m.workloadSelectionTracker.GetSelector(res.GetId())
if !(stringslice.Equal(prevSelector.GetNames(), proxyConfig.GetWorkloads().GetNames()) &&
stringslice.Equal(prevSelector.GetPrefixes(), proxyConfig.GetWorkloads().GetPrefixes())) {
// the selector is different, so we need to map those selectors as well.
requestsForPrevSelector, err := common.MapSelector(ctx, rt.Client, pbmesh.ComputedProxyConfigurationType,
prevSelector, res.Id.Tenancy)
if err != nil {
return nil, err
}
requests = append(requests, requestsForPrevSelector...)
}
// Second, we track this proxy configuration's selector and ID in the tracker.
m.workloadSelectionTracker.TrackIDForSelector(res.Id, proxyConfig.GetWorkloads())
return requests, nil
}
func (m *Mapper) ProxyConfigurationsForWorkload(id *pbresource.ID) []*pbresource.ID {
return m.workloadSelectionTracker.GetIDsForWorkload(id)
}
func (m *Mapper) UntrackProxyConfiguration(id *pbresource.ID) {
m.workloadSelectionTracker.UntrackID(id)
}

View File

@ -9,12 +9,14 @@ import (
"github.com/hashicorp/consul/agent/leafcert"
"github.com/hashicorp/consul/internal/controller"
"github.com/hashicorp/consul/internal/mesh/internal/cache/sidecarproxycache"
"github.com/hashicorp/consul/internal/mesh/internal/controllers/explicitdestinations"
"github.com/hashicorp/consul/internal/mesh/internal/controllers/explicitdestinations/mapper"
"github.com/hashicorp/consul/internal/mesh/internal/controllers/proxyconfiguration"
"github.com/hashicorp/consul/internal/mesh/internal/controllers/proxyconfiguration/mapper"
"github.com/hashicorp/consul/internal/mesh/internal/controllers/routes"
"github.com/hashicorp/consul/internal/mesh/internal/controllers/sidecarproxy"
"github.com/hashicorp/consul/internal/mesh/internal/controllers/xds"
"github.com/hashicorp/consul/internal/mesh/internal/mappers/sidecarproxymapper"
"github.com/hashicorp/consul/internal/mesh/internal/mappers/workloadselectionmapper"
"github.com/hashicorp/consul/internal/resource/mappers/bimapper"
pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v2beta1"
pbmesh "github.com/hashicorp/consul/proto-public/pbmesh/v2beta1"
@ -52,5 +54,6 @@ func Register(mgr *controller.Manager, deps Dependencies) {
mgr.Register(routes.Controller())
mgr.Register(proxyconfiguration.Controller(mapper.New()))
mgr.Register(proxyconfiguration.Controller(workloadselectionmapper.New[*pbmesh.ProxyConfiguration](pbmesh.ComputedProxyConfigurationType)))
mgr.Register(explicitdestinations.Controller(mapper.New()))
}

View File

@ -388,30 +388,30 @@ func (suite *meshControllerTestSuite) TestController() {
})
})
testutil.RunStep(suite.T(), "add explicit destinations and check that new proxy state is generated", func(t *testing.T) {
// Write a default ComputedRoutes for api.
routestest.ReconcileComputedRoutes(suite.T(), suite.client, apiComputedRoutesID,
resourcetest.MustDecode[*pbcatalog.Service](t, suite.apiService),
)
// Write a default ComputedRoutes for api.
routestest.ReconcileComputedRoutes(suite.T(), suite.client, apiComputedRoutesID,
resourcetest.MustDecode[*pbcatalog.Service](suite.T(), suite.apiService),
)
// Add a source service and check that a new proxy state is generated.
webDestinations = resourcetest.Resource(pbmesh.DestinationsType, "web-destinations").
WithData(suite.T(), &pbmesh.Destinations{
Workloads: &pbcatalog.WorkloadSelector{Names: []string{"web-def"}},
Destinations: []*pbmesh.Destination{
{
DestinationRef: resource.Reference(suite.apiService.Id, ""),
DestinationPort: "tcp",
ListenAddr: &pbmesh.Destination_IpPort{
IpPort: &pbmesh.IPPortAddress{
Ip: "127.0.0.1",
Port: 1234,
},
// Add a source service and check that a new proxy state is generated.
webDestinations = resourcetest.Resource(pbmesh.DestinationsType, "web-destinations").
WithData(suite.T(), &pbmesh.Destinations{
Workloads: &pbcatalog.WorkloadSelector{Names: []string{"web-def"}},
Destinations: []*pbmesh.Destination{
{
DestinationRef: resource.Reference(suite.apiService.Id, ""),
DestinationPort: "tcp",
ListenAddr: &pbmesh.Destination_IpPort{
IpPort: &pbmesh.IPPortAddress{
Ip: "127.0.0.1",
Port: 1234,
},
},
},
}).Write(suite.T(), suite.client)
},
}).Write(suite.T(), suite.client)
testutil.RunStep(suite.T(), "add explicit destinations and check that new proxy state is generated", func(t *testing.T) {
webProxyStateTemplate = suite.client.WaitForNewVersion(t, webProxyStateTemplateID, webProxyStateTemplate.Version)
requireExplicitDestinationsFound(t, "api", webProxyStateTemplate)

View File

@ -0,0 +1,88 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package workloadselectionmapper
import (
"context"
"google.golang.org/protobuf/proto"
"github.com/hashicorp/consul/internal/controller"
"github.com/hashicorp/consul/internal/mesh/internal/mappers/common"
"github.com/hashicorp/consul/internal/resource"
"github.com/hashicorp/consul/internal/resource/mappers/selectiontracker"
"github.com/hashicorp/consul/lib/stringslice"
pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v2beta1"
"github.com/hashicorp/consul/proto-public/pbresource"
)
// WorkloadSelecting denotes a resource type that uses workload selectors.
type WorkloadSelecting interface {
proto.Message
GetWorkloads() *pbcatalog.WorkloadSelector
}
type Mapper[T WorkloadSelecting] struct {
workloadSelectionTracker *selectiontracker.WorkloadSelectionTracker
computedType *pbresource.Type
}
func New[T WorkloadSelecting](computedType *pbresource.Type) *Mapper[T] {
if computedType == nil {
panic("computed type is required")
}
return &Mapper[T]{
workloadSelectionTracker: selectiontracker.New(),
computedType: computedType,
}
}
// MapToComputedType is responsible for mapping types with workload selectors to the corresponding computed type
// resources which are name-aligned with the workload. This function will also track workload selectors with the ids
// from the workload-selectable types in the mapper.
func (m *Mapper[T]) MapToComputedType(ctx context.Context, rt controller.Runtime, res *pbresource.Resource) ([]controller.Request, error) {
dec, err := resource.Decode[T](res)
if err != nil {
return nil, err
}
// First, we return any existing workloads that this proxy configuration selects.
// The number of selected workloads may change in the future, but for this even we
// only need to care about triggering reconcile requests for the current ones.
requests, err := common.MapSelector(ctx, rt.Client, m.computedType,
dec.GetData().GetWorkloads(), res.Id.Tenancy)
if err != nil {
return nil, err
}
// Then generate requests for any previously selected workloads.
prevSelector := m.workloadSelectionTracker.GetSelector(res.GetId())
if !(stringslice.Equal(prevSelector.GetNames(), dec.GetData().GetWorkloads().GetNames()) &&
stringslice.Equal(prevSelector.GetPrefixes(), dec.GetData().GetWorkloads().GetPrefixes())) {
// the selector is different, so we need to map those selectors as well.
requestsForPrevSelector, err := common.MapSelector(ctx, rt.Client, m.computedType,
prevSelector, res.Id.Tenancy)
if err != nil {
return nil, err
}
requests = append(requests, requestsForPrevSelector...)
}
// Second, we track this proxy configuration's selector and ID in the tracker.
m.workloadSelectionTracker.TrackIDForSelector(res.Id, dec.GetData().GetWorkloads())
return requests, nil
}
// IDsForWorkload returns IDs of workload-selecting types that we're tracking for the
// given workload name.
func (m *Mapper[T]) IDsForWorkload(id *pbresource.ID) []*pbresource.ID {
return m.workloadSelectionTracker.GetIDsForWorkload(id)
}
// UntrackID removes tracking for the workload-selecting resource with the given ID.
func (m *Mapper[T]) UntrackID(id *pbresource.ID) {
m.workloadSelectionTracker.UntrackID(id)
}

View File

@ -1,7 +1,7 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package mapper
package workloadselectionmapper
import (
"context"
@ -21,9 +21,9 @@ import (
"github.com/hashicorp/consul/proto/private/prototest"
)
func TestMapProxyConfiguration(t *testing.T) {
func TestMapToComputedType(t *testing.T) {
resourceClient := svctest.RunResourceService(t, types.Register, catalog.RegisterTypes)
mapper := New()
mapper := New[*pbmesh.ProxyConfiguration](pbmesh.ComputedProxyConfigurationType)
workloadData := &pbcatalog.Workload{
Addresses: []*pbcatalog.WorkloadAddress{{Host: "1.1.1.1"}},
@ -57,7 +57,7 @@ func TestMapProxyConfiguration(t *testing.T) {
Mode: pbmesh.ProxyMode_PROXY_MODE_TRANSPARENT,
},
}).Build()
reqs, err := mapper.MapProxyConfiguration(context.Background(), controller.Runtime{Client: resourceClient}, pCfg1)
reqs, err := mapper.MapToComputedType(context.Background(), controller.Runtime{Client: resourceClient}, pCfg1)
require.NoError(t, err)
prototest.AssertElementsMatch(t,
controller.MakeRequests(pbmesh.ComputedProxyConfigurationType, []*pbresource.ID{wID1, wID3, wID4}),
@ -75,23 +75,23 @@ func TestMapProxyConfiguration(t *testing.T) {
},
}).Build()
reqs, err = mapper.MapProxyConfiguration(context.Background(), controller.Runtime{Client: resourceClient}, pCfg2)
reqs, err = mapper.MapToComputedType(context.Background(), controller.Runtime{Client: resourceClient}, pCfg2)
require.NoError(t, err)
prototest.AssertElementsMatch(t,
controller.MakeRequests(pbmesh.ComputedProxyConfigurationType, []*pbresource.ID{wID1, wID2, wID3, wID4}),
reqs)
// Check mapper state for each workload.
ids := mapper.ProxyConfigurationsForWorkload(wID1)
ids := mapper.IDsForWorkload(wID1)
prototest.AssertElementsMatch(t, []*pbresource.ID{pCfg1.Id, pCfg2.Id}, ids)
ids = mapper.ProxyConfigurationsForWorkload(wID2)
ids = mapper.IDsForWorkload(wID2)
prototest.AssertElementsMatch(t, []*pbresource.ID{pCfg2.Id}, ids)
ids = mapper.ProxyConfigurationsForWorkload(wID3)
ids = mapper.IDsForWorkload(wID3)
prototest.AssertElementsMatch(t, []*pbresource.ID{pCfg1.Id, pCfg2.Id}, ids)
ids = mapper.ProxyConfigurationsForWorkload(wID4)
ids = mapper.IDsForWorkload(wID4)
prototest.AssertElementsMatch(t, []*pbresource.ID{pCfg1.Id, pCfg2.Id}, ids)
// Update pCfg2's selector and check that we generate requests for previous and new selector.
@ -106,37 +106,37 @@ func TestMapProxyConfiguration(t *testing.T) {
},
}).Build()
reqs, err = mapper.MapProxyConfiguration(context.Background(), controller.Runtime{Client: resourceClient}, pCfg2)
reqs, err = mapper.MapToComputedType(context.Background(), controller.Runtime{Client: resourceClient}, pCfg2)
require.NoError(t, err)
prototest.AssertElementsMatch(t,
controller.MakeRequests(pbmesh.ComputedProxyConfigurationType, []*pbresource.ID{wID4, wID1, wID2, wID3, wID4}),
reqs)
// Check mapper state for each workload.
ids = mapper.ProxyConfigurationsForWorkload(wID1)
ids = mapper.IDsForWorkload(wID1)
prototest.AssertElementsMatch(t, []*pbresource.ID{pCfg1.Id}, ids)
ids = mapper.ProxyConfigurationsForWorkload(wID2)
ids = mapper.IDsForWorkload(wID2)
prototest.AssertElementsMatch(t, []*pbresource.ID{}, ids)
ids = mapper.ProxyConfigurationsForWorkload(wID3)
ids = mapper.IDsForWorkload(wID3)
prototest.AssertElementsMatch(t, []*pbresource.ID{pCfg1.Id}, ids)
ids = mapper.ProxyConfigurationsForWorkload(wID4)
ids = mapper.IDsForWorkload(wID4)
prototest.AssertElementsMatch(t, []*pbresource.ID{pCfg1.Id, pCfg2.Id}, ids)
// Untrack one of the proxy cfgs and check that mapper is updated.
mapper.UntrackProxyConfiguration(pCfg1.Id)
mapper.UntrackID(pCfg1.Id)
ids = mapper.ProxyConfigurationsForWorkload(wID1)
ids = mapper.IDsForWorkload(wID1)
prototest.AssertElementsMatch(t, []*pbresource.ID{}, ids)
ids = mapper.ProxyConfigurationsForWorkload(wID2)
ids = mapper.IDsForWorkload(wID2)
prototest.AssertElementsMatch(t, []*pbresource.ID{}, ids)
ids = mapper.ProxyConfigurationsForWorkload(wID3)
ids = mapper.IDsForWorkload(wID3)
prototest.AssertElementsMatch(t, []*pbresource.ID{}, ids)
ids = mapper.ProxyConfigurationsForWorkload(wID4)
ids = mapper.IDsForWorkload(wID4)
prototest.AssertElementsMatch(t, []*pbresource.ID{pCfg2.Id}, ids)
}

View File

@ -0,0 +1,17 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package types
import (
"github.com/hashicorp/consul/internal/resource"
pbmesh "github.com/hashicorp/consul/proto-public/pbmesh/v2beta1"
)
func RegisterComputedExplicitDestinations(r resource.Registry) {
r.Register(resource.Registration{
Type: pbmesh.ComputedExplicitDestinationsType,
Proto: &pbmesh.ComputedExplicitDestinations{},
Scope: resource.ScopeNamespace,
})
}

View File

@ -13,17 +13,17 @@ import (
"github.com/hashicorp/consul/proto-public/pbresource"
)
func RegisterUpstreams(r resource.Registry) {
func RegisterDestinations(r resource.Registry) {
r.Register(resource.Registration{
Type: pbmesh.DestinationsType,
Proto: &pbmesh.Destinations{},
Scope: resource.ScopeNamespace,
Mutate: MutateUpstreams,
Validate: ValidateUpstreams,
Mutate: MutateDestinations,
Validate: ValidateDestinations,
})
}
func MutateUpstreams(res *pbresource.Resource) error {
func MutateDestinations(res *pbresource.Resource) error {
var destinations pbmesh.Destinations
if err := res.Data.UnmarshalTo(&destinations); err != nil {
@ -64,7 +64,7 @@ func isLocalPeer(p string) bool {
return p == "local" || p == ""
}
func ValidateUpstreams(res *pbresource.Resource) error {
func ValidateDestinations(res *pbresource.Resource) error {
var destinations pbmesh.Destinations
if err := res.Data.UnmarshalTo(&destinations); err != nil {

View File

@ -31,7 +31,7 @@ func TestMutateUpstreams(t *testing.T) {
WithData(t, tc.data).
Build()
err := MutateUpstreams(res)
err := MutateDestinations(res)
got := resourcetest.MustDecode[*pbmesh.Destinations](t, res)
@ -100,14 +100,14 @@ func TestValidateUpstreams(t *testing.T) {
Build()
if !tc.skipMutate {
require.NoError(t, MutateUpstreams(res))
require.NoError(t, MutateDestinations(res))
// Verify that mutate didn't actually change the object.
got := resourcetest.MustDecode[*pbmesh.Destinations](t, res)
prototest.AssertDeepEqual(t, tc.data, got.Data)
}
err := ValidateUpstreams(res)
err := ValidateDestinations(res)
// Verify that validate didn't actually change the object.
got := resourcetest.MustDecode[*pbmesh.Destinations](t, res)

View File

@ -10,7 +10,8 @@ import (
func Register(r resource.Registry) {
RegisterProxyConfiguration(r)
RegisterComputedProxyConfiguration(r)
RegisterUpstreams(r)
RegisterDestinations(r)
RegisterComputedExplicitDestinations(r)
RegisterUpstreamsConfiguration(r)
RegisterProxyStateTemplate(r)
RegisterHTTPRoute(r)

View File

@ -0,0 +1,19 @@
package catalogv2beta1
func (s *Service) IsMeshEnabled() bool {
for _, port := range s.GetPorts() {
if port.Protocol == Protocol_PROTOCOL_MESH {
return true
}
}
return false
}
func (s *Service) FindServicePort(name string) *ServicePort {
for _, port := range s.GetPorts() {
if port.TargetPort == name {
return port
}
}
return nil
}

View File

@ -0,0 +1,120 @@
package catalogv2beta1
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestIsMeshEnabled(t *testing.T) {
cases := map[string]struct {
service *Service
exp bool
}{
"nil": {service: nil, exp: false},
"no ports": {
service: &Service{},
exp: false,
},
"no mesh ports": {
service: &Service{
Ports: []*ServicePort{
{
TargetPort: "foo",
Protocol: Protocol_PROTOCOL_HTTP,
},
{
TargetPort: "bar",
Protocol: Protocol_PROTOCOL_TCP,
},
},
},
exp: false,
},
"with mesh ports": {
service: &Service{
Ports: []*ServicePort{
{
TargetPort: "foo",
Protocol: Protocol_PROTOCOL_HTTP,
},
{
TargetPort: "bar",
Protocol: Protocol_PROTOCOL_TCP,
},
{
TargetPort: "baz",
Protocol: Protocol_PROTOCOL_MESH,
},
},
},
exp: true,
},
}
for name, c := range cases {
t.Run(name, func(t *testing.T) {
require.Equal(t, c.exp, c.service.IsMeshEnabled())
})
}
}
func TestFindServicePort(t *testing.T) {
cases := map[string]struct {
service *Service
port string
exp *ServicePort
}{
"nil": {service: nil, port: "foo", exp: nil},
"no ports": {
service: &Service{},
port: "foo",
exp: nil,
},
"non-existing port": {
service: &Service{
Ports: []*ServicePort{
{
TargetPort: "foo",
Protocol: Protocol_PROTOCOL_HTTP,
},
{
TargetPort: "bar",
Protocol: Protocol_PROTOCOL_TCP,
},
},
},
port: "not-found",
exp: nil,
},
"existing port": {
service: &Service{
Ports: []*ServicePort{
{
TargetPort: "foo",
Protocol: Protocol_PROTOCOL_HTTP,
},
{
TargetPort: "bar",
Protocol: Protocol_PROTOCOL_TCP,
},
{
TargetPort: "baz",
Protocol: Protocol_PROTOCOL_MESH,
},
},
},
port: "bar",
exp: &ServicePort{
TargetPort: "bar",
Protocol: Protocol_PROTOCOL_TCP,
},
},
}
for name, c := range cases {
t.Run(name, func(t *testing.T) {
require.Equal(t, c.exp, c.service.FindServicePort(c.port))
})
}
}

View File

@ -0,0 +1,18 @@
// Code generated by protoc-gen-go-binary. DO NOT EDIT.
// source: pbmesh/v2beta1/computed_explicit_destinations.proto
package meshv2beta1
import (
"google.golang.org/protobuf/proto"
)
// MarshalBinary implements encoding.BinaryMarshaler
func (msg *ComputedExplicitDestinations) MarshalBinary() ([]byte, error) {
return proto.Marshal(msg)
}
// UnmarshalBinary implements encoding.BinaryUnmarshaler
func (msg *ComputedExplicitDestinations) UnmarshalBinary(b []byte) error {
return proto.Unmarshal(b, msg)
}

View File

@ -0,0 +1,180 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.30.0
// protoc (unknown)
// source: pbmesh/v2beta1/computed_explicit_destinations.proto
package meshv2beta1
import (
_ "github.com/hashicorp/consul/proto-public/pbresource"
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
)
const (
// Verify that this generated code is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
// Verify that runtime/protoimpl is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
type ComputedExplicitDestinations struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
// destinations is the list of explicit destinations to define for the selected workloads.
Destinations []*Destination `protobuf:"bytes,1,rep,name=destinations,proto3" json:"destinations,omitempty"`
}
func (x *ComputedExplicitDestinations) Reset() {
*x = ComputedExplicitDestinations{}
if protoimpl.UnsafeEnabled {
mi := &file_pbmesh_v2beta1_computed_explicit_destinations_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *ComputedExplicitDestinations) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*ComputedExplicitDestinations) ProtoMessage() {}
func (x *ComputedExplicitDestinations) ProtoReflect() protoreflect.Message {
mi := &file_pbmesh_v2beta1_computed_explicit_destinations_proto_msgTypes[0]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use ComputedExplicitDestinations.ProtoReflect.Descriptor instead.
func (*ComputedExplicitDestinations) Descriptor() ([]byte, []int) {
return file_pbmesh_v2beta1_computed_explicit_destinations_proto_rawDescGZIP(), []int{0}
}
func (x *ComputedExplicitDestinations) GetDestinations() []*Destination {
if x != nil {
return x.Destinations
}
return nil
}
var File_pbmesh_v2beta1_computed_explicit_destinations_proto protoreflect.FileDescriptor
var file_pbmesh_v2beta1_computed_explicit_destinations_proto_rawDesc = []byte{
0x0a, 0x33, 0x70, 0x62, 0x6d, 0x65, 0x73, 0x68, 0x2f, 0x76, 0x32, 0x62, 0x65, 0x74, 0x61, 0x31,
0x2f, 0x63, 0x6f, 0x6d, 0x70, 0x75, 0x74, 0x65, 0x64, 0x5f, 0x65, 0x78, 0x70, 0x6c, 0x69, 0x63,
0x69, 0x74, 0x5f, 0x64, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x2e,
0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x1d, 0x68, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70,
0x2e, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x2e, 0x6d, 0x65, 0x73, 0x68, 0x2e, 0x76, 0x32, 0x62,
0x65, 0x74, 0x61, 0x31, 0x1a, 0x21, 0x70, 0x62, 0x6d, 0x65, 0x73, 0x68, 0x2f, 0x76, 0x32, 0x62,
0x65, 0x74, 0x61, 0x31, 0x2f, 0x64, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x61, 0x74, 0x69, 0x6f, 0x6e,
0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1c, 0x70, 0x62, 0x72, 0x65, 0x73, 0x6f, 0x75,
0x72, 0x63, 0x65, 0x2f, 0x61, 0x6e, 0x6e, 0x6f, 0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x2e,
0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x76, 0x0a, 0x1c, 0x43, 0x6f, 0x6d, 0x70, 0x75, 0x74, 0x65,
0x64, 0x45, 0x78, 0x70, 0x6c, 0x69, 0x63, 0x69, 0x74, 0x44, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x61,
0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x4e, 0x0a, 0x0c, 0x64, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x61,
0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x2a, 0x2e, 0x68, 0x61,
0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x2e, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x2e, 0x6d,
0x65, 0x73, 0x68, 0x2e, 0x76, 0x32, 0x62, 0x65, 0x74, 0x61, 0x31, 0x2e, 0x44, 0x65, 0x73, 0x74,
0x69, 0x6e, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x0c, 0x64, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x61,
0x74, 0x69, 0x6f, 0x6e, 0x73, 0x3a, 0x06, 0xa2, 0x93, 0x04, 0x02, 0x08, 0x03, 0x42, 0xa2, 0x02,
0x0a, 0x21, 0x63, 0x6f, 0x6d, 0x2e, 0x68, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x2e,
0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x2e, 0x6d, 0x65, 0x73, 0x68, 0x2e, 0x76, 0x32, 0x62, 0x65,
0x74, 0x61, 0x31, 0x42, 0x21, 0x43, 0x6f, 0x6d, 0x70, 0x75, 0x74, 0x65, 0x64, 0x45, 0x78, 0x70,
0x6c, 0x69, 0x63, 0x69, 0x74, 0x44, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x61, 0x74, 0x69, 0x6f, 0x6e,
0x73, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x50, 0x01, 0x5a, 0x43, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62,
0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x68, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x2f, 0x63,
0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2d, 0x70, 0x75, 0x62, 0x6c,
0x69, 0x63, 0x2f, 0x70, 0x62, 0x6d, 0x65, 0x73, 0x68, 0x2f, 0x76, 0x32, 0x62, 0x65, 0x74, 0x61,
0x31, 0x3b, 0x6d, 0x65, 0x73, 0x68, 0x76, 0x32, 0x62, 0x65, 0x74, 0x61, 0x31, 0xa2, 0x02, 0x03,
0x48, 0x43, 0x4d, 0xaa, 0x02, 0x1d, 0x48, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x2e,
0x43, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x2e, 0x4d, 0x65, 0x73, 0x68, 0x2e, 0x56, 0x32, 0x62, 0x65,
0x74, 0x61, 0x31, 0xca, 0x02, 0x1d, 0x48, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x5c,
0x43, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x5c, 0x4d, 0x65, 0x73, 0x68, 0x5c, 0x56, 0x32, 0x62, 0x65,
0x74, 0x61, 0x31, 0xe2, 0x02, 0x29, 0x48, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x5c,
0x43, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x5c, 0x4d, 0x65, 0x73, 0x68, 0x5c, 0x56, 0x32, 0x62, 0x65,
0x74, 0x61, 0x31, 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0xea,
0x02, 0x20, 0x48, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x3a, 0x3a, 0x43, 0x6f, 0x6e,
0x73, 0x75, 0x6c, 0x3a, 0x3a, 0x4d, 0x65, 0x73, 0x68, 0x3a, 0x3a, 0x56, 0x32, 0x62, 0x65, 0x74,
0x61, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
file_pbmesh_v2beta1_computed_explicit_destinations_proto_rawDescOnce sync.Once
file_pbmesh_v2beta1_computed_explicit_destinations_proto_rawDescData = file_pbmesh_v2beta1_computed_explicit_destinations_proto_rawDesc
)
func file_pbmesh_v2beta1_computed_explicit_destinations_proto_rawDescGZIP() []byte {
file_pbmesh_v2beta1_computed_explicit_destinations_proto_rawDescOnce.Do(func() {
file_pbmesh_v2beta1_computed_explicit_destinations_proto_rawDescData = protoimpl.X.CompressGZIP(file_pbmesh_v2beta1_computed_explicit_destinations_proto_rawDescData)
})
return file_pbmesh_v2beta1_computed_explicit_destinations_proto_rawDescData
}
var file_pbmesh_v2beta1_computed_explicit_destinations_proto_msgTypes = make([]protoimpl.MessageInfo, 1)
var file_pbmesh_v2beta1_computed_explicit_destinations_proto_goTypes = []interface{}{
(*ComputedExplicitDestinations)(nil), // 0: hashicorp.consul.mesh.v2beta1.ComputedExplicitDestinations
(*Destination)(nil), // 1: hashicorp.consul.mesh.v2beta1.Destination
}
var file_pbmesh_v2beta1_computed_explicit_destinations_proto_depIdxs = []int32{
1, // 0: hashicorp.consul.mesh.v2beta1.ComputedExplicitDestinations.destinations:type_name -> hashicorp.consul.mesh.v2beta1.Destination
1, // [1:1] is the sub-list for method output_type
1, // [1:1] is the sub-list for method input_type
1, // [1:1] is the sub-list for extension type_name
1, // [1:1] is the sub-list for extension extendee
0, // [0:1] is the sub-list for field type_name
}
func init() { file_pbmesh_v2beta1_computed_explicit_destinations_proto_init() }
func file_pbmesh_v2beta1_computed_explicit_destinations_proto_init() {
if File_pbmesh_v2beta1_computed_explicit_destinations_proto != nil {
return
}
file_pbmesh_v2beta1_destinations_proto_init()
if !protoimpl.UnsafeEnabled {
file_pbmesh_v2beta1_computed_explicit_destinations_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*ComputedExplicitDestinations); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_pbmesh_v2beta1_computed_explicit_destinations_proto_rawDesc,
NumEnums: 0,
NumMessages: 1,
NumExtensions: 0,
NumServices: 0,
},
GoTypes: file_pbmesh_v2beta1_computed_explicit_destinations_proto_goTypes,
DependencyIndexes: file_pbmesh_v2beta1_computed_explicit_destinations_proto_depIdxs,
MessageInfos: file_pbmesh_v2beta1_computed_explicit_destinations_proto_msgTypes,
}.Build()
File_pbmesh_v2beta1_computed_explicit_destinations_proto = out.File
file_pbmesh_v2beta1_computed_explicit_destinations_proto_rawDesc = nil
file_pbmesh_v2beta1_computed_explicit_destinations_proto_goTypes = nil
file_pbmesh_v2beta1_computed_explicit_destinations_proto_depIdxs = nil
}

View File

@ -0,0 +1,16 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
syntax = "proto3";
package hashicorp.consul.mesh.v2beta1;
import "pbmesh/v2beta1/destinations.proto";
import "pbresource/annotations.proto";
message ComputedExplicitDestinations {
option (hashicorp.consul.resource.spec) = {scope: SCOPE_NAMESPACE};
// destinations is the list of explicit destinations to define for the selected workloads.
repeated Destination destinations = 1;
}

View File

@ -10,19 +10,26 @@ const (
GroupName = "mesh"
Version = "v2beta1"
ComputedProxyConfigurationKind = "ComputedProxyConfiguration"
ComputedRoutesKind = "ComputedRoutes"
DestinationPolicyKind = "DestinationPolicy"
DestinationsKind = "Destinations"
DestinationsConfigurationKind = "DestinationsConfiguration"
GRPCRouteKind = "GRPCRoute"
HTTPRouteKind = "HTTPRoute"
ProxyConfigurationKind = "ProxyConfiguration"
ProxyStateTemplateKind = "ProxyStateTemplate"
TCPRouteKind = "TCPRoute"
ComputedExplicitDestinationsKind = "ComputedExplicitDestinations"
ComputedProxyConfigurationKind = "ComputedProxyConfiguration"
ComputedRoutesKind = "ComputedRoutes"
DestinationPolicyKind = "DestinationPolicy"
DestinationsKind = "Destinations"
DestinationsConfigurationKind = "DestinationsConfiguration"
GRPCRouteKind = "GRPCRoute"
HTTPRouteKind = "HTTPRoute"
ProxyConfigurationKind = "ProxyConfiguration"
ProxyStateTemplateKind = "ProxyStateTemplate"
TCPRouteKind = "TCPRoute"
)
var (
ComputedExplicitDestinationsType = &pbresource.Type{
Group: GroupName,
GroupVersion: Version,
Kind: ComputedExplicitDestinationsKind,
}
ComputedProxyConfigurationType = &pbresource.Type{
Group: GroupName,
GroupVersion: Version,