diff --git a/agent/ae/ae.go b/agent/ae/ae.go index 4ec72ed469..6572d815bc 100644 --- a/agent/ae/ae.go +++ b/agent/ae/ae.go @@ -2,10 +2,9 @@ package ae import ( - "errors" "log" "math" - "sync" + "sync/atomic" "time" "github.com/hashicorp/consul/lib" @@ -76,8 +75,7 @@ type StateSyncer struct { SyncChanges *Trigger // paused stores whether sync runs are temporarily disabled. - pauseLock sync.Mutex - paused bool + paused *toggle } func NewStateSyner(state State, intv time.Duration, shutdownCh chan struct{}, logger *log.Logger) *StateSyncer { @@ -88,6 +86,7 @@ func NewStateSyner(state State, intv time.Duration, shutdownCh chan struct{}, lo Logger: logger, SyncFull: NewTrigger(), SyncChanges: NewTrigger(), + paused: new(toggle), } } @@ -100,8 +99,6 @@ const ( retryFailIntv = 15 * time.Second ) -var errPaused = errors.New("paused") - // Run is the long running method to perform state synchronization // between local and remote servers. func (s *StateSyncer) Run() { @@ -117,13 +114,13 @@ func (s *StateSyncer) Run() { FullSync: for { // attempt a full sync - err := s.ifNotPausedRun(s.State.SyncFull) - if err != nil { - if err != errPaused { - s.Logger.Printf("[ERR] agent: failed to sync remote state: %v", err) - } + if err := s.State.SyncFull(); err != nil { + s.Logger.Printf("[ERR] agent: failed to sync remote state: %v", err) + // retry full sync after some time or when a consul + // server was added. select { + // trigger a full sync immediately. // this is usually called when a consul server was added to the cluster. // stagger the delay to avoid a thundering herd. @@ -165,8 +162,10 @@ FullSync: // do partial syncs on demand case <-s.SyncChanges.Notif(): - err := s.ifNotPausedRun(s.State.SyncChanges) - if err != nil && err != errPaused { + if s.Paused() { + continue + } + if err := s.State.SyncChanges(); err != nil { s.Logger.Printf("[ERR] agent: failed to sync changes: %v", err) } @@ -177,39 +176,40 @@ FullSync: } } -func (s *StateSyncer) ifNotPausedRun(f func() error) error { - s.pauseLock.Lock() - defer s.pauseLock.Unlock() - if s.paused { - return errPaused - } - return f() -} - // Pause temporarily disables sync runs. func (s *StateSyncer) Pause() { - s.pauseLock.Lock() - if s.paused { - panic("pause while paused") - } - s.paused = true - s.pauseLock.Unlock() + s.paused.On() } // Paused returns whether sync runs are temporarily disabled. func (s *StateSyncer) Paused() bool { - s.pauseLock.Lock() - defer s.pauseLock.Unlock() - return s.paused + return s.paused.IsOn() } // Resume re-enables sync runs. func (s *StateSyncer) Resume() { - s.pauseLock.Lock() - if !s.paused { - panic("resume while not paused") - } - s.paused = false - s.pauseLock.Unlock() + s.paused.Off() s.SyncChanges.Trigger() } + +// toggle implements an on/off switch using methods from the atomic +// package. Since fields in structs that are accessed via +// atomic.Load/Add methods need to be aligned properly on some platforms +// we move that code into a separate struct. +// +// See https://golang.org/pkg/sync/atomic/#pkg-note-BUG for details +type toggle int32 + +func (p *toggle) On() { + atomic.AddInt32((*int32)(p), 1) +} + +func (p *toggle) Off() { + if atomic.AddInt32((*int32)(p), -1) < 0 { + panic("toggle not on") + } +} + +func (p *toggle) IsOn() bool { + return atomic.LoadInt32((*int32)(p)) > 0 +}