From b4e7d0b97443c21070e0cd364713fc717a71f45d Mon Sep 17 00:00:00 2001 From: Frank Schroeder Date: Mon, 23 Oct 2017 10:08:35 +0200 Subject: [PATCH] Revert "agent: cleanup StateSyncer" This reverts commit b7136e100bef727144c202ae55f81152ac6a8b5f. --- agent/ae/ae.go | 203 ++++++++++++++++++-------------------------- agent/ae/ae_test.go | 29 +++---- agent/agent.go | 6 +- 3 files changed, 98 insertions(+), 140 deletions(-) diff --git a/agent/ae/ae.go b/agent/ae/ae.go index 78bf93e251..d5f56c37a9 100644 --- a/agent/ae/ae.go +++ b/agent/ae/ae.go @@ -1,4 +1,4 @@ -// Package ae provides tools to synchronize state between local and remote consul servers. +// Package ae provides an anti-entropy mechanism for the local state. package ae import ( @@ -10,43 +10,35 @@ import ( "github.com/hashicorp/consul/lib" ) -// scaleThreshold is the number of nodes after which regular sync runs are -// spread out farther apart. The value should be a power of 2 since the -// scale function uses log2. -// -// When set to 128 nodes the delay between regular runs is doubled when the -// cluster is larger than 128 nodes. It doubles again when it passes 256 -// nodes, and again at 512 nodes and so forth. At 8192 nodes, the delay -// factor is 8. -// -// If you update this, you may need to adjust the tuning of -// CoordinateUpdatePeriod and CoordinateUpdateMaxBatchSize. -const scaleThreshold = 128 +const ( + // This scale factor means we will add a minute after we cross 128 nodes, + // another at 256, another at 512, etc. By 8192 nodes, we will scale up + // by a factor of 8. + // + // If you update this, you may need to adjust the tuning of + // CoordinateUpdatePeriod and CoordinateUpdateMaxBatchSize. + aeScaleThreshold = 128 -// scaleFactor returns a factor by which the next sync run should be delayed to -// avoid saturation of the cluster. The larger the cluster grows the farther -// the sync runs should be spread apart. -// -// The current implementation uses a log2 scale which doubles the delay between -// runs every time the cluster doubles in size. -func scaleFactor(nodes int) int { - if nodes <= scaleThreshold { - return 1.0 + syncStaggerIntv = 3 * time.Second + syncRetryIntv = 15 * time.Second +) + +// aeScale is used to scale the time interval at which anti-entropy updates take +// place. It is used to prevent saturation as the cluster size grows. +func aeScale(d time.Duration, n int) time.Duration { + // Don't scale until we cross the threshold + if n <= aeScaleThreshold { + return d } - return int(math.Ceil(math.Log2(float64(nodes))-math.Log2(float64(scaleThreshold))) + 1.0) + + mult := math.Ceil(math.Log2(float64(n))-math.Log2(aeScaleThreshold)) + 1.0 + return time.Duration(mult) * d } -// StateSyncer manages background synchronization of the given state. -// -// The state is synchronized on a regular basis or on demand when either -// the state has changed or a new Consul server has joined the cluster. -// -// The regular state sychronization provides a self-healing mechanism -// for the cluster which is also called anti-entropy. type StateSyncer struct { - // paused flags whether sync runs are temporarily disabled. - // Must be the first element due to a go bug. - // todo(fs): which bug? Is this still relevant? + // paused is used to check if we are paused. Must be the first + // element due to a go bug. + // todo(fs): which bug? still relevant? paused int32 // State contains the data that needs to be synchronized. @@ -55,18 +47,18 @@ type StateSyncer struct { SyncChanges() error } - // Interval is the time between two regular sync runs. + // Interval is the time between two sync runs. Interval time.Duration - // ClusterSize returns the number of members in the cluster to - // allow staggering the sync runs based on cluster size. + // ClusterSize returns the number of members in the cluster. + // todo(fs): we use this for staggering but what about a random number? ClusterSize func() int // ShutdownCh is closed when the application is shutting down. ShutdownCh chan struct{} - // ServerUpCh contains data when a new consul server has been added to the cluster. - ServerUpCh chan struct{} + // ConsulCh contains data when a new consul server has been added to the cluster. + ConsulCh chan struct{} // TriggerCh contains data when a sync should run immediately. TriggerCh chan struct{} @@ -74,112 +66,81 @@ type StateSyncer struct { Logger *log.Logger } -const ( - // serverUpIntv is the max time to wait before a sync is triggered - // when a consul server has been added to the cluster. - serverUpIntv = 3 * time.Second +// Pause is used to pause state synchronization, this can be +// used to make batch changes +func (ae *StateSyncer) Pause() { + atomic.AddInt32(&ae.paused, 1) +} - // retryFailIntv is the min time to wait before a failed sync is retried. - retryFailIntv = 15 * time.Second -) - -// Run is the long running method to perform state synchronization -// between local and remote servers. -func (s *StateSyncer) Run() { - stagger := func(d time.Duration) time.Duration { - f := scaleFactor(s.ClusterSize()) - return lib.RandomStagger(time.Duration(f) * d) +// Resume is used to resume state synchronization +func (ae *StateSyncer) Resume() { + paused := atomic.AddInt32(&ae.paused, -1) + if paused < 0 { + panic("unbalanced State.Resume() detected") } + ae.changeMade() +} -Sync: +// Paused is used to check if we are paused +func (ae *StateSyncer) Paused() bool { + return atomic.LoadInt32(&ae.paused) > 0 +} + +func (ae *StateSyncer) changeMade() { + select { + case ae.TriggerCh <- struct{}{}: + default: + } +} + +// antiEntropy is a long running method used to perform anti-entropy +// between local and remote state. +func (ae *StateSyncer) Run() { +SYNC: + // Sync our state with the servers for { - // update the sync status - err := s.State.UpdateSyncState() + err := ae.State.UpdateSyncState() if err == nil { break } - - s.Logger.Printf("[ERR] agent: failed to sync remote state: %v", err) - - // retry updating sync status after some time or when a consul - // server was added. + ae.Logger.Printf("[ERR] agent: failed to sync remote state: %v", err) select { - - // consul server added to cluster. - // retry sooner than retryFailIntv to converge cluster quicker - // but stagger delay to avoid thundering herd - case <-s.ServerUpCh: + case <-ae.ConsulCh: + // Stagger the retry on leader election, avoid a thundering heard select { - case <-time.After(stagger(serverUpIntv)): - case <-s.ShutdownCh: + case <-time.After(lib.RandomStagger(aeScale(syncStaggerIntv, ae.ClusterSize()))): + case <-ae.ShutdownCh: return } - - // retry full sync after some time - // todo(fs): why don't we use s.Interval here? - case <-time.After(retryFailIntv + stagger(retryFailIntv)): - - case <-s.ShutdownCh: + case <-time.After(syncRetryIntv + lib.RandomStagger(aeScale(syncRetryIntv, ae.ClusterSize()))): + case <-ae.ShutdownCh: return } } - // Force-trigger sync to pickup any changes - s.triggerSync() + // Force-trigger AE to pickup any changes + ae.changeMade() + + // Schedule the next full sync, with a random stagger + aeIntv := aeScale(ae.Interval, ae.ClusterSize()) + aeIntv = aeIntv + lib.RandomStagger(aeIntv) + aeTimer := time.After(aeIntv) // Wait for sync events for { select { - // todo(fs): why don't we honor the ServerUpCh here as well? - // todo(fs): by default, s.Interval is 60s which is >> 3s (serverUpIntv) - // case <-s.ServerUpCh: - // select { - // case <-time.After(stagger(serverUpIntv)): - // continue Sync - // case <-s.ShutdownCh: - // return - // } - - case <-time.After(s.Interval + stagger(s.Interval)): - goto Sync - - case <-s.TriggerCh: - if s.Paused() { + case <-aeTimer: + goto SYNC + case <-ae.TriggerCh: + // Skip the sync if we are paused + if ae.Paused() { continue } - if err := s.State.SyncChanges(); err != nil { - s.Logger.Printf("[ERR] agent: failed to sync changes: %v", err) + if err := ae.State.SyncChanges(); err != nil { + ae.Logger.Printf("[ERR] agent: failed to sync changes: %v", err) } - - case <-s.ShutdownCh: + case <-ae.ShutdownCh: return } } } - -// Pause temporarily disables sync runs. -func (s *StateSyncer) Pause() { - atomic.AddInt32(&s.paused, 1) -} - -// Paused returns whether sync runs are temporarily disabled. -func (s *StateSyncer) Paused() bool { - return atomic.LoadInt32(&s.paused) > 0 -} - -// Resume re-enables sync runs. -func (s *StateSyncer) Resume() { - paused := atomic.AddInt32(&s.paused, -1) - if paused < 0 { - panic("unbalanced StateSyncer.Resume() detected") - } - s.triggerSync() -} - -// triggerSync queues a sync run if one has not been triggered already. -func (s *StateSyncer) triggerSync() { - select { - case s.TriggerCh <- struct{}{}: - default: - } -} diff --git a/agent/ae/ae_test.go b/agent/ae/ae_test.go index 7246e7e24a..5d91f2f9ba 100644 --- a/agent/ae/ae_test.go +++ b/agent/ae/ae_test.go @@ -1,27 +1,24 @@ package ae import ( - "fmt" "testing" + "time" ) -func TestAE_scaleFactor(t *testing.T) { +func TestAE_scale(t *testing.T) { t.Parallel() - tests := []struct { - nodes int - scale int - }{ - {100, 1}, - {200, 2}, - {1000, 4}, - {10000, 8}, + intv := time.Minute + if v := aeScale(intv, 100); v != intv { + t.Fatalf("Bad: %v", v) } - for _, tt := range tests { - t.Run(fmt.Sprintf("%d nodes", tt.nodes), func(t *testing.T) { - if got, want := scaleFactor(tt.nodes), tt.scale; got != want { - t.Fatalf("got scale factor %d want %d", got, want) - } - }) + if v := aeScale(intv, 200); v != 2*intv { + t.Fatalf("Bad: %v", v) + } + if v := aeScale(intv, 1000); v != 4*intv { + t.Fatalf("Bad: %v", v) + } + if v := aeScale(intv, 10000); v != 8*intv { + t.Fatalf("Bad: %v", v) } } diff --git a/agent/agent.go b/agent/agent.go index ddd438a6b3..1238359f3a 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -248,7 +248,7 @@ func (a *Agent) Start() error { // create a notif channel to trigger state sychronizations // when a consul server was added to the cluster. - serverUpCh := make(chan struct{}, 1) + consulCh := make(chan struct{}, 1) // create a notif channel to trigger state synchronizations // when the state has changed. @@ -263,7 +263,7 @@ func (a *Agent) Start() error { State: a.state, Interval: c.AEInterval, ShutdownCh: a.shutdownCh, - ServerUpCh: serverUpCh, + ConsulCh: consulCh, TriggerCh: triggerCh, Logger: a.logger, } @@ -280,7 +280,7 @@ func (a *Agent) Start() error { // todo(fs): IMO, the non-blocking nature of this call should be hidden in the syncer consulCfg.ServerUp = func() { select { - case serverUpCh <- struct{}{}: + case consulCh <- struct{}{}: default: } }