mirror of https://github.com/status-im/consul.git
Add AutopilotPolicy interface and BasicAutopilot
This commit is contained in:
parent
c3d638e2c5
commit
ab0e412db4
|
@ -11,6 +11,12 @@ import (
|
|||
"github.com/hashicorp/serf/serf"
|
||||
)
|
||||
|
||||
// AutopilotPolicy is the interface for the Autopilot mechanism
|
||||
type AutopilotPolicy interface {
|
||||
// PromoteNonVoters defines the handling of non-voting servers
|
||||
PromoteNonVoters(*structs.AutopilotConfig) error
|
||||
}
|
||||
|
||||
func (s *Server) startAutopilot() {
|
||||
s.autopilotShutdownCh = make(chan struct{})
|
||||
|
||||
|
@ -48,16 +54,20 @@ func (s *Server) serverHealthLoop() {
|
|||
|
||||
valid, parts := agent.IsConsulServer(member)
|
||||
if valid {
|
||||
health := s.queryServerHealth(member, parts, autopilotConf)
|
||||
health, err := s.queryServerHealth(member, parts, autopilotConf)
|
||||
if err != nil {
|
||||
s.logger.Printf("[ERR] consul: error fetching server health: %s", err)
|
||||
} else {
|
||||
serverHealths[parts.Addr.String()] = health
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
s.autopilotLock.Lock()
|
||||
s.autopilotHealth = serverHealths
|
||||
s.autopilotLock.Unlock()
|
||||
|
||||
if err := s.promoteNonVoters(autopilotConf); err != nil {
|
||||
if err := s.autopilotPolicy.PromoteNonVoters(autopilotConf); err != nil {
|
||||
s.logger.Printf("[ERR] consul: error checking for non-voters to promote: %s", err)
|
||||
}
|
||||
}
|
||||
|
@ -121,9 +131,15 @@ func (s *Server) pruneDeadServers() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// promoteNonVoters promotes eligible non-voting servers to voters.
|
||||
func (s *Server) promoteNonVoters(autopilotConf *structs.AutopilotConfig) error {
|
||||
minRaftProtocol, err := ServerMinRaftProtocol(s.LANMembers())
|
||||
// BasicAutopilot defines a policy for promoting non-voting servers in a way
|
||||
// that maintains an odd-numbered voter count.
|
||||
type BasicAutopilot struct {
|
||||
server *Server
|
||||
}
|
||||
|
||||
// PromoteNonVoters promotes eligible non-voting servers to voters.
|
||||
func (b *BasicAutopilot) PromoteNonVoters(autopilotConf *structs.AutopilotConfig) error {
|
||||
minRaftProtocol, err := ServerMinRaftProtocol(b.server.LANMembers())
|
||||
if err != nil {
|
||||
return fmt.Errorf("error getting server raft protocol versions: %s", err)
|
||||
}
|
||||
|
@ -133,7 +149,7 @@ func (s *Server) promoteNonVoters(autopilotConf *structs.AutopilotConfig) error
|
|||
return nil
|
||||
}
|
||||
|
||||
future := s.raft.GetConfiguration()
|
||||
future := b.server.raft.GetConfiguration()
|
||||
if err := future.Error(); err != nil {
|
||||
return fmt.Errorf("failed to get raft configuration: %v", err)
|
||||
}
|
||||
|
@ -144,7 +160,7 @@ func (s *Server) promoteNonVoters(autopilotConf *structs.AutopilotConfig) error
|
|||
for _, server := range raftServers {
|
||||
// If this server has been stable and passing for long enough, promote it to a voter
|
||||
if server.Suffrage == raft.Nonvoter {
|
||||
health := s.getServerHealth(string(server.Address))
|
||||
health := b.server.getServerHealth(string(server.Address))
|
||||
if health != nil && health.Healthy && time.Now().Sub(health.StableSince) >= autopilotConf.ServerStabilizationTime {
|
||||
promotions = append(promotions, server)
|
||||
}
|
||||
|
@ -162,7 +178,7 @@ func (s *Server) promoteNonVoters(autopilotConf *structs.AutopilotConfig) error
|
|||
// to get to an odd-sized quorum
|
||||
newServers := false
|
||||
if voterCount%2 == 0 {
|
||||
addFuture := s.raft.AddVoter(promotions[0].ID, promotions[0].Address, 0, 0)
|
||||
addFuture := b.server.raft.AddVoter(promotions[0].ID, promotions[0].Address, 0, 0)
|
||||
if err := addFuture.Error(); err != nil {
|
||||
return fmt.Errorf("failed to add raft peer: %v", err)
|
||||
}
|
||||
|
@ -172,11 +188,11 @@ func (s *Server) promoteNonVoters(autopilotConf *structs.AutopilotConfig) error
|
|||
|
||||
// Promote remaining servers in twos to maintain an odd quorum size
|
||||
for i := 0; i < len(promotions)-1; i += 2 {
|
||||
addFirst := s.raft.AddVoter(promotions[i].ID, promotions[i].Address, 0, 0)
|
||||
addFirst := b.server.raft.AddVoter(promotions[i].ID, promotions[i].Address, 0, 0)
|
||||
if err := addFirst.Error(); err != nil {
|
||||
return fmt.Errorf("failed to add raft peer: %v", err)
|
||||
}
|
||||
addSecond := s.raft.AddVoter(promotions[i+1].ID, promotions[i+1].Address, 0, 0)
|
||||
addSecond := b.server.raft.AddVoter(promotions[i+1].ID, promotions[i+1].Address, 0, 0)
|
||||
if err := addSecond.Error(); err != nil {
|
||||
return fmt.Errorf("failed to add raft peer: %v", err)
|
||||
}
|
||||
|
@ -186,7 +202,7 @@ func (s *Server) promoteNonVoters(autopilotConf *structs.AutopilotConfig) error
|
|||
// If we added a new server, trigger a check to remove dead servers
|
||||
if newServers {
|
||||
select {
|
||||
case s.autopilotRemoveDeadCh <- struct{}{}:
|
||||
case b.server.autopilotRemoveDeadCh <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
@ -196,10 +212,11 @@ func (s *Server) promoteNonVoters(autopilotConf *structs.AutopilotConfig) error
|
|||
|
||||
// queryServerHealth fetches the raft stats for the given server and uses them
|
||||
// to update its ServerHealth
|
||||
func (s *Server) queryServerHealth(member serf.Member, server *agent.Server, autopilotConf *structs.AutopilotConfig) *structs.ServerHealth {
|
||||
func (s *Server) queryServerHealth(member serf.Member, server *agent.Server,
|
||||
autopilotConf *structs.AutopilotConfig) (*structs.ServerHealth, error) {
|
||||
stats, err := s.getServerStats(server)
|
||||
if err != nil {
|
||||
s.logger.Printf("[DEBUG] consul: error getting server's raft stats: %s", err)
|
||||
return nil, fmt.Errorf("error getting raft stats: %s", err)
|
||||
}
|
||||
|
||||
health := &structs.ServerHealth{
|
||||
|
@ -216,7 +233,7 @@ func (s *Server) queryServerHealth(member serf.Member, server *agent.Server, aut
|
|||
if health.LastContact != "never" {
|
||||
health.LastContactRaw, err = time.ParseDuration(health.LastContact)
|
||||
if err != nil {
|
||||
s.logger.Printf("[DEBUG] consul: error parsing server's last_contact value: %s", err)
|
||||
return nil, fmt.Errorf("error parsing last_contact duration: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -236,7 +253,7 @@ func (s *Server) queryServerHealth(member serf.Member, server *agent.Server, aut
|
|||
health.StableSince = lastHealth.StableSince
|
||||
}
|
||||
|
||||
return health
|
||||
return health, nil
|
||||
}
|
||||
|
||||
func (s *Server) getServerHealth(addr string) *structs.ServerHealth {
|
||||
|
|
|
@ -76,12 +76,19 @@ type Server struct {
|
|||
// aclCache is the non-authoritative ACL cache.
|
||||
aclCache *aclCache
|
||||
|
||||
// autopilot
|
||||
// autopilotHealth stores the current view of server healths.
|
||||
autopilotHealth map[string]*structs.ServerHealth
|
||||
autopilotLock sync.RWMutex
|
||||
autopilotShutdownCh chan struct{}
|
||||
|
||||
// autopilotPolicy controls the behavior of Autopilot for certain tasks.
|
||||
autopilotPolicy AutopilotPolicy
|
||||
|
||||
// autopilotRemoveDeadCh is used to trigger a check for dead server removals.
|
||||
autopilotRemoveDeadCh chan struct{}
|
||||
|
||||
// autopilotShutdownCh is used to stop the Autopilot loop.
|
||||
autopilotShutdownCh chan struct{}
|
||||
|
||||
// Consul configuration
|
||||
config *Config
|
||||
|
||||
|
@ -243,6 +250,7 @@ func NewServer(config *Config) (*Server, error) {
|
|||
tombstoneGC: gc,
|
||||
shutdownCh: make(chan struct{}),
|
||||
}
|
||||
s.autopilotPolicy = &BasicAutopilot{s}
|
||||
|
||||
// Initialize the authoritative ACL cache.
|
||||
s.aclAuthCache, err = acl.NewCache(aclCacheSize, s.aclLocalFault)
|
||||
|
|
|
@ -7,6 +7,7 @@ import (
|
|||
"github.com/hashicorp/serf/serf"
|
||||
)
|
||||
|
||||
// AutopilotConfig holds the Autopilot configuration for a cluster.
|
||||
type AutopilotConfig struct {
|
||||
// CleanupDeadServers controls whether to remove dead servers when a new
|
||||
// server is added to the Raft peers.
|
||||
|
|
Loading…
Reference in New Issue