mirror of
https://github.com/status-im/consul.git
synced 2025-01-26 05:29:55 +00:00
8e22d80e35
Failover is pushed entirely down to the data plane by creating envoy clusters and putting each successive destination in a different load assignment priority band. For example this shows that normally requests go to 1.2.3.4:8080 but when that fails they go to 6.7.8.9:8080: - name: foo load_assignment: cluster_name: foo policy: overprovisioning_factor: 100000 endpoints: - priority: 0 lb_endpoints: - endpoint: address: socket_address: address: 1.2.3.4 port_value: 8080 - priority: 1 lb_endpoints: - endpoint: address: socket_address: address: 6.7.8.9 port_value: 8080 Mesh gateways route requests based solely on the SNI header tacked onto the TLS layer. Envoy currently only lets you configure the outbound SNI header at the cluster layer. If you try to failover through a mesh gateway you ideally would configure the SNI value per endpoint, but that's not possible in envoy today. This PR introduces a simpler way around the problem for now: 1. We identify any target of failover that will use mesh gateway mode local or remote and then further isolate any resolver node in the compiled discovery chain that has a failover destination set to one of those targets. 2. For each of these resolvers we will perform a small measurement of comparative healths of the endpoints that come back from the health API for the set of primary target and serial failover targets. We walk the list of targets in order and if any endpoint is healthy we return that target, otherwise we move on to the next target. 3. The CDS and EDS endpoints both perform the measurements in (2) for the affected resolver nodes. 4. For CDS this measurement selects which TLS SNI field to use for the cluster (note the cluster is always going to be named for the primary target) 5. For EDS this measurement selects which set of endpoints will populate the cluster. Priority tiered failover is ignored. One of the big downsides to this approach to failover is that the failover detection and correction is going to be controlled by consul rather than deferring that entirely to the data plane as with the prior version. This also means that we are bound to only failover using official health signals and cannot make use of data plane signals like outlier detection to affect failover. In this specific scenario the lack of data plane signals is ok because the effectiveness is already muted by the fact that the ultimate destination endpoints will have their data plane signals scrambled when they pass through the mesh gateway wrapper anyway so we're not losing much. Another related fix is that we now use the endpoint health from the underlying service, not the health of the gateway (regardless of failover mode).
516 lines
15 KiB
Go
516 lines
15 KiB
Go
package proxycfg
|
|
|
|
import (
|
|
"log"
|
|
"os"
|
|
"path"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/mitchellh/copystructure"
|
|
"github.com/stretchr/testify/require"
|
|
|
|
"github.com/hashicorp/consul/agent/cache"
|
|
cachetype "github.com/hashicorp/consul/agent/cache-types"
|
|
"github.com/hashicorp/consul/agent/consul/discoverychain"
|
|
"github.com/hashicorp/consul/agent/local"
|
|
"github.com/hashicorp/consul/agent/structs"
|
|
"github.com/hashicorp/consul/agent/token"
|
|
)
|
|
|
|
// assertLastReqArgs verifies that each request type had the correct source
|
|
// parameters (e.g. Datacenter name) and token.
|
|
func assertLastReqArgs(t *testing.T, types *TestCacheTypes, token string, source *structs.QuerySource) {
|
|
t.Helper()
|
|
// Roots needs correct DC and token
|
|
rootReq := types.roots.lastReq.Load()
|
|
require.IsType(t, rootReq, &structs.DCSpecificRequest{})
|
|
require.Equal(t, token, rootReq.(*structs.DCSpecificRequest).Token)
|
|
require.Equal(t, source.Datacenter, rootReq.(*structs.DCSpecificRequest).Datacenter)
|
|
|
|
// Leaf needs correct DC and token
|
|
leafReq := types.leaf.lastReq.Load()
|
|
require.IsType(t, leafReq, &cachetype.ConnectCALeafRequest{})
|
|
require.Equal(t, token, leafReq.(*cachetype.ConnectCALeafRequest).Token)
|
|
require.Equal(t, source.Datacenter, leafReq.(*cachetype.ConnectCALeafRequest).Datacenter)
|
|
|
|
// Intentions needs correct DC and token
|
|
intReq := types.intentions.lastReq.Load()
|
|
require.IsType(t, intReq, &structs.IntentionQueryRequest{})
|
|
require.Equal(t, token, intReq.(*structs.IntentionQueryRequest).Token)
|
|
require.Equal(t, source.Datacenter, intReq.(*structs.IntentionQueryRequest).Datacenter)
|
|
}
|
|
|
|
func TestManager_BasicLifecycle(t *testing.T) {
|
|
// Create a bunch of common data for the various test cases.
|
|
roots, leaf := TestCerts(t)
|
|
|
|
dbDefaultChain := func() *structs.CompiledDiscoveryChain {
|
|
return discoverychain.TestCompileConfigEntries(t, "db", "default", "dc1", "dc1",
|
|
func(req *discoverychain.CompileRequest) {
|
|
// This is because structs.TestUpstreams uses an opaque config
|
|
// to override connect timeouts.
|
|
req.OverrideConnectTimeout = 1 * time.Second
|
|
},
|
|
&structs.ServiceResolverConfigEntry{
|
|
Kind: structs.ServiceResolver,
|
|
Name: "db",
|
|
},
|
|
)
|
|
}
|
|
dbSplitChain := func() *structs.CompiledDiscoveryChain {
|
|
return discoverychain.TestCompileConfigEntries(t, "db", "default", "dc1", "dc1",
|
|
func(req *discoverychain.CompileRequest) {
|
|
// This is because structs.TestUpstreams uses an opaque config
|
|
// to override connect timeouts.
|
|
req.OverrideConnectTimeout = 1 * time.Second
|
|
},
|
|
&structs.ProxyConfigEntry{
|
|
Kind: structs.ProxyDefaults,
|
|
Name: structs.ProxyConfigGlobal,
|
|
Config: map[string]interface{}{
|
|
"protocol": "http",
|
|
},
|
|
},
|
|
&structs.ServiceResolverConfigEntry{
|
|
Kind: structs.ServiceResolver,
|
|
Name: "db",
|
|
Subsets: map[string]structs.ServiceResolverSubset{
|
|
"v1": {
|
|
Filter: "Service.Meta.version == v1",
|
|
},
|
|
"v2": {
|
|
Filter: "Service.Meta.version == v2",
|
|
},
|
|
},
|
|
},
|
|
&structs.ServiceSplitterConfigEntry{
|
|
Kind: structs.ServiceSplitter,
|
|
Name: "db",
|
|
Splits: []structs.ServiceSplit{
|
|
{Weight: 60, ServiceSubset: "v1"},
|
|
{Weight: 40, ServiceSubset: "v2"},
|
|
},
|
|
},
|
|
)
|
|
}
|
|
webProxy := &structs.NodeService{
|
|
Kind: structs.ServiceKindConnectProxy,
|
|
ID: "web-sidecar-proxy",
|
|
Service: "web-sidecar-proxy",
|
|
Port: 9999,
|
|
Proxy: structs.ConnectProxyConfig{
|
|
DestinationServiceID: "web",
|
|
DestinationServiceName: "web",
|
|
LocalServiceAddress: "127.0.0.1",
|
|
LocalServicePort: 8080,
|
|
Config: map[string]interface{}{
|
|
"foo": "bar",
|
|
},
|
|
Upstreams: structs.TestUpstreams(t),
|
|
},
|
|
}
|
|
|
|
rootsCacheKey := testGenCacheKey(&structs.DCSpecificRequest{
|
|
Datacenter: "dc1",
|
|
QueryOptions: structs.QueryOptions{Token: "my-token"},
|
|
})
|
|
leafCacheKey := testGenCacheKey(&cachetype.ConnectCALeafRequest{
|
|
Datacenter: "dc1",
|
|
Token: "my-token",
|
|
Service: "web",
|
|
})
|
|
intentionCacheKey := testGenCacheKey(&structs.IntentionQueryRequest{
|
|
Datacenter: "dc1",
|
|
QueryOptions: structs.QueryOptions{Token: "my-token"},
|
|
Match: &structs.IntentionQueryMatch{
|
|
Type: structs.IntentionMatchDestination,
|
|
Entries: []structs.IntentionMatchEntry{
|
|
{
|
|
Namespace: structs.IntentionDefaultNamespace,
|
|
Name: "web",
|
|
},
|
|
},
|
|
},
|
|
})
|
|
|
|
dbChainCacheKey := testGenCacheKey(&structs.DiscoveryChainRequest{
|
|
Name: "db",
|
|
EvaluateInDatacenter: "dc1",
|
|
EvaluateInNamespace: "default",
|
|
// This is because structs.TestUpstreams uses an opaque config
|
|
// to override connect timeouts.
|
|
OverrideConnectTimeout: 1 * time.Second,
|
|
Datacenter: "dc1",
|
|
QueryOptions: structs.QueryOptions{Token: "my-token"},
|
|
})
|
|
|
|
dbHealthCacheKey := testGenCacheKey(&structs.ServiceSpecificRequest{
|
|
Datacenter: "dc1",
|
|
QueryOptions: structs.QueryOptions{Token: "my-token", Filter: ""},
|
|
ServiceName: "db",
|
|
Connect: true,
|
|
})
|
|
db_v1_HealthCacheKey := testGenCacheKey(&structs.ServiceSpecificRequest{
|
|
Datacenter: "dc1",
|
|
QueryOptions: structs.QueryOptions{Token: "my-token",
|
|
Filter: "Service.Meta.version == v1",
|
|
},
|
|
ServiceName: "db",
|
|
Connect: true,
|
|
})
|
|
db_v2_HealthCacheKey := testGenCacheKey(&structs.ServiceSpecificRequest{
|
|
Datacenter: "dc1",
|
|
QueryOptions: structs.QueryOptions{Token: "my-token",
|
|
Filter: "Service.Meta.version == v2",
|
|
},
|
|
ServiceName: "db",
|
|
Connect: true,
|
|
})
|
|
|
|
// Create test cases using some of the common data above.
|
|
tests := []*testcase_BasicLifecycle{
|
|
{
|
|
name: "simple-default-resolver",
|
|
setup: func(t *testing.T, types *TestCacheTypes) {
|
|
// Note that we deliberately leave the 'geo-cache' prepared query to time out
|
|
types.health.Set(dbHealthCacheKey, &structs.IndexedCheckServiceNodes{
|
|
Nodes: TestUpstreamNodes(t),
|
|
})
|
|
types.compiledChain.Set(dbChainCacheKey, &structs.DiscoveryChainResponse{
|
|
Chain: dbDefaultChain(),
|
|
})
|
|
},
|
|
expectSnap: &ConfigSnapshot{
|
|
Kind: structs.ServiceKindConnectProxy,
|
|
Service: webProxy.Service,
|
|
ProxyID: webProxy.ID,
|
|
Address: webProxy.Address,
|
|
Port: webProxy.Port,
|
|
Proxy: webProxy.Proxy,
|
|
TaggedAddresses: make(map[string]structs.ServiceAddress),
|
|
Roots: roots,
|
|
ConnectProxy: configSnapshotConnectProxy{
|
|
Leaf: leaf,
|
|
DiscoveryChain: map[string]*structs.CompiledDiscoveryChain{
|
|
"db": dbDefaultChain(),
|
|
},
|
|
WatchedUpstreams: nil, // Clone() clears this out
|
|
WatchedUpstreamEndpoints: map[string]map[string]structs.CheckServiceNodes{
|
|
"db": {
|
|
"db.default.dc1": TestUpstreamNodes(t),
|
|
},
|
|
},
|
|
WatchedGateways: nil, // Clone() clears this out
|
|
WatchedGatewayEndpoints: map[string]map[string]structs.CheckServiceNodes{
|
|
"db": {},
|
|
},
|
|
UpstreamEndpoints: map[string]structs.CheckServiceNodes{},
|
|
},
|
|
Datacenter: "dc1",
|
|
},
|
|
},
|
|
{
|
|
name: "chain-resolver-with-version-split",
|
|
setup: func(t *testing.T, types *TestCacheTypes) {
|
|
// Note that we deliberately leave the 'geo-cache' prepared query to time out
|
|
types.health.Set(db_v1_HealthCacheKey, &structs.IndexedCheckServiceNodes{
|
|
Nodes: TestUpstreamNodes(t),
|
|
})
|
|
types.health.Set(db_v2_HealthCacheKey, &structs.IndexedCheckServiceNodes{
|
|
Nodes: TestUpstreamNodesAlternate(t),
|
|
})
|
|
types.compiledChain.Set(dbChainCacheKey, &structs.DiscoveryChainResponse{
|
|
Chain: dbSplitChain(),
|
|
})
|
|
},
|
|
expectSnap: &ConfigSnapshot{
|
|
Kind: structs.ServiceKindConnectProxy,
|
|
Service: webProxy.Service,
|
|
ProxyID: webProxy.ID,
|
|
Address: webProxy.Address,
|
|
Port: webProxy.Port,
|
|
Proxy: webProxy.Proxy,
|
|
TaggedAddresses: make(map[string]structs.ServiceAddress),
|
|
Roots: roots,
|
|
ConnectProxy: configSnapshotConnectProxy{
|
|
Leaf: leaf,
|
|
DiscoveryChain: map[string]*structs.CompiledDiscoveryChain{
|
|
"db": dbSplitChain(),
|
|
},
|
|
WatchedUpstreams: nil, // Clone() clears this out
|
|
WatchedUpstreamEndpoints: map[string]map[string]structs.CheckServiceNodes{
|
|
"db": {
|
|
"v1.db.default.dc1": TestUpstreamNodes(t),
|
|
"v2.db.default.dc1": TestUpstreamNodesAlternate(t),
|
|
},
|
|
},
|
|
WatchedGateways: nil, // Clone() clears this out
|
|
WatchedGatewayEndpoints: map[string]map[string]structs.CheckServiceNodes{
|
|
"db": {},
|
|
},
|
|
UpstreamEndpoints: map[string]structs.CheckServiceNodes{},
|
|
},
|
|
Datacenter: "dc1",
|
|
},
|
|
},
|
|
}
|
|
|
|
for _, tt := range tests {
|
|
t.Run(tt.name, func(t *testing.T) {
|
|
require.NotNil(t, tt.setup)
|
|
require.NotNil(t, tt.expectSnap)
|
|
|
|
// Use a mocked cache to make life simpler
|
|
types := NewTestCacheTypes(t)
|
|
|
|
// Setup initial values
|
|
types.roots.Set(rootsCacheKey, roots)
|
|
types.leaf.Set(leafCacheKey, leaf)
|
|
types.intentions.Set(intentionCacheKey, TestIntentions(t))
|
|
tt.setup(t, types)
|
|
|
|
expectSnapCopy, err := copystructure.Copy(tt.expectSnap)
|
|
require.NoError(t, err)
|
|
|
|
webProxyCopy, err := copystructure.Copy(webProxy)
|
|
require.NoError(t, err)
|
|
|
|
testManager_BasicLifecycle(t, tt, types,
|
|
rootsCacheKey, leafCacheKey,
|
|
roots, leaf,
|
|
webProxyCopy.(*structs.NodeService),
|
|
expectSnapCopy.(*ConfigSnapshot),
|
|
)
|
|
})
|
|
}
|
|
}
|
|
|
|
type testcase_BasicLifecycle struct {
|
|
name string
|
|
setup func(t *testing.T, types *TestCacheTypes)
|
|
webProxy *structs.NodeService
|
|
expectSnap *ConfigSnapshot
|
|
}
|
|
|
|
func testManager_BasicLifecycle(
|
|
t *testing.T,
|
|
tt *testcase_BasicLifecycle,
|
|
types *TestCacheTypes,
|
|
rootsCacheKey, leafCacheKey string,
|
|
roots *structs.IndexedCARoots,
|
|
leaf *structs.IssuedCert,
|
|
webProxy *structs.NodeService,
|
|
expectSnap *ConfigSnapshot,
|
|
) {
|
|
c := TestCacheWithTypes(t, types)
|
|
|
|
require := require.New(t)
|
|
|
|
logger := log.New(os.Stderr, "", log.LstdFlags)
|
|
state := local.NewState(local.Config{}, logger, &token.Store{})
|
|
source := &structs.QuerySource{
|
|
Node: "node1",
|
|
Datacenter: "dc1",
|
|
}
|
|
|
|
// Stub state syncing
|
|
state.TriggerSyncChanges = func() {}
|
|
|
|
// Create manager
|
|
m, err := NewManager(ManagerConfig{c, state, source, logger})
|
|
require.NoError(err)
|
|
|
|
// And run it
|
|
go func() {
|
|
err := m.Run()
|
|
require.NoError(err)
|
|
}()
|
|
|
|
// BEFORE we register, we should be able to get a watch channel
|
|
wCh, cancel := m.Watch(webProxy.ID)
|
|
defer cancel()
|
|
|
|
// And it should block with nothing sent on it yet
|
|
assertWatchChanBlocks(t, wCh)
|
|
|
|
require.NoError(state.AddService(webProxy, "my-token"))
|
|
|
|
// We should see the initial config delivered but not until after the
|
|
// coalesce timeout
|
|
start := time.Now()
|
|
assertWatchChanRecvs(t, wCh, expectSnap)
|
|
require.True(time.Since(start) >= coalesceTimeout)
|
|
|
|
assertLastReqArgs(t, types, "my-token", source)
|
|
|
|
// Update NodeConfig
|
|
webProxy.Port = 7777
|
|
require.NoError(state.AddService(webProxy, "my-token"))
|
|
|
|
expectSnap.Port = 7777
|
|
assertWatchChanRecvs(t, wCh, expectSnap)
|
|
|
|
// Register a second watcher
|
|
wCh2, cancel2 := m.Watch(webProxy.ID)
|
|
defer cancel2()
|
|
|
|
// New watcher should immediately receive the current state
|
|
assertWatchChanRecvs(t, wCh2, expectSnap)
|
|
|
|
// Change token
|
|
require.NoError(state.AddService(webProxy, "other-token"))
|
|
assertWatchChanRecvs(t, wCh, expectSnap)
|
|
assertWatchChanRecvs(t, wCh2, expectSnap)
|
|
|
|
// This is actually sort of timing dependent - the cache background fetcher
|
|
// will still be fetching with the old token, but we rely on the fact that our
|
|
// mock type will have been blocked on those for a while.
|
|
assertLastReqArgs(t, types, "other-token", source)
|
|
// Update roots
|
|
newRoots, newLeaf := TestCerts(t)
|
|
newRoots.Roots = append(newRoots.Roots, roots.Roots...)
|
|
types.roots.Set(rootsCacheKey, newRoots)
|
|
|
|
// Expect new roots in snapshot
|
|
expectSnap.Roots = newRoots
|
|
assertWatchChanRecvs(t, wCh, expectSnap)
|
|
assertWatchChanRecvs(t, wCh2, expectSnap)
|
|
|
|
// Update leaf
|
|
types.leaf.Set(leafCacheKey, newLeaf)
|
|
|
|
// Expect new roots in snapshot
|
|
expectSnap.ConnectProxy.Leaf = newLeaf
|
|
assertWatchChanRecvs(t, wCh, expectSnap)
|
|
assertWatchChanRecvs(t, wCh2, expectSnap)
|
|
|
|
// Remove the proxy
|
|
state.RemoveService(webProxy.ID)
|
|
|
|
// Chan should NOT close
|
|
assertWatchChanBlocks(t, wCh)
|
|
assertWatchChanBlocks(t, wCh2)
|
|
|
|
// Re-add the proxy with another new port
|
|
webProxy.Port = 3333
|
|
require.NoError(state.AddService(webProxy, "other-token"))
|
|
|
|
// Same watch chan should be notified again
|
|
expectSnap.Port = 3333
|
|
assertWatchChanRecvs(t, wCh, expectSnap)
|
|
assertWatchChanRecvs(t, wCh2, expectSnap)
|
|
|
|
// Cancel watch
|
|
cancel()
|
|
|
|
// Watch chan should be closed
|
|
assertWatchChanRecvs(t, wCh, nil)
|
|
|
|
// We specifically don't remove the proxy or cancel the second watcher to
|
|
// ensure both are cleaned up by close.
|
|
require.NoError(m.Close())
|
|
|
|
// Sanity check the state is clean
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
require.Len(m.proxies, 0)
|
|
require.Len(m.watchers, 0)
|
|
}
|
|
|
|
func assertWatchChanBlocks(t *testing.T, ch <-chan *ConfigSnapshot) {
|
|
t.Helper()
|
|
|
|
select {
|
|
case <-ch:
|
|
t.Fatal("Should be nothing sent on watch chan yet")
|
|
default:
|
|
}
|
|
}
|
|
|
|
func assertWatchChanRecvs(t *testing.T, ch <-chan *ConfigSnapshot, expect *ConfigSnapshot) {
|
|
t.Helper()
|
|
|
|
select {
|
|
case got, ok := <-ch:
|
|
require.Equal(t, expect, got)
|
|
if expect == nil {
|
|
require.False(t, ok, "watch chan should be closed")
|
|
}
|
|
case <-time.After(100*time.Millisecond + coalesceTimeout):
|
|
t.Fatal("recv timeout")
|
|
}
|
|
}
|
|
|
|
func TestManager_deliverLatest(t *testing.T) {
|
|
// None of these need to do anything to test this method just be valid
|
|
logger := log.New(os.Stderr, "", log.LstdFlags)
|
|
cfg := ManagerConfig{
|
|
Cache: cache.New(nil),
|
|
State: local.NewState(local.Config{}, logger, &token.Store{}),
|
|
Source: &structs.QuerySource{
|
|
Node: "node1",
|
|
Datacenter: "dc1",
|
|
},
|
|
Logger: logger,
|
|
}
|
|
require := require.New(t)
|
|
|
|
m, err := NewManager(cfg)
|
|
require.NoError(err)
|
|
|
|
snap1 := &ConfigSnapshot{
|
|
ProxyID: "test-proxy",
|
|
Port: 1111,
|
|
}
|
|
snap2 := &ConfigSnapshot{
|
|
ProxyID: "test-proxy",
|
|
Port: 2222,
|
|
}
|
|
|
|
// Put an overall time limit on this test case so we don't have to guard every
|
|
// call to ensure the whole test doesn't deadlock.
|
|
time.AfterFunc(100*time.Millisecond, func() {
|
|
t.Fatal("test timed out")
|
|
})
|
|
|
|
// test 1 buffered chan
|
|
ch1 := make(chan *ConfigSnapshot, 1)
|
|
|
|
// Sending to an unblocked chan should work
|
|
m.deliverLatest(snap1, ch1)
|
|
|
|
// Check it was delivered
|
|
require.Equal(snap1, <-ch1)
|
|
|
|
// Now send both without reading simulating a slow client
|
|
m.deliverLatest(snap1, ch1)
|
|
m.deliverLatest(snap2, ch1)
|
|
|
|
// Check we got the _second_ one
|
|
require.Equal(snap2, <-ch1)
|
|
|
|
// Same again for 5-buffered chan
|
|
ch5 := make(chan *ConfigSnapshot, 5)
|
|
|
|
// Sending to an unblocked chan should work
|
|
m.deliverLatest(snap1, ch5)
|
|
|
|
// Check it was delivered
|
|
require.Equal(snap1, <-ch5)
|
|
|
|
// Now send enough to fill the chan simulating a slow client
|
|
for i := 0; i < 5; i++ {
|
|
m.deliverLatest(snap1, ch5)
|
|
}
|
|
m.deliverLatest(snap2, ch5)
|
|
|
|
// Check we got the _second_ one
|
|
require.Equal(snap2, <-ch5)
|
|
}
|
|
|
|
func testGenCacheKey(req cache.Request) string {
|
|
info := req.CacheInfo()
|
|
return path.Join(info.Key, info.Datacenter)
|
|
}
|