More refactoring to make autopilot consul-agnostic

This commit is contained in:
Kyle Havlovitz 2017-12-12 17:45:03 -08:00
parent de28555671
commit b92f895c23
No known key found for this signature in database
GPG Key ID: 8A5E6B173056AD6C
6 changed files with 126 additions and 86 deletions

View File

@ -3,7 +3,10 @@ package consul
import (
"context"
"fmt"
"net"
"strconv"
"github.com/armon/go-metrics"
"github.com/hashicorp/consul/agent/consul/autopilot"
"github.com/hashicorp/consul/agent/metadata"
"github.com/hashicorp/raft"
@ -15,24 +18,52 @@ type AutopilotDelegate struct {
server *Server
}
func (d *AutopilotDelegate) FetchStats(ctx context.Context, servers []*metadata.Server) map[string]*autopilot.ServerStats {
return d.server.statsFetcher.Fetch(ctx, servers)
}
func (d *AutopilotDelegate) GetOrCreateAutopilotConfig() (*autopilot.Config, bool) {
func (d *AutopilotDelegate) AutopilotConfig() *autopilot.Config {
return d.server.getOrCreateAutopilotConfig()
}
func (d *AutopilotDelegate) Raft() *raft.Raft {
return d.server.raft
func (d *AutopilotDelegate) FetchStats(ctx context.Context, servers []serf.Member) map[string]*autopilot.ServerStats {
return d.server.statsFetcher.Fetch(ctx, servers)
}
func (d *AutopilotDelegate) Serf() *serf.Serf {
return d.server.serfLAN
func (d *AutopilotDelegate) IsServer(m serf.Member) (bool, *autopilot.ServerInfo) {
if m.Tags["role"] != "consul" {
return false, nil
}
port_str := m.Tags["port"]
port, err := strconv.Atoi(port_str)
if err != nil {
return false, nil
}
build_version, err := metadata.Build(&m)
if err != nil {
return false, nil
}
return true, &autopilot.ServerInfo{
Name: m.Name,
ID: m.Tags["id"],
Addr: &net.TCPAddr{IP: m.Addr, Port: port},
Build: *build_version,
Status: m.Status,
}
}
func (d *AutopilotDelegate) NumPeers() (int, error) {
return d.server.numPeers()
// Heartbeat a metric for monitoring if we're the leader
func (d *AutopilotDelegate) NotifyHealth(health autopilot.OperatorHealthReply) {
if d.server.raft.State() == raft.Leader {
metrics.SetGauge([]string{"consul", "autopilot", "failure_tolerance"}, float32(health.FailureTolerance))
metrics.SetGauge([]string{"autopilot", "failure_tolerance"}, float32(health.FailureTolerance))
if health.Healthy {
metrics.SetGauge([]string{"consul", "autopilot", "healthy"}, 1)
metrics.SetGauge([]string{"autopilot", "healthy"}, 1)
} else {
metrics.SetGauge([]string{"consul", "autopilot", "healthy"}, 0)
metrics.SetGauge([]string{"autopilot", "healthy"}, 0)
}
}
}
func (d *AutopilotDelegate) PromoteNonVoters(conf *autopilot.Config, health autopilot.OperatorHealthReply) ([]raft.Server, error) {
@ -43,3 +74,11 @@ func (d *AutopilotDelegate) PromoteNonVoters(conf *autopilot.Config, health auto
return autopilot.PromoteStableServers(conf, health, future.Configuration().Servers), nil
}
func (d *AutopilotDelegate) Raft() *raft.Raft {
return d.server.raft
}
func (d *AutopilotDelegate) Serf() *serf.Serf {
return d.server.serfLAN
}

View File

@ -4,21 +4,22 @@ import (
"context"
"fmt"
"log"
"net"
"strconv"
"sync"
"time"
"github.com/armon/go-metrics"
"github.com/hashicorp/consul/agent/metadata"
"github.com/hashicorp/go-version"
"github.com/hashicorp/raft"
"github.com/hashicorp/serf/serf"
)
// Delegate is the interface for the Autopilot mechanism
type Delegate interface {
FetchStats(ctx context.Context, servers []*metadata.Server) map[string]*ServerStats
GetOrCreateAutopilotConfig() (*Config, bool)
NumPeers() (int, error)
AutopilotConfig() *Config
FetchStats(context.Context, []serf.Member) map[string]*ServerStats
IsServer(serf.Member) (bool, *ServerInfo)
NotifyHealth(OperatorHealthReply)
PromoteNonVoters(*Config, OperatorHealthReply) ([]raft.Server, error)
Raft() *raft.Raft
Serf() *serf.Serf
@ -28,9 +29,8 @@ type Delegate interface {
// quorum using server health information along with updates from Serf gossip.
// For more information, see https://www.consul.io/docs/guides/autopilot.html
type Autopilot struct {
logger *log.Logger
delegate Delegate
validServerFunc func(serf.Member) bool
logger *log.Logger
delegate Delegate
interval time.Duration
healthInterval time.Duration
@ -43,18 +43,25 @@ type Autopilot struct {
waitGroup sync.WaitGroup
}
func NewAutopilot(logger *log.Logger, delegate Delegate, serverFunc func(serf.Member) bool, interval, healthInterval time.Duration) *Autopilot {
type ServerInfo struct {
Name string
ID string
Addr net.Addr
Build version.Version
Status serf.MemberStatus
}
func NewAutopilot(logger *log.Logger, delegate Delegate, interval, healthInterval time.Duration) *Autopilot {
return &Autopilot{
logger: logger,
delegate: delegate,
validServerFunc: serverFunc,
interval: interval,
healthInterval: healthInterval,
logger: logger,
delegate: delegate,
interval: interval,
healthInterval: healthInterval,
removeDeadCh: make(chan struct{}),
}
}
func (a *Autopilot) Start() {
a.removeDeadCh = make(chan struct{})
a.shutdownCh = make(chan struct{})
a.waitGroup = sync.WaitGroup{}
a.waitGroup.Add(1)
@ -67,7 +74,7 @@ func (a *Autopilot) Stop() {
a.waitGroup.Wait()
}
// autopilotLoop periodically looks for nonvoting servers to promote and dead servers to remove.
// run periodically looks for nonvoting servers to promote and dead servers to remove.
func (a *Autopilot) run() {
defer a.waitGroup.Done()
@ -80,8 +87,8 @@ func (a *Autopilot) run() {
case <-a.shutdownCh:
return
case <-ticker.C:
autopilotConfig, ok := a.delegate.GetOrCreateAutopilotConfig()
if !ok {
autopilotConfig := a.delegate.AutopilotConfig()
if autopilotConfig == nil {
continue
}
@ -101,16 +108,16 @@ func (a *Autopilot) run() {
}
}
if err := a.pruneDeadServers(autopilotConfig); err != nil {
if err := a.pruneDeadServers(); err != nil {
a.logger.Printf("[ERR] autopilot: Error checking for dead servers to remove: %s", err)
}
case <-a.removeDeadCh:
autopilotConfig, ok := a.delegate.GetOrCreateAutopilotConfig()
if !ok {
autopilotConfig := a.delegate.AutopilotConfig()
if autopilotConfig == nil {
continue
}
if err := a.pruneDeadServers(autopilotConfig); err != nil {
if err := a.pruneDeadServers(); err != nil {
a.logger.Printf("[ERR] autopilot: Error checking for dead servers to remove: %s", err)
}
}
@ -122,12 +129,19 @@ func fmtServer(server raft.Server) string {
return fmt.Sprintf("Server (ID: %q Address: %q)", server.ID, server.Address)
}
// pruneDeadServers removes up to numPeers/2 failed servers
func (a *Autopilot) pruneDeadServers(conf *Config) error {
if !conf.CleanupDeadServers {
return nil
// NumPeers counts the number of voting peers in the given raft config.
func NumPeers(raftConfig raft.Configuration) int {
var numPeers int
for _, server := range raftConfig.Servers {
if isVoter(server.Suffrage) {
numPeers++
}
}
return numPeers
}
// pruneDeadServers removes up to numPeers/2 failed servers
func (a *Autopilot) pruneDeadServers() error {
// Failed servers are known to Serf and marked failed, and stale servers
// are known to Raft but not Serf.
var failed []string
@ -137,13 +151,17 @@ func (a *Autopilot) pruneDeadServers(conf *Config) error {
if err := future.Error(); err != nil {
return err
}
for _, server := range future.Configuration().Servers {
raftConfig := future.Configuration()
for _, server := range raftConfig.Servers {
staleRaftServers[string(server.Address)] = server
}
serfLAN := a.delegate.Serf()
for _, member := range serfLAN.Members() {
valid, parts := metadata.IsConsulServer(member)
valid, parts := a.delegate.IsServer(member)
if valid {
// todo(kyhavlov): change this to index by UUID
if _, ok := staleRaftServers[parts.Addr.String()]; ok {
delete(staleRaftServers, parts.Addr.String())
}
@ -161,10 +179,7 @@ func (a *Autopilot) pruneDeadServers(conf *Config) error {
}
// Only do removals if a minority of servers will be affected.
peers, err := a.delegate.NumPeers()
if err != nil {
return err
}
peers := NumPeers(raftConfig)
if removalCount < peers/2 {
for _, node := range failed {
a.logger.Printf("[INFO] autopilot: Attempting removal of failed server node %q", node)
@ -203,7 +218,7 @@ func (a *Autopilot) MinRaftProtocol() (int, error) {
continue
}
if !a.validServerFunc(m) {
if ok, _ := a.delegate.IsServer(m); !ok {
continue
}
@ -287,25 +302,24 @@ func (a *Autopilot) updateClusterHealth() error {
return nil
}
autopilotConf, ok := a.delegate.GetOrCreateAutopilotConfig()
if !ok {
return nil
}
autopilotConf := a.delegate.AutopilotConfig()
// Bail early if autopilot config hasn't been initialized yet
if autopilotConf == nil {
return nil
}
// Get the the serf members which are Consul servers
serverMap := make(map[string]*metadata.Server)
var serverMembers []serf.Member
serverMap := make(map[string]*ServerInfo)
for _, member := range a.delegate.Serf().Members() {
if member.Status == serf.StatusLeft {
continue
}
valid, parts := metadata.IsConsulServer(member)
valid, parts := a.delegate.IsServer(member)
if valid {
serverMap[parts.ID] = parts
serverMembers = append(serverMembers, member)
}
}
@ -320,7 +334,7 @@ func (a *Autopilot) updateClusterHealth() error {
// consistent of a sample as possible. We capture the leader's index
// here as well so it roughly lines up with the same point in time.
targetLastIndex := raftNode.LastIndex()
var fetchList []*metadata.Server
var fetchList []*ServerInfo
for _, server := range servers {
if parts, ok := serverMap[string(server.ID)]; ok {
fetchList = append(fetchList, parts)
@ -329,7 +343,7 @@ func (a *Autopilot) updateClusterHealth() error {
d := time.Now().Add(a.healthInterval / 2)
ctx, cancel := context.WithDeadline(context.Background(), d)
defer cancel()
fetchedStats := a.delegate.FetchStats(ctx, fetchList)
fetchedStats := a.delegate.FetchStats(ctx, serverMembers)
// Build a current list of server healths
leader := raftNode.Leader()
@ -380,18 +394,7 @@ func (a *Autopilot) updateClusterHealth() error {
clusterHealth.FailureTolerance = healthyVoterCount - requiredQuorum
}
// Heartbeat a metric for monitoring if we're the leader
if raftNode.State() == raft.Leader {
metrics.SetGauge([]string{"consul", "autopilot", "failure_tolerance"}, float32(clusterHealth.FailureTolerance))
metrics.SetGauge([]string{"autopilot", "failure_tolerance"}, float32(clusterHealth.FailureTolerance))
if clusterHealth.Healthy {
metrics.SetGauge([]string{"consul", "autopilot", "healthy"}, 1)
metrics.SetGauge([]string{"autopilot", "healthy"}, 1)
} else {
metrics.SetGauge([]string{"consul", "autopilot", "healthy"}, 0)
metrics.SetGauge([]string{"autopilot", "healthy"}, 0)
}
}
a.delegate.NotifyHealth(clusterHealth)
a.clusterHealthLock.Lock()
a.clusterHealth = clusterHealth
@ -403,7 +406,7 @@ func (a *Autopilot) updateClusterHealth() error {
// updateServerHealth computes the resulting health of the server based on its
// fetched stats and the state of the leader.
func (a *Autopilot) updateServerHealth(health *ServerHealth,
server *metadata.Server, stats *ServerStats,
server *ServerInfo, stats *ServerStats,
autopilotConf *Config, targetLastIndex uint64) error {
health.LastTerm = stats.LastTerm

View File

@ -329,30 +329,30 @@ func (s *Server) initializeACL() error {
}
// getOrCreateAutopilotConfig is used to get the autopilot config, initializing it if necessary
func (s *Server) getOrCreateAutopilotConfig() (*autopilot.Config, bool) {
func (s *Server) getOrCreateAutopilotConfig() *autopilot.Config {
state := s.fsm.State()
_, config, err := state.AutopilotConfig()
if err != nil {
s.logger.Printf("[ERR] autopilot: failed to get config: %v", err)
return nil, false
return nil
}
if config != nil {
return config, true
return config
}
if !ServersMeetMinimumVersion(s.LANMembers(), minAutopilotVersion) {
s.logger.Printf("[WARN] autopilot: can't initialize until all servers are >= %s", minAutopilotVersion.String())
return nil, false
return nil
}
config = s.config.AutopilotConfig
req := structs.AutopilotSetConfigRequest{Config: *config}
if _, err = s.raftApply(structs.AutopilotRequestType, req); err != nil {
s.logger.Printf("[ERR] autopilot: failed to initialize config: %v", err)
return nil, false
return nil
}
return config, true
return config
}
// reconcileReaped is used to reconcile nodes that have failed and been reaped

View File

@ -304,10 +304,7 @@ func NewServerLogger(config *Config, logger *log.Logger, tokens *token.Store) (*
// Set up autopilot
apDelegate := &AutopilotDelegate{s}
serverFunc := func(m serf.Member) bool {
return m.Tags["role"] == "consul"
}
s.autopilot = autopilot.NewAutopilot(logger, apDelegate, serverFunc, config.AutopilotInterval, config.ServerHealthInterval)
s.autopilot = autopilot.NewAutopilot(logger, apDelegate, config.AutopilotInterval, config.ServerHealthInterval)
// Initialize the stats fetcher that autopilot will use.
s.statsFetcher = NewStatsFetcher(logger, s.connPool, s.config.Datacenter)
@ -832,13 +829,7 @@ func (s *Server) numPeers() (int, error) {
return 0, err
}
var numPeers int
for _, server := range future.Configuration().Servers {
if server.Suffrage == raft.Voter {
numPeers++
}
}
return numPeers, nil
return autopilot.NumPeers(future.Configuration()), nil
}
// JoinLAN is used to have Consul join the inner-DC pool

View File

@ -8,6 +8,7 @@ import (
"github.com/hashicorp/consul/agent/consul/autopilot"
"github.com/hashicorp/consul/agent/metadata"
"github.com/hashicorp/consul/agent/pool"
"github.com/hashicorp/serf/serf"
)
// StatsFetcher has two functions for autopilot. First, lets us fetch all the
@ -56,14 +57,20 @@ func (f *StatsFetcher) fetch(server *metadata.Server, replyCh chan *autopilot.Se
}
// Fetch will attempt to query all the servers in parallel.
func (f *StatsFetcher) Fetch(ctx context.Context, servers []*metadata.Server) map[string]*autopilot.ServerStats {
func (f *StatsFetcher) Fetch(ctx context.Context, members []serf.Member) map[string]*autopilot.ServerStats {
type workItem struct {
server *metadata.Server
replyCh chan *autopilot.ServerStats
}
var work []*workItem
var servers []*metadata.Server
for _, s := range members {
if ok, parts := metadata.IsConsulServer(s); ok {
servers = append(servers, parts)
}
}
// Skip any servers that have inflight requests.
var work []*workItem
f.inflightLock.Lock()
for _, server := range servers {
if _, ok := f.inflight[server.ID]; ok {

View File

@ -47,7 +47,7 @@ func TestStatsFetcher(t *testing.T) {
func() {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
stats := s1.statsFetcher.Fetch(ctx, servers)
stats := s1.statsFetcher.Fetch(ctx, s1.LANMembers())
if len(stats) != 3 {
t.Fatalf("bad: %#v", stats)
}
@ -73,7 +73,7 @@ func TestStatsFetcher(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
stats := s1.statsFetcher.Fetch(ctx, servers)
stats := s1.statsFetcher.Fetch(ctx, s1.LANMembers())
if len(stats) != 2 {
t.Fatalf("bad: %#v", stats)
}