diff --git a/.changelog/9519.txt b/.changelog/9519.txt new file mode 100644 index 0000000000..204d6b806f --- /dev/null +++ b/.changelog/9519.txt @@ -0,0 +1,3 @@ +```release-note:improvement +server: use the presense of stored federation state data as a sign that we already activated the federation state feature flag +``` diff --git a/agent/consul/helper_test.go b/agent/consul/helper_test.go index 7373234369..468d28bcdb 100644 --- a/agent/consul/helper_test.go +++ b/agent/consul/helper_test.go @@ -186,6 +186,16 @@ func waitForNewACLReplication(t *testing.T, server *Server, expectedReplicationT }) } +func waitForFederationStateFeature(t *testing.T, server *Server) { + t.Helper() + + retry.Run(t, func(r *retry.R) { + require.True(r, server.DatacenterSupportsFederationStates()) + }) + + require.True(t, server.DatacenterSupportsFederationStates()) +} + func seeEachOther(a, b []serf.Member, addra, addrb string) bool { return serfMembersContains(a, addrb) && serfMembersContains(b, addra) } diff --git a/agent/consul/leader.go b/agent/consul/leader.go index 0872a5b3b9..f8a875c9ec 100644 --- a/agent/consul/leader.go +++ b/agent/consul/leader.go @@ -1540,6 +1540,10 @@ func (s *Server) reapTombstones(index uint64) { } } +func (s *Server) setDatacenterSupportsFederationStates() { + atomic.StoreInt32(&s.dcSupportsFederationStates, 1) +} + func (s *Server) DatacenterSupportsFederationStates() bool { if atomic.LoadInt32(&s.dcSupportsFederationStates) != 0 { return true @@ -1564,7 +1568,7 @@ func (s *Server) DatacenterSupportsFederationStates() bool { s.router.CheckServers(s.config.Datacenter, state.update) if state.supported && state.found { - atomic.StoreInt32(&s.dcSupportsFederationStates, 1) + s.setDatacenterSupportsFederationStates() return true } diff --git a/agent/consul/leader_federation_state_ae.go b/agent/consul/leader_federation_state_ae.go index 4e5a8a45b2..5adf08f347 100644 --- a/agent/consul/leader_federation_state_ae.go +++ b/agent/consul/leader_federation_state_ae.go @@ -17,6 +17,16 @@ const ( ) func (s *Server) startFederationStateAntiEntropy() { + // Check to see if we can skip waiting for serf feature detection below. + if !s.DatacenterSupportsFederationStates() { + _, fedStates, err := s.fsm.State().FederationStateList(nil) + if err != nil { + s.logger.Warn("Failed to check for existing federation states and activate the feature flag quicker; skipping this optimization", "error", err) + } else if len(fedStates) > 0 { + s.setDatacenterSupportsFederationStates() + } + } + if s.config.DisableFederationStateAntiEntropy { return } diff --git a/agent/consul/leader_federation_state_ae_test.go b/agent/consul/leader_federation_state_ae_test.go index 17d5bf0865..875de4848a 100644 --- a/agent/consul/leader_federation_state_ae_test.go +++ b/agent/consul/leader_federation_state_ae_test.go @@ -13,6 +13,83 @@ import ( "github.com/stretchr/testify/require" ) +func TestLeader_FederationStateAntiEntropy_FeatureIsStickyEvenIfSerfTagsRegress(t *testing.T) { + if testing.Short() { + t.Skip("too slow for testing.Short") + } + + t.Parallel() + + // We test this by having two datacenters with one server each. They + // initially come up and pass the serf barrier, then we power them both + // off. We leave the primary off permanently, and then we stand up the + // secondary. Hopefully it should transition to allow federation states. + + dir1, s1 := testServerWithConfig(t, func(c *Config) { + c.PrimaryDatacenter = "dc1" + }) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + codec := rpcClient(t, s1) + defer codec.Close() + + waitForLeaderEstablishment(t, s1) + + dir2, s2 := testServerWithConfig(t, func(c *Config) { + c.Datacenter = "dc2" + c.PrimaryDatacenter = "dc1" + c.FederationStateReplicationRate = 100 + c.FederationStateReplicationBurst = 100 + c.FederationStateReplicationApplyLimit = 1000000 + }) + defer os.RemoveAll(dir2) + defer s2.Shutdown() + codec2 := rpcClient(t, s2) + defer codec2.Close() + + waitForLeaderEstablishment(t, s2) + + // Create the WAN link + joinWAN(t, s2, s1) + waitForLeaderEstablishment(t, s1) + waitForLeaderEstablishment(t, s2) + + waitForFederationStateFeature(t, s1) + waitForFederationStateFeature(t, s2) + + // Wait for everybody's AE to complete. + retry.Run(t, func(r *retry.R) { + _, states, err := s1.fsm.State().FederationStateList(nil) + require.NoError(r, err) + require.Len(r, states, 2) + }) + + // Shutdown s1 and s2. + s1.Shutdown() + s2.Shutdown() + + // Restart just s2 + + dir2new, s2new := testServerWithConfig(t, func(c *Config) { + c.Datacenter = "dc2" + c.PrimaryDatacenter = "dc1" + c.FederationStateReplicationRate = 100 + c.FederationStateReplicationBurst = 100 + c.FederationStateReplicationApplyLimit = 1000000 + + c.DataDir = s2.config.DataDir + c.NodeName = s2.config.NodeName + c.NodeID = s2.config.NodeID + }) + defer os.RemoveAll(dir2new) + defer s2new.Shutdown() + + waitForLeaderEstablishment(t, s2new) + + // It should be able to transition without connectivity to the primary. + waitForFederationStateFeature(t, s2new) +} + func TestLeader_FederationStateAntiEntropy_BlockingQuery(t *testing.T) { t.Parallel()