Implemented Docker health checks

This commit is contained in:
Diptanu Choudhury 2015-10-22 15:29:13 -07:00
parent 6a350d5d19
commit 80ad971e8a
4 changed files with 153 additions and 9 deletions

View File

@ -82,6 +82,9 @@ type Agent struct {
// checkTTLs maps the check ID to an associated check TTL // checkTTLs maps the check ID to an associated check TTL
checkTTLs map[string]*CheckTTL checkTTLs map[string]*CheckTTL
// checkDockers maps the check ID to an associated Docker Exec based check
checkDockers map[string]*CheckDocker
// checkLock protects updates to the check* maps // checkLock protects updates to the check* maps
checkLock sync.Mutex checkLock sync.Mutex
@ -150,6 +153,7 @@ func Create(config *Config, logOutput io.Writer) (*Agent, error) {
checkTTLs: make(map[string]*CheckTTL), checkTTLs: make(map[string]*CheckTTL),
checkHTTPs: make(map[string]*CheckHTTP), checkHTTPs: make(map[string]*CheckHTTP),
checkTCPs: make(map[string]*CheckTCP), checkTCPs: make(map[string]*CheckTCP),
checkDockers: make(map[string]*CheckDocker),
eventCh: make(chan serf.UserEvent, 1024), eventCh: make(chan serf.UserEvent, 1024),
eventBuf: make([]*UserEvent, 256), eventBuf: make([]*UserEvent, 256),
shutdownCh: make(chan struct{}), shutdownCh: make(chan struct{}),
@ -831,7 +835,7 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *CheckType, persist
tcp.Start() tcp.Start()
a.checkTCPs[check.CheckID] = tcp a.checkTCPs[check.CheckID] = tcp
} else { } else if chkType.IsMonitor() {
if existing, ok := a.checkMonitors[check.CheckID]; ok { if existing, ok := a.checkMonitors[check.CheckID]; ok {
existing.Stop() existing.Stop()
} }
@ -850,6 +854,27 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *CheckType, persist
} }
monitor.Start() monitor.Start()
a.checkMonitors[check.CheckID] = monitor a.checkMonitors[check.CheckID] = monitor
} else if chkType.IsDocker() {
if existing, ok := a.checkDockers[check.CheckID]; ok {
existing.Stop()
}
if chkType.Interval < MinInterval {
a.logger.Println(fmt.Sprintf("[WARN] agent: check '%s' has interval below minimum of %v",
check.CheckID, MinInterval))
chkType.Interval = MinInterval
}
dockerCheck := &CheckDocker{
Notify: &a.state,
CheckID: check.CheckID,
DockerContainerId: chkType.DockerContainerId,
Shell: chkType.Shell,
Script: chkType.Script,
Interval: chkType.Interval,
Logger: a.logger,
}
dockerCheck.Start()
a.checkDockers[check.CheckID] = dockerCheck
} }
} }

View File

@ -6,14 +6,16 @@ import (
"log" "log"
"net" "net"
"net/http" "net/http"
"os"
"os/exec" "os/exec"
"sync" "sync"
"syscall" "syscall"
"time" "time"
"github.com/armon/circbuf" "github.com/armon/circbuf"
"github.com/hashicorp/go-cleanhttp" docker "github.com/fsouza/go-dockerclient"
"github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/go-cleanhttp"
) )
const ( const (
@ -42,6 +44,8 @@ type CheckType struct {
HTTP string HTTP string
TCP string TCP string
Interval time.Duration Interval time.Duration
DockerContainerId string
Shell string
Timeout time.Duration Timeout time.Duration
TTL time.Duration TTL time.Duration
@ -54,7 +58,7 @@ type CheckTypes []*CheckType
// Valid checks if the CheckType is valid // Valid checks if the CheckType is valid
func (c *CheckType) Valid() bool { func (c *CheckType) Valid() bool {
return c.IsTTL() || c.IsMonitor() || c.IsHTTP() || c.IsTCP() return c.IsTTL() || c.IsMonitor() || c.IsHTTP() || c.IsTCP() || c.IsDocker()
} }
// IsTTL checks if this is a TTL type // IsTTL checks if this is a TTL type
@ -64,7 +68,7 @@ func (c *CheckType) IsTTL() bool {
// IsMonitor checks if this is a Monitor type // IsMonitor checks if this is a Monitor type
func (c *CheckType) IsMonitor() bool { func (c *CheckType) IsMonitor() bool {
return c.Script != "" && c.Interval != 0 return c.Script != "" && c.DockerContainerId == "" && c.Interval != 0
} }
// IsHTTP checks if this is a HTTP type // IsHTTP checks if this is a HTTP type
@ -77,6 +81,10 @@ func (c *CheckType) IsTCP() bool {
return c.TCP != "" && c.Interval != 0 return c.TCP != "" && c.Interval != 0
} }
func (c *CheckType) IsDocker() bool {
return c.DockerContainerId != "" && c.Shell != "" && c.Interval != 0
}
// CheckNotifier interface is used by the CheckMonitor // CheckNotifier interface is used by the CheckMonitor
// to notify when a check has a status update. The update // to notify when a check has a status update. The update
// should take care to be idempotent. // should take care to be idempotent.
@ -493,3 +501,103 @@ func (c *CheckTCP) check() {
c.Logger.Printf("[DEBUG] agent: check '%v' is passing", c.CheckID) c.Logger.Printf("[DEBUG] agent: check '%v' is passing", c.CheckID)
c.Notify.UpdateCheck(c.CheckID, structs.HealthPassing, fmt.Sprintf("TCP connect %s: Success", c.TCP)) c.Notify.UpdateCheck(c.CheckID, structs.HealthPassing, fmt.Sprintf("TCP connect %s: Success", c.TCP))
} }
// CheckDocker is used to periodically invoke a script to
// determine the health of an application running inside a
// Docker Container. We assume that the script is compatible
// with nagios plugins and expects the output in the same format.
type CheckDocker struct {
Notify CheckNotifier
CheckID string
Script string
DockerContainerId string
Shell string
Interval time.Duration
Logger *log.Logger
dockerClient *docker.Client
exec *docker.Exec
startExecOpts docker.StartExecOptions
stop bool
stopCh chan struct{}
stopLock sync.Mutex
}
// Start is used to start checks.
// Docker Checks runs until stop is called
func (c *CheckDocker) Start() {
c.stopLock.Lock()
defer c.stopLock.Unlock()
//figure out the shell
if c.Shell == "" {
if otherShell := os.Getenv("SHELL"); otherShell != "" {
c.Shell = otherShell
} else {
c.Shell = "/bin/sh"
}
}
cmd := []string{c.Shell, "-c", c.Script}
//Set up the Exec since
execOpts := docker.CreateExecOptions{
AttachStdin: false,
AttachStdout: true,
AttachStderr: true,
Tty: false,
Cmd: cmd,
Container: c.DockerContainerId,
}
if exec, err := c.dockerClient.CreateExec(execOpts); err != nil {
c.exec = exec
} else {
c.Logger.Printf("[DEBUG] agent: Error while creating Exec: %s", err.Error())
}
c.startExecOpts = docker.StartExecOptions{
Detach: false,
Tty: false,
}
c.stop = false
c.stopCh = make(chan struct{})
go c.run()
}
// Stop is used to stop a docker check.
func (c *CheckDocker) Stop() {
c.stopLock.Lock()
defer c.stopLock.Unlock()
if !c.stop {
c.stop = true
close(c.stopCh)
}
}
// run is invoked by a goroutine to run until Stop() is called
func (c *CheckDocker) run() {
// Get the randomized initial pause time
initialPauseTime := randomStagger(c.Interval)
c.Logger.Printf("[DEBUG] agent: pausing %v before first invocation of %s -c %s in container %s", initialPauseTime, c.Shell, c.Script, c.DockerContainerId)
next := time.After(initialPauseTime)
for {
select {
case <-next:
c.check()
next = time.After(c.Interval)
case <-c.stopCh:
return
}
}
}
func (c *CheckDocker) check() {
err := c.dockerClient.StartExec(c.exec.ID, c.startExecOpts)
if err != nil {
c.Logger.Printf("[DEBUG] Error in executing health checks: %s", err.Error())
c.Notify.UpdateCheck(c.CheckID, structs.HealthCritical, err.Error())
return
}
c.Notify.UpdateCheck(c.CheckID, structs.HealthPassing, fmt.Sprintf("Script execution %s: Success", c.Script))
}

View File

@ -749,6 +749,9 @@ func FixupCheckType(raw interface{}) error {
case "service_id": case "service_id":
rawMap["serviceid"] = v rawMap["serviceid"] = v
delete(rawMap, "service_id") delete(rawMap, "service_id")
case "docker_container_id":
rawMap["DockerContainerId"] = v
delete(rawMap, "docker_container_id")
} }
} }

View File

@ -1061,7 +1061,7 @@ func TestDecodeConfig_Service(t *testing.T) {
func TestDecodeConfig_Check(t *testing.T) { func TestDecodeConfig_Check(t *testing.T) {
// Basics // Basics
input := `{"check": {"id": "chk1", "name": "mem", "notes": "foobar", "script": "/bin/check_redis", "interval": "10s", "ttl": "15s" }}` input := `{"check": {"id": "chk1", "name": "mem", "notes": "foobar", "script": "/bin/check_redis", "interval": "10s", "ttl": "15s", "shell": "/bin/bash", "docker_container_id": "redis" }}`
config, err := DecodeConfig(bytes.NewReader([]byte(input))) config, err := DecodeConfig(bytes.NewReader([]byte(input)))
if err != nil { if err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
@ -1095,6 +1095,14 @@ func TestDecodeConfig_Check(t *testing.T) {
if chk.TTL != 15*time.Second { if chk.TTL != 15*time.Second {
t.Fatalf("bad: %v", chk) t.Fatalf("bad: %v", chk)
} }
if chk.Shell != "/bin/bash" {
t.Fatalf("bad: %v", chk)
}
if chk.DockerContainerId != "redis" {
t.Fatalf("bad: %v", chk)
}
} }
func TestMergeConfig(t *testing.T) { func TestMergeConfig(t *testing.T) {