mirror of
https://github.com/status-im/consul.git
synced 2025-01-09 13:26:07 +00:00
Add state store table and endpoints for autopilot
This commit is contained in:
parent
950a9d2212
commit
81c7a0299e
@ -25,10 +25,10 @@ import (
|
||||
"github.com/hashicorp/consul/types"
|
||||
"github.com/hashicorp/go-sockaddr/template"
|
||||
"github.com/hashicorp/go-uuid"
|
||||
"github.com/hashicorp/raft"
|
||||
"github.com/hashicorp/serf/coordinate"
|
||||
"github.com/hashicorp/serf/serf"
|
||||
"github.com/shirou/gopsutil/host"
|
||||
"github.com/hashicorp/raft"
|
||||
)
|
||||
|
||||
const (
|
||||
@ -383,6 +383,9 @@ func (a *Agent) consulConfig() *consul.Config {
|
||||
if a.config.Protocol > 0 {
|
||||
base.ProtocolVersion = uint8(a.config.Protocol)
|
||||
}
|
||||
if a.config.RaftProtocol != 0 {
|
||||
base.RaftConfig.ProtocolVersion = raft.ProtocolVersion(a.config.RaftProtocol)
|
||||
}
|
||||
if a.config.ACLToken != "" {
|
||||
base.ACLToken = a.config.ACLToken
|
||||
}
|
||||
@ -413,11 +416,8 @@ func (a *Agent) consulConfig() *consul.Config {
|
||||
if a.config.SessionTTLMinRaw != "" {
|
||||
base.SessionTTLMin = a.config.SessionTTLMin
|
||||
}
|
||||
if a.config.Autopilot.RaftProtocolVersion != 0 {
|
||||
base.RaftConfig.ProtocolVersion = raft.ProtocolVersion(a.config.Autopilot.RaftProtocolVersion)
|
||||
}
|
||||
if a.config.Autopilot.DeadServerCleanup != nil {
|
||||
base.DeadServerCleanup = *a.config.Autopilot.DeadServerCleanup
|
||||
base.AutopilotConfig.DeadServerCleanup = *a.config.Autopilot.DeadServerCleanup
|
||||
}
|
||||
|
||||
// Format the build string
|
||||
|
@ -772,9 +772,6 @@ func DefaultConfig() *Config {
|
||||
CheckReapInterval: 30 * time.Second,
|
||||
AEInterval: time.Minute,
|
||||
DisableCoordinates: false,
|
||||
Autopilot: Autopilot{
|
||||
DeadServerCleanup: Bool(true),
|
||||
},
|
||||
|
||||
// SyncCoordinateRateTarget is set based on the rate that we want
|
||||
// the server to handle as an aggregate across the entire cluster.
|
||||
|
@ -297,6 +297,7 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) {
|
||||
s.handleFuncMetrics("/v1/operator/raft/configuration", s.wrap(s.OperatorRaftConfiguration))
|
||||
s.handleFuncMetrics("/v1/operator/raft/peer", s.wrap(s.OperatorRaftPeer))
|
||||
s.handleFuncMetrics("/v1/operator/keyring", s.wrap(s.OperatorKeyringEndpoint))
|
||||
s.handleFuncMetrics("/v1/operator/autopilot/configuration", s.wrap(s.OperatorAutopilotConfiguration))
|
||||
s.handleFuncMetrics("/v1/query", s.wrap(s.PreparedQueryGeneral))
|
||||
s.handleFuncMetrics("/v1/query/", s.wrap(s.PreparedQuerySpecific))
|
||||
s.handleFuncMetrics("/v1/session/create", s.wrap(s.SessionCreate))
|
||||
|
@ -3,11 +3,11 @@ package agent
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strconv"
|
||||
|
||||
"github.com/hashicorp/consul/consul/structs"
|
||||
multierror "github.com/hashicorp/go-multierror"
|
||||
"github.com/hashicorp/raft"
|
||||
"strconv"
|
||||
)
|
||||
|
||||
// OperatorRaftConfiguration is used to inspect the current Raft configuration.
|
||||
@ -105,7 +105,7 @@ func (s *HTTPServer) OperatorKeyringEndpoint(resp http.ResponseWriter, req *http
|
||||
case "DELETE":
|
||||
return s.KeyringRemove(resp, req, &args)
|
||||
default:
|
||||
resp.WriteHeader(405)
|
||||
resp.WriteHeader(http.StatusMethodNotAllowed)
|
||||
return nil, nil
|
||||
}
|
||||
}
|
||||
@ -166,3 +166,43 @@ func keyringErrorsOrNil(responses []*structs.KeyringResponse) error {
|
||||
}
|
||||
return errs
|
||||
}
|
||||
|
||||
// OperatorAutopilotConfiguration is used to inspect the current Autopilot configuration.
|
||||
// This supports the stale query mode in case the cluster doesn't have a leader.
|
||||
func (s *HTTPServer) OperatorAutopilotConfiguration(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
||||
// Switch on the method
|
||||
switch req.Method {
|
||||
case "GET":
|
||||
var args structs.DCSpecificRequest
|
||||
if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
var reply structs.AutopilotConfig
|
||||
if err := s.agent.RPC("Operator.AutopilotGetConfiguration", &args, &reply); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return reply, nil
|
||||
case "PUT":
|
||||
var args structs.AutopilotSetConfigRequest
|
||||
s.parseDC(req, &args.Datacenter)
|
||||
s.parseToken(req, &args.Token)
|
||||
|
||||
if err := decodeBody(req, &args.Config, nil); err != nil {
|
||||
resp.WriteHeader(400)
|
||||
resp.Write([]byte(fmt.Sprintf("Error parsing relay factor: %v", err)))
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
var reply struct{}
|
||||
if err := s.agent.RPC("Operator.AutopilotSetConfiguration", &args, &reply); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return nil, nil
|
||||
default:
|
||||
resp.WriteHeader(http.StatusMethodNotAllowed)
|
||||
return nil, nil
|
||||
}
|
||||
}
|
||||
|
@ -285,3 +285,60 @@ func TestOperator_Keyring_InvalidRelayFactor(t *testing.T) {
|
||||
}
|
||||
}, configFunc)
|
||||
}
|
||||
|
||||
func TestOperator_AutopilotGetConfiguration(t *testing.T) {
|
||||
httpTest(t, func(srv *HTTPServer) {
|
||||
body := bytes.NewBuffer(nil)
|
||||
req, err := http.NewRequest("GET", "/v1/operator/autopilot/configuration", body)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
resp := httptest.NewRecorder()
|
||||
obj, err := srv.OperatorAutopilotConfiguration(resp, req)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if resp.Code != 200 {
|
||||
t.Fatalf("bad code: %d", resp.Code)
|
||||
}
|
||||
out, ok := obj.(structs.AutopilotConfig)
|
||||
if !ok {
|
||||
t.Fatalf("unexpected: %T", obj)
|
||||
}
|
||||
if !out.DeadServerCleanup {
|
||||
t.Fatalf("bad: %#v", out)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestOperator_AutopilotSetConfiguration(t *testing.T) {
|
||||
httpTest(t, func(srv *HTTPServer) {
|
||||
body := bytes.NewBuffer([]byte(`{"DeadServerCleanup": false}`))
|
||||
req, err := http.NewRequest("PUT", "/v1/operator/autopilot/configuration", body)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
resp := httptest.NewRecorder()
|
||||
if _, err = srv.OperatorAutopilotConfiguration(resp, req); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if resp.Code != 200 {
|
||||
t.Fatalf("bad code: %d", resp.Code)
|
||||
}
|
||||
|
||||
args := structs.DCSpecificRequest{
|
||||
Datacenter: "dc1",
|
||||
}
|
||||
|
||||
var reply structs.AutopilotConfig
|
||||
if err := srv.agent.RPC("Operator.AutopilotGetConfiguration", &args, &reply); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
if reply.DeadServerCleanup {
|
||||
t.Fatalf("bad: %#v", reply)
|
||||
}
|
||||
})
|
||||
}
|
@ -7,6 +7,7 @@ import (
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/consul/structs"
|
||||
"github.com/hashicorp/consul/tlsutil"
|
||||
"github.com/hashicorp/consul/types"
|
||||
"github.com/hashicorp/memberlist"
|
||||
@ -275,9 +276,9 @@ type Config struct {
|
||||
// place, and a small jitter is applied to avoid a thundering herd.
|
||||
RPCHoldTimeout time.Duration
|
||||
|
||||
// DeadServerCleanup controls whether to remove dead servers when a new
|
||||
// server is added to the Raft peers
|
||||
DeadServerCleanup bool
|
||||
// AutopilotConfig is used to apply the initial autopilot config when
|
||||
// bootstrapping.
|
||||
AutopilotConfig *structs.AutopilotConfig
|
||||
}
|
||||
|
||||
// CheckVersion is used to check if the ProtocolVersion is valid
|
||||
@ -351,7 +352,9 @@ func DefaultConfig() *Config {
|
||||
|
||||
TLSMinVersion: "tls10",
|
||||
|
||||
AutopilotConfig: &structs.AutopilotConfig{
|
||||
DeadServerCleanup: true,
|
||||
},
|
||||
}
|
||||
|
||||
// Increase our reap interval to 3 days instead of 24h.
|
||||
|
@ -152,6 +152,13 @@ func (s *Server) establishLeadership() error {
|
||||
err)
|
||||
return err
|
||||
}
|
||||
|
||||
// Setup autopilot config if we are the leader and need to
|
||||
if err := s.initializeAutopilot(); err != nil {
|
||||
s.logger.Printf("[ERR] consul: Autopilot initialization failed: %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -237,6 +244,26 @@ func (s *Server) initializeACL() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// initializeAutopilot is used to setup the autopilot config if we are
|
||||
// the leader and need to do this
|
||||
func (s *Server) initializeAutopilot() error {
|
||||
// Bail if the config has already been initialized
|
||||
state := s.fsm.State()
|
||||
config, err := state.AutopilotConfig()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get autopilot config: %v", err)
|
||||
}
|
||||
if config != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := state.UpdateAutopilotConfig(s.config.AutopilotConfig); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// reconcile is used to reconcile the differences between Serf
|
||||
// membership and what is reflected in our strongly consistent store.
|
||||
// Mainly we need to ensure all live nodes are registered, all failed
|
||||
@ -581,8 +608,14 @@ func (s *Server) joinConsulServer(m serf.Member, parts *agent.Server) error {
|
||||
}
|
||||
}
|
||||
|
||||
state := s.fsm.State()
|
||||
autopilotConf, err := state.AutopilotConfig()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Look for dead servers to clean up
|
||||
if s.config.DeadServerCleanup {
|
||||
if autopilotConf.DeadServerCleanup {
|
||||
for _, member := range s.serfLAN.Members() {
|
||||
valid, _ := agent.IsConsulServer(member)
|
||||
if valid && member.Name != m.Name && member.Status == serf.StatusFailed {
|
||||
|
@ -125,3 +125,55 @@ REMOVE:
|
||||
op.srv.logger.Printf("[WARN] consul.operator: Removed Raft peer %q", args.Address)
|
||||
return nil
|
||||
}
|
||||
|
||||
// AutopilotGetConfiguration is used to retrieve the current Autopilot configuration.
|
||||
func (op *Operator) AutopilotGetConfiguration(args *structs.DCSpecificRequest, reply *structs.AutopilotConfig) error {
|
||||
if done, err := op.srv.forward("Operator.AutopilotGetConfiguration", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
// This action requires operator read access.
|
||||
acl, err := op.srv.resolveToken(args.Token)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if acl != nil && !acl.OperatorRead() {
|
||||
return permissionDeniedErr
|
||||
}
|
||||
|
||||
// We can't fetch the leader and the configuration atomically with
|
||||
// the current Raft API.
|
||||
state := op.srv.fsm.State()
|
||||
config, err := state.AutopilotConfig()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
*reply = *config
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// AutopilotGetConfiguration is used to set the current Autopilot configuration.
|
||||
func (op *Operator) AutopilotSetConfiguration(args *structs.AutopilotSetConfigRequest, reply *struct{}) error {
|
||||
if done, err := op.srv.forward("Operator.AutopilotSetConfiguration", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
// This action requires operator read access.
|
||||
acl, err := op.srv.resolveToken(args.Token)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if acl != nil && !acl.OperatorWrite() {
|
||||
return permissionDeniedErr
|
||||
}
|
||||
|
||||
// Update the autopilot config
|
||||
state := op.srv.fsm.State()
|
||||
if err := state.UpdateAutopilotConfig(&args.Config); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -243,3 +243,189 @@ func TestOperator_RaftRemovePeerByAddress_ACLDeny(t *testing.T) {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestOperator_Autopilot_GetConfiguration(t *testing.T) {
|
||||
dir1, s1 := testServerWithConfig(t, func(c *Config) {
|
||||
c.AutopilotConfig.DeadServerCleanup = false
|
||||
})
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
codec := rpcClient(t, s1)
|
||||
defer codec.Close()
|
||||
|
||||
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
// Change the autopilot config from the default
|
||||
arg := structs.DCSpecificRequest{
|
||||
Datacenter: "dc1",
|
||||
}
|
||||
var reply structs.AutopilotConfig
|
||||
err := msgpackrpc.CallWithCodec(codec, "Operator.AutopilotGetConfiguration", &arg, &reply)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if reply.DeadServerCleanup {
|
||||
t.Fatalf("bad: %#v", reply)
|
||||
}
|
||||
}
|
||||
|
||||
func TestOperator_Autopilot_GetConfiguration_ACLDeny(t *testing.T) {
|
||||
dir1, s1 := testServerWithConfig(t, func(c *Config) {
|
||||
c.ACLDatacenter = "dc1"
|
||||
c.ACLMasterToken = "root"
|
||||
c.ACLDefaultPolicy = "deny"
|
||||
c.AutopilotConfig.DeadServerCleanup = false
|
||||
})
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
codec := rpcClient(t, s1)
|
||||
defer codec.Close()
|
||||
|
||||
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
// Change the autopilot config from the default
|
||||
arg := structs.DCSpecificRequest{
|
||||
Datacenter: "dc1",
|
||||
}
|
||||
var reply structs.AutopilotConfig
|
||||
err := msgpackrpc.CallWithCodec(codec, "Operator.AutopilotGetConfiguration", &arg, &reply)
|
||||
if err == nil || !strings.Contains(err.Error(), permissionDenied) {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Create an ACL with operator read permissions.
|
||||
var token string
|
||||
{
|
||||
var rules = `
|
||||
operator = "read"
|
||||
`
|
||||
|
||||
req := structs.ACLRequest{
|
||||
Datacenter: "dc1",
|
||||
Op: structs.ACLSet,
|
||||
ACL: structs.ACL{
|
||||
Name: "User token",
|
||||
Type: structs.ACLTypeClient,
|
||||
Rules: rules,
|
||||
},
|
||||
WriteRequest: structs.WriteRequest{Token: "root"},
|
||||
}
|
||||
if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &req, &token); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Now it should kick back for being an invalid config, which means it
|
||||
// tried to do the operation.
|
||||
arg.Token = token
|
||||
err = msgpackrpc.CallWithCodec(codec, "Operator.AutopilotGetConfiguration", &arg, &reply)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if reply.DeadServerCleanup {
|
||||
t.Fatalf("bad: %#v", reply)
|
||||
}
|
||||
}
|
||||
|
||||
func TestOperator_Autopilot_SetConfiguration(t *testing.T) {
|
||||
dir1, s1 := testServerWithConfig(t, func(c *Config) {
|
||||
c.AutopilotConfig.DeadServerCleanup = false
|
||||
})
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
codec := rpcClient(t, s1)
|
||||
defer codec.Close()
|
||||
|
||||
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
// Change the autopilot config from the default
|
||||
arg := structs.AutopilotSetConfigRequest{
|
||||
Datacenter: "dc1",
|
||||
Config: structs.AutopilotConfig{
|
||||
DeadServerCleanup: true,
|
||||
},
|
||||
}
|
||||
var reply struct{}
|
||||
err := msgpackrpc.CallWithCodec(codec, "Operator.AutopilotSetConfiguration", &arg, &reply)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Make sure it's changed
|
||||
state := s1.fsm.State()
|
||||
config, err := state.AutopilotConfig()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if !config.DeadServerCleanup {
|
||||
t.Fatalf("bad: %#v", config)
|
||||
}
|
||||
}
|
||||
|
||||
func TestOperator_Autopilot_SetConfiguration_ACLDeny(t *testing.T) {
|
||||
dir1, s1 := testServerWithConfig(t, func(c *Config) {
|
||||
c.ACLDatacenter = "dc1"
|
||||
c.ACLMasterToken = "root"
|
||||
c.ACLDefaultPolicy = "deny"
|
||||
c.AutopilotConfig.DeadServerCleanup = false
|
||||
})
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
codec := rpcClient(t, s1)
|
||||
defer codec.Close()
|
||||
|
||||
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
// Change the autopilot config from the default
|
||||
arg := structs.AutopilotSetConfigRequest{
|
||||
Datacenter: "dc1",
|
||||
Config: structs.AutopilotConfig{
|
||||
DeadServerCleanup: true,
|
||||
},
|
||||
}
|
||||
var reply struct{}
|
||||
err := msgpackrpc.CallWithCodec(codec, "Operator.AutopilotSetConfiguration", &arg, &reply)
|
||||
if err == nil || !strings.Contains(err.Error(), permissionDenied) {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Create an ACL with operator write permissions.
|
||||
var token string
|
||||
{
|
||||
var rules = `
|
||||
operator = "write"
|
||||
`
|
||||
|
||||
req := structs.ACLRequest{
|
||||
Datacenter: "dc1",
|
||||
Op: structs.ACLSet,
|
||||
ACL: structs.ACL{
|
||||
Name: "User token",
|
||||
Type: structs.ACLTypeClient,
|
||||
Rules: rules,
|
||||
},
|
||||
WriteRequest: structs.WriteRequest{Token: "root"},
|
||||
}
|
||||
if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &req, &token); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Now it should kick back for being an invalid config, which means it
|
||||
// tried to do the operation.
|
||||
arg.Token = token
|
||||
err = msgpackrpc.CallWithCodec(codec, "Operator.AutopilotSetConfiguration", &arg, &reply)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Make sure it's changed
|
||||
state := s1.fsm.State()
|
||||
config, err := state.AutopilotConfig()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if !config.DeadServerCleanup {
|
||||
t.Fatalf("bad: %#v", config)
|
||||
}
|
||||
}
|
@ -289,7 +289,7 @@ func (s *Server) maybeBootstrap() {
|
||||
addr := server.Addr.String()
|
||||
addrs = append(addrs, addr)
|
||||
var id raft.ServerID
|
||||
if server.ID != "" && minRaftVersion >= 3 {
|
||||
if minRaftVersion >= 3 {
|
||||
id = raft.ServerID(server.ID)
|
||||
} else {
|
||||
id = raft.ServerID(addr)
|
||||
|
@ -12,8 +12,8 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/testutil"
|
||||
"github.com/hashicorp/go-uuid"
|
||||
"github.com/hashicorp/consul/types"
|
||||
"github.com/hashicorp/go-uuid"
|
||||
)
|
||||
|
||||
var nextPort int32 = 15000
|
||||
|
39
consul/state/autopilot.go
Normal file
39
consul/state/autopilot.go
Normal file
@ -0,0 +1,39 @@
|
||||
package state
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/hashicorp/consul/consul/structs"
|
||||
)
|
||||
|
||||
// AutopilotConfig is used to get the current Autopilot configuration.
|
||||
func (s *StateStore) AutopilotConfig() (*structs.AutopilotConfig, error) {
|
||||
tx := s.db.Txn(false)
|
||||
defer tx.Abort()
|
||||
|
||||
// Get the autopilot config
|
||||
c, err := tx.First("autopilot-config", "id")
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed autopilot config lookup: %s", err)
|
||||
}
|
||||
|
||||
config, ok := c.(*structs.AutopilotConfig)
|
||||
if !ok {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
return config, nil
|
||||
}
|
||||
|
||||
// AutopilotConfig is used to set the current Autopilot configuration.
|
||||
func (s *StateStore) UpdateAutopilotConfig(config *structs.AutopilotConfig) error {
|
||||
tx := s.db.Txn(true)
|
||||
defer tx.Abort()
|
||||
|
||||
if err := tx.Insert("autopilot-config", config); err != nil {
|
||||
return fmt.Errorf("failed updating autopilot config: %s", err)
|
||||
}
|
||||
|
||||
tx.Commit()
|
||||
return nil
|
||||
}
|
31
consul/state/autopilot_test.go
Normal file
31
consul/state/autopilot_test.go
Normal file
@ -0,0 +1,31 @@
|
||||
package state
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
"github.com/hashicorp/consul/consul/structs"
|
||||
)
|
||||
|
||||
func TestStateStore_Autopilot(t *testing.T) {
|
||||
s := testStateStore(t)
|
||||
|
||||
expected := &structs.AutopilotConfig{
|
||||
DeadServerCleanup: true,
|
||||
}
|
||||
|
||||
if err := s.UpdateAutopilotConfig(expected); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
idx, config, err := s.AutopilotConfig()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if idx != 0 {
|
||||
t.Fatalf("bad: %d", idx)
|
||||
}
|
||||
if !reflect.DeepEqual(expected, config) {
|
||||
t.Fatalf("bad: %#v, %#v", expected, config)
|
||||
}
|
||||
}
|
@ -31,6 +31,7 @@ func stateStoreSchema() *memdb.DBSchema {
|
||||
aclsTableSchema,
|
||||
coordinatesTableSchema,
|
||||
preparedQueriesTableSchema,
|
||||
autopilotConfigTableSchema,
|
||||
}
|
||||
|
||||
// Add the tables to the root schema
|
||||
@ -440,3 +441,21 @@ func preparedQueriesTableSchema() *memdb.TableSchema {
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// autopilotConfigTableSchema returns a new table schema used for storing
|
||||
// the autopilot configuration
|
||||
func autopilotConfigTableSchema() *memdb.TableSchema {
|
||||
return &memdb.TableSchema{
|
||||
Name: "autopilot-config",
|
||||
Indexes: map[string]*memdb.IndexSchema{
|
||||
"id": &memdb.IndexSchema{
|
||||
Name: "id",
|
||||
AllowMissing: true,
|
||||
Unique: true,
|
||||
Indexer: &memdb.ConditionalIndex{
|
||||
Conditional: func(obj interface{}) (bool, error) { return true, nil },
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
@ -4,6 +4,12 @@ import (
|
||||
"github.com/hashicorp/raft"
|
||||
)
|
||||
|
||||
type AutopilotConfig struct {
|
||||
// DeadServerCleanup controls whether to remove dead servers when a new
|
||||
// server is added to the Raft peers
|
||||
DeadServerCleanup bool
|
||||
}
|
||||
|
||||
// RaftServer has information about a server in the Raft configuration.
|
||||
type RaftServer struct {
|
||||
// ID is the unique ID for the server. These are currently the same
|
||||
@ -55,3 +61,21 @@ type RaftPeerByAddressRequest struct {
|
||||
func (op *RaftPeerByAddressRequest) RequestDatacenter() string {
|
||||
return op.Datacenter
|
||||
}
|
||||
|
||||
// AutopilotSetConfigRequest is used by the Operator endpoint to update the
|
||||
// current Autopilot configuration of the cluster.
|
||||
type AutopilotSetConfigRequest struct {
|
||||
// Datacenter is the target this request is intended for.
|
||||
Datacenter string
|
||||
|
||||
// Config is the new Autopilot configuration to use.
|
||||
Config AutopilotConfig
|
||||
|
||||
// WriteRequest holds the ACL token to go along with this request.
|
||||
WriteRequest
|
||||
}
|
||||
|
||||
// RequestDatacenter returns the datacenter for a given request.
|
||||
func (op *AutopilotSetConfigRequest) RequestDatacenter() string {
|
||||
return op.Datacenter
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user