mirror of https://github.com/status-im/consul.git
More autopilot reorganizing
This commit is contained in:
parent
b92f895c23
commit
2310687c1d
|
@ -26,29 +26,30 @@ func (d *AutopilotDelegate) FetchStats(ctx context.Context, servers []serf.Membe
|
||||||
return d.server.statsFetcher.Fetch(ctx, servers)
|
return d.server.statsFetcher.Fetch(ctx, servers)
|
||||||
}
|
}
|
||||||
|
|
||||||
// IsServer inspects a serf member's tags and, when the member is tagged
// role=consul, builds the autopilot.ServerInfo describing it (name, "id" tag,
// TCP address from the member address plus the "port" tag, build version from
// metadata.Build, and serf status).
//
// This span is a side-by-side diff: each statement appears first in its
// pre-commit form and then in its post-commit form. Before the commit the
// method returned (false, nil) both for non-servers and for any parse failure;
// after it, non-servers yield (nil, nil), while an unparsable "port" tag or a
// failing metadata.Build call is returned to the caller as an error.
func (d *AutopilotDelegate) IsServer(m serf.Member) (bool, *autopilot.ServerInfo) {
|
func (d *AutopilotDelegate) IsServer(m serf.Member) (*autopilot.ServerInfo, error) {
|
||||||
if m.Tags["role"] != "consul" {
|
if m.Tags["role"] != "consul" {
|
||||||
return false, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||

|
||||||
port_str := m.Tags["port"]
|
port_str := m.Tags["port"]
|
||||||
port, err := strconv.Atoi(port_str)
|
port, err := strconv.Atoi(port_str)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, nil
|
return nil, err
|
||||||
}
|
}
|
||||||

|
||||||
build_version, err := metadata.Build(&m)
|
build_version, err := metadata.Build(&m)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, nil
|
return nil, err
|
||||||
}
|
}
|
||||||

|
||||||
return true, &autopilot.ServerInfo{
|
server := &autopilot.ServerInfo{
|
||||||
Name: m.Name,
|
Name: m.Name,
|
||||||
ID: m.Tags["id"],
|
ID: m.Tags["id"],
|
||||||
Addr: &net.TCPAddr{IP: m.Addr, Port: port},
|
Addr: &net.TCPAddr{IP: m.Addr, Port: port},
|
||||||
Build: *build_version,
|
Build: *build_version,
|
||||||
Status: m.Status,
|
Status: m.Status,
|
||||||
}
|
}
|
||||||
|
return server, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Heartbeat a metric for monitoring if we're the leader
|
// Heartbeat a metric for monitoring if we're the leader
|
||||||
|
|
|
@ -18,7 +18,7 @@ import (
|
||||||
type Delegate interface {
|
type Delegate interface {
|
||||||
AutopilotConfig() *Config
|
AutopilotConfig() *Config
|
||||||
FetchStats(context.Context, []serf.Member) map[string]*ServerStats
|
FetchStats(context.Context, []serf.Member) map[string]*ServerStats
|
||||||
IsServer(serf.Member) (bool, *ServerInfo)
|
IsServer(serf.Member) (*ServerInfo, error)
|
||||||
NotifyHealth(OperatorHealthReply)
|
NotifyHealth(OperatorHealthReply)
|
||||||
PromoteNonVoters(*Config, OperatorHealthReply) ([]raft.Server, error)
|
PromoteNonVoters(*Config, OperatorHealthReply) ([]raft.Server, error)
|
||||||
Raft() *raft.Raft
|
Raft() *raft.Raft
|
||||||
|
@ -87,36 +87,14 @@ func (a *Autopilot) run() {
|
||||||
case <-a.shutdownCh:
|
case <-a.shutdownCh:
|
||||||
return
|
return
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
autopilotConfig := a.delegate.AutopilotConfig()
|
if err := a.promoteServers(); err != nil {
|
||||||
if autopilotConfig == nil {
|
a.logger.Printf("[ERR] autopilot: %v", err)
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// Skip the non-voter promotions unless all servers support the new APIs
|
|
||||||
minRaftProtocol, err := a.MinRaftProtocol()
|
|
||||||
if err != nil {
|
|
||||||
a.logger.Printf("[ERR] autopilot: error getting server raft protocol versions: %s", err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if minRaftProtocol >= 3 {
|
|
||||||
promotions, err := a.delegate.PromoteNonVoters(autopilotConfig, a.GetClusterHealth())
|
|
||||||
if err != nil {
|
|
||||||
a.logger.Printf("[ERR] autopilot: Error checking for non-voters to promote: %s", err)
|
|
||||||
}
|
|
||||||
if err := a.handlePromotions(promotions); err != nil {
|
|
||||||
a.logger.Printf("[ERR] autopilot: Error handling promotions: %s", err)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := a.pruneDeadServers(); err != nil {
|
if err := a.pruneDeadServers(); err != nil {
|
||||||
a.logger.Printf("[ERR] autopilot: Error checking for dead servers to remove: %s", err)
|
a.logger.Printf("[ERR] autopilot: Error checking for dead servers to remove: %s", err)
|
||||||
}
|
}
|
||||||
case <-a.removeDeadCh:
|
case <-a.removeDeadCh:
|
||||||
autopilotConfig := a.delegate.AutopilotConfig()
|
|
||||||
if autopilotConfig == nil {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := a.pruneDeadServers(); err != nil {
|
if err := a.pruneDeadServers(); err != nil {
|
||||||
a.logger.Printf("[ERR] autopilot: Error checking for dead servers to remove: %s", err)
|
a.logger.Printf("[ERR] autopilot: Error checking for dead servers to remove: %s", err)
|
||||||
}
|
}
|
||||||
|
@ -124,6 +102,31 @@ func (a *Autopilot) run() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// promoteServers asks the delegate for any promotions and carries them out.
// It returns nil without doing anything when the delegate has no autopilot
// config, and skips the promotion step when MinRaftProtocol reports a version
// below 3. A failure from MinRaftProtocol, PromoteNonVoters or
// handlePromotions is returned as a new error whose message names the step
// that failed; the first failure ends the call.
|
||||||
|
func (a *Autopilot) promoteServers() error {
|
||||||
|
conf := a.delegate.AutopilotConfig()
|
||||||
|
if conf == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Skip the non-voter promotions unless all servers support the new APIs
|
||||||
|
minRaftProtocol, err := a.MinRaftProtocol()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("error getting server raft protocol versions: %s", err)
|
||||||
|
}
|
||||||
|
if minRaftProtocol >= 3 {
|
||||||
|
promotions, err := a.delegate.PromoteNonVoters(conf, a.GetClusterHealth())
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("error checking for non-voters to promote: %s", err)
|
||||||
|
}
|
||||||
|
if err := a.handlePromotions(promotions); err != nil {
|
||||||
|
return fmt.Errorf("error handling promotions: %s", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// fmtServer prints info about a server in a standard way for logging.
|
// fmtServer prints info about a server in a standard way for logging.
|
||||||
func fmtServer(server raft.Server) string {
|
func fmtServer(server raft.Server) string {
|
||||||
return fmt.Sprintf("Server (ID: %q Address: %q)", server.ID, server.Address)
|
return fmt.Sprintf("Server (ID: %q Address: %q)", server.ID, server.Address)
|
||||||
|
@ -133,7 +136,7 @@ func fmtServer(server raft.Server) string {
|
||||||
func NumPeers(raftConfig raft.Configuration) int {
|
func NumPeers(raftConfig raft.Configuration) int {
|
||||||
var numPeers int
|
var numPeers int
|
||||||
for _, server := range raftConfig.Servers {
|
for _, server := range raftConfig.Servers {
|
||||||
if isVoter(server.Suffrage) {
|
if server.Suffrage == raft.Voter {
|
||||||
numPeers++
|
numPeers++
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -159,11 +162,15 @@ func (a *Autopilot) pruneDeadServers() error {
|
||||||
|
|
||||||
serfLAN := a.delegate.Serf()
|
serfLAN := a.delegate.Serf()
|
||||||
for _, member := range serfLAN.Members() {
|
for _, member := range serfLAN.Members() {
|
||||||
valid, parts := a.delegate.IsServer(member)
|
server, err := a.delegate.IsServer(member)
|
||||||
if valid {
|
if err != nil {
|
||||||
|
a.logger.Printf("[INFO] autopilot: Error parsing server info for %q: %s", member.Name, err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if server != nil {
|
||||||
// todo(kyhavlov): change this to index by UUID
|
// todo(kyhavlov): change this to index by UUID
|
||||||
if _, ok := staleRaftServers[parts.Addr.String()]; ok {
|
if _, ok := staleRaftServers[server.Addr.String()]; ok {
|
||||||
delete(staleRaftServers, parts.Addr.String())
|
delete(staleRaftServers, server.Addr.String())
|
||||||
}
|
}
|
||||||
|
|
||||||
if member.Status == serf.StatusFailed {
|
if member.Status == serf.StatusFailed {
|
||||||
|
@ -218,7 +225,11 @@ func (a *Autopilot) MinRaftProtocol() (int, error) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
if ok, _ := a.delegate.IsServer(m); !ok {
|
server, err := a.delegate.IsServer(m)
|
||||||
|
if err != nil {
|
||||||
|
return -1, err
|
||||||
|
}
|
||||||
|
if server == nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -316,9 +327,13 @@ func (a *Autopilot) updateClusterHealth() error {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
valid, parts := a.delegate.IsServer(member)
|
server, err := a.delegate.IsServer(member)
|
||||||
if valid {
|
if err != nil {
|
||||||
serverMap[parts.ID] = parts
|
a.logger.Printf("[INFO] autopilot: Error parsing server info for %q: %s", member.Name, err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if server != nil {
|
||||||
|
serverMap[server.ID] = server
|
||||||
serverMembers = append(serverMembers, member)
|
serverMembers = append(serverMembers, member)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -450,7 +465,7 @@ func (a *Autopilot) GetServerHealth(id string) *ServerHealth {
|
||||||
return a.clusterHealth.ServerHealth(id)
|
return a.clusterHealth.ServerHealth(id)
|
||||||
}
|
}
|
||||||
|
|
||||||
func isVoter(suffrage raft.ServerSuffrage) bool {
|
func isPotentialVoter(suffrage raft.ServerSuffrage) bool {
|
||||||
switch suffrage {
|
switch suffrage {
|
||||||
case raft.Voter, raft.Staging:
|
case raft.Voter, raft.Staging:
|
||||||
return true
|
return true
|
||||||
|
|
|
@ -14,7 +14,7 @@ func PromoteStableServers(autopilotConfig *Config, health OperatorHealthReply, s
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
var promotions []raft.Server
|
var promotions []raft.Server
|
||||||
for _, server := range servers {
|
for _, server := range servers {
|
||||||
if !isVoter(server.Suffrage) {
|
if !isPotentialVoter(server.Suffrage) {
|
||||||
health := health.ServerHealth(string(server.ID))
|
health := health.ServerHealth(string(server.ID))
|
||||||
if health.IsStable(now, autopilotConfig) {
|
if health.IsStable(now, autopilotConfig) {
|
||||||
promotions = append(promotions, server)
|
promotions = append(promotions, server)
|
||||||
|
|
Loading…
Reference in New Issue