Merge pull request #2802 from hashicorp/f-autopilot-improvements
Fix an issue with changing server ID when re-joining
Commit: 09e2663d10
@@ -112,6 +112,9 @@ type ServerHealth struct {
// Name is the node name of the server.
Name string

// Address is the address of the server.
Address string

// The status of the SerfHealth check for the server.
SerfStatus string

@@ -128,6 +131,9 @@ type ServerHealth struct {
// Autopilot config.
Healthy bool

// Voter is whether this is a voting server.
Voter bool

// StableSince is the last time this server's Healthy value changed.
StableSince time.Time
}

@@ -298,11 +298,13 @@ func (s *HTTPServer) OperatorServerHealth(resp http.ResponseWriter, req *http.Re
out.Servers = append(out.Servers, api.ServerHealth{
ID: server.ID,
Name: server.Name,
Address: server.Address,
SerfStatus: server.SerfStatus.String(),
LastContact: api.NewReadableDuration(server.LastContact),
LastTerm: server.LastTerm,
LastIndex: server.LastIndex,
Healthy: server.Healthy,
Voter: server.Voter,
StableSince: server.StableSince.Round(time.Second).UTC(),
})
}

@@ -6,6 +6,7 @@ import (
"sync"
"time"

"github.com/armon/go-metrics"
"github.com/hashicorp/consul/consul/agent"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/raft"

@@ -102,7 +103,7 @@ func (s *Server) pruneDeadServers() error {
go s.serfLAN.RemoveFailedNode(server)
}
} else {
s.logger.Printf("[ERR] consul: Failed to remove dead servers: too many dead servers: %d/%d", len(failed), peers)
s.logger.Printf("[DEBUG] consul: Failed to remove dead servers: too many dead servers: %d/%d", len(failed), peers)
}

return nil
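
The only change in this hunk is demoting the "too many dead servers" message from `[ERR]` to `[DEBUG]`, since hitting the guard is an expected safety stop rather than a fault. The guard itself keeps Autopilot from pruning so many failed servers that the cluster could lose quorum; a minimal sketch of that kind of check follows (the exact comparison is an assumption, only the failed/peer counts in the log line come from the patch):

    // canPruneDeadServers sketches the guard behind the log message above:
    // only prune failed servers while they remain at most half of the known
    // peers, so cleanup never removes enough servers to threaten quorum.
    // The exact threshold here is illustrative, not taken from the patch.
    func canPruneDeadServers(failedServers, peers int) bool {
        return failedServers <= peers/2
    }
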
@@ -198,91 +199,135 @@ func (s *Server) serverHealthLoop() {
case <-s.shutdownCh:
return
case <-ticker.C:
serverHealths := make(map[string]*structs.ServerHealth)

// Don't do anything if the min Raft version is too low
minRaftProtocol, err := ServerMinRaftProtocol(s.LANMembers())
if err != nil {
s.logger.Printf("[ERR] consul: error getting server raft protocol versions: %s", err)
break
if err := s.updateClusterHealth(); err != nil {
s.logger.Printf("[ERR] consul: error updating cluster health: %s", err)
}
if minRaftProtocol < 3 {
break
}

state := s.fsm.State()
_, autopilotConf, err := state.AutopilotConfig()
if err != nil {
s.logger.Printf("[ERR] consul: error retrieving autopilot config: %s", err)
break
}
// Bail early if autopilot config hasn't been initialized yet
if autopilotConf == nil {
break
}

// Build an updated map of server healths
for _, member := range s.LANMembers() {
if member.Status == serf.StatusLeft {
continue
}

valid, parts := agent.IsConsulServer(member)
if valid {
health, err := s.queryServerHealth(member, parts, autopilotConf)
if err != nil {
s.logger.Printf("[ERR] consul: error fetching server health: %s", err)
serverHealths[parts.ID] = &structs.ServerHealth{
ID: parts.ID,
Name: parts.Name,
Healthy: false,
}
} else {
serverHealths[parts.ID] = health
}
}
}

s.serverHealthLock.Lock()
s.serverHealths = serverHealths
s.serverHealthLock.Unlock()
}
}
}

// queryServerHealth fetches the raft stats for the given server and uses them
// to update its ServerHealth
func (s *Server) queryServerHealth(member serf.Member, server *agent.Server,
autopilotConf *structs.AutopilotConfig) (*structs.ServerHealth, error) {
stats, err := s.getServerStats(server)
// updateClusterHealth fetches the Raft stats of the other servers and updates
// s.clusterHealth based on the configured Autopilot thresholds
func (s *Server) updateClusterHealth() error {
// Don't do anything if the min Raft version is too low
minRaftProtocol, err := ServerMinRaftProtocol(s.LANMembers())
if err != nil {
return nil, fmt.Errorf("error getting raft stats: %s", err)
return fmt.Errorf("error getting server raft protocol versions: %s", err)
}
if minRaftProtocol < 3 {
return nil
}

health := &structs.ServerHealth{
ID: server.ID,
Name: server.Name,
SerfStatus: member.Status,
LastContact: -1,
LastTerm: stats.LastTerm,
LastIndex: stats.LastIndex,
state := s.fsm.State()
_, autopilotConf, err := state.AutopilotConfig()
if err != nil {
return fmt.Errorf("error retrieving autopilot config: %s", err)
}
// Bail early if autopilot config hasn't been initialized yet
if autopilotConf == nil {
return nil
}

// Get the the serf members which are Consul servers
serverMap := make(map[string]serf.Member)
for _, member := range s.LANMembers() {
if member.Status == serf.StatusLeft {
continue
}

valid, parts := agent.IsConsulServer(member)
if valid {
serverMap[parts.ID] = member
}
}

future := s.raft.GetConfiguration()
if err := future.Error(); err != nil {
return fmt.Errorf("error getting Raft configuration %s", err)
}

// Build a current list of server healths
var clusterHealth structs.OperatorHealthReply
servers := future.Configuration().Servers
healthyCount := 0
voterCount := 0
for _, server := range servers {
health := structs.ServerHealth{
ID: string(server.ID),
Address: string(server.Address),
LastContact: -1,
Voter: server.Suffrage == raft.Voter,
}

member, ok := serverMap[string(server.ID)]
if ok {
health.Name = member.Name
health.SerfStatus = member.Status
if err := s.updateServerHealth(&health, member, autopilotConf); err != nil {
s.logger.Printf("[ERR] consul: error getting server health: %s", err)
}
} else {
health.SerfStatus = serf.StatusNone
}

if health.Healthy {
healthyCount++
}

if health.Voter {
voterCount++
}

clusterHealth.Servers = append(clusterHealth.Servers, health)
}
clusterHealth.Healthy = healthyCount == len(servers)

// If we have extra healthy voters, update FailureTolerance
requiredQuorum := len(servers)/2 + 1
if voterCount > requiredQuorum {
clusterHealth.FailureTolerance = voterCount - requiredQuorum
}

// Heartbeat a metric for monitoring if we're the leader
if s.IsLeader() {
metrics.SetGauge([]string{"consul", "autopilot", "failure_tolerance"}, float32(clusterHealth.FailureTolerance))
if clusterHealth.Healthy {
metrics.SetGauge([]string{"consul", "autopilot", "healthy"}, 1)
} else {
metrics.SetGauge([]string{"consul", "autopilot", "healthy"}, 0)
}
}

s.clusterHealthLock.Lock()
s.clusterHealth = clusterHealth
s.clusterHealthLock.Unlock()

return nil
}

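
The FailureTolerance computed above is plain quorum arithmetic: a Raft cluster of N servers needs N/2+1 voters to commit, and each voter beyond that is a server the cluster could lose without losing availability. The same calculation in isolation (the function name is ours, not part of the patch):

    // failureTolerance mirrors the arithmetic in updateClusterHealth:
    // requiredQuorum = len(servers)/2 + 1, and any voters beyond that
    // count toward the cluster's failure tolerance.
    func failureTolerance(totalServers, voters int) int {
        requiredQuorum := totalServers/2 + 1
        if voters > requiredQuorum {
            return voters - requiredQuorum
        }
        return 0
    }

For a five-server cluster with all five voting this reports 2; for a three-server cluster it reports 1.
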
// updateServerHealth fetches the raft stats for the given server and uses them
// to update its ServerHealth
func (s *Server) updateServerHealth(health *structs.ServerHealth, member serf.Member, autopilotConf *structs.AutopilotConfig) error {
_, server := agent.IsConsulServer(member)

stats, err := s.getServerStats(server)
if err != nil {
return fmt.Errorf("error getting raft stats: %s", err)
}

health.LastTerm = stats.LastTerm
health.LastIndex = stats.LastIndex

if stats.LastContact != "never" {
health.LastContact, err = time.ParseDuration(stats.LastContact)
if err != nil {
return nil, fmt.Errorf("error parsing last_contact duration: %s", err)
return fmt.Errorf("error parsing last_contact duration: %s", err)
}
}

// Set LastContact to 0 for the leader
if s.config.NodeName == member.Name {
health.LastContact = 0
}

lastTerm, err := strconv.ParseUint(s.raft.Stats()["last_log_term"], 10, 64)
if err != nil {
return nil, fmt.Errorf("error parsing last_log_term: %s", err)
return fmt.Errorf("error parsing last_log_term: %s", err)
}
health.Healthy = health.IsHealthy(lastTerm, s.raft.LastIndex(), autopilotConf)

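
The last line above delegates the actual verdict to ServerHealth.IsHealthy, which is not part of this diff. As a rough sketch of the kind of thresholds such a check compares against (the parameter names and exact rules below are assumptions for illustration, not the method's real body):

    // healthyByThresholds is an illustrative stand-in for a check like
    // ServerHealth.IsHealthy. It uses the standard library's time package;
    // the threshold parameters are assumed, not taken from this diff.
    func healthyByThresholds(lastContact, lastContactThreshold time.Duration,
        lastTerm, leaderTerm, lastIndex, leaderIndex, maxTrailingLogs uint64) bool {
        // A negative LastContact means the server has never heard from the leader.
        if lastContact < 0 || lastContact > lastContactThreshold {
            return false
        }
        // The server should be on the leader's current Raft term.
        if lastTerm != leaderTerm {
            return false
        }
        // The server's log should not trail the leader's by too much.
        if leaderIndex > maxTrailingLogs && lastIndex < leaderIndex-maxTrailingLogs {
            return false
        }
        return true
    }
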
@@ -294,17 +339,24 @@ func (s *Server) queryServerHealth(member serf.Member, server *agent.Server,
health.StableSince = lastHealth.StableSince
}

return health, nil
return nil
}

func (s *Server) getClusterHealth() structs.OperatorHealthReply {
s.clusterHealthLock.RLock()
defer s.clusterHealthLock.RUnlock()
return s.clusterHealth
}

func (s *Server) getServerHealth(id string) *structs.ServerHealth {
s.serverHealthLock.RLock()
defer s.serverHealthLock.RUnlock()
h, ok := s.serverHealths[id]
if !ok {
return nil
s.clusterHealthLock.RLock()
defer s.clusterHealthLock.RUnlock()
for _, health := range s.clusterHealth.Servers {
if health.ID == id {
return &health
}
}
return h
return nil
}

func (s *Server) getServerStats(server *agent.Server) (structs.ServerStats, error) {

@@ -12,15 +12,27 @@ import (
)

func TestAutopilot_CleanupDeadServer(t *testing.T) {
dir1, s1 := testServerDCBootstrap(t, "dc1", true)
for i := 1; i <= 3; i++ {
testCleanupDeadServer(t, i)
}
}

func testCleanupDeadServer(t *testing.T, raftVersion int) {
conf := func(c *Config) {
c.Datacenter = "dc1"
c.Bootstrap = false
c.BootstrapExpect = 3
c.RaftConfig.ProtocolVersion = raft.ProtocolVersion(raftVersion)
}
dir1, s1 := testServerWithConfig(t, conf)
defer os.RemoveAll(dir1)
defer s1.Shutdown()

dir2, s2 := testServerDCBootstrap(t, "dc1", false)
dir2, s2 := testServerWithConfig(t, conf)
defer os.RemoveAll(dir2)
defer s2.Shutdown()

dir3, s3 := testServerDCBootstrap(t, "dc1", false)
dir3, s3 := testServerWithConfig(t, conf)
defer os.RemoveAll(dir3)
defer s3.Shutdown()

@@ -45,8 +57,13 @@ func TestAutopilot_CleanupDeadServer(t *testing.T) {
})
}

// Bring up a new server
dir4, s4 := testServerWithConfig(t, conf)
defer os.RemoveAll(dir4)
defer s4.Shutdown()

// Kill a non-leader server
s2.Shutdown()
s3.Shutdown()

testutil.WaitForResult(func() (bool, error) {
alive := 0
@@ -60,15 +77,11 @@ func TestAutopilot_CleanupDeadServer(t *testing.T) {
t.Fatalf("should have 2 alive members")
})

// Bring up and join a new server
dir4, s4 := testServerDCBootstrap(t, "dc1", false)
defer os.RemoveAll(dir4)
defer s4.Shutdown()

// Join the new server
if _, err := s4.JoinLAN([]string{addr}); err != nil {
t.Fatalf("err: %v", err)
}
servers[1] = s4
servers[2] = s4

// Make sure the dead server is removed and we're back to 3 total peers
for _, s := range servers {

@@ -580,29 +580,51 @@ func (s *Server) joinConsulServer(m serf.Member, parts *agent.Server) error {
}
}

// TODO (slackpad) - This will need to be changed once we support node IDs.
addr := (&net.TCPAddr{IP: m.Addr, Port: parts.Port}).String()

minRaftProtocol, err := ServerMinRaftProtocol(s.serfLAN.Members())
if err != nil {
return err
}

// See if it's already in the configuration. It's harmless to re-add it
// but we want to avoid doing that if possible to prevent useless Raft
// log entries.
// log entries. If the address is the same but the ID changed, remove the
// old server before adding the new one.
configFuture := s.raft.GetConfiguration()
if err := configFuture.Error(); err != nil {
s.logger.Printf("[ERR] consul: failed to get raft configuration: %v", err)
return err
}
for _, server := range configFuture.Configuration().Servers {
if server.Address == raft.ServerAddress(addr) {
// No-op if the raft version is too low
if server.Address == raft.ServerAddress(addr) && (minRaftProtocol < 2 || parts.RaftVersion < 3) {
return nil
}

// If the address or ID matches an existing server, see if we need to remove the old one first
if server.Address == raft.ServerAddress(addr) || server.ID == raft.ServerID(parts.ID) {
// Exit with no-op if this is being called on an existing server
if server.Address == raft.ServerAddress(addr) && server.ID == raft.ServerID(parts.ID) {
return nil
} else {
future := s.raft.RemoveServer(server.ID, 0, 0)
if server.Address == raft.ServerAddress(addr) {
if err := future.Error(); err != nil {
return fmt.Errorf("error removing server with duplicate address %q: %s", server.Address, err)
}
s.logger.Printf("[INFO] consul: removed server with duplicate address: %s", server.Address)
} else {
if err := future.Error(); err != nil {
return fmt.Errorf("error removing server with duplicate ID %q: %s", server.ID, err)
}
s.logger.Printf("[INFO] consul: removed server with duplicate ID: %s", server.ID)
}
}
}
}

// Attempt to add as a peer
minRaftProtocol, err := ServerMinRaftProtocol(s.serfLAN.Members())
if err != nil {
return err
}

switch {
case minRaftProtocol >= 3:
addFuture := s.raft.AddNonvoter(raft.ServerID(parts.ID), raft.ServerAddress(addr), 0, 0)
@@ -635,7 +657,6 @@ func (s *Server) joinConsulServer(m serf.Member, parts *agent.Server) error {

// removeConsulServer is used to try to remove a consul server that has left
func (s *Server) removeConsulServer(m serf.Member, port int) error {
// TODO (slackpad) - This will need to be changed once we support node IDs.
addr := (&net.TCPAddr{IP: m.Addr, Port: port}).String()

// See if it's already in the configuration. It's harmless to re-remove it
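
This hunk is the heart of the pull request: when a joining server's Raft address or ID collides with an entry already in the configuration, the stale entry is removed before the new one is added, and the join is a no-op only when both values already match. A condensed sketch of that decision (the names are ours; the real code operates on raft.Server entries as shown above):

    // reconcileExisting condenses the new joinConsulServer logic for one
    // existing configuration entry versus the joining server.
    func reconcileExisting(existingID, existingAddr, joiningID, joiningAddr string,
        remove func(id string) error) (alreadyMember bool, err error) {
        sameAddr := existingAddr == joiningAddr
        sameID := existingID == joiningID

        switch {
        case sameAddr && sameID:
            // The same server is re-joining; nothing to do.
            return true, nil
        case sameAddr || sameID:
            // A rebuilt server reused the address (new ID) or moved to a new
            // address (same ID): drop the stale entry before re-adding.
            return false, remove(existingID)
        default:
            // Unrelated entry; leave it alone.
            return false, nil
        }
    }
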
@@ -711,3 +711,167 @@ func TestLeader_RollRaftServer(t *testing.T) {
})
}
}

func TestLeader_ChangeServerAddress(t *testing.T) {
conf := func(c *Config) {
c.Bootstrap = false
c.BootstrapExpect = 3
c.Datacenter = "dc1"
c.RaftConfig.ProtocolVersion = 3
}
dir1, s1 := testServerWithConfig(t, conf)
defer os.RemoveAll(dir1)
defer s1.Shutdown()

dir2, s2 := testServerWithConfig(t, conf)
defer os.RemoveAll(dir2)
defer s2.Shutdown()

dir3, s3 := testServerWithConfig(t, conf)
defer os.RemoveAll(dir3)
defer s3.Shutdown()

servers := []*Server{s1, s2, s3}

// Try to join
addr := fmt.Sprintf("127.0.0.1:%d",
s1.config.SerfLANConfig.MemberlistConfig.BindPort)
if _, err := s2.JoinLAN([]string{addr}); err != nil {
t.Fatalf("err: %v", err)
}
if _, err := s3.JoinLAN([]string{addr}); err != nil {
t.Fatalf("err: %v", err)
}

for _, s := range servers {
testutil.WaitForResult(func() (bool, error) {
peers, _ := s.numPeers()
return peers == 3, nil
}, func(err error) {
t.Fatalf("should have 3 peers")
})
}

// Shut down a server, freeing up its address/port
s3.Shutdown()

testutil.WaitForResult(func() (bool, error) {
alive := 0
for _, m := range s1.LANMembers() {
if m.Status == serf.StatusAlive {
alive++
}
}
return alive == 2, nil
}, func(err error) {
t.Fatalf("should have 2 alive members")
})

// Bring up a new server with s3's address that will get a different ID
dir4, s4 := testServerWithConfig(t, func(c *Config) {
c.Bootstrap = false
c.BootstrapExpect = 3
c.Datacenter = "dc1"
c.RaftConfig.ProtocolVersion = 3
c.NodeID = s3.config.NodeID
})
defer os.RemoveAll(dir4)
defer s4.Shutdown()
if _, err := s4.JoinLAN([]string{addr}); err != nil {
t.Fatalf("err: %v", err)
}
servers[2] = s4

// Make sure the dead server is removed and we're back to 3 total peers
for _, s := range servers {
testutil.WaitForResult(func() (bool, error) {
peers, _ := s.numPeers()
return peers == 3, nil
}, func(err error) {
t.Fatalf("should have 3 peers")
})
}
}

func TestLeader_ChangeServerID(t *testing.T) {
conf := func(c *Config) {
c.Bootstrap = false
c.BootstrapExpect = 3
c.Datacenter = "dc1"
c.RaftConfig.ProtocolVersion = 3
}
dir1, s1 := testServerWithConfig(t, conf)
defer os.RemoveAll(dir1)
defer s1.Shutdown()

dir2, s2 := testServerWithConfig(t, conf)
defer os.RemoveAll(dir2)
defer s2.Shutdown()

dir3, s3 := testServerWithConfig(t, conf)
defer os.RemoveAll(dir3)
defer s3.Shutdown()

servers := []*Server{s1, s2, s3}

// Try to join
addr := fmt.Sprintf("127.0.0.1:%d",
s1.config.SerfLANConfig.MemberlistConfig.BindPort)
if _, err := s2.JoinLAN([]string{addr}); err != nil {
t.Fatalf("err: %v", err)
}
if _, err := s3.JoinLAN([]string{addr}); err != nil {
t.Fatalf("err: %v", err)
}

for _, s := range servers {
testutil.WaitForResult(func() (bool, error) {
peers, _ := s.numPeers()
return peers == 3, nil
}, func(err error) {
t.Fatalf("should have 3 peers")
})
}

// Shut down a server, freeing up its address/port
s3.Shutdown()

testutil.WaitForResult(func() (bool, error) {
alive := 0
for _, m := range s1.LANMembers() {
if m.Status == serf.StatusAlive {
alive++
}
}
return alive == 2, nil
}, func(err error) {
t.Fatalf("should have 2 alive members")
})

// Bring up a new server with s3's address that will get a different ID
dir4, s4 := testServerWithConfig(t, func(c *Config) {
c.Bootstrap = false
c.BootstrapExpect = 3
c.Datacenter = "dc1"
c.RaftConfig.ProtocolVersion = 3
c.SerfLANConfig.MemberlistConfig = s3.config.SerfLANConfig.MemberlistConfig
c.RPCAddr = s3.config.RPCAddr
c.RPCAdvertise = s3.config.RPCAdvertise
})
defer os.RemoveAll(dir4)
defer s4.Shutdown()
if _, err := s4.JoinLAN([]string{addr}); err != nil {
t.Fatalf("err: %v", err)
}
servers[2] = s4

// Make sure the dead server is removed and we're back to 3 total peers
for _, s := range servers {
testutil.WaitForResult(func() (bool, error) {
peers, _ := s.numPeers()
return peers == 3, nil
}, func(err error) {
t.Fatalf("should have 3 peers")
})
}
}

@@ -212,31 +212,7 @@ func (op *Operator) ServerHealth(args *structs.DCSpecificRequest, reply *structs
return fmt.Errorf("all servers must have raft_protocol set to 3 or higher to use this endpoint")
}

var status structs.OperatorHealthReply
future := op.srv.raft.GetConfiguration()
if err := future.Error(); err != nil {
return err
}

healthyCount := 0
servers := future.Configuration().Servers
for _, s := range servers {
health := op.srv.getServerHealth(string(s.ID))
if health != nil {
if health.Healthy {
healthyCount++
}
status.Servers = append(status.Servers, *health)
}
}
status.Healthy = healthyCount == len(servers)

// If we have extra healthy servers, set FailureTolerance
if healthyCount > len(servers)/2+1 {
status.FailureTolerance = healthyCount - (len(servers)/2 + 1)
}

*reply = status
*reply = op.srv.getClusterHealth()

return nil
}

@@ -429,22 +429,21 @@ func TestOperator_Autopilot_SetConfiguration_ACLDeny(t *testing.T) {
}

func TestOperator_ServerHealth(t *testing.T) {
dir1, s1 := testServerWithConfig(t, func(c *Config) {
conf := func(c *Config) {
c.Datacenter = "dc1"
c.Bootstrap = true
c.Bootstrap = false
c.BootstrapExpect = 3
c.RaftConfig.ProtocolVersion = 3
c.ServerHealthInterval = 100 * time.Millisecond
})
c.AutopilotInterval = 100 * time.Millisecond
}
dir1, s1 := testServerWithConfig(t, conf)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()

dir2, s2 := testServerWithConfig(t, func(c *Config) {
c.Datacenter = "dc1"
c.Bootstrap = false
c.RaftConfig.ProtocolVersion = 3
})
dir2, s2 := testServerWithConfig(t, conf)
defer os.RemoveAll(dir2)
defer s2.Shutdown()
addr := fmt.Sprintf("127.0.0.1:%d",
@@ -453,11 +452,7 @@ func TestOperator_ServerHealth(t *testing.T) {
t.Fatalf("err: %v", err)
}

dir3, s3 := testServerWithConfig(t, func(c *Config) {
c.Datacenter = "dc1"
c.Bootstrap = false
c.RaftConfig.ProtocolVersion = 3
})
dir3, s3 := testServerWithConfig(t, conf)
defer os.RemoveAll(dir3)
defer s3.Shutdown()
if _, err := s3.JoinLAN([]string{addr}); err != nil {
@@ -484,14 +479,15 @@ func TestOperator_ServerHealth(t *testing.T) {
if len(reply.Servers) != 3 {
return false, fmt.Errorf("bad: %v", reply)
}
if reply.Servers[0].LastContact != 0 {
return false, fmt.Errorf("bad: %v", reply)
}
if reply.Servers[1].LastContact <= 0 {
return false, fmt.Errorf("bad: %v", reply)
}
if reply.Servers[2].LastContact <= 0 {
return false, fmt.Errorf("bad: %v", reply)
// Leader should have LastContact == 0, others should be positive
for _, s := range reply.Servers {
isLeader := s1.raft.Leader() == raft.ServerAddress(s.Address)
if isLeader && s.LastContact != 0 {
return false, fmt.Errorf("bad: %v", reply)
}
if !isLeader && s.LastContact <= 0 {
return false, fmt.Errorf("bad: %v", reply)
}
}
return true, nil
}, func(err error) {

@@ -58,6 +58,7 @@ func (s *Server) lanEventHandler() {
case serf.EventUser:
s.localEvent(e.(serf.UserEvent))
case serf.EventMemberUpdate: // Ignore
s.localMemberEvent(e.(serf.MemberEvent))
case serf.EventQuery: // Ignore
default:
s.logger.Printf("[WARN] consul: Unhandled LAN Serf Event: %#v", e)

@@ -88,6 +88,10 @@ type Server struct {
// autopilotWaitGroup is used to block until Autopilot shuts down.
autopilotWaitGroup sync.WaitGroup

// clusterHealth stores the current view of the cluster's health.
clusterHealth structs.OperatorHealthReply
clusterHealthLock sync.RWMutex

// Consul configuration
config *Config

@@ -157,10 +161,6 @@ type Server struct {
sessionTimers map[string]*time.Timer
sessionTimersLock sync.Mutex

// serverHealths stores the current view of server healths.
serverHealths map[string]*structs.ServerHealth
serverHealthLock sync.RWMutex

// tombstoneGC is used to track the pending GC invocations
// for the KV tombstones
tombstoneGC *state.TombstoneGC

@@ -111,6 +111,9 @@ type ServerHealth struct {
// Name is the node name of the server.
Name string

// Address is the address of the server.
Address string

// The status of the SerfHealth check for the server.
SerfStatus serf.MemberStatus

@@ -127,6 +130,9 @@ type ServerHealth struct {
// Autopilot config.
Healthy bool

// Voter is whether this is a voting server.
Voter bool

// StableSince is the last time this server's Healthy value changed.
StableSince time.Time
}

vendor/github.com/hashicorp/raft/api.go (generated, vendored; 6 lines changed)
@@ -979,10 +979,10 @@ func (r *Raft) Stats() map[string]string {
}

last := r.LastContact()
if last.IsZero() {
s["last_contact"] = "never"
} else if r.getState() == Leader {
if r.getState() == Leader {
s["last_contact"] = "0"
} else if last.IsZero() {
s["last_contact"] = "never"
} else {
s["last_contact"] = fmt.Sprintf("%v", time.Now().Sub(last))
}
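
The vendored Raft change above only reorders the checks in Stats(): leadership is now tested before the zero-value LastContact check, so a leader reports last_contact as "0" instead of "never" (a leader's own LastContact time can legitimately be the zero value). The same decision in isolation, using the standard library's time package:

    // lastContactStat mirrors the reordered logic in Raft's Stats().
    func lastContactStat(isLeader bool, last time.Time) string {
        if isLeader {
            return "0"
        }
        if last.IsZero() {
            return "never"
        }
        return time.Since(last).String()
    }
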
vendor/vendor.json (vendored; 8 lines changed)

@@ -600,10 +600,12 @@
"revisionTime": "2015-11-16T02:03:38Z"
},
{
"checksumSHA1": "wpirHJV/6VEbbD+HyAP2/6Xc0ek=",
"checksumSHA1": "NvFexY/rs9sPfve+ny/rkMkCL5M=",
"path": "github.com/hashicorp/raft",
"revision": "aaad9f10266e089bd401e7a6487651a69275641b",
"revisionTime": "2016-11-10T00:52:40Z"
"revision": "6b063a18bfe6e0da3fdc2b9bf6256be9c0a4849a",
"revisionTime": "2017-03-16T02:42:32Z",
"version": "library-v2-stage-one",
"versionExact": "library-v2-stage-one"
},
{
"checksumSHA1": "QAxukkv54/iIvLfsUP6IK4R0m/A=",

@@ -360,21 +360,25 @@ A JSON body is returned that looks like this:
{
"ID": "e349749b-3303-3ddf-959c-b5885a0e1f6e",
"Name": "node1",
"Address": "127.0.0.1:8300",
"SerfStatus": "alive",
"LastContact": "0s",
"LastTerm": 2,
"LastIndex": 46,
"Healthy": true,
"Voter": true,
"StableSince": "2017-03-06T22:07:51Z"
},
{
"ID": "e36ee410-cc3c-0a0c-c724-63817ab30303",
"Name": "node2",
"Address": "127.0.0.1:8205",
"SerfStatus": "alive",
"LastContact": "27.291304ms",
"LastTerm": 2,
"LastIndex": 46,
"Healthy": true,
"Voter": false,
"StableSince": "2017-03-06T22:18:26Z"
}
]

@@ -392,6 +396,8 @@ The `Servers` list holds detailed health information on each server:

- `Name` is the node name of the server.

- `Address` is the address of the server.

- `SerfStatus` is the SerfHealth check status for the server.

- `LastContact` is the time elapsed since this server's last contact with the leader.
@@ -402,4 +408,6 @@ The `Servers` list holds detailed health information on each server:

- `Healthy` is whether the server is healthy according to the current Autopilot configuration.

- `Voter` is whether the server is a voting member of the Raft cluster.

- `StableSince` is the time this server has been in its current `Healthy` state.
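
For completeness, here is a minimal Go client that fetches and prints the health report documented above. The /v1/operator/autopilot/health path and the local agent address are assumptions based on the standard Consul HTTP API (the official github.com/hashicorp/consul/api client exposes the same data); only the fields that appear in the JSON example are declared.

    package main

    import (
        "encoding/json"
        "fmt"
        "net/http"
    )

    // serverHealth declares only the fields printed below; LastContact is a
    // string because the endpoint renders it as a human-readable duration.
    type serverHealth struct {
        Name        string
        Address     string
        SerfStatus  string
        LastContact string
        Healthy     bool
        Voter       bool
    }

    type healthReply struct {
        Healthy          bool
        FailureTolerance int
        Servers          []serverHealth
    }

    func main() {
        // Assumes a Consul agent listening on the default local HTTP port.
        resp, err := http.Get("http://127.0.0.1:8500/v1/operator/autopilot/health")
        if err != nil {
            panic(err)
        }
        defer resp.Body.Close()

        var reply healthReply
        if err := json.NewDecoder(resp.Body).Decode(&reply); err != nil {
            panic(err)
        }

        fmt.Printf("cluster healthy=%v failure_tolerance=%d\n", reply.Healthy, reply.FailureTolerance)
        for _, s := range reply.Servers {
            fmt.Printf("  %s %s serf=%s healthy=%v voter=%v last_contact=%s\n",
                s.Name, s.Address, s.SerfStatus, s.Healthy, s.Voter, s.LastContact)
        }
    }
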
@@ -189,4 +189,16 @@ These metrics give insight into the health of the cluster as a whole.
<td>ms</td>
<td>timer</td>
</tr>
<tr>
<td>`consul.autopilot.failure_tolerance`</td>
<td>This tracks the number of voting servers that the cluster can lose while continuing to function.</td>
<td>servers</td>
<td>gauge</td>
</tr>
<tr>
<td>`consul.autopilot.healthy`</td>
<td>This tracks the overall health of the local server cluster. If all servers are considered healthy by Autopilot, this will be set to 1. If any are unhealthy, this will be 0.</td>
<td>boolean</td>
<td>gauge</td>
</tr>
</table>