mirror of https://github.com/status-im/consul.git

commit 2dc74e8683

Merge pull request #2858 from hashicorp/operator-shuffle

Moves operator sub-functions into their own files.

api/operator.go | 378
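For orientation, a minimal usage sketch (assuming the standard github.com/hashicorp/consul/api client against a local agent; nothing in this snippet is part of the commit itself) showing the Operator handle whose methods the diff below redistributes into per-feature files:

package main

import (
	"fmt"
	"log"

	"github.com/hashicorp/consul/api"
)

func main() {
	// DefaultConfig targets a local agent; adjust Address as needed.
	client, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		log.Fatal(err)
	}

	// Operator() hands back the same handle regardless of which source
	// file its methods live in; the split below is purely organizational.
	op := client.Operator()
	conf, err := op.RaftGetConfiguration(nil)
	if err != nil {
		log.Fatal(err)
	}
	for _, s := range conf.Servers {
		fmt.Println(s.Node, s.Address, s.Leader, s.Voter)
	}
}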
@@ -1,14 +1,5 @@
package api

import (
	"bytes"
	"fmt"
	"io"
	"strconv"
	"strings"
	"time"
)

// Operator can be used to perform low-level operator tasks for Consul.
type Operator struct {
	c *Client
@@ -18,372 +9,3 @@ type Operator struct {
func (c *Client) Operator() *Operator {
	return &Operator{c}
}

// RaftServer has information about a server in the Raft configuration.
type RaftServer struct {
	// ID is the unique ID for the server. These are currently the same
	// as the address, but they will be changed to a real GUID in a future
	// release of Consul.
	ID string

	// Node is the node name of the server, as known by Consul, or this
	// will be set to "(unknown)" otherwise.
	Node string

	// Address is the IP:port of the server, used for Raft communications.
	Address string

	// Leader is true if this server is the current cluster leader.
	Leader bool

	// Voter is true if this server has a vote in the cluster. This might
	// be false if the server is staging and still coming online, or if
	// it's a non-voting server, which will be added in a future release of
	// Consul.
	Voter bool
}

// RaftConfiguration is returned when querying for the current Raft configuration.
type RaftConfiguration struct {
	// Servers has the list of servers in the Raft configuration.
	Servers []*RaftServer

	// Index has the Raft index of this configuration.
	Index uint64
}

// keyringRequest is used for performing Keyring operations.
type keyringRequest struct {
	Key string
}

// KeyringResponse is returned when listing the gossip encryption keys.
type KeyringResponse struct {
	// WAN is whether this response is for a WAN ring.
	WAN bool

	// Datacenter is the datacenter name this request corresponds to.
	Datacenter string

	// Keys maps each encryption key to the number of nodes it's installed on.
	Keys map[string]int

	// NumNodes is the total number of nodes in this ring.
	NumNodes int
}

// AutopilotConfiguration is used for querying/setting the Autopilot configuration.
// Autopilot helps manage operator tasks related to Consul servers like removing
// failed servers from the Raft quorum.
type AutopilotConfiguration struct {
	// CleanupDeadServers controls whether to remove dead servers from the Raft
	// peer list when a new server joins.
	CleanupDeadServers bool

	// LastContactThreshold is the limit on the amount of time a server can go
	// without leader contact before being considered unhealthy.
	LastContactThreshold *ReadableDuration

	// MaxTrailingLogs is the number of entries in the Raft log that a server can
	// be behind before being considered unhealthy.
	MaxTrailingLogs uint64

	// ServerStabilizationTime is the minimum amount of time a server must be
	// in a stable, healthy state before it can be added to the cluster. Only
	// applicable with Raft protocol version 3 or higher.
	ServerStabilizationTime *ReadableDuration

	// (Enterprise-only) RedundancyZoneTag is the node tag to use for separating
	// servers into zones for redundancy. If left blank, this feature will be disabled.
	RedundancyZoneTag string

	// (Enterprise-only) DisableUpgradeMigration will disable Autopilot's upgrade migration
	// strategy of waiting until enough newer-versioned servers have been added to the
	// cluster before promoting them to voters.
	DisableUpgradeMigration bool

	// CreateIndex holds the index corresponding to the creation of this configuration.
	// This is a read-only field.
	CreateIndex uint64

	// ModifyIndex will be set to the index of the last update when retrieving the
	// Autopilot configuration. Resubmitting a configuration with
	// AutopilotCASConfiguration will perform a check-and-set operation which ensures
	// there hasn't been a subsequent update since the configuration was retrieved.
	ModifyIndex uint64
}

// ServerHealth is the health (from the leader's point of view) of a server.
type ServerHealth struct {
	// ID is the Raft ID of the server.
	ID string

	// Name is the node name of the server.
	Name string

	// Address is the address of the server.
	Address string

	// SerfStatus is the status of the SerfHealth check for the server.
	SerfStatus string

	// Version is the Consul version of the server.
	Version string

	// Leader is whether this server is currently the leader.
	Leader bool

	// LastContact is the time since this node's last contact with the leader.
	LastContact *ReadableDuration

	// LastTerm is the highest leader term this server has a record of in its Raft log.
	LastTerm uint64

	// LastIndex is the last log index this server has a record of in its Raft log.
	LastIndex uint64

	// Healthy is whether or not the server is healthy according to the current
	// Autopilot config.
	Healthy bool

	// Voter is whether this is a voting server.
	Voter bool

	// StableSince is the last time this server's Healthy value changed.
	StableSince time.Time
}

// OperatorHealthReply is a representation of the overall health of the cluster.
type OperatorHealthReply struct {
	// Healthy is true if all the servers in the cluster are healthy.
	Healthy bool

	// FailureTolerance is the number of healthy servers that could be lost without
	// an outage occurring.
	FailureTolerance int

	// Servers holds the health of each server.
	Servers []ServerHealth
}

// ReadableDuration is a duration type that is serialized to JSON in human readable format.
type ReadableDuration time.Duration

func NewReadableDuration(dur time.Duration) *ReadableDuration {
	d := ReadableDuration(dur)
	return &d
}

func (d *ReadableDuration) String() string { return d.Duration().String() }

func (d *ReadableDuration) Duration() time.Duration {
	if d == nil {
		return time.Duration(0)
	}
	return time.Duration(*d)
}

func (d *ReadableDuration) MarshalJSON() ([]byte, error) {
	return []byte(fmt.Sprintf(`"%s"`, d.Duration().String())), nil
}

func (d *ReadableDuration) UnmarshalJSON(raw []byte) error {
	if d == nil {
		return fmt.Errorf("cannot unmarshal to nil pointer")
	}

	str := string(raw)
	if len(str) < 2 || str[0] != '"' || str[len(str)-1] != '"' {
		return fmt.Errorf("must be enclosed with quotes: %s", str)
	}
	dur, err := time.ParseDuration(str[1 : len(str)-1])
	if err != nil {
		return err
	}
	*d = ReadableDuration(dur)
	return nil
}

// RaftGetConfiguration is used to query the current Raft peer set.
func (op *Operator) RaftGetConfiguration(q *QueryOptions) (*RaftConfiguration, error) {
	r := op.c.newRequest("GET", "/v1/operator/raft/configuration")
	r.setQueryOptions(q)
	_, resp, err := requireOK(op.c.doRequest(r))
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	var out RaftConfiguration
	if err := decodeBody(resp, &out); err != nil {
		return nil, err
	}
	return &out, nil
}

// RaftRemovePeerByAddress is used to kick a stale peer (one that is in the Raft
// quorum but no longer known to Serf or the catalog) by address in the form of
// "IP:port".
func (op *Operator) RaftRemovePeerByAddress(address string, q *WriteOptions) error {
	r := op.c.newRequest("DELETE", "/v1/operator/raft/peer")
	r.setWriteOptions(q)

	r.params.Set("address", string(address))

	_, resp, err := requireOK(op.c.doRequest(r))
	if err != nil {
		return err
	}

	resp.Body.Close()
	return nil
}

// RaftRemovePeerByID is used to kick a stale peer (one that is in the Raft
// quorum but no longer known to Serf or the catalog) by ID.
func (op *Operator) RaftRemovePeerByID(id string, q *WriteOptions) error {
	r := op.c.newRequest("DELETE", "/v1/operator/raft/peer")
	r.setWriteOptions(q)

	r.params.Set("id", string(id))

	_, resp, err := requireOK(op.c.doRequest(r))
	if err != nil {
		return err
	}

	resp.Body.Close()
	return nil
}

// KeyringInstall is used to install a new gossip encryption key into the cluster.
func (op *Operator) KeyringInstall(key string, q *WriteOptions) error {
	r := op.c.newRequest("POST", "/v1/operator/keyring")
	r.setWriteOptions(q)
	r.obj = keyringRequest{
		Key: key,
	}
	_, resp, err := requireOK(op.c.doRequest(r))
	if err != nil {
		return err
	}
	resp.Body.Close()
	return nil
}

// KeyringList is used to list the gossip keys installed in the cluster.
func (op *Operator) KeyringList(q *QueryOptions) ([]*KeyringResponse, error) {
	r := op.c.newRequest("GET", "/v1/operator/keyring")
	r.setQueryOptions(q)
	_, resp, err := requireOK(op.c.doRequest(r))
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	var out []*KeyringResponse
	if err := decodeBody(resp, &out); err != nil {
		return nil, err
	}
	return out, nil
}

// KeyringRemove is used to remove a gossip encryption key from the cluster.
func (op *Operator) KeyringRemove(key string, q *WriteOptions) error {
	r := op.c.newRequest("DELETE", "/v1/operator/keyring")
	r.setWriteOptions(q)
	r.obj = keyringRequest{
		Key: key,
	}
	_, resp, err := requireOK(op.c.doRequest(r))
	if err != nil {
		return err
	}
	resp.Body.Close()
	return nil
}

// KeyringUse is used to change the active gossip encryption key.
func (op *Operator) KeyringUse(key string, q *WriteOptions) error {
	r := op.c.newRequest("PUT", "/v1/operator/keyring")
	r.setWriteOptions(q)
	r.obj = keyringRequest{
		Key: key,
	}
	_, resp, err := requireOK(op.c.doRequest(r))
	if err != nil {
		return err
	}
	resp.Body.Close()
	return nil
}

// AutopilotGetConfiguration is used to query the current Autopilot configuration.
func (op *Operator) AutopilotGetConfiguration(q *QueryOptions) (*AutopilotConfiguration, error) {
	r := op.c.newRequest("GET", "/v1/operator/autopilot/configuration")
	r.setQueryOptions(q)
	_, resp, err := requireOK(op.c.doRequest(r))
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	var out AutopilotConfiguration
	if err := decodeBody(resp, &out); err != nil {
		return nil, err
	}

	return &out, nil
}

// AutopilotSetConfiguration is used to set the current Autopilot configuration.
func (op *Operator) AutopilotSetConfiguration(conf *AutopilotConfiguration, q *WriteOptions) error {
	r := op.c.newRequest("PUT", "/v1/operator/autopilot/configuration")
	r.setWriteOptions(q)
	r.obj = conf
	_, resp, err := requireOK(op.c.doRequest(r))
	if err != nil {
		return err
	}
	resp.Body.Close()
	return nil
}

// AutopilotCASConfiguration is used to perform a Check-And-Set update on the
// Autopilot configuration. The ModifyIndex value will be respected. Returns
// true on success or false on failures.
func (op *Operator) AutopilotCASConfiguration(conf *AutopilotConfiguration, q *WriteOptions) (bool, error) {
	r := op.c.newRequest("PUT", "/v1/operator/autopilot/configuration")
	r.setWriteOptions(q)
	r.params.Set("cas", strconv.FormatUint(conf.ModifyIndex, 10))
	r.obj = conf
	_, resp, err := requireOK(op.c.doRequest(r))
	if err != nil {
		return false, err
	}
	defer resp.Body.Close()

	var buf bytes.Buffer
	if _, err := io.Copy(&buf, resp.Body); err != nil {
		return false, fmt.Errorf("Failed to read response: %v", err)
	}
	res := strings.Contains(buf.String(), "true")

	return res, nil
}

// AutopilotServerHealth is used to query the health of the servers in the cluster.
func (op *Operator) AutopilotServerHealth(q *QueryOptions) (*OperatorHealthReply, error) {
	r := op.c.newRequest("GET", "/v1/operator/autopilot/health")
	r.setQueryOptions(q)
	_, resp, err := requireOK(op.c.doRequest(r))
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	var out OperatorHealthReply
	if err := decodeBody(resp, &out); err != nil {
		return nil, err
	}
	return &out, nil
}
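Before the new per-feature files below, a small standalone sketch of the ReadableDuration JSON round-trip the code above defines. The type is re-declared locally so the snippet compiles on its own, and this UnmarshalJSON uses encoding/json to strip the quotes instead of the manual length check in the diff:

package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// Local copy for a standalone demo; the real type lives in package api.
type ReadableDuration time.Duration

func (d *ReadableDuration) Duration() time.Duration {
	if d == nil {
		return 0
	}
	return time.Duration(*d)
}

// Marshal to a human-readable quoted string like "200ms".
func (d *ReadableDuration) MarshalJSON() ([]byte, error) {
	return []byte(fmt.Sprintf("%q", d.Duration().String())), nil
}

// Unmarshal by decoding the JSON string, then parsing it as a duration.
func (d *ReadableDuration) UnmarshalJSON(raw []byte) error {
	var s string
	if err := json.Unmarshal(raw, &s); err != nil {
		return err
	}
	dur, err := time.ParseDuration(s)
	if err != nil {
		return err
	}
	*d = ReadableDuration(dur)
	return nil
}

func main() {
	d := ReadableDuration(200 * time.Millisecond)
	out, _ := json.Marshal(&d)
	fmt.Println(string(out)) // "200ms", not 200000000

	var back ReadableDuration
	_ = json.Unmarshal([]byte(`"1m30s"`), &back)
	fmt.Println(back.Duration()) // 1m30s
}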
@@ -0,0 +1,215 @@
package api

import (
	"bytes"
	"fmt"
	"io"
	"strconv"
	"strings"
	"time"
)

// AutopilotConfiguration is used for querying/setting the Autopilot configuration.
// Autopilot helps manage operator tasks related to Consul servers like removing
// failed servers from the Raft quorum.
type AutopilotConfiguration struct {
	// CleanupDeadServers controls whether to remove dead servers from the Raft
	// peer list when a new server joins.
	CleanupDeadServers bool

	// LastContactThreshold is the limit on the amount of time a server can go
	// without leader contact before being considered unhealthy.
	LastContactThreshold *ReadableDuration

	// MaxTrailingLogs is the number of entries in the Raft log that a server can
	// be behind before being considered unhealthy.
	MaxTrailingLogs uint64

	// ServerStabilizationTime is the minimum amount of time a server must be
	// in a stable, healthy state before it can be added to the cluster. Only
	// applicable with Raft protocol version 3 or higher.
	ServerStabilizationTime *ReadableDuration

	// (Enterprise-only) RedundancyZoneTag is the node tag to use for separating
	// servers into zones for redundancy. If left blank, this feature will be disabled.
	RedundancyZoneTag string

	// (Enterprise-only) DisableUpgradeMigration will disable Autopilot's upgrade migration
	// strategy of waiting until enough newer-versioned servers have been added to the
	// cluster before promoting them to voters.
	DisableUpgradeMigration bool

	// CreateIndex holds the index corresponding to the creation of this configuration.
	// This is a read-only field.
	CreateIndex uint64

	// ModifyIndex will be set to the index of the last update when retrieving the
	// Autopilot configuration. Resubmitting a configuration with
	// AutopilotCASConfiguration will perform a check-and-set operation which ensures
	// there hasn't been a subsequent update since the configuration was retrieved.
	ModifyIndex uint64
}

// ServerHealth is the health (from the leader's point of view) of a server.
type ServerHealth struct {
	// ID is the Raft ID of the server.
	ID string

	// Name is the node name of the server.
	Name string

	// Address is the address of the server.
	Address string

	// SerfStatus is the status of the SerfHealth check for the server.
	SerfStatus string

	// Version is the Consul version of the server.
	Version string

	// Leader is whether this server is currently the leader.
	Leader bool

	// LastContact is the time since this node's last contact with the leader.
	LastContact *ReadableDuration

	// LastTerm is the highest leader term this server has a record of in its Raft log.
	LastTerm uint64

	// LastIndex is the last log index this server has a record of in its Raft log.
	LastIndex uint64

	// Healthy is whether or not the server is healthy according to the current
	// Autopilot config.
	Healthy bool

	// Voter is whether this is a voting server.
	Voter bool

	// StableSince is the last time this server's Healthy value changed.
	StableSince time.Time
}

// OperatorHealthReply is a representation of the overall health of the cluster.
type OperatorHealthReply struct {
	// Healthy is true if all the servers in the cluster are healthy.
	Healthy bool

	// FailureTolerance is the number of healthy servers that could be lost without
	// an outage occurring.
	FailureTolerance int

	// Servers holds the health of each server.
	Servers []ServerHealth
}

// ReadableDuration is a duration type that is serialized to JSON in human readable format.
type ReadableDuration time.Duration

func NewReadableDuration(dur time.Duration) *ReadableDuration {
	d := ReadableDuration(dur)
	return &d
}

func (d *ReadableDuration) String() string {
	return d.Duration().String()
}

func (d *ReadableDuration) Duration() time.Duration {
	if d == nil {
		return time.Duration(0)
	}
	return time.Duration(*d)
}

func (d *ReadableDuration) MarshalJSON() ([]byte, error) {
	return []byte(fmt.Sprintf(`"%s"`, d.Duration().String())), nil
}

func (d *ReadableDuration) UnmarshalJSON(raw []byte) error {
	if d == nil {
		return fmt.Errorf("cannot unmarshal to nil pointer")
	}

	str := string(raw)
	if len(str) < 2 || str[0] != '"' || str[len(str)-1] != '"' {
		return fmt.Errorf("must be enclosed with quotes: %s", str)
	}
	dur, err := time.ParseDuration(str[1 : len(str)-1])
	if err != nil {
		return err
	}
	*d = ReadableDuration(dur)
	return nil
}

// AutopilotGetConfiguration is used to query the current Autopilot configuration.
func (op *Operator) AutopilotGetConfiguration(q *QueryOptions) (*AutopilotConfiguration, error) {
	r := op.c.newRequest("GET", "/v1/operator/autopilot/configuration")
	r.setQueryOptions(q)
	_, resp, err := requireOK(op.c.doRequest(r))
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	var out AutopilotConfiguration
	if err := decodeBody(resp, &out); err != nil {
		return nil, err
	}

	return &out, nil
}

// AutopilotSetConfiguration is used to set the current Autopilot configuration.
func (op *Operator) AutopilotSetConfiguration(conf *AutopilotConfiguration, q *WriteOptions) error {
	r := op.c.newRequest("PUT", "/v1/operator/autopilot/configuration")
	r.setWriteOptions(q)
	r.obj = conf
	_, resp, err := requireOK(op.c.doRequest(r))
	if err != nil {
		return err
	}
	resp.Body.Close()
	return nil
}

// AutopilotCASConfiguration is used to perform a Check-And-Set update on the
// Autopilot configuration. The ModifyIndex value will be respected. Returns
// true on success or false on failures.
func (op *Operator) AutopilotCASConfiguration(conf *AutopilotConfiguration, q *WriteOptions) (bool, error) {
	r := op.c.newRequest("PUT", "/v1/operator/autopilot/configuration")
	r.setWriteOptions(q)
	r.params.Set("cas", strconv.FormatUint(conf.ModifyIndex, 10))
	r.obj = conf
	_, resp, err := requireOK(op.c.doRequest(r))
	if err != nil {
		return false, err
	}
	defer resp.Body.Close()

	var buf bytes.Buffer
	if _, err := io.Copy(&buf, resp.Body); err != nil {
		return false, fmt.Errorf("Failed to read response: %v", err)
	}
	res := strings.Contains(buf.String(), "true")

	return res, nil
}

// AutopilotServerHealth is used to query the health of the servers in the cluster.
func (op *Operator) AutopilotServerHealth(q *QueryOptions) (*OperatorHealthReply, error) {
	r := op.c.newRequest("GET", "/v1/operator/autopilot/health")
	r.setQueryOptions(q)
	_, resp, err := requireOK(op.c.doRequest(r))
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	var out OperatorHealthReply
	if err := decodeBody(resp, &out); err != nil {
		return nil, err
	}
	return &out, nil
}
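The CAS method above supports an optimistic read-modify-write loop, per its own doc comment; a hedged sketch (assuming the github.com/hashicorp/consul/api import; the helper name is illustrative):

package main

import (
	"log"

	"github.com/hashicorp/consul/api"
)

// casSetCleanupDeadServers flips CleanupDeadServers with optimistic
// concurrency: re-read and retry whenever another writer got in first.
func casSetCleanupDeadServers(op *api.Operator, enabled bool) error {
	for {
		conf, err := op.AutopilotGetConfiguration(nil)
		if err != nil {
			return err
		}
		conf.CleanupDeadServers = enabled
		// The ModifyIndex captured by the read is sent as the ?cas= parameter.
		ok, err := op.AutopilotCASConfiguration(conf, nil)
		if err != nil {
			return err
		}
		if ok {
			return nil // our index matched, so the update was applied
		}
		// A concurrent update bumped ModifyIndex; loop and try again.
	}
}

func main() {
	client, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		log.Fatal(err)
	}
	if err := casSetCleanupDeadServers(client.Operator(), false); err != nil {
		log.Fatal(err)
	}
}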
@@ -0,0 +1,107 @@
package api

import (
	"fmt"
	"testing"

	"github.com/hashicorp/consul/testutil"
)

func TestOperator_AutopilotGetSetConfiguration(t *testing.T) {
	t.Parallel()
	c, s := makeClient(t)
	defer s.Stop()

	operator := c.Operator()
	config, err := operator.AutopilotGetConfiguration(nil)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if !config.CleanupDeadServers {
		t.Fatalf("bad: %v", config)
	}

	// Change a config setting
	newConf := &AutopilotConfiguration{CleanupDeadServers: false}
	if err := operator.AutopilotSetConfiguration(newConf, nil); err != nil {
		t.Fatalf("err: %v", err)
	}

	config, err = operator.AutopilotGetConfiguration(nil)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if config.CleanupDeadServers {
		t.Fatalf("bad: %v", config)
	}
}

func TestOperator_AutopilotCASConfiguration(t *testing.T) {
	t.Parallel()
	c, s := makeClient(t)
	defer s.Stop()

	operator := c.Operator()
	config, err := operator.AutopilotGetConfiguration(nil)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if !config.CleanupDeadServers {
		t.Fatalf("bad: %v", config)
	}

	// Pass an invalid ModifyIndex
	{
		newConf := &AutopilotConfiguration{
			CleanupDeadServers: false,
			ModifyIndex:        config.ModifyIndex - 1,
		}
		resp, err := operator.AutopilotCASConfiguration(newConf, nil)
		if err != nil {
			t.Fatalf("err: %v", err)
		}
		if resp {
			t.Fatalf("bad: %v", resp)
		}
	}

	// Pass a valid ModifyIndex
	{
		newConf := &AutopilotConfiguration{
			CleanupDeadServers: false,
			ModifyIndex:        config.ModifyIndex,
		}
		resp, err := operator.AutopilotCASConfiguration(newConf, nil)
		if err != nil {
			t.Fatalf("err: %v", err)
		}
		if !resp {
			t.Fatalf("bad: %v", resp)
		}
	}
}

func TestOperator_AutopilotServerHealth(t *testing.T) {
	t.Parallel()
	c, s := makeClientWithConfig(t, nil, func(c *testutil.TestServerConfig) {
		c.RaftProtocol = 3
	})
	defer s.Stop()

	operator := c.Operator()
	if err := testutil.WaitForResult(func() (bool, error) {
		out, err := operator.AutopilotServerHealth(nil)
		if err != nil {
			return false, fmt.Errorf("err: %v", err)
		}
		if len(out.Servers) != 1 ||
			!out.Servers[0].Healthy ||
			out.Servers[0].Name != s.Config.NodeName {
			return false, fmt.Errorf("bad: %v", out)
		}

		return true, nil
	}); err != nil {
		t.Fatal(err)
	}
}
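The health test above polls until the endpoint reports a healthy single-server cluster; a sketch of the same query from an ordinary client (assuming a local agent whose servers all run Raft protocol 3 or higher, matching the test's RaftProtocol = 3 setup):

package main

import (
	"fmt"
	"log"

	"github.com/hashicorp/consul/api"
)

func main() {
	client, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		log.Fatal(err)
	}

	// Older clusters return an error here instead of a health reply.
	health, err := client.Operator().AutopilotServerHealth(nil)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("cluster healthy=%v, can lose %d server(s)\n",
		health.Healthy, health.FailureTolerance)
	for _, s := range health.Servers {
		fmt.Printf("  %s healthy=%v last-contact=%s\n",
			s.Name, s.Healthy, s.LastContact.Duration())
	}
}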
@@ -0,0 +1,83 @@
package api

// keyringRequest is used for performing Keyring operations.
type keyringRequest struct {
	Key string
}

// KeyringResponse is returned when listing the gossip encryption keys.
type KeyringResponse struct {
	// WAN is whether this response is for a WAN ring.
	WAN bool

	// Datacenter is the datacenter name this request corresponds to.
	Datacenter string

	// Keys maps each encryption key to the number of nodes it's installed on.
	Keys map[string]int

	// NumNodes is the total number of nodes in this ring.
	NumNodes int
}

// KeyringInstall is used to install a new gossip encryption key into the cluster.
func (op *Operator) KeyringInstall(key string, q *WriteOptions) error {
	r := op.c.newRequest("POST", "/v1/operator/keyring")
	r.setWriteOptions(q)
	r.obj = keyringRequest{
		Key: key,
	}
	_, resp, err := requireOK(op.c.doRequest(r))
	if err != nil {
		return err
	}
	resp.Body.Close()
	return nil
}

// KeyringList is used to list the gossip keys installed in the cluster.
func (op *Operator) KeyringList(q *QueryOptions) ([]*KeyringResponse, error) {
	r := op.c.newRequest("GET", "/v1/operator/keyring")
	r.setQueryOptions(q)
	_, resp, err := requireOK(op.c.doRequest(r))
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	var out []*KeyringResponse
	if err := decodeBody(resp, &out); err != nil {
		return nil, err
	}
	return out, nil
}

// KeyringRemove is used to remove a gossip encryption key from the cluster.
func (op *Operator) KeyringRemove(key string, q *WriteOptions) error {
	r := op.c.newRequest("DELETE", "/v1/operator/keyring")
	r.setWriteOptions(q)
	r.obj = keyringRequest{
		Key: key,
	}
	_, resp, err := requireOK(op.c.doRequest(r))
	if err != nil {
		return err
	}
	resp.Body.Close()
	return nil
}

// KeyringUse is used to change the active gossip encryption key.
func (op *Operator) KeyringUse(key string, q *WriteOptions) error {
	r := op.c.newRequest("PUT", "/v1/operator/keyring")
	r.setWriteOptions(q)
	r.obj = keyringRequest{
		Key: key,
	}
	_, resp, err := requireOK(op.c.doRequest(r))
	if err != nil {
		return err
	}
	resp.Body.Close()
	return nil
}
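The four keyring methods above compose into a key-rotation sequence, which the test that follows exercises end to end; a sketch (the keys reuse the test's values, and the helper name is illustrative):

package main

import (
	"log"

	"github.com/hashicorp/consul/api"
)

// rotateGossipKey performs the install -> use -> remove sequence the
// keyring test below exercises.
func rotateGossipKey(op *api.Operator, oldKey, newKey string) error {
	// 1. Distribute the new key to every member.
	if err := op.KeyringInstall(newKey, nil); err != nil {
		return err
	}
	// 2. Make the new key the primary encryption key.
	if err := op.KeyringUse(newKey, nil); err != nil {
		return err
	}
	// 3. Retire the old key once nothing encrypts with it anymore.
	return op.KeyringRemove(oldKey, nil)
}

func main() {
	client, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		log.Fatal(err)
	}
	if err := rotateGossipKey(client.Operator(),
		"d8wu8CSUrqgtjVsvcBPmhQ==", "qxycTi/SsePj/TZzCBmNXw=="); err != nil {
		log.Fatal(err)
	}
}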
@@ -0,0 +1,73 @@
package api

import (
	"testing"

	"github.com/hashicorp/consul/testutil"
)

func TestOperator_KeyringInstallListPutRemove(t *testing.T) {
	oldKey := "d8wu8CSUrqgtjVsvcBPmhQ=="
	newKey := "qxycTi/SsePj/TZzCBmNXw=="
	t.Parallel()
	c, s := makeClientWithConfig(t, nil, func(c *testutil.TestServerConfig) {
		c.Encrypt = oldKey
	})
	defer s.Stop()

	operator := c.Operator()
	if err := operator.KeyringInstall(newKey, nil); err != nil {
		t.Fatalf("err: %v", err)
	}

	listResponses, err := operator.KeyringList(nil)
	if err != nil {
		t.Fatalf("err %v", err)
	}

	// Make sure the new key is installed
	if len(listResponses) != 2 {
		t.Fatalf("bad: %v", len(listResponses))
	}
	for _, response := range listResponses {
		if len(response.Keys) != 2 {
			t.Fatalf("bad: %v", len(response.Keys))
		}
		if _, ok := response.Keys[oldKey]; !ok {
			t.Fatalf("bad: %v", ok)
		}
		if _, ok := response.Keys[newKey]; !ok {
			t.Fatalf("bad: %v", ok)
		}
	}

	// Switch the primary to the new key
	if err := operator.KeyringUse(newKey, nil); err != nil {
		t.Fatalf("err: %v", err)
	}

	if err := operator.KeyringRemove(oldKey, nil); err != nil {
		t.Fatalf("err: %v", err)
	}

	listResponses, err = operator.KeyringList(nil)
	if err != nil {
		t.Fatalf("err %v", err)
	}

	// Make sure the old key is removed
	if len(listResponses) != 2 {
		t.Fatalf("bad: %v", len(listResponses))
	}
	for _, response := range listResponses {
		if len(response.Keys) != 1 {
			t.Fatalf("bad: %v", len(response.Keys))
		}
		if _, ok := response.Keys[oldKey]; ok {
			t.Fatalf("bad: %v", ok)
		}
		if _, ok := response.Keys[newKey]; !ok {
			t.Fatalf("bad: %v", ok)
		}
	}
}
@@ -0,0 +1,86 @@
package api

// RaftServer has information about a server in the Raft configuration.
type RaftServer struct {
	// ID is the unique ID for the server. These are currently the same
	// as the address, but they will be changed to a real GUID in a future
	// release of Consul.
	ID string

	// Node is the node name of the server, as known by Consul, or this
	// will be set to "(unknown)" otherwise.
	Node string

	// Address is the IP:port of the server, used for Raft communications.
	Address string

	// Leader is true if this server is the current cluster leader.
	Leader bool

	// Voter is true if this server has a vote in the cluster. This might
	// be false if the server is staging and still coming online, or if
	// it's a non-voting server, which will be added in a future release of
	// Consul.
	Voter bool
}

// RaftConfiguration is returned when querying for the current Raft configuration.
type RaftConfiguration struct {
	// Servers has the list of servers in the Raft configuration.
	Servers []*RaftServer

	// Index has the Raft index of this configuration.
	Index uint64
}

// RaftGetConfiguration is used to query the current Raft peer set.
func (op *Operator) RaftGetConfiguration(q *QueryOptions) (*RaftConfiguration, error) {
	r := op.c.newRequest("GET", "/v1/operator/raft/configuration")
	r.setQueryOptions(q)
	_, resp, err := requireOK(op.c.doRequest(r))
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	var out RaftConfiguration
	if err := decodeBody(resp, &out); err != nil {
		return nil, err
	}
	return &out, nil
}

// RaftRemovePeerByAddress is used to kick a stale peer (one that is in the Raft
// quorum but no longer known to Serf or the catalog) by address in the form of
// "IP:port".
func (op *Operator) RaftRemovePeerByAddress(address string, q *WriteOptions) error {
	r := op.c.newRequest("DELETE", "/v1/operator/raft/peer")
	r.setWriteOptions(q)

	r.params.Set("address", string(address))

	_, resp, err := requireOK(op.c.doRequest(r))
	if err != nil {
		return err
	}

	resp.Body.Close()
	return nil
}

// RaftRemovePeerByID is used to kick a stale peer (one that is in the Raft
// quorum but no longer known to Serf or the catalog) by ID.
func (op *Operator) RaftRemovePeerByID(id string, q *WriteOptions) error {
	r := op.c.newRequest("DELETE", "/v1/operator/raft/peer")
	r.setWriteOptions(q)

	r.params.Set("id", string(id))

	_, resp, err := requireOK(op.c.doRequest(r))
	if err != nil {
		return err
	}

	resp.Body.Close()
	return nil
}
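A usage sketch of the Raft methods above (assuming a local agent; the stale address is hypothetical, and per the server-side check further down, an address that is not in the current configuration is rejected with an error):

package main

import (
	"fmt"
	"log"

	"github.com/hashicorp/consul/api"
)

func main() {
	client, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		log.Fatal(err)
	}
	op := client.Operator()

	// Inspect the current peer set before touching anything.
	conf, err := op.RaftGetConfiguration(nil)
	if err != nil {
		log.Fatal(err)
	}
	for _, s := range conf.Servers {
		fmt.Printf("%s %s leader=%v voter=%v\n", s.Node, s.Address, s.Leader, s.Voter)
	}

	// Removing a peer is destructive; only do this for a server that is
	// gone from Serf and the catalog but still stuck in the Raft quorum.
	stale := "10.0.0.9:8300" // hypothetical stale peer address
	if err := op.RaftRemovePeerByAddress(stale, nil); err != nil {
		log.Printf("remove failed (expected if %s is not a peer): %v", stale, err)
	}
}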
@@ -0,0 +1,38 @@
package api

import (
	"strings"
	"testing"
)

func TestOperator_RaftGetConfiguration(t *testing.T) {
	t.Parallel()
	c, s := makeClient(t)
	defer s.Stop()

	operator := c.Operator()
	out, err := operator.RaftGetConfiguration(nil)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if len(out.Servers) != 1 ||
		!out.Servers[0].Leader ||
		!out.Servers[0].Voter {
		t.Fatalf("bad: %v", out)
	}
}

func TestOperator_RaftRemovePeerByAddress(t *testing.T) {
	t.Parallel()
	c, s := makeClient(t)
	defer s.Stop()

	// If we get this error, it proves we sent the address all the way
	// through.
	operator := c.Operator()
	err := operator.RaftRemovePeerByAddress("nope", nil)
	if err == nil || !strings.Contains(err.Error(),
		"address \"nope\" was not found in the Raft configuration") {
		t.Fatalf("err: %v", err)
	}
}
@@ -1,206 +0,0 @@
package api

import (
	"fmt"
	"strings"
	"testing"

	"github.com/hashicorp/consul/testutil"
)

func TestOperator_RaftGetConfiguration(t *testing.T) {
	t.Parallel()
	c, s := makeClient(t)
	defer s.Stop()

	operator := c.Operator()
	out, err := operator.RaftGetConfiguration(nil)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if len(out.Servers) != 1 ||
		!out.Servers[0].Leader ||
		!out.Servers[0].Voter {
		t.Fatalf("bad: %v", out)
	}
}

func TestOperator_RaftRemovePeerByAddress(t *testing.T) {
	t.Parallel()
	c, s := makeClient(t)
	defer s.Stop()

	// If we get this error, it proves we sent the address all the way
	// through.
	operator := c.Operator()
	err := operator.RaftRemovePeerByAddress("nope", nil)
	if err == nil || !strings.Contains(err.Error(),
		"address \"nope\" was not found in the Raft configuration") {
		t.Fatalf("err: %v", err)
	}
}

func TestOperator_KeyringInstallListPutRemove(t *testing.T) {
	oldKey := "d8wu8CSUrqgtjVsvcBPmhQ=="
	newKey := "qxycTi/SsePj/TZzCBmNXw=="
	t.Parallel()
	c, s := makeClientWithConfig(t, nil, func(c *testutil.TestServerConfig) {
		c.Encrypt = oldKey
	})
	defer s.Stop()

	operator := c.Operator()
	if err := operator.KeyringInstall(newKey, nil); err != nil {
		t.Fatalf("err: %v", err)
	}

	listResponses, err := operator.KeyringList(nil)
	if err != nil {
		t.Fatalf("err %v", err)
	}

	// Make sure the new key is installed
	if len(listResponses) != 2 {
		t.Fatalf("bad: %v", len(listResponses))
	}
	for _, response := range listResponses {
		if len(response.Keys) != 2 {
			t.Fatalf("bad: %v", len(response.Keys))
		}
		if _, ok := response.Keys[oldKey]; !ok {
			t.Fatalf("bad: %v", ok)
		}
		if _, ok := response.Keys[newKey]; !ok {
			t.Fatalf("bad: %v", ok)
		}
	}

	// Switch the primary to the new key
	if err := operator.KeyringUse(newKey, nil); err != nil {
		t.Fatalf("err: %v", err)
	}

	if err := operator.KeyringRemove(oldKey, nil); err != nil {
		t.Fatalf("err: %v", err)
	}

	listResponses, err = operator.KeyringList(nil)
	if err != nil {
		t.Fatalf("err %v", err)
	}

	// Make sure the old key is removed
	if len(listResponses) != 2 {
		t.Fatalf("bad: %v", len(listResponses))
	}
	for _, response := range listResponses {
		if len(response.Keys) != 1 {
			t.Fatalf("bad: %v", len(response.Keys))
		}
		if _, ok := response.Keys[oldKey]; ok {
			t.Fatalf("bad: %v", ok)
		}
		if _, ok := response.Keys[newKey]; !ok {
			t.Fatalf("bad: %v", ok)
		}
	}
}

func TestOperator_AutopilotGetSetConfiguration(t *testing.T) {
	t.Parallel()
	c, s := makeClient(t)
	defer s.Stop()

	operator := c.Operator()
	config, err := operator.AutopilotGetConfiguration(nil)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if !config.CleanupDeadServers {
		t.Fatalf("bad: %v", config)
	}

	// Change a config setting
	newConf := &AutopilotConfiguration{CleanupDeadServers: false}
	if err := operator.AutopilotSetConfiguration(newConf, nil); err != nil {
		t.Fatalf("err: %v", err)
	}

	config, err = operator.AutopilotGetConfiguration(nil)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if config.CleanupDeadServers {
		t.Fatalf("bad: %v", config)
	}
}

func TestOperator_AutopilotCASConfiguration(t *testing.T) {
	t.Parallel()
	c, s := makeClient(t)
	defer s.Stop()

	operator := c.Operator()
	config, err := operator.AutopilotGetConfiguration(nil)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if !config.CleanupDeadServers {
		t.Fatalf("bad: %v", config)
	}

	// Pass an invalid ModifyIndex
	{
		newConf := &AutopilotConfiguration{
			CleanupDeadServers: false,
			ModifyIndex:        config.ModifyIndex - 1,
		}
		resp, err := operator.AutopilotCASConfiguration(newConf, nil)
		if err != nil {
			t.Fatalf("err: %v", err)
		}
		if resp {
			t.Fatalf("bad: %v", resp)
		}
	}

	// Pass a valid ModifyIndex
	{
		newConf := &AutopilotConfiguration{
			CleanupDeadServers: false,
			ModifyIndex:        config.ModifyIndex,
		}
		resp, err := operator.AutopilotCASConfiguration(newConf, nil)
		if err != nil {
			t.Fatalf("err: %v", err)
		}
		if !resp {
			t.Fatalf("bad: %v", resp)
		}
	}
}

func TestOperator_ServerHealth(t *testing.T) {
	t.Parallel()
	c, s := makeClientWithConfig(t, nil, func(c *testutil.TestServerConfig) {
		c.RaftProtocol = 3
	})
	defer s.Stop()

	operator := c.Operator()
	if err := testutil.WaitForResult(func() (bool, error) {
		out, err := operator.AutopilotServerHealth(nil)
		if err != nil {
			return false, fmt.Errorf("err: %v", err)
		}
		if len(out.Servers) != 1 ||
			!out.Servers[0].Healthy ||
			out.Servers[0].Name != s.Config.NodeName {
			return false, fmt.Errorf("bad: %v", out)
		}

		return true, nil
	}); err != nil {
		t.Fatal(err)
	}
}
@@ -0,0 +1,98 @@
package consul

import (
	"fmt"

	"github.com/hashicorp/consul/consul/structs"
)

// AutopilotGetConfiguration is used to retrieve the current Autopilot configuration.
func (op *Operator) AutopilotGetConfiguration(args *structs.DCSpecificRequest, reply *structs.AutopilotConfig) error {
	if done, err := op.srv.forward("Operator.AutopilotGetConfiguration", args, args, reply); done {
		return err
	}

	// This action requires operator read access.
	acl, err := op.srv.resolveToken(args.Token)
	if err != nil {
		return err
	}
	if acl != nil && !acl.OperatorRead() {
		return permissionDeniedErr
	}

	state := op.srv.fsm.State()
	_, config, err := state.AutopilotConfig()
	if err != nil {
		return err
	}

	*reply = *config

	return nil
}

// AutopilotSetConfiguration is used to set the current Autopilot configuration.
func (op *Operator) AutopilotSetConfiguration(args *structs.AutopilotSetConfigRequest, reply *bool) error {
	if done, err := op.srv.forward("Operator.AutopilotSetConfiguration", args, args, reply); done {
		return err
	}

	// This action requires operator write access.
	acl, err := op.srv.resolveToken(args.Token)
	if err != nil {
		return err
	}
	if acl != nil && !acl.OperatorWrite() {
		return permissionDeniedErr
	}

	// Apply the update
	resp, err := op.srv.raftApply(structs.AutopilotRequestType, args)
	if err != nil {
		op.srv.logger.Printf("[ERR] consul.operator: Apply failed: %v", err)
		return err
	}
	if respErr, ok := resp.(error); ok {
		return respErr
	}

	// Check if the return type is a bool.
	if respBool, ok := resp.(bool); ok {
		*reply = respBool
	}
	return nil
}

// ServerHealth is used to get the current health of the servers.
func (op *Operator) ServerHealth(args *structs.DCSpecificRequest, reply *structs.OperatorHealthReply) error {
	// This must be sent to the leader, so we fix the args since we are
	// re-using a structure where we don't support all the options.
	args.RequireConsistent = true
	args.AllowStale = false
	if done, err := op.srv.forward("Operator.ServerHealth", args, args, reply); done {
		return err
	}

	// This action requires operator read access.
	acl, err := op.srv.resolveToken(args.Token)
	if err != nil {
		return err
	}
	if acl != nil && !acl.OperatorRead() {
		return permissionDeniedErr
	}

	// Exit early if the min Raft version is too low
	minRaftProtocol, err := ServerMinRaftProtocol(op.srv.LANMembers())
	if err != nil {
		return fmt.Errorf("error getting server raft protocol versions: %s", err)
	}
	if minRaftProtocol < 3 {
		return fmt.Errorf("all servers must have raft_protocol set to 3 or higher to use this endpoint")
	}

	*reply = op.srv.getClusterHealth()

	return nil
}
@@ -0,0 +1,285 @@
package consul

import (
	"fmt"
	"os"
	"strings"
	"testing"
	"time"

	"github.com/hashicorp/consul/consul/structs"
	"github.com/hashicorp/consul/testutil"
	"github.com/hashicorp/net-rpc-msgpackrpc"
	"github.com/hashicorp/raft"
)

func TestOperator_Autopilot_GetConfiguration(t *testing.T) {
	dir1, s1 := testServerWithConfig(t, func(c *Config) {
		c.AutopilotConfig.CleanupDeadServers = false
	})
	defer os.RemoveAll(dir1)
	defer s1.Shutdown()
	codec := rpcClient(t, s1)
	defer codec.Close()

	testutil.WaitForLeader(t, s1.RPC, "dc1")

	arg := structs.DCSpecificRequest{
		Datacenter: "dc1",
	}
	var reply structs.AutopilotConfig
	err := msgpackrpc.CallWithCodec(codec, "Operator.AutopilotGetConfiguration", &arg, &reply)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if reply.CleanupDeadServers {
		t.Fatalf("bad: %#v", reply)
	}
}

func TestOperator_Autopilot_GetConfiguration_ACLDeny(t *testing.T) {
	dir1, s1 := testServerWithConfig(t, func(c *Config) {
		c.ACLDatacenter = "dc1"
		c.ACLMasterToken = "root"
		c.ACLDefaultPolicy = "deny"
		c.AutopilotConfig.CleanupDeadServers = false
	})
	defer os.RemoveAll(dir1)
	defer s1.Shutdown()
	codec := rpcClient(t, s1)
	defer codec.Close()

	testutil.WaitForLeader(t, s1.RPC, "dc1")

	// Try to get config without permissions
	arg := structs.DCSpecificRequest{
		Datacenter: "dc1",
	}
	var reply structs.AutopilotConfig
	err := msgpackrpc.CallWithCodec(codec, "Operator.AutopilotGetConfiguration", &arg, &reply)
	if err == nil || !strings.Contains(err.Error(), permissionDenied) {
		t.Fatalf("err: %v", err)
	}

	// Create an ACL with operator read permissions.
	var token string
	{
		var rules = `
			operator = "read"
		`

		req := structs.ACLRequest{
			Datacenter: "dc1",
			Op:         structs.ACLSet,
			ACL: structs.ACL{
				Name:  "User token",
				Type:  structs.ACLTypeClient,
				Rules: rules,
			},
			WriteRequest: structs.WriteRequest{Token: "root"},
		}
		if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &req, &token); err != nil {
			t.Fatalf("err: %v", err)
		}
	}

	// Now we can read and verify the config
	arg.Token = token
	err = msgpackrpc.CallWithCodec(codec, "Operator.AutopilotGetConfiguration", &arg, &reply)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if reply.CleanupDeadServers {
		t.Fatalf("bad: %#v", reply)
	}
}

func TestOperator_Autopilot_SetConfiguration(t *testing.T) {
	dir1, s1 := testServerWithConfig(t, func(c *Config) {
		c.AutopilotConfig.CleanupDeadServers = false
	})
	defer os.RemoveAll(dir1)
	defer s1.Shutdown()
	codec := rpcClient(t, s1)
	defer codec.Close()

	testutil.WaitForLeader(t, s1.RPC, "dc1")

	// Change the autopilot config from the default
	arg := structs.AutopilotSetConfigRequest{
		Datacenter: "dc1",
		Config: structs.AutopilotConfig{
			CleanupDeadServers: true,
		},
	}
	var reply *bool
	err := msgpackrpc.CallWithCodec(codec, "Operator.AutopilotSetConfiguration", &arg, &reply)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	// Make sure it's changed
	state := s1.fsm.State()
	_, config, err := state.AutopilotConfig()
	if err != nil {
		t.Fatal(err)
	}
	if !config.CleanupDeadServers {
		t.Fatalf("bad: %#v", config)
	}
}

func TestOperator_Autopilot_SetConfiguration_ACLDeny(t *testing.T) {
	dir1, s1 := testServerWithConfig(t, func(c *Config) {
		c.ACLDatacenter = "dc1"
		c.ACLMasterToken = "root"
		c.ACLDefaultPolicy = "deny"
		c.AutopilotConfig.CleanupDeadServers = false
	})
	defer os.RemoveAll(dir1)
	defer s1.Shutdown()
	codec := rpcClient(t, s1)
	defer codec.Close()

	testutil.WaitForLeader(t, s1.RPC, "dc1")

	// Try to set config without permissions
	arg := structs.AutopilotSetConfigRequest{
		Datacenter: "dc1",
		Config: structs.AutopilotConfig{
			CleanupDeadServers: true,
		},
	}
	var reply *bool
	err := msgpackrpc.CallWithCodec(codec, "Operator.AutopilotSetConfiguration", &arg, &reply)
	if err == nil || !strings.Contains(err.Error(), permissionDenied) {
		t.Fatalf("err: %v", err)
	}

	// Create an ACL with operator write permissions.
	var token string
	{
		var rules = `
			operator = "write"
		`

		req := structs.ACLRequest{
			Datacenter: "dc1",
			Op:         structs.ACLSet,
			ACL: structs.ACL{
				Name:  "User token",
				Type:  structs.ACLTypeClient,
				Rules: rules,
			},
			WriteRequest: structs.WriteRequest{Token: "root"},
		}
		if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &req, &token); err != nil {
			t.Fatalf("err: %v", err)
		}
	}

	// Now we can update the config
	arg.Token = token
	err = msgpackrpc.CallWithCodec(codec, "Operator.AutopilotSetConfiguration", &arg, &reply)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	// Make sure it's changed
	state := s1.fsm.State()
	_, config, err := state.AutopilotConfig()
	if err != nil {
		t.Fatal(err)
	}
	if !config.CleanupDeadServers {
		t.Fatalf("bad: %#v", config)
	}
}

func TestOperator_ServerHealth(t *testing.T) {
	conf := func(c *Config) {
		c.Datacenter = "dc1"
		c.Bootstrap = false
		c.BootstrapExpect = 3
		c.RaftConfig.ProtocolVersion = 3
		c.ServerHealthInterval = 100 * time.Millisecond
		c.AutopilotInterval = 100 * time.Millisecond
	}
	dir1, s1 := testServerWithConfig(t, conf)
	defer os.RemoveAll(dir1)
	defer s1.Shutdown()
	codec := rpcClient(t, s1)
	defer codec.Close()

	dir2, s2 := testServerWithConfig(t, conf)
	defer os.RemoveAll(dir2)
	defer s2.Shutdown()
	addr := fmt.Sprintf("127.0.0.1:%d",
		s1.config.SerfLANConfig.MemberlistConfig.BindPort)
	if _, err := s2.JoinLAN([]string{addr}); err != nil {
		t.Fatalf("err: %v", err)
	}

	dir3, s3 := testServerWithConfig(t, conf)
	defer os.RemoveAll(dir3)
	defer s3.Shutdown()
	if _, err := s3.JoinLAN([]string{addr}); err != nil {
		t.Fatalf("err: %v", err)
	}

	testutil.WaitForLeader(t, s1.RPC, "dc1")

	if err := testutil.WaitForResult(func() (bool, error) {
		arg := structs.DCSpecificRequest{
			Datacenter: "dc1",
		}
		var reply structs.OperatorHealthReply
		err := msgpackrpc.CallWithCodec(codec, "Operator.ServerHealth", &arg, &reply)
		if err != nil {
			return false, fmt.Errorf("err: %v", err)
		}
		if !reply.Healthy {
			return false, fmt.Errorf("bad: %v", reply)
		}
		if reply.FailureTolerance != 1 {
			return false, fmt.Errorf("bad: %v", reply)
		}
		if len(reply.Servers) != 3 {
			return false, fmt.Errorf("bad: %v", reply)
		}
		// Leader should have LastContact == 0, others should be positive
		for _, s := range reply.Servers {
			isLeader := s1.raft.Leader() == raft.ServerAddress(s.Address)
			if isLeader && s.LastContact != 0 {
				return false, fmt.Errorf("bad: %v", reply)
			}
			if !isLeader && s.LastContact <= 0 {
				return false, fmt.Errorf("bad: %v", reply)
			}
		}
		return true, nil
	}); err != nil {
		t.Fatal(err)
	}
}

func TestOperator_ServerHealth_UnsupportedRaftVersion(t *testing.T) {
	dir1, s1 := testServerWithConfig(t, func(c *Config) {
		c.Datacenter = "dc1"
		c.Bootstrap = true
		c.RaftConfig.ProtocolVersion = 2
	})
	defer os.RemoveAll(dir1)
	defer s1.Shutdown()
	codec := rpcClient(t, s1)
	defer codec.Close()

	arg := structs.DCSpecificRequest{
		Datacenter: "dc1",
	}
	var reply structs.OperatorHealthReply
	err := msgpackrpc.CallWithCodec(codec, "Operator.ServerHealth", &arg, &reply)
	if err == nil || !strings.Contains(err.Error(), "raft_protocol set to 3 or higher") {
		t.Fatalf("bad: %v", err)
	}
}
@ -1,296 +1,6 @@
|
|||
package consul
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
|
||||
"github.com/hashicorp/consul/consul/agent"
|
||||
"github.com/hashicorp/consul/consul/structs"
|
||||
"github.com/hashicorp/raft"
|
||||
"github.com/hashicorp/serf/serf"
|
||||
)
|
||||
|
||||
// Operator endpoint is used to perform low-level operator tasks for Consul.
|
||||
type Operator struct {
|
||||
srv *Server
|
||||
}
|
||||
|
||||
// RaftGetConfiguration is used to retrieve the current Raft configuration.
|
||||
func (op *Operator) RaftGetConfiguration(args *structs.DCSpecificRequest, reply *structs.RaftConfigurationResponse) error {
|
||||
if done, err := op.srv.forward("Operator.RaftGetConfiguration", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
// This action requires operator read access.
|
||||
acl, err := op.srv.resolveToken(args.Token)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if acl != nil && !acl.OperatorRead() {
|
||||
return permissionDeniedErr
|
||||
}
|
||||
|
||||
// We can't fetch the leader and the configuration atomically with
|
||||
// the current Raft API.
|
||||
future := op.srv.raft.GetConfiguration()
|
||||
if err := future.Error(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Index the Consul information about the servers.
|
||||
serverMap := make(map[raft.ServerAddress]serf.Member)
|
||||
for _, member := range op.srv.serfLAN.Members() {
|
||||
valid, parts := agent.IsConsulServer(member)
|
||||
if !valid {
|
||||
continue
|
||||
}
|
||||
|
||||
addr := (&net.TCPAddr{IP: member.Addr, Port: parts.Port}).String()
|
||||
serverMap[raft.ServerAddress(addr)] = member
|
||||
}
|
||||
|
||||
// Fill out the reply.
|
||||
leader := op.srv.raft.Leader()
|
||||
reply.Index = future.Index()
|
||||
for _, server := range future.Configuration().Servers {
|
||||
node := "(unknown)"
|
||||
if member, ok := serverMap[server.Address]; ok {
|
||||
node = member.Name
|
||||
}
|
||||
|
||||
entry := &structs.RaftServer{
|
||||
ID: server.ID,
|
||||
Node: node,
|
||||
Address: server.Address,
|
||||
Leader: server.Address == leader,
|
||||
Voter: server.Suffrage == raft.Voter,
|
||||
}
|
||||
reply.Servers = append(reply.Servers, entry)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// RaftRemovePeerByAddress is used to kick a stale peer (one that it in the Raft
|
||||
// quorum but no longer known to Serf or the catalog) by address in the form of
|
||||
// "IP:port". The reply argument is not used, but it required to fulfill the RPC
|
||||
// interface.
|
||||
func (op *Operator) RaftRemovePeerByAddress(args *structs.RaftRemovePeerRequest, reply *struct{}) error {
|
||||
if done, err := op.srv.forward("Operator.RaftRemovePeerByAddress", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
// This is a super dangerous operation that requires operator write
|
||||
// access.
|
||||
acl, err := op.srv.resolveToken(args.Token)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if acl != nil && !acl.OperatorWrite() {
|
||||
return permissionDeniedErr
|
||||
}
|
||||
|
||||
// Since this is an operation designed for humans to use, we will return
|
||||
// an error if the supplied address isn't among the peers since it's
|
||||
// likely they screwed up.
|
||||
{
|
||||
future := op.srv.raft.GetConfiguration()
|
||||
if err := future.Error(); err != nil {
|
||||
return err
|
||||
}
|
||||
for _, s := range future.Configuration().Servers {
|
||||
if s.Address == args.Address {
|
||||
args.ID = s.ID
|
||||
goto REMOVE
|
||||
}
|
||||
}
|
||||
return fmt.Errorf("address %q was not found in the Raft configuration",
|
||||
args.Address)
|
||||
}
|
||||
|
||||
REMOVE:
|
||||
// The Raft library itself will prevent various forms of foot-shooting,
|
||||
// like making a configuration with no voters. Some consideration was
|
||||
// given here to adding more checks, but it was decided to make this as
|
||||
// low-level and direct as possible. We've got ACL coverage to lock this
|
||||
// down, and if you are an operator, it's assumed you know what you are
|
||||
// doing if you are calling this. If you remove a peer that's known to
|
||||
// Serf, for example, it will come back when the leader does a reconcile
|
||||
// pass.
|
||||
minRaftProtocol, err := ServerMinRaftProtocol(op.srv.serfLAN.Members())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var future raft.Future
|
||||
if minRaftProtocol >= 2 {
|
||||
future = op.srv.raft.RemoveServer(args.ID, 0, 0)
|
||||
} else {
|
||||
future = op.srv.raft.RemovePeer(args.Address)
|
||||
}
|
||||
if err := future.Error(); err != nil {
|
||||
op.srv.logger.Printf("[WARN] consul.operator: Failed to remove Raft peer %q: %v",
|
||||
args.Address, err)
|
||||
return err
|
||||
}
|
||||
|
||||
op.srv.logger.Printf("[WARN] consul.operator: Removed Raft peer %q", args.Address)
|
||||
return nil
|
||||
}

// RaftRemovePeerByID is used to kick a stale peer (one that is in the Raft
// quorum but no longer known to Serf or the catalog) by its Raft server ID.
// The reply argument is not used, but is required to fulfill the RPC
// interface.
func (op *Operator) RaftRemovePeerByID(args *structs.RaftRemovePeerRequest, reply *struct{}) error {
	if done, err := op.srv.forward("Operator.RaftRemovePeerByID", args, args, reply); done {
		return err
	}

	// This is a super dangerous operation that requires operator write
	// access.
	acl, err := op.srv.resolveToken(args.Token)
	if err != nil {
		return err
	}
	if acl != nil && !acl.OperatorWrite() {
		return permissionDeniedErr
	}

	// Since this is an operation designed for humans to use, we will return
	// an error if the supplied ID isn't among the peers since it's
	// likely they screwed up.
	{
		future := op.srv.raft.GetConfiguration()
		if err := future.Error(); err != nil {
			return err
		}
		for _, s := range future.Configuration().Servers {
			if s.ID == args.ID {
				args.Address = s.Address
				goto REMOVE
			}
		}
		return fmt.Errorf("id %q was not found in the Raft configuration",
			args.ID)
	}

REMOVE:
	// The Raft library itself will prevent various forms of foot-shooting,
	// like making a configuration with no voters. Some consideration was
	// given here to adding more checks, but it was decided to make this as
	// low-level and direct as possible. We've got ACL coverage to lock this
	// down, and if you are an operator, it's assumed you know what you are
	// doing if you are calling this. If you remove a peer that's known to
	// Serf, for example, it will come back when the leader does a reconcile
	// pass.
	minRaftProtocol, err := ServerMinRaftProtocol(op.srv.serfLAN.Members())
	if err != nil {
		return err
	}

	var future raft.Future
	if minRaftProtocol >= 2 {
		future = op.srv.raft.RemoveServer(args.ID, 0, 0)
	} else {
		future = op.srv.raft.RemovePeer(args.Address)
	}
	if err := future.Error(); err != nil {
		op.srv.logger.Printf("[WARN] consul.operator: Failed to remove Raft peer with id %q: %v",
			args.ID, err)
		return err
	}

	op.srv.logger.Printf("[WARN] consul.operator: Removed Raft peer with id %q", args.ID)
	return nil
}
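
// Illustrative sketch (not part of this change): the ID-based variant takes a
// raft.ServerID instead of an address; the ID value here is an assumption.
//
//	arg := structs.RaftRemovePeerRequest{
//		Datacenter: "dc1",
//		ID:         raft.ServerID("3e8fb7db-1e38-4a9d-8f12-0a3f8d5d8c8f"), // hypothetical ID
//	}
//	var reply struct{}
//	err := msgpackrpc.CallWithCodec(codec, "Operator.RaftRemovePeerByID", &arg, &reply)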

// AutopilotGetConfiguration is used to retrieve the current Autopilot configuration.
func (op *Operator) AutopilotGetConfiguration(args *structs.DCSpecificRequest, reply *structs.AutopilotConfig) error {
	if done, err := op.srv.forward("Operator.AutopilotGetConfiguration", args, args, reply); done {
		return err
	}

	// This action requires operator read access.
	acl, err := op.srv.resolveToken(args.Token)
	if err != nil {
		return err
	}
	if acl != nil && !acl.OperatorRead() {
		return permissionDeniedErr
	}

	state := op.srv.fsm.State()
	_, config, err := state.AutopilotConfig()
	if err != nil {
		return err
	}

	*reply = *config

	return nil
}
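
// Illustrative sketch (not part of this change) of a client-side read, using
// the same request/reply types as the tests in this change; "dc1" is an
// assumption.
//
//	arg := structs.DCSpecificRequest{Datacenter: "dc1"}
//	var config structs.AutopilotConfig
//	err := msgpackrpc.CallWithCodec(codec, "Operator.AutopilotGetConfiguration", &arg, &config)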

// AutopilotSetConfiguration is used to set the current Autopilot configuration.
func (op *Operator) AutopilotSetConfiguration(args *structs.AutopilotSetConfigRequest, reply *bool) error {
	if done, err := op.srv.forward("Operator.AutopilotSetConfiguration", args, args, reply); done {
		return err
	}

	// This action requires operator write access.
	acl, err := op.srv.resolveToken(args.Token)
	if err != nil {
		return err
	}
	if acl != nil && !acl.OperatorWrite() {
		return permissionDeniedErr
	}

	// Apply the update
	resp, err := op.srv.raftApply(structs.AutopilotRequestType, args)
	if err != nil {
		op.srv.logger.Printf("[ERR] consul.operator: Apply failed: %v", err)
		return err
	}
	if respErr, ok := resp.(error); ok {
		return respErr
	}

	// Check if the return type is a bool.
	if respBool, ok := resp.(bool); ok {
		*reply = respBool
	}
	return nil
}
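
// Illustrative sketch (not part of this change) of a client-side update; the
// boolean reply is only filled in when the Raft apply returns one. The values
// below are assumptions.
//
//	arg := structs.AutopilotSetConfigRequest{
//		Datacenter: "dc1",
//		Config: structs.AutopilotConfig{
//			CleanupDeadServers: true,
//		},
//	}
//	var applied bool
//	err := msgpackrpc.CallWithCodec(codec, "Operator.AutopilotSetConfiguration", &arg, &applied)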

// ServerHealth is used to get the current health of the servers.
func (op *Operator) ServerHealth(args *structs.DCSpecificRequest, reply *structs.OperatorHealthReply) error {
	// This must be sent to the leader, so we fix the args since we are
	// re-using a structure where we don't support all the options.
	args.RequireConsistent = true
	args.AllowStale = false
	if done, err := op.srv.forward("Operator.ServerHealth", args, args, reply); done {
		return err
	}

	// This action requires operator read access.
	acl, err := op.srv.resolveToken(args.Token)
	if err != nil {
		return err
	}
	if acl != nil && !acl.OperatorRead() {
		return permissionDeniedErr
	}

	// Exit early if the min Raft version is too low
	minRaftProtocol, err := ServerMinRaftProtocol(op.srv.LANMembers())
	if err != nil {
		return fmt.Errorf("error getting server raft protocol versions: %s", err)
	}
	if minRaftProtocol < 3 {
		return fmt.Errorf("all servers must have raft_protocol set to 3 or higher to use this endpoint")
	}

	*reply = op.srv.getClusterHealth()

	return nil
}
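
// Illustrative sketch (not part of this change) of a health query; note the
// endpoint errors out unless every server runs Raft protocol version 3 or
// higher. "dc1" is an assumption.
//
//	arg := structs.DCSpecificRequest{Datacenter: "dc1"}
//	var health structs.OperatorHealthReply
//	err := msgpackrpc.CallWithCodec(codec, "Operator.ServerHealth", &arg, &health)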

@@ -0,0 +1,200 @@
package consul

import (
	"fmt"
	"net"

	"github.com/hashicorp/consul/consul/agent"
	"github.com/hashicorp/consul/consul/structs"
	"github.com/hashicorp/raft"
	"github.com/hashicorp/serf/serf"
)

// RaftGetConfiguration is used to retrieve the current Raft configuration.
func (op *Operator) RaftGetConfiguration(args *structs.DCSpecificRequest, reply *structs.RaftConfigurationResponse) error {
	if done, err := op.srv.forward("Operator.RaftGetConfiguration", args, args, reply); done {
		return err
	}

	// This action requires operator read access.
	acl, err := op.srv.resolveToken(args.Token)
	if err != nil {
		return err
	}
	if acl != nil && !acl.OperatorRead() {
		return permissionDeniedErr
	}

	// We can't fetch the leader and the configuration atomically with
	// the current Raft API.
	future := op.srv.raft.GetConfiguration()
	if err := future.Error(); err != nil {
		return err
	}

	// Index the Consul information about the servers.
	serverMap := make(map[raft.ServerAddress]serf.Member)
	for _, member := range op.srv.serfLAN.Members() {
		valid, parts := agent.IsConsulServer(member)
		if !valid {
			continue
		}

		addr := (&net.TCPAddr{IP: member.Addr, Port: parts.Port}).String()
		serverMap[raft.ServerAddress(addr)] = member
	}

	// Fill out the reply.
	leader := op.srv.raft.Leader()
	reply.Index = future.Index()
	for _, server := range future.Configuration().Servers {
		node := "(unknown)"
		if member, ok := serverMap[server.Address]; ok {
			node = member.Name
		}

		entry := &structs.RaftServer{
			ID:      server.ID,
			Node:    node,
			Address: server.Address,
			Leader:  server.Address == leader,
			Voter:   server.Suffrage == raft.Voter,
		}
		reply.Servers = append(reply.Servers, entry)
	}
	return nil
}
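
// Illustrative sketch (not part of this change) of reading the Raft
// configuration and printing each peer; "dc1" is an assumption.
//
//	arg := structs.DCSpecificRequest{Datacenter: "dc1"}
//	var cfg structs.RaftConfigurationResponse
//	if err := msgpackrpc.CallWithCodec(codec, "Operator.RaftGetConfiguration", &arg, &cfg); err == nil {
//		for _, s := range cfg.Servers {
//			fmt.Printf("%s %s leader=%v voter=%v\n", s.Node, s.Address, s.Leader, s.Voter)
//		}
//	}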

// RaftRemovePeerByAddress is used to kick a stale peer (one that is in the Raft
// quorum but no longer known to Serf or the catalog) by address in the form of
// "IP:port". The reply argument is not used, but is required to fulfill the RPC
// interface.
func (op *Operator) RaftRemovePeerByAddress(args *structs.RaftRemovePeerRequest, reply *struct{}) error {
	if done, err := op.srv.forward("Operator.RaftRemovePeerByAddress", args, args, reply); done {
		return err
	}

	// This is a super dangerous operation that requires operator write
	// access.
	acl, err := op.srv.resolveToken(args.Token)
	if err != nil {
		return err
	}
	if acl != nil && !acl.OperatorWrite() {
		return permissionDeniedErr
	}

	// Since this is an operation designed for humans to use, we will return
	// an error if the supplied address isn't among the peers since it's
	// likely they screwed up.
	{
		future := op.srv.raft.GetConfiguration()
		if err := future.Error(); err != nil {
			return err
		}
		for _, s := range future.Configuration().Servers {
			if s.Address == args.Address {
				args.ID = s.ID
				goto REMOVE
			}
		}
		return fmt.Errorf("address %q was not found in the Raft configuration",
			args.Address)
	}

REMOVE:
	// The Raft library itself will prevent various forms of foot-shooting,
	// like making a configuration with no voters. Some consideration was
	// given here to adding more checks, but it was decided to make this as
	// low-level and direct as possible. We've got ACL coverage to lock this
	// down, and if you are an operator, it's assumed you know what you are
	// doing if you are calling this. If you remove a peer that's known to
	// Serf, for example, it will come back when the leader does a reconcile
	// pass.
	minRaftProtocol, err := ServerMinRaftProtocol(op.srv.serfLAN.Members())
	if err != nil {
		return err
	}

	var future raft.Future
	if minRaftProtocol >= 2 {
		future = op.srv.raft.RemoveServer(args.ID, 0, 0)
	} else {
		future = op.srv.raft.RemovePeer(args.Address)
	}
	if err := future.Error(); err != nil {
		op.srv.logger.Printf("[WARN] consul.operator: Failed to remove Raft peer %q: %v",
			args.Address, err)
		return err
	}

	op.srv.logger.Printf("[WARN] consul.operator: Removed Raft peer %q", args.Address)
	return nil
}

// RaftRemovePeerByID is used to kick a stale peer (one that is in the Raft
// quorum but no longer known to Serf or the catalog) by its Raft server ID.
// The reply argument is not used, but is required to fulfill the RPC
// interface.
func (op *Operator) RaftRemovePeerByID(args *structs.RaftRemovePeerRequest, reply *struct{}) error {
	if done, err := op.srv.forward("Operator.RaftRemovePeerByID", args, args, reply); done {
		return err
	}

	// This is a super dangerous operation that requires operator write
	// access.
	acl, err := op.srv.resolveToken(args.Token)
	if err != nil {
		return err
	}
	if acl != nil && !acl.OperatorWrite() {
		return permissionDeniedErr
	}

	// Since this is an operation designed for humans to use, we will return
	// an error if the supplied ID isn't among the peers since it's
	// likely they screwed up.
	{
		future := op.srv.raft.GetConfiguration()
		if err := future.Error(); err != nil {
			return err
		}
		for _, s := range future.Configuration().Servers {
			if s.ID == args.ID {
				args.Address = s.Address
				goto REMOVE
			}
		}
		return fmt.Errorf("id %q was not found in the Raft configuration",
			args.ID)
	}

REMOVE:
	// The Raft library itself will prevent various forms of foot-shooting,
	// like making a configuration with no voters. Some consideration was
	// given here to adding more checks, but it was decided to make this as
	// low-level and direct as possible. We've got ACL coverage to lock this
	// down, and if you are an operator, it's assumed you know what you are
	// doing if you are calling this. If you remove a peer that's known to
	// Serf, for example, it will come back when the leader does a reconcile
	// pass.
	minRaftProtocol, err := ServerMinRaftProtocol(op.srv.serfLAN.Members())
	if err != nil {
		return err
	}

	var future raft.Future
	if minRaftProtocol >= 2 {
		future = op.srv.raft.RemoveServer(args.ID, 0, 0)
	} else {
		future = op.srv.raft.RemovePeer(args.Address)
	}
	if err := future.Error(); err != nil {
		op.srv.logger.Printf("[WARN] consul.operator: Failed to remove Raft peer with id %q: %v",
			args.ID, err)
		return err
	}

	op.srv.logger.Printf("[WARN] consul.operator: Removed Raft peer with id %q", args.ID)
	return nil
}

@@ -7,8 +7,6 @@ import (
	"strings"
	"testing"

	"time"

	"github.com/hashicorp/consul/consul/structs"
	"github.com/hashicorp/consul/testutil"
	"github.com/hashicorp/net-rpc-msgpackrpc"

@@ -361,274 +359,3 @@ func TestOperator_RaftRemovePeerByID_ACLDeny(t *testing.T) {
t.Fatalf("err: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestOperator_Autopilot_GetConfiguration(t *testing.T) {
	dir1, s1 := testServerWithConfig(t, func(c *Config) {
		c.AutopilotConfig.CleanupDeadServers = false
	})
	defer os.RemoveAll(dir1)
	defer s1.Shutdown()
	codec := rpcClient(t, s1)
	defer codec.Close()

	testutil.WaitForLeader(t, s1.RPC, "dc1")

	arg := structs.DCSpecificRequest{
		Datacenter: "dc1",
	}
	var reply structs.AutopilotConfig
	err := msgpackrpc.CallWithCodec(codec, "Operator.AutopilotGetConfiguration", &arg, &reply)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if reply.CleanupDeadServers {
		t.Fatalf("bad: %#v", reply)
	}
}

func TestOperator_Autopilot_GetConfiguration_ACLDeny(t *testing.T) {
	dir1, s1 := testServerWithConfig(t, func(c *Config) {
		c.ACLDatacenter = "dc1"
		c.ACLMasterToken = "root"
		c.ACLDefaultPolicy = "deny"
		c.AutopilotConfig.CleanupDeadServers = false
	})
	defer os.RemoveAll(dir1)
	defer s1.Shutdown()
	codec := rpcClient(t, s1)
	defer codec.Close()

	testutil.WaitForLeader(t, s1.RPC, "dc1")

	// Try to get config without permissions
	arg := structs.DCSpecificRequest{
		Datacenter: "dc1",
	}
	var reply structs.AutopilotConfig
	err := msgpackrpc.CallWithCodec(codec, "Operator.AutopilotGetConfiguration", &arg, &reply)
	if err == nil || !strings.Contains(err.Error(), permissionDenied) {
		t.Fatalf("err: %v", err)
	}

	// Create an ACL with operator read permissions.
	var token string
	{
		var rules = `
operator = "read"
`

		req := structs.ACLRequest{
			Datacenter: "dc1",
			Op:         structs.ACLSet,
			ACL: structs.ACL{
				Name:  "User token",
				Type:  structs.ACLTypeClient,
				Rules: rules,
			},
			WriteRequest: structs.WriteRequest{Token: "root"},
		}
		if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &req, &token); err != nil {
			t.Fatalf("err: %v", err)
		}
	}

	// Now we can read and verify the config
	arg.Token = token
	err = msgpackrpc.CallWithCodec(codec, "Operator.AutopilotGetConfiguration", &arg, &reply)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if reply.CleanupDeadServers {
		t.Fatalf("bad: %#v", reply)
	}
}

func TestOperator_Autopilot_SetConfiguration(t *testing.T) {
	dir1, s1 := testServerWithConfig(t, func(c *Config) {
		c.AutopilotConfig.CleanupDeadServers = false
	})
	defer os.RemoveAll(dir1)
	defer s1.Shutdown()
	codec := rpcClient(t, s1)
	defer codec.Close()

	testutil.WaitForLeader(t, s1.RPC, "dc1")

	// Change the autopilot config from the default
	arg := structs.AutopilotSetConfigRequest{
		Datacenter: "dc1",
		Config: structs.AutopilotConfig{
			CleanupDeadServers: true,
		},
	}
	var reply *bool
	err := msgpackrpc.CallWithCodec(codec, "Operator.AutopilotSetConfiguration", &arg, &reply)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	// Make sure it's changed
	state := s1.fsm.State()
	_, config, err := state.AutopilotConfig()
	if err != nil {
		t.Fatal(err)
	}
	if !config.CleanupDeadServers {
		t.Fatalf("bad: %#v", config)
	}
}

func TestOperator_Autopilot_SetConfiguration_ACLDeny(t *testing.T) {
	dir1, s1 := testServerWithConfig(t, func(c *Config) {
		c.ACLDatacenter = "dc1"
		c.ACLMasterToken = "root"
		c.ACLDefaultPolicy = "deny"
		c.AutopilotConfig.CleanupDeadServers = false
	})
	defer os.RemoveAll(dir1)
	defer s1.Shutdown()
	codec := rpcClient(t, s1)
	defer codec.Close()

	testutil.WaitForLeader(t, s1.RPC, "dc1")

	// Try to set config without permissions
	arg := structs.AutopilotSetConfigRequest{
		Datacenter: "dc1",
		Config: structs.AutopilotConfig{
			CleanupDeadServers: true,
		},
	}
	var reply *bool
	err := msgpackrpc.CallWithCodec(codec, "Operator.AutopilotSetConfiguration", &arg, &reply)
	if err == nil || !strings.Contains(err.Error(), permissionDenied) {
		t.Fatalf("err: %v", err)
	}

	// Create an ACL with operator write permissions.
	var token string
	{
		var rules = `
operator = "write"
`

		req := structs.ACLRequest{
			Datacenter: "dc1",
			Op:         structs.ACLSet,
			ACL: structs.ACL{
				Name:  "User token",
				Type:  structs.ACLTypeClient,
				Rules: rules,
			},
			WriteRequest: structs.WriteRequest{Token: "root"},
		}
		if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &req, &token); err != nil {
			t.Fatalf("err: %v", err)
		}
	}

	// Now we can update the config
	arg.Token = token
	err = msgpackrpc.CallWithCodec(codec, "Operator.AutopilotSetConfiguration", &arg, &reply)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	// Make sure it's changed
	state := s1.fsm.State()
	_, config, err := state.AutopilotConfig()
	if err != nil {
		t.Fatal(err)
	}
	if !config.CleanupDeadServers {
		t.Fatalf("bad: %#v", config)
	}
}

func TestOperator_ServerHealth(t *testing.T) {
	conf := func(c *Config) {
		c.Datacenter = "dc1"
		c.Bootstrap = false
		c.BootstrapExpect = 3
		c.RaftConfig.ProtocolVersion = 3
		c.ServerHealthInterval = 100 * time.Millisecond
		c.AutopilotInterval = 100 * time.Millisecond
	}
	dir1, s1 := testServerWithConfig(t, conf)
	defer os.RemoveAll(dir1)
	defer s1.Shutdown()
	codec := rpcClient(t, s1)
	defer codec.Close()

	dir2, s2 := testServerWithConfig(t, conf)
	defer os.RemoveAll(dir2)
	defer s2.Shutdown()
	addr := fmt.Sprintf("127.0.0.1:%d",
		s1.config.SerfLANConfig.MemberlistConfig.BindPort)
	if _, err := s2.JoinLAN([]string{addr}); err != nil {
		t.Fatalf("err: %v", err)
	}

	dir3, s3 := testServerWithConfig(t, conf)
	defer os.RemoveAll(dir3)
	defer s3.Shutdown()
	if _, err := s3.JoinLAN([]string{addr}); err != nil {
		t.Fatalf("err: %v", err)
	}

	testutil.WaitForLeader(t, s1.RPC, "dc1")

	if err := testutil.WaitForResult(func() (bool, error) {
		arg := structs.DCSpecificRequest{
			Datacenter: "dc1",
		}
		var reply structs.OperatorHealthReply
		err := msgpackrpc.CallWithCodec(codec, "Operator.ServerHealth", &arg, &reply)
		if err != nil {
			return false, fmt.Errorf("err: %v", err)
		}
		if !reply.Healthy {
			return false, fmt.Errorf("bad: %v", reply)
		}
		if reply.FailureTolerance != 1 {
			return false, fmt.Errorf("bad: %v", reply)
		}
		if len(reply.Servers) != 3 {
			return false, fmt.Errorf("bad: %v", reply)
		}
		// Leader should have LastContact == 0, others should be positive
		for _, s := range reply.Servers {
			isLeader := s1.raft.Leader() == raft.ServerAddress(s.Address)
			if isLeader && s.LastContact != 0 {
				return false, fmt.Errorf("bad: %v", reply)
			}
			if !isLeader && s.LastContact <= 0 {
				return false, fmt.Errorf("bad: %v", reply)
			}
		}
		return true, nil
	}); err != nil {
		t.Fatal(err)
	}
}
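
// For reference: with three voting servers, quorum is floor(3/2)+1 = 2, so
// the cluster can lose one server and keep operating, which is why the test
// above expects FailureTolerance == 1.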

func TestOperator_ServerHealth_UnsupportedRaftVersion(t *testing.T) {
	dir1, s1 := testServerWithConfig(t, func(c *Config) {
		c.Datacenter = "dc1"
		c.Bootstrap = true
		c.RaftConfig.ProtocolVersion = 2
	})
	defer os.RemoveAll(dir1)
	defer s1.Shutdown()
	codec := rpcClient(t, s1)
	defer codec.Close()

	arg := structs.DCSpecificRequest{
		Datacenter: "dc1",
	}
	var reply structs.OperatorHealthReply
	err := msgpackrpc.CallWithCodec(codec, "Operator.ServerHealth", &arg, &reply)
	if err == nil || !strings.Contains(err.Error(), "raft_protocol set to 3 or higher") {
		t.Fatalf("bad: %v", err)
	}
}