Merge pull request #3737 from hashicorp/autopilot-refactor

Move autopilot to a standalone package
This commit is contained in:
Kyle Havlovitz 2017-12-15 14:09:40 -08:00 committed by GitHub
commit a86d11ec0a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
28 changed files with 915 additions and 647 deletions

View File

@ -3,349 +3,61 @@ package consul
import (
"context"
"fmt"
"net"
"strconv"
"sync"
"time"
"github.com/armon/go-metrics"
"github.com/hashicorp/consul/agent/consul/autopilot"
"github.com/hashicorp/consul/agent/metadata"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/go-version"
"github.com/hashicorp/raft"
"github.com/hashicorp/serf/serf"
)
// AutopilotPolicy is the interface for the Autopilot mechanism
type AutopilotPolicy interface {
// PromoteNonVoters defines the handling of non-voting servers
PromoteNonVoters(*structs.AutopilotConfig) error
}
func (s *Server) startAutopilot() {
s.autopilotShutdownCh = make(chan struct{})
s.autopilotWaitGroup = sync.WaitGroup{}
s.autopilotWaitGroup.Add(1)
go s.autopilotLoop()
}
func (s *Server) stopAutopilot() {
close(s.autopilotShutdownCh)
s.autopilotWaitGroup.Wait()
}
var minAutopilotVersion = version.Must(version.NewVersion("0.8.0"))
// autopilotLoop periodically looks for nonvoting servers to promote and dead servers to remove.
func (s *Server) autopilotLoop() {
defer s.autopilotWaitGroup.Done()
// Monitor server health until shutdown
ticker := time.NewTicker(s.config.AutopilotInterval)
defer ticker.Stop()
for {
select {
case <-s.autopilotShutdownCh:
return
case <-ticker.C:
autopilotConfig, ok := s.getOrCreateAutopilotConfig()
if !ok {
continue
}
if err := s.autopilotPolicy.PromoteNonVoters(autopilotConfig); err != nil {
s.logger.Printf("[ERR] autopilot: Error checking for non-voters to promote: %s", err)
}
if err := s.pruneDeadServers(autopilotConfig); err != nil {
s.logger.Printf("[ERR] autopilot: Error checking for dead servers to remove: %s", err)
}
case <-s.autopilotRemoveDeadCh:
autopilotConfig, ok := s.getOrCreateAutopilotConfig()
if !ok {
continue
}
if err := s.pruneDeadServers(autopilotConfig); err != nil {
s.logger.Printf("[ERR] autopilot: Error checking for dead servers to remove: %s", err)
}
}
}
}
// fmtServer prints info about a server in a standard way for logging.
func fmtServer(server raft.Server) string {
return fmt.Sprintf("Server (ID: %q Address: %q)", server.ID, server.Address)
}
// pruneDeadServers removes up to numPeers/2 failed servers
func (s *Server) pruneDeadServers(autopilotConfig *structs.AutopilotConfig) error {
if !autopilotConfig.CleanupDeadServers {
return nil
}
// Failed servers are known to Serf and marked failed, and stale servers
// are known to Raft but not Serf.
var failed []string
staleRaftServers := make(map[string]raft.Server)
future := s.raft.GetConfiguration()
if err := future.Error(); err != nil {
return err
}
for _, server := range future.Configuration().Servers {
staleRaftServers[string(server.Address)] = server
}
for _, member := range s.serfLAN.Members() {
valid, parts := metadata.IsConsulServer(member)
if valid {
if _, ok := staleRaftServers[parts.Addr.String()]; ok {
delete(staleRaftServers, parts.Addr.String())
}
if member.Status == serf.StatusFailed {
failed = append(failed, member.Name)
}
}
}
// We can bail early if there's nothing to do.
removalCount := len(failed) + len(staleRaftServers)
if removalCount == 0 {
return nil
}
// Only do removals if a minority of servers will be affected.
peers, err := s.numPeers()
if err != nil {
return err
}
if removalCount < peers/2 {
for _, node := range failed {
s.logger.Printf("[INFO] autopilot: Attempting removal of failed server node %q", node)
go s.serfLAN.RemoveFailedNode(node)
}
minRaftProtocol, err := ServerMinRaftProtocol(s.serfLAN.Members())
if err != nil {
return err
}
for _, raftServer := range staleRaftServers {
s.logger.Printf("[INFO] autopilot: Attempting removal of stale %s", fmtServer(raftServer))
var future raft.Future
if minRaftProtocol >= 2 {
future = s.raft.RemoveServer(raftServer.ID, 0, 0)
} else {
future = s.raft.RemovePeer(raftServer.Address)
}
if err := future.Error(); err != nil {
return err
}
}
} else {
s.logger.Printf("[DEBUG] autopilot: Failed to remove dead servers: too many dead servers: %d/%d", removalCount, peers)
}
return nil
}
// BasicAutopilot defines a policy for promoting non-voting servers in a way
// that maintains an odd-numbered voter count.
type BasicAutopilot struct {
// AutopilotDelegate is a Consul delegate for autopilot operations.
type AutopilotDelegate struct {
server *Server
}
// PromoteNonVoters promotes eligible non-voting servers to voters.
func (b *BasicAutopilot) PromoteNonVoters(autopilotConfig *structs.AutopilotConfig) error {
// If we don't meet the minimum version for non-voter features, bail
// early.
minRaftProtocol, err := ServerMinRaftProtocol(b.server.LANMembers())
if err != nil {
return fmt.Errorf("error getting server raft protocol versions: %s", err)
}
if minRaftProtocol < 3 {
return nil
}
// Find any non-voters eligible for promotion.
now := time.Now()
var promotions []raft.Server
future := b.server.raft.GetConfiguration()
if err := future.Error(); err != nil {
return fmt.Errorf("failed to get raft configuration: %v", err)
}
for _, server := range future.Configuration().Servers {
if !isVoter(server.Suffrage) {
health := b.server.getServerHealth(string(server.ID))
if health.IsStable(now, autopilotConfig) {
promotions = append(promotions, server)
}
}
}
if err := b.server.handlePromotions(promotions); err != nil {
return err
}
return nil
func (d *AutopilotDelegate) AutopilotConfig() *autopilot.Config {
return d.server.getOrCreateAutopilotConfig()
}
// handlePromotions is a helper shared with Consul Enterprise that attempts to
// apply desired server promotions to the Raft configuration.
func (s *Server) handlePromotions(promotions []raft.Server) error {
// This used to wait to only promote to maintain an odd quorum of
// servers, but this was at odds with the dead server cleanup when doing
// rolling updates (add one new server, wait, and then kill an old
// server). The dead server cleanup would still count the old server as
// a peer, which is conservative and the right thing to do, and this
// would wait to promote, so you could get into a stalemate. It is safer
// to promote early than remove early, so by promoting as soon as
// possible we have chosen that as the solution here.
for _, server := range promotions {
s.logger.Printf("[INFO] autopilot: Promoting %s to voter", fmtServer(server))
addFuture := s.raft.AddVoter(server.ID, server.Address, 0, 0)
if err := addFuture.Error(); err != nil {
return fmt.Errorf("failed to add raft peer: %v", err)
}
}
// If we promoted a server, trigger a check to remove dead servers.
if len(promotions) > 0 {
select {
case s.autopilotRemoveDeadCh <- struct{}{}:
default:
}
}
return nil
func (d *AutopilotDelegate) FetchStats(ctx context.Context, servers []serf.Member) map[string]*autopilot.ServerStats {
return d.server.statsFetcher.Fetch(ctx, servers)
}
// serverHealthLoop monitors the health of the servers in the cluster
func (s *Server) serverHealthLoop() {
// Monitor server health until shutdown
ticker := time.NewTicker(s.config.ServerHealthInterval)
defer ticker.Stop()
for {
select {
case <-s.shutdownCh:
return
case <-ticker.C:
if err := s.updateClusterHealth(); err != nil {
s.logger.Printf("[ERR] autopilot: Error updating cluster health: %s", err)
}
}
func (d *AutopilotDelegate) IsServer(m serf.Member) (*autopilot.ServerInfo, error) {
if m.Tags["role"] != "consul" {
return nil, nil
}
port_str := m.Tags["port"]
port, err := strconv.Atoi(port_str)
if err != nil {
return nil, err
}
build_version, err := metadata.Build(&m)
if err != nil {
return nil, err
}
server := &autopilot.ServerInfo{
Name: m.Name,
ID: m.Tags["id"],
Addr: &net.TCPAddr{IP: m.Addr, Port: port},
Build: *build_version,
Status: m.Status,
}
return server, nil
}
// updateClusterHealth fetches the Raft stats of the other servers and updates
// s.clusterHealth based on the configured Autopilot thresholds
func (s *Server) updateClusterHealth() error {
// Don't do anything if the min Raft version is too low
minRaftProtocol, err := ServerMinRaftProtocol(s.LANMembers())
if err != nil {
return fmt.Errorf("error getting server raft protocol versions: %s", err)
}
if minRaftProtocol < 3 {
return nil
}
state := s.fsm.State()
_, autopilotConf, err := state.AutopilotConfig()
if err != nil {
return fmt.Errorf("error retrieving autopilot config: %s", err)
}
// Bail early if autopilot config hasn't been initialized yet
if autopilotConf == nil {
return nil
}
// Get the the serf members which are Consul servers
serverMap := make(map[string]*metadata.Server)
for _, member := range s.LANMembers() {
if member.Status == serf.StatusLeft {
continue
}
valid, parts := metadata.IsConsulServer(member)
if valid {
serverMap[parts.ID] = parts
}
}
future := s.raft.GetConfiguration()
if err := future.Error(); err != nil {
return fmt.Errorf("error getting Raft configuration %s", err)
}
servers := future.Configuration().Servers
// Fetch the health for each of the servers in parallel so we get as
// consistent of a sample as possible. We capture the leader's index
// here as well so it roughly lines up with the same point in time.
targetLastIndex := s.raft.LastIndex()
var fetchList []*metadata.Server
for _, server := range servers {
if parts, ok := serverMap[string(server.ID)]; ok {
fetchList = append(fetchList, parts)
}
}
d := time.Now().Add(s.config.ServerHealthInterval / 2)
ctx, cancel := context.WithDeadline(context.Background(), d)
defer cancel()
fetchedStats := s.statsFetcher.Fetch(ctx, fetchList)
// Build a current list of server healths
leader := s.raft.Leader()
var clusterHealth structs.OperatorHealthReply
voterCount := 0
healthyCount := 0
healthyVoterCount := 0
for _, server := range servers {
health := structs.ServerHealth{
ID: string(server.ID),
Address: string(server.Address),
Leader: server.Address == leader,
LastContact: -1,
Voter: server.Suffrage == raft.Voter,
}
parts, ok := serverMap[string(server.ID)]
if ok {
health.Name = parts.Name
health.SerfStatus = parts.Status
health.Version = parts.Build.String()
if stats, ok := fetchedStats[string(server.ID)]; ok {
if err := s.updateServerHealth(&health, parts, stats, autopilotConf, targetLastIndex); err != nil {
s.logger.Printf("[WARN] autopilot: Error updating server %s health: %s", fmtServer(server), err)
}
}
} else {
health.SerfStatus = serf.StatusNone
}
if health.Voter {
voterCount++
}
// Heartbeat a metric for monitoring if we're the leader
func (d *AutopilotDelegate) NotifyHealth(health autopilot.OperatorHealthReply) {
if d.server.raft.State() == raft.Leader {
metrics.SetGauge([]string{"consul", "autopilot", "failure_tolerance"}, float32(health.FailureTolerance))
metrics.SetGauge([]string{"autopilot", "failure_tolerance"}, float32(health.FailureTolerance))
if health.Healthy {
healthyCount++
if health.Voter {
healthyVoterCount++
}
}
clusterHealth.Servers = append(clusterHealth.Servers, health)
}
clusterHealth.Healthy = healthyCount == len(servers)
// If we have extra healthy voters, update FailureTolerance
requiredQuorum := voterCount/2 + 1
if healthyVoterCount > requiredQuorum {
clusterHealth.FailureTolerance = healthyVoterCount - requiredQuorum
}
// Heartbeat a metric for monitoring if we're the leader
if s.IsLeader() {
metrics.SetGauge([]string{"consul", "autopilot", "failure_tolerance"}, float32(clusterHealth.FailureTolerance))
metrics.SetGauge([]string{"autopilot", "failure_tolerance"}, float32(clusterHealth.FailureTolerance))
if clusterHealth.Healthy {
metrics.SetGauge([]string{"consul", "autopilot", "healthy"}, 1)
metrics.SetGauge([]string{"autopilot", "healthy"}, 1)
} else {
@ -353,70 +65,21 @@ func (s *Server) updateClusterHealth() error {
metrics.SetGauge([]string{"autopilot", "healthy"}, 0)
}
}
s.clusterHealthLock.Lock()
s.clusterHealth = clusterHealth
s.clusterHealthLock.Unlock()
return nil
}
// updateServerHealth computes the resulting health of the server based on its
// fetched stats and the state of the leader.
func (s *Server) updateServerHealth(health *structs.ServerHealth,
server *metadata.Server, stats *structs.ServerStats,
autopilotConf *structs.AutopilotConfig, targetLastIndex uint64) error {
health.LastTerm = stats.LastTerm
health.LastIndex = stats.LastIndex
if stats.LastContact != "never" {
var err error
health.LastContact, err = time.ParseDuration(stats.LastContact)
if err != nil {
return fmt.Errorf("error parsing last_contact duration: %s", err)
}
func (d *AutopilotDelegate) PromoteNonVoters(conf *autopilot.Config, health autopilot.OperatorHealthReply) ([]raft.Server, error) {
future := d.server.raft.GetConfiguration()
if err := future.Error(); err != nil {
return nil, fmt.Errorf("failed to get raft configuration: %v", err)
}
lastTerm, err := strconv.ParseUint(s.raft.Stats()["last_log_term"], 10, 64)
if err != nil {
return fmt.Errorf("error parsing last_log_term: %s", err)
}
health.Healthy = health.IsHealthy(lastTerm, targetLastIndex, autopilotConf)
// If this is a new server or the health changed, reset StableSince
lastHealth := s.getServerHealth(server.ID)
if lastHealth == nil || lastHealth.Healthy != health.Healthy {
health.StableSince = time.Now()
} else {
health.StableSince = lastHealth.StableSince
}
return nil
return autopilot.PromoteStableServers(conf, health, future.Configuration().Servers), nil
}
func (s *Server) getClusterHealth() structs.OperatorHealthReply {
s.clusterHealthLock.RLock()
defer s.clusterHealthLock.RUnlock()
return s.clusterHealth
func (d *AutopilotDelegate) Raft() *raft.Raft {
return d.server.raft
}
func (s *Server) getServerHealth(id string) *structs.ServerHealth {
s.clusterHealthLock.RLock()
defer s.clusterHealthLock.RUnlock()
for _, health := range s.clusterHealth.Servers {
if health.ID == id {
return &health
}
}
return nil
}
func isVoter(suffrage raft.ServerSuffrage) bool {
switch suffrage {
case raft.Voter, raft.Staging:
return true
default:
return false
}
func (d *AutopilotDelegate) Serf() *serf.Serf {
return d.server.serfLAN
}

View File

@ -0,0 +1,480 @@
package autopilot
import (
"context"
"fmt"
"log"
"net"
"strconv"
"sync"
"time"
"github.com/hashicorp/go-version"
"github.com/hashicorp/raft"
"github.com/hashicorp/serf/serf"
)
// Delegate is the interface for the Autopilot mechanism
type Delegate interface {
AutopilotConfig() *Config
FetchStats(context.Context, []serf.Member) map[string]*ServerStats
IsServer(serf.Member) (*ServerInfo, error)
NotifyHealth(OperatorHealthReply)
PromoteNonVoters(*Config, OperatorHealthReply) ([]raft.Server, error)
Raft() *raft.Raft
Serf() *serf.Serf
}
// Autopilot is a mechanism for automatically managing the Raft
// quorum using server health information along with updates from Serf gossip.
// For more information, see https://www.consul.io/docs/guides/autopilot.html
type Autopilot struct {
logger *log.Logger
delegate Delegate
interval time.Duration
healthInterval time.Duration
clusterHealth OperatorHealthReply
clusterHealthLock sync.RWMutex
removeDeadCh chan struct{}
shutdownCh chan struct{}
waitGroup sync.WaitGroup
}
type ServerInfo struct {
Name string
ID string
Addr net.Addr
Build version.Version
Status serf.MemberStatus
}
func NewAutopilot(logger *log.Logger, delegate Delegate, interval, healthInterval time.Duration) *Autopilot {
return &Autopilot{
logger: logger,
delegate: delegate,
interval: interval,
healthInterval: healthInterval,
removeDeadCh: make(chan struct{}),
}
}
func (a *Autopilot) Start() {
a.shutdownCh = make(chan struct{})
a.waitGroup = sync.WaitGroup{}
a.waitGroup.Add(1)
go a.run()
}
func (a *Autopilot) Stop() {
close(a.shutdownCh)
a.waitGroup.Wait()
}
// run periodically looks for nonvoting servers to promote and dead servers to remove.
func (a *Autopilot) run() {
defer a.waitGroup.Done()
// Monitor server health until shutdown
ticker := time.NewTicker(a.interval)
defer ticker.Stop()
for {
select {
case <-a.shutdownCh:
return
case <-ticker.C:
if err := a.promoteServers(); err != nil {
a.logger.Printf("[ERR] autopilot: Error promoting servers: %v", err)
}
if err := a.pruneDeadServers(); err != nil {
a.logger.Printf("[ERR] autopilot: Error checking for dead servers to remove: %s", err)
}
case <-a.removeDeadCh:
if err := a.pruneDeadServers(); err != nil {
a.logger.Printf("[ERR] autopilot: Error checking for dead servers to remove: %s", err)
}
}
}
}
// promoteServers asks the delegate for any promotions and carries them out.
func (a *Autopilot) promoteServers() error {
conf := a.delegate.AutopilotConfig()
if conf == nil {
return nil
}
// Skip the non-voter promotions unless all servers support the new APIs
minRaftProtocol, err := a.MinRaftProtocol()
if err != nil {
return fmt.Errorf("error getting server raft protocol versions: %s", err)
}
if minRaftProtocol >= 3 {
promotions, err := a.delegate.PromoteNonVoters(conf, a.GetClusterHealth())
if err != nil {
return fmt.Errorf("error checking for non-voters to promote: %s", err)
}
if err := a.handlePromotions(promotions); err != nil {
return fmt.Errorf("error handling promotions: %s", err)
}
}
return nil
}
// fmtServer prints info about a server in a standard way for logging.
func fmtServer(server raft.Server) string {
return fmt.Sprintf("Server (ID: %q Address: %q)", server.ID, server.Address)
}
// NumPeers counts the number of voting peers in the given raft config.
func NumPeers(raftConfig raft.Configuration) int {
var numPeers int
for _, server := range raftConfig.Servers {
if server.Suffrage == raft.Voter {
numPeers++
}
}
return numPeers
}
// pruneDeadServers removes up to numPeers/2 failed servers
func (a *Autopilot) pruneDeadServers() error {
conf := a.delegate.AutopilotConfig()
if conf == nil || !conf.CleanupDeadServers {
return nil
}
// Failed servers are known to Serf and marked failed, and stale servers
// are known to Raft but not Serf.
var failed []string
staleRaftServers := make(map[string]raft.Server)
raftNode := a.delegate.Raft()
future := raftNode.GetConfiguration()
if err := future.Error(); err != nil {
return err
}
raftConfig := future.Configuration()
for _, server := range raftConfig.Servers {
staleRaftServers[string(server.Address)] = server
}
serfLAN := a.delegate.Serf()
for _, member := range serfLAN.Members() {
server, err := a.delegate.IsServer(member)
if err != nil {
a.logger.Printf("[INFO] autopilot: Error parsing server info for %q: %s", member.Name, err)
continue
}
if server != nil {
// todo(kyhavlov): change this to index by UUID
if _, ok := staleRaftServers[server.Addr.String()]; ok {
delete(staleRaftServers, server.Addr.String())
}
if member.Status == serf.StatusFailed {
failed = append(failed, member.Name)
}
}
}
// We can bail early if there's nothing to do.
removalCount := len(failed) + len(staleRaftServers)
if removalCount == 0 {
return nil
}
// Only do removals if a minority of servers will be affected.
peers := NumPeers(raftConfig)
if removalCount < peers/2 {
for _, node := range failed {
a.logger.Printf("[INFO] autopilot: Attempting removal of failed server node %q", node)
go serfLAN.RemoveFailedNode(node)
}
minRaftProtocol, err := a.MinRaftProtocol()
if err != nil {
return err
}
for _, raftServer := range staleRaftServers {
a.logger.Printf("[INFO] autopilot: Attempting removal of stale %s", fmtServer(raftServer))
var future raft.Future
if minRaftProtocol >= 2 {
future = raftNode.RemoveServer(raftServer.ID, 0, 0)
} else {
future = raftNode.RemovePeer(raftServer.Address)
}
if err := future.Error(); err != nil {
return err
}
}
} else {
a.logger.Printf("[DEBUG] autopilot: Failed to remove dead servers: too many dead servers: %d/%d", removalCount, peers)
}
return nil
}
// MinRaftProtocol returns the lowest supported Raft protocol among alive servers
func (a *Autopilot) MinRaftProtocol() (int, error) {
minVersion := -1
members := a.delegate.Serf().Members()
for _, m := range members {
if m.Status != serf.StatusAlive {
continue
}
server, err := a.delegate.IsServer(m)
if err != nil {
return -1, err
}
if server == nil {
continue
}
vsn, ok := m.Tags["raft_vsn"]
if !ok {
vsn = "1"
}
raftVsn, err := strconv.Atoi(vsn)
if err != nil {
return -1, err
}
if minVersion == -1 || raftVsn < minVersion {
minVersion = raftVsn
}
}
if minVersion == -1 {
return minVersion, fmt.Errorf("No servers found")
}
return minVersion, nil
}
// handlePromotions is a helper shared with Consul Enterprise that attempts to
// apply desired server promotions to the Raft configuration.
func (a *Autopilot) handlePromotions(promotions []raft.Server) error {
// This used to wait to only promote to maintain an odd quorum of
// servers, but this was at odds with the dead server cleanup when doing
// rolling updates (add one new server, wait, and then kill an old
// server). The dead server cleanup would still count the old server as
// a peer, which is conservative and the right thing to do, and this
// would wait to promote, so you could get into a stalemate. It is safer
// to promote early than remove early, so by promoting as soon as
// possible we have chosen that as the solution here.
for _, server := range promotions {
a.logger.Printf("[INFO] autopilot: Promoting %s to voter", fmtServer(server))
addFuture := a.delegate.Raft().AddVoter(server.ID, server.Address, 0, 0)
if err := addFuture.Error(); err != nil {
return fmt.Errorf("failed to add raft peer: %v", err)
}
}
// If we promoted a server, trigger a check to remove dead servers.
if len(promotions) > 0 {
select {
case a.removeDeadCh <- struct{}{}:
default:
}
}
return nil
}
// ServerHealthLoop monitors the health of the servers in the cluster
func (a *Autopilot) ServerHealthLoop(shutdownCh <-chan struct{}) {
// Monitor server health until shutdown
ticker := time.NewTicker(a.healthInterval)
defer ticker.Stop()
for {
select {
case <-shutdownCh:
return
case <-ticker.C:
if err := a.updateClusterHealth(); err != nil {
a.logger.Printf("[ERR] autopilot: Error updating cluster health: %s", err)
}
}
}
}
// updateClusterHealth fetches the Raft stats of the other servers and updates
// s.clusterHealth based on the configured Autopilot thresholds
func (a *Autopilot) updateClusterHealth() error {
// Don't do anything if the min Raft version is too low
minRaftProtocol, err := a.MinRaftProtocol()
if err != nil {
return fmt.Errorf("error getting server raft protocol versions: %s", err)
}
if minRaftProtocol < 3 {
return nil
}
autopilotConf := a.delegate.AutopilotConfig()
// Bail early if autopilot config hasn't been initialized yet
if autopilotConf == nil {
return nil
}
// Get the the serf members which are Consul servers
var serverMembers []serf.Member
serverMap := make(map[string]*ServerInfo)
for _, member := range a.delegate.Serf().Members() {
if member.Status == serf.StatusLeft {
continue
}
server, err := a.delegate.IsServer(member)
if err != nil {
a.logger.Printf("[INFO] autopilot: Error parsing server info for %q: %s", member.Name, err)
continue
}
if server != nil {
serverMap[server.ID] = server
serverMembers = append(serverMembers, member)
}
}
raftNode := a.delegate.Raft()
future := raftNode.GetConfiguration()
if err := future.Error(); err != nil {
return fmt.Errorf("error getting Raft configuration %s", err)
}
servers := future.Configuration().Servers
// Fetch the health for each of the servers in parallel so we get as
// consistent of a sample as possible. We capture the leader's index
// here as well so it roughly lines up with the same point in time.
targetLastIndex := raftNode.LastIndex()
var fetchList []*ServerInfo
for _, server := range servers {
if parts, ok := serverMap[string(server.ID)]; ok {
fetchList = append(fetchList, parts)
}
}
d := time.Now().Add(a.healthInterval / 2)
ctx, cancel := context.WithDeadline(context.Background(), d)
defer cancel()
fetchedStats := a.delegate.FetchStats(ctx, serverMembers)
// Build a current list of server healths
leader := raftNode.Leader()
var clusterHealth OperatorHealthReply
voterCount := 0
healthyCount := 0
healthyVoterCount := 0
for _, server := range servers {
health := ServerHealth{
ID: string(server.ID),
Address: string(server.Address),
Leader: server.Address == leader,
LastContact: -1,
Voter: server.Suffrage == raft.Voter,
}
parts, ok := serverMap[string(server.ID)]
if ok {
health.Name = parts.Name
health.SerfStatus = parts.Status
health.Version = parts.Build.String()
if stats, ok := fetchedStats[string(server.ID)]; ok {
if err := a.updateServerHealth(&health, parts, stats, autopilotConf, targetLastIndex); err != nil {
a.logger.Printf("[WARN] autopilot: Error updating server %s health: %s", fmtServer(server), err)
}
}
} else {
health.SerfStatus = serf.StatusNone
}
if health.Voter {
voterCount++
}
if health.Healthy {
healthyCount++
if health.Voter {
healthyVoterCount++
}
}
clusterHealth.Servers = append(clusterHealth.Servers, health)
}
clusterHealth.Healthy = healthyCount == len(servers)
// If we have extra healthy voters, update FailureTolerance
requiredQuorum := voterCount/2 + 1
if healthyVoterCount > requiredQuorum {
clusterHealth.FailureTolerance = healthyVoterCount - requiredQuorum
}
a.delegate.NotifyHealth(clusterHealth)
a.clusterHealthLock.Lock()
a.clusterHealth = clusterHealth
a.clusterHealthLock.Unlock()
return nil
}
// updateServerHealth computes the resulting health of the server based on its
// fetched stats and the state of the leader.
func (a *Autopilot) updateServerHealth(health *ServerHealth,
server *ServerInfo, stats *ServerStats,
autopilotConf *Config, targetLastIndex uint64) error {
health.LastTerm = stats.LastTerm
health.LastIndex = stats.LastIndex
if stats.LastContact != "never" {
var err error
health.LastContact, err = time.ParseDuration(stats.LastContact)
if err != nil {
return fmt.Errorf("error parsing last_contact duration: %s", err)
}
}
raftNode := a.delegate.Raft()
lastTerm, err := strconv.ParseUint(raftNode.Stats()["last_log_term"], 10, 64)
if err != nil {
return fmt.Errorf("error parsing last_log_term: %s", err)
}
health.Healthy = health.IsHealthy(lastTerm, targetLastIndex, autopilotConf)
// If this is a new server or the health changed, reset StableSince
lastHealth := a.GetServerHealth(server.ID)
if lastHealth == nil || lastHealth.Healthy != health.Healthy {
health.StableSince = time.Now()
} else {
health.StableSince = lastHealth.StableSince
}
return nil
}
func (a *Autopilot) GetClusterHealth() OperatorHealthReply {
a.clusterHealthLock.RLock()
defer a.clusterHealthLock.RUnlock()
return a.clusterHealth
}
func (a *Autopilot) GetServerHealth(id string) *ServerHealth {
a.clusterHealthLock.RLock()
defer a.clusterHealthLock.RUnlock()
return a.clusterHealth.ServerHealth(id)
}
func IsPotentialVoter(suffrage raft.ServerSuffrage) bool {
switch suffrage {
case raft.Voter, raft.Staging:
return true
default:
return false
}
}

View File

@ -0,0 +1,26 @@
package autopilot
import (
"time"
"github.com/hashicorp/raft"
)
// PromoteStableServers is a basic autopilot promotion policy that promotes any
// server which has been healthy and stable for the duration specified in the
// given Autopilot config.
func PromoteStableServers(autopilotConfig *Config, health OperatorHealthReply, servers []raft.Server) []raft.Server {
// Find any non-voters eligible for promotion.
now := time.Now()
var promotions []raft.Server
for _, server := range servers {
if !IsPotentialVoter(server.Suffrage) {
health := health.ServerHealth(string(server.ID))
if health.IsStable(now, autopilotConfig) {
promotions = append(promotions, server)
}
}
}
return promotions
}

View File

@ -0,0 +1,102 @@
package autopilot
import (
"testing"
"time"
"github.com/hashicorp/raft"
"github.com/pascaldekloe/goe/verify"
)
func TestPromotion(t *testing.T) {
config := &Config{
LastContactThreshold: 5 * time.Second,
MaxTrailingLogs: 100,
ServerStabilizationTime: 3 * time.Second,
}
cases := []struct {
name string
conf *Config
health OperatorHealthReply
servers []raft.Server
promotions []raft.Server
}{
{
name: "one stable voter, no promotions",
conf: config,
health: OperatorHealthReply{
Servers: []ServerHealth{
{
ID: "a",
Healthy: true,
StableSince: time.Now().Add(-10 * time.Second),
},
},
},
servers: []raft.Server{
{ID: "a", Suffrage: raft.Voter},
},
promotions: []raft.Server{},
},
{
name: "one stable nonvoter, should be promoted",
conf: config,
health: OperatorHealthReply{
Servers: []ServerHealth{
{
ID: "a",
Healthy: true,
StableSince: time.Now().Add(-10 * time.Second),
},
{
ID: "b",
Healthy: true,
StableSince: time.Now().Add(-10 * time.Second),
},
},
},
servers: []raft.Server{
{ID: "a", Suffrage: raft.Voter},
{ID: "b", Suffrage: raft.Nonvoter},
},
promotions: []raft.Server{
{ID: "b", Suffrage: raft.Nonvoter},
},
},
{
name: "unstable servers, neither should be promoted",
conf: config,
health: OperatorHealthReply{
Servers: []ServerHealth{
{
ID: "a",
Healthy: true,
StableSince: time.Now().Add(-10 * time.Second),
},
{
ID: "b",
Healthy: false,
StableSince: time.Now().Add(-10 * time.Second),
},
{
ID: "c",
Healthy: true,
StableSince: time.Now().Add(-1 * time.Second),
},
},
},
servers: []raft.Server{
{ID: "a", Suffrage: raft.Voter},
{ID: "b", Suffrage: raft.Nonvoter},
{ID: "c", Suffrage: raft.Nonvoter},
},
promotions: []raft.Server{},
},
}
for _, tc := range cases {
promotions := PromoteStableServers(tc.conf, tc.health, tc.servers)
verify.Values(t, tc.name, tc.promotions, promotions)
}
}

View File

@ -0,0 +1,158 @@
package autopilot
import (
"time"
"github.com/hashicorp/serf/serf"
)
// Config holds the Autopilot configuration for a cluster.
type Config struct {
// CleanupDeadServers controls whether to remove dead servers when a new
// server is added to the Raft peers.
CleanupDeadServers bool
// LastContactThreshold is the limit on the amount of time a server can go
// without leader contact before being considered unhealthy.
LastContactThreshold time.Duration
// MaxTrailingLogs is the amount of entries in the Raft Log that a server can
// be behind before being considered unhealthy.
MaxTrailingLogs uint64
// ServerStabilizationTime is the minimum amount of time a server must be
// in a stable, healthy state before it can be added to the cluster. Only
// applicable with Raft protocol version 3 or higher.
ServerStabilizationTime time.Duration
// (Enterprise-only) RedundancyZoneTag is the node tag to use for separating
// servers into zones for redundancy. If left blank, this feature will be disabled.
RedundancyZoneTag string
// (Enterprise-only) DisableUpgradeMigration will disable Autopilot's upgrade migration
// strategy of waiting until enough newer-versioned servers have been added to the
// cluster before promoting them to voters.
DisableUpgradeMigration bool
// (Enterprise-only) UpgradeVersionTag is the node tag to use for version info when
// performing upgrade migrations. If left blank, the Consul version will be used.
UpgradeVersionTag string
// CreateIndex/ModifyIndex store the create/modify indexes of this configuration.
CreateIndex uint64
ModifyIndex uint64
}
// ServerHealth is the health (from the leader's point of view) of a server.
type ServerHealth struct {
// ID is the raft ID of the server.
ID string
// Name is the node name of the server.
Name string
// Address is the address of the server.
Address string
// The status of the SerfHealth check for the server.
SerfStatus serf.MemberStatus
// Version is the version of the server.
Version string
// Leader is whether this server is currently the leader.
Leader bool
// LastContact is the time since this node's last contact with the leader.
LastContact time.Duration
// LastTerm is the highest leader term this server has a record of in its Raft log.
LastTerm uint64
// LastIndex is the last log index this server has a record of in its Raft log.
LastIndex uint64
// Healthy is whether or not the server is healthy according to the current
// Autopilot config.
Healthy bool
// Voter is whether this is a voting server.
Voter bool
// StableSince is the last time this server's Healthy value changed.
StableSince time.Time
}
// IsHealthy determines whether this ServerHealth is considered healthy
// based on the given Autopilot config
func (h *ServerHealth) IsHealthy(lastTerm uint64, leaderLastIndex uint64, autopilotConf *Config) bool {
if h.SerfStatus != serf.StatusAlive {
return false
}
if h.LastContact > autopilotConf.LastContactThreshold || h.LastContact < 0 {
return false
}
if h.LastTerm != lastTerm {
return false
}
if leaderLastIndex > autopilotConf.MaxTrailingLogs && h.LastIndex < leaderLastIndex-autopilotConf.MaxTrailingLogs {
return false
}
return true
}
// IsStable returns true if the ServerHealth shows a stable, passing state
// according to the given AutopilotConfig
func (h *ServerHealth) IsStable(now time.Time, conf *Config) bool {
if h == nil {
return false
}
if !h.Healthy {
return false
}
if now.Sub(h.StableSince) < conf.ServerStabilizationTime {
return false
}
return true
}
// ServerStats holds miscellaneous Raft metrics for a server
type ServerStats struct {
// LastContact is the time since this node's last contact with the leader.
LastContact string
// LastTerm is the highest leader term this server has a record of in its Raft log.
LastTerm uint64
// LastIndex is the last log index this server has a record of in its Raft log.
LastIndex uint64
}
// OperatorHealthReply is a representation of the overall health of the cluster
type OperatorHealthReply struct {
// Healthy is true if all the servers in the cluster are healthy.
Healthy bool
// FailureTolerance is the number of healthy servers that could be lost without
// an outage occurring.
FailureTolerance int
// Servers holds the health of each server.
Servers []ServerHealth
}
func (o *OperatorHealthReply) ServerHealth(id string) *ServerHealth {
for _, health := range o.Servers {
if health.ID == id {
return &health
}
}
return nil
}

View File

@ -1,4 +1,4 @@
package structs
package autopilot
import (
"testing"
@ -12,7 +12,7 @@ func TestServerHealth_IsHealthy(t *testing.T) {
health ServerHealth
lastTerm uint64
lastIndex uint64
conf AutopilotConfig
conf Config
expected bool
}{
// Healthy server, all values within allowed limits
@ -20,7 +20,7 @@ func TestServerHealth_IsHealthy(t *testing.T) {
health: ServerHealth{SerfStatus: serf.StatusAlive, LastTerm: 1, LastIndex: 0},
lastTerm: 1,
lastIndex: 10,
conf: AutopilotConfig{MaxTrailingLogs: 20},
conf: Config{MaxTrailingLogs: 20},
expected: true,
},
// Serf status failed
@ -38,7 +38,7 @@ func TestServerHealth_IsHealthy(t *testing.T) {
{
health: ServerHealth{SerfStatus: serf.StatusAlive, LastIndex: 0},
lastIndex: 10,
conf: AutopilotConfig{MaxTrailingLogs: 5},
conf: Config{MaxTrailingLogs: 5},
expected: false,
},
}
@ -56,14 +56,14 @@ func TestServerHealth_IsStable(t *testing.T) {
cases := []struct {
health *ServerHealth
now time.Time
conf AutopilotConfig
conf Config
expected bool
}{
// Healthy server, all values within allowed limits
{
health: &ServerHealth{Healthy: true, StableSince: start},
now: start.Add(15 * time.Second),
conf: AutopilotConfig{ServerStabilizationTime: 10 * time.Second},
conf: Config{ServerStabilizationTime: 10 * time.Second},
expected: true,
},
// Unhealthy server
@ -75,7 +75,7 @@ func TestServerHealth_IsStable(t *testing.T) {
{
health: &ServerHealth{Healthy: true, StableSince: start},
now: start.Add(5 * time.Second),
conf: AutopilotConfig{ServerStabilizationTime: 10 * time.Second},
conf: Config{ServerStabilizationTime: 10 * time.Second},
expected: false,
},
// Nil struct

View File

@ -301,7 +301,7 @@ func TestAutopilot_PromoteNonVoter(t *testing.T) {
if servers[1].Suffrage != raft.Nonvoter {
r.Fatalf("bad: %v", servers)
}
health := s1.getServerHealth(string(servers[1].ID))
health := s1.autopilot.GetServerHealth(string(servers[1].ID))
if health == nil {
r.Fatal("nil health")
}

View File

@ -7,7 +7,7 @@ import (
"os"
"time"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/consul/autopilot"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/tlsutil"
"github.com/hashicorp/consul/types"
@ -336,7 +336,7 @@ type Config struct {
// AutopilotConfig is used to apply the initial autopilot config when
// bootstrapping.
AutopilotConfig *structs.AutopilotConfig
AutopilotConfig *autopilot.Config
// ServerHealthInterval is the frequency with which the health of the
// servers in the cluster will be updated.
@ -416,9 +416,9 @@ func DefaultConfig() *Config {
TLSMinVersion: "tls10",
// TODO (slackpad) - Until #3744 is done, we need to keep these
// TODO (slackpad) - Until #3744 is done, we need to keep these
// in sync with agent/config/default.go.
AutopilotConfig: &structs.AutopilotConfig{
AutopilotConfig: &autopilot.Config{
CleanupDeadServers: true,
LastContactThreshold: 200 * time.Millisecond,
MaxTrailingLogs: 250,

View File

@ -8,6 +8,7 @@ import (
"testing"
"time"
"github.com/hashicorp/consul/agent/consul/autopilot"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/types"
@ -1096,7 +1097,7 @@ func TestFSM_Autopilot(t *testing.T) {
// Set the autopilot config using a request.
req := structs.AutopilotSetConfigRequest{
Datacenter: "dc1",
Config: structs.AutopilotConfig{
Config: autopilot.Config{
CleanupDeadServers: true,
LastContactThreshold: 10 * time.Second,
MaxTrailingLogs: 300,

View File

@ -1,6 +1,7 @@
package fsm
import (
"github.com/hashicorp/consul/agent/consul/autopilot"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/go-msgpack/codec"
@ -354,7 +355,7 @@ func restorePreparedQuery(header *snapshotHeader, restore *state.Restore, decode
}
func restoreAutopilot(header *snapshotHeader, restore *state.Restore, decoder *codec.Decoder) error {
var req structs.AutopilotConfig
var req autopilot.Config
if err := decoder.Decode(&req); err != nil {
return err
}

View File

@ -7,6 +7,7 @@ import (
"testing"
"time"
"github.com/hashicorp/consul/agent/consul/autopilot"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
@ -88,7 +89,7 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) {
t.Fatalf("err: %s", err)
}
autopilotConf := &structs.AutopilotConfig{
autopilotConf := &autopilot.Config{
CleanupDeadServers: true,
LastContactThreshold: 100 * time.Millisecond,
MaxTrailingLogs: 222,

View File

@ -9,6 +9,7 @@ import (
"github.com/armon/go-metrics"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul/autopilot"
"github.com/hashicorp/consul/agent/metadata"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
@ -23,6 +24,8 @@ const (
barrierWriteTimeout = 2 * time.Minute
)
var minAutopilotVersion = version.Must(version.NewVersion("0.8.0"))
// monitorLeadership is used to monitor if we acquire or lose our role
// as the leader in the Raft cluster. There is some work the leader is
// expected to do, so we must react to changes
@ -202,7 +205,7 @@ func (s *Server) establishLeadership() error {
}
s.getOrCreateAutopilotConfig()
s.startAutopilot()
s.autopilot.Start()
s.setConsistentReadReady()
return nil
}
@ -220,7 +223,7 @@ func (s *Server) revokeLeadership() error {
}
s.resetConsistentReadReady()
s.stopAutopilot()
s.autopilot.Stop()
return nil
}
@ -326,30 +329,30 @@ func (s *Server) initializeACL() error {
}
// getOrCreateAutopilotConfig is used to get the autopilot config, initializing it if necessary
func (s *Server) getOrCreateAutopilotConfig() (*structs.AutopilotConfig, bool) {
func (s *Server) getOrCreateAutopilotConfig() *autopilot.Config {
state := s.fsm.State()
_, config, err := state.AutopilotConfig()
if err != nil {
s.logger.Printf("[ERR] autopilot: failed to get config: %v", err)
return nil, false
return nil
}
if config != nil {
return config, true
return config
}
if !ServersMeetMinimumVersion(s.LANMembers(), minAutopilotVersion) {
s.logger.Printf("[WARN] autopilot: can't initialize until all servers are >= %s", minAutopilotVersion.String())
return nil, false
return nil
}
config = s.config.AutopilotConfig
req := structs.AutopilotSetConfigRequest{Config: *config}
if _, err = s.raftApply(structs.AutopilotRequestType, req); err != nil {
s.logger.Printf("[ERR] autopilot: failed to initialize config: %v", err)
return nil, false
return nil
}
return config, true
return config
}
// reconcileReaped is used to reconcile nodes that have failed and been reaped
@ -681,7 +684,7 @@ func (s *Server) joinConsulServer(m serf.Member, parts *metadata.Server) error {
// log entries. If the address is the same but the ID changed, remove the
// old server before adding the new one.
addr := (&net.TCPAddr{IP: m.Addr, Port: parts.Port}).String()
minRaftProtocol, err := ServerMinRaftProtocol(s.serfLAN.Members())
minRaftProtocol, err := s.autopilot.MinRaftProtocol()
if err != nil {
return err
}
@ -756,7 +759,7 @@ func (s *Server) removeConsulServer(m serf.Member, port int) error {
return err
}
minRaftProtocol, err := ServerMinRaftProtocol(s.serfLAN.Members())
minRaftProtocol, err := s.autopilot.MinRaftProtocol()
if err != nil {
return err
}

View File

@ -812,7 +812,7 @@ func TestLeader_RollRaftServer(t *testing.T) {
for _, s := range []*Server{s1, s3} {
retry.Run(t, func(r *retry.R) {
minVer, err := ServerMinRaftProtocol(s.LANMembers())
minVer, err := s.autopilot.MinRaftProtocol()
if err != nil {
r.Fatal(err)
}

View File

@ -4,11 +4,12 @@ import (
"fmt"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul/autopilot"
"github.com/hashicorp/consul/agent/structs"
)
// AutopilotGetConfiguration is used to retrieve the current Autopilot configuration.
func (op *Operator) AutopilotGetConfiguration(args *structs.DCSpecificRequest, reply *structs.AutopilotConfig) error {
func (op *Operator) AutopilotGetConfiguration(args *structs.DCSpecificRequest, reply *autopilot.Config) error {
if done, err := op.srv.forward("Operator.AutopilotGetConfiguration", args, args, reply); done {
return err
}
@ -69,7 +70,7 @@ func (op *Operator) AutopilotSetConfiguration(args *structs.AutopilotSetConfigRe
}
// ServerHealth is used to get the current health of the servers.
func (op *Operator) ServerHealth(args *structs.DCSpecificRequest, reply *structs.OperatorHealthReply) error {
func (op *Operator) ServerHealth(args *structs.DCSpecificRequest, reply *autopilot.OperatorHealthReply) error {
// This must be sent to the leader, so we fix the args since we are
// re-using a structure where we don't support all the options.
args.RequireConsistent = true
@ -88,7 +89,7 @@ func (op *Operator) ServerHealth(args *structs.DCSpecificRequest, reply *structs
}
// Exit early if the min Raft version is too low
minRaftProtocol, err := ServerMinRaftProtocol(op.srv.LANMembers())
minRaftProtocol, err := op.srv.autopilot.MinRaftProtocol()
if err != nil {
return fmt.Errorf("error getting server raft protocol versions: %s", err)
}
@ -96,7 +97,7 @@ func (op *Operator) ServerHealth(args *structs.DCSpecificRequest, reply *structs
return fmt.Errorf("all servers must have raft_protocol set to 3 or higher to use this endpoint")
}
*reply = op.srv.getClusterHealth()
*reply = op.srv.autopilot.GetClusterHealth()
return nil
}

View File

@ -7,6 +7,7 @@ import (
"time"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul/autopilot"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/testrpc"
"github.com/hashicorp/consul/testutil/retry"
@ -29,7 +30,7 @@ func TestOperator_Autopilot_GetConfiguration(t *testing.T) {
arg := structs.DCSpecificRequest{
Datacenter: "dc1",
}
var reply structs.AutopilotConfig
var reply autopilot.Config
err := msgpackrpc.CallWithCodec(codec, "Operator.AutopilotGetConfiguration", &arg, &reply)
if err != nil {
t.Fatalf("err: %v", err)
@ -58,7 +59,7 @@ func TestOperator_Autopilot_GetConfiguration_ACLDeny(t *testing.T) {
arg := structs.DCSpecificRequest{
Datacenter: "dc1",
}
var reply structs.AutopilotConfig
var reply autopilot.Config
err := msgpackrpc.CallWithCodec(codec, "Operator.AutopilotGetConfiguration", &arg, &reply)
if !acl.IsErrPermissionDenied(err) {
t.Fatalf("err: %v", err)
@ -112,7 +113,7 @@ func TestOperator_Autopilot_SetConfiguration(t *testing.T) {
// Change the autopilot config from the default
arg := structs.AutopilotSetConfigRequest{
Datacenter: "dc1",
Config: structs.AutopilotConfig{
Config: autopilot.Config{
CleanupDeadServers: true,
},
}
@ -151,7 +152,7 @@ func TestOperator_Autopilot_SetConfiguration_ACLDeny(t *testing.T) {
// Try to set config without permissions
arg := structs.AutopilotSetConfigRequest{
Datacenter: "dc1",
Config: structs.AutopilotConfig{
Config: autopilot.Config{
CleanupDeadServers: true,
},
}
@ -232,7 +233,7 @@ func TestOperator_ServerHealth(t *testing.T) {
arg := structs.DCSpecificRequest{
Datacenter: "dc1",
}
var reply structs.OperatorHealthReply
var reply autopilot.OperatorHealthReply
err := msgpackrpc.CallWithCodec(codec, "Operator.ServerHealth", &arg, &reply)
if err != nil {
r.Fatalf("err: %v", err)
@ -274,7 +275,7 @@ func TestOperator_ServerHealth_UnsupportedRaftVersion(t *testing.T) {
arg := structs.DCSpecificRequest{
Datacenter: "dc1",
}
var reply structs.OperatorHealthReply
var reply autopilot.OperatorHealthReply
err := msgpackrpc.CallWithCodec(codec, "Operator.ServerHealth", &arg, &reply)
if err == nil || !strings.Contains(err.Error(), "raft_protocol set to 3 or higher") {
t.Fatalf("bad: %v", err)

View File

@ -115,7 +115,7 @@ REMOVE:
// doing if you are calling this. If you remove a peer that's known to
// Serf, for example, it will come back when the leader does a reconcile
// pass.
minRaftProtocol, err := ServerMinRaftProtocol(op.srv.serfLAN.Members())
minRaftProtocol, err := op.srv.autopilot.MinRaftProtocol()
if err != nil {
return err
}
@ -182,7 +182,7 @@ REMOVE:
// doing if you are calling this. If you remove a peer that's known to
// Serf, for example, it will come back when the leader does a reconcile
// pass.
minRaftProtocol, err := ServerMinRaftProtocol(op.srv.serfLAN.Members())
minRaftProtocol, err := op.srv.autopilot.MinRaftProtocol()
if err != nil {
return err
}

View File

@ -18,6 +18,7 @@ import (
"time"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul/autopilot"
"github.com/hashicorp/consul/agent/consul/fsm"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/metadata"
@ -86,8 +87,8 @@ type Server struct {
// aclCache is the non-authoritative ACL cache.
aclCache *aclCache
// autopilotPolicy controls the behavior of Autopilot for certain tasks.
autopilotPolicy AutopilotPolicy
// autopilot is the Autopilot instance for this server.
autopilot *autopilot.Autopilot
// autopilotRemoveDeadCh is used to trigger a check for dead server removals.
autopilotRemoveDeadCh chan struct{}
@ -98,10 +99,6 @@ type Server struct {
// autopilotWaitGroup is used to block until Autopilot shuts down.
autopilotWaitGroup sync.WaitGroup
// clusterHealth stores the current view of the cluster's health.
clusterHealth structs.OperatorHealthReply
clusterHealthLock sync.RWMutex
// Consul configuration
config *Config
@ -305,8 +302,9 @@ func NewServerLogger(config *Config, logger *log.Logger, tokens *token.Store) (*
shutdownCh: shutdownCh,
}
// Set up the autopilot policy
s.autopilotPolicy = &BasicAutopilot{server: s}
// Set up autopilot
apDelegate := &AutopilotDelegate{s}
s.autopilot = autopilot.NewAutopilot(logger, apDelegate, config.AutopilotInterval, config.ServerHealthInterval)
// Initialize the stats fetcher that autopilot will use.
s.statsFetcher = NewStatsFetcher(logger, s.connPool, s.config.Datacenter)
@ -430,7 +428,7 @@ func NewServerLogger(config *Config, logger *log.Logger, tokens *token.Store) (*
go s.sessionStats()
// Start the server health checking.
go s.serverHealthLoop()
go s.autopilot.ServerHealthLoop(s.shutdownCh)
return s, nil
}
@ -732,7 +730,7 @@ func (s *Server) Leave() error {
// removed for some sane period of time.
isLeader := s.IsLeader()
if isLeader && numPeers > 1 {
minRaftProtocol, err := ServerMinRaftProtocol(s.serfLAN.Members())
minRaftProtocol, err := s.autopilot.MinRaftProtocol()
if err != nil {
return err
}
@ -831,13 +829,7 @@ func (s *Server) numPeers() (int, error) {
return 0, err
}
var numPeers int
for _, server := range future.Configuration().Servers {
if server.Suffrage == raft.Voter {
numPeers++
}
}
return numPeers, nil
return autopilot.NumPeers(future.Configuration()), nil
}
// JoinLAN is used to have Consul join the inner-DC pool

View File

@ -304,7 +304,7 @@ func (s *Server) maybeBootstrap() {
// Attempt a live bootstrap!
var configuration raft.Configuration
var addrs []string
minRaftVersion, err := ServerMinRaftProtocol(members)
minRaftVersion, err := s.autopilot.MinRaftProtocol()
if err != nil {
s.logger.Printf("[ERR] consul: Failed to read server raft versions: %v", err)
}

View File

@ -3,7 +3,7 @@ package state
import (
"fmt"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/consul/autopilot"
"github.com/hashicorp/go-memdb"
)
@ -30,13 +30,13 @@ func init() {
}
// Autopilot is used to pull the autopilot config from the snapshot.
func (s *Snapshot) Autopilot() (*structs.AutopilotConfig, error) {
func (s *Snapshot) Autopilot() (*autopilot.Config, error) {
c, err := s.tx.First("autopilot-config", "id")
if err != nil {
return nil, err
}
config, ok := c.(*structs.AutopilotConfig)
config, ok := c.(*autopilot.Config)
if !ok {
return nil, nil
}
@ -45,7 +45,7 @@ func (s *Snapshot) Autopilot() (*structs.AutopilotConfig, error) {
}
// Autopilot is used when restoring from a snapshot.
func (s *Restore) Autopilot(config *structs.AutopilotConfig) error {
func (s *Restore) Autopilot(config *autopilot.Config) error {
if err := s.tx.Insert("autopilot-config", config); err != nil {
return fmt.Errorf("failed restoring autopilot config: %s", err)
}
@ -54,7 +54,7 @@ func (s *Restore) Autopilot(config *structs.AutopilotConfig) error {
}
// AutopilotConfig is used to get the current Autopilot configuration.
func (s *Store) AutopilotConfig() (uint64, *structs.AutopilotConfig, error) {
func (s *Store) AutopilotConfig() (uint64, *autopilot.Config, error) {
tx := s.db.Txn(false)
defer tx.Abort()
@ -64,7 +64,7 @@ func (s *Store) AutopilotConfig() (uint64, *structs.AutopilotConfig, error) {
return 0, nil, fmt.Errorf("failed autopilot config lookup: %s", err)
}
config, ok := c.(*structs.AutopilotConfig)
config, ok := c.(*autopilot.Config)
if !ok {
return 0, nil, nil
}
@ -73,7 +73,7 @@ func (s *Store) AutopilotConfig() (uint64, *structs.AutopilotConfig, error) {
}
// AutopilotSetConfig is used to set the current Autopilot configuration.
func (s *Store) AutopilotSetConfig(idx uint64, config *structs.AutopilotConfig) error {
func (s *Store) AutopilotSetConfig(idx uint64, config *autopilot.Config) error {
tx := s.db.Txn(true)
defer tx.Abort()
@ -86,7 +86,7 @@ func (s *Store) AutopilotSetConfig(idx uint64, config *structs.AutopilotConfig)
// AutopilotCASConfig is used to try updating the Autopilot configuration with a
// given Raft index. If the CAS index specified is not equal to the last observed index
// for the config, then the call is a noop,
func (s *Store) AutopilotCASConfig(idx, cidx uint64, config *structs.AutopilotConfig) (bool, error) {
func (s *Store) AutopilotCASConfig(idx, cidx uint64, config *autopilot.Config) (bool, error) {
tx := s.db.Txn(true)
defer tx.Abort()
@ -99,7 +99,7 @@ func (s *Store) AutopilotCASConfig(idx, cidx uint64, config *structs.AutopilotCo
// If the existing index does not match the provided CAS
// index arg, then we shouldn't update anything and can safely
// return early here.
e, ok := existing.(*structs.AutopilotConfig)
e, ok := existing.(*autopilot.Config)
if !ok || e.ModifyIndex != cidx {
return false, nil
}
@ -110,7 +110,7 @@ func (s *Store) AutopilotCASConfig(idx, cidx uint64, config *structs.AutopilotCo
return true, nil
}
func (s *Store) autopilotSetConfigTxn(idx uint64, tx *memdb.Txn, config *structs.AutopilotConfig) error {
func (s *Store) autopilotSetConfigTxn(idx uint64, tx *memdb.Txn, config *autopilot.Config) error {
// Check for an existing config
existing, err := tx.First("autopilot-config", "id")
if err != nil {
@ -119,7 +119,7 @@ func (s *Store) autopilotSetConfigTxn(idx uint64, tx *memdb.Txn, config *structs
// Set the indexes.
if existing != nil {
config.CreateIndex = existing.(*structs.AutopilotConfig).CreateIndex
config.CreateIndex = existing.(*autopilot.Config).CreateIndex
} else {
config.CreateIndex = idx
}

View File

@ -5,14 +5,14 @@ import (
"testing"
"time"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/consul/autopilot"
"github.com/pascaldekloe/goe/verify"
)
func TestStateStore_Autopilot(t *testing.T) {
s := testStateStore(t)
expected := &structs.AutopilotConfig{
expected := &autopilot.Config{
CleanupDeadServers: true,
LastContactThreshold: 5 * time.Second,
MaxTrailingLogs: 500,
@ -41,7 +41,7 @@ func TestStateStore_Autopilot(t *testing.T) {
func TestStateStore_AutopilotCAS(t *testing.T) {
s := testStateStore(t)
expected := &structs.AutopilotConfig{
expected := &autopilot.Config{
CleanupDeadServers: true,
}
@ -53,7 +53,7 @@ func TestStateStore_AutopilotCAS(t *testing.T) {
}
// Do a CAS with an index lower than the entry
ok, err := s.AutopilotCASConfig(2, 0, &structs.AutopilotConfig{
ok, err := s.AutopilotCASConfig(2, 0, &autopilot.Config{
CleanupDeadServers: false,
})
if ok || err != nil {
@ -74,7 +74,7 @@ func TestStateStore_AutopilotCAS(t *testing.T) {
}
// Do another CAS, this time with the correct index
ok, err = s.AutopilotCASConfig(2, 1, &structs.AutopilotConfig{
ok, err = s.AutopilotCASConfig(2, 1, &autopilot.Config{
CleanupDeadServers: false,
})
if !ok || err != nil {
@ -96,7 +96,7 @@ func TestStateStore_AutopilotCAS(t *testing.T) {
func TestStateStore_Autopilot_Snapshot_Restore(t *testing.T) {
s := testStateStore(t)
before := &structs.AutopilotConfig{
before := &autopilot.Config{
CleanupDeadServers: true,
}
if err := s.AutopilotSetConfig(99, before); err != nil {
@ -106,7 +106,7 @@ func TestStateStore_Autopilot_Snapshot_Restore(t *testing.T) {
snap := s.Snapshot()
defer snap.Close()
after := &structs.AutopilotConfig{
after := &autopilot.Config{
CleanupDeadServers: false,
}
if err := s.AutopilotSetConfig(100, after); err != nil {

View File

@ -5,9 +5,10 @@ import (
"log"
"sync"
"github.com/hashicorp/consul/agent/consul/autopilot"
"github.com/hashicorp/consul/agent/metadata"
"github.com/hashicorp/consul/agent/pool"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/serf/serf"
)
// StatsFetcher has two functions for autopilot. First, lets us fetch all the
@ -39,9 +40,9 @@ func NewStatsFetcher(logger *log.Logger, pool *pool.ConnPool, datacenter string)
// cancel this when the context is canceled because we only want one in-flight
// RPC to each server, so we let it finish and then clean up the in-flight
// tracking.
func (f *StatsFetcher) fetch(server *metadata.Server, replyCh chan *structs.ServerStats) {
func (f *StatsFetcher) fetch(server *metadata.Server, replyCh chan *autopilot.ServerStats) {
var args struct{}
var reply structs.ServerStats
var reply autopilot.ServerStats
err := f.pool.RPC(f.datacenter, server.Addr, server.Version, "Status.RaftStats", server.UseTLS, &args, &reply)
if err != nil {
f.logger.Printf("[WARN] consul: error getting server health from %q: %v",
@ -56,14 +57,20 @@ func (f *StatsFetcher) fetch(server *metadata.Server, replyCh chan *structs.Serv
}
// Fetch will attempt to query all the servers in parallel.
func (f *StatsFetcher) Fetch(ctx context.Context, servers []*metadata.Server) map[string]*structs.ServerStats {
func (f *StatsFetcher) Fetch(ctx context.Context, members []serf.Member) map[string]*autopilot.ServerStats {
type workItem struct {
server *metadata.Server
replyCh chan *structs.ServerStats
replyCh chan *autopilot.ServerStats
}
var servers []*metadata.Server
for _, s := range members {
if ok, parts := metadata.IsConsulServer(s); ok {
servers = append(servers, parts)
}
}
var work []*workItem
// Skip any servers that have inflight requests.
var work []*workItem
f.inflightLock.Lock()
for _, server := range servers {
if _, ok := f.inflight[server.ID]; ok {
@ -72,7 +79,7 @@ func (f *StatsFetcher) Fetch(ctx context.Context, servers []*metadata.Server) ma
} else {
workItem := &workItem{
server: server,
replyCh: make(chan *structs.ServerStats, 1),
replyCh: make(chan *autopilot.ServerStats, 1),
}
work = append(work, workItem)
f.inflight[server.ID] = struct{}{}
@ -83,7 +90,7 @@ func (f *StatsFetcher) Fetch(ctx context.Context, servers []*metadata.Server) ma
// Now wait for the results to come in, or for the context to be
// canceled.
replies := make(map[string]*structs.ServerStats)
replies := make(map[string]*autopilot.ServerStats)
for _, workItem := range work {
select {
case reply := <-workItem.replyCh:

View File

@ -47,7 +47,7 @@ func TestStatsFetcher(t *testing.T) {
func() {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
stats := s1.statsFetcher.Fetch(ctx, servers)
stats := s1.statsFetcher.Fetch(ctx, s1.LANMembers())
if len(stats) != 3 {
t.Fatalf("bad: %#v", stats)
}
@ -73,7 +73,7 @@ func TestStatsFetcher(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
stats := s1.statsFetcher.Fetch(ctx, servers)
stats := s1.statsFetcher.Fetch(ctx, s1.LANMembers())
if len(stats) != 2 {
t.Fatalf("bad: %#v", stats)
}

View File

@ -4,7 +4,7 @@ import (
"fmt"
"strconv"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/consul/autopilot"
)
// Status endpoint is used to check on server status
@ -42,7 +42,7 @@ func (s *Status) Peers(args struct{}, reply *[]string) error {
}
// Used by Autopilot to query the raft stats of the local server.
func (s *Status) RaftStats(args struct{}, reply *structs.ServerStats) error {
func (s *Status) RaftStats(args struct{}, reply *autopilot.ServerStats) error {
stats := s.server.raft.Stats()
var err error

View File

@ -93,35 +93,6 @@ func CanServersUnderstandProtocol(members []serf.Member, version uint8) (bool, e
return (numServers > 0) && (numWhoGrok == numServers), nil
}
// ServerMinRaftProtocol returns the lowest supported Raft protocol among alive servers
func ServerMinRaftProtocol(members []serf.Member) (int, error) {
minVersion := -1
for _, m := range members {
if m.Tags["role"] != "consul" || m.Status != serf.StatusAlive {
continue
}
vsn, ok := m.Tags["raft_vsn"]
if !ok {
vsn = "1"
}
raftVsn, err := strconv.Atoi(vsn)
if err != nil {
return -1, err
}
if minVersion == -1 || raftVsn < minVersion {
minVersion = raftVsn
}
}
if minVersion == -1 {
return minVersion, fmt.Errorf("No servers found")
}
return minVersion, nil
}
// Returns if a member is a consul node. Returns a bool,
// and the datacenter.
func isConsulNode(m serf.Member) (bool, string) {

View File

@ -6,6 +6,7 @@ import (
"strconv"
"time"
"github.com/hashicorp/consul/agent/consul/autopilot"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
multierror "github.com/hashicorp/go-multierror"
@ -194,7 +195,7 @@ func (s *HTTPServer) OperatorAutopilotConfiguration(resp http.ResponseWriter, re
return nil, nil
}
var reply structs.AutopilotConfig
var reply autopilot.Config
if err := s.agent.RPC("Operator.AutopilotGetConfiguration", &args, &reply); err != nil {
return nil, err
}
@ -226,7 +227,7 @@ func (s *HTTPServer) OperatorAutopilotConfiguration(resp http.ResponseWriter, re
return nil, nil
}
args.Config = structs.AutopilotConfig{
args.Config = autopilot.Config{
CleanupDeadServers: conf.CleanupDeadServers,
LastContactThreshold: conf.LastContactThreshold.Duration(),
MaxTrailingLogs: conf.MaxTrailingLogs,
@ -276,7 +277,7 @@ func (s *HTTPServer) OperatorServerHealth(resp http.ResponseWriter, req *http.Re
return nil, nil
}
var reply structs.OperatorHealthReply
var reply autopilot.OperatorHealthReply
if err := s.agent.RPC("Operator.ServerHealth", &args, &reply); err != nil {
return nil, err
}

View File

@ -8,6 +8,7 @@ import (
"strings"
"testing"
"github.com/hashicorp/consul/agent/consul/autopilot"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/testutil/retry"
@ -329,7 +330,7 @@ func TestOperator_AutopilotSetConfiguration(t *testing.T) {
Datacenter: "dc1",
}
var reply structs.AutopilotConfig
var reply autopilot.Config
if err := a.RPC("Operator.AutopilotGetConfiguration", &args, &reply); err != nil {
t.Fatalf("err: %v", err)
}
@ -357,7 +358,7 @@ func TestOperator_AutopilotCASConfiguration(t *testing.T) {
Datacenter: "dc1",
}
var reply structs.AutopilotConfig
var reply autopilot.Config
if err := a.RPC("Operator.AutopilotGetConfiguration", &args, &reply); err != nil {
t.Fatalf("err: %v", err)
}

View File

@ -2,48 +2,11 @@ package structs
import (
"net"
"time"
"github.com/hashicorp/consul/agent/consul/autopilot"
"github.com/hashicorp/raft"
"github.com/hashicorp/serf/serf"
)
// AutopilotConfig holds the Autopilot configuration for a cluster.
type AutopilotConfig struct {
// CleanupDeadServers controls whether to remove dead servers when a new
// server is added to the Raft peers.
CleanupDeadServers bool
// LastContactThreshold is the limit on the amount of time a server can go
// without leader contact before being considered unhealthy.
LastContactThreshold time.Duration
// MaxTrailingLogs is the amount of entries in the Raft Log that a server can
// be behind before being considered unhealthy.
MaxTrailingLogs uint64
// ServerStabilizationTime is the minimum amount of time a server must be
// in a stable, healthy state before it can be added to the cluster. Only
// applicable with Raft protocol version 3 or higher.
ServerStabilizationTime time.Duration
// (Enterprise-only) RedundancyZoneTag is the node tag to use for separating
// servers into zones for redundancy. If left blank, this feature will be disabled.
RedundancyZoneTag string
// (Enterprise-only) DisableUpgradeMigration will disable Autopilot's upgrade migration
// strategy of waiting until enough newer-versioned servers have been added to the
// cluster before promoting them to voters.
DisableUpgradeMigration bool
// (Enterprise-only) UpgradeVersionTag is the node tag to use for version info when
// performing upgrade migrations. If left blank, the Consul version will be used.
UpgradeVersionTag string
// RaftIndex stores the create/modify indexes of this configuration.
RaftIndex
}
// RaftServer has information about a server in the Raft configuration.
type RaftServer struct {
// ID is the unique ID for the server. These are currently the same
@ -109,7 +72,7 @@ type AutopilotSetConfigRequest struct {
Datacenter string
// Config is the new Autopilot configuration to use.
Config AutopilotConfig
Config autopilot.Config
// CAS controls whether to use check-and-set semantics for this request.
CAS bool
@ -123,111 +86,6 @@ func (op *AutopilotSetConfigRequest) RequestDatacenter() string {
return op.Datacenter
}
// ServerHealth is the health (from the leader's point of view) of a server.
type ServerHealth struct {
// ID is the raft ID of the server.
ID string
// Name is the node name of the server.
Name string
// Address is the address of the server.
Address string
// The status of the SerfHealth check for the server.
SerfStatus serf.MemberStatus
// Version is the Consul version of the server.
Version string
// Leader is whether this server is currently the leader.
Leader bool
// LastContact is the time since this node's last contact with the leader.
LastContact time.Duration
// LastTerm is the highest leader term this server has a record of in its Raft log.
LastTerm uint64
// LastIndex is the last log index this server has a record of in its Raft log.
LastIndex uint64
// Healthy is whether or not the server is healthy according to the current
// Autopilot config.
Healthy bool
// Voter is whether this is a voting server.
Voter bool
// StableSince is the last time this server's Healthy value changed.
StableSince time.Time
}
// IsHealthy determines whether this ServerHealth is considered healthy
// based on the given Autopilot config
func (h *ServerHealth) IsHealthy(lastTerm uint64, leaderLastIndex uint64, autopilotConf *AutopilotConfig) bool {
if h.SerfStatus != serf.StatusAlive {
return false
}
if h.LastContact > autopilotConf.LastContactThreshold || h.LastContact < 0 {
return false
}
if h.LastTerm != lastTerm {
return false
}
if leaderLastIndex > autopilotConf.MaxTrailingLogs && h.LastIndex < leaderLastIndex-autopilotConf.MaxTrailingLogs {
return false
}
return true
}
// IsStable returns true if the ServerHealth is in a stable, passing state
// according to the given AutopilotConfig
func (h *ServerHealth) IsStable(now time.Time, conf *AutopilotConfig) bool {
if h == nil {
return false
}
if !h.Healthy {
return false
}
if now.Sub(h.StableSince) < conf.ServerStabilizationTime {
return false
}
return true
}
// ServerStats holds miscellaneous Raft metrics for a server
type ServerStats struct {
// LastContact is the time since this node's last contact with the leader.
LastContact string
// LastTerm is the highest leader term this server has a record of in its Raft log.
LastTerm uint64
// LastIndex is the last log index this server has a record of in its Raft log.
LastIndex uint64
}
// OperatorHealthReply is a representation of the overall health of the cluster
type OperatorHealthReply struct {
// Healthy is true if all the servers in the cluster are healthy.
Healthy bool
// FailureTolerance is the number of healthy servers that could be lost without
// an outage occurring.
FailureTolerance int
// Servers holds the health of each server.
Servers []ServerHealth
}
// (Enterprise-only) NetworkSegment is the configuration for a network segment, which is an
// isolated serf group on the LAN.
type NetworkSegment struct {

View File

@ -6,6 +6,7 @@ import (
"time"
"github.com/hashicorp/consul/agent"
"github.com/hashicorp/consul/agent/consul/autopilot"
"github.com/hashicorp/consul/agent/structs"
"github.com/mitchellh/cli"
)
@ -44,7 +45,7 @@ func TestOperatorAutopilotSetConfigCommmand(t *testing.T) {
req := structs.DCSpecificRequest{
Datacenter: "dc1",
}
var reply structs.AutopilotConfig
var reply autopilot.Config
if err := a.RPC("Operator.AutopilotGetConfiguration", &req, &reply); err != nil {
t.Fatalf("err: %v", err)
}