NET-5073 - ProxyConfiguration: implement various connection options (#19187)

* NET-5073 - ProxyConfiguration: implement various connection options

* PR feedback - LocalConnection and InboundConnection do not affect exposed routes. configure L7 route destinations. fix connection proto sequence numbers.

* add timeout to L7 Route Destinations
This commit is contained in:
John Murret 2023-10-14 07:54:08 -06:00 committed by GitHub
parent 105ebfdd00
commit a7fbd00865
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 445 additions and 28 deletions

View File

@ -22,7 +22,7 @@ func (b *Builder) buildExposePaths(workload *pbcatalog.Workload) {
buildListener()
b.addExposePathsRoute(exposePath, clusterName).
addLocalAppCluster(clusterName).
addLocalAppCluster(clusterName, nil).
addLocalAppStaticEndpoints(clusterName, exposePath.LocalPathPort)
}
}

View File

@ -6,6 +6,9 @@ package builder
import (
"fmt"
pbmesh "github.com/hashicorp/consul/proto-public/pbmesh/v2beta1"
"google.golang.org/protobuf/types/known/wrapperspb"
"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/envoyextensions/xdscommon"
pbauth "github.com/hashicorp/consul/proto-public/pbauth/v2beta1"
@ -31,13 +34,13 @@ func (b *Builder) BuildLocalApp(workload *pbcatalog.Workload, ctp *pbauth.Comput
if port.Protocol != pbcatalog.Protocol_PROTOCOL_MESH {
foundInboundNonMeshPorts = true
lb.addInboundRouter(clusterName, routeName, port, portName, trafficPermissions[portName]).
lb.addInboundRouter(clusterName, routeName, port, portName, trafficPermissions[portName], b.proxyCfg.GetDynamicConfig().GetInboundConnections()).
addInboundTLS()
if isL7(port.Protocol) {
b.addLocalAppRoute(routeName, clusterName)
b.addLocalAppRoute(routeName, clusterName, portName)
}
b.addLocalAppCluster(clusterName).
b.addLocalAppCluster(clusterName, &portName).
addLocalAppStaticEndpoints(clusterName, port.GetPort())
}
}
@ -264,10 +267,16 @@ func (b *Builder) addInboundListener(name string, workload *pbcatalog.Workload)
// Add TLS inspection capability to be able to parse ALPN and/or SNI information from inbound connections.
listener.Capabilities = append(listener.Capabilities, pbproxystate.Capability_CAPABILITY_L4_TLS_INSPECTION)
if b.proxyCfg.GetDynamicConfig() != nil && b.proxyCfg.GetDynamicConfig().InboundConnections != nil {
listener.BalanceConnections = pbproxystate.BalanceConnections(b.proxyCfg.DynamicConfig.InboundConnections.BalanceInboundConnections)
}
return b.NewListenerBuilder(listener)
}
func (l *ListenerBuilder) addInboundRouter(clusterName string, routeName string, port *pbcatalog.WorkloadPort, portName string, tp *pbproxystate.TrafficPermissions) *ListenerBuilder {
func (l *ListenerBuilder) addInboundRouter(clusterName string, routeName string,
port *pbcatalog.WorkloadPort, portName string, tp *pbproxystate.TrafficPermissions,
ic *pbmesh.InboundConnectionsConfig) *ListenerBuilder {
if l.listener == nil {
return l
}
@ -289,6 +298,15 @@ func (l *ListenerBuilder) addInboundRouter(clusterName string, routeName string,
AlpnProtocols: []string{getAlpnProtocolFromPortName(portName)},
},
}
if ic != nil {
// MaxInboundConnections is uint32 that is used on:
// - router destinations MaxInboundConnection (uint64).
// - cluster circuit breakers UpstreamLimits.MaxConnections (uint32).
// It is cast to a uint64 here similarly as it is to the proxystateconverter code.
r.GetL4().MaxInboundConnections = uint64(ic.MaxInboundConnections)
}
l.listener.Routers = append(l.listener.Routers, r)
} else if isL7(port.Protocol) {
r := &pbproxystate.Router{
@ -308,6 +326,13 @@ func (l *ListenerBuilder) addInboundRouter(clusterName string, routeName string,
AlpnProtocols: []string{getAlpnProtocolFromPortName(portName)},
},
}
if ic != nil {
// MaxInboundConnections is cast to a uint64 here similarly as it is to the
// as the L4 case statement above and in proxystateconverter code.
r.GetL7().MaxInboundConnections = uint64(ic.MaxInboundConnections)
}
l.listener.Routers = append(l.listener.Routers, r)
}
return l
@ -339,7 +364,7 @@ func getAlpnProtocolFromPortName(portName string) string {
return fmt.Sprintf("consul~%s", portName)
}
func (b *Builder) addLocalAppRoute(routeName string, clusterName string) {
func (b *Builder) addLocalAppRoute(routeName, clusterName, portName string) {
proxyRouteRule := &pbproxystate.RouteRule{
Match: &pbproxystate.RouteMatch{
PathMatch: &pbproxystate.PathMatch{
@ -356,6 +381,18 @@ func (b *Builder) addLocalAppRoute(routeName string, clusterName string) {
},
},
}
if b.proxyCfg.GetDynamicConfig() != nil && b.proxyCfg.GetDynamicConfig().LocalConnection != nil {
lc, lcOK := b.proxyCfg.GetDynamicConfig().LocalConnection[portName]
if lcOK {
proxyRouteRule.Destination.DestinationConfiguration =
&pbproxystate.DestinationConfiguration{
TimeoutConfig: &pbproxystate.TimeoutConfig{
Timeout: lc.RequestTimeout,
},
}
}
}
// Each route name for the local app is listenerName:port since there is a route per port on the local app listener.
b.addRoute(routeName, &pbproxystate.Route{
VirtualHosts: []*pbproxystate.VirtualHost{{
@ -373,9 +410,9 @@ func isL7(protocol pbcatalog.Protocol) bool {
return false
}
func (b *Builder) addLocalAppCluster(clusterName string) *Builder {
func (b *Builder) addLocalAppCluster(clusterName string, portName *string) *Builder {
// Make cluster for this router destination.
b.proxyStateTemplate.ProxyState.Clusters[clusterName] = &pbproxystate.Cluster{
cluster := &pbproxystate.Cluster{
Group: &pbproxystate.Cluster_EndpointGroup{
EndpointGroup: &pbproxystate.EndpointGroup{
Group: &pbproxystate.EndpointGroup_Static{
@ -384,20 +421,34 @@ func (b *Builder) addLocalAppCluster(clusterName string) *Builder {
},
},
}
// configure inbound connections or connection timeout if either is defined
if b.proxyCfg.GetDynamicConfig() != nil && portName != nil {
lc, lcOK := b.proxyCfg.DynamicConfig.LocalConnection[*portName]
if lcOK || b.proxyCfg.DynamicConfig.InboundConnections != nil {
cluster.GetEndpointGroup().GetStatic().Config = &pbproxystate.StaticEndpointGroupConfig{}
if lcOK {
cluster.GetEndpointGroup().GetStatic().GetConfig().ConnectTimeout = lc.ConnectTimeout
}
if b.proxyCfg.DynamicConfig.InboundConnections != nil {
cluster.GetEndpointGroup().GetStatic().GetConfig().CircuitBreakers = &pbproxystate.CircuitBreakers{
UpstreamLimits: &pbproxystate.UpstreamLimits{
MaxConnections: &wrapperspb.UInt32Value{Value: b.proxyCfg.DynamicConfig.InboundConnections.MaxInboundConnections},
},
}
}
}
}
b.proxyStateTemplate.ProxyState.Clusters[clusterName] = cluster
return b
}
func (b *Builder) addBlackHoleCluster() *Builder {
b.proxyStateTemplate.ProxyState.Clusters[xdscommon.BlackHoleClusterName] = &pbproxystate.Cluster{
Group: &pbproxystate.Cluster_EndpointGroup{
EndpointGroup: &pbproxystate.EndpointGroup{
Group: &pbproxystate.EndpointGroup_Static{
Static: &pbproxystate.StaticEndpointGroup{},
},
},
},
}
return b
return b.addLocalAppCluster(xdscommon.BlackHoleClusterName, nil)
}
func (b *Builder) addLocalAppStaticEndpoints(clusterName string, port uint32) {

View File

@ -4,7 +4,10 @@
package builder
import (
"google.golang.org/protobuf/types/known/durationpb"
"sort"
"testing"
"time"
"github.com/stretchr/testify/require"
@ -139,6 +142,55 @@ func TestBuildLocalApp_WithProxyConfiguration(t *testing.T) {
},
},
},
// source/local-and-inbound-connections shows that configuring LocalCOnnection
// and InboundConnections in DynamicConfig will set fields on standard clusters and routes,
// but will not set fields on exposed path clusters and routes.
"source/local-and-inbound-connections": {
workload: &pbcatalog.Workload{
Addresses: []*pbcatalog.WorkloadAddress{
{
Host: "10.0.0.1",
},
},
Ports: map[string]*pbcatalog.WorkloadPort{
"port1": {Port: 8080, Protocol: pbcatalog.Protocol_PROTOCOL_TCP},
"port2": {Port: 20000, Protocol: pbcatalog.Protocol_PROTOCOL_MESH},
"port3": {Port: 8081, Protocol: pbcatalog.Protocol_PROTOCOL_HTTP},
},
},
proxyCfg: &pbmesh.ComputedProxyConfiguration{
DynamicConfig: &pbmesh.DynamicConfig{
LocalConnection: map[string]*pbmesh.ConnectionConfig{
"port1": {
ConnectTimeout: durationpb.New(6 * time.Second),
RequestTimeout: durationpb.New(7 * time.Second)},
"port3": {
ConnectTimeout: durationpb.New(8 * time.Second),
RequestTimeout: durationpb.New(9 * time.Second)},
},
InboundConnections: &pbmesh.InboundConnectionsConfig{
MaxInboundConnections: 123,
BalanceInboundConnections: pbmesh.BalanceConnections(pbproxystate.BalanceConnections_BALANCE_CONNECTIONS_EXACT),
},
ExposeConfig: &pbmesh.ExposeConfig{
ExposePaths: []*pbmesh.ExposePath{
{
ListenerPort: 1234,
Path: "/health",
LocalPathPort: 9090,
Protocol: pbmesh.ExposePathProtocol_EXPOSE_PATH_PROTOCOL_HTTP,
},
{
ListenerPort: 1235,
Path: "GetHealth",
LocalPathPort: 9091,
Protocol: pbmesh.ExposePathProtocol_EXPOSE_PATH_PROTOCOL_HTTP2,
},
},
},
},
},
},
}
for name, c := range cases {
@ -146,10 +198,24 @@ func TestBuildLocalApp_WithProxyConfiguration(t *testing.T) {
proxyTmpl := New(testProxyStateTemplateID(), testIdentityRef(), "foo.consul", "dc1", true, c.proxyCfg).
BuildLocalApp(c.workload, nil).
Build()
actual := protoToJSON(t, proxyTmpl)
expected := golden.Get(t, actual, name+".golden")
require.JSONEq(t, expected, actual)
// sort routers because of test flakes where order was flip flopping.
actualRouters := proxyTmpl.ProxyState.Listeners[0].Routers
sort.Slice(actualRouters, func(i, j int) bool {
return actualRouters[i].String() < actualRouters[j].String()
})
actual := protoToJSON(t, proxyTmpl)
expected := JSONToProxyTemplate(t, golden.GetBytes(t, actual, name+".golden"))
// sort routers on listener from golden file
expectedRouters := expected.ProxyState.Listeners[0].Routers
sort.Slice(expectedRouters, func(i, j int) bool {
return expectedRouters[i].String() < expectedRouters[j].String()
})
// convert back to json after sorting so that test output does not contain extraneous fields.
require.Equal(t, protoToJSON(t, expected), protoToJSON(t, proxyTmpl))
})
}
}

View File

@ -0,0 +1,300 @@
{
"proxyState": {
"clusters": {
"exposed_cluster_9090": {
"endpointGroup": {
"static": {}
},
"name": "exposed_cluster_9090"
},
"exposed_cluster_9091": {
"endpointGroup": {
"static": {}
},
"name": "exposed_cluster_9091"
},
"local_app:port1": {
"endpointGroup": {
"static": {
"config": {
"connectTimeout": "6s",
"circuitBreakers": {
"upstreamLimits": {
"maxConnections": 123
}
}
}
}
},
"name": "local_app:port1"
},
"local_app:port3": {
"endpointGroup": {
"static": {
"config": {
"connectTimeout": "8s",
"circuitBreakers": {
"upstreamLimits": {
"maxConnections": 123
}
}
}
}
},
"name": "local_app:port3"
}
},
"endpoints": {
"exposed_cluster_9090": {
"endpoints": [
{
"hostPort": {
"host": "127.0.0.1",
"port": 9090
}
}
]
},
"exposed_cluster_9091": {
"endpoints": [
{
"hostPort": {
"host": "127.0.0.1",
"port": 9091
}
}
]
},
"local_app:port1": {
"endpoints": [
{
"hostPort": {
"host": "127.0.0.1",
"port": 8080
}
}
]
},
"local_app:port3": {
"endpoints": [
{
"hostPort": {
"host": "127.0.0.1",
"port": 8081
}
}
]
}
},
"identity": {
"name": "test-identity",
"tenancy": {
"namespace": "default",
"partition": "default",
"peerName": "local"
},
"type": {
"group": "auth",
"groupVersion": "v2beta1",
"kind": "WorkloadIdentity"
}
},
"listeners": [
{
"capabilities": [
"CAPABILITY_L4_TLS_INSPECTION"
],
"direction": "DIRECTION_INBOUND",
"hostPort": {
"host": "10.0.0.1",
"port": 20000
},
"name": "public_listener",
"balanceConnections": "BALANCE_CONNECTIONS_EXACT",
"routers": [
{
"inboundTls": {
"inboundMesh": {
"identityKey": "test-identity",
"validationContext": {
"trustBundlePeerNameKeys": [
"local"
]
}
}
},
"l4": {
"cluster": {
"name": "local_app:port1"
},
"maxInboundConnections": 123,
"statPrefix": "public_listener",
"trafficPermissions": {}
},
"match": {
"alpnProtocols": [
"consul~port1"
]
}
},
{
"inboundTls": {
"inboundMesh": {
"identityKey": "test-identity",
"validationContext": {
"trustBundlePeerNameKeys": [
"local"
]
}
}
},
"l7": {
"route": {
"name": "public_listener:port3"
},
"maxInboundConnections": 123,
"statPrefix": "public_listener",
"staticRoute": true,
"trafficPermissions": {}
},
"match": {
"alpnProtocols": [
"consul~port3"
]
}
}
]
},
{
"direction": "DIRECTION_INBOUND",
"hostPort": {
"host": "10.0.0.1",
"port": 9090
},
"name": "exposed_path_health",
"routers": [
{
"l7": {
"route": {
"name": "exposed_path_filter_health_1234"
},
"statPrefix": "exposed_path_filter_health_1234",
"staticRoute": true
}
}
]
},
{
"direction": "DIRECTION_INBOUND",
"hostPort": {
"host": "10.0.0.1",
"port": 9091
},
"name": "exposed_path_GetHealth",
"routers": [
{
"l7": {
"protocol": "L7_PROTOCOL_HTTP2",
"route": {
"name": "exposed_path_filter_GetHealth_1235"
},
"statPrefix": "exposed_path_filter_GetHealth_1235",
"staticRoute": true
}
}
]
}
],
"routes": {
"exposed_path_filter_GetHealth_1235": {
"virtualHosts": [
{
"domains": [
"*"
],
"name": "exposed_path_filter_GetHealth_1235",
"routeRules": [
{
"destination": {
"cluster": {
"name": "exposed_cluster_9091"
}
},
"match": {
"pathMatch": {
"exact": "GetHealth"
}
}
}
]
}
]
},
"exposed_path_filter_health_1234": {
"virtualHosts": [
{
"domains": [
"*"
],
"name": "exposed_path_filter_health_1234",
"routeRules": [
{
"destination": {
"cluster": {
"name": "exposed_cluster_9090"
}
},
"match": {
"pathMatch": {
"exact": "/health"
}
}
}
]
}
]
},
"public_listener:port3": {
"virtualHosts": [
{
"domains": [
"*"
],
"name": "public_listener:port3",
"routeRules": [
{
"destination": {
"cluster": {
"name": "local_app:port3"
},
"destinationConfiguration": {
"timeoutConfig": {
"timeout": "9s"
}
}
},
"match": {
"pathMatch": {
"prefix": "/"
}
}
}
]
}
]
}
}
},
"requiredLeafCertificates": {
"test-identity": {
"name": "test-identity",
"namespace": "default",
"partition": "default"
}
},
"requiredTrustBundles": {
"local": {
"peer": "local"
}
}
}

View File

@ -137,8 +137,8 @@ type InboundConnectionsConfig struct {
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
MaxInboundConnections uint64 `protobuf:"varint,12,opt,name=max_inbound_connections,json=maxInboundConnections,proto3" json:"max_inbound_connections,omitempty"`
BalanceInboundConnections BalanceConnections `protobuf:"varint,13,opt,name=balance_inbound_connections,json=balanceInboundConnections,proto3,enum=hashicorp.consul.mesh.v2beta1.BalanceConnections" json:"balance_inbound_connections,omitempty"`
MaxInboundConnections uint32 `protobuf:"varint,1,opt,name=max_inbound_connections,json=maxInboundConnections,proto3" json:"max_inbound_connections,omitempty"`
BalanceInboundConnections BalanceConnections `protobuf:"varint,2,opt,name=balance_inbound_connections,json=balanceInboundConnections,proto3,enum=hashicorp.consul.mesh.v2beta1.BalanceConnections" json:"balance_inbound_connections,omitempty"`
}
func (x *InboundConnectionsConfig) Reset() {
@ -173,7 +173,7 @@ func (*InboundConnectionsConfig) Descriptor() ([]byte, []int) {
return file_pbmesh_v2beta1_connection_proto_rawDescGZIP(), []int{1}
}
func (x *InboundConnectionsConfig) GetMaxInboundConnections() uint64 {
func (x *InboundConnectionsConfig) GetMaxInboundConnections() uint32 {
if x != nil {
return x.MaxInboundConnections
}
@ -209,11 +209,11 @@ var file_pbmesh_v2beta1_connection_proto_rawDesc = []byte{
0x0a, 0x18, 0x49, 0x6e, 0x62, 0x6f, 0x75, 0x6e, 0x64, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74,
0x69, 0x6f, 0x6e, 0x73, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x36, 0x0a, 0x17, 0x6d, 0x61,
0x78, 0x5f, 0x69, 0x6e, 0x62, 0x6f, 0x75, 0x6e, 0x64, 0x5f, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63,
0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x0c, 0x20, 0x01, 0x28, 0x04, 0x52, 0x15, 0x6d, 0x61, 0x78,
0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x15, 0x6d, 0x61, 0x78,
0x49, 0x6e, 0x62, 0x6f, 0x75, 0x6e, 0x64, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f,
0x6e, 0x73, 0x12, 0x71, 0x0a, 0x1b, 0x62, 0x61, 0x6c, 0x61, 0x6e, 0x63, 0x65, 0x5f, 0x69, 0x6e,
0x62, 0x6f, 0x75, 0x6e, 0x64, 0x5f, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e,
0x73, 0x18, 0x0d, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x31, 0x2e, 0x68, 0x61, 0x73, 0x68, 0x69, 0x63,
0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x31, 0x2e, 0x68, 0x61, 0x73, 0x68, 0x69, 0x63,
0x6f, 0x72, 0x70, 0x2e, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x2e, 0x6d, 0x65, 0x73, 0x68, 0x2e,
0x76, 0x32, 0x62, 0x65, 0x74, 0x61, 0x31, 0x2e, 0x42, 0x61, 0x6c, 0x61, 0x6e, 0x63, 0x65, 0x43,
0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x19, 0x62, 0x61, 0x6c, 0x61,

View File

@ -17,8 +17,8 @@ message ConnectionConfig {
// Referenced by ProxyConfiguration
message InboundConnectionsConfig {
uint64 max_inbound_connections = 12;
BalanceConnections balance_inbound_connections = 13;
uint32 max_inbound_connections = 1;
BalanceConnections balance_inbound_connections = 2;
}
// +kubebuilder:validation:Enum=BALANCE_CONNECTIONS_DEFAULT;BALANCE_CONNECTIONS_EXACT