Add extension validation on config save and refactor extensions. (#16110)

This commit is contained in:
Derek Menteer 2023-01-30 15:35:26 -06:00 committed by GitHub
parent 90041639fc
commit 1b02749375
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
28 changed files with 1109 additions and 754 deletions

View File

@ -1,4 +1,4 @@
package lambda
package awslambda
import (
"errors"
@ -17,61 +17,58 @@ import (
"github.com/mitchellh/mapstructure"
pstruct "google.golang.org/protobuf/types/known/structpb"
"github.com/hashicorp/consul/agent/xds/builtinextensiontemplate"
"github.com/hashicorp/consul/agent/xds/xdscommon"
"github.com/hashicorp/consul/agent/envoyextensions/extensioncommon"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/go-multierror"
)
type lambda struct {
var _ extensioncommon.BasicExtension = (*awsLambda)(nil)
type awsLambda struct {
ARN string
PayloadPassthrough bool
Kind api.ServiceKind
InvocationMode string
}
var _ builtinextensiontemplate.Plugin = (*lambda)(nil)
// MakeLambdaExtension is a builtinextensiontemplate.PluginConstructor for a builtinextensiontemplate.EnvoyExtension.
func MakeLambdaExtension(ext xdscommon.ExtensionConfiguration) (builtinextensiontemplate.Plugin, error) {
var resultErr error
var plugin lambda
if name := ext.EnvoyExtension.Name; name != api.BuiltinAWSLambdaExtension {
return nil, fmt.Errorf("expected extension name 'lambda' but got %q", name)
// Constructor follows a specific function signature required for the extension registration.
func Constructor(ext api.EnvoyExtension) (extensioncommon.EnvoyExtender, error) {
var a awsLambda
if name := ext.Name; name != api.BuiltinAWSLambdaExtension {
return nil, fmt.Errorf("expected extension name %q but got %q", api.BuiltinAWSLambdaExtension, name)
}
if err := mapstructure.Decode(ext.EnvoyExtension.Arguments, &plugin); err != nil {
return nil, fmt.Errorf("error decoding extension arguments: %v", err)
if err := a.fromArguments(ext.Arguments); err != nil {
return nil, err
}
if plugin.ARN == "" {
resultErr = multierror.Append(resultErr, fmt.Errorf("ARN is required"))
}
plugin.Kind = ext.OutgoingProxyKind()
return plugin, resultErr
return &extensioncommon.BasicEnvoyExtender{
Extension: &a,
}, nil
}
func toEnvoyInvocationMode(s string) envoy_lambda_v3.Config_InvocationMode {
m := envoy_lambda_v3.Config_SYNCHRONOUS
if s == "asynchronous" {
m = envoy_lambda_v3.Config_ASYNCHRONOUS
func (a *awsLambda) fromArguments(args map[string]interface{}) error {
if err := mapstructure.Decode(args, a); err != nil {
return fmt.Errorf("error decoding extension arguments: %v", err)
}
return m
return a.validate()
}
func (a *awsLambda) validate() error {
var resultErr error
if a.ARN == "" {
resultErr = multierror.Append(resultErr, fmt.Errorf("ARN is required"))
}
return resultErr
}
// CanApply returns true if the kind of the provided ExtensionConfiguration matches
// the kind of the lambda configuration
func (p lambda) CanApply(config xdscommon.ExtensionConfiguration) bool {
return config.Kind == p.Kind
func (a *awsLambda) CanApply(config *extensioncommon.RuntimeConfig) bool {
return config.Kind == config.OutgoingProxyKind()
}
// PatchRoute modifies the routing configuration for a service of kind TerminatingGateway. If the kind is
// not TerminatingGateway, then it can not be modified.
func (p lambda) PatchRoute(route *envoy_route_v3.RouteConfiguration) (*envoy_route_v3.RouteConfiguration, bool, error) {
if p.Kind != api.ServiceKindTerminatingGateway {
func (a *awsLambda) PatchRoute(r *extensioncommon.RuntimeConfig, route *envoy_route_v3.RouteConfiguration) (*envoy_route_v3.RouteConfiguration, bool, error) {
if r.Kind != api.ServiceKindTerminatingGateway {
return route, false, nil
}
@ -94,7 +91,7 @@ func (p lambda) PatchRoute(route *envoy_route_v3.RouteConfiguration) (*envoy_rou
}
// PatchCluster patches the provided envoy cluster with data required to support an AWS lambda function
func (p lambda) PatchCluster(c *envoy_cluster_v3.Cluster) (*envoy_cluster_v3.Cluster, bool, error) {
func (a *awsLambda) PatchCluster(_ *extensioncommon.RuntimeConfig, c *envoy_cluster_v3.Cluster) (*envoy_cluster_v3.Cluster, bool, error) {
transportSocket, err := makeUpstreamTLSTransportSocket(&envoy_tls_v3.UpstreamTlsContext{
Sni: "*.amazonaws.com",
})
@ -104,7 +101,7 @@ func (p lambda) PatchCluster(c *envoy_cluster_v3.Cluster) (*envoy_cluster_v3.Clu
}
// Use the aws SDK to parse the ARN so that we can later extract the region
parsedARN, err := arn_sdk.Parse(p.ARN)
parsedARN, err := arn_sdk.Parse(a.ARN)
if err != nil {
return c, false, err
}
@ -156,7 +153,7 @@ func (p lambda) PatchCluster(c *envoy_cluster_v3.Cluster) (*envoy_cluster_v3.Clu
// PatchFilter patches the provided envoy filter with an inserted lambda filter being careful not to
// overwrite the http filters.
func (p lambda) PatchFilter(filter *envoy_listener_v3.Filter) (*envoy_listener_v3.Filter, bool, error) {
func (a *awsLambda) PatchFilter(_ *extensioncommon.RuntimeConfig, filter *envoy_listener_v3.Filter) (*envoy_listener_v3.Filter, bool, error) {
if filter.Name != "envoy.filters.network.http_connection_manager" {
return filter, false, nil
}
@ -172,9 +169,9 @@ func (p lambda) PatchFilter(filter *envoy_listener_v3.Filter) (*envoy_listener_v
lambdaHttpFilter, err := makeEnvoyHTTPFilter(
"envoy.filters.http.aws_lambda",
&envoy_lambda_v3.Config{
Arn: p.ARN,
PayloadPassthrough: p.PayloadPassthrough,
InvocationMode: toEnvoyInvocationMode(p.InvocationMode),
Arn: a.ARN,
PayloadPassthrough: a.PayloadPassthrough,
InvocationMode: toEnvoyInvocationMode(a.InvocationMode),
},
)
if err != nil {
@ -211,3 +208,11 @@ func (p lambda) PatchFilter(filter *envoy_listener_v3.Filter) (*envoy_listener_v
return newFilter, true, nil
}
func toEnvoyInvocationMode(s string) envoy_lambda_v3.Config_InvocationMode {
m := envoy_lambda_v3.Config_SYNCHRONOUS
if s == "asynchronous" {
m = envoy_lambda_v3.Config_ASYNCHRONOUS
}
return m
}

View File

@ -0,0 +1,435 @@
package awslambda
import (
"fmt"
"testing"
envoy_cluster_v3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
envoy_core_v3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
envoy_endpoint_v3 "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
envoy_listener_v3 "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
envoy_route_v3 "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
envoy_lambda_v3 "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/aws_lambda/v3"
envoy_http_v3 "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3"
envoy_tls_v3 "github.com/envoyproxy/go-control-plane/envoy/extensions/transport_sockets/tls/v3"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
pstruct "google.golang.org/protobuf/types/known/structpb"
"github.com/hashicorp/consul/agent/envoyextensions/extensioncommon"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/proto/prototest"
)
func TestConstructor(t *testing.T) {
kind := api.ServiceKindTerminatingGateway
cases := map[string]struct {
extensionName string
arn string
payloadPassthrough bool
region string
expected awsLambda
ok bool
}{
"no arguments": {
ok: false,
},
"a bad name": {
arn: "arn",
region: "blah",
extensionName: "bad",
ok: false,
},
"missing arn": {
region: "blah",
ok: false,
},
"including payload passthrough": {
arn: "arn",
region: "blah",
payloadPassthrough: true,
expected: awsLambda{
ARN: "arn",
PayloadPassthrough: true,
},
ok: true,
},
}
for n, tc := range cases {
t.Run(n, func(t *testing.T) {
extensionName := api.BuiltinAWSLambdaExtension
if tc.extensionName != "" {
extensionName = tc.extensionName
}
svc := api.CompoundServiceName{Name: "svc"}
ext := extensioncommon.RuntimeConfig{
ServiceName: svc,
Upstreams: map[api.CompoundServiceName]extensioncommon.UpstreamData{
svc: {OutgoingProxyKind: kind},
},
EnvoyExtension: api.EnvoyExtension{
Name: extensionName,
Arguments: map[string]interface{}{
"ARN": tc.arn,
"PayloadPassthrough": tc.payloadPassthrough,
},
},
}
e, err := Constructor(ext.EnvoyExtension)
if tc.ok {
require.NoError(t, err)
require.Equal(t, &extensioncommon.BasicEnvoyExtender{Extension: &tc.expected}, e)
} else {
require.Error(t, err)
}
})
}
}
func TestCanApply(t *testing.T) {
a := awsLambda{}
require.False(t, a.CanApply(&extensioncommon.RuntimeConfig{
Kind: api.ServiceKindConnectProxy,
ServiceName: api.CompoundServiceName{Name: "s1"},
Upstreams: map[api.CompoundServiceName]extensioncommon.UpstreamData{
{Name: "s1"}: {
OutgoingProxyKind: api.ServiceKindTerminatingGateway,
},
},
}))
require.True(t, a.CanApply(&extensioncommon.RuntimeConfig{
Kind: api.ServiceKindConnectProxy,
ServiceName: api.CompoundServiceName{Name: "s1"},
Upstreams: map[api.CompoundServiceName]extensioncommon.UpstreamData{
{Name: "s1"}: {
OutgoingProxyKind: api.ServiceKindConnectProxy,
},
},
}))
}
func TestPatchCluster(t *testing.T) {
cases := []struct {
name string
lambda awsLambda
input *envoy_cluster_v3.Cluster
expectedRegion string
isErrExpected bool
}{
{
name: "nominal",
input: &envoy_cluster_v3.Cluster{
Name: "test-cluster",
},
lambda: awsLambda{
ARN: "arn:aws:lambda:us-east-1:111111111111:function:lambda-1234",
PayloadPassthrough: true,
InvocationMode: "Asynchronous",
},
expectedRegion: "us-east-1",
},
{
name: "error invalid arn",
input: &envoy_cluster_v3.Cluster{
Name: "test-cluster",
},
lambda: awsLambda{
ARN: "?!@%^SA",
PayloadPassthrough: true,
InvocationMode: "Asynchronous",
},
isErrExpected: true,
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
transportSocket, err := makeUpstreamTLSTransportSocket(&envoy_tls_v3.UpstreamTlsContext{
Sni: "*.amazonaws.com",
})
require.NoError(t, err)
expectedCluster := &envoy_cluster_v3.Cluster{
Name: tc.input.Name,
ConnectTimeout: tc.input.ConnectTimeout,
ClusterDiscoveryType: &envoy_cluster_v3.Cluster_Type{Type: envoy_cluster_v3.Cluster_LOGICAL_DNS},
DnsLookupFamily: envoy_cluster_v3.Cluster_V4_ONLY,
LbPolicy: envoy_cluster_v3.Cluster_ROUND_ROBIN,
Metadata: &envoy_core_v3.Metadata{
FilterMetadata: map[string]*pstruct.Struct{
"com.amazonaws.lambda": {
Fields: map[string]*pstruct.Value{
"egress_gateway": {Kind: &pstruct.Value_BoolValue{BoolValue: true}},
},
},
},
},
LoadAssignment: &envoy_endpoint_v3.ClusterLoadAssignment{
ClusterName: tc.input.Name,
Endpoints: []*envoy_endpoint_v3.LocalityLbEndpoints{
{
LbEndpoints: []*envoy_endpoint_v3.LbEndpoint{
{
HostIdentifier: &envoy_endpoint_v3.LbEndpoint_Endpoint{
Endpoint: &envoy_endpoint_v3.Endpoint{
Address: &envoy_core_v3.Address{
Address: &envoy_core_v3.Address_SocketAddress{
SocketAddress: &envoy_core_v3.SocketAddress{
Address: fmt.Sprintf("lambda.%s.amazonaws.com", tc.expectedRegion),
PortSpecifier: &envoy_core_v3.SocketAddress_PortValue{
PortValue: 443,
},
},
},
},
},
},
},
},
},
},
},
TransportSocket: transportSocket,
}
// Test patching the cluster
rc := extensioncommon.RuntimeConfig{}
patchedCluster, patchSuccess, err := tc.lambda.PatchCluster(&rc, tc.input)
if tc.isErrExpected {
assert.Error(t, err)
assert.False(t, patchSuccess)
} else {
assert.NoError(t, err)
assert.True(t, patchSuccess)
assert.Equal(t, expectedCluster, patchedCluster)
}
})
}
}
func TestPatchRoute(t *testing.T) {
tests := map[string]struct {
conf *extensioncommon.RuntimeConfig
route *envoy_route_v3.RouteConfiguration
expectRoute *envoy_route_v3.RouteConfiguration
expectBool bool
}{
"non terminating gateway unmodified": {
conf: &extensioncommon.RuntimeConfig{
Kind: api.ServiceKindConnectProxy,
},
route: &envoy_route_v3.RouteConfiguration{
VirtualHosts: []*envoy_route_v3.VirtualHost{
{
Routes: []*envoy_route_v3.Route{
{
Action: &envoy_route_v3.Route_Route{
Route: &envoy_route_v3.RouteAction{
HostRewriteSpecifier: &envoy_route_v3.RouteAction_HostRewriteLiteral{},
},
},
},
},
},
},
},
expectRoute: &envoy_route_v3.RouteConfiguration{
VirtualHosts: []*envoy_route_v3.VirtualHost{
{
Routes: []*envoy_route_v3.Route{
{
Action: &envoy_route_v3.Route_Route{
Route: &envoy_route_v3.RouteAction{
HostRewriteSpecifier: &envoy_route_v3.RouteAction_HostRewriteLiteral{},
},
},
},
},
},
},
},
expectBool: false,
},
"terminating gateway modified": {
conf: &extensioncommon.RuntimeConfig{
Kind: api.ServiceKindTerminatingGateway,
},
route: &envoy_route_v3.RouteConfiguration{
VirtualHosts: []*envoy_route_v3.VirtualHost{
{
Routes: []*envoy_route_v3.Route{
// This should be modified.
{
Action: &envoy_route_v3.Route_Route{
Route: &envoy_route_v3.RouteAction{
HostRewriteSpecifier: &envoy_route_v3.RouteAction_HostRewriteLiteral{},
},
},
},
// This should be not be modified.
{
Action: &envoy_route_v3.Route_DirectResponse{},
},
},
},
},
},
expectRoute: &envoy_route_v3.RouteConfiguration{
VirtualHosts: []*envoy_route_v3.VirtualHost{
{
Routes: []*envoy_route_v3.Route{
{
Action: &envoy_route_v3.Route_Route{
Route: &envoy_route_v3.RouteAction{
HostRewriteSpecifier: nil,
},
},
},
{
Action: &envoy_route_v3.Route_DirectResponse{},
},
},
},
},
},
expectBool: true,
},
}
for name, tc := range tests {
t.Run(name, func(t *testing.T) {
l := awsLambda{}
r, ok, err := l.PatchRoute(tc.conf, tc.route)
require.NoError(t, err)
require.Equal(t, tc.expectRoute, r)
require.Equal(t, tc.expectBool, ok)
})
}
}
func TestPatchFilter(t *testing.T) {
makeAny := func(m proto.Message) *anypb.Any {
v, err := anypb.New(m)
require.NoError(t, err)
return v
}
tests := map[string]struct {
filter *envoy_listener_v3.Filter
expectFilter *envoy_listener_v3.Filter
expectBool bool
expectErr string
}{
"invalid filter name is ignored": {
filter: &envoy_listener_v3.Filter{Name: "something"},
expectFilter: &envoy_listener_v3.Filter{Name: "something"},
expectBool: false,
},
"error getting typed config": {
filter: &envoy_listener_v3.Filter{Name: "envoy.filters.network.http_connection_manager"},
expectFilter: &envoy_listener_v3.Filter{Name: "envoy.filters.network.http_connection_manager"},
expectBool: false,
expectErr: "error getting typed config for http filter",
},
"error getting http connection manager": {
filter: &envoy_listener_v3.Filter{
Name: "envoy.filters.network.http_connection_manager",
ConfigType: &envoy_listener_v3.Filter_TypedConfig{
TypedConfig: &anypb.Any{},
},
},
expectFilter: &envoy_listener_v3.Filter{
Name: "envoy.filters.network.http_connection_manager",
ConfigType: &envoy_listener_v3.Filter_TypedConfig{
TypedConfig: &anypb.Any{},
},
},
expectBool: false,
expectErr: "error unmarshalling filter",
},
"StripAnyHostPort is set": {
filter: &envoy_listener_v3.Filter{
Name: "envoy.filters.network.http_connection_manager",
ConfigType: &envoy_listener_v3.Filter_TypedConfig{
TypedConfig: makeAny(&envoy_http_v3.HttpConnectionManager{}),
},
},
expectFilter: &envoy_listener_v3.Filter{
Name: "envoy.filters.network.http_connection_manager",
ConfigType: &envoy_listener_v3.Filter_TypedConfig{
TypedConfig: makeAny(&envoy_http_v3.HttpConnectionManager{
StripPortMode: &envoy_http_v3.HttpConnectionManager_StripAnyHostPort{
StripAnyHostPort: true,
},
}),
},
},
expectBool: true,
},
"lambda filter injected correctly": {
filter: &envoy_listener_v3.Filter{
Name: "envoy.filters.network.http_connection_manager",
ConfigType: &envoy_listener_v3.Filter_TypedConfig{
TypedConfig: makeAny(&envoy_http_v3.HttpConnectionManager{
HttpFilters: []*envoy_http_v3.HttpFilter{
{Name: "one"},
{Name: "two"},
{Name: "envoy.filters.http.router"},
{Name: "three"},
},
}),
},
},
expectFilter: &envoy_listener_v3.Filter{
Name: "envoy.filters.network.http_connection_manager",
ConfigType: &envoy_listener_v3.Filter_TypedConfig{
TypedConfig: makeAny(&envoy_http_v3.HttpConnectionManager{
StripPortMode: &envoy_http_v3.HttpConnectionManager_StripAnyHostPort{
StripAnyHostPort: true,
},
HttpFilters: []*envoy_http_v3.HttpFilter{
{Name: "one"},
{Name: "two"},
{
Name: "envoy.filters.http.aws_lambda",
ConfigType: &envoy_http_v3.HttpFilter_TypedConfig{TypedConfig: makeAny(
&envoy_lambda_v3.Config{
Arn: "some-arn",
PayloadPassthrough: true,
InvocationMode: envoy_lambda_v3.Config_ASYNCHRONOUS,
},
)},
},
{Name: "envoy.filters.http.router"},
{Name: "three"},
},
}),
},
},
expectBool: true,
},
}
for name, tc := range tests {
t.Run(name, func(t *testing.T) {
l := awsLambda{
ARN: "some-arn",
PayloadPassthrough: true,
InvocationMode: "asynchronous",
}
f, ok, err := l.PatchFilter(nil, tc.filter)
require.Equal(t, tc.expectBool, ok)
if tc.expectErr == "" {
require.NoError(t, err)
} else {
require.ErrorContains(t, err, tc.expectErr)
}
prototest.AssertDeepEqual(t, tc.expectFilter, f)
})
}
}

View File

@ -1,4 +1,4 @@
package lambda
package awslambda
import (
envoy_core_v3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"

View File

@ -10,87 +10,76 @@ import (
envoy_lua_v3 "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/lua/v3"
envoy_http_v3 "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3"
envoy_resource_v3 "github.com/envoyproxy/go-control-plane/pkg/resource/v3"
"github.com/hashicorp/consul/agent/envoyextensions/extensioncommon"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/go-multierror"
"github.com/mitchellh/mapstructure"
"github.com/hashicorp/consul/agent/xds/builtinextensiontemplate"
"github.com/hashicorp/consul/agent/xds/xdscommon"
"github.com/hashicorp/consul/api"
)
var _ extensioncommon.BasicExtension = (*lua)(nil)
type lua struct {
ProxyType string
Listener string
Script string
}
var _ builtinextensiontemplate.Plugin = (*lua)(nil)
// MakeLuaExtension is a builtinextensiontemplate.PluginConstructor for a builtinextensiontemplate.EnvoyExtension.
func MakeLuaExtension(ext xdscommon.ExtensionConfiguration) (builtinextensiontemplate.Plugin, error) {
var resultErr error
var plugin lua
if name := ext.EnvoyExtension.Name; name != api.BuiltinLuaExtension {
// Constructor follows a specific function signature required for the extension registration.
func Constructor(ext api.EnvoyExtension) (extensioncommon.EnvoyExtender, error) {
var l lua
if name := ext.Name; name != api.BuiltinLuaExtension {
return nil, fmt.Errorf("expected extension name 'lua' but got %q", name)
}
if err := mapstructure.Decode(ext.EnvoyExtension.Arguments, &plugin); err != nil {
return nil, fmt.Errorf("error decoding extension arguments: %v", err)
if err := l.fromArguments(ext.Arguments); err != nil {
return nil, err
}
if plugin.Script == "" {
resultErr = multierror.Append(resultErr, fmt.Errorf("Script is required"))
}
if err := validateProxyType(plugin.ProxyType); err != nil {
resultErr = multierror.Append(resultErr, err)
}
if err := validateListener(plugin.Listener); err != nil {
resultErr = multierror.Append(resultErr, err)
}
return plugin, resultErr
return &extensioncommon.BasicEnvoyExtender{
Extension: &l,
}, nil
}
func validateProxyType(t string) error {
if t != "connect-proxy" {
return fmt.Errorf("unexpected ProxyType %q", t)
func (l *lua) fromArguments(args map[string]interface{}) error {
if err := mapstructure.Decode(args, l); err != nil {
return fmt.Errorf("error decoding extension arguments: %v", err)
}
return nil
return l.validate()
}
func validateListener(t string) error {
if t != "inbound" && t != "outbound" {
return fmt.Errorf("unexpected Listener %q", t)
func (l *lua) validate() error {
var resultErr error
if l.Script == "" {
resultErr = multierror.Append(resultErr, fmt.Errorf("missing Script value"))
}
return nil
if l.ProxyType != "connect-proxy" {
resultErr = multierror.Append(resultErr, fmt.Errorf("unexpected ProxyType %q", l.ProxyType))
}
if l.Listener != "inbound" && l.Listener != "outbound" {
resultErr = multierror.Append(resultErr, fmt.Errorf("unexpected Listener %q", l.Listener))
}
return resultErr
}
// CanApply determines if the extension can apply to the given extension configuration.
func (p lua) CanApply(config xdscommon.ExtensionConfiguration) bool {
return string(config.Kind) == p.ProxyType && p.matchesListenerDirection(config)
func (l *lua) CanApply(config *extensioncommon.RuntimeConfig) bool {
return string(config.Kind) == l.ProxyType && l.matchesListenerDirection(config)
}
func (p lua) matchesListenerDirection(config xdscommon.ExtensionConfiguration) bool {
return (config.IsUpstream() && p.Listener == "outbound") || (!config.IsUpstream() && p.Listener == "inbound")
func (l *lua) matchesListenerDirection(config *extensioncommon.RuntimeConfig) bool {
return (config.IsUpstream() && l.Listener == "outbound") || (!config.IsUpstream() && l.Listener == "inbound")
}
// PatchRoute does nothing.
func (p lua) PatchRoute(route *envoy_route_v3.RouteConfiguration) (*envoy_route_v3.RouteConfiguration, bool, error) {
func (l *lua) PatchRoute(_ *extensioncommon.RuntimeConfig, route *envoy_route_v3.RouteConfiguration) (*envoy_route_v3.RouteConfiguration, bool, error) {
return route, false, nil
}
// PatchCluster does nothing.
func (p lua) PatchCluster(c *envoy_cluster_v3.Cluster) (*envoy_cluster_v3.Cluster, bool, error) {
func (l *lua) PatchCluster(_ *extensioncommon.RuntimeConfig, c *envoy_cluster_v3.Cluster) (*envoy_cluster_v3.Cluster, bool, error) {
return c, false, nil
}
// PatchFilter inserts a lua filter directly prior to envoy.filters.http.router.
func (p lua) PatchFilter(filter *envoy_listener_v3.Filter) (*envoy_listener_v3.Filter, bool, error) {
func (l *lua) PatchFilter(_ *extensioncommon.RuntimeConfig, filter *envoy_listener_v3.Filter) (*envoy_listener_v3.Filter, bool, error) {
if filter.Name != "envoy.filters.network.http_connection_manager" {
return filter, false, nil
}
@ -105,7 +94,7 @@ func (p lua) PatchFilter(filter *envoy_listener_v3.Filter) (*envoy_listener_v3.F
luaHttpFilter, err := makeEnvoyHTTPFilter(
"envoy.filters.http.lua",
&envoy_lua_v3.Lua{
InlineCode: p.Script,
InlineCode: l.Script,
},
)
if err != nil {

View File

@ -5,11 +5,11 @@ import (
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/xds/xdscommon"
"github.com/hashicorp/consul/agent/envoyextensions/extensioncommon"
"github.com/hashicorp/consul/api"
)
func TestMakeLuaPatcher(t *testing.T) {
func TestConstructor(t *testing.T) {
makeArguments := func(overrides map[string]interface{}) map[string]interface{} {
m := map[string]interface{}{
"ProxyType": "connect-proxy",
@ -71,7 +71,7 @@ func TestMakeLuaPatcher(t *testing.T) {
}
svc := api.CompoundServiceName{Name: "svc"}
ext := xdscommon.ExtensionConfiguration{
ext := extensioncommon.RuntimeConfig{
ServiceName: svc,
EnvoyExtension: api.EnvoyExtension{
Name: extensionName,
@ -79,11 +79,11 @@ func TestMakeLuaPatcher(t *testing.T) {
},
}
patcher, err := MakeLuaExtension(ext)
e, err := Constructor(ext.EnvoyExtension)
if tc.ok {
require.NoError(t, err)
require.Equal(t, tc.expected, patcher)
require.Equal(t, &extensioncommon.BasicEnvoyExtender{Extension: &tc.expected}, e)
} else {
require.Error(t, err)
}

View File

@ -14,8 +14,7 @@ import (
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
"github.com/hashicorp/consul/agent/xds/builtinextensiontemplate"
"github.com/hashicorp/consul/agent/xds/xdscommon"
"github.com/hashicorp/consul/agent/envoyextensions/extensioncommon"
)
const builtinValidateExtension = "builtin/proxy/validate"
@ -64,13 +63,13 @@ type resource struct {
endpoints int
}
var _ builtinextensiontemplate.Plugin = (*Validate)(nil)
var _ extensioncommon.BasicExtension = (*Validate)(nil)
// EndpointValidator allows us to inject a different function for tests.
type EndpointValidator func(*resource, string, *envoy_admin_v3.Clusters)
// MakeValidate is a builtinextensiontemplate.PluginConstructor for a builtinextensiontemplate.EnvoyExtension.
func MakeValidate(ext xdscommon.ExtensionConfiguration) (builtinextensiontemplate.Plugin, error) {
func MakeValidate(ext extensioncommon.RuntimeConfig) (extensioncommon.BasicExtension, error) {
var resultErr error
var plugin Validate
@ -189,18 +188,18 @@ func DoEndpointValidation(r *resource, sni string, clusters *envoy_admin_v3.Clus
}
// CanApply determines if the extension can apply to the given extension configuration.
func (p *Validate) CanApply(config xdscommon.ExtensionConfiguration) bool {
func (p *Validate) CanApply(config *extensioncommon.RuntimeConfig) bool {
return true
}
func (p *Validate) PatchRoute(route *envoy_route_v3.RouteConfiguration) (*envoy_route_v3.RouteConfiguration, bool, error) {
func (p *Validate) PatchRoute(config *extensioncommon.RuntimeConfig, route *envoy_route_v3.RouteConfiguration) (*envoy_route_v3.RouteConfiguration, bool, error) {
// Route name on connect proxies will be the envoy ID. We are only validating routes for the specific upstream with
// the envoyID configured.
if route.Name != p.envoyID {
return route, false, nil
}
p.route = true
for sni := range builtinextensiontemplate.RouteClusterNames(route) {
for sni := range extensioncommon.RouteClusterNames(route) {
if _, ok := p.resources[sni]; ok {
continue
}
@ -209,7 +208,7 @@ func (p *Validate) PatchRoute(route *envoy_route_v3.RouteConfiguration) (*envoy_
return route, false, nil
}
func (p *Validate) PatchCluster(c *envoy_cluster_v3.Cluster) (*envoy_cluster_v3.Cluster, bool, error) {
func (p *Validate) PatchCluster(config *extensioncommon.RuntimeConfig, c *envoy_cluster_v3.Cluster) (*envoy_cluster_v3.Cluster, bool, error) {
v, ok := p.resources[c.Name]
if !ok {
v = &resource{}
@ -253,7 +252,7 @@ func (p *Validate) PatchCluster(c *envoy_cluster_v3.Cluster) (*envoy_cluster_v3.
return c, false, nil
}
func (p *Validate) PatchFilter(filter *envoy_listener_v3.Filter) (*envoy_listener_v3.Filter, bool, error) {
func (p *Validate) PatchFilter(config *extensioncommon.RuntimeConfig, filter *envoy_listener_v3.Filter) (*envoy_listener_v3.Filter, bool, error) {
// If a single filter exists for a listener we say it exists.
p.listener = true
@ -267,7 +266,7 @@ func (p *Validate) PatchFilter(filter *envoy_listener_v3.Filter) (*envoy_listene
}
// FilterClusterNames handles the filter being an http or tcp filter.
for sni := range builtinextensiontemplate.FilterClusterNames(filter) {
for sni := range extensioncommon.FilterClusterNames(filter) {
// Mark any clusters we see as required resources.
if r, ok := p.resources[sni]; ok {
r.required = true

View File

@ -6,7 +6,7 @@ import (
envoy_admin_v3 "github.com/envoyproxy/go-control-plane/envoy/admin/v3"
envoy_cluster_v3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
envoy_aggregate_cluster_v3 "github.com/envoyproxy/go-control-plane/envoy/extensions/clusters/aggregate/v3"
"github.com/hashicorp/consul/agent/xds/xdscommon"
"github.com/hashicorp/consul/agent/envoyextensions/extensioncommon"
"github.com/hashicorp/consul/api"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/types/known/anypb"
@ -283,7 +283,7 @@ func TestMakeValidate(t *testing.T) {
}
svc := api.CompoundServiceName{Name: "svc"}
ext := xdscommon.ExtensionConfiguration{
ext := extensioncommon.RuntimeConfig{
ServiceName: svc,
EnvoyExtension: api.EnvoyExtension{
Name: extensionName,

View File

@ -1,4 +1,4 @@
package builtinextensiontemplate
package extensioncommon
import (
"fmt"
@ -17,35 +17,39 @@ import (
"github.com/hashicorp/consul/api"
)
type EnvoyExtension struct {
Constructor PluginConstructor
Plugin Plugin
ready bool
// BasicExtension is the interface that each user of BasicEnvoyExtender must implement. It
// is responsible for modifying the xDS structures based on only the state of
// the extension.
type BasicExtension interface {
// CanApply determines if the extension can mutate resources for the given xdscommon.ExtensionConfiguration.
CanApply(*RuntimeConfig) bool
// PatchRoute patches a route to include the custom Envoy configuration
// required to integrate with the built in extension template.
PatchRoute(*RuntimeConfig, *envoy_route_v3.RouteConfiguration) (*envoy_route_v3.RouteConfiguration, bool, error)
// PatchCluster patches a cluster to include the custom Envoy configuration
// required to integrate with the built in extension template.
PatchCluster(*RuntimeConfig, *envoy_cluster_v3.Cluster) (*envoy_cluster_v3.Cluster, bool, error)
// PatchFilter patches an Envoy filter to include the custom Envoy
// configuration required to integrate with the built in extension template.
PatchFilter(*RuntimeConfig, *envoy_listener_v3.Filter) (*envoy_listener_v3.Filter, bool, error)
}
var _ xdscommon.EnvoyExtension = (*EnvoyExtension)(nil)
var _ EnvoyExtender = (*BasicEnvoyExtender)(nil)
// Validate ensures the data in ExtensionConfiguration can successfuly be used
// to apply the specified Envoy extension.
func (envoyExtension *EnvoyExtension) Validate(config xdscommon.ExtensionConfiguration) error {
plugin, err := envoyExtension.Constructor(config)
envoyExtension.Plugin = plugin
envoyExtension.ready = err == nil
return err
// BasicEnvoyExtender provides convenience functions for iterating and applying modifications
// to Envoy resources.
type BasicEnvoyExtender struct {
Extension BasicExtension
}
// Extend updates indexed xDS structures to include patches for
// built-in extensions. It is responsible for applying Plugins to
// the the appropriate xDS resources. If any portion of this function fails,
// it will attempt to continue and return an error. The caller can then determine
// if it is better to use a partially applied extension or error out.
func (envoyExtension *EnvoyExtension) Extend(resources *xdscommon.IndexedResources, config xdscommon.ExtensionConfiguration) (*xdscommon.IndexedResources, error) {
if !envoyExtension.ready {
panic("envoy extension used without being properly constructed")
}
func (envoyExtension *BasicEnvoyExtender) Validate(config *RuntimeConfig) error {
return nil
}
func (envoyExtender *BasicEnvoyExtender) Extend(resources *xdscommon.IndexedResources, config *RuntimeConfig) (*xdscommon.IndexedResources, error) {
var resultErr error
switch config.Kind {
@ -54,7 +58,7 @@ func (envoyExtension *EnvoyExtension) Extend(resources *xdscommon.IndexedResourc
return resources, nil
}
if !envoyExtension.Plugin.CanApply(config) {
if !envoyExtender.Extension.CanApply(config) {
return resources, nil
}
@ -78,7 +82,7 @@ func (envoyExtension *EnvoyExtension) Extend(resources *xdscommon.IndexedResourc
continue
}
newCluster, patched, err := envoyExtension.Plugin.PatchCluster(resource)
newCluster, patched, err := envoyExtender.Extension.PatchCluster(config, resource)
if err != nil {
resultErr = multierror.Append(resultErr, fmt.Errorf("error patching cluster: %w", err))
continue
@ -88,7 +92,7 @@ func (envoyExtension *EnvoyExtension) Extend(resources *xdscommon.IndexedResourc
}
case *envoy_listener_v3.Listener:
newListener, patched, err := envoyExtension.patchListener(config, resource)
newListener, patched, err := envoyExtender.patchListener(config, resource)
if err != nil {
resultErr = multierror.Append(resultErr, fmt.Errorf("error patching listener: %w", err))
continue
@ -110,7 +114,7 @@ func (envoyExtension *EnvoyExtension) Extend(resources *xdscommon.IndexedResourc
continue
}
newRoute, patched, err := envoyExtension.Plugin.PatchRoute(resource)
newRoute, patched, err := envoyExtender.Extension.PatchRoute(config, resource)
if err != nil {
resultErr = multierror.Append(resultErr, fmt.Errorf("error patching route: %w", err))
continue
@ -127,7 +131,7 @@ func (envoyExtension *EnvoyExtension) Extend(resources *xdscommon.IndexedResourc
return resources, resultErr
}
func (envoyExtension EnvoyExtension) patchListener(config xdscommon.ExtensionConfiguration, l *envoy_listener_v3.Listener) (proto.Message, bool, error) {
func (envoyExtension BasicEnvoyExtender) patchListener(config *RuntimeConfig, l *envoy_listener_v3.Listener) (proto.Message, bool, error) {
switch config.Kind {
case api.ServiceKindTerminatingGateway:
return envoyExtension.patchTerminatingGatewayListener(config, l)
@ -137,7 +141,7 @@ func (envoyExtension EnvoyExtension) patchListener(config xdscommon.ExtensionCon
return l, false, nil
}
func (envoyExtension EnvoyExtension) patchTerminatingGatewayListener(config xdscommon.ExtensionConfiguration, l *envoy_listener_v3.Listener) (proto.Message, bool, error) {
func (b BasicEnvoyExtender) patchTerminatingGatewayListener(config *RuntimeConfig, l *envoy_listener_v3.Listener) (proto.Message, bool, error) {
// We don't support directly targeting terminating gateways with extensions.
if !config.IsUpstream() {
return l, false, nil
@ -160,7 +164,7 @@ func (envoyExtension EnvoyExtension) patchTerminatingGatewayListener(config xdsc
var filters []*envoy_listener_v3.Filter
for _, filter := range filterChain.Filters {
newFilter, ok, err := envoyExtension.Plugin.PatchFilter(filter)
newFilter, ok, err := b.Extension.PatchFilter(config, filter)
if err != nil {
resultErr = multierror.Append(resultErr, fmt.Errorf("error patching listener filter: %w", err))
@ -177,7 +181,7 @@ func (envoyExtension EnvoyExtension) patchTerminatingGatewayListener(config xdsc
return l, patched, resultErr
}
func (envoyExtension EnvoyExtension) patchConnectProxyListener(config xdscommon.ExtensionConfiguration, l *envoy_listener_v3.Listener) (proto.Message, bool, error) {
func (b BasicEnvoyExtender) patchConnectProxyListener(config *RuntimeConfig, l *envoy_listener_v3.Listener) (proto.Message, bool, error) {
var resultErr error
envoyID := ""
@ -186,7 +190,7 @@ func (envoyExtension EnvoyExtension) patchConnectProxyListener(config xdscommon.
}
if config.IsUpstream() && envoyID == xdscommon.OutboundListenerName {
return envoyExtension.patchTProxyListener(config, l)
return b.patchTProxyListener(config, l)
}
// If the Envoy extension configuration is for an upstream service, the listener's
@ -207,7 +211,7 @@ func (envoyExtension EnvoyExtension) patchConnectProxyListener(config xdscommon.
var filters []*envoy_listener_v3.Filter
for _, filter := range filterChain.Filters {
newFilter, ok, err := envoyExtension.Plugin.PatchFilter(filter)
newFilter, ok, err := b.Extension.PatchFilter(config, filter)
if err != nil {
resultErr = multierror.Append(resultErr, fmt.Errorf("error patching listener filter: %w", err))
filters = append(filters, filter)
@ -224,7 +228,7 @@ func (envoyExtension EnvoyExtension) patchConnectProxyListener(config xdscommon.
return l, patched, resultErr
}
func (envoyExtension EnvoyExtension) patchTProxyListener(config xdscommon.ExtensionConfiguration, l *envoy_listener_v3.Listener) (proto.Message, bool, error) {
func (b BasicEnvoyExtender) patchTProxyListener(config *RuntimeConfig, l *envoy_listener_v3.Listener) (proto.Message, bool, error) {
var resultErr error
patched := false
@ -239,7 +243,7 @@ func (envoyExtension EnvoyExtension) patchTProxyListener(config xdscommon.Extens
}
for _, filter := range filterChain.Filters {
newFilter, ok, err := envoyExtension.Plugin.PatchFilter(filter)
newFilter, ok, err := b.Extension.PatchFilter(config, filter)
if err != nil {
resultErr = multierror.Append(resultErr, fmt.Errorf("error patching listener filter: %w", err))
filters = append(filters, filter)

View File

@ -0,0 +1,21 @@
package extensioncommon
import (
"github.com/hashicorp/consul/agent/xds/xdscommon"
)
// EnvoyExtender is the interface that all Envoy extensions must implement in order
// to be dynamically executed during runtime.
type EnvoyExtender interface {
// Validate ensures the data in config can successfuly be used
// to apply the specified Envoy extension.
Validate(*RuntimeConfig) error
// Extend updates indexed xDS structures to include patches for
// built-in extensions. It is responsible for applying extensions to
// the the appropriate xDS resources. If any portion of this function fails,
// it will attempt continue and return an error. The caller can then determine
// if it is better to use a partially applied extension or error out.
Extend(*xdscommon.IndexedResources, *RuntimeConfig) (*xdscommon.IndexedResources, error)
}

View File

@ -0,0 +1,63 @@
package extensioncommon
import "github.com/hashicorp/consul/api"
// UpstreamData has the SNI, EnvoyID, and OutgoingProxyKind of the upstream services for the local proxy and this data
// is used to choose which Envoy resources to patch.
type UpstreamData struct {
// SNI is the SNI header used to reach an upstream service.
SNI map[string]struct{}
// EnvoyID is the envoy ID of an upstream service, structured <service> or <partition>/<ns>/<service> when using a
// non-default namespace or partition.
EnvoyID string
// OutgoingProxyKind is the type of proxy of the upstream service. However, if the upstream is "typical" this will
// be set to "connect-proxy" instead.
OutgoingProxyKind api.ServiceKind
// VIP is the tproxy virtual IP used to reach an upstream service.
VIP string
}
// RuntimeConfig is the configuration for an extension attached to a service on the local proxy. Currently, it
// is only created for the local proxy's upstream service if the upstream service has an extension configured. In the
// future it will also include information about the service local to the local proxy as well. It should depend on the
// API client rather than the structs package because the API client is meant to be public.
type RuntimeConfig struct {
// EnvoyExtension is the extension that will patch Envoy resources.
EnvoyExtension api.EnvoyExtension
// ServiceName is the name of the service the EnvoyExtension is being applied to. It could be the local service or
// an upstream of the local service.
ServiceName api.CompoundServiceName
// Upstreams will only be configured if the EnvoyExtension is being applied to an upstream.
// If there are no Upstreams, then EnvoyExtension is being applied to the local service's resources.
Upstreams map[api.CompoundServiceName]UpstreamData
// Kind is mode the local Envoy proxy is running in. For now, only connect proxy and
// terminating gateways are supported.
Kind api.ServiceKind
}
func (ec RuntimeConfig) IsUpstream() bool {
_, ok := ec.Upstreams[ec.ServiceName]
return ok
}
func (ec RuntimeConfig) MatchesUpstreamServiceSNI(sni string) bool {
u := ec.Upstreams[ec.ServiceName]
_, match := u.SNI[sni]
return match
}
func (ec RuntimeConfig) EnvoyID() string {
u := ec.Upstreams[ec.ServiceName]
return u.EnvoyID
}
func (ec RuntimeConfig) OutgoingProxyKind() api.ServiceKind {
u := ec.Upstreams[ec.ServiceName]
return u.OutgoingProxyKind
}

View File

@ -0,0 +1,52 @@
package extensioncommon
import (
"testing"
"github.com/hashicorp/consul/api"
"github.com/stretchr/testify/require"
)
func makeTestRuntimeConfig() RuntimeConfig {
sn := api.CompoundServiceName{Name: "api"}
rc := RuntimeConfig{
Kind: api.ServiceKindConnectProxy,
ServiceName: sn,
Upstreams: map[api.CompoundServiceName]UpstreamData{
sn: {
EnvoyID: "eid",
OutgoingProxyKind: api.ServiceKindTerminatingGateway,
SNI: map[string]struct{}{
"sni1": {},
"sni2": {},
},
},
},
}
return rc
}
func TestRuntimeConfig_IsUpstream(t *testing.T) {
rc := makeTestRuntimeConfig()
require.True(t, rc.IsUpstream())
delete(rc.Upstreams, rc.ServiceName)
require.False(t, rc.IsUpstream())
}
func TestRuntimeConfig_MatchesUpstreamServiceSNI(t *testing.T) {
rc := makeTestRuntimeConfig()
require.True(t, rc.MatchesUpstreamServiceSNI("sni1"))
require.True(t, rc.MatchesUpstreamServiceSNI("sni2"))
require.False(t, rc.MatchesUpstreamServiceSNI("sni3"))
}
func TestRuntimeConfig_EnvoyID(t *testing.T) {
rc := makeTestRuntimeConfig()
require.Equal(t, "eid", rc.EnvoyID())
}
func TestRuntimeConfig_OutgoingProxyKind(t *testing.T) {
rc := makeTestRuntimeConfig()
require.Equal(t, api.ServiceKindTerminatingGateway, rc.OutgoingProxyKind())
}

View File

@ -0,0 +1,49 @@
package envoyextensions
import (
"fmt"
awslambda "github.com/hashicorp/consul/agent/envoyextensions/builtin/aws-lambda"
"github.com/hashicorp/consul/agent/envoyextensions/builtin/lua"
"github.com/hashicorp/consul/agent/envoyextensions/extensioncommon"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/go-multierror"
)
type extensionConstructor func(api.EnvoyExtension) (extensioncommon.EnvoyExtender, error)
var extensionConstructors = map[string]extensionConstructor{
api.BuiltinLuaExtension: lua.Constructor,
api.BuiltinAWSLambdaExtension: awslambda.Constructor,
}
// ConstructExtension attempts to lookup and build an extension from the registry with the
// given config. Returns an error if the extension does not exist, or if the extension fails
// to be constructed properly.
func ConstructExtension(ext api.EnvoyExtension) (extensioncommon.EnvoyExtender, error) {
constructor, ok := extensionConstructors[ext.Name]
if !ok {
return nil, fmt.Errorf("name %q is not a built-in extension", ext.Name)
}
return constructor(ext)
}
// ValidateExtensions will attempt to construct each instance of the given envoy extension configurations
// and returns an error if any fail to build. Note that this step is separated from the xds package and
// does not check any potential runtime configuration that the extension could encounter -- it simply
// ensures that the extension can be built from the given arguments.
func ValidateExtensions(extensions []api.EnvoyExtension) error {
var output error
for i, ext := range extensions {
if ext.Name == "" {
output = multierror.Append(output, fmt.Errorf("invalid EnvoyExtensions[%d]: Name is required", i))
continue
}
_, err := ConstructExtension(ext)
if err != nil {
output = multierror.Append(output, fmt.Errorf("invalid EnvoyExtensions[%d][%s]: %w", i, ext.Name, err))
continue
}
}
return output
}

View File

@ -0,0 +1,60 @@
package envoyextensions
import (
"testing"
"github.com/hashicorp/consul/api"
"github.com/stretchr/testify/require"
)
func TestValidateExtensions(t *testing.T) {
tests := map[string]struct {
input []api.EnvoyExtension
expectErrs []string
}{
"missing name": {
input: []api.EnvoyExtension{{}},
expectErrs: []string{"Name is required"},
},
"bad name": {
input: []api.EnvoyExtension{{
Name: "bad",
}},
expectErrs: []string{"not a built-in extension"},
},
"multiple errors": {
input: []api.EnvoyExtension{
{},
{
Name: "bad",
},
},
expectErrs: []string{
"invalid EnvoyExtensions[0]: Name is required",
"invalid EnvoyExtensions[1][bad]:",
},
},
"invalid arguments to constructor": {
input: []api.EnvoyExtension{{
Name: "builtin/lua",
}},
expectErrs: []string{
"invalid EnvoyExtensions[0][builtin/lua]",
"missing Script value",
},
},
}
for name, tc := range tests {
t.Run(name, func(t *testing.T) {
err := ValidateExtensions(tc.input)
if len(tc.expectErrs) == 0 {
require.NoError(t, err)
return
}
for _, e := range tc.expectErrs {
require.ErrorContains(t, err, e)
}
})
}
}

View File

@ -4,6 +4,7 @@ import (
"github.com/mitchellh/go-testing-interface"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
)
func TestConfigSnapshotTerminatingGateway(t testing.T, populateServices bool, nsFn func(ns *structs.NodeService), extraUpdates []UpdateEvent) *ConfigSnapshot {
@ -952,7 +953,7 @@ func TestConfigSnapshotTerminatingGatewayWithLambdaService(t testing.T, extraUpd
ProxyConfig: map[string]interface{}{"protocol": "http"},
EnvoyExtensions: []structs.EnvoyExtension{
{
Name: structs.BuiltinAWSLambdaExtension,
Name: api.BuiltinAWSLambdaExtension,
Arguments: map[string]interface{}{
"ARN": "arn:aws:lambda:us-east-1:111111111111:function:lambda-1234",
"PayloadPassthrough": true,

View File

@ -8,7 +8,6 @@ import (
"strings"
"time"
"github.com/hashicorp/consul/api"
"github.com/miekg/dns"
"github.com/hashicorp/go-multierror"
@ -19,6 +18,7 @@ import (
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/envoyextensions"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/lib/decode"
)
@ -66,11 +66,6 @@ var AllConfigEntryKinds = []string{
InlineCertificate,
}
const (
BuiltinAWSLambdaExtension string = "builtin/aws/lambda"
BuiltinLuaExtension string = "builtin/lua"
)
// ConfigEntry is the interface for centralized configuration stored in Raft.
// Currently only service-defaults and proxy-defaults are supported.
type ConfigEntry interface {
@ -139,7 +134,7 @@ type ServiceConfigEntry struct {
LocalConnectTimeoutMs int `json:",omitempty" alias:"local_connect_timeout_ms"`
LocalRequestTimeoutMs int `json:",omitempty" alias:"local_request_timeout_ms"`
BalanceInboundConnections string `json:",omitempty" alias:"balance_inbound_connections"`
EnvoyExtensions []EnvoyExtension `json:",omitempty" alias:"envoy_extensions"`
EnvoyExtensions EnvoyExtensions `json:",omitempty" alias:"envoy_extensions"`
Meta map[string]string `json:",omitempty"`
acl.EnterpriseMeta `hcl:",squash" mapstructure:",squash"`
@ -260,7 +255,7 @@ func (e *ServiceConfigEntry) Validate() error {
}
}
if err := validateEnvoyExtensions(e.EnvoyExtensions); err != nil {
if err := envoyextensions.ValidateExtensions(e.EnvoyExtensions.ToAPI()); err != nil {
validationErr = multierror.Append(validationErr, err)
}
@ -311,52 +306,6 @@ func (e *ServiceConfigEntry) GetEnterpriseMeta() *acl.EnterpriseMeta {
return &e.EnterpriseMeta
}
// EnvoyExtension has configuration for an extension that patches Envoy resources.
type EnvoyExtension struct {
Name string
Required bool
Arguments map[string]interface{} `bexpr:"-"`
}
type EnvoyExtensions []EnvoyExtension
func (es EnvoyExtensions) ToAPI() []api.EnvoyExtension {
extensions := make([]api.EnvoyExtension, len(es))
for i, e := range es {
extensions[i] = api.EnvoyExtension{
Name: e.Name,
Required: e.Required,
Arguments: e.Arguments,
}
}
return extensions
}
func builtInExtension(name string) bool {
extensions := map[string]struct{}{
BuiltinAWSLambdaExtension: {},
BuiltinLuaExtension: {},
}
_, ok := extensions[name]
return ok
}
func validateEnvoyExtensions(extensions []EnvoyExtension) error {
var err error
for i, extension := range extensions {
if extension.Name == "" {
err = multierror.Append(err, fmt.Errorf("invalid EnvoyExtensions[%d]: Name is required", i))
}
if !builtInExtension(extension.Name) {
err = multierror.Append(err, fmt.Errorf("invalid EnvoyExtensions[%d]: Name %q is not a built-in extension", i, extension.Name))
}
}
return err
}
type UpstreamConfiguration struct {
// Overrides is a slice of per-service configuration. The name field is
// required.
@ -418,7 +367,7 @@ type ProxyConfigEntry struct {
MeshGateway MeshGatewayConfig `json:",omitempty" alias:"mesh_gateway"`
Expose ExposeConfig `json:",omitempty"`
AccessLogs AccessLogsConfig `json:",omitempty" alias:"access_logs"`
EnvoyExtensions []EnvoyExtension `json:",omitempty" alias:"envoy_extensions"`
EnvoyExtensions EnvoyExtensions `json:",omitempty" alias:"envoy_extensions"`
Meta map[string]string `json:",omitempty"`
acl.EnterpriseMeta `hcl:",squash" mapstructure:",squash"`
@ -481,7 +430,7 @@ func (e *ProxyConfigEntry) Validate() error {
return err
}
if err := validateEnvoyExtensions(e.EnvoyExtensions); err != nil {
if err := envoyextensions.ValidateExtensions(e.EnvoyExtensions.ToAPI()); err != nil {
return err
}

View File

@ -16,6 +16,7 @@ import (
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/sdk/testutil"
"github.com/hashicorp/consul/types"
)
@ -2831,16 +2832,19 @@ func TestServiceConfigEntry(t *testing.T) {
},
},
},
validateErr: `invalid EnvoyExtensions[0]: Name "not-a-builtin" is not a built-in extension`,
validateErr: `name "not-a-builtin" is not a built-in extension`,
},
"validate: valid extension name": {
"validate: valid extension": {
entry: &ServiceConfigEntry{
Kind: ServiceDefaults,
Name: "external",
Protocol: "http",
EnvoyExtensions: []EnvoyExtension{
{
Name: BuiltinAWSLambdaExtension,
Name: api.BuiltinAWSLambdaExtension,
Arguments: map[string]interface{}{
"ARN": "some-arn",
},
},
},
},

View File

@ -0,0 +1,26 @@
package structs
import (
"github.com/hashicorp/consul/api"
)
// EnvoyExtension has configuration for an extension that patches Envoy resources.
type EnvoyExtension struct {
Name string
Required bool
Arguments map[string]interface{} `bexpr:"-"`
}
type EnvoyExtensions []EnvoyExtension
func (es EnvoyExtensions) ToAPI() []api.EnvoyExtension {
extensions := make([]api.EnvoyExtension, len(es))
for i, e := range es {
extensions[i] = api.EnvoyExtension{
Name: e.Name,
Required: e.Required,
Arguments: e.Arguments,
}
}
return extensions
}

View File

@ -1,186 +0,0 @@
package lambda
import (
"fmt"
"testing"
envoy_cluster_v3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
envoy_core_v3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
envoy_endpoint_v3 "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
envoy_tls_v3 "github.com/envoyproxy/go-control-plane/envoy/extensions/transport_sockets/tls/v3"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
pstruct "google.golang.org/protobuf/types/known/structpb"
"github.com/hashicorp/consul/agent/xds/xdscommon"
"github.com/hashicorp/consul/api"
)
func TestMakeLambdaExtension(t *testing.T) {
kind := api.ServiceKindTerminatingGateway
cases := map[string]struct {
extensionName string
arn string
payloadPassthrough bool
region string
expected lambda
ok bool
}{
"no arguments": {
ok: false,
},
"a bad name": {
arn: "arn",
region: "blah",
extensionName: "bad",
ok: false,
},
"missing arn": {
region: "blah",
ok: false,
},
"including payload passthrough": {
arn: "arn",
region: "blah",
payloadPassthrough: true,
expected: lambda{
ARN: "arn",
PayloadPassthrough: true,
Kind: kind,
},
ok: true,
},
}
for n, tc := range cases {
t.Run(n, func(t *testing.T) {
extensionName := api.BuiltinAWSLambdaExtension
if tc.extensionName != "" {
extensionName = tc.extensionName
}
svc := api.CompoundServiceName{Name: "svc"}
ext := xdscommon.ExtensionConfiguration{
ServiceName: svc,
Upstreams: map[api.CompoundServiceName]xdscommon.UpstreamData{
svc: {OutgoingProxyKind: kind},
},
EnvoyExtension: api.EnvoyExtension{
Name: extensionName,
Arguments: map[string]interface{}{
"ARN": tc.arn,
"PayloadPassthrough": tc.payloadPassthrough,
},
},
}
plugin, err := MakeLambdaExtension(ext)
if tc.ok {
require.NoError(t, err)
require.Equal(t, tc.expected, plugin)
} else {
require.Error(t, err)
}
})
}
}
func TestPatchCluster(t *testing.T) {
cases := []struct {
name string
lambda lambda
input *envoy_cluster_v3.Cluster
expectedRegion string
isErrExpected bool
}{
{
name: "nominal",
input: &envoy_cluster_v3.Cluster{
Name: "test-cluster",
},
lambda: lambda{
ARN: "arn:aws:lambda:us-east-1:111111111111:function:lambda-1234",
PayloadPassthrough: true,
Kind: "some-name",
InvocationMode: "Asynchronous",
},
expectedRegion: "us-east-1",
},
{
name: "error invalid arn",
input: &envoy_cluster_v3.Cluster{
Name: "test-cluster",
},
lambda: lambda{
ARN: "?!@%^SA",
PayloadPassthrough: true,
Kind: "some-name",
InvocationMode: "Asynchronous",
},
isErrExpected: true,
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
transportSocket, err := makeUpstreamTLSTransportSocket(&envoy_tls_v3.UpstreamTlsContext{
Sni: "*.amazonaws.com",
})
require.NoError(t, err)
expectedCluster := &envoy_cluster_v3.Cluster{
Name: tc.input.Name,
ConnectTimeout: tc.input.ConnectTimeout,
ClusterDiscoveryType: &envoy_cluster_v3.Cluster_Type{Type: envoy_cluster_v3.Cluster_LOGICAL_DNS},
DnsLookupFamily: envoy_cluster_v3.Cluster_V4_ONLY,
LbPolicy: envoy_cluster_v3.Cluster_ROUND_ROBIN,
Metadata: &envoy_core_v3.Metadata{
FilterMetadata: map[string]*pstruct.Struct{
"com.amazonaws.lambda": {
Fields: map[string]*pstruct.Value{
"egress_gateway": {Kind: &pstruct.Value_BoolValue{BoolValue: true}},
},
},
},
},
LoadAssignment: &envoy_endpoint_v3.ClusterLoadAssignment{
ClusterName: tc.input.Name,
Endpoints: []*envoy_endpoint_v3.LocalityLbEndpoints{
{
LbEndpoints: []*envoy_endpoint_v3.LbEndpoint{
{
HostIdentifier: &envoy_endpoint_v3.LbEndpoint_Endpoint{
Endpoint: &envoy_endpoint_v3.Endpoint{
Address: &envoy_core_v3.Address{
Address: &envoy_core_v3.Address_SocketAddress{
SocketAddress: &envoy_core_v3.SocketAddress{
Address: fmt.Sprintf("lambda.%s.amazonaws.com", tc.expectedRegion),
PortSpecifier: &envoy_core_v3.SocketAddress_PortValue{
PortValue: 443,
},
},
},
},
},
},
},
},
},
},
},
TransportSocket: transportSocket,
}
// Test patching the cluster
patchedCluster, patchSuccess, err := tc.lambda.PatchCluster(tc.input)
if tc.isErrExpected {
assert.Error(t, err)
assert.False(t, patchSuccess)
} else {
assert.NoError(t, err)
assert.True(t, patchSuccess)
assert.Equal(t, expectedCluster, patchedCluster)
}
})
}
}

View File

@ -1,35 +0,0 @@
package builtinextensiontemplate
import (
envoy_cluster_v3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
envoy_listener_v3 "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
envoy_route_v3 "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
"github.com/hashicorp/consul/agent/xds/xdscommon"
)
// Plugin is the interface that each extension must implement. It
// is responsible for modifying the xDS structures based on only the state of
// the extension.
type Plugin interface {
// CanApply determines if the extension can mutate resources for the given xdscommon.ExtensionConfiguration.
CanApply(xdscommon.ExtensionConfiguration) bool
// PatchRoute patches a route to include the custom Envoy configuration
// required to integrate with the built in extension template.
PatchRoute(*envoy_route_v3.RouteConfiguration) (*envoy_route_v3.RouteConfiguration, bool, error)
// PatchCluster patches a cluster to include the custom Envoy configuration
// required to integrate with the built in extension template.
PatchCluster(*envoy_cluster_v3.Cluster) (*envoy_cluster_v3.Cluster, bool, error)
// PatchFilter patches an Envoy filter to include the custom Envoy
// configuration required to integrate with the built in extension template.
PatchFilter(*envoy_listener_v3.Filter) (*envoy_listener_v3.Filter, bool, error)
}
// PluginConstructor is used to construct a plugin based on
// xdscommon.ExtensionConfiguration. This function should contain all the logic around
// turning an extension's arguments into a plugin. The PluginConstructor will be used
// as the Constructor field on an EnvoyExtension.
type PluginConstructor func(extension xdscommon.ExtensionConfiguration) (Plugin, error)

View File

@ -23,10 +23,12 @@ import (
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
"github.com/hashicorp/consul/agent/envoyextensions"
external "github.com/hashicorp/consul/agent/grpc-external"
"github.com/hashicorp/consul/agent/grpc-external/limiter"
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/xds/extensionruntime"
"github.com/hashicorp/consul/agent/xds/xdscommon"
"github.com/hashicorp/consul/logging"
)
@ -393,52 +395,50 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove
}
func (s *Server) applyEnvoyExtensions(resources *xdscommon.IndexedResources, cfgSnap *proxycfg.ConfigSnapshot) error {
var err error
cfgs := xdscommon.GetExtensionConfigurations(cfgSnap)
for _, extensions := range cfgs {
for _, ext := range extensions {
serviceConfigs := extensionruntime.GetRuntimeConfigurations(cfgSnap)
for _, cfgs := range serviceConfigs {
for _, cfg := range cfgs {
logFn := s.Logger.Warn
if ext.EnvoyExtension.Required {
if cfg.EnvoyExtension.Required {
logFn = s.Logger.Error
}
extensionContext := []interface{}{
"extension", ext.EnvoyExtension.Name,
"service", ext.ServiceName.Name,
"namespace", ext.ServiceName.Namespace,
"partition", ext.ServiceName.Partition,
errorParams := []interface{}{
"extension", cfg.EnvoyExtension.Name,
"service", cfg.ServiceName.Name,
"namespace", cfg.ServiceName.Namespace,
"partition", cfg.ServiceName.Partition,
}
extension, ok := GetBuiltInExtension(ext)
if !ok {
logFn("failed to find extension", extensionContext...)
if ext.EnvoyExtension.Required {
return status.Errorf(codes.Unavailable, "failed to find extension %q for service %q", ext.EnvoyExtension.Name, ext.ServiceName.Name)
}
continue
}
err = extension.Validate(ext)
extender, err := envoyextensions.ConstructExtension(cfg.EnvoyExtension)
if err != nil {
extensionContext = append(extensionContext, "error", err)
logFn("failed to validate extension arguments", extensionContext...)
if ext.EnvoyExtension.Required {
return status.Errorf(codes.Unavailable, "failed to validate arguments for extension %q for service %q", ext.EnvoyExtension.Name, ext.ServiceName.Name)
logFn("failed to construct extension", errorParams...)
if cfg.EnvoyExtension.Required {
return status.Errorf(codes.Unavailable, "failed to construct extension %q for service %q", cfg.EnvoyExtension.Name, cfg.ServiceName.Name)
}
continue
}
resources, err = extension.Extend(resources, ext)
err = extender.Validate(&cfg)
if err != nil {
errorParams = append(errorParams, "error", err)
logFn("failed to validate extension arguments", errorParams...)
if cfg.EnvoyExtension.Required {
return status.Errorf(codes.Unavailable, "failed to validate arguments for extension %q for service %q", cfg.EnvoyExtension.Name, cfg.ServiceName.Name)
}
continue
}
resources, err = extender.Extend(resources, &cfg)
if err == nil {
continue
}
logFn("failed to apply envoy extension", extensionContext...)
if ext.EnvoyExtension.Required {
return status.Errorf(codes.Unavailable, "failed to patch xDS resources in the %q plugin: %v", ext.EnvoyExtension.Name, err)
logFn("failed to apply envoy extension", errorParams...)
if cfg.EnvoyExtension.Required {
return status.Errorf(codes.Unavailable, "failed to patch xDS resources in the %q extension: %v", cfg.EnvoyExtension.Name, err)
}
}
}

View File

@ -16,15 +16,17 @@ import (
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"
"github.com/hashicorp/consul/agent/envoyextensions"
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/xds/extensionruntime"
"github.com/hashicorp/consul/agent/xds/proxysupport"
"github.com/hashicorp/consul/agent/xds/xdscommon"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/sdk/testutil"
)
func TestBuiltinExtensionsFromSnapshot(t *testing.T) {
func TestEnvoyExtenderWithSnapshot(t *testing.T) {
// If opposite is true, the returned service defaults config entry will have
// payload-passthrough=true and invocation-mode=asynchronous.
// Otherwise payload-passthrough=false and invocation-mode=synchronous.
@ -208,14 +210,14 @@ end`,
require.NoError(t, err)
indexedResources := indexResources(g.Logger, res)
cfgs := xdscommon.GetExtensionConfigurations(snap)
cfgs := extensionruntime.GetRuntimeConfigurations(snap)
for _, extensions := range cfgs {
for _, ext := range extensions {
builtInExtension, ok := GetBuiltInExtension(ext)
require.True(t, ok)
err = builtInExtension.Validate(ext)
extender, err := envoyextensions.ConstructExtension(ext.EnvoyExtension)
require.NoError(t, err)
indexedResources, err = builtInExtension.Extend(indexedResources, ext)
err = extender.Validate(&ext)
require.NoError(t, err)
indexedResources, err = extender.Extend(indexedResources, &ext)
require.NoError(t, err)
}
}

View File

@ -0,0 +1,156 @@
package extensionruntime
import (
"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/envoyextensions/extensioncommon"
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
)
func GetRuntimeConfigurations(cfgSnap *proxycfg.ConfigSnapshot) map[api.CompoundServiceName][]extensioncommon.RuntimeConfig {
extensionsMap := make(map[api.CompoundServiceName][]api.EnvoyExtension)
upstreamMap := make(map[api.CompoundServiceName]extensioncommon.UpstreamData)
var kind api.ServiceKind
extensionConfigurationsMap := make(map[api.CompoundServiceName][]extensioncommon.RuntimeConfig)
trustDomain := ""
if cfgSnap.Roots != nil {
trustDomain = cfgSnap.Roots.TrustDomain
}
switch cfgSnap.Kind {
case structs.ServiceKindConnectProxy:
kind = api.ServiceKindConnectProxy
outgoingKindByService := make(map[api.CompoundServiceName]api.ServiceKind)
vipForService := make(map[api.CompoundServiceName]string)
for uid, upstreamData := range cfgSnap.ConnectProxy.WatchedUpstreamEndpoints {
sn := upstreamIDToCompoundServiceName(uid)
for _, serviceNodes := range upstreamData {
for _, serviceNode := range serviceNodes {
if serviceNode.Service == nil {
continue
}
vip := serviceNode.Service.TaggedAddresses[structs.TaggedAddressVirtualIP].Address
if vip != "" {
if _, ok := vipForService[sn]; !ok {
vipForService[sn] = vip
}
}
// Store the upstream's kind, and for ServiceKindTypical we don't do anything because we'll default
// any unset upstreams to ServiceKindConnectProxy below.
switch serviceNode.Service.Kind {
case structs.ServiceKindTypical:
default:
outgoingKindByService[sn] = api.ServiceKind(serviceNode.Service.Kind)
}
// We only need the kind from one instance, so break once we find it.
break
}
}
}
// TODO(peering): consider PeerUpstreamEndpoints in addition to DiscoveryChain
// These are the discovery chains for upstreams which have the Envoy Extensions applied to the local service.
for uid, dc := range cfgSnap.ConnectProxy.DiscoveryChain {
compoundServiceName := upstreamIDToCompoundServiceName(uid)
extensionsMap[compoundServiceName] = convertEnvoyExtensions(dc.EnvoyExtensions)
meta := uid.EnterpriseMeta
sni := connect.ServiceSNI(uid.Name, "", meta.NamespaceOrDefault(), meta.PartitionOrDefault(), cfgSnap.Datacenter, trustDomain)
outgoingKind, ok := outgoingKindByService[compoundServiceName]
if !ok {
outgoingKind = api.ServiceKindConnectProxy
}
upstreamMap[compoundServiceName] = extensioncommon.UpstreamData{
SNI: map[string]struct{}{sni: {}},
VIP: vipForService[compoundServiceName],
EnvoyID: uid.EnvoyID(),
OutgoingProxyKind: outgoingKind,
}
}
// Adds extensions configured for the local service to the RuntimeConfig. This only applies to
// connect-proxies because extensions are either global or tied to a specific service, so the terminating
// gateway's Envoy resources for the local service (i.e not to upstreams) would never need to be modified.
localSvc := api.CompoundServiceName{
Name: cfgSnap.Proxy.DestinationServiceName,
Namespace: cfgSnap.ProxyID.NamespaceOrDefault(),
Partition: cfgSnap.ProxyID.PartitionOrEmpty(),
}
extensionConfigurationsMap[localSvc] = []extensioncommon.RuntimeConfig{}
cfgSnapExts := convertEnvoyExtensions(cfgSnap.Proxy.EnvoyExtensions)
for _, ext := range cfgSnapExts {
extCfg := extensioncommon.RuntimeConfig{
EnvoyExtension: ext,
ServiceName: localSvc,
// Upstreams is nil to signify this extension is not being applied to an upstream service, but rather to the local service.
Upstreams: nil,
Kind: kind,
}
extensionConfigurationsMap[localSvc] = append(extensionConfigurationsMap[localSvc], extCfg)
}
case structs.ServiceKindTerminatingGateway:
kind = api.ServiceKindTerminatingGateway
for svc, c := range cfgSnap.TerminatingGateway.ServiceConfigs {
compoundServiceName := serviceNameToCompoundServiceName(svc)
extensionsMap[compoundServiceName] = convertEnvoyExtensions(c.EnvoyExtensions)
sni := connect.ServiceSNI(svc.Name, "", svc.NamespaceOrDefault(), svc.PartitionOrDefault(), cfgSnap.Datacenter, trustDomain)
envoyID := proxycfg.NewUpstreamIDFromServiceName(svc)
snis := map[string]struct{}{sni: {}}
resolver, hasResolver := cfgSnap.TerminatingGateway.ServiceResolvers[svc]
if hasResolver {
for subsetName := range resolver.Subsets {
sni := connect.ServiceSNI(svc.Name, subsetName, svc.NamespaceOrDefault(), svc.PartitionOrDefault(), cfgSnap.Datacenter, trustDomain)
snis[sni] = struct{}{}
}
}
upstreamMap[compoundServiceName] = extensioncommon.UpstreamData{
SNI: snis,
EnvoyID: envoyID.EnvoyID(),
OutgoingProxyKind: api.ServiceKindTerminatingGateway,
}
}
}
for svc, exts := range extensionsMap {
extensionConfigurationsMap[svc] = []extensioncommon.RuntimeConfig{}
for _, ext := range exts {
extCfg := extensioncommon.RuntimeConfig{
EnvoyExtension: ext,
Kind: kind,
ServiceName: svc,
Upstreams: upstreamMap,
}
extensionConfigurationsMap[svc] = append(extensionConfigurationsMap[svc], extCfg)
}
}
return extensionConfigurationsMap
}
func serviceNameToCompoundServiceName(svc structs.ServiceName) api.CompoundServiceName {
return api.CompoundServiceName{
Name: svc.Name,
Partition: svc.PartitionOrDefault(),
Namespace: svc.NamespaceOrDefault(),
}
}
func upstreamIDToCompoundServiceName(uid proxycfg.UpstreamID) api.CompoundServiceName {
return api.CompoundServiceName{
Name: uid.Name,
Partition: uid.PartitionOrDefault(),
Namespace: uid.NamespaceOrDefault(),
}
}
func convertEnvoyExtensions(structExtensions structs.EnvoyExtensions) []api.EnvoyExtension {
return structExtensions.ToAPI()
}

View File

@ -1,19 +1,20 @@
//go:build !consulent
// +build !consulent
package xdscommon
package extensionruntime
import (
"testing"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/envoyextensions/extensioncommon"
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
)
func TestGetExtensionConfigurations_TerminatingGateway(t *testing.T) {
func TestGetRuntimeConfigurations_TerminatingGateway(t *testing.T) {
snap := proxycfg.TestConfigSnapshotTerminatingGatewayWithLambdaServiceAndServiceResolvers(t)
webService := api.CompoundServiceName{
@ -37,21 +38,21 @@ func TestGetExtensionConfigurations_TerminatingGateway(t *testing.T) {
Partition: "default",
}
expected := map[api.CompoundServiceName][]ExtensionConfiguration{
expected := map[api.CompoundServiceName][]extensioncommon.RuntimeConfig{
apiService: {},
cacheService: {},
dbService: {},
webService: {
{
EnvoyExtension: api.EnvoyExtension{
Name: structs.BuiltinAWSLambdaExtension,
Name: api.BuiltinAWSLambdaExtension,
Arguments: map[string]interface{}{
"ARN": "arn:aws:lambda:us-east-1:111111111111:function:lambda-1234",
"PayloadPassthrough": true,
},
},
ServiceName: webService,
Upstreams: map[api.CompoundServiceName]UpstreamData{
Upstreams: map[api.CompoundServiceName]extensioncommon.UpstreamData{
apiService: {
SNI: map[string]struct{}{
"api.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul": {},
@ -88,10 +89,10 @@ func TestGetExtensionConfigurations_TerminatingGateway(t *testing.T) {
},
}
require.Equal(t, expected, GetExtensionConfigurations(snap))
require.Equal(t, expected, GetRuntimeConfigurations(snap))
}
func TestGetExtensionConfigurations_ConnectProxy(t *testing.T) {
func TestGetRuntimeConfigurations_ConnectProxy(t *testing.T) {
dbService := api.CompoundServiceName{
Name: "db",
Partition: "default",
@ -106,7 +107,7 @@ func TestGetExtensionConfigurations_ConnectProxy(t *testing.T) {
// Setup multiple extensions to ensure all of them are in the ExtensionConfiguration map.
envoyExtensions := []structs.EnvoyExtension{
{
Name: structs.BuiltinAWSLambdaExtension,
Name: api.BuiltinAWSLambdaExtension,
Arguments: map[string]interface{}{
"ARN": "arn:aws:lambda:us-east-1:111111111111:function:lambda-1234",
"PayloadPassthrough": true,
@ -139,23 +140,23 @@ func TestGetExtensionConfigurations_ConnectProxy(t *testing.T) {
type testCase struct {
snapshot *proxycfg.ConfigSnapshot
expected map[api.CompoundServiceName][]ExtensionConfiguration
expected map[api.CompoundServiceName][]extensioncommon.RuntimeConfig
}
cases := map[string]testCase{
"connect proxy upstream": {
snapshot: snapConnect,
expected: map[api.CompoundServiceName][]ExtensionConfiguration{
expected: map[api.CompoundServiceName][]extensioncommon.RuntimeConfig{
dbService: {
{
EnvoyExtension: api.EnvoyExtension{
Name: structs.BuiltinAWSLambdaExtension,
Name: api.BuiltinAWSLambdaExtension,
Arguments: map[string]interface{}{
"ARN": "arn:aws:lambda:us-east-1:111111111111:function:lambda-1234",
"PayloadPassthrough": true,
},
},
ServiceName: dbService,
Upstreams: map[api.CompoundServiceName]UpstreamData{
Upstreams: map[api.CompoundServiceName]extensioncommon.UpstreamData{
dbService: {
SNI: map[string]struct{}{
"db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul": {},
@ -175,7 +176,7 @@ func TestGetExtensionConfigurations_ConnectProxy(t *testing.T) {
},
},
ServiceName: dbService,
Upstreams: map[api.CompoundServiceName]UpstreamData{
Upstreams: map[api.CompoundServiceName]extensioncommon.UpstreamData{
dbService: {
SNI: map[string]struct{}{
"db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul": {},
@ -192,18 +193,18 @@ func TestGetExtensionConfigurations_ConnectProxy(t *testing.T) {
},
"terminating gateway upstream": {
snapshot: snapTermGw,
expected: map[api.CompoundServiceName][]ExtensionConfiguration{
expected: map[api.CompoundServiceName][]extensioncommon.RuntimeConfig{
dbService: {
{
EnvoyExtension: api.EnvoyExtension{
Name: structs.BuiltinAWSLambdaExtension,
Name: api.BuiltinAWSLambdaExtension,
Arguments: map[string]interface{}{
"ARN": "arn:aws:lambda:us-east-1:111111111111:function:lambda-1234",
"PayloadPassthrough": true,
},
},
ServiceName: dbService,
Upstreams: map[api.CompoundServiceName]UpstreamData{
Upstreams: map[api.CompoundServiceName]extensioncommon.UpstreamData{
dbService: {
SNI: map[string]struct{}{
"db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul": {},
@ -223,7 +224,7 @@ func TestGetExtensionConfigurations_ConnectProxy(t *testing.T) {
},
},
ServiceName: dbService,
Upstreams: map[api.CompoundServiceName]UpstreamData{
Upstreams: map[api.CompoundServiceName]extensioncommon.UpstreamData{
dbService: {
SNI: map[string]struct{}{
"db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul": {},
@ -240,12 +241,12 @@ func TestGetExtensionConfigurations_ConnectProxy(t *testing.T) {
},
"local service extensions": {
snapshot: snapWebConnect,
expected: map[api.CompoundServiceName][]ExtensionConfiguration{
expected: map[api.CompoundServiceName][]extensioncommon.RuntimeConfig{
dbService: {},
webService: {
{
EnvoyExtension: api.EnvoyExtension{
Name: structs.BuiltinAWSLambdaExtension,
Name: api.BuiltinAWSLambdaExtension,
Arguments: map[string]interface{}{
"ARN": "arn:aws:lambda:us-east-1:111111111111:function:lambda-1234",
"PayloadPassthrough": true,
@ -274,7 +275,7 @@ func TestGetExtensionConfigurations_ConnectProxy(t *testing.T) {
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
require.Equal(t, tc.expected, GetExtensionConfigurations(tc.snapshot))
require.Equal(t, tc.expected, GetRuntimeConfigurations(tc.snapshot))
})
}
}

View File

@ -1,24 +0,0 @@
package xds
import (
"github.com/hashicorp/consul/agent/xds/builtinextensions/lambda"
"github.com/hashicorp/consul/agent/xds/builtinextensions/lua"
"github.com/hashicorp/consul/agent/xds/builtinextensiontemplate"
"github.com/hashicorp/consul/agent/xds/xdscommon"
"github.com/hashicorp/consul/api"
)
func GetBuiltInExtension(ext xdscommon.ExtensionConfiguration) (builtinextensiontemplate.EnvoyExtension, bool) {
var c builtinextensiontemplate.PluginConstructor
switch ext.EnvoyExtension.Name {
case api.BuiltinAWSLambdaExtension:
c = lambda.MakeLambdaExtension
case api.BuiltinLuaExtension:
c = lua.MakeLuaExtension
default:
var e builtinextensiontemplate.EnvoyExtension
return e, false
}
return builtinextensiontemplate.EnvoyExtension{Constructor: c}, true
}

View File

@ -2,10 +2,10 @@ package xds
import (
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/envoyextensions/builtin/validate"
"github.com/hashicorp/consul/agent/envoyextensions/extensioncommon"
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/xds/builtinextensions/validate"
"github.com/hashicorp/consul/agent/xds/builtinextensiontemplate"
"github.com/hashicorp/consul/agent/xds/xdscommon"
"github.com/hashicorp/consul/api"
@ -79,7 +79,7 @@ func Validate(indexedResources *xdscommon.IndexedResources, service api.Compound
}
// Build an ExtensionConfiguration for Validate plugin.
extConfig := xdscommon.ExtensionConfiguration{
extConfig := extensioncommon.RuntimeConfig{
EnvoyExtension: api.EnvoyExtension{
Name: "builtin/proxy/validate",
Arguments: map[string]interface{}{
@ -87,7 +87,7 @@ func Validate(indexedResources *xdscommon.IndexedResources, service api.Compound
},
},
ServiceName: service,
Upstreams: map[api.CompoundServiceName]xdscommon.UpstreamData{
Upstreams: map[api.CompoundServiceName]extensioncommon.UpstreamData{
service: {
VIP: vip,
// Even though snis are under the upstream service name we're validating, it actually contains all
@ -100,19 +100,24 @@ func Validate(indexedResources *xdscommon.IndexedResources, service api.Compound
},
Kind: api.ServiceKindConnectProxy,
}
extension := builtinextensiontemplate.EnvoyExtension{Constructor: validate.MakeValidate}
err := extension.Validate(extConfig)
basicExtension, err := validate.MakeValidate(extConfig)
if err != nil {
return err
}
extender := extensioncommon.BasicEnvoyExtender{
Extension: basicExtension,
}
err = extender.Validate(&extConfig)
if err != nil {
return err
}
_, err = extension.Extend(indexedResources, extConfig)
_, err = extender.Extend(indexedResources, &extConfig)
if err != nil {
return err
}
v, ok := extension.Plugin.(*validate.Validate)
v, ok := extender.Extension.(*validate.Validate)
if !ok {
panic("validate plugin was not correctly created")
}

View File

@ -2,14 +2,29 @@ package xdscommon
import (
"google.golang.org/protobuf/proto"
"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
)
const (
// PublicListenerName is the name we give the public listener in Envoy config.
PublicListenerName = "public_listener"
// OutboundListenerName is the name we give the outbound Envoy listener when transparent proxy mode is enabled.
OutboundListenerName = "outbound_listener"
// LocalAppClusterName is the name we give the local application "cluster" in
// Envoy config. Note that all cluster names may collide with service names
// since we want cluster names and service names to match to enable nice
// metrics correlation without massaging prefixes on cluster names.
//
// We should probably make this more unlikely to collide however changing it
// potentially breaks upgrade compatibility without restarting all Envoy's as
// it will no longer match their existing cluster name. Changing this will
// affect metrics output so could break dashboards (for local app traffic).
//
// We should probably just make it configurable if anyone actually has
// services named "local_app" in the future.
LocalAppClusterName = "local_app"
// Resource types in xDS v3. These are copied from
// envoyproxy/go-control-plane/pkg/resource/v3/resource.go since we don't need any of
// the rest of that package.
@ -26,33 +41,8 @@ const (
// ListenerType is the TypeURL for Listener discovery responses.
ListenerType = apiTypePrefix + "envoy.config.listener.v3.Listener"
// PublicListenerName is the name we give the public listener in Envoy config.
PublicListenerName = "public_listener"
// LocalAppClusterName is the name we give the local application "cluster" in
// Envoy config. Note that all cluster names may collide with service names
// since we want cluster names and service names to match to enable nice
// metrics correlation without massaging prefixes on cluster names.
//
// We should probably make this more unlikely to collide however changing it
// potentially breaks upgrade compatibility without restarting all Envoy's as
// it will no longer match their existing cluster name. Changing this will
// affect metrics output so could break dashboards (for local app traffic).
//
// We should probably just make it configurable if anyone actually has
// services named "local_app" in the future.
LocalAppClusterName = "local_app"
// OutboundListenerName is the name we give the outbound Envoy listener when transparent proxy mode is enabled.
OutboundListenerName = "outbound_listener"
)
type EnvoyExtension interface {
Extend(*IndexedResources, ExtensionConfiguration) (*IndexedResources, error)
Validate(ExtensionConfiguration) error
}
type IndexedResources struct {
// Index is a map of typeURL => resourceName => resource
Index map[string]map[string]proto.Message
@ -77,214 +67,3 @@ func EmptyIndexedResources() *IndexedResources {
},
}
}
type ServiceConfig struct {
// Kind identifies the final proxy kind that will make the request to the
// destination service.
Kind api.ServiceKind
EnvoyExtensions []api.EnvoyExtension
}
// ExtensionConfiguration is the configuration for an extension attached to a service on the local proxy. Currently, it
// is only created for the local proxy's upstream service if the upstream service has an extension configured. In the
// future it will also include information about the service local to the local proxy as well. It should depend on the
// API client rather than the structs package because the API client is meant to be public.
type ExtensionConfiguration struct {
// EnvoyExtension is the extension that will patch Envoy resources.
EnvoyExtension api.EnvoyExtension
// ServiceName is the name of the service the EnvoyExtension is being applied to. It could be the local service or
// an upstream of the local service.
ServiceName api.CompoundServiceName
// Upstreams will only be configured on the ExtensionConfiguration if the EnvoyExtension is being applied to an
// upstream. If there are no Upstreams, then EnvoyExtension is being applied to the local service's resources.
Upstreams map[api.CompoundServiceName]UpstreamData
// Kind is mode the local Envoy proxy is running in. For now, only connect proxy and
// terminating gateways are supported.
Kind api.ServiceKind
}
// UpstreamData has the SNI, EnvoyID, and OutgoingProxyKind of the upstream services for the local proxy and this data
// is used to choose which Envoy resources to patch.
type UpstreamData struct {
// VIP is the tproxy virtual IP used to reach an upstream service.
VIP string
// SNI is the SNI header used to reach an upstream service.
SNI map[string]struct{}
// EnvoyID is the envoy ID of an upstream service, structured <service> or <partition>/<ns>/<service> when using a
// non-default namespace or partition.
EnvoyID string
// OutgoingProxyKind is the type of proxy of the upstream service. However, if the upstream is "typical" this will
// be set to "connect-proxy" instead.
OutgoingProxyKind api.ServiceKind
}
func (ec ExtensionConfiguration) IsUpstream() bool {
_, ok := ec.Upstreams[ec.ServiceName]
return ok
}
func (ec ExtensionConfiguration) MatchesUpstreamServiceSNI(sni string) bool {
u := ec.Upstreams[ec.ServiceName]
_, match := u.SNI[sni]
return match
}
func (ec ExtensionConfiguration) EnvoyID() string {
u := ec.Upstreams[ec.ServiceName]
return u.EnvoyID
}
func (ec ExtensionConfiguration) OutgoingProxyKind() api.ServiceKind {
u := ec.Upstreams[ec.ServiceName]
return u.OutgoingProxyKind
}
func GetExtensionConfigurations(cfgSnap *proxycfg.ConfigSnapshot) map[api.CompoundServiceName][]ExtensionConfiguration {
extensionsMap := make(map[api.CompoundServiceName][]api.EnvoyExtension)
upstreamMap := make(map[api.CompoundServiceName]UpstreamData)
var kind api.ServiceKind
extensionConfigurationsMap := make(map[api.CompoundServiceName][]ExtensionConfiguration)
trustDomain := ""
if cfgSnap.Roots != nil {
trustDomain = cfgSnap.Roots.TrustDomain
}
switch cfgSnap.Kind {
case structs.ServiceKindConnectProxy:
kind = api.ServiceKindConnectProxy
outgoingKindByService := make(map[api.CompoundServiceName]api.ServiceKind)
vipForService := make(map[api.CompoundServiceName]string)
for uid, upstreamData := range cfgSnap.ConnectProxy.WatchedUpstreamEndpoints {
sn := upstreamIDToCompoundServiceName(uid)
for _, serviceNodes := range upstreamData {
for _, serviceNode := range serviceNodes {
if serviceNode.Service == nil {
continue
}
vip := serviceNode.Service.TaggedAddresses[structs.TaggedAddressVirtualIP].Address
if vip != "" {
if _, ok := vipForService[sn]; !ok {
vipForService[sn] = vip
}
}
// Store the upstream's kind, and for ServiceKindTypical we don't do anything because we'll default
// any unset upstreams to ServiceKindConnectProxy below.
switch serviceNode.Service.Kind {
case structs.ServiceKindTypical:
default:
outgoingKindByService[sn] = api.ServiceKind(serviceNode.Service.Kind)
}
// We only need the kind from one instance, so break once we find it.
break
}
}
}
// TODO(peering): consider PeerUpstreamEndpoints in addition to DiscoveryChain
// These are the discovery chains for upstreams which have the Envoy Extensions applied to the local service.
for uid, dc := range cfgSnap.ConnectProxy.DiscoveryChain {
compoundServiceName := upstreamIDToCompoundServiceName(uid)
extensionsMap[compoundServiceName] = convertEnvoyExtensions(dc.EnvoyExtensions)
meta := uid.EnterpriseMeta
sni := connect.ServiceSNI(uid.Name, "", meta.NamespaceOrDefault(), meta.PartitionOrDefault(), cfgSnap.Datacenter, trustDomain)
outgoingKind, ok := outgoingKindByService[compoundServiceName]
if !ok {
outgoingKind = api.ServiceKindConnectProxy
}
upstreamMap[compoundServiceName] = UpstreamData{
SNI: map[string]struct{}{sni: {}},
VIP: vipForService[compoundServiceName],
EnvoyID: uid.EnvoyID(),
OutgoingProxyKind: outgoingKind,
}
}
// Adds extensions configured for the local service to the ExtensionConfiguration. This only applies to
// connect-proxies because extensions are either global or tied to a specific service, so the terminating
// gateway's Envoy resources for the local service (i.e not to upstreams) would never need to be modified.
localSvc := api.CompoundServiceName{
Name: cfgSnap.Proxy.DestinationServiceName,
Namespace: cfgSnap.ProxyID.NamespaceOrDefault(),
Partition: cfgSnap.ProxyID.PartitionOrEmpty(),
}
extensionConfigurationsMap[localSvc] = []ExtensionConfiguration{}
cfgSnapExts := convertEnvoyExtensions(cfgSnap.Proxy.EnvoyExtensions)
for _, ext := range cfgSnapExts {
extCfg := ExtensionConfiguration{
EnvoyExtension: ext,
ServiceName: localSvc,
// Upstreams is nil to signify this extension is not being applied to an upstream service, but rather to the local service.
Upstreams: nil,
Kind: kind,
}
extensionConfigurationsMap[localSvc] = append(extensionConfigurationsMap[localSvc], extCfg)
}
case structs.ServiceKindTerminatingGateway:
kind = api.ServiceKindTerminatingGateway
for svc, c := range cfgSnap.TerminatingGateway.ServiceConfigs {
compoundServiceName := serviceNameToCompoundServiceName(svc)
extensionsMap[compoundServiceName] = convertEnvoyExtensions(c.EnvoyExtensions)
sni := connect.ServiceSNI(svc.Name, "", svc.NamespaceOrDefault(), svc.PartitionOrDefault(), cfgSnap.Datacenter, trustDomain)
envoyID := proxycfg.NewUpstreamIDFromServiceName(svc)
snis := map[string]struct{}{sni: {}}
resolver, hasResolver := cfgSnap.TerminatingGateway.ServiceResolvers[svc]
if hasResolver {
for subsetName := range resolver.Subsets {
sni := connect.ServiceSNI(svc.Name, subsetName, svc.NamespaceOrDefault(), svc.PartitionOrDefault(), cfgSnap.Datacenter, trustDomain)
snis[sni] = struct{}{}
}
}
upstreamMap[compoundServiceName] = UpstreamData{
SNI: snis,
EnvoyID: envoyID.EnvoyID(),
OutgoingProxyKind: api.ServiceKindTerminatingGateway,
}
}
}
for svc, exts := range extensionsMap {
extensionConfigurationsMap[svc] = []ExtensionConfiguration{}
for _, ext := range exts {
extCfg := ExtensionConfiguration{
EnvoyExtension: ext,
Kind: kind,
ServiceName: svc,
Upstreams: upstreamMap,
}
extensionConfigurationsMap[svc] = append(extensionConfigurationsMap[svc], extCfg)
}
}
return extensionConfigurationsMap
}
func serviceNameToCompoundServiceName(svc structs.ServiceName) api.CompoundServiceName {
return api.CompoundServiceName{
Name: svc.Name,
Partition: svc.PartitionOrDefault(),
Namespace: svc.NamespaceOrDefault(),
}
}
func upstreamIDToCompoundServiceName(uid proxycfg.UpstreamID) api.CompoundServiceName {
return api.CompoundServiceName{
Name: uid.Name,
Partition: uid.PartitionOrDefault(),
Namespace: uid.NamespaceOrDefault(),
}
}
func convertEnvoyExtensions(structExtensions structs.EnvoyExtensions) []api.EnvoyExtension {
return structExtensions.ToAPI()
}

View File

@ -981,7 +981,7 @@ func ServiceDefaultsToStructs(s *ServiceDefaults, t *structs.ServiceConfigEntry)
t.LocalRequestTimeoutMs = int(s.LocalRequestTimeoutMs)
t.BalanceInboundConnections = s.BalanceInboundConnections
{
t.EnvoyExtensions = make([]structs.EnvoyExtension, len(s.EnvoyExtensions))
t.EnvoyExtensions = make(structs.EnvoyExtensions, len(s.EnvoyExtensions))
for i := range s.EnvoyExtensions {
if s.EnvoyExtensions[i] != nil {
EnvoyExtensionToStructs(s.EnvoyExtensions[i], &t.EnvoyExtensions[i])