From c46f9f9f3190e04d09c64f43d387c60b65a84554 Mon Sep 17 00:00:00 2001 From: "R.B. Boyer" <4903+rboyer@users.noreply.github.com> Date: Thu, 2 Dec 2021 17:15:10 -0600 Subject: [PATCH] agent: add variation of force-leave that exclusively works on the WAN (#11722) Fixes #6548 --- .changelog/11722.txt | 3 ++ agent/agent.go | 24 ++++++++++- agent/agent_endpoint.go | 9 ++++- agent/agent_endpoint_test.go | 68 ++++++++++++++++++++++++++++++++ agent/consul/server.go | 12 ++++++ agent/consul/server_oss.go | 12 ++++-- api/agent.go | 33 ++++++++++------ command/forceleave/forceleave.go | 17 ++++---- 8 files changed, 153 insertions(+), 25 deletions(-) create mode 100644 .changelog/11722.txt diff --git a/.changelog/11722.txt b/.changelog/11722.txt new file mode 100644 index 0000000000..e0fcec47e0 --- /dev/null +++ b/.changelog/11722.txt @@ -0,0 +1,3 @@ +```release-note:improvement +agent: add variation of force-leave that exclusively works on the WAN +``` diff --git a/agent/agent.go b/agent/agent.go index e048b33aab..bc6aea7503 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -1553,9 +1553,10 @@ func (a *Agent) RefreshPrimaryGatewayFallbackAddresses(addrs []string) error { } // ForceLeave is used to remove a failed node from the cluster -func (a *Agent) ForceLeave(node string, prune bool, entMeta *structs.EnterpriseMeta) (err error) { +func (a *Agent) ForceLeave(node string, prune bool, entMeta *structs.EnterpriseMeta) error { a.logger.Info("Force leaving node", "node", node) - err = a.delegate.RemoveFailedNode(node, prune, entMeta) + + err := a.delegate.RemoveFailedNode(node, prune, entMeta) if err != nil { a.logger.Warn("Failed to remove node", "node", node, @@ -1565,6 +1566,25 @@ func (a *Agent) ForceLeave(node string, prune bool, entMeta *structs.EnterpriseM return err } +// ForceLeaveWAN is used to remove a failed node from the WAN cluster +func (a *Agent) ForceLeaveWAN(node string, prune bool, entMeta *structs.EnterpriseMeta) error { + a.logger.Info("(WAN) Force leaving node", "node", node) + + srv, ok := a.delegate.(*consul.Server) + if !ok { + return fmt.Errorf("Must be a server to force-leave a node from the WAN cluster") + } + + err := srv.RemoveFailedNodeWAN(node, prune, entMeta) + if err != nil { + a.logger.Warn("(WAN) Failed to remove node", + "node", node, + "error", err, + ) + } + return err +} + // AgentLocalMember is used to retrieve the LAN member for the local node. func (a *Agent) AgentLocalMember() serf.Member { return a.delegate.AgentLocalMember() diff --git a/agent/agent_endpoint.go b/agent/agent_endpoint.go index f736e4b576..6c7a70837a 100644 --- a/agent/agent_endpoint.go +++ b/agent/agent_endpoint.go @@ -640,8 +640,15 @@ func (s *HTTPHandlers) AgentForceLeave(resp http.ResponseWriter, req *http.Reque // Check the value of the prune query _, prune := req.URL.Query()["prune"] + // Check if the WAN is being queried + _, wan := req.URL.Query()["wan"] + addr := strings.TrimPrefix(req.URL.Path, "/v1/agent/force-leave/") - return nil, s.agent.ForceLeave(addr, prune, entMeta) + if wan { + return nil, s.agent.ForceLeaveWAN(addr, prune, entMeta) + } else { + return nil, s.agent.ForceLeave(addr, prune, entMeta) + } } // syncChanges is a helper function which wraps a blocking call to sync diff --git a/agent/agent_endpoint_test.go b/agent/agent_endpoint_test.go index e9f0d12d84..168f6bbb87 100644 --- a/agent/agent_endpoint_test.go +++ b/agent/agent_endpoint_test.go @@ -2265,6 +2265,74 @@ func TestAgent_ForceLeavePrune(t *testing.T) { }) } +func TestAgent_ForceLeavePrune_WAN(t *testing.T) { + if testing.Short() { + t.Skip("too slow for testing.Short") + } + + t.Parallel() + + a1 := StartTestAgent(t, TestAgent{Name: "dc1", HCL: ` + datacenter = "dc1" + primary_datacenter = "dc1" + gossip_wan { + probe_interval = "50ms" + suspicion_mult = 2 + } + `}) + defer a1.Shutdown() + + a2 := StartTestAgent(t, TestAgent{Name: "dc2", HCL: ` + datacenter = "dc2" + primary_datacenter = "dc1" + `}) + defer a2.Shutdown() + + testrpc.WaitForLeader(t, a1.RPC, "dc1") + testrpc.WaitForLeader(t, a2.RPC, "dc2") + + // Wait for the WAN join. + addr := fmt.Sprintf("127.0.0.1:%d", a1.Config.SerfPortWAN) + _, err := a2.JoinWAN([]string{addr}) + require.NoError(t, err) + + testrpc.WaitForLeader(t, a1.RPC, "dc2") + testrpc.WaitForLeader(t, a2.RPC, "dc1") + + retry.Run(t, func(r *retry.R) { + require.Len(r, a1.WANMembers(), 2) + require.Len(r, a2.WANMembers(), 2) + }) + + wanNodeName_a2 := a2.Config.NodeName + ".dc2" + + // Shutdown and wait for agent being marked as failed, so we wait for full + // shutdown of Agent. + require.NoError(t, a2.Shutdown()) + retry.Run(t, func(r *retry.R) { + m := a1.WANMembers() + for _, member := range m { + if member.Name == wanNodeName_a2 { + if member.Status != serf.StatusFailed { + r.Fatalf("got status %q want %q", member.Status, serf.StatusFailed) + } + } + } + }) + + // Force leave now + req, err := http.NewRequest("PUT", fmt.Sprintf("/v1/agent/force-leave/%s?prune=1&wan=1", wanNodeName_a2), nil) + require.NoError(t, err) + + resp := httptest.NewRecorder() + a1.srv.h.ServeHTTP(resp, req) + require.Equal(t, http.StatusOK, resp.Code, resp.Body.String()) + + retry.Run(t, func(r *retry.R) { + require.Len(r, a1.WANMembers(), 1) + }) +} + func TestAgent_RegisterCheck(t *testing.T) { if testing.Short() { t.Skip("too slow for testing.Short") diff --git a/agent/consul/server.go b/agent/consul/server.go index dee51b15b9..b51fc7f7cb 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -1193,6 +1193,18 @@ func (s *Server) RemoveFailedNode(node string, prune bool, entMeta *structs.Ente return s.removeFailedNode(removeFn, node, wanNode, entMeta) } +// RemoveFailedNodeWAN is used to remove a failed node from the WAN cluster. +func (s *Server) RemoveFailedNodeWAN(wanNode string, prune bool, entMeta *structs.EnterpriseMeta) error { + var removeFn func(*serf.Serf, string) error + if prune { + removeFn = (*serf.Serf).RemoveFailedNodePrune + } else { + removeFn = (*serf.Serf).RemoveFailedNode + } + + return s.removeFailedNode(removeFn, "", wanNode, entMeta) +} + // IsLeader checks if this server is the cluster leader func (s *Server) IsLeader() bool { return s.raft.State() == raft.Leader diff --git a/agent/consul/server_oss.go b/agent/consul/server_oss.go index b49b716b91..f6217b9991 100644 --- a/agent/consul/server_oss.go +++ b/agent/consul/server_oss.go @@ -26,6 +26,8 @@ func (s *Server) JoinLAN(addrs []string, entMeta *structs.EnterpriseMeta) (int, } // removeFailedNode is used to remove a failed node from the cluster +// +// if node is empty, just remove wanNode from the WAN func (s *Server) removeFailedNode( removeFn func(*serf.Serf, string) error, node, wanNode string, @@ -42,10 +44,12 @@ func (s *Server) removeFailedNode( var merr error - if found, err := maybeRemove(s.serfLAN, node); err != nil { - merr = multierror.Append(merr, fmt.Errorf("could not remove failed node from LAN: %w", err)) - } else if found { - foundAny = true + if node != "" { + if found, err := maybeRemove(s.serfLAN, node); err != nil { + merr = multierror.Append(merr, fmt.Errorf("could not remove failed node from LAN: %w", err)) + } else if found { + foundAny = true + } } if s.serfWAN != nil { diff --git a/api/agent.go b/api/agent.go index 785c9e2cca..e3b5d362a0 100644 --- a/api/agent.go +++ b/api/agent.go @@ -1021,25 +1021,36 @@ func (a *Agent) Leave() error { return nil } +type ForceLeaveOpts struct { + // Prune indicates if we should remove a failed agent from the list of + // members in addition to ejecting it. + Prune bool + + // WAN indicates that the request should exclusively target the WAN pool. + WAN bool +} + // ForceLeave is used to have the agent eject a failed node func (a *Agent) ForceLeave(node string) error { - r := a.c.newRequest("PUT", "/v1/agent/force-leave/"+node) - _, resp, err := a.c.doRequest(r) - if err != nil { - return err - } - defer closeResponseBody(resp) - if err := requireOK(resp); err != nil { - return err - } - return nil + return a.ForceLeaveOpts(node, ForceLeaveOpts{}) } // ForceLeavePrune is used to have an a failed agent removed // from the list of members func (a *Agent) ForceLeavePrune(node string) error { + return a.ForceLeaveOpts(node, ForceLeaveOpts{Prune: true}) +} + +// ForceLeaveOpts is used to have the agent eject a failed node or remove it +// completely from the list of members. +func (a *Agent) ForceLeaveOpts(node string, opts ForceLeaveOpts) error { r := a.c.newRequest("PUT", "/v1/agent/force-leave/"+node) - r.params.Set("prune", "1") + if opts.Prune { + r.params.Set("prune", "1") + } + if opts.WAN { + r.params.Set("wan", "1") + } _, resp, err := a.c.doRequest(r) if err != nil { return err diff --git a/command/forceleave/forceleave.go b/command/forceleave/forceleave.go index 6b3672d391..513c175567 100644 --- a/command/forceleave/forceleave.go +++ b/command/forceleave/forceleave.go @@ -6,6 +6,7 @@ import ( "github.com/mitchellh/cli" + "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/command/flags" ) @@ -21,14 +22,17 @@ type cmd struct { http *flags.HTTPFlags help string - //flags + // flags prune bool + wan bool } func (c *cmd) init() { c.flags = flag.NewFlagSet("", flag.ContinueOnError) c.flags.BoolVar(&c.prune, "prune", false, "Remove agent completely from list of members") + c.flags.BoolVar(&c.wan, "wan", false, + "Exclusively leave the agent from the WAN serf pool.") c.http = &flags.HTTPFlags{} flags.Merge(c.flags, c.http.ClientFlags()) flags.Merge(c.flags, c.http.PartitionFlag()) @@ -54,12 +58,10 @@ func (c *cmd) Run(args []string) int { return 1 } - if c.prune { - err = client.Agent().ForceLeavePrune(nodes[0]) - } else { - err = client.Agent().ForceLeave(nodes[0]) - } - + err = client.Agent().ForceLeaveOpts(nodes[0], api.ForceLeaveOpts{ + Prune: c.prune, + WAN: c.wan, + }) if err != nil { c.UI.Error(fmt.Sprintf("Error force leaving: %s", err)) return 1 @@ -88,4 +90,5 @@ Usage: consul force-leave [options] name time before eventually reaping them. -prune Remove agent completely from list of members + -wan Exclusively leave the agent from the WAN serf pool. `