agent: prefer config over persisted services/checks (#497)

This commit is contained in:
Ryan Uber 2014-11-24 19:24:32 -08:00
parent f74d3dbd92
commit 9f9087badb
5 changed files with 193 additions and 53 deletions

View File

@ -141,7 +141,27 @@ func Create(config *Config, logOutput io.Writer) (*Agent, error) {
return nil, err
}
// Load any persisted services and services
// Register the services from config
for _, service := range config.Services {
ns := service.NodeService()
chkType := service.CheckType()
if err := agent.AddService(ns, chkType, false); err != nil {
return nil, fmt.Errorf(
"Failed to register service '%s': %v", service.Name, err)
}
}
// Register the checks from config
for _, check := range config.Checks {
health := check.HealthCheck(config.NodeName)
chkType := &check.CheckType
if err := agent.AddCheck(health, chkType, false); err != nil {
return nil, fmt.Errorf(
"Failed to register check '%s': %v %v", check.Name, err, check)
}
}
// Load any persisted services or checks
if err := agent.restoreServices(); err != nil {
return nil, err
}
@ -550,8 +570,15 @@ func (a *Agent) restoreServices() error {
return err
}
a.logger.Printf("[DEBUG] Restored service definition: %s", svc.ID)
return a.AddService(svc, nil)
if _, ok := a.state.services[svc.ID]; ok {
// Purge previously persisted service. This allows config to be
// preferred over services persisted from the API.
a.logger.Printf("[DEBUG] Service %s exists, not restoring", svc.ID)
return a.purgeService(svc.ID)
} else {
a.logger.Printf("[DEBUG] Restored service definition: %s", svc.ID)
return a.AddService(svc, nil, false)
}
})
return err
}
@ -617,12 +644,19 @@ func (a *Agent) restoreChecks() error {
return err
}
// Default check to critical to avoid placing potentially unhealthy
// services into the active pool
check.Status = structs.HealthCritical
if _, ok := a.state.checks[check.CheckID]; ok {
// Purge previously persisted check. This allows config to be
// preferred over persisted checks from the API.
a.logger.Printf("[DEBUG] Check %s exists, not restoring", check.CheckID)
return a.purgeCheck(check.CheckID)
} else {
// Default check to critical to avoid placing potentially unhealthy
// services into the active pool
check.Status = structs.HealthCritical
a.logger.Printf("[DEBUG] Restored health check: %s", check.CheckID)
return a.AddCheck(check, nil)
a.logger.Printf("[DEBUG] Restored health check: %s", check.CheckID)
return a.AddCheck(check, nil, false)
}
})
return err
}
@ -630,7 +664,7 @@ func (a *Agent) restoreChecks() error {
// AddService is used to add a service entry.
// This entry is persistent and the agent will make a best effort to
// ensure it is registered
func (a *Agent) AddService(service *structs.NodeService, chkType *CheckType) error {
func (a *Agent) AddService(service *structs.NodeService, chkType *CheckType, persist bool) error {
if service.Service == "" {
return fmt.Errorf("Service name missing")
}
@ -645,8 +679,10 @@ func (a *Agent) AddService(service *structs.NodeService, chkType *CheckType) err
a.state.AddService(service)
// Persist the service to a file
if err := a.persistService(service); err != nil {
return err
if persist {
if err := a.persistService(service); err != nil {
return err
}
}
// Create an associated health check
@ -660,7 +696,7 @@ func (a *Agent) AddService(service *structs.NodeService, chkType *CheckType) err
ServiceID: service.ID,
ServiceName: service.Service,
}
if err := a.AddCheck(check, chkType); err != nil {
if err := a.AddCheck(check, chkType, persist); err != nil {
return err
}
}
@ -694,7 +730,7 @@ func (a *Agent) RemoveService(serviceID string) error {
// This entry is persistent and the agent will make a best effort to
// ensure it is registered. The Check may include a CheckType which
// is used to automatically update the check status
func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *CheckType) error {
func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *CheckType, persist bool) error {
if check.CheckID == "" {
return fmt.Errorf("CheckID missing")
}
@ -747,7 +783,11 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *CheckType) error {
a.state.AddCheck(check)
// Persist the check
return a.persistCheck(check)
if persist {
return a.persistCheck(check)
}
return nil
}
// RemoveCheck is used to remove a health check.

View File

@ -97,7 +97,7 @@ func (s *HTTPServer) AgentRegisterCheck(resp http.ResponseWriter, req *http.Requ
}
// Add the check
return nil, s.agent.AddCheck(health, chkType)
return nil, s.agent.AddCheck(health, chkType, true)
}
func (s *HTTPServer) AgentDeregisterCheck(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
@ -169,7 +169,7 @@ func (s *HTTPServer) AgentRegisterService(resp http.ResponseWriter, req *http.Re
}
// Add the check
return nil, s.agent.AddService(ns, chkType)
return nil, s.agent.AddService(ns, chkType, true)
}
func (s *HTTPServer) AgentDeregisterService(resp http.ResponseWriter, req *http.Request) (interface{}, error) {

View File

@ -288,7 +288,7 @@ func TestHTTPAgentDeregisterCheck(t *testing.T) {
defer srv.agent.Shutdown()
chk := &structs.HealthCheck{Name: "test", CheckID: "test"}
if err := srv.agent.AddCheck(chk, nil); err != nil {
if err := srv.agent.AddCheck(chk, nil, false); err != nil {
t.Fatalf("err: %v", err)
}
@ -320,7 +320,7 @@ func TestHTTPAgentPassCheck(t *testing.T) {
chk := &structs.HealthCheck{Name: "test", CheckID: "test"}
chkType := &CheckType{TTL: 15 * time.Second}
if err := srv.agent.AddCheck(chk, chkType); err != nil {
if err := srv.agent.AddCheck(chk, chkType, false); err != nil {
t.Fatalf("err: %v", err)
}
@ -353,7 +353,7 @@ func TestHTTPAgentWarnCheck(t *testing.T) {
chk := &structs.HealthCheck{Name: "test", CheckID: "test"}
chkType := &CheckType{TTL: 15 * time.Second}
if err := srv.agent.AddCheck(chk, chkType); err != nil {
if err := srv.agent.AddCheck(chk, chkType, false); err != nil {
t.Fatalf("err: %v", err)
}
@ -386,7 +386,7 @@ func TestHTTPAgentFailCheck(t *testing.T) {
chk := &structs.HealthCheck{Name: "test", CheckID: "test"}
chkType := &CheckType{TTL: 15 * time.Second}
if err := srv.agent.AddCheck(chk, chkType); err != nil {
if err := srv.agent.AddCheck(chk, chkType, false); err != nil {
t.Fatalf("err: %v", err)
}
@ -465,7 +465,7 @@ func TestHTTPAgentDeregisterService(t *testing.T) {
ID: "test",
Service: "test",
}
if err := srv.agent.AddService(service, nil); err != nil {
if err := srv.agent.AddService(service, nil, false); err != nil {
t.Fatalf("err: %v", err)
}

View File

@ -8,6 +8,7 @@ import (
"io/ioutil"
"os"
"path/filepath"
"reflect"
"sync/atomic"
"testing"
"time"
@ -148,7 +149,7 @@ func TestAgent_AddService(t *testing.T) {
TTL: time.Minute,
Notes: "redis health check",
}
err := agent.AddService(srv, chk)
err := agent.AddService(srv, chk, false)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -195,7 +196,7 @@ func TestAgent_RemoveService(t *testing.T) {
Port: 8000,
}
chk := &CheckType{TTL: time.Minute}
if err := agent.AddService(srv, chk); err != nil {
if err := agent.AddService(srv, chk, false); err != nil {
t.Fatalf("err: %v", err)
}
@ -235,7 +236,7 @@ func TestAgent_AddCheck(t *testing.T) {
Script: "exit 0",
Interval: 15 * time.Second,
}
err := agent.AddCheck(health, chk)
err := agent.AddCheck(health, chk, false)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -266,7 +267,7 @@ func TestAgent_AddCheck_MinInterval(t *testing.T) {
Script: "exit 0",
Interval: time.Microsecond,
}
err := agent.AddCheck(health, chk)
err := agent.AddCheck(health, chk, false)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -304,7 +305,7 @@ func TestAgent_RemoveCheck(t *testing.T) {
Script: "exit 0",
Interval: 15 * time.Second,
}
err := agent.AddCheck(health, chk)
err := agent.AddCheck(health, chk, false)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -339,7 +340,7 @@ func TestAgent_UpdateCheck(t *testing.T) {
chk := &CheckType{
TTL: 15 * time.Second,
}
err := agent.AddCheck(health, chk)
err := agent.AddCheck(health, chk, false)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -395,11 +396,21 @@ func TestAgent_PersistService(t *testing.T) {
Port: 8000,
}
if err := agent.AddService(svc, nil); err != nil {
file := filepath.Join(agent.config.DataDir, servicesDir, svc.ID)
// Check is not persisted unless requested
if err := agent.AddService(svc, nil, false); err != nil {
t.Fatalf("err: %v", err)
}
if _, err := os.Stat(file); err == nil {
t.Fatalf("should not persist")
}
// Persists to file if requested
if err := agent.AddService(svc, nil, true); err != nil {
t.Fatalf("err: %v", err)
}
file := filepath.Join(agent.config.DataDir, servicesDir, svc.ID)
if _, err := os.Stat(file); err != nil {
t.Fatalf("err: %s", err)
}
@ -437,6 +448,53 @@ func TestAgent_PersistService(t *testing.T) {
}
}
func TestAgent_PurgeServiceOnDuplicate(t *testing.T) {
config := nextConfig()
dir, agent := makeAgent(t, config)
defer os.RemoveAll(dir)
svc1 := &structs.NodeService{
ID: "redis",
Service: "redis",
Tags: []string{"foo"},
Port: 8000,
}
// First persist the service
if err := agent.AddService(svc1, nil, true); err != nil {
t.Fatalf("err: %v", err)
}
agent.Shutdown()
// Try bringing the agent back up with the service already
// existing in the config
svc2 := &ServiceDefinition{
ID: "redis",
Name: "redis",
Tags: []string{"bar"},
Port: 9000,
}
config.Services = []*ServiceDefinition{svc2}
agent2, err := Create(config, nil)
if err != nil {
t.Fatalf("err: %s", err)
}
defer agent2.Shutdown()
file := filepath.Join(agent.config.DataDir, servicesDir, svc1.ID)
if _, err := os.Stat(file); err == nil {
t.Fatalf("should have removed persisted service")
}
result, ok := agent2.state.services[svc2.ID]
if !ok {
t.Fatalf("missing service registration")
}
if !reflect.DeepEqual(result.Tags, svc2.Tags) || result.Port != svc2.Port {
t.Fatalf("bad: %#v", result)
}
}
func TestAgent_PersistCheck(t *testing.T) {
config := nextConfig()
dir, agent := makeAgent(t, config)
@ -451,11 +509,21 @@ func TestAgent_PersistCheck(t *testing.T) {
ServiceName: "redis",
}
if err := agent.AddCheck(check, nil); err != nil {
file := filepath.Join(agent.config.DataDir, checksDir, check.CheckID)
// Not persisted if not requested
if err := agent.AddCheck(check, nil, false); err != nil {
t.Fatalf("err: %v", err)
}
if _, err := os.Stat(file); err == nil {
t.Fatalf("should not persist")
}
// Should persist if requested
if err := agent.AddCheck(check, nil, true); err != nil {
t.Fatalf("err: %v", err)
}
file := filepath.Join(agent.config.DataDir, checksDir, check.CheckID)
if _, err := os.Stat(file); err != nil {
t.Fatalf("err: %s", err)
}
@ -496,3 +564,55 @@ func TestAgent_PersistCheck(t *testing.T) {
t.Fatalf("err: %s", err)
}
}
func TestAgent_PurgeCheckOnDuplicate(t *testing.T) {
config := nextConfig()
dir, agent := makeAgent(t, config)
defer os.RemoveAll(dir)
check1 := &structs.HealthCheck{
Node: config.NodeName,
CheckID: "service:redis1",
Name: "redischeck",
Status: structs.HealthPassing,
ServiceID: "redis",
ServiceName: "redis",
}
// First persist the check
if err := agent.AddCheck(check1, nil, true); err != nil {
t.Fatalf("err: %v", err)
}
agent.Shutdown()
// Start again with the check registered in config
check2 := &CheckDefinition{
ID: "service:redis1",
Name: "redischeck",
Notes: "my cool notes",
CheckType: CheckType{
Script: "/bin/check-redis.py",
Interval: 30 * time.Second,
},
}
config.Checks = []*CheckDefinition{check2}
agent2, err := Create(config, nil)
if err != nil {
t.Fatalf("err: %s", err)
}
defer agent2.Shutdown()
file := filepath.Join(agent.config.DataDir, checksDir, check1.CheckID)
if _, err := os.Stat(file); err == nil {
t.Fatalf("should have removed persisted check")
}
result, ok := agent2.state.checks[check2.ID]
if !ok {
t.Fatalf("missing check registration")
}
expected := check2.HealthCheck(config.NodeName)
if !reflect.DeepEqual(expected, result) {
t.Fatalf("bad: %#v", result)
}
}

View File

@ -574,26 +574,6 @@ func (c *Command) Run(args []string) int {
return 1
}
// Register the services
for _, service := range config.Services {
ns := service.NodeService()
chkType := service.CheckType()
if err := c.agent.AddService(ns, chkType); err != nil {
c.Ui.Error(fmt.Sprintf("Failed to register service '%s': %v", service.Name, err))
return 1
}
}
// Register the checks
for _, check := range config.Checks {
health := check.HealthCheck(config.NodeName)
chkType := &check.CheckType
if err := c.agent.AddCheck(health, chkType); err != nil {
c.Ui.Error(fmt.Sprintf("Failed to register check '%s': %v %v", check.Name, err, check))
return 1
}
}
// Get the new client http listener addr
httpAddr, err := config.ClientListenerAddr(config.Addresses.HTTP, config.Ports.HTTP)
if err != nil {
@ -758,7 +738,7 @@ func (c *Command) handleReload(config *Config) *Config {
for _, service := range newConf.Services {
ns := service.NodeService()
chkType := service.CheckType()
if err := c.agent.AddService(ns, chkType); err != nil {
if err := c.agent.AddService(ns, chkType, false); err != nil {
c.Ui.Error(fmt.Sprintf("Failed to register service '%s': %v", service.Name, err))
}
}
@ -767,7 +747,7 @@ func (c *Command) handleReload(config *Config) *Config {
for _, check := range newConf.Checks {
health := check.HealthCheck(config.NodeName)
chkType := &check.CheckType
if err := c.agent.AddCheck(health, chkType); err != nil {
if err := c.agent.AddCheck(health, chkType, false); err != nil {
c.Ui.Error(fmt.Sprintf("Failed to register check '%s': %v %v", check.Name, err, check))
}
}