server: don't activate federation state replication or anti-entropy until all servers are running 1.8.0+ (#8014)

This commit is contained in:
R.B. Boyer 2020-06-04 16:05:27 -05:00 committed by GitHub
parent dfcf45c6cf
commit b88bd6660e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 410 additions and 4 deletions

View File

@ -1,6 +1,7 @@
package consul package consul
import ( import (
"errors"
"fmt" "fmt"
"time" "time"
@ -11,6 +12,10 @@ import (
memdb "github.com/hashicorp/go-memdb" memdb "github.com/hashicorp/go-memdb"
) )
var (
errFederationStatesNotEnabled = errors.New("Federation states are currently disabled until all servers in the datacenter support the feature")
)
// FederationState endpoint is used to manipulate federation states from all // FederationState endpoint is used to manipulate federation states from all
// datacenters. // datacenters.
type FederationState struct { type FederationState struct {
@ -25,6 +30,11 @@ func (c *FederationState) Apply(args *structs.FederationStateRequest, reply *boo
if done, err := c.srv.forward("FederationState.Apply", args, args, reply); done { if done, err := c.srv.forward("FederationState.Apply", args, args, reply); done {
return err return err
} }
if !c.srv.DatacenterSupportsFederationStates() {
return errFederationStatesNotEnabled
}
defer metrics.MeasureSince([]string{"federation_state", "apply"}, time.Now()) defer metrics.MeasureSince([]string{"federation_state", "apply"}, time.Now())
// Fetch the ACL token, if any. // Fetch the ACL token, if any.
@ -69,6 +79,11 @@ func (c *FederationState) Get(args *structs.FederationStateQuery, reply *structs
if done, err := c.srv.forward("FederationState.Get", args, args, reply); done { if done, err := c.srv.forward("FederationState.Get", args, args, reply); done {
return err return err
} }
if !c.srv.DatacenterSupportsFederationStates() {
return errFederationStatesNotEnabled
}
defer metrics.MeasureSince([]string{"federation_state", "get"}, time.Now()) defer metrics.MeasureSince([]string{"federation_state", "get"}, time.Now())
// Fetch the ACL token, if any. // Fetch the ACL token, if any.
@ -105,6 +120,11 @@ func (c *FederationState) List(args *structs.DCSpecificRequest, reply *structs.I
if done, err := c.srv.forward("FederationState.List", args, args, reply); done { if done, err := c.srv.forward("FederationState.List", args, args, reply); done {
return err return err
} }
if !c.srv.DatacenterSupportsFederationStates() {
return errFederationStatesNotEnabled
}
defer metrics.MeasureSince([]string{"federation_state", "list"}, time.Now()) defer metrics.MeasureSince([]string{"federation_state", "list"}, time.Now())
// Fetch the ACL token, if any. // Fetch the ACL token, if any.
@ -143,6 +163,11 @@ func (c *FederationState) ListMeshGateways(args *structs.DCSpecificRequest, repl
if done, err := c.srv.forward("FederationState.ListMeshGateways", args, args, reply); done { if done, err := c.srv.forward("FederationState.ListMeshGateways", args, args, reply); done {
return err return err
} }
if !c.srv.DatacenterSupportsFederationStates() {
return errFederationStatesNotEnabled
}
defer metrics.MeasureSince([]string{"federation_state", "list_mesh_gateways"}, time.Now()) defer metrics.MeasureSince([]string{"federation_state", "list_mesh_gateways"}, time.Now())
return c.srv.blockingQuery( return c.srv.blockingQuery(

View File

@ -2,6 +2,7 @@ package consul
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
"sort" "sort"
"time" "time"
@ -9,6 +10,12 @@ import (
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
) )
var errFederationStatesNotSupported = errors.New("Not all servers in the datacenter support federation states - preventing replication")
func isErrFederationStatesNotSupported(err error) bool {
return errors.Is(err, errFederationStatesNotSupported)
}
type FederationStateReplicator struct { type FederationStateReplicator struct {
srv *Server srv *Server
gatewayLocator *GatewayLocator gatewayLocator *GatewayLocator
@ -27,6 +34,9 @@ func (r *FederationStateReplicator) MetricName() string { return "federation-sta
// FetchRemote implements IndexReplicatorDelegate. // FetchRemote implements IndexReplicatorDelegate.
func (r *FederationStateReplicator) FetchRemote(lastRemoteIndex uint64) (int, interface{}, uint64, error) { func (r *FederationStateReplicator) FetchRemote(lastRemoteIndex uint64) (int, interface{}, uint64, error) {
if !r.srv.DatacenterSupportsFederationStates() {
return 0, nil, 0, errFederationStatesNotSupported
}
lenRemote, remote, remoteIndex, err := r.fetchRemote(lastRemoteIndex) lenRemote, remote, remoteIndex, err := r.fetchRemote(lastRemoteIndex)
if r.gatewayLocator != nil { if r.gatewayLocator != nil {
r.gatewayLocator.SetLastFederationStateReplicationError(err) r.gatewayLocator.SetLastFederationStateReplicationError(err)

View File

@ -1529,3 +1529,64 @@ func (s *Server) reapTombstones(index uint64) {
) )
} }
} }
func (s *Server) DatacenterSupportsFederationStates() bool {
if atomic.LoadInt32(&s.dcSupportsFederationStates) != 0 {
return true
}
state := serversFederationStatesInfo{
supported: true,
found: false,
}
// check if they are supported in the primary dc
if s.config.PrimaryDatacenter != s.config.Datacenter {
s.router.CheckServers(s.config.PrimaryDatacenter, state.update)
if !state.supported || !state.found {
s.logger.Debug("federation states are not enabled in the primary dc")
return false
}
}
// check the servers in the local DC
s.router.CheckServers(s.config.Datacenter, state.update)
if state.supported && state.found {
atomic.StoreInt32(&s.dcSupportsFederationStates, 1)
return true
}
s.logger.Debug("federation states are not enabled in this datacenter", "datacenter", s.config.Datacenter)
return false
}
type serversFederationStatesInfo struct {
// supported indicates whether every processed server supports federation states
supported bool
// found indicates that at least one server was processed
found bool
}
func (s *serversFederationStatesInfo) update(srv *metadata.Server) bool {
if srv.Status != serf.StatusAlive && srv.Status != serf.StatusFailed {
// they are left or something so regardless we treat these servers as meeting
// the version requirement
return true
}
// mark that we processed at least one server
s.found = true
if supported, ok := srv.FeatureFlags["fs"]; ok && supported == 1 {
return true
}
// mark that at least one server does not support federation states
s.supported = false
// prevent continuing server evaluation
return false
}

View File

@ -43,6 +43,10 @@ func (s *Server) federationStateAntiEntropySync(ctx context.Context) error {
var lastFetchIndex uint64 var lastFetchIndex uint64
retryLoopBackoff(ctx.Done(), func() error { retryLoopBackoff(ctx.Done(), func() error {
if !s.DatacenterSupportsFederationStates() {
return nil
}
idx, err := s.federationStateAntiEntropyMaybeSync(ctx, lastFetchIndex) idx, err := s.federationStateAntiEntropyMaybeSync(ctx, lastFetchIndex)
if err != nil { if err != nil {
return err return err

View File

@ -1226,3 +1226,299 @@ func TestLeader_ACLLegacyReplication(t *testing.T) {
require.False(t, srv.leaderRoutineManager.IsRunning(aclRoleReplicationRoutineName)) require.False(t, srv.leaderRoutineManager.IsRunning(aclRoleReplicationRoutineName))
require.False(t, srv.leaderRoutineManager.IsRunning(aclTokenReplicationRoutineName)) require.False(t, srv.leaderRoutineManager.IsRunning(aclTokenReplicationRoutineName))
} }
func TestDatacenterSupportsFederationStates(t *testing.T) {
addGateway := func(t *testing.T, srv *Server, dc, node string) {
t.Helper()
arg := structs.RegisterRequest{
Datacenter: dc,
Node: node,
Address: "127.0.0.1",
Service: &structs.NodeService{
Kind: structs.ServiceKindMeshGateway,
ID: "mesh-gateway",
Service: "mesh-gateway",
Port: 8080,
},
}
var out struct{}
require.NoError(t, srv.RPC("Catalog.Register", &arg, &out))
}
t.Run("one node primary with old version", func(t *testing.T) {
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.NodeName = "node1"
c.Datacenter = "dc1"
c.PrimaryDatacenter = "dc1"
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
s1.updateSerfTags("ft_fs", "0")
waitForLeaderEstablishment(t, s1)
addGateway(t, s1, "dc1", "node1")
retry.Run(t, func(r *retry.R) {
if s1.DatacenterSupportsFederationStates() {
r.Fatal("server 1 shouldn't activate fedstates")
}
})
})
t.Run("one node primary with new version", func(t *testing.T) {
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.NodeName = "node1"
c.Datacenter = "dc1"
c.PrimaryDatacenter = "dc1"
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
waitForLeaderEstablishment(t, s1)
addGateway(t, s1, "dc1", "node1")
retry.Run(t, func(r *retry.R) {
if !s1.DatacenterSupportsFederationStates() {
r.Fatal("server 1 didn't activate fedstates")
}
})
// Wait until after AE runs at least once.
retry.Run(t, func(r *retry.R) {
arg := structs.FederationStateQuery{
Datacenter: "dc1",
TargetDatacenter: "dc1",
}
var out structs.FederationStateResponse
require.NoError(r, s1.RPC("FederationState.Get", &arg, &out))
require.NotNil(r, out.State)
require.Len(r, out.State.MeshGateways, 1)
})
})
t.Run("two node primary with mixed versions", func(t *testing.T) {
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.NodeName = "node1"
c.Datacenter = "dc1"
c.PrimaryDatacenter = "dc1"
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
s1.updateSerfTags("ft_fs", "0")
waitForLeaderEstablishment(t, s1)
dir2, s2 := testServerWithConfig(t, func(c *Config) {
c.NodeName = "node2"
c.Datacenter = "dc1"
c.PrimaryDatacenter = "dc1"
c.Bootstrap = false
})
defer os.RemoveAll(dir2)
defer s2.Shutdown()
// Put s1 last so we don't trigger a leader election.
servers := []*Server{s2, s1}
// Try to join
joinLAN(t, s2, s1)
for _, s := range servers {
retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s, 2)) })
}
waitForLeaderEstablishment(t, s1)
addGateway(t, s1, "dc1", "node1")
retry.Run(t, func(r *retry.R) {
if s1.DatacenterSupportsFederationStates() {
r.Fatal("server 1 shouldn't activate fedstates")
}
})
retry.Run(t, func(r *retry.R) {
if s2.DatacenterSupportsFederationStates() {
r.Fatal("server 2 shouldn't activate fedstates")
}
})
})
t.Run("two node primary with new version", func(t *testing.T) {
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.NodeName = "node1"
c.Datacenter = "dc1"
c.PrimaryDatacenter = "dc1"
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
waitForLeaderEstablishment(t, s1)
dir2, s2 := testServerWithConfig(t, func(c *Config) {
c.NodeName = "node2"
c.Datacenter = "dc1"
c.PrimaryDatacenter = "dc1"
c.Bootstrap = false
})
defer os.RemoveAll(dir2)
defer s2.Shutdown()
// Put s1 last so we don't trigger a leader election.
servers := []*Server{s2, s1}
// Try to join
joinLAN(t, s2, s1)
for _, s := range servers {
retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s, 2)) })
}
testrpc.WaitForLeader(t, s1.RPC, "dc1")
testrpc.WaitForLeader(t, s2.RPC, "dc1")
addGateway(t, s1, "dc1", "node1")
retry.Run(t, func(r *retry.R) {
if !s1.DatacenterSupportsFederationStates() {
r.Fatal("server 1 didn't activate fedstates")
}
})
retry.Run(t, func(r *retry.R) {
if !s2.DatacenterSupportsFederationStates() {
r.Fatal("server 2 didn't activate fedstates")
}
})
// Wait until after AE runs at least once.
retry.Run(t, func(r *retry.R) {
arg := structs.DCSpecificRequest{
Datacenter: "dc1",
}
var out structs.IndexedFederationStates
require.NoError(r, s1.RPC("FederationState.List", &arg, &out))
require.Len(r, out.States, 1)
require.Len(r, out.States[0].MeshGateways, 1)
})
})
t.Run("primary and secondary with new version", func(t *testing.T) {
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.NodeName = "node1"
c.Datacenter = "dc1"
c.PrimaryDatacenter = "dc1"
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
waitForLeaderEstablishment(t, s1)
dir2, s2 := testServerWithConfig(t, func(c *Config) {
c.NodeName = "node2"
c.Datacenter = "dc2"
c.PrimaryDatacenter = "dc1"
c.FederationStateReplicationRate = 100
c.FederationStateReplicationBurst = 100
c.FederationStateReplicationApplyLimit = 1000000
})
defer os.RemoveAll(dir2)
defer s2.Shutdown()
waitForLeaderEstablishment(t, s2)
// Try to join
joinWAN(t, s2, s1)
testrpc.WaitForLeader(t, s1.RPC, "dc1")
testrpc.WaitForLeader(t, s1.RPC, "dc2")
addGateway(t, s1, "dc1", "node1")
addGateway(t, s2, "dc2", "node2")
retry.Run(t, func(r *retry.R) {
if !s1.DatacenterSupportsFederationStates() {
r.Fatal("server 1 didn't activate fedstates")
}
})
retry.Run(t, func(r *retry.R) {
if !s2.DatacenterSupportsFederationStates() {
r.Fatal("server 2 didn't activate fedstates")
}
})
// Wait until after AE runs at least once for both.
retry.Run(t, func(r *retry.R) {
arg := structs.DCSpecificRequest{
Datacenter: "dc1",
}
var out structs.IndexedFederationStates
require.NoError(r, s1.RPC("FederationState.List", &arg, &out))
require.Len(r, out.States, 2)
require.Len(r, out.States[0].MeshGateways, 1)
require.Len(r, out.States[1].MeshGateways, 1)
})
// Wait until after replication runs for the secondary.
retry.Run(t, func(r *retry.R) {
arg := structs.DCSpecificRequest{
Datacenter: "dc2",
}
var out structs.IndexedFederationStates
require.NoError(r, s1.RPC("FederationState.List", &arg, &out))
require.Len(r, out.States, 2)
require.Len(r, out.States[0].MeshGateways, 1)
require.Len(r, out.States[1].MeshGateways, 1)
})
})
t.Run("primary and secondary with mixed versions", func(t *testing.T) {
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.NodeName = "node1"
c.Datacenter = "dc1"
c.PrimaryDatacenter = "dc1"
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
s1.updateSerfTags("ft_fs", "0")
waitForLeaderEstablishment(t, s1)
dir2, s2 := testServerWithConfig(t, func(c *Config) {
c.NodeName = "node2"
c.Datacenter = "dc2"
c.PrimaryDatacenter = "dc1"
c.FederationStateReplicationRate = 100
c.FederationStateReplicationBurst = 100
c.FederationStateReplicationApplyLimit = 1000000
})
defer os.RemoveAll(dir2)
defer s2.Shutdown()
waitForLeaderEstablishment(t, s2)
// Try to join
joinWAN(t, s2, s1)
testrpc.WaitForLeader(t, s1.RPC, "dc1")
testrpc.WaitForLeader(t, s1.RPC, "dc2")
addGateway(t, s1, "dc1", "node1")
addGateway(t, s2, "dc2", "node2")
retry.Run(t, func(r *retry.R) {
if s1.DatacenterSupportsFederationStates() {
r.Fatal("server 1 shouldn't activate fedstates")
}
})
retry.Run(t, func(r *retry.R) {
if s2.DatacenterSupportsFederationStates() {
r.Fatal("server 2 shouldn't activate fedstates")
}
})
})
}

View File

@ -183,7 +183,7 @@ func (r *IndexReplicator) Replicate(ctx context.Context, lastRemoteIndex uint64,
metrics.MeasureSince([]string{"leader", "replication", r.Delegate.MetricName(), "fetch"}, fetchStart) metrics.MeasureSince([]string{"leader", "replication", r.Delegate.MetricName(), "fetch"}, fetchStart)
if err != nil { if err != nil {
return 0, false, fmt.Errorf("failed to retrieve %s: %v", r.Delegate.PluralNoun(), err) return 0, false, fmt.Errorf("failed to retrieve %s: %w", r.Delegate.PluralNoun(), err)
} }
r.Logger.Debug("finished fetching remote objects", r.Logger.Debug("finished fetching remote objects",

View File

@ -163,6 +163,12 @@ type Server struct {
// federation states // federation states
federationStateReplicator *Replicator federationStateReplicator *Replicator
// dcSupportsFederationStates is used to determine whether we can
// replicate federation states or not. All servers in the local
// DC must be on a version of Consul supporting federation states
// before this will get enabled.
dcSupportsFederationStates int32
// tokens holds ACL tokens initially from the configuration, but can // tokens holds ACL tokens initially from the configuration, but can
// be updated at runtime, so should always be used instead of going to // be updated at runtime, so should always be used instead of going to
// the configuration directly. // the configuration directly.
@ -449,6 +455,7 @@ func NewServerLogger(config *Config, logger hclog.InterceptLogger, tokens *token
Rate: s.config.FederationStateReplicationRate, Rate: s.config.FederationStateReplicationRate,
Burst: s.config.FederationStateReplicationBurst, Burst: s.config.FederationStateReplicationBurst,
Logger: logger, Logger: logger,
SuppressErrorLog: isErrFederationStatesNotSupported,
} }
s.federationStateReplicator, err = NewReplicator(&federationStateReplicatorConfig) s.federationStateReplicator, err = NewReplicator(&federationStateReplicatorConfig)
if err != nil { if err != nil {

View File

@ -74,6 +74,9 @@ func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string, w
conf.Tags["acls"] = string(structs.ACLModeDisabled) conf.Tags["acls"] = string(structs.ACLModeDisabled)
} }
// feature flag: advertise support for federation states
conf.Tags["ft_fs"] = "1"
var subLoggerName string var subLoggerName string
if wan { if wan {
subLoggerName = logging.WAN subLoggerName = logging.WAN