diff --git a/consul/autopilot.go b/consul/autopilot.go index 1df9c95712..f775448bd8 100644 --- a/consul/autopilot.go +++ b/consul/autopilot.go @@ -11,6 +11,12 @@ import ( "github.com/hashicorp/serf/serf" ) +// AutopilotPolicy is the interface for the Autopilot mechanism +type AutopilotPolicy interface { + // PromoteNonVoters defines the handling of non-voting servers + PromoteNonVoters(*structs.AutopilotConfig) error +} + func (s *Server) startAutopilot() { s.autopilotShutdownCh = make(chan struct{}) @@ -48,8 +54,12 @@ func (s *Server) serverHealthLoop() { valid, parts := agent.IsConsulServer(member) if valid { - health := s.queryServerHealth(member, parts, autopilotConf) - serverHealths[parts.Addr.String()] = health + health, err := s.queryServerHealth(member, parts, autopilotConf) + if err != nil { + s.logger.Printf("[ERR] consul: error fetching server health: %s", err) + } else { + serverHealths[parts.Addr.String()] = health + } } } @@ -57,7 +67,7 @@ func (s *Server) serverHealthLoop() { s.autopilotHealth = serverHealths s.autopilotLock.Unlock() - if err := s.promoteNonVoters(autopilotConf); err != nil { + if err := s.autopilotPolicy.PromoteNonVoters(autopilotConf); err != nil { s.logger.Printf("[ERR] consul: error checking for non-voters to promote: %s", err) } } @@ -121,9 +131,15 @@ func (s *Server) pruneDeadServers() error { return nil } -// promoteNonVoters promotes eligible non-voting servers to voters. -func (s *Server) promoteNonVoters(autopilotConf *structs.AutopilotConfig) error { - minRaftProtocol, err := ServerMinRaftProtocol(s.LANMembers()) +// BasicAutopilot defines a policy for promoting non-voting servers in a way +// that maintains an odd-numbered voter count. +type BasicAutopilot struct { + server *Server +} + +// PromoteNonVoters promotes eligible non-voting servers to voters. +func (b *BasicAutopilot) PromoteNonVoters(autopilotConf *structs.AutopilotConfig) error { + minRaftProtocol, err := ServerMinRaftProtocol(b.server.LANMembers()) if err != nil { return fmt.Errorf("error getting server raft protocol versions: %s", err) } @@ -133,7 +149,7 @@ func (s *Server) promoteNonVoters(autopilotConf *structs.AutopilotConfig) error return nil } - future := s.raft.GetConfiguration() + future := b.server.raft.GetConfiguration() if err := future.Error(); err != nil { return fmt.Errorf("failed to get raft configuration: %v", err) } @@ -144,7 +160,7 @@ func (s *Server) promoteNonVoters(autopilotConf *structs.AutopilotConfig) error for _, server := range raftServers { // If this server has been stable and passing for long enough, promote it to a voter if server.Suffrage == raft.Nonvoter { - health := s.getServerHealth(string(server.Address)) + health := b.server.getServerHealth(string(server.Address)) if health != nil && health.Healthy && time.Now().Sub(health.StableSince) >= autopilotConf.ServerStabilizationTime { promotions = append(promotions, server) } @@ -162,7 +178,7 @@ func (s *Server) promoteNonVoters(autopilotConf *structs.AutopilotConfig) error // to get to an odd-sized quorum newServers := false if voterCount%2 == 0 { - addFuture := s.raft.AddVoter(promotions[0].ID, promotions[0].Address, 0, 0) + addFuture := b.server.raft.AddVoter(promotions[0].ID, promotions[0].Address, 0, 0) if err := addFuture.Error(); err != nil { return fmt.Errorf("failed to add raft peer: %v", err) } @@ -172,11 +188,11 @@ func (s *Server) promoteNonVoters(autopilotConf *structs.AutopilotConfig) error // Promote remaining servers in twos to maintain an odd quorum size for i := 0; i < len(promotions)-1; i += 2 { - addFirst := s.raft.AddVoter(promotions[i].ID, promotions[i].Address, 0, 0) + addFirst := b.server.raft.AddVoter(promotions[i].ID, promotions[i].Address, 0, 0) if err := addFirst.Error(); err != nil { return fmt.Errorf("failed to add raft peer: %v", err) } - addSecond := s.raft.AddVoter(promotions[i+1].ID, promotions[i+1].Address, 0, 0) + addSecond := b.server.raft.AddVoter(promotions[i+1].ID, promotions[i+1].Address, 0, 0) if err := addSecond.Error(); err != nil { return fmt.Errorf("failed to add raft peer: %v", err) } @@ -186,7 +202,7 @@ func (s *Server) promoteNonVoters(autopilotConf *structs.AutopilotConfig) error // If we added a new server, trigger a check to remove dead servers if newServers { select { - case s.autopilotRemoveDeadCh <- struct{}{}: + case b.server.autopilotRemoveDeadCh <- struct{}{}: default: } } @@ -196,10 +212,11 @@ func (s *Server) promoteNonVoters(autopilotConf *structs.AutopilotConfig) error // queryServerHealth fetches the raft stats for the given server and uses them // to update its ServerHealth -func (s *Server) queryServerHealth(member serf.Member, server *agent.Server, autopilotConf *structs.AutopilotConfig) *structs.ServerHealth { +func (s *Server) queryServerHealth(member serf.Member, server *agent.Server, + autopilotConf *structs.AutopilotConfig) (*structs.ServerHealth, error) { stats, err := s.getServerStats(server) if err != nil { - s.logger.Printf("[DEBUG] consul: error getting server's raft stats: %s", err) + return nil, fmt.Errorf("error getting raft stats: %s", err) } health := &structs.ServerHealth{ @@ -216,7 +233,7 @@ func (s *Server) queryServerHealth(member serf.Member, server *agent.Server, aut if health.LastContact != "never" { health.LastContactRaw, err = time.ParseDuration(health.LastContact) if err != nil { - s.logger.Printf("[DEBUG] consul: error parsing server's last_contact value: %s", err) + return nil, fmt.Errorf("error parsing last_contact duration: %s", err) } } @@ -236,7 +253,7 @@ func (s *Server) queryServerHealth(member serf.Member, server *agent.Server, aut health.StableSince = lastHealth.StableSince } - return health + return health, nil } func (s *Server) getServerHealth(addr string) *structs.ServerHealth { diff --git a/consul/server.go b/consul/server.go index f14d30053c..e46fedd9ff 100644 --- a/consul/server.go +++ b/consul/server.go @@ -76,12 +76,19 @@ type Server struct { // aclCache is the non-authoritative ACL cache. aclCache *aclCache - // autopilot - autopilotHealth map[string]*structs.ServerHealth - autopilotLock sync.RWMutex - autopilotShutdownCh chan struct{} + // autopilotHealth stores the current view of server healths. + autopilotHealth map[string]*structs.ServerHealth + autopilotLock sync.RWMutex + + // autopilotPolicy controls the behavior of Autopilot for certain tasks. + autopilotPolicy AutopilotPolicy + + // autopilotRemoveDeadCh is used to trigger a check for dead server removals. autopilotRemoveDeadCh chan struct{} + // autopilotShutdownCh is used to stop the Autopilot loop. + autopilotShutdownCh chan struct{} + // Consul configuration config *Config @@ -243,6 +250,7 @@ func NewServer(config *Config) (*Server, error) { tombstoneGC: gc, shutdownCh: make(chan struct{}), } + s.autopilotPolicy = &BasicAutopilot{s} // Initialize the authoritative ACL cache. s.aclAuthCache, err = acl.NewCache(aclCacheSize, s.aclLocalFault) diff --git a/consul/structs/operator.go b/consul/structs/operator.go index 1babb4972b..0ebeac8d3e 100644 --- a/consul/structs/operator.go +++ b/consul/structs/operator.go @@ -7,6 +7,7 @@ import ( "github.com/hashicorp/serf/serf" ) +// AutopilotConfig holds the Autopilot configuration for a cluster. type AutopilotConfig struct { // CleanupDeadServers controls whether to remove dead servers when a new // server is added to the Raft peers.