diff --git a/agent/agent_test.go b/agent/agent_test.go index 43660671dc..ef0be73b84 100644 --- a/agent/agent_test.go +++ b/agent/agent_test.go @@ -7,6 +7,7 @@ import ( "encoding/json" "fmt" "io/ioutil" + "math/rand" "net" "net/http" "net/http/httptest" @@ -584,7 +585,13 @@ func testAgent_AddServices_AliasUpdateCheckNotReverted(t *testing.T, extraHCL st services := make([]*structs.ServiceDefinition, numServices) checkIDs := make([]types.CheckID, numServices) - for i := 0; i < numServices; i++ { + services[0] = &structs.ServiceDefinition{ + ID: "fake", + Name: "fake", + Port: 8080, + Checks: []*structs.CheckType{}, + } + for i := 1; i < numServices; i++ { name := fmt.Sprintf("web-%d", i) services[i] = &structs.ServiceDefinition{ @@ -621,6 +628,152 @@ func testAgent_AddServices_AliasUpdateCheckNotReverted(t *testing.T, extraHCL st }) } +func test_createAlias(t *testing.T, agent *TestAgent, chk *structs.CheckType, expectedResult string) func(r *retry.R) { + t.Helper() + serviceNum := rand.Int() + srv := &structs.NodeService{ + Service: fmt.Sprintf("serviceAlias-%d", serviceNum), + Tags: []string{"tag1"}, + Port: 8900 + serviceNum, + } + if srv.ID == "" { + srv.ID = fmt.Sprintf("serviceAlias-%d", serviceNum) + } + chk.Status = api.HealthWarning + if chk.CheckID == "" { + chk.CheckID = types.CheckID(fmt.Sprintf("check-%d", serviceNum)) + } + err := agent.AddService(srv, []*structs.CheckType{chk}, false, "", ConfigSourceLocal) + assert.NoError(t, err) + return func(r *retry.R) { + t.Helper() + found := false + for _, c := range agent.State.CheckStates(structs.WildcardEnterpriseMeta()) { + if c.Check.CheckID == chk.CheckID { + found = true + assert.Equal(t, expectedResult, c.Check.Status, "Check state should be %s, was %s in %#v", expectedResult, c.Check.Status, c.Check) + var srvID structs.ServiceID + srvID.Init(srv.ID, structs.WildcardEnterpriseMeta()) + if err := agent.Agent.State.RemoveService(structs.ServiceID(srvID)); err != nil { + fmt.Println("[DEBUG] Fail to remove service", srvID, ", err:=", err) + } + fmt.Println("[DEBUG] Service Removed", srvID, ", err:=", err) + break + } + } + assert.True(t, found) + } +} + +// TestAgent_CheckAliasRPC test the Alias Check to be properly sync remotely +// and locally. +// It contains a few hacks such as unlockIndexOnNode because watch performed +// in CheckAlias.runQuery() waits for 1 min, so Shutdoww the agent might take time +// So, we ensure the agent will update regularilly the index +func TestAgent_CheckAliasRPC(t *testing.T) { + t.Helper() + + a := NewTestAgent(t, ` + node_name = "node1" + `) + + srv := &structs.NodeService{ + ID: "svcid1", + Service: "svcname1", + Tags: []string{"tag1"}, + Port: 8100, + } + unlockIndexOnNode := func() { + // We ensure to not block and update Agent's index + srv.Tags = []string{fmt.Sprintf("tag-%s", time.Now())} + assert.NoError(t, a.waitForUp()) + err := a.AddService(srv, []*structs.CheckType{}, false, "", ConfigSourceLocal) + assert.NoError(t, err) + } + shutdownAgent := func() { + // This is to be sure Alias Checks on remote won't be blocked during 1 min + unlockIndexOnNode() + fmt.Println("[DEBUG] STARTING shutdown for TestAgent_CheckAliasRPC", time.Now()) + go a.Shutdown() + unlockIndexOnNode() + fmt.Println("[DEBUG] DONE shutdown for TestAgent_CheckAliasRPC", time.Now()) + } + defer shutdownAgent() + testrpc.WaitForTestAgent(t, a.RPC, "dc1") + + assert.NoError(t, a.waitForUp()) + err := a.AddService(srv, []*structs.CheckType{}, false, "", ConfigSourceLocal) + assert.NoError(t, err) + + retry.Run(t, func(r *retry.R) { + t.Helper() + var args structs.NodeSpecificRequest + args.Datacenter = "dc1" + args.Node = "node1" + args.AllowStale = true + var out structs.IndexedNodeServices + err := a.RPC("Catalog.NodeServices", &args, &out) + assert.NoError(r, err) + foundService := false + var lookup structs.ServiceID + lookup.Init("svcid1", structs.WildcardEnterpriseMeta()) + for _, srv := range out.NodeServices.Services { + sid := srv.CompoundServiceID() + if lookup.Matches(&sid) { + foundService = true + } + } + assert.True(r, foundService, "could not find svcid1 in %#v", out.NodeServices.Services) + }) + + checks := make([](func(*retry.R)), 0) + + checks = append(checks, test_createAlias(t, a, &structs.CheckType{ + Name: "Check_Local_Ok", + AliasService: "svcid1", + }, api.HealthPassing)) + + checks = append(checks, test_createAlias(t, a, &structs.CheckType{ + Name: "Check_Local_Fail", + AliasService: "svcidNoExistingID", + }, api.HealthCritical)) + + checks = append(checks, test_createAlias(t, a, &structs.CheckType{ + Name: "Check_Remote_Host_Ok", + AliasNode: "node1", + AliasService: "svcid1", + }, api.HealthPassing)) + + checks = append(checks, test_createAlias(t, a, &structs.CheckType{ + Name: "Check_Remote_Host_Non_Existing_Service", + AliasNode: "node1", + AliasService: "svcidNoExistingID", + }, api.HealthCritical)) + + // We wait for max 5s for all checks to be in sync + { + for i := 0; i < 50; i++ { + unlockIndexOnNode() + allNonWarning := true + for _, chk := range a.State.Checks(structs.WildcardEnterpriseMeta()) { + if chk.Status == api.HealthWarning { + allNonWarning = false + } + } + if allNonWarning { + break + } else { + time.Sleep(100 * time.Millisecond) + } + } + } + + for _, toRun := range checks { + unlockIndexOnNode() + retry.Run(t, toRun) + } +} + func TestAgent_AddServiceNoExec(t *testing.T) { t.Run("normal", func(t *testing.T) { t.Parallel() diff --git a/agent/checks/alias.go b/agent/checks/alias.go index 2ecc39ebb2..0ef7c0c479 100644 --- a/agent/checks/alias.go +++ b/agent/checks/alias.go @@ -114,8 +114,9 @@ func (c *CheckAlias) runLocal(stopCh chan struct{}) { for _, chk := range checks { checksList = append(checksList, chk) } - - c.processChecks(checksList) + c.processChecks(checksList, func(serviceID *structs.ServiceID) bool { + return c.Notify.ServiceExists(*serviceID) + }) extendRefreshTimer() } @@ -134,12 +135,44 @@ func (c *CheckAlias) runLocal(stopCh chan struct{}) { } } +// CheckIfServiceIDExists is used to determine if a service exists +type CheckIfServiceIDExists func(*structs.ServiceID) bool + +func (c *CheckAlias) checkServiceExistsOnRemoteServer(serviceID *structs.ServiceID) (bool, error) { + args := c.RPCReq + args.Node = c.Node + args.AllowStale = true + args.EnterpriseMeta = c.EnterpriseMeta + // We are late at maximum of 15s compared to leader + args.MaxStaleDuration = time.Duration(15 * time.Second) + attempts := 0 +RETRY_CALL: + var out structs.IndexedNodeServices + attempts++ + if err := c.RPC.RPC("Catalog.NodeServices", &args, &out); err != nil { + if attempts <= 3 { + time.Sleep(time.Duration(attempts) * time.Second) + goto RETRY_CALL + } + return false, err + } + for _, srv := range out.NodeServices.Services { + sid := srv.CompoundServiceID() + if serviceID.Matches(&sid) { + return true, nil + } + } + return false, nil +} + func (c *CheckAlias) runQuery(stopCh chan struct{}) { args := c.RPCReq args.Node = c.Node args.AllowStale = true args.MaxQueryTime = 1 * time.Minute args.EnterpriseMeta = c.EnterpriseMeta + // We are late at maximum of 15s compared to leader + args.MaxStaleDuration = time.Duration(15 * time.Second) var attempt uint for { @@ -173,6 +206,7 @@ func (c *CheckAlias) runQuery(stopCh chan struct{}) { // but for blocking queries isn't that much more efficient since the checks // index is global to the cluster. var out structs.IndexedHealthChecks + if err := c.RPC.RPC("Health.NodeChecks", &args, &out); err != nil { attempt++ if attempt > 1 { @@ -195,29 +229,37 @@ func (c *CheckAlias) runQuery(stopCh chan struct{}) { if args.MinQueryIndex < 1 { args.MinQueryIndex = 1 } - - c.processChecks(out.HealthChecks) + c.processChecks(out.HealthChecks, func(serviceID *structs.ServiceID) bool { + ret, err := c.checkServiceExistsOnRemoteServer(serviceID) + if err != nil { + // We cannot determine if node has the check, let's assume it exists + return true + } + return ret + }) } } // processChecks is a common helper for taking a set of health checks and // using them to update our alias. This is abstracted since the checks can // come from both the remote server as well as local state. -func (c *CheckAlias) processChecks(checks []*structs.HealthCheck) { +func (c *CheckAlias) processChecks(checks []*structs.HealthCheck, CheckIfServiceIDExists CheckIfServiceIDExists) { health := api.HealthPassing msg := "No checks found." + serviceFound := false for _, chk := range checks { - if c.Node != "" && chk.Node != c.Node { + if c.Node != "" && c.Node != chk.Node { continue } - - // We allow ServiceID == "" so that we also check node checks sid := chk.CompoundServiceID() - - if chk.ServiceID != "" && !c.ServiceID.Matches(&sid) { + serviceMatch := c.ServiceID.Matches(&sid) + if chk.ServiceID != "" && !serviceMatch { continue } - + // We have at least one healthcheck for this service + if serviceMatch { + serviceFound = true + } if chk.Status == api.HealthCritical || chk.Status == api.HealthWarning { health = chk.Status msg = fmt.Sprintf("Aliased check %q failing: %s", chk.Name, chk.Output) @@ -228,13 +270,18 @@ func (c *CheckAlias) processChecks(checks []*structs.HealthCheck) { if chk.Status == api.HealthCritical { break } + } else { + // if current health is warning, don't overwrite it + if health == api.HealthPassing { + msg = "All checks passing." + } + } + } + if !serviceFound { + if !CheckIfServiceIDExists(&c.ServiceID) { + msg = fmt.Sprintf("Service %s could not be found on node %s", c.ServiceID.ID, c.Node) + health = api.HealthCritical } - - msg = "All checks passing." } - - // TODO(rb): if no matching checks found should this default to critical? - - // Update our check value c.Notify.UpdateCheck(c.CheckID, health, msg) } diff --git a/agent/checks/alias_test.go b/agent/checks/alias_test.go index 4992c098a8..24147b2c19 100644 --- a/agent/checks/alias_test.go +++ b/agent/checks/alias_test.go @@ -30,7 +30,7 @@ func TestCheckAlias_remoteErrBackoff(t *testing.T) { RPC: rpc, } - rpc.Reply.Store(fmt.Errorf("failure")) + rpc.AddReply("Health.NodeChecks", fmt.Errorf("failure")) chk.Start() defer chk.Stop() @@ -62,7 +62,7 @@ func TestCheckAlias_remoteNoChecks(t *testing.T) { RPC: rpc, } - rpc.Reply.Store(structs.IndexedHealthChecks{}) + rpc.AddReply("Health.NodeChecks", structs.IndexedHealthChecks{}) chk.Start() defer chk.Stop() @@ -88,7 +88,7 @@ func TestCheckAlias_remoteNodeFailure(t *testing.T) { RPC: rpc, } - rpc.Reply.Store(structs.IndexedHealthChecks{ + rpc.AddReply("Health.NodeChecks", structs.IndexedHealthChecks{ HealthChecks: []*structs.HealthCheck{ // Should ignore non-matching node &structs.HealthCheck{ @@ -137,7 +137,7 @@ func TestCheckAlias_remotePassing(t *testing.T) { RPC: rpc, } - rpc.Reply.Store(structs.IndexedHealthChecks{ + rpc.AddReply("Health.NodeChecks", structs.IndexedHealthChecks{ HealthChecks: []*structs.HealthCheck{ // Should ignore non-matching node &structs.HealthCheck{ @@ -171,6 +171,116 @@ func TestCheckAlias_remotePassing(t *testing.T) { }) } +// Remote service has no healtchecks, but service exists on remote host +func TestCheckAlias_remotePassingWithoutChecksButWithService(t *testing.T) { + t.Parallel() + + notify := newMockAliasNotify() + chkID := structs.NewCheckID("foo", nil) + rpc := &mockRPC{} + chk := &CheckAlias{ + Node: "remote", + ServiceID: structs.ServiceID{ID: "web"}, + CheckID: chkID, + Notify: notify, + RPC: rpc, + } + + rpc.AddReply("Health.NodeChecks", structs.IndexedHealthChecks{ + HealthChecks: []*structs.HealthCheck{ + // Should ignore non-matching node + &structs.HealthCheck{ + Node: "A", + ServiceID: "web", + Status: api.HealthCritical, + }, + + // Should ignore non-matching service + &structs.HealthCheck{ + Node: "remote", + ServiceID: "db", + Status: api.HealthCritical, + }, + }, + }) + + injected := structs.IndexedNodeServices{ + NodeServices: &structs.NodeServices{ + Node: &structs.Node{ + Node: "remote", + }, + Services: make(map[string]*structs.NodeService), + }, + QueryMeta: structs.QueryMeta{}, + } + injected.NodeServices.Services["web"] = &structs.NodeService{ + Service: "web", + ID: "web", + } + rpc.AddReply("Catalog.NodeServices", injected) + + chk.Start() + defer chk.Stop() + retry.Run(t, func(r *retry.R) { + if got, want := notify.State(chkID), api.HealthPassing; got != want { + r.Fatalf("got state %q want %q", got, want) + } + }) +} + +// Remote service has no healtchecks, service does not exists on remote host +func TestCheckAlias_remotePassingWithoutChecksAndWithoutService(t *testing.T) { + t.Parallel() + + notify := newMockAliasNotify() + chkID := structs.NewCheckID("foo", nil) + rpc := &mockRPC{} + chk := &CheckAlias{ + Node: "remote", + ServiceID: structs.ServiceID{ID: "web"}, + CheckID: chkID, + Notify: notify, + RPC: rpc, + } + + rpc.AddReply("Health.NodeChecks", structs.IndexedHealthChecks{ + HealthChecks: []*structs.HealthCheck{ + // Should ignore non-matching node + &structs.HealthCheck{ + Node: "A", + ServiceID: "web", + Status: api.HealthCritical, + }, + + // Should ignore non-matching service + &structs.HealthCheck{ + Node: "remote", + ServiceID: "db", + Status: api.HealthCritical, + }, + }, + }) + + injected := structs.IndexedNodeServices{ + NodeServices: &structs.NodeServices{ + Node: &structs.Node{ + Node: "remote", + }, + Services: make(map[string]*structs.NodeService), + }, + QueryMeta: structs.QueryMeta{}, + } + rpc.AddReply("Catalog.NodeServices", injected) + + chk.Start() + defer chk.Stop() + retry.Run(t, func(r *retry.R) { + if got, want := notify.State(chkID), api.HealthCritical; got != want { + r.Fatalf("got state %q want %q", got, want) + } + }) +} + // If any checks are critical, it should be critical func TestCheckAlias_remoteCritical(t *testing.T) { t.Parallel() @@ -186,7 +296,7 @@ func TestCheckAlias_remoteCritical(t *testing.T) { RPC: rpc, } - rpc.Reply.Store(structs.IndexedHealthChecks{ + rpc.AddReply("Health.NodeChecks", structs.IndexedHealthChecks{ HealthChecks: []*structs.HealthCheck{ // Should ignore non-matching node &structs.HealthCheck{ @@ -241,7 +351,7 @@ func TestCheckAlias_remoteWarning(t *testing.T) { RPC: rpc, } - rpc.Reply.Store(structs.IndexedHealthChecks{ + rpc.AddReply("Health.NodeChecks", structs.IndexedHealthChecks{ HealthChecks: []*structs.HealthCheck{ // Should ignore non-matching node &structs.HealthCheck{ @@ -295,7 +405,7 @@ func TestCheckAlias_remoteNodeOnlyPassing(t *testing.T) { RPC: rpc, } - rpc.Reply.Store(structs.IndexedHealthChecks{ + rpc.AddReply("Health.NodeChecks", structs.IndexedHealthChecks{ HealthChecks: []*structs.HealthCheck{ // Should ignore non-matching node &structs.HealthCheck{ @@ -342,7 +452,7 @@ func TestCheckAlias_remoteNodeOnlyCritical(t *testing.T) { RPC: rpc, } - rpc.Reply.Store(structs.IndexedHealthChecks{ + rpc.AddReply("Health.NodeChecks", structs.IndexedHealthChecks{ HealthChecks: []*structs.HealthCheck{ // Should ignore non-matching node &structs.HealthCheck{ @@ -403,9 +513,19 @@ type mockRPC struct { Calls uint32 // Read-only, number of RPC calls Args atomic.Value // Read-only, the last args sent - // Write-only, the reply to send. If of type "error" then an error will + // Write-only, the replies to send, indexed per method. If of type "error" then an error will // be returned from the RPC call. - Reply atomic.Value + Replies map[string]*atomic.Value +} + +func (m *mockRPC) AddReply(method string, reply interface{}) { + if m.Replies == nil { + m.Replies = make(map[string]*atomic.Value) + } + val := &atomic.Value{} + val.Store(reply) + m.Replies[method] = val + } func (m *mockRPC) RPC(method string, args interface{}, reply interface{}) error { @@ -424,12 +544,15 @@ func (m *mockRPC) RPC(method string, args interface{}, reply interface{}) error } replyv = replyv.Elem() // Get pointer value replyv.Set(reflect.Zero(replyv.Type())) // Reset to zero value - if v := m.Reply.Load(); v != nil { + repl := m.Replies[method] + if repl == nil { + return fmt.Errorf("No Such Method: %s", method) + } + if v := m.Replies[method].Load(); v != nil { // Return an error if the reply is an error type if err, ok := v.(error); ok { return err } - replyv.Set(reflect.ValueOf(v)) // Set to reply value if non-nil } @@ -442,6 +565,8 @@ func TestCheckAlias_localInitialStatus(t *testing.T) { t.Parallel() notify := newMockAliasNotify() + // We fake a local service web to ensure check if passing works + notify.Notify.AddServiceID(structs.ServiceID{ID: "web"}) chkID := structs.NewCheckID(types.CheckID("foo"), nil) rpc := &mockRPC{} chk := &CheckAlias{ @@ -463,3 +588,27 @@ func TestCheckAlias_localInitialStatus(t *testing.T) { } }) } + +// Local check on non-existing service +func TestCheckAlias_localInitialStatusShouldFailBecauseNoService(t *testing.T) { + t.Parallel() + + notify := newMockAliasNotify() + chkID := structs.NewCheckID(types.CheckID("foo"), nil) + rpc := &mockRPC{} + chk := &CheckAlias{ + ServiceID: structs.ServiceID{ID: "web"}, + CheckID: chkID, + Notify: notify, + RPC: rpc, + } + + chk.Start() + defer chk.Stop() + + retry.Run(t, func(r *retry.R) { + if got, want := notify.State(chkID), api.HealthCritical; got != want { + r.Fatalf("got state %q want %q", got, want) + } + }) +} diff --git a/agent/checks/check.go b/agent/checks/check.go index 9c04ca4233..1bebeffa98 100644 --- a/agent/checks/check.go +++ b/agent/checks/check.go @@ -52,6 +52,8 @@ type RPC interface { // should take care to be idempotent. type CheckNotifier interface { UpdateCheck(checkID structs.CheckID, status, output string) + // ServiceExists return true if the given service does exists + ServiceExists(serviceID structs.ServiceID) bool } // CheckMonitor is used to periodically invoke a script to diff --git a/agent/local/state.go b/agent/local/state.go index 460009a0d6..793cab3f0a 100644 --- a/agent/local/state.go +++ b/agent/local/state.go @@ -485,6 +485,13 @@ func (l *State) AddAliasCheck(checkID structs.CheckID, srcServiceID structs.Serv return nil } +// ServiceExists return true if the given service does exists +func (l *State) ServiceExists(serviceID structs.ServiceID) bool { + l.Lock() + defer l.Unlock() + return l.services[serviceID] != nil +} + // RemoveAliasCheck removes the mapping for the alias check. func (l *State) RemoveAliasCheck(checkID structs.CheckID, srcServiceID structs.ServiceID) { l.Lock() diff --git a/agent/local/state_test.go b/agent/local/state_test.go index f73757b071..872876d8de 100644 --- a/agent/local/state_test.go +++ b/agent/local/state_test.go @@ -48,7 +48,9 @@ func TestAgentAntiEntropy_Services(t *testing.T) { }, EnterpriseMeta: *structs.DefaultEnterpriseMeta(), } + assert.False(t, a.State.ServiceExists(structs.ServiceID{ID: srv1.ID})) a.State.AddService(srv1, "") + assert.True(t, a.State.ServiceExists(structs.ServiceID{ID: srv1.ID})) args.Service = srv1 if err := a.RPC("Catalog.Register", args, &out); err != nil { t.Fatalf("err: %v", err) diff --git a/agent/mock/notify.go b/agent/mock/notify.go index 866786c5a1..ccbee20c4c 100644 --- a/agent/mock/notify.go +++ b/agent/mock/notify.go @@ -14,19 +14,31 @@ type Notify struct { // of the notification mock in order to prevent panics // raised by the race conditions detector. sync.RWMutex - state map[structs.CheckID]string - updates map[structs.CheckID]int - output map[structs.CheckID]string + state map[structs.CheckID]string + updates map[structs.CheckID]int + output map[structs.CheckID]string + serviceIDs map[structs.ServiceID]bool } func NewNotify() *Notify { return &Notify{ - state: make(map[structs.CheckID]string), - updates: make(map[structs.CheckID]int), - output: make(map[structs.CheckID]string), + state: make(map[structs.CheckID]string), + updates: make(map[structs.CheckID]int), + output: make(map[structs.CheckID]string), + serviceIDs: make(map[structs.ServiceID]bool), } } +// ServiceExists mock +func (c *Notify) ServiceExists(serviceID structs.ServiceID) bool { + return c.serviceIDs[serviceID] +} + +// AddServiceID will mock a service being present locally +func (c *Notify) AddServiceID(serviceID structs.ServiceID) { + c.serviceIDs[serviceID] = true +} + func NewNotifyChan() (*Notify, chan int) { n := &Notify{ updated: make(chan int),