Fix an issue with changing server IDs and add a few UX enhancements around autopilot features

This commit is contained in:
Kyle Havlovitz 2017-03-15 16:09:55 -07:00
parent 144a5e5340
commit 51b11cd344
No known key found for this signature in database
GPG Key ID: 8A5E6B173056AD6C
12 changed files with 161 additions and 86 deletions

View File

@ -128,6 +128,9 @@ type ServerHealth struct {
// Autopilot config. // Autopilot config.
Healthy bool Healthy bool
// Voter is whether this is a voting server.
Voter bool
// StableSince is the last time this server's Healthy value changed. // StableSince is the last time this server's Healthy value changed.
StableSince time.Time StableSince time.Time
} }

View File

@ -303,6 +303,7 @@ func (s *HTTPServer) OperatorServerHealth(resp http.ResponseWriter, req *http.Re
LastTerm: server.LastTerm, LastTerm: server.LastTerm,
LastIndex: server.LastIndex, LastIndex: server.LastIndex,
Healthy: server.Healthy, Healthy: server.Healthy,
Voter: server.Voter,
StableSince: server.StableSince.Round(time.Second).UTC(), StableSince: server.StableSince.Round(time.Second).UTC(),
}) })
} }

View File

@ -6,6 +6,7 @@ import (
"sync" "sync"
"time" "time"
"github.com/armon/go-metrics"
"github.com/hashicorp/consul/consul/agent" "github.com/hashicorp/consul/consul/agent"
"github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/raft" "github.com/hashicorp/raft"
@ -102,7 +103,7 @@ func (s *Server) pruneDeadServers() error {
go s.serfLAN.RemoveFailedNode(server) go s.serfLAN.RemoveFailedNode(server)
} }
} else { } else {
s.logger.Printf("[ERR] consul: Failed to remove dead servers: too many dead servers: %d/%d", len(failed), peers) s.logger.Printf("[DEBUG] consul: Failed to remove dead servers: too many dead servers: %d/%d", len(failed), peers)
} }
return nil return nil
@ -198,8 +199,6 @@ func (s *Server) serverHealthLoop() {
case <-s.shutdownCh: case <-s.shutdownCh:
return return
case <-ticker.C: case <-ticker.C:
serverHealths := make(map[string]*structs.ServerHealth)
// Don't do anything if the min Raft version is too low // Don't do anything if the min Raft version is too low
minRaftProtocol, err := ServerMinRaftProtocol(s.LANMembers()) minRaftProtocol, err := ServerMinRaftProtocol(s.LANMembers())
if err != nil { if err != nil {
@ -221,7 +220,8 @@ func (s *Server) serverHealthLoop() {
break break
} }
// Build an updated map of server healths // Get the the serf members which are Consul servers
serverMap := make(map[string]serf.Member)
for _, member := range s.LANMembers() { for _, member := range s.LANMembers() {
if member.Status == serf.StatusLeft { if member.Status == serf.StatusLeft {
continue continue
@ -229,31 +229,78 @@ func (s *Server) serverHealthLoop() {
valid, parts := agent.IsConsulServer(member) valid, parts := agent.IsConsulServer(member)
if valid { if valid {
health, err := s.queryServerHealth(member, parts, autopilotConf) serverMap[parts.ID] = member
if err != nil {
s.logger.Printf("[ERR] consul: error fetching server health: %s", err)
serverHealths[parts.ID] = &structs.ServerHealth{
ID: parts.ID,
Name: parts.Name,
Healthy: false,
}
} else {
serverHealths[parts.ID] = health
}
} }
} }
s.serverHealthLock.Lock() future := s.raft.GetConfiguration()
s.serverHealths = serverHealths if err := future.Error(); err != nil {
s.serverHealthLock.Unlock() s.logger.Printf("[ERR] consul: error getting Raft configuration %s", err)
break
}
// Build a current list of server healths
var clusterHealth structs.OperatorHealthReply
servers := future.Configuration().Servers
healthyCount := 0
voterCount := 0
for _, server := range servers {
member, ok := serverMap[string(server.ID)]
if !ok {
s.logger.Printf("[DEBUG] consul: couldn't find serf member for server with ID %q", server.ID)
continue
}
health, err := s.queryServerHealth(member, autopilotConf)
if err != nil {
s.logger.Printf("[ERR] consul: error fetching server health: %s", err)
clusterHealth.Servers = append(clusterHealth.Servers, structs.ServerHealth{
ID: string(server.ID),
Name: member.Name,
SerfStatus: serf.StatusFailed,
})
continue
}
if health.Healthy {
healthyCount++
}
if server.Suffrage != raft.Nonvoter {
health.Voter = true
voterCount++
}
clusterHealth.Servers = append(clusterHealth.Servers, *health)
}
clusterHealth.Healthy = healthyCount == len(servers)
// If we have extra healthy voters, update FailureTolerance
if voterCount > len(servers)/2+1 {
clusterHealth.FailureTolerance = voterCount - (len(servers)/2 + 1)
}
// Heartbeat a metric for monitoring if we're the leader
if s.IsLeader() {
metrics.SetGauge([]string{"consul", "autopilot", "failure_tolerance"}, float32(clusterHealth.FailureTolerance))
if clusterHealth.Healthy {
metrics.SetGauge([]string{"consul", "autopilot", "healthy"}, 1)
} else {
metrics.SetGauge([]string{"consul", "autopilot", "healthy"}, 0)
}
}
s.clusterHealthLock.Lock()
s.clusterHealth = clusterHealth
s.clusterHealthLock.Unlock()
} }
} }
} }
// queryServerHealth fetches the raft stats for the given server and uses them // queryServerHealth fetches the raft stats for the given server and uses them
// to update its ServerHealth // to update its ServerHealth
func (s *Server) queryServerHealth(member serf.Member, server *agent.Server, func (s *Server) queryServerHealth(member serf.Member, autopilotConf *structs.AutopilotConfig) (*structs.ServerHealth, error) {
autopilotConf *structs.AutopilotConfig) (*structs.ServerHealth, error) { _, server := agent.IsConsulServer(member)
stats, err := s.getServerStats(server) stats, err := s.getServerStats(server)
if err != nil { if err != nil {
return nil, fmt.Errorf("error getting raft stats: %s", err) return nil, fmt.Errorf("error getting raft stats: %s", err)
@ -297,14 +344,21 @@ func (s *Server) queryServerHealth(member serf.Member, server *agent.Server,
return health, nil return health, nil
} }
func (s *Server) getClusterHealth() structs.OperatorHealthReply {
s.clusterHealthLock.RLock()
defer s.clusterHealthLock.RUnlock()
return s.clusterHealth
}
func (s *Server) getServerHealth(id string) *structs.ServerHealth { func (s *Server) getServerHealth(id string) *structs.ServerHealth {
s.serverHealthLock.RLock() s.clusterHealthLock.RLock()
defer s.serverHealthLock.RUnlock() defer s.clusterHealthLock.RUnlock()
h, ok := s.serverHealths[id] for _, health := range s.clusterHealth.Servers {
if !ok { if health.ID == id {
return nil return &health
}
} }
return h return nil
} }
func (s *Server) getServerStats(server *agent.Server) (structs.ServerStats, error) { func (s *Server) getServerStats(server *agent.Server) (structs.ServerStats, error) {

View File

@ -12,15 +12,27 @@ import (
) )
func TestAutopilot_CleanupDeadServer(t *testing.T) { func TestAutopilot_CleanupDeadServer(t *testing.T) {
dir1, s1 := testServerDCBootstrap(t, "dc1", true) for i := 1; i <= 3; i++ {
testCleanupDeadServer(t, i)
}
}
func testCleanupDeadServer(t *testing.T, raftVersion int) {
conf := func(c *Config) {
c.Datacenter = "dc1"
c.Bootstrap = false
c.BootstrapExpect = 3
c.RaftConfig.ProtocolVersion = raft.ProtocolVersion(raftVersion)
}
dir1, s1 := testServerWithConfig(t, conf)
defer os.RemoveAll(dir1) defer os.RemoveAll(dir1)
defer s1.Shutdown() defer s1.Shutdown()
dir2, s2 := testServerDCBootstrap(t, "dc1", false) dir2, s2 := testServerWithConfig(t, conf)
defer os.RemoveAll(dir2) defer os.RemoveAll(dir2)
defer s2.Shutdown() defer s2.Shutdown()
dir3, s3 := testServerDCBootstrap(t, "dc1", false) dir3, s3 := testServerWithConfig(t, conf)
defer os.RemoveAll(dir3) defer os.RemoveAll(dir3)
defer s3.Shutdown() defer s3.Shutdown()
@ -45,8 +57,13 @@ func TestAutopilot_CleanupDeadServer(t *testing.T) {
}) })
} }
// Bring up a new server
dir4, s4 := testServerWithConfig(t, conf)
defer os.RemoveAll(dir4)
defer s4.Shutdown()
// Kill a non-leader server // Kill a non-leader server
s2.Shutdown() s3.Shutdown()
testutil.WaitForResult(func() (bool, error) { testutil.WaitForResult(func() (bool, error) {
alive := 0 alive := 0
@ -60,15 +77,11 @@ func TestAutopilot_CleanupDeadServer(t *testing.T) {
t.Fatalf("should have 2 alive members") t.Fatalf("should have 2 alive members")
}) })
// Bring up and join a new server // Join the new server
dir4, s4 := testServerDCBootstrap(t, "dc1", false)
defer os.RemoveAll(dir4)
defer s4.Shutdown()
if _, err := s4.JoinLAN([]string{addr}); err != nil { if _, err := s4.JoinLAN([]string{addr}); err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
servers[1] = s4 servers[2] = s4
// Make sure the dead server is removed and we're back to 3 total peers // Make sure the dead server is removed and we're back to 3 total peers
for _, s := range servers { for _, s := range servers {

View File

@ -583,26 +583,42 @@ func (s *Server) joinConsulServer(m serf.Member, parts *agent.Server) error {
// TODO (slackpad) - This will need to be changed once we support node IDs. // TODO (slackpad) - This will need to be changed once we support node IDs.
addr := (&net.TCPAddr{IP: m.Addr, Port: parts.Port}).String() addr := (&net.TCPAddr{IP: m.Addr, Port: parts.Port}).String()
minRaftProtocol, err := ServerMinRaftProtocol(s.serfLAN.Members())
if err != nil {
return err
}
// See if it's already in the configuration. It's harmless to re-add it // See if it's already in the configuration. It's harmless to re-add it
// but we want to avoid doing that if possible to prevent useless Raft // but we want to avoid doing that if possible to prevent useless Raft
// log entries. // log entries. If the address is the same but the ID changed, remove the
// old server before adding the new one.
configFuture := s.raft.GetConfiguration() configFuture := s.raft.GetConfiguration()
if err := configFuture.Error(); err != nil { if err := configFuture.Error(); err != nil {
s.logger.Printf("[ERR] consul: failed to get raft configuration: %v", err) s.logger.Printf("[ERR] consul: failed to get raft configuration: %v", err)
return err return err
} }
for _, server := range configFuture.Configuration().Servers { for _, server := range configFuture.Configuration().Servers {
if server.Address == raft.ServerAddress(addr) { // No-op if the raft version is too low
if server.Address == raft.ServerAddress(addr) && (minRaftProtocol < 2 || parts.RaftVersion < 3) {
return nil return nil
} }
// If the address or ID matches an existing server, see if we need to remove the old one first
if server.Address == raft.ServerAddress(addr) || server.ID == raft.ServerID(parts.ID) {
// Exit with no-op if this is being called on an existing server
if server.Address == raft.ServerAddress(addr) && server.ID == raft.ServerID(parts.ID) {
return nil
} else {
future := s.raft.RemoveServer(server.ID, 0, 0)
if err := future.Error(); err != nil {
return fmt.Errorf("error removing server with duplicate ID: %s", err)
}
s.logger.Printf("[INFO] consul: removed server with duplicate ID: %s", server.ID)
}
}
} }
// Attempt to add as a peer // Attempt to add as a peer
minRaftProtocol, err := ServerMinRaftProtocol(s.serfLAN.Members())
if err != nil {
return err
}
switch { switch {
case minRaftProtocol >= 3: case minRaftProtocol >= 3:
addFuture := s.raft.AddNonvoter(raft.ServerID(parts.ID), raft.ServerAddress(addr), 0, 0) addFuture := s.raft.AddNonvoter(raft.ServerID(parts.ID), raft.ServerAddress(addr), 0, 0)
@ -635,7 +651,6 @@ func (s *Server) joinConsulServer(m serf.Member, parts *agent.Server) error {
// removeConsulServer is used to try to remove a consul server that has left // removeConsulServer is used to try to remove a consul server that has left
func (s *Server) removeConsulServer(m serf.Member, port int) error { func (s *Server) removeConsulServer(m serf.Member, port int) error {
// TODO (slackpad) - This will need to be changed once we support node IDs.
addr := (&net.TCPAddr{IP: m.Addr, Port: port}).String() addr := (&net.TCPAddr{IP: m.Addr, Port: port}).String()
// See if it's already in the configuration. It's harmless to re-remove it // See if it's already in the configuration. It's harmless to re-remove it

View File

@ -212,31 +212,7 @@ func (op *Operator) ServerHealth(args *structs.DCSpecificRequest, reply *structs
return fmt.Errorf("all servers must have raft_protocol set to 3 or higher to use this endpoint") return fmt.Errorf("all servers must have raft_protocol set to 3 or higher to use this endpoint")
} }
var status structs.OperatorHealthReply *reply = op.srv.getClusterHealth()
future := op.srv.raft.GetConfiguration()
if err := future.Error(); err != nil {
return err
}
healthyCount := 0
servers := future.Configuration().Servers
for _, s := range servers {
health := op.srv.getServerHealth(string(s.ID))
if health != nil {
if health.Healthy {
healthyCount++
}
status.Servers = append(status.Servers, *health)
}
}
status.Healthy = healthyCount == len(servers)
// If we have extra healthy servers, set FailureTolerance
if healthyCount > len(servers)/2+1 {
status.FailureTolerance = healthyCount - (len(servers)/2 + 1)
}
*reply = status
return nil return nil
} }

View File

@ -429,22 +429,21 @@ func TestOperator_Autopilot_SetConfiguration_ACLDeny(t *testing.T) {
} }
func TestOperator_ServerHealth(t *testing.T) { func TestOperator_ServerHealth(t *testing.T) {
dir1, s1 := testServerWithConfig(t, func(c *Config) { conf := func(c *Config) {
c.Datacenter = "dc1" c.Datacenter = "dc1"
c.Bootstrap = true c.Bootstrap = false
c.BootstrapExpect = 3
c.RaftConfig.ProtocolVersion = 3 c.RaftConfig.ProtocolVersion = 3
c.ServerHealthInterval = 100 * time.Millisecond c.ServerHealthInterval = 100 * time.Millisecond
}) c.AutopilotInterval = 100 * time.Millisecond
}
dir1, s1 := testServerWithConfig(t, conf)
defer os.RemoveAll(dir1) defer os.RemoveAll(dir1)
defer s1.Shutdown() defer s1.Shutdown()
codec := rpcClient(t, s1) codec := rpcClient(t, s1)
defer codec.Close() defer codec.Close()
dir2, s2 := testServerWithConfig(t, func(c *Config) { dir2, s2 := testServerWithConfig(t, conf)
c.Datacenter = "dc1"
c.Bootstrap = false
c.RaftConfig.ProtocolVersion = 3
})
defer os.RemoveAll(dir2) defer os.RemoveAll(dir2)
defer s2.Shutdown() defer s2.Shutdown()
addr := fmt.Sprintf("127.0.0.1:%d", addr := fmt.Sprintf("127.0.0.1:%d",
@ -453,11 +452,7 @@ func TestOperator_ServerHealth(t *testing.T) {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
dir3, s3 := testServerWithConfig(t, func(c *Config) { dir3, s3 := testServerWithConfig(t, conf)
c.Datacenter = "dc1"
c.Bootstrap = false
c.RaftConfig.ProtocolVersion = 3
})
defer os.RemoveAll(dir3) defer os.RemoveAll(dir3)
defer s3.Shutdown() defer s3.Shutdown()
if _, err := s3.JoinLAN([]string{addr}); err != nil { if _, err := s3.JoinLAN([]string{addr}); err != nil {

View File

@ -58,6 +58,7 @@ func (s *Server) lanEventHandler() {
case serf.EventUser: case serf.EventUser:
s.localEvent(e.(serf.UserEvent)) s.localEvent(e.(serf.UserEvent))
case serf.EventMemberUpdate: // Ignore case serf.EventMemberUpdate: // Ignore
s.localMemberEvent(e.(serf.MemberEvent))
case serf.EventQuery: // Ignore case serf.EventQuery: // Ignore
default: default:
s.logger.Printf("[WARN] consul: Unhandled LAN Serf Event: %#v", e) s.logger.Printf("[WARN] consul: Unhandled LAN Serf Event: %#v", e)

View File

@ -88,6 +88,10 @@ type Server struct {
// autopilotWaitGroup is used to block until Autopilot shuts down. // autopilotWaitGroup is used to block until Autopilot shuts down.
autopilotWaitGroup sync.WaitGroup autopilotWaitGroup sync.WaitGroup
// clusterHealth stores the current view of the cluster's health.
clusterHealth structs.OperatorHealthReply
clusterHealthLock sync.RWMutex
// Consul configuration // Consul configuration
config *Config config *Config
@ -157,10 +161,6 @@ type Server struct {
sessionTimers map[string]*time.Timer sessionTimers map[string]*time.Timer
sessionTimersLock sync.Mutex sessionTimersLock sync.Mutex
// serverHealths stores the current view of server healths.
serverHealths map[string]*structs.ServerHealth
serverHealthLock sync.RWMutex
// tombstoneGC is used to track the pending GC invocations // tombstoneGC is used to track the pending GC invocations
// for the KV tombstones // for the KV tombstones
tombstoneGC *state.TombstoneGC tombstoneGC *state.TombstoneGC

View File

@ -127,6 +127,9 @@ type ServerHealth struct {
// Autopilot config. // Autopilot config.
Healthy bool Healthy bool
// Voter is whether this is a voting server.
Voter bool
// StableSince is the last time this server's Healthy value changed. // StableSince is the last time this server's Healthy value changed.
StableSince time.Time StableSince time.Time
} }

View File

@ -365,6 +365,7 @@ A JSON body is returned that looks like this:
"LastTerm": 2, "LastTerm": 2,
"LastIndex": 46, "LastIndex": 46,
"Healthy": true, "Healthy": true,
"Voter": true,
"StableSince": "2017-03-06T22:07:51Z" "StableSince": "2017-03-06T22:07:51Z"
}, },
{ {
@ -375,6 +376,7 @@ A JSON body is returned that looks like this:
"LastTerm": 2, "LastTerm": 2,
"LastIndex": 46, "LastIndex": 46,
"Healthy": true, "Healthy": true,
"Voter": false,
"StableSince": "2017-03-06T22:18:26Z" "StableSince": "2017-03-06T22:18:26Z"
} }
] ]

View File

@ -189,4 +189,16 @@ These metrics give insight into the health of the cluster as a whole.
<td>ms</td> <td>ms</td>
<td>timer</td> <td>timer</td>
</tr> </tr>
<tr>
<td>`consul.autopilot.failure_tolerance`</td>
<td>This tracks the number of voting servers that the cluster can lose while continuing to function.</td>
<td>servers</td>
<td>gauge</td>
</tr>
<tr>
<td>`consul.autopilot.healthy`</td>
<td>This tracks the overall health of the local server cluster. If all servers are considered healthy by Autopilot, this will be set to 1. If any are unhealthy, this will be 0.</td>
<td>boolean</td>
<td>gauge</td>
</tr>
</table> </table>