From 4d9fc638b43a8869be310e213be35aecb8d76fb7 Mon Sep 17 00:00:00 2001 From: preetapan Date: Tue, 26 Sep 2017 22:49:41 -0500 Subject: [PATCH] Issue 3452 (#3500) * Make sure that id and address are set in member created during reaping of catalog nodes that have been removed from serf * Get address from node table in the state store rather than from service address * Fix incorrect lookup by checkname instead of node name * Make sure that serverlookup is called with the right address format, added unit test. * Address code review comments * Tweaks style stuff. --- agent/consul/leader.go | 49 +++++++++++++++++-------- agent/consul/leader_test.go | 73 +++++++++++++++++++++++++++++++++++++ 2 files changed, 106 insertions(+), 16 deletions(-) diff --git a/agent/consul/leader.go b/agent/consul/leader.go index e20d5a876e..c6047bb89d 100644 --- a/agent/consul/leader.go +++ b/agent/consul/leader.go @@ -329,9 +329,8 @@ func (s *Server) getOrCreateAutopilotConfig() (*structs.AutopilotConfig, bool) { } // reconcileReaped is used to reconcile nodes that have failed and been reaped -// from Serf but remain in the catalog. This is done by looking for SerfCheckID -// in a critical state that does not correspond to a known Serf member. We generate -// a "reap" event to cause the node to be cleaned up. +// from Serf but remain in the catalog. This is done by looking for unknown nodes with serfHealth checks registered. +// We generate a "reap" event to cause the node to be cleaned up. func (s *Server) reconcileReaped(known map[string]struct{}) error { state := s.fsm.State() _, checks, err := state.ChecksInState(nil, api.HealthAny) @@ -349,6 +348,35 @@ func (s *Server) reconcileReaped(known map[string]struct{}) error { continue } + // Get the node services, look for ConsulServiceID + _, services, err := state.NodeServices(nil, check.Node) + if err != nil { + return err + } + serverPort := 0 + serverAddr := "" + serverID := "" + + CHECKS: + for _, service := range services.Services { + if service.ID == structs.ConsulServiceID { + _, node, err := state.GetNode(check.Node) + if err != nil { + s.logger.Printf("[ERR] consul: Unable to look up node with name %q: %v", check.Node, err) + continue CHECKS + } + + serverAddr = node.Address + serverPort = service.Port + lookupAddr := net.JoinHostPort(serverAddr, strconv.Itoa(serverPort)) + svr := s.serverLookup.Server(raft.ServerAddress(lookupAddr)) + if svr != nil { + serverID = svr.ID + } + break + } + } + // Create a fake member member := serf.Member{ Name: check.Node, @@ -358,23 +386,12 @@ func (s *Server) reconcileReaped(known map[string]struct{}) error { }, } - // Get the node services, look for ConsulServiceID - _, services, err := state.NodeServices(nil, check.Node) - if err != nil { - return err - } - serverPort := 0 - for _, service := range services.Services { - if service.ID == structs.ConsulServiceID { - serverPort = service.Port - break - } - } - // Create the appropriate tags if this was a server node if serverPort > 0 { member.Tags["role"] = "consul" member.Tags["port"] = strconv.FormatUint(uint64(serverPort), 10) + member.Tags["id"] = serverID + member.Addr = net.ParseIP(serverAddr) } // Attempt to reap this member diff --git a/agent/consul/leader_test.go b/agent/consul/leader_test.go index 4de48dd834..2e106c4636 100644 --- a/agent/consul/leader_test.go +++ b/agent/consul/leader_test.go @@ -250,6 +250,79 @@ func TestLeader_ReapMember(t *testing.T) { } } +func TestLeader_ReapServer(t *testing.T) { + t.Parallel() + dir1, s1 := testServerWithConfig(t, func(c *Config) { + c.ACLDatacenter = "dc1" + c.ACLMasterToken = "root" + c.ACLDefaultPolicy = "allow" + c.ACLEnforceVersion8 = true + c.Bootstrap = true + }) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + + dir2, s2 := testServerWithConfig(t, func(c *Config) { + c.ACLDatacenter = "dc1" + c.ACLMasterToken = "root" + c.ACLDefaultPolicy = "allow" + c.ACLEnforceVersion8 = true + c.Bootstrap = false + }) + defer os.RemoveAll(dir2) + defer s2.Shutdown() + + dir3, s3 := testServerWithConfig(t, func(c *Config) { + c.ACLDatacenter = "dc1" + c.ACLMasterToken = "root" + c.ACLDefaultPolicy = "allow" + c.ACLEnforceVersion8 = true + c.Bootstrap = false + }) + defer os.RemoveAll(dir3) + defer s3.Shutdown() + + // Try to join + joinLAN(t, s1, s2) + joinLAN(t, s1, s3) + + testrpc.WaitForLeader(t, s1.RPC, "dc1") + state := s1.fsm.State() + + // s3 should be registered + retry.Run(t, func(r *retry.R) { + _, node, err := state.GetNode(s3.config.NodeName) + if err != nil { + r.Fatalf("err: %v", err) + } + if node == nil { + r.Fatal("client not registered") + } + }) + + // call reconcileReaped with a map that does not contain s3 + knownMembers := make(map[string]struct{}) + knownMembers[s1.config.NodeName] = struct{}{} + knownMembers[s2.config.NodeName] = struct{}{} + + err := s1.reconcileReaped(knownMembers) + + if err != nil { + t.Fatalf("Unexpected error :%v", err) + } + // s3 should be deregistered + retry.Run(t, func(r *retry.R) { + _, node, err := state.GetNode(s3.config.NodeName) + if err != nil { + r.Fatalf("err: %v", err) + } + if node != nil { + r.Fatalf("server with id %v should not be registered", s3.config.NodeID) + } + }) + +} + func TestLeader_Reconcile_ReapMember(t *testing.T) { t.Parallel() dir1, s1 := testServerWithConfig(t, func(c *Config) {