From 9f048afe299556471e7eb0d609b466354bdd12c9 Mon Sep 17 00:00:00 2001 From: Preetha Appan Date: Tue, 18 Jul 2017 14:09:19 -0500 Subject: [PATCH 1/3] Fix race condition between removing a service and adding a check for the same service, which was causing orphaned checks --- agent/agent_test.go | 67 +++++++++++++++++++++++++++++++++++++++++++++ agent/local.go | 17 ++++++++---- agent/local_test.go | 2 ++ 3 files changed, 80 insertions(+), 6 deletions(-) diff --git a/agent/agent_test.go b/agent/agent_test.go index c19ab98323..a477682435 100644 --- a/agent/agent_test.go +++ b/agent/agent_test.go @@ -14,6 +14,8 @@ import ( "testing" "time" + "sync" + "github.com/hashicorp/consul/agent/consul" "github.com/hashicorp/consul/agent/consul/structs" "github.com/hashicorp/consul/api" @@ -626,6 +628,71 @@ func TestAgent_RemoveServiceRemovesAllChecks(t *testing.T) { } } +func TestAgent_AddCheck_RemoveServiceRace(t *testing.T) { + t.Parallel() + cfg := TestConfig() + cfg.NodeName = "node1" + a := NewTestAgent(t.Name(), cfg) + defer a.Shutdown() + + // NOTE - trying a few times reproduces the race condition where a check can be added while a service is being deleted. 
+ // This test may return a false positive if Addcheck happens to execute before RemoveService + for i := 0; i < 50; i++ { + addServiceAndChecks(a, t) + //try removing the service and adding a check for that service in two different go routines + var wg sync.WaitGroup + wg.Add(2) + + chk1 := &structs.CheckType{CheckID: "chk1", Name: "chk1", TTL: time.Minute} + hchk1 := &structs.HealthCheck{Node: "node1", CheckID: "chk1", Name: "chk1", Status: "critical", ServiceID: "redis", ServiceName: "redis"} + + go func() { + a.AddCheck(hchk1, chk1, false, "") + wg.Done() + }() + + go func() { + a.RemoveService("redis", false) + wg.Done() + }() + + wg.Wait() + checks := a.state.Checks() + if len(checks) > 0 { + t.Fatalf("Expected checks map to be empty") + } + } + +} + +func addServiceAndChecks(a *TestAgent, t *testing.T) { + svc := &structs.NodeService{ID: "redis", Service: "redis", Port: 8000} + chk1 := &structs.CheckType{CheckID: "chk1", Name: "chk1", TTL: time.Minute} + chk2 := &structs.CheckType{CheckID: "chk2", Name: "chk2", TTL: 2 * time.Minute} + hchk1 := &structs.HealthCheck{Node: "node1", CheckID: "chk1", Name: "chk1", Status: "critical", ServiceID: "redis", ServiceName: "redis"} + hchk2 := &structs.HealthCheck{Node: "node1", CheckID: "chk2", Name: "chk2", Status: "critical", ServiceID: "redis", ServiceName: "redis"} + // register service with chk1 + if err := a.AddService(svc, []*structs.CheckType{chk1}, false, ""); err != nil { + t.Fatal("Failed to register service", err) + } + // verify chk1 exists + if a.state.Checks()["chk1"] == nil { + t.Fatal("Could not find health check chk1") + } + // update the service with chk2 + if err := a.AddService(svc, []*structs.CheckType{chk2}, false, ""); err != nil { + t.Fatal("Failed to update service", err) + } + // check that both checks are there + if got, want := a.state.Checks()["chk1"], hchk1; !verify.Values(t, "", got, want) { + t.FailNow() + } + if got, want := a.state.Checks()["chk2"], hchk2; !verify.Values(t, "", got, 
want) { + t.FailNow() + } + +} + func TestAgent_AddCheck(t *testing.T) { t.Parallel() cfg := TestConfig() diff --git a/agent/local.go b/agent/local.go index d71ca22116..e2cb8e37ad 100644 --- a/agent/local.go +++ b/agent/local.go @@ -253,12 +253,17 @@ func (l *localState) AddCheck(check *structs.HealthCheck, token string) { l.Lock() defer l.Unlock() - - l.checks[check.CheckID] = check - l.checkStatus[check.CheckID] = syncStatus{} - l.checkTokens[check.CheckID] = token - delete(l.checkCriticalTime, check.CheckID) - l.changeMade() + allow := true + if check.ServiceID != "" { //if there is a serviceID associated with the check, make sure it exists + _, allow = l.services[check.ServiceID] + } + if allow { + l.checks[check.CheckID] = check + l.checkStatus[check.CheckID] = syncStatus{} + l.checkTokens[check.CheckID] = token + delete(l.checkCriticalTime, check.CheckID) + l.changeMade() + } } // RemoveCheck is used to remove a health check from the local state. diff --git a/agent/local_test.go b/agent/local_test.go index de66480dfd..2a6217f3a9 100644 --- a/agent/local_test.go +++ b/agent/local_test.go @@ -1475,6 +1475,8 @@ func TestAgent_checkCriticalTime(t *testing.T) { cfg := TestConfig() l := NewLocalState(cfg, nil) + svc := &structs.NodeService{ID: "redis", Service: "redis", Port: 8000} + l.AddService(svc, "") // Add a passing check and make sure it's not critical. 
checkID := types.CheckID("redis:1") chk := &structs.HealthCheck{ From 6a257f242e9ee706806e8f1a0df5be80b9536036 Mon Sep 17 00:00:00 2001 From: Preetha Appan Date: Tue, 18 Jul 2017 16:06:37 -0500 Subject: [PATCH 2/3] Removed unit test, added clarifying comment and returned a friendlier error message similar to the one in agent's AddService method Fixes #3297 --- agent/agent.go | 5 +++- agent/agent_test.go | 67 --------------------------------------------- agent/local.go | 24 ++++++++-------- agent/local_test.go | 1 + 4 files changed, 18 insertions(+), 79 deletions(-) diff --git a/agent/agent.go b/agent/agent.go index 0ae753b1ce..c2529bb99f 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -1787,7 +1787,10 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType, } // Add to the local state for anti-entropy - a.state.AddCheck(check, token) + err := a.state.AddCheck(check, token) + if err != nil { + return err + } // Persist the check if persist && !a.config.DevMode { diff --git a/agent/agent_test.go b/agent/agent_test.go index a477682435..c19ab98323 100644 --- a/agent/agent_test.go +++ b/agent/agent_test.go @@ -14,8 +14,6 @@ import ( "testing" "time" - "sync" - "github.com/hashicorp/consul/agent/consul" "github.com/hashicorp/consul/agent/consul/structs" "github.com/hashicorp/consul/api" @@ -628,71 +626,6 @@ func TestAgent_RemoveServiceRemovesAllChecks(t *testing.T) { } } -func TestAgent_AddCheck_RemoveServiceRace(t *testing.T) { - t.Parallel() - cfg := TestConfig() - cfg.NodeName = "node1" - a := NewTestAgent(t.Name(), cfg) - defer a.Shutdown() - - // NOTE - trying a few times reproduces the race condition where a check can be added while a service is being deleted. 
- // This test may return a false positive if Addcheck happens to execute before RemoveService - for i := 0; i < 50; i++ { - addServiceAndChecks(a, t) - //try removing the service and adding a check for that service in two different go routines - var wg sync.WaitGroup - wg.Add(2) - - chk1 := &structs.CheckType{CheckID: "chk1", Name: "chk1", TTL: time.Minute} - hchk1 := &structs.HealthCheck{Node: "node1", CheckID: "chk1", Name: "chk1", Status: "critical", ServiceID: "redis", ServiceName: "redis"} - - go func() { - a.AddCheck(hchk1, chk1, false, "") - wg.Done() - }() - - go func() { - a.RemoveService("redis", false) - wg.Done() - }() - - wg.Wait() - checks := a.state.Checks() - if len(checks) > 0 { - t.Fatalf("Expected checks map to be empty") - } - } - -} - -func addServiceAndChecks(a *TestAgent, t *testing.T) { - svc := &structs.NodeService{ID: "redis", Service: "redis", Port: 8000} - chk1 := &structs.CheckType{CheckID: "chk1", Name: "chk1", TTL: time.Minute} - chk2 := &structs.CheckType{CheckID: "chk2", Name: "chk2", TTL: 2 * time.Minute} - hchk1 := &structs.HealthCheck{Node: "node1", CheckID: "chk1", Name: "chk1", Status: "critical", ServiceID: "redis", ServiceName: "redis"} - hchk2 := &structs.HealthCheck{Node: "node1", CheckID: "chk2", Name: "chk2", Status: "critical", ServiceID: "redis", ServiceName: "redis"} - // register service with chk1 - if err := a.AddService(svc, []*structs.CheckType{chk1}, false, ""); err != nil { - t.Fatal("Failed to register service", err) - } - // verify chk1 exists - if a.state.Checks()["chk1"] == nil { - t.Fatal("Could not find health check chk1") - } - // update the service with chk2 - if err := a.AddService(svc, []*structs.CheckType{chk2}, false, ""); err != nil { - t.Fatal("Failed to update service", err) - } - // check that both checks are there - if got, want := a.state.Checks()["chk1"], hchk1; !verify.Values(t, "", got, want) { - t.FailNow() - } - if got, want := a.state.Checks()["chk2"], hchk2; !verify.Values(t, "", got, 
want) { - t.FailNow() - } - -} - func TestAgent_AddCheck(t *testing.T) { t.Parallel() cfg := TestConfig() diff --git a/agent/local.go b/agent/local.go index e2cb8e37ad..decf7acddf 100644 --- a/agent/local.go +++ b/agent/local.go @@ -247,23 +247,25 @@ func (l *localState) checkToken(checkID types.CheckID) string { // AddCheck is used to add a health check to the local state. // This entry is persistent and the agent will make a best effort to // ensure it is registered -func (l *localState) AddCheck(check *structs.HealthCheck, token string) { +func (l *localState) AddCheck(check *structs.HealthCheck, token string) error { // Set the node name check.Node = l.config.NodeName l.Lock() defer l.Unlock() - allow := true - if check.ServiceID != "" { //if there is a serviceID associated with the check, make sure it exists - _, allow = l.services[check.ServiceID] - } - if allow { - l.checks[check.CheckID] = check - l.checkStatus[check.CheckID] = syncStatus{} - l.checkTokens[check.CheckID] = token - delete(l.checkCriticalTime, check.CheckID) - l.changeMade() + + // if there is a serviceID associated with the check, make sure it exists before adding it + // NOTE - This logic may be moved to be handled within the Agent's AddCheck method after a refactor + if check.ServiceID != "" && l.services[check.ServiceID] == nil { + return fmt.Errorf("ServiceID %q does not exist", check.ServiceID) } + + l.checks[check.CheckID] = check + l.checkStatus[check.CheckID] = syncStatus{} + l.checkTokens[check.CheckID] = token + delete(l.checkCriticalTime, check.CheckID) + l.changeMade() + return nil } // RemoveCheck is used to remove a health check from the local state. 
diff --git a/agent/local_test.go b/agent/local_test.go index 2a6217f3a9..e1ae506c02 100644 --- a/agent/local_test.go +++ b/agent/local_test.go @@ -1477,6 +1477,7 @@ func TestAgent_checkCriticalTime(t *testing.T) { svc := &structs.NodeService{ID: "redis", Service: "redis", Port: 8000} l.AddService(svc, "") + // Add a passing check and make sure it's not critical. checkID := types.CheckID("redis:1") chk := &structs.HealthCheck{ From e50f0e6722ec37de9333e9c417e06633f0441554 Mon Sep 17 00:00:00 2001 From: Preetha Appan Date: Tue, 18 Jul 2017 16:54:20 -0500 Subject: [PATCH 3/3] Clean up any watch monitors associated with a failed AddCheck --- agent/agent.go | 26 ++++++++++++++++---------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/agent/agent.go b/agent/agent.go index c2529bb99f..7b2fd0d471 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -1789,6 +1789,7 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType, // Add to the local state for anti-entropy err := a.state.AddCheck(check, token) if err != nil { + a.cancelCheckMonitors(check.CheckID) return err } @@ -1814,6 +1815,21 @@ func (a *Agent) RemoveCheck(checkID types.CheckID, persist bool) error { a.checkLock.Lock() defer a.checkLock.Unlock() + a.cancelCheckMonitors(checkID) + + if persist { + if err := a.purgeCheck(checkID); err != nil { + return err + } + if err := a.purgeCheckState(checkID); err != nil { + return err + } + } + a.logger.Printf("[DEBUG] agent: removed check %q", checkID) + return nil +} + +func (a *Agent) cancelCheckMonitors(checkID types.CheckID) { // Stop any monitors delete(a.checkReapAfter, checkID) if check, ok := a.checkMonitors[checkID]; ok { @@ -1836,16 +1852,6 @@ func (a *Agent) RemoveCheck(checkID types.CheckID, persist bool) error { check.Stop() delete(a.checkDockers, checkID) } - if persist { - if err := a.purgeCheck(checkID); err != nil { - return err - } - if err := a.purgeCheckState(checkID); err != nil { - return err - } - } - 
a.logger.Printf("[DEBUG] agent: removed check %q", checkID) - return nil } // updateTTLCheck is used to update the status of a TTL check via the Agent API.