diff --git a/agent/proxycfg/proxycfg.deepcopy.go b/agent/proxycfg/proxycfg.deepcopy.go index 7c577e18aa..350f76bf08 100644 --- a/agent/proxycfg/proxycfg.deepcopy.go +++ b/agent/proxycfg/proxycfg.deepcopy.go @@ -13,6 +13,10 @@ import ( // DeepCopy generates a deep copy of *ConfigSnapshot func (o *ConfigSnapshot) DeepCopy() *ConfigSnapshot { var cp ConfigSnapshot = *o + if o.ServiceLocality != nil { + cp.ServiceLocality = new(structs.Locality) + *cp.ServiceLocality = *o.ServiceLocality + } if o.ServiceMeta != nil { cp.ServiceMeta = make(map[string]string, len(o.ServiceMeta)) for k2, v2 := range o.ServiceMeta { diff --git a/agent/proxycfg/snapshot.go b/agent/proxycfg/snapshot.go index 1880dcd669..e8f95d9651 100644 --- a/agent/proxycfg/snapshot.go +++ b/agent/proxycfg/snapshot.go @@ -901,6 +901,7 @@ func IngressListenerKeyFromListener(l structs.IngressListener) IngressListenerKe type ConfigSnapshot struct { Kind structs.ServiceKind Service string + ServiceLocality *structs.Locality ProxyID ProxyID Address string Port int diff --git a/agent/proxycfg/state.go b/agent/proxycfg/state.go index 911e4f316e..028a3fd59d 100644 --- a/agent/proxycfg/state.go +++ b/agent/proxycfg/state.go @@ -124,6 +124,7 @@ type serviceInstance struct { taggedAddresses map[string]structs.ServiceAddress proxyCfg structs.ConnectProxyConfig token string + locality *structs.Locality } func copyProxyConfig(ns *structs.NodeService) (structs.ConnectProxyConfig, error) { @@ -244,6 +245,7 @@ func newServiceInstanceFromNodeService(id ProxyID, ns *structs.NodeService, toke return serviceInstance{ kind: ns.Kind, service: ns.Service, + locality: ns.Locality, proxyID: id, address: ns.Address, port: ns.Port, @@ -303,6 +305,7 @@ func newConfigSnapshotFromServiceInstance(s serviceInstance, config stateConfig) return ConfigSnapshot{ Kind: s.kind, Service: s.service, + ServiceLocality: s.locality, ProxyID: s.proxyID, Address: s.address, Port: s.port, diff --git a/agent/structs/structs.go b/agent/structs/structs.go index 1499c35eb8..c1f6d07606 100644 --- a/agent/structs/structs.go +++ b/agent/structs/structs.go @@ -2095,6 +2095,18 @@ func (csn *CheckServiceNode) CanRead(authz acl.Authorizer) acl.EnforcementDecisi return acl.Allow } +func (csn *CheckServiceNode) Locality() *Locality { + if csn.Service != nil && csn.Service.Locality != nil { + return csn.Service.Locality + } + + if csn.Node != nil && csn.Node.Locality != nil { + return csn.Node.Locality + } + + return nil +} + type CheckServiceNodes []CheckServiceNode func (csns CheckServiceNodes) DeepCopy() CheckServiceNodes { diff --git a/agent/xds/endpoints.go b/agent/xds/endpoints.go index ad03971336..aef2dc31c9 100644 --- a/agent/xds/endpoints.go +++ b/agent/xds/endpoints.go @@ -135,7 +135,9 @@ func (s *ResourceGenerator) endpointsFromSnapshotConnectProxy(cfgSnap *proxycfg. endpoints, ok := cfgSnap.ConnectProxy.PreparedQueryEndpoints[uid] if ok { la := makeLoadAssignment( + cfgSnap, clusterName, + nil, []loadAssignmentEndpointGroup{ {Endpoints: endpoints}, }, @@ -158,7 +160,9 @@ func (s *ResourceGenerator) endpointsFromSnapshotConnectProxy(cfgSnap *proxycfg. endpoints, ok := cfgSnap.ConnectProxy.DestinationGateways.Get(uid) if ok { la := makeLoadAssignment( + cfgSnap, name, + nil, []loadAssignmentEndpointGroup{ {Endpoints: endpoints}, }, @@ -224,7 +228,9 @@ func (s *ResourceGenerator) endpointsFromSnapshotMeshGateway(cfgSnap *proxycfg.C clusterName := connect.GatewaySNI(key.Datacenter, key.Partition, cfgSnap.Roots.TrustDomain) la := makeLoadAssignment( + cfgSnap, clusterName, + nil, []loadAssignmentEndpointGroup{ {Endpoints: endpoints}, }, @@ -239,7 +245,9 @@ func (s *ResourceGenerator) endpointsFromSnapshotMeshGateway(cfgSnap *proxycfg.C clusterName := cfgSnap.ServerSNIFn(key.Datacenter, "") la := makeLoadAssignment( + cfgSnap, clusterName, + nil, []loadAssignmentEndpointGroup{ {Endpoints: endpoints}, }, @@ -409,7 +417,9 @@ func (s *ResourceGenerator) endpointsFromServicesAndResolvers( for subsetName, groups := range clusterEndpoints { clusterName := connect.ServiceSNI(svc.Name, subsetName, svc.NamespaceOrDefault(), svc.PartitionOrDefault(), cfgSnap.Datacenter, cfgSnap.Roots.TrustDomain) la := makeLoadAssignment( + cfgSnap, clusterName, + nil, groups, cfgSnap.Locality, ) @@ -444,7 +454,9 @@ func (s *ResourceGenerator) makeEndpointsForOutgoingPeeredServices( groups := []loadAssignmentEndpointGroup{{Endpoints: serviceGroup.Nodes, OnlyPassing: false}} la := makeLoadAssignment( + cfgSnap, clusterName, + nil, groups, // Use an empty key here so that it never matches. This will force the mesh gateway to always // reference the remote mesh gateway's wan addr. @@ -606,7 +618,9 @@ func (s *ResourceGenerator) makeUpstreamLoadAssignmentForPeerService( return la, nil } la = makeLoadAssignment( + cfgSnap, clusterName, + nil, []loadAssignmentEndpointGroup{ {Endpoints: localGw}, }, @@ -626,7 +640,9 @@ func (s *ResourceGenerator) makeUpstreamLoadAssignmentForPeerService( return nil, nil } la = makeLoadAssignment( + cfgSnap, clusterName, + nil, []loadAssignmentEndpointGroup{ {Endpoints: endpoints}, }, @@ -756,7 +772,9 @@ func (s *ResourceGenerator) endpointsFromDiscoveryChain( } la := makeLoadAssignment( + cfgSnap, clusterName, + ti.PrioritizeByLocality, []loadAssignmentEndpointGroup{endpointGroup}, gatewayKey, ) @@ -842,7 +860,7 @@ type loadAssignmentEndpointGroup struct { OverrideHealth envoy_core_v3.HealthStatus } -func makeLoadAssignment(clusterName string, endpointGroups []loadAssignmentEndpointGroup, localKey proxycfg.GatewayKey) *envoy_endpoint_v3.ClusterLoadAssignment { +func makeLoadAssignment(cfgSnap *proxycfg.ConfigSnapshot, clusterName string, policy *structs.DiscoveryPrioritizeByLocality, endpointGroups []loadAssignmentEndpointGroup, localKey proxycfg.GatewayKey) *envoy_endpoint_v3.ClusterLoadAssignment { cla := &envoy_endpoint_v3.ClusterLoadAssignment{ ClusterName: clusterName, Endpoints: make([]*envoy_endpoint_v3.LocalityLbEndpoints, 0, len(endpointGroups)), @@ -856,35 +874,46 @@ func makeLoadAssignment(clusterName string, endpointGroups []loadAssignmentEndpo } } - for priority, endpointGroup := range endpointGroups { - endpoints := endpointGroup.Endpoints - es := make([]*envoy_endpoint_v3.LbEndpoint, 0, len(endpoints)) + var priority uint32 - for _, ep := range endpoints { - // TODO (mesh-gateway) - should we respect the translate_wan_addrs configuration here or just always use the wan for cross-dc? - _, addr, port := ep.BestAddress(!localKey.Matches(ep.Node.Datacenter, ep.Node.PartitionOrDefault())) - healthStatus, weight := calculateEndpointHealthAndWeight(ep, endpointGroup.OnlyPassing) + for _, endpointGroup := range endpointGroups { + endpointsByLocality, err := groupedEndpoints(cfgSnap.ServiceLocality, policy, endpointGroup.Endpoints) - if endpointGroup.OverrideHealth != envoy_core_v3.HealthStatus_UNKNOWN { - healthStatus = endpointGroup.OverrideHealth - } - - endpoint := &envoy_endpoint_v3.Endpoint{ - Address: makeAddress(addr, port), - } - es = append(es, &envoy_endpoint_v3.LbEndpoint{ - HostIdentifier: &envoy_endpoint_v3.LbEndpoint_Endpoint{ - Endpoint: endpoint, - }, - HealthStatus: healthStatus, - LoadBalancingWeight: makeUint32Value(weight), - }) + if err != nil { + continue } - cla.Endpoints = append(cla.Endpoints, &envoy_endpoint_v3.LocalityLbEndpoints{ - Priority: uint32(priority), - LbEndpoints: es, - }) + for _, endpoints := range endpointsByLocality { + es := make([]*envoy_endpoint_v3.LbEndpoint, 0, len(endpointGroup.Endpoints)) + + for _, ep := range endpoints { + // TODO (mesh-gateway) - should we respect the translate_wan_addrs configuration here or just always use the wan for cross-dc? + _, addr, port := ep.BestAddress(!localKey.Matches(ep.Node.Datacenter, ep.Node.PartitionOrDefault())) + healthStatus, weight := calculateEndpointHealthAndWeight(ep, endpointGroup.OnlyPassing) + + if endpointGroup.OverrideHealth != envoy_core_v3.HealthStatus_UNKNOWN { + healthStatus = endpointGroup.OverrideHealth + } + + endpoint := &envoy_endpoint_v3.Endpoint{ + Address: makeAddress(addr, port), + } + es = append(es, &envoy_endpoint_v3.LbEndpoint{ + HostIdentifier: &envoy_endpoint_v3.LbEndpoint_Endpoint{ + Endpoint: endpoint, + }, + HealthStatus: healthStatus, + LoadBalancingWeight: makeUint32Value(weight), + }) + } + + cla.Endpoints = append(cla.Endpoints, &envoy_endpoint_v3.LocalityLbEndpoints{ + Priority: priority, + LbEndpoints: es, + }) + + priority++ + } } return cla diff --git a/agent/xds/endpoints_test.go b/agent/xds/endpoints_test.go index ebdd06aa41..eee35103aa 100644 --- a/agent/xds/endpoints_test.go +++ b/agent/xds/endpoints_test.go @@ -101,6 +101,7 @@ func Test_makeLoadAssignment(t *testing.T) { tests := []struct { name string clusterName string + locality *structs.Locality endpoints []loadAssignmentEndpointGroup want *envoy_endpoint_v3.ClusterLoadAssignment }{ @@ -211,11 +212,24 @@ func Test_makeLoadAssignment(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { got := makeLoadAssignment( + &proxycfg.ConfigSnapshot{ServiceLocality: tt.locality}, tt.clusterName, + nil, tt.endpoints, proxycfg.GatewayKey{Datacenter: "dc1"}, ) require.Equal(t, tt.want, got) + + if tt.locality == nil { + got := makeLoadAssignment( + &proxycfg.ConfigSnapshot{ServiceLocality: &structs.Locality{Region: "us-west-1", Zone: "us-west-1a"}}, + tt.clusterName, + nil, + tt.endpoints, + proxycfg.GatewayKey{Datacenter: "dc1"}, + ) + require.Equal(t, tt.want, got) + } }) } } diff --git a/agent/xds/failover_policy.go b/agent/xds/failover_policy.go index 5edcae914d..77839a37cf 100644 --- a/agent/xds/failover_policy.go +++ b/agent/xds/failover_policy.go @@ -27,6 +27,8 @@ type targetInfo struct { // Region is the region from the failover target's Locality. nil means the // target is in the local Consul cluster. Region *string + + PrioritizeByLocality *structs.DiscoveryPrioritizeByLocality } type discoChainTargetGroup struct { @@ -87,7 +89,7 @@ func (s *ResourceGenerator) mapDiscoChainTargets(cfgSnap *proxycfg.ConfigSnapsho var sni, rootPEMs string var spiffeIDs []string targetUID := proxycfg.NewUpstreamIDFromTargetID(tid) - ti := targetInfo{TargetID: tid} + ti := targetInfo{TargetID: tid, PrioritizeByLocality: target.PrioritizeByLocality} configureTLS := true if forMeshGateway { diff --git a/agent/xds/locality_policy.go b/agent/xds/locality_policy.go new file mode 100644 index 0000000000..d2dd977f1a --- /dev/null +++ b/agent/xds/locality_policy.go @@ -0,0 +1,21 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: MPL-2.0 + +package xds + +import ( + "fmt" + + "github.com/hashicorp/consul/agent/structs" +) + +func groupedEndpoints(locality *structs.Locality, policy *structs.DiscoveryPrioritizeByLocality, csns structs.CheckServiceNodes) ([]structs.CheckServiceNodes, error) { + switch { + case policy == nil || policy.Mode == "" || policy.Mode == "none": + return []structs.CheckServiceNodes{csns}, nil + case policy.Mode == "failover": + return prioritizeByLocalityFailover(locality, csns), nil + default: + return nil, fmt.Errorf("unexpected priortize-by-locality mode %q", policy.Mode) + } +} diff --git a/agent/xds/locality_policy_oss.go b/agent/xds/locality_policy_oss.go new file mode 100644 index 0000000000..16147aeb0c --- /dev/null +++ b/agent/xds/locality_policy_oss.go @@ -0,0 +1,15 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: MPL-2.0 + +//go:build !consulent +// +build !consulent + +package xds + +import ( + "github.com/hashicorp/consul/agent/structs" +) + +func prioritizeByLocalityFailover(locality *structs.Locality, csns structs.CheckServiceNodes) []structs.CheckServiceNodes { + return nil +} diff --git a/test/integration/consul-container/libs/assert/service.go b/test/integration/consul-container/libs/assert/service.go index 57e853e1e7..f8ff1d0ba9 100644 --- a/test/integration/consul-container/libs/assert/service.go +++ b/test/integration/consul-container/libs/assert/service.go @@ -40,6 +40,20 @@ func CatalogServiceExists(t *testing.T, c *api.Client, svc string, opts *api.Que }) } +// CatalogServiceHasInstanceCount verifies the service name exists in the Consul catalog and has the specified +// number of instances. +func CatalogServiceHasInstanceCount(t *testing.T, c *api.Client, svc string, count int, opts *api.QueryOptions) { + retry.Run(t, func(r *retry.R) { + services, _, err := c.Catalog().Service(svc, "", opts) + if err != nil { + r.Fatal("error reading service data") + } + if len(services) != count { + r.Fatalf("did not find %d catalog entries for %s", count, svc) + } + }) +} + // CatalogServiceExists verifies the node name exists in the Consul catalog func CatalogNodeExists(t *testing.T, c *api.Client, nodeName string) { retry.Run(t, func(r *retry.R) { diff --git a/test/integration/consul-container/libs/service/helpers.go b/test/integration/consul-container/libs/service/helpers.go index ac254b846a..70624bf001 100644 --- a/test/integration/consul-container/libs/service/helpers.go +++ b/test/integration/consul-container/libs/service/helpers.go @@ -46,6 +46,7 @@ type ServiceOpts struct { Checks Checks Connect SidecarService Namespace string + Locality *api.Locality } // createAndRegisterStaticServerAndSidecar register the services and launch static-server containers @@ -119,6 +120,7 @@ func CreateAndRegisterStaticServerAndSidecar(node libcluster.Agent, serviceOpts Namespace: serviceOpts.Namespace, Meta: serviceOpts.Meta, Check: &agentCheck, + Locality: serviceOpts.Locality, } return createAndRegisterStaticServerAndSidecar(node, serviceOpts.HTTPPort, serviceOpts.GRPCPort, req, containerArgs...) }