mirror of
https://github.com/status-im/consul.git
synced 2025-01-11 14:24:39 +00:00
Relaxes Autopilot promotion logic. (#3623)
* Relaxes Autopilot promotion logic. When we defaulted the Raft protocol version to 3 in #3477 we made the numPeers() routine more strict to only count voters (this is more conservative and more correct). This had the side effect of breaking rolling updates because it's at odds with the Autopilot non-voter promotion logic. That logic used to wait to only promote to maintain an odd quorum of servers. During a rolling update (add one new server, wait, and then kill an old server) the dead server cleanup would still count the old server as a peer, which is conservative and the right thing to do, and no longer count the non-voter. This would wait to promote, so you could get into a stalemate. It is safer to promote early than remove early, so by promoting as soon as possible we have chosen that as the solution here. Fixes #3611 * Gets rid of unnecessary extra not-a-voter check.
This commit is contained in:
parent
bcafded4e6
commit
6bf55d16a2
@ -55,11 +55,11 @@ func (s *Server) autopilotLoop() {
|
||||
}
|
||||
|
||||
if err := s.autopilotPolicy.PromoteNonVoters(autopilotConfig); err != nil {
|
||||
s.logger.Printf("[ERR] autopilot: error checking for non-voters to promote: %s", err)
|
||||
s.logger.Printf("[ERR] autopilot: Error checking for non-voters to promote: %s", err)
|
||||
}
|
||||
|
||||
if err := s.pruneDeadServers(autopilotConfig); err != nil {
|
||||
s.logger.Printf("[ERR] autopilot: error checking for dead servers to remove: %s", err)
|
||||
s.logger.Printf("[ERR] autopilot: Error checking for dead servers to remove: %s", err)
|
||||
}
|
||||
case <-s.autopilotRemoveDeadCh:
|
||||
autopilotConfig, ok := s.getOrCreateAutopilotConfig()
|
||||
@ -68,60 +68,62 @@ func (s *Server) autopilotLoop() {
|
||||
}
|
||||
|
||||
if err := s.pruneDeadServers(autopilotConfig); err != nil {
|
||||
s.logger.Printf("[ERR] autopilot: error checking for dead servers to remove: %s", err)
|
||||
s.logger.Printf("[ERR] autopilot: Error checking for dead servers to remove: %s", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// fmtServer prints info about a server in a standard way for logging.
|
||||
func fmtServer(server raft.Server) string {
|
||||
return fmt.Sprintf("Server (ID: %q Address: %q)", server.ID, server.Address)
|
||||
}
|
||||
|
||||
// pruneDeadServers removes up to numPeers/2 failed servers
|
||||
func (s *Server) pruneDeadServers(autopilotConfig *structs.AutopilotConfig) error {
|
||||
// Find any failed servers
|
||||
if !autopilotConfig.CleanupDeadServers {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Failed servers are known to Serf and marked failed, and stale servers
|
||||
// are known to Raft but not Serf.
|
||||
var failed []string
|
||||
staleRaftServers := make(map[string]raft.Server)
|
||||
if autopilotConfig.CleanupDeadServers {
|
||||
future := s.raft.GetConfiguration()
|
||||
if err := future.Error(); err != nil {
|
||||
return err
|
||||
}
|
||||
future := s.raft.GetConfiguration()
|
||||
if err := future.Error(); err != nil {
|
||||
return err
|
||||
}
|
||||
for _, server := range future.Configuration().Servers {
|
||||
staleRaftServers[string(server.Address)] = server
|
||||
}
|
||||
for _, member := range s.serfLAN.Members() {
|
||||
valid, parts := metadata.IsConsulServer(member)
|
||||
if valid {
|
||||
if _, ok := staleRaftServers[parts.Addr.String()]; ok {
|
||||
delete(staleRaftServers, parts.Addr.String())
|
||||
}
|
||||
|
||||
for _, server := range future.Configuration().Servers {
|
||||
staleRaftServers[string(server.Address)] = server
|
||||
}
|
||||
|
||||
for _, member := range s.serfLAN.Members() {
|
||||
valid, parts := metadata.IsConsulServer(member)
|
||||
|
||||
if valid {
|
||||
// Remove this server from the stale list; it has a serf entry
|
||||
if _, ok := staleRaftServers[parts.Addr.String()]; ok {
|
||||
delete(staleRaftServers, parts.Addr.String())
|
||||
}
|
||||
|
||||
if member.Status == serf.StatusFailed {
|
||||
failed = append(failed, member.Name)
|
||||
}
|
||||
if member.Status == serf.StatusFailed {
|
||||
failed = append(failed, member.Name)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// We can bail early if there's nothing to do.
|
||||
removalCount := len(failed) + len(staleRaftServers)
|
||||
|
||||
// Nothing to remove, return early
|
||||
if removalCount == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Only do removals if a minority of servers will be affected.
|
||||
peers, err := s.numPeers()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Only do removals if a minority of servers will be affected
|
||||
if removalCount < peers/2 {
|
||||
for _, server := range failed {
|
||||
s.logger.Printf("[INFO] autopilot: Attempting removal of failed server: %v", server)
|
||||
go s.serfLAN.RemoveFailedNode(server)
|
||||
for _, node := range failed {
|
||||
s.logger.Printf("[INFO] autopilot: Attempting removal of failed server node %q", node)
|
||||
go s.serfLAN.RemoveFailedNode(node)
|
||||
}
|
||||
|
||||
minRaftProtocol, err := ServerMinRaftProtocol(s.serfLAN.Members())
|
||||
@ -129,12 +131,11 @@ func (s *Server) pruneDeadServers(autopilotConfig *structs.AutopilotConfig) erro
|
||||
return err
|
||||
}
|
||||
for _, raftServer := range staleRaftServers {
|
||||
s.logger.Printf("[INFO] autopilot: Attempting removal of stale %s", fmtServer(raftServer))
|
||||
var future raft.Future
|
||||
if minRaftProtocol >= 2 {
|
||||
s.logger.Printf("[INFO] autopilot: Attempting removal of stale raft server : %v", raftServer.ID)
|
||||
future = s.raft.RemoveServer(raftServer.ID, 0, 0)
|
||||
} else {
|
||||
s.logger.Printf("[INFO] autopilot: Attempting removal of stale raft server : %v", raftServer.ID)
|
||||
future = s.raft.RemovePeer(raftServer.Address)
|
||||
}
|
||||
if err := future.Error(); err != nil {
|
||||
@ -156,82 +157,65 @@ type BasicAutopilot struct {
|
||||
|
||||
// PromoteNonVoters promotes eligible non-voting servers to voters.
|
||||
func (b *BasicAutopilot) PromoteNonVoters(autopilotConfig *structs.AutopilotConfig) error {
|
||||
// If we don't meet the minimum version for non-voter features, bail
|
||||
// early.
|
||||
minRaftProtocol, err := ServerMinRaftProtocol(b.server.LANMembers())
|
||||
if err != nil {
|
||||
return fmt.Errorf("error getting server raft protocol versions: %s", err)
|
||||
}
|
||||
|
||||
// If we don't meet the minimum version for non-voter features, bail early
|
||||
if minRaftProtocol < 3 {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Find any non-voters eligible for promotion.
|
||||
now := time.Now()
|
||||
var promotions []raft.Server
|
||||
future := b.server.raft.GetConfiguration()
|
||||
if err := future.Error(); err != nil {
|
||||
return fmt.Errorf("failed to get raft configuration: %v", err)
|
||||
}
|
||||
|
||||
// Find any non-voters eligible for promotion
|
||||
var promotions []raft.Server
|
||||
voterCount := 0
|
||||
for _, server := range future.Configuration().Servers {
|
||||
// If this server has been stable and passing for long enough, promote it to a voter
|
||||
if !isVoter(server.Suffrage) {
|
||||
health := b.server.getServerHealth(string(server.ID))
|
||||
if health.IsStable(time.Now(), autopilotConfig) {
|
||||
if health.IsStable(now, autopilotConfig) {
|
||||
promotions = append(promotions, server)
|
||||
}
|
||||
} else {
|
||||
voterCount++
|
||||
}
|
||||
}
|
||||
|
||||
if _, err := b.server.handlePromotions(voterCount, promotions); err != nil {
|
||||
if err := b.server.handlePromotions(promotions); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Server) handlePromotions(voterCount int, promotions []raft.Server) (bool, error) {
|
||||
if len(promotions) == 0 {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// If there's currently an even number of servers, we can promote the first server in the list
|
||||
// to get to an odd-sized quorum
|
||||
newServers := false
|
||||
if voterCount%2 == 0 {
|
||||
addFuture := s.raft.AddVoter(promotions[0].ID, promotions[0].Address, 0, 0)
|
||||
// handlePromotions is a helper shared with Consul Enterprise that attempts to
|
||||
// apply desired server promotions to the Raft configuration.
|
||||
func (s *Server) handlePromotions(promotions []raft.Server) error {
|
||||
// This used to wait to only promote to maintain an odd quorum of
|
||||
// servers, but this was at odds with the dead server cleanup when doing
|
||||
// rolling updates (add one new server, wait, and then kill an old
|
||||
// server). The dead server cleanup would still count the old server as
|
||||
// a peer, which is conservative and the right thing to do, and this
|
||||
// would wait to promote, so you could get into a stalemate. It is safer
|
||||
// to promote early than remove early, so by promoting as soon as
|
||||
// possible we have chosen that as the solution here.
|
||||
for _, server := range promotions {
|
||||
s.logger.Printf("[INFO] autopilot: Promoting %s to voter", fmtServer(server))
|
||||
addFuture := s.raft.AddVoter(server.ID, server.Address, 0, 0)
|
||||
if err := addFuture.Error(); err != nil {
|
||||
return newServers, fmt.Errorf("failed to add raft peer: %v", err)
|
||||
return fmt.Errorf("failed to add raft peer: %v", err)
|
||||
}
|
||||
promotions = promotions[1:]
|
||||
newServers = true
|
||||
}
|
||||
|
||||
// Promote remaining servers in twos to maintain an odd quorum size
|
||||
for i := 0; i < len(promotions)-1; i += 2 {
|
||||
addFirst := s.raft.AddVoter(promotions[i].ID, promotions[i].Address, 0, 0)
|
||||
if err := addFirst.Error(); err != nil {
|
||||
return newServers, fmt.Errorf("failed to add raft peer: %v", err)
|
||||
}
|
||||
addSecond := s.raft.AddVoter(promotions[i+1].ID, promotions[i+1].Address, 0, 0)
|
||||
if err := addSecond.Error(); err != nil {
|
||||
return newServers, fmt.Errorf("failed to add raft peer: %v", err)
|
||||
}
|
||||
newServers = true
|
||||
}
|
||||
|
||||
// If we added a new server, trigger a check to remove dead servers
|
||||
if newServers {
|
||||
// If we promoted a server, trigger a check to remove dead servers.
|
||||
if len(promotions) > 0 {
|
||||
select {
|
||||
case s.autopilotRemoveDeadCh <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
return newServers, nil
|
||||
return nil
|
||||
}
|
||||
|
||||
// serverHealthLoop monitors the health of the servers in the cluster
|
||||
@ -246,7 +230,7 @@ func (s *Server) serverHealthLoop() {
|
||||
return
|
||||
case <-ticker.C:
|
||||
if err := s.updateClusterHealth(); err != nil {
|
||||
s.logger.Printf("[ERR] autopilot: error updating cluster health: %s", err)
|
||||
s.logger.Printf("[ERR] autopilot: Error updating cluster health: %s", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -330,7 +314,7 @@ func (s *Server) updateClusterHealth() error {
|
||||
health.Version = parts.Build.String()
|
||||
if stats, ok := fetchedStats[string(server.ID)]; ok {
|
||||
if err := s.updateServerHealth(&health, parts, stats, autopilotConf, targetLastIndex); err != nil {
|
||||
s.logger.Printf("[WARN] autopilot: error updating server health: %s", err)
|
||||
s.logger.Printf("[WARN] autopilot: Error updating server %s health: %s", fmtServer(server), err)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
|
@ -133,6 +133,85 @@ func TestAutopilot_CleanupDeadServerPeriodic(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
func TestAutopilot_RollingUpdate(t *testing.T) {
|
||||
t.Parallel()
|
||||
dir1, s1 := testServerWithConfig(t, func(c *Config) {
|
||||
c.Datacenter = "dc1"
|
||||
c.Bootstrap = true
|
||||
})
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
|
||||
conf := func(c *Config) {
|
||||
c.Datacenter = "dc1"
|
||||
c.Bootstrap = false
|
||||
}
|
||||
|
||||
dir2, s2 := testServerWithConfig(t, conf)
|
||||
defer os.RemoveAll(dir2)
|
||||
defer s2.Shutdown()
|
||||
|
||||
dir3, s3 := testServerWithConfig(t, conf)
|
||||
defer os.RemoveAll(dir3)
|
||||
defer s3.Shutdown()
|
||||
|
||||
// Join the servers to s1, and wait until they are all promoted to
|
||||
// voters.
|
||||
servers := []*Server{s1, s2, s3}
|
||||
for _, s := range servers[1:] {
|
||||
joinLAN(t, s, s1)
|
||||
}
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
r.Check(wantRaft(servers))
|
||||
for _, s := range servers {
|
||||
r.Check(wantPeers(s, 3))
|
||||
}
|
||||
})
|
||||
|
||||
// Add one more server like we are doing a rolling update.
|
||||
dir4, s4 := testServerWithConfig(t, conf)
|
||||
defer os.RemoveAll(dir4)
|
||||
defer s4.Shutdown()
|
||||
joinLAN(t, s1, s4)
|
||||
servers = append(servers, s4)
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
r.Check(wantRaft(servers))
|
||||
for _, s := range servers {
|
||||
r.Check(wantPeers(s, 3))
|
||||
}
|
||||
})
|
||||
|
||||
// Now kill one of the "old" nodes like we are doing a rolling update.
|
||||
s3.Shutdown()
|
||||
|
||||
isVoter := func() bool {
|
||||
future := s1.raft.GetConfiguration()
|
||||
if err := future.Error(); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
for _, s := range future.Configuration().Servers {
|
||||
if string(s.ID) == string(s4.config.NodeID) {
|
||||
return s.Suffrage == raft.Voter
|
||||
}
|
||||
}
|
||||
t.Fatalf("didn't find s4")
|
||||
return false
|
||||
}
|
||||
|
||||
// Wait for s4 to stabilize, get promoted to a voter, and for s3 to be
|
||||
// removed.
|
||||
servers = []*Server{s1, s2, s4}
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
r.Check(wantRaft(servers))
|
||||
for _, s := range servers {
|
||||
r.Check(wantPeers(s, 3))
|
||||
}
|
||||
if !isVoter() {
|
||||
r.Fatalf("should be a voter")
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestAutopilot_CleanupStaleRaftServer(t *testing.T) {
|
||||
t.Parallel()
|
||||
dir1, s1 := testServerDCBootstrap(t, "dc1", true)
|
||||
@ -196,6 +275,7 @@ func TestAutopilot_PromoteNonVoter(t *testing.T) {
|
||||
defer s1.Shutdown()
|
||||
codec := rpcClient(t, s1)
|
||||
defer codec.Close()
|
||||
testrpc.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
dir2, s2 := testServerWithConfig(t, func(c *Config) {
|
||||
c.Datacenter = "dc1"
|
||||
@ -206,10 +286,8 @@ func TestAutopilot_PromoteNonVoter(t *testing.T) {
|
||||
defer s2.Shutdown()
|
||||
joinLAN(t, s2, s1)
|
||||
|
||||
testrpc.WaitForLeader(t, s1.RPC, "dc1")
|
||||
// Wait for the new server to be added as a non-voter, but make sure
|
||||
// it doesn't get promoted to a voter even after ServerStabilizationTime,
|
||||
// because that would result in an even-numbered quorum count.
|
||||
// Make sure we see it as a nonvoter initially. We wait until half
|
||||
// the stabilization period has passed.
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
future := s1.raft.GetConfiguration()
|
||||
if err := future.Error(); err != nil {
|
||||
@ -217,7 +295,6 @@ func TestAutopilot_PromoteNonVoter(t *testing.T) {
|
||||
}
|
||||
|
||||
servers := future.Configuration().Servers
|
||||
|
||||
if len(servers) != 2 {
|
||||
r.Fatalf("bad: %v", servers)
|
||||
}
|
||||
@ -231,20 +308,12 @@ func TestAutopilot_PromoteNonVoter(t *testing.T) {
|
||||
if !health.Healthy {
|
||||
r.Fatalf("bad: %v", health)
|
||||
}
|
||||
if time.Since(health.StableSince) < s1.config.AutopilotConfig.ServerStabilizationTime {
|
||||
if time.Since(health.StableSince) < s1.config.AutopilotConfig.ServerStabilizationTime/2 {
|
||||
r.Fatal("stable period not elapsed")
|
||||
}
|
||||
})
|
||||
|
||||
// Now add another server and make sure they both get promoted to voters after stabilization
|
||||
dir3, s3 := testServerWithConfig(t, func(c *Config) {
|
||||
c.Datacenter = "dc1"
|
||||
c.Bootstrap = false
|
||||
c.RaftConfig.ProtocolVersion = 3
|
||||
})
|
||||
defer os.RemoveAll(dir3)
|
||||
defer s3.Shutdown()
|
||||
joinLAN(t, s3, s1)
|
||||
// Make sure it ends up as a voter.
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
future := s1.raft.GetConfiguration()
|
||||
if err := future.Error(); err != nil {
|
||||
@ -252,14 +321,11 @@ func TestAutopilot_PromoteNonVoter(t *testing.T) {
|
||||
}
|
||||
|
||||
servers := future.Configuration().Servers
|
||||
if len(servers) != 3 {
|
||||
if len(servers) != 2 {
|
||||
r.Fatalf("bad: %v", servers)
|
||||
}
|
||||
if servers[1].Suffrage != raft.Voter {
|
||||
r.Fatalf("bad: %v", servers)
|
||||
}
|
||||
if servers[2].Suffrage != raft.Voter {
|
||||
r.Fatalf("bad: %v", servers)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
@ -386,17 +386,22 @@ func TestServer_LeaveLeader(t *testing.T) {
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
|
||||
// Second server not in bootstrap mode
|
||||
dir2, s2 := testServerDCBootstrap(t, "dc1", false)
|
||||
defer os.RemoveAll(dir2)
|
||||
defer s2.Shutdown()
|
||||
|
||||
// Try to join
|
||||
joinLAN(t, s2, s1)
|
||||
dir3, s3 := testServerDCBootstrap(t, "dc1", false)
|
||||
defer os.RemoveAll(dir3)
|
||||
defer s3.Shutdown()
|
||||
|
||||
testrpc.WaitForLeader(t, s1.RPC, "dc1")
|
||||
testrpc.WaitForLeader(t, s2.RPC, "dc1")
|
||||
|
||||
joinLAN(t, s2, s1)
|
||||
joinLAN(t, s3, s1)
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
r.Check(wantPeers(s1, 3))
|
||||
r.Check(wantPeers(s2, 3))
|
||||
r.Check(wantPeers(s3, 3))
|
||||
})
|
||||
// Issue a leave to the leader
|
||||
var leader *Server
|
||||
switch {
|
||||
@ -404,6 +409,8 @@ func TestServer_LeaveLeader(t *testing.T) {
|
||||
leader = s1
|
||||
case s2.IsLeader():
|
||||
leader = s2
|
||||
case s3.IsLeader():
|
||||
leader = s3
|
||||
default:
|
||||
t.Fatal("no leader")
|
||||
}
|
||||
@ -413,8 +420,9 @@ func TestServer_LeaveLeader(t *testing.T) {
|
||||
|
||||
// Should lose a peer
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
r.Check(wantPeers(s1, 1))
|
||||
r.Check(wantPeers(s2, 1))
|
||||
r.Check(wantPeers(s1, 2))
|
||||
r.Check(wantPeers(s2, 2))
|
||||
r.Check(wantPeers(s3, 2))
|
||||
})
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user