mirror of https://github.com/status-im/consul.git
server: initialize mgw-wanfed to use local gateways more on startup (#9528)
Fixes #9342
This commit is contained in:
parent
17e16f708f
commit
685c38a1b1
|
@ -0,0 +1,3 @@
|
||||||
|
```release-note:bug
|
||||||
|
server: When wan federating via mesh gateways after initial federation default to using the local mesh gateways unless the heuristic indicates a bypass is required.
|
||||||
|
```
|
|
@ -4449,6 +4449,7 @@ func TestAgent_JoinWAN_viaMeshGateway(t *testing.T) {
|
||||||
}
|
}
|
||||||
# wanfed
|
# wanfed
|
||||||
primary_gateways = ["` + gwAddr + `"]
|
primary_gateways = ["` + gwAddr + `"]
|
||||||
|
retry_interval_wan = "250ms"
|
||||||
connect {
|
connect {
|
||||||
enabled = true
|
enabled = true
|
||||||
enable_mesh_gateway_wan_federation = true
|
enable_mesh_gateway_wan_federation = true
|
||||||
|
@ -4474,6 +4475,7 @@ func TestAgent_JoinWAN_viaMeshGateway(t *testing.T) {
|
||||||
}
|
}
|
||||||
# wanfed
|
# wanfed
|
||||||
primary_gateways = ["` + gwAddr + `"]
|
primary_gateways = ["` + gwAddr + `"]
|
||||||
|
retry_interval_wan = "250ms"
|
||||||
connect {
|
connect {
|
||||||
enabled = true
|
enabled = true
|
||||||
enable_mesh_gateway_wan_federation = true
|
enable_mesh_gateway_wan_federation = true
|
||||||
|
|
|
@ -46,6 +46,7 @@ type GatewayLocator struct {
|
||||||
gatewaysLock sync.Mutex
|
gatewaysLock sync.Mutex
|
||||||
primaryGateways []string // WAN addrs
|
primaryGateways []string // WAN addrs
|
||||||
localGateways []string // LAN addrs
|
localGateways []string // LAN addrs
|
||||||
|
populatedGateways bool
|
||||||
|
|
||||||
// primaryMeshGatewayDiscoveredAddresses is the current fallback addresses
|
// primaryMeshGatewayDiscoveredAddresses is the current fallback addresses
|
||||||
// for the mesh gateways in the primary datacenter.
|
// for the mesh gateways in the primary datacenter.
|
||||||
|
@ -205,6 +206,10 @@ func (g *GatewayLocator) listGateways(primary bool) []string {
|
||||||
g.gatewaysLock.Lock()
|
g.gatewaysLock.Lock()
|
||||||
defer g.gatewaysLock.Unlock()
|
defer g.gatewaysLock.Unlock()
|
||||||
|
|
||||||
|
if !g.populatedGateways {
|
||||||
|
return nil // don't even do anything yet
|
||||||
|
}
|
||||||
|
|
||||||
var addrs []string
|
var addrs []string
|
||||||
if primary {
|
if primary {
|
||||||
if g.datacenter == g.primaryDatacenter {
|
if g.datacenter == g.primaryDatacenter {
|
||||||
|
@ -267,6 +272,7 @@ type serverDelegate interface {
|
||||||
blockingQuery(queryOpts structs.QueryOptionsCompat, queryMeta structs.QueryMetaCompat, fn queryFn) error
|
blockingQuery(queryOpts structs.QueryOptionsCompat, queryMeta structs.QueryMetaCompat, fn queryFn) error
|
||||||
IsLeader() bool
|
IsLeader() bool
|
||||||
LeaderLastContact() time.Time
|
LeaderLastContact() time.Time
|
||||||
|
setDatacenterSupportsFederationStates()
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewGatewayLocator(
|
func NewGatewayLocator(
|
||||||
|
@ -283,6 +289,8 @@ func NewGatewayLocator(
|
||||||
primaryGatewaysReadyCh: make(chan struct{}),
|
primaryGatewaysReadyCh: make(chan struct{}),
|
||||||
}
|
}
|
||||||
g.logPrimaryDialingMessage(g.DialPrimaryThroughLocalGateway())
|
g.logPrimaryDialingMessage(g.DialPrimaryThroughLocalGateway())
|
||||||
|
// initialize
|
||||||
|
g.SetLastFederationStateReplicationError(nil, false)
|
||||||
return g
|
return g
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -292,7 +300,10 @@ func (g *GatewayLocator) Run(ctx context.Context) {
|
||||||
var lastFetchIndex uint64
|
var lastFetchIndex uint64
|
||||||
retryLoopBackoff(ctx, func() error {
|
retryLoopBackoff(ctx, func() error {
|
||||||
idx, err := g.runOnce(lastFetchIndex)
|
idx, err := g.runOnce(lastFetchIndex)
|
||||||
if err != nil {
|
if errors.Is(err, errGatewayLocalStateNotInitialized) {
|
||||||
|
// don't do exponential backoff for something that's not broken
|
||||||
|
return nil
|
||||||
|
} else if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -300,9 +311,7 @@ func (g *GatewayLocator) Run(ctx context.Context) {
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}, func(err error) {
|
}, func(err error) {
|
||||||
if !errors.Is(err, errGatewayLocalStateNotInitialized) {
|
|
||||||
g.logger.Error("error tracking primary and local mesh gateways", "error", err)
|
g.logger.Error("error tracking primary and local mesh gateways", "error", err)
|
||||||
}
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -367,6 +376,10 @@ func (g *GatewayLocator) checkLocalStateIsReady() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *GatewayLocator) updateFromState(results []*structs.FederationState) {
|
func (g *GatewayLocator) updateFromState(results []*structs.FederationState) {
|
||||||
|
if len(results) > 0 {
|
||||||
|
g.srv.setDatacenterSupportsFederationStates()
|
||||||
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
local structs.CheckServiceNodes
|
local structs.CheckServiceNodes
|
||||||
primary structs.CheckServiceNodes
|
primary structs.CheckServiceNodes
|
||||||
|
@ -388,6 +401,8 @@ func (g *GatewayLocator) updateFromState(results []*structs.FederationState) {
|
||||||
g.gatewaysLock.Lock()
|
g.gatewaysLock.Lock()
|
||||||
defer g.gatewaysLock.Unlock()
|
defer g.gatewaysLock.Unlock()
|
||||||
|
|
||||||
|
g.populatedGateways = true
|
||||||
|
|
||||||
changed := false
|
changed := false
|
||||||
primaryReady := false
|
primaryReady := false
|
||||||
if !stringslice.Equal(g.primaryGateways, primaryAddrs) {
|
if !stringslice.Equal(g.primaryGateways, primaryAddrs) {
|
||||||
|
|
|
@ -2,6 +2,7 @@ package consul
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
|
"sync/atomic"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -64,13 +65,25 @@ func TestGatewayLocator(t *testing.T) {
|
||||||
)
|
)
|
||||||
g.SetUseReplicationSignal(isLeader)
|
g.SetUseReplicationSignal(isLeader)
|
||||||
|
|
||||||
|
t.Run("before first run", func(t *testing.T) {
|
||||||
|
assert.False(t, g.DialPrimaryThroughLocalGateway()) // not important
|
||||||
|
assert.Len(t, tsd.Calls, 0)
|
||||||
|
assert.Equal(t, []string(nil), g.listGateways(false))
|
||||||
|
assert.Equal(t, []string(nil), g.listGateways(true))
|
||||||
|
assert.False(t, tsd.datacenterSupportsFederationStates())
|
||||||
|
})
|
||||||
|
|
||||||
idx, err := g.runOnce(0)
|
idx, err := g.runOnce(0)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
assert.False(t, g.DialPrimaryThroughLocalGateway())
|
|
||||||
assert.Equal(t, uint64(1), idx)
|
assert.Equal(t, uint64(1), idx)
|
||||||
|
|
||||||
|
t.Run("after first run", func(t *testing.T) {
|
||||||
|
assert.False(t, g.DialPrimaryThroughLocalGateway()) // not important
|
||||||
assert.Len(t, tsd.Calls, 1)
|
assert.Len(t, tsd.Calls, 1)
|
||||||
assert.Equal(t, []string(nil), g.listGateways(false))
|
assert.Equal(t, []string(nil), g.listGateways(false))
|
||||||
assert.Equal(t, []string(nil), g.listGateways(true))
|
assert.Equal(t, []string(nil), g.listGateways(true))
|
||||||
|
assert.False(t, tsd.datacenterSupportsFederationStates()) // no results, so we don't flip the bit yet
|
||||||
|
})
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
@ -93,17 +106,25 @@ func TestGatewayLocator(t *testing.T) {
|
||||||
)
|
)
|
||||||
g.SetUseReplicationSignal(isLeader)
|
g.SetUseReplicationSignal(isLeader)
|
||||||
|
|
||||||
|
t.Run("before first run", func(t *testing.T) {
|
||||||
|
assert.True(t, g.DialPrimaryThroughLocalGateway()) // defaults to sure!
|
||||||
|
assert.Len(t, tsd.Calls, 0)
|
||||||
|
assert.Equal(t, []string(nil), g.listGateways(false))
|
||||||
|
assert.Equal(t, []string(nil), g.listGateways(true))
|
||||||
|
assert.False(t, tsd.datacenterSupportsFederationStates())
|
||||||
|
})
|
||||||
|
|
||||||
idx, err := g.runOnce(0)
|
idx, err := g.runOnce(0)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
if isLeader {
|
|
||||||
assert.False(t, g.DialPrimaryThroughLocalGateway())
|
|
||||||
} else {
|
|
||||||
assert.True(t, g.DialPrimaryThroughLocalGateway())
|
|
||||||
}
|
|
||||||
assert.Equal(t, uint64(1), idx)
|
assert.Equal(t, uint64(1), idx)
|
||||||
|
|
||||||
|
t.Run("after first run", func(t *testing.T) {
|
||||||
|
assert.True(t, g.DialPrimaryThroughLocalGateway()) // defaults to sure!
|
||||||
assert.Len(t, tsd.Calls, 1)
|
assert.Len(t, tsd.Calls, 1)
|
||||||
assert.Equal(t, []string(nil), g.listGateways(false))
|
assert.Equal(t, []string(nil), g.listGateways(false))
|
||||||
assert.Equal(t, []string(nil), g.listGateways(true))
|
assert.Equal(t, []string(nil), g.listGateways(true))
|
||||||
|
assert.False(t, tsd.datacenterSupportsFederationStates()) // no results, so we don't flip the bit yet
|
||||||
|
})
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
@ -130,20 +151,28 @@ func TestGatewayLocator(t *testing.T) {
|
||||||
"8.8.8.8:8888",
|
"8.8.8.8:8888",
|
||||||
})
|
})
|
||||||
|
|
||||||
|
t.Run("before first run", func(t *testing.T) {
|
||||||
|
assert.True(t, g.DialPrimaryThroughLocalGateway()) // defaults to sure!
|
||||||
|
assert.Len(t, tsd.Calls, 0)
|
||||||
|
assert.Equal(t, []string(nil), g.listGateways(false))
|
||||||
|
assert.Equal(t, []string(nil), g.listGateways(true)) // don't return any data until we initialize
|
||||||
|
assert.False(t, tsd.datacenterSupportsFederationStates())
|
||||||
|
})
|
||||||
|
|
||||||
idx, err := g.runOnce(0)
|
idx, err := g.runOnce(0)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
if isLeader {
|
|
||||||
assert.False(t, g.DialPrimaryThroughLocalGateway())
|
|
||||||
} else {
|
|
||||||
assert.True(t, g.DialPrimaryThroughLocalGateway())
|
|
||||||
}
|
|
||||||
assert.Equal(t, uint64(1), idx)
|
assert.Equal(t, uint64(1), idx)
|
||||||
|
|
||||||
|
t.Run("after first run", func(t *testing.T) {
|
||||||
|
assert.True(t, g.DialPrimaryThroughLocalGateway()) // defaults to sure!
|
||||||
assert.Len(t, tsd.Calls, 1)
|
assert.Len(t, tsd.Calls, 1)
|
||||||
assert.Equal(t, []string(nil), g.listGateways(false))
|
assert.Equal(t, []string(nil), g.listGateways(false))
|
||||||
assert.Equal(t, []string{
|
assert.Equal(t, []string{
|
||||||
"7.7.7.7:7777",
|
"7.7.7.7:7777",
|
||||||
"8.8.8.8:8888",
|
"8.8.8.8:8888",
|
||||||
}, g.listGateways(true))
|
}, g.listGateways(true))
|
||||||
|
assert.False(t, tsd.datacenterSupportsFederationStates()) // no results, so we don't flip the bit yet
|
||||||
|
})
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
@ -170,10 +199,20 @@ func TestGatewayLocator(t *testing.T) {
|
||||||
)
|
)
|
||||||
g.SetUseReplicationSignal(isLeader)
|
g.SetUseReplicationSignal(isLeader)
|
||||||
|
|
||||||
|
t.Run("before first run", func(t *testing.T) {
|
||||||
|
assert.False(t, g.DialPrimaryThroughLocalGateway()) // not important
|
||||||
|
assert.Len(t, tsd.Calls, 0)
|
||||||
|
assert.Equal(t, []string(nil), g.listGateways(false))
|
||||||
|
assert.Equal(t, []string(nil), g.listGateways(true)) // don't return any data until we initialize
|
||||||
|
assert.False(t, tsd.datacenterSupportsFederationStates())
|
||||||
|
})
|
||||||
|
|
||||||
idx, err := g.runOnce(0)
|
idx, err := g.runOnce(0)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
assert.False(t, g.DialPrimaryThroughLocalGateway())
|
|
||||||
assert.Equal(t, uint64(2), idx)
|
assert.Equal(t, uint64(2), idx)
|
||||||
|
|
||||||
|
t.Run("after first run", func(t *testing.T) {
|
||||||
|
assert.False(t, g.DialPrimaryThroughLocalGateway()) // not important
|
||||||
assert.Len(t, tsd.Calls, 1)
|
assert.Len(t, tsd.Calls, 1)
|
||||||
assert.Equal(t, []string{
|
assert.Equal(t, []string{
|
||||||
"1.2.3.4:5555",
|
"1.2.3.4:5555",
|
||||||
|
@ -183,6 +222,8 @@ func TestGatewayLocator(t *testing.T) {
|
||||||
"1.2.3.4:5555",
|
"1.2.3.4:5555",
|
||||||
"4.3.2.1:9999",
|
"4.3.2.1:9999",
|
||||||
}, g.listGateways(true))
|
}, g.listGateways(true))
|
||||||
|
assert.True(t, tsd.datacenterSupportsFederationStates()) // have results, so we flip the bit
|
||||||
|
})
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
@ -205,66 +246,34 @@ func TestGatewayLocator(t *testing.T) {
|
||||||
)
|
)
|
||||||
g.SetUseReplicationSignal(isLeader)
|
g.SetUseReplicationSignal(isLeader)
|
||||||
|
|
||||||
idx, err := g.runOnce(0)
|
t.Run("before first run", func(t *testing.T) {
|
||||||
require.NoError(t, err)
|
assert.True(t, g.DialPrimaryThroughLocalGateway()) // defaults to sure!
|
||||||
if isLeader {
|
assert.Len(t, tsd.Calls, 0)
|
||||||
assert.False(t, g.DialPrimaryThroughLocalGateway())
|
assert.Equal(t, []string(nil), g.listGateways(false))
|
||||||
} else {
|
assert.Equal(t, []string(nil), g.listGateways(true)) // don't return any data until we initialize
|
||||||
assert.True(t, g.DialPrimaryThroughLocalGateway())
|
assert.False(t, tsd.datacenterSupportsFederationStates())
|
||||||
}
|
|
||||||
assert.Equal(t, uint64(2), idx)
|
|
||||||
assert.Len(t, tsd.Calls, 1)
|
|
||||||
assert.Equal(t, []string{
|
|
||||||
"5.6.7.8:5555",
|
|
||||||
"8.7.6.5:9999",
|
|
||||||
}, g.listGateways(false))
|
|
||||||
if isLeader {
|
|
||||||
assert.Equal(t, []string{
|
|
||||||
"1.2.3.4:5555",
|
|
||||||
"4.3.2.1:9999",
|
|
||||||
}, g.listGateways(true))
|
|
||||||
} else {
|
|
||||||
assert.Equal(t, []string{
|
|
||||||
"5.6.7.8:5555",
|
|
||||||
"8.7.6.5:9999",
|
|
||||||
}, g.listGateways(true))
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
t.Run("secondary - with data and fallback - no repl", func(t *testing.T) {
|
|
||||||
// Only run for the leader.
|
|
||||||
logger := testutil.Logger(t)
|
|
||||||
tsd := &testServerDelegate{State: state, isLeader: true}
|
|
||||||
g := NewGatewayLocator(
|
|
||||||
logger,
|
|
||||||
tsd,
|
|
||||||
"dc2",
|
|
||||||
"dc1",
|
|
||||||
)
|
|
||||||
g.SetUseReplicationSignal(true)
|
|
||||||
|
|
||||||
g.RefreshPrimaryGatewayFallbackAddresses([]string{
|
|
||||||
"7.7.7.7:7777",
|
|
||||||
"8.8.8.8:8888",
|
|
||||||
})
|
})
|
||||||
|
|
||||||
idx, err := g.runOnce(0)
|
idx, err := g.runOnce(0)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
assert.False(t, g.DialPrimaryThroughLocalGateway())
|
|
||||||
assert.Equal(t, uint64(2), idx)
|
assert.Equal(t, uint64(2), idx)
|
||||||
|
|
||||||
|
t.Run("after first run", func(t *testing.T) {
|
||||||
|
assert.True(t, g.DialPrimaryThroughLocalGateway()) // defaults to sure!
|
||||||
assert.Len(t, tsd.Calls, 1)
|
assert.Len(t, tsd.Calls, 1)
|
||||||
assert.Equal(t, []string{
|
assert.Equal(t, []string{
|
||||||
"5.6.7.8:5555",
|
"5.6.7.8:5555",
|
||||||
"8.7.6.5:9999",
|
"8.7.6.5:9999",
|
||||||
}, g.listGateways(false))
|
}, g.listGateways(false))
|
||||||
assert.Equal(t, []string{
|
assert.Equal(t, []string{
|
||||||
"1.2.3.4:5555",
|
"5.6.7.8:5555",
|
||||||
"4.3.2.1:9999",
|
"8.7.6.5:9999",
|
||||||
"7.7.7.7:7777",
|
|
||||||
"8.8.8.8:8888",
|
|
||||||
}, g.listGateways(true))
|
}, g.listGateways(true))
|
||||||
|
assert.True(t, tsd.datacenterSupportsFederationStates()) // have results, so we flip the bit
|
||||||
|
})
|
||||||
|
|
||||||
|
})
|
||||||
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("secondary - with data and fallback - repl ok", func(t *testing.T) {
|
t.Run("secondary - with data and fallback - repl ok", func(t *testing.T) {
|
||||||
|
@ -286,10 +295,20 @@ func TestGatewayLocator(t *testing.T) {
|
||||||
|
|
||||||
g.SetLastFederationStateReplicationError(nil, true)
|
g.SetLastFederationStateReplicationError(nil, true)
|
||||||
|
|
||||||
|
t.Run("before first run", func(t *testing.T) {
|
||||||
|
assert.True(t, g.DialPrimaryThroughLocalGateway()) // defaults to sure!
|
||||||
|
assert.Len(t, tsd.Calls, 0)
|
||||||
|
assert.Equal(t, []string(nil), g.listGateways(false))
|
||||||
|
assert.Equal(t, []string(nil), g.listGateways(true)) // don't return any data until we initialize
|
||||||
|
assert.False(t, tsd.datacenterSupportsFederationStates())
|
||||||
|
})
|
||||||
|
|
||||||
idx, err := g.runOnce(0)
|
idx, err := g.runOnce(0)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
assert.True(t, g.DialPrimaryThroughLocalGateway())
|
|
||||||
assert.Equal(t, uint64(2), idx)
|
assert.Equal(t, uint64(2), idx)
|
||||||
|
|
||||||
|
t.Run("after first run", func(t *testing.T) {
|
||||||
|
assert.True(t, g.DialPrimaryThroughLocalGateway())
|
||||||
assert.Len(t, tsd.Calls, 1)
|
assert.Len(t, tsd.Calls, 1)
|
||||||
assert.Equal(t, []string{
|
assert.Equal(t, []string{
|
||||||
"5.6.7.8:5555",
|
"5.6.7.8:5555",
|
||||||
|
@ -299,6 +318,8 @@ func TestGatewayLocator(t *testing.T) {
|
||||||
"5.6.7.8:5555",
|
"5.6.7.8:5555",
|
||||||
"8.7.6.5:9999",
|
"8.7.6.5:9999",
|
||||||
}, g.listGateways(true))
|
}, g.listGateways(true))
|
||||||
|
assert.True(t, tsd.datacenterSupportsFederationStates()) // have results, so we flip the bit
|
||||||
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("secondary - with data and fallback - repl ok then failed 2 times", func(t *testing.T) {
|
t.Run("secondary - with data and fallback - repl ok then failed 2 times", func(t *testing.T) {
|
||||||
|
@ -322,10 +343,20 @@ func TestGatewayLocator(t *testing.T) {
|
||||||
g.SetLastFederationStateReplicationError(errors.New("fake"), true)
|
g.SetLastFederationStateReplicationError(errors.New("fake"), true)
|
||||||
g.SetLastFederationStateReplicationError(errors.New("fake"), true)
|
g.SetLastFederationStateReplicationError(errors.New("fake"), true)
|
||||||
|
|
||||||
|
t.Run("before first run", func(t *testing.T) {
|
||||||
|
assert.True(t, g.DialPrimaryThroughLocalGateway()) // defaults to sure!
|
||||||
|
assert.Len(t, tsd.Calls, 0)
|
||||||
|
assert.Equal(t, []string(nil), g.listGateways(false))
|
||||||
|
assert.Equal(t, []string(nil), g.listGateways(true)) // don't return any data until we initialize
|
||||||
|
assert.False(t, tsd.datacenterSupportsFederationStates())
|
||||||
|
})
|
||||||
|
|
||||||
idx, err := g.runOnce(0)
|
idx, err := g.runOnce(0)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
assert.True(t, g.DialPrimaryThroughLocalGateway())
|
|
||||||
assert.Equal(t, uint64(2), idx)
|
assert.Equal(t, uint64(2), idx)
|
||||||
|
|
||||||
|
t.Run("after first run", func(t *testing.T) {
|
||||||
|
assert.True(t, g.DialPrimaryThroughLocalGateway())
|
||||||
assert.Len(t, tsd.Calls, 1)
|
assert.Len(t, tsd.Calls, 1)
|
||||||
assert.Equal(t, []string{
|
assert.Equal(t, []string{
|
||||||
"5.6.7.8:5555",
|
"5.6.7.8:5555",
|
||||||
|
@ -335,6 +366,8 @@ func TestGatewayLocator(t *testing.T) {
|
||||||
"5.6.7.8:5555",
|
"5.6.7.8:5555",
|
||||||
"8.7.6.5:9999",
|
"8.7.6.5:9999",
|
||||||
}, g.listGateways(true))
|
}, g.listGateways(true))
|
||||||
|
assert.True(t, tsd.datacenterSupportsFederationStates()) // have results, so we flip the bit
|
||||||
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("secondary - with data and fallback - repl ok then failed 3 times", func(t *testing.T) {
|
t.Run("secondary - with data and fallback - repl ok then failed 3 times", func(t *testing.T) {
|
||||||
|
@ -359,10 +392,20 @@ func TestGatewayLocator(t *testing.T) {
|
||||||
g.SetLastFederationStateReplicationError(errors.New("fake"), true)
|
g.SetLastFederationStateReplicationError(errors.New("fake"), true)
|
||||||
g.SetLastFederationStateReplicationError(errors.New("fake"), true)
|
g.SetLastFederationStateReplicationError(errors.New("fake"), true)
|
||||||
|
|
||||||
|
t.Run("before first run", func(t *testing.T) {
|
||||||
|
assert.False(t, g.DialPrimaryThroughLocalGateway()) // too many errors
|
||||||
|
assert.Len(t, tsd.Calls, 0)
|
||||||
|
assert.Equal(t, []string(nil), g.listGateways(false))
|
||||||
|
assert.Equal(t, []string(nil), g.listGateways(true)) // don't return any data until we initialize
|
||||||
|
assert.False(t, tsd.datacenterSupportsFederationStates())
|
||||||
|
})
|
||||||
|
|
||||||
idx, err := g.runOnce(0)
|
idx, err := g.runOnce(0)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
assert.False(t, g.DialPrimaryThroughLocalGateway())
|
|
||||||
assert.Equal(t, uint64(2), idx)
|
assert.Equal(t, uint64(2), idx)
|
||||||
|
|
||||||
|
t.Run("after first run", func(t *testing.T) {
|
||||||
|
assert.False(t, g.DialPrimaryThroughLocalGateway())
|
||||||
assert.Len(t, tsd.Calls, 1)
|
assert.Len(t, tsd.Calls, 1)
|
||||||
assert.Equal(t, []string{
|
assert.Equal(t, []string{
|
||||||
"5.6.7.8:5555",
|
"5.6.7.8:5555",
|
||||||
|
@ -374,6 +417,8 @@ func TestGatewayLocator(t *testing.T) {
|
||||||
"7.7.7.7:7777",
|
"7.7.7.7:7777",
|
||||||
"8.8.8.8:8888",
|
"8.8.8.8:8888",
|
||||||
}, g.listGateways(true))
|
}, g.listGateways(true))
|
||||||
|
assert.True(t, tsd.datacenterSupportsFederationStates()) // have results, so we flip the bit
|
||||||
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("secondary - with data and fallback - repl ok then failed 3 times then ok again", func(t *testing.T) {
|
t.Run("secondary - with data and fallback - repl ok then failed 3 times then ok again", func(t *testing.T) {
|
||||||
|
@ -399,10 +444,20 @@ func TestGatewayLocator(t *testing.T) {
|
||||||
g.SetLastFederationStateReplicationError(errors.New("fake"), true)
|
g.SetLastFederationStateReplicationError(errors.New("fake"), true)
|
||||||
g.SetLastFederationStateReplicationError(nil, true)
|
g.SetLastFederationStateReplicationError(nil, true)
|
||||||
|
|
||||||
|
t.Run("before first run", func(t *testing.T) {
|
||||||
|
assert.True(t, g.DialPrimaryThroughLocalGateway()) // all better again
|
||||||
|
assert.Len(t, tsd.Calls, 0)
|
||||||
|
assert.Equal(t, []string(nil), g.listGateways(false))
|
||||||
|
assert.Equal(t, []string(nil), g.listGateways(true)) // don't return any data until we initialize
|
||||||
|
assert.False(t, tsd.datacenterSupportsFederationStates())
|
||||||
|
})
|
||||||
|
|
||||||
idx, err := g.runOnce(0)
|
idx, err := g.runOnce(0)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
assert.True(t, g.DialPrimaryThroughLocalGateway())
|
|
||||||
assert.Equal(t, uint64(2), idx)
|
assert.Equal(t, uint64(2), idx)
|
||||||
|
|
||||||
|
t.Run("after first run", func(t *testing.T) {
|
||||||
|
assert.True(t, g.DialPrimaryThroughLocalGateway()) // all better again
|
||||||
assert.Len(t, tsd.Calls, 1)
|
assert.Len(t, tsd.Calls, 1)
|
||||||
assert.Equal(t, []string{
|
assert.Equal(t, []string{
|
||||||
"5.6.7.8:5555",
|
"5.6.7.8:5555",
|
||||||
|
@ -412,10 +467,14 @@ func TestGatewayLocator(t *testing.T) {
|
||||||
"5.6.7.8:5555",
|
"5.6.7.8:5555",
|
||||||
"8.7.6.5:9999",
|
"8.7.6.5:9999",
|
||||||
}, g.listGateways(true))
|
}, g.listGateways(true))
|
||||||
|
assert.True(t, tsd.datacenterSupportsFederationStates()) // have results, so we flip the bit
|
||||||
|
})
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
type testServerDelegate struct {
|
type testServerDelegate struct {
|
||||||
|
dcSupportsFederationStates int32 // atomically accessed, at start to prevent alignment issues
|
||||||
|
|
||||||
State *state.Store
|
State *state.Store
|
||||||
|
|
||||||
Calls []uint64
|
Calls []uint64
|
||||||
|
@ -424,6 +483,14 @@ type testServerDelegate struct {
|
||||||
lastContact time.Time
|
lastContact time.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (d *testServerDelegate) setDatacenterSupportsFederationStates() {
|
||||||
|
atomic.StoreInt32(&d.dcSupportsFederationStates, 1)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *testServerDelegate) datacenterSupportsFederationStates() bool {
|
||||||
|
return atomic.LoadInt32(&d.dcSupportsFederationStates) != 0
|
||||||
|
}
|
||||||
|
|
||||||
// This is just enough to exercise the logic.
|
// This is just enough to exercise the logic.
|
||||||
func (d *testServerDelegate) blockingQuery(
|
func (d *testServerDelegate) blockingQuery(
|
||||||
queryOpts structs.QueryOptionsCompat,
|
queryOpts structs.QueryOptionsCompat,
|
||||||
|
|
Loading…
Reference in New Issue