mirror of https://github.com/status-im/consul.git
Merge pull request #1934 from hashicorp/b-check-output
Syncs a check's output with the catalog when output rate limiting isn't in effect.
This commit is contained in:
commit
529b24adbf
|
@ -434,11 +434,26 @@ func (l *localState) setSyncState() error {
|
|||
if l.config.CheckUpdateInterval == 0 {
|
||||
equal = existing.IsSame(check)
|
||||
} else {
|
||||
eCopy := new(structs.HealthCheck)
|
||||
*eCopy = *existing
|
||||
// Copy the existing check before potentially modifying
|
||||
// it before the compare operation.
|
||||
eCopy := existing.Clone()
|
||||
|
||||
// Copy the server's check before modifying, otherwise
|
||||
// in-memory RPC-based unit tests will have side effects.
|
||||
cCopy := check.Clone()
|
||||
|
||||
// If there's a defer timer active then we've got a
|
||||
// potentially spammy check so we don't sync the output
|
||||
// during this sweep since the timer will mark the check
|
||||
// out of sync for us. Otherwise, it is safe to sync the
|
||||
// output now. This is especially important for checks
|
||||
// that don't change state after they are created, in
|
||||
// which case we'd never see their output synced back ever.
|
||||
if _, ok := l.deferCheck[id]; ok {
|
||||
eCopy.Output = ""
|
||||
check.Output = ""
|
||||
equal = eCopy.IsSame(check)
|
||||
cCopy.Output = ""
|
||||
}
|
||||
equal = eCopy.IsSame(cCopy)
|
||||
}
|
||||
|
||||
// Update the status
|
||||
|
@ -499,6 +514,8 @@ func (l *localState) syncChanges() error {
|
|||
if err := l.syncNodeInfo(); err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
l.logger.Printf("[DEBUG] agent: Node info in sync")
|
||||
}
|
||||
|
||||
return nil
|
||||
|
|
|
@ -654,7 +654,7 @@ func TestAgentAntiEntropy_Checks(t *testing.T) {
|
|||
|
||||
func TestAgentAntiEntropy_Check_DeferSync(t *testing.T) {
|
||||
conf := nextConfig()
|
||||
conf.CheckUpdateInterval = 100 * time.Millisecond
|
||||
conf.CheckUpdateInterval = 500 * time.Millisecond
|
||||
dir, agent := makeAgent(t, conf)
|
||||
defer os.RemoveAll(dir)
|
||||
defer agent.Shutdown()
|
||||
|
@ -693,8 +693,8 @@ func TestAgentAntiEntropy_Check_DeferSync(t *testing.T) {
|
|||
// Update the check output! Should be deferred
|
||||
agent.state.UpdateCheck("web", structs.HealthPassing, "output")
|
||||
|
||||
// Should not update for 100 milliseconds
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
// Should not update for 500 milliseconds
|
||||
time.Sleep(250 * time.Millisecond)
|
||||
if err := agent.RPC("Health.NodeChecks", &req, &checks); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
@ -729,6 +729,112 @@ func TestAgentAntiEntropy_Check_DeferSync(t *testing.T) {
|
|||
}, func(err error) {
|
||||
t.Fatalf("err: %s", err)
|
||||
})
|
||||
|
||||
// Change the output in the catalog to force it out of sync.
|
||||
eCopy := check.Clone()
|
||||
eCopy.Output = "changed"
|
||||
reg := structs.RegisterRequest{
|
||||
Datacenter: agent.config.Datacenter,
|
||||
Node: agent.config.NodeName,
|
||||
Address: agent.config.AdvertiseAddr,
|
||||
TaggedAddresses: agent.config.TaggedAddresses,
|
||||
Check: eCopy,
|
||||
WriteRequest: structs.WriteRequest{},
|
||||
}
|
||||
var out struct{}
|
||||
if err := agent.RPC("Catalog.Register", ®, &out); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
||||
// Verify that the output is out of sync.
|
||||
if err := agent.RPC("Health.NodeChecks", &req, &checks); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
for _, chk := range checks.HealthChecks {
|
||||
switch chk.CheckID {
|
||||
case "web":
|
||||
if chk.Output != "changed" {
|
||||
t.Fatalf("unexpected update: %v", chk)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Trigger anti-entropy run and wait.
|
||||
agent.StartSync()
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
|
||||
// Verify that the output was synced back to the agent's value.
|
||||
if err := agent.RPC("Health.NodeChecks", &req, &checks); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
for _, chk := range checks.HealthChecks {
|
||||
switch chk.CheckID {
|
||||
case "web":
|
||||
if chk.Output != "output" {
|
||||
t.Fatalf("missed update: %v", chk)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Reset the catalog again.
|
||||
if err := agent.RPC("Catalog.Register", ®, &out); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
||||
// Verify that the output is out of sync.
|
||||
if err := agent.RPC("Health.NodeChecks", &req, &checks); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
for _, chk := range checks.HealthChecks {
|
||||
switch chk.CheckID {
|
||||
case "web":
|
||||
if chk.Output != "changed" {
|
||||
t.Fatalf("unexpected update: %v", chk)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Now make an update that should be deferred.
|
||||
agent.state.UpdateCheck("web", structs.HealthPassing, "deferred")
|
||||
|
||||
// Trigger anti-entropy run and wait.
|
||||
agent.StartSync()
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
|
||||
// Verify that the output is still out of sync since there's a deferred
|
||||
// update pending.
|
||||
if err := agent.RPC("Health.NodeChecks", &req, &checks); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
for _, chk := range checks.HealthChecks {
|
||||
switch chk.CheckID {
|
||||
case "web":
|
||||
if chk.Output != "changed" {
|
||||
t.Fatalf("unexpected update: %v", chk)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Wait for the deferred update.
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
if err := agent.RPC("Health.NodeChecks", &req, &checks); err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
// Verify updated
|
||||
for _, chk := range checks.HealthChecks {
|
||||
switch chk.CheckID {
|
||||
case "web":
|
||||
if chk.Output != "deferred" {
|
||||
return false, fmt.Errorf("no update: %v", chk)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}, func(err error) {
|
||||
t.Fatalf("err: %s", err)
|
||||
})
|
||||
}
|
||||
|
||||
func TestAgentAntiEntropy_NodeInfo(t *testing.T) {
|
||||
|
|
|
@ -396,6 +396,13 @@ func (c *HealthCheck) IsSame(other *HealthCheck) bool {
|
|||
return true
|
||||
}
|
||||
|
||||
// Clone returns a distinct clone of the HealthCheck.
|
||||
func (c *HealthCheck) Clone() *HealthCheck {
|
||||
clone := new(HealthCheck)
|
||||
*clone = *c
|
||||
return clone
|
||||
}
|
||||
|
||||
type HealthChecks []*HealthCheck
|
||||
|
||||
// CheckServiceNode is used to provide the node, its service
|
||||
|
|
|
@ -211,6 +211,28 @@ func TestStructs_HealthCheck_IsSame(t *testing.T) {
|
|||
check(&other.ServiceName)
|
||||
}
|
||||
|
||||
func TestStructs_HealthCheck_Clone(t *testing.T) {
|
||||
hc := &HealthCheck{
|
||||
Node: "node1",
|
||||
CheckID: "check1",
|
||||
Name: "thecheck",
|
||||
Status: HealthPassing,
|
||||
Notes: "it's all good",
|
||||
Output: "lgtm",
|
||||
ServiceID: "service1",
|
||||
ServiceName: "theservice",
|
||||
}
|
||||
clone := hc.Clone()
|
||||
if !hc.IsSame(clone) {
|
||||
t.Fatalf("should be equal to its clone")
|
||||
}
|
||||
|
||||
clone.Output = "different"
|
||||
if hc.IsSame(clone) {
|
||||
t.Fatalf("should not longer be equal to its clone")
|
||||
}
|
||||
}
|
||||
|
||||
func TestStructs_CheckServiceNodes_Shuffle(t *testing.T) {
|
||||
// Make a huge list of nodes.
|
||||
var nodes CheckServiceNodes
|
||||
|
|
Loading…
Reference in New Issue