diff --git a/internal/mesh/internal/controllers/sidecarproxy/builder/expose_paths.go b/internal/mesh/internal/controllers/sidecarproxy/builder/expose_paths.go index e5baaa2cba..6215ab540c 100644 --- a/internal/mesh/internal/controllers/sidecarproxy/builder/expose_paths.go +++ b/internal/mesh/internal/controllers/sidecarproxy/builder/expose_paths.go @@ -22,7 +22,7 @@ func (b *Builder) buildExposePaths(workload *pbcatalog.Workload) { buildListener() b.addExposePathsRoute(exposePath, clusterName). - addLocalAppCluster(clusterName). + addLocalAppCluster(clusterName, nil). addLocalAppStaticEndpoints(clusterName, exposePath.LocalPathPort) } } diff --git a/internal/mesh/internal/controllers/sidecarproxy/builder/local_app.go b/internal/mesh/internal/controllers/sidecarproxy/builder/local_app.go index e6ba896b68..54ea6db578 100644 --- a/internal/mesh/internal/controllers/sidecarproxy/builder/local_app.go +++ b/internal/mesh/internal/controllers/sidecarproxy/builder/local_app.go @@ -6,6 +6,9 @@ package builder import ( "fmt" + pbmesh "github.com/hashicorp/consul/proto-public/pbmesh/v2beta1" + "google.golang.org/protobuf/types/known/wrapperspb" + "github.com/hashicorp/consul/agent/connect" "github.com/hashicorp/consul/envoyextensions/xdscommon" pbauth "github.com/hashicorp/consul/proto-public/pbauth/v2beta1" @@ -31,13 +34,13 @@ func (b *Builder) BuildLocalApp(workload *pbcatalog.Workload, ctp *pbauth.Comput if port.Protocol != pbcatalog.Protocol_PROTOCOL_MESH { foundInboundNonMeshPorts = true - lb.addInboundRouter(clusterName, routeName, port, portName, trafficPermissions[portName]). + lb.addInboundRouter(clusterName, routeName, port, portName, trafficPermissions[portName], b.proxyCfg.GetDynamicConfig().GetInboundConnections()). addInboundTLS() if isL7(port.Protocol) { - b.addLocalAppRoute(routeName, clusterName) + b.addLocalAppRoute(routeName, clusterName, portName) } - b.addLocalAppCluster(clusterName). + b.addLocalAppCluster(clusterName, &portName). addLocalAppStaticEndpoints(clusterName, port.GetPort()) } } @@ -264,10 +267,16 @@ func (b *Builder) addInboundListener(name string, workload *pbcatalog.Workload) // Add TLS inspection capability to be able to parse ALPN and/or SNI information from inbound connections. listener.Capabilities = append(listener.Capabilities, pbproxystate.Capability_CAPABILITY_L4_TLS_INSPECTION) + if b.proxyCfg.GetDynamicConfig() != nil && b.proxyCfg.GetDynamicConfig().InboundConnections != nil { + listener.BalanceConnections = pbproxystate.BalanceConnections(b.proxyCfg.DynamicConfig.InboundConnections.BalanceInboundConnections) + } return b.NewListenerBuilder(listener) } -func (l *ListenerBuilder) addInboundRouter(clusterName string, routeName string, port *pbcatalog.WorkloadPort, portName string, tp *pbproxystate.TrafficPermissions) *ListenerBuilder { +func (l *ListenerBuilder) addInboundRouter(clusterName string, routeName string, + port *pbcatalog.WorkloadPort, portName string, tp *pbproxystate.TrafficPermissions, + ic *pbmesh.InboundConnectionsConfig) *ListenerBuilder { + if l.listener == nil { return l } @@ -289,6 +298,15 @@ func (l *ListenerBuilder) addInboundRouter(clusterName string, routeName string, AlpnProtocols: []string{getAlpnProtocolFromPortName(portName)}, }, } + + if ic != nil { + // MaxInboundConnections is uint32 that is used on: + // - router destinations MaxInboundConnection (uint64). + // - cluster circuit breakers UpstreamLimits.MaxConnections (uint32). + // It is cast to a uint64 here similarly as it is to the proxystateconverter code. + r.GetL4().MaxInboundConnections = uint64(ic.MaxInboundConnections) + } + l.listener.Routers = append(l.listener.Routers, r) } else if isL7(port.Protocol) { r := &pbproxystate.Router{ @@ -308,6 +326,13 @@ func (l *ListenerBuilder) addInboundRouter(clusterName string, routeName string, AlpnProtocols: []string{getAlpnProtocolFromPortName(portName)}, }, } + + if ic != nil { + // MaxInboundConnections is cast to a uint64 here similarly as it is to the + // as the L4 case statement above and in proxystateconverter code. + r.GetL7().MaxInboundConnections = uint64(ic.MaxInboundConnections) + } + l.listener.Routers = append(l.listener.Routers, r) } return l @@ -339,7 +364,7 @@ func getAlpnProtocolFromPortName(portName string) string { return fmt.Sprintf("consul~%s", portName) } -func (b *Builder) addLocalAppRoute(routeName string, clusterName string) { +func (b *Builder) addLocalAppRoute(routeName, clusterName, portName string) { proxyRouteRule := &pbproxystate.RouteRule{ Match: &pbproxystate.RouteMatch{ PathMatch: &pbproxystate.PathMatch{ @@ -356,6 +381,18 @@ func (b *Builder) addLocalAppRoute(routeName string, clusterName string) { }, }, } + if b.proxyCfg.GetDynamicConfig() != nil && b.proxyCfg.GetDynamicConfig().LocalConnection != nil { + lc, lcOK := b.proxyCfg.GetDynamicConfig().LocalConnection[portName] + if lcOK { + proxyRouteRule.Destination.DestinationConfiguration = + &pbproxystate.DestinationConfiguration{ + TimeoutConfig: &pbproxystate.TimeoutConfig{ + Timeout: lc.RequestTimeout, + }, + } + } + } + // Each route name for the local app is listenerName:port since there is a route per port on the local app listener. b.addRoute(routeName, &pbproxystate.Route{ VirtualHosts: []*pbproxystate.VirtualHost{{ @@ -373,9 +410,9 @@ func isL7(protocol pbcatalog.Protocol) bool { return false } -func (b *Builder) addLocalAppCluster(clusterName string) *Builder { +func (b *Builder) addLocalAppCluster(clusterName string, portName *string) *Builder { // Make cluster for this router destination. - b.proxyStateTemplate.ProxyState.Clusters[clusterName] = &pbproxystate.Cluster{ + cluster := &pbproxystate.Cluster{ Group: &pbproxystate.Cluster_EndpointGroup{ EndpointGroup: &pbproxystate.EndpointGroup{ Group: &pbproxystate.EndpointGroup_Static{ @@ -384,20 +421,34 @@ func (b *Builder) addLocalAppCluster(clusterName string) *Builder { }, }, } + + // configure inbound connections or connection timeout if either is defined + if b.proxyCfg.GetDynamicConfig() != nil && portName != nil { + lc, lcOK := b.proxyCfg.DynamicConfig.LocalConnection[*portName] + + if lcOK || b.proxyCfg.DynamicConfig.InboundConnections != nil { + cluster.GetEndpointGroup().GetStatic().Config = &pbproxystate.StaticEndpointGroupConfig{} + + if lcOK { + cluster.GetEndpointGroup().GetStatic().GetConfig().ConnectTimeout = lc.ConnectTimeout + } + + if b.proxyCfg.DynamicConfig.InboundConnections != nil { + cluster.GetEndpointGroup().GetStatic().GetConfig().CircuitBreakers = &pbproxystate.CircuitBreakers{ + UpstreamLimits: &pbproxystate.UpstreamLimits{ + MaxConnections: &wrapperspb.UInt32Value{Value: b.proxyCfg.DynamicConfig.InboundConnections.MaxInboundConnections}, + }, + } + } + } + } + + b.proxyStateTemplate.ProxyState.Clusters[clusterName] = cluster return b } func (b *Builder) addBlackHoleCluster() *Builder { - b.proxyStateTemplate.ProxyState.Clusters[xdscommon.BlackHoleClusterName] = &pbproxystate.Cluster{ - Group: &pbproxystate.Cluster_EndpointGroup{ - EndpointGroup: &pbproxystate.EndpointGroup{ - Group: &pbproxystate.EndpointGroup_Static{ - Static: &pbproxystate.StaticEndpointGroup{}, - }, - }, - }, - } - return b + return b.addLocalAppCluster(xdscommon.BlackHoleClusterName, nil) } func (b *Builder) addLocalAppStaticEndpoints(clusterName string, port uint32) { diff --git a/internal/mesh/internal/controllers/sidecarproxy/builder/local_app_test.go b/internal/mesh/internal/controllers/sidecarproxy/builder/local_app_test.go index 785e0aab2e..33dcab7155 100644 --- a/internal/mesh/internal/controllers/sidecarproxy/builder/local_app_test.go +++ b/internal/mesh/internal/controllers/sidecarproxy/builder/local_app_test.go @@ -4,7 +4,10 @@ package builder import ( + "google.golang.org/protobuf/types/known/durationpb" + "sort" "testing" + "time" "github.com/stretchr/testify/require" @@ -139,6 +142,55 @@ func TestBuildLocalApp_WithProxyConfiguration(t *testing.T) { }, }, }, + // source/local-and-inbound-connections shows that configuring LocalCOnnection + // and InboundConnections in DynamicConfig will set fields on standard clusters and routes, + // but will not set fields on exposed path clusters and routes. + "source/local-and-inbound-connections": { + workload: &pbcatalog.Workload{ + Addresses: []*pbcatalog.WorkloadAddress{ + { + Host: "10.0.0.1", + }, + }, + Ports: map[string]*pbcatalog.WorkloadPort{ + "port1": {Port: 8080, Protocol: pbcatalog.Protocol_PROTOCOL_TCP}, + "port2": {Port: 20000, Protocol: pbcatalog.Protocol_PROTOCOL_MESH}, + "port3": {Port: 8081, Protocol: pbcatalog.Protocol_PROTOCOL_HTTP}, + }, + }, + proxyCfg: &pbmesh.ComputedProxyConfiguration{ + DynamicConfig: &pbmesh.DynamicConfig{ + LocalConnection: map[string]*pbmesh.ConnectionConfig{ + "port1": { + ConnectTimeout: durationpb.New(6 * time.Second), + RequestTimeout: durationpb.New(7 * time.Second)}, + "port3": { + ConnectTimeout: durationpb.New(8 * time.Second), + RequestTimeout: durationpb.New(9 * time.Second)}, + }, + InboundConnections: &pbmesh.InboundConnectionsConfig{ + MaxInboundConnections: 123, + BalanceInboundConnections: pbmesh.BalanceConnections(pbproxystate.BalanceConnections_BALANCE_CONNECTIONS_EXACT), + }, + ExposeConfig: &pbmesh.ExposeConfig{ + ExposePaths: []*pbmesh.ExposePath{ + { + ListenerPort: 1234, + Path: "/health", + LocalPathPort: 9090, + Protocol: pbmesh.ExposePathProtocol_EXPOSE_PATH_PROTOCOL_HTTP, + }, + { + ListenerPort: 1235, + Path: "GetHealth", + LocalPathPort: 9091, + Protocol: pbmesh.ExposePathProtocol_EXPOSE_PATH_PROTOCOL_HTTP2, + }, + }, + }, + }, + }, + }, } for name, c := range cases { @@ -146,10 +198,24 @@ func TestBuildLocalApp_WithProxyConfiguration(t *testing.T) { proxyTmpl := New(testProxyStateTemplateID(), testIdentityRef(), "foo.consul", "dc1", true, c.proxyCfg). BuildLocalApp(c.workload, nil). Build() - actual := protoToJSON(t, proxyTmpl) - expected := golden.Get(t, actual, name+".golden") - require.JSONEq(t, expected, actual) + // sort routers because of test flakes where order was flip flopping. + actualRouters := proxyTmpl.ProxyState.Listeners[0].Routers + sort.Slice(actualRouters, func(i, j int) bool { + return actualRouters[i].String() < actualRouters[j].String() + }) + + actual := protoToJSON(t, proxyTmpl) + expected := JSONToProxyTemplate(t, golden.GetBytes(t, actual, name+".golden")) + + // sort routers on listener from golden file + expectedRouters := expected.ProxyState.Listeners[0].Routers + sort.Slice(expectedRouters, func(i, j int) bool { + return expectedRouters[i].String() < expectedRouters[j].String() + }) + + // convert back to json after sorting so that test output does not contain extraneous fields. + require.Equal(t, protoToJSON(t, expected), protoToJSON(t, proxyTmpl)) }) } } diff --git a/internal/mesh/internal/controllers/sidecarproxy/builder/testdata/source/local-and-inbound-connections.golden b/internal/mesh/internal/controllers/sidecarproxy/builder/testdata/source/local-and-inbound-connections.golden new file mode 100644 index 0000000000..169a4969ae --- /dev/null +++ b/internal/mesh/internal/controllers/sidecarproxy/builder/testdata/source/local-and-inbound-connections.golden @@ -0,0 +1,300 @@ +{ + "proxyState": { + "clusters": { + "exposed_cluster_9090": { + "endpointGroup": { + "static": {} + }, + "name": "exposed_cluster_9090" + }, + "exposed_cluster_9091": { + "endpointGroup": { + "static": {} + }, + "name": "exposed_cluster_9091" + }, + "local_app:port1": { + "endpointGroup": { + "static": { + "config": { + "connectTimeout": "6s", + "circuitBreakers": { + "upstreamLimits": { + "maxConnections": 123 + } + } + } + } + }, + "name": "local_app:port1" + }, + "local_app:port3": { + "endpointGroup": { + "static": { + "config": { + "connectTimeout": "8s", + "circuitBreakers": { + "upstreamLimits": { + "maxConnections": 123 + } + } + } + } + }, + "name": "local_app:port3" + } + }, + "endpoints": { + "exposed_cluster_9090": { + "endpoints": [ + { + "hostPort": { + "host": "127.0.0.1", + "port": 9090 + } + } + ] + }, + "exposed_cluster_9091": { + "endpoints": [ + { + "hostPort": { + "host": "127.0.0.1", + "port": 9091 + } + } + ] + }, + "local_app:port1": { + "endpoints": [ + { + "hostPort": { + "host": "127.0.0.1", + "port": 8080 + } + } + ] + }, + "local_app:port3": { + "endpoints": [ + { + "hostPort": { + "host": "127.0.0.1", + "port": 8081 + } + } + ] + } + }, + "identity": { + "name": "test-identity", + "tenancy": { + "namespace": "default", + "partition": "default", + "peerName": "local" + }, + "type": { + "group": "auth", + "groupVersion": "v2beta1", + "kind": "WorkloadIdentity" + } + }, + "listeners": [ + { + "capabilities": [ + "CAPABILITY_L4_TLS_INSPECTION" + ], + "direction": "DIRECTION_INBOUND", + "hostPort": { + "host": "10.0.0.1", + "port": 20000 + }, + "name": "public_listener", + "balanceConnections": "BALANCE_CONNECTIONS_EXACT", + "routers": [ + { + "inboundTls": { + "inboundMesh": { + "identityKey": "test-identity", + "validationContext": { + "trustBundlePeerNameKeys": [ + "local" + ] + } + } + }, + "l4": { + "cluster": { + "name": "local_app:port1" + }, + "maxInboundConnections": 123, + "statPrefix": "public_listener", + "trafficPermissions": {} + }, + "match": { + "alpnProtocols": [ + "consul~port1" + ] + } + }, + { + "inboundTls": { + "inboundMesh": { + "identityKey": "test-identity", + "validationContext": { + "trustBundlePeerNameKeys": [ + "local" + ] + } + } + }, + "l7": { + "route": { + "name": "public_listener:port3" + }, + "maxInboundConnections": 123, + "statPrefix": "public_listener", + "staticRoute": true, + "trafficPermissions": {} + }, + "match": { + "alpnProtocols": [ + "consul~port3" + ] + } + } + ] + }, + { + "direction": "DIRECTION_INBOUND", + "hostPort": { + "host": "10.0.0.1", + "port": 9090 + }, + "name": "exposed_path_health", + "routers": [ + { + "l7": { + "route": { + "name": "exposed_path_filter_health_1234" + }, + "statPrefix": "exposed_path_filter_health_1234", + "staticRoute": true + } + } + ] + }, + { + "direction": "DIRECTION_INBOUND", + "hostPort": { + "host": "10.0.0.1", + "port": 9091 + }, + "name": "exposed_path_GetHealth", + "routers": [ + { + "l7": { + "protocol": "L7_PROTOCOL_HTTP2", + "route": { + "name": "exposed_path_filter_GetHealth_1235" + }, + "statPrefix": "exposed_path_filter_GetHealth_1235", + "staticRoute": true + } + } + ] + } + ], + "routes": { + "exposed_path_filter_GetHealth_1235": { + "virtualHosts": [ + { + "domains": [ + "*" + ], + "name": "exposed_path_filter_GetHealth_1235", + "routeRules": [ + { + "destination": { + "cluster": { + "name": "exposed_cluster_9091" + } + }, + "match": { + "pathMatch": { + "exact": "GetHealth" + } + } + } + ] + } + ] + }, + "exposed_path_filter_health_1234": { + "virtualHosts": [ + { + "domains": [ + "*" + ], + "name": "exposed_path_filter_health_1234", + "routeRules": [ + { + "destination": { + "cluster": { + "name": "exposed_cluster_9090" + } + }, + "match": { + "pathMatch": { + "exact": "/health" + } + } + } + ] + } + ] + }, + "public_listener:port3": { + "virtualHosts": [ + { + "domains": [ + "*" + ], + "name": "public_listener:port3", + "routeRules": [ + { + "destination": { + "cluster": { + "name": "local_app:port3" + }, + "destinationConfiguration": { + "timeoutConfig": { + "timeout": "9s" + } + } + }, + "match": { + "pathMatch": { + "prefix": "/" + } + } + } + ] + } + ] + } + } + }, + "requiredLeafCertificates": { + "test-identity": { + "name": "test-identity", + "namespace": "default", + "partition": "default" + } + }, + "requiredTrustBundles": { + "local": { + "peer": "local" + } + } +} \ No newline at end of file diff --git a/proto-public/pbmesh/v2beta1/connection.pb.go b/proto-public/pbmesh/v2beta1/connection.pb.go index d8ccaba7d7..591df222fc 100644 --- a/proto-public/pbmesh/v2beta1/connection.pb.go +++ b/proto-public/pbmesh/v2beta1/connection.pb.go @@ -137,8 +137,8 @@ type InboundConnectionsConfig struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - MaxInboundConnections uint64 `protobuf:"varint,12,opt,name=max_inbound_connections,json=maxInboundConnections,proto3" json:"max_inbound_connections,omitempty"` - BalanceInboundConnections BalanceConnections `protobuf:"varint,13,opt,name=balance_inbound_connections,json=balanceInboundConnections,proto3,enum=hashicorp.consul.mesh.v2beta1.BalanceConnections" json:"balance_inbound_connections,omitempty"` + MaxInboundConnections uint32 `protobuf:"varint,1,opt,name=max_inbound_connections,json=maxInboundConnections,proto3" json:"max_inbound_connections,omitempty"` + BalanceInboundConnections BalanceConnections `protobuf:"varint,2,opt,name=balance_inbound_connections,json=balanceInboundConnections,proto3,enum=hashicorp.consul.mesh.v2beta1.BalanceConnections" json:"balance_inbound_connections,omitempty"` } func (x *InboundConnectionsConfig) Reset() { @@ -173,7 +173,7 @@ func (*InboundConnectionsConfig) Descriptor() ([]byte, []int) { return file_pbmesh_v2beta1_connection_proto_rawDescGZIP(), []int{1} } -func (x *InboundConnectionsConfig) GetMaxInboundConnections() uint64 { +func (x *InboundConnectionsConfig) GetMaxInboundConnections() uint32 { if x != nil { return x.MaxInboundConnections } @@ -209,11 +209,11 @@ var file_pbmesh_v2beta1_connection_proto_rawDesc = []byte{ 0x0a, 0x18, 0x49, 0x6e, 0x62, 0x6f, 0x75, 0x6e, 0x64, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x36, 0x0a, 0x17, 0x6d, 0x61, 0x78, 0x5f, 0x69, 0x6e, 0x62, 0x6f, 0x75, 0x6e, 0x64, 0x5f, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, - 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x0c, 0x20, 0x01, 0x28, 0x04, 0x52, 0x15, 0x6d, 0x61, 0x78, + 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x15, 0x6d, 0x61, 0x78, 0x49, 0x6e, 0x62, 0x6f, 0x75, 0x6e, 0x64, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x71, 0x0a, 0x1b, 0x62, 0x61, 0x6c, 0x61, 0x6e, 0x63, 0x65, 0x5f, 0x69, 0x6e, 0x62, 0x6f, 0x75, 0x6e, 0x64, 0x5f, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, - 0x73, 0x18, 0x0d, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x31, 0x2e, 0x68, 0x61, 0x73, 0x68, 0x69, 0x63, + 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x31, 0x2e, 0x68, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x2e, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x2e, 0x6d, 0x65, 0x73, 0x68, 0x2e, 0x76, 0x32, 0x62, 0x65, 0x74, 0x61, 0x31, 0x2e, 0x42, 0x61, 0x6c, 0x61, 0x6e, 0x63, 0x65, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x19, 0x62, 0x61, 0x6c, 0x61, diff --git a/proto-public/pbmesh/v2beta1/connection.proto b/proto-public/pbmesh/v2beta1/connection.proto index fdb7217a12..65cb21e586 100644 --- a/proto-public/pbmesh/v2beta1/connection.proto +++ b/proto-public/pbmesh/v2beta1/connection.proto @@ -17,8 +17,8 @@ message ConnectionConfig { // Referenced by ProxyConfiguration message InboundConnectionsConfig { - uint64 max_inbound_connections = 12; - BalanceConnections balance_inbound_connections = 13; + uint32 max_inbound_connections = 1; + BalanceConnections balance_inbound_connections = 2; } // +kubebuilder:validation:Enum=BALANCE_CONNECTIONS_DEFAULT;BALANCE_CONNECTIONS_EXACT