mirror of https://github.com/status-im/consul.git
Add CLI/API endpoints for removing peer by ID
This commit is contained in:
parent
73f0e6f8f6
commit
da9c825592
|
@ -227,8 +227,6 @@ func (op *Operator) RaftRemovePeerByAddress(address string, q *WriteOptions) err
|
|||
r := op.c.newRequest("DELETE", "/v1/operator/raft/peer")
|
||||
r.setWriteOptions(q)
|
||||
|
||||
// TODO (slackpad) Currently we made address a query parameter. Once
|
||||
// IDs are in place this will be DELETE /v1/operator/raft/peer/<id>.
|
||||
r.params.Set("address", string(address))
|
||||
|
||||
_, resp, err := requireOK(op.c.doRequest(r))
|
||||
|
@ -240,6 +238,23 @@ func (op *Operator) RaftRemovePeerByAddress(address string, q *WriteOptions) err
|
|||
return nil
|
||||
}
|
||||
|
||||
// RaftRemovePeerByID is used to kick a stale peer (one that it in the Raft
|
||||
// quorum but no longer known to Serf or the catalog) by ID.
|
||||
func (op *Operator) RaftRemovePeerByID(id string, q *WriteOptions) error {
|
||||
r := op.c.newRequest("DELETE", "/v1/operator/raft/peer")
|
||||
r.setWriteOptions(q)
|
||||
|
||||
r.params.Set("id", string(id))
|
||||
|
||||
_, resp, err := requireOK(op.c.doRequest(r))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
resp.Body.Close()
|
||||
return nil
|
||||
}
|
||||
|
||||
// KeyringInstall is used to install a new gossip encryption key into the cluster
|
||||
func (op *Operator) KeyringInstall(key string, q *WriteOptions) error {
|
||||
r := op.c.newRequest("POST", "/v1/operator/keyring")
|
||||
|
|
|
@ -42,23 +42,40 @@ func (s *HTTPServer) OperatorRaftPeer(resp http.ResponseWriter, req *http.Reques
|
|||
return nil, nil
|
||||
}
|
||||
|
||||
var args structs.RaftPeerByAddressRequest
|
||||
var args structs.RaftRemovePeerRequest
|
||||
s.parseDC(req, &args.Datacenter)
|
||||
s.parseToken(req, &args.Token)
|
||||
|
||||
params := req.URL.Query()
|
||||
if _, ok := params["address"]; ok {
|
||||
_, hasID := params["id"]
|
||||
if hasID {
|
||||
args.ID = raft.ServerID(params.Get("id"))
|
||||
}
|
||||
_, hasAddress := params["address"]
|
||||
if hasAddress {
|
||||
args.Address = raft.ServerAddress(params.Get("address"))
|
||||
} else {
|
||||
}
|
||||
|
||||
if !hasID && !hasAddress {
|
||||
resp.WriteHeader(http.StatusBadRequest)
|
||||
resp.Write([]byte("Must specify ?address with IP:port of peer to remove"))
|
||||
resp.Write([]byte("Must specify either ?id with the server's ID or ?address with IP:port of peer to remove"))
|
||||
return nil, nil
|
||||
}
|
||||
if hasID && hasAddress {
|
||||
resp.WriteHeader(http.StatusBadRequest)
|
||||
resp.Write([]byte("Must specify only one of ?id or ?address"))
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
var reply struct{}
|
||||
if err := s.agent.RPC("Operator.RaftRemovePeerByAddress", &args, &reply); err != nil {
|
||||
method := "Operator.RaftRemovePeerByID"
|
||||
if hasAddress {
|
||||
method = "Operator.RaftRemovePeerByAddress"
|
||||
}
|
||||
if err := s.agent.RPC(method, &args, &reply); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -59,6 +59,23 @@ func TestOperator_RaftPeer(t *testing.T) {
|
|||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
})
|
||||
|
||||
httpTest(t, func(srv *HTTPServer) {
|
||||
body := bytes.NewBuffer(nil)
|
||||
req, err := http.NewRequest("DELETE", "/v1/operator/raft/peer?id=nope", body)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// If we get this error, it proves we sent the ID all the
|
||||
// way through.
|
||||
resp := httptest.NewRecorder()
|
||||
_, err = srv.OperatorRaftPeer(resp, req)
|
||||
if err == nil || !strings.Contains(err.Error(),
|
||||
"id \"nope\" was not found in the Raft configuration") {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestOperator_KeyringInstall(t *testing.T) {
|
||||
|
|
|
@ -87,7 +87,7 @@ func (c *OperatorRaftCommand) raft(args []string) error {
|
|||
}
|
||||
c.Ui.Output(result)
|
||||
} else if removePeer {
|
||||
if err := raftRemovePeers(address, operator); err != nil {
|
||||
if err := raftRemovePeers(address, "", operator); err != nil {
|
||||
return fmt.Errorf("Error removing peer: %v", err)
|
||||
}
|
||||
c.Ui.Output(fmt.Sprintf("Removed peer with address %q", address))
|
||||
|
|
|
@ -38,9 +38,11 @@ func (c *OperatorRaftRemoveCommand) Synopsis() string {
|
|||
func (c *OperatorRaftRemoveCommand) Run(args []string) int {
|
||||
f := c.Command.NewFlagSet(c)
|
||||
|
||||
var address string
|
||||
var address, id string
|
||||
f.StringVar(&address, "address", "",
|
||||
"The address to remove from the Raft configuration.")
|
||||
f.StringVar(&id, "id", "",
|
||||
"The ID to remove from the Raft configuration.")
|
||||
|
||||
if err := c.Command.Parse(args); err != nil {
|
||||
if err == flag.ErrHelp {
|
||||
|
@ -58,25 +60,36 @@ func (c *OperatorRaftRemoveCommand) Run(args []string) int {
|
|||
}
|
||||
|
||||
// Fetch the current configuration.
|
||||
if err := raftRemovePeers(address, client.Operator()); err != nil {
|
||||
if err := raftRemovePeers(address, id, client.Operator()); err != nil {
|
||||
c.Ui.Error(fmt.Sprintf("Error removing peer: %v", err))
|
||||
return 1
|
||||
}
|
||||
c.Ui.Output(fmt.Sprintf("Removed peer with address %q", address))
|
||||
if address != "" {
|
||||
c.Ui.Output(fmt.Sprintf("Removed peer with address %q", address))
|
||||
} else {
|
||||
c.Ui.Output(fmt.Sprintf("Removed peer with id %q", id))
|
||||
}
|
||||
|
||||
return 0
|
||||
}
|
||||
|
||||
func raftRemovePeers(address string, operator *api.Operator) error {
|
||||
// TODO (slackpad) Once we expose IDs, add support for removing
|
||||
// by ID, add support for that.
|
||||
if len(address) == 0 {
|
||||
return fmt.Errorf("an address is required for the peer to remove")
|
||||
func raftRemovePeers(address, id string, operator *api.Operator) error {
|
||||
if len(address) == 0 && len(id) == 0 {
|
||||
return fmt.Errorf("an address or id is required for the peer to remove")
|
||||
}
|
||||
if len(address) > 0 && len(id) > 0 {
|
||||
return fmt.Errorf("cannot give both an address and id")
|
||||
}
|
||||
|
||||
// Try to kick the peer.
|
||||
if err := operator.RaftRemovePeerByAddress(address, nil); err != nil {
|
||||
return err
|
||||
if len(address) > 0 {
|
||||
if err := operator.RaftRemovePeerByAddress(address, nil); err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
if err := operator.RaftRemovePeerByID(id, nil); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
|
|
|
@ -56,4 +56,27 @@ func TestOperator_Raft_RemovePeer(t *testing.T) {
|
|||
t.Fatalf("bad: %s", output)
|
||||
}
|
||||
}
|
||||
|
||||
// Test the remove-peer subcommand with -id
|
||||
{
|
||||
ui := new(cli.MockUi)
|
||||
c := OperatorRaftRemoveCommand{
|
||||
Command: base.Command{
|
||||
Ui: ui,
|
||||
Flags: base.FlagSetHTTP,
|
||||
},
|
||||
}
|
||||
args := []string{"-http-addr=" + a1.httpAddr, "-id=nope"}
|
||||
|
||||
code := c.Run(args)
|
||||
if code != 1 {
|
||||
t.Fatalf("bad: %d. %#v", code, ui.ErrorWriter.String())
|
||||
}
|
||||
|
||||
// If we get this error, it proves we sent the address all they through.
|
||||
output := strings.TrimSpace(ui.ErrorWriter.String())
|
||||
if !strings.Contains(output, "id \"nope\" was not found in the Raft configuration") {
|
||||
t.Fatalf("bad: %s", output)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -194,7 +194,7 @@ func TestAutopilot_CleanupStaleRaftServer(t *testing.T) {
|
|||
// Add s4 to peers directly
|
||||
s4addr := fmt.Sprintf("127.0.0.1:%d",
|
||||
s4.config.SerfLANConfig.MemberlistConfig.BindPort)
|
||||
s1.raft.AddVoter(raft.ServerID(s4.config.NodeID), raft.ServerAddress(s4addr),0, 0)
|
||||
s1.raft.AddVoter(raft.ServerID(s4.config.NodeID), raft.ServerAddress(s4addr), 0, 0)
|
||||
|
||||
// Verify we have 4 peers
|
||||
peers, err := s1.numPeers()
|
||||
|
|
|
@ -74,7 +74,7 @@ func (op *Operator) RaftGetConfiguration(args *structs.DCSpecificRequest, reply
|
|||
// quorum but no longer known to Serf or the catalog) by address in the form of
|
||||
// "IP:port". The reply argument is not used, but it required to fulfill the RPC
|
||||
// interface.
|
||||
func (op *Operator) RaftRemovePeerByAddress(args *structs.RaftPeerByAddressRequest, reply *struct{}) error {
|
||||
func (op *Operator) RaftRemovePeerByAddress(args *structs.RaftRemovePeerRequest, reply *struct{}) error {
|
||||
if done, err := op.srv.forward("Operator.RaftRemovePeerByAddress", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
@ -99,6 +99,7 @@ func (op *Operator) RaftRemovePeerByAddress(args *structs.RaftPeerByAddressReque
|
|||
}
|
||||
for _, s := range future.Configuration().Servers {
|
||||
if s.Address == args.Address {
|
||||
args.ID = s.ID
|
||||
goto REMOVE
|
||||
}
|
||||
}
|
||||
|
@ -115,7 +116,17 @@ REMOVE:
|
|||
// doing if you are calling this. If you remove a peer that's known to
|
||||
// Serf, for example, it will come back when the leader does a reconcile
|
||||
// pass.
|
||||
future := op.srv.raft.RemovePeer(args.Address)
|
||||
minRaftProtocol, err := ServerMinRaftProtocol(op.srv.serfLAN.Members())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var future raft.Future
|
||||
if minRaftProtocol >= 2 {
|
||||
future = op.srv.raft.RemoveServer(args.ID, 0, 0)
|
||||
} else {
|
||||
future = op.srv.raft.RemovePeer(args.Address)
|
||||
}
|
||||
if err := future.Error(); err != nil {
|
||||
op.srv.logger.Printf("[WARN] consul.operator: Failed to remove Raft peer %q: %v",
|
||||
args.Address, err)
|
||||
|
@ -126,6 +137,73 @@ REMOVE:
|
|||
return nil
|
||||
}
|
||||
|
||||
// RaftRemovePeerByID is used to kick a stale peer (one that is in the Raft
|
||||
// quorum but no longer known to Serf or the catalog) by address in the form of
|
||||
// "IP:port". The reply argument is not used, but is required to fulfill the RPC
|
||||
// interface.
|
||||
func (op *Operator) RaftRemovePeerByID(args *structs.RaftRemovePeerRequest, reply *struct{}) error {
|
||||
if done, err := op.srv.forward("Operator.RaftRemovePeerByID", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
// This is a super dangerous operation that requires operator write
|
||||
// access.
|
||||
acl, err := op.srv.resolveToken(args.Token)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if acl != nil && !acl.OperatorWrite() {
|
||||
return permissionDeniedErr
|
||||
}
|
||||
|
||||
// Since this is an operation designed for humans to use, we will return
|
||||
// an error if the supplied id isn't among the peers since it's
|
||||
// likely they screwed up.
|
||||
{
|
||||
future := op.srv.raft.GetConfiguration()
|
||||
if err := future.Error(); err != nil {
|
||||
return err
|
||||
}
|
||||
for _, s := range future.Configuration().Servers {
|
||||
if s.ID == args.ID {
|
||||
args.Address = s.Address
|
||||
goto REMOVE
|
||||
}
|
||||
}
|
||||
return fmt.Errorf("id %q was not found in the Raft configuration",
|
||||
args.ID)
|
||||
}
|
||||
|
||||
REMOVE:
|
||||
// The Raft library itself will prevent various forms of foot-shooting,
|
||||
// like making a configuration with no voters. Some consideration was
|
||||
// given here to adding more checks, but it was decided to make this as
|
||||
// low-level and direct as possible. We've got ACL coverage to lock this
|
||||
// down, and if you are an operator, it's assumed you know what you are
|
||||
// doing if you are calling this. If you remove a peer that's known to
|
||||
// Serf, for example, it will come back when the leader does a reconcile
|
||||
// pass.
|
||||
minRaftProtocol, err := ServerMinRaftProtocol(op.srv.serfLAN.Members())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var future raft.Future
|
||||
if minRaftProtocol >= 2 {
|
||||
future = op.srv.raft.RemoveServer(args.ID, 0, 0)
|
||||
} else {
|
||||
future = op.srv.raft.RemovePeer(args.Address)
|
||||
}
|
||||
if err := future.Error(); err != nil {
|
||||
op.srv.logger.Printf("[WARN] consul.operator: Failed to remove Raft peer with id %q: %v",
|
||||
args.ID, err)
|
||||
return err
|
||||
}
|
||||
|
||||
op.srv.logger.Printf("[WARN] consul.operator: Removed Raft peer with id %q", args.ID)
|
||||
return nil
|
||||
}
|
||||
|
||||
// AutopilotGetConfiguration is used to retrieve the current Autopilot configuration.
|
||||
func (op *Operator) AutopilotGetConfiguration(args *structs.DCSpecificRequest, reply *structs.AutopilotConfig) error {
|
||||
if done, err := op.srv.forward("Operator.AutopilotGetConfiguration", args, args, reply); done {
|
||||
|
|
|
@ -143,7 +143,7 @@ func TestOperator_RaftRemovePeerByAddress(t *testing.T) {
|
|||
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
// Try to remove a peer that's not there.
|
||||
arg := structs.RaftPeerByAddressRequest{
|
||||
arg := structs.RaftRemovePeerRequest{
|
||||
Datacenter: "dc1",
|
||||
Address: raft.ServerAddress(fmt.Sprintf("127.0.0.1:%d", getPort())),
|
||||
}
|
||||
|
@ -205,7 +205,7 @@ func TestOperator_RaftRemovePeerByAddress_ACLDeny(t *testing.T) {
|
|||
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
// Make a request with no token to make sure it gets denied.
|
||||
arg := structs.RaftPeerByAddressRequest{
|
||||
arg := structs.RaftRemovePeerRequest{
|
||||
Datacenter: "dc1",
|
||||
Address: raft.ServerAddress(s1.config.RPCAddr.String()),
|
||||
}
|
||||
|
@ -246,6 +246,122 @@ func TestOperator_RaftRemovePeerByAddress_ACLDeny(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestOperator_RaftRemovePeerByID(t *testing.T) {
|
||||
dir1, s1 := testServerWithConfig(t, func(c *Config) {
|
||||
c.RaftConfig.ProtocolVersion = 3
|
||||
})
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
codec := rpcClient(t, s1)
|
||||
defer codec.Close()
|
||||
|
||||
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
// Try to remove a peer that's not there.
|
||||
arg := structs.RaftRemovePeerRequest{
|
||||
Datacenter: "dc1",
|
||||
ID: raft.ServerID("e35bde83-4e9c-434f-a6ef-453f44ee21ea"),
|
||||
}
|
||||
var reply struct{}
|
||||
err := msgpackrpc.CallWithCodec(codec, "Operator.RaftRemovePeerByID", &arg, &reply)
|
||||
if err == nil || !strings.Contains(err.Error(), "not found in the Raft configuration") {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Add it manually to Raft.
|
||||
{
|
||||
future := s1.raft.AddVoter(arg.ID, raft.ServerAddress(fmt.Sprintf("127.0.0.1:%d", getPort())), 0, 0)
|
||||
if err := future.Error(); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Make sure it's there.
|
||||
{
|
||||
future := s1.raft.GetConfiguration()
|
||||
if err := future.Error(); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
configuration := future.Configuration()
|
||||
if len(configuration.Servers) != 2 {
|
||||
t.Fatalf("bad: %v", configuration)
|
||||
}
|
||||
}
|
||||
|
||||
// Remove it, now it should go through.
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Operator.RaftRemovePeerByID", &arg, &reply); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Make sure it's not there.
|
||||
{
|
||||
future := s1.raft.GetConfiguration()
|
||||
if err := future.Error(); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
configuration := future.Configuration()
|
||||
if len(configuration.Servers) != 1 {
|
||||
t.Fatalf("bad: %v", configuration)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestOperator_RaftRemovePeerByID_ACLDeny(t *testing.T) {
|
||||
dir1, s1 := testServerWithConfig(t, func(c *Config) {
|
||||
c.ACLDatacenter = "dc1"
|
||||
c.ACLMasterToken = "root"
|
||||
c.ACLDefaultPolicy = "deny"
|
||||
c.RaftConfig.ProtocolVersion = 3
|
||||
})
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
codec := rpcClient(t, s1)
|
||||
defer codec.Close()
|
||||
|
||||
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
// Make a request with no token to make sure it gets denied.
|
||||
arg := structs.RaftRemovePeerRequest{
|
||||
Datacenter: "dc1",
|
||||
ID: raft.ServerID(s1.config.NodeID),
|
||||
}
|
||||
var reply struct{}
|
||||
err := msgpackrpc.CallWithCodec(codec, "Operator.RaftRemovePeerByID", &arg, &reply)
|
||||
if err == nil || !strings.Contains(err.Error(), permissionDenied) {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Create an ACL with operator write permissions.
|
||||
var token string
|
||||
{
|
||||
var rules = `
|
||||
operator = "write"
|
||||
`
|
||||
|
||||
req := structs.ACLRequest{
|
||||
Datacenter: "dc1",
|
||||
Op: structs.ACLSet,
|
||||
ACL: structs.ACL{
|
||||
Name: "User token",
|
||||
Type: structs.ACLTypeClient,
|
||||
Rules: rules,
|
||||
},
|
||||
WriteRequest: structs.WriteRequest{Token: "root"},
|
||||
}
|
||||
if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &req, &token); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Now it should kick back for being an invalid config, which means it
|
||||
// tried to do the operation.
|
||||
arg.Token = token
|
||||
err = msgpackrpc.CallWithCodec(codec, "Operator.RaftRemovePeerByID", &arg, &reply)
|
||||
if err == nil || !strings.Contains(err.Error(), "at least one voter") {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestOperator_Autopilot_GetConfiguration(t *testing.T) {
|
||||
dir1, s1 := testServerWithConfig(t, func(c *Config) {
|
||||
c.AutopilotConfig.CleanupDeadServers = false
|
||||
|
|
|
@ -73,21 +73,24 @@ type RaftConfigurationResponse struct {
|
|||
Index uint64
|
||||
}
|
||||
|
||||
// RaftPeerByAddressRequest is used by the Operator endpoint to apply a Raft
|
||||
// RaftRemovePeerRequest is used by the Operator endpoint to apply a Raft
|
||||
// operation on a specific Raft peer by address in the form of "IP:port".
|
||||
type RaftPeerByAddressRequest struct {
|
||||
type RaftRemovePeerRequest struct {
|
||||
// Datacenter is the target this request is intended for.
|
||||
Datacenter string
|
||||
|
||||
// Address is the peer to remove, in the form "IP:port".
|
||||
Address raft.ServerAddress
|
||||
|
||||
// ID is the peer ID to remove.
|
||||
ID raft.ServerID
|
||||
|
||||
// WriteRequest holds the ACL token to go along with this request.
|
||||
WriteRequest
|
||||
}
|
||||
|
||||
// RequestDatacenter returns the datacenter for a given request.
|
||||
func (op *RaftPeerByAddressRequest) RequestDatacenter() string {
|
||||
func (op *RaftRemovePeerRequest) RequestDatacenter() string {
|
||||
return op.Datacenter
|
||||
}
|
||||
|
||||
|
|
|
@ -688,9 +688,9 @@ even though the server is no longer present and known to the cluster. This
|
|||
endpoint can be used to remove the failed server so that it is no longer
|
||||
affects the Raft quorum.
|
||||
|
||||
An `?address=` query parameter is required and should be set to the
|
||||
`IP:port` for the server to remove. The port number is usually 8300, unless
|
||||
configured otherwise. Nothing is required in the body of the request.
|
||||
Either an `?id=` or `?address=` query parameter is required and should be set to the
|
||||
peer ID or `IP:port` respectively for the server to remove. The port number is usually
|
||||
8300, unless configured otherwise. Nothing is required in the body of the request.
|
||||
|
||||
By default, the datacenter of the agent is targeted; however, the `dc` can be
|
||||
provided using the `?dc=` query parameter.
|
||||
|
|
|
@ -78,4 +78,6 @@ Usage: `consul operator raft remove-peer -address="IP:port"`
|
|||
* `-address` - "IP:port" for the server to remove. The port number is usually
|
||||
8300, unless configured otherwise.
|
||||
|
||||
* `-id` - ID of the server to remove.
|
||||
|
||||
The return code will indicate success or failure.
|
||||
|
|
Loading…
Reference in New Issue