server: when wan federating via mesh gateways only do heuristic primary DC bypass on the leader (#9366)

Fixes #9341
This commit is contained in:
R.B. Boyer 2021-01-22 10:03:24 -06:00 committed by GitHub
parent 2a5b88e5b1
commit 9ef3f20127
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 236 additions and 114 deletions

4
.changelog/9366.txt Normal file
View File

@ -0,0 +1,4 @@
```release-note:bug
server: When wan federating via mesh gateways only do heuristic primary DC bypass on the leader.
```

View File

@ -39,7 +39,7 @@ func (r *FederationStateReplicator) FetchRemote(lastRemoteIndex uint64) (int, in
} }
lenRemote, remote, remoteIndex, err := r.fetchRemote(lastRemoteIndex) lenRemote, remote, remoteIndex, err := r.fetchRemote(lastRemoteIndex)
if r.gatewayLocator != nil { if r.gatewayLocator != nil {
r.gatewayLocator.SetLastFederationStateReplicationError(err) r.gatewayLocator.SetLastFederationStateReplicationError(err, true)
} }
return lenRemote, remote, remoteIndex, err return lenRemote, remote, remoteIndex, err
} }

View File

@ -59,11 +59,12 @@ type GatewayLocator struct {
// these are a collection of measurements that factor into deciding if we // these are a collection of measurements that factor into deciding if we
// should directly dial the primary's mesh gateways or if we should try to // should directly dial the primary's mesh gateways or if we should try to
// route through our local gateways (if they are up). // route through our local gateways (if they are up).
lastReplLock sync.Mutex lastReplLock sync.Mutex
lastReplSuccess time.Time lastReplSuccess time.Time
lastReplFailure time.Time lastReplFailure time.Time
lastReplSuccesses uint64 lastReplSuccesses uint64
lastReplFailures uint64 lastReplFailures uint64
useReplicationSignal bool // this should be set to true on the leader
} }
// SetLastFederationStateReplicationError is used to indicate if the federation // SetLastFederationStateReplicationError is used to indicate if the federation
@ -74,14 +75,23 @@ type GatewayLocator struct {
// our chosen mesh-gateway configuration can reach the primary's servers (like // our chosen mesh-gateway configuration can reach the primary's servers (like
// a ping or status RPC) we cheat and use the federation state replicator // a ping or status RPC) we cheat and use the federation state replicator
// goroutine's success or failure as a proxy. // goroutine's success or failure as a proxy.
func (g *GatewayLocator) SetLastFederationStateReplicationError(err error) { func (g *GatewayLocator) SetLastFederationStateReplicationError(err error, fromReplication bool) {
if g == nil {
return
}
g.lastReplLock.Lock() g.lastReplLock.Lock()
defer g.lastReplLock.Unlock() defer g.lastReplLock.Unlock()
oldChoice := g.dialPrimaryThroughLocalGateway() oldChoice := g.dialPrimaryThroughLocalGateway()
if err == nil { if err == nil {
g.lastReplSuccess = time.Now().UTC() g.lastReplSuccess = time.Now().UTC()
g.lastReplSuccesses++ g.lastReplSuccesses++
g.lastReplFailures = 0 g.lastReplFailures = 0
if fromReplication {
// If we get info from replication, assume replication is operating.
g.useReplicationSignal = true
}
} else { } else {
g.lastReplFailure = time.Now().UTC() g.lastReplFailure = time.Now().UTC()
g.lastReplFailures++ g.lastReplFailures++
@ -93,6 +103,16 @@ func (g *GatewayLocator) SetLastFederationStateReplicationError(err error) {
} }
} }
func (g *GatewayLocator) SetUseReplicationSignal(newValue bool) {
if g == nil {
return
}
g.lastReplLock.Lock()
g.useReplicationSignal = newValue
g.lastReplLock.Unlock()
}
func (g *GatewayLocator) logPrimaryDialingMessage(useLocal bool) { func (g *GatewayLocator) logPrimaryDialingMessage(useLocal bool) {
if g.datacenter == g.primaryDatacenter { if g.datacenter == g.primaryDatacenter {
// These messages are useless when the server is in the primary // These messages are useless when the server is in the primary
@ -140,6 +160,12 @@ func (g *GatewayLocator) DialPrimaryThroughLocalGateway() bool {
const localFederationStateReplicatorFailuresBeforeDialingDirectly = 3 const localFederationStateReplicatorFailuresBeforeDialingDirectly = 3
func (g *GatewayLocator) dialPrimaryThroughLocalGateway() bool { func (g *GatewayLocator) dialPrimaryThroughLocalGateway() bool {
if !g.useReplicationSignal {
// Followers should blindly assume these gateways work. The leader will
// try to bypass them and correct the replicated federation state info
// that the followers will eventually pick up on.
return true
}
if g.lastReplSuccess.IsZero() && g.lastReplFailure.IsZero() { if g.lastReplSuccess.IsZero() && g.lastReplFailure.IsZero() {
return false // no data yet return false // no data yet
} }

View File

@ -18,6 +18,9 @@ import (
func TestGatewayLocator(t *testing.T) { func TestGatewayLocator(t *testing.T) {
state := state.NewStateStore(nil) state := state.NewStateStore(nil)
serverRoles := []string{"leader", "follower"}
now := time.Now().UTC()
dc1 := &structs.FederationState{ dc1 := &structs.FederationState{
Datacenter: "dc1", Datacenter: "dc1",
MeshGateways: []structs.CheckServiceNode{ MeshGateways: []structs.CheckServiceNode{
@ -44,67 +47,105 @@ func TestGatewayLocator(t *testing.T) {
} }
t.Run("primary - no data", func(t *testing.T) { t.Run("primary - no data", func(t *testing.T) {
logger := testutil.Logger(t) for _, role := range serverRoles {
tsd := &testServerDelegate{State: state, isLeader: true} t.Run(role, func(t *testing.T) {
g := NewGatewayLocator( isLeader := role == "leader"
logger,
tsd,
"dc1",
"dc1",
)
idx, err := g.runOnce(0) logger := testutil.Logger(t)
require.NoError(t, err) tsd := &testServerDelegate{State: state, isLeader: isLeader}
assert.False(t, g.DialPrimaryThroughLocalGateway()) if !isLeader {
assert.Equal(t, uint64(1), idx) tsd.lastContact = now
assert.Len(t, tsd.Calls, 1) }
assert.Equal(t, []string(nil), g.listGateways(false)) g := NewGatewayLocator(
assert.Equal(t, []string(nil), g.listGateways(true)) logger,
tsd,
"dc1",
"dc1",
)
g.SetUseReplicationSignal(isLeader)
idx, err := g.runOnce(0)
require.NoError(t, err)
assert.False(t, g.DialPrimaryThroughLocalGateway())
assert.Equal(t, uint64(1), idx)
assert.Len(t, tsd.Calls, 1)
assert.Equal(t, []string(nil), g.listGateways(false))
assert.Equal(t, []string(nil), g.listGateways(true))
})
}
}) })
t.Run("secondary - no data", func(t *testing.T) { t.Run("secondary - no data", func(t *testing.T) {
logger := testutil.Logger(t) for _, role := range serverRoles {
tsd := &testServerDelegate{State: state, isLeader: true} t.Run(role, func(t *testing.T) {
g := NewGatewayLocator( isLeader := role == "leader"
logger,
tsd,
"dc2",
"dc1",
)
idx, err := g.runOnce(0) logger := testutil.Logger(t)
require.NoError(t, err) tsd := &testServerDelegate{State: state, isLeader: isLeader}
assert.False(t, g.DialPrimaryThroughLocalGateway()) if !isLeader {
assert.Equal(t, uint64(1), idx) tsd.lastContact = now
assert.Len(t, tsd.Calls, 1) }
assert.Equal(t, []string(nil), g.listGateways(false)) g := NewGatewayLocator(
assert.Equal(t, []string(nil), g.listGateways(true)) logger,
tsd,
"dc2",
"dc1",
)
g.SetUseReplicationSignal(isLeader)
idx, err := g.runOnce(0)
require.NoError(t, err)
if isLeader {
assert.False(t, g.DialPrimaryThroughLocalGateway())
} else {
assert.True(t, g.DialPrimaryThroughLocalGateway())
}
assert.Equal(t, uint64(1), idx)
assert.Len(t, tsd.Calls, 1)
assert.Equal(t, []string(nil), g.listGateways(false))
assert.Equal(t, []string(nil), g.listGateways(true))
})
}
}) })
t.Run("secondary - just fallback", func(t *testing.T) { t.Run("secondary - just fallback", func(t *testing.T) {
logger := testutil.Logger(t) for _, role := range serverRoles {
tsd := &testServerDelegate{State: state, isLeader: true} t.Run(role, func(t *testing.T) {
g := NewGatewayLocator( isLeader := role == "leader"
logger,
tsd,
"dc2",
"dc1",
)
g.RefreshPrimaryGatewayFallbackAddresses([]string{
"7.7.7.7:7777",
"8.8.8.8:8888",
})
idx, err := g.runOnce(0) logger := testutil.Logger(t)
require.NoError(t, err) tsd := &testServerDelegate{State: state, isLeader: isLeader}
assert.False(t, g.DialPrimaryThroughLocalGateway()) if !isLeader {
assert.Equal(t, uint64(1), idx) tsd.lastContact = now
assert.Len(t, tsd.Calls, 1) }
assert.Equal(t, []string(nil), g.listGateways(false)) g := NewGatewayLocator(
assert.Equal(t, []string{ logger,
"7.7.7.7:7777", tsd,
"8.8.8.8:8888", "dc2",
}, g.listGateways(true)) "dc1",
)
g.SetUseReplicationSignal(isLeader)
g.RefreshPrimaryGatewayFallbackAddresses([]string{
"7.7.7.7:7777",
"8.8.8.8:8888",
})
idx, err := g.runOnce(0)
require.NoError(t, err)
if isLeader {
assert.False(t, g.DialPrimaryThroughLocalGateway())
} else {
assert.True(t, g.DialPrimaryThroughLocalGateway())
}
assert.Equal(t, uint64(1), idx)
assert.Len(t, tsd.Calls, 1)
assert.Equal(t, []string(nil), g.listGateways(false))
assert.Equal(t, []string{
"7.7.7.7:7777",
"8.8.8.8:8888",
}, g.listGateways(true))
})
}
}) })
// Insert data for the dcs // Insert data for the dcs
@ -112,56 +153,88 @@ func TestGatewayLocator(t *testing.T) {
require.NoError(t, state.FederationStateSet(2, dc2)) require.NoError(t, state.FederationStateSet(2, dc2))
t.Run("primary - with data", func(t *testing.T) { t.Run("primary - with data", func(t *testing.T) {
logger := testutil.Logger(t) for _, role := range serverRoles {
tsd := &testServerDelegate{State: state, isLeader: true} t.Run(role, func(t *testing.T) {
g := NewGatewayLocator( isLeader := role == "leader"
logger,
tsd,
"dc1",
"dc1",
)
idx, err := g.runOnce(0) logger := testutil.Logger(t)
require.NoError(t, err) tsd := &testServerDelegate{State: state, isLeader: isLeader}
assert.False(t, g.DialPrimaryThroughLocalGateway()) if !isLeader {
assert.Equal(t, uint64(2), idx) tsd.lastContact = now
assert.Len(t, tsd.Calls, 1) }
assert.Equal(t, []string{ g := NewGatewayLocator(
"1.2.3.4:5555", logger,
"4.3.2.1:9999", tsd,
}, g.listGateways(false)) "dc1",
assert.Equal(t, []string{ "dc1",
"1.2.3.4:5555", )
"4.3.2.1:9999", g.SetUseReplicationSignal(isLeader)
}, g.listGateways(true))
idx, err := g.runOnce(0)
require.NoError(t, err)
assert.False(t, g.DialPrimaryThroughLocalGateway())
assert.Equal(t, uint64(2), idx)
assert.Len(t, tsd.Calls, 1)
assert.Equal(t, []string{
"1.2.3.4:5555",
"4.3.2.1:9999",
}, g.listGateways(false))
assert.Equal(t, []string{
"1.2.3.4:5555",
"4.3.2.1:9999",
}, g.listGateways(true))
})
}
}) })
t.Run("secondary - with data", func(t *testing.T) { t.Run("secondary - with data", func(t *testing.T) {
logger := testutil.Logger(t) for _, role := range serverRoles {
tsd := &testServerDelegate{State: state, isLeader: true} t.Run(role, func(t *testing.T) {
g := NewGatewayLocator( isLeader := role == "leader"
logger,
tsd,
"dc2",
"dc1",
)
idx, err := g.runOnce(0) logger := testutil.Logger(t)
require.NoError(t, err) tsd := &testServerDelegate{State: state, isLeader: isLeader}
assert.False(t, g.DialPrimaryThroughLocalGateway()) if !isLeader {
assert.Equal(t, uint64(2), idx) tsd.lastContact = now
assert.Len(t, tsd.Calls, 1) }
assert.Equal(t, []string{ g := NewGatewayLocator(
"5.6.7.8:5555", logger,
"8.7.6.5:9999", tsd,
}, g.listGateways(false)) "dc2",
assert.Equal(t, []string{ "dc1",
"1.2.3.4:5555", )
"4.3.2.1:9999", g.SetUseReplicationSignal(isLeader)
}, g.listGateways(true))
idx, err := g.runOnce(0)
require.NoError(t, err)
if isLeader {
assert.False(t, g.DialPrimaryThroughLocalGateway())
} else {
assert.True(t, g.DialPrimaryThroughLocalGateway())
}
assert.Equal(t, uint64(2), idx)
assert.Len(t, tsd.Calls, 1)
assert.Equal(t, []string{
"5.6.7.8:5555",
"8.7.6.5:9999",
}, g.listGateways(false))
if isLeader {
assert.Equal(t, []string{
"1.2.3.4:5555",
"4.3.2.1:9999",
}, g.listGateways(true))
} else {
assert.Equal(t, []string{
"5.6.7.8:5555",
"8.7.6.5:9999",
}, g.listGateways(true))
}
})
}
}) })
t.Run("secondary - with data and fallback - no repl", func(t *testing.T) { t.Run("secondary - with data and fallback - no repl", func(t *testing.T) {
// Only run for the leader.
logger := testutil.Logger(t) logger := testutil.Logger(t)
tsd := &testServerDelegate{State: state, isLeader: true} tsd := &testServerDelegate{State: state, isLeader: true}
g := NewGatewayLocator( g := NewGatewayLocator(
@ -170,6 +243,7 @@ func TestGatewayLocator(t *testing.T) {
"dc2", "dc2",
"dc1", "dc1",
) )
g.SetUseReplicationSignal(true)
g.RefreshPrimaryGatewayFallbackAddresses([]string{ g.RefreshPrimaryGatewayFallbackAddresses([]string{
"7.7.7.7:7777", "7.7.7.7:7777",
@ -194,6 +268,7 @@ func TestGatewayLocator(t *testing.T) {
}) })
t.Run("secondary - with data and fallback - repl ok", func(t *testing.T) { t.Run("secondary - with data and fallback - repl ok", func(t *testing.T) {
// Only run for the leader.
logger := testutil.Logger(t) logger := testutil.Logger(t)
tsd := &testServerDelegate{State: state, isLeader: true} tsd := &testServerDelegate{State: state, isLeader: true}
g := NewGatewayLocator( g := NewGatewayLocator(
@ -202,13 +277,14 @@ func TestGatewayLocator(t *testing.T) {
"dc2", "dc2",
"dc1", "dc1",
) )
g.SetUseReplicationSignal(true)
g.RefreshPrimaryGatewayFallbackAddresses([]string{ g.RefreshPrimaryGatewayFallbackAddresses([]string{
"7.7.7.7:7777", "7.7.7.7:7777",
"8.8.8.8:8888", "8.8.8.8:8888",
}) })
g.SetLastFederationStateReplicationError(nil) g.SetLastFederationStateReplicationError(nil, true)
idx, err := g.runOnce(0) idx, err := g.runOnce(0)
require.NoError(t, err) require.NoError(t, err)
@ -226,6 +302,7 @@ func TestGatewayLocator(t *testing.T) {
}) })
t.Run("secondary - with data and fallback - repl ok then failed 2 times", func(t *testing.T) { t.Run("secondary - with data and fallback - repl ok then failed 2 times", func(t *testing.T) {
// Only run for the leader.
logger := testutil.Logger(t) logger := testutil.Logger(t)
tsd := &testServerDelegate{State: state, isLeader: true} tsd := &testServerDelegate{State: state, isLeader: true}
g := NewGatewayLocator( g := NewGatewayLocator(
@ -234,15 +311,16 @@ func TestGatewayLocator(t *testing.T) {
"dc2", "dc2",
"dc1", "dc1",
) )
g.SetUseReplicationSignal(true)
g.RefreshPrimaryGatewayFallbackAddresses([]string{ g.RefreshPrimaryGatewayFallbackAddresses([]string{
"7.7.7.7:7777", "7.7.7.7:7777",
"8.8.8.8:8888", "8.8.8.8:8888",
}) })
g.SetLastFederationStateReplicationError(nil) g.SetLastFederationStateReplicationError(nil, true)
g.SetLastFederationStateReplicationError(errors.New("fake")) g.SetLastFederationStateReplicationError(errors.New("fake"), true)
g.SetLastFederationStateReplicationError(errors.New("fake")) g.SetLastFederationStateReplicationError(errors.New("fake"), true)
idx, err := g.runOnce(0) idx, err := g.runOnce(0)
require.NoError(t, err) require.NoError(t, err)
@ -260,6 +338,7 @@ func TestGatewayLocator(t *testing.T) {
}) })
t.Run("secondary - with data and fallback - repl ok then failed 3 times", func(t *testing.T) { t.Run("secondary - with data and fallback - repl ok then failed 3 times", func(t *testing.T) {
// Only run for the leader.
logger := testutil.Logger(t) logger := testutil.Logger(t)
tsd := &testServerDelegate{State: state, isLeader: true} tsd := &testServerDelegate{State: state, isLeader: true}
g := NewGatewayLocator( g := NewGatewayLocator(
@ -268,16 +347,17 @@ func TestGatewayLocator(t *testing.T) {
"dc2", "dc2",
"dc1", "dc1",
) )
g.SetUseReplicationSignal(true)
g.RefreshPrimaryGatewayFallbackAddresses([]string{ g.RefreshPrimaryGatewayFallbackAddresses([]string{
"7.7.7.7:7777", "7.7.7.7:7777",
"8.8.8.8:8888", "8.8.8.8:8888",
}) })
g.SetLastFederationStateReplicationError(nil) g.SetLastFederationStateReplicationError(nil, true)
g.SetLastFederationStateReplicationError(errors.New("fake")) g.SetLastFederationStateReplicationError(errors.New("fake"), true)
g.SetLastFederationStateReplicationError(errors.New("fake")) g.SetLastFederationStateReplicationError(errors.New("fake"), true)
g.SetLastFederationStateReplicationError(errors.New("fake")) g.SetLastFederationStateReplicationError(errors.New("fake"), true)
idx, err := g.runOnce(0) idx, err := g.runOnce(0)
require.NoError(t, err) require.NoError(t, err)
@ -297,6 +377,7 @@ func TestGatewayLocator(t *testing.T) {
}) })
t.Run("secondary - with data and fallback - repl ok then failed 3 times then ok again", func(t *testing.T) { t.Run("secondary - with data and fallback - repl ok then failed 3 times then ok again", func(t *testing.T) {
// Only run for the leader.
logger := testutil.Logger(t) logger := testutil.Logger(t)
tsd := &testServerDelegate{State: state, isLeader: true} tsd := &testServerDelegate{State: state, isLeader: true}
g := NewGatewayLocator( g := NewGatewayLocator(
@ -305,17 +386,18 @@ func TestGatewayLocator(t *testing.T) {
"dc2", "dc2",
"dc1", "dc1",
) )
g.SetUseReplicationSignal(true)
g.RefreshPrimaryGatewayFallbackAddresses([]string{ g.RefreshPrimaryGatewayFallbackAddresses([]string{
"7.7.7.7:7777", "7.7.7.7:7777",
"8.8.8.8:8888", "8.8.8.8:8888",
}) })
g.SetLastFederationStateReplicationError(nil) g.SetLastFederationStateReplicationError(nil, true)
g.SetLastFederationStateReplicationError(errors.New("fake")) g.SetLastFederationStateReplicationError(errors.New("fake"), true)
g.SetLastFederationStateReplicationError(errors.New("fake")) g.SetLastFederationStateReplicationError(errors.New("fake"), true)
g.SetLastFederationStateReplicationError(errors.New("fake")) g.SetLastFederationStateReplicationError(errors.New("fake"), true)
g.SetLastFederationStateReplicationError(nil) g.SetLastFederationStateReplicationError(nil, true)
idx, err := g.runOnce(0) idx, err := g.runOnce(0)
require.NoError(t, err) require.NoError(t, err)

View File

@ -978,12 +978,22 @@ func (s *Server) startFederationStateReplication() {
return return
} }
if s.gatewayLocator != nil {
s.gatewayLocator.SetUseReplicationSignal(true)
s.gatewayLocator.SetLastFederationStateReplicationError(nil, false)
}
s.leaderRoutineManager.Start(federationStateReplicationRoutineName, s.federationStateReplicator.Run) s.leaderRoutineManager.Start(federationStateReplicationRoutineName, s.federationStateReplicator.Run)
} }
func (s *Server) stopFederationStateReplication() { func (s *Server) stopFederationStateReplication() {
// will be a no-op when not started // will be a no-op when not started
s.leaderRoutineManager.Stop(federationStateReplicationRoutineName) s.leaderRoutineManager.Stop(federationStateReplicationRoutineName)
if s.gatewayLocator != nil {
s.gatewayLocator.SetUseReplicationSignal(false)
s.gatewayLocator.SetLastFederationStateReplicationError(nil, false)
}
} }
// getOrCreateAutopilotConfig is used to get the autopilot config, initializing it if necessary // getOrCreateAutopilotConfig is used to get the autopilot config, initializing it if necessary