From 44bcda85239ed102c070abf6afbb54fd5fa8a3de Mon Sep 17 00:00:00 2001 From: skpratt Date: Tue, 23 Jan 2024 19:44:10 -0600 Subject: [PATCH] Net 7074/decentralized exported services management (#20318) * Add decentralized management of V1 exported-services config entries using V2 multicluster resources. * cleanup --------- Co-authored-by: Matt Keeler --- agent/consul/server.go | 5 +- agent/consul/v2_config_entry_exports_shim.go | 69 ++++ internal/multicluster/exports.go | 4 +- .../internal/controllers/register.go | 3 + .../controllers/v1compat/controller.go | 370 ++++++++++++++++++ 5 files changed, 449 insertions(+), 2 deletions(-) create mode 100644 agent/consul/v2_config_entry_exports_shim.go create mode 100644 internal/multicluster/internal/controllers/v1compat/controller.go diff --git a/agent/consul/server.go b/agent/consul/server.go index 471e2b2c30..97ec8cecb7 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -36,6 +36,7 @@ import ( "google.golang.org/grpc" "github.com/hashicorp/consul-net-rpc/net/rpc" + "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/blockingquery" "github.com/hashicorp/consul/agent/connect" @@ -1005,7 +1006,7 @@ func (s *Server) registerControllers(deps Deps, proxyUpdater ProxyUpdater) error if s.useV2Resources { catalog.RegisterControllers(s.controllerManager) - multicluster.RegisterControllers(s.controllerManager, multicluster.DefaultControllerDependencies()) + multicluster.RegisterControllers(s.controllerManager, multicluster.DefaultControllerDependencies(&V1ServiceExportsShim{s: s})) defaultAllow, err := s.config.ACLResolverSettings.IsDefaultAllow() if err != nil { return err @@ -1043,6 +1044,8 @@ func (s *Server) registerControllers(deps Deps, proxyUpdater ProxyUpdater) error auth.RegisterControllers(s.controllerManager, auth.DefaultControllerDependencies()) } + multicluster.RegisterControllers(s.controllerManager, multicluster.DefaultControllerDependencies(&V1ServiceExportsShim{s: s})) + reaper.RegisterControllers(s.controllerManager) if s.config.DevMode { diff --git a/agent/consul/v2_config_entry_exports_shim.go b/agent/consul/v2_config_entry_exports_shim.go new file mode 100644 index 0000000000..4bdf3791bc --- /dev/null +++ b/agent/consul/v2_config_entry_exports_shim.go @@ -0,0 +1,69 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: BUSL-1.1 + +package consul + +import ( + "context" + "fmt" + + "github.com/hashicorp/consul/acl" + "github.com/hashicorp/consul/agent/structs" +) + +type V1ServiceExportsShim struct { + s *Server +} + +func (s *V1ServiceExportsShim) GetExportedServicesConfigEntry(_ context.Context, name string, entMeta *acl.EnterpriseMeta) (*structs.ExportedServicesConfigEntry, error) { + _, entry, err := s.s.fsm.State().ConfigEntry(nil, structs.ExportedServices, name, entMeta) + if err != nil { + return nil, err + } + + if entry == nil { + return nil, nil + } + + exp, ok := entry.(*structs.ExportedServicesConfigEntry) + if !ok { + return nil, fmt.Errorf("exported services config entry is the wrong type: expected ExportedServicesConfigEntry, actual: %T", entry) + } + + return exp, nil +} + +func (s *V1ServiceExportsShim) WriteExportedServicesConfigEntry(_ context.Context, cfg *structs.ExportedServicesConfigEntry) error { + if err := cfg.Normalize(); err != nil { + return err + } + + if err := cfg.Validate(); err != nil { + return err + } + + req := &structs.ConfigEntryRequest{ + Op: structs.ConfigEntryUpsert, + Entry: cfg, + } + + _, err := s.s.raftApply(structs.ConfigEntryRequestType, req) + return err +} + +func (s *V1ServiceExportsShim) DeleteExportedServicesConfigEntry(_ context.Context, name string, entMeta *acl.EnterpriseMeta) error { + req := &structs.ConfigEntryRequest{ + Op: structs.ConfigEntryDelete, + Entry: &structs.ExportedServicesConfigEntry{ + Name: name, + EnterpriseMeta: *entMeta, + }, + } + + if err := req.Entry.Normalize(); err != nil { + return err + } + + _, err := s.s.raftApply(structs.ConfigEntryRequestType, req) + return err +} diff --git a/internal/multicluster/exports.go b/internal/multicluster/exports.go index 48311ca04f..e23269619d 100644 --- a/internal/multicluster/exports.go +++ b/internal/multicluster/exports.go @@ -7,6 +7,7 @@ import ( "github.com/hashicorp/consul/internal/controller" "github.com/hashicorp/consul/internal/multicluster/internal/controllers" exportedServicesSamenessGroupExpander "github.com/hashicorp/consul/internal/multicluster/internal/controllers/exportedservices/expander" + "github.com/hashicorp/consul/internal/multicluster/internal/controllers/v1compat" "github.com/hashicorp/consul/internal/multicluster/internal/types" "github.com/hashicorp/consul/internal/resource" ) @@ -26,9 +27,10 @@ func RegisterTypes(r resource.Registry) { type ControllerDependencies = controllers.Dependencies -func DefaultControllerDependencies() ControllerDependencies { +func DefaultControllerDependencies(ac v1compat.AggregatedConfig) ControllerDependencies { return ControllerDependencies{ ExportedServicesSamenessGroupsExpander: exportedServicesSamenessGroupExpander.New(), + ConfigEntryExports: ac, } } diff --git a/internal/multicluster/internal/controllers/register.go b/internal/multicluster/internal/controllers/register.go index 5f85240d1b..e779c21aa0 100644 --- a/internal/multicluster/internal/controllers/register.go +++ b/internal/multicluster/internal/controllers/register.go @@ -6,12 +6,15 @@ package controllers import ( "github.com/hashicorp/consul/internal/controller" "github.com/hashicorp/consul/internal/multicluster/internal/controllers/exportedservices" + "github.com/hashicorp/consul/internal/multicluster/internal/controllers/v1compat" ) type Dependencies struct { ExportedServicesSamenessGroupsExpander exportedservices.ExportedServicesSamenessGroupExpander + ConfigEntryExports v1compat.AggregatedConfig } func Register(mgr *controller.Manager, deps Dependencies) { mgr.Register(exportedservices.Controller(deps.ExportedServicesSamenessGroupsExpander)) + mgr.Register(v1compat.Controller(deps.ConfigEntryExports)) } diff --git a/internal/multicluster/internal/controllers/v1compat/controller.go b/internal/multicluster/internal/controllers/v1compat/controller.go new file mode 100644 index 0000000000..f5079b618f --- /dev/null +++ b/internal/multicluster/internal/controllers/v1compat/controller.go @@ -0,0 +1,370 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: BUSL-1.1 + +package v1compat + +import ( + "context" + "fmt" + "sort" + + "github.com/hashicorp/consul/acl" + "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/internal/controller" + "github.com/hashicorp/consul/internal/multicluster/internal/types" + "github.com/hashicorp/consul/internal/resource" + pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v2beta1" + pbmulticluster "github.com/hashicorp/consul/proto-public/pbmulticluster/v2beta1" + "github.com/hashicorp/consul/proto-public/pbresource" +) + +const ( + ControllerName = "consul.io/exported-services-v1-compat" + controllerMetaKey = "managed-by-controller" +) + +type AggregatedConfig interface { + GetExportedServicesConfigEntry(context.Context, string, *acl.EnterpriseMeta) (*structs.ExportedServicesConfigEntry, error) + WriteExportedServicesConfigEntry(context.Context, *structs.ExportedServicesConfigEntry) error + DeleteExportedServicesConfigEntry(context.Context, string, *acl.EnterpriseMeta) error +} + +func mapExportedServices(_ context.Context, _ controller.Runtime, res *pbresource.Resource) ([]controller.Request, error) { + return []controller.Request{ + { + ID: &pbresource.ID{ + Type: pbmulticluster.ComputedExportedServicesType, + Tenancy: &pbresource.Tenancy{ + Partition: res.Id.Tenancy.Partition, + }, + Name: types.ComputedExportedServicesName, + }, + }, + }, nil +} + +func Controller(config AggregatedConfig) *controller.Controller { + return controller.NewController(ControllerName, pbmulticluster.ComputedExportedServicesType). + WithWatch(pbmulticluster.PartitionExportedServicesType, mapExportedServices). + WithWatch(pbmulticluster.NamespaceExportedServicesType, mapExportedServices). + WithWatch(pbmulticluster.ExportedServicesType, mapExportedServices). + // TODO Add custom watch for exported-services for config entry events to attempt re-reconciliation when that changes + WithReconciler(&reconciler{config: config}) +} + +type reconciler struct { + config AggregatedConfig +} + +// Reconcile will reconcile one ComputedExportedServices in response to some event. +func (r *reconciler) Reconcile(ctx context.Context, rt controller.Runtime, req controller.Request) error { + rt.Logger = rt.Logger.With("resource-id", req.ID, "controller", ControllerName) + + entMeta := acl.DefaultEnterpriseMeta() + entMeta.OverridePartition(req.ID.Tenancy.Partition) + existing, err := r.config.GetExportedServicesConfigEntry(ctx, req.ID.Tenancy.Partition, entMeta) + if err != nil { + rt.Logger.Error("error getting exported service config entry", "error", err) + } + + if existing != nil && existing.Meta["managed-by-controller"] != ControllerName { + // in order to not cause an outage we need to ensure that the user has replicated all + // of their exports to v2 resources first and then setting the meta key on the existing + // config entry to allow this controller to overwrite what the user previously had. + rt.Logger.Info("existing exported-services config entry is not managed with v2 resources. Add a metadata value of %q with a value of %q to opt-in to controller management of that config entry", controllerMetaKey, ControllerName) + return nil + } + + newCfg := &structs.ExportedServicesConfigEntry{ + // v1 exported-services config entries must have a Name that is the partitions name + Name: req.ID.Tenancy.Partition, + Meta: map[string]string{ + controllerMetaKey: ControllerName, + }, + EnterpriseMeta: *entMeta, + } + + partitionExports, err := resource.ListDecodedResource[*pbmulticluster.PartitionExportedServices](ctx, rt.Client, &pbresource.ListRequest{ + Type: pbmulticluster.PartitionExportedServicesType, + Tenancy: req.ID.Tenancy, + }) + + if err != nil { + rt.Logger.Error("error retrieving partition exported services", "error", err) + return err + } + + namespaceExports, err := resource.ListDecodedResource[*pbmulticluster.NamespaceExportedServices](ctx, rt.Client, &pbresource.ListRequest{ + Type: pbmulticluster.NamespaceExportedServicesType, + Tenancy: req.ID.Tenancy, + }) + + if err != nil { + rt.Logger.Error("error retrieving namespace exported service", "error", err) + return err + } + + serviceExports, err := resource.ListDecodedResource[*pbmulticluster.ExportedServices](ctx, rt.Client, &pbresource.ListRequest{ + Type: pbmulticluster.ExportedServicesType, + Tenancy: req.ID.Tenancy, + }) + + if err != nil { + rt.Logger.Error("error retrieving exported services", "error", err) + return err + } + + if len(partitionExports) == 0 && len(namespaceExports) == 0 && len(serviceExports) == 0 { + if existing == nil { + return nil + } + + if err := r.config.DeleteExportedServicesConfigEntry(ctx, req.ID.Tenancy.Partition, entMeta); err != nil { + rt.Logger.Error("error deleting exported services config entry", "error", err) + return err + } + return nil + } + + exports := newExportTracker() + + for _, partitionExport := range partitionExports { + exports.trackPartitionConsumer(partitionExport.Data.Consumers) + } + + for _, namespaceExport := range namespaceExports { + exports.trackNamespaceConsumers(namespaceExport.Id.Tenancy.Namespace, namespaceExport.Data.Consumers) + } + + for _, serviceExport := range serviceExports { + for _, svc := range serviceExport.Data.Services { + svcId := &pbresource.ID{ + Type: pbcatalog.ServiceType, + Tenancy: serviceExport.Id.Tenancy, + Name: svc, + } + + exports.trackExportedServices(svcId, serviceExport.Data.Consumers) + } + } + + newCfg.Services = exports.allExports() + + if existing != nil && configEntryEquivalent(existing, newCfg) { + rt.Logger.Trace("managed exported-services config entry is already up to date") + return nil + } + + if err := r.config.WriteExportedServicesConfigEntry(ctx, newCfg); err != nil { + rt.Logger.Error("error writing exported services config entry", "error", err) + return err + } + + rt.Logger.Debug("Updated exported services config entry") + return nil +} + +type exportTracker struct { + partitions *exportConsumers + namespaces map[string]*exportConsumers + services map[resource.ReferenceKey]*exportConsumers +} + +type exportConsumers struct { + partitions map[string]struct{} + peers map[string]struct{} + samenessGroups map[string]struct{} +} + +func newExportConsumers() *exportConsumers { + return &exportConsumers{ + partitions: make(map[string]struct{}), + peers: make(map[string]struct{}), + samenessGroups: make(map[string]struct{}), + } +} + +func (c *exportConsumers) addConsumers(consumers []*pbmulticluster.ExportedServicesConsumer) { + for _, consumer := range consumers { + switch v := consumer.ConsumerTenancy.(type) { + case *pbmulticluster.ExportedServicesConsumer_Peer: + c.peers[v.Peer] = struct{}{} + case *pbmulticluster.ExportedServicesConsumer_Partition: + c.partitions[v.Partition] = struct{}{} + case *pbmulticluster.ExportedServicesConsumer_SamenessGroup: + c.samenessGroups[v.SamenessGroup] = struct{}{} + default: + panic(fmt.Errorf("Unknown exported service consumer type: %T", v)) + } + } +} + +func (c *exportConsumers) configEntryConsumers() []structs.ServiceConsumer { + consumers := make([]structs.ServiceConsumer, 0, len(c.partitions)+len(c.peers)+len(c.samenessGroups)) + + partitions := keys(c.partitions) + sort.Strings(partitions) + for _, consumer := range partitions { + consumers = append(consumers, structs.ServiceConsumer{ + Partition: consumer, + }) + } + + peers := keys(c.peers) + sort.Strings(peers) + for _, consumer := range peers { + consumers = append(consumers, structs.ServiceConsumer{ + Partition: consumer, + }) + } + + samenessGroups := keys(c.samenessGroups) + sort.Strings(samenessGroups) + for _, consumer := range samenessGroups { + consumers = append(consumers, structs.ServiceConsumer{ + Partition: consumer, + }) + } + + return consumers +} + +func newExportTracker() *exportTracker { + return &exportTracker{ + partitions: newExportConsumers(), + namespaces: make(map[string]*exportConsumers), + services: make(map[resource.ReferenceKey]*exportConsumers), + } +} + +func (t *exportTracker) trackPartitionConsumer(consumers []*pbmulticluster.ExportedServicesConsumer) { + t.partitions.addConsumers(consumers) +} + +func (t *exportTracker) trackNamespaceConsumers(namespace string, consumers []*pbmulticluster.ExportedServicesConsumer) { + c, ok := t.namespaces[namespace] + if !ok { + c = newExportConsumers() + t.namespaces[namespace] = c + } + + c.addConsumers(consumers) +} + +func (t *exportTracker) trackExportedServices(svcID *pbresource.ID, consumers []*pbmulticluster.ExportedServicesConsumer) { + key := resource.NewReferenceKey(svcID) + + c, ok := t.services[key] + if !ok { + c = newExportConsumers() + t.services[key] = c + } + + c.addConsumers(consumers) +} + +func (t *exportTracker) allExports() []structs.ExportedService { + var exports []structs.ExportedService + + partitionConsumers := t.partitions.configEntryConsumers() + if len(partitionConsumers) > 0 { + exports = append(exports, structs.ExportedService{ + Name: "*", + Namespace: "*", + Consumers: partitionConsumers, + }) + } + + namespaces := keys(t.namespaces) + sort.Strings(namespaces) + for _, ns := range namespaces { + exports = append(exports, structs.ExportedService{ + Name: "*", + Namespace: ns, + Consumers: t.namespaces[ns].configEntryConsumers(), + }) + } + + services := keys(t.services) + sort.Slice(services, func(i, j int) bool { + // the partitions must already be equal because we are only + // looking at resource exports for a single partition. + + if services[i].Namespace < services[j].Namespace { + return false + } else if services[i].Namespace > services[j].Namespace { + return true + } + + if services[i].Name < services[j].Name { + return false + } else if services[i].Name > services[j].Name { + return true + } + + return false + }) + for _, svcKey := range services { + exports = append(exports, structs.ExportedService{ + Name: svcKey.Name, + Namespace: svcKey.Namespace, + Consumers: t.services[svcKey].configEntryConsumers(), + }) + } + + return exports +} + +func configEntryEquivalent(a, b *structs.ExportedServicesConfigEntry) bool { + if a.Name != b.Name { + return false + } + + if len(a.Services) != len(b.Services) { + return false + } + + for i := 0; i < len(a.Services); i++ { + svcA := a.Services[i] + svcB := b.Services[i] + + if svcA.Name != svcB.Name { + return false + } + + if svcA.Namespace != svcB.Namespace { + return false + } + + if len(svcA.Consumers) != len(svcB.Consumers) { + return false + } + + for j := 0; j < len(svcA.Consumers); j++ { + consumerA := svcA.Consumers[j] + consumerB := svcB.Consumers[j] + + if consumerA.Partition != consumerB.Partition { + return false + } + + if consumerA.Peer != consumerB.Peer { + return false + } + + if consumerA.SamenessGroup != consumerB.SamenessGroup { + return false + } + } + } + return true +} + +func keys[K comparable, V any](m map[K]V) []K { + keys := make([]K, 0, len(m)) + for key := range m { + keys = append(keys, key) + } + + return keys +}