From 4a3d7db24fa851fad603207d7804507aeaf61015 Mon Sep 17 00:00:00 2001 From: James Phillips Date: Tue, 16 Aug 2016 00:05:55 -0700 Subject: [PATCH] Adds ability to deregister a service based on critical check state longer than a timeout. --- api/agent.go | 11 +- api/agent_test.go | 7 + command/agent/agent.go | 114 +++++++++++--- command/agent/agent_endpoint.go | 8 +- command/agent/agent_test.go | 139 +++++++++++++++++- command/agent/check.go | 16 +- command/agent/config.go | 39 ++++- command/agent/config_test.go | 12 +- command/agent/local.go | 51 ++++++- command/agent/local_test.go | 61 ++++++++ .../source/docs/agent/checks.html.markdown | 10 ++ .../docs/agent/http/agent.html.markdown | 25 +++- 12 files changed, 439 insertions(+), 54 deletions(-) diff --git a/api/agent.go b/api/agent.go index 3df013cc5b..36e0900013 100644 --- a/api/agent.go +++ b/api/agent.go @@ -62,8 +62,7 @@ type AgentCheckRegistration struct { AgentServiceCheck } -// AgentServiceCheck is used to create an associated -// check for a service +// AgentServiceCheck is used to define a node or service level check type AgentServiceCheck struct { Script string `json:",omitempty"` DockerContainerID string `json:",omitempty"` @@ -74,6 +73,14 @@ type AgentServiceCheck struct { HTTP string `json:",omitempty"` TCP string `json:",omitempty"` Status string `json:",omitempty"` + + // Checks that are associated with a service may also contain this + // optional DeregisterCriticalServiceAfter field, which is a timeout in + // the same Go time format as Interval and TTL. If a check is in the + // critical state for more than this configured value, then its + // associated service (and all of its associated checks) will + // automatically be deregistered. + DeregisterCriticalServiceAfter string `json:",omitempty"` } type AgentServiceChecks []*AgentServiceCheck diff --git a/api/agent_test.go b/api/agent_test.go index b188d7ce3f..215d240dc9 100644 --- a/api/agent_test.go +++ b/api/agent_test.go @@ -455,6 +455,13 @@ func TestAgent_Checks_serviceBound(t *testing.T) { ServiceID: "redis", } reg.TTL = "15s" + reg.DeregisterCriticalServiceAfter = "nope" + err := agent.CheckRegister(reg) + if err == nil || !strings.Contains(err.Error(), "invalid duration") { + t.Fatalf("err: %v", err) + } + + reg.DeregisterCriticalServiceAfter = "90m" if err := agent.CheckRegister(reg); err != nil { t.Fatalf("err: %v", err) } diff --git a/command/agent/agent.go b/command/agent/agent.go index 79b94d55db..4c41b5e6f9 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -74,6 +74,10 @@ type Agent struct { // services and checks. Used for anti-entropy. 
 	state localState
 
+	// checkReapAfter maps the check ID to a timeout after which we should
+	// reap its associated service
+	checkReapAfter map[types.CheckID]time.Duration
+
 	// checkMonitors maps the check ID to an associated monitor
 	checkMonitors map[types.CheckID]*CheckMonitor
 
@@ -174,24 +178,25 @@ func Create(config *Config, logOutput io.Writer) (*Agent, error) {
 	}
 
 	agent := &Agent{
-		config:        config,
-		logger:        log.New(logOutput, "", log.LstdFlags),
-		logOutput:     logOutput,
-		checkMonitors: make(map[types.CheckID]*CheckMonitor),
-		checkTTLs:     make(map[types.CheckID]*CheckTTL),
-		checkHTTPs:    make(map[types.CheckID]*CheckHTTP),
-		checkTCPs:     make(map[types.CheckID]*CheckTCP),
-		checkDockers:  make(map[types.CheckID]*CheckDocker),
-		eventCh:       make(chan serf.UserEvent, 1024),
-		eventBuf:      make([]*UserEvent, 256),
-		shutdownCh:    make(chan struct{}),
-		endpoints:     make(map[string]string),
+		config:         config,
+		logger:         log.New(logOutput, "", log.LstdFlags),
+		logOutput:      logOutput,
+		checkReapAfter: make(map[types.CheckID]time.Duration),
+		checkMonitors:  make(map[types.CheckID]*CheckMonitor),
+		checkTTLs:      make(map[types.CheckID]*CheckTTL),
+		checkHTTPs:     make(map[types.CheckID]*CheckHTTP),
+		checkTCPs:      make(map[types.CheckID]*CheckTCP),
+		checkDockers:   make(map[types.CheckID]*CheckDocker),
+		eventCh:        make(chan serf.UserEvent, 1024),
+		eventBuf:       make([]*UserEvent, 256),
+		shutdownCh:     make(chan struct{}),
+		endpoints:      make(map[string]string),
 	}
 
-	// Initialize the local state
+	// Initialize the local state.
 	agent.state.Init(config, agent.logger)
 
-	// Setup either the client or the server
+	// Setup either the client or the server.
 	var err error
 	if config.Server {
 		err = agent.setupServer()
@@ -213,7 +218,7 @@ func Create(config *Config, logOutput io.Writer) (*Agent, error) {
 		return nil, err
 	}
 
-	// Load checks/services
+	// Load checks/services.
 	if err := agent.loadServices(config); err != nil {
 		return nil, err
 	}
@@ -221,7 +226,11 @@ func Create(config *Config, logOutput io.Writer) (*Agent, error) {
 		return nil, err
 	}
 
-	// Start handling events
+	// Start watching for critical services to deregister, based on their
+	// checks.
+	go agent.reapServices()
+
+	// Start handling events.
 	go agent.handleEvents()
 
 	// Start sending network coordinate to the server.
@@ -229,7 +238,7 @@ func Create(config *Config, logOutput io.Writer) (*Agent, error) {
 		go agent.sendCoordinate()
 	}
 
-	// Write out the PID file if necessary
+	// Write out the PID file if necessary.
 	err = agent.storePid()
 	if err != nil {
 		return nil, err
@@ -664,6 +673,52 @@ func (a *Agent) sendCoordinate() {
 	}
 }
 
+// reapServices is a long running goroutine that looks for checks that have been
+// critical too long and deregisters their associated services.
+func (a *Agent) reapServices() {
+	reap := func() {
+		reaped := make(map[string]struct{})
+		for checkID, check := range a.state.CriticalChecks() {
+			// There's nothing to do if there's no service.
+			if check.Check.ServiceID == "" {
+				continue
+			}
+
+			// There might be multiple checks for one service, so
+			// we don't need to reap multiple times.
+			serviceID := check.Check.ServiceID
+			if _, ok := reaped[serviceID]; ok {
+				continue
+			}
+
+			// See if there's a timeout.
+			a.checkLock.Lock()
+			timeout, ok := a.checkReapAfter[checkID]
+			a.checkLock.Unlock()
+
+			// Reap, if necessary. We keep track of which service
+			// this is so that we won't try to remove it again.
+ if ok && check.CriticalFor > timeout { + reaped[serviceID] = struct{}{} + a.RemoveService(serviceID, true) + a.logger.Printf("[INFO] agent: Check %q for service %q has been critical for too long; deregistered service", + checkID, serviceID) + } + } + } + + for { + select { + case <-time.After(a.config.CheckReapInterval): + reap() + + case <-a.shutdownCh: + return + } + } + +} + // persistService saves a service definition to a JSON file in the data dir func (a *Agent) persistService(service *structs.NodeService) error { svcPath := filepath.Join(a.config.DataDir, servicesDir, stringHash(service.ID)) @@ -987,6 +1042,18 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *CheckType, persist } else { return fmt.Errorf("Check type is not valid") } + + if chkType.DeregisterCriticalServiceAfter > 0 { + timeout := chkType.DeregisterCriticalServiceAfter + if timeout < a.config.CheckDeregisterIntervalMin { + timeout = a.config.CheckDeregisterIntervalMin + a.logger.Println(fmt.Sprintf("[WARN] agent: check '%s' has deregister interval below minimum of %v", + check.CheckID, a.config.CheckDeregisterIntervalMin)) + } + a.checkReapAfter[check.CheckID] = timeout + } else { + delete(a.checkReapAfter, check.CheckID) + } } // Add to the local state for anti-entropy @@ -1015,6 +1082,7 @@ func (a *Agent) RemoveCheck(checkID types.CheckID, persist bool) error { defer a.checkLock.Unlock() // Stop any monitors + delete(a.checkReapAfter, checkID) if check, ok := a.checkMonitors[checkID]; ok { check.Stop() delete(a.checkMonitors, checkID) @@ -1043,25 +1111,27 @@ func (a *Agent) RemoveCheck(checkID types.CheckID, persist bool) error { return nil } -// UpdateCheck is used to update the status of a check. -// This can only be used with checks of the TTL type. -func (a *Agent) UpdateCheck(checkID types.CheckID, status, output string) error { +// updateTTLCheck is used to update the status of a TTL check via the Agent API. +func (a *Agent) updateTTLCheck(checkID types.CheckID, status, output string) error { a.checkLock.Lock() defer a.checkLock.Unlock() + // Grab the TTL check. check, ok := a.checkTTLs[checkID] if !ok { return fmt.Errorf("CheckID %q does not have associated TTL", checkID) } - // Set the status through CheckTTL to reset the TTL + // Set the status through CheckTTL to reset the TTL. check.SetStatus(status, output) + // We don't write any files in dev mode so bail here. if a.config.DevMode { return nil } - // Always persist the state for TTL checks + // Persist the state so the TTL check can come up in a good state after + // an agent restart, especially with long TTL values. 
if err := a.persistCheckState(check, status, output); err != nil { return fmt.Errorf("failed persisting state for check %q: %s", checkID, err) } diff --git a/command/agent/agent_endpoint.go b/command/agent/agent_endpoint.go index d8c3275292..2e8fdfe273 100644 --- a/command/agent/agent_endpoint.go +++ b/command/agent/agent_endpoint.go @@ -144,7 +144,7 @@ func (s *HTTPServer) AgentDeregisterCheck(resp http.ResponseWriter, req *http.Re func (s *HTTPServer) AgentCheckPass(resp http.ResponseWriter, req *http.Request) (interface{}, error) { checkID := types.CheckID(strings.TrimPrefix(req.URL.Path, "/v1/agent/check/pass/")) note := req.URL.Query().Get("note") - if err := s.agent.UpdateCheck(checkID, structs.HealthPassing, note); err != nil { + if err := s.agent.updateTTLCheck(checkID, structs.HealthPassing, note); err != nil { return nil, err } s.syncChanges() @@ -154,7 +154,7 @@ func (s *HTTPServer) AgentCheckPass(resp http.ResponseWriter, req *http.Request) func (s *HTTPServer) AgentCheckWarn(resp http.ResponseWriter, req *http.Request) (interface{}, error) { checkID := types.CheckID(strings.TrimPrefix(req.URL.Path, "/v1/agent/check/warn/")) note := req.URL.Query().Get("note") - if err := s.agent.UpdateCheck(checkID, structs.HealthWarning, note); err != nil { + if err := s.agent.updateTTLCheck(checkID, structs.HealthWarning, note); err != nil { return nil, err } s.syncChanges() @@ -164,7 +164,7 @@ func (s *HTTPServer) AgentCheckWarn(resp http.ResponseWriter, req *http.Request) func (s *HTTPServer) AgentCheckFail(resp http.ResponseWriter, req *http.Request) (interface{}, error) { checkID := types.CheckID(strings.TrimPrefix(req.URL.Path, "/v1/agent/check/fail/")) note := req.URL.Query().Get("note") - if err := s.agent.UpdateCheck(checkID, structs.HealthCritical, note); err != nil { + if err := s.agent.updateTTLCheck(checkID, structs.HealthCritical, note); err != nil { return nil, err } s.syncChanges() @@ -216,7 +216,7 @@ func (s *HTTPServer) AgentCheckUpdate(resp http.ResponseWriter, req *http.Reques } checkID := types.CheckID(strings.TrimPrefix(req.URL.Path, "/v1/agent/check/update/")) - if err := s.agent.UpdateCheck(checkID, update.Status, update.Output); err != nil { + if err := s.agent.updateTTLCheck(checkID, update.Status, update.Output); err != nil { return nil, err } s.syncChanges() diff --git a/command/agent/agent_test.go b/command/agent/agent_test.go index bab39a7d08..8c9fee43c5 100644 --- a/command/agent/agent_test.go +++ b/command/agent/agent_test.go @@ -641,7 +641,7 @@ func TestAgent_RemoveCheck(t *testing.T) { } } -func TestAgent_UpdateCheck(t *testing.T) { +func TestAgent_updateTTLCheck(t *testing.T) { dir, agent := makeAgent(t, nextConfig()) defer os.RemoveAll(dir) defer agent.Shutdown() @@ -655,17 +655,17 @@ func TestAgent_UpdateCheck(t *testing.T) { chk := &CheckType{ TTL: 15 * time.Second, } + + // Add check and update it. err := agent.AddCheck(health, chk, false, "") if err != nil { t.Fatalf("err: %v", err) } - - // Remove check - if err := agent.UpdateCheck("mem", structs.HealthPassing, "foo"); err != nil { + if err := agent.updateTTLCheck("mem", structs.HealthPassing, "foo"); err != nil { t.Fatalf("err: %v", err) } - // Ensure we have a check mapping + // Ensure we have a check mapping. 
status := agent.state.Checks()["mem"] if status.Status != structs.HealthPassing { t.Fatalf("bad: %v", status) @@ -1247,7 +1247,7 @@ func TestAgent_unloadServices(t *testing.T) { } } -func TestAgent_ServiceMaintenanceMode(t *testing.T) { +func TestAgent_Service_MaintenanceMode(t *testing.T) { config := nextConfig() dir, agent := makeAgent(t, config) defer os.RemoveAll(dir) @@ -1312,6 +1312,133 @@ func TestAgent_ServiceMaintenanceMode(t *testing.T) { } } +func TestAgent_Service_Reap(t *testing.T) { + config := nextConfig() + config.CheckReapInterval = time.Millisecond + config.CheckDeregisterIntervalMin = 0 + dir, agent := makeAgent(t, config) + defer os.RemoveAll(dir) + defer agent.Shutdown() + + svc := &structs.NodeService{ + ID: "redis", + Service: "redis", + Tags: []string{"foo"}, + Port: 8000, + } + chkTypes := CheckTypes{ + &CheckType{ + Status: structs.HealthPassing, + TTL: 10 * time.Millisecond, + DeregisterCriticalServiceAfter: 100 * time.Millisecond, + }, + } + + // Register the service. + if err := agent.AddService(svc, chkTypes, false, ""); err != nil { + t.Fatalf("err: %v", err) + } + + // Make sure it's there and there's no critical check yet. + if _, ok := agent.state.Services()["redis"]; !ok { + t.Fatalf("should have redis service") + } + if checks := agent.state.CriticalChecks(); len(checks) > 0 { + t.Fatalf("should not have critical checks") + } + + // Wait for the check TTL to fail. + time.Sleep(30 * time.Millisecond) + if _, ok := agent.state.Services()["redis"]; !ok { + t.Fatalf("should have redis service") + } + if checks := agent.state.CriticalChecks(); len(checks) != 1 { + t.Fatalf("should have a critical check") + } + + // Pass the TTL. + if err := agent.updateTTLCheck("service:redis", structs.HealthPassing, "foo"); err != nil { + t.Fatalf("err: %v", err) + } + if _, ok := agent.state.Services()["redis"]; !ok { + t.Fatalf("should have redis service") + } + if checks := agent.state.CriticalChecks(); len(checks) > 0 { + t.Fatalf("should not have critical checks") + } + + // Wait for the check TTL to fail again. + time.Sleep(30 * time.Millisecond) + if _, ok := agent.state.Services()["redis"]; !ok { + t.Fatalf("should have redis service") + } + if checks := agent.state.CriticalChecks(); len(checks) != 1 { + t.Fatalf("should have a critical check") + } + + // Wait for the reap. + time.Sleep(300 * time.Millisecond) + if _, ok := agent.state.Services()["redis"]; ok { + t.Fatalf("redis service should have been reaped") + } + if checks := agent.state.CriticalChecks(); len(checks) > 0 { + t.Fatalf("should not have critical checks") + } +} + +func TestAgent_Service_NoReap(t *testing.T) { + config := nextConfig() + config.CheckReapInterval = time.Millisecond + config.CheckDeregisterIntervalMin = 0 + dir, agent := makeAgent(t, config) + defer os.RemoveAll(dir) + defer agent.Shutdown() + + svc := &structs.NodeService{ + ID: "redis", + Service: "redis", + Tags: []string{"foo"}, + Port: 8000, + } + chkTypes := CheckTypes{ + &CheckType{ + Status: structs.HealthPassing, + TTL: 10 * time.Millisecond, + }, + } + + // Register the service. + if err := agent.AddService(svc, chkTypes, false, ""); err != nil { + t.Fatalf("err: %v", err) + } + + // Make sure it's there and there's no critical check yet. + if _, ok := agent.state.Services()["redis"]; !ok { + t.Fatalf("should have redis service") + } + if checks := agent.state.CriticalChecks(); len(checks) > 0 { + t.Fatalf("should not have critical checks") + } + + // Wait for the check TTL to fail. 
+	time.Sleep(30 * time.Millisecond)
+	if _, ok := agent.state.Services()["redis"]; !ok {
+		t.Fatalf("should have redis service")
+	}
+	if checks := agent.state.CriticalChecks(); len(checks) != 1 {
+		t.Fatalf("should have a critical check")
+	}
+
+	// Wait a while and make sure it doesn't reap.
+	time.Sleep(300 * time.Millisecond)
+	if _, ok := agent.state.Services()["redis"]; !ok {
+		t.Fatalf("should have redis service")
+	}
+	if checks := agent.state.CriticalChecks(); len(checks) != 1 {
+		t.Fatalf("should have a critical check")
+	}
+}
+
 func TestAgent_addCheck_restoresSnapshot(t *testing.T) {
 	config := nextConfig()
 	dir, agent := makeAgent(t, config)
diff --git a/command/agent/check.go b/command/agent/check.go
index a0102b9914..7bf67e9a7e 100644
--- a/command/agent/check.go
+++ b/command/agent/check.go
@@ -35,12 +35,11 @@ const (
 	HttpUserAgent = "Consul Health Check"
 )
 
-// CheckType is used to create either the CheckMonitor
-// or the CheckTTL.
-// Five types are supported: Script, HTTP, TCP, Docker and TTL
-// Script, HTTP, Docker and TCP all require Interval
-// Only one of the types needs to be provided
-// TTL or Script/Interval or HTTP/Interval or TCP/Interval or Docker/Interval
+// CheckType is used to create either the CheckMonitor or the CheckTTL.
+// Five types are supported: Script, HTTP, TCP, Docker and TTL. Script, HTTP,
+// Docker and TCP all require Interval. Only one of the types may be
+// provided: TTL or Script/Interval or HTTP/Interval or TCP/Interval or
+// Docker/Interval.
 type CheckType struct {
 	Script string
 	HTTP   string
@@ -52,6 +51,11 @@ type CheckType struct {
 	Timeout  time.Duration
 	TTL      time.Duration
 
+	// DeregisterCriticalServiceAfter, if >0, will cause the associated
+	// service, if any, to be deregistered if this check is critical for
+	// longer than this duration.
+	DeregisterCriticalServiceAfter time.Duration
+
 	Status string
 	Notes  string
 
diff --git a/command/agent/config.go b/command/agent/config.go
index e2bb2e3849..3675e100f8 100644
--- a/command/agent/config.go
+++ b/command/agent/config.go
@@ -422,6 +422,14 @@ type Config struct {
 	CheckUpdateInterval    time.Duration `mapstructure:"-"`
 	CheckUpdateIntervalRaw string        `mapstructure:"check_update_interval" json:"-"`
 
+	// CheckReapInterval controls the interval on which we will look for
+	// failed checks and reap their associated services, if so configured.
+	CheckReapInterval time.Duration `mapstructure:"-"`
+
+	// CheckDeregisterIntervalMin is the smallest allowed interval to set
+	// a check's DeregisterCriticalServiceAfter value to.
+	CheckDeregisterIntervalMin time.Duration `mapstructure:"-"`
+
 	// ACLToken is the default token used to make requests if a per-request
 	// token is not provided. If not configured the 'anonymous' token is used.
 	ACLToken string `mapstructure:"acl_token" json:"-"`
@@ -632,11 +640,13 @@ func DefaultConfig() *Config {
 		Telemetry: Telemetry{
 			StatsitePrefix: "consul",
 		},
-		SyslogFacility:      "LOCAL0",
-		Protocol:            consul.ProtocolVersion2Compatible,
-		CheckUpdateInterval: 5 * time.Minute,
-		AEInterval:          time.Minute,
-		DisableCoordinates:  false,
+		SyslogFacility:             "LOCAL0",
+		Protocol:                   consul.ProtocolVersion2Compatible,
+		CheckUpdateInterval:        5 * time.Minute,
+		CheckDeregisterIntervalMin: time.Minute,
+		CheckReapInterval:          30 * time.Second,
+		AEInterval:                 time.Minute,
+		DisableCoordinates:         false,
 
 		// SyncCoordinateRateTarget is set based on the rate that we want
 		// the server to handle as an aggregate across the entire cluster.
@@ -975,6 +985,7 @@ AFTER_FIX: func FixupCheckType(raw interface{}) error { var ttlKey, intervalKey, timeoutKey string + const deregisterKey = "DeregisterCriticalServiceAfter" // Handle decoding of time durations rawMap, ok := raw.(map[string]interface{}) @@ -990,12 +1001,15 @@ func FixupCheckType(raw interface{}) error { intervalKey = k case "timeout": timeoutKey = k + case "deregister_critical_service_after": + rawMap[deregisterKey] = v + delete(rawMap, k) case "service_id": rawMap["serviceid"] = v - delete(rawMap, "service_id") + delete(rawMap, k) case "docker_container_id": rawMap["DockerContainerID"] = v - delete(rawMap, "docker_container_id") + delete(rawMap, k) } } @@ -1032,6 +1046,17 @@ func FixupCheckType(raw interface{}) error { } } + if deregister, ok := rawMap[deregisterKey]; ok { + timeoutS, ok := deregister.(string) + if ok { + if dur, err := time.ParseDuration(timeoutS); err != nil { + return err + } else { + rawMap[deregisterKey] = dur + } + } + } + return nil } diff --git a/command/agent/config_test.go b/command/agent/config_test.go index 354dfeca6a..3b4de35ed0 100644 --- a/command/agent/config_test.go +++ b/command/agent/config_test.go @@ -1258,7 +1258,7 @@ func TestDecodeConfig_Multiples(t *testing.T) { func TestDecodeConfig_Service(t *testing.T) { // Basics - input := `{"service": {"id": "red1", "name": "redis", "tags": ["master"], "port":8000, "check": {"script": "/bin/check_redis", "interval": "10s", "ttl": "15s" }}}` + input := `{"service": {"id": "red1", "name": "redis", "tags": ["master"], "port":8000, "check": {"script": "/bin/check_redis", "interval": "10s", "ttl": "15s", "DeregisterCriticalServiceAfter": "90m" }}}` config, err := DecodeConfig(bytes.NewReader([]byte(input))) if err != nil { t.Fatalf("err: %s", err) @@ -1296,11 +1296,15 @@ func TestDecodeConfig_Service(t *testing.T) { if serv.Check.TTL != 15*time.Second { t.Fatalf("bad: %v", serv) } + + if serv.Check.DeregisterCriticalServiceAfter != 90*time.Minute { + t.Fatalf("bad: %v", serv) + } } func TestDecodeConfig_Check(t *testing.T) { // Basics - input := `{"check": {"id": "chk1", "name": "mem", "notes": "foobar", "script": "/bin/check_redis", "interval": "10s", "ttl": "15s", "shell": "/bin/bash", "docker_container_id": "redis" }}` + input := `{"check": {"id": "chk1", "name": "mem", "notes": "foobar", "script": "/bin/check_redis", "interval": "10s", "ttl": "15s", "shell": "/bin/bash", "docker_container_id": "redis", "deregister_critical_service_after": "90s" }}` config, err := DecodeConfig(bytes.NewReader([]byte(input))) if err != nil { t.Fatalf("err: %s", err) @@ -1342,6 +1346,10 @@ func TestDecodeConfig_Check(t *testing.T) { if chk.DockerContainerID != "redis" { t.Fatalf("bad: %v", chk) } + + if chk.DeregisterCriticalServiceAfter != 90*time.Second { + t.Fatalf("bad: %v", chk) + } } func TestMergeConfig(t *testing.T) { diff --git a/command/agent/local.go b/command/agent/local.go index 6c6ae1362d..98d8c86efb 100644 --- a/command/agent/local.go +++ b/command/agent/local.go @@ -57,9 +57,10 @@ type localState struct { serviceTokens map[string]string // Checks tracks the local checks - checks map[types.CheckID]*structs.HealthCheck - checkStatus map[types.CheckID]syncStatus - checkTokens map[types.CheckID]string + checks map[types.CheckID]*structs.HealthCheck + checkStatus map[types.CheckID]syncStatus + checkTokens map[types.CheckID]string + checkCriticalTime map[types.CheckID]time.Time // Used to track checks that are being deferred deferCheck map[types.CheckID]*time.Timer @@ -83,6 +84,7 @@ func (l 
*localState) Init(config *Config, logger *log.Logger) {
 	l.checks = make(map[types.CheckID]*structs.HealthCheck)
 	l.checkStatus = make(map[types.CheckID]syncStatus)
 	l.checkTokens = make(map[types.CheckID]string)
+	l.checkCriticalTime = make(map[types.CheckID]time.Time)
 	l.deferCheck = make(map[types.CheckID]*time.Timer)
 	l.consulCh = make(chan struct{}, 1)
 	l.triggerCh = make(chan struct{}, 1)
@@ -222,6 +224,7 @@ func (l *localState) AddCheck(check *structs.HealthCheck, token string) {
 	l.checks[check.CheckID] = check
 	l.checkStatus[check.CheckID] = syncStatus{}
 	l.checkTokens[check.CheckID] = token
+	delete(l.checkCriticalTime, check.CheckID)
 	l.changeMade()
 }
 
@@ -233,6 +236,7 @@ func (l *localState) RemoveCheck(checkID types.CheckID) {
 
 	delete(l.checks, checkID)
 	delete(l.checkTokens, checkID)
+	delete(l.checkCriticalTime, checkID)
 	l.checkStatus[checkID] = syncStatus{remoteDelete: true}
 	l.changeMade()
 }
@@ -247,6 +251,17 @@ func (l *localState) UpdateCheck(checkID types.CheckID, status, output string) {
 		return
 	}
 
+	// Update the critical time tracking (this doesn't cause a server update,
+	// so we can always keep this up to date).
+	if status == structs.HealthCritical {
+		_, wasCritical := l.checkCriticalTime[checkID]
+		if !wasCritical {
+			l.checkCriticalTime[checkID] = time.Now()
+		}
+	} else {
+		delete(l.checkCriticalTime, checkID)
+	}
+
 	// Do nothing if update is idempotent
 	if check.Status == status && check.Output == output {
 		return
@@ -294,6 +309,34 @@ func (l *localState) Checks() map[types.CheckID]*structs.HealthCheck {
 	return checks
 }
 
+// CriticalCheck is used to return the duration a check has been critical along
+// with its associated health check.
+type CriticalCheck struct {
+	CriticalFor time.Duration
+	Check       *structs.HealthCheck
+}
+
+// CriticalChecks returns locally registered health checks that the agent is
+// aware of and are being kept in sync with the server, and that are in a
+// critical state. This also returns information about how long each check has
+// been critical.
+func (l *localState) CriticalChecks() map[types.CheckID]CriticalCheck {
+	checks := make(map[types.CheckID]CriticalCheck)
+
+	l.RLock()
+	defer l.RUnlock()
+
+	now := time.Now()
+	for checkID, criticalTime := range l.checkCriticalTime {
+		checks[checkID] = CriticalCheck{
+			CriticalFor: now.Sub(criticalTime),
+			Check:       l.checks[checkID],
+		}
+	}
+
+	return checks
+}
+
 // antiEntropy is a long running method used to perform anti-entropy
 // between local and remote state.
 func (l *localState) antiEntropy(shutdownCh chan struct{}) {
@@ -546,7 +589,7 @@ func (l *localState) deleteService(id string) error {
 	return err
 }
 
-// deleteCheck is used to delete a service from the server
+// deleteCheck is used to delete a check from the server
 func (l *localState) deleteCheck(id types.CheckID) error {
 	if id == "" {
 		return fmt.Errorf("CheckID missing")
diff --git a/command/agent/local_test.go b/command/agent/local_test.go
index 0b4e880fb3..c771ec3df8 100644
--- a/command/agent/local_test.go
+++ b/command/agent/local_test.go
@@ -9,6 +9,7 @@ import (
 
 	"github.com/hashicorp/consul/consul/structs"
 	"github.com/hashicorp/consul/testutil"
+	"github.com/hashicorp/consul/types"
 )
 
 func TestAgentAntiEntropy_Services(t *testing.T) {
@@ -959,6 +960,66 @@ func TestAgent_checkTokens(t *testing.T) {
 	}
 }
 
+func TestAgent_checkCriticalTime(t *testing.T) {
+	config := nextConfig()
+	l := new(localState)
+	l.Init(config, nil)
+
+	// Add a passing check and make sure it's not critical.
+	checkID := types.CheckID("redis:1")
+	chk := &structs.HealthCheck{
+		Node:      "node",
+		CheckID:   checkID,
+		Name:      "redis:1",
+		ServiceID: "redis",
+		Status:    structs.HealthPassing,
+	}
+	l.AddCheck(chk, "")
+	if checks := l.CriticalChecks(); len(checks) > 0 {
+		t.Fatalf("should not have any critical checks")
+	}
+
+	// Set it to warning and make sure that doesn't show up as critical.
+	l.UpdateCheck(checkID, structs.HealthWarning, "")
+	if checks := l.CriticalChecks(); len(checks) > 0 {
+		t.Fatalf("should not have any critical checks")
+	}
+
+	// Fail the check and make sure the time looks reasonable.
+	l.UpdateCheck(checkID, structs.HealthCritical, "")
+	if crit, ok := l.CriticalChecks()[checkID]; !ok {
+		t.Fatalf("should have a critical check")
+	} else if crit.CriticalFor > time.Millisecond {
+		t.Fatalf("bad: %#v", crit)
+	}
+
+	// Wait a while, then fail it again and make sure the time keeps track
+	// of the initial failure, and doesn't reset here.
+	time.Sleep(10 * time.Millisecond)
+	l.UpdateCheck(chk.CheckID, structs.HealthCritical, "")
+	if crit, ok := l.CriticalChecks()[checkID]; !ok {
+		t.Fatalf("should have a critical check")
+	} else if crit.CriticalFor < 5*time.Millisecond ||
+		crit.CriticalFor > 15*time.Millisecond {
+		t.Fatalf("bad: %#v", crit)
+	}
+
+	// Set it passing again.
+	l.UpdateCheck(checkID, structs.HealthPassing, "")
+	if checks := l.CriticalChecks(); len(checks) > 0 {
+		t.Fatalf("should not have any critical checks")
+	}
+
+	// Fail the check and make sure the time looks like it started again
+	// from the latest failure, not the original one.
+	l.UpdateCheck(checkID, structs.HealthCritical, "")
+	if crit, ok := l.CriticalChecks()[checkID]; !ok {
+		t.Fatalf("should have a critical check")
+	} else if crit.CriticalFor > time.Millisecond {
+		t.Fatalf("bad: %#v", crit)
+	}
+}
+
 func TestAgent_nestedPauseResume(t *testing.T) {
 	l := new(localState)
 	if l.isPaused() != false {
diff --git a/website/source/docs/agent/checks.html.markdown b/website/source/docs/agent/checks.html.markdown
index 105cbf1af9..dd6bf1541a 100644
--- a/website/source/docs/agent/checks.html.markdown
+++ b/website/source/docs/agent/checks.html.markdown
@@ -169,6 +169,16 @@ parsed by Go's `time` package, and has the following
 > optional fraction and a unit suffix, such as "300ms", "-1.5h" or "2h45m".
 > Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h".
 
+Checks that are associated with a service may also contain an optional
+`deregister_critical_service_after` field, which is a timeout in the same Go time
+format as `interval` and `ttl`. If a check is in the critical state for more than this
+configured value, then its associated service (and all of its associated checks)
+will automatically be deregistered. The minimum timeout is 1 minute, and the
+process that reaps critical services runs every 30 seconds, so it may take slightly
+longer than the configured timeout to trigger the deregistration. This should
+generally be configured with a timeout that's much, much longer than any expected
+recoverable outage for the given service.
+
 To configure a check, either provide it as a `-config-file` option to the
 agent or place it inside the `-config-dir` of the agent. The file must
 end in the ".json" extension to be loaded by Consul. Check definitions can
diff --git a/website/source/docs/agent/http/agent.html.markdown b/website/source/docs/agent/http/agent.html.markdown
index ccbe6fe61a..158ac2a4d2 100644
--- a/website/source/docs/agent/http/agent.html.markdown
+++ b/website/source/docs/agent/http/agent.html.markdown
@@ -243,13 +243,14 @@ body must look like:
   "ID": "mem",
   "Name": "Memory utilization",
   "Notes": "Ensure we don't oversubscribe memory",
   "Script": "/usr/local/bin/check_mem.py",
   "DockerContainerID": "f972c95ebf0e",
   "Shell": "/bin/bash",
   "HTTP": "http://example.com",
   "TCP": "example.com:22",
   "Interval": "10s",
-  "TTL": "15s"
+  "TTL": "15s",
+  "DeregisterCriticalServiceAfter": "90m"
 }
 ```
 
@@ -261,6 +262,16 @@ If an `ID` is not provided, it is set to `Name`. You cannot have duplicate
 The `Notes` field is not used internally by Consul and is meant to be human-readable.
 
+Checks that are associated with a service may also contain an optional
+`DeregisterCriticalServiceAfter` field, which is a timeout in the same Go time
+format as `Interval` and `TTL`. If a check is in the critical state for more than this
+configured value, then its associated service (and all of its associated checks)
+will automatically be deregistered. The minimum timeout is 1 minute, and the
+process that reaps critical services runs every 30 seconds, so it may take slightly
+longer than the configured timeout to trigger the deregistration. This should
+generally be configured with a timeout that's much, much longer than any expected
+recoverable outage for the given service.
+
 If a `Script` is provided, the check type is a script, and Consul will evaluate the script every `Interval` to update the status.
 
@@ -389,6 +400,7 @@ body must look like:
   "Port": 8000,
   "EnableTagOverride": false,
   "Check": {
+    "DeregisterCriticalServiceAfter": "90m",
     "Script": "/usr/local/bin/check_redis.py",
     "HTTP": "http://localhost:5000/health",
     "Interval": "10s",
@@ -413,6 +425,17 @@ information.
 
 If `Check` is provided, only one of `Script`, `HTTP`, `TCP` or `TTL` should
 be specified. `Script` and `HTTP` also require `Interval`. The created check
 will be named "service:\".
+
+Checks that are associated with a service may also contain an optional
+`DeregisterCriticalServiceAfter` field, which is a timeout in the same Go time
+format as `Interval` and `TTL`. If a check is in the critical state for more than this
+configured value, then its associated service (and all of its associated checks)
+will automatically be deregistered. The minimum timeout is 1 minute, and the
+process that reaps critical services runs every 30 seconds, so it may take slightly
+longer than the configured timeout to trigger the deregistration. This should
+generally be configured with a timeout that's much, much longer than any expected
+recoverable outage for the given service.
+
 There is more information about checks [here](/docs/agent/checks.html).
 
 `EnableTagOverride` can optionally be specified to disable the anti-entropy
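
As a usage sketch (not part of the diff above), the new field can also be set
through the Go `api` package once this change is in place. The service name,
port, TTL, and timeout values below are illustrative only, taken from the
examples used elsewhere in this patch:

```go
package main

import (
	"log"

	"github.com/hashicorp/consul/api"
)

func main() {
	// Connect to the local agent (127.0.0.1:8500 by default).
	client, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		log.Fatal(err)
	}

	// Register a service with a TTL check. If the check stays critical for
	// longer than DeregisterCriticalServiceAfter, the agent reaps the
	// service (and its checks) automatically.
	reg := &api.AgentServiceRegistration{
		ID:   "redis",
		Name: "redis",
		Port: 8000,
		Check: &api.AgentServiceCheck{
			TTL: "15s",
			DeregisterCriticalServiceAfter: "90m",
		},
	}
	if err := client.Agent().ServiceRegister(reg); err != nil {
		log.Fatal(err)
	}
}
```

Note that the agent clamps any value below `CheckDeregisterIntervalMin` (one
minute by default) up to that minimum and logs a warning, so very short
timeouts will not take effect as written.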