From c608dc0d606f822282e734f4c3fcc2c41c813196 Mon Sep 17 00:00:00 2001 From: "R.B. Boyer" Date: Mon, 25 Jan 2021 17:30:38 -0600 Subject: [PATCH] server: initialize mgw-wanfed to use local gateways more on startup (#9528) Fixes #9342 --- .changelog/9528.txt | 3 + agent/agent_test.go | 2 + agent/consul/gateway_locator.go | 29 ++- agent/consul/gateway_locator_test.go | 317 ++++++++++++++++----------- 4 files changed, 219 insertions(+), 132 deletions(-) create mode 100644 .changelog/9528.txt diff --git a/.changelog/9528.txt b/.changelog/9528.txt new file mode 100644 index 0000000000..008e365630 --- /dev/null +++ b/.changelog/9528.txt @@ -0,0 +1,3 @@ +```release-note:bug +server: When wan federating via mesh gateways after initial federation default to using the local mesh gateways unless the heuristic indicates a bypass is required. +``` diff --git a/agent/agent_test.go b/agent/agent_test.go index 4fb3952773..aa8a1b9efd 100644 --- a/agent/agent_test.go +++ b/agent/agent_test.go @@ -4696,6 +4696,7 @@ func TestAgent_JoinWAN_viaMeshGateway(t *testing.T) { } # wanfed primary_gateways = ["` + gwAddr + `"] + retry_interval_wan = "250ms" connect { enabled = true enable_mesh_gateway_wan_federation = true @@ -4721,6 +4722,7 @@ func TestAgent_JoinWAN_viaMeshGateway(t *testing.T) { } # wanfed primary_gateways = ["` + gwAddr + `"] + retry_interval_wan = "250ms" connect { enabled = true enable_mesh_gateway_wan_federation = true diff --git a/agent/consul/gateway_locator.go b/agent/consul/gateway_locator.go index 0846eaf073..cac63692f7 100644 --- a/agent/consul/gateway_locator.go +++ b/agent/consul/gateway_locator.go @@ -43,9 +43,10 @@ type GatewayLocator struct { primaryDatacenter string // these ONLY contain ones that have the wanfed:1 meta - gatewaysLock sync.Mutex - primaryGateways []string // WAN addrs - localGateways []string // LAN addrs + gatewaysLock sync.Mutex + primaryGateways []string // WAN addrs + localGateways []string // LAN addrs + populatedGateways bool // primaryMeshGatewayDiscoveredAddresses is the current fallback addresses // for the mesh gateways in the primary datacenter. @@ -205,6 +206,10 @@ func (g *GatewayLocator) listGateways(primary bool) []string { g.gatewaysLock.Lock() defer g.gatewaysLock.Unlock() + if !g.populatedGateways { + return nil // don't even do anything yet + } + var addrs []string if primary { if g.datacenter == g.primaryDatacenter { @@ -267,6 +272,7 @@ type serverDelegate interface { blockingQuery(queryOpts structs.QueryOptionsCompat, queryMeta structs.QueryMetaCompat, fn queryFn) error IsLeader() bool LeaderLastContact() time.Time + setDatacenterSupportsFederationStates() } func NewGatewayLocator( @@ -283,6 +289,8 @@ func NewGatewayLocator( primaryGatewaysReadyCh: make(chan struct{}), } g.logPrimaryDialingMessage(g.DialPrimaryThroughLocalGateway()) + // initialize + g.SetLastFederationStateReplicationError(nil, false) return g } @@ -292,7 +300,10 @@ func (g *GatewayLocator) Run(ctx context.Context) { var lastFetchIndex uint64 retryLoopBackoff(ctx, func() error { idx, err := g.runOnce(lastFetchIndex) - if err != nil { + if errors.Is(err, errGatewayLocalStateNotInitialized) { + // don't do exponential backoff for something that's not broken + return nil + } else if err != nil { return err } @@ -300,9 +311,7 @@ func (g *GatewayLocator) Run(ctx context.Context) { return nil }, func(err error) { - if !errors.Is(err, errGatewayLocalStateNotInitialized) { - g.logger.Error("error tracking primary and local mesh gateways", "error", err) - } + g.logger.Error("error tracking primary and local mesh gateways", "error", err) }) } @@ -367,6 +376,10 @@ func (g *GatewayLocator) checkLocalStateIsReady() error { } func (g *GatewayLocator) updateFromState(results []*structs.FederationState) { + if len(results) > 0 { + g.srv.setDatacenterSupportsFederationStates() + } + var ( local structs.CheckServiceNodes primary structs.CheckServiceNodes @@ -388,6 +401,8 @@ func (g *GatewayLocator) updateFromState(results []*structs.FederationState) { g.gatewaysLock.Lock() defer g.gatewaysLock.Unlock() + g.populatedGateways = true + changed := false primaryReady := false if !stringslice.Equal(g.primaryGateways, primaryAddrs) { diff --git a/agent/consul/gateway_locator_test.go b/agent/consul/gateway_locator_test.go index 95da8ed2d8..01bbfb105f 100644 --- a/agent/consul/gateway_locator_test.go +++ b/agent/consul/gateway_locator_test.go @@ -2,6 +2,7 @@ package consul import ( "errors" + "sync/atomic" "testing" "time" @@ -64,13 +65,25 @@ func TestGatewayLocator(t *testing.T) { ) g.SetUseReplicationSignal(isLeader) + t.Run("before first run", func(t *testing.T) { + assert.False(t, g.DialPrimaryThroughLocalGateway()) // not important + assert.Len(t, tsd.Calls, 0) + assert.Equal(t, []string(nil), g.listGateways(false)) + assert.Equal(t, []string(nil), g.listGateways(true)) + assert.False(t, tsd.datacenterSupportsFederationStates()) + }) + idx, err := g.runOnce(0) require.NoError(t, err) - assert.False(t, g.DialPrimaryThroughLocalGateway()) assert.Equal(t, uint64(1), idx) - assert.Len(t, tsd.Calls, 1) - assert.Equal(t, []string(nil), g.listGateways(false)) - assert.Equal(t, []string(nil), g.listGateways(true)) + + t.Run("after first run", func(t *testing.T) { + assert.False(t, g.DialPrimaryThroughLocalGateway()) // not important + assert.Len(t, tsd.Calls, 1) + assert.Equal(t, []string(nil), g.listGateways(false)) + assert.Equal(t, []string(nil), g.listGateways(true)) + assert.False(t, tsd.datacenterSupportsFederationStates()) // no results, so we don't flip the bit yet + }) }) } }) @@ -93,17 +106,25 @@ func TestGatewayLocator(t *testing.T) { ) g.SetUseReplicationSignal(isLeader) + t.Run("before first run", func(t *testing.T) { + assert.True(t, g.DialPrimaryThroughLocalGateway()) // defaults to sure! + assert.Len(t, tsd.Calls, 0) + assert.Equal(t, []string(nil), g.listGateways(false)) + assert.Equal(t, []string(nil), g.listGateways(true)) + assert.False(t, tsd.datacenterSupportsFederationStates()) + }) + idx, err := g.runOnce(0) require.NoError(t, err) - if isLeader { - assert.False(t, g.DialPrimaryThroughLocalGateway()) - } else { - assert.True(t, g.DialPrimaryThroughLocalGateway()) - } assert.Equal(t, uint64(1), idx) - assert.Len(t, tsd.Calls, 1) - assert.Equal(t, []string(nil), g.listGateways(false)) - assert.Equal(t, []string(nil), g.listGateways(true)) + + t.Run("after first run", func(t *testing.T) { + assert.True(t, g.DialPrimaryThroughLocalGateway()) // defaults to sure! + assert.Len(t, tsd.Calls, 1) + assert.Equal(t, []string(nil), g.listGateways(false)) + assert.Equal(t, []string(nil), g.listGateways(true)) + assert.False(t, tsd.datacenterSupportsFederationStates()) // no results, so we don't flip the bit yet + }) }) } }) @@ -130,20 +151,28 @@ func TestGatewayLocator(t *testing.T) { "8.8.8.8:8888", }) + t.Run("before first run", func(t *testing.T) { + assert.True(t, g.DialPrimaryThroughLocalGateway()) // defaults to sure! + assert.Len(t, tsd.Calls, 0) + assert.Equal(t, []string(nil), g.listGateways(false)) + assert.Equal(t, []string(nil), g.listGateways(true)) // don't return any data until we initialize + assert.False(t, tsd.datacenterSupportsFederationStates()) + }) + idx, err := g.runOnce(0) require.NoError(t, err) - if isLeader { - assert.False(t, g.DialPrimaryThroughLocalGateway()) - } else { - assert.True(t, g.DialPrimaryThroughLocalGateway()) - } assert.Equal(t, uint64(1), idx) - assert.Len(t, tsd.Calls, 1) - assert.Equal(t, []string(nil), g.listGateways(false)) - assert.Equal(t, []string{ - "7.7.7.7:7777", - "8.8.8.8:8888", - }, g.listGateways(true)) + + t.Run("after first run", func(t *testing.T) { + assert.True(t, g.DialPrimaryThroughLocalGateway()) // defaults to sure! + assert.Len(t, tsd.Calls, 1) + assert.Equal(t, []string(nil), g.listGateways(false)) + assert.Equal(t, []string{ + "7.7.7.7:7777", + "8.8.8.8:8888", + }, g.listGateways(true)) + assert.False(t, tsd.datacenterSupportsFederationStates()) // no results, so we don't flip the bit yet + }) }) } }) @@ -170,19 +199,31 @@ func TestGatewayLocator(t *testing.T) { ) g.SetUseReplicationSignal(isLeader) + t.Run("before first run", func(t *testing.T) { + assert.False(t, g.DialPrimaryThroughLocalGateway()) // not important + assert.Len(t, tsd.Calls, 0) + assert.Equal(t, []string(nil), g.listGateways(false)) + assert.Equal(t, []string(nil), g.listGateways(true)) // don't return any data until we initialize + assert.False(t, tsd.datacenterSupportsFederationStates()) + }) + idx, err := g.runOnce(0) require.NoError(t, err) - assert.False(t, g.DialPrimaryThroughLocalGateway()) assert.Equal(t, uint64(2), idx) - assert.Len(t, tsd.Calls, 1) - assert.Equal(t, []string{ - "1.2.3.4:5555", - "4.3.2.1:9999", - }, g.listGateways(false)) - assert.Equal(t, []string{ - "1.2.3.4:5555", - "4.3.2.1:9999", - }, g.listGateways(true)) + + t.Run("after first run", func(t *testing.T) { + assert.False(t, g.DialPrimaryThroughLocalGateway()) // not important + assert.Len(t, tsd.Calls, 1) + assert.Equal(t, []string{ + "1.2.3.4:5555", + "4.3.2.1:9999", + }, g.listGateways(false)) + assert.Equal(t, []string{ + "1.2.3.4:5555", + "4.3.2.1:9999", + }, g.listGateways(true)) + assert.True(t, tsd.datacenterSupportsFederationStates()) // have results, so we flip the bit + }) }) } }) @@ -205,68 +246,36 @@ func TestGatewayLocator(t *testing.T) { ) g.SetUseReplicationSignal(isLeader) + t.Run("before first run", func(t *testing.T) { + assert.True(t, g.DialPrimaryThroughLocalGateway()) // defaults to sure! + assert.Len(t, tsd.Calls, 0) + assert.Equal(t, []string(nil), g.listGateways(false)) + assert.Equal(t, []string(nil), g.listGateways(true)) // don't return any data until we initialize + assert.False(t, tsd.datacenterSupportsFederationStates()) + }) + idx, err := g.runOnce(0) require.NoError(t, err) - if isLeader { - assert.False(t, g.DialPrimaryThroughLocalGateway()) - } else { - assert.True(t, g.DialPrimaryThroughLocalGateway()) - } assert.Equal(t, uint64(2), idx) - assert.Len(t, tsd.Calls, 1) - assert.Equal(t, []string{ - "5.6.7.8:5555", - "8.7.6.5:9999", - }, g.listGateways(false)) - if isLeader { + + t.Run("after first run", func(t *testing.T) { + assert.True(t, g.DialPrimaryThroughLocalGateway()) // defaults to sure! + assert.Len(t, tsd.Calls, 1) assert.Equal(t, []string{ - "1.2.3.4:5555", - "4.3.2.1:9999", - }, g.listGateways(true)) - } else { + "5.6.7.8:5555", + "8.7.6.5:9999", + }, g.listGateways(false)) assert.Equal(t, []string{ "5.6.7.8:5555", "8.7.6.5:9999", }, g.listGateways(true)) - } + assert.True(t, tsd.datacenterSupportsFederationStates()) // have results, so we flip the bit + }) + }) } }) - t.Run("secondary - with data and fallback - no repl", func(t *testing.T) { - // Only run for the leader. - logger := testutil.Logger(t) - tsd := &testServerDelegate{State: state, isLeader: true} - g := NewGatewayLocator( - logger, - tsd, - "dc2", - "dc1", - ) - g.SetUseReplicationSignal(true) - - g.RefreshPrimaryGatewayFallbackAddresses([]string{ - "7.7.7.7:7777", - "8.8.8.8:8888", - }) - - idx, err := g.runOnce(0) - require.NoError(t, err) - assert.False(t, g.DialPrimaryThroughLocalGateway()) - assert.Equal(t, uint64(2), idx) - assert.Len(t, tsd.Calls, 1) - assert.Equal(t, []string{ - "5.6.7.8:5555", - "8.7.6.5:9999", - }, g.listGateways(false)) - assert.Equal(t, []string{ - "1.2.3.4:5555", - "4.3.2.1:9999", - "7.7.7.7:7777", - "8.8.8.8:8888", - }, g.listGateways(true)) - }) - t.Run("secondary - with data and fallback - repl ok", func(t *testing.T) { // Only run for the leader. logger := testutil.Logger(t) @@ -286,19 +295,31 @@ func TestGatewayLocator(t *testing.T) { g.SetLastFederationStateReplicationError(nil, true) + t.Run("before first run", func(t *testing.T) { + assert.True(t, g.DialPrimaryThroughLocalGateway()) // defaults to sure! + assert.Len(t, tsd.Calls, 0) + assert.Equal(t, []string(nil), g.listGateways(false)) + assert.Equal(t, []string(nil), g.listGateways(true)) // don't return any data until we initialize + assert.False(t, tsd.datacenterSupportsFederationStates()) + }) + idx, err := g.runOnce(0) require.NoError(t, err) - assert.True(t, g.DialPrimaryThroughLocalGateway()) assert.Equal(t, uint64(2), idx) - assert.Len(t, tsd.Calls, 1) - assert.Equal(t, []string{ - "5.6.7.8:5555", - "8.7.6.5:9999", - }, g.listGateways(false)) - assert.Equal(t, []string{ - "5.6.7.8:5555", - "8.7.6.5:9999", - }, g.listGateways(true)) + + t.Run("after first run", func(t *testing.T) { + assert.True(t, g.DialPrimaryThroughLocalGateway()) + assert.Len(t, tsd.Calls, 1) + assert.Equal(t, []string{ + "5.6.7.8:5555", + "8.7.6.5:9999", + }, g.listGateways(false)) + assert.Equal(t, []string{ + "5.6.7.8:5555", + "8.7.6.5:9999", + }, g.listGateways(true)) + assert.True(t, tsd.datacenterSupportsFederationStates()) // have results, so we flip the bit + }) }) t.Run("secondary - with data and fallback - repl ok then failed 2 times", func(t *testing.T) { @@ -322,19 +343,31 @@ func TestGatewayLocator(t *testing.T) { g.SetLastFederationStateReplicationError(errors.New("fake"), true) g.SetLastFederationStateReplicationError(errors.New("fake"), true) + t.Run("before first run", func(t *testing.T) { + assert.True(t, g.DialPrimaryThroughLocalGateway()) // defaults to sure! + assert.Len(t, tsd.Calls, 0) + assert.Equal(t, []string(nil), g.listGateways(false)) + assert.Equal(t, []string(nil), g.listGateways(true)) // don't return any data until we initialize + assert.False(t, tsd.datacenterSupportsFederationStates()) + }) + idx, err := g.runOnce(0) require.NoError(t, err) - assert.True(t, g.DialPrimaryThroughLocalGateway()) assert.Equal(t, uint64(2), idx) - assert.Len(t, tsd.Calls, 1) - assert.Equal(t, []string{ - "5.6.7.8:5555", - "8.7.6.5:9999", - }, g.listGateways(false)) - assert.Equal(t, []string{ - "5.6.7.8:5555", - "8.7.6.5:9999", - }, g.listGateways(true)) + + t.Run("after first run", func(t *testing.T) { + assert.True(t, g.DialPrimaryThroughLocalGateway()) + assert.Len(t, tsd.Calls, 1) + assert.Equal(t, []string{ + "5.6.7.8:5555", + "8.7.6.5:9999", + }, g.listGateways(false)) + assert.Equal(t, []string{ + "5.6.7.8:5555", + "8.7.6.5:9999", + }, g.listGateways(true)) + assert.True(t, tsd.datacenterSupportsFederationStates()) // have results, so we flip the bit + }) }) t.Run("secondary - with data and fallback - repl ok then failed 3 times", func(t *testing.T) { @@ -359,21 +392,33 @@ func TestGatewayLocator(t *testing.T) { g.SetLastFederationStateReplicationError(errors.New("fake"), true) g.SetLastFederationStateReplicationError(errors.New("fake"), true) + t.Run("before first run", func(t *testing.T) { + assert.False(t, g.DialPrimaryThroughLocalGateway()) // too many errors + assert.Len(t, tsd.Calls, 0) + assert.Equal(t, []string(nil), g.listGateways(false)) + assert.Equal(t, []string(nil), g.listGateways(true)) // don't return any data until we initialize + assert.False(t, tsd.datacenterSupportsFederationStates()) + }) + idx, err := g.runOnce(0) require.NoError(t, err) - assert.False(t, g.DialPrimaryThroughLocalGateway()) assert.Equal(t, uint64(2), idx) - assert.Len(t, tsd.Calls, 1) - assert.Equal(t, []string{ - "5.6.7.8:5555", - "8.7.6.5:9999", - }, g.listGateways(false)) - assert.Equal(t, []string{ - "1.2.3.4:5555", - "4.3.2.1:9999", - "7.7.7.7:7777", - "8.8.8.8:8888", - }, g.listGateways(true)) + + t.Run("after first run", func(t *testing.T) { + assert.False(t, g.DialPrimaryThroughLocalGateway()) + assert.Len(t, tsd.Calls, 1) + assert.Equal(t, []string{ + "5.6.7.8:5555", + "8.7.6.5:9999", + }, g.listGateways(false)) + assert.Equal(t, []string{ + "1.2.3.4:5555", + "4.3.2.1:9999", + "7.7.7.7:7777", + "8.8.8.8:8888", + }, g.listGateways(true)) + assert.True(t, tsd.datacenterSupportsFederationStates()) // have results, so we flip the bit + }) }) t.Run("secondary - with data and fallback - repl ok then failed 3 times then ok again", func(t *testing.T) { @@ -399,23 +444,37 @@ func TestGatewayLocator(t *testing.T) { g.SetLastFederationStateReplicationError(errors.New("fake"), true) g.SetLastFederationStateReplicationError(nil, true) + t.Run("before first run", func(t *testing.T) { + assert.True(t, g.DialPrimaryThroughLocalGateway()) // all better again + assert.Len(t, tsd.Calls, 0) + assert.Equal(t, []string(nil), g.listGateways(false)) + assert.Equal(t, []string(nil), g.listGateways(true)) // don't return any data until we initialize + assert.False(t, tsd.datacenterSupportsFederationStates()) + }) + idx, err := g.runOnce(0) require.NoError(t, err) - assert.True(t, g.DialPrimaryThroughLocalGateway()) assert.Equal(t, uint64(2), idx) - assert.Len(t, tsd.Calls, 1) - assert.Equal(t, []string{ - "5.6.7.8:5555", - "8.7.6.5:9999", - }, g.listGateways(false)) - assert.Equal(t, []string{ - "5.6.7.8:5555", - "8.7.6.5:9999", - }, g.listGateways(true)) + + t.Run("after first run", func(t *testing.T) { + assert.True(t, g.DialPrimaryThroughLocalGateway()) // all better again + assert.Len(t, tsd.Calls, 1) + assert.Equal(t, []string{ + "5.6.7.8:5555", + "8.7.6.5:9999", + }, g.listGateways(false)) + assert.Equal(t, []string{ + "5.6.7.8:5555", + "8.7.6.5:9999", + }, g.listGateways(true)) + assert.True(t, tsd.datacenterSupportsFederationStates()) // have results, so we flip the bit + }) }) } type testServerDelegate struct { + dcSupportsFederationStates int32 // atomically accessed, at start to prevent alignment issues + State *state.Store Calls []uint64 @@ -424,6 +483,14 @@ type testServerDelegate struct { lastContact time.Time } +func (d *testServerDelegate) setDatacenterSupportsFederationStates() { + atomic.StoreInt32(&d.dcSupportsFederationStates, 1) +} + +func (d *testServerDelegate) datacenterSupportsFederationStates() bool { + return atomic.LoadInt32(&d.dcSupportsFederationStates) != 0 +} + // This is just enough to exercise the logic. func (d *testServerDelegate) blockingQuery( queryOpts structs.QueryOptionsCompat,