mirror of
https://github.com/status-im/consul.git
synced 2025-01-22 11:40:06 +00:00
Checks to passing/critical only after reaching a consecutive success/failure threshold (#5739)
A check may be set to become passing/critical only if a specified number of successive checks return passing/critical in a row. Status will stay identical as before until the threshold is reached. This feature is available for HTTP, TCP, gRPC, Docker & Monitor checks.
This commit is contained in:
parent
b0310364c6
commit
039615641e
@ -2627,6 +2627,8 @@ func (a *Agent) addCheck(check *structs.HealthCheck, chkType *structs.CheckType,
|
||||
}
|
||||
}
|
||||
|
||||
statusHandler := checks.NewStatusHandler(a.State, a.logger, chkType.SuccessBeforePassing, chkType.FailuresBeforeCritical)
|
||||
|
||||
switch {
|
||||
|
||||
case chkType.IsTTL():
|
||||
@ -2667,7 +2669,6 @@ func (a *Agent) addCheck(check *structs.HealthCheck, chkType *structs.CheckType,
|
||||
tlsClientConfig := a.tlsConfigurator.OutgoingTLSConfigForCheck(chkType.TLSSkipVerify)
|
||||
|
||||
http := &checks.CheckHTTP{
|
||||
Notify: a.State,
|
||||
CheckID: check.CheckID,
|
||||
ServiceID: check.ServiceID,
|
||||
HTTP: chkType.HTTP,
|
||||
@ -2678,6 +2679,7 @@ func (a *Agent) addCheck(check *structs.HealthCheck, chkType *structs.CheckType,
|
||||
Logger: a.logger,
|
||||
OutputMaxSize: maxOutputSize,
|
||||
TLSClientConfig: tlsClientConfig,
|
||||
StatusHandler: statusHandler,
|
||||
}
|
||||
|
||||
if proxy != nil && proxy.Proxy.Expose.Checks {
|
||||
@ -2704,13 +2706,13 @@ func (a *Agent) addCheck(check *structs.HealthCheck, chkType *structs.CheckType,
|
||||
}
|
||||
|
||||
tcp := &checks.CheckTCP{
|
||||
Notify: a.State,
|
||||
CheckID: check.CheckID,
|
||||
ServiceID: check.ServiceID,
|
||||
TCP: chkType.TCP,
|
||||
Interval: chkType.Interval,
|
||||
Timeout: chkType.Timeout,
|
||||
Logger: a.logger,
|
||||
CheckID: check.CheckID,
|
||||
ServiceID: check.ServiceID,
|
||||
TCP: chkType.TCP,
|
||||
Interval: chkType.Interval,
|
||||
Timeout: chkType.Timeout,
|
||||
Logger: a.logger,
|
||||
StatusHandler: statusHandler,
|
||||
}
|
||||
tcp.Start()
|
||||
a.checkTCPs[check.CheckID] = tcp
|
||||
@ -2732,7 +2734,6 @@ func (a *Agent) addCheck(check *structs.HealthCheck, chkType *structs.CheckType,
|
||||
}
|
||||
|
||||
grpc := &checks.CheckGRPC{
|
||||
Notify: a.State,
|
||||
CheckID: check.CheckID,
|
||||
ServiceID: check.ServiceID,
|
||||
GRPC: chkType.GRPC,
|
||||
@ -2740,6 +2741,7 @@ func (a *Agent) addCheck(check *structs.HealthCheck, chkType *structs.CheckType,
|
||||
Timeout: chkType.Timeout,
|
||||
Logger: a.logger,
|
||||
TLSClientConfig: tlsClientConfig,
|
||||
StatusHandler: statusHandler,
|
||||
}
|
||||
|
||||
if proxy != nil && proxy.Proxy.Expose.Checks {
|
||||
@ -2776,7 +2778,6 @@ func (a *Agent) addCheck(check *structs.HealthCheck, chkType *structs.CheckType,
|
||||
}
|
||||
|
||||
dockerCheck := &checks.CheckDocker{
|
||||
Notify: a.State,
|
||||
CheckID: check.CheckID,
|
||||
ServiceID: check.ServiceID,
|
||||
DockerContainerID: chkType.DockerContainerID,
|
||||
@ -2785,6 +2786,7 @@ func (a *Agent) addCheck(check *structs.HealthCheck, chkType *structs.CheckType,
|
||||
Interval: chkType.Interval,
|
||||
Logger: a.logger,
|
||||
Client: a.dockerClient,
|
||||
StatusHandler: statusHandler,
|
||||
}
|
||||
if prev := a.checkDockers[check.CheckID]; prev != nil {
|
||||
prev.Stop()
|
||||
@ -2811,6 +2813,7 @@ func (a *Agent) addCheck(check *structs.HealthCheck, chkType *structs.CheckType,
|
||||
Timeout: chkType.Timeout,
|
||||
Logger: a.logger,
|
||||
OutputMaxSize: maxOutputSize,
|
||||
StatusHandler: statusHandler,
|
||||
}
|
||||
monitor.Start()
|
||||
a.checkMonitors[check.CheckID] = monitor
|
||||
|
@ -3,7 +3,6 @@ package checks
|
||||
import (
|
||||
"crypto/tls"
|
||||
"fmt"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
@ -15,6 +14,8 @@ import (
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
|
||||
"github.com/armon/circbuf"
|
||||
"github.com/hashicorp/consul/agent/exec"
|
||||
"github.com/hashicorp/consul/api"
|
||||
@ -56,6 +57,7 @@ type CheckNotifier interface {
|
||||
// CheckMonitor is used to periodically invoke a script to
|
||||
// determine the health of a given check. It is compatible with
|
||||
// nagios plugins and expects the output in the same format.
|
||||
// Supports failures_before_critical and success_before_passing.
|
||||
type CheckMonitor struct {
|
||||
Notify CheckNotifier
|
||||
CheckID types.CheckID
|
||||
@ -66,6 +68,7 @@ type CheckMonitor struct {
|
||||
Timeout time.Duration
|
||||
Logger *log.Logger
|
||||
OutputMaxSize int
|
||||
StatusHandler *StatusHandler
|
||||
|
||||
stop bool
|
||||
stopCh chan struct{}
|
||||
@ -184,8 +187,7 @@ func (c *CheckMonitor) check() {
|
||||
// Check if the check passed
|
||||
outputStr := truncateAndLogOutput()
|
||||
if err == nil {
|
||||
c.Logger.Printf("[DEBUG] agent: Check %q is passing", c.CheckID)
|
||||
c.Notify.UpdateCheck(c.CheckID, api.HealthPassing, outputStr)
|
||||
c.StatusHandler.updateCheck(c.CheckID, api.HealthPassing, outputStr)
|
||||
return
|
||||
}
|
||||
|
||||
@ -195,16 +197,14 @@ func (c *CheckMonitor) check() {
|
||||
if status, ok := exitErr.Sys().(syscall.WaitStatus); ok {
|
||||
code := status.ExitStatus()
|
||||
if code == 1 {
|
||||
c.Logger.Printf("[WARN] agent: Check %q is now warning", c.CheckID)
|
||||
c.Notify.UpdateCheck(c.CheckID, api.HealthWarning, outputStr)
|
||||
c.StatusHandler.updateCheck(c.CheckID, api.HealthWarning, outputStr)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Set the health as critical
|
||||
c.Logger.Printf("[WARN] agent: Check %q is now critical", c.CheckID)
|
||||
c.Notify.UpdateCheck(c.CheckID, api.HealthCritical, outputStr)
|
||||
c.StatusHandler.updateCheck(c.CheckID, api.HealthCritical, outputStr)
|
||||
}
|
||||
|
||||
// CheckTTL is used to apply a TTL to check status,
|
||||
@ -308,8 +308,8 @@ func (c *CheckTTL) SetStatus(status, output string) string {
|
||||
// The check is warning if the response code is 429.
|
||||
// The check is critical if the response code is anything else
|
||||
// or if the request returns an error
|
||||
// Supports failures_before_critical and success_before_passing.
|
||||
type CheckHTTP struct {
|
||||
Notify CheckNotifier
|
||||
CheckID types.CheckID
|
||||
ServiceID string
|
||||
HTTP string
|
||||
@ -320,6 +320,7 @@ type CheckHTTP struct {
|
||||
Logger *log.Logger
|
||||
TLSClientConfig *tls.Config
|
||||
OutputMaxSize int
|
||||
StatusHandler *StatusHandler
|
||||
|
||||
httpClient *http.Client
|
||||
stop bool
|
||||
@ -418,8 +419,7 @@ func (c *CheckHTTP) check() {
|
||||
|
||||
req, err := http.NewRequest(method, target, nil)
|
||||
if err != nil {
|
||||
c.Logger.Printf("[WARN] agent: Check %q HTTP request failed: %s", c.CheckID, err)
|
||||
c.Notify.UpdateCheck(c.CheckID, api.HealthCritical, err.Error())
|
||||
c.StatusHandler.updateCheck(c.CheckID, api.HealthCritical, err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
@ -443,8 +443,7 @@ func (c *CheckHTTP) check() {
|
||||
|
||||
resp, err := c.httpClient.Do(req)
|
||||
if err != nil {
|
||||
c.Logger.Printf("[WARN] agent: Check %q HTTP request failed: %s", c.CheckID, err)
|
||||
c.Notify.UpdateCheck(c.CheckID, api.HealthCritical, err.Error())
|
||||
c.StatusHandler.updateCheck(c.CheckID, api.HealthCritical, err.Error())
|
||||
return
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
@ -460,20 +459,15 @@ func (c *CheckHTTP) check() {
|
||||
|
||||
if resp.StatusCode >= 200 && resp.StatusCode <= 299 {
|
||||
// PASSING (2xx)
|
||||
c.Logger.Printf("[DEBUG] agent: Check %q is passing", c.CheckID)
|
||||
c.Notify.UpdateCheck(c.CheckID, api.HealthPassing, result)
|
||||
|
||||
c.StatusHandler.updateCheck(c.CheckID, api.HealthPassing, result)
|
||||
} else if resp.StatusCode == 429 {
|
||||
// WARNING
|
||||
// 429 Too Many Requests (RFC 6585)
|
||||
// The user has sent too many requests in a given amount of time.
|
||||
c.Logger.Printf("[WARN] agent: Check %q is now warning", c.CheckID)
|
||||
c.Notify.UpdateCheck(c.CheckID, api.HealthWarning, result)
|
||||
|
||||
c.StatusHandler.updateCheck(c.CheckID, api.HealthWarning, result)
|
||||
} else {
|
||||
// CRITICAL
|
||||
c.Logger.Printf("[WARN] agent: Check %q is now critical", c.CheckID)
|
||||
c.Notify.UpdateCheck(c.CheckID, api.HealthCritical, result)
|
||||
c.StatusHandler.updateCheck(c.CheckID, api.HealthCritical, result)
|
||||
}
|
||||
}
|
||||
|
||||
@ -481,14 +475,15 @@ func (c *CheckHTTP) check() {
|
||||
// determine the health of a given check.
|
||||
// The check is passing if the connection succeeds
|
||||
// The check is critical if the connection returns an error
|
||||
// Supports failures_before_critical and success_before_passing.
|
||||
type CheckTCP struct {
|
||||
Notify CheckNotifier
|
||||
CheckID types.CheckID
|
||||
ServiceID string
|
||||
TCP string
|
||||
Interval time.Duration
|
||||
Timeout time.Duration
|
||||
Logger *log.Logger
|
||||
CheckID types.CheckID
|
||||
ServiceID string
|
||||
TCP string
|
||||
Interval time.Duration
|
||||
Timeout time.Duration
|
||||
Logger *log.Logger
|
||||
StatusHandler *StatusHandler
|
||||
|
||||
dialer *net.Dialer
|
||||
stop bool
|
||||
@ -549,20 +544,19 @@ func (c *CheckTCP) check() {
|
||||
conn, err := c.dialer.Dial(`tcp`, c.TCP)
|
||||
if err != nil {
|
||||
c.Logger.Printf("[WARN] agent: Check %q socket connection failed: %s", c.CheckID, err)
|
||||
c.Notify.UpdateCheck(c.CheckID, api.HealthCritical, err.Error())
|
||||
c.StatusHandler.updateCheck(c.CheckID, api.HealthCritical, err.Error())
|
||||
return
|
||||
}
|
||||
conn.Close()
|
||||
c.Logger.Printf("[DEBUG] agent: Check %q is passing", c.CheckID)
|
||||
c.Notify.UpdateCheck(c.CheckID, api.HealthPassing, fmt.Sprintf("TCP connect %s: Success", c.TCP))
|
||||
c.StatusHandler.updateCheck(c.CheckID, api.HealthPassing, fmt.Sprintf("TCP connect %s: Success", c.TCP))
|
||||
}
|
||||
|
||||
// CheckDocker is used to periodically invoke a script to
|
||||
// determine the health of an application running inside a
|
||||
// Docker Container. We assume that the script is compatible
|
||||
// with nagios plugins and expects the output in the same format.
|
||||
// Supports failures_before_critical and success_before_passing.
|
||||
type CheckDocker struct {
|
||||
Notify CheckNotifier
|
||||
CheckID types.CheckID
|
||||
ServiceID string
|
||||
Script string
|
||||
@ -572,6 +566,7 @@ type CheckDocker struct {
|
||||
Interval time.Duration
|
||||
Logger *log.Logger
|
||||
Client *DockerClient
|
||||
StatusHandler *StatusHandler
|
||||
|
||||
stop chan struct{}
|
||||
}
|
||||
@ -633,12 +628,7 @@ func (c *CheckDocker) check() {
|
||||
}
|
||||
c.Logger.Printf("[TRACE] agent: Check %q output: %s", c.CheckID, out)
|
||||
}
|
||||
|
||||
if status == api.HealthCritical {
|
||||
c.Logger.Printf("[WARN] agent: Check %q is now critical", c.CheckID)
|
||||
}
|
||||
|
||||
c.Notify.UpdateCheck(c.CheckID, status, out)
|
||||
c.StatusHandler.updateCheck(c.CheckID, status, out)
|
||||
}
|
||||
|
||||
func (c *CheckDocker) doCheck() (string, *circbuf.Buffer, error) {
|
||||
@ -681,8 +671,8 @@ func (c *CheckDocker) doCheck() (string, *circbuf.Buffer, error) {
|
||||
// The check is passing if returned status is SERVING.
|
||||
// The check is critical if connection fails or returned status is
|
||||
// not SERVING.
|
||||
// Supports failures_before_critical and success_before_passing.
|
||||
type CheckGRPC struct {
|
||||
Notify CheckNotifier
|
||||
CheckID types.CheckID
|
||||
ServiceID string
|
||||
GRPC string
|
||||
@ -690,6 +680,7 @@ type CheckGRPC struct {
|
||||
Timeout time.Duration
|
||||
TLSClientConfig *tls.Config
|
||||
Logger *log.Logger
|
||||
StatusHandler *StatusHandler
|
||||
|
||||
probe *GrpcHealthProbe
|
||||
stop bool
|
||||
@ -747,11 +738,9 @@ func (c *CheckGRPC) check() {
|
||||
|
||||
err := c.probe.Check(target)
|
||||
if err != nil {
|
||||
c.Logger.Printf("[DEBUG] agent: Check %q failed: %s", c.CheckID, err.Error())
|
||||
c.Notify.UpdateCheck(c.CheckID, api.HealthCritical, err.Error())
|
||||
c.StatusHandler.updateCheck(c.CheckID, api.HealthCritical, err.Error())
|
||||
} else {
|
||||
c.Logger.Printf("[DEBUG] agent: Check %q is passing", c.CheckID)
|
||||
c.Notify.UpdateCheck(c.CheckID, api.HealthPassing, fmt.Sprintf("gRPC check %s: success", target))
|
||||
c.StatusHandler.updateCheck(c.CheckID, api.HealthPassing, fmt.Sprintf("gRPC check %s: success", target))
|
||||
}
|
||||
}
|
||||
|
||||
@ -763,3 +752,50 @@ func (c *CheckGRPC) Stop() {
|
||||
close(c.stopCh)
|
||||
}
|
||||
}
|
||||
|
||||
// StatusHandler keep tracks of successive error/success counts and ensures
|
||||
// that status can be set to critical/passing only once the successive number of event
|
||||
// reaches the given threshold.
|
||||
type StatusHandler struct {
|
||||
inner CheckNotifier
|
||||
logger *log.Logger
|
||||
successBeforePassing int
|
||||
successCounter int
|
||||
failuresBeforeCritical int
|
||||
failuresCounter int
|
||||
}
|
||||
|
||||
// NewStatusHandler set counters values to threshold in order to immediatly update status after first check.
|
||||
func NewStatusHandler(inner CheckNotifier, logger *log.Logger, successBeforePassing, failuresBeforeCritical int) *StatusHandler {
|
||||
return &StatusHandler{
|
||||
logger: logger,
|
||||
inner: inner,
|
||||
successBeforePassing: successBeforePassing,
|
||||
successCounter: successBeforePassing,
|
||||
failuresBeforeCritical: failuresBeforeCritical,
|
||||
failuresCounter: failuresBeforeCritical,
|
||||
}
|
||||
}
|
||||
|
||||
func (s *StatusHandler) updateCheck(checkID types.CheckID, status, output string) {
|
||||
|
||||
if status == api.HealthPassing || status == api.HealthWarning {
|
||||
s.successCounter++
|
||||
s.failuresCounter = 0
|
||||
if s.successCounter >= s.successBeforePassing {
|
||||
s.logger.Printf("[DEBUG] agent: Check %q is %q", checkID, status)
|
||||
s.inner.UpdateCheck(checkID, status, output)
|
||||
return
|
||||
}
|
||||
s.logger.Printf("[WARN] agent: Check %q was %q but has not reached success threshold %d/%d", checkID, status, s.successCounter, s.successBeforePassing)
|
||||
} else {
|
||||
s.failuresCounter++
|
||||
s.successCounter = 0
|
||||
if s.failuresCounter >= s.failuresBeforeCritical {
|
||||
s.logger.Printf("[WARN] agent: Check %q is now critical", checkID)
|
||||
s.inner.UpdateCheck(checkID, status, output)
|
||||
return
|
||||
}
|
||||
s.logger.Printf("[WARN] agent: Check %q failed but has not reached failure threshold %d/%d", checkID, s.failuresCounter, s.failuresBeforeCritical)
|
||||
}
|
||||
}
|
||||
|
@ -20,6 +20,7 @@ import (
|
||||
"github.com/hashicorp/consul/sdk/testutil/retry"
|
||||
"github.com/hashicorp/consul/types"
|
||||
"github.com/hashicorp/go-uuid"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func uniqueID() string {
|
||||
@ -43,13 +44,17 @@ func TestCheckMonitor_Script(t *testing.T) {
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.status, func(t *testing.T) {
|
||||
notif := mock.NewNotify()
|
||||
logger := log.New(ioutil.Discard, uniqueID(), log.LstdFlags)
|
||||
statusHandler := NewStatusHandler(notif, logger, 0, 0)
|
||||
|
||||
check := &CheckMonitor{
|
||||
Notify: notif,
|
||||
CheckID: types.CheckID("foo"),
|
||||
Script: tt.script,
|
||||
Interval: 25 * time.Millisecond,
|
||||
Logger: log.New(ioutil.Discard, uniqueID(), log.LstdFlags),
|
||||
OutputMaxSize: DefaultBufSize,
|
||||
Logger: logger,
|
||||
StatusHandler: statusHandler,
|
||||
}
|
||||
check.Start()
|
||||
defer check.Stop()
|
||||
@ -79,13 +84,16 @@ func TestCheckMonitor_Args(t *testing.T) {
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.status, func(t *testing.T) {
|
||||
notif := mock.NewNotify()
|
||||
logger := log.New(ioutil.Discard, uniqueID(), log.LstdFlags)
|
||||
statusHandler := NewStatusHandler(notif, logger, 0, 0)
|
||||
check := &CheckMonitor{
|
||||
Notify: notif,
|
||||
CheckID: types.CheckID("foo"),
|
||||
ScriptArgs: tt.args,
|
||||
Interval: 25 * time.Millisecond,
|
||||
Logger: log.New(ioutil.Discard, uniqueID(), log.LstdFlags),
|
||||
OutputMaxSize: DefaultBufSize,
|
||||
Logger: logger,
|
||||
StatusHandler: statusHandler,
|
||||
}
|
||||
check.Start()
|
||||
defer check.Stop()
|
||||
@ -104,14 +112,18 @@ func TestCheckMonitor_Args(t *testing.T) {
|
||||
func TestCheckMonitor_Timeout(t *testing.T) {
|
||||
// t.Parallel() // timing test. no parallel
|
||||
notif := mock.NewNotify()
|
||||
logger := log.New(ioutil.Discard, uniqueID(), log.LstdFlags)
|
||||
statusHandler := NewStatusHandler(notif, logger, 0, 0)
|
||||
|
||||
check := &CheckMonitor{
|
||||
Notify: notif,
|
||||
CheckID: types.CheckID("foo"),
|
||||
ScriptArgs: []string{"sh", "-c", "sleep 1 && exit 0"},
|
||||
Interval: 50 * time.Millisecond,
|
||||
Timeout: 25 * time.Millisecond,
|
||||
Logger: log.New(ioutil.Discard, uniqueID(), log.LstdFlags),
|
||||
OutputMaxSize: DefaultBufSize,
|
||||
Logger: logger,
|
||||
StatusHandler: statusHandler,
|
||||
}
|
||||
check.Start()
|
||||
defer check.Stop()
|
||||
@ -130,13 +142,16 @@ func TestCheckMonitor_Timeout(t *testing.T) {
|
||||
func TestCheckMonitor_RandomStagger(t *testing.T) {
|
||||
// t.Parallel() // timing test. no parallel
|
||||
notif := mock.NewNotify()
|
||||
logger := log.New(ioutil.Discard, uniqueID(), log.LstdFlags)
|
||||
statusHandler := NewStatusHandler(notif, logger, 0, 0)
|
||||
check := &CheckMonitor{
|
||||
Notify: notif,
|
||||
CheckID: types.CheckID("foo"),
|
||||
ScriptArgs: []string{"sh", "-c", "exit 0"},
|
||||
Interval: 25 * time.Millisecond,
|
||||
Logger: log.New(ioutil.Discard, uniqueID(), log.LstdFlags),
|
||||
OutputMaxSize: DefaultBufSize,
|
||||
Logger: logger,
|
||||
StatusHandler: statusHandler,
|
||||
}
|
||||
check.Start()
|
||||
defer check.Stop()
|
||||
@ -156,13 +171,16 @@ func TestCheckMonitor_RandomStagger(t *testing.T) {
|
||||
func TestCheckMonitor_LimitOutput(t *testing.T) {
|
||||
t.Parallel()
|
||||
notif := mock.NewNotify()
|
||||
logger := log.New(ioutil.Discard, uniqueID(), log.LstdFlags)
|
||||
statusHandler := NewStatusHandler(notif, logger, 0, 0)
|
||||
check := &CheckMonitor{
|
||||
Notify: notif,
|
||||
CheckID: types.CheckID("foo"),
|
||||
ScriptArgs: []string{"od", "-N", "81920", "/dev/urandom"},
|
||||
Interval: 25 * time.Millisecond,
|
||||
Logger: log.New(ioutil.Discard, uniqueID(), log.LstdFlags),
|
||||
OutputMaxSize: DefaultBufSize,
|
||||
Logger: logger,
|
||||
StatusHandler: statusHandler,
|
||||
}
|
||||
check.Start()
|
||||
defer check.Stop()
|
||||
@ -299,15 +317,17 @@ func TestCheckHTTP(t *testing.T) {
|
||||
defer server.Close()
|
||||
|
||||
notif := mock.NewNotify()
|
||||
logger := log.New(ioutil.Discard, uniqueID(), log.LstdFlags)
|
||||
statusHandler := NewStatusHandler(notif, logger, 0, 0)
|
||||
|
||||
check := &CheckHTTP{
|
||||
Notify: notif,
|
||||
CheckID: types.CheckID("foo"),
|
||||
HTTP: server.URL,
|
||||
Method: tt.method,
|
||||
OutputMaxSize: DefaultBufSize,
|
||||
Header: tt.header,
|
||||
Interval: 10 * time.Millisecond,
|
||||
Logger: log.New(ioutil.Discard, uniqueID(), log.LstdFlags),
|
||||
Logger: logger,
|
||||
StatusHandler: statusHandler,
|
||||
}
|
||||
check.Start()
|
||||
defer check.Stop()
|
||||
@ -337,15 +357,18 @@ func TestCheckHTTP_Proxied(t *testing.T) {
|
||||
defer proxy.Close()
|
||||
|
||||
notif := mock.NewNotify()
|
||||
logger := log.New(ioutil.Discard, uniqueID(), log.LstdFlags)
|
||||
statusHandler := NewStatusHandler(notif, logger, 0, 0)
|
||||
|
||||
check := &CheckHTTP{
|
||||
Notify: notif,
|
||||
CheckID: types.CheckID("foo"),
|
||||
HTTP: "",
|
||||
Method: "GET",
|
||||
OutputMaxSize: DefaultBufSize,
|
||||
Interval: 10 * time.Millisecond,
|
||||
Logger: log.New(ioutil.Discard, uniqueID(), log.LstdFlags),
|
||||
Logger: logger,
|
||||
ProxyHTTP: proxy.URL,
|
||||
StatusHandler: statusHandler,
|
||||
}
|
||||
|
||||
check.Start()
|
||||
@ -369,15 +392,18 @@ func TestCheckHTTP_NotProxied(t *testing.T) {
|
||||
defer server.Close()
|
||||
|
||||
notif := mock.NewNotify()
|
||||
logger := log.New(ioutil.Discard, uniqueID(), log.LstdFlags)
|
||||
statusHandler := NewStatusHandler(notif, logger, 0, 0)
|
||||
|
||||
check := &CheckHTTP{
|
||||
Notify: notif,
|
||||
CheckID: types.CheckID("foo"),
|
||||
HTTP: server.URL,
|
||||
Method: "GET",
|
||||
OutputMaxSize: DefaultBufSize,
|
||||
Interval: 10 * time.Millisecond,
|
||||
Logger: log.New(ioutil.Discard, uniqueID(), log.LstdFlags),
|
||||
Logger: logger,
|
||||
ProxyHTTP: "",
|
||||
StatusHandler: statusHandler,
|
||||
}
|
||||
check.Start()
|
||||
defer check.Stop()
|
||||
@ -480,15 +506,16 @@ func TestCheckMaxOutputSize(t *testing.T) {
|
||||
defer server.Close()
|
||||
|
||||
notif := mock.NewNotify()
|
||||
logger := log.New(ioutil.Discard, uniqueID(), log.LstdFlags)
|
||||
maxOutputSize := 32
|
||||
check := &CheckHTTP{
|
||||
Notify: notif,
|
||||
CheckID: types.CheckID("bar"),
|
||||
HTTP: server.URL + "/v1/agent/self",
|
||||
Timeout: timeout,
|
||||
Interval: 2 * time.Millisecond,
|
||||
Logger: log.New(ioutil.Discard, uniqueID(), log.LstdFlags),
|
||||
Logger: logger,
|
||||
OutputMaxSize: maxOutputSize,
|
||||
StatusHandler: NewStatusHandler(notif, logger, 0, 0),
|
||||
}
|
||||
|
||||
check.Start()
|
||||
@ -515,13 +542,16 @@ func TestCheckHTTPTimeout(t *testing.T) {
|
||||
defer server.Close()
|
||||
|
||||
notif := mock.NewNotify()
|
||||
logger := log.New(ioutil.Discard, uniqueID(), log.LstdFlags)
|
||||
statusHandler := NewStatusHandler(notif, logger, 0, 0)
|
||||
|
||||
check := &CheckHTTP{
|
||||
Notify: notif,
|
||||
CheckID: types.CheckID("bar"),
|
||||
HTTP: server.URL,
|
||||
Timeout: timeout,
|
||||
Interval: 10 * time.Millisecond,
|
||||
Logger: log.New(ioutil.Discard, uniqueID(), log.LstdFlags),
|
||||
CheckID: types.CheckID("bar"),
|
||||
HTTP: server.URL,
|
||||
Timeout: timeout,
|
||||
Interval: 10 * time.Millisecond,
|
||||
Logger: logger,
|
||||
StatusHandler: statusHandler,
|
||||
}
|
||||
|
||||
check.Start()
|
||||
@ -538,11 +568,14 @@ func TestCheckHTTPTimeout(t *testing.T) {
|
||||
|
||||
func TestCheckHTTP_disablesKeepAlives(t *testing.T) {
|
||||
t.Parallel()
|
||||
notif := mock.NewNotify()
|
||||
logger := log.New(ioutil.Discard, uniqueID(), log.LstdFlags)
|
||||
check := &CheckHTTP{
|
||||
CheckID: types.CheckID("foo"),
|
||||
HTTP: "http://foo.bar/baz",
|
||||
Interval: 10 * time.Second,
|
||||
Logger: log.New(ioutil.Discard, uniqueID(), log.LstdFlags),
|
||||
CheckID: types.CheckID("foo"),
|
||||
HTTP: "http://foo.bar/baz",
|
||||
Interval: 10 * time.Second,
|
||||
Logger: logger,
|
||||
StatusHandler: NewStatusHandler(notif, logger, 0, 0),
|
||||
}
|
||||
|
||||
check.Start()
|
||||
@ -576,13 +609,16 @@ func TestCheckHTTP_TLS_SkipVerify(t *testing.T) {
|
||||
}
|
||||
|
||||
notif := mock.NewNotify()
|
||||
logger := log.New(ioutil.Discard, uniqueID(), log.LstdFlags)
|
||||
statusHandler := NewStatusHandler(notif, logger, 0, 0)
|
||||
|
||||
check := &CheckHTTP{
|
||||
Notify: notif,
|
||||
CheckID: types.CheckID("skipverify_true"),
|
||||
HTTP: server.URL,
|
||||
Interval: 25 * time.Millisecond,
|
||||
Logger: log.New(ioutil.Discard, uniqueID(), log.LstdFlags),
|
||||
Logger: logger,
|
||||
TLSClientConfig: tlsClientConfig,
|
||||
StatusHandler: statusHandler,
|
||||
}
|
||||
|
||||
check.Start()
|
||||
@ -610,13 +646,15 @@ func TestCheckHTTP_TLS_BadVerify(t *testing.T) {
|
||||
}
|
||||
|
||||
notif := mock.NewNotify()
|
||||
logger := log.New(ioutil.Discard, uniqueID(), log.LstdFlags)
|
||||
statusHandler := NewStatusHandler(notif, logger, 0, 0)
|
||||
check := &CheckHTTP{
|
||||
Notify: notif,
|
||||
CheckID: types.CheckID("skipverify_false"),
|
||||
HTTP: server.URL,
|
||||
Interval: 100 * time.Millisecond,
|
||||
Logger: log.New(ioutil.Discard, uniqueID(), log.LstdFlags),
|
||||
Logger: logger,
|
||||
TLSClientConfig: tlsClientConfig,
|
||||
StatusHandler: statusHandler,
|
||||
}
|
||||
|
||||
check.Start()
|
||||
@ -658,12 +696,14 @@ func mockTCPServer(network string) net.Listener {
|
||||
|
||||
func expectTCPStatus(t *testing.T, tcp string, status string) {
|
||||
notif := mock.NewNotify()
|
||||
logger := log.New(ioutil.Discard, uniqueID(), log.LstdFlags)
|
||||
statusHandler := NewStatusHandler(notif, logger, 0, 0)
|
||||
check := &CheckTCP{
|
||||
Notify: notif,
|
||||
CheckID: types.CheckID("foo"),
|
||||
TCP: tcp,
|
||||
Interval: 10 * time.Millisecond,
|
||||
Logger: log.New(ioutil.Discard, uniqueID(), log.LstdFlags),
|
||||
CheckID: types.CheckID("foo"),
|
||||
TCP: tcp,
|
||||
Interval: 10 * time.Millisecond,
|
||||
Logger: logger,
|
||||
StatusHandler: statusHandler,
|
||||
}
|
||||
check.Start()
|
||||
defer check.Stop()
|
||||
@ -677,6 +717,98 @@ func expectTCPStatus(t *testing.T, tcp string, status string) {
|
||||
})
|
||||
}
|
||||
|
||||
func TestStatusHandlerUpdateStatusAfterConsecutiveChecksThresholdIsReached(t *testing.T) {
|
||||
t.Parallel()
|
||||
checkID := types.CheckID("foo")
|
||||
notif := mock.NewNotify()
|
||||
logger := log.New(ioutil.Discard, uniqueID(), log.LstdFlags)
|
||||
statusHandler := NewStatusHandler(notif, logger, 2, 3)
|
||||
|
||||
// Set the initial status to passing after a single success
|
||||
statusHandler.updateCheck(checkID, api.HealthPassing, "bar")
|
||||
|
||||
// Status should become critical after 3 failed checks only
|
||||
statusHandler.updateCheck(checkID, api.HealthCritical, "bar")
|
||||
statusHandler.updateCheck(checkID, api.HealthCritical, "bar")
|
||||
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
require.Equal(r, 1, notif.Updates("foo"))
|
||||
require.Equal(r, api.HealthPassing, notif.State("foo"))
|
||||
})
|
||||
|
||||
statusHandler.updateCheck(checkID, api.HealthCritical, "bar")
|
||||
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
require.Equal(r, 2, notif.Updates("foo"))
|
||||
require.Equal(r, api.HealthCritical, notif.State("foo"))
|
||||
})
|
||||
|
||||
// Status should be passing after 2 passing check
|
||||
statusHandler.updateCheck(checkID, api.HealthPassing, "bar")
|
||||
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
require.Equal(r, 2, notif.Updates("foo"))
|
||||
require.Equal(r, api.HealthCritical, notif.State("foo"))
|
||||
})
|
||||
|
||||
statusHandler.updateCheck(checkID, api.HealthPassing, "bar")
|
||||
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
require.Equal(r, 3, notif.Updates("foo"))
|
||||
require.Equal(r, api.HealthPassing, notif.State("foo"))
|
||||
})
|
||||
}
|
||||
|
||||
func TestStatusHandlerResetCountersOnNonIdenticalsConsecutiveChecks(t *testing.T) {
|
||||
t.Parallel()
|
||||
checkID := types.CheckID("foo")
|
||||
notif := mock.NewNotify()
|
||||
logger := log.New(ioutil.Discard, uniqueID(), log.LstdFlags)
|
||||
statusHandler := NewStatusHandler(notif, logger, 2, 3)
|
||||
|
||||
// Set the initial status to passing after a single success
|
||||
statusHandler.updateCheck(checkID, api.HealthPassing, "bar")
|
||||
|
||||
// Status should remain passing after FAIL PASS FAIL FAIL sequence
|
||||
// Although we have 3 FAILS, they are not consecutive
|
||||
|
||||
statusHandler.updateCheck(checkID, api.HealthCritical, "bar")
|
||||
statusHandler.updateCheck(checkID, api.HealthPassing, "bar")
|
||||
statusHandler.updateCheck(checkID, api.HealthCritical, "bar")
|
||||
statusHandler.updateCheck(checkID, api.HealthCritical, "bar")
|
||||
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
require.Equal(r, 1, notif.Updates("foo"))
|
||||
require.Equal(r, api.HealthPassing, notif.State("foo"))
|
||||
})
|
||||
|
||||
// Critical after a 3rd consecutive FAIL
|
||||
statusHandler.updateCheck(checkID, api.HealthCritical, "bar")
|
||||
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
require.Equal(r, 2, notif.Updates("foo"))
|
||||
require.Equal(r, api.HealthCritical, notif.State("foo"))
|
||||
})
|
||||
|
||||
// Status should remain critical after PASS FAIL PASS sequence
|
||||
statusHandler.updateCheck(checkID, api.HealthPassing, "bar")
|
||||
statusHandler.updateCheck(checkID, api.HealthCritical, "bar")
|
||||
statusHandler.updateCheck(checkID, api.HealthPassing, "bar")
|
||||
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
require.Equal(r, 2, notif.Updates("foo"))
|
||||
require.Equal(r, api.HealthCritical, notif.State("foo"))
|
||||
})
|
||||
|
||||
// Passing after a 2nd consecutive PASS
|
||||
statusHandler.updateCheck(checkID, api.HealthPassing, "bar")
|
||||
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
require.Equal(r, 3, notif.Updates("foo"))
|
||||
require.Equal(r, api.HealthPassing, notif.State("foo"))
|
||||
})
|
||||
}
|
||||
|
||||
func TestCheckTCPCritical(t *testing.T) {
|
||||
t.Parallel()
|
||||
var (
|
||||
@ -971,14 +1103,15 @@ func TestCheck_Docker(t *testing.T) {
|
||||
}
|
||||
|
||||
notif, upd := mock.NewNotifyChan()
|
||||
statusHandler := NewStatusHandler(notif, log.New(ioutil.Discard, uniqueID(), log.LstdFlags), 0, 0)
|
||||
id := types.CheckID("chk")
|
||||
check := &CheckDocker{
|
||||
Notify: notif,
|
||||
CheckID: id,
|
||||
ScriptArgs: []string{"/health.sh"},
|
||||
DockerContainerID: "123",
|
||||
Interval: 25 * time.Millisecond,
|
||||
Client: c,
|
||||
StatusHandler: statusHandler,
|
||||
}
|
||||
check.Start()
|
||||
defer check.Stop()
|
||||
|
@ -4,10 +4,6 @@ import (
|
||||
"crypto/tls"
|
||||
"flag"
|
||||
"fmt"
|
||||
"github.com/hashicorp/consul/agent/mock"
|
||||
"github.com/hashicorp/consul/api"
|
||||
"github.com/hashicorp/consul/sdk/testutil/retry"
|
||||
"github.com/hashicorp/consul/types"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"net"
|
||||
@ -15,6 +11,11 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/agent/mock"
|
||||
"github.com/hashicorp/consul/api"
|
||||
"github.com/hashicorp/consul/sdk/testutil/retry"
|
||||
"github.com/hashicorp/consul/types"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/health"
|
||||
hv1 "google.golang.org/grpc/health/grpc_health_v1"
|
||||
@ -106,13 +107,15 @@ func TestGRPC_Proxied(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
notif := mock.NewNotify()
|
||||
logger := log.New(ioutil.Discard, uniqueID(), log.LstdFlags)
|
||||
statusHandler := NewStatusHandler(notif, logger, 0, 0)
|
||||
check := &CheckGRPC{
|
||||
Notify: notif,
|
||||
CheckID: types.CheckID("foo"),
|
||||
GRPC: "",
|
||||
Interval: 10 * time.Millisecond,
|
||||
Logger: log.New(ioutil.Discard, uniqueID(), log.LstdFlags),
|
||||
ProxyGRPC: server,
|
||||
CheckID: types.CheckID("foo"),
|
||||
GRPC: "",
|
||||
Interval: 10 * time.Millisecond,
|
||||
Logger: logger,
|
||||
ProxyGRPC: server,
|
||||
StatusHandler: statusHandler,
|
||||
}
|
||||
check.Start()
|
||||
defer check.Stop()
|
||||
@ -132,13 +135,15 @@ func TestGRPC_NotProxied(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
notif := mock.NewNotify()
|
||||
logger := log.New(ioutil.Discard, uniqueID(), log.LstdFlags)
|
||||
statusHandler := NewStatusHandler(notif, logger, 0, 0)
|
||||
check := &CheckGRPC{
|
||||
Notify: notif,
|
||||
CheckID: types.CheckID("foo"),
|
||||
GRPC: server,
|
||||
Interval: 10 * time.Millisecond,
|
||||
Logger: log.New(ioutil.Discard, uniqueID(), log.LstdFlags),
|
||||
ProxyGRPC: "",
|
||||
CheckID: types.CheckID("foo"),
|
||||
GRPC: server,
|
||||
Interval: 10 * time.Millisecond,
|
||||
Logger: logger,
|
||||
ProxyGRPC: "",
|
||||
StatusHandler: statusHandler,
|
||||
}
|
||||
check.Start()
|
||||
defer check.Stop()
|
||||
|
@ -1211,6 +1211,8 @@ func (b *Builder) checkVal(v *CheckDefinition) *structs.CheckDefinition {
|
||||
AliasService: b.stringVal(v.AliasService),
|
||||
Timeout: b.durationVal(fmt.Sprintf("check[%s].timeout", id), v.Timeout),
|
||||
TTL: b.durationVal(fmt.Sprintf("check[%s].ttl", id), v.TTL),
|
||||
SuccessBeforePassing: b.intVal(v.SuccessBeforePassing),
|
||||
FailuresBeforeCritical: b.intVal(v.FailuresBeforeCritical),
|
||||
DeregisterCriticalServiceAfter: b.durationVal(fmt.Sprintf("check[%s].deregister_critical_service_after", id), v.DeregisterCriticalServiceAfter),
|
||||
OutputMaxSize: b.intValWithDefault(v.OutputMaxSize, checks.DefaultBufSize),
|
||||
}
|
||||
|
@ -419,6 +419,8 @@ type CheckDefinition struct {
|
||||
AliasService *string `json:"alias_service,omitempty" hcl:"alias_service" mapstructure:"alias_service"`
|
||||
Timeout *string `json:"timeout,omitempty" hcl:"timeout" mapstructure:"timeout"`
|
||||
TTL *string `json:"ttl,omitempty" hcl:"ttl" mapstructure:"ttl"`
|
||||
SuccessBeforePassing *int `json:"success_before_passing,omitempty" hcl:"success_before_passing" mapstructure:"success_before_passing"`
|
||||
FailuresBeforeCritical *int `json:"failures_before_critical,omitempty" hcl:"failures_before_critical" mapstructure:"failures_before_critical"`
|
||||
DeregisterCriticalServiceAfter *string `json:"deregister_critical_service_after,omitempty" hcl:"deregister_critical_service_after" mapstructure:"deregister_critical_service_after"`
|
||||
}
|
||||
|
||||
|
@ -5721,6 +5721,8 @@ func TestSanitize(t *testing.T) {
|
||||
"AliasService": "",
|
||||
"DeregisterCriticalServiceAfter": "0s",
|
||||
"DockerContainerID": "",
|
||||
"SuccessBeforePassing": 0,
|
||||
"FailuresBeforeCritical": 0,
|
||||
"GRPC": "",
|
||||
"GRPCUseTLS": false,
|
||||
"HTTP": "",
|
||||
@ -5893,6 +5895,8 @@ func TestSanitize(t *testing.T) {
|
||||
"CheckID": "",
|
||||
"DeregisterCriticalServiceAfter": "0s",
|
||||
"DockerContainerID": "",
|
||||
"SuccessBeforePassing": 0,
|
||||
"FailuresBeforeCritical": 0,
|
||||
"GRPC": "",
|
||||
"GRPCUseTLS": false,
|
||||
"HTTP": "",
|
||||
|
@ -36,6 +36,8 @@ type CheckDefinition struct {
|
||||
AliasService string
|
||||
Timeout time.Duration
|
||||
TTL time.Duration
|
||||
SuccessBeforePassing int
|
||||
FailuresBeforeCritical int
|
||||
DeregisterCriticalServiceAfter time.Duration
|
||||
OutputMaxSize int
|
||||
}
|
||||
@ -81,6 +83,8 @@ func (c *CheckDefinition) CheckType() *CheckType {
|
||||
TLSSkipVerify: c.TLSSkipVerify,
|
||||
Timeout: c.Timeout,
|
||||
TTL: c.TTL,
|
||||
SuccessBeforePassing: c.SuccessBeforePassing,
|
||||
FailuresBeforeCritical: c.FailuresBeforeCritical,
|
||||
DeregisterCriticalServiceAfter: c.DeregisterCriticalServiceAfter,
|
||||
}
|
||||
}
|
||||
|
@ -27,21 +27,23 @@ type CheckType struct {
|
||||
// fields copied to CheckDefinition
|
||||
// Update CheckDefinition when adding fields here
|
||||
|
||||
ScriptArgs []string
|
||||
HTTP string
|
||||
Header map[string][]string
|
||||
Method string
|
||||
TCP string
|
||||
Interval time.Duration
|
||||
AliasNode string
|
||||
AliasService string
|
||||
DockerContainerID string
|
||||
Shell string
|
||||
GRPC string
|
||||
GRPCUseTLS bool
|
||||
TLSSkipVerify bool
|
||||
Timeout time.Duration
|
||||
TTL time.Duration
|
||||
ScriptArgs []string
|
||||
HTTP string
|
||||
Header map[string][]string
|
||||
Method string
|
||||
TCP string
|
||||
Interval time.Duration
|
||||
AliasNode string
|
||||
AliasService string
|
||||
DockerContainerID string
|
||||
Shell string
|
||||
GRPC string
|
||||
GRPCUseTLS bool
|
||||
TLSSkipVerify bool
|
||||
Timeout time.Duration
|
||||
TTL time.Duration
|
||||
SuccessBeforePassing int
|
||||
FailuresBeforeCritical int
|
||||
|
||||
// Definition fields used when exposing checks through a proxy
|
||||
ProxyHTTP string
|
||||
|
@ -367,3 +367,21 @@ key in your configuration file.
|
||||
]
|
||||
}
|
||||
```
|
||||
|
||||
## Success/Failures before passing/critical
|
||||
|
||||
A check may be set to become passing/critical only if a specified number of consecutive
|
||||
checks return passing/critical. Status will stay identical as before until
|
||||
the threshold is reached.
|
||||
This feature is available for HTTP, TCP, gRPC, Docker & Monitor checks.
|
||||
By default, both passing and critical thresholds will be set to 0 so the check status will always reflect the last check result.
|
||||
|
||||
```javascript
|
||||
{
|
||||
"checks": {
|
||||
...
|
||||
"success_before_passing" : 3
|
||||
"failures_before_critical" : 3
|
||||
},
|
||||
}
|
||||
```
|
Loading…
x
Reference in New Issue
Block a user