From 2a652b440a7d1fac0fe0d33b9dd76e5a336ea33b Mon Sep 17 00:00:00 2001 From: James Phillips Date: Wed, 3 May 2017 20:31:14 -0700 Subject: [PATCH] Kick the leader loop on the proper thread after a snapshot restore, and only if leadership is already established. --- consul/leader.go | 11 +++++++++++ consul/server.go | 5 +++++ consul/snapshot_endpoint.go | 10 ++++------ 3 files changed, 20 insertions(+), 6 deletions(-) diff --git a/consul/leader.go b/consul/leader.go index f44b73e031..f154d5857c 100644 --- a/consul/leader.go +++ b/consul/leader.go @@ -124,6 +124,17 @@ WAIT: goto RECONCILE case member := <-reconcileCh: s.reconcileMember(member) + case <-s.reassertLeaderCh: + if establishedLeader { + if err := s.revokeLeadership(); err != nil { + s.logger.Printf("[ERR] consul: failed to revoke leadership: %v", err) + goto WAIT + } + if err := s.establishLeadership(); err != nil { + s.logger.Printf("[ERR] consul: failed to re-establish leadership: %v", err) + goto WAIT + } + } case index := <-s.tombstoneGC.ExpireCh(): go s.reapTombstones(index) } diff --git a/consul/server.go b/consul/server.go index dee5ee9ac5..2ba958e9c8 100644 --- a/consul/server.go +++ b/consul/server.go @@ -174,6 +174,10 @@ type Server struct { // Consul servers. statsFetcher *StatsFetcher + // reassertLeaderCh is used to signal the leader loop should re-run + // leadership actions after a snapshot restore. + reassertLeaderCh chan struct{} + // tombstoneGC is used to track the pending GC invocations // for the KV tombstones tombstoneGC *state.TombstoneGC @@ -266,6 +270,7 @@ func NewServer(config *Config) (*Server, error) { router: servers.NewRouter(logger, shutdownCh, config.Datacenter), rpcServer: rpc.NewServer(), rpcTLS: incomingTLS, + reassertLeaderCh: make(chan struct{}), tombstoneGC: gc, shutdownCh: make(chan struct{}), } diff --git a/consul/snapshot_endpoint.go b/consul/snapshot_endpoint.go index 200b443082..98e0e41684 100644 --- a/consul/snapshot_endpoint.go +++ b/consul/snapshot_endpoint.go @@ -100,12 +100,10 @@ func (s *Server) dispatchSnapshotRequest(args *structs.SnapshotRequest, in io.Re if err := barrier.Error(); err != nil { return nil, err } - if err := s.revokeLeadership(); err != nil { - return nil, err - } - if err := s.establishLeadership(); err != nil { - return nil, err - } + + // Tell the leader loop to reassert leader actions since we just + // replaced the state store contents. + s.reassertLeaderCh <- struct{}{} // Give the caller back an empty reader since there's nothing to // stream back.