mirror of https://github.com/status-im/consul.git
agent: configuration reload preserves check's statuses for services (#7345)
This fixes issue #7318 Between versions 1.5.2 and 1.5.3, a regression has been introduced regarding health of services. A patch #6144 had been issued for HealthChecks of nodes, but not for healthchecks of services. What happened when a reload was: 1. save all healthcheck statuses 2. cleanup everything 3. add new services with healthchecks In step 3, the state of healthchecks was taken into account locally, so at step 3, but since we cleaned up at step 2, state was lost. This PR introduces the snap parameter, so step 3 can use information from step 1
This commit is contained in:
parent
4137d06f9f
commit
864f7efffa
|
@ -478,7 +478,7 @@ func (a *Agent) Start() error {
|
||||||
a.serviceManager.Start()
|
a.serviceManager.Start()
|
||||||
|
|
||||||
// Load checks/services/metadata.
|
// Load checks/services/metadata.
|
||||||
if err := a.loadServices(c); err != nil {
|
if err := a.loadServices(c, nil); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if err := a.loadChecks(c, nil); err != nil {
|
if err := a.loadChecks(c, nil); err != nil {
|
||||||
|
@ -2289,7 +2289,7 @@ func (a *Agent) AddServiceAndReplaceChecks(service *structs.NodeService, chkType
|
||||||
token: token,
|
token: token,
|
||||||
replaceExistingChecks: true,
|
replaceExistingChecks: true,
|
||||||
source: source,
|
source: source,
|
||||||
})
|
}, a.snapshotCheckState())
|
||||||
}
|
}
|
||||||
|
|
||||||
// AddService is used to add a service entry.
|
// AddService is used to add a service entry.
|
||||||
|
@ -2308,12 +2308,12 @@ func (a *Agent) AddService(service *structs.NodeService, chkTypes []*structs.Che
|
||||||
token: token,
|
token: token,
|
||||||
replaceExistingChecks: false,
|
replaceExistingChecks: false,
|
||||||
source: source,
|
source: source,
|
||||||
})
|
}, a.snapshotCheckState())
|
||||||
}
|
}
|
||||||
|
|
||||||
// addServiceLocked adds a service entry to the service manager if enabled, or directly
|
// addServiceLocked adds a service entry to the service manager if enabled, or directly
|
||||||
// to the local state if it is not. This function assumes the state lock is already held.
|
// to the local state if it is not. This function assumes the state lock is already held.
|
||||||
func (a *Agent) addServiceLocked(req *addServiceRequest) error {
|
func (a *Agent) addServiceLocked(req *addServiceRequest, snap map[structs.CheckID]*structs.HealthCheck) error {
|
||||||
req.fixupForAddServiceLocked()
|
req.fixupForAddServiceLocked()
|
||||||
|
|
||||||
req.service.EnterpriseMeta.Normalize()
|
req.service.EnterpriseMeta.Normalize()
|
||||||
|
@ -2331,7 +2331,7 @@ func (a *Agent) addServiceLocked(req *addServiceRequest) error {
|
||||||
req.persistDefaults = nil
|
req.persistDefaults = nil
|
||||||
req.persistServiceConfig = false
|
req.persistServiceConfig = false
|
||||||
|
|
||||||
return a.addServiceInternal(req)
|
return a.addServiceInternal(req, snap)
|
||||||
}
|
}
|
||||||
|
|
||||||
// addServiceRequest is the union of arguments for calling both
|
// addServiceRequest is the union of arguments for calling both
|
||||||
|
@ -2369,7 +2369,7 @@ func (r *addServiceRequest) fixupForAddServiceInternal() {
|
||||||
}
|
}
|
||||||
|
|
||||||
// addServiceInternal adds the given service and checks to the local state.
|
// addServiceInternal adds the given service and checks to the local state.
|
||||||
func (a *Agent) addServiceInternal(req *addServiceRequest) error {
|
func (a *Agent) addServiceInternal(req *addServiceRequest, snap map[structs.CheckID]*structs.HealthCheck) error {
|
||||||
req.fixupForAddServiceInternal()
|
req.fixupForAddServiceInternal()
|
||||||
var (
|
var (
|
||||||
service = req.service
|
service = req.service
|
||||||
|
@ -2407,11 +2407,6 @@ func (a *Agent) addServiceInternal(req *addServiceRequest) error {
|
||||||
service.TaggedAddresses[structs.TaggedAddressWANIPv6] = structs.ServiceAddress{Address: service.Address, Port: service.Port}
|
service.TaggedAddresses[structs.TaggedAddressWANIPv6] = structs.ServiceAddress{Address: service.Address, Port: service.Port}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Take a snapshot of the current state of checks (if any), and when adding
|
|
||||||
// a check that already existed carry over the state before resuming
|
|
||||||
// anti-entropy.
|
|
||||||
snap := a.snapshotCheckState()
|
|
||||||
|
|
||||||
var checks []*structs.HealthCheck
|
var checks []*structs.HealthCheck
|
||||||
|
|
||||||
// all the checks must be associated with the same enterprise meta of the service
|
// all the checks must be associated with the same enterprise meta of the service
|
||||||
|
@ -3489,7 +3484,7 @@ func (a *Agent) deletePid() error {
|
||||||
|
|
||||||
// loadServices will load service definitions from configuration and persisted
|
// loadServices will load service definitions from configuration and persisted
|
||||||
// definitions on disk, and load them into the local agent.
|
// definitions on disk, and load them into the local agent.
|
||||||
func (a *Agent) loadServices(conf *config.RuntimeConfig) error {
|
func (a *Agent) loadServices(conf *config.RuntimeConfig, snap map[structs.CheckID]*structs.HealthCheck) error {
|
||||||
// Load any persisted service configs so we can feed those into the initial
|
// Load any persisted service configs so we can feed those into the initial
|
||||||
// registrations below.
|
// registrations below.
|
||||||
persistedServiceConfigs, err := a.readPersistedServiceConfigs()
|
persistedServiceConfigs, err := a.readPersistedServiceConfigs()
|
||||||
|
@ -3526,7 +3521,7 @@ func (a *Agent) loadServices(conf *config.RuntimeConfig) error {
|
||||||
token: service.Token,
|
token: service.Token,
|
||||||
replaceExistingChecks: false, // do default behavior
|
replaceExistingChecks: false, // do default behavior
|
||||||
source: ConfigSourceLocal,
|
source: ConfigSourceLocal,
|
||||||
})
|
}, snap)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("Failed to register service %q: %v", service.Name, err)
|
return fmt.Errorf("Failed to register service %q: %v", service.Name, err)
|
||||||
}
|
}
|
||||||
|
@ -3544,7 +3539,7 @@ func (a *Agent) loadServices(conf *config.RuntimeConfig) error {
|
||||||
token: sidecarToken,
|
token: sidecarToken,
|
||||||
replaceExistingChecks: false, // do default behavior
|
replaceExistingChecks: false, // do default behavior
|
||||||
source: ConfigSourceLocal,
|
source: ConfigSourceLocal,
|
||||||
})
|
}, snap)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("Failed to register sidecar for service %q: %v", service.Name, err)
|
return fmt.Errorf("Failed to register sidecar for service %q: %v", service.Name, err)
|
||||||
}
|
}
|
||||||
|
@ -3636,8 +3631,7 @@ func (a *Agent) loadServices(conf *config.RuntimeConfig) error {
|
||||||
token: p.Token,
|
token: p.Token,
|
||||||
replaceExistingChecks: false, // do default behavior
|
replaceExistingChecks: false, // do default behavior
|
||||||
source: source,
|
source: source,
|
||||||
},
|
}, snap)
|
||||||
)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed adding service %q: %s", serviceID, err)
|
return fmt.Errorf("failed adding service %q: %s", serviceID, err)
|
||||||
}
|
}
|
||||||
|
@ -3672,7 +3666,6 @@ func (a *Agent) loadChecks(conf *config.RuntimeConfig, snap map[structs.CheckID]
|
||||||
// Register the checks from config
|
// Register the checks from config
|
||||||
for _, check := range conf.Checks {
|
for _, check := range conf.Checks {
|
||||||
health := check.HealthCheck(conf.NodeName)
|
health := check.HealthCheck(conf.NodeName)
|
||||||
|
|
||||||
// Restore the fields from the snapshot.
|
// Restore the fields from the snapshot.
|
||||||
if prev, ok := snap[health.CompoundCheckID()]; ok {
|
if prev, ok := snap[health.CompoundCheckID()]; ok {
|
||||||
health.Output = prev.Output
|
health.Output = prev.Output
|
||||||
|
@ -4025,7 +4018,7 @@ func (a *Agent) ReloadConfig(newCfg *config.RuntimeConfig) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Reload service/check definitions and metadata.
|
// Reload service/check definitions and metadata.
|
||||||
if err := a.loadServices(newCfg); err != nil {
|
if err := a.loadServices(newCfg, snap); err != nil {
|
||||||
return fmt.Errorf("Failed reloading services: %s", err)
|
return fmt.Errorf("Failed reloading services: %s", err)
|
||||||
}
|
}
|
||||||
if err := a.loadChecks(newCfg, snap); err != nil {
|
if err := a.loadChecks(newCfg, snap); err != nil {
|
||||||
|
|
|
@ -2030,7 +2030,7 @@ func testAgent_persistedService_compat(t *testing.T, extraHCL string) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Load the services
|
// Load the services
|
||||||
if err := a.loadServices(a.Config); err != nil {
|
if err := a.loadServices(a.Config, nil); err != nil {
|
||||||
t.Fatalf("err: %s", err)
|
t.Fatalf("err: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3437,6 +3437,54 @@ func TestAgent_ReloadConfigOutgoingRPCConfig(t *testing.T) {
|
||||||
require.Len(t, tlsConf.ClientCAs.Subjects(), 2)
|
require.Len(t, tlsConf.ClientCAs.Subjects(), 2)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestAgent_ReloadConfigAndKeepChecksStatus(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
dataDir := testutil.TempDir(t, "agent") // we manage the data dir
|
||||||
|
defer os.RemoveAll(dataDir)
|
||||||
|
waitDurationSeconds := 1
|
||||||
|
hcl := `data_dir = "` + dataDir + `"
|
||||||
|
enable_local_script_checks=true
|
||||||
|
services=[{
|
||||||
|
name="webserver1",
|
||||||
|
check{name="check1",
|
||||||
|
args=["true"],
|
||||||
|
interval="` + strconv.Itoa(waitDurationSeconds) + `s"}}
|
||||||
|
]`
|
||||||
|
a := NewTestAgent(t, t.Name(), hcl)
|
||||||
|
defer a.Shutdown()
|
||||||
|
|
||||||
|
// Initially, state is critical during waitDurationSeconds seconds
|
||||||
|
retry.Run(t, func(r *retry.R) {
|
||||||
|
gotChecks := a.State.Checks(nil)
|
||||||
|
require.Equal(r, 1, len(gotChecks), "Should have a check registered, but had %#v", gotChecks)
|
||||||
|
for id, check := range gotChecks {
|
||||||
|
require.Equal(r, "critical", check.Status, "check %q is wrong", id)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
c := TestConfig(testutil.Logger(t), config.Source{Name: t.Name(), Format: "hcl", Data: hcl})
|
||||||
|
a.ReloadConfig(c)
|
||||||
|
time.Sleep(time.Duration(waitDurationSeconds) * time.Second)
|
||||||
|
// It should now be passing
|
||||||
|
retry.Run(t, func(r *retry.R) {
|
||||||
|
gotChecks := a.State.Checks(nil)
|
||||||
|
for id, check := range gotChecks {
|
||||||
|
require.Equal(r, "passing", check.Status, "check %q is wrong", id)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
require.NoError(t, a.ReloadConfig(c))
|
||||||
|
// After reload, should be passing directly (no critical state)
|
||||||
|
for id, check := range a.State.Checks(nil) {
|
||||||
|
require.Equal(t, "passing", check.Status, "check %q is wrong", id)
|
||||||
|
}
|
||||||
|
// Ensure to take reload into account event with async stuff
|
||||||
|
time.Sleep(time.Duration(100) * time.Millisecond)
|
||||||
|
// Of course, after a sleep, should be Ok too
|
||||||
|
for id, check := range a.State.Checks(nil) {
|
||||||
|
require.Equal(t, "passing", check.Status, "check %q is wrong", id)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestAgent_ReloadConfigIncomingRPCConfig(t *testing.T) {
|
func TestAgent_ReloadConfigIncomingRPCConfig(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
dataDir := testutil.TempDir(t, "agent") // we manage the data dir
|
dataDir := testutil.TempDir(t, "agent") // we manage the data dir
|
||||||
|
|
|
@ -86,7 +86,7 @@ func (s *ServiceManager) registerOnce(args *addServiceRequest) error {
|
||||||
s.agent.stateLock.Lock()
|
s.agent.stateLock.Lock()
|
||||||
defer s.agent.stateLock.Unlock()
|
defer s.agent.stateLock.Unlock()
|
||||||
|
|
||||||
err := s.agent.addServiceInternal(args)
|
err := s.agent.addServiceInternal(args, s.agent.snapshotCheckState())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error updating service registration: %v", err)
|
return fmt.Errorf("error updating service registration: %v", err)
|
||||||
}
|
}
|
||||||
|
@ -127,7 +127,7 @@ func (s *ServiceManager) AddService(req *addServiceRequest) error {
|
||||||
req.persistService = nil
|
req.persistService = nil
|
||||||
req.persistDefaults = nil
|
req.persistDefaults = nil
|
||||||
req.persistServiceConfig = false
|
req.persistServiceConfig = false
|
||||||
return s.agent.addServiceInternal(req)
|
return s.agent.addServiceInternal(req, s.agent.snapshotCheckState())
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
@ -279,7 +279,7 @@ func (w *serviceConfigWatch) RegisterAndStart(
|
||||||
token: w.registration.token,
|
token: w.registration.token,
|
||||||
replaceExistingChecks: w.registration.replaceExistingChecks,
|
replaceExistingChecks: w.registration.replaceExistingChecks,
|
||||||
source: w.registration.source,
|
source: w.registration.source,
|
||||||
})
|
}, w.agent.snapshotCheckState())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error updating service registration: %v", err)
|
return fmt.Errorf("error updating service registration: %v", err)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue