Add sameness group references in exported services controller (#20100)

This commit is contained in:
Ganesh S 2024-01-08 11:55:52 +05:30 committed by GitHub
parent c12245be3c
commit 0d57acc549
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 364 additions and 124 deletions

View File

@ -969,7 +969,7 @@ func (s *Server) registerControllers(deps Deps, proxyUpdater ProxyUpdater) error
if s.useV2Resources {
catalog.RegisterControllers(s.controllerManager, catalog.DefaultControllerDependencies())
multicluster.RegisterControllers(s.controllerManager)
multicluster.RegisterControllers(s.controllerManager, multicluster.DefaultControllerDependencies())
defaultAllow, err := s.config.ACLResolverSettings.IsDefaultAllow()
if err != nil {
return err

View File

@ -6,6 +6,7 @@ package multicluster
import (
"github.com/hashicorp/consul/internal/controller"
"github.com/hashicorp/consul/internal/multicluster/internal/controllers"
exportedServicesSamenessGroupExpander "github.com/hashicorp/consul/internal/multicluster/internal/controllers/exportedservices/expander"
"github.com/hashicorp/consul/internal/multicluster/internal/types"
"github.com/hashicorp/consul/internal/resource"
)
@ -23,8 +24,16 @@ func RegisterTypes(r resource.Registry) {
types.Register(r)
}
type ControllerDependencies = controllers.Dependencies
func DefaultControllerDependencies() ControllerDependencies {
return ControllerDependencies{
ExportedServicesSamenessGroupsExpander: exportedServicesSamenessGroupExpander.New(),
}
}
// RegisterControllers registers controllers for the multicluster types with
// the given controller Manager.
func RegisterControllers(mgr *controller.Manager) {
controllers.Register(mgr)
func RegisterControllers(mgr *controller.Manager, deps ControllerDependencies) {
controllers.Register(mgr, deps)
}

View File

@ -0,0 +1,140 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package exportedservices
import (
"sort"
"github.com/hashicorp/consul/internal/multicluster/internal/types"
"github.com/hashicorp/consul/internal/resource"
pbmulticluster "github.com/hashicorp/consul/proto-public/pbmulticluster/v2beta1"
"github.com/hashicorp/consul/proto-public/pbresource"
)
type serviceExports struct {
ref *pbresource.Reference
partitions map[string]struct{}
peers map[string]struct{}
}
type exportedServicesBuilder struct {
data map[resource.ReferenceKey]*serviceExports
samenessGroupsExpander ExportedServicesSamenessGroupExpander
samenessGroupsNameToMemberMap map[string][]*pbmulticluster.SamenessGroupMember
}
func newExportedServicesBuilder(samenessGroupsExpander ExportedServicesSamenessGroupExpander, samenessGroups []*types.DecodedSamenessGroup) *exportedServicesBuilder {
samenessGroupsNameToMemberMap := make(map[string][]*pbmulticluster.SamenessGroupMember)
for _, sg := range samenessGroups {
sgData := sg.GetData()
if sgData == nil {
// This should never occur
panic("sameness group resource cannot exist without data")
}
samenessGroupsNameToMemberMap[sg.GetId().GetName()] = sgData.GetMembers()
}
return &exportedServicesBuilder{
data: make(map[resource.ReferenceKey]*serviceExports),
samenessGroupsExpander: samenessGroupsExpander,
samenessGroupsNameToMemberMap: samenessGroupsNameToMemberMap,
}
}
func (b *exportedServicesBuilder) track(svcID *pbresource.ID, consumers []*pbmulticluster.ExportedServicesConsumer) error {
key := resource.NewReferenceKey(svcID)
exports, ok := b.data[key]
if !ok {
exports = &serviceExports{
ref: resource.Reference(svcID, ""),
partitions: make(map[string]struct{}),
peers: make(map[string]struct{}),
}
b.data[key] = exports
}
expandedConsumers, err := b.samenessGroupsExpander.Expand(consumers, b.samenessGroupsNameToMemberMap)
if err != nil {
return err
}
for _, p := range expandedConsumers.Partitions {
exports.partitions[p] = struct{}{}
}
for _, p := range expandedConsumers.Peers {
exports.peers[p] = struct{}{}
}
// TODO: Handle status for missing sameness groups
return nil
}
func (b *exportedServicesBuilder) build() *pbmulticluster.ComputedExportedServices {
if len(b.data) == 0 {
return nil
}
ces := &pbmulticluster.ComputedExportedServices{
Consumers: make([]*pbmulticluster.ComputedExportedService, 0, len(b.data)),
}
for _, svc := range sortRefValue(b.data) {
consumers := make([]*pbmulticluster.ComputedExportedServicesConsumer, 0, len(svc.peers)+len(svc.partitions))
for _, peer := range sortKeys(svc.peers) {
consumers = append(consumers, &pbmulticluster.ComputedExportedServicesConsumer{
ConsumerTenancy: &pbmulticluster.ComputedExportedServicesConsumer_Peer{
Peer: peer,
},
})
}
for _, partition := range sortKeys(svc.partitions) {
// Filter out the partition that matches with the
// partition of the service reference. This is done
// to avoid the name of the local partition to be
// present as a consumer in the ComputedExportedService resource.
if svc.ref.Tenancy.Partition == partition {
continue
}
consumers = append(consumers, &pbmulticluster.ComputedExportedServicesConsumer{
ConsumerTenancy: &pbmulticluster.ComputedExportedServicesConsumer_Partition{
Partition: partition,
},
})
}
ces.Consumers = append(ces.Consumers, &pbmulticluster.ComputedExportedService{
TargetRef: svc.ref,
Consumers: consumers,
})
}
return ces
}
func sortKeys(m map[string]struct{}) []string {
keys := make([]string, 0, len(m))
for key := range m {
keys = append(keys, key)
}
sort.Strings(keys)
return keys
}
func sortRefValue(m map[resource.ReferenceKey]*serviceExports) []*serviceExports {
vals := make([]*serviceExports, 0, len(m))
for _, val := range m {
vals = append(vals, val)
}
sort.Slice(vals, func(i, j int) bool {
return resource.ReferenceToString(vals[i].ref) < resource.ReferenceToString(vals[j].ref)
})
return vals
}

View File

@ -5,13 +5,12 @@ package exportedservices
import (
"context"
"fmt"
"sort"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
"github.com/hashicorp/consul/internal/controller"
expanderTypes "github.com/hashicorp/consul/internal/multicluster/internal/controllers/exportedservices/expander/types"
"github.com/hashicorp/consul/internal/multicluster/internal/types"
"github.com/hashicorp/consul/internal/resource"
"github.com/hashicorp/consul/internal/storage"
@ -24,22 +23,34 @@ const (
ControllerName = "consul.io/exported-services"
)
func Controller() *controller.Controller {
type ExportedServicesSamenessGroupExpander interface {
// Expand resolves a sameness group into peers and partition and returns
// them as individual slices
//
// It also returns back the list of sameness group names that can't be resolved.
Expand([]*pbmulticluster.ExportedServicesConsumer, map[string][]*pbmulticluster.SamenessGroupMember) (*expanderTypes.ExpandedConsumers, error)
return controller.NewController(ControllerName, pbmulticluster.ComputedExportedServicesType).
// List returns the list of sameness groups present in a given partition
List(context.Context, controller.Runtime, controller.Request) ([]*types.DecodedSamenessGroup, error)
}
func Controller(expander ExportedServicesSamenessGroupExpander) *controller.Controller {
if expander == nil {
panic("No sameness group expander was provided to the ExportedServiceController constructor")
}
ctrl := controller.NewController(ControllerName, pbmulticluster.ComputedExportedServicesType).
WithWatch(pbmulticluster.ExportedServicesType, ReplaceTypeForComputedExportedServices()).
WithWatch(pbcatalog.ServiceType, ReplaceTypeForComputedExportedServices()).
WithWatch(pbmulticluster.NamespaceExportedServicesType, ReplaceTypeForComputedExportedServices()).
WithWatch(pbmulticluster.PartitionExportedServicesType, ReplaceTypeForComputedExportedServices()).
WithReconciler(&reconciler{})
WithReconciler(&reconciler{samenessGroupExpander: expander})
return registerEnterpriseResourcesWatchers(ctrl)
}
type reconciler struct{}
type serviceExports struct {
ref *pbresource.Reference
partitions map[string]struct{}
peers map[string]struct{}
type reconciler struct {
samenessGroupExpander ExportedServicesSamenessGroupExpander
}
func (r *reconciler) Reconcile(ctx context.Context, rt controller.Runtime, req controller.Request) error {
@ -107,7 +118,14 @@ func (r *reconciler) Reconcile(ctx context.Context, rt controller.Runtime, req c
rt.Logger.Error("error getting services", "error", err)
return err
}
builder := newExportedServicesBuilder()
samenessGroups, err := r.samenessGroupExpander.List(ctx, rt, req)
if err != nil {
rt.Logger.Error("failed to fetch sameness groups", err)
return err
}
builder := newExportedServicesBuilder(r.samenessGroupExpander, samenessGroups)
svcs := make(map[resource.ReferenceKey]struct{}, len(services))
for _, svc := range services {
@ -122,7 +140,14 @@ func (r *reconciler) Reconcile(ctx context.Context, rt controller.Runtime, req c
Name: svc,
}
if _, ok := svcs[resource.NewReferenceKey(id)]; ok {
builder.track(id, es.Data.Consumers)
if err := builder.track(id, es.Data.Consumers); err != nil {
rt.Logger.Error("error tracking service for exported service",
"exported_service", es.Id.Name,
"service", id.Name,
"error", err,
)
return err
}
}
}
}
@ -132,13 +157,27 @@ func (r *reconciler) Reconcile(ctx context.Context, rt controller.Runtime, req c
if svc.Id.Tenancy.Namespace != nes.Id.Tenancy.Namespace {
continue
}
builder.track(svc.Id, nes.Data.Consumers)
if err := builder.track(svc.Id, nes.Data.Consumers); err != nil {
rt.Logger.Error("error tracking service for namespace exported service",
"exported_service", nes.Id.Name,
"service", svc.Id.Name,
"error", err,
)
return err
}
}
}
for _, pes := range partitionedExportedServices {
for _, svc := range services {
builder.track(svc.Id, pes.Data.Consumers)
if err := builder.track(svc.Id, pes.Data.Consumers); err != nil {
rt.Logger.Error("error tracking service for partition exported service",
"exported_service", pes.Id.Name,
"service", svc.Id.Name,
"error", err,
)
return err
}
}
}
newComputedExportedService := builder.build()
@ -208,101 +247,6 @@ func getOldComputedExportedService(ctx context.Context, rt controller.Runtime, r
return computedExpSvcRes, nil
}
type exportedServicesBuilder struct {
data map[resource.ReferenceKey]*serviceExports
}
func newExportedServicesBuilder() *exportedServicesBuilder {
return &exportedServicesBuilder{
data: make(map[resource.ReferenceKey]*serviceExports),
}
}
func (b *exportedServicesBuilder) track(id *pbresource.ID, consumers []*pbmulticluster.ExportedServicesConsumer) error {
key := resource.NewReferenceKey(id)
exports, ok := b.data[key]
if !ok {
exports = &serviceExports{
ref: resource.Reference(id, ""),
partitions: make(map[string]struct{}),
peers: make(map[string]struct{}),
}
b.data[key] = exports
}
for _, c := range consumers {
switch v := c.ConsumerTenancy.(type) {
case *pbmulticluster.ExportedServicesConsumer_Peer:
exports.peers[v.Peer] = struct{}{}
case *pbmulticluster.ExportedServicesConsumer_Partition:
exports.partitions[v.Partition] = struct{}{}
case *pbmulticluster.ExportedServicesConsumer_SamenessGroup:
// TODO do we currently validate that sameness groups can't be set?
return fmt.Errorf("unexpected export to sameness group %q", v.SamenessGroup)
}
}
return nil
}
func (b *exportedServicesBuilder) build() *pbmulticluster.ComputedExportedServices {
if len(b.data) == 0 {
return nil
}
ces := &pbmulticluster.ComputedExportedServices{
Consumers: make([]*pbmulticluster.ComputedExportedService, 0, len(b.data)),
}
for _, svc := range sortRefValue(b.data) {
consumers := make([]*pbmulticluster.ComputedExportedServicesConsumer, 0, len(svc.peers)+len(svc.partitions))
for _, peer := range sortKeys(svc.peers) {
consumers = append(consumers, &pbmulticluster.ComputedExportedServicesConsumer{
ConsumerTenancy: &pbmulticluster.ComputedExportedServicesConsumer_Peer{
Peer: peer,
},
})
}
for _, partition := range sortKeys(svc.partitions) {
consumers = append(consumers, &pbmulticluster.ComputedExportedServicesConsumer{
ConsumerTenancy: &pbmulticluster.ComputedExportedServicesConsumer_Partition{
Partition: partition,
},
})
}
ces.Consumers = append(ces.Consumers, &pbmulticluster.ComputedExportedService{
TargetRef: svc.ref,
Consumers: consumers,
})
}
return ces
}
func sortKeys(m map[string]struct{}) []string {
keys := make([]string, 0, len(m))
for key := range m {
keys = append(keys, key)
}
sort.Strings(keys)
return keys
}
func sortRefValue(m map[resource.ReferenceKey]*serviceExports) []*serviceExports {
vals := make([]*serviceExports, 0, len(m))
for _, val := range m {
vals = append(vals, val)
}
sort.Slice(vals, func(i, j int) bool {
return resource.ReferenceToString(vals[i].ref) < resource.ReferenceToString(vals[j].ref)
})
return vals
}
func getNamespaceForServices(exportedServices []*types.DecodedExportedServices, namespaceExportedServices []*types.DecodedNamespaceExportedServices, partitionedExportedServices []*types.DecodedPartitionExportedServices) string {
if len(partitionedExportedServices) > 0 {
return storage.Wildcard

View File

@ -14,6 +14,7 @@ import (
svctest "github.com/hashicorp/consul/agent/grpc-external/services/resource/testing"
"github.com/hashicorp/consul/internal/catalog"
"github.com/hashicorp/consul/internal/controller"
"github.com/hashicorp/consul/internal/multicluster/internal/controllers/exportedservices/expander"
"github.com/hashicorp/consul/internal/multicluster/internal/types"
"github.com/hashicorp/consul/internal/resource"
rtest "github.com/hashicorp/consul/internal/resource/resourcetest"
@ -32,6 +33,7 @@ type controllerSuite struct {
rt controller.Runtime
isEnterprise bool
reconciler *reconciler
samenessGroupExpander ExportedServicesSamenessGroupExpander
tenancies []*pbresource.Tenancy
}
@ -44,12 +46,16 @@ func (suite *controllerSuite) SetupTest() {
WithTenancies(rtest.Tenancy("default.app"), rtest.Tenancy("foo.app")).
Run(suite.T())
suite.samenessGroupExpander = expander.New()
suite.client = rtest.NewClient(client)
suite.rt = controller.Runtime{
Client: suite.client,
Logger: testutil.Logger(suite.T()),
}
suite.reconciler = &reconciler{}
suite.reconciler = &reconciler{
samenessGroupExpander: suite.samenessGroupExpander,
}
suite.isEnterprise = versiontest.IsEnterprise()
}
@ -79,7 +85,7 @@ func (suite *controllerSuite) TestReconcile_DeleteOldCES_NoExportedServices() {
}
if suite.isEnterprise {
oldCESData.Consumers[0].Consumers = append(oldCESData.Consumers[0].Consumers, suite.constructConsumer("peer-n", "partition"))
oldCESData.Consumers[0].Consumers = append(oldCESData.Consumers[0].Consumers, suite.constructConsumer("part-n", "partition"))
}
oldCES := rtest.Resource(pbmulticluster.ComputedExportedServicesType, "global").
@ -345,7 +351,7 @@ func (suite *controllerSuite) TestReconcile_ComputeCES() {
func (suite *controllerSuite) TestController() {
// Run the controller manager
mgr := controller.NewManager(suite.client, suite.rt.Logger)
mgr.Register(Controller())
mgr.Register(Controller(suite.samenessGroupExpander))
mgr.SetRaftLeader(true)
go mgr.Run(suite.ctx)
@ -574,6 +580,7 @@ func (suite *controllerSuite) TestController() {
},
),
)
prototest.AssertDeepEqual(suite.T(), expectedComputedExportedService, computedCES)
suite.writeService("svc5", tenancy)

View File

@ -0,0 +1,12 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
//go:build !consulent
package expander
import "github.com/hashicorp/consul/internal/multicluster/internal/controllers/exportedservices/expander/expander_ce"
func New() *expander_ce.SamenessGroupExpander {
return expander_ce.New()
}

View File

@ -0,0 +1,43 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package expander_ce
import (
"context"
"github.com/hashicorp/consul/internal/controller"
expanderTypes "github.com/hashicorp/consul/internal/multicluster/internal/controllers/exportedservices/expander/types"
"github.com/hashicorp/consul/internal/multicluster/internal/types"
pbmulticluster "github.com/hashicorp/consul/proto-public/pbmulticluster/v2beta1"
)
type SamenessGroupExpander struct{}
func New() *SamenessGroupExpander {
return &SamenessGroupExpander{}
}
func (sg *SamenessGroupExpander) List(_ context.Context, _ controller.Runtime, _ controller.Request) ([]*types.DecodedSamenessGroup, error) {
return nil, nil
}
func (sg *SamenessGroupExpander) Expand(consumers []*pbmulticluster.ExportedServicesConsumer, _ map[string][]*pbmulticluster.SamenessGroupMember) (*expanderTypes.ExpandedConsumers, error) {
peers := make([]string, 0)
peersMap := make(map[string]struct{})
for _, c := range consumers {
switch c.ConsumerTenancy.(type) {
case *pbmulticluster.ExportedServicesConsumer_Peer:
_, ok := peersMap[c.GetPeer()]
if !ok {
peers = append(peers, c.GetPeer())
peersMap[c.GetPeer()] = struct{}{}
}
default:
panic("unexpected consumer tenancy type")
}
}
return &expanderTypes.ExpandedConsumers{
Peers: peers,
}, nil
}

View File

@ -0,0 +1,57 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package expander_ce
import (
"testing"
pbmulticluster "github.com/hashicorp/consul/proto-public/pbmulticluster/v2beta1"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
)
type expanderSuite struct {
suite.Suite
samenessGroupExpander *SamenessGroupExpander
}
func (suite *expanderSuite) SetupTest() {
suite.samenessGroupExpander = New()
}
func TestExpander(t *testing.T) {
suite.Run(t, new(expanderSuite))
}
func (suite *expanderSuite) TestExpand_NoSamenessGroupsPresent() {
consumers := []*pbmulticluster.ExportedServicesConsumer{
{
ConsumerTenancy: &pbmulticluster.ExportedServicesConsumer_Peer{
Peer: "peer-1",
},
},
{
ConsumerTenancy: &pbmulticluster.ExportedServicesConsumer_Peer{
Peer: "peer-2",
},
},
{
ConsumerTenancy: &pbmulticluster.ExportedServicesConsumer_Peer{
Peer: "peer-3",
},
},
{
ConsumerTenancy: &pbmulticluster.ExportedServicesConsumer_Peer{
Peer: "peer-1",
},
},
}
expandedConsumers, err := suite.samenessGroupExpander.Expand(consumers, nil)
require.NoError(suite.T(), err)
require.Equal(suite.T(), []string{"peer-1", "peer-2", "peer-3"}, expandedConsumers.Peers)
require.Nil(suite.T(), expandedConsumers.Partitions)
require.Len(suite.T(), expandedConsumers.MissingSamenessGroups, 0)
}

View File

@ -0,0 +1,10 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package types
type ExpandedConsumers struct {
Peers []string
Partitions []string
MissingSamenessGroups []string
}

View File

@ -0,0 +1,14 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
//go:build !consulent
package exportedservices
import (
"github.com/hashicorp/consul/internal/controller"
)
func registerEnterpriseResourcesWatchers(controller *controller.Controller) *controller.Controller {
return controller
}

View File

@ -8,6 +8,10 @@ import (
"github.com/hashicorp/consul/internal/multicluster/internal/controllers/exportedservices"
)
func Register(mgr *controller.Manager) {
mgr.Register(exportedservices.Controller())
type Dependencies struct {
ExportedServicesSamenessGroupsExpander exportedservices.ExportedServicesSamenessGroupExpander
}
func Register(mgr *controller.Manager, deps Dependencies) {
mgr.Register(exportedservices.Controller(deps.ExportedServicesSamenessGroupsExpander))
}