Adds new consul operator endpoint, CLI, and ACL and some basic Raft commands.

This commit is contained in:
James Phillips 2016-08-29 19:09:57 -07:00
parent 53149bd2f9
commit e5850d8a26
No known key found for this signature in database
GPG Key ID: 77183E682AC5FC11
20 changed files with 1156 additions and 25 deletions

View File

@ -73,6 +73,14 @@ type ACL interface {
// KeyringWrite determines if the keyring can be manipulated
KeyringWrite() bool
// OperatorRead determines if the read-only Consul operator functions
// can be used.
OperatorRead() bool
// OperatorWrite determines if the state-changing Consul operator
// functions can be used.
OperatorWrite() bool
// ACLList checks for permission to list all the ACLs
ACLList() bool
@ -132,6 +140,14 @@ func (s *StaticACL) KeyringWrite() bool {
return s.defaultAllow
}
func (s *StaticACL) OperatorRead() bool {
return s.defaultAllow
}
func (s *StaticACL) OperatorWrite() bool {
return s.defaultAllow
}
func (s *StaticACL) ACLList() bool {
return s.allowManage
}
@ -188,10 +204,13 @@ type PolicyACL struct {
// preparedQueryRules contains the prepared query policies
preparedQueryRules *radix.Tree
// keyringRules contains the keyring policies. The keyring has
// keyringRule contains the keyring policies. The keyring has
// a very simple yes/no without prefix matching, so here we
// don't need to use a radix tree.
keyringRule string
// operatorRule contains the operator policies.
operatorRule string
}
// New is used to construct a policy based ACL from a set of policies
@ -228,6 +247,9 @@ func New(parent ACL, policy *Policy) (*PolicyACL, error) {
// Load the keyring policy
p.keyringRule = policy.Keyring
// Load the operator policy
p.operatorRule = policy.Operator
return p, nil
}
@ -422,6 +444,27 @@ func (p *PolicyACL) KeyringWrite() bool {
return p.parent.KeyringWrite()
}
// OperatorRead determines if the read-only operator functions are allowed.
func (p *PolicyACL) OperatorRead() bool {
switch p.operatorRule {
case PolicyRead, PolicyWrite:
return true
case PolicyDeny:
return false
default:
return p.parent.OperatorRead()
}
}
// OperatorWrite determines if the state-changing operator functions are
// allowed.
func (p *PolicyACL) OperatorWrite() bool {
if p.operatorRule == PolicyWrite {
return true
}
return p.parent.OperatorWrite()
}
// ACLList checks if listing of ACLs is allowed
func (p *PolicyACL) ACLList() bool {
return p.parent.ACLList()

View File

@ -65,6 +65,12 @@ func TestStaticACL(t *testing.T) {
if !all.KeyringWrite() {
t.Fatalf("should allow")
}
if !all.OperatorRead() {
t.Fatalf("should allow")
}
if !all.OperatorWrite() {
t.Fatalf("should allow")
}
if all.ACLList() {
t.Fatalf("should not allow")
}
@ -108,6 +114,12 @@ func TestStaticACL(t *testing.T) {
if none.KeyringWrite() {
t.Fatalf("should not allow")
}
if none.OperatorRead() {
t.Fatalf("should now allow")
}
if none.OperatorWrite() {
t.Fatalf("should not allow")
}
if none.ACLList() {
t.Fatalf("should not allow")
}
@ -145,6 +157,12 @@ func TestStaticACL(t *testing.T) {
if !manage.KeyringWrite() {
t.Fatalf("should allow")
}
if !manage.OperatorRead() {
t.Fatalf("should allow")
}
if !manage.OperatorWrite() {
t.Fatalf("should allow")
}
if !manage.ACLList() {
t.Fatalf("should allow")
}
@ -480,19 +498,18 @@ func TestPolicyACL_Parent(t *testing.T) {
}
func TestPolicyACL_Keyring(t *testing.T) {
// Test keyring ACLs
type keyringcase struct {
inp string
read bool
write bool
}
keyringcases := []keyringcase{
cases := []keyringcase{
{"", false, false},
{PolicyRead, true, false},
{PolicyWrite, true, true},
{PolicyDeny, false, false},
}
for _, c := range keyringcases {
for _, c := range cases {
acl, err := New(DenyAll(), &Policy{Keyring: c.inp})
if err != nil {
t.Fatalf("bad: %s", err)
@ -505,3 +522,29 @@ func TestPolicyACL_Keyring(t *testing.T) {
}
}
}
func TestPolicyACL_Operator(t *testing.T) {
type operatorcase struct {
inp string
read bool
write bool
}
cases := []operatorcase{
{"", false, false},
{PolicyRead, true, false},
{PolicyWrite, true, true},
{PolicyDeny, false, false},
}
for _, c := range cases {
acl, err := New(DenyAll(), &Policy{Operator: c.inp})
if err != nil {
t.Fatalf("bad: %s", err)
}
if acl.OperatorRead() != c.read {
t.Fatalf("bad: %#v", c)
}
if acl.OperatorWrite() != c.write {
t.Fatalf("bad: %#v", c)
}
}
}

View File

@ -21,6 +21,7 @@ type Policy struct {
Events []*EventPolicy `hcl:"event,expand"`
PreparedQueries []*PreparedQueryPolicy `hcl:"query,expand"`
Keyring string `hcl:"keyring"`
Operator string `hcl:"operator"`
}
// KeyPolicy represents a policy for a key
@ -125,5 +126,10 @@ func Parse(rules string) (*Policy, error) {
return nil, fmt.Errorf("Invalid keyring policy: %#v", p.Keyring)
}
// Validate the operator policy - this one is allowed to be empty
if p.Operator != "" && !isPolicyValid(p.Operator) {
return nil, fmt.Errorf("Invalid operator policy: %#v", p.Operator)
}
return p, nil
}

View File

@ -45,6 +45,7 @@ query "bar" {
policy = "deny"
}
keyring = "deny"
operator = "deny"
`
exp := &Policy{
Keys: []*KeyPolicy{
@ -104,6 +105,7 @@ keyring = "deny"
},
},
Keyring: PolicyDeny,
Operator: PolicyDeny,
}
out, err := Parse(inp)
@ -162,7 +164,8 @@ func TestACLPolicy_Parse_JSON(t *testing.T) {
"policy": "deny"
}
},
"keyring": "deny"
"keyring": "deny",
"operator": "deny"
}`
exp := &Policy{
Keys: []*KeyPolicy{
@ -222,6 +225,7 @@ func TestACLPolicy_Parse_JSON(t *testing.T) {
},
},
Keyring: PolicyDeny,
Operator: PolicyDeny,
}
out, err := Parse(inp)
@ -252,6 +256,24 @@ keyring = ""
}
}
func TestACLPolicy_Operator_Empty(t *testing.T) {
inp := `
operator = ""
`
exp := &Policy{
Operator: "",
}
out, err := Parse(inp)
if err != nil {
t.Fatalf("err: %v", err)
}
if !reflect.DeepEqual(out, exp) {
t.Fatalf("bad: %#v %#v", out, exp)
}
}
func TestACLPolicy_Bad_Policy(t *testing.T) {
cases := []string{
`key "" { policy = "nope" }`,
@ -259,6 +281,7 @@ func TestACLPolicy_Bad_Policy(t *testing.T) {
`event "" { policy = "nope" }`,
`query "" { policy = "nope" }`,
`keyring = "nope"`,
`operator = "nope"`,
}
for _, c := range cases {
_, err := Parse(c)

71
api/operator.go Normal file
View File

@ -0,0 +1,71 @@
package api
import (
"github.com/hashicorp/raft"
)
// Operator can be used to perform low-level operator tasks for Consul.
type Operator struct {
c *Client
}
// Operator returns a handle to the operator endpoints.
func (c *Client) Operator() *Operator {
return &Operator{c}
}
// RaftConfigration is returned when querying for the current Raft configuration.
// This has the low-level Raft structure, as well as some supplemental
// information from Consul.
type RaftConfiguration struct {
// Configuration is the low-level Raft configuration structure.
Configuration raft.Configuration
// NodeMap maps IDs in the Raft configuration to node names known by
// Consul. It's possible that not all configuration entries may have
// an entry here if the node isn't known to Consul. Given how this is
// generated, this may also contain entries that aren't present in the
// Raft configuration.
NodeMap map[raft.ServerID]string
// Leader is the ID of the current Raft leader. This may be blank if
// there isn't one.
Leader raft.ServerID
}
// RaftGetConfiguration is used to query the current Raft peer set.
func (op *Operator) RaftGetConfiguration(q *QueryOptions) (*RaftConfiguration, error) {
r := op.c.newRequest("GET", "/v1/operator/raft/configuration")
r.setQueryOptions(q)
_, resp, err := requireOK(op.c.doRequest(r))
if err != nil {
return nil, err
}
defer resp.Body.Close()
var out RaftConfiguration
if err := decodeBody(resp, &out); err != nil {
return nil, err
}
return &out, nil
}
// RaftRemovePeerByAddress is used to kick a stale peer (one that it in the Raft
// quorum but no longer known to Serf or the catalog) by address in the form of
// "IP:port".
func (op *Operator) RaftRemovePeerByAddress(address raft.ServerAddress, q *WriteOptions) error {
r := op.c.newRequest("DELETE", "/v1/operator/raft/peer")
r.setWriteOptions(q)
// TODO (slackpad) Currently we made address a query parameter. Once
// IDs are in place this will be DELETE /v1/raft-peer/<id>.
r.params.Set("address", string(address))
_, resp, err := requireOK(op.c.doRequest(r))
if err != nil {
return err
}
resp.Body.Close()
return nil
}

38
api/operator_test.go Normal file
View File

@ -0,0 +1,38 @@
package api
import (
"strings"
"testing"
)
func TestOperator_RaftGetConfiguration(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()
operator := c.Operator()
out, err := operator.RaftGetConfiguration(nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if len(out.Configuration.Servers) != 1 ||
len(out.NodeMap) != 1 ||
len(out.Leader) == 0 {
t.Fatalf("bad: %v", out)
}
}
func TestOperator_RaftRemovePeerByAddress(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()
// If we get this error, it proves we sent the address all the way
// through.
operator := c.Operator()
err := operator.RaftRemovePeerByAddress("nope", nil)
if err == nil || !strings.Contains(err.Error(),
"address \"nope\" was not found in the Raft configuration") {
t.Fatalf("err: %v", err)
}
}

View File

@ -230,6 +230,9 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) {
s.handleFuncMetrics("/v1/status/leader", s.wrap(s.StatusLeader))
s.handleFuncMetrics("/v1/status/peers", s.wrap(s.StatusPeers))
s.handleFuncMetrics("/v1/operator/raft/configuration", s.wrap(s.OperatorRaftConfiguration))
s.handleFuncMetrics("/v1/operator/raft/peer", s.wrap(s.OperatorRaftPeer))
s.handleFuncMetrics("/v1/catalog/register", s.wrap(s.CatalogRegister))
s.handleFuncMetrics("/v1/catalog/deregister", s.wrap(s.CatalogDeregister))
s.handleFuncMetrics("/v1/catalog/datacenters", s.wrap(s.CatalogDatacenters))

View File

@ -0,0 +1,57 @@
package agent
import (
"net/http"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/raft"
)
// OperatorRaftConfiguration is used to inspect the current Raft configuration.
// This supports the stale query mode in case the cluster doesn't have a leader.
func (s *HTTPServer) OperatorRaftConfiguration(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
if req.Method != "GET" {
resp.WriteHeader(http.StatusMethodNotAllowed)
return nil, nil
}
var args structs.DCSpecificRequest
if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done {
return nil, nil
}
var reply structs.RaftConfigurationResponse
if err := s.agent.RPC("Operator.RaftGetConfiguration", &args, &reply); err != nil {
return nil, err
}
return reply, nil
}
// OperatorRaftPeer supports actions on Raft peers. Currently we only support
// removing peers by address.
func (s *HTTPServer) OperatorRaftPeer(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
if req.Method != "DELETE" {
resp.WriteHeader(http.StatusMethodNotAllowed)
return nil, nil
}
var args structs.RaftPeerByAddressRequest
s.parseDC(req, &args.Datacenter)
s.parseToken(req, &args.Token)
params := req.URL.Query()
if _, ok := params["address"]; ok {
args.Address = raft.ServerAddress(params.Get("address"))
} else {
resp.WriteHeader(http.StatusBadRequest)
resp.Write([]byte("Must specify ?address with IP:port of peer to remove"))
return nil, nil
}
var reply struct{}
if err := s.agent.RPC("Operator.RaftRemovePeerByAddress", &args, &reply); err != nil {
return nil, err
}
return nil, nil
}

View File

@ -0,0 +1,58 @@
package agent
import (
"bytes"
"net/http"
"net/http/httptest"
"strings"
"testing"
"github.com/hashicorp/consul/consul/structs"
)
func TestOperator_OperatorRaftConfiguration(t *testing.T) {
httpTest(t, func(srv *HTTPServer) {
body := bytes.NewBuffer(nil)
req, err := http.NewRequest("GET", "/v1/operator/raft/configuration", body)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := httptest.NewRecorder()
obj, err := srv.OperatorRaftConfiguration(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
if resp.Code != 200 {
t.Fatalf("bad code: %d", resp.Code)
}
out, ok := obj.(structs.RaftConfigurationResponse)
if !ok {
t.Fatalf("unexpected: %T", obj)
}
if len(out.Configuration.Servers) != 1 ||
len(out.NodeMap) != 1 ||
len(out.Leader) == 0 {
t.Fatalf("bad: %v", out)
}
})
}
func TestOperator_OperatorRaftPeer(t *testing.T) {
httpTest(t, func(srv *HTTPServer) {
body := bytes.NewBuffer(nil)
req, err := http.NewRequest("DELETE", "/v1/operator/raft/peer?address=nope", body)
if err != nil {
t.Fatalf("err: %v", err)
}
// If we get this error, it proves we sent the address all the
// way through.
resp := httptest.NewRecorder()
_, err = srv.OperatorRaftPeer(resp, req)
if err == nil || !strings.Contains(err.Error(),
"address \"nope\" was not found in the Raft configuration") {
t.Fatalf("err: %v", err)
}
})
}

180
command/operator.go Normal file
View File

@ -0,0 +1,180 @@
package command
import (
"flag"
"fmt"
"strings"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/raft"
"github.com/mitchellh/cli"
"github.com/ryanuber/columnize"
)
// OperatorCommand is used to provide various low-level tools for Consul
// operators.
type OperatorCommand struct {
Ui cli.Ui
}
func (c *OperatorCommand) Help() string {
helpText := `
Usage: consul operator <subcommand> [common options] [action] [options]
Provides cluster-level tools for Consul operators, such as interacting with
the Raft subsystem. NOTE: Use this command with extreme caution, as improper
use could lead to a Consul outage and even loss of data.
If ACLs are enabled then a token with operator privileges may required in
order to use this command. Requests are forwarded internally to the leader
if required, so this can be run from any Consul node in a cluster.
Run consul operator <subcommand> with no arguments for help on that
subcommand.
Common Options:
-http-addr=127.0.0.1:8500 HTTP address of the Consul agent.
-token="" ACL token to use. Defaults to that of agent.
Subcommands:
raft View and modify Consul's Raft configuration.
`
return strings.TrimSpace(helpText)
}
func (c *OperatorCommand) Run(args []string) int {
if len(args) < 1 {
c.Ui.Error("A subcommand must be specified")
c.Ui.Error("")
c.Ui.Error(c.Help())
return 1
}
var err error
subcommand := args[0]
switch subcommand {
case "raft":
err = c.raft(args[1:])
default:
err = fmt.Errorf("unknown subcommand %q", subcommand)
}
if err != nil {
c.Ui.Error(fmt.Sprintf("Operator %q subcommand failed: %v", subcommand, err))
return 1
}
return 0
}
// Synopsis returns a one-line description of this command.
func (c *OperatorCommand) Synopsis() string {
return "Provides cluster-level tools for Consul operators"
}
const raftHelp = `
Raft Subcommand Actions:
raft -list-peers -stale=[true|false]
Displays the current Raft peer configuration.
The -stale argument defaults to "false" which means the leader provides the
result. If the cluster is in an outage state without a leader, you may need
to set -stale to "true" to get the configuration from a non-leader server.
raft -remove-peer -address="IP:port"
Removes Consul server with given -address from the Raft configuration.
There are rare cases where a peer may be left behind in the Raft quorum even
though the server is no longer present and known to the cluster. This
command can be used to remove the failed server so that it is no longer
affects the Raft quorum. If the server still shows in the output of the
"consul members" command, it is preferable to clean up by simply running
"consul force-leave" instead of this command.
`
// raft handles the raft subcommands.
func (c *OperatorCommand) raft(args []string) error {
cmdFlags := flag.NewFlagSet("raft", flag.ContinueOnError)
cmdFlags.Usage = func() { c.Ui.Output(c.Help()) }
// Parse verb arguments.
var listPeers, removePeer bool
cmdFlags.BoolVar(&listPeers, "list-peers", false, "")
cmdFlags.BoolVar(&removePeer, "remove-peer", false, "")
// Parse other arguments.
var stale bool
var address, token string
cmdFlags.StringVar(&address, "address", "", "")
cmdFlags.BoolVar(&stale, "stale", false, "")
cmdFlags.StringVar(&token, "token", "", "")
httpAddr := HTTPAddrFlag(cmdFlags)
if err := cmdFlags.Parse(args); err != nil {
return err
}
// Set up a client.
conf := api.DefaultConfig()
conf.Address = *httpAddr
client, err := api.NewClient(conf)
if err != nil {
return fmt.Errorf("error connecting to Consul agent: %s", err)
}
operator := client.Operator()
// Dispatch based on the verb argument.
if listPeers {
// Fetch the current configuration.
q := &api.QueryOptions{
AllowStale: stale,
Token: token,
}
reply, err := operator.RaftGetConfiguration(q)
if err != nil {
return err
}
// Format it as a nice table.
result := []string{"Node|ID|Address|State|Voter"}
for _, s := range reply.Configuration.Servers {
node := "(unknown)"
if mappedNode, ok := reply.NodeMap[s.ID]; ok {
node = mappedNode
}
state := "follower"
if s.ID == reply.Leader {
state = "leader"
}
voter := s.Suffrage == raft.Voter
result = append(result, fmt.Sprintf("%s|%s|%s|%s|%v",
node, s.ID, s.Address, state, voter))
}
c.Ui.Output(columnize.SimpleFormat(result))
} else if removePeer {
// TODO (slackpad) Once we expose IDs, add support for removing
// by ID, add support for that.
if len(address) == 0 {
return fmt.Errorf("an address is required for the peer to remove")
}
// Try to kick the peer.
w := &api.WriteOptions{
Token: token,
}
sa := raft.ServerAddress(address)
if err := operator.RaftRemovePeerByAddress(sa, w); err != nil {
return err
}
c.Ui.Output(fmt.Sprintf("Removed peer with address %q", address))
} else {
c.Ui.Output(c.Help())
c.Ui.Output("")
c.Ui.Output(strings.TrimSpace(raftHelp))
}
return nil
}

52
command/operator_test.go Normal file
View File

@ -0,0 +1,52 @@
package command
import (
"strings"
"testing"
"github.com/mitchellh/cli"
)
func TestOperator_Implements(t *testing.T) {
var _ cli.Command = &OperatorCommand{}
}
func TestOperator_Raft_ListPeers(t *testing.T) {
a1 := testAgent(t)
defer a1.Shutdown()
waitForLeader(t, a1.httpAddr)
ui := new(cli.MockUi)
c := &OperatorCommand{Ui: ui}
args := []string{"raft", "-http-addr=" + a1.httpAddr, "-list-peers"}
code := c.Run(args)
if code != 0 {
t.Fatalf("bad: %d. %#v", code, ui.ErrorWriter.String())
}
output := strings.TrimSpace(ui.OutputWriter.String())
if !strings.Contains(output, "leader") {
t.Fatalf("bad: %s", output)
}
}
func TestOperator_Raft_RemovePeer(t *testing.T) {
a1 := testAgent(t)
defer a1.Shutdown()
waitForLeader(t, a1.httpAddr)
ui := new(cli.MockUi)
c := &OperatorCommand{Ui: ui}
args := []string{"raft", "-http-addr=" + a1.httpAddr, "-remove-peer", "-address=nope"}
code := c.Run(args)
if code != 1 {
t.Fatalf("bad: %d. %#v", code, ui.ErrorWriter.String())
}
// If we get this error, it proves we sent the address all they through.
output := strings.TrimSpace(ui.ErrorWriter.String())
if !strings.Contains(output, "address \"nope\" was not found in the Raft configuration") {
t.Fatalf("bad: %s", output)
}
}

View File

@ -103,6 +103,12 @@ func init() {
}, nil
},
"operator": func() (cli.Command, error) {
return &command.OperatorCommand{
Ui: ui,
}, nil
},
"info": func() (cli.Command, error) {
return &command.InfoCommand{
Ui: ui,

120
consul/operator_endpoint.go Normal file
View File

@ -0,0 +1,120 @@
package consul
import (
"fmt"
"net"
"github.com/hashicorp/consul/consul/agent"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/raft"
)
// Operator endpoint is used to perform low-level operator tasks for Consul.
type Operator struct {
srv *Server
}
// RaftGetConfiguration is used to retrieve the current Raft configuration.
func (op *Operator) RaftGetConfiguration(args *structs.DCSpecificRequest, reply *structs.RaftConfigurationResponse) error {
if done, err := op.srv.forward("Operator.RaftGetConfiguration", args, args, reply); done {
return err
}
// This action requires operator read access.
acl, err := op.srv.resolveToken(args.Token)
if err != nil {
return err
}
if acl != nil && !acl.OperatorRead() {
return permissionDeniedErr
}
// We can't fetch the leader and the configuration atomically with
// the current Raft API.
future := op.srv.raft.GetConfiguration()
if err := future.Error(); err != nil {
return err
}
reply.Configuration = future.Configuration()
leader := op.srv.raft.Leader()
// Index the configuration so we can easily look up IDs by address.
idMap := make(map[raft.ServerAddress]raft.ServerID)
for _, s := range reply.Configuration.Servers {
idMap[s.Address] = s.ID
}
// Fill out the node map and leader.
reply.NodeMap = make(map[raft.ServerID]string)
members := op.srv.serfLAN.Members()
for _, member := range members {
valid, parts := agent.IsConsulServer(member)
if !valid {
continue
}
// TODO (slackpad) We need to add a Raft API to get the leader by
// ID so we don't have to do this mapping.
addr := (&net.TCPAddr{IP: member.Addr, Port: parts.Port}).String()
if id, ok := idMap[raft.ServerAddress(addr)]; ok {
reply.NodeMap[id] = member.Name
if leader == raft.ServerAddress(addr) {
reply.Leader = id
}
}
}
return nil
}
// RaftRemovePeerByAddress is used to kick a stale peer (one that it in the Raft
// quorum but no longer known to Serf or the catalog) by address in the form of
// "IP:port". The reply argument is not used, but it required to fulfill the RPC
// interface.
func (op *Operator) RaftRemovePeerByAddress(args *structs.RaftPeerByAddressRequest, reply *struct{}) error {
if done, err := op.srv.forward("Operator.RaftRemovePeerByAddress", args, args, reply); done {
return err
}
// This is a super dangerous operation that requires operator write
// access.
acl, err := op.srv.resolveToken(args.Token)
if err != nil {
return err
}
if acl != nil && !acl.OperatorWrite() {
return permissionDeniedErr
}
// Since this is an operation designed for humans to use, we will return
// an error if the supplied address isn't among the peers since it's
// likely they screwed up.
{
future := op.srv.raft.GetConfiguration()
if err := future.Error(); err != nil {
return err
}
for _, s := range future.Configuration().Servers {
if s.Address == args.Address {
goto REMOVE
}
}
return fmt.Errorf("address %q was not found in the Raft configuration",
args.Address)
}
REMOVE:
// The Raft library itself will prevent various forms of foot-shooting,
// like making a configuration with no voters. Some consideration was
// given here to adding more checks, but it was decided to make this as
// low-level and direct as possible. We've got ACL coverage to lock this
// down, and if you are an operator, it's assumed you know what you are
// doing if you are calling this. If you remove a peer that's known to
// Serf, for example, it will come back when the leader does a reconcile
// pass.
future := op.srv.raft.RemovePeer(args.Address)
if err := future.Error(); err != nil {
return err
}
return nil
}

View File

@ -0,0 +1,229 @@
package consul
import (
"fmt"
"os"
"reflect"
"strings"
"testing"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/testutil"
"github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/raft"
)
func TestOperator_RaftGetConfiguration(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
testutil.WaitForLeader(t, s1.RPC, "dc1")
arg := structs.DCSpecificRequest{
Datacenter: "dc1",
}
var reply structs.RaftConfigurationResponse
if err := msgpackrpc.CallWithCodec(codec, "Operator.RaftGetConfiguration", &arg, &reply); err != nil {
t.Fatalf("err: %v", err)
}
future := s1.raft.GetConfiguration()
if err := future.Error(); err != nil {
t.Fatalf("err: %v", err)
}
expected := structs.RaftConfigurationResponse{
Configuration: future.Configuration(),
NodeMap: map[raft.ServerID]string{
raft.ServerID(s1.config.RPCAddr.String()): s1.config.NodeName,
},
Leader: raft.ServerID(s1.config.RPCAddr.String()),
}
if !reflect.DeepEqual(reply, expected) {
t.Fatalf("bad: %v", reply)
}
}
func TestOperator_RaftGetConfiguration_ACLDeny(t *testing.T) {
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.ACLDatacenter = "dc1"
c.ACLMasterToken = "root"
c.ACLDefaultPolicy = "deny"
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
testutil.WaitForLeader(t, s1.RPC, "dc1")
// Make a request with no token to make sure it gets denied.
arg := structs.DCSpecificRequest{
Datacenter: "dc1",
}
var reply structs.RaftConfigurationResponse
err := msgpackrpc.CallWithCodec(codec, "Operator.RaftGetConfiguration", &arg, &reply)
if err == nil || !strings.Contains(err.Error(), permissionDenied) {
t.Fatalf("err: %v", err)
}
// Create an ACL with operator read permissions.
var token string
{
var rules = `
operator = "read"
`
req := structs.ACLRequest{
Datacenter: "dc1",
Op: structs.ACLSet,
ACL: structs.ACL{
Name: "User token",
Type: structs.ACLTypeClient,
Rules: rules,
},
WriteRequest: structs.WriteRequest{Token: "root"},
}
if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &req, &token); err != nil {
t.Fatalf("err: %v", err)
}
}
// Now it should go through.
arg.Token = token
if err := msgpackrpc.CallWithCodec(codec, "Operator.RaftGetConfiguration", &arg, &reply); err != nil {
t.Fatalf("err: %v", err)
}
future := s1.raft.GetConfiguration()
if err := future.Error(); err != nil {
t.Fatalf("err: %v", err)
}
expected := structs.RaftConfigurationResponse{
Configuration: future.Configuration(),
NodeMap: map[raft.ServerID]string{
raft.ServerID(s1.config.RPCAddr.String()): s1.config.NodeName,
},
Leader: raft.ServerID(s1.config.RPCAddr.String()),
}
if !reflect.DeepEqual(reply, expected) {
t.Fatalf("bad: %v", reply)
}
}
func TestOperator_RaftRemovePeerByAddress(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
testutil.WaitForLeader(t, s1.RPC, "dc1")
// Try to remove a peer that's not there.
arg := structs.RaftPeerByAddressRequest{
Datacenter: "dc1",
Address: raft.ServerAddress(fmt.Sprintf("127.0.0.1:%d", getPort())),
}
var reply struct{}
err := msgpackrpc.CallWithCodec(codec, "Operator.RaftRemovePeerByAddress", &arg, &reply)
if err == nil || !strings.Contains(err.Error(), "not found in the Raft configuration") {
t.Fatalf("err: %v", err)
}
// Add it manually to Raft.
{
future := s1.raft.AddPeer(arg.Address)
if err := future.Error(); err != nil {
t.Fatalf("err: %v", err)
}
}
// Make sure it's there.
{
future := s1.raft.GetConfiguration()
if err := future.Error(); err != nil {
t.Fatalf("err: %v", err)
}
configuration := future.Configuration()
if len(configuration.Servers) != 2 {
t.Fatalf("bad: %v", configuration)
}
}
// Remove it, now it should go through.
if err := msgpackrpc.CallWithCodec(codec, "Operator.RaftRemovePeerByAddress", &arg, &reply); err != nil {
t.Fatalf("err: %v", err)
}
// Make sure it's not there.
{
future := s1.raft.GetConfiguration()
if err := future.Error(); err != nil {
t.Fatalf("err: %v", err)
}
configuration := future.Configuration()
if len(configuration.Servers) != 1 {
t.Fatalf("bad: %v", configuration)
}
}
}
func TestOperator_RaftRemovePeerByAddress_ACLDeny(t *testing.T) {
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.ACLDatacenter = "dc1"
c.ACLMasterToken = "root"
c.ACLDefaultPolicy = "deny"
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
testutil.WaitForLeader(t, s1.RPC, "dc1")
// Make a request with no token to make sure it gets denied.
arg := structs.RaftPeerByAddressRequest{
Datacenter: "dc1",
Address: raft.ServerAddress(s1.config.RPCAddr.String()),
}
var reply struct{}
err := msgpackrpc.CallWithCodec(codec, "Operator.RaftRemovePeerByAddress", &arg, &reply)
if err == nil || !strings.Contains(err.Error(), permissionDenied) {
t.Fatalf("err: %v", err)
}
// Create an ACL with operator write permissions.
var token string
{
var rules = `
operator = "write"
`
req := structs.ACLRequest{
Datacenter: "dc1",
Op: structs.ACLSet,
ACL: structs.ACL{
Name: "User token",
Type: structs.ACLTypeClient,
Rules: rules,
},
WriteRequest: structs.WriteRequest{Token: "root"},
}
if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &req, &token); err != nil {
t.Fatalf("err: %v", err)
}
}
// Now it should kick back for being an invalid config, which means it
// tried to do the operation.
arg.Token = token
err = msgpackrpc.CallWithCodec(codec, "Operator.RaftRemovePeerByAddress", &arg, &reply)
if err == nil || !strings.Contains(err.Error(), "at least one voter") {
t.Fatalf("err: %v", err)
}
}

View File

@ -162,15 +162,16 @@ type Server struct {
// Holds the RPC endpoints
type endpoints struct {
Catalog *Catalog
Health *Health
Status *Status
KVS *KVS
Session *Session
Internal *Internal
ACL *ACL
Catalog *Catalog
Coordinate *Coordinate
Health *Health
Internal *Internal
KVS *KVS
Operator *Operator
PreparedQuery *PreparedQuery
Session *Session
Status *Status
Txn *Txn
}
@ -496,27 +497,29 @@ func (s *Server) setupRaft() error {
// setupRPC is used to setup the RPC listener
func (s *Server) setupRPC(tlsWrap tlsutil.DCWrapper) error {
// Create endpoints
s.endpoints.Status = &Status{s}
s.endpoints.Catalog = &Catalog{s}
s.endpoints.Health = &Health{s}
s.endpoints.KVS = &KVS{s}
s.endpoints.Session = &Session{s}
s.endpoints.Internal = &Internal{s}
s.endpoints.ACL = &ACL{s}
s.endpoints.Catalog = &Catalog{s}
s.endpoints.Coordinate = NewCoordinate(s)
s.endpoints.Health = &Health{s}
s.endpoints.Internal = &Internal{s}
s.endpoints.KVS = &KVS{s}
s.endpoints.Operator = &Operator{s}
s.endpoints.PreparedQuery = &PreparedQuery{s}
s.endpoints.Session = &Session{s}
s.endpoints.Status = &Status{s}
s.endpoints.Txn = &Txn{s}
// Register the handlers
s.rpcServer.Register(s.endpoints.Status)
s.rpcServer.Register(s.endpoints.Catalog)
s.rpcServer.Register(s.endpoints.Health)
s.rpcServer.Register(s.endpoints.KVS)
s.rpcServer.Register(s.endpoints.Session)
s.rpcServer.Register(s.endpoints.Internal)
s.rpcServer.Register(s.endpoints.ACL)
s.rpcServer.Register(s.endpoints.Catalog)
s.rpcServer.Register(s.endpoints.Coordinate)
s.rpcServer.Register(s.endpoints.Health)
s.rpcServer.Register(s.endpoints.Internal)
s.rpcServer.Register(s.endpoints.KVS)
s.rpcServer.Register(s.endpoints.Operator)
s.rpcServer.Register(s.endpoints.PreparedQuery)
s.rpcServer.Register(s.endpoints.Session)
s.rpcServer.Register(s.endpoints.Status)
s.rpcServer.Register(s.endpoints.Txn)
list, err := net.ListenTCP("tcp", s.config.RPCAddr)

View File

@ -0,0 +1,42 @@
package structs
import (
"github.com/hashicorp/raft"
)
// RaftConfigrationResponse is returned when querying for the current Raft
// configuration. This has the low-level Raft structure, as well as some
// supplemental information from Consul.
type RaftConfigurationResponse struct {
// Configuration is the low-level Raft configuration structure.
Configuration raft.Configuration
// NodeMap maps IDs in the Raft configuration to node names known by
// Consul. It's possible that not all configuration entries may have
// an entry here if the node isn't known to Consul. Given how this is
// generated, this may also contain entries that aren't present in the
// Raft configuration.
NodeMap map[raft.ServerID]string
// Leader is the ID of the current Raft leader. This may be blank if
// there isn't one.
Leader raft.ServerID
}
// RaftPeerByAddressRequest is used by the Operator endpoint to apply a Raft
// operation on a specific Raft peer by address in the form of "IP:port".
type RaftPeerByAddressRequest struct {
// Datacenter is the target this request is intended for.
Datacenter string
// Address is the peer to remove, in the form "IP:port".
Address raft.ServerAddress
// WriteRequest holds the ACL token to go along with this request.
WriteRequest
}
// RequestDatacenter returns the datacenter for a given request.
func (op *RaftPeerByAddressRequest) RequestDatacenter() string {
return op.Datacenter
}

View File

@ -0,0 +1,48 @@
---
layout: "docs"
page_title: "Operator (HTTP)"
sidebar_current: "docs-agent-http-operator"
description: >
The operator endpoint provides cluster-level tools for Consul operators.
---
# Operator HTTP Endpoint
The Operator endpoints provide cluster-level tools for Consul operators, such
as interacting with the Raft subsystem. This was added in Consul 0.7.
~> Use this interface with extreme caution, as improper use could lead to a Consul
outage and even loss of data.
If ACLs are enabled then a token with operator privileges may required in
order to use this interface. See the [ACL](/docs/internals/acl.html#operator)
internals guide for more information.
See the [Outage Recovery](/docs/guides/outage.html) guide for some examples of how
these capabilities are used. For a CLI to perform these operations manually, please
see the documentation for the [`consul operator`](/docs/commands/operator.html)
command.
The following endpoints are supported:
* [`/v1/operator/raft/configuration`](#raft-configuration): Inspects the Raft configuration
* [`/v1/operator/raft/peer`](#raft-peer): Operates on Raft peers
Not all endpoints support blocking queries and all consistency modes,
see details in the sections below.
The operator endpoints support the use of ACL Tokens. See the
[ACL](/docs/internals/acl.html#operator) internals guide for more information.
### <a name="raft-configuration"></a> /v1/operator/raft/configuration
The Raft configuration endpoint supports the `GET` method.
#### GET Method
### <a name="raft-peer"></a> /v1/operator/raft/peer
The Raft peer endpoint supports the `DELETE` method.
#### DELETE Method

View File

@ -38,6 +38,7 @@ Available commands are:
lock Execute a command holding a lock
members Lists the members of a Consul cluster
monitor Stream logs from a Consul agent
operator Provides cluster-level tools for Consul operators
reload Triggers the agent to reload configuration files
rtt Estimates network round trip time between nodes
version Prints the Consul version

View File

@ -0,0 +1,100 @@
---
layout: "docs"
page_title: "Commands: Operator"
sidebar_current: "docs-commands-operator"
description: >
The operator command provides cluster-level tools for Consul operators.
---
# Consul Operator
Command: `consul operator`
The `operator` command provides cluster-level tools for Consul operators, such
as interacting with the Raft subsystem. This was added in Consul 0.7.
~> Use this command with extreme caution, as improper use could lead to a Consul
outage and even loss of data.
If ACLs are enabled then a token with operator privileges may required in
order to use this command. Requests are forwarded internally to the leader
if required, so this can be run from any Consul node in a cluster. See the
[ACL](/docs/internals/acl.html#operator) internals guide for more information.
See the [Outage Recovery](/docs/guides/outage.html) guide for some examples of how
this command is used. For an API to perform these operations programatically,
please see the documentation for the [Operator](/docs/agent/http/operator.html)
endpoint.
## Usage
Usage: `consul operator <subcommand> [common options] [action] [options]`
Run `consul operator <subcommand>` with no arguments for help on that
subcommand. The following subcommands are available:
* `raft` - View and modify Consul's Raft configuration.
Options common to all subcommands include:
* `-http-addr` - Address to the HTTP server of the agent you want to contact
to send this command. If this isn't specified, the command will contact
"127.0.0.1:8500" which is the default HTTP address of a Consul agent.
* `-token` - ACL token to use. Defaults to that of agent.
## Raft Operations
The `raft` subcommand is used to view and modify Consul's Raft configuration.
Two actions are available, as detailed in this section.
<a name="raft-list-peers"></a>
`raft -list-peers -stale=[true|false]`
This action displays the current Raft peer configuration.
The `-stale` argument defaults to "false" which means the leader provides the
result. If the cluster is in an outage state without a leader, you may need
to set `-stale` to "true" to get the configuration from a non-leader server.
The output looks like this:
```
Node ID Address State Voter
alice 127.0.0.1:8300 127.0.0.1:8300 follower true
bob 127.0.0.2:8300 127.0.0.2:8300 leader true
carol 127.0.0.3:8300 127.0.0.3:8300 follower true
```
* `Node` is the node name of the server, as known to Consul, or "(unknown)" if
the node is stale at not known.
* `ID` is the ID of the server. This is the same as the `Address` in Consul 0.7
but may be upgraded to a GUID in a future version of Consul.
* `Address` is the IP:port for the server.
* `State` is either "follower" or "leader" depending on the server's role in the
Raft configuration.
* `Voter` is "true" or "false", indicating if the server has a vote in the Raft
configuration. Future versions of Consul may add support for non-voting
servers.
<a name="raft-remove-peer"></a>
`raft -remove-peer -address="IP:port"`
This command removes Consul server with given -address from the Raft
configuration.
The `-address` argument is required and is the "IP:port" for the server to
remove. The port number is usually 8300, unless configured otherwise.
There are rare cases where a peer may be left behind in the Raft quorum even
though the server is no longer present and known to the cluster. This command
can be used to remove the failed server so that it is no longer affects the
Raft quorum. If the server still shows in the output of the
[`consul members`](/docs/commands/members.html) command, it is preferable to
clean up by simply running
[`consul force-leave`](http://localhost:4567/docs/commands/force-leave.html)
instead of this command.

View File

@ -118,6 +118,10 @@
<a href="/docs/commands/monitor.html">monitor</a>
</li>
<li<%= sidebar_current("docs-commands-operator") %>>
<a href="/docs/commands/operator.html">operator</a>
</li>
<li<%= sidebar_current("docs-commands-info") %>>
<a href="/docs/commands/info.html">info</a>
</li>
@ -178,6 +182,10 @@
<a href="/docs/agent/http/coordinate.html">Network Coordinates</a>
</li>
<li<%= sidebar_current("docs-agent-http-operator") %>>
<a href="/docs/agent/http/operator.html">Operator </a>
</li>
<li<%= sidebar_current("docs-agent-http-query") %>>
<a href="/docs/agent/http/query.html">Prepared Queries</a>
</li>