make Pause()/Resume()/isPaused() behave more like a semaphore

see: https://github.com/hashicorp/consul/issues/1173 #1173

Reasoning: somewhere during consul development Pause()/Resume() and
PauseSync()/ResumeSync() were added to protect larger changes to
agent's localState.  A few of the places that it tries to protect are:

- (a *Agent) AddService(...)      # part of the method
- (c *Command) handleReload(...)  # almost the whole method
- (l *localState) antiEntropy(...)# isPaused() prevents syncChanges()

The main problem is, that in the middle of handleReload(...)'s
critical section it indirectly (loadServices()) calls  AddService(...).
AddService() in turn calls Pause() to protect itself against
syncChanges(). At the end of AddService() a defered call to Resume() is
made.

With the current implementation, this releases
isPaused() "lock" in the middle of handleReload() allowing antiEntropy
to kick in while configuration reload is still in progress.
Specifically almost all services and probably all check are unloaded
when syncChanges() is allowed to run.

This in turn can causes massive service/check de-/re-registration,
and since checks are by default registered in the critical state,
a majority of services on a node can be marked as failing.
It's made worse with automation, often calling `consul reload` in close
proximity on many nodes in the cluster.

This change basically turns Pause()/Resume() into P()/V() of
a garden-variety semaphore. Allowing Pause() to be called multiple times,
and releasing isPaused() only after all matching/defered Resumes() are
called as well.

TODO/NOTE: as with many semaphore implementations, it might be reasonable
to panic() if l.paused ever becomes negative.
This commit is contained in:
Wojciech Bederski 2015-09-11 18:28:06 +02:00
parent 24bc17eaa1
commit b014c0f91b

View File

@ -109,18 +109,18 @@ func (l *localState) ConsulServerUp() {
// Pause is used to pause state synchronization, this can be // Pause is used to pause state synchronization, this can be
// used to make batch changes // used to make batch changes
func (l *localState) Pause() { func (l *localState) Pause() {
atomic.StoreInt32(&l.paused, 1) atomic.AddInt32(&l.paused, 1)
} }
// Resume is used to resume state synchronization // Resume is used to resume state synchronization
func (l *localState) Resume() { func (l *localState) Resume() {
atomic.StoreInt32(&l.paused, 0) atomic.AddInt32(&l.paused, -1)
l.changeMade() l.changeMade()
} }
// isPaused is used to check if we are paused // isPaused is used to check if we are paused
func (l *localState) isPaused() bool { func (l *localState) isPaused() bool {
return atomic.LoadInt32(&l.paused) == 1 return atomic.LoadInt32(&l.paused) > 0
} }
// ServiceToken returns the configured ACL token for the given // ServiceToken returns the configured ACL token for the given