From 985d23252296d9645ac3615ca29db32048ef3915 Mon Sep 17 00:00:00 2001 From: Kyle Havlovitz Date: Fri, 17 Feb 2017 10:49:16 -0800 Subject: [PATCH 01/11] Add configurable cleanup of dead servers when a new server joins --- consul/config.go | 6 ++++ consul/leader.go | 14 +++++++++ consul/leader_test.go | 69 +++++++++++++++++++++++++++++++++++++++++++ consul/server.go | 1 + 4 files changed, 90 insertions(+) diff --git a/consul/config.go b/consul/config.go index a11f73f454..e07500f5c7 100644 --- a/consul/config.go +++ b/consul/config.go @@ -274,6 +274,10 @@ type Config struct { // This period is meant to be long enough for a leader election to take // place, and a small jitter is applied to avoid a thundering herd. RPCHoldTimeout time.Duration + + // AutopilotServerCleanup controls whether to remove dead servers when a new + // server is added to the Raft peers + AutopilotServerCleanup bool } // CheckVersion is used to check if the ProtocolVersion is valid @@ -346,6 +350,8 @@ func DefaultConfig() *Config { RPCHoldTimeout: 7 * time.Second, TLSMinVersion: "tls10", + + AutopilotServerCleanup: true, } // Increase our reap interval to 3 days instead of 24h. 
diff --git a/consul/leader.go b/consul/leader.go index 5cfcec4632..fc0a32bf28 100644 --- a/consul/leader.go +++ b/consul/leader.go @@ -567,6 +567,20 @@ func (s *Server) joinConsulServer(m serf.Member, parts *agent.Server) error { s.logger.Printf("[ERR] consul: failed to add raft peer: %v", err) return err } + + // Look for dead servers to clean up + if s.config.AutopilotServerCleanup { + for _, member := range s.serfLAN.Members() { + valid, _ := agent.IsConsulServer(member) + if valid && member.Name != m.Name && member.Status == serf.StatusFailed { + if err := s.handleDeregisterMember("Removing failed server", member); err != nil { + return fmt.Errorf("[ERROR] consul: Couldn't deregister failed server (%s): %v", member.Name, err) + } + s.logger.Printf("[INFO] consul: Removed failed server: %v", member.Name) + } + } + } + return nil } diff --git a/consul/leader_test.go b/consul/leader_test.go index 329bf41a35..b85acee058 100644 --- a/consul/leader_test.go +++ b/consul/leader_test.go @@ -622,3 +622,72 @@ func TestLeader_ReapTombstones(t *testing.T) { t.Fatalf("err: %v", err) }) } + +func TestLeader_DeadServerCleanup(t *testing.T) { + dir1, s1 := testServerDCExpect(t, "dc1", 3) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + + dir2, s2 := testServerDCExpect(t, "dc1", 3) + defer os.RemoveAll(dir2) + defer s2.Shutdown() + + dir3, s3 := testServerDCExpect(t, "dc1", 3) + defer os.RemoveAll(dir3) + defer s3.Shutdown() + + servers := []*Server{s1, s2, s3} + + // Try to join + addr := fmt.Sprintf("127.0.0.1:%d", + s1.config.SerfLANConfig.MemberlistConfig.BindPort) + if _, err := s2.JoinLAN([]string{addr}); err != nil { + t.Fatalf("err: %v", err) + } + if _, err := s3.JoinLAN([]string{addr}); err != nil { + t.Fatalf("err: %v", err) + } + + for _, s := range servers { + testutil.WaitForResult(func() (bool, error) { + peers, _ := s.numPeers() + return peers == 3, nil + }, func(err error) { + t.Fatalf("should have 3 peers") + }) + } + + // Kill a non-leader server (s2 or s3, 
so s4 can still join s1) + var nonLeader *Server + var removedIndex int + for i, s := range servers { + if !s.IsLeader() && i > 0 { + nonLeader = s + removedIndex = i + break + } + } + nonLeader.Shutdown() + + time.Sleep(1 * time.Second) + + // Bring up and join a new server + dir4, s4 := testServerDCExpect(t, "dc1", 3) + defer os.RemoveAll(dir4) + defer s4.Shutdown() + + if _, err := s4.JoinLAN([]string{addr}); err != nil { + t.Fatalf("err: %v", err) + } + + // Make sure the dead server is removed and we're back to 3 total peers + servers[removedIndex] = s4 + for _, s := range servers { + testutil.WaitForResult(func() (bool, error) { + peers, _ := s.numPeers() + return peers == 3, nil + }, func(err error) { + t.Fatalf("should have 3 peers") + }) + } +} diff --git a/consul/server.go b/consul/server.go index 13d3088052..d58d0d4a4f 100644 --- a/consul/server.go +++ b/consul/server.go @@ -317,6 +317,7 @@ func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string, w conf.Tags["vsn"] = fmt.Sprintf("%d", s.config.ProtocolVersion) conf.Tags["vsn_min"] = fmt.Sprintf("%d", ProtocolVersionMin) conf.Tags["vsn_max"] = fmt.Sprintf("%d", ProtocolVersionMax) + conf.Tags["raft_vsn"] = fmt.Sprintf("%d", s.config.RaftConfig.ProtocolVersion) conf.Tags["build"] = s.config.Build conf.Tags["port"] = fmt.Sprintf("%d", addr.Port) if s.config.Bootstrap { From b20fd222f649ec9b5fd6d94dfa80b6eda58129e3 Mon Sep 17 00:00:00 2001 From: Kyle Havlovitz Date: Wed, 22 Feb 2017 12:53:32 -0800 Subject: [PATCH 02/11] Add raft version 2/3 compatibility --- command/agent/agent.go | 7 ++ command/agent/config.go | 22 +++++++ command/agent/config_test.go | 18 +++++ consul/agent/server.go | 24 +++++-- consul/agent/server_test.go | 8 +++ consul/config.go | 8 +-- consul/leader.go | 68 +++++++++++++------ consul/leader_test.go | 124 ++++++++++++++++++++++++++++++----- consul/serf.go | 13 +++- consul/server.go | 7 +- consul/server_test.go | 7 ++ consul/util.go | 24 +++++++ 12 files changed, 
279 insertions(+), 51 deletions(-) diff --git a/command/agent/agent.go b/command/agent/agent.go index 4782f43b3a..58bbef6cf6 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -28,6 +28,7 @@ import ( "github.com/hashicorp/serf/coordinate" "github.com/hashicorp/serf/serf" "github.com/shirou/gopsutil/host" + "github.com/hashicorp/raft" ) const ( @@ -412,6 +413,12 @@ func (a *Agent) consulConfig() *consul.Config { if a.config.SessionTTLMinRaw != "" { base.SessionTTLMin = a.config.SessionTTLMin } + if a.config.Autopilot.RaftProtocolVersion != 0 { + base.RaftConfig.ProtocolVersion = raft.ProtocolVersion(a.config.Autopilot.RaftProtocolVersion) + } + if a.config.Autopilot.DeadServerCleanup != nil { + base.DeadServerCleanup = *a.config.Autopilot.DeadServerCleanup + } // Format the build string revision := a.config.Revision diff --git a/command/agent/config.go b/command/agent/config.go index ac72e023d9..1f77931469 100644 --- a/command/agent/config.go +++ b/command/agent/config.go @@ -262,6 +262,16 @@ type Telemetry struct { CirconusBrokerSelectTag string `mapstructure:"circonus_broker_select_tag"` } +// Autopilot is used to configure helpful features for operating Consul servers. +type Autopilot struct { + // RaftProtocolVersion sets the Raft protocol version to use on this server. + RaftProtocolVersion int `mapstructure:"raft_protocol"` + + // DeadServerCleanup enables the automatic cleanup of dead servers when new ones + // are added to the peer list. Defaults to true. + DeadServerCleanup *bool `mapstructure:"dead_server_cleanup"` +} + // Config is the configuration that can be set for an Agent. // Some of this is configurable as CLI flags, but most must // be set using a configuration file. @@ -387,6 +397,9 @@ type Config struct { // servers. This can be changed on reload. SkipLeaveOnInt *bool `mapstructure:"skip_leave_on_interrupt"` + // Autopilot is used to configure helpful features for operating Consul servers. 
+ Autopilot Autopilot `mapstructure:"autopilot"` + Telemetry Telemetry `mapstructure:"telemetry"` // Protocol is the Consul protocol version to use. @@ -759,6 +772,9 @@ func DefaultConfig() *Config { CheckReapInterval: 30 * time.Second, AEInterval: time.Minute, DisableCoordinates: false, + Autopilot: Autopilot{ + DeadServerCleanup: Bool(true), + }, // SyncCoordinateRateTarget is set based on the rate that we want // the server to handle as an aggregate across the entire cluster. @@ -1331,6 +1347,12 @@ func MergeConfig(a, b *Config) *Config { if b.SkipLeaveOnInt != nil { result.SkipLeaveOnInt = b.SkipLeaveOnInt } + if b.Autopilot.RaftProtocolVersion != 0 { + result.Autopilot.RaftProtocolVersion = b.Autopilot.RaftProtocolVersion + } + if b.Autopilot.DeadServerCleanup != nil { + result.Autopilot.DeadServerCleanup = b.Autopilot.DeadServerCleanup + } if b.Telemetry.DisableHostname == true { result.Telemetry.DisableHostname = true } diff --git a/command/agent/config_test.go b/command/agent/config_test.go index 4c9addc484..1510c3fdd6 100644 --- a/command/agent/config_test.go +++ b/command/agent/config_test.go @@ -1098,6 +1098,20 @@ func TestDecodeConfig_Performance(t *testing.T) { } } +func TestDecodeConfig_Autopilot(t *testing.T) { + input := `{"autopilot": { "raft_protocol": 3, "dead_server_cleanup": true }}` + config, err := DecodeConfig(bytes.NewReader([]byte(input))) + if err != nil { + t.Fatalf("err: %s", err) + } + if config.Autopilot.RaftProtocolVersion != 3 { + t.Fatalf("bad: raft_protocol isn't set: %#v", config) + } + if config.Autopilot.DeadServerCleanup == nil || !*config.Autopilot.DeadServerCleanup { + t.Fatalf("bad: dead_server_cleanup isn't set: %#v", config) + } +} + func TestDecodeConfig_Services(t *testing.T) { input := `{ "services": [ @@ -1619,6 +1633,10 @@ func TestMergeConfig(t *testing.T) { Server: true, LeaveOnTerm: Bool(true), SkipLeaveOnInt: Bool(true), + Autopilot: Autopilot{ + RaftProtocolVersion: 3, + DeadServerCleanup: Bool(true), + }, 
EnableDebug: true, VerifyIncoming: true, VerifyOutgoing: true, diff --git a/consul/agent/server.go b/consul/agent/server.go index 8ba1f0c5da..bd6c6c7cfc 100644 --- a/consul/agent/server.go +++ b/consul/agent/server.go @@ -25,13 +25,15 @@ func (k *Key) Equal(x *Key) bool { // Server is used to return details of a consul server type Server struct { - Name string - Datacenter string - Port int - Bootstrap bool - Expect int - Version int - Addr net.Addr + Name string + ID string + Datacenter string + Port int + Bootstrap bool + Expect int + Version int + RaftVersion int + Addr net.Addr } // Key returns the corresponding Key @@ -84,16 +86,24 @@ func IsConsulServer(m serf.Member) (bool, *Server) { return false, nil } + raft_vsn_str := m.Tags["raft_vsn"] + raft_vsn, err := strconv.Atoi(raft_vsn_str) + if err != nil { + return false, nil + } + addr := &net.TCPAddr{IP: m.Addr, Port: port} parts := &Server{ Name: m.Name, + ID: m.Tags["id"], Datacenter: datacenter, Port: port, Bootstrap: bootstrap, Expect: expect, Addr: addr, Version: vsn, + RaftVersion: raft_vsn, } return true, parts } diff --git a/consul/agent/server_test.go b/consul/agent/server_test.go index 0dda84f851..adf7622398 100644 --- a/consul/agent/server_test.go +++ b/consul/agent/server_test.go @@ -56,9 +56,11 @@ func TestIsConsulServer(t *testing.T) { Addr: net.IP([]byte{127, 0, 0, 1}), Tags: map[string]string{ "role": "consul", + "id": "asdf", "dc": "east-aws", "port": "10000", "vsn": "1", + "raft_vsn": "3", }, } ok, parts := agent.IsConsulServer(m) @@ -68,12 +70,18 @@ func TestIsConsulServer(t *testing.T) { if parts.Name != "foo" { t.Fatalf("bad: %v", parts) } + if parts.ID != "asdf" { + t.Fatalf("bad: %v", parts.ID) + } if parts.Bootstrap { t.Fatalf("unexpected bootstrap") } if parts.Expect != 0 { t.Fatalf("bad: %v", parts.Expect) } + if parts.RaftVersion != 3 { + t.Fatalf("bad: %v", parts.RaftVersion) + } m.Tags["bootstrap"] = "1" m.Tags["disabled"] = "1" ok, parts = agent.IsConsulServer(m) diff --git 
a/consul/config.go b/consul/config.go index e07500f5c7..01d80711e8 100644 --- a/consul/config.go +++ b/consul/config.go @@ -275,9 +275,9 @@ type Config struct { // place, and a small jitter is applied to avoid a thundering herd. RPCHoldTimeout time.Duration - // AutopilotServerCleanup controls whether to remove dead servers when a new + // DeadServerCleanup controls whether to remove dead servers when a new // server is added to the Raft peers - AutopilotServerCleanup bool + DeadServerCleanup bool } // CheckVersion is used to check if the ProtocolVersion is valid @@ -351,7 +351,7 @@ func DefaultConfig() *Config { TLSMinVersion: "tls10", - AutopilotServerCleanup: true, + DeadServerCleanup: true, } // Increase our reap interval to 3 days instead of 24h. @@ -368,7 +368,7 @@ func DefaultConfig() *Config { // Enable interoperability with unversioned Raft library, and don't // start using new ID-based features yet. - conf.RaftConfig.ProtocolVersion = 1 + conf.RaftConfig.ProtocolVersion = 2 conf.ScaleRaft(DefaultRaftMultiplier) // Disable shutdown on removal diff --git a/consul/leader.go b/consul/leader.go index fc0a32bf28..8197a5e0fb 100644 --- a/consul/leader.go +++ b/consul/leader.go @@ -562,21 +562,32 @@ func (s *Server) joinConsulServer(m serf.Member, parts *agent.Server) error { } // Attempt to add as a peer - addFuture := s.raft.AddPeer(raft.ServerAddress(addr)) - if err := addFuture.Error(); err != nil { - s.logger.Printf("[ERR] consul: failed to add raft peer: %v", err) + minRaftProtocol, err := ServerMinRaftProtocol(s.serfLAN.Members()) + if err != nil { return err } + if minRaftProtocol >= 2 && parts.RaftVersion >= 3 { + addFuture := s.raft.AddVoter(raft.ServerID(parts.ID), raft.ServerAddress(addr), 0, 0) + if err := addFuture.Error(); err != nil { + s.logger.Printf("[ERR] consul: failed to add raft peer: %v", err) + return err + } + } else { + addFuture := s.raft.AddPeer(raft.ServerAddress(addr)) + if err := addFuture.Error(); err != nil { + 
s.logger.Printf("[ERR] consul: failed to add raft peer: %v", err) + return err + } + } + // Look for dead servers to clean up - if s.config.AutopilotServerCleanup { + if s.config.DeadServerCleanup { for _, member := range s.serfLAN.Members() { valid, _ := agent.IsConsulServer(member) if valid && member.Name != m.Name && member.Status == serf.StatusFailed { - if err := s.handleDeregisterMember("Removing failed server", member); err != nil { - return fmt.Errorf("[ERROR] consul: Couldn't deregister failed server (%s): %v", member.Name, err) - } - s.logger.Printf("[INFO] consul: Removed failed server: %v", member.Name) + s.logger.Printf("[INFO] consul: Attempting removal of failed server: %v", member.Name) + go s.serfLAN.RemoveFailedNode(member.Name) } } } @@ -597,21 +608,38 @@ func (s *Server) removeConsulServer(m serf.Member, port int) error { s.logger.Printf("[ERR] consul: failed to get raft configuration: %v", err) return err } - for _, server := range configFuture.Configuration().Servers { - if server.Address == raft.ServerAddress(addr) { - goto REMOVE - } - } - return nil -REMOVE: - // Attempt to remove as a peer. - future := s.raft.RemovePeer(raft.ServerAddress(addr)) - if err := future.Error(); err != nil { - s.logger.Printf("[ERR] consul: failed to remove raft peer '%v': %v", - addr, err) + minRaftProtocol, err := ServerMinRaftProtocol(s.serfLAN.Members()) + if err != nil { return err } + + _, parts := agent.IsConsulServer(m) + + // Pick which remove API to use based on how the server was added. 
+ for _, server := range configFuture.Configuration().Servers { + // If we understand the new add/remove APIs and the server was added by ID, use the new remove API + if minRaftProtocol >= 2 && server.ID == raft.ServerID(parts.ID) { + s.logger.Printf("[INFO] consul: removing server via new api, %q %q", server.ID, server.Address) + future := s.raft.RemoveServer(raft.ServerID(parts.ID), 0, 0) + if err := future.Error(); err != nil { + s.logger.Printf("[ERR] consul: failed to remove raft peer '%v': %v", + addr, err) + return err + } + break + } else if server.Address == raft.ServerAddress(addr) { + // If not, use the old remove API + future := s.raft.RemovePeer(raft.ServerAddress(addr)) + if err := future.Error(); err != nil { + s.logger.Printf("[ERR] consul: failed to remove raft peer '%v': %v", + addr, err) + return err + } + break + } + } + return nil } diff --git a/consul/leader_test.go b/consul/leader_test.go index b85acee058..053752122f 100644 --- a/consul/leader_test.go +++ b/consul/leader_test.go @@ -624,15 +624,15 @@ func TestLeader_ReapTombstones(t *testing.T) { } func TestLeader_DeadServerCleanup(t *testing.T) { - dir1, s1 := testServerDCExpect(t, "dc1", 3) + dir1, s1 := testServerDCBootstrap(t, "dc1", true) defer os.RemoveAll(dir1) defer s1.Shutdown() - dir2, s2 := testServerDCExpect(t, "dc1", 3) + dir2, s2 := testServerDCBootstrap(t, "dc1", false) defer os.RemoveAll(dir2) defer s2.Shutdown() - dir3, s3 := testServerDCExpect(t, "dc1", 3) + dir3, s3 := testServerDCBootstrap(t, "dc1", false) defer os.RemoveAll(dir3) defer s3.Shutdown() @@ -657,31 +657,32 @@ func TestLeader_DeadServerCleanup(t *testing.T) { }) } - // Kill a non-leader server (s2 or s3, so s4 can still join s1) - var nonLeader *Server - var removedIndex int - for i, s := range servers { - if !s.IsLeader() && i > 0 { - nonLeader = s - removedIndex = i - break - } - } - nonLeader.Shutdown() + // Kill a non-leader server + s2.Shutdown() - time.Sleep(1 * time.Second) + 
testutil.WaitForResult(func() (bool, error) { + alive := 0 + for _, m := range s1.LANMembers() { + if m.Status == serf.StatusAlive { + alive++ + } + } + return alive == 2, nil + }, func(err error) { + t.Fatalf("should have 2 alive members") + }) // Bring up and join a new server - dir4, s4 := testServerDCExpect(t, "dc1", 3) + dir4, s4 := testServerDCBootstrap(t, "dc1", false) defer os.RemoveAll(dir4) defer s4.Shutdown() if _, err := s4.JoinLAN([]string{addr}); err != nil { t.Fatalf("err: %v", err) } + servers[1] = s4 // Make sure the dead server is removed and we're back to 3 total peers - servers[removedIndex] = s4 for _, s := range servers { testutil.WaitForResult(func() (bool, error) { peers, _ := s.numPeers() @@ -691,3 +692,92 @@ func TestLeader_DeadServerCleanup(t *testing.T) { }) } } + +func TestLeader_RollRaftServer(t *testing.T) { + dir1, s1 := testServerWithConfig(t, func(c *Config) { + c.Bootstrap = true + c.Datacenter = "dc1" + }) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + + dir2, s2 := testServerWithConfig(t, func(c *Config) { + c.Bootstrap = false + c.Datacenter = "dc1" + c.RaftConfig.ProtocolVersion = 1 + }) + defer os.RemoveAll(dir2) + defer s2.Shutdown() + + dir3, s3 := testServerDCBootstrap(t, "dc1", false) + defer os.RemoveAll(dir3) + defer s3.Shutdown() + + servers := []*Server{s1, s2, s3} + + // Try to join + addr := fmt.Sprintf("127.0.0.1:%d", + s1.config.SerfLANConfig.MemberlistConfig.BindPort) + if _, err := s2.JoinLAN([]string{addr}); err != nil { + t.Fatalf("err: %v", err) + } + if _, err := s3.JoinLAN([]string{addr}); err != nil { + t.Fatalf("err: %v", err) + } + + for _, s := range servers { + testutil.WaitForResult(func() (bool, error) { + peers, _ := s.numPeers() + return peers == 3, nil + }, func(err error) { + t.Fatalf("should have 3 peers") + }) + } + + // Kill the v1 server + s2.Shutdown() + + for _, s := range []*Server{s1, s3} { + testutil.WaitForResult(func() (bool, error) { + minVer, err := 
ServerMinRaftProtocol(s.LANMembers()) + return minVer == 2, err + }, func(err error) { + t.Fatalf("minimum protocol version among servers should be 2") + }) + } + + // Replace the dead server with one running raft protocol v3 + dir4, s4 := testServerWithConfig(t, func(c *Config) { + c.Bootstrap = false + c.Datacenter = "dc1" + c.RaftConfig.ProtocolVersion = 3 + }) + defer os.RemoveAll(dir4) + defer s4.Shutdown() + if _, err := s4.JoinLAN([]string{addr}); err != nil { + t.Fatalf("err: %v", err) + } + servers[1] = s4 + + // Make sure the dead server is removed and we're back to 3 total peers + for _, s := range servers { + testutil.WaitForResult(func() (bool, error) { + addrs := 0 + ids := 0 + future := s.raft.GetConfiguration() + if err := future.Error(); err != nil { + return false, err + } + for _, server := range future.Configuration().Servers { + if string(server.ID) == string(server.Address) { + addrs++ + } else { + ids++ + } + } + return addrs == 2 && ids == 1, nil + }, func(err error) { + t.Fatalf("should see 2 legacy IDs and 1 GUID") + }) + } +} \ No newline at end of file diff --git a/consul/serf.go b/consul/serf.go index 6306bd405a..a9175c04f1 100644 --- a/consul/serf.go +++ b/consul/serf.go @@ -280,11 +280,22 @@ func (s *Server) maybeBootstrap() { // Attempt a live bootstrap! 
var configuration raft.Configuration var addrs []string + minRaftVersion, err := ServerMinRaftProtocol(members) + if err != nil { + s.logger.Printf("[ERR] consul: Failed to read server raft versions: %v", err) + } + for _, server := range servers { addr := server.Addr.String() addrs = append(addrs, addr) + var id raft.ServerID + if server.ID != "" && minRaftVersion >= 3 { + id = raft.ServerID(server.ID) + } else { + id = raft.ServerID(addr) + } peer := raft.Server{ - ID: raft.ServerID(addr), + ID: id, Address: raft.ServerAddress(addr), } configuration.Servers = append(configuration.Servers, peer) diff --git a/consul/server.go b/consul/server.go index d58d0d4a4f..410910bd52 100644 --- a/consul/server.go +++ b/consul/server.go @@ -379,9 +379,12 @@ func (s *Server) setupRaft() error { // Make sure we set the LogOutput. s.config.RaftConfig.LogOutput = s.config.LogOutput - // Our version of Raft protocol requires the LocalID to match the network + // Versions of the Raft protocol below 3 require the LocalID to match the network // address of the transport. s.config.RaftConfig.LocalID = raft.ServerID(trans.LocalAddr()) + if s.config.RaftConfig.ProtocolVersion >= 3 { + s.config.RaftConfig.LocalID = raft.ServerID(s.config.NodeID) + } // Build an all in-memory setup for dev mode, otherwise prepare a full // disk-based setup. 
@@ -479,7 +482,7 @@ func (s *Server) setupRaft() error { configuration := raft.Configuration{ Servers: []raft.Server{ raft.Server{ - ID: raft.ServerID(trans.LocalAddr()), + ID: s.config.RaftConfig.LocalID, Address: trans.LocalAddr(), }, }, diff --git a/consul/server_test.go b/consul/server_test.go index a3805dfff0..72bf3bf6f4 100644 --- a/consul/server_test.go +++ b/consul/server_test.go @@ -12,6 +12,8 @@ import ( "time" "github.com/hashicorp/consul/testutil" + "github.com/hashicorp/go-uuid" + "github.com/hashicorp/consul/types" ) var nextPort int32 = 15000 @@ -46,6 +48,11 @@ func testServerConfig(t *testing.T, NodeName string) (string, *Config) { IP: []byte{127, 0, 0, 1}, Port: getPort(), } + nodeID, err := uuid.GenerateUUID() + if err != nil { + t.Fatal(err) + } + config.NodeID = types.NodeID(nodeID) config.SerfLANConfig.MemberlistConfig.BindAddr = "127.0.0.1" config.SerfLANConfig.MemberlistConfig.BindPort = getPort() config.SerfLANConfig.MemberlistConfig.SuspicionMult = 2 diff --git a/consul/util.go b/consul/util.go index b0000a1998..0f5f33b210 100644 --- a/consul/util.go +++ b/consul/util.go @@ -91,6 +91,30 @@ func CanServersUnderstandProtocol(members []serf.Member, version uint8) (bool, e return (numServers > 0) && (numWhoGrok == numServers), nil } +// ServerMinRaftProtocol returns the lowest supported Raft protocol among alive servers +func ServerMinRaftProtocol(members []serf.Member) (int, error) { + minVersion := -1 + for _, m := range members { + if m.Tags["role"] != "consul" || m.Status != serf.StatusAlive { + continue + } + + vsn, ok := m.Tags["raft_vsn"] + if !ok { + vsn = "1" + } + raftVsn, err := strconv.Atoi(vsn) + if err != nil { + return -1, err + } + + if minVersion == -1 || raftVsn < minVersion { + minVersion = raftVsn + } + } + return minVersion, nil +} + // Returns if a member is a consul node. Returns a bool, // and the datacenter. 
func isConsulNode(m serf.Member) (bool, string) { From 0023454ccccb68e12e3d0d194a7670e21a9d0f3d Mon Sep 17 00:00:00 2001 From: Kyle Havlovitz Date: Wed, 22 Feb 2017 13:11:01 -0800 Subject: [PATCH 03/11] Add config section to docs for autopilot --- website/source/docs/agent/options.html.markdown | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/website/source/docs/agent/options.html.markdown b/website/source/docs/agent/options.html.markdown index e3f96a4eda..325fdcbdf5 100644 --- a/website/source/docs/agent/options.html.markdown +++ b/website/source/docs/agent/options.html.markdown @@ -555,6 +555,19 @@ Consul will not enable TLS for the HTTP API unless the `https` port has been ass * `atlas_endpoint` Equivalent to the [`-atlas-endpoint` command-line flag](#_atlas_endpoint). +* `autopilot` Added in Consul 0.8, this object + allows a number of sub-keys to be set which can configure operator-friendly settings for Consul servers. +

+ The following sub-keys are available: + + * `raft_protocol` - This controls the internal + version of the Raft consensus protocol used for server communications. This defaults to 2 but must + be set to 3 in order to gain access to other Autopilot features, with the exception of + [`dead_server_cleanup`](#dead_server_cleanup). + + * `dead_server_cleanup` - This controls + the automatic removal of dead server nodes whenever a new server is added to the cluster. Defaults to `true`. + * `bootstrap` Equivalent to the [`-bootstrap` command-line flag](#_bootstrap). From 950a9d221269edee486e33971f0eb27d9e780747 Mon Sep 17 00:00:00 2001 From: Kyle Havlovitz Date: Thu, 23 Feb 2017 13:08:40 -0800 Subject: [PATCH 04/11] Move raft_protocol out of autopilot config --- command/agent/config.go | 12 ++++++------ command/agent/config_test.go | 18 +++++++++++++----- consul/config.go | 5 +++-- consul/leader.go | 5 +++-- consul/util.go | 5 +++++ 5 files changed, 30 insertions(+), 15 deletions(-) diff --git a/command/agent/config.go b/command/agent/config.go index 1f77931469..2a78dcd367 100644 --- a/command/agent/config.go +++ b/command/agent/config.go @@ -264,9 +264,6 @@ type Telemetry struct { // Autopilot is used to configure helpful features for operating Consul servers. type Autopilot struct { - // RaftProtocolVersion sets the Raft protocol version to use on this server. - RaftProtocolVersion int `mapstructure:"raft_protocol"` - // DeadServerCleanup enables the automatic cleanup of dead servers when new ones // are added to the peer list. Defaults to true. DeadServerCleanup *bool `mapstructure:"dead_server_cleanup"` @@ -405,6 +402,9 @@ type Config struct { // Protocol is the Consul protocol version to use. Protocol int `mapstructure:"protocol"` + // RaftProtocol sets the Raft protocol version to use on this server. 
+ RaftProtocol int `mapstructure:"raft_protocol"` + // EnableDebug is used to enable various debugging features EnableDebug bool `mapstructure:"enable_debug"` @@ -1299,6 +1299,9 @@ func MergeConfig(a, b *Config) *Config { if b.Protocol > 0 { result.Protocol = b.Protocol } + if b.RaftProtocol != 0 { + result.RaftProtocol = b.RaftProtocol + } if b.NodeID != "" { result.NodeID = b.NodeID } @@ -1347,9 +1350,6 @@ func MergeConfig(a, b *Config) *Config { if b.SkipLeaveOnInt != nil { result.SkipLeaveOnInt = b.SkipLeaveOnInt } - if b.Autopilot.RaftProtocolVersion != 0 { - result.Autopilot.RaftProtocolVersion = b.Autopilot.RaftProtocolVersion - } if b.Autopilot.DeadServerCleanup != nil { result.Autopilot.DeadServerCleanup = b.Autopilot.DeadServerCleanup } diff --git a/command/agent/config_test.go b/command/agent/config_test.go index 1510c3fdd6..cbd00216ac 100644 --- a/command/agent/config_test.go +++ b/command/agent/config_test.go @@ -285,6 +285,17 @@ func TestDecodeConfig(t *testing.T) { t.Fatalf("bad: %#v", config) } + // raft protocol + input = `{"raft_protocol": 3}` + config, err = DecodeConfig(bytes.NewReader([]byte(input))) + if err != nil { + t.Fatalf("err: %s", err) + } + + if config.RaftProtocol != 3 { + t.Fatalf("bad: %#v", config) + } + // Node metadata fields input = `{"node_meta": {"thing1": "1", "thing2": "2"}}` config, err = DecodeConfig(bytes.NewReader([]byte(input))) @@ -1099,14 +1110,11 @@ func TestDecodeConfig_Performance(t *testing.T) { } func TestDecodeConfig_Autopilot(t *testing.T) { - input := `{"autopilot": { "raft_protocol": 3, "dead_server_cleanup": true }}` + input := `{"autopilot": { "dead_server_cleanup": true }}` config, err := DecodeConfig(bytes.NewReader([]byte(input))) if err != nil { t.Fatalf("err: %s", err) } - if config.Autopilot.RaftProtocolVersion != 3 { - t.Fatalf("bad: raft_protocol isn't set: %#v", config) - } if config.Autopilot.DeadServerCleanup == nil || !*config.Autopilot.DeadServerCleanup { t.Fatalf("bad: dead_server_cleanup 
isn't set: %#v", config) } @@ -1633,8 +1641,8 @@ func TestMergeConfig(t *testing.T) { Server: true, LeaveOnTerm: Bool(true), SkipLeaveOnInt: Bool(true), + RaftProtocol: 3, Autopilot: Autopilot{ - RaftProtocolVersion: 3, DeadServerCleanup: Bool(true), }, EnableDebug: true, diff --git a/consul/config.go b/consul/config.go index 01d80711e8..7cfc09470d 100644 --- a/consul/config.go +++ b/consul/config.go @@ -366,8 +366,9 @@ func DefaultConfig() *Config { conf.SerfLANConfig.MemberlistConfig.BindPort = DefaultLANSerfPort conf.SerfWANConfig.MemberlistConfig.BindPort = DefaultWANSerfPort - // Enable interoperability with unversioned Raft library, and don't - // start using new ID-based features yet. + // TODO: default to 3 in Consul 0.9 + // Use a transitional version of the raft protocol to interoperate with + // versions 1 and 3 conf.RaftConfig.ProtocolVersion = 2 conf.ScaleRaft(DefaultRaftMultiplier) diff --git a/consul/leader.go b/consul/leader.go index 8197a5e0fb..93fad4361c 100644 --- a/consul/leader.go +++ b/consul/leader.go @@ -620,16 +620,17 @@ func (s *Server) removeConsulServer(m serf.Member, port int) error { for _, server := range configFuture.Configuration().Servers { // If we understand the new add/remove APIs and the server was added by ID, use the new remove API if minRaftProtocol >= 2 && server.ID == raft.ServerID(parts.ID) { - s.logger.Printf("[INFO] consul: removing server via new api, %q %q", server.ID, server.Address) + s.logger.Printf("[INFO] consul: removing server by ID: %q", server.ID) future := s.raft.RemoveServer(raft.ServerID(parts.ID), 0, 0) if err := future.Error(); err != nil { s.logger.Printf("[ERR] consul: failed to remove raft peer '%v': %v", - addr, err) + server.ID, err) return err } break } else if server.Address == raft.ServerAddress(addr) { // If not, use the old remove API + s.logger.Printf("[INFO] consul: removing server by address: %q", server.Address) future := s.raft.RemovePeer(raft.ServerAddress(addr)) if err := future.Error(); 
err != nil { s.logger.Printf("[ERR] consul: failed to remove raft peer '%v': %v", diff --git a/consul/util.go b/consul/util.go index 0f5f33b210..f0d56e4533 100644 --- a/consul/util.go +++ b/consul/util.go @@ -112,6 +112,11 @@ func ServerMinRaftProtocol(members []serf.Member) (int, error) { minVersion = raftVsn } } + + if minVersion == -1 { + return minVersion, fmt.Errorf("No servers found") + } + return minVersion, nil } From 81c7a0299e29ae06437da14f36df6d718cdfc04d Mon Sep 17 00:00:00 2001 From: Kyle Havlovitz Date: Thu, 23 Feb 2017 20:32:13 -0800 Subject: [PATCH 05/11] Add state store table and endpoints for autopilot --- command/agent/agent.go | 10 +- command/agent/config.go | 3 - command/agent/config_test.go | 12 +- command/agent/http.go | 1 + command/agent/operator_endpoint.go | 44 +++++- command/agent/operator_endpoint_test.go | 57 ++++++++ consul/agent/server.go | 16 +- consul/agent/server_test.go | 10 +- consul/config.go | 11 +- consul/leader.go | 35 ++++- consul/leader_test.go | 2 +- consul/operator_endpoint.go | 52 +++++++ consul/operator_endpoint_test.go | 186 ++++++++++++++++++++++++ consul/serf.go | 2 +- consul/server_test.go | 2 +- consul/state/autopilot.go | 39 +++++ consul/state/autopilot_test.go | 31 ++++ consul/state/schema.go | 19 +++ consul/structs/operator.go | 24 +++ 19 files changed, 519 insertions(+), 37 deletions(-) create mode 100644 consul/state/autopilot.go create mode 100644 consul/state/autopilot_test.go diff --git a/command/agent/agent.go b/command/agent/agent.go index 58bbef6cf6..8e4d0b6c9a 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -25,10 +25,10 @@ import ( "github.com/hashicorp/consul/types" "github.com/hashicorp/go-sockaddr/template" "github.com/hashicorp/go-uuid" + "github.com/hashicorp/raft" "github.com/hashicorp/serf/coordinate" "github.com/hashicorp/serf/serf" "github.com/shirou/gopsutil/host" - "github.com/hashicorp/raft" ) const ( @@ -383,6 +383,9 @@ func (a *Agent) consulConfig() *consul.Config { if 
a.config.Protocol > 0 { base.ProtocolVersion = uint8(a.config.Protocol) } + if a.config.RaftProtocol != 0 { + base.RaftConfig.ProtocolVersion = raft.ProtocolVersion(a.config.RaftProtocol) + } if a.config.ACLToken != "" { base.ACLToken = a.config.ACLToken } @@ -413,11 +416,8 @@ func (a *Agent) consulConfig() *consul.Config { if a.config.SessionTTLMinRaw != "" { base.SessionTTLMin = a.config.SessionTTLMin } - if a.config.Autopilot.RaftProtocolVersion != 0 { - base.RaftConfig.ProtocolVersion = raft.ProtocolVersion(a.config.Autopilot.RaftProtocolVersion) - } if a.config.Autopilot.DeadServerCleanup != nil { - base.DeadServerCleanup = *a.config.Autopilot.DeadServerCleanup + base.AutopilotConfig.DeadServerCleanup = *a.config.Autopilot.DeadServerCleanup } // Format the build string diff --git a/command/agent/config.go b/command/agent/config.go index 2a78dcd367..6d5f5420dc 100644 --- a/command/agent/config.go +++ b/command/agent/config.go @@ -772,9 +772,6 @@ func DefaultConfig() *Config { CheckReapInterval: 30 * time.Second, AEInterval: time.Minute, DisableCoordinates: false, - Autopilot: Autopilot{ - DeadServerCleanup: Bool(true), - }, // SyncCoordinateRateTarget is set based on the rate that we want // the server to handle as an aggregate across the entire cluster. 
diff --git a/command/agent/config_test.go b/command/agent/config_test.go index cbd00216ac..cad0a18981 100644 --- a/command/agent/config_test.go +++ b/command/agent/config_test.go @@ -1638,12 +1638,12 @@ func TestMergeConfig(t *testing.T) { RPC: "127.0.0.3", HTTPS: "127.0.0.4", }, - Server: true, - LeaveOnTerm: Bool(true), - SkipLeaveOnInt: Bool(true), - RaftProtocol: 3, - Autopilot: Autopilot{ - DeadServerCleanup: Bool(true), + Server: true, + LeaveOnTerm: Bool(true), + SkipLeaveOnInt: Bool(true), + RaftProtocol: 3, + Autopilot: Autopilot{ + DeadServerCleanup: Bool(true), }, EnableDebug: true, VerifyIncoming: true, diff --git a/command/agent/http.go b/command/agent/http.go index dc70693690..439a2daaab 100644 --- a/command/agent/http.go +++ b/command/agent/http.go @@ -297,6 +297,7 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) { s.handleFuncMetrics("/v1/operator/raft/configuration", s.wrap(s.OperatorRaftConfiguration)) s.handleFuncMetrics("/v1/operator/raft/peer", s.wrap(s.OperatorRaftPeer)) s.handleFuncMetrics("/v1/operator/keyring", s.wrap(s.OperatorKeyringEndpoint)) + s.handleFuncMetrics("/v1/operator/autopilot/configuration", s.wrap(s.OperatorAutopilotConfiguration)) s.handleFuncMetrics("/v1/query", s.wrap(s.PreparedQueryGeneral)) s.handleFuncMetrics("/v1/query/", s.wrap(s.PreparedQuerySpecific)) s.handleFuncMetrics("/v1/session/create", s.wrap(s.SessionCreate)) diff --git a/command/agent/operator_endpoint.go b/command/agent/operator_endpoint.go index e7e42bb7ad..7509b54b82 100644 --- a/command/agent/operator_endpoint.go +++ b/command/agent/operator_endpoint.go @@ -3,11 +3,11 @@ package agent import ( "fmt" "net/http" + "strconv" "github.com/hashicorp/consul/consul/structs" multierror "github.com/hashicorp/go-multierror" "github.com/hashicorp/raft" - "strconv" ) // OperatorRaftConfiguration is used to inspect the current Raft configuration. 
@@ -105,7 +105,7 @@ func (s *HTTPServer) OperatorKeyringEndpoint(resp http.ResponseWriter, req *http case "DELETE": return s.KeyringRemove(resp, req, &args) default: - resp.WriteHeader(405) + resp.WriteHeader(http.StatusMethodNotAllowed) return nil, nil } } @@ -166,3 +166,43 @@ func keyringErrorsOrNil(responses []*structs.KeyringResponse) error { } return errs } + +// OperatorAutopilotConfiguration is used to inspect the current Autopilot configuration. +// This supports the stale query mode in case the cluster doesn't have a leader. +func (s *HTTPServer) OperatorAutopilotConfiguration(resp http.ResponseWriter, req *http.Request) (interface{}, error) { + // Switch on the method + switch req.Method { + case "GET": + var args structs.DCSpecificRequest + if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done { + return nil, nil + } + + var reply structs.AutopilotConfig + if err := s.agent.RPC("Operator.AutopilotGetConfiguration", &args, &reply); err != nil { + return nil, err + } + + return reply, nil + case "PUT": + var args structs.AutopilotSetConfigRequest + s.parseDC(req, &args.Datacenter) + s.parseToken(req, &args.Token) + + if err := decodeBody(req, &args.Config, nil); err != nil { + resp.WriteHeader(400) + resp.Write([]byte(fmt.Sprintf("Error parsing relay factor: %v", err))) + return nil, nil + } + + var reply struct{} + if err := s.agent.RPC("Operator.AutopilotSetConfiguration", &args, &reply); err != nil { + return nil, err + } + + return nil, nil + default: + resp.WriteHeader(http.StatusMethodNotAllowed) + return nil, nil + } +} diff --git a/command/agent/operator_endpoint_test.go b/command/agent/operator_endpoint_test.go index 4b1dd54126..bedcba0fbd 100644 --- a/command/agent/operator_endpoint_test.go +++ b/command/agent/operator_endpoint_test.go @@ -285,3 +285,60 @@ func TestOperator_Keyring_InvalidRelayFactor(t *testing.T) { } }, configFunc) } + +func TestOperator_AutopilotGetConfiguration(t *testing.T) { + httpTest(t, func(srv 
*HTTPServer) { + body := bytes.NewBuffer(nil) + req, err := http.NewRequest("GET", "/v1/operator/autopilot/configuration", body) + if err != nil { + t.Fatalf("err: %v", err) + } + + resp := httptest.NewRecorder() + obj, err := srv.OperatorAutopilotConfiguration(resp, req) + if err != nil { + t.Fatalf("err: %v", err) + } + if resp.Code != 200 { + t.Fatalf("bad code: %d", resp.Code) + } + out, ok := obj.(structs.AutopilotConfig) + if !ok { + t.Fatalf("unexpected: %T", obj) + } + if !out.DeadServerCleanup { + t.Fatalf("bad: %#v", out) + } + }) +} + +func TestOperator_AutopilotSetConfiguration(t *testing.T) { + httpTest(t, func(srv *HTTPServer) { + body := bytes.NewBuffer([]byte(`{"DeadServerCleanup": false}`)) + req, err := http.NewRequest("PUT", "/v1/operator/autopilot/configuration", body) + if err != nil { + t.Fatalf("err: %v", err) + } + + resp := httptest.NewRecorder() + if _, err = srv.OperatorAutopilotConfiguration(resp, req); err != nil { + t.Fatalf("err: %v", err) + } + if resp.Code != 200 { + t.Fatalf("bad code: %d", resp.Code) + } + + args := structs.DCSpecificRequest{ + Datacenter: "dc1", + } + + var reply structs.AutopilotConfig + if err := srv.agent.RPC("Operator.AutopilotGetConfiguration", &args, &reply); err != nil { + t.Fatalf("err: %v", err) + } + + if reply.DeadServerCleanup { + t.Fatalf("bad: %#v", reply) + } + }) +} \ No newline at end of file diff --git a/consul/agent/server.go b/consul/agent/server.go index bd6c6c7cfc..6510f34eab 100644 --- a/consul/agent/server.go +++ b/consul/agent/server.go @@ -95,14 +95,14 @@ func IsConsulServer(m serf.Member) (bool, *Server) { addr := &net.TCPAddr{IP: m.Addr, Port: port} parts := &Server{ - Name: m.Name, - ID: m.Tags["id"], - Datacenter: datacenter, - Port: port, - Bootstrap: bootstrap, - Expect: expect, - Addr: addr, - Version: vsn, + Name: m.Name, + ID: m.Tags["id"], + Datacenter: datacenter, + Port: port, + Bootstrap: bootstrap, + Expect: expect, + Addr: addr, + Version: vsn, RaftVersion: raft_vsn, } 
return true, parts diff --git a/consul/agent/server_test.go b/consul/agent/server_test.go index adf7622398..06321c7e2f 100644 --- a/consul/agent/server_test.go +++ b/consul/agent/server_test.go @@ -55,11 +55,11 @@ func TestIsConsulServer(t *testing.T) { Name: "foo", Addr: net.IP([]byte{127, 0, 0, 1}), Tags: map[string]string{ - "role": "consul", - "id": "asdf", - "dc": "east-aws", - "port": "10000", - "vsn": "1", + "role": "consul", + "id": "asdf", + "dc": "east-aws", + "port": "10000", + "vsn": "1", "raft_vsn": "3", }, } diff --git a/consul/config.go b/consul/config.go index 7cfc09470d..b2b09dbb9c 100644 --- a/consul/config.go +++ b/consul/config.go @@ -7,6 +7,7 @@ import ( "os" "time" + "github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/consul/tlsutil" "github.com/hashicorp/consul/types" "github.com/hashicorp/memberlist" @@ -275,9 +276,9 @@ type Config struct { // place, and a small jitter is applied to avoid a thundering herd. RPCHoldTimeout time.Duration - // DeadServerCleanup controls whether to remove dead servers when a new - // server is added to the Raft peers - DeadServerCleanup bool + // AutopilotConfig is used to apply the initial autopilot config when + // bootstrapping. + AutopilotConfig *structs.AutopilotConfig } // CheckVersion is used to check if the ProtocolVersion is valid @@ -351,7 +352,9 @@ func DefaultConfig() *Config { TLSMinVersion: "tls10", - DeadServerCleanup: true, + AutopilotConfig: &structs.AutopilotConfig{ + DeadServerCleanup: true, + }, } // Increase our reap interval to 3 days instead of 24h. 
diff --git a/consul/leader.go b/consul/leader.go index 93fad4361c..be9d8bcd4d 100644 --- a/consul/leader.go +++ b/consul/leader.go @@ -152,6 +152,13 @@ func (s *Server) establishLeadership() error { err) return err } + + // Setup autopilot config if we are the leader and need to + if err := s.initializeAutopilot(); err != nil { + s.logger.Printf("[ERR] consul: Autopilot initialization failed: %v", err) + return err + } + return nil } @@ -237,6 +244,26 @@ func (s *Server) initializeACL() error { return nil } +// initializeAutopilot is used to setup the autopilot config if we are +// the leader and need to do this +func (s *Server) initializeAutopilot() error { + // Bail if the config has already been initialized + state := s.fsm.State() + config, err := state.AutopilotConfig() + if err != nil { + return fmt.Errorf("failed to get autopilot config: %v", err) + } + if config != nil { + return nil + } + + if err := state.UpdateAutopilotConfig(s.config.AutopilotConfig); err != nil { + return err + } + + return nil +} + // reconcile is used to reconcile the differences between Serf // membership and what is reflected in our strongly consistent store. 
// Mainly we need to ensure all live nodes are registered, all failed @@ -581,8 +608,14 @@ func (s *Server) joinConsulServer(m serf.Member, parts *agent.Server) error { } } + state := s.fsm.State() + autopilotConf, err := state.AutopilotConfig() + if err != nil { + return err + } + // Look for dead servers to clean up - if s.config.DeadServerCleanup { + if autopilotConf.DeadServerCleanup { for _, member := range s.serfLAN.Members() { valid, _ := agent.IsConsulServer(member) if valid && member.Name != m.Name && member.Status == serf.StatusFailed { diff --git a/consul/leader_test.go b/consul/leader_test.go index 053752122f..cd9f1663fb 100644 --- a/consul/leader_test.go +++ b/consul/leader_test.go @@ -780,4 +780,4 @@ func TestLeader_RollRaftServer(t *testing.T) { t.Fatalf("should see 2 legacy IDs and 1 GUID") }) } -} \ No newline at end of file +} diff --git a/consul/operator_endpoint.go b/consul/operator_endpoint.go index 027e1d1e4e..5820e6620d 100644 --- a/consul/operator_endpoint.go +++ b/consul/operator_endpoint.go @@ -125,3 +125,55 @@ REMOVE: op.srv.logger.Printf("[WARN] consul.operator: Removed Raft peer %q", args.Address) return nil } + +// AutopilotGetConfiguration is used to retrieve the current Autopilot configuration. +func (op *Operator) AutopilotGetConfiguration(args *structs.DCSpecificRequest, reply *structs.AutopilotConfig) error { + if done, err := op.srv.forward("Operator.AutopilotGetConfiguration", args, args, reply); done { + return err + } + + // This action requires operator read access. + acl, err := op.srv.resolveToken(args.Token) + if err != nil { + return err + } + if acl != nil && !acl.OperatorRead() { + return permissionDeniedErr + } + + // We can't fetch the leader and the configuration atomically with + // the current Raft API. 
+ state := op.srv.fsm.State() + config, err := state.AutopilotConfig() + if err != nil { + return err + } + + *reply = *config + + return nil +} + +// AutopilotGetConfiguration is used to set the current Autopilot configuration. +func (op *Operator) AutopilotSetConfiguration(args *structs.AutopilotSetConfigRequest, reply *struct{}) error { + if done, err := op.srv.forward("Operator.AutopilotSetConfiguration", args, args, reply); done { + return err + } + + // This action requires operator read access. + acl, err := op.srv.resolveToken(args.Token) + if err != nil { + return err + } + if acl != nil && !acl.OperatorWrite() { + return permissionDeniedErr + } + + // Update the autopilot config + state := op.srv.fsm.State() + if err := state.UpdateAutopilotConfig(&args.Config); err != nil { + return err + } + + return nil +} diff --git a/consul/operator_endpoint_test.go b/consul/operator_endpoint_test.go index 6fcc1bc7de..41fb6d9656 100644 --- a/consul/operator_endpoint_test.go +++ b/consul/operator_endpoint_test.go @@ -243,3 +243,189 @@ func TestOperator_RaftRemovePeerByAddress_ACLDeny(t *testing.T) { t.Fatalf("err: %v", err) } } + +func TestOperator_Autopilot_GetConfiguration(t *testing.T) { + dir1, s1 := testServerWithConfig(t, func(c *Config) { + c.AutopilotConfig.DeadServerCleanup = false + }) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + codec := rpcClient(t, s1) + defer codec.Close() + + testutil.WaitForLeader(t, s1.RPC, "dc1") + + // Change the autopilot config from the default + arg := structs.DCSpecificRequest{ + Datacenter: "dc1", + } + var reply structs.AutopilotConfig + err := msgpackrpc.CallWithCodec(codec, "Operator.AutopilotGetConfiguration", &arg, &reply) + if err != nil { + t.Fatalf("err: %v", err) + } + if reply.DeadServerCleanup { + t.Fatalf("bad: %#v", reply) + } +} + +func TestOperator_Autopilot_GetConfiguration_ACLDeny(t *testing.T) { + dir1, s1 := testServerWithConfig(t, func(c *Config) { + c.ACLDatacenter = "dc1" + c.ACLMasterToken = "root" 
+ c.ACLDefaultPolicy = "deny" + c.AutopilotConfig.DeadServerCleanup = false + }) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + codec := rpcClient(t, s1) + defer codec.Close() + + testutil.WaitForLeader(t, s1.RPC, "dc1") + + // Change the autopilot config from the default + arg := structs.DCSpecificRequest{ + Datacenter: "dc1", + } + var reply structs.AutopilotConfig + err := msgpackrpc.CallWithCodec(codec, "Operator.AutopilotGetConfiguration", &arg, &reply) + if err == nil || !strings.Contains(err.Error(), permissionDenied) { + t.Fatalf("err: %v", err) + } + + // Create an ACL with operator read permissions. + var token string + { + var rules = ` + operator = "read" + ` + + req := structs.ACLRequest{ + Datacenter: "dc1", + Op: structs.ACLSet, + ACL: structs.ACL{ + Name: "User token", + Type: structs.ACLTypeClient, + Rules: rules, + }, + WriteRequest: structs.WriteRequest{Token: "root"}, + } + if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &req, &token); err != nil { + t.Fatalf("err: %v", err) + } + } + + // Now it should kick back for being an invalid config, which means it + // tried to do the operation. 
+ arg.Token = token + err = msgpackrpc.CallWithCodec(codec, "Operator.AutopilotGetConfiguration", &arg, &reply) + if err != nil { + t.Fatalf("err: %v", err) + } + if reply.DeadServerCleanup { + t.Fatalf("bad: %#v", reply) + } +} + +func TestOperator_Autopilot_SetConfiguration(t *testing.T) { + dir1, s1 := testServerWithConfig(t, func(c *Config) { + c.AutopilotConfig.DeadServerCleanup = false + }) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + codec := rpcClient(t, s1) + defer codec.Close() + + testutil.WaitForLeader(t, s1.RPC, "dc1") + + // Change the autopilot config from the default + arg := structs.AutopilotSetConfigRequest{ + Datacenter: "dc1", + Config: structs.AutopilotConfig{ + DeadServerCleanup: true, + }, + } + var reply struct{} + err := msgpackrpc.CallWithCodec(codec, "Operator.AutopilotSetConfiguration", &arg, &reply) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Make sure it's changed + state := s1.fsm.State() + config, err := state.AutopilotConfig() + if err != nil { + t.Fatal(err) + } + if !config.DeadServerCleanup { + t.Fatalf("bad: %#v", config) + } +} + +func TestOperator_Autopilot_SetConfiguration_ACLDeny(t *testing.T) { + dir1, s1 := testServerWithConfig(t, func(c *Config) { + c.ACLDatacenter = "dc1" + c.ACLMasterToken = "root" + c.ACLDefaultPolicy = "deny" + c.AutopilotConfig.DeadServerCleanup = false + }) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + codec := rpcClient(t, s1) + defer codec.Close() + + testutil.WaitForLeader(t, s1.RPC, "dc1") + + // Change the autopilot config from the default + arg := structs.AutopilotSetConfigRequest{ + Datacenter: "dc1", + Config: structs.AutopilotConfig{ + DeadServerCleanup: true, + }, + } + var reply struct{} + err := msgpackrpc.CallWithCodec(codec, "Operator.AutopilotSetConfiguration", &arg, &reply) + if err == nil || !strings.Contains(err.Error(), permissionDenied) { + t.Fatalf("err: %v", err) + } + + // Create an ACL with operator write permissions. 
+ var token string + { + var rules = ` + operator = "write" + ` + + req := structs.ACLRequest{ + Datacenter: "dc1", + Op: structs.ACLSet, + ACL: structs.ACL{ + Name: "User token", + Type: structs.ACLTypeClient, + Rules: rules, + }, + WriteRequest: structs.WriteRequest{Token: "root"}, + } + if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &req, &token); err != nil { + t.Fatalf("err: %v", err) + } + } + + // Now it should kick back for being an invalid config, which means it + // tried to do the operation. + arg.Token = token + err = msgpackrpc.CallWithCodec(codec, "Operator.AutopilotSetConfiguration", &arg, &reply) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Make sure it's changed + state := s1.fsm.State() + config, err := state.AutopilotConfig() + if err != nil { + t.Fatal(err) + } + if !config.DeadServerCleanup { + t.Fatalf("bad: %#v", config) + } +} \ No newline at end of file diff --git a/consul/serf.go b/consul/serf.go index a9175c04f1..c9e5044720 100644 --- a/consul/serf.go +++ b/consul/serf.go @@ -289,7 +289,7 @@ func (s *Server) maybeBootstrap() { addr := server.Addr.String() addrs = append(addrs, addr) var id raft.ServerID - if server.ID != "" && minRaftVersion >= 3 { + if minRaftVersion >= 3 { id = raft.ServerID(server.ID) } else { id = raft.ServerID(addr) diff --git a/consul/server_test.go b/consul/server_test.go index 72bf3bf6f4..cbb100d934 100644 --- a/consul/server_test.go +++ b/consul/server_test.go @@ -12,8 +12,8 @@ import ( "time" "github.com/hashicorp/consul/testutil" - "github.com/hashicorp/go-uuid" "github.com/hashicorp/consul/types" + "github.com/hashicorp/go-uuid" ) var nextPort int32 = 15000 diff --git a/consul/state/autopilot.go b/consul/state/autopilot.go new file mode 100644 index 0000000000..89c1953e37 --- /dev/null +++ b/consul/state/autopilot.go @@ -0,0 +1,39 @@ +package state + +import ( + "fmt" + + "github.com/hashicorp/consul/consul/structs" +) + +// AutopilotConfig is used to get the current Autopilot configuration. 
+func (s *StateStore) AutopilotConfig() (*structs.AutopilotConfig, error) { + tx := s.db.Txn(false) + defer tx.Abort() + + // Get the autopilot config + c, err := tx.First("autopilot-config", "id") + if err != nil { + return nil, fmt.Errorf("failed autopilot config lookup: %s", err) + } + + config, ok := c.(*structs.AutopilotConfig) + if !ok { + return nil, nil + } + + return config, nil +} + +// AutopilotConfig is used to set the current Autopilot configuration. +func (s *StateStore) UpdateAutopilotConfig(config *structs.AutopilotConfig) error { + tx := s.db.Txn(true) + defer tx.Abort() + + if err := tx.Insert("autopilot-config", config); err != nil { + return fmt.Errorf("failed updating autopilot config: %s", err) + } + + tx.Commit() + return nil +} diff --git a/consul/state/autopilot_test.go b/consul/state/autopilot_test.go new file mode 100644 index 0000000000..ab12863aef --- /dev/null +++ b/consul/state/autopilot_test.go @@ -0,0 +1,31 @@ +package state + +import ( + "reflect" + "testing" + + "github.com/hashicorp/consul/consul/structs" +) + +func TestStateStore_Autopilot(t *testing.T) { + s := testStateStore(t) + + expected := &structs.AutopilotConfig{ + DeadServerCleanup: true, + } + + if err := s.UpdateAutopilotConfig(expected); err != nil { + t.Fatal(err) + } + + idx, config, err := s.AutopilotConfig() + if err != nil { + t.Fatal(err) + } + if idx != 0 { + t.Fatalf("bad: %d", idx) + } + if !reflect.DeepEqual(expected, config) { + t.Fatalf("bad: %#v, %#v", expected, config) + } +} diff --git a/consul/state/schema.go b/consul/state/schema.go index 9b93e2b5fb..7a9966506b 100644 --- a/consul/state/schema.go +++ b/consul/state/schema.go @@ -31,6 +31,7 @@ func stateStoreSchema() *memdb.DBSchema { aclsTableSchema, coordinatesTableSchema, preparedQueriesTableSchema, + autopilotConfigTableSchema, } // Add the tables to the root schema @@ -440,3 +441,21 @@ func preparedQueriesTableSchema() *memdb.TableSchema { }, } } + +// autopilotConfigTableSchema returns a new 
table schema used for storing +// the autopilot configuration +func autopilotConfigTableSchema() *memdb.TableSchema { + return &memdb.TableSchema{ + Name: "autopilot-config", + Indexes: map[string]*memdb.IndexSchema{ + "id": &memdb.IndexSchema{ + Name: "id", + AllowMissing: true, + Unique: true, + Indexer: &memdb.ConditionalIndex{ + Conditional: func(obj interface{}) (bool, error) { return true, nil }, + }, + }, + }, + } +} diff --git a/consul/structs/operator.go b/consul/structs/operator.go index d564400bf9..592f747de4 100644 --- a/consul/structs/operator.go +++ b/consul/structs/operator.go @@ -4,6 +4,12 @@ import ( "github.com/hashicorp/raft" ) +type AutopilotConfig struct { + // DeadServerCleanup controls whether to remove dead servers when a new + // server is added to the Raft peers + DeadServerCleanup bool +} + // RaftServer has information about a server in the Raft configuration. type RaftServer struct { // ID is the unique ID for the server. These are currently the same @@ -55,3 +61,21 @@ type RaftPeerByAddressRequest struct { func (op *RaftPeerByAddressRequest) RequestDatacenter() string { return op.Datacenter } + +// AutopilotSetConfigRequest is used by the Operator endpoint to update the +// current Autopilot configuration of the cluster. +type AutopilotSetConfigRequest struct { + // Datacenter is the target this request is intended for. + Datacenter string + + // Config is the new Autopilot configuration to use. + Config AutopilotConfig + + // WriteRequest holds the ACL token to go along with this request. + WriteRequest +} + +// RequestDatacenter returns the datacenter for a given request. 
+func (op *AutopilotSetConfigRequest) RequestDatacenter() string { + return op.Datacenter +} From 1c24c5d8d6816cd400395f7edd7a8fa7d0bbff9f Mon Sep 17 00:00:00 2001 From: Kyle Havlovitz Date: Thu, 23 Feb 2017 21:00:15 -0800 Subject: [PATCH 06/11] Add docs and api client methods for autopilot config --- api/operator.go | 37 ++++++++++++ api/operator_test.go | 28 +++++++++ .../docs/agent/http/operator.html.markdown | 57 +++++++++++++++++++ 3 files changed, 122 insertions(+) diff --git a/api/operator.go b/api/operator.go index a8d04a38eb..8f0aa661d3 100644 --- a/api/operator.go +++ b/api/operator.go @@ -63,6 +63,13 @@ type KeyringResponse struct { NumNodes int } +// AutopilotConfiguration is used for querying/setting the Autopilot configuration +type AutopilotConfiguration struct { + // DeadServerCleanup controls whether to remove dead servers from the Raft peer list + // when a new server joins + DeadServerCleanup bool +} + // RaftGetConfiguration is used to query the current Raft peer set. func (op *Operator) RaftGetConfiguration(q *QueryOptions) (*RaftConfiguration, error) { r := op.c.newRequest("GET", "/v1/operator/raft/configuration") @@ -161,3 +168,33 @@ func (op *Operator) KeyringUse(key string, q *WriteOptions) error { resp.Body.Close() return nil } + +// RaftGetConfiguration is used to query the current Raft peer set. +func (op *Operator) AutopilotGetConfiguration(q *QueryOptions) (*AutopilotConfiguration, error) { + r := op.c.newRequest("GET", "/v1/operator/autopilot/configuration") + r.setQueryOptions(q) + _, resp, err := requireOK(op.c.doRequest(r)) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + var out AutopilotConfiguration + if err := decodeBody(resp, &out); err != nil { + return nil, err + } + return &out, nil +} + +// RaftGetConfiguration is used to query the current Raft peer set. 
+func (op *Operator) AutopilotSetConfiguration(conf *AutopilotConfiguration, q *WriteOptions) error { + r := op.c.newRequest("PUT", "/v1/operator/autopilot/configuration") + r.setWriteOptions(q) + r.obj = conf + _, resp, err := requireOK(op.c.doRequest(r)) + if err != nil { + return err + } + resp.Body.Close() + return nil +} \ No newline at end of file diff --git a/api/operator_test.go b/api/operator_test.go index e55495e2e9..59b05545a9 100644 --- a/api/operator_test.go +++ b/api/operator_test.go @@ -104,3 +104,31 @@ func TestOperator_KeyringInstallListPutRemove(t *testing.T) { } } } + +func TestOperator_AutopilotGetSetConfiguration(t *testing.T) { + t.Parallel() + c, s := makeClient(t) + defer s.Stop() + + operator := c.Operator() + config, err := operator.AutopilotGetConfiguration(nil) + if err != nil { + t.Fatalf("err: %v", err) + } + if !config.DeadServerCleanup { + t.Fatalf("bad: %v", config) + } + + // Change a config setting + if err := operator.AutopilotSetConfiguration(&AutopilotConfiguration{false}, nil); err != nil { + t.Fatalf("err: %v", err) + } + + config, err = operator.AutopilotGetConfiguration(nil) + if err != nil { + t.Fatalf("err: %v", err) + } + if config.DeadServerCleanup { + t.Fatalf("bad: %v", config) + } +} \ No newline at end of file diff --git a/website/source/docs/agent/http/operator.html.markdown b/website/source/docs/agent/http/operator.html.markdown index 7b6bdd52e9..a75b1d08d4 100644 --- a/website/source/docs/agent/http/operator.html.markdown +++ b/website/source/docs/agent/http/operator.html.markdown @@ -28,6 +28,7 @@ The following endpoints are supported: * [`/v1/operator/raft/configuration`](#raft-configuration): Inspects the Raft configuration * [`/v1/operator/raft/peer`](#raft-peer): Operates on Raft peers * [`/v1/operator/keyring`](#keyring): Operates on gossip keyring +* [`/v1/operator/autopilot/configuration`](#autopilot-configuration): Operates on the Autopilot configuration Not all endpoints support blocking queries and all 
consistency modes, see details in the sections below. @@ -258,3 +259,59 @@ If ACLs are enabled, the client will need to supply an ACL Token with [`keyring`](/docs/internals/acl.html#keyring) write privileges. The return code will indicate success or failure. + +### /v1/operator/autopilot/configuration + +Available in Consul 0.8.0 and later, the autopilot configuration endpoint supports the +`GET` and `PUT` methods. + +This endpoint supports the use of ACL tokens using either the `X-CONSUL-TOKEN` +header or the `?token=` query parameter. + +By default, the datacenter of the agent is queried; however, the `dc` can be +provided using the `?dc=` query parameter. + +#### GET Method + +When using the `GET` method, the request will be forwarded to the cluster +leader to retrieve its latest Autopilot configuration. + +If the cluster doesn't currently have a leader an error will be returned. You +can use the `?stale` query parameter to read the Autopilot configuration from any +of the Consul servers. + +If ACLs are enabled, the client will need to supply an ACL Token with +[`operator`](/docs/internals/acl.html#operator) read privileges. + +A JSON body is returned that looks like this: + +```javascript +{ + "DeadServerCleanup": true +} +``` + +`DeadServerCleanup` is whether dead servers should be removed automatically when +a new server is added to the cluster. + +#### PUT Method + +Using the `PUT` method, this endpoint will update the Autopilot configuration +of the cluster. + +If ACLs are enabled, the client will need to supply an ACL Token with +[`operator`](/docs/internals/acl.html#operator) write privileges. + +The `PUT` method expects a JSON request body to be submitted. The request +body must look like: + +```javascript +{ + "DeadServerCleanup": true +} +``` + +`DeadServerCleanup` is whether dead servers should be removed automatically when +a new server is added to the cluster. + +The return code will indicate success or failure. 
From c2e7f450020019809bb30306a39538ad7bc90379 Mon Sep 17 00:00:00 2001 From: Kyle Havlovitz Date: Fri, 24 Feb 2017 13:08:49 -0800 Subject: [PATCH 07/11] Add CAS capability to autopilot config endpoint --- api/operator.go | 46 ++++++++++++-- api/operator_test.go | 48 ++++++++++++++- command/agent/operator_endpoint.go | 24 +++++++- command/agent/operator_endpoint_test.go | 82 ++++++++++++++++++++++++- consul/fsm.go | 21 +++++++ consul/leader.go | 11 ++-- consul/operator_endpoint.go | 20 ++++-- consul/operator_endpoint_test.go | 2 +- consul/state/autopilot.go | 65 +++++++++++++++++--- consul/state/autopilot_test.go | 59 +++++++++++++++++- consul/structs/operator.go | 6 ++ consul/structs/structs.go | 1 + 12 files changed, 354 insertions(+), 31 deletions(-) diff --git a/api/operator.go b/api/operator.go index 8f0aa661d3..edfeeaec25 100644 --- a/api/operator.go +++ b/api/operator.go @@ -1,5 +1,13 @@ package api +import ( + "bytes" + "fmt" + "io" + "strconv" + "strings" +) + // Operator can be used to perform low-level operator tasks for Consul. type Operator struct { c *Client @@ -65,9 +73,16 @@ type KeyringResponse struct { // AutopilotConfiguration is used for querying/setting the Autopilot configuration type AutopilotConfiguration struct { - // DeadServerCleanup controls whether to remove dead servers from the Raft peer list - // when a new server joins + // DeadServerCleanup controls whether to remove dead servers from the Raft + // peer list when a new server joins DeadServerCleanup bool + + // CreateIndex holds the index corresponding to the creation of this configuration. + // This is a read-only field. + CreateIndex uint64 + + // ModifyIndex is used for doing a Check-And-Set update operation. + ModifyIndex uint64 } // RaftGetConfiguration is used to query the current Raft peer set. @@ -169,7 +184,7 @@ func (op *Operator) KeyringUse(key string, q *WriteOptions) error { return nil } -// RaftGetConfiguration is used to query the current Raft peer set. 
+// AutopilotGetConfiguration is used to query the current Autopilot configuration. func (op *Operator) AutopilotGetConfiguration(q *QueryOptions) (*AutopilotConfiguration, error) { r := op.c.newRequest("GET", "/v1/operator/autopilot/configuration") r.setQueryOptions(q) @@ -186,7 +201,7 @@ func (op *Operator) AutopilotGetConfiguration(q *QueryOptions) (*AutopilotConfig return &out, nil } -// RaftGetConfiguration is used to query the current Raft peer set. +// AutopilotSetConfiguration is used to set the current Autopilot configuration. func (op *Operator) AutopilotSetConfiguration(conf *AutopilotConfiguration, q *WriteOptions) error { r := op.c.newRequest("PUT", "/v1/operator/autopilot/configuration") r.setWriteOptions(q) @@ -197,4 +212,27 @@ func (op *Operator) AutopilotSetConfiguration(conf *AutopilotConfiguration, q *W } resp.Body.Close() return nil +} + +// AutopilotCASConfiguration is used to perform a Check-And-Set update on the +// Autopilot configuration. The ModifyIndex value will be respected. Returns +// true on success or false on failures. 
+func (op *Operator) AutopilotCASConfiguration(conf *AutopilotConfiguration, q *WriteOptions) (bool, error) { + r := op.c.newRequest("PUT", "/v1/operator/autopilot/configuration") + r.setWriteOptions(q) + r.params.Set("cas", strconv.FormatUint(conf.ModifyIndex, 10)) + r.obj = conf + _, resp, err := requireOK(op.c.doRequest(r)) + if err != nil { + return false, err + } + defer resp.Body.Close() + + var buf bytes.Buffer + if _, err := io.Copy(&buf, resp.Body); err != nil { + return false, fmt.Errorf("Failed to read response: %v", err) + } + res := strings.Contains(string(buf.Bytes()), "true") + + return res, nil } \ No newline at end of file diff --git a/api/operator_test.go b/api/operator_test.go index 59b05545a9..efd75db3a2 100644 --- a/api/operator_test.go +++ b/api/operator_test.go @@ -120,7 +120,8 @@ func TestOperator_AutopilotGetSetConfiguration(t *testing.T) { } // Change a config setting - if err := operator.AutopilotSetConfiguration(&AutopilotConfiguration{false}, nil); err != nil { + newConf := &AutopilotConfiguration{DeadServerCleanup: false} + if err := operator.AutopilotSetConfiguration(newConf, nil); err != nil { t.Fatalf("err: %v", err) } @@ -131,4 +132,49 @@ func TestOperator_AutopilotGetSetConfiguration(t *testing.T) { if config.DeadServerCleanup { t.Fatalf("bad: %v", config) } +} + +func TestOperator_AutopilotCASConfiguration(t *testing.T) { + t.Parallel() + c, s := makeClient(t) + defer s.Stop() + + operator := c.Operator() + config, err := operator.AutopilotGetConfiguration(nil) + if err != nil { + t.Fatalf("err: %v", err) + } + if !config.DeadServerCleanup { + t.Fatalf("bad: %v", config) + } + + // Pass an invalid ModifyIndex + { + newConf := &AutopilotConfiguration{ + DeadServerCleanup: false, + ModifyIndex: config.ModifyIndex - 1, + } + resp, err := operator.AutopilotCASConfiguration(newConf, nil) + if err != nil { + t.Fatalf("err: %v", err) + } + if resp { + t.Fatalf("bad: %v", resp) + } + } + + // Pass a valid ModifyIndex + { + newConf := 
&AutopilotConfiguration{ + DeadServerCleanup: false, + ModifyIndex: config.ModifyIndex, + } + resp, err := operator.AutopilotCASConfiguration(newConf, nil) + if err != nil { + t.Fatalf("err: %v", err) + } + if !resp { + t.Fatalf("bad: %v", resp) + } + } } \ No newline at end of file diff --git a/command/agent/operator_endpoint.go b/command/agent/operator_endpoint.go index 7509b54b82..d9cb4de639 100644 --- a/command/agent/operator_endpoint.go +++ b/command/agent/operator_endpoint.go @@ -189,18 +189,36 @@ func (s *HTTPServer) OperatorAutopilotConfiguration(resp http.ResponseWriter, re s.parseDC(req, &args.Datacenter) s.parseToken(req, &args.Token) + // Check for cas value + params := req.URL.Query() + if _, ok := params["cas"]; ok { + casVal, err := strconv.ParseUint(params.Get("cas"), 10, 64) + if err != nil { + resp.WriteHeader(400) + resp.Write([]byte(fmt.Sprintf("Error parsing cas value: %v", err))) + return nil, nil + } + args.Config.ModifyIndex = casVal + args.CAS = true + } + if err := decodeBody(req, &args.Config, nil); err != nil { resp.WriteHeader(400) - resp.Write([]byte(fmt.Sprintf("Error parsing relay factor: %v", err))) + resp.Write([]byte(fmt.Sprintf("Error parsing autopilot config: %v", err))) return nil, nil } - var reply struct{} + var reply bool if err := s.agent.RPC("Operator.AutopilotSetConfiguration", &args, &reply); err != nil { return nil, err } - return nil, nil + // Only use the out value if this was a CAS + if !args.CAS { + return true, nil + } else { + return reply, nil + } default: resp.WriteHeader(http.StatusMethodNotAllowed) return nil, nil diff --git a/command/agent/operator_endpoint_test.go b/command/agent/operator_endpoint_test.go index bedcba0fbd..081eb6ab73 100644 --- a/command/agent/operator_endpoint_test.go +++ b/command/agent/operator_endpoint_test.go @@ -336,9 +336,87 @@ func TestOperator_AutopilotSetConfiguration(t *testing.T) { if err := srv.agent.RPC("Operator.AutopilotGetConfiguration", &args, &reply); err != nil { 
t.Fatalf("err: %v", err) } - if reply.DeadServerCleanup { t.Fatalf("bad: %#v", reply) } }) -} \ No newline at end of file +} + +func TestOperator_AutopilotCASConfiguration(t *testing.T) { + httpTest(t, func(srv *HTTPServer) { + body := bytes.NewBuffer([]byte(`{"DeadServerCleanup": false}`)) + req, err := http.NewRequest("PUT", "/v1/operator/autopilot/configuration", body) + if err != nil { + t.Fatalf("err: %v", err) + } + + resp := httptest.NewRecorder() + if _, err = srv.OperatorAutopilotConfiguration(resp, req); err != nil { + t.Fatalf("err: %v", err) + } + if resp.Code != 200 { + t.Fatalf("bad code: %d", resp.Code) + } + + args := structs.DCSpecificRequest{ + Datacenter: "dc1", + } + + var reply structs.AutopilotConfig + if err := srv.agent.RPC("Operator.AutopilotGetConfiguration", &args, &reply); err != nil { + t.Fatalf("err: %v", err) + } + + if reply.DeadServerCleanup { + t.Fatalf("bad: %#v", reply) + } + + // Create a CAS request, bad index + { + buf := bytes.NewBuffer([]byte(`{"DeadServerCleanup": true}`)) + req, err := http.NewRequest("PUT", + fmt.Sprintf("/v1/operator/autopilot/configuration?cas=%d", reply.ModifyIndex-1), buf) + if err != nil { + t.Fatalf("err: %v", err) + } + + resp := httptest.NewRecorder() + obj, err := srv.OperatorAutopilotConfiguration(resp, req) + if err != nil { + t.Fatalf("err: %v", err) + } + + if res := obj.(bool); res { + t.Fatalf("should NOT work") + } + } + + // Create a CAS request, good index + { + buf := bytes.NewBuffer([]byte(`{"DeadServerCleanup": true}`)) + req, err := http.NewRequest("PUT", + fmt.Sprintf("/v1/operator/autopilot/configuration?cas=%d", reply.ModifyIndex), buf) + if err != nil { + t.Fatalf("err: %v", err) + } + + resp := httptest.NewRecorder() + obj, err := srv.OperatorAutopilotConfiguration(resp, req) + if err != nil { + t.Fatalf("err: %v", err) + } + + if res := obj.(bool); !res { + t.Fatalf("should work") + } + } + + // Verify the update + if err := srv.agent.RPC("Operator.AutopilotGetConfiguration", 
&args, &reply); err != nil { + t.Fatalf("err: %v", err) + } + if !reply.DeadServerCleanup { + t.Fatalf("bad: %#v", reply) + } + }) +} diff --git a/consul/fsm.go b/consul/fsm.go index 41fc87acdc..b625148132 100644 --- a/consul/fsm.go +++ b/consul/fsm.go @@ -105,6 +105,8 @@ func (c *consulFSM) Apply(log *raft.Log) interface{} { return c.applyPreparedQueryOperation(buf[1:], log.Index) case structs.TxnRequestType: return c.applyTxn(buf[1:], log.Index) + case structs.AutopilotRequestType: + return c.applyAutopilotUpdate(buf[1:], log.Index) default: if ignoreUnknown { c.logger.Printf("[WARN] consul.fsm: ignoring unknown message type (%d), upgrade to newer version", msgType) @@ -310,6 +312,25 @@ func (c *consulFSM) applyTxn(buf []byte, index uint64) interface{} { return structs.TxnResponse{results, errors} } +func (c *consulFSM) applyAutopilotUpdate(buf []byte, index uint64) interface{} { + var req structs.AutopilotSetConfigRequest + if err := structs.Decode(buf, &req); err != nil { + panic(fmt.Errorf("failed to decode request: %v", err)) + } + defer metrics.MeasureSince([]string{"consul", "fsm", "autopilot"}, time.Now()) + + if req.CAS { + act, err := c.state.AutopilotCASConfig(index, req.Config.ModifyIndex, &req.Config) + if err != nil { + return err + } else { + return act + } + } else { + return c.state.AutopilotSetConfig(index, &req.Config) + } +} + func (c *consulFSM) Snapshot() (raft.FSMSnapshot, error) { defer func(start time.Time) { c.logger.Printf("[INFO] consul.fsm: snapshot created in %v", time.Now().Sub(start)) diff --git a/consul/leader.go b/consul/leader.go index be9d8bcd4d..9482f774db 100644 --- a/consul/leader.go +++ b/consul/leader.go @@ -249,7 +249,7 @@ func (s *Server) initializeACL() error { func (s *Server) initializeAutopilot() error { // Bail if the config has already been initialized state := s.fsm.State() - config, err := state.AutopilotConfig() + _, config, err := state.AutopilotConfig() if err != nil { return fmt.Errorf("failed to get autopilot 
config: %v", err) } @@ -257,8 +257,11 @@ func (s *Server) initializeAutopilot() error { return nil } - if err := state.UpdateAutopilotConfig(s.config.AutopilotConfig); err != nil { - return err + req := structs.AutopilotSetConfigRequest{ + Config: *s.config.AutopilotConfig, + } + if _, err = s.raftApply(structs.AutopilotRequestType, req); err != nil { + return fmt.Errorf("failed to initialize autopilot config") } return nil @@ -609,7 +612,7 @@ func (s *Server) joinConsulServer(m serf.Member, parts *agent.Server) error { } state := s.fsm.State() - autopilotConf, err := state.AutopilotConfig() + _, autopilotConf, err := state.AutopilotConfig() if err != nil { return err } diff --git a/consul/operator_endpoint.go b/consul/operator_endpoint.go index 5820e6620d..a130e6b2ba 100644 --- a/consul/operator_endpoint.go +++ b/consul/operator_endpoint.go @@ -144,7 +144,7 @@ func (op *Operator) AutopilotGetConfiguration(args *structs.DCSpecificRequest, r // We can't fetch the leader and the configuration atomically with // the current Raft API. state := op.srv.fsm.State() - config, err := state.AutopilotConfig() + _, config, err := state.AutopilotConfig() if err != nil { return err } @@ -154,8 +154,8 @@ func (op *Operator) AutopilotGetConfiguration(args *structs.DCSpecificRequest, r return nil } -// AutopilotGetConfiguration is used to set the current Autopilot configuration. -func (op *Operator) AutopilotSetConfiguration(args *structs.AutopilotSetConfigRequest, reply *struct{}) error { +// AutopilotSetConfiguration is used to set the current Autopilot configuration. 
+func (op *Operator) AutopilotSetConfiguration(args *structs.AutopilotSetConfigRequest, reply *bool) error { if done, err := op.srv.forward("Operator.AutopilotSetConfiguration", args, args, reply); done { return err } @@ -169,11 +169,19 @@ func (op *Operator) AutopilotSetConfiguration(args *structs.AutopilotSetConfigRe return permissionDeniedErr } - // Update the autopilot config - state := op.srv.fsm.State() - if err := state.UpdateAutopilotConfig(&args.Config); err != nil { + // Apply the update + resp, err := op.srv.raftApply(structs.AutopilotRequestType, args) + if err != nil { + op.srv.logger.Printf("[ERR] consul.operator: Apply failed: %v", err) return err } + if respErr, ok := resp.(error); ok { + return respErr + } + // Check if the return type is a bool. + if respBool, ok := resp.(bool); ok { + *reply = respBool + } return nil } diff --git a/consul/operator_endpoint_test.go b/consul/operator_endpoint_test.go index 41fb6d9656..27cc54ea5e 100644 --- a/consul/operator_endpoint_test.go +++ b/consul/operator_endpoint_test.go @@ -428,4 +428,4 @@ func TestOperator_Autopilot_SetConfiguration_ACLDeny(t *testing.T) { if !config.DeadServerCleanup { t.Fatalf("bad: %#v", config) } -} \ No newline at end of file +} diff --git a/consul/state/autopilot.go b/consul/state/autopilot.go index 89c1953e37..3d8ef5b53d 100644 --- a/consul/state/autopilot.go +++ b/consul/state/autopilot.go @@ -4,36 +4,83 @@ import ( "fmt" "github.com/hashicorp/consul/consul/structs" + "github.com/hashicorp/go-memdb" ) // AutopilotConfig is used to get the current Autopilot configuration. 
-func (s *StateStore) AutopilotConfig() (*structs.AutopilotConfig, error) { +func (s *StateStore) AutopilotConfig() (uint64, *structs.AutopilotConfig, error) { tx := s.db.Txn(false) defer tx.Abort() // Get the autopilot config c, err := tx.First("autopilot-config", "id") if err != nil { - return nil, fmt.Errorf("failed autopilot config lookup: %s", err) + return 0, nil, fmt.Errorf("failed autopilot config lookup: %s", err) } config, ok := c.(*structs.AutopilotConfig) if !ok { - return nil, nil + return 0, nil, nil } - return config, nil + return config.ModifyIndex, config, nil } -// AutopilotConfig is used to set the current Autopilot configuration. -func (s *StateStore) UpdateAutopilotConfig(config *structs.AutopilotConfig) error { +// AutopilotSetConfig is used to set the current Autopilot configuration. +func (s *StateStore) AutopilotSetConfig(idx uint64, config *structs.AutopilotConfig) error { tx := s.db.Txn(true) defer tx.Abort() - if err := tx.Insert("autopilot-config", config); err != nil { - return fmt.Errorf("failed updating autopilot config: %s", err) - } + s.autopilotSetConfigTxn(idx, tx, config) tx.Commit() return nil } + +// AutopilotCASConfig is used to try updating the Autopilot configuration with a +// given Raft index. If the CAS index specified is not equal to the last observed index +// for the config, then the call is a noop, +func (s *StateStore) AutopilotCASConfig(idx, cidx uint64, config *structs.AutopilotConfig) (bool, error) { + tx := s.db.Txn(true) + defer tx.Abort() + + // Check for an existing config + existing, err := tx.First("autopilot-config", "id") + if err != nil { + return false, fmt.Errorf("failed autopilot config lookup: %s", err) + } + + // If the existing index does not match the provided CAS + // index arg, then we shouldn't update anything and can safely + // return early here. 
+ e, ok := existing.(*structs.AutopilotConfig) + if !ok || e.ModifyIndex != cidx { + return false, nil + } + + s.autopilotSetConfigTxn(idx, tx, config) + + tx.Commit() + return true, nil +} + +func (s *StateStore) autopilotSetConfigTxn(idx uint64, tx *memdb.Txn, config *structs.AutopilotConfig) error { + // Check for an existing config + existing, err := tx.First("autopilot-config", "id") + if err != nil { + return fmt.Errorf("failed autopilot config lookup: %s", err) + } + + // Set the indexes. + if existing != nil { + config.CreateIndex = existing.(*structs.AutopilotConfig).CreateIndex + } else { + config.CreateIndex = idx + } + config.ModifyIndex = idx + + if err := tx.Insert("autopilot-config", config); err != nil { + return fmt.Errorf("failed updating autopilot config: %s", err) + } + return nil +} diff --git a/consul/state/autopilot_test.go b/consul/state/autopilot_test.go index ab12863aef..64b2895d06 100644 --- a/consul/state/autopilot_test.go +++ b/consul/state/autopilot_test.go @@ -14,7 +14,7 @@ func TestStateStore_Autopilot(t *testing.T) { DeadServerCleanup: true, } - if err := s.UpdateAutopilotConfig(expected); err != nil { + if err := s.AutopilotSetConfig(0, expected); err != nil { t.Fatal(err) } @@ -29,3 +29,60 @@ func TestStateStore_Autopilot(t *testing.T) { t.Fatalf("bad: %#v, %#v", expected, config) } } + +func TestStateStore_AutopilotCAS(t *testing.T) { + s := testStateStore(t) + + expected := &structs.AutopilotConfig{ + DeadServerCleanup: true, + } + + if err := s.AutopilotSetConfig(0, expected); err != nil { + t.Fatal(err) + } + if err := s.AutopilotSetConfig(1, expected); err != nil { + t.Fatal(err) + } + + // Do a CAS with an index lower than the entry + ok, err := s.AutopilotCASConfig(2, 0, &structs.AutopilotConfig{ + DeadServerCleanup: false, + }) + if ok || err != nil { + t.Fatalf("expected (false, nil), got: (%v, %#v)", ok, err) + } + + // Check that the index is untouched and the entry + // has not been deleted. 
+ idx, config, err := s.AutopilotConfig() + if err != nil { + t.Fatal(err) + } + if idx != 1 { + t.Fatalf("bad: %d", idx) + } + if !config.DeadServerCleanup { + t.Fatalf("bad: %#v", config) + } + + // Do another CAS, this time with the correct index + ok, err = s.AutopilotCASConfig(2, 1, &structs.AutopilotConfig{ + DeadServerCleanup: false, + }) + if !ok || err != nil { + t.Fatalf("expected (true, nil), got: (%v, %#v)", ok, err) + } + + // Check that the index is untouched and the entry + // has not been deleted. + idx, config, err = s.AutopilotConfig() + if err != nil { + t.Fatal(err) + } + if idx != 2 { + t.Fatalf("bad: %d", idx) + } + if config.DeadServerCleanup { + t.Fatalf("bad: %#v", config) + } +} diff --git a/consul/structs/operator.go b/consul/structs/operator.go index 592f747de4..bb42ba7a96 100644 --- a/consul/structs/operator.go +++ b/consul/structs/operator.go @@ -8,6 +8,9 @@ type AutopilotConfig struct { // DeadServerCleanup controls whether to remove dead servers when a new // server is added to the Raft peers DeadServerCleanup bool + + // RaftIndex stores the create/modify indexes of this configuration + RaftIndex } // RaftServer has information about a server in the Raft configuration. @@ -71,6 +74,9 @@ type AutopilotSetConfigRequest struct { // Config is the new Autopilot configuration to use. Config AutopilotConfig + // CAS controls whether to use check-and-set semantics for this request. + CAS bool + // WriteRequest holds the ACL token to go along with this request. 
WriteRequest } diff --git a/consul/structs/structs.go b/consul/structs/structs.go index 13c67b3d55..353689d1a9 100644 --- a/consul/structs/structs.go +++ b/consul/structs/structs.go @@ -40,6 +40,7 @@ const ( CoordinateBatchUpdateType PreparedQueryRequestType TxnRequestType + AutopilotRequestType ) const ( From c1f776c78bce9d65bca717ca9f1feb0c0e96be07 Mon Sep 17 00:00:00 2001 From: Kyle Havlovitz Date: Fri, 24 Feb 2017 15:54:49 -0800 Subject: [PATCH 08/11] Added operator autopilot subcommands --- api/operator.go | 2 +- api/operator_test.go | 2 +- command/operator_autopilot.go | 32 ++++++++++ command/operator_autopilot_get.go | 61 ++++++++++++++++++ command/operator_autopilot_get_test.go | 37 +++++++++++ command/operator_autopilot_set.go | 85 ++++++++++++++++++++++++++ command/operator_autopilot_set_test.go | 50 +++++++++++++++ command/operator_autopilot_test.go | 11 ++++ commands.go | 27 ++++++++ 9 files changed, 305 insertions(+), 2 deletions(-) create mode 100644 command/operator_autopilot.go create mode 100644 command/operator_autopilot_get.go create mode 100644 command/operator_autopilot_get_test.go create mode 100644 command/operator_autopilot_set.go create mode 100644 command/operator_autopilot_set_test.go create mode 100644 command/operator_autopilot_test.go diff --git a/api/operator.go b/api/operator.go index edfeeaec25..9d977b3187 100644 --- a/api/operator.go +++ b/api/operator.go @@ -235,4 +235,4 @@ func (op *Operator) AutopilotCASConfiguration(conf *AutopilotConfiguration, q *W res := strings.Contains(string(buf.Bytes()), "true") return res, nil -} \ No newline at end of file +} diff --git a/api/operator_test.go b/api/operator_test.go index efd75db3a2..e5e1e122b2 100644 --- a/api/operator_test.go +++ b/api/operator_test.go @@ -177,4 +177,4 @@ func TestOperator_AutopilotCASConfiguration(t *testing.T) { t.Fatalf("bad: %v", resp) } } -} \ No newline at end of file +} diff --git a/command/operator_autopilot.go b/command/operator_autopilot.go new file mode 
100644 index 0000000000..c187fa6a4f --- /dev/null +++ b/command/operator_autopilot.go @@ -0,0 +1,32 @@ +package command + +import ( + "strings" + + "github.com/hashicorp/consul/command/base" + "github.com/mitchellh/cli" +) + +type OperatorAutopilotCommand struct { + base.Command +} + +func (c *OperatorAutopilotCommand) Help() string { + helpText := ` +Usage: consul operator autopilot [options] + +The Autopilot operator command is used to interact with Consul's Autopilot +subsystem. The command can be used to view or modify the current configuration. + +` + + return strings.TrimSpace(helpText) +} + +func (c *OperatorAutopilotCommand) Synopsis() string { + return "Provides tools for modifying Autopilot configuration" +} + +func (c *OperatorAutopilotCommand) Run(args []string) int { + return cli.RunResultHelp +} diff --git a/command/operator_autopilot_get.go b/command/operator_autopilot_get.go new file mode 100644 index 0000000000..ed714f8819 --- /dev/null +++ b/command/operator_autopilot_get.go @@ -0,0 +1,61 @@ +package command + +import ( + "flag" + "fmt" + "strings" + + "github.com/hashicorp/consul/api" + "github.com/hashicorp/consul/command/base" +) + +type OperatorAutopilotGetCommand struct { + base.Command +} + +func (c *OperatorAutopilotGetCommand) Help() string { + helpText := ` +Usage: consul operator autopilot get-config [options] + +Displays the current Autopilot configuration. + +` + c.Command.Help() + + return strings.TrimSpace(helpText) +} + +func (c *OperatorAutopilotGetCommand) Synopsis() string { + return "Display the current Autopilot configuration" +} + +func (c *OperatorAutopilotGetCommand) Run(args []string) int { + c.Command.NewFlagSet(c) + + if err := c.Command.Parse(args); err != nil { + if err == flag.ErrHelp { + return 0 + } + c.Ui.Error(fmt.Sprintf("Failed to parse args: %v", err)) + return 1 + } + + // Set up a client. 
+ client, err := c.Command.HTTPClient() + if err != nil { + c.Ui.Error(fmt.Sprintf("Error initializing client: %s", err)) + return 1 + } + + // Fetch the current configuration. + opts := &api.QueryOptions{ + AllowStale: c.Command.HTTPStale(), + } + config, err := client.Operator().AutopilotGetConfiguration(opts) + if err != nil { + c.Ui.Error(fmt.Sprintf("Error querying Autopilot configuration: %s", err)) + return 1 + } + c.Ui.Output(fmt.Sprintf("DeadServerCleanup = %v", config.DeadServerCleanup)) + + return 0 +} diff --git a/command/operator_autopilot_get_test.go b/command/operator_autopilot_get_test.go new file mode 100644 index 0000000000..74496d0f16 --- /dev/null +++ b/command/operator_autopilot_get_test.go @@ -0,0 +1,37 @@ +package command + +import ( + "strings" + "testing" + + "github.com/hashicorp/consul/command/base" + "github.com/mitchellh/cli" +) + +func TestOperator_Autopilot_Get_Implements(t *testing.T) { + var _ cli.Command = &OperatorAutopilotGetCommand{} +} + +func TestOperator_Autopilot_Get(t *testing.T) { + a1 := testAgent(t) + defer a1.Shutdown() + waitForLeader(t, a1.httpAddr) + + ui := new(cli.MockUi) + c := OperatorAutopilotGetCommand{ + Command: base.Command{ + Ui: ui, + Flags: base.FlagSetHTTP, + }, + } + args := []string{"-http-addr=" + a1.httpAddr} + + code := c.Run(args) + if code != 0 { + t.Fatalf("bad: %d. 
%#v", code, ui.ErrorWriter.String()) + } + output := strings.TrimSpace(ui.OutputWriter.String()) + if !strings.Contains(output, "DeadServerCleanup = true") { + t.Fatalf("bad: %s", output) + } +} diff --git a/command/operator_autopilot_set.go b/command/operator_autopilot_set.go new file mode 100644 index 0000000000..fda4b65cef --- /dev/null +++ b/command/operator_autopilot_set.go @@ -0,0 +1,85 @@ +package command + +import ( + "flag" + "fmt" + "strings" + + "github.com/hashicorp/consul/command/base" +) + +type OperatorAutopilotSetCommand struct { + base.Command +} + +func (c *OperatorAutopilotSetCommand) Help() string { + helpText := ` +Usage: consul operator autopilot set-config [options] + +Modifies the current Autopilot configuration. + +` + c.Command.Help() + + return strings.TrimSpace(helpText) +} + +func (c *OperatorAutopilotSetCommand) Synopsis() string { + return "Modify the current Autopilot configuration" +} + +func (c *OperatorAutopilotSetCommand) Run(args []string) int { + var deadServerCleanup string + + f := c.Command.NewFlagSet(c) + + f.StringVar(&deadServerCleanup, "dead-server-cleanup", "", + "Controls whether Consul will automatically remove dead servers "+ + "when new ones are successfully added. Must be one of `true|false`.") + + if err := c.Command.Parse(args); err != nil { + if err == flag.ErrHelp { + return 0 + } + c.Ui.Error(fmt.Sprintf("Failed to parse args: %v", err)) + return 1 + } + + // Set up a client. + client, err := c.Command.HTTPClient() + if err != nil { + c.Ui.Error(fmt.Sprintf("Error initializing client: %s", err)) + return 1 + } + + // Fetch the current configuration. 
+ operator := client.Operator() + conf, err := operator.AutopilotGetConfiguration(nil) + if err != nil { + c.Ui.Error(fmt.Sprintf("Error querying Autopilot configuration: %s", err)) + return 1 + } + + if deadServerCleanup != "" { + switch deadServerCleanup { + case "true": + conf.DeadServerCleanup = true + case "false": + conf.DeadServerCleanup = false + default: + c.Ui.Error(fmt.Sprintf("Invalid value for dead-server-cleanup: %q", deadServerCleanup)) + } + } + + result, err := operator.AutopilotCASConfiguration(conf, nil) + if err != nil { + c.Ui.Error(fmt.Sprintf("Error setting Autopilot configuration: %s", err)) + return 1 + } + if result { + c.Ui.Output("Configuration updated") + } else { + c.Ui.Output("Configuration could not be atomically updated") + } + + return 0 +} diff --git a/command/operator_autopilot_set_test.go b/command/operator_autopilot_set_test.go new file mode 100644 index 0000000000..e9da49e417 --- /dev/null +++ b/command/operator_autopilot_set_test.go @@ -0,0 +1,50 @@ +package command + +import ( + "strings" + "testing" + + "github.com/hashicorp/consul/command/base" + "github.com/hashicorp/consul/consul/structs" + "github.com/mitchellh/cli" +) + +func TestOperator_Autopilot_Set_Implements(t *testing.T) { + var _ cli.Command = &OperatorAutopilotSetCommand{} +} + +func TestOperator_Autopilot_Set(t *testing.T) { + a1 := testAgent(t) + defer a1.Shutdown() + waitForLeader(t, a1.httpAddr) + + ui := new(cli.MockUi) + c := OperatorAutopilotSetCommand{ + Command: base.Command{ + Ui: ui, + Flags: base.FlagSetHTTP, + }, + } + args := []string{"-http-addr=" + a1.httpAddr, "-dead-server-cleanup=false"} + + code := c.Run(args) + if code != 0 { + t.Fatalf("bad: %d. 
%#v", code, ui.ErrorWriter.String()) + } + output := strings.TrimSpace(ui.OutputWriter.String()) + if !strings.Contains(output, "Configuration updated") { + t.Fatalf("bad: %s", output) + } + + req := structs.DCSpecificRequest{ + Datacenter: "dc1", + } + var reply structs.AutopilotConfig + if err := a1.agent.RPC("Operator.AutopilotGetConfiguration", &req, &reply); err != nil { + t.Fatalf("err: %v", err) + } + + if reply.DeadServerCleanup { + t.Fatalf("bad: %#v", reply) + } +} diff --git a/command/operator_autopilot_test.go b/command/operator_autopilot_test.go new file mode 100644 index 0000000000..6f4adb4a2b --- /dev/null +++ b/command/operator_autopilot_test.go @@ -0,0 +1,11 @@ +package command + +import ( + "testing" + + "github.com/mitchellh/cli" +) + +func TestOperator_Autopilot_Implements(t *testing.T) { + var _ cli.Command = &OperatorAutopilotCommand{} +} diff --git a/commands.go b/commands.go index a9ed849b7b..8e061ca3d0 100644 --- a/commands.go +++ b/commands.go @@ -213,6 +213,33 @@ func init() { }, nil }, + "operator autopilot": func() (cli.Command, error) { + return &command.OperatorAutopilotCommand{ + Command: base.Command{ + Flags: base.FlagSetNone, + Ui: ui, + }, + }, nil + }, + + "operator autopilot get-config": func() (cli.Command, error) { + return &command.OperatorAutopilotGetCommand{ + Command: base.Command{ + Flags: base.FlagSetHTTP, + Ui: ui, + }, + }, nil + }, + + "operator autopilot set-config": func() (cli.Command, error) { + return &command.OperatorAutopilotSetCommand{ + Command: base.Command{ + Flags: base.FlagSetHTTP, + Ui: ui, + }, + }, nil + }, + "operator raft": func() (cli.Command, error) { return &command.OperatorRaftCommand{ Command: base.Command{ From bf735aa008f4b8ffcf52d6ea6b2bb6ae46a9fcac Mon Sep 17 00:00:00 2001 From: Kyle Havlovitz Date: Fri, 24 Feb 2017 16:00:39 -0800 Subject: [PATCH 09/11] Use BoolValue for flag type --- command/operator_autopilot_set.go | 17 +++++------------ 1 file changed, 5 insertions(+), 12 deletions(-) 
diff --git a/command/operator_autopilot_set.go b/command/operator_autopilot_set.go index fda4b65cef..6808add954 100644 --- a/command/operator_autopilot_set.go +++ b/command/operator_autopilot_set.go @@ -28,11 +28,11 @@ func (c *OperatorAutopilotSetCommand) Synopsis() string { } func (c *OperatorAutopilotSetCommand) Run(args []string) int { - var deadServerCleanup string + var deadServerCleanup base.BoolValue f := c.Command.NewFlagSet(c) - f.StringVar(&deadServerCleanup, "dead-server-cleanup", "", + f.Var(&deadServerCleanup, "dead-server-cleanup", "Controls whether Consul will automatically remove dead servers "+ "when new ones are successfully added. Must be one of `true|false`.") @@ -59,17 +59,10 @@ func (c *OperatorAutopilotSetCommand) Run(args []string) int { return 1 } - if deadServerCleanup != "" { - switch deadServerCleanup { - case "true": - conf.DeadServerCleanup = true - case "false": - conf.DeadServerCleanup = false - default: - c.Ui.Error(fmt.Sprintf("Invalid value for dead-server-cleanup: %q", deadServerCleanup)) - } - } + // Update the config values. + deadServerCleanup.Merge(&conf.DeadServerCleanup) + // Check-and-set the new configuration. 
result, err := operator.AutopilotCASConfiguration(conf, nil) if err != nil { c.Ui.Error(fmt.Sprintf("Error setting Autopilot configuration: %s", err)) From 5429e8ce66804cd3f345fc4d149debf11882f21e Mon Sep 17 00:00:00 2001 From: Kyle Havlovitz Date: Fri, 24 Feb 2017 16:55:44 -0800 Subject: [PATCH 10/11] Add cli docs and minor test/comment tweaks --- command/operator_autopilot_set.go | 6 +- consul/operator_endpoint_test.go | 19 +++--- .../docs/agent/http/operator.html.markdown | 8 ++- .../docs/commands/operator.html.markdown.erb | 4 +- .../operator/autopilot.html.markdown.erb | 67 +++++++++++++++++++ website/source/layouts/docs.erb | 3 + 6 files changed, 91 insertions(+), 16 deletions(-) create mode 100644 website/source/docs/commands/operator/autopilot.html.markdown.erb diff --git a/command/operator_autopilot_set.go b/command/operator_autopilot_set.go index 6808add954..a0a47c2d3a 100644 --- a/command/operator_autopilot_set.go +++ b/command/operator_autopilot_set.go @@ -69,10 +69,10 @@ func (c *OperatorAutopilotSetCommand) Run(args []string) int { return 1 } if result { - c.Ui.Output("Configuration updated") + c.Ui.Output("Configuration updated!") + return 0 } else { c.Ui.Output("Configuration could not be atomically updated") + return 1 } - - return 0 } diff --git a/consul/operator_endpoint_test.go b/consul/operator_endpoint_test.go index 27cc54ea5e..e7cfdb772c 100644 --- a/consul/operator_endpoint_test.go +++ b/consul/operator_endpoint_test.go @@ -255,7 +255,6 @@ func TestOperator_Autopilot_GetConfiguration(t *testing.T) { testutil.WaitForLeader(t, s1.RPC, "dc1") - // Change the autopilot config from the default arg := structs.DCSpecificRequest{ Datacenter: "dc1", } @@ -283,7 +282,7 @@ func TestOperator_Autopilot_GetConfiguration_ACLDeny(t *testing.T) { testutil.WaitForLeader(t, s1.RPC, "dc1") - // Change the autopilot config from the default + // Try to get config without permissions arg := structs.DCSpecificRequest{ Datacenter: "dc1", } @@ -315,8 +314,7 @@ func 
TestOperator_Autopilot_GetConfiguration_ACLDeny(t *testing.T) { } } - // Now it should kick back for being an invalid config, which means it - // tried to do the operation. + // Now we can read and verify the config arg.Token = token err = msgpackrpc.CallWithCodec(codec, "Operator.AutopilotGetConfiguration", &arg, &reply) if err != nil { @@ -345,7 +343,7 @@ func TestOperator_Autopilot_SetConfiguration(t *testing.T) { DeadServerCleanup: true, }, } - var reply struct{} + var reply *bool err := msgpackrpc.CallWithCodec(codec, "Operator.AutopilotSetConfiguration", &arg, &reply) if err != nil { t.Fatalf("err: %v", err) @@ -353,7 +351,7 @@ func TestOperator_Autopilot_SetConfiguration(t *testing.T) { // Make sure it's changed state := s1.fsm.State() - config, err := state.AutopilotConfig() + _, config, err := state.AutopilotConfig() if err != nil { t.Fatal(err) } @@ -376,14 +374,14 @@ func TestOperator_Autopilot_SetConfiguration_ACLDeny(t *testing.T) { testutil.WaitForLeader(t, s1.RPC, "dc1") - // Change the autopilot config from the default + // Try to set config without permissions arg := structs.AutopilotSetConfigRequest{ Datacenter: "dc1", Config: structs.AutopilotConfig{ DeadServerCleanup: true, }, } - var reply struct{} + var reply *bool err := msgpackrpc.CallWithCodec(codec, "Operator.AutopilotSetConfiguration", &arg, &reply) if err == nil || !strings.Contains(err.Error(), permissionDenied) { t.Fatalf("err: %v", err) @@ -411,8 +409,7 @@ func TestOperator_Autopilot_SetConfiguration_ACLDeny(t *testing.T) { } } - // Now it should kick back for being an invalid config, which means it - // tried to do the operation. 
+ // Now we can update the config arg.Token = token err = msgpackrpc.CallWithCodec(codec, "Operator.AutopilotSetConfiguration", &arg, &reply) if err != nil { @@ -421,7 +418,7 @@ func TestOperator_Autopilot_SetConfiguration_ACLDeny(t *testing.T) { // Make sure it's changed state := s1.fsm.State() - config, err := state.AutopilotConfig() + _, config, err := state.AutopilotConfig() if err != nil { t.Fatal(err) } diff --git a/website/source/docs/agent/http/operator.html.markdown b/website/source/docs/agent/http/operator.html.markdown index a75b1d08d4..12323971bc 100644 --- a/website/source/docs/agent/http/operator.html.markdown +++ b/website/source/docs/agent/http/operator.html.markdown @@ -287,7 +287,9 @@ A JSON body is returned that looks like this: ```javascript { - "DeadServerCleanup": true + "DeadServerCleanup": true, + "CreateIndex": 4, + "ModifyIndex": 4 } ``` @@ -299,6 +301,10 @@ a new server is added to the cluster. Using the `PUT` method, this endpoint will update the Autopilot configuration of the cluster. +The `?cas=` can optionally be specified to update the configuration as a +Check-And-Set operation. The update will only happen if the given index matches +the `ModifyIndex` of the configuration at the time of writing. + If ACLs are enabled, the client will need to supply an ACL Token with [`operator`](/docs/internals/acl.html#operator) write privileges. 
diff --git a/website/source/docs/commands/operator.html.markdown.erb b/website/source/docs/commands/operator.html.markdown.erb index 6961f57627..69bd2a826f 100644 --- a/website/source/docs/commands/operator.html.markdown.erb +++ b/website/source/docs/commands/operator.html.markdown.erb @@ -35,10 +35,12 @@ Usage: consul operator [options] Subcommands: - raft Provides cluster-level tools for Consul operators + autopilot Provides tools for modifying Autopilot configuration + raft Provides cluster-level tools for Consul operators ``` For more information, examples, and usage about a subcommand, click on the name of the subcommand in the sidebar or one of the links below: +- [autopilot] (/docs/commands/operator/autopilot.html) - [raft] (/docs/commands/operator/raft.html) diff --git a/website/source/docs/commands/operator/autopilot.html.markdown.erb b/website/source/docs/commands/operator/autopilot.html.markdown.erb new file mode 100644 index 0000000000..a028914add --- /dev/null +++ b/website/source/docs/commands/operator/autopilot.html.markdown.erb @@ -0,0 +1,67 @@ +--- +layout: "docs" +page_title: "Commands: Operator Autopilot" +sidebar_current: "docs-commands-operator-autopilot" +description: > + The operator autopilot subcommand is used to view and modify Consul's Autopilot configuration. +--- + +# Consul Operator Autopilot + +Command: `consul operator autopilot` + +The Autopilot operator command is used to interact with Consul's Autopilot subsystem. The +command can be used to view or modify the current Autopilot configuration. + +```text +Usage: consul operator autopilot [options] + +The Autopilot operator command is used to interact with Consul's Autopilot +subsystem. The command can be used to view or modify the current configuration. + +Subcommands: + + get-config Display the current Autopilot configuration + set-config Modify the current Autopilot configuration +``` + +## get-config + +This command displays the current Raft peer configuration. 
+ +Usage: `consul operator autopilot get-config [options]` + +#### API Options + +<%= partial "docs/commands/http_api_options_client" %> +<%= partial "docs/commands/http_api_options_server" %> + +The output looks like this: + +``` +DeadServerCleanup = true +``` + +## set-config + +Modifies the current Autopilot configuration. + +Usage: `consul operator autopilot set-config [options]` + +#### API Options + +<%= partial "docs/commands/http_api_options_client" %> +<%= partial "docs/commands/http_api_options_server" %> + +#### Command Options + +* `-dead-server-cleanup` - Specifies whether to enable automatic removal of dead servers +upon the successful joining of new servers to the cluster. Must be one of `[true|false]`. + +The output looks like this: + +``` +Configuration updated! +``` + +The return code will indicate success or failure. diff --git a/website/source/layouts/docs.erb b/website/source/layouts/docs.erb index f972c06607..c1e2f28876 100644 --- a/website/source/layouts/docs.erb +++ b/website/source/layouts/docs.erb @@ -142,6 +142,9 @@ > operator