diff --git a/agent/consul/autopilot.go b/agent/consul/autopilot.go index 7dbdbdcb12..267a9c9e9a 100644 --- a/agent/consul/autopilot.go +++ b/agent/consul/autopilot.go @@ -3,420 +3,43 @@ package consul import ( "context" "fmt" - "strconv" - "sync" - "time" - "github.com/armon/go-metrics" + "github.com/hashicorp/consul/agent/consul/autopilot" "github.com/hashicorp/consul/agent/metadata" - "github.com/hashicorp/consul/agent/structs" - "github.com/hashicorp/go-version" "github.com/hashicorp/raft" "github.com/hashicorp/serf/serf" ) -// AutopilotPolicy is the interface for the Autopilot mechanism -type AutopilotPolicy interface { - // PromoteNonVoters defines the handling of non-voting servers - PromoteNonVoters(*structs.AutopilotConfig) error -} - -func (s *Server) startAutopilot() { - s.autopilotShutdownCh = make(chan struct{}) - s.autopilotWaitGroup = sync.WaitGroup{} - s.autopilotWaitGroup.Add(1) - - go s.autopilotLoop() -} - -func (s *Server) stopAutopilot() { - close(s.autopilotShutdownCh) - s.autopilotWaitGroup.Wait() -} - -var minAutopilotVersion = version.Must(version.NewVersion("0.8.0")) - -// autopilotLoop periodically looks for nonvoting servers to promote and dead servers to remove. -func (s *Server) autopilotLoop() { - defer s.autopilotWaitGroup.Done() - - // Monitor server health until shutdown - ticker := time.NewTicker(s.config.AutopilotInterval) - defer ticker.Stop() - - for { - select { - case <-s.autopilotShutdownCh: - return - case <-ticker.C: - autopilotConfig, ok := s.getOrCreateAutopilotConfig() - if !ok { - continue - } - - if err := s.autopilotPolicy.PromoteNonVoters(autopilotConfig); err != nil { - s.logger.Printf("[ERR] autopilot: Error checking for non-voters to promote: %s", err) - } - - if err := s.pruneDeadServers(autopilotConfig); err != nil { - s.logger.Printf("[ERR] autopilot: Error checking for dead servers to remove: %s", err) - } - case <-s.autopilotRemoveDeadCh: - autopilotConfig, ok := s.getOrCreateAutopilotConfig() - if !ok { - continue - } - - if err := s.pruneDeadServers(autopilotConfig); err != nil { - s.logger.Printf("[ERR] autopilot: Error checking for dead servers to remove: %s", err) - } - } - } -} - -// fmtServer prints info about a server in a standard way for logging. -func fmtServer(server raft.Server) string { - return fmt.Sprintf("Server (ID: %q Address: %q)", server.ID, server.Address) -} - -// pruneDeadServers removes up to numPeers/2 failed servers -func (s *Server) pruneDeadServers(autopilotConfig *structs.AutopilotConfig) error { - if !autopilotConfig.CleanupDeadServers { - return nil - } - - // Failed servers are known to Serf and marked failed, and stale servers - // are known to Raft but not Serf. - var failed []string - staleRaftServers := make(map[string]raft.Server) - future := s.raft.GetConfiguration() - if err := future.Error(); err != nil { - return err - } - for _, server := range future.Configuration().Servers { - staleRaftServers[string(server.Address)] = server - } - for _, member := range s.serfLAN.Members() { - valid, parts := metadata.IsConsulServer(member) - if valid { - if _, ok := staleRaftServers[parts.Addr.String()]; ok { - delete(staleRaftServers, parts.Addr.String()) - } - - if member.Status == serf.StatusFailed { - failed = append(failed, member.Name) - } - } - } - - // We can bail early if there's nothing to do. - removalCount := len(failed) + len(staleRaftServers) - if removalCount == 0 { - return nil - } - - // Only do removals if a minority of servers will be affected. - peers, err := s.numPeers() - if err != nil { - return err - } - if removalCount < peers/2 { - for _, node := range failed { - s.logger.Printf("[INFO] autopilot: Attempting removal of failed server node %q", node) - go s.serfLAN.RemoveFailedNode(node) - } - - minRaftProtocol, err := ServerMinRaftProtocol(s.serfLAN.Members()) - if err != nil { - return err - } - for _, raftServer := range staleRaftServers { - s.logger.Printf("[INFO] autopilot: Attempting removal of stale %s", fmtServer(raftServer)) - var future raft.Future - if minRaftProtocol >= 2 { - future = s.raft.RemoveServer(raftServer.ID, 0, 0) - } else { - future = s.raft.RemovePeer(raftServer.Address) - } - if err := future.Error(); err != nil { - return err - } - } - } else { - s.logger.Printf("[DEBUG] autopilot: Failed to remove dead servers: too many dead servers: %d/%d", removalCount, peers) - } - - return nil -} - -// BasicAutopilot defines a policy for promoting non-voting servers in a way -// that maintains an odd-numbered voter count. -type BasicAutopilot struct { +// AutopilotDelegate is a Consul delegate for autopilot operations. +type AutopilotDelegate struct { server *Server } -// PromoteNonVoters promotes eligible non-voting servers to voters. -func (b *BasicAutopilot) PromoteNonVoters(autopilotConfig *structs.AutopilotConfig) error { - // If we don't meet the minimum version for non-voter features, bail - // early. - minRaftProtocol, err := ServerMinRaftProtocol(b.server.LANMembers()) - if err != nil { - return fmt.Errorf("error getting server raft protocol versions: %s", err) - } - if minRaftProtocol < 3 { - return nil - } +func (d *AutopilotDelegate) FetchStats(ctx context.Context, servers []*metadata.Server) map[string]*autopilot.ServerStats { + return d.server.statsFetcher.Fetch(ctx, servers) +} - // Find any non-voters eligible for promotion. - now := time.Now() - var promotions []raft.Server - future := b.server.raft.GetConfiguration() +func (d *AutopilotDelegate) GetOrCreateAutopilotConfig() (*autopilot.Config, bool) { + return d.server.getOrCreateAutopilotConfig() +} + +func (d *AutopilotDelegate) Raft() *raft.Raft { + return d.server.raft +} + +func (d *AutopilotDelegate) Serf() *serf.Serf { + return d.server.serfLAN +} + +func (d *AutopilotDelegate) NumPeers() (int, error) { + return d.server.numPeers() +} + +func (d *AutopilotDelegate) PromoteNonVoters(conf *autopilot.Config, health autopilot.OperatorHealthReply) ([]raft.Server, error) { + future := d.server.raft.GetConfiguration() if err := future.Error(); err != nil { - return fmt.Errorf("failed to get raft configuration: %v", err) - } - for _, server := range future.Configuration().Servers { - if !isVoter(server.Suffrage) { - health := b.server.getServerHealth(string(server.ID)) - if health.IsStable(now, autopilotConfig) { - promotions = append(promotions, server) - } - } + return nil, fmt.Errorf("failed to get raft configuration: %v", err) } - if err := b.server.handlePromotions(promotions); err != nil { - return err - } - return nil -} - -// handlePromotions is a helper shared with Consul Enterprise that attempts to -// apply desired server promotions to the Raft configuration. -func (s *Server) handlePromotions(promotions []raft.Server) error { - // This used to wait to only promote to maintain an odd quorum of - // servers, but this was at odds with the dead server cleanup when doing - // rolling updates (add one new server, wait, and then kill an old - // server). The dead server cleanup would still count the old server as - // a peer, which is conservative and the right thing to do, and this - // would wait to promote, so you could get into a stalemate. It is safer - // to promote early than remove early, so by promoting as soon as - // possible we have chosen that as the solution here. - for _, server := range promotions { - s.logger.Printf("[INFO] autopilot: Promoting %s to voter", fmtServer(server)) - addFuture := s.raft.AddVoter(server.ID, server.Address, 0, 0) - if err := addFuture.Error(); err != nil { - return fmt.Errorf("failed to add raft peer: %v", err) - } - } - - // If we promoted a server, trigger a check to remove dead servers. - if len(promotions) > 0 { - select { - case s.autopilotRemoveDeadCh <- struct{}{}: - default: - } - } - return nil -} - -// serverHealthLoop monitors the health of the servers in the cluster -func (s *Server) serverHealthLoop() { - // Monitor server health until shutdown - ticker := time.NewTicker(s.config.ServerHealthInterval) - defer ticker.Stop() - - for { - select { - case <-s.shutdownCh: - return - case <-ticker.C: - if err := s.updateClusterHealth(); err != nil { - s.logger.Printf("[ERR] autopilot: Error updating cluster health: %s", err) - } - } - } -} - -// updateClusterHealth fetches the Raft stats of the other servers and updates -// s.clusterHealth based on the configured Autopilot thresholds -func (s *Server) updateClusterHealth() error { - // Don't do anything if the min Raft version is too low - minRaftProtocol, err := ServerMinRaftProtocol(s.LANMembers()) - if err != nil { - return fmt.Errorf("error getting server raft protocol versions: %s", err) - } - if minRaftProtocol < 3 { - return nil - } - - state := s.fsm.State() - _, autopilotConf, err := state.AutopilotConfig() - if err != nil { - return fmt.Errorf("error retrieving autopilot config: %s", err) - } - // Bail early if autopilot config hasn't been initialized yet - if autopilotConf == nil { - return nil - } - - // Get the the serf members which are Consul servers - serverMap := make(map[string]*metadata.Server) - for _, member := range s.LANMembers() { - if member.Status == serf.StatusLeft { - continue - } - - valid, parts := metadata.IsConsulServer(member) - if valid { - serverMap[parts.ID] = parts - } - } - - future := s.raft.GetConfiguration() - if err := future.Error(); err != nil { - return fmt.Errorf("error getting Raft configuration %s", err) - } - servers := future.Configuration().Servers - - // Fetch the health for each of the servers in parallel so we get as - // consistent of a sample as possible. We capture the leader's index - // here as well so it roughly lines up with the same point in time. - targetLastIndex := s.raft.LastIndex() - var fetchList []*metadata.Server - for _, server := range servers { - if parts, ok := serverMap[string(server.ID)]; ok { - fetchList = append(fetchList, parts) - } - } - d := time.Now().Add(s.config.ServerHealthInterval / 2) - ctx, cancel := context.WithDeadline(context.Background(), d) - defer cancel() - fetchedStats := s.statsFetcher.Fetch(ctx, fetchList) - - // Build a current list of server healths - leader := s.raft.Leader() - var clusterHealth structs.OperatorHealthReply - voterCount := 0 - healthyCount := 0 - healthyVoterCount := 0 - for _, server := range servers { - health := structs.ServerHealth{ - ID: string(server.ID), - Address: string(server.Address), - Leader: server.Address == leader, - LastContact: -1, - Voter: server.Suffrage == raft.Voter, - } - - parts, ok := serverMap[string(server.ID)] - if ok { - health.Name = parts.Name - health.SerfStatus = parts.Status - health.Version = parts.Build.String() - if stats, ok := fetchedStats[string(server.ID)]; ok { - if err := s.updateServerHealth(&health, parts, stats, autopilotConf, targetLastIndex); err != nil { - s.logger.Printf("[WARN] autopilot: Error updating server %s health: %s", fmtServer(server), err) - } - } - } else { - health.SerfStatus = serf.StatusNone - } - - if health.Voter { - voterCount++ - } - if health.Healthy { - healthyCount++ - if health.Voter { - healthyVoterCount++ - } - } - - clusterHealth.Servers = append(clusterHealth.Servers, health) - } - clusterHealth.Healthy = healthyCount == len(servers) - - // If we have extra healthy voters, update FailureTolerance - requiredQuorum := voterCount/2 + 1 - if healthyVoterCount > requiredQuorum { - clusterHealth.FailureTolerance = healthyVoterCount - requiredQuorum - } - - // Heartbeat a metric for monitoring if we're the leader - if s.IsLeader() { - metrics.SetGauge([]string{"consul", "autopilot", "failure_tolerance"}, float32(clusterHealth.FailureTolerance)) - metrics.SetGauge([]string{"autopilot", "failure_tolerance"}, float32(clusterHealth.FailureTolerance)) - if clusterHealth.Healthy { - metrics.SetGauge([]string{"consul", "autopilot", "healthy"}, 1) - metrics.SetGauge([]string{"autopilot", "healthy"}, 1) - } else { - metrics.SetGauge([]string{"consul", "autopilot", "healthy"}, 0) - metrics.SetGauge([]string{"autopilot", "healthy"}, 0) - } - } - - s.clusterHealthLock.Lock() - s.clusterHealth = clusterHealth - s.clusterHealthLock.Unlock() - - return nil -} - -// updateServerHealth computes the resulting health of the server based on its -// fetched stats and the state of the leader. -func (s *Server) updateServerHealth(health *structs.ServerHealth, - server *metadata.Server, stats *structs.ServerStats, - autopilotConf *structs.AutopilotConfig, targetLastIndex uint64) error { - - health.LastTerm = stats.LastTerm - health.LastIndex = stats.LastIndex - - if stats.LastContact != "never" { - var err error - health.LastContact, err = time.ParseDuration(stats.LastContact) - if err != nil { - return fmt.Errorf("error parsing last_contact duration: %s", err) - } - } - - lastTerm, err := strconv.ParseUint(s.raft.Stats()["last_log_term"], 10, 64) - if err != nil { - return fmt.Errorf("error parsing last_log_term: %s", err) - } - health.Healthy = health.IsHealthy(lastTerm, targetLastIndex, autopilotConf) - - // If this is a new server or the health changed, reset StableSince - lastHealth := s.getServerHealth(server.ID) - if lastHealth == nil || lastHealth.Healthy != health.Healthy { - health.StableSince = time.Now() - } else { - health.StableSince = lastHealth.StableSince - } - - return nil -} - -func (s *Server) getClusterHealth() structs.OperatorHealthReply { - s.clusterHealthLock.RLock() - defer s.clusterHealthLock.RUnlock() - return s.clusterHealth -} - -func (s *Server) getServerHealth(id string) *structs.ServerHealth { - s.clusterHealthLock.RLock() - defer s.clusterHealthLock.RUnlock() - for _, health := range s.clusterHealth.Servers { - if health.ID == id { - return &health - } - } - return nil -} - -func isVoter(suffrage raft.ServerSuffrage) bool { - switch suffrage { - case raft.Voter, raft.Staging: - return true - default: - return false - } + return autopilot.PromoteStableServers(conf, health, future.Configuration().Servers), nil } diff --git a/agent/consul/autopilot/autopilot.go b/agent/consul/autopilot/autopilot.go new file mode 100644 index 0000000000..901ccf8eb1 --- /dev/null +++ b/agent/consul/autopilot/autopilot.go @@ -0,0 +1,457 @@ +package autopilot + +import ( + "context" + "fmt" + "log" + "strconv" + "sync" + "time" + + "github.com/armon/go-metrics" + "github.com/hashicorp/consul/agent/metadata" + "github.com/hashicorp/raft" + "github.com/hashicorp/serf/serf" +) + +// Delegate is the interface for the Autopilot mechanism +type Delegate interface { + FetchStats(ctx context.Context, servers []*metadata.Server) map[string]*ServerStats + GetOrCreateAutopilotConfig() (*Config, bool) + NumPeers() (int, error) + PromoteNonVoters(*Config, OperatorHealthReply) ([]raft.Server, error) + Raft() *raft.Raft + Serf() *serf.Serf +} + +// Autopilot is a mechanism for automatically managing the Raft +// quorum using server health information along with updates from Serf gossip. +// For more information, see https://www.consul.io/docs/guides/autopilot.html +type Autopilot struct { + logger *log.Logger + delegate Delegate + validServerFunc func(serf.Member) bool + + interval time.Duration + healthInterval time.Duration + + clusterHealth OperatorHealthReply + clusterHealthLock sync.RWMutex + + removeDeadCh chan struct{} + shutdownCh chan struct{} + waitGroup sync.WaitGroup +} + +func NewAutopilot(logger *log.Logger, delegate Delegate, serverFunc func(serf.Member) bool, interval, healthInterval time.Duration) *Autopilot { + return &Autopilot{ + logger: logger, + delegate: delegate, + validServerFunc: serverFunc, + interval: interval, + healthInterval: healthInterval, + } +} + +func (a *Autopilot) Start() { + a.removeDeadCh = make(chan struct{}) + a.shutdownCh = make(chan struct{}) + a.waitGroup = sync.WaitGroup{} + a.waitGroup.Add(1) + + go a.run() +} + +func (a *Autopilot) Stop() { + close(a.shutdownCh) + a.waitGroup.Wait() +} + +// autopilotLoop periodically looks for nonvoting servers to promote and dead servers to remove. +func (a *Autopilot) run() { + defer a.waitGroup.Done() + + // Monitor server health until shutdown + ticker := time.NewTicker(a.interval) + defer ticker.Stop() + + for { + select { + case <-a.shutdownCh: + return + case <-ticker.C: + autopilotConfig, ok := a.delegate.GetOrCreateAutopilotConfig() + if !ok { + continue + } + + // Skip the non-voter promotions unless all servers support the new APIs + minRaftProtocol, err := a.MinRaftProtocol() + if err != nil { + a.logger.Printf("[ERR] autopilot: error getting server raft protocol versions: %s", err) + continue + } + if minRaftProtocol >= 3 { + promotions, err := a.delegate.PromoteNonVoters(autopilotConfig, a.GetClusterHealth()) + if err != nil { + a.logger.Printf("[ERR] autopilot: Error checking for non-voters to promote: %s", err) + } + if err := a.handlePromotions(promotions); err != nil { + a.logger.Printf("[ERR] autopilot: Error handling promotions: %s", err) + } + } + + if err := a.pruneDeadServers(autopilotConfig); err != nil { + a.logger.Printf("[ERR] autopilot: Error checking for dead servers to remove: %s", err) + } + case <-a.removeDeadCh: + autopilotConfig, ok := a.delegate.GetOrCreateAutopilotConfig() + if !ok { + continue + } + + if err := a.pruneDeadServers(autopilotConfig); err != nil { + a.logger.Printf("[ERR] autopilot: Error checking for dead servers to remove: %s", err) + } + } + } +} + +// fmtServer prints info about a server in a standard way for logging. +func fmtServer(server raft.Server) string { + return fmt.Sprintf("Server (ID: %q Address: %q)", server.ID, server.Address) +} + +// pruneDeadServers removes up to numPeers/2 failed servers +func (a *Autopilot) pruneDeadServers(conf *Config) error { + if !conf.CleanupDeadServers { + return nil + } + + // Failed servers are known to Serf and marked failed, and stale servers + // are known to Raft but not Serf. + var failed []string + staleRaftServers := make(map[string]raft.Server) + raftNode := a.delegate.Raft() + future := raftNode.GetConfiguration() + if err := future.Error(); err != nil { + return err + } + for _, server := range future.Configuration().Servers { + staleRaftServers[string(server.Address)] = server + } + serfLAN := a.delegate.Serf() + for _, member := range serfLAN.Members() { + valid, parts := metadata.IsConsulServer(member) + if valid { + if _, ok := staleRaftServers[parts.Addr.String()]; ok { + delete(staleRaftServers, parts.Addr.String()) + } + + if member.Status == serf.StatusFailed { + failed = append(failed, member.Name) + } + } + } + + // We can bail early if there's nothing to do. + removalCount := len(failed) + len(staleRaftServers) + if removalCount == 0 { + return nil + } + + // Only do removals if a minority of servers will be affected. + peers, err := a.delegate.NumPeers() + if err != nil { + return err + } + if removalCount < peers/2 { + for _, node := range failed { + a.logger.Printf("[INFO] autopilot: Attempting removal of failed server node %q", node) + go serfLAN.RemoveFailedNode(node) + } + + minRaftProtocol, err := a.MinRaftProtocol() + if err != nil { + return err + } + for _, raftServer := range staleRaftServers { + a.logger.Printf("[INFO] autopilot: Attempting removal of stale %s", fmtServer(raftServer)) + var future raft.Future + if minRaftProtocol >= 2 { + future = raftNode.RemoveServer(raftServer.ID, 0, 0) + } else { + future = raftNode.RemovePeer(raftServer.Address) + } + if err := future.Error(); err != nil { + return err + } + } + } else { + a.logger.Printf("[DEBUG] autopilot: Failed to remove dead servers: too many dead servers: %d/%d", removalCount, peers) + } + + return nil +} + +// MinRaftProtocol returns the lowest supported Raft protocol among alive servers +func (a *Autopilot) MinRaftProtocol() (int, error) { + minVersion := -1 + members := a.delegate.Serf().Members() + for _, m := range members { + if m.Status != serf.StatusAlive { + continue + } + + if !a.validServerFunc(m) { + continue + } + + vsn, ok := m.Tags["raft_vsn"] + if !ok { + vsn = "1" + } + raftVsn, err := strconv.Atoi(vsn) + if err != nil { + return -1, err + } + + if minVersion == -1 || raftVsn < minVersion { + minVersion = raftVsn + } + } + + if minVersion == -1 { + return minVersion, fmt.Errorf("No servers found") + } + + return minVersion, nil +} + +// handlePromotions is a helper shared with Consul Enterprise that attempts to +// apply desired server promotions to the Raft configuration. +func (a *Autopilot) handlePromotions(promotions []raft.Server) error { + // This used to wait to only promote to maintain an odd quorum of + // servers, but this was at odds with the dead server cleanup when doing + // rolling updates (add one new server, wait, and then kill an old + // server). The dead server cleanup would still count the old server as + // a peer, which is conservative and the right thing to do, and this + // would wait to promote, so you could get into a stalemate. It is safer + // to promote early than remove early, so by promoting as soon as + // possible we have chosen that as the solution here. + for _, server := range promotions { + a.logger.Printf("[INFO] autopilot: Promoting %s to voter", fmtServer(server)) + addFuture := a.delegate.Raft().AddVoter(server.ID, server.Address, 0, 0) + if err := addFuture.Error(); err != nil { + return fmt.Errorf("failed to add raft peer: %v", err) + } + } + + // If we promoted a server, trigger a check to remove dead servers. + if len(promotions) > 0 { + select { + case a.removeDeadCh <- struct{}{}: + default: + } + } + return nil +} + +// ServerHealthLoop monitors the health of the servers in the cluster +func (a *Autopilot) ServerHealthLoop(shutdownCh <-chan struct{}) { + // Monitor server health until shutdown + ticker := time.NewTicker(a.healthInterval) + defer ticker.Stop() + + for { + select { + case <-shutdownCh: + return + case <-ticker.C: + if err := a.updateClusterHealth(); err != nil { + a.logger.Printf("[ERR] autopilot: Error updating cluster health: %s", err) + } + } + } +} + +// updateClusterHealth fetches the Raft stats of the other servers and updates +// s.clusterHealth based on the configured Autopilot thresholds +func (a *Autopilot) updateClusterHealth() error { + // Don't do anything if the min Raft version is too low + minRaftProtocol, err := a.MinRaftProtocol() + if err != nil { + return fmt.Errorf("error getting server raft protocol versions: %s", err) + } + if minRaftProtocol < 3 { + return nil + } + + autopilotConf, ok := a.delegate.GetOrCreateAutopilotConfig() + if !ok { + return nil + } + // Bail early if autopilot config hasn't been initialized yet + if autopilotConf == nil { + return nil + } + + // Get the the serf members which are Consul servers + serverMap := make(map[string]*metadata.Server) + for _, member := range a.delegate.Serf().Members() { + if member.Status == serf.StatusLeft { + continue + } + + valid, parts := metadata.IsConsulServer(member) + if valid { + serverMap[parts.ID] = parts + } + } + + raftNode := a.delegate.Raft() + future := raftNode.GetConfiguration() + if err := future.Error(); err != nil { + return fmt.Errorf("error getting Raft configuration %s", err) + } + servers := future.Configuration().Servers + + // Fetch the health for each of the servers in parallel so we get as + // consistent of a sample as possible. We capture the leader's index + // here as well so it roughly lines up with the same point in time. + targetLastIndex := raftNode.LastIndex() + var fetchList []*metadata.Server + for _, server := range servers { + if parts, ok := serverMap[string(server.ID)]; ok { + fetchList = append(fetchList, parts) + } + } + d := time.Now().Add(a.healthInterval / 2) + ctx, cancel := context.WithDeadline(context.Background(), d) + defer cancel() + fetchedStats := a.delegate.FetchStats(ctx, fetchList) + + // Build a current list of server healths + leader := raftNode.Leader() + var clusterHealth OperatorHealthReply + voterCount := 0 + healthyCount := 0 + healthyVoterCount := 0 + for _, server := range servers { + health := ServerHealth{ + ID: string(server.ID), + Address: string(server.Address), + Leader: server.Address == leader, + LastContact: -1, + Voter: server.Suffrage == raft.Voter, + } + + parts, ok := serverMap[string(server.ID)] + if ok { + health.Name = parts.Name + health.SerfStatus = parts.Status + health.Version = parts.Build.String() + if stats, ok := fetchedStats[string(server.ID)]; ok { + if err := a.updateServerHealth(&health, parts, stats, autopilotConf, targetLastIndex); err != nil { + a.logger.Printf("[WARN] autopilot: Error updating server %s health: %s", fmtServer(server), err) + } + } + } else { + health.SerfStatus = serf.StatusNone + } + + if health.Voter { + voterCount++ + } + if health.Healthy { + healthyCount++ + if health.Voter { + healthyVoterCount++ + } + } + + clusterHealth.Servers = append(clusterHealth.Servers, health) + } + clusterHealth.Healthy = healthyCount == len(servers) + + // If we have extra healthy voters, update FailureTolerance + requiredQuorum := voterCount/2 + 1 + if healthyVoterCount > requiredQuorum { + clusterHealth.FailureTolerance = healthyVoterCount - requiredQuorum + } + + // Heartbeat a metric for monitoring if we're the leader + if raftNode.State() == raft.Leader { + metrics.SetGauge([]string{"consul", "autopilot", "failure_tolerance"}, float32(clusterHealth.FailureTolerance)) + metrics.SetGauge([]string{"autopilot", "failure_tolerance"}, float32(clusterHealth.FailureTolerance)) + if clusterHealth.Healthy { + metrics.SetGauge([]string{"consul", "autopilot", "healthy"}, 1) + metrics.SetGauge([]string{"autopilot", "healthy"}, 1) + } else { + metrics.SetGauge([]string{"consul", "autopilot", "healthy"}, 0) + metrics.SetGauge([]string{"autopilot", "healthy"}, 0) + } + } + + a.clusterHealthLock.Lock() + a.clusterHealth = clusterHealth + a.clusterHealthLock.Unlock() + + return nil +} + +// updateServerHealth computes the resulting health of the server based on its +// fetched stats and the state of the leader. +func (a *Autopilot) updateServerHealth(health *ServerHealth, + server *metadata.Server, stats *ServerStats, + autopilotConf *Config, targetLastIndex uint64) error { + + health.LastTerm = stats.LastTerm + health.LastIndex = stats.LastIndex + + if stats.LastContact != "never" { + var err error + health.LastContact, err = time.ParseDuration(stats.LastContact) + if err != nil { + return fmt.Errorf("error parsing last_contact duration: %s", err) + } + } + + raftNode := a.delegate.Raft() + lastTerm, err := strconv.ParseUint(raftNode.Stats()["last_log_term"], 10, 64) + if err != nil { + return fmt.Errorf("error parsing last_log_term: %s", err) + } + health.Healthy = health.IsHealthy(lastTerm, targetLastIndex, autopilotConf) + + // If this is a new server or the health changed, reset StableSince + lastHealth := a.GetServerHealth(server.ID) + if lastHealth == nil || lastHealth.Healthy != health.Healthy { + health.StableSince = time.Now() + } else { + health.StableSince = lastHealth.StableSince + } + + return nil +} + +func (a *Autopilot) GetClusterHealth() OperatorHealthReply { + a.clusterHealthLock.RLock() + defer a.clusterHealthLock.RUnlock() + return a.clusterHealth +} + +func (a *Autopilot) GetServerHealth(id string) *ServerHealth { + a.clusterHealthLock.RLock() + defer a.clusterHealthLock.RUnlock() + return a.clusterHealth.ServerHealth(id) +} + +func isVoter(suffrage raft.ServerSuffrage) bool { + switch suffrage { + case raft.Voter, raft.Staging: + return true + default: + return false + } +} diff --git a/agent/consul/autopilot/promotion.go b/agent/consul/autopilot/promotion.go new file mode 100644 index 0000000000..ff008445ff --- /dev/null +++ b/agent/consul/autopilot/promotion.go @@ -0,0 +1,26 @@ +package autopilot + +import ( + "time" + + "github.com/hashicorp/raft" +) + +// PromoteStableServers is a basic autopilot promotion policy that promotes any +// server which has been healthy and stable for the duration specified in the +// given Autopilot config. +func PromoteStableServers(autopilotConfig *Config, health OperatorHealthReply, servers []raft.Server) []raft.Server { + // Find any non-voters eligible for promotion. + now := time.Now() + var promotions []raft.Server + for _, server := range servers { + if !isVoter(server.Suffrage) { + health := health.ServerHealth(string(server.ID)) + if health.IsStable(now, autopilotConfig) { + promotions = append(promotions, server) + } + } + } + + return promotions +} diff --git a/agent/consul/autopilot/promotion_test.go b/agent/consul/autopilot/promotion_test.go new file mode 100644 index 0000000000..51d04a4937 --- /dev/null +++ b/agent/consul/autopilot/promotion_test.go @@ -0,0 +1,102 @@ +package autopilot + +import ( + "testing" + "time" + + "github.com/hashicorp/raft" + "github.com/pascaldekloe/goe/verify" +) + +func TestPromotion(t *testing.T) { + config := &Config{ + LastContactThreshold: 5 * time.Second, + MaxTrailingLogs: 100, + ServerStabilizationTime: 3 * time.Second, + } + + cases := []struct { + name string + conf *Config + health OperatorHealthReply + servers []raft.Server + promotions []raft.Server + }{ + { + name: "one stable voter, no promotions", + conf: config, + health: OperatorHealthReply{ + Servers: []ServerHealth{ + { + ID: "a", + Healthy: true, + StableSince: time.Now().Add(-10 * time.Second), + }, + }, + }, + servers: []raft.Server{ + {ID: "a", Suffrage: raft.Voter}, + }, + promotions: []raft.Server{}, + }, + { + name: "one stable nonvoter, should be promoted", + conf: config, + health: OperatorHealthReply{ + Servers: []ServerHealth{ + { + ID: "a", + Healthy: true, + StableSince: time.Now().Add(-10 * time.Second), + }, + { + ID: "b", + Healthy: true, + StableSince: time.Now().Add(-10 * time.Second), + }, + }, + }, + servers: []raft.Server{ + {ID: "a", Suffrage: raft.Voter}, + {ID: "b", Suffrage: raft.Nonvoter}, + }, + promotions: []raft.Server{ + {ID: "b", Suffrage: raft.Nonvoter}, + }, + }, + { + name: "unstable servers, neither should be promoted", + conf: config, + health: OperatorHealthReply{ + Servers: []ServerHealth{ + { + ID: "a", + Healthy: true, + StableSince: time.Now().Add(-10 * time.Second), + }, + { + ID: "b", + Healthy: false, + StableSince: time.Now().Add(-10 * time.Second), + }, + { + ID: "c", + Healthy: true, + StableSince: time.Now().Add(-1 * time.Second), + }, + }, + }, + servers: []raft.Server{ + {ID: "a", Suffrage: raft.Voter}, + {ID: "b", Suffrage: raft.Nonvoter}, + {ID: "c", Suffrage: raft.Nonvoter}, + }, + promotions: []raft.Server{}, + }, + } + + for _, tc := range cases { + promotions := PromoteStableServers(tc.conf, tc.health, tc.servers) + verify.Values(t, tc.name, tc.promotions, promotions) + } +} diff --git a/agent/consul/autopilot/structs.go b/agent/consul/autopilot/structs.go new file mode 100644 index 0000000000..54a5025ace --- /dev/null +++ b/agent/consul/autopilot/structs.go @@ -0,0 +1,158 @@ +package autopilot + +import ( + "time" + + "github.com/hashicorp/serf/serf" +) + +// Config holds the Autopilot configuration for a cluster. +type Config struct { + // CleanupDeadServers controls whether to remove dead servers when a new + // server is added to the Raft peers. + CleanupDeadServers bool + + // LastContactThreshold is the limit on the amount of time a server can go + // without leader contact before being considered unhealthy. + LastContactThreshold time.Duration + + // MaxTrailingLogs is the amount of entries in the Raft Log that a server can + // be behind before being considered unhealthy. + MaxTrailingLogs uint64 + + // ServerStabilizationTime is the minimum amount of time a server must be + // in a stable, healthy state before it can be added to the cluster. Only + // applicable with Raft protocol version 3 or higher. + ServerStabilizationTime time.Duration + + // (Enterprise-only) RedundancyZoneTag is the node tag to use for separating + // servers into zones for redundancy. If left blank, this feature will be disabled. + RedundancyZoneTag string + + // (Enterprise-only) DisableUpgradeMigration will disable Autopilot's upgrade migration + // strategy of waiting until enough newer-versioned servers have been added to the + // cluster before promoting them to voters. + DisableUpgradeMigration bool + + // (Enterprise-only) UpgradeVersionTag is the node tag to use for version info when + // performing upgrade migrations. If left blank, the Consul version will be used. + UpgradeVersionTag string + + // CreateIndex/ModifyIndex store the create/modify indexes of this configuration. + CreateIndex uint64 + ModifyIndex uint64 +} + +// ServerHealth is the health (from the leader's point of view) of a server. +type ServerHealth struct { + // ID is the raft ID of the server. + ID string + + // Name is the node name of the server. + Name string + + // Address is the address of the server. + Address string + + // The status of the SerfHealth check for the server. + SerfStatus serf.MemberStatus + + // Version is the version of the server. + Version string + + // Leader is whether this server is currently the leader. + Leader bool + + // LastContact is the time since this node's last contact with the leader. + LastContact time.Duration + + // LastTerm is the highest leader term this server has a record of in its Raft log. + LastTerm uint64 + + // LastIndex is the last log index this server has a record of in its Raft log. + LastIndex uint64 + + // Healthy is whether or not the server is healthy according to the current + // Autopilot config. + Healthy bool + + // Voter is whether this is a voting server. + Voter bool + + // StableSince is the last time this server's Healthy value changed. + StableSince time.Time +} + +// IsHealthy determines whether this ServerHealth is considered healthy +// based on the given Autopilot config +func (h *ServerHealth) IsHealthy(lastTerm uint64, leaderLastIndex uint64, autopilotConf *Config) bool { + if h.SerfStatus != serf.StatusAlive { + return false + } + + if h.LastContact > autopilotConf.LastContactThreshold || h.LastContact < 0 { + return false + } + + if h.LastTerm != lastTerm { + return false + } + + if leaderLastIndex > autopilotConf.MaxTrailingLogs && h.LastIndex < leaderLastIndex-autopilotConf.MaxTrailingLogs { + return false + } + + return true +} + +// IsStable returns true if the ServerHealth shows a stable, passing state +// according to the given AutopilotConfig +func (h *ServerHealth) IsStable(now time.Time, conf *Config) bool { + if h == nil { + return false + } + + if !h.Healthy { + return false + } + + if now.Sub(h.StableSince) < conf.ServerStabilizationTime { + return false + } + + return true +} + +// ServerStats holds miscellaneous Raft metrics for a server +type ServerStats struct { + // LastContact is the time since this node's last contact with the leader. + LastContact string + + // LastTerm is the highest leader term this server has a record of in its Raft log. + LastTerm uint64 + + // LastIndex is the last log index this server has a record of in its Raft log. + LastIndex uint64 +} + +// OperatorHealthReply is a representation of the overall health of the cluster +type OperatorHealthReply struct { + // Healthy is true if all the servers in the cluster are healthy. + Healthy bool + + // FailureTolerance is the number of healthy servers that could be lost without + // an outage occurring. + FailureTolerance int + + // Servers holds the health of each server. + Servers []ServerHealth +} + +func (o *OperatorHealthReply) ServerHealth(id string) *ServerHealth { + for _, health := range o.Servers { + if health.ID == id { + return &health + } + } + return nil +} diff --git a/agent/structs/operator_test.go b/agent/consul/autopilot/structs_test.go similarity index 85% rename from agent/structs/operator_test.go rename to agent/consul/autopilot/structs_test.go index 8a0916f113..6065766415 100644 --- a/agent/structs/operator_test.go +++ b/agent/consul/autopilot/structs_test.go @@ -1,4 +1,4 @@ -package structs +package autopilot import ( "testing" @@ -12,7 +12,7 @@ func TestServerHealth_IsHealthy(t *testing.T) { health ServerHealth lastTerm uint64 lastIndex uint64 - conf AutopilotConfig + conf Config expected bool }{ // Healthy server, all values within allowed limits @@ -20,7 +20,7 @@ func TestServerHealth_IsHealthy(t *testing.T) { health: ServerHealth{SerfStatus: serf.StatusAlive, LastTerm: 1, LastIndex: 0}, lastTerm: 1, lastIndex: 10, - conf: AutopilotConfig{MaxTrailingLogs: 20}, + conf: Config{MaxTrailingLogs: 20}, expected: true, }, // Serf status failed @@ -38,7 +38,7 @@ func TestServerHealth_IsHealthy(t *testing.T) { { health: ServerHealth{SerfStatus: serf.StatusAlive, LastIndex: 0}, lastIndex: 10, - conf: AutopilotConfig{MaxTrailingLogs: 5}, + conf: Config{MaxTrailingLogs: 5}, expected: false, }, } @@ -56,14 +56,14 @@ func TestServerHealth_IsStable(t *testing.T) { cases := []struct { health *ServerHealth now time.Time - conf AutopilotConfig + conf Config expected bool }{ // Healthy server, all values within allowed limits { health: &ServerHealth{Healthy: true, StableSince: start}, now: start.Add(15 * time.Second), - conf: AutopilotConfig{ServerStabilizationTime: 10 * time.Second}, + conf: Config{ServerStabilizationTime: 10 * time.Second}, expected: true, }, // Unhealthy server @@ -75,7 +75,7 @@ func TestServerHealth_IsStable(t *testing.T) { { health: &ServerHealth{Healthy: true, StableSince: start}, now: start.Add(5 * time.Second), - conf: AutopilotConfig{ServerStabilizationTime: 10 * time.Second}, + conf: Config{ServerStabilizationTime: 10 * time.Second}, expected: false, }, // Nil struct diff --git a/agent/consul/autopilot_test.go b/agent/consul/autopilot_test.go index 822022f9a7..f7be3273da 100644 --- a/agent/consul/autopilot_test.go +++ b/agent/consul/autopilot_test.go @@ -301,7 +301,7 @@ func TestAutopilot_PromoteNonVoter(t *testing.T) { if servers[1].Suffrage != raft.Nonvoter { r.Fatalf("bad: %v", servers) } - health := s1.getServerHealth(string(servers[1].ID)) + health := s1.autopilot.GetServerHealth(string(servers[1].ID)) if health == nil { r.Fatal("nil health") } diff --git a/agent/consul/config.go b/agent/consul/config.go index ffbd35ef9b..6c56d978ff 100644 --- a/agent/consul/config.go +++ b/agent/consul/config.go @@ -7,7 +7,7 @@ import ( "os" "time" - "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/agent/consul/autopilot" "github.com/hashicorp/consul/lib" "github.com/hashicorp/consul/tlsutil" "github.com/hashicorp/consul/types" @@ -336,7 +336,7 @@ type Config struct { // AutopilotConfig is used to apply the initial autopilot config when // bootstrapping. - AutopilotConfig *structs.AutopilotConfig + AutopilotConfig *autopilot.Config // ServerHealthInterval is the frequency with which the health of the // servers in the cluster will be updated. @@ -416,7 +416,7 @@ func DefaultConfig() *Config { TLSMinVersion: "tls10", - AutopilotConfig: &structs.AutopilotConfig{ + AutopilotConfig: &autopilot.Config{ CleanupDeadServers: true, LastContactThreshold: 200 * time.Millisecond, MaxTrailingLogs: 250, diff --git a/agent/consul/fsm/commands_oss_test.go b/agent/consul/fsm/commands_oss_test.go index b98d9ea5e3..ccf58a47f4 100644 --- a/agent/consul/fsm/commands_oss_test.go +++ b/agent/consul/fsm/commands_oss_test.go @@ -8,6 +8,7 @@ import ( "testing" "time" + "github.com/hashicorp/consul/agent/consul/autopilot" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/types" @@ -1096,7 +1097,7 @@ func TestFSM_Autopilot(t *testing.T) { // Set the autopilot config using a request. req := structs.AutopilotSetConfigRequest{ Datacenter: "dc1", - Config: structs.AutopilotConfig{ + Config: autopilot.Config{ CleanupDeadServers: true, LastContactThreshold: 10 * time.Second, MaxTrailingLogs: 300, diff --git a/agent/consul/fsm/snapshot_oss.go b/agent/consul/fsm/snapshot_oss.go index 4bf7537222..be7bfc5a8a 100644 --- a/agent/consul/fsm/snapshot_oss.go +++ b/agent/consul/fsm/snapshot_oss.go @@ -1,6 +1,7 @@ package fsm import ( + "github.com/hashicorp/consul/agent/consul/autopilot" "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/go-msgpack/codec" @@ -354,7 +355,7 @@ func restorePreparedQuery(header *snapshotHeader, restore *state.Restore, decode } func restoreAutopilot(header *snapshotHeader, restore *state.Restore, decoder *codec.Decoder) error { - var req structs.AutopilotConfig + var req autopilot.Config if err := decoder.Decode(&req); err != nil { return err } diff --git a/agent/consul/fsm/snapshot_oss_test.go b/agent/consul/fsm/snapshot_oss_test.go index ff2419f693..8b8544420b 100644 --- a/agent/consul/fsm/snapshot_oss_test.go +++ b/agent/consul/fsm/snapshot_oss_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + "github.com/hashicorp/consul/agent/consul/autopilot" "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/api" @@ -88,7 +89,7 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) { t.Fatalf("err: %s", err) } - autopilotConf := &structs.AutopilotConfig{ + autopilotConf := &autopilot.Config{ CleanupDeadServers: true, LastContactThreshold: 100 * time.Millisecond, MaxTrailingLogs: 222, diff --git a/agent/consul/leader.go b/agent/consul/leader.go index a96195016b..07c0fc2cd6 100644 --- a/agent/consul/leader.go +++ b/agent/consul/leader.go @@ -9,6 +9,7 @@ import ( "github.com/armon/go-metrics" "github.com/hashicorp/consul/acl" + "github.com/hashicorp/consul/agent/consul/autopilot" "github.com/hashicorp/consul/agent/metadata" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/api" @@ -23,6 +24,8 @@ const ( barrierWriteTimeout = 2 * time.Minute ) +var minAutopilotVersion = version.Must(version.NewVersion("0.8.0")) + // monitorLeadership is used to monitor if we acquire or lose our role // as the leader in the Raft cluster. There is some work the leader is // expected to do, so we must react to changes @@ -202,7 +205,7 @@ func (s *Server) establishLeadership() error { } s.getOrCreateAutopilotConfig() - s.startAutopilot() + s.autopilot.Start() s.setConsistentReadReady() return nil } @@ -220,7 +223,7 @@ func (s *Server) revokeLeadership() error { } s.resetConsistentReadReady() - s.stopAutopilot() + s.autopilot.Stop() return nil } @@ -326,7 +329,7 @@ func (s *Server) initializeACL() error { } // getOrCreateAutopilotConfig is used to get the autopilot config, initializing it if necessary -func (s *Server) getOrCreateAutopilotConfig() (*structs.AutopilotConfig, bool) { +func (s *Server) getOrCreateAutopilotConfig() (*autopilot.Config, bool) { state := s.fsm.State() _, config, err := state.AutopilotConfig() if err != nil { @@ -681,7 +684,7 @@ func (s *Server) joinConsulServer(m serf.Member, parts *metadata.Server) error { // log entries. If the address is the same but the ID changed, remove the // old server before adding the new one. addr := (&net.TCPAddr{IP: m.Addr, Port: parts.Port}).String() - minRaftProtocol, err := ServerMinRaftProtocol(s.serfLAN.Members()) + minRaftProtocol, err := s.autopilot.MinRaftProtocol() if err != nil { return err } @@ -756,7 +759,7 @@ func (s *Server) removeConsulServer(m serf.Member, port int) error { return err } - minRaftProtocol, err := ServerMinRaftProtocol(s.serfLAN.Members()) + minRaftProtocol, err := s.autopilot.MinRaftProtocol() if err != nil { return err } diff --git a/agent/consul/leader_test.go b/agent/consul/leader_test.go index 2e106c4636..a7d2c95d1d 100644 --- a/agent/consul/leader_test.go +++ b/agent/consul/leader_test.go @@ -812,7 +812,7 @@ func TestLeader_RollRaftServer(t *testing.T) { for _, s := range []*Server{s1, s3} { retry.Run(t, func(r *retry.R) { - minVer, err := ServerMinRaftProtocol(s.LANMembers()) + minVer, err := s.autopilot.MinRaftProtocol() if err != nil { r.Fatal(err) } diff --git a/agent/consul/operator_autopilot_endpoint.go b/agent/consul/operator_autopilot_endpoint.go index 0413910b67..3f702cb182 100644 --- a/agent/consul/operator_autopilot_endpoint.go +++ b/agent/consul/operator_autopilot_endpoint.go @@ -4,11 +4,12 @@ import ( "fmt" "github.com/hashicorp/consul/acl" + "github.com/hashicorp/consul/agent/consul/autopilot" "github.com/hashicorp/consul/agent/structs" ) // AutopilotGetConfiguration is used to retrieve the current Autopilot configuration. -func (op *Operator) AutopilotGetConfiguration(args *structs.DCSpecificRequest, reply *structs.AutopilotConfig) error { +func (op *Operator) AutopilotGetConfiguration(args *structs.DCSpecificRequest, reply *autopilot.Config) error { if done, err := op.srv.forward("Operator.AutopilotGetConfiguration", args, args, reply); done { return err } @@ -69,7 +70,7 @@ func (op *Operator) AutopilotSetConfiguration(args *structs.AutopilotSetConfigRe } // ServerHealth is used to get the current health of the servers. -func (op *Operator) ServerHealth(args *structs.DCSpecificRequest, reply *structs.OperatorHealthReply) error { +func (op *Operator) ServerHealth(args *structs.DCSpecificRequest, reply *autopilot.OperatorHealthReply) error { // This must be sent to the leader, so we fix the args since we are // re-using a structure where we don't support all the options. args.RequireConsistent = true @@ -88,7 +89,7 @@ func (op *Operator) ServerHealth(args *structs.DCSpecificRequest, reply *structs } // Exit early if the min Raft version is too low - minRaftProtocol, err := ServerMinRaftProtocol(op.srv.LANMembers()) + minRaftProtocol, err := op.srv.autopilot.MinRaftProtocol() if err != nil { return fmt.Errorf("error getting server raft protocol versions: %s", err) } @@ -96,7 +97,7 @@ func (op *Operator) ServerHealth(args *structs.DCSpecificRequest, reply *structs return fmt.Errorf("all servers must have raft_protocol set to 3 or higher to use this endpoint") } - *reply = op.srv.getClusterHealth() + *reply = op.srv.autopilot.GetClusterHealth() return nil } diff --git a/agent/consul/operator_autopilot_endpoint_test.go b/agent/consul/operator_autopilot_endpoint_test.go index f2d2e3acf5..34d36f8518 100644 --- a/agent/consul/operator_autopilot_endpoint_test.go +++ b/agent/consul/operator_autopilot_endpoint_test.go @@ -7,6 +7,7 @@ import ( "time" "github.com/hashicorp/consul/acl" + "github.com/hashicorp/consul/agent/consul/autopilot" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/testrpc" "github.com/hashicorp/consul/testutil/retry" @@ -29,7 +30,7 @@ func TestOperator_Autopilot_GetConfiguration(t *testing.T) { arg := structs.DCSpecificRequest{ Datacenter: "dc1", } - var reply structs.AutopilotConfig + var reply autopilot.Config err := msgpackrpc.CallWithCodec(codec, "Operator.AutopilotGetConfiguration", &arg, &reply) if err != nil { t.Fatalf("err: %v", err) @@ -58,7 +59,7 @@ func TestOperator_Autopilot_GetConfiguration_ACLDeny(t *testing.T) { arg := structs.DCSpecificRequest{ Datacenter: "dc1", } - var reply structs.AutopilotConfig + var reply autopilot.Config err := msgpackrpc.CallWithCodec(codec, "Operator.AutopilotGetConfiguration", &arg, &reply) if !acl.IsErrPermissionDenied(err) { t.Fatalf("err: %v", err) @@ -112,7 +113,7 @@ func TestOperator_Autopilot_SetConfiguration(t *testing.T) { // Change the autopilot config from the default arg := structs.AutopilotSetConfigRequest{ Datacenter: "dc1", - Config: structs.AutopilotConfig{ + Config: autopilot.Config{ CleanupDeadServers: true, }, } @@ -151,7 +152,7 @@ func TestOperator_Autopilot_SetConfiguration_ACLDeny(t *testing.T) { // Try to set config without permissions arg := structs.AutopilotSetConfigRequest{ Datacenter: "dc1", - Config: structs.AutopilotConfig{ + Config: autopilot.Config{ CleanupDeadServers: true, }, } @@ -232,7 +233,7 @@ func TestOperator_ServerHealth(t *testing.T) { arg := structs.DCSpecificRequest{ Datacenter: "dc1", } - var reply structs.OperatorHealthReply + var reply autopilot.OperatorHealthReply err := msgpackrpc.CallWithCodec(codec, "Operator.ServerHealth", &arg, &reply) if err != nil { r.Fatalf("err: %v", err) @@ -274,7 +275,7 @@ func TestOperator_ServerHealth_UnsupportedRaftVersion(t *testing.T) { arg := structs.DCSpecificRequest{ Datacenter: "dc1", } - var reply structs.OperatorHealthReply + var reply autopilot.OperatorHealthReply err := msgpackrpc.CallWithCodec(codec, "Operator.ServerHealth", &arg, &reply) if err == nil || !strings.Contains(err.Error(), "raft_protocol set to 3 or higher") { t.Fatalf("bad: %v", err) diff --git a/agent/consul/operator_raft_endpoint.go b/agent/consul/operator_raft_endpoint.go index 418836b915..9cabb51a3e 100644 --- a/agent/consul/operator_raft_endpoint.go +++ b/agent/consul/operator_raft_endpoint.go @@ -115,7 +115,7 @@ REMOVE: // doing if you are calling this. If you remove a peer that's known to // Serf, for example, it will come back when the leader does a reconcile // pass. - minRaftProtocol, err := ServerMinRaftProtocol(op.srv.serfLAN.Members()) + minRaftProtocol, err := op.srv.autopilot.MinRaftProtocol() if err != nil { return err } @@ -182,7 +182,7 @@ REMOVE: // doing if you are calling this. If you remove a peer that's known to // Serf, for example, it will come back when the leader does a reconcile // pass. - minRaftProtocol, err := ServerMinRaftProtocol(op.srv.serfLAN.Members()) + minRaftProtocol, err := op.srv.autopilot.MinRaftProtocol() if err != nil { return err } diff --git a/agent/consul/server.go b/agent/consul/server.go index bb64e96f7e..63eb6bc695 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -18,6 +18,7 @@ import ( "time" "github.com/hashicorp/consul/acl" + "github.com/hashicorp/consul/agent/consul/autopilot" "github.com/hashicorp/consul/agent/consul/fsm" "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/metadata" @@ -86,8 +87,8 @@ type Server struct { // aclCache is the non-authoritative ACL cache. aclCache *aclCache - // autopilotPolicy controls the behavior of Autopilot for certain tasks. - autopilotPolicy AutopilotPolicy + // autopilot is the Autopilot instance for this server. + autopilot *autopilot.Autopilot // autopilotRemoveDeadCh is used to trigger a check for dead server removals. autopilotRemoveDeadCh chan struct{} @@ -98,10 +99,6 @@ type Server struct { // autopilotWaitGroup is used to block until Autopilot shuts down. autopilotWaitGroup sync.WaitGroup - // clusterHealth stores the current view of the cluster's health. - clusterHealth structs.OperatorHealthReply - clusterHealthLock sync.RWMutex - // Consul configuration config *Config @@ -305,8 +302,12 @@ func NewServerLogger(config *Config, logger *log.Logger, tokens *token.Store) (* shutdownCh: shutdownCh, } - // Set up the autopilot policy - s.autopilotPolicy = &BasicAutopilot{server: s} + // Set up autopilot + apDelegate := &AutopilotDelegate{s} + serverFunc := func(m serf.Member) bool { + return m.Tags["role"] == "consul" + } + s.autopilot = autopilot.NewAutopilot(logger, apDelegate, serverFunc, config.AutopilotInterval, config.ServerHealthInterval) // Initialize the stats fetcher that autopilot will use. s.statsFetcher = NewStatsFetcher(logger, s.connPool, s.config.Datacenter) @@ -430,7 +431,7 @@ func NewServerLogger(config *Config, logger *log.Logger, tokens *token.Store) (* go s.sessionStats() // Start the server health checking. - go s.serverHealthLoop() + go s.autopilot.ServerHealthLoop(s.shutdownCh) return s, nil } @@ -732,7 +733,7 @@ func (s *Server) Leave() error { // removed for some sane period of time. isLeader := s.IsLeader() if isLeader && numPeers > 1 { - minRaftProtocol, err := ServerMinRaftProtocol(s.serfLAN.Members()) + minRaftProtocol, err := s.autopilot.MinRaftProtocol() if err != nil { return err } diff --git a/agent/consul/server_serf.go b/agent/consul/server_serf.go index 42fce4c8c3..325b4c3321 100644 --- a/agent/consul/server_serf.go +++ b/agent/consul/server_serf.go @@ -304,7 +304,7 @@ func (s *Server) maybeBootstrap() { // Attempt a live bootstrap! var configuration raft.Configuration var addrs []string - minRaftVersion, err := ServerMinRaftProtocol(members) + minRaftVersion, err := s.autopilot.MinRaftProtocol() if err != nil { s.logger.Printf("[ERR] consul: Failed to read server raft versions: %v", err) } diff --git a/agent/consul/state/autopilot.go b/agent/consul/state/autopilot.go index 21514e5be2..1f94d8888c 100644 --- a/agent/consul/state/autopilot.go +++ b/agent/consul/state/autopilot.go @@ -3,7 +3,7 @@ package state import ( "fmt" - "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/agent/consul/autopilot" "github.com/hashicorp/go-memdb" ) @@ -30,13 +30,13 @@ func init() { } // Autopilot is used to pull the autopilot config from the snapshot. -func (s *Snapshot) Autopilot() (*structs.AutopilotConfig, error) { +func (s *Snapshot) Autopilot() (*autopilot.Config, error) { c, err := s.tx.First("autopilot-config", "id") if err != nil { return nil, err } - config, ok := c.(*structs.AutopilotConfig) + config, ok := c.(*autopilot.Config) if !ok { return nil, nil } @@ -45,7 +45,7 @@ func (s *Snapshot) Autopilot() (*structs.AutopilotConfig, error) { } // Autopilot is used when restoring from a snapshot. -func (s *Restore) Autopilot(config *structs.AutopilotConfig) error { +func (s *Restore) Autopilot(config *autopilot.Config) error { if err := s.tx.Insert("autopilot-config", config); err != nil { return fmt.Errorf("failed restoring autopilot config: %s", err) } @@ -54,7 +54,7 @@ func (s *Restore) Autopilot(config *structs.AutopilotConfig) error { } // AutopilotConfig is used to get the current Autopilot configuration. -func (s *Store) AutopilotConfig() (uint64, *structs.AutopilotConfig, error) { +func (s *Store) AutopilotConfig() (uint64, *autopilot.Config, error) { tx := s.db.Txn(false) defer tx.Abort() @@ -64,7 +64,7 @@ func (s *Store) AutopilotConfig() (uint64, *structs.AutopilotConfig, error) { return 0, nil, fmt.Errorf("failed autopilot config lookup: %s", err) } - config, ok := c.(*structs.AutopilotConfig) + config, ok := c.(*autopilot.Config) if !ok { return 0, nil, nil } @@ -73,7 +73,7 @@ func (s *Store) AutopilotConfig() (uint64, *structs.AutopilotConfig, error) { } // AutopilotSetConfig is used to set the current Autopilot configuration. -func (s *Store) AutopilotSetConfig(idx uint64, config *structs.AutopilotConfig) error { +func (s *Store) AutopilotSetConfig(idx uint64, config *autopilot.Config) error { tx := s.db.Txn(true) defer tx.Abort() @@ -86,7 +86,7 @@ func (s *Store) AutopilotSetConfig(idx uint64, config *structs.AutopilotConfig) // AutopilotCASConfig is used to try updating the Autopilot configuration with a // given Raft index. If the CAS index specified is not equal to the last observed index // for the config, then the call is a noop, -func (s *Store) AutopilotCASConfig(idx, cidx uint64, config *structs.AutopilotConfig) (bool, error) { +func (s *Store) AutopilotCASConfig(idx, cidx uint64, config *autopilot.Config) (bool, error) { tx := s.db.Txn(true) defer tx.Abort() @@ -99,7 +99,7 @@ func (s *Store) AutopilotCASConfig(idx, cidx uint64, config *structs.AutopilotCo // If the existing index does not match the provided CAS // index arg, then we shouldn't update anything and can safely // return early here. - e, ok := existing.(*structs.AutopilotConfig) + e, ok := existing.(*autopilot.Config) if !ok || e.ModifyIndex != cidx { return false, nil } @@ -110,7 +110,7 @@ func (s *Store) AutopilotCASConfig(idx, cidx uint64, config *structs.AutopilotCo return true, nil } -func (s *Store) autopilotSetConfigTxn(idx uint64, tx *memdb.Txn, config *structs.AutopilotConfig) error { +func (s *Store) autopilotSetConfigTxn(idx uint64, tx *memdb.Txn, config *autopilot.Config) error { // Check for an existing config existing, err := tx.First("autopilot-config", "id") if err != nil { @@ -119,7 +119,7 @@ func (s *Store) autopilotSetConfigTxn(idx uint64, tx *memdb.Txn, config *structs // Set the indexes. if existing != nil { - config.CreateIndex = existing.(*structs.AutopilotConfig).CreateIndex + config.CreateIndex = existing.(*autopilot.Config).CreateIndex } else { config.CreateIndex = idx } diff --git a/agent/consul/state/autopilot_test.go b/agent/consul/state/autopilot_test.go index bfe1be8a00..dec6dc5a72 100644 --- a/agent/consul/state/autopilot_test.go +++ b/agent/consul/state/autopilot_test.go @@ -5,14 +5,14 @@ import ( "testing" "time" - "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/agent/consul/autopilot" "github.com/pascaldekloe/goe/verify" ) func TestStateStore_Autopilot(t *testing.T) { s := testStateStore(t) - expected := &structs.AutopilotConfig{ + expected := &autopilot.Config{ CleanupDeadServers: true, LastContactThreshold: 5 * time.Second, MaxTrailingLogs: 500, @@ -41,7 +41,7 @@ func TestStateStore_Autopilot(t *testing.T) { func TestStateStore_AutopilotCAS(t *testing.T) { s := testStateStore(t) - expected := &structs.AutopilotConfig{ + expected := &autopilot.Config{ CleanupDeadServers: true, } @@ -53,7 +53,7 @@ func TestStateStore_AutopilotCAS(t *testing.T) { } // Do a CAS with an index lower than the entry - ok, err := s.AutopilotCASConfig(2, 0, &structs.AutopilotConfig{ + ok, err := s.AutopilotCASConfig(2, 0, &autopilot.Config{ CleanupDeadServers: false, }) if ok || err != nil { @@ -74,7 +74,7 @@ func TestStateStore_AutopilotCAS(t *testing.T) { } // Do another CAS, this time with the correct index - ok, err = s.AutopilotCASConfig(2, 1, &structs.AutopilotConfig{ + ok, err = s.AutopilotCASConfig(2, 1, &autopilot.Config{ CleanupDeadServers: false, }) if !ok || err != nil { @@ -96,7 +96,7 @@ func TestStateStore_AutopilotCAS(t *testing.T) { func TestStateStore_Autopilot_Snapshot_Restore(t *testing.T) { s := testStateStore(t) - before := &structs.AutopilotConfig{ + before := &autopilot.Config{ CleanupDeadServers: true, } if err := s.AutopilotSetConfig(99, before); err != nil { @@ -106,7 +106,7 @@ func TestStateStore_Autopilot_Snapshot_Restore(t *testing.T) { snap := s.Snapshot() defer snap.Close() - after := &structs.AutopilotConfig{ + after := &autopilot.Config{ CleanupDeadServers: false, } if err := s.AutopilotSetConfig(100, after); err != nil { diff --git a/agent/consul/stats_fetcher.go b/agent/consul/stats_fetcher.go index 25eefb9609..4736b0d4d0 100644 --- a/agent/consul/stats_fetcher.go +++ b/agent/consul/stats_fetcher.go @@ -5,9 +5,9 @@ import ( "log" "sync" + "github.com/hashicorp/consul/agent/consul/autopilot" "github.com/hashicorp/consul/agent/metadata" "github.com/hashicorp/consul/agent/pool" - "github.com/hashicorp/consul/agent/structs" ) // StatsFetcher has two functions for autopilot. First, lets us fetch all the @@ -39,9 +39,9 @@ func NewStatsFetcher(logger *log.Logger, pool *pool.ConnPool, datacenter string) // cancel this when the context is canceled because we only want one in-flight // RPC to each server, so we let it finish and then clean up the in-flight // tracking. -func (f *StatsFetcher) fetch(server *metadata.Server, replyCh chan *structs.ServerStats) { +func (f *StatsFetcher) fetch(server *metadata.Server, replyCh chan *autopilot.ServerStats) { var args struct{} - var reply structs.ServerStats + var reply autopilot.ServerStats err := f.pool.RPC(f.datacenter, server.Addr, server.Version, "Status.RaftStats", server.UseTLS, &args, &reply) if err != nil { f.logger.Printf("[WARN] consul: error getting server health from %q: %v", @@ -56,10 +56,10 @@ func (f *StatsFetcher) fetch(server *metadata.Server, replyCh chan *structs.Serv } // Fetch will attempt to query all the servers in parallel. -func (f *StatsFetcher) Fetch(ctx context.Context, servers []*metadata.Server) map[string]*structs.ServerStats { +func (f *StatsFetcher) Fetch(ctx context.Context, servers []*metadata.Server) map[string]*autopilot.ServerStats { type workItem struct { server *metadata.Server - replyCh chan *structs.ServerStats + replyCh chan *autopilot.ServerStats } var work []*workItem @@ -72,7 +72,7 @@ func (f *StatsFetcher) Fetch(ctx context.Context, servers []*metadata.Server) ma } else { workItem := &workItem{ server: server, - replyCh: make(chan *structs.ServerStats, 1), + replyCh: make(chan *autopilot.ServerStats, 1), } work = append(work, workItem) f.inflight[server.ID] = struct{}{} @@ -83,7 +83,7 @@ func (f *StatsFetcher) Fetch(ctx context.Context, servers []*metadata.Server) ma // Now wait for the results to come in, or for the context to be // canceled. - replies := make(map[string]*structs.ServerStats) + replies := make(map[string]*autopilot.ServerStats) for _, workItem := range work { select { case reply := <-workItem.replyCh: diff --git a/agent/consul/status_endpoint.go b/agent/consul/status_endpoint.go index 33b7b48f5a..dac364444e 100644 --- a/agent/consul/status_endpoint.go +++ b/agent/consul/status_endpoint.go @@ -4,7 +4,7 @@ import ( "fmt" "strconv" - "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/agent/consul/autopilot" ) // Status endpoint is used to check on server status @@ -42,7 +42,7 @@ func (s *Status) Peers(args struct{}, reply *[]string) error { } // Used by Autopilot to query the raft stats of the local server. -func (s *Status) RaftStats(args struct{}, reply *structs.ServerStats) error { +func (s *Status) RaftStats(args struct{}, reply *autopilot.ServerStats) error { stats := s.server.raft.Stats() var err error diff --git a/agent/consul/util.go b/agent/consul/util.go index c13b6fe701..14a2ab0154 100644 --- a/agent/consul/util.go +++ b/agent/consul/util.go @@ -93,35 +93,6 @@ func CanServersUnderstandProtocol(members []serf.Member, version uint8) (bool, e return (numServers > 0) && (numWhoGrok == numServers), nil } -// ServerMinRaftProtocol returns the lowest supported Raft protocol among alive servers -func ServerMinRaftProtocol(members []serf.Member) (int, error) { - minVersion := -1 - for _, m := range members { - if m.Tags["role"] != "consul" || m.Status != serf.StatusAlive { - continue - } - - vsn, ok := m.Tags["raft_vsn"] - if !ok { - vsn = "1" - } - raftVsn, err := strconv.Atoi(vsn) - if err != nil { - return -1, err - } - - if minVersion == -1 || raftVsn < minVersion { - minVersion = raftVsn - } - } - - if minVersion == -1 { - return minVersion, fmt.Errorf("No servers found") - } - - return minVersion, nil -} - // Returns if a member is a consul node. Returns a bool, // and the datacenter. func isConsulNode(m serf.Member) (bool, string) { diff --git a/agent/operator_endpoint.go b/agent/operator_endpoint.go index 7ae505b194..4cf580e201 100644 --- a/agent/operator_endpoint.go +++ b/agent/operator_endpoint.go @@ -6,6 +6,7 @@ import ( "strconv" "time" + "github.com/hashicorp/consul/agent/consul/autopilot" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/api" multierror "github.com/hashicorp/go-multierror" @@ -194,7 +195,7 @@ func (s *HTTPServer) OperatorAutopilotConfiguration(resp http.ResponseWriter, re return nil, nil } - var reply structs.AutopilotConfig + var reply autopilot.Config if err := s.agent.RPC("Operator.AutopilotGetConfiguration", &args, &reply); err != nil { return nil, err } @@ -226,7 +227,7 @@ func (s *HTTPServer) OperatorAutopilotConfiguration(resp http.ResponseWriter, re return nil, nil } - args.Config = structs.AutopilotConfig{ + args.Config = autopilot.Config{ CleanupDeadServers: conf.CleanupDeadServers, LastContactThreshold: conf.LastContactThreshold.Duration(), MaxTrailingLogs: conf.MaxTrailingLogs, @@ -276,7 +277,7 @@ func (s *HTTPServer) OperatorServerHealth(resp http.ResponseWriter, req *http.Re return nil, nil } - var reply structs.OperatorHealthReply + var reply autopilot.OperatorHealthReply if err := s.agent.RPC("Operator.ServerHealth", &args, &reply); err != nil { return nil, err } diff --git a/agent/operator_endpoint_test.go b/agent/operator_endpoint_test.go index e069ae48ed..a6b1ca5731 100644 --- a/agent/operator_endpoint_test.go +++ b/agent/operator_endpoint_test.go @@ -8,6 +8,7 @@ import ( "strings" "testing" + "github.com/hashicorp/consul/agent/consul/autopilot" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/testutil/retry" @@ -329,7 +330,7 @@ func TestOperator_AutopilotSetConfiguration(t *testing.T) { Datacenter: "dc1", } - var reply structs.AutopilotConfig + var reply autopilot.Config if err := a.RPC("Operator.AutopilotGetConfiguration", &args, &reply); err != nil { t.Fatalf("err: %v", err) } @@ -357,7 +358,7 @@ func TestOperator_AutopilotCASConfiguration(t *testing.T) { Datacenter: "dc1", } - var reply structs.AutopilotConfig + var reply autopilot.Config if err := a.RPC("Operator.AutopilotGetConfiguration", &args, &reply); err != nil { t.Fatalf("err: %v", err) } diff --git a/agent/structs/operator.go b/agent/structs/operator.go index 5fb1c4da1e..121bb2fa2e 100644 --- a/agent/structs/operator.go +++ b/agent/structs/operator.go @@ -2,48 +2,11 @@ package structs import ( "net" - "time" + "github.com/hashicorp/consul/agent/consul/autopilot" "github.com/hashicorp/raft" - "github.com/hashicorp/serf/serf" ) -// AutopilotConfig holds the Autopilot configuration for a cluster. -type AutopilotConfig struct { - // CleanupDeadServers controls whether to remove dead servers when a new - // server is added to the Raft peers. - CleanupDeadServers bool - - // LastContactThreshold is the limit on the amount of time a server can go - // without leader contact before being considered unhealthy. - LastContactThreshold time.Duration - - // MaxTrailingLogs is the amount of entries in the Raft Log that a server can - // be behind before being considered unhealthy. - MaxTrailingLogs uint64 - - // ServerStabilizationTime is the minimum amount of time a server must be - // in a stable, healthy state before it can be added to the cluster. Only - // applicable with Raft protocol version 3 or higher. - ServerStabilizationTime time.Duration - - // (Enterprise-only) RedundancyZoneTag is the node tag to use for separating - // servers into zones for redundancy. If left blank, this feature will be disabled. - RedundancyZoneTag string - - // (Enterprise-only) DisableUpgradeMigration will disable Autopilot's upgrade migration - // strategy of waiting until enough newer-versioned servers have been added to the - // cluster before promoting them to voters. - DisableUpgradeMigration bool - - // (Enterprise-only) UpgradeVersionTag is the node tag to use for version info when - // performing upgrade migrations. If left blank, the Consul version will be used. - UpgradeVersionTag string - - // RaftIndex stores the create/modify indexes of this configuration. - RaftIndex -} - // RaftServer has information about a server in the Raft configuration. type RaftServer struct { // ID is the unique ID for the server. These are currently the same @@ -109,7 +72,7 @@ type AutopilotSetConfigRequest struct { Datacenter string // Config is the new Autopilot configuration to use. - Config AutopilotConfig + Config autopilot.Config // CAS controls whether to use check-and-set semantics for this request. CAS bool @@ -123,111 +86,6 @@ func (op *AutopilotSetConfigRequest) RequestDatacenter() string { return op.Datacenter } -// ServerHealth is the health (from the leader's point of view) of a server. -type ServerHealth struct { - // ID is the raft ID of the server. - ID string - - // Name is the node name of the server. - Name string - - // Address is the address of the server. - Address string - - // The status of the SerfHealth check for the server. - SerfStatus serf.MemberStatus - - // Version is the Consul version of the server. - Version string - - // Leader is whether this server is currently the leader. - Leader bool - - // LastContact is the time since this node's last contact with the leader. - LastContact time.Duration - - // LastTerm is the highest leader term this server has a record of in its Raft log. - LastTerm uint64 - - // LastIndex is the last log index this server has a record of in its Raft log. - LastIndex uint64 - - // Healthy is whether or not the server is healthy according to the current - // Autopilot config. - Healthy bool - - // Voter is whether this is a voting server. - Voter bool - - // StableSince is the last time this server's Healthy value changed. - StableSince time.Time -} - -// IsHealthy determines whether this ServerHealth is considered healthy -// based on the given Autopilot config -func (h *ServerHealth) IsHealthy(lastTerm uint64, leaderLastIndex uint64, autopilotConf *AutopilotConfig) bool { - if h.SerfStatus != serf.StatusAlive { - return false - } - - if h.LastContact > autopilotConf.LastContactThreshold || h.LastContact < 0 { - return false - } - - if h.LastTerm != lastTerm { - return false - } - - if leaderLastIndex > autopilotConf.MaxTrailingLogs && h.LastIndex < leaderLastIndex-autopilotConf.MaxTrailingLogs { - return false - } - - return true -} - -// IsStable returns true if the ServerHealth is in a stable, passing state -// according to the given AutopilotConfig -func (h *ServerHealth) IsStable(now time.Time, conf *AutopilotConfig) bool { - if h == nil { - return false - } - - if !h.Healthy { - return false - } - - if now.Sub(h.StableSince) < conf.ServerStabilizationTime { - return false - } - - return true -} - -// ServerStats holds miscellaneous Raft metrics for a server -type ServerStats struct { - // LastContact is the time since this node's last contact with the leader. - LastContact string - - // LastTerm is the highest leader term this server has a record of in its Raft log. - LastTerm uint64 - - // LastIndex is the last log index this server has a record of in its Raft log. - LastIndex uint64 -} - -// OperatorHealthReply is a representation of the overall health of the cluster -type OperatorHealthReply struct { - // Healthy is true if all the servers in the cluster are healthy. - Healthy bool - - // FailureTolerance is the number of healthy servers that could be lost without - // an outage occurring. - FailureTolerance int - - // Servers holds the health of each server. - Servers []ServerHealth -} - // (Enterprise-only) NetworkSegment is the configuration for a network segment, which is an // isolated serf group on the LAN. type NetworkSegment struct { diff --git a/command/operator/autopilot/set/operator_autopilot_set_test.go b/command/operator/autopilot/set/operator_autopilot_set_test.go index cb5bc6e5a9..264c2bcf38 100644 --- a/command/operator/autopilot/set/operator_autopilot_set_test.go +++ b/command/operator/autopilot/set/operator_autopilot_set_test.go @@ -6,6 +6,7 @@ import ( "time" "github.com/hashicorp/consul/agent" + "github.com/hashicorp/consul/agent/consul/autopilot" "github.com/hashicorp/consul/agent/structs" "github.com/mitchellh/cli" ) @@ -44,7 +45,7 @@ func TestOperatorAutopilotSetConfigCommmand(t *testing.T) { req := structs.DCSpecificRequest{ Datacenter: "dc1", } - var reply structs.AutopilotConfig + var reply autopilot.Config if err := a.RPC("Operator.AutopilotGetConfiguration", &req, &reply); err != nil { t.Fatalf("err: %v", err) }