Update xds generation for peering over mesh gws

This commit adds the xDS resources needed for INBOUND traffic from peer
clusters:

- 1 filter chain for all inbound peering requests.
- 1 cluster for all inbound peering requests.
- 1 endpoint per voting server with the gRPC TLS port configured.

There is one filter chain and cluster because unlike with WAN
federation, peer clusters will not attempt to dial individual servers.
Peer clusters will only dial the local mesh gateway addresses.
This commit is contained in:
freddygv 2022-09-22 21:14:25 -06:00
parent d818d7b096
commit b15d41534f
13 changed files with 387 additions and 4 deletions

View File

@ -494,7 +494,7 @@ func (s *handlerMeshGateway) handleUpdate(ctx context.Context, u UpdateEvent, sn
return nil return nil
} }
if meshConf.Peering == nil || !meshConf.Peering.PeerThroughMeshGateways { if !meshConf.PeerThroughMeshGateways() {
snap.MeshGateway.WatchedConsulServers.CancelWatch(structs.ConsulServiceName) snap.MeshGateway.WatchedConsulServers.CancelWatch(structs.ConsulServiceName)
return nil return nil
} }

View File

@ -697,7 +697,7 @@ func (s *ConfigSnapshot) Valid() bool {
if s.ServiceMeta[structs.MetaWANFederationKey] == "1" { if s.ServiceMeta[structs.MetaWANFederationKey] == "1" {
return false return false
} }
if cfg := s.MeshConfig(); cfg != nil && cfg.Peering != nil && cfg.Peering.PeerThroughMeshGateways { if cfg := s.MeshConfig(); cfg.PeerThroughMeshGateways() {
return false return false
} }
} }

View File

@ -476,6 +476,106 @@ func TestConfigSnapshotPeeredMeshGateway(t testing.T, variant string, nsFn func(
) )
switch variant { switch variant {
case "control-plane":
extraUpdates = append(extraUpdates,
UpdateEvent{
CorrelationID: meshConfigEntryID,
Result: &structs.ConfigEntryResponse{
Entry: &structs.MeshConfigEntry{
Peering: &structs.PeeringMeshConfig{
PeerThroughMeshGateways: true,
},
},
},
},
UpdateEvent{
CorrelationID: consulServerListWatchID,
Result: &structs.IndexedCheckServiceNodes{
Nodes: structs.CheckServiceNodes{
{
Node: &structs.Node{
Datacenter: "dc1",
Node: "replica",
Address: "127.0.0.10",
},
Service: &structs.NodeService{
ID: structs.ConsulServiceID,
Service: structs.ConsulServiceName,
// Read replicas cannot handle peering requests.
Meta: map[string]string{"read_replica": "true"},
},
},
{
Node: &structs.Node{
Datacenter: "dc1",
Node: "node1",
Address: "127.0.0.1",
},
Service: &structs.NodeService{
ID: structs.ConsulServiceID,
Service: structs.ConsulServiceName,
Meta: map[string]string{
"grpc_port": "8502",
"grpc_tls_port": "8503",
},
},
},
{
Node: &structs.Node{
Datacenter: "dc1",
Node: "node2",
Address: "127.0.0.2",
},
Service: &structs.NodeService{
ID: structs.ConsulServiceID,
Service: structs.ConsulServiceName,
Meta: map[string]string{
"grpc_port": "8502",
"grpc_tls_port": "8503",
},
TaggedAddresses: map[string]structs.ServiceAddress{
// WAN address is not considered for traffic from local gateway to local servers.
structs.TaggedAddressWAN: {
Address: "consul.server.dc1.my-domain",
Port: 10101,
},
},
},
},
{
Node: &structs.Node{
Datacenter: "dc1",
Node: "node3",
Address: "127.0.0.3",
},
Service: &structs.NodeService{
ID: structs.ConsulServiceID,
Service: structs.ConsulServiceName,
Meta: map[string]string{
// Peering is not allowed over deprecated non-TLS gRPC port.
"grpc_port": "8502",
},
},
},
{
Node: &structs.Node{
Datacenter: "dc1",
Node: "node4",
Address: "127.0.0.4",
},
Service: &structs.NodeService{
ID: structs.ConsulServiceID,
Service: structs.ConsulServiceName,
Meta: map[string]string{
// Must have valid gRPC port.
"grpc_tls_port": "bad",
},
},
},
},
},
},
)
case "default-services-http": case "default-services-http":
proxyDefaults := &structs.ProxyConfigEntry{ proxyDefaults := &structs.ProxyConfigEntry{
Config: map[string]interface{}{ Config: map[string]interface{}{

View File

@ -155,6 +155,13 @@ func (e *MeshConfigEntry) MarshalJSON() ([]byte, error) {
return json.Marshal(source) return json.Marshal(source)
} }
func (e *MeshConfigEntry) PeerThroughMeshGateways() bool {
if e == nil || e.Peering == nil {
return false
}
return e.Peering.PeerThroughMeshGateways
}
func validateMeshDirectionalTLSConfig(cfg *MeshDirectionalTLSConfig) error { func validateMeshDirectionalTLSConfig(cfg *MeshDirectionalTLSConfig) error {
if cfg == nil { if cfg == nil {
return nil return nil

View File

@ -0,0 +1,46 @@
package structs
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestMeshConfigEntry_PeerThroughMeshGateways(t *testing.T) {
tests := map[string]struct {
input *MeshConfigEntry
want bool
}{
"nil entry": {
input: nil,
want: false,
},
"nil peering config": {
input: &MeshConfigEntry{
Peering: nil,
},
want: false,
},
"not peering through gateways": {
input: &MeshConfigEntry{
Peering: &PeeringMeshConfig{
PeerThroughMeshGateways: false,
},
},
want: false,
},
"peering through gateways": {
input: &MeshConfigEntry{
Peering: &PeeringMeshConfig{
PeerThroughMeshGateways: true,
},
},
want: true,
},
}
for name, tc := range tests {
t.Run(name, func(t *testing.T) {
assert.Equalf(t, tc.want, tc.input.PeerThroughMeshGateways(), "PeerThroughMeshGateways()")
})
}
}

View File

@ -396,6 +396,21 @@ func (s *ResourceGenerator) clustersFromSnapshotMeshGateway(cfgSnap *proxycfg.Co
} }
} }
// Create a single cluster for local servers to be dialed by peers.
// When peering through gateways we load balance across the local servers. They cannot be addressed individually.
if cfg := cfgSnap.MeshConfig(); cfg.PeerThroughMeshGateways() {
servers, _ := cfgSnap.MeshGateway.WatchedConsulServers.Get(structs.ConsulServiceName)
// Peering control-plane traffic can only ever be handled by the local leader.
// We avoid routing to read replicas since they will never be Raft voters.
if haveVoters(servers) {
cluster := s.makeGatewayCluster(cfgSnap, clusterOpts{
name: connect.PeeringServerSAN(cfgSnap.Datacenter, cfgSnap.Roots.TrustDomain),
})
clusters = append(clusters, cluster)
}
}
// generate the per-service/subset clusters // generate the per-service/subset clusters
c, err := s.makeGatewayServiceClusters(cfgSnap, cfgSnap.MeshGateway.ServiceGroups, cfgSnap.MeshGateway.ServiceResolvers) c, err := s.makeGatewayServiceClusters(cfgSnap, cfgSnap.MeshGateway.ServiceGroups, cfgSnap.MeshGateway.ServiceResolvers)
if err != nil { if err != nil {
@ -413,6 +428,16 @@ func (s *ResourceGenerator) clustersFromSnapshotMeshGateway(cfgSnap *proxycfg.Co
return clusters, nil return clusters, nil
} }
func haveVoters(servers structs.CheckServiceNodes) bool {
for _, srv := range servers {
if isReplica := srv.Service.Meta["read_replica"]; isReplica == "true" {
continue
}
return true
}
return false
}
// clustersFromSnapshotTerminatingGateway returns the xDS API representation of the "clusters" // clustersFromSnapshotTerminatingGateway returns the xDS API representation of the "clusters"
// for a terminating gateway. This will include 1 cluster per Destination associated with this terminating gateway. // for a terminating gateway. This will include 1 cluster per Destination associated with this terminating gateway.
func (s *ResourceGenerator) clustersFromSnapshotTerminatingGateway(cfgSnap *proxycfg.ConfigSnapshot) ([]proto.Message, error) { func (s *ResourceGenerator) clustersFromSnapshotTerminatingGateway(cfgSnap *proxycfg.ConfigSnapshot) ([]proto.Message, error) {

View File

@ -3,11 +3,11 @@ package xds
import ( import (
"errors" "errors"
"fmt" "fmt"
"strconv"
envoy_cluster_v3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" envoy_cluster_v3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
envoy_core_v3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" envoy_core_v3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
envoy_endpoint_v3 "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3" envoy_endpoint_v3 "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
bexpr "github.com/hashicorp/go-bexpr" bexpr "github.com/hashicorp/go-bexpr"
@ -285,6 +285,50 @@ func (s *ResourceGenerator) endpointsFromSnapshotMeshGateway(cfgSnap *proxycfg.C
}) })
} }
// Create endpoints for the cluster where local servers will be dialed by peers.
// When peering through gateways we load balance across the local servers. They cannot be addressed individually.
if cfg := cfgSnap.MeshConfig(); cfg.PeerThroughMeshGateways() {
var serverEndpoints []*envoy_endpoint_v3.LbEndpoint
servers, _ := cfgSnap.MeshGateway.WatchedConsulServers.Get(structs.ConsulServiceName)
for _, srv := range servers {
if isReplica := srv.Service.Meta["read_replica"]; isReplica == "true" {
// Peering control-plane traffic can only ever be handled by the local leader.
// We avoid routing to read replicas since they will never be Raft voters.
continue
}
_, addr, _ := srv.BestAddress(false)
portStr, ok := srv.Service.Meta["grpc_tls_port"]
if !ok {
s.Logger.Warn("peering is enabled but local server %q does not have the required gRPC TLS port configured",
"server", srv.Node.Node)
continue
}
port, err := strconv.Atoi(portStr)
if err != nil {
s.Logger.Error("peering is enabled but local server has invalid gRPC TLS port",
"server", srv.Node.Node, "port", portStr, "error", err)
continue
}
serverEndpoints = append(serverEndpoints, &envoy_endpoint_v3.LbEndpoint{
HostIdentifier: &envoy_endpoint_v3.LbEndpoint_Endpoint{
Endpoint: &envoy_endpoint_v3.Endpoint{
Address: makeAddress(addr, port),
},
},
})
}
resources = append(resources, &envoy_endpoint_v3.ClusterLoadAssignment{
ClusterName: connect.PeeringServerSAN(cfgSnap.Datacenter, cfgSnap.Roots.TrustDomain),
Endpoints: []*envoy_endpoint_v3.LocalityLbEndpoints{{
LbEndpoints: serverEndpoints,
}},
})
}
// Generate the endpoints for each service and its subsets // Generate the endpoints for each service and its subsets
e, err := s.endpointsFromServicesAndResolvers(cfgSnap, cfgSnap.MeshGateway.ServiceGroups, cfgSnap.MeshGateway.ServiceResolvers) e, err := s.endpointsFromServicesAndResolvers(cfgSnap, cfgSnap.MeshGateway.ServiceGroups, cfgSnap.MeshGateway.ServiceResolvers)
if err != nil { if err != nil {

View File

@ -1751,7 +1751,7 @@ func (s *ResourceGenerator) makeMeshGatewayListener(name, addr string, port int,
l.FilterChains = append(l.FilterChains, &envoy_listener_v3.FilterChain{ l.FilterChains = append(l.FilterChains, &envoy_listener_v3.FilterChain{
FilterChainMatch: &envoy_listener_v3.FilterChainMatch{ FilterChainMatch: &envoy_listener_v3.FilterChainMatch{
ServerNames: []string{fmt.Sprintf("%s", clusterName)}, ServerNames: []string{clusterName},
}, },
Filters: []*envoy_listener_v3.Filter{ Filters: []*envoy_listener_v3.Filter{
dcTCPProxy, dcTCPProxy,
@ -1760,6 +1760,33 @@ func (s *ResourceGenerator) makeMeshGatewayListener(name, addr string, port int,
} }
} }
// Create a single cluster for local servers to be dialed by peers.
// When peering through gateways we load balance across the local servers. They cannot be addressed individually.
if cfg := cfgSnap.MeshConfig(); cfg.PeerThroughMeshGateways() {
servers, _ := cfgSnap.MeshGateway.WatchedConsulServers.Get(structs.ConsulServiceName)
// Peering control-plane traffic can only ever be handled by the local leader.
// We avoid routing to read replicas since they will never be Raft voters.
if haveVoters(servers) {
clusterName := connect.PeeringServerSAN(cfgSnap.Datacenter, cfgSnap.Roots.TrustDomain)
filterName := fmt.Sprintf("%s.%s", name, cfgSnap.Datacenter)
filter, err := makeTCPProxyFilter(filterName, clusterName, "mesh_gateway_local_peering_server.")
if err != nil {
return nil, err
}
l.FilterChains = append(l.FilterChains, &envoy_listener_v3.FilterChain{
FilterChainMatch: &envoy_listener_v3.FilterChainMatch{
ServerNames: []string{clusterName},
},
Filters: []*envoy_listener_v3.Filter{
filter,
},
})
}
}
// This needs to get tacked on at the end as it has no // This needs to get tacked on at the end as it has no
// matching and will act as a catch all // matching and will act as a catch all
l.FilterChains = append(l.FilterChains, sniClusterChain) l.FilterChains = append(l.FilterChains, sniClusterChain)

View File

@ -215,6 +215,12 @@ func getMeshGatewayPeeringGoldenTestCases() []goldenTestCase {
return proxycfg.TestConfigSnapshotPeeredMeshGateway(t, "chain-and-l7-stuff", nil, nil) return proxycfg.TestConfigSnapshotPeeredMeshGateway(t, "chain-and-l7-stuff", nil, nil)
}, },
}, },
{
name: "mesh-gateway-peering-control-plane",
create: func(t testinf.T) *proxycfg.ConfigSnapshot {
return proxycfg.TestConfigSnapshotPeeredMeshGateway(t, "control-plane", nil, nil)
},
},
} }
} }

View File

@ -0,0 +1,24 @@
{
"versionInfo": "00000001",
"resources": [
{
"@type": "type.googleapis.com/envoy.config.cluster.v3.Cluster",
"name": "server.dc1.peering.11111111-2222-3333-4444-555555555555.consul",
"type": "EDS",
"edsClusterConfig": {
"edsConfig": {
"ads": {
},
"resourceApiVersion": "V3"
}
},
"connectTimeout": "5s",
"outlierDetection": {
}
}
],
"typeUrl": "type.googleapis.com/envoy.config.cluster.v3.Cluster",
"nonce": "00000001"
}

View File

@ -0,0 +1,37 @@
{
"versionInfo": "00000001",
"resources": [
{
"@type": "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment",
"clusterName": "server.dc1.peering.11111111-2222-3333-4444-555555555555.consul",
"endpoints": [
{
"lbEndpoints": [
{
"endpoint": {
"address": {
"socketAddress": {
"address": "127.0.0.1",
"portValue": 8503
}
}
}
},
{
"endpoint": {
"address": {
"socketAddress": {
"address": "127.0.0.2",
"portValue": 8503
}
}
}
}
]
}
]
}
],
"typeUrl": "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment",
"nonce": "00000001"
}

View File

@ -0,0 +1,62 @@
{
"versionInfo": "00000001",
"resources": [
{
"@type": "type.googleapis.com/envoy.config.listener.v3.Listener",
"name": "default:1.2.3.4:8443",
"address": {
"socketAddress": {
"address": "1.2.3.4",
"portValue": 8443
}
},
"filterChains": [
{
"filterChainMatch": {
"serverNames": [
"server.dc1.peering.11111111-2222-3333-4444-555555555555.consul"
]
},
"filters": [
{
"name": "envoy.filters.network.tcp_proxy",
"typedConfig": {
"@type": "type.googleapis.com/envoy.extensions.filters.network.tcp_proxy.v3.TcpProxy",
"statPrefix": "mesh_gateway_local_peering_server.default.dc1",
"cluster": "server.dc1.peering.11111111-2222-3333-4444-555555555555.consul"
}
}
]
},
{
"filters": [
{
"name": "envoy.filters.network.sni_cluster",
"typedConfig": {
"@type": "type.googleapis.com/envoy.extensions.filters.network.sni_cluster.v3.SniCluster"
}
},
{
"name": "envoy.filters.network.tcp_proxy",
"typedConfig": {
"@type": "type.googleapis.com/envoy.extensions.filters.network.tcp_proxy.v3.TcpProxy",
"statPrefix": "mesh_gateway_local.default",
"cluster": ""
}
}
]
}
],
"listenerFilters": [
{
"name": "envoy.filters.listener.tls_inspector",
"typedConfig": {
"@type": "type.googleapis.com/envoy.extensions.filters.listener.tls_inspector.v3.TlsInspector"
}
}
]
}
],
"typeUrl": "type.googleapis.com/envoy.config.listener.v3.Listener",
"nonce": "00000001"
}

View File

@ -0,0 +1,5 @@
{
"versionInfo": "00000001",
"typeUrl": "type.googleapis.com/envoy.config.route.v3.RouteConfiguration",
"nonce": "00000001"
}