mirror of https://github.com/status-im/consul.git
server: when wan federating via mesh gateways only do heuristic primary DC bypass on the leader (#9366)
Fixes #9341
This commit is contained in:
parent
4d470f9822
commit
f135c3b64e
|
@ -0,0 +1,4 @@
|
|||
```release-note:bug
|
||||
server: When wan federating via mesh gateways only do heuristic primary DC bypass on the leader.
|
||||
```
|
||||
|
|
@ -39,7 +39,7 @@ func (r *FederationStateReplicator) FetchRemote(lastRemoteIndex uint64) (int, in
|
|||
}
|
||||
lenRemote, remote, remoteIndex, err := r.fetchRemote(lastRemoteIndex)
|
||||
if r.gatewayLocator != nil {
|
||||
r.gatewayLocator.SetLastFederationStateReplicationError(err)
|
||||
r.gatewayLocator.SetLastFederationStateReplicationError(err, true)
|
||||
}
|
||||
return lenRemote, remote, remoteIndex, err
|
||||
}
|
||||
|
|
|
@ -59,11 +59,12 @@ type GatewayLocator struct {
|
|||
// these are a collection of measurements that factor into deciding if we
|
||||
// should directly dial the primary's mesh gateways or if we should try to
|
||||
// route through our local gateways (if they are up).
|
||||
lastReplLock sync.Mutex
|
||||
lastReplSuccess time.Time
|
||||
lastReplFailure time.Time
|
||||
lastReplSuccesses uint64
|
||||
lastReplFailures uint64
|
||||
lastReplLock sync.Mutex
|
||||
lastReplSuccess time.Time
|
||||
lastReplFailure time.Time
|
||||
lastReplSuccesses uint64
|
||||
lastReplFailures uint64
|
||||
useReplicationSignal bool // this should be set to true on the leader
|
||||
}
|
||||
|
||||
// SetLastFederationStateReplicationError is used to indicate if the federation
|
||||
|
@ -74,14 +75,23 @@ type GatewayLocator struct {
|
|||
// our chosen mesh-gateway configuration can reach the primary's servers (like
|
||||
// a ping or status RPC) we cheat and use the federation state replicator
|
||||
// goroutine's success or failure as a proxy.
|
||||
func (g *GatewayLocator) SetLastFederationStateReplicationError(err error) {
|
||||
func (g *GatewayLocator) SetLastFederationStateReplicationError(err error, fromReplication bool) {
|
||||
if g == nil {
|
||||
return
|
||||
}
|
||||
|
||||
g.lastReplLock.Lock()
|
||||
defer g.lastReplLock.Unlock()
|
||||
|
||||
oldChoice := g.dialPrimaryThroughLocalGateway()
|
||||
if err == nil {
|
||||
g.lastReplSuccess = time.Now().UTC()
|
||||
g.lastReplSuccesses++
|
||||
g.lastReplFailures = 0
|
||||
if fromReplication {
|
||||
// If we get info from replication, assume replication is operating.
|
||||
g.useReplicationSignal = true
|
||||
}
|
||||
} else {
|
||||
g.lastReplFailure = time.Now().UTC()
|
||||
g.lastReplFailures++
|
||||
|
@ -93,6 +103,16 @@ func (g *GatewayLocator) SetLastFederationStateReplicationError(err error) {
|
|||
}
|
||||
}
|
||||
|
||||
func (g *GatewayLocator) SetUseReplicationSignal(newValue bool) {
|
||||
if g == nil {
|
||||
return
|
||||
}
|
||||
|
||||
g.lastReplLock.Lock()
|
||||
g.useReplicationSignal = newValue
|
||||
g.lastReplLock.Unlock()
|
||||
}
|
||||
|
||||
func (g *GatewayLocator) logPrimaryDialingMessage(useLocal bool) {
|
||||
if g.datacenter == g.primaryDatacenter {
|
||||
// These messages are useless when the server is in the primary
|
||||
|
@ -140,6 +160,12 @@ func (g *GatewayLocator) DialPrimaryThroughLocalGateway() bool {
|
|||
const localFederationStateReplicatorFailuresBeforeDialingDirectly = 3
|
||||
|
||||
func (g *GatewayLocator) dialPrimaryThroughLocalGateway() bool {
|
||||
if !g.useReplicationSignal {
|
||||
// Followers should blindly assume these gateways work. The leader will
|
||||
// try to bypass them and correct the replicated federation state info
|
||||
// that the followers will eventually pick up on.
|
||||
return true
|
||||
}
|
||||
if g.lastReplSuccess.IsZero() && g.lastReplFailure.IsZero() {
|
||||
return false // no data yet
|
||||
}
|
||||
|
|
|
@ -18,6 +18,9 @@ func TestGatewayLocator(t *testing.T) {
|
|||
state, err := state.NewStateStore(nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
serverRoles := []string{"leader", "follower"}
|
||||
now := time.Now().UTC()
|
||||
|
||||
dc1 := &structs.FederationState{
|
||||
Datacenter: "dc1",
|
||||
MeshGateways: []structs.CheckServiceNode{
|
||||
|
@ -44,67 +47,105 @@ func TestGatewayLocator(t *testing.T) {
|
|||
}
|
||||
|
||||
t.Run("primary - no data", func(t *testing.T) {
|
||||
logger := testutil.Logger(t)
|
||||
tsd := &testServerDelegate{State: state, isLeader: true}
|
||||
g := NewGatewayLocator(
|
||||
logger,
|
||||
tsd,
|
||||
"dc1",
|
||||
"dc1",
|
||||
)
|
||||
for _, role := range serverRoles {
|
||||
t.Run(role, func(t *testing.T) {
|
||||
isLeader := role == "leader"
|
||||
|
||||
idx, err := g.runOnce(0)
|
||||
require.NoError(t, err)
|
||||
assert.False(t, g.DialPrimaryThroughLocalGateway())
|
||||
assert.Equal(t, uint64(1), idx)
|
||||
assert.Len(t, tsd.Calls, 1)
|
||||
assert.Equal(t, []string(nil), g.listGateways(false))
|
||||
assert.Equal(t, []string(nil), g.listGateways(true))
|
||||
logger := testutil.Logger(t)
|
||||
tsd := &testServerDelegate{State: state, isLeader: isLeader}
|
||||
if !isLeader {
|
||||
tsd.lastContact = now
|
||||
}
|
||||
g := NewGatewayLocator(
|
||||
logger,
|
||||
tsd,
|
||||
"dc1",
|
||||
"dc1",
|
||||
)
|
||||
g.SetUseReplicationSignal(isLeader)
|
||||
|
||||
idx, err := g.runOnce(0)
|
||||
require.NoError(t, err)
|
||||
assert.False(t, g.DialPrimaryThroughLocalGateway())
|
||||
assert.Equal(t, uint64(1), idx)
|
||||
assert.Len(t, tsd.Calls, 1)
|
||||
assert.Equal(t, []string(nil), g.listGateways(false))
|
||||
assert.Equal(t, []string(nil), g.listGateways(true))
|
||||
})
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("secondary - no data", func(t *testing.T) {
|
||||
logger := testutil.Logger(t)
|
||||
tsd := &testServerDelegate{State: state, isLeader: true}
|
||||
g := NewGatewayLocator(
|
||||
logger,
|
||||
tsd,
|
||||
"dc2",
|
||||
"dc1",
|
||||
)
|
||||
for _, role := range serverRoles {
|
||||
t.Run(role, func(t *testing.T) {
|
||||
isLeader := role == "leader"
|
||||
|
||||
idx, err := g.runOnce(0)
|
||||
require.NoError(t, err)
|
||||
assert.False(t, g.DialPrimaryThroughLocalGateway())
|
||||
assert.Equal(t, uint64(1), idx)
|
||||
assert.Len(t, tsd.Calls, 1)
|
||||
assert.Equal(t, []string(nil), g.listGateways(false))
|
||||
assert.Equal(t, []string(nil), g.listGateways(true))
|
||||
logger := testutil.Logger(t)
|
||||
tsd := &testServerDelegate{State: state, isLeader: isLeader}
|
||||
if !isLeader {
|
||||
tsd.lastContact = now
|
||||
}
|
||||
g := NewGatewayLocator(
|
||||
logger,
|
||||
tsd,
|
||||
"dc2",
|
||||
"dc1",
|
||||
)
|
||||
g.SetUseReplicationSignal(isLeader)
|
||||
|
||||
idx, err := g.runOnce(0)
|
||||
require.NoError(t, err)
|
||||
if isLeader {
|
||||
assert.False(t, g.DialPrimaryThroughLocalGateway())
|
||||
} else {
|
||||
assert.True(t, g.DialPrimaryThroughLocalGateway())
|
||||
}
|
||||
assert.Equal(t, uint64(1), idx)
|
||||
assert.Len(t, tsd.Calls, 1)
|
||||
assert.Equal(t, []string(nil), g.listGateways(false))
|
||||
assert.Equal(t, []string(nil), g.listGateways(true))
|
||||
})
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("secondary - just fallback", func(t *testing.T) {
|
||||
logger := testutil.Logger(t)
|
||||
tsd := &testServerDelegate{State: state, isLeader: true}
|
||||
g := NewGatewayLocator(
|
||||
logger,
|
||||
tsd,
|
||||
"dc2",
|
||||
"dc1",
|
||||
)
|
||||
g.RefreshPrimaryGatewayFallbackAddresses([]string{
|
||||
"7.7.7.7:7777",
|
||||
"8.8.8.8:8888",
|
||||
})
|
||||
for _, role := range serverRoles {
|
||||
t.Run(role, func(t *testing.T) {
|
||||
isLeader := role == "leader"
|
||||
|
||||
idx, err := g.runOnce(0)
|
||||
require.NoError(t, err)
|
||||
assert.False(t, g.DialPrimaryThroughLocalGateway())
|
||||
assert.Equal(t, uint64(1), idx)
|
||||
assert.Len(t, tsd.Calls, 1)
|
||||
assert.Equal(t, []string(nil), g.listGateways(false))
|
||||
assert.Equal(t, []string{
|
||||
"7.7.7.7:7777",
|
||||
"8.8.8.8:8888",
|
||||
}, g.listGateways(true))
|
||||
logger := testutil.Logger(t)
|
||||
tsd := &testServerDelegate{State: state, isLeader: isLeader}
|
||||
if !isLeader {
|
||||
tsd.lastContact = now
|
||||
}
|
||||
g := NewGatewayLocator(
|
||||
logger,
|
||||
tsd,
|
||||
"dc2",
|
||||
"dc1",
|
||||
)
|
||||
g.SetUseReplicationSignal(isLeader)
|
||||
g.RefreshPrimaryGatewayFallbackAddresses([]string{
|
||||
"7.7.7.7:7777",
|
||||
"8.8.8.8:8888",
|
||||
})
|
||||
|
||||
idx, err := g.runOnce(0)
|
||||
require.NoError(t, err)
|
||||
if isLeader {
|
||||
assert.False(t, g.DialPrimaryThroughLocalGateway())
|
||||
} else {
|
||||
assert.True(t, g.DialPrimaryThroughLocalGateway())
|
||||
}
|
||||
assert.Equal(t, uint64(1), idx)
|
||||
assert.Len(t, tsd.Calls, 1)
|
||||
assert.Equal(t, []string(nil), g.listGateways(false))
|
||||
assert.Equal(t, []string{
|
||||
"7.7.7.7:7777",
|
||||
"8.8.8.8:8888",
|
||||
}, g.listGateways(true))
|
||||
})
|
||||
}
|
||||
})
|
||||
|
||||
// Insert data for the dcs
|
||||
|
@ -112,56 +153,88 @@ func TestGatewayLocator(t *testing.T) {
|
|||
require.NoError(t, state.FederationStateSet(2, dc2))
|
||||
|
||||
t.Run("primary - with data", func(t *testing.T) {
|
||||
logger := testutil.Logger(t)
|
||||
tsd := &testServerDelegate{State: state, isLeader: true}
|
||||
g := NewGatewayLocator(
|
||||
logger,
|
||||
tsd,
|
||||
"dc1",
|
||||
"dc1",
|
||||
)
|
||||
for _, role := range serverRoles {
|
||||
t.Run(role, func(t *testing.T) {
|
||||
isLeader := role == "leader"
|
||||
|
||||
idx, err := g.runOnce(0)
|
||||
require.NoError(t, err)
|
||||
assert.False(t, g.DialPrimaryThroughLocalGateway())
|
||||
assert.Equal(t, uint64(2), idx)
|
||||
assert.Len(t, tsd.Calls, 1)
|
||||
assert.Equal(t, []string{
|
||||
"1.2.3.4:5555",
|
||||
"4.3.2.1:9999",
|
||||
}, g.listGateways(false))
|
||||
assert.Equal(t, []string{
|
||||
"1.2.3.4:5555",
|
||||
"4.3.2.1:9999",
|
||||
}, g.listGateways(true))
|
||||
logger := testutil.Logger(t)
|
||||
tsd := &testServerDelegate{State: state, isLeader: isLeader}
|
||||
if !isLeader {
|
||||
tsd.lastContact = now
|
||||
}
|
||||
g := NewGatewayLocator(
|
||||
logger,
|
||||
tsd,
|
||||
"dc1",
|
||||
"dc1",
|
||||
)
|
||||
g.SetUseReplicationSignal(isLeader)
|
||||
|
||||
idx, err := g.runOnce(0)
|
||||
require.NoError(t, err)
|
||||
assert.False(t, g.DialPrimaryThroughLocalGateway())
|
||||
assert.Equal(t, uint64(2), idx)
|
||||
assert.Len(t, tsd.Calls, 1)
|
||||
assert.Equal(t, []string{
|
||||
"1.2.3.4:5555",
|
||||
"4.3.2.1:9999",
|
||||
}, g.listGateways(false))
|
||||
assert.Equal(t, []string{
|
||||
"1.2.3.4:5555",
|
||||
"4.3.2.1:9999",
|
||||
}, g.listGateways(true))
|
||||
})
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("secondary - with data", func(t *testing.T) {
|
||||
logger := testutil.Logger(t)
|
||||
tsd := &testServerDelegate{State: state, isLeader: true}
|
||||
g := NewGatewayLocator(
|
||||
logger,
|
||||
tsd,
|
||||
"dc2",
|
||||
"dc1",
|
||||
)
|
||||
for _, role := range serverRoles {
|
||||
t.Run(role, func(t *testing.T) {
|
||||
isLeader := role == "leader"
|
||||
|
||||
idx, err := g.runOnce(0)
|
||||
require.NoError(t, err)
|
||||
assert.False(t, g.DialPrimaryThroughLocalGateway())
|
||||
assert.Equal(t, uint64(2), idx)
|
||||
assert.Len(t, tsd.Calls, 1)
|
||||
assert.Equal(t, []string{
|
||||
"5.6.7.8:5555",
|
||||
"8.7.6.5:9999",
|
||||
}, g.listGateways(false))
|
||||
assert.Equal(t, []string{
|
||||
"1.2.3.4:5555",
|
||||
"4.3.2.1:9999",
|
||||
}, g.listGateways(true))
|
||||
logger := testutil.Logger(t)
|
||||
tsd := &testServerDelegate{State: state, isLeader: isLeader}
|
||||
if !isLeader {
|
||||
tsd.lastContact = now
|
||||
}
|
||||
g := NewGatewayLocator(
|
||||
logger,
|
||||
tsd,
|
||||
"dc2",
|
||||
"dc1",
|
||||
)
|
||||
g.SetUseReplicationSignal(isLeader)
|
||||
|
||||
idx, err := g.runOnce(0)
|
||||
require.NoError(t, err)
|
||||
if isLeader {
|
||||
assert.False(t, g.DialPrimaryThroughLocalGateway())
|
||||
} else {
|
||||
assert.True(t, g.DialPrimaryThroughLocalGateway())
|
||||
}
|
||||
assert.Equal(t, uint64(2), idx)
|
||||
assert.Len(t, tsd.Calls, 1)
|
||||
assert.Equal(t, []string{
|
||||
"5.6.7.8:5555",
|
||||
"8.7.6.5:9999",
|
||||
}, g.listGateways(false))
|
||||
if isLeader {
|
||||
assert.Equal(t, []string{
|
||||
"1.2.3.4:5555",
|
||||
"4.3.2.1:9999",
|
||||
}, g.listGateways(true))
|
||||
} else {
|
||||
assert.Equal(t, []string{
|
||||
"5.6.7.8:5555",
|
||||
"8.7.6.5:9999",
|
||||
}, g.listGateways(true))
|
||||
}
|
||||
})
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("secondary - with data and fallback - no repl", func(t *testing.T) {
|
||||
// Only run for the leader.
|
||||
logger := testutil.Logger(t)
|
||||
tsd := &testServerDelegate{State: state, isLeader: true}
|
||||
g := NewGatewayLocator(
|
||||
|
@ -170,6 +243,7 @@ func TestGatewayLocator(t *testing.T) {
|
|||
"dc2",
|
||||
"dc1",
|
||||
)
|
||||
g.SetUseReplicationSignal(true)
|
||||
|
||||
g.RefreshPrimaryGatewayFallbackAddresses([]string{
|
||||
"7.7.7.7:7777",
|
||||
|
@ -194,6 +268,7 @@ func TestGatewayLocator(t *testing.T) {
|
|||
})
|
||||
|
||||
t.Run("secondary - with data and fallback - repl ok", func(t *testing.T) {
|
||||
// Only run for the leader.
|
||||
logger := testutil.Logger(t)
|
||||
tsd := &testServerDelegate{State: state, isLeader: true}
|
||||
g := NewGatewayLocator(
|
||||
|
@ -202,13 +277,14 @@ func TestGatewayLocator(t *testing.T) {
|
|||
"dc2",
|
||||
"dc1",
|
||||
)
|
||||
g.SetUseReplicationSignal(true)
|
||||
|
||||
g.RefreshPrimaryGatewayFallbackAddresses([]string{
|
||||
"7.7.7.7:7777",
|
||||
"8.8.8.8:8888",
|
||||
})
|
||||
|
||||
g.SetLastFederationStateReplicationError(nil)
|
||||
g.SetLastFederationStateReplicationError(nil, true)
|
||||
|
||||
idx, err := g.runOnce(0)
|
||||
require.NoError(t, err)
|
||||
|
@ -226,6 +302,7 @@ func TestGatewayLocator(t *testing.T) {
|
|||
})
|
||||
|
||||
t.Run("secondary - with data and fallback - repl ok then failed 2 times", func(t *testing.T) {
|
||||
// Only run for the leader.
|
||||
logger := testutil.Logger(t)
|
||||
tsd := &testServerDelegate{State: state, isLeader: true}
|
||||
g := NewGatewayLocator(
|
||||
|
@ -234,15 +311,16 @@ func TestGatewayLocator(t *testing.T) {
|
|||
"dc2",
|
||||
"dc1",
|
||||
)
|
||||
g.SetUseReplicationSignal(true)
|
||||
|
||||
g.RefreshPrimaryGatewayFallbackAddresses([]string{
|
||||
"7.7.7.7:7777",
|
||||
"8.8.8.8:8888",
|
||||
})
|
||||
|
||||
g.SetLastFederationStateReplicationError(nil)
|
||||
g.SetLastFederationStateReplicationError(errors.New("fake"))
|
||||
g.SetLastFederationStateReplicationError(errors.New("fake"))
|
||||
g.SetLastFederationStateReplicationError(nil, true)
|
||||
g.SetLastFederationStateReplicationError(errors.New("fake"), true)
|
||||
g.SetLastFederationStateReplicationError(errors.New("fake"), true)
|
||||
|
||||
idx, err := g.runOnce(0)
|
||||
require.NoError(t, err)
|
||||
|
@ -260,6 +338,7 @@ func TestGatewayLocator(t *testing.T) {
|
|||
})
|
||||
|
||||
t.Run("secondary - with data and fallback - repl ok then failed 3 times", func(t *testing.T) {
|
||||
// Only run for the leader.
|
||||
logger := testutil.Logger(t)
|
||||
tsd := &testServerDelegate{State: state, isLeader: true}
|
||||
g := NewGatewayLocator(
|
||||
|
@ -268,16 +347,17 @@ func TestGatewayLocator(t *testing.T) {
|
|||
"dc2",
|
||||
"dc1",
|
||||
)
|
||||
g.SetUseReplicationSignal(true)
|
||||
|
||||
g.RefreshPrimaryGatewayFallbackAddresses([]string{
|
||||
"7.7.7.7:7777",
|
||||
"8.8.8.8:8888",
|
||||
})
|
||||
|
||||
g.SetLastFederationStateReplicationError(nil)
|
||||
g.SetLastFederationStateReplicationError(errors.New("fake"))
|
||||
g.SetLastFederationStateReplicationError(errors.New("fake"))
|
||||
g.SetLastFederationStateReplicationError(errors.New("fake"))
|
||||
g.SetLastFederationStateReplicationError(nil, true)
|
||||
g.SetLastFederationStateReplicationError(errors.New("fake"), true)
|
||||
g.SetLastFederationStateReplicationError(errors.New("fake"), true)
|
||||
g.SetLastFederationStateReplicationError(errors.New("fake"), true)
|
||||
|
||||
idx, err := g.runOnce(0)
|
||||
require.NoError(t, err)
|
||||
|
@ -297,6 +377,7 @@ func TestGatewayLocator(t *testing.T) {
|
|||
})
|
||||
|
||||
t.Run("secondary - with data and fallback - repl ok then failed 3 times then ok again", func(t *testing.T) {
|
||||
// Only run for the leader.
|
||||
logger := testutil.Logger(t)
|
||||
tsd := &testServerDelegate{State: state, isLeader: true}
|
||||
g := NewGatewayLocator(
|
||||
|
@ -305,17 +386,18 @@ func TestGatewayLocator(t *testing.T) {
|
|||
"dc2",
|
||||
"dc1",
|
||||
)
|
||||
g.SetUseReplicationSignal(true)
|
||||
|
||||
g.RefreshPrimaryGatewayFallbackAddresses([]string{
|
||||
"7.7.7.7:7777",
|
||||
"8.8.8.8:8888",
|
||||
})
|
||||
|
||||
g.SetLastFederationStateReplicationError(nil)
|
||||
g.SetLastFederationStateReplicationError(errors.New("fake"))
|
||||
g.SetLastFederationStateReplicationError(errors.New("fake"))
|
||||
g.SetLastFederationStateReplicationError(errors.New("fake"))
|
||||
g.SetLastFederationStateReplicationError(nil)
|
||||
g.SetLastFederationStateReplicationError(nil, true)
|
||||
g.SetLastFederationStateReplicationError(errors.New("fake"), true)
|
||||
g.SetLastFederationStateReplicationError(errors.New("fake"), true)
|
||||
g.SetLastFederationStateReplicationError(errors.New("fake"), true)
|
||||
g.SetLastFederationStateReplicationError(nil, true)
|
||||
|
||||
idx, err := g.runOnce(0)
|
||||
require.NoError(t, err)
|
||||
|
|
|
@ -958,12 +958,22 @@ func (s *Server) startFederationStateReplication() {
|
|||
return
|
||||
}
|
||||
|
||||
if s.gatewayLocator != nil {
|
||||
s.gatewayLocator.SetUseReplicationSignal(true)
|
||||
s.gatewayLocator.SetLastFederationStateReplicationError(nil, false)
|
||||
}
|
||||
|
||||
s.leaderRoutineManager.Start(federationStateReplicationRoutineName, s.federationStateReplicator.Run)
|
||||
}
|
||||
|
||||
func (s *Server) stopFederationStateReplication() {
|
||||
// will be a no-op when not started
|
||||
s.leaderRoutineManager.Stop(federationStateReplicationRoutineName)
|
||||
|
||||
if s.gatewayLocator != nil {
|
||||
s.gatewayLocator.SetUseReplicationSignal(false)
|
||||
s.gatewayLocator.SetLastFederationStateReplicationError(nil, false)
|
||||
}
|
||||
}
|
||||
|
||||
// getOrCreateAutopilotConfig is used to get the autopilot config, initializing it if necessary
|
||||
|
|
Loading…
Reference in New Issue