From eff282762d178525e81148b895d45af90113727c Mon Sep 17 00:00:00 2001 From: James Phillips Date: Thu, 4 May 2017 11:52:22 -0700 Subject: [PATCH] Adds timeout and waits for feedback when asking the leader loop to reassert. This adds on to the fix in #3004 for issue #2980. --- consul/leader.go | 27 +++++++++++++++------------ consul/server.go | 4 ++-- consul/snapshot_endpoint.go | 26 +++++++++++++++++++++++++- 3 files changed, 42 insertions(+), 15 deletions(-) diff --git a/consul/leader.go b/consul/leader.go index e10e3fc547..7a1bfd358f 100644 --- a/consul/leader.go +++ b/consul/leader.go @@ -76,6 +76,19 @@ func (s *Server) leaderLoop(stopCh chan struct{}) { var reconcileCh chan serf.Member establishedLeader := false + reassert := func() error { + if !establishedLeader { + return fmt.Errorf("leadership has not been established") + } + if err := s.revokeLeadership(); err != nil { + return err + } + if err := s.establishLeadership(); err != nil { + return err + } + return nil + } + RECONCILE: // Setup a reconciliation timer reconcileCh = nil @@ -125,18 +138,8 @@ WAIT: s.reconcileMember(member) case index := <-s.tombstoneGC.ExpireCh(): go s.reapTombstones(index) - case <-s.reassertLeaderCh: - if !establishedLeader { - continue - } - if err := s.revokeLeadership(); err != nil { - s.logger.Printf("[ERR] consul: failed to revoke leadership: %v", err) - continue - } - if err := s.establishLeadership(); err != nil { - s.logger.Printf("[ERR] consul: failed to re-establish leadership: %v", err) - continue - } + case errCh := <-s.reassertLeaderCh: + errCh <- reassert() } } } diff --git a/consul/server.go b/consul/server.go index 2ba958e9c8..56df71fc4d 100644 --- a/consul/server.go +++ b/consul/server.go @@ -176,7 +176,7 @@ type Server struct { // reassertLeaderCh is used to signal the leader loop should re-run // leadership actions after a snapshot restore. - reassertLeaderCh chan struct{} + reassertLeaderCh chan chan error // tombstoneGC is used to track the pending GC invocations // for the KV tombstones @@ -270,7 +270,7 @@ func NewServer(config *Config) (*Server, error) { router: servers.NewRouter(logger, shutdownCh, config.Datacenter), rpcServer: rpc.NewServer(), rpcTLS: incomingTLS, - reassertLeaderCh: make(chan struct{}), + reassertLeaderCh: make(chan chan error), tombstoneGC: gc, shutdownCh: make(chan struct{}), } diff --git a/consul/snapshot_endpoint.go b/consul/snapshot_endpoint.go index 367db4b9cf..3c6f130021 100644 --- a/consul/snapshot_endpoint.go +++ b/consul/snapshot_endpoint.go @@ -101,10 +101,34 @@ func (s *Server) dispatchSnapshotRequest(args *structs.SnapshotRequest, in io.Re return nil, err } + // This'll be used for feedback from the leader loop. + errCh := make(chan error, 1) + timeoutCh := time.After(time.Minute) + select { // Tell the leader loop to reassert leader actions since we just // replaced the state store contents. - case s.reassertLeaderCh <- struct{}{}: + case s.reassertLeaderCh <- errCh: + + // We might have lost leadership while waiting to kick the loop. + case <-timeoutCh: + return nil, fmt.Errorf("timed out waiting to re-run leader actions") + + // Make sure we don't get stuck during shutdown + case <-s.shutdownCh: + } + + select { + // Wait for the leader loop to finish up. + case err := <-errCh: + if err != nil { + return nil, err + } + + // We might have lost leadership while the loop was doing its + // thing. + case <-timeoutCh: + return nil, fmt.Errorf("timed out waiting for re-run of leader actions") // Make sure we don't get stuck during shutdown case <-s.shutdownCh: