mirror of https://github.com/status-im/consul.git
Merge pull request #3004 from hashicorp/issue-2980
Fixes panic in leader loop on step down w/o step up
This commit is contained in:
commit
693f35e038
|
@ -65,9 +65,6 @@ func (s *Server) monitorLeadership() {
|
|||
// leaderLoop runs as long as we are the leader to run various
|
||||
// maintenance activities
|
||||
func (s *Server) leaderLoop(stopCh chan struct{}) {
|
||||
// Ensure we revoke leadership on stepdown
|
||||
defer s.revokeLeadership()
|
||||
|
||||
// Fire a user event indicating a new leader
|
||||
payload := []byte(s.config.NodeName)
|
||||
if err := s.serfLAN.UserEvent(newLeaderEvent, payload, false); err != nil {
|
||||
|
@ -96,11 +93,11 @@ RECONCILE:
|
|||
// Check if we need to handle initial leadership actions
|
||||
if !establishedLeader {
|
||||
if err := s.establishLeadership(); err != nil {
|
||||
s.logger.Printf("[ERR] consul: failed to establish leadership: %v",
|
||||
err)
|
||||
s.logger.Printf("[ERR] consul: failed to establish leadership: %v", err)
|
||||
goto WAIT
|
||||
}
|
||||
establishedLeader = true
|
||||
defer s.revokeLeadership()
|
||||
}
|
||||
|
||||
// Reconcile any missing data
|
||||
|
@ -128,6 +125,18 @@ WAIT:
|
|||
s.reconcileMember(member)
|
||||
case index := <-s.tombstoneGC.ExpireCh():
|
||||
go s.reapTombstones(index)
|
||||
case <-s.reassertLeaderCh:
|
||||
if !establishedLeader {
|
||||
continue
|
||||
}
|
||||
if err := s.revokeLeadership(); err != nil {
|
||||
s.logger.Printf("[ERR] consul: failed to revoke leadership: %v", err)
|
||||
continue
|
||||
}
|
||||
if err := s.establishLeadership(); err != nil {
|
||||
s.logger.Printf("[ERR] consul: failed to re-establish leadership: %v", err)
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -174,6 +174,10 @@ type Server struct {
|
|||
// Consul servers.
|
||||
statsFetcher *StatsFetcher
|
||||
|
||||
// reassertLeaderCh is used to signal the leader loop should re-run
|
||||
// leadership actions after a snapshot restore.
|
||||
reassertLeaderCh chan struct{}
|
||||
|
||||
// tombstoneGC is used to track the pending GC invocations
|
||||
// for the KV tombstones
|
||||
tombstoneGC *state.TombstoneGC
|
||||
|
@ -266,6 +270,7 @@ func NewServer(config *Config) (*Server, error) {
|
|||
router: servers.NewRouter(logger, shutdownCh, config.Datacenter),
|
||||
rpcServer: rpc.NewServer(),
|
||||
rpcTLS: incomingTLS,
|
||||
reassertLeaderCh: make(chan struct{}),
|
||||
tombstoneGC: gc,
|
||||
shutdownCh: make(chan struct{}),
|
||||
}
|
||||
|
|
|
@ -100,11 +100,14 @@ func (s *Server) dispatchSnapshotRequest(args *structs.SnapshotRequest, in io.Re
|
|||
if err := barrier.Error(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := s.revokeLeadership(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := s.establishLeadership(); err != nil {
|
||||
return nil, err
|
||||
|
||||
select {
|
||||
// Tell the leader loop to reassert leader actions since we just
|
||||
// replaced the state store contents.
|
||||
case s.reassertLeaderCh <- struct{}{}:
|
||||
|
||||
// Make sure we don't get stuck during shutdown
|
||||
case <-s.shutdownCh:
|
||||
}
|
||||
|
||||
// Give the caller back an empty reader since there's nothing to
|
||||
|
|
Loading…
Reference in New Issue