consul: Handle reaping of serf members

This commit is contained in:
Armon Dadgar 2014-03-20 12:51:49 -07:00
parent 73b6ac322b
commit ad8e25917b
3 changed files with 82 additions and 3 deletions

View File

@ -127,6 +127,8 @@ func (s *Server) reconcileMember(member serf.Member) error {
err = s.handleFailedMember(member) err = s.handleFailedMember(member)
case serf.StatusLeft: case serf.StatusLeft:
err = s.handleLeftMember(member) err = s.handleLeftMember(member)
case StatusReap:
err = s.handleReapMember(member)
} }
if err != nil { if err != nil {
s.logger.Printf("[ERR] consul: failed to reconcile member: %v: %v", s.logger.Printf("[ERR] consul: failed to reconcile member: %v: %v",
@ -251,6 +253,17 @@ func (s *Server) handleFailedMember(member serf.Member) error {
// handleLeftMember is used to handle members that gracefully // handleLeftMember is used to handle members that gracefully
// left. They are deregistered if necessary. // left. They are deregistered if necessary.
func (s *Server) handleLeftMember(member serf.Member) error { func (s *Server) handleLeftMember(member serf.Member) error {
return s.handleDeregisterMember("left", member)
}
// handleReapMember is used to handle members that have been
// reaped after a prolonged failure. They are deregistered.
func (s *Server) handleReapMember(member serf.Member) error {
return s.handleDeregisterMember("reaped", member)
}
// handleDeregisterMember is used to deregister a member of a given reason
func (s *Server) handleDeregisterMember(reason string, member serf.Member) error {
state := s.fsm.State() state := s.fsm.State()
// Check if the node does not exists // Check if the node does not exists
@ -258,7 +271,7 @@ func (s *Server) handleLeftMember(member serf.Member) error {
if !found { if !found {
return nil return nil
} }
s.logger.Printf("[INFO] consul: member '%s' left, deregistering", member.Name) s.logger.Printf("[INFO] consul: member '%s' %s, deregistering", member.Name, reason)
// Remove from Raft peers if this was a server // Remove from Raft peers if this was a server
if valid, parts := isConsulServer(member); valid { if valid, parts := isConsulServer(member); valid {

View File

@ -3,6 +3,7 @@ package consul
import ( import (
"fmt" "fmt"
"github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/serf/serf"
"os" "os"
"testing" "testing"
"time" "time"
@ -156,6 +157,57 @@ func TestLeader_LeftMember(t *testing.T) {
} }
} }
func TestLeader_ReapMember(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
dir2, c1 := testClient(t)
defer os.RemoveAll(dir2)
defer c1.Shutdown()
// Wait until we have a leader
time.Sleep(100 * time.Millisecond)
// Try to join
addr := fmt.Sprintf("127.0.0.1:%d",
s1.config.SerfLANConfig.MemberlistConfig.BindPort)
if _, err := c1.JoinLAN([]string{addr}); err != nil {
t.Fatalf("err: %v", err)
}
// Wait for registration
time.Sleep(10 * time.Millisecond)
// Should be registered
state := s1.fsm.State()
_, found, _ := state.GetNode(c1.config.NodeName)
if !found {
t.Fatalf("client not registered")
}
// Simulate a node reaping
mems := s1.LANMembers()
var c1mem serf.Member
for _, m := range mems {
if m.Name == c1.config.NodeName {
c1mem = m
c1mem.Status = StatusReap
break
}
}
s1.reconcileCh <- c1mem
// Wait to reconcile
time.Sleep(10 * time.Millisecond)
// Should be deregistered
_, found, _ = state.GetNode(c1.config.NodeName)
if found {
t.Fatalf("client registered")
}
}
func TestLeader_Reconcile(t *testing.T) { func TestLeader_Reconcile(t *testing.T) {
dir1, s1 := testServer(t) dir1, s1 := testServer(t)
defer os.RemoveAll(dir1) defer os.RemoveAll(dir1)

View File

@ -6,6 +6,12 @@ import (
"strings" "strings"
) )
const (
// StatusReap is used to update the status of a node if we
// are handling a EventMemberReap
StatusReap = serf.MemberStatus(-1)
)
// lanEventHandler is used to handle events from the lan Serf cluster // lanEventHandler is used to handle events from the lan Serf cluster
func (s *Server) lanEventHandler() { func (s *Server) lanEventHandler() {
for { for {
@ -17,11 +23,12 @@ func (s *Server) lanEventHandler() {
case serf.EventMemberLeave: case serf.EventMemberLeave:
fallthrough fallthrough
case serf.EventMemberFailed: case serf.EventMemberFailed:
fallthrough
case serf.EventMemberReap:
s.localMemberEvent(e.(serf.MemberEvent)) s.localMemberEvent(e.(serf.MemberEvent))
case serf.EventMemberUpdate: // Ignore
case serf.EventMemberReap: // Ignore
case serf.EventUser: case serf.EventUser:
s.localEvent(e.(serf.UserEvent)) s.localEvent(e.(serf.UserEvent))
case serf.EventMemberUpdate: // Ignore
case serf.EventQuery: // Ignore case serf.EventQuery: // Ignore
default: default:
s.logger.Printf("[WARN] consul: unhandled LAN Serf Event: %#v", e) s.logger.Printf("[WARN] consul: unhandled LAN Serf Event: %#v", e)
@ -67,8 +74,15 @@ func (s *Server) localMemberEvent(me serf.MemberEvent) {
return return
} }
// Check if this is a reap event
isReap := me.EventType() == serf.EventMemberReap
// Queue the members for reconciliation // Queue the members for reconciliation
for _, m := range me.Members { for _, m := range me.Members {
// Change the status if this is a reap event
if isReap {
m.Status = StatusReap
}
select { select {
case s.reconcileCh <- m: case s.reconcileCh <- m:
default: default: