Merge pull request #3231 from hashicorp/issue_3230

Fixes a deadlock between the barrier write and the leader notify channel read…
This commit is contained in:
preetapan 2017-07-06 09:11:53 -05:00 committed by GitHub
commit 4abbbc91f5
2 changed files with 10 additions and 9 deletions

View File

@ -25,6 +25,7 @@ const (
ConsulServiceID = "consul" ConsulServiceID = "consul"
ConsulServiceName = "consul" ConsulServiceName = "consul"
newLeaderEvent = "consul:new-leader" newLeaderEvent = "consul:new-leader"
barrierWriteTimeout = 2 * time.Minute
) )
// monitorLeadership is used to monitor if we acquire or lose our role // monitorLeadership is used to monitor if we acquire or lose our role
@ -35,13 +36,13 @@ func (s *Server) monitorLeadership() {
// leaderCh, which is only notified best-effort. Doing this ensures // leaderCh, which is only notified best-effort. Doing this ensures
// that we get all notifications in order, which is required for // that we get all notifications in order, which is required for
// cleanup and to ensure we never run multiple leader loops. // cleanup and to ensure we never run multiple leader loops.
leaderCh := s.leaderCh raftNotifyCh := s.raftNotifyCh
var wg sync.WaitGroup var wg sync.WaitGroup
var stopCh chan struct{} var stopCh chan struct{}
for { for {
select { select {
case isLeader := <-leaderCh: case isLeader := <-raftNotifyCh:
if isLeader { if isLeader {
stopCh = make(chan struct{}) stopCh = make(chan struct{})
wg.Add(1) wg.Add(1)
@ -96,10 +97,10 @@ RECONCILE:
// Apply a raft barrier to ensure our FSM is caught up // Apply a raft barrier to ensure our FSM is caught up
start := time.Now() start := time.Now()
barrier := s.raft.Barrier(0) barrier := s.raft.Barrier(barrierWriteTimeout)
if err := barrier.Error(); err != nil { if err := barrier.Error(); err != nil {
s.logger.Printf("[ERR] consul: failed to wait for barrier: %v", err) s.logger.Printf("[ERR] consul: failed to wait for barrier: %v", err)
goto WAIT return
} }
metrics.MeasureSince([]string{"consul", "leader", "barrier"}, start) metrics.MeasureSince([]string{"consul", "leader", "barrier"}, start)

View File

@ -134,9 +134,9 @@ type Server struct {
raftTransport *raft.NetworkTransport raftTransport *raft.NetworkTransport
raftInmem *raft.InmemStore raftInmem *raft.InmemStore
// leaderCh set up by setupRaft() and ensures that we get reliable leader // raftNotifyCh is set up by setupRaft() and ensures that we get reliable leader
// transition notifications from the Raft layer. // transition notifications from the Raft layer.
leaderCh <-chan bool raftNotifyCh <-chan bool
// reconcileCh is used to pass events from the serf handler // reconcileCh is used to pass events from the serf handler
// into the leader manager, so that the strong state can be // into the leader manager, so that the strong state can be
@ -601,9 +601,9 @@ func (s *Server) setupRaft() error {
} }
// Set up a channel for reliable leader notifications. // Set up a channel for reliable leader notifications.
leaderCh := make(chan bool, 1) raftNotifyCh := make(chan bool, 1)
s.config.RaftConfig.NotifyCh = leaderCh s.config.RaftConfig.NotifyCh = raftNotifyCh
s.leaderCh = leaderCh s.raftNotifyCh = raftNotifyCh
// Setup the Raft store. // Setup the Raft store.
s.raft, err = raft.NewRaft(s.config.RaftConfig, s.fsm, log, stable, snap, trans) s.raft, err = raft.NewRaft(s.config.RaftConfig, s.fsm, log, stable, snap, trans)