mirror of https://github.com/status-im/consul.git
Move autopilot to a standalone package
This commit is contained in:
parent
6be97eb8bb
commit
de28555671
|
@ -3,420 +3,43 @@ package consul
|
|||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/armon/go-metrics"
|
||||
"github.com/hashicorp/consul/agent/consul/autopilot"
|
||||
"github.com/hashicorp/consul/agent/metadata"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/go-version"
|
||||
"github.com/hashicorp/raft"
|
||||
"github.com/hashicorp/serf/serf"
|
||||
)
|
||||
|
||||
// AutopilotPolicy is the interface for the Autopilot mechanism
|
||||
type AutopilotPolicy interface {
|
||||
// PromoteNonVoters defines the handling of non-voting servers
|
||||
PromoteNonVoters(*structs.AutopilotConfig) error
|
||||
}
|
||||
|
||||
func (s *Server) startAutopilot() {
|
||||
s.autopilotShutdownCh = make(chan struct{})
|
||||
s.autopilotWaitGroup = sync.WaitGroup{}
|
||||
s.autopilotWaitGroup.Add(1)
|
||||
|
||||
go s.autopilotLoop()
|
||||
}
|
||||
|
||||
func (s *Server) stopAutopilot() {
|
||||
close(s.autopilotShutdownCh)
|
||||
s.autopilotWaitGroup.Wait()
|
||||
}
|
||||
|
||||
var minAutopilotVersion = version.Must(version.NewVersion("0.8.0"))
|
||||
|
||||
// autopilotLoop periodically looks for nonvoting servers to promote and dead servers to remove.
|
||||
func (s *Server) autopilotLoop() {
|
||||
defer s.autopilotWaitGroup.Done()
|
||||
|
||||
// Monitor server health until shutdown
|
||||
ticker := time.NewTicker(s.config.AutopilotInterval)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-s.autopilotShutdownCh:
|
||||
return
|
||||
case <-ticker.C:
|
||||
autopilotConfig, ok := s.getOrCreateAutopilotConfig()
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
if err := s.autopilotPolicy.PromoteNonVoters(autopilotConfig); err != nil {
|
||||
s.logger.Printf("[ERR] autopilot: Error checking for non-voters to promote: %s", err)
|
||||
}
|
||||
|
||||
if err := s.pruneDeadServers(autopilotConfig); err != nil {
|
||||
s.logger.Printf("[ERR] autopilot: Error checking for dead servers to remove: %s", err)
|
||||
}
|
||||
case <-s.autopilotRemoveDeadCh:
|
||||
autopilotConfig, ok := s.getOrCreateAutopilotConfig()
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
if err := s.pruneDeadServers(autopilotConfig); err != nil {
|
||||
s.logger.Printf("[ERR] autopilot: Error checking for dead servers to remove: %s", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// fmtServer prints info about a server in a standard way for logging.
|
||||
func fmtServer(server raft.Server) string {
|
||||
return fmt.Sprintf("Server (ID: %q Address: %q)", server.ID, server.Address)
|
||||
}
|
||||
|
||||
// pruneDeadServers removes up to numPeers/2 failed servers
|
||||
func (s *Server) pruneDeadServers(autopilotConfig *structs.AutopilotConfig) error {
|
||||
if !autopilotConfig.CleanupDeadServers {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Failed servers are known to Serf and marked failed, and stale servers
|
||||
// are known to Raft but not Serf.
|
||||
var failed []string
|
||||
staleRaftServers := make(map[string]raft.Server)
|
||||
future := s.raft.GetConfiguration()
|
||||
if err := future.Error(); err != nil {
|
||||
return err
|
||||
}
|
||||
for _, server := range future.Configuration().Servers {
|
||||
staleRaftServers[string(server.Address)] = server
|
||||
}
|
||||
for _, member := range s.serfLAN.Members() {
|
||||
valid, parts := metadata.IsConsulServer(member)
|
||||
if valid {
|
||||
if _, ok := staleRaftServers[parts.Addr.String()]; ok {
|
||||
delete(staleRaftServers, parts.Addr.String())
|
||||
}
|
||||
|
||||
if member.Status == serf.StatusFailed {
|
||||
failed = append(failed, member.Name)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// We can bail early if there's nothing to do.
|
||||
removalCount := len(failed) + len(staleRaftServers)
|
||||
if removalCount == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Only do removals if a minority of servers will be affected.
|
||||
peers, err := s.numPeers()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if removalCount < peers/2 {
|
||||
for _, node := range failed {
|
||||
s.logger.Printf("[INFO] autopilot: Attempting removal of failed server node %q", node)
|
||||
go s.serfLAN.RemoveFailedNode(node)
|
||||
}
|
||||
|
||||
minRaftProtocol, err := ServerMinRaftProtocol(s.serfLAN.Members())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, raftServer := range staleRaftServers {
|
||||
s.logger.Printf("[INFO] autopilot: Attempting removal of stale %s", fmtServer(raftServer))
|
||||
var future raft.Future
|
||||
if minRaftProtocol >= 2 {
|
||||
future = s.raft.RemoveServer(raftServer.ID, 0, 0)
|
||||
} else {
|
||||
future = s.raft.RemovePeer(raftServer.Address)
|
||||
}
|
||||
if err := future.Error(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
} else {
|
||||
s.logger.Printf("[DEBUG] autopilot: Failed to remove dead servers: too many dead servers: %d/%d", removalCount, peers)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// BasicAutopilot defines a policy for promoting non-voting servers in a way
|
||||
// that maintains an odd-numbered voter count.
|
||||
type BasicAutopilot struct {
|
||||
// AutopilotDelegate is a Consul delegate for autopilot operations.
|
||||
type AutopilotDelegate struct {
|
||||
server *Server
|
||||
}
|
||||
|
||||
// PromoteNonVoters promotes eligible non-voting servers to voters.
|
||||
func (b *BasicAutopilot) PromoteNonVoters(autopilotConfig *structs.AutopilotConfig) error {
|
||||
// If we don't meet the minimum version for non-voter features, bail
|
||||
// early.
|
||||
minRaftProtocol, err := ServerMinRaftProtocol(b.server.LANMembers())
|
||||
if err != nil {
|
||||
return fmt.Errorf("error getting server raft protocol versions: %s", err)
|
||||
}
|
||||
if minRaftProtocol < 3 {
|
||||
return nil
|
||||
}
|
||||
func (d *AutopilotDelegate) FetchStats(ctx context.Context, servers []*metadata.Server) map[string]*autopilot.ServerStats {
|
||||
return d.server.statsFetcher.Fetch(ctx, servers)
|
||||
}
|
||||
|
||||
// Find any non-voters eligible for promotion.
|
||||
now := time.Now()
|
||||
var promotions []raft.Server
|
||||
future := b.server.raft.GetConfiguration()
|
||||
func (d *AutopilotDelegate) GetOrCreateAutopilotConfig() (*autopilot.Config, bool) {
|
||||
return d.server.getOrCreateAutopilotConfig()
|
||||
}
|
||||
|
||||
func (d *AutopilotDelegate) Raft() *raft.Raft {
|
||||
return d.server.raft
|
||||
}
|
||||
|
||||
func (d *AutopilotDelegate) Serf() *serf.Serf {
|
||||
return d.server.serfLAN
|
||||
}
|
||||
|
||||
func (d *AutopilotDelegate) NumPeers() (int, error) {
|
||||
return d.server.numPeers()
|
||||
}
|
||||
|
||||
func (d *AutopilotDelegate) PromoteNonVoters(conf *autopilot.Config, health autopilot.OperatorHealthReply) ([]raft.Server, error) {
|
||||
future := d.server.raft.GetConfiguration()
|
||||
if err := future.Error(); err != nil {
|
||||
return fmt.Errorf("failed to get raft configuration: %v", err)
|
||||
}
|
||||
for _, server := range future.Configuration().Servers {
|
||||
if !isVoter(server.Suffrage) {
|
||||
health := b.server.getServerHealth(string(server.ID))
|
||||
if health.IsStable(now, autopilotConfig) {
|
||||
promotions = append(promotions, server)
|
||||
}
|
||||
}
|
||||
return nil, fmt.Errorf("failed to get raft configuration: %v", err)
|
||||
}
|
||||
|
||||
if err := b.server.handlePromotions(promotions); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// handlePromotions is a helper shared with Consul Enterprise that attempts to
|
||||
// apply desired server promotions to the Raft configuration.
|
||||
func (s *Server) handlePromotions(promotions []raft.Server) error {
|
||||
// This used to wait to only promote to maintain an odd quorum of
|
||||
// servers, but this was at odds with the dead server cleanup when doing
|
||||
// rolling updates (add one new server, wait, and then kill an old
|
||||
// server). The dead server cleanup would still count the old server as
|
||||
// a peer, which is conservative and the right thing to do, and this
|
||||
// would wait to promote, so you could get into a stalemate. It is safer
|
||||
// to promote early than remove early, so by promoting as soon as
|
||||
// possible we have chosen that as the solution here.
|
||||
for _, server := range promotions {
|
||||
s.logger.Printf("[INFO] autopilot: Promoting %s to voter", fmtServer(server))
|
||||
addFuture := s.raft.AddVoter(server.ID, server.Address, 0, 0)
|
||||
if err := addFuture.Error(); err != nil {
|
||||
return fmt.Errorf("failed to add raft peer: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// If we promoted a server, trigger a check to remove dead servers.
|
||||
if len(promotions) > 0 {
|
||||
select {
|
||||
case s.autopilotRemoveDeadCh <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// serverHealthLoop monitors the health of the servers in the cluster
|
||||
func (s *Server) serverHealthLoop() {
|
||||
// Monitor server health until shutdown
|
||||
ticker := time.NewTicker(s.config.ServerHealthInterval)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-s.shutdownCh:
|
||||
return
|
||||
case <-ticker.C:
|
||||
if err := s.updateClusterHealth(); err != nil {
|
||||
s.logger.Printf("[ERR] autopilot: Error updating cluster health: %s", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// updateClusterHealth fetches the Raft stats of the other servers and updates
|
||||
// s.clusterHealth based on the configured Autopilot thresholds
|
||||
func (s *Server) updateClusterHealth() error {
|
||||
// Don't do anything if the min Raft version is too low
|
||||
minRaftProtocol, err := ServerMinRaftProtocol(s.LANMembers())
|
||||
if err != nil {
|
||||
return fmt.Errorf("error getting server raft protocol versions: %s", err)
|
||||
}
|
||||
if minRaftProtocol < 3 {
|
||||
return nil
|
||||
}
|
||||
|
||||
state := s.fsm.State()
|
||||
_, autopilotConf, err := state.AutopilotConfig()
|
||||
if err != nil {
|
||||
return fmt.Errorf("error retrieving autopilot config: %s", err)
|
||||
}
|
||||
// Bail early if autopilot config hasn't been initialized yet
|
||||
if autopilotConf == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Get the the serf members which are Consul servers
|
||||
serverMap := make(map[string]*metadata.Server)
|
||||
for _, member := range s.LANMembers() {
|
||||
if member.Status == serf.StatusLeft {
|
||||
continue
|
||||
}
|
||||
|
||||
valid, parts := metadata.IsConsulServer(member)
|
||||
if valid {
|
||||
serverMap[parts.ID] = parts
|
||||
}
|
||||
}
|
||||
|
||||
future := s.raft.GetConfiguration()
|
||||
if err := future.Error(); err != nil {
|
||||
return fmt.Errorf("error getting Raft configuration %s", err)
|
||||
}
|
||||
servers := future.Configuration().Servers
|
||||
|
||||
// Fetch the health for each of the servers in parallel so we get as
|
||||
// consistent of a sample as possible. We capture the leader's index
|
||||
// here as well so it roughly lines up with the same point in time.
|
||||
targetLastIndex := s.raft.LastIndex()
|
||||
var fetchList []*metadata.Server
|
||||
for _, server := range servers {
|
||||
if parts, ok := serverMap[string(server.ID)]; ok {
|
||||
fetchList = append(fetchList, parts)
|
||||
}
|
||||
}
|
||||
d := time.Now().Add(s.config.ServerHealthInterval / 2)
|
||||
ctx, cancel := context.WithDeadline(context.Background(), d)
|
||||
defer cancel()
|
||||
fetchedStats := s.statsFetcher.Fetch(ctx, fetchList)
|
||||
|
||||
// Build a current list of server healths
|
||||
leader := s.raft.Leader()
|
||||
var clusterHealth structs.OperatorHealthReply
|
||||
voterCount := 0
|
||||
healthyCount := 0
|
||||
healthyVoterCount := 0
|
||||
for _, server := range servers {
|
||||
health := structs.ServerHealth{
|
||||
ID: string(server.ID),
|
||||
Address: string(server.Address),
|
||||
Leader: server.Address == leader,
|
||||
LastContact: -1,
|
||||
Voter: server.Suffrage == raft.Voter,
|
||||
}
|
||||
|
||||
parts, ok := serverMap[string(server.ID)]
|
||||
if ok {
|
||||
health.Name = parts.Name
|
||||
health.SerfStatus = parts.Status
|
||||
health.Version = parts.Build.String()
|
||||
if stats, ok := fetchedStats[string(server.ID)]; ok {
|
||||
if err := s.updateServerHealth(&health, parts, stats, autopilotConf, targetLastIndex); err != nil {
|
||||
s.logger.Printf("[WARN] autopilot: Error updating server %s health: %s", fmtServer(server), err)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
health.SerfStatus = serf.StatusNone
|
||||
}
|
||||
|
||||
if health.Voter {
|
||||
voterCount++
|
||||
}
|
||||
if health.Healthy {
|
||||
healthyCount++
|
||||
if health.Voter {
|
||||
healthyVoterCount++
|
||||
}
|
||||
}
|
||||
|
||||
clusterHealth.Servers = append(clusterHealth.Servers, health)
|
||||
}
|
||||
clusterHealth.Healthy = healthyCount == len(servers)
|
||||
|
||||
// If we have extra healthy voters, update FailureTolerance
|
||||
requiredQuorum := voterCount/2 + 1
|
||||
if healthyVoterCount > requiredQuorum {
|
||||
clusterHealth.FailureTolerance = healthyVoterCount - requiredQuorum
|
||||
}
|
||||
|
||||
// Heartbeat a metric for monitoring if we're the leader
|
||||
if s.IsLeader() {
|
||||
metrics.SetGauge([]string{"consul", "autopilot", "failure_tolerance"}, float32(clusterHealth.FailureTolerance))
|
||||
metrics.SetGauge([]string{"autopilot", "failure_tolerance"}, float32(clusterHealth.FailureTolerance))
|
||||
if clusterHealth.Healthy {
|
||||
metrics.SetGauge([]string{"consul", "autopilot", "healthy"}, 1)
|
||||
metrics.SetGauge([]string{"autopilot", "healthy"}, 1)
|
||||
} else {
|
||||
metrics.SetGauge([]string{"consul", "autopilot", "healthy"}, 0)
|
||||
metrics.SetGauge([]string{"autopilot", "healthy"}, 0)
|
||||
}
|
||||
}
|
||||
|
||||
s.clusterHealthLock.Lock()
|
||||
s.clusterHealth = clusterHealth
|
||||
s.clusterHealthLock.Unlock()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// updateServerHealth computes the resulting health of the server based on its
|
||||
// fetched stats and the state of the leader.
|
||||
func (s *Server) updateServerHealth(health *structs.ServerHealth,
|
||||
server *metadata.Server, stats *structs.ServerStats,
|
||||
autopilotConf *structs.AutopilotConfig, targetLastIndex uint64) error {
|
||||
|
||||
health.LastTerm = stats.LastTerm
|
||||
health.LastIndex = stats.LastIndex
|
||||
|
||||
if stats.LastContact != "never" {
|
||||
var err error
|
||||
health.LastContact, err = time.ParseDuration(stats.LastContact)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error parsing last_contact duration: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
lastTerm, err := strconv.ParseUint(s.raft.Stats()["last_log_term"], 10, 64)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error parsing last_log_term: %s", err)
|
||||
}
|
||||
health.Healthy = health.IsHealthy(lastTerm, targetLastIndex, autopilotConf)
|
||||
|
||||
// If this is a new server or the health changed, reset StableSince
|
||||
lastHealth := s.getServerHealth(server.ID)
|
||||
if lastHealth == nil || lastHealth.Healthy != health.Healthy {
|
||||
health.StableSince = time.Now()
|
||||
} else {
|
||||
health.StableSince = lastHealth.StableSince
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Server) getClusterHealth() structs.OperatorHealthReply {
|
||||
s.clusterHealthLock.RLock()
|
||||
defer s.clusterHealthLock.RUnlock()
|
||||
return s.clusterHealth
|
||||
}
|
||||
|
||||
func (s *Server) getServerHealth(id string) *structs.ServerHealth {
|
||||
s.clusterHealthLock.RLock()
|
||||
defer s.clusterHealthLock.RUnlock()
|
||||
for _, health := range s.clusterHealth.Servers {
|
||||
if health.ID == id {
|
||||
return &health
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func isVoter(suffrage raft.ServerSuffrage) bool {
|
||||
switch suffrage {
|
||||
case raft.Voter, raft.Staging:
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
return autopilot.PromoteStableServers(conf, health, future.Configuration().Servers), nil
|
||||
}
|
||||
|
|
|
@ -0,0 +1,457 @@
|
|||
package autopilot
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/armon/go-metrics"
|
||||
"github.com/hashicorp/consul/agent/metadata"
|
||||
"github.com/hashicorp/raft"
|
||||
"github.com/hashicorp/serf/serf"
|
||||
)
|
||||
|
||||
// Delegate is the interface for the Autopilot mechanism
|
||||
type Delegate interface {
|
||||
FetchStats(ctx context.Context, servers []*metadata.Server) map[string]*ServerStats
|
||||
GetOrCreateAutopilotConfig() (*Config, bool)
|
||||
NumPeers() (int, error)
|
||||
PromoteNonVoters(*Config, OperatorHealthReply) ([]raft.Server, error)
|
||||
Raft() *raft.Raft
|
||||
Serf() *serf.Serf
|
||||
}
|
||||
|
||||
// Autopilot is a mechanism for automatically managing the Raft
|
||||
// quorum using server health information along with updates from Serf gossip.
|
||||
// For more information, see https://www.consul.io/docs/guides/autopilot.html
|
||||
type Autopilot struct {
|
||||
logger *log.Logger
|
||||
delegate Delegate
|
||||
validServerFunc func(serf.Member) bool
|
||||
|
||||
interval time.Duration
|
||||
healthInterval time.Duration
|
||||
|
||||
clusterHealth OperatorHealthReply
|
||||
clusterHealthLock sync.RWMutex
|
||||
|
||||
removeDeadCh chan struct{}
|
||||
shutdownCh chan struct{}
|
||||
waitGroup sync.WaitGroup
|
||||
}
|
||||
|
||||
func NewAutopilot(logger *log.Logger, delegate Delegate, serverFunc func(serf.Member) bool, interval, healthInterval time.Duration) *Autopilot {
|
||||
return &Autopilot{
|
||||
logger: logger,
|
||||
delegate: delegate,
|
||||
validServerFunc: serverFunc,
|
||||
interval: interval,
|
||||
healthInterval: healthInterval,
|
||||
}
|
||||
}
|
||||
|
||||
func (a *Autopilot) Start() {
|
||||
a.removeDeadCh = make(chan struct{})
|
||||
a.shutdownCh = make(chan struct{})
|
||||
a.waitGroup = sync.WaitGroup{}
|
||||
a.waitGroup.Add(1)
|
||||
|
||||
go a.run()
|
||||
}
|
||||
|
||||
func (a *Autopilot) Stop() {
|
||||
close(a.shutdownCh)
|
||||
a.waitGroup.Wait()
|
||||
}
|
||||
|
||||
// autopilotLoop periodically looks for nonvoting servers to promote and dead servers to remove.
|
||||
func (a *Autopilot) run() {
|
||||
defer a.waitGroup.Done()
|
||||
|
||||
// Monitor server health until shutdown
|
||||
ticker := time.NewTicker(a.interval)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-a.shutdownCh:
|
||||
return
|
||||
case <-ticker.C:
|
||||
autopilotConfig, ok := a.delegate.GetOrCreateAutopilotConfig()
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
// Skip the non-voter promotions unless all servers support the new APIs
|
||||
minRaftProtocol, err := a.MinRaftProtocol()
|
||||
if err != nil {
|
||||
a.logger.Printf("[ERR] autopilot: error getting server raft protocol versions: %s", err)
|
||||
continue
|
||||
}
|
||||
if minRaftProtocol >= 3 {
|
||||
promotions, err := a.delegate.PromoteNonVoters(autopilotConfig, a.GetClusterHealth())
|
||||
if err != nil {
|
||||
a.logger.Printf("[ERR] autopilot: Error checking for non-voters to promote: %s", err)
|
||||
}
|
||||
if err := a.handlePromotions(promotions); err != nil {
|
||||
a.logger.Printf("[ERR] autopilot: Error handling promotions: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
if err := a.pruneDeadServers(autopilotConfig); err != nil {
|
||||
a.logger.Printf("[ERR] autopilot: Error checking for dead servers to remove: %s", err)
|
||||
}
|
||||
case <-a.removeDeadCh:
|
||||
autopilotConfig, ok := a.delegate.GetOrCreateAutopilotConfig()
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
if err := a.pruneDeadServers(autopilotConfig); err != nil {
|
||||
a.logger.Printf("[ERR] autopilot: Error checking for dead servers to remove: %s", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// fmtServer prints info about a server in a standard way for logging.
|
||||
func fmtServer(server raft.Server) string {
|
||||
return fmt.Sprintf("Server (ID: %q Address: %q)", server.ID, server.Address)
|
||||
}
|
||||
|
||||
// pruneDeadServers removes up to numPeers/2 failed servers
|
||||
func (a *Autopilot) pruneDeadServers(conf *Config) error {
|
||||
if !conf.CleanupDeadServers {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Failed servers are known to Serf and marked failed, and stale servers
|
||||
// are known to Raft but not Serf.
|
||||
var failed []string
|
||||
staleRaftServers := make(map[string]raft.Server)
|
||||
raftNode := a.delegate.Raft()
|
||||
future := raftNode.GetConfiguration()
|
||||
if err := future.Error(); err != nil {
|
||||
return err
|
||||
}
|
||||
for _, server := range future.Configuration().Servers {
|
||||
staleRaftServers[string(server.Address)] = server
|
||||
}
|
||||
serfLAN := a.delegate.Serf()
|
||||
for _, member := range serfLAN.Members() {
|
||||
valid, parts := metadata.IsConsulServer(member)
|
||||
if valid {
|
||||
if _, ok := staleRaftServers[parts.Addr.String()]; ok {
|
||||
delete(staleRaftServers, parts.Addr.String())
|
||||
}
|
||||
|
||||
if member.Status == serf.StatusFailed {
|
||||
failed = append(failed, member.Name)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// We can bail early if there's nothing to do.
|
||||
removalCount := len(failed) + len(staleRaftServers)
|
||||
if removalCount == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Only do removals if a minority of servers will be affected.
|
||||
peers, err := a.delegate.NumPeers()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if removalCount < peers/2 {
|
||||
for _, node := range failed {
|
||||
a.logger.Printf("[INFO] autopilot: Attempting removal of failed server node %q", node)
|
||||
go serfLAN.RemoveFailedNode(node)
|
||||
}
|
||||
|
||||
minRaftProtocol, err := a.MinRaftProtocol()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, raftServer := range staleRaftServers {
|
||||
a.logger.Printf("[INFO] autopilot: Attempting removal of stale %s", fmtServer(raftServer))
|
||||
var future raft.Future
|
||||
if minRaftProtocol >= 2 {
|
||||
future = raftNode.RemoveServer(raftServer.ID, 0, 0)
|
||||
} else {
|
||||
future = raftNode.RemovePeer(raftServer.Address)
|
||||
}
|
||||
if err := future.Error(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
} else {
|
||||
a.logger.Printf("[DEBUG] autopilot: Failed to remove dead servers: too many dead servers: %d/%d", removalCount, peers)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// MinRaftProtocol returns the lowest supported Raft protocol among alive servers
|
||||
func (a *Autopilot) MinRaftProtocol() (int, error) {
|
||||
minVersion := -1
|
||||
members := a.delegate.Serf().Members()
|
||||
for _, m := range members {
|
||||
if m.Status != serf.StatusAlive {
|
||||
continue
|
||||
}
|
||||
|
||||
if !a.validServerFunc(m) {
|
||||
continue
|
||||
}
|
||||
|
||||
vsn, ok := m.Tags["raft_vsn"]
|
||||
if !ok {
|
||||
vsn = "1"
|
||||
}
|
||||
raftVsn, err := strconv.Atoi(vsn)
|
||||
if err != nil {
|
||||
return -1, err
|
||||
}
|
||||
|
||||
if minVersion == -1 || raftVsn < minVersion {
|
||||
minVersion = raftVsn
|
||||
}
|
||||
}
|
||||
|
||||
if minVersion == -1 {
|
||||
return minVersion, fmt.Errorf("No servers found")
|
||||
}
|
||||
|
||||
return minVersion, nil
|
||||
}
|
||||
|
||||
// handlePromotions is a helper shared with Consul Enterprise that attempts to
|
||||
// apply desired server promotions to the Raft configuration.
|
||||
func (a *Autopilot) handlePromotions(promotions []raft.Server) error {
|
||||
// This used to wait to only promote to maintain an odd quorum of
|
||||
// servers, but this was at odds with the dead server cleanup when doing
|
||||
// rolling updates (add one new server, wait, and then kill an old
|
||||
// server). The dead server cleanup would still count the old server as
|
||||
// a peer, which is conservative and the right thing to do, and this
|
||||
// would wait to promote, so you could get into a stalemate. It is safer
|
||||
// to promote early than remove early, so by promoting as soon as
|
||||
// possible we have chosen that as the solution here.
|
||||
for _, server := range promotions {
|
||||
a.logger.Printf("[INFO] autopilot: Promoting %s to voter", fmtServer(server))
|
||||
addFuture := a.delegate.Raft().AddVoter(server.ID, server.Address, 0, 0)
|
||||
if err := addFuture.Error(); err != nil {
|
||||
return fmt.Errorf("failed to add raft peer: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// If we promoted a server, trigger a check to remove dead servers.
|
||||
if len(promotions) > 0 {
|
||||
select {
|
||||
case a.removeDeadCh <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// ServerHealthLoop monitors the health of the servers in the cluster
|
||||
func (a *Autopilot) ServerHealthLoop(shutdownCh <-chan struct{}) {
|
||||
// Monitor server health until shutdown
|
||||
ticker := time.NewTicker(a.healthInterval)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-shutdownCh:
|
||||
return
|
||||
case <-ticker.C:
|
||||
if err := a.updateClusterHealth(); err != nil {
|
||||
a.logger.Printf("[ERR] autopilot: Error updating cluster health: %s", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// updateClusterHealth fetches the Raft stats of the other servers and updates
|
||||
// s.clusterHealth based on the configured Autopilot thresholds
|
||||
func (a *Autopilot) updateClusterHealth() error {
|
||||
// Don't do anything if the min Raft version is too low
|
||||
minRaftProtocol, err := a.MinRaftProtocol()
|
||||
if err != nil {
|
||||
return fmt.Errorf("error getting server raft protocol versions: %s", err)
|
||||
}
|
||||
if minRaftProtocol < 3 {
|
||||
return nil
|
||||
}
|
||||
|
||||
autopilotConf, ok := a.delegate.GetOrCreateAutopilotConfig()
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
// Bail early if autopilot config hasn't been initialized yet
|
||||
if autopilotConf == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Get the the serf members which are Consul servers
|
||||
serverMap := make(map[string]*metadata.Server)
|
||||
for _, member := range a.delegate.Serf().Members() {
|
||||
if member.Status == serf.StatusLeft {
|
||||
continue
|
||||
}
|
||||
|
||||
valid, parts := metadata.IsConsulServer(member)
|
||||
if valid {
|
||||
serverMap[parts.ID] = parts
|
||||
}
|
||||
}
|
||||
|
||||
raftNode := a.delegate.Raft()
|
||||
future := raftNode.GetConfiguration()
|
||||
if err := future.Error(); err != nil {
|
||||
return fmt.Errorf("error getting Raft configuration %s", err)
|
||||
}
|
||||
servers := future.Configuration().Servers
|
||||
|
||||
// Fetch the health for each of the servers in parallel so we get as
|
||||
// consistent of a sample as possible. We capture the leader's index
|
||||
// here as well so it roughly lines up with the same point in time.
|
||||
targetLastIndex := raftNode.LastIndex()
|
||||
var fetchList []*metadata.Server
|
||||
for _, server := range servers {
|
||||
if parts, ok := serverMap[string(server.ID)]; ok {
|
||||
fetchList = append(fetchList, parts)
|
||||
}
|
||||
}
|
||||
d := time.Now().Add(a.healthInterval / 2)
|
||||
ctx, cancel := context.WithDeadline(context.Background(), d)
|
||||
defer cancel()
|
||||
fetchedStats := a.delegate.FetchStats(ctx, fetchList)
|
||||
|
||||
// Build a current list of server healths
|
||||
leader := raftNode.Leader()
|
||||
var clusterHealth OperatorHealthReply
|
||||
voterCount := 0
|
||||
healthyCount := 0
|
||||
healthyVoterCount := 0
|
||||
for _, server := range servers {
|
||||
health := ServerHealth{
|
||||
ID: string(server.ID),
|
||||
Address: string(server.Address),
|
||||
Leader: server.Address == leader,
|
||||
LastContact: -1,
|
||||
Voter: server.Suffrage == raft.Voter,
|
||||
}
|
||||
|
||||
parts, ok := serverMap[string(server.ID)]
|
||||
if ok {
|
||||
health.Name = parts.Name
|
||||
health.SerfStatus = parts.Status
|
||||
health.Version = parts.Build.String()
|
||||
if stats, ok := fetchedStats[string(server.ID)]; ok {
|
||||
if err := a.updateServerHealth(&health, parts, stats, autopilotConf, targetLastIndex); err != nil {
|
||||
a.logger.Printf("[WARN] autopilot: Error updating server %s health: %s", fmtServer(server), err)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
health.SerfStatus = serf.StatusNone
|
||||
}
|
||||
|
||||
if health.Voter {
|
||||
voterCount++
|
||||
}
|
||||
if health.Healthy {
|
||||
healthyCount++
|
||||
if health.Voter {
|
||||
healthyVoterCount++
|
||||
}
|
||||
}
|
||||
|
||||
clusterHealth.Servers = append(clusterHealth.Servers, health)
|
||||
}
|
||||
clusterHealth.Healthy = healthyCount == len(servers)
|
||||
|
||||
// If we have extra healthy voters, update FailureTolerance
|
||||
requiredQuorum := voterCount/2 + 1
|
||||
if healthyVoterCount > requiredQuorum {
|
||||
clusterHealth.FailureTolerance = healthyVoterCount - requiredQuorum
|
||||
}
|
||||
|
||||
// Heartbeat a metric for monitoring if we're the leader
|
||||
if raftNode.State() == raft.Leader {
|
||||
metrics.SetGauge([]string{"consul", "autopilot", "failure_tolerance"}, float32(clusterHealth.FailureTolerance))
|
||||
metrics.SetGauge([]string{"autopilot", "failure_tolerance"}, float32(clusterHealth.FailureTolerance))
|
||||
if clusterHealth.Healthy {
|
||||
metrics.SetGauge([]string{"consul", "autopilot", "healthy"}, 1)
|
||||
metrics.SetGauge([]string{"autopilot", "healthy"}, 1)
|
||||
} else {
|
||||
metrics.SetGauge([]string{"consul", "autopilot", "healthy"}, 0)
|
||||
metrics.SetGauge([]string{"autopilot", "healthy"}, 0)
|
||||
}
|
||||
}
|
||||
|
||||
a.clusterHealthLock.Lock()
|
||||
a.clusterHealth = clusterHealth
|
||||
a.clusterHealthLock.Unlock()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// updateServerHealth computes the resulting health of the server based on its
|
||||
// fetched stats and the state of the leader.
|
||||
func (a *Autopilot) updateServerHealth(health *ServerHealth,
|
||||
server *metadata.Server, stats *ServerStats,
|
||||
autopilotConf *Config, targetLastIndex uint64) error {
|
||||
|
||||
health.LastTerm = stats.LastTerm
|
||||
health.LastIndex = stats.LastIndex
|
||||
|
||||
if stats.LastContact != "never" {
|
||||
var err error
|
||||
health.LastContact, err = time.ParseDuration(stats.LastContact)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error parsing last_contact duration: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
raftNode := a.delegate.Raft()
|
||||
lastTerm, err := strconv.ParseUint(raftNode.Stats()["last_log_term"], 10, 64)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error parsing last_log_term: %s", err)
|
||||
}
|
||||
health.Healthy = health.IsHealthy(lastTerm, targetLastIndex, autopilotConf)
|
||||
|
||||
// If this is a new server or the health changed, reset StableSince
|
||||
lastHealth := a.GetServerHealth(server.ID)
|
||||
if lastHealth == nil || lastHealth.Healthy != health.Healthy {
|
||||
health.StableSince = time.Now()
|
||||
} else {
|
||||
health.StableSince = lastHealth.StableSince
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *Autopilot) GetClusterHealth() OperatorHealthReply {
|
||||
a.clusterHealthLock.RLock()
|
||||
defer a.clusterHealthLock.RUnlock()
|
||||
return a.clusterHealth
|
||||
}
|
||||
|
||||
func (a *Autopilot) GetServerHealth(id string) *ServerHealth {
|
||||
a.clusterHealthLock.RLock()
|
||||
defer a.clusterHealthLock.RUnlock()
|
||||
return a.clusterHealth.ServerHealth(id)
|
||||
}
|
||||
|
||||
func isVoter(suffrage raft.ServerSuffrage) bool {
|
||||
switch suffrage {
|
||||
case raft.Voter, raft.Staging:
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
|
@ -0,0 +1,26 @@
|
|||
package autopilot
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/raft"
|
||||
)
|
||||
|
||||
// PromoteStableServers is a basic autopilot promotion policy that promotes any
|
||||
// server which has been healthy and stable for the duration specified in the
|
||||
// given Autopilot config.
|
||||
func PromoteStableServers(autopilotConfig *Config, health OperatorHealthReply, servers []raft.Server) []raft.Server {
|
||||
// Find any non-voters eligible for promotion.
|
||||
now := time.Now()
|
||||
var promotions []raft.Server
|
||||
for _, server := range servers {
|
||||
if !isVoter(server.Suffrage) {
|
||||
health := health.ServerHealth(string(server.ID))
|
||||
if health.IsStable(now, autopilotConfig) {
|
||||
promotions = append(promotions, server)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return promotions
|
||||
}
|
|
@ -0,0 +1,102 @@
|
|||
package autopilot
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/raft"
|
||||
"github.com/pascaldekloe/goe/verify"
|
||||
)
|
||||
|
||||
func TestPromotion(t *testing.T) {
|
||||
config := &Config{
|
||||
LastContactThreshold: 5 * time.Second,
|
||||
MaxTrailingLogs: 100,
|
||||
ServerStabilizationTime: 3 * time.Second,
|
||||
}
|
||||
|
||||
cases := []struct {
|
||||
name string
|
||||
conf *Config
|
||||
health OperatorHealthReply
|
||||
servers []raft.Server
|
||||
promotions []raft.Server
|
||||
}{
|
||||
{
|
||||
name: "one stable voter, no promotions",
|
||||
conf: config,
|
||||
health: OperatorHealthReply{
|
||||
Servers: []ServerHealth{
|
||||
{
|
||||
ID: "a",
|
||||
Healthy: true,
|
||||
StableSince: time.Now().Add(-10 * time.Second),
|
||||
},
|
||||
},
|
||||
},
|
||||
servers: []raft.Server{
|
||||
{ID: "a", Suffrage: raft.Voter},
|
||||
},
|
||||
promotions: []raft.Server{},
|
||||
},
|
||||
{
|
||||
name: "one stable nonvoter, should be promoted",
|
||||
conf: config,
|
||||
health: OperatorHealthReply{
|
||||
Servers: []ServerHealth{
|
||||
{
|
||||
ID: "a",
|
||||
Healthy: true,
|
||||
StableSince: time.Now().Add(-10 * time.Second),
|
||||
},
|
||||
{
|
||||
ID: "b",
|
||||
Healthy: true,
|
||||
StableSince: time.Now().Add(-10 * time.Second),
|
||||
},
|
||||
},
|
||||
},
|
||||
servers: []raft.Server{
|
||||
{ID: "a", Suffrage: raft.Voter},
|
||||
{ID: "b", Suffrage: raft.Nonvoter},
|
||||
},
|
||||
promotions: []raft.Server{
|
||||
{ID: "b", Suffrage: raft.Nonvoter},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "unstable servers, neither should be promoted",
|
||||
conf: config,
|
||||
health: OperatorHealthReply{
|
||||
Servers: []ServerHealth{
|
||||
{
|
||||
ID: "a",
|
||||
Healthy: true,
|
||||
StableSince: time.Now().Add(-10 * time.Second),
|
||||
},
|
||||
{
|
||||
ID: "b",
|
||||
Healthy: false,
|
||||
StableSince: time.Now().Add(-10 * time.Second),
|
||||
},
|
||||
{
|
||||
ID: "c",
|
||||
Healthy: true,
|
||||
StableSince: time.Now().Add(-1 * time.Second),
|
||||
},
|
||||
},
|
||||
},
|
||||
servers: []raft.Server{
|
||||
{ID: "a", Suffrage: raft.Voter},
|
||||
{ID: "b", Suffrage: raft.Nonvoter},
|
||||
{ID: "c", Suffrage: raft.Nonvoter},
|
||||
},
|
||||
promotions: []raft.Server{},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range cases {
|
||||
promotions := PromoteStableServers(tc.conf, tc.health, tc.servers)
|
||||
verify.Values(t, tc.name, tc.promotions, promotions)
|
||||
}
|
||||
}
|
|
@ -0,0 +1,158 @@
|
|||
package autopilot
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/serf/serf"
|
||||
)
|
||||
|
||||
// Config holds the Autopilot configuration for a cluster.
|
||||
type Config struct {
|
||||
// CleanupDeadServers controls whether to remove dead servers when a new
|
||||
// server is added to the Raft peers.
|
||||
CleanupDeadServers bool
|
||||
|
||||
// LastContactThreshold is the limit on the amount of time a server can go
|
||||
// without leader contact before being considered unhealthy.
|
||||
LastContactThreshold time.Duration
|
||||
|
||||
// MaxTrailingLogs is the amount of entries in the Raft Log that a server can
|
||||
// be behind before being considered unhealthy.
|
||||
MaxTrailingLogs uint64
|
||||
|
||||
// ServerStabilizationTime is the minimum amount of time a server must be
|
||||
// in a stable, healthy state before it can be added to the cluster. Only
|
||||
// applicable with Raft protocol version 3 or higher.
|
||||
ServerStabilizationTime time.Duration
|
||||
|
||||
// (Enterprise-only) RedundancyZoneTag is the node tag to use for separating
|
||||
// servers into zones for redundancy. If left blank, this feature will be disabled.
|
||||
RedundancyZoneTag string
|
||||
|
||||
// (Enterprise-only) DisableUpgradeMigration will disable Autopilot's upgrade migration
|
||||
// strategy of waiting until enough newer-versioned servers have been added to the
|
||||
// cluster before promoting them to voters.
|
||||
DisableUpgradeMigration bool
|
||||
|
||||
// (Enterprise-only) UpgradeVersionTag is the node tag to use for version info when
|
||||
// performing upgrade migrations. If left blank, the Consul version will be used.
|
||||
UpgradeVersionTag string
|
||||
|
||||
// CreateIndex/ModifyIndex store the create/modify indexes of this configuration.
|
||||
CreateIndex uint64
|
||||
ModifyIndex uint64
|
||||
}
|
||||
|
||||
// ServerHealth is the health (from the leader's point of view) of a server.
|
||||
type ServerHealth struct {
|
||||
// ID is the raft ID of the server.
|
||||
ID string
|
||||
|
||||
// Name is the node name of the server.
|
||||
Name string
|
||||
|
||||
// Address is the address of the server.
|
||||
Address string
|
||||
|
||||
// The status of the SerfHealth check for the server.
|
||||
SerfStatus serf.MemberStatus
|
||||
|
||||
// Version is the version of the server.
|
||||
Version string
|
||||
|
||||
// Leader is whether this server is currently the leader.
|
||||
Leader bool
|
||||
|
||||
// LastContact is the time since this node's last contact with the leader.
|
||||
LastContact time.Duration
|
||||
|
||||
// LastTerm is the highest leader term this server has a record of in its Raft log.
|
||||
LastTerm uint64
|
||||
|
||||
// LastIndex is the last log index this server has a record of in its Raft log.
|
||||
LastIndex uint64
|
||||
|
||||
// Healthy is whether or not the server is healthy according to the current
|
||||
// Autopilot config.
|
||||
Healthy bool
|
||||
|
||||
// Voter is whether this is a voting server.
|
||||
Voter bool
|
||||
|
||||
// StableSince is the last time this server's Healthy value changed.
|
||||
StableSince time.Time
|
||||
}
|
||||
|
||||
// IsHealthy determines whether this ServerHealth is considered healthy
|
||||
// based on the given Autopilot config
|
||||
func (h *ServerHealth) IsHealthy(lastTerm uint64, leaderLastIndex uint64, autopilotConf *Config) bool {
|
||||
if h.SerfStatus != serf.StatusAlive {
|
||||
return false
|
||||
}
|
||||
|
||||
if h.LastContact > autopilotConf.LastContactThreshold || h.LastContact < 0 {
|
||||
return false
|
||||
}
|
||||
|
||||
if h.LastTerm != lastTerm {
|
||||
return false
|
||||
}
|
||||
|
||||
if leaderLastIndex > autopilotConf.MaxTrailingLogs && h.LastIndex < leaderLastIndex-autopilotConf.MaxTrailingLogs {
|
||||
return false
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
// IsStable returns true if the ServerHealth shows a stable, passing state
|
||||
// according to the given AutopilotConfig
|
||||
func (h *ServerHealth) IsStable(now time.Time, conf *Config) bool {
|
||||
if h == nil {
|
||||
return false
|
||||
}
|
||||
|
||||
if !h.Healthy {
|
||||
return false
|
||||
}
|
||||
|
||||
if now.Sub(h.StableSince) < conf.ServerStabilizationTime {
|
||||
return false
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
// ServerStats holds miscellaneous Raft metrics for a server
|
||||
type ServerStats struct {
|
||||
// LastContact is the time since this node's last contact with the leader.
|
||||
LastContact string
|
||||
|
||||
// LastTerm is the highest leader term this server has a record of in its Raft log.
|
||||
LastTerm uint64
|
||||
|
||||
// LastIndex is the last log index this server has a record of in its Raft log.
|
||||
LastIndex uint64
|
||||
}
|
||||
|
||||
// OperatorHealthReply is a representation of the overall health of the cluster
|
||||
type OperatorHealthReply struct {
|
||||
// Healthy is true if all the servers in the cluster are healthy.
|
||||
Healthy bool
|
||||
|
||||
// FailureTolerance is the number of healthy servers that could be lost without
|
||||
// an outage occurring.
|
||||
FailureTolerance int
|
||||
|
||||
// Servers holds the health of each server.
|
||||
Servers []ServerHealth
|
||||
}
|
||||
|
||||
func (o *OperatorHealthReply) ServerHealth(id string) *ServerHealth {
|
||||
for _, health := range o.Servers {
|
||||
if health.ID == id {
|
||||
return &health
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
|
@ -1,4 +1,4 @@
|
|||
package structs
|
||||
package autopilot
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
@ -12,7 +12,7 @@ func TestServerHealth_IsHealthy(t *testing.T) {
|
|||
health ServerHealth
|
||||
lastTerm uint64
|
||||
lastIndex uint64
|
||||
conf AutopilotConfig
|
||||
conf Config
|
||||
expected bool
|
||||
}{
|
||||
// Healthy server, all values within allowed limits
|
||||
|
@ -20,7 +20,7 @@ func TestServerHealth_IsHealthy(t *testing.T) {
|
|||
health: ServerHealth{SerfStatus: serf.StatusAlive, LastTerm: 1, LastIndex: 0},
|
||||
lastTerm: 1,
|
||||
lastIndex: 10,
|
||||
conf: AutopilotConfig{MaxTrailingLogs: 20},
|
||||
conf: Config{MaxTrailingLogs: 20},
|
||||
expected: true,
|
||||
},
|
||||
// Serf status failed
|
||||
|
@ -38,7 +38,7 @@ func TestServerHealth_IsHealthy(t *testing.T) {
|
|||
{
|
||||
health: ServerHealth{SerfStatus: serf.StatusAlive, LastIndex: 0},
|
||||
lastIndex: 10,
|
||||
conf: AutopilotConfig{MaxTrailingLogs: 5},
|
||||
conf: Config{MaxTrailingLogs: 5},
|
||||
expected: false,
|
||||
},
|
||||
}
|
||||
|
@ -56,14 +56,14 @@ func TestServerHealth_IsStable(t *testing.T) {
|
|||
cases := []struct {
|
||||
health *ServerHealth
|
||||
now time.Time
|
||||
conf AutopilotConfig
|
||||
conf Config
|
||||
expected bool
|
||||
}{
|
||||
// Healthy server, all values within allowed limits
|
||||
{
|
||||
health: &ServerHealth{Healthy: true, StableSince: start},
|
||||
now: start.Add(15 * time.Second),
|
||||
conf: AutopilotConfig{ServerStabilizationTime: 10 * time.Second},
|
||||
conf: Config{ServerStabilizationTime: 10 * time.Second},
|
||||
expected: true,
|
||||
},
|
||||
// Unhealthy server
|
||||
|
@ -75,7 +75,7 @@ func TestServerHealth_IsStable(t *testing.T) {
|
|||
{
|
||||
health: &ServerHealth{Healthy: true, StableSince: start},
|
||||
now: start.Add(5 * time.Second),
|
||||
conf: AutopilotConfig{ServerStabilizationTime: 10 * time.Second},
|
||||
conf: Config{ServerStabilizationTime: 10 * time.Second},
|
||||
expected: false,
|
||||
},
|
||||
// Nil struct
|
|
@ -301,7 +301,7 @@ func TestAutopilot_PromoteNonVoter(t *testing.T) {
|
|||
if servers[1].Suffrage != raft.Nonvoter {
|
||||
r.Fatalf("bad: %v", servers)
|
||||
}
|
||||
health := s1.getServerHealth(string(servers[1].ID))
|
||||
health := s1.autopilot.GetServerHealth(string(servers[1].ID))
|
||||
if health == nil {
|
||||
r.Fatal("nil health")
|
||||
}
|
||||
|
|
|
@ -7,7 +7,7 @@ import (
|
|||
"os"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/agent/consul/autopilot"
|
||||
"github.com/hashicorp/consul/lib"
|
||||
"github.com/hashicorp/consul/tlsutil"
|
||||
"github.com/hashicorp/consul/types"
|
||||
|
@ -336,7 +336,7 @@ type Config struct {
|
|||
|
||||
// AutopilotConfig is used to apply the initial autopilot config when
|
||||
// bootstrapping.
|
||||
AutopilotConfig *structs.AutopilotConfig
|
||||
AutopilotConfig *autopilot.Config
|
||||
|
||||
// ServerHealthInterval is the frequency with which the health of the
|
||||
// servers in the cluster will be updated.
|
||||
|
@ -416,7 +416,7 @@ func DefaultConfig() *Config {
|
|||
|
||||
TLSMinVersion: "tls10",
|
||||
|
||||
AutopilotConfig: &structs.AutopilotConfig{
|
||||
AutopilotConfig: &autopilot.Config{
|
||||
CleanupDeadServers: true,
|
||||
LastContactThreshold: 200 * time.Millisecond,
|
||||
MaxTrailingLogs: 250,
|
||||
|
|
|
@ -8,6 +8,7 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/agent/consul/autopilot"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/api"
|
||||
"github.com/hashicorp/consul/types"
|
||||
|
@ -1096,7 +1097,7 @@ func TestFSM_Autopilot(t *testing.T) {
|
|||
// Set the autopilot config using a request.
|
||||
req := structs.AutopilotSetConfigRequest{
|
||||
Datacenter: "dc1",
|
||||
Config: structs.AutopilotConfig{
|
||||
Config: autopilot.Config{
|
||||
CleanupDeadServers: true,
|
||||
LastContactThreshold: 10 * time.Second,
|
||||
MaxTrailingLogs: 300,
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package fsm
|
||||
|
||||
import (
|
||||
"github.com/hashicorp/consul/agent/consul/autopilot"
|
||||
"github.com/hashicorp/consul/agent/consul/state"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/go-msgpack/codec"
|
||||
|
@ -354,7 +355,7 @@ func restorePreparedQuery(header *snapshotHeader, restore *state.Restore, decode
|
|||
}
|
||||
|
||||
func restoreAutopilot(header *snapshotHeader, restore *state.Restore, decoder *codec.Decoder) error {
|
||||
var req structs.AutopilotConfig
|
||||
var req autopilot.Config
|
||||
if err := decoder.Decode(&req); err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -7,6 +7,7 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/agent/consul/autopilot"
|
||||
"github.com/hashicorp/consul/agent/consul/state"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/api"
|
||||
|
@ -88,7 +89,7 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) {
|
|||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
||||
autopilotConf := &structs.AutopilotConfig{
|
||||
autopilotConf := &autopilot.Config{
|
||||
CleanupDeadServers: true,
|
||||
LastContactThreshold: 100 * time.Millisecond,
|
||||
MaxTrailingLogs: 222,
|
||||
|
|
|
@ -9,6 +9,7 @@ import (
|
|||
|
||||
"github.com/armon/go-metrics"
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/agent/consul/autopilot"
|
||||
"github.com/hashicorp/consul/agent/metadata"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/api"
|
||||
|
@ -23,6 +24,8 @@ const (
|
|||
barrierWriteTimeout = 2 * time.Minute
|
||||
)
|
||||
|
||||
var minAutopilotVersion = version.Must(version.NewVersion("0.8.0"))
|
||||
|
||||
// monitorLeadership is used to monitor if we acquire or lose our role
|
||||
// as the leader in the Raft cluster. There is some work the leader is
|
||||
// expected to do, so we must react to changes
|
||||
|
@ -202,7 +205,7 @@ func (s *Server) establishLeadership() error {
|
|||
}
|
||||
|
||||
s.getOrCreateAutopilotConfig()
|
||||
s.startAutopilot()
|
||||
s.autopilot.Start()
|
||||
s.setConsistentReadReady()
|
||||
return nil
|
||||
}
|
||||
|
@ -220,7 +223,7 @@ func (s *Server) revokeLeadership() error {
|
|||
}
|
||||
|
||||
s.resetConsistentReadReady()
|
||||
s.stopAutopilot()
|
||||
s.autopilot.Stop()
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -326,7 +329,7 @@ func (s *Server) initializeACL() error {
|
|||
}
|
||||
|
||||
// getOrCreateAutopilotConfig is used to get the autopilot config, initializing it if necessary
|
||||
func (s *Server) getOrCreateAutopilotConfig() (*structs.AutopilotConfig, bool) {
|
||||
func (s *Server) getOrCreateAutopilotConfig() (*autopilot.Config, bool) {
|
||||
state := s.fsm.State()
|
||||
_, config, err := state.AutopilotConfig()
|
||||
if err != nil {
|
||||
|
@ -681,7 +684,7 @@ func (s *Server) joinConsulServer(m serf.Member, parts *metadata.Server) error {
|
|||
// log entries. If the address is the same but the ID changed, remove the
|
||||
// old server before adding the new one.
|
||||
addr := (&net.TCPAddr{IP: m.Addr, Port: parts.Port}).String()
|
||||
minRaftProtocol, err := ServerMinRaftProtocol(s.serfLAN.Members())
|
||||
minRaftProtocol, err := s.autopilot.MinRaftProtocol()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -756,7 +759,7 @@ func (s *Server) removeConsulServer(m serf.Member, port int) error {
|
|||
return err
|
||||
}
|
||||
|
||||
minRaftProtocol, err := ServerMinRaftProtocol(s.serfLAN.Members())
|
||||
minRaftProtocol, err := s.autopilot.MinRaftProtocol()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -812,7 +812,7 @@ func TestLeader_RollRaftServer(t *testing.T) {
|
|||
|
||||
for _, s := range []*Server{s1, s3} {
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
minVer, err := ServerMinRaftProtocol(s.LANMembers())
|
||||
minVer, err := s.autopilot.MinRaftProtocol()
|
||||
if err != nil {
|
||||
r.Fatal(err)
|
||||
}
|
||||
|
|
|
@ -4,11 +4,12 @@ import (
|
|||
"fmt"
|
||||
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/agent/consul/autopilot"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
)
|
||||
|
||||
// AutopilotGetConfiguration is used to retrieve the current Autopilot configuration.
|
||||
func (op *Operator) AutopilotGetConfiguration(args *structs.DCSpecificRequest, reply *structs.AutopilotConfig) error {
|
||||
func (op *Operator) AutopilotGetConfiguration(args *structs.DCSpecificRequest, reply *autopilot.Config) error {
|
||||
if done, err := op.srv.forward("Operator.AutopilotGetConfiguration", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
@ -69,7 +70,7 @@ func (op *Operator) AutopilotSetConfiguration(args *structs.AutopilotSetConfigRe
|
|||
}
|
||||
|
||||
// ServerHealth is used to get the current health of the servers.
|
||||
func (op *Operator) ServerHealth(args *structs.DCSpecificRequest, reply *structs.OperatorHealthReply) error {
|
||||
func (op *Operator) ServerHealth(args *structs.DCSpecificRequest, reply *autopilot.OperatorHealthReply) error {
|
||||
// This must be sent to the leader, so we fix the args since we are
|
||||
// re-using a structure where we don't support all the options.
|
||||
args.RequireConsistent = true
|
||||
|
@ -88,7 +89,7 @@ func (op *Operator) ServerHealth(args *structs.DCSpecificRequest, reply *structs
|
|||
}
|
||||
|
||||
// Exit early if the min Raft version is too low
|
||||
minRaftProtocol, err := ServerMinRaftProtocol(op.srv.LANMembers())
|
||||
minRaftProtocol, err := op.srv.autopilot.MinRaftProtocol()
|
||||
if err != nil {
|
||||
return fmt.Errorf("error getting server raft protocol versions: %s", err)
|
||||
}
|
||||
|
@ -96,7 +97,7 @@ func (op *Operator) ServerHealth(args *structs.DCSpecificRequest, reply *structs
|
|||
return fmt.Errorf("all servers must have raft_protocol set to 3 or higher to use this endpoint")
|
||||
}
|
||||
|
||||
*reply = op.srv.getClusterHealth()
|
||||
*reply = op.srv.autopilot.GetClusterHealth()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -7,6 +7,7 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/agent/consul/autopilot"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/testrpc"
|
||||
"github.com/hashicorp/consul/testutil/retry"
|
||||
|
@ -29,7 +30,7 @@ func TestOperator_Autopilot_GetConfiguration(t *testing.T) {
|
|||
arg := structs.DCSpecificRequest{
|
||||
Datacenter: "dc1",
|
||||
}
|
||||
var reply structs.AutopilotConfig
|
||||
var reply autopilot.Config
|
||||
err := msgpackrpc.CallWithCodec(codec, "Operator.AutopilotGetConfiguration", &arg, &reply)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
|
@ -58,7 +59,7 @@ func TestOperator_Autopilot_GetConfiguration_ACLDeny(t *testing.T) {
|
|||
arg := structs.DCSpecificRequest{
|
||||
Datacenter: "dc1",
|
||||
}
|
||||
var reply structs.AutopilotConfig
|
||||
var reply autopilot.Config
|
||||
err := msgpackrpc.CallWithCodec(codec, "Operator.AutopilotGetConfiguration", &arg, &reply)
|
||||
if !acl.IsErrPermissionDenied(err) {
|
||||
t.Fatalf("err: %v", err)
|
||||
|
@ -112,7 +113,7 @@ func TestOperator_Autopilot_SetConfiguration(t *testing.T) {
|
|||
// Change the autopilot config from the default
|
||||
arg := structs.AutopilotSetConfigRequest{
|
||||
Datacenter: "dc1",
|
||||
Config: structs.AutopilotConfig{
|
||||
Config: autopilot.Config{
|
||||
CleanupDeadServers: true,
|
||||
},
|
||||
}
|
||||
|
@ -151,7 +152,7 @@ func TestOperator_Autopilot_SetConfiguration_ACLDeny(t *testing.T) {
|
|||
// Try to set config without permissions
|
||||
arg := structs.AutopilotSetConfigRequest{
|
||||
Datacenter: "dc1",
|
||||
Config: structs.AutopilotConfig{
|
||||
Config: autopilot.Config{
|
||||
CleanupDeadServers: true,
|
||||
},
|
||||
}
|
||||
|
@ -232,7 +233,7 @@ func TestOperator_ServerHealth(t *testing.T) {
|
|||
arg := structs.DCSpecificRequest{
|
||||
Datacenter: "dc1",
|
||||
}
|
||||
var reply structs.OperatorHealthReply
|
||||
var reply autopilot.OperatorHealthReply
|
||||
err := msgpackrpc.CallWithCodec(codec, "Operator.ServerHealth", &arg, &reply)
|
||||
if err != nil {
|
||||
r.Fatalf("err: %v", err)
|
||||
|
@ -274,7 +275,7 @@ func TestOperator_ServerHealth_UnsupportedRaftVersion(t *testing.T) {
|
|||
arg := structs.DCSpecificRequest{
|
||||
Datacenter: "dc1",
|
||||
}
|
||||
var reply structs.OperatorHealthReply
|
||||
var reply autopilot.OperatorHealthReply
|
||||
err := msgpackrpc.CallWithCodec(codec, "Operator.ServerHealth", &arg, &reply)
|
||||
if err == nil || !strings.Contains(err.Error(), "raft_protocol set to 3 or higher") {
|
||||
t.Fatalf("bad: %v", err)
|
||||
|
|
|
@ -115,7 +115,7 @@ REMOVE:
|
|||
// doing if you are calling this. If you remove a peer that's known to
|
||||
// Serf, for example, it will come back when the leader does a reconcile
|
||||
// pass.
|
||||
minRaftProtocol, err := ServerMinRaftProtocol(op.srv.serfLAN.Members())
|
||||
minRaftProtocol, err := op.srv.autopilot.MinRaftProtocol()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -182,7 +182,7 @@ REMOVE:
|
|||
// doing if you are calling this. If you remove a peer that's known to
|
||||
// Serf, for example, it will come back when the leader does a reconcile
|
||||
// pass.
|
||||
minRaftProtocol, err := ServerMinRaftProtocol(op.srv.serfLAN.Members())
|
||||
minRaftProtocol, err := op.srv.autopilot.MinRaftProtocol()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -18,6 +18,7 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/agent/consul/autopilot"
|
||||
"github.com/hashicorp/consul/agent/consul/fsm"
|
||||
"github.com/hashicorp/consul/agent/consul/state"
|
||||
"github.com/hashicorp/consul/agent/metadata"
|
||||
|
@ -86,8 +87,8 @@ type Server struct {
|
|||
// aclCache is the non-authoritative ACL cache.
|
||||
aclCache *aclCache
|
||||
|
||||
// autopilotPolicy controls the behavior of Autopilot for certain tasks.
|
||||
autopilotPolicy AutopilotPolicy
|
||||
// autopilot is the Autopilot instance for this server.
|
||||
autopilot *autopilot.Autopilot
|
||||
|
||||
// autopilotRemoveDeadCh is used to trigger a check for dead server removals.
|
||||
autopilotRemoveDeadCh chan struct{}
|
||||
|
@ -98,10 +99,6 @@ type Server struct {
|
|||
// autopilotWaitGroup is used to block until Autopilot shuts down.
|
||||
autopilotWaitGroup sync.WaitGroup
|
||||
|
||||
// clusterHealth stores the current view of the cluster's health.
|
||||
clusterHealth structs.OperatorHealthReply
|
||||
clusterHealthLock sync.RWMutex
|
||||
|
||||
// Consul configuration
|
||||
config *Config
|
||||
|
||||
|
@ -305,8 +302,12 @@ func NewServerLogger(config *Config, logger *log.Logger, tokens *token.Store) (*
|
|||
shutdownCh: shutdownCh,
|
||||
}
|
||||
|
||||
// Set up the autopilot policy
|
||||
s.autopilotPolicy = &BasicAutopilot{server: s}
|
||||
// Set up autopilot
|
||||
apDelegate := &AutopilotDelegate{s}
|
||||
serverFunc := func(m serf.Member) bool {
|
||||
return m.Tags["role"] == "consul"
|
||||
}
|
||||
s.autopilot = autopilot.NewAutopilot(logger, apDelegate, serverFunc, config.AutopilotInterval, config.ServerHealthInterval)
|
||||
|
||||
// Initialize the stats fetcher that autopilot will use.
|
||||
s.statsFetcher = NewStatsFetcher(logger, s.connPool, s.config.Datacenter)
|
||||
|
@ -430,7 +431,7 @@ func NewServerLogger(config *Config, logger *log.Logger, tokens *token.Store) (*
|
|||
go s.sessionStats()
|
||||
|
||||
// Start the server health checking.
|
||||
go s.serverHealthLoop()
|
||||
go s.autopilot.ServerHealthLoop(s.shutdownCh)
|
||||
|
||||
return s, nil
|
||||
}
|
||||
|
@ -732,7 +733,7 @@ func (s *Server) Leave() error {
|
|||
// removed for some sane period of time.
|
||||
isLeader := s.IsLeader()
|
||||
if isLeader && numPeers > 1 {
|
||||
minRaftProtocol, err := ServerMinRaftProtocol(s.serfLAN.Members())
|
||||
minRaftProtocol, err := s.autopilot.MinRaftProtocol()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -304,7 +304,7 @@ func (s *Server) maybeBootstrap() {
|
|||
// Attempt a live bootstrap!
|
||||
var configuration raft.Configuration
|
||||
var addrs []string
|
||||
minRaftVersion, err := ServerMinRaftProtocol(members)
|
||||
minRaftVersion, err := s.autopilot.MinRaftProtocol()
|
||||
if err != nil {
|
||||
s.logger.Printf("[ERR] consul: Failed to read server raft versions: %v", err)
|
||||
}
|
||||
|
|
|
@ -3,7 +3,7 @@ package state
|
|||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/agent/consul/autopilot"
|
||||
"github.com/hashicorp/go-memdb"
|
||||
)
|
||||
|
||||
|
@ -30,13 +30,13 @@ func init() {
|
|||
}
|
||||
|
||||
// Autopilot is used to pull the autopilot config from the snapshot.
|
||||
func (s *Snapshot) Autopilot() (*structs.AutopilotConfig, error) {
|
||||
func (s *Snapshot) Autopilot() (*autopilot.Config, error) {
|
||||
c, err := s.tx.First("autopilot-config", "id")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
config, ok := c.(*structs.AutopilotConfig)
|
||||
config, ok := c.(*autopilot.Config)
|
||||
if !ok {
|
||||
return nil, nil
|
||||
}
|
||||
|
@ -45,7 +45,7 @@ func (s *Snapshot) Autopilot() (*structs.AutopilotConfig, error) {
|
|||
}
|
||||
|
||||
// Autopilot is used when restoring from a snapshot.
|
||||
func (s *Restore) Autopilot(config *structs.AutopilotConfig) error {
|
||||
func (s *Restore) Autopilot(config *autopilot.Config) error {
|
||||
if err := s.tx.Insert("autopilot-config", config); err != nil {
|
||||
return fmt.Errorf("failed restoring autopilot config: %s", err)
|
||||
}
|
||||
|
@ -54,7 +54,7 @@ func (s *Restore) Autopilot(config *structs.AutopilotConfig) error {
|
|||
}
|
||||
|
||||
// AutopilotConfig is used to get the current Autopilot configuration.
|
||||
func (s *Store) AutopilotConfig() (uint64, *structs.AutopilotConfig, error) {
|
||||
func (s *Store) AutopilotConfig() (uint64, *autopilot.Config, error) {
|
||||
tx := s.db.Txn(false)
|
||||
defer tx.Abort()
|
||||
|
||||
|
@ -64,7 +64,7 @@ func (s *Store) AutopilotConfig() (uint64, *structs.AutopilotConfig, error) {
|
|||
return 0, nil, fmt.Errorf("failed autopilot config lookup: %s", err)
|
||||
}
|
||||
|
||||
config, ok := c.(*structs.AutopilotConfig)
|
||||
config, ok := c.(*autopilot.Config)
|
||||
if !ok {
|
||||
return 0, nil, nil
|
||||
}
|
||||
|
@ -73,7 +73,7 @@ func (s *Store) AutopilotConfig() (uint64, *structs.AutopilotConfig, error) {
|
|||
}
|
||||
|
||||
// AutopilotSetConfig is used to set the current Autopilot configuration.
|
||||
func (s *Store) AutopilotSetConfig(idx uint64, config *structs.AutopilotConfig) error {
|
||||
func (s *Store) AutopilotSetConfig(idx uint64, config *autopilot.Config) error {
|
||||
tx := s.db.Txn(true)
|
||||
defer tx.Abort()
|
||||
|
||||
|
@ -86,7 +86,7 @@ func (s *Store) AutopilotSetConfig(idx uint64, config *structs.AutopilotConfig)
|
|||
// AutopilotCASConfig is used to try updating the Autopilot configuration with a
|
||||
// given Raft index. If the CAS index specified is not equal to the last observed index
|
||||
// for the config, then the call is a noop,
|
||||
func (s *Store) AutopilotCASConfig(idx, cidx uint64, config *structs.AutopilotConfig) (bool, error) {
|
||||
func (s *Store) AutopilotCASConfig(idx, cidx uint64, config *autopilot.Config) (bool, error) {
|
||||
tx := s.db.Txn(true)
|
||||
defer tx.Abort()
|
||||
|
||||
|
@ -99,7 +99,7 @@ func (s *Store) AutopilotCASConfig(idx, cidx uint64, config *structs.AutopilotCo
|
|||
// If the existing index does not match the provided CAS
|
||||
// index arg, then we shouldn't update anything and can safely
|
||||
// return early here.
|
||||
e, ok := existing.(*structs.AutopilotConfig)
|
||||
e, ok := existing.(*autopilot.Config)
|
||||
if !ok || e.ModifyIndex != cidx {
|
||||
return false, nil
|
||||
}
|
||||
|
@ -110,7 +110,7 @@ func (s *Store) AutopilotCASConfig(idx, cidx uint64, config *structs.AutopilotCo
|
|||
return true, nil
|
||||
}
|
||||
|
||||
func (s *Store) autopilotSetConfigTxn(idx uint64, tx *memdb.Txn, config *structs.AutopilotConfig) error {
|
||||
func (s *Store) autopilotSetConfigTxn(idx uint64, tx *memdb.Txn, config *autopilot.Config) error {
|
||||
// Check for an existing config
|
||||
existing, err := tx.First("autopilot-config", "id")
|
||||
if err != nil {
|
||||
|
@ -119,7 +119,7 @@ func (s *Store) autopilotSetConfigTxn(idx uint64, tx *memdb.Txn, config *structs
|
|||
|
||||
// Set the indexes.
|
||||
if existing != nil {
|
||||
config.CreateIndex = existing.(*structs.AutopilotConfig).CreateIndex
|
||||
config.CreateIndex = existing.(*autopilot.Config).CreateIndex
|
||||
} else {
|
||||
config.CreateIndex = idx
|
||||
}
|
||||
|
|
|
@ -5,14 +5,14 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/agent/consul/autopilot"
|
||||
"github.com/pascaldekloe/goe/verify"
|
||||
)
|
||||
|
||||
func TestStateStore_Autopilot(t *testing.T) {
|
||||
s := testStateStore(t)
|
||||
|
||||
expected := &structs.AutopilotConfig{
|
||||
expected := &autopilot.Config{
|
||||
CleanupDeadServers: true,
|
||||
LastContactThreshold: 5 * time.Second,
|
||||
MaxTrailingLogs: 500,
|
||||
|
@ -41,7 +41,7 @@ func TestStateStore_Autopilot(t *testing.T) {
|
|||
func TestStateStore_AutopilotCAS(t *testing.T) {
|
||||
s := testStateStore(t)
|
||||
|
||||
expected := &structs.AutopilotConfig{
|
||||
expected := &autopilot.Config{
|
||||
CleanupDeadServers: true,
|
||||
}
|
||||
|
||||
|
@ -53,7 +53,7 @@ func TestStateStore_AutopilotCAS(t *testing.T) {
|
|||
}
|
||||
|
||||
// Do a CAS with an index lower than the entry
|
||||
ok, err := s.AutopilotCASConfig(2, 0, &structs.AutopilotConfig{
|
||||
ok, err := s.AutopilotCASConfig(2, 0, &autopilot.Config{
|
||||
CleanupDeadServers: false,
|
||||
})
|
||||
if ok || err != nil {
|
||||
|
@ -74,7 +74,7 @@ func TestStateStore_AutopilotCAS(t *testing.T) {
|
|||
}
|
||||
|
||||
// Do another CAS, this time with the correct index
|
||||
ok, err = s.AutopilotCASConfig(2, 1, &structs.AutopilotConfig{
|
||||
ok, err = s.AutopilotCASConfig(2, 1, &autopilot.Config{
|
||||
CleanupDeadServers: false,
|
||||
})
|
||||
if !ok || err != nil {
|
||||
|
@ -96,7 +96,7 @@ func TestStateStore_AutopilotCAS(t *testing.T) {
|
|||
|
||||
func TestStateStore_Autopilot_Snapshot_Restore(t *testing.T) {
|
||||
s := testStateStore(t)
|
||||
before := &structs.AutopilotConfig{
|
||||
before := &autopilot.Config{
|
||||
CleanupDeadServers: true,
|
||||
}
|
||||
if err := s.AutopilotSetConfig(99, before); err != nil {
|
||||
|
@ -106,7 +106,7 @@ func TestStateStore_Autopilot_Snapshot_Restore(t *testing.T) {
|
|||
snap := s.Snapshot()
|
||||
defer snap.Close()
|
||||
|
||||
after := &structs.AutopilotConfig{
|
||||
after := &autopilot.Config{
|
||||
CleanupDeadServers: false,
|
||||
}
|
||||
if err := s.AutopilotSetConfig(100, after); err != nil {
|
||||
|
|
|
@ -5,9 +5,9 @@ import (
|
|||
"log"
|
||||
"sync"
|
||||
|
||||
"github.com/hashicorp/consul/agent/consul/autopilot"
|
||||
"github.com/hashicorp/consul/agent/metadata"
|
||||
"github.com/hashicorp/consul/agent/pool"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
)
|
||||
|
||||
// StatsFetcher has two functions for autopilot. First, lets us fetch all the
|
||||
|
@ -39,9 +39,9 @@ func NewStatsFetcher(logger *log.Logger, pool *pool.ConnPool, datacenter string)
|
|||
// cancel this when the context is canceled because we only want one in-flight
|
||||
// RPC to each server, so we let it finish and then clean up the in-flight
|
||||
// tracking.
|
||||
func (f *StatsFetcher) fetch(server *metadata.Server, replyCh chan *structs.ServerStats) {
|
||||
func (f *StatsFetcher) fetch(server *metadata.Server, replyCh chan *autopilot.ServerStats) {
|
||||
var args struct{}
|
||||
var reply structs.ServerStats
|
||||
var reply autopilot.ServerStats
|
||||
err := f.pool.RPC(f.datacenter, server.Addr, server.Version, "Status.RaftStats", server.UseTLS, &args, &reply)
|
||||
if err != nil {
|
||||
f.logger.Printf("[WARN] consul: error getting server health from %q: %v",
|
||||
|
@ -56,10 +56,10 @@ func (f *StatsFetcher) fetch(server *metadata.Server, replyCh chan *structs.Serv
|
|||
}
|
||||
|
||||
// Fetch will attempt to query all the servers in parallel.
|
||||
func (f *StatsFetcher) Fetch(ctx context.Context, servers []*metadata.Server) map[string]*structs.ServerStats {
|
||||
func (f *StatsFetcher) Fetch(ctx context.Context, servers []*metadata.Server) map[string]*autopilot.ServerStats {
|
||||
type workItem struct {
|
||||
server *metadata.Server
|
||||
replyCh chan *structs.ServerStats
|
||||
replyCh chan *autopilot.ServerStats
|
||||
}
|
||||
var work []*workItem
|
||||
|
||||
|
@ -72,7 +72,7 @@ func (f *StatsFetcher) Fetch(ctx context.Context, servers []*metadata.Server) ma
|
|||
} else {
|
||||
workItem := &workItem{
|
||||
server: server,
|
||||
replyCh: make(chan *structs.ServerStats, 1),
|
||||
replyCh: make(chan *autopilot.ServerStats, 1),
|
||||
}
|
||||
work = append(work, workItem)
|
||||
f.inflight[server.ID] = struct{}{}
|
||||
|
@ -83,7 +83,7 @@ func (f *StatsFetcher) Fetch(ctx context.Context, servers []*metadata.Server) ma
|
|||
|
||||
// Now wait for the results to come in, or for the context to be
|
||||
// canceled.
|
||||
replies := make(map[string]*structs.ServerStats)
|
||||
replies := make(map[string]*autopilot.ServerStats)
|
||||
for _, workItem := range work {
|
||||
select {
|
||||
case reply := <-workItem.replyCh:
|
||||
|
|
|
@ -4,7 +4,7 @@ import (
|
|||
"fmt"
|
||||
"strconv"
|
||||
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/agent/consul/autopilot"
|
||||
)
|
||||
|
||||
// Status endpoint is used to check on server status
|
||||
|
@ -42,7 +42,7 @@ func (s *Status) Peers(args struct{}, reply *[]string) error {
|
|||
}
|
||||
|
||||
// Used by Autopilot to query the raft stats of the local server.
|
||||
func (s *Status) RaftStats(args struct{}, reply *structs.ServerStats) error {
|
||||
func (s *Status) RaftStats(args struct{}, reply *autopilot.ServerStats) error {
|
||||
stats := s.server.raft.Stats()
|
||||
|
||||
var err error
|
||||
|
|
|
@ -93,35 +93,6 @@ func CanServersUnderstandProtocol(members []serf.Member, version uint8) (bool, e
|
|||
return (numServers > 0) && (numWhoGrok == numServers), nil
|
||||
}
|
||||
|
||||
// ServerMinRaftProtocol returns the lowest supported Raft protocol among alive servers
|
||||
func ServerMinRaftProtocol(members []serf.Member) (int, error) {
|
||||
minVersion := -1
|
||||
for _, m := range members {
|
||||
if m.Tags["role"] != "consul" || m.Status != serf.StatusAlive {
|
||||
continue
|
||||
}
|
||||
|
||||
vsn, ok := m.Tags["raft_vsn"]
|
||||
if !ok {
|
||||
vsn = "1"
|
||||
}
|
||||
raftVsn, err := strconv.Atoi(vsn)
|
||||
if err != nil {
|
||||
return -1, err
|
||||
}
|
||||
|
||||
if minVersion == -1 || raftVsn < minVersion {
|
||||
minVersion = raftVsn
|
||||
}
|
||||
}
|
||||
|
||||
if minVersion == -1 {
|
||||
return minVersion, fmt.Errorf("No servers found")
|
||||
}
|
||||
|
||||
return minVersion, nil
|
||||
}
|
||||
|
||||
// Returns if a member is a consul node. Returns a bool,
|
||||
// and the datacenter.
|
||||
func isConsulNode(m serf.Member) (bool, string) {
|
||||
|
|
|
@ -6,6 +6,7 @@ import (
|
|||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/agent/consul/autopilot"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/api"
|
||||
multierror "github.com/hashicorp/go-multierror"
|
||||
|
@ -194,7 +195,7 @@ func (s *HTTPServer) OperatorAutopilotConfiguration(resp http.ResponseWriter, re
|
|||
return nil, nil
|
||||
}
|
||||
|
||||
var reply structs.AutopilotConfig
|
||||
var reply autopilot.Config
|
||||
if err := s.agent.RPC("Operator.AutopilotGetConfiguration", &args, &reply); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -226,7 +227,7 @@ func (s *HTTPServer) OperatorAutopilotConfiguration(resp http.ResponseWriter, re
|
|||
return nil, nil
|
||||
}
|
||||
|
||||
args.Config = structs.AutopilotConfig{
|
||||
args.Config = autopilot.Config{
|
||||
CleanupDeadServers: conf.CleanupDeadServers,
|
||||
LastContactThreshold: conf.LastContactThreshold.Duration(),
|
||||
MaxTrailingLogs: conf.MaxTrailingLogs,
|
||||
|
@ -276,7 +277,7 @@ func (s *HTTPServer) OperatorServerHealth(resp http.ResponseWriter, req *http.Re
|
|||
return nil, nil
|
||||
}
|
||||
|
||||
var reply structs.OperatorHealthReply
|
||||
var reply autopilot.OperatorHealthReply
|
||||
if err := s.agent.RPC("Operator.ServerHealth", &args, &reply); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -8,6 +8,7 @@ import (
|
|||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/hashicorp/consul/agent/consul/autopilot"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/api"
|
||||
"github.com/hashicorp/consul/testutil/retry"
|
||||
|
@ -329,7 +330,7 @@ func TestOperator_AutopilotSetConfiguration(t *testing.T) {
|
|||
Datacenter: "dc1",
|
||||
}
|
||||
|
||||
var reply structs.AutopilotConfig
|
||||
var reply autopilot.Config
|
||||
if err := a.RPC("Operator.AutopilotGetConfiguration", &args, &reply); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
@ -357,7 +358,7 @@ func TestOperator_AutopilotCASConfiguration(t *testing.T) {
|
|||
Datacenter: "dc1",
|
||||
}
|
||||
|
||||
var reply structs.AutopilotConfig
|
||||
var reply autopilot.Config
|
||||
if err := a.RPC("Operator.AutopilotGetConfiguration", &args, &reply); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
|
|
@ -2,48 +2,11 @@ package structs
|
|||
|
||||
import (
|
||||
"net"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/agent/consul/autopilot"
|
||||
"github.com/hashicorp/raft"
|
||||
"github.com/hashicorp/serf/serf"
|
||||
)
|
||||
|
||||
// AutopilotConfig holds the Autopilot configuration for a cluster.
|
||||
type AutopilotConfig struct {
|
||||
// CleanupDeadServers controls whether to remove dead servers when a new
|
||||
// server is added to the Raft peers.
|
||||
CleanupDeadServers bool
|
||||
|
||||
// LastContactThreshold is the limit on the amount of time a server can go
|
||||
// without leader contact before being considered unhealthy.
|
||||
LastContactThreshold time.Duration
|
||||
|
||||
// MaxTrailingLogs is the amount of entries in the Raft Log that a server can
|
||||
// be behind before being considered unhealthy.
|
||||
MaxTrailingLogs uint64
|
||||
|
||||
// ServerStabilizationTime is the minimum amount of time a server must be
|
||||
// in a stable, healthy state before it can be added to the cluster. Only
|
||||
// applicable with Raft protocol version 3 or higher.
|
||||
ServerStabilizationTime time.Duration
|
||||
|
||||
// (Enterprise-only) RedundancyZoneTag is the node tag to use for separating
|
||||
// servers into zones for redundancy. If left blank, this feature will be disabled.
|
||||
RedundancyZoneTag string
|
||||
|
||||
// (Enterprise-only) DisableUpgradeMigration will disable Autopilot's upgrade migration
|
||||
// strategy of waiting until enough newer-versioned servers have been added to the
|
||||
// cluster before promoting them to voters.
|
||||
DisableUpgradeMigration bool
|
||||
|
||||
// (Enterprise-only) UpgradeVersionTag is the node tag to use for version info when
|
||||
// performing upgrade migrations. If left blank, the Consul version will be used.
|
||||
UpgradeVersionTag string
|
||||
|
||||
// RaftIndex stores the create/modify indexes of this configuration.
|
||||
RaftIndex
|
||||
}
|
||||
|
||||
// RaftServer has information about a server in the Raft configuration.
|
||||
type RaftServer struct {
|
||||
// ID is the unique ID for the server. These are currently the same
|
||||
|
@ -109,7 +72,7 @@ type AutopilotSetConfigRequest struct {
|
|||
Datacenter string
|
||||
|
||||
// Config is the new Autopilot configuration to use.
|
||||
Config AutopilotConfig
|
||||
Config autopilot.Config
|
||||
|
||||
// CAS controls whether to use check-and-set semantics for this request.
|
||||
CAS bool
|
||||
|
@ -123,111 +86,6 @@ func (op *AutopilotSetConfigRequest) RequestDatacenter() string {
|
|||
return op.Datacenter
|
||||
}
|
||||
|
||||
// ServerHealth is the health (from the leader's point of view) of a server.
|
||||
type ServerHealth struct {
|
||||
// ID is the raft ID of the server.
|
||||
ID string
|
||||
|
||||
// Name is the node name of the server.
|
||||
Name string
|
||||
|
||||
// Address is the address of the server.
|
||||
Address string
|
||||
|
||||
// The status of the SerfHealth check for the server.
|
||||
SerfStatus serf.MemberStatus
|
||||
|
||||
// Version is the Consul version of the server.
|
||||
Version string
|
||||
|
||||
// Leader is whether this server is currently the leader.
|
||||
Leader bool
|
||||
|
||||
// LastContact is the time since this node's last contact with the leader.
|
||||
LastContact time.Duration
|
||||
|
||||
// LastTerm is the highest leader term this server has a record of in its Raft log.
|
||||
LastTerm uint64
|
||||
|
||||
// LastIndex is the last log index this server has a record of in its Raft log.
|
||||
LastIndex uint64
|
||||
|
||||
// Healthy is whether or not the server is healthy according to the current
|
||||
// Autopilot config.
|
||||
Healthy bool
|
||||
|
||||
// Voter is whether this is a voting server.
|
||||
Voter bool
|
||||
|
||||
// StableSince is the last time this server's Healthy value changed.
|
||||
StableSince time.Time
|
||||
}
|
||||
|
||||
// IsHealthy determines whether this ServerHealth is considered healthy
|
||||
// based on the given Autopilot config
|
||||
func (h *ServerHealth) IsHealthy(lastTerm uint64, leaderLastIndex uint64, autopilotConf *AutopilotConfig) bool {
|
||||
if h.SerfStatus != serf.StatusAlive {
|
||||
return false
|
||||
}
|
||||
|
||||
if h.LastContact > autopilotConf.LastContactThreshold || h.LastContact < 0 {
|
||||
return false
|
||||
}
|
||||
|
||||
if h.LastTerm != lastTerm {
|
||||
return false
|
||||
}
|
||||
|
||||
if leaderLastIndex > autopilotConf.MaxTrailingLogs && h.LastIndex < leaderLastIndex-autopilotConf.MaxTrailingLogs {
|
||||
return false
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
// IsStable returns true if the ServerHealth is in a stable, passing state
|
||||
// according to the given AutopilotConfig
|
||||
func (h *ServerHealth) IsStable(now time.Time, conf *AutopilotConfig) bool {
|
||||
if h == nil {
|
||||
return false
|
||||
}
|
||||
|
||||
if !h.Healthy {
|
||||
return false
|
||||
}
|
||||
|
||||
if now.Sub(h.StableSince) < conf.ServerStabilizationTime {
|
||||
return false
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
// ServerStats holds miscellaneous Raft metrics for a server
|
||||
type ServerStats struct {
|
||||
// LastContact is the time since this node's last contact with the leader.
|
||||
LastContact string
|
||||
|
||||
// LastTerm is the highest leader term this server has a record of in its Raft log.
|
||||
LastTerm uint64
|
||||
|
||||
// LastIndex is the last log index this server has a record of in its Raft log.
|
||||
LastIndex uint64
|
||||
}
|
||||
|
||||
// OperatorHealthReply is a representation of the overall health of the cluster
|
||||
type OperatorHealthReply struct {
|
||||
// Healthy is true if all the servers in the cluster are healthy.
|
||||
Healthy bool
|
||||
|
||||
// FailureTolerance is the number of healthy servers that could be lost without
|
||||
// an outage occurring.
|
||||
FailureTolerance int
|
||||
|
||||
// Servers holds the health of each server.
|
||||
Servers []ServerHealth
|
||||
}
|
||||
|
||||
// (Enterprise-only) NetworkSegment is the configuration for a network segment, which is an
|
||||
// isolated serf group on the LAN.
|
||||
type NetworkSegment struct {
|
||||
|
|
|
@ -6,6 +6,7 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/agent"
|
||||
"github.com/hashicorp/consul/agent/consul/autopilot"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/mitchellh/cli"
|
||||
)
|
||||
|
@ -44,7 +45,7 @@ func TestOperatorAutopilotSetConfigCommmand(t *testing.T) {
|
|||
req := structs.DCSpecificRequest{
|
||||
Datacenter: "dc1",
|
||||
}
|
||||
var reply structs.AutopilotConfig
|
||||
var reply autopilot.Config
|
||||
if err := a.RPC("Operator.AutopilotGetConfiguration", &req, &reply); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue