mirror of
https://github.com/status-im/consul.git
synced 2025-01-24 04:31:12 +00:00
d99312b86e
* add upstream service targeting to property override extension * Also add baseline goldens for service specific property override extension. * Refactor the extension framework to put more logic into the templates. * fix up the golden tests
436 lines
16 KiB
Go
436 lines
16 KiB
Go
// Copyright (c) HashiCorp, Inc.
|
||
// SPDX-License-Identifier: MPL-2.0
|
||
|
||
package wasm
|
||
|
||
import (
|
||
"fmt"
|
||
"net/url"
|
||
"time"
|
||
|
||
envoy_core_v3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
|
||
envoy_wasm_v3 "github.com/envoyproxy/go-control-plane/envoy/extensions/wasm/v3"
|
||
"github.com/hashicorp/consul/acl"
|
||
"github.com/hashicorp/consul/api"
|
||
"github.com/hashicorp/consul/envoyextensions/extensioncommon"
|
||
"github.com/hashicorp/go-multierror"
|
||
"github.com/mitchellh/mapstructure"
|
||
"google.golang.org/protobuf/types/known/anypb"
|
||
"google.golang.org/protobuf/types/known/durationpb"
|
||
"google.golang.org/protobuf/types/known/wrapperspb"
|
||
)
|
||
|
||
// wasmConfig defines the configuration for a Wasm Envoy extension.
|
||
type wasmConfig struct {
|
||
// Protocol is the type of Wasm filter to apply, "tcp" or "http".
|
||
Protocol string
|
||
// ProxyType identifies the type of Envoy proxy that this extension applies to.
|
||
// The extension will only be configured for proxies that match this type and
|
||
// will be ignored for all other proxy types.
|
||
ProxyType api.ServiceKind
|
||
// ListenerType identifies the listener type the filter will be applied to.
|
||
ListenerType string
|
||
// PluginConfig holds the configuration for the Wasm plugin.
|
||
PluginConfig pluginConfig
|
||
}
|
||
|
||
// pluginConfig defines a Wasm plugin configuration.
|
||
type pluginConfig struct {
|
||
// Name is the unique name for the filter in a VM. For use in identifying the
|
||
// filter if multiple filters are handled by the same VmID and RootID.
|
||
// Also used for logging/debugging.
|
||
Name string
|
||
// RootID is a unique ID for a set of filters in a VM which will share a
|
||
// RootContext and Contexts if applicable (e.g. a Wasm HttpFilter and a Wasm AccessLog).
|
||
// All filters with the same RootID and VmID will share Context(s).
|
||
RootID string
|
||
// VmConfig is the configuration for starting or finding the Wasm VM that the
|
||
// filter will run in.
|
||
VmConfig vmConfig
|
||
// Configuration holds the configuration that will be encoded as bytes and passed to
|
||
// the plugin on startup (proxy_on_configure).
|
||
Configuration string
|
||
// CapabilityRestrictionConfiguration controls the Wasm ABI capabilities available
|
||
// to the filter.
|
||
CapabilityRestrictionConfiguration capabilityRestrictionConfiguration
|
||
|
||
// failOpen controls the behavior when a runtime error occurs during filter
|
||
// processing.
|
||
//
|
||
// If set to false runtime errors will result in a failed request.
|
||
// For TCP filters, the connection will be closed. For HTTP filters a 503
|
||
// status is returned.
|
||
//
|
||
// If set to true, a runtime error will result in the filter being bypassed.
|
||
failOpen bool
|
||
}
|
||
|
||
// vmConfig defines a Wasm VM configuration.
|
||
type vmConfig struct {
|
||
// VmID is an ID which will be used along with a hash of the Wasm code to
|
||
// determine which VM will be used for the plugin. All plugins which use
|
||
// the same VmID and code will use the same VM. May be left blank.
|
||
VmID string
|
||
// Runtime is the Wasm runtime type, one of: v8, wasmtime, wamr, or wavm.
|
||
Runtime string
|
||
// Code references the Wasm code that will run in the filter.
|
||
Code dataSource
|
||
// Configuration holds the configuration that will be encoded as bytes and
|
||
// passed to the plugin during VM startup (proxy_on_vm_start).
|
||
Configuration string
|
||
// EnvironmentVariables specifies environment variables to be injected to
|
||
// this VM which will be available through WASI’s environ_get and
|
||
// environ_get_sizes system calls.
|
||
EnvironmentVariables environmentVariables
|
||
}
|
||
|
||
// dataSource defines a local or remote location where Wasm code will be loaded from.
|
||
type dataSource struct {
|
||
// Local supports loading files from a local volume.
|
||
Local localDataSource
|
||
// Remote supports loading files from a remote server.
|
||
Remote remoteDataSource
|
||
}
|
||
|
||
// environmentVariables defines the environment variables that will be made available
|
||
// to the Wasm filter.
|
||
type environmentVariables struct {
|
||
// HostEnvKeys holds the keys of Envoy’s environment variables exposed to this VM.
|
||
// If a key exists in Envoy’s environment variables, then that key-value pair will
|
||
// be injected into the Wasm VM. If a key does not exist, it will be ignored.
|
||
HostEnvKeys []string
|
||
// KeyValues is a list of key-value pairs to be injected to this VM in the form of "KEY=VALUE".
|
||
KeyValues map[string]string
|
||
}
|
||
|
||
// localDataSource defines a file from a local file system.
|
||
type localDataSource struct {
|
||
// Filename is the path to the Wasm file on the local file system.
|
||
Filename string
|
||
}
|
||
|
||
// remoteDataSource defines a file from a remote file server.
|
||
type remoteDataSource struct {
|
||
// HttpURI
|
||
HttpURI httpURI
|
||
// SHA256 of the remote file. Used to validate the remote file.
|
||
SHA256 string
|
||
// RetryPolicy determines how retries are handled.
|
||
RetryPolicy retryPolicy
|
||
}
|
||
|
||
// httpURI defines a remote file using an HTTP URI.
|
||
type httpURI struct {
|
||
// Service is the upstream service the Wasm plugin will be fetched from.
|
||
Service api.CompoundServiceName
|
||
// URI is the location of the Wasm file on the upstream service.
|
||
URI string
|
||
// Timeout sets the maximum duration that a response can take.
|
||
Timeout string
|
||
|
||
timeout time.Duration
|
||
}
|
||
|
||
// retryPolicy defines how to handle retries when fetching remote files.
|
||
type retryPolicy struct {
|
||
// RetryBackOff holds parameters that control retry backoff strategy.
|
||
RetryBackOff retryBackoff
|
||
// NumRetries specifies the allowed number of retries.
|
||
NumRetries int
|
||
}
|
||
|
||
// retryBackoff holds parameters that control retry backoff strategy.
|
||
type retryBackoff struct {
|
||
// BaseInterval is the base interval to be used for the next back off
|
||
// computation. It should be greater than zero and less than or equal
|
||
// to MaxInterval.
|
||
BaseInterval string
|
||
// MaxInterval is the maximum interval between retries.
|
||
MaxInterval string
|
||
|
||
baseInterval time.Duration
|
||
maxInterval time.Duration
|
||
}
|
||
|
||
// capabilityRestrictionConfiguration controls Wasm capabilities available to modules.
|
||
type capabilityRestrictionConfiguration struct {
|
||
// AllowedCapabilities specifies the Wasm capabilities which will be allowed.
|
||
// Capabilities are mapped by name. The value which each capability maps to is
|
||
// currently ignored and should be left empty.
|
||
AllowedCapabilities map[string]any
|
||
}
|
||
|
||
// newWasmConfig creates a filterConfig from the given args.
|
||
// It starts with the default wasm configuration and merges in the config
|
||
// from the given args.
|
||
func newWasmConfig(args map[string]any) (*wasmConfig, error) {
|
||
cfg := &wasmConfig{}
|
||
if err := mapstructure.Decode(args, cfg); err != nil {
|
||
return cfg, err
|
||
}
|
||
cfg.normalize()
|
||
return cfg, nil
|
||
}
|
||
|
||
func (p *pluginConfig) asyncDataSource(rtCfg *extensioncommon.RuntimeConfig) (*envoy_core_v3.AsyncDataSource, error) {
|
||
|
||
// Local data source
|
||
if filename := p.VmConfig.Code.Local.Filename; filename != "" {
|
||
return &envoy_core_v3.AsyncDataSource{
|
||
Specifier: &envoy_core_v3.AsyncDataSource_Local{
|
||
Local: &envoy_core_v3.DataSource{
|
||
Specifier: &envoy_core_v3.DataSource_Filename{
|
||
Filename: filename,
|
||
},
|
||
},
|
||
},
|
||
}, nil
|
||
}
|
||
|
||
// Remote data source
|
||
// For a remote file, ensure there is an upstream cluster for the host specified in the URL.
|
||
// Envoy requires an explicit cluster in order to perform the DNS lookup required to actually
|
||
// fetch the data from the upstream source.
|
||
remote := &p.VmConfig.Code.Remote
|
||
clusterSNI := ""
|
||
for service, upstream := range rtCfg.Upstreams {
|
||
if service == remote.HttpURI.Service {
|
||
for sni := range upstream.SNIs {
|
||
clusterSNI = sni
|
||
break
|
||
}
|
||
}
|
||
}
|
||
if clusterSNI == "" {
|
||
return nil, fmt.Errorf("no upstream found for remote service %q", remote.HttpURI.Service.Name)
|
||
}
|
||
|
||
d := time.Second
|
||
if remote.HttpURI.timeout > 0 {
|
||
d = remote.HttpURI.timeout
|
||
}
|
||
timeout := &durationpb.Duration{Seconds: int64(d.Seconds())}
|
||
|
||
return &envoy_core_v3.AsyncDataSource{
|
||
Specifier: &envoy_core_v3.AsyncDataSource_Remote{
|
||
Remote: &envoy_core_v3.RemoteDataSource{
|
||
Sha256: remote.SHA256,
|
||
HttpUri: &envoy_core_v3.HttpUri{
|
||
Uri: remote.HttpURI.URI,
|
||
HttpUpstreamType: &envoy_core_v3.HttpUri_Cluster{
|
||
Cluster: clusterSNI,
|
||
},
|
||
Timeout: timeout,
|
||
},
|
||
RetryPolicy: p.retryPolicy(),
|
||
},
|
||
},
|
||
}, nil
|
||
}
|
||
|
||
func (p *pluginConfig) capConfig() *envoy_wasm_v3.CapabilityRestrictionConfig {
|
||
if len(p.CapabilityRestrictionConfiguration.AllowedCapabilities) == 0 {
|
||
return nil
|
||
}
|
||
|
||
allowedCaps := make(map[string]*envoy_wasm_v3.SanitizationConfig)
|
||
for key := range p.CapabilityRestrictionConfiguration.AllowedCapabilities {
|
||
allowedCaps[key] = &envoy_wasm_v3.SanitizationConfig{}
|
||
}
|
||
|
||
return &envoy_wasm_v3.CapabilityRestrictionConfig{
|
||
AllowedCapabilities: allowedCaps,
|
||
}
|
||
}
|
||
|
||
func (p *pluginConfig) envoyPluginConfig(rtCfg *extensioncommon.RuntimeConfig) (*envoy_wasm_v3.PluginConfig, error) {
|
||
var err error
|
||
var pluginCfgData, vmCfgData *anypb.Any
|
||
|
||
if p.Configuration != "" {
|
||
pluginCfgData, err = anypb.New(wrapperspb.String(p.Configuration))
|
||
if err != nil {
|
||
return nil, fmt.Errorf("failed to encode Wasm plugin configuration: %w", err)
|
||
}
|
||
}
|
||
|
||
if p.VmConfig.Configuration != "" {
|
||
vmCfgData, err = anypb.New(wrapperspb.String(p.VmConfig.Configuration))
|
||
if err != nil {
|
||
return nil, fmt.Errorf("failed to encode Wasm VM configuration: %w", err)
|
||
}
|
||
}
|
||
|
||
code, err := p.asyncDataSource(rtCfg)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("failed to encode async data source configuration: %w", err)
|
||
}
|
||
|
||
var envVars *envoy_wasm_v3.EnvironmentVariables
|
||
if len(p.VmConfig.EnvironmentVariables.HostEnvKeys) > 0 ||
|
||
len(p.VmConfig.EnvironmentVariables.KeyValues) > 0 {
|
||
envVars = &envoy_wasm_v3.EnvironmentVariables{
|
||
HostEnvKeys: p.VmConfig.EnvironmentVariables.HostEnvKeys,
|
||
KeyValues: p.VmConfig.EnvironmentVariables.KeyValues,
|
||
}
|
||
}
|
||
|
||
return &envoy_wasm_v3.PluginConfig{
|
||
Name: p.Name,
|
||
RootId: p.RootID,
|
||
Vm: &envoy_wasm_v3.PluginConfig_VmConfig{
|
||
VmConfig: &envoy_wasm_v3.VmConfig{
|
||
VmId: p.VmConfig.VmID,
|
||
Runtime: fmt.Sprintf("envoy.wasm.runtime.%s", p.VmConfig.Runtime),
|
||
Code: code,
|
||
Configuration: vmCfgData,
|
||
EnvironmentVariables: envVars,
|
||
},
|
||
},
|
||
Configuration: pluginCfgData,
|
||
CapabilityRestrictionConfig: p.capConfig(),
|
||
FailOpen: p.failOpen,
|
||
}, nil
|
||
}
|
||
|
||
func (p *pluginConfig) retryPolicy() *envoy_core_v3.RetryPolicy {
|
||
remote := &p.VmConfig.Code.Remote
|
||
if remote.RetryPolicy.NumRetries <= 0 &&
|
||
remote.RetryPolicy.RetryBackOff.BaseInterval == "" &&
|
||
remote.RetryPolicy.RetryBackOff.MaxInterval == "" {
|
||
return nil
|
||
}
|
||
|
||
retryPolicy := &envoy_core_v3.RetryPolicy{}
|
||
|
||
if remote.RetryPolicy.NumRetries > 0 {
|
||
retryPolicy.NumRetries = wrapperspb.UInt32(uint32(remote.RetryPolicy.NumRetries))
|
||
}
|
||
|
||
var baseInterval, maxInterval *durationpb.Duration
|
||
if remote.RetryPolicy.RetryBackOff.baseInterval > 0 {
|
||
baseInterval = &durationpb.Duration{Seconds: int64(remote.RetryPolicy.RetryBackOff.baseInterval.Seconds())}
|
||
}
|
||
if remote.RetryPolicy.RetryBackOff.maxInterval > 0 {
|
||
maxInterval = &durationpb.Duration{Seconds: int64(remote.RetryPolicy.RetryBackOff.maxInterval.Seconds())}
|
||
}
|
||
|
||
if baseInterval != nil || maxInterval != nil {
|
||
retryPolicy.RetryBackOff = &envoy_core_v3.BackoffStrategy{
|
||
BaseInterval: baseInterval,
|
||
MaxInterval: maxInterval,
|
||
}
|
||
}
|
||
|
||
return retryPolicy
|
||
}
|
||
|
||
func (w *wasmConfig) normalize() {
|
||
if w.ProxyType == "" {
|
||
w.ProxyType = api.ServiceKindConnectProxy
|
||
}
|
||
|
||
if w.PluginConfig.VmConfig.Runtime == "" {
|
||
w.PluginConfig.VmConfig.Runtime = supportedRuntimes[0]
|
||
}
|
||
|
||
httpURI := &w.PluginConfig.VmConfig.Code.Remote.HttpURI
|
||
httpURI.Service.Namespace = acl.NamespaceOrDefault(httpURI.Service.Namespace)
|
||
httpURI.Service.Partition = acl.PartitionOrDefault(httpURI.Service.Partition)
|
||
if httpURI.timeout <= 0 {
|
||
httpURI.timeout = time.Second
|
||
}
|
||
}
|
||
|
||
// validate ensures the filterConfig is valid or it returns an error.
|
||
// This method must be called before using the configuration.
|
||
func (w *wasmConfig) validate() error {
|
||
var err, resultErr error
|
||
if w.Protocol != "tcp" && w.Protocol != "http" {
|
||
resultErr = multierror.Append(resultErr, fmt.Errorf(`unsupported Protocol %q, expected "tcp" or "http"`, w.Protocol))
|
||
}
|
||
if w.ProxyType != api.ServiceKindConnectProxy {
|
||
resultErr = multierror.Append(resultErr, fmt.Errorf("unsupported ProxyType %q, only %q is supported", w.ProxyType, api.ServiceKindConnectProxy))
|
||
}
|
||
if w.ListenerType != "inbound" && w.ListenerType != "outbound" {
|
||
resultErr = multierror.Append(resultErr, fmt.Errorf(`unsupported ListenerType %q, expected "inbound" or "outbound"`, w.ListenerType))
|
||
}
|
||
if err = validateRuntime(w.PluginConfig.VmConfig.Runtime); err != nil {
|
||
resultErr = multierror.Append(resultErr, err)
|
||
}
|
||
|
||
httpURI := &w.PluginConfig.VmConfig.Code.Remote.HttpURI
|
||
isLocal := w.PluginConfig.VmConfig.Code.Local.Filename != ""
|
||
isRemote := httpURI.Service.Name != "" || httpURI.URI != ""
|
||
if isLocal == isRemote {
|
||
resultErr = multierror.Append(resultErr, fmt.Errorf("VmConfig.Code must provide exactly one of Local or Remote data source"))
|
||
}
|
||
|
||
// If the data source is Local then validation is complete.
|
||
if isLocal {
|
||
return resultErr
|
||
}
|
||
|
||
// Validate the remote data source fields.
|
||
// Both Service and URI are required inputs for remote data sources.
|
||
// We could catch this above in the isRemote check; however, we do an explicit check
|
||
// here for UX to give the user extra feedback in case they only provide one or the other.
|
||
if httpURI.Service.Name == "" || httpURI.URI == "" {
|
||
resultErr = multierror.Append(resultErr, fmt.Errorf("both Service and URI are required for Remote data sources"))
|
||
}
|
||
if w.PluginConfig.VmConfig.Code.Remote.SHA256 == "" {
|
||
resultErr = multierror.Append(resultErr, fmt.Errorf("SHA256 checksum is required for Remote data sources"))
|
||
}
|
||
if _, err := url.Parse(httpURI.URI); err != nil {
|
||
resultErr = multierror.Append(resultErr, fmt.Errorf("invalid HttpURI.URI: %w", err))
|
||
}
|
||
if httpURI.Timeout != "" {
|
||
httpURI.timeout, err = time.ParseDuration(httpURI.Timeout)
|
||
if err != nil {
|
||
resultErr = multierror.Append(resultErr, fmt.Errorf("failed to parse HttpURI.Timeout %q as a duration: %w", httpURI.Timeout, err))
|
||
}
|
||
}
|
||
|
||
retryPolicy := &w.PluginConfig.VmConfig.Code.Remote.RetryPolicy
|
||
if retryPolicy.NumRetries < 0 {
|
||
resultErr = multierror.Append(resultErr, fmt.Errorf("RetryPolicy.NumRetries must be greater than or equal to 0"))
|
||
}
|
||
|
||
if retryPolicy.RetryBackOff.BaseInterval != "" {
|
||
retryPolicy.RetryBackOff.baseInterval, err = time.ParseDuration(retryPolicy.RetryBackOff.BaseInterval)
|
||
if err != nil {
|
||
resultErr = multierror.Append(resultErr, fmt.Errorf("failed to parse RetryBackOff.BaseInterval %q: %w", retryPolicy.RetryBackOff.BaseInterval, err))
|
||
}
|
||
}
|
||
if retryPolicy.RetryBackOff.MaxInterval != "" {
|
||
retryPolicy.RetryBackOff.maxInterval, err = time.ParseDuration(retryPolicy.RetryBackOff.MaxInterval)
|
||
if err != nil {
|
||
resultErr = multierror.Append(resultErr, fmt.Errorf("failed to parse RetryBackOff.MaxInterval %q: %w", retryPolicy.RetryBackOff.MaxInterval, err))
|
||
}
|
||
}
|
||
|
||
if retryPolicy.RetryBackOff.BaseInterval != "" && retryPolicy.RetryBackOff.baseInterval <= 0 {
|
||
resultErr = multierror.Append(resultErr, fmt.Errorf("RetryBackOff.BaseInterval %q must be greater than zero and less than or equal to RetryBackOff.MaxInterval %q",
|
||
retryPolicy.RetryBackOff.BaseInterval,
|
||
retryPolicy.RetryBackOff.MaxInterval),
|
||
)
|
||
}
|
||
if retryPolicy.RetryBackOff.MaxInterval != "" &&
|
||
retryPolicy.RetryBackOff.maxInterval < retryPolicy.RetryBackOff.baseInterval {
|
||
resultErr = multierror.Append(resultErr, fmt.Errorf("RetryBackOff.MaxInterval %q must be greater than or equal to RetryBackOff.BaseInterval %q",
|
||
retryPolicy.RetryBackOff.MaxInterval,
|
||
retryPolicy.RetryBackOff.BaseInterval),
|
||
)
|
||
}
|
||
return resultErr
|
||
}
|
||
|
||
func validateRuntime(s string) error {
|
||
for _, rt := range supportedRuntimes {
|
||
if s == rt {
|
||
return nil
|
||
}
|
||
}
|
||
return fmt.Errorf("unsupported runtime %q", s)
|
||
}
|