From 9b4497de094753da7c873a3ad0470fde4219bf4a Mon Sep 17 00:00:00 2001 From: Kyle Havlovitz Date: Thu, 9 Mar 2017 16:43:07 -0800 Subject: [PATCH] Cleaned up and reorganized some autopilot-related code --- api/operator.go | 100 +++++++++- api/operator_test.go | 26 +++ command/agent/operator_endpoint.go | 76 +++++++- command/agent/operator_endpoint_test.go | 56 +++++- command/operator_autopilot_set.go | 37 ++-- command/operator_autopilot_set_test.go | 18 +- consul/autopilot.go | 183 ++++++++++-------- consul/autopilot_test.go | 5 +- consul/config.go | 15 +- consul/operator_endpoint.go | 22 ++- consul/operator_endpoint_test.go | 41 ++-- consul/server.go | 14 +- consul/structs/operator.go | 48 ++++- consul/structs/operator_test.go | 94 +++++++++ testutil/server.go | 1 + .../docs/agent/http/operator.html.markdown | 10 +- .../source/docs/agent/options.html.markdown | 5 +- .../operator/autopilot.html.markdown.erb | 2 +- 18 files changed, 581 insertions(+), 172 deletions(-) create mode 100644 consul/structs/operator_test.go diff --git a/api/operator.go b/api/operator.go index cb5516a63d..2c9961c9d1 100644 --- a/api/operator.go +++ b/api/operator.go @@ -82,7 +82,7 @@ type AutopilotConfiguration struct { // LastContactThreshold is the limit on the amount of time a server can go // without leader contact before being considered unhealthy. - LastContactThreshold time.Duration + LastContactThreshold *ReadableDuration // MaxTrailingLogs is the amount of entries in the Raft Log that a server can // be behind before being considered unhealthy. @@ -91,7 +91,7 @@ type AutopilotConfiguration struct { // ServerStabilizationTime is the minimum amount of time a server must be // in a stable, healthy state before it can be added to the cluster. Only // applicable with Raft protocol version 3 or higher. - ServerStabilizationTime time.Duration + ServerStabilizationTime *ReadableDuration // CreateIndex holds the index corresponding the creation of this configuration. // This is a read-only field. @@ -104,6 +104,84 @@ type AutopilotConfiguration struct { ModifyIndex uint64 } +// ServerHealth is the health (from the leader's point of view) of a server. +type ServerHealth struct { + // ID is the raft ID of the server. + ID string + + // Name is the node name of the server. + Name string + + // The status of the SerfHealth check for the server. + SerfStatus string + + // LastContact is the time since this node's last contact with the leader. + LastContact *ReadableDuration + + // LastTerm is the highest leader term this server has a record of in its Raft log. + LastTerm uint64 + + // LastIndex is the last log index this server has a record of in its Raft log. + LastIndex uint64 + + // Healthy is whether or not the server is healthy according to the current + // Autopilot config. + Healthy bool + + // StableSince is the last time this server's Healthy value changed. + StableSince time.Time +} + +// OperatorHealthReply is a representation of the overall health of the cluster +type OperatorHealthReply struct { + // Healthy is true if all the servers in the cluster are healthy. + Healthy bool + + // FailureTolerance is the number of healthy servers that could be lost without + // an outage occurring. + FailureTolerance int + + // Servers holds the health of each server. + Servers []ServerHealth +} + +// ReadableDuration is a duration type that is serialized to JSON in human readable format. +type ReadableDuration time.Duration + +func NewReadableDuration(dur time.Duration) *ReadableDuration { + d := ReadableDuration(dur) + return &d +} + +func (d *ReadableDuration) String() string { return d.Duration().String() } +func (d *ReadableDuration) Duration() time.Duration { + if d == nil { + return time.Duration(0) + } + return time.Duration(*d) +} + +func (d *ReadableDuration) MarshalJSON() ([]byte, error) { + return []byte(fmt.Sprintf(`"%s"`, d.Duration().String())), nil +} + +func (d *ReadableDuration) UnmarshalJSON(raw []byte) error { + if d == nil { + return fmt.Errorf("cannot unmarshal to nil pointer") + } + + str := string(raw) + if len(str) < 2 || str[0] != '"' || str[len(str)-1] != '"' { + return fmt.Errorf("must be enclosed with quotes: %s", str) + } + dur, err := time.ParseDuration(str[1 : len(str)-1]) + if err != nil { + return err + } + *d = ReadableDuration(dur) + return nil +} + // RaftGetConfiguration is used to query the current Raft peer set. func (op *Operator) RaftGetConfiguration(q *QueryOptions) (*RaftConfiguration, error) { r := op.c.newRequest("GET", "/v1/operator/raft/configuration") @@ -217,6 +295,7 @@ func (op *Operator) AutopilotGetConfiguration(q *QueryOptions) (*AutopilotConfig if err := decodeBody(resp, &out); err != nil { return nil, err } + return &out, nil } @@ -255,3 +334,20 @@ func (op *Operator) AutopilotCASConfiguration(conf *AutopilotConfiguration, q *W return res, nil } + +// AutopilotServerHealth +func (op *Operator) AutopilotServerHealth(q *QueryOptions) (*OperatorHealthReply, error) { + r := op.c.newRequest("GET", "/v1/operator/autopilot/health") + r.setQueryOptions(q) + _, resp, err := requireOK(op.c.doRequest(r)) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + var out OperatorHealthReply + if err := decodeBody(resp, &out); err != nil { + return nil, err + } + return &out, nil +} diff --git a/api/operator_test.go b/api/operator_test.go index 13ce53e611..38768c6881 100644 --- a/api/operator_test.go +++ b/api/operator_test.go @@ -1,6 +1,7 @@ package api import ( + "fmt" "strings" "testing" @@ -178,3 +179,28 @@ func TestOperator_AutopilotCASConfiguration(t *testing.T) { } } } + +func TestOperator_ServerHealth(t *testing.T) { + t.Parallel() + c, s := makeClientWithConfig(t, nil, func(c *testutil.TestServerConfig) { + c.RaftProtocol = 3 + }) + defer s.Stop() + + operator := c.Operator() + testutil.WaitForResult(func() (bool, error) { + out, err := operator.AutopilotServerHealth(nil) + if err != nil { + return false, fmt.Errorf("err: %v", err) + } + if len(out.Servers) != 1 || + !out.Servers[0].Healthy || + out.Servers[0].Name != s.Config.NodeName { + return false, fmt.Errorf("bad: %v", out) + } + + return true, nil + }, func(err error) { + t.Fatal(err) + }) +} diff --git a/command/agent/operator_endpoint.go b/command/agent/operator_endpoint.go index cb1ccd93a3..a04e801fce 100644 --- a/command/agent/operator_endpoint.go +++ b/command/agent/operator_endpoint.go @@ -4,10 +4,13 @@ import ( "fmt" "net/http" "strconv" + "time" + "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/consul/structs" multierror "github.com/hashicorp/go-multierror" "github.com/hashicorp/raft" + "strings" ) // OperatorRaftConfiguration is used to inspect the current Raft configuration. @@ -183,12 +186,35 @@ func (s *HTTPServer) OperatorAutopilotConfiguration(resp http.ResponseWriter, re return nil, err } - return reply, nil + out := api.AutopilotConfiguration{ + CleanupDeadServers: reply.CleanupDeadServers, + LastContactThreshold: api.NewReadableDuration(reply.LastContactThreshold), + MaxTrailingLogs: reply.MaxTrailingLogs, + ServerStabilizationTime: api.NewReadableDuration(reply.ServerStabilizationTime), + CreateIndex: reply.CreateIndex, + ModifyIndex: reply.ModifyIndex, + } + + return out, nil case "PUT": var args structs.AutopilotSetConfigRequest s.parseDC(req, &args.Datacenter) s.parseToken(req, &args.Token) + var conf api.AutopilotConfiguration + if err := decodeBody(req, &conf, FixupConfigDurations); err != nil { + resp.WriteHeader(400) + resp.Write([]byte(fmt.Sprintf("Error parsing autopilot config: %v", err))) + return nil, nil + } + + args.Config = structs.AutopilotConfig{ + CleanupDeadServers: conf.CleanupDeadServers, + LastContactThreshold: conf.LastContactThreshold.Duration(), + MaxTrailingLogs: conf.MaxTrailingLogs, + ServerStabilizationTime: conf.ServerStabilizationTime.Duration(), + } + // Check for cas value params := req.URL.Query() if _, ok := params["cas"]; ok { @@ -202,12 +228,6 @@ func (s *HTTPServer) OperatorAutopilotConfiguration(resp http.ResponseWriter, re args.CAS = true } - if err := decodeBody(req, &args.Config, nil); err != nil { - resp.WriteHeader(400) - resp.Write([]byte(fmt.Sprintf("Error parsing autopilot config: %v", err))) - return nil, nil - } - var reply bool if err := s.agent.RPC("Operator.AutopilotSetConfiguration", &args, &reply); err != nil { return nil, err @@ -225,6 +245,29 @@ func (s *HTTPServer) OperatorAutopilotConfiguration(resp http.ResponseWriter, re } } +// FixupConfigDurations is used to handle parsing the duration fields in +// the Autopilot config struct +func FixupConfigDurations(raw interface{}) error { + rawMap, ok := raw.(map[string]interface{}) + if !ok { + return nil + } + for key, val := range rawMap { + if strings.ToLower(key) == "lastcontactthreshold" || + strings.ToLower(key) == "serverstabilizationtime" { + // Convert a string value into an integer + if vStr, ok := val.(string); ok { + dur, err := time.ParseDuration(vStr) + if err != nil { + return err + } + rawMap[key] = dur + } + } + } + return nil +} + // OperatorServerHealth is used to get the health of the servers in the local DC func (s *HTTPServer) OperatorServerHealth(resp http.ResponseWriter, req *http.Request) (interface{}, error) { if req.Method != "GET" { @@ -247,5 +290,22 @@ func (s *HTTPServer) OperatorServerHealth(resp http.ResponseWriter, req *http.Re resp.WriteHeader(http.StatusTooManyRequests) } - return reply, nil + out := &api.OperatorHealthReply{ + Healthy: reply.Healthy, + FailureTolerance: reply.FailureTolerance, + } + for _, server := range reply.Servers { + out.Servers = append(out.Servers, api.ServerHealth{ + ID: server.ID, + Name: server.Name, + SerfStatus: server.SerfStatus.String(), + LastContact: api.NewReadableDuration(server.LastContact), + LastTerm: server.LastTerm, + LastIndex: server.LastIndex, + Healthy: server.Healthy, + StableSince: server.StableSince.Round(time.Second).UTC(), + }) + } + + return out, nil } diff --git a/command/agent/operator_endpoint_test.go b/command/agent/operator_endpoint_test.go index 20b6614f51..c0ad9f9ad0 100644 --- a/command/agent/operator_endpoint_test.go +++ b/command/agent/operator_endpoint_test.go @@ -7,10 +7,11 @@ import ( "net/http/httptest" "strings" "testing" + "time" + "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/consul/testutil" - "github.com/hashicorp/serf/serf" ) func TestOperator_OperatorRaftConfiguration(t *testing.T) { @@ -304,7 +305,7 @@ func TestOperator_AutopilotGetConfiguration(t *testing.T) { if resp.Code != 200 { t.Fatalf("bad code: %d", resp.Code) } - out, ok := obj.(structs.AutopilotConfig) + out, ok := obj.(api.AutopilotConfiguration) if !ok { t.Fatalf("unexpected: %T", obj) } @@ -424,7 +425,10 @@ func TestOperator_AutopilotCASConfiguration(t *testing.T) { } func TestOperator_OperatorServerHealth(t *testing.T) { - httpTest(t, func(srv *HTTPServer) { + cb := func(c *Config) { + c.RaftProtocol = 3 + } + httpTestWithConfig(t, func(srv *HTTPServer) { body := bytes.NewBuffer(nil) req, err := http.NewRequest("GET", "/v1/operator/autopilot/health", body) if err != nil { @@ -440,14 +444,14 @@ func TestOperator_OperatorServerHealth(t *testing.T) { if resp.Code != 200 { return false, fmt.Errorf("bad code: %d", resp.Code) } - out, ok := obj.(structs.OperatorHealthReply) + out, ok := obj.(*api.OperatorHealthReply) if !ok { return false, fmt.Errorf("unexpected: %T", obj) } if len(out.Servers) != 1 || !out.Servers[0].Healthy || out.Servers[0].Name != srv.agent.config.NodeName || - out.Servers[0].SerfStatusRaw != serf.StatusAlive || + out.Servers[0].SerfStatus != "alive" || out.FailureTolerance != 0 { return false, fmt.Errorf("bad: %v", out) } @@ -457,5 +461,45 @@ func TestOperator_OperatorServerHealth(t *testing.T) { t.Fatal(err) }) - }) + }, cb) +} + +func TestOperator_OperatorServerHealth_Unhealthy(t *testing.T) { + threshold := time.Duration(-1) + cb := func(c *Config) { + c.RaftProtocol = 3 + c.Autopilot.LastContactThreshold = &threshold + } + httpTestWithConfig(t, func(srv *HTTPServer) { + body := bytes.NewBuffer(nil) + req, err := http.NewRequest("GET", "/v1/operator/autopilot/health", body) + if err != nil { + t.Fatalf("err: %v", err) + } + + testutil.WaitForResult(func() (bool, error) { + resp := httptest.NewRecorder() + obj, err := srv.OperatorServerHealth(resp, req) + if err != nil { + return false, fmt.Errorf("err: %v", err) + } + if resp.Code != 429 { + return false, fmt.Errorf("bad code: %d", resp.Code) + } + out, ok := obj.(*api.OperatorHealthReply) + if !ok { + return false, fmt.Errorf("unexpected: %T", obj) + } + if len(out.Servers) != 1 || + out.Healthy || + out.Servers[0].Name != srv.agent.config.NodeName { + return false, fmt.Errorf("bad: %v", out) + } + + return true, nil + }, func(err error) { + t.Fatal(err) + }) + + }, cb) } diff --git a/command/operator_autopilot_set.go b/command/operator_autopilot_set.go index b4f4fe73d7..3bd8dc8596 100644 --- a/command/operator_autopilot_set.go +++ b/command/operator_autopilot_set.go @@ -3,10 +3,10 @@ package command import ( "flag" "fmt" + "github.com/hashicorp/consul/api" + "github.com/hashicorp/consul/command/base" "strings" "time" - - "github.com/hashicorp/consul/command/base" ) type OperatorAutopilotSetCommand struct { @@ -30,9 +30,9 @@ func (c *OperatorAutopilotSetCommand) Synopsis() string { func (c *OperatorAutopilotSetCommand) Run(args []string) int { var cleanupDeadServers base.BoolValue - var lastContactThresholdRaw string var maxTrailingLogs base.UintValue - var serverStabilizationTimeRaw string + var lastContactThreshold base.DurationValue + var serverStabilizationTime base.DurationValue f := c.Command.NewFlagSet(c) @@ -42,11 +42,11 @@ func (c *OperatorAutopilotSetCommand) Run(args []string) int { f.Var(&maxTrailingLogs, "max-trailing-logs", "Controls the maximum number of log entries that a server can trail the "+ "leader by before being considered unhealthy.") - f.StringVar(&lastContactThresholdRaw, "last-contact-threshold", "", + f.Var(&lastContactThreshold, "last-contact-threshold", "Controls the maximum amount of time a server can go without contact "+ "from the leader before being considered unhealthy. Must be a duration value "+ - "such as `10s`.") - f.StringVar(&serverStabilizationTimeRaw, "server-stabilization-time", "", + "such as `200ms`.") + f.Var(&serverStabilizationTime, "server-stabilization-time", "Controls the minimum amount of time a server must be stable in the "+ "'healthy' state before being added to the cluster. Only takes effect if all "+ "servers are running Raft protocol version 3 or higher. Must be a duration "+ @@ -77,25 +77,18 @@ func (c *OperatorAutopilotSetCommand) Run(args []string) int { // Update the config values based on the set flags. cleanupDeadServers.Merge(&conf.CleanupDeadServers) + trailing := uint(conf.MaxTrailingLogs) maxTrailingLogs.Merge(&trailing) conf.MaxTrailingLogs = uint64(trailing) - if lastContactThresholdRaw != "" { - dur, err := time.ParseDuration(lastContactThresholdRaw) - if err != nil { - c.Ui.Error(fmt.Sprintf("invalid value for last-contact-threshold: %v", err)) - return 1 - } - conf.LastContactThreshold = dur - } - if serverStabilizationTimeRaw != "" { - dur, err := time.ParseDuration(serverStabilizationTimeRaw) - if err != nil { - c.Ui.Error(fmt.Sprintf("invalid value for server-stabilization-time: %v", err)) - } - conf.ServerStabilizationTime = dur - } + last := time.Duration(*conf.LastContactThreshold) + lastContactThreshold.Merge(&last) + conf.LastContactThreshold = api.NewReadableDuration(last) + + stablization := time.Duration(*conf.ServerStabilizationTime) + serverStabilizationTime.Merge(&stablization) + conf.ServerStabilizationTime = api.NewReadableDuration(stablization) // Check-and-set the new configuration. result, err := operator.AutopilotCASConfiguration(conf, nil) diff --git a/command/operator_autopilot_set_test.go b/command/operator_autopilot_set_test.go index f4cab0b589..4b859aa0eb 100644 --- a/command/operator_autopilot_set_test.go +++ b/command/operator_autopilot_set_test.go @@ -7,6 +7,7 @@ import ( "github.com/hashicorp/consul/command/base" "github.com/hashicorp/consul/consul/structs" "github.com/mitchellh/cli" + "time" ) func TestOperator_Autopilot_Set_Implements(t *testing.T) { @@ -25,7 +26,13 @@ func TestOperator_Autopilot_Set(t *testing.T) { Flags: base.FlagSetHTTP, }, } - args := []string{"-http-addr=" + a1.httpAddr, "-cleanup-dead-servers=false"} + args := []string{ + "-http-addr=" + a1.httpAddr, + "-cleanup-dead-servers=false", + "-max-trailing-logs=99", + "-last-contact-threshold=123ms", + "-server-stabilization-time=123ms", + } code := c.Run(args) if code != 0 { @@ -47,4 +54,13 @@ func TestOperator_Autopilot_Set(t *testing.T) { if reply.CleanupDeadServers { t.Fatalf("bad: %#v", reply) } + if reply.MaxTrailingLogs != 99 { + t.Fatalf("bad: %#v", reply) + } + if reply.LastContactThreshold != 123*time.Millisecond { + t.Fatalf("bad: %#v", reply) + } + if reply.ServerStabilizationTime != 123*time.Millisecond { + t.Fatalf("bad: %#v", reply) + } } diff --git a/consul/autopilot.go b/consul/autopilot.go index f775448bd8..1815985fc9 100644 --- a/consul/autopilot.go +++ b/consul/autopilot.go @@ -3,6 +3,7 @@ package consul import ( "fmt" "strconv" + "sync" "time" "github.com/hashicorp/consul/consul/agent" @@ -19,70 +20,38 @@ type AutopilotPolicy interface { func (s *Server) startAutopilot() { s.autopilotShutdownCh = make(chan struct{}) + s.autopilotWaitGroup = sync.WaitGroup{} + s.autopilotWaitGroup.Add(1) - go s.serverHealthLoop() - go s.removeDeadLoop() + go s.autopilotLoop() } func (s *Server) stopAutopilot() { close(s.autopilotShutdownCh) + s.autopilotWaitGroup.Wait() } -// serverHealthLoop monitors the health of the servers in the cluster -func (s *Server) serverHealthLoop() { +// autopilotLoop periodically looks for nonvoting servers to promote and dead servers to remove. +func (s *Server) autopilotLoop() { // Monitor server health until shutdown - ticker := time.NewTicker(s.config.ServerHealthInterval) + ticker := time.NewTicker(s.config.AutopilotInterval) for { select { case <-s.autopilotShutdownCh: ticker.Stop() + s.autopilotWaitGroup.Done() return case <-ticker.C: - serverHealths := make(map[string]*structs.ServerHealth) - state := s.fsm.State() _, autopilotConf, err := state.AutopilotConfig() if err != nil { s.logger.Printf("[ERR] consul: error retrieving autopilot config: %s", err) } - // Build an updated map of server healths - for _, member := range s.LANMembers() { - if member.Status == serf.StatusLeft { - continue - } - - valid, parts := agent.IsConsulServer(member) - if valid { - health, err := s.queryServerHealth(member, parts, autopilotConf) - if err != nil { - s.logger.Printf("[ERR] consul: error fetching server health: %s", err) - } else { - serverHealths[parts.Addr.String()] = health - } - } - } - - s.autopilotLock.Lock() - s.autopilotHealth = serverHealths - s.autopilotLock.Unlock() - if err := s.autopilotPolicy.PromoteNonVoters(autopilotConf); err != nil { s.logger.Printf("[ERR] consul: error checking for non-voters to promote: %s", err) } - } - } -} -// removeDeadLoop checks for dead servers periodically, or when receiving on autopilotRemoveDeadCh -func (s *Server) removeDeadLoop() { - ticker := time.NewTicker(s.config.RemoveDeadInterval) - for { - select { - case <-s.autopilotShutdownCh: - ticker.Stop() - return - case <-ticker.C: if err := s.pruneDeadServers(); err != nil { s.logger.Printf("[ERR] consul: error checking for dead servers to remove: %s", err) } @@ -113,13 +82,18 @@ func (s *Server) pruneDeadServers() error { } } + // Nothing to remove, return early + if len(failed) == 0 { + return nil + } + peers, err := s.numPeers() if err != nil { return err } // Only do removals if a minority of servers will be affected - if len(failed) <= peers/2 { + if len(failed) < peers/2 || (len(failed) == 1 && peers >= 3) { for _, server := range failed { s.logger.Printf("[INFO] consul: Attempting removal of failed server: %v", server) go s.serfLAN.RemoveFailedNode(server) @@ -160,8 +134,8 @@ func (b *BasicAutopilot) PromoteNonVoters(autopilotConf *structs.AutopilotConfig for _, server := range raftServers { // If this server has been stable and passing for long enough, promote it to a voter if server.Suffrage == raft.Nonvoter { - health := b.server.getServerHealth(string(server.Address)) - if health != nil && health.Healthy && time.Now().Sub(health.StableSince) >= autopilotConf.ServerStabilizationTime { + health := b.server.getServerHealth(string(server.ID)) + if health.IsStable(time.Now(), autopilotConf) { promotions = append(promotions, server) } } else { @@ -210,6 +184,68 @@ func (b *BasicAutopilot) PromoteNonVoters(autopilotConf *structs.AutopilotConfig return nil } +// serverHealthLoop monitors the health of the servers in the cluster +func (s *Server) serverHealthLoop() { + // Monitor server health until shutdown + ticker := time.NewTicker(s.config.ServerHealthInterval) + for { + select { + case <-s.shutdownCh: + ticker.Stop() + return + case <-ticker.C: + serverHealths := make(map[string]*structs.ServerHealth) + + // Don't do anything if the min Raft version is too low + minRaftProtocol, err := ServerMinRaftProtocol(s.LANMembers()) + if err != nil { + s.logger.Printf("[ERR] consul: error getting server raft protocol versions: %s", err) + break + } + if minRaftProtocol < 3 { + break + } + + state := s.fsm.State() + _, autopilotConf, err := state.AutopilotConfig() + if err != nil { + s.logger.Printf("[ERR] consul: error retrieving autopilot config: %s", err) + break + } + // Bail early if autopilot config hasn't been initialized yet + if autopilotConf == nil { + break + } + + // Build an updated map of server healths + for _, member := range s.LANMembers() { + if member.Status == serf.StatusLeft { + continue + } + + valid, parts := agent.IsConsulServer(member) + if valid { + health, err := s.queryServerHealth(member, parts, autopilotConf) + if err != nil { + s.logger.Printf("[ERR] consul: error fetching server health: %s", err) + serverHealths[parts.ID] = &structs.ServerHealth{ + ID: parts.ID, + Name: parts.Name, + Healthy: false, + } + } else { + serverHealths[parts.ID] = health + } + } + } + + s.serverHealthLock.Lock() + s.serverHealths = serverHealths + s.serverHealthLock.Unlock() + } + } +} + // queryServerHealth fetches the raft stats for the given server and uses them // to update its ServerHealth func (s *Server) queryServerHealth(member serf.Member, server *agent.Server, @@ -220,18 +256,16 @@ func (s *Server) queryServerHealth(member serf.Member, server *agent.Server, } health := &structs.ServerHealth{ - ID: server.ID, - Name: server.Name, - SerfStatusRaw: member.Status, - SerfStatus: member.Status.String(), - LastContactRaw: -1, - LastContact: stats.LastContact, - LastTerm: stats.LastTerm, - LastIndex: stats.LastIndex, + ID: server.ID, + Name: server.Name, + SerfStatus: member.Status, + LastContact: -1, + LastTerm: stats.LastTerm, + LastIndex: stats.LastIndex, } - if health.LastContact != "never" { - health.LastContactRaw, err = time.ParseDuration(health.LastContact) + if stats.LastContact != "never" { + health.LastContact, err = time.ParseDuration(stats.LastContact) if err != nil { return nil, fmt.Errorf("error parsing last_contact duration: %s", err) } @@ -239,14 +273,17 @@ func (s *Server) queryServerHealth(member serf.Member, server *agent.Server, // Set LastContact to 0 for the leader if s.config.NodeName == member.Name { - health.LastContactRaw = 0 - health.LastContact = "leader" + health.LastContact = 0 } - health.Healthy = s.isServerHealthy(health, autopilotConf) + lastTerm, err := strconv.ParseUint(s.raft.Stats()["last_log_term"], 10, 64) + if err != nil { + return nil, fmt.Errorf("error parsing last_log_term: %s", err) + } + health.Healthy = health.IsHealthy(lastTerm, s.raft.LastIndex(), autopilotConf) // If this is a new server or the health changed, reset StableSince - lastHealth := s.getServerHealth(server.Addr.String()) + lastHealth := s.getServerHealth(server.ID) if lastHealth == nil || lastHealth.Healthy != health.Healthy { health.StableSince = time.Now() } else { @@ -256,10 +293,10 @@ func (s *Server) queryServerHealth(member serf.Member, server *agent.Server, return health, nil } -func (s *Server) getServerHealth(addr string) *structs.ServerHealth { - s.autopilotLock.RLock() - defer s.autopilotLock.RUnlock() - h, ok := s.autopilotHealth[addr] +func (s *Server) getServerHealth(id string) *structs.ServerHealth { + s.serverHealthLock.RLock() + defer s.serverHealthLock.RUnlock() + h, ok := s.serverHealths[id] if !ok { return nil } @@ -272,27 +309,3 @@ func (s *Server) getServerStats(server *agent.Server) (structs.ServerStats, erro err := s.connPool.RPC(s.config.Datacenter, server.Addr, server.Version, "Status.RaftStats", &args, &reply) return reply, err } - -// isServerHealthy determines whether the given ServerHealth is healthy -// based on the current Autopilot config -func (s *Server) isServerHealthy(health *structs.ServerHealth, autopilotConf *structs.AutopilotConfig) bool { - if health.SerfStatusRaw != serf.StatusAlive { - return false - } - - if health.LastContactRaw > autopilotConf.LastContactThreshold || health.LastContactRaw < 0 { - return false - } - - lastTerm, _ := strconv.ParseUint(s.raft.Stats()["last_log_term"], 10, 64) - if health.LastTerm != lastTerm { - return false - } - - if s.raft.LastIndex() > autopilotConf.MaxTrailingLogs && - health.LastIndex < s.raft.LastIndex()-autopilotConf.MaxTrailingLogs { - return false - } - - return true -} diff --git a/consul/autopilot_test.go b/consul/autopilot_test.go index ae38f3f3eb..034197208f 100644 --- a/consul/autopilot_test.go +++ b/consul/autopilot_test.go @@ -85,7 +85,7 @@ func TestAutopilot_CleanupDeadServerPeriodic(t *testing.T) { dir1, s1 := testServerWithConfig(t, func(c *Config) { c.Datacenter = "dc1" c.Bootstrap = true - c.RemoveDeadInterval = 100 * time.Millisecond + c.AutopilotInterval = 100 * time.Millisecond }) defer os.RemoveAll(dir1) defer s1.Shutdown() @@ -144,6 +144,7 @@ func TestAutopilot_PromoteNonVoter(t *testing.T) { c.RaftConfig.ProtocolVersion = 3 c.AutopilotConfig.ServerStabilizationTime = 200 * time.Millisecond c.ServerHealthInterval = 100 * time.Millisecond + c.AutopilotInterval = 100 * time.Millisecond }) defer os.RemoveAll(dir1) defer s1.Shutdown() @@ -182,7 +183,7 @@ func TestAutopilot_PromoteNonVoter(t *testing.T) { if servers[1].Suffrage != raft.Nonvoter { return false, fmt.Errorf("bad: %v", servers) } - health := s1.getServerHealth(string(servers[1].Address)) + health := s1.getServerHealth(string(servers[1].ID)) if health == nil { return false, fmt.Errorf("nil health") } diff --git a/consul/config.go b/consul/config.go index 0d758a322c..f64567a038 100644 --- a/consul/config.go +++ b/consul/config.go @@ -280,13 +280,14 @@ type Config struct { // bootstrapping. AutopilotConfig *structs.AutopilotConfig - // ServerHealthInterval is the frequency with which the leader will check - // the health of the servers in the cluster + // ServerHealthInterval is the frequency with which the health of the + // servers in the cluster will be updated. ServerHealthInterval time.Duration - // RemoveDeadInterval is the frequency with which the leader will look for - // dead servers to remove from the cluster - RemoveDeadInterval time.Duration + // AutopilotInterval is the frequency with which the leader will perform + // autopilot tasks, such as promoting eligible non-voters and removing + // dead servers. + AutopilotInterval time.Duration } // CheckVersion is used to check if the ProtocolVersion is valid @@ -366,8 +367,8 @@ func DefaultConfig() *Config { MaxTrailingLogs: 250, ServerStabilizationTime: 10 * time.Second, }, - ServerHealthInterval: 1 * time.Second, - RemoveDeadInterval: 30 * time.Second, + ServerHealthInterval: 2 * time.Second, + AutopilotInterval: 10 * time.Second, } // Increase our reap interval to 3 days instead of 24h. diff --git a/consul/operator_endpoint.go b/consul/operator_endpoint.go index e30a1f434f..16d8b75dc0 100644 --- a/consul/operator_endpoint.go +++ b/consul/operator_endpoint.go @@ -8,7 +8,6 @@ import ( "github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/raft" "github.com/hashicorp/serf/serf" - "time" ) // Operator endpoint is used to perform low-level operator tasks for Consul. @@ -204,9 +203,16 @@ func (op *Operator) ServerHealth(args *structs.DCSpecificRequest, reply *structs return permissionDeniedErr } - status := structs.OperatorHealthReply{ - Healthy: true, + // Exit early if the min Raft version is too low + minRaftProtocol, err := ServerMinRaftProtocol(op.srv.LANMembers()) + if err != nil { + return fmt.Errorf("error getting server raft protocol versions: %s", err) } + if minRaftProtocol < 3 { + return fmt.Errorf("all servers must have raft_protocol set to 3 or higher to use this endpoint") + } + + var status structs.OperatorHealthReply future := op.srv.raft.GetConfiguration() if err := future.Error(); err != nil { return err @@ -215,19 +221,15 @@ func (op *Operator) ServerHealth(args *structs.DCSpecificRequest, reply *structs healthyCount := 0 servers := future.Configuration().Servers for _, s := range servers { - health := op.srv.getServerHealth(string(s.Address)) + health := op.srv.getServerHealth(string(s.ID)) if health != nil { - // Fix up StableSince to be more readable - health.StableSince = health.StableSince.Round(time.Second).UTC() - - if !health.Healthy { - status.Healthy = false - } else { + if health.Healthy { healthyCount++ } status.Servers = append(status.Servers, *health) } } + status.Healthy = healthyCount == len(servers) // If we have extra healthy servers, set FailureTolerance if healthyCount > len(servers)/2+1 { diff --git a/consul/operator_endpoint_test.go b/consul/operator_endpoint_test.go index a762ba6b86..d430bd28fb 100644 --- a/consul/operator_endpoint_test.go +++ b/consul/operator_endpoint_test.go @@ -11,6 +11,7 @@ import ( "github.com/hashicorp/consul/testutil" "github.com/hashicorp/net-rpc-msgpackrpc" "github.com/hashicorp/raft" + "time" ) func TestOperator_RaftGetConfiguration(t *testing.T) { @@ -428,16 +429,11 @@ func TestOperator_Autopilot_SetConfiguration_ACLDeny(t *testing.T) { } func TestOperator_ServerHealth(t *testing.T) { - for i := 1; i <= 3; i++ { - testServerHealth(t, i) - } -} - -func testServerHealth(t *testing.T, protocol int) { dir1, s1 := testServerWithConfig(t, func(c *Config) { c.Datacenter = "dc1" c.Bootstrap = true - c.RaftConfig.ProtocolVersion = raft.ProtocolVersion(protocol) + c.RaftConfig.ProtocolVersion = 3 + c.ServerHealthInterval = 100 * time.Millisecond }) defer os.RemoveAll(dir1) defer s1.Shutdown() @@ -447,7 +443,7 @@ func testServerHealth(t *testing.T, protocol int) { dir2, s2 := testServerWithConfig(t, func(c *Config) { c.Datacenter = "dc1" c.Bootstrap = false - c.RaftConfig.ProtocolVersion = raft.ProtocolVersion(protocol) + c.RaftConfig.ProtocolVersion = 3 }) defer os.RemoveAll(dir2) defer s2.Shutdown() @@ -460,7 +456,7 @@ func testServerHealth(t *testing.T, protocol int) { dir3, s3 := testServerWithConfig(t, func(c *Config) { c.Datacenter = "dc1" c.Bootstrap = false - c.RaftConfig.ProtocolVersion = raft.ProtocolVersion(protocol) + c.RaftConfig.ProtocolVersion = 3 }) defer os.RemoveAll(dir3) defer s3.Shutdown() @@ -488,13 +484,13 @@ func testServerHealth(t *testing.T, protocol int) { if len(reply.Servers) != 3 { return false, fmt.Errorf("bad: %v", reply) } - if reply.Servers[0].LastContact != "leader" { + if reply.Servers[0].LastContact != 0 { return false, fmt.Errorf("bad: %v", reply) } - if reply.Servers[1].LastContactRaw <= 0 { + if reply.Servers[1].LastContact <= 0 { return false, fmt.Errorf("bad: %v", reply) } - if reply.Servers[2].LastContactRaw <= 0 { + if reply.Servers[2].LastContact <= 0 { return false, fmt.Errorf("bad: %v", reply) } return true, nil @@ -502,3 +498,24 @@ func testServerHealth(t *testing.T, protocol int) { t.Fatal(err) }) } + +func TestOperator_ServerHealth_UnsupportedRaftVersion(t *testing.T) { + dir1, s1 := testServerWithConfig(t, func(c *Config) { + c.Datacenter = "dc1" + c.Bootstrap = true + c.RaftConfig.ProtocolVersion = 2 + }) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + codec := rpcClient(t, s1) + defer codec.Close() + + arg := structs.DCSpecificRequest{ + Datacenter: "dc1", + } + var reply structs.OperatorHealthReply + err := msgpackrpc.CallWithCodec(codec, "Operator.ServerHealth", &arg, &reply) + if err == nil || !strings.Contains(err.Error(), "raft_protocol set to 3 or higher") { + t.Fatalf("bad: %v", err) + } +} diff --git a/consul/server.go b/consul/server.go index e46fedd9ff..112b9aff43 100644 --- a/consul/server.go +++ b/consul/server.go @@ -76,10 +76,6 @@ type Server struct { // aclCache is the non-authoritative ACL cache. aclCache *aclCache - // autopilotHealth stores the current view of server healths. - autopilotHealth map[string]*structs.ServerHealth - autopilotLock sync.RWMutex - // autopilotPolicy controls the behavior of Autopilot for certain tasks. autopilotPolicy AutopilotPolicy @@ -89,6 +85,9 @@ type Server struct { // autopilotShutdownCh is used to stop the Autopilot loop. autopilotShutdownCh chan struct{} + // autopilotWaitGroup is used to block until Autopilot shuts down. + autopilotWaitGroup sync.WaitGroup + // Consul configuration config *Config @@ -158,6 +157,10 @@ type Server struct { sessionTimers map[string]*time.Timer sessionTimersLock sync.Mutex + // serverHealths stores the current view of server healths. + serverHealths map[string]*structs.ServerHealth + serverHealthLock sync.RWMutex + // tombstoneGC is used to track the pending GC invocations // for the KV tombstones tombstoneGC *state.TombstoneGC @@ -315,6 +318,9 @@ func NewServer(config *Config) (*Server, error) { // Start the metrics handlers. go s.sessionStats() + // Start the server health checking. + go s.serverHealthLoop() + return s, nil } diff --git a/consul/structs/operator.go b/consul/structs/operator.go index 0ebeac8d3e..f17cd8f418 100644 --- a/consul/structs/operator.go +++ b/consul/structs/operator.go @@ -112,12 +112,10 @@ type ServerHealth struct { Name string // The status of the SerfHealth check for the server. - SerfStatusRaw serf.MemberStatus `json:"-"` - SerfStatus string + SerfStatus serf.MemberStatus // LastContact is the time since this node's last contact with the leader. - LastContactRaw time.Duration `json:"-"` - LastContact string + LastContact time.Duration // LastTerm is the highest leader term this server has a record of in its Raft log. LastTerm uint64 @@ -129,10 +127,50 @@ type ServerHealth struct { // Autopilot config. Healthy bool - // StableSince is the amount of time since this server's Healthy value last changed. + // StableSince is the last time this server's Healthy value changed. StableSince time.Time } +// IsHealthy determines whether this ServerHealth is considered healthy +// based on the given Autopilot config +func (h *ServerHealth) IsHealthy(lastTerm uint64, lastIndex uint64, autopilotConf *AutopilotConfig) bool { + if h.SerfStatus != serf.StatusAlive { + return false + } + + if h.LastContact > autopilotConf.LastContactThreshold || h.LastContact < 0 { + return false + } + + if h.LastTerm != lastTerm { + return false + } + + if lastIndex > autopilotConf.MaxTrailingLogs && h.LastIndex < lastIndex-autopilotConf.MaxTrailingLogs { + return false + } + + return true +} + +// IsStable returns true if the ServerHealth is in a stable, passing state +// according to the given AutopilotConfig +func (h *ServerHealth) IsStable(now time.Time, conf *AutopilotConfig) bool { + if h == nil { + return false + } + + if !h.Healthy { + return false + } + + if now.Sub(h.StableSince) < conf.ServerStabilizationTime { + return false + } + + return true +} + // ServerStats holds miscellaneous Raft metrics for a server type ServerStats struct { // LastContact is the time since this node's last contact with the leader. diff --git a/consul/structs/operator_test.go b/consul/structs/operator_test.go new file mode 100644 index 0000000000..8a0916f113 --- /dev/null +++ b/consul/structs/operator_test.go @@ -0,0 +1,94 @@ +package structs + +import ( + "testing" + "time" + + "github.com/hashicorp/serf/serf" +) + +func TestServerHealth_IsHealthy(t *testing.T) { + cases := []struct { + health ServerHealth + lastTerm uint64 + lastIndex uint64 + conf AutopilotConfig + expected bool + }{ + // Healthy server, all values within allowed limits + { + health: ServerHealth{SerfStatus: serf.StatusAlive, LastTerm: 1, LastIndex: 0}, + lastTerm: 1, + lastIndex: 10, + conf: AutopilotConfig{MaxTrailingLogs: 20}, + expected: true, + }, + // Serf status failed + { + health: ServerHealth{SerfStatus: serf.StatusFailed}, + expected: false, + }, + // Old value for lastTerm + { + health: ServerHealth{SerfStatus: serf.StatusAlive, LastTerm: 0}, + lastTerm: 1, + expected: false, + }, + // Too far behind on logs + { + health: ServerHealth{SerfStatus: serf.StatusAlive, LastIndex: 0}, + lastIndex: 10, + conf: AutopilotConfig{MaxTrailingLogs: 5}, + expected: false, + }, + } + + for index, tc := range cases { + actual := tc.health.IsHealthy(tc.lastTerm, tc.lastIndex, &tc.conf) + if actual != tc.expected { + t.Fatalf("bad value for case %d: %v", index, actual) + } + } +} + +func TestServerHealth_IsStable(t *testing.T) { + start := time.Now() + cases := []struct { + health *ServerHealth + now time.Time + conf AutopilotConfig + expected bool + }{ + // Healthy server, all values within allowed limits + { + health: &ServerHealth{Healthy: true, StableSince: start}, + now: start.Add(15 * time.Second), + conf: AutopilotConfig{ServerStabilizationTime: 10 * time.Second}, + expected: true, + }, + // Unhealthy server + { + health: &ServerHealth{Healthy: false}, + expected: false, + }, + // Healthy server, hasn't reached stabilization time + { + health: &ServerHealth{Healthy: true, StableSince: start}, + now: start.Add(5 * time.Second), + conf: AutopilotConfig{ServerStabilizationTime: 10 * time.Second}, + expected: false, + }, + // Nil struct + { + health: nil, + expected: false, + }, + } + + for index, tc := range cases { + actual := tc.health.IsStable(tc.now, &tc.conf) + if actual != tc.expected { + t.Fatalf("bad value for case %d: %v", index, actual) + } + } +} diff --git a/testutil/server.go b/testutil/server.go index bdbf611782..ad350c01f0 100644 --- a/testutil/server.go +++ b/testutil/server.go @@ -65,6 +65,7 @@ type TestServerConfig struct { Bind string `json:"bind_addr,omitempty"` Addresses *TestAddressConfig `json:"addresses,omitempty"` Ports *TestPortConfig `json:"ports,omitempty"` + RaftProtocol int `json:"raft_protocol,omitempty"` ACLMasterToken string `json:"acl_master_token,omitempty"` ACLDatacenter string `json:"acl_datacenter,omitempty"` ACLDefaultPolicy string `json:"acl_default_policy,omitempty"` diff --git a/website/source/docs/agent/http/operator.html.markdown b/website/source/docs/agent/http/operator.html.markdown index d9aac210fb..dabc9980ac 100644 --- a/website/source/docs/agent/http/operator.html.markdown +++ b/website/source/docs/agent/http/operator.html.markdown @@ -289,9 +289,9 @@ A JSON body is returned that looks like this: ```javascript { "CleanupDeadServers": true, - "LastContactThreshold": 200000000, + "LastContactThreshold": "200ms", "MaxTrailingLogs": 250, - "ServerStabilizationTime": 10000000000, + "ServerStabilizationTime": "10s", "CreateIndex": 4, "ModifyIndex": 4 } @@ -318,9 +318,9 @@ body must look like: ```javascript { "CleanupDeadServers": true, - "LastContactThreshold": 200000000, + "LastContactThreshold": "200ms", "MaxTrailingLogs": 250, - "ServerStabilizationTime": 10000000000, + "ServerStabilizationTime": "10s", "CreateIndex": 4, "ModifyIndex": 4 } @@ -361,7 +361,7 @@ A JSON body is returned that looks like this: "ID": "e349749b-3303-3ddf-959c-b5885a0e1f6e", "Name": "node1", "SerfStatus": "alive", - "LastContact": "leader", + "LastContact": "0s", "LastTerm": 2, "LastIndex": 46, "Healthy": true, diff --git a/website/source/docs/agent/options.html.markdown b/website/source/docs/agent/options.html.markdown index 7c53eb4117..4b85c47be1 100644 --- a/website/source/docs/agent/options.html.markdown +++ b/website/source/docs/agent/options.html.markdown @@ -562,7 +562,8 @@ Consul will not enable TLS for the HTTP API unless the `https` port has been ass The following sub-keys are available: * `cleanup_dead_servers` - This controls - the automatic removal of dead server nodes whenever a new server is added to the cluster. Defaults to `true`. + the automatic removal of dead server nodes periodically and whenever a new server is added to the cluster. + Defaults to `true`. * `last_contact_threshold` - Controls the maximum amount of time a server can go without contact from the leader before being considered unhealthy. @@ -575,7 +576,7 @@ Consul will not enable TLS for the HTTP API unless the `https` port has been ass * `server_stabilization_time` - Controls the minimum amount of time a server must be stable in the 'healthy' state before being added to the cluster. Only takes effect if all servers are running Raft protocol version 3 or higher. Must be a duration value - such as `10s`. Defaults to `30s`. + such as `30s`. Defaults to `10s`. * `bootstrap` Equivalent to the [`-bootstrap` command-line flag](#_bootstrap). diff --git a/website/source/docs/commands/operator/autopilot.html.markdown.erb b/website/source/docs/commands/operator/autopilot.html.markdown.erb index a2e00c267e..f61e872b15 100644 --- a/website/source/docs/commands/operator/autopilot.html.markdown.erb +++ b/website/source/docs/commands/operator/autopilot.html.markdown.erb @@ -62,7 +62,7 @@ Usage: `consul operator autopilot set-config [options]` upon the successful joining of new servers to the cluster. Must be one of `[true|false]`. * `last-contact-threshold` - Controls the maximum amount of time a server can go without contact -from the leader before being considered unhealthy. Must be a duration value such as `10s`. +from the leader before being considered unhealthy. Must be a duration value such as `200ms`. * `max-trailing-logs` - Controls the maximum number of log entries that a server can trail the leader by before being considered unhealthy.