agent: handle re-bootstrapping in a secondary datacenter when WAN federation via mesh gateways is configured (#7931)

The main fix here is to always union the `primary-gateways` list with
the list of mesh gateways in the primary returned from the replicated
federation states list. This will allow any replicated (incorrect) state
to be supplemented with user-configured (correct) state in the config
file. Eventually the game of random selection whack-a-mole will pick a
winning entry and re-replicate the latest federation states from the
primary. If the user-configured state is actually the incorrect one,
then the same eventual correct selection process will work in that case,
too.

The secondary fix is actually to finish making wanfed-via-mgws actually
work as originally designed. Once a secondary datacenter has replicated
federation states for the primary AND managed to stand up its own local
mesh gateways then all of the RPCs from a secondary to the primary
SHOULD go through two sets of mesh gateways to arrive in the consul
servers in the primary (one hop for the secondary datacenter's mesh
gateway, and one hop through the primary datacenter's mesh gateway).
This was neglected in the initial implementation. While everything
works, ideally we should treat communications that go around the mesh
gateways as just provided for bootstrapping purposes.

Now we heuristically use the success/failure history of the federation
state replicator goroutine loop to determine if our current mesh gateway
route is working as intended. If it is, we try using the local gateways,
and if those don't work we fall back on trying the primary via the union
of the replicated state and the go-discover configuration flags.

This can be improved slightly in the future by possibly initializing the
gateway choice to local on startup if we already have replicated state.
This PR does not address that improvement.

Fixes #7339
This commit is contained in:
R.B. Boyer 2020-05-27 11:31:10 -05:00 committed by GitHub
parent 37cafc3f52
commit ddd0a13e27
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 449 additions and 36 deletions

View File

@ -10,7 +10,8 @@ import (
)
type FederationStateReplicator struct {
srv *Server
srv *Server
gatewayLocator *GatewayLocator
}
var _ IndexReplicatorDelegate = (*FederationStateReplicator)(nil)
@ -26,6 +27,14 @@ func (r *FederationStateReplicator) MetricName() string { return "federation-sta
// FetchRemote implements IndexReplicatorDelegate.
func (r *FederationStateReplicator) FetchRemote(lastRemoteIndex uint64) (int, interface{}, uint64, error) {
lenRemote, remote, remoteIndex, err := r.fetchRemote(lastRemoteIndex)
if r.gatewayLocator != nil {
r.gatewayLocator.SetLastFederationStateReplicationError(err)
}
return lenRemote, remote, remoteIndex, err
}
func (r *FederationStateReplicator) fetchRemote(lastRemoteIndex uint64) (int, interface{}, uint64, error) {
req := structs.DCSpecificRequest{
Datacenter: r.srv.config.PrimaryDatacenter,
QueryOptions: structs.QueryOptions{

View File

@ -54,6 +54,104 @@ type GatewayLocator struct {
// This will be closed the FIRST time we get some gateways populated
primaryGatewaysReadyCh chan struct{}
primaryGatewaysReadyOnce sync.Once
// these are a collection of measurements that factor into deciding if we
// should directly dial the primary's mesh gateways or if we should try to
// route through our local gateways (if they are up).
lastReplLock sync.Mutex
lastReplSuccess time.Time
lastReplFailure time.Time
lastReplSuccesses uint64
lastReplFailures uint64
}
// SetLastFederationStateReplicationError is used to indicate if the federation
// state replication loop has succeeded (nil) or failed during the last
// execution.
//
// Rather than introduce a completely new mechanism to periodically probe that
// our chosen mesh-gateway configuration can reach the primary's servers (like
// a ping or status RPC) we cheat and use the federation state replicator
// goroutine's success or failure as a proxy.
func (g *GatewayLocator) SetLastFederationStateReplicationError(err error) {
g.lastReplLock.Lock()
defer g.lastReplLock.Unlock()
oldChoice := g.dialPrimaryThroughLocalGateway()
if err == nil {
g.lastReplSuccess = time.Now().UTC()
g.lastReplSuccesses++
g.lastReplFailures = 0
} else {
g.lastReplFailure = time.Now().UTC()
g.lastReplFailures++
g.lastReplSuccesses = 0
}
newChoice := g.dialPrimaryThroughLocalGateway()
if oldChoice != newChoice {
g.logPrimaryDialingMessage(newChoice)
}
}
func (g *GatewayLocator) logPrimaryDialingMessage(useLocal bool) {
if g.datacenter == g.primaryDatacenter {
// These messages are useless when the server is in the primary
// datacenter.
return
}
if useLocal {
g.logger.Info("will dial the primary datacenter using our local mesh gateways if possible")
} else {
g.logger.Info("will dial the primary datacenter through its mesh gateways")
}
}
// DialPrimaryThroughLocalGateway determines if we should dial the primary's
// mesh gateways directly or use our local mesh gateways (if they are up).
//
// Generally the system has three states:
//
// 1. Servers dial primary MGWs using fallback addresses from the agent config.
// 2. Servers dial primary MGWs using replicated federation state data.
// 3. Servers dial primary MGWs indirectly through local MGWs.
//
// After initial bootstrapping most communication should go through (3). If the
// local mesh gateways are not coming up for chicken/egg problems (mostly the
// kind that arise from secondary datacenter bootstrapping) then (2) is useful
// to solve the chicken/egg problem and get back to (3). In the worst case
// where we completely lost communication with the primary AND all of their old
// mesh gateway addresses are changed then we need to go all the way back to
// square one and re-bootstrap via (1).
//
// Since both (1) and (2) are meant to be temporary we simplify things and make
// the system only consider two overall configurations: (1+2, with the
// addresses being unioned) or (3).
//
// This method returns true if in state (3) and false if in state (1+2).
func (g *GatewayLocator) DialPrimaryThroughLocalGateway() bool {
if g.datacenter == g.primaryDatacenter {
return false // not important
}
g.lastReplLock.Lock()
defer g.lastReplLock.Unlock()
return g.dialPrimaryThroughLocalGateway()
}
const localFederationStateReplicatorFailuresBeforeDialingDirectly = 3
func (g *GatewayLocator) dialPrimaryThroughLocalGateway() bool {
if g.lastReplSuccess.IsZero() && g.lastReplFailure.IsZero() {
return false // no data yet
}
if g.lastReplSuccess.After(g.lastReplFailure) {
return true // we have viable data
}
if g.lastReplFailures < localFederationStateReplicatorFailuresBeforeDialingDirectly {
return true // maybe it's just a little broken
}
return false
}
// PrimaryMeshGatewayAddressesReadyCh returns a channel that will be closed
@ -82,15 +180,22 @@ func (g *GatewayLocator) listGateways(primary bool) []string {
var addrs []string
if primary {
addrs = g.primaryGateways
if g.datacenter == g.primaryDatacenter {
addrs = g.primaryGateways
} else if g.DialPrimaryThroughLocalGateway() && len(g.localGateways) > 0 {
addrs = g.localGateways
} else {
// Note calling StringSliceMergeSorted only works because both
// inputs are pre-sorted. If for some reason one of the lists has
// *duplicates* (which shouldn't happen) it's not great but it
// won't break anything other than biasing our eventual random
// choice a little bit.
addrs = lib.StringSliceMergeSorted(g.primaryGateways, g.PrimaryGatewayFallbackAddresses())
}
} else {
addrs = g.localGateways
}
if primary && len(addrs) == 0 {
addrs = g.PrimaryGatewayFallbackAddresses()
}
return addrs
}
@ -133,7 +238,6 @@ func getRandomItem(items []string) string {
type serverDelegate interface {
blockingQuery(queryOpts structs.QueryOptionsCompat, queryMeta structs.QueryMetaCompat, fn queryFn) error
PrimaryGatewayFallbackAddresses() []string
IsLeader() bool
LeaderLastContact() time.Time
}
@ -144,13 +248,15 @@ func NewGatewayLocator(
datacenter string,
primaryDatacenter string,
) *GatewayLocator {
return &GatewayLocator{
g := &GatewayLocator{
logger: logger.Named(logging.GatewayLocator),
srv: srv,
datacenter: datacenter,
primaryDatacenter: primaryDatacenter,
primaryGatewaysReadyCh: make(chan struct{}),
}
g.logPrimaryDialingMessage(g.DialPrimaryThroughLocalGateway())
return g
}
var errGatewayLocalStateNotInitialized = errors.New("local state not initialized")

View File

@ -1,6 +1,7 @@
package consul
import (
"errors"
"testing"
"time"
@ -9,6 +10,7 @@ import (
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/sdk/testutil"
memdb "github.com/hashicorp/go-memdb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
@ -41,11 +43,7 @@ func TestGatewayLocator(t *testing.T) {
UpdatedAt: time.Now().UTC(),
}
// Insert data for the dcs
require.NoError(t, state.FederationStateSet(1, dc1))
require.NoError(t, state.FederationStateSet(2, dc2))
t.Run("primary", func(t *testing.T) {
t.Run("primary - no data", func(t *testing.T) {
logger := testutil.Logger(t)
tsd := &testServerDelegate{State: state, isLeader: true}
g := NewGatewayLocator(
@ -57,19 +55,14 @@ func TestGatewayLocator(t *testing.T) {
idx, err := g.runOnce(0)
require.NoError(t, err)
require.Equal(t, uint64(2), idx)
require.Len(t, tsd.Calls, 1)
require.Equal(t, []string{
"1.2.3.4:5555",
"4.3.2.1:9999",
}, g.listGateways(false))
require.Equal(t, []string{
"1.2.3.4:5555",
"4.3.2.1:9999",
}, g.listGateways(true))
assert.False(t, g.DialPrimaryThroughLocalGateway())
assert.Equal(t, uint64(1), idx)
assert.Len(t, tsd.Calls, 1)
assert.Equal(t, []string(nil), g.listGateways(false))
assert.Equal(t, []string(nil), g.listGateways(true))
})
t.Run("secondary", func(t *testing.T) {
t.Run("secondary - no data", func(t *testing.T) {
logger := testutil.Logger(t)
tsd := &testServerDelegate{State: state, isLeader: true}
g := NewGatewayLocator(
@ -81,24 +74,269 @@ func TestGatewayLocator(t *testing.T) {
idx, err := g.runOnce(0)
require.NoError(t, err)
require.Equal(t, uint64(2), idx)
require.Len(t, tsd.Calls, 1)
require.Equal(t, []string{
assert.False(t, g.DialPrimaryThroughLocalGateway())
assert.Equal(t, uint64(1), idx)
assert.Len(t, tsd.Calls, 1)
assert.Equal(t, []string(nil), g.listGateways(false))
assert.Equal(t, []string(nil), g.listGateways(true))
})
t.Run("secondary - just fallback", func(t *testing.T) {
logger := testutil.Logger(t)
tsd := &testServerDelegate{State: state, isLeader: true}
g := NewGatewayLocator(
logger,
tsd,
"dc2",
"dc1",
)
g.RefreshPrimaryGatewayFallbackAddresses([]string{
"7.7.7.7:7777",
"8.8.8.8:8888",
})
idx, err := g.runOnce(0)
require.NoError(t, err)
assert.False(t, g.DialPrimaryThroughLocalGateway())
assert.Equal(t, uint64(1), idx)
assert.Len(t, tsd.Calls, 1)
assert.Equal(t, []string(nil), g.listGateways(false))
assert.Equal(t, []string{
"7.7.7.7:7777",
"8.8.8.8:8888",
}, g.listGateways(true))
})
// Insert data for the dcs
require.NoError(t, state.FederationStateSet(1, dc1))
require.NoError(t, state.FederationStateSet(2, dc2))
t.Run("primary - with data", func(t *testing.T) {
logger := testutil.Logger(t)
tsd := &testServerDelegate{State: state, isLeader: true}
g := NewGatewayLocator(
logger,
tsd,
"dc1",
"dc1",
)
idx, err := g.runOnce(0)
require.NoError(t, err)
assert.False(t, g.DialPrimaryThroughLocalGateway())
assert.Equal(t, uint64(2), idx)
assert.Len(t, tsd.Calls, 1)
assert.Equal(t, []string{
"1.2.3.4:5555",
"4.3.2.1:9999",
}, g.listGateways(false))
assert.Equal(t, []string{
"1.2.3.4:5555",
"4.3.2.1:9999",
}, g.listGateways(true))
})
t.Run("secondary - with data", func(t *testing.T) {
logger := testutil.Logger(t)
tsd := &testServerDelegate{State: state, isLeader: true}
g := NewGatewayLocator(
logger,
tsd,
"dc2",
"dc1",
)
idx, err := g.runOnce(0)
require.NoError(t, err)
assert.False(t, g.DialPrimaryThroughLocalGateway())
assert.Equal(t, uint64(2), idx)
assert.Len(t, tsd.Calls, 1)
assert.Equal(t, []string{
"5.6.7.8:5555",
"8.7.6.5:9999",
}, g.listGateways(false))
require.Equal(t, []string{
assert.Equal(t, []string{
"1.2.3.4:5555",
"4.3.2.1:9999",
}, g.listGateways(true))
})
t.Run("secondary - with data and fallback - no repl", func(t *testing.T) {
logger := testutil.Logger(t)
tsd := &testServerDelegate{State: state, isLeader: true}
g := NewGatewayLocator(
logger,
tsd,
"dc2",
"dc1",
)
g.RefreshPrimaryGatewayFallbackAddresses([]string{
"7.7.7.7:7777",
"8.8.8.8:8888",
})
idx, err := g.runOnce(0)
require.NoError(t, err)
assert.False(t, g.DialPrimaryThroughLocalGateway())
assert.Equal(t, uint64(2), idx)
assert.Len(t, tsd.Calls, 1)
assert.Equal(t, []string{
"5.6.7.8:5555",
"8.7.6.5:9999",
}, g.listGateways(false))
assert.Equal(t, []string{
"1.2.3.4:5555",
"4.3.2.1:9999",
"7.7.7.7:7777",
"8.8.8.8:8888",
}, g.listGateways(true))
})
t.Run("secondary - with data and fallback - repl ok", func(t *testing.T) {
logger := testutil.Logger(t)
tsd := &testServerDelegate{State: state, isLeader: true}
g := NewGatewayLocator(
logger,
tsd,
"dc2",
"dc1",
)
g.RefreshPrimaryGatewayFallbackAddresses([]string{
"7.7.7.7:7777",
"8.8.8.8:8888",
})
g.SetLastFederationStateReplicationError(nil)
idx, err := g.runOnce(0)
require.NoError(t, err)
assert.True(t, g.DialPrimaryThroughLocalGateway())
assert.Equal(t, uint64(2), idx)
assert.Len(t, tsd.Calls, 1)
assert.Equal(t, []string{
"5.6.7.8:5555",
"8.7.6.5:9999",
}, g.listGateways(false))
assert.Equal(t, []string{
"5.6.7.8:5555",
"8.7.6.5:9999",
}, g.listGateways(true))
})
t.Run("secondary - with data and fallback - repl ok then failed 2 times", func(t *testing.T) {
logger := testutil.Logger(t)
tsd := &testServerDelegate{State: state, isLeader: true}
g := NewGatewayLocator(
logger,
tsd,
"dc2",
"dc1",
)
g.RefreshPrimaryGatewayFallbackAddresses([]string{
"7.7.7.7:7777",
"8.8.8.8:8888",
})
g.SetLastFederationStateReplicationError(nil)
g.SetLastFederationStateReplicationError(errors.New("fake"))
g.SetLastFederationStateReplicationError(errors.New("fake"))
idx, err := g.runOnce(0)
require.NoError(t, err)
assert.True(t, g.DialPrimaryThroughLocalGateway())
assert.Equal(t, uint64(2), idx)
assert.Len(t, tsd.Calls, 1)
assert.Equal(t, []string{
"5.6.7.8:5555",
"8.7.6.5:9999",
}, g.listGateways(false))
assert.Equal(t, []string{
"5.6.7.8:5555",
"8.7.6.5:9999",
}, g.listGateways(true))
})
t.Run("secondary - with data and fallback - repl ok then failed 3 times", func(t *testing.T) {
logger := testutil.Logger(t)
tsd := &testServerDelegate{State: state, isLeader: true}
g := NewGatewayLocator(
logger,
tsd,
"dc2",
"dc1",
)
g.RefreshPrimaryGatewayFallbackAddresses([]string{
"7.7.7.7:7777",
"8.8.8.8:8888",
})
g.SetLastFederationStateReplicationError(nil)
g.SetLastFederationStateReplicationError(errors.New("fake"))
g.SetLastFederationStateReplicationError(errors.New("fake"))
g.SetLastFederationStateReplicationError(errors.New("fake"))
idx, err := g.runOnce(0)
require.NoError(t, err)
assert.False(t, g.DialPrimaryThroughLocalGateway())
assert.Equal(t, uint64(2), idx)
assert.Len(t, tsd.Calls, 1)
assert.Equal(t, []string{
"5.6.7.8:5555",
"8.7.6.5:9999",
}, g.listGateways(false))
assert.Equal(t, []string{
"1.2.3.4:5555",
"4.3.2.1:9999",
"7.7.7.7:7777",
"8.8.8.8:8888",
}, g.listGateways(true))
})
t.Run("secondary - with data and fallback - repl ok then failed 3 times then ok again", func(t *testing.T) {
logger := testutil.Logger(t)
tsd := &testServerDelegate{State: state, isLeader: true}
g := NewGatewayLocator(
logger,
tsd,
"dc2",
"dc1",
)
g.RefreshPrimaryGatewayFallbackAddresses([]string{
"7.7.7.7:7777",
"8.8.8.8:8888",
})
g.SetLastFederationStateReplicationError(nil)
g.SetLastFederationStateReplicationError(errors.New("fake"))
g.SetLastFederationStateReplicationError(errors.New("fake"))
g.SetLastFederationStateReplicationError(errors.New("fake"))
g.SetLastFederationStateReplicationError(nil)
idx, err := g.runOnce(0)
require.NoError(t, err)
assert.True(t, g.DialPrimaryThroughLocalGateway())
assert.Equal(t, uint64(2), idx)
assert.Len(t, tsd.Calls, 1)
assert.Equal(t, []string{
"5.6.7.8:5555",
"8.7.6.5:9999",
}, g.listGateways(false))
assert.Equal(t, []string{
"5.6.7.8:5555",
"8.7.6.5:9999",
}, g.listGateways(true))
})
}
type testServerDelegate struct {
State *state.Store
FallbackAddrs []string
Calls []uint64
Calls []uint64
isLeader bool
lastContact time.Time
@ -128,10 +366,6 @@ func newFakeStateStore() (*state.Store, error) {
return state.NewStateStore(nil)
}
func (d *testServerDelegate) PrimaryGatewayFallbackAddresses() []string {
return d.FallbackAddrs
}
func (d *testServerDelegate) IsLeader() bool {
return d.isLeader
}

View File

@ -439,8 +439,11 @@ func NewServerLogger(config *Config, logger hclog.InterceptLogger, tokens *token
federationStateReplicatorConfig := ReplicatorConfig{
Name: logging.FederationState,
Delegate: &IndexReplicator{
Delegate: &FederationStateReplicator{srv: s},
Logger: s.logger,
Delegate: &FederationStateReplicator{
srv: s,
gatewayLocator: s.gatewayLocator,
},
Logger: s.logger,
},
Rate: s.config.FederationStateReplicationRate,
Burst: s.config.FederationStateReplicationBurst,

View File

@ -15,3 +15,42 @@ func StringSliceEqual(a, b []string) bool {
}
return true
}
// StringSliceMergeSorted takes two string slices that are assumed to be sorted
// and does a zipper merge of the two sorted slices, removing any cross-slice
// duplicates. If any individual slice contained duplicates those will be
// retained.
func StringSliceMergeSorted(a, b []string) []string {
if len(a) == 0 && len(b) == 0 {
return nil
} else if len(a) == 0 {
return b
} else if len(b) == 0 {
return a
}
out := make([]string, 0, len(a)+len(b))
i, j := 0, 0
for i < len(a) && j < len(b) {
switch {
case a[i] < b[j]:
out = append(out, a[i])
i++
case a[i] > b[j]:
out = append(out, b[j])
j++
default:
out = append(out, a[i])
i++
j++
}
}
if i < len(a) {
out = append(out, a[i:]...)
}
if j < len(b) {
out = append(out, b[j:]...)
}
return out
}

View File

@ -28,3 +28,25 @@ func TestStringSliceEqual(t *testing.T) {
})
}
}
func TestStringSliceMergeSorted(t *testing.T) {
for name, tc := range map[string]struct {
a, b []string
expect []string
}{
"nil": {nil, nil, nil},
"empty": {[]string{}, []string{}, nil},
"one and none": {[]string{"foo"}, []string{}, []string{"foo"}},
"one and one dupe": {[]string{"foo"}, []string{"foo"}, []string{"foo"}},
"one and one": {[]string{"foo"}, []string{"bar"}, []string{"bar", "foo"}},
"two and one": {[]string{"baz", "foo"}, []string{"bar"}, []string{"bar", "baz", "foo"}},
"two and two": {[]string{"baz", "foo"}, []string{"bar", "egg"}, []string{"bar", "baz", "egg", "foo"}},
"two and two dupe": {[]string{"bar", "foo"}, []string{"bar", "egg"}, []string{"bar", "egg", "foo"}},
} {
tc := tc
t.Run(name, func(t *testing.T) {
require.Equal(t, tc.expect, StringSliceMergeSorted(tc.a, tc.b))
require.Equal(t, tc.expect, StringSliceMergeSorted(tc.b, tc.a))
})
}
}