From 4dab70cb937ee240bea8cf7287ffdc1af927748e Mon Sep 17 00:00:00 2001 From: James Phillips Date: Fri, 6 Oct 2017 07:54:49 -0700 Subject: [PATCH] Fixes handling of stop channel and failed barrier attempts. (#3546) * Fixes handling of stop channel and failed barrier attempts. There were two issues here. First, we needed to not exit when there was a timeout trying to write the barrier, because Raft might not step down, so we'd be left as the leader but having run all the step down actions. Second, we didn't close over the stopCh correctly, so it was possible to nil that out and have the leaderLoop never exit. We close over it properly AND sequence the nil-ing of it AFTER the leaderLoop exits for good measure, so the code is more robust. Fixes #3545 * Cleans up based on code review feedback. * Tweaks comments. * Renames variables and removes comments. --- agent/consul/leader.go | 51 ++++++++++++++++++++++++++++++------------ 1 file changed, 37 insertions(+), 14 deletions(-) diff --git a/agent/consul/leader.go b/agent/consul/leader.go index e4e84b4a4a..a96195016b 100644 --- a/agent/consul/leader.go +++ b/agent/consul/leader.go @@ -33,25 +33,39 @@ func (s *Server) monitorLeadership() { // cleanup and to ensure we never run multiple leader loops. raftNotifyCh := s.raftNotifyCh - var wg sync.WaitGroup - var stopCh chan struct{} + var weAreLeaderCh chan struct{} + var leaderLoop sync.WaitGroup for { select { case isLeader := <-raftNotifyCh: - if isLeader { - stopCh = make(chan struct{}) - wg.Add(1) - go func() { - s.leaderLoop(stopCh) - wg.Done() - }() + switch { + case isLeader: + if weAreLeaderCh != nil { + s.logger.Printf("[ERR] consul: attempted to start the leader loop while running") + continue + } + + weAreLeaderCh = make(chan struct{}) + leaderLoop.Add(1) + go func(ch chan struct{}) { + defer leaderLoop.Done() + s.leaderLoop(ch) + }(weAreLeaderCh) s.logger.Printf("[INFO] consul: cluster leadership acquired") - } else if stopCh != nil { - close(stopCh) - stopCh = nil - wg.Wait() + + default: + if weAreLeaderCh == nil { + s.logger.Printf("[ERR] consul: attempted to stop the leader loop while not running") + continue + } + + s.logger.Printf("[DEBUG] consul: shutting down leader loop") + close(weAreLeaderCh) + leaderLoop.Wait() + weAreLeaderCh = nil s.logger.Printf("[INFO] consul: cluster leadership lost") } + case <-s.shutdownCh: return } @@ -97,7 +111,7 @@ RECONCILE: barrier := s.raft.Barrier(barrierWriteTimeout) if err := barrier.Error(); err != nil { s.logger.Printf("[ERR] consul: failed to wait for barrier: %v", err) - return + goto WAIT } metrics.MeasureSince([]string{"consul", "leader", "barrier"}, start) metrics.MeasureSince([]string{"leader", "barrier"}, start) @@ -127,6 +141,15 @@ RECONCILE: reconcileCh = s.reconcileCh WAIT: + // Poll the stop channel to give it priority so we don't waste time + // trying to perform the other operations if we have been asked to shut + // down. + select { + case <-stopCh: + return + default: + } + // Periodically reconcile as long as we are the leader, // or when Serf events arrive for {