Syncs a check's output with the catalog when output rate limiting isn't in effect.

This commit is contained in:
James Phillips 2016-04-10 21:20:39 -07:00
parent 711d7a0ad4
commit cfe0651208
2 changed files with 132 additions and 6 deletions

View File

@ -434,11 +434,28 @@ func (l *localState) setSyncState() error {
if l.config.CheckUpdateInterval == 0 {
equal = existing.IsSame(check)
} else {
// Copy the existing check before potentially modifying
// it before the compare operation.
eCopy := new(structs.HealthCheck)
*eCopy = *existing
eCopy.Output = ""
check.Output = ""
equal = eCopy.IsSame(check)
// Copy the server's check before modifying, otherwise
// in-memory RPC-based unit tests will have side effects.
cCopy := new(structs.HealthCheck)
*cCopy = *check
// If there's a defer timer active then we've got a
// potentially spammy check so we don't sync the output
// during this sweep since the timer will mark the check
// out of sync for us. Otherwise, it is safe to sync the
// output now. This is especially important for checks
// that don't change state after they are created, in
// which case we'd never see their output synced back ever.
if _, ok := l.deferCheck[id]; ok {
eCopy.Output = ""
cCopy.Output = ""
}
equal = eCopy.IsSame(cCopy)
}
// Update the status
@ -499,6 +516,8 @@ func (l *localState) syncChanges() error {
if err := l.syncNodeInfo(); err != nil {
return err
}
} else {
l.logger.Printf("[DEBUG] agent: Node info in sync")
}
return nil

View File

@ -654,7 +654,7 @@ func TestAgentAntiEntropy_Checks(t *testing.T) {
func TestAgentAntiEntropy_Check_DeferSync(t *testing.T) {
conf := nextConfig()
conf.CheckUpdateInterval = 100 * time.Millisecond
conf.CheckUpdateInterval = 500 * time.Millisecond
dir, agent := makeAgent(t, conf)
defer os.RemoveAll(dir)
defer agent.Shutdown()
@ -693,8 +693,8 @@ func TestAgentAntiEntropy_Check_DeferSync(t *testing.T) {
// Update the check output! Should be deferred
agent.state.UpdateCheck("web", structs.HealthPassing, "output")
// Should not update for 100 milliseconds
time.Sleep(50 * time.Millisecond)
// Should not update for 500 milliseconds
time.Sleep(250 * time.Millisecond)
if err := agent.RPC("Health.NodeChecks", &req, &checks); err != nil {
t.Fatalf("err: %v", err)
}
@ -729,6 +729,113 @@ func TestAgentAntiEntropy_Check_DeferSync(t *testing.T) {
}, func(err error) {
t.Fatalf("err: %s", err)
})
// Change the output in the catalog to force it out of sync.
eCopy := new(structs.HealthCheck)
*eCopy = *check
eCopy.Output = "changed"
reg := structs.RegisterRequest{
Datacenter: agent.config.Datacenter,
Node: agent.config.NodeName,
Address: agent.config.AdvertiseAddr,
TaggedAddresses: agent.config.TaggedAddresses,
Check: eCopy,
WriteRequest: structs.WriteRequest{},
}
var out struct{}
if err := agent.RPC("Catalog.Register", &reg, &out); err != nil {
t.Fatalf("err: %s", err)
}
// Verify that the output is out of sync.
if err := agent.RPC("Health.NodeChecks", &req, &checks); err != nil {
t.Fatalf("err: %v", err)
}
for _, chk := range checks.HealthChecks {
switch chk.CheckID {
case "web":
if chk.Output != "changed" {
t.Fatalf("unexpected update: %v", chk)
}
}
}
// Trigger anti-entropy run and wait.
agent.StartSync()
time.Sleep(200 * time.Millisecond)
// Verify that the output was synced back to the agent's value.
if err := agent.RPC("Health.NodeChecks", &req, &checks); err != nil {
t.Fatalf("err: %v", err)
}
for _, chk := range checks.HealthChecks {
switch chk.CheckID {
case "web":
if chk.Output != "output" {
t.Fatalf("missed update: %v", chk)
}
}
}
// Reset the catalog again.
if err := agent.RPC("Catalog.Register", &reg, &out); err != nil {
t.Fatalf("err: %s", err)
}
// Verify that the output is out of sync.
if err := agent.RPC("Health.NodeChecks", &req, &checks); err != nil {
t.Fatalf("err: %v", err)
}
for _, chk := range checks.HealthChecks {
switch chk.CheckID {
case "web":
if chk.Output != "changed" {
t.Fatalf("unexpected update: %v", chk)
}
}
}
// Now make an update that should be deferred.
agent.state.UpdateCheck("web", structs.HealthPassing, "deferred")
// Trigger anti-entropy run and wait.
agent.StartSync()
time.Sleep(200 * time.Millisecond)
// Verify that the output is still out of sync since there's a deferred
// update pending.
if err := agent.RPC("Health.NodeChecks", &req, &checks); err != nil {
t.Fatalf("err: %v", err)
}
for _, chk := range checks.HealthChecks {
switch chk.CheckID {
case "web":
if chk.Output != "changed" {
t.Fatalf("unexpected update: %v", chk)
}
}
}
// Wait for the deferred update.
testutil.WaitForResult(func() (bool, error) {
if err := agent.RPC("Health.NodeChecks", &req, &checks); err != nil {
return false, err
}
// Verify updated
for _, chk := range checks.HealthChecks {
switch chk.CheckID {
case "web":
if chk.Output != "deferred" {
return false, fmt.Errorf("no update: %v", chk)
}
}
}
return true, nil
}, func(err error) {
t.Fatalf("err: %s", err)
})
}
func TestAgentAntiEntropy_NodeInfo(t *testing.T) {