mirror of https://github.com/status-im/consul.git
Implemented Docker health checks
This commit is contained in:
parent
13ef4836f2
commit
71ede8addb
|
@ -83,6 +83,9 @@ type Agent struct {
|
|||
// checkTTLs maps the check ID to an associated check TTL
|
||||
checkTTLs map[string]*CheckTTL
|
||||
|
||||
// checkDockers maps the check ID to an associated Docker Exec based check
|
||||
checkDockers map[string]*CheckDocker
|
||||
|
||||
// checkLock protects updates to the check* maps
|
||||
checkLock sync.Mutex
|
||||
|
||||
|
@ -151,6 +154,7 @@ func Create(config *Config, logOutput io.Writer) (*Agent, error) {
|
|||
checkTTLs: make(map[string]*CheckTTL),
|
||||
checkHTTPs: make(map[string]*CheckHTTP),
|
||||
checkTCPs: make(map[string]*CheckTCP),
|
||||
checkDockers: make(map[string]*CheckDocker),
|
||||
eventCh: make(chan serf.UserEvent, 1024),
|
||||
eventBuf: make([]*UserEvent, 256),
|
||||
shutdownCh: make(chan struct{}),
|
||||
|
@ -905,7 +909,7 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *CheckType, persist
|
|||
tcp.Start()
|
||||
a.checkTCPs[check.CheckID] = tcp
|
||||
|
||||
} else {
|
||||
} else if chkType.IsMonitor() {
|
||||
if existing, ok := a.checkMonitors[check.CheckID]; ok {
|
||||
existing.Stop()
|
||||
}
|
||||
|
@ -924,6 +928,27 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *CheckType, persist
|
|||
}
|
||||
monitor.Start()
|
||||
a.checkMonitors[check.CheckID] = monitor
|
||||
} else if chkType.IsDocker() {
|
||||
if existing, ok := a.checkDockers[check.CheckID]; ok {
|
||||
existing.Stop()
|
||||
}
|
||||
if chkType.Interval < MinInterval {
|
||||
a.logger.Println(fmt.Sprintf("[WARN] agent: check '%s' has interval below minimum of %v",
|
||||
check.CheckID, MinInterval))
|
||||
chkType.Interval = MinInterval
|
||||
}
|
||||
|
||||
dockerCheck := &CheckDocker{
|
||||
Notify: &a.state,
|
||||
CheckID: check.CheckID,
|
||||
DockerContainerId: chkType.DockerContainerId,
|
||||
Shell: chkType.Shell,
|
||||
Script: chkType.Script,
|
||||
Interval: chkType.Interval,
|
||||
Logger: a.logger,
|
||||
}
|
||||
dockerCheck.Start()
|
||||
a.checkDockers[check.CheckID] = dockerCheck
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -6,12 +6,14 @@ import (
|
|||
"log"
|
||||
"net"
|
||||
"net/http"
|
||||
"os"
|
||||
"os/exec"
|
||||
"sync"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/armon/circbuf"
|
||||
docker "github.com/fsouza/go-dockerclient"
|
||||
"github.com/hashicorp/consul/consul/structs"
|
||||
"github.com/hashicorp/go-cleanhttp"
|
||||
)
|
||||
|
@ -38,10 +40,12 @@ const (
|
|||
// Only one of the types needs to be provided
|
||||
// TTL or Script/Interval or HTTP/Interval or TCP/Interval
|
||||
type CheckType struct {
|
||||
Script string
|
||||
HTTP string
|
||||
TCP string
|
||||
Interval time.Duration
|
||||
Script string
|
||||
HTTP string
|
||||
TCP string
|
||||
Interval time.Duration
|
||||
DockerContainerId string
|
||||
Shell string
|
||||
|
||||
Timeout time.Duration
|
||||
TTL time.Duration
|
||||
|
@ -54,7 +58,7 @@ type CheckTypes []*CheckType
|
|||
|
||||
// Valid checks if the CheckType is valid
|
||||
func (c *CheckType) Valid() bool {
|
||||
return c.IsTTL() || c.IsMonitor() || c.IsHTTP() || c.IsTCP()
|
||||
return c.IsTTL() || c.IsMonitor() || c.IsHTTP() || c.IsTCP() || c.IsDocker()
|
||||
}
|
||||
|
||||
// IsTTL checks if this is a TTL type
|
||||
|
@ -64,7 +68,7 @@ func (c *CheckType) IsTTL() bool {
|
|||
|
||||
// IsMonitor checks if this is a Monitor type
|
||||
func (c *CheckType) IsMonitor() bool {
|
||||
return c.Script != "" && c.Interval != 0
|
||||
return c.Script != "" && c.DockerContainerId == "" && c.Interval != 0
|
||||
}
|
||||
|
||||
// IsHTTP checks if this is a HTTP type
|
||||
|
@ -77,6 +81,10 @@ func (c *CheckType) IsTCP() bool {
|
|||
return c.TCP != "" && c.Interval != 0
|
||||
}
|
||||
|
||||
func (c *CheckType) IsDocker() bool {
|
||||
return c.DockerContainerId != "" && c.Shell != "" && c.Interval != 0
|
||||
}
|
||||
|
||||
// CheckNotifier interface is used by the CheckMonitor
|
||||
// to notify when a check has a status update. The update
|
||||
// should take care to be idempotent.
|
||||
|
@ -493,3 +501,103 @@ func (c *CheckTCP) check() {
|
|||
c.Logger.Printf("[DEBUG] agent: check '%v' is passing", c.CheckID)
|
||||
c.Notify.UpdateCheck(c.CheckID, structs.HealthPassing, fmt.Sprintf("TCP connect %s: Success", c.TCP))
|
||||
}
|
||||
|
||||
// CheckDocker is used to periodically invoke a script to
|
||||
// determine the health of an application running inside a
|
||||
// Docker Container. We assume that the script is compatible
|
||||
// with nagios plugins and expects the output in the same format.
|
||||
type CheckDocker struct {
|
||||
Notify CheckNotifier
|
||||
CheckID string
|
||||
Script string
|
||||
DockerContainerId string
|
||||
Shell string
|
||||
Interval time.Duration
|
||||
Logger *log.Logger
|
||||
|
||||
dockerClient *docker.Client
|
||||
exec *docker.Exec
|
||||
startExecOpts docker.StartExecOptions
|
||||
stop bool
|
||||
stopCh chan struct{}
|
||||
stopLock sync.Mutex
|
||||
}
|
||||
|
||||
// Start is used to start checks.
|
||||
// Docker Checks runs until stop is called
|
||||
func (c *CheckDocker) Start() {
|
||||
c.stopLock.Lock()
|
||||
defer c.stopLock.Unlock()
|
||||
|
||||
//figure out the shell
|
||||
if c.Shell == "" {
|
||||
if otherShell := os.Getenv("SHELL"); otherShell != "" {
|
||||
c.Shell = otherShell
|
||||
} else {
|
||||
c.Shell = "/bin/sh"
|
||||
}
|
||||
}
|
||||
|
||||
cmd := []string{c.Shell, "-c", c.Script}
|
||||
|
||||
//Set up the Exec since
|
||||
execOpts := docker.CreateExecOptions{
|
||||
AttachStdin: false,
|
||||
AttachStdout: true,
|
||||
AttachStderr: true,
|
||||
Tty: false,
|
||||
Cmd: cmd,
|
||||
Container: c.DockerContainerId,
|
||||
}
|
||||
if exec, err := c.dockerClient.CreateExec(execOpts); err != nil {
|
||||
c.exec = exec
|
||||
} else {
|
||||
c.Logger.Printf("[DEBUG] agent: Error while creating Exec: %s", err.Error())
|
||||
}
|
||||
|
||||
c.startExecOpts = docker.StartExecOptions{
|
||||
Detach: false,
|
||||
Tty: false,
|
||||
}
|
||||
|
||||
c.stop = false
|
||||
c.stopCh = make(chan struct{})
|
||||
go c.run()
|
||||
}
|
||||
|
||||
// Stop is used to stop a docker check.
|
||||
func (c *CheckDocker) Stop() {
|
||||
c.stopLock.Lock()
|
||||
defer c.stopLock.Unlock()
|
||||
if !c.stop {
|
||||
c.stop = true
|
||||
close(c.stopCh)
|
||||
}
|
||||
}
|
||||
|
||||
// run is invoked by a goroutine to run until Stop() is called
|
||||
func (c *CheckDocker) run() {
|
||||
// Get the randomized initial pause time
|
||||
initialPauseTime := randomStagger(c.Interval)
|
||||
c.Logger.Printf("[DEBUG] agent: pausing %v before first invocation of %s -c %s in container %s", initialPauseTime, c.Shell, c.Script, c.DockerContainerId)
|
||||
next := time.After(initialPauseTime)
|
||||
for {
|
||||
select {
|
||||
case <-next:
|
||||
c.check()
|
||||
next = time.After(c.Interval)
|
||||
case <-c.stopCh:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *CheckDocker) check() {
|
||||
err := c.dockerClient.StartExec(c.exec.ID, c.startExecOpts)
|
||||
if err != nil {
|
||||
c.Logger.Printf("[DEBUG] Error in executing health checks: %s", err.Error())
|
||||
c.Notify.UpdateCheck(c.CheckID, structs.HealthCritical, err.Error())
|
||||
return
|
||||
}
|
||||
c.Notify.UpdateCheck(c.CheckID, structs.HealthPassing, fmt.Sprintf("Script execution %s: Success", c.Script))
|
||||
}
|
||||
|
|
|
@ -773,6 +773,9 @@ func FixupCheckType(raw interface{}) error {
|
|||
case "service_id":
|
||||
rawMap["serviceid"] = v
|
||||
delete(rawMap, "service_id")
|
||||
case "docker_container_id":
|
||||
rawMap["DockerContainerId"] = v
|
||||
delete(rawMap, "docker_container_id")
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1072,7 +1072,7 @@ func TestDecodeConfig_Service(t *testing.T) {
|
|||
|
||||
func TestDecodeConfig_Check(t *testing.T) {
|
||||
// Basics
|
||||
input := `{"check": {"id": "chk1", "name": "mem", "notes": "foobar", "script": "/bin/check_redis", "interval": "10s", "ttl": "15s" }}`
|
||||
input := `{"check": {"id": "chk1", "name": "mem", "notes": "foobar", "script": "/bin/check_redis", "interval": "10s", "ttl": "15s", "shell": "/bin/bash", "docker_container_id": "redis" }}`
|
||||
config, err := DecodeConfig(bytes.NewReader([]byte(input)))
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
|
@ -1106,6 +1106,14 @@ func TestDecodeConfig_Check(t *testing.T) {
|
|||
if chk.TTL != 15*time.Second {
|
||||
t.Fatalf("bad: %v", chk)
|
||||
}
|
||||
|
||||
if chk.Shell != "/bin/bash" {
|
||||
t.Fatalf("bad: %v", chk)
|
||||
}
|
||||
|
||||
if chk.DockerContainerId != "redis" {
|
||||
t.Fatalf("bad: %v", chk)
|
||||
}
|
||||
}
|
||||
|
||||
func TestMergeConfig(t *testing.T) {
|
||||
|
|
Loading…
Reference in New Issue