consul/agent/xds/endpoints.go
Freddy 9ed325ba8b
Enable gateways to resolve hostnames to IPv4 addresses (#7999)
The DNS resolution will be handled by Envoy and defaults to LOGICAL_DNS. This discovery type can be overridden on a per-gateway basis with the envoy_dns_discovery_type Gateway Option.

If a service contains an instance with a hostname as an address we set the Envoy cluster to use DNS as the discovery type rather than EDS. Since both mesh gateways and terminating gateways route to clusters using SNI, whenever there is a mix of hostnames and IP addresses associated with a service we use the hostname + CDS rather than the IPs + EDS.

Note that we detect hostnames by attempting to parse the service instance's address as an IP. If it is not a valid IP we assume it is a hostname.
2020-06-03 15:28:45 -06:00

555 lines
17 KiB
Go

package xds
import (
"errors"
"fmt"
envoy "github.com/envoyproxy/go-control-plane/envoy/api/v2"
envoycore "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
envoyendpoint "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint"
"github.com/gogo/protobuf/proto"
"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
bexpr "github.com/hashicorp/go-bexpr"
)
const (
UnnamedSubset = ""
)
// endpointsFromSnapshot returns the xDS API representation of the "endpoints"
func (s *Server) endpointsFromSnapshot(cfgSnap *proxycfg.ConfigSnapshot, _ string) ([]proto.Message, error) {
if cfgSnap == nil {
return nil, errors.New("nil config given")
}
switch cfgSnap.Kind {
case structs.ServiceKindConnectProxy:
return s.endpointsFromSnapshotConnectProxy(cfgSnap)
case structs.ServiceKindTerminatingGateway:
return s.endpointsFromSnapshotTerminatingGateway(cfgSnap)
case structs.ServiceKindMeshGateway:
return s.endpointsFromSnapshotMeshGateway(cfgSnap)
case structs.ServiceKindIngressGateway:
return s.endpointsFromSnapshotIngressGateway(cfgSnap)
default:
return nil, fmt.Errorf("Invalid service kind: %v", cfgSnap.Kind)
}
}
// endpointsFromSnapshotConnectProxy returns the xDS API representation of the "endpoints"
// (upstream instances) in the snapshot.
func (s *Server) endpointsFromSnapshotConnectProxy(cfgSnap *proxycfg.ConfigSnapshot) ([]proto.Message, error) {
resources := make([]proto.Message, 0,
len(cfgSnap.ConnectProxy.PreparedQueryEndpoints)+len(cfgSnap.ConnectProxy.WatchedUpstreamEndpoints))
for _, u := range cfgSnap.Proxy.Upstreams {
id := u.Identifier()
var chain *structs.CompiledDiscoveryChain
if u.DestinationType != structs.UpstreamDestTypePreparedQuery {
chain = cfgSnap.ConnectProxy.DiscoveryChain[id]
}
if chain == nil {
// We ONLY want this branch for prepared queries.
dc := u.Datacenter
if dc == "" {
dc = cfgSnap.Datacenter
}
clusterName := connect.UpstreamSNI(&u, "", dc, cfgSnap.Roots.TrustDomain)
endpoints, ok := cfgSnap.ConnectProxy.PreparedQueryEndpoints[id]
if ok {
la := makeLoadAssignment(
clusterName,
[]loadAssignmentEndpointGroup{
{Endpoints: endpoints},
},
cfgSnap.Datacenter,
)
resources = append(resources, la)
}
} else {
// Newfangled discovery chain plumbing.
es := s.endpointsFromDiscoveryChain(
u,
chain,
cfgSnap.Datacenter,
cfgSnap.ConnectProxy.WatchedUpstreamEndpoints[id],
cfgSnap.ConnectProxy.WatchedGatewayEndpoints[id],
)
resources = append(resources, es...)
}
}
return resources, nil
}
func (s *Server) filterSubsetEndpoints(subset *structs.ServiceResolverSubset, endpoints structs.CheckServiceNodes) (structs.CheckServiceNodes, error) {
// locally execute the subsets filter
if subset.Filter != "" {
filter, err := bexpr.CreateFilter(subset.Filter, nil, endpoints)
if err != nil {
return nil, err
}
raw, err := filter.Execute(endpoints)
if err != nil {
return nil, err
}
return raw.(structs.CheckServiceNodes), nil
}
return endpoints, nil
}
func (s *Server) endpointsFromSnapshotTerminatingGateway(cfgSnap *proxycfg.ConfigSnapshot) ([]proto.Message, error) {
return s.endpointsFromServicesAndResolvers(cfgSnap, cfgSnap.TerminatingGateway.ServiceGroups, cfgSnap.TerminatingGateway.ServiceResolvers)
}
func (s *Server) endpointsFromSnapshotMeshGateway(cfgSnap *proxycfg.ConfigSnapshot) ([]proto.Message, error) {
datacenters := cfgSnap.MeshGateway.Datacenters()
resources := make([]proto.Message, 0, len(datacenters)+len(cfgSnap.MeshGateway.ServiceGroups))
// generate the endpoints for the gateways in the remote datacenters
for _, dc := range datacenters {
// Skip creating endpoints for mesh gateways in local DC and gateways in remote DCs with a hostname as their address
// EDS cannot resolve hostnames so we provide them through CDS instead
if dc == cfgSnap.Datacenter || len(cfgSnap.MeshGateway.HostnameDatacenters[dc]) > 0 {
continue
}
endpoints, ok := cfgSnap.MeshGateway.GatewayGroups[dc]
if !ok {
endpoints, ok = cfgSnap.MeshGateway.FedStateGateways[dc]
if !ok { // not possible
s.Logger.Error("skipping mesh gateway endpoints because no definition found", "datacenter", dc)
continue
}
}
{ // standard connect
clusterName := connect.DatacenterSNI(dc, cfgSnap.Roots.TrustDomain)
la := makeLoadAssignment(
clusterName,
[]loadAssignmentEndpointGroup{
{Endpoints: endpoints},
},
cfgSnap.Datacenter,
)
resources = append(resources, la)
}
if cfgSnap.ServiceMeta[structs.MetaWANFederationKey] == "1" && cfgSnap.ServerSNIFn != nil {
clusterName := cfgSnap.ServerSNIFn(dc, "")
la := makeLoadAssignment(
clusterName,
[]loadAssignmentEndpointGroup{
{Endpoints: endpoints},
},
cfgSnap.Datacenter,
)
resources = append(resources, la)
}
}
// generate endpoints for our servers if WAN federation is enabled
if cfgSnap.ServiceMeta[structs.MetaWANFederationKey] == "1" && cfgSnap.ServerSNIFn != nil {
var allServersLbEndpoints []envoyendpoint.LbEndpoint
for _, srv := range cfgSnap.MeshGateway.ConsulServers {
clusterName := cfgSnap.ServerSNIFn(cfgSnap.Datacenter, srv.Node.Node)
addr, port := srv.BestAddress(false /*wan*/)
lbEndpoint := envoyendpoint.LbEndpoint{
HostIdentifier: &envoyendpoint.LbEndpoint_Endpoint{
Endpoint: &envoyendpoint.Endpoint{
Address: makeAddressPtr(addr, port),
},
},
HealthStatus: envoycore.HealthStatus_UNKNOWN,
}
cla := &envoy.ClusterLoadAssignment{
ClusterName: clusterName,
Endpoints: []envoyendpoint.LocalityLbEndpoints{{
LbEndpoints: []envoyendpoint.LbEndpoint{lbEndpoint},
}},
}
allServersLbEndpoints = append(allServersLbEndpoints, lbEndpoint)
resources = append(resources, cla)
}
// And add one catch all so that remote datacenters can dial ANY server
// in this datacenter without knowing its name.
resources = append(resources, &envoy.ClusterLoadAssignment{
ClusterName: cfgSnap.ServerSNIFn(cfgSnap.Datacenter, ""),
Endpoints: []envoyendpoint.LocalityLbEndpoints{{
LbEndpoints: allServersLbEndpoints,
}},
})
}
// Generate the endpoints for each service and its subsets
e, err := s.endpointsFromServicesAndResolvers(cfgSnap, cfgSnap.MeshGateway.ServiceGroups, cfgSnap.MeshGateway.ServiceResolvers)
if err != nil {
return nil, err
}
resources = append(resources, e...)
return resources, nil
}
func (s *Server) endpointsFromServicesAndResolvers(
cfgSnap *proxycfg.ConfigSnapshot,
services map[structs.ServiceID]structs.CheckServiceNodes,
resolvers map[structs.ServiceID]*structs.ServiceResolverConfigEntry) ([]proto.Message, error) {
resources := make([]proto.Message, 0, len(services))
// generate the endpoints for the linked service groups
for svc, endpoints := range services {
// Skip creating endpoints for services that have hostnames as addresses
// EDS cannot resolve hostnames so we provide them through CDS instead
if cfgSnap.Kind == structs.ServiceKindTerminatingGateway && len(cfgSnap.TerminatingGateway.HostnameServices[svc]) > 0 {
continue
}
clusterEndpoints := make(map[string][]loadAssignmentEndpointGroup)
clusterEndpoints[UnnamedSubset] = []loadAssignmentEndpointGroup{{Endpoints: endpoints, OnlyPassing: false}}
// Collect all of the loadAssignmentEndpointGroups for the various subsets. We do this before generating
// the endpoints for the default/unnamed subset so that we can take into account the DefaultSubset on the
// service-resolver which may prevent the default/unnamed cluster from creating endpoints for all service
// instances.
if resolver, hasResolver := resolvers[svc]; hasResolver {
for subsetName, subset := range resolver.Subsets {
subsetEndpoints, err := s.filterSubsetEndpoints(&subset, endpoints)
if err != nil {
return nil, err
}
groups := []loadAssignmentEndpointGroup{{Endpoints: subsetEndpoints, OnlyPassing: subset.OnlyPassing}}
clusterEndpoints[subsetName] = groups
// if this subset is the default then override the unnamed subset with this configuration
if subsetName == resolver.DefaultSubset {
clusterEndpoints[UnnamedSubset] = groups
}
}
}
// now generate the load assignment for all subsets
for subsetName, groups := range clusterEndpoints {
clusterName := connect.ServiceSNI(svc.ID, subsetName, svc.NamespaceOrDefault(), cfgSnap.Datacenter, cfgSnap.Roots.TrustDomain)
la := makeLoadAssignment(
clusterName,
groups,
cfgSnap.Datacenter,
)
resources = append(resources, la)
}
}
return resources, nil
}
func (s *Server) endpointsFromSnapshotIngressGateway(cfgSnap *proxycfg.ConfigSnapshot) ([]proto.Message, error) {
var resources []proto.Message
createdClusters := make(map[string]bool)
for _, upstreams := range cfgSnap.IngressGateway.Upstreams {
for _, u := range upstreams {
id := u.Identifier()
// If we've already created endpoints for this upstream, skip it. Multiple listeners may
// reference the same upstream, so we don't need to create duplicate endpoints in that case.
if createdClusters[id] {
continue
}
es := s.endpointsFromDiscoveryChain(
u,
cfgSnap.IngressGateway.DiscoveryChain[id],
cfgSnap.Datacenter,
cfgSnap.IngressGateway.WatchedUpstreamEndpoints[id],
cfgSnap.IngressGateway.WatchedGatewayEndpoints[id],
)
resources = append(resources, es...)
createdClusters[id] = true
}
}
return resources, nil
}
func makeEndpoint(clusterName, host string, port int) envoyendpoint.LbEndpoint {
return envoyendpoint.LbEndpoint{
HostIdentifier: &envoyendpoint.LbEndpoint_Endpoint{
Endpoint: &envoyendpoint.Endpoint{
Address: makeAddressPtr(host, port),
},
},
}
}
func (s *Server) endpointsFromDiscoveryChain(
upstream structs.Upstream,
chain *structs.CompiledDiscoveryChain,
datacenter string,
upstreamEndpoints, gatewayEndpoints map[string]structs.CheckServiceNodes,
) []proto.Message {
var resources []proto.Message
if chain == nil {
return resources
}
cfg, err := ParseUpstreamConfigNoDefaults(upstream.Config)
if err != nil {
// Don't hard fail on a config typo, just warn. The parse func returns
// default config if there is an error so it's safe to continue.
s.Logger.Warn("failed to parse", "upstream", upstream.Identifier(),
"error", err)
}
var escapeHatchCluster *envoy.Cluster
if cfg.ClusterJSON != "" {
if chain.IsDefault() {
// If you haven't done anything to setup the discovery chain, then
// you can use the envoy_cluster_json escape hatch.
escapeHatchCluster, err = makeClusterFromUserConfig(cfg.ClusterJSON)
if err != nil {
return resources
}
} else {
s.Logger.Warn("ignoring escape hatch setting, because a discovery chain is configued for",
"discovery chain", chain.ServiceName, "upstream", upstream.Identifier(),
"envoy_cluster_json", chain.ServiceName)
}
}
// Find all resolver nodes.
for _, node := range chain.Nodes {
if node.Type != structs.DiscoveryGraphNodeTypeResolver {
continue
}
failover := node.Resolver.Failover
targetID := node.Resolver.Target
target := chain.Targets[targetID]
clusterName := CustomizeClusterName(target.Name, chain)
if escapeHatchCluster != nil {
clusterName = escapeHatchCluster.Name
}
s.Logger.Debug("generating endpoints for", "cluster", clusterName)
// Determine if we have to generate the entire cluster differently.
failoverThroughMeshGateway := chain.WillFailoverThroughMeshGateway(node)
if failoverThroughMeshGateway {
actualTargetID := firstHealthyTarget(
chain.Targets,
upstreamEndpoints,
targetID,
failover.Targets,
)
if actualTargetID != targetID {
targetID = actualTargetID
}
failover = nil
}
primaryGroup, valid := makeLoadAssignmentEndpointGroup(
chain.Targets,
upstreamEndpoints,
gatewayEndpoints,
targetID,
datacenter,
)
if !valid {
continue // skip the cluster if we're still populating the snapshot
}
var endpointGroups []loadAssignmentEndpointGroup
if failover != nil && len(failover.Targets) > 0 {
endpointGroups = make([]loadAssignmentEndpointGroup, 0, len(failover.Targets)+1)
endpointGroups = append(endpointGroups, primaryGroup)
for _, failTargetID := range failover.Targets {
failoverGroup, valid := makeLoadAssignmentEndpointGroup(
chain.Targets,
upstreamEndpoints,
gatewayEndpoints,
failTargetID,
datacenter,
)
if !valid {
continue // skip the failover target if we're still populating the snapshot
}
endpointGroups = append(endpointGroups, failoverGroup)
}
} else {
endpointGroups = append(endpointGroups, primaryGroup)
}
la := makeLoadAssignment(
clusterName,
endpointGroups,
datacenter,
)
resources = append(resources, la)
}
return resources
}
type loadAssignmentEndpointGroup struct {
Endpoints structs.CheckServiceNodes
OnlyPassing bool
OverrideHealth envoycore.HealthStatus
}
func makeLoadAssignment(clusterName string, endpointGroups []loadAssignmentEndpointGroup, localDatacenter string) *envoy.ClusterLoadAssignment {
cla := &envoy.ClusterLoadAssignment{
ClusterName: clusterName,
Endpoints: make([]envoyendpoint.LocalityLbEndpoints, 0, len(endpointGroups)),
}
if len(endpointGroups) > 1 {
cla.Policy = &envoy.ClusterLoadAssignment_Policy{
// We choose such a large value here that the failover math should
// in effect not happen until zero instances are healthy.
OverprovisioningFactor: makeUint32Value(100000),
}
}
for priority, endpointGroup := range endpointGroups {
endpoints := endpointGroup.Endpoints
es := make([]envoyendpoint.LbEndpoint, 0, len(endpoints))
for _, ep := range endpoints {
// TODO (mesh-gateway) - should we respect the translate_wan_addrs configuration here or just always use the wan for cross-dc?
addr, port := ep.BestAddress(localDatacenter != ep.Node.Datacenter)
healthStatus, weight := calculateEndpointHealthAndWeight(ep, endpointGroup.OnlyPassing)
if endpointGroup.OverrideHealth != envoycore.HealthStatus_UNKNOWN {
healthStatus = endpointGroup.OverrideHealth
}
es = append(es, envoyendpoint.LbEndpoint{
HostIdentifier: &envoyendpoint.LbEndpoint_Endpoint{
Endpoint: &envoyendpoint.Endpoint{
Address: makeAddressPtr(addr, port),
},
},
HealthStatus: healthStatus,
LoadBalancingWeight: makeUint32Value(weight),
})
}
cla.Endpoints = append(cla.Endpoints, envoyendpoint.LocalityLbEndpoints{
Priority: uint32(priority),
LbEndpoints: es,
})
}
return cla
}
func makeLoadAssignmentEndpointGroup(
targets map[string]*structs.DiscoveryTarget,
targetHealth map[string]structs.CheckServiceNodes,
gatewayHealth map[string]structs.CheckServiceNodes,
targetID string,
currentDatacenter string,
) (loadAssignmentEndpointGroup, bool) {
realEndpoints, ok := targetHealth[targetID]
if !ok {
// skip the cluster if we're still populating the snapshot
return loadAssignmentEndpointGroup{}, false
}
target := targets[targetID]
var gatewayDatacenter string
switch target.MeshGateway.Mode {
case structs.MeshGatewayModeRemote:
gatewayDatacenter = target.Datacenter
case structs.MeshGatewayModeLocal:
gatewayDatacenter = currentDatacenter
}
if gatewayDatacenter == "" {
return loadAssignmentEndpointGroup{
Endpoints: realEndpoints,
OnlyPassing: target.Subset.OnlyPassing,
}, true
}
// If using a mesh gateway we need to pull those endpoints instead.
gatewayEndpoints, ok := gatewayHealth[gatewayDatacenter]
if !ok {
// skip the cluster if we're still populating the snapshot
return loadAssignmentEndpointGroup{}, false
}
// But we will use the health from the actual backend service.
overallHealth := envoycore.HealthStatus_UNHEALTHY
for _, ep := range realEndpoints {
health, _ := calculateEndpointHealthAndWeight(ep, target.Subset.OnlyPassing)
if health == envoycore.HealthStatus_HEALTHY {
overallHealth = envoycore.HealthStatus_HEALTHY
break
}
}
return loadAssignmentEndpointGroup{
Endpoints: gatewayEndpoints,
OverrideHealth: overallHealth,
}, true
}
func calculateEndpointHealthAndWeight(
ep structs.CheckServiceNode,
onlyPassing bool,
) (envoycore.HealthStatus, int) {
healthStatus := envoycore.HealthStatus_HEALTHY
weight := 1
if ep.Service.Weights != nil {
weight = ep.Service.Weights.Passing
}
for _, chk := range ep.Checks {
if chk.Status == api.HealthCritical {
healthStatus = envoycore.HealthStatus_UNHEALTHY
}
if onlyPassing && chk.Status != api.HealthPassing {
healthStatus = envoycore.HealthStatus_UNHEALTHY
}
if chk.Status == api.HealthWarning && ep.Service.Weights != nil {
weight = ep.Service.Weights.Warning
}
}
// Make weights fit Envoy's limits. A zero weight means that either Warning
// (likely) or Passing (weirdly) weight has been set to 0 effectively making
// this instance unhealthy and should not be sent traffic.
if weight < 1 {
healthStatus = envoycore.HealthStatus_UNHEALTHY
weight = 1
}
if weight > 128 {
weight = 128
}
return healthStatus, weight
}