agent: consolidate service loading code, better logging

This commit is contained in:
Ryan Uber 2015-01-07 21:24:47 -08:00
parent cfde9313de
commit 51fe9f32ff
1 changed files with 82 additions and 94 deletions

View File

@ -521,48 +521,6 @@ func (a *Agent) purgeService(serviceID string) error {
return nil
}
// restoreServices is used to load previously persisted service definitions
// into the agent during startup.
func (a *Agent) restoreServices() error {
svcDir := filepath.Join(a.config.DataDir, servicesDir)
if _, err := os.Stat(svcDir); os.IsNotExist(err) {
return nil
}
err := filepath.Walk(svcDir, func(path string, fi os.FileInfo, err error) error {
if err != nil {
return err
}
if fi.Name() == servicesDir {
return nil
}
fh, err := os.Open(filepath.Join(svcDir, fi.Name()))
if err != nil {
return err
}
content := make([]byte, fi.Size())
if _, err := fh.Read(content); err != nil {
return err
}
var svc *structs.NodeService
if err := json.Unmarshal(content, &svc); err != nil {
return err
}
if _, ok := a.state.services[svc.ID]; ok {
// Purge previously persisted service. This allows config to be
// preferred over services persisted from the API.
a.logger.Printf("[DEBUG] Service %s exists, not restoring", svc.ID)
return a.purgeService(svc.ID)
} else {
a.logger.Printf("[DEBUG] Restored service definition: %s", svc.ID)
return a.AddService(svc, nil, false)
}
})
return err
}
// persistCheck saves a check definition to the local agent's state directory
func (a *Agent) persistCheck(check *structs.HealthCheck, chkType *CheckType) error {
checkPath := filepath.Join(a.config.DataDir, checksDir, stringHash(check.CheckID))
@ -600,52 +558,6 @@ func (a *Agent) purgeCheck(checkID string) error {
return nil
}
// restoreChecks is used to load previously persisted health check definitions
// into the agent during startup.
func (a *Agent) restoreChecks() error {
checkDir := filepath.Join(a.config.DataDir, checksDir)
if _, err := os.Stat(checkDir); os.IsNotExist(err) {
return nil
}
err := filepath.Walk(checkDir, func(path string, fi os.FileInfo, err error) error {
if err != nil {
return err
}
if fi.Name() == checksDir {
return nil
}
fh, err := os.Open(filepath.Join(checkDir, fi.Name()))
if err != nil {
return err
}
content := make([]byte, fi.Size())
if _, err := fh.Read(content); err != nil {
return err
}
var p persistedCheck
if err := json.Unmarshal(content, &p); err != nil {
return err
}
if _, ok := a.state.checks[p.Check.CheckID]; ok {
// Purge previously persisted check. This allows config to be
// preferred over persisted checks from the API.
a.logger.Printf("[DEBUG] Check %s exists, not restoring", p.Check.CheckID)
return a.purgeCheck(p.Check.CheckID)
} else {
// Default check to critical to avoid placing potentially unhealthy
// services into the active pool
p.Check.Status = structs.HealthCritical
a.logger.Printf("[DEBUG] Restored health check: %s", p.Check.CheckID)
return a.AddCheck(p.Check, p.ChkType, false)
}
})
return err
}
// AddService is used to add a service entry.
// This entry is persistent and the agent will make a best effort to
// ensure it is registered
@ -908,12 +820,48 @@ func (a *Agent) loadServices(conf *Config) error {
}
// Load any persisted services
if err := a.restoreServices(); err != nil {
return fmt.Errorf("Failed restoring services: %s", err)
svcDir := filepath.Join(a.config.DataDir, servicesDir)
if _, err := os.Stat(svcDir); os.IsNotExist(err) {
return nil
}
err := filepath.Walk(svcDir, func(path string, fi os.FileInfo, err error) error {
if err != nil {
return err
}
if fi.Name() == servicesDir {
return nil
}
filePath := filepath.Join(svcDir, fi.Name())
fh, err := os.Open(filePath)
if err != nil {
return err
}
content := make([]byte, fi.Size())
if _, err := fh.Read(content); err != nil {
return err
}
var svc *structs.NodeService
if err := json.Unmarshal(content, &svc); err != nil {
return err
}
if _, ok := a.state.services[svc.ID]; ok {
// Purge previously persisted service. This allows config to be
// preferred over services persisted from the API.
a.logger.Printf("[DEBUG] Service %q exists, not restoring from %q",
svc.ID, filePath)
return a.purgeService(svc.ID)
} else {
a.logger.Printf("[DEBUG] Restored service definition %q from %q",
svc.ID, filePath)
return a.AddService(svc, nil, false)
}
})
return err
}
// unloadServices will deregister all services other than the 'consul' service
// known to the local agent.
@ -943,12 +891,52 @@ func (a *Agent) loadChecks(conf *Config) error {
}
// Load any persisted checks
if err := a.restoreChecks(); err != nil {
return fmt.Errorf("Failed restoring checks: %s", err)
checkDir := filepath.Join(a.config.DataDir, checksDir)
if _, err := os.Stat(checkDir); os.IsNotExist(err) {
return nil
}
err := filepath.Walk(checkDir, func(path string, fi os.FileInfo, err error) error {
if err != nil {
return err
}
if fi.Name() == checksDir {
return nil
}
filePath := filepath.Join(checkDir, fi.Name())
fh, err := os.Open(filePath)
if err != nil {
return err
}
content := make([]byte, fi.Size())
if _, err := fh.Read(content); err != nil {
return err
}
var p persistedCheck
if err := json.Unmarshal(content, &p); err != nil {
return err
}
if _, ok := a.state.checks[p.Check.CheckID]; ok {
// Purge previously persisted check. This allows config to be
// preferred over persisted checks from the API.
a.logger.Printf("[DEBUG] Check %q exists, not restoring from %q",
p.Check.CheckID, filePath)
return a.purgeCheck(p.Check.CheckID)
} else {
// Default check to critical to avoid placing potentially unhealthy
// services into the active pool
p.Check.Status = structs.HealthCritical
a.logger.Printf("[DEBUG] Restored health check %q from %q",
p.Check.CheckID, filePath)
return a.AddCheck(p.Check, p.ChkType, false)
}
})
return err
}
// unloadChecks will deregister all checks known to the local agent.
func (a *Agent) unloadChecks() error {