diff --git a/consul/leader.go b/consul/leader.go index 2bffdd967d..b5daad1bf4 100644 --- a/consul/leader.go +++ b/consul/leader.go @@ -5,6 +5,7 @@ import ( "net" "strconv" "strings" + "sync" "time" "github.com/armon/go-metrics" @@ -29,18 +30,29 @@ const ( // as the leader in the Raft cluster. There is some work the leader is // expected to do, so we must react to changes func (s *Server) monitorLeadership() { - leaderCh := s.raft.LeaderCh() + // We use the notify channel we configured Raft with, NOT Raft's + // leaderCh, which is only notified best-effort. Doing this ensures + // that we get all notifications in order, which is required for + // cleanup and to ensure we never run multiple leader loops. + leaderCh := s.leaderCh + + var wg sync.WaitGroup var stopCh chan struct{} for { select { case isLeader := <-leaderCh: if isLeader { stopCh = make(chan struct{}) - go s.leaderLoop(stopCh) + wg.Add(1) + go func() { + s.leaderLoop(stopCh) + wg.Done() + }() s.logger.Printf("[INFO] consul: cluster leadership acquired") } else if stopCh != nil { close(stopCh) stopCh = nil + wg.Wait() s.logger.Printf("[INFO] consul: cluster leadership lost") } case <-s.shutdownCh: diff --git a/consul/server.go b/consul/server.go index dbcff66afb..ab209d8630 100644 --- a/consul/server.go +++ b/consul/server.go @@ -132,6 +132,10 @@ type Server struct { raftTransport *raft.NetworkTransport raftInmem *raft.InmemStore + // leaderCh set up by setupRaft() and ensures that we get reliable leader + // transition notifications from the Raft layer. + leaderCh <-chan bool + // reconcileCh is used to pass events from the serf handler // into the leader manager, so that the strong state can be // updated @@ -554,6 +558,11 @@ func (s *Server) setupRaft() error { } } + // Set up a channel for reliable leader notifications. + leaderCh := make(chan bool, 1) + s.config.RaftConfig.NotifyCh = leaderCh + s.leaderCh = leaderCh + // Setup the Raft store. s.raft, err = raft.NewRaft(s.config.RaftConfig, s.fsm, log, stable, snap, trans) if err != nil {