diff --git a/agent/consul/federation_state_endpoint.go b/agent/consul/federation_state_endpoint.go index bbe49ff1ef..614d1f0a81 100644 --- a/agent/consul/federation_state_endpoint.go +++ b/agent/consul/federation_state_endpoint.go @@ -1,6 +1,7 @@ package consul import ( + "errors" "fmt" "time" @@ -11,6 +12,10 @@ import ( memdb "github.com/hashicorp/go-memdb" ) +var ( + errFederationStatesNotEnabled = errors.New("Federation states are currently disabled until all servers in the datacenter support the feature") +) + // FederationState endpoint is used to manipulate federation states from all // datacenters. type FederationState struct { @@ -25,6 +30,11 @@ func (c *FederationState) Apply(args *structs.FederationStateRequest, reply *boo if done, err := c.srv.forward("FederationState.Apply", args, args, reply); done { return err } + + if !c.srv.DatacenterSupportsFederationStates() { + return errFederationStatesNotEnabled + } + defer metrics.MeasureSince([]string{"federation_state", "apply"}, time.Now()) // Fetch the ACL token, if any. @@ -69,6 +79,11 @@ func (c *FederationState) Get(args *structs.FederationStateQuery, reply *structs if done, err := c.srv.forward("FederationState.Get", args, args, reply); done { return err } + + if !c.srv.DatacenterSupportsFederationStates() { + return errFederationStatesNotEnabled + } + defer metrics.MeasureSince([]string{"federation_state", "get"}, time.Now()) // Fetch the ACL token, if any. @@ -105,6 +120,11 @@ func (c *FederationState) List(args *structs.DCSpecificRequest, reply *structs.I if done, err := c.srv.forward("FederationState.List", args, args, reply); done { return err } + + if !c.srv.DatacenterSupportsFederationStates() { + return errFederationStatesNotEnabled + } + defer metrics.MeasureSince([]string{"federation_state", "list"}, time.Now()) // Fetch the ACL token, if any. @@ -143,6 +163,11 @@ func (c *FederationState) ListMeshGateways(args *structs.DCSpecificRequest, repl if done, err := c.srv.forward("FederationState.ListMeshGateways", args, args, reply); done { return err } + + if !c.srv.DatacenterSupportsFederationStates() { + return errFederationStatesNotEnabled + } + defer metrics.MeasureSince([]string{"federation_state", "list_mesh_gateways"}, time.Now()) return c.srv.blockingQuery( diff --git a/agent/consul/federation_state_replication.go b/agent/consul/federation_state_replication.go index 7717ddd4b2..ac6bc640ec 100644 --- a/agent/consul/federation_state_replication.go +++ b/agent/consul/federation_state_replication.go @@ -2,6 +2,7 @@ package consul import ( "context" + "errors" "fmt" "sort" "time" @@ -9,6 +10,12 @@ import ( "github.com/hashicorp/consul/agent/structs" ) +var errFederationStatesNotSupported = errors.New("Not all servers in the datacenter support federation states - preventing replication") + +func isErrFederationStatesNotSupported(err error) bool { + return errors.Is(err, errFederationStatesNotSupported) +} + type FederationStateReplicator struct { srv *Server gatewayLocator *GatewayLocator @@ -27,6 +34,9 @@ func (r *FederationStateReplicator) MetricName() string { return "federation-sta // FetchRemote implements IndexReplicatorDelegate. func (r *FederationStateReplicator) FetchRemote(lastRemoteIndex uint64) (int, interface{}, uint64, error) { + if !r.srv.DatacenterSupportsFederationStates() { + return 0, nil, 0, errFederationStatesNotSupported + } lenRemote, remote, remoteIndex, err := r.fetchRemote(lastRemoteIndex) if r.gatewayLocator != nil { r.gatewayLocator.SetLastFederationStateReplicationError(err) diff --git a/agent/consul/leader.go b/agent/consul/leader.go index 3f851baad9..c7cd037f42 100644 --- a/agent/consul/leader.go +++ b/agent/consul/leader.go @@ -1529,3 +1529,64 @@ func (s *Server) reapTombstones(index uint64) { ) } } + +func (s *Server) DatacenterSupportsFederationStates() bool { + if atomic.LoadInt32(&s.dcSupportsFederationStates) != 0 { + return true + } + + state := serversFederationStatesInfo{ + supported: true, + found: false, + } + + // check if they are supported in the primary dc + if s.config.PrimaryDatacenter != s.config.Datacenter { + s.router.CheckServers(s.config.PrimaryDatacenter, state.update) + + if !state.supported || !state.found { + s.logger.Debug("federation states are not enabled in the primary dc") + return false + } + } + + // check the servers in the local DC + s.router.CheckServers(s.config.Datacenter, state.update) + + if state.supported && state.found { + atomic.StoreInt32(&s.dcSupportsFederationStates, 1) + return true + } + + s.logger.Debug("federation states are not enabled in this datacenter", "datacenter", s.config.Datacenter) + return false +} + +type serversFederationStatesInfo struct { + // supported indicates whether every processed server supports federation states + supported bool + + // found indicates that at least one server was processed + found bool +} + +func (s *serversFederationStatesInfo) update(srv *metadata.Server) bool { + if srv.Status != serf.StatusAlive && srv.Status != serf.StatusFailed { + // they are left or something so regardless we treat these servers as meeting + // the version requirement + return true + } + + // mark that we processed at least one server + s.found = true + + if supported, ok := srv.FeatureFlags["fs"]; ok && supported == 1 { + return true + } + + // mark that at least one server does not support federation states + s.supported = false + + // prevent continuing server evaluation + return false +} diff --git a/agent/consul/leader_federation_state_ae.go b/agent/consul/leader_federation_state_ae.go index e4d1701994..99cf534cdd 100644 --- a/agent/consul/leader_federation_state_ae.go +++ b/agent/consul/leader_federation_state_ae.go @@ -43,6 +43,10 @@ func (s *Server) federationStateAntiEntropySync(ctx context.Context) error { var lastFetchIndex uint64 retryLoopBackoff(ctx.Done(), func() error { + if !s.DatacenterSupportsFederationStates() { + return nil + } + idx, err := s.federationStateAntiEntropyMaybeSync(ctx, lastFetchIndex) if err != nil { return err diff --git a/agent/consul/leader_test.go b/agent/consul/leader_test.go index 4dafc01f4d..a7b5fae4cb 100644 --- a/agent/consul/leader_test.go +++ b/agent/consul/leader_test.go @@ -1226,3 +1226,299 @@ func TestLeader_ACLLegacyReplication(t *testing.T) { require.False(t, srv.leaderRoutineManager.IsRunning(aclRoleReplicationRoutineName)) require.False(t, srv.leaderRoutineManager.IsRunning(aclTokenReplicationRoutineName)) } + +func TestDatacenterSupportsFederationStates(t *testing.T) { + addGateway := func(t *testing.T, srv *Server, dc, node string) { + t.Helper() + arg := structs.RegisterRequest{ + Datacenter: dc, + Node: node, + Address: "127.0.0.1", + Service: &structs.NodeService{ + Kind: structs.ServiceKindMeshGateway, + ID: "mesh-gateway", + Service: "mesh-gateway", + Port: 8080, + }, + } + + var out struct{} + require.NoError(t, srv.RPC("Catalog.Register", &arg, &out)) + } + + t.Run("one node primary with old version", func(t *testing.T) { + dir1, s1 := testServerWithConfig(t, func(c *Config) { + c.NodeName = "node1" + c.Datacenter = "dc1" + c.PrimaryDatacenter = "dc1" + }) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + + s1.updateSerfTags("ft_fs", "0") + + waitForLeaderEstablishment(t, s1) + + addGateway(t, s1, "dc1", "node1") + + retry.Run(t, func(r *retry.R) { + if s1.DatacenterSupportsFederationStates() { + r.Fatal("server 1 shouldn't activate fedstates") + } + }) + }) + + t.Run("one node primary with new version", func(t *testing.T) { + dir1, s1 := testServerWithConfig(t, func(c *Config) { + c.NodeName = "node1" + c.Datacenter = "dc1" + c.PrimaryDatacenter = "dc1" + }) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + + waitForLeaderEstablishment(t, s1) + + addGateway(t, s1, "dc1", "node1") + + retry.Run(t, func(r *retry.R) { + if !s1.DatacenterSupportsFederationStates() { + r.Fatal("server 1 didn't activate fedstates") + } + }) + + // Wait until after AE runs at least once. + retry.Run(t, func(r *retry.R) { + arg := structs.FederationStateQuery{ + Datacenter: "dc1", + TargetDatacenter: "dc1", + } + + var out structs.FederationStateResponse + require.NoError(r, s1.RPC("FederationState.Get", &arg, &out)) + require.NotNil(r, out.State) + require.Len(r, out.State.MeshGateways, 1) + }) + }) + + t.Run("two node primary with mixed versions", func(t *testing.T) { + dir1, s1 := testServerWithConfig(t, func(c *Config) { + c.NodeName = "node1" + c.Datacenter = "dc1" + c.PrimaryDatacenter = "dc1" + }) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + + s1.updateSerfTags("ft_fs", "0") + + waitForLeaderEstablishment(t, s1) + + dir2, s2 := testServerWithConfig(t, func(c *Config) { + c.NodeName = "node2" + c.Datacenter = "dc1" + c.PrimaryDatacenter = "dc1" + c.Bootstrap = false + }) + defer os.RemoveAll(dir2) + defer s2.Shutdown() + + // Put s1 last so we don't trigger a leader election. + servers := []*Server{s2, s1} + + // Try to join + joinLAN(t, s2, s1) + for _, s := range servers { + retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s, 2)) }) + } + + waitForLeaderEstablishment(t, s1) + + addGateway(t, s1, "dc1", "node1") + + retry.Run(t, func(r *retry.R) { + if s1.DatacenterSupportsFederationStates() { + r.Fatal("server 1 shouldn't activate fedstates") + } + }) + retry.Run(t, func(r *retry.R) { + if s2.DatacenterSupportsFederationStates() { + r.Fatal("server 2 shouldn't activate fedstates") + } + }) + }) + + t.Run("two node primary with new version", func(t *testing.T) { + dir1, s1 := testServerWithConfig(t, func(c *Config) { + c.NodeName = "node1" + c.Datacenter = "dc1" + c.PrimaryDatacenter = "dc1" + }) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + + waitForLeaderEstablishment(t, s1) + + dir2, s2 := testServerWithConfig(t, func(c *Config) { + c.NodeName = "node2" + c.Datacenter = "dc1" + c.PrimaryDatacenter = "dc1" + c.Bootstrap = false + }) + defer os.RemoveAll(dir2) + defer s2.Shutdown() + + // Put s1 last so we don't trigger a leader election. + servers := []*Server{s2, s1} + + // Try to join + joinLAN(t, s2, s1) + for _, s := range servers { + retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s, 2)) }) + } + + testrpc.WaitForLeader(t, s1.RPC, "dc1") + testrpc.WaitForLeader(t, s2.RPC, "dc1") + + addGateway(t, s1, "dc1", "node1") + + retry.Run(t, func(r *retry.R) { + if !s1.DatacenterSupportsFederationStates() { + r.Fatal("server 1 didn't activate fedstates") + } + }) + retry.Run(t, func(r *retry.R) { + if !s2.DatacenterSupportsFederationStates() { + r.Fatal("server 2 didn't activate fedstates") + } + }) + + // Wait until after AE runs at least once. + retry.Run(t, func(r *retry.R) { + arg := structs.DCSpecificRequest{ + Datacenter: "dc1", + } + + var out structs.IndexedFederationStates + require.NoError(r, s1.RPC("FederationState.List", &arg, &out)) + require.Len(r, out.States, 1) + require.Len(r, out.States[0].MeshGateways, 1) + }) + }) + + t.Run("primary and secondary with new version", func(t *testing.T) { + dir1, s1 := testServerWithConfig(t, func(c *Config) { + c.NodeName = "node1" + c.Datacenter = "dc1" + c.PrimaryDatacenter = "dc1" + }) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + + waitForLeaderEstablishment(t, s1) + + dir2, s2 := testServerWithConfig(t, func(c *Config) { + c.NodeName = "node2" + c.Datacenter = "dc2" + c.PrimaryDatacenter = "dc1" + c.FederationStateReplicationRate = 100 + c.FederationStateReplicationBurst = 100 + c.FederationStateReplicationApplyLimit = 1000000 + }) + defer os.RemoveAll(dir2) + defer s2.Shutdown() + + waitForLeaderEstablishment(t, s2) + + // Try to join + joinWAN(t, s2, s1) + testrpc.WaitForLeader(t, s1.RPC, "dc1") + testrpc.WaitForLeader(t, s1.RPC, "dc2") + + addGateway(t, s1, "dc1", "node1") + addGateway(t, s2, "dc2", "node2") + + retry.Run(t, func(r *retry.R) { + if !s1.DatacenterSupportsFederationStates() { + r.Fatal("server 1 didn't activate fedstates") + } + }) + retry.Run(t, func(r *retry.R) { + if !s2.DatacenterSupportsFederationStates() { + r.Fatal("server 2 didn't activate fedstates") + } + }) + + // Wait until after AE runs at least once for both. + retry.Run(t, func(r *retry.R) { + arg := structs.DCSpecificRequest{ + Datacenter: "dc1", + } + + var out structs.IndexedFederationStates + require.NoError(r, s1.RPC("FederationState.List", &arg, &out)) + require.Len(r, out.States, 2) + require.Len(r, out.States[0].MeshGateways, 1) + require.Len(r, out.States[1].MeshGateways, 1) + }) + + // Wait until after replication runs for the secondary. + retry.Run(t, func(r *retry.R) { + arg := structs.DCSpecificRequest{ + Datacenter: "dc2", + } + + var out structs.IndexedFederationStates + require.NoError(r, s1.RPC("FederationState.List", &arg, &out)) + require.Len(r, out.States, 2) + require.Len(r, out.States[0].MeshGateways, 1) + require.Len(r, out.States[1].MeshGateways, 1) + }) + }) + + t.Run("primary and secondary with mixed versions", func(t *testing.T) { + dir1, s1 := testServerWithConfig(t, func(c *Config) { + c.NodeName = "node1" + c.Datacenter = "dc1" + c.PrimaryDatacenter = "dc1" + }) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + + s1.updateSerfTags("ft_fs", "0") + + waitForLeaderEstablishment(t, s1) + + dir2, s2 := testServerWithConfig(t, func(c *Config) { + c.NodeName = "node2" + c.Datacenter = "dc2" + c.PrimaryDatacenter = "dc1" + c.FederationStateReplicationRate = 100 + c.FederationStateReplicationBurst = 100 + c.FederationStateReplicationApplyLimit = 1000000 + }) + defer os.RemoveAll(dir2) + defer s2.Shutdown() + + waitForLeaderEstablishment(t, s2) + + // Try to join + joinWAN(t, s2, s1) + testrpc.WaitForLeader(t, s1.RPC, "dc1") + testrpc.WaitForLeader(t, s1.RPC, "dc2") + + addGateway(t, s1, "dc1", "node1") + addGateway(t, s2, "dc2", "node2") + + retry.Run(t, func(r *retry.R) { + if s1.DatacenterSupportsFederationStates() { + r.Fatal("server 1 shouldn't activate fedstates") + } + }) + retry.Run(t, func(r *retry.R) { + if s2.DatacenterSupportsFederationStates() { + r.Fatal("server 2 shouldn't activate fedstates") + } + }) + }) +} diff --git a/agent/consul/replication.go b/agent/consul/replication.go index c8f19d4ccd..39c9af4f15 100644 --- a/agent/consul/replication.go +++ b/agent/consul/replication.go @@ -183,7 +183,7 @@ func (r *IndexReplicator) Replicate(ctx context.Context, lastRemoteIndex uint64, metrics.MeasureSince([]string{"leader", "replication", r.Delegate.MetricName(), "fetch"}, fetchStart) if err != nil { - return 0, false, fmt.Errorf("failed to retrieve %s: %v", r.Delegate.PluralNoun(), err) + return 0, false, fmt.Errorf("failed to retrieve %s: %w", r.Delegate.PluralNoun(), err) } r.Logger.Debug("finished fetching remote objects", diff --git a/agent/consul/server.go b/agent/consul/server.go index fc5bf34ab1..38bb0dc583 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -163,6 +163,12 @@ type Server struct { // federation states federationStateReplicator *Replicator + // dcSupportsFederationStates is used to determine whether we can + // replicate federation states or not. All servers in the local + // DC must be on a version of Consul supporting federation states + // before this will get enabled. + dcSupportsFederationStates int32 + // tokens holds ACL tokens initially from the configuration, but can // be updated at runtime, so should always be used instead of going to // the configuration directly. @@ -447,9 +453,10 @@ func NewServerLogger(config *Config, logger hclog.InterceptLogger, tokens *token }, Logger: s.logger, }, - Rate: s.config.FederationStateReplicationRate, - Burst: s.config.FederationStateReplicationBurst, - Logger: logger, + Rate: s.config.FederationStateReplicationRate, + Burst: s.config.FederationStateReplicationBurst, + Logger: logger, + SuppressErrorLog: isErrFederationStatesNotSupported, } s.federationStateReplicator, err = NewReplicator(&federationStateReplicatorConfig) if err != nil { diff --git a/agent/consul/server_serf.go b/agent/consul/server_serf.go index 9c717a6008..3a2471fec0 100644 --- a/agent/consul/server_serf.go +++ b/agent/consul/server_serf.go @@ -74,6 +74,9 @@ func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string, w conf.Tags["acls"] = string(structs.ACLModeDisabled) } + // feature flag: advertise support for federation states + conf.Tags["ft_fs"] = "1" + var subLoggerName string if wan { subLoggerName = logging.WAN