mirror of https://github.com/status-im/consul.git
Merge pull request #2480 from hashicorp/b-atomic-writes
Atomic writes for persisting service/check state
This commit is contained in:
commit
83d2f36b54
|
@ -741,6 +741,7 @@ func (a *Agent) reapServices() {
|
||||||
// persistService saves a service definition to a JSON file in the data dir
|
// persistService saves a service definition to a JSON file in the data dir
|
||||||
func (a *Agent) persistService(service *structs.NodeService) error {
|
func (a *Agent) persistService(service *structs.NodeService) error {
|
||||||
svcPath := filepath.Join(a.config.DataDir, servicesDir, stringHash(service.ID))
|
svcPath := filepath.Join(a.config.DataDir, servicesDir, stringHash(service.ID))
|
||||||
|
|
||||||
wrapped := persistedService{
|
wrapped := persistedService{
|
||||||
Token: a.state.ServiceToken(service.ID),
|
Token: a.state.ServiceToken(service.ID),
|
||||||
Service: service,
|
Service: service,
|
||||||
|
@ -749,18 +750,8 @@ func (a *Agent) persistService(service *structs.NodeService) error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if err := os.MkdirAll(filepath.Dir(svcPath), 0700); err != nil {
|
|
||||||
return err
|
return writeFileAtomic(svcPath, encoded)
|
||||||
}
|
|
||||||
fh, err := os.OpenFile(svcPath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0600)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
defer fh.Close()
|
|
||||||
if _, err := fh.Write(encoded); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// purgeService removes a persisted service definition file from the data dir
|
// purgeService removes a persisted service definition file from the data dir
|
||||||
|
@ -787,18 +778,8 @@ func (a *Agent) persistCheck(check *structs.HealthCheck, chkType *CheckType) err
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if err := os.MkdirAll(filepath.Dir(checkPath), 0700); err != nil {
|
|
||||||
return err
|
return writeFileAtomic(checkPath, encoded)
|
||||||
}
|
|
||||||
fh, err := os.OpenFile(checkPath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0600)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
defer fh.Close()
|
|
||||||
if _, err := fh.Write(encoded); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// purgeCheck removes a persisted check definition file from the data dir
|
// purgeCheck removes a persisted check definition file from the data dir
|
||||||
|
@ -810,6 +791,29 @@ func (a *Agent) purgeCheck(checkID types.CheckID) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// writeFileAtomic writes the given contents to a temporary file in the same
|
||||||
|
// directory, does an fsync and then renames the file to its real path
|
||||||
|
func writeFileAtomic(path string, contents []byte) error {
|
||||||
|
tempPath := path + ".tmp"
|
||||||
|
if err := os.MkdirAll(filepath.Dir(path), 0700); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
fh, err := os.OpenFile(tempPath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0600)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if _, err := fh.Write(contents); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := fh.Sync(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := fh.Close(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return os.Rename(tempPath, path)
|
||||||
|
}
|
||||||
|
|
||||||
// AddService is used to add a service entry.
|
// AddService is used to add a service entry.
|
||||||
// This entry is persistent and the agent will make a best effort to
|
// This entry is persistent and the agent will make a best effort to
|
||||||
// ensure it is registered
|
// ensure it is registered
|
||||||
|
@ -1184,8 +1188,16 @@ func (a *Agent) persistCheckState(check *CheckTTL, status, output string) error
|
||||||
|
|
||||||
// Write the state to the file
|
// Write the state to the file
|
||||||
file := filepath.Join(dir, checkIDHash(check.CheckID))
|
file := filepath.Join(dir, checkIDHash(check.CheckID))
|
||||||
if err := ioutil.WriteFile(file, buf, 0600); err != nil {
|
|
||||||
return fmt.Errorf("failed writing file %q: %s", file, err)
|
// Create temp file in same dir, to make more likely atomic
|
||||||
|
tempFile := file + ".tmp"
|
||||||
|
|
||||||
|
// persistCheckState is called frequently, so don't use writeFileAtomic to avoid calling fsync here
|
||||||
|
if err := ioutil.WriteFile(tempFile, buf, 0600); err != nil {
|
||||||
|
return fmt.Errorf("failed writing temp file %q: %s", tempFile, err)
|
||||||
|
}
|
||||||
|
if err := os.Rename(tempFile, file); err != nil {
|
||||||
|
return fmt.Errorf("failed to rename temp file from %q to %q: %s", tempFile, file, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
@ -1206,7 +1218,8 @@ func (a *Agent) loadCheckState(check *structs.HealthCheck) error {
|
||||||
// Decode the state data
|
// Decode the state data
|
||||||
var p persistedCheckState
|
var p persistedCheckState
|
||||||
if err := json.Unmarshal(buf, &p); err != nil {
|
if err := json.Unmarshal(buf, &p); err != nil {
|
||||||
return fmt.Errorf("failed decoding check state: %s", err)
|
a.logger.Printf("[ERROR] agent: failed decoding check state: %s", err)
|
||||||
|
return a.purgeCheckState(check.CheckID)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check if the state has expired
|
// Check if the state has expired
|
||||||
|
|
Loading…
Reference in New Issue