mirror of
https://github.com/status-im/consul.git
synced 2025-02-23 10:58:25 +00:00
Revert "ae: ensure that syncs are blocked when paused"
This reverts commit ffb265dd939cefd7e865d624d0a4ba81f88e4505.
This commit is contained in:
parent
23a9ac9d56
commit
e95d22b9a8
@ -2,10 +2,9 @@
|
|||||||
package ae
|
package ae
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
|
||||||
"log"
|
"log"
|
||||||
"math"
|
"math"
|
||||||
"sync"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/lib"
|
"github.com/hashicorp/consul/lib"
|
||||||
@ -76,8 +75,7 @@ type StateSyncer struct {
|
|||||||
SyncChanges *Trigger
|
SyncChanges *Trigger
|
||||||
|
|
||||||
// paused stores whether sync runs are temporarily disabled.
|
// paused stores whether sync runs are temporarily disabled.
|
||||||
pauseLock sync.Mutex
|
paused *toggle
|
||||||
paused bool
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewStateSyner(state State, intv time.Duration, shutdownCh chan struct{}, logger *log.Logger) *StateSyncer {
|
func NewStateSyner(state State, intv time.Duration, shutdownCh chan struct{}, logger *log.Logger) *StateSyncer {
|
||||||
@ -88,6 +86,7 @@ func NewStateSyner(state State, intv time.Duration, shutdownCh chan struct{}, lo
|
|||||||
Logger: logger,
|
Logger: logger,
|
||||||
SyncFull: NewTrigger(),
|
SyncFull: NewTrigger(),
|
||||||
SyncChanges: NewTrigger(),
|
SyncChanges: NewTrigger(),
|
||||||
|
paused: new(toggle),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -100,8 +99,6 @@ const (
|
|||||||
retryFailIntv = 15 * time.Second
|
retryFailIntv = 15 * time.Second
|
||||||
)
|
)
|
||||||
|
|
||||||
var errPaused = errors.New("paused")
|
|
||||||
|
|
||||||
// Run is the long running method to perform state synchronization
|
// Run is the long running method to perform state synchronization
|
||||||
// between local and remote servers.
|
// between local and remote servers.
|
||||||
func (s *StateSyncer) Run() {
|
func (s *StateSyncer) Run() {
|
||||||
@ -117,13 +114,13 @@ func (s *StateSyncer) Run() {
|
|||||||
FullSync:
|
FullSync:
|
||||||
for {
|
for {
|
||||||
// attempt a full sync
|
// attempt a full sync
|
||||||
err := s.ifNotPausedRun(s.State.SyncFull)
|
if err := s.State.SyncFull(); err != nil {
|
||||||
if err != nil {
|
|
||||||
if err != errPaused {
|
|
||||||
s.Logger.Printf("[ERR] agent: failed to sync remote state: %v", err)
|
s.Logger.Printf("[ERR] agent: failed to sync remote state: %v", err)
|
||||||
}
|
|
||||||
|
|
||||||
|
// retry full sync after some time or when a consul
|
||||||
|
// server was added.
|
||||||
select {
|
select {
|
||||||
|
|
||||||
// trigger a full sync immediately.
|
// trigger a full sync immediately.
|
||||||
// this is usually called when a consul server was added to the cluster.
|
// this is usually called when a consul server was added to the cluster.
|
||||||
// stagger the delay to avoid a thundering herd.
|
// stagger the delay to avoid a thundering herd.
|
||||||
@ -165,8 +162,10 @@ FullSync:
|
|||||||
|
|
||||||
// do partial syncs on demand
|
// do partial syncs on demand
|
||||||
case <-s.SyncChanges.Notif():
|
case <-s.SyncChanges.Notif():
|
||||||
err := s.ifNotPausedRun(s.State.SyncChanges)
|
if s.Paused() {
|
||||||
if err != nil && err != errPaused {
|
continue
|
||||||
|
}
|
||||||
|
if err := s.State.SyncChanges(); err != nil {
|
||||||
s.Logger.Printf("[ERR] agent: failed to sync changes: %v", err)
|
s.Logger.Printf("[ERR] agent: failed to sync changes: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -177,39 +176,40 @@ FullSync:
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *StateSyncer) ifNotPausedRun(f func() error) error {
|
|
||||||
s.pauseLock.Lock()
|
|
||||||
defer s.pauseLock.Unlock()
|
|
||||||
if s.paused {
|
|
||||||
return errPaused
|
|
||||||
}
|
|
||||||
return f()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Pause temporarily disables sync runs.
|
// Pause temporarily disables sync runs.
|
||||||
func (s *StateSyncer) Pause() {
|
func (s *StateSyncer) Pause() {
|
||||||
s.pauseLock.Lock()
|
s.paused.On()
|
||||||
if s.paused {
|
|
||||||
panic("pause while paused")
|
|
||||||
}
|
|
||||||
s.paused = true
|
|
||||||
s.pauseLock.Unlock()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Paused returns whether sync runs are temporarily disabled.
|
// Paused returns whether sync runs are temporarily disabled.
|
||||||
func (s *StateSyncer) Paused() bool {
|
func (s *StateSyncer) Paused() bool {
|
||||||
s.pauseLock.Lock()
|
return s.paused.IsOn()
|
||||||
defer s.pauseLock.Unlock()
|
|
||||||
return s.paused
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Resume re-enables sync runs.
|
// Resume re-enables sync runs.
|
||||||
func (s *StateSyncer) Resume() {
|
func (s *StateSyncer) Resume() {
|
||||||
s.pauseLock.Lock()
|
s.paused.Off()
|
||||||
if !s.paused {
|
|
||||||
panic("resume while not paused")
|
|
||||||
}
|
|
||||||
s.paused = false
|
|
||||||
s.pauseLock.Unlock()
|
|
||||||
s.SyncChanges.Trigger()
|
s.SyncChanges.Trigger()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// toggle implements an on/off switch using methods from the atomic
|
||||||
|
// package. Since fields in structs that are accessed via
|
||||||
|
// atomic.Load/Add methods need to be aligned properly on some platforms
|
||||||
|
// we move that code into a separate struct.
|
||||||
|
//
|
||||||
|
// See https://golang.org/pkg/sync/atomic/#pkg-note-BUG for details
|
||||||
|
type toggle int32
|
||||||
|
|
||||||
|
func (p *toggle) On() {
|
||||||
|
atomic.AddInt32((*int32)(p), 1)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *toggle) Off() {
|
||||||
|
if atomic.AddInt32((*int32)(p), -1) < 0 {
|
||||||
|
panic("toggle not on")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *toggle) IsOn() bool {
|
||||||
|
return atomic.LoadInt32((*int32)(p)) > 0
|
||||||
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user