mirror of https://github.com/status-im/consul.git
agent: support multiple checks per service
This commit is contained in:
parent
4c27d125ac
commit
674be58e55
|
@ -9,6 +9,7 @@ import (
|
|||
"os"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/hashicorp/consul/consul"
|
||||
|
@ -582,15 +583,17 @@ func (a *Agent) purgeCheck(checkID string) error {
|
|||
// AddService is used to add a service entry.
|
||||
// This entry is persistent and the agent will make a best effort to
|
||||
// ensure it is registered
|
||||
func (a *Agent) AddService(service *structs.NodeService, chkType *CheckType, persist bool) error {
|
||||
func (a *Agent) AddService(service *structs.NodeService, chkTypes CheckTypes, persist bool) error {
|
||||
if service.Service == "" {
|
||||
return fmt.Errorf("Service name missing")
|
||||
}
|
||||
if service.ID == "" && service.Service != "" {
|
||||
service.ID = service.Service
|
||||
}
|
||||
if chkType != nil && !chkType.Valid() {
|
||||
return fmt.Errorf("Check type is not valid")
|
||||
for _, check := range chkTypes {
|
||||
if !check.Valid() {
|
||||
return fmt.Errorf("Check type is not valid")
|
||||
}
|
||||
}
|
||||
|
||||
// Add the service
|
||||
|
@ -604,10 +607,14 @@ func (a *Agent) AddService(service *structs.NodeService, chkType *CheckType, per
|
|||
}
|
||||
|
||||
// Create an associated health check
|
||||
if chkType != nil {
|
||||
for i, chkType := range chkTypes {
|
||||
checkID := fmt.Sprintf("service:%s", service.ID)
|
||||
if len(chkTypes) > 1 {
|
||||
checkID += fmt.Sprintf(":%d", i+1)
|
||||
}
|
||||
check := &structs.HealthCheck{
|
||||
Node: a.config.NodeName,
|
||||
CheckID: fmt.Sprintf("service:%s", service.ID),
|
||||
CheckID: checkID,
|
||||
Name: fmt.Sprintf("Service '%s' check", service.Service),
|
||||
Status: structs.HealthCritical,
|
||||
Notes: chkType.Notes,
|
||||
|
@ -642,9 +649,14 @@ func (a *Agent) RemoveService(serviceID string, persist bool) error {
|
|||
}
|
||||
|
||||
// Deregister any associated health checks
|
||||
checkID := fmt.Sprintf("service:%s", serviceID)
|
||||
if err := a.RemoveCheck(checkID, persist); err != nil {
|
||||
return err
|
||||
for checkID, _ := range a.state.Checks() {
|
||||
prefix := "service:" + serviceID
|
||||
if checkID != prefix && !strings.HasPrefix(checkID, prefix+":") {
|
||||
continue
|
||||
}
|
||||
if err := a.RemoveCheck(checkID, persist); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
log.Printf("[DEBUG] agent: removed service %q", serviceID)
|
||||
|
@ -663,6 +675,14 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *CheckType, persist
|
|||
return fmt.Errorf("Check type is not valid")
|
||||
}
|
||||
|
||||
if check.ServiceID != "" {
|
||||
svc, ok := a.state.Services()[check.ServiceID]
|
||||
if !ok {
|
||||
return fmt.Errorf("ServiceID %q does not exist", check.ServiceID)
|
||||
}
|
||||
check.ServiceName = svc.Service
|
||||
}
|
||||
|
||||
a.checkLock.Lock()
|
||||
defer a.checkLock.Unlock()
|
||||
|
||||
|
@ -864,8 +884,8 @@ func (a *Agent) loadServices(conf *Config) error {
|
|||
// Register the services from config
|
||||
for _, service := range conf.Services {
|
||||
ns := service.NodeService()
|
||||
chkType := service.CheckType()
|
||||
if err := a.AddService(ns, chkType, false); err != nil {
|
||||
chkTypes := service.CheckTypes()
|
||||
if err := a.AddService(ns, chkTypes, false); err != nil {
|
||||
return fmt.Errorf("Failed to register service '%s': %v", service.ID, err)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -161,15 +161,17 @@ func (s *HTTPServer) AgentRegisterService(resp http.ResponseWriter, req *http.Re
|
|||
ns := args.NodeService()
|
||||
|
||||
// Verify the check type
|
||||
chkType := args.CheckType()
|
||||
if chkType != nil && !chkType.Valid() {
|
||||
resp.WriteHeader(400)
|
||||
resp.Write([]byte("Must provide TTL or Script and Interval!"))
|
||||
return nil, nil
|
||||
chkTypes := args.CheckTypes()
|
||||
for _, check := range chkTypes {
|
||||
if !check.Valid() {
|
||||
resp.WriteHeader(400)
|
||||
resp.Write([]byte("Must provide TTL or Script and Interval!"))
|
||||
return nil, nil
|
||||
}
|
||||
}
|
||||
|
||||
// Add the check
|
||||
return nil, s.agent.AddService(ns, chkType, true)
|
||||
return nil, s.agent.AddService(ns, chkTypes, true)
|
||||
}
|
||||
|
||||
func (s *HTTPServer) AgentDeregisterService(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
||||
|
|
|
@ -683,3 +683,60 @@ func TestHTTPAgent_DisableNodeMaintenance(t *testing.T) {
|
|||
t.Fatalf("should have removed maintenance check")
|
||||
}
|
||||
}
|
||||
|
||||
func TestHTTPAgentRegisterServiceCheck(t *testing.T) {
|
||||
dir, srv := makeHTTPServer(t)
|
||||
defer os.RemoveAll(dir)
|
||||
defer srv.Shutdown()
|
||||
defer srv.agent.Shutdown()
|
||||
|
||||
// First register the service
|
||||
req, err := http.NewRequest("GET", "/v1/agent/service/register", nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
args := &ServiceDefinition{
|
||||
Name: "memcache",
|
||||
Port: 8000,
|
||||
Check: CheckType{
|
||||
TTL: 15 * time.Second,
|
||||
},
|
||||
}
|
||||
req.Body = encodeReq(args)
|
||||
|
||||
if _, err := srv.AgentRegisterService(nil, req); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Now register an additional check
|
||||
req, err = http.NewRequest("GET", "/v1/agent/check/register", nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
checkArgs := &CheckDefinition{
|
||||
Name: "memcache_check2",
|
||||
ServiceID: "memcache",
|
||||
CheckType: CheckType{
|
||||
TTL: 15 * time.Second,
|
||||
},
|
||||
}
|
||||
req.Body = encodeReq(checkArgs)
|
||||
|
||||
if _, err := srv.AgentRegisterCheck(nil, req); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Ensure we have a check mapping
|
||||
result := srv.agent.state.Checks()
|
||||
if _, ok := result["service:memcache"]; !ok {
|
||||
t.Fatalf("missing memcached check")
|
||||
}
|
||||
if _, ok := result["memcache_check2"]; !ok {
|
||||
t.Fatalf("missing memcache_check2 check")
|
||||
}
|
||||
|
||||
// Make sure the new check is associated with the service
|
||||
if result["memcache_check2"].ServiceID != "memcache" {
|
||||
t.Fatalf("bad: %#v", result["memcached_check2"])
|
||||
}
|
||||
}
|
||||
|
|
|
@ -139,39 +139,96 @@ func TestAgent_AddService(t *testing.T) {
|
|||
defer os.RemoveAll(dir)
|
||||
defer agent.Shutdown()
|
||||
|
||||
srv := &structs.NodeService{
|
||||
ID: "redis",
|
||||
Service: "redis",
|
||||
Tags: []string{"foo"},
|
||||
Port: 8000,
|
||||
}
|
||||
chk := &CheckType{
|
||||
TTL: time.Minute,
|
||||
Notes: "redis health check",
|
||||
}
|
||||
err := agent.AddService(srv, chk, false)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
// Service registration with a single check
|
||||
{
|
||||
srv := &structs.NodeService{
|
||||
ID: "redis",
|
||||
Service: "redis",
|
||||
Tags: []string{"foo"},
|
||||
Port: 8000,
|
||||
}
|
||||
chkTypes := CheckTypes{
|
||||
&CheckType{
|
||||
TTL: time.Minute,
|
||||
Notes: "redis heath check 2",
|
||||
},
|
||||
}
|
||||
err := agent.AddService(srv, chkTypes, false)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Ensure we have a state mapping
|
||||
if _, ok := agent.state.Services()["redis"]; !ok {
|
||||
t.Fatalf("missing redis service")
|
||||
}
|
||||
|
||||
// Ensure the check is registered
|
||||
if _, ok := agent.state.Checks()["service:redis"]; !ok {
|
||||
t.Fatalf("missing redis check")
|
||||
}
|
||||
|
||||
// Ensure a TTL is setup
|
||||
if _, ok := agent.checkTTLs["service:redis"]; !ok {
|
||||
t.Fatalf("missing redis check ttl")
|
||||
}
|
||||
|
||||
// Ensure the notes are passed through
|
||||
if agent.state.Checks()["service:redis"].Notes == "" {
|
||||
t.Fatalf("missing redis check notes")
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure we have a state mapping
|
||||
if _, ok := agent.state.Services()["redis"]; !ok {
|
||||
t.Fatalf("missing redis service")
|
||||
}
|
||||
// Service registration with multiple checks
|
||||
{
|
||||
srv := &structs.NodeService{
|
||||
ID: "memcache",
|
||||
Service: "memcache",
|
||||
Tags: []string{"bar"},
|
||||
Port: 8000,
|
||||
}
|
||||
chkTypes := CheckTypes{
|
||||
&CheckType{
|
||||
TTL: time.Minute,
|
||||
Notes: "memcache health check 1",
|
||||
},
|
||||
&CheckType{
|
||||
TTL: time.Second,
|
||||
Notes: "memcache heath check 2",
|
||||
},
|
||||
}
|
||||
if err := agent.AddService(srv, chkTypes, false); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Ensure we have a check mapping
|
||||
if _, ok := agent.state.Checks()["service:redis"]; !ok {
|
||||
t.Fatalf("missing redis check")
|
||||
}
|
||||
// Ensure we have a state mapping
|
||||
if _, ok := agent.state.Services()["memcache"]; !ok {
|
||||
t.Fatalf("missing memcache service")
|
||||
}
|
||||
|
||||
// Ensure a TTL is setup
|
||||
if _, ok := agent.checkTTLs["service:redis"]; !ok {
|
||||
t.Fatalf("missing redis check ttl")
|
||||
}
|
||||
// Ensure both checks were added
|
||||
if _, ok := agent.state.Checks()["service:memcache:1"]; !ok {
|
||||
t.Fatalf("missing memcache:1 check")
|
||||
}
|
||||
if _, ok := agent.state.Checks()["service:memcache:2"]; !ok {
|
||||
t.Fatalf("missing memcache:2 check")
|
||||
}
|
||||
|
||||
// Ensure the notes are passed through
|
||||
if agent.state.Checks()["service:redis"].Notes == "" {
|
||||
t.Fatalf("missing redis check notes")
|
||||
// Ensure a TTL is setup
|
||||
if _, ok := agent.checkTTLs["service:memcache:1"]; !ok {
|
||||
t.Fatalf("missing memcache:1 check ttl")
|
||||
}
|
||||
if _, ok := agent.checkTTLs["service:memcache:2"]; !ok {
|
||||
t.Fatalf("missing memcache:2 check ttl")
|
||||
}
|
||||
|
||||
// Ensure the notes are passed through
|
||||
if agent.state.Checks()["service:memcache:1"].Notes == "" {
|
||||
t.Fatalf("missing redis check notes")
|
||||
}
|
||||
if agent.state.Checks()["service:memcache:2"].Notes == "" {
|
||||
t.Fatalf("missing redis check notes")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -190,34 +247,67 @@ func TestAgent_RemoveService(t *testing.T) {
|
|||
t.Fatalf("should have errored")
|
||||
}
|
||||
|
||||
srv := &structs.NodeService{
|
||||
ID: "redis",
|
||||
Service: "redis",
|
||||
Port: 8000,
|
||||
}
|
||||
chk := &CheckType{TTL: time.Minute}
|
||||
if err := agent.AddService(srv, chk, false); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
// Removing a service with a single check works
|
||||
{
|
||||
srv := &structs.NodeService{
|
||||
ID: "memcache",
|
||||
Service: "memcache",
|
||||
Port: 8000,
|
||||
}
|
||||
chkTypes := CheckTypes{&CheckType{TTL: time.Minute}}
|
||||
|
||||
if err := agent.AddService(srv, chkTypes, false); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
if err := agent.RemoveService("memcache", false); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if _, ok := agent.state.Checks()["service:memcache"]; ok {
|
||||
t.Fatalf("have memcache check")
|
||||
}
|
||||
}
|
||||
|
||||
// Remove the service
|
||||
if err := agent.RemoveService("redis", false); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
// Removing a service with multiple checks works
|
||||
{
|
||||
srv := &structs.NodeService{
|
||||
ID: "redis",
|
||||
Service: "redis",
|
||||
Port: 8000,
|
||||
}
|
||||
chkTypes := CheckTypes{
|
||||
&CheckType{TTL: time.Minute},
|
||||
&CheckType{TTL: 30 * time.Second},
|
||||
}
|
||||
if err := agent.AddService(srv, chkTypes, false); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Ensure we have a state mapping
|
||||
if _, ok := agent.state.Services()["redis"]; ok {
|
||||
t.Fatalf("have redis service")
|
||||
}
|
||||
// Remove the service
|
||||
if err := agent.RemoveService("redis", false); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Ensure we have a check mapping
|
||||
if _, ok := agent.state.Checks()["service:redis"]; ok {
|
||||
t.Fatalf("have redis check")
|
||||
}
|
||||
// Ensure we have a state mapping
|
||||
if _, ok := agent.state.Services()["redis"]; ok {
|
||||
t.Fatalf("have redis service")
|
||||
}
|
||||
|
||||
// Ensure a TTL is setup
|
||||
if _, ok := agent.checkTTLs["service:redis"]; ok {
|
||||
t.Fatalf("have redis check ttl")
|
||||
// Ensure checks were removed
|
||||
if _, ok := agent.state.Checks()["service:redis:1"]; ok {
|
||||
t.Fatalf("check redis:1 should be removed")
|
||||
}
|
||||
if _, ok := agent.state.Checks()["service:redis:2"]; ok {
|
||||
t.Fatalf("check redis:2 should be removed")
|
||||
}
|
||||
|
||||
// Ensure a TTL is setup
|
||||
if _, ok := agent.checkTTLs["service:redis:1"]; ok {
|
||||
t.Fatalf("check ttl for redis:1 should be removed")
|
||||
}
|
||||
if _, ok := agent.checkTTLs["service:redis:2"]; ok {
|
||||
t.Fatalf("check ttl for redis:2 should be removed")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -285,6 +375,27 @@ func TestAgent_AddCheck_MinInterval(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestAgent_AddCheck_MissingService(t *testing.T) {
|
||||
dir, agent := makeAgent(t, nextConfig())
|
||||
defer os.RemoveAll(dir)
|
||||
defer agent.Shutdown()
|
||||
|
||||
health := &structs.HealthCheck{
|
||||
Node: "foo",
|
||||
CheckID: "baz",
|
||||
Name: "baz check 1",
|
||||
ServiceID: "baz",
|
||||
}
|
||||
chk := &CheckType{
|
||||
Script: "exit 0",
|
||||
Interval: time.Microsecond,
|
||||
}
|
||||
err := agent.AddCheck(health, chk, false)
|
||||
if err == nil || err.Error() != `ServiceID "baz" does not exist` {
|
||||
t.Fatalf("expected service id error, got: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestAgent_RemoveCheck(t *testing.T) {
|
||||
dir, agent := makeAgent(t, nextConfig())
|
||||
defer os.RemoveAll(dir)
|
||||
|
@ -534,12 +645,10 @@ func TestAgent_PersistCheck(t *testing.T) {
|
|||
defer agent.Shutdown()
|
||||
|
||||
check := &structs.HealthCheck{
|
||||
Node: config.NodeName,
|
||||
CheckID: "service:redis1",
|
||||
Name: "redischeck",
|
||||
Status: structs.HealthPassing,
|
||||
ServiceID: "redis",
|
||||
ServiceName: "redis",
|
||||
Node: config.NodeName,
|
||||
CheckID: "mem",
|
||||
Name: "memory check",
|
||||
Status: structs.HealthPassing,
|
||||
}
|
||||
chkType := &CheckType{
|
||||
Script: "/bin/true",
|
||||
|
@ -607,12 +716,10 @@ func TestAgent_PurgeCheck(t *testing.T) {
|
|||
defer agent.Shutdown()
|
||||
|
||||
check := &structs.HealthCheck{
|
||||
Node: config.NodeName,
|
||||
CheckID: "service:redis1",
|
||||
Name: "redischeck",
|
||||
Status: structs.HealthPassing,
|
||||
ServiceID: "redis",
|
||||
ServiceName: "redis",
|
||||
Node: config.NodeName,
|
||||
CheckID: "mem",
|
||||
Name: "memory check",
|
||||
Status: structs.HealthPassing,
|
||||
}
|
||||
|
||||
file := filepath.Join(agent.config.DataDir, checksDir, stringHash(check.CheckID))
|
||||
|
@ -645,12 +752,10 @@ func TestAgent_PurgeCheckOnDuplicate(t *testing.T) {
|
|||
defer agent.Shutdown()
|
||||
|
||||
check1 := &structs.HealthCheck{
|
||||
Node: config.NodeName,
|
||||
CheckID: "service:redis1",
|
||||
Name: "redischeck",
|
||||
Status: structs.HealthPassing,
|
||||
ServiceID: "redis",
|
||||
ServiceName: "redis",
|
||||
Node: config.NodeName,
|
||||
CheckID: "mem",
|
||||
Name: "memory check",
|
||||
Status: structs.HealthPassing,
|
||||
}
|
||||
|
||||
// First persist the check
|
||||
|
@ -661,8 +766,8 @@ func TestAgent_PurgeCheckOnDuplicate(t *testing.T) {
|
|||
|
||||
// Start again with the check registered in config
|
||||
check2 := &CheckDefinition{
|
||||
ID: "service:redis1",
|
||||
Name: "redischeck",
|
||||
ID: "mem",
|
||||
Name: "memory check",
|
||||
Notes: "my cool notes",
|
||||
CheckType: CheckType{
|
||||
Script: "/bin/check-redis.py",
|
||||
|
@ -697,16 +802,26 @@ func TestAgent_unloadChecks(t *testing.T) {
|
|||
defer os.RemoveAll(dir)
|
||||
defer agent.Shutdown()
|
||||
|
||||
// First register a service
|
||||
svc := &structs.NodeService{
|
||||
ID: "redis",
|
||||
Service: "redis",
|
||||
Tags: []string{"foo"},
|
||||
Port: 8000,
|
||||
}
|
||||
if err := agent.AddService(svc, nil, false); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Register a check
|
||||
check1 := &structs.HealthCheck{
|
||||
Node: config.NodeName,
|
||||
CheckID: "service:redis1",
|
||||
CheckID: "service:redis",
|
||||
Name: "redischeck",
|
||||
Status: structs.HealthPassing,
|
||||
ServiceID: "redis",
|
||||
ServiceName: "redis",
|
||||
}
|
||||
|
||||
// Register the check
|
||||
if err := agent.AddCheck(check1, nil, false); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
|
|
@ -40,6 +40,7 @@ type CheckType struct {
|
|||
|
||||
Notes string
|
||||
}
|
||||
type CheckTypes []*CheckType
|
||||
|
||||
// Valid checks if the CheckType is valid
|
||||
func (c *CheckType) Valid() bool {
|
||||
|
|
|
@ -568,7 +568,6 @@ func DecodeConfig(r io.Reader) (*Config, error) {
|
|||
|
||||
// DecodeServiceDefinition is used to decode a service definition
|
||||
func DecodeServiceDefinition(raw interface{}) (*ServiceDefinition, error) {
|
||||
var sub interface{}
|
||||
rawMap, ok := raw.(map[string]interface{})
|
||||
if !ok {
|
||||
goto AFTER_FIX
|
||||
|
@ -582,17 +581,23 @@ func DecodeServiceDefinition(raw interface{}) (*ServiceDefinition, error) {
|
|||
}
|
||||
|
||||
for k, v := range rawMap {
|
||||
if strings.ToLower(k) == "check" {
|
||||
sub = v
|
||||
break
|
||||
switch strings.ToLower(k) {
|
||||
case "check":
|
||||
if err := FixupCheckType(v); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
case "checks":
|
||||
chkTypes, ok := v.([]interface{})
|
||||
if !ok {
|
||||
goto AFTER_FIX
|
||||
}
|
||||
for _, chkType := range chkTypes {
|
||||
if err := FixupCheckType(chkType); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if sub == nil {
|
||||
goto AFTER_FIX
|
||||
}
|
||||
if err := FixupCheckType(sub); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
AFTER_FIX:
|
||||
var md mapstructure.Metadata
|
||||
var result ServiceDefinition
|
||||
|
@ -610,22 +615,23 @@ AFTER_FIX:
|
|||
}
|
||||
|
||||
func FixupCheckType(raw interface{}) error {
|
||||
var ttlKey, intervalKey string
|
||||
|
||||
// Handle decoding of time durations
|
||||
rawMap, ok := raw.(map[string]interface{})
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
var ttlKey string
|
||||
for k, _ := range rawMap {
|
||||
if strings.ToLower(k) == "ttl" {
|
||||
for k, v := range rawMap {
|
||||
switch strings.ToLower(k) {
|
||||
case "ttl":
|
||||
ttlKey = k
|
||||
}
|
||||
}
|
||||
var intervalKey string
|
||||
for k, _ := range rawMap {
|
||||
if strings.ToLower(k) == "interval" {
|
||||
case "interval":
|
||||
intervalKey = k
|
||||
case "service_id":
|
||||
rawMap["serviceid"] = v
|
||||
delete(rawMap, "service_id")
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -640,7 +640,17 @@ func TestDecodeConfig_Services(t *testing.T) {
|
|||
"script": "/bin/check_redis -p 6000",
|
||||
"interval": "5s",
|
||||
"ttl": "20s"
|
||||
}
|
||||
},
|
||||
"checks": [
|
||||
{
|
||||
"script": "/bin/check_redis_read",
|
||||
"interval": "1m"
|
||||
},
|
||||
{
|
||||
"script": "/bin/check_redis_write",
|
||||
"interval": "1m"
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"id": "red1",
|
||||
|
@ -672,6 +682,16 @@ func TestDecodeConfig_Services(t *testing.T) {
|
|||
Script: "/bin/check_redis -p 6000",
|
||||
TTL: 20 * time.Second,
|
||||
},
|
||||
Checks: CheckTypes{
|
||||
&CheckType{
|
||||
Interval: time.Minute,
|
||||
Script: "/bin/check_redis_read",
|
||||
},
|
||||
&CheckType{
|
||||
Interval: time.Minute,
|
||||
Script: "/bin/check_redis_write",
|
||||
},
|
||||
},
|
||||
ID: "red0",
|
||||
Name: "redis",
|
||||
Tags: []string{
|
||||
|
@ -715,6 +735,13 @@ func TestDecodeConfig_Checks(t *testing.T) {
|
|||
"name": "cpu",
|
||||
"script": "/bin/check_cpu",
|
||||
"interval": "10s"
|
||||
},
|
||||
{
|
||||
"id": "chk3",
|
||||
"name": "service:redis:tx",
|
||||
"script": "/bin/check_redis_tx",
|
||||
"interval": "1m",
|
||||
"service_id": "redis"
|
||||
}
|
||||
]
|
||||
}`
|
||||
|
@ -742,6 +769,15 @@ func TestDecodeConfig_Checks(t *testing.T) {
|
|||
Interval: 10 * time.Second,
|
||||
},
|
||||
},
|
||||
&CheckDefinition{
|
||||
ID: "chk3",
|
||||
Name: "service:redis:tx",
|
||||
ServiceID: "redis",
|
||||
CheckType: CheckType{
|
||||
Script: "/bin/check_redis_tx",
|
||||
Interval: time.Minute,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
|
|
|
@ -368,7 +368,34 @@ func (l *localState) syncChanges() error {
|
|||
l.Lock()
|
||||
defer l.Unlock()
|
||||
|
||||
// Sync the services
|
||||
// Sync the checks first. This allows registering the service in the
|
||||
// same transaction as its checks.
|
||||
var checkIDs []string
|
||||
for id, status := range l.checkStatus {
|
||||
if status.remoteDelete {
|
||||
if err := l.deleteCheck(id); err != nil {
|
||||
return err
|
||||
}
|
||||
} else if !status.inSync {
|
||||
// Cancel a deferred sync
|
||||
if timer, ok := l.deferCheck[id]; ok {
|
||||
timer.Stop()
|
||||
delete(l.deferCheck, id)
|
||||
}
|
||||
|
||||
checkIDs = append(checkIDs, id)
|
||||
} else {
|
||||
l.logger.Printf("[DEBUG] agent: Check '%s' in sync", id)
|
||||
}
|
||||
|
||||
if len(checkIDs) > 0 {
|
||||
if err := l.syncChecks(checkIDs); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Sync any remaining services.
|
||||
for id, status := range l.serviceStatus {
|
||||
if status.remoteDelete {
|
||||
if err := l.deleteService(id); err != nil {
|
||||
|
@ -383,26 +410,6 @@ func (l *localState) syncChanges() error {
|
|||
}
|
||||
}
|
||||
|
||||
// Sync the checks
|
||||
for id, status := range l.checkStatus {
|
||||
if status.remoteDelete {
|
||||
if err := l.deleteCheck(id); err != nil {
|
||||
return err
|
||||
}
|
||||
} else if !status.inSync {
|
||||
// Cancel a deferred sync
|
||||
if timer := l.deferCheck[id]; timer != nil {
|
||||
timer.Stop()
|
||||
delete(l.deferCheck, id)
|
||||
}
|
||||
|
||||
if err := l.syncCheck(id); err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
l.logger.Printf("[DEBUG] agent: Check '%s' in sync", id)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -462,33 +469,72 @@ func (l *localState) syncService(id string) error {
|
|||
return err
|
||||
}
|
||||
|
||||
// syncCheck is used to sync a service to the server
|
||||
func (l *localState) syncCheck(id string) error {
|
||||
// Pull in the associated service if any
|
||||
check := l.checks[id]
|
||||
var service *structs.NodeService
|
||||
if check.ServiceID != "" {
|
||||
if serv, ok := l.services[check.ServiceID]; ok {
|
||||
service = serv
|
||||
// syncChecks is used to sync checks to the server
|
||||
func (l *localState) syncChecks(checkIDs []string) error {
|
||||
reqs := make(map[string]*structs.RegisterRequest)
|
||||
|
||||
for _, id := range checkIDs {
|
||||
if check, ok := l.checks[id]; ok {
|
||||
// Add checks to the base request if it already exists
|
||||
if req, ok := reqs[check.ServiceID]; ok {
|
||||
req.Checks = append(req.Checks, check)
|
||||
continue
|
||||
}
|
||||
|
||||
// Pull in the associated service if any
|
||||
var service *structs.NodeService
|
||||
if serv, ok := l.services[check.ServiceID]; ok {
|
||||
service = serv
|
||||
}
|
||||
|
||||
// Create the base request
|
||||
reqs[check.ServiceID] = &structs.RegisterRequest{
|
||||
Datacenter: l.config.Datacenter,
|
||||
Node: l.config.NodeName,
|
||||
Address: l.config.AdvertiseAddr,
|
||||
Service: service,
|
||||
Checks: structs.HealthChecks{check},
|
||||
WriteRequest: structs.WriteRequest{Token: l.config.ACLToken},
|
||||
}
|
||||
}
|
||||
}
|
||||
req := structs.RegisterRequest{
|
||||
Datacenter: l.config.Datacenter,
|
||||
Node: l.config.NodeName,
|
||||
Address: l.config.AdvertiseAddr,
|
||||
Service: service,
|
||||
Check: l.checks[id],
|
||||
WriteRequest: structs.WriteRequest{Token: l.config.ACLToken},
|
||||
|
||||
for _, req := range reqs {
|
||||
// Send check data as Check for backward compatibility if we only have a
|
||||
// single check. Otherwise, send it as Checks
|
||||
if len(req.Checks) == 1 {
|
||||
req.Check = req.Checks[0]
|
||||
req.Checks = nil
|
||||
}
|
||||
|
||||
// Perform the sync
|
||||
var out struct{}
|
||||
err := l.iface.RPC("Catalog.Register", &req, &out)
|
||||
if err == nil {
|
||||
for _, id := range checkIDs {
|
||||
l.checkStatus[id] = syncStatus{inSync: true}
|
||||
l.logger.Printf("[INFO] agent: Synced check '%s'", id)
|
||||
}
|
||||
|
||||
// If the check was associated with a service and we synced it,
|
||||
// then mark the service as in sync.
|
||||
if svc := req.Service; svc != nil {
|
||||
if status, ok := l.serviceStatus[svc.ID]; ok && status.inSync {
|
||||
continue
|
||||
}
|
||||
l.serviceStatus[svc.ID] = syncStatus{inSync: true}
|
||||
l.logger.Printf("[INFO] agent: Synced service '%s'", svc.ID)
|
||||
}
|
||||
} else if strings.Contains(err.Error(), permissionDenied) {
|
||||
for _, id := range checkIDs {
|
||||
l.checkStatus[id] = syncStatus{inSync: true}
|
||||
l.logger.Printf("[WARN] agent: Check '%s' registration blocked by ACLs", id)
|
||||
}
|
||||
return nil
|
||||
} else {
|
||||
return err
|
||||
}
|
||||
}
|
||||
var out struct{}
|
||||
err := l.iface.RPC("Catalog.Register", &req, &out)
|
||||
if err == nil {
|
||||
l.checkStatus[id] = syncStatus{inSync: true}
|
||||
l.logger.Printf("[INFO] agent: Synced check '%s'", id)
|
||||
} else if strings.Contains(err.Error(), permissionDenied) {
|
||||
l.checkStatus[id] = syncStatus{inSync: true}
|
||||
l.logger.Printf("[WARN] agent: Check '%s' registration blocked by ACLs", id)
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -12,6 +12,7 @@ type ServiceDefinition struct {
|
|||
Address string
|
||||
Port int
|
||||
Check CheckType
|
||||
Checks CheckTypes
|
||||
}
|
||||
|
||||
func (s *ServiceDefinition) NodeService() *structs.NodeService {
|
||||
|
@ -28,11 +29,14 @@ func (s *ServiceDefinition) NodeService() *structs.NodeService {
|
|||
return ns
|
||||
}
|
||||
|
||||
func (s *ServiceDefinition) CheckType() *CheckType {
|
||||
if s.Check.Script == "" && s.Check.Interval == 0 && s.Check.TTL == 0 {
|
||||
return nil
|
||||
func (s *ServiceDefinition) CheckTypes() (checks CheckTypes) {
|
||||
s.Checks = append(s.Checks, &s.Check)
|
||||
for _, check := range s.Checks {
|
||||
if (check.Script != "" && check.Interval != 0) || check.TTL != 0 {
|
||||
checks = append(checks, check)
|
||||
}
|
||||
}
|
||||
return &s.Check
|
||||
return
|
||||
}
|
||||
|
||||
// ChecKDefinition is used to JSON decode the Check definitions
|
||||
|
@ -40,16 +44,18 @@ type CheckDefinition struct {
|
|||
ID string
|
||||
Name string
|
||||
Notes string
|
||||
ServiceID string
|
||||
CheckType `mapstructure:",squash"`
|
||||
}
|
||||
|
||||
func (c *CheckDefinition) HealthCheck(node string) *structs.HealthCheck {
|
||||
health := &structs.HealthCheck{
|
||||
Node: node,
|
||||
CheckID: c.ID,
|
||||
Name: c.Name,
|
||||
Status: structs.HealthCritical,
|
||||
Notes: c.Notes,
|
||||
Node: node,
|
||||
CheckID: c.ID,
|
||||
Name: c.Name,
|
||||
Status: structs.HealthCritical,
|
||||
Notes: c.Notes,
|
||||
ServiceID: c.ServiceID,
|
||||
}
|
||||
if health.CheckID == "" && health.Name != "" {
|
||||
health.CheckID = health.Name
|
||||
|
|
|
@ -53,11 +53,15 @@ func (c *Catalog) Register(args *structs.RegisterRequest, reply *struct{}) error
|
|||
}
|
||||
|
||||
if args.Check != nil {
|
||||
if args.Check.CheckID == "" && args.Check.Name != "" {
|
||||
args.Check.CheckID = args.Check.Name
|
||||
args.Checks = append(args.Checks, args.Check)
|
||||
args.Check = nil
|
||||
}
|
||||
for _, check := range args.Checks {
|
||||
if check.CheckID == "" && check.Name != "" {
|
||||
check.CheckID = check.Name
|
||||
}
|
||||
if args.Check.Node == "" {
|
||||
args.Check.Node = args.Node
|
||||
if check.Node == "" {
|
||||
check.Node = args.Node
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -66,6 +70,7 @@ func (c *Catalog) Register(args *structs.RegisterRequest, reply *struct{}) error
|
|||
c.srv.logger.Printf("[ERR] consul.catalog: Register failed: %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -497,12 +497,17 @@ func (s *StateStore) EnsureRegistration(index uint64, req *structs.RegisterReque
|
|||
}
|
||||
}
|
||||
|
||||
// Ensure the check if provided
|
||||
// Ensure the check(s), if provided
|
||||
if req.Check != nil {
|
||||
if err := s.ensureCheckTxn(index, req.Check, tx); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
for _, check := range req.Checks {
|
||||
if err := s.ensureCheckTxn(index, check, tx); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Commit as one unit
|
||||
return tx.Commit()
|
||||
|
|
|
@ -32,6 +32,15 @@ func TestEnsureRegistration(t *testing.T) {
|
|||
Status: structs.HealthPassing,
|
||||
ServiceID: "api",
|
||||
},
|
||||
Checks: structs.HealthChecks{
|
||||
&structs.HealthCheck{
|
||||
Node: "foo",
|
||||
CheckID: "api-cache",
|
||||
Name: "Can cache stuff",
|
||||
Status: structs.HealthPassing,
|
||||
ServiceID: "api",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
if err := store.EnsureRegistration(13, reg); err != nil {
|
||||
|
@ -60,7 +69,7 @@ func TestEnsureRegistration(t *testing.T) {
|
|||
if idx != 13 {
|
||||
t.Fatalf("bad: %v", idx)
|
||||
}
|
||||
if len(checks) != 1 {
|
||||
if len(checks) != 2 {
|
||||
t.Fatalf("check: %#v", checks)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -137,6 +137,7 @@ type RegisterRequest struct {
|
|||
Address string
|
||||
Service *NodeService
|
||||
Check *HealthCheck
|
||||
Checks HealthChecks
|
||||
WriteRequest
|
||||
}
|
||||
|
||||
|
|
|
@ -104,6 +104,28 @@ This is the only convention that Consul depends on. Any output of the script
|
|||
will be captured and stored in the `notes` field so that it can be viewed
|
||||
by human operators.
|
||||
|
||||
## Service-bound checks
|
||||
|
||||
Health checks may also be optionally bound to a specific service. This ensures
|
||||
that the status of the health check will only affect the health status of the
|
||||
given service instead of the entire node. Service-bound health checks may be
|
||||
provided by adding a `service_id` field to a check configuration:
|
||||
|
||||
```javascript
|
||||
{
|
||||
"check": {
|
||||
"id": "web-app",
|
||||
"name": "Web App Status",
|
||||
"service_id": "web-app",
|
||||
"ttl": "30s"
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
In the above configuration, if the web-app health check begins failing, it will
|
||||
only affect the availability of the web-app service and no other services
|
||||
provided by the node.
|
||||
|
||||
## Multiple Check Definitions
|
||||
|
||||
Multiple check definitions can be provided at once using the `checks` (plural)
|
||||
|
|
|
@ -469,6 +469,21 @@ An `HTTP` check will preform an HTTP GET request to the value of `HTTP` (expecte
|
|||
If a `TTL` type is used, then the TTL update APIs must be used to periodically update
|
||||
the state of the check.
|
||||
|
||||
It is also possible to associate a new check with an existing service registered
|
||||
on the agent by providing an additional `ServiceID` field. This type of request
|
||||
must look like:
|
||||
|
||||
```javascript
|
||||
{
|
||||
"ID": "service:redis:tx",
|
||||
"ServiceID": "redis",
|
||||
"Name": "Redis test transaction",
|
||||
"Notes": "Tests Redis SET, GET, and DELETE",
|
||||
"Script": "/usr/local/bin/check_redis_tx.py",
|
||||
"Interval": "1m"
|
||||
}
|
||||
```
|
||||
|
||||
The return code is 200 on success.
|
||||
|
||||
### <a name="agent_check_deregister"></a> /v1/agent/check/deregister/\<checkId\>
|
||||
|
|
|
@ -26,10 +26,12 @@ A service definition that is a script looks like:
|
|||
"tags": ["master"],
|
||||
"address": "127.0.0.1",
|
||||
"port": 8000,
|
||||
"check": {
|
||||
"script": "/usr/local/bin/check_redis.py",
|
||||
"interval": "10s"
|
||||
}
|
||||
"checks": [
|
||||
{
|
||||
"script": "/usr/local/bin/check_redis.py",
|
||||
"interval": "10s"
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
```
|
||||
|
@ -58,7 +60,10 @@ There is more information about [checks here](/docs/agent/checks.html). The
|
|||
check must be of the script, HTTP or TTL type. If it is a script type, `script` and
|
||||
`interval` must be provided. If it is a HTTP type, `http` and
|
||||
`interval` must be provided. If it is a TTL type, then only `ttl` must be
|
||||
provided. The check name is automatically generated as "service:<service-id>".
|
||||
provided. The check name is automatically generated as
|
||||
`service:<service-id>`. If there are multiple service checks registered, the
|
||||
ID will be generated as `service:<service-id>:<num>`, where `<num>` is an
|
||||
incrementing number starting from `1`.
|
||||
|
||||
To configure a service, either provide it as a `-config-file` option to the
|
||||
agent, or place it inside the `-config-dir` of the agent. The file must
|
||||
|
@ -82,11 +87,13 @@ Multiple services definitions can be provided at once using the `services`
|
|||
],
|
||||
"address": "127.0.0.1",
|
||||
"port": 6000,
|
||||
"check": {
|
||||
"script": "/bin/check_redis -p 6000",
|
||||
"interval": "5s",
|
||||
"ttl": "20s"
|
||||
}
|
||||
"checks": [
|
||||
{
|
||||
"script": "/bin/check_redis -p 6000",
|
||||
"interval": "5s",
|
||||
"ttl": "20s"
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"id": "red1",
|
||||
|
@ -97,11 +104,13 @@ Multiple services definitions can be provided at once using the `services`
|
|||
],
|
||||
"address": "127.0.0.1",
|
||||
"port": 7000,
|
||||
"check": {
|
||||
"script": "/bin/check_redis -p 7000",
|
||||
"interval": "30s",
|
||||
"ttl": "60s"
|
||||
}
|
||||
"checks": [
|
||||
{
|
||||
"script": "/bin/check_redis -p 7000",
|
||||
"interval": "30s",
|
||||
"ttl": "60s"
|
||||
}
|
||||
]
|
||||
},
|
||||
...
|
||||
]
|
||||
|
|
Loading…
Reference in New Issue