From 9ef3f20127fcb6f65c27b9ade0d7ebc6d71c431a Mon Sep 17 00:00:00 2001 From: "R.B. Boyer" Date: Fri, 22 Jan 2021 10:03:24 -0600 Subject: [PATCH] server: when wan federating via mesh gateways only do heuristic primary DC bypass on the leader (#9366) Fixes #9341 --- .changelog/9366.txt | 4 + agent/consul/federation_state_replication.go | 2 +- agent/consul/gateway_locator.go | 38 ++- agent/consul/gateway_locator_test.go | 296 ++++++++++++------- agent/consul/leader.go | 10 + 5 files changed, 236 insertions(+), 114 deletions(-) create mode 100644 .changelog/9366.txt diff --git a/.changelog/9366.txt b/.changelog/9366.txt new file mode 100644 index 0000000000..9f2ed0e22c --- /dev/null +++ b/.changelog/9366.txt @@ -0,0 +1,4 @@ +```release-note:bug +server: When wan federating via mesh gateways only do heuristic primary DC bypass on the leader. +``` + diff --git a/agent/consul/federation_state_replication.go b/agent/consul/federation_state_replication.go index 52af2f5dca..8d039b8147 100644 --- a/agent/consul/federation_state_replication.go +++ b/agent/consul/federation_state_replication.go @@ -39,7 +39,7 @@ func (r *FederationStateReplicator) FetchRemote(lastRemoteIndex uint64) (int, in } lenRemote, remote, remoteIndex, err := r.fetchRemote(lastRemoteIndex) if r.gatewayLocator != nil { - r.gatewayLocator.SetLastFederationStateReplicationError(err) + r.gatewayLocator.SetLastFederationStateReplicationError(err, true) } return lenRemote, remote, remoteIndex, err } diff --git a/agent/consul/gateway_locator.go b/agent/consul/gateway_locator.go index 7233266462..0846eaf073 100644 --- a/agent/consul/gateway_locator.go +++ b/agent/consul/gateway_locator.go @@ -59,11 +59,12 @@ type GatewayLocator struct { // these are a collection of measurements that factor into deciding if we // should directly dial the primary's mesh gateways or if we should try to // route through our local gateways (if they are up). - lastReplLock sync.Mutex - lastReplSuccess time.Time - lastReplFailure time.Time - lastReplSuccesses uint64 - lastReplFailures uint64 + lastReplLock sync.Mutex + lastReplSuccess time.Time + lastReplFailure time.Time + lastReplSuccesses uint64 + lastReplFailures uint64 + useReplicationSignal bool // this should be set to true on the leader } // SetLastFederationStateReplicationError is used to indicate if the federation @@ -74,14 +75,23 @@ type GatewayLocator struct { // our chosen mesh-gateway configuration can reach the primary's servers (like // a ping or status RPC) we cheat and use the federation state replicator // goroutine's success or failure as a proxy. -func (g *GatewayLocator) SetLastFederationStateReplicationError(err error) { +func (g *GatewayLocator) SetLastFederationStateReplicationError(err error, fromReplication bool) { + if g == nil { + return + } + g.lastReplLock.Lock() defer g.lastReplLock.Unlock() + oldChoice := g.dialPrimaryThroughLocalGateway() if err == nil { g.lastReplSuccess = time.Now().UTC() g.lastReplSuccesses++ g.lastReplFailures = 0 + if fromReplication { + // If we get info from replication, assume replication is operating. + g.useReplicationSignal = true + } } else { g.lastReplFailure = time.Now().UTC() g.lastReplFailures++ @@ -93,6 +103,16 @@ func (g *GatewayLocator) SetLastFederationStateReplicationError(err error) { } } +func (g *GatewayLocator) SetUseReplicationSignal(newValue bool) { + if g == nil { + return + } + + g.lastReplLock.Lock() + g.useReplicationSignal = newValue + g.lastReplLock.Unlock() +} + func (g *GatewayLocator) logPrimaryDialingMessage(useLocal bool) { if g.datacenter == g.primaryDatacenter { // These messages are useless when the server is in the primary @@ -140,6 +160,12 @@ func (g *GatewayLocator) DialPrimaryThroughLocalGateway() bool { const localFederationStateReplicatorFailuresBeforeDialingDirectly = 3 func (g *GatewayLocator) dialPrimaryThroughLocalGateway() bool { + if !g.useReplicationSignal { + // Followers should blindly assume these gateways work. The leader will + // try to bypass them and correct the replicated federation state info + // that the followers will eventually pick up on. + return true + } if g.lastReplSuccess.IsZero() && g.lastReplFailure.IsZero() { return false // no data yet } diff --git a/agent/consul/gateway_locator_test.go b/agent/consul/gateway_locator_test.go index bf33c51486..95da8ed2d8 100644 --- a/agent/consul/gateway_locator_test.go +++ b/agent/consul/gateway_locator_test.go @@ -18,6 +18,9 @@ import ( func TestGatewayLocator(t *testing.T) { state := state.NewStateStore(nil) + serverRoles := []string{"leader", "follower"} + now := time.Now().UTC() + dc1 := &structs.FederationState{ Datacenter: "dc1", MeshGateways: []structs.CheckServiceNode{ @@ -44,67 +47,105 @@ func TestGatewayLocator(t *testing.T) { } t.Run("primary - no data", func(t *testing.T) { - logger := testutil.Logger(t) - tsd := &testServerDelegate{State: state, isLeader: true} - g := NewGatewayLocator( - logger, - tsd, - "dc1", - "dc1", - ) + for _, role := range serverRoles { + t.Run(role, func(t *testing.T) { + isLeader := role == "leader" - idx, err := g.runOnce(0) - require.NoError(t, err) - assert.False(t, g.DialPrimaryThroughLocalGateway()) - assert.Equal(t, uint64(1), idx) - assert.Len(t, tsd.Calls, 1) - assert.Equal(t, []string(nil), g.listGateways(false)) - assert.Equal(t, []string(nil), g.listGateways(true)) + logger := testutil.Logger(t) + tsd := &testServerDelegate{State: state, isLeader: isLeader} + if !isLeader { + tsd.lastContact = now + } + g := NewGatewayLocator( + logger, + tsd, + "dc1", + "dc1", + ) + g.SetUseReplicationSignal(isLeader) + + idx, err := g.runOnce(0) + require.NoError(t, err) + assert.False(t, g.DialPrimaryThroughLocalGateway()) + assert.Equal(t, uint64(1), idx) + assert.Len(t, tsd.Calls, 1) + assert.Equal(t, []string(nil), g.listGateways(false)) + assert.Equal(t, []string(nil), g.listGateways(true)) + }) + } }) t.Run("secondary - no data", func(t *testing.T) { - logger := testutil.Logger(t) - tsd := &testServerDelegate{State: state, isLeader: true} - g := NewGatewayLocator( - logger, - tsd, - "dc2", - "dc1", - ) + for _, role := range serverRoles { + t.Run(role, func(t *testing.T) { + isLeader := role == "leader" - idx, err := g.runOnce(0) - require.NoError(t, err) - assert.False(t, g.DialPrimaryThroughLocalGateway()) - assert.Equal(t, uint64(1), idx) - assert.Len(t, tsd.Calls, 1) - assert.Equal(t, []string(nil), g.listGateways(false)) - assert.Equal(t, []string(nil), g.listGateways(true)) + logger := testutil.Logger(t) + tsd := &testServerDelegate{State: state, isLeader: isLeader} + if !isLeader { + tsd.lastContact = now + } + g := NewGatewayLocator( + logger, + tsd, + "dc2", + "dc1", + ) + g.SetUseReplicationSignal(isLeader) + + idx, err := g.runOnce(0) + require.NoError(t, err) + if isLeader { + assert.False(t, g.DialPrimaryThroughLocalGateway()) + } else { + assert.True(t, g.DialPrimaryThroughLocalGateway()) + } + assert.Equal(t, uint64(1), idx) + assert.Len(t, tsd.Calls, 1) + assert.Equal(t, []string(nil), g.listGateways(false)) + assert.Equal(t, []string(nil), g.listGateways(true)) + }) + } }) t.Run("secondary - just fallback", func(t *testing.T) { - logger := testutil.Logger(t) - tsd := &testServerDelegate{State: state, isLeader: true} - g := NewGatewayLocator( - logger, - tsd, - "dc2", - "dc1", - ) - g.RefreshPrimaryGatewayFallbackAddresses([]string{ - "7.7.7.7:7777", - "8.8.8.8:8888", - }) + for _, role := range serverRoles { + t.Run(role, func(t *testing.T) { + isLeader := role == "leader" - idx, err := g.runOnce(0) - require.NoError(t, err) - assert.False(t, g.DialPrimaryThroughLocalGateway()) - assert.Equal(t, uint64(1), idx) - assert.Len(t, tsd.Calls, 1) - assert.Equal(t, []string(nil), g.listGateways(false)) - assert.Equal(t, []string{ - "7.7.7.7:7777", - "8.8.8.8:8888", - }, g.listGateways(true)) + logger := testutil.Logger(t) + tsd := &testServerDelegate{State: state, isLeader: isLeader} + if !isLeader { + tsd.lastContact = now + } + g := NewGatewayLocator( + logger, + tsd, + "dc2", + "dc1", + ) + g.SetUseReplicationSignal(isLeader) + g.RefreshPrimaryGatewayFallbackAddresses([]string{ + "7.7.7.7:7777", + "8.8.8.8:8888", + }) + + idx, err := g.runOnce(0) + require.NoError(t, err) + if isLeader { + assert.False(t, g.DialPrimaryThroughLocalGateway()) + } else { + assert.True(t, g.DialPrimaryThroughLocalGateway()) + } + assert.Equal(t, uint64(1), idx) + assert.Len(t, tsd.Calls, 1) + assert.Equal(t, []string(nil), g.listGateways(false)) + assert.Equal(t, []string{ + "7.7.7.7:7777", + "8.8.8.8:8888", + }, g.listGateways(true)) + }) + } }) // Insert data for the dcs @@ -112,56 +153,88 @@ func TestGatewayLocator(t *testing.T) { require.NoError(t, state.FederationStateSet(2, dc2)) t.Run("primary - with data", func(t *testing.T) { - logger := testutil.Logger(t) - tsd := &testServerDelegate{State: state, isLeader: true} - g := NewGatewayLocator( - logger, - tsd, - "dc1", - "dc1", - ) + for _, role := range serverRoles { + t.Run(role, func(t *testing.T) { + isLeader := role == "leader" - idx, err := g.runOnce(0) - require.NoError(t, err) - assert.False(t, g.DialPrimaryThroughLocalGateway()) - assert.Equal(t, uint64(2), idx) - assert.Len(t, tsd.Calls, 1) - assert.Equal(t, []string{ - "1.2.3.4:5555", - "4.3.2.1:9999", - }, g.listGateways(false)) - assert.Equal(t, []string{ - "1.2.3.4:5555", - "4.3.2.1:9999", - }, g.listGateways(true)) + logger := testutil.Logger(t) + tsd := &testServerDelegate{State: state, isLeader: isLeader} + if !isLeader { + tsd.lastContact = now + } + g := NewGatewayLocator( + logger, + tsd, + "dc1", + "dc1", + ) + g.SetUseReplicationSignal(isLeader) + + idx, err := g.runOnce(0) + require.NoError(t, err) + assert.False(t, g.DialPrimaryThroughLocalGateway()) + assert.Equal(t, uint64(2), idx) + assert.Len(t, tsd.Calls, 1) + assert.Equal(t, []string{ + "1.2.3.4:5555", + "4.3.2.1:9999", + }, g.listGateways(false)) + assert.Equal(t, []string{ + "1.2.3.4:5555", + "4.3.2.1:9999", + }, g.listGateways(true)) + }) + } }) t.Run("secondary - with data", func(t *testing.T) { - logger := testutil.Logger(t) - tsd := &testServerDelegate{State: state, isLeader: true} - g := NewGatewayLocator( - logger, - tsd, - "dc2", - "dc1", - ) + for _, role := range serverRoles { + t.Run(role, func(t *testing.T) { + isLeader := role == "leader" - idx, err := g.runOnce(0) - require.NoError(t, err) - assert.False(t, g.DialPrimaryThroughLocalGateway()) - assert.Equal(t, uint64(2), idx) - assert.Len(t, tsd.Calls, 1) - assert.Equal(t, []string{ - "5.6.7.8:5555", - "8.7.6.5:9999", - }, g.listGateways(false)) - assert.Equal(t, []string{ - "1.2.3.4:5555", - "4.3.2.1:9999", - }, g.listGateways(true)) + logger := testutil.Logger(t) + tsd := &testServerDelegate{State: state, isLeader: isLeader} + if !isLeader { + tsd.lastContact = now + } + g := NewGatewayLocator( + logger, + tsd, + "dc2", + "dc1", + ) + g.SetUseReplicationSignal(isLeader) + + idx, err := g.runOnce(0) + require.NoError(t, err) + if isLeader { + assert.False(t, g.DialPrimaryThroughLocalGateway()) + } else { + assert.True(t, g.DialPrimaryThroughLocalGateway()) + } + assert.Equal(t, uint64(2), idx) + assert.Len(t, tsd.Calls, 1) + assert.Equal(t, []string{ + "5.6.7.8:5555", + "8.7.6.5:9999", + }, g.listGateways(false)) + if isLeader { + assert.Equal(t, []string{ + "1.2.3.4:5555", + "4.3.2.1:9999", + }, g.listGateways(true)) + } else { + assert.Equal(t, []string{ + "5.6.7.8:5555", + "8.7.6.5:9999", + }, g.listGateways(true)) + } + }) + } }) t.Run("secondary - with data and fallback - no repl", func(t *testing.T) { + // Only run for the leader. logger := testutil.Logger(t) tsd := &testServerDelegate{State: state, isLeader: true} g := NewGatewayLocator( @@ -170,6 +243,7 @@ func TestGatewayLocator(t *testing.T) { "dc2", "dc1", ) + g.SetUseReplicationSignal(true) g.RefreshPrimaryGatewayFallbackAddresses([]string{ "7.7.7.7:7777", @@ -194,6 +268,7 @@ func TestGatewayLocator(t *testing.T) { }) t.Run("secondary - with data and fallback - repl ok", func(t *testing.T) { + // Only run for the leader. logger := testutil.Logger(t) tsd := &testServerDelegate{State: state, isLeader: true} g := NewGatewayLocator( @@ -202,13 +277,14 @@ func TestGatewayLocator(t *testing.T) { "dc2", "dc1", ) + g.SetUseReplicationSignal(true) g.RefreshPrimaryGatewayFallbackAddresses([]string{ "7.7.7.7:7777", "8.8.8.8:8888", }) - g.SetLastFederationStateReplicationError(nil) + g.SetLastFederationStateReplicationError(nil, true) idx, err := g.runOnce(0) require.NoError(t, err) @@ -226,6 +302,7 @@ func TestGatewayLocator(t *testing.T) { }) t.Run("secondary - with data and fallback - repl ok then failed 2 times", func(t *testing.T) { + // Only run for the leader. logger := testutil.Logger(t) tsd := &testServerDelegate{State: state, isLeader: true} g := NewGatewayLocator( @@ -234,15 +311,16 @@ func TestGatewayLocator(t *testing.T) { "dc2", "dc1", ) + g.SetUseReplicationSignal(true) g.RefreshPrimaryGatewayFallbackAddresses([]string{ "7.7.7.7:7777", "8.8.8.8:8888", }) - g.SetLastFederationStateReplicationError(nil) - g.SetLastFederationStateReplicationError(errors.New("fake")) - g.SetLastFederationStateReplicationError(errors.New("fake")) + g.SetLastFederationStateReplicationError(nil, true) + g.SetLastFederationStateReplicationError(errors.New("fake"), true) + g.SetLastFederationStateReplicationError(errors.New("fake"), true) idx, err := g.runOnce(0) require.NoError(t, err) @@ -260,6 +338,7 @@ func TestGatewayLocator(t *testing.T) { }) t.Run("secondary - with data and fallback - repl ok then failed 3 times", func(t *testing.T) { + // Only run for the leader. logger := testutil.Logger(t) tsd := &testServerDelegate{State: state, isLeader: true} g := NewGatewayLocator( @@ -268,16 +347,17 @@ func TestGatewayLocator(t *testing.T) { "dc2", "dc1", ) + g.SetUseReplicationSignal(true) g.RefreshPrimaryGatewayFallbackAddresses([]string{ "7.7.7.7:7777", "8.8.8.8:8888", }) - g.SetLastFederationStateReplicationError(nil) - g.SetLastFederationStateReplicationError(errors.New("fake")) - g.SetLastFederationStateReplicationError(errors.New("fake")) - g.SetLastFederationStateReplicationError(errors.New("fake")) + g.SetLastFederationStateReplicationError(nil, true) + g.SetLastFederationStateReplicationError(errors.New("fake"), true) + g.SetLastFederationStateReplicationError(errors.New("fake"), true) + g.SetLastFederationStateReplicationError(errors.New("fake"), true) idx, err := g.runOnce(0) require.NoError(t, err) @@ -297,6 +377,7 @@ func TestGatewayLocator(t *testing.T) { }) t.Run("secondary - with data and fallback - repl ok then failed 3 times then ok again", func(t *testing.T) { + // Only run for the leader. logger := testutil.Logger(t) tsd := &testServerDelegate{State: state, isLeader: true} g := NewGatewayLocator( @@ -305,17 +386,18 @@ func TestGatewayLocator(t *testing.T) { "dc2", "dc1", ) + g.SetUseReplicationSignal(true) g.RefreshPrimaryGatewayFallbackAddresses([]string{ "7.7.7.7:7777", "8.8.8.8:8888", }) - g.SetLastFederationStateReplicationError(nil) - g.SetLastFederationStateReplicationError(errors.New("fake")) - g.SetLastFederationStateReplicationError(errors.New("fake")) - g.SetLastFederationStateReplicationError(errors.New("fake")) - g.SetLastFederationStateReplicationError(nil) + g.SetLastFederationStateReplicationError(nil, true) + g.SetLastFederationStateReplicationError(errors.New("fake"), true) + g.SetLastFederationStateReplicationError(errors.New("fake"), true) + g.SetLastFederationStateReplicationError(errors.New("fake"), true) + g.SetLastFederationStateReplicationError(nil, true) idx, err := g.runOnce(0) require.NoError(t, err) diff --git a/agent/consul/leader.go b/agent/consul/leader.go index e7028098c0..c1a15d1853 100644 --- a/agent/consul/leader.go +++ b/agent/consul/leader.go @@ -978,12 +978,22 @@ func (s *Server) startFederationStateReplication() { return } + if s.gatewayLocator != nil { + s.gatewayLocator.SetUseReplicationSignal(true) + s.gatewayLocator.SetLastFederationStateReplicationError(nil, false) + } + s.leaderRoutineManager.Start(federationStateReplicationRoutineName, s.federationStateReplicator.Run) } func (s *Server) stopFederationStateReplication() { // will be a no-op when not started s.leaderRoutineManager.Stop(federationStateReplicationRoutineName) + + if s.gatewayLocator != nil { + s.gatewayLocator.SetUseReplicationSignal(false) + s.gatewayLocator.SetLastFederationStateReplicationError(nil, false) + } } // getOrCreateAutopilotConfig is used to get the autopilot config, initializing it if necessary