mirror of https://github.com/status-im/consul.git
Envoy config cluster (#5308)
* Start adding tests for cluster override * Refactor tests for clusters * Passing tests for custom upstream cluster override * Added capability to customise local app cluster * Rename config for local cluster override
This commit is contained in:
parent
aa338f7d86
commit
99fe9dabce
|
@ -1,15 +1,20 @@
|
|||
package xds
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
envoy "github.com/envoyproxy/go-control-plane/envoy/api/v2"
|
||||
envoyauth "github.com/envoyproxy/go-control-plane/envoy/api/v2/auth"
|
||||
envoycore "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
|
||||
"github.com/gogo/protobuf/jsonpb"
|
||||
"github.com/gogo/protobuf/proto"
|
||||
"github.com/gogo/protobuf/types"
|
||||
|
||||
"github.com/hashicorp/consul/agent/proxycfg"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
)
|
||||
|
||||
// clustersFromSnapshot returns the xDS API representation of the "clusters"
|
||||
|
@ -21,59 +26,146 @@ func clustersFromSnapshot(cfgSnap *proxycfg.ConfigSnapshot, token string) ([]pro
|
|||
// Include the "app" cluster for the public listener
|
||||
clusters := make([]proto.Message, len(cfgSnap.Proxy.Upstreams)+1)
|
||||
|
||||
clusters[0] = makeAppCluster(cfgSnap)
|
||||
var err error
|
||||
clusters[0], err = makeAppCluster(cfgSnap)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for idx, upstream := range cfgSnap.Proxy.Upstreams {
|
||||
clusters[idx+1] = makeUpstreamCluster(upstream.Identifier(), cfgSnap)
|
||||
clusters[idx+1], err = makeUpstreamCluster(upstream, cfgSnap)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
return clusters, nil
|
||||
}
|
||||
|
||||
func makeAppCluster(cfgSnap *proxycfg.ConfigSnapshot) *envoy.Cluster {
|
||||
addr := cfgSnap.Proxy.LocalServiceAddress
|
||||
if addr == "" {
|
||||
addr = "127.0.0.1"
|
||||
func makeAppCluster(cfgSnap *proxycfg.ConfigSnapshot) (*envoy.Cluster, error) {
|
||||
var c *envoy.Cluster
|
||||
var err error
|
||||
|
||||
// If we have overriden local cluster config try to parse it into an Envoy cluster
|
||||
if clusterJSONRaw, ok := cfgSnap.Proxy.Config["envoy_local_cluster_json"]; ok {
|
||||
if clusterJSON, ok := clusterJSONRaw.(string); ok {
|
||||
c, err = makeClusterFromUserConfig(clusterJSON)
|
||||
if err != nil {
|
||||
return c, err
|
||||
}
|
||||
}
|
||||
}
|
||||
return &envoy.Cluster{
|
||||
Name: LocalAppClusterName,
|
||||
// TODO(banks): make this configurable from the proxy config
|
||||
ConnectTimeout: 5 * time.Second,
|
||||
Type: envoy.Cluster_STATIC,
|
||||
// API v2 docs say hosts is deprecated and should use LoadAssignment as
|
||||
// below.. but it doesn't work for tcp_proxy target for some reason.
|
||||
Hosts: []*envoycore.Address{makeAddressPtr(addr, cfgSnap.Proxy.LocalServicePort)},
|
||||
// LoadAssignment: &envoy.ClusterLoadAssignment{
|
||||
// ClusterName: LocalAppClusterName,
|
||||
// Endpoints: []endpoint.LocalityLbEndpoints{
|
||||
// {
|
||||
// LbEndpoints: []endpoint.LbEndpoint{
|
||||
// makeEndpoint(LocalAppClusterName,
|
||||
// addr,
|
||||
// cfgSnap.Proxy.LocalServicePort),
|
||||
// },
|
||||
// },
|
||||
// },
|
||||
// },
|
||||
|
||||
if c == nil {
|
||||
addr := cfgSnap.Proxy.LocalServiceAddress
|
||||
if addr == "" {
|
||||
addr = "127.0.0.1"
|
||||
}
|
||||
c = &envoy.Cluster{
|
||||
Name: LocalAppClusterName,
|
||||
ConnectTimeout: 5 * time.Second,
|
||||
Type: envoy.Cluster_STATIC,
|
||||
// API v2 docs say hosts is deprecated and should use LoadAssignment as
|
||||
// below.. but it doesn't work for tcp_proxy target for some reason.
|
||||
Hosts: []*envoycore.Address{makeAddressPtr(addr, cfgSnap.Proxy.LocalServicePort)},
|
||||
// LoadAssignment: &envoy.ClusterLoadAssignment{
|
||||
// ClusterName: LocalAppClusterName,
|
||||
// Endpoints: []endpoint.LocalityLbEndpoints{
|
||||
// {
|
||||
// LbEndpoints: []endpoint.LbEndpoint{
|
||||
// makeEndpoint(LocalAppClusterName,
|
||||
// addr,
|
||||
// cfgSnap.Proxy.LocalServicePort),
|
||||
// },
|
||||
// },
|
||||
// },
|
||||
// },
|
||||
}
|
||||
}
|
||||
|
||||
return c, err
|
||||
}
|
||||
|
||||
func makeUpstreamCluster(name string, cfgSnap *proxycfg.ConfigSnapshot) *envoy.Cluster {
|
||||
return &envoy.Cluster{
|
||||
Name: name,
|
||||
// TODO(banks): make this configurable from the upstream config
|
||||
ConnectTimeout: 5 * time.Second,
|
||||
Type: envoy.Cluster_EDS,
|
||||
EdsClusterConfig: &envoy.Cluster_EdsClusterConfig{
|
||||
EdsConfig: &envoycore.ConfigSource{
|
||||
ConfigSourceSpecifier: &envoycore.ConfigSource_Ads{
|
||||
Ads: &envoycore.AggregatedConfigSource{},
|
||||
func makeUpstreamCluster(upstream structs.Upstream, cfgSnap *proxycfg.ConfigSnapshot) (*envoy.Cluster, error) {
|
||||
var c *envoy.Cluster
|
||||
var err error
|
||||
|
||||
// If we have overriden cluster config attempt to parse it into an Envoy cluster
|
||||
if clusterJSONRaw, ok := upstream.Config["envoy_cluster_json"]; ok {
|
||||
if clusterJSON, ok := clusterJSONRaw.(string); ok {
|
||||
c, err = makeClusterFromUserConfig(clusterJSON)
|
||||
if err != nil {
|
||||
return c, err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if c == nil {
|
||||
c = &envoy.Cluster{
|
||||
Name: upstream.Identifier(),
|
||||
ConnectTimeout: 5 * time.Second,
|
||||
Type: envoy.Cluster_EDS,
|
||||
EdsClusterConfig: &envoy.Cluster_EdsClusterConfig{
|
||||
EdsConfig: &envoycore.ConfigSource{
|
||||
ConfigSourceSpecifier: &envoycore.ConfigSource_Ads{
|
||||
Ads: &envoycore.AggregatedConfigSource{},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
// Enable TLS upstream with the configured client certificate.
|
||||
TlsContext: &envoyauth.UpstreamTlsContext{
|
||||
CommonTlsContext: makeCommonTLSContext(cfgSnap),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// Enable TLS upstream with the configured client certificate.
|
||||
c.TlsContext = &envoyauth.UpstreamTlsContext{
|
||||
CommonTlsContext: makeCommonTLSContext(cfgSnap),
|
||||
}
|
||||
|
||||
return c, nil
|
||||
}
|
||||
|
||||
// makeClusterFromUserConfig returns the listener config decoded from an
|
||||
// arbitrary proto3 json format string or an error if it's invalid.
|
||||
//
|
||||
// For now we only support embedding in JSON strings because of the hcl parsing
|
||||
// pain (see config.go comment above call to patchSliceOfMaps). Until we
|
||||
// refactor config parser a _lot_ user's opaque config that contains arrays will
|
||||
// be mangled. We could actually fix that up in mapstructure which knows the
|
||||
// type of the target so could resolve the slices to singletons unambiguously
|
||||
// and it would work for us here... but we still have the problem that the
|
||||
// config would render incorrectly in general in our HTTP API responses so we
|
||||
// really need to fix it "properly".
|
||||
//
|
||||
// When we do that we can support just nesting the config directly into the
|
||||
// JSON/hcl naturally but this is a stop-gap that gets us an escape hatch
|
||||
// immediately. It's also probably not a bad thing to support long-term since
|
||||
// any config generated by other systems will likely be in canonical protobuf
|
||||
// from rather than our slight variant in JSON/hcl.
|
||||
func makeClusterFromUserConfig(configJSON string) (*envoy.Cluster, error) {
|
||||
var jsonFields map[string]*json.RawMessage
|
||||
if err := json.Unmarshal([]byte(configJSON), &jsonFields); err != nil {
|
||||
fmt.Println("Custom error", err, configJSON)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var c envoy.Cluster
|
||||
|
||||
if _, ok := jsonFields["@type"]; ok {
|
||||
// Type field is present so decode it as a types.Any
|
||||
var any types.Any
|
||||
err := jsonpb.UnmarshalString(configJSON, &any)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// And then unmarshal the listener again...
|
||||
err = proto.Unmarshal(any.Value, &c)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
//return nil, err
|
||||
}
|
||||
return &c, err
|
||||
}
|
||||
|
||||
// No @type so try decoding as a straight listener.
|
||||
err := jsonpb.UnmarshalString(configJSON, &c)
|
||||
return &c, err
|
||||
}
|
||||
|
|
|
@ -361,6 +361,88 @@ func expectListenerJSON(t *testing.T, snap *proxycfg.ConfigSnapshot, token strin
|
|||
expectListenerJSONResources(t, snap, token, v, n))
|
||||
}
|
||||
|
||||
func expectClustersJSONResources(t *testing.T, snap *proxycfg.ConfigSnapshot, token string, v, n uint64) map[string]string {
|
||||
return map[string]string{
|
||||
"local_app": `
|
||||
{
|
||||
"@type": "type.googleapis.com/envoy.api.v2.Cluster",
|
||||
"name": "local_app",
|
||||
"connectTimeout": "5s",
|
||||
"hosts": [
|
||||
{
|
||||
"socketAddress": {
|
||||
"address": "127.0.0.1",
|
||||
"portValue": 8080
|
||||
}
|
||||
}
|
||||
]
|
||||
}`,
|
||||
"service:db": `
|
||||
{
|
||||
"@type": "type.googleapis.com/envoy.api.v2.Cluster",
|
||||
"name": "service:db",
|
||||
"type": "EDS",
|
||||
"edsClusterConfig": {
|
||||
"edsConfig": {
|
||||
"ads": {
|
||||
|
||||
}
|
||||
}
|
||||
},
|
||||
"connectTimeout": "5s",
|
||||
"tlsContext": ` + expectedUpstreamTLSContextJSON(t, snap) + `
|
||||
}`,
|
||||
"prepared_query:geo-cache": `
|
||||
{
|
||||
"@type": "type.googleapis.com/envoy.api.v2.Cluster",
|
||||
"name": "prepared_query:geo-cache",
|
||||
"type": "EDS",
|
||||
"edsClusterConfig": {
|
||||
"edsConfig": {
|
||||
"ads": {
|
||||
|
||||
}
|
||||
}
|
||||
},
|
||||
"connectTimeout": "5s",
|
||||
"tlsContext": ` + expectedUpstreamTLSContextJSON(t, snap) + `
|
||||
}`,
|
||||
}
|
||||
}
|
||||
|
||||
func expectClustersJSONFromResources(t *testing.T, snap *proxycfg.ConfigSnapshot, token string, v, n uint64, resourcesJSON map[string]string) string {
|
||||
resJSON := ""
|
||||
|
||||
// Sort resources into specific order because that matters in JSONEq
|
||||
// comparison later.
|
||||
keyOrder := []string{"local_app"}
|
||||
for _, u := range snap.Proxy.Upstreams {
|
||||
keyOrder = append(keyOrder, u.Identifier())
|
||||
}
|
||||
for _, k := range keyOrder {
|
||||
j, ok := resourcesJSON[k]
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
if resJSON != "" {
|
||||
resJSON += ",\n"
|
||||
}
|
||||
resJSON += j
|
||||
}
|
||||
|
||||
return `{
|
||||
"versionInfo": "` + hexString(v) + `",
|
||||
"resources": [` + resJSON + `],
|
||||
"typeUrl": "type.googleapis.com/envoy.api.v2.Cluster",
|
||||
"nonce": "` + hexString(n) + `"
|
||||
}`
|
||||
}
|
||||
|
||||
func expectClustersJSON(t *testing.T, snap *proxycfg.ConfigSnapshot, token string, v, n uint64) string {
|
||||
return expectClustersJSONFromResources(t, snap, token, v, n,
|
||||
expectClustersJSONResources(t, snap, token, v, n))
|
||||
}
|
||||
|
||||
func expectEndpointsJSON(t *testing.T, snap *proxycfg.ConfigSnapshot, token string, v, n uint64) string {
|
||||
return `{
|
||||
"versionInfo": "` + hexString(v) + `",
|
||||
|
@ -401,58 +483,6 @@ func expectEndpointsJSON(t *testing.T, snap *proxycfg.ConfigSnapshot, token stri
|
|||
}`
|
||||
}
|
||||
|
||||
func expectClustersJSON(t *testing.T, snap *proxycfg.ConfigSnapshot, token string, v, n uint64) string {
|
||||
return `{
|
||||
"versionInfo": "` + hexString(v) + `",
|
||||
"resources": [
|
||||
{
|
||||
"@type": "type.googleapis.com/envoy.api.v2.Cluster",
|
||||
"name": "local_app",
|
||||
"connectTimeout": "5s",
|
||||
"hosts": [
|
||||
{
|
||||
"socketAddress": {
|
||||
"address": "127.0.0.1",
|
||||
"portValue": 8080
|
||||
}
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"@type": "type.googleapis.com/envoy.api.v2.Cluster",
|
||||
"name": "service:db",
|
||||
"type": "EDS",
|
||||
"edsClusterConfig": {
|
||||
"edsConfig": {
|
||||
"ads": {
|
||||
|
||||
}
|
||||
}
|
||||
},
|
||||
"connectTimeout": "5s",
|
||||
"tlsContext": ` + expectedUpstreamTLSContextJSON(t, snap) + `
|
||||
},
|
||||
{
|
||||
"@type": "type.googleapis.com/envoy.api.v2.Cluster",
|
||||
"name": "prepared_query:geo-cache",
|
||||
"type": "EDS",
|
||||
"edsClusterConfig": {
|
||||
"edsConfig": {
|
||||
"ads": {
|
||||
|
||||
}
|
||||
}
|
||||
},
|
||||
"connectTimeout": "5s",
|
||||
"tlsContext": ` + expectedUpstreamTLSContextJSON(t, snap) + `
|
||||
}
|
||||
],
|
||||
"typeUrl": "type.googleapis.com/envoy.api.v2.Cluster",
|
||||
"nonce": "` + hexString(n) + `"
|
||||
}
|
||||
`
|
||||
}
|
||||
|
||||
func expectedUpstreamTLSContextJSON(t *testing.T, snap *proxycfg.ConfigSnapshot) string {
|
||||
return expectedTLSContextJSON(t, snap, false)
|
||||
}
|
||||
|
@ -973,7 +1003,7 @@ func TestServer_Check(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestServer_ConfigOverrides(t *testing.T) {
|
||||
func TestServer_ConfigOverridesListeners(t *testing.T) {
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
|
@ -1118,6 +1148,123 @@ func TestServer_ConfigOverrides(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestServer_ConfigOverridesClusters(t *testing.T) {
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
setup func(snap *proxycfg.ConfigSnapshot) string
|
||||
}{
|
||||
{
|
||||
name: "sanity check no custom",
|
||||
setup: func(snap *proxycfg.ConfigSnapshot) string {
|
||||
// Default snap and expectation
|
||||
return expectClustersJSON(t, snap, "my-token", 1, 1)
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "custom public with no type",
|
||||
setup: func(snap *proxycfg.ConfigSnapshot) string {
|
||||
snap.Proxy.Config["envoy_local_cluster_json"] =
|
||||
customAppClusterJSON(t, customClusterJSONOptions{
|
||||
Name: "mylocal",
|
||||
IncludeType: false,
|
||||
})
|
||||
resources := expectClustersJSONResources(t, snap, "my-token", 1, 1)
|
||||
|
||||
// Replace an upstream listener with the custom one WITH type since
|
||||
// that's how it comes out the other end.
|
||||
resources["local_app"] =
|
||||
customAppClusterJSON(t, customClusterJSONOptions{
|
||||
Name: "mylocal",
|
||||
IncludeType: true,
|
||||
})
|
||||
return expectClustersJSONFromResources(t, snap, "my-token", 1, 1, resources)
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "custom public with type",
|
||||
setup: func(snap *proxycfg.ConfigSnapshot) string {
|
||||
snap.Proxy.Config["envoy_local_cluster_json"] =
|
||||
customAppClusterJSON(t, customClusterJSONOptions{
|
||||
Name: "mylocal",
|
||||
IncludeType: true,
|
||||
})
|
||||
resources := expectClustersJSONResources(t, snap, "my-token", 1, 1)
|
||||
|
||||
// Replace an upstream listener with the custom one WITH type since
|
||||
// that's how it comes out the other end.
|
||||
resources["local_app"] =
|
||||
customAppClusterJSON(t, customClusterJSONOptions{
|
||||
Name: "mylocal",
|
||||
IncludeType: true,
|
||||
})
|
||||
return expectClustersJSONFromResources(t, snap, "my-token", 1, 1, resources)
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "custom upstream with no type",
|
||||
setup: func(snap *proxycfg.ConfigSnapshot) string {
|
||||
snap.Proxy.Upstreams[0].Config["envoy_cluster_json"] =
|
||||
customEDSClusterJSON(t, customClusterJSONOptions{
|
||||
Name: "myservice",
|
||||
IncludeType: false,
|
||||
})
|
||||
resources := expectClustersJSONResources(t, snap, "my-token", 1, 1)
|
||||
|
||||
// Replace an upstream listener with the custom one WITH type since
|
||||
// that's how it comes out the other end.
|
||||
resources["service:db"] =
|
||||
customEDSClusterJSON(t, customClusterJSONOptions{
|
||||
Name: "myservice",
|
||||
IncludeType: true,
|
||||
TLSContext: expectedUpstreamTLSContextJSON(t, snap),
|
||||
})
|
||||
return expectClustersJSONFromResources(t, snap, "my-token", 1, 1, resources)
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "custom upstream with type",
|
||||
setup: func(snap *proxycfg.ConfigSnapshot) string {
|
||||
snap.Proxy.Upstreams[0].Config["envoy_cluster_json"] =
|
||||
customEDSClusterJSON(t, customClusterJSONOptions{
|
||||
Name: "myservice",
|
||||
IncludeType: true,
|
||||
})
|
||||
resources := expectClustersJSONResources(t, snap, "my-token", 1, 1)
|
||||
|
||||
// Replace an upstream listener with the custom one WITH type since
|
||||
// that's how it comes out the other end.
|
||||
resources["service:db"] =
|
||||
customEDSClusterJSON(t, customClusterJSONOptions{
|
||||
Name: "myservice",
|
||||
IncludeType: true,
|
||||
TLSContext: expectedUpstreamTLSContextJSON(t, snap),
|
||||
})
|
||||
return expectClustersJSONFromResources(t, snap, "my-token", 1, 1, resources)
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
require := require.New(t)
|
||||
|
||||
// Sanity check default with no overrides first
|
||||
snap := proxycfg.TestConfigSnapshot(t)
|
||||
expect := tt.setup(snap)
|
||||
|
||||
clusters, err := clustersFromSnapshot(snap, "my-token")
|
||||
require.NoError(err)
|
||||
r, err := createResponse(ClusterType, "00000001", "00000001", clusters)
|
||||
require.NoError(err)
|
||||
|
||||
fmt.Println(r)
|
||||
|
||||
assertResponse(t, r, expect)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
type customListenerJSONOptions struct {
|
||||
Name string
|
||||
IncludeType bool
|
||||
|
@ -1182,3 +1329,67 @@ func customListenerJSON(t *testing.T, opts customListenerJSONOptions) string {
|
|||
require.NoError(t, err)
|
||||
return buf.String()
|
||||
}
|
||||
|
||||
type customClusterJSONOptions struct {
|
||||
Name string
|
||||
IncludeType bool
|
||||
TLSContext string
|
||||
}
|
||||
|
||||
var customEDSClusterJSONTpl = `{
|
||||
{{ if .IncludeType -}}
|
||||
"@type": "type.googleapis.com/envoy.api.v2.Cluster",
|
||||
{{- end }}
|
||||
{{ if .TLSContext -}}
|
||||
"tlsContext": {{ .TLSContext }},
|
||||
{{- end }}
|
||||
"name": "{{ .Name }}",
|
||||
"type": "EDS",
|
||||
"edsClusterConfig": {
|
||||
"edsConfig": {
|
||||
"ads": {
|
||||
|
||||
}
|
||||
}
|
||||
},
|
||||
"connectTimeout": "5s"
|
||||
}`
|
||||
|
||||
var customEDSClusterJSONTemplate = template.Must(template.New("").Parse(customEDSClusterJSONTpl))
|
||||
|
||||
func customEDSClusterJSON(t *testing.T, opts customClusterJSONOptions) string {
|
||||
t.Helper()
|
||||
var buf bytes.Buffer
|
||||
err := customEDSClusterJSONTemplate.Execute(&buf, opts)
|
||||
require.NoError(t, err)
|
||||
return buf.String()
|
||||
}
|
||||
|
||||
var customAppClusterJSONTpl = `{
|
||||
{{ if .IncludeType -}}
|
||||
"@type": "type.googleapis.com/envoy.api.v2.Cluster",
|
||||
{{- end }}
|
||||
{{ if .TLSContext -}}
|
||||
"tlsContext": {{ .TLSContext }},
|
||||
{{- end }}
|
||||
"name": "{{ .Name }}",
|
||||
"connectTimeout": "5s",
|
||||
"hosts": [
|
||||
{
|
||||
"socketAddress": {
|
||||
"address": "127.0.0.1",
|
||||
"portValue": 8080
|
||||
}
|
||||
}
|
||||
]
|
||||
}`
|
||||
|
||||
var customAppClusterJSONTemplate = template.Must(template.New("").Parse(customAppClusterJSONTpl))
|
||||
|
||||
func customAppClusterJSON(t *testing.T, opts customClusterJSONOptions) string {
|
||||
t.Helper()
|
||||
var buf bytes.Buffer
|
||||
err := customAppClusterJSONTemplate.Execute(&buf, opts)
|
||||
require.NoError(t, err)
|
||||
return buf.String()
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue