mirror of
https://github.com/status-im/consul.git
synced 2025-01-10 13:55:55 +00:00
Cleaned up and reorganized some autopilot-related code
This commit is contained in:
parent
ab0e412db4
commit
9b4497de09
100
api/operator.go
100
api/operator.go
@ -82,7 +82,7 @@ type AutopilotConfiguration struct {
|
||||
|
||||
// LastContactThreshold is the limit on the amount of time a server can go
|
||||
// without leader contact before being considered unhealthy.
|
||||
LastContactThreshold time.Duration
|
||||
LastContactThreshold *ReadableDuration
|
||||
|
||||
// MaxTrailingLogs is the amount of entries in the Raft Log that a server can
|
||||
// be behind before being considered unhealthy.
|
||||
@ -91,7 +91,7 @@ type AutopilotConfiguration struct {
|
||||
// ServerStabilizationTime is the minimum amount of time a server must be
|
||||
// in a stable, healthy state before it can be added to the cluster. Only
|
||||
// applicable with Raft protocol version 3 or higher.
|
||||
ServerStabilizationTime time.Duration
|
||||
ServerStabilizationTime *ReadableDuration
|
||||
|
||||
// CreateIndex holds the index corresponding the creation of this configuration.
|
||||
// This is a read-only field.
|
||||
@ -104,6 +104,84 @@ type AutopilotConfiguration struct {
|
||||
ModifyIndex uint64
|
||||
}
|
||||
|
||||
// ServerHealth is the health (from the leader's point of view) of a server.
|
||||
type ServerHealth struct {
|
||||
// ID is the raft ID of the server.
|
||||
ID string
|
||||
|
||||
// Name is the node name of the server.
|
||||
Name string
|
||||
|
||||
// The status of the SerfHealth check for the server.
|
||||
SerfStatus string
|
||||
|
||||
// LastContact is the time since this node's last contact with the leader.
|
||||
LastContact *ReadableDuration
|
||||
|
||||
// LastTerm is the highest leader term this server has a record of in its Raft log.
|
||||
LastTerm uint64
|
||||
|
||||
// LastIndex is the last log index this server has a record of in its Raft log.
|
||||
LastIndex uint64
|
||||
|
||||
// Healthy is whether or not the server is healthy according to the current
|
||||
// Autopilot config.
|
||||
Healthy bool
|
||||
|
||||
// StableSince is the last time this server's Healthy value changed.
|
||||
StableSince time.Time
|
||||
}
|
||||
|
||||
// OperatorHealthReply is a representation of the overall health of the cluster
|
||||
type OperatorHealthReply struct {
|
||||
// Healthy is true if all the servers in the cluster are healthy.
|
||||
Healthy bool
|
||||
|
||||
// FailureTolerance is the number of healthy servers that could be lost without
|
||||
// an outage occurring.
|
||||
FailureTolerance int
|
||||
|
||||
// Servers holds the health of each server.
|
||||
Servers []ServerHealth
|
||||
}
|
||||
|
||||
// ReadableDuration is a duration type that is serialized to JSON in human readable format.
|
||||
type ReadableDuration time.Duration
|
||||
|
||||
func NewReadableDuration(dur time.Duration) *ReadableDuration {
|
||||
d := ReadableDuration(dur)
|
||||
return &d
|
||||
}
|
||||
|
||||
func (d *ReadableDuration) String() string { return d.Duration().String() }
|
||||
func (d *ReadableDuration) Duration() time.Duration {
|
||||
if d == nil {
|
||||
return time.Duration(0)
|
||||
}
|
||||
return time.Duration(*d)
|
||||
}
|
||||
|
||||
func (d *ReadableDuration) MarshalJSON() ([]byte, error) {
|
||||
return []byte(fmt.Sprintf(`"%s"`, d.Duration().String())), nil
|
||||
}
|
||||
|
||||
func (d *ReadableDuration) UnmarshalJSON(raw []byte) error {
|
||||
if d == nil {
|
||||
return fmt.Errorf("cannot unmarshal to nil pointer")
|
||||
}
|
||||
|
||||
str := string(raw)
|
||||
if len(str) < 2 || str[0] != '"' || str[len(str)-1] != '"' {
|
||||
return fmt.Errorf("must be enclosed with quotes: %s", str)
|
||||
}
|
||||
dur, err := time.ParseDuration(str[1 : len(str)-1])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
*d = ReadableDuration(dur)
|
||||
return nil
|
||||
}
|
||||
|
||||
// RaftGetConfiguration is used to query the current Raft peer set.
|
||||
func (op *Operator) RaftGetConfiguration(q *QueryOptions) (*RaftConfiguration, error) {
|
||||
r := op.c.newRequest("GET", "/v1/operator/raft/configuration")
|
||||
@ -217,6 +295,7 @@ func (op *Operator) AutopilotGetConfiguration(q *QueryOptions) (*AutopilotConfig
|
||||
if err := decodeBody(resp, &out); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &out, nil
|
||||
}
|
||||
|
||||
@ -255,3 +334,20 @@ func (op *Operator) AutopilotCASConfiguration(conf *AutopilotConfiguration, q *W
|
||||
|
||||
return res, nil
|
||||
}
|
||||
|
||||
// AutopilotServerHealth
|
||||
func (op *Operator) AutopilotServerHealth(q *QueryOptions) (*OperatorHealthReply, error) {
|
||||
r := op.c.newRequest("GET", "/v1/operator/autopilot/health")
|
||||
r.setQueryOptions(q)
|
||||
_, resp, err := requireOK(op.c.doRequest(r))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
var out OperatorHealthReply
|
||||
if err := decodeBody(resp, &out); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &out, nil
|
||||
}
|
||||
|
@ -1,6 +1,7 @@
|
||||
package api
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
@ -178,3 +179,28 @@ func TestOperator_AutopilotCASConfiguration(t *testing.T) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestOperator_ServerHealth(t *testing.T) {
|
||||
t.Parallel()
|
||||
c, s := makeClientWithConfig(t, nil, func(c *testutil.TestServerConfig) {
|
||||
c.RaftProtocol = 3
|
||||
})
|
||||
defer s.Stop()
|
||||
|
||||
operator := c.Operator()
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
out, err := operator.AutopilotServerHealth(nil)
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("err: %v", err)
|
||||
}
|
||||
if len(out.Servers) != 1 ||
|
||||
!out.Servers[0].Healthy ||
|
||||
out.Servers[0].Name != s.Config.NodeName {
|
||||
return false, fmt.Errorf("bad: %v", out)
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}, func(err error) {
|
||||
t.Fatal(err)
|
||||
})
|
||||
}
|
||||
|
@ -4,10 +4,13 @@ import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/api"
|
||||
"github.com/hashicorp/consul/consul/structs"
|
||||
multierror "github.com/hashicorp/go-multierror"
|
||||
"github.com/hashicorp/raft"
|
||||
"strings"
|
||||
)
|
||||
|
||||
// OperatorRaftConfiguration is used to inspect the current Raft configuration.
|
||||
@ -183,12 +186,35 @@ func (s *HTTPServer) OperatorAutopilotConfiguration(resp http.ResponseWriter, re
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return reply, nil
|
||||
out := api.AutopilotConfiguration{
|
||||
CleanupDeadServers: reply.CleanupDeadServers,
|
||||
LastContactThreshold: api.NewReadableDuration(reply.LastContactThreshold),
|
||||
MaxTrailingLogs: reply.MaxTrailingLogs,
|
||||
ServerStabilizationTime: api.NewReadableDuration(reply.ServerStabilizationTime),
|
||||
CreateIndex: reply.CreateIndex,
|
||||
ModifyIndex: reply.ModifyIndex,
|
||||
}
|
||||
|
||||
return out, nil
|
||||
case "PUT":
|
||||
var args structs.AutopilotSetConfigRequest
|
||||
s.parseDC(req, &args.Datacenter)
|
||||
s.parseToken(req, &args.Token)
|
||||
|
||||
var conf api.AutopilotConfiguration
|
||||
if err := decodeBody(req, &conf, FixupConfigDurations); err != nil {
|
||||
resp.WriteHeader(400)
|
||||
resp.Write([]byte(fmt.Sprintf("Error parsing autopilot config: %v", err)))
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
args.Config = structs.AutopilotConfig{
|
||||
CleanupDeadServers: conf.CleanupDeadServers,
|
||||
LastContactThreshold: conf.LastContactThreshold.Duration(),
|
||||
MaxTrailingLogs: conf.MaxTrailingLogs,
|
||||
ServerStabilizationTime: conf.ServerStabilizationTime.Duration(),
|
||||
}
|
||||
|
||||
// Check for cas value
|
||||
params := req.URL.Query()
|
||||
if _, ok := params["cas"]; ok {
|
||||
@ -202,12 +228,6 @@ func (s *HTTPServer) OperatorAutopilotConfiguration(resp http.ResponseWriter, re
|
||||
args.CAS = true
|
||||
}
|
||||
|
||||
if err := decodeBody(req, &args.Config, nil); err != nil {
|
||||
resp.WriteHeader(400)
|
||||
resp.Write([]byte(fmt.Sprintf("Error parsing autopilot config: %v", err)))
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
var reply bool
|
||||
if err := s.agent.RPC("Operator.AutopilotSetConfiguration", &args, &reply); err != nil {
|
||||
return nil, err
|
||||
@ -225,6 +245,29 @@ func (s *HTTPServer) OperatorAutopilotConfiguration(resp http.ResponseWriter, re
|
||||
}
|
||||
}
|
||||
|
||||
// FixupConfigDurations is used to handle parsing the duration fields in
|
||||
// the Autopilot config struct
|
||||
func FixupConfigDurations(raw interface{}) error {
|
||||
rawMap, ok := raw.(map[string]interface{})
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
for key, val := range rawMap {
|
||||
if strings.ToLower(key) == "lastcontactthreshold" ||
|
||||
strings.ToLower(key) == "serverstabilizationtime" {
|
||||
// Convert a string value into an integer
|
||||
if vStr, ok := val.(string); ok {
|
||||
dur, err := time.ParseDuration(vStr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
rawMap[key] = dur
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// OperatorServerHealth is used to get the health of the servers in the local DC
|
||||
func (s *HTTPServer) OperatorServerHealth(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
||||
if req.Method != "GET" {
|
||||
@ -247,5 +290,22 @@ func (s *HTTPServer) OperatorServerHealth(resp http.ResponseWriter, req *http.Re
|
||||
resp.WriteHeader(http.StatusTooManyRequests)
|
||||
}
|
||||
|
||||
return reply, nil
|
||||
out := &api.OperatorHealthReply{
|
||||
Healthy: reply.Healthy,
|
||||
FailureTolerance: reply.FailureTolerance,
|
||||
}
|
||||
for _, server := range reply.Servers {
|
||||
out.Servers = append(out.Servers, api.ServerHealth{
|
||||
ID: server.ID,
|
||||
Name: server.Name,
|
||||
SerfStatus: server.SerfStatus.String(),
|
||||
LastContact: api.NewReadableDuration(server.LastContact),
|
||||
LastTerm: server.LastTerm,
|
||||
LastIndex: server.LastIndex,
|
||||
Healthy: server.Healthy,
|
||||
StableSince: server.StableSince.Round(time.Second).UTC(),
|
||||
})
|
||||
}
|
||||
|
||||
return out, nil
|
||||
}
|
||||
|
@ -7,10 +7,11 @@ import (
|
||||
"net/http/httptest"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/api"
|
||||
"github.com/hashicorp/consul/consul/structs"
|
||||
"github.com/hashicorp/consul/testutil"
|
||||
"github.com/hashicorp/serf/serf"
|
||||
)
|
||||
|
||||
func TestOperator_OperatorRaftConfiguration(t *testing.T) {
|
||||
@ -304,7 +305,7 @@ func TestOperator_AutopilotGetConfiguration(t *testing.T) {
|
||||
if resp.Code != 200 {
|
||||
t.Fatalf("bad code: %d", resp.Code)
|
||||
}
|
||||
out, ok := obj.(structs.AutopilotConfig)
|
||||
out, ok := obj.(api.AutopilotConfiguration)
|
||||
if !ok {
|
||||
t.Fatalf("unexpected: %T", obj)
|
||||
}
|
||||
@ -424,7 +425,10 @@ func TestOperator_AutopilotCASConfiguration(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestOperator_OperatorServerHealth(t *testing.T) {
|
||||
httpTest(t, func(srv *HTTPServer) {
|
||||
cb := func(c *Config) {
|
||||
c.RaftProtocol = 3
|
||||
}
|
||||
httpTestWithConfig(t, func(srv *HTTPServer) {
|
||||
body := bytes.NewBuffer(nil)
|
||||
req, err := http.NewRequest("GET", "/v1/operator/autopilot/health", body)
|
||||
if err != nil {
|
||||
@ -440,14 +444,14 @@ func TestOperator_OperatorServerHealth(t *testing.T) {
|
||||
if resp.Code != 200 {
|
||||
return false, fmt.Errorf("bad code: %d", resp.Code)
|
||||
}
|
||||
out, ok := obj.(structs.OperatorHealthReply)
|
||||
out, ok := obj.(*api.OperatorHealthReply)
|
||||
if !ok {
|
||||
return false, fmt.Errorf("unexpected: %T", obj)
|
||||
}
|
||||
if len(out.Servers) != 1 ||
|
||||
!out.Servers[0].Healthy ||
|
||||
out.Servers[0].Name != srv.agent.config.NodeName ||
|
||||
out.Servers[0].SerfStatusRaw != serf.StatusAlive ||
|
||||
out.Servers[0].SerfStatus != "alive" ||
|
||||
out.FailureTolerance != 0 {
|
||||
return false, fmt.Errorf("bad: %v", out)
|
||||
}
|
||||
@ -457,5 +461,45 @@ func TestOperator_OperatorServerHealth(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
})
|
||||
|
||||
})
|
||||
}, cb)
|
||||
}
|
||||
|
||||
func TestOperator_OperatorServerHealth_Unhealthy(t *testing.T) {
|
||||
threshold := time.Duration(-1)
|
||||
cb := func(c *Config) {
|
||||
c.RaftProtocol = 3
|
||||
c.Autopilot.LastContactThreshold = &threshold
|
||||
}
|
||||
httpTestWithConfig(t, func(srv *HTTPServer) {
|
||||
body := bytes.NewBuffer(nil)
|
||||
req, err := http.NewRequest("GET", "/v1/operator/autopilot/health", body)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
resp := httptest.NewRecorder()
|
||||
obj, err := srv.OperatorServerHealth(resp, req)
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("err: %v", err)
|
||||
}
|
||||
if resp.Code != 429 {
|
||||
return false, fmt.Errorf("bad code: %d", resp.Code)
|
||||
}
|
||||
out, ok := obj.(*api.OperatorHealthReply)
|
||||
if !ok {
|
||||
return false, fmt.Errorf("unexpected: %T", obj)
|
||||
}
|
||||
if len(out.Servers) != 1 ||
|
||||
out.Healthy ||
|
||||
out.Servers[0].Name != srv.agent.config.NodeName {
|
||||
return false, fmt.Errorf("bad: %v", out)
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}, func(err error) {
|
||||
t.Fatal(err)
|
||||
})
|
||||
|
||||
}, cb)
|
||||
}
|
||||
|
@ -3,10 +3,10 @@ package command
|
||||
import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"github.com/hashicorp/consul/api"
|
||||
"github.com/hashicorp/consul/command/base"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/command/base"
|
||||
)
|
||||
|
||||
type OperatorAutopilotSetCommand struct {
|
||||
@ -30,9 +30,9 @@ func (c *OperatorAutopilotSetCommand) Synopsis() string {
|
||||
|
||||
func (c *OperatorAutopilotSetCommand) Run(args []string) int {
|
||||
var cleanupDeadServers base.BoolValue
|
||||
var lastContactThresholdRaw string
|
||||
var maxTrailingLogs base.UintValue
|
||||
var serverStabilizationTimeRaw string
|
||||
var lastContactThreshold base.DurationValue
|
||||
var serverStabilizationTime base.DurationValue
|
||||
|
||||
f := c.Command.NewFlagSet(c)
|
||||
|
||||
@ -42,11 +42,11 @@ func (c *OperatorAutopilotSetCommand) Run(args []string) int {
|
||||
f.Var(&maxTrailingLogs, "max-trailing-logs",
|
||||
"Controls the maximum number of log entries that a server can trail the "+
|
||||
"leader by before being considered unhealthy.")
|
||||
f.StringVar(&lastContactThresholdRaw, "last-contact-threshold", "",
|
||||
f.Var(&lastContactThreshold, "last-contact-threshold",
|
||||
"Controls the maximum amount of time a server can go without contact "+
|
||||
"from the leader before being considered unhealthy. Must be a duration value "+
|
||||
"such as `10s`.")
|
||||
f.StringVar(&serverStabilizationTimeRaw, "server-stabilization-time", "",
|
||||
"such as `200ms`.")
|
||||
f.Var(&serverStabilizationTime, "server-stabilization-time",
|
||||
"Controls the minimum amount of time a server must be stable in the "+
|
||||
"'healthy' state before being added to the cluster. Only takes effect if all "+
|
||||
"servers are running Raft protocol version 3 or higher. Must be a duration "+
|
||||
@ -77,25 +77,18 @@ func (c *OperatorAutopilotSetCommand) Run(args []string) int {
|
||||
|
||||
// Update the config values based on the set flags.
|
||||
cleanupDeadServers.Merge(&conf.CleanupDeadServers)
|
||||
|
||||
trailing := uint(conf.MaxTrailingLogs)
|
||||
maxTrailingLogs.Merge(&trailing)
|
||||
conf.MaxTrailingLogs = uint64(trailing)
|
||||
|
||||
if lastContactThresholdRaw != "" {
|
||||
dur, err := time.ParseDuration(lastContactThresholdRaw)
|
||||
if err != nil {
|
||||
c.Ui.Error(fmt.Sprintf("invalid value for last-contact-threshold: %v", err))
|
||||
return 1
|
||||
}
|
||||
conf.LastContactThreshold = dur
|
||||
}
|
||||
if serverStabilizationTimeRaw != "" {
|
||||
dur, err := time.ParseDuration(serverStabilizationTimeRaw)
|
||||
if err != nil {
|
||||
c.Ui.Error(fmt.Sprintf("invalid value for server-stabilization-time: %v", err))
|
||||
}
|
||||
conf.ServerStabilizationTime = dur
|
||||
}
|
||||
last := time.Duration(*conf.LastContactThreshold)
|
||||
lastContactThreshold.Merge(&last)
|
||||
conf.LastContactThreshold = api.NewReadableDuration(last)
|
||||
|
||||
stablization := time.Duration(*conf.ServerStabilizationTime)
|
||||
serverStabilizationTime.Merge(&stablization)
|
||||
conf.ServerStabilizationTime = api.NewReadableDuration(stablization)
|
||||
|
||||
// Check-and-set the new configuration.
|
||||
result, err := operator.AutopilotCASConfiguration(conf, nil)
|
||||
|
@ -7,6 +7,7 @@ import (
|
||||
"github.com/hashicorp/consul/command/base"
|
||||
"github.com/hashicorp/consul/consul/structs"
|
||||
"github.com/mitchellh/cli"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestOperator_Autopilot_Set_Implements(t *testing.T) {
|
||||
@ -25,7 +26,13 @@ func TestOperator_Autopilot_Set(t *testing.T) {
|
||||
Flags: base.FlagSetHTTP,
|
||||
},
|
||||
}
|
||||
args := []string{"-http-addr=" + a1.httpAddr, "-cleanup-dead-servers=false"}
|
||||
args := []string{
|
||||
"-http-addr=" + a1.httpAddr,
|
||||
"-cleanup-dead-servers=false",
|
||||
"-max-trailing-logs=99",
|
||||
"-last-contact-threshold=123ms",
|
||||
"-server-stabilization-time=123ms",
|
||||
}
|
||||
|
||||
code := c.Run(args)
|
||||
if code != 0 {
|
||||
@ -47,4 +54,13 @@ func TestOperator_Autopilot_Set(t *testing.T) {
|
||||
if reply.CleanupDeadServers {
|
||||
t.Fatalf("bad: %#v", reply)
|
||||
}
|
||||
if reply.MaxTrailingLogs != 99 {
|
||||
t.Fatalf("bad: %#v", reply)
|
||||
}
|
||||
if reply.LastContactThreshold != 123*time.Millisecond {
|
||||
t.Fatalf("bad: %#v", reply)
|
||||
}
|
||||
if reply.ServerStabilizationTime != 123*time.Millisecond {
|
||||
t.Fatalf("bad: %#v", reply)
|
||||
}
|
||||
}
|
||||
|
@ -3,6 +3,7 @@ package consul
|
||||
import (
|
||||
"fmt"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/consul/agent"
|
||||
@ -19,70 +20,38 @@ type AutopilotPolicy interface {
|
||||
|
||||
func (s *Server) startAutopilot() {
|
||||
s.autopilotShutdownCh = make(chan struct{})
|
||||
s.autopilotWaitGroup = sync.WaitGroup{}
|
||||
s.autopilotWaitGroup.Add(1)
|
||||
|
||||
go s.serverHealthLoop()
|
||||
go s.removeDeadLoop()
|
||||
go s.autopilotLoop()
|
||||
}
|
||||
|
||||
func (s *Server) stopAutopilot() {
|
||||
close(s.autopilotShutdownCh)
|
||||
s.autopilotWaitGroup.Wait()
|
||||
}
|
||||
|
||||
// serverHealthLoop monitors the health of the servers in the cluster
|
||||
func (s *Server) serverHealthLoop() {
|
||||
// autopilotLoop periodically looks for nonvoting servers to promote and dead servers to remove.
|
||||
func (s *Server) autopilotLoop() {
|
||||
// Monitor server health until shutdown
|
||||
ticker := time.NewTicker(s.config.ServerHealthInterval)
|
||||
ticker := time.NewTicker(s.config.AutopilotInterval)
|
||||
for {
|
||||
select {
|
||||
case <-s.autopilotShutdownCh:
|
||||
ticker.Stop()
|
||||
s.autopilotWaitGroup.Done()
|
||||
return
|
||||
case <-ticker.C:
|
||||
serverHealths := make(map[string]*structs.ServerHealth)
|
||||
|
||||
state := s.fsm.State()
|
||||
_, autopilotConf, err := state.AutopilotConfig()
|
||||
if err != nil {
|
||||
s.logger.Printf("[ERR] consul: error retrieving autopilot config: %s", err)
|
||||
}
|
||||
|
||||
// Build an updated map of server healths
|
||||
for _, member := range s.LANMembers() {
|
||||
if member.Status == serf.StatusLeft {
|
||||
continue
|
||||
}
|
||||
|
||||
valid, parts := agent.IsConsulServer(member)
|
||||
if valid {
|
||||
health, err := s.queryServerHealth(member, parts, autopilotConf)
|
||||
if err != nil {
|
||||
s.logger.Printf("[ERR] consul: error fetching server health: %s", err)
|
||||
} else {
|
||||
serverHealths[parts.Addr.String()] = health
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
s.autopilotLock.Lock()
|
||||
s.autopilotHealth = serverHealths
|
||||
s.autopilotLock.Unlock()
|
||||
|
||||
if err := s.autopilotPolicy.PromoteNonVoters(autopilotConf); err != nil {
|
||||
s.logger.Printf("[ERR] consul: error checking for non-voters to promote: %s", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// removeDeadLoop checks for dead servers periodically, or when receiving on autopilotRemoveDeadCh
|
||||
func (s *Server) removeDeadLoop() {
|
||||
ticker := time.NewTicker(s.config.RemoveDeadInterval)
|
||||
for {
|
||||
select {
|
||||
case <-s.autopilotShutdownCh:
|
||||
ticker.Stop()
|
||||
return
|
||||
case <-ticker.C:
|
||||
if err := s.pruneDeadServers(); err != nil {
|
||||
s.logger.Printf("[ERR] consul: error checking for dead servers to remove: %s", err)
|
||||
}
|
||||
@ -113,13 +82,18 @@ func (s *Server) pruneDeadServers() error {
|
||||
}
|
||||
}
|
||||
|
||||
// Nothing to remove, return early
|
||||
if len(failed) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
peers, err := s.numPeers()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Only do removals if a minority of servers will be affected
|
||||
if len(failed) <= peers/2 {
|
||||
if len(failed) < peers/2 || (len(failed) == 1 && peers >= 3) {
|
||||
for _, server := range failed {
|
||||
s.logger.Printf("[INFO] consul: Attempting removal of failed server: %v", server)
|
||||
go s.serfLAN.RemoveFailedNode(server)
|
||||
@ -160,8 +134,8 @@ func (b *BasicAutopilot) PromoteNonVoters(autopilotConf *structs.AutopilotConfig
|
||||
for _, server := range raftServers {
|
||||
// If this server has been stable and passing for long enough, promote it to a voter
|
||||
if server.Suffrage == raft.Nonvoter {
|
||||
health := b.server.getServerHealth(string(server.Address))
|
||||
if health != nil && health.Healthy && time.Now().Sub(health.StableSince) >= autopilotConf.ServerStabilizationTime {
|
||||
health := b.server.getServerHealth(string(server.ID))
|
||||
if health.IsStable(time.Now(), autopilotConf) {
|
||||
promotions = append(promotions, server)
|
||||
}
|
||||
} else {
|
||||
@ -210,6 +184,68 @@ func (b *BasicAutopilot) PromoteNonVoters(autopilotConf *structs.AutopilotConfig
|
||||
return nil
|
||||
}
|
||||
|
||||
// serverHealthLoop monitors the health of the servers in the cluster
|
||||
func (s *Server) serverHealthLoop() {
|
||||
// Monitor server health until shutdown
|
||||
ticker := time.NewTicker(s.config.ServerHealthInterval)
|
||||
for {
|
||||
select {
|
||||
case <-s.shutdownCh:
|
||||
ticker.Stop()
|
||||
return
|
||||
case <-ticker.C:
|
||||
serverHealths := make(map[string]*structs.ServerHealth)
|
||||
|
||||
// Don't do anything if the min Raft version is too low
|
||||
minRaftProtocol, err := ServerMinRaftProtocol(s.LANMembers())
|
||||
if err != nil {
|
||||
s.logger.Printf("[ERR] consul: error getting server raft protocol versions: %s", err)
|
||||
break
|
||||
}
|
||||
if minRaftProtocol < 3 {
|
||||
break
|
||||
}
|
||||
|
||||
state := s.fsm.State()
|
||||
_, autopilotConf, err := state.AutopilotConfig()
|
||||
if err != nil {
|
||||
s.logger.Printf("[ERR] consul: error retrieving autopilot config: %s", err)
|
||||
break
|
||||
}
|
||||
// Bail early if autopilot config hasn't been initialized yet
|
||||
if autopilotConf == nil {
|
||||
break
|
||||
}
|
||||
|
||||
// Build an updated map of server healths
|
||||
for _, member := range s.LANMembers() {
|
||||
if member.Status == serf.StatusLeft {
|
||||
continue
|
||||
}
|
||||
|
||||
valid, parts := agent.IsConsulServer(member)
|
||||
if valid {
|
||||
health, err := s.queryServerHealth(member, parts, autopilotConf)
|
||||
if err != nil {
|
||||
s.logger.Printf("[ERR] consul: error fetching server health: %s", err)
|
||||
serverHealths[parts.ID] = &structs.ServerHealth{
|
||||
ID: parts.ID,
|
||||
Name: parts.Name,
|
||||
Healthy: false,
|
||||
}
|
||||
} else {
|
||||
serverHealths[parts.ID] = health
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
s.serverHealthLock.Lock()
|
||||
s.serverHealths = serverHealths
|
||||
s.serverHealthLock.Unlock()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// queryServerHealth fetches the raft stats for the given server and uses them
|
||||
// to update its ServerHealth
|
||||
func (s *Server) queryServerHealth(member serf.Member, server *agent.Server,
|
||||
@ -222,16 +258,14 @@ func (s *Server) queryServerHealth(member serf.Member, server *agent.Server,
|
||||
health := &structs.ServerHealth{
|
||||
ID: server.ID,
|
||||
Name: server.Name,
|
||||
SerfStatusRaw: member.Status,
|
||||
SerfStatus: member.Status.String(),
|
||||
LastContactRaw: -1,
|
||||
LastContact: stats.LastContact,
|
||||
SerfStatus: member.Status,
|
||||
LastContact: -1,
|
||||
LastTerm: stats.LastTerm,
|
||||
LastIndex: stats.LastIndex,
|
||||
}
|
||||
|
||||
if health.LastContact != "never" {
|
||||
health.LastContactRaw, err = time.ParseDuration(health.LastContact)
|
||||
if stats.LastContact != "never" {
|
||||
health.LastContact, err = time.ParseDuration(stats.LastContact)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error parsing last_contact duration: %s", err)
|
||||
}
|
||||
@ -239,14 +273,17 @@ func (s *Server) queryServerHealth(member serf.Member, server *agent.Server,
|
||||
|
||||
// Set LastContact to 0 for the leader
|
||||
if s.config.NodeName == member.Name {
|
||||
health.LastContactRaw = 0
|
||||
health.LastContact = "leader"
|
||||
health.LastContact = 0
|
||||
}
|
||||
|
||||
health.Healthy = s.isServerHealthy(health, autopilotConf)
|
||||
lastTerm, err := strconv.ParseUint(s.raft.Stats()["last_log_term"], 10, 64)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error parsing last_log_term: %s", err)
|
||||
}
|
||||
health.Healthy = health.IsHealthy(lastTerm, s.raft.LastIndex(), autopilotConf)
|
||||
|
||||
// If this is a new server or the health changed, reset StableSince
|
||||
lastHealth := s.getServerHealth(server.Addr.String())
|
||||
lastHealth := s.getServerHealth(server.ID)
|
||||
if lastHealth == nil || lastHealth.Healthy != health.Healthy {
|
||||
health.StableSince = time.Now()
|
||||
} else {
|
||||
@ -256,10 +293,10 @@ func (s *Server) queryServerHealth(member serf.Member, server *agent.Server,
|
||||
return health, nil
|
||||
}
|
||||
|
||||
func (s *Server) getServerHealth(addr string) *structs.ServerHealth {
|
||||
s.autopilotLock.RLock()
|
||||
defer s.autopilotLock.RUnlock()
|
||||
h, ok := s.autopilotHealth[addr]
|
||||
func (s *Server) getServerHealth(id string) *structs.ServerHealth {
|
||||
s.serverHealthLock.RLock()
|
||||
defer s.serverHealthLock.RUnlock()
|
||||
h, ok := s.serverHealths[id]
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
@ -272,27 +309,3 @@ func (s *Server) getServerStats(server *agent.Server) (structs.ServerStats, erro
|
||||
err := s.connPool.RPC(s.config.Datacenter, server.Addr, server.Version, "Status.RaftStats", &args, &reply)
|
||||
return reply, err
|
||||
}
|
||||
|
||||
// isServerHealthy determines whether the given ServerHealth is healthy
|
||||
// based on the current Autopilot config
|
||||
func (s *Server) isServerHealthy(health *structs.ServerHealth, autopilotConf *structs.AutopilotConfig) bool {
|
||||
if health.SerfStatusRaw != serf.StatusAlive {
|
||||
return false
|
||||
}
|
||||
|
||||
if health.LastContactRaw > autopilotConf.LastContactThreshold || health.LastContactRaw < 0 {
|
||||
return false
|
||||
}
|
||||
|
||||
lastTerm, _ := strconv.ParseUint(s.raft.Stats()["last_log_term"], 10, 64)
|
||||
if health.LastTerm != lastTerm {
|
||||
return false
|
||||
}
|
||||
|
||||
if s.raft.LastIndex() > autopilotConf.MaxTrailingLogs &&
|
||||
health.LastIndex < s.raft.LastIndex()-autopilotConf.MaxTrailingLogs {
|
||||
return false
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
@ -85,7 +85,7 @@ func TestAutopilot_CleanupDeadServerPeriodic(t *testing.T) {
|
||||
dir1, s1 := testServerWithConfig(t, func(c *Config) {
|
||||
c.Datacenter = "dc1"
|
||||
c.Bootstrap = true
|
||||
c.RemoveDeadInterval = 100 * time.Millisecond
|
||||
c.AutopilotInterval = 100 * time.Millisecond
|
||||
})
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
@ -144,6 +144,7 @@ func TestAutopilot_PromoteNonVoter(t *testing.T) {
|
||||
c.RaftConfig.ProtocolVersion = 3
|
||||
c.AutopilotConfig.ServerStabilizationTime = 200 * time.Millisecond
|
||||
c.ServerHealthInterval = 100 * time.Millisecond
|
||||
c.AutopilotInterval = 100 * time.Millisecond
|
||||
})
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
@ -182,7 +183,7 @@ func TestAutopilot_PromoteNonVoter(t *testing.T) {
|
||||
if servers[1].Suffrage != raft.Nonvoter {
|
||||
return false, fmt.Errorf("bad: %v", servers)
|
||||
}
|
||||
health := s1.getServerHealth(string(servers[1].Address))
|
||||
health := s1.getServerHealth(string(servers[1].ID))
|
||||
if health == nil {
|
||||
return false, fmt.Errorf("nil health")
|
||||
}
|
||||
|
@ -280,13 +280,14 @@ type Config struct {
|
||||
// bootstrapping.
|
||||
AutopilotConfig *structs.AutopilotConfig
|
||||
|
||||
// ServerHealthInterval is the frequency with which the leader will check
|
||||
// the health of the servers in the cluster
|
||||
// ServerHealthInterval is the frequency with which the health of the
|
||||
// servers in the cluster will be updated.
|
||||
ServerHealthInterval time.Duration
|
||||
|
||||
// RemoveDeadInterval is the frequency with which the leader will look for
|
||||
// dead servers to remove from the cluster
|
||||
RemoveDeadInterval time.Duration
|
||||
// AutopilotInterval is the frequency with which the leader will perform
|
||||
// autopilot tasks, such as promoting eligible non-voters and removing
|
||||
// dead servers.
|
||||
AutopilotInterval time.Duration
|
||||
}
|
||||
|
||||
// CheckVersion is used to check if the ProtocolVersion is valid
|
||||
@ -366,8 +367,8 @@ func DefaultConfig() *Config {
|
||||
MaxTrailingLogs: 250,
|
||||
ServerStabilizationTime: 10 * time.Second,
|
||||
},
|
||||
ServerHealthInterval: 1 * time.Second,
|
||||
RemoveDeadInterval: 30 * time.Second,
|
||||
ServerHealthInterval: 2 * time.Second,
|
||||
AutopilotInterval: 10 * time.Second,
|
||||
}
|
||||
|
||||
// Increase our reap interval to 3 days instead of 24h.
|
||||
|
@ -8,7 +8,6 @@ import (
|
||||
"github.com/hashicorp/consul/consul/structs"
|
||||
"github.com/hashicorp/raft"
|
||||
"github.com/hashicorp/serf/serf"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Operator endpoint is used to perform low-level operator tasks for Consul.
|
||||
@ -204,9 +203,16 @@ func (op *Operator) ServerHealth(args *structs.DCSpecificRequest, reply *structs
|
||||
return permissionDeniedErr
|
||||
}
|
||||
|
||||
status := structs.OperatorHealthReply{
|
||||
Healthy: true,
|
||||
// Exit early if the min Raft version is too low
|
||||
minRaftProtocol, err := ServerMinRaftProtocol(op.srv.LANMembers())
|
||||
if err != nil {
|
||||
return fmt.Errorf("error getting server raft protocol versions: %s", err)
|
||||
}
|
||||
if minRaftProtocol < 3 {
|
||||
return fmt.Errorf("all servers must have raft_protocol set to 3 or higher to use this endpoint")
|
||||
}
|
||||
|
||||
var status structs.OperatorHealthReply
|
||||
future := op.srv.raft.GetConfiguration()
|
||||
if err := future.Error(); err != nil {
|
||||
return err
|
||||
@ -215,19 +221,15 @@ func (op *Operator) ServerHealth(args *structs.DCSpecificRequest, reply *structs
|
||||
healthyCount := 0
|
||||
servers := future.Configuration().Servers
|
||||
for _, s := range servers {
|
||||
health := op.srv.getServerHealth(string(s.Address))
|
||||
health := op.srv.getServerHealth(string(s.ID))
|
||||
if health != nil {
|
||||
// Fix up StableSince to be more readable
|
||||
health.StableSince = health.StableSince.Round(time.Second).UTC()
|
||||
|
||||
if !health.Healthy {
|
||||
status.Healthy = false
|
||||
} else {
|
||||
if health.Healthy {
|
||||
healthyCount++
|
||||
}
|
||||
status.Servers = append(status.Servers, *health)
|
||||
}
|
||||
}
|
||||
status.Healthy = healthyCount == len(servers)
|
||||
|
||||
// If we have extra healthy servers, set FailureTolerance
|
||||
if healthyCount > len(servers)/2+1 {
|
||||
|
@ -11,6 +11,7 @@ import (
|
||||
"github.com/hashicorp/consul/testutil"
|
||||
"github.com/hashicorp/net-rpc-msgpackrpc"
|
||||
"github.com/hashicorp/raft"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestOperator_RaftGetConfiguration(t *testing.T) {
|
||||
@ -428,16 +429,11 @@ func TestOperator_Autopilot_SetConfiguration_ACLDeny(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestOperator_ServerHealth(t *testing.T) {
|
||||
for i := 1; i <= 3; i++ {
|
||||
testServerHealth(t, i)
|
||||
}
|
||||
}
|
||||
|
||||
func testServerHealth(t *testing.T, protocol int) {
|
||||
dir1, s1 := testServerWithConfig(t, func(c *Config) {
|
||||
c.Datacenter = "dc1"
|
||||
c.Bootstrap = true
|
||||
c.RaftConfig.ProtocolVersion = raft.ProtocolVersion(protocol)
|
||||
c.RaftConfig.ProtocolVersion = 3
|
||||
c.ServerHealthInterval = 100 * time.Millisecond
|
||||
})
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
@ -447,7 +443,7 @@ func testServerHealth(t *testing.T, protocol int) {
|
||||
dir2, s2 := testServerWithConfig(t, func(c *Config) {
|
||||
c.Datacenter = "dc1"
|
||||
c.Bootstrap = false
|
||||
c.RaftConfig.ProtocolVersion = raft.ProtocolVersion(protocol)
|
||||
c.RaftConfig.ProtocolVersion = 3
|
||||
})
|
||||
defer os.RemoveAll(dir2)
|
||||
defer s2.Shutdown()
|
||||
@ -460,7 +456,7 @@ func testServerHealth(t *testing.T, protocol int) {
|
||||
dir3, s3 := testServerWithConfig(t, func(c *Config) {
|
||||
c.Datacenter = "dc1"
|
||||
c.Bootstrap = false
|
||||
c.RaftConfig.ProtocolVersion = raft.ProtocolVersion(protocol)
|
||||
c.RaftConfig.ProtocolVersion = 3
|
||||
})
|
||||
defer os.RemoveAll(dir3)
|
||||
defer s3.Shutdown()
|
||||
@ -488,13 +484,13 @@ func testServerHealth(t *testing.T, protocol int) {
|
||||
if len(reply.Servers) != 3 {
|
||||
return false, fmt.Errorf("bad: %v", reply)
|
||||
}
|
||||
if reply.Servers[0].LastContact != "leader" {
|
||||
if reply.Servers[0].LastContact != 0 {
|
||||
return false, fmt.Errorf("bad: %v", reply)
|
||||
}
|
||||
if reply.Servers[1].LastContactRaw <= 0 {
|
||||
if reply.Servers[1].LastContact <= 0 {
|
||||
return false, fmt.Errorf("bad: %v", reply)
|
||||
}
|
||||
if reply.Servers[2].LastContactRaw <= 0 {
|
||||
if reply.Servers[2].LastContact <= 0 {
|
||||
return false, fmt.Errorf("bad: %v", reply)
|
||||
}
|
||||
return true, nil
|
||||
@ -502,3 +498,24 @@ func testServerHealth(t *testing.T, protocol int) {
|
||||
t.Fatal(err)
|
||||
})
|
||||
}
|
||||
|
||||
func TestOperator_ServerHealth_UnsupportedRaftVersion(t *testing.T) {
|
||||
dir1, s1 := testServerWithConfig(t, func(c *Config) {
|
||||
c.Datacenter = "dc1"
|
||||
c.Bootstrap = true
|
||||
c.RaftConfig.ProtocolVersion = 2
|
||||
})
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
codec := rpcClient(t, s1)
|
||||
defer codec.Close()
|
||||
|
||||
arg := structs.DCSpecificRequest{
|
||||
Datacenter: "dc1",
|
||||
}
|
||||
var reply structs.OperatorHealthReply
|
||||
err := msgpackrpc.CallWithCodec(codec, "Operator.ServerHealth", &arg, &reply)
|
||||
if err == nil || !strings.Contains(err.Error(), "raft_protocol set to 3 or higher") {
|
||||
t.Fatalf("bad: %v", err)
|
||||
}
|
||||
}
|
||||
|
@ -76,10 +76,6 @@ type Server struct {
|
||||
// aclCache is the non-authoritative ACL cache.
|
||||
aclCache *aclCache
|
||||
|
||||
// autopilotHealth stores the current view of server healths.
|
||||
autopilotHealth map[string]*structs.ServerHealth
|
||||
autopilotLock sync.RWMutex
|
||||
|
||||
// autopilotPolicy controls the behavior of Autopilot for certain tasks.
|
||||
autopilotPolicy AutopilotPolicy
|
||||
|
||||
@ -89,6 +85,9 @@ type Server struct {
|
||||
// autopilotShutdownCh is used to stop the Autopilot loop.
|
||||
autopilotShutdownCh chan struct{}
|
||||
|
||||
// autopilotWaitGroup is used to block until Autopilot shuts down.
|
||||
autopilotWaitGroup sync.WaitGroup
|
||||
|
||||
// Consul configuration
|
||||
config *Config
|
||||
|
||||
@ -158,6 +157,10 @@ type Server struct {
|
||||
sessionTimers map[string]*time.Timer
|
||||
sessionTimersLock sync.Mutex
|
||||
|
||||
// serverHealths stores the current view of server healths.
|
||||
serverHealths map[string]*structs.ServerHealth
|
||||
serverHealthLock sync.RWMutex
|
||||
|
||||
// tombstoneGC is used to track the pending GC invocations
|
||||
// for the KV tombstones
|
||||
tombstoneGC *state.TombstoneGC
|
||||
@ -315,6 +318,9 @@ func NewServer(config *Config) (*Server, error) {
|
||||
// Start the metrics handlers.
|
||||
go s.sessionStats()
|
||||
|
||||
// Start the server health checking.
|
||||
go s.serverHealthLoop()
|
||||
|
||||
return s, nil
|
||||
}
|
||||
|
||||
|
@ -112,12 +112,10 @@ type ServerHealth struct {
|
||||
Name string
|
||||
|
||||
// The status of the SerfHealth check for the server.
|
||||
SerfStatusRaw serf.MemberStatus `json:"-"`
|
||||
SerfStatus string
|
||||
SerfStatus serf.MemberStatus
|
||||
|
||||
// LastContact is the time since this node's last contact with the leader.
|
||||
LastContactRaw time.Duration `json:"-"`
|
||||
LastContact string
|
||||
LastContact time.Duration
|
||||
|
||||
// LastTerm is the highest leader term this server has a record of in its Raft log.
|
||||
LastTerm uint64
|
||||
@ -129,10 +127,50 @@ type ServerHealth struct {
|
||||
// Autopilot config.
|
||||
Healthy bool
|
||||
|
||||
// StableSince is the amount of time since this server's Healthy value last changed.
|
||||
// StableSince is the last time this server's Healthy value changed.
|
||||
StableSince time.Time
|
||||
}
|
||||
|
||||
// IsHealthy determines whether this ServerHealth is considered healthy
|
||||
// based on the given Autopilot config
|
||||
func (h *ServerHealth) IsHealthy(lastTerm uint64, lastIndex uint64, autopilotConf *AutopilotConfig) bool {
|
||||
if h.SerfStatus != serf.StatusAlive {
|
||||
return false
|
||||
}
|
||||
|
||||
if h.LastContact > autopilotConf.LastContactThreshold || h.LastContact < 0 {
|
||||
return false
|
||||
}
|
||||
|
||||
if h.LastTerm != lastTerm {
|
||||
return false
|
||||
}
|
||||
|
||||
if lastIndex > autopilotConf.MaxTrailingLogs && h.LastIndex < lastIndex-autopilotConf.MaxTrailingLogs {
|
||||
return false
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
// IsStable returns true if the ServerHealth is in a stable, passing state
|
||||
// according to the given AutopilotConfig
|
||||
func (h *ServerHealth) IsStable(now time.Time, conf *AutopilotConfig) bool {
|
||||
if h == nil {
|
||||
return false
|
||||
}
|
||||
|
||||
if !h.Healthy {
|
||||
return false
|
||||
}
|
||||
|
||||
if now.Sub(h.StableSince) < conf.ServerStabilizationTime {
|
||||
return false
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
// ServerStats holds miscellaneous Raft metrics for a server
|
||||
type ServerStats struct {
|
||||
// LastContact is the time since this node's last contact with the leader.
|
||||
|
94
consul/structs/operator_test.go
Normal file
94
consul/structs/operator_test.go
Normal file
@ -0,0 +1,94 @@
|
||||
package structs
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/serf/serf"
|
||||
)
|
||||
|
||||
func TestServerHealth_IsHealthy(t *testing.T) {
|
||||
cases := []struct {
|
||||
health ServerHealth
|
||||
lastTerm uint64
|
||||
lastIndex uint64
|
||||
conf AutopilotConfig
|
||||
expected bool
|
||||
}{
|
||||
// Healthy server, all values within allowed limits
|
||||
{
|
||||
health: ServerHealth{SerfStatus: serf.StatusAlive, LastTerm: 1, LastIndex: 0},
|
||||
lastTerm: 1,
|
||||
lastIndex: 10,
|
||||
conf: AutopilotConfig{MaxTrailingLogs: 20},
|
||||
expected: true,
|
||||
},
|
||||
// Serf status failed
|
||||
{
|
||||
health: ServerHealth{SerfStatus: serf.StatusFailed},
|
||||
expected: false,
|
||||
},
|
||||
// Old value for lastTerm
|
||||
{
|
||||
health: ServerHealth{SerfStatus: serf.StatusAlive, LastTerm: 0},
|
||||
lastTerm: 1,
|
||||
expected: false,
|
||||
},
|
||||
// Too far behind on logs
|
||||
{
|
||||
health: ServerHealth{SerfStatus: serf.StatusAlive, LastIndex: 0},
|
||||
lastIndex: 10,
|
||||
conf: AutopilotConfig{MaxTrailingLogs: 5},
|
||||
expected: false,
|
||||
},
|
||||
}
|
||||
|
||||
for index, tc := range cases {
|
||||
actual := tc.health.IsHealthy(tc.lastTerm, tc.lastIndex, &tc.conf)
|
||||
if actual != tc.expected {
|
||||
t.Fatalf("bad value for case %d: %v", index, actual)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestServerHealth_IsStable(t *testing.T) {
|
||||
start := time.Now()
|
||||
cases := []struct {
|
||||
health *ServerHealth
|
||||
now time.Time
|
||||
conf AutopilotConfig
|
||||
expected bool
|
||||
}{
|
||||
// Healthy server, all values within allowed limits
|
||||
{
|
||||
health: &ServerHealth{Healthy: true, StableSince: start},
|
||||
now: start.Add(15 * time.Second),
|
||||
conf: AutopilotConfig{ServerStabilizationTime: 10 * time.Second},
|
||||
expected: true,
|
||||
},
|
||||
// Unhealthy server
|
||||
{
|
||||
health: &ServerHealth{Healthy: false},
|
||||
expected: false,
|
||||
},
|
||||
// Healthy server, hasn't reached stabilization time
|
||||
{
|
||||
health: &ServerHealth{Healthy: true, StableSince: start},
|
||||
now: start.Add(5 * time.Second),
|
||||
conf: AutopilotConfig{ServerStabilizationTime: 10 * time.Second},
|
||||
expected: false,
|
||||
},
|
||||
// Nil struct
|
||||
{
|
||||
health: nil,
|
||||
expected: false,
|
||||
},
|
||||
}
|
||||
|
||||
for index, tc := range cases {
|
||||
actual := tc.health.IsStable(tc.now, &tc.conf)
|
||||
if actual != tc.expected {
|
||||
t.Fatalf("bad value for case %d: %v", index, actual)
|
||||
}
|
||||
}
|
||||
}
|
@ -65,6 +65,7 @@ type TestServerConfig struct {
|
||||
Bind string `json:"bind_addr,omitempty"`
|
||||
Addresses *TestAddressConfig `json:"addresses,omitempty"`
|
||||
Ports *TestPortConfig `json:"ports,omitempty"`
|
||||
RaftProtocol int `json:"raft_protocol,omitempty"`
|
||||
ACLMasterToken string `json:"acl_master_token,omitempty"`
|
||||
ACLDatacenter string `json:"acl_datacenter,omitempty"`
|
||||
ACLDefaultPolicy string `json:"acl_default_policy,omitempty"`
|
||||
|
@ -289,9 +289,9 @@ A JSON body is returned that looks like this:
|
||||
```javascript
|
||||
{
|
||||
"CleanupDeadServers": true,
|
||||
"LastContactThreshold": 200000000,
|
||||
"LastContactThreshold": "200ms",
|
||||
"MaxTrailingLogs": 250,
|
||||
"ServerStabilizationTime": 10000000000,
|
||||
"ServerStabilizationTime": "10s",
|
||||
"CreateIndex": 4,
|
||||
"ModifyIndex": 4
|
||||
}
|
||||
@ -318,9 +318,9 @@ body must look like:
|
||||
```javascript
|
||||
{
|
||||
"CleanupDeadServers": true,
|
||||
"LastContactThreshold": 200000000,
|
||||
"LastContactThreshold": "200ms",
|
||||
"MaxTrailingLogs": 250,
|
||||
"ServerStabilizationTime": 10000000000,
|
||||
"ServerStabilizationTime": "10s",
|
||||
"CreateIndex": 4,
|
||||
"ModifyIndex": 4
|
||||
}
|
||||
@ -361,7 +361,7 @@ A JSON body is returned that looks like this:
|
||||
"ID": "e349749b-3303-3ddf-959c-b5885a0e1f6e",
|
||||
"Name": "node1",
|
||||
"SerfStatus": "alive",
|
||||
"LastContact": "leader",
|
||||
"LastContact": "0s",
|
||||
"LastTerm": 2,
|
||||
"LastIndex": 46,
|
||||
"Healthy": true,
|
||||
|
@ -562,7 +562,8 @@ Consul will not enable TLS for the HTTP API unless the `https` port has been ass
|
||||
The following sub-keys are available:
|
||||
|
||||
* <a name="cleanup_dead_servers"></a><a href="#cleanup_dead_servers">`cleanup_dead_servers`</a> - This controls
|
||||
the automatic removal of dead server nodes whenever a new server is added to the cluster. Defaults to `true`.
|
||||
the automatic removal of dead server nodes periodically and whenever a new server is added to the cluster.
|
||||
Defaults to `true`.
|
||||
|
||||
* <a name="last_contact_threshold"></a><a href="#last_contact_threshold">`last_contact_threshold`</a> - Controls
|
||||
the maximum amount of time a server can go without contact from the leader before being considered unhealthy.
|
||||
@ -575,7 +576,7 @@ Consul will not enable TLS for the HTTP API unless the `https` port has been ass
|
||||
* <a name="server_stabilization_time"></a><a href="#server_stabilization_time">`server_stabilization_time`</a> -
|
||||
Controls the minimum amount of time a server must be stable in the 'healthy' state before being added to the
|
||||
cluster. Only takes effect if all servers are running Raft protocol version 3 or higher. Must be a duration value
|
||||
such as `10s`. Defaults to `30s`.
|
||||
such as `30s`. Defaults to `10s`.
|
||||
|
||||
* <a name="bootstrap"></a><a href="#bootstrap">`bootstrap`</a> Equivalent to the
|
||||
[`-bootstrap` command-line flag](#_bootstrap).
|
||||
|
@ -62,7 +62,7 @@ Usage: `consul operator autopilot set-config [options]`
|
||||
upon the successful joining of new servers to the cluster. Must be one of `[true|false]`.
|
||||
|
||||
* `last-contact-threshold` - Controls the maximum amount of time a server can go without contact
|
||||
from the leader before being considered unhealthy. Must be a duration value such as `10s`.
|
||||
from the leader before being considered unhealthy. Must be a duration value such as `200ms`.
|
||||
|
||||
* `max-trailing-logs` - Controls the maximum number of log entries that a server can trail
|
||||
the leader by before being considered unhealthy.
|
||||
|
Loading…
x
Reference in New Issue
Block a user