2
0
mirror of https://github.com/status-im/consul.git synced 2025-01-14 15:54:40 +00:00

Issue 3452 ()

* Make sure that id and address are set in member created during reaping of catalog nodes that have been removed from serf

* Get address from node table in the state store rather than from service address

* Fix incorrect lookup by checkname instead of node name

* Make sure that serverlookup is called with the right address format, added unit test.

* Address code review comments

* Tweaks style stuff.
This commit is contained in:
preetapan 2017-09-26 22:49:41 -05:00 committed by James Phillips
parent a8f228c2ae
commit 4d9fc638b4
2 changed files with 106 additions and 16 deletions

@ -329,9 +329,8 @@ func (s *Server) getOrCreateAutopilotConfig() (*structs.AutopilotConfig, bool) {
} }
// reconcileReaped is used to reconcile nodes that have failed and been reaped // reconcileReaped is used to reconcile nodes that have failed and been reaped
// from Serf but remain in the catalog. This is done by looking for SerfCheckID // from Serf but remain in the catalog. This is done by looking for unknown nodes with serfHealth checks registered.
// in a critical state that does not correspond to a known Serf member. We generate // We generate a "reap" event to cause the node to be cleaned up.
// a "reap" event to cause the node to be cleaned up.
func (s *Server) reconcileReaped(known map[string]struct{}) error { func (s *Server) reconcileReaped(known map[string]struct{}) error {
state := s.fsm.State() state := s.fsm.State()
_, checks, err := state.ChecksInState(nil, api.HealthAny) _, checks, err := state.ChecksInState(nil, api.HealthAny)
@ -349,6 +348,35 @@ func (s *Server) reconcileReaped(known map[string]struct{}) error {
continue continue
} }
// Get the node services, look for ConsulServiceID
_, services, err := state.NodeServices(nil, check.Node)
if err != nil {
return err
}
serverPort := 0
serverAddr := ""
serverID := ""
CHECKS:
for _, service := range services.Services {
if service.ID == structs.ConsulServiceID {
_, node, err := state.GetNode(check.Node)
if err != nil {
s.logger.Printf("[ERR] consul: Unable to look up node with name %q: %v", check.Node, err)
continue CHECKS
}
serverAddr = node.Address
serverPort = service.Port
lookupAddr := net.JoinHostPort(serverAddr, strconv.Itoa(serverPort))
svr := s.serverLookup.Server(raft.ServerAddress(lookupAddr))
if svr != nil {
serverID = svr.ID
}
break
}
}
// Create a fake member // Create a fake member
member := serf.Member{ member := serf.Member{
Name: check.Node, Name: check.Node,
@ -358,23 +386,12 @@ func (s *Server) reconcileReaped(known map[string]struct{}) error {
}, },
} }
// Get the node services, look for ConsulServiceID
_, services, err := state.NodeServices(nil, check.Node)
if err != nil {
return err
}
serverPort := 0
for _, service := range services.Services {
if service.ID == structs.ConsulServiceID {
serverPort = service.Port
break
}
}
// Create the appropriate tags if this was a server node // Create the appropriate tags if this was a server node
if serverPort > 0 { if serverPort > 0 {
member.Tags["role"] = "consul" member.Tags["role"] = "consul"
member.Tags["port"] = strconv.FormatUint(uint64(serverPort), 10) member.Tags["port"] = strconv.FormatUint(uint64(serverPort), 10)
member.Tags["id"] = serverID
member.Addr = net.ParseIP(serverAddr)
} }
// Attempt to reap this member // Attempt to reap this member

@ -250,6 +250,79 @@ func TestLeader_ReapMember(t *testing.T) {
} }
} }
func TestLeader_ReapServer(t *testing.T) {
t.Parallel()
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.ACLDatacenter = "dc1"
c.ACLMasterToken = "root"
c.ACLDefaultPolicy = "allow"
c.ACLEnforceVersion8 = true
c.Bootstrap = true
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
dir2, s2 := testServerWithConfig(t, func(c *Config) {
c.ACLDatacenter = "dc1"
c.ACLMasterToken = "root"
c.ACLDefaultPolicy = "allow"
c.ACLEnforceVersion8 = true
c.Bootstrap = false
})
defer os.RemoveAll(dir2)
defer s2.Shutdown()
dir3, s3 := testServerWithConfig(t, func(c *Config) {
c.ACLDatacenter = "dc1"
c.ACLMasterToken = "root"
c.ACLDefaultPolicy = "allow"
c.ACLEnforceVersion8 = true
c.Bootstrap = false
})
defer os.RemoveAll(dir3)
defer s3.Shutdown()
// Try to join
joinLAN(t, s1, s2)
joinLAN(t, s1, s3)
testrpc.WaitForLeader(t, s1.RPC, "dc1")
state := s1.fsm.State()
// s3 should be registered
retry.Run(t, func(r *retry.R) {
_, node, err := state.GetNode(s3.config.NodeName)
if err != nil {
r.Fatalf("err: %v", err)
}
if node == nil {
r.Fatal("client not registered")
}
})
// call reconcileReaped with a map that does not contain s3
knownMembers := make(map[string]struct{})
knownMembers[s1.config.NodeName] = struct{}{}
knownMembers[s2.config.NodeName] = struct{}{}
err := s1.reconcileReaped(knownMembers)
if err != nil {
t.Fatalf("Unexpected error :%v", err)
}
// s3 should be deregistered
retry.Run(t, func(r *retry.R) {
_, node, err := state.GetNode(s3.config.NodeName)
if err != nil {
r.Fatalf("err: %v", err)
}
if node != nil {
r.Fatalf("server with id %v should not be registered", s3.config.NodeID)
}
})
}
func TestLeader_Reconcile_ReapMember(t *testing.T) { func TestLeader_Reconcile_ReapMember(t *testing.T) {
t.Parallel() t.Parallel()
dir1, s1 := testServerWithConfig(t, func(c *Config) { dir1, s1 := testServerWithConfig(t, func(c *Config) {