consul: Disable tombstones as follower

This commit is contained in:
Armon Dadgar 2015-01-05 14:58:59 -08:00
parent ce6cbab397
commit d842b6da74
3 changed files with 57 additions and 21 deletions

View File

@ -50,16 +50,15 @@ func (s *Server) monitorLeadership() {
// leaderLoop runs as long as we are the leader to run various
// maintence activities
func (s *Server) leaderLoop(stopCh chan struct{}) {
// Ensure we revoke leadership on stepdown
defer s.revokeLeadership()
// Fire a user event indicating a new leader
payload := []byte(s.config.NodeName)
if err := s.serfLAN.UserEvent(newLeaderEvent, payload, false); err != nil {
s.logger.Printf("[WARN] consul: failed to broadcast new leader event: %v", err)
}
// Clear the session timers on either shutdown or step down, since we
// are no longer responsible for session expirations.
defer s.clearAllSessionTimers()
// Reconcile channel is only used once initial reconcile
// has succeeded
var reconcileCh chan serf.Member
@ -126,7 +125,7 @@ func (s *Server) establishLeadership() error {
// Hint the tombstone expiration timer. When we freshly establish leadership
// we become the authoritative timer, and so we need to start the clock
// on any pending GC events.
s.tombstoneGC.Reset()
s.tombstoneGC.SetEnabled(true)
lastIndex := s.raft.LastIndex()
s.tombstoneGC.Hint(lastIndex)
s.logger.Printf("[DEBUG] consul: reset tombstone GC to index %d", lastIndex)
@ -154,6 +153,21 @@ func (s *Server) establishLeadership() error {
return nil
}
// revokeLeadership is invoked once we step down as leader.
// This is used to cleanup any state that may be specific to a leader.
func (s *Server) revokeLeadership() error {
// Disable the tombstone GC, since it is only useful as a leader
s.tombstoneGC.SetEnabled(false)
// Clear the session timers on either shutdown or step down, since we
// are no longer responsible for session expirations.
if err := s.clearAllSessionTimers(); err != nil {
s.logger.Printf("[ERR] consul: Clearing session timers failed: %v", err)
return err
}
return nil
}
// initializeACL is used to setup the ACLs if we are the leader
// and need to do this.
func (s *Server) initializeACL() error {

View File

@ -23,13 +23,18 @@ type TombstoneGC struct {
ttl time.Duration
granularity time.Duration
// enabled controls if we actually setup any timers.
enabled bool
// expires maps the time of expiration to the highest
// tombstone value that should be expired.
expires map[time.Time]*expireInterval
expiresLock sync.Mutex
expires map[time.Time]*expireInterval
// expireCh is used to stream expiration
expireCh chan uint64
// lock is used to ensure safe access to all the fields
lock sync.Mutex
}
// expireInterval is used to track the maximum index
@ -53,6 +58,7 @@ func NewTombstoneGC(ttl, granularity time.Duration) (*TombstoneGC, error) {
t := &TombstoneGC{
ttl: ttl,
granularity: granularity,
enabled: false,
expires: make(map[time.Time]*expireInterval),
expireCh: make(chan uint64, 1),
}
@ -65,14 +71,25 @@ func (t *TombstoneGC) ExpireCh() <-chan uint64 {
return t.expireCh
}
// Reset is used to clear the TTL timers
func (t *TombstoneGC) Reset() {
t.expiresLock.Lock()
defer t.expiresLock.Unlock()
for _, exp := range t.expires {
exp.timer.Stop()
// SetEnabled is used to control if the tombstone GC is
// enabled. Should only be enabled by the leader node.
func (t *TombstoneGC) SetEnabled(enabled bool) {
t.lock.Lock()
defer t.lock.Unlock()
if enabled == t.enabled {
return
}
t.expires = make(map[time.Time]*expireInterval)
// Stop all the timers and clear
if !enabled {
for _, exp := range t.expires {
exp.timer.Stop()
}
t.expires = make(map[time.Time]*expireInterval)
}
// Update the status
t.enabled = enabled
}
// Hint is used to indicate that keys at the given index have been
@ -80,8 +97,11 @@ func (t *TombstoneGC) Reset() {
func (t *TombstoneGC) Hint(index uint64) {
expires := t.nextExpires()
t.expiresLock.Lock()
defer t.expiresLock.Unlock()
t.lock.Lock()
defer t.lock.Unlock()
if !t.enabled {
return
}
// Check for an existing expiration timer
exp, ok := t.expires[expires]
@ -104,8 +124,8 @@ func (t *TombstoneGC) Hint(index uint64) {
// PendingExpiration is used to check if any expirations are pending
func (t *TombstoneGC) PendingExpiration() bool {
t.expiresLock.Lock()
defer t.expiresLock.Unlock()
t.lock.Lock()
defer t.lock.Unlock()
return len(t.expires) > 0
}
@ -120,10 +140,10 @@ func (t *TombstoneGC) nextExpires() time.Time {
// expireTime is used to expire the entries at the given time
func (t *TombstoneGC) expireTime(expires time.Time) {
// Get the maximum index and clear the entry
t.expiresLock.Lock()
t.lock.Lock()
exp := t.expires[expires]
delete(t.expires, expires)
t.expiresLock.Unlock()
t.lock.Unlock()
// Notify the expires channel
t.expireCh <- exp.maxIndex

View File

@ -29,6 +29,7 @@ func TestTombstoneGC(t *testing.T) {
if err != nil {
t.Fatalf("should fail")
}
gc.SetEnabled(true)
if gc.PendingExpiration() {
t.Fatalf("should not be pending")
@ -82,13 +83,14 @@ func TestTombstoneGC_Expire(t *testing.T) {
if err != nil {
t.Fatalf("should fail")
}
gc.SetEnabled(true)
if gc.PendingExpiration() {
t.Fatalf("should not be pending")
}
gc.Hint(100)
gc.Reset()
gc.SetEnabled(false)
if gc.PendingExpiration() {
t.Fatalf("should not be pending")