From 8f145559d88fae16b74c2256c7e9387b3d2c22cc Mon Sep 17 00:00:00 2001 From: Frank Schroeder Date: Wed, 25 Oct 2017 11:18:07 +0200 Subject: [PATCH] Decouple the code that executes checks from the agent --- agent/agent.go | 65 +- agent/agent_endpoint.go | 5 +- agent/agent_endpoint_test.go | 5 +- agent/agent_test.go | 11 +- agent/check.go | 640 ----------------- agent/checks/check.go | 646 ++++++++++++++++++ agent/{ => checks}/check_test.go | 45 +- agent/{ => checks}/docker.go | 6 +- agent/{ => checks}/docker_unix.go | 2 +- agent/{ => checks}/docker_windows.go | 2 +- agent/exec/exec.go | 14 + agent/{util_other.go => exec/exec_unix.go} | 7 +- .../{util_windows.go => exec/exec_windows.go} | 6 +- agent/remote_exec.go | 11 +- agent/testagent.go | 11 +- agent/unique/id.go | 19 + agent/util.go | 9 - agent/watch_handler.go | 9 +- command/lock/lock.go | 9 +- command/watch/watch.go | 9 +- 20 files changed, 783 insertions(+), 748 deletions(-) create mode 100644 agent/checks/check.go rename agent/{ => checks}/check_test.go (94%) rename agent/{ => checks}/docker.go (98%) rename agent/{ => checks}/docker_unix.go (83%) rename agent/{ => checks}/docker_windows.go (80%) create mode 100644 agent/exec/exec.go rename agent/{util_other.go => exec/exec_unix.go} (77%) rename agent/{util_windows.go => exec/exec_windows.go} (76%) create mode 100644 agent/unique/id.go diff --git a/agent/agent.go b/agent/agent.go index f40a027448..54628bca13 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -21,6 +21,7 @@ import ( "github.com/armon/go-metrics" "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/ae" + "github.com/hashicorp/consul/agent/checks" "github.com/hashicorp/consul/agent/config" "github.com/hashicorp/consul/agent/consul" "github.com/hashicorp/consul/agent/local" @@ -120,25 +121,25 @@ type Agent struct { checkReapAfter map[types.CheckID]time.Duration // checkMonitors maps the check ID to an associated monitor - checkMonitors map[types.CheckID]*CheckMonitor + checkMonitors map[types.CheckID]*checks.CheckMonitor // checkHTTPs maps the check ID to an associated HTTP check - checkHTTPs map[types.CheckID]*CheckHTTP + checkHTTPs map[types.CheckID]*checks.CheckHTTP // checkTCPs maps the check ID to an associated TCP check - checkTCPs map[types.CheckID]*CheckTCP + checkTCPs map[types.CheckID]*checks.CheckTCP // checkTTLs maps the check ID to an associated check TTL - checkTTLs map[types.CheckID]*CheckTTL + checkTTLs map[types.CheckID]*checks.CheckTTL // checkDockers maps the check ID to an associated Docker Exec based check - checkDockers map[types.CheckID]*CheckDocker + checkDockers map[types.CheckID]*checks.CheckDocker // checkLock protects updates to the check* maps checkLock sync.Mutex // dockerClient is the client for performing docker health checks. - dockerClient *DockerClient + dockerClient *checks.DockerClient // eventCh is used to receive user events eventCh chan serf.UserEvent @@ -206,11 +207,11 @@ func New(c *config.RuntimeConfig) (*Agent, error) { config: c, acls: acls, checkReapAfter: make(map[types.CheckID]time.Duration), - checkMonitors: make(map[types.CheckID]*CheckMonitor), - checkTTLs: make(map[types.CheckID]*CheckTTL), - checkHTTPs: make(map[types.CheckID]*CheckHTTP), - checkTCPs: make(map[types.CheckID]*CheckTCP), - checkDockers: make(map[types.CheckID]*CheckDocker), + checkMonitors: make(map[types.CheckID]*checks.CheckMonitor), + checkTTLs: make(map[types.CheckID]*checks.CheckTTL), + checkHTTPs: make(map[types.CheckID]*checks.CheckHTTP), + checkTCPs: make(map[types.CheckID]*checks.CheckTCP), + checkDockers: make(map[types.CheckID]*checks.CheckDocker), eventCh: make(chan serf.UserEvent, 1024), eventBuf: make([]*UserEvent, 256), joinLANNotifier: &systemd.Notifier{}, @@ -1675,7 +1676,7 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType, delete(a.checkTTLs, check.CheckID) } - ttl := &CheckTTL{ + ttl := &checks.CheckTTL{ Notify: a.State, CheckID: check.CheckID, TTL: chkType.TTL, @@ -1696,13 +1697,13 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType, existing.Stop() delete(a.checkHTTPs, check.CheckID) } - if chkType.Interval < MinInterval { + if chkType.Interval < checks.MinInterval { a.logger.Println(fmt.Sprintf("[WARN] agent: check '%s' has interval below minimum of %v", - check.CheckID, MinInterval)) - chkType.Interval = MinInterval + check.CheckID, checks.MinInterval)) + chkType.Interval = checks.MinInterval } - http := &CheckHTTP{ + http := &checks.CheckHTTP{ Notify: a.State, CheckID: check.CheckID, HTTP: chkType.HTTP, @@ -1721,13 +1722,13 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType, existing.Stop() delete(a.checkTCPs, check.CheckID) } - if chkType.Interval < MinInterval { + if chkType.Interval < checks.MinInterval { a.logger.Println(fmt.Sprintf("[WARN] agent: check '%s' has interval below minimum of %v", - check.CheckID, MinInterval)) - chkType.Interval = MinInterval + check.CheckID, checks.MinInterval)) + chkType.Interval = checks.MinInterval } - tcp := &CheckTCP{ + tcp := &checks.CheckTCP{ Notify: a.State, CheckID: check.CheckID, TCP: chkType.TCP, @@ -1743,10 +1744,10 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType, existing.Stop() delete(a.checkDockers, check.CheckID) } - if chkType.Interval < MinInterval { + if chkType.Interval < checks.MinInterval { a.logger.Println(fmt.Sprintf("[WARN] agent: check '%s' has interval below minimum of %v", - check.CheckID, MinInterval)) - chkType.Interval = MinInterval + check.CheckID, checks.MinInterval)) + chkType.Interval = checks.MinInterval } if chkType.Script != "" { a.logger.Printf("[WARN] agent: check %q has the 'script' field, which has been deprecated "+ @@ -1755,16 +1756,16 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType, } if a.dockerClient == nil { - dc, err := NewDockerClient(os.Getenv("DOCKER_HOST"), CheckBufSize) + dc, err := checks.NewDockerClient(os.Getenv("DOCKER_HOST"), checks.BufSize) if err != nil { a.logger.Printf("[ERR] agent: error creating docker client: %s", err) return err } - a.logger.Printf("[DEBUG] agent: created docker client for %s", dc.host) + a.logger.Printf("[DEBUG] agent: created docker client for %s", dc.Host()) a.dockerClient = dc } - dockerCheck := &CheckDocker{ + dockerCheck := &checks.CheckDocker{ Notify: a.State, CheckID: check.CheckID, DockerContainerID: chkType.DockerContainerID, @@ -1773,7 +1774,7 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType, ScriptArgs: chkType.ScriptArgs, Interval: chkType.Interval, Logger: a.logger, - client: a.dockerClient, + Client: a.dockerClient, } dockerCheck.Start() a.checkDockers[check.CheckID] = dockerCheck @@ -1783,10 +1784,10 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType, existing.Stop() delete(a.checkMonitors, check.CheckID) } - if chkType.Interval < MinInterval { + if chkType.Interval < checks.MinInterval { a.logger.Printf("[WARN] agent: check '%s' has interval below minimum of %v", - check.CheckID, MinInterval) - chkType.Interval = MinInterval + check.CheckID, checks.MinInterval) + chkType.Interval = checks.MinInterval } if chkType.Script != "" { a.logger.Printf("[WARN] agent: check %q has the 'script' field, which has been deprecated "+ @@ -1794,7 +1795,7 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType, check.CheckID) } - monitor := &CheckMonitor{ + monitor := &checks.CheckMonitor{ Notify: a.State, CheckID: check.CheckID, Script: chkType.Script, @@ -1922,7 +1923,7 @@ func (a *Agent) updateTTLCheck(checkID types.CheckID, status, output string) err // persistCheckState is used to record the check status into the data dir. // This allows the state to be restored on a later agent start. Currently // only useful for TTL based checks. -func (a *Agent) persistCheckState(check *CheckTTL, status, output string) error { +func (a *Agent) persistCheckState(check *checks.CheckTTL, status, output string) error { // Create the persisted state state := persistedCheckState{ CheckID: check.CheckID, diff --git a/agent/agent_endpoint.go b/agent/agent_endpoint.go index 6f105b8b0e..bf74827234 100644 --- a/agent/agent_endpoint.go +++ b/agent/agent_endpoint.go @@ -8,6 +8,7 @@ import ( "strings" "github.com/hashicorp/consul/acl" + "github.com/hashicorp/consul/agent/checks" "github.com/hashicorp/consul/agent/config" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/api" @@ -490,9 +491,9 @@ func (s *HTTPServer) AgentCheckUpdate(resp http.ResponseWriter, req *http.Reques } total := len(update.Output) - if total > CheckBufSize { + if total > checks.BufSize { update.Output = fmt.Sprintf("%s ... (captured %d of %d bytes)", - update.Output[:CheckBufSize], CheckBufSize, total) + update.Output[:checks.BufSize], checks.BufSize, total) } checkID := types.CheckID(strings.TrimPrefix(req.URL.Path, "/v1/agent/check/update/")) diff --git a/agent/agent_endpoint_test.go b/agent/agent_endpoint_test.go index 55fd2536ce..fbb418bdea 100644 --- a/agent/agent_endpoint_test.go +++ b/agent/agent_endpoint_test.go @@ -14,6 +14,7 @@ import ( "time" "github.com/hashicorp/consul/acl" + "github.com/hashicorp/consul/agent/checks" "github.com/hashicorp/consul/agent/config" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/api" @@ -1127,7 +1128,7 @@ func TestAgent_UpdateCheck(t *testing.T) { t.Run("log output limit", func(t *testing.T) { args := checkUpdate{ Status: api.HealthPassing, - Output: strings.Repeat("-= bad -=", 5*CheckBufSize), + Output: strings.Repeat("-= bad -=", 5*checks.BufSize), } req, _ := http.NewRequest("PUT", "/v1/agent/check/update/test", jsonReader(args)) resp := httptest.NewRecorder() @@ -1146,7 +1147,7 @@ func TestAgent_UpdateCheck(t *testing.T) { // rough check that the output buffer was cut down so this test // isn't super brittle. state := a.State.Checks()["test"] - if state.Status != api.HealthPassing || len(state.Output) > 2*CheckBufSize { + if state.Status != api.HealthPassing || len(state.Output) > 2*checks.BufSize { t.Fatalf("bad: %v", state) } }) diff --git a/agent/agent_test.go b/agent/agent_test.go index a883caf703..84eec1057b 100644 --- a/agent/agent_test.go +++ b/agent/agent_test.go @@ -13,6 +13,7 @@ import ( "testing" "time" + "github.com/hashicorp/consul/agent/checks" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/testutil" @@ -646,7 +647,7 @@ func TestAgent_AddCheck_MinInterval(t *testing.T) { // Ensure a TTL is setup if mon, ok := a.checkMonitors["mem"]; !ok { t.Fatalf("missing mem monitor") - } else if mon.Interval != MinInterval { + } else if mon.Interval != checks.MinInterval { t.Fatalf("bad mem monitor interval") } } @@ -680,7 +681,7 @@ func TestAgent_AddCheck_RestoreState(t *testing.T) { defer a.Shutdown() // Create some state and persist it - ttl := &CheckTTL{ + ttl := &checks.CheckTTL{ CheckID: "baz", TTL: time.Minute, } @@ -1764,7 +1765,7 @@ func TestAgent_persistCheckState(t *testing.T) { defer a.Shutdown() // Create the TTL check to persist - check := &CheckTTL{ + check := &checks.CheckTTL{ CheckID: "check1", TTL: 10 * time.Minute, } @@ -1811,7 +1812,7 @@ func TestAgent_loadCheckState(t *testing.T) { defer a.Shutdown() // Create a check whose state will expire immediately - check := &CheckTTL{ + check := &checks.CheckTTL{ CheckID: "check1", TTL: 0, } @@ -1877,7 +1878,7 @@ func TestAgent_purgeCheckState(t *testing.T) { } // Persist some state to the data dir - check := &CheckTTL{ + check := &checks.CheckTTL{ CheckID: "check1", TTL: time.Minute, } diff --git a/agent/check.go b/agent/check.go index 440fc36aef..0a3ce6b48d 100644 --- a/agent/check.go +++ b/agent/check.go @@ -1,290 +1,10 @@ package agent import ( - "crypto/tls" - "fmt" - "io" - "io/ioutil" - "log" - "net" - "net/http" - "os" - "os/exec" - "sync" - "syscall" - "time" - - "github.com/armon/circbuf" "github.com/hashicorp/consul/agent/structs" - "github.com/hashicorp/consul/api" - "github.com/hashicorp/consul/lib" "github.com/hashicorp/consul/types" - "github.com/hashicorp/go-cleanhttp" ) -const ( - // MinInterval is the minimal interval between - // two checks. Do not allow for a interval below this value. - // Otherwise we risk fork bombing a system. - MinInterval = time.Second - - // CheckBufSize is the maximum size of the captured - // check output. Prevents an enormous buffer - // from being captured - CheckBufSize = 4 * 1024 // 4KB - - // UserAgent is the value of the User-Agent header - // for HTTP health checks. - UserAgent = "Consul Health Check" -) - -// CheckNotifier interface is used by the CheckMonitor -// to notify when a check has a status update. The update -// should take care to be idempotent. -type CheckNotifier interface { - UpdateCheck(checkID types.CheckID, status, output string) -} - -// CheckMonitor is used to periodically invoke a script to -// determine the health of a given check. It is compatible with -// nagios plugins and expects the output in the same format. -type CheckMonitor struct { - Notify CheckNotifier - CheckID types.CheckID - Script string - ScriptArgs []string - Interval time.Duration - Timeout time.Duration - Logger *log.Logger - - stop bool - stopCh chan struct{} - stopLock sync.Mutex -} - -// Start is used to start a check monitor. -// Monitor runs until stop is called -func (c *CheckMonitor) Start() { - c.stopLock.Lock() - defer c.stopLock.Unlock() - c.stop = false - c.stopCh = make(chan struct{}) - go c.run() -} - -// Stop is used to stop a check monitor. -func (c *CheckMonitor) Stop() { - c.stopLock.Lock() - defer c.stopLock.Unlock() - if !c.stop { - c.stop = true - close(c.stopCh) - } -} - -// run is invoked by a goroutine to run until Stop() is called -func (c *CheckMonitor) run() { - // Get the randomized initial pause time - initialPauseTime := lib.RandomStagger(c.Interval) - next := time.After(initialPauseTime) - for { - select { - case <-next: - c.check() - next = time.After(c.Interval) - case <-c.stopCh: - return - } - } -} - -// check is invoked periodically to perform the script check -func (c *CheckMonitor) check() { - // Create the command - var cmd *exec.Cmd - var err error - var cmdDisplay string - if len(c.ScriptArgs) > 0 { - cmdDisplay = fmt.Sprintf("%v", c.ScriptArgs) - cmd, err = ExecSubprocess(c.ScriptArgs) - } else { - cmdDisplay = c.Script - cmd, err = ExecScript(c.Script) - } - if err != nil { - c.Logger.Printf("[ERR] agent: failed to setup invoke '%s': %s", cmdDisplay, err) - c.Notify.UpdateCheck(c.CheckID, api.HealthCritical, err.Error()) - return - } - - // Collect the output - output, _ := circbuf.NewBuffer(CheckBufSize) - cmd.Stdout = output - cmd.Stderr = output - SetSysProcAttr(cmd) - - truncateAndLogOutput := func() string { - outputStr := string(output.Bytes()) - if output.TotalWritten() > output.Size() { - outputStr = fmt.Sprintf("Captured %d of %d bytes\n...\n%s", - output.Size(), output.TotalWritten(), outputStr) - } - c.Logger.Printf("[DEBUG] agent: Check '%s' script '%s' output: %s", - c.CheckID, cmdDisplay, outputStr) - return outputStr - } - - // Start the check - if err := cmd.Start(); err != nil { - c.Logger.Printf("[ERR] agent: failed to invoke '%s': %s", cmdDisplay, err) - c.Notify.UpdateCheck(c.CheckID, api.HealthCritical, err.Error()) - return - } - - // Wait for the check to complete - waitCh := make(chan error, 1) - go func() { - waitCh <- cmd.Wait() - }() - - timeout := 30 * time.Second - if c.Timeout > 0 { - timeout = c.Timeout - } - select { - case <-time.After(timeout): - if err := KillCommandSubtree(cmd); err != nil { - c.Logger.Printf("[WARN] Failed to kill check '%s' after timeout: %v", cmdDisplay, err) - } - - msg := fmt.Sprintf("Timed out (%s) running check", timeout.String()) - c.Logger.Printf("[WARN] %s '%s'", msg, cmdDisplay) - - outputStr := truncateAndLogOutput() - if len(outputStr) > 0 { - msg += "\n\n" + outputStr - } - c.Notify.UpdateCheck(c.CheckID, api.HealthCritical, msg) - - // Now wait for the process to exit so we never start another - // instance concurrently. - <-waitCh - return - - case err = <-waitCh: - // The process returned before the timeout, proceed normally - } - - // Check if the check passed - outputStr := truncateAndLogOutput() - if err == nil { - c.Logger.Printf("[DEBUG] agent: Check '%v' is passing", c.CheckID) - c.Notify.UpdateCheck(c.CheckID, api.HealthPassing, outputStr) - return - } - - // If the exit code is 1, set check as warning - exitErr, ok := err.(*exec.ExitError) - if ok { - if status, ok := exitErr.Sys().(syscall.WaitStatus); ok { - code := status.ExitStatus() - if code == 1 { - c.Logger.Printf("[WARN] agent: Check '%v' is now warning", c.CheckID) - c.Notify.UpdateCheck(c.CheckID, api.HealthWarning, outputStr) - return - } - } - } - - // Set the health as critical - c.Logger.Printf("[WARN] agent: Check '%v' is now critical", c.CheckID) - c.Notify.UpdateCheck(c.CheckID, api.HealthCritical, outputStr) -} - -// CheckTTL is used to apply a TTL to check status, -// and enables clients to set the status of a check -// but upon the TTL expiring, the check status is -// automatically set to critical. -type CheckTTL struct { - Notify CheckNotifier - CheckID types.CheckID - TTL time.Duration - Logger *log.Logger - - timer *time.Timer - - lastOutput string - lastOutputLock sync.RWMutex - - stop bool - stopCh chan struct{} - stopLock sync.Mutex -} - -// Start is used to start a check ttl, runs until Stop() -func (c *CheckTTL) Start() { - c.stopLock.Lock() - defer c.stopLock.Unlock() - c.stop = false - c.stopCh = make(chan struct{}) - c.timer = time.NewTimer(c.TTL) - go c.run() -} - -// Stop is used to stop a check ttl. -func (c *CheckTTL) Stop() { - c.stopLock.Lock() - defer c.stopLock.Unlock() - if !c.stop { - c.timer.Stop() - c.stop = true - close(c.stopCh) - } -} - -// run is used to handle TTL expiration and to update the check status -func (c *CheckTTL) run() { - for { - select { - case <-c.timer.C: - c.Logger.Printf("[WARN] agent: Check '%v' missed TTL, is now critical", - c.CheckID) - c.Notify.UpdateCheck(c.CheckID, api.HealthCritical, c.getExpiredOutput()) - - case <-c.stopCh: - return - } - } -} - -// getExpiredOutput formats the output for the case when the TTL is expired. -func (c *CheckTTL) getExpiredOutput() string { - c.lastOutputLock.RLock() - defer c.lastOutputLock.RUnlock() - - const prefix = "TTL expired" - if c.lastOutput == "" { - return prefix - } - - return fmt.Sprintf("%s (last output before timeout follows): %s", prefix, c.lastOutput) -} - -// SetStatus is used to update the status of the check, -// and to renew the TTL. If expired, TTL is restarted. -func (c *CheckTTL) SetStatus(status, output string) { - c.Logger.Printf("[DEBUG] agent: Check '%v' status is now %v", - c.CheckID, status) - c.Notify.UpdateCheck(c.CheckID, status, output) - - // Store the last output so we can retain it if the TTL expires. - c.lastOutputLock.Lock() - c.lastOutput = output - c.lastOutputLock.Unlock() - - c.timer.Reset(c.TTL) -} - // persistedCheck is used to serialize a check and write it to disk // so that it may be restored later on. type persistedCheck struct { @@ -303,363 +23,3 @@ type persistedCheckState struct { Status string Expires int64 } - -// CheckHTTP is used to periodically make an HTTP request to -// determine the health of a given check. -// The check is passing if the response code is 2XX. -// The check is warning if the response code is 429. -// The check is critical if the response code is anything else -// or if the request returns an error -type CheckHTTP struct { - Notify CheckNotifier - CheckID types.CheckID - HTTP string - Header map[string][]string - Method string - Interval time.Duration - Timeout time.Duration - Logger *log.Logger - TLSSkipVerify bool - - httpClient *http.Client - stop bool - stopCh chan struct{} - stopLock sync.Mutex -} - -// Start is used to start an HTTP check. -// The check runs until stop is called -func (c *CheckHTTP) Start() { - c.stopLock.Lock() - defer c.stopLock.Unlock() - - if c.httpClient == nil { - // Create the transport. We disable HTTP Keep-Alive's to prevent - // failing checks due to the keepalive interval. - trans := cleanhttp.DefaultTransport() - trans.DisableKeepAlives = true - - // Skip SSL certificate verification if TLSSkipVerify is true - if trans.TLSClientConfig == nil { - trans.TLSClientConfig = &tls.Config{ - InsecureSkipVerify: c.TLSSkipVerify, - } - } else { - trans.TLSClientConfig.InsecureSkipVerify = c.TLSSkipVerify - } - - // Create the HTTP client. - c.httpClient = &http.Client{ - Timeout: 10 * time.Second, - Transport: trans, - } - - // For long (>10s) interval checks the http timeout is 10s, otherwise the - // timeout is the interval. This means that a check *should* return - // before the next check begins. - if c.Timeout > 0 && c.Timeout < c.Interval { - c.httpClient.Timeout = c.Timeout - } else if c.Interval < 10*time.Second { - c.httpClient.Timeout = c.Interval - } - } - - c.stop = false - c.stopCh = make(chan struct{}) - go c.run() -} - -// Stop is used to stop an HTTP check. -func (c *CheckHTTP) Stop() { - c.stopLock.Lock() - defer c.stopLock.Unlock() - if !c.stop { - c.stop = true - close(c.stopCh) - } -} - -// run is invoked by a goroutine to run until Stop() is called -func (c *CheckHTTP) run() { - // Get the randomized initial pause time - initialPauseTime := lib.RandomStagger(c.Interval) - c.Logger.Printf("[DEBUG] agent: pausing %v before first HTTP request of %s", initialPauseTime, c.HTTP) - next := time.After(initialPauseTime) - for { - select { - case <-next: - c.check() - next = time.After(c.Interval) - case <-c.stopCh: - return - } - } -} - -// check is invoked periodically to perform the HTTP check -func (c *CheckHTTP) check() { - method := c.Method - if method == "" { - method = "GET" - } - - req, err := http.NewRequest(method, c.HTTP, nil) - if err != nil { - c.Logger.Printf("[WARN] agent: http request failed '%s': %s", c.HTTP, err) - c.Notify.UpdateCheck(c.CheckID, api.HealthCritical, err.Error()) - return - } - - req.Header = http.Header(c.Header) - - // this happens during testing but not in prod - if req.Header == nil { - req.Header = make(http.Header) - } - - if host := req.Header.Get("Host"); host != "" { - req.Host = host - } - - if req.Header.Get("User-Agent") == "" { - req.Header.Set("User-Agent", UserAgent) - } - if req.Header.Get("Accept") == "" { - req.Header.Set("Accept", "text/plain, text/*, */*") - } - - resp, err := c.httpClient.Do(req) - if err != nil { - c.Logger.Printf("[WARN] agent: http request failed '%s': %s", c.HTTP, err) - c.Notify.UpdateCheck(c.CheckID, api.HealthCritical, err.Error()) - return - } - defer resp.Body.Close() - - // Read the response into a circular buffer to limit the size - output, _ := circbuf.NewBuffer(CheckBufSize) - if _, err := io.Copy(output, resp.Body); err != nil { - c.Logger.Printf("[WARN] agent: Check '%v': Get error while reading body: %s", c.CheckID, err) - } - - // Format the response body - result := fmt.Sprintf("HTTP GET %s: %s Output: %s", c.HTTP, resp.Status, output.String()) - - if resp.StatusCode >= 200 && resp.StatusCode <= 299 { - // PASSING (2xx) - c.Logger.Printf("[DEBUG] agent: Check '%v' is passing", c.CheckID) - c.Notify.UpdateCheck(c.CheckID, api.HealthPassing, result) - - } else if resp.StatusCode == 429 { - // WARNING - // 429 Too Many Requests (RFC 6585) - // The user has sent too many requests in a given amount of time. - c.Logger.Printf("[WARN] agent: Check '%v' is now warning", c.CheckID) - c.Notify.UpdateCheck(c.CheckID, api.HealthWarning, result) - - } else { - // CRITICAL - c.Logger.Printf("[WARN] agent: Check '%v' is now critical", c.CheckID) - c.Notify.UpdateCheck(c.CheckID, api.HealthCritical, result) - } -} - -// CheckTCP is used to periodically make an TCP/UDP connection to -// determine the health of a given check. -// The check is passing if the connection succeeds -// The check is critical if the connection returns an error -type CheckTCP struct { - Notify CheckNotifier - CheckID types.CheckID - TCP string - Interval time.Duration - Timeout time.Duration - Logger *log.Logger - - dialer *net.Dialer - stop bool - stopCh chan struct{} - stopLock sync.Mutex -} - -// Start is used to start a TCP check. -// The check runs until stop is called -func (c *CheckTCP) Start() { - c.stopLock.Lock() - defer c.stopLock.Unlock() - - if c.dialer == nil { - // Create the socket dialer - c.dialer = &net.Dialer{DualStack: true} - - // For long (>10s) interval checks the socket timeout is 10s, otherwise - // the timeout is the interval. This means that a check *should* return - // before the next check begins. - if c.Timeout > 0 && c.Timeout < c.Interval { - c.dialer.Timeout = c.Timeout - } else if c.Interval < 10*time.Second { - c.dialer.Timeout = c.Interval - } - } - - c.stop = false - c.stopCh = make(chan struct{}) - go c.run() -} - -// Stop is used to stop a TCP check. -func (c *CheckTCP) Stop() { - c.stopLock.Lock() - defer c.stopLock.Unlock() - if !c.stop { - c.stop = true - close(c.stopCh) - } -} - -// run is invoked by a goroutine to run until Stop() is called -func (c *CheckTCP) run() { - // Get the randomized initial pause time - initialPauseTime := lib.RandomStagger(c.Interval) - c.Logger.Printf("[DEBUG] agent: pausing %v before first socket connection of %s", initialPauseTime, c.TCP) - next := time.After(initialPauseTime) - for { - select { - case <-next: - c.check() - next = time.After(c.Interval) - case <-c.stopCh: - return - } - } -} - -// check is invoked periodically to perform the TCP check -func (c *CheckTCP) check() { - conn, err := c.dialer.Dial(`tcp`, c.TCP) - if err != nil { - c.Logger.Printf("[WARN] agent: socket connection failed '%s': %s", c.TCP, err) - c.Notify.UpdateCheck(c.CheckID, api.HealthCritical, err.Error()) - return - } - conn.Close() - c.Logger.Printf("[DEBUG] agent: Check '%v' is passing", c.CheckID) - c.Notify.UpdateCheck(c.CheckID, api.HealthPassing, fmt.Sprintf("TCP connect %s: Success", c.TCP)) -} - -// CheckDocker is used to periodically invoke a script to -// determine the health of an application running inside a -// Docker Container. We assume that the script is compatible -// with nagios plugins and expects the output in the same format. -type CheckDocker struct { - Notify CheckNotifier - CheckID types.CheckID - Script string - ScriptArgs []string - DockerContainerID string - Shell string - Interval time.Duration - Logger *log.Logger - - client *DockerClient - stop chan struct{} -} - -func (c *CheckDocker) Start() { - if c.stop != nil { - panic("Docker check already started") - } - - if c.Logger == nil { - c.Logger = log.New(ioutil.Discard, "", 0) - } - - if c.Shell == "" { - c.Shell = os.Getenv("SHELL") - if c.Shell == "" { - c.Shell = "/bin/sh" - } - } - c.stop = make(chan struct{}) - go c.run() -} - -func (c *CheckDocker) Stop() { - if c.stop == nil { - panic("Stop called before start") - } - close(c.stop) -} - -func (c *CheckDocker) run() { - firstWait := lib.RandomStagger(c.Interval) - next := time.After(firstWait) - for { - select { - case <-next: - c.check() - next = time.After(c.Interval) - case <-c.stop: - return - } - } -} - -func (c *CheckDocker) check() { - var out string - status, b, err := c.doCheck() - if err != nil { - c.Logger.Printf("[DEBUG] agent: Check '%s': %s", c.CheckID, err) - out = err.Error() - } else { - // out is already limited to CheckBufSize since we're getting a - // limited buffer. So we don't need to truncate it just report - // that it was truncated. - out = string(b.Bytes()) - if int(b.TotalWritten()) > len(out) { - out = fmt.Sprintf("Captured %d of %d bytes\n...\n%s", len(out), b.TotalWritten(), out) - } - c.Logger.Printf("[DEBUG] agent: Check '%s' script '%s' output: %s", c.CheckID, c.Script, out) - } - - if status == api.HealthCritical { - c.Logger.Printf("[WARN] agent: Check '%v' is now critical", c.CheckID) - } - - c.Notify.UpdateCheck(c.CheckID, status, out) -} - -func (c *CheckDocker) doCheck() (string, *circbuf.Buffer, error) { - var cmd []string - if len(c.ScriptArgs) > 0 { - cmd = c.ScriptArgs - } else { - cmd = []string{c.Shell, "-c", c.Script} - } - - execID, err := c.client.CreateExec(c.DockerContainerID, cmd) - if err != nil { - return api.HealthCritical, nil, err - } - - buf, err := c.client.StartExec(c.DockerContainerID, execID) - if err != nil { - return api.HealthCritical, nil, err - } - - exitCode, err := c.client.InspectExec(c.DockerContainerID, execID) - if err != nil { - return api.HealthCritical, nil, err - } - - switch exitCode { - case 0: - return api.HealthPassing, buf, nil - case 1: - c.Logger.Printf("[DEBUG] Check failed with exit code: %d", exitCode) - return api.HealthWarning, buf, nil - default: - c.Logger.Printf("[DEBUG] Check failed with exit code: %d", exitCode) - return api.HealthCritical, buf, nil - } -} diff --git a/agent/checks/check.go b/agent/checks/check.go new file mode 100644 index 0000000000..446521021a --- /dev/null +++ b/agent/checks/check.go @@ -0,0 +1,646 @@ +package checks + +import ( + "crypto/tls" + "fmt" + "io" + "io/ioutil" + "log" + "net" + "net/http" + "os" + osexec "os/exec" + "sync" + "syscall" + "time" + + "github.com/armon/circbuf" + "github.com/hashicorp/consul/agent/exec" + "github.com/hashicorp/consul/api" + "github.com/hashicorp/consul/lib" + "github.com/hashicorp/consul/types" + "github.com/hashicorp/go-cleanhttp" +) + +const ( + // MinInterval is the minimal interval between + // two checks. Do not allow for a interval below this value. + // Otherwise we risk fork bombing a system. + MinInterval = time.Second + + // BufSize is the maximum size of the captured + // check output. Prevents an enormous buffer + // from being captured + BufSize = 4 * 1024 // 4KB + + // UserAgent is the value of the User-Agent header + // for HTTP health checks. + UserAgent = "Consul Health Check" +) + +// CheckNotifier interface is used by the CheckMonitor +// to notify when a check has a status update. The update +// should take care to be idempotent. +type CheckNotifier interface { + UpdateCheck(checkID types.CheckID, status, output string) +} + +// CheckMonitor is used to periodically invoke a script to +// determine the health of a given check. It is compatible with +// nagios plugins and expects the output in the same format. +type CheckMonitor struct { + Notify CheckNotifier + CheckID types.CheckID + Script string + ScriptArgs []string + Interval time.Duration + Timeout time.Duration + Logger *log.Logger + + stop bool + stopCh chan struct{} + stopLock sync.Mutex +} + +// Start is used to start a check monitor. +// Monitor runs until stop is called +func (c *CheckMonitor) Start() { + c.stopLock.Lock() + defer c.stopLock.Unlock() + c.stop = false + c.stopCh = make(chan struct{}) + go c.run() +} + +// Stop is used to stop a check monitor. +func (c *CheckMonitor) Stop() { + c.stopLock.Lock() + defer c.stopLock.Unlock() + if !c.stop { + c.stop = true + close(c.stopCh) + } +} + +// run is invoked by a goroutine to run until Stop() is called +func (c *CheckMonitor) run() { + // Get the randomized initial pause time + initialPauseTime := lib.RandomStagger(c.Interval) + next := time.After(initialPauseTime) + for { + select { + case <-next: + c.check() + next = time.After(c.Interval) + case <-c.stopCh: + return + } + } +} + +// check is invoked periodically to perform the script check +func (c *CheckMonitor) check() { + // Create the command + var cmd *osexec.Cmd + var err error + var cmdDisplay string + if len(c.ScriptArgs) > 0 { + cmdDisplay = fmt.Sprintf("%v", c.ScriptArgs) + cmd, err = exec.Subprocess(c.ScriptArgs) + } else { + cmdDisplay = c.Script + cmd, err = exec.Script(c.Script) + } + if err != nil { + c.Logger.Printf("[ERR] agent: failed to setup invoke '%s': %s", cmdDisplay, err) + c.Notify.UpdateCheck(c.CheckID, api.HealthCritical, err.Error()) + return + } + + // Collect the output + output, _ := circbuf.NewBuffer(BufSize) + cmd.Stdout = output + cmd.Stderr = output + exec.SetSysProcAttr(cmd) + + truncateAndLogOutput := func() string { + outputStr := string(output.Bytes()) + if output.TotalWritten() > output.Size() { + outputStr = fmt.Sprintf("Captured %d of %d bytes\n...\n%s", + output.Size(), output.TotalWritten(), outputStr) + } + c.Logger.Printf("[DEBUG] agent: Check '%s' script '%s' output: %s", + c.CheckID, cmdDisplay, outputStr) + return outputStr + } + + // Start the check + if err := cmd.Start(); err != nil { + c.Logger.Printf("[ERR] agent: failed to invoke '%s': %s", cmdDisplay, err) + c.Notify.UpdateCheck(c.CheckID, api.HealthCritical, err.Error()) + return + } + + // Wait for the check to complete + waitCh := make(chan error, 1) + go func() { + waitCh <- cmd.Wait() + }() + + timeout := 30 * time.Second + if c.Timeout > 0 { + timeout = c.Timeout + } + select { + case <-time.After(timeout): + if err := exec.KillCommandSubtree(cmd); err != nil { + c.Logger.Printf("[WARN] Failed to kill check '%s' after timeout: %v", cmdDisplay, err) + } + + msg := fmt.Sprintf("Timed out (%s) running check", timeout.String()) + c.Logger.Printf("[WARN] %s '%s'", msg, cmdDisplay) + + outputStr := truncateAndLogOutput() + if len(outputStr) > 0 { + msg += "\n\n" + outputStr + } + c.Notify.UpdateCheck(c.CheckID, api.HealthCritical, msg) + + // Now wait for the process to exit so we never start another + // instance concurrently. + <-waitCh + return + + case err = <-waitCh: + // The process returned before the timeout, proceed normally + } + + // Check if the check passed + outputStr := truncateAndLogOutput() + if err == nil { + c.Logger.Printf("[DEBUG] agent: Check '%v' is passing", c.CheckID) + c.Notify.UpdateCheck(c.CheckID, api.HealthPassing, outputStr) + return + } + + // If the exit code is 1, set check as warning + exitErr, ok := err.(*osexec.ExitError) + if ok { + if status, ok := exitErr.Sys().(syscall.WaitStatus); ok { + code := status.ExitStatus() + if code == 1 { + c.Logger.Printf("[WARN] agent: Check '%v' is now warning", c.CheckID) + c.Notify.UpdateCheck(c.CheckID, api.HealthWarning, outputStr) + return + } + } + } + + // Set the health as critical + c.Logger.Printf("[WARN] agent: Check '%v' is now critical", c.CheckID) + c.Notify.UpdateCheck(c.CheckID, api.HealthCritical, outputStr) +} + +// CheckTTL is used to apply a TTL to check status, +// and enables clients to set the status of a check +// but upon the TTL expiring, the check status is +// automatically set to critical. +type CheckTTL struct { + Notify CheckNotifier + CheckID types.CheckID + TTL time.Duration + Logger *log.Logger + + timer *time.Timer + + lastOutput string + lastOutputLock sync.RWMutex + + stop bool + stopCh chan struct{} + stopLock sync.Mutex +} + +// Start is used to start a check ttl, runs until Stop() +func (c *CheckTTL) Start() { + c.stopLock.Lock() + defer c.stopLock.Unlock() + c.stop = false + c.stopCh = make(chan struct{}) + c.timer = time.NewTimer(c.TTL) + go c.run() +} + +// Stop is used to stop a check ttl. +func (c *CheckTTL) Stop() { + c.stopLock.Lock() + defer c.stopLock.Unlock() + if !c.stop { + c.timer.Stop() + c.stop = true + close(c.stopCh) + } +} + +// run is used to handle TTL expiration and to update the check status +func (c *CheckTTL) run() { + for { + select { + case <-c.timer.C: + c.Logger.Printf("[WARN] agent: Check '%v' missed TTL, is now critical", + c.CheckID) + c.Notify.UpdateCheck(c.CheckID, api.HealthCritical, c.getExpiredOutput()) + + case <-c.stopCh: + return + } + } +} + +// getExpiredOutput formats the output for the case when the TTL is expired. +func (c *CheckTTL) getExpiredOutput() string { + c.lastOutputLock.RLock() + defer c.lastOutputLock.RUnlock() + + const prefix = "TTL expired" + if c.lastOutput == "" { + return prefix + } + + return fmt.Sprintf("%s (last output before timeout follows): %s", prefix, c.lastOutput) +} + +// SetStatus is used to update the status of the check, +// and to renew the TTL. If expired, TTL is restarted. +func (c *CheckTTL) SetStatus(status, output string) { + c.Logger.Printf("[DEBUG] agent: Check '%v' status is now %v", + c.CheckID, status) + c.Notify.UpdateCheck(c.CheckID, status, output) + + // Store the last output so we can retain it if the TTL expires. + c.lastOutputLock.Lock() + c.lastOutput = output + c.lastOutputLock.Unlock() + + c.timer.Reset(c.TTL) +} + +// CheckHTTP is used to periodically make an HTTP request to +// determine the health of a given check. +// The check is passing if the response code is 2XX. +// The check is warning if the response code is 429. +// The check is critical if the response code is anything else +// or if the request returns an error +type CheckHTTP struct { + Notify CheckNotifier + CheckID types.CheckID + HTTP string + Header map[string][]string + Method string + Interval time.Duration + Timeout time.Duration + Logger *log.Logger + TLSSkipVerify bool + + httpClient *http.Client + stop bool + stopCh chan struct{} + stopLock sync.Mutex +} + +// Start is used to start an HTTP check. +// The check runs until stop is called +func (c *CheckHTTP) Start() { + c.stopLock.Lock() + defer c.stopLock.Unlock() + + if c.httpClient == nil { + // Create the transport. We disable HTTP Keep-Alive's to prevent + // failing checks due to the keepalive interval. + trans := cleanhttp.DefaultTransport() + trans.DisableKeepAlives = true + + // Skip SSL certificate verification if TLSSkipVerify is true + if trans.TLSClientConfig == nil { + trans.TLSClientConfig = &tls.Config{ + InsecureSkipVerify: c.TLSSkipVerify, + } + } else { + trans.TLSClientConfig.InsecureSkipVerify = c.TLSSkipVerify + } + + // Create the HTTP client. + c.httpClient = &http.Client{ + Timeout: 10 * time.Second, + Transport: trans, + } + + // For long (>10s) interval checks the http timeout is 10s, otherwise the + // timeout is the interval. This means that a check *should* return + // before the next check begins. + if c.Timeout > 0 && c.Timeout < c.Interval { + c.httpClient.Timeout = c.Timeout + } else if c.Interval < 10*time.Second { + c.httpClient.Timeout = c.Interval + } + } + + c.stop = false + c.stopCh = make(chan struct{}) + go c.run() +} + +// Stop is used to stop an HTTP check. +func (c *CheckHTTP) Stop() { + c.stopLock.Lock() + defer c.stopLock.Unlock() + if !c.stop { + c.stop = true + close(c.stopCh) + } +} + +// run is invoked by a goroutine to run until Stop() is called +func (c *CheckHTTP) run() { + // Get the randomized initial pause time + initialPauseTime := lib.RandomStagger(c.Interval) + c.Logger.Printf("[DEBUG] agent: pausing %v before first HTTP request of %s", initialPauseTime, c.HTTP) + next := time.After(initialPauseTime) + for { + select { + case <-next: + c.check() + next = time.After(c.Interval) + case <-c.stopCh: + return + } + } +} + +// check is invoked periodically to perform the HTTP check +func (c *CheckHTTP) check() { + method := c.Method + if method == "" { + method = "GET" + } + + req, err := http.NewRequest(method, c.HTTP, nil) + if err != nil { + c.Logger.Printf("[WARN] agent: http request failed '%s': %s", c.HTTP, err) + c.Notify.UpdateCheck(c.CheckID, api.HealthCritical, err.Error()) + return + } + + req.Header = http.Header(c.Header) + + // this happens during testing but not in prod + if req.Header == nil { + req.Header = make(http.Header) + } + + if host := req.Header.Get("Host"); host != "" { + req.Host = host + } + + if req.Header.Get("User-Agent") == "" { + req.Header.Set("User-Agent", UserAgent) + } + if req.Header.Get("Accept") == "" { + req.Header.Set("Accept", "text/plain, text/*, */*") + } + + resp, err := c.httpClient.Do(req) + if err != nil { + c.Logger.Printf("[WARN] agent: http request failed '%s': %s", c.HTTP, err) + c.Notify.UpdateCheck(c.CheckID, api.HealthCritical, err.Error()) + return + } + defer resp.Body.Close() + + // Read the response into a circular buffer to limit the size + output, _ := circbuf.NewBuffer(BufSize) + if _, err := io.Copy(output, resp.Body); err != nil { + c.Logger.Printf("[WARN] agent: Check '%v': Get error while reading body: %s", c.CheckID, err) + } + + // Format the response body + result := fmt.Sprintf("HTTP GET %s: %s Output: %s", c.HTTP, resp.Status, output.String()) + + if resp.StatusCode >= 200 && resp.StatusCode <= 299 { + // PASSING (2xx) + c.Logger.Printf("[DEBUG] agent: Check '%v' is passing", c.CheckID) + c.Notify.UpdateCheck(c.CheckID, api.HealthPassing, result) + + } else if resp.StatusCode == 429 { + // WARNING + // 429 Too Many Requests (RFC 6585) + // The user has sent too many requests in a given amount of time. + c.Logger.Printf("[WARN] agent: Check '%v' is now warning", c.CheckID) + c.Notify.UpdateCheck(c.CheckID, api.HealthWarning, result) + + } else { + // CRITICAL + c.Logger.Printf("[WARN] agent: Check '%v' is now critical", c.CheckID) + c.Notify.UpdateCheck(c.CheckID, api.HealthCritical, result) + } +} + +// CheckTCP is used to periodically make an TCP/UDP connection to +// determine the health of a given check. +// The check is passing if the connection succeeds +// The check is critical if the connection returns an error +type CheckTCP struct { + Notify CheckNotifier + CheckID types.CheckID + TCP string + Interval time.Duration + Timeout time.Duration + Logger *log.Logger + + dialer *net.Dialer + stop bool + stopCh chan struct{} + stopLock sync.Mutex +} + +// Start is used to start a TCP check. +// The check runs until stop is called +func (c *CheckTCP) Start() { + c.stopLock.Lock() + defer c.stopLock.Unlock() + + if c.dialer == nil { + // Create the socket dialer + c.dialer = &net.Dialer{DualStack: true} + + // For long (>10s) interval checks the socket timeout is 10s, otherwise + // the timeout is the interval. This means that a check *should* return + // before the next check begins. + if c.Timeout > 0 && c.Timeout < c.Interval { + c.dialer.Timeout = c.Timeout + } else if c.Interval < 10*time.Second { + c.dialer.Timeout = c.Interval + } + } + + c.stop = false + c.stopCh = make(chan struct{}) + go c.run() +} + +// Stop is used to stop a TCP check. +func (c *CheckTCP) Stop() { + c.stopLock.Lock() + defer c.stopLock.Unlock() + if !c.stop { + c.stop = true + close(c.stopCh) + } +} + +// run is invoked by a goroutine to run until Stop() is called +func (c *CheckTCP) run() { + // Get the randomized initial pause time + initialPauseTime := lib.RandomStagger(c.Interval) + c.Logger.Printf("[DEBUG] agent: pausing %v before first socket connection of %s", initialPauseTime, c.TCP) + next := time.After(initialPauseTime) + for { + select { + case <-next: + c.check() + next = time.After(c.Interval) + case <-c.stopCh: + return + } + } +} + +// check is invoked periodically to perform the TCP check +func (c *CheckTCP) check() { + conn, err := c.dialer.Dial(`tcp`, c.TCP) + if err != nil { + c.Logger.Printf("[WARN] agent: socket connection failed '%s': %s", c.TCP, err) + c.Notify.UpdateCheck(c.CheckID, api.HealthCritical, err.Error()) + return + } + conn.Close() + c.Logger.Printf("[DEBUG] agent: Check '%v' is passing", c.CheckID) + c.Notify.UpdateCheck(c.CheckID, api.HealthPassing, fmt.Sprintf("TCP connect %s: Success", c.TCP)) +} + +// CheckDocker is used to periodically invoke a script to +// determine the health of an application running inside a +// Docker Container. We assume that the script is compatible +// with nagios plugins and expects the output in the same format. +type CheckDocker struct { + Notify CheckNotifier + CheckID types.CheckID + Script string + ScriptArgs []string + DockerContainerID string + Shell string + Interval time.Duration + Logger *log.Logger + Client *DockerClient + + stop chan struct{} +} + +func (c *CheckDocker) Start() { + if c.stop != nil { + panic("Docker check already started") + } + + if c.Logger == nil { + c.Logger = log.New(ioutil.Discard, "", 0) + } + + if c.Shell == "" { + c.Shell = os.Getenv("SHELL") + if c.Shell == "" { + c.Shell = "/bin/sh" + } + } + c.stop = make(chan struct{}) + go c.run() +} + +func (c *CheckDocker) Stop() { + if c.stop == nil { + panic("Stop called before start") + } + close(c.stop) +} + +func (c *CheckDocker) run() { + firstWait := lib.RandomStagger(c.Interval) + next := time.After(firstWait) + for { + select { + case <-next: + c.check() + next = time.After(c.Interval) + case <-c.stop: + return + } + } +} + +func (c *CheckDocker) check() { + var out string + status, b, err := c.doCheck() + if err != nil { + c.Logger.Printf("[DEBUG] agent: Check '%s': %s", c.CheckID, err) + out = err.Error() + } else { + // out is already limited to CheckBufSize since we're getting a + // limited buffer. So we don't need to truncate it just report + // that it was truncated. + out = string(b.Bytes()) + if int(b.TotalWritten()) > len(out) { + out = fmt.Sprintf("Captured %d of %d bytes\n...\n%s", len(out), b.TotalWritten(), out) + } + c.Logger.Printf("[DEBUG] agent: Check '%s' script '%s' output: %s", c.CheckID, c.Script, out) + } + + if status == api.HealthCritical { + c.Logger.Printf("[WARN] agent: Check '%v' is now critical", c.CheckID) + } + + c.Notify.UpdateCheck(c.CheckID, status, out) +} + +func (c *CheckDocker) doCheck() (string, *circbuf.Buffer, error) { + var cmd []string + if len(c.ScriptArgs) > 0 { + cmd = c.ScriptArgs + } else { + cmd = []string{c.Shell, "-c", c.Script} + } + + execID, err := c.Client.CreateExec(c.DockerContainerID, cmd) + if err != nil { + return api.HealthCritical, nil, err + } + + buf, err := c.Client.StartExec(c.DockerContainerID, execID) + if err != nil { + return api.HealthCritical, nil, err + } + + exitCode, err := c.Client.InspectExec(c.DockerContainerID, execID) + if err != nil { + return api.HealthCritical, nil, err + } + + switch exitCode { + case 0: + return api.HealthPassing, buf, nil + case 1: + c.Logger.Printf("[DEBUG] Check failed with exit code: %d", exitCode) + return api.HealthWarning, buf, nil + default: + c.Logger.Printf("[DEBUG] Check failed with exit code: %d", exitCode) + return api.HealthCritical, buf, nil + } +} diff --git a/agent/check_test.go b/agent/checks/check_test.go similarity index 94% rename from agent/check_test.go rename to agent/checks/check_test.go index eb544d9775..c5a046848c 100644 --- a/agent/check_test.go +++ b/agent/checks/check_test.go @@ -1,4 +1,4 @@ -package agent +package checks import ( "bytes" @@ -15,6 +15,7 @@ import ( "time" "github.com/hashicorp/consul/agent/mock" + "github.com/hashicorp/consul/agent/unique" "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/testutil/retry" "github.com/hashicorp/consul/types" @@ -38,7 +39,7 @@ func TestCheckMonitor_Script(t *testing.T) { CheckID: types.CheckID("foo"), Script: tt.script, Interval: 25 * time.Millisecond, - Logger: log.New(ioutil.Discard, UniqueID(), log.LstdFlags), + Logger: log.New(ioutil.Discard, unique.ID(), log.LstdFlags), } check.Start() defer check.Stop() @@ -73,7 +74,7 @@ func TestCheckMonitor_Args(t *testing.T) { CheckID: types.CheckID("foo"), ScriptArgs: tt.args, Interval: 25 * time.Millisecond, - Logger: log.New(ioutil.Discard, UniqueID(), log.LstdFlags), + Logger: log.New(ioutil.Discard, unique.ID(), log.LstdFlags), } check.Start() defer check.Stop() @@ -98,7 +99,7 @@ func TestCheckMonitor_Timeout(t *testing.T) { ScriptArgs: []string{"sh", "-c", "sleep 1 && exit 0"}, Interval: 50 * time.Millisecond, Timeout: 25 * time.Millisecond, - Logger: log.New(ioutil.Discard, UniqueID(), log.LstdFlags), + Logger: log.New(ioutil.Discard, unique.ID(), log.LstdFlags), } check.Start() defer check.Stop() @@ -122,7 +123,7 @@ func TestCheckMonitor_RandomStagger(t *testing.T) { CheckID: types.CheckID("foo"), ScriptArgs: []string{"sh", "-c", "exit 0"}, Interval: 25 * time.Millisecond, - Logger: log.New(ioutil.Discard, UniqueID(), log.LstdFlags), + Logger: log.New(ioutil.Discard, unique.ID(), log.LstdFlags), } check.Start() defer check.Stop() @@ -147,7 +148,7 @@ func TestCheckMonitor_LimitOutput(t *testing.T) { CheckID: types.CheckID("foo"), ScriptArgs: []string{"od", "-N", "81920", "/dev/urandom"}, Interval: 25 * time.Millisecond, - Logger: log.New(ioutil.Discard, UniqueID(), log.LstdFlags), + Logger: log.New(ioutil.Discard, unique.ID(), log.LstdFlags), } check.Start() defer check.Stop() @@ -155,7 +156,7 @@ func TestCheckMonitor_LimitOutput(t *testing.T) { time.Sleep(50 * time.Millisecond) // Allow for extra bytes for the truncation message - if len(notif.Output("foo")) > CheckBufSize+100 { + if len(notif.Output("foo")) > BufSize+100 { t.Fatalf("output size is too long") } } @@ -167,7 +168,7 @@ func TestCheckTTL(t *testing.T) { Notify: notif, CheckID: types.CheckID("foo"), TTL: 200 * time.Millisecond, - Logger: log.New(ioutil.Discard, UniqueID(), log.LstdFlags), + Logger: log.New(ioutil.Discard, unique.ID(), log.LstdFlags), } check.Start() defer check.Stop() @@ -277,7 +278,7 @@ func TestCheckHTTP(t *testing.T) { } // Body larger than 4k limit - body := bytes.Repeat([]byte{'a'}, 2*CheckBufSize) + body := bytes.Repeat([]byte{'a'}, 2*BufSize) w.WriteHeader(tt.code) w.Write(body) })) @@ -291,7 +292,7 @@ func TestCheckHTTP(t *testing.T) { Method: tt.method, Header: tt.header, Interval: 10 * time.Millisecond, - Logger: log.New(ioutil.Discard, UniqueID(), log.LstdFlags), + Logger: log.New(ioutil.Discard, unique.ID(), log.LstdFlags), } check.Start() defer check.Stop() @@ -303,9 +304,9 @@ func TestCheckHTTP(t *testing.T) { if got, want := notif.State("foo"), tt.status; got != want { r.Fatalf("got state %q want %q", got, want) } - // Allow slightly more data than CheckBufSize, for the header - if n := len(notif.Output("foo")); n > (CheckBufSize + 256) { - r.Fatalf("output too long: %d (%d-byte limit)", n, CheckBufSize) + // Allow slightly more data than BufSize, for the header + if n := len(notif.Output("foo")); n > (BufSize + 256) { + r.Fatalf("output too long: %d (%d-byte limit)", n, BufSize) } }) }) @@ -327,7 +328,7 @@ func TestCheckHTTPTimeout(t *testing.T) { HTTP: server.URL, Timeout: timeout, Interval: 10 * time.Millisecond, - Logger: log.New(ioutil.Discard, UniqueID(), log.LstdFlags), + Logger: log.New(ioutil.Discard, unique.ID(), log.LstdFlags), } check.Start() @@ -348,7 +349,7 @@ func TestCheckHTTP_disablesKeepAlives(t *testing.T) { CheckID: types.CheckID("foo"), HTTP: "http://foo.bar/baz", Interval: 10 * time.Second, - Logger: log.New(ioutil.Discard, UniqueID(), log.LstdFlags), + Logger: log.New(ioutil.Discard, unique.ID(), log.LstdFlags), } check.Start() @@ -365,7 +366,7 @@ func TestCheckHTTP_TLSSkipVerify_defaultFalse(t *testing.T) { CheckID: "foo", HTTP: "https://foo.bar/baz", Interval: 10 * time.Second, - Logger: log.New(ioutil.Discard, UniqueID(), log.LstdFlags), + Logger: log.New(ioutil.Discard, unique.ID(), log.LstdFlags), } check.Start() @@ -379,7 +380,7 @@ func TestCheckHTTP_TLSSkipVerify_defaultFalse(t *testing.T) { func largeBodyHandler(code int) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { // Body larger than 4k limit - body := bytes.Repeat([]byte{'a'}, 2*CheckBufSize) + body := bytes.Repeat([]byte{'a'}, 2*BufSize) w.WriteHeader(code) w.Write(body) }) @@ -397,7 +398,7 @@ func TestCheckHTTP_TLSSkipVerify_true_pass(t *testing.T) { CheckID: types.CheckID("skipverify_true"), HTTP: server.URL, Interval: 25 * time.Millisecond, - Logger: log.New(ioutil.Discard, UniqueID(), log.LstdFlags), + Logger: log.New(ioutil.Discard, unique.ID(), log.LstdFlags), TLSSkipVerify: true, } @@ -429,7 +430,7 @@ func TestCheckHTTP_TLSSkipVerify_true_fail(t *testing.T) { CheckID: types.CheckID("skipverify_true"), HTTP: server.URL, Interval: 5 * time.Millisecond, - Logger: log.New(ioutil.Discard, UniqueID(), log.LstdFlags), + Logger: log.New(ioutil.Discard, unique.ID(), log.LstdFlags), TLSSkipVerify: true, } check.Start() @@ -457,7 +458,7 @@ func TestCheckHTTP_TLSSkipVerify_false(t *testing.T) { CheckID: types.CheckID("skipverify_false"), HTTP: server.URL, Interval: 100 * time.Millisecond, - Logger: log.New(ioutil.Discard, UniqueID(), log.LstdFlags), + Logger: log.New(ioutil.Discard, unique.ID(), log.LstdFlags), TLSSkipVerify: false, } @@ -504,7 +505,7 @@ func expectTCPStatus(t *testing.T, tcp string, status string) { CheckID: types.CheckID("foo"), TCP: tcp, Interval: 10 * time.Millisecond, - Logger: log.New(ioutil.Discard, UniqueID(), log.LstdFlags), + Logger: log.New(ioutil.Discard, unique.ID(), log.LstdFlags), } check.Start() defer check.Stop() @@ -813,7 +814,7 @@ func TestCheck_Docker(t *testing.T) { ScriptArgs: []string{"/health.sh"}, DockerContainerID: "123", Interval: 25 * time.Millisecond, - client: c, + Client: c, } check.Start() defer check.Stop() diff --git a/agent/docker.go b/agent/checks/docker.go similarity index 98% rename from agent/docker.go rename to agent/checks/docker.go index 3e88b9b838..28c42a7571 100644 --- a/agent/docker.go +++ b/agent/checks/docker.go @@ -1,4 +1,4 @@ -package agent +package checks import ( "bytes" @@ -54,6 +54,10 @@ func NewDockerClient(host string, maxbuf int64) (*DockerClient, error) { }, nil } +func (c *DockerClient) Host() string { + return c.host +} + // ParseHost verifies that the given host strings is valid. // copied from github.com/docker/docker/client.go func ParseHost(host string) (string, string, string, error) { diff --git a/agent/docker_unix.go b/agent/checks/docker_unix.go similarity index 83% rename from agent/docker_unix.go rename to agent/checks/docker_unix.go index ff66cd277d..0cde4ad54c 100644 --- a/agent/docker_unix.go +++ b/agent/checks/docker_unix.go @@ -1,5 +1,5 @@ // +build !windows -package agent +package checks const DefaultDockerHost = "unix:///var/run/docker.sock" diff --git a/agent/docker_windows.go b/agent/checks/docker_windows.go similarity index 80% rename from agent/docker_windows.go rename to agent/checks/docker_windows.go index 0046ea9756..c2b1e173d4 100644 --- a/agent/docker_windows.go +++ b/agent/checks/docker_windows.go @@ -1,3 +1,3 @@ -package agent +package checks const DefaultDockerHost = "npipe:////./pipe/docker_engine" diff --git a/agent/exec/exec.go b/agent/exec/exec.go new file mode 100644 index 0000000000..19097fe0fb --- /dev/null +++ b/agent/exec/exec.go @@ -0,0 +1,14 @@ +package exec + +import ( + "fmt" + "os/exec" +) + +// Subprocess returns a command to execute a subprocess directly. +func Subprocess(args []string) (*exec.Cmd, error) { + if len(args) == 0 { + return nil, fmt.Errorf("need an executable to run") + } + return exec.Command(args[0], args[1:]...), nil +} diff --git a/agent/util_other.go b/agent/exec/exec_unix.go similarity index 77% rename from agent/util_other.go rename to agent/exec/exec_unix.go index d9ff0f8bab..74209a7923 100644 --- a/agent/util_other.go +++ b/agent/exec/exec_unix.go @@ -1,6 +1,5 @@ // +build !windows - -package agent +package exec import ( "os" @@ -8,8 +7,8 @@ import ( "syscall" ) -// ExecScript returns a command to execute a script through a shell. -func ExecScript(script string) (*exec.Cmd, error) { +// Script returns a command to execute a script through a shell. +func Script(script string) (*exec.Cmd, error) { shell := "/bin/sh" if other := os.Getenv("SHELL"); other != "" { shell = other diff --git a/agent/util_windows.go b/agent/exec/exec_windows.go similarity index 76% rename from agent/util_windows.go rename to agent/exec/exec_windows.go index acb3b0cc81..851b1d4cf8 100644 --- a/agent/util_windows.go +++ b/agent/exec/exec_windows.go @@ -1,6 +1,6 @@ // +build windows -package agent +package exec import ( "os" @@ -9,8 +9,8 @@ import ( "syscall" ) -// ExecScript returns a command to execute a script through a shell. -func ExecScript(script string) (*exec.Cmd, error) { +// Script returns a command to execute a script through a shell. +func Script(script string) (*exec.Cmd, error) { shell := "cmd" if other := os.Getenv("SHELL"); other != "" { shell = other diff --git a/agent/remote_exec.go b/agent/remote_exec.go index 5389486421..5700f45201 100644 --- a/agent/remote_exec.go +++ b/agent/remote_exec.go @@ -5,13 +5,14 @@ import ( "fmt" "io/ioutil" "os" - "os/exec" + osexec "os/exec" "path" "strconv" "sync" "syscall" "time" + "github.com/hashicorp/consul/agent/exec" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/api" ) @@ -161,12 +162,12 @@ func (a *Agent) handleRemoteExec(msg *UserEvent) { // Create the exec.Cmd a.logger.Printf("[INFO] agent: remote exec '%s'", script) - var cmd *exec.Cmd + var cmd *osexec.Cmd var err error if len(spec.Args) > 0 { - cmd, err = ExecSubprocess(spec.Args) + cmd, err = exec.Subprocess(spec.Args) } else { - cmd, err = ExecScript(script) + cmd, err = exec.Script(script) } if err != nil { a.logger.Printf("[DEBUG] agent: failed to start remote exec: %v", err) @@ -203,7 +204,7 @@ func (a *Agent) handleRemoteExec(msg *UserEvent) { } // Try to determine the exit code - if exitErr, ok := err.(*exec.ExitError); ok { + if exitErr, ok := err.(*osexec.ExitError); ok { if status, ok := exitErr.Sys().(syscall.WaitStatus); ok { exitCh <- status.ExitStatus() return diff --git a/agent/testagent.go b/agent/testagent.go index 1c4806d36c..33a6f49454 100644 --- a/agent/testagent.go +++ b/agent/testagent.go @@ -19,6 +19,7 @@ import ( "github.com/hashicorp/consul/agent/config" "github.com/hashicorp/consul/agent/consul" "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/agent/unique" "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/lib/freeport" "github.com/hashicorp/consul/logger" @@ -111,7 +112,7 @@ func (a *TestAgent) Start() *TestAgent { } hclDataDir = `data_dir = "` + d + `"` } - id := UniqueID() + id := unique.ID() for i := 10; i >= 0; i-- { a.Config = TestConfig( @@ -276,14 +277,6 @@ func (a *TestAgent) consulConfig() *consul.Config { return c } -func UniqueID() string { - id := strconv.FormatUint(rand.Uint64(), 36) - for len(id) < 16 { - id += " " - } - return id -} - // pickRandomPorts selects random ports from fixed size random blocks of // ports. This does not eliminate the chance for port conflict but // reduces it significanltly with little overhead. Furthermore, asking diff --git a/agent/unique/id.go b/agent/unique/id.go new file mode 100644 index 0000000000..d4ba76dfc5 --- /dev/null +++ b/agent/unique/id.go @@ -0,0 +1,19 @@ +package unique + +import ( + "math/rand" + "strconv" + "time" +) + +func init() { + rand.Seed(time.Now().UnixNano()) // seed random number generator +} + +func ID() string { + id := strconv.FormatUint(rand.Uint64(), 36) + for len(id) < 16 { + id += " " + } + return id +} diff --git a/agent/util.go b/agent/util.go index ae7c3c154f..1044a7742e 100644 --- a/agent/util.go +++ b/agent/util.go @@ -92,15 +92,6 @@ GROUP: return nil } -// ExecSubprocess returns a command to execute a subprocess directly. -func ExecSubprocess(args []string) (*exec.Cmd, error) { - if len(args) == 0 { - return nil, fmt.Errorf("need an executable to run") - } - - return exec.Command(args[0], args[1:]...), nil -} - // ForwardSignals will fire up a goroutine to forward signals to the given // subprocess until the shutdown channel is closed. func ForwardSignals(cmd *exec.Cmd, logFn func(error), shutdownCh <-chan struct{}) { diff --git a/agent/watch_handler.go b/agent/watch_handler.go index dc94a7219a..4c6a9d3f3f 100644 --- a/agent/watch_handler.go +++ b/agent/watch_handler.go @@ -9,10 +9,11 @@ import ( "log" "net/http" "os" - "os/exec" + osexec "os/exec" "strconv" "github.com/armon/circbuf" + "github.com/hashicorp/consul/agent/exec" "github.com/hashicorp/consul/watch" "github.com/hashicorp/go-cleanhttp" "golang.org/x/net/context" @@ -43,13 +44,13 @@ func makeWatchHandler(logOutput io.Writer, handler interface{}) watch.HandlerFun logger := log.New(logOutput, "", log.LstdFlags) fn := func(idx uint64, data interface{}) { // Create the command - var cmd *exec.Cmd + var cmd *osexec.Cmd var err error if len(args) > 0 { - cmd, err = ExecSubprocess(args) + cmd, err = exec.Subprocess(args) } else { - cmd, err = ExecScript(script) + cmd, err = exec.Script(script) } if err != nil { logger.Printf("[ERR] agent: Failed to setup watch: %v", err) diff --git a/command/lock/lock.go b/command/lock/lock.go index cb7db04c7b..5abddb3d90 100644 --- a/command/lock/lock.go +++ b/command/lock/lock.go @@ -4,7 +4,7 @@ import ( "flag" "fmt" "os" - "os/exec" + osexec "os/exec" "path" "strings" "sync" @@ -12,6 +12,7 @@ import ( "time" "github.com/hashicorp/consul/agent" + "github.com/hashicorp/consul/agent/exec" "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/command/flags" "github.com/mitchellh/cli" @@ -341,12 +342,12 @@ func (c *cmd) startChild(args []string, passStdin, shell bool) error { } // Create the command - var cmd *exec.Cmd + var cmd *osexec.Cmd var err error if !shell { - cmd, err = agent.ExecSubprocess(args) + cmd, err = exec.Subprocess(args) } else { - cmd, err = agent.ExecScript(strings.Join(args, " ")) + cmd, err = exec.Script(strings.Join(args, " ")) } if err != nil { c.UI.Error(fmt.Sprintf("Error executing handler: %s", err)) diff --git a/command/watch/watch.go b/command/watch/watch.go index 1734647182..3b8c67836b 100644 --- a/command/watch/watch.go +++ b/command/watch/watch.go @@ -6,11 +6,12 @@ import ( "flag" "fmt" "os" - "os/exec" + osexec "os/exec" "strconv" "strings" "github.com/hashicorp/consul/agent" + "github.com/hashicorp/consul/agent/exec" "github.com/hashicorp/consul/command/flags" consulwatch "github.com/hashicorp/consul/watch" "github.com/mitchellh/cli" @@ -173,11 +174,11 @@ func (c *cmd) Run(args []string) int { // Create the command var buf bytes.Buffer var err error - var cmd *exec.Cmd + var cmd *osexec.Cmd if !c.shell { - cmd, err = agent.ExecSubprocess(c.flags.Args()) + cmd, err = exec.Subprocess(c.flags.Args()) } else { - cmd, err = agent.ExecScript(strings.Join(c.flags.Args(), " ")) + cmd, err = exec.Script(strings.Join(c.flags.Args(), " ")) } if err != nil { c.UI.Error(fmt.Sprintf("Error executing handler: %s", err))