Kick the leader loop on the proper thread after a snapshot restore, and

only if leadership is already established.
This commit is contained in:
James Phillips 2017-05-03 20:31:14 -07:00 committed by Frank Schroeder
parent 56fe6b65d1
commit 2a652b440a
No known key found for this signature in database
GPG Key ID: 4D65C6EAEC87DECD
3 changed files with 20 additions and 6 deletions

View File

@ -124,6 +124,17 @@ WAIT:
goto RECONCILE goto RECONCILE
case member := <-reconcileCh: case member := <-reconcileCh:
s.reconcileMember(member) s.reconcileMember(member)
case <-s.reassertLeaderCh:
if establishedLeader {
if err := s.revokeLeadership(); err != nil {
s.logger.Printf("[ERR] consul: failed to revoke leadership: %v", err)
goto WAIT
}
if err := s.establishLeadership(); err != nil {
s.logger.Printf("[ERR] consul: failed to re-establish leadership: %v", err)
goto WAIT
}
}
case index := <-s.tombstoneGC.ExpireCh(): case index := <-s.tombstoneGC.ExpireCh():
go s.reapTombstones(index) go s.reapTombstones(index)
} }

View File

@ -174,6 +174,10 @@ type Server struct {
// Consul servers. // Consul servers.
statsFetcher *StatsFetcher statsFetcher *StatsFetcher
// reassertLeaderCh is used to signal the leader loop should re-run
// leadership actions after a snapshot restore.
reassertLeaderCh chan struct{}
// tombstoneGC is used to track the pending GC invocations // tombstoneGC is used to track the pending GC invocations
// for the KV tombstones // for the KV tombstones
tombstoneGC *state.TombstoneGC tombstoneGC *state.TombstoneGC
@ -266,6 +270,7 @@ func NewServer(config *Config) (*Server, error) {
router: servers.NewRouter(logger, shutdownCh, config.Datacenter), router: servers.NewRouter(logger, shutdownCh, config.Datacenter),
rpcServer: rpc.NewServer(), rpcServer: rpc.NewServer(),
rpcTLS: incomingTLS, rpcTLS: incomingTLS,
reassertLeaderCh: make(chan struct{}),
tombstoneGC: gc, tombstoneGC: gc,
shutdownCh: make(chan struct{}), shutdownCh: make(chan struct{}),
} }

View File

@ -100,12 +100,10 @@ func (s *Server) dispatchSnapshotRequest(args *structs.SnapshotRequest, in io.Re
if err := barrier.Error(); err != nil { if err := barrier.Error(); err != nil {
return nil, err return nil, err
} }
if err := s.revokeLeadership(); err != nil {
return nil, err // Tell the leader loop to reassert leader actions since we just
} // replaced the state store contents.
if err := s.establishLeadership(); err != nil { s.reassertLeaderCh <- struct{}{}
return nil, err
}
// Give the caller back an empty reader since there's nothing to // Give the caller back an empty reader since there's nothing to
// stream back. // stream back.