mirror of https://github.com/status-im/consul.git
Adds ability to deregister a service based on critical check state longer than a timeout.
parent 315b9d4ea4
commit 4a3d7db24f
api/agent.go
@@ -62,8 +62,7 @@ type AgentCheckRegistration struct {
 	AgentServiceCheck
 }
 
-// AgentServiceCheck is used to create an associated
-// check for a service
+// AgentServiceCheck is used to define a node or service level check
 type AgentServiceCheck struct {
 	Script            string `json:",omitempty"`
 	DockerContainerID string `json:",omitempty"`
@@ -74,6 +73,14 @@ type AgentServiceCheck struct {
 	HTTP              string `json:",omitempty"`
 	TCP               string `json:",omitempty"`
 	Status            string `json:",omitempty"`
+
+	// Checks that are associated with a service may also contain this
+	// optional DeregisterCriticalServiceAfter field, which is a timeout in
+	// the same Go time format as Interval and TTL. If a check is in the
+	// critical state for more than this configured value, then its
+	// associated service (and all of its associated checks) will
+	// automatically be deregistered.
+	DeregisterCriticalServiceAfter string `json:",omitempty"`
 }
 
 type AgentServiceChecks []*AgentServiceCheck
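For reference, a minimal sketch of how the new `DeregisterCriticalServiceAfter` field on `AgentServiceCheck` can be used from the Go `api` package once this change is in. The client setup, service name, and port here are illustrative, not part of the commit:

```go
package main

import (
	"log"

	"github.com/hashicorp/consul/api"
)

func main() {
	// Assumes a local agent reachable at the default address.
	client, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		log.Fatal(err)
	}

	// Register a service with a TTL check; if the check stays critical for
	// more than 90 minutes, the agent deregisters the whole service.
	reg := &api.AgentServiceRegistration{
		ID:   "redis1",
		Name: "redis",
		Port: 8000,
		Check: &api.AgentServiceCheck{
			TTL:                            "15s",
			DeregisterCriticalServiceAfter: "90m",
		},
	}
	if err := client.Agent().ServiceRegister(reg); err != nil {
		log.Fatal(err)
	}
}
```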
@@ -455,6 +455,13 @@ func TestAgent_Checks_serviceBound(t *testing.T) {
 		ServiceID: "redis",
 	}
 	reg.TTL = "15s"
+	reg.DeregisterCriticalServiceAfter = "nope"
+	err := agent.CheckRegister(reg)
+	if err == nil || !strings.Contains(err.Error(), "invalid duration") {
+		t.Fatalf("err: %v", err)
+	}
+
+	reg.DeregisterCriticalServiceAfter = "90m"
 	if err := agent.CheckRegister(reg); err != nil {
 		t.Fatalf("err: %v", err)
 	}
@@ -74,6 +74,10 @@ type Agent struct {
 	// services and checks. Used for anti-entropy.
 	state localState
 
+	// checkReapAfter maps the check ID to a timeout after which we should
+	// reap its associated service
+	checkReapAfter map[types.CheckID]time.Duration
+
 	// checkMonitors maps the check ID to an associated monitor
 	checkMonitors map[types.CheckID]*CheckMonitor
 
@@ -174,24 +178,25 @@ func Create(config *Config, logOutput io.Writer) (*Agent, error) {
 	}
 
 	agent := &Agent{
-		config:        config,
-		logger:        log.New(logOutput, "", log.LstdFlags),
-		logOutput:     logOutput,
-		checkMonitors: make(map[types.CheckID]*CheckMonitor),
-		checkTTLs:     make(map[types.CheckID]*CheckTTL),
-		checkHTTPs:    make(map[types.CheckID]*CheckHTTP),
-		checkTCPs:     make(map[types.CheckID]*CheckTCP),
-		checkDockers:  make(map[types.CheckID]*CheckDocker),
-		eventCh:       make(chan serf.UserEvent, 1024),
-		eventBuf:      make([]*UserEvent, 256),
-		shutdownCh:    make(chan struct{}),
-		endpoints:     make(map[string]string),
+		config:         config,
+		logger:         log.New(logOutput, "", log.LstdFlags),
+		logOutput:      logOutput,
+		checkReapAfter: make(map[types.CheckID]time.Duration),
+		checkMonitors:  make(map[types.CheckID]*CheckMonitor),
+		checkTTLs:      make(map[types.CheckID]*CheckTTL),
+		checkHTTPs:     make(map[types.CheckID]*CheckHTTP),
+		checkTCPs:      make(map[types.CheckID]*CheckTCP),
+		checkDockers:   make(map[types.CheckID]*CheckDocker),
+		eventCh:        make(chan serf.UserEvent, 1024),
+		eventBuf:       make([]*UserEvent, 256),
+		shutdownCh:     make(chan struct{}),
+		endpoints:      make(map[string]string),
 	}
 
-	// Initialize the local state
+	// Initialize the local state.
 	agent.state.Init(config, agent.logger)
 
-	// Setup either the client or the server
+	// Setup either the client or the server.
 	var err error
 	if config.Server {
 		err = agent.setupServer()
@@ -213,7 +218,7 @@ func Create(config *Config, logOutput io.Writer) (*Agent, error) {
 		return nil, err
 	}
 
-	// Load checks/services
+	// Load checks/services.
 	if err := agent.loadServices(config); err != nil {
 		return nil, err
 	}
@@ -221,7 +226,11 @@ func Create(config *Config, logOutput io.Writer) (*Agent, error) {
 		return nil, err
 	}
 
-	// Start handling events
+	// Start watching for critical services to deregister, based on their
+	// checks.
+	go agent.reapServices()
+
+	// Start handling events.
 	go agent.handleEvents()
 
 	// Start sending network coordinate to the server.
@@ -229,7 +238,7 @@ func Create(config *Config, logOutput io.Writer) (*Agent, error) {
 		go agent.sendCoordinate()
 	}
 
-	// Write out the PID file if necessary
+	// Write out the PID file if necessary.
 	err = agent.storePid()
 	if err != nil {
 		return nil, err
@@ -664,6 +673,52 @@ func (a *Agent) sendCoordinate() {
 	}
 }
 
+// reapServices is a long running goroutine that looks for checks that have been
+// critical too long and deregisters their associated services.
+func (a *Agent) reapServices() {
+	reap := func() {
+		reaped := make(map[string]struct{})
+		for checkID, check := range a.state.CriticalChecks() {
+			// There's nothing to do if there's no service.
+			if check.Check.ServiceID == "" {
+				continue
+			}
+
+			// There might be multiple checks for one service, so
+			// we don't need to reap multiple times.
+			serviceID := check.Check.ServiceID
+			if _, ok := reaped[serviceID]; ok {
+				continue
+			}
+
+			// See if there's a timeout.
+			a.checkLock.Lock()
+			timeout, ok := a.checkReapAfter[checkID]
+			a.checkLock.Unlock()
+
+			// Reap, if necessary. We keep track of which service
+			// this is so that we won't try to remove it again.
+			if ok && check.CriticalFor > timeout {
+				reaped[serviceID] = struct{}{}
+				a.RemoveService(serviceID, true)
+				a.logger.Printf("[INFO] agent: Check %q for service %q has been critical for too long; deregistered service",
+					checkID, serviceID)
+			}
+		}
+	}
+
+	for {
+		select {
+		case <-time.After(a.config.CheckReapInterval):
+			reap()
+
+		case <-a.shutdownCh:
+			return
+		}
+	}
+
+}
+
 // persistService saves a service definition to a JSON file in the data dir
 func (a *Agent) persistService(service *structs.NodeService) error {
 	svcPath := filepath.Join(a.config.DataDir, servicesDir, stringHash(service.ID))
@@ -987,6 +1042,18 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *CheckType, persist
 		} else {
 			return fmt.Errorf("Check type is not valid")
 		}
+
+		if chkType.DeregisterCriticalServiceAfter > 0 {
+			timeout := chkType.DeregisterCriticalServiceAfter
+			if timeout < a.config.CheckDeregisterIntervalMin {
+				timeout = a.config.CheckDeregisterIntervalMin
+				a.logger.Println(fmt.Sprintf("[WARN] agent: check '%s' has deregister interval below minimum of %v",
+					check.CheckID, a.config.CheckDeregisterIntervalMin))
+			}
+			a.checkReapAfter[check.CheckID] = timeout
+		} else {
+			delete(a.checkReapAfter, check.CheckID)
+		}
 	}
 
 	// Add to the local state for anti-entropy
@@ -1015,6 +1082,7 @@ func (a *Agent) RemoveCheck(checkID types.CheckID, persist bool) error {
 	defer a.checkLock.Unlock()
 
 	// Stop any monitors
+	delete(a.checkReapAfter, checkID)
 	if check, ok := a.checkMonitors[checkID]; ok {
 		check.Stop()
 		delete(a.checkMonitors, checkID)
@@ -1043,25 +1111,27 @@ func (a *Agent) RemoveCheck(checkID types.CheckID, persist bool) error {
 	return nil
 }
 
-// UpdateCheck is used to update the status of a check.
-// This can only be used with checks of the TTL type.
-func (a *Agent) UpdateCheck(checkID types.CheckID, status, output string) error {
+// updateTTLCheck is used to update the status of a TTL check via the Agent API.
+func (a *Agent) updateTTLCheck(checkID types.CheckID, status, output string) error {
 	a.checkLock.Lock()
 	defer a.checkLock.Unlock()
+
+	// Grab the TTL check.
 	check, ok := a.checkTTLs[checkID]
 	if !ok {
 		return fmt.Errorf("CheckID %q does not have associated TTL", checkID)
 	}
 
-	// Set the status through CheckTTL to reset the TTL
+	// Set the status through CheckTTL to reset the TTL.
 	check.SetStatus(status, output)
 
 	// We don't write any files in dev mode so bail here.
 	if a.config.DevMode {
 		return nil
 	}
 
-	// Always persist the state for TTL checks
+	// Persist the state so the TTL check can come up in a good state after
+	// an agent restart, especially with long TTL values.
 	if err := a.persistCheckState(check, status, output); err != nil {
 		return fmt.Errorf("failed persisting state for check %q: %s", checkID, err)
 	}
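The `AddCheck` change above clamps the configured timeout up to `CheckDeregisterIntervalMin`, and `reapServices` only wakes up every `CheckReapInterval`, so deregistration can lag the configured value by up to one reap interval. A rough sketch of the resulting worst case, using the defaults this commit adds (illustrative only, not code from the commit):

```go
package main

import (
	"fmt"
	"time"
)

// worstCaseReap estimates how long a service can stay registered after its
// check goes critical: the timeout is clamped to the minimum, and the reap
// loop has to come around once more before it notices.
func worstCaseReap(configured, intervalMin, reapInterval time.Duration) time.Duration {
	timeout := configured
	if timeout < intervalMin {
		timeout = intervalMin // same clamp AddCheck applies
	}
	return timeout + reapInterval
}

func main() {
	// Defaults from DefaultConfig in this commit: 1m minimum, 30s reap interval.
	fmt.Println(worstCaseReap(30*time.Second, time.Minute, 30*time.Second)) // 1m30s
}
```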
@@ -144,7 +144,7 @@ func (s *HTTPServer) AgentDeregisterCheck(resp http.ResponseWriter, req *http.Re
 func (s *HTTPServer) AgentCheckPass(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
 	checkID := types.CheckID(strings.TrimPrefix(req.URL.Path, "/v1/agent/check/pass/"))
 	note := req.URL.Query().Get("note")
-	if err := s.agent.UpdateCheck(checkID, structs.HealthPassing, note); err != nil {
+	if err := s.agent.updateTTLCheck(checkID, structs.HealthPassing, note); err != nil {
 		return nil, err
 	}
 	s.syncChanges()
@@ -154,7 +154,7 @@ func (s *HTTPServer) AgentCheckPass(resp http.ResponseWriter, req *http.Request)
 func (s *HTTPServer) AgentCheckWarn(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
 	checkID := types.CheckID(strings.TrimPrefix(req.URL.Path, "/v1/agent/check/warn/"))
 	note := req.URL.Query().Get("note")
-	if err := s.agent.UpdateCheck(checkID, structs.HealthWarning, note); err != nil {
+	if err := s.agent.updateTTLCheck(checkID, structs.HealthWarning, note); err != nil {
 		return nil, err
 	}
 	s.syncChanges()
@@ -164,7 +164,7 @@ func (s *HTTPServer) AgentCheckWarn(resp http.ResponseWriter, req *http.Request)
 func (s *HTTPServer) AgentCheckFail(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
 	checkID := types.CheckID(strings.TrimPrefix(req.URL.Path, "/v1/agent/check/fail/"))
 	note := req.URL.Query().Get("note")
-	if err := s.agent.UpdateCheck(checkID, structs.HealthCritical, note); err != nil {
+	if err := s.agent.updateTTLCheck(checkID, structs.HealthCritical, note); err != nil {
 		return nil, err
 	}
 	s.syncChanges()
@@ -216,7 +216,7 @@ func (s *HTTPServer) AgentCheckUpdate(resp http.ResponseWriter, req *http.Reques
 	}
 
 	checkID := types.CheckID(strings.TrimPrefix(req.URL.Path, "/v1/agent/check/update/"))
-	if err := s.agent.UpdateCheck(checkID, update.Status, update.Output); err != nil {
+	if err := s.agent.updateTTLCheck(checkID, update.Status, update.Output); err != nil {
 		return nil, err
 	}
 	s.syncChanges()
@@ -641,7 +641,7 @@ func TestAgent_RemoveCheck(t *testing.T) {
 	}
 }
 
-func TestAgent_UpdateCheck(t *testing.T) {
+func TestAgent_updateTTLCheck(t *testing.T) {
 	dir, agent := makeAgent(t, nextConfig())
 	defer os.RemoveAll(dir)
 	defer agent.Shutdown()
@@ -655,17 +655,17 @@ func TestAgent_UpdateCheck(t *testing.T) {
 	chk := &CheckType{
 		TTL: 15 * time.Second,
 	}
+
+	// Add check and update it.
 	err := agent.AddCheck(health, chk, false, "")
 	if err != nil {
 		t.Fatalf("err: %v", err)
 	}
-
-	// Remove check
-	if err := agent.UpdateCheck("mem", structs.HealthPassing, "foo"); err != nil {
+	if err := agent.updateTTLCheck("mem", structs.HealthPassing, "foo"); err != nil {
 		t.Fatalf("err: %v", err)
 	}
 
-	// Ensure we have a check mapping
+	// Ensure we have a check mapping.
 	status := agent.state.Checks()["mem"]
 	if status.Status != structs.HealthPassing {
 		t.Fatalf("bad: %v", status)
@@ -1247,7 +1247,7 @@ func TestAgent_unloadServices(t *testing.T) {
 	}
 }
 
-func TestAgent_ServiceMaintenanceMode(t *testing.T) {
+func TestAgent_Service_MaintenanceMode(t *testing.T) {
 	config := nextConfig()
 	dir, agent := makeAgent(t, config)
 	defer os.RemoveAll(dir)
@@ -1312,6 +1312,133 @@ func TestAgent_ServiceMaintenanceMode(t *testing.T) {
 	}
 }
 
+func TestAgent_Service_Reap(t *testing.T) {
+	config := nextConfig()
+	config.CheckReapInterval = time.Millisecond
+	config.CheckDeregisterIntervalMin = 0
+	dir, agent := makeAgent(t, config)
+	defer os.RemoveAll(dir)
+	defer agent.Shutdown()
+
+	svc := &structs.NodeService{
+		ID:      "redis",
+		Service: "redis",
+		Tags:    []string{"foo"},
+		Port:    8000,
+	}
+	chkTypes := CheckTypes{
+		&CheckType{
+			Status: structs.HealthPassing,
+			TTL:    10 * time.Millisecond,
+			DeregisterCriticalServiceAfter: 100 * time.Millisecond,
+		},
+	}
+
+	// Register the service.
+	if err := agent.AddService(svc, chkTypes, false, ""); err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	// Make sure it's there and there's no critical check yet.
+	if _, ok := agent.state.Services()["redis"]; !ok {
+		t.Fatalf("should have redis service")
+	}
+	if checks := agent.state.CriticalChecks(); len(checks) > 0 {
+		t.Fatalf("should not have critical checks")
+	}
+
+	// Wait for the check TTL to fail.
+	time.Sleep(30 * time.Millisecond)
+	if _, ok := agent.state.Services()["redis"]; !ok {
+		t.Fatalf("should have redis service")
+	}
+	if checks := agent.state.CriticalChecks(); len(checks) != 1 {
+		t.Fatalf("should have a critical check")
+	}
+
+	// Pass the TTL.
+	if err := agent.updateTTLCheck("service:redis", structs.HealthPassing, "foo"); err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	if _, ok := agent.state.Services()["redis"]; !ok {
+		t.Fatalf("should have redis service")
+	}
+	if checks := agent.state.CriticalChecks(); len(checks) > 0 {
+		t.Fatalf("should not have critical checks")
+	}
+
+	// Wait for the check TTL to fail again.
+	time.Sleep(30 * time.Millisecond)
+	if _, ok := agent.state.Services()["redis"]; !ok {
+		t.Fatalf("should have redis service")
+	}
+	if checks := agent.state.CriticalChecks(); len(checks) != 1 {
+		t.Fatalf("should have a critical check")
+	}
+
+	// Wait for the reap.
+	time.Sleep(300 * time.Millisecond)
+	if _, ok := agent.state.Services()["redis"]; ok {
+		t.Fatalf("redis service should have been reaped")
+	}
+	if checks := agent.state.CriticalChecks(); len(checks) > 0 {
+		t.Fatalf("should not have critical checks")
+	}
+}
+
+func TestAgent_Service_NoReap(t *testing.T) {
+	config := nextConfig()
+	config.CheckReapInterval = time.Millisecond
+	config.CheckDeregisterIntervalMin = 0
+	dir, agent := makeAgent(t, config)
+	defer os.RemoveAll(dir)
+	defer agent.Shutdown()
+
+	svc := &structs.NodeService{
+		ID:      "redis",
+		Service: "redis",
+		Tags:    []string{"foo"},
+		Port:    8000,
+	}
+	chkTypes := CheckTypes{
+		&CheckType{
+			Status: structs.HealthPassing,
+			TTL:    10 * time.Millisecond,
+		},
+	}
+
+	// Register the service.
+	if err := agent.AddService(svc, chkTypes, false, ""); err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	// Make sure it's there and there's no critical check yet.
+	if _, ok := agent.state.Services()["redis"]; !ok {
+		t.Fatalf("should have redis service")
+	}
+	if checks := agent.state.CriticalChecks(); len(checks) > 0 {
+		t.Fatalf("should not have critical checks")
+	}
+
+	// Wait for the check TTL to fail.
+	time.Sleep(30 * time.Millisecond)
+	if _, ok := agent.state.Services()["redis"]; !ok {
+		t.Fatalf("should have redis service")
+	}
+	if checks := agent.state.CriticalChecks(); len(checks) != 1 {
+		t.Fatalf("should have a critical check")
+	}
+
+	// Wait a while and make sure it doesn't reap.
+	time.Sleep(300 * time.Millisecond)
+	if _, ok := agent.state.Services()["redis"]; !ok {
+		t.Fatalf("should have redis service")
+	}
+	if checks := agent.state.CriticalChecks(); len(checks) != 1 {
+		t.Fatalf("should have a critical check")
+	}
+}
+
 func TestAgent_addCheck_restoresSnapshot(t *testing.T) {
 	config := nextConfig()
 	dir, agent := makeAgent(t, config)
@@ -35,12 +35,11 @@ const (
 	HttpUserAgent = "Consul Health Check"
 )
 
-// CheckType is used to create either the CheckMonitor
-// or the CheckTTL.
-// Five types are supported: Script, HTTP, TCP, Docker and TTL
-// Script, HTTP, Docker and TCP all require Interval
-// Only one of the types needs to be provided
-// TTL or Script/Interval or HTTP/Interval or TCP/Interval or Docker/Interval
+// CheckType is used to create either the CheckMonitor or the CheckTTL.
+// Five types are supported: Script, HTTP, TCP, Docker and TTL. Script, HTTP,
+// Docker and TCP all require Interval. Only one of the types may be
+// provided: TTL or Script/Interval or HTTP/Interval or TCP/Interval or
+// Docker/Interval.
 type CheckType struct {
 	Script string
 	HTTP   string
@@ -52,6 +51,11 @@ type CheckType struct {
 	Timeout  time.Duration
 	TTL      time.Duration
 
+	// DeregisterCriticalServiceAfter, if >0, will cause the associated
+	// service, if any, to be deregistered if this check is critical for
+	// longer than this duration.
+	DeregisterCriticalServiceAfter time.Duration
+
 	Status string
 
 	Notes string
@@ -422,6 +422,14 @@ type Config struct {
 	CheckUpdateInterval    time.Duration `mapstructure:"-"`
 	CheckUpdateIntervalRaw string        `mapstructure:"check_update_interval" json:"-"`
 
+	// CheckReapInterval controls the interval on which we will look for
+	// failed checks and reap their associated services, if so configured.
+	CheckReapInterval time.Duration `mapstructure:"-"`
+
+	// CheckDeregisterIntervalMin is the smallest allowed interval to set
+	// a check's DeregisterCriticalServiceAfter value to.
+	CheckDeregisterIntervalMin time.Duration `mapstructure:"-"`
+
 	// ACLToken is the default token used to make requests if a per-request
 	// token is not provided. If not configured the 'anonymous' token is used.
 	ACLToken string `mapstructure:"acl_token" json:"-"`
@@ -632,11 +640,13 @@ func DefaultConfig() *Config {
 		Telemetry: Telemetry{
 			StatsitePrefix: "consul",
 		},
-		SyslogFacility:      "LOCAL0",
-		Protocol:            consul.ProtocolVersion2Compatible,
-		CheckUpdateInterval: 5 * time.Minute,
-		AEInterval:          time.Minute,
-		DisableCoordinates:  false,
+		SyslogFacility:             "LOCAL0",
+		Protocol:                   consul.ProtocolVersion2Compatible,
+		CheckUpdateInterval:        5 * time.Minute,
+		CheckDeregisterIntervalMin: time.Minute,
+		CheckReapInterval:          30 * time.Second,
+		AEInterval:                 time.Minute,
+		DisableCoordinates:         false,
 
 		// SyncCoordinateRateTarget is set based on the rate that we want
 		// the server to handle as an aggregate across the entire cluster.
@@ -975,6 +985,7 @@ AFTER_FIX:
 
 func FixupCheckType(raw interface{}) error {
 	var ttlKey, intervalKey, timeoutKey string
+	const deregisterKey = "DeregisterCriticalServiceAfter"
 
 	// Handle decoding of time durations
 	rawMap, ok := raw.(map[string]interface{})
@@ -990,12 +1001,15 @@ func FixupCheckType(raw interface{}) error {
 			intervalKey = k
 		case "timeout":
 			timeoutKey = k
+		case "deregister_critical_service_after":
+			rawMap[deregisterKey] = v
+			delete(rawMap, k)
 		case "service_id":
 			rawMap["serviceid"] = v
-			delete(rawMap, "service_id")
+			delete(rawMap, k)
 		case "docker_container_id":
 			rawMap["DockerContainerID"] = v
-			delete(rawMap, "docker_container_id")
+			delete(rawMap, k)
 		}
 	}
 
@@ -1032,6 +1046,17 @@ func FixupCheckType(raw interface{}) error {
 		}
 	}
 
+	if deregister, ok := rawMap[deregisterKey]; ok {
+		timeoutS, ok := deregister.(string)
+		if ok {
+			if dur, err := time.ParseDuration(timeoutS); err != nil {
+				return err
+			} else {
+				rawMap[deregisterKey] = dur
+			}
+		}
+	}
+
 	return nil
 }
 
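`FixupCheckType` hands the `deregister_critical_service_after` value to Go's `time.ParseDuration`, which is also where the "invalid duration" error asserted in the `CheckRegister` test earlier in this diff comes from. A small illustrative sketch of the accepted format:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Valid Go duration strings parse cleanly.
	if d, err := time.ParseDuration("90m"); err == nil {
		fmt.Println(d) // 1h30m0s
	}

	// Anything else is rejected; the exact message wording varies by Go version.
	if _, err := time.ParseDuration("nope"); err != nil {
		fmt.Println(err) // time: invalid duration ...
	}
}
```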
@@ -1258,7 +1258,7 @@ func TestDecodeConfig_Multiples(t *testing.T) {
 
 func TestDecodeConfig_Service(t *testing.T) {
 	// Basics
-	input := `{"service": {"id": "red1", "name": "redis", "tags": ["master"], "port":8000, "check": {"script": "/bin/check_redis", "interval": "10s", "ttl": "15s" }}}`
+	input := `{"service": {"id": "red1", "name": "redis", "tags": ["master"], "port":8000, "check": {"script": "/bin/check_redis", "interval": "10s", "ttl": "15s", "DeregisterCriticalServiceAfter": "90m" }}}`
 	config, err := DecodeConfig(bytes.NewReader([]byte(input)))
 	if err != nil {
 		t.Fatalf("err: %s", err)
@@ -1296,11 +1296,15 @@ func TestDecodeConfig_Service(t *testing.T) {
 	if serv.Check.TTL != 15*time.Second {
 		t.Fatalf("bad: %v", serv)
 	}
+
+	if serv.Check.DeregisterCriticalServiceAfter != 90*time.Minute {
+		t.Fatalf("bad: %v", serv)
+	}
 }
 
 func TestDecodeConfig_Check(t *testing.T) {
 	// Basics
-	input := `{"check": {"id": "chk1", "name": "mem", "notes": "foobar", "script": "/bin/check_redis", "interval": "10s", "ttl": "15s", "shell": "/bin/bash", "docker_container_id": "redis" }}`
+	input := `{"check": {"id": "chk1", "name": "mem", "notes": "foobar", "script": "/bin/check_redis", "interval": "10s", "ttl": "15s", "shell": "/bin/bash", "docker_container_id": "redis", "deregister_critical_service_after": "90s" }}`
 	config, err := DecodeConfig(bytes.NewReader([]byte(input)))
 	if err != nil {
 		t.Fatalf("err: %s", err)
@@ -1342,6 +1346,10 @@ func TestDecodeConfig_Check(t *testing.T) {
 	if chk.DockerContainerID != "redis" {
 		t.Fatalf("bad: %v", chk)
 	}
+
+	if chk.DeregisterCriticalServiceAfter != 90*time.Second {
+		t.Fatalf("bad: %v", chk)
+	}
 }
 
 func TestMergeConfig(t *testing.T) {
@@ -57,9 +57,10 @@ type localState struct {
 	serviceTokens map[string]string
 
 	// Checks tracks the local checks
-	checks      map[types.CheckID]*structs.HealthCheck
-	checkStatus map[types.CheckID]syncStatus
-	checkTokens map[types.CheckID]string
+	checks            map[types.CheckID]*structs.HealthCheck
+	checkStatus       map[types.CheckID]syncStatus
+	checkTokens       map[types.CheckID]string
+	checkCriticalTime map[types.CheckID]time.Time
 
 	// Used to track checks that are being deferred
 	deferCheck map[types.CheckID]*time.Timer
@@ -83,6 +84,7 @@ func (l *localState) Init(config *Config, logger *log.Logger) {
 	l.checks = make(map[types.CheckID]*structs.HealthCheck)
 	l.checkStatus = make(map[types.CheckID]syncStatus)
 	l.checkTokens = make(map[types.CheckID]string)
+	l.checkCriticalTime = make(map[types.CheckID]time.Time)
 	l.deferCheck = make(map[types.CheckID]*time.Timer)
 	l.consulCh = make(chan struct{}, 1)
 	l.triggerCh = make(chan struct{}, 1)
@@ -222,6 +224,7 @@ func (l *localState) AddCheck(check *structs.HealthCheck, token string) {
 	l.checks[check.CheckID] = check
 	l.checkStatus[check.CheckID] = syncStatus{}
 	l.checkTokens[check.CheckID] = token
+	delete(l.checkCriticalTime, check.CheckID)
 	l.changeMade()
 }
 
@@ -233,6 +236,7 @@ func (l *localState) RemoveCheck(checkID types.CheckID) {
 
 	delete(l.checks, checkID)
 	delete(l.checkTokens, checkID)
+	delete(l.checkCriticalTime, checkID)
 	l.checkStatus[checkID] = syncStatus{remoteDelete: true}
 	l.changeMade()
 }
@@ -247,6 +251,17 @@ func (l *localState) UpdateCheck(checkID types.CheckID, status, output string) {
 		return
 	}
 
+	// Update the critical time tracking (this doesn't cause a server update
+	// so we can always keep this up to date).
+	if status == structs.HealthCritical {
+		_, wasCritical := l.checkCriticalTime[checkID]
+		if !wasCritical {
+			l.checkCriticalTime[checkID] = time.Now()
+		}
+	} else {
+		delete(l.checkCriticalTime, checkID)
+	}
+
 	// Do nothing if update is idempotent
 	if check.Status == status && check.Output == output {
 		return
@@ -294,6 +309,34 @@ func (l *localState) Checks() map[types.CheckID]*structs.HealthCheck {
 	return checks
 }
 
+// CriticalCheck is used to return the duration a check has been critical along
+// with its associated health check.
+type CriticalCheck struct {
+	CriticalFor time.Duration
+	Check       *structs.HealthCheck
+}
+
+// CriticalChecks returns locally registered health checks that the agent is
+// aware of and are being kept in sync with the server, and that are in a
+// critical state. This also returns information about how long each check has
+// been critical.
+func (l *localState) CriticalChecks() map[types.CheckID]CriticalCheck {
+	checks := make(map[types.CheckID]CriticalCheck)
+
+	l.RLock()
+	defer l.RUnlock()
+
+	now := time.Now()
+	for checkID, criticalTime := range l.checkCriticalTime {
+		checks[checkID] = CriticalCheck{
+			CriticalFor: now.Sub(criticalTime),
+			Check:       l.checks[checkID],
+		}
+	}
+
+	return checks
+}
+
 // antiEntropy is a long running method used to perform anti-entropy
 // between local and remote state.
 func (l *localState) antiEntropy(shutdownCh chan struct{}) {
@@ -546,7 +589,7 @@ func (l *localState) deleteService(id string) error {
 	return err
 }
 
-// deleteCheck is used to delete a service from the server
+// deleteCheck is used to delete a check from the server
 func (l *localState) deleteCheck(id types.CheckID) error {
 	if id == "" {
 		return fmt.Errorf("CheckID missing")
@@ -9,6 +9,7 @@ import (
 
 	"github.com/hashicorp/consul/consul/structs"
 	"github.com/hashicorp/consul/testutil"
+	"github.com/hashicorp/consul/types"
 )
 
 func TestAgentAntiEntropy_Services(t *testing.T) {
@@ -959,6 +960,66 @@ func TestAgent_checkTokens(t *testing.T) {
 	}
 }
 
+func TestAgent_checkCriticalTime(t *testing.T) {
+	config := nextConfig()
+	l := new(localState)
+	l.Init(config, nil)
+
+	// Add a passing check and make sure it's not critical.
+	checkID := types.CheckID("redis:1")
+	chk := &structs.HealthCheck{
+		Node:      "node",
+		CheckID:   checkID,
+		Name:      "redis:1",
+		ServiceID: "redis",
+		Status:    structs.HealthPassing,
+	}
+	l.AddCheck(chk, "")
+	if checks := l.CriticalChecks(); len(checks) > 0 {
+		t.Fatalf("should not have any critical checks")
+	}
+
+	// Set it to warning and make sure that doesn't show up as critical.
+	l.UpdateCheck(checkID, structs.HealthWarning, "")
+	if checks := l.CriticalChecks(); len(checks) > 0 {
+		t.Fatalf("should not have any critical checks")
+	}
+
+	// Fail the check and make sure the time looks reasonable.
+	l.UpdateCheck(checkID, structs.HealthCritical, "")
+	if crit, ok := l.CriticalChecks()[checkID]; !ok {
+		t.Fatalf("should have a critical check")
+	} else if crit.CriticalFor > time.Millisecond {
+		t.Fatalf("bad: %#v", crit)
+	}
+
+	// Wait a while, then fail it again and make sure the time keeps track
+	// of the initial failure, and doesn't reset here.
+	time.Sleep(10 * time.Millisecond)
+	l.UpdateCheck(chk.CheckID, structs.HealthCritical, "")
+	if crit, ok := l.CriticalChecks()[checkID]; !ok {
+		t.Fatalf("should have a critical check")
+	} else if crit.CriticalFor < 5*time.Millisecond ||
+		crit.CriticalFor > 15*time.Millisecond {
+		t.Fatalf("bad: %#v", crit)
+	}
+
+	// Set it passing again.
+	l.UpdateCheck(checkID, structs.HealthPassing, "")
+	if checks := l.CriticalChecks(); len(checks) > 0 {
+		t.Fatalf("should not have any critical checks")
+	}
+
+	// Fail the check and make sure the time looks like it started again
+	// from the latest failure, not the original one.
+	l.UpdateCheck(checkID, structs.HealthCritical, "")
+	if crit, ok := l.CriticalChecks()[checkID]; !ok {
+		t.Fatalf("should have a critical check")
+	} else if crit.CriticalFor > time.Millisecond {
+		t.Fatalf("bad: %#v", crit)
+	}
+}
+
 func TestAgent_nestedPauseResume(t *testing.T) {
 	l := new(localState)
 	if l.isPaused() != false {
@@ -169,6 +169,16 @@ parsed by Go's `time` package, and has the following
 > optional fraction and a unit suffix, such as "300ms", "-1.5h" or "2h45m".
 > Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h".
 
+Checks that are associated with a service may also contain an optional
+`deregister_critical_service_after` field, which is a timeout in the same Go time
+format as `interval` and `ttl`. If a check is in the critical state for more than this
+configured value, then its associated service (and all of its associated checks)
+will automatically be deregistered. The minimum timeout is 1 minute, and the
+process that reaps critical services runs every 30 seconds, so it may take slightly
+longer than the configured timeout to trigger the deregistration. This should
+generally be configured with a timeout that's much, much longer than any expected
+recoverable outage for the given service.
+
 To configure a check, either provide it as a `-config-file` option to the
 agent or place it inside the `-config-dir` of the agent. The file must
 end in the ".json" extension to be loaded by Consul. Check definitions can
@@ -243,13 +243,14 @@ body must look like:
   "ID": "mem",
   "Name": "Memory utilization",
   "Notes": "Ensure we don't oversubscribe memory",
+  "DeregisterCriticalServiceAfter": "90m",
   "Script": "/usr/local/bin/check_mem.py",
   "DockerContainerID": "f972c95ebf0e",
   "Shell": "/bin/bash",
   "HTTP": "http://example.com",
   "TCP": "example.com:22",
   "Interval": "10s",
   "TTL": "15s"
 }
 ```
@@ -261,6 +262,16 @@ If an `ID` is not provided, it is set to `Name`. You cannot have duplicate
 
 The `Notes` field is not used internally by Consul and is meant to be human-readable.
 
+Checks that are associated with a service may also contain an optional
+`DeregisterCriticalServiceAfter` field, which is a timeout in the same Go time
+format as `Interval` and `TTL`. If a check is in the critical state for more than this
+configured value, then its associated service (and all of its associated checks)
+will automatically be deregistered. The minimum timeout is 1 minute, and the
+process that reaps critical services runs every 30 seconds, so it may take slightly
+longer than the configured timeout to trigger the deregistration. This should
+generally be configured with a timeout that's much, much longer than any expected
+recoverable outage for the given service.
+
 If a `Script` is provided, the check type is a script, and Consul will
 evaluate the script every `Interval` to update the status.
 
@@ -389,6 +400,7 @@ body must look like:
   "Port": 8000,
   "EnableTagOverride": false,
   "Check": {
+    "DeregisterCriticalServiceAfter": "90m",
     "Script": "/usr/local/bin/check_redis.py",
     "HTTP": "http://localhost:5000/health",
     "Interval": "10s",
@@ -413,6 +425,17 @@ information.
 
 If `Check` is provided, only one of `Script`, `HTTP`, `TCP` or `TTL` should be specified.
 `Script` and `HTTP` also require `Interval`. The created check will be named "service:\<ServiceId\>".
+
+Checks that are associated with a service may also contain an optional
+`DeregisterCriticalServiceAfter` field, which is a timeout in the same Go time
+format as `Interval` and `TTL`. If a check is in the critical state for more than this
+configured value, then its associated service (and all of its associated checks)
+will automatically be deregistered. The minimum timeout is 1 minute, and the
+process that reaps critical services runs every 30 seconds, so it may take slightly
+longer than the configured timeout to trigger the deregistration. This should
+generally be configured with a timeout that's much, much longer than any expected
+recoverable outage for the given service.
+
 There is more information about checks [here](/docs/agent/checks.html).
 
 `EnableTagOverride` can optionally be specified to disable the anti-entropy
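The documented endpoint can also be exercised directly over HTTP; a sketch assuming a local agent on the default `127.0.0.1:8500` address (the service definition below is illustrative):

```go
package main

import (
	"bytes"
	"log"
	"net/http"
)

func main() {
	// Service registration body matching the documented format, including the
	// new DeregisterCriticalServiceAfter field on the check.
	body := []byte(`{
		"Name": "redis",
		"Port": 8000,
		"Check": {
			"DeregisterCriticalServiceAfter": "90m",
			"TTL": "15s"
		}
	}`)

	req, err := http.NewRequest("PUT", "http://127.0.0.1:8500/v1/agent/service/register", bytes.NewReader(body))
	if err != nil {
		log.Fatal(err)
	}
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()
	log.Println(resp.Status) // expect 200 OK if the agent accepted the registration
}
```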