mirror of https://github.com/status-im/consul.git
agent: replace docker check
This patch replaces the Docker client which is used for health checks with a simplified version tailored for that purpose. See #3254 See #3257 Fixes #3270
This commit is contained in:
parent
34df7f59ba
commit
2123700056
|
@ -128,6 +128,9 @@ type Agent struct {
|
|||
// checkLock protects updates to the check* maps
|
||||
checkLock sync.Mutex
|
||||
|
||||
// dockerClient is the client for performing docker health checks.
|
||||
dockerClient *DockerClient
|
||||
|
||||
// eventCh is used to receive user events
|
||||
eventCh chan serf.UserEvent
|
||||
|
||||
|
@ -1637,9 +1640,12 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType,
|
|||
|
||||
// Check if already registered
|
||||
if chkType != nil {
|
||||
if chkType.IsTTL() {
|
||||
switch {
|
||||
|
||||
case chkType.IsTTL():
|
||||
if existing, ok := a.checkTTLs[check.CheckID]; ok {
|
||||
existing.Stop()
|
||||
delete(a.checkTTLs, check.CheckID)
|
||||
}
|
||||
|
||||
ttl := &CheckTTL{
|
||||
|
@ -1658,9 +1664,10 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType,
|
|||
ttl.Start()
|
||||
a.checkTTLs[check.CheckID] = ttl
|
||||
|
||||
} else if chkType.IsHTTP() {
|
||||
case chkType.IsHTTP():
|
||||
if existing, ok := a.checkHTTPs[check.CheckID]; ok {
|
||||
existing.Stop()
|
||||
delete(a.checkHTTPs, check.CheckID)
|
||||
}
|
||||
if chkType.Interval < MinInterval {
|
||||
a.logger.Println(fmt.Sprintf("[WARN] agent: check '%s' has interval below minimum of %v",
|
||||
|
@ -1682,9 +1689,10 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType,
|
|||
http.Start()
|
||||
a.checkHTTPs[check.CheckID] = http
|
||||
|
||||
} else if chkType.IsTCP() {
|
||||
case chkType.IsTCP():
|
||||
if existing, ok := a.checkTCPs[check.CheckID]; ok {
|
||||
existing.Stop()
|
||||
delete(a.checkTCPs, check.CheckID)
|
||||
}
|
||||
if chkType.Interval < MinInterval {
|
||||
a.logger.Println(fmt.Sprintf("[WARN] agent: check '%s' has interval below minimum of %v",
|
||||
|
@ -1703,9 +1711,10 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType,
|
|||
tcp.Start()
|
||||
a.checkTCPs[check.CheckID] = tcp
|
||||
|
||||
} else if chkType.IsDocker() {
|
||||
case chkType.IsDocker():
|
||||
if existing, ok := a.checkDockers[check.CheckID]; ok {
|
||||
existing.Stop()
|
||||
delete(a.checkDockers, check.CheckID)
|
||||
}
|
||||
if chkType.Interval < MinInterval {
|
||||
a.logger.Println(fmt.Sprintf("[WARN] agent: check '%s' has interval below minimum of %v",
|
||||
|
@ -1713,6 +1722,15 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType,
|
|||
chkType.Interval = MinInterval
|
||||
}
|
||||
|
||||
if a.dockerClient == nil {
|
||||
dc, err := NewDockerClient(os.Getenv("DOCKER_HOST"), CheckBufSize)
|
||||
if err != nil {
|
||||
a.logger.Printf("[ERR] agent: error creating docker client: %s", err)
|
||||
return err
|
||||
}
|
||||
a.dockerClient = dc
|
||||
}
|
||||
|
||||
dockerCheck := &CheckDocker{
|
||||
Notify: a.state,
|
||||
CheckID: check.CheckID,
|
||||
|
@ -1721,15 +1739,15 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType,
|
|||
Script: chkType.Script,
|
||||
Interval: chkType.Interval,
|
||||
Logger: a.logger,
|
||||
}
|
||||
if err := dockerCheck.Init(); err != nil {
|
||||
return err
|
||||
client: a.dockerClient,
|
||||
}
|
||||
dockerCheck.Start()
|
||||
a.checkDockers[check.CheckID] = dockerCheck
|
||||
} else if chkType.IsMonitor() {
|
||||
|
||||
case chkType.IsMonitor():
|
||||
if existing, ok := a.checkMonitors[check.CheckID]; ok {
|
||||
existing.Stop()
|
||||
delete(a.checkMonitors, check.CheckID)
|
||||
}
|
||||
if chkType.Interval < MinInterval {
|
||||
a.logger.Println(fmt.Sprintf("[WARN] agent: check '%s' has interval below minimum of %v",
|
||||
|
@ -1747,7 +1765,8 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType,
|
|||
}
|
||||
monitor.Start()
|
||||
a.checkMonitors[check.CheckID] = monitor
|
||||
} else {
|
||||
|
||||
default:
|
||||
return fmt.Errorf("Check type is not valid")
|
||||
}
|
||||
|
||||
|
|
173
agent/check.go
173
agent/check.go
|
@ -4,6 +4,7 @@ import (
|
|||
"crypto/tls"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"net"
|
||||
"net/http"
|
||||
|
@ -14,7 +15,6 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/armon/circbuf"
|
||||
docker "github.com/fsouza/go-dockerclient"
|
||||
"github.com/hashicorp/consul/agent/consul/structs"
|
||||
"github.com/hashicorp/consul/api"
|
||||
"github.com/hashicorp/consul/lib"
|
||||
|
@ -516,14 +516,6 @@ func (c *CheckTCP) check() {
|
|||
c.Notify.UpdateCheck(c.CheckID, api.HealthPassing, fmt.Sprintf("TCP connect %s: Success", c.TCP))
|
||||
}
|
||||
|
||||
// DockerClient defines an interface for a docker client
|
||||
// which is used for injecting a fake client during tests.
|
||||
type DockerClient interface {
|
||||
CreateExec(docker.CreateExecOptions) (*docker.Exec, error)
|
||||
StartExec(string, docker.StartExecOptions) error
|
||||
InspectExec(string) (*docker.ExecInspect, error)
|
||||
}
|
||||
|
||||
// CheckDocker is used to periodically invoke a script to
|
||||
// determine the health of an application running inside a
|
||||
// Docker Container. We assume that the script is compatible
|
||||
|
@ -537,137 +529,100 @@ type CheckDocker struct {
|
|||
Interval time.Duration
|
||||
Logger *log.Logger
|
||||
|
||||
dockerClient DockerClient
|
||||
cmd []string
|
||||
stop bool
|
||||
stopCh chan struct{}
|
||||
stopLock sync.Mutex
|
||||
client *DockerClient
|
||||
stop chan struct{}
|
||||
}
|
||||
|
||||
// Init initializes the Docker Client
|
||||
func (c *CheckDocker) Init() error {
|
||||
var err error
|
||||
c.dockerClient, err = docker.NewClientFromEnv()
|
||||
if err != nil {
|
||||
c.Logger.Printf("[DEBUG] Error creating the Docker client: %s", err.Error())
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Start is used to start checks.
|
||||
// Docker Checks runs until stop is called
|
||||
func (c *CheckDocker) Start() {
|
||||
c.stopLock.Lock()
|
||||
defer c.stopLock.Unlock()
|
||||
|
||||
//figure out the shell
|
||||
if c.Shell == "" {
|
||||
c.Shell = shell()
|
||||
if c.stop != nil {
|
||||
panic("Docker check already started")
|
||||
}
|
||||
|
||||
c.cmd = []string{c.Shell, "-c", c.Script}
|
||||
if c.Logger == nil {
|
||||
c.Logger = log.New(ioutil.Discard, "", 0)
|
||||
}
|
||||
|
||||
c.stop = false
|
||||
c.stopCh = make(chan struct{})
|
||||
if c.Shell == "" {
|
||||
c.Shell = os.Getenv("SHELL")
|
||||
if c.Shell == "" {
|
||||
c.Shell = "/bin/sh"
|
||||
}
|
||||
}
|
||||
c.stop = make(chan struct{})
|
||||
go c.run()
|
||||
}
|
||||
|
||||
// Stop is used to stop a docker check.
|
||||
func (c *CheckDocker) Stop() {
|
||||
c.stopLock.Lock()
|
||||
defer c.stopLock.Unlock()
|
||||
if !c.stop {
|
||||
c.stop = true
|
||||
close(c.stopCh)
|
||||
if c.stop == nil {
|
||||
panic("Stop called before start")
|
||||
}
|
||||
close(c.stop)
|
||||
}
|
||||
|
||||
// run is invoked by a goroutine to run until Stop() is called
|
||||
func (c *CheckDocker) run() {
|
||||
// Get the randomized initial pause time
|
||||
initialPauseTime := lib.RandomStagger(c.Interval)
|
||||
c.Logger.Printf("[DEBUG] agent: pausing %v before first invocation of %s -c %s in container %s", initialPauseTime, c.Shell, c.Script, c.DockerContainerID)
|
||||
next := time.After(initialPauseTime)
|
||||
firstWait := lib.RandomStagger(c.Interval)
|
||||
c.Logger.Printf("[DEBUG] agent: pausing %v before first invocation of %s -c %s in container %s", firstWait, c.Shell, c.Script, c.DockerContainerID)
|
||||
next := time.After(firstWait)
|
||||
for {
|
||||
select {
|
||||
case <-next:
|
||||
c.check()
|
||||
next = time.After(c.Interval)
|
||||
case <-c.stopCh:
|
||||
case <-c.stop:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *CheckDocker) check() {
|
||||
//Set up the Exec since
|
||||
execOpts := docker.CreateExecOptions{
|
||||
AttachStdin: false,
|
||||
AttachStdout: true,
|
||||
AttachStderr: true,
|
||||
Tty: false,
|
||||
Cmd: c.cmd,
|
||||
Container: c.DockerContainerID,
|
||||
}
|
||||
var (
|
||||
exec *docker.Exec
|
||||
err error
|
||||
)
|
||||
if exec, err = c.dockerClient.CreateExec(execOpts); err != nil {
|
||||
c.Logger.Printf("[DEBUG] agent: Error while creating Exec: %s", err.Error())
|
||||
c.Notify.UpdateCheck(c.CheckID, api.HealthCritical, fmt.Sprintf("Unable to create Exec, error: %s", err.Error()))
|
||||
return
|
||||
}
|
||||
|
||||
// Collect the output
|
||||
output, _ := circbuf.NewBuffer(CheckBufSize)
|
||||
|
||||
err = c.dockerClient.StartExec(exec.ID, docker.StartExecOptions{Detach: false, Tty: false, OutputStream: output, ErrorStream: output})
|
||||
var out string
|
||||
status, b, err := c.doCheck()
|
||||
if err != nil {
|
||||
c.Logger.Printf("[DEBUG] Error in executing health checks: %s", err.Error())
|
||||
c.Notify.UpdateCheck(c.CheckID, api.HealthCritical, fmt.Sprintf("Unable to start Exec: %s", err.Error()))
|
||||
return
|
||||
c.Logger.Printf("[DEBUG] agent: Check '%s': %s", c.CheckID, err)
|
||||
out = err.Error()
|
||||
} else {
|
||||
// out is already limited to CheckBufSize since we're getting a
|
||||
// limited buffer. So we don't need to truncate it just report
|
||||
// that it was truncated.
|
||||
out = string(b.Bytes())
|
||||
if int(b.TotalWritten()) > len(out) {
|
||||
out = fmt.Sprintf("Captured %d of %d bytes\n...\n%s", len(out), b.TotalWritten(), out)
|
||||
}
|
||||
c.Logger.Printf("[DEBUG] agent: Check '%s' script '%s' output: %s", c.CheckID, c.Script, out)
|
||||
}
|
||||
|
||||
// Get the output, add a message about truncation
|
||||
outputStr := string(output.Bytes())
|
||||
if output.TotalWritten() > output.Size() {
|
||||
outputStr = fmt.Sprintf("Captured %d of %d bytes\n...\n%s",
|
||||
output.Size(), output.TotalWritten(), outputStr)
|
||||
if status == api.HealthCritical {
|
||||
c.Logger.Printf("[WARN] agent: Check '%v' is now critical", c.CheckID)
|
||||
}
|
||||
|
||||
c.Logger.Printf("[DEBUG] agent: Check '%s' script '%s' output: %s",
|
||||
c.CheckID, c.Script, outputStr)
|
||||
|
||||
execInfo, err := c.dockerClient.InspectExec(exec.ID)
|
||||
if err != nil {
|
||||
c.Logger.Printf("[DEBUG] Error in inspecting check result : %s", err.Error())
|
||||
c.Notify.UpdateCheck(c.CheckID, api.HealthCritical, fmt.Sprintf("Unable to inspect Exec: %s", err.Error()))
|
||||
return
|
||||
}
|
||||
|
||||
// Sets the status of the check to healthy if exit code is 0
|
||||
if execInfo.ExitCode == 0 {
|
||||
c.Notify.UpdateCheck(c.CheckID, api.HealthPassing, outputStr)
|
||||
return
|
||||
}
|
||||
|
||||
// Set the status of the check to Warning if exit code is 1
|
||||
if execInfo.ExitCode == 1 {
|
||||
c.Logger.Printf("[DEBUG] Check failed with exit code: %d", execInfo.ExitCode)
|
||||
c.Notify.UpdateCheck(c.CheckID, api.HealthWarning, outputStr)
|
||||
return
|
||||
}
|
||||
|
||||
// Set the health as critical
|
||||
c.Logger.Printf("[WARN] agent: Check '%v' is now critical", c.CheckID)
|
||||
c.Notify.UpdateCheck(c.CheckID, api.HealthCritical, outputStr)
|
||||
c.Notify.UpdateCheck(c.CheckID, status, out)
|
||||
}
|
||||
|
||||
func shell() string {
|
||||
if sh := os.Getenv("SHELL"); sh != "" {
|
||||
return sh
|
||||
func (c *CheckDocker) doCheck() (string, *circbuf.Buffer, error) {
|
||||
cmd := []string{c.Shell, "-c", c.Script}
|
||||
execID, err := c.client.CreateExec(c.DockerContainerID, cmd)
|
||||
if err != nil {
|
||||
return api.HealthCritical, nil, err
|
||||
}
|
||||
|
||||
buf, err := c.client.StartExec(c.DockerContainerID, execID)
|
||||
if err != nil {
|
||||
return api.HealthCritical, nil, err
|
||||
}
|
||||
|
||||
exitCode, err := c.client.InspectExec(c.DockerContainerID, execID)
|
||||
if err != nil {
|
||||
return api.HealthCritical, nil, err
|
||||
}
|
||||
|
||||
switch exitCode {
|
||||
case 0:
|
||||
return api.HealthPassing, buf, nil
|
||||
case 1:
|
||||
c.Logger.Printf("[DEBUG] Check failed with exit code: %d", exitCode)
|
||||
return api.HealthWarning, buf, nil
|
||||
default:
|
||||
c.Logger.Printf("[DEBUG] Check failed with exit code: %d", exitCode)
|
||||
return api.HealthCritical, buf, nil
|
||||
}
|
||||
return "/bin/sh"
|
||||
}
|
||||
|
|
|
@ -2,21 +2,18 @@ package agent
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"os"
|
||||
"os/exec"
|
||||
"reflect"
|
||||
"regexp"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
docker "github.com/fsouza/go-dockerclient"
|
||||
"github.com/hashicorp/consul/agent/mock"
|
||||
"github.com/hashicorp/consul/api"
|
||||
"github.com/hashicorp/consul/testutil/retry"
|
||||
|
@ -512,260 +509,288 @@ func TestCheckTCPPassing(t *testing.T) {
|
|||
tcpServer.Close()
|
||||
}
|
||||
|
||||
// A fake docker client to test happy path scenario
|
||||
type fakeDockerClientWithNoErrors struct {
|
||||
}
|
||||
|
||||
func (d *fakeDockerClientWithNoErrors) CreateExec(opts docker.CreateExecOptions) (*docker.Exec, error) {
|
||||
return &docker.Exec{ID: "123"}, nil
|
||||
}
|
||||
|
||||
func (d *fakeDockerClientWithNoErrors) StartExec(id string, opts docker.StartExecOptions) error {
|
||||
fmt.Fprint(opts.OutputStream, "output")
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *fakeDockerClientWithNoErrors) InspectExec(id string) (*docker.ExecInspect, error) {
|
||||
return &docker.ExecInspect{
|
||||
ID: "123",
|
||||
ExitCode: 0,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// A fake docker client to test truncation of output
|
||||
type fakeDockerClientWithLongOutput struct {
|
||||
}
|
||||
|
||||
func (d *fakeDockerClientWithLongOutput) CreateExec(opts docker.CreateExecOptions) (*docker.Exec, error) {
|
||||
return &docker.Exec{ID: "123"}, nil
|
||||
}
|
||||
|
||||
func (d *fakeDockerClientWithLongOutput) StartExec(id string, opts docker.StartExecOptions) error {
|
||||
b, _ := exec.Command("od", "-N", "81920", "/dev/urandom").Output()
|
||||
fmt.Fprint(opts.OutputStream, string(b))
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *fakeDockerClientWithLongOutput) InspectExec(id string) (*docker.ExecInspect, error) {
|
||||
return &docker.ExecInspect{
|
||||
ID: "123",
|
||||
ExitCode: 0,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// A fake docker client to test non-zero exit codes from exec invocation
|
||||
type fakeDockerClientWithExecNonZeroExitCode struct {
|
||||
}
|
||||
|
||||
func (d *fakeDockerClientWithExecNonZeroExitCode) CreateExec(opts docker.CreateExecOptions) (*docker.Exec, error) {
|
||||
return &docker.Exec{ID: "123"}, nil
|
||||
}
|
||||
|
||||
func (d *fakeDockerClientWithExecNonZeroExitCode) StartExec(id string, opts docker.StartExecOptions) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *fakeDockerClientWithExecNonZeroExitCode) InspectExec(id string) (*docker.ExecInspect, error) {
|
||||
return &docker.ExecInspect{
|
||||
ID: "123",
|
||||
ExitCode: 127,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// A fake docker client to test exit code which result into Warning
|
||||
type fakeDockerClientWithExecExitCodeOne struct {
|
||||
}
|
||||
|
||||
func (d *fakeDockerClientWithExecExitCodeOne) CreateExec(opts docker.CreateExecOptions) (*docker.Exec, error) {
|
||||
return &docker.Exec{ID: "123"}, nil
|
||||
}
|
||||
|
||||
func (d *fakeDockerClientWithExecExitCodeOne) StartExec(id string, opts docker.StartExecOptions) error {
|
||||
fmt.Fprint(opts.OutputStream, "output")
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *fakeDockerClientWithExecExitCodeOne) InspectExec(id string) (*docker.ExecInspect, error) {
|
||||
return &docker.ExecInspect{
|
||||
ID: "123",
|
||||
ExitCode: 1,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// A fake docker client to simulate create exec failing
|
||||
type fakeDockerClientWithCreateExecFailure struct {
|
||||
}
|
||||
|
||||
func (d *fakeDockerClientWithCreateExecFailure) CreateExec(opts docker.CreateExecOptions) (*docker.Exec, error) {
|
||||
return nil, errors.New("Exec Creation Failed")
|
||||
}
|
||||
|
||||
func (d *fakeDockerClientWithCreateExecFailure) StartExec(id string, opts docker.StartExecOptions) error {
|
||||
return errors.New("Exec doesn't exist")
|
||||
}
|
||||
|
||||
func (d *fakeDockerClientWithCreateExecFailure) InspectExec(id string) (*docker.ExecInspect, error) {
|
||||
return nil, errors.New("Exec doesn't exist")
|
||||
}
|
||||
|
||||
// A fake docker client to simulate start exec failing
|
||||
type fakeDockerClientWithStartExecFailure struct {
|
||||
}
|
||||
|
||||
func (d *fakeDockerClientWithStartExecFailure) CreateExec(opts docker.CreateExecOptions) (*docker.Exec, error) {
|
||||
return &docker.Exec{ID: "123"}, nil
|
||||
}
|
||||
|
||||
func (d *fakeDockerClientWithStartExecFailure) StartExec(id string, opts docker.StartExecOptions) error {
|
||||
return errors.New("Couldn't Start Exec")
|
||||
}
|
||||
|
||||
func (d *fakeDockerClientWithStartExecFailure) InspectExec(id string) (*docker.ExecInspect, error) {
|
||||
return nil, errors.New("Exec doesn't exist")
|
||||
}
|
||||
|
||||
// A fake docker client to test exec info query failures
|
||||
type fakeDockerClientWithExecInfoErrors struct {
|
||||
}
|
||||
|
||||
func (d *fakeDockerClientWithExecInfoErrors) CreateExec(opts docker.CreateExecOptions) (*docker.Exec, error) {
|
||||
return &docker.Exec{ID: "123"}, nil
|
||||
}
|
||||
|
||||
func (d *fakeDockerClientWithExecInfoErrors) StartExec(id string, opts docker.StartExecOptions) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *fakeDockerClientWithExecInfoErrors) InspectExec(id string) (*docker.ExecInspect, error) {
|
||||
return nil, errors.New("Unable to query exec info")
|
||||
}
|
||||
|
||||
func expectDockerCheckStatus(t *testing.T, dockerClient DockerClient, status string, output string) {
|
||||
notif := mock.NewNotify()
|
||||
check := &CheckDocker{
|
||||
Notify: notif,
|
||||
CheckID: types.CheckID("foo"),
|
||||
Script: "/health.sh",
|
||||
DockerContainerID: "54432bad1fc7",
|
||||
Shell: "/bin/sh",
|
||||
Interval: 25 * time.Millisecond,
|
||||
Logger: log.New(ioutil.Discard, UniqueID(), log.LstdFlags),
|
||||
dockerClient: dockerClient,
|
||||
}
|
||||
check.Start()
|
||||
defer check.Stop()
|
||||
|
||||
time.Sleep(250 * time.Millisecond)
|
||||
|
||||
// Should have at least 2 updates
|
||||
if notif.Updates("foo") < 2 {
|
||||
t.Fatalf("should have 2 updates %v", notif.UpdatesMap())
|
||||
func TestCheck_Docker(t *testing.T) {
|
||||
tests := []struct {
|
||||
desc string
|
||||
handlers map[string]http.HandlerFunc
|
||||
out *regexp.Regexp
|
||||
state string
|
||||
}{
|
||||
{
|
||||
desc: "create exec: bad container id",
|
||||
handlers: map[string]http.HandlerFunc{
|
||||
"POST /containers/123/exec": func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(404)
|
||||
},
|
||||
},
|
||||
out: regexp.MustCompile("^create exec failed for unknown container 123$"),
|
||||
state: api.HealthCritical,
|
||||
},
|
||||
{
|
||||
desc: "create exec: paused container",
|
||||
handlers: map[string]http.HandlerFunc{
|
||||
"POST /containers/123/exec": func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(409)
|
||||
},
|
||||
},
|
||||
out: regexp.MustCompile("^create exec failed since container 123 is paused or stopped$"),
|
||||
state: api.HealthCritical,
|
||||
},
|
||||
{
|
||||
desc: "create exec: bad status code",
|
||||
handlers: map[string]http.HandlerFunc{
|
||||
"POST /containers/123/exec": func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(999)
|
||||
fmt.Fprint(w, "some output")
|
||||
},
|
||||
},
|
||||
out: regexp.MustCompile("^create exec failed for container 123 with status 999: some output$"),
|
||||
state: api.HealthCritical,
|
||||
},
|
||||
{
|
||||
desc: "create exec: bad json",
|
||||
handlers: map[string]http.HandlerFunc{
|
||||
"POST /containers/123/exec": func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(201)
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
fmt.Fprint(w, `this is not json`)
|
||||
},
|
||||
},
|
||||
out: regexp.MustCompile("^create exec response for container 123 cannot be parsed: .*$"),
|
||||
state: api.HealthCritical,
|
||||
},
|
||||
{
|
||||
desc: "start exec: bad exec id",
|
||||
handlers: map[string]http.HandlerFunc{
|
||||
"POST /containers/123/exec": func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(201)
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
fmt.Fprint(w, `{"Id":"456"}`)
|
||||
},
|
||||
"POST /exec/456/start": func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(404)
|
||||
},
|
||||
},
|
||||
out: regexp.MustCompile("^start exec failed for container 123: invalid exec id 456$"),
|
||||
state: api.HealthCritical,
|
||||
},
|
||||
{
|
||||
desc: "start exec: paused container",
|
||||
handlers: map[string]http.HandlerFunc{
|
||||
"POST /containers/123/exec": func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(201)
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
fmt.Fprint(w, `{"Id":"456"}`)
|
||||
},
|
||||
"POST /exec/456/start": func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(409)
|
||||
},
|
||||
},
|
||||
out: regexp.MustCompile("^start exec failed since container 123 is paused or stopped$"),
|
||||
state: api.HealthCritical,
|
||||
},
|
||||
{
|
||||
desc: "start exec: bad status code",
|
||||
handlers: map[string]http.HandlerFunc{
|
||||
"POST /containers/123/exec": func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(201)
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
fmt.Fprint(w, `{"Id":"456"}`)
|
||||
},
|
||||
"POST /exec/456/start": func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(999)
|
||||
fmt.Fprint(w, "some output")
|
||||
},
|
||||
},
|
||||
out: regexp.MustCompile("^start exec failed for container 123 with status 999: some output$"),
|
||||
state: api.HealthCritical,
|
||||
},
|
||||
{
|
||||
desc: "inspect exec: bad exec id",
|
||||
handlers: map[string]http.HandlerFunc{
|
||||
"POST /containers/123/exec": func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(201)
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
fmt.Fprint(w, `{"Id":"456"}`)
|
||||
},
|
||||
"POST /exec/456/start": func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(200)
|
||||
fmt.Fprint(w, "OK")
|
||||
},
|
||||
"GET /exec/456/json": func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(404)
|
||||
},
|
||||
},
|
||||
out: regexp.MustCompile("^inspect exec failed for container 123: invalid exec id 456$"),
|
||||
state: api.HealthCritical,
|
||||
},
|
||||
{
|
||||
desc: "inspect exec: bad status code",
|
||||
handlers: map[string]http.HandlerFunc{
|
||||
"POST /containers/123/exec": func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(201)
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
fmt.Fprint(w, `{"Id":"456"}`)
|
||||
},
|
||||
"POST /exec/456/start": func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(200)
|
||||
fmt.Fprint(w, "OK")
|
||||
},
|
||||
"GET /exec/456/json": func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(999)
|
||||
fmt.Fprint(w, "some output")
|
||||
},
|
||||
},
|
||||
out: regexp.MustCompile("^inspect exec failed for container 123 with status 999: some output$"),
|
||||
state: api.HealthCritical,
|
||||
},
|
||||
{
|
||||
desc: "inspect exec: bad json",
|
||||
handlers: map[string]http.HandlerFunc{
|
||||
"POST /containers/123/exec": func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(201)
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
fmt.Fprint(w, `{"Id":"456"}`)
|
||||
},
|
||||
"POST /exec/456/start": func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(200)
|
||||
fmt.Fprint(w, "OK")
|
||||
},
|
||||
"GET /exec/456/json": func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(200)
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
fmt.Fprint(w, `this is not json`)
|
||||
},
|
||||
},
|
||||
out: regexp.MustCompile("^inspect exec response for container 123 cannot be parsed: .*$"),
|
||||
state: api.HealthCritical,
|
||||
},
|
||||
{
|
||||
desc: "inspect exec: exit code 0: passing",
|
||||
handlers: map[string]http.HandlerFunc{
|
||||
"POST /containers/123/exec": func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(201)
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
fmt.Fprint(w, `{"Id":"456"}`)
|
||||
},
|
||||
"POST /exec/456/start": func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(200)
|
||||
fmt.Fprint(w, "OK")
|
||||
},
|
||||
"GET /exec/456/json": func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(200)
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
fmt.Fprint(w, `{"ExitCode":0}`)
|
||||
},
|
||||
},
|
||||
out: regexp.MustCompile("^OK$"),
|
||||
state: api.HealthPassing,
|
||||
},
|
||||
{
|
||||
desc: "inspect exec: exit code 0: passing: truncated",
|
||||
handlers: map[string]http.HandlerFunc{
|
||||
"POST /containers/123/exec": func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(201)
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
fmt.Fprint(w, `{"Id":"456"}`)
|
||||
},
|
||||
"POST /exec/456/start": func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(200)
|
||||
fmt.Fprint(w, "01234567890123456789OK") // more than 20 bytes
|
||||
},
|
||||
"GET /exec/456/json": func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(200)
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
fmt.Fprint(w, `{"ExitCode":0}`)
|
||||
},
|
||||
},
|
||||
out: regexp.MustCompile("^Captured 20 of 22 bytes\n...\n234567890123456789OK$"),
|
||||
state: api.HealthPassing,
|
||||
},
|
||||
{
|
||||
desc: "inspect exec: exit code 1: warning",
|
||||
handlers: map[string]http.HandlerFunc{
|
||||
"POST /containers/123/exec": func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(201)
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
fmt.Fprint(w, `{"Id":"456"}`)
|
||||
},
|
||||
"POST /exec/456/start": func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(200)
|
||||
fmt.Fprint(w, "WARN")
|
||||
},
|
||||
"GET /exec/456/json": func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(200)
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
fmt.Fprint(w, `{"ExitCode":1}`)
|
||||
},
|
||||
},
|
||||
out: regexp.MustCompile("^WARN$"),
|
||||
state: api.HealthWarning,
|
||||
},
|
||||
{
|
||||
desc: "inspect exec: exit code 2: critical",
|
||||
handlers: map[string]http.HandlerFunc{
|
||||
"POST /containers/123/exec": func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(201)
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
fmt.Fprint(w, `{"Id":"456"}`)
|
||||
},
|
||||
"POST /exec/456/start": func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(200)
|
||||
fmt.Fprint(w, "NOK")
|
||||
},
|
||||
"GET /exec/456/json": func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(200)
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
fmt.Fprint(w, `{"ExitCode":2}`)
|
||||
},
|
||||
},
|
||||
out: regexp.MustCompile("^NOK$"),
|
||||
state: api.HealthCritical,
|
||||
},
|
||||
}
|
||||
|
||||
if notif.State("foo") != status {
|
||||
t.Fatalf("should be %v %v", status, notif.StateMap())
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.desc, func(t *testing.T) {
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
x := r.Method + " " + r.RequestURI
|
||||
h := tt.handlers[x]
|
||||
if h == nil {
|
||||
t.Fatalf("bad url %s", x)
|
||||
}
|
||||
h(w, r)
|
||||
}))
|
||||
defer srv.Close()
|
||||
|
||||
if notif.Output("foo") != output {
|
||||
t.Fatalf("should be %v %v", output, notif.OutputMap())
|
||||
}
|
||||
}
|
||||
|
||||
func TestDockerCheckWhenExecReturnsSuccessExitCode(t *testing.T) {
|
||||
t.Parallel()
|
||||
expectDockerCheckStatus(t, &fakeDockerClientWithNoErrors{}, api.HealthPassing, "output")
|
||||
}
|
||||
|
||||
func TestDockerCheckWhenExecCreationFails(t *testing.T) {
|
||||
t.Parallel()
|
||||
expectDockerCheckStatus(t, &fakeDockerClientWithCreateExecFailure{}, api.HealthCritical, "Unable to create Exec, error: Exec Creation Failed")
|
||||
}
|
||||
|
||||
func TestDockerCheckWhenExitCodeIsNonZero(t *testing.T) {
|
||||
t.Parallel()
|
||||
expectDockerCheckStatus(t, &fakeDockerClientWithExecNonZeroExitCode{}, api.HealthCritical, "")
|
||||
}
|
||||
|
||||
func TestDockerCheckWhenExitCodeIsone(t *testing.T) {
|
||||
t.Parallel()
|
||||
expectDockerCheckStatus(t, &fakeDockerClientWithExecExitCodeOne{}, api.HealthWarning, "output")
|
||||
}
|
||||
|
||||
func TestDockerCheckWhenExecStartFails(t *testing.T) {
|
||||
t.Parallel()
|
||||
expectDockerCheckStatus(t, &fakeDockerClientWithStartExecFailure{}, api.HealthCritical, "Unable to start Exec: Couldn't Start Exec")
|
||||
}
|
||||
|
||||
func TestDockerCheckWhenExecInfoFails(t *testing.T) {
|
||||
t.Parallel()
|
||||
expectDockerCheckStatus(t, &fakeDockerClientWithExecInfoErrors{}, api.HealthCritical, "Unable to inspect Exec: Unable to query exec info")
|
||||
}
|
||||
|
||||
func TestDockerCheckDefaultToSh(t *testing.T) {
|
||||
t.Parallel()
|
||||
os.Setenv("SHELL", "")
|
||||
notif := mock.NewNotify()
|
||||
check := &CheckDocker{
|
||||
Notify: notif,
|
||||
CheckID: types.CheckID("foo"),
|
||||
Script: "/health.sh",
|
||||
DockerContainerID: "54432bad1fc7",
|
||||
Interval: 10 * time.Millisecond,
|
||||
Logger: log.New(ioutil.Discard, UniqueID(), log.LstdFlags),
|
||||
dockerClient: &fakeDockerClientWithNoErrors{},
|
||||
}
|
||||
check.Start()
|
||||
defer check.Stop()
|
||||
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
if check.Shell != "/bin/sh" {
|
||||
t.Fatalf("Shell should be: %v , actual: %v", "/bin/sh", check.Shell)
|
||||
}
|
||||
}
|
||||
|
||||
func TestDockerCheckUseShellFromEnv(t *testing.T) {
|
||||
t.Parallel()
|
||||
notif := mock.NewNotify()
|
||||
os.Setenv("SHELL", "/bin/bash")
|
||||
check := &CheckDocker{
|
||||
Notify: notif,
|
||||
CheckID: types.CheckID("foo"),
|
||||
Script: "/health.sh",
|
||||
DockerContainerID: "54432bad1fc7",
|
||||
Interval: 10 * time.Millisecond,
|
||||
Logger: log.New(ioutil.Discard, UniqueID(), log.LstdFlags),
|
||||
dockerClient: &fakeDockerClientWithNoErrors{},
|
||||
}
|
||||
check.Start()
|
||||
defer check.Stop()
|
||||
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
if check.Shell != "/bin/bash" {
|
||||
t.Fatalf("Shell should be: %v , actual: %v", "/bin/bash", check.Shell)
|
||||
}
|
||||
os.Setenv("SHELL", "")
|
||||
}
|
||||
|
||||
func TestDockerCheckTruncateOutput(t *testing.T) {
|
||||
t.Parallel()
|
||||
notif := mock.NewNotify()
|
||||
check := &CheckDocker{
|
||||
Notify: notif,
|
||||
CheckID: types.CheckID("foo"),
|
||||
Script: "/health.sh",
|
||||
DockerContainerID: "54432bad1fc7",
|
||||
Shell: "/bin/sh",
|
||||
Interval: 10 * time.Millisecond,
|
||||
Logger: log.New(ioutil.Discard, UniqueID(), log.LstdFlags),
|
||||
dockerClient: &fakeDockerClientWithLongOutput{},
|
||||
}
|
||||
check.Start()
|
||||
defer check.Stop()
|
||||
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
|
||||
// Allow for extra bytes for the truncation message
|
||||
if len(notif.Output("foo")) > CheckBufSize+100 {
|
||||
t.Fatalf("output size is too long")
|
||||
// create a docker client with a tiny output buffer
|
||||
// to test the truncation
|
||||
c, err := NewDockerClient(srv.URL, 20)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
notif, upd := mock.NewNotifyChan()
|
||||
id := types.CheckID("chk")
|
||||
check := &CheckDocker{
|
||||
Notify: notif,
|
||||
CheckID: id,
|
||||
Script: "/health.sh",
|
||||
DockerContainerID: "123",
|
||||
Interval: 25 * time.Millisecond,
|
||||
client: c,
|
||||
}
|
||||
check.Start()
|
||||
defer check.Stop()
|
||||
|
||||
<-upd // wait for update
|
||||
|
||||
if got, want := notif.Output(id), tt.out; !want.MatchString(got) {
|
||||
t.Fatalf("got %q want %q", got, want)
|
||||
}
|
||||
if got, want := notif.State(id), tt.state; got != want {
|
||||
t.Fatalf("got status %q want %q", got, want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,153 @@
|
|||
package agent
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strings"
|
||||
|
||||
"github.com/armon/circbuf"
|
||||
)
|
||||
|
||||
// DockerClient is a simplified client for the Docker Engine API
|
||||
// to execute the health checks and avoid significant dependencies.
|
||||
// It also consumes all data returned from the Docker API through
|
||||
// a ring buffer with a fixed limit to avoid excessive resource
|
||||
// consumption.
|
||||
type DockerClient struct {
|
||||
network string
|
||||
addr string
|
||||
baseurl string
|
||||
maxbuf int64
|
||||
client *http.Client
|
||||
}
|
||||
|
||||
func NewDockerClient(host string, maxbuf int64) (*DockerClient, error) {
|
||||
if host == "" {
|
||||
host = DefaultDockerHost
|
||||
}
|
||||
p := strings.SplitN(host, "://", 2)
|
||||
if len(p) != 2 {
|
||||
return nil, fmt.Errorf("invalid docker host: %s", host)
|
||||
}
|
||||
network, addr := p[0], p[1]
|
||||
basepath := "http://" + addr
|
||||
if network == "unix" {
|
||||
basepath = "http://unix"
|
||||
}
|
||||
client := &http.Client{}
|
||||
if network == "unix" {
|
||||
client.Transport = &http.Transport{
|
||||
DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) {
|
||||
return net.Dial(network, addr)
|
||||
},
|
||||
}
|
||||
}
|
||||
return &DockerClient{network, addr, basepath, maxbuf, client}, nil
|
||||
}
|
||||
|
||||
func (c *DockerClient) call(method, uri string, v interface{}) (*circbuf.Buffer, int, error) {
|
||||
urlstr := c.baseurl + uri
|
||||
req, err := http.NewRequest(method, urlstr, nil)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
if v != nil {
|
||||
var b bytes.Buffer
|
||||
if err := json.NewEncoder(&b).Encode(v); err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
req.Body = ioutil.NopCloser(&b)
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
}
|
||||
|
||||
resp, err := c.client.Do(req)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
b, err := circbuf.NewBuffer(c.maxbuf)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
_, err = io.Copy(b, resp.Body)
|
||||
return b, resp.StatusCode, err
|
||||
}
|
||||
|
||||
func (c *DockerClient) CreateExec(containerID string, cmd []string) (string, error) {
|
||||
data := struct {
|
||||
AttachStdin bool
|
||||
AttachStdout bool
|
||||
AttachStderr bool
|
||||
Tty bool
|
||||
Cmd []string
|
||||
}{
|
||||
AttachStderr: true,
|
||||
AttachStdout: true,
|
||||
Cmd: cmd,
|
||||
}
|
||||
|
||||
uri := fmt.Sprintf("/containers/%s/exec", url.QueryEscape(containerID))
|
||||
b, code, err := c.call("POST", uri, data)
|
||||
switch {
|
||||
case err != nil:
|
||||
return "", fmt.Errorf("create exec failed for container %s: %s", containerID, err)
|
||||
case code == 201:
|
||||
var resp struct{ Id string }
|
||||
if err = json.NewDecoder(bytes.NewReader(b.Bytes())).Decode(&resp); err != nil {
|
||||
return "", fmt.Errorf("create exec response for container %s cannot be parsed: %s", containerID, err)
|
||||
}
|
||||
return resp.Id, nil
|
||||
case code == 404:
|
||||
return "", fmt.Errorf("create exec failed for unknown container %s", containerID)
|
||||
case code == 409:
|
||||
return "", fmt.Errorf("create exec failed since container %s is paused or stopped", containerID)
|
||||
default:
|
||||
return "", fmt.Errorf("create exec failed for container %s with status %d: %s", containerID, code, b)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *DockerClient) StartExec(containerID, execID string) (*circbuf.Buffer, error) {
|
||||
data := struct{ Detach, Tty bool }{Detach: false, Tty: true}
|
||||
uri := fmt.Sprintf("/exec/%s/start", execID)
|
||||
b, code, err := c.call("POST", uri, data)
|
||||
switch {
|
||||
case err != nil:
|
||||
return nil, fmt.Errorf("start exec failed for container %s: %s", containerID, err)
|
||||
case code == 200:
|
||||
return b, nil
|
||||
case code == 404:
|
||||
return nil, fmt.Errorf("start exec failed for container %s: invalid exec id %s", containerID, execID)
|
||||
case code == 409:
|
||||
return nil, fmt.Errorf("start exec failed since container %s is paused or stopped", containerID)
|
||||
default:
|
||||
return nil, fmt.Errorf("start exec failed for container %s with status %d: %s", containerID, code, b)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *DockerClient) InspectExec(containerID, execID string) (int, error) {
|
||||
uri := fmt.Sprintf("/exec/%s/json", execID)
|
||||
b, code, err := c.call("GET", uri, nil)
|
||||
switch {
|
||||
case err != nil:
|
||||
return 0, fmt.Errorf("inspect exec failed for container %s: %s", containerID, err)
|
||||
case code == 200:
|
||||
var resp struct{ ExitCode int }
|
||||
if err := json.NewDecoder(bytes.NewReader(b.Bytes())).Decode(&resp); err != nil {
|
||||
return 0, fmt.Errorf("inspect exec response for container %s cannot be parsed: %s", containerID, err)
|
||||
}
|
||||
return resp.ExitCode, nil
|
||||
case code == 404:
|
||||
return 0, fmt.Errorf("inspect exec failed for container %s: invalid exec id %s", containerID, execID)
|
||||
default:
|
||||
return 0, fmt.Errorf("inspect exec failed for container %s with status %d: %s", containerID, code, b)
|
||||
}
|
||||
}
|
|
@ -0,0 +1,3 @@
|
|||
package agent
|
||||
|
||||
const DefaultDockerHost = "unix:///var/run/docker.sock"
|
|
@ -0,0 +1,3 @@
|
|||
package agent
|
||||
|
||||
const DefaultDockerHost = "npipe:////./pipe/docker_engine"
|
|
@ -8,14 +8,15 @@ import (
|
|||
)
|
||||
|
||||
type Notify struct {
|
||||
state map[types.CheckID]string
|
||||
updates map[types.CheckID]int
|
||||
output map[types.CheckID]string
|
||||
updated chan int
|
||||
|
||||
// A guard to protect an access to the internal attributes
|
||||
// of the notification mock in order to prevent panics
|
||||
// raised by the race conditions detector.
|
||||
sync.RWMutex
|
||||
state map[types.CheckID]string
|
||||
updates map[types.CheckID]int
|
||||
output map[types.CheckID]string
|
||||
}
|
||||
|
||||
func NewNotify() *Notify {
|
||||
|
@ -26,6 +27,16 @@ func NewNotify() *Notify {
|
|||
}
|
||||
}
|
||||
|
||||
func NewNotifyChan() (*Notify, chan int) {
|
||||
n := &Notify{
|
||||
updated: make(chan int),
|
||||
state: make(map[types.CheckID]string),
|
||||
updates: make(map[types.CheckID]int),
|
||||
output: make(map[types.CheckID]string),
|
||||
}
|
||||
return n, n.updated
|
||||
}
|
||||
|
||||
func (m *Notify) sprintf(v interface{}) string {
|
||||
m.RLock()
|
||||
defer m.RUnlock()
|
||||
|
@ -38,12 +49,15 @@ func (m *Notify) OutputMap() string { return m.sprintf(m.output) }
|
|||
|
||||
func (m *Notify) UpdateCheck(id types.CheckID, status, output string) {
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
|
||||
m.state[id] = status
|
||||
old := m.updates[id]
|
||||
m.updates[id] = old + 1
|
||||
m.output[id] = output
|
||||
m.Unlock()
|
||||
|
||||
if m.updated != nil {
|
||||
m.updated <- 1
|
||||
}
|
||||
}
|
||||
|
||||
// State returns the state of the specified health-check.
|
||||
|
|
|
@ -163,4 +163,4 @@
|
|||
{"checksumSHA1":"yHpUeGwKoqqwd3cbEp3lkcnvft0=","path":"google.golang.org/grpc/transport","revision":"50955793b0183f9de69bd78e2ec251cf20aab121","revisionTime":"2017-01-11T19:10:52Z"}
|
||||
],
|
||||
"rootPath": "github.com/hashicorp/consul"
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue