mirror of
https://github.com/status-im/consul.git
synced 2025-01-25 13:10:32 +00:00
agent: Defer sync based on CheckUpdateInterval
This commit is contained in:
parent
1b53664879
commit
500bb3931b
@ -20,6 +20,7 @@ const (
|
||||
type syncStatus struct {
|
||||
remoteDelete bool // Should this be deleted from the server
|
||||
inSync bool // Is this in sync with the server
|
||||
deferSync *time.Timer // Defer sync until this time
|
||||
}
|
||||
|
||||
// localState is used to represent the node's services,
|
||||
@ -191,6 +192,25 @@ func (l *localState) UpdateCheck(checkID, status, output string) {
|
||||
return
|
||||
}
|
||||
|
||||
// Defer a sync if the output has changed. This is an optimization around
|
||||
// frequent updates of output. Instead, we update the output internally,
|
||||
// and periodically do a write-back to the servers. If there is a status
|
||||
// change we do the write immediately.
|
||||
if l.config.CheckUpdateInterval > 0 && check.Status == status {
|
||||
check.Output = output
|
||||
status := l.checkStatus[checkID]
|
||||
if status.deferSync == nil && status.inSync {
|
||||
deferSync := time.AfterFunc(l.config.CheckUpdateInterval, func() {
|
||||
l.Lock()
|
||||
l.checkStatus[checkID] = syncStatus{inSync: false}
|
||||
l.changeMade()
|
||||
l.Unlock()
|
||||
})
|
||||
l.checkStatus[checkID] = syncStatus{deferSync: deferSync}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Update status and mark out of sync
|
||||
check.Status = status
|
||||
check.Output = output
|
||||
|
@ -252,3 +252,77 @@ func TestAgentAntiEntropy_Checks(t *testing.T) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestAgentAntiEntropy_Check_DeferSync(t *testing.T) {
|
||||
conf := nextConfig()
|
||||
conf.CheckUpdateInterval = 100 * time.Millisecond
|
||||
dir, agent := makeAgent(t, conf)
|
||||
defer os.RemoveAll(dir)
|
||||
defer agent.Shutdown()
|
||||
|
||||
testutil.WaitForLeader(t, agent.RPC, "dc1")
|
||||
|
||||
// Create a check
|
||||
check := &structs.HealthCheck{
|
||||
Node: agent.config.NodeName,
|
||||
CheckID: "web",
|
||||
Name: "web",
|
||||
Status: structs.HealthPassing,
|
||||
Output: "",
|
||||
}
|
||||
agent.state.AddCheck(check)
|
||||
|
||||
// Trigger anti-entropy run and wait
|
||||
agent.StartSync()
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
|
||||
// Verify that we are in sync
|
||||
req := structs.NodeSpecificRequest{
|
||||
Datacenter: "dc1",
|
||||
Node: agent.config.NodeName,
|
||||
}
|
||||
var checks structs.IndexedHealthChecks
|
||||
if err := agent.RPC("Health.NodeChecks", &req, &checks); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Verify checks in place
|
||||
if len(checks.HealthChecks) != 2 {
|
||||
t.Fatalf("checks: %v", check)
|
||||
}
|
||||
|
||||
// Update the check output! Should be defered
|
||||
agent.state.UpdateCheck("web", structs.HealthPassing, "output")
|
||||
|
||||
// Should not update for 100 milliseconds
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
if err := agent.RPC("Health.NodeChecks", &req, &checks); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Verify not updated
|
||||
for _, chk := range checks.HealthChecks {
|
||||
switch chk.CheckID {
|
||||
case "web":
|
||||
if chk.Output != "" {
|
||||
t.Fatalf("early update: %v", chk)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Wait for a defered update
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
if err := agent.RPC("Health.NodeChecks", &req, &checks); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Verify not updated
|
||||
for _, chk := range checks.HealthChecks {
|
||||
switch chk.CheckID {
|
||||
case "web":
|
||||
if chk.Output != "output" {
|
||||
t.Fatalf("no update: %v", chk)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user