mirror of
https://github.com/status-im/consul.git
synced 2025-01-09 13:26:07 +00:00
Reorganized cluster health check loop and logic
This commit is contained in:
parent
66ef4a006b
commit
5353221666
@ -112,6 +112,9 @@ type ServerHealth struct {
|
||||
// Name is the node name of the server.
|
||||
Name string
|
||||
|
||||
// Address is the address of the server.
|
||||
Address string
|
||||
|
||||
// The status of the SerfHealth check for the server.
|
||||
SerfStatus string
|
||||
|
||||
|
@ -298,6 +298,7 @@ func (s *HTTPServer) OperatorServerHealth(resp http.ResponseWriter, req *http.Re
|
||||
out.Servers = append(out.Servers, api.ServerHealth{
|
||||
ID: server.ID,
|
||||
Name: server.Name,
|
||||
Address: server.Address,
|
||||
SerfStatus: server.SerfStatus.String(),
|
||||
LastContact: api.NewReadableDuration(server.LastContact),
|
||||
LastTerm: server.LastTerm,
|
||||
|
@ -199,137 +199,137 @@ func (s *Server) serverHealthLoop() {
|
||||
case <-s.shutdownCh:
|
||||
return
|
||||
case <-ticker.C:
|
||||
// Don't do anything if the min Raft version is too low
|
||||
minRaftProtocol, err := ServerMinRaftProtocol(s.LANMembers())
|
||||
if err != nil {
|
||||
s.logger.Printf("[ERR] consul: error getting server raft protocol versions: %s", err)
|
||||
break
|
||||
}
|
||||
if minRaftProtocol < 3 {
|
||||
break
|
||||
}
|
||||
|
||||
state := s.fsm.State()
|
||||
_, autopilotConf, err := state.AutopilotConfig()
|
||||
if err != nil {
|
||||
s.logger.Printf("[ERR] consul: error retrieving autopilot config: %s", err)
|
||||
break
|
||||
}
|
||||
// Bail early if autopilot config hasn't been initialized yet
|
||||
if autopilotConf == nil {
|
||||
break
|
||||
}
|
||||
|
||||
// Get the the serf members which are Consul servers
|
||||
serverMap := make(map[string]serf.Member)
|
||||
for _, member := range s.LANMembers() {
|
||||
if member.Status == serf.StatusLeft {
|
||||
continue
|
||||
}
|
||||
|
||||
valid, parts := agent.IsConsulServer(member)
|
||||
if valid {
|
||||
serverMap[parts.ID] = member
|
||||
}
|
||||
}
|
||||
|
||||
future := s.raft.GetConfiguration()
|
||||
if err := future.Error(); err != nil {
|
||||
s.logger.Printf("[ERR] consul: error getting Raft configuration %s", err)
|
||||
break
|
||||
}
|
||||
|
||||
// Build a current list of server healths
|
||||
var clusterHealth structs.OperatorHealthReply
|
||||
servers := future.Configuration().Servers
|
||||
healthyCount := 0
|
||||
voterCount := 0
|
||||
for _, server := range servers {
|
||||
member, ok := serverMap[string(server.ID)]
|
||||
if !ok {
|
||||
s.logger.Printf("[DEBUG] consul: couldn't find serf member for server with ID %q", server.ID)
|
||||
continue
|
||||
}
|
||||
|
||||
health, err := s.queryServerHealth(member, autopilotConf)
|
||||
if err != nil {
|
||||
s.logger.Printf("[ERR] consul: error fetching server health: %s", err)
|
||||
clusterHealth.Servers = append(clusterHealth.Servers, structs.ServerHealth{
|
||||
ID: string(server.ID),
|
||||
Name: member.Name,
|
||||
SerfStatus: serf.StatusFailed,
|
||||
})
|
||||
continue
|
||||
}
|
||||
|
||||
if health.Healthy {
|
||||
healthyCount++
|
||||
}
|
||||
|
||||
if server.Suffrage != raft.Nonvoter {
|
||||
health.Voter = true
|
||||
voterCount++
|
||||
}
|
||||
clusterHealth.Servers = append(clusterHealth.Servers, *health)
|
||||
}
|
||||
clusterHealth.Healthy = healthyCount == len(servers)
|
||||
|
||||
// If we have extra healthy voters, update FailureTolerance
|
||||
if voterCount > len(servers)/2+1 {
|
||||
clusterHealth.FailureTolerance = voterCount - (len(servers)/2 + 1)
|
||||
}
|
||||
|
||||
// Heartbeat a metric for monitoring if we're the leader
|
||||
if s.IsLeader() {
|
||||
metrics.SetGauge([]string{"consul", "autopilot", "failure_tolerance"}, float32(clusterHealth.FailureTolerance))
|
||||
if clusterHealth.Healthy {
|
||||
metrics.SetGauge([]string{"consul", "autopilot", "healthy"}, 1)
|
||||
} else {
|
||||
metrics.SetGauge([]string{"consul", "autopilot", "healthy"}, 0)
|
||||
}
|
||||
}
|
||||
|
||||
s.clusterHealthLock.Lock()
|
||||
s.clusterHealth = clusterHealth
|
||||
s.clusterHealthLock.Unlock()
|
||||
s.updateClusterHealth()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// queryServerHealth fetches the raft stats for the given server and uses them
|
||||
// updateClusterHealth fetches the Raft stats of the other servers and updates
|
||||
// s.clusterHealth based on the configured Autopilot thresholds
|
||||
func (s *Server) updateClusterHealth() error {
|
||||
// Don't do anything if the min Raft version is too low
|
||||
minRaftProtocol, err := ServerMinRaftProtocol(s.LANMembers())
|
||||
if err != nil {
|
||||
return fmt.Errorf("error getting server raft protocol versions: %s", err)
|
||||
}
|
||||
if minRaftProtocol < 3 {
|
||||
return nil
|
||||
}
|
||||
|
||||
state := s.fsm.State()
|
||||
_, autopilotConf, err := state.AutopilotConfig()
|
||||
if err != nil {
|
||||
return fmt.Errorf("error retrieving autopilot config: %s", err)
|
||||
}
|
||||
// Bail early if autopilot config hasn't been initialized yet
|
||||
if autopilotConf == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Get the the serf members which are Consul servers
|
||||
serverMap := make(map[string]serf.Member)
|
||||
for _, member := range s.LANMembers() {
|
||||
if member.Status == serf.StatusLeft {
|
||||
continue
|
||||
}
|
||||
|
||||
valid, parts := agent.IsConsulServer(member)
|
||||
if valid {
|
||||
serverMap[parts.ID] = member
|
||||
}
|
||||
}
|
||||
|
||||
future := s.raft.GetConfiguration()
|
||||
if err := future.Error(); err != nil {
|
||||
return fmt.Errorf("error getting Raft configuration %s", err)
|
||||
}
|
||||
|
||||
// Build a current list of server healths
|
||||
var clusterHealth structs.OperatorHealthReply
|
||||
servers := future.Configuration().Servers
|
||||
healthyCount := 0
|
||||
voterCount := 0
|
||||
for _, server := range servers {
|
||||
health := structs.ServerHealth{
|
||||
ID: string(server.ID),
|
||||
Address: string(server.Address),
|
||||
LastContact: -1,
|
||||
Voter: server.Suffrage != raft.Nonvoter,
|
||||
}
|
||||
|
||||
// Set LastContact to 0 for the leader
|
||||
if s.raft.Leader() == server.Address {
|
||||
health.LastContact = 0
|
||||
}
|
||||
|
||||
member, ok := serverMap[string(server.ID)]
|
||||
if ok {
|
||||
health.Name = member.Name
|
||||
health.SerfStatus = member.Status
|
||||
if err := s.updateServerHealth(&health, member, autopilotConf); err != nil {
|
||||
s.logger.Printf("[ERR] consul: error getting server health: %s", err)
|
||||
}
|
||||
} else {
|
||||
health.SerfStatus = serf.StatusNone
|
||||
}
|
||||
|
||||
if health.Healthy {
|
||||
healthyCount++
|
||||
}
|
||||
|
||||
if health.Voter {
|
||||
voterCount++
|
||||
}
|
||||
|
||||
clusterHealth.Servers = append(clusterHealth.Servers, health)
|
||||
}
|
||||
clusterHealth.Healthy = healthyCount == len(servers)
|
||||
|
||||
// If we have extra healthy voters, update FailureTolerance
|
||||
if voterCount > len(servers)/2+1 {
|
||||
clusterHealth.FailureTolerance = voterCount - (len(servers)/2 + 1)
|
||||
}
|
||||
|
||||
// Heartbeat a metric for monitoring if we're the leader
|
||||
if s.IsLeader() {
|
||||
metrics.SetGauge([]string{"consul", "autopilot", "failure_tolerance"}, float32(clusterHealth.FailureTolerance))
|
||||
if clusterHealth.Healthy {
|
||||
metrics.SetGauge([]string{"consul", "autopilot", "healthy"}, 1)
|
||||
} else {
|
||||
metrics.SetGauge([]string{"consul", "autopilot", "healthy"}, 0)
|
||||
}
|
||||
}
|
||||
|
||||
s.clusterHealthLock.Lock()
|
||||
s.clusterHealth = clusterHealth
|
||||
s.clusterHealthLock.Unlock()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// updateServerHealth fetches the raft stats for the given server and uses them
|
||||
// to update its ServerHealth
|
||||
func (s *Server) queryServerHealth(member serf.Member, autopilotConf *structs.AutopilotConfig) (*structs.ServerHealth, error) {
|
||||
func (s *Server) updateServerHealth(health *structs.ServerHealth, member serf.Member, autopilotConf *structs.AutopilotConfig) error {
|
||||
_, server := agent.IsConsulServer(member)
|
||||
|
||||
stats, err := s.getServerStats(server)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error getting raft stats: %s", err)
|
||||
return fmt.Errorf("error getting raft stats: %s", err)
|
||||
}
|
||||
|
||||
health := &structs.ServerHealth{
|
||||
ID: server.ID,
|
||||
Name: server.Name,
|
||||
SerfStatus: member.Status,
|
||||
LastContact: -1,
|
||||
LastTerm: stats.LastTerm,
|
||||
LastIndex: stats.LastIndex,
|
||||
}
|
||||
health.LastTerm = stats.LastTerm
|
||||
health.LastIndex = stats.LastIndex
|
||||
|
||||
if stats.LastContact != "never" {
|
||||
health.LastContact, err = time.ParseDuration(stats.LastContact)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error parsing last_contact duration: %s", err)
|
||||
return fmt.Errorf("error parsing last_contact duration: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Set LastContact to 0 for the leader
|
||||
if s.config.NodeName == member.Name {
|
||||
health.LastContact = 0
|
||||
}
|
||||
|
||||
lastTerm, err := strconv.ParseUint(s.raft.Stats()["last_log_term"], 10, 64)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error parsing last_log_term: %s", err)
|
||||
return fmt.Errorf("error parsing last_log_term: %s", err)
|
||||
}
|
||||
health.Healthy = health.IsHealthy(lastTerm, s.raft.LastIndex(), autopilotConf)
|
||||
|
||||
@ -341,7 +341,7 @@ func (s *Server) queryServerHealth(member serf.Member, autopilotConf *structs.Au
|
||||
health.StableSince = lastHealth.StableSince
|
||||
}
|
||||
|
||||
return health, nil
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Server) getClusterHealth() structs.OperatorHealthReply {
|
||||
|
@ -874,4 +874,4 @@ func TestLeader_ChangeServerID(t *testing.T) {
|
||||
t.Fatalf("should have 3 peers")
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -479,14 +479,15 @@ func TestOperator_ServerHealth(t *testing.T) {
|
||||
if len(reply.Servers) != 3 {
|
||||
return false, fmt.Errorf("bad: %v", reply)
|
||||
}
|
||||
if reply.Servers[0].LastContact != 0 {
|
||||
return false, fmt.Errorf("bad: %v", reply)
|
||||
}
|
||||
if reply.Servers[1].LastContact <= 0 {
|
||||
return false, fmt.Errorf("bad: %v", reply)
|
||||
}
|
||||
if reply.Servers[2].LastContact <= 0 {
|
||||
return false, fmt.Errorf("bad: %v", reply)
|
||||
// Leader should have LastContact == 0, others should be positive
|
||||
for _, s := range reply.Servers {
|
||||
isLeader := s1.raft.Leader() == raft.ServerAddress(s.Address)
|
||||
if isLeader && s.LastContact != 0 {
|
||||
return false, fmt.Errorf("bad: %v", reply)
|
||||
}
|
||||
if !isLeader && s.LastContact <= 0 {
|
||||
return false, fmt.Errorf("bad: %v", reply)
|
||||
}
|
||||
}
|
||||
return true, nil
|
||||
}, func(err error) {
|
||||
|
@ -111,6 +111,9 @@ type ServerHealth struct {
|
||||
// Name is the node name of the server.
|
||||
Name string
|
||||
|
||||
// Address is the address of the server.
|
||||
Address string
|
||||
|
||||
// The status of the SerfHealth check for the server.
|
||||
SerfStatus serf.MemberStatus
|
||||
|
||||
|
@ -360,6 +360,7 @@ A JSON body is returned that looks like this:
|
||||
{
|
||||
"ID": "e349749b-3303-3ddf-959c-b5885a0e1f6e",
|
||||
"Name": "node1",
|
||||
"Address": "127.0.0.1:8300",
|
||||
"SerfStatus": "alive",
|
||||
"LastContact": "0s",
|
||||
"LastTerm": 2,
|
||||
@ -371,6 +372,7 @@ A JSON body is returned that looks like this:
|
||||
{
|
||||
"ID": "e36ee410-cc3c-0a0c-c724-63817ab30303",
|
||||
"Name": "node2",
|
||||
"Address": "127.0.0.1:8205",
|
||||
"SerfStatus": "alive",
|
||||
"LastContact": "27.291304ms",
|
||||
"LastTerm": 2,
|
||||
@ -394,6 +396,8 @@ The `Servers` list holds detailed health information on each server:
|
||||
|
||||
- `Name` is the node name of the server.
|
||||
|
||||
- `Address` is the address of the server.
|
||||
|
||||
- `SerfStatus` is the SerfHealth check status for the server.
|
||||
|
||||
- `LastContact` is the time elapsed since this server's last contact with the leader.
|
||||
@ -404,4 +408,6 @@ The `Servers` list holds detailed health information on each server:
|
||||
|
||||
- `Healthy` is whether the server is healthy according to the current Autopilot configuration.
|
||||
|
||||
- `Voter` is whether the server is a voting member of the Raft cluster.
|
||||
|
||||
- `StableSince` is the time this server has been in its current `Healthy` state.
|
Loading…
x
Reference in New Issue
Block a user