agent: cleanup StateSyncer

This patch cleans up the state syncer code by renaming fields, adding
helpers and documentation.
This commit is contained in:
Frank Schroeder 2017-08-28 14:17:10 +02:00 committed by Frank Schröder
parent a842dc9c2b
commit b7136e100b
3 changed files with 140 additions and 98 deletions

View File

@ -1,4 +1,4 @@
// Package ae provides an anti-entropy mechanism for the local state. // Package ae provides tools to synchronize state between local and remote consul servers.
package ae package ae
import ( import (
@ -10,35 +10,43 @@ import (
"github.com/hashicorp/consul/lib" "github.com/hashicorp/consul/lib"
) )
const ( // scaleThreshold is the number of nodes after which regular sync runs are
// This scale factor means we will add a minute after we cross 128 nodes, // spread out farther apart. The value should be a power of 2 since the
// another at 256, another at 512, etc. By 8192 nodes, we will scale up // scale function uses log2.
// by a factor of 8. //
// // When set to 128 nodes the delay between regular runs is doubled when the
// If you update this, you may need to adjust the tuning of // cluster is larger than 128 nodes. It doubles again when it passes 256
// CoordinateUpdatePeriod and CoordinateUpdateMaxBatchSize. // nodes, and again at 512 nodes and so forth. At 8192 nodes, the delay
aeScaleThreshold = 128 // factor is 8.
//
// If you update this, you may need to adjust the tuning of
// CoordinateUpdatePeriod and CoordinateUpdateMaxBatchSize.
const scaleThreshold = 128
syncStaggerIntv = 3 * time.Second // scaleFactor returns a factor by which the next sync run should be delayed to
syncRetryIntv = 15 * time.Second // avoid saturation of the cluster. The larger the cluster grows the farther
) // the sync runs should be spread apart.
//
// aeScale is used to scale the time interval at which anti-entropy updates take // The current implementation uses a log2 scale which doubles the delay between
// place. It is used to prevent saturation as the cluster size grows. // runs every time the cluster doubles in size.
func aeScale(d time.Duration, n int) time.Duration { func scaleFactor(nodes int) int {
// Don't scale until we cross the threshold if nodes <= scaleThreshold {
if n <= aeScaleThreshold { return 1.0
return d
} }
return int(math.Ceil(math.Log2(float64(nodes))-math.Log2(float64(scaleThreshold))) + 1.0)
mult := math.Ceil(math.Log2(float64(n))-math.Log2(aeScaleThreshold)) + 1.0
return time.Duration(mult) * d
} }
// StateSyncer manages background synchronization of the given state.
//
// The state is synchronized on a regular basis or on demand when either
// the state has changed or a new Consul server has joined the cluster.
//
// The regular state sychronization provides a self-healing mechanism
// for the cluster which is also called anti-entropy.
type StateSyncer struct { type StateSyncer struct {
// paused is used to check if we are paused. Must be the first // paused flags whether sync runs are temporarily disabled.
// element due to a go bug. // Must be the first element due to a go bug.
// todo(fs): which bug? still relevant? // todo(fs): which bug? Is this still relevant?
paused int32 paused int32
// State contains the data that needs to be synchronized. // State contains the data that needs to be synchronized.
@ -47,18 +55,18 @@ type StateSyncer struct {
SyncChanges() error SyncChanges() error
} }
// Interval is the time between two sync runs. // Interval is the time between two regular sync runs.
Interval time.Duration Interval time.Duration
// ClusterSize returns the number of members in the cluster. // ClusterSize returns the number of members in the cluster to
// todo(fs): we use this for staggering but what about a random number? // allow staggering the sync runs based on cluster size.
ClusterSize func() int ClusterSize func() int
// ShutdownCh is closed when the application is shutting down. // ShutdownCh is closed when the application is shutting down.
ShutdownCh chan struct{} ShutdownCh chan struct{}
// ConsulCh contains data when a new consul server has been added to the cluster. // ServerUpCh contains data when a new consul server has been added to the cluster.
ConsulCh chan struct{} ServerUpCh chan struct{}
// TriggerCh contains data when a sync should run immediately. // TriggerCh contains data when a sync should run immediately.
TriggerCh chan struct{} TriggerCh chan struct{}
@ -66,81 +74,112 @@ type StateSyncer struct {
Logger *log.Logger Logger *log.Logger
} }
// Pause is used to pause state synchronization, this can be const (
// used to make batch changes // serverUpIntv is the max time to wait before a sync is triggered
func (ae *StateSyncer) Pause() { // when a consul server has been added to the cluster.
atomic.AddInt32(&ae.paused, 1) serverUpIntv = 3 * time.Second
}
// Resume is used to resume state synchronization // retryFailIntv is the min time to wait before a failed sync is retried.
func (ae *StateSyncer) Resume() { retryFailIntv = 15 * time.Second
paused := atomic.AddInt32(&ae.paused, -1) )
if paused < 0 {
panic("unbalanced State.Resume() detected") // Run is the long running method to perform state synchronization
// between local and remote servers.
func (s *StateSyncer) Run() {
stagger := func(d time.Duration) time.Duration {
f := scaleFactor(s.ClusterSize())
return lib.RandomStagger(time.Duration(f) * d)
} }
ae.changeMade()
}
// Paused is used to check if we are paused Sync:
func (ae *StateSyncer) Paused() bool {
return atomic.LoadInt32(&ae.paused) > 0
}
func (ae *StateSyncer) changeMade() {
select {
case ae.TriggerCh <- struct{}{}:
default:
}
}
// antiEntropy is a long running method used to perform anti-entropy
// between local and remote state.
func (ae *StateSyncer) Run() {
SYNC:
// Sync our state with the servers
for { for {
err := ae.State.UpdateSyncState() // update the sync status
err := s.State.UpdateSyncState()
if err == nil { if err == nil {
break break
} }
ae.Logger.Printf("[ERR] agent: failed to sync remote state: %v", err)
s.Logger.Printf("[ERR] agent: failed to sync remote state: %v", err)
// retry updating sync status after some time or when a consul
// server was added.
select { select {
case <-ae.ConsulCh:
// Stagger the retry on leader election, avoid a thundering heard // consul server added to cluster.
// retry sooner than retryFailIntv to converge cluster quicker
// but stagger delay to avoid thundering herd
case <-s.ServerUpCh:
select { select {
case <-time.After(lib.RandomStagger(aeScale(syncStaggerIntv, ae.ClusterSize()))): case <-time.After(stagger(serverUpIntv)):
case <-ae.ShutdownCh: case <-s.ShutdownCh:
return return
} }
case <-time.After(syncRetryIntv + lib.RandomStagger(aeScale(syncRetryIntv, ae.ClusterSize()))):
case <-ae.ShutdownCh: // retry full sync after some time
// todo(fs): why don't we use s.Interval here?
case <-time.After(retryFailIntv + stagger(retryFailIntv)):
case <-s.ShutdownCh:
return return
} }
} }
// Force-trigger AE to pickup any changes // Force-trigger sync to pickup any changes
ae.changeMade() s.triggerSync()
// Schedule the next full sync, with a random stagger
aeIntv := aeScale(ae.Interval, ae.ClusterSize())
aeIntv = aeIntv + lib.RandomStagger(aeIntv)
aeTimer := time.After(aeIntv)
// Wait for sync events // Wait for sync events
for { for {
select { select {
case <-aeTimer: // todo(fs): why don't we honor the ServerUpCh here as well?
goto SYNC // todo(fs): by default, s.Interval is 60s which is >> 3s (serverUpIntv)
case <-ae.TriggerCh: // case <-s.ServerUpCh:
// Skip the sync if we are paused // select {
if ae.Paused() { // case <-time.After(stagger(serverUpIntv)):
// continue Sync
// case <-s.ShutdownCh:
// return
// }
case <-time.After(s.Interval + stagger(s.Interval)):
goto Sync
case <-s.TriggerCh:
if s.Paused() {
continue continue
} }
if err := ae.State.SyncChanges(); err != nil { if err := s.State.SyncChanges(); err != nil {
ae.Logger.Printf("[ERR] agent: failed to sync changes: %v", err) s.Logger.Printf("[ERR] agent: failed to sync changes: %v", err)
} }
case <-ae.ShutdownCh:
case <-s.ShutdownCh:
return return
} }
} }
} }
// Pause temporarily disables sync runs.
func (s *StateSyncer) Pause() {
atomic.AddInt32(&s.paused, 1)
}
// Paused returns whether sync runs are temporarily disabled.
func (s *StateSyncer) Paused() bool {
return atomic.LoadInt32(&s.paused) > 0
}
// Resume re-enables sync runs.
func (s *StateSyncer) Resume() {
paused := atomic.AddInt32(&s.paused, -1)
if paused < 0 {
panic("unbalanced StateSyncer.Resume() detected")
}
s.triggerSync()
}
// triggerSync queues a sync run if one has not been triggered already.
func (s *StateSyncer) triggerSync() {
select {
case s.TriggerCh <- struct{}{}:
default:
}
}

View File

@ -1,24 +1,27 @@
package ae package ae
import ( import (
"fmt"
"testing" "testing"
"time"
) )
func TestAE_scale(t *testing.T) { func TestAE_scaleFactor(t *testing.T) {
t.Parallel() t.Parallel()
intv := time.Minute tests := []struct {
if v := aeScale(intv, 100); v != intv { nodes int
t.Fatalf("Bad: %v", v) scale int
}{
{100, 1},
{200, 2},
{1000, 4},
{10000, 8},
} }
if v := aeScale(intv, 200); v != 2*intv { for _, tt := range tests {
t.Fatalf("Bad: %v", v) t.Run(fmt.Sprintf("%d nodes", tt.nodes), func(t *testing.T) {
if got, want := scaleFactor(tt.nodes), tt.scale; got != want {
t.Fatalf("got scale factor %d want %d", got, want)
} }
if v := aeScale(intv, 1000); v != 4*intv { })
t.Fatalf("Bad: %v", v)
}
if v := aeScale(intv, 10000); v != 8*intv {
t.Fatalf("Bad: %v", v)
} }
} }

View File

@ -248,7 +248,7 @@ func (a *Agent) Start() error {
// create a notif channel to trigger state sychronizations // create a notif channel to trigger state sychronizations
// when a consul server was added to the cluster. // when a consul server was added to the cluster.
consulCh := make(chan struct{}, 1) serverUpCh := make(chan struct{}, 1)
// create a notif channel to trigger state synchronizations // create a notif channel to trigger state synchronizations
// when the state has changed. // when the state has changed.
@ -263,7 +263,7 @@ func (a *Agent) Start() error {
State: a.state, State: a.state,
Interval: c.AEInterval, Interval: c.AEInterval,
ShutdownCh: a.shutdownCh, ShutdownCh: a.shutdownCh,
ConsulCh: consulCh, ServerUpCh: serverUpCh,
TriggerCh: triggerCh, TriggerCh: triggerCh,
Logger: a.logger, Logger: a.logger,
} }
@ -280,7 +280,7 @@ func (a *Agent) Start() error {
// todo(fs): IMO, the non-blocking nature of this call should be hidden in the syncer // todo(fs): IMO, the non-blocking nature of this call should be hidden in the syncer
consulCfg.ServerUp = func() { consulCfg.ServerUp = func() {
select { select {
case consulCh <- struct{}{}: case serverUpCh <- struct{}{}:
default: default:
} }
} }