Merge pull request #497 from hashicorp/f-persist

Persist locally registered services and checks
This commit is contained in:
Armon Dadgar 2014-11-24 11:14:08 -08:00
commit f74d3dbd92
2 changed files with 285 additions and 2 deletions

View File

@ -1,6 +1,7 @@
package agent package agent
import ( import (
"encoding/json"
"fmt" "fmt"
"io" "io"
"log" "log"
@ -15,6 +16,14 @@ import (
"github.com/hashicorp/serf/serf" "github.com/hashicorp/serf/serf"
) )
const (
// Path to save agent service definitions
servicesDir = "services"
// Path to save local agent checks
checksDir = "checks"
)
/* /*
The agent is the long running process that is run on every machine. The agent is the long running process that is run on every machine.
It exposes an RPC interface that is used by the CLI to control the It exposes an RPC interface that is used by the CLI to control the
@ -132,6 +141,14 @@ func Create(config *Config, logOutput io.Writer) (*Agent, error) {
return nil, err return nil, err
} }
// Load any persisted services and services
if err := agent.restoreServices(); err != nil {
return nil, err
}
if err := agent.restoreChecks(); err != nil {
return nil, err
}
// Start handling events // Start handling events
go agent.handleEvents() go agent.handleEvents()
@ -472,6 +489,144 @@ func (a *Agent) ResumeSync() {
a.state.Resume() a.state.Resume()
} }
// persistService saves a service definition to a JSON file in the data dir
func (a *Agent) persistService(service *structs.NodeService) error {
svcPath := filepath.Join(a.config.DataDir, servicesDir, service.ID)
if _, err := os.Stat(svcPath); os.IsNotExist(err) {
encoded, err := json.Marshal(service)
if err != nil {
return nil
}
if err := os.MkdirAll(filepath.Dir(svcPath), 0700); err != nil {
return err
}
fh, err := os.OpenFile(svcPath, os.O_CREATE|os.O_WRONLY, 0600)
if err != nil {
return err
}
defer fh.Close()
if _, err := fh.Write(encoded); err != nil {
return err
}
}
return nil
}
// purgeService removes a persisted service definition file from the data dir
func (a *Agent) purgeService(serviceID string) error {
svcPath := filepath.Join(a.config.DataDir, servicesDir, serviceID)
if _, err := os.Stat(svcPath); err == nil {
return os.Remove(svcPath)
}
return nil
}
// restoreServices is used to load previously persisted service definitions
// into the agent during startup.
func (a *Agent) restoreServices() error {
svcDir := filepath.Join(a.config.DataDir, servicesDir)
if _, err := os.Stat(svcDir); os.IsNotExist(err) {
return nil
}
err := filepath.Walk(svcDir, func(path string, fi os.FileInfo, err error) error {
if err != nil {
return err
}
if fi.Name() == servicesDir {
return nil
}
fh, err := os.Open(filepath.Join(svcDir, fi.Name()))
if err != nil {
return err
}
content := make([]byte, fi.Size())
if _, err := fh.Read(content); err != nil {
return err
}
var svc *structs.NodeService
if err := json.Unmarshal(content, &svc); err != nil {
return err
}
a.logger.Printf("[DEBUG] Restored service definition: %s", svc.ID)
return a.AddService(svc, nil)
})
return err
}
// persistCheck saves a check definition to the local agent's state directory
func (a *Agent) persistCheck(check *structs.HealthCheck) error {
checkPath := filepath.Join(a.config.DataDir, checksDir, check.CheckID)
if _, err := os.Stat(checkPath); os.IsNotExist(err) {
encoded, err := json.Marshal(check)
if err != nil {
return nil
}
if err := os.MkdirAll(filepath.Dir(checkPath), 0700); err != nil {
return err
}
fh, err := os.OpenFile(checkPath, os.O_CREATE|os.O_WRONLY, 0600)
if err != nil {
return err
}
defer fh.Close()
if _, err := fh.Write(encoded); err != nil {
return err
}
}
return nil
}
// purgeCheck removes a persisted check definition file from the data dir
func (a *Agent) purgeCheck(checkID string) error {
checkPath := filepath.Join(a.config.DataDir, checksDir, checkID)
if _, err := os.Stat(checkPath); err == nil {
return os.Remove(checkPath)
}
return nil
}
// restoreChecks is used to load previously persisted health check definitions
// into the agent during startup.
func (a *Agent) restoreChecks() error {
checkDir := filepath.Join(a.config.DataDir, checksDir)
if _, err := os.Stat(checkDir); os.IsNotExist(err) {
return nil
}
err := filepath.Walk(checkDir, func(path string, fi os.FileInfo, err error) error {
if err != nil {
return err
}
if fi.Name() == checksDir {
return nil
}
fh, err := os.Open(filepath.Join(checkDir, fi.Name()))
if err != nil {
return err
}
content := make([]byte, fi.Size())
if _, err := fh.Read(content); err != nil {
return err
}
var check *structs.HealthCheck
if err := json.Unmarshal(content, &check); err != nil {
return err
}
// Default check to critical to avoid placing potentially unhealthy
// services into the active pool
check.Status = structs.HealthCritical
a.logger.Printf("[DEBUG] Restored health check: %s", check.CheckID)
return a.AddCheck(check, nil)
})
return err
}
// AddService is used to add a service entry. // AddService is used to add a service entry.
// This entry is persistent and the agent will make a best effort to // This entry is persistent and the agent will make a best effort to
// ensure it is registered // ensure it is registered
@ -489,6 +644,11 @@ func (a *Agent) AddService(service *structs.NodeService, chkType *CheckType) err
// Add the service // Add the service
a.state.AddService(service) a.state.AddService(service)
// Persist the service to a file
if err := a.persistService(service); err != nil {
return err
}
// Create an associated health check // Create an associated health check
if chkType != nil { if chkType != nil {
check := &structs.HealthCheck{ check := &structs.HealthCheck{
@ -520,6 +680,11 @@ func (a *Agent) RemoveService(serviceID string) error {
// Remove service immeidately // Remove service immeidately
a.state.RemoveService(serviceID) a.state.RemoveService(serviceID)
// Remove the service from the data dir
if err := a.purgeService(serviceID); err != nil {
return err
}
// Deregister any associated health checks // Deregister any associated health checks
checkID := fmt.Sprintf("service:%s", serviceID) checkID := fmt.Sprintf("service:%s", serviceID)
return a.RemoveCheck(checkID) return a.RemoveCheck(checkID)
@ -580,7 +745,9 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *CheckType) error {
// Add to the local state for anti-entropy // Add to the local state for anti-entropy
a.state.AddCheck(check) a.state.AddCheck(check)
return nil
// Persist the check
return a.persistCheck(check)
} }
// RemoveCheck is used to remove a health check. // RemoveCheck is used to remove a health check.
@ -601,7 +768,7 @@ func (a *Agent) RemoveCheck(checkID string) error {
check.Stop() check.Stop()
delete(a.checkTTLs, checkID) delete(a.checkTTLs, checkID)
} }
return nil return a.purgeCheck(checkID)
} }
// UpdateCheck is used to update the status of a check. // UpdateCheck is used to update the status of a check.

View File

@ -1,6 +1,8 @@
package agent package agent
import ( import (
"bytes"
"encoding/json"
"fmt" "fmt"
"io" "io"
"io/ioutil" "io/ioutil"
@ -380,3 +382,117 @@ func TestAgent_ConsulService(t *testing.T) {
t.Fatalf("%s service should be in sync", consul.ConsulServiceID) t.Fatalf("%s service should be in sync", consul.ConsulServiceID)
} }
} }
func TestAgent_PersistService(t *testing.T) {
config := nextConfig()
dir, agent := makeAgent(t, config)
defer os.RemoveAll(dir)
svc := &structs.NodeService{
ID: "redis",
Service: "redis",
Tags: []string{"foo"},
Port: 8000,
}
if err := agent.AddService(svc, nil); err != nil {
t.Fatalf("err: %v", err)
}
file := filepath.Join(agent.config.DataDir, servicesDir, svc.ID)
if _, err := os.Stat(file); err != nil {
t.Fatalf("err: %s", err)
}
expected, err := json.Marshal(svc)
if err != nil {
t.Fatalf("err: %s", err)
}
content, err := ioutil.ReadFile(file)
if err != nil {
t.Fatalf("err: %s", err)
}
if !bytes.Equal(expected, content) {
t.Fatalf("bad: %s", string(content))
}
agent.Shutdown()
// Should load it back during later start
agent2, err := Create(config, nil)
if err != nil {
t.Fatalf("err: %s", err)
}
defer agent2.Shutdown()
if _, ok := agent2.state.services[svc.ID]; !ok {
t.Fatalf("bad: %#v", agent2.state.services)
}
// Should remove the service file
if err := agent2.RemoveService(svc.ID); err != nil {
t.Fatalf("err: %s", err)
}
if _, err := os.Stat(file); !os.IsNotExist(err) {
t.Fatalf("err: %s", err)
}
}
func TestAgent_PersistCheck(t *testing.T) {
config := nextConfig()
dir, agent := makeAgent(t, config)
defer os.RemoveAll(dir)
check := &structs.HealthCheck{
Node: config.NodeName,
CheckID: "service:redis1",
Name: "redischeck",
Status: structs.HealthPassing,
ServiceID: "redis",
ServiceName: "redis",
}
if err := agent.AddCheck(check, nil); err != nil {
t.Fatalf("err: %v", err)
}
file := filepath.Join(agent.config.DataDir, checksDir, check.CheckID)
if _, err := os.Stat(file); err != nil {
t.Fatalf("err: %s", err)
}
expected, err := json.Marshal(check)
if err != nil {
t.Fatalf("err: %s", err)
}
content, err := ioutil.ReadFile(file)
if err != nil {
t.Fatalf("err: %s", err)
}
if !bytes.Equal(expected, content) {
t.Fatalf("bad: %s", string(content))
}
agent.Shutdown()
// Should load it back during later start
agent2, err := Create(config, nil)
if err != nil {
t.Fatalf("err: %s", err)
}
defer agent2.Shutdown()
result, ok := agent2.state.checks[check.CheckID]
if !ok {
t.Fatalf("bad: %#v", agent2.state.checks)
}
if result.Status != structs.HealthCritical {
t.Fatalf("bad: %#v", result)
}
// Should remove the service file
if err := agent2.RemoveCheck(check.CheckID); err != nil {
t.Fatalf("err: %s", err)
}
if _, err := os.Stat(file); !os.IsNotExist(err) {
t.Fatalf("err: %s", err)
}
}