From 632e4a2c692512817fe9e282388d353f39149ad7 Mon Sep 17 00:00:00 2001 From: Mitchell Hashimoto Date: Fri, 29 Jun 2018 18:15:48 -0700 Subject: [PATCH 01/18] agent/checks: add Alias check type --- agent/checks/alias.go | 135 +++++++++++++++++++ agent/checks/alias_test.go | 267 +++++++++++++++++++++++++++++++++++++ agent/checks/check.go | 7 + 3 files changed, 409 insertions(+) create mode 100644 agent/checks/alias.go create mode 100644 agent/checks/alias_test.go diff --git a/agent/checks/alias.go b/agent/checks/alias.go new file mode 100644 index 0000000000..3ee7d62171 --- /dev/null +++ b/agent/checks/alias.go @@ -0,0 +1,135 @@ +package checks + +import ( + "fmt" + "sync" + "time" + + "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/api" + "github.com/hashicorp/consul/types" +) + +// Constants related to alias check backoff. +const ( + checkAliasBackoffMin = 3 // 3 attempts before backing off + checkAliasBackoffMaxWait = 1 * time.Minute // maximum backoff wait time +) + +// CheckAlias is a check type that aliases the health of another service +// instance. If the service aliased has any critical health checks, then +// this check is critical. If the service has no critical but warnings, +// then this check is warning, and if a service has only passing checks, then +// this check is passing. +type CheckAlias struct { + Node string // Node name of the service. If empty, assumed to be this node. + ServiceID string // ID (not name) of the service to alias + + CheckID types.CheckID // ID of this check + RPC RPC // Used to query remote server if necessary + Notify CheckNotifier // For updating the check state + + stop bool + stopCh chan struct{} + stopLock sync.Mutex +} + +// Start is used to start a check ttl, runs until Stop() func (c *CheckAlias) Start() { +func (c *CheckAlias) Start() { + c.stopLock.Lock() + defer c.stopLock.Unlock() + c.stop = false + c.stopCh = make(chan struct{}) + go c.run(c.stopCh) +} + +// Stop is used to stop a check ttl. +func (c *CheckAlias) Stop() { + c.stopLock.Lock() + defer c.stopLock.Unlock() + if !c.stop { + c.stop = true + close(c.stopCh) + } +} + +// run is invoked in a goroutine until Stop() is called. +func (c *CheckAlias) run(stopCh chan struct{}) { + args := structs.NodeSpecificRequest{Node: c.Node} + args.AllowStale = true + args.MaxQueryTime = 1 * time.Minute + + var attempt uint + for { + // Check if we're stopped. We fallthrough and block otherwise, + // which has a maximum time set above so we'll always check for + // stop within a reasonable amount of time. + select { + case <-stopCh: + return + default: + } + + // Backoff if we have to + if attempt > checkAliasBackoffMin { + waitTime := (1 << (attempt - checkAliasBackoffMin)) * time.Second + if waitTime > checkAliasBackoffMaxWait { + waitTime = checkAliasBackoffMaxWait + } + time.Sleep(waitTime) + } + + // Get the current health checks for the specified node. + // + // NOTE(mitchellh): This currently returns ALL health checks for + // a node even though we also have the service ID. This can be + // optimized if we introduce a new RPC endpoint to filter both, + // but for blocking queries isn't that more efficient since the checks + // index is global to the cluster. + var out structs.IndexedHealthChecks + if err := c.RPC.RPC("Health.NodeChecks", &args, &out); err != nil { + attempt++ + continue + } + + attempt = 0 // Reset the attempts so we don't backoff the next + + // Set our index for the next request + args.MinQueryIndex = out.Index + + // We want to ensure that we're always blocking on subsequent requests + // to avoid hot loops. Index 1 is always safe since the min raft index + // is at least 5. Note this shouldn't happen but protecting against this + // case is safer than a 100% CPU loop. + if args.MinQueryIndex < 1 { + args.MinQueryIndex = 1 + } + + health := api.HealthPassing + msg := "All checks passing." + if len(out.HealthChecks) == 0 { + // No health checks means we're healthy by default + msg = "No checks found." + } + for _, chk := range out.HealthChecks { + if chk.ServiceID != c.ServiceID || chk.Node != c.Node { + continue + } + + if chk.Status == api.HealthCritical || chk.Status == api.HealthWarning { + health = chk.Status + msg = fmt.Sprintf("Aliased check %q failing: %s", chk.Name, chk.Output) + + // Critical checks exit the for loop immediately since we + // know that this is the health state. Warnings do not since + // there may still be a critical check. + if chk.Status == api.HealthCritical { + break + } + } + } + + // Update our check value + c.Notify.UpdateCheck(c.CheckID, health, msg) + } +} diff --git a/agent/checks/alias_test.go b/agent/checks/alias_test.go new file mode 100644 index 0000000000..27f089bb57 --- /dev/null +++ b/agent/checks/alias_test.go @@ -0,0 +1,267 @@ +package checks + +import ( + "fmt" + "reflect" + "sync/atomic" + "testing" + "time" + + "github.com/hashicorp/consul/agent/mock" + "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/api" + "github.com/hashicorp/consul/testutil/retry" + "github.com/hashicorp/consul/types" + //"github.com/stretchr/testify/require" +) + +// Test that we do a backoff on error. +func TestCheckAlias_remoteErrBackoff(t *testing.T) { + t.Parallel() + + notify := mock.NewNotify() + chkID := types.CheckID("foo") + rpc := &mockRPC{} + chk := &CheckAlias{ + Node: "remote", + ServiceID: "web", + CheckID: chkID, + Notify: notify, + RPC: rpc, + } + + rpc.Reply.Store(fmt.Errorf("failure")) + + chk.Start() + defer chk.Stop() + + time.Sleep(100 * time.Millisecond) + if got, want := atomic.LoadUint32(&rpc.Calls), uint32(6); got > want { + t.Fatalf("got %d updates want at most %d", got, want) + } +} + +// No remote health checks should result in passing on the check. +func TestCheckAlias_remoteNoChecks(t *testing.T) { + t.Parallel() + + notify := mock.NewNotify() + chkID := types.CheckID("foo") + rpc := &mockRPC{} + chk := &CheckAlias{ + Node: "remote", + ServiceID: "web", + CheckID: chkID, + Notify: notify, + RPC: rpc, + } + + rpc.Reply.Store(structs.IndexedHealthChecks{}) + + chk.Start() + defer chk.Stop() + retry.Run(t, func(r *retry.R) { + if got, want := notify.State(chkID), api.HealthPassing; got != want { + r.Fatalf("got state %q want %q", got, want) + } + }) +} + +// Only passing should result in passing +func TestCheckAlias_remotePassing(t *testing.T) { + t.Parallel() + + notify := mock.NewNotify() + chkID := types.CheckID("foo") + rpc := &mockRPC{} + chk := &CheckAlias{ + Node: "remote", + ServiceID: "web", + CheckID: chkID, + Notify: notify, + RPC: rpc, + } + + rpc.Reply.Store(structs.IndexedHealthChecks{ + HealthChecks: []*structs.HealthCheck{ + // Should ignore non-matching node + &structs.HealthCheck{ + Node: "A", + ServiceID: "web", + Status: api.HealthCritical, + }, + + // Should ignore non-matching service + &structs.HealthCheck{ + Node: "remote", + ServiceID: "db", + Status: api.HealthCritical, + }, + + // Match + &structs.HealthCheck{ + Node: "remote", + ServiceID: "web", + Status: api.HealthPassing, + }, + }, + }) + + chk.Start() + defer chk.Stop() + retry.Run(t, func(r *retry.R) { + if got, want := notify.State(chkID), api.HealthPassing; got != want { + r.Fatalf("got state %q want %q", got, want) + } + }) +} + +// If any checks are critical, it should be critical +func TestCheckAlias_remoteCritical(t *testing.T) { + t.Parallel() + + notify := mock.NewNotify() + chkID := types.CheckID("foo") + rpc := &mockRPC{} + chk := &CheckAlias{ + Node: "remote", + ServiceID: "web", + CheckID: chkID, + Notify: notify, + RPC: rpc, + } + + rpc.Reply.Store(structs.IndexedHealthChecks{ + HealthChecks: []*structs.HealthCheck{ + // Should ignore non-matching node + &structs.HealthCheck{ + Node: "A", + ServiceID: "web", + Status: api.HealthCritical, + }, + + // Should ignore non-matching service + &structs.HealthCheck{ + Node: "remote", + ServiceID: "db", + Status: api.HealthCritical, + }, + + // Match + &structs.HealthCheck{ + Node: "remote", + ServiceID: "web", + Status: api.HealthPassing, + }, + + &structs.HealthCheck{ + Node: "remote", + ServiceID: "web", + Status: api.HealthCritical, + }, + }, + }) + + chk.Start() + defer chk.Stop() + retry.Run(t, func(r *retry.R) { + if got, want := notify.State(chkID), api.HealthCritical; got != want { + r.Fatalf("got state %q want %q", got, want) + } + }) +} + +// If no checks are critical and at least one is warning, then it should warn +func TestCheckAlias_remoteWarning(t *testing.T) { + t.Parallel() + + notify := mock.NewNotify() + chkID := types.CheckID("foo") + rpc := &mockRPC{} + chk := &CheckAlias{ + Node: "remote", + ServiceID: "web", + CheckID: chkID, + Notify: notify, + RPC: rpc, + } + + rpc.Reply.Store(structs.IndexedHealthChecks{ + HealthChecks: []*structs.HealthCheck{ + // Should ignore non-matching node + &structs.HealthCheck{ + Node: "A", + ServiceID: "web", + Status: api.HealthCritical, + }, + + // Should ignore non-matching service + &structs.HealthCheck{ + Node: "remote", + ServiceID: "db", + Status: api.HealthCritical, + }, + + // Match + &structs.HealthCheck{ + Node: "remote", + ServiceID: "web", + Status: api.HealthPassing, + }, + + &structs.HealthCheck{ + Node: "remote", + ServiceID: "web", + Status: api.HealthWarning, + }, + }, + }) + + chk.Start() + defer chk.Stop() + retry.Run(t, func(r *retry.R) { + if got, want := notify.State(chkID), api.HealthWarning; got != want { + r.Fatalf("got state %q want %q", got, want) + } + }) +} + +// mockRPC is an implementation of RPC that can be used for tests. The +// atomic.Value fields can be set concurrently and will reflect on the next +// RPC call. +type mockRPC struct { + Calls uint32 // Read-only, number of RPC calls + Args atomic.Value // Read-only, the last args sent + + // Write-only, the reply to send. If of type "error" then an error will + // be returned from the RPC call. + Reply atomic.Value +} + +func (m *mockRPC) RPC(method string, args interface{}, reply interface{}) error { + atomic.AddUint32(&m.Calls, 1) + m.Args.Store(args) + + // We don't adhere to blocking queries, so this helps prevent + // too much CPU usage on the check loop. + time.Sleep(10 * time.Millisecond) + + // This whole machinery below sets the value of the reply. This is + // basically what net/rpc does internally, though much condensed. + replyv := reflect.ValueOf(reply) + if replyv.Kind() != reflect.Ptr { + return fmt.Errorf("RPC reply must be pointer") + } + replyv = replyv.Elem() // Get pointer value + replyv.Set(reflect.Zero(replyv.Type())) // Reset to zero value + if v := m.Reply.Load(); v != nil { + // Return an error if the reply is an error type + if err, ok := v.(error); ok { + return err + } + + replyv.Set(reflect.ValueOf(v)) // Set to reply value if non-nil + } + + return nil +} diff --git a/agent/checks/check.go b/agent/checks/check.go index bc41206f08..b09bdc8be1 100644 --- a/agent/checks/check.go +++ b/agent/checks/check.go @@ -38,6 +38,13 @@ const ( UserAgent = "Consul Health Check" ) +// RPC is an interface that an RPC client must implement. This is a helper +// interface that is implemented by the agent delegate for checks that need +// to make RPC calls. +type RPC interface { + RPC(method string, args interface{}, reply interface{}) error +} + // CheckNotifier interface is used by the CheckMonitor // to notify when a check has a status update. The update // should take care to be idempotent. From f0658a0ede454bb23a5421dbb7d1cab701e13734 Mon Sep 17 00:00:00 2001 From: Mitchell Hashimoto Date: Fri, 29 Jun 2018 23:09:58 -0700 Subject: [PATCH 02/18] agent/config: support configuring alias check --- agent/config/builder.go | 2 ++ agent/config/config.go | 2 ++ agent/config/runtime_test.go | 22 ++++++++++++++++++++++ agent/structs/check_definition.go | 4 ++++ agent/structs/check_type.go | 19 ++++++++++++++++--- 5 files changed, 46 insertions(+), 3 deletions(-) diff --git a/agent/config/builder.go b/agent/config/builder.go index e7958056e1..940e6f2074 100644 --- a/agent/config/builder.go +++ b/agent/config/builder.go @@ -1046,6 +1046,8 @@ func (b *Builder) checkVal(v *CheckDefinition) *structs.CheckDefinition { GRPC: b.stringVal(v.GRPC), GRPCUseTLS: b.boolVal(v.GRPCUseTLS), TLSSkipVerify: b.boolVal(v.TLSSkipVerify), + AliasNode: b.stringVal(v.AliasNode), + AliasService: b.stringVal(v.AliasService), Timeout: b.durationVal(fmt.Sprintf("check[%s].timeout", id), v.Timeout), TTL: b.durationVal(fmt.Sprintf("check[%s].ttl", id), v.TTL), DeregisterCriticalServiceAfter: b.durationVal(fmt.Sprintf("check[%s].deregister_critical_service_after", id), v.DeregisterCriticalServiceAfter), diff --git a/agent/config/config.go b/agent/config/config.go index ab81c2f718..c54090b3f7 100644 --- a/agent/config/config.go +++ b/agent/config/config.go @@ -348,6 +348,8 @@ type CheckDefinition struct { GRPC *string `json:"grpc,omitempty" hcl:"grpc" mapstructure:"grpc"` GRPCUseTLS *bool `json:"grpc_use_tls,omitempty" hcl:"grpc_use_tls" mapstructure:"grpc_use_tls"` TLSSkipVerify *bool `json:"tls_skip_verify,omitempty" hcl:"tls_skip_verify" mapstructure:"tls_skip_verify"` + AliasNode *string `json:"alias_node,omitempty" hcl:"alias_node" mapstructure:"alias_node"` + AliasService *string `json:"alias_service,omitempty" hcl:"alias_service" mapstructure:"alias_service"` Timeout *string `json:"timeout,omitempty" hcl:"timeout" mapstructure:"timeout"` TTL *string `json:"ttl,omitempty" hcl:"ttl" mapstructure:"ttl"` DeregisterCriticalServiceAfter *string `json:"deregister_critical_service_after,omitempty" hcl:"deregister_critical_service_after" mapstructure:"deregister_critical_service_after"` diff --git a/agent/config/runtime_test.go b/agent/config/runtime_test.go index 4df0bd5a09..1be66cc4bb 100644 --- a/agent/config/runtime_test.go +++ b/agent/config/runtime_test.go @@ -1941,6 +1941,24 @@ func TestConfigFlagsAndEdgecases(t *testing.T) { rt.DataDir = dataDir }, }, + { + desc: "alias check with no node", + args: []string{ + `-data-dir=` + dataDir, + }, + json: []string{ + `{ "check": { "name": "a", "alias_service": "foo" } }`, + }, + hcl: []string{ + `check = { name = "a", alias_service = "foo" }`, + }, + patch: func(rt *RuntimeConfig) { + rt.Checks = []*structs.CheckDefinition{ + &structs.CheckDefinition{Name: "a", AliasService: "foo"}, + } + rt.DataDir = dataDir + }, + }, { desc: "multiple service files", args: []string{ @@ -4271,6 +4289,8 @@ func TestSanitize(t *testing.T) { "CheckUpdateInterval": "0s", "Checks": [ { + "AliasNode": "", + "AliasService": "", "DeregisterCriticalServiceAfter": "0s", "DockerContainerID": "", "GRPC": "", @@ -4417,6 +4437,8 @@ func TestSanitize(t *testing.T) { { "Address": "", "Check": { + "AliasNode": "", + "AliasService": "", "CheckID": "", "DeregisterCriticalServiceAfter": "0s", "DockerContainerID": "", diff --git a/agent/structs/check_definition.go b/agent/structs/check_definition.go index 0ee6c20495..42e9692fce 100644 --- a/agent/structs/check_definition.go +++ b/agent/structs/check_definition.go @@ -32,6 +32,8 @@ type CheckDefinition struct { GRPC string GRPCUseTLS bool TLSSkipVerify bool + AliasNode string + AliasService string Timeout time.Duration TTL time.Duration DeregisterCriticalServiceAfter time.Duration @@ -63,6 +65,8 @@ func (c *CheckDefinition) CheckType() *CheckType { Notes: c.Notes, ScriptArgs: c.ScriptArgs, + AliasNode: c.AliasNode, + AliasService: c.AliasService, HTTP: c.HTTP, GRPC: c.GRPC, GRPCUseTLS: c.GRPCUseTLS, diff --git a/agent/structs/check_type.go b/agent/structs/check_type.go index 23a6830777..8586e77263 100644 --- a/agent/structs/check_type.go +++ b/agent/structs/check_type.go @@ -9,10 +9,10 @@ import ( ) // CheckType is used to create either the CheckMonitor or the CheckTTL. -// Six types are supported: Script, HTTP, TCP, Docker, TTL and GRPC. Script, +// The following types are supported: Script, HTTP, TCP, Docker, TTL, GRPC, Alias. Script, // HTTP, Docker, TCP and GRPC all require Interval. Only one of the types may // to be provided: TTL or Script/Interval or HTTP/Interval or TCP/Interval or -// Docker/Interval or GRPC/Interval. +// Docker/Interval or GRPC/Interval or AliasService. type CheckType struct { // fields already embedded in CheckDefinition // Note: CheckType.CheckID == CheckDefinition.ID @@ -31,6 +31,8 @@ type CheckType struct { Method string TCP string Interval time.Duration + AliasNode string + AliasService string DockerContainerID string Shell string GRPC string @@ -56,7 +58,13 @@ func (c *CheckType) Validate() error { if intervalCheck && c.Interval <= 0 { return fmt.Errorf("Interval must be > 0 for Script, HTTP, or TCP checks") } - if !intervalCheck && c.TTL <= 0 { + if intervalCheck && c.IsAlias() { + return fmt.Errorf("Interval cannot be set for Alias checks") + } + if c.IsAlias() && c.TTL > 0 { + return fmt.Errorf("TTL must be not be set for Alias checks") + } + if !intervalCheck && !c.IsAlias() && c.TTL <= 0 { return fmt.Errorf("TTL must be > 0 for TTL checks") } return nil @@ -67,6 +75,11 @@ func (c *CheckType) Empty() bool { return reflect.DeepEqual(c, &CheckType{}) } +// IsAlias checks if this is an alias check. +func (c *CheckType) IsAlias() bool { + return c.AliasService != "" +} + // IsScript checks if this is a check that execs some kind of script. func (c *CheckType) IsScript() bool { return len(c.ScriptArgs) > 0 From 60c75b88dac5dde924682fbf791ed8d8f995b104 Mon Sep 17 00:00:00 2001 From: Mitchell Hashimoto Date: Sat, 30 Jun 2018 06:38:45 -0700 Subject: [PATCH 03/18] agent/checks: reflect node failure as alias check failure --- agent/checks/alias.go | 17 +++++++++---- agent/checks/alias_test.go | 49 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 61 insertions(+), 5 deletions(-) diff --git a/agent/checks/alias.go b/agent/checks/alias.go index 3ee7d62171..1b02842d93 100644 --- a/agent/checks/alias.go +++ b/agent/checks/alias.go @@ -25,9 +25,10 @@ type CheckAlias struct { Node string // Node name of the service. If empty, assumed to be this node. ServiceID string // ID (not name) of the service to alias - CheckID types.CheckID // ID of this check - RPC RPC // Used to query remote server if necessary - Notify CheckNotifier // For updating the check state + CheckID types.CheckID // ID of this check + RPC RPC // Used to query remote server if necessary + RPCReq structs.NodeSpecificRequest // Base request + Notify CheckNotifier // For updating the check state stop bool stopCh chan struct{} @@ -55,7 +56,8 @@ func (c *CheckAlias) Stop() { // run is invoked in a goroutine until Stop() is called. func (c *CheckAlias) run(stopCh chan struct{}) { - args := structs.NodeSpecificRequest{Node: c.Node} + args := c.RPCReq + args.Node = c.Node args.AllowStale = true args.MaxQueryTime = 1 * time.Minute @@ -112,7 +114,12 @@ func (c *CheckAlias) run(stopCh chan struct{}) { msg = "No checks found." } for _, chk := range out.HealthChecks { - if chk.ServiceID != c.ServiceID || chk.Node != c.Node { + if chk.Node != c.Node { + continue + } + + // We allow ServiceID == "" so that we also check node checks + if chk.ServiceID != "" && chk.ServiceID != c.ServiceID { continue } diff --git a/agent/checks/alias_test.go b/agent/checks/alias_test.go index 27f089bb57..431cf116a3 100644 --- a/agent/checks/alias_test.go +++ b/agent/checks/alias_test.go @@ -67,6 +67,55 @@ func TestCheckAlias_remoteNoChecks(t *testing.T) { }) } +// If the node is critical then the check is critical +func TestCheckAlias_remoteNodeFailure(t *testing.T) { + t.Parallel() + + notify := mock.NewNotify() + chkID := types.CheckID("foo") + rpc := &mockRPC{} + chk := &CheckAlias{ + Node: "remote", + ServiceID: "web", + CheckID: chkID, + Notify: notify, + RPC: rpc, + } + + rpc.Reply.Store(structs.IndexedHealthChecks{ + HealthChecks: []*structs.HealthCheck{ + // Should ignore non-matching node + &structs.HealthCheck{ + Node: "A", + ServiceID: "web", + Status: api.HealthCritical, + }, + + // Node failure + &structs.HealthCheck{ + Node: "remote", + ServiceID: "", + Status: api.HealthCritical, + }, + + // Match + &structs.HealthCheck{ + Node: "remote", + ServiceID: "web", + Status: api.HealthPassing, + }, + }, + }) + + chk.Start() + defer chk.Stop() + retry.Run(t, func(r *retry.R) { + if got, want := notify.State(chkID), api.HealthCritical; got != want { + r.Fatalf("got state %q want %q", got, want) + } + }) +} + // Only passing should result in passing func TestCheckAlias_remotePassing(t *testing.T) { t.Parallel() From 4a67beb734f8be826f66e0c9520bf7f4edbb9604 Mon Sep 17 00:00:00 2001 From: Mitchell Hashimoto Date: Sat, 30 Jun 2018 06:38:56 -0700 Subject: [PATCH 04/18] agent: run alias checks --- agent/agent.go | 33 +++++++++++++++++++++++++++++++++ agent/agent_test.go | 36 ++++++++++++++++++++++++++++++++++++ 2 files changed, 69 insertions(+) diff --git a/agent/agent.go b/agent/agent.go index 13c55b1567..30203224dc 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -151,6 +151,9 @@ type Agent struct { // checkDockers maps the check ID to an associated Docker Exec based check checkDockers map[types.CheckID]*checks.CheckDocker + // checkAliases maps the check ID to an associated Alias checks + checkAliases map[types.CheckID]*checks.CheckAlias + // checkLock protects updates to the check* maps checkLock sync.Mutex @@ -232,6 +235,7 @@ func New(c *config.RuntimeConfig) (*Agent, error) { checkTCPs: make(map[types.CheckID]*checks.CheckTCP), checkGRPCs: make(map[types.CheckID]*checks.CheckGRPC), checkDockers: make(map[types.CheckID]*checks.CheckDocker), + checkAliases: make(map[types.CheckID]*checks.CheckAlias), eventCh: make(chan serf.UserEvent, 1024), eventBuf: make([]*UserEvent, 256), joinLANNotifier: &systemd.Notifier{}, @@ -1329,6 +1333,9 @@ func (a *Agent) ShutdownAgent() error { for _, chk := range a.checkDockers { chk.Stop() } + for _, chk := range a.checkAliases { + chk.Stop() + } // Stop the proxy manager if a.proxyManager != nil { @@ -2017,6 +2024,32 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType, monitor.Start() a.checkMonitors[check.CheckID] = monitor + case chkType.IsAlias(): + if existing, ok := a.checkAliases[check.CheckID]; ok { + existing.Stop() + delete(a.checkAliases, check.CheckID) + } + if chkType.Interval < checks.MinInterval { + a.logger.Printf("[WARN] agent: check '%s' has interval below minimum of %v", + check.CheckID, checks.MinInterval) + chkType.Interval = checks.MinInterval + } + + var rpcReq structs.NodeSpecificRequest + rpcReq.Datacenter = a.config.Datacenter + rpcReq.Token = a.tokens.AgentToken() + + chkImpl := &checks.CheckAlias{ + Notify: a.State, + RPC: a.delegate, + RPCReq: rpcReq, + CheckID: check.CheckID, + Node: chkType.AliasNode, + ServiceID: chkType.AliasService, + } + chkImpl.Start() + a.checkAliases[check.CheckID] = chkImpl + default: return fmt.Errorf("Check type is not valid") } diff --git a/agent/agent_test.go b/agent/agent_test.go index f22fc88ff7..702592eb44 100644 --- a/agent/agent_test.go +++ b/agent/agent_test.go @@ -935,6 +935,42 @@ func TestAgent_AddCheck_GRPC(t *testing.T) { } } +func TestAgent_AddCheck_Alias(t *testing.T) { + t.Parallel() + a := NewTestAgent(t.Name(), "") + defer a.Shutdown() + + health := &structs.HealthCheck{ + Node: "foo", + CheckID: "aliashealth", + Name: "Alias health check", + Status: api.HealthCritical, + } + chk := &structs.CheckType{ + AliasService: "foo", + } + err := a.AddCheck(health, chk, false, "") + if err != nil { + t.Fatalf("err: %v", err) + } + + // Ensure we have a check mapping + sChk, ok := a.State.Checks()["aliashealth"] + if !ok { + t.Fatalf("missing aliashealth check") + } + + // Ensure our check is in the right state + if sChk.Status != api.HealthCritical { + t.Fatalf("check not critical") + } + + // Ensure a check is setup + if _, ok := a.checkAliases["aliashealth"]; !ok { + t.Fatalf("missing aliashealth check") + } +} + func TestAgent_RemoveCheck(t *testing.T) { t.Parallel() a := NewTestAgent(t.Name(), ` From 7543d270e207ee3dcbc64fce97b95e2d571d5eca Mon Sep 17 00:00:00 2001 From: Mitchell Hashimoto Date: Sat, 30 Jun 2018 07:23:47 -0700 Subject: [PATCH 05/18] agent/local: support local alias checks --- agent/local/state.go | 54 +++++++++++++++++++++++++++++++++++-- agent/local/state_test.go | 56 +++++++++++++++++++++++++++++++++++++-- 2 files changed, 106 insertions(+), 4 deletions(-) diff --git a/agent/local/state.go b/agent/local/state.go index 1caca1f3d2..6f90353565 100644 --- a/agent/local/state.go +++ b/agent/local/state.go @@ -170,8 +170,9 @@ type State struct { // Services tracks the local services services map[string]*ServiceState - // Checks tracks the local checks - checks map[types.CheckID]*CheckState + // Checks tracks the local checks. checkAliases are aliased checks. + checks map[types.CheckID]*CheckState + checkAliases map[string]map[types.CheckID]chan<- struct{} // metadata tracks the node metadata fields metadata map[string]string @@ -205,6 +206,7 @@ func NewState(c Config, lg *log.Logger, tokens *token.Store) *State { logger: lg, services: make(map[string]*ServiceState), checks: make(map[types.CheckID]*CheckState), + checkAliases: make(map[string]map[types.CheckID]chan<- struct{}), metadata: make(map[string]string), tokens: tokens, managedProxies: make(map[string]*ManagedProxy), @@ -406,6 +408,40 @@ func (l *State) AddCheck(check *structs.HealthCheck, token string) error { return nil } +// AddAliasCheck creates an alias check. When any check for the srcServiceID +// is changed, checkID will reflect that using the same semantics as +// checks.CheckAlias. +// +// This is a local optimization so that the Alias check doesn't need to +// use blocking queries against the remote server for check updates for +// local services. +func (l *State) AddAliasCheck(checkID types.CheckID, srcServiceID string, notifyCh chan<- struct{}) error { + l.Lock() + defer l.Unlock() + + m, ok := l.checkAliases[srcServiceID] + if !ok { + m = make(map[types.CheckID]chan<- struct{}) + l.checkAliases[srcServiceID] = m + } + m[checkID] = notifyCh + + return nil +} + +// RemoveAliasCheck removes the mapping for the alias check. +func (l *State) RemoveAliasCheck(checkID types.CheckID, srcServiceID string) { + l.Lock() + defer l.Unlock() + + if m, ok := l.checkAliases[srcServiceID]; ok { + delete(m, checkID) + if len(m) == 0 { + delete(l.checkAliases, srcServiceID) + } + } +} + // RemoveCheck is used to remove a health check from the local state. // The agent will make a best effort to ensure it is deregistered // todo(fs): RemoveService returns an error for a non-existant service. RemoveCheck should as well. @@ -486,6 +522,20 @@ func (l *State) UpdateCheck(id types.CheckID, status, output string) { return } + // If this is a check for an aliased service, then notify the waiters. + if aliases, ok := l.checkAliases[c.Check.ServiceID]; ok && len(aliases) > 0 { + for _, notifyCh := range aliases { + // Do not block. All notify channels should be buffered to at + // least 1 in which case not-blocking does not result in loss + // of data because a failed send means a notification is + // already queued. + select { + case notifyCh <- struct{}{}: + default: + } + } + } + // Update status and mark out of sync c.Check.Status = status c.Check.Output = output diff --git a/agent/local/state_test.go b/agent/local/state_test.go index 5c66a77d04..4a0e8f21fb 100644 --- a/agent/local/state_test.go +++ b/agent/local/state_test.go @@ -11,8 +11,6 @@ import ( "github.com/hashicorp/go-memdb" - "github.com/stretchr/testify/require" - "github.com/hashicorp/consul/agent" "github.com/hashicorp/consul/agent/config" "github.com/hashicorp/consul/agent/local" @@ -23,6 +21,7 @@ import ( "github.com/hashicorp/consul/types" "github.com/pascaldekloe/goe/verify" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestAgentAntiEntropy_Services(t *testing.T) { @@ -1606,6 +1605,59 @@ func TestAgent_AddCheckFailure(t *testing.T) { } } +func TestAgent_AliasCheck(t *testing.T) { + t.Parallel() + + require := require.New(t) + cfg := config.DefaultRuntimeConfig(`bind_addr = "127.0.0.1" data_dir = "dummy"`) + l := local.NewState(agent.LocalConfig(cfg), nil, new(token.Store)) + l.TriggerSyncChanges = func() {} + + // Add checks + require.NoError(l.AddService(&structs.NodeService{Service: "s1"}, "")) + require.NoError(l.AddService(&structs.NodeService{Service: "s2"}, "")) + require.NoError(l.AddCheck(&structs.HealthCheck{CheckID: types.CheckID("c1"), ServiceID: "s1"}, "")) + require.NoError(l.AddCheck(&structs.HealthCheck{CheckID: types.CheckID("c2"), ServiceID: "s2"}, "")) + + // Add an alias + notifyCh := make(chan struct{}, 1) + require.NoError(l.AddAliasCheck(types.CheckID("a1"), "s1", notifyCh)) + + // Update and verify we get notified + l.UpdateCheck(types.CheckID("c1"), api.HealthCritical, "") + select { + case <-notifyCh: + case <-time.After(100 * time.Millisecond): + t.Fatal("notify not received") + } + + // Update again and verify we do not get notified + l.UpdateCheck(types.CheckID("c1"), api.HealthCritical, "") + select { + case <-notifyCh: + t.Fatal("notify received") + + case <-time.After(50 * time.Millisecond): + } + + // Update other check and verify we do not get notified + l.UpdateCheck(types.CheckID("c2"), api.HealthCritical, "") + select { + case <-notifyCh: + t.Fatal("notify received") + + case <-time.After(50 * time.Millisecond): + } + + // Update change and verify we get notified + l.UpdateCheck(types.CheckID("c1"), api.HealthPassing, "") + select { + case <-notifyCh: + case <-time.After(100 * time.Millisecond): + t.Fatal("notify not received") + } +} + func TestAgent_sendCoordinate(t *testing.T) { t.Parallel() a := agent.NewTestAgent(t.Name(), ` From e9914ee71c80ddaa4ce451e5e46fcdac94d4edb0 Mon Sep 17 00:00:00 2001 From: Mitchell Hashimoto Date: Sat, 30 Jun 2018 07:25:57 -0700 Subject: [PATCH 06/18] agent/checks: use local state for local services --- agent/checks/alias.go | 115 ++++++++++++++++++++++++++----------- agent/checks/alias_test.go | 33 +++++++++-- 2 files changed, 110 insertions(+), 38 deletions(-) diff --git a/agent/checks/alias.go b/agent/checks/alias.go index 1b02842d93..72963adb53 100644 --- a/agent/checks/alias.go +++ b/agent/checks/alias.go @@ -28,13 +28,24 @@ type CheckAlias struct { CheckID types.CheckID // ID of this check RPC RPC // Used to query remote server if necessary RPCReq structs.NodeSpecificRequest // Base request - Notify CheckNotifier // For updating the check state + Notify AliasNotifier // For updating the check state stop bool stopCh chan struct{} stopLock sync.Mutex } +// AliasNotifier is a CheckNotifier specifically for the Alias check. +// This requires additional methods that are satisfied by the agent +// local state. +type AliasNotifier interface { + CheckNotifier + + AddAliasCheck(types.CheckID, string, chan<- struct{}) error + RemoveAliasCheck(types.CheckID, string) + Checks() map[types.CheckID]*structs.HealthCheck +} + // Start is used to start a check ttl, runs until Stop() func (c *CheckAlias) Start() { func (c *CheckAlias) Start() { c.stopLock.Lock() @@ -56,6 +67,41 @@ func (c *CheckAlias) Stop() { // run is invoked in a goroutine until Stop() is called. func (c *CheckAlias) run(stopCh chan struct{}) { + // If we have a specific node set, then use a blocking query + if c.Node != "" { + c.runQuery(stopCh) + return + } + + // Use the local state to match the service. + c.runLocal(stopCh) +} + +func (c *CheckAlias) runLocal(stopCh chan struct{}) { + // Very important this is buffered as 1 so that we do not lose any + // queued updates. This only has to be exactly 1 since the existence + // of any update triggers us to load the full health check state. + notifyCh := make(chan struct{}, 1) + c.Notify.AddAliasCheck(c.CheckID, c.ServiceID, notifyCh) + defer c.Notify.RemoveAliasCheck(c.CheckID, c.ServiceID) + + for { + select { + case <-notifyCh: + checks := c.Notify.Checks() + checksList := make([]*structs.HealthCheck, 0, len(checks)) + for _, chk := range checks { + checksList = append(checksList, chk) + } + c.processChecks(checksList) + + case <-stopCh: + return + } + } +} + +func (c *CheckAlias) runQuery(stopCh chan struct{}) { args := c.RPCReq args.Node = c.Node args.AllowStale = true @@ -107,36 +153,41 @@ func (c *CheckAlias) run(stopCh chan struct{}) { args.MinQueryIndex = 1 } - health := api.HealthPassing - msg := "All checks passing." - if len(out.HealthChecks) == 0 { - // No health checks means we're healthy by default - msg = "No checks found." - } - for _, chk := range out.HealthChecks { - if chk.Node != c.Node { - continue - } - - // We allow ServiceID == "" so that we also check node checks - if chk.ServiceID != "" && chk.ServiceID != c.ServiceID { - continue - } - - if chk.Status == api.HealthCritical || chk.Status == api.HealthWarning { - health = chk.Status - msg = fmt.Sprintf("Aliased check %q failing: %s", chk.Name, chk.Output) - - // Critical checks exit the for loop immediately since we - // know that this is the health state. Warnings do not since - // there may still be a critical check. - if chk.Status == api.HealthCritical { - break - } - } - } - - // Update our check value - c.Notify.UpdateCheck(c.CheckID, health, msg) + c.processChecks(out.HealthChecks) } } + +// processChecks is a common helper for taking a set of health checks and +// using them to update our alias. This is abstracted since the checks can +// come from both the remote server as well as local state. +func (c *CheckAlias) processChecks(checks []*structs.HealthCheck) { + health := api.HealthPassing + msg := "No checks found." + for _, chk := range checks { + if c.Node != "" && chk.Node != c.Node { + continue + } + + // We allow ServiceID == "" so that we also check node checks + if chk.ServiceID != "" && chk.ServiceID != c.ServiceID { + continue + } + + if chk.Status == api.HealthCritical || chk.Status == api.HealthWarning { + health = chk.Status + msg = fmt.Sprintf("Aliased check %q failing: %s", chk.Name, chk.Output) + + // Critical checks exit the for loop immediately since we + // know that this is the health state. Warnings do not since + // there may still be a critical check. + if chk.Status == api.HealthCritical { + break + } + } + + msg = "All checks passing." + } + + // Update our check value + c.Notify.UpdateCheck(c.CheckID, health, msg) +} diff --git a/agent/checks/alias_test.go b/agent/checks/alias_test.go index 431cf116a3..e063451b36 100644 --- a/agent/checks/alias_test.go +++ b/agent/checks/alias_test.go @@ -19,7 +19,7 @@ import ( func TestCheckAlias_remoteErrBackoff(t *testing.T) { t.Parallel() - notify := mock.NewNotify() + notify := newMockAliasNotify() chkID := types.CheckID("foo") rpc := &mockRPC{} chk := &CheckAlias{ @@ -45,7 +45,7 @@ func TestCheckAlias_remoteErrBackoff(t *testing.T) { func TestCheckAlias_remoteNoChecks(t *testing.T) { t.Parallel() - notify := mock.NewNotify() + notify := newMockAliasNotify() chkID := types.CheckID("foo") rpc := &mockRPC{} chk := &CheckAlias{ @@ -71,7 +71,7 @@ func TestCheckAlias_remoteNoChecks(t *testing.T) { func TestCheckAlias_remoteNodeFailure(t *testing.T) { t.Parallel() - notify := mock.NewNotify() + notify := newMockAliasNotify() chkID := types.CheckID("foo") rpc := &mockRPC{} chk := &CheckAlias{ @@ -120,7 +120,7 @@ func TestCheckAlias_remoteNodeFailure(t *testing.T) { func TestCheckAlias_remotePassing(t *testing.T) { t.Parallel() - notify := mock.NewNotify() + notify := newMockAliasNotify() chkID := types.CheckID("foo") rpc := &mockRPC{} chk := &CheckAlias{ @@ -169,7 +169,7 @@ func TestCheckAlias_remotePassing(t *testing.T) { func TestCheckAlias_remoteCritical(t *testing.T) { t.Parallel() - notify := mock.NewNotify() + notify := newMockAliasNotify() chkID := types.CheckID("foo") rpc := &mockRPC{} chk := &CheckAlias{ @@ -224,7 +224,7 @@ func TestCheckAlias_remoteCritical(t *testing.T) { func TestCheckAlias_remoteWarning(t *testing.T) { t.Parallel() - notify := mock.NewNotify() + notify := newMockAliasNotify() chkID := types.CheckID("foo") rpc := &mockRPC{} chk := &CheckAlias{ @@ -275,6 +275,27 @@ func TestCheckAlias_remoteWarning(t *testing.T) { }) } +type mockAliasNotify struct { + *mock.Notify +} + +func newMockAliasNotify() *mockAliasNotify { + return &mockAliasNotify{ + Notify: mock.NewNotify(), + } +} + +func (m *mockAliasNotify) AddAliasCheck(chkID types.CheckID, serviceID string, ch chan<- struct{}) error { + return nil +} + +func (m *mockAliasNotify) RemoveAliasCheck(chkID types.CheckID, serviceID string) { +} + +func (m *mockAliasNotify) Checks() map[types.CheckID]*structs.HealthCheck { + return nil +} + // mockRPC is an implementation of RPC that can be used for tests. The // atomic.Value fields can be set concurrently and will reflect on the next // RPC call. From 1e9233eec1cf393cc1bdedb78a164bc63428d708 Mon Sep 17 00:00:00 2001 From: Mitchell Hashimoto Date: Sat, 30 Jun 2018 07:37:43 -0700 Subject: [PATCH 07/18] agent/checks: set critical if RPC fails --- agent/checks/alias.go | 5 +++++ agent/checks/alias_test.go | 6 ++++++ 2 files changed, 11 insertions(+) diff --git a/agent/checks/alias.go b/agent/checks/alias.go index 72963adb53..9622c896c3 100644 --- a/agent/checks/alias.go +++ b/agent/checks/alias.go @@ -137,6 +137,11 @@ func (c *CheckAlias) runQuery(stopCh chan struct{}) { var out structs.IndexedHealthChecks if err := c.RPC.RPC("Health.NodeChecks", &args, &out); err != nil { attempt++ + if attempt > 1 { + c.Notify.UpdateCheck(c.CheckID, api.HealthCritical, + fmt.Sprintf("Failure checking aliased node or service: %s", err)) + } + continue } diff --git a/agent/checks/alias_test.go b/agent/checks/alias_test.go index e063451b36..220f5a85f2 100644 --- a/agent/checks/alias_test.go +++ b/agent/checks/alias_test.go @@ -39,6 +39,12 @@ func TestCheckAlias_remoteErrBackoff(t *testing.T) { if got, want := atomic.LoadUint32(&rpc.Calls), uint32(6); got > want { t.Fatalf("got %d updates want at most %d", got, want) } + + retry.Run(t, func(r *retry.R) { + if got, want := notify.State(chkID), api.HealthCritical; got != want { + r.Fatalf("got state %q want %q", got, want) + } + }) } // No remote health checks should result in passing on the check. From 36e330941aa95f56662b2c5ef815bd25be07939b Mon Sep 17 00:00:00 2001 From: Mitchell Hashimoto Date: Sat, 30 Jun 2018 07:41:23 -0700 Subject: [PATCH 08/18] agent/checks: support node-only checks --- agent/checks/alias_test.go | 94 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 94 insertions(+) diff --git a/agent/checks/alias_test.go b/agent/checks/alias_test.go index 220f5a85f2..ea6603345f 100644 --- a/agent/checks/alias_test.go +++ b/agent/checks/alias_test.go @@ -281,6 +281,100 @@ func TestCheckAlias_remoteWarning(t *testing.T) { }) } +// Only passing should result in passing for node-only checks +func TestCheckAlias_remoteNodeOnlyPassing(t *testing.T) { + t.Parallel() + + notify := newMockAliasNotify() + chkID := types.CheckID("foo") + rpc := &mockRPC{} + chk := &CheckAlias{ + Node: "remote", + CheckID: chkID, + Notify: notify, + RPC: rpc, + } + + rpc.Reply.Store(structs.IndexedHealthChecks{ + HealthChecks: []*structs.HealthCheck{ + // Should ignore non-matching node + &structs.HealthCheck{ + Node: "A", + ServiceID: "web", + Status: api.HealthCritical, + }, + + // Should ignore any services + &structs.HealthCheck{ + Node: "remote", + ServiceID: "db", + Status: api.HealthCritical, + }, + + // Match + &structs.HealthCheck{ + Node: "remote", + Status: api.HealthPassing, + }, + }, + }) + + chk.Start() + defer chk.Stop() + retry.Run(t, func(r *retry.R) { + if got, want := notify.State(chkID), api.HealthPassing; got != want { + r.Fatalf("got state %q want %q", got, want) + } + }) +} + +// Only critical should result in passing for node-only checks +func TestCheckAlias_remoteNodeOnlyCritical(t *testing.T) { + t.Parallel() + + notify := newMockAliasNotify() + chkID := types.CheckID("foo") + rpc := &mockRPC{} + chk := &CheckAlias{ + Node: "remote", + CheckID: chkID, + Notify: notify, + RPC: rpc, + } + + rpc.Reply.Store(structs.IndexedHealthChecks{ + HealthChecks: []*structs.HealthCheck{ + // Should ignore non-matching node + &structs.HealthCheck{ + Node: "A", + ServiceID: "web", + Status: api.HealthCritical, + }, + + // Should ignore any services + &structs.HealthCheck{ + Node: "remote", + ServiceID: "db", + Status: api.HealthCritical, + }, + + // Match + &structs.HealthCheck{ + Node: "remote", + Status: api.HealthCritical, + }, + }, + }) + + chk.Start() + defer chk.Stop() + retry.Run(t, func(r *retry.R) { + if got, want := notify.State(chkID), api.HealthCritical; got != want { + r.Fatalf("got state %q want %q", got, want) + } + }) +} + type mockAliasNotify struct { *mock.Notify } From 5bc27feb0bfa0203b808414affd6ff9b811d9f9e Mon Sep 17 00:00:00 2001 From: Mitchell Hashimoto Date: Sat, 30 Jun 2018 07:41:46 -0700 Subject: [PATCH 09/18] agent/structs: check is alias if node is empty --- agent/structs/check_type.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/agent/structs/check_type.go b/agent/structs/check_type.go index 8586e77263..43b76057c6 100644 --- a/agent/structs/check_type.go +++ b/agent/structs/check_type.go @@ -77,7 +77,7 @@ func (c *CheckType) Empty() bool { // IsAlias checks if this is an alias check. func (c *CheckType) IsAlias() bool { - return c.AliasService != "" + return c.AliasNode != "" || c.AliasService != "" } // IsScript checks if this is a check that execs some kind of script. From 19ced12668b15c650dfe0ba6cf60bc9893faae48 Mon Sep 17 00:00:00 2001 From: Mitchell Hashimoto Date: Sat, 30 Jun 2018 07:52:25 -0700 Subject: [PATCH 10/18] agent: alias checks have no interval --- agent/agent.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/agent/agent.go b/agent/agent.go index 30203224dc..0320c645cd 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -2029,11 +2029,6 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType, existing.Stop() delete(a.checkAliases, check.CheckID) } - if chkType.Interval < checks.MinInterval { - a.logger.Printf("[WARN] agent: check '%s' has interval below minimum of %v", - check.CheckID, checks.MinInterval) - chkType.Interval = checks.MinInterval - } var rpcReq structs.NodeSpecificRequest rpcReq.Datacenter = a.config.Datacenter From f97bfd5be8ecd79240ec7b43e3e191938e24e2ca Mon Sep 17 00:00:00 2001 From: Mitchell Hashimoto Date: Mon, 2 Jul 2018 20:50:00 -0700 Subject: [PATCH 11/18] agent: address some basic feedback --- agent/checks/alias.go | 8 ++++---- agent/local/state.go | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/agent/checks/alias.go b/agent/checks/alias.go index 9622c896c3..5a40693c54 100644 --- a/agent/checks/alias.go +++ b/agent/checks/alias.go @@ -17,7 +17,7 @@ const ( ) // CheckAlias is a check type that aliases the health of another service -// instance. If the service aliased has any critical health checks, then +// instance or node. If the service aliased has any critical health checks, then // this check is critical. If the service has no critical but warnings, // then this check is warning, and if a service has only passing checks, then // this check is passing. @@ -46,7 +46,7 @@ type AliasNotifier interface { Checks() map[types.CheckID]*structs.HealthCheck } -// Start is used to start a check ttl, runs until Stop() func (c *CheckAlias) Start() { +// Start is used to start the check, runs until Stop() func (c *CheckAlias) Start() { func (c *CheckAlias) Start() { c.stopLock.Lock() defer c.stopLock.Unlock() @@ -55,7 +55,7 @@ func (c *CheckAlias) Start() { go c.run(c.stopCh) } -// Stop is used to stop a check ttl. +// Stop is used to stop the check. func (c *CheckAlias) Stop() { c.stopLock.Lock() defer c.stopLock.Unlock() @@ -132,7 +132,7 @@ func (c *CheckAlias) runQuery(stopCh chan struct{}) { // NOTE(mitchellh): This currently returns ALL health checks for // a node even though we also have the service ID. This can be // optimized if we introduce a new RPC endpoint to filter both, - // but for blocking queries isn't that more efficient since the checks + // but for blocking queries isn't that much more efficient since the checks // index is global to the cluster. var out structs.IndexedHealthChecks if err := c.RPC.RPC("Health.NodeChecks", &args, &out); err != nil { diff --git a/agent/local/state.go b/agent/local/state.go index 6f90353565..1f15f231a9 100644 --- a/agent/local/state.go +++ b/agent/local/state.go @@ -528,7 +528,7 @@ func (l *State) UpdateCheck(id types.CheckID, status, output string) { // Do not block. All notify channels should be buffered to at // least 1 in which case not-blocking does not result in loss // of data because a failed send means a notification is - // already queued. + // already queued. This must be called with the lock held. select { case notifyCh <- struct{}{}: default: From d6ecd97d1dffa72a9e928e66d87889deb9c09c7f Mon Sep 17 00:00:00 2001 From: Mitchell Hashimoto Date: Thu, 12 Jul 2018 10:17:53 -0700 Subject: [PATCH 12/18] agent: use the correct ACL token for alias checks --- agent/agent.go | 10 +++- agent/agent_test.go | 112 +++++++++++++++++++++++++++++++++++++++----- 2 files changed, 108 insertions(+), 14 deletions(-) diff --git a/agent/agent.go b/agent/agent.go index 0320c645cd..2f9c351fdd 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -2032,7 +2032,15 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType, var rpcReq structs.NodeSpecificRequest rpcReq.Datacenter = a.config.Datacenter - rpcReq.Token = a.tokens.AgentToken() + + // The token to set is really important. The behavior below follows + // the same behavior as anti-entropy: we use the user-specified token + // if set (either on the service or check definition), otherwise + // we use the "UserToken" on the agent. This is tested. + rpcReq.Token = a.tokens.UserToken() + if token != "" { + rpcReq.Token = token + } chkImpl := &checks.CheckAlias{ Notify: a.State, diff --git a/agent/agent_test.go b/agent/agent_test.go index 702592eb44..edf54253db 100644 --- a/agent/agent_test.go +++ b/agent/agent_test.go @@ -937,6 +937,8 @@ func TestAgent_AddCheck_GRPC(t *testing.T) { func TestAgent_AddCheck_Alias(t *testing.T) { t.Parallel() + + require := require.New(t) a := NewTestAgent(t.Name(), "") defer a.Shutdown() @@ -950,25 +952,109 @@ func TestAgent_AddCheck_Alias(t *testing.T) { AliasService: "foo", } err := a.AddCheck(health, chk, false, "") - if err != nil { - t.Fatalf("err: %v", err) - } + require.NoError(err) // Ensure we have a check mapping sChk, ok := a.State.Checks()["aliashealth"] - if !ok { - t.Fatalf("missing aliashealth check") - } + require.True(ok, "missing aliashealth check") + require.NotNil(sChk) + require.Equal(api.HealthCritical, sChk.Status) - // Ensure our check is in the right state - if sChk.Status != api.HealthCritical { - t.Fatalf("check not critical") - } + chkImpl, ok := a.checkAliases["aliashealth"] + require.True(ok, "missing aliashealth check") + require.Equal("", chkImpl.RPCReq.Token) - // Ensure a check is setup - if _, ok := a.checkAliases["aliashealth"]; !ok { - t.Fatalf("missing aliashealth check") + cs := a.State.CheckState("aliashealth") + require.NotNil(cs) + require.Equal("", cs.Token) +} + +func TestAgent_AddCheck_Alias_setToken(t *testing.T) { + t.Parallel() + + require := require.New(t) + a := NewTestAgent(t.Name(), "") + defer a.Shutdown() + + health := &structs.HealthCheck{ + Node: "foo", + CheckID: "aliashealth", + Name: "Alias health check", + Status: api.HealthCritical, } + chk := &structs.CheckType{ + AliasService: "foo", + } + err := a.AddCheck(health, chk, false, "foo") + require.NoError(err) + + cs := a.State.CheckState("aliashealth") + require.NotNil(cs) + require.Equal("foo", cs.Token) + + chkImpl, ok := a.checkAliases["aliashealth"] + require.True(ok, "missing aliashealth check") + require.Equal("foo", chkImpl.RPCReq.Token) +} + +func TestAgent_AddCheck_Alias_userToken(t *testing.T) { + t.Parallel() + + require := require.New(t) + a := NewTestAgent(t.Name(), ` +acl_token = "hello" + `) + defer a.Shutdown() + + health := &structs.HealthCheck{ + Node: "foo", + CheckID: "aliashealth", + Name: "Alias health check", + Status: api.HealthCritical, + } + chk := &structs.CheckType{ + AliasService: "foo", + } + err := a.AddCheck(health, chk, false, "") + require.NoError(err) + + cs := a.State.CheckState("aliashealth") + require.NotNil(cs) + require.Equal("", cs.Token) // State token should still be empty + + chkImpl, ok := a.checkAliases["aliashealth"] + require.True(ok, "missing aliashealth check") + require.Equal("hello", chkImpl.RPCReq.Token) // Check should use the token +} + +func TestAgent_AddCheck_Alias_userAndSetToken(t *testing.T) { + t.Parallel() + + require := require.New(t) + a := NewTestAgent(t.Name(), ` +acl_token = "hello" + `) + defer a.Shutdown() + + health := &structs.HealthCheck{ + Node: "foo", + CheckID: "aliashealth", + Name: "Alias health check", + Status: api.HealthCritical, + } + chk := &structs.CheckType{ + AliasService: "foo", + } + err := a.AddCheck(health, chk, false, "goodbye") + require.NoError(err) + + cs := a.State.CheckState("aliashealth") + require.NotNil(cs) + require.Equal("goodbye", cs.Token) + + chkImpl, ok := a.checkAliases["aliashealth"] + require.True(ok, "missing aliashealth check") + require.Equal("goodbye", chkImpl.RPCReq.Token) } func TestAgent_RemoveCheck(t *testing.T) { From 9a90400821c9df5131b8e3ab84a2575ec8c25bba Mon Sep 17 00:00:00 2001 From: Mitchell Hashimoto Date: Thu, 12 Jul 2018 10:21:49 -0700 Subject: [PATCH 13/18] agent/checks: prevent overflow of backoff --- agent/checks/alias.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/agent/checks/alias.go b/agent/checks/alias.go index 5a40693c54..e6c7082e57 100644 --- a/agent/checks/alias.go +++ b/agent/checks/alias.go @@ -120,7 +120,11 @@ func (c *CheckAlias) runQuery(stopCh chan struct{}) { // Backoff if we have to if attempt > checkAliasBackoffMin { - waitTime := (1 << (attempt - checkAliasBackoffMin)) * time.Second + shift := attempt - checkAliasBackoffMin + if shift > 31 { + shift = 31 // so we don't overflow to 0 + } + waitTime := (1 << shift) * time.Second if waitTime > checkAliasBackoffMaxWait { waitTime = checkAliasBackoffMaxWait } From 1027a01a102a7a02a6edfc31062ea0a351314501 Mon Sep 17 00:00:00 2001 From: Mitchell Hashimoto Date: Thu, 12 Jul 2018 21:14:36 -0700 Subject: [PATCH 14/18] website: document alias check --- website/source/api/agent/check.html.md | 10 +++++++++ website/source/docs/agent/checks.html.md | 28 ++++++++++++++++++++++-- 2 files changed, 36 insertions(+), 2 deletions(-) diff --git a/website/source/api/agent/check.html.md b/website/source/api/agent/check.html.md index 052693335a..326811eea9 100644 --- a/website/source/api/agent/check.html.md +++ b/website/source/api/agent/check.html.md @@ -116,6 +116,16 @@ The table below shows this endpoint's support for continue to be accepted in future versions of Consul), and `Args` in Consul 1.0.1 and later. +- `AliasNode` `(string: "")` - Specifies the ID of the node for an alias check. + If no service is specified, the check will alias the health of the node. + If a service is specified, the check will alias the specified service on + this particular node. + +- `AliasService` `(string: "")` - Specifies the ID of a service for an + alias check. If the service is not registered with the same agent, + `AliasNode` must also be specified. Note this is the service _ID_ and + not the service _name_ (though they are very often the same). + - `DockerContainerID` `(string: "")` - Specifies that the check is a Docker check, and Consul will evaluate the script every `Interval` in the given container using the specified `Shell`. Note that `Shell` is currently only diff --git a/website/source/docs/agent/checks.html.md b/website/source/docs/agent/checks.html.md index a563de5778..45d2f7cbd7 100644 --- a/website/source/docs/agent/checks.html.md +++ b/website/source/docs/agent/checks.html.md @@ -101,6 +101,17 @@ There are several different kinds of checks: TLS certificate is expected. Certificate verification can be turned off by setting the `tls_skip_verify` field to `true` in the check definition. +* Alias - These checks alias the health state another Consul + node or service. The state of the check will be updated asynchronously, + but is nearly instant. For aliased services on the same agent, the local + state is monitored and no additional network resources are consumed. For + other services and nodes, the check maintains a blocking query over the + agent's connection with a current server and allows stale requests. If there + are any errors in watching the aliased node or service, the check will be + unhealthy. For the blocking query, the check will use the ACL token set + on the service definition, otherwise falling back to the default ACL + token set with the agent (`acl_token`). + ## Check Definition A script check: @@ -165,7 +176,7 @@ A Docker check: ```javascript { -"check": { + "check": { "id": "mem-util", "name": "Memory utilization", "docker_container_id": "f972c95ebf0e", @@ -180,7 +191,7 @@ A gRPC check: ```javascript { -"check": { + "check": { "id": "mem-util", "name": "Service health status", "grpc": "127.0.0.1:12345", @@ -190,6 +201,17 @@ A gRPC check: } ``` +An alias check for a local service: + +```javascript +{ + "check": { + "id": "web-alias", + "alias_service": "web" + } +} +``` + Each type of definition must include a `name` and may optionally provide an `id` and `notes` field. The `id` must be unique per _agent_ otherwise only the last defined check with that `id` will be registered. If the `id` is not set @@ -205,6 +227,8 @@ a TTL check via the HTTP interface can set the `notes` value. Checks may also contain a `token` field to provide an ACL token. This token is used for any interaction with the catalog for the check, including [anti-entropy syncs](/docs/internals/anti-entropy.html) and deregistration. +For Alias checks, this token is used if a remote blocking query is necessary +to watch the state of the aliased node or service. Script, TCP, HTTP, Docker, and gRPC checks must include an `interval` field. This field is parsed by Go's `time` package, and has the following From 6be4fa9118f00c4d24291688dee667a9ac0fd51e Mon Sep 17 00:00:00 2001 From: Mitchell Hashimoto Date: Thu, 12 Jul 2018 21:18:00 -0700 Subject: [PATCH 15/18] website: fix some website typos --- website/source/docs/agent/checks.html.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/website/source/docs/agent/checks.html.md b/website/source/docs/agent/checks.html.md index 45d2f7cbd7..25520f2167 100644 --- a/website/source/docs/agent/checks.html.md +++ b/website/source/docs/agent/checks.html.md @@ -101,15 +101,15 @@ There are several different kinds of checks: TLS certificate is expected. Certificate verification can be turned off by setting the `tls_skip_verify` field to `true` in the check definition. -* Alias - These checks alias the health state another Consul +* Alias - These checks alias the health state of another registered node or service. The state of the check will be updated asynchronously, but is nearly instant. For aliased services on the same agent, the local state is monitored and no additional network resources are consumed. For other services and nodes, the check maintains a blocking query over the agent's connection with a current server and allows stale requests. If there - are any errors in watching the aliased node or service, the check will be - unhealthy. For the blocking query, the check will use the ACL token set - on the service definition, otherwise falling back to the default ACL + are any errors in watching the aliased node or service, the check state will be + critical. For the blocking query, the check will use the ACL token set + on the service or check definition or otherwise will fall back to the default ACL token set with the agent (`acl_token`). ## Check Definition From 9f128e40d6a650ed326c1a89e285962e332df458 Mon Sep 17 00:00:00 2001 From: Mitchell Hashimoto Date: Wed, 18 Jul 2018 16:16:28 -0500 Subject: [PATCH 16/18] agent/local: don't use time.After in test since notify is instant --- agent/local/state_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/agent/local/state_test.go b/agent/local/state_test.go index 4a0e8f21fb..11590e5122 100644 --- a/agent/local/state_test.go +++ b/agent/local/state_test.go @@ -1627,7 +1627,7 @@ func TestAgent_AliasCheck(t *testing.T) { l.UpdateCheck(types.CheckID("c1"), api.HealthCritical, "") select { case <-notifyCh: - case <-time.After(100 * time.Millisecond): + default: t.Fatal("notify not received") } @@ -1653,7 +1653,7 @@ func TestAgent_AliasCheck(t *testing.T) { l.UpdateCheck(types.CheckID("c1"), api.HealthPassing, "") select { case <-notifyCh: - case <-time.After(100 * time.Millisecond): + default: t.Fatal("notify not received") } } From 8c72bb0cdf96c170065fc551d26d7690e87fee21 Mon Sep 17 00:00:00 2001 From: Mitchell Hashimoto Date: Thu, 19 Jul 2018 14:20:50 -0500 Subject: [PATCH 17/18] agent/local: address remaining test feedback --- agent/local/state_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/agent/local/state_test.go b/agent/local/state_test.go index 11590e5122..e7dfb29060 100644 --- a/agent/local/state_test.go +++ b/agent/local/state_test.go @@ -1637,7 +1637,7 @@ func TestAgent_AliasCheck(t *testing.T) { case <-notifyCh: t.Fatal("notify received") - case <-time.After(50 * time.Millisecond): + default: } // Update other check and verify we do not get notified @@ -1646,7 +1646,7 @@ func TestAgent_AliasCheck(t *testing.T) { case <-notifyCh: t.Fatal("notify received") - case <-time.After(50 * time.Millisecond): + default: } // Update change and verify we get notified From b3854fdd28cb4fa5d3349b326b6dc31816605f42 Mon Sep 17 00:00:00 2001 From: Mitchell Hashimoto Date: Thu, 19 Jul 2018 14:21:30 -0500 Subject: [PATCH 18/18] agent/local: silly spacing on select statements --- agent/local/state_test.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/agent/local/state_test.go b/agent/local/state_test.go index e7dfb29060..3cdf2a28a1 100644 --- a/agent/local/state_test.go +++ b/agent/local/state_test.go @@ -1636,7 +1636,6 @@ func TestAgent_AliasCheck(t *testing.T) { select { case <-notifyCh: t.Fatal("notify received") - default: } @@ -1645,7 +1644,6 @@ func TestAgent_AliasCheck(t *testing.T) { select { case <-notifyCh: t.Fatal("notify received") - default: }