mirror of https://github.com/status-im/consul.git
agent: avoid reverting any check updates that occur while a service is being added or the config is reloaded (#6144)
This commit is contained in:
parent
7b0614ea82
commit
b16d7f00bc
|
@ -457,7 +457,7 @@ func (a *Agent) Start() error {
|
|||
if err := a.loadProxies(c); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := a.loadChecks(c); err != nil {
|
||||
if err := a.loadChecks(c, nil); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := a.loadMetadata(c); err != nil {
|
||||
|
@ -2109,10 +2109,10 @@ func (a *Agent) addServiceInternal(service *structs.NodeService, chkTypes []*str
|
|||
a.PauseSync()
|
||||
defer a.ResumeSync()
|
||||
|
||||
// Take a snapshot of the current state of checks (if any), and
|
||||
// restore them before resuming anti-entropy.
|
||||
// Take a snapshot of the current state of checks (if any), and when adding
|
||||
// a check that already existed carry over the state before resuming
|
||||
// anti-entropy.
|
||||
snap := a.snapshotCheckState()
|
||||
defer a.restoreCheckState(snap)
|
||||
|
||||
var checks []*structs.HealthCheck
|
||||
|
||||
|
@ -2143,6 +2143,13 @@ func (a *Agent) addServiceInternal(service *structs.NodeService, chkTypes []*str
|
|||
check.Status = chkType.Status
|
||||
}
|
||||
|
||||
// Restore the fields from the snapshot.
|
||||
prev, ok := snap[check.CheckID]
|
||||
if ok {
|
||||
check.Output = prev.Output
|
||||
check.Status = prev.Status
|
||||
}
|
||||
|
||||
checks = append(checks, check)
|
||||
}
|
||||
|
||||
|
@ -3346,10 +3353,17 @@ func (a *Agent) unloadServices() error {
|
|||
|
||||
// loadChecks loads check definitions and/or persisted check definitions from
|
||||
// disk and re-registers them with the local agent.
|
||||
func (a *Agent) loadChecks(conf *config.RuntimeConfig) error {
|
||||
func (a *Agent) loadChecks(conf *config.RuntimeConfig, snap map[types.CheckID]*structs.HealthCheck) error {
|
||||
// Register the checks from config
|
||||
for _, check := range conf.Checks {
|
||||
health := check.HealthCheck(conf.NodeName)
|
||||
|
||||
// Restore the fields from the snapshot.
|
||||
if prev, ok := snap[health.CheckID]; ok {
|
||||
health.Output = prev.Output
|
||||
health.Status = prev.Status
|
||||
}
|
||||
|
||||
chkType := check.CheckType()
|
||||
if err := a.addCheckLocked(health, chkType, false, check.Token, ConfigSourceLocal); err != nil {
|
||||
return fmt.Errorf("Failed to register check '%s': %v %v", check.Name, err, check)
|
||||
|
@ -3406,6 +3420,12 @@ func (a *Agent) loadChecks(conf *config.RuntimeConfig) error {
|
|||
// services into the active pool
|
||||
p.Check.Status = api.HealthCritical
|
||||
|
||||
// Restore the fields from the snapshot.
|
||||
if prev, ok := snap[p.Check.CheckID]; ok {
|
||||
p.Check.Output = prev.Output
|
||||
p.Check.Status = prev.Status
|
||||
}
|
||||
|
||||
if err := a.addCheckLocked(p.Check, p.ChkType, false, p.Token, ConfigSourceLocal); err != nil {
|
||||
// Purge the check if it is unable to be restored.
|
||||
a.logger.Printf("[WARN] agent: Failed to restore check %q: %s",
|
||||
|
@ -3634,15 +3654,6 @@ func (a *Agent) snapshotCheckState() map[types.CheckID]*structs.HealthCheck {
|
|||
return a.State.Checks()
|
||||
}
|
||||
|
||||
// restoreCheckState is used to reset the health state based on a snapshot.
|
||||
// This is done after we finish the reload to avoid any unnecessary flaps
|
||||
// in health state and potential session invalidations.
|
||||
func (a *Agent) restoreCheckState(snap map[types.CheckID]*structs.HealthCheck) {
|
||||
for id, check := range snap {
|
||||
a.State.UpdateCheck(id, check.Status, check.Output)
|
||||
}
|
||||
}
|
||||
|
||||
// loadMetadata loads node metadata fields from the agent config and
|
||||
// updates them on the local agent.
|
||||
func (a *Agent) loadMetadata(conf *config.RuntimeConfig) error {
|
||||
|
@ -3765,9 +3776,9 @@ func (a *Agent) ReloadConfig(newCfg *config.RuntimeConfig) error {
|
|||
a.stateLock.Lock()
|
||||
defer a.stateLock.Unlock()
|
||||
|
||||
// Snapshot the current state, and restore it afterwards
|
||||
// Snapshot the current state, and use that to initialize the checks when
|
||||
// they are recreated.
|
||||
snap := a.snapshotCheckState()
|
||||
defer a.restoreCheckState(snap)
|
||||
|
||||
// First unload all checks, services, and metadata. This lets us begin the reload
|
||||
// with a clean slate.
|
||||
|
@ -3798,7 +3809,7 @@ func (a *Agent) ReloadConfig(newCfg *config.RuntimeConfig) error {
|
|||
if err := a.loadProxies(newCfg); err != nil {
|
||||
return fmt.Errorf("Failed reloading proxies: %s", err)
|
||||
}
|
||||
if err := a.loadChecks(newCfg); err != nil {
|
||||
if err := a.loadChecks(newCfg, snap); err != nil {
|
||||
return fmt.Errorf("Failed reloading checks: %s", err)
|
||||
}
|
||||
if err := a.loadMetadata(newCfg); err != nil {
|
||||
|
|
|
@ -500,6 +500,62 @@ func TestAgent_AddService(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestAgent_AddServices_AliasUpdateCheckNotReverted(t *testing.T) {
|
||||
t.Parallel()
|
||||
a := NewTestAgent(t, t.Name(), `
|
||||
node_name = "node1"
|
||||
`)
|
||||
defer a.Shutdown()
|
||||
|
||||
// It's tricky to get an UpdateCheck call to be timed properly so it lands
|
||||
// right in the middle of an addServiceInternal call so we cheat a bit and
|
||||
// rely upon alias checks to do that work for us. We add enough services
|
||||
// that probabilistically one of them is going to end up properly in the
|
||||
// critical section.
|
||||
//
|
||||
// The first number I picked here (10) surprisingly failed every time prior
|
||||
// to PR #6144 solving the underlying problem.
|
||||
const numServices = 10
|
||||
|
||||
services := make([]*structs.ServiceDefinition, numServices)
|
||||
checkIDs := make([]types.CheckID, numServices)
|
||||
for i := 0; i < numServices; i++ {
|
||||
name := fmt.Sprintf("web-%d", i)
|
||||
|
||||
services[i] = &structs.ServiceDefinition{
|
||||
ID: name,
|
||||
Name: name,
|
||||
Port: 8080 + i,
|
||||
Checks: []*structs.CheckType{
|
||||
&structs.CheckType{
|
||||
Name: "alias-for-fake-service",
|
||||
AliasService: "fake",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
checkIDs[i] = types.CheckID("service:" + name)
|
||||
}
|
||||
|
||||
// Add all of the services quickly as you might do from config file snippets.
|
||||
for _, service := range services {
|
||||
ns := service.NodeService()
|
||||
|
||||
chkTypes, err := service.CheckTypes()
|
||||
require.NoError(t, err)
|
||||
|
||||
require.NoError(t, a.AddService(ns, chkTypes, false, service.Token, ConfigSourceLocal))
|
||||
}
|
||||
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
gotChecks := a.State.Checks()
|
||||
for id, check := range gotChecks {
|
||||
require.Equal(r, "passing", check.Status, "check %q is wrong", id)
|
||||
require.Equal(r, "No checks found.", check.Output, "check %q is wrong", id)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestAgent_AddServiceNoExec(t *testing.T) {
|
||||
t.Parallel()
|
||||
a := NewTestAgent(t, t.Name(), `
|
||||
|
@ -2965,14 +3021,11 @@ func TestAgent_checkStateSnapshot(t *testing.T) {
|
|||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
||||
// Reload the checks
|
||||
if err := a.loadChecks(a.Config); err != nil {
|
||||
// Reload the checks and restore the snapshot.
|
||||
if err := a.loadChecks(a.Config, snap); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
||||
// Restore the state
|
||||
a.restoreCheckState(snap)
|
||||
|
||||
// Search for the check
|
||||
out, ok := a.State.Checks()[check1.CheckID]
|
||||
if !ok {
|
||||
|
@ -3010,7 +3063,7 @@ func TestAgent_loadChecks_checkFails(t *testing.T) {
|
|||
}
|
||||
|
||||
// Try loading the checks from the persisted files
|
||||
if err := a.loadChecks(a.Config); err != nil {
|
||||
if err := a.loadChecks(a.Config, nil); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue