mirror of https://github.com/status-im/consul.git
agent: add variation of force-leave that exclusively works on the WAN (#11722)
Fixes #6548
This commit is contained in:
parent
0c7a2257ec
commit
c46f9f9f31
|
@ -0,0 +1,3 @@
|
|||
```release-note:improvement
|
||||
agent: add variation of force-leave that exclusively works on the WAN
|
||||
```
|
|
@ -1553,9 +1553,10 @@ func (a *Agent) RefreshPrimaryGatewayFallbackAddresses(addrs []string) error {
|
|||
}
|
||||
|
||||
// ForceLeave is used to remove a failed node from the cluster
|
||||
func (a *Agent) ForceLeave(node string, prune bool, entMeta *structs.EnterpriseMeta) (err error) {
|
||||
func (a *Agent) ForceLeave(node string, prune bool, entMeta *structs.EnterpriseMeta) error {
|
||||
a.logger.Info("Force leaving node", "node", node)
|
||||
err = a.delegate.RemoveFailedNode(node, prune, entMeta)
|
||||
|
||||
err := a.delegate.RemoveFailedNode(node, prune, entMeta)
|
||||
if err != nil {
|
||||
a.logger.Warn("Failed to remove node",
|
||||
"node", node,
|
||||
|
@ -1565,6 +1566,25 @@ func (a *Agent) ForceLeave(node string, prune bool, entMeta *structs.EnterpriseM
|
|||
return err
|
||||
}
|
||||
|
||||
// ForceLeaveWAN is used to remove a failed node from the WAN cluster
|
||||
func (a *Agent) ForceLeaveWAN(node string, prune bool, entMeta *structs.EnterpriseMeta) error {
|
||||
a.logger.Info("(WAN) Force leaving node", "node", node)
|
||||
|
||||
srv, ok := a.delegate.(*consul.Server)
|
||||
if !ok {
|
||||
return fmt.Errorf("Must be a server to force-leave a node from the WAN cluster")
|
||||
}
|
||||
|
||||
err := srv.RemoveFailedNodeWAN(node, prune, entMeta)
|
||||
if err != nil {
|
||||
a.logger.Warn("(WAN) Failed to remove node",
|
||||
"node", node,
|
||||
"error", err,
|
||||
)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// AgentLocalMember is used to retrieve the LAN member for the local node.
|
||||
func (a *Agent) AgentLocalMember() serf.Member {
|
||||
return a.delegate.AgentLocalMember()
|
||||
|
|
|
@ -640,8 +640,15 @@ func (s *HTTPHandlers) AgentForceLeave(resp http.ResponseWriter, req *http.Reque
|
|||
// Check the value of the prune query
|
||||
_, prune := req.URL.Query()["prune"]
|
||||
|
||||
// Check if the WAN is being queried
|
||||
_, wan := req.URL.Query()["wan"]
|
||||
|
||||
addr := strings.TrimPrefix(req.URL.Path, "/v1/agent/force-leave/")
|
||||
return nil, s.agent.ForceLeave(addr, prune, entMeta)
|
||||
if wan {
|
||||
return nil, s.agent.ForceLeaveWAN(addr, prune, entMeta)
|
||||
} else {
|
||||
return nil, s.agent.ForceLeave(addr, prune, entMeta)
|
||||
}
|
||||
}
|
||||
|
||||
// syncChanges is a helper function which wraps a blocking call to sync
|
||||
|
|
|
@ -2265,6 +2265,74 @@ func TestAgent_ForceLeavePrune(t *testing.T) {
|
|||
})
|
||||
}
|
||||
|
||||
func TestAgent_ForceLeavePrune_WAN(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("too slow for testing.Short")
|
||||
}
|
||||
|
||||
t.Parallel()
|
||||
|
||||
a1 := StartTestAgent(t, TestAgent{Name: "dc1", HCL: `
|
||||
datacenter = "dc1"
|
||||
primary_datacenter = "dc1"
|
||||
gossip_wan {
|
||||
probe_interval = "50ms"
|
||||
suspicion_mult = 2
|
||||
}
|
||||
`})
|
||||
defer a1.Shutdown()
|
||||
|
||||
a2 := StartTestAgent(t, TestAgent{Name: "dc2", HCL: `
|
||||
datacenter = "dc2"
|
||||
primary_datacenter = "dc1"
|
||||
`})
|
||||
defer a2.Shutdown()
|
||||
|
||||
testrpc.WaitForLeader(t, a1.RPC, "dc1")
|
||||
testrpc.WaitForLeader(t, a2.RPC, "dc2")
|
||||
|
||||
// Wait for the WAN join.
|
||||
addr := fmt.Sprintf("127.0.0.1:%d", a1.Config.SerfPortWAN)
|
||||
_, err := a2.JoinWAN([]string{addr})
|
||||
require.NoError(t, err)
|
||||
|
||||
testrpc.WaitForLeader(t, a1.RPC, "dc2")
|
||||
testrpc.WaitForLeader(t, a2.RPC, "dc1")
|
||||
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
require.Len(r, a1.WANMembers(), 2)
|
||||
require.Len(r, a2.WANMembers(), 2)
|
||||
})
|
||||
|
||||
wanNodeName_a2 := a2.Config.NodeName + ".dc2"
|
||||
|
||||
// Shutdown and wait for agent being marked as failed, so we wait for full
|
||||
// shutdown of Agent.
|
||||
require.NoError(t, a2.Shutdown())
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
m := a1.WANMembers()
|
||||
for _, member := range m {
|
||||
if member.Name == wanNodeName_a2 {
|
||||
if member.Status != serf.StatusFailed {
|
||||
r.Fatalf("got status %q want %q", member.Status, serf.StatusFailed)
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
// Force leave now
|
||||
req, err := http.NewRequest("PUT", fmt.Sprintf("/v1/agent/force-leave/%s?prune=1&wan=1", wanNodeName_a2), nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
resp := httptest.NewRecorder()
|
||||
a1.srv.h.ServeHTTP(resp, req)
|
||||
require.Equal(t, http.StatusOK, resp.Code, resp.Body.String())
|
||||
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
require.Len(r, a1.WANMembers(), 1)
|
||||
})
|
||||
}
|
||||
|
||||
func TestAgent_RegisterCheck(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("too slow for testing.Short")
|
||||
|
|
|
@ -1193,6 +1193,18 @@ func (s *Server) RemoveFailedNode(node string, prune bool, entMeta *structs.Ente
|
|||
return s.removeFailedNode(removeFn, node, wanNode, entMeta)
|
||||
}
|
||||
|
||||
// RemoveFailedNodeWAN is used to remove a failed node from the WAN cluster.
|
||||
func (s *Server) RemoveFailedNodeWAN(wanNode string, prune bool, entMeta *structs.EnterpriseMeta) error {
|
||||
var removeFn func(*serf.Serf, string) error
|
||||
if prune {
|
||||
removeFn = (*serf.Serf).RemoveFailedNodePrune
|
||||
} else {
|
||||
removeFn = (*serf.Serf).RemoveFailedNode
|
||||
}
|
||||
|
||||
return s.removeFailedNode(removeFn, "", wanNode, entMeta)
|
||||
}
|
||||
|
||||
// IsLeader checks if this server is the cluster leader
|
||||
func (s *Server) IsLeader() bool {
|
||||
return s.raft.State() == raft.Leader
|
||||
|
|
|
@ -26,6 +26,8 @@ func (s *Server) JoinLAN(addrs []string, entMeta *structs.EnterpriseMeta) (int,
|
|||
}
|
||||
|
||||
// removeFailedNode is used to remove a failed node from the cluster
|
||||
//
|
||||
// if node is empty, just remove wanNode from the WAN
|
||||
func (s *Server) removeFailedNode(
|
||||
removeFn func(*serf.Serf, string) error,
|
||||
node, wanNode string,
|
||||
|
@ -42,10 +44,12 @@ func (s *Server) removeFailedNode(
|
|||
|
||||
var merr error
|
||||
|
||||
if found, err := maybeRemove(s.serfLAN, node); err != nil {
|
||||
merr = multierror.Append(merr, fmt.Errorf("could not remove failed node from LAN: %w", err))
|
||||
} else if found {
|
||||
foundAny = true
|
||||
if node != "" {
|
||||
if found, err := maybeRemove(s.serfLAN, node); err != nil {
|
||||
merr = multierror.Append(merr, fmt.Errorf("could not remove failed node from LAN: %w", err))
|
||||
} else if found {
|
||||
foundAny = true
|
||||
}
|
||||
}
|
||||
|
||||
if s.serfWAN != nil {
|
||||
|
|
33
api/agent.go
33
api/agent.go
|
@ -1021,25 +1021,36 @@ func (a *Agent) Leave() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
type ForceLeaveOpts struct {
|
||||
// Prune indicates if we should remove a failed agent from the list of
|
||||
// members in addition to ejecting it.
|
||||
Prune bool
|
||||
|
||||
// WAN indicates that the request should exclusively target the WAN pool.
|
||||
WAN bool
|
||||
}
|
||||
|
||||
// ForceLeave is used to have the agent eject a failed node
|
||||
func (a *Agent) ForceLeave(node string) error {
|
||||
r := a.c.newRequest("PUT", "/v1/agent/force-leave/"+node)
|
||||
_, resp, err := a.c.doRequest(r)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer closeResponseBody(resp)
|
||||
if err := requireOK(resp); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
return a.ForceLeaveOpts(node, ForceLeaveOpts{})
|
||||
}
|
||||
|
||||
// ForceLeavePrune is used to have an a failed agent removed
|
||||
// from the list of members
|
||||
func (a *Agent) ForceLeavePrune(node string) error {
|
||||
return a.ForceLeaveOpts(node, ForceLeaveOpts{Prune: true})
|
||||
}
|
||||
|
||||
// ForceLeaveOpts is used to have the agent eject a failed node or remove it
|
||||
// completely from the list of members.
|
||||
func (a *Agent) ForceLeaveOpts(node string, opts ForceLeaveOpts) error {
|
||||
r := a.c.newRequest("PUT", "/v1/agent/force-leave/"+node)
|
||||
r.params.Set("prune", "1")
|
||||
if opts.Prune {
|
||||
r.params.Set("prune", "1")
|
||||
}
|
||||
if opts.WAN {
|
||||
r.params.Set("wan", "1")
|
||||
}
|
||||
_, resp, err := a.c.doRequest(r)
|
||||
if err != nil {
|
||||
return err
|
||||
|
|
|
@ -6,6 +6,7 @@ import (
|
|||
|
||||
"github.com/mitchellh/cli"
|
||||
|
||||
"github.com/hashicorp/consul/api"
|
||||
"github.com/hashicorp/consul/command/flags"
|
||||
)
|
||||
|
||||
|
@ -21,14 +22,17 @@ type cmd struct {
|
|||
http *flags.HTTPFlags
|
||||
help string
|
||||
|
||||
//flags
|
||||
// flags
|
||||
prune bool
|
||||
wan bool
|
||||
}
|
||||
|
||||
func (c *cmd) init() {
|
||||
c.flags = flag.NewFlagSet("", flag.ContinueOnError)
|
||||
c.flags.BoolVar(&c.prune, "prune", false,
|
||||
"Remove agent completely from list of members")
|
||||
c.flags.BoolVar(&c.wan, "wan", false,
|
||||
"Exclusively leave the agent from the WAN serf pool.")
|
||||
c.http = &flags.HTTPFlags{}
|
||||
flags.Merge(c.flags, c.http.ClientFlags())
|
||||
flags.Merge(c.flags, c.http.PartitionFlag())
|
||||
|
@ -54,12 +58,10 @@ func (c *cmd) Run(args []string) int {
|
|||
return 1
|
||||
}
|
||||
|
||||
if c.prune {
|
||||
err = client.Agent().ForceLeavePrune(nodes[0])
|
||||
} else {
|
||||
err = client.Agent().ForceLeave(nodes[0])
|
||||
}
|
||||
|
||||
err = client.Agent().ForceLeaveOpts(nodes[0], api.ForceLeaveOpts{
|
||||
Prune: c.prune,
|
||||
WAN: c.wan,
|
||||
})
|
||||
if err != nil {
|
||||
c.UI.Error(fmt.Sprintf("Error force leaving: %s", err))
|
||||
return 1
|
||||
|
@ -88,4 +90,5 @@ Usage: consul force-leave [options] name
|
|||
time before eventually reaping them.
|
||||
|
||||
-prune Remove agent completely from list of members
|
||||
-wan Exclusively leave the agent from the WAN serf pool.
|
||||
`
|
||||
|
|
Loading…
Reference in New Issue