agent: allow config reload to modify checks/services persistence

This change consolidates loading services and checks from both config
and persisted state into methods on the agent. As part of this, we
introduce optional persistence when calling RemoveCheck/RemoveService.

Fixes a bug where config reloads would kill persisted services/checks.
Also fixes an edge case:

1. A service or check is registered via the HTTP API
2. A new service or check definition with the same ID is added to config
3. Config is reloaded

The desired behavior (which this implements) is:

1. All services and checks deregistered in memory
2. All services and checks in config are registered first
3. All persisted checks are restored using the same logic as the agent
   start sequence, which prioritizes config over persisted, and removes
   any persistence files if new config counterparts are present.
This commit is contained in:
Ryan Uber 2014-11-25 23:58:02 -08:00
parent ab92a900d6
commit b7587cac42
4 changed files with 143 additions and 69 deletions

View File

@ -141,31 +141,11 @@ func Create(config *Config, logOutput io.Writer) (*Agent, error) {
return nil, err
}
// Register the services from config
for _, service := range config.Services {
ns := service.NodeService()
chkType := service.CheckType()
if err := agent.AddService(ns, chkType, false); err != nil {
return nil, fmt.Errorf(
"Failed to register service '%s': %v", service.Name, err)
}
}
// Register the checks from config
for _, check := range config.Checks {
health := check.HealthCheck(config.NodeName)
chkType := &check.CheckType
if err := agent.AddCheck(health, chkType, false); err != nil {
return nil, fmt.Errorf(
"Failed to register check '%s': %v %v", check.Name, err, check)
}
}
// Load any persisted services or checks
if err := agent.restoreServices(); err != nil {
// Load checks/services
if err := agent.reloadServices(config); err != nil {
return nil, err
}
if err := agent.restoreChecks(); err != nil {
if err := agent.reloadChecks(config); err != nil {
return nil, err
}
@ -705,7 +685,7 @@ func (a *Agent) AddService(service *structs.NodeService, chkType *CheckType, per
// RemoveService is used to remove a service entry.
// The agent will make a best effort to ensure it is deregistered
func (a *Agent) RemoveService(serviceID string) error {
func (a *Agent) RemoveService(serviceID string, persist bool) error {
// Protect "consul" service from deletion by a user
if a.server != nil && serviceID == consul.ConsulServiceID {
return fmt.Errorf(
@ -717,13 +697,15 @@ func (a *Agent) RemoveService(serviceID string) error {
a.state.RemoveService(serviceID)
// Remove the service from the data dir
if persist {
if err := a.purgeService(serviceID); err != nil {
return err
}
}
// Deregister any associated health checks
checkID := fmt.Sprintf("service:%s", serviceID)
return a.RemoveCheck(checkID)
return a.RemoveCheck(checkID, persist)
}
// AddCheck is used to add a health check to the agent.
@ -792,7 +774,7 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *CheckType, persist
// RemoveCheck is used to remove a health check.
// The agent will make a best effort to ensure it is deregistered
func (a *Agent) RemoveCheck(checkID string) error {
func (a *Agent) RemoveCheck(checkID string, persist bool) error {
// Add to the local state for anti-entropy
a.state.RemoveCheck(checkID)
@ -808,7 +790,10 @@ func (a *Agent) RemoveCheck(checkID string) error {
check.Stop()
delete(a.checkTTLs, checkID)
}
if persist {
return a.purgeCheck(checkID)
}
return nil
}
// UpdateCheck is used to update the status of a check.
@ -904,3 +889,58 @@ func (a *Agent) deletePid() error {
}
return nil
}
// reloadServices reloads all known services from config and state. It is used
// at initial agent startup as well as during config reloads.
func (a *Agent) reloadServices(conf *Config) error {
for _, service := range a.state.Services() {
if service.ID == consul.ConsulServiceID {
continue
}
if err := a.RemoveService(service.ID, false); err != nil {
return fmt.Errorf("Failed deregistering service '%s': %v", service.ID, err)
}
}
// Register the services from config
for _, service := range conf.Services {
ns := service.NodeService()
chkType := service.CheckType()
if err := a.AddService(ns, chkType, false); err != nil {
return fmt.Errorf("Failed to register service '%s': %v", service.ID, err)
}
}
// Load any persisted services
if err := a.restoreServices(); err != nil {
return fmt.Errorf("Failed restoring services: %s", err)
}
return nil
}
// reloadChecks reloads all known checks from config and state. It can be used
// during initial agent start or for config reloads.
func (a *Agent) reloadChecks(conf *Config) error {
for _, check := range a.state.Checks() {
if err := a.RemoveCheck(check.CheckID, false); err != nil {
return fmt.Errorf("Failed deregistering check '%s': %s", check.CheckID, err)
}
}
// Register the checks from config
for _, check := range conf.Checks {
health := check.HealthCheck(conf.NodeName)
chkType := &check.CheckType
if err := a.AddCheck(health, chkType, false); err != nil {
return fmt.Errorf("Failed to register check '%s': %v %v", check.Name, err, check)
}
}
// Load any persisted checks
if err := a.restoreChecks(); err != nil {
return fmt.Errorf("Failed restoring checks: %s", err)
}
return nil
}

View File

@ -102,7 +102,7 @@ func (s *HTTPServer) AgentRegisterCheck(resp http.ResponseWriter, req *http.Requ
func (s *HTTPServer) AgentDeregisterCheck(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
checkID := strings.TrimPrefix(req.URL.Path, "/v1/agent/check/deregister/")
return nil, s.agent.RemoveCheck(checkID)
return nil, s.agent.RemoveCheck(checkID, true)
}
func (s *HTTPServer) AgentCheckPass(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
@ -174,5 +174,5 @@ func (s *HTTPServer) AgentRegisterService(resp http.ResponseWriter, req *http.Re
func (s *HTTPServer) AgentDeregisterService(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
serviceID := strings.TrimPrefix(req.URL.Path, "/v1/agent/service/deregister/")
return nil, s.agent.RemoveService(serviceID)
return nil, s.agent.RemoveService(serviceID, true)
}

View File

@ -181,12 +181,12 @@ func TestAgent_RemoveService(t *testing.T) {
defer agent.Shutdown()
// Remove a service that doesn't exist
if err := agent.RemoveService("redis"); err != nil {
if err := agent.RemoveService("redis", false); err != nil {
t.Fatalf("err: %v", err)
}
// Remove the consul service
if err := agent.RemoveService("consul"); err == nil {
if err := agent.RemoveService("consul", false); err == nil {
t.Fatalf("should have errored")
}
@ -201,7 +201,7 @@ func TestAgent_RemoveService(t *testing.T) {
}
// Remove the service
if err := agent.RemoveService("redis"); err != nil {
if err := agent.RemoveService("redis", false); err != nil {
t.Fatalf("err: %v", err)
}
@ -291,7 +291,7 @@ func TestAgent_RemoveCheck(t *testing.T) {
defer agent.Shutdown()
// Remove check that doesn't exist
if err := agent.RemoveCheck("mem"); err != nil {
if err := agent.RemoveCheck("mem", false); err != nil {
t.Fatalf("err: %v", err)
}
@ -311,7 +311,7 @@ func TestAgent_RemoveCheck(t *testing.T) {
}
// Remove check
if err := agent.RemoveCheck("mem"); err != nil {
if err := agent.RemoveCheck("mem", false); err != nil {
t.Fatalf("err: %v", err)
}
@ -438,13 +438,39 @@ func TestAgent_PersistService(t *testing.T) {
if _, ok := agent2.state.services[svc.ID]; !ok {
t.Fatalf("bad: %#v", agent2.state.services)
}
}
// Should remove the service file
if err := agent2.RemoveService(svc.ID); err != nil {
func TestAgent_PurgeService(t *testing.T) {
config := nextConfig()
dir, agent := makeAgent(t, config)
defer os.RemoveAll(dir)
svc := &structs.NodeService{
ID: "redis",
Service: "redis",
Tags: []string{"foo"},
Port: 8000,
}
file := filepath.Join(agent.config.DataDir, servicesDir, svc.ID)
if err := agent.AddService(svc, nil, true); err != nil {
t.Fatalf("err: %v", err)
}
// Not removed
if err := agent.RemoveService(svc.ID, false); err != nil {
t.Fatalf("err: %s", err)
}
if _, err := os.Stat(file); err != nil {
t.Fatalf("err: %s", err)
}
// Removed
if err := agent.RemoveService(svc.ID, true); err != nil {
t.Fatalf("err: %s", err)
}
if _, err := os.Stat(file); !os.IsNotExist(err) {
t.Fatalf("err: %s", err)
t.Fatalf("bad: %#v", err)
}
}
@ -555,13 +581,41 @@ func TestAgent_PersistCheck(t *testing.T) {
if result.Status != structs.HealthCritical {
t.Fatalf("bad: %#v", result)
}
}
// Should remove the service file
if err := agent2.RemoveCheck(check.CheckID); err != nil {
func TestAgent_PurgeCheck(t *testing.T) {
config := nextConfig()
dir, agent := makeAgent(t, config)
defer os.RemoveAll(dir)
check := &structs.HealthCheck{
Node: config.NodeName,
CheckID: "service:redis1",
Name: "redischeck",
Status: structs.HealthPassing,
ServiceID: "redis",
ServiceName: "redis",
}
file := filepath.Join(agent.config.DataDir, checksDir, check.CheckID)
if err := agent.AddCheck(check, nil, true); err != nil {
t.Fatalf("err: %v", err)
}
// Not removed
if err := agent.RemoveCheck(check.CheckID, false); err != nil {
t.Fatalf("err: %s", err)
}
if _, err := os.Stat(file); err != nil {
t.Fatalf("err: %s", err)
}
// Removed
if err := agent.RemoveCheck(check.CheckID, true); err != nil {
t.Fatalf("err: %s", err)
}
if _, err := os.Stat(file); !os.IsNotExist(err) {
t.Fatalf("err: %s", err)
t.Fatalf("bad: %#v", err)
}
}

View File

@ -722,34 +722,14 @@ func (c *Command) handleReload(config *Config) *Config {
c.agent.PauseSync()
defer c.agent.ResumeSync()
// Deregister the old services
for _, service := range config.Services {
ns := service.NodeService()
c.agent.RemoveService(ns.ID)
}
// Deregister the old checks
for _, check := range config.Checks {
health := check.HealthCheck(config.NodeName)
c.agent.RemoveCheck(health.CheckID)
}
// Register the services
for _, service := range newConf.Services {
ns := service.NodeService()
chkType := service.CheckType()
if err := c.agent.AddService(ns, chkType, false); err != nil {
c.Ui.Error(fmt.Sprintf("Failed to register service '%s': %v", service.Name, err))
}
}
// Register the checks
for _, check := range newConf.Checks {
health := check.HealthCheck(config.NodeName)
chkType := &check.CheckType
if err := c.agent.AddCheck(health, chkType, false); err != nil {
c.Ui.Error(fmt.Sprintf("Failed to register check '%s': %v %v", check.Name, err, check))
// Reload services and check definitions
if err := c.agent.reloadServices(newConf); err != nil {
c.Ui.Error(fmt.Sprintf("Failed reloading services: %s", err))
return nil
}
if err := c.agent.reloadChecks(newConf); err != nil {
c.Ui.Error(fmt.Sprintf("Failed reloading checks: %s", err))
return nil
}
// Get the new client listener addr