Merge pull request #2771 from hashicorp/f-autopilot

Autopilot dead server cleanup, config, and raft version compatibility
This commit is contained in:
Kyle Havlovitz 2017-02-28 15:04:16 -08:00 committed by GitHub
commit ab6c49ab4c
37 changed files with 1691 additions and 45 deletions

View File

@ -1,5 +1,13 @@
package api
import (
"bytes"
"fmt"
"io"
"strconv"
"strings"
)
// Operator can be used to perform low-level operator tasks for Consul.
type Operator struct {
c *Client
@ -63,6 +71,25 @@ type KeyringResponse struct {
NumNodes int
}
// AutopilotConfiguration is used for querying/setting the Autopilot configuration.
// Autopilot helps manage operator tasks related to Consul servers like removing
// failed servers from the Raft quorum.
type AutopilotConfiguration struct {
// CleanupDeadServers controls whether to remove dead servers from the Raft
// peer list when a new server joins
CleanupDeadServers bool
// CreateIndex holds the index corresponding the creation of this configuration.
// This is a read-only field.
CreateIndex uint64
// ModifyIndex will be set to the index of the last update when retrieving the
// Autopilot configuration. Resubmitting a configuration with
// AutopilotCASConfiguration will perform a check-and-set operation which ensures
// there hasn't been a subsequent update since the configuration was retrieved.
ModifyIndex uint64
}
// RaftGetConfiguration is used to query the current Raft peer set.
func (op *Operator) RaftGetConfiguration(q *QueryOptions) (*RaftConfiguration, error) {
r := op.c.newRequest("GET", "/v1/operator/raft/configuration")
@ -161,3 +188,56 @@ func (op *Operator) KeyringUse(key string, q *WriteOptions) error {
resp.Body.Close()
return nil
}
// AutopilotGetConfiguration is used to query the current Autopilot configuration.
func (op *Operator) AutopilotGetConfiguration(q *QueryOptions) (*AutopilotConfiguration, error) {
r := op.c.newRequest("GET", "/v1/operator/autopilot/configuration")
r.setQueryOptions(q)
_, resp, err := requireOK(op.c.doRequest(r))
if err != nil {
return nil, err
}
defer resp.Body.Close()
var out AutopilotConfiguration
if err := decodeBody(resp, &out); err != nil {
return nil, err
}
return &out, nil
}
// AutopilotSetConfiguration is used to set the current Autopilot configuration.
func (op *Operator) AutopilotSetConfiguration(conf *AutopilotConfiguration, q *WriteOptions) error {
r := op.c.newRequest("PUT", "/v1/operator/autopilot/configuration")
r.setWriteOptions(q)
r.obj = conf
_, resp, err := requireOK(op.c.doRequest(r))
if err != nil {
return err
}
resp.Body.Close()
return nil
}
// AutopilotCASConfiguration is used to perform a Check-And-Set update on the
// Autopilot configuration. The ModifyIndex value will be respected. Returns
// true on success or false on failures.
func (op *Operator) AutopilotCASConfiguration(conf *AutopilotConfiguration, q *WriteOptions) (bool, error) {
r := op.c.newRequest("PUT", "/v1/operator/autopilot/configuration")
r.setWriteOptions(q)
r.params.Set("cas", strconv.FormatUint(conf.ModifyIndex, 10))
r.obj = conf
_, resp, err := requireOK(op.c.doRequest(r))
if err != nil {
return false, err
}
defer resp.Body.Close()
var buf bytes.Buffer
if _, err := io.Copy(&buf, resp.Body); err != nil {
return false, fmt.Errorf("Failed to read response: %v", err)
}
res := strings.Contains(string(buf.Bytes()), "true")
return res, nil
}

View File

@ -104,3 +104,77 @@ func TestOperator_KeyringInstallListPutRemove(t *testing.T) {
}
}
}
func TestOperator_AutopilotGetSetConfiguration(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()
operator := c.Operator()
config, err := operator.AutopilotGetConfiguration(nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if !config.CleanupDeadServers {
t.Fatalf("bad: %v", config)
}
// Change a config setting
newConf := &AutopilotConfiguration{CleanupDeadServers: false}
if err := operator.AutopilotSetConfiguration(newConf, nil); err != nil {
t.Fatalf("err: %v", err)
}
config, err = operator.AutopilotGetConfiguration(nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if config.CleanupDeadServers {
t.Fatalf("bad: %v", config)
}
}
func TestOperator_AutopilotCASConfiguration(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()
operator := c.Operator()
config, err := operator.AutopilotGetConfiguration(nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if !config.CleanupDeadServers {
t.Fatalf("bad: %v", config)
}
// Pass an invalid ModifyIndex
{
newConf := &AutopilotConfiguration{
CleanupDeadServers: false,
ModifyIndex: config.ModifyIndex - 1,
}
resp, err := operator.AutopilotCASConfiguration(newConf, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if resp {
t.Fatalf("bad: %v", resp)
}
}
// Pass a valid ModifyIndex
{
newConf := &AutopilotConfiguration{
CleanupDeadServers: false,
ModifyIndex: config.ModifyIndex,
}
resp, err := operator.AutopilotCASConfiguration(newConf, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if !resp {
t.Fatalf("bad: %v", resp)
}
}
}

View File

@ -25,6 +25,7 @@ import (
"github.com/hashicorp/consul/types"
"github.com/hashicorp/go-sockaddr/template"
"github.com/hashicorp/go-uuid"
"github.com/hashicorp/raft"
"github.com/hashicorp/serf/coordinate"
"github.com/hashicorp/serf/serf"
"github.com/shirou/gopsutil/host"
@ -382,6 +383,9 @@ func (a *Agent) consulConfig() *consul.Config {
if a.config.Protocol > 0 {
base.ProtocolVersion = uint8(a.config.Protocol)
}
if a.config.RaftProtocol != 0 {
base.RaftConfig.ProtocolVersion = raft.ProtocolVersion(a.config.RaftProtocol)
}
if a.config.ACLToken != "" {
base.ACLToken = a.config.ACLToken
}
@ -412,6 +416,9 @@ func (a *Agent) consulConfig() *consul.Config {
if a.config.SessionTTLMinRaw != "" {
base.SessionTTLMin = a.config.SessionTTLMin
}
if a.config.Autopilot.CleanupDeadServers != nil {
base.AutopilotConfig.CleanupDeadServers = *a.config.Autopilot.CleanupDeadServers
}
// Format the build string
revision := a.config.Revision

View File

@ -260,6 +260,13 @@ type Telemetry struct {
CirconusBrokerSelectTag string `mapstructure:"circonus_broker_select_tag"`
}
// Autopilot is used to configure helpful features for operating Consul servers.
type Autopilot struct {
// CleanupDeadServers enables the automatic cleanup of dead servers when new ones
// are added to the peer list. Defaults to true.
CleanupDeadServers *bool `mapstructure:"cleanup_dead_servers"`
}
// Config is the configuration that can be set for an Agent.
// Some of this is configurable as CLI flags, but most must
// be set using a configuration file.
@ -385,11 +392,17 @@ type Config struct {
// servers. This can be changed on reload.
SkipLeaveOnInt *bool `mapstructure:"skip_leave_on_interrupt"`
// Autopilot is used to configure helpful features for operating Consul servers.
Autopilot Autopilot `mapstructure:"autopilot"`
Telemetry Telemetry `mapstructure:"telemetry"`
// Protocol is the Consul protocol version to use.
Protocol int `mapstructure:"protocol"`
// RaftProtocol sets the Raft protocol version to use on this server.
RaftProtocol int `mapstructure:"raft_protocol"`
// EnableDebug is used to enable various debugging features
EnableDebug bool `mapstructure:"enable_debug"`
@ -1280,6 +1293,9 @@ func MergeConfig(a, b *Config) *Config {
if b.Protocol > 0 {
result.Protocol = b.Protocol
}
if b.RaftProtocol != 0 {
result.RaftProtocol = b.RaftProtocol
}
if b.NodeID != "" {
result.NodeID = b.NodeID
}
@ -1328,6 +1344,9 @@ func MergeConfig(a, b *Config) *Config {
if b.SkipLeaveOnInt != nil {
result.SkipLeaveOnInt = b.SkipLeaveOnInt
}
if b.Autopilot.CleanupDeadServers != nil {
result.Autopilot.CleanupDeadServers = b.Autopilot.CleanupDeadServers
}
if b.Telemetry.DisableHostname == true {
result.Telemetry.DisableHostname = true
}

View File

@ -281,6 +281,17 @@ func TestDecodeConfig(t *testing.T) {
t.Fatalf("bad: %#v", config)
}
// raft protocol
input = `{"raft_protocol": 3}`
config, err = DecodeConfig(bytes.NewReader([]byte(input)))
if err != nil {
t.Fatalf("err: %s", err)
}
if config.RaftProtocol != 3 {
t.Fatalf("bad: %#v", config)
}
// Node metadata fields
input = `{"node_meta": {"thing1": "1", "thing2": "2"}}`
config, err = DecodeConfig(bytes.NewReader([]byte(input)))
@ -1091,6 +1102,17 @@ func TestDecodeConfig_Performance(t *testing.T) {
}
}
func TestDecodeConfig_Autopilot(t *testing.T) {
input := `{"autopilot": { "cleanup_dead_servers": true }}`
config, err := DecodeConfig(bytes.NewReader([]byte(input)))
if err != nil {
t.Fatalf("err: %s", err)
}
if config.Autopilot.CleanupDeadServers == nil || !*config.Autopilot.CleanupDeadServers {
t.Fatalf("bad: cleanup_dead_servers isn't set: %#v", config)
}
}
func TestDecodeConfig_Services(t *testing.T) {
input := `{
"services": [
@ -1602,9 +1624,13 @@ func TestMergeConfig(t *testing.T) {
HTTP: "127.0.0.2",
HTTPS: "127.0.0.4",
},
Server: true,
LeaveOnTerm: Bool(true),
SkipLeaveOnInt: Bool(true),
Server: true,
LeaveOnTerm: Bool(true),
SkipLeaveOnInt: Bool(true),
RaftProtocol: 3,
Autopilot: Autopilot{
CleanupDeadServers: Bool(true),
},
EnableDebug: true,
VerifyIncoming: true,
VerifyOutgoing: true,

View File

@ -297,6 +297,7 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) {
s.handleFuncMetrics("/v1/operator/raft/configuration", s.wrap(s.OperatorRaftConfiguration))
s.handleFuncMetrics("/v1/operator/raft/peer", s.wrap(s.OperatorRaftPeer))
s.handleFuncMetrics("/v1/operator/keyring", s.wrap(s.OperatorKeyringEndpoint))
s.handleFuncMetrics("/v1/operator/autopilot/configuration", s.wrap(s.OperatorAutopilotConfiguration))
s.handleFuncMetrics("/v1/query", s.wrap(s.PreparedQueryGeneral))
s.handleFuncMetrics("/v1/query/", s.wrap(s.PreparedQuerySpecific))
s.handleFuncMetrics("/v1/session/create", s.wrap(s.SessionCreate))

View File

@ -3,11 +3,11 @@ package agent
import (
"fmt"
"net/http"
"strconv"
"github.com/hashicorp/consul/consul/structs"
multierror "github.com/hashicorp/go-multierror"
"github.com/hashicorp/raft"
"strconv"
)
// OperatorRaftConfiguration is used to inspect the current Raft configuration.
@ -105,7 +105,7 @@ func (s *HTTPServer) OperatorKeyringEndpoint(resp http.ResponseWriter, req *http
case "DELETE":
return s.KeyringRemove(resp, req, &args)
default:
resp.WriteHeader(405)
resp.WriteHeader(http.StatusMethodNotAllowed)
return nil, nil
}
}
@ -166,3 +166,61 @@ func keyringErrorsOrNil(responses []*structs.KeyringResponse) error {
}
return errs
}
// OperatorAutopilotConfiguration is used to inspect the current Autopilot configuration.
// This supports the stale query mode in case the cluster doesn't have a leader.
func (s *HTTPServer) OperatorAutopilotConfiguration(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
// Switch on the method
switch req.Method {
case "GET":
var args structs.DCSpecificRequest
if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done {
return nil, nil
}
var reply structs.AutopilotConfig
if err := s.agent.RPC("Operator.AutopilotGetConfiguration", &args, &reply); err != nil {
return nil, err
}
return reply, nil
case "PUT":
var args structs.AutopilotSetConfigRequest
s.parseDC(req, &args.Datacenter)
s.parseToken(req, &args.Token)
// Check for cas value
params := req.URL.Query()
if _, ok := params["cas"]; ok {
casVal, err := strconv.ParseUint(params.Get("cas"), 10, 64)
if err != nil {
resp.WriteHeader(400)
resp.Write([]byte(fmt.Sprintf("Error parsing cas value: %v", err)))
return nil, nil
}
args.Config.ModifyIndex = casVal
args.CAS = true
}
if err := decodeBody(req, &args.Config, nil); err != nil {
resp.WriteHeader(400)
resp.Write([]byte(fmt.Sprintf("Error parsing autopilot config: %v", err)))
return nil, nil
}
var reply bool
if err := s.agent.RPC("Operator.AutopilotSetConfiguration", &args, &reply); err != nil {
return nil, err
}
// Only use the out value if this was a CAS
if !args.CAS {
return true, nil
} else {
return reply, nil
}
default:
resp.WriteHeader(http.StatusMethodNotAllowed)
return nil, nil
}
}

View File

@ -285,3 +285,138 @@ func TestOperator_Keyring_InvalidRelayFactor(t *testing.T) {
}
}, configFunc)
}
func TestOperator_AutopilotGetConfiguration(t *testing.T) {
httpTest(t, func(srv *HTTPServer) {
body := bytes.NewBuffer(nil)
req, err := http.NewRequest("GET", "/v1/operator/autopilot/configuration", body)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := httptest.NewRecorder()
obj, err := srv.OperatorAutopilotConfiguration(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
if resp.Code != 200 {
t.Fatalf("bad code: %d", resp.Code)
}
out, ok := obj.(structs.AutopilotConfig)
if !ok {
t.Fatalf("unexpected: %T", obj)
}
if !out.CleanupDeadServers {
t.Fatalf("bad: %#v", out)
}
})
}
func TestOperator_AutopilotSetConfiguration(t *testing.T) {
httpTest(t, func(srv *HTTPServer) {
body := bytes.NewBuffer([]byte(`{"CleanupDeadServers": false}`))
req, err := http.NewRequest("PUT", "/v1/operator/autopilot/configuration", body)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := httptest.NewRecorder()
if _, err = srv.OperatorAutopilotConfiguration(resp, req); err != nil {
t.Fatalf("err: %v", err)
}
if resp.Code != 200 {
t.Fatalf("bad code: %d", resp.Code)
}
args := structs.DCSpecificRequest{
Datacenter: "dc1",
}
var reply structs.AutopilotConfig
if err := srv.agent.RPC("Operator.AutopilotGetConfiguration", &args, &reply); err != nil {
t.Fatalf("err: %v", err)
}
if reply.CleanupDeadServers {
t.Fatalf("bad: %#v", reply)
}
})
}
func TestOperator_AutopilotCASConfiguration(t *testing.T) {
httpTest(t, func(srv *HTTPServer) {
body := bytes.NewBuffer([]byte(`{"CleanupDeadServers": false}`))
req, err := http.NewRequest("PUT", "/v1/operator/autopilot/configuration", body)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := httptest.NewRecorder()
if _, err = srv.OperatorAutopilotConfiguration(resp, req); err != nil {
t.Fatalf("err: %v", err)
}
if resp.Code != 200 {
t.Fatalf("bad code: %d", resp.Code)
}
args := structs.DCSpecificRequest{
Datacenter: "dc1",
}
var reply structs.AutopilotConfig
if err := srv.agent.RPC("Operator.AutopilotGetConfiguration", &args, &reply); err != nil {
t.Fatalf("err: %v", err)
}
if reply.CleanupDeadServers {
t.Fatalf("bad: %#v", reply)
}
// Create a CAS request, bad index
{
buf := bytes.NewBuffer([]byte(`{"CleanupDeadServers": true}`))
req, err := http.NewRequest("PUT",
fmt.Sprintf("/v1/operator/autopilot/configuration?cas=%d", reply.ModifyIndex-1), buf)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := httptest.NewRecorder()
obj, err := srv.OperatorAutopilotConfiguration(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
if res := obj.(bool); res {
t.Fatalf("should NOT work")
}
}
// Create a CAS request, good index
{
buf := bytes.NewBuffer([]byte(`{"CleanupDeadServers": true}`))
req, err := http.NewRequest("PUT",
fmt.Sprintf("/v1/operator/autopilot/configuration?cas=%d", reply.ModifyIndex), buf)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := httptest.NewRecorder()
obj, err := srv.OperatorAutopilotConfiguration(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
if res := obj.(bool); !res {
t.Fatalf("should work")
}
}
// Verify the update
if err := srv.agent.RPC("Operator.AutopilotGetConfiguration", &args, &reply); err != nil {
t.Fatalf("err: %v", err)
}
if !reply.CleanupDeadServers {
t.Fatalf("bad: %#v", reply)
}
})
}

View File

@ -0,0 +1,32 @@
package command
import (
"strings"
"github.com/hashicorp/consul/command/base"
"github.com/mitchellh/cli"
)
type OperatorAutopilotCommand struct {
base.Command
}
func (c *OperatorAutopilotCommand) Help() string {
helpText := `
Usage: consul operator autopilot <subcommand> [options]
The Autopilot operator command is used to interact with Consul's Autopilot
subsystem. The command can be used to view or modify the current configuration.
`
return strings.TrimSpace(helpText)
}
func (c *OperatorAutopilotCommand) Synopsis() string {
return "Provides tools for modifying Autopilot configuration"
}
func (c *OperatorAutopilotCommand) Run(args []string) int {
return cli.RunResultHelp
}

View File

@ -0,0 +1,61 @@
package command
import (
"flag"
"fmt"
"strings"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/command/base"
)
type OperatorAutopilotGetCommand struct {
base.Command
}
func (c *OperatorAutopilotGetCommand) Help() string {
helpText := `
Usage: consul operator autopilot get-config [options]
Displays the current Autopilot configuration.
` + c.Command.Help()
return strings.TrimSpace(helpText)
}
func (c *OperatorAutopilotGetCommand) Synopsis() string {
return "Display the current Autopilot configuration"
}
func (c *OperatorAutopilotGetCommand) Run(args []string) int {
c.Command.NewFlagSet(c)
if err := c.Command.Parse(args); err != nil {
if err == flag.ErrHelp {
return 0
}
c.Ui.Error(fmt.Sprintf("Failed to parse args: %v", err))
return 1
}
// Set up a client.
client, err := c.Command.HTTPClient()
if err != nil {
c.Ui.Error(fmt.Sprintf("Error initializing client: %s", err))
return 1
}
// Fetch the current configuration.
opts := &api.QueryOptions{
AllowStale: c.Command.HTTPStale(),
}
config, err := client.Operator().AutopilotGetConfiguration(opts)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error querying Autopilot configuration: %s", err))
return 1
}
c.Ui.Output(fmt.Sprintf("CleanupDeadServers = %v", config.CleanupDeadServers))
return 0
}

View File

@ -0,0 +1,37 @@
package command
import (
"strings"
"testing"
"github.com/hashicorp/consul/command/base"
"github.com/mitchellh/cli"
)
func TestOperator_Autopilot_Get_Implements(t *testing.T) {
var _ cli.Command = &OperatorAutopilotGetCommand{}
}
func TestOperator_Autopilot_Get(t *testing.T) {
a1 := testAgent(t)
defer a1.Shutdown()
waitForLeader(t, a1.httpAddr)
ui := new(cli.MockUi)
c := OperatorAutopilotGetCommand{
Command: base.Command{
Ui: ui,
Flags: base.FlagSetHTTP,
},
}
args := []string{"-http-addr=" + a1.httpAddr}
code := c.Run(args)
if code != 0 {
t.Fatalf("bad: %d. %#v", code, ui.ErrorWriter.String())
}
output := strings.TrimSpace(ui.OutputWriter.String())
if !strings.Contains(output, "CleanupDeadServers = true") {
t.Fatalf("bad: %s", output)
}
}

View File

@ -0,0 +1,78 @@
package command
import (
"flag"
"fmt"
"strings"
"github.com/hashicorp/consul/command/base"
)
type OperatorAutopilotSetCommand struct {
base.Command
}
func (c *OperatorAutopilotSetCommand) Help() string {
helpText := `
Usage: consul operator autopilot set-config [options]
Modifies the current Autopilot configuration.
` + c.Command.Help()
return strings.TrimSpace(helpText)
}
func (c *OperatorAutopilotSetCommand) Synopsis() string {
return "Modify the current Autopilot configuration"
}
func (c *OperatorAutopilotSetCommand) Run(args []string) int {
var cleanupDeadServers base.BoolValue
f := c.Command.NewFlagSet(c)
f.Var(&cleanupDeadServers, "cleanup-dead-servers",
"Controls whether Consul will automatically remove dead servers "+
"when new ones are successfully added. Must be one of `true|false`.")
if err := c.Command.Parse(args); err != nil {
if err == flag.ErrHelp {
return 0
}
c.Ui.Error(fmt.Sprintf("Failed to parse args: %v", err))
return 1
}
// Set up a client.
client, err := c.Command.HTTPClient()
if err != nil {
c.Ui.Error(fmt.Sprintf("Error initializing client: %s", err))
return 1
}
// Fetch the current configuration.
operator := client.Operator()
conf, err := operator.AutopilotGetConfiguration(nil)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error querying Autopilot configuration: %s", err))
return 1
}
// Update the config values.
cleanupDeadServers.Merge(&conf.CleanupDeadServers)
// Check-and-set the new configuration.
result, err := operator.AutopilotCASConfiguration(conf, nil)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error setting Autopilot configuration: %s", err))
return 1
}
if result {
c.Ui.Output("Configuration updated!")
return 0
} else {
c.Ui.Output("Configuration could not be atomically updated, please try again")
return 1
}
}

View File

@ -0,0 +1,50 @@
package command
import (
"strings"
"testing"
"github.com/hashicorp/consul/command/base"
"github.com/hashicorp/consul/consul/structs"
"github.com/mitchellh/cli"
)
func TestOperator_Autopilot_Set_Implements(t *testing.T) {
var _ cli.Command = &OperatorAutopilotSetCommand{}
}
func TestOperator_Autopilot_Set(t *testing.T) {
a1 := testAgent(t)
defer a1.Shutdown()
waitForLeader(t, a1.httpAddr)
ui := new(cli.MockUi)
c := OperatorAutopilotSetCommand{
Command: base.Command{
Ui: ui,
Flags: base.FlagSetHTTP,
},
}
args := []string{"-http-addr=" + a1.httpAddr, "-cleanup-dead-servers=false"}
code := c.Run(args)
if code != 0 {
t.Fatalf("bad: %d. %#v", code, ui.ErrorWriter.String())
}
output := strings.TrimSpace(ui.OutputWriter.String())
if !strings.Contains(output, "Configuration updated") {
t.Fatalf("bad: %s", output)
}
req := structs.DCSpecificRequest{
Datacenter: "dc1",
}
var reply structs.AutopilotConfig
if err := a1.agent.RPC("Operator.AutopilotGetConfiguration", &req, &reply); err != nil {
t.Fatalf("err: %v", err)
}
if reply.CleanupDeadServers {
t.Fatalf("bad: %#v", reply)
}
}

View File

@ -0,0 +1,11 @@
package command
import (
"testing"
"github.com/mitchellh/cli"
)
func TestOperator_Autopilot_Implements(t *testing.T) {
var _ cli.Command = &OperatorAutopilotCommand{}
}

View File

@ -216,6 +216,33 @@ func init() {
}, nil
},
"operator autopilot": func() (cli.Command, error) {
return &command.OperatorAutopilotCommand{
Command: base.Command{
Flags: base.FlagSetNone,
Ui: ui,
},
}, nil
},
"operator autopilot get-config": func() (cli.Command, error) {
return &command.OperatorAutopilotGetCommand{
Command: base.Command{
Flags: base.FlagSetHTTP,
Ui: ui,
},
}, nil
},
"operator autopilot set-config": func() (cli.Command, error) {
return &command.OperatorAutopilotSetCommand{
Command: base.Command{
Flags: base.FlagSetHTTP,
Ui: ui,
},
}, nil
},
"operator raft": func() (cli.Command, error) {
return &command.OperatorRaftCommand{
Command: base.Command{

View File

@ -25,13 +25,15 @@ func (k *Key) Equal(x *Key) bool {
// Server is used to return details of a consul server
type Server struct {
Name string
Datacenter string
Port int
Bootstrap bool
Expect int
Version int
Addr net.Addr
Name string
ID string
Datacenter string
Port int
Bootstrap bool
Expect int
Version int
RaftVersion int
Addr net.Addr
}
// Key returns the corresponding Key
@ -84,16 +86,24 @@ func IsConsulServer(m serf.Member) (bool, *Server) {
return false, nil
}
raft_vsn_str := m.Tags["raft_vsn"]
raft_vsn, err := strconv.Atoi(raft_vsn_str)
if err != nil {
return false, nil
}
addr := &net.TCPAddr{IP: m.Addr, Port: port}
parts := &Server{
Name: m.Name,
Datacenter: datacenter,
Port: port,
Bootstrap: bootstrap,
Expect: expect,
Addr: addr,
Version: vsn,
Name: m.Name,
ID: m.Tags["id"],
Datacenter: datacenter,
Port: port,
Bootstrap: bootstrap,
Expect: expect,
Addr: addr,
Version: vsn,
RaftVersion: raft_vsn,
}
return true, parts
}

View File

@ -55,10 +55,12 @@ func TestIsConsulServer(t *testing.T) {
Name: "foo",
Addr: net.IP([]byte{127, 0, 0, 1}),
Tags: map[string]string{
"role": "consul",
"dc": "east-aws",
"port": "10000",
"vsn": "1",
"role": "consul",
"id": "asdf",
"dc": "east-aws",
"port": "10000",
"vsn": "1",
"raft_vsn": "3",
},
}
ok, parts := agent.IsConsulServer(m)
@ -68,12 +70,18 @@ func TestIsConsulServer(t *testing.T) {
if parts.Name != "foo" {
t.Fatalf("bad: %v", parts)
}
if parts.ID != "asdf" {
t.Fatalf("bad: %v", parts.ID)
}
if parts.Bootstrap {
t.Fatalf("unexpected bootstrap")
}
if parts.Expect != 0 {
t.Fatalf("bad: %v", parts.Expect)
}
if parts.RaftVersion != 3 {
t.Fatalf("bad: %v", parts.RaftVersion)
}
m.Tags["bootstrap"] = "1"
m.Tags["disabled"] = "1"
ok, parts = agent.IsConsulServer(m)

View File

@ -7,6 +7,7 @@ import (
"os"
"time"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/tlsutil"
"github.com/hashicorp/consul/types"
"github.com/hashicorp/memberlist"
@ -274,6 +275,10 @@ type Config struct {
// This period is meant to be long enough for a leader election to take
// place, and a small jitter is applied to avoid a thundering herd.
RPCHoldTimeout time.Duration
// AutopilotConfig is used to apply the initial autopilot config when
// bootstrapping.
AutopilotConfig *structs.AutopilotConfig
}
// CheckVersion is used to check if the ProtocolVersion is valid
@ -346,6 +351,10 @@ func DefaultConfig() *Config {
RPCHoldTimeout: 7 * time.Second,
TLSMinVersion: "tls10",
AutopilotConfig: &structs.AutopilotConfig{
CleanupDeadServers: true,
},
}
// Increase our reap interval to 3 days instead of 24h.
@ -360,9 +369,10 @@ func DefaultConfig() *Config {
conf.SerfLANConfig.MemberlistConfig.BindPort = DefaultLANSerfPort
conf.SerfWANConfig.MemberlistConfig.BindPort = DefaultWANSerfPort
// Enable interoperability with unversioned Raft library, and don't
// start using new ID-based features yet.
conf.RaftConfig.ProtocolVersion = 1
// TODO: default to 3 in Consul 0.9
// Use a transitional version of the raft protocol to interoperate with
// versions 1 and 3
conf.RaftConfig.ProtocolVersion = 2
conf.ScaleRaft(DefaultRaftMultiplier)
// Disable shutdown on removal

View File

@ -105,6 +105,8 @@ func (c *consulFSM) Apply(log *raft.Log) interface{} {
return c.applyPreparedQueryOperation(buf[1:], log.Index)
case structs.TxnRequestType:
return c.applyTxn(buf[1:], log.Index)
case structs.AutopilotRequestType:
return c.applyAutopilotUpdate(buf[1:], log.Index)
default:
if ignoreUnknown {
c.logger.Printf("[WARN] consul.fsm: ignoring unknown message type (%d), upgrade to newer version", msgType)
@ -310,6 +312,25 @@ func (c *consulFSM) applyTxn(buf []byte, index uint64) interface{} {
return structs.TxnResponse{results, errors}
}
func (c *consulFSM) applyAutopilotUpdate(buf []byte, index uint64) interface{} {
var req structs.AutopilotSetConfigRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
defer metrics.MeasureSince([]string{"consul", "fsm", "autopilot"}, time.Now())
if req.CAS {
act, err := c.state.AutopilotCASConfig(index, req.Config.ModifyIndex, &req.Config)
if err != nil {
return err
} else {
return act
}
} else {
return c.state.AutopilotSetConfig(index, &req.Config)
}
}
func (c *consulFSM) Snapshot() (raft.FSMSnapshot, error) {
defer func(start time.Time) {
c.logger.Printf("[INFO] consul.fsm: snapshot created in %v", time.Now().Sub(start))

View File

@ -152,6 +152,13 @@ func (s *Server) establishLeadership() error {
err)
return err
}
// Setup autopilot config if we are the leader and need to
if err := s.initializeAutopilot(); err != nil {
s.logger.Printf("[ERR] consul: Autopilot initialization failed: %v", err)
return err
}
return nil
}
@ -237,6 +244,29 @@ func (s *Server) initializeACL() error {
return nil
}
// initializeAutopilot is used to setup the autopilot config if we are
// the leader and need to do this
func (s *Server) initializeAutopilot() error {
// Bail if the config has already been initialized
state := s.fsm.State()
_, config, err := state.AutopilotConfig()
if err != nil {
return fmt.Errorf("failed to get autopilot config: %v", err)
}
if config != nil {
return nil
}
req := structs.AutopilotSetConfigRequest{
Config: *s.config.AutopilotConfig,
}
if _, err = s.raftApply(structs.AutopilotRequestType, req); err != nil {
return fmt.Errorf("failed to initialize autopilot config")
}
return nil
}
// reconcile is used to reconcile the differences between Serf
// membership and what is reflected in our strongly consistent store.
// Mainly we need to ensure all live nodes are registered, all failed
@ -562,11 +592,42 @@ func (s *Server) joinConsulServer(m serf.Member, parts *agent.Server) error {
}
// Attempt to add as a peer
addFuture := s.raft.AddPeer(raft.ServerAddress(addr))
if err := addFuture.Error(); err != nil {
s.logger.Printf("[ERR] consul: failed to add raft peer: %v", err)
minRaftProtocol, err := ServerMinRaftProtocol(s.serfLAN.Members())
if err != nil {
return err
}
if minRaftProtocol >= 2 && parts.RaftVersion >= 3 {
addFuture := s.raft.AddVoter(raft.ServerID(parts.ID), raft.ServerAddress(addr), 0, 0)
if err := addFuture.Error(); err != nil {
s.logger.Printf("[ERR] consul: failed to add raft peer: %v", err)
return err
}
} else {
addFuture := s.raft.AddPeer(raft.ServerAddress(addr))
if err := addFuture.Error(); err != nil {
s.logger.Printf("[ERR] consul: failed to add raft peer: %v", err)
return err
}
}
state := s.fsm.State()
_, autopilotConf, err := state.AutopilotConfig()
if err != nil {
return err
}
// Look for dead servers to clean up
if autopilotConf.CleanupDeadServers {
for _, member := range s.serfLAN.Members() {
valid, _ := agent.IsConsulServer(member)
if valid && member.Name != m.Name && member.Status == serf.StatusFailed {
s.logger.Printf("[INFO] consul: Attempting removal of failed server: %v", member.Name)
go s.serfLAN.RemoveFailedNode(member.Name)
}
}
}
return nil
}
@ -583,21 +644,39 @@ func (s *Server) removeConsulServer(m serf.Member, port int) error {
s.logger.Printf("[ERR] consul: failed to get raft configuration: %v", err)
return err
}
for _, server := range configFuture.Configuration().Servers {
if server.Address == raft.ServerAddress(addr) {
goto REMOVE
}
}
return nil
REMOVE:
// Attempt to remove as a peer.
future := s.raft.RemovePeer(raft.ServerAddress(addr))
if err := future.Error(); err != nil {
s.logger.Printf("[ERR] consul: failed to remove raft peer '%v': %v",
addr, err)
minRaftProtocol, err := ServerMinRaftProtocol(s.serfLAN.Members())
if err != nil {
return err
}
_, parts := agent.IsConsulServer(m)
// Pick which remove API to use based on how the server was added.
for _, server := range configFuture.Configuration().Servers {
// If we understand the new add/remove APIs and the server was added by ID, use the new remove API
if minRaftProtocol >= 2 && server.ID == raft.ServerID(parts.ID) {
s.logger.Printf("[INFO] consul: removing server by ID: %q", server.ID)
future := s.raft.RemoveServer(raft.ServerID(parts.ID), 0, 0)
if err := future.Error(); err != nil {
s.logger.Printf("[ERR] consul: failed to remove raft peer '%v': %v",
server.ID, err)
return err
}
break
} else if server.Address == raft.ServerAddress(addr) {
// If not, use the old remove API
s.logger.Printf("[INFO] consul: removing server by address: %q", server.Address)
future := s.raft.RemovePeer(raft.ServerAddress(addr))
if err := future.Error(); err != nil {
s.logger.Printf("[ERR] consul: failed to remove raft peer '%v': %v",
addr, err)
return err
}
break
}
}
return nil
}

View File

@ -622,3 +622,162 @@ func TestLeader_ReapTombstones(t *testing.T) {
t.Fatalf("err: %v", err)
})
}
func TestLeader_CleanupDeadServers(t *testing.T) {
dir1, s1 := testServerDCBootstrap(t, "dc1", true)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
dir2, s2 := testServerDCBootstrap(t, "dc1", false)
defer os.RemoveAll(dir2)
defer s2.Shutdown()
dir3, s3 := testServerDCBootstrap(t, "dc1", false)
defer os.RemoveAll(dir3)
defer s3.Shutdown()
servers := []*Server{s1, s2, s3}
// Try to join
addr := fmt.Sprintf("127.0.0.1:%d",
s1.config.SerfLANConfig.MemberlistConfig.BindPort)
if _, err := s2.JoinLAN([]string{addr}); err != nil {
t.Fatalf("err: %v", err)
}
if _, err := s3.JoinLAN([]string{addr}); err != nil {
t.Fatalf("err: %v", err)
}
for _, s := range servers {
testutil.WaitForResult(func() (bool, error) {
peers, _ := s.numPeers()
return peers == 3, nil
}, func(err error) {
t.Fatalf("should have 3 peers")
})
}
// Kill a non-leader server
s2.Shutdown()
testutil.WaitForResult(func() (bool, error) {
alive := 0
for _, m := range s1.LANMembers() {
if m.Status == serf.StatusAlive {
alive++
}
}
return alive == 2, nil
}, func(err error) {
t.Fatalf("should have 2 alive members")
})
// Bring up and join a new server
dir4, s4 := testServerDCBootstrap(t, "dc1", false)
defer os.RemoveAll(dir4)
defer s4.Shutdown()
if _, err := s4.JoinLAN([]string{addr}); err != nil {
t.Fatalf("err: %v", err)
}
servers[1] = s4
// Make sure the dead server is removed and we're back to 3 total peers
for _, s := range servers {
testutil.WaitForResult(func() (bool, error) {
peers, _ := s.numPeers()
return peers == 3, nil
}, func(err error) {
t.Fatalf("should have 3 peers")
})
}
}
func TestLeader_RollRaftServer(t *testing.T) {
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.Bootstrap = true
c.Datacenter = "dc1"
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
dir2, s2 := testServerWithConfig(t, func(c *Config) {
c.Bootstrap = false
c.Datacenter = "dc1"
c.RaftConfig.ProtocolVersion = 1
})
defer os.RemoveAll(dir2)
defer s2.Shutdown()
dir3, s3 := testServerDCBootstrap(t, "dc1", false)
defer os.RemoveAll(dir3)
defer s3.Shutdown()
servers := []*Server{s1, s2, s3}
// Try to join
addr := fmt.Sprintf("127.0.0.1:%d",
s1.config.SerfLANConfig.MemberlistConfig.BindPort)
if _, err := s2.JoinLAN([]string{addr}); err != nil {
t.Fatalf("err: %v", err)
}
if _, err := s3.JoinLAN([]string{addr}); err != nil {
t.Fatalf("err: %v", err)
}
for _, s := range servers {
testutil.WaitForResult(func() (bool, error) {
peers, _ := s.numPeers()
return peers == 3, nil
}, func(err error) {
t.Fatalf("should have 3 peers")
})
}
// Kill the v1 server
s2.Shutdown()
for _, s := range []*Server{s1, s3} {
testutil.WaitForResult(func() (bool, error) {
minVer, err := ServerMinRaftProtocol(s.LANMembers())
return minVer == 2, err
}, func(err error) {
t.Fatalf("minimum protocol version among servers should be 2")
})
}
// Replace the dead server with one running raft protocol v3
dir4, s4 := testServerWithConfig(t, func(c *Config) {
c.Bootstrap = false
c.Datacenter = "dc1"
c.RaftConfig.ProtocolVersion = 3
})
defer os.RemoveAll(dir4)
defer s4.Shutdown()
if _, err := s4.JoinLAN([]string{addr}); err != nil {
t.Fatalf("err: %v", err)
}
servers[1] = s4
// Make sure the dead server is removed and we're back to 3 total peers
for _, s := range servers {
testutil.WaitForResult(func() (bool, error) {
addrs := 0
ids := 0
future := s.raft.GetConfiguration()
if err := future.Error(); err != nil {
return false, err
}
for _, server := range future.Configuration().Servers {
if string(server.ID) == string(server.Address) {
addrs++
} else {
ids++
}
}
return addrs == 2 && ids == 1, nil
}, func(err error) {
t.Fatalf("should see 2 legacy IDs and 1 GUID")
})
}
}

View File

@ -125,3 +125,61 @@ REMOVE:
op.srv.logger.Printf("[WARN] consul.operator: Removed Raft peer %q", args.Address)
return nil
}
// AutopilotGetConfiguration is used to retrieve the current Autopilot configuration.
func (op *Operator) AutopilotGetConfiguration(args *structs.DCSpecificRequest, reply *structs.AutopilotConfig) error {
if done, err := op.srv.forward("Operator.AutopilotGetConfiguration", args, args, reply); done {
return err
}
// This action requires operator read access.
acl, err := op.srv.resolveToken(args.Token)
if err != nil {
return err
}
if acl != nil && !acl.OperatorRead() {
return permissionDeniedErr
}
state := op.srv.fsm.State()
_, config, err := state.AutopilotConfig()
if err != nil {
return err
}
*reply = *config
return nil
}
// AutopilotSetConfiguration is used to set the current Autopilot configuration.
func (op *Operator) AutopilotSetConfiguration(args *structs.AutopilotSetConfigRequest, reply *bool) error {
if done, err := op.srv.forward("Operator.AutopilotSetConfiguration", args, args, reply); done {
return err
}
// This action requires operator write access.
acl, err := op.srv.resolveToken(args.Token)
if err != nil {
return err
}
if acl != nil && !acl.OperatorWrite() {
return permissionDeniedErr
}
// Apply the update
resp, err := op.srv.raftApply(structs.AutopilotRequestType, args)
if err != nil {
op.srv.logger.Printf("[ERR] consul.operator: Apply failed: %v", err)
return err
}
if respErr, ok := resp.(error); ok {
return respErr
}
// Check if the return type is a bool.
if respBool, ok := resp.(bool); ok {
*reply = respBool
}
return nil
}

View File

@ -243,3 +243,186 @@ func TestOperator_RaftRemovePeerByAddress_ACLDeny(t *testing.T) {
t.Fatalf("err: %v", err)
}
}
func TestOperator_Autopilot_GetConfiguration(t *testing.T) {
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.AutopilotConfig.CleanupDeadServers = false
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
testutil.WaitForLeader(t, s1.RPC, "dc1")
arg := structs.DCSpecificRequest{
Datacenter: "dc1",
}
var reply structs.AutopilotConfig
err := msgpackrpc.CallWithCodec(codec, "Operator.AutopilotGetConfiguration", &arg, &reply)
if err != nil {
t.Fatalf("err: %v", err)
}
if reply.CleanupDeadServers {
t.Fatalf("bad: %#v", reply)
}
}
func TestOperator_Autopilot_GetConfiguration_ACLDeny(t *testing.T) {
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.ACLDatacenter = "dc1"
c.ACLMasterToken = "root"
c.ACLDefaultPolicy = "deny"
c.AutopilotConfig.CleanupDeadServers = false
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
testutil.WaitForLeader(t, s1.RPC, "dc1")
// Try to get config without permissions
arg := structs.DCSpecificRequest{
Datacenter: "dc1",
}
var reply structs.AutopilotConfig
err := msgpackrpc.CallWithCodec(codec, "Operator.AutopilotGetConfiguration", &arg, &reply)
if err == nil || !strings.Contains(err.Error(), permissionDenied) {
t.Fatalf("err: %v", err)
}
// Create an ACL with operator read permissions.
var token string
{
var rules = `
operator = "read"
`
req := structs.ACLRequest{
Datacenter: "dc1",
Op: structs.ACLSet,
ACL: structs.ACL{
Name: "User token",
Type: structs.ACLTypeClient,
Rules: rules,
},
WriteRequest: structs.WriteRequest{Token: "root"},
}
if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &req, &token); err != nil {
t.Fatalf("err: %v", err)
}
}
// Now we can read and verify the config
arg.Token = token
err = msgpackrpc.CallWithCodec(codec, "Operator.AutopilotGetConfiguration", &arg, &reply)
if err != nil {
t.Fatalf("err: %v", err)
}
if reply.CleanupDeadServers {
t.Fatalf("bad: %#v", reply)
}
}
func TestOperator_Autopilot_SetConfiguration(t *testing.T) {
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.AutopilotConfig.CleanupDeadServers = false
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
testutil.WaitForLeader(t, s1.RPC, "dc1")
// Change the autopilot config from the default
arg := structs.AutopilotSetConfigRequest{
Datacenter: "dc1",
Config: structs.AutopilotConfig{
CleanupDeadServers: true,
},
}
var reply *bool
err := msgpackrpc.CallWithCodec(codec, "Operator.AutopilotSetConfiguration", &arg, &reply)
if err != nil {
t.Fatalf("err: %v", err)
}
// Make sure it's changed
state := s1.fsm.State()
_, config, err := state.AutopilotConfig()
if err != nil {
t.Fatal(err)
}
if !config.CleanupDeadServers {
t.Fatalf("bad: %#v", config)
}
}
func TestOperator_Autopilot_SetConfiguration_ACLDeny(t *testing.T) {
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.ACLDatacenter = "dc1"
c.ACLMasterToken = "root"
c.ACLDefaultPolicy = "deny"
c.AutopilotConfig.CleanupDeadServers = false
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
testutil.WaitForLeader(t, s1.RPC, "dc1")
// Try to set config without permissions
arg := structs.AutopilotSetConfigRequest{
Datacenter: "dc1",
Config: structs.AutopilotConfig{
CleanupDeadServers: true,
},
}
var reply *bool
err := msgpackrpc.CallWithCodec(codec, "Operator.AutopilotSetConfiguration", &arg, &reply)
if err == nil || !strings.Contains(err.Error(), permissionDenied) {
t.Fatalf("err: %v", err)
}
// Create an ACL with operator write permissions.
var token string
{
var rules = `
operator = "write"
`
req := structs.ACLRequest{
Datacenter: "dc1",
Op: structs.ACLSet,
ACL: structs.ACL{
Name: "User token",
Type: structs.ACLTypeClient,
Rules: rules,
},
WriteRequest: structs.WriteRequest{Token: "root"},
}
if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &req, &token); err != nil {
t.Fatalf("err: %v", err)
}
}
// Now we can update the config
arg.Token = token
err = msgpackrpc.CallWithCodec(codec, "Operator.AutopilotSetConfiguration", &arg, &reply)
if err != nil {
t.Fatalf("err: %v", err)
}
// Make sure it's changed
state := s1.fsm.State()
_, config, err := state.AutopilotConfig()
if err != nil {
t.Fatal(err)
}
if !config.CleanupDeadServers {
t.Fatalf("bad: %#v", config)
}
}

View File

@ -280,11 +280,22 @@ func (s *Server) maybeBootstrap() {
// Attempt a live bootstrap!
var configuration raft.Configuration
var addrs []string
minRaftVersion, err := ServerMinRaftProtocol(members)
if err != nil {
s.logger.Printf("[ERR] consul: Failed to read server raft versions: %v", err)
}
for _, server := range servers {
addr := server.Addr.String()
addrs = append(addrs, addr)
var id raft.ServerID
if minRaftVersion >= 3 {
id = raft.ServerID(server.ID)
} else {
id = raft.ServerID(addr)
}
peer := raft.Server{
ID: raft.ServerID(addr),
ID: id,
Address: raft.ServerAddress(addr),
}
configuration.Servers = append(configuration.Servers, peer)

View File

@ -317,6 +317,7 @@ func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string, w
conf.Tags["vsn"] = fmt.Sprintf("%d", s.config.ProtocolVersion)
conf.Tags["vsn_min"] = fmt.Sprintf("%d", ProtocolVersionMin)
conf.Tags["vsn_max"] = fmt.Sprintf("%d", ProtocolVersionMax)
conf.Tags["raft_vsn"] = fmt.Sprintf("%d", s.config.RaftConfig.ProtocolVersion)
conf.Tags["build"] = s.config.Build
conf.Tags["port"] = fmt.Sprintf("%d", addr.Port)
if s.config.Bootstrap {
@ -378,9 +379,12 @@ func (s *Server) setupRaft() error {
// Make sure we set the LogOutput.
s.config.RaftConfig.LogOutput = s.config.LogOutput
// Our version of Raft protocol requires the LocalID to match the network
// Versions of the Raft protocol below 3 require the LocalID to match the network
// address of the transport.
s.config.RaftConfig.LocalID = raft.ServerID(trans.LocalAddr())
if s.config.RaftConfig.ProtocolVersion >= 3 {
s.config.RaftConfig.LocalID = raft.ServerID(s.config.NodeID)
}
// Build an all in-memory setup for dev mode, otherwise prepare a full
// disk-based setup.
@ -478,7 +482,7 @@ func (s *Server) setupRaft() error {
configuration := raft.Configuration{
Servers: []raft.Server{
raft.Server{
ID: raft.ServerID(trans.LocalAddr()),
ID: s.config.RaftConfig.LocalID,
Address: trans.LocalAddr(),
},
},

View File

@ -12,6 +12,8 @@ import (
"time"
"github.com/hashicorp/consul/testutil"
"github.com/hashicorp/consul/types"
"github.com/hashicorp/go-uuid"
)
var nextPort int32 = 15000
@ -46,6 +48,11 @@ func testServerConfig(t *testing.T, NodeName string) (string, *Config) {
IP: []byte{127, 0, 0, 1},
Port: getPort(),
}
nodeID, err := uuid.GenerateUUID()
if err != nil {
t.Fatal(err)
}
config.NodeID = types.NodeID(nodeID)
config.SerfLANConfig.MemberlistConfig.BindAddr = "127.0.0.1"
config.SerfLANConfig.MemberlistConfig.BindPort = getPort()
config.SerfLANConfig.MemberlistConfig.SuspicionMult = 2

86
consul/state/autopilot.go Normal file
View File

@ -0,0 +1,86 @@
package state
import (
"fmt"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/go-memdb"
)
// AutopilotConfig is used to get the current Autopilot configuration.
func (s *StateStore) AutopilotConfig() (uint64, *structs.AutopilotConfig, error) {
tx := s.db.Txn(false)
defer tx.Abort()
// Get the autopilot config
c, err := tx.First("autopilot-config", "id")
if err != nil {
return 0, nil, fmt.Errorf("failed autopilot config lookup: %s", err)
}
config, ok := c.(*structs.AutopilotConfig)
if !ok {
return 0, nil, nil
}
return config.ModifyIndex, config, nil
}
// AutopilotSetConfig is used to set the current Autopilot configuration.
func (s *StateStore) AutopilotSetConfig(idx uint64, config *structs.AutopilotConfig) error {
tx := s.db.Txn(true)
defer tx.Abort()
s.autopilotSetConfigTxn(idx, tx, config)
tx.Commit()
return nil
}
// AutopilotCASConfig is used to try updating the Autopilot configuration with a
// given Raft index. If the CAS index specified is not equal to the last observed index
// for the config, then the call is a noop,
func (s *StateStore) AutopilotCASConfig(idx, cidx uint64, config *structs.AutopilotConfig) (bool, error) {
tx := s.db.Txn(true)
defer tx.Abort()
// Check for an existing config
existing, err := tx.First("autopilot-config", "id")
if err != nil {
return false, fmt.Errorf("failed autopilot config lookup: %s", err)
}
// If the existing index does not match the provided CAS
// index arg, then we shouldn't update anything and can safely
// return early here.
e, ok := existing.(*structs.AutopilotConfig)
if !ok || e.ModifyIndex != cidx {
return false, nil
}
s.autopilotSetConfigTxn(idx, tx, config)
tx.Commit()
return true, nil
}
func (s *StateStore) autopilotSetConfigTxn(idx uint64, tx *memdb.Txn, config *structs.AutopilotConfig) error {
// Check for an existing config
existing, err := tx.First("autopilot-config", "id")
if err != nil {
return fmt.Errorf("failed autopilot config lookup: %s", err)
}
// Set the indexes.
if existing != nil {
config.CreateIndex = existing.(*structs.AutopilotConfig).CreateIndex
} else {
config.CreateIndex = idx
}
config.ModifyIndex = idx
if err := tx.Insert("autopilot-config", config); err != nil {
return fmt.Errorf("failed updating autopilot config: %s", err)
}
return nil
}

View File

@ -0,0 +1,87 @@
package state
import (
"reflect"
"testing"
"github.com/hashicorp/consul/consul/structs"
)
func TestStateStore_Autopilot(t *testing.T) {
s := testStateStore(t)
expected := &structs.AutopilotConfig{
CleanupDeadServers: true,
}
if err := s.AutopilotSetConfig(0, expected); err != nil {
t.Fatal(err)
}
idx, config, err := s.AutopilotConfig()
if err != nil {
t.Fatal(err)
}
if idx != 0 {
t.Fatalf("bad: %d", idx)
}
if !reflect.DeepEqual(expected, config) {
t.Fatalf("bad: %#v, %#v", expected, config)
}
}
func TestStateStore_AutopilotCAS(t *testing.T) {
s := testStateStore(t)
expected := &structs.AutopilotConfig{
CleanupDeadServers: true,
}
if err := s.AutopilotSetConfig(0, expected); err != nil {
t.Fatal(err)
}
if err := s.AutopilotSetConfig(1, expected); err != nil {
t.Fatal(err)
}
// Do a CAS with an index lower than the entry
ok, err := s.AutopilotCASConfig(2, 0, &structs.AutopilotConfig{
CleanupDeadServers: false,
})
if ok || err != nil {
t.Fatalf("expected (false, nil), got: (%v, %#v)", ok, err)
}
// Check that the index is untouched and the entry
// has not been updated.
idx, config, err := s.AutopilotConfig()
if err != nil {
t.Fatal(err)
}
if idx != 1 {
t.Fatalf("bad: %d", idx)
}
if !config.CleanupDeadServers {
t.Fatalf("bad: %#v", config)
}
// Do another CAS, this time with the correct index
ok, err = s.AutopilotCASConfig(2, 1, &structs.AutopilotConfig{
CleanupDeadServers: false,
})
if !ok || err != nil {
t.Fatalf("expected (true, nil), got: (%v, %#v)", ok, err)
}
// Make sure the config was updated
idx, config, err = s.AutopilotConfig()
if err != nil {
t.Fatal(err)
}
if idx != 2 {
t.Fatalf("bad: %d", idx)
}
if config.CleanupDeadServers {
t.Fatalf("bad: %#v", config)
}
}

View File

@ -31,6 +31,7 @@ func stateStoreSchema() *memdb.DBSchema {
aclsTableSchema,
coordinatesTableSchema,
preparedQueriesTableSchema,
autopilotConfigTableSchema,
}
// Add the tables to the root schema
@ -440,3 +441,21 @@ func preparedQueriesTableSchema() *memdb.TableSchema {
},
}
}
// autopilotConfigTableSchema returns a new table schema used for storing
// the autopilot configuration
func autopilotConfigTableSchema() *memdb.TableSchema {
return &memdb.TableSchema{
Name: "autopilot-config",
Indexes: map[string]*memdb.IndexSchema{
"id": &memdb.IndexSchema{
Name: "id",
AllowMissing: true,
Unique: true,
Indexer: &memdb.ConditionalIndex{
Conditional: func(obj interface{}) (bool, error) { return true, nil },
},
},
},
}
}

View File

@ -4,6 +4,15 @@ import (
"github.com/hashicorp/raft"
)
type AutopilotConfig struct {
// CleanupDeadServers controls whether to remove dead servers when a new
// server is added to the Raft peers
CleanupDeadServers bool
// RaftIndex stores the create/modify indexes of this configuration
RaftIndex
}
// RaftServer has information about a server in the Raft configuration.
type RaftServer struct {
// ID is the unique ID for the server. These are currently the same
@ -55,3 +64,24 @@ type RaftPeerByAddressRequest struct {
func (op *RaftPeerByAddressRequest) RequestDatacenter() string {
return op.Datacenter
}
// AutopilotSetConfigRequest is used by the Operator endpoint to update the
// current Autopilot configuration of the cluster.
type AutopilotSetConfigRequest struct {
// Datacenter is the target this request is intended for.
Datacenter string
// Config is the new Autopilot configuration to use.
Config AutopilotConfig
// CAS controls whether to use check-and-set semantics for this request.
CAS bool
// WriteRequest holds the ACL token to go along with this request.
WriteRequest
}
// RequestDatacenter returns the datacenter for a given request.
func (op *AutopilotSetConfigRequest) RequestDatacenter() string {
return op.Datacenter
}

View File

@ -40,6 +40,7 @@ const (
CoordinateBatchUpdateType
PreparedQueryRequestType
TxnRequestType
AutopilotRequestType
)
const (

View File

@ -91,6 +91,35 @@ func CanServersUnderstandProtocol(members []serf.Member, version uint8) (bool, e
return (numServers > 0) && (numWhoGrok == numServers), nil
}
// ServerMinRaftProtocol returns the lowest supported Raft protocol among alive servers
func ServerMinRaftProtocol(members []serf.Member) (int, error) {
minVersion := -1
for _, m := range members {
if m.Tags["role"] != "consul" || m.Status != serf.StatusAlive {
continue
}
vsn, ok := m.Tags["raft_vsn"]
if !ok {
vsn = "1"
}
raftVsn, err := strconv.Atoi(vsn)
if err != nil {
return -1, err
}
if minVersion == -1 || raftVsn < minVersion {
minVersion = raftVsn
}
}
if minVersion == -1 {
return minVersion, fmt.Errorf("No servers found")
}
return minVersion, nil
}
// Returns if a member is a consul node. Returns a bool,
// and the datacenter.
func isConsulNode(m serf.Member) (bool, string) {

View File

@ -28,6 +28,7 @@ The following endpoints are supported:
* [`/v1/operator/raft/configuration`](#raft-configuration): Inspects the Raft configuration
* [`/v1/operator/raft/peer`](#raft-peer): Operates on Raft peers
* [`/v1/operator/keyring`](#keyring): Operates on gossip keyring
* [`/v1/operator/autopilot/configuration`](#autopilot-configuration): Operates on the Autopilot configuration
Not all endpoints support blocking queries and all consistency modes,
see details in the sections below.
@ -258,3 +259,65 @@ If ACLs are enabled, the client will need to supply an ACL Token with
[`keyring`](/docs/internals/acl.html#keyring) write privileges.
The return code will indicate success or failure.
### <a name="autopilot-configuration"></a> /v1/operator/autopilot/configuration
Available in Consul 0.8.0 and later, the autopilot configuration endpoint supports the
`GET` and `PUT` methods.
This endpoint supports the use of ACL tokens using either the `X-CONSUL-TOKEN`
header or the `?token=` query parameter.
By default, the datacenter of the agent is queried; however, the `dc` can be
provided using the `?dc=` query parameter.
#### GET Method
When using the `GET` method, the request will be forwarded to the cluster
leader to retrieve its latest Autopilot configuration.
If the cluster doesn't currently have a leader an error will be returned. You
can use the `?stale` query parameter to read the Raft configuration from any
of the Consul servers.
If ACLs are enabled, the client will need to supply an ACL Token with
[`operator`](/docs/internals/acl.html#operator) read privileges.
A JSON body is returned that looks like this:
```javascript
{
"CleanupDeadServers": true,
"CreateIndex": 4,
"ModifyIndex": 4
}
```
`CleanupDeadServers` is whether dead servers should be removed automatically when
a new server is added to the cluster.
#### PUT Method
Using the `PUT` method, this endpoint will update the Autopilot configuration
of the cluster.
The `?cas=<index>` can optionally be specified to update the configuration as a
Check-And-Set operation. The update will only happen if the given index matches
the `ModifyIndex` of the configuration at the time of writing.
If ACLs are enabled, the client will need to supply an ACL Token with
[`operator`](/docs/internals/acl.html#operator) write privileges.
The `PUT` method expects a JSON request body to be submitted. The request
body must look like:
```javascript
{
"CleanupDeadServers": true
}
```
`CleanupDeadServers` is whether dead servers should be removed automatically when
a new server is added to the cluster.
The return code will indicate success or failure.

View File

@ -551,6 +551,19 @@ Consul will not enable TLS for the HTTP API unless the `https` port has been ass
* <a name="atlas_endpoint"></a><a href="#atlas_endpoint">`atlas_endpoint`</a> Equivalent to the
[`-atlas-endpoint` command-line flag](#_atlas_endpoint).
* <a name="autopilot"></a><a href="#autopilot">`autopilot`</a> Added in Consul 0.8, this object
allows a number of sub-keys to be set which can configure operator-friendly settings for Consul servers.
<br><br>
The following sub-keys are available:
* <a name="raft_protocol"></a><a href="#raft_protocol">`raft_protocol`</a> - This controls the internal
version of the Raft consensus protocol used for server communications. This defaults to 2 but must
be set to 3 in order to gain access to other Autopilot features, with the exception of
[`cleanup_dead_servers`](#cleanup_dead_servers).
* <a name="cleanup_dead_servers"></a><a href="#cleanup_dead_servers">`cleanup_dead_servers`</a> - This controls
the automatic removal of dead server nodes whenever a new server is added to the cluster. Defaults to `true`.
* <a name="bootstrap"></a><a href="#bootstrap">`bootstrap`</a> Equivalent to the
[`-bootstrap` command-line flag](#_bootstrap).

View File

@ -35,10 +35,12 @@ Usage: consul operator <subcommand> [options]
Subcommands:
raft Provides cluster-level tools for Consul operators
autopilot Provides tools for modifying Autopilot configuration
raft Provides cluster-level tools for Consul operators
```
For more information, examples, and usage about a subcommand, click on the name
of the subcommand in the sidebar or one of the links below:
- [autopilot] (/docs/commands/operator/autopilot.html)
- [raft] (/docs/commands/operator/raft.html)

View File

@ -0,0 +1,67 @@
---
layout: "docs"
page_title: "Commands: Operator Autopilot"
sidebar_current: "docs-commands-operator-autopilot"
description: >
The operator autopilot subcommand is used to view and modify Consul's Autopilot configuration.
---
# Consul Operator Autopilot
Command: `consul operator autopilot`
The Autopilot operator command is used to interact with Consul's Autopilot subsystem. The
command can be used to view or modify the current Autopilot configuration.
```text
Usage: consul operator autopilot <subcommand> [options]
The Autopilot operator command is used to interact with Consul's Autopilot
subsystem. The command can be used to view or modify the current configuration.
Subcommands:
get-config Display the current Autopilot configuration
set-config Modify the current Autopilot configuration
```
## get-config
This command displays the current Raft peer configuration.
Usage: `consul operator autopilot get-config [options]`
#### API Options
<%= partial "docs/commands/http_api_options_client" %>
<%= partial "docs/commands/http_api_options_server" %>
The output looks like this:
```
CleanupDeadServers = true
```
## set-config
Modifies the current Autopilot configuration.
Usage: `consul operator autopilot set-config [options]`
#### API Options
<%= partial "docs/commands/http_api_options_client" %>
<%= partial "docs/commands/http_api_options_server" %>
#### Command Options
* `-cleanup-dead-servers` - Specifies whether to enable automatic removal of dead servers
upon the successful joining of new servers to the cluster. Must be one of `[true|false]`.
The output looks like this:
```
Configuration updated!
```
The return code will indicate success or failure.

View File

@ -142,6 +142,9 @@
<li<%= sidebar_current("docs-commands-operator") %>>
<a href="/docs/commands/operator.html">operator</a>
<ul class="subnav">
<li<%= sidebar_current("docs-commands-operator-autopilot") %>>
<a href="/docs/commands/operator/autopilot.html">autopilot</a>
</li>
<li<%= sidebar_current("docs-commands-operator-raft") %>>
<a href="/docs/commands/operator/raft.html">raft</a>
</li>