agent: refactor loadChecks/loadServices, fixes a few minor bugs

This commit is contained in:
Ryan Uber 2015-06-04 14:33:30 -07:00
parent 5d74523d04
commit ebe57a1f65
1 changed files with 78 additions and 60 deletions

View File

@ -4,6 +4,7 @@ import (
"encoding/json"
"fmt"
"io"
"io/ioutil"
"log"
"net"
"os"
@ -955,54 +956,61 @@ func (a *Agent) loadServices(conf *Config) error {
// Load any persisted services
svcDir := filepath.Join(a.config.DataDir, servicesDir)
if _, err := os.Stat(svcDir); os.IsNotExist(err) {
return nil
}
err := filepath.Walk(svcDir, func(path string, fi os.FileInfo, err error) error {
if err != nil {
return err
}
if fi.Name() == servicesDir {
files, err := ioutil.ReadDir(svcDir)
if err != nil {
if os.IsNotExist(err) {
return nil
}
filePath := filepath.Join(svcDir, fi.Name())
fh, err := os.Open(filePath)
return fmt.Errorf("Failed reading services dir %q: %s", svcDir, err)
}
for _, fi := range files {
// Skip all dirs
if fi.IsDir() {
continue
}
// Open the file for reading
file := filepath.Join(svcDir, fi.Name())
fh, err := os.Open(file)
if err != nil {
return err
}
content := make([]byte, fi.Size())
if _, err := fh.Read(content); err != nil {
return err
return fmt.Errorf("failed opening service file %q: %s", file, err)
}
var wrapped *persistedService
var token string
var svc *structs.NodeService
if err := json.Unmarshal(content, &wrapped); err != nil {
// Read the contents into a buffer
buf, err := ioutil.ReadAll(fh)
fh.Close()
if err != nil {
return fmt.Errorf("failed reading service file %q: %s", file, err)
}
// Try decoding the service definition
var p persistedService
if err := json.Unmarshal(buf, &p); err != nil {
// Backwards-compatibility for pre-0.5.1 persisted services
if err := json.Unmarshal(content, &svc); err != nil {
return fmt.Errorf("failed decoding service from %s: %s", filePath, err)
if err := json.Unmarshal(buf, &p.Service); err != nil {
return fmt.Errorf("failed decoding service file %q: %s", file, err)
}
} else {
svc = wrapped.Service
token = wrapped.Token
}
serviceID := p.Service.ID
if _, ok := a.state.services[svc.ID]; ok {
if _, ok := a.state.services[serviceID]; ok {
// Purge previously persisted service. This allows config to be
// preferred over services persisted from the API.
a.logger.Printf("[DEBUG] agent: service %q exists, not restoring from %q",
svc.ID, filePath)
return a.purgeService(svc.ID)
serviceID, file)
if err := a.purgeService(serviceID); err != nil {
return fmt.Errorf("failed purging service %q: %s", serviceID, err)
}
} else {
a.logger.Printf("[DEBUG] agent: restored service definition %q from %q",
svc.ID, filePath)
return a.AddService(svc, nil, false, token)
serviceID, file)
if err := a.AddService(p.Service, nil, false, p.Token); err != nil {
return fmt.Errorf("failed adding service %q: %s", serviceID, err)
}
}
})
}
return err
return nil
}
// unloadServices will deregister all services other than the 'consul' service
@ -1034,38 +1042,48 @@ func (a *Agent) loadChecks(conf *Config) error {
// Load any persisted checks
checkDir := filepath.Join(a.config.DataDir, checksDir)
if _, err := os.Stat(checkDir); os.IsNotExist(err) {
return nil
}
err := filepath.Walk(checkDir, func(path string, fi os.FileInfo, err error) error {
if err != nil {
return err
}
if fi.Name() == checksDir {
files, err := ioutil.ReadDir(checkDir)
if err != nil {
if os.IsNotExist(err) {
return nil
}
filePath := filepath.Join(checkDir, fi.Name())
fh, err := os.Open(filePath)
return fmt.Errorf("Failed reading checks dir %q: %s", checkDir, err)
}
for _, fi := range files {
// Ignore dirs - we only care about the check definition files
if fi.IsDir() {
continue
}
// Open the file for reading
file := filepath.Join(checkDir, fi.Name())
fh, err := os.Open(file)
if err != nil {
return err
}
content := make([]byte, fi.Size())
if _, err := fh.Read(content); err != nil {
return err
return fmt.Errorf("Failed opening check file %q: %s", file, err)
}
// Read the contents into a buffer
buf, err := ioutil.ReadAll(fh)
fh.Close()
if err != nil {
return fmt.Errorf("failed reading check file %q: %s", file, err)
}
// Decode the check
var p persistedCheck
if err := json.Unmarshal(content, &p); err != nil {
return err
if err := json.Unmarshal(buf, &p); err != nil {
return fmt.Errorf("Failed decoding check file %q: %s", file, err)
}
checkID := p.Check.CheckID
if _, ok := a.state.checks[p.Check.CheckID]; ok {
if _, ok := a.state.checks[checkID]; ok {
// Purge previously persisted check. This allows config to be
// preferred over persisted checks from the API.
a.logger.Printf("[DEBUG] agent: check %q exists, not restoring from %q",
p.Check.CheckID, filePath)
return a.purgeCheck(p.Check.CheckID)
checkID, file)
if err := a.purgeCheck(checkID); err != nil {
return fmt.Errorf("Failed purging check %q: %s", checkID, err)
}
} else {
// Default check to critical to avoid placing potentially unhealthy
// services into the active pool
@ -1074,17 +1092,17 @@ func (a *Agent) loadChecks(conf *Config) error {
if err := a.AddCheck(p.Check, p.ChkType, false, p.Token); err != nil {
// Purge the check if it is unable to be restored.
a.logger.Printf("[WARN] agent: Failed to restore check %q: %s",
p.Check.CheckID, err)
return a.purgeCheck(p.Check.CheckID)
checkID, err)
if err := a.purgeCheck(checkID); err != nil {
return fmt.Errorf("Failed purging check %q: %s", checkID, err)
}
}
a.logger.Printf("[DEBUG] agent: restored health check %q from %q",
p.Check.CheckID, filePath)
return nil
p.Check.CheckID, file)
}
})
}
return err
return nil
}
// unloadChecks will deregister all checks known to the local agent.