From 556b8bd1c25e6575283e08b9ad5c12113e8d1f96 Mon Sep 17 00:00:00 2001 From: "R.B. Boyer" Date: Mon, 25 Jan 2021 13:24:32 -0600 Subject: [PATCH] server: use the presense of stored federation state data as a sign that we already activated the federation state feature flag (#9519) This way we only have to wait for the serf barrier to pass once before we can make use of federation state APIs Without this patch every restart needs to re-compute the change. --- .changelog/9519.txt | 3 + agent/consul/helper_test.go | 10 +++ agent/consul/leader.go | 6 +- agent/consul/leader_federation_state_ae.go | 10 +++ .../consul/leader_federation_state_ae_test.go | 77 +++++++++++++++++++ 5 files changed, 105 insertions(+), 1 deletion(-) create mode 100644 .changelog/9519.txt diff --git a/.changelog/9519.txt b/.changelog/9519.txt new file mode 100644 index 0000000000..204d6b806f --- /dev/null +++ b/.changelog/9519.txt @@ -0,0 +1,3 @@ +```release-note:improvement +server: use the presense of stored federation state data as a sign that we already activated the federation state feature flag +``` diff --git a/agent/consul/helper_test.go b/agent/consul/helper_test.go index 7373234369..468d28bcdb 100644 --- a/agent/consul/helper_test.go +++ b/agent/consul/helper_test.go @@ -186,6 +186,16 @@ func waitForNewACLReplication(t *testing.T, server *Server, expectedReplicationT }) } +func waitForFederationStateFeature(t *testing.T, server *Server) { + t.Helper() + + retry.Run(t, func(r *retry.R) { + require.True(r, server.DatacenterSupportsFederationStates()) + }) + + require.True(t, server.DatacenterSupportsFederationStates()) +} + func seeEachOther(a, b []serf.Member, addra, addrb string) bool { return serfMembersContains(a, addrb) && serfMembersContains(b, addra) } diff --git a/agent/consul/leader.go b/agent/consul/leader.go index 0872a5b3b9..f8a875c9ec 100644 --- a/agent/consul/leader.go +++ b/agent/consul/leader.go @@ -1540,6 +1540,10 @@ func (s *Server) reapTombstones(index uint64) { } } +func (s *Server) setDatacenterSupportsFederationStates() { + atomic.StoreInt32(&s.dcSupportsFederationStates, 1) +} + func (s *Server) DatacenterSupportsFederationStates() bool { if atomic.LoadInt32(&s.dcSupportsFederationStates) != 0 { return true @@ -1564,7 +1568,7 @@ func (s *Server) DatacenterSupportsFederationStates() bool { s.router.CheckServers(s.config.Datacenter, state.update) if state.supported && state.found { - atomic.StoreInt32(&s.dcSupportsFederationStates, 1) + s.setDatacenterSupportsFederationStates() return true } diff --git a/agent/consul/leader_federation_state_ae.go b/agent/consul/leader_federation_state_ae.go index 4e5a8a45b2..5adf08f347 100644 --- a/agent/consul/leader_federation_state_ae.go +++ b/agent/consul/leader_federation_state_ae.go @@ -17,6 +17,16 @@ const ( ) func (s *Server) startFederationStateAntiEntropy() { + // Check to see if we can skip waiting for serf feature detection below. + if !s.DatacenterSupportsFederationStates() { + _, fedStates, err := s.fsm.State().FederationStateList(nil) + if err != nil { + s.logger.Warn("Failed to check for existing federation states and activate the feature flag quicker; skipping this optimization", "error", err) + } else if len(fedStates) > 0 { + s.setDatacenterSupportsFederationStates() + } + } + if s.config.DisableFederationStateAntiEntropy { return } diff --git a/agent/consul/leader_federation_state_ae_test.go b/agent/consul/leader_federation_state_ae_test.go index 17d5bf0865..875de4848a 100644 --- a/agent/consul/leader_federation_state_ae_test.go +++ b/agent/consul/leader_federation_state_ae_test.go @@ -13,6 +13,83 @@ import ( "github.com/stretchr/testify/require" ) +func TestLeader_FederationStateAntiEntropy_FeatureIsStickyEvenIfSerfTagsRegress(t *testing.T) { + if testing.Short() { + t.Skip("too slow for testing.Short") + } + + t.Parallel() + + // We test this by having two datacenters with one server each. They + // initially come up and pass the serf barrier, then we power them both + // off. We leave the primary off permanently, and then we stand up the + // secondary. Hopefully it should transition to allow federation states. + + dir1, s1 := testServerWithConfig(t, func(c *Config) { + c.PrimaryDatacenter = "dc1" + }) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + codec := rpcClient(t, s1) + defer codec.Close() + + waitForLeaderEstablishment(t, s1) + + dir2, s2 := testServerWithConfig(t, func(c *Config) { + c.Datacenter = "dc2" + c.PrimaryDatacenter = "dc1" + c.FederationStateReplicationRate = 100 + c.FederationStateReplicationBurst = 100 + c.FederationStateReplicationApplyLimit = 1000000 + }) + defer os.RemoveAll(dir2) + defer s2.Shutdown() + codec2 := rpcClient(t, s2) + defer codec2.Close() + + waitForLeaderEstablishment(t, s2) + + // Create the WAN link + joinWAN(t, s2, s1) + waitForLeaderEstablishment(t, s1) + waitForLeaderEstablishment(t, s2) + + waitForFederationStateFeature(t, s1) + waitForFederationStateFeature(t, s2) + + // Wait for everybody's AE to complete. + retry.Run(t, func(r *retry.R) { + _, states, err := s1.fsm.State().FederationStateList(nil) + require.NoError(r, err) + require.Len(r, states, 2) + }) + + // Shutdown s1 and s2. + s1.Shutdown() + s2.Shutdown() + + // Restart just s2 + + dir2new, s2new := testServerWithConfig(t, func(c *Config) { + c.Datacenter = "dc2" + c.PrimaryDatacenter = "dc1" + c.FederationStateReplicationRate = 100 + c.FederationStateReplicationBurst = 100 + c.FederationStateReplicationApplyLimit = 1000000 + + c.DataDir = s2.config.DataDir + c.NodeName = s2.config.NodeName + c.NodeID = s2.config.NodeID + }) + defer os.RemoveAll(dir2new) + defer s2new.Shutdown() + + waitForLeaderEstablishment(t, s2new) + + // It should be able to transition without connectivity to the primary. + waitForFederationStateFeature(t, s2new) +} + func TestLeader_FederationStateAntiEntropy_BlockingQuery(t *testing.T) { t.Parallel()