From 56fe6b65d10346f6946545c42c81cc9b73e278d3 Mon Sep 17 00:00:00 2001 From: James Phillips Date: Wed, 3 May 2017 20:15:52 -0700 Subject: [PATCH 1/4] Runs revoke leadership actions only if we've established leadership. --- consul/leader.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/consul/leader.go b/consul/leader.go index 263096d0ee..f44b73e031 100644 --- a/consul/leader.go +++ b/consul/leader.go @@ -65,9 +65,6 @@ func (s *Server) monitorLeadership() { // leaderLoop runs as long as we are the leader to run various // maintenance activities func (s *Server) leaderLoop(stopCh chan struct{}) { - // Ensure we revoke leadership on stepdown - defer s.revokeLeadership() - // Fire a user event indicating a new leader payload := []byte(s.config.NodeName) if err := s.serfLAN.UserEvent(newLeaderEvent, payload, false); err != nil { @@ -101,6 +98,7 @@ RECONCILE: goto WAIT } establishedLeader = true + defer s.revokeLeadership() } // Reconcile any missing data From 2a652b440a7d1fac0fe0d33b9dd76e5a336ea33b Mon Sep 17 00:00:00 2001 From: James Phillips Date: Wed, 3 May 2017 20:31:14 -0700 Subject: [PATCH 2/4] Kick the leader loop on the proper thread after a snapshot restore, and only if leadership is already established. --- consul/leader.go | 11 +++++++++++ consul/server.go | 5 +++++ consul/snapshot_endpoint.go | 10 ++++------ 3 files changed, 20 insertions(+), 6 deletions(-) diff --git a/consul/leader.go b/consul/leader.go index f44b73e031..f154d5857c 100644 --- a/consul/leader.go +++ b/consul/leader.go @@ -124,6 +124,17 @@ WAIT: goto RECONCILE case member := <-reconcileCh: s.reconcileMember(member) + case <-s.reassertLeaderCh: + if establishedLeader { + if err := s.revokeLeadership(); err != nil { + s.logger.Printf("[ERR] consul: failed to revoke leadership: %v", err) + goto WAIT + } + if err := s.establishLeadership(); err != nil { + s.logger.Printf("[ERR] consul: failed to re-establish leadership: %v", err) + goto WAIT + } + } case index := <-s.tombstoneGC.ExpireCh(): go s.reapTombstones(index) } diff --git a/consul/server.go b/consul/server.go index dee5ee9ac5..2ba958e9c8 100644 --- a/consul/server.go +++ b/consul/server.go @@ -174,6 +174,10 @@ type Server struct { // Consul servers. statsFetcher *StatsFetcher + // reassertLeaderCh is used to signal the leader loop should re-run + // leadership actions after a snapshot restore. + reassertLeaderCh chan struct{} + // tombstoneGC is used to track the pending GC invocations // for the KV tombstones tombstoneGC *state.TombstoneGC @@ -266,6 +270,7 @@ func NewServer(config *Config) (*Server, error) { router: servers.NewRouter(logger, shutdownCh, config.Datacenter), rpcServer: rpc.NewServer(), rpcTLS: incomingTLS, + reassertLeaderCh: make(chan struct{}), tombstoneGC: gc, shutdownCh: make(chan struct{}), } diff --git a/consul/snapshot_endpoint.go b/consul/snapshot_endpoint.go index 200b443082..98e0e41684 100644 --- a/consul/snapshot_endpoint.go +++ b/consul/snapshot_endpoint.go @@ -100,12 +100,10 @@ func (s *Server) dispatchSnapshotRequest(args *structs.SnapshotRequest, in io.Re if err := barrier.Error(); err != nil { return nil, err } - if err := s.revokeLeadership(); err != nil { - return nil, err - } - if err := s.establishLeadership(); err != nil { - return nil, err - } + + // Tell the leader loop to reassert leader actions since we just + // replaced the state store contents. + s.reassertLeaderCh <- struct{}{} // Give the caller back an empty reader since there's nothing to // stream back. From cfd584a784d889d22676a9606d2c60b7d54c686d Mon Sep 17 00:00:00 2001 From: Frank Schroeder Date: Thu, 4 May 2017 16:15:25 +0200 Subject: [PATCH 3/4] Straighten control flow in leader.go --- consul/leader.go | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/consul/leader.go b/consul/leader.go index f154d5857c..e10e3fc547 100644 --- a/consul/leader.go +++ b/consul/leader.go @@ -93,8 +93,7 @@ RECONCILE: // Check if we need to handle initial leadership actions if !establishedLeader { if err := s.establishLeadership(); err != nil { - s.logger.Printf("[ERR] consul: failed to establish leadership: %v", - err) + s.logger.Printf("[ERR] consul: failed to establish leadership: %v", err) goto WAIT } establishedLeader = true @@ -124,19 +123,20 @@ WAIT: goto RECONCILE case member := <-reconcileCh: s.reconcileMember(member) - case <-s.reassertLeaderCh: - if establishedLeader { - if err := s.revokeLeadership(); err != nil { - s.logger.Printf("[ERR] consul: failed to revoke leadership: %v", err) - goto WAIT - } - if err := s.establishLeadership(); err != nil { - s.logger.Printf("[ERR] consul: failed to re-establish leadership: %v", err) - goto WAIT - } - } case index := <-s.tombstoneGC.ExpireCh(): go s.reapTombstones(index) + case <-s.reassertLeaderCh: + if !establishedLeader { + continue + } + if err := s.revokeLeadership(); err != nil { + s.logger.Printf("[ERR] consul: failed to revoke leadership: %v", err) + continue + } + if err := s.establishLeadership(); err != nil { + s.logger.Printf("[ERR] consul: failed to re-establish leadership: %v", err) + continue + } } } } From b33c4a16c15df280d30cbceb3791ad5f2aa3925a Mon Sep 17 00:00:00 2001 From: Frank Schroeder Date: Thu, 4 May 2017 16:48:54 +0200 Subject: [PATCH 4/4] Do not block on reassertLeader during shutdown --- consul/snapshot_endpoint.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/consul/snapshot_endpoint.go b/consul/snapshot_endpoint.go index 98e0e41684..367db4b9cf 100644 --- a/consul/snapshot_endpoint.go +++ b/consul/snapshot_endpoint.go @@ -101,9 +101,14 @@ func (s *Server) dispatchSnapshotRequest(args *structs.SnapshotRequest, in io.Re return nil, err } + select { // Tell the leader loop to reassert leader actions since we just // replaced the state store contents. - s.reassertLeaderCh <- struct{}{} + case s.reassertLeaderCh <- struct{}{}: + + // Make sure we don't get stuck during shutdown + case <-s.shutdownCh: + } // Give the caller back an empty reader since there's nothing to // stream back.