Fixes an issue where servers would delete catalog information set by

the node when they were trying to reconcile a member.
This commit is contained in:
James Phillips 2017-03-23 15:01:46 -07:00
parent d428bc63c1
commit 3f1c4a6f44
No known key found for this signature in database
GPG Key ID: 77183E682AC5FC11
4 changed files with 132 additions and 2 deletions

View File

@ -450,11 +450,11 @@ func (s *Server) handleAliveMember(member serf.Member) error {
AFTER_CHECK:
s.logger.Printf("[INFO] consul: member '%s' joined, marking health alive", member.Name)
// Register with the catalog
// Register with the catalog.
req := structs.RegisterRequest{
Datacenter: s.config.Datacenter,
ID: types.NodeID(member.Tags["id"]),
Node: member.Name,
ID: types.NodeID(member.Tags["id"]),
Address: member.Addr.String(),
Service: service,
Check: &structs.HealthCheck{
@ -464,6 +464,10 @@ AFTER_CHECK:
Status: structs.HealthPassing,
Output: SerfCheckAliveOutput,
},
// If there's existing information about the node, do not
// clobber it.
SkipNodeUpdate: true,
}
_, err = s.raftApply(structs.RegisterRequestType, &req)
return err
@ -496,6 +500,7 @@ func (s *Server) handleFailedMember(member serf.Member) error {
req := structs.RegisterRequest{
Datacenter: s.config.Datacenter,
Node: member.Name,
ID: types.NodeID(member.Tags["id"]),
Address: member.Addr.String(),
Check: &structs.HealthCheck{
Node: member.Name,
@ -504,6 +509,10 @@ func (s *Server) handleFailedMember(member serf.Member) error {
Status: structs.HealthCritical,
Output: SerfCheckFailedOutput,
},
// If there's existing information about the node, do not
// clobber it.
SkipNodeUpdate: true,
}
_, err = s.raftApply(structs.RegisterRequestType, &req)
return err

View File

@ -354,6 +354,103 @@ func TestLeader_Reconcile(t *testing.T) {
})
}
func TestLeader_Reconcile_Races(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC, "dc1")
dir2, c1 := testClient(t)
defer os.RemoveAll(dir2)
defer c1.Shutdown()
addr := fmt.Sprintf("127.0.0.1:%d",
s1.config.SerfLANConfig.MemberlistConfig.BindPort)
if _, err := c1.JoinLAN([]string{addr}); err != nil {
t.Fatalf("err: %v", err)
}
// Wait for the server to reconcile the client and register it.
state := s1.fsm.State()
var nodeAddr string
testutil.WaitForResult(func() (bool, error) {
_, node, err := state.GetNode(c1.config.NodeName)
if err != nil {
t.Fatalf("err: %v", err)
}
if node != nil {
nodeAddr = node.Address
return true, nil
} else {
return false, nil
}
}, func(err error) {
t.Fatalf("client should be registered")
})
// Add in some metadata via the catalog (as if the agent synced it
// there). We also set the serfHealth check to failing so the reconile
// will attempt to flip it back
req := structs.RegisterRequest{
Datacenter: s1.config.Datacenter,
Node: c1.config.NodeName,
ID: c1.config.NodeID,
Address: nodeAddr,
NodeMeta: map[string]string{"hello": "world"},
Check: &structs.HealthCheck{
Node: c1.config.NodeName,
CheckID: SerfCheckID,
Name: SerfCheckName,
Status: structs.HealthCritical,
Output: "",
},
}
var out struct{}
if err := s1.RPC("Catalog.Register", &req, &out); err != nil {
t.Fatalf("err: %v", err)
}
// Force a reconcile and make sure the metadata stuck around.
if err := s1.reconcile(); err != nil {
t.Fatalf("err: %v", err)
}
_, node, err := state.GetNode(c1.config.NodeName)
if err != nil {
t.Fatalf("err: %v", err)
}
if node == nil {
t.Fatalf("bad")
}
if hello, ok := node.Meta["hello"]; !ok || hello != "world" {
t.Fatalf("bad")
}
// Fail the member and wait for the health to go critical.
c1.Shutdown()
testutil.WaitForResult(func() (bool, error) {
_, checks, err := state.NodeChecks(nil, c1.config.NodeName)
if err != nil {
t.Fatalf("err: %v", err)
}
return checks[0].Status == structs.HealthCritical, errors.New(checks[0].Status)
}, func(err error) {
t.Fatalf("check status is %v, should be critical", err)
})
// Make sure the metadata didn't get clobbered.
_, node, err = state.GetNode(c1.config.NodeName)
if err != nil {
t.Fatalf("err: %v", err)
}
if node == nil {
t.Fatalf("bad")
}
if hello, ok := node.Meta["hello"]; !ok || hello != "world" {
t.Fatalf("bad")
}
}
func TestLeader_LeftServer(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)

View File

@ -201,6 +201,14 @@ type RegisterRequest struct {
Service *NodeService
Check *HealthCheck
Checks HealthChecks
// SkipNodeUpdate can be used when a register request is intended for
// updating a service and/or checks, but doesn't want to overwrite any
// node information if the node is already registered. If the node
// doesn't exist, it will still be created, but if the node exists, any
// node portion of this update will not apply.
SkipNodeUpdate bool
WriteRequest
}
@ -217,6 +225,12 @@ func (r *RegisterRequest) ChangesNode(node *Node) bool {
return true
}
// If we've been asked to skip the node update, then say there are no
// changes.
if r.SkipNodeUpdate {
return false
}
// Check if any of the node-level fields are being changed.
if r.ID != node.ID ||
r.Node != node.Node ||

View File

@ -136,6 +136,16 @@ func TestStructs_RegisterRequest_ChangesNode(t *testing.T) {
t.Fatalf("should change")
}
req.SkipNodeUpdate = true
if req.ChangesNode(node) {
t.Fatalf("should skip")
}
req.SkipNodeUpdate = false
if !req.ChangesNode(node) {
t.Fatalf("should change")
}
restore()
if req.ChangesNode(node) {
t.Fatalf("should not change")