add order by locality failover to Consul enterprise (#16791)

This commit is contained in:
Eric Haberkorn 2023-03-30 10:08:38 -04:00 committed by GitHub
parent 39a0c4f5c4
commit 0d1d2fc4c9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
25 changed files with 1339 additions and 1287 deletions

3
.changelog/_4734.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:feature
connect: **(Consul Enterprise only)** Implement order-by-locality failover.
```

View File

@ -20,7 +20,7 @@ func TestCompiledDiscoveryChain(t *testing.T) {
typ := &CompiledDiscoveryChain{RPC: rpc}
// just do the default chain
chain := discoverychain.TestCompileConfigEntries(t, "web", "default", "default", "dc1", "trustdomain.consul", nil)
chain := discoverychain.TestCompileConfigEntries(t, "web", "default", "default", "dc1", "trustdomain.consul", nil, nil)
// Expect the proper RPC call. This also sets the expected value
// since that is return-by-pointer in the arguments.

View File

@ -17,11 +17,11 @@ func TestCompileConfigEntries(t testing.T,
evaluateInPartition string,
evaluateInDatacenter string,
evaluateInTrustDomain string,
setup func(req *CompileRequest), entries ...structs.ConfigEntry) *structs.CompiledDiscoveryChain {
set := configentry.NewDiscoveryChainSet()
set.AddEntries(entries...)
setup func(req *CompileRequest),
set *configentry.DiscoveryChainSet) *structs.CompiledDiscoveryChain {
if set == nil {
set = configentry.NewDiscoveryChainSet()
}
req := CompileRequest{
ServiceName: serviceName,
EvaluateInNamespace: evaluateInNamespace,

View File

@ -11,6 +11,7 @@ import (
"github.com/hashicorp/consul/acl"
cachetype "github.com/hashicorp/consul/agent/cache-types"
"github.com/hashicorp/consul/agent/configentry"
"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/consul/discoverychain"
"github.com/hashicorp/consul/agent/proxycfg/internal/watch"
@ -57,21 +58,20 @@ func TestManager_BasicLifecycle(t *testing.T) {
roots, leaf := TestCerts(t)
dbDefaultChain := func() *structs.CompiledDiscoveryChain {
set := configentry.NewDiscoveryChainSet()
set.AddEntries(&structs.ServiceResolverConfigEntry{
Kind: structs.ServiceResolver,
Name: "db",
})
return discoverychain.TestCompileConfigEntries(t, "db", "default", "default", "dc1", connect.TestClusterID+".consul", func(req *discoverychain.CompileRequest) {
// This is because structs.TestUpstreams uses an opaque config
// to override connect timeouts.
req.OverrideConnectTimeout = 1 * time.Second
}, &structs.ServiceResolverConfigEntry{
Kind: structs.ServiceResolver,
Name: "db",
})
}, set)
}
dbSplitChain := func() *structs.CompiledDiscoveryChain {
return discoverychain.TestCompileConfigEntries(t, "db", "default", "default", "dc1", "trustdomain.consul", func(req *discoverychain.CompileRequest) {
// This is because structs.TestUpstreams uses an opaque config
// to override connect timeouts.
req.OverrideConnectTimeout = 1 * time.Second
}, &structs.ProxyConfigEntry{
set := configentry.NewDiscoveryChainSet()
set.AddEntries(&structs.ProxyConfigEntry{
Kind: structs.ProxyDefaults,
Name: structs.ProxyConfigGlobal,
Config: map[string]interface{}{
@ -96,6 +96,11 @@ func TestManager_BasicLifecycle(t *testing.T) {
{Weight: 40, ServiceSubset: "v2"},
},
})
return discoverychain.TestCompileConfigEntries(t, "db", "default", "default", "dc1", "trustdomain.consul", func(req *discoverychain.CompileRequest) {
// This is because structs.TestUpstreams uses an opaque config
// to override connect timeouts.
req.OverrideConnectTimeout = 1 * time.Second
}, set)
}
upstreams := structs.TestUpstreams(t, false)

View File

@ -17,6 +17,7 @@ import (
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/configentry"
"github.com/hashicorp/consul/agent/consul/discoverychain"
"github.com/hashicorp/consul/agent/structs"
apimod "github.com/hashicorp/consul/api"
@ -437,6 +438,12 @@ func upstreamIDForDC2(uid UpstreamID) UpstreamID {
return uid
}
func discoChainSetWithEntries(entries ...structs.ConfigEntry) *configentry.DiscoveryChainSet {
set := configentry.NewDiscoveryChainSet()
set.AddEntries(entries...)
return set
}
// This test is meant to exercise the various parts of the cache watching done by the state as
// well as its management of the ConfigSnapshot
//
@ -680,7 +687,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
Chain: discoverychain.TestCompileConfigEntries(t, "api", "default", "default", "dc1", "trustdomain.consul",
func(req *discoverychain.CompileRequest) {
req.OverrideMeshGateway.Mode = meshGatewayProxyConfigValue
}),
}, nil),
},
Err: nil,
},
@ -690,7 +697,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
Chain: discoverychain.TestCompileConfigEntries(t, "api-failover-remote", "default", "default", "dc2", "trustdomain.consul",
func(req *discoverychain.CompileRequest) {
req.OverrideMeshGateway.Mode = structs.MeshGatewayModeRemote
}),
}, nil),
},
Err: nil,
},
@ -700,7 +707,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
Chain: discoverychain.TestCompileConfigEntries(t, "api-failover-local", "default", "default", "dc2", "trustdomain.consul",
func(req *discoverychain.CompileRequest) {
req.OverrideMeshGateway.Mode = structs.MeshGatewayModeLocal
}),
}, nil),
},
Err: nil,
},
@ -710,7 +717,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
Chain: discoverychain.TestCompileConfigEntries(t, "api-failover-direct", "default", "default", "dc2", "trustdomain.consul",
func(req *discoverychain.CompileRequest) {
req.OverrideMeshGateway.Mode = structs.MeshGatewayModeNone
}),
}, nil),
},
Err: nil,
},
@ -720,14 +727,14 @@ func TestState_WatchesAndUpdates(t *testing.T) {
Chain: discoverychain.TestCompileConfigEntries(t, "api-dc2", "default", "default", "dc1", "trustdomain.consul",
func(req *discoverychain.CompileRequest) {
req.OverrideMeshGateway.Mode = meshGatewayProxyConfigValue
}, &structs.ServiceResolverConfigEntry{
}, discoChainSetWithEntries(&structs.ServiceResolverConfigEntry{
Kind: structs.ServiceResolver,
Name: "api-dc2",
Redirect: &structs.ServiceResolverRedirect{
Service: "api",
Datacenter: "dc2",
},
}),
})),
},
Err: nil,
},
@ -737,7 +744,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
Chain: discoverychain.TestCompileConfigEntries(t, "api-failover-to-peer", "default", "default", "dc1", "trustdomain.consul",
func(req *discoverychain.CompileRequest) {
req.OverrideMeshGateway.Mode = meshGatewayProxyConfigValue
}, &structs.ServiceResolverConfigEntry{
}, discoChainSetWithEntries(&structs.ServiceResolverConfigEntry{
Kind: structs.ServiceResolver,
Name: "api-failover-to-peer",
Failover: map[string]structs.ServiceResolverFailover{
@ -747,7 +754,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
},
},
},
}),
})),
},
Err: nil,
},
@ -1509,7 +1516,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
{
CorrelationID: "discovery-chain:" + apiUID.String(),
Result: &structs.DiscoveryChainResponse{
Chain: discoverychain.TestCompileConfigEntries(t, "api", "default", "default", "dc1", "trustdomain.consul", nil),
Chain: discoverychain.TestCompileConfigEntries(t, "api", "default", "default", "dc1", "trustdomain.consul", nil, nil),
},
Err: nil,
},
@ -2390,13 +2397,13 @@ func TestState_WatchesAndUpdates(t *testing.T) {
Result: &structs.DiscoveryChainResponse{
Chain: discoverychain.TestCompileConfigEntries(
t, "db", "default", "default", "dc1", "trustdomain.consul", nil,
&structs.ServiceConfigEntry{
discoChainSetWithEntries(&structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "db",
TransparentProxy: structs.TransparentProxyConfig{
DialedDirectly: true,
},
},
}),
),
},
Err: nil,
@ -2569,13 +2576,14 @@ func TestState_WatchesAndUpdates(t *testing.T) {
{
CorrelationID: "discovery-chain:" + dbUID.String(),
Result: &structs.DiscoveryChainResponse{
Chain: discoverychain.TestCompileConfigEntries(t, "db", "default", "default", "dc1", "trustdomain.consul", nil, &structs.ServiceResolverConfigEntry{
Chain: discoverychain.TestCompileConfigEntries(t, "db", "default", "default", "dc1", "trustdomain.consul", nil,
discoChainSetWithEntries(&structs.ServiceResolverConfigEntry{
Kind: structs.ServiceResolver,
Name: "db",
Redirect: &structs.ServiceResolverRedirect{
Service: "mysql",
},
}),
})),
},
Err: nil,
},
@ -3100,7 +3108,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
Chain: discoverychain.TestCompileConfigEntries(t, "db", "default", "default", "dc2", "trustdomain.consul",
func(req *discoverychain.CompileRequest) {
req.OverrideMeshGateway.Mode = structs.MeshGatewayModeLocal
}),
}, nil),
},
Err: nil,
},
@ -3544,7 +3552,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
{
CorrelationID: fmt.Sprintf("discovery-chain:%s", apiUID.String()),
Result: &structs.DiscoveryChainResponse{
Chain: discoverychain.TestCompileConfigEntries(t, "api", "default", "default", "dc1", "trustdomain.consul", nil),
Chain: discoverychain.TestCompileConfigEntries(t, "api", "default", "default", "dc1", "trustdomain.consul", nil, nil),
},
Err: nil,
},
@ -3692,7 +3700,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
{
CorrelationID: fmt.Sprintf("discovery-chain:%s", hcpCollectorUID.String()),
Result: &structs.DiscoveryChainResponse{
Chain: discoverychain.TestCompileConfigEntries(t, hcpCollector.Name, "default", "default", "dc1", "trustdomain.consul", nil),
Chain: discoverychain.TestCompileConfigEntries(t, hcpCollector.Name, "default", "default", "dc1", "trustdomain.consul", nil, nil),
},
Err: nil,
},

View File

@ -10,6 +10,7 @@ import (
"github.com/hashicorp/consul/agent/consul/discoverychain"
"github.com/mitchellh/go-testing-interface"
"github.com/hashicorp/consul/agent/configentry"
"github.com/hashicorp/consul/agent/structs"
)
@ -86,12 +87,15 @@ func TestConfigSnapshotAPIGateway(
},
}
set := configentry.NewDiscoveryChainSet()
set.AddEntries(entries...)
// Add a discovery chain watch event for each service.
for _, serviceName := range route.GetServiceNames() {
discoChain := UpdateEvent{
CorrelationID: fmt.Sprintf("discovery-chain:%s", UpstreamIDString("", "", serviceName.Name, &serviceName.EnterpriseMeta, "")),
Result: &structs.DiscoveryChainResponse{
Chain: discoverychain.TestCompileConfigEntries(t, serviceName.Name, "default", "default", "dc1", connect.TestClusterID+".consul", nil, entries...),
Chain: discoverychain.TestCompileConfigEntries(t, serviceName.Name, "default", "default", "dc1", connect.TestClusterID+".consul", nil, set),
},
}
baseEvents = append(baseEvents, discoChain)

View File

@ -23,7 +23,7 @@ func TestConfigSnapshot(t testing.T, nsFn func(ns *structs.NodeService), extraUp
roots, leaf := TestCerts(t)
// no entries implies we'll get a default chain
dbChain := discoverychain.TestCompileConfigEntries(t, "db", "default", "default", "dc1", connect.TestClusterID+".consul", nil)
dbChain := discoverychain.TestCompileConfigEntries(t, "db", "default", "default", "dc1", connect.TestClusterID+".consul", nil, nil)
assert.True(t, dbChain.Default)
var (
@ -309,7 +309,7 @@ func TestConfigSnapshotHCPMetrics(t testing.T) *ConfigSnapshot {
var (
collector = structs.NewServiceName(api.HCPMetricsCollectorName, nil)
collectorUID = NewUpstreamIDFromServiceName(collector)
collectorChain = discoverychain.TestCompileConfigEntries(t, api.HCPMetricsCollectorName, "default", "default", "dc1", connect.TestClusterID+".consul", nil)
collectorChain = discoverychain.TestCompileConfigEntries(t, api.HCPMetricsCollectorName, "default", "default", "dc1", connect.TestClusterID+".consul", nil, nil)
)
return TestConfigSnapshot(t, func(ns *structs.NodeService) {

View File

@ -10,6 +10,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/configentry"
"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/consul/discoverychain"
"github.com/hashicorp/consul/agent/structs"
@ -156,6 +157,7 @@ func TestConfigSnapshotIngressGatewaySDS_GatewayLevel_MixedTLS(t testing.T) *Con
"dc1",
connect.TestClusterID+".consul",
nil,
nil,
)
insecureUID := UpstreamIDFromString("insecure")
@ -167,6 +169,7 @@ func TestConfigSnapshotIngressGatewaySDS_GatewayLevel_MixedTLS(t testing.T) *Con
"dc1",
connect.TestClusterID+".consul",
nil,
nil,
)
return TestConfigSnapshotIngressGateway(t, false, "tcp", "default", nil, func(entry *structs.IngressGatewayConfigEntry) {
@ -276,15 +279,16 @@ func TestConfigSnapshotIngressGatewaySDS_GatewayAndListenerLevel(t testing.T) *C
}
func TestConfigSnapshotIngressGatewaySDS_GatewayAndListenerLevel_HTTP(t testing.T) *ConfigSnapshot {
var (
http = structs.NewServiceName("http", nil)
httpUID = NewUpstreamIDFromServiceName(http)
httpChain = discoverychain.TestCompileConfigEntries(t, "http", "default", "default", "dc1", connect.TestClusterID+".consul", nil,
&structs.ServiceConfigEntry{
set := configentry.NewDiscoveryChainSet()
set.AddEntries(&structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "http",
Protocol: "http",
})
var (
http = structs.NewServiceName("http", nil)
httpUID = NewUpstreamIDFromServiceName(http)
httpChain = discoverychain.TestCompileConfigEntries(t, "http", "default", "default", "dc1", connect.TestClusterID+".consul", nil, set)
)
return TestConfigSnapshotIngressGateway(t, false, "http", "default", nil, func(entry *structs.IngressGatewayConfigEntry) {
@ -345,11 +349,11 @@ func TestConfigSnapshotIngressGatewaySDS_ServiceLevel(t testing.T) *ConfigSnapsh
var (
s1 = structs.NewServiceName("s1", nil)
s1UID = NewUpstreamIDFromServiceName(s1)
s1Chain = discoverychain.TestCompileConfigEntries(t, "s1", "default", "default", "dc1", connect.TestClusterID+".consul", nil)
s1Chain = discoverychain.TestCompileConfigEntries(t, "s1", "default", "default", "dc1", connect.TestClusterID+".consul", nil, nil)
s2 = structs.NewServiceName("s2", nil)
s2UID = NewUpstreamIDFromServiceName(s2)
s2Chain = discoverychain.TestCompileConfigEntries(t, "s2", "default", "default", "dc1", connect.TestClusterID+".consul", nil)
s2Chain = discoverychain.TestCompileConfigEntries(t, "s2", "default", "default", "dc1", connect.TestClusterID+".consul", nil, nil)
)
return TestConfigSnapshotIngressGateway(t, false, "tcp", "default", nil, func(entry *structs.IngressGatewayConfigEntry) {
@ -436,11 +440,11 @@ func TestConfigSnapshotIngressGatewaySDS_ListenerAndServiceLevel(t testing.T) *C
var (
s1 = structs.NewServiceName("s1", nil)
s1UID = NewUpstreamIDFromServiceName(s1)
s1Chain = discoverychain.TestCompileConfigEntries(t, "s1", "default", "default", "dc1", connect.TestClusterID+".consul", nil)
s1Chain = discoverychain.TestCompileConfigEntries(t, "s1", "default", "default", "dc1", connect.TestClusterID+".consul", nil, nil)
s2 = structs.NewServiceName("s2", nil)
s2UID = NewUpstreamIDFromServiceName(s2)
s2Chain = discoverychain.TestCompileConfigEntries(t, "s2", "default", "default", "dc1", connect.TestClusterID+".consul", nil)
s2Chain = discoverychain.TestCompileConfigEntries(t, "s2", "default", "default", "dc1", connect.TestClusterID+".consul", nil, nil)
)
return TestConfigSnapshotIngressGateway(t, false, "tcp", "default", nil, func(entry *structs.IngressGatewayConfigEntry) {
@ -526,11 +530,11 @@ func TestConfigSnapshotIngressGatewaySDS_MixedNoTLS(t testing.T) *ConfigSnapshot
var (
s1 = structs.NewServiceName("s1", nil)
s1UID = NewUpstreamIDFromServiceName(s1)
s1Chain = discoverychain.TestCompileConfigEntries(t, "s1", "default", "default", "dc1", connect.TestClusterID+".consul", nil)
s1Chain = discoverychain.TestCompileConfigEntries(t, "s1", "default", "default", "dc1", connect.TestClusterID+".consul", nil, nil)
s2 = structs.NewServiceName("s2", nil)
s2UID = NewUpstreamIDFromServiceName(s2)
s2Chain = discoverychain.TestCompileConfigEntries(t, "s2", "default", "default", "dc1", connect.TestClusterID+".consul", nil)
s2Chain = discoverychain.TestCompileConfigEntries(t, "s2", "default", "default", "dc1", connect.TestClusterID+".consul", nil, nil)
)
return TestConfigSnapshotIngressGateway(t, false, "tcp", "default", nil, func(entry *structs.IngressGatewayConfigEntry) {
@ -611,11 +615,11 @@ func TestConfigSnapshotIngressGateway_MixedListeners(t testing.T) *ConfigSnapsho
var (
s1 = structs.NewServiceName("s1", nil)
s1UID = NewUpstreamIDFromServiceName(s1)
s1Chain = discoverychain.TestCompileConfigEntries(t, "s1", "default", "default", "dc1", connect.TestClusterID+".consul", nil)
s1Chain = discoverychain.TestCompileConfigEntries(t, "s1", "default", "default", "dc1", connect.TestClusterID+".consul", nil, nil)
s2 = structs.NewServiceName("s2", nil)
s2UID = NewUpstreamIDFromServiceName(s2)
s2Chain = discoverychain.TestCompileConfigEntries(t, "s2", "default", "default", "dc1", connect.TestClusterID+".consul", nil)
s2Chain = discoverychain.TestCompileConfigEntries(t, "s2", "default", "default", "dc1", connect.TestClusterID+".consul", nil, nil)
)
return TestConfigSnapshotIngressGateway(t, false, "tcp", "default", nil, func(entry *structs.IngressGatewayConfigEntry) {
@ -713,22 +717,25 @@ func TestConfigSnapshotIngress_HTTPMultipleServices(t testing.T) *ConfigSnapshot
},
}
set := configentry.NewDiscoveryChainSet()
set.AddEntries(entries...)
var (
foo = structs.NewServiceName("foo", nil)
fooUID = NewUpstreamIDFromServiceName(foo)
fooChain = discoverychain.TestCompileConfigEntries(t, "foo", "default", "default", "dc1", connect.TestClusterID+".consul", nil, entries...)
fooChain = discoverychain.TestCompileConfigEntries(t, "foo", "default", "default", "dc1", connect.TestClusterID+".consul", nil, set)
bar = structs.NewServiceName("bar", nil)
barUID = NewUpstreamIDFromServiceName(bar)
barChain = discoverychain.TestCompileConfigEntries(t, "bar", "default", "default", "dc1", connect.TestClusterID+".consul", nil, entries...)
barChain = discoverychain.TestCompileConfigEntries(t, "bar", "default", "default", "dc1", connect.TestClusterID+".consul", nil, set)
baz = structs.NewServiceName("baz", nil)
bazUID = NewUpstreamIDFromServiceName(baz)
bazChain = discoverychain.TestCompileConfigEntries(t, "baz", "default", "default", "dc1", connect.TestClusterID+".consul", nil, entries...)
bazChain = discoverychain.TestCompileConfigEntries(t, "baz", "default", "default", "dc1", connect.TestClusterID+".consul", nil, set)
qux = structs.NewServiceName("qux", nil)
quxUID = NewUpstreamIDFromServiceName(qux)
quxChain = discoverychain.TestCompileConfigEntries(t, "qux", "default", "default", "dc1", connect.TestClusterID+".consul", nil, entries...)
quxChain = discoverychain.TestCompileConfigEntries(t, "qux", "default", "default", "dc1", connect.TestClusterID+".consul", nil, set)
)
require.False(t, fooChain.Default)
@ -870,14 +877,17 @@ func TestConfigSnapshotIngress_GRPCMultipleServices(t testing.T) *ConfigSnapshot
},
}
set := configentry.NewDiscoveryChainSet()
set.AddEntries(entries...)
var (
foo = structs.NewServiceName("foo", nil)
fooUID = NewUpstreamIDFromServiceName(foo)
fooChain = discoverychain.TestCompileConfigEntries(t, "foo", "default", "default", "dc1", connect.TestClusterID+".consul", nil, entries...)
fooChain = discoverychain.TestCompileConfigEntries(t, "foo", "default", "default", "dc1", connect.TestClusterID+".consul", nil, set)
bar = structs.NewServiceName("bar", nil)
barUID = NewUpstreamIDFromServiceName(bar)
barChain = discoverychain.TestCompileConfigEntries(t, "bar", "default", "default", "dc1", connect.TestClusterID+".consul", nil, entries...)
barChain = discoverychain.TestCompileConfigEntries(t, "bar", "default", "default", "dc1", connect.TestClusterID+".consul", nil, set)
)
require.False(t, fooChain.Default)
@ -955,11 +965,11 @@ func TestConfigSnapshotIngress_MultipleListenersDuplicateService(t testing.T) *C
var (
foo = structs.NewServiceName("foo", nil)
fooUID = NewUpstreamIDFromServiceName(foo)
fooChain = discoverychain.TestCompileConfigEntries(t, "foo", "default", "default", "dc1", connect.TestClusterID+".consul", nil)
fooChain = discoverychain.TestCompileConfigEntries(t, "foo", "default", "default", "dc1", connect.TestClusterID+".consul", nil, nil)
bar = structs.NewServiceName("bar", nil)
barUID = NewUpstreamIDFromServiceName(bar)
barChain = discoverychain.TestCompileConfigEntries(t, "bar", "default", "default", "dc1", connect.TestClusterID+".consul", nil)
barChain = discoverychain.TestCompileConfigEntries(t, "bar", "default", "default", "dc1", connect.TestClusterID+".consul", nil, nil)
)
return TestConfigSnapshotIngressGateway(t, false, "http", "default", nil, func(entry *structs.IngressGatewayConfigEntry) {
@ -1231,14 +1241,17 @@ func TestConfigSnapshotIngressGatewayWithChain(
},
}
set := configentry.NewDiscoveryChainSet()
set.AddEntries(entries...)
webChain := discoverychain.TestCompileConfigEntries(t, "web",
webEntMeta.NamespaceOrDefault(),
webEntMeta.PartitionOrDefault(), "dc1",
connect.TestClusterID+".consul", nil, entries...)
connect.TestClusterID+".consul", nil, set)
fooChain := discoverychain.TestCompileConfigEntries(t, "foo",
fooEntMeta.NamespaceOrDefault(),
fooEntMeta.PartitionOrDefault(), "dc1",
connect.TestClusterID+".consul", nil, entries...)
connect.TestClusterID+".consul", nil, set)
updates = []UpdateEvent{
{
@ -1294,19 +1307,19 @@ func TestConfigSnapshotIngressGateway_TLSMinVersionListenersGatewayDefaults(t te
var (
s1 = structs.NewServiceName("s1", nil)
s1UID = NewUpstreamIDFromServiceName(s1)
s1Chain = discoverychain.TestCompileConfigEntries(t, "s1", "default", "default", "dc1", connect.TestClusterID+".consul", nil)
s1Chain = discoverychain.TestCompileConfigEntries(t, "s1", "default", "default", "dc1", connect.TestClusterID+".consul", nil, nil)
s2 = structs.NewServiceName("s2", nil)
s2UID = NewUpstreamIDFromServiceName(s2)
s2Chain = discoverychain.TestCompileConfigEntries(t, "s2", "default", "default", "dc1", connect.TestClusterID+".consul", nil)
s2Chain = discoverychain.TestCompileConfigEntries(t, "s2", "default", "default", "dc1", connect.TestClusterID+".consul", nil, nil)
s3 = structs.NewServiceName("s3", nil)
s3UID = NewUpstreamIDFromServiceName(s3)
s3Chain = discoverychain.TestCompileConfigEntries(t, "s3", "default", "default", "dc1", connect.TestClusterID+".consul", nil)
s3Chain = discoverychain.TestCompileConfigEntries(t, "s3", "default", "default", "dc1", connect.TestClusterID+".consul", nil, nil)
s4 = structs.NewServiceName("s4", nil)
s4UID = NewUpstreamIDFromServiceName(s4)
s4Chain = discoverychain.TestCompileConfigEntries(t, "s4", "default", "default", "dc1", connect.TestClusterID+".consul", nil)
s4Chain = discoverychain.TestCompileConfigEntries(t, "s4", "default", "default", "dc1", connect.TestClusterID+".consul", nil, nil)
)
return TestConfigSnapshotIngressGateway(t, true, "tcp", "default", nil,
@ -1460,11 +1473,11 @@ func TestConfigSnapshotIngressGateway_SingleTLSListener(t testing.T) *ConfigSnap
var (
s1 = structs.NewServiceName("s1", nil)
s1UID = NewUpstreamIDFromServiceName(s1)
s1Chain = discoverychain.TestCompileConfigEntries(t, "s1", "default", "default", "dc1", connect.TestClusterID+".consul", nil)
s1Chain = discoverychain.TestCompileConfigEntries(t, "s1", "default", "default", "dc1", connect.TestClusterID+".consul", nil, nil)
s2 = structs.NewServiceName("s2", nil)
s2UID = NewUpstreamIDFromServiceName(s2)
s2Chain = discoverychain.TestCompileConfigEntries(t, "s2", "default", "default", "dc1", connect.TestClusterID+".consul", nil)
s2Chain = discoverychain.TestCompileConfigEntries(t, "s2", "default", "default", "dc1", connect.TestClusterID+".consul", nil, nil)
)
return TestConfigSnapshotIngressGateway(t, true, "tcp", "simple", nil,
func(entry *structs.IngressGatewayConfigEntry) {
@ -1539,11 +1552,11 @@ func TestConfigSnapshotIngressGateway_SingleTLSListener_GRPC(t testing.T) *Confi
var (
s1 = structs.NewServiceName("s1", nil)
s1UID = NewUpstreamIDFromServiceName(s1)
s1Chain = discoverychain.TestCompileConfigEntries(t, "s1", "default", "default", "dc1", connect.TestClusterID+".consul", nil)
s1Chain = discoverychain.TestCompileConfigEntries(t, "s1", "default", "default", "dc1", connect.TestClusterID+".consul", nil, nil)
s2 = structs.NewServiceName("s2", nil)
s2UID = NewUpstreamIDFromServiceName(s2)
s2Chain = discoverychain.TestCompileConfigEntries(t, "s2", "default", "default", "dc1", connect.TestClusterID+".consul", nil)
s2Chain = discoverychain.TestCompileConfigEntries(t, "s2", "default", "default", "dc1", connect.TestClusterID+".consul", nil, nil)
)
return TestConfigSnapshotIngressGateway(t, true, "grpc", "simple", nil,
func(entry *structs.IngressGatewayConfigEntry) {
@ -1618,11 +1631,11 @@ func TestConfigSnapshotIngressGateway_SingleTLSListener_HTTP2(t testing.T) *Conf
var (
s1 = structs.NewServiceName("s1", nil)
s1UID = NewUpstreamIDFromServiceName(s1)
s1Chain = discoverychain.TestCompileConfigEntries(t, "s1", "default", "default", "dc1", connect.TestClusterID+".consul", nil)
s1Chain = discoverychain.TestCompileConfigEntries(t, "s1", "default", "default", "dc1", connect.TestClusterID+".consul", nil, nil)
s2 = structs.NewServiceName("s2", nil)
s2UID = NewUpstreamIDFromServiceName(s2)
s2Chain = discoverychain.TestCompileConfigEntries(t, "s2", "default", "default", "dc1", connect.TestClusterID+".consul", nil)
s2Chain = discoverychain.TestCompileConfigEntries(t, "s2", "default", "default", "dc1", connect.TestClusterID+".consul", nil, nil)
)
return TestConfigSnapshotIngressGateway(t, true, "http2", "simple", nil,
func(entry *structs.IngressGatewayConfigEntry) {
@ -1697,11 +1710,11 @@ func TestConfigSnapshotIngressGateway_MultiTLSListener_MixedHTTP2gRPC(t testing.
var (
s1 = structs.NewServiceName("s1", nil)
s1UID = NewUpstreamIDFromServiceName(s1)
s1Chain = discoverychain.TestCompileConfigEntries(t, "s1", "default", "default", "dc1", connect.TestClusterID+".consul", nil)
s1Chain = discoverychain.TestCompileConfigEntries(t, "s1", "default", "default", "dc1", connect.TestClusterID+".consul", nil, nil)
s2 = structs.NewServiceName("s2", nil)
s2UID = NewUpstreamIDFromServiceName(s2)
s2Chain = discoverychain.TestCompileConfigEntries(t, "s2", "default", "default", "dc1", connect.TestClusterID+".consul", nil)
s2Chain = discoverychain.TestCompileConfigEntries(t, "s2", "default", "default", "dc1", connect.TestClusterID+".consul", nil, nil)
)
return TestConfigSnapshotIngressGateway(t, true, "tcp", "simple", nil,
func(entry *structs.IngressGatewayConfigEntry) {
@ -1780,11 +1793,11 @@ func TestConfigSnapshotIngressGateway_GWTLSListener_MixedHTTP2gRPC(t testing.T)
var (
s1 = structs.NewServiceName("s1", nil)
s1UID = NewUpstreamIDFromServiceName(s1)
s1Chain = discoverychain.TestCompileConfigEntries(t, "s1", "default", "default", "dc1", connect.TestClusterID+".consul", nil)
s1Chain = discoverychain.TestCompileConfigEntries(t, "s1", "default", "default", "dc1", connect.TestClusterID+".consul", nil, nil)
s2 = structs.NewServiceName("s2", nil)
s2UID = NewUpstreamIDFromServiceName(s2)
s2Chain = discoverychain.TestCompileConfigEntries(t, "s2", "default", "default", "dc1", connect.TestClusterID+".consul", nil)
s2Chain = discoverychain.TestCompileConfigEntries(t, "s2", "default", "default", "dc1", connect.TestClusterID+".consul", nil, nil)
)
return TestConfigSnapshotIngressGateway(t, true, "tcp", "simple", nil,
func(entry *structs.IngressGatewayConfigEntry) {
@ -1859,15 +1872,15 @@ func TestConfigSnapshotIngressGateway_TLSMixedMinVersionListeners(t testing.T) *
var (
s1 = structs.NewServiceName("s1", nil)
s1UID = NewUpstreamIDFromServiceName(s1)
s1Chain = discoverychain.TestCompileConfigEntries(t, "s1", "default", "default", "dc1", connect.TestClusterID+".consul", nil)
s1Chain = discoverychain.TestCompileConfigEntries(t, "s1", "default", "default", "dc1", connect.TestClusterID+".consul", nil, nil)
s2 = structs.NewServiceName("s2", nil)
s2UID = NewUpstreamIDFromServiceName(s2)
s2Chain = discoverychain.TestCompileConfigEntries(t, "s2", "default", "default", "dc1", connect.TestClusterID+".consul", nil)
s2Chain = discoverychain.TestCompileConfigEntries(t, "s2", "default", "default", "dc1", connect.TestClusterID+".consul", nil, nil)
s3 = structs.NewServiceName("s3", nil)
s3UID = NewUpstreamIDFromServiceName(s3)
s3Chain = discoverychain.TestCompileConfigEntries(t, "s3", "default", "default", "dc1", connect.TestClusterID+".consul", nil)
s3Chain = discoverychain.TestCompileConfigEntries(t, "s3", "default", "default", "dc1", connect.TestClusterID+".consul", nil, nil)
)
return TestConfigSnapshotIngressGateway(t, true, "tcp", "default", nil,

View File

@ -12,6 +12,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/configentry"
"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/consul/discoverychain"
"github.com/hashicorp/consul/agent/structs"
@ -594,14 +595,16 @@ func TestConfigSnapshotPeeredMeshGateway(t testing.T, variant string, nsFn func(
entries = append(entries, proxyDefaults)
fallthrough // to-case: "default-services-tcp"
case "default-services-tcp":
set := configentry.NewDiscoveryChainSet()
set.AddEntries(entries...)
var (
fooSN = structs.NewServiceName("foo", nil)
barSN = structs.NewServiceName("bar", nil)
girSN = structs.NewServiceName("gir", nil)
fooChain = discoverychain.TestCompileConfigEntries(t, "foo", "default", "default", "dc1", connect.TestClusterID+".consul", nil, entries...)
barChain = discoverychain.TestCompileConfigEntries(t, "bar", "default", "default", "dc1", connect.TestClusterID+".consul", nil, entries...)
girChain = discoverychain.TestCompileConfigEntries(t, "gir", "default", "default", "dc1", connect.TestClusterID+".consul", nil, entries...)
fooChain = discoverychain.TestCompileConfigEntries(t, "foo", "default", "default", "dc1", connect.TestClusterID+".consul", nil, set)
barChain = discoverychain.TestCompileConfigEntries(t, "bar", "default", "default", "dc1", connect.TestClusterID+".consul", nil, set)
girChain = discoverychain.TestCompileConfigEntries(t, "gir", "default", "default", "dc1", connect.TestClusterID+".consul", nil, set)
)
assert.True(t, fooChain.Default)
@ -745,11 +748,14 @@ func TestConfigSnapshotPeeredMeshGateway(t testing.T, variant string, nsFn func(
require.NoError(t, entry.Validate())
}
set := configentry.NewDiscoveryChainSet()
set.AddEntries(entries...)
var (
dbSN = structs.NewServiceName("db", nil)
altSN = structs.NewServiceName("alt", nil)
dbChain = discoverychain.TestCompileConfigEntries(t, "db", "default", "default", "dc1", connect.TestClusterID+".consul", nil, entries...)
dbChain = discoverychain.TestCompileConfigEntries(t, "db", "default", "default", "dc1", connect.TestClusterID+".consul", nil, set)
)
needPeerA = true

View File

@ -10,6 +10,7 @@ import (
"github.com/mitchellh/go-testing-interface"
"github.com/hashicorp/consul/agent/configentry"
"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/consul/discoverychain"
"github.com/hashicorp/consul/agent/structs"
@ -21,11 +22,11 @@ func TestConfigSnapshotTransparentProxy(t testing.T) *ConfigSnapshot {
var (
google = structs.NewServiceName("google", nil)
googleUID = NewUpstreamIDFromServiceName(google)
googleChain = discoverychain.TestCompileConfigEntries(t, "google", "default", "default", "dc1", connect.TestClusterID+".consul", nil)
googleChain = discoverychain.TestCompileConfigEntries(t, "google", "default", "default", "dc1", connect.TestClusterID+".consul", nil, nil)
noEndpoints = structs.NewServiceName("no-endpoints", nil)
noEndpointsUID = NewUpstreamIDFromServiceName(noEndpoints)
noEndpointsChain = discoverychain.TestCompileConfigEntries(t, "no-endpoints", "default", "default", "dc1", connect.TestClusterID+".consul", nil)
noEndpointsChain = discoverychain.TestCompileConfigEntries(t, "no-endpoints", "default", "default", "dc1", connect.TestClusterID+".consul", nil, nil)
db = structs.NewServiceName("db", nil)
)
@ -129,18 +130,19 @@ func TestConfigSnapshotTransparentProxyHTTPUpstream(t testing.T, additionalEntri
},
})
set := configentry.NewDiscoveryChainSet()
set.AddEntries(entries...)
// DiscoveryChain without an UpstreamConfig should yield a
// filter chain when in transparent proxy mode
var (
google = structs.NewServiceName("google", nil)
googleUID = NewUpstreamIDFromServiceName(google)
googleChain = discoverychain.TestCompileConfigEntries(t, "google", "default", "default", "dc1", connect.TestClusterID+".consul", nil,
entries...,
)
googleChain = discoverychain.TestCompileConfigEntries(t, "google", "default", "default", "dc1", connect.TestClusterID+".consul", nil, set)
noEndpoints = structs.NewServiceName("no-endpoints", nil)
noEndpointsUID = NewUpstreamIDFromServiceName(noEndpoints)
noEndpointsChain = discoverychain.TestCompileConfigEntries(t, "no-endpoints", "default", "default", "dc1", connect.TestClusterID+".consul", nil)
noEndpointsChain = discoverychain.TestCompileConfigEntries(t, "no-endpoints", "default", "default", "dc1", connect.TestClusterID+".consul", nil, nil)
db = structs.NewServiceName("db", nil)
nodes = []structs.CheckServiceNode{
@ -253,11 +255,11 @@ func TestConfigSnapshotTransparentProxyCatalogDestinationsOnly(t testing.T) *Con
var (
google = structs.NewServiceName("google", nil)
googleUID = NewUpstreamIDFromServiceName(google)
googleChain = discoverychain.TestCompileConfigEntries(t, "google", "default", "default", "dc1", connect.TestClusterID+".consul", nil)
googleChain = discoverychain.TestCompileConfigEntries(t, "google", "default", "default", "dc1", connect.TestClusterID+".consul", nil, nil)
noEndpoints = structs.NewServiceName("no-endpoints", nil)
noEndpointsUID = NewUpstreamIDFromServiceName(noEndpoints)
noEndpointsChain = discoverychain.TestCompileConfigEntries(t, "no-endpoints", "default", "default", "dc1", connect.TestClusterID+".consul", nil)
noEndpointsChain = discoverychain.TestCompileConfigEntries(t, "no-endpoints", "default", "default", "dc1", connect.TestClusterID+".consul", nil, nil)
db = structs.NewServiceName("db", nil)
)
@ -334,20 +336,23 @@ func TestConfigSnapshotTransparentProxyCatalogDestinationsOnly(t testing.T) *Con
}
func TestConfigSnapshotTransparentProxyDialDirectly(t testing.T) *ConfigSnapshot {
set := configentry.NewDiscoveryChainSet()
set.AddEntries(&structs.ServiceResolverConfigEntry{
Kind: structs.ServiceResolver,
Name: "mongo",
ConnectTimeout: 33 * time.Second,
})
// DiscoveryChain without an UpstreamConfig should yield a
// filter chain when in transparent proxy mode
var (
kafka = structs.NewServiceName("kafka", nil)
kafkaUID = NewUpstreamIDFromServiceName(kafka)
kafkaChain = discoverychain.TestCompileConfigEntries(t, "kafka", "default", "default", "dc1", connect.TestClusterID+".consul", nil)
kafkaChain = discoverychain.TestCompileConfigEntries(t, "kafka", "default", "default", "dc1", connect.TestClusterID+".consul", nil, nil)
mongo = structs.NewServiceName("mongo", nil)
mongoUID = NewUpstreamIDFromServiceName(mongo)
mongoChain = discoverychain.TestCompileConfigEntries(t, "mongo", "default", "default", "dc1", connect.TestClusterID+".consul", nil, &structs.ServiceResolverConfigEntry{
Kind: structs.ServiceResolver,
Name: "mongo",
ConnectTimeout: 33 * time.Second,
})
mongoChain = discoverychain.TestCompileConfigEntries(t, "mongo", "default", "default", "dc1", connect.TestClusterID+".consul", nil, set)
db = structs.NewServiceName("db", nil)
)
@ -456,23 +461,23 @@ func TestConfigSnapshotTransparentProxyDialDirectly(t testing.T) *ConfigSnapshot
}
func TestConfigSnapshotTransparentProxyResolverRedirectUpstream(t testing.T) *ConfigSnapshot {
// Service-Resolver redirect with explicit upstream should spawn an outbound listener.
var (
db = structs.NewServiceName("db-redir", nil)
dbUID = NewUpstreamIDFromServiceName(db)
dbChain = discoverychain.TestCompileConfigEntries(t, "db-redir", "default", "default", "dc1", connect.TestClusterID+".consul", nil,
&structs.ServiceResolverConfigEntry{
set := configentry.NewDiscoveryChainSet()
set.AddEntries(&structs.ServiceResolverConfigEntry{
Kind: structs.ServiceResolver,
Name: "db-redir",
Redirect: &structs.ServiceResolverRedirect{
Service: "db",
},
},
)
})
// Service-Resolver redirect with explicit upstream should spawn an outbound listener.
var (
db = structs.NewServiceName("db-redir", nil)
dbUID = NewUpstreamIDFromServiceName(db)
dbChain = discoverychain.TestCompileConfigEntries(t, "db-redir", "default", "default", "dc1", connect.TestClusterID+".consul", nil, set)
google = structs.NewServiceName("google", nil)
googleUID = NewUpstreamIDFromServiceName(google)
googleChain = discoverychain.TestCompileConfigEntries(t, "google", "default", "default", "dc1", connect.TestClusterID+".consul", nil)
googleChain = discoverychain.TestCompileConfigEntries(t, "google", "default", "default", "dc1", connect.TestClusterID+".consul", nil, nil)
)
return TestConfigSnapshot(t, func(ns *structs.NodeService) {
@ -530,11 +535,11 @@ func TestConfigSnapshotTransparentProxyTerminatingGatewayCatalogDestinationsOnly
var (
google = structs.NewServiceName("google", nil)
googleUID = NewUpstreamIDFromServiceName(google)
googleChain = discoverychain.TestCompileConfigEntries(t, "google", "default", "default", "dc1", connect.TestClusterID+".consul", nil)
googleChain = discoverychain.TestCompileConfigEntries(t, "google", "default", "default", "dc1", connect.TestClusterID+".consul", nil, nil)
kafka = structs.NewServiceName("kafka", nil)
kafkaUID = NewUpstreamIDFromServiceName(kafka)
kafkaChain = discoverychain.TestCompileConfigEntries(t, "kafka", "default", "default", "dc1", connect.TestClusterID+".consul", nil)
kafkaChain = discoverychain.TestCompileConfigEntries(t, "kafka", "default", "default", "dc1", connect.TestClusterID+".consul", nil, nil)
db = structs.NewServiceName("db", nil)
)

View File

@ -9,9 +9,11 @@ import (
"github.com/mitchellh/go-testing-interface"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/configentry"
"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/consul/discoverychain"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/proto/private/pbcommon"
"github.com/hashicorp/consul/proto/private/pbpeering"
)
@ -97,7 +99,66 @@ func setupTestVariationConfigEntriesAndSnapshot(
Nodes: TestGatewayNodesDC2(t),
},
})
case "order-by-locality-failover":
cluster1UID := UpstreamID{
Name: "db",
Peer: "cluster-01",
EnterpriseMeta: acl.NewEnterpriseMetaWithPartition(dbUID.PartitionOrDefault(), ""),
}
cluster2UID := UpstreamID{
Name: "db",
Peer: "cluster-02",
EnterpriseMeta: acl.NewEnterpriseMetaWithPartition(dbUID.PartitionOrDefault(), ""),
}
chainID := makeChainID(structs.DiscoveryTargetOpts{Service: "db-v2"})
events = append(events,
UpdateEvent{
CorrelationID: "upstream-target:" + chainID + ":" + dbUID.String(),
Result: &structs.IndexedCheckServiceNodes{
Nodes: TestUpstreamNodesAlternate(t),
},
},
UpdateEvent{
CorrelationID: "peer-trust-bundle:cluster-01",
Result: &pbpeering.TrustBundleReadResponse{
Bundle: &pbpeering.PeeringTrustBundle{
PeerName: "peer1",
TrustDomain: "peer1.domain",
ExportedPartition: "peer1ap",
RootPEMs: []string{"peer1-root-1"},
},
},
},
UpdateEvent{
CorrelationID: "peer-trust-bundle:cluster-02",
Result: &pbpeering.TrustBundleReadResponse{
Bundle: &pbpeering.PeeringTrustBundle{
PeerName: "peer2",
TrustDomain: "peer2.domain",
ExportedPartition: "peer2ap",
RootPEMs: []string{"peer2-root-2"},
},
},
},
UpdateEvent{
CorrelationID: "upstream-peer:" + cluster1UID.String(),
Result: &structs.IndexedCheckServiceNodes{
Nodes: structs.CheckServiceNodes{structs.TestCheckNodeServiceWithNameInPeer(t, "db", "dc1", "cluster-01", "10.40.1.1", false, cluster1UID.EnterpriseMeta)},
},
},
UpdateEvent{
CorrelationID: "upstream-peer:" + cluster2UID.String(),
Result: &structs.IndexedCheckServiceNodes{
Nodes: structs.CheckServiceNodes{structs.TestCheckNodeServiceWithNameInPeer(t, "db", "dc2", "cluster-02", "10.40.1.2", false, cluster2UID.EnterpriseMeta)},
},
},
)
case "failover-to-cluster-peer":
uid := UpstreamID{
Name: "db",
Peer: "cluster-01",
EnterpriseMeta: acl.NewEnterpriseMetaWithPartition(dbUID.PartitionOrDefault(), ""),
}
events = append(events, UpdateEvent{
CorrelationID: "peer-trust-bundle:cluster-01",
Result: &pbpeering.TrustBundleReadResponse{
@ -109,10 +170,6 @@ func setupTestVariationConfigEntriesAndSnapshot(
},
},
})
uid := UpstreamID{
Name: "db",
Peer: "cluster-01",
}
if enterprise {
uid.EnterpriseMeta = acl.NewEnterpriseMetaWithPartition(dbUID.PartitionOrDefault(), "ns9")
}
@ -270,6 +327,7 @@ func setupTestVariationDiscoveryChain(
) *structs.CompiledDiscoveryChain {
// Compile a chain.
var (
peers []*pbpeering.Peering
entries []structs.ConfigEntry
compileSetup func(req *discoverychain.CompileRequest)
)
@ -374,6 +432,49 @@ func setupTestVariationDiscoveryChain(
},
},
)
case "order-by-locality-failover":
peers = append(peers,
&pbpeering.Peering{
Name: "cluster-01",
Remote: &pbpeering.RemoteInfo{
Locality: &pbcommon.Locality{Region: "us-west-1"},
},
},
&pbpeering.Peering{
Name: "cluster-02",
Remote: &pbpeering.RemoteInfo{
Locality: &pbcommon.Locality{Region: "us-west-2"},
},
})
cluster1Target := structs.ServiceResolverFailoverTarget{
Peer: "cluster-01",
}
cluster2Target := structs.ServiceResolverFailoverTarget{
Peer: "cluster-02",
}
entries = append(entries,
&structs.ServiceResolverConfigEntry{
Kind: structs.ServiceResolver,
Name: "db",
EnterpriseMeta: entMeta,
ConnectTimeout: 33 * time.Second,
RequestTimeout: 33 * time.Second,
Failover: map[string]structs.ServiceResolverFailover{
"*": {
Policy: &structs.ServiceResolverFailoverPolicy{
Mode: "order-by-locality",
Regions: []string{"us-west-2", "us-west-1"},
},
Targets: []structs.ServiceResolverFailoverTarget{
cluster1Target,
cluster2Target,
{Service: "db-v2"},
},
},
},
},
)
case "redirect-to-cluster-peer":
redirect := &structs.ServiceResolverRedirect{
Peer: "cluster-01",
@ -930,7 +1031,12 @@ func setupTestVariationDiscoveryChain(
entries = append(entries, additionalEntries...)
}
return discoverychain.TestCompileConfigEntries(t, "db", entMeta.NamespaceOrDefault(), entMeta.PartitionOrDefault(), "dc1", connect.TestClusterID+".consul", compileSetup, entries...)
set := configentry.NewDiscoveryChainSet()
set.AddEntries(entries...)
set.AddPeers(peers...)
return discoverychain.TestCompileConfigEntries(t, "db", entMeta.NamespaceOrDefault(), entMeta.PartitionOrDefault(), "dc1", connect.TestClusterID+".consul", compileSetup, set)
}
func httpMatch(http *structs.ServiceRouteHTTPMatch) *structs.ServiceRouteMatch {

View File

@ -1114,7 +1114,7 @@ func (e *ServiceResolverConfigEntry) Validate() error {
}
if !f.Policy.isValid() {
return fmt.Errorf("Bad Failover[%q]: Policy must be one of '', 'default', or 'order-by-locality'", subset)
return fmt.Errorf("Bad Failover[%q]: Policy must be one of '', 'sequential', or 'order-by-locality'", subset)
}
if f.ServiceSubset != "" {
@ -1437,8 +1437,9 @@ type ServiceResolverFailover struct {
type ServiceResolverFailoverPolicy struct {
// Mode specifies the type of failover that will be performed. Valid values are
// "default", "" (equivalent to "default") and "order-by-locality".
// "sequential", "" (equivalent to "sequential") and "order-by-locality".
Mode string `json:",omitempty"`
Regions []string `json:",omitempty"`
}
func (f *ServiceResolverFailover) ToDiscoveryTargetOpts() DiscoveryTargetOpts {
@ -1465,7 +1466,7 @@ func (fp *ServiceResolverFailoverPolicy) isValid() bool {
switch fp.Mode {
case "":
case "default":
case "sequential":
case "order-by-locality":
default:
return false

View File

@ -183,6 +183,7 @@ type DiscoverySplit struct {
type DiscoveryFailover struct {
Targets []string `json:",omitempty"`
Policy *ServiceResolverFailoverPolicy `json:",omitempty"`
Regions []string `json:",omitempty"`
}
// DiscoveryTarget represents all of the inputs necessary to use a resolver

View File

@ -184,6 +184,14 @@ func (o *DiscoveryFailover) DeepCopy() *DiscoveryFailover {
if o.Policy != nil {
cp.Policy = new(ServiceResolverFailoverPolicy)
*cp.Policy = *o.Policy
if o.Policy.Regions != nil {
cp.Policy.Regions = make([]string, len(o.Policy.Regions))
copy(cp.Policy.Regions, o.Policy.Regions)
}
}
if o.Regions != nil {
cp.Regions = make([]string, len(o.Regions))
copy(cp.Regions, o.Regions)
}
return &cp
}
@ -917,6 +925,10 @@ func (o *ServiceResolverFailover) DeepCopy() *ServiceResolverFailover {
if o.Policy != nil {
cp.Policy = new(ServiceResolverFailoverPolicy)
*cp.Policy = *o.Policy
if o.Policy.Regions != nil {
cp.Policy.Regions = make([]string, len(o.Policy.Regions))
copy(cp.Policy.Regions, o.Policy.Regions)
}
}
return &cp
}

View File

@ -3058,3 +3058,10 @@ func (l *Locality) ToAPI() *api.Locality {
Zone: l.Zone,
}
}
func (l *Locality) GetRegion() string {
if l == nil {
return ""
}
return l.Region
}

View File

@ -1268,9 +1268,14 @@ func (s *ResourceGenerator) makeUpstreamClustersForDiscoveryChain(
return nil, err
}
targetGroups, err := mappedTargets.groupedTargets()
if err != nil {
return nil, err
}
var failoverClusterNames []string
if mappedTargets.failover {
for _, targetGroup := range mappedTargets.groupedTargets() {
for _, targetGroup := range targetGroups {
failoverClusterNames = append(failoverClusterNames, targetGroup.ClusterName)
}
@ -1298,7 +1303,7 @@ func (s *ResourceGenerator) makeUpstreamClustersForDiscoveryChain(
}
// Construct the target clusters.
for _, groupedTarget := range mappedTargets.groupedTargets() {
for _, groupedTarget := range targetGroups {
s.Logger.Debug("generating cluster for", "cluster", groupedTarget.ClusterName)
c := &envoy_cluster_v3.Cluster{
Name: groupedTarget.ClusterName,
@ -1881,124 +1886,3 @@ func (s *ResourceGenerator) getTargetClusterName(upstreamsSnapshot *proxycfg.Con
}
return clusterName
}
type discoChainTargets struct {
baseClusterName string
targets []targetInfo
failover bool
}
type targetInfo struct {
TargetID string
TLSContext *envoy_tls_v3.UpstreamTlsContext
}
type discoChainTargetGroup struct {
Targets []targetInfo
ClusterName string
}
func (ft discoChainTargets) groupedTargets() []discoChainTargetGroup {
var targetGroups []discoChainTargetGroup
if !ft.failover {
targetGroups = append(targetGroups, discoChainTargetGroup{
ClusterName: ft.baseClusterName,
Targets: ft.targets,
})
return targetGroups
}
for i, t := range ft.targets {
targetGroups = append(targetGroups, discoChainTargetGroup{
ClusterName: fmt.Sprintf("%s%d~%s", failoverClusterNamePrefix, i, ft.baseClusterName),
Targets: []targetInfo{t},
})
}
return targetGroups
}
func (s *ResourceGenerator) mapDiscoChainTargets(cfgSnap *proxycfg.ConfigSnapshot, chain *structs.CompiledDiscoveryChain, node *structs.DiscoveryGraphNode, upstreamConfig structs.UpstreamConfig, forMeshGateway bool) (discoChainTargets, error) {
failoverTargets := discoChainTargets{}
if node.Resolver == nil {
return discoChainTargets{}, fmt.Errorf("impossible to process a non-resolver node")
}
primaryTargetID := node.Resolver.Target
upstreamsSnapshot, err := cfgSnap.ToConfigSnapshotUpstreams()
if err != nil && !forMeshGateway {
return discoChainTargets{}, err
}
failoverTargets.baseClusterName = s.getTargetClusterName(upstreamsSnapshot, chain, primaryTargetID, forMeshGateway, false)
tids := []string{primaryTargetID}
failover := node.Resolver.Failover
if failover != nil && !forMeshGateway {
tids = append(tids, failover.Targets...)
failoverTargets.failover = true
}
for _, tid := range tids {
target := chain.Targets[tid]
var sni, rootPEMs string
var spiffeIDs []string
targetUID := proxycfg.NewUpstreamIDFromTargetID(tid)
ti := targetInfo{TargetID: tid}
configureTLS := true
if forMeshGateway {
// We only initiate TLS if we're doing an L7 proxy.
configureTLS = structs.IsProtocolHTTPLike(upstreamConfig.Protocol)
}
if !configureTLS {
failoverTargets.targets = append(failoverTargets.targets, ti)
continue
}
if targetUID.Peer != "" {
tbs, _ := upstreamsSnapshot.UpstreamPeerTrustBundles.Get(targetUID.Peer)
rootPEMs = tbs.ConcatenatedRootPEMs()
peerMeta, found := upstreamsSnapshot.UpstreamPeerMeta(targetUID)
if !found {
s.Logger.Warn("failed to fetch upstream peering metadata", "target", targetUID)
continue
}
sni = peerMeta.PrimarySNI()
spiffeIDs = peerMeta.SpiffeID
} else {
sni = target.SNI
rootPEMs = cfgSnap.RootPEMs()
spiffeIDs = []string{connect.SpiffeIDService{
Host: cfgSnap.Roots.TrustDomain,
Namespace: target.Namespace,
Partition: target.Partition,
Datacenter: target.Datacenter,
Service: target.Service,
}.URI().String()}
}
commonTLSContext := makeCommonTLSContext(
cfgSnap.Leaf(),
rootPEMs,
makeTLSParametersFromProxyTLSConfig(cfgSnap.MeshConfigTLSOutgoing()),
)
err := injectSANMatcher(commonTLSContext, spiffeIDs...)
if err != nil {
return failoverTargets, fmt.Errorf("failed to inject SAN matcher rules for cluster %q: %v", sni, err)
}
tlsContext := &envoy_tls_v3.UpstreamTlsContext{
CommonTlsContext: commonTLSContext,
Sni: sni,
}
ti.TLSContext = tlsContext
failoverTargets.targets = append(failoverTargets.targets, ti)
}
return failoverTargets, nil
}

View File

@ -678,7 +678,12 @@ func (s *ResourceGenerator) endpointsFromDiscoveryChain(
return nil, err
}
for _, groupedTarget := range mappedTargets.groupedTargets() {
targetGroups, err := mappedTargets.groupedTargets()
if err != nil {
return nil, err
}
for _, groupedTarget := range targetGroups {
clusterName := groupedTarget.ClusterName
if escapeHatchCluster != nil {
clusterName = escapeHatchCluster.Name

View File

@ -1,41 +0,0 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
package xds
import (
envoy_core_v3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
"github.com/hashicorp/consul/agent/structs"
)
func firstHealthyTarget(
targets map[string]*structs.DiscoveryTarget,
targetHealth map[string]structs.CheckServiceNodes,
primaryTarget string,
secondaryTargets []string,
) string {
series := make([]string, 0, len(secondaryTargets)+1)
series = append(series, primaryTarget)
series = append(series, secondaryTargets...)
for _, targetID := range series {
target, ok := targets[targetID]
if !ok {
continue
}
endpoints, ok := targetHealth[targetID]
if !ok {
continue
}
for _, ep := range endpoints {
healthStatus, _ := calculateEndpointHealthAndWeight(ep, target.Subset.OnlyPassing)
if healthStatus == envoy_core_v3.HealthStatus_HEALTHY {
return targetID
}
}
}
return primaryTarget // if everything is broken just use the primary for now
}

View File

@ -1,145 +0,0 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
package xds
import (
"fmt"
"testing"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/structs"
)
func TestFirstHealthyTarget(t *testing.T) {
passing := proxycfg.TestUpstreamNodesInStatus(t, "passing")
warning := proxycfg.TestUpstreamNodesInStatus(t, "warning")
critical := proxycfg.TestUpstreamNodesInStatus(t, "critical")
warnOnlyPassingTarget := structs.NewDiscoveryTarget(structs.DiscoveryTargetOpts{
Service: "all-warn",
Namespace: "default",
Partition: "default",
Datacenter: "dc1",
})
warnOnlyPassingTarget.Subset.OnlyPassing = true
failOnlyPassingTarget := structs.NewDiscoveryTarget(structs.DiscoveryTargetOpts{
Service: "all-fail",
Namespace: "default",
Partition: "default",
Datacenter: "dc1",
})
failOnlyPassingTarget.Subset.OnlyPassing = true
targets := map[string]*structs.DiscoveryTarget{
"all-ok.default.dc1": structs.NewDiscoveryTarget(structs.DiscoveryTargetOpts{
Service: "all-ok",
Namespace: "default",
Partition: "default",
Datacenter: "dc1",
}),
"all-warn.default.dc1": structs.NewDiscoveryTarget(structs.DiscoveryTargetOpts{
Service: "all-warn",
Namespace: "default",
Partition: "default",
Datacenter: "dc1",
}),
"all-fail.default.default.dc1": structs.NewDiscoveryTarget(structs.DiscoveryTargetOpts{
Service: "all-fail",
Namespace: "default",
Partition: "default",
Datacenter: "dc1",
}),
"all-warn-onlypassing.default.dc1": warnOnlyPassingTarget,
"all-fail-onlypassing.default.dc1": failOnlyPassingTarget,
}
targetHealth := map[string]structs.CheckServiceNodes{
"all-ok.default.dc1": passing,
"all-warn.default.dc1": warning,
"all-fail.default.default.dc1": critical,
"all-warn-onlypassing.default.dc1": warning,
"all-fail-onlypassing.default.dc1": critical,
}
cases := []struct {
primary string
secondary []string
expect string
}{
{
primary: "all-ok.default.dc1",
expect: "all-ok.default.dc1",
},
{
primary: "all-warn.default.dc1",
expect: "all-warn.default.dc1",
},
{
primary: "all-fail.default.default.dc1",
expect: "all-fail.default.default.dc1",
},
{
primary: "all-warn-onlypassing.default.dc1",
expect: "all-warn-onlypassing.default.dc1",
},
{
primary: "all-fail-onlypassing.default.dc1",
expect: "all-fail-onlypassing.default.dc1",
},
{
primary: "all-ok.default.dc1",
secondary: []string{
"all-warn.default.dc1",
},
expect: "all-ok.default.dc1",
},
{
primary: "all-warn.default.dc1",
secondary: []string{
"all-ok.default.dc1",
},
expect: "all-warn.default.dc1",
},
{
primary: "all-warn-onlypassing.default.dc1",
secondary: []string{
"all-ok.default.dc1",
},
expect: "all-ok.default.dc1",
},
{
primary: "all-fail.default.default.dc1",
secondary: []string{
"all-ok.default.dc1",
},
expect: "all-ok.default.dc1",
},
{
primary: "all-fail-onlypassing.default.dc1",
secondary: []string{
"all-ok.default.dc1",
},
expect: "all-ok.default.dc1",
},
{
primary: "all-fail.default.default.dc1",
secondary: []string{
"all-warn-onlypassing.default.dc1",
"all-warn.default.dc1",
"all-ok.default.dc1",
},
expect: "all-warn.default.dc1",
},
}
for _, tc := range cases {
tc := tc
name := fmt.Sprintf("%s and %v", tc.primary, tc.secondary)
t.Run(name, func(t *testing.T) {
targetID := firstHealthyTarget(targets, targetHealth, tc.primary, tc.secondary)
require.Equal(t, tc.expect, targetID)
})
}
}

View File

@ -0,0 +1,154 @@
package xds
import (
"fmt"
envoy_tls_v3 "github.com/envoyproxy/go-control-plane/envoy/extensions/transport_sockets/tls/v3"
"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/structs"
)
type discoChainTargets struct {
baseClusterName string
targets []targetInfo
failover bool
failoverPolicy structs.ServiceResolverFailoverPolicy
}
type targetInfo struct {
TargetID string
TLSContext *envoy_tls_v3.UpstreamTlsContext
// Region is the region from the failover target's Locality. nil means the
// target is in the local Consul cluster.
Region *string
}
type discoChainTargetGroup struct {
Targets []targetInfo
ClusterName string
}
func (ft discoChainTargets) groupedTargets() ([]discoChainTargetGroup, error) {
var targetGroups []discoChainTargetGroup
if !ft.failover {
targetGroups = append(targetGroups, discoChainTargetGroup{
ClusterName: ft.baseClusterName,
Targets: ft.targets,
})
return targetGroups, nil
}
switch ft.failoverPolicy.Mode {
case "sequential", "":
return ft.sequential()
case "order-by-locality":
return ft.orderByLocality()
default:
return targetGroups, fmt.Errorf("unexpected failover policy")
}
}
func (s *ResourceGenerator) mapDiscoChainTargets(cfgSnap *proxycfg.ConfigSnapshot, chain *structs.CompiledDiscoveryChain, node *structs.DiscoveryGraphNode, upstreamConfig structs.UpstreamConfig, forMeshGateway bool) (discoChainTargets, error) {
failoverTargets := discoChainTargets{}
if node.Resolver == nil {
return discoChainTargets{}, fmt.Errorf("impossible to process a non-resolver node")
}
primaryTargetID := node.Resolver.Target
upstreamsSnapshot, err := cfgSnap.ToConfigSnapshotUpstreams()
if err != nil && !forMeshGateway {
return discoChainTargets{}, err
}
failoverTargets.baseClusterName = s.getTargetClusterName(upstreamsSnapshot, chain, primaryTargetID, forMeshGateway, false)
tids := []string{primaryTargetID}
failover := node.Resolver.Failover
if failover != nil && !forMeshGateway {
tids = append(tids, failover.Targets...)
failoverTargets.failover = true
if failover.Policy == nil {
failoverTargets.failoverPolicy = structs.ServiceResolverFailoverPolicy{}
} else {
failoverTargets.failoverPolicy = *failover.Policy
}
}
for _, tid := range tids {
target := chain.Targets[tid]
var sni, rootPEMs string
var spiffeIDs []string
targetUID := proxycfg.NewUpstreamIDFromTargetID(tid)
ti := targetInfo{TargetID: tid}
configureTLS := true
if forMeshGateway {
// We only initiate TLS if we're doing an L7 proxy.
configureTLS = structs.IsProtocolHTTPLike(upstreamConfig.Protocol)
}
if !configureTLS {
failoverTargets.targets = append(failoverTargets.targets, ti)
continue
}
if targetUID.Peer != "" {
tbs, _ := upstreamsSnapshot.UpstreamPeerTrustBundles.Get(targetUID.Peer)
rootPEMs = tbs.ConcatenatedRootPEMs()
peerMeta, found := upstreamsSnapshot.UpstreamPeerMeta(targetUID)
if !found {
s.Logger.Warn("failed to fetch upstream peering metadata", "target", targetUID)
continue
}
sni = peerMeta.PrimarySNI()
spiffeIDs = peerMeta.SpiffeID
region := target.Locality.GetRegion()
ti.Region = &region
} else {
sni = target.SNI
rootPEMs = cfgSnap.RootPEMs()
spiffeIDs = []string{connect.SpiffeIDService{
Host: cfgSnap.Roots.TrustDomain,
Namespace: target.Namespace,
Partition: target.Partition,
Datacenter: target.Datacenter,
Service: target.Service,
}.URI().String()}
}
commonTLSContext := makeCommonTLSContext(
cfgSnap.Leaf(),
rootPEMs,
makeTLSParametersFromProxyTLSConfig(cfgSnap.MeshConfigTLSOutgoing()),
)
err := injectSANMatcher(commonTLSContext, spiffeIDs...)
if err != nil {
return failoverTargets, fmt.Errorf("failed to inject SAN matcher rules for cluster %q: %v", sni, err)
}
tlsContext := &envoy_tls_v3.UpstreamTlsContext{
CommonTlsContext: commonTLSContext,
Sni: sni,
}
ti.TLSContext = tlsContext
failoverTargets.targets = append(failoverTargets.targets, ti)
}
return failoverTargets, nil
}
func (ft discoChainTargets) sequential() ([]discoChainTargetGroup, error) {
var targetGroups []discoChainTargetGroup
for i, t := range ft.targets {
targetGroups = append(targetGroups, discoChainTargetGroup{
ClusterName: fmt.Sprintf("%s%d~%s", failoverClusterNamePrefix, i, ft.baseClusterName),
Targets: []targetInfo{t},
})
}
return targetGroups, nil
}

View File

@ -0,0 +1,12 @@
//go:build !consulent
// +build !consulent
package xds
import (
"fmt"
)
func (ft discoChainTargets) orderByLocality() ([]discoChainTargetGroup, error) {
return nil, fmt.Errorf("order-by-locality is a Consul Enterprise feature")
}

View File

@ -332,7 +332,7 @@ func getAPIGatewayGoldenTestCases(t *testing.T) []goldenTestCase {
service := structs.NewServiceName("service", nil)
serviceUID := proxycfg.NewUpstreamIDFromServiceName(service)
serviceChain := discoverychain.TestCompileConfigEntries(t, "service", "default", "default", "dc1", connect.TestClusterID+".consul", nil)
serviceChain := discoverychain.TestCompileConfigEntries(t, "service", "default", "default", "dc1", connect.TestClusterID+".consul", nil, nil)
return []goldenTestCase{
{

View File

@ -1447,12 +1447,14 @@ func ServiceResolverFailoverPolicyToStructs(s *ServiceResolverFailoverPolicy, t
return
}
t.Mode = s.Mode
t.Regions = s.Regions
}
func ServiceResolverFailoverPolicyFromStructs(t *structs.ServiceResolverFailoverPolicy, s *ServiceResolverFailoverPolicy) {
if s == nil {
return
}
s.Mode = t.Mode
s.Regions = t.Regions
}
func ServiceResolverFailoverTargetToStructs(s *ServiceResolverFailoverTarget, t *structs.ServiceResolverFailoverTarget) {
if s == nil {

File diff suppressed because it is too large Load Diff

View File

@ -175,6 +175,7 @@ message ServiceResolverFailover {
// name=Structs
message ServiceResolverFailoverPolicy {
string Mode = 1;
repeated string Regions = 2;
}
// mog annotation: