mirror of https://github.com/status-im/consul.git
server: when wan federating via mesh gateways only do heuristic primary DC bypass on the leader (#9366)
Fixes #9341
This commit is contained in:
parent
a62e4496ba
commit
5fe99cc2bd
|
@ -0,0 +1,4 @@
|
||||||
|
```release-note:bug
|
||||||
|
server: When wan federating via mesh gateways only do heuristic primary DC bypass on the leader.
|
||||||
|
```
|
||||||
|
|
|
@ -39,7 +39,7 @@ func (r *FederationStateReplicator) FetchRemote(lastRemoteIndex uint64) (int, in
|
||||||
}
|
}
|
||||||
lenRemote, remote, remoteIndex, err := r.fetchRemote(lastRemoteIndex)
|
lenRemote, remote, remoteIndex, err := r.fetchRemote(lastRemoteIndex)
|
||||||
if r.gatewayLocator != nil {
|
if r.gatewayLocator != nil {
|
||||||
r.gatewayLocator.SetLastFederationStateReplicationError(err)
|
r.gatewayLocator.SetLastFederationStateReplicationError(err, true)
|
||||||
}
|
}
|
||||||
return lenRemote, remote, remoteIndex, err
|
return lenRemote, remote, remoteIndex, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -64,6 +64,7 @@ type GatewayLocator struct {
|
||||||
lastReplFailure time.Time
|
lastReplFailure time.Time
|
||||||
lastReplSuccesses uint64
|
lastReplSuccesses uint64
|
||||||
lastReplFailures uint64
|
lastReplFailures uint64
|
||||||
|
useReplicationSignal bool // this should be set to true on the leader
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetLastFederationStateReplicationError is used to indicate if the federation
|
// SetLastFederationStateReplicationError is used to indicate if the federation
|
||||||
|
@ -74,14 +75,23 @@ type GatewayLocator struct {
|
||||||
// our chosen mesh-gateway configuration can reach the primary's servers (like
|
// our chosen mesh-gateway configuration can reach the primary's servers (like
|
||||||
// a ping or status RPC) we cheat and use the federation state replicator
|
// a ping or status RPC) we cheat and use the federation state replicator
|
||||||
// goroutine's success or failure as a proxy.
|
// goroutine's success or failure as a proxy.
|
||||||
func (g *GatewayLocator) SetLastFederationStateReplicationError(err error) {
|
func (g *GatewayLocator) SetLastFederationStateReplicationError(err error, fromReplication bool) {
|
||||||
|
if g == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
g.lastReplLock.Lock()
|
g.lastReplLock.Lock()
|
||||||
defer g.lastReplLock.Unlock()
|
defer g.lastReplLock.Unlock()
|
||||||
|
|
||||||
oldChoice := g.dialPrimaryThroughLocalGateway()
|
oldChoice := g.dialPrimaryThroughLocalGateway()
|
||||||
if err == nil {
|
if err == nil {
|
||||||
g.lastReplSuccess = time.Now().UTC()
|
g.lastReplSuccess = time.Now().UTC()
|
||||||
g.lastReplSuccesses++
|
g.lastReplSuccesses++
|
||||||
g.lastReplFailures = 0
|
g.lastReplFailures = 0
|
||||||
|
if fromReplication {
|
||||||
|
// If we get info from replication, assume replication is operating.
|
||||||
|
g.useReplicationSignal = true
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
g.lastReplFailure = time.Now().UTC()
|
g.lastReplFailure = time.Now().UTC()
|
||||||
g.lastReplFailures++
|
g.lastReplFailures++
|
||||||
|
@ -93,6 +103,16 @@ func (g *GatewayLocator) SetLastFederationStateReplicationError(err error) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (g *GatewayLocator) SetUseReplicationSignal(newValue bool) {
|
||||||
|
if g == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
g.lastReplLock.Lock()
|
||||||
|
g.useReplicationSignal = newValue
|
||||||
|
g.lastReplLock.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
func (g *GatewayLocator) logPrimaryDialingMessage(useLocal bool) {
|
func (g *GatewayLocator) logPrimaryDialingMessage(useLocal bool) {
|
||||||
if g.datacenter == g.primaryDatacenter {
|
if g.datacenter == g.primaryDatacenter {
|
||||||
// These messages are useless when the server is in the primary
|
// These messages are useless when the server is in the primary
|
||||||
|
@ -140,6 +160,12 @@ func (g *GatewayLocator) DialPrimaryThroughLocalGateway() bool {
|
||||||
const localFederationStateReplicatorFailuresBeforeDialingDirectly = 3
|
const localFederationStateReplicatorFailuresBeforeDialingDirectly = 3
|
||||||
|
|
||||||
func (g *GatewayLocator) dialPrimaryThroughLocalGateway() bool {
|
func (g *GatewayLocator) dialPrimaryThroughLocalGateway() bool {
|
||||||
|
if !g.useReplicationSignal {
|
||||||
|
// Followers should blindly assume these gateways work. The leader will
|
||||||
|
// try to bypass them and correct the replicated federation state info
|
||||||
|
// that the followers will eventually pick up on.
|
||||||
|
return true
|
||||||
|
}
|
||||||
if g.lastReplSuccess.IsZero() && g.lastReplFailure.IsZero() {
|
if g.lastReplSuccess.IsZero() && g.lastReplFailure.IsZero() {
|
||||||
return false // no data yet
|
return false // no data yet
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,6 +18,9 @@ import (
|
||||||
func TestGatewayLocator(t *testing.T) {
|
func TestGatewayLocator(t *testing.T) {
|
||||||
state := state.NewStateStore(nil)
|
state := state.NewStateStore(nil)
|
||||||
|
|
||||||
|
serverRoles := []string{"leader", "follower"}
|
||||||
|
now := time.Now().UTC()
|
||||||
|
|
||||||
dc1 := &structs.FederationState{
|
dc1 := &structs.FederationState{
|
||||||
Datacenter: "dc1",
|
Datacenter: "dc1",
|
||||||
MeshGateways: []structs.CheckServiceNode{
|
MeshGateways: []structs.CheckServiceNode{
|
||||||
|
@ -44,14 +47,22 @@ func TestGatewayLocator(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
t.Run("primary - no data", func(t *testing.T) {
|
t.Run("primary - no data", func(t *testing.T) {
|
||||||
|
for _, role := range serverRoles {
|
||||||
|
t.Run(role, func(t *testing.T) {
|
||||||
|
isLeader := role == "leader"
|
||||||
|
|
||||||
logger := testutil.Logger(t)
|
logger := testutil.Logger(t)
|
||||||
tsd := &testServerDelegate{State: state, isLeader: true}
|
tsd := &testServerDelegate{State: state, isLeader: isLeader}
|
||||||
|
if !isLeader {
|
||||||
|
tsd.lastContact = now
|
||||||
|
}
|
||||||
g := NewGatewayLocator(
|
g := NewGatewayLocator(
|
||||||
logger,
|
logger,
|
||||||
tsd,
|
tsd,
|
||||||
"dc1",
|
"dc1",
|
||||||
"dc1",
|
"dc1",
|
||||||
)
|
)
|
||||||
|
g.SetUseReplicationSignal(isLeader)
|
||||||
|
|
||||||
idx, err := g.runOnce(0)
|
idx, err := g.runOnce(0)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
@ -61,35 +72,59 @@ func TestGatewayLocator(t *testing.T) {
|
||||||
assert.Equal(t, []string(nil), g.listGateways(false))
|
assert.Equal(t, []string(nil), g.listGateways(false))
|
||||||
assert.Equal(t, []string(nil), g.listGateways(true))
|
assert.Equal(t, []string(nil), g.listGateways(true))
|
||||||
})
|
})
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
t.Run("secondary - no data", func(t *testing.T) {
|
t.Run("secondary - no data", func(t *testing.T) {
|
||||||
|
for _, role := range serverRoles {
|
||||||
|
t.Run(role, func(t *testing.T) {
|
||||||
|
isLeader := role == "leader"
|
||||||
|
|
||||||
logger := testutil.Logger(t)
|
logger := testutil.Logger(t)
|
||||||
tsd := &testServerDelegate{State: state, isLeader: true}
|
tsd := &testServerDelegate{State: state, isLeader: isLeader}
|
||||||
|
if !isLeader {
|
||||||
|
tsd.lastContact = now
|
||||||
|
}
|
||||||
g := NewGatewayLocator(
|
g := NewGatewayLocator(
|
||||||
logger,
|
logger,
|
||||||
tsd,
|
tsd,
|
||||||
"dc2",
|
"dc2",
|
||||||
"dc1",
|
"dc1",
|
||||||
)
|
)
|
||||||
|
g.SetUseReplicationSignal(isLeader)
|
||||||
|
|
||||||
idx, err := g.runOnce(0)
|
idx, err := g.runOnce(0)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
if isLeader {
|
||||||
assert.False(t, g.DialPrimaryThroughLocalGateway())
|
assert.False(t, g.DialPrimaryThroughLocalGateway())
|
||||||
|
} else {
|
||||||
|
assert.True(t, g.DialPrimaryThroughLocalGateway())
|
||||||
|
}
|
||||||
assert.Equal(t, uint64(1), idx)
|
assert.Equal(t, uint64(1), idx)
|
||||||
assert.Len(t, tsd.Calls, 1)
|
assert.Len(t, tsd.Calls, 1)
|
||||||
assert.Equal(t, []string(nil), g.listGateways(false))
|
assert.Equal(t, []string(nil), g.listGateways(false))
|
||||||
assert.Equal(t, []string(nil), g.listGateways(true))
|
assert.Equal(t, []string(nil), g.listGateways(true))
|
||||||
})
|
})
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
t.Run("secondary - just fallback", func(t *testing.T) {
|
t.Run("secondary - just fallback", func(t *testing.T) {
|
||||||
|
for _, role := range serverRoles {
|
||||||
|
t.Run(role, func(t *testing.T) {
|
||||||
|
isLeader := role == "leader"
|
||||||
|
|
||||||
logger := testutil.Logger(t)
|
logger := testutil.Logger(t)
|
||||||
tsd := &testServerDelegate{State: state, isLeader: true}
|
tsd := &testServerDelegate{State: state, isLeader: isLeader}
|
||||||
|
if !isLeader {
|
||||||
|
tsd.lastContact = now
|
||||||
|
}
|
||||||
g := NewGatewayLocator(
|
g := NewGatewayLocator(
|
||||||
logger,
|
logger,
|
||||||
tsd,
|
tsd,
|
||||||
"dc2",
|
"dc2",
|
||||||
"dc1",
|
"dc1",
|
||||||
)
|
)
|
||||||
|
g.SetUseReplicationSignal(isLeader)
|
||||||
g.RefreshPrimaryGatewayFallbackAddresses([]string{
|
g.RefreshPrimaryGatewayFallbackAddresses([]string{
|
||||||
"7.7.7.7:7777",
|
"7.7.7.7:7777",
|
||||||
"8.8.8.8:8888",
|
"8.8.8.8:8888",
|
||||||
|
@ -97,7 +132,11 @@ func TestGatewayLocator(t *testing.T) {
|
||||||
|
|
||||||
idx, err := g.runOnce(0)
|
idx, err := g.runOnce(0)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
if isLeader {
|
||||||
assert.False(t, g.DialPrimaryThroughLocalGateway())
|
assert.False(t, g.DialPrimaryThroughLocalGateway())
|
||||||
|
} else {
|
||||||
|
assert.True(t, g.DialPrimaryThroughLocalGateway())
|
||||||
|
}
|
||||||
assert.Equal(t, uint64(1), idx)
|
assert.Equal(t, uint64(1), idx)
|
||||||
assert.Len(t, tsd.Calls, 1)
|
assert.Len(t, tsd.Calls, 1)
|
||||||
assert.Equal(t, []string(nil), g.listGateways(false))
|
assert.Equal(t, []string(nil), g.listGateways(false))
|
||||||
|
@ -106,20 +145,30 @@ func TestGatewayLocator(t *testing.T) {
|
||||||
"8.8.8.8:8888",
|
"8.8.8.8:8888",
|
||||||
}, g.listGateways(true))
|
}, g.listGateways(true))
|
||||||
})
|
})
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
// Insert data for the dcs
|
// Insert data for the dcs
|
||||||
require.NoError(t, state.FederationStateSet(1, dc1))
|
require.NoError(t, state.FederationStateSet(1, dc1))
|
||||||
require.NoError(t, state.FederationStateSet(2, dc2))
|
require.NoError(t, state.FederationStateSet(2, dc2))
|
||||||
|
|
||||||
t.Run("primary - with data", func(t *testing.T) {
|
t.Run("primary - with data", func(t *testing.T) {
|
||||||
|
for _, role := range serverRoles {
|
||||||
|
t.Run(role, func(t *testing.T) {
|
||||||
|
isLeader := role == "leader"
|
||||||
|
|
||||||
logger := testutil.Logger(t)
|
logger := testutil.Logger(t)
|
||||||
tsd := &testServerDelegate{State: state, isLeader: true}
|
tsd := &testServerDelegate{State: state, isLeader: isLeader}
|
||||||
|
if !isLeader {
|
||||||
|
tsd.lastContact = now
|
||||||
|
}
|
||||||
g := NewGatewayLocator(
|
g := NewGatewayLocator(
|
||||||
logger,
|
logger,
|
||||||
tsd,
|
tsd,
|
||||||
"dc1",
|
"dc1",
|
||||||
"dc1",
|
"dc1",
|
||||||
)
|
)
|
||||||
|
g.SetUseReplicationSignal(isLeader)
|
||||||
|
|
||||||
idx, err := g.runOnce(0)
|
idx, err := g.runOnce(0)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
@ -135,33 +184,57 @@ func TestGatewayLocator(t *testing.T) {
|
||||||
"4.3.2.1:9999",
|
"4.3.2.1:9999",
|
||||||
}, g.listGateways(true))
|
}, g.listGateways(true))
|
||||||
})
|
})
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
t.Run("secondary - with data", func(t *testing.T) {
|
t.Run("secondary - with data", func(t *testing.T) {
|
||||||
|
for _, role := range serverRoles {
|
||||||
|
t.Run(role, func(t *testing.T) {
|
||||||
|
isLeader := role == "leader"
|
||||||
|
|
||||||
logger := testutil.Logger(t)
|
logger := testutil.Logger(t)
|
||||||
tsd := &testServerDelegate{State: state, isLeader: true}
|
tsd := &testServerDelegate{State: state, isLeader: isLeader}
|
||||||
|
if !isLeader {
|
||||||
|
tsd.lastContact = now
|
||||||
|
}
|
||||||
g := NewGatewayLocator(
|
g := NewGatewayLocator(
|
||||||
logger,
|
logger,
|
||||||
tsd,
|
tsd,
|
||||||
"dc2",
|
"dc2",
|
||||||
"dc1",
|
"dc1",
|
||||||
)
|
)
|
||||||
|
g.SetUseReplicationSignal(isLeader)
|
||||||
|
|
||||||
idx, err := g.runOnce(0)
|
idx, err := g.runOnce(0)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
if isLeader {
|
||||||
assert.False(t, g.DialPrimaryThroughLocalGateway())
|
assert.False(t, g.DialPrimaryThroughLocalGateway())
|
||||||
|
} else {
|
||||||
|
assert.True(t, g.DialPrimaryThroughLocalGateway())
|
||||||
|
}
|
||||||
assert.Equal(t, uint64(2), idx)
|
assert.Equal(t, uint64(2), idx)
|
||||||
assert.Len(t, tsd.Calls, 1)
|
assert.Len(t, tsd.Calls, 1)
|
||||||
assert.Equal(t, []string{
|
assert.Equal(t, []string{
|
||||||
"5.6.7.8:5555",
|
"5.6.7.8:5555",
|
||||||
"8.7.6.5:9999",
|
"8.7.6.5:9999",
|
||||||
}, g.listGateways(false))
|
}, g.listGateways(false))
|
||||||
|
if isLeader {
|
||||||
assert.Equal(t, []string{
|
assert.Equal(t, []string{
|
||||||
"1.2.3.4:5555",
|
"1.2.3.4:5555",
|
||||||
"4.3.2.1:9999",
|
"4.3.2.1:9999",
|
||||||
}, g.listGateways(true))
|
}, g.listGateways(true))
|
||||||
|
} else {
|
||||||
|
assert.Equal(t, []string{
|
||||||
|
"5.6.7.8:5555",
|
||||||
|
"8.7.6.5:9999",
|
||||||
|
}, g.listGateways(true))
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("secondary - with data and fallback - no repl", func(t *testing.T) {
|
t.Run("secondary - with data and fallback - no repl", func(t *testing.T) {
|
||||||
|
// Only run for the leader.
|
||||||
logger := testutil.Logger(t)
|
logger := testutil.Logger(t)
|
||||||
tsd := &testServerDelegate{State: state, isLeader: true}
|
tsd := &testServerDelegate{State: state, isLeader: true}
|
||||||
g := NewGatewayLocator(
|
g := NewGatewayLocator(
|
||||||
|
@ -170,6 +243,7 @@ func TestGatewayLocator(t *testing.T) {
|
||||||
"dc2",
|
"dc2",
|
||||||
"dc1",
|
"dc1",
|
||||||
)
|
)
|
||||||
|
g.SetUseReplicationSignal(true)
|
||||||
|
|
||||||
g.RefreshPrimaryGatewayFallbackAddresses([]string{
|
g.RefreshPrimaryGatewayFallbackAddresses([]string{
|
||||||
"7.7.7.7:7777",
|
"7.7.7.7:7777",
|
||||||
|
@ -194,6 +268,7 @@ func TestGatewayLocator(t *testing.T) {
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("secondary - with data and fallback - repl ok", func(t *testing.T) {
|
t.Run("secondary - with data and fallback - repl ok", func(t *testing.T) {
|
||||||
|
// Only run for the leader.
|
||||||
logger := testutil.Logger(t)
|
logger := testutil.Logger(t)
|
||||||
tsd := &testServerDelegate{State: state, isLeader: true}
|
tsd := &testServerDelegate{State: state, isLeader: true}
|
||||||
g := NewGatewayLocator(
|
g := NewGatewayLocator(
|
||||||
|
@ -202,13 +277,14 @@ func TestGatewayLocator(t *testing.T) {
|
||||||
"dc2",
|
"dc2",
|
||||||
"dc1",
|
"dc1",
|
||||||
)
|
)
|
||||||
|
g.SetUseReplicationSignal(true)
|
||||||
|
|
||||||
g.RefreshPrimaryGatewayFallbackAddresses([]string{
|
g.RefreshPrimaryGatewayFallbackAddresses([]string{
|
||||||
"7.7.7.7:7777",
|
"7.7.7.7:7777",
|
||||||
"8.8.8.8:8888",
|
"8.8.8.8:8888",
|
||||||
})
|
})
|
||||||
|
|
||||||
g.SetLastFederationStateReplicationError(nil)
|
g.SetLastFederationStateReplicationError(nil, true)
|
||||||
|
|
||||||
idx, err := g.runOnce(0)
|
idx, err := g.runOnce(0)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
@ -226,6 +302,7 @@ func TestGatewayLocator(t *testing.T) {
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("secondary - with data and fallback - repl ok then failed 2 times", func(t *testing.T) {
|
t.Run("secondary - with data and fallback - repl ok then failed 2 times", func(t *testing.T) {
|
||||||
|
// Only run for the leader.
|
||||||
logger := testutil.Logger(t)
|
logger := testutil.Logger(t)
|
||||||
tsd := &testServerDelegate{State: state, isLeader: true}
|
tsd := &testServerDelegate{State: state, isLeader: true}
|
||||||
g := NewGatewayLocator(
|
g := NewGatewayLocator(
|
||||||
|
@ -234,15 +311,16 @@ func TestGatewayLocator(t *testing.T) {
|
||||||
"dc2",
|
"dc2",
|
||||||
"dc1",
|
"dc1",
|
||||||
)
|
)
|
||||||
|
g.SetUseReplicationSignal(true)
|
||||||
|
|
||||||
g.RefreshPrimaryGatewayFallbackAddresses([]string{
|
g.RefreshPrimaryGatewayFallbackAddresses([]string{
|
||||||
"7.7.7.7:7777",
|
"7.7.7.7:7777",
|
||||||
"8.8.8.8:8888",
|
"8.8.8.8:8888",
|
||||||
})
|
})
|
||||||
|
|
||||||
g.SetLastFederationStateReplicationError(nil)
|
g.SetLastFederationStateReplicationError(nil, true)
|
||||||
g.SetLastFederationStateReplicationError(errors.New("fake"))
|
g.SetLastFederationStateReplicationError(errors.New("fake"), true)
|
||||||
g.SetLastFederationStateReplicationError(errors.New("fake"))
|
g.SetLastFederationStateReplicationError(errors.New("fake"), true)
|
||||||
|
|
||||||
idx, err := g.runOnce(0)
|
idx, err := g.runOnce(0)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
@ -260,6 +338,7 @@ func TestGatewayLocator(t *testing.T) {
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("secondary - with data and fallback - repl ok then failed 3 times", func(t *testing.T) {
|
t.Run("secondary - with data and fallback - repl ok then failed 3 times", func(t *testing.T) {
|
||||||
|
// Only run for the leader.
|
||||||
logger := testutil.Logger(t)
|
logger := testutil.Logger(t)
|
||||||
tsd := &testServerDelegate{State: state, isLeader: true}
|
tsd := &testServerDelegate{State: state, isLeader: true}
|
||||||
g := NewGatewayLocator(
|
g := NewGatewayLocator(
|
||||||
|
@ -268,16 +347,17 @@ func TestGatewayLocator(t *testing.T) {
|
||||||
"dc2",
|
"dc2",
|
||||||
"dc1",
|
"dc1",
|
||||||
)
|
)
|
||||||
|
g.SetUseReplicationSignal(true)
|
||||||
|
|
||||||
g.RefreshPrimaryGatewayFallbackAddresses([]string{
|
g.RefreshPrimaryGatewayFallbackAddresses([]string{
|
||||||
"7.7.7.7:7777",
|
"7.7.7.7:7777",
|
||||||
"8.8.8.8:8888",
|
"8.8.8.8:8888",
|
||||||
})
|
})
|
||||||
|
|
||||||
g.SetLastFederationStateReplicationError(nil)
|
g.SetLastFederationStateReplicationError(nil, true)
|
||||||
g.SetLastFederationStateReplicationError(errors.New("fake"))
|
g.SetLastFederationStateReplicationError(errors.New("fake"), true)
|
||||||
g.SetLastFederationStateReplicationError(errors.New("fake"))
|
g.SetLastFederationStateReplicationError(errors.New("fake"), true)
|
||||||
g.SetLastFederationStateReplicationError(errors.New("fake"))
|
g.SetLastFederationStateReplicationError(errors.New("fake"), true)
|
||||||
|
|
||||||
idx, err := g.runOnce(0)
|
idx, err := g.runOnce(0)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
@ -297,6 +377,7 @@ func TestGatewayLocator(t *testing.T) {
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("secondary - with data and fallback - repl ok then failed 3 times then ok again", func(t *testing.T) {
|
t.Run("secondary - with data and fallback - repl ok then failed 3 times then ok again", func(t *testing.T) {
|
||||||
|
// Only run for the leader.
|
||||||
logger := testutil.Logger(t)
|
logger := testutil.Logger(t)
|
||||||
tsd := &testServerDelegate{State: state, isLeader: true}
|
tsd := &testServerDelegate{State: state, isLeader: true}
|
||||||
g := NewGatewayLocator(
|
g := NewGatewayLocator(
|
||||||
|
@ -305,17 +386,18 @@ func TestGatewayLocator(t *testing.T) {
|
||||||
"dc2",
|
"dc2",
|
||||||
"dc1",
|
"dc1",
|
||||||
)
|
)
|
||||||
|
g.SetUseReplicationSignal(true)
|
||||||
|
|
||||||
g.RefreshPrimaryGatewayFallbackAddresses([]string{
|
g.RefreshPrimaryGatewayFallbackAddresses([]string{
|
||||||
"7.7.7.7:7777",
|
"7.7.7.7:7777",
|
||||||
"8.8.8.8:8888",
|
"8.8.8.8:8888",
|
||||||
})
|
})
|
||||||
|
|
||||||
g.SetLastFederationStateReplicationError(nil)
|
g.SetLastFederationStateReplicationError(nil, true)
|
||||||
g.SetLastFederationStateReplicationError(errors.New("fake"))
|
g.SetLastFederationStateReplicationError(errors.New("fake"), true)
|
||||||
g.SetLastFederationStateReplicationError(errors.New("fake"))
|
g.SetLastFederationStateReplicationError(errors.New("fake"), true)
|
||||||
g.SetLastFederationStateReplicationError(errors.New("fake"))
|
g.SetLastFederationStateReplicationError(errors.New("fake"), true)
|
||||||
g.SetLastFederationStateReplicationError(nil)
|
g.SetLastFederationStateReplicationError(nil, true)
|
||||||
|
|
||||||
idx, err := g.runOnce(0)
|
idx, err := g.runOnce(0)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
|
@ -978,12 +978,22 @@ func (s *Server) startFederationStateReplication() {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if s.gatewayLocator != nil {
|
||||||
|
s.gatewayLocator.SetUseReplicationSignal(true)
|
||||||
|
s.gatewayLocator.SetLastFederationStateReplicationError(nil, false)
|
||||||
|
}
|
||||||
|
|
||||||
s.leaderRoutineManager.Start(federationStateReplicationRoutineName, s.federationStateReplicator.Run)
|
s.leaderRoutineManager.Start(federationStateReplicationRoutineName, s.federationStateReplicator.Run)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Server) stopFederationStateReplication() {
|
func (s *Server) stopFederationStateReplication() {
|
||||||
// will be a no-op when not started
|
// will be a no-op when not started
|
||||||
s.leaderRoutineManager.Stop(federationStateReplicationRoutineName)
|
s.leaderRoutineManager.Stop(federationStateReplicationRoutineName)
|
||||||
|
|
||||||
|
if s.gatewayLocator != nil {
|
||||||
|
s.gatewayLocator.SetUseReplicationSignal(false)
|
||||||
|
s.gatewayLocator.SetLastFederationStateReplicationError(nil, false)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// getOrCreateAutopilotConfig is used to get the autopilot config, initializing it if necessary
|
// getOrCreateAutopilotConfig is used to get the autopilot config, initializing it if necessary
|
||||||
|
|
Loading…
Reference in New Issue