diff --git a/consul/leader.go b/consul/leader.go index 263096d0ee..e10e3fc547 100644 --- a/consul/leader.go +++ b/consul/leader.go @@ -65,9 +65,6 @@ func (s *Server) monitorLeadership() { // leaderLoop runs as long as we are the leader to run various // maintenance activities func (s *Server) leaderLoop(stopCh chan struct{}) { - // Ensure we revoke leadership on stepdown - defer s.revokeLeadership() - // Fire a user event indicating a new leader payload := []byte(s.config.NodeName) if err := s.serfLAN.UserEvent(newLeaderEvent, payload, false); err != nil { @@ -96,11 +93,11 @@ RECONCILE: // Check if we need to handle initial leadership actions if !establishedLeader { if err := s.establishLeadership(); err != nil { - s.logger.Printf("[ERR] consul: failed to establish leadership: %v", - err) + s.logger.Printf("[ERR] consul: failed to establish leadership: %v", err) goto WAIT } establishedLeader = true + defer s.revokeLeadership() } // Reconcile any missing data @@ -128,6 +125,18 @@ WAIT: s.reconcileMember(member) case index := <-s.tombstoneGC.ExpireCh(): go s.reapTombstones(index) + case <-s.reassertLeaderCh: + if !establishedLeader { + continue + } + if err := s.revokeLeadership(); err != nil { + s.logger.Printf("[ERR] consul: failed to revoke leadership: %v", err) + continue + } + if err := s.establishLeadership(); err != nil { + s.logger.Printf("[ERR] consul: failed to re-establish leadership: %v", err) + continue + } } } } diff --git a/consul/server.go b/consul/server.go index dee5ee9ac5..2ba958e9c8 100644 --- a/consul/server.go +++ b/consul/server.go @@ -174,6 +174,10 @@ type Server struct { // Consul servers. statsFetcher *StatsFetcher + // reassertLeaderCh is used to signal the leader loop should re-run + // leadership actions after a snapshot restore. + reassertLeaderCh chan struct{} + // tombstoneGC is used to track the pending GC invocations // for the KV tombstones tombstoneGC *state.TombstoneGC @@ -266,6 +270,7 @@ func NewServer(config *Config) (*Server, error) { router: servers.NewRouter(logger, shutdownCh, config.Datacenter), rpcServer: rpc.NewServer(), rpcTLS: incomingTLS, + reassertLeaderCh: make(chan struct{}), tombstoneGC: gc, shutdownCh: make(chan struct{}), } diff --git a/consul/snapshot_endpoint.go b/consul/snapshot_endpoint.go index 200b443082..367db4b9cf 100644 --- a/consul/snapshot_endpoint.go +++ b/consul/snapshot_endpoint.go @@ -100,11 +100,14 @@ func (s *Server) dispatchSnapshotRequest(args *structs.SnapshotRequest, in io.Re if err := barrier.Error(); err != nil { return nil, err } - if err := s.revokeLeadership(); err != nil { - return nil, err - } - if err := s.establishLeadership(); err != nil { - return nil, err + + select { + // Tell the leader loop to reassert leader actions since we just + // replaced the state store contents. + case s.reassertLeaderCh <- struct{}{}: + + // Make sure we don't get stuck during shutdown + case <-s.shutdownCh: } // Give the caller back an empty reader since there's nothing to