Upgrade raft-autopilot and wait for autopilot it to stop when revoking leadership (#9644)

Fixes: 9626
This commit is contained in:
Matt Keeler 2021-01-27 11:14:52 -05:00 committed by GitHub
parent 49b773233b
commit f561462064
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 316 additions and 79 deletions

3
.changelog/9626.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:bug
autopilot: Fixed a bug that would cause snapshot restoration to stop autopilot on the leader.
```

View File

@ -395,7 +395,9 @@ func (s *Server) revokeLeadership() {
s.resetConsistentReadReady()
s.autopilot.Stop()
// Stop returns a chan and we want to block until it is closed
// which indicates that autopilot is actually stopped.
<-s.autopilot.Stop()
}
// DEPRECATED (ACL-Legacy-Compat) - Remove once old ACL compatibility is removed

View File

@ -13,6 +13,8 @@ import (
"github.com/hashicorp/consul/sdk/testutil/retry"
"github.com/hashicorp/consul/testrpc"
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
autopilot "github.com/hashicorp/raft-autopilot"
"github.com/stretchr/testify/require"
)
// verifySnapshot is a helper that does a snapshot and restore.
@ -165,6 +167,11 @@ func TestSnapshot(t *testing.T) {
testrpc.WaitForLeader(t, s1.RPC, "dc1")
verifySnapshot(t, s1, "dc1", "")
// ensure autopilot is still running
// https://github.com/hashicorp/consul/issues/9626
apstatus, _ := s1.autopilot.IsRunning()
require.Equal(t, autopilot.Running, apstatus)
}
func TestSnapshot_LeaderState(t *testing.T) {

2
go.mod
View File

@ -53,7 +53,7 @@ require (
github.com/hashicorp/memberlist v0.2.2
github.com/hashicorp/net-rpc-msgpackrpc v0.0.0-20151116020338-a14192a58a69
github.com/hashicorp/raft v1.2.0
github.com/hashicorp/raft-autopilot v0.1.1
github.com/hashicorp/raft-autopilot v0.1.2
github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea
github.com/hashicorp/serf v0.9.5
github.com/hashicorp/vault/api v1.0.5-0.20200717191844-f687267c8086

4
go.sum
View File

@ -289,8 +289,8 @@ github.com/hashicorp/net-rpc-msgpackrpc v0.0.0-20151116020338-a14192a58a69/go.mo
github.com/hashicorp/raft v1.1.1/go.mod h1:vPAJM8Asw6u8LxC3eJCUZmRP/E4QmUGE1R7g7k8sG/8=
github.com/hashicorp/raft v1.2.0 h1:mHzHIrF0S91d3A7RPBvuqkgB4d/7oFJZyvf1Q4m7GA0=
github.com/hashicorp/raft v1.2.0/go.mod h1:vPAJM8Asw6u8LxC3eJCUZmRP/E4QmUGE1R7g7k8sG/8=
github.com/hashicorp/raft-autopilot v0.1.1 h1:f8Dv2y1Vq8ttuH2+oh5l87Paj/BINpMm5TBrMLx+qGQ=
github.com/hashicorp/raft-autopilot v0.1.1/go.mod h1:HUBUSYtpQRVkgjvvoOgsZPvwe6b6FZJ1xXtaftRZvrA=
github.com/hashicorp/raft-autopilot v0.1.2 h1:yeqdUjWLjVJkBM+mcVxqwxi+w+aHsb9cEON2dz69OCs=
github.com/hashicorp/raft-autopilot v0.1.2/go.mod h1:Af4jZBwaNOI+tXfIqIdbcAnh/UyyqIMj/pOISIfhArw=
github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea h1:xykPFhrBAS2J0VBzVa5e80b5ZtYuNQtgXjN40qBZlD4=
github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea/go.mod h1:pNv7Wc3ycL6F5oOWn+tPGo2gWD4a5X+yp/ntwdKLjRk=
github.com/hashicorp/serf v0.9.5 h1:EBWvyu9tcRszt3Bxp3KNssBMP1KuHWyO51lz9+786iM=

View File

@ -75,6 +75,31 @@ func WithPromoter(promoter Promoter) Option {
}
}
// ExecutionStatus represents the current status of the autopilot background go routines
type ExecutionStatus string
const (
NotRunning ExecutionStatus = "not-running"
Running ExecutionStatus = "running"
ShuttingDown ExecutionStatus = "shutting-down"
)
type execInfo struct {
// status is the current state of autopilot executation
status ExecutionStatus
// shutdown is a function that can be execute to shutdown a running
// autopilot's go routines.
shutdown context.CancelFunc
// done is a chan that will be closed when the running autopilot go
// routines have exited. Technically closing it is the very last
// thing done in the go routine but at that point enough state has
// been cleaned up that we would then allow it to be started
// immediately afterward
done chan struct{}
}
// Autopilot is the type to manage a running Raft instance.
//
// Each Raft node in the cluster will have a corresponding Autopilot instance but
@ -132,10 +157,6 @@ type Autopilot struct {
// brought up.
startTime time.Time
// running is a simple bool to indicate whether the go routines to actually
// execute autopilot are currently running
running bool
// removeDeadCh is used to trigger the running autopilot go routines to
// find and remove any dead/failed servers
removeDeadCh chan struct{}
@ -143,20 +164,20 @@ type Autopilot struct {
// reconcileCh is used to trigger an immediate round of reconciliation.
reconcileCh chan struct{}
// shutdown is a function that can be execute to shutdown a running
// autopilot's go routines.
shutdown context.CancelFunc
// done is a chan that will be closed when the running autopilot go
// routines have exited. Technically closing it is the very last
// thing done in the go routine but at that point enough state has
// been cleaned up that we would then allow it to be started
// immediately afterward
done chan struct{}
// leaderLock implements a cancellable mutex that will be used to ensure
// that only one autopilot go routine is the "leader". The leader is
// the go routine that is currently responsible for updating the
// autopilot state and performing raft promotions/demotions.
leaderLock *mutex
// runLock is meant to protect all of the fields regarding coordination
// of whether the autopilot go routines are running and
// starting/stopping them.
runLock sync.Mutex
// execution is the information about the most recent autopilot execution.
// Start will initialize this with the most recent execution and it will
// be updated by Stop and by the go routines being executed when they are
// finished.
execution *execInfo
// execLock protects access to the execution field
execLock sync.Mutex
}
// New will create a new Autopilot instance utilizing the given Raft and Delegate.
@ -166,6 +187,7 @@ func New(raft Raft, delegate ApplicationIntegration, options ...Option) *Autopil
a := &Autopilot{
raft: raft,
delegate: delegate,
state: &State{},
promoter: DefaultPromoter(),
logger: hclog.Default().Named("autopilot"),
// should this be buffered?
@ -173,6 +195,7 @@ func New(raft Raft, delegate ApplicationIntegration, options ...Option) *Autopil
reconcileInterval: DefaultReconcileInterval,
updateInterval: DefaultUpdateInterval,
time: &runtimeTimeProvider{},
leaderLock: newMutex(),
}
for _, opt := range options {

View File

@ -7,4 +7,5 @@ require (
github.com/hashicorp/raft v1.2.0
github.com/stretchr/testify v1.6.1
go.uber.org/goleak v1.1.10
golang.org/x/sync v0.0.0-20190423024810-112230192c58
)

View File

@ -67,6 +67,7 @@ golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f h1:Bl/8QSvNqXvPGPGXa2z5xUTmV7VDcZyvRZ+QQXkXTZQ=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58 h1:8gQV6CLnAEikrhgkHFbMAEhagSSnXWGV915qUMm9mrU=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=

35
vendor/github.com/hashicorp/raft-autopilot/mutex.go generated vendored Normal file
View File

@ -0,0 +1,35 @@
/*
This code was taken from the same implementation in a branch from Consul and then
had the package updated and the mutex type unexported.
*/
package autopilot
import (
"context"
"golang.org/x/sync/semaphore"
)
type mutex semaphore.Weighted
// New returns a Mutex that is ready for use.
func newMutex() *mutex {
return (*mutex)(semaphore.NewWeighted(1))
}
func (m *mutex) Lock() {
_ = (*semaphore.Weighted)(m).Acquire(context.Background(), 1)
}
func (m *mutex) Unlock() {
(*semaphore.Weighted)(m).Release(1)
}
// TryLock acquires the mutex, blocking until resources are available or ctx is
// done. On success, returns nil. On failure, returns ctx.Err() and leaves the
// semaphore unchanged.
//
// If ctx is already done, Acquire may still succeed without blocking.
func (m *mutex) TryLock(ctx context.Context) error {
return (*semaphore.Weighted)(m).Acquire(ctx, 1)
}

View File

@ -9,50 +9,100 @@ import (
// When the context passed in is cancelled or the Stop method is called
// then these routines will exit.
func (a *Autopilot) Start(ctx context.Context) {
a.runLock.Lock()
defer a.runLock.Unlock()
a.execLock.Lock()
defer a.execLock.Unlock()
// already running so there is nothing to do
if a.running {
if a.execution != nil && a.execution.status == Running {
return
}
ctx, shutdown := context.WithCancel(ctx)
a.shutdown = shutdown
a.startTime = a.time.Now()
a.done = make(chan struct{})
// While a go routine executed by a.run below will periodically
// update the state, we want to go ahead and force updating it now
// so that during a leadership transfer we don't report an empty
// autopilot state. We put a pretty small timeout on this though
// so as to prevent leader establishment from taking too long
updateCtx, updateCancel := context.WithTimeout(ctx, time.Second)
defer updateCancel()
a.updateState(updateCtx)
exec := &execInfo{
status: Running,
shutdown: shutdown,
done: make(chan struct{}),
}
go a.run(ctx)
a.running = true
if a.execution == nil || a.execution.status == NotRunning {
// In theory with a nil execution or the current execution being in the not
// running state, we should be able to immediately gain the leader lock as
// nothing else should be running and holding the lock. While true we still
// gain the lock to ensure that only one thread may even attempt to be
// modifying the autopilot state at once.
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
if err := a.leaderLock.TryLock(ctx); err == nil {
a.updateState(ctx)
a.leaderLock.Unlock()
}
}
go a.beginExecution(ctx, exec)
a.execution = exec
return
}
// Stop will terminate the go routines being executed to perform autopilot.
func (a *Autopilot) Stop() <-chan struct{} {
a.runLock.Lock()
defer a.runLock.Unlock()
a.execLock.Lock()
defer a.execLock.Unlock()
// Nothing to do
if !a.running {
if a.execution == nil || a.execution.status == NotRunning {
done := make(chan struct{})
close(done)
return done
}
a.shutdown()
return a.done
a.execution.shutdown()
a.execution.status = ShuttingDown
return a.execution.done
}
func (a *Autopilot) run(ctx context.Context) {
// IsRunning returns the current execution status of the autopilot
// go routines as well as a chan which will be closed when the
// routines are no longer running
func (a *Autopilot) IsRunning() (ExecutionStatus, <-chan struct{}) {
a.execLock.Lock()
defer a.execLock.Unlock()
if a.execution == nil || a.execution.status == NotRunning {
done := make(chan struct{})
close(done)
return NotRunning, done
}
return a.execution.status, a.execution.done
}
func (a *Autopilot) finishExecution(exec *execInfo) {
// need to gain the lock because if this was the active execution
// then these values may be read while they are updated.
a.execLock.Lock()
defer a.execLock.Unlock()
exec.shutdown = nil
exec.status = NotRunning
// this should be the final cleanup task as it is what notifies the rest
// of the world that we are now done
close(exec.done)
exec.done = nil
}
func (a *Autopilot) beginExecution(ctx context.Context, exec *execInfo) {
// This will wait for any other go routine to finish executing
// before running any code ourselves to prevent any conflicting
// activity between the two.
if err := a.leaderLock.TryLock(ctx); err != nil {
a.finishExecution(exec)
return
}
a.logger.Debug("autopilot is now running")
// autopilot needs to do 3 things
//
// 1. periodically update the cluster state
@ -78,14 +128,8 @@ func (a *Autopilot) run(ctx context.Context) {
a.logger.Debug("autopilot is now stopped")
a.runLock.Lock()
a.shutdown = nil
a.running = false
// this should be the final cleanup task as it is what notifies the rest
// of the world that we are now done
close(a.done)
a.done = nil
a.runLock.Unlock()
a.finishExecution(exec)
a.leaderLock.Unlock()
}()
reconcileTicker := time.NewTicker(a.reconcileInterval)

View File

@ -30,10 +30,8 @@ type nextStateInputs struct {
Now time.Time
StartTime time.Time
Config *Config
State *State
RaftConfig *raft.Configuration
KnownServers map[raft.ServerID]*Server
AliveServers map[raft.ServerID]*Server
LatestIndex uint64
LastTerm uint64
FetchedStats map[raft.ServerID]*ServerStats
@ -48,16 +46,10 @@ type nextStateInputs struct {
// - Current state
// - Raft Configuration
// - Known Servers
// - Latest raft index (gatered right before the remote server stats so that they should
// - Latest raft index (gathered right before the remote server stats so that they should
// be from about the same point in time)
// - Stats for all non-left servers
func (a *Autopilot) gatherNextStateInputs(ctx context.Context) (*nextStateInputs, error) {
// we are going to hold this lock for the entire function. In theory nothing should
// modify the state on any other go routine so this really shouldn't block anything
// else. However we want to ensure that the inputs are as consistent as possible.
a.stateLock.RLock()
defer a.stateLock.RUnlock()
// there are a lot of inputs to computing the next state so they get put into a
// struct so that we don't have to return 8 values.
inputs := &nextStateInputs{
@ -72,12 +64,6 @@ func (a *Autopilot) gatherNextStateInputs(ctx context.Context) (*nextStateInputs
}
inputs.Config = config
// retrieve the current state
inputs.State = a.state
if inputs.State == nil {
inputs.State = &State{}
}
// retrieve the raft configuration
raftConfig, err := a.getRaftConfiguration()
if err != nil {
@ -125,9 +111,6 @@ func (a *Autopilot) gatherNextStateInputs(ctx context.Context) (*nextStateInputs
return nil, ctx.Err()
}
// filter the known servers to have a map of just the alive servers
inputs.AliveServers = aliveServers(inputs.KnownServers)
// we only allow the fetch to take place for up to half the health interval
// the next health interval will attempt to fetch the stats again but if
// we do not see responses within this time then we can assume they are
@ -136,7 +119,7 @@ func (a *Autopilot) gatherNextStateInputs(ctx context.Context) (*nextStateInputs
fetchCtx, cancel := context.WithDeadline(ctx, d)
defer cancel()
inputs.FetchedStats = a.delegate.FetchServerStats(fetchCtx, inputs.AliveServers)
inputs.FetchedStats = a.delegate.FetchServerStats(fetchCtx, aliveServers(inputs.KnownServers))
// it might be nil but we propagate the ctx.Err just in case our context was
// cancelled since the last time we checked.
@ -239,7 +222,7 @@ func (a *Autopilot) nextServers(inputs *nextStateInputs) map[raft.ServerID]*Serv
newServers := make(map[raft.ServerID]*ServerState)
for _, srv := range inputs.RaftConfig.Servers {
state := buildServerState(inputs, srv)
state := a.buildServerState(inputs, srv)
// update any promoter specific information. This isn't done within
// buildServerState to keep that function "pure" and not require
@ -257,7 +240,7 @@ func (a *Autopilot) nextServers(inputs *nextStateInputs) map[raft.ServerID]*Serv
// buildServerState takes all the nextStateInputs and builds out a ServerState
// for the given Raft server. This will take into account the raft configuration
// existing state, application known servers and recently fetched stats.
func buildServerState(inputs *nextStateInputs, srv raft.Server) ServerState {
func (a *Autopilot) buildServerState(inputs *nextStateInputs, srv raft.Server) ServerState {
// Note that the ordering of operations in this method are very important.
// We are building up the ServerState from the least important sources
// and overriding them with more up to date values.
@ -292,23 +275,24 @@ func buildServerState(inputs *nextStateInputs, srv raft.Server) ServerState {
state.State = RaftLeader
}
var existingHealth *ServerHealth
var previousHealthy *bool
a.stateLock.RLock()
// copy some state from an existing server into the new state - most of this
// should be overridden soon but at this point we are just building the base.
if existing, found := inputs.State.Servers[srv.ID]; found {
if existing, found := a.state.Servers[srv.ID]; found {
state.Stats = existing.Stats
state.Health = existing.Health
existingHealth = &existing.Health
previousHealthy = &state.Health.Healthy
// it is is important to note that the map values we retrieved this from are
// stored by value. Therefore we are modifying a copy of what is in the existing
// state and not the actual state itself. We want to ensure that the Address
// is what Raft will know about.
existing.Server.Address = srv.Address
state.Server = existing.Server
state.Server.Address = srv.Address
}
a.stateLock.RUnlock()
// pull in the latest information from the applications knowledge of the
// server. Mainly we want the NodeStatus & Meta
@ -317,8 +301,8 @@ func buildServerState(inputs *nextStateInputs, srv raft.Server) ServerState {
// map we retrieved this from has a non-pointer type value. We definitely
// do not want to modify the current known servers but we do want to ensure
// that we do not overwrite the Address
known.Address = srv.Address
state.Server = *known
state.Server.Address = srv.Address
} else {
// TODO (mkeeler) do we need a None state. In the previous autopilot code
// we would have set this to serf.StatusNone
@ -336,7 +320,7 @@ func buildServerState(inputs *nextStateInputs, srv raft.Server) ServerState {
// the health status changes. No need for an else as we previously set
// it when we overwrote the whole Health structure when finding a
// server in the existing state
if existingHealth == nil || existingHealth.Healthy != state.Health.Healthy {
if previousHealthy == nil || *previousHealthy != state.Health.Healthy {
state.Health.StableSince = inputs.Now
}

136
vendor/golang.org/x/sync/semaphore/semaphore.go generated vendored Normal file
View File

@ -0,0 +1,136 @@
// Copyright 2017 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package semaphore provides a weighted semaphore implementation.
package semaphore // import "golang.org/x/sync/semaphore"
import (
"container/list"
"context"
"sync"
)
type waiter struct {
n int64
ready chan<- struct{} // Closed when semaphore acquired.
}
// NewWeighted creates a new weighted semaphore with the given
// maximum combined weight for concurrent access.
func NewWeighted(n int64) *Weighted {
w := &Weighted{size: n}
return w
}
// Weighted provides a way to bound concurrent access to a resource.
// The callers can request access with a given weight.
type Weighted struct {
size int64
cur int64
mu sync.Mutex
waiters list.List
}
// Acquire acquires the semaphore with a weight of n, blocking until resources
// are available or ctx is done. On success, returns nil. On failure, returns
// ctx.Err() and leaves the semaphore unchanged.
//
// If ctx is already done, Acquire may still succeed without blocking.
func (s *Weighted) Acquire(ctx context.Context, n int64) error {
s.mu.Lock()
if s.size-s.cur >= n && s.waiters.Len() == 0 {
s.cur += n
s.mu.Unlock()
return nil
}
if n > s.size {
// Don't make other Acquire calls block on one that's doomed to fail.
s.mu.Unlock()
<-ctx.Done()
return ctx.Err()
}
ready := make(chan struct{})
w := waiter{n: n, ready: ready}
elem := s.waiters.PushBack(w)
s.mu.Unlock()
select {
case <-ctx.Done():
err := ctx.Err()
s.mu.Lock()
select {
case <-ready:
// Acquired the semaphore after we were canceled. Rather than trying to
// fix up the queue, just pretend we didn't notice the cancelation.
err = nil
default:
isFront := s.waiters.Front() == elem
s.waiters.Remove(elem)
// If we're at the front and there're extra tokens left, notify other waiters.
if isFront && s.size > s.cur {
s.notifyWaiters()
}
}
s.mu.Unlock()
return err
case <-ready:
return nil
}
}
// TryAcquire acquires the semaphore with a weight of n without blocking.
// On success, returns true. On failure, returns false and leaves the semaphore unchanged.
func (s *Weighted) TryAcquire(n int64) bool {
s.mu.Lock()
success := s.size-s.cur >= n && s.waiters.Len() == 0
if success {
s.cur += n
}
s.mu.Unlock()
return success
}
// Release releases the semaphore with a weight of n.
func (s *Weighted) Release(n int64) {
s.mu.Lock()
s.cur -= n
if s.cur < 0 {
s.mu.Unlock()
panic("semaphore: released more than held")
}
s.notifyWaiters()
s.mu.Unlock()
}
func (s *Weighted) notifyWaiters() {
for {
next := s.waiters.Front()
if next == nil {
break // No more waiters blocked.
}
w := next.Value.(waiter)
if s.size-s.cur < w.n {
// Not enough tokens for the next waiter. We could keep going (to try to
// find a waiter with a smaller request), but under load that could cause
// starvation for large requests; instead, we leave all remaining waiters
// blocked.
//
// Consider a semaphore used as a read-write lock, with N tokens, N
// readers, and one writer. Each reader can Acquire(1) to obtain a read
// lock. The writer can Acquire(N) to obtain a write lock, excluding all
// of the readers. If we allow the readers to jump ahead in the queue,
// the writer will starve — there is always one token available for every
// reader.
break
}
s.cur += w.n
s.waiters.Remove(next)
close(w.ready)
}
}

3
vendor/modules.txt vendored
View File

@ -280,7 +280,7 @@ github.com/hashicorp/memberlist
github.com/hashicorp/net-rpc-msgpackrpc
# github.com/hashicorp/raft v1.2.0
github.com/hashicorp/raft
# github.com/hashicorp/raft-autopilot v0.1.1
# github.com/hashicorp/raft-autopilot v0.1.2
github.com/hashicorp/raft-autopilot
# github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea
github.com/hashicorp/raft-boltdb
@ -507,6 +507,7 @@ golang.org/x/oauth2/jws
golang.org/x/oauth2/jwt
# golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a
golang.org/x/sync/errgroup
golang.org/x/sync/semaphore
golang.org/x/sync/singleflight
# golang.org/x/sys v0.0.0-20201024232916-9f70ab9862d5
golang.org/x/sys/cpu