Eric Haberkorn d99312b86e
Add Upstream Service Targeting to Property Override Extension (#17517)
* add upstream service targeting to property override extension

* Also add baseline goldens for service specific property override extension.
* Refactor the extension framework to put more logic into the templates.

* fix up the golden tests
2023-05-30 14:53:42 -04:00

436 lines
16 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
package wasm
import (
"fmt"
"net/url"
"time"
envoy_core_v3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
envoy_wasm_v3 "github.com/envoyproxy/go-control-plane/envoy/extensions/wasm/v3"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/envoyextensions/extensioncommon"
"github.com/hashicorp/go-multierror"
"github.com/mitchellh/mapstructure"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/wrapperspb"
)
// wasmConfig defines the configuration for a Wasm Envoy extension.
type wasmConfig struct {
// Protocol is the type of Wasm filter to apply, "tcp" or "http".
Protocol string
// ProxyType identifies the type of Envoy proxy that this extension applies to.
// The extension will only be configured for proxies that match this type and
// will be ignored for all other proxy types.
ProxyType api.ServiceKind
// ListenerType identifies the listener type the filter will be applied to.
ListenerType string
// PluginConfig holds the configuration for the Wasm plugin.
PluginConfig pluginConfig
}
// pluginConfig defines a Wasm plugin configuration.
type pluginConfig struct {
// Name is the unique name for the filter in a VM. For use in identifying the
// filter if multiple filters are handled by the same VmID and RootID.
// Also used for logging/debugging.
Name string
// RootID is a unique ID for a set of filters in a VM which will share a
// RootContext and Contexts if applicable (e.g. a Wasm HttpFilter and a Wasm AccessLog).
// All filters with the same RootID and VmID will share Context(s).
RootID string
// VmConfig is the configuration for starting or finding the Wasm VM that the
// filter will run in.
VmConfig vmConfig
// Configuration holds the configuration that will be encoded as bytes and passed to
// the plugin on startup (proxy_on_configure).
Configuration string
// CapabilityRestrictionConfiguration controls the Wasm ABI capabilities available
// to the filter.
CapabilityRestrictionConfiguration capabilityRestrictionConfiguration
// failOpen controls the behavior when a runtime error occurs during filter
// processing.
//
// If set to false runtime errors will result in a failed request.
// For TCP filters, the connection will be closed. For HTTP filters a 503
// status is returned.
//
// If set to true, a runtime error will result in the filter being bypassed.
failOpen bool
}
// vmConfig defines a Wasm VM configuration.
type vmConfig struct {
// VmID is an ID which will be used along with a hash of the Wasm code to
// determine which VM will be used for the plugin. All plugins which use
// the same VmID and code will use the same VM. May be left blank.
VmID string
// Runtime is the Wasm runtime type, one of: v8, wasmtime, wamr, or wavm.
Runtime string
// Code references the Wasm code that will run in the filter.
Code dataSource
// Configuration holds the configuration that will be encoded as bytes and
// passed to the plugin during VM startup (proxy_on_vm_start).
Configuration string
// EnvironmentVariables specifies environment variables to be injected to
// this VM which will be available through WASIs environ_get and
// environ_get_sizes system calls.
EnvironmentVariables environmentVariables
}
// dataSource defines a local or remote location where Wasm code will be loaded from.
type dataSource struct {
// Local supports loading files from a local volume.
Local localDataSource
// Remote supports loading files from a remote server.
Remote remoteDataSource
}
// environmentVariables defines the environment variables that will be made available
// to the Wasm filter.
type environmentVariables struct {
// HostEnvKeys holds the keys of Envoys environment variables exposed to this VM.
// If a key exists in Envoys environment variables, then that key-value pair will
// be injected into the Wasm VM. If a key does not exist, it will be ignored.
HostEnvKeys []string
// KeyValues is a list of key-value pairs to be injected to this VM in the form of "KEY=VALUE".
KeyValues map[string]string
}
// localDataSource defines a file from a local file system.
type localDataSource struct {
// Filename is the path to the Wasm file on the local file system.
Filename string
}
// remoteDataSource defines a file from a remote file server.
type remoteDataSource struct {
// HttpURI
HttpURI httpURI
// SHA256 of the remote file. Used to validate the remote file.
SHA256 string
// RetryPolicy determines how retries are handled.
RetryPolicy retryPolicy
}
// httpURI defines a remote file using an HTTP URI.
type httpURI struct {
// Service is the upstream service the Wasm plugin will be fetched from.
Service api.CompoundServiceName
// URI is the location of the Wasm file on the upstream service.
URI string
// Timeout sets the maximum duration that a response can take.
Timeout string
timeout time.Duration
}
// retryPolicy defines how to handle retries when fetching remote files.
type retryPolicy struct {
// RetryBackOff holds parameters that control retry backoff strategy.
RetryBackOff retryBackoff
// NumRetries specifies the allowed number of retries.
NumRetries int
}
// retryBackoff holds parameters that control retry backoff strategy.
type retryBackoff struct {
// BaseInterval is the base interval to be used for the next back off
// computation. It should be greater than zero and less than or equal
// to MaxInterval.
BaseInterval string
// MaxInterval is the maximum interval between retries.
MaxInterval string
baseInterval time.Duration
maxInterval time.Duration
}
// capabilityRestrictionConfiguration controls Wasm capabilities available to modules.
type capabilityRestrictionConfiguration struct {
// AllowedCapabilities specifies the Wasm capabilities which will be allowed.
// Capabilities are mapped by name. The value which each capability maps to is
// currently ignored and should be left empty.
AllowedCapabilities map[string]any
}
// newWasmConfig creates a filterConfig from the given args.
// It starts with the default wasm configuration and merges in the config
// from the given args.
func newWasmConfig(args map[string]any) (*wasmConfig, error) {
cfg := &wasmConfig{}
if err := mapstructure.Decode(args, cfg); err != nil {
return cfg, err
}
cfg.normalize()
return cfg, nil
}
func (p *pluginConfig) asyncDataSource(rtCfg *extensioncommon.RuntimeConfig) (*envoy_core_v3.AsyncDataSource, error) {
// Local data source
if filename := p.VmConfig.Code.Local.Filename; filename != "" {
return &envoy_core_v3.AsyncDataSource{
Specifier: &envoy_core_v3.AsyncDataSource_Local{
Local: &envoy_core_v3.DataSource{
Specifier: &envoy_core_v3.DataSource_Filename{
Filename: filename,
},
},
},
}, nil
}
// Remote data source
// For a remote file, ensure there is an upstream cluster for the host specified in the URL.
// Envoy requires an explicit cluster in order to perform the DNS lookup required to actually
// fetch the data from the upstream source.
remote := &p.VmConfig.Code.Remote
clusterSNI := ""
for service, upstream := range rtCfg.Upstreams {
if service == remote.HttpURI.Service {
for sni := range upstream.SNIs {
clusterSNI = sni
break
}
}
}
if clusterSNI == "" {
return nil, fmt.Errorf("no upstream found for remote service %q", remote.HttpURI.Service.Name)
}
d := time.Second
if remote.HttpURI.timeout > 0 {
d = remote.HttpURI.timeout
}
timeout := &durationpb.Duration{Seconds: int64(d.Seconds())}
return &envoy_core_v3.AsyncDataSource{
Specifier: &envoy_core_v3.AsyncDataSource_Remote{
Remote: &envoy_core_v3.RemoteDataSource{
Sha256: remote.SHA256,
HttpUri: &envoy_core_v3.HttpUri{
Uri: remote.HttpURI.URI,
HttpUpstreamType: &envoy_core_v3.HttpUri_Cluster{
Cluster: clusterSNI,
},
Timeout: timeout,
},
RetryPolicy: p.retryPolicy(),
},
},
}, nil
}
func (p *pluginConfig) capConfig() *envoy_wasm_v3.CapabilityRestrictionConfig {
if len(p.CapabilityRestrictionConfiguration.AllowedCapabilities) == 0 {
return nil
}
allowedCaps := make(map[string]*envoy_wasm_v3.SanitizationConfig)
for key := range p.CapabilityRestrictionConfiguration.AllowedCapabilities {
allowedCaps[key] = &envoy_wasm_v3.SanitizationConfig{}
}
return &envoy_wasm_v3.CapabilityRestrictionConfig{
AllowedCapabilities: allowedCaps,
}
}
func (p *pluginConfig) envoyPluginConfig(rtCfg *extensioncommon.RuntimeConfig) (*envoy_wasm_v3.PluginConfig, error) {
var err error
var pluginCfgData, vmCfgData *anypb.Any
if p.Configuration != "" {
pluginCfgData, err = anypb.New(wrapperspb.String(p.Configuration))
if err != nil {
return nil, fmt.Errorf("failed to encode Wasm plugin configuration: %w", err)
}
}
if p.VmConfig.Configuration != "" {
vmCfgData, err = anypb.New(wrapperspb.String(p.VmConfig.Configuration))
if err != nil {
return nil, fmt.Errorf("failed to encode Wasm VM configuration: %w", err)
}
}
code, err := p.asyncDataSource(rtCfg)
if err != nil {
return nil, fmt.Errorf("failed to encode async data source configuration: %w", err)
}
var envVars *envoy_wasm_v3.EnvironmentVariables
if len(p.VmConfig.EnvironmentVariables.HostEnvKeys) > 0 ||
len(p.VmConfig.EnvironmentVariables.KeyValues) > 0 {
envVars = &envoy_wasm_v3.EnvironmentVariables{
HostEnvKeys: p.VmConfig.EnvironmentVariables.HostEnvKeys,
KeyValues: p.VmConfig.EnvironmentVariables.KeyValues,
}
}
return &envoy_wasm_v3.PluginConfig{
Name: p.Name,
RootId: p.RootID,
Vm: &envoy_wasm_v3.PluginConfig_VmConfig{
VmConfig: &envoy_wasm_v3.VmConfig{
VmId: p.VmConfig.VmID,
Runtime: fmt.Sprintf("envoy.wasm.runtime.%s", p.VmConfig.Runtime),
Code: code,
Configuration: vmCfgData,
EnvironmentVariables: envVars,
},
},
Configuration: pluginCfgData,
CapabilityRestrictionConfig: p.capConfig(),
FailOpen: p.failOpen,
}, nil
}
func (p *pluginConfig) retryPolicy() *envoy_core_v3.RetryPolicy {
remote := &p.VmConfig.Code.Remote
if remote.RetryPolicy.NumRetries <= 0 &&
remote.RetryPolicy.RetryBackOff.BaseInterval == "" &&
remote.RetryPolicy.RetryBackOff.MaxInterval == "" {
return nil
}
retryPolicy := &envoy_core_v3.RetryPolicy{}
if remote.RetryPolicy.NumRetries > 0 {
retryPolicy.NumRetries = wrapperspb.UInt32(uint32(remote.RetryPolicy.NumRetries))
}
var baseInterval, maxInterval *durationpb.Duration
if remote.RetryPolicy.RetryBackOff.baseInterval > 0 {
baseInterval = &durationpb.Duration{Seconds: int64(remote.RetryPolicy.RetryBackOff.baseInterval.Seconds())}
}
if remote.RetryPolicy.RetryBackOff.maxInterval > 0 {
maxInterval = &durationpb.Duration{Seconds: int64(remote.RetryPolicy.RetryBackOff.maxInterval.Seconds())}
}
if baseInterval != nil || maxInterval != nil {
retryPolicy.RetryBackOff = &envoy_core_v3.BackoffStrategy{
BaseInterval: baseInterval,
MaxInterval: maxInterval,
}
}
return retryPolicy
}
func (w *wasmConfig) normalize() {
if w.ProxyType == "" {
w.ProxyType = api.ServiceKindConnectProxy
}
if w.PluginConfig.VmConfig.Runtime == "" {
w.PluginConfig.VmConfig.Runtime = supportedRuntimes[0]
}
httpURI := &w.PluginConfig.VmConfig.Code.Remote.HttpURI
httpURI.Service.Namespace = acl.NamespaceOrDefault(httpURI.Service.Namespace)
httpURI.Service.Partition = acl.PartitionOrDefault(httpURI.Service.Partition)
if httpURI.timeout <= 0 {
httpURI.timeout = time.Second
}
}
// validate ensures the filterConfig is valid or it returns an error.
// This method must be called before using the configuration.
func (w *wasmConfig) validate() error {
var err, resultErr error
if w.Protocol != "tcp" && w.Protocol != "http" {
resultErr = multierror.Append(resultErr, fmt.Errorf(`unsupported Protocol %q, expected "tcp" or "http"`, w.Protocol))
}
if w.ProxyType != api.ServiceKindConnectProxy {
resultErr = multierror.Append(resultErr, fmt.Errorf("unsupported ProxyType %q, only %q is supported", w.ProxyType, api.ServiceKindConnectProxy))
}
if w.ListenerType != "inbound" && w.ListenerType != "outbound" {
resultErr = multierror.Append(resultErr, fmt.Errorf(`unsupported ListenerType %q, expected "inbound" or "outbound"`, w.ListenerType))
}
if err = validateRuntime(w.PluginConfig.VmConfig.Runtime); err != nil {
resultErr = multierror.Append(resultErr, err)
}
httpURI := &w.PluginConfig.VmConfig.Code.Remote.HttpURI
isLocal := w.PluginConfig.VmConfig.Code.Local.Filename != ""
isRemote := httpURI.Service.Name != "" || httpURI.URI != ""
if isLocal == isRemote {
resultErr = multierror.Append(resultErr, fmt.Errorf("VmConfig.Code must provide exactly one of Local or Remote data source"))
}
// If the data source is Local then validation is complete.
if isLocal {
return resultErr
}
// Validate the remote data source fields.
// Both Service and URI are required inputs for remote data sources.
// We could catch this above in the isRemote check; however, we do an explicit check
// here for UX to give the user extra feedback in case they only provide one or the other.
if httpURI.Service.Name == "" || httpURI.URI == "" {
resultErr = multierror.Append(resultErr, fmt.Errorf("both Service and URI are required for Remote data sources"))
}
if w.PluginConfig.VmConfig.Code.Remote.SHA256 == "" {
resultErr = multierror.Append(resultErr, fmt.Errorf("SHA256 checksum is required for Remote data sources"))
}
if _, err := url.Parse(httpURI.URI); err != nil {
resultErr = multierror.Append(resultErr, fmt.Errorf("invalid HttpURI.URI: %w", err))
}
if httpURI.Timeout != "" {
httpURI.timeout, err = time.ParseDuration(httpURI.Timeout)
if err != nil {
resultErr = multierror.Append(resultErr, fmt.Errorf("failed to parse HttpURI.Timeout %q as a duration: %w", httpURI.Timeout, err))
}
}
retryPolicy := &w.PluginConfig.VmConfig.Code.Remote.RetryPolicy
if retryPolicy.NumRetries < 0 {
resultErr = multierror.Append(resultErr, fmt.Errorf("RetryPolicy.NumRetries must be greater than or equal to 0"))
}
if retryPolicy.RetryBackOff.BaseInterval != "" {
retryPolicy.RetryBackOff.baseInterval, err = time.ParseDuration(retryPolicy.RetryBackOff.BaseInterval)
if err != nil {
resultErr = multierror.Append(resultErr, fmt.Errorf("failed to parse RetryBackOff.BaseInterval %q: %w", retryPolicy.RetryBackOff.BaseInterval, err))
}
}
if retryPolicy.RetryBackOff.MaxInterval != "" {
retryPolicy.RetryBackOff.maxInterval, err = time.ParseDuration(retryPolicy.RetryBackOff.MaxInterval)
if err != nil {
resultErr = multierror.Append(resultErr, fmt.Errorf("failed to parse RetryBackOff.MaxInterval %q: %w", retryPolicy.RetryBackOff.MaxInterval, err))
}
}
if retryPolicy.RetryBackOff.BaseInterval != "" && retryPolicy.RetryBackOff.baseInterval <= 0 {
resultErr = multierror.Append(resultErr, fmt.Errorf("RetryBackOff.BaseInterval %q must be greater than zero and less than or equal to RetryBackOff.MaxInterval %q",
retryPolicy.RetryBackOff.BaseInterval,
retryPolicy.RetryBackOff.MaxInterval),
)
}
if retryPolicy.RetryBackOff.MaxInterval != "" &&
retryPolicy.RetryBackOff.maxInterval < retryPolicy.RetryBackOff.baseInterval {
resultErr = multierror.Append(resultErr, fmt.Errorf("RetryBackOff.MaxInterval %q must be greater than or equal to RetryBackOff.BaseInterval %q",
retryPolicy.RetryBackOff.MaxInterval,
retryPolicy.RetryBackOff.BaseInterval),
)
}
return resultErr
}
func validateRuntime(s string) error {
for _, rt := range supportedRuntimes {
if s == rt {
return nil
}
}
return fmt.Errorf("unsupported runtime %q", s)
}