From ad8e25917bd49fe580b4948d38ca1bdaad58ee88 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Thu, 20 Mar 2014 12:51:49 -0700 Subject: [PATCH] consul: Handle reaping of serf members --- consul/leader.go | 15 ++++++++++++- consul/leader_test.go | 52 +++++++++++++++++++++++++++++++++++++++++++ consul/serf.go | 18 +++++++++++++-- 3 files changed, 82 insertions(+), 3 deletions(-) diff --git a/consul/leader.go b/consul/leader.go index 53a30d3224..07b1aedac6 100644 --- a/consul/leader.go +++ b/consul/leader.go @@ -127,6 +127,8 @@ func (s *Server) reconcileMember(member serf.Member) error { err = s.handleFailedMember(member) case serf.StatusLeft: err = s.handleLeftMember(member) + case StatusReap: + err = s.handleReapMember(member) } if err != nil { s.logger.Printf("[ERR] consul: failed to reconcile member: %v: %v", @@ -251,6 +253,17 @@ func (s *Server) handleFailedMember(member serf.Member) error { // handleLeftMember is used to handle members that gracefully // left. They are deregistered if necessary. func (s *Server) handleLeftMember(member serf.Member) error { + return s.handleDeregisterMember("left", member) +} + +// handleReapMember is used to handle members that have been +// reaped after a prolonged failure. They are deregistered. +func (s *Server) handleReapMember(member serf.Member) error { + return s.handleDeregisterMember("reaped", member) +} + +// handleDeregisterMember is used to deregister a member of a given reason +func (s *Server) handleDeregisterMember(reason string, member serf.Member) error { state := s.fsm.State() // Check if the node does not exists @@ -258,7 +271,7 @@ func (s *Server) handleLeftMember(member serf.Member) error { if !found { return nil } - s.logger.Printf("[INFO] consul: member '%s' left, deregistering", member.Name) + s.logger.Printf("[INFO] consul: member '%s' %s, deregistering", member.Name, reason) // Remove from Raft peers if this was a server if valid, parts := isConsulServer(member); valid { diff --git a/consul/leader_test.go b/consul/leader_test.go index f34b424e1f..bee57d6d02 100644 --- a/consul/leader_test.go +++ b/consul/leader_test.go @@ -3,6 +3,7 @@ package consul import ( "fmt" "github.com/hashicorp/consul/consul/structs" + "github.com/hashicorp/serf/serf" "os" "testing" "time" @@ -156,6 +157,57 @@ func TestLeader_LeftMember(t *testing.T) { } } +func TestLeader_ReapMember(t *testing.T) { + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + + dir2, c1 := testClient(t) + defer os.RemoveAll(dir2) + defer c1.Shutdown() + + // Wait until we have a leader + time.Sleep(100 * time.Millisecond) + + // Try to join + addr := fmt.Sprintf("127.0.0.1:%d", + s1.config.SerfLANConfig.MemberlistConfig.BindPort) + if _, err := c1.JoinLAN([]string{addr}); err != nil { + t.Fatalf("err: %v", err) + } + + // Wait for registration + time.Sleep(10 * time.Millisecond) + + // Should be registered + state := s1.fsm.State() + _, found, _ := state.GetNode(c1.config.NodeName) + if !found { + t.Fatalf("client not registered") + } + + // Simulate a node reaping + mems := s1.LANMembers() + var c1mem serf.Member + for _, m := range mems { + if m.Name == c1.config.NodeName { + c1mem = m + c1mem.Status = StatusReap + break + } + } + s1.reconcileCh <- c1mem + + // Wait to reconcile + time.Sleep(10 * time.Millisecond) + + // Should be deregistered + _, found, _ = state.GetNode(c1.config.NodeName) + if found { + t.Fatalf("client registered") + } +} + func TestLeader_Reconcile(t *testing.T) { dir1, s1 := testServer(t) defer os.RemoveAll(dir1) diff --git a/consul/serf.go b/consul/serf.go index 31a19444db..3038d0abfc 100644 --- a/consul/serf.go +++ b/consul/serf.go @@ -6,6 +6,12 @@ import ( "strings" ) +const ( + // StatusReap is used to update the status of a node if we + // are handling a EventMemberReap + StatusReap = serf.MemberStatus(-1) +) + // lanEventHandler is used to handle events from the lan Serf cluster func (s *Server) lanEventHandler() { for { @@ -17,11 +23,12 @@ func (s *Server) lanEventHandler() { case serf.EventMemberLeave: fallthrough case serf.EventMemberFailed: + fallthrough + case serf.EventMemberReap: s.localMemberEvent(e.(serf.MemberEvent)) - case serf.EventMemberUpdate: // Ignore - case serf.EventMemberReap: // Ignore case serf.EventUser: s.localEvent(e.(serf.UserEvent)) + case serf.EventMemberUpdate: // Ignore case serf.EventQuery: // Ignore default: s.logger.Printf("[WARN] consul: unhandled LAN Serf Event: %#v", e) @@ -67,8 +74,15 @@ func (s *Server) localMemberEvent(me serf.MemberEvent) { return } + // Check if this is a reap event + isReap := me.EventType() == serf.EventMemberReap + // Queue the members for reconciliation for _, m := range me.Members { + // Change the status if this is a reap event + if isReap { + m.Status = StatusReap + } select { case s.reconcileCh <- m: default: