mirror of https://github.com/status-im/consul.git
Merge pull request #3612 from hashicorp/decouple-checks-from-agent
Decouple the code that executes checks from the agent
This commit is contained in:
commit
b6de1f7446
|
@ -21,6 +21,7 @@ import (
|
||||||
"github.com/armon/go-metrics"
|
"github.com/armon/go-metrics"
|
||||||
"github.com/hashicorp/consul/acl"
|
"github.com/hashicorp/consul/acl"
|
||||||
"github.com/hashicorp/consul/agent/ae"
|
"github.com/hashicorp/consul/agent/ae"
|
||||||
|
"github.com/hashicorp/consul/agent/checks"
|
||||||
"github.com/hashicorp/consul/agent/config"
|
"github.com/hashicorp/consul/agent/config"
|
||||||
"github.com/hashicorp/consul/agent/consul"
|
"github.com/hashicorp/consul/agent/consul"
|
||||||
"github.com/hashicorp/consul/agent/local"
|
"github.com/hashicorp/consul/agent/local"
|
||||||
|
@ -120,25 +121,25 @@ type Agent struct {
|
||||||
checkReapAfter map[types.CheckID]time.Duration
|
checkReapAfter map[types.CheckID]time.Duration
|
||||||
|
|
||||||
// checkMonitors maps the check ID to an associated monitor
|
// checkMonitors maps the check ID to an associated monitor
|
||||||
checkMonitors map[types.CheckID]*CheckMonitor
|
checkMonitors map[types.CheckID]*checks.CheckMonitor
|
||||||
|
|
||||||
// checkHTTPs maps the check ID to an associated HTTP check
|
// checkHTTPs maps the check ID to an associated HTTP check
|
||||||
checkHTTPs map[types.CheckID]*CheckHTTP
|
checkHTTPs map[types.CheckID]*checks.CheckHTTP
|
||||||
|
|
||||||
// checkTCPs maps the check ID to an associated TCP check
|
// checkTCPs maps the check ID to an associated TCP check
|
||||||
checkTCPs map[types.CheckID]*CheckTCP
|
checkTCPs map[types.CheckID]*checks.CheckTCP
|
||||||
|
|
||||||
// checkTTLs maps the check ID to an associated check TTL
|
// checkTTLs maps the check ID to an associated check TTL
|
||||||
checkTTLs map[types.CheckID]*CheckTTL
|
checkTTLs map[types.CheckID]*checks.CheckTTL
|
||||||
|
|
||||||
// checkDockers maps the check ID to an associated Docker Exec based check
|
// checkDockers maps the check ID to an associated Docker Exec based check
|
||||||
checkDockers map[types.CheckID]*CheckDocker
|
checkDockers map[types.CheckID]*checks.CheckDocker
|
||||||
|
|
||||||
// checkLock protects updates to the check* maps
|
// checkLock protects updates to the check* maps
|
||||||
checkLock sync.Mutex
|
checkLock sync.Mutex
|
||||||
|
|
||||||
// dockerClient is the client for performing docker health checks.
|
// dockerClient is the client for performing docker health checks.
|
||||||
dockerClient *DockerClient
|
dockerClient *checks.DockerClient
|
||||||
|
|
||||||
// eventCh is used to receive user events
|
// eventCh is used to receive user events
|
||||||
eventCh chan serf.UserEvent
|
eventCh chan serf.UserEvent
|
||||||
|
@ -206,11 +207,11 @@ func New(c *config.RuntimeConfig) (*Agent, error) {
|
||||||
config: c,
|
config: c,
|
||||||
acls: acls,
|
acls: acls,
|
||||||
checkReapAfter: make(map[types.CheckID]time.Duration),
|
checkReapAfter: make(map[types.CheckID]time.Duration),
|
||||||
checkMonitors: make(map[types.CheckID]*CheckMonitor),
|
checkMonitors: make(map[types.CheckID]*checks.CheckMonitor),
|
||||||
checkTTLs: make(map[types.CheckID]*CheckTTL),
|
checkTTLs: make(map[types.CheckID]*checks.CheckTTL),
|
||||||
checkHTTPs: make(map[types.CheckID]*CheckHTTP),
|
checkHTTPs: make(map[types.CheckID]*checks.CheckHTTP),
|
||||||
checkTCPs: make(map[types.CheckID]*CheckTCP),
|
checkTCPs: make(map[types.CheckID]*checks.CheckTCP),
|
||||||
checkDockers: make(map[types.CheckID]*CheckDocker),
|
checkDockers: make(map[types.CheckID]*checks.CheckDocker),
|
||||||
eventCh: make(chan serf.UserEvent, 1024),
|
eventCh: make(chan serf.UserEvent, 1024),
|
||||||
eventBuf: make([]*UserEvent, 256),
|
eventBuf: make([]*UserEvent, 256),
|
||||||
joinLANNotifier: &systemd.Notifier{},
|
joinLANNotifier: &systemd.Notifier{},
|
||||||
|
@ -1675,7 +1676,7 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType,
|
||||||
delete(a.checkTTLs, check.CheckID)
|
delete(a.checkTTLs, check.CheckID)
|
||||||
}
|
}
|
||||||
|
|
||||||
ttl := &CheckTTL{
|
ttl := &checks.CheckTTL{
|
||||||
Notify: a.State,
|
Notify: a.State,
|
||||||
CheckID: check.CheckID,
|
CheckID: check.CheckID,
|
||||||
TTL: chkType.TTL,
|
TTL: chkType.TTL,
|
||||||
|
@ -1696,13 +1697,13 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType,
|
||||||
existing.Stop()
|
existing.Stop()
|
||||||
delete(a.checkHTTPs, check.CheckID)
|
delete(a.checkHTTPs, check.CheckID)
|
||||||
}
|
}
|
||||||
if chkType.Interval < MinInterval {
|
if chkType.Interval < checks.MinInterval {
|
||||||
a.logger.Println(fmt.Sprintf("[WARN] agent: check '%s' has interval below minimum of %v",
|
a.logger.Println(fmt.Sprintf("[WARN] agent: check '%s' has interval below minimum of %v",
|
||||||
check.CheckID, MinInterval))
|
check.CheckID, checks.MinInterval))
|
||||||
chkType.Interval = MinInterval
|
chkType.Interval = checks.MinInterval
|
||||||
}
|
}
|
||||||
|
|
||||||
http := &CheckHTTP{
|
http := &checks.CheckHTTP{
|
||||||
Notify: a.State,
|
Notify: a.State,
|
||||||
CheckID: check.CheckID,
|
CheckID: check.CheckID,
|
||||||
HTTP: chkType.HTTP,
|
HTTP: chkType.HTTP,
|
||||||
|
@ -1721,13 +1722,13 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType,
|
||||||
existing.Stop()
|
existing.Stop()
|
||||||
delete(a.checkTCPs, check.CheckID)
|
delete(a.checkTCPs, check.CheckID)
|
||||||
}
|
}
|
||||||
if chkType.Interval < MinInterval {
|
if chkType.Interval < checks.MinInterval {
|
||||||
a.logger.Println(fmt.Sprintf("[WARN] agent: check '%s' has interval below minimum of %v",
|
a.logger.Println(fmt.Sprintf("[WARN] agent: check '%s' has interval below minimum of %v",
|
||||||
check.CheckID, MinInterval))
|
check.CheckID, checks.MinInterval))
|
||||||
chkType.Interval = MinInterval
|
chkType.Interval = checks.MinInterval
|
||||||
}
|
}
|
||||||
|
|
||||||
tcp := &CheckTCP{
|
tcp := &checks.CheckTCP{
|
||||||
Notify: a.State,
|
Notify: a.State,
|
||||||
CheckID: check.CheckID,
|
CheckID: check.CheckID,
|
||||||
TCP: chkType.TCP,
|
TCP: chkType.TCP,
|
||||||
|
@ -1743,10 +1744,10 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType,
|
||||||
existing.Stop()
|
existing.Stop()
|
||||||
delete(a.checkDockers, check.CheckID)
|
delete(a.checkDockers, check.CheckID)
|
||||||
}
|
}
|
||||||
if chkType.Interval < MinInterval {
|
if chkType.Interval < checks.MinInterval {
|
||||||
a.logger.Println(fmt.Sprintf("[WARN] agent: check '%s' has interval below minimum of %v",
|
a.logger.Println(fmt.Sprintf("[WARN] agent: check '%s' has interval below minimum of %v",
|
||||||
check.CheckID, MinInterval))
|
check.CheckID, checks.MinInterval))
|
||||||
chkType.Interval = MinInterval
|
chkType.Interval = checks.MinInterval
|
||||||
}
|
}
|
||||||
if chkType.Script != "" {
|
if chkType.Script != "" {
|
||||||
a.logger.Printf("[WARN] agent: check %q has the 'script' field, which has been deprecated "+
|
a.logger.Printf("[WARN] agent: check %q has the 'script' field, which has been deprecated "+
|
||||||
|
@ -1755,16 +1756,16 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType,
|
||||||
}
|
}
|
||||||
|
|
||||||
if a.dockerClient == nil {
|
if a.dockerClient == nil {
|
||||||
dc, err := NewDockerClient(os.Getenv("DOCKER_HOST"), CheckBufSize)
|
dc, err := checks.NewDockerClient(os.Getenv("DOCKER_HOST"), checks.BufSize)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
a.logger.Printf("[ERR] agent: error creating docker client: %s", err)
|
a.logger.Printf("[ERR] agent: error creating docker client: %s", err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
a.logger.Printf("[DEBUG] agent: created docker client for %s", dc.host)
|
a.logger.Printf("[DEBUG] agent: created docker client for %s", dc.Host())
|
||||||
a.dockerClient = dc
|
a.dockerClient = dc
|
||||||
}
|
}
|
||||||
|
|
||||||
dockerCheck := &CheckDocker{
|
dockerCheck := &checks.CheckDocker{
|
||||||
Notify: a.State,
|
Notify: a.State,
|
||||||
CheckID: check.CheckID,
|
CheckID: check.CheckID,
|
||||||
DockerContainerID: chkType.DockerContainerID,
|
DockerContainerID: chkType.DockerContainerID,
|
||||||
|
@ -1773,7 +1774,7 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType,
|
||||||
ScriptArgs: chkType.ScriptArgs,
|
ScriptArgs: chkType.ScriptArgs,
|
||||||
Interval: chkType.Interval,
|
Interval: chkType.Interval,
|
||||||
Logger: a.logger,
|
Logger: a.logger,
|
||||||
client: a.dockerClient,
|
Client: a.dockerClient,
|
||||||
}
|
}
|
||||||
dockerCheck.Start()
|
dockerCheck.Start()
|
||||||
a.checkDockers[check.CheckID] = dockerCheck
|
a.checkDockers[check.CheckID] = dockerCheck
|
||||||
|
@ -1783,10 +1784,10 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType,
|
||||||
existing.Stop()
|
existing.Stop()
|
||||||
delete(a.checkMonitors, check.CheckID)
|
delete(a.checkMonitors, check.CheckID)
|
||||||
}
|
}
|
||||||
if chkType.Interval < MinInterval {
|
if chkType.Interval < checks.MinInterval {
|
||||||
a.logger.Printf("[WARN] agent: check '%s' has interval below minimum of %v",
|
a.logger.Printf("[WARN] agent: check '%s' has interval below minimum of %v",
|
||||||
check.CheckID, MinInterval)
|
check.CheckID, checks.MinInterval)
|
||||||
chkType.Interval = MinInterval
|
chkType.Interval = checks.MinInterval
|
||||||
}
|
}
|
||||||
if chkType.Script != "" {
|
if chkType.Script != "" {
|
||||||
a.logger.Printf("[WARN] agent: check %q has the 'script' field, which has been deprecated "+
|
a.logger.Printf("[WARN] agent: check %q has the 'script' field, which has been deprecated "+
|
||||||
|
@ -1794,7 +1795,7 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType,
|
||||||
check.CheckID)
|
check.CheckID)
|
||||||
}
|
}
|
||||||
|
|
||||||
monitor := &CheckMonitor{
|
monitor := &checks.CheckMonitor{
|
||||||
Notify: a.State,
|
Notify: a.State,
|
||||||
CheckID: check.CheckID,
|
CheckID: check.CheckID,
|
||||||
Script: chkType.Script,
|
Script: chkType.Script,
|
||||||
|
@ -1922,7 +1923,7 @@ func (a *Agent) updateTTLCheck(checkID types.CheckID, status, output string) err
|
||||||
// persistCheckState is used to record the check status into the data dir.
|
// persistCheckState is used to record the check status into the data dir.
|
||||||
// This allows the state to be restored on a later agent start. Currently
|
// This allows the state to be restored on a later agent start. Currently
|
||||||
// only useful for TTL based checks.
|
// only useful for TTL based checks.
|
||||||
func (a *Agent) persistCheckState(check *CheckTTL, status, output string) error {
|
func (a *Agent) persistCheckState(check *checks.CheckTTL, status, output string) error {
|
||||||
// Create the persisted state
|
// Create the persisted state
|
||||||
state := persistedCheckState{
|
state := persistedCheckState{
|
||||||
CheckID: check.CheckID,
|
CheckID: check.CheckID,
|
||||||
|
|
|
@ -8,6 +8,7 @@ import (
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/acl"
|
"github.com/hashicorp/consul/acl"
|
||||||
|
"github.com/hashicorp/consul/agent/checks"
|
||||||
"github.com/hashicorp/consul/agent/config"
|
"github.com/hashicorp/consul/agent/config"
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
"github.com/hashicorp/consul/api"
|
"github.com/hashicorp/consul/api"
|
||||||
|
@ -490,9 +491,9 @@ func (s *HTTPServer) AgentCheckUpdate(resp http.ResponseWriter, req *http.Reques
|
||||||
}
|
}
|
||||||
|
|
||||||
total := len(update.Output)
|
total := len(update.Output)
|
||||||
if total > CheckBufSize {
|
if total > checks.BufSize {
|
||||||
update.Output = fmt.Sprintf("%s ... (captured %d of %d bytes)",
|
update.Output = fmt.Sprintf("%s ... (captured %d of %d bytes)",
|
||||||
update.Output[:CheckBufSize], CheckBufSize, total)
|
update.Output[:checks.BufSize], checks.BufSize, total)
|
||||||
}
|
}
|
||||||
|
|
||||||
checkID := types.CheckID(strings.TrimPrefix(req.URL.Path, "/v1/agent/check/update/"))
|
checkID := types.CheckID(strings.TrimPrefix(req.URL.Path, "/v1/agent/check/update/"))
|
||||||
|
|
|
@ -14,6 +14,7 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/acl"
|
"github.com/hashicorp/consul/acl"
|
||||||
|
"github.com/hashicorp/consul/agent/checks"
|
||||||
"github.com/hashicorp/consul/agent/config"
|
"github.com/hashicorp/consul/agent/config"
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
"github.com/hashicorp/consul/api"
|
"github.com/hashicorp/consul/api"
|
||||||
|
@ -1127,7 +1128,7 @@ func TestAgent_UpdateCheck(t *testing.T) {
|
||||||
t.Run("log output limit", func(t *testing.T) {
|
t.Run("log output limit", func(t *testing.T) {
|
||||||
args := checkUpdate{
|
args := checkUpdate{
|
||||||
Status: api.HealthPassing,
|
Status: api.HealthPassing,
|
||||||
Output: strings.Repeat("-= bad -=", 5*CheckBufSize),
|
Output: strings.Repeat("-= bad -=", 5*checks.BufSize),
|
||||||
}
|
}
|
||||||
req, _ := http.NewRequest("PUT", "/v1/agent/check/update/test", jsonReader(args))
|
req, _ := http.NewRequest("PUT", "/v1/agent/check/update/test", jsonReader(args))
|
||||||
resp := httptest.NewRecorder()
|
resp := httptest.NewRecorder()
|
||||||
|
@ -1146,7 +1147,7 @@ func TestAgent_UpdateCheck(t *testing.T) {
|
||||||
// rough check that the output buffer was cut down so this test
|
// rough check that the output buffer was cut down so this test
|
||||||
// isn't super brittle.
|
// isn't super brittle.
|
||||||
state := a.State.Checks()["test"]
|
state := a.State.Checks()["test"]
|
||||||
if state.Status != api.HealthPassing || len(state.Output) > 2*CheckBufSize {
|
if state.Status != api.HealthPassing || len(state.Output) > 2*checks.BufSize {
|
||||||
t.Fatalf("bad: %v", state)
|
t.Fatalf("bad: %v", state)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
|
@ -13,6 +13,7 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/agent/checks"
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
"github.com/hashicorp/consul/api"
|
"github.com/hashicorp/consul/api"
|
||||||
"github.com/hashicorp/consul/testutil"
|
"github.com/hashicorp/consul/testutil"
|
||||||
|
@ -646,7 +647,7 @@ func TestAgent_AddCheck_MinInterval(t *testing.T) {
|
||||||
// Ensure a TTL is setup
|
// Ensure a TTL is setup
|
||||||
if mon, ok := a.checkMonitors["mem"]; !ok {
|
if mon, ok := a.checkMonitors["mem"]; !ok {
|
||||||
t.Fatalf("missing mem monitor")
|
t.Fatalf("missing mem monitor")
|
||||||
} else if mon.Interval != MinInterval {
|
} else if mon.Interval != checks.MinInterval {
|
||||||
t.Fatalf("bad mem monitor interval")
|
t.Fatalf("bad mem monitor interval")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -680,7 +681,7 @@ func TestAgent_AddCheck_RestoreState(t *testing.T) {
|
||||||
defer a.Shutdown()
|
defer a.Shutdown()
|
||||||
|
|
||||||
// Create some state and persist it
|
// Create some state and persist it
|
||||||
ttl := &CheckTTL{
|
ttl := &checks.CheckTTL{
|
||||||
CheckID: "baz",
|
CheckID: "baz",
|
||||||
TTL: time.Minute,
|
TTL: time.Minute,
|
||||||
}
|
}
|
||||||
|
@ -1764,7 +1765,7 @@ func TestAgent_persistCheckState(t *testing.T) {
|
||||||
defer a.Shutdown()
|
defer a.Shutdown()
|
||||||
|
|
||||||
// Create the TTL check to persist
|
// Create the TTL check to persist
|
||||||
check := &CheckTTL{
|
check := &checks.CheckTTL{
|
||||||
CheckID: "check1",
|
CheckID: "check1",
|
||||||
TTL: 10 * time.Minute,
|
TTL: 10 * time.Minute,
|
||||||
}
|
}
|
||||||
|
@ -1811,7 +1812,7 @@ func TestAgent_loadCheckState(t *testing.T) {
|
||||||
defer a.Shutdown()
|
defer a.Shutdown()
|
||||||
|
|
||||||
// Create a check whose state will expire immediately
|
// Create a check whose state will expire immediately
|
||||||
check := &CheckTTL{
|
check := &checks.CheckTTL{
|
||||||
CheckID: "check1",
|
CheckID: "check1",
|
||||||
TTL: 0,
|
TTL: 0,
|
||||||
}
|
}
|
||||||
|
@ -1877,7 +1878,7 @@ func TestAgent_purgeCheckState(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Persist some state to the data dir
|
// Persist some state to the data dir
|
||||||
check := &CheckTTL{
|
check := &checks.CheckTTL{
|
||||||
CheckID: "check1",
|
CheckID: "check1",
|
||||||
TTL: time.Minute,
|
TTL: time.Minute,
|
||||||
}
|
}
|
||||||
|
|
640
agent/check.go
640
agent/check.go
|
@ -1,290 +1,10 @@
|
||||||
package agent
|
package agent
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"crypto/tls"
|
|
||||||
"fmt"
|
|
||||||
"io"
|
|
||||||
"io/ioutil"
|
|
||||||
"log"
|
|
||||||
"net"
|
|
||||||
"net/http"
|
|
||||||
"os"
|
|
||||||
"os/exec"
|
|
||||||
"sync"
|
|
||||||
"syscall"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/armon/circbuf"
|
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
"github.com/hashicorp/consul/api"
|
|
||||||
"github.com/hashicorp/consul/lib"
|
|
||||||
"github.com/hashicorp/consul/types"
|
"github.com/hashicorp/consul/types"
|
||||||
"github.com/hashicorp/go-cleanhttp"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
|
||||||
// MinInterval is the minimal interval between
|
|
||||||
// two checks. Do not allow for a interval below this value.
|
|
||||||
// Otherwise we risk fork bombing a system.
|
|
||||||
MinInterval = time.Second
|
|
||||||
|
|
||||||
// CheckBufSize is the maximum size of the captured
|
|
||||||
// check output. Prevents an enormous buffer
|
|
||||||
// from being captured
|
|
||||||
CheckBufSize = 4 * 1024 // 4KB
|
|
||||||
|
|
||||||
// UserAgent is the value of the User-Agent header
|
|
||||||
// for HTTP health checks.
|
|
||||||
UserAgent = "Consul Health Check"
|
|
||||||
)
|
|
||||||
|
|
||||||
// CheckNotifier interface is used by the CheckMonitor
|
|
||||||
// to notify when a check has a status update. The update
|
|
||||||
// should take care to be idempotent.
|
|
||||||
type CheckNotifier interface {
|
|
||||||
UpdateCheck(checkID types.CheckID, status, output string)
|
|
||||||
}
|
|
||||||
|
|
||||||
// CheckMonitor is used to periodically invoke a script to
|
|
||||||
// determine the health of a given check. It is compatible with
|
|
||||||
// nagios plugins and expects the output in the same format.
|
|
||||||
type CheckMonitor struct {
|
|
||||||
Notify CheckNotifier
|
|
||||||
CheckID types.CheckID
|
|
||||||
Script string
|
|
||||||
ScriptArgs []string
|
|
||||||
Interval time.Duration
|
|
||||||
Timeout time.Duration
|
|
||||||
Logger *log.Logger
|
|
||||||
|
|
||||||
stop bool
|
|
||||||
stopCh chan struct{}
|
|
||||||
stopLock sync.Mutex
|
|
||||||
}
|
|
||||||
|
|
||||||
// Start is used to start a check monitor.
|
|
||||||
// Monitor runs until stop is called
|
|
||||||
func (c *CheckMonitor) Start() {
|
|
||||||
c.stopLock.Lock()
|
|
||||||
defer c.stopLock.Unlock()
|
|
||||||
c.stop = false
|
|
||||||
c.stopCh = make(chan struct{})
|
|
||||||
go c.run()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Stop is used to stop a check monitor.
|
|
||||||
func (c *CheckMonitor) Stop() {
|
|
||||||
c.stopLock.Lock()
|
|
||||||
defer c.stopLock.Unlock()
|
|
||||||
if !c.stop {
|
|
||||||
c.stop = true
|
|
||||||
close(c.stopCh)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// run is invoked by a goroutine to run until Stop() is called
|
|
||||||
func (c *CheckMonitor) run() {
|
|
||||||
// Get the randomized initial pause time
|
|
||||||
initialPauseTime := lib.RandomStagger(c.Interval)
|
|
||||||
next := time.After(initialPauseTime)
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-next:
|
|
||||||
c.check()
|
|
||||||
next = time.After(c.Interval)
|
|
||||||
case <-c.stopCh:
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// check is invoked periodically to perform the script check
|
|
||||||
func (c *CheckMonitor) check() {
|
|
||||||
// Create the command
|
|
||||||
var cmd *exec.Cmd
|
|
||||||
var err error
|
|
||||||
var cmdDisplay string
|
|
||||||
if len(c.ScriptArgs) > 0 {
|
|
||||||
cmdDisplay = fmt.Sprintf("%v", c.ScriptArgs)
|
|
||||||
cmd, err = ExecSubprocess(c.ScriptArgs)
|
|
||||||
} else {
|
|
||||||
cmdDisplay = c.Script
|
|
||||||
cmd, err = ExecScript(c.Script)
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
c.Logger.Printf("[ERR] agent: failed to setup invoke '%s': %s", cmdDisplay, err)
|
|
||||||
c.Notify.UpdateCheck(c.CheckID, api.HealthCritical, err.Error())
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Collect the output
|
|
||||||
output, _ := circbuf.NewBuffer(CheckBufSize)
|
|
||||||
cmd.Stdout = output
|
|
||||||
cmd.Stderr = output
|
|
||||||
SetSysProcAttr(cmd)
|
|
||||||
|
|
||||||
truncateAndLogOutput := func() string {
|
|
||||||
outputStr := string(output.Bytes())
|
|
||||||
if output.TotalWritten() > output.Size() {
|
|
||||||
outputStr = fmt.Sprintf("Captured %d of %d bytes\n...\n%s",
|
|
||||||
output.Size(), output.TotalWritten(), outputStr)
|
|
||||||
}
|
|
||||||
c.Logger.Printf("[DEBUG] agent: Check '%s' script '%s' output: %s",
|
|
||||||
c.CheckID, cmdDisplay, outputStr)
|
|
||||||
return outputStr
|
|
||||||
}
|
|
||||||
|
|
||||||
// Start the check
|
|
||||||
if err := cmd.Start(); err != nil {
|
|
||||||
c.Logger.Printf("[ERR] agent: failed to invoke '%s': %s", cmdDisplay, err)
|
|
||||||
c.Notify.UpdateCheck(c.CheckID, api.HealthCritical, err.Error())
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Wait for the check to complete
|
|
||||||
waitCh := make(chan error, 1)
|
|
||||||
go func() {
|
|
||||||
waitCh <- cmd.Wait()
|
|
||||||
}()
|
|
||||||
|
|
||||||
timeout := 30 * time.Second
|
|
||||||
if c.Timeout > 0 {
|
|
||||||
timeout = c.Timeout
|
|
||||||
}
|
|
||||||
select {
|
|
||||||
case <-time.After(timeout):
|
|
||||||
if err := KillCommandSubtree(cmd); err != nil {
|
|
||||||
c.Logger.Printf("[WARN] Failed to kill check '%s' after timeout: %v", cmdDisplay, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
msg := fmt.Sprintf("Timed out (%s) running check", timeout.String())
|
|
||||||
c.Logger.Printf("[WARN] %s '%s'", msg, cmdDisplay)
|
|
||||||
|
|
||||||
outputStr := truncateAndLogOutput()
|
|
||||||
if len(outputStr) > 0 {
|
|
||||||
msg += "\n\n" + outputStr
|
|
||||||
}
|
|
||||||
c.Notify.UpdateCheck(c.CheckID, api.HealthCritical, msg)
|
|
||||||
|
|
||||||
// Now wait for the process to exit so we never start another
|
|
||||||
// instance concurrently.
|
|
||||||
<-waitCh
|
|
||||||
return
|
|
||||||
|
|
||||||
case err = <-waitCh:
|
|
||||||
// The process returned before the timeout, proceed normally
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check if the check passed
|
|
||||||
outputStr := truncateAndLogOutput()
|
|
||||||
if err == nil {
|
|
||||||
c.Logger.Printf("[DEBUG] agent: Check '%v' is passing", c.CheckID)
|
|
||||||
c.Notify.UpdateCheck(c.CheckID, api.HealthPassing, outputStr)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// If the exit code is 1, set check as warning
|
|
||||||
exitErr, ok := err.(*exec.ExitError)
|
|
||||||
if ok {
|
|
||||||
if status, ok := exitErr.Sys().(syscall.WaitStatus); ok {
|
|
||||||
code := status.ExitStatus()
|
|
||||||
if code == 1 {
|
|
||||||
c.Logger.Printf("[WARN] agent: Check '%v' is now warning", c.CheckID)
|
|
||||||
c.Notify.UpdateCheck(c.CheckID, api.HealthWarning, outputStr)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Set the health as critical
|
|
||||||
c.Logger.Printf("[WARN] agent: Check '%v' is now critical", c.CheckID)
|
|
||||||
c.Notify.UpdateCheck(c.CheckID, api.HealthCritical, outputStr)
|
|
||||||
}
|
|
||||||
|
|
||||||
// CheckTTL is used to apply a TTL to check status,
|
|
||||||
// and enables clients to set the status of a check
|
|
||||||
// but upon the TTL expiring, the check status is
|
|
||||||
// automatically set to critical.
|
|
||||||
type CheckTTL struct {
|
|
||||||
Notify CheckNotifier
|
|
||||||
CheckID types.CheckID
|
|
||||||
TTL time.Duration
|
|
||||||
Logger *log.Logger
|
|
||||||
|
|
||||||
timer *time.Timer
|
|
||||||
|
|
||||||
lastOutput string
|
|
||||||
lastOutputLock sync.RWMutex
|
|
||||||
|
|
||||||
stop bool
|
|
||||||
stopCh chan struct{}
|
|
||||||
stopLock sync.Mutex
|
|
||||||
}
|
|
||||||
|
|
||||||
// Start is used to start a check ttl, runs until Stop()
|
|
||||||
func (c *CheckTTL) Start() {
|
|
||||||
c.stopLock.Lock()
|
|
||||||
defer c.stopLock.Unlock()
|
|
||||||
c.stop = false
|
|
||||||
c.stopCh = make(chan struct{})
|
|
||||||
c.timer = time.NewTimer(c.TTL)
|
|
||||||
go c.run()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Stop is used to stop a check ttl.
|
|
||||||
func (c *CheckTTL) Stop() {
|
|
||||||
c.stopLock.Lock()
|
|
||||||
defer c.stopLock.Unlock()
|
|
||||||
if !c.stop {
|
|
||||||
c.timer.Stop()
|
|
||||||
c.stop = true
|
|
||||||
close(c.stopCh)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// run is used to handle TTL expiration and to update the check status
|
|
||||||
func (c *CheckTTL) run() {
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-c.timer.C:
|
|
||||||
c.Logger.Printf("[WARN] agent: Check '%v' missed TTL, is now critical",
|
|
||||||
c.CheckID)
|
|
||||||
c.Notify.UpdateCheck(c.CheckID, api.HealthCritical, c.getExpiredOutput())
|
|
||||||
|
|
||||||
case <-c.stopCh:
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// getExpiredOutput formats the output for the case when the TTL is expired.
|
|
||||||
func (c *CheckTTL) getExpiredOutput() string {
|
|
||||||
c.lastOutputLock.RLock()
|
|
||||||
defer c.lastOutputLock.RUnlock()
|
|
||||||
|
|
||||||
const prefix = "TTL expired"
|
|
||||||
if c.lastOutput == "" {
|
|
||||||
return prefix
|
|
||||||
}
|
|
||||||
|
|
||||||
return fmt.Sprintf("%s (last output before timeout follows): %s", prefix, c.lastOutput)
|
|
||||||
}
|
|
||||||
|
|
||||||
// SetStatus is used to update the status of the check,
|
|
||||||
// and to renew the TTL. If expired, TTL is restarted.
|
|
||||||
func (c *CheckTTL) SetStatus(status, output string) {
|
|
||||||
c.Logger.Printf("[DEBUG] agent: Check '%v' status is now %v",
|
|
||||||
c.CheckID, status)
|
|
||||||
c.Notify.UpdateCheck(c.CheckID, status, output)
|
|
||||||
|
|
||||||
// Store the last output so we can retain it if the TTL expires.
|
|
||||||
c.lastOutputLock.Lock()
|
|
||||||
c.lastOutput = output
|
|
||||||
c.lastOutputLock.Unlock()
|
|
||||||
|
|
||||||
c.timer.Reset(c.TTL)
|
|
||||||
}
|
|
||||||
|
|
||||||
// persistedCheck is used to serialize a check and write it to disk
|
// persistedCheck is used to serialize a check and write it to disk
|
||||||
// so that it may be restored later on.
|
// so that it may be restored later on.
|
||||||
type persistedCheck struct {
|
type persistedCheck struct {
|
||||||
|
@ -303,363 +23,3 @@ type persistedCheckState struct {
|
||||||
Status string
|
Status string
|
||||||
Expires int64
|
Expires int64
|
||||||
}
|
}
|
||||||
|
|
||||||
// CheckHTTP is used to periodically make an HTTP request to
|
|
||||||
// determine the health of a given check.
|
|
||||||
// The check is passing if the response code is 2XX.
|
|
||||||
// The check is warning if the response code is 429.
|
|
||||||
// The check is critical if the response code is anything else
|
|
||||||
// or if the request returns an error
|
|
||||||
type CheckHTTP struct {
|
|
||||||
Notify CheckNotifier
|
|
||||||
CheckID types.CheckID
|
|
||||||
HTTP string
|
|
||||||
Header map[string][]string
|
|
||||||
Method string
|
|
||||||
Interval time.Duration
|
|
||||||
Timeout time.Duration
|
|
||||||
Logger *log.Logger
|
|
||||||
TLSSkipVerify bool
|
|
||||||
|
|
||||||
httpClient *http.Client
|
|
||||||
stop bool
|
|
||||||
stopCh chan struct{}
|
|
||||||
stopLock sync.Mutex
|
|
||||||
}
|
|
||||||
|
|
||||||
// Start is used to start an HTTP check.
|
|
||||||
// The check runs until stop is called
|
|
||||||
func (c *CheckHTTP) Start() {
|
|
||||||
c.stopLock.Lock()
|
|
||||||
defer c.stopLock.Unlock()
|
|
||||||
|
|
||||||
if c.httpClient == nil {
|
|
||||||
// Create the transport. We disable HTTP Keep-Alive's to prevent
|
|
||||||
// failing checks due to the keepalive interval.
|
|
||||||
trans := cleanhttp.DefaultTransport()
|
|
||||||
trans.DisableKeepAlives = true
|
|
||||||
|
|
||||||
// Skip SSL certificate verification if TLSSkipVerify is true
|
|
||||||
if trans.TLSClientConfig == nil {
|
|
||||||
trans.TLSClientConfig = &tls.Config{
|
|
||||||
InsecureSkipVerify: c.TLSSkipVerify,
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
trans.TLSClientConfig.InsecureSkipVerify = c.TLSSkipVerify
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create the HTTP client.
|
|
||||||
c.httpClient = &http.Client{
|
|
||||||
Timeout: 10 * time.Second,
|
|
||||||
Transport: trans,
|
|
||||||
}
|
|
||||||
|
|
||||||
// For long (>10s) interval checks the http timeout is 10s, otherwise the
|
|
||||||
// timeout is the interval. This means that a check *should* return
|
|
||||||
// before the next check begins.
|
|
||||||
if c.Timeout > 0 && c.Timeout < c.Interval {
|
|
||||||
c.httpClient.Timeout = c.Timeout
|
|
||||||
} else if c.Interval < 10*time.Second {
|
|
||||||
c.httpClient.Timeout = c.Interval
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
c.stop = false
|
|
||||||
c.stopCh = make(chan struct{})
|
|
||||||
go c.run()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Stop is used to stop an HTTP check.
|
|
||||||
func (c *CheckHTTP) Stop() {
|
|
||||||
c.stopLock.Lock()
|
|
||||||
defer c.stopLock.Unlock()
|
|
||||||
if !c.stop {
|
|
||||||
c.stop = true
|
|
||||||
close(c.stopCh)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// run is invoked by a goroutine to run until Stop() is called
|
|
||||||
func (c *CheckHTTP) run() {
|
|
||||||
// Get the randomized initial pause time
|
|
||||||
initialPauseTime := lib.RandomStagger(c.Interval)
|
|
||||||
c.Logger.Printf("[DEBUG] agent: pausing %v before first HTTP request of %s", initialPauseTime, c.HTTP)
|
|
||||||
next := time.After(initialPauseTime)
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-next:
|
|
||||||
c.check()
|
|
||||||
next = time.After(c.Interval)
|
|
||||||
case <-c.stopCh:
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// check is invoked periodically to perform the HTTP check
|
|
||||||
func (c *CheckHTTP) check() {
|
|
||||||
method := c.Method
|
|
||||||
if method == "" {
|
|
||||||
method = "GET"
|
|
||||||
}
|
|
||||||
|
|
||||||
req, err := http.NewRequest(method, c.HTTP, nil)
|
|
||||||
if err != nil {
|
|
||||||
c.Logger.Printf("[WARN] agent: http request failed '%s': %s", c.HTTP, err)
|
|
||||||
c.Notify.UpdateCheck(c.CheckID, api.HealthCritical, err.Error())
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
req.Header = http.Header(c.Header)
|
|
||||||
|
|
||||||
// this happens during testing but not in prod
|
|
||||||
if req.Header == nil {
|
|
||||||
req.Header = make(http.Header)
|
|
||||||
}
|
|
||||||
|
|
||||||
if host := req.Header.Get("Host"); host != "" {
|
|
||||||
req.Host = host
|
|
||||||
}
|
|
||||||
|
|
||||||
if req.Header.Get("User-Agent") == "" {
|
|
||||||
req.Header.Set("User-Agent", UserAgent)
|
|
||||||
}
|
|
||||||
if req.Header.Get("Accept") == "" {
|
|
||||||
req.Header.Set("Accept", "text/plain, text/*, */*")
|
|
||||||
}
|
|
||||||
|
|
||||||
resp, err := c.httpClient.Do(req)
|
|
||||||
if err != nil {
|
|
||||||
c.Logger.Printf("[WARN] agent: http request failed '%s': %s", c.HTTP, err)
|
|
||||||
c.Notify.UpdateCheck(c.CheckID, api.HealthCritical, err.Error())
|
|
||||||
return
|
|
||||||
}
|
|
||||||
defer resp.Body.Close()
|
|
||||||
|
|
||||||
// Read the response into a circular buffer to limit the size
|
|
||||||
output, _ := circbuf.NewBuffer(CheckBufSize)
|
|
||||||
if _, err := io.Copy(output, resp.Body); err != nil {
|
|
||||||
c.Logger.Printf("[WARN] agent: Check '%v': Get error while reading body: %s", c.CheckID, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Format the response body
|
|
||||||
result := fmt.Sprintf("HTTP GET %s: %s Output: %s", c.HTTP, resp.Status, output.String())
|
|
||||||
|
|
||||||
if resp.StatusCode >= 200 && resp.StatusCode <= 299 {
|
|
||||||
// PASSING (2xx)
|
|
||||||
c.Logger.Printf("[DEBUG] agent: Check '%v' is passing", c.CheckID)
|
|
||||||
c.Notify.UpdateCheck(c.CheckID, api.HealthPassing, result)
|
|
||||||
|
|
||||||
} else if resp.StatusCode == 429 {
|
|
||||||
// WARNING
|
|
||||||
// 429 Too Many Requests (RFC 6585)
|
|
||||||
// The user has sent too many requests in a given amount of time.
|
|
||||||
c.Logger.Printf("[WARN] agent: Check '%v' is now warning", c.CheckID)
|
|
||||||
c.Notify.UpdateCheck(c.CheckID, api.HealthWarning, result)
|
|
||||||
|
|
||||||
} else {
|
|
||||||
// CRITICAL
|
|
||||||
c.Logger.Printf("[WARN] agent: Check '%v' is now critical", c.CheckID)
|
|
||||||
c.Notify.UpdateCheck(c.CheckID, api.HealthCritical, result)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// CheckTCP is used to periodically make an TCP/UDP connection to
|
|
||||||
// determine the health of a given check.
|
|
||||||
// The check is passing if the connection succeeds
|
|
||||||
// The check is critical if the connection returns an error
|
|
||||||
type CheckTCP struct {
|
|
||||||
Notify CheckNotifier
|
|
||||||
CheckID types.CheckID
|
|
||||||
TCP string
|
|
||||||
Interval time.Duration
|
|
||||||
Timeout time.Duration
|
|
||||||
Logger *log.Logger
|
|
||||||
|
|
||||||
dialer *net.Dialer
|
|
||||||
stop bool
|
|
||||||
stopCh chan struct{}
|
|
||||||
stopLock sync.Mutex
|
|
||||||
}
|
|
||||||
|
|
||||||
// Start is used to start a TCP check.
|
|
||||||
// The check runs until stop is called
|
|
||||||
func (c *CheckTCP) Start() {
|
|
||||||
c.stopLock.Lock()
|
|
||||||
defer c.stopLock.Unlock()
|
|
||||||
|
|
||||||
if c.dialer == nil {
|
|
||||||
// Create the socket dialer
|
|
||||||
c.dialer = &net.Dialer{DualStack: true}
|
|
||||||
|
|
||||||
// For long (>10s) interval checks the socket timeout is 10s, otherwise
|
|
||||||
// the timeout is the interval. This means that a check *should* return
|
|
||||||
// before the next check begins.
|
|
||||||
if c.Timeout > 0 && c.Timeout < c.Interval {
|
|
||||||
c.dialer.Timeout = c.Timeout
|
|
||||||
} else if c.Interval < 10*time.Second {
|
|
||||||
c.dialer.Timeout = c.Interval
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
c.stop = false
|
|
||||||
c.stopCh = make(chan struct{})
|
|
||||||
go c.run()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Stop is used to stop a TCP check.
|
|
||||||
func (c *CheckTCP) Stop() {
|
|
||||||
c.stopLock.Lock()
|
|
||||||
defer c.stopLock.Unlock()
|
|
||||||
if !c.stop {
|
|
||||||
c.stop = true
|
|
||||||
close(c.stopCh)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// run is invoked by a goroutine to run until Stop() is called
|
|
||||||
func (c *CheckTCP) run() {
|
|
||||||
// Get the randomized initial pause time
|
|
||||||
initialPauseTime := lib.RandomStagger(c.Interval)
|
|
||||||
c.Logger.Printf("[DEBUG] agent: pausing %v before first socket connection of %s", initialPauseTime, c.TCP)
|
|
||||||
next := time.After(initialPauseTime)
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-next:
|
|
||||||
c.check()
|
|
||||||
next = time.After(c.Interval)
|
|
||||||
case <-c.stopCh:
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// check is invoked periodically to perform the TCP check
|
|
||||||
func (c *CheckTCP) check() {
|
|
||||||
conn, err := c.dialer.Dial(`tcp`, c.TCP)
|
|
||||||
if err != nil {
|
|
||||||
c.Logger.Printf("[WARN] agent: socket connection failed '%s': %s", c.TCP, err)
|
|
||||||
c.Notify.UpdateCheck(c.CheckID, api.HealthCritical, err.Error())
|
|
||||||
return
|
|
||||||
}
|
|
||||||
conn.Close()
|
|
||||||
c.Logger.Printf("[DEBUG] agent: Check '%v' is passing", c.CheckID)
|
|
||||||
c.Notify.UpdateCheck(c.CheckID, api.HealthPassing, fmt.Sprintf("TCP connect %s: Success", c.TCP))
|
|
||||||
}
|
|
||||||
|
|
||||||
// CheckDocker is used to periodically invoke a script to
|
|
||||||
// determine the health of an application running inside a
|
|
||||||
// Docker Container. We assume that the script is compatible
|
|
||||||
// with nagios plugins and expects the output in the same format.
|
|
||||||
type CheckDocker struct {
|
|
||||||
Notify CheckNotifier
|
|
||||||
CheckID types.CheckID
|
|
||||||
Script string
|
|
||||||
ScriptArgs []string
|
|
||||||
DockerContainerID string
|
|
||||||
Shell string
|
|
||||||
Interval time.Duration
|
|
||||||
Logger *log.Logger
|
|
||||||
|
|
||||||
client *DockerClient
|
|
||||||
stop chan struct{}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *CheckDocker) Start() {
|
|
||||||
if c.stop != nil {
|
|
||||||
panic("Docker check already started")
|
|
||||||
}
|
|
||||||
|
|
||||||
if c.Logger == nil {
|
|
||||||
c.Logger = log.New(ioutil.Discard, "", 0)
|
|
||||||
}
|
|
||||||
|
|
||||||
if c.Shell == "" {
|
|
||||||
c.Shell = os.Getenv("SHELL")
|
|
||||||
if c.Shell == "" {
|
|
||||||
c.Shell = "/bin/sh"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
c.stop = make(chan struct{})
|
|
||||||
go c.run()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *CheckDocker) Stop() {
|
|
||||||
if c.stop == nil {
|
|
||||||
panic("Stop called before start")
|
|
||||||
}
|
|
||||||
close(c.stop)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *CheckDocker) run() {
|
|
||||||
firstWait := lib.RandomStagger(c.Interval)
|
|
||||||
next := time.After(firstWait)
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-next:
|
|
||||||
c.check()
|
|
||||||
next = time.After(c.Interval)
|
|
||||||
case <-c.stop:
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *CheckDocker) check() {
|
|
||||||
var out string
|
|
||||||
status, b, err := c.doCheck()
|
|
||||||
if err != nil {
|
|
||||||
c.Logger.Printf("[DEBUG] agent: Check '%s': %s", c.CheckID, err)
|
|
||||||
out = err.Error()
|
|
||||||
} else {
|
|
||||||
// out is already limited to CheckBufSize since we're getting a
|
|
||||||
// limited buffer. So we don't need to truncate it just report
|
|
||||||
// that it was truncated.
|
|
||||||
out = string(b.Bytes())
|
|
||||||
if int(b.TotalWritten()) > len(out) {
|
|
||||||
out = fmt.Sprintf("Captured %d of %d bytes\n...\n%s", len(out), b.TotalWritten(), out)
|
|
||||||
}
|
|
||||||
c.Logger.Printf("[DEBUG] agent: Check '%s' script '%s' output: %s", c.CheckID, c.Script, out)
|
|
||||||
}
|
|
||||||
|
|
||||||
if status == api.HealthCritical {
|
|
||||||
c.Logger.Printf("[WARN] agent: Check '%v' is now critical", c.CheckID)
|
|
||||||
}
|
|
||||||
|
|
||||||
c.Notify.UpdateCheck(c.CheckID, status, out)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *CheckDocker) doCheck() (string, *circbuf.Buffer, error) {
|
|
||||||
var cmd []string
|
|
||||||
if len(c.ScriptArgs) > 0 {
|
|
||||||
cmd = c.ScriptArgs
|
|
||||||
} else {
|
|
||||||
cmd = []string{c.Shell, "-c", c.Script}
|
|
||||||
}
|
|
||||||
|
|
||||||
execID, err := c.client.CreateExec(c.DockerContainerID, cmd)
|
|
||||||
if err != nil {
|
|
||||||
return api.HealthCritical, nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
buf, err := c.client.StartExec(c.DockerContainerID, execID)
|
|
||||||
if err != nil {
|
|
||||||
return api.HealthCritical, nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
exitCode, err := c.client.InspectExec(c.DockerContainerID, execID)
|
|
||||||
if err != nil {
|
|
||||||
return api.HealthCritical, nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
switch exitCode {
|
|
||||||
case 0:
|
|
||||||
return api.HealthPassing, buf, nil
|
|
||||||
case 1:
|
|
||||||
c.Logger.Printf("[DEBUG] Check failed with exit code: %d", exitCode)
|
|
||||||
return api.HealthWarning, buf, nil
|
|
||||||
default:
|
|
||||||
c.Logger.Printf("[DEBUG] Check failed with exit code: %d", exitCode)
|
|
||||||
return api.HealthCritical, buf, nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
|
@ -0,0 +1,646 @@
|
||||||
|
package checks
|
||||||
|
|
||||||
|
import (
|
||||||
|
"crypto/tls"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"io/ioutil"
|
||||||
|
"log"
|
||||||
|
"net"
|
||||||
|
"net/http"
|
||||||
|
"os"
|
||||||
|
osexec "os/exec"
|
||||||
|
"sync"
|
||||||
|
"syscall"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/armon/circbuf"
|
||||||
|
"github.com/hashicorp/consul/agent/exec"
|
||||||
|
"github.com/hashicorp/consul/api"
|
||||||
|
"github.com/hashicorp/consul/lib"
|
||||||
|
"github.com/hashicorp/consul/types"
|
||||||
|
"github.com/hashicorp/go-cleanhttp"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// MinInterval is the minimal interval between
|
||||||
|
// two checks. Do not allow for a interval below this value.
|
||||||
|
// Otherwise we risk fork bombing a system.
|
||||||
|
MinInterval = time.Second
|
||||||
|
|
||||||
|
// BufSize is the maximum size of the captured
|
||||||
|
// check output. Prevents an enormous buffer
|
||||||
|
// from being captured
|
||||||
|
BufSize = 4 * 1024 // 4KB
|
||||||
|
|
||||||
|
// UserAgent is the value of the User-Agent header
|
||||||
|
// for HTTP health checks.
|
||||||
|
UserAgent = "Consul Health Check"
|
||||||
|
)
|
||||||
|
|
||||||
|
// CheckNotifier interface is used by the CheckMonitor
|
||||||
|
// to notify when a check has a status update. The update
|
||||||
|
// should take care to be idempotent.
|
||||||
|
type CheckNotifier interface {
|
||||||
|
UpdateCheck(checkID types.CheckID, status, output string)
|
||||||
|
}
|
||||||
|
|
||||||
|
// CheckMonitor is used to periodically invoke a script to
|
||||||
|
// determine the health of a given check. It is compatible with
|
||||||
|
// nagios plugins and expects the output in the same format.
|
||||||
|
type CheckMonitor struct {
|
||||||
|
Notify CheckNotifier
|
||||||
|
CheckID types.CheckID
|
||||||
|
Script string
|
||||||
|
ScriptArgs []string
|
||||||
|
Interval time.Duration
|
||||||
|
Timeout time.Duration
|
||||||
|
Logger *log.Logger
|
||||||
|
|
||||||
|
stop bool
|
||||||
|
stopCh chan struct{}
|
||||||
|
stopLock sync.Mutex
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start is used to start a check monitor.
|
||||||
|
// Monitor runs until stop is called
|
||||||
|
func (c *CheckMonitor) Start() {
|
||||||
|
c.stopLock.Lock()
|
||||||
|
defer c.stopLock.Unlock()
|
||||||
|
c.stop = false
|
||||||
|
c.stopCh = make(chan struct{})
|
||||||
|
go c.run()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stop is used to stop a check monitor.
|
||||||
|
func (c *CheckMonitor) Stop() {
|
||||||
|
c.stopLock.Lock()
|
||||||
|
defer c.stopLock.Unlock()
|
||||||
|
if !c.stop {
|
||||||
|
c.stop = true
|
||||||
|
close(c.stopCh)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// run is invoked by a goroutine to run until Stop() is called
|
||||||
|
func (c *CheckMonitor) run() {
|
||||||
|
// Get the randomized initial pause time
|
||||||
|
initialPauseTime := lib.RandomStagger(c.Interval)
|
||||||
|
next := time.After(initialPauseTime)
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-next:
|
||||||
|
c.check()
|
||||||
|
next = time.After(c.Interval)
|
||||||
|
case <-c.stopCh:
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// check is invoked periodically to perform the script check
|
||||||
|
func (c *CheckMonitor) check() {
|
||||||
|
// Create the command
|
||||||
|
var cmd *osexec.Cmd
|
||||||
|
var err error
|
||||||
|
var cmdDisplay string
|
||||||
|
if len(c.ScriptArgs) > 0 {
|
||||||
|
cmdDisplay = fmt.Sprintf("%v", c.ScriptArgs)
|
||||||
|
cmd, err = exec.Subprocess(c.ScriptArgs)
|
||||||
|
} else {
|
||||||
|
cmdDisplay = c.Script
|
||||||
|
cmd, err = exec.Script(c.Script)
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
c.Logger.Printf("[ERR] agent: failed to setup invoke '%s': %s", cmdDisplay, err)
|
||||||
|
c.Notify.UpdateCheck(c.CheckID, api.HealthCritical, err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Collect the output
|
||||||
|
output, _ := circbuf.NewBuffer(BufSize)
|
||||||
|
cmd.Stdout = output
|
||||||
|
cmd.Stderr = output
|
||||||
|
exec.SetSysProcAttr(cmd)
|
||||||
|
|
||||||
|
truncateAndLogOutput := func() string {
|
||||||
|
outputStr := string(output.Bytes())
|
||||||
|
if output.TotalWritten() > output.Size() {
|
||||||
|
outputStr = fmt.Sprintf("Captured %d of %d bytes\n...\n%s",
|
||||||
|
output.Size(), output.TotalWritten(), outputStr)
|
||||||
|
}
|
||||||
|
c.Logger.Printf("[DEBUG] agent: Check '%s' script '%s' output: %s",
|
||||||
|
c.CheckID, cmdDisplay, outputStr)
|
||||||
|
return outputStr
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start the check
|
||||||
|
if err := cmd.Start(); err != nil {
|
||||||
|
c.Logger.Printf("[ERR] agent: failed to invoke '%s': %s", cmdDisplay, err)
|
||||||
|
c.Notify.UpdateCheck(c.CheckID, api.HealthCritical, err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait for the check to complete
|
||||||
|
waitCh := make(chan error, 1)
|
||||||
|
go func() {
|
||||||
|
waitCh <- cmd.Wait()
|
||||||
|
}()
|
||||||
|
|
||||||
|
timeout := 30 * time.Second
|
||||||
|
if c.Timeout > 0 {
|
||||||
|
timeout = c.Timeout
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case <-time.After(timeout):
|
||||||
|
if err := exec.KillCommandSubtree(cmd); err != nil {
|
||||||
|
c.Logger.Printf("[WARN] Failed to kill check '%s' after timeout: %v", cmdDisplay, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
msg := fmt.Sprintf("Timed out (%s) running check", timeout.String())
|
||||||
|
c.Logger.Printf("[WARN] %s '%s'", msg, cmdDisplay)
|
||||||
|
|
||||||
|
outputStr := truncateAndLogOutput()
|
||||||
|
if len(outputStr) > 0 {
|
||||||
|
msg += "\n\n" + outputStr
|
||||||
|
}
|
||||||
|
c.Notify.UpdateCheck(c.CheckID, api.HealthCritical, msg)
|
||||||
|
|
||||||
|
// Now wait for the process to exit so we never start another
|
||||||
|
// instance concurrently.
|
||||||
|
<-waitCh
|
||||||
|
return
|
||||||
|
|
||||||
|
case err = <-waitCh:
|
||||||
|
// The process returned before the timeout, proceed normally
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if the check passed
|
||||||
|
outputStr := truncateAndLogOutput()
|
||||||
|
if err == nil {
|
||||||
|
c.Logger.Printf("[DEBUG] agent: Check '%v' is passing", c.CheckID)
|
||||||
|
c.Notify.UpdateCheck(c.CheckID, api.HealthPassing, outputStr)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// If the exit code is 1, set check as warning
|
||||||
|
exitErr, ok := err.(*osexec.ExitError)
|
||||||
|
if ok {
|
||||||
|
if status, ok := exitErr.Sys().(syscall.WaitStatus); ok {
|
||||||
|
code := status.ExitStatus()
|
||||||
|
if code == 1 {
|
||||||
|
c.Logger.Printf("[WARN] agent: Check '%v' is now warning", c.CheckID)
|
||||||
|
c.Notify.UpdateCheck(c.CheckID, api.HealthWarning, outputStr)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set the health as critical
|
||||||
|
c.Logger.Printf("[WARN] agent: Check '%v' is now critical", c.CheckID)
|
||||||
|
c.Notify.UpdateCheck(c.CheckID, api.HealthCritical, outputStr)
|
||||||
|
}
|
||||||
|
|
||||||
|
// CheckTTL is used to apply a TTL to check status,
|
||||||
|
// and enables clients to set the status of a check
|
||||||
|
// but upon the TTL expiring, the check status is
|
||||||
|
// automatically set to critical.
|
||||||
|
type CheckTTL struct {
|
||||||
|
Notify CheckNotifier
|
||||||
|
CheckID types.CheckID
|
||||||
|
TTL time.Duration
|
||||||
|
Logger *log.Logger
|
||||||
|
|
||||||
|
timer *time.Timer
|
||||||
|
|
||||||
|
lastOutput string
|
||||||
|
lastOutputLock sync.RWMutex
|
||||||
|
|
||||||
|
stop bool
|
||||||
|
stopCh chan struct{}
|
||||||
|
stopLock sync.Mutex
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start is used to start a check ttl, runs until Stop()
|
||||||
|
func (c *CheckTTL) Start() {
|
||||||
|
c.stopLock.Lock()
|
||||||
|
defer c.stopLock.Unlock()
|
||||||
|
c.stop = false
|
||||||
|
c.stopCh = make(chan struct{})
|
||||||
|
c.timer = time.NewTimer(c.TTL)
|
||||||
|
go c.run()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stop is used to stop a check ttl.
|
||||||
|
func (c *CheckTTL) Stop() {
|
||||||
|
c.stopLock.Lock()
|
||||||
|
defer c.stopLock.Unlock()
|
||||||
|
if !c.stop {
|
||||||
|
c.timer.Stop()
|
||||||
|
c.stop = true
|
||||||
|
close(c.stopCh)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// run is used to handle TTL expiration and to update the check status
|
||||||
|
func (c *CheckTTL) run() {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-c.timer.C:
|
||||||
|
c.Logger.Printf("[WARN] agent: Check '%v' missed TTL, is now critical",
|
||||||
|
c.CheckID)
|
||||||
|
c.Notify.UpdateCheck(c.CheckID, api.HealthCritical, c.getExpiredOutput())
|
||||||
|
|
||||||
|
case <-c.stopCh:
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// getExpiredOutput formats the output for the case when the TTL is expired.
|
||||||
|
func (c *CheckTTL) getExpiredOutput() string {
|
||||||
|
c.lastOutputLock.RLock()
|
||||||
|
defer c.lastOutputLock.RUnlock()
|
||||||
|
|
||||||
|
const prefix = "TTL expired"
|
||||||
|
if c.lastOutput == "" {
|
||||||
|
return prefix
|
||||||
|
}
|
||||||
|
|
||||||
|
return fmt.Sprintf("%s (last output before timeout follows): %s", prefix, c.lastOutput)
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetStatus is used to update the status of the check,
|
||||||
|
// and to renew the TTL. If expired, TTL is restarted.
|
||||||
|
func (c *CheckTTL) SetStatus(status, output string) {
|
||||||
|
c.Logger.Printf("[DEBUG] agent: Check '%v' status is now %v",
|
||||||
|
c.CheckID, status)
|
||||||
|
c.Notify.UpdateCheck(c.CheckID, status, output)
|
||||||
|
|
||||||
|
// Store the last output so we can retain it if the TTL expires.
|
||||||
|
c.lastOutputLock.Lock()
|
||||||
|
c.lastOutput = output
|
||||||
|
c.lastOutputLock.Unlock()
|
||||||
|
|
||||||
|
c.timer.Reset(c.TTL)
|
||||||
|
}
|
||||||
|
|
||||||
|
// CheckHTTP is used to periodically make an HTTP request to
|
||||||
|
// determine the health of a given check.
|
||||||
|
// The check is passing if the response code is 2XX.
|
||||||
|
// The check is warning if the response code is 429.
|
||||||
|
// The check is critical if the response code is anything else
|
||||||
|
// or if the request returns an error
|
||||||
|
type CheckHTTP struct {
|
||||||
|
Notify CheckNotifier
|
||||||
|
CheckID types.CheckID
|
||||||
|
HTTP string
|
||||||
|
Header map[string][]string
|
||||||
|
Method string
|
||||||
|
Interval time.Duration
|
||||||
|
Timeout time.Duration
|
||||||
|
Logger *log.Logger
|
||||||
|
TLSSkipVerify bool
|
||||||
|
|
||||||
|
httpClient *http.Client
|
||||||
|
stop bool
|
||||||
|
stopCh chan struct{}
|
||||||
|
stopLock sync.Mutex
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start is used to start an HTTP check.
|
||||||
|
// The check runs until stop is called
|
||||||
|
func (c *CheckHTTP) Start() {
|
||||||
|
c.stopLock.Lock()
|
||||||
|
defer c.stopLock.Unlock()
|
||||||
|
|
||||||
|
if c.httpClient == nil {
|
||||||
|
// Create the transport. We disable HTTP Keep-Alive's to prevent
|
||||||
|
// failing checks due to the keepalive interval.
|
||||||
|
trans := cleanhttp.DefaultTransport()
|
||||||
|
trans.DisableKeepAlives = true
|
||||||
|
|
||||||
|
// Skip SSL certificate verification if TLSSkipVerify is true
|
||||||
|
if trans.TLSClientConfig == nil {
|
||||||
|
trans.TLSClientConfig = &tls.Config{
|
||||||
|
InsecureSkipVerify: c.TLSSkipVerify,
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
trans.TLSClientConfig.InsecureSkipVerify = c.TLSSkipVerify
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create the HTTP client.
|
||||||
|
c.httpClient = &http.Client{
|
||||||
|
Timeout: 10 * time.Second,
|
||||||
|
Transport: trans,
|
||||||
|
}
|
||||||
|
|
||||||
|
// For long (>10s) interval checks the http timeout is 10s, otherwise the
|
||||||
|
// timeout is the interval. This means that a check *should* return
|
||||||
|
// before the next check begins.
|
||||||
|
if c.Timeout > 0 && c.Timeout < c.Interval {
|
||||||
|
c.httpClient.Timeout = c.Timeout
|
||||||
|
} else if c.Interval < 10*time.Second {
|
||||||
|
c.httpClient.Timeout = c.Interval
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
c.stop = false
|
||||||
|
c.stopCh = make(chan struct{})
|
||||||
|
go c.run()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stop is used to stop an HTTP check.
|
||||||
|
func (c *CheckHTTP) Stop() {
|
||||||
|
c.stopLock.Lock()
|
||||||
|
defer c.stopLock.Unlock()
|
||||||
|
if !c.stop {
|
||||||
|
c.stop = true
|
||||||
|
close(c.stopCh)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// run is invoked by a goroutine to run until Stop() is called
|
||||||
|
func (c *CheckHTTP) run() {
|
||||||
|
// Get the randomized initial pause time
|
||||||
|
initialPauseTime := lib.RandomStagger(c.Interval)
|
||||||
|
c.Logger.Printf("[DEBUG] agent: pausing %v before first HTTP request of %s", initialPauseTime, c.HTTP)
|
||||||
|
next := time.After(initialPauseTime)
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-next:
|
||||||
|
c.check()
|
||||||
|
next = time.After(c.Interval)
|
||||||
|
case <-c.stopCh:
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// check is invoked periodically to perform the HTTP check
|
||||||
|
func (c *CheckHTTP) check() {
|
||||||
|
method := c.Method
|
||||||
|
if method == "" {
|
||||||
|
method = "GET"
|
||||||
|
}
|
||||||
|
|
||||||
|
req, err := http.NewRequest(method, c.HTTP, nil)
|
||||||
|
if err != nil {
|
||||||
|
c.Logger.Printf("[WARN] agent: http request failed '%s': %s", c.HTTP, err)
|
||||||
|
c.Notify.UpdateCheck(c.CheckID, api.HealthCritical, err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
req.Header = http.Header(c.Header)
|
||||||
|
|
||||||
|
// this happens during testing but not in prod
|
||||||
|
if req.Header == nil {
|
||||||
|
req.Header = make(http.Header)
|
||||||
|
}
|
||||||
|
|
||||||
|
if host := req.Header.Get("Host"); host != "" {
|
||||||
|
req.Host = host
|
||||||
|
}
|
||||||
|
|
||||||
|
if req.Header.Get("User-Agent") == "" {
|
||||||
|
req.Header.Set("User-Agent", UserAgent)
|
||||||
|
}
|
||||||
|
if req.Header.Get("Accept") == "" {
|
||||||
|
req.Header.Set("Accept", "text/plain, text/*, */*")
|
||||||
|
}
|
||||||
|
|
||||||
|
resp, err := c.httpClient.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
c.Logger.Printf("[WARN] agent: http request failed '%s': %s", c.HTTP, err)
|
||||||
|
c.Notify.UpdateCheck(c.CheckID, api.HealthCritical, err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
// Read the response into a circular buffer to limit the size
|
||||||
|
output, _ := circbuf.NewBuffer(BufSize)
|
||||||
|
if _, err := io.Copy(output, resp.Body); err != nil {
|
||||||
|
c.Logger.Printf("[WARN] agent: Check '%v': Get error while reading body: %s", c.CheckID, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Format the response body
|
||||||
|
result := fmt.Sprintf("HTTP GET %s: %s Output: %s", c.HTTP, resp.Status, output.String())
|
||||||
|
|
||||||
|
if resp.StatusCode >= 200 && resp.StatusCode <= 299 {
|
||||||
|
// PASSING (2xx)
|
||||||
|
c.Logger.Printf("[DEBUG] agent: Check '%v' is passing", c.CheckID)
|
||||||
|
c.Notify.UpdateCheck(c.CheckID, api.HealthPassing, result)
|
||||||
|
|
||||||
|
} else if resp.StatusCode == 429 {
|
||||||
|
// WARNING
|
||||||
|
// 429 Too Many Requests (RFC 6585)
|
||||||
|
// The user has sent too many requests in a given amount of time.
|
||||||
|
c.Logger.Printf("[WARN] agent: Check '%v' is now warning", c.CheckID)
|
||||||
|
c.Notify.UpdateCheck(c.CheckID, api.HealthWarning, result)
|
||||||
|
|
||||||
|
} else {
|
||||||
|
// CRITICAL
|
||||||
|
c.Logger.Printf("[WARN] agent: Check '%v' is now critical", c.CheckID)
|
||||||
|
c.Notify.UpdateCheck(c.CheckID, api.HealthCritical, result)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// CheckTCP is used to periodically make an TCP/UDP connection to
|
||||||
|
// determine the health of a given check.
|
||||||
|
// The check is passing if the connection succeeds
|
||||||
|
// The check is critical if the connection returns an error
|
||||||
|
type CheckTCP struct {
|
||||||
|
Notify CheckNotifier
|
||||||
|
CheckID types.CheckID
|
||||||
|
TCP string
|
||||||
|
Interval time.Duration
|
||||||
|
Timeout time.Duration
|
||||||
|
Logger *log.Logger
|
||||||
|
|
||||||
|
dialer *net.Dialer
|
||||||
|
stop bool
|
||||||
|
stopCh chan struct{}
|
||||||
|
stopLock sync.Mutex
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start is used to start a TCP check.
|
||||||
|
// The check runs until stop is called
|
||||||
|
func (c *CheckTCP) Start() {
|
||||||
|
c.stopLock.Lock()
|
||||||
|
defer c.stopLock.Unlock()
|
||||||
|
|
||||||
|
if c.dialer == nil {
|
||||||
|
// Create the socket dialer
|
||||||
|
c.dialer = &net.Dialer{DualStack: true}
|
||||||
|
|
||||||
|
// For long (>10s) interval checks the socket timeout is 10s, otherwise
|
||||||
|
// the timeout is the interval. This means that a check *should* return
|
||||||
|
// before the next check begins.
|
||||||
|
if c.Timeout > 0 && c.Timeout < c.Interval {
|
||||||
|
c.dialer.Timeout = c.Timeout
|
||||||
|
} else if c.Interval < 10*time.Second {
|
||||||
|
c.dialer.Timeout = c.Interval
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
c.stop = false
|
||||||
|
c.stopCh = make(chan struct{})
|
||||||
|
go c.run()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stop is used to stop a TCP check.
|
||||||
|
func (c *CheckTCP) Stop() {
|
||||||
|
c.stopLock.Lock()
|
||||||
|
defer c.stopLock.Unlock()
|
||||||
|
if !c.stop {
|
||||||
|
c.stop = true
|
||||||
|
close(c.stopCh)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// run is invoked by a goroutine to run until Stop() is called
|
||||||
|
func (c *CheckTCP) run() {
|
||||||
|
// Get the randomized initial pause time
|
||||||
|
initialPauseTime := lib.RandomStagger(c.Interval)
|
||||||
|
c.Logger.Printf("[DEBUG] agent: pausing %v before first socket connection of %s", initialPauseTime, c.TCP)
|
||||||
|
next := time.After(initialPauseTime)
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-next:
|
||||||
|
c.check()
|
||||||
|
next = time.After(c.Interval)
|
||||||
|
case <-c.stopCh:
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// check is invoked periodically to perform the TCP check
|
||||||
|
func (c *CheckTCP) check() {
|
||||||
|
conn, err := c.dialer.Dial(`tcp`, c.TCP)
|
||||||
|
if err != nil {
|
||||||
|
c.Logger.Printf("[WARN] agent: socket connection failed '%s': %s", c.TCP, err)
|
||||||
|
c.Notify.UpdateCheck(c.CheckID, api.HealthCritical, err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
conn.Close()
|
||||||
|
c.Logger.Printf("[DEBUG] agent: Check '%v' is passing", c.CheckID)
|
||||||
|
c.Notify.UpdateCheck(c.CheckID, api.HealthPassing, fmt.Sprintf("TCP connect %s: Success", c.TCP))
|
||||||
|
}
|
||||||
|
|
||||||
|
// CheckDocker is used to periodically invoke a script to
|
||||||
|
// determine the health of an application running inside a
|
||||||
|
// Docker Container. We assume that the script is compatible
|
||||||
|
// with nagios plugins and expects the output in the same format.
|
||||||
|
type CheckDocker struct {
|
||||||
|
Notify CheckNotifier
|
||||||
|
CheckID types.CheckID
|
||||||
|
Script string
|
||||||
|
ScriptArgs []string
|
||||||
|
DockerContainerID string
|
||||||
|
Shell string
|
||||||
|
Interval time.Duration
|
||||||
|
Logger *log.Logger
|
||||||
|
Client *DockerClient
|
||||||
|
|
||||||
|
stop chan struct{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *CheckDocker) Start() {
|
||||||
|
if c.stop != nil {
|
||||||
|
panic("Docker check already started")
|
||||||
|
}
|
||||||
|
|
||||||
|
if c.Logger == nil {
|
||||||
|
c.Logger = log.New(ioutil.Discard, "", 0)
|
||||||
|
}
|
||||||
|
|
||||||
|
if c.Shell == "" {
|
||||||
|
c.Shell = os.Getenv("SHELL")
|
||||||
|
if c.Shell == "" {
|
||||||
|
c.Shell = "/bin/sh"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
c.stop = make(chan struct{})
|
||||||
|
go c.run()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *CheckDocker) Stop() {
|
||||||
|
if c.stop == nil {
|
||||||
|
panic("Stop called before start")
|
||||||
|
}
|
||||||
|
close(c.stop)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *CheckDocker) run() {
|
||||||
|
firstWait := lib.RandomStagger(c.Interval)
|
||||||
|
next := time.After(firstWait)
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-next:
|
||||||
|
c.check()
|
||||||
|
next = time.After(c.Interval)
|
||||||
|
case <-c.stop:
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *CheckDocker) check() {
|
||||||
|
var out string
|
||||||
|
status, b, err := c.doCheck()
|
||||||
|
if err != nil {
|
||||||
|
c.Logger.Printf("[DEBUG] agent: Check '%s': %s", c.CheckID, err)
|
||||||
|
out = err.Error()
|
||||||
|
} else {
|
||||||
|
// out is already limited to CheckBufSize since we're getting a
|
||||||
|
// limited buffer. So we don't need to truncate it just report
|
||||||
|
// that it was truncated.
|
||||||
|
out = string(b.Bytes())
|
||||||
|
if int(b.TotalWritten()) > len(out) {
|
||||||
|
out = fmt.Sprintf("Captured %d of %d bytes\n...\n%s", len(out), b.TotalWritten(), out)
|
||||||
|
}
|
||||||
|
c.Logger.Printf("[DEBUG] agent: Check '%s' script '%s' output: %s", c.CheckID, c.Script, out)
|
||||||
|
}
|
||||||
|
|
||||||
|
if status == api.HealthCritical {
|
||||||
|
c.Logger.Printf("[WARN] agent: Check '%v' is now critical", c.CheckID)
|
||||||
|
}
|
||||||
|
|
||||||
|
c.Notify.UpdateCheck(c.CheckID, status, out)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *CheckDocker) doCheck() (string, *circbuf.Buffer, error) {
|
||||||
|
var cmd []string
|
||||||
|
if len(c.ScriptArgs) > 0 {
|
||||||
|
cmd = c.ScriptArgs
|
||||||
|
} else {
|
||||||
|
cmd = []string{c.Shell, "-c", c.Script}
|
||||||
|
}
|
||||||
|
|
||||||
|
execID, err := c.Client.CreateExec(c.DockerContainerID, cmd)
|
||||||
|
if err != nil {
|
||||||
|
return api.HealthCritical, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
buf, err := c.Client.StartExec(c.DockerContainerID, execID)
|
||||||
|
if err != nil {
|
||||||
|
return api.HealthCritical, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
exitCode, err := c.Client.InspectExec(c.DockerContainerID, execID)
|
||||||
|
if err != nil {
|
||||||
|
return api.HealthCritical, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
switch exitCode {
|
||||||
|
case 0:
|
||||||
|
return api.HealthPassing, buf, nil
|
||||||
|
case 1:
|
||||||
|
c.Logger.Printf("[DEBUG] Check failed with exit code: %d", exitCode)
|
||||||
|
return api.HealthWarning, buf, nil
|
||||||
|
default:
|
||||||
|
c.Logger.Printf("[DEBUG] Check failed with exit code: %d", exitCode)
|
||||||
|
return api.HealthCritical, buf, nil
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,4 +1,4 @@
|
||||||
package agent
|
package checks
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
@ -18,8 +18,17 @@ import (
|
||||||
"github.com/hashicorp/consul/api"
|
"github.com/hashicorp/consul/api"
|
||||||
"github.com/hashicorp/consul/testutil/retry"
|
"github.com/hashicorp/consul/testutil/retry"
|
||||||
"github.com/hashicorp/consul/types"
|
"github.com/hashicorp/consul/types"
|
||||||
|
uuid "github.com/hashicorp/go-uuid"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func uniqueID() string {
|
||||||
|
id, err := uuid.GenerateUUID()
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
return id
|
||||||
|
}
|
||||||
|
|
||||||
func TestCheckMonitor_Script(t *testing.T) {
|
func TestCheckMonitor_Script(t *testing.T) {
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
script, status string
|
script, status string
|
||||||
|
@ -38,7 +47,7 @@ func TestCheckMonitor_Script(t *testing.T) {
|
||||||
CheckID: types.CheckID("foo"),
|
CheckID: types.CheckID("foo"),
|
||||||
Script: tt.script,
|
Script: tt.script,
|
||||||
Interval: 25 * time.Millisecond,
|
Interval: 25 * time.Millisecond,
|
||||||
Logger: log.New(ioutil.Discard, UniqueID(), log.LstdFlags),
|
Logger: log.New(ioutil.Discard, uniqueID(), log.LstdFlags),
|
||||||
}
|
}
|
||||||
check.Start()
|
check.Start()
|
||||||
defer check.Stop()
|
defer check.Stop()
|
||||||
|
@ -73,7 +82,7 @@ func TestCheckMonitor_Args(t *testing.T) {
|
||||||
CheckID: types.CheckID("foo"),
|
CheckID: types.CheckID("foo"),
|
||||||
ScriptArgs: tt.args,
|
ScriptArgs: tt.args,
|
||||||
Interval: 25 * time.Millisecond,
|
Interval: 25 * time.Millisecond,
|
||||||
Logger: log.New(ioutil.Discard, UniqueID(), log.LstdFlags),
|
Logger: log.New(ioutil.Discard, uniqueID(), log.LstdFlags),
|
||||||
}
|
}
|
||||||
check.Start()
|
check.Start()
|
||||||
defer check.Stop()
|
defer check.Stop()
|
||||||
|
@ -98,7 +107,7 @@ func TestCheckMonitor_Timeout(t *testing.T) {
|
||||||
ScriptArgs: []string{"sh", "-c", "sleep 1 && exit 0"},
|
ScriptArgs: []string{"sh", "-c", "sleep 1 && exit 0"},
|
||||||
Interval: 50 * time.Millisecond,
|
Interval: 50 * time.Millisecond,
|
||||||
Timeout: 25 * time.Millisecond,
|
Timeout: 25 * time.Millisecond,
|
||||||
Logger: log.New(ioutil.Discard, UniqueID(), log.LstdFlags),
|
Logger: log.New(ioutil.Discard, uniqueID(), log.LstdFlags),
|
||||||
}
|
}
|
||||||
check.Start()
|
check.Start()
|
||||||
defer check.Stop()
|
defer check.Stop()
|
||||||
|
@ -122,7 +131,7 @@ func TestCheckMonitor_RandomStagger(t *testing.T) {
|
||||||
CheckID: types.CheckID("foo"),
|
CheckID: types.CheckID("foo"),
|
||||||
ScriptArgs: []string{"sh", "-c", "exit 0"},
|
ScriptArgs: []string{"sh", "-c", "exit 0"},
|
||||||
Interval: 25 * time.Millisecond,
|
Interval: 25 * time.Millisecond,
|
||||||
Logger: log.New(ioutil.Discard, UniqueID(), log.LstdFlags),
|
Logger: log.New(ioutil.Discard, uniqueID(), log.LstdFlags),
|
||||||
}
|
}
|
||||||
check.Start()
|
check.Start()
|
||||||
defer check.Stop()
|
defer check.Stop()
|
||||||
|
@ -147,7 +156,7 @@ func TestCheckMonitor_LimitOutput(t *testing.T) {
|
||||||
CheckID: types.CheckID("foo"),
|
CheckID: types.CheckID("foo"),
|
||||||
ScriptArgs: []string{"od", "-N", "81920", "/dev/urandom"},
|
ScriptArgs: []string{"od", "-N", "81920", "/dev/urandom"},
|
||||||
Interval: 25 * time.Millisecond,
|
Interval: 25 * time.Millisecond,
|
||||||
Logger: log.New(ioutil.Discard, UniqueID(), log.LstdFlags),
|
Logger: log.New(ioutil.Discard, uniqueID(), log.LstdFlags),
|
||||||
}
|
}
|
||||||
check.Start()
|
check.Start()
|
||||||
defer check.Stop()
|
defer check.Stop()
|
||||||
|
@ -155,7 +164,7 @@ func TestCheckMonitor_LimitOutput(t *testing.T) {
|
||||||
time.Sleep(50 * time.Millisecond)
|
time.Sleep(50 * time.Millisecond)
|
||||||
|
|
||||||
// Allow for extra bytes for the truncation message
|
// Allow for extra bytes for the truncation message
|
||||||
if len(notif.Output("foo")) > CheckBufSize+100 {
|
if len(notif.Output("foo")) > BufSize+100 {
|
||||||
t.Fatalf("output size is too long")
|
t.Fatalf("output size is too long")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -167,7 +176,7 @@ func TestCheckTTL(t *testing.T) {
|
||||||
Notify: notif,
|
Notify: notif,
|
||||||
CheckID: types.CheckID("foo"),
|
CheckID: types.CheckID("foo"),
|
||||||
TTL: 200 * time.Millisecond,
|
TTL: 200 * time.Millisecond,
|
||||||
Logger: log.New(ioutil.Discard, UniqueID(), log.LstdFlags),
|
Logger: log.New(ioutil.Discard, uniqueID(), log.LstdFlags),
|
||||||
}
|
}
|
||||||
check.Start()
|
check.Start()
|
||||||
defer check.Stop()
|
defer check.Stop()
|
||||||
|
@ -277,7 +286,7 @@ func TestCheckHTTP(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Body larger than 4k limit
|
// Body larger than 4k limit
|
||||||
body := bytes.Repeat([]byte{'a'}, 2*CheckBufSize)
|
body := bytes.Repeat([]byte{'a'}, 2*BufSize)
|
||||||
w.WriteHeader(tt.code)
|
w.WriteHeader(tt.code)
|
||||||
w.Write(body)
|
w.Write(body)
|
||||||
}))
|
}))
|
||||||
|
@ -291,7 +300,7 @@ func TestCheckHTTP(t *testing.T) {
|
||||||
Method: tt.method,
|
Method: tt.method,
|
||||||
Header: tt.header,
|
Header: tt.header,
|
||||||
Interval: 10 * time.Millisecond,
|
Interval: 10 * time.Millisecond,
|
||||||
Logger: log.New(ioutil.Discard, UniqueID(), log.LstdFlags),
|
Logger: log.New(ioutil.Discard, uniqueID(), log.LstdFlags),
|
||||||
}
|
}
|
||||||
check.Start()
|
check.Start()
|
||||||
defer check.Stop()
|
defer check.Stop()
|
||||||
|
@ -303,9 +312,9 @@ func TestCheckHTTP(t *testing.T) {
|
||||||
if got, want := notif.State("foo"), tt.status; got != want {
|
if got, want := notif.State("foo"), tt.status; got != want {
|
||||||
r.Fatalf("got state %q want %q", got, want)
|
r.Fatalf("got state %q want %q", got, want)
|
||||||
}
|
}
|
||||||
// Allow slightly more data than CheckBufSize, for the header
|
// Allow slightly more data than BufSize, for the header
|
||||||
if n := len(notif.Output("foo")); n > (CheckBufSize + 256) {
|
if n := len(notif.Output("foo")); n > (BufSize + 256) {
|
||||||
r.Fatalf("output too long: %d (%d-byte limit)", n, CheckBufSize)
|
r.Fatalf("output too long: %d (%d-byte limit)", n, BufSize)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
@ -327,7 +336,7 @@ func TestCheckHTTPTimeout(t *testing.T) {
|
||||||
HTTP: server.URL,
|
HTTP: server.URL,
|
||||||
Timeout: timeout,
|
Timeout: timeout,
|
||||||
Interval: 10 * time.Millisecond,
|
Interval: 10 * time.Millisecond,
|
||||||
Logger: log.New(ioutil.Discard, UniqueID(), log.LstdFlags),
|
Logger: log.New(ioutil.Discard, uniqueID(), log.LstdFlags),
|
||||||
}
|
}
|
||||||
|
|
||||||
check.Start()
|
check.Start()
|
||||||
|
@ -348,7 +357,7 @@ func TestCheckHTTP_disablesKeepAlives(t *testing.T) {
|
||||||
CheckID: types.CheckID("foo"),
|
CheckID: types.CheckID("foo"),
|
||||||
HTTP: "http://foo.bar/baz",
|
HTTP: "http://foo.bar/baz",
|
||||||
Interval: 10 * time.Second,
|
Interval: 10 * time.Second,
|
||||||
Logger: log.New(ioutil.Discard, UniqueID(), log.LstdFlags),
|
Logger: log.New(ioutil.Discard, uniqueID(), log.LstdFlags),
|
||||||
}
|
}
|
||||||
|
|
||||||
check.Start()
|
check.Start()
|
||||||
|
@ -365,7 +374,7 @@ func TestCheckHTTP_TLSSkipVerify_defaultFalse(t *testing.T) {
|
||||||
CheckID: "foo",
|
CheckID: "foo",
|
||||||
HTTP: "https://foo.bar/baz",
|
HTTP: "https://foo.bar/baz",
|
||||||
Interval: 10 * time.Second,
|
Interval: 10 * time.Second,
|
||||||
Logger: log.New(ioutil.Discard, UniqueID(), log.LstdFlags),
|
Logger: log.New(ioutil.Discard, uniqueID(), log.LstdFlags),
|
||||||
}
|
}
|
||||||
|
|
||||||
check.Start()
|
check.Start()
|
||||||
|
@ -379,7 +388,7 @@ func TestCheckHTTP_TLSSkipVerify_defaultFalse(t *testing.T) {
|
||||||
func largeBodyHandler(code int) http.Handler {
|
func largeBodyHandler(code int) http.Handler {
|
||||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
// Body larger than 4k limit
|
// Body larger than 4k limit
|
||||||
body := bytes.Repeat([]byte{'a'}, 2*CheckBufSize)
|
body := bytes.Repeat([]byte{'a'}, 2*BufSize)
|
||||||
w.WriteHeader(code)
|
w.WriteHeader(code)
|
||||||
w.Write(body)
|
w.Write(body)
|
||||||
})
|
})
|
||||||
|
@ -397,7 +406,7 @@ func TestCheckHTTP_TLSSkipVerify_true_pass(t *testing.T) {
|
||||||
CheckID: types.CheckID("skipverify_true"),
|
CheckID: types.CheckID("skipverify_true"),
|
||||||
HTTP: server.URL,
|
HTTP: server.URL,
|
||||||
Interval: 25 * time.Millisecond,
|
Interval: 25 * time.Millisecond,
|
||||||
Logger: log.New(ioutil.Discard, UniqueID(), log.LstdFlags),
|
Logger: log.New(ioutil.Discard, uniqueID(), log.LstdFlags),
|
||||||
TLSSkipVerify: true,
|
TLSSkipVerify: true,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -429,7 +438,7 @@ func TestCheckHTTP_TLSSkipVerify_true_fail(t *testing.T) {
|
||||||
CheckID: types.CheckID("skipverify_true"),
|
CheckID: types.CheckID("skipverify_true"),
|
||||||
HTTP: server.URL,
|
HTTP: server.URL,
|
||||||
Interval: 5 * time.Millisecond,
|
Interval: 5 * time.Millisecond,
|
||||||
Logger: log.New(ioutil.Discard, UniqueID(), log.LstdFlags),
|
Logger: log.New(ioutil.Discard, uniqueID(), log.LstdFlags),
|
||||||
TLSSkipVerify: true,
|
TLSSkipVerify: true,
|
||||||
}
|
}
|
||||||
check.Start()
|
check.Start()
|
||||||
|
@ -457,7 +466,7 @@ func TestCheckHTTP_TLSSkipVerify_false(t *testing.T) {
|
||||||
CheckID: types.CheckID("skipverify_false"),
|
CheckID: types.CheckID("skipverify_false"),
|
||||||
HTTP: server.URL,
|
HTTP: server.URL,
|
||||||
Interval: 100 * time.Millisecond,
|
Interval: 100 * time.Millisecond,
|
||||||
Logger: log.New(ioutil.Discard, UniqueID(), log.LstdFlags),
|
Logger: log.New(ioutil.Discard, uniqueID(), log.LstdFlags),
|
||||||
TLSSkipVerify: false,
|
TLSSkipVerify: false,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -504,7 +513,7 @@ func expectTCPStatus(t *testing.T, tcp string, status string) {
|
||||||
CheckID: types.CheckID("foo"),
|
CheckID: types.CheckID("foo"),
|
||||||
TCP: tcp,
|
TCP: tcp,
|
||||||
Interval: 10 * time.Millisecond,
|
Interval: 10 * time.Millisecond,
|
||||||
Logger: log.New(ioutil.Discard, UniqueID(), log.LstdFlags),
|
Logger: log.New(ioutil.Discard, uniqueID(), log.LstdFlags),
|
||||||
}
|
}
|
||||||
check.Start()
|
check.Start()
|
||||||
defer check.Stop()
|
defer check.Stop()
|
||||||
|
@ -813,7 +822,7 @@ func TestCheck_Docker(t *testing.T) {
|
||||||
ScriptArgs: []string{"/health.sh"},
|
ScriptArgs: []string{"/health.sh"},
|
||||||
DockerContainerID: "123",
|
DockerContainerID: "123",
|
||||||
Interval: 25 * time.Millisecond,
|
Interval: 25 * time.Millisecond,
|
||||||
client: c,
|
Client: c,
|
||||||
}
|
}
|
||||||
check.Start()
|
check.Start()
|
||||||
defer check.Stop()
|
defer check.Stop()
|
|
@ -1,4 +1,4 @@
|
||||||
package agent
|
package checks
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
@ -54,6 +54,10 @@ func NewDockerClient(host string, maxbuf int64) (*DockerClient, error) {
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *DockerClient) Host() string {
|
||||||
|
return c.host
|
||||||
|
}
|
||||||
|
|
||||||
// ParseHost verifies that the given host strings is valid.
|
// ParseHost verifies that the given host strings is valid.
|
||||||
// copied from github.com/docker/docker/client.go
|
// copied from github.com/docker/docker/client.go
|
||||||
func ParseHost(host string) (string, string, string, error) {
|
func ParseHost(host string) (string, string, string, error) {
|
|
@ -1,5 +1,5 @@
|
||||||
// +build !windows
|
// +build !windows
|
||||||
|
|
||||||
package agent
|
package checks
|
||||||
|
|
||||||
const DefaultDockerHost = "unix:///var/run/docker.sock"
|
const DefaultDockerHost = "unix:///var/run/docker.sock"
|
|
@ -1,3 +1,3 @@
|
||||||
package agent
|
package checks
|
||||||
|
|
||||||
const DefaultDockerHost = "npipe:////./pipe/docker_engine"
|
const DefaultDockerHost = "npipe:////./pipe/docker_engine"
|
|
@ -0,0 +1,14 @@
|
||||||
|
package exec
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"os/exec"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Subprocess returns a command to execute a subprocess directly.
|
||||||
|
func Subprocess(args []string) (*exec.Cmd, error) {
|
||||||
|
if len(args) == 0 {
|
||||||
|
return nil, fmt.Errorf("need an executable to run")
|
||||||
|
}
|
||||||
|
return exec.Command(args[0], args[1:]...), nil
|
||||||
|
}
|
|
@ -1,6 +1,6 @@
|
||||||
// +build !windows
|
// +build !windows
|
||||||
|
|
||||||
package agent
|
package exec
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"os"
|
"os"
|
||||||
|
@ -8,8 +8,8 @@ import (
|
||||||
"syscall"
|
"syscall"
|
||||||
)
|
)
|
||||||
|
|
||||||
// ExecScript returns a command to execute a script through a shell.
|
// Script returns a command to execute a script through a shell.
|
||||||
func ExecScript(script string) (*exec.Cmd, error) {
|
func Script(script string) (*exec.Cmd, error) {
|
||||||
shell := "/bin/sh"
|
shell := "/bin/sh"
|
||||||
if other := os.Getenv("SHELL"); other != "" {
|
if other := os.Getenv("SHELL"); other != "" {
|
||||||
shell = other
|
shell = other
|
|
@ -1,6 +1,6 @@
|
||||||
// +build windows
|
// +build windows
|
||||||
|
|
||||||
package agent
|
package exec
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"os"
|
"os"
|
||||||
|
@ -9,8 +9,8 @@ import (
|
||||||
"syscall"
|
"syscall"
|
||||||
)
|
)
|
||||||
|
|
||||||
// ExecScript returns a command to execute a script through a shell.
|
// Script returns a command to execute a script through a shell.
|
||||||
func ExecScript(script string) (*exec.Cmd, error) {
|
func Script(script string) (*exec.Cmd, error) {
|
||||||
shell := "cmd"
|
shell := "cmd"
|
||||||
if other := os.Getenv("SHELL"); other != "" {
|
if other := os.Getenv("SHELL"); other != "" {
|
||||||
shell = other
|
shell = other
|
|
@ -5,13 +5,14 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"os"
|
"os"
|
||||||
"os/exec"
|
osexec "os/exec"
|
||||||
"path"
|
"path"
|
||||||
"strconv"
|
"strconv"
|
||||||
"sync"
|
"sync"
|
||||||
"syscall"
|
"syscall"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/agent/exec"
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
"github.com/hashicorp/consul/api"
|
"github.com/hashicorp/consul/api"
|
||||||
)
|
)
|
||||||
|
@ -161,12 +162,12 @@ func (a *Agent) handleRemoteExec(msg *UserEvent) {
|
||||||
|
|
||||||
// Create the exec.Cmd
|
// Create the exec.Cmd
|
||||||
a.logger.Printf("[INFO] agent: remote exec '%s'", script)
|
a.logger.Printf("[INFO] agent: remote exec '%s'", script)
|
||||||
var cmd *exec.Cmd
|
var cmd *osexec.Cmd
|
||||||
var err error
|
var err error
|
||||||
if len(spec.Args) > 0 {
|
if len(spec.Args) > 0 {
|
||||||
cmd, err = ExecSubprocess(spec.Args)
|
cmd, err = exec.Subprocess(spec.Args)
|
||||||
} else {
|
} else {
|
||||||
cmd, err = ExecScript(script)
|
cmd, err = exec.Script(script)
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
a.logger.Printf("[DEBUG] agent: failed to start remote exec: %v", err)
|
a.logger.Printf("[DEBUG] agent: failed to start remote exec: %v", err)
|
||||||
|
@ -203,7 +204,7 @@ func (a *Agent) handleRemoteExec(msg *UserEvent) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Try to determine the exit code
|
// Try to determine the exit code
|
||||||
if exitErr, ok := err.(*exec.ExitError); ok {
|
if exitErr, ok := err.(*osexec.ExitError); ok {
|
||||||
if status, ok := exitErr.Sys().(syscall.WaitStatus); ok {
|
if status, ok := exitErr.Sys().(syscall.WaitStatus); ok {
|
||||||
exitCh <- status.ExitStatus()
|
exitCh <- status.ExitStatus()
|
||||||
return
|
return
|
||||||
|
|
|
@ -111,7 +111,7 @@ func (a *TestAgent) Start() *TestAgent {
|
||||||
}
|
}
|
||||||
hclDataDir = `data_dir = "` + d + `"`
|
hclDataDir = `data_dir = "` + d + `"`
|
||||||
}
|
}
|
||||||
id := UniqueID()
|
id := NodeID()
|
||||||
|
|
||||||
for i := 10; i >= 0; i-- {
|
for i := 10; i >= 0; i-- {
|
||||||
a.Config = TestConfig(
|
a.Config = TestConfig(
|
||||||
|
@ -276,14 +276,6 @@ func (a *TestAgent) consulConfig() *consul.Config {
|
||||||
return c
|
return c
|
||||||
}
|
}
|
||||||
|
|
||||||
func UniqueID() string {
|
|
||||||
id := strconv.FormatUint(rand.Uint64(), 36)
|
|
||||||
for len(id) < 16 {
|
|
||||||
id += " "
|
|
||||||
}
|
|
||||||
return id
|
|
||||||
}
|
|
||||||
|
|
||||||
// pickRandomPorts selects random ports from fixed size random blocks of
|
// pickRandomPorts selects random ports from fixed size random blocks of
|
||||||
// ports. This does not eliminate the chance for port conflict but
|
// ports. This does not eliminate the chance for port conflict but
|
||||||
// reduces it significanltly with little overhead. Furthermore, asking
|
// reduces it significanltly with little overhead. Furthermore, asking
|
||||||
|
|
|
@ -92,15 +92,6 @@ GROUP:
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// ExecSubprocess returns a command to execute a subprocess directly.
|
|
||||||
func ExecSubprocess(args []string) (*exec.Cmd, error) {
|
|
||||||
if len(args) == 0 {
|
|
||||||
return nil, fmt.Errorf("need an executable to run")
|
|
||||||
}
|
|
||||||
|
|
||||||
return exec.Command(args[0], args[1:]...), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// ForwardSignals will fire up a goroutine to forward signals to the given
|
// ForwardSignals will fire up a goroutine to forward signals to the given
|
||||||
// subprocess until the shutdown channel is closed.
|
// subprocess until the shutdown channel is closed.
|
||||||
func ForwardSignals(cmd *exec.Cmd, logFn func(error), shutdownCh <-chan struct{}) {
|
func ForwardSignals(cmd *exec.Cmd, logFn func(error), shutdownCh <-chan struct{}) {
|
||||||
|
|
|
@ -9,10 +9,11 @@ import (
|
||||||
"log"
|
"log"
|
||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
"os/exec"
|
osexec "os/exec"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
|
||||||
"github.com/armon/circbuf"
|
"github.com/armon/circbuf"
|
||||||
|
"github.com/hashicorp/consul/agent/exec"
|
||||||
"github.com/hashicorp/consul/watch"
|
"github.com/hashicorp/consul/watch"
|
||||||
"github.com/hashicorp/go-cleanhttp"
|
"github.com/hashicorp/go-cleanhttp"
|
||||||
"golang.org/x/net/context"
|
"golang.org/x/net/context"
|
||||||
|
@ -43,13 +44,13 @@ func makeWatchHandler(logOutput io.Writer, handler interface{}) watch.HandlerFun
|
||||||
logger := log.New(logOutput, "", log.LstdFlags)
|
logger := log.New(logOutput, "", log.LstdFlags)
|
||||||
fn := func(idx uint64, data interface{}) {
|
fn := func(idx uint64, data interface{}) {
|
||||||
// Create the command
|
// Create the command
|
||||||
var cmd *exec.Cmd
|
var cmd *osexec.Cmd
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
if len(args) > 0 {
|
if len(args) > 0 {
|
||||||
cmd, err = ExecSubprocess(args)
|
cmd, err = exec.Subprocess(args)
|
||||||
} else {
|
} else {
|
||||||
cmd, err = ExecScript(script)
|
cmd, err = exec.Script(script)
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Printf("[ERR] agent: Failed to setup watch: %v", err)
|
logger.Printf("[ERR] agent: Failed to setup watch: %v", err)
|
||||||
|
|
|
@ -4,7 +4,7 @@ import (
|
||||||
"flag"
|
"flag"
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
"os/exec"
|
osexec "os/exec"
|
||||||
"path"
|
"path"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
@ -12,6 +12,7 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/agent"
|
"github.com/hashicorp/consul/agent"
|
||||||
|
"github.com/hashicorp/consul/agent/exec"
|
||||||
"github.com/hashicorp/consul/api"
|
"github.com/hashicorp/consul/api"
|
||||||
"github.com/hashicorp/consul/command/flags"
|
"github.com/hashicorp/consul/command/flags"
|
||||||
"github.com/mitchellh/cli"
|
"github.com/mitchellh/cli"
|
||||||
|
@ -341,12 +342,12 @@ func (c *cmd) startChild(args []string, passStdin, shell bool) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create the command
|
// Create the command
|
||||||
var cmd *exec.Cmd
|
var cmd *osexec.Cmd
|
||||||
var err error
|
var err error
|
||||||
if !shell {
|
if !shell {
|
||||||
cmd, err = agent.ExecSubprocess(args)
|
cmd, err = exec.Subprocess(args)
|
||||||
} else {
|
} else {
|
||||||
cmd, err = agent.ExecScript(strings.Join(args, " "))
|
cmd, err = exec.Script(strings.Join(args, " "))
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.UI.Error(fmt.Sprintf("Error executing handler: %s", err))
|
c.UI.Error(fmt.Sprintf("Error executing handler: %s", err))
|
||||||
|
|
|
@ -6,11 +6,12 @@ import (
|
||||||
"flag"
|
"flag"
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
"os/exec"
|
osexec "os/exec"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/agent"
|
"github.com/hashicorp/consul/agent"
|
||||||
|
"github.com/hashicorp/consul/agent/exec"
|
||||||
"github.com/hashicorp/consul/command/flags"
|
"github.com/hashicorp/consul/command/flags"
|
||||||
consulwatch "github.com/hashicorp/consul/watch"
|
consulwatch "github.com/hashicorp/consul/watch"
|
||||||
"github.com/mitchellh/cli"
|
"github.com/mitchellh/cli"
|
||||||
|
@ -173,11 +174,11 @@ func (c *cmd) Run(args []string) int {
|
||||||
// Create the command
|
// Create the command
|
||||||
var buf bytes.Buffer
|
var buf bytes.Buffer
|
||||||
var err error
|
var err error
|
||||||
var cmd *exec.Cmd
|
var cmd *osexec.Cmd
|
||||||
if !c.shell {
|
if !c.shell {
|
||||||
cmd, err = agent.ExecSubprocess(c.flags.Args())
|
cmd, err = exec.Subprocess(c.flags.Args())
|
||||||
} else {
|
} else {
|
||||||
cmd, err = agent.ExecScript(strings.Join(c.flags.Args(), " "))
|
cmd, err = exec.Script(strings.Join(c.flags.Args(), " "))
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.UI.Error(fmt.Sprintf("Error executing handler: %s", err))
|
c.UI.Error(fmt.Sprintf("Error executing handler: %s", err))
|
||||||
|
|
Loading…
Reference in New Issue