consul: Adding reconcilation to handle reaped Serf nodes. Fixes #15.

This commit is contained in:
Armon Dadgar 2014-04-03 15:51:03 -07:00
parent 38ae4711db
commit ed39e90e3a
2 changed files with 94 additions and 0 deletions

View File

@ -6,6 +6,7 @@ import (
"github.com/hashicorp/raft" "github.com/hashicorp/raft"
"github.com/hashicorp/serf/serf" "github.com/hashicorp/serf/serf"
"net" "net"
"strconv"
"time" "time"
) )
@ -102,10 +103,65 @@ WAIT:
func (s *Server) reconcile() (err error) { func (s *Server) reconcile() (err error) {
defer metrics.MeasureSince([]string{"consul", "leader", "reconcile"}, time.Now()) defer metrics.MeasureSince([]string{"consul", "leader", "reconcile"}, time.Now())
members := s.serfLAN.Members() members := s.serfLAN.Members()
knownMembers := make(map[string]struct{})
for _, member := range members { for _, member := range members {
if err := s.reconcileMember(member); err != nil { if err := s.reconcileMember(member); err != nil {
return err return err
} }
knownMembers[member.Name] = struct{}{}
}
// Reconcile any members that have been reaped while we were not the leader
return s.reconcileReaped(knownMembers)
}
// reconcileReaped is used to reconcile nodes that have failed and been reaped
// from Serf but remain in the catalog. This is done by looking for SerfCheckID
// in a crticial state that does not correspond to a known Serf member. We generate
// a "reap" event to cause the node to be cleaned up.
func (s *Server) reconcileReaped(known map[string]struct{}) error {
state := s.fsm.State()
_, critical := state.ChecksInState(structs.HealthCritical)
for _, check := range critical {
// Ignore any non serf checks
if check.CheckID != SerfCheckID {
continue
}
// Check if this node is "known" by serf
if _, ok := known[check.Node]; ok {
continue
}
// Create a fake member
member := serf.Member{
Name: check.Node,
Tags: map[string]string{
"dc": s.config.Datacenter,
"role": "node",
},
}
// Get the node services, look for ConsulServiceID
_, services := state.NodeServices(check.Node)
serverPort := 0
for _, service := range services.Services {
if service.ID == ConsulServiceID {
serverPort = service.Port
break
}
}
// Create the appropriate tags if this was a server node
if serverPort > 0 {
member.Tags["role"] = "consul"
member.Tags["port"] = strconv.FormatUint(uint64(serverPort), 10)
}
// Attempt to reap this member
if err := s.handleReapMember(member); err != nil {
return err
}
} }
return nil return nil
} }

View File

@ -208,6 +208,44 @@ func TestLeader_ReapMember(t *testing.T) {
} }
} }
func TestLeader_Reconcile_ReapMember(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
// Wait until we have a leader
time.Sleep(100 * time.Millisecond)
// Register a non-existing member
dead := structs.RegisterRequest{
Datacenter: s1.config.Datacenter,
Node: "no-longer-around",
Address: "127.1.1.1",
Check: &structs.HealthCheck{
Node: "no-longer-around",
CheckID: SerfCheckID,
Name: SerfCheckName,
Status: structs.HealthCritical,
},
}
var out struct{}
if err := s1.RPC("Catalog.Register", &dead, &out); err != nil {
t.Fatalf("err: %v", err)
}
// Force a reconciliation
if err := s1.reconcile(); err != nil {
t.Fatalf("err: %v", err)
}
// Node should be gone
state := s1.fsm.State()
_, found, _ := state.GetNode("no-longer-around")
if found {
t.Fatalf("client registered")
}
}
func TestLeader_Reconcile(t *testing.T) { func TestLeader_Reconcile(t *testing.T) {
dir1, s1 := testServer(t) dir1, s1 := testServer(t)
defer os.RemoveAll(dir1) defer os.RemoveAll(dir1)